diff --git a/sunbeam-sdk/src/lasuite/cli.rs b/sunbeam-sdk/src/lasuite/cli.rs index b3ed79d..cdd8921 100644 --- a/sunbeam-sdk/src/lasuite/cli.rs +++ b/sunbeam-sdk/src/lasuite/cli.rs @@ -767,19 +767,28 @@ async fn upload_recursive( let total_files = jobs.len() as u64; let total_bytes: u64 = jobs.iter().map(|j| j.file_size).sum(); + // Clear the folder creation line + eprint!("\r\x1b[K"); + // Phase 2 — Parallel upload with progress bars. let multi = MultiProgress::new(); + + // Overall bar tracks bytes (not file count) for accurate speed + ETA. let overall_style = ProgressStyle::with_template( - "{spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} files ({binary_bytes_per_sec}) ETA: {eta}", + " {spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} files {msg}", ) .unwrap() - .progress_chars("\u{2588}\u{2593}\u{2591}"); + .progress_chars("━╸─"); let overall = multi.add(ProgressBar::new(total_files)); overall.set_style(overall_style); - overall.set_length(total_files); + overall.enable_steady_tick(std::time::Duration::from_millis(100)); + + // Byte-tracking bar (hidden) — drives speed + ETA calculation. + let bytes_bar = multi.add(ProgressBar::hidden()); + bytes_bar.set_length(total_bytes); let file_style = ProgressStyle::with_template( - " {spinner:.cyan} {wide_msg} {bytes}/{total_bytes}", + " {spinner:.cyan} {wide_msg}", ) .unwrap(); @@ -787,24 +796,70 @@ async fn upload_recursive( let drive = Arc::new(drive.clone()); let mut handles = Vec::new(); let start = std::time::Instant::now(); + let completed_bytes = Arc::new(std::sync::atomic::AtomicU64::new(0)); + let completed_files = Arc::new(std::sync::atomic::AtomicU64::new(0)); + + // Spawn a task to update the overall bar message with speed + ETA. + { + let overall = overall.clone(); + let completed_bytes = Arc::clone(&completed_bytes); + let completed_files = Arc::clone(&completed_files); + let total_bytes = total_bytes; + let total_files = total_files; + tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_millis(250)).await; + let done_bytes = completed_bytes.load(std::sync::atomic::Ordering::Relaxed); + let done_files = completed_files.load(std::sync::atomic::Ordering::Relaxed); + let elapsed = start.elapsed().as_secs_f64(); + let speed = if elapsed > 0.5 { done_bytes as f64 / elapsed } else { 0.0 }; + let remaining_bytes = total_bytes.saturating_sub(done_bytes); + let eta_secs = if speed > 0.0 { remaining_bytes as f64 / speed } else { 0.0 }; + let eta_min = eta_secs as u64 / 60; + let eta_sec = eta_secs as u64 % 60; + overall.set_message(format!( + "{}/{} {}/s ETA: {eta_min}m {eta_sec:02}s", + HumanBytes(done_bytes), + HumanBytes(total_bytes), + HumanBytes(speed as u64), + )); + overall.set_position(done_files); + if done_files >= total_files { + break; + } + } + }); + } for job in jobs { let permit = sem.clone().acquire_owned().await.unwrap(); let drive = Arc::clone(&drive); let multi = multi.clone(); - let overall = overall.clone(); let file_style = file_style.clone(); + let completed_bytes = Arc::clone(&completed_bytes); + let completed_files = Arc::clone(&completed_files); + let job_size = job.file_size; let handle = tokio::spawn(async move { - let pb = multi.add(ProgressBar::new(job.file_size)); + let pb = multi.insert_before(&multi.add(ProgressBar::hidden()), ProgressBar::new_spinner()); pb.set_style(file_style); - pb.set_message(job.relative_path.clone()); + pb.set_message(format!("⬆ {}", job.relative_path)); + pb.enable_steady_tick(std::time::Duration::from_millis(80)); let result = upload_single_file_with_progress(&drive, &job, &pb).await; + if result.is_ok() { + pb.set_message(format!("✓ {}", job.relative_path)); + } else { + pb.set_message(format!("✗ {}", job.relative_path)); + } + pb.finish(); + tokio::time::sleep(std::time::Duration::from_millis(150)).await; pb.finish_and_clear(); multi.remove(&pb); - overall.inc(1); + + completed_bytes.fetch_add(job_size, std::sync::atomic::Ordering::Relaxed); + completed_files.fetch_add(1, std::sync::atomic::Ordering::Relaxed); drop(permit); result @@ -812,13 +867,23 @@ async fn upload_recursive( handles.push(handle); } + let mut errors = 0u64; for handle in handles { - handle - .await - .map_err(|e| crate::error::SunbeamError::Other(format!("task join: {e}")))??; + match handle.await { + Ok(Ok(())) => {} + Ok(Err(e)) => { + errors += 1; + multi.suspend(|| eprintln!(" ERROR: {e}")); + } + Err(e) => { + errors += 1; + multi.suspend(|| eprintln!(" ERROR: task panic: {e}")); + } + } } overall.finish_and_clear(); + multi.clear().ok(); let elapsed = start.elapsed(); let secs = elapsed.as_secs_f64(); @@ -829,11 +894,20 @@ async fn upload_recursive( }; let mins = elapsed.as_secs() / 60; let secs_rem = elapsed.as_secs() % 60; - println!( - "\u{2713} Uploaded {total_files} files ({}) in {mins}m {secs_rem}s ({}/s)", - HumanBytes(total_bytes), - HumanBytes(speed as u64), - ); + let uploaded = total_files - errors; + if errors > 0 { + println!( + "✓ Uploaded {uploaded}/{total_files} files ({}) in {mins}m {secs_rem}s ({}/s) — {errors} failed", + HumanBytes(total_bytes), + HumanBytes(speed as u64), + ); + } else { + println!( + "✓ Uploaded {total_files} files ({}) in {mins}m {secs_rem}s ({}/s)", + HumanBytes(total_bytes), + HumanBytes(speed as u64), + ); + } Ok(()) } @@ -864,7 +938,7 @@ async fn collect_upload_jobs( format!("{prefix}/{dir_name}") }; - output::step(&format!("Creating folder: {dir_name}")); + eprint!("\r\x1b[K Creating folder: {display_prefix} "); let folder = drive .create_child(