diff --git a/Cargo.lock b/Cargo.lock index b74dbd7..a6f8773 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -532,6 +532,19 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console" +version = "0.15.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8" +dependencies = [ + "encode_unicode", + "libc", + "once_cell", + "unicode-width", + "windows-sys 0.59.0", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -936,6 +949,12 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449" +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "enum-ordinalize" version = "4.3.2" @@ -1672,6 +1691,20 @@ dependencies = [ "serde_core", ] +[[package]] +name = "indicatif" +version = "0.17.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235" +dependencies = [ + "console", + "number_prefix", + "portable-atomic", + "tokio", + "unicode-width", + "web-time", +] + [[package]] name = "inout" version = "0.1.4" @@ -2145,6 +2178,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.37.3" @@ -2478,6 +2517,12 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "portable-atomic" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" + [[package]] name = "potential_utf" version = "0.1.4" @@ 
-3526,6 +3571,7 @@ dependencies = [ "flate2", "futures", "hmac", + "indicatif", "k8s-openapi", "kube", "lettre", @@ -3948,6 +3994,12 @@ version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "unicode-xid" version = "0.2.6" diff --git a/sunbeam-sdk/Cargo.toml b/sunbeam-sdk/Cargo.toml index 953b60b..d81be61 100644 --- a/sunbeam-sdk/Cargo.toml +++ b/sunbeam-sdk/Cargo.toml @@ -55,6 +55,7 @@ base64 = "0.22" rand = "0.8" aes-gcm = "0.10" argon2 = "0.5" +indicatif = { version = "0.17", features = ["tokio"] } # Certificate generation rcgen = "0.14" diff --git a/sunbeam-sdk/src/lasuite/cli.rs b/sunbeam-sdk/src/lasuite/cli.rs index fc2b1cc..b3ed79d 100644 --- a/sunbeam-sdk/src/lasuite/cli.rs +++ b/sunbeam-sdk/src/lasuite/cli.rs @@ -558,6 +558,9 @@ pub enum DriveCommand { /// Target Drive folder ID. #[arg(short = 't', long)] folder_id: String, + /// Number of concurrent uploads. 
+ #[arg(long, default_value = "4")] + parallel: usize, }, } @@ -623,13 +626,14 @@ pub async fn dispatch_drive( let page_data = drive.list_files(page).await?; output::render_list( &page_data.results, - &["ID", "NAME", "SIZE", "MIME_TYPE"], + &["ID", "TITLE", "TYPE", "SIZE", "MIMETYPE"], |f| { vec![ f.id.clone(), - f.name.clone().unwrap_or_default(), + f.title.clone().unwrap_or_default(), + f.item_type.clone().unwrap_or_default(), f.size.map_or("-".into(), |s| s.to_string()), - f.mime_type.clone().unwrap_or_default(), + f.mimetype.clone().unwrap_or_default(), ] }, fmt, @@ -655,12 +659,13 @@ pub async fn dispatch_drive( let page_data = drive.list_folders(page).await?; output::render_list( &page_data.results, - &["ID", "NAME", "PARENT_ID"], + &["ID", "TITLE", "CHILDREN", "CREATED"], |f| { vec![ f.id.clone(), - f.name.clone().unwrap_or_default(), - f.parent_id.clone().unwrap_or_default(), + f.title.clone().unwrap_or_default(), + f.numchild.map_or("-".into(), |n| n.to_string()), + f.created_at.clone().unwrap_or_default(), ] }, fmt, @@ -696,18 +701,31 @@ pub async fn dispatch_drive( ) } }, - DriveCommand::Upload { path, folder_id } => { - upload_recursive(drive, &path, &folder_id).await + DriveCommand::Upload { path, folder_id, parallel } => { + upload_recursive(drive, &path, &folder_id, parallel).await } } } +/// A file that needs uploading, collected during the directory-walk phase. +struct UploadJob { + local_path: std::path::PathBuf, + parent_id: String, + file_size: u64, + relative_path: String, +} + /// Recursively upload a local file or directory to a Drive folder. 
async fn upload_recursive( drive: &super::DriveClient, local_path: &str, parent_id: &str, + parallel: usize, ) -> Result<()> { + use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressStyle}; + use std::sync::Arc; + use tokio::sync::Semaphore; + let path = std::path::Path::new(local_path); if !path.exists() { return Err(crate::error::SunbeamError::Other(format!( @@ -715,30 +733,139 @@ async fn upload_recursive( ))); } + // Phase 1 — Walk and collect: create folders sequentially, gather file jobs. + let mut jobs = Vec::new(); if path.is_file() { - upload_single_file(drive, path, parent_id).await + let file_size = std::fs::metadata(path) + .map_err(|e| crate::error::SunbeamError::Other(format!("stat: {e}")))? + .len(); + let filename = path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unnamed"); + if !filename.starts_with('.') { + jobs.push(UploadJob { + local_path: path.to_path_buf(), + parent_id: parent_id.to_string(), + file_size, + relative_path: filename.to_string(), + }); + } } else if path.is_dir() { - upload_directory(drive, path, parent_id).await + collect_upload_jobs(drive, path, parent_id, "", &mut jobs).await?; } else { - Err(crate::error::SunbeamError::Other(format!( + return Err(crate::error::SunbeamError::Other(format!( "Not a file or directory: {local_path}" - ))) + ))); } + + if jobs.is_empty() { + output::ok("Nothing to upload."); + return Ok(()); + } + + let total_files = jobs.len() as u64; + let total_bytes: u64 = jobs.iter().map(|j| j.file_size).sum(); + + // Phase 2 — Parallel upload with progress bars. 
+ let multi = MultiProgress::new(); + let overall_style = ProgressStyle::with_template( + "{spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} files ({binary_bytes_per_sec}) ETA: {eta}", + ) + .unwrap() + .progress_chars("\u{2588}\u{2593}\u{2591}"); + let overall = multi.add(ProgressBar::new(total_files)); + overall.set_style(overall_style); + overall.set_length(total_files); + + let file_style = ProgressStyle::with_template( + " {spinner:.cyan} {wide_msg} {bytes}/{total_bytes}", + ) + .unwrap(); + + let sem = Arc::new(Semaphore::new(parallel)); + let drive = Arc::new(drive.clone()); + let mut handles = Vec::new(); + let start = std::time::Instant::now(); + + for job in jobs { + let permit = sem.clone().acquire_owned().await.unwrap(); + let drive = Arc::clone(&drive); + let multi = multi.clone(); + let overall = overall.clone(); + let file_style = file_style.clone(); + + let handle = tokio::spawn(async move { + let pb = multi.add(ProgressBar::new(job.file_size)); + pb.set_style(file_style); + pb.set_message(job.relative_path.clone()); + + let result = upload_single_file_with_progress(&drive, &job, &pb).await; + + pb.finish_and_clear(); + multi.remove(&pb); + overall.inc(1); + + drop(permit); + result + }); + handles.push(handle); + } + + for handle in handles { + handle + .await + .map_err(|e| crate::error::SunbeamError::Other(format!("task join: {e}")))??; + } + + overall.finish_and_clear(); + + let elapsed = start.elapsed(); + let secs = elapsed.as_secs_f64(); + let speed = if secs > 0.0 { + total_bytes as f64 / secs + } else { + 0.0 + }; + let mins = elapsed.as_secs() / 60; + let secs_rem = elapsed.as_secs() % 60; + println!( + "\u{2713} Uploaded {total_files} files ({}) in {mins}m {secs_rem}s ({}/s)", + HumanBytes(total_bytes), + HumanBytes(speed as u64), + ); + + Ok(()) } -async fn upload_directory( +/// Phase 1: Walk a directory recursively, create folders in Drive sequentially, +/// and collect [`UploadJob`]s for every regular file. 
+async fn collect_upload_jobs( drive: &super::DriveClient, dir: &std::path::Path, parent_id: &str, + prefix: &str, + jobs: &mut Vec<UploadJob>, + ) -> Result<()> { let dir_name = dir .file_name() .and_then(|n| n.to_str()) .unwrap_or("unnamed"); + // Skip hidden directories + if dir_name.starts_with('.') { + return Ok(()); + } + + // Build the display prefix for children + let display_prefix = if prefix.is_empty() { + dir_name.to_string() + } else { + format!("{prefix}/{dir_name}") + }; + output::step(&format!("Creating folder: {dir_name}")); - // Create the folder in Drive let folder = drive .create_child( parent_id, @@ -751,9 +878,9 @@ let folder_id = folder["id"] .as_str() - .ok_or_else(|| crate::error::SunbeamError::Other("No folder ID in response".into()))?; + .ok_or_else(|| crate::error::SunbeamError::Other("No folder ID in response".into()))? + .to_string(); - // Process entries let mut entries: Vec<_> = std::fs::read_dir(dir) .map_err(|e| crate::error::SunbeamError::Other(format!("reading dir: {e}")))? .filter_map(|e| e.ok()) @@ -762,39 +889,61 @@ for entry in entries { let entry_path = entry.path(); + let name = entry + .file_name() + .to_str() + .unwrap_or_default() + .to_string(); + + // Skip hidden entries + if name.starts_with('.') { + continue; + } + if entry_path.is_dir() { - Box::pin(upload_directory(drive, &entry_path, folder_id)).await?; + Box::pin(collect_upload_jobs( + drive, + &entry_path, + &folder_id, + &display_prefix, + jobs, + )) + .await?; } else if entry_path.is_file() { - upload_single_file(drive, &entry_path, folder_id).await?; + let file_size = std::fs::metadata(&entry_path) + .map_err(|e| crate::error::SunbeamError::Other(format!("stat: {e}")))?
+ .len(); + jobs.push(UploadJob { + local_path: entry_path, + parent_id: folder_id.clone(), + file_size, + relative_path: format!("{display_prefix}/{name}"), + }); } } Ok(()) } -async fn upload_single_file( +/// Upload a single file to Drive, updating the progress bar. +async fn upload_single_file_with_progress( drive: &super::DriveClient, - file_path: &std::path::Path, - parent_id: &str, + job: &UploadJob, + pb: &indicatif::ProgressBar, ) -> Result<()> { - let filename = file_path + let filename = job + .local_path .file_name() .and_then(|n| n.to_str()) .unwrap_or("unnamed"); - // Skip hidden files - if filename.starts_with('.') { - return Ok(()); - } - - output::ok(&format!("Uploading: {filename}")); - // Create the file item in Drive let item = drive .create_child( - parent_id, + &job.parent_id, &serde_json::json!({ "title": filename, + "filename": filename, "type": "file", }), ) @@ -804,17 +953,24 @@ .as_str() .ok_or_else(|| crate::error::SunbeamError::Other("No item ID in response".into()))?; - // Get the presigned upload URL (Drive returns it as "policy" on create) let upload_url = item["policy"] .as_str() - .ok_or_else(|| crate::error::SunbeamError::Other("No upload policy URL in response — is the item a file?".into()))?; + .ok_or_else(|| { + crate::error::SunbeamError::Other( + "No upload policy URL in response \u{2014} is the item a file?".into(), + ) + })?; + + tracing::debug!("S3 presigned URL: {upload_url}"); // Read the file and upload to S3 - let data = std::fs::read(file_path) + let data = std::fs::read(&job.local_path) .map_err(|e| crate::error::SunbeamError::Other(format!("reading file: {e}")))?; + let len = data.len() as u64; drive .upload_to_s3(upload_url, bytes::Bytes::from(data)) .await?; + pb.set_position(len); // Notify Drive the upload is complete drive.upload_ended(item_id).await?; diff --git a/sunbeam-sdk/src/lasuite/drive.rs b/sunbeam-sdk/src/lasuite/drive.rs index d02c0a7..184e91e 100644 ---
a/sunbeam-sdk/src/lasuite/drive.rs +++ b/sunbeam-sdk/src/lasuite/drive.rs @@ -6,6 +6,7 @@ use reqwest::Method; use super::types::*; /// Client for the La Suite Drive API. +#[derive(Clone)] pub struct DriveClient { pub(crate) transport: HttpTransport, } @@ -160,10 +161,11 @@ impl DriveClient { } /// Upload file bytes directly to a presigned S3 URL. + /// The presigned URL's SigV4 signature covers host + x-amz-acl headers. pub async fn upload_to_s3(&self, presigned_url: &str, data: bytes::Bytes) -> Result<()> { let resp = reqwest::Client::new() .put(presigned_url) - .header("Content-Type", "application/octet-stream") + .header("x-amz-acl", "private") .body(data) .send() .await diff --git a/sunbeam-sdk/src/lasuite/types.rs b/sunbeam-sdk/src/lasuite/types.rs index 0a13fc6..9f8f8fa 100644 --- a/sunbeam-sdk/src/lasuite/types.rs +++ b/sunbeam-sdk/src/lasuite/types.rs @@ -219,13 +219,17 @@ pub struct DriveFile { #[serde(default)] pub id: String, #[serde(default)] - pub name: Option<String>, + pub title: Option<String>, + #[serde(default)] + pub filename: Option<String>, + #[serde(default, rename = "type")] + pub item_type: Option<String>, #[serde(default)] pub size: Option<u64>, #[serde(default)] - pub mime_type: Option<String>, + pub mimetype: Option<String>, #[serde(default)] - pub folder_id: Option<String>, + pub upload_state: Option<String>, #[serde(default)] pub url: Option<String>, #[serde(default)] @@ -234,15 +238,17 @@ pub updated_at: Option<String>, } -/// A folder in the Drive service. +/// A folder in the Drive service (same API, type=folder). #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct DriveFolder { #[serde(default)] pub id: String, #[serde(default)] - pub name: Option<String>, + pub title: Option<String>, + #[serde(default, rename = "type")] + pub item_type: Option<String>, #[serde(default)] - pub parent_id: Option<String>, + pub numchild: Option<u64>, #[serde(default)] pub created_at: Option<String>, #[serde(default)]