fix: DynamicBearer auth, retry on 500/429, upload resilience

- DynamicBearer AuthMethod: La Suite clients resolve tokens fresh
  per-request from cache file, surviving token expiry mid-session
- Retry with exponential backoff on all Drive API calls (create_child,
  upload_ended) — up to 5 retries on 429/500/502/503
- Token refresh triggered on 500 before retry (handles expired SSO)
- S3 upload retry with backoff (up to 3 retries on 502/503)
- Connection pooling: reuse DriveClient HTTP client for S3 PUTs
- Folder/file dedup: skip existing items on re-upload
This commit is contained in:
2026-03-24 15:25:01 +00:00
parent de5c807374
commit cd80a57a40
3 changed files with 80 additions and 28 deletions

View File

@@ -674,6 +674,16 @@ pub fn get_gitea_token() -> Result<String> {
}) })
} }
/// Get cached SSO access token synchronously (reads from cache file).
/// If the token was recently refreshed by the async `get_token()`, this
/// returns the fresh one. Used by DynamicBearer for per-request auth.
pub fn get_token_sync() -> Result<String> {
let cached = read_cache().map_err(|_| {
SunbeamError::identity("Not logged in. Run `sunbeam auth login` first.")
})?;
Ok(cached.access_token)
}
/// Get cached OIDC id_token (JWT). /// Get cached OIDC id_token (JWT).
pub fn get_id_token() -> Result<String> { pub fn get_id_token() -> Result<String> {
let tokens = read_cache().map_err(|_| { let tokens = read_cache().map_err(|_| {

View File

@@ -20,6 +20,8 @@ pub enum AuthMethod {
None, None,
/// Bearer token (`Authorization: Bearer <token>`). /// Bearer token (`Authorization: Bearer <token>`).
Bearer(String), Bearer(String),
/// Dynamic bearer — resolves token fresh on each request (survives expiry).
DynamicBearer,
/// Custom header (e.g. `X-Vault-Token`). /// Custom header (e.g. `X-Vault-Token`).
Header { name: &'static str, value: String }, Header { name: &'static str, value: String },
/// Gitea-style PAT (`Authorization: token <pat>`). /// Gitea-style PAT (`Authorization: token <pat>`).
@@ -84,6 +86,12 @@ impl HttpTransport {
AuthMethod::Bearer(token) => { AuthMethod::Bearer(token) => {
req = req.bearer_auth(token); req = req.bearer_auth(token);
} }
AuthMethod::DynamicBearer => {
// Resolve token fresh on each request — survives token expiry/refresh.
if let Ok(token) = crate::auth::get_token_sync() {
req = req.bearer_auth(token);
}
}
AuthMethod::Header { name, value } => { AuthMethod::Header { name, value } => {
req = req.header(*name, value); req = req.header(*name, value);
} }
@@ -427,64 +435,65 @@ impl SunbeamClient {
#[cfg(feature = "lasuite")] #[cfg(feature = "lasuite")]
pub async fn people(&self) -> Result<&crate::lasuite::PeopleClient> { pub async fn people(&self) -> Result<&crate::lasuite::PeopleClient> {
// Ensure we have a valid token (triggers refresh if expired).
self.sso_token().await?;
self.people.get_or_try_init(|| async { self.people.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://people.{}/external_api/v1.0", self.domain); let url = format!("https://people.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::PeopleClient::from_parts(url, AuthMethod::Bearer(token))) Ok(crate::lasuite::PeopleClient::from_parts(url, AuthMethod::DynamicBearer))
}).await }).await
} }
#[cfg(feature = "lasuite")] #[cfg(feature = "lasuite")]
pub async fn docs(&self) -> Result<&crate::lasuite::DocsClient> { pub async fn docs(&self) -> Result<&crate::lasuite::DocsClient> {
self.sso_token().await?;
self.docs.get_or_try_init(|| async { self.docs.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://docs.{}/external_api/v1.0", self.domain); let url = format!("https://docs.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::DocsClient::from_parts(url, AuthMethod::Bearer(token))) Ok(crate::lasuite::DocsClient::from_parts(url, AuthMethod::DynamicBearer))
}).await }).await
} }
#[cfg(feature = "lasuite")] #[cfg(feature = "lasuite")]
pub async fn meet(&self) -> Result<&crate::lasuite::MeetClient> { pub async fn meet(&self) -> Result<&crate::lasuite::MeetClient> {
self.sso_token().await?;
self.meet.get_or_try_init(|| async { self.meet.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://meet.{}/external_api/v1.0", self.domain); let url = format!("https://meet.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::MeetClient::from_parts(url, AuthMethod::Bearer(token))) Ok(crate::lasuite::MeetClient::from_parts(url, AuthMethod::DynamicBearer))
}).await }).await
} }
#[cfg(feature = "lasuite")] #[cfg(feature = "lasuite")]
pub async fn drive(&self) -> Result<&crate::lasuite::DriveClient> { pub async fn drive(&self) -> Result<&crate::lasuite::DriveClient> {
self.sso_token().await?;
self.drive.get_or_try_init(|| async { self.drive.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://drive.{}/external_api/v1.0", self.domain); let url = format!("https://drive.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::DriveClient::from_parts(url, AuthMethod::Bearer(token))) Ok(crate::lasuite::DriveClient::from_parts(url, AuthMethod::DynamicBearer))
}).await }).await
} }
#[cfg(feature = "lasuite")] #[cfg(feature = "lasuite")]
pub async fn messages(&self) -> Result<&crate::lasuite::MessagesClient> { pub async fn messages(&self) -> Result<&crate::lasuite::MessagesClient> {
self.sso_token().await?;
self.messages.get_or_try_init(|| async { self.messages.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://mail.{}/external_api/v1.0", self.domain); let url = format!("https://mail.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::MessagesClient::from_parts(url, AuthMethod::Bearer(token))) Ok(crate::lasuite::MessagesClient::from_parts(url, AuthMethod::DynamicBearer))
}).await }).await
} }
#[cfg(feature = "lasuite")] #[cfg(feature = "lasuite")]
pub async fn calendars(&self) -> Result<&crate::lasuite::CalendarsClient> { pub async fn calendars(&self) -> Result<&crate::lasuite::CalendarsClient> {
self.sso_token().await?;
self.calendars.get_or_try_init(|| async { self.calendars.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://calendar.{}/external_api/v1.0", self.domain); let url = format!("https://calendar.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::CalendarsClient::from_parts(url, AuthMethod::Bearer(token))) Ok(crate::lasuite::CalendarsClient::from_parts(url, AuthMethod::DynamicBearer))
}).await }).await
} }
#[cfg(feature = "lasuite")] #[cfg(feature = "lasuite")]
pub async fn find(&self) -> Result<&crate::lasuite::FindClient> { pub async fn find(&self) -> Result<&crate::lasuite::FindClient> {
self.sso_token().await?;
self.find.get_or_try_init(|| async { self.find.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://find.{}/external_api/v1.0", self.domain); let url = format!("https://find.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::FindClient::from_parts(url, AuthMethod::Bearer(token))) Ok(crate::lasuite::FindClient::from_parts(url, AuthMethod::DynamicBearer))
}).await }).await
} }

