feat: S3Client — object storage API (21 endpoints)

Typed S3-compatible API covering buckets, objects, multipart uploads,
tagging, versioning, lifecycle, CORS, ACL, and policies.

Bump: sunbeam-sdk v0.8.0
This commit is contained in:
2026-03-21 20:28:49 +00:00
parent 329c18bd1d
commit a33697c2fb
4 changed files with 586 additions and 2 deletions

2
Cargo.lock generated
View File

@@ -3591,7 +3591,7 @@ dependencies = [
[[package]] [[package]]
name = "sunbeam-sdk" name = "sunbeam-sdk"
version = "0.6.0" version = "0.7.0"
dependencies = [ dependencies = [
"base64", "base64",
"bytes", "bytes",

View File

@@ -1,6 +1,6 @@
[package] [package]
name = "sunbeam-sdk" name = "sunbeam-sdk"
version = "0.7.0" version = "0.8.0"
edition = "2024" edition = "2024"
description = "Sunbeam SDK — reusable library for cluster management" description = "Sunbeam SDK — reusable library for cluster management"
repository = "https://src.sunbeam.pt/studio/cli" repository = "https://src.sunbeam.pt/studio/cli"

View File

@@ -0,0 +1,423 @@
//! S3-compatible storage client.
pub mod types;
use crate::client::{AuthMethod, HttpTransport, ServiceClient};
use crate::error::{Result, ResultExt, SunbeamError};
use bytes::Bytes;
use reqwest::Method;
use types::*;
/// Client for S3-compatible object storage.
pub struct S3Client {
pub(crate) transport: HttpTransport,
}
impl ServiceClient for S3Client {
fn service_name(&self) -> &'static str {
"s3"
}
fn base_url(&self) -> &str {
&self.transport.base_url
}
fn from_parts(base_url: String, auth: AuthMethod) -> Self {
Self {
transport: HttpTransport::new(&base_url, auth),
}
}
}
impl S3Client {
/// Build an S3Client from domain (e.g. `https://s3.{domain}`).
pub fn connect(domain: &str) -> Self {
let base_url = format!("https://s3.{domain}");
Self::from_parts(base_url, AuthMethod::None)
}
/// Replace the auth method (e.g. after obtaining credentials).
pub fn set_auth(&mut self, auth: AuthMethod) {
self.transport.set_auth(auth);
}
// -- Buckets ------------------------------------------------------------
/// Create a bucket.
pub async fn create_bucket(&self, bucket: &str) -> Result<()> {
self.transport
.send(Method::PUT, bucket, Option::<&()>::None, "s3 create bucket")
.await
}
/// Delete a bucket.
pub async fn delete_bucket(&self, bucket: &str) -> Result<()> {
self.transport
.send(Method::DELETE, bucket, Option::<&()>::None, "s3 delete bucket")
.await
}
/// List all buckets.
pub async fn list_buckets(&self) -> Result<ListBucketsResponse> {
self.transport
.json(Method::GET, "/", Option::<&()>::None, "s3 list buckets")
.await
}
/// Check if a bucket exists (HEAD request, returns true/false).
pub async fn head_bucket(&self, bucket: &str) -> Result<bool> {
let resp = self
.transport
.request(Method::HEAD, bucket)
.send()
.await
.with_ctx(|| format!("s3 head bucket {bucket}: request failed"))?;
Ok(resp.status().is_success())
}
/// Set versioning configuration on a bucket.
pub async fn set_versioning(&self, bucket: &str, body: &(impl serde::Serialize + Sync)) -> Result<()> {
self.transport
.send(
Method::PUT,
&format!("{bucket}?versioning"),
Some(body),
"s3 set versioning",
)
.await
}
/// Set lifecycle configuration on a bucket.
pub async fn set_lifecycle(&self, bucket: &str, body: &(impl serde::Serialize + Sync)) -> Result<()> {
self.transport
.send(
Method::PUT,
&format!("{bucket}?lifecycle"),
Some(body),
"s3 set lifecycle",
)
.await
}
/// Set CORS configuration on a bucket.
pub async fn set_cors(&self, bucket: &str, body: &(impl serde::Serialize + Sync)) -> Result<()> {
self.transport
.send(
Method::PUT,
&format!("{bucket}?cors"),
Some(body),
"s3 set cors",
)
.await
}
/// Get the ACL for a bucket.
pub async fn get_acl(&self, bucket: &str) -> Result<serde_json::Value> {
self.transport
.json(
Method::GET,
&format!("{bucket}?acl"),
Option::<&()>::None,
"s3 get acl",
)
.await
}
/// Set a bucket policy.
pub async fn set_policy(&self, bucket: &str, body: &(impl serde::Serialize + Sync)) -> Result<()> {
self.transport
.send(
Method::PUT,
&format!("{bucket}?policy"),
Some(body),
"s3 set policy",
)
.await
}
// -- Objects -------------------------------------------------------------
/// Upload an object.
pub async fn put_object(
&self,
bucket: &str,
key: &str,
content_type: &str,
data: Bytes,
) -> Result<()> {
let path = format!("{bucket}/{key}");
let resp = self
.transport
.request(Method::PUT, &path)
.header("Content-Type", content_type)
.body(data)
.send()
.await
.with_ctx(|| format!("s3 put object {path}: request failed"))?;
let status = resp.status();
if !status.is_success() {
let body_text = resp.text().await.unwrap_or_default();
return Err(SunbeamError::network(format!(
"s3 put object: HTTP {status}: {body_text}"
)));
}
Ok(())
}
/// Download an object.
pub async fn get_object(&self, bucket: &str, key: &str) -> Result<Bytes> {
self.transport
.bytes(Method::GET, &format!("{bucket}/{key}"), "s3 get object")
.await
}
/// Check if an object exists (HEAD request, returns true/false).
pub async fn head_object(&self, bucket: &str, key: &str) -> Result<bool> {
let path = format!("{bucket}/{key}");
let resp = self
.transport
.request(Method::HEAD, &path)
.send()
.await
.with_ctx(|| format!("s3 head object {path}: request failed"))?;
Ok(resp.status().is_success())
}
/// Delete an object.
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
self.transport
.send(
Method::DELETE,
&format!("{bucket}/{key}"),
Option::<&()>::None,
"s3 delete object",
)
.await
}
/// Copy an object from `source` to `{bucket}/{key}`.
pub async fn copy_object(&self, bucket: &str, key: &str, source: &str) -> Result<()> {
let path = format!("{bucket}/{key}");
let resp = self
.transport
.request(Method::PUT, &path)
.header("x-amz-copy-source", source)
.send()
.await
.with_ctx(|| format!("s3 copy object {path}: request failed"))?;
let status = resp.status();
if !status.is_success() {
let body_text = resp.text().await.unwrap_or_default();
return Err(SunbeamError::network(format!(
"s3 copy object: HTTP {status}: {body_text}"
)));
}
Ok(())
}
/// List objects in a bucket (v2).
pub async fn list_objects_v2(
&self,
bucket: &str,
prefix: Option<&str>,
max_keys: Option<u32>,
) -> Result<ListObjectsResponse> {
let mut path = format!("{bucket}?list-type=2");
if let Some(p) = prefix {
path.push_str(&format!("&prefix={p}"));
}
if let Some(m) = max_keys {
path.push_str(&format!("&max-keys={m}"));
}
self.transport
.json(Method::GET, &path, Option::<&()>::None, "s3 list objects v2")
.await
}
/// Set tags on an object.
pub async fn set_tags(
&self,
bucket: &str,
key: &str,
body: &(impl serde::Serialize + Sync),
) -> Result<()> {
self.transport
.send(
Method::PUT,
&format!("{bucket}/{key}?tagging"),
Some(body),
"s3 set tags",
)
.await
}
/// Get tags for an object.
pub async fn get_tags(&self, bucket: &str, key: &str) -> Result<serde_json::Value> {
self.transport
.json(
Method::GET,
&format!("{bucket}/{key}?tagging"),
Option::<&()>::None,
"s3 get tags",
)
.await
}
// -- Multipart -----------------------------------------------------------
/// Initiate a multipart upload.
pub async fn initiate_multipart(
&self,
bucket: &str,
key: &str,
) -> Result<InitiateMultipartResponse> {
self.transport
.json(
Method::POST,
&format!("{bucket}/{key}?uploads"),
Option::<&()>::None,
"s3 initiate multipart",
)
.await
}
/// Upload a single part of a multipart upload.
pub async fn upload_part(
&self,
bucket: &str,
key: &str,
upload_id: &str,
part_number: u32,
data: Bytes,
) -> Result<UploadPartResponse> {
let path = format!("{bucket}/{key}?partNumber={part_number}&uploadId={upload_id}");
let resp = self
.transport
.request(Method::PUT, &path)
.body(data)
.send()
.await
.with_ctx(|| format!("s3 upload part {path}: request failed"))?;
let status = resp.status();
if !status.is_success() {
let body_text = resp.text().await.unwrap_or_default();
return Err(SunbeamError::network(format!(
"s3 upload part: HTTP {status}: {body_text}"
)));
}
let etag = resp
.headers()
.get("ETag")
.and_then(|v| v.to_str().ok())
.unwrap_or("")
.to_string();
Ok(UploadPartResponse { etag, part_number })
}
/// Complete a multipart upload.
pub async fn complete_multipart(
&self,
bucket: &str,
key: &str,
upload_id: &str,
body: &(impl serde::Serialize + Sync),
) -> Result<CompleteMultipartResponse> {
self.transport
.json(
Method::POST,
&format!("{bucket}/{key}?uploadId={upload_id}"),
Some(body),
"s3 complete multipart",
)
.await
}
/// Abort a multipart upload.
pub async fn abort_multipart(
&self,
bucket: &str,
key: &str,
upload_id: &str,
) -> Result<()> {
self.transport
.send(
Method::DELETE,
&format!("{bucket}/{key}?uploadId={upload_id}"),
Option::<&()>::None,
"s3 abort multipart",
)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_connect_url() {
let c = S3Client::connect("sunbeam.pt");
assert_eq!(c.base_url(), "https://s3.sunbeam.pt");
assert_eq!(c.service_name(), "s3");
}
#[test]
fn test_from_parts() {
let c = S3Client::from_parts("http://localhost:9000".into(), AuthMethod::None);
assert_eq!(c.base_url(), "http://localhost:9000");
}
#[test]
fn test_set_auth() {
let mut c = S3Client::connect("sunbeam.pt");
assert!(matches!(c.transport.auth, AuthMethod::None));
c.set_auth(AuthMethod::Bearer("tok".into()));
assert!(matches!(c.transport.auth, AuthMethod::Bearer(ref s) if s == "tok"));
}
#[tokio::test]
async fn test_head_bucket_unreachable() {
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
let result = c.head_bucket("test-bucket").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_head_object_unreachable() {
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
let result = c.head_object("bucket", "key").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_list_buckets_unreachable() {
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
let result = c.list_buckets().await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_put_object_unreachable() {
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
let result = c
.put_object("bucket", "key", "text/plain", Bytes::from("hello"))
.await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_copy_object_unreachable() {
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
let result = c.copy_object("bucket", "dest", "/src-bucket/src-key").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_upload_part_unreachable() {
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
let result = c
.upload_part("bucket", "key", "upload-id", 1, Bytes::from("data"))
.await;
assert!(result.is_err());
}
}

View File

@@ -0,0 +1,161 @@
//! S3 storage types.
use serde::{Deserialize, Serialize};
/// Response from ListBuckets (GET /).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListBucketsResponse {
#[serde(default, rename = "Buckets")]
pub buckets: Vec<Bucket>,
#[serde(default, rename = "Owner")]
pub owner: Option<Owner>,
}
/// A single S3 bucket.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bucket {
#[serde(rename = "Name")]
pub name: String,
#[serde(default, rename = "CreationDate")]
pub creation_date: Option<String>,
}
/// Bucket owner info.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Owner {
#[serde(default, rename = "ID")]
pub id: Option<String>,
#[serde(default, rename = "DisplayName")]
pub display_name: Option<String>,
}
/// Response from ListObjectsV2.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListObjectsResponse {
#[serde(default, rename = "Name")]
pub name: String,
#[serde(default, rename = "Prefix")]
pub prefix: Option<String>,
#[serde(default, rename = "MaxKeys")]
pub max_keys: Option<u32>,
#[serde(default, rename = "IsTruncated")]
pub is_truncated: Option<bool>,
#[serde(default, rename = "Contents")]
pub contents: Vec<Object>,
#[serde(default, rename = "NextContinuationToken")]
pub next_continuation_token: Option<String>,
}
/// A single S3 object in a listing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Object {
#[serde(rename = "Key")]
pub key: String,
#[serde(default, rename = "LastModified")]
pub last_modified: Option<String>,
#[serde(default, rename = "ETag")]
pub etag: Option<String>,
#[serde(default, rename = "Size")]
pub size: Option<u64>,
#[serde(default, rename = "StorageClass")]
pub storage_class: Option<String>,
}
/// Response from InitiateMultipartUpload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InitiateMultipartResponse {
#[serde(rename = "Bucket")]
pub bucket: String,
#[serde(rename = "Key")]
pub key: String,
#[serde(rename = "UploadId")]
pub upload_id: String,
}
/// Response from UploadPart (extracted from headers).
#[derive(Debug, Clone)]
pub struct UploadPartResponse {
pub etag: String,
pub part_number: u32,
}
/// Response from CompleteMultipartUpload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompleteMultipartResponse {
#[serde(default, rename = "Location")]
pub location: Option<String>,
#[serde(rename = "Bucket")]
pub bucket: String,
#[serde(rename = "Key")]
pub key: String,
#[serde(default, rename = "ETag")]
pub etag: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_list_buckets_response_roundtrip() {
let json = serde_json::json!({
"Buckets": [
{"Name": "my-bucket", "CreationDate": "2024-01-01T00:00:00Z"}
],
"Owner": {"ID": "owner-id", "DisplayName": "Owner"}
});
let resp: ListBucketsResponse = serde_json::from_value(json).unwrap();
assert_eq!(resp.buckets.len(), 1);
assert_eq!(resp.buckets[0].name, "my-bucket");
assert_eq!(resp.owner.unwrap().display_name, Some("Owner".to_string()));
}
#[test]
fn test_list_objects_response_roundtrip() {
let json = serde_json::json!({
"Name": "my-bucket",
"Prefix": "docs/",
"MaxKeys": 1000,
"IsTruncated": false,
"Contents": [
{"Key": "docs/readme.md", "Size": 1024, "ETag": "\"abc\""}
]
});
let resp: ListObjectsResponse = serde_json::from_value(json).unwrap();
assert_eq!(resp.name, "my-bucket");
assert_eq!(resp.contents.len(), 1);
assert_eq!(resp.contents[0].key, "docs/readme.md");
}
#[test]
fn test_initiate_multipart_response() {
let json = serde_json::json!({
"Bucket": "my-bucket",
"Key": "large-file.bin",
"UploadId": "upload-123"
});
let resp: InitiateMultipartResponse = serde_json::from_value(json).unwrap();
assert_eq!(resp.upload_id, "upload-123");
}
#[test]
fn test_complete_multipart_response() {
let json = serde_json::json!({
"Bucket": "my-bucket",
"Key": "large-file.bin",
"Location": "https://s3.example.com/my-bucket/large-file.bin",
"ETag": "\"final-etag\""
});
let resp: CompleteMultipartResponse = serde_json::from_value(json).unwrap();
assert_eq!(resp.bucket, "my-bucket");
assert_eq!(resp.location, Some("https://s3.example.com/my-bucket/large-file.bin".to_string()));
}
#[test]
fn test_empty_list_buckets() {
let json = serde_json::json!({});
let resp: ListBucketsResponse = serde_json::from_value(json).unwrap();
assert!(resp.buckets.is_empty());
assert!(resp.owner.is_none());
}
}