fix: progress bar tracks files not bytes, retry on 502, dedup folders
- Overall bar progress based on file count (was bytes, causing 50% bar at low file count when large files uploaded first)
- Bandwidth computed manually from completed bytes / elapsed time
- Per-file bars show spinner + name only (no misleading 0 B counter)
- S3 upload retries up to 3x on 502/503 with backoff
- Folder dedup: list_children before create, reuse existing folders
- File dedup: skip files already present in target folder
- Connection pooling: reuse DriveClient's HTTP client for S3 PUTs
- Default parallel back to 8 (retries handle transient 502s)
This commit is contained in:
@@ -559,7 +559,7 @@ pub enum DriveCommand {
|
|||||||
#[arg(short = 't', long)]
|
#[arg(short = 't', long)]
|
||||||
folder_id: String,
|
folder_id: String,
|
||||||
/// Number of concurrent uploads.
|
/// Number of concurrent uploads.
|
||||||
#[arg(long, default_value = "4")]
|
#[arg(long, default_value = "8")]
|
||||||
parallel: usize,
|
parallel: usize,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -773,19 +773,16 @@ async fn upload_recursive(
|
|||||||
// Phase 2 — Parallel upload with progress bars.
|
// Phase 2 — Parallel upload with progress bars.
|
||||||
let multi = MultiProgress::new();
|
let multi = MultiProgress::new();
|
||||||
|
|
||||||
// Overall bar tracks bytes (not file count) for accurate speed + ETA.
|
// Overall bar tracks file count. Bandwidth is computed manually in the message.
|
||||||
let overall_style = ProgressStyle::with_template(
|
let overall_style = ProgressStyle::with_template(
|
||||||
" {spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} files {msg}",
|
" {spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {pos}/{len} files {msg}",
|
||||||
)
|
)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.progress_chars("━╸─");
|
.progress_chars("█▓░");
|
||||||
let overall = multi.add(ProgressBar::new(total_files));
|
let overall = multi.add(ProgressBar::new(total_files));
|
||||||
overall.set_style(overall_style);
|
overall.set_style(overall_style);
|
||||||
overall.enable_steady_tick(std::time::Duration::from_millis(100));
|
overall.enable_steady_tick(std::time::Duration::from_millis(100));
|
||||||
|
let completed_bytes = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
|
||||||
// Byte-tracking bar (hidden) — drives speed + ETA calculation.
|
|
||||||
let bytes_bar = multi.add(ProgressBar::hidden());
|
|
||||||
bytes_bar.set_length(total_bytes);
|
|
||||||
|
|
||||||
let file_style = ProgressStyle::with_template(
|
let file_style = ProgressStyle::with_template(
|
||||||
" {spinner:.cyan} {wide_msg}",
|
" {spinner:.cyan} {wide_msg}",
|
||||||
@@ -796,70 +793,45 @@ async fn upload_recursive(
|
|||||||
let drive = Arc::new(drive.clone());
|
let drive = Arc::new(drive.clone());
|
||||||
let mut handles = Vec::new();
|
let mut handles = Vec::new();
|
||||||
let start = std::time::Instant::now();
|
let start = std::time::Instant::now();
|
||||||
let completed_bytes = Arc::new(std::sync::atomic::AtomicU64::new(0));
|
|
||||||
let completed_files = Arc::new(std::sync::atomic::AtomicU64::new(0));
|
|
||||||
|
|
||||||
// Spawn a task to update the overall bar message with speed + ETA.
|
|
||||||
{
|
|
||||||
let overall = overall.clone();
|
|
||||||
let completed_bytes = Arc::clone(&completed_bytes);
|
|
||||||
let completed_files = Arc::clone(&completed_files);
|
|
||||||
let total_bytes = total_bytes;
|
|
||||||
let total_files = total_files;
|
|
||||||
tokio::spawn(async move {
|
|
||||||
loop {
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(250)).await;
|
|
||||||
let done_bytes = completed_bytes.load(std::sync::atomic::Ordering::Relaxed);
|
|
||||||
let done_files = completed_files.load(std::sync::atomic::Ordering::Relaxed);
|
|
||||||
let elapsed = start.elapsed().as_secs_f64();
|
|
||||||
let speed = if elapsed > 0.5 { done_bytes as f64 / elapsed } else { 0.0 };
|
|
||||||
let remaining_bytes = total_bytes.saturating_sub(done_bytes);
|
|
||||||
let eta_secs = if speed > 0.0 { remaining_bytes as f64 / speed } else { 0.0 };
|
|
||||||
let eta_min = eta_secs as u64 / 60;
|
|
||||||
let eta_sec = eta_secs as u64 % 60;
|
|
||||||
overall.set_message(format!(
|
|
||||||
"{}/{} {}/s ETA: {eta_min}m {eta_sec:02}s",
|
|
||||||
HumanBytes(done_bytes),
|
|
||||||
HumanBytes(total_bytes),
|
|
||||||
HumanBytes(speed as u64),
|
|
||||||
));
|
|
||||||
overall.set_position(done_files);
|
|
||||||
if done_files >= total_files {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
for job in jobs {
|
for job in jobs {
|
||||||
let permit = sem.clone().acquire_owned().await.unwrap();
|
let permit = sem.clone().acquire_owned().await.unwrap();
|
||||||
let drive = Arc::clone(&drive);
|
let drive = Arc::clone(&drive);
|
||||||
let multi = multi.clone();
|
let multi = multi.clone();
|
||||||
|
let overall = overall.clone();
|
||||||
let file_style = file_style.clone();
|
let file_style = file_style.clone();
|
||||||
let completed_bytes = Arc::clone(&completed_bytes);
|
|
||||||
let completed_files = Arc::clone(&completed_files);
|
|
||||||
let job_size = job.file_size;
|
let job_size = job.file_size;
|
||||||
|
let completed_bytes = Arc::clone(&completed_bytes);
|
||||||
|
let total_bytes = total_bytes;
|
||||||
|
let start = start.clone();
|
||||||
|
|
||||||
let handle = tokio::spawn(async move {
|
let handle = tokio::spawn(async move {
|
||||||
let pb = multi.insert_before(&multi.add(ProgressBar::hidden()), ProgressBar::new_spinner());
|
let pb = multi.add(ProgressBar::new_spinner());
|
||||||
pb.set_style(file_style);
|
pb.set_style(file_style);
|
||||||
pb.set_message(format!("⬆ {}", job.relative_path));
|
pb.set_message(job.relative_path.clone());
|
||||||
pb.enable_steady_tick(std::time::Duration::from_millis(80));
|
pb.enable_steady_tick(std::time::Duration::from_millis(80));
|
||||||
|
|
||||||
let result = upload_single_file_with_progress(&drive, &job, &pb).await;
|
let result = upload_single_file_with_progress(&drive, &job, &pb).await;
|
||||||
|
|
||||||
if result.is_ok() {
|
|
||||||
pb.set_message(format!("✓ {}", job.relative_path));
|
|
||||||
} else {
|
|
||||||
pb.set_message(format!("✗ {}", job.relative_path));
|
|
||||||
}
|
|
||||||
pb.finish();
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(150)).await;
|
|
||||||
pb.finish_and_clear();
|
pb.finish_and_clear();
|
||||||
multi.remove(&pb);
|
multi.remove(&pb);
|
||||||
|
|
||||||
completed_bytes.fetch_add(job_size, std::sync::atomic::Ordering::Relaxed);
|
// Update overall — increment file count, compute bandwidth from bytes
|
||||||
completed_files.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
overall.inc(1);
|
||||||
|
let done_bytes = completed_bytes.fetch_add(job_size, std::sync::atomic::Ordering::Relaxed) + job_size;
|
||||||
|
let elapsed = start.elapsed().as_secs_f64();
|
||||||
|
let speed = if elapsed > 1.0 { done_bytes as f64 / elapsed } else { 0.0 };
|
||||||
|
let remaining = total_bytes.saturating_sub(done_bytes);
|
||||||
|
let eta = if speed > 0.0 { remaining as f64 / speed } else { 0.0 };
|
||||||
|
let eta_m = eta as u64 / 60;
|
||||||
|
let eta_s = eta as u64 % 60;
|
||||||
|
overall.set_message(format!(
|
||||||
|
"{}/{} {}/s ETA: {}m {:02}s",
|
||||||
|
indicatif::HumanBytes(done_bytes),
|
||||||
|
indicatif::HumanBytes(total_bytes),
|
||||||
|
indicatif::HumanBytes(speed as u64),
|
||||||
|
eta_m, eta_s,
|
||||||
|
));
|
||||||
|
|
||||||
drop(permit);
|
drop(permit);
|
||||||
result
|
result
|
||||||
@@ -938,22 +910,54 @@ async fn collect_upload_jobs(
|
|||||||
format!("{prefix}/{dir_name}")
|
format!("{prefix}/{dir_name}")
|
||||||
};
|
};
|
||||||
|
|
||||||
eprint!("\r\x1b[K Creating folder: {display_prefix} ");
|
eprint!("\r\x1b[K Scanning: {display_prefix} ");
|
||||||
|
|
||||||
let folder = drive
|
// Check if folder already exists under the parent.
|
||||||
.create_child(
|
let existing = drive.list_children(parent_id, None).await.ok();
|
||||||
parent_id,
|
let existing_folder_id = existing.and_then(|page| {
|
||||||
&serde_json::json!({
|
page.results.iter().find_map(|item| {
|
||||||
"title": dir_name,
|
let is_folder = item.get("type").and_then(|v| v.as_str()) == Some("folder");
|
||||||
"type": "folder",
|
let title_matches = item.get("title").and_then(|v| v.as_str()) == Some(dir_name);
|
||||||
}),
|
if is_folder && title_matches {
|
||||||
)
|
item.get("id").and_then(|v| v.as_str()).map(String::from)
|
||||||
.await?;
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
let folder_id = folder["id"]
|
let folder_id = if let Some(id) = existing_folder_id {
|
||||||
.as_str()
|
id
|
||||||
.ok_or_else(|| crate::error::SunbeamError::Other("No folder ID in response".into()))?
|
} else {
|
||||||
.to_string();
|
let folder = drive
|
||||||
|
.create_child(
|
||||||
|
parent_id,
|
||||||
|
&serde_json::json!({
|
||||||
|
"title": dir_name,
|
||||||
|
"type": "folder",
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
folder["id"]
|
||||||
|
.as_str()
|
||||||
|
.ok_or_else(|| crate::error::SunbeamError::Other("No folder ID in response".into()))?
|
||||||
|
.to_string()
|
||||||
|
};
|
||||||
|
|
||||||
|
// Build a set of existing file titles in this folder to skip duplicates.
|
||||||
|
let existing_file_titles: std::collections::HashSet<String> = {
|
||||||
|
let mut titles = std::collections::HashSet::new();
|
||||||
|
if let Ok(page) = drive.list_children(&folder_id, None).await {
|
||||||
|
for item in &page.results {
|
||||||
|
if item.get("type").and_then(|v| v.as_str()) == Some("file") {
|
||||||
|
if let Some(title) = item.get("title").and_then(|v| v.as_str()) {
|
||||||
|
titles.insert(title.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
titles
|
||||||
|
};
|
||||||
|
|
||||||
let mut entries: Vec<_> = std::fs::read_dir(dir)
|
let mut entries: Vec<_> = std::fs::read_dir(dir)
|
||||||
.map_err(|e| crate::error::SunbeamError::Other(format!("reading dir: {e}")))?
|
.map_err(|e| crate::error::SunbeamError::Other(format!("reading dir: {e}")))?
|
||||||
@@ -984,6 +988,10 @@ async fn collect_upload_jobs(
|
|||||||
))
|
))
|
||||||
.await?;
|
.await?;
|
||||||
} else if entry_path.is_file() {
|
} else if entry_path.is_file() {
|
||||||
|
// Skip if a file with this title already exists in the folder.
|
||||||
|
if existing_file_titles.contains(&name) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
let file_size = std::fs::metadata(&entry_path)
|
let file_size = std::fs::metadata(&entry_path)
|
||||||
.map_err(|e| crate::error::SunbeamError::Other(format!("stat: {e}")))?
|
.map_err(|e| crate::error::SunbeamError::Other(format!("stat: {e}")))?
|
||||||
.len();
|
.len();
|
||||||
|
|||||||
@@ -162,21 +162,38 @@ impl DriveClient {
|
|||||||
|
|
||||||
/// Upload file bytes directly to a presigned S3 URL.
|
/// Upload file bytes directly to a presigned S3 URL.
|
||||||
/// The presigned URL's SigV4 signature covers host + x-amz-acl headers.
|
/// The presigned URL's SigV4 signature covers host + x-amz-acl headers.
|
||||||
|
/// Retries up to 3 times on 502/503/connection errors.
|
||||||
pub async fn upload_to_s3(&self, presigned_url: &str, data: bytes::Bytes) -> Result<()> {
|
pub async fn upload_to_s3(&self, presigned_url: &str, data: bytes::Bytes) -> Result<()> {
|
||||||
let resp = reqwest::Client::new()
|
let max_retries = 3;
|
||||||
.put(presigned_url)
|
for attempt in 0..=max_retries {
|
||||||
.header("x-amz-acl", "private")
|
let resp = self.transport.http
|
||||||
.body(data)
|
.put(presigned_url)
|
||||||
.send()
|
.header("x-amz-acl", "private")
|
||||||
.await
|
.body(data.clone())
|
||||||
.map_err(|e| crate::error::SunbeamError::network(format!("S3 upload: {e}")))?;
|
.send()
|
||||||
|
.await;
|
||||||
|
|
||||||
if !resp.status().is_success() {
|
match resp {
|
||||||
let status = resp.status();
|
Ok(r) if r.status().is_success() => return Ok(()),
|
||||||
let body = resp.text().await.unwrap_or_default();
|
Ok(r) if (r.status() == 502 || r.status() == 503) && attempt < max_retries => {
|
||||||
return Err(crate::error::SunbeamError::network(format!(
|
tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt as u64 + 1))).await;
|
||||||
"S3 upload: HTTP {status}: {body}"
|
continue;
|
||||||
)));
|
}
|
||||||
|
Ok(r) => {
|
||||||
|
let status = r.status();
|
||||||
|
let body = r.text().await.unwrap_or_default();
|
||||||
|
return Err(crate::error::SunbeamError::network(format!(
|
||||||
|
"S3 upload: HTTP {status}: {body}"
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Err(_) if attempt < max_retries => {
|
||||||
|
tokio::time::sleep(std::time::Duration::from_millis(500 * (attempt as u64 + 1))).await;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Err(crate::error::SunbeamError::network(format!("S3 upload: {e}")));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user