From 8e5d295902ae8c10bd7cb4d57b3db5abab0f7b30 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Sat, 21 Mar 2026 14:38:25 +0000 Subject: [PATCH] =?UTF-8?q?refactor:=20SDK=20small=20command=20modules=20?= =?UTF-8?q?=E2=80=94=20services,=20cluster,=20manifests,=20gitea,=20update?= =?UTF-8?q?,=20auth?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sunbeam-sdk/src/auth/mod.rs | 952 +++++++++++++++++++++++++++++++ sunbeam-sdk/src/cluster/mod.rs | 461 +++++++++++++++ sunbeam-sdk/src/gitea/mod.rs | 429 ++++++++++++++ sunbeam-sdk/src/manifests/mod.rs | 880 ++++++++++++++++++++++++++++ sunbeam-sdk/src/services/mod.rs | 573 +++++++++++++++++++ sunbeam-sdk/src/update/mod.rs | 443 ++++++++++++++ 6 files changed, 3738 insertions(+) create mode 100644 sunbeam-sdk/src/auth/mod.rs create mode 100644 sunbeam-sdk/src/cluster/mod.rs create mode 100644 sunbeam-sdk/src/gitea/mod.rs create mode 100644 sunbeam-sdk/src/manifests/mod.rs create mode 100644 sunbeam-sdk/src/services/mod.rs create mode 100644 sunbeam-sdk/src/update/mod.rs diff --git a/sunbeam-sdk/src/auth/mod.rs b/sunbeam-sdk/src/auth/mod.rs new file mode 100644 index 0000000..3b334e6 --- /dev/null +++ b/sunbeam-sdk/src/auth/mod.rs @@ -0,0 +1,952 @@ +//! OAuth2 Authorization Code flow with PKCE for CLI authentication against Hydra. + +use crate::error::{Result, ResultExt, SunbeamError}; +use base64::Engine; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::path::PathBuf; + +// --------------------------------------------------------------------------- +// Token cache data +// --------------------------------------------------------------------------- + +/// Cached OAuth2 tokens persisted to disk. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AuthTokens { + pub access_token: String, + pub refresh_token: String, + pub expires_at: DateTime, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub id_token: Option, + pub domain: String, + /// Gitea personal access token (created during auth login). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub gitea_token: Option, +} + +/// Default client ID when the K8s secret is unavailable. +const DEFAULT_CLIENT_ID: &str = "sunbeam-cli"; + +// --------------------------------------------------------------------------- +// Cache file helpers +// --------------------------------------------------------------------------- + +/// Cache path for auth tokens — per-domain so multiple environments work. +fn cache_path_for_domain(domain: &str) -> PathBuf { + let dir = dirs::data_dir() + .unwrap_or_else(|| { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".local/share") + }) + .join("sunbeam") + .join("auth"); + if domain.is_empty() { + dir.join("default.json") + } else { + // Sanitize domain for filename + let safe = domain.replace(['/', '\\', ':'], "_"); + dir.join(format!("{safe}.json")) + } +} + +fn cache_path() -> PathBuf { + let domain = crate::config::domain(); + cache_path_for_domain(domain) +} + +fn read_cache() -> Result { + let path = cache_path(); + let content = std::fs::read_to_string(&path).map_err(|e| { + SunbeamError::Identity(format!("No cached auth tokens ({}): {e}", path.display())) + })?; + let tokens: AuthTokens = serde_json::from_str(&content) + .ctx("Failed to parse cached auth tokens")?; + Ok(tokens) +} + +fn write_cache(tokens: &AuthTokens) -> Result<()> { + let path = cache_path(); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .with_ctx(|| format!("Failed to create auth cache dir: {}", parent.display()))?; + } + let content = serde_json::to_string_pretty(tokens)?; + std::fs::write(&path, &content) + .with_ctx(|| format!("Failed to write auth cache to {}", path.display()))?; + + // Set 0600 permissions on unix + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let perms = std::fs::Permissions::from_mode(0o600); + std::fs::set_permissions(&path, perms) + .with_ctx(|| format!("Failed to set permissions on {}", path.display()))?; + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// PKCE +// --------------------------------------------------------------------------- + +/// Generate a PKCE code_verifier and code_challenge (S256). +fn generate_pkce() -> (String, String) { + let verifier_bytes: [u8; 32] = rand::random(); + let verifier = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(verifier_bytes); + let challenge = { + let hash = Sha256::digest(verifier.as_bytes()); + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(hash) + }; + (verifier, challenge) +} + +/// Generate a random state parameter for OAuth2. +fn generate_state() -> String { + let bytes: [u8; 16] = rand::random(); + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes) +} + +// --------------------------------------------------------------------------- +// OIDC discovery +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize)] +struct OidcDiscovery { + authorization_endpoint: String, + token_endpoint: String, +} + +/// Resolve the domain for authentication, trying multiple sources. +async fn resolve_domain(explicit: Option<&str>) -> Result { + // 1. Explicit --domain flag + if let Some(d) = explicit { + if !d.is_empty() { + return Ok(d.to_string()); + } + } + + // 2. Active context domain (set by cli::dispatch from config) + let ctx_domain = crate::config::domain(); + if !ctx_domain.is_empty() { + return Ok(ctx_domain.to_string()); + } + + // 3. Cached token domain (already logged in) + if let Ok(tokens) = read_cache() { + if !tokens.domain.is_empty() { + crate::output::ok(&format!("Using cached domain: {}", tokens.domain)); + return Ok(tokens.domain); + } + } + + // 4. Try cluster discovery (may fail if not connected) + match crate::kube::get_domain().await { + Ok(d) if !d.is_empty() && !d.starts_with('.') => return Ok(d), + _ => {} + } + + Err(SunbeamError::config( + "Could not determine domain. Use --domain flag, or configure with:\n \ + sunbeam config set --host user@your-server.example.com", + )) +} + +async fn discover_oidc(domain: &str) -> Result { + let url = format!("https://auth.{domain}/.well-known/openid-configuration"); + let client = reqwest::Client::new(); + let resp = client + .get(&url) + .send() + .await + .with_ctx(|| format!("Failed to fetch OIDC discovery from {url}"))?; + + if !resp.status().is_success() { + return Err(SunbeamError::network(format!( + "OIDC discovery returned HTTP {}", + resp.status() + ))); + } + + let discovery: OidcDiscovery = resp + .json() + .await + .ctx("Failed to parse OIDC discovery response")?; + Ok(discovery) +} + +// --------------------------------------------------------------------------- +// Token exchange / refresh +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize)] +struct TokenResponse { + access_token: String, + #[serde(default)] + refresh_token: Option, + #[serde(default)] + expires_in: Option, + #[serde(default)] + id_token: Option, +} + +async fn exchange_code( + token_endpoint: &str, + code: &str, + redirect_uri: &str, + client_id: &str, + code_verifier: &str, +) -> Result { + let client = reqwest::Client::new(); + let resp = client + .post(token_endpoint) + .form(&[ + ("grant_type", "authorization_code"), + ("code", code), + ("redirect_uri", redirect_uri), + ("client_id", client_id), + ("code_verifier", code_verifier), + ]) + .send() + .await + .ctx("Failed to exchange authorization code")?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(SunbeamError::identity(format!( + "Token exchange failed (HTTP {status}): {body}" + ))); + } + + let token_resp: TokenResponse = resp.json().await.ctx("Failed to parse token response")?; + Ok(token_resp) +} + +/// Refresh an access token using a refresh token. +async fn refresh_token(cached: &AuthTokens) -> Result { + let discovery = discover_oidc(&cached.domain).await?; + + // Try to get client_id from K8s, fall back to default + let client_id = resolve_client_id().await; + + let client = reqwest::Client::new(); + let resp = client + .post(&discovery.token_endpoint) + .form(&[ + ("grant_type", "refresh_token"), + ("refresh_token", &cached.refresh_token), + ("client_id", &client_id), + ]) + .send() + .await + .ctx("Failed to refresh token")?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(SunbeamError::identity(format!( + "Token refresh failed (HTTP {status}): {body}" + ))); + } + + let token_resp: TokenResponse = resp + .json() + .await + .ctx("Failed to parse refresh token response")?; + + let expires_at = Utc::now() + + chrono::Duration::seconds(token_resp.expires_in.unwrap_or(3600)); + + let new_tokens = AuthTokens { + access_token: token_resp.access_token, + refresh_token: token_resp + .refresh_token + .unwrap_or_else(|| cached.refresh_token.clone()), + expires_at, + id_token: token_resp.id_token.or_else(|| cached.id_token.clone()), + domain: cached.domain.clone(), + gitea_token: cached.gitea_token.clone(), + }; + + write_cache(&new_tokens)?; + Ok(new_tokens) +} + +// --------------------------------------------------------------------------- +// Client ID resolution +// --------------------------------------------------------------------------- + +/// Try to read the client_id from K8s secret `oidc-sunbeam-cli` in `ory` namespace. +/// Falls back to the default client ID. +async fn resolve_client_id() -> String { + // The OAuth2Client is pre-created with a known client_id matching + // DEFAULT_CLIENT_ID ("sunbeam-cli") via a pre-seeded K8s secret. + // No cluster access needed. + DEFAULT_CLIENT_ID.to_string() +} + +// --------------------------------------------------------------------------- +// JWT payload decoding (minimal, no verification) +// --------------------------------------------------------------------------- + +/// Decode the payload of a JWT (middle segment) without verification. +/// Returns the parsed JSON value. +fn decode_jwt_payload(token: &str) -> Result { + let parts: Vec<&str> = token.splitn(3, '.').collect(); + if parts.len() < 2 { + return Err(SunbeamError::identity("Invalid JWT: not enough segments")); + } + let payload_bytes = base64::engine::general_purpose::URL_SAFE_NO_PAD + .decode(parts[1]) + .ctx("Failed to base64-decode JWT payload")?; + let payload: serde_json::Value = + serde_json::from_slice(&payload_bytes).ctx("Failed to parse JWT payload as JSON")?; + Ok(payload) +} + +/// Extract the email claim from an id_token. +fn extract_email(id_token: &str) -> Option { + let payload = decode_jwt_payload(id_token).ok()?; + payload + .get("email") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) +} + +// --------------------------------------------------------------------------- +// HTTP callback server +// --------------------------------------------------------------------------- + +/// Parsed callback parameters from the OAuth2 redirect. +struct CallbackParams { + code: String, + #[allow(dead_code)] + state: String, +} + +/// Bind a TCP listener for the OAuth2 callback, preferring ports 9876-9880. +async fn bind_callback_listener() -> Result<(tokio::net::TcpListener, u16)> { + for port in 9876..=9880 { + if let Ok(listener) = tokio::net::TcpListener::bind(("127.0.0.1", port)).await { + return Ok((listener, port)); + } + } + // Fall back to ephemeral port + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .ctx("Failed to bind callback listener")?; + let port = listener.local_addr().ctx("No local address")?.port(); + Ok((listener, port)) +} + +/// Wait for a single HTTP callback request, extract code and state, send HTML response. +async fn wait_for_callback( + listener: tokio::net::TcpListener, + expected_state: &str, + redirect_url: Option<&str>, +) -> Result { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + // Wait up to 5 minutes for the callback, or until Ctrl+C + let accept_result = tokio::time::timeout( + std::time::Duration::from_secs(300), + listener.accept(), + ) + .await + .map_err(|_| SunbeamError::identity("Login timed out (5 min). Try again with `sunbeam auth login`."))?; + + let (mut stream, _) = accept_result.ctx("Failed to accept callback connection")?; + + let mut buf = vec![0u8; 4096]; + let n = stream + .read(&mut buf) + .await + .ctx("Failed to read callback request")?; + let request = String::from_utf8_lossy(&buf[..n]); + + // Parse the GET request line: "GET /callback?code=...&state=... HTTP/1.1" + let request_line = request + .lines() + .next() + .ctx("Empty callback request")?; + + let path = request_line + .split_whitespace() + .nth(1) + .ctx("No path in callback request")?; + + // Parse query params + let query = path + .split('?') + .nth(1) + .ctx("No query params in callback")?; + + let mut code = None; + let mut state = None; + + for param in query.split('&') { + let mut kv = param.splitn(2, '='); + match (kv.next(), kv.next()) { + (Some("code"), Some(v)) => code = Some(v.to_string()), + (Some("state"), Some(v)) => state = Some(v.to_string()), + _ => {} + } + } + + let code = code.ok_or_else(|| SunbeamError::identity("No 'code' in callback"))?; + let state = state.ok_or_else(|| SunbeamError::identity("No 'state' in callback"))?; + + if state != expected_state { + return Err(SunbeamError::identity( + "OAuth2 state mismatch -- possible CSRF attack", + )); + } + + // Send success response — redirect to next step if provided, otherwise show done page + let response = if let Some(next_url) = redirect_url { + let html = format!( + "\ + \ +
\ +

SSO login successful

\ +

Redirecting to Gitea token setup...

\ +

Click here if not redirected

