From de5c807374847461fbc55a98e1371485467f328a Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Tue, 24 Mar 2026 14:55:03 +0000 Subject: [PATCH] fix: progress bar tracks files not bytes, retry on 502, dedup folders - Overall bar progress based on file count (was bytes, causing 50% bar at low file count when large files uploaded first) - Bandwidth computed manually from completed bytes / elapsed time - Per-file bars show spinner + name only (no misleading 0 B counter) - S3 upload retries up to 3x on 502/503 with backoff - Folder dedup: list_children before create, reuse existing folders - File dedup: skip files already present in target folder - Connection pooling: reuse DriveClient's HTTP client for S3 PUTs - Default parallel back to 8 (retries handle transient 502s) --- sunbeam-sdk/src/lasuite/cli.rs | 144 ++++++++++++++++--------------- sunbeam-sdk/src/lasuite/drive.rs | 43 ++++++--- 2 files changed, 106 insertions(+), 81 deletions(-) diff --git a/sunbeam-sdk/src/lasuite/cli.rs b/sunbeam-sdk/src/lasuite/cli.rs index cdd8921..54667bc 100644 --- a/sunbeam-sdk/src/lasuite/cli.rs +++ b/sunbeam-sdk/src/lasuite/cli.rs @@ -559,7 +559,7 @@ pub enum DriveCommand { #[arg(short = 't', long)] folder_id: String, /// Number of concurrent uploads. - #[arg(long, default_value = "4")] + #[arg(long, default_value = "8")] parallel: usize, }, } @@ -773,19 +773,16 @@ async fn upload_recursive( // Phase 2 — Parallel upload with progress bars. let multi = MultiProgress::new(); - // Overall bar tracks bytes (not file count) for accurate speed + ETA. + // Overall bar tracks file count. Bandwidth is computed manually in the message. 
let overall_style = ProgressStyle::with_template( " {spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} files {msg}", ) .unwrap() - .progress_chars("━╸─"); + .progress_chars("█▓░"); let overall = multi.add(ProgressBar::new(total_files)); overall.set_style(overall_style); overall.enable_steady_tick(std::time::Duration::from_millis(100)); - - // Byte-tracking bar (hidden) — drives speed + ETA calculation. - let bytes_bar = multi.add(ProgressBar::hidden()); - bytes_bar.set_length(total_bytes); + let completed_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)); let file_style = ProgressStyle::with_template( " {spinner:.cyan} {wide_msg}", @@ -796,70 +793,45 @@ async fn upload_recursive( let drive = Arc::new(drive.clone()); let mut handles = Vec::new(); let start = std::time::Instant::now(); - let completed_bytes = Arc::new(std::sync::atomic::AtomicU64::new(0)); - let completed_files = Arc::new(std::sync::atomic::AtomicU64::new(0)); - - // Spawn a task to update the overall bar message with speed + ETA. 
- { - let overall = overall.clone(); - let completed_bytes = Arc::clone(&completed_bytes); - let completed_files = Arc::clone(&completed_files); - let total_bytes = total_bytes; - let total_files = total_files; - tokio::spawn(async move { - loop { - tokio::time::sleep(std::time::Duration::from_millis(250)).await; - let done_bytes = completed_bytes.load(std::sync::atomic::Ordering::Relaxed); - let done_files = completed_files.load(std::sync::atomic::Ordering::Relaxed); - let elapsed = start.elapsed().as_secs_f64(); - let speed = if elapsed > 0.5 { done_bytes as f64 / elapsed } else { 0.0 }; - let remaining_bytes = total_bytes.saturating_sub(done_bytes); - let eta_secs = if speed > 0.0 { remaining_bytes as f64 / speed } else { 0.0 }; - let eta_min = eta_secs as u64 / 60; - let eta_sec = eta_secs as u64 % 60; - overall.set_message(format!( - "{}/{} {}/s ETA: {eta_min}m {eta_sec:02}s", - HumanBytes(done_bytes), - HumanBytes(total_bytes), - HumanBytes(speed as u64), - )); - overall.set_position(done_files); - if done_files >= total_files { - break; - } - } - }); - } for job in jobs { let permit = sem.clone().acquire_owned().await.unwrap(); let drive = Arc::clone(&drive); let multi = multi.clone(); + let overall = overall.clone(); let file_style = file_style.clone(); - let completed_bytes = Arc::clone(&completed_bytes); - let completed_files = Arc::clone(&completed_files); let job_size = job.file_size; + let completed_bytes = Arc::clone(&completed_bytes); + let total_bytes = total_bytes; + let start = start.clone(); let handle = tokio::spawn(async move { - let pb = multi.insert_before(&multi.add(ProgressBar::hidden()), ProgressBar::new_spinner()); + let pb = multi.add(ProgressBar::new_spinner()); pb.set_style(file_style); - pb.set_message(format!("⬆ {}", job.relative_path)); + pb.set_message(job.relative_path.clone()); pb.enable_steady_tick(std::time::Duration::from_millis(80)); let result = upload_single_file_with_progress(&drive, &job, &pb).await; - if result.is_ok() { 
- pb.set_message(format!("✓ {}", job.relative_path)); - } else { - pb.set_message(format!("✗ {}", job.relative_path)); - } - pb.finish(); - tokio::time::sleep(std::time::Duration::from_millis(150)).await; pb.finish_and_clear(); multi.remove(&pb); - completed_bytes.fetch_add(job_size, std::sync::atomic::Ordering::Relaxed); - completed_files.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + // Update overall — increment file count, compute bandwidth from bytes + overall.inc(1); + let done_bytes = completed_bytes.fetch_add(job_size, std::sync::atomic::Ordering::Relaxed) + job_size; + let elapsed = start.elapsed().as_secs_f64(); + let speed = if elapsed > 1.0 { done_bytes as f64 / elapsed } else { 0.0 }; + let remaining = total_bytes.saturating_sub(done_bytes); + let eta = if speed > 0.0 { remaining as f64 / speed } else { 0.0 }; + let eta_m = eta as u64 / 60; + let eta_s = eta as u64 % 60; + overall.set_message(format!( + "{}/{} {}/s ETA: {}m {:02}s", + indicatif::HumanBytes(done_bytes), + indicatif::HumanBytes(total_bytes), + indicatif::HumanBytes(speed as u64), + eta_m, eta_s, + )); drop(permit); result @@ -938,22 +910,54 @@ async fn collect_upload_jobs( format!("{prefix}/{dir_name}") }; - eprint!("\r\x1b[K Creating folder: {display_prefix} "); + eprint!("\r\x1b[K Scanning: {display_prefix} "); - let folder = drive - .create_child( - parent_id, - &serde_json::json!({ - "title": dir_name, - "type": "folder", - }), - ) - .await?; + // Check if folder already exists under the parent. 
+ let existing = drive.list_children(parent_id, None).await.ok(); + let existing_folder_id = existing.and_then(|page| { + page.results.iter().find_map(|item| { + let is_folder = item.get("type").and_then(|v| v.as_str()) == Some("folder"); + let title_matches = item.get("title").and_then(|v| v.as_str()) == Some(dir_name); + if is_folder && title_matches { + item.get("id").and_then(|v| v.as_str()).map(String::from) + } else { + None + } + }) + }); - let folder_id = folder["id"] - .as_str() - .ok_or_else(|| crate::error::SunbeamError::Other("No folder ID in response".into()))? - .to_string(); + let folder_id = if let Some(id) = existing_folder_id { + id + } else { + let folder = drive + .create_child( + parent_id, + &serde_json::json!({ + "title": dir_name, + "type": "folder", + }), + ) + .await?; + folder["id"] + .as_str() + .ok_or_else(|| crate::error::SunbeamError::Other("No folder ID in response".into()))? + .to_string() + }; + + // Build a set of existing file titles in this folder to skip duplicates. + let existing_file_titles: std::collections::HashSet<String> = { + let mut titles = std::collections::HashSet::new(); + if let Ok(page) = drive.list_children(&folder_id, None).await { + for item in &page.results { + if item.get("type").and_then(|v| v.as_str()) == Some("file") { + if let Some(title) = item.get("title").and_then(|v| v.as_str()) { + titles.insert(title.to_string()); + } + } + } + } + titles + }; let mut entries: Vec<_> = std::fs::read_dir(dir) .map_err(|e| crate::error::SunbeamError::Other(format!("reading dir: {e}")))? @@ -984,6 +988,10 @@ async fn collect_upload_jobs( )) .await?; } else if entry_path.is_file() { + // Skip if a file with this title already exists in the folder. + if existing_file_titles.contains(&name) { + continue; + } let file_size = std::fs::metadata(&entry_path) .map_err(|e| crate::error::SunbeamError::Other(format!("stat: {e}")))?
.len(); diff --git a/sunbeam-sdk/src/lasuite/drive.rs b/sunbeam-sdk/src/lasuite/drive.rs index 184e91e..ffe70fb 100644 --- a/sunbeam-sdk/src/lasuite/drive.rs +++ b/sunbeam-sdk/src/lasuite/drive.rs @@ -162,21 +162,38 @@ impl DriveClient { /// Upload file bytes directly to a presigned S3 URL. /// The presigned URL's SigV4 signature covers host + x-amz-acl headers. + /// Retries up to 3 times on 502/503/connection errors. pub async fn upload_to_s3(&self, presigned_url: &str, data: bytes::Bytes) -> Result<()> { - let resp = reqwest::Client::new() - .put(presigned_url) - .header("x-amz-acl", "private") - .body(data) - .send() - .await - .map_err(|e| crate::error::SunbeamError::network(format!("S3 upload: {e}")))?; + let max_retries = 3; + for attempt in 0..=max_retries { + let resp = self.transport.http + .put(presigned_url) + .header("x-amz-acl", "private") + .body(data.clone()) + .send() + .await; - if !resp.status().is_success() { - let status = resp.status(); - let body = resp.text().await.unwrap_or_default(); - return Err(crate::error::SunbeamError::network(format!( - "S3 upload: HTTP {status}: {body}" - ))); + match resp { + Ok(r) if r.status().is_success() => return Ok(()), + Ok(r) if (r.status() == 502 || r.status() == 503) && attempt < max_retries => { + tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt as u64 + 1))).await; + continue; + } + Ok(r) => { + let status = r.status(); + let body = r.text().await.unwrap_or_default(); + return Err(crate::error::SunbeamError::network(format!( + "S3 upload: HTTP {status}: {body}" + ))); + } + Err(_) if attempt < max_retries => { + tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt as u64 + 1))).await; + continue; + } + Err(e) => { + return Err(crate::error::SunbeamError::network(format!("S3 upload: {e}"))); + } + } } Ok(()) }