fix: DynamicBearer auth, retry on 500/429, upload resilience

- DynamicBearer AuthMethod: La Suite clients resolve tokens fresh
  per-request from cache file, surviving token expiry mid-session
- Retry with exponential backoff on all Drive API calls (create_child,
  upload_ended) — up to 5 retries on 429/500/502/503
- Token refresh triggered on 500 before retry (handles expired SSO)
- S3 upload retry with backoff (up to 3 retries on 502/503)
- Connection pooling: reuse DriveClient HTTP client for S3 PUTs
- Folder/file dedup: skip existing items on re-upload
This commit is contained in:
2026-03-24 15:25:01 +00:00
parent de5c807374
commit cd80a57a40
3 changed files with 80 additions and 28 deletions

View File

@@ -559,7 +559,7 @@ pub enum DriveCommand {
#[arg(short = 't', long)]
folder_id: String,
/// Number of concurrent uploads.
#[arg(long, default_value = "8")]
#[arg(long, default_value = "3")]
parallel: usize,
},
}
@@ -1008,6 +1008,7 @@ async fn collect_upload_jobs(
}
/// Upload a single file to Drive, updating the progress bar.
/// Retries on 429/500/502/503 up to 5 times with exponential backoff.
async fn upload_single_file_with_progress(
drive: &super::DriveClient,
job: &UploadJob,
@@ -1019,17 +1020,13 @@ async fn upload_single_file_with_progress(
.and_then(|n| n.to_str())
.unwrap_or("unnamed");
// Create the file item in Drive
let item = drive
.create_child(
&job.parent_id,
&serde_json::json!({
"title": filename,
"filename": filename,
"type": "file",
}),
)
.await?;
// Create the file item in Drive (with retry)
let body = serde_json::json!({
"title": filename,
"filename": filename,
"type": "file",
});
let item = retry_drive_call(|| drive.create_child(&job.parent_id, &body), 5).await?;
let item_id = item["id"]
.as_str()
@@ -1054,12 +1051,48 @@ async fn upload_single_file_with_progress(
.await?;
pb.set_position(len);
// Notify Drive the upload is complete
drive.upload_ended(item_id).await?;
// Notify Drive the upload is complete (with retry)
retry_drive_call(|| drive.upload_ended(item_id), 5).await?;
Ok(())
}
/// Retry a Drive API call on 429/500/502/503 with exponential backoff.
async fn retry_drive_call<F, Fut, T>(f: F, max_retries: u32) -> Result<T>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
let mut last_err = None;
for attempt in 0..=max_retries {
match f().await {
Ok(v) => return Ok(v),
Err(e) => {
let msg = e.to_string();
let retryable = msg.contains("429")
|| msg.contains("500")
|| msg.contains("502")
|| msg.contains("503")
|| msg.contains("request failed");
if retryable && attempt < max_retries {
// On 500, try refreshing the SSO token (may have expired)
if msg.contains("500") {
let _ = crate::auth::get_token().await;
}
let delay = std::time::Duration::from_millis(
500 * 2u64.pow(attempt.min(4)),
);
tokio::time::sleep(delay).await;
last_err = Some(e);
continue;
}
return Err(e);
}
}
}
Err(last_err.unwrap())
}
// ═══════════════════════════════════════════════════════════════════════════
// Mail (Messages)
// ═══════════════════════════════════════════════════════════════════════════