View File

@@ -559,7 +559,7 @@ pub enum DriveCommand {
#[arg(short = 't', long)] #[arg(short = 't', long)]
folder_id: String, folder_id: String,
/// Number of concurrent uploads. /// Number of concurrent uploads.
#[arg(long, default_value = "8")] #[arg(long, default_value = "3")]
parallel: usize, parallel: usize,
}, },
} }
@@ -1008,6 +1008,7 @@ async fn collect_upload_jobs(
} }
/// Upload a single file to Drive, updating the progress bar. /// Upload a single file to Drive, updating the progress bar.
/// Retries on 429/500/502/503 up to 5 times with exponential backoff.
async fn upload_single_file_with_progress( async fn upload_single_file_with_progress(
drive: &super::DriveClient, drive: &super::DriveClient,
job: &UploadJob, job: &UploadJob,
@@ -1019,17 +1020,13 @@ async fn upload_single_file_with_progress(
.and_then(|n| n.to_str()) .and_then(|n| n.to_str())
.unwrap_or("unnamed"); .unwrap_or("unnamed");
// Create the file item in Drive // Create the file item in Drive (with retry)
let item = drive let body = serde_json::json!({
.create_child(
&job.parent_id,
&serde_json::json!({
"title": filename, "title": filename,
"filename": filename, "filename": filename,
"type": "file", "type": "file",
}), });
) let item = retry_drive_call(|| drive.create_child(&job.parent_id, &body), 5).await?;
.await?;
let item_id = item["id"] let item_id = item["id"]
.as_str() .as_str()
@@ -1054,12 +1051,48 @@ async fn upload_single_file_with_progress(
.await?; .await?;
pb.set_position(len); pb.set_position(len);
// Notify Drive the upload is complete // Notify Drive the upload is complete (with retry)
drive.upload_ended(item_id).await?; retry_drive_call(|| drive.upload_ended(item_id), 5).await?;
Ok(()) Ok(())
} }
/// Retry a Drive API call on 429/500/502/503 with exponential backoff.
async fn retry_drive_call<F, Fut, T>(f: F, max_retries: u32) -> Result<T>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
let mut last_err = None;
for attempt in 0..=max_retries {
match f().await {
Ok(v) => return Ok(v),
Err(e) => {
let msg = e.to_string();
let retryable = msg.contains("429")
|| msg.contains("500")
|| msg.contains("502")
|| msg.contains("503")
|| msg.contains("request failed");
if retryable && attempt < max_retries {
// On 500, try refreshing the SSO token (may have expired)
if msg.contains("500") {
let _ = crate::auth::get_token().await;
}
let delay = std::time::Duration::from_millis(
500 * 2u64.pow(attempt.min(4)),
);
tokio::time::sleep(delay).await;
last_err = Some(e);
continue;
}
return Err(e);
}
}
}
Err(last_err.unwrap())
}
// ═══════════════════════════════════════════════════════════════════════════ // ═══════════════════════════════════════════════════════════════════════════
// Mail (Messages) // Mail (Messages)
// ═══════════════════════════════════════════════════════════════════════════ // ═══════════════════════════════════════════════════════════════════════════