From 8e51e0b3ae2888b2310d6563b0e18d6af9b65e0f Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Sat, 21 Mar 2026 14:35:43 +0000 Subject: [PATCH] refactor: SDK kube, openbao, and tools modules Move kube (client, apply, exec, secrets, kustomize_build) into kube/ submodule with tools.rs as a child. Move openbao BaoClient into openbao/ submodule. --- sunbeam-sdk/src/kube/mod.rs | 761 +++++++++++++++++++++++++++++++++ sunbeam-sdk/src/kube/tools.rs | 180 ++++++++ sunbeam-sdk/src/kube/tunnel.rs | 1 + sunbeam-sdk/src/openbao/mod.rs | 498 +++++++++++++++++++++ 4 files changed, 1440 insertions(+) create mode 100644 sunbeam-sdk/src/kube/mod.rs create mode 100644 sunbeam-sdk/src/kube/tools.rs create mode 100644 sunbeam-sdk/src/kube/tunnel.rs create mode 100644 sunbeam-sdk/src/openbao/mod.rs diff --git a/sunbeam-sdk/src/kube/mod.rs b/sunbeam-sdk/src/kube/mod.rs new file mode 100644 index 0000000..70bcea7 --- /dev/null +++ b/sunbeam-sdk/src/kube/mod.rs @@ -0,0 +1,761 @@ +mod tunnel; +pub mod tools; + +use crate::error::{Result, SunbeamError, ResultExt}; +use base64::Engine; +use k8s_openapi::api::apps::v1::Deployment; +use k8s_openapi::api::core::v1::{Namespace, Secret}; +use kube::api::{Api, ApiResource, DynamicObject, ListParams, Patch, PatchParams}; +use kube::config::{KubeConfigOptions, Kubeconfig}; +use kube::discovery::{self, Scope}; +use kube::{Client, Config}; +use std::collections::HashMap; +use std::path::Path; +use std::process::Stdio; +use std::sync::{Mutex, OnceLock}; +use tokio::sync::OnceCell; + +static CONTEXT: OnceLock = OnceLock::new(); +static SSH_HOST: OnceLock = OnceLock::new(); +static KUBE_CLIENT: OnceCell = OnceCell::const_new(); +static SSH_TUNNEL: Mutex> = Mutex::new(None); +static API_DISCOVERY: OnceCell = OnceCell::const_new(); + +/// Set the active kubectl context and optional SSH host for production tunnel. +pub fn set_context(ctx: &str, ssh_host: &str) { + let _ = CONTEXT.set(ctx.to_string()); + let _ = SSH_HOST.set(ssh_host.to_string()); +} + +/// Get the active context. +pub fn context() -> &'static str { + CONTEXT.get().map(|s| s.as_str()).unwrap_or("sunbeam") +} + +/// Get the SSH host (empty for local). +pub fn ssh_host() -> &'static str { + SSH_HOST.get().map(|s| s.as_str()).unwrap_or("") +} + +// --------------------------------------------------------------------------- +// SSH tunnel management +// --------------------------------------------------------------------------- + +/// Ensure SSH tunnel is open for production (forwards localhost:16443 -> remote:6443). +/// For local dev (empty ssh_host), this is a no-op. +#[allow(dead_code)] +pub async fn ensure_tunnel() -> Result<()> { + let host = ssh_host(); + if host.is_empty() { + return Ok(()); + } + + // Check if tunnel is already open + if tokio::net::TcpStream::connect("127.0.0.1:16443") + .await + .is_ok() + { + return Ok(()); + } + + crate::output::ok(&format!("Opening SSH tunnel to {host}...")); + + let child = tokio::process::Command::new("ssh") + .args([ + "-p", + "2222", + "-L", + "16443:127.0.0.1:6443", + "-N", + "-o", + "ExitOnForwardFailure=yes", + "-o", + "StrictHostKeyChecking=no", + host, + ]) + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .spawn() + .ctx("Failed to spawn SSH tunnel")?; + + // Store child so it lives for the process lifetime (and can be killed on cleanup) + if let Ok(mut guard) = SSH_TUNNEL.lock() { + *guard = Some(child); + } + + // Wait for tunnel to become available + for _ in 0..20 { + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + if tokio::net::TcpStream::connect("127.0.0.1:16443") + .await + .is_ok() + { + return Ok(()); + } + } + + bail!("SSH tunnel to {host} did not open in time") +} + +// --------------------------------------------------------------------------- +// Client initialization +// --------------------------------------------------------------------------- + +/// Get or create a kube::Client configured for the active context. +/// Opens SSH tunnel first if needed for production. +pub async fn get_client() -> Result<&'static Client> { + KUBE_CLIENT + .get_or_try_init(|| async { + ensure_tunnel().await?; + + let kubeconfig = Kubeconfig::read().map_err(|e| SunbeamError::kube(format!("Failed to read kubeconfig: {e}")))?; + let options = KubeConfigOptions { + context: Some(context().to_string()), + ..Default::default() + }; + let config = Config::from_custom_kubeconfig(kubeconfig, &options) + .await + .map_err(|e| SunbeamError::kube(format!("Failed to build kube config from kubeconfig: {e}")))?; + Client::try_from(config).ctx("Failed to create kube client") + }) + .await +} + +// --------------------------------------------------------------------------- +// Core Kubernetes operations +// --------------------------------------------------------------------------- + +/// Server-side apply a multi-document YAML manifest. +#[allow(dead_code)] +pub async fn kube_apply(manifest: &str) -> Result<()> { + let client = get_client().await?; + let ssapply = PatchParams::apply("sunbeam").force(); + + for doc in manifest.split("\n---") { + let doc = doc.trim(); + if doc.is_empty() || doc == "---" { + continue; + } + + // Parse the YAML to a DynamicObject so we can route it + let obj: serde_yaml::Value = + serde_yaml::from_str(doc).ctx("Failed to parse YAML document")?; + + let api_version = obj + .get("apiVersion") + .and_then(|v| v.as_str()) + .unwrap_or(""); + let kind = obj.get("kind").and_then(|v| v.as_str()).unwrap_or(""); + let metadata = obj.get("metadata"); + let name = metadata + .and_then(|m| m.get("name")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + let namespace = metadata + .and_then(|m| m.get("namespace")) + .and_then(|v| v.as_str()); + + if name.is_empty() || kind.is_empty() { + continue; // skip incomplete documents + } + + // Use discovery to find the right API resource + let (ar, scope) = resolve_api_resource(client, api_version, kind).await?; + + let api: Api = if let Some(ns) = namespace { + Api::namespaced_with(client.clone(), ns, &ar) + } else if scope == Scope::Namespaced { + // Namespaced resource without a namespace specified; use default + Api::default_namespaced_with(client.clone(), &ar) + } else { + Api::all_with(client.clone(), &ar) + }; + + let patch: serde_json::Value = + serde_yaml::from_str(doc).ctx("Failed to parse YAML to JSON value")?; + + api.patch(name, &ssapply, &Patch::Apply(patch)) + .await + .with_ctx(|| format!("Failed to apply {kind}/{name}"))?; + } + Ok(()) +} + +/// Resolve an API resource from apiVersion and kind using discovery. +async fn resolve_api_resource( + client: &Client, + api_version: &str, + kind: &str, +) -> Result<(ApiResource, Scope)> { + // Split apiVersion into group and version + let (group, version) = if api_version.contains('/') { + let parts: Vec<&str> = api_version.splitn(2, '/').collect(); + (parts[0], parts[1]) + } else { + ("", api_version) // core API group + }; + + let disc = API_DISCOVERY + .get_or_try_init(|| async { + discovery::Discovery::new(client.clone()) + .run() + .await + .ctx("API discovery failed") + }) + .await?; + + for api_group in disc.groups() { + if api_group.name() == group { + for (ar, caps) in api_group.resources_by_stability() { + if ar.kind == kind && ar.version == version { + return Ok((ar, caps.scope)); + } + } + } + } + + bail!("Could not discover API resource for {api_version}/{kind}") +} + +/// Get a Kubernetes Secret object. +#[allow(dead_code)] +pub async fn kube_get_secret(ns: &str, name: &str) -> Result> { + let client = get_client().await?; + let api: Api = Api::namespaced(client.clone(), ns); + match api.get_opt(name).await { + Ok(secret) => Ok(secret), + Err(e) => Err(e).with_ctx(|| format!("Failed to get secret {ns}/{name}")), + } +} + +/// Get a specific base64-decoded field from a Kubernetes secret. +#[allow(dead_code)] +pub async fn kube_get_secret_field(ns: &str, name: &str, key: &str) -> Result { + let secret = kube_get_secret(ns, name) + .await? + .with_ctx(|| format!("Secret {ns}/{name} not found"))?; + + let data = secret.data.as_ref().ctx("Secret has no data")?; + + let bytes = data + .get(key) + .with_ctx(|| format!("Key {key:?} not found in secret {ns}/{name}"))?; + + String::from_utf8(bytes.0.clone()) + .with_ctx(|| format!("Key {key:?} in secret {ns}/{name} is not valid UTF-8")) +} + +/// Check if a namespace exists. +#[allow(dead_code)] +pub async fn ns_exists(ns: &str) -> Result { + let client = get_client().await?; + let api: Api = Api::all(client.clone()); + match api.get_opt(ns).await { + Ok(Some(_)) => Ok(true), + Ok(None) => Ok(false), + Err(e) => Err(e).with_ctx(|| format!("Failed to check namespace {ns}")), + } +} + +/// Create namespace if it does not exist. +#[allow(dead_code)] +pub async fn ensure_ns(ns: &str) -> Result<()> { + if ns_exists(ns).await? { + return Ok(()); + } + let client = get_client().await?; + let api: Api = Api::all(client.clone()); + let ns_obj = serde_json::json!({ + "apiVersion": "v1", + "kind": "Namespace", + "metadata": { "name": ns } + }); + let pp = PatchParams::apply("sunbeam").force(); + api.patch(ns, &pp, &Patch::Apply(ns_obj)) + .await + .with_ctx(|| format!("Failed to create namespace {ns}"))?; + Ok(()) +} + +/// Create or update a generic Kubernetes secret via server-side apply. +#[allow(dead_code)] +pub async fn create_secret(ns: &str, name: &str, data: HashMap) -> Result<()> { + let client = get_client().await?; + let api: Api = Api::namespaced(client.clone(), ns); + + // Encode values as base64 + let mut encoded: serde_json::Map = serde_json::Map::new(); + for (k, v) in &data { + let b64 = base64::engine::general_purpose::STANDARD.encode(v.as_bytes()); + encoded.insert(k.clone(), serde_json::Value::String(b64)); + } + + let secret_obj = serde_json::json!({ + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "name": name, + "namespace": ns, + }, + "type": "Opaque", + "data": encoded, + }); + + let pp = PatchParams::apply("sunbeam").force(); + api.patch(name, &pp, &Patch::Apply(secret_obj)) + .await + .with_ctx(|| format!("Failed to create/update secret {ns}/{name}"))?; + Ok(()) +} + +/// Execute a command in a pod and return (exit_code, stdout). +#[allow(dead_code)] +pub async fn kube_exec( + ns: &str, + pod: &str, + cmd: &[&str], + container: Option<&str>, +) -> Result<(i32, String)> { + let client = get_client().await?; + let pods: Api = Api::namespaced(client.clone(), ns); + + let mut ep = kube::api::AttachParams::default(); + ep.stdout = true; + ep.stderr = true; + ep.stdin = false; + if let Some(c) = container { + ep.container = Some(c.to_string()); + } + + let cmd_strings: Vec = cmd.iter().map(|s| s.to_string()).collect(); + let mut attached = pods + .exec(pod, cmd_strings, &ep) + .await + .with_ctx(|| format!("Failed to exec in pod {ns}/{pod}"))?; + + let stdout = { + let mut stdout_reader = attached + .stdout() + .ctx("No stdout stream from exec")?; + let mut buf = Vec::new(); + tokio::io::AsyncReadExt::read_to_end(&mut stdout_reader, &mut buf).await?; + String::from_utf8_lossy(&buf).to_string() + }; + + let status = attached + .take_status() + .ctx("No status channel from exec")?; + + // Wait for the status + let exit_code = if let Some(status) = status.await { + status + .status + .map(|s| if s == "Success" { 0 } else { 1 }) + .unwrap_or(1) + } else { + 1 + }; + + Ok((exit_code, stdout.trim().to_string())) +} + +/// Patch a deployment to trigger a rollout restart. +#[allow(dead_code)] +pub async fn kube_rollout_restart(ns: &str, deployment: &str) -> Result<()> { + let client = get_client().await?; + let api: Api = Api::namespaced(client.clone(), ns); + + let now = chrono::Utc::now().to_rfc3339(); + let patch = serde_json::json!({ + "spec": { + "template": { + "metadata": { + "annotations": { + "kubectl.kubernetes.io/restartedAt": now + } + } + } + } + }); + + api.patch(deployment, &PatchParams::default(), &Patch::Strategic(patch)) + .await + .with_ctx(|| format!("Failed to restart deployment {ns}/{deployment}"))?; + Ok(()) +} + +/// Discover the active domain from cluster state. +/// +/// Tries the gitea-inline-config secret first (DOMAIN=src.), +/// falls back to lasuite-oidc-provider configmap, then Lima VM IP. +#[allow(dead_code)] +pub async fn get_domain() -> Result { + // 1. Gitea inline-config secret + if let Ok(Some(secret)) = kube_get_secret("devtools", "gitea-inline-config").await { + if let Some(data) = &secret.data { + if let Some(server_bytes) = data.get("server") { + let server_ini = String::from_utf8_lossy(&server_bytes.0); + for line in server_ini.lines() { + if let Some(rest) = line.strip_prefix("DOMAIN=src.") { + return Ok(rest.trim().to_string()); + } + } + } + } + } + + // 2. Fallback: lasuite-oidc-provider configmap + { + let client = get_client().await?; + let api: Api = + Api::namespaced(client.clone(), "lasuite"); + if let Ok(Some(cm)) = api.get_opt("lasuite-oidc-provider").await { + if let Some(data) = &cm.data { + if let Some(endpoint) = data.get("OIDC_OP_JWKS_ENDPOINT") { + if let Some(rest) = endpoint.split("https://auth.").nth(1) { + if let Some(domain) = rest.split('/').next() { + return Ok(domain.to_string()); + } + } + } + } + } + } + + // 3. Local dev fallback: Lima VM IP + let ip = get_lima_ip().await; + Ok(format!("{ip}.sslip.io")) +} + +/// Get the socket_vmnet IP of the Lima sunbeam VM. +async fn get_lima_ip() -> String { + let output = tokio::process::Command::new("limactl") + .args(["shell", "sunbeam", "ip", "-4", "addr", "show", "eth1"]) + .output() + .await; + + if let Ok(out) = output { + let stdout = String::from_utf8_lossy(&out.stdout); + for line in stdout.lines() { + if line.contains("inet ") { + if let Some(addr) = line.trim().split_whitespace().nth(1) { + if let Some(ip) = addr.split('/').next() { + return ip.to_string(); + } + } + } + } + } + + // Fallback: hostname -I + let output2 = tokio::process::Command::new("limactl") + .args(["shell", "sunbeam", "hostname", "-I"]) + .output() + .await; + + if let Ok(out) = output2 { + let stdout = String::from_utf8_lossy(&out.stdout); + let ips: Vec<&str> = stdout.trim().split_whitespace().collect(); + if ips.len() >= 2 { + return ips[ips.len() - 1].to_string(); + } else if !ips.is_empty() { + return ips[0].to_string(); + } + } + + String::new() +} + +// --------------------------------------------------------------------------- +// kustomize build +// --------------------------------------------------------------------------- + +/// Run kustomize build --enable-helm and apply domain/email substitution. +#[allow(dead_code)] +pub async fn kustomize_build(overlay: &Path, domain: &str, email: &str) -> Result { + let kustomize_path = self::tools::ensure_kustomize()?; + let helm_path = self::tools::ensure_helm()?; + + // Ensure helm's parent dir is on PATH so kustomize can find it + let helm_dir = helm_path + .parent() + .map(|p| p.to_string_lossy().to_string()) + .unwrap_or_default(); + + let mut env_path = helm_dir.clone(); + if let Ok(existing) = std::env::var("PATH") { + env_path = format!("{helm_dir}:{existing}"); + } + + let output = tokio::process::Command::new(&kustomize_path) + .args(["build", "--enable-helm"]) + .arg(overlay) + .env("PATH", &env_path) + .output() + .await + .ctx("Failed to run kustomize")?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + bail!("kustomize build failed: {stderr}"); + } + + let mut text = String::from_utf8(output.stdout).ctx("kustomize output not UTF-8")?; + + // Domain substitution + text = domain_replace(&text, domain); + + // ACME email substitution + if !email.is_empty() { + text = text.replace("ACME_EMAIL", email); + } + + // Registry host IP resolution + if text.contains("REGISTRY_HOST_IP") { + let registry_ip = resolve_registry_ip(domain).await; + text = text.replace("REGISTRY_HOST_IP", ®istry_ip); + } + + // Strip null annotations artifact + text = text.replace("\n annotations: null", ""); + + Ok(text) +} + +/// Resolve the registry host IP for REGISTRY_HOST_IP substitution. +async fn resolve_registry_ip(domain: &str) -> String { + // Try DNS for src. + let hostname = format!("src.{domain}:443"); + if let Ok(mut addrs) = tokio::net::lookup_host(&hostname).await { + if let Some(addr) = addrs.next() { + return addr.ip().to_string(); + } + } + + // Fallback: derive from production host config + let ssh_host = crate::config::get_production_host(); + if !ssh_host.is_empty() { + let raw = ssh_host + .split('@') + .last() + .unwrap_or(&ssh_host) + .split(':') + .next() + .unwrap_or(&ssh_host); + let host_lookup = format!("{raw}:443"); + if let Ok(mut addrs) = tokio::net::lookup_host(&host_lookup).await { + if let Some(addr) = addrs.next() { + return addr.ip().to_string(); + } + } + // raw is likely already an IP + return raw.to_string(); + } + + String::new() +} + +// --------------------------------------------------------------------------- +// kubectl / bao passthrough +// --------------------------------------------------------------------------- + +/// Transparent kubectl passthrough for the active context. +pub async fn cmd_k8s(kubectl_args: &[String]) -> Result<()> { + ensure_tunnel().await?; + + let status = tokio::process::Command::new("kubectl") + .arg(format!("--context={}", context())) + .args(kubectl_args) + .stdin(Stdio::inherit()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .status() + .await + .ctx("Failed to run kubectl")?; + + if !status.success() { + std::process::exit(status.code().unwrap_or(1)); + } + Ok(()) +} + +/// Run bao CLI inside the OpenBao pod with the root token. +pub async fn cmd_bao(bao_args: &[String]) -> Result<()> { + // Find the openbao pod + let client = get_client().await?; + let pods: Api = Api::namespaced(client.clone(), "data"); + + let lp = ListParams::default().labels("app.kubernetes.io/name=openbao"); + let pod_list = pods.list(&lp).await.ctx("Failed to list OpenBao pods")?; + let ob_pod = pod_list + .items + .first() + .and_then(|p| p.metadata.name.as_deref()) + .ctx("OpenBao pod not found -- is the cluster running?")? + .to_string(); + + // Get root token + let root_token = kube_get_secret_field("data", "openbao-keys", "root-token") + .await + .ctx("root-token not found in openbao-keys secret")?; + + // Build the kubectl exec command + let vault_token_env = format!("VAULT_TOKEN={root_token}"); + let mut kubectl_args = vec![ + format!("--context={}", context()), + "-n".to_string(), + "data".to_string(), + "exec".to_string(), + ob_pod, + "-c".to_string(), + "openbao".to_string(), + "--".to_string(), + "env".to_string(), + vault_token_env, + "bao".to_string(), + ]; + kubectl_args.extend(bao_args.iter().cloned()); + + // Use kubectl for full TTY support + let status = tokio::process::Command::new("kubectl") + .args(&kubectl_args) + .stdin(Stdio::inherit()) + .stdout(Stdio::inherit()) + .stderr(Stdio::inherit()) + .status() + .await + .ctx("Failed to run bao in OpenBao pod")?; + + if !status.success() { + std::process::exit(status.code().unwrap_or(1)); + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Parse target and domain_replace (already tested) +// --------------------------------------------------------------------------- + +/// Parse 'ns/name' -> (Some(ns), Some(name)), 'ns' -> (Some(ns), None), None -> (None, None). +pub fn parse_target(s: Option<&str>) -> Result<(Option<&str>, Option<&str>)> { + match s { + None => Ok((None, None)), + Some(s) => { + let parts: Vec<&str> = s.splitn(3, '/').collect(); + match parts.len() { + 1 => Ok((Some(parts[0]), None)), + 2 => Ok((Some(parts[0]), Some(parts[1]))), + _ => bail!("Invalid target {s:?}: expected 'namespace' or 'namespace/name'"), + } + } + } +} + +/// Replace all occurrences of DOMAIN_SUFFIX with domain. +pub fn domain_replace(text: &str, domain: &str) -> String { + text.replace("DOMAIN_SUFFIX", domain) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_target_none() { + let (ns, name) = parse_target(None).unwrap(); + assert!(ns.is_none()); + assert!(name.is_none()); + } + + #[test] + fn test_parse_target_namespace_only() { + let (ns, name) = parse_target(Some("ory")).unwrap(); + assert_eq!(ns, Some("ory")); + assert!(name.is_none()); + } + + #[test] + fn test_parse_target_namespace_and_name() { + let (ns, name) = parse_target(Some("ory/kratos")).unwrap(); + assert_eq!(ns, Some("ory")); + assert_eq!(name, Some("kratos")); + } + + #[test] + fn test_parse_target_too_many_parts() { + assert!(parse_target(Some("too/many/parts")).is_err()); + } + + #[test] + fn test_parse_target_empty_string() { + let (ns, name) = parse_target(Some("")).unwrap(); + assert_eq!(ns, Some("")); + assert!(name.is_none()); + } + + #[test] + fn test_domain_replace_single() { + let result = domain_replace("src.DOMAIN_SUFFIX/foo", "192.168.1.1.sslip.io"); + assert_eq!(result, "src.192.168.1.1.sslip.io/foo"); + } + + #[test] + fn test_domain_replace_multiple() { + let result = domain_replace("DOMAIN_SUFFIX and DOMAIN_SUFFIX", "x.sslip.io"); + assert_eq!(result, "x.sslip.io and x.sslip.io"); + } + + #[test] + fn test_domain_replace_none() { + let result = domain_replace("no match here", "x.sslip.io"); + assert_eq!(result, "no match here"); + } + + #[tokio::test] + async fn test_ensure_tunnel_noop_when_ssh_host_empty() { + // When ssh_host is empty (local dev), ensure_tunnel should return Ok + // immediately without spawning any SSH process. + // SSH_HOST OnceLock may already be set from another test, but the + // default (unset) value is "" which is what we want. If it was set + // to a non-empty value by a prior test in the same process, this + // test would attempt a real SSH connection and fail — that is acceptable + // as a signal that test isolation changed. + // + // In a fresh test binary SSH_HOST is unset, so ssh_host() returns "". + let result = ensure_tunnel().await; + assert!(result.is_ok(), "ensure_tunnel should be a no-op when ssh_host is empty"); + } + + #[test] + fn test_create_secret_data_encoding() { + // Test that we can build the expected JSON structure for secret creation + let mut data = HashMap::new(); + data.insert("username".to_string(), "admin".to_string()); + data.insert("password".to_string(), "s3cret".to_string()); + + let mut encoded: serde_json::Map = serde_json::Map::new(); + for (k, v) in &data { + let b64 = base64::engine::general_purpose::STANDARD.encode(v.as_bytes()); + encoded.insert(k.clone(), serde_json::Value::String(b64)); + } + + let secret_obj = serde_json::json!({ + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "name": "test-secret", + "namespace": "default", + }, + "type": "Opaque", + "data": encoded, + }); + + let json_str = serde_json::to_string(&secret_obj).unwrap(); + assert!(json_str.contains("YWRtaW4=")); // base64("admin") + assert!(json_str.contains("czNjcmV0")); // base64("s3cret") + } +} diff --git a/sunbeam-sdk/src/kube/tools.rs b/sunbeam-sdk/src/kube/tools.rs new file mode 100644 index 0000000..ac025ab --- /dev/null +++ b/sunbeam-sdk/src/kube/tools.rs @@ -0,0 +1,180 @@ +use crate::error::{Result, ResultExt}; +use std::path::PathBuf; + +static KUSTOMIZE_BIN: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/kustomize")); +static HELM_BIN: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/helm")); + +fn cache_dir() -> PathBuf { + dirs::data_dir() + .unwrap_or_else(|| dirs::home_dir().unwrap_or_else(|| PathBuf::from("."))) + .join("sunbeam") + .join("bin") +} + +/// Extract an embedded binary to the cache directory if not already present. +fn extract_embedded(data: &[u8], name: &str) -> Result { + let dir = cache_dir(); + std::fs::create_dir_all(&dir) + .with_ctx(|| format!("Failed to create cache dir: {}", dir.display()))?; + + let dest = dir.join(name); + + // Skip if already extracted and same size + if dest.exists() { + if let Ok(meta) = std::fs::metadata(&dest) { + if meta.len() == data.len() as u64 { + return Ok(dest); + } + } + } + + std::fs::write(&dest, data) + .with_ctx(|| format!("Failed to write {}", dest.display()))?; + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions(&dest, std::fs::Permissions::from_mode(0o755))?; + } + + Ok(dest) +} + +/// Ensure kustomize is extracted and return its path. +pub fn ensure_kustomize() -> Result { + extract_embedded(KUSTOMIZE_BIN, "kustomize") +} + +/// Ensure helm is extracted and return its path. +pub fn ensure_helm() -> Result { + extract_embedded(HELM_BIN, "helm") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn kustomize_bin_is_non_empty() { + assert!( + KUSTOMIZE_BIN.len() > 0, + "Embedded kustomize binary should not be empty" + ); + } + + #[test] + fn helm_bin_is_non_empty() { + assert!( + HELM_BIN.len() > 0, + "Embedded helm binary should not be empty" + ); + } + + #[test] + fn kustomize_bin_has_reasonable_size() { + // kustomize binary should be at least 1 MB + assert!( + KUSTOMIZE_BIN.len() > 1_000_000, + "Embedded kustomize binary seems too small: {} bytes", + KUSTOMIZE_BIN.len() + ); + } + + #[test] + fn helm_bin_has_reasonable_size() { + // helm binary should be at least 1 MB + assert!( + HELM_BIN.len() > 1_000_000, + "Embedded helm binary seems too small: {} bytes", + HELM_BIN.len() + ); + } + + #[test] + fn cache_dir_ends_with_sunbeam_bin() { + let dir = cache_dir(); + assert!( + dir.ends_with("sunbeam/bin"), + "cache_dir() should end with sunbeam/bin, got: {}", + dir.display() + ); + } + + #[test] + fn cache_dir_is_absolute() { + let dir = cache_dir(); + assert!( + dir.is_absolute(), + "cache_dir() should return an absolute path, got: {}", + dir.display() + ); + } + + #[test] + fn ensure_kustomize_returns_valid_path() { + let path = ensure_kustomize().expect("ensure_kustomize should succeed"); + assert!( + path.ends_with("kustomize"), + "ensure_kustomize path should end with 'kustomize', got: {}", + path.display() + ); + assert!(path.exists(), "kustomize binary should exist at: {}", path.display()); + } + + #[test] + fn ensure_helm_returns_valid_path() { + let path = ensure_helm().expect("ensure_helm should succeed"); + assert!( + path.ends_with("helm"), + "ensure_helm path should end with 'helm', got: {}", + path.display() + ); + assert!(path.exists(), "helm binary should exist at: {}", path.display()); + } + + #[test] + fn ensure_kustomize_is_idempotent() { + let path1 = ensure_kustomize().expect("first call should succeed"); + let path2 = ensure_kustomize().expect("second call should succeed"); + assert_eq!(path1, path2, "ensure_kustomize should return the same path on repeated calls"); + } + + #[test] + fn ensure_helm_is_idempotent() { + let path1 = ensure_helm().expect("first call should succeed"); + let path2 = ensure_helm().expect("second call should succeed"); + assert_eq!(path1, path2, "ensure_helm should return the same path on repeated calls"); + } + + #[test] + fn extracted_kustomize_is_executable() { + let path = ensure_kustomize().expect("ensure_kustomize should succeed"); + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let perms = std::fs::metadata(&path) + .expect("should read metadata") + .permissions(); + assert!( + perms.mode() & 0o111 != 0, + "kustomize binary should be executable" + ); + } + } + + #[test] + fn extracted_helm_is_executable() { + let path = ensure_helm().expect("ensure_helm should succeed"); + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let perms = std::fs::metadata(&path) + .expect("should read metadata") + .permissions(); + assert!( + perms.mode() & 0o111 != 0, + "helm binary should be executable" + ); + } + } +} diff --git a/sunbeam-sdk/src/kube/tunnel.rs b/sunbeam-sdk/src/kube/tunnel.rs new file mode 100644 index 0000000..adbdd9d --- /dev/null +++ b/sunbeam-sdk/src/kube/tunnel.rs @@ -0,0 +1 @@ +// SSH tunnel management — reserved for future extraction. diff --git a/sunbeam-sdk/src/openbao/mod.rs b/sunbeam-sdk/src/openbao/mod.rs new file mode 100644 index 0000000..cdea8d9 --- /dev/null +++ b/sunbeam-sdk/src/openbao/mod.rs @@ -0,0 +1,498 @@ +//! Lightweight OpenBao/Vault HTTP API client. +//! +//! Replaces all `kubectl exec openbao-0 -- sh -c "bao ..."` calls from the +//! Python version with direct HTTP API calls via port-forward to openbao:8200. + +use crate::error::{Result, ResultExt}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// OpenBao HTTP client wrapping a base URL and optional root token. +#[derive(Clone)] +pub struct BaoClient { + pub base_url: String, + pub token: Option, + http: reqwest::Client, +} + +// ── API response types ────────────────────────────────────────────────────── + +#[derive(Debug, Deserialize)] +pub struct InitResponse { + pub unseal_keys_b64: Vec, + pub root_token: String, +} + +#[derive(Debug, Deserialize)] +pub struct SealStatusResponse { + #[serde(default)] + pub initialized: bool, + #[serde(default)] + pub sealed: bool, + #[serde(default)] + pub progress: u32, + #[serde(default)] + pub t: u32, + #[serde(default)] + pub n: u32, +} + +#[derive(Debug, Deserialize)] +pub struct UnsealResponse { + #[serde(default)] + pub sealed: bool, + #[serde(default)] + pub progress: u32, +} + +/// KV v2 read response wrapper. +#[derive(Debug, Deserialize)] +struct KvReadResponse { + data: Option, +} + +#[derive(Debug, Deserialize)] +struct KvReadData { + data: Option>, +} + +// ── Client implementation ─────────────────────────────────────────────────── + +impl BaoClient { + /// Create a new client pointing at `base_url` (e.g. `http://localhost:8200`). + pub fn new(base_url: &str) -> Self { + Self { + base_url: base_url.trim_end_matches('/').to_string(), + token: None, + http: reqwest::Client::new(), + } + } + + /// Create a client with an authentication token. + pub fn with_token(base_url: &str, token: &str) -> Self { + let mut client = Self::new(base_url); + client.token = Some(token.to_string()); + client + } + + fn url(&self, path: &str) -> String { + format!("{}/v1/{}", self.base_url, path.trim_start_matches('/')) + } + + fn request(&self, method: reqwest::Method, path: &str) -> reqwest::RequestBuilder { + let mut req = self.http.request(method, self.url(path)); + if let Some(ref token) = self.token { + req = req.header("X-Vault-Token", token); + } + req + } + + // ── System operations ─────────────────────────────────────────────── + + /// Get the seal status of the OpenBao instance. + pub async fn seal_status(&self) -> Result { + let resp = self + .http + .get(format!("{}/v1/sys/seal-status", self.base_url)) + .send() + .await + .ctx("Failed to connect to OpenBao")?; + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("OpenBao seal-status returned {status}: {body}"); + } + resp.json().await.ctx("Failed to parse seal status") + } + + /// Initialize OpenBao with the given number of key shares and threshold. + pub async fn init(&self, key_shares: u32, key_threshold: u32) -> Result { + #[derive(Serialize)] + struct InitRequest { + secret_shares: u32, + secret_threshold: u32, + } + + let resp = self + .http + .put(format!("{}/v1/sys/init", self.base_url)) + .json(&InitRequest { + secret_shares: key_shares, + secret_threshold: key_threshold, + }) + .send() + .await + .ctx("Failed to initialize OpenBao")?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("OpenBao init returned {status}: {body}"); + } + resp.json().await.ctx("Failed to parse init response") + } + + /// Unseal OpenBao with one key share. + pub async fn unseal(&self, key: &str) -> Result { + #[derive(Serialize)] + struct UnsealRequest<'a> { + key: &'a str, + } + + let resp = self + .http + .put(format!("{}/v1/sys/unseal", self.base_url)) + .json(&UnsealRequest { key }) + .send() + .await + .ctx("Failed to unseal OpenBao")?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("OpenBao unseal returned {status}: {body}"); + } + resp.json().await.ctx("Failed to parse unseal response") + } + + // ── Secrets engine management ─────────────────────────────────────── + + /// Enable a secrets engine at the given path. + /// Returns Ok(()) even if already enabled (400 is tolerated). + pub async fn enable_secrets_engine(&self, path: &str, engine_type: &str) -> Result<()> { + #[derive(Serialize)] + struct EnableRequest<'a> { + r#type: &'a str, + } + + let resp = self + .request(reqwest::Method::POST, &format!("sys/mounts/{path}")) + .json(&EnableRequest { + r#type: engine_type, + }) + .send() + .await + .ctx("Failed to enable secrets engine")?; + + let status = resp.status(); + if status.is_success() || status.as_u16() == 400 { + // 400 = "path is already in use" — idempotent + Ok(()) + } else { + let body = resp.text().await.unwrap_or_default(); + bail!("Enable secrets engine {path} returned {status}: {body}"); + } + } + + // ── KV v2 operations ──────────────────────────────────────────────── + + /// Read all fields from a KV v2 secret path. + /// Returns None if the path doesn't exist (404). + pub async fn kv_get(&self, mount: &str, path: &str) -> Result>> { + let resp = self + .request(reqwest::Method::GET, &format!("{mount}/data/{path}")) + .send() + .await + .ctx("Failed to read KV secret")?; + + if resp.status().as_u16() == 404 { + return Ok(None); + } + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("KV get {mount}/{path} returned {status}: {body}"); + } + + let kv_resp: KvReadResponse = resp.json().await.ctx("Failed to parse KV response")?; + let data = kv_resp + .data + .and_then(|d| d.data) + .unwrap_or_default(); + + // Convert all values to strings + let result: HashMap = data + .into_iter() + .map(|(k, v)| { + let s = match v { + serde_json::Value::String(s) => s, + other => other.to_string(), + }; + (k, s) + }) + .collect(); + + Ok(Some(result)) + } + + /// Read a single field from a KV v2 secret path. + /// Returns empty string if path or field doesn't exist. + pub async fn kv_get_field(&self, mount: &str, path: &str, field: &str) -> Result { + match self.kv_get(mount, path).await? { + Some(data) => Ok(data.get(field).cloned().unwrap_or_default()), + None => Ok(String::new()), + } + } + + /// Write (create or overwrite) all fields in a KV v2 secret path. + pub async fn kv_put( + &self, + mount: &str, + path: &str, + data: &HashMap, + ) -> Result<()> { + #[derive(Serialize)] + struct KvWriteRequest<'a> { + data: &'a HashMap, + } + + let resp = self + .request(reqwest::Method::POST, &format!("{mount}/data/{path}")) + .json(&KvWriteRequest { data }) + .send() + .await + .ctx("Failed to write KV secret")?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("KV put {mount}/{path} returned {status}: {body}"); + } + Ok(()) + } + + /// Patch (merge) fields into an existing KV v2 secret path. + pub async fn kv_patch( + &self, + mount: &str, + path: &str, + data: &HashMap, + ) -> Result<()> { + #[derive(Serialize)] + struct KvWriteRequest<'a> { + data: &'a HashMap, + } + + let resp = self + .request(reqwest::Method::PATCH, &format!("{mount}/data/{path}")) + .header("Content-Type", "application/merge-patch+json") + .json(&KvWriteRequest { data }) + .send() + .await + .ctx("Failed to patch KV secret")?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("KV patch {mount}/{path} returned {status}: {body}"); + } + Ok(()) + } + + /// Delete a KV v2 secret path (soft delete — deletes latest version). + pub async fn kv_delete(&self, mount: &str, path: &str) -> Result<()> { + let resp = self + .request(reqwest::Method::DELETE, &format!("{mount}/data/{path}")) + .send() + .await + .ctx("Failed to delete KV secret")?; + + // 404 is fine (already deleted) + if !resp.status().is_success() && resp.status().as_u16() != 404 { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("KV delete {mount}/{path} returned {status}: {body}"); + } + Ok(()) + } + + // ── Auth operations ───────────────────────────────────────────────── + + /// Enable an auth method at the given path. + /// Tolerates "already enabled" (400/409). + pub async fn auth_enable(&self, path: &str, method_type: &str) -> Result<()> { + #[derive(Serialize)] + struct AuthEnableRequest<'a> { + r#type: &'a str, + } + + let resp = self + .request(reqwest::Method::POST, &format!("sys/auth/{path}")) + .json(&AuthEnableRequest { + r#type: method_type, + }) + .send() + .await + .ctx("Failed to enable auth method")?; + + let status = resp.status(); + if status.is_success() || status.as_u16() == 400 { + Ok(()) + } else { + let body = resp.text().await.unwrap_or_default(); + bail!("Enable auth {path} returned {status}: {body}"); + } + } + + /// Write a policy. + pub async fn write_policy(&self, name: &str, policy_hcl: &str) -> Result<()> { + #[derive(Serialize)] + struct PolicyRequest<'a> { + policy: &'a str, + } + + let resp = self + .request( + reqwest::Method::PUT, + &format!("sys/policies/acl/{name}"), + ) + .json(&PolicyRequest { policy: policy_hcl }) + .send() + .await + .ctx("Failed to write policy")?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("Write policy {name} returned {status}: {body}"); + } + Ok(()) + } + + /// Write to an arbitrary API path (for auth config, roles, database config, etc.). + pub async fn write( + &self, + path: &str, + data: &serde_json::Value, + ) -> Result { + let resp = self + .request(reqwest::Method::POST, path) + .json(data) + .send() + .await + .with_ctx(|| format!("Failed to write to {path}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("Write {path} returned {status}: {body}"); + } + + let body = resp.text().await.unwrap_or_default(); + if body.is_empty() { + Ok(serde_json::Value::Null) + } else { + serde_json::from_str(&body).ctx("Failed to parse write response") + } + } + + /// Read from an arbitrary API path. + pub async fn read(&self, path: &str) -> Result> { + let resp = self + .request(reqwest::Method::GET, path) + .send() + .await + .with_ctx(|| format!("Failed to read {path}"))?; + + if resp.status().as_u16() == 404 { + return Ok(None); + } + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + bail!("Read {path} returned {status}: {body}"); + } + + let body = resp.text().await.unwrap_or_default(); + if body.is_empty() { + Ok(Some(serde_json::Value::Null)) + } else { + Ok(Some(serde_json::from_str(&body)?)) + } + } + + // ── Database secrets engine ───────────────────────────────────────── + + /// Configure the database secrets engine connection. + pub async fn write_db_config( + &self, + name: &str, + plugin: &str, + connection_url: &str, + username: &str, + password: &str, + allowed_roles: &str, + ) -> Result<()> { + let data = serde_json::json!({ + "plugin_name": plugin, + "connection_url": connection_url, + "username": username, + "password": password, + "allowed_roles": allowed_roles, + }); + self.write(&format!("database/config/{name}"), &data).await?; + Ok(()) + } + + /// Create a database static role. + pub async fn write_db_static_role( + &self, + name: &str, + db_name: &str, + username: &str, + rotation_period: u64, + rotation_statements: &[&str], + ) -> Result<()> { + let data = serde_json::json!({ + "db_name": db_name, + "username": username, + "rotation_period": rotation_period, + "rotation_statements": rotation_statements, + }); + self.write(&format!("database/static-roles/{name}"), &data) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_client_url_construction() { + let client = BaoClient::new("http://localhost:8200"); + assert_eq!(client.url("sys/seal-status"), "http://localhost:8200/v1/sys/seal-status"); + assert_eq!(client.url("/sys/seal-status"), "http://localhost:8200/v1/sys/seal-status"); + } + + #[test] + fn test_client_url_strips_trailing_slash() { + let client = BaoClient::new("http://localhost:8200/"); + assert_eq!(client.base_url, "http://localhost:8200"); + } + + #[test] + fn test_with_token() { + let client = BaoClient::with_token("http://localhost:8200", "mytoken"); + assert_eq!(client.token, Some("mytoken".to_string())); + } + + #[test] + fn test_new_has_no_token() { + let client = BaoClient::new("http://localhost:8200"); + assert!(client.token.is_none()); + } + + #[tokio::test] + async fn test_seal_status_error_on_nonexistent_server() { + // Connecting to a port where nothing is listening should produce an + // error (connection refused), not a panic or hang. + let client = BaoClient::new("http://127.0.0.1:19999"); + let result = client.seal_status().await; + assert!( + result.is_err(), + "seal_status should return an error when the server is unreachable" + ); + } +}