5 Commits

Author SHA1 Message Date
4d9659a8bb chore: bump to v1.1.1, update CHANGELOG 2026-03-24 15:29:05 +00:00
cd80a57a40 fix: DynamicBearer auth, retry on 500/429, upload resilience
- DynamicBearer AuthMethod: La Suite clients resolve tokens fresh
  per-request from cache file, surviving token expiry mid-session
- Retry with exponential backoff on all Drive API calls (create_child,
  upload_ended) — up to 5 retries on 429/500/502/503
- Token refresh triggered on 500 before retry (handles expired SSO)
- S3 upload retry with backoff (up to 3 retries on 502/503)
- Connection pooling: reuse DriveClient HTTP client for S3 PUTs
- Folder/file dedup: skip existing items on re-upload
2026-03-24 15:25:01 +00:00
de5c807374 fix: progress bar tracks files not bytes, retry on 502, dedup folders
- Overall bar progress based on file count (was bytes, causing 50%
  bar at low file count when large files uploaded first)
- Bandwidth computed manually from completed bytes / elapsed time
- Per-file bars show spinner + name only (no misleading 0 B counter)
- S3 upload retries up to 3x on 502/503 with backoff
- Folder dedup: list_children before create, reuse existing folders
- File dedup: skip files already present in target folder
- Connection pooling: reuse DriveClient's HTTP client for S3 PUTs
- Default parallel back to 8 (retries handle transient 502s)
2026-03-24 14:55:03 +00:00
2ab2fd5b8f fix: polish Drive upload progress UI
- Inline folder creation status (no scroll)
- Overall bar shows file count + bytes + speed + ETA
- Per-file spinners: ⬆ uploading, ✓ done, ✗ failed
- Bars pop in/out dynamically as uploads start/finish
- Error count in summary line
- Default parallel reduced to 4 (proxy can't handle 8)
2026-03-24 13:36:17 +00:00
27536b4695 feat: parallel Drive upload with indicatif progress UI
- Parallel file uploads with --parallel flag (default 4)
- indicatif MultiProgress: overall bar with file count, speed, ETA
- Per-file spinner bars showing filename during upload
- Phase 1: walk tree + create folders sequentially
- Phase 2: upload files concurrently via semaphore
- Summary line on completion (files, bytes, time, speed)
- Fixed DriveFile/DriveFolder types to match actual API fields
- DriveClient now Clone for Arc sharing across tasks
2026-03-24 13:26:16 +00:00
9 changed files with 468 additions and 93 deletions

View File

@@ -1,5 +1,12 @@
# Changelog
## v1.1.1
- cd80a57 fix: DynamicBearer auth, retry on 500/429, upload resilience
- de5c807 fix: progress bar tracks files not bytes, retry on 502, dedup folders
- 2ab2fd5 fix: polish Drive upload progress UI
- 27536b4 feat: parallel Drive upload with indicatif progress UI
## v1.1.0
- 477006e chore: bump to v1.1.0, update package description

56
Cargo.lock generated
View File

@@ -532,6 +532,19 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "console"
version = "0.15.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8"
dependencies = [
"encode_unicode",
"libc",
"once_cell",
"unicode-width",
"windows-sys 0.59.0",
]
[[package]]
name = "const-oid"
version = "0.9.6"
@@ -936,6 +949,12 @@ version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449"
[[package]]
name = "encode_unicode"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0"
[[package]]
name = "enum-ordinalize"
version = "4.3.2"
@@ -1672,6 +1691,20 @@ dependencies = [
"serde_core",
]
[[package]]
name = "indicatif"
version = "0.17.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235"
dependencies = [
"console",
"number_prefix",
"portable-atomic",
"tokio",
"unicode-width",
"web-time",
]
[[package]]
name = "inout"
version = "0.1.4"
@@ -2145,6 +2178,12 @@ dependencies = [
"libc",
]
[[package]]
name = "number_prefix"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "object"
version = "0.37.3"
@@ -2478,6 +2517,12 @@ dependencies = [
"universal-hash",
]
[[package]]
name = "portable-atomic"
version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49"
[[package]]
name = "potential_utf"
version = "0.1.4"
@@ -3501,7 +3546,7 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "sunbeam"
version = "1.1.0"
version = "1.1.1"
dependencies = [
"chrono",
"clap",
@@ -3514,7 +3559,7 @@ dependencies = [
[[package]]
name = "sunbeam-sdk"
version = "1.1.0"
version = "1.1.1"
dependencies = [
"aes-gcm",
"argon2",
@@ -3526,6 +3571,7 @@ dependencies = [
"flate2",
"futures",
"hmac",
"indicatif",
"k8s-openapi",
"kube",
"lettre",
@@ -3948,6 +3994,12 @@ version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75"
[[package]]
name = "unicode-width"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254"
[[package]]
name = "unicode-xid"
version = "0.2.6"

View File

@@ -1,6 +1,6 @@
[package]
name = "sunbeam-sdk"
version = "1.1.0"
version = "1.1.1"
edition = "2024"
description = "Sunbeam Studios SDK, CLI, and ecosystem integrations"
repository = "https://src.sunbeam.pt/studio/cli"
@@ -55,6 +55,7 @@ base64 = "0.22"
rand = "0.8"
aes-gcm = "0.10"
argon2 = "0.5"
indicatif = { version = "0.17", features = ["tokio"] }
# Certificate generation
rcgen = "0.14"

View File

@@ -674,6 +674,16 @@ pub fn get_gitea_token() -> Result<String> {
})
}
/// Get cached SSO access token synchronously (reads from cache file).
/// If the token was recently refreshed by the async `get_token()`, this
/// returns the fresh one. Used by DynamicBearer for per-request auth.
pub fn get_token_sync() -> Result<String> {
let cached = read_cache().map_err(|_| {
SunbeamError::identity("Not logged in. Run `sunbeam auth login` first.")
})?;
Ok(cached.access_token)
}
/// Get cached OIDC id_token (JWT).
pub fn get_id_token() -> Result<String> {
let tokens = read_cache().map_err(|_| {

View File

@@ -20,6 +20,8 @@ pub enum AuthMethod {
None,
/// Bearer token (`Authorization: Bearer <token>`).
Bearer(String),
/// Dynamic bearer — resolves token fresh on each request (survives expiry).
DynamicBearer,
/// Custom header (e.g. `X-Vault-Token`).
Header { name: &'static str, value: String },
/// Gitea-style PAT (`Authorization: token <pat>`).
@@ -84,6 +86,12 @@ impl HttpTransport {
AuthMethod::Bearer(token) => {
req = req.bearer_auth(token);
}
AuthMethod::DynamicBearer => {
// Resolve token fresh on each request — survives token expiry/refresh.
if let Ok(token) = crate::auth::get_token_sync() {
req = req.bearer_auth(token);
}
}
AuthMethod::Header { name, value } => {
req = req.header(*name, value);
}
@@ -427,64 +435,65 @@ impl SunbeamClient {
#[cfg(feature = "lasuite")]
pub async fn people(&self) -> Result<&crate::lasuite::PeopleClient> {
// Ensure we have a valid token (triggers refresh if expired).
self.sso_token().await?;
self.people.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://people.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::PeopleClient::from_parts(url, AuthMethod::Bearer(token)))
Ok(crate::lasuite::PeopleClient::from_parts(url, AuthMethod::DynamicBearer))
}).await
}
#[cfg(feature = "lasuite")]
pub async fn docs(&self) -> Result<&crate::lasuite::DocsClient> {
self.sso_token().await?;
self.docs.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://docs.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::DocsClient::from_parts(url, AuthMethod::Bearer(token)))
Ok(crate::lasuite::DocsClient::from_parts(url, AuthMethod::DynamicBearer))
}).await
}
#[cfg(feature = "lasuite")]
pub async fn meet(&self) -> Result<&crate::lasuite::MeetClient> {
self.sso_token().await?;
self.meet.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://meet.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::MeetClient::from_parts(url, AuthMethod::Bearer(token)))
Ok(crate::lasuite::MeetClient::from_parts(url, AuthMethod::DynamicBearer))
}).await
}
#[cfg(feature = "lasuite")]
pub async fn drive(&self) -> Result<&crate::lasuite::DriveClient> {
self.sso_token().await?;
self.drive.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://drive.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::DriveClient::from_parts(url, AuthMethod::Bearer(token)))
Ok(crate::lasuite::DriveClient::from_parts(url, AuthMethod::DynamicBearer))
}).await
}
#[cfg(feature = "lasuite")]
pub async fn messages(&self) -> Result<&crate::lasuite::MessagesClient> {
self.sso_token().await?;
self.messages.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://mail.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::MessagesClient::from_parts(url, AuthMethod::Bearer(token)))
Ok(crate::lasuite::MessagesClient::from_parts(url, AuthMethod::DynamicBearer))
}).await
}
#[cfg(feature = "lasuite")]
pub async fn calendars(&self) -> Result<&crate::lasuite::CalendarsClient> {
self.sso_token().await?;
self.calendars.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://calendar.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::CalendarsClient::from_parts(url, AuthMethod::Bearer(token)))
Ok(crate::lasuite::CalendarsClient::from_parts(url, AuthMethod::DynamicBearer))
}).await
}
#[cfg(feature = "lasuite")]
pub async fn find(&self) -> Result<&crate::lasuite::FindClient> {
self.sso_token().await?;
self.find.get_or_try_init(|| async {
let token = self.sso_token().await?;
let url = format!("https://find.{}/external_api/v1.0", self.domain);
Ok(crate::lasuite::FindClient::from_parts(url, AuthMethod::Bearer(token)))
Ok(crate::lasuite::FindClient::from_parts(url, AuthMethod::DynamicBearer))
}).await
}

View File

@@ -558,6 +558,9 @@ pub enum DriveCommand {
/// Target Drive folder ID.
#[arg(short = 't', long)]
folder_id: String,
/// Number of concurrent uploads.
#[arg(long, default_value = "3")]
parallel: usize,
},
}
@@ -623,13 +626,14 @@ pub async fn dispatch_drive(
let page_data = drive.list_files(page).await?;
output::render_list(
&page_data.results,
&["ID", "NAME", "SIZE", "MIME_TYPE"],
&["ID", "TITLE", "TYPE", "SIZE", "MIMETYPE"],
|f| {
vec![
f.id.clone(),
f.name.clone().unwrap_or_default(),
f.title.clone().unwrap_or_default(),
f.item_type.clone().unwrap_or_default(),
f.size.map_or("-".into(), |s| s.to_string()),
f.mime_type.clone().unwrap_or_default(),
f.mimetype.clone().unwrap_or_default(),
]
},
fmt,
@@ -655,12 +659,13 @@ pub async fn dispatch_drive(
let page_data = drive.list_folders(page).await?;
output::render_list(
&page_data.results,
&["ID", "NAME", "PARENT_ID"],
&["ID", "TITLE", "CHILDREN", "CREATED"],
|f| {
vec![
f.id.clone(),
f.name.clone().unwrap_or_default(),
f.parent_id.clone().unwrap_or_default(),
f.title.clone().unwrap_or_default(),
f.numchild.map_or("-".into(), |n| n.to_string()),
f.created_at.clone().unwrap_or_default(),
]
},
fmt,
@@ -696,18 +701,31 @@ pub async fn dispatch_drive(
)
}
},
DriveCommand::Upload { path, folder_id } => {
upload_recursive(drive, &path, &folder_id).await
DriveCommand::Upload { path, folder_id, parallel } => {
upload_recursive(drive, &path, &folder_id, parallel).await
}
}
}
/// A file that needs uploading, collected during the directory-walk phase.
struct UploadJob {
local_path: std::path::PathBuf,
parent_id: String,
file_size: u64,
relative_path: String,
}
/// Recursively upload a local file or directory to a Drive folder.
async fn upload_recursive(
drive: &super::DriveClient,
local_path: &str,
parent_id: &str,
parallel: usize,
) -> Result<()> {
use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressStyle};
use std::sync::Arc;
use tokio::sync::Semaphore;
let path = std::path::Path::new(local_path);
if !path.exists() {
return Err(crate::error::SunbeamError::Other(format!(
@@ -715,45 +733,232 @@ async fn upload_recursive(
)));
}
// Phase 1 — Walk and collect: create folders sequentially, gather file jobs.
let mut jobs = Vec::new();
if path.is_file() {
upload_single_file(drive, path, parent_id).await
let file_size = std::fs::metadata(path)
.map_err(|e| crate::error::SunbeamError::Other(format!("stat: {e}")))?
.len();
let filename = path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unnamed");
if !filename.starts_with('.') {
jobs.push(UploadJob {
local_path: path.to_path_buf(),
parent_id: parent_id.to_string(),
file_size,
relative_path: filename.to_string(),
});
}
} else if path.is_dir() {
upload_directory(drive, path, parent_id).await
collect_upload_jobs(drive, path, parent_id, "", &mut jobs).await?;
} else {
Err(crate::error::SunbeamError::Other(format!(
return Err(crate::error::SunbeamError::Other(format!(
"Not a file or directory: {local_path}"
)))
)));
}
if jobs.is_empty() {
output::ok("Nothing to upload.");
return Ok(());
}
let total_files = jobs.len() as u64;
let total_bytes: u64 = jobs.iter().map(|j| j.file_size).sum();
// Clear the folder creation line
eprint!("\r\x1b[K");
// Phase 2 — Parallel upload with progress bars.
let multi = MultiProgress::new();
// Overall bar tracks file count. Bandwidth is computed manually in the message.
let overall_style = ProgressStyle::with_template(
" {spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} files {msg}",
)
.unwrap()
.progress_chars("█▓░");
let overall = multi.add(ProgressBar::new(total_files));
overall.set_style(overall_style);
overall.enable_steady_tick(std::time::Duration::from_millis(100));
let completed_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let file_style = ProgressStyle::with_template(
" {spinner:.cyan} {wide_msg}",
)
.unwrap();
let sem = Arc::new(Semaphore::new(parallel));
let drive = Arc::new(drive.clone());
let mut handles = Vec::new();
let start = std::time::Instant::now();
for job in jobs {
let permit = sem.clone().acquire_owned().await.unwrap();
let drive = Arc::clone(&drive);
let multi = multi.clone();
let overall = overall.clone();
let file_style = file_style.clone();
let job_size = job.file_size;
let completed_bytes = Arc::clone(&completed_bytes);
let total_bytes = total_bytes;
let start = start.clone();
let handle = tokio::spawn(async move {
let pb = multi.add(ProgressBar::new_spinner());
pb.set_style(file_style);
pb.set_message(job.relative_path.clone());
pb.enable_steady_tick(std::time::Duration::from_millis(80));
let result = upload_single_file_with_progress(&drive, &job, &pb).await;
pb.finish_and_clear();
multi.remove(&pb);
// Update overall — increment file count, compute bandwidth from bytes
overall.inc(1);
let done_bytes = completed_bytes.fetch_add(job_size, std::sync::atomic::Ordering::Relaxed) + job_size;
let elapsed = start.elapsed().as_secs_f64();
let speed = if elapsed > 1.0 { done_bytes as f64 / elapsed } else { 0.0 };
let remaining = total_bytes.saturating_sub(done_bytes);
let eta = if speed > 0.0 { remaining as f64 / speed } else { 0.0 };
let eta_m = eta as u64 / 60;
let eta_s = eta as u64 % 60;
overall.set_message(format!(
"{}/{} {}/s ETA: {}m {:02}s",
indicatif::HumanBytes(done_bytes),
indicatif::HumanBytes(total_bytes),
indicatif::HumanBytes(speed as u64),
eta_m, eta_s,
));
drop(permit);
result
});
handles.push(handle);
}
let mut errors = 0u64;
for handle in handles {
match handle.await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
errors += 1;
multi.suspend(|| eprintln!(" ERROR: {e}"));
}
Err(e) => {
errors += 1;
multi.suspend(|| eprintln!(" ERROR: task panic: {e}"));
}
}
}
overall.finish_and_clear();
multi.clear().ok();
let elapsed = start.elapsed();
let secs = elapsed.as_secs_f64();
let speed = if secs > 0.0 {
total_bytes as f64 / secs
} else {
0.0
};
let mins = elapsed.as_secs() / 60;
let secs_rem = elapsed.as_secs() % 60;
let uploaded = total_files - errors;
if errors > 0 {
println!(
"✓ Uploaded {uploaded}/{total_files} files ({}) in {mins}m {secs_rem}s ({}/s) — {errors} failed",
HumanBytes(total_bytes),
HumanBytes(speed as u64),
);
} else {
println!(
"✓ Uploaded {total_files} files ({}) in {mins}m {secs_rem}s ({}/s)",
HumanBytes(total_bytes),
HumanBytes(speed as u64),
);
}
Ok(())
}
async fn upload_directory(
/// Phase 1: Walk a directory recursively, create folders in Drive sequentially,
/// and collect [`UploadJob`]s for every regular file.
async fn collect_upload_jobs(
drive: &super::DriveClient,
dir: &std::path::Path,
parent_id: &str,
prefix: &str,
jobs: &mut Vec<UploadJob>,
) -> Result<()> {
let dir_name = dir
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unnamed");
output::step(&format!("Creating folder: {dir_name}"));
// Skip hidden directories
if dir_name.starts_with('.') {
return Ok(());
}
// Create the folder in Drive
let folder = drive
.create_child(
parent_id,
&serde_json::json!({
"title": dir_name,
"type": "folder",
}),
)
.await?;
// Build the display prefix for children
let display_prefix = if prefix.is_empty() {
dir_name.to_string()
} else {
format!("{prefix}/{dir_name}")
};
let folder_id = folder["id"]
.as_str()
.ok_or_else(|| crate::error::SunbeamError::Other("No folder ID in response".into()))?;
eprint!("\r\x1b[K Scanning: {display_prefix} ");
// Check if folder already exists under the parent.
let existing = drive.list_children(parent_id, None).await.ok();
let existing_folder_id = existing.and_then(|page| {
page.results.iter().find_map(|item| {
let is_folder = item.get("type").and_then(|v| v.as_str()) == Some("folder");
let title_matches = item.get("title").and_then(|v| v.as_str()) == Some(dir_name);
if is_folder && title_matches {
item.get("id").and_then(|v| v.as_str()).map(String::from)
} else {
None
}
})
});
let folder_id = if let Some(id) = existing_folder_id {
id
} else {
let folder = drive
.create_child(
parent_id,
&serde_json::json!({
"title": dir_name,
"type": "folder",
}),
)
.await?;
folder["id"]
.as_str()
.ok_or_else(|| crate::error::SunbeamError::Other("No folder ID in response".into()))?
.to_string()
};
// Build a set of existing file titles in this folder to skip duplicates.
let existing_file_titles: std::collections::HashSet<String> = {
let mut titles = std::collections::HashSet::new();
if let Ok(page) = drive.list_children(&folder_id, None).await {
for item in &page.results {
if item.get("type").and_then(|v| v.as_str()) == Some("file") {
if let Some(title) = item.get("title").and_then(|v| v.as_str()) {
titles.insert(title.to_string());
}
}
}
}
titles
};
// Process entries
let mut entries: Vec<_> = std::fs::read_dir(dir)
.map_err(|e| crate::error::SunbeamError::Other(format!("reading dir: {e}")))?
.filter_map(|e| e.ok())
@@ -762,66 +967,132 @@ async fn upload_directory(
for entry in entries {
let entry_path = entry.path();
let name = entry
.file_name()
.to_str()
.unwrap_or_default()
.to_string();
// Skip hidden entries
if name.starts_with('.') {
continue;
}
if entry_path.is_dir() {
Box::pin(upload_directory(drive, &entry_path, folder_id)).await?;
Box::pin(collect_upload_jobs(
drive,
&entry_path,
&folder_id,
&display_prefix,
jobs,
))
.await?;
} else if entry_path.is_file() {
upload_single_file(drive, &entry_path, folder_id).await?;
// Skip if a file with this title already exists in the folder.
if existing_file_titles.contains(&name) {
continue;
}
let file_size = std::fs::metadata(&entry_path)
.map_err(|e| crate::error::SunbeamError::Other(format!("stat: {e}")))?
.len();
jobs.push(UploadJob {
local_path: entry_path,
parent_id: folder_id.clone(),
file_size,
relative_path: format!("{display_prefix}/{name}"),
});
}
}
Ok(())
}
async fn upload_single_file(
/// Upload a single file to Drive, updating the progress bar.
/// Retries on 429/500/502/503 up to 5 times with exponential backoff.
async fn upload_single_file_with_progress(
drive: &super::DriveClient,
file_path: &std::path::Path,
parent_id: &str,
job: &UploadJob,
pb: &indicatif::ProgressBar,
) -> Result<()> {
let filename = file_path
let filename = job
.local_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unnamed");
// Skip hidden files
if filename.starts_with('.') {
return Ok(());
}
output::ok(&format!("Uploading: {filename}"));
// Create the file item in Drive
let item = drive
.create_child(
parent_id,
&serde_json::json!({
"title": filename,
"type": "file",
}),
)
.await?;
// Create the file item in Drive (with retry)
let body = serde_json::json!({
"title": filename,
"filename": filename,
"type": "file",
});
let item = retry_drive_call(|| drive.create_child(&job.parent_id, &body), 5).await?;
let item_id = item["id"]
.as_str()
.ok_or_else(|| crate::error::SunbeamError::Other("No item ID in response".into()))?;
// Get the presigned upload URL (Drive returns it as "policy" on create)
let upload_url = item["policy"]
.as_str()
.ok_or_else(|| crate::error::SunbeamError::Other("No upload policy URL in response — is the item a file?".into()))?;
.ok_or_else(|| {
crate::error::SunbeamError::Other(
"No upload policy URL in response \u{2014} is the item a file?".into(),
)
})?;
tracing::debug!("S3 presigned URL: {upload_url}");
// Read the file and upload to S3
let data = std::fs::read(file_path)
let data = std::fs::read(&job.local_path)
.map_err(|e| crate::error::SunbeamError::Other(format!("reading file: {e}")))?;
let len = data.len() as u64;
drive
.upload_to_s3(upload_url, bytes::Bytes::from(data))
.await?;
pb.set_position(len);
// Notify Drive the upload is complete
drive.upload_ended(item_id).await?;
// Notify Drive the upload is complete (with retry)
retry_drive_call(|| drive.upload_ended(item_id), 5).await?;
Ok(())
}
/// Retry a Drive API call on 429/500/502/503 with exponential backoff.
async fn retry_drive_call<F, Fut, T>(f: F, max_retries: u32) -> Result<T>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T>>,
{
let mut last_err = None;
for attempt in 0..=max_retries {
match f().await {
Ok(v) => return Ok(v),
Err(e) => {
let msg = e.to_string();
let retryable = msg.contains("429")
|| msg.contains("500")
|| msg.contains("502")
|| msg.contains("503")
|| msg.contains("request failed");
if retryable && attempt < max_retries {
// On 500, try refreshing the SSO token (may have expired)
if msg.contains("500") {
let _ = crate::auth::get_token().await;
}
let delay = std::time::Duration::from_millis(
500 * 2u64.pow(attempt.min(4)),
);
tokio::time::sleep(delay).await;
last_err = Some(e);
continue;
}
return Err(e);
}
}
}
Err(last_err.unwrap())
}
// ═══════════════════════════════════════════════════════════════════════════
// Mail (Messages)
// ═══════════════════════════════════════════════════════════════════════════

View File

@@ -6,6 +6,7 @@ use reqwest::Method;
use super::types::*;
/// Client for the La Suite Drive API.
#[derive(Clone)]
pub struct DriveClient {
pub(crate) transport: HttpTransport,
}
@@ -160,21 +161,39 @@ impl DriveClient {
}
/// Upload file bytes directly to a presigned S3 URL.
/// The presigned URL's SigV4 signature covers host + x-amz-acl headers.
/// Retries up to 3 times on 502/503/connection errors.
pub async fn upload_to_s3(&self, presigned_url: &str, data: bytes::Bytes) -> Result<()> {
let resp = reqwest::Client::new()
.put(presigned_url)
.header("Content-Type", "application/octet-stream")
.body(data)
.send()
.await
.map_err(|e| crate::error::SunbeamError::network(format!("S3 upload: {e}")))?;
let max_retries = 3;
for attempt in 0..=max_retries {
let resp = self.transport.http
.put(presigned_url)
.header("x-amz-acl", "private")
.body(data.clone())
.send()
.await;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
return Err(crate::error::SunbeamError::network(format!(
"S3 upload: HTTP {status}: {body}"
)));
match resp {
Ok(r) if r.status().is_success() => return Ok(()),
Ok(r) if (r.status() == 502 || r.status() == 503) && attempt < max_retries => {
tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt as u64 + 1))).await;
continue;
}
Ok(r) => {
let status = r.status();
let body = r.text().await.unwrap_or_default();
return Err(crate::error::SunbeamError::network(format!(
"S3 upload: HTTP {status}: {body}"
)));
}
Err(_) if attempt < max_retries => {
tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt as u64 + 1))).await;
continue;
}
Err(e) => {
return Err(crate::error::SunbeamError::network(format!("S3 upload: {e}")));
}
}
}
Ok(())
}

View File

@@ -219,13 +219,17 @@ pub struct DriveFile {
#[serde(default)]
pub id: String,
#[serde(default)]
pub name: Option<String>,
pub title: Option<String>,
#[serde(default)]
pub filename: Option<String>,
#[serde(default, rename = "type")]
pub item_type: Option<String>,
#[serde(default)]
pub size: Option<u64>,
#[serde(default)]
pub mime_type: Option<String>,
pub mimetype: Option<String>,
#[serde(default)]
pub folder_id: Option<String>,
pub upload_state: Option<String>,
#[serde(default)]
pub url: Option<String>,
#[serde(default)]
@@ -234,15 +238,17 @@ pub struct DriveFile {
pub updated_at: Option<String>,
}
/// A folder in the Drive service.
/// A folder in the Drive service (same API, type=folder).
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DriveFolder {
#[serde(default)]
pub id: String,
#[serde(default)]
pub name: Option<String>,
pub title: Option<String>,
#[serde(default, rename = "type")]
pub item_type: Option<String>,
#[serde(default)]
pub parent_id: Option<String>,
pub numchild: Option<u32>,
#[serde(default)]
pub created_at: Option<String>,
#[serde(default)]

View File

@@ -1,6 +1,6 @@
[package]
name = "sunbeam"
version = "1.1.0"
version = "1.1.1"
edition = "2024"
description = "Sunbeam Studios SDK, CLI, and ecosystem integrations"