feat: S3Client — object storage API (21 endpoints)
Typed S3-compatible API covering buckets, objects, multipart uploads, tagging, versioning, lifecycle, CORS, ACL, and policies. Bump: sunbeam-sdk v0.8.0
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -3591,7 +3591,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "sunbeam-sdk"
|
||||
version = "0.6.0"
|
||||
version = "0.7.0"
|
||||
dependencies = [
|
||||
"base64",
|
||||
"bytes",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "sunbeam-sdk"
|
||||
version = "0.7.0"
|
||||
version = "0.8.0"
|
||||
edition = "2024"
|
||||
description = "Sunbeam SDK — reusable library for cluster management"
|
||||
repository = "https://src.sunbeam.pt/studio/cli"
|
||||
|
||||
423
sunbeam-sdk/src/storage/mod.rs
Normal file
423
sunbeam-sdk/src/storage/mod.rs
Normal file
@@ -0,0 +1,423 @@
|
||||
//! S3-compatible storage client.
|
||||
|
||||
pub mod types;
|
||||
|
||||
use crate::client::{AuthMethod, HttpTransport, ServiceClient};
|
||||
use crate::error::{Result, ResultExt, SunbeamError};
|
||||
use bytes::Bytes;
|
||||
use reqwest::Method;
|
||||
use types::*;
|
||||
|
||||
/// Client for S3-compatible object storage.
|
||||
pub struct S3Client {
|
||||
pub(crate) transport: HttpTransport,
|
||||
}
|
||||
|
||||
impl ServiceClient for S3Client {
|
||||
fn service_name(&self) -> &'static str {
|
||||
"s3"
|
||||
}
|
||||
|
||||
fn base_url(&self) -> &str {
|
||||
&self.transport.base_url
|
||||
}
|
||||
|
||||
fn from_parts(base_url: String, auth: AuthMethod) -> Self {
|
||||
Self {
|
||||
transport: HttpTransport::new(&base_url, auth),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl S3Client {
|
||||
/// Build an S3Client from domain (e.g. `https://s3.{domain}`).
|
||||
pub fn connect(domain: &str) -> Self {
|
||||
let base_url = format!("https://s3.{domain}");
|
||||
Self::from_parts(base_url, AuthMethod::None)
|
||||
}
|
||||
|
||||
/// Replace the auth method (e.g. after obtaining credentials).
|
||||
pub fn set_auth(&mut self, auth: AuthMethod) {
|
||||
self.transport.set_auth(auth);
|
||||
}
|
||||
|
||||
// -- Buckets ------------------------------------------------------------
|
||||
|
||||
/// Create a bucket.
|
||||
pub async fn create_bucket(&self, bucket: &str) -> Result<()> {
|
||||
self.transport
|
||||
.send(Method::PUT, bucket, Option::<&()>::None, "s3 create bucket")
|
||||
.await
|
||||
}
|
||||
|
||||
/// Delete a bucket.
|
||||
pub async fn delete_bucket(&self, bucket: &str) -> Result<()> {
|
||||
self.transport
|
||||
.send(Method::DELETE, bucket, Option::<&()>::None, "s3 delete bucket")
|
||||
.await
|
||||
}
|
||||
|
||||
/// List all buckets.
|
||||
pub async fn list_buckets(&self) -> Result<ListBucketsResponse> {
|
||||
self.transport
|
||||
.json(Method::GET, "/", Option::<&()>::None, "s3 list buckets")
|
||||
.await
|
||||
}
|
||||
|
||||
/// Check if a bucket exists (HEAD request, returns true/false).
|
||||
pub async fn head_bucket(&self, bucket: &str) -> Result<bool> {
|
||||
let resp = self
|
||||
.transport
|
||||
.request(Method::HEAD, bucket)
|
||||
.send()
|
||||
.await
|
||||
.with_ctx(|| format!("s3 head bucket {bucket}: request failed"))?;
|
||||
Ok(resp.status().is_success())
|
||||
}
|
||||
|
||||
/// Set versioning configuration on a bucket.
|
||||
pub async fn set_versioning(&self, bucket: &str, body: &(impl serde::Serialize + Sync)) -> Result<()> {
|
||||
self.transport
|
||||
.send(
|
||||
Method::PUT,
|
||||
&format!("{bucket}?versioning"),
|
||||
Some(body),
|
||||
"s3 set versioning",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Set lifecycle configuration on a bucket.
|
||||
pub async fn set_lifecycle(&self, bucket: &str, body: &(impl serde::Serialize + Sync)) -> Result<()> {
|
||||
self.transport
|
||||
.send(
|
||||
Method::PUT,
|
||||
&format!("{bucket}?lifecycle"),
|
||||
Some(body),
|
||||
"s3 set lifecycle",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Set CORS configuration on a bucket.
|
||||
pub async fn set_cors(&self, bucket: &str, body: &(impl serde::Serialize + Sync)) -> Result<()> {
|
||||
self.transport
|
||||
.send(
|
||||
Method::PUT,
|
||||
&format!("{bucket}?cors"),
|
||||
Some(body),
|
||||
"s3 set cors",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get the ACL for a bucket.
|
||||
pub async fn get_acl(&self, bucket: &str) -> Result<serde_json::Value> {
|
||||
self.transport
|
||||
.json(
|
||||
Method::GET,
|
||||
&format!("{bucket}?acl"),
|
||||
Option::<&()>::None,
|
||||
"s3 get acl",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Set a bucket policy.
|
||||
pub async fn set_policy(&self, bucket: &str, body: &(impl serde::Serialize + Sync)) -> Result<()> {
|
||||
self.transport
|
||||
.send(
|
||||
Method::PUT,
|
||||
&format!("{bucket}?policy"),
|
||||
Some(body),
|
||||
"s3 set policy",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
// -- Objects -------------------------------------------------------------
|
||||
|
||||
/// Upload an object.
|
||||
pub async fn put_object(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
content_type: &str,
|
||||
data: Bytes,
|
||||
) -> Result<()> {
|
||||
let path = format!("{bucket}/{key}");
|
||||
let resp = self
|
||||
.transport
|
||||
.request(Method::PUT, &path)
|
||||
.header("Content-Type", content_type)
|
||||
.body(data)
|
||||
.send()
|
||||
.await
|
||||
.with_ctx(|| format!("s3 put object {path}: request failed"))?;
|
||||
let status = resp.status();
|
||||
if !status.is_success() {
|
||||
let body_text = resp.text().await.unwrap_or_default();
|
||||
return Err(SunbeamError::network(format!(
|
||||
"s3 put object: HTTP {status}: {body_text}"
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Download an object.
|
||||
pub async fn get_object(&self, bucket: &str, key: &str) -> Result<Bytes> {
|
||||
self.transport
|
||||
.bytes(Method::GET, &format!("{bucket}/{key}"), "s3 get object")
|
||||
.await
|
||||
}
|
||||
|
||||
/// Check if an object exists (HEAD request, returns true/false).
|
||||
pub async fn head_object(&self, bucket: &str, key: &str) -> Result<bool> {
|
||||
let path = format!("{bucket}/{key}");
|
||||
let resp = self
|
||||
.transport
|
||||
.request(Method::HEAD, &path)
|
||||
.send()
|
||||
.await
|
||||
.with_ctx(|| format!("s3 head object {path}: request failed"))?;
|
||||
Ok(resp.status().is_success())
|
||||
}
|
||||
|
||||
/// Delete an object.
|
||||
pub async fn delete_object(&self, bucket: &str, key: &str) -> Result<()> {
|
||||
self.transport
|
||||
.send(
|
||||
Method::DELETE,
|
||||
&format!("{bucket}/{key}"),
|
||||
Option::<&()>::None,
|
||||
"s3 delete object",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Copy an object from `source` to `{bucket}/{key}`.
|
||||
pub async fn copy_object(&self, bucket: &str, key: &str, source: &str) -> Result<()> {
|
||||
let path = format!("{bucket}/{key}");
|
||||
let resp = self
|
||||
.transport
|
||||
.request(Method::PUT, &path)
|
||||
.header("x-amz-copy-source", source)
|
||||
.send()
|
||||
.await
|
||||
.with_ctx(|| format!("s3 copy object {path}: request failed"))?;
|
||||
let status = resp.status();
|
||||
if !status.is_success() {
|
||||
let body_text = resp.text().await.unwrap_or_default();
|
||||
return Err(SunbeamError::network(format!(
|
||||
"s3 copy object: HTTP {status}: {body_text}"
|
||||
)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List objects in a bucket (v2).
|
||||
pub async fn list_objects_v2(
|
||||
&self,
|
||||
bucket: &str,
|
||||
prefix: Option<&str>,
|
||||
max_keys: Option<u32>,
|
||||
) -> Result<ListObjectsResponse> {
|
||||
let mut path = format!("{bucket}?list-type=2");
|
||||
if let Some(p) = prefix {
|
||||
path.push_str(&format!("&prefix={p}"));
|
||||
}
|
||||
if let Some(m) = max_keys {
|
||||
path.push_str(&format!("&max-keys={m}"));
|
||||
}
|
||||
self.transport
|
||||
.json(Method::GET, &path, Option::<&()>::None, "s3 list objects v2")
|
||||
.await
|
||||
}
|
||||
|
||||
/// Set tags on an object.
|
||||
pub async fn set_tags(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
body: &(impl serde::Serialize + Sync),
|
||||
) -> Result<()> {
|
||||
self.transport
|
||||
.send(
|
||||
Method::PUT,
|
||||
&format!("{bucket}/{key}?tagging"),
|
||||
Some(body),
|
||||
"s3 set tags",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get tags for an object.
|
||||
pub async fn get_tags(&self, bucket: &str, key: &str) -> Result<serde_json::Value> {
|
||||
self.transport
|
||||
.json(
|
||||
Method::GET,
|
||||
&format!("{bucket}/{key}?tagging"),
|
||||
Option::<&()>::None,
|
||||
"s3 get tags",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
// -- Multipart -----------------------------------------------------------
|
||||
|
||||
/// Initiate a multipart upload.
|
||||
pub async fn initiate_multipart(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
) -> Result<InitiateMultipartResponse> {
|
||||
self.transport
|
||||
.json(
|
||||
Method::POST,
|
||||
&format!("{bucket}/{key}?uploads"),
|
||||
Option::<&()>::None,
|
||||
"s3 initiate multipart",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Upload a single part of a multipart upload.
|
||||
pub async fn upload_part(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
upload_id: &str,
|
||||
part_number: u32,
|
||||
data: Bytes,
|
||||
) -> Result<UploadPartResponse> {
|
||||
let path = format!("{bucket}/{key}?partNumber={part_number}&uploadId={upload_id}");
|
||||
let resp = self
|
||||
.transport
|
||||
.request(Method::PUT, &path)
|
||||
.body(data)
|
||||
.send()
|
||||
.await
|
||||
.with_ctx(|| format!("s3 upload part {path}: request failed"))?;
|
||||
let status = resp.status();
|
||||
if !status.is_success() {
|
||||
let body_text = resp.text().await.unwrap_or_default();
|
||||
return Err(SunbeamError::network(format!(
|
||||
"s3 upload part: HTTP {status}: {body_text}"
|
||||
)));
|
||||
}
|
||||
let etag = resp
|
||||
.headers()
|
||||
.get("ETag")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.unwrap_or("")
|
||||
.to_string();
|
||||
Ok(UploadPartResponse { etag, part_number })
|
||||
}
|
||||
|
||||
/// Complete a multipart upload.
|
||||
pub async fn complete_multipart(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
upload_id: &str,
|
||||
body: &(impl serde::Serialize + Sync),
|
||||
) -> Result<CompleteMultipartResponse> {
|
||||
self.transport
|
||||
.json(
|
||||
Method::POST,
|
||||
&format!("{bucket}/{key}?uploadId={upload_id}"),
|
||||
Some(body),
|
||||
"s3 complete multipart",
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Abort a multipart upload.
|
||||
pub async fn abort_multipart(
|
||||
&self,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
upload_id: &str,
|
||||
) -> Result<()> {
|
||||
self.transport
|
||||
.send(
|
||||
Method::DELETE,
|
||||
&format!("{bucket}/{key}?uploadId={upload_id}"),
|
||||
Option::<&()>::None,
|
||||
"s3 abort multipart",
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_connect_url() {
|
||||
let c = S3Client::connect("sunbeam.pt");
|
||||
assert_eq!(c.base_url(), "https://s3.sunbeam.pt");
|
||||
assert_eq!(c.service_name(), "s3");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_from_parts() {
|
||||
let c = S3Client::from_parts("http://localhost:9000".into(), AuthMethod::None);
|
||||
assert_eq!(c.base_url(), "http://localhost:9000");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_set_auth() {
|
||||
let mut c = S3Client::connect("sunbeam.pt");
|
||||
assert!(matches!(c.transport.auth, AuthMethod::None));
|
||||
c.set_auth(AuthMethod::Bearer("tok".into()));
|
||||
assert!(matches!(c.transport.auth, AuthMethod::Bearer(ref s) if s == "tok"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_head_bucket_unreachable() {
|
||||
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
|
||||
let result = c.head_bucket("test-bucket").await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_head_object_unreachable() {
|
||||
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
|
||||
let result = c.head_object("bucket", "key").await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_buckets_unreachable() {
|
||||
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
|
||||
let result = c.list_buckets().await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_object_unreachable() {
|
||||
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
|
||||
let result = c
|
||||
.put_object("bucket", "key", "text/plain", Bytes::from("hello"))
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_copy_object_unreachable() {
|
||||
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
|
||||
let result = c.copy_object("bucket", "dest", "/src-bucket/src-key").await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_upload_part_unreachable() {
|
||||
let c = S3Client::from_parts("http://127.0.0.1:19998".into(), AuthMethod::None);
|
||||
let result = c
|
||||
.upload_part("bucket", "key", "upload-id", 1, Bytes::from("data"))
|
||||
.await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
}
|
||||
161
sunbeam-sdk/src/storage/types.rs
Normal file
161
sunbeam-sdk/src/storage/types.rs
Normal file
@@ -0,0 +1,161 @@
|
||||
//! S3 storage types.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Response from ListBuckets (GET /).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ListBucketsResponse {
|
||||
#[serde(default, rename = "Buckets")]
|
||||
pub buckets: Vec<Bucket>,
|
||||
#[serde(default, rename = "Owner")]
|
||||
pub owner: Option<Owner>,
|
||||
}
|
||||
|
||||
/// A single S3 bucket.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Bucket {
|
||||
#[serde(rename = "Name")]
|
||||
pub name: String,
|
||||
#[serde(default, rename = "CreationDate")]
|
||||
pub creation_date: Option<String>,
|
||||
}
|
||||
|
||||
/// Bucket owner info.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Owner {
|
||||
#[serde(default, rename = "ID")]
|
||||
pub id: Option<String>,
|
||||
#[serde(default, rename = "DisplayName")]
|
||||
pub display_name: Option<String>,
|
||||
}
|
||||
|
||||
/// Response from ListObjectsV2.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ListObjectsResponse {
|
||||
#[serde(default, rename = "Name")]
|
||||
pub name: String,
|
||||
#[serde(default, rename = "Prefix")]
|
||||
pub prefix: Option<String>,
|
||||
#[serde(default, rename = "MaxKeys")]
|
||||
pub max_keys: Option<u32>,
|
||||
#[serde(default, rename = "IsTruncated")]
|
||||
pub is_truncated: Option<bool>,
|
||||
#[serde(default, rename = "Contents")]
|
||||
pub contents: Vec<Object>,
|
||||
#[serde(default, rename = "NextContinuationToken")]
|
||||
pub next_continuation_token: Option<String>,
|
||||
}
|
||||
|
||||
/// A single S3 object in a listing.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Object {
|
||||
#[serde(rename = "Key")]
|
||||
pub key: String,
|
||||
#[serde(default, rename = "LastModified")]
|
||||
pub last_modified: Option<String>,
|
||||
#[serde(default, rename = "ETag")]
|
||||
pub etag: Option<String>,
|
||||
#[serde(default, rename = "Size")]
|
||||
pub size: Option<u64>,
|
||||
#[serde(default, rename = "StorageClass")]
|
||||
pub storage_class: Option<String>,
|
||||
}
|
||||
|
||||
/// Response from InitiateMultipartUpload.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct InitiateMultipartResponse {
|
||||
#[serde(rename = "Bucket")]
|
||||
pub bucket: String,
|
||||
#[serde(rename = "Key")]
|
||||
pub key: String,
|
||||
#[serde(rename = "UploadId")]
|
||||
pub upload_id: String,
|
||||
}
|
||||
|
||||
/// Response from UploadPart (extracted from headers).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UploadPartResponse {
|
||||
pub etag: String,
|
||||
pub part_number: u32,
|
||||
}
|
||||
|
||||
/// Response from CompleteMultipartUpload.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct CompleteMultipartResponse {
|
||||
#[serde(default, rename = "Location")]
|
||||
pub location: Option<String>,
|
||||
#[serde(rename = "Bucket")]
|
||||
pub bucket: String,
|
||||
#[serde(rename = "Key")]
|
||||
pub key: String,
|
||||
#[serde(default, rename = "ETag")]
|
||||
pub etag: Option<String>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_list_buckets_response_roundtrip() {
|
||||
let json = serde_json::json!({
|
||||
"Buckets": [
|
||||
{"Name": "my-bucket", "CreationDate": "2024-01-01T00:00:00Z"}
|
||||
],
|
||||
"Owner": {"ID": "owner-id", "DisplayName": "Owner"}
|
||||
});
|
||||
let resp: ListBucketsResponse = serde_json::from_value(json).unwrap();
|
||||
assert_eq!(resp.buckets.len(), 1);
|
||||
assert_eq!(resp.buckets[0].name, "my-bucket");
|
||||
assert_eq!(resp.owner.unwrap().display_name, Some("Owner".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_list_objects_response_roundtrip() {
|
||||
let json = serde_json::json!({
|
||||
"Name": "my-bucket",
|
||||
"Prefix": "docs/",
|
||||
"MaxKeys": 1000,
|
||||
"IsTruncated": false,
|
||||
"Contents": [
|
||||
{"Key": "docs/readme.md", "Size": 1024, "ETag": "\"abc\""}
|
||||
]
|
||||
});
|
||||
let resp: ListObjectsResponse = serde_json::from_value(json).unwrap();
|
||||
assert_eq!(resp.name, "my-bucket");
|
||||
assert_eq!(resp.contents.len(), 1);
|
||||
assert_eq!(resp.contents[0].key, "docs/readme.md");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_initiate_multipart_response() {
|
||||
let json = serde_json::json!({
|
||||
"Bucket": "my-bucket",
|
||||
"Key": "large-file.bin",
|
||||
"UploadId": "upload-123"
|
||||
});
|
||||
let resp: InitiateMultipartResponse = serde_json::from_value(json).unwrap();
|
||||
assert_eq!(resp.upload_id, "upload-123");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_complete_multipart_response() {
|
||||
let json = serde_json::json!({
|
||||
"Bucket": "my-bucket",
|
||||
"Key": "large-file.bin",
|
||||
"Location": "https://s3.example.com/my-bucket/large-file.bin",
|
||||
"ETag": "\"final-etag\""
|
||||
});
|
||||
let resp: CompleteMultipartResponse = serde_json::from_value(json).unwrap();
|
||||
assert_eq!(resp.bucket, "my-bucket");
|
||||
assert_eq!(resp.location, Some("https://s3.example.com/my-bucket/large-file.bin".to_string()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_empty_list_buckets() {
|
||||
let json = serde_json::json!({});
|
||||
let resp: ListBucketsResponse = serde_json::from_value(json).unwrap();
|
||||
assert!(resp.buckets.is_empty());
|
||||
assert!(resp.owner.is_none());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user