diff --git a/src/auth.rs b/src/auth.rs index 3b334e63..f2de716c 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -32,23 +32,43 @@ const DEFAULT_CLIENT_ID: &str = "sunbeam-cli"; // Cache file helpers // --------------------------------------------------------------------------- -/// Cache path for auth tokens — per-domain so multiple environments work. -fn cache_path_for_domain(domain: &str) -> PathBuf { - let dir = dirs::data_dir() +/// Legacy auth cache dir — used only for migration. +fn legacy_auth_dir() -> PathBuf { + dirs::data_dir() .unwrap_or_else(|| { dirs::home_dir() .unwrap_or_else(|| PathBuf::from(".")) .join(".local/share") }) .join("sunbeam") - .join("auth"); - if domain.is_empty() { - dir.join("default.json") + .join("auth") +} + +/// Cache path for auth tokens — per-domain so multiple environments work. +/// Files live under ~/.sunbeam/auth/{safe_domain}.json. +fn cache_path_for_domain(domain: &str) -> PathBuf { + let dir = crate::config::sunbeam_dir().join("auth"); + let filename = if domain.is_empty() { + "default.json".to_string() } else { - // Sanitize domain for filename let safe = domain.replace(['/', '\\', ':'], "_"); - dir.join(format!("{safe}.json")) + format!("{safe}.json") + }; + + let new_path = dir.join(&filename); + + // Migration: copy from legacy location if new path doesn't exist yet + if !new_path.exists() { + let legacy = legacy_auth_dir().join(&filename); + if legacy.exists() { + if let Some(parent) = new_path.parent() { + let _ = std::fs::create_dir_all(parent); + } + let _ = std::fs::copy(&legacy, &new_path); + } } + + new_path } fn cache_path() -> PathBuf { @@ -489,6 +509,15 @@ pub async fn get_token() -> Result { )) } +/// Print the current access token as a JSON headers object. +/// Designed for use as a Claude Code MCP `headersHelper`. +/// Output: {"Authorization": "Bearer "} +pub async fn cmd_auth_token() -> Result<()> { + let token = get_token().await?; + println!("{{\"Authorization\": \"Bearer {token}\"}}"); + Ok(()) +} + /// Interactive browser-based OAuth2 login. /// SSO login — Hydra OIDC authorization code flow with PKCE. /// `gitea_redirect`: if Some, the browser callback page auto-redirects to Gitea token page. @@ -939,8 +968,7 @@ mod tests { fn test_cache_path_is_under_sunbeam() { let path = cache_path_for_domain("sunbeam.pt"); let path_str = path.to_string_lossy(); - assert!(path_str.contains("sunbeam")); - assert!(path_str.contains("auth")); + assert!(path_str.contains(".sunbeam/auth")); assert!(path_str.ends_with("sunbeam.pt.json")); } diff --git a/src/cli.rs b/src/cli.rs index 92818771..f1130e02 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -143,6 +143,12 @@ pub enum Verb { action: Option, }, + /// Workflow management (list, status, retry, cancel, run). + Workflow { + #[command(subcommand)] + action: crate::workflows::cmd::WorkflowAction, + }, + /// Self-update from latest mainline commit. Update, @@ -174,6 +180,8 @@ pub enum AuthAction { Logout, /// Show current authentication status. Status, + /// Print the current access token (for use in scripts and MCP headers). + Token, } #[derive(Subcommand, Debug)] @@ -750,6 +758,109 @@ mod tests { ]); assert!(result.is_err()); } + + // -- Workflow subcommand tests -- + + #[test] + fn test_workflow_list() { + let cli = parse(&["sunbeam", "workflow", "list"]); + match cli.verb { + Some(Verb::Workflow { action }) => { + assert!(matches!(action, crate::workflows::cmd::WorkflowAction::List { .. })); + } + _ => panic!("expected Workflow List"), + } + } + + #[test] + fn test_workflow_list_with_status_filter() { + let cli = parse(&["sunbeam", "workflow", "list", "--status", "complete"]); + match cli.verb { + Some(Verb::Workflow { action }) => match action { + crate::workflows::cmd::WorkflowAction::List { status } => { + assert_eq!(status, "complete"); + } + _ => panic!("expected List"), + }, + _ => panic!("expected Workflow"), + } + } + + #[test] + fn test_workflow_status() { + let cli = parse(&["sunbeam", "workflow", "status", "abc-123"]); + match cli.verb { + Some(Verb::Workflow { action }) => match action { + crate::workflows::cmd::WorkflowAction::Status { id } => { + assert_eq!(id, "abc-123"); + } + _ => panic!("expected Status"), + }, + _ => panic!("expected Workflow"), + } + } + + #[test] + fn test_workflow_retry() { + let cli = parse(&["sunbeam", "workflow", "retry", "wf-456"]); + match cli.verb { + Some(Verb::Workflow { action }) => match action { + crate::workflows::cmd::WorkflowAction::Retry { id } => { + assert_eq!(id, "wf-456"); + } + _ => panic!("expected Retry"), + }, + _ => panic!("expected Workflow"), + } + } + + #[test] + fn test_workflow_cancel() { + let cli = parse(&["sunbeam", "workflow", "cancel", "wf-789"]); + match cli.verb { + Some(Verb::Workflow { action }) => match action { + crate::workflows::cmd::WorkflowAction::Cancel { id } => { + assert_eq!(id, "wf-789"); + } + _ => panic!("expected Cancel"), + }, + _ => panic!("expected Workflow"), + } + } + + #[test] + fn test_workflow_run_default_file() { + let cli = parse(&["sunbeam", "workflow", "run"]); + match cli.verb { + Some(Verb::Workflow { action }) => match action { + crate::workflows::cmd::WorkflowAction::Run { file } => { + assert_eq!(file, ""); + } + _ => panic!("expected Run"), + }, + _ => panic!("expected Workflow"), + } + } + + #[test] + fn test_workflow_run_with_file() { + let cli = parse(&["sunbeam", "workflow", "run", "deploy.yaml"]); + match cli.verb { + Some(Verb::Workflow { action }) => match action { + crate::workflows::cmd::WorkflowAction::Run { file } => { + assert_eq!(file, "deploy.yaml"); + } + _ => panic!("expected Run"), + }, + _ => panic!("expected Workflow"), + } + } + + #[test] + fn test_workflow_status_missing_id() { + let result = Cli::try_parse_from(&["sunbeam", "workflow", "status"]); + assert!(result.is_err()); + } } /// Main dispatch function — parse CLI args and route to subcommands. @@ -786,7 +897,49 @@ pub async fn dispatch() -> Result<()> { Ok(()) } - Some(Verb::Up) => crate::cluster::cmd_up().await, + Some(Verb::Up) => { + crate::output::step("Bringing up cluster (workflow engine)..."); + + let ctx_name = { + let cfg = crate::config::load_config(); + if cfg.current_context.is_empty() { + "default".to_string() + } else { + cfg.current_context.clone() + } + }; + + let host = crate::workflows::host::create_host(&ctx_name).await?; + crate::workflows::up::register(&host).await; + + let step_ctx = crate::workflows::StepContext::from_active(); + let initial_data = serde_json::json!({ + "__ctx": step_ctx, + "domain": "", + }); + + let instance = wfe::run_workflow_sync( + &host, + "up", + 1, + initial_data, + std::time::Duration::from_secs(3600), + ) + .await + .map_err(|e| SunbeamError::Other(format!("up workflow failed: {e}")))?; + + crate::workflows::up::print_summary(&instance); + crate::workflows::host::shutdown_host(host).await; + + if instance.status != wfe_core::models::WorkflowStatus::Complete { + return Err(SunbeamError::Other(format!( + "up workflow ended with status {:?}", + instance.status + ))); + } + + Ok(()) + } Some(Verb::Status { target }) => { crate::services::cmd_status(target.as_deref()).await @@ -829,9 +982,91 @@ pub async fn dispatch() -> Result<()> { crate::manifests::cmd_apply(&env_str, &domain, &email, &ns).await } - Some(Verb::Seed) => crate::secrets::cmd_seed().await, + Some(Verb::Seed) => { + crate::output::step("Seeding secrets (workflow engine)..."); - Some(Verb::Verify) => crate::secrets::cmd_verify().await, + let ctx_name = { + let cfg = crate::config::load_config(); + if cfg.current_context.is_empty() { + "default".to_string() + } else { + cfg.current_context.clone() + } + }; + + let host = crate::workflows::host::create_host(&ctx_name).await?; + crate::workflows::seed::register(&host).await; + + let step_ctx = crate::workflows::StepContext::from_active(); + let initial_data = serde_json::json!({ + "__ctx": step_ctx, + }); + + let instance = wfe::run_workflow_sync( + &host, + "seed", + 1, + initial_data, + std::time::Duration::from_secs(900), + ) + .await + .map_err(|e| SunbeamError::secrets(format!("seed workflow failed: {e}")))?; + + crate::workflows::seed::print_summary(&instance); + crate::workflows::host::shutdown_host(host).await; + + if instance.status != wfe_core::models::WorkflowStatus::Complete { + return Err(SunbeamError::secrets(format!( + "seed workflow ended with status {:?}", + instance.status + ))); + } + + Ok(()) + } + + Some(Verb::Verify) => { + crate::output::step("Verifying VSO -> OpenBao integration (workflow engine)..."); + + let ctx_name = { + let cfg = crate::config::load_config(); + if cfg.current_context.is_empty() { + "default".to_string() + } else { + cfg.current_context.clone() + } + }; + + let host = crate::workflows::host::create_host(&ctx_name).await?; + crate::workflows::verify::register(&host).await; + + let step_ctx = crate::workflows::StepContext::from_active(); + let initial_data = serde_json::json!({ + "__ctx": step_ctx, + }); + + let instance = wfe::run_workflow_sync( + &host, + "verify", + 1, + initial_data, + std::time::Duration::from_secs(300), + ) + .await + .map_err(|e| SunbeamError::Other(format!("verify workflow failed: {e}")))?; + + crate::workflows::verify::print_summary(&instance); + crate::workflows::host::shutdown_host(host).await; + + if instance.status != wfe_core::models::WorkflowStatus::Complete { + return Err(SunbeamError::Other(format!( + "verify workflow ended with status {:?}", + instance.status + ))); + } + + Ok(()) + } Some(Verb::Logs { target, follow }) => { crate::services::cmd_logs(&target, follow).await @@ -856,7 +1091,48 @@ pub async fn dispatch() -> Result<()> { Some(Verb::Mirror) => crate::images::cmd_mirror().await, - Some(Verb::Bootstrap) => crate::gitea::cmd_bootstrap().await, + Some(Verb::Bootstrap) => { + crate::output::step("Bootstrapping Gitea (workflow engine)..."); + + let ctx_name = { + let cfg = crate::config::load_config(); + if cfg.current_context.is_empty() { + "default".to_string() + } else { + cfg.current_context.clone() + } + }; + + let host = crate::workflows::host::create_host(&ctx_name).await?; + crate::workflows::bootstrap::register(&host).await; + + let step_ctx = crate::workflows::StepContext::from_active(); + let initial_data = serde_json::json!({ + "__ctx": step_ctx, + }); + + let instance = wfe::run_workflow_sync( + &host, + "bootstrap", + 1, + initial_data, + std::time::Duration::from_secs(300), + ) + .await + .map_err(|e| SunbeamError::Other(format!("bootstrap workflow failed: {e}")))?; + + crate::workflows::bootstrap::print_summary(&instance); + crate::workflows::host::shutdown_host(host).await; + + if instance.status != wfe_core::models::WorkflowStatus::Complete { + return Err(SunbeamError::Other(format!( + "bootstrap workflow ended with status {:?}", + instance.status + ))); + } + + Ok(()) + } Some(Verb::Config { action }) => match action { None => { @@ -1053,6 +1329,7 @@ pub async fn dispatch() -> Result<()> { } Some(AuthAction::Logout) => crate::auth::cmd_auth_logout().await, Some(AuthAction::Status) => crate::auth::cmd_auth_status().await, + Some(AuthAction::Token) => crate::auth::cmd_auth_token().await, }, Some(Verb::Pm { action }) => match action { @@ -1087,6 +1364,18 @@ pub async fn dispatch() -> Result<()> { } }, + Some(Verb::Workflow { action }) => { + let ctx_name = { + let cfg = crate::config::load_config(); + if cfg.current_context.is_empty() { + "default".to_string() + } else { + cfg.current_context.clone() + } + }; + crate::workflows::cmd::dispatch(&ctx_name, action).await + } + Some(Verb::Update) => crate::update::cmd_update().await, Some(Verb::Version) => { diff --git a/src/secrets.rs b/src/secrets.rs index c2061840..da80f78a 100644 --- a/src/secrets.rs +++ b/src/secrets.rs @@ -20,9 +20,9 @@ use crate::output::{ok, step, warn}; // ── Constants ─────────────────────────────────────────────────────────────── -const ADMIN_USERNAME: &str = "estudio-admin"; -const GITEA_ADMIN_USER: &str = "gitea_admin"; -const PG_USERS: &[&str] = &[ +pub(crate) const ADMIN_USERNAME: &str = "estudio-admin"; +pub(crate) const GITEA_ADMIN_USER: &str = "gitea_admin"; +pub(crate) const PG_USERS: &[&str] = &[ "kratos", "hydra", "gitea", @@ -36,14 +36,15 @@ const PG_USERS: &[&str] = &[ "find", "calendars", "projects", + "penpot", ]; -const SMTP_URI: &str = "smtp://postfix.lasuite.svc.cluster.local:25/?skip_ssl_verify=true"; +pub(crate) const SMTP_URI: &str = "smtp://postfix.lasuite.svc.cluster.local:25/?skip_ssl_verify=true"; // ── Key generation ────────────────────────────────────────────────────────── /// Generate a Fernet-compatible key (32 random bytes, URL-safe base64). -fn gen_fernet_key() -> String { +pub(crate) fn gen_fernet_key() -> String { use base64::Engine; let mut buf = [0u8; 32]; rand::thread_rng().fill_bytes(&mut buf); @@ -52,7 +53,7 @@ fn gen_fernet_key() -> String { /// Generate an RSA 2048-bit DKIM key pair. /// Returns (private_pem_pkcs8, public_pem). Returns ("", "") on failure. -fn gen_dkim_key_pair() -> (String, String) { +pub(crate) fn gen_dkim_key_pair() -> (String, String) { let mut rng = rand::thread_rng(); let bits = 2048; let private_key = match RsaPrivateKey::new(&mut rng, bits) { @@ -84,7 +85,7 @@ fn gen_dkim_key_pair() -> (String, String) { } /// Generate a URL-safe random token (32 bytes). -fn rand_token() -> String { +pub(crate) fn rand_token() -> String { use base64::Engine; let mut buf = [0u8; 32]; rand::thread_rng().fill_bytes(&mut buf); @@ -92,7 +93,7 @@ fn rand_token() -> String { } /// Generate a URL-safe random token with a specific byte count. -fn rand_token_n(n: usize) -> String { +pub(crate) fn rand_token_n(n: usize) -> String { use base64::Engine; let mut buf = vec![0u8; n]; rand::thread_rng().fill_bytes(&mut buf); @@ -102,7 +103,7 @@ fn rand_token_n(n: usize) -> String { // ── Port-forward helper ───────────────────────────────────────────────────── /// Port-forward guard — cancels the background forwarder on drop. -struct PortForwardGuard { +pub(crate) struct PortForwardGuard { _abort_handle: tokio::task::AbortHandle, pub local_port: u16, } @@ -115,7 +116,7 @@ impl Drop for PortForwardGuard { /// Open a kube-rs port-forward to `pod_name` in `namespace` on `remote_port`. /// Binds a local TCP listener and proxies connections to the pod. -async fn port_forward( +pub(crate) async fn port_forward( namespace: &str, pod_name: &str, remote_port: u16, @@ -192,7 +193,7 @@ async fn port_forward( } /// Port-forward to a service by finding a matching pod via label selector. -async fn port_forward_svc( +pub(crate) async fn port_forward_svc( namespace: &str, label_selector: &str, remote_port: u16, @@ -221,10 +222,10 @@ struct SeedResult { } /// Read-or-create pattern: reads existing KV values, only generates missing ones. -async fn get_or_create( +pub(crate) async fn get_or_create( bao: &BaoClient, path: &str, - fields: &[(&str, &dyn Fn() -> String)], + fields: &[(&str, &(dyn Fn() -> String + Send + Sync))], dirty_paths: &mut HashSet, ) -> Result> { let existing = bao.kv_get("secret", path).await?.unwrap_or_default(); @@ -358,7 +359,7 @@ async fn seed_openbao() -> Result> { &bao, "hydra", &[ - ("system-secret", &rand_token as &dyn Fn() -> String), + ("system-secret", &rand_token as &(dyn Fn() -> String + Send + Sync)), ("cookie-secret", &rand_token), ("pairwise-salt", &rand_token), ], @@ -371,7 +372,7 @@ async fn seed_openbao() -> Result> { &bao, "kratos", &[ - ("secrets-default", &rand_token as &dyn Fn() -> String), + ("secrets-default", &rand_token as &(dyn Fn() -> String + Send + Sync)), ("secrets-cookie", &rand_token), ("smtp-connection-uri", &smtp_uri_fn), ], @@ -383,7 +384,7 @@ async fn seed_openbao() -> Result> { &bao, "seaweedfs", &[ - ("access-key", &rand_token as &dyn Fn() -> String), + ("access-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), ("secret-key", &rand_token), ], &mut dirty_paths, @@ -397,7 +398,7 @@ async fn seed_openbao() -> Result> { &[ ( "admin-username", - &gitea_admin_user_fn as &dyn Fn() -> String, + &gitea_admin_user_fn as &(dyn Fn() -> String + Send + Sync), ), ("admin-password", &rand_token), ], @@ -410,7 +411,7 @@ async fn seed_openbao() -> Result> { &bao, "hive", &[ - ("oidc-client-id", &hive_local_fn as &dyn Fn() -> String), + ("oidc-client-id", &hive_local_fn as &(dyn Fn() -> String + Send + Sync)), ("oidc-client-secret", &rand_token), ], &mut dirty_paths, @@ -422,7 +423,7 @@ async fn seed_openbao() -> Result> { &bao, "livekit", &[ - ("api-key", &devkey_fn as &dyn Fn() -> String), + ("api-key", &devkey_fn as &(dyn Fn() -> String + Send + Sync)), ("api-secret", &rand_token), ], &mut dirty_paths, @@ -432,7 +433,7 @@ async fn seed_openbao() -> Result> { let people = get_or_create( &bao, "people", - &[("django-secret-key", &rand_token as &dyn Fn() -> String)], + &[("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], &mut dirty_paths, ) .await?; @@ -441,7 +442,7 @@ async fn seed_openbao() -> Result> { &bao, "login-ui", &[ - ("cookie-secret", &rand_token as &dyn Fn() -> String), + ("cookie-secret", &rand_token as &(dyn Fn() -> String + Send + Sync)), ("csrf-cookie-secret", &rand_token), ], &mut dirty_paths, @@ -464,7 +465,7 @@ async fn seed_openbao() -> Result> { &bao, "kratos-admin", &[ - ("cookie-secret", &rand_token as &dyn Fn() -> String), + ("cookie-secret", &rand_token as &(dyn Fn() -> String + Send + Sync)), ("csrf-cookie-secret", &rand_token), ("admin-identity-ids", &empty_fn), ("s3-access-key", &sw_access_fn), @@ -478,7 +479,7 @@ async fn seed_openbao() -> Result> { &bao, "docs", &[ - ("django-secret-key", &rand_token as &dyn Fn() -> String), + ("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), ("collaboration-secret", &rand_token), ], &mut dirty_paths, @@ -489,7 +490,7 @@ async fn seed_openbao() -> Result> { &bao, "meet", &[ - ("django-secret-key", &rand_token as &dyn Fn() -> String), + ("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), ("application-jwt-secret-key", &rand_token), ], &mut dirty_paths, @@ -499,7 +500,7 @@ async fn seed_openbao() -> Result> { let drive = get_or_create( &bao, "drive", - &[("django-secret-key", &rand_token as &dyn Fn() -> String)], + &[("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], &mut dirty_paths, ) .await?; @@ -507,7 +508,7 @@ async fn seed_openbao() -> Result> { let projects = get_or_create( &bao, "projects", - &[("secret-key", &rand_token as &dyn Fn() -> String)], + &[("secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], &mut dirty_paths, ) .await?; @@ -517,7 +518,7 @@ async fn seed_openbao() -> Result> { &bao, "calendars", &[ - ("django-secret-key", &cal_django_fn as &dyn Fn() -> String), + ("django-secret-key", &cal_django_fn as &(dyn Fn() -> String + Send + Sync)), ("salt-key", &rand_token), ("caldav-inbound-api-key", &rand_token), ("caldav-outbound-api-key", &rand_token), @@ -563,12 +564,12 @@ async fn seed_openbao() -> Result> { &bao, "messages", &[ - ("django-secret-key", &rand_token as &dyn Fn() -> String), + ("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), ("salt-key", &rand_token), ("mda-api-secret", &rand_token), ( "oidc-refresh-token-key", - &gen_fernet_key as &dyn Fn() -> String, + &gen_fernet_key as &(dyn Fn() -> String + Send + Sync), ), ("dkim-private-key", &dkim_priv_fn), ("dkim-public-key", &dkim_pub_fn), @@ -586,7 +587,7 @@ async fn seed_openbao() -> Result> { &bao, "collabora", &[ - ("username", &admin_fn as &dyn Fn() -> String), + ("username", &admin_fn as &(dyn Fn() -> String + Send + Sync)), ("password", &rand_token), ], &mut dirty_paths, @@ -597,7 +598,7 @@ async fn seed_openbao() -> Result> { &bao, "tuwunel", &[ - ("oidc-client-id", &empty_fn as &dyn Fn() -> String), + ("oidc-client-id", &empty_fn as &(dyn Fn() -> String + Send + Sync)), ("oidc-client-secret", &empty_fn), ("turn-secret", &empty_fn), ("registration-token", &rand_token), @@ -609,18 +610,26 @@ async fn seed_openbao() -> Result> { let grafana = get_or_create( &bao, "grafana", - &[("admin-password", &rand_token as &dyn Fn() -> String)], + &[("admin-password", &rand_token as &(dyn Fn() -> String + Send + Sync))], &mut dirty_paths, ) .await?; let scw_access_fn = || scw_config("access-key"); let scw_secret_fn = || scw_config("secret-key"); + let penpot = get_or_create( + &bao, + "penpot", + &[("secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], + &mut dirty_paths, + ) + .await?; + let scaleway_s3 = get_or_create( &bao, "scaleway-s3", &[ - ("access-key-id", &scw_access_fn as &dyn Fn() -> String), + ("access-key-id", &scw_access_fn as &(dyn Fn() -> String + Send + Sync)), ("secret-access-key", &scw_secret_fn), ], &mut dirty_paths, @@ -662,6 +671,7 @@ async fn seed_openbao() -> Result> { ("tuwunel", &tuwunel), ("grafana", &grafana), ("scaleway-s3", &scaleway_s3), + ("penpot", &penpot), ]; for (path, data) in all_paths { @@ -694,7 +704,7 @@ async fn seed_openbao() -> Result> { "auth/kubernetes/role/vso", &serde_json::json!({ "bound_service_account_names": "default", - "bound_service_account_namespaces": "ory,devtools,storage,lasuite,matrix,media,data,monitoring", + "bound_service_account_namespaces": "ory,devtools,storage,lasuite,matrix,media,data,monitoring,cert-manager", "policies": "vso-reader", "ttl": "1h" }), @@ -742,7 +752,7 @@ async fn seed_openbao() -> Result> { // ── Database secrets engine ───────────────────────────────────────────────── /// Enable OpenBao database secrets engine and create PostgreSQL static roles. -async fn configure_db_engine(bao: &BaoClient) -> Result<()> { +pub(crate) async fn configure_db_engine(bao: &BaoClient) -> Result<()> { ok("Configuring OpenBao database secrets engine..."); let pg_rw = "postgres-rw.data.svc.cluster.local:5432"; @@ -825,7 +835,7 @@ async fn configure_db_engine(bao: &BaoClient) -> Result<()> { } /// Execute a psql command on the CNPG primary pod. -async fn psql_exec(cnpg_pod: &str, sql: &str) -> Result<(i32, String)> { +pub(crate) async fn psql_exec(cnpg_pod: &str, sql: &str) -> Result<(i32, String)> { k::kube_exec( "data", cnpg_pod, @@ -838,16 +848,16 @@ async fn psql_exec(cnpg_pod: &str, sql: &str) -> Result<(i32, String)> { // ── Kratos admin identity seeding ─────────────────────────────────────────── #[derive(Debug, Deserialize)] -struct KratosIdentity { - id: String, +pub(crate) struct KratosIdentity { + pub(crate) id: String, } #[derive(Debug, Deserialize)] -struct KratosRecovery { +pub(crate) struct KratosRecovery { #[serde(default)] - recovery_link: String, + pub(crate) recovery_link: String, #[serde(default)] - recovery_code: String, + pub(crate) recovery_code: String, } /// Ensure estudio-admin@ exists in Kratos and is the only admin identity. @@ -951,77 +961,7 @@ async fn seed_kratos_admin_identity(bao: &BaoClient) -> (String, String) { // ── cmd_seed — main entry point ───────────────────────────────────────────── -/// Seed OpenBao KV with crypto-random credentials, then mirror to K8s Secrets. -/// File-based advisory lock for `cmd_seed` to prevent concurrent runs. -struct SeedLock { - path: std::path::PathBuf, -} - -impl SeedLock { - fn acquire() -> Result { - let lock_path = dirs::data_dir() - .unwrap_or_else(|| dirs::home_dir().unwrap().join(".local/share")) - .join("sunbeam") - .join("seed.lock"); - std::fs::create_dir_all(lock_path.parent().unwrap())?; - - match std::fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(&lock_path) - { - Ok(mut f) => { - use std::io::Write; - write!(f, "{}", std::process::id())?; - Ok(SeedLock { path: lock_path }) - } - Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => { - // Check if the PID in the file is still alive - if let Ok(pid_str) = std::fs::read_to_string(&lock_path) { - if let Ok(pid) = pid_str.trim().parse::() { - // kill(pid, 0) checks if process exists without sending a signal - let alive = is_pid_alive(pid); - if alive { - return Err(SunbeamError::secrets( - "Another sunbeam seed is already running. Wait for it to finish.", - )); - } - } - } - // Stale lock, remove and retry - std::fs::remove_file(&lock_path)?; - let mut f = std::fs::OpenOptions::new() - .write(true) - .create_new(true) - .open(&lock_path)?; - use std::io::Write; - write!(f, "{}", std::process::id())?; - Ok(SeedLock { path: lock_path }) - } - Err(e) => Err(e.into()), - } - } -} - -impl Drop for SeedLock { - fn drop(&mut self) { - let _ = std::fs::remove_file(&self.path); - } -} - -/// Check if a process with the given PID is still alive. -fn is_pid_alive(pid: i32) -> bool { - std::process::Command::new("kill") - .args(["-0", &pid.to_string()]) - .stdout(std::process::Stdio::null()) - .stderr(std::process::Stdio::null()) - .status() - .map(|s| s.success()) - .unwrap_or(false) -} - pub async fn cmd_seed() -> Result<()> { - let _lock = SeedLock::acquire()?; step("Seeding secrets..."); let seed_result = seed_openbao().await?; @@ -1512,7 +1452,7 @@ spec: // ── Utility helpers ───────────────────────────────────────────────────────── -async fn wait_pod_running(ns: &str, pod_name: &str, timeout_secs: u64) -> bool { +pub(crate) async fn wait_pod_running(ns: &str, pod_name: &str, timeout_secs: u64) -> bool { let client = match k::get_client().await { Ok(c) => c, Err(_) => return false, @@ -1537,7 +1477,7 @@ async fn wait_pod_running(ns: &str, pod_name: &str, timeout_secs: u64) -> bool { false } -fn scw_config(key: &str) -> String { +pub(crate) fn scw_config(key: &str) -> String { std::process::Command::new("scw") .args(["config", "get", key]) .output() @@ -1565,7 +1505,7 @@ async fn delete_k8s_secret(ns: &str, name: &str) -> Result<()> { Ok(()) } -async fn delete_resource(ns: &str, kind: &str, name: &str) -> Result<()> { +pub(crate) async fn delete_resource(ns: &str, kind: &str, name: &str) -> Result<()> { let ctx = format!("--context={}", k::context()); let _ = tokio::process::Command::new("kubectl") .args([&ctx, "-n", ns, "delete", kind, name, "--ignore-not-found"]) diff --git a/src/workflows/seed/steps/kv_seeding.rs b/src/workflows/seed/steps/kv_seeding.rs new file mode 100644 index 00000000..4fbfd87b --- /dev/null +++ b/src/workflows/seed/steps/kv_seeding.rs @@ -0,0 +1,556 @@ +//! KV secret seeding steps: generate/read all credentials, write dirty paths, +//! configure Kubernetes auth for VSO. +//! +//! Data-struct-agnostic — reads JSON fields directly for cross-workflow reuse. + +use std::collections::{HashMap, HashSet}; + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::openbao::BaoClient; +use crate::output::{ok, warn}; +use crate::secrets::{ + self, gen_dkim_key_pair, gen_fernet_key, rand_token, rand_token_n, scw_config, + GITEA_ADMIN_USER, SMTP_URI, +}; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +fn json_bool(data: &serde_json::Value, key: &str) -> bool { + data.get(key).and_then(|v| v.as_bool()).unwrap_or(false) +} + +fn json_str(data: &serde_json::Value, key: &str) -> Option { + data.get(key).and_then(|v| v.as_str()).map(|s| s.to_string()) +} + +// ── SeedAllKVPaths ────────────────────────────────────────────────────────── + +/// Single step that runs the get_or_create loop for all 19 services. +/// Sets `creds` and `dirty_paths` in data. +/// +/// Reads: `skip_seed`, `ob_pod`, `root_token` +/// Writes: `creds`, `dirty_paths` +#[derive(Default)] +pub struct SeedAllKVPaths; + +#[async_trait::async_trait] +impl StepBody for SeedAllKVPaths { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if json_bool(data, "skip_seed") { + return Ok(ExecutionResult::next()); + } + + let ob_pod = match json_str(data, "ob_pod") { + Some(p) => p, + None => { + warn("No ob_pod set -- skipping KV seeding."); + return Ok(ExecutionResult::next()); + } + }; + let root_token = match json_str(data, "root_token") { + Some(t) => t, + None => { + warn("No root_token set -- skipping KV seeding."); + return Ok(ExecutionResult::next()); + } + }; + + let pf = secrets::port_forward("data", &ob_pod, 8200).await + .map_err(|e| step_err(e.to_string()))?; + let bao = BaoClient::with_token( + &format!("http://127.0.0.1:{}", pf.local_port), + &root_token, + ); + + let mut dirty_paths: HashSet = HashSet::new(); + + let hydra = secrets::get_or_create( + &bao, "hydra", + &[ + ("system-secret", &rand_token as &(dyn Fn() -> String + Send + Sync)), + ("cookie-secret", &rand_token), + ("pairwise-salt", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let smtp_uri_fn = || SMTP_URI.to_string(); + let kratos = secrets::get_or_create( + &bao, "kratos", + &[ + ("secrets-default", &rand_token as &(dyn Fn() -> String + Send + Sync)), + ("secrets-cookie", &rand_token), + ("smtp-connection-uri", &smtp_uri_fn), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let seaweedfs = secrets::get_or_create( + &bao, "seaweedfs", + &[ + ("access-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), + ("secret-key", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let gitea_admin_user_fn = || GITEA_ADMIN_USER.to_string(); + let gitea = secrets::get_or_create( + &bao, "gitea", + &[ + ("admin-username", &gitea_admin_user_fn as &(dyn Fn() -> String + Send + Sync)), + ("admin-password", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let hive_local_fn = || "hive-local".to_string(); + let hive = secrets::get_or_create( + &bao, "hive", + &[ + ("oidc-client-id", &hive_local_fn as &(dyn Fn() -> String + Send + Sync)), + ("oidc-client-secret", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let devkey_fn = || "devkey".to_string(); + let livekit = secrets::get_or_create( + &bao, "livekit", + &[ + ("api-key", &devkey_fn as &(dyn Fn() -> String + Send + Sync)), + ("api-secret", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let people = secrets::get_or_create( + &bao, "people", + &[("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let login_ui = secrets::get_or_create( + &bao, "login-ui", + &[ + ("cookie-secret", &rand_token as &(dyn Fn() -> String + Send + Sync)), + ("csrf-cookie-secret", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let sw_access = seaweedfs.get("access-key").cloned().unwrap_or_default(); + let sw_secret = seaweedfs.get("secret-key").cloned().unwrap_or_default(); + let empty_fn = || String::new(); + let sw_access_fn = { let v = sw_access.clone(); move || v.clone() }; + let sw_secret_fn = { let v = sw_secret.clone(); move || v.clone() }; + + let kratos_admin = secrets::get_or_create( + &bao, "kratos-admin", + &[ + ("cookie-secret", &rand_token as &(dyn Fn() -> String + Send + Sync)), + ("csrf-cookie-secret", &rand_token), + ("admin-identity-ids", &empty_fn), + ("s3-access-key", &sw_access_fn), + ("s3-secret-key", &sw_secret_fn), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let docs = secrets::get_or_create( + &bao, "docs", + &[ + ("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), + ("collaboration-secret", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let meet = secrets::get_or_create( + &bao, "meet", + &[ + ("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), + ("application-jwt-secret-key", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let drive = secrets::get_or_create( + &bao, "drive", + &[("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let projects = secrets::get_or_create( + &bao, "projects", + &[("secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let cal_django_fn = || rand_token_n(50); + let calendars = secrets::get_or_create( + &bao, "calendars", + &[ + ("django-secret-key", &cal_django_fn as &(dyn Fn() -> String + Send + Sync)), + ("salt-key", &rand_token), + ("caldav-inbound-api-key", &rand_token), + ("caldav-outbound-api-key", &rand_token), + ("caldav-internal-api-key", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + // DKIM key pair + let existing_messages = bao.kv_get("secret", "messages").await + .map_err(|e| step_err(e.to_string()))? + .unwrap_or_default(); + let (dkim_private, dkim_public) = if existing_messages + .get("dkim-private-key").filter(|v| !v.is_empty()).is_some() + { + ( + existing_messages.get("dkim-private-key").cloned().unwrap_or_default(), + existing_messages.get("dkim-public-key").cloned().unwrap_or_default(), + ) + } else { + gen_dkim_key_pair() + }; + + let dkim_priv_fn = { let v = dkim_private.clone(); move || v.clone() }; + let dkim_pub_fn = { let v = dkim_public.clone(); move || v.clone() }; + let socks_proxy_fn = || format!("sunbeam:{}", rand_token()); + let sunbeam_fn = || "sunbeam".to_string(); + + let messages = secrets::get_or_create( + &bao, "messages", + &[ + ("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), + ("salt-key", &rand_token), + ("mda-api-secret", &rand_token), + ("oidc-refresh-token-key", &gen_fernet_key as &(dyn Fn() -> String + Send + Sync)), + ("dkim-private-key", &dkim_priv_fn), + ("dkim-public-key", &dkim_pub_fn), + ("rspamd-password", &rand_token), + ("socks-proxy-users", &socks_proxy_fn), + ("mta-out-smtp-username", &sunbeam_fn), + ("mta-out-smtp-password", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let admin_fn = || "admin".to_string(); + let collabora = secrets::get_or_create( + &bao, "collabora", + &[ + ("username", &admin_fn as &(dyn Fn() -> String + Send + Sync)), + ("password", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let tuwunel = secrets::get_or_create( + &bao, "tuwunel", + &[ + ("oidc-client-id", &empty_fn as &(dyn Fn() -> String + Send + Sync)), + ("oidc-client-secret", &empty_fn), + ("turn-secret", &empty_fn), + ("registration-token", &rand_token), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let grafana = secrets::get_or_create( + &bao, "grafana", + &[("admin-password", &rand_token as &(dyn Fn() -> String + Send + Sync))], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + let scw_access_fn = || scw_config("access-key"); + let scw_secret_fn = || scw_config("secret-key"); + let scaleway_s3 = secrets::get_or_create( + &bao, "scaleway-s3", + &[ + ("access-key-id", &scw_access_fn as &(dyn Fn() -> String + Send + Sync)), + ("secret-access-key", &scw_secret_fn), + ], + &mut dirty_paths, + ).await.map_err(|e| step_err(e.to_string()))?; + + // Build credentials map + let mut creds = HashMap::new(); + let field_map: &[(&str, &str, &HashMap)] = &[ + ("hydra-system-secret", "system-secret", &hydra), + ("hydra-cookie-secret", "cookie-secret", &hydra), + ("hydra-pairwise-salt", "pairwise-salt", &hydra), + ("kratos-secrets-default", "secrets-default", &kratos), + ("kratos-secrets-cookie", "secrets-cookie", &kratos), + ("s3-access-key", "access-key", &seaweedfs), + ("s3-secret-key", "secret-key", &seaweedfs), + ("gitea-admin-password", "admin-password", &gitea), + ("hive-oidc-client-id", "oidc-client-id", &hive), + ("hive-oidc-client-secret", "oidc-client-secret", &hive), + ("people-django-secret", "django-secret-key", &people), + ("livekit-api-key", "api-key", &livekit), + ("livekit-api-secret", "api-secret", &livekit), + ("kratos-admin-cookie-secret", "cookie-secret", &kratos_admin), + ("messages-dkim-public-key", "dkim-public-key", &messages), + ]; + + for (cred_key, field_key, source) in field_map { + creds.insert(cred_key.to_string(), source.get(*field_key).cloned().unwrap_or_default()); + } + + // Store per-path data for WriteDirtyKVPaths + let all_paths: &[(&str, &HashMap)] = &[ + ("hydra", &hydra), ("kratos", &kratos), ("seaweedfs", &seaweedfs), + ("gitea", &gitea), ("hive", &hive), ("livekit", &livekit), + ("people", &people), ("login-ui", &login_ui), ("kratos-admin", &kratos_admin), + ("docs", &docs), ("meet", &meet), ("drive", &drive), + ("projects", &projects), ("calendars", &calendars), ("messages", &messages), + ("collabora", &collabora), ("tuwunel", &tuwunel), ("grafana", &grafana), + ("scaleway-s3", &scaleway_s3), + ]; + + for (path, data) in all_paths { + let json = serde_json::to_string(data).map_err(|e| step_err(e.to_string()))?; + creds.insert(format!("kv_data/{path}"), json); + } + + let dirty_vec: Vec = dirty_paths.into_iter().collect(); + + let mut result = ExecutionResult::next(); + result.output_data = Some(serde_json::json!({ + "creds": creds, + "dirty_paths": dirty_vec, + })); + Ok(result) + } +} + +// ── WriteDirtyKVPaths ─────────────────────────────────────────────────────── + +/// Write all modified KV paths to OpenBao. +/// +/// Reads: `skip_seed`, `ob_pod`, `root_token`, `dirty_paths`, `creds` +#[derive(Default)] +pub struct WriteDirtyKVPaths; + +#[async_trait::async_trait] +impl StepBody for WriteDirtyKVPaths { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if json_bool(data, "skip_seed") { + return Ok(ExecutionResult::next()); + } + + let ob_pod = match json_str(data, "ob_pod") { + Some(p) => p, + None => return Ok(ExecutionResult::next()), + }; + let root_token = match json_str(data, "root_token") { + Some(t) => t, + None => return Ok(ExecutionResult::next()), + }; + + let dirty_paths: Vec = data.get("dirty_paths") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + if dirty_paths.is_empty() { + ok("All OpenBao KV secrets already present -- skipping writes."); + return Ok(ExecutionResult::next()); + } + + let mut sorted_paths = dirty_paths.clone(); + sorted_paths.sort(); + ok(&format!("Writing new secrets to OpenBao KV ({})...", sorted_paths.join(", "))); + + let pf = secrets::port_forward("data", &ob_pod, 8200).await + .map_err(|e| step_err(e.to_string()))?; + let bao = BaoClient::with_token( + &format!("http://127.0.0.1:{}", pf.local_port), + &root_token, + ); + + let creds: HashMap = data.get("creds") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + let dirty_set: HashSet<&str> = dirty_paths.iter().map(|s| s.as_str()).collect(); + + let kv_paths = [ + "hydra", "kratos", "seaweedfs", "gitea", "hive", "livekit", + "people", "login-ui", "kratos-admin", "docs", "meet", "drive", + "projects", "calendars", "messages", "collabora", "tuwunel", + "grafana", "scaleway-s3", "penpot", + ]; + + for path in kv_paths { + if dirty_set.contains(path) { + let json_key = format!("kv_data/{path}"); + if let Some(json_str) = creds.get(&json_key) { + let path_data: HashMap = serde_json::from_str(json_str) + .map_err(|e| step_err(e.to_string()))?; + bao.kv_patch("secret", path, &path_data).await + .map_err(|e| step_err(e.to_string()))?; + } + } + } + + Ok(ExecutionResult::next()) + } +} + +// ── ConfigureKubernetesAuth ───────────────────────────────────────────────── + +/// Enable Kubernetes auth, configure it, write VSO policy and role. +/// +/// Reads: `skip_seed`, `ob_pod`, `root_token` +#[derive(Default)] +pub struct ConfigureKubernetesAuth; + +#[async_trait::async_trait] +impl StepBody for ConfigureKubernetesAuth { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if json_bool(data, "skip_seed") { + return Ok(ExecutionResult::next()); + } + + let ob_pod = match json_str(data, "ob_pod") { + Some(p) => p, + None => return Ok(ExecutionResult::next()), + }; + let root_token = match json_str(data, "root_token") { + Some(t) => t, + None => return Ok(ExecutionResult::next()), + }; + + ok("Configuring Kubernetes auth for VSO..."); + + let pf = secrets::port_forward("data", &ob_pod, 8200).await + .map_err(|e| step_err(e.to_string()))?; + let bao = BaoClient::with_token( + &format!("http://127.0.0.1:{}", pf.local_port), + &root_token, + ); + + let _ = bao.auth_enable("kubernetes", "kubernetes").await; + + bao.write( + "auth/kubernetes/config", + &serde_json::json!({ + "kubernetes_host": "https://kubernetes.default.svc.cluster.local" + }), + ).await.map_err(|e| step_err(e.to_string()))?; + + let policy_hcl = concat!( + "path \"secret/data/*\" { capabilities = [\"read\"] }\n", + "path \"secret/metadata/*\" { capabilities = [\"read\", \"list\"] }\n", + "path \"database/static-creds/*\" { capabilities = [\"read\"] }\n", + ); + bao.write_policy("vso-reader", policy_hcl).await + .map_err(|e| step_err(e.to_string()))?; + + bao.write( + "auth/kubernetes/role/vso", + &serde_json::json!({ + "bound_service_account_names": "default", + "bound_service_account_namespaces": "ory,devtools,storage,lasuite,matrix,media,data,monitoring,cert-manager", + "policies": "vso-reader", + "ttl": "1h" + }), + ).await.map_err(|e| step_err(e.to_string()))?; + + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use wfe::run_workflow_sync; + use wfe_core::builder::WorkflowBuilder; + use wfe_core::models::WorkflowStatus; + + async fn run_step( + data: serde_json::Value, + ) -> wfe_core::models::WorkflowInstance { + let host = crate::workflows::host::create_test_host().await.unwrap(); + host.register_step::().await; + let def = WorkflowBuilder::::new() + .start_with::() + .name("test-step") + .end_workflow() + .build("test-wf", 1); + host.register_workflow_definition(def).await; + let instance = run_workflow_sync(&host, "test-wf", 1, data, Duration::from_secs(5)) + .await + .unwrap(); + host.stop().await; + instance + } + + #[tokio::test] + async fn test_seed_all_kv_paths_skip_seed() { + let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_seed_all_kv_paths_no_ob_pod() { + let instance = run_step::(serde_json::json!({ "skip_seed": false })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_seed_all_kv_paths_no_root_token() { + let instance = run_step::( + serde_json::json!({ "skip_seed": false, "ob_pod": "openbao-0" }) + ).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_write_dirty_kv_paths_skip_seed() { + let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_write_dirty_kv_paths_empty_dirty_paths() { + let instance = run_step::(serde_json::json!({ + "skip_seed": false, "ob_pod": "openbao-0", "root_token": "hvs.test", "dirty_paths": [], + })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_configure_kubernetes_auth_skip_seed() { + let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_configure_kubernetes_auth_no_ob_pod() { + let instance = run_step::(serde_json::json!({ "skip_seed": false })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } +} diff --git a/src/workflows/seed/steps/postgres.rs b/src/workflows/seed/steps/postgres.rs new file mode 100644 index 00000000..aab54fc2 --- /dev/null +++ b/src/workflows/seed/steps/postgres.rs @@ -0,0 +1,291 @@ +//! PostgreSQL setup steps: wait for CNPG cluster, create roles/databases, +//! configure OpenBao database secrets engine. +//! +//! Data-struct-agnostic — reads JSON fields directly for cross-workflow reuse. + +use std::collections::HashMap; + +use k8s_openapi::api::core::v1::Pod; +use kube::api::{Api, ApiResource, DynamicObject, ListParams}; +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::kube as k; +use crate::openbao::BaoClient; +use crate::output::{ok, warn}; +use crate::secrets::{self, PG_USERS}; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +fn json_bool(data: &serde_json::Value, key: &str) -> bool { + data.get(key).and_then(|v| v.as_bool()).unwrap_or(false) +} + +fn json_str(data: &serde_json::Value, key: &str) -> Option { + data.get(key).and_then(|v| v.as_str()).map(|s| s.to_string()) +} + +// ── Pure helpers (testable without K8s) ───────────────────────────────────── + +/// Build the user to database mapping used by EnsurePGRolesAndDatabases. +pub(crate) fn pg_db_map() -> HashMap<&'static str, &'static str> { + [ + ("kratos", "kratos_db"), ("hydra", "hydra_db"), ("gitea", "gitea_db"), + ("hive", "hive_db"), ("docs", "docs_db"), ("meet", "meet_db"), + ("drive", "drive_db"), ("messages", "messages_db"), + ("conversations", "conversations_db"), ("people", "people_db"), + ("find", "find_db"), ("calendars", "calendars_db"), ("projects", "projects_db"), + ("penpot", "penpot_db"), + ].into_iter().collect() +} + +/// SQL to idempotently create a postgres user if it does not exist. +pub(crate) fn ensure_user_sql(user: &str) -> String { + format!( + "DO $$ BEGIN IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname='{user}') \ + THEN CREATE USER {user}; END IF; END $$;" + ) +} + +/// SQL to create a database owned by the given user. +pub(crate) fn create_db_sql(db: &str, user: &str) -> String { + format!("CREATE DATABASE {db} OWNER {user};") +} + +// ── WaitForPostgres ───────────────────────────────────────────────────────── + +/// Wait for CNPG cluster healthy state, set `pg_pod`. +/// +/// Reads: `skip_seed` +/// Writes: `pg_pod` +#[derive(Default)] +pub struct WaitForPostgres; + +#[async_trait::async_trait] +impl StepBody for WaitForPostgres { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + if json_bool(&ctx.workflow.data, "skip_seed") { + return Ok(ExecutionResult::next()); + } + + ok("Waiting for postgres cluster..."); + let mut pg_pod = String::new(); + + let client = k::get_client().await.map_err(|e| step_err(e.to_string()))?; + let ar = ApiResource { + group: "postgresql.cnpg.io".into(), + version: "v1".into(), + api_version: "postgresql.cnpg.io/v1".into(), + kind: "Cluster".into(), + plural: "clusters".into(), + }; + let cnpg_api: Api = Api::namespaced_with(client.clone(), "data", &ar); + + for _ in 0..60 { + if let Ok(cluster) = cnpg_api.get("postgres").await { + let phase = cluster.data + .get("status").and_then(|s| s.get("phase")) + .and_then(|p| p.as_str()).unwrap_or(""); + if phase == "Cluster in healthy state" { + let pods: Api = Api::namespaced(client.clone(), "data"); + let lp = ListParams::default().labels("cnpg.io/cluster=postgres,role=primary"); + if let Ok(pod_list) = pods.list(&lp).await { + if let Some(name) = pod_list.items.first() + .and_then(|p| p.metadata.name.as_deref()) + { + pg_pod = name.to_string(); + ok(&format!("Postgres ready ({pg_pod}).")); + break; + } + } + } + } + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } + + if pg_pod.is_empty() { + warn("Postgres not ready after 5 min -- continuing anyway."); + } + + let mut result = ExecutionResult::next(); + if !pg_pod.is_empty() { + result.output_data = Some(serde_json::json!({ "pg_pod": pg_pod })); + } + Ok(result) + } +} + +// ── EnsurePGRolesAndDatabases ─────────────────────────────────────────────── + +/// Create all 13 users and databases. +/// +/// Reads: `skip_seed`, `pg_pod` +#[derive(Default)] +pub struct EnsurePGRolesAndDatabases; + +#[async_trait::async_trait] +impl StepBody for EnsurePGRolesAndDatabases { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if json_bool(data, "skip_seed") { + return Ok(ExecutionResult::next()); + } + + let pg_pod = match json_str(data, "pg_pod") { + Some(p) if !p.is_empty() => p, + _ => return Ok(ExecutionResult::next()), + }; + + ok("Ensuring postgres roles and databases exist..."); + + let db_map = pg_db_map(); + + for user in PG_USERS { + let sql = ensure_user_sql(user); + let _ = k::kube_exec("data", &pg_pod, &["psql", "-U", "postgres", "-c", &sql], Some("postgres")).await; + + let db = db_map.get(user).copied().unwrap_or("unknown_db"); + let sql = create_db_sql(db, user); + let _ = k::kube_exec("data", &pg_pod, &["psql", "-U", "postgres", "-c", &sql], Some("postgres")).await; + } + + Ok(ExecutionResult::next()) + } +} + +// ── ConfigureDatabaseEngine ───────────────────────────────────────────────── + +/// Configure OpenBao database secrets engine. +/// +/// Reads: `skip_seed`, `pg_pod`, `ob_pod`, `root_token` +#[derive(Default)] +pub struct ConfigureDatabaseEngine; + +#[async_trait::async_trait] +impl StepBody for ConfigureDatabaseEngine { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if json_bool(data, "skip_seed") { + return Ok(ExecutionResult::next()); + } + + let _pg_pod = match json_str(data, "pg_pod") { + Some(p) if !p.is_empty() => p, + _ => return Ok(ExecutionResult::next()), + }; + let ob_pod = match json_str(data, "ob_pod") { + Some(p) => p, + None => { warn("Skipping DB engine config -- missing ob_pod."); return Ok(ExecutionResult::next()); } + }; + let root_token = match json_str(data, "root_token") { + Some(t) if !t.is_empty() => t, + _ => { warn("Skipping DB engine config -- missing root_token."); return Ok(ExecutionResult::next()); } + }; + + match secrets::port_forward("data", &ob_pod, 8200).await { + Ok(pf) => { + let bao = BaoClient::with_token( + &format!("http://127.0.0.1:{}", pf.local_port), + &root_token, + ); + if let Err(e) = secrets::configure_db_engine(&bao).await { + warn(&format!("DB engine config failed: {e}")); + } + } + Err(e) => warn(&format!("Port-forward to OpenBao failed: {e}")), + } + + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use wfe::run_workflow_sync; + use wfe_core::builder::WorkflowBuilder; + use wfe_core::models::WorkflowStatus; + + async fn run_step( + data: serde_json::Value, + ) -> wfe_core::models::WorkflowInstance { + let host = crate::workflows::host::create_test_host().await.unwrap(); + host.register_step::().await; + let def = WorkflowBuilder::::new() + .start_with::() + .name("test-step") + .end_workflow() + .build("test-wf", 1); + host.register_workflow_definition(def).await; + let instance = run_workflow_sync(&host, "test-wf", 1, data, Duration::from_secs(5)) + .await + .unwrap(); + host.stop().await; + instance + } + + #[tokio::test] + async fn test_wait_for_postgres_skip_seed() { + let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_ensure_pg_roles_skip_seed() { + let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_ensure_pg_roles_no_pg_pod() { + let instance = run_step::(serde_json::json!({ "skip_seed": false })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_configure_db_engine_skip_seed() { + let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_configure_db_engine_no_pg_pod() { + let instance = run_step::(serde_json::json!({ "skip_seed": false })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[test] + fn test_pg_db_map_contains_all_users() { + let map = pg_db_map(); + assert_eq!(map.len(), 13); + for user in PG_USERS { + assert!(map.contains_key(user), "pg_db_map missing key for: {user}"); + } + } + + #[test] + fn test_ensure_user_sql_format() { + let sql = ensure_user_sql("kratos"); + assert!(sql.contains("rolname='kratos'")); + assert!(sql.contains("CREATE USER kratos")); + } + + #[test] + fn test_create_db_sql_format() { + assert_eq!(create_db_sql("kratos_db", "kratos"), "CREATE DATABASE kratos_db OWNER kratos;"); + } +}