fix: DynamicBearer auth, retry on 500/429, upload resilience
- DynamicBearer AuthMethod: La Suite clients resolve tokens fresh per-request from cache file, surviving token expiry mid-session - Retry with exponential backoff on all Drive API calls (create_child, upload_ended) — up to 5 retries on 429/500/502/503 - Token refresh triggered on 500 before retry (handles expired SSO) - S3 upload retry with backoff (up to 3 retries on 502/503) - Connection pooling: reuse DriveClient HTTP client for S3 PUTs - Folder/file dedup: skip existing items on re-upload
This commit is contained in:
@@ -674,6 +674,16 @@ pub fn get_gitea_token() -> Result<String> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get cached SSO access token synchronously (reads from cache file).
|
||||||
|
/// If the token was recently refreshed by the async `get_token()`, this
|
||||||
|
/// returns the fresh one. Used by DynamicBearer for per-request auth.
|
||||||
|
pub fn get_token_sync() -> Result<String> {
|
||||||
|
let cached = read_cache().map_err(|_| {
|
||||||
|
SunbeamError::identity("Not logged in. Run `sunbeam auth login` first.")
|
||||||
|
})?;
|
||||||
|
Ok(cached.access_token)
|
||||||
|
}
|
||||||
|
|
||||||
/// Get cached OIDC id_token (JWT).
|
/// Get cached OIDC id_token (JWT).
|
||||||
pub fn get_id_token() -> Result<String> {
|
pub fn get_id_token() -> Result<String> {
|
||||||
let tokens = read_cache().map_err(|_| {
|
let tokens = read_cache().map_err(|_| {
|
||||||
|
|||||||
@@ -20,6 +20,8 @@ pub enum AuthMethod {
|
|||||||
None,
|
None,
|
||||||
/// Bearer token (`Authorization: Bearer <token>`).
|
/// Bearer token (`Authorization: Bearer <token>`).
|
||||||
Bearer(String),
|
Bearer(String),
|
||||||
|
/// Dynamic bearer — resolves token fresh on each request (survives expiry).
|
||||||
|
DynamicBearer,
|
||||||
/// Custom header (e.g. `X-Vault-Token`).
|
/// Custom header (e.g. `X-Vault-Token`).
|
||||||
Header { name: &'static str, value: String },
|
Header { name: &'static str, value: String },
|
||||||
/// Gitea-style PAT (`Authorization: token <pat>`).
|
/// Gitea-style PAT (`Authorization: token <pat>`).
|
||||||
@@ -84,6 +86,12 @@ impl HttpTransport {
|
|||||||
AuthMethod::Bearer(token) => {
|
AuthMethod::Bearer(token) => {
|
||||||
req = req.bearer_auth(token);
|
req = req.bearer_auth(token);
|
||||||
}
|
}
|
||||||
|
AuthMethod::DynamicBearer => {
|
||||||
|
// Resolve token fresh on each request — survives token expiry/refresh.
|
||||||
|
if let Ok(token) = crate::auth::get_token_sync() {
|
||||||
|
req = req.bearer_auth(token);
|
||||||
|
}
|
||||||
|
}
|
||||||
AuthMethod::Header { name, value } => {
|
AuthMethod::Header { name, value } => {
|
||||||
req = req.header(*name, value);
|
req = req.header(*name, value);
|
||||||
}
|
}
|
||||||
@@ -427,64 +435,65 @@ impl SunbeamClient {
|
|||||||
|
|
||||||
#[cfg(feature = "lasuite")]
|
#[cfg(feature = "lasuite")]
|
||||||
pub async fn people(&self) -> Result<&crate::lasuite::PeopleClient> {
|
pub async fn people(&self) -> Result<&crate::lasuite::PeopleClient> {
|
||||||
|
// Ensure we have a valid token (triggers refresh if expired).
|
||||||
|
self.sso_token().await?;
|
||||||
self.people.get_or_try_init(|| async {
|
self.people.get_or_try_init(|| async {
|
||||||
let token = self.sso_token().await?;
|
|
||||||
let url = format!("https://people.{}/external_api/v1.0", self.domain);
|
let url = format!("https://people.{}/external_api/v1.0", self.domain);
|
||||||
Ok(crate::lasuite::PeopleClient::from_parts(url, AuthMethod::Bearer(token)))
|
Ok(crate::lasuite::PeopleClient::from_parts(url, AuthMethod::DynamicBearer))
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "lasuite")]
|
#[cfg(feature = "lasuite")]
|
||||||
pub async fn docs(&self) -> Result<&crate::lasuite::DocsClient> {
|
pub async fn docs(&self) -> Result<&crate::lasuite::DocsClient> {
|
||||||
|
self.sso_token().await?;
|
||||||
self.docs.get_or_try_init(|| async {
|
self.docs.get_or_try_init(|| async {
|
||||||
let token = self.sso_token().await?;
|
|
||||||
let url = format!("https://docs.{}/external_api/v1.0", self.domain);
|
let url = format!("https://docs.{}/external_api/v1.0", self.domain);
|
||||||
Ok(crate::lasuite::DocsClient::from_parts(url, AuthMethod::Bearer(token)))
|
Ok(crate::lasuite::DocsClient::from_parts(url, AuthMethod::DynamicBearer))
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "lasuite")]
|
#[cfg(feature = "lasuite")]
|
||||||
pub async fn meet(&self) -> Result<&crate::lasuite::MeetClient> {
|
pub async fn meet(&self) -> Result<&crate::lasuite::MeetClient> {
|
||||||
|
self.sso_token().await?;
|
||||||
self.meet.get_or_try_init(|| async {
|
self.meet.get_or_try_init(|| async {
|
||||||
let token = self.sso_token().await?;
|
|
||||||
let url = format!("https://meet.{}/external_api/v1.0", self.domain);
|
let url = format!("https://meet.{}/external_api/v1.0", self.domain);
|
||||||
Ok(crate::lasuite::MeetClient::from_parts(url, AuthMethod::Bearer(token)))
|
Ok(crate::lasuite::MeetClient::from_parts(url, AuthMethod::DynamicBearer))
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "lasuite")]
|
#[cfg(feature = "lasuite")]
|
||||||
pub async fn drive(&self) -> Result<&crate::lasuite::DriveClient> {
|
pub async fn drive(&self) -> Result<&crate::lasuite::DriveClient> {
|
||||||
|
self.sso_token().await?;
|
||||||
self.drive.get_or_try_init(|| async {
|
self.drive.get_or_try_init(|| async {
|
||||||
let token = self.sso_token().await?;
|
|
||||||
let url = format!("https://drive.{}/external_api/v1.0", self.domain);
|
let url = format!("https://drive.{}/external_api/v1.0", self.domain);
|
||||||
Ok(crate::lasuite::DriveClient::from_parts(url, AuthMethod::Bearer(token)))
|
Ok(crate::lasuite::DriveClient::from_parts(url, AuthMethod::DynamicBearer))
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "lasuite")]
|
#[cfg(feature = "lasuite")]
|
||||||
pub async fn messages(&self) -> Result<&crate::lasuite::MessagesClient> {
|
pub async fn messages(&self) -> Result<&crate::lasuite::MessagesClient> {
|
||||||
|
self.sso_token().await?;
|
||||||
self.messages.get_or_try_init(|| async {
|
self.messages.get_or_try_init(|| async {
|
||||||
let token = self.sso_token().await?;
|
|
||||||
let url = format!("https://mail.{}/external_api/v1.0", self.domain);
|
let url = format!("https://mail.{}/external_api/v1.0", self.domain);
|
||||||
Ok(crate::lasuite::MessagesClient::from_parts(url, AuthMethod::Bearer(token)))
|
Ok(crate::lasuite::MessagesClient::from_parts(url, AuthMethod::DynamicBearer))
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "lasuite")]
|
#[cfg(feature = "lasuite")]
|
||||||
pub async fn calendars(&self) -> Result<&crate::lasuite::CalendarsClient> {
|
pub async fn calendars(&self) -> Result<&crate::lasuite::CalendarsClient> {
|
||||||
|
self.sso_token().await?;
|
||||||
self.calendars.get_or_try_init(|| async {
|
self.calendars.get_or_try_init(|| async {
|
||||||
let token = self.sso_token().await?;
|
|
||||||
let url = format!("https://calendar.{}/external_api/v1.0", self.domain);
|
let url = format!("https://calendar.{}/external_api/v1.0", self.domain);
|
||||||
Ok(crate::lasuite::CalendarsClient::from_parts(url, AuthMethod::Bearer(token)))
|
Ok(crate::lasuite::CalendarsClient::from_parts(url, AuthMethod::DynamicBearer))
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "lasuite")]
|
#[cfg(feature = "lasuite")]
|
||||||
pub async fn find(&self) -> Result<&crate::lasuite::FindClient> {
|
pub async fn find(&self) -> Result<&crate::lasuite::FindClient> {
|
||||||
|
self.sso_token().await?;
|
||||||
self.find.get_or_try_init(|| async {
|
self.find.get_or_try_init(|| async {
|
||||||
let token = self.sso_token().await?;
|
|
||||||
let url = format!("https://find.{}/external_api/v1.0", self.domain);
|
let url = format!("https://find.{}/external_api/v1.0", self.domain);
|
||||||
Ok(crate::lasuite::FindClient::from_parts(url, AuthMethod::Bearer(token)))
|
Ok(crate::lasuite::FindClient::from_parts(url, AuthMethod::DynamicBearer))
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -559,7 +559,7 @@ pub enum DriveCommand {
|
|||||||
#[arg(short = 't', long)]
|
#[arg(short = 't', long)]
|
||||||
folder_id: String,
|
folder_id: String,
|
||||||
/// Number of concurrent uploads.
|
/// Number of concurrent uploads.
|
||||||
#[arg(long, default_value = "8")]
|
#[arg(long, default_value = "3")]
|
||||||
parallel: usize,
|
parallel: usize,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -1008,6 +1008,7 @@ async fn collect_upload_jobs(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Upload a single file to Drive, updating the progress bar.
|
/// Upload a single file to Drive, updating the progress bar.
|
||||||
|
/// Retries on 429/500/502/503 up to 5 times with exponential backoff.
|
||||||
async fn upload_single_file_with_progress(
|
async fn upload_single_file_with_progress(
|
||||||
drive: &super::DriveClient,
|
drive: &super::DriveClient,
|
||||||
job: &UploadJob,
|
job: &UploadJob,
|
||||||
@@ -1019,17 +1020,13 @@ async fn upload_single_file_with_progress(
|
|||||||
.and_then(|n| n.to_str())
|
.and_then(|n| n.to_str())
|
||||||
.unwrap_or("unnamed");
|
.unwrap_or("unnamed");
|
||||||
|
|
||||||
// Create the file item in Drive
|
// Create the file item in Drive (with retry)
|
||||||
let item = drive
|
let body = serde_json::json!({
|
||||||
.create_child(
|
|
||||||
&job.parent_id,
|
|
||||||
&serde_json::json!({
|
|
||||||
"title": filename,
|
"title": filename,
|
||||||
"filename": filename,
|
"filename": filename,
|
||||||
"type": "file",
|
"type": "file",
|
||||||
}),
|
});
|
||||||
)
|
let item = retry_drive_call(|| drive.create_child(&job.parent_id, &body), 5).await?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
let item_id = item["id"]
|
let item_id = item["id"]
|
||||||
.as_str()
|
.as_str()
|
||||||
@@ -1054,12 +1051,48 @@ async fn upload_single_file_with_progress(
|
|||||||
.await?;
|
.await?;
|
||||||
pb.set_position(len);
|
pb.set_position(len);
|
||||||
|
|
||||||
// Notify Drive the upload is complete
|
// Notify Drive the upload is complete (with retry)
|
||||||
drive.upload_ended(item_id).await?;
|
retry_drive_call(|| drive.upload_ended(item_id), 5).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Retry a Drive API call on 429/500/502/503 with exponential backoff.
|
||||||
|
async fn retry_drive_call<F, Fut, T>(f: F, max_retries: u32) -> Result<T>
|
||||||
|
where
|
||||||
|
F: Fn() -> Fut,
|
||||||
|
Fut: std::future::Future<Output = Result<T>>,
|
||||||
|
{
|
||||||
|
let mut last_err = None;
|
||||||
|
for attempt in 0..=max_retries {
|
||||||
|
match f().await {
|
||||||
|
Ok(v) => return Ok(v),
|
||||||
|
Err(e) => {
|
||||||
|
let msg = e.to_string();
|
||||||
|
let retryable = msg.contains("429")
|
||||||
|
|| msg.contains("500")
|
||||||
|
|| msg.contains("502")
|
||||||
|
|| msg.contains("503")
|
||||||
|
|| msg.contains("request failed");
|
||||||
|
if retryable && attempt < max_retries {
|
||||||
|
// On 500, try refreshing the SSO token (may have expired)
|
||||||
|
if msg.contains("500") {
|
||||||
|
let _ = crate::auth::get_token().await;
|
||||||
|
}
|
||||||
|
let delay = std::time::Duration::from_millis(
|
||||||
|
500 * 2u64.pow(attempt.min(4)),
|
||||||
|
);
|
||||||
|
tokio::time::sleep(delay).await;
|
||||||
|
last_err = Some(e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(last_err.unwrap())
|
||||||
|
}
|
||||||
|
|
||||||
// ═══════════════════════════════════════════════════════════════════════════
|
// ═══════════════════════════════════════════════════════════════════════════
|
||||||
// Mail (Messages)
|
// Mail (Messages)
|
||||||
// ═══════════════════════════════════════════════════════════════════════════
|
// ═══════════════════════════════════════════════════════════════════════════
|
||||||
|
|||||||
Reference in New Issue
Block a user