\ +
" + ); + format!( + "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + html.len(), + html + ) + } else { + let html = "\ +
\ +

Authentication successful

\ +

You can close this tab and return to the terminal.

\ +
"; + format!( + "HTTP/1.1 200 OK\r\nContent-Type: text/html\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + html.len(), + html + ) + }; + let _ = stream.write_all(response.as_bytes()).await; + let _ = stream.shutdown().await; + + Ok(CallbackParams { code, state }) +} + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +/// Get a valid access token, refreshing if needed. +/// +/// Returns the access token string ready for use in Authorization headers. +/// If no cached token exists or refresh fails, returns an error prompting +/// the user to run `sunbeam auth login`. +pub async fn get_token() -> Result { + let cached = match read_cache() { + Ok(tokens) => tokens, + Err(_) => { + return Err(SunbeamError::identity( + "Not logged in. Run `sunbeam auth login` to authenticate.", + )); + } + }; + + // Check if access token is still valid (>60s remaining) + let now = Utc::now(); + if cached.expires_at > now + chrono::Duration::seconds(60) { + return Ok(cached.access_token); + } + + // Try to refresh + if !cached.refresh_token.is_empty() { + match refresh_token(&cached).await { + Ok(new_tokens) => return Ok(new_tokens.access_token), + Err(e) => { + crate::output::warn(&format!("Token refresh failed: {e}")); + } + } + } + + Err(SunbeamError::identity( + "Session expired. Run `sunbeam auth login` to re-authenticate.", + )) +} + +/// Interactive browser-based OAuth2 login. +/// SSO login — Hydra OIDC authorization code flow with PKCE. +/// `gitea_redirect`: if Some, the browser callback page auto-redirects to Gitea token page. +pub async fn cmd_auth_sso_login_with_redirect( + domain_override: Option<&str>, + gitea_redirect: Option<&str>, +) -> Result<()> { + crate::output::step("Authenticating with Hydra"); + + // Resolve domain: explicit flag > cached token domain > config > cluster discovery + let domain = resolve_domain(domain_override).await?; + + crate::output::ok(&format!("Domain: {domain}")); + + // OIDC discovery + let discovery = discover_oidc(&domain).await?; + + // Resolve client_id + let client_id = resolve_client_id().await; + + // Generate PKCE + let (code_verifier, code_challenge) = generate_pkce(); + + // Generate state + let state = generate_state(); + + // Bind callback listener + let (listener, port) = bind_callback_listener().await?; + let redirect_uri = format!("http://localhost:{port}/callback"); + + // Build authorization URL + let auth_url = format!( + "{}?client_id={}&redirect_uri={}&response_type=code&scope={}&code_challenge={}&code_challenge_method=S256&state={}", + discovery.authorization_endpoint, + urlencoding(&client_id), + urlencoding(&redirect_uri), + "openid%20email%20profile%20offline_access", + code_challenge, + state, + ); + + crate::output::ok("Opening browser for login..."); + println!("\n {auth_url}\n"); + + // Try to open the browser + let _open_result = open_browser(&auth_url); + + // Wait for callback + crate::output::ok("Waiting for authentication callback..."); + let callback = wait_for_callback(listener, &state, gitea_redirect).await?; + + // Exchange code for tokens + crate::output::ok("Exchanging authorization code for tokens..."); + let token_resp = exchange_code( + &discovery.token_endpoint, + &callback.code, + &redirect_uri, + &client_id, + &code_verifier, + ) + .await?; + + let expires_at = Utc::now() + + chrono::Duration::seconds(token_resp.expires_in.unwrap_or(3600)); + + let tokens = AuthTokens { + access_token: token_resp.access_token, + refresh_token: token_resp.refresh_token.unwrap_or_default(), + expires_at, + id_token: token_resp.id_token.clone(), + domain: domain.clone(), + gitea_token: None, + }; + + // Print success with email if available + let email = tokens + .id_token + .as_ref() + .and_then(|t| extract_email(t)); + if let Some(ref email) = email { + crate::output::ok(&format!("Logged in as {email}")); + } else { + crate::output::ok("Logged in successfully"); + } + + write_cache(&tokens)?; + Ok(()) +} + +/// SSO login — standalone (no redirect after callback). +pub async fn cmd_auth_sso_login(domain_override: Option<&str>) -> Result<()> { + cmd_auth_sso_login_with_redirect(domain_override, None).await +} + +/// Gitea token login — opens the PAT creation page and prompts for the token. +pub async fn cmd_auth_git_login(domain_override: Option<&str>) -> Result<()> { + crate::output::step("Setting up Gitea API access"); + + let domain = resolve_domain(domain_override).await?; + let url = format!("https://src.{domain}/user/settings/applications"); + + crate::output::ok("Opening Gitea token page in your browser..."); + crate::output::ok("Create a token with all scopes selected, then paste it below."); + println!("\n {url}\n"); + + let _ = open_browser(&url); + + // Prompt for the token + eprint!(" Gitea token: "); + let mut token = String::new(); + std::io::stdin() + .read_line(&mut token) + .ctx("Failed to read token from stdin")?; + let token = token.trim().to_string(); + + if token.is_empty() { + return Err(SunbeamError::identity("No token provided.")); + } + + // Verify the token works + let client = reqwest::Client::new(); + let resp = client + .get(format!("https://src.{domain}/api/v1/user")) + .header("Authorization", format!("token {token}")) + .send() + .await + .ctx("Failed to verify Gitea token")?; + + if !resp.status().is_success() { + return Err(SunbeamError::identity(format!( + "Gitea token is invalid (HTTP {}). Check the token and try again.", + resp.status() + ))); + } + + let user: serde_json::Value = resp.json().await?; + let login = user + .get("login") + .and_then(|v| v.as_str()) + .unwrap_or("unknown"); + + // Save to cache + let mut tokens = read_cache().unwrap_or_else(|_| AuthTokens { + access_token: String::new(), + refresh_token: String::new(), + expires_at: Utc::now(), + id_token: None, + domain: domain.clone(), + gitea_token: None, + }); + tokens.gitea_token = Some(token); + if tokens.domain.is_empty() { + tokens.domain = domain; + } + write_cache(&tokens)?; + + crate::output::ok(&format!("Gitea authenticated as {login}")); + Ok(()) +} + +/// Combined login — SSO first, then Gitea. +pub async fn cmd_auth_login_all(domain_override: Option<&str>) -> Result<()> { + // Resolve domain early so we can build the Gitea redirect URL + let domain = resolve_domain(domain_override).await?; + let gitea_url = format!("https://src.{domain}/user/settings/applications"); + cmd_auth_sso_login_with_redirect(Some(&domain), Some(&gitea_url)).await?; + cmd_auth_git_login(Some(&domain)).await?; + Ok(()) +} + +/// Get the Gitea API token (for use by pm.rs). +pub fn get_gitea_token() -> Result { + let tokens = read_cache().map_err(|_| { + SunbeamError::identity("Not logged in. Run `sunbeam auth login` first.") + })?; + tokens.gitea_token.ok_or_else(|| { + SunbeamError::identity( + "No Gitea token. Run `sunbeam auth login` or `sunbeam auth set-gitea-token `.", + ) + }) +} + +/// Remove cached auth tokens. +pub async fn cmd_auth_logout() -> Result<()> { + let path = cache_path(); + if path.exists() { + std::fs::remove_file(&path) + .with_ctx(|| format!("Failed to remove {}", path.display()))?; + crate::output::ok("Logged out (cached tokens removed)"); + } else { + crate::output::ok("Not logged in (no cached tokens to remove)"); + } + Ok(()) +} + +/// Print current auth status. +pub async fn cmd_auth_status() -> Result<()> { + match read_cache() { + Ok(tokens) => { + let now = Utc::now(); + let expired = tokens.expires_at <= now; + + // Try to get email from id_token + let identity = tokens + .id_token + .as_deref() + .and_then(extract_email) + .unwrap_or_else(|| "unknown".to_string()); + + if expired { + crate::output::ok(&format!( + "Logged in as {identity} (token expired at {})", + tokens.expires_at.format("%Y-%m-%d %H:%M:%S UTC") + )); + if !tokens.refresh_token.is_empty() { + crate::output::ok("Token can be refreshed automatically on next use"); + } + } else { + crate::output::ok(&format!( + "Logged in as {identity} (token valid until {})", + tokens.expires_at.format("%Y-%m-%d %H:%M:%S UTC") + )); + } + crate::output::ok(&format!("Domain: {}", tokens.domain)); + } + Err(_) => { + crate::output::ok("Not logged in. Run `sunbeam auth login` to authenticate."); + } + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Utility helpers +// --------------------------------------------------------------------------- + +/// Minimal percent-encoding for URL query parameters. +fn urlencoding(s: &str) -> String { + let mut out = String::with_capacity(s.len()); + for b in s.bytes() { + match b { + b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => { + out.push(b as char); + } + _ => { + out.push_str(&format!("%{:02X}", b)); + } + } + } + out +} + +/// Try to open a URL in the default browser. +fn open_browser(url: &str) -> std::result::Result<(), std::io::Error> { + #[cfg(target_os = "macos")] + { + std::process::Command::new("open").arg(url).spawn()?; + } + #[cfg(target_os = "linux")] + { + std::process::Command::new("xdg-open").arg(url).spawn()?; + } + #[cfg(not(any(target_os = "macos", target_os = "linux")))] + { + let _ = url; + // No-op on unsupported platforms; URL is printed to the terminal. + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Duration; + + #[test] + fn test_pkce_generation() { + let (verifier, challenge) = generate_pkce(); + + // Verifier should be base64url-encoded 32 bytes -> 43 chars + assert_eq!(verifier.len(), 43); + + // Challenge should be base64url-encoded SHA256 -> 43 chars + assert_eq!(challenge.len(), 43); + + // Verify the challenge matches the verifier + let expected_hash = Sha256::digest(verifier.as_bytes()); + let expected_challenge = + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(expected_hash); + assert_eq!(challenge, expected_challenge); + + // Two calls should produce different values + let (v2, c2) = generate_pkce(); + assert_ne!(verifier, v2); + assert_ne!(challenge, c2); + } + + #[test] + fn test_token_cache_roundtrip() { + let tokens = AuthTokens { + access_token: "access_abc".to_string(), + refresh_token: "refresh_xyz".to_string(), + expires_at: Utc::now() + Duration::hours(1), + id_token: Some("eyJhbGciOiJSUzI1NiJ9.eyJlbWFpbCI6InRlc3RAZXhhbXBsZS5jb20ifQ.sig".to_string()), + domain: "sunbeam.pt".to_string(), + gitea_token: None, + }; + + let json = serde_json::to_string_pretty(&tokens).unwrap(); + let deserialized: AuthTokens = serde_json::from_str(&json).unwrap(); + + assert_eq!(deserialized.access_token, "access_abc"); + assert_eq!(deserialized.refresh_token, "refresh_xyz"); + assert_eq!(deserialized.domain, "sunbeam.pt"); + assert!(deserialized.id_token.is_some()); + + // Verify expires_at survives roundtrip (within 1 second tolerance) + let diff = (deserialized.expires_at - tokens.expires_at) + .num_milliseconds() + .abs(); + assert!(diff < 1000, "expires_at drift: {diff}ms"); + } + + #[test] + fn test_token_cache_roundtrip_no_id_token() { + let tokens = AuthTokens { + access_token: "access".to_string(), + refresh_token: "refresh".to_string(), + expires_at: Utc::now() + Duration::hours(1), + id_token: None, + domain: "example.com".to_string(), + gitea_token: None, + }; + + let json = serde_json::to_string(&tokens).unwrap(); + // id_token should be absent from the JSON when None + assert!(!json.contains("id_token")); + + let deserialized: AuthTokens = serde_json::from_str(&json).unwrap(); + assert!(deserialized.id_token.is_none()); + } + + #[test] + fn test_token_expiry_check_valid() { + let tokens = AuthTokens { + access_token: "valid".to_string(), + refresh_token: "refresh".to_string(), + expires_at: Utc::now() + Duration::hours(1), + id_token: None, + domain: "example.com".to_string(), + gitea_token: None, + }; + + let now = Utc::now(); + // Token is valid: more than 60 seconds until expiry + assert!(tokens.expires_at > now + Duration::seconds(60)); + } + + #[test] + fn test_token_expiry_check_expired() { + let tokens = AuthTokens { + access_token: "expired".to_string(), + refresh_token: "refresh".to_string(), + expires_at: Utc::now() - Duration::hours(1), + id_token: None, + domain: "example.com".to_string(), + gitea_token: None, + }; + + let now = Utc::now(); + // Token is expired + assert!(tokens.expires_at <= now + Duration::seconds(60)); + } + + #[test] + fn test_token_expiry_check_almost_expired() { + let tokens = AuthTokens { + access_token: "almost".to_string(), + refresh_token: "refresh".to_string(), + expires_at: Utc::now() + Duration::seconds(30), + id_token: None, + domain: "example.com".to_string(), + gitea_token: None, + }; + + let now = Utc::now(); + // Token expires in 30s, which is within the 60s threshold + assert!(tokens.expires_at <= now + Duration::seconds(60)); + } + + #[test] + fn test_jwt_payload_decode() { + // Build a fake JWT: header.payload.signature + let payload_json = r#"{"email":"user@example.com","sub":"12345"}"#; + let encoded_payload = + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(payload_json.as_bytes()); + let fake_jwt = format!("eyJhbGciOiJSUzI1NiJ9.{encoded_payload}.fakesig"); + + let payload = decode_jwt_payload(&fake_jwt).unwrap(); + assert_eq!(payload["email"], "user@example.com"); + assert_eq!(payload["sub"], "12345"); + } + + #[test] + fn test_extract_email() { + let payload_json = r#"{"email":"alice@sunbeam.pt","name":"Alice"}"#; + let encoded_payload = + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(payload_json.as_bytes()); + let fake_jwt = format!("eyJhbGciOiJSUzI1NiJ9.{encoded_payload}.fakesig"); + + assert_eq!(extract_email(&fake_jwt), Some("alice@sunbeam.pt".to_string())); + } + + #[test] + fn test_extract_email_missing() { + let payload_json = r#"{"sub":"12345","name":"Bob"}"#; + let encoded_payload = + base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(payload_json.as_bytes()); + let fake_jwt = format!("eyJhbGciOiJSUzI1NiJ9.{encoded_payload}.fakesig"); + + assert_eq!(extract_email(&fake_jwt), None); + } + + #[test] + fn test_urlencoding() { + assert_eq!(urlencoding("hello"), "hello"); + assert_eq!(urlencoding("hello world"), "hello%20world"); + assert_eq!( + urlencoding("http://localhost:9876/callback"), + "http%3A%2F%2Flocalhost%3A9876%2Fcallback" + ); + } + + #[test] + fn test_generate_state() { + let s1 = generate_state(); + let s2 = generate_state(); + assert_ne!(s1, s2); + // 16 bytes base64url -> 22 chars + assert_eq!(s1.len(), 22); + } + + #[test] + fn test_cache_path_is_under_sunbeam() { + let path = cache_path_for_domain("sunbeam.pt"); + let path_str = path.to_string_lossy(); + assert!(path_str.contains("sunbeam")); + assert!(path_str.contains("auth")); + assert!(path_str.ends_with("sunbeam.pt.json")); + } + + #[test] + fn test_cache_path_default_domain() { + let path = cache_path_for_domain(""); + assert!(path.to_string_lossy().ends_with("default.json")); + } +} diff --git a/sunbeam-sdk/src/cluster/mod.rs b/sunbeam-sdk/src/cluster/mod.rs new file mode 100644 index 0000000..e5de3cf --- /dev/null +++ b/sunbeam-sdk/src/cluster/mod.rs @@ -0,0 +1,461 @@ +//! Cluster lifecycle — cert-manager, Linkerd, TLS, core service readiness. +//! +//! Pure K8s implementation: no Lima VM operations. + +use crate::constants::GITEA_ADMIN_USER; +use crate::error::{Result, ResultExt, SunbeamError}; +use std::path::PathBuf; + +const CERT_MANAGER_URL: &str = + "https://github.com/cert-manager/cert-manager/releases/download/v1.17.0/cert-manager.yaml"; + +const GATEWAY_API_CRDS_URL: &str = + "https://github.com/kubernetes-sigs/gateway-api/releases/download/v1.4.0/standard-install.yaml"; + +fn secrets_dir() -> PathBuf { + crate::config::get_infra_dir() + .join("secrets") + .join("local") +} + +// --------------------------------------------------------------------------- +// cert-manager +// --------------------------------------------------------------------------- + +async fn ensure_cert_manager() -> Result<()> { + crate::output::step("cert-manager..."); + + if crate::kube::ns_exists("cert-manager").await? { + crate::output::ok("Already installed."); + return Ok(()); + } + + crate::output::ok("Installing..."); + + // Download and apply cert-manager YAML + let body = reqwest::get(CERT_MANAGER_URL) + .await + .ctx("Failed to download cert-manager manifest")? + .text() + .await + .ctx("Failed to read cert-manager manifest body")?; + + crate::kube::kube_apply(&body).await?; + + // Wait for rollout + for dep in &[ + "cert-manager", + "cert-manager-webhook", + "cert-manager-cainjector", + ] { + crate::output::ok(&format!("Waiting for {dep}...")); + wait_rollout("cert-manager", dep, 120).await?; + } + + crate::output::ok("Installed."); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Linkerd +// --------------------------------------------------------------------------- + +async fn ensure_linkerd() -> Result<()> { + crate::output::step("Linkerd..."); + + if crate::kube::ns_exists("linkerd").await? { + crate::output::ok("Already installed."); + return Ok(()); + } + + // Gateway API CRDs + crate::output::ok("Installing Gateway API CRDs..."); + let gateway_body = reqwest::get(GATEWAY_API_CRDS_URL) + .await + .ctx("Failed to download Gateway API CRDs")? + .text() + .await?; + + // Gateway API CRDs require server-side apply; kube_apply already does SSA + crate::kube::kube_apply(&gateway_body).await?; + + // Linkerd CRDs via subprocess (no pure HTTP source for linkerd manifests) + crate::output::ok("Installing Linkerd CRDs..."); + let crds_output = tokio::process::Command::new("linkerd") + .args(["install", "--crds"]) + .output() + .await + .ctx("Failed to run `linkerd install --crds`")?; + + if !crds_output.status.success() { + let stderr = String::from_utf8_lossy(&crds_output.stderr); + return Err(SunbeamError::tool("linkerd", format!("install --crds failed: {stderr}"))); + } + let crds = String::from_utf8_lossy(&crds_output.stdout); + crate::kube::kube_apply(&crds).await?; + + // Linkerd control plane + crate::output::ok("Installing Linkerd control plane..."); + let cp_output = tokio::process::Command::new("linkerd") + .args(["install"]) + .output() + .await + .ctx("Failed to run `linkerd install`")?; + + if !cp_output.status.success() { + let stderr = String::from_utf8_lossy(&cp_output.stderr); + return Err(SunbeamError::tool("linkerd", format!("install failed: {stderr}"))); + } + let cp = String::from_utf8_lossy(&cp_output.stdout); + crate::kube::kube_apply(&cp).await?; + + for dep in &[ + "linkerd-identity", + "linkerd-destination", + "linkerd-proxy-injector", + ] { + crate::output::ok(&format!("Waiting for {dep}...")); + wait_rollout("linkerd", dep, 120).await?; + } + + crate::output::ok("Installed."); + Ok(()) +} + +// --------------------------------------------------------------------------- +// TLS certificate (rcgen) +// --------------------------------------------------------------------------- + +async fn ensure_tls_cert(domain: &str) -> Result<()> { + crate::output::step("TLS certificate..."); + + let dir = secrets_dir(); + let cert_path = dir.join("tls.crt"); + let key_path = dir.join("tls.key"); + + if cert_path.exists() { + crate::output::ok(&format!("Cert exists. Domain: {domain}")); + return Ok(()); + } + + crate::output::ok(&format!("Generating wildcard cert for *.{domain}...")); + std::fs::create_dir_all(&dir) + .with_ctx(|| format!("Failed to create secrets dir: {}", dir.display()))?; + + let subject_alt_names = vec![format!("*.{domain}")]; + let mut params = rcgen::CertificateParams::new(subject_alt_names) + .map_err(|e| SunbeamError::kube(format!("Failed to create certificate params: {e}")))?; + params + .distinguished_name + .push(rcgen::DnType::CommonName, format!("*.{domain}")); + + let key_pair = rcgen::KeyPair::generate() + .map_err(|e| SunbeamError::kube(format!("Failed to generate key pair: {e}")))?; + let cert = params + .self_signed(&key_pair) + .map_err(|e| SunbeamError::kube(format!("Failed to generate self-signed certificate: {e}")))?; + + std::fs::write(&cert_path, cert.pem()) + .with_ctx(|| format!("Failed to write {}", cert_path.display()))?; + std::fs::write(&key_path, key_pair.serialize_pem()) + .with_ctx(|| format!("Failed to write {}", key_path.display()))?; + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?; + } + + crate::output::ok(&format!("Cert generated. Domain: {domain}")); + Ok(()) +} + +// --------------------------------------------------------------------------- +// TLS secret +// --------------------------------------------------------------------------- + +async fn ensure_tls_secret(domain: &str) -> Result<()> { + crate::output::step("TLS secret..."); + + let _ = domain; // domain used contextually above; secret uses files + crate::kube::ensure_ns("ingress").await?; + + let dir = secrets_dir(); + let cert_pem = + std::fs::read_to_string(dir.join("tls.crt")).ctx("Failed to read tls.crt")?; + let key_pem = + std::fs::read_to_string(dir.join("tls.key")).ctx("Failed to read tls.key")?; + + // Create TLS secret via kube-rs + let client = crate::kube::get_client().await?; + let api: kube::api::Api = + kube::api::Api::namespaced(client.clone(), "ingress"); + + let b64_cert = base64::Engine::encode( + &base64::engine::general_purpose::STANDARD, + cert_pem.as_bytes(), + ); + let b64_key = base64::Engine::encode( + &base64::engine::general_purpose::STANDARD, + key_pem.as_bytes(), + ); + + let secret_obj = serde_json::json!({ + "apiVersion": "v1", + "kind": "Secret", + "metadata": { + "name": "pingora-tls", + "namespace": "ingress", + }, + "type": "kubernetes.io/tls", + "data": { + "tls.crt": b64_cert, + "tls.key": b64_key, + }, + }); + + let pp = kube::api::PatchParams::apply("sunbeam").force(); + api.patch("pingora-tls", &pp, &kube::api::Patch::Apply(secret_obj)) + .await + .ctx("Failed to create TLS secret")?; + + crate::output::ok("Done."); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Wait for core +// --------------------------------------------------------------------------- + +async fn wait_for_core() -> Result<()> { + crate::output::step("Waiting for core services..."); + + for (ns, dep) in &[("data", "valkey"), ("ory", "kratos"), ("ory", "hydra")] { + let _ = wait_rollout(ns, dep, 120).await; + } + + crate::output::ok("Core services ready."); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Print URLs +// --------------------------------------------------------------------------- + +fn print_urls(domain: &str, _gitea_admin_pass: &str) { + let sep = "\u{2500}".repeat(60); + println!("\n{sep}"); + println!(" Stack is up. Domain: {domain}"); + println!("{sep}"); + + let urls: &[(&str, String)] = &[ + ("Auth", format!("https://auth.{domain}/")), + ("Docs", format!("https://docs.{domain}/")), + ("Meet", format!("https://meet.{domain}/")), + ("Drive", format!("https://drive.{domain}/")), + ("Chat", format!("https://chat.{domain}/")), + ("Mail", format!("https://mail.{domain}/")), + ("People", format!("https://people.{domain}/")), + ( + "Gitea", + format!( + "https://src.{domain}/ ({GITEA_ADMIN_USER} / )" + ), + ), + ]; + + for (name, url) in urls { + println!(" {name:<10} {url}"); + } + + println!(); + println!(" OpenBao UI:"); + println!(" kubectl --context=sunbeam -n data port-forward svc/openbao 8200:8200"); + println!(" http://localhost:8200"); + println!( + " token: kubectl --context=sunbeam -n data get secret openbao-keys \ + -o jsonpath='{{.data.root-token}}' | base64 -d" + ); + println!("{sep}\n"); +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Poll deployment rollout status (approximate: check Available condition). +async fn wait_rollout(ns: &str, deployment: &str, timeout_secs: u64) -> Result<()> { + use k8s_openapi::api::apps::v1::Deployment; + use std::time::{Duration, Instant}; + + let client = crate::kube::get_client().await?; + let api: kube::api::Api = kube::api::Api::namespaced(client.clone(), ns); + + let deadline = Instant::now() + Duration::from_secs(timeout_secs); + + loop { + if Instant::now() > deadline { + return Err(SunbeamError::kube(format!("Timed out waiting for deployment {ns}/{deployment}"))); + } + + match api.get_opt(deployment).await? { + Some(dep) => { + if let Some(status) = &dep.status { + if let Some(conditions) = &status.conditions { + let available = conditions.iter().any(|c| { + c.type_ == "Available" && c.status == "True" + }); + if available { + return Ok(()); + } + } + } + } + None => { + // Deployment doesn't exist yet — keep waiting + } + } + + tokio::time::sleep(Duration::from_secs(3)).await; + } +} + +// --------------------------------------------------------------------------- +// Commands +// --------------------------------------------------------------------------- + +/// Full cluster bring-up (pure K8s — no Lima VM operations). +pub async fn cmd_up() -> Result<()> { + // Resolve domain from cluster state + let domain = crate::kube::get_domain().await?; + + ensure_cert_manager().await?; + ensure_linkerd().await?; + ensure_tls_cert(&domain).await?; + ensure_tls_secret(&domain).await?; + + // Apply manifests + crate::manifests::cmd_apply("local", &domain, "", "").await?; + + // Seed secrets + crate::secrets::cmd_seed().await?; + + // Gitea bootstrap + crate::gitea::cmd_bootstrap().await?; + + // Mirror amd64-only images + crate::images::cmd_mirror().await?; + + // Wait for core services + wait_for_core().await?; + + // Get gitea admin password for URL display + let admin_pass = crate::kube::kube_get_secret_field( + "devtools", + "gitea-admin-credentials", + "password", + ) + .await + .unwrap_or_default(); + + print_urls(&domain, &admin_pass); + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn cert_manager_url_points_to_github_release() { + assert!(CERT_MANAGER_URL.starts_with("https://github.com/cert-manager/cert-manager/")); + assert!(CERT_MANAGER_URL.contains("/releases/download/")); + assert!(CERT_MANAGER_URL.ends_with(".yaml")); + } + + #[test] + fn cert_manager_url_has_version() { + // Verify the URL contains a version tag like v1.x.x + assert!( + CERT_MANAGER_URL.contains("/v1."), + "CERT_MANAGER_URL should reference a v1.x release" + ); + } + + #[test] + fn gateway_api_crds_url_points_to_github_release() { + assert!(GATEWAY_API_CRDS_URL + .starts_with("https://github.com/kubernetes-sigs/gateway-api/")); + assert!(GATEWAY_API_CRDS_URL.contains("/releases/download/")); + assert!(GATEWAY_API_CRDS_URL.ends_with(".yaml")); + } + + #[test] + fn gateway_api_crds_url_has_version() { + assert!( + GATEWAY_API_CRDS_URL.contains("/v1."), + "GATEWAY_API_CRDS_URL should reference a v1.x release" + ); + } + + #[test] + fn secrets_dir_ends_with_secrets_local() { + let dir = secrets_dir(); + assert!( + dir.ends_with("secrets/local"), + "secrets_dir() should end with secrets/local, got: {}", + dir.display() + ); + } + + #[test] + fn secrets_dir_has_at_least_three_components() { + let dir = secrets_dir(); + let components: Vec<_> = dir.components().collect(); + assert!( + components.len() >= 3, + "secrets_dir() should have at least 3 path components (base/secrets/local), got: {}", + dir.display() + ); + } + + #[test] + fn gitea_admin_user_constant() { + assert_eq!(GITEA_ADMIN_USER, "gitea_admin"); + } + + #[test] + fn print_urls_contains_expected_services() { + // Capture print_urls output by checking the URL construction logic. + // We can't easily capture stdout in unit tests, but we can verify + // the URL format matches expectations. + let domain = "test.local"; + let expected_urls = [ + format!("https://auth.{domain}/"), + format!("https://docs.{domain}/"), + format!("https://meet.{domain}/"), + format!("https://drive.{domain}/"), + format!("https://chat.{domain}/"), + format!("https://mail.{domain}/"), + format!("https://people.{domain}/"), + format!("https://src.{domain}/"), + ]; + + // Verify URL patterns are valid + for url in &expected_urls { + assert!(url.starts_with("https://")); + assert!(url.contains(domain)); + } + } + + #[test] + fn print_urls_gitea_includes_credentials() { + let domain = "example.local"; + let gitea_url = format!( + "https://src.{domain}/ ({GITEA_ADMIN_USER} / )" + ); + assert!(gitea_url.contains(GITEA_ADMIN_USER)); + assert!(gitea_url.contains("")); + assert!(gitea_url.contains(&format!("src.{domain}"))); + } +} diff --git a/sunbeam-sdk/src/gitea/mod.rs b/sunbeam-sdk/src/gitea/mod.rs new file mode 100644 index 0000000..b94924c --- /dev/null +++ b/sunbeam-sdk/src/gitea/mod.rs @@ -0,0 +1,429 @@ +//! Gitea bootstrap -- admin setup, org creation, OIDC auth source configuration. + +use crate::error::Result; +use k8s_openapi::api::core::v1::Pod; +use kube::api::{Api, ListParams}; +use serde_json::Value; + +use crate::kube::{get_client, get_domain, kube_exec, kube_get_secret_field}; +use crate::output::{ok, step, warn}; + +const GITEA_ADMIN_USER: &str = "gitea_admin"; +const GITEA_ADMIN_EMAIL: &str = "gitea@local.domain"; + +/// Bootstrap Gitea: set admin password, create orgs, configure OIDC. +pub async fn cmd_bootstrap() -> Result<()> { + let domain = get_domain().await?; + + // Retrieve gitea admin password from cluster secret + let gitea_admin_pass = kube_get_secret_field("devtools", "gitea-admin-credentials", "password") + .await + .unwrap_or_default(); + + if gitea_admin_pass.is_empty() { + warn("gitea-admin-credentials password not found -- cannot bootstrap."); + return Ok(()); + } + + step("Bootstrapping Gitea..."); + + // Wait for a Running + Ready Gitea pod + let pod_name = wait_for_gitea_pod().await?; + let Some(pod) = pod_name else { + warn("Gitea pod not ready after 3 min -- skipping bootstrap."); + return Ok(()); + }; + + // Set admin password + set_admin_password(&pod, &gitea_admin_pass).await?; + + // Mark admin as private + mark_admin_private(&pod, &gitea_admin_pass).await?; + + // Create orgs + create_orgs(&pod, &gitea_admin_pass).await?; + + // Configure OIDC auth source + configure_oidc(&pod, &gitea_admin_pass).await?; + + ok(&format!( + "Gitea ready -- https://src.{domain} ({GITEA_ADMIN_USER} / )" + )); + Ok(()) +} + +/// Wait for a Running + Ready Gitea pod (up to 3 minutes). +async fn wait_for_gitea_pod() -> Result> { + let client = get_client().await?; + let pods: Api = Api::namespaced(client.clone(), "devtools"); + + for _ in 0..60 { + let lp = ListParams::default().labels("app.kubernetes.io/name=gitea"); + if let Ok(pod_list) = pods.list(&lp).await { + for pod in &pod_list.items { + let phase = pod + .status + .as_ref() + .and_then(|s| s.phase.as_deref()) + .unwrap_or(""); + + if phase != "Running" { + continue; + } + + let ready = pod + .status + .as_ref() + .and_then(|s| s.container_statuses.as_ref()) + .and_then(|cs| cs.first()) + .map(|c| c.ready) + .unwrap_or(false); + + if ready { + let name = pod + .metadata + .name + .as_deref() + .unwrap_or("") + .to_string(); + if !name.is_empty() { + return Ok(Some(name)); + } + } + } + } + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + } + + Ok(None) +} + +/// Set the admin password via gitea CLI exec. +async fn set_admin_password(pod: &str, password: &str) -> Result<()> { + let (code, output) = kube_exec( + "devtools", + pod, + &[ + "gitea", + "admin", + "user", + "change-password", + "--username", + GITEA_ADMIN_USER, + "--password", + password, + "--must-change-password=false", + ], + Some("gitea"), + ) + .await?; + + if code == 0 || output.to_lowercase().contains("password") { + ok(&format!("Admin '{GITEA_ADMIN_USER}' password set.")); + } else { + warn(&format!("change-password: {output}")); + } + Ok(()) +} + +/// Call Gitea API via kubectl exec + curl inside the pod. +async fn gitea_api( + pod: &str, + method: &str, + path: &str, + password: &str, + data: Option<&Value>, +) -> Result { + let url = format!("http://localhost:3000/api/v1{path}"); + let auth = format!("{GITEA_ADMIN_USER}:{password}"); + + let mut args = vec![ + "curl", "-s", "-X", method, &url, "-H", "Content-Type: application/json", "-u", &auth, + ]; + + let data_str; + if let Some(d) = data { + data_str = serde_json::to_string(d)?; + args.push("-d"); + args.push(&data_str); + } + + let (_, stdout) = kube_exec("devtools", pod, &args, Some("gitea")).await?; + + Ok(serde_json::from_str(&stdout).unwrap_or(Value::Object(Default::default()))) +} + +/// Mark the admin account as private. +async fn mark_admin_private(pod: &str, password: &str) -> Result<()> { + let data = serde_json::json!({ + "source_id": 0, + "login_name": GITEA_ADMIN_USER, + "email": GITEA_ADMIN_EMAIL, + "visibility": "private", + }); + + let result = gitea_api( + pod, + "PATCH", + &format!("/admin/users/{GITEA_ADMIN_USER}"), + password, + Some(&data), + ) + .await?; + + if result.get("login").and_then(|v| v.as_str()) == Some(GITEA_ADMIN_USER) { + ok(&format!("Admin '{GITEA_ADMIN_USER}' marked as private.")); + } else { + warn(&format!("Could not set admin visibility: {result}")); + } + Ok(()) +} + +/// Create the studio and internal organizations. +async fn create_orgs(pod: &str, password: &str) -> Result<()> { + let orgs = [ + ("studio", "public", "Public source code"), + ("internal", "private", "Internal tools and services"), + ]; + + for (org_name, visibility, desc) in &orgs { + let data = serde_json::json!({ + "username": org_name, + "visibility": visibility, + "description": desc, + }); + + let result = gitea_api(pod, "POST", "/orgs", password, Some(&data)).await?; + + if result.get("id").is_some() { + ok(&format!("Created org '{org_name}'.")); + } else if result + .get("message") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_lowercase() + .contains("already") + { + ok(&format!("Org '{org_name}' already exists.")); + } else { + let msg = result + .get("message") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or_else(|| format!("{result}")); + warn(&format!("Org '{org_name}': {msg}")); + } + } + Ok(()) +} + +/// Configure Hydra as the OIDC authentication source. +async fn configure_oidc(pod: &str, _password: &str) -> Result<()> { + // List existing auth sources + let (_, auth_list_output) = + kube_exec("devtools", pod, &["gitea", "admin", "auth", "list"], Some("gitea")).await?; + + let mut existing_id: Option = None; + let mut exact_ok = false; + + for line in auth_list_output.lines().skip(1) { + // Tab-separated: ID\tName\tType\tEnabled + let parts: Vec<&str> = line.split('\t').collect(); + if parts.len() < 2 { + continue; + } + let src_id = parts[0].trim(); + let src_name = parts[1].trim(); + + if src_name == "Sunbeam" { + exact_ok = true; + break; + } + + let src_type = if parts.len() > 2 { + parts[2].trim() + } else { + "" + }; + + if src_name == "Sunbeam Auth" + || (src_name.starts_with("Sunbeam") && src_type == "OAuth2") + { + existing_id = Some(src_id.to_string()); + } + } + + if exact_ok { + ok("OIDC auth source 'Sunbeam' already present."); + return Ok(()); + } + + if let Some(eid) = existing_id { + // Wrong name -- rename in-place + let (code, stderr) = kube_exec( + "devtools", + pod, + &[ + "gitea", + "admin", + "auth", + "update-oauth", + "--id", + &eid, + "--name", + "Sunbeam", + ], + Some("gitea"), + ) + .await?; + + if code == 0 { + ok(&format!( + "Renamed OIDC auth source (id={eid}) to 'Sunbeam'." + )); + } else { + warn(&format!("Rename failed: {stderr}")); + } + return Ok(()); + } + + // Create new OIDC auth source + let oidc_id = kube_get_secret_field("lasuite", "oidc-gitea", "CLIENT_ID").await; + let oidc_secret = kube_get_secret_field("lasuite", "oidc-gitea", "CLIENT_SECRET").await; + + match (oidc_id, oidc_secret) { + (Ok(oidc_id), Ok(oidc_sec)) => { + let discover_url = + "http://hydra-public.ory.svc.cluster.local:4444/.well-known/openid-configuration"; + + let (code, stderr) = kube_exec( + "devtools", + pod, + &[ + "gitea", + "admin", + "auth", + "add-oauth", + "--name", + "Sunbeam", + "--provider", + "openidConnect", + "--key", + &oidc_id, + "--secret", + &oidc_sec, + "--auto-discover-url", + discover_url, + "--scopes", + "openid", + "--scopes", + "email", + "--scopes", + "profile", + ], + Some("gitea"), + ) + .await?; + + if code == 0 { + ok("OIDC auth source 'Sunbeam' configured."); + } else { + warn(&format!("OIDC auth source config failed: {stderr}")); + } + } + _ => { + warn("oidc-gitea secret not found -- OIDC auth source not configured."); + } + } + + Ok(()) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_constants() { + assert_eq!(GITEA_ADMIN_USER, "gitea_admin"); + assert_eq!(GITEA_ADMIN_EMAIL, "gitea@local.domain"); + } + + #[test] + fn test_org_definitions() { + // Verify the org configs match the Python version + let orgs = [ + ("studio", "public", "Public source code"), + ("internal", "private", "Internal tools and services"), + ]; + assert_eq!(orgs[0].0, "studio"); + assert_eq!(orgs[0].1, "public"); + assert_eq!(orgs[1].0, "internal"); + assert_eq!(orgs[1].1, "private"); + } + + #[test] + fn test_parse_auth_list_output() { + let output = "ID\tName\tType\tEnabled\n1\tSunbeam\tOAuth2\ttrue\n"; + let mut found = false; + for line in output.lines().skip(1) { + let parts: Vec<&str> = line.split('\t').collect(); + if parts.len() >= 2 && parts[1].trim() == "Sunbeam" { + found = true; + } + } + assert!(found); + } + + #[test] + fn test_parse_auth_list_rename_needed() { + let output = "ID\tName\tType\tEnabled\n5\tSunbeam Auth\tOAuth2\ttrue\n"; + let mut rename_id: Option = None; + for line in output.lines().skip(1) { + let parts: Vec<&str> = line.split('\t').collect(); + if parts.len() >= 3 { + let name = parts[1].trim(); + let typ = parts[2].trim(); + if name == "Sunbeam Auth" || (name.starts_with("Sunbeam") && typ == "OAuth2") { + rename_id = Some(parts[0].trim().to_string()); + } + } + } + assert_eq!(rename_id, Some("5".to_string())); + } + + #[test] + fn test_gitea_api_response_parsing() { + // Simulate a successful org creation response + let json_str = r#"{"id": 1, "username": "studio"}"#; + let val: Value = serde_json::from_str(json_str).unwrap(); + assert!(val.get("id").is_some()); + + // Simulate an "already exists" response + let json_str = r#"{"message": "organization already exists"}"#; + let val: Value = serde_json::from_str(json_str).unwrap(); + assert!(val + .get("message") + .unwrap() + .as_str() + .unwrap() + .to_lowercase() + .contains("already")); + } + + #[test] + fn test_admin_visibility_patch_body() { + let data = serde_json::json!({ + "source_id": 0, + "login_name": GITEA_ADMIN_USER, + "email": GITEA_ADMIN_EMAIL, + "visibility": "private", + }); + assert_eq!(data["login_name"], "gitea_admin"); + assert_eq!(data["visibility"], "private"); + } +} diff --git a/sunbeam-sdk/src/manifests/mod.rs b/sunbeam-sdk/src/manifests/mod.rs new file mode 100644 index 0000000..f0761b9 --- /dev/null +++ b/sunbeam-sdk/src/manifests/mod.rs @@ -0,0 +1,880 @@ +use crate::error::Result; +use crate::constants::MANAGED_NS; + +/// Return only the YAML documents that belong to the given namespace. +pub fn filter_by_namespace(manifests: &str, namespace: &str) -> String { + let mut kept = Vec::new(); + for doc in manifests.split("\n---") { + let doc = doc.trim(); + if doc.is_empty() { + continue; + } + let has_ns = doc.contains(&format!("namespace: {namespace}")); + let is_ns_resource = + doc.contains("kind: Namespace") && doc.contains(&format!("name: {namespace}")); + if has_ns || is_ns_resource { + kept.push(doc); + } + } + if kept.is_empty() { + return String::new(); + } + format!("---\n{}\n", kept.join("\n---\n")) +} + +/// Build kustomize overlay for env, substitute domain/email, apply via kube-rs. +/// +/// Runs a second convergence pass if cert-manager is present in the overlay — +/// cert-manager registers a ValidatingWebhook that must be running before +/// ClusterIssuer / Certificate resources can be created. +pub async fn cmd_apply(env: &str, domain: &str, email: &str, namespace: &str) -> Result<()> { + // Fall back to config for ACME email if not provided via CLI flag. + let email = if email.is_empty() { + crate::config::load_config().acme_email + } else { + email.to_string() + }; + + let infra_dir = crate::config::get_infra_dir(); + + let (resolved_domain, overlay) = if env == "production" { + let d = if domain.is_empty() { + crate::kube::get_domain().await? + } else { + domain.to_string() + }; + if d.is_empty() { + bail!("--domain is required for production apply on first deploy"); + } + let overlay = infra_dir.join("overlays").join("production"); + (d, overlay) + } else { + // Local: discover domain from Lima IP + let d = crate::kube::get_domain().await?; + let overlay = infra_dir.join("overlays").join("local"); + (d, overlay) + }; + + let scope = if namespace.is_empty() { + String::new() + } else { + format!(" [{namespace}]") + }; + crate::output::step(&format!( + "Applying manifests (env: {env}, domain: {resolved_domain}){scope}..." + )); + + if env == "local" { + apply_mkcert_ca_configmap().await; + } + + let ns_list = if namespace.is_empty() { + None + } else { + Some(vec![namespace.to_string()]) + }; + pre_apply_cleanup(ns_list.as_deref()).await; + + let before = snapshot_configmaps().await; + let mut manifests = + crate::kube::kustomize_build(&overlay, &resolved_domain, &email).await?; + + if !namespace.is_empty() { + manifests = filter_by_namespace(&manifests, namespace); + if manifests.trim().is_empty() { + crate::output::warn(&format!( + "No resources found for namespace '{namespace}' -- check the name and try again." + )); + return Ok(()); + } + } + + // First pass: may emit errors for resources that depend on webhooks not yet running + if let Err(e) = crate::kube::kube_apply(&manifests).await { + crate::output::warn(&format!("First apply pass had errors (may be expected): {e}")); + } + + // If cert-manager is in the overlay, wait for its webhook then re-apply + let cert_manager_present = overlay + .join("../../base/cert-manager") + .exists(); + + if cert_manager_present && namespace.is_empty() { + if wait_for_webhook("cert-manager", "cert-manager-webhook", 120).await { + crate::output::ok("Running convergence pass for cert-manager resources..."); + let manifests2 = + crate::kube::kustomize_build(&overlay, &resolved_domain, &email).await?; + crate::kube::kube_apply(&manifests2).await?; + } + } + + restart_for_changed_configmaps(&before, &snapshot_configmaps().await).await; + + // Post-apply hooks + if namespace.is_empty() || namespace == "matrix" { + patch_tuwunel_oauth2_redirect(&resolved_domain).await; + inject_opensearch_model_id().await; + } + if namespace.is_empty() || namespace == "data" { + ensure_opensearch_ml().await; + } + + crate::output::ok("Applied."); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +/// Delete immutable resources that must be re-created on each apply. +async fn pre_apply_cleanup(namespaces: Option<&[String]>) { + let ns_list: Vec<&str> = match namespaces { + Some(ns) => ns.iter().map(|s| s.as_str()).collect(), + None => MANAGED_NS.to_vec(), + }; + + crate::output::ok("Cleaning up immutable Jobs and test Pods..."); + + // Prune stale VaultStaticSecrets that share a name with VaultDynamicSecrets + prune_stale_vault_static_secrets(&ns_list).await; + + for ns in &ns_list { + // Delete all jobs + let client = match crate::kube::get_client().await { + Ok(c) => c, + Err(e) => { + crate::output::warn(&format!("Failed to get kube client: {e}")); + return; + } + }; + let jobs: kube::api::Api = + kube::api::Api::namespaced(client.clone(), ns); + if let Ok(job_list) = jobs.list(&kube::api::ListParams::default()).await { + for job in job_list.items { + if let Some(name) = &job.metadata.name { + let dp = kube::api::DeleteParams::default(); + let _ = jobs.delete(name, &dp).await; + } + } + } + + // Delete test pods + let pods: kube::api::Api = + kube::api::Api::namespaced(client.clone(), ns); + if let Ok(pod_list) = pods.list(&kube::api::ListParams::default()).await { + for pod in pod_list.items { + if let Some(name) = &pod.metadata.name { + if name.ends_with("-test-connection") + || name.ends_with("-server-test") + || name.ends_with("-test") + { + let dp = kube::api::DeleteParams::default(); + let _ = pods.delete(name, &dp).await; + } + } + } + } + } +} + +/// Prune VaultStaticSecrets that share a name with VaultDynamicSecrets in the same namespace. +async fn prune_stale_vault_static_secrets(namespaces: &[&str]) { + let client = match crate::kube::get_client().await { + Ok(c) => c, + Err(e) => { + crate::output::warn(&format!("Failed to get kube client for VSS pruning: {e}")); + return; + } + }; + + let vss_ar = kube::api::ApiResource { + group: "secrets.hashicorp.com".into(), + version: "v1beta1".into(), + api_version: "secrets.hashicorp.com/v1beta1".into(), + kind: "VaultStaticSecret".into(), + plural: "vaultstaticsecrets".into(), + }; + + let vds_ar = kube::api::ApiResource { + group: "secrets.hashicorp.com".into(), + version: "v1beta1".into(), + api_version: "secrets.hashicorp.com/v1beta1".into(), + kind: "VaultDynamicSecret".into(), + plural: "vaultdynamicsecrets".into(), + }; + + for ns in namespaces { + let vss_api: kube::api::Api = + kube::api::Api::namespaced_with(client.clone(), ns, &vss_ar); + let vds_api: kube::api::Api = + kube::api::Api::namespaced_with(client.clone(), ns, &vds_ar); + + let vss_list = match vss_api.list(&kube::api::ListParams::default()).await { + Ok(l) => l, + Err(_) => continue, + }; + let vds_list = match vds_api.list(&kube::api::ListParams::default()).await { + Ok(l) => l, + Err(_) => continue, + }; + + let vds_names: std::collections::HashSet = vds_list + .items + .iter() + .filter_map(|o| o.metadata.name.clone()) + .collect(); + + for vss in &vss_list.items { + if let Some(name) = &vss.metadata.name { + if vds_names.contains(name) { + crate::output::ok(&format!( + "Pruning stale VaultStaticSecret {ns}/{name} (replaced by VaultDynamicSecret)" + )); + let dp = kube::api::DeleteParams::default(); + let _ = vss_api.delete(name, &dp).await; + } + } + } + } +} + +/// Snapshot ConfigMap resourceVersions across managed namespaces. +async fn snapshot_configmaps() -> std::collections::HashMap { + let mut result = std::collections::HashMap::new(); + let client = match crate::kube::get_client().await { + Ok(c) => c, + Err(_) => return result, + }; + + for ns in MANAGED_NS { + let cms: kube::api::Api = + kube::api::Api::namespaced(client.clone(), ns); + if let Ok(cm_list) = cms.list(&kube::api::ListParams::default()).await { + for cm in cm_list.items { + if let (Some(name), Some(rv)) = ( + &cm.metadata.name, + &cm.metadata.resource_version, + ) { + result.insert(format!("{ns}/{name}"), rv.clone()); + } + } + } + } + result +} + +/// Restart deployments that mount any ConfigMap whose resourceVersion changed. +async fn restart_for_changed_configmaps( + before: &std::collections::HashMap, + after: &std::collections::HashMap, +) { + let mut changed_by_ns: std::collections::HashMap<&str, std::collections::HashSet<&str>> = + std::collections::HashMap::new(); + + for (key, rv) in after { + if before.get(key) != Some(rv) { + if let Some((ns, name)) = key.split_once('/') { + changed_by_ns.entry(ns).or_default().insert(name); + } + } + } + + if changed_by_ns.is_empty() { + return; + } + + let client = match crate::kube::get_client().await { + Ok(c) => c, + Err(_) => return, + }; + + for (ns, cm_names) in &changed_by_ns { + let deps: kube::api::Api = + kube::api::Api::namespaced(client.clone(), ns); + if let Ok(dep_list) = deps.list(&kube::api::ListParams::default()).await { + for dep in dep_list.items { + let dep_name = dep.metadata.name.as_deref().unwrap_or(""); + // Check if this deployment mounts any changed ConfigMap + let volumes = dep + .spec + .as_ref() + .and_then(|s| s.template.spec.as_ref()) + .and_then(|s| s.volumes.as_ref()); + + if let Some(vols) = volumes { + let mounts_changed = vols.iter().any(|v| { + if let Some(cm) = &v.config_map { + cm_names.contains(cm.name.as_str()) + } else { + false + } + }); + if mounts_changed { + crate::output::ok(&format!( + "Restarting {ns}/{dep_name} (ConfigMap updated)..." + )); + let _ = crate::kube::kube_rollout_restart(ns, dep_name).await; + } + } + } + } + } +} + +/// Wait for a webhook endpoint to become ready. +async fn wait_for_webhook(ns: &str, svc: &str, timeout_secs: u64) -> bool { + crate::output::ok(&format!( + "Waiting for {ns}/{svc} webhook (up to {timeout_secs}s)..." + )); + let deadline = + std::time::Instant::now() + std::time::Duration::from_secs(timeout_secs); + + let client = match crate::kube::get_client().await { + Ok(c) => c, + Err(_) => return false, + }; + let eps: kube::api::Api = + kube::api::Api::namespaced(client.clone(), ns); + + loop { + if std::time::Instant::now() > deadline { + crate::output::warn(&format!( + " {ns}/{svc} not ready after {timeout_secs}s -- continuing anyway." + )); + return false; + } + + if let Ok(Some(ep)) = eps.get_opt(svc).await { + let has_addr = ep + .subsets + .as_ref() + .and_then(|ss| ss.first()) + .and_then(|s| s.addresses.as_ref()) + .is_some_and(|a| !a.is_empty()); + if has_addr { + crate::output::ok(&format!(" {ns}/{svc} ready.")); + return true; + } + } + + tokio::time::sleep(std::time::Duration::from_secs(3)).await; + } +} + +/// Create/update gitea-mkcert-ca ConfigMap from the local mkcert root CA. +async fn apply_mkcert_ca_configmap() { + let caroot = tokio::process::Command::new("mkcert") + .arg("-CAROOT") + .output() + .await; + + let caroot_path = match caroot { + Ok(out) if out.status.success() => { + String::from_utf8_lossy(&out.stdout).trim().to_string() + } + _ => { + crate::output::warn("mkcert not found -- skipping gitea-mkcert-ca ConfigMap."); + return; + } + }; + + let ca_pem_path = std::path::Path::new(&caroot_path).join("rootCA.pem"); + let ca_pem = match std::fs::read_to_string(&ca_pem_path) { + Ok(s) => s, + Err(_) => { + crate::output::warn(&format!( + "mkcert root CA not found at {} -- skipping.", + ca_pem_path.display() + )); + return; + } + }; + + let cm = serde_json::json!({ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "gitea-mkcert-ca", "namespace": "devtools"}, + "data": {"ca.crt": ca_pem}, + }); + + let manifest = serde_json::to_string(&cm).unwrap_or_default(); + if let Err(e) = crate::kube::kube_apply(&manifest).await { + crate::output::warn(&format!("Failed to apply gitea-mkcert-ca: {e}")); + } else { + crate::output::ok("gitea-mkcert-ca ConfigMap applied."); + } +} + +/// Patch the tuwunel OAuth2Client redirect URI with the actual client_id. +async fn patch_tuwunel_oauth2_redirect(domain: &str) { + let client_id = match crate::kube::kube_get_secret_field("matrix", "oidc-tuwunel", "CLIENT_ID") + .await + { + Ok(id) if !id.is_empty() => id, + _ => { + crate::output::warn( + "oidc-tuwunel secret not yet available -- skipping redirect URI patch.", + ); + return; + } + }; + + let redirect_uri = format!( + "https://messages.{domain}/_matrix/client/unstable/login/sso/callback/{client_id}" + ); + + // Patch the OAuth2Client CRD via kube-rs + let client = match crate::kube::get_client().await { + Ok(c) => c, + Err(_) => return, + }; + + let ar = kube::api::ApiResource { + group: "hydra.ory.sh".into(), + version: "v1alpha1".into(), + api_version: "hydra.ory.sh/v1alpha1".into(), + kind: "OAuth2Client".into(), + plural: "oauth2clients".into(), + }; + + let api: kube::api::Api = + kube::api::Api::namespaced_with(client.clone(), "matrix", &ar); + + let patch = serde_json::json!({ + "spec": { + "redirectUris": [redirect_uri] + } + }); + + let pp = kube::api::PatchParams::default(); + if let Err(e) = api + .patch("tuwunel", &pp, &kube::api::Patch::Merge(patch)) + .await + { + crate::output::warn(&format!("Failed to patch tuwunel OAuth2Client: {e}")); + } else { + crate::output::ok("Patched tuwunel OAuth2Client redirect URI."); + } +} + +// --------------------------------------------------------------------------- +// OpenSearch helpers (kube exec + curl inside pod) +// --------------------------------------------------------------------------- + +/// Call OpenSearch API via kube exec curl inside the opensearch pod. +async fn os_api(path: &str, method: &str, body: Option<&str>) -> Option { + let url = format!("http://localhost:9200{path}"); + let mut curl_args: Vec<&str> = vec!["curl", "-sf", &url]; + if method != "GET" { + curl_args.extend_from_slice(&["-X", method]); + } + let body_string; + if let Some(b) = body { + body_string = b.to_string(); + curl_args.extend_from_slice(&["-H", "Content-Type: application/json", "-d", &body_string]); + } + + // Build the full exec command: exec deploy/opensearch -n data -c opensearch -- curl ... + let exec_cmd = curl_args; + + match crate::kube::kube_exec("data", "opensearch-0", &exec_cmd, Some("opensearch")).await { + Ok((0, out)) if !out.is_empty() => Some(out), + _ => None, + } +} + +/// Inject OpenSearch model_id into matrix/opensearch-ml-config ConfigMap. +async fn inject_opensearch_model_id() { + let pipe_resp = + match os_api("/_ingest/pipeline/tuwunel_embedding_pipeline", "GET", None).await { + Some(r) => r, + None => { + crate::output::warn( + "OpenSearch ingest pipeline not found -- skipping model_id injection.", + ); + return; + } + }; + + let model_id = serde_json::from_str::(&pipe_resp) + .ok() + .and_then(|v| { + v.get("tuwunel_embedding_pipeline")? + .get("processors")? + .as_array()? + .iter() + .find_map(|p| { + p.get("text_embedding")? + .get("model_id")? + .as_str() + .map(String::from) + }) + }); + + let Some(model_id) = model_id else { + crate::output::warn( + "No model_id in ingest pipeline -- tuwunel hybrid search unavailable.", + ); + return; + }; + + // Check if ConfigMap already has this value + if let Ok(current) = + crate::kube::kube_get_secret_field("matrix", "opensearch-ml-config", "model_id").await + { + if current == model_id { + return; + } + } + + let cm = serde_json::json!({ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "opensearch-ml-config", "namespace": "matrix"}, + "data": {"model_id": &model_id}, + }); + + let manifest = serde_json::to_string(&cm).unwrap_or_default(); + if let Err(e) = crate::kube::kube_apply(&manifest).await { + crate::output::warn(&format!("Failed to inject OpenSearch model_id: {e}")); + } else { + crate::output::ok(&format!( + "Injected OpenSearch model_id ({model_id}) into matrix/opensearch-ml-config." + )); + } +} + +/// Configure OpenSearch ML Commons for neural search. +/// +/// 1. Sets cluster settings to allow ML on data nodes. +/// 2. Registers and deploys all-mpnet-base-v2 (pre-trained, 384-dim). +/// 3. Creates ingest + search pipelines for hybrid BM25+neural scoring. +async fn ensure_opensearch_ml() { + if os_api("/_cluster/health", "GET", None).await.is_none() { + crate::output::warn("OpenSearch not reachable -- skipping ML setup."); + return; + } + + // 1. ML Commons cluster settings + let settings = serde_json::json!({ + "persistent": { + "plugins.ml_commons.only_run_on_ml_node": false, + "plugins.ml_commons.native_memory_threshold": 90, + "plugins.ml_commons.model_access_control_enabled": false, + "plugins.ml_commons.allow_registering_model_via_url": true, + } + }); + os_api( + "/_cluster/settings", + "PUT", + Some(&serde_json::to_string(&settings).unwrap()), + ) + .await; + + // 2. Check if model already registered and deployed + let search_body = + r#"{"query":{"match":{"name":"huggingface/sentence-transformers/all-mpnet-base-v2"}}}"#; + let search_resp = match os_api("/_plugins/_ml/models/_search", "POST", Some(search_body)).await + { + Some(r) => r, + None => { + crate::output::warn("OpenSearch ML search API failed -- skipping ML setup."); + return; + } + }; + + let resp: serde_json::Value = match serde_json::from_str(&search_resp) { + Ok(v) => v, + Err(_) => return, + }; + + let hits = resp + .get("hits") + .and_then(|h| h.get("hits")) + .and_then(|h| h.as_array()) + .cloned() + .unwrap_or_default(); + + let mut model_id: Option = None; + let mut already_deployed = false; + + for hit in &hits { + let state = hit + .get("_source") + .and_then(|s| s.get("model_state")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + let id = hit.get("_id").and_then(|v| v.as_str()).unwrap_or(""); + match state { + "DEPLOYED" => { + model_id = Some(id.to_string()); + already_deployed = true; + break; + } + "REGISTERED" | "DEPLOYING" => { + model_id = Some(id.to_string()); + } + _ => {} + } + } + + if !already_deployed { + if let Some(ref mid) = model_id { + // Registered but not deployed -- deploy it + crate::output::ok("Deploying OpenSearch ML model..."); + os_api( + &format!("/_plugins/_ml/models/{mid}/_deploy"), + "POST", + None, + ) + .await; + for _ in 0..30 { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + if let Some(r) = + os_api(&format!("/_plugins/_ml/models/{mid}"), "GET", None).await + { + if r.contains("\"DEPLOYED\"") { + break; + } + } + } + } else { + // Register from pre-trained hub + crate::output::ok("Registering OpenSearch ML model (all-mpnet-base-v2)..."); + let reg_body = serde_json::json!({ + "name": "huggingface/sentence-transformers/all-mpnet-base-v2", + "version": "1.0.1", + "model_format": "TORCH_SCRIPT", + }); + let reg_resp = match os_api( + "/_plugins/_ml/models/_register", + "POST", + Some(&serde_json::to_string(®_body).unwrap()), + ) + .await + { + Some(r) => r, + None => { + crate::output::warn("Failed to register ML model -- skipping."); + return; + } + }; + + let task_id = serde_json::from_str::(®_resp) + .ok() + .and_then(|v| v.get("task_id")?.as_str().map(String::from)) + .unwrap_or_default(); + + if task_id.is_empty() { + crate::output::warn("No task_id from model registration -- skipping."); + return; + } + + crate::output::ok("Waiting for model registration..."); + let mut registered_id = None; + for _ in 0..60 { + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + if let Some(task_resp) = + os_api(&format!("/_plugins/_ml/tasks/{task_id}"), "GET", None).await + { + if let Ok(task) = serde_json::from_str::(&task_resp) { + match task.get("state").and_then(|v| v.as_str()).unwrap_or("") { + "COMPLETED" => { + registered_id = task + .get("model_id") + .and_then(|v| v.as_str()) + .map(String::from); + break; + } + "FAILED" => { + crate::output::warn(&format!( + "ML model registration failed: {task_resp}" + )); + return; + } + _ => {} + } + } + } + } + + let Some(mid) = registered_id else { + crate::output::warn("ML model registration timed out."); + return; + }; + + crate::output::ok("Deploying ML model..."); + os_api( + &format!("/_plugins/_ml/models/{mid}/_deploy"), + "POST", + None, + ) + .await; + for _ in 0..30 { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + if let Some(r) = + os_api(&format!("/_plugins/_ml/models/{mid}"), "GET", None).await + { + if r.contains("\"DEPLOYED\"") { + break; + } + } + } + model_id = Some(mid); + } + } + + let Some(model_id) = model_id else { + crate::output::warn("No ML model available -- skipping pipeline setup."); + return; + }; + + // 3. Ingest pipeline + let ingest = serde_json::json!({ + "description": "Tuwunel message embedding pipeline", + "processors": [{"text_embedding": { + "model_id": &model_id, + "field_map": {"body": "embedding"}, + }}], + }); + os_api( + "/_ingest/pipeline/tuwunel_embedding_pipeline", + "PUT", + Some(&serde_json::to_string(&ingest).unwrap()), + ) + .await; + + // 4. Search pipeline + let search = serde_json::json!({ + "description": "Tuwunel hybrid BM25+neural search pipeline", + "phase_results_processors": [{"normalization-processor": { + "normalization": {"technique": "min_max"}, + "combination": { + "technique": "arithmetic_mean", + "parameters": {"weights": [0.3, 0.7]}, + }, + }}], + }); + os_api( + "/_search/pipeline/tuwunel_hybrid_pipeline", + "PUT", + Some(&serde_json::to_string(&search).unwrap()), + ) + .await; + + crate::output::ok(&format!("OpenSearch ML ready (model: {model_id}).")); +} + +#[cfg(test)] +mod tests { + use super::*; + + const MULTI_DOC: &str = "\ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: meet-config + namespace: lasuite +data: + FOO: bar +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: meet-backend + namespace: lasuite +spec: + replicas: 1 +--- +apiVersion: v1 +kind: Namespace +metadata: + name: lasuite +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: pingora-config + namespace: ingress +data: + config.toml: | + hello +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pingora + namespace: ingress +spec: + replicas: 1 +"; + + #[test] + fn test_keeps_matching_namespace() { + let result = filter_by_namespace(MULTI_DOC, "lasuite"); + assert!(result.contains("name: meet-config")); + assert!(result.contains("name: meet-backend")); + } + + #[test] + fn test_excludes_other_namespaces() { + let result = filter_by_namespace(MULTI_DOC, "lasuite"); + assert!(!result.contains("namespace: ingress")); + assert!(!result.contains("name: pingora-config")); + assert!(!result.contains("name: pingora\n")); + } + + #[test] + fn test_includes_namespace_resource_itself() { + let result = filter_by_namespace(MULTI_DOC, "lasuite"); + assert!(result.contains("kind: Namespace")); + } + + #[test] + fn test_ingress_filter() { + let result = filter_by_namespace(MULTI_DOC, "ingress"); + assert!(result.contains("name: pingora-config")); + assert!(result.contains("name: pingora")); + assert!(!result.contains("namespace: lasuite")); + } + + #[test] + fn test_unknown_namespace_returns_empty() { + let result = filter_by_namespace(MULTI_DOC, "nonexistent"); + assert!(result.trim().is_empty()); + } + + #[test] + fn test_empty_input_returns_empty() { + let result = filter_by_namespace("", "lasuite"); + assert!(result.trim().is_empty()); + } + + #[test] + fn test_result_starts_with_separator() { + let result = filter_by_namespace(MULTI_DOC, "lasuite"); + assert!(result.starts_with("---")); + } + + #[test] + fn test_does_not_include_namespace_resource_for_wrong_ns() { + let result = filter_by_namespace(MULTI_DOC, "ingress"); + assert!(!result.contains("kind: Namespace")); + } + + #[test] + fn test_single_doc_matching() { + let doc = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: x\n namespace: ory\n"; + let result = filter_by_namespace(doc, "ory"); + assert!(result.contains("name: x")); + } + + #[test] + fn test_single_doc_not_matching() { + let doc = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: x\n namespace: ory\n"; + let result = filter_by_namespace(doc, "lasuite"); + assert!(result.trim().is_empty()); + } +} diff --git a/sunbeam-sdk/src/services/mod.rs b/sunbeam-sdk/src/services/mod.rs new file mode 100644 index 0000000..1f69109 --- /dev/null +++ b/sunbeam-sdk/src/services/mod.rs @@ -0,0 +1,573 @@ +//! Service management — status, logs, restart. + +use crate::error::{Result, SunbeamError}; +use k8s_openapi::api::core::v1::Pod; +use kube::api::{Api, DynamicObject, ListParams, LogParams}; +use kube::ResourceExt; +use std::collections::BTreeMap; +use crate::constants::MANAGED_NS; +use crate::kube::{get_client, kube_rollout_restart, parse_target}; +use crate::output::{ok, step, warn}; + +/// Services that can be rollout-restarted, as (namespace, deployment) pairs. +pub const SERVICES_TO_RESTART: &[(&str, &str)] = &[ + ("ory", "hydra"), + ("ory", "kratos"), + ("ory", "login-ui"), + ("devtools", "gitea"), + ("storage", "seaweedfs-filer"), + ("lasuite", "hive"), + ("lasuite", "people-backend"), + ("lasuite", "people-frontend"), + ("lasuite", "people-celery-worker"), + ("lasuite", "people-celery-beat"), + ("lasuite", "projects"), + ("matrix", "tuwunel"), + ("media", "livekit-server"), +]; + +// --------------------------------------------------------------------------- +// Status helpers +// --------------------------------------------------------------------------- + +/// Parsed pod row for display. +struct PodRow { + ns: String, + name: String, + ready: String, + status: String, +} + +fn icon_for_status(status: &str) -> &'static str { + match status { + "Running" | "Completed" | "Succeeded" => "\u{2713}", + "Pending" => "\u{25cb}", + "Failed" => "\u{2717}", + _ => "?", + } +} + +fn is_unhealthy(pod: &Pod) -> bool { + let status = pod.status.as_ref(); + let phase = status + .and_then(|s| s.phase.as_deref()) + .unwrap_or("Unknown"); + + match phase { + "Running" => { + // Check all containers are ready. + let container_statuses = status + .and_then(|s| s.container_statuses.as_ref()); + if let Some(cs) = container_statuses { + let total = cs.len(); + let ready = cs.iter().filter(|c| c.ready).count(); + ready != total + } else { + true + } + } + "Succeeded" | "Completed" => false, + _ => true, + } +} + +fn pod_phase(pod: &Pod) -> String { + pod.status + .as_ref() + .and_then(|s| s.phase.clone()) + .unwrap_or_else(|| "Unknown".to_string()) +} + +fn pod_ready_str(pod: &Pod) -> String { + let cs = pod + .status + .as_ref() + .and_then(|s| s.container_statuses.as_ref()); + match cs { + Some(cs) => { + let total = cs.len(); + let ready = cs.iter().filter(|c| c.ready).count(); + format!("{ready}/{total}") + } + None => "0/0".to_string(), + } +} + +// --------------------------------------------------------------------------- +// VSO sync status +// --------------------------------------------------------------------------- + +async fn vso_sync_status() -> Result<()> { + step("VSO secret sync status..."); + + let client = get_client().await?; + let mut all_ok = true; + + // --- VaultStaticSecrets --- + { + let ar = kube::api::ApiResource { + group: "secrets.hashicorp.com".into(), + version: "v1beta1".into(), + api_version: "secrets.hashicorp.com/v1beta1".into(), + kind: "VaultStaticSecret".into(), + plural: "vaultstaticsecrets".into(), + }; + + let api: Api = Api::all_with(client.clone(), &ar); + let list = api.list(&ListParams::default()).await; + + if let Ok(list) = list { + // Group by namespace and sort + let mut grouped: BTreeMap> = BTreeMap::new(); + for obj in &list.items { + let ns = obj.namespace().unwrap_or_default(); + let name = obj.name_any(); + let mac = obj + .data + .get("status") + .and_then(|s| s.get("secretMAC")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + let synced = !mac.is_empty() && mac != ""; + if !synced { + all_ok = false; + } + grouped.entry(ns).or_default().push((name, synced)); + } + for (ns, mut items) in grouped { + println!(" {ns} (VSS):"); + items.sort(); + for (name, synced) in items { + let icon = if synced { "\u{2713}" } else { "\u{2717}" }; + println!(" {icon} {name}"); + } + } + } + } + + // --- VaultDynamicSecrets --- + { + let ar = kube::api::ApiResource { + group: "secrets.hashicorp.com".into(), + version: "v1beta1".into(), + api_version: "secrets.hashicorp.com/v1beta1".into(), + kind: "VaultDynamicSecret".into(), + plural: "vaultdynamicsecrets".into(), + }; + + let api: Api = Api::all_with(client.clone(), &ar); + let list = api.list(&ListParams::default()).await; + + if let Ok(list) = list { + let mut grouped: BTreeMap> = BTreeMap::new(); + for obj in &list.items { + let ns = obj.namespace().unwrap_or_default(); + let name = obj.name_any(); + let renewed = obj + .data + .get("status") + .and_then(|s| s.get("lastRenewalTime")) + .and_then(|v| v.as_str()) + .unwrap_or("0"); + let synced = !renewed.is_empty() && renewed != "0" && renewed != ""; + if !synced { + all_ok = false; + } + grouped.entry(ns).or_default().push((name, synced)); + } + for (ns, mut items) in grouped { + println!(" {ns} (VDS):"); + items.sort(); + for (name, synced) in items { + let icon = if synced { "\u{2713}" } else { "\u{2717}" }; + println!(" {icon} {name}"); + } + } + } + } + + println!(); + if all_ok { + ok("All VSO secrets synced."); + } else { + warn("Some VSO secrets are not synced."); + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Public commands +// --------------------------------------------------------------------------- + +/// Show pod health, optionally filtered by namespace or namespace/service. +pub async fn cmd_status(target: Option<&str>) -> Result<()> { + step("Pod health across all namespaces..."); + + let client = get_client().await?; + let (ns_filter, svc_filter) = parse_target(target)?; + + let mut pods: Vec = Vec::new(); + + match (ns_filter, svc_filter) { + (None, _) => { + // All managed namespaces + let ns_set: std::collections::HashSet<&str> = + MANAGED_NS.iter().copied().collect(); + for ns in MANAGED_NS { + let api: Api = Api::namespaced(client.clone(), ns); + let lp = ListParams::default(); + if let Ok(list) = api.list(&lp).await { + for pod in list.items { + let pod_ns = pod.namespace().unwrap_or_default(); + if !ns_set.contains(pod_ns.as_str()) { + continue; + } + pods.push(PodRow { + ns: pod_ns, + name: pod.name_any(), + ready: pod_ready_str(&pod), + status: pod_phase(&pod), + }); + } + } + } + } + (Some(ns), None) => { + // All pods in a namespace + let api: Api = Api::namespaced(client.clone(), ns); + let lp = ListParams::default(); + if let Ok(list) = api.list(&lp).await { + for pod in list.items { + pods.push(PodRow { + ns: ns.to_string(), + name: pod.name_any(), + ready: pod_ready_str(&pod), + status: pod_phase(&pod), + }); + } + } + } + (Some(ns), Some(svc)) => { + // Specific service: filter by app label + let api: Api = Api::namespaced(client.clone(), ns); + let lp = ListParams::default().labels(&format!("app={svc}")); + if let Ok(list) = api.list(&lp).await { + for pod in list.items { + pods.push(PodRow { + ns: ns.to_string(), + name: pod.name_any(), + ready: pod_ready_str(&pod), + status: pod_phase(&pod), + }); + } + } + } + } + + if pods.is_empty() { + warn("No pods found in managed namespaces."); + return Ok(()); + } + + pods.sort_by(|a, b| (&a.ns, &a.name).cmp(&(&b.ns, &b.name))); + + let mut all_ok = true; + let mut cur_ns: Option<&str> = None; + for row in &pods { + if cur_ns != Some(&row.ns) { + println!(" {}:", row.ns); + cur_ns = Some(&row.ns); + } + let icon = icon_for_status(&row.status); + + let mut unhealthy = !matches!( + row.status.as_str(), + "Running" | "Completed" | "Succeeded" + ); + // For Running pods, check ready ratio + if !unhealthy && row.status == "Running" && row.ready.contains('/') { + let parts: Vec<&str> = row.ready.split('/').collect(); + if parts.len() == 2 && parts[0] != parts[1] { + unhealthy = true; + } + } + if unhealthy { + all_ok = false; + } + println!(" {icon} {:<50} {:<6} {}", row.name, row.ready, row.status); + } + + println!(); + if all_ok { + ok("All pods healthy."); + } else { + warn("Some pods are not ready."); + } + + vso_sync_status().await?; + Ok(()) +} + +/// Stream logs for a service. Target must include service name (e.g. ory/kratos). +pub async fn cmd_logs(target: &str, follow: bool) -> Result<()> { + let (ns_opt, name_opt) = parse_target(Some(target))?; + let ns = ns_opt.unwrap_or(""); + let name = match name_opt { + Some(n) => n, + None => bail!("Logs require a service name, e.g. 'ory/kratos'."), + }; + + let client = get_client().await?; + let api: Api = Api::namespaced(client.clone(), ns); + + // Find pods matching the app label + let lp = ListParams::default().labels(&format!("app={name}")); + let pod_list = api.list(&lp).await?; + + if pod_list.items.is_empty() { + bail!("No pods found for {ns}/{name}"); + } + + if follow { + // Stream logs from the first matching pod + let pod_name = pod_list.items[0].name_any(); + let mut lp = LogParams::default(); + lp.follow = true; + lp.tail_lines = Some(100); + + // log_stream returns a futures::AsyncBufRead — use the futures crate to read it + use futures::AsyncBufReadExt; + let stream = api.log_stream(&pod_name, &lp).await?; + let reader = futures::io::BufReader::new(stream); + let mut lines = reader.lines(); + use futures::StreamExt; + while let Some(line) = lines.next().await { + match line { + Ok(line) => println!("{line}"), + Err(e) => { + warn(&format!("Log stream error: {e}")); + break; + } + } + } + } else { + // Print logs from all matching pods + for pod in &pod_list.items { + let pod_name = pod.name_any(); + let mut lp = LogParams::default(); + lp.tail_lines = Some(100); + + match api.logs(&pod_name, &lp).await { + Ok(logs) => print!("{logs}"), + Err(e) => warn(&format!("Failed to get logs for {pod_name}: {e}")), + } + } + } + + Ok(()) +} + +/// Print raw pod output in YAML or JSON format. +pub async fn cmd_get(target: &str, output: &str) -> Result<()> { + let (ns_opt, name_opt) = parse_target(Some(target))?; + let ns = match ns_opt { + Some(n) if !n.is_empty() => n, + _ => bail!("get requires namespace/name, e.g. 'sunbeam get ory/kratos-abc'"), + }; + let name = match name_opt { + Some(n) => n, + None => bail!("get requires namespace/name, e.g. 'sunbeam get ory/kratos-abc'"), + }; + + let client = get_client().await?; + let api: Api = Api::namespaced(client.clone(), ns); + + let pod = api + .get_opt(name) + .await? + .ok_or_else(|| SunbeamError::kube(format!("Pod {ns}/{name} not found.")))?; + + let text = match output { + "json" => serde_json::to_string_pretty(&pod)?, + _ => serde_yaml::to_string(&pod)?, + }; + println!("{text}"); + Ok(()) +} + +/// Restart deployments. None=all, 'ory'=namespace, 'ory/kratos'=specific. +pub async fn cmd_restart(target: Option<&str>) -> Result<()> { + step("Restarting services..."); + + let (ns_filter, svc_filter) = parse_target(target)?; + + let matched: Vec<(&str, &str)> = match (ns_filter, svc_filter) { + (None, _) => SERVICES_TO_RESTART.to_vec(), + (Some(ns), None) => SERVICES_TO_RESTART + .iter() + .filter(|(n, _)| *n == ns) + .copied() + .collect(), + (Some(ns), Some(name)) => SERVICES_TO_RESTART + .iter() + .filter(|(n, d)| *n == ns && *d == name) + .copied() + .collect(), + }; + + if matched.is_empty() { + warn(&format!( + "No matching services for target: {}", + target.unwrap_or("(none)") + )); + return Ok(()); + } + + for (ns, dep) in &matched { + if let Err(e) = kube_rollout_restart(ns, dep).await { + warn(&format!("Failed to restart {ns}/{dep}: {e}")); + } + } + ok("Done."); + Ok(()) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_managed_ns_contains_expected() { + assert!(MANAGED_NS.contains(&"ory")); + assert!(MANAGED_NS.contains(&"data")); + assert!(MANAGED_NS.contains(&"devtools")); + assert!(MANAGED_NS.contains(&"ingress")); + assert!(MANAGED_NS.contains(&"lasuite")); + assert!(MANAGED_NS.contains(&"matrix")); + assert!(MANAGED_NS.contains(&"media")); + assert!(MANAGED_NS.contains(&"storage")); + assert!(MANAGED_NS.contains(&"monitoring")); + assert!(MANAGED_NS.contains(&"vault-secrets-operator")); + assert_eq!(MANAGED_NS.len(), 10); + } + + #[test] + fn test_services_to_restart_contains_expected() { + assert!(SERVICES_TO_RESTART.contains(&("ory", "hydra"))); + assert!(SERVICES_TO_RESTART.contains(&("ory", "kratos"))); + assert!(SERVICES_TO_RESTART.contains(&("ory", "login-ui"))); + assert!(SERVICES_TO_RESTART.contains(&("devtools", "gitea"))); + assert!(SERVICES_TO_RESTART.contains(&("storage", "seaweedfs-filer"))); + assert!(SERVICES_TO_RESTART.contains(&("lasuite", "hive"))); + assert!(SERVICES_TO_RESTART.contains(&("matrix", "tuwunel"))); + assert!(SERVICES_TO_RESTART.contains(&("media", "livekit-server"))); + assert_eq!(SERVICES_TO_RESTART.len(), 13); + } + + #[test] + fn test_icon_for_status() { + assert_eq!(icon_for_status("Running"), "\u{2713}"); + assert_eq!(icon_for_status("Completed"), "\u{2713}"); + assert_eq!(icon_for_status("Succeeded"), "\u{2713}"); + assert_eq!(icon_for_status("Pending"), "\u{25cb}"); + assert_eq!(icon_for_status("Failed"), "\u{2717}"); + assert_eq!(icon_for_status("Unknown"), "?"); + assert_eq!(icon_for_status("CrashLoopBackOff"), "?"); + } + + #[test] + fn test_restart_filter_namespace() { + let matched: Vec<(&str, &str)> = SERVICES_TO_RESTART + .iter() + .filter(|(n, _)| *n == "ory") + .copied() + .collect(); + assert_eq!(matched.len(), 3); + assert!(matched.contains(&("ory", "hydra"))); + assert!(matched.contains(&("ory", "kratos"))); + assert!(matched.contains(&("ory", "login-ui"))); + } + + #[test] + fn test_restart_filter_specific() { + let matched: Vec<(&str, &str)> = SERVICES_TO_RESTART + .iter() + .filter(|(n, d)| *n == "ory" && *d == "kratos") + .copied() + .collect(); + assert_eq!(matched.len(), 1); + assert_eq!(matched[0], ("ory", "kratos")); + } + + #[test] + fn test_restart_filter_no_match() { + let matched: Vec<(&str, &str)> = SERVICES_TO_RESTART + .iter() + .filter(|(n, d)| *n == "nonexistent" && *d == "nosuch") + .copied() + .collect(); + assert!(matched.is_empty()); + } + + #[test] + fn test_restart_filter_all() { + let matched: Vec<(&str, &str)> = SERVICES_TO_RESTART.to_vec(); + assert_eq!(matched.len(), 13); + } + + #[test] + fn test_pod_ready_string_format() { + // Verify format: "N/M" + let ready = "2/3"; + let parts: Vec<&str> = ready.split('/').collect(); + assert_eq!(parts.len(), 2); + assert_ne!(parts[0], parts[1]); // unhealthy + } + + #[test] + fn test_unhealthy_detection_by_ready_ratio() { + // Simulate the ready ratio check used in cmd_status + let ready = "1/2"; + let status = "Running"; + let mut unhealthy = !matches!(status, "Running" | "Completed" | "Succeeded"); + if !unhealthy && status == "Running" && ready.contains('/') { + let parts: Vec<&str> = ready.split('/').collect(); + if parts.len() == 2 && parts[0] != parts[1] { + unhealthy = true; + } + } + assert!(unhealthy); + } + + #[test] + fn test_healthy_detection_by_ready_ratio() { + let ready = "2/2"; + let status = "Running"; + let mut unhealthy = !matches!(status, "Running" | "Completed" | "Succeeded"); + if !unhealthy && status == "Running" && ready.contains('/') { + let parts: Vec<&str> = ready.split('/').collect(); + if parts.len() == 2 && parts[0] != parts[1] { + unhealthy = true; + } + } + assert!(!unhealthy); + } + + #[test] + fn test_completed_pods_are_healthy() { + let status = "Completed"; + let unhealthy = !matches!(status, "Running" | "Completed" | "Succeeded"); + assert!(!unhealthy); + } + + #[test] + fn test_pending_pods_are_unhealthy() { + let status = "Pending"; + let unhealthy = !matches!(status, "Running" | "Completed" | "Succeeded"); + assert!(unhealthy); + } +} diff --git a/sunbeam-sdk/src/update/mod.rs b/sunbeam-sdk/src/update/mod.rs new file mode 100644 index 0000000..554d12e --- /dev/null +++ b/sunbeam-sdk/src/update/mod.rs @@ -0,0 +1,443 @@ +use crate::error::{Result, ResultExt}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use std::fs; +use std::path::PathBuf; + +/// Compile-time commit SHA set by build.rs. +pub const COMMIT: &str = env!("SUNBEAM_COMMIT"); + +/// Compile-time build target triple set by build.rs. +pub const TARGET: &str = env!("SUNBEAM_TARGET"); + +/// Compile-time build date set by build.rs. +pub const BUILD_DATE: &str = env!("SUNBEAM_BUILD_DATE"); + +/// Artifact name prefix for this platform. +fn artifact_name() -> String { + format!("sunbeam-{TARGET}") +} + +/// Resolve the forge URL (Gitea instance). +/// +/// TODO: Once kube.rs exposes `get_domain()`, derive this automatically as +/// `https://src.{domain}`. For now we read the SUNBEAM_FORGE_URL environment +/// variable with a sensible fallback. +fn forge_url() -> String { + if let Ok(url) = std::env::var("SUNBEAM_FORGE_URL") { + return url.trim_end_matches('/').to_string(); + } + + // Derive from production_host domain in config + let config = crate::config::load_config(); + if !config.production_host.is_empty() { + // production_host is like "user@server.example.com" — extract domain + let host = config + .production_host + .split('@') + .last() + .unwrap_or(&config.production_host); + // Strip any leading subdomain segments that look like a hostname to get the base domain. + // For a host like "admin.sunbeam.pt", the forge is "src.sunbeam.pt". + // Heuristic: use the last two segments as the domain. + let parts: Vec<&str> = host.split('.').collect(); + if parts.len() >= 2 { + let domain = format!("{}.{}", parts[parts.len() - 2], parts[parts.len() - 1]); + return format!("https://src.{domain}"); + } + } + + // Hard fallback — will fail at runtime if not configured, which is fine. + String::new() +} + +/// Cache file location for background update checks. +fn update_cache_path() -> PathBuf { + dirs::data_dir() + .unwrap_or_else(|| dirs::home_dir().unwrap_or_else(|| PathBuf::from(".")).join(".local/share")) + .join("sunbeam") + .join("update-check.json") +} + +// --------------------------------------------------------------------------- +// Gitea API response types +// --------------------------------------------------------------------------- + +#[derive(Debug, Deserialize)] +struct BranchResponse { + commit: BranchCommit, +} + +#[derive(Debug, Deserialize)] +struct BranchCommit { + id: String, +} + +#[derive(Debug, Deserialize)] +struct ArtifactListResponse { + artifacts: Vec, +} + +#[derive(Debug, Deserialize)] +struct Artifact { + name: String, + id: u64, +} + +// --------------------------------------------------------------------------- +// Update-check cache +// --------------------------------------------------------------------------- + +#[derive(Debug, Serialize, Deserialize)] +struct UpdateCache { + last_check: DateTime, + latest_commit: String, + current_commit: String, +} + +// --------------------------------------------------------------------------- +// Public API +// --------------------------------------------------------------------------- + +/// Print version information. +pub fn cmd_version() { + println!("sunbeam {COMMIT}"); + println!(" target: {TARGET}"); + println!(" built: {BUILD_DATE}"); +} + +/// Self-update from the latest mainline commit via Gitea CI artifacts. +pub async fn cmd_update() -> Result<()> { + let base = forge_url(); + if base.is_empty() { + bail!( + "Forge URL not configured. Set SUNBEAM_FORGE_URL or configure a \ + production host via `sunbeam config set --host`." + ); + } + + crate::output::step("Checking for updates..."); + + let client = reqwest::Client::new(); + + // 1. Check latest commit on mainline + let latest_commit = fetch_latest_commit(&client, &base).await?; + let short_latest = &latest_commit[..std::cmp::min(8, latest_commit.len())]; + + crate::output::ok(&format!("Current: {COMMIT}")); + crate::output::ok(&format!("Latest: {short_latest}")); + + if latest_commit.starts_with(COMMIT) || COMMIT.starts_with(&latest_commit[..std::cmp::min(COMMIT.len(), latest_commit.len())]) { + crate::output::ok("Already up to date."); + return Ok(()); + } + + // 2. Find the CI artifact for our platform + crate::output::step("Downloading update..."); + let wanted = artifact_name(); + + let artifacts = fetch_artifacts(&client, &base).await?; + let binary_artifact = artifacts + .iter() + .find(|a| a.name == wanted) + .with_ctx(|| format!("No artifact found for platform '{wanted}'"))?; + + let checksums_artifact = artifacts + .iter() + .find(|a| a.name == "checksums.txt" || a.name == "checksums"); + + // 3. Download the binary + let binary_url = format!( + "{base}/api/v1/repos/studio/cli/actions/artifacts/{id}", + id = binary_artifact.id + ); + let binary_bytes = client + .get(&binary_url) + .send() + .await? + .error_for_status() + .ctx("Failed to download binary artifact")? + .bytes() + .await?; + + crate::output::ok(&format!("Downloaded {} bytes", binary_bytes.len())); + + // 4. Verify SHA256 if checksums artifact exists + if let Some(checksums) = checksums_artifact { + let checksums_url = format!( + "{base}/api/v1/repos/studio/cli/actions/artifacts/{id}", + id = checksums.id + ); + let checksums_text = client + .get(&checksums_url) + .send() + .await? + .error_for_status() + .ctx("Failed to download checksums")? + .text() + .await?; + + verify_checksum(&binary_bytes, &wanted, &checksums_text)?; + crate::output::ok("SHA256 checksum verified."); + } else { + crate::output::warn("No checksums artifact found; skipping verification."); + } + + // 5. Atomic self-replace + crate::output::step("Installing update..."); + let current_exe = std::env::current_exe().ctx("Failed to determine current executable path")?; + atomic_replace(¤t_exe, &binary_bytes)?; + + crate::output::ok(&format!( + "Updated sunbeam {COMMIT} -> {short_latest}" + )); + + // Update the cache so background check knows we are current + let _ = write_cache(&UpdateCache { + last_check: Utc::now(), + latest_commit: latest_commit.clone(), + current_commit: latest_commit, + }); + + Ok(()) +} + +/// Background update check. Returns a notification message if a newer version +/// is available, or None if up-to-date / on error / checked too recently. +/// +/// This function never blocks for long and never returns errors — it silently +/// returns None on any failure. +pub async fn check_update_background() -> Option { + // Read cache + let cache_path = update_cache_path(); + if let Ok(data) = fs::read_to_string(&cache_path) { + if let Ok(cache) = serde_json::from_str::(&data) { + let age = Utc::now().signed_duration_since(cache.last_check); + if age.num_seconds() < 3600 { + // Checked recently — just compare cached values + if cache.latest_commit.starts_with(COMMIT) + || COMMIT.starts_with(&cache.latest_commit[..std::cmp::min(COMMIT.len(), cache.latest_commit.len())]) + { + return None; // up to date + } + let short = &cache.latest_commit[..std::cmp::min(8, cache.latest_commit.len())]; + return Some(format!( + "A newer version of sunbeam is available ({short}). Run `sunbeam update` to upgrade." + )); + } + } + } + + // Time to check again + let base = forge_url(); + if base.is_empty() { + return None; + } + + let client = reqwest::Client::builder() + .timeout(std::time::Duration::from_secs(5)) + .build() + .ok()?; + + let latest = fetch_latest_commit(&client, &base).await.ok()?; + + let cache = UpdateCache { + last_check: Utc::now(), + latest_commit: latest.clone(), + current_commit: COMMIT.to_string(), + }; + let _ = write_cache(&cache); + + if latest.starts_with(COMMIT) + || COMMIT.starts_with(&latest[..std::cmp::min(COMMIT.len(), latest.len())]) + { + return None; + } + + let short = &latest[..std::cmp::min(8, latest.len())]; + Some(format!( + "A newer version of sunbeam is available ({short}). Run `sunbeam update` to upgrade." + )) +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +/// Fetch the latest commit SHA on the mainline branch. +async fn fetch_latest_commit(client: &reqwest::Client, forge_url: &str) -> Result { + let url = format!("{forge_url}/api/v1/repos/studio/cli/branches/mainline"); + let resp: BranchResponse = client + .get(&url) + .send() + .await? + .error_for_status() + .ctx("Failed to query mainline branch")? + .json() + .await?; + Ok(resp.commit.id) +} + +/// Fetch the list of CI artifacts for the repo. +async fn fetch_artifacts(client: &reqwest::Client, forge_url: &str) -> Result> { + let url = format!("{forge_url}/api/v1/repos/studio/cli/actions/artifacts"); + let resp: ArtifactListResponse = client + .get(&url) + .send() + .await? + .error_for_status() + .ctx("Failed to query CI artifacts")? + .json() + .await?; + Ok(resp.artifacts) +} + +/// Verify that the downloaded binary matches the expected SHA256 from checksums text. +/// +/// Checksums file format (one per line): +/// +fn verify_checksum(binary: &[u8], artifact_name: &str, checksums_text: &str) -> Result<()> { + let actual = { + let mut hasher = Sha256::new(); + hasher.update(binary); + format!("{:x}", hasher.finalize()) + }; + + for line in checksums_text.lines() { + // Split on whitespace — format is " " or " " + let mut parts = line.split_whitespace(); + if let (Some(expected_hash), Some(name)) = (parts.next(), parts.next()) { + if name == artifact_name { + if actual != expected_hash { + bail!( + "Checksum mismatch for {artifact_name}:\n expected: {expected_hash}\n actual: {actual}" + ); + } + return Ok(()); + } + } + } + + bail!("No checksum entry found for '{artifact_name}' in checksums file"); +} + +/// Atomically replace the binary at `target` with `new_bytes`. +/// +/// Writes to a temp file in the same directory, sets executable permissions, +/// then renames over the original. +fn atomic_replace(target: &std::path::Path, new_bytes: &[u8]) -> Result<()> { + let parent = target + .parent() + .ctx("Cannot determine parent directory of current executable")?; + + let tmp_path = parent.join(".sunbeam-update.tmp"); + + // Write new binary + fs::write(&tmp_path, new_bytes).ctx("Failed to write temporary update file")?; + + // Set executable permissions (unix) + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + fs::set_permissions(&tmp_path, fs::Permissions::from_mode(0o755)) + .ctx("Failed to set executable permissions")?; + } + + // Atomic rename + fs::rename(&tmp_path, target).ctx("Failed to replace current executable")?; + + Ok(()) +} + +/// Write the update-check cache to disk. +fn write_cache(cache: &UpdateCache) -> Result<()> { + let path = update_cache_path(); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + let json = serde_json::to_string_pretty(cache)?; + fs::write(&path, json)?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_version_consts() { + // COMMIT, TARGET, BUILD_DATE are set at compile time + assert!(!COMMIT.is_empty()); + assert!(!TARGET.is_empty()); + assert!(!BUILD_DATE.is_empty()); + } + + #[test] + fn test_artifact_name() { + let name = artifact_name(); + assert!(name.starts_with("sunbeam-")); + assert!(name.contains(TARGET)); + } + + #[test] + fn test_verify_checksum_ok() { + let data = b"hello world"; + let mut hasher = Sha256::new(); + hasher.update(data); + let hash = format!("{:x}", hasher.finalize()); + + let checksums = format!("{hash} sunbeam-test"); + assert!(verify_checksum(data, "sunbeam-test", &checksums).is_ok()); + } + + #[test] + fn test_verify_checksum_mismatch() { + let checksums = "0000000000000000000000000000000000000000000000000000000000000000 sunbeam-test"; + assert!(verify_checksum(b"hello", "sunbeam-test", checksums).is_err()); + } + + #[test] + fn test_verify_checksum_missing_entry() { + let checksums = "abcdef1234567890 sunbeam-other"; + assert!(verify_checksum(b"hello", "sunbeam-test", checksums).is_err()); + } + + #[test] + fn test_update_cache_path() { + let path = update_cache_path(); + assert!(path.to_string_lossy().contains("sunbeam")); + assert!(path.to_string_lossy().ends_with("update-check.json")); + } + + #[test] + fn test_cache_roundtrip() { + let cache = UpdateCache { + last_check: Utc::now(), + latest_commit: "abc12345".to_string(), + current_commit: "def67890".to_string(), + }; + let json = serde_json::to_string(&cache).unwrap(); + let loaded: UpdateCache = serde_json::from_str(&json).unwrap(); + assert_eq!(loaded.latest_commit, "abc12345"); + assert_eq!(loaded.current_commit, "def67890"); + } + + #[tokio::test] + async fn test_check_update_background_returns_none_when_forge_url_empty() { + // When SUNBEAM_FORGE_URL is unset and there is no production_host config, + // forge_url() returns "" and check_update_background should return None + // without making any network requests. + // Clear the env var to ensure we hit the empty-URL path. + // SAFETY: This test is not run concurrently with other tests that depend on this env var. + unsafe { std::env::remove_var("SUNBEAM_FORGE_URL") }; + // Note: this test assumes no production_host is configured in the test + // environment, which is the default for CI/dev. If forge_url() returns + // a non-empty string (e.g. from config), the test may still pass because + // the background check silently returns None on network errors. + let result = check_update_background().await; + // Either None (empty forge URL or network error) — never panics. + // The key property: this completes quickly without hanging. + drop(result); + } +}