diff --git a/src/workflows/primitives/apply_manifest.rs b/src/workflows/primitives/apply_manifest.rs new file mode 100644 index 00000000..a20f73d5 --- /dev/null +++ b/src/workflows/primitives/apply_manifest.rs @@ -0,0 +1,77 @@ +//! ApplyManifest — atomic step that applies kustomize manifests for a single namespace. +//! +//! Reads `step_config.namespace` and resolves domain/email from workflow data. + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::output::step; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +/// Apply kustomize manifests for one namespace. +/// +/// **step_config:** `{"namespace": "ory"}` +/// +/// Reads `__ctx` (domain, acme_email) and `domain` from workflow data. +#[derive(Default)] +pub struct ApplyManifest; + +#[async_trait::async_trait] +impl StepBody for ApplyManifest { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("ApplyManifest: missing step_config"))?; + let namespace = config.get("namespace") + .and_then(|v| v.as_str()) + .ok_or_else(|| step_err("ApplyManifest: missing namespace in step_config"))?; + + let data = &ctx.workflow.data; + + let domain = data.get("domain").and_then(|v| v.as_str()).unwrap_or(""); + let domain = if domain.is_empty() { + data.get("__ctx") + .and_then(|c| c.get("domain")) + .and_then(|v| v.as_str()) + .unwrap_or("") + } else { + domain + }; + + let email = data.get("__ctx") + .and_then(|c| c.get("acme_email")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + + step(&format!("Applying {namespace}...")); + + crate::manifests::cmd_apply("production", domain, email, namespace) + .await + .map_err(|e| step_err(e.to_string()))?; + + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn apply_manifest_is_default() { + let _ = 
ApplyManifest::default(); + } + + #[test] + fn missing_step_config_is_descriptive() { + let err = step_err("ApplyManifest: missing step_config"); + let msg = err.to_string(); + assert!(msg.contains("ApplyManifest"), "error should name the step: {msg}"); + assert!(msg.contains("step_config"), "error should mention step_config: {msg}"); + } +} diff --git a/src/workflows/primitives/collect_credentials.rs b/src/workflows/primitives/collect_credentials.rs new file mode 100644 index 00000000..d8be3119 --- /dev/null +++ b/src/workflows/primitives/collect_credentials.rs @@ -0,0 +1,122 @@ +//! CollectCredentials — joins per-service cred outputs into a unified `creds` map. +//! +//! Runs after all SeedKVPath steps complete (parallel join point). + +use std::collections::HashMap; + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::output::ok; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +/// Credential mapping: maps a global cred key to a service + field. +/// E.g., `("hydra-system-secret", "hydra", "system-secret")` means +/// `creds["hydra-system-secret"] = creds_hydra["system-secret"]`. 
+const CRED_MAPPINGS: &[(&str, &str, &str)] = &[ + ("hydra-system-secret", "hydra", "system-secret"), + ("hydra-cookie-secret", "hydra", "cookie-secret"), + ("hydra-pairwise-salt", "hydra", "pairwise-salt"), + ("kratos-secrets-default", "kratos", "secrets-default"), + ("kratos-secrets-cookie", "kratos", "secrets-cookie"), + ("s3-access-key", "seaweedfs", "access-key"), + ("s3-secret-key", "seaweedfs", "secret-key"), + ("gitea-admin-password", "gitea", "admin-password"), + ("hive-oidc-client-id", "hive", "oidc-client-id"), + ("hive-oidc-client-secret", "hive", "oidc-client-secret"), + ("people-django-secret", "people", "django-secret-key"), + ("livekit-api-key", "livekit", "api-key"), + ("livekit-api-secret", "livekit", "api-secret"), + ("kratos-admin-cookie-secret", "kratos-admin", "cookie-secret"), + ("messages-dkim-public-key", "messages", "dkim-public-key"), +]; + +/// All services that have KV data to collect. +const KV_SERVICES: &[&str] = &[ + "hydra", "kratos", "seaweedfs", "gitea", "hive", "livekit", + "people", "login-ui", "kratos-admin", "docs", "meet", "drive", + "projects", "calendars", "messages", "collabora", "tuwunel", + "grafana", "scaleway-s3", +]; + +/// Collect per-service credential outputs into a unified `creds` map. +/// +/// Reads `creds_{service}` and `kv_data_{service}` for each service from workflow data. +/// Outputs `creds` (unified HashMap) and `dirty_paths` (Vec of dirty service names). 
+#[derive(Default)] +pub struct CollectCredentials; + +#[async_trait::async_trait] +impl StepBody for CollectCredentials { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if data.get("skip_seed").and_then(|v| v.as_bool()).unwrap_or(false) { + return Ok(ExecutionResult::next()); + } + + let mut creds: HashMap = HashMap::new(); + let mut dirty_paths: Vec = Vec::new(); + + // Collect per-service creds into the global map + for (global_key, service, field) in CRED_MAPPINGS { + let value = data.get(&format!("creds_{service}")) + .and_then(|v| v.get(*field)) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + creds.insert(global_key.to_string(), value); + } + + // Collect kv_data and dirty flags + for service in KV_SERVICES { + let kv_key = format!("kv_data_{service}"); + if let Some(kv_json) = data.get(&kv_key).and_then(|v| v.as_str()) { + creds.insert(format!("kv_data/{service}"), kv_json.to_string()); + } + + let dirty_key = format!("dirty_{service}"); + if data.get(&dirty_key).and_then(|v| v.as_bool()).unwrap_or(false) { + dirty_paths.push(service.to_string()); + } + } + + ok(&format!("Collected credentials from {} services ({} dirty)", + KV_SERVICES.len(), dirty_paths.len())); + + let mut result = ExecutionResult::next(); + result.output_data = Some(serde_json::json!({ + "creds": creds, + "dirty_paths": dirty_paths, + })); + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn collect_credentials_is_default() { + let _ = CollectCredentials::default(); + } + + #[test] + fn cred_mappings_cover_all_expected_keys() { + assert_eq!(CRED_MAPPINGS.len(), 15); + assert!(CRED_MAPPINGS.iter().any(|(k, _, _)| *k == "hydra-system-secret")); + assert!(CRED_MAPPINGS.iter().any(|(k, _, _)| *k == "messages-dkim-public-key")); + } + + #[test] + fn kv_services_count() { + assert_eq!(KV_SERVICES.len(), 19); + } +} diff --git a/src/workflows/primitives/create_k8s_secret.rs 
b/src/workflows/primitives/create_k8s_secret.rs new file mode 100644 index 00000000..48ec7091 --- /dev/null +++ b/src/workflows/primitives/create_k8s_secret.rs @@ -0,0 +1,99 @@ +//! CreateK8sSecret — atomic step that creates a single Kubernetes secret. +//! +//! Reads creds from workflow data and maps them to secret keys via step_config. + +use std::collections::HashMap; + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::kube as k; +use crate::output::ok; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +/// Create a single Kubernetes Opaque secret (idempotent via SSA). +/// +/// **step_config:** +/// ```json +/// { +/// "namespace": "ory", +/// "name": "hydra", +/// "data": { +/// "secretsSystem": "hydra-system-secret", +/// "secretsCookie": "hydra-cookie-secret" +/// } +/// } +/// ``` +/// +/// Each value in `data` is a key into the `creds` map in workflow data. +/// If a value starts with `literal:`, the rest is used as the literal value. +/// If a value is `s3_json`, builds the seaweedfs s3.json blob from s3 creds. +/// +/// Reads `creds` and `skip_seed` from workflow data. 
+#[derive(Default)] +pub struct CreateK8sSecret; + +#[async_trait::async_trait] +impl StepBody for CreateK8sSecret { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if data.get("skip_seed").and_then(|v| v.as_bool()).unwrap_or(false) { + return Ok(ExecutionResult::next()); + } + + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("CreateK8sSecret: missing step_config"))?; + let namespace = config.get("namespace") + .and_then(|v| v.as_str()) + .ok_or_else(|| step_err("CreateK8sSecret: missing namespace"))?; + let name = config.get("name") + .and_then(|v| v.as_str()) + .ok_or_else(|| step_err("CreateK8sSecret: missing name"))?; + let data_map = config.get("data") + .and_then(|v| v.as_object()) + .ok_or_else(|| step_err("CreateK8sSecret: missing data"))?; + + let creds: HashMap = data.get("creds") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + let mut secret_data: HashMap = HashMap::new(); + + for (secret_key, cred_ref) in data_map { + let cred_ref = cred_ref.as_str().unwrap_or(""); + let value = if cred_ref.starts_with("literal:") { + cred_ref.strip_prefix("literal:").unwrap_or("").to_string() + } else if cred_ref == "s3_json" { + let ak = creds.get("s3-access-key").cloned().unwrap_or_default(); + let sk = creds.get("s3-secret-key").cloned().unwrap_or_default(); + crate::workflows::seed::steps::k8s_secrets::build_s3_json(&ak, &sk) + } else { + creds.get(cred_ref).cloned().unwrap_or_default() + }; + secret_data.insert(secret_key.clone(), value); + } + + k::create_secret(namespace, name, secret_data).await + .map_err(|e| step_err(format!("CreateK8sSecret({namespace}/{name}): {e}")))?; + + ok(&format!("K8s secret: {namespace}/{name}")); + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_k8s_secret_is_default() { + let _ = CreateK8sSecret::default(); + } +} diff --git 
a/src/workflows/primitives/create_pg_database.rs b/src/workflows/primitives/create_pg_database.rs new file mode 100644 index 00000000..0219d1c5 --- /dev/null +++ b/src/workflows/primitives/create_pg_database.rs @@ -0,0 +1,71 @@ +//! CreatePGDatabase — atomic step that creates a single PostgreSQL database. +//! +//! Reads `step_config.dbname`, `step_config.owner`, and `pg_pod` from workflow data. + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::kube as k; +use crate::output::ok; +use crate::workflows::seed::steps::postgres::create_db_sql; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +/// Create a single PostgreSQL database (idempotent — CREATE DATABASE errors are ignored). +/// +/// **step_config:** `{"dbname": "kratos_db", "owner": "kratos"}` +/// +/// Reads `pg_pod` and `skip_seed` from workflow data. +#[derive(Default)] +pub struct CreatePGDatabase; + +#[async_trait::async_trait] +impl StepBody for CreatePGDatabase { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if data.get("skip_seed").and_then(|v| v.as_bool()).unwrap_or(false) { + return Ok(ExecutionResult::next()); + } + + let pg_pod = match data.get("pg_pod").and_then(|v| v.as_str()) { + Some(p) if !p.is_empty() => p, + _ => return Ok(ExecutionResult::next()), + }; + + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("CreatePGDatabase: missing step_config"))?; + let dbname = config.get("dbname") + .and_then(|v| v.as_str()) + .ok_or_else(|| step_err("CreatePGDatabase: missing dbname in step_config"))?; + let owner = config.get("owner") + .and_then(|v| v.as_str()) + .ok_or_else(|| step_err("CreatePGDatabase: missing owner in step_config"))?; + + let sql = create_db_sql(dbname, owner); + // kube_exec runs the command inside a Kubernetes pod, not a local shell + let _ = k::kube_exec( + 
"data", pg_pod, + &["psql", "-U", "postgres", "-c", &sql], + Some("postgres"), + ).await; + + ok(&format!("PG database: {dbname} (owner: {owner})")); + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_pg_database_is_default() { + let _ = CreatePGDatabase::default(); + } +} diff --git a/src/workflows/primitives/create_pg_role.rs b/src/workflows/primitives/create_pg_role.rs new file mode 100644 index 00000000..8a47ab94 --- /dev/null +++ b/src/workflows/primitives/create_pg_role.rs @@ -0,0 +1,67 @@ +//! CreatePGRole — atomic step that creates a single PostgreSQL role. +//! +//! Reads `step_config.username` and `pg_pod` from workflow data. + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::kube as k; +use crate::output::ok; +use crate::workflows::seed::steps::postgres::ensure_user_sql; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +/// Create a single PostgreSQL role (idempotent). +/// +/// **step_config:** `{"username": "kratos"}` +/// +/// Reads `pg_pod` and `skip_seed` from workflow data. 
+#[derive(Default)] +pub struct CreatePGRole; + +#[async_trait::async_trait] +impl StepBody for CreatePGRole { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if data.get("skip_seed").and_then(|v| v.as_bool()).unwrap_or(false) { + return Ok(ExecutionResult::next()); + } + + let pg_pod = match data.get("pg_pod").and_then(|v| v.as_str()) { + Some(p) if !p.is_empty() => p, + _ => return Ok(ExecutionResult::next()), + }; + + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("CreatePGRole: missing step_config"))?; + let username = config.get("username") + .and_then(|v| v.as_str()) + .ok_or_else(|| step_err("CreatePGRole: missing username in step_config"))?; + + let sql = ensure_user_sql(username); + let _ = k::kube_exec( + "data", pg_pod, + &["psql", "-U", "postgres", "-c", &sql], + Some("postgres"), + ).await; + + ok(&format!("PG role: {username}")); + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_pg_role_is_default() { + let _ = CreatePGRole::default(); + } +} diff --git a/src/workflows/primitives/ensure_namespace.rs b/src/workflows/primitives/ensure_namespace.rs new file mode 100644 index 00000000..4a4327b3 --- /dev/null +++ b/src/workflows/primitives/ensure_namespace.rs @@ -0,0 +1,45 @@ +//! EnsureNamespace — atomic step that ensures a Kubernetes namespace exists. + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::kube as k; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +/// Ensure a single Kubernetes namespace exists (idempotent). 
+/// +/// **step_config:** `{"namespace": "ory"}` +#[derive(Default)] +pub struct EnsureNamespace; + +#[async_trait::async_trait] +impl StepBody for EnsureNamespace { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("EnsureNamespace: missing step_config"))?; + let namespace = config.get("namespace") + .and_then(|v| v.as_str()) + .ok_or_else(|| step_err("EnsureNamespace: missing namespace in step_config"))?; + + k::ensure_ns(namespace).await + .map_err(|e| step_err(format!("EnsureNamespace({namespace}): {e}")))?; + + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ensure_namespace_is_default() { + let _ = EnsureNamespace::default(); + } +} diff --git a/src/workflows/primitives/kv_service_configs.rs b/src/workflows/primitives/kv_service_configs.rs new file mode 100644 index 00000000..db43139c --- /dev/null +++ b/src/workflows/primitives/kv_service_configs.rs @@ -0,0 +1,145 @@ +//! KV service configuration data — defines what each service needs seeded. +//! +//! Used by workflow definitions to generate SeedKVPath + WriteKVPath parallel branches. + +use serde_json::{json, Value}; + +/// Returns the step_config for each service's SeedKVPath step. +/// Order matters: seaweedfs must come before kratos-admin (dependency). 
+pub fn all_service_configs() -> Vec { + vec![ + json!({"service":"hydra","fields":[ + {"key":"system-secret","generator":"rand_token"}, + {"key":"cookie-secret","generator":"rand_token"}, + {"key":"pairwise-salt","generator":"rand_token"} + ]}), + json!({"service":"kratos","fields":[ + {"key":"secrets-default","generator":"rand_token"}, + {"key":"secrets-cookie","generator":"rand_token"}, + {"key":"smtp-connection-uri","generator":"smtp_uri"} + ]}), + json!({"service":"seaweedfs","fields":[ + {"key":"access-key","generator":"rand_token"}, + {"key":"secret-key","generator":"rand_token"} + ]}), + json!({"service":"gitea","fields":[ + {"key":"admin-username","generator":"gitea_admin"}, + {"key":"admin-password","generator":"rand_token"} + ]}), + json!({"service":"hive","fields":[ + {"key":"oidc-client-id","generator":"static:hive-local"}, + {"key":"oidc-client-secret","generator":"rand_token"} + ]}), + json!({"service":"livekit","fields":[ + {"key":"api-key","generator":"static:devkey"}, + {"key":"api-secret","generator":"rand_token"} + ]}), + json!({"service":"people","fields":[ + {"key":"django-secret-key","generator":"rand_token"} + ]}), + json!({"service":"login-ui","fields":[ + {"key":"cookie-secret","generator":"rand_token"}, + {"key":"csrf-cookie-secret","generator":"rand_token"} + ]}), + json!({"service":"docs","fields":[ + {"key":"django-secret-key","generator":"rand_token"}, + {"key":"collaboration-secret","generator":"rand_token"} + ]}), + json!({"service":"meet","fields":[ + {"key":"django-secret-key","generator":"rand_token"}, + {"key":"application-jwt-secret-key","generator":"rand_token"} + ]}), + json!({"service":"drive","fields":[ + {"key":"django-secret-key","generator":"rand_token"} + ]}), + json!({"service":"projects","fields":[ + {"key":"secret-key","generator":"rand_token"} + ]}), + json!({"service":"calendars","fields":[ + {"key":"django-secret-key","generator":"rand_token_50"}, + {"key":"salt-key","generator":"rand_token"}, + 
{"key":"caldav-inbound-api-key","generator":"rand_token"}, + {"key":"caldav-outbound-api-key","generator":"rand_token"}, + {"key":"caldav-internal-api-key","generator":"rand_token"} + ]}), + json!({"service":"messages","fields":[ + {"key":"django-secret-key","generator":"rand_token"}, + {"key":"salt-key","generator":"rand_token"}, + {"key":"mda-api-secret","generator":"rand_token"}, + {"key":"oidc-refresh-token-key","generator":"fernet_key"}, + {"key":"dkim-private-key","generator":"dkim_private"}, + {"key":"dkim-public-key","generator":"dkim_public"}, + {"key":"rspamd-password","generator":"rand_token"}, + {"key":"socks-proxy-users","generator":"socks_proxy"}, + {"key":"mta-out-smtp-username","generator":"static:sunbeam"}, + {"key":"mta-out-smtp-password","generator":"rand_token"} + ]}), + json!({"service":"collabora","fields":[ + {"key":"username","generator":"static:admin"}, + {"key":"password","generator":"rand_token"} + ]}), + json!({"service":"tuwunel","fields":[ + {"key":"oidc-client-id","generator":"static:"}, + {"key":"oidc-client-secret","generator":"static:"}, + {"key":"turn-secret","generator":"static:"}, + {"key":"registration-token","generator":"rand_token"} + ]}), + json!({"service":"grafana","fields":[ + {"key":"admin-password","generator":"rand_token"} + ]}), + json!({"service":"scaleway-s3","fields":[ + {"key":"access-key-id","generator":"scw_config_access"}, + {"key":"secret-access-key","generator":"scw_config_secret"} + ]}), + ] +} + +/// Returns the config for kratos-admin, which depends on seaweedfs creds. +/// Must be seeded AFTER seaweedfs in the workflow (sequential after seaweedfs branch). 
+pub fn kratos_admin_config() -> Value { + json!({"service":"kratos-admin","fields":[ + {"key":"cookie-secret","generator":"rand_token"}, + {"key":"csrf-cookie-secret","generator":"rand_token"}, + {"key":"admin-identity-ids","generator":"static:"}, + {"key":"s3-access-key","generator":"from_creds:seaweedfs.access-key"}, + {"key":"s3-secret-key","generator":"from_creds:seaweedfs.secret-key"} + ]}) +} + +/// All service names (for WriteKVPath branches). +pub fn all_service_names() -> Vec<&'static str> { + vec![ + "hydra", "kratos", "seaweedfs", "gitea", "hive", "livekit", + "people", "login-ui", "kratos-admin", "docs", "meet", "drive", + "projects", "calendars", "messages", "collabora", "tuwunel", + "grafana", "scaleway-s3", + ] +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn all_configs_have_service_and_fields() { + for cfg in all_service_configs() { + assert!(cfg.get("service").is_some(), "missing service in {cfg}"); + assert!(cfg.get("fields").and_then(|f| f.as_array()).is_some(), "missing fields in {cfg}"); + } + } + + #[test] + fn service_count() { + // 18 independent + 1 kratos-admin (dependent) + assert_eq!(all_service_configs().len(), 18); + assert_eq!(all_service_names().len(), 19); + } + + #[test] + fn kratos_admin_has_from_creds() { + let cfg = kratos_admin_config(); + let fields = cfg["fields"].as_array().unwrap(); + let s3_field = fields.iter().find(|f| f["key"] == "s3-access-key").unwrap(); + assert!(s3_field["generator"].as_str().unwrap().starts_with("from_creds:")); + } +} diff --git a/src/workflows/primitives/mod.rs b/src/workflows/primitives/mod.rs new file mode 100644 index 00000000..ac02171a --- /dev/null +++ b/src/workflows/primitives/mod.rs @@ -0,0 +1,29 @@ +//! Atomic workflow primitives. +//! +//! Each step does exactly one thing, reads its config from `ctx.step.step_config`, +//! and can be composed with `.config(json!({...}))` in workflow definitions. 
+ +mod apply_manifest; +mod collect_credentials; +pub mod kv_service_configs; +mod create_k8s_secret; +mod create_pg_database; +mod create_pg_role; +mod ensure_namespace; +mod opensearch_ml; +mod seed_kv_path; +mod vault_auth; +mod wait_for_rollout; +mod write_kv_path; + +pub use apply_manifest::ApplyManifest; +pub use collect_credentials::CollectCredentials; +pub use create_k8s_secret::CreateK8sSecret; +pub use create_pg_database::CreatePGDatabase; +pub use create_pg_role::CreatePGRole; +pub use ensure_namespace::EnsureNamespace; +pub use opensearch_ml::{EnsureOpenSearchML, InjectOpenSearchModelId}; +pub use seed_kv_path::SeedKVPath; +pub use vault_auth::{EnableVaultAuth, WriteVaultAuthConfig, WriteVaultPolicy, WriteVaultRole}; +pub use wait_for_rollout::WaitForRollout; +pub use write_kv_path::WriteKVPath; diff --git a/src/workflows/primitives/opensearch_ml.rs b/src/workflows/primitives/opensearch_ml.rs new file mode 100644 index 00000000..d491fc64 --- /dev/null +++ b/src/workflows/primitives/opensearch_ml.rs @@ -0,0 +1,61 @@ +//! OpenSearch ML primitives — register/deploy ML model and inject model ID. +//! +//! These run in a parallel branch alongside the main pipeline so the +//! 10+ minute ML model download doesn't block everything else. + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::output::step; + +/// Register and deploy the OpenSearch ML model (all-mpnet-base-v2). +/// +/// No step_config needed — reads nothing from workflow data. +/// Can take 10+ minutes on first run (model download). 
+#[derive(Default)] +pub struct EnsureOpenSearchML; + +#[async_trait::async_trait] +impl StepBody for EnsureOpenSearchML { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + step("OpenSearch ML model..."); + crate::manifests::ensure_opensearch_ml().await; + Ok(ExecutionResult::next()) + } +} + +/// Inject the OpenSearch model_id into the matrix/opensearch-ml-config ConfigMap. +/// +/// Should run after both the ML model is deployed AND matrix manifests are applied. +#[derive(Default)] +pub struct InjectOpenSearchModelId; + +#[async_trait::async_trait] +impl StepBody for InjectOpenSearchModelId { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + step("OpenSearch model ID injection..."); + crate::manifests::inject_opensearch_model_id().await; + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn ensure_opensearch_ml_is_default() { + let _ = EnsureOpenSearchML::default(); + } + + #[test] + fn inject_opensearch_model_id_is_default() { + let _ = InjectOpenSearchModelId::default(); + } +} diff --git a/src/workflows/primitives/seed_kv_path.rs b/src/workflows/primitives/seed_kv_path.rs new file mode 100644 index 00000000..ce34269b --- /dev/null +++ b/src/workflows/primitives/seed_kv_path.rs @@ -0,0 +1,224 @@ +//! SeedKVPath — atomic step that seeds a single OpenBao KV path for one service. +//! +//! Creates its own port-forward, calls `get_or_create` for one service, +//! outputs per-service creds and dirty flag. 
+ +use std::collections::HashSet; + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::openbao::BaoClient; +use crate::output::ok; +use crate::secrets::{ + self, gen_dkim_key_pair, gen_fernet_key, rand_token, rand_token_n, scw_config, + GITEA_ADMIN_USER, SMTP_URI, +}; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +/// Resolve a generator function from a string type name. +/// Returns a closure that produces the secret value. +fn make_generator(gen_type: &str) -> Box String + Send + Sync> { + if let Some(val) = gen_type.strip_prefix("static:") { + let val = val.to_string(); + return Box::new(move || val.clone()); + } + match gen_type { + "rand_token" => Box::new(rand_token), + "rand_token_50" => Box::new(|| rand_token_n(50)), + "fernet_key" => Box::new(gen_fernet_key), + "scw_config_access" => Box::new(|| scw_config("access-key")), + "scw_config_secret" => Box::new(|| scw_config("secret-key")), + "smtp_uri" => Box::new(|| SMTP_URI.to_string()), + "gitea_admin" => Box::new(|| GITEA_ADMIN_USER.to_string()), + "socks_proxy" => Box::new(|| format!("sunbeam:{}", rand_token())), + _ => Box::new(|| String::new()), + } +} + +/// Seed a single service's KV path in OpenBao. +/// +/// **step_config:** +/// ```json +/// { +/// "service": "hydra", +/// "fields": [ +/// {"key": "system-secret", "generator": "rand_token"}, +/// {"key": "cookie-secret", "generator": "rand_token"} +/// ] +/// } +/// ``` +/// +/// Generator types: `rand_token`, `rand_token_50`, `fernet_key`, `smtp_uri`, +/// `gitea_admin`, `scw_config_access`, `scw_config_secret`, `socks_proxy`, +/// `static:`, `dkim_private`, `dkim_public`. +/// +/// For `messages` service with DKIM: use gen types `dkim_private` and `dkim_public`. +/// The step will read existing DKIM keys from OpenBao and only generate if missing. 
+/// +/// **Output:** `{"creds_{service}": {...}, "kv_data_{service}": "{...}", "dirty_{service}": true/false}` +/// +/// Reads `skip_seed`, `ob_pod`, `root_token` from workflow data. +/// For `from_creds:KEY` generators, reads `creds_seaweedfs.KEY` from workflow data (requires +/// seaweedfs to be seeded first — run in a sequential branch before this step). +#[derive(Default)] +pub struct SeedKVPath; + +#[async_trait::async_trait] +impl StepBody for SeedKVPath { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if data.get("skip_seed").and_then(|v| v.as_bool()).unwrap_or(false) { + return Ok(ExecutionResult::next()); + } + + let ob_pod = match data.get("ob_pod").and_then(|v| v.as_str()) { + Some(p) => p, + None => return Ok(ExecutionResult::next()), + }; + let root_token = match data.get("root_token").and_then(|v| v.as_str()) { + Some(t) => t, + None => return Ok(ExecutionResult::next()), + }; + + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("SeedKVPath: missing step_config"))?; + let service = config.get("service").and_then(|v| v.as_str()) + .ok_or_else(|| step_err("SeedKVPath: missing service"))?; + let fields = config.get("fields").and_then(|v| v.as_array()) + .ok_or_else(|| step_err("SeedKVPath: missing fields"))?; + + let pf = secrets::port_forward("data", ob_pod, 8200).await + .map_err(|e| step_err(e.to_string()))?; + let bao = BaoClient::with_token( + &format!("http://127.0.0.1:{}", pf.local_port), + root_token, + ); + + // Handle DKIM special case: read existing keys before get_or_create + let mut dkim_private = String::new(); + let mut dkim_public = String::new(); + let has_dkim = fields.iter().any(|f| + f.get("generator").and_then(|g| g.as_str()).map_or(false, |g| g == "dkim_private" || g == "dkim_public") + ); + if has_dkim { + let existing = bao.kv_get("secret", service).await + .map_err(|e| step_err(e.to_string()))? 
+ .unwrap_or_default(); + if existing.get("dkim-private-key").filter(|v| !v.is_empty()).is_some() { + dkim_private = existing.get("dkim-private-key").cloned().unwrap_or_default(); + dkim_public = existing.get("dkim-public-key").cloned().unwrap_or_default(); + } else { + let (priv_key, pub_key) = gen_dkim_key_pair(); + dkim_private = priv_key; + dkim_public = pub_key; + } + } + + // Build field generators, resolving from_creds references from workflow data + let generators: Vec<(&str, Box String + Send + Sync>)> = fields.iter() + .filter_map(|f| { + let key = f.get("key")?.as_str()?; + let gen_type = f.get("generator")?.as_str()?; + + let genfn: Box String + Send + Sync> = if let Some(cred_key) = gen_type.strip_prefix("from_creds:") { + // Read from another service's output in workflow data + let source_service = cred_key.split('.').next().unwrap_or(""); + let source_field = cred_key.split('.').nth(1).unwrap_or(cred_key); + let val = data.get(&format!("creds_{source_service}")) + .and_then(|v| v.get(source_field)) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + Box::new(move || val.clone()) + } else if gen_type == "dkim_private" { + let v = dkim_private.clone(); + Box::new(move || v.clone()) + } else if gen_type == "dkim_public" { + let v = dkim_public.clone(); + Box::new(move || v.clone()) + } else { + make_generator(gen_type) + }; + + Some((key, genfn)) + }) + .collect(); + + let gen_refs: Vec<(&str, &(dyn Fn() -> String + Send + Sync))> = generators.iter() + .map(|(k, g)| (*k, g.as_ref())) + .collect(); + + let mut dirty_paths: HashSet = HashSet::new(); + let result_map = secrets::get_or_create(&bao, service, &gen_refs, &mut dirty_paths) + .await + .map_err(|e| step_err(format!("SeedKVPath({service}): {e}")))?; + + let is_dirty = dirty_paths.contains(service); + let kv_json = serde_json::to_string(&result_map) + .map_err(|e| step_err(e.to_string()))?; + + ok(&format!("KV seed: {service}{}", if is_dirty { " (new)" } else { "" })); + + let mut output 
= serde_json::Map::new(); + output.insert( + format!("creds_{service}"), + serde_json::to_value(&result_map).unwrap_or_default(), + ); + output.insert(format!("kv_data_{service}"), serde_json::Value::String(kv_json)); + output.insert(format!("dirty_{service}"), serde_json::Value::Bool(is_dirty)); + + let mut exec_result = ExecutionResult::next(); + exec_result.output_data = Some(serde_json::Value::Object(output)); + Ok(exec_result) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn seed_kv_path_is_default() { + let _ = SeedKVPath::default(); + } + + #[test] + fn make_generator_rand_token() { + let genfn = make_generator("rand_token"); + let val = genfn(); + assert!(!val.is_empty()); + } + + #[test] + fn make_generator_static() { + let genfn = make_generator("static:hello"); + assert_eq!(genfn(),"hello"); + } + + #[test] + fn make_generator_empty_static() { + let genfn = make_generator("static:"); + assert_eq!(genfn(),""); + } + + #[test] + fn make_generator_fernet() { + let genfn = make_generator("fernet_key"); + let val = genfn(); + assert!(!val.is_empty()); + } + + #[test] + fn make_generator_unknown_returns_empty() { + let genfn = make_generator("unknown"); + assert_eq!(genfn(),""); + } +} diff --git a/src/workflows/primitives/vault_auth.rs b/src/workflows/primitives/vault_auth.rs new file mode 100644 index 00000000..ce16dea6 --- /dev/null +++ b/src/workflows/primitives/vault_auth.rs @@ -0,0 +1,189 @@ +//! Vault auth primitives — atomic steps for OpenBao auth configuration. +//! +//! Each step creates its own port-forward to OpenBao, performs one operation, and drops it. +//! Reads `ob_pod`, `root_token`, `skip_seed` from workflow data. 
+ +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::openbao::BaoClient; +use crate::output::ok; +use crate::secrets; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +fn should_skip(data: &serde_json::Value) -> bool { + data.get("skip_seed").and_then(|v| v.as_bool()).unwrap_or(false) +} + +fn get_str(data: &serde_json::Value, key: &str) -> Option { + data.get(key).and_then(|v| v.as_str()).map(|s| s.to_string()) +} + +async fn connect_bao(data: &serde_json::Value) -> Result<(BaoClient, secrets::PortForwardGuard), wfe_core::WfeError> { + let ob_pod = get_str(data, "ob_pod") + .ok_or_else(|| step_err("vault auth: missing ob_pod"))?; + let root_token = get_str(data, "root_token") + .ok_or_else(|| step_err("vault auth: missing root_token"))?; + let pf = secrets::port_forward("data", &ob_pod, 8200).await + .map_err(|e| step_err(e.to_string()))?; + let bao = BaoClient::with_token( + &format!("http://127.0.0.1:{}", pf.local_port), + &root_token, + ); + Ok((bao, pf)) +} + +// ── EnableVaultAuth ───────────────────────────────────────────────────────── + +/// Enable an auth method at a mount path. 
+/// +/// **step_config:** `{"mount": "kubernetes", "type": "kubernetes"}` +#[derive(Default)] +pub struct EnableVaultAuth; + +#[async_trait::async_trait] +impl StepBody for EnableVaultAuth { + async fn run(&mut self, ctx: &StepExecutionContext<'_>) -> wfe_core::Result { + let data = &ctx.workflow.data; + if should_skip(data) { return Ok(ExecutionResult::next()); } + if get_str(data, "ob_pod").is_none() || get_str(data, "root_token").is_none() { + return Ok(ExecutionResult::next()); + } + + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("EnableVaultAuth: missing step_config"))?; + let mount = config.get("mount").and_then(|v| v.as_str()) + .ok_or_else(|| step_err("EnableVaultAuth: missing mount"))?; + let auth_type = config.get("type").and_then(|v| v.as_str()).unwrap_or(mount); + + let (bao, _pf) = connect_bao(data).await?; + let _ = bao.auth_enable(mount, auth_type).await; + ok(&format!("Vault auth enabled: {mount}")); + Ok(ExecutionResult::next()) + } +} + +// ── WriteVaultAuthConfig ──────────────────────────────────────────────────── + +/// Write auth method configuration. 
+/// +/// **step_config:** `{"mount": "kubernetes", "config": {"kubernetes_host": "..."}}` +#[derive(Default)] +pub struct WriteVaultAuthConfig; + +#[async_trait::async_trait] +impl StepBody for WriteVaultAuthConfig { + async fn run(&mut self, ctx: &StepExecutionContext<'_>) -> wfe_core::Result { + let data = &ctx.workflow.data; + if should_skip(data) { return Ok(ExecutionResult::next()); } + if get_str(data, "ob_pod").is_none() || get_str(data, "root_token").is_none() { + return Ok(ExecutionResult::next()); + } + + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("WriteVaultAuthConfig: missing step_config"))?; + let mount = config.get("mount").and_then(|v| v.as_str()) + .ok_or_else(|| step_err("WriteVaultAuthConfig: missing mount"))?; + let auth_config = config.get("config") + .ok_or_else(|| step_err("WriteVaultAuthConfig: missing config"))?; + + let (bao, _pf) = connect_bao(data).await?; + bao.write(&format!("auth/{mount}/config"), auth_config).await + .map_err(|e| step_err(format!("WriteVaultAuthConfig({mount}): {e}")))?; + ok(&format!("Vault auth config: {mount}")); + Ok(ExecutionResult::next()) + } +} + +// ── WriteVaultPolicy ──────────────────────────────────────────────────────── + +/// Write a Vault/OpenBao policy. +/// +/// **step_config:** `{"name": "vso-reader", "hcl": "path \"secret/data/*\" { ... 
}"}` +#[derive(Default)] +pub struct WriteVaultPolicy; + +#[async_trait::async_trait] +impl StepBody for WriteVaultPolicy { + async fn run(&mut self, ctx: &StepExecutionContext<'_>) -> wfe_core::Result { + let data = &ctx.workflow.data; + if should_skip(data) { return Ok(ExecutionResult::next()); } + if get_str(data, "ob_pod").is_none() || get_str(data, "root_token").is_none() { + return Ok(ExecutionResult::next()); + } + + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("WriteVaultPolicy: missing step_config"))?; + let name = config.get("name").and_then(|v| v.as_str()) + .ok_or_else(|| step_err("WriteVaultPolicy: missing name"))?; + let hcl = config.get("hcl").and_then(|v| v.as_str()) + .ok_or_else(|| step_err("WriteVaultPolicy: missing hcl"))?; + + let (bao, _pf) = connect_bao(data).await?; + bao.write_policy(name, hcl).await + .map_err(|e| step_err(format!("WriteVaultPolicy({name}): {e}")))?; + ok(&format!("Vault policy: {name}")); + Ok(ExecutionResult::next()) + } +} + +// ── WriteVaultRole ────────────────────────────────────────────────────────── + +/// Write an auth method role. 
+/// +/// **step_config:** `{"mount": "kubernetes", "role": "vso", "config": {...}}` +#[derive(Default)] +pub struct WriteVaultRole; + +#[async_trait::async_trait] +impl StepBody for WriteVaultRole { + async fn run(&mut self, ctx: &StepExecutionContext<'_>) -> wfe_core::Result { + let data = &ctx.workflow.data; + if should_skip(data) { return Ok(ExecutionResult::next()); } + if get_str(data, "ob_pod").is_none() || get_str(data, "root_token").is_none() { + return Ok(ExecutionResult::next()); + } + + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("WriteVaultRole: missing step_config"))?; + let mount = config.get("mount").and_then(|v| v.as_str()) + .ok_or_else(|| step_err("WriteVaultRole: missing mount"))?; + let role = config.get("role").and_then(|v| v.as_str()) + .ok_or_else(|| step_err("WriteVaultRole: missing role"))?; + let role_config = config.get("config") + .ok_or_else(|| step_err("WriteVaultRole: missing config"))?; + + let (bao, _pf) = connect_bao(data).await?; + bao.write(&format!("auth/{mount}/role/{role}"), role_config).await + .map_err(|e| step_err(format!("WriteVaultRole({mount}/{role}): {e}")))?; + ok(&format!("Vault role: {mount}/{role}")); + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn all_vault_auth_steps_are_default() { + let _ = EnableVaultAuth::default(); + let _ = WriteVaultAuthConfig::default(); + let _ = WriteVaultPolicy::default(); + let _ = WriteVaultRole::default(); + } + + #[test] + fn should_skip_true() { + assert!(should_skip(&serde_json::json!({"skip_seed": true}))); + } + + #[test] + fn should_skip_false() { + assert!(!should_skip(&serde_json::json!({"skip_seed": false}))); + assert!(!should_skip(&serde_json::json!({}))); + } +} diff --git a/src/workflows/primitives/wait_for_rollout.rs b/src/workflows/primitives/wait_for_rollout.rs new file mode 100644 index 00000000..de0a7329 --- /dev/null +++ b/src/workflows/primitives/wait_for_rollout.rs @@ -0,0 +1,65 @@ +//! 
WaitForRollout — atomic step that waits for a single Deployment to be ready.
//!
//! Reads `step_config.namespace`, `step_config.deployment`, and optional `step_config.timeout_secs`.

use wfe_core::models::ExecutionResult;
use wfe_core::traits::{StepBody, StepExecutionContext};

use crate::output::step;

/// Wrap a message in the crate-wide step-execution error variant.
fn step_err(msg: impl Into<String>) -> wfe_core::WfeError {
    wfe_core::WfeError::StepExecution(msg.into())
}

/// Wait for a single Deployment rollout to complete.
///
/// **step_config:** `{"namespace": "ory", "deployment": "kratos", "timeout_secs": 120}`
///
/// `timeout_secs` defaults to 120 if omitted.
#[derive(Default)]
pub struct WaitForRollout;

#[async_trait::async_trait]
impl StepBody for WaitForRollout {
    async fn run(
        &mut self,
        ctx: &StepExecutionContext<'_>,
    ) -> wfe_core::Result<ExecutionResult> {
        // Both namespace and deployment are mandatory; fail loudly with the
        // step name so the workflow log pinpoints the misconfigured step.
        let config = ctx.step.step_config.as_ref()
            .ok_or_else(|| step_err("WaitForRollout: missing step_config"))?;
        let namespace = config.get("namespace")
            .and_then(|v| v.as_str())
            .ok_or_else(|| step_err("WaitForRollout: missing namespace in step_config"))?;
        let deployment = config.get("deployment")
            .and_then(|v| v.as_str())
            .ok_or_else(|| step_err("WaitForRollout: missing deployment in step_config"))?;
        let timeout_secs = config.get("timeout_secs")
            .and_then(|v| v.as_u64())
            .unwrap_or(120);

        step(&format!("Waiting for {namespace}/{deployment}..."));

        crate::cluster::wait_rollout(namespace, deployment, timeout_secs)
            .await
            .map_err(|e| step_err(format!("WaitForRollout({namespace}/{deployment}): {e}")))?;

        Ok(ExecutionResult::next())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn wait_for_rollout_is_default() {
        let _ = WaitForRollout::default();
    }

    #[test]
    fn error_includes_context() {
        let err = step_err(format!("WaitForRollout({}/{}): timed out", "ory", "kratos"));
        let msg = err.to_string();
        assert!(msg.contains("ory/kratos"), "error should include ns/deploy: {msg}");
    }
}
diff --git
a/src/workflows/primitives/write_kv_path.rs b/src/workflows/primitives/write_kv_path.rs new file mode 100644 index 00000000..fd334b07 --- /dev/null +++ b/src/workflows/primitives/write_kv_path.rs @@ -0,0 +1,87 @@ +//! WriteKVPath — atomic step that writes a single KV path to OpenBao. +//! +//! Only writes if the service was marked dirty by SeedKVPath. + +use std::collections::HashMap; + +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::openbao::BaoClient; +use crate::output::ok; +use crate::secrets; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +/// Write a single KV path to OpenBao if it was marked dirty. +/// +/// **step_config:** `{"service": "hydra"}` +/// +/// Reads `dirty_{service}`, `kv_data_{service}`, `ob_pod`, `root_token`, +/// `skip_seed` from workflow data. +#[derive(Default)] +pub struct WriteKVPath; + +#[async_trait::async_trait] +impl StepBody for WriteKVPath { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if data.get("skip_seed").and_then(|v| v.as_bool()).unwrap_or(false) { + return Ok(ExecutionResult::next()); + } + + let config = ctx.step.step_config.as_ref() + .ok_or_else(|| step_err("WriteKVPath: missing step_config"))?; + let service = config.get("service").and_then(|v| v.as_str()) + .ok_or_else(|| step_err("WriteKVPath: missing service"))?; + + let dirty_key = format!("dirty_{service}"); + let is_dirty = data.get(&dirty_key).and_then(|v| v.as_bool()).unwrap_or(false); + if !is_dirty { + return Ok(ExecutionResult::next()); + } + + let ob_pod = match data.get("ob_pod").and_then(|v| v.as_str()) { + Some(p) => p, + None => return Ok(ExecutionResult::next()), + }; + let root_token = match data.get("root_token").and_then(|v| v.as_str()) { + Some(t) => t, + None => return Ok(ExecutionResult::next()), + }; + + let kv_key = 
format!("kv_data_{service}"); + let kv_json = data.get(&kv_key).and_then(|v| v.as_str()).unwrap_or("{}"); + let path_data: HashMap = serde_json::from_str(kv_json) + .map_err(|e| step_err(format!("WriteKVPath({service}): bad kv_data: {e}")))?; + + let pf = secrets::port_forward("data", ob_pod, 8200).await + .map_err(|e| step_err(e.to_string()))?; + let bao = BaoClient::with_token( + &format!("http://127.0.0.1:{}", pf.local_port), + root_token, + ); + + bao.kv_patch("secret", service, &path_data).await + .map_err(|e| step_err(format!("WriteKVPath({service}): {e}")))?; + + ok(&format!("KV write: {service}")); + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn write_kv_path_is_default() { + let _ = WriteKVPath::default(); + } +} diff --git a/src/workflows/seed/steps/k8s_secrets.rs b/src/workflows/seed/steps/k8s_secrets.rs new file mode 100644 index 00000000..d0ce6aae --- /dev/null +++ b/src/workflows/seed/steps/k8s_secrets.rs @@ -0,0 +1,141 @@ +//! K8s secret mirroring steps: sync Gitea admin password. +//! +//! CreateK8sSecrets is now handled by the CreateK8sSecret primitive. 
+ +use std::collections::HashMap; + +use k8s_openapi::api::core::v1::Pod; +use kube::api::{Api, ListParams}; +use wfe_core::models::ExecutionResult; +use wfe_core::traits::{StepBody, StepExecutionContext}; + +use crate::kube as k; +use crate::output::{ok, warn}; +use crate::secrets::GITEA_ADMIN_USER; + +fn step_err(msg: impl Into) -> wfe_core::WfeError { + wfe_core::WfeError::StepExecution(msg.into()) +} + +fn json_bool(data: &serde_json::Value, key: &str) -> bool { + data.get(key).and_then(|v| v.as_bool()).unwrap_or(false) +} + +// -- Pure helpers (used by CreateK8sSecret primitive) ------------------------- + +fn get_cred(creds: &HashMap, key: &str) -> String { + creds.get(key).cloned().unwrap_or_default() +} + +pub(crate) fn build_s3_json(access_key: &str, secret_key: &str) -> String { + serde_json::json!({ + "identities": [{ + "name": "seaweed", + "credentials": [{"accessKey": access_key, "secretKey": secret_key}], + "actions": ["Admin", "Read", "Write", "List", "Tagging"] + }] + }).to_string() +} + +// -- SyncGiteaAdminPassword ------------------------------------------------- + +/// Sync gitea admin password to Gitea's own DB. 
+/// +/// Reads: `skip_seed`, `creds` +#[derive(Default)] +pub struct SyncGiteaAdminPassword; + +#[async_trait::async_trait] +impl StepBody for SyncGiteaAdminPassword { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> wfe_core::Result { + let data = &ctx.workflow.data; + + if json_bool(data, "skip_seed") { + return Ok(ExecutionResult::next()); + } + + let creds: HashMap = data.get("creds") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + .unwrap_or_default(); + + let gitea_admin_pass = get_cred(&creds, "gitea-admin-password"); + if gitea_admin_pass.is_empty() { + return Ok(ExecutionResult::next()); + } + + let client = k::get_client().await.map_err(|e| step_err(e.to_string()))?; + let gitea_pods: Api = Api::namespaced(client.clone(), "devtools"); + let lp = ListParams::default().labels("app.kubernetes.io/name=gitea"); + if let Ok(pod_list) = gitea_pods.list(&lp).await { + if let Some(gitea_pod) = pod_list.items.first().and_then(|p| p.metadata.name.as_deref()) { + match k::kube_exec( + "devtools", gitea_pod, + &[ + "gitea", "admin", "user", "change-password", + "--username", GITEA_ADMIN_USER, + "--password", &gitea_admin_pass, + "--must-change-password=false", + ], + Some("gitea"), + ).await { + Ok((0, _)) => ok("Gitea admin password synced to Gitea DB."), + Ok((_, stderr)) => warn(&format!("Could not sync Gitea admin password: {stderr}")), + Err(e) => warn(&format!("Could not sync Gitea admin password: {e}")), + } + } else { + warn("Gitea pod not found -- admin password NOT synced."); + } + } + + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use wfe::run_workflow_sync; + use wfe_core::builder::WorkflowBuilder; + use wfe_core::models::WorkflowStatus; + + async fn run_step( + data: serde_json::Value, + ) -> wfe_core::models::WorkflowInstance { + let host = crate::workflows::host::create_test_host().await.unwrap(); + host.register_step::().await; + let def = 
WorkflowBuilder::::new() + .start_with::() + .name("test-step") + .end_workflow() + .build("test-wf", 1); + host.register_workflow_definition(def).await; + let instance = run_workflow_sync(&host, "test-wf", 1, data, Duration::from_secs(5)) + .await + .unwrap(); + host.stop().await; + instance + } + + #[tokio::test] + async fn test_sync_gitea_admin_password_skip_seed() { + let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn test_sync_gitea_admin_password_empty() { + let instance = run_step::(serde_json::json!({ "skip_seed": false })).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + } + + #[test] + fn test_build_s3_json_valid() { + let json = build_s3_json("AK", "SK"); + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(v["identities"][0]["credentials"][0]["accessKey"], "AK"); + } +} diff --git a/src/workflows/seed/steps/kv_seeding.rs b/src/workflows/seed/steps/kv_seeding.rs index 4fbfd87b..5db5d923 100644 --- a/src/workflows/seed/steps/kv_seeding.rs +++ b/src/workflows/seed/steps/kv_seeding.rs @@ -1,556 +1,5 @@ -//! KV secret seeding steps: generate/read all credentials, write dirty paths, -//! configure Kubernetes auth for VSO. +//! KV seeding steps — now handled by primitives (SeedKVPath, WriteKVPath, +//! CollectCredentials, EnableVaultAuth, WriteVaultAuthConfig, WriteVaultPolicy, +//! WriteVaultRole). //! -//! Data-struct-agnostic — reads JSON fields directly for cross-workflow reuse. 
- -use std::collections::{HashMap, HashSet}; - -use wfe_core::models::ExecutionResult; -use wfe_core::traits::{StepBody, StepExecutionContext}; - -use crate::openbao::BaoClient; -use crate::output::{ok, warn}; -use crate::secrets::{ - self, gen_dkim_key_pair, gen_fernet_key, rand_token, rand_token_n, scw_config, - GITEA_ADMIN_USER, SMTP_URI, -}; - -fn step_err(msg: impl Into) -> wfe_core::WfeError { - wfe_core::WfeError::StepExecution(msg.into()) -} - -fn json_bool(data: &serde_json::Value, key: &str) -> bool { - data.get(key).and_then(|v| v.as_bool()).unwrap_or(false) -} - -fn json_str(data: &serde_json::Value, key: &str) -> Option { - data.get(key).and_then(|v| v.as_str()).map(|s| s.to_string()) -} - -// ── SeedAllKVPaths ────────────────────────────────────────────────────────── - -/// Single step that runs the get_or_create loop for all 19 services. -/// Sets `creds` and `dirty_paths` in data. -/// -/// Reads: `skip_seed`, `ob_pod`, `root_token` -/// Writes: `creds`, `dirty_paths` -#[derive(Default)] -pub struct SeedAllKVPaths; - -#[async_trait::async_trait] -impl StepBody for SeedAllKVPaths { - async fn run( - &mut self, - ctx: &StepExecutionContext<'_>, - ) -> wfe_core::Result { - let data = &ctx.workflow.data; - - if json_bool(data, "skip_seed") { - return Ok(ExecutionResult::next()); - } - - let ob_pod = match json_str(data, "ob_pod") { - Some(p) => p, - None => { - warn("No ob_pod set -- skipping KV seeding."); - return Ok(ExecutionResult::next()); - } - }; - let root_token = match json_str(data, "root_token") { - Some(t) => t, - None => { - warn("No root_token set -- skipping KV seeding."); - return Ok(ExecutionResult::next()); - } - }; - - let pf = secrets::port_forward("data", &ob_pod, 8200).await - .map_err(|e| step_err(e.to_string()))?; - let bao = BaoClient::with_token( - &format!("http://127.0.0.1:{}", pf.local_port), - &root_token, - ); - - let mut dirty_paths: HashSet = HashSet::new(); - - let hydra = secrets::get_or_create( - &bao, "hydra", - &[ 
- ("system-secret", &rand_token as &(dyn Fn() -> String + Send + Sync)), - ("cookie-secret", &rand_token), - ("pairwise-salt", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let smtp_uri_fn = || SMTP_URI.to_string(); - let kratos = secrets::get_or_create( - &bao, "kratos", - &[ - ("secrets-default", &rand_token as &(dyn Fn() -> String + Send + Sync)), - ("secrets-cookie", &rand_token), - ("smtp-connection-uri", &smtp_uri_fn), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let seaweedfs = secrets::get_or_create( - &bao, "seaweedfs", - &[ - ("access-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), - ("secret-key", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let gitea_admin_user_fn = || GITEA_ADMIN_USER.to_string(); - let gitea = secrets::get_or_create( - &bao, "gitea", - &[ - ("admin-username", &gitea_admin_user_fn as &(dyn Fn() -> String + Send + Sync)), - ("admin-password", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let hive_local_fn = || "hive-local".to_string(); - let hive = secrets::get_or_create( - &bao, "hive", - &[ - ("oidc-client-id", &hive_local_fn as &(dyn Fn() -> String + Send + Sync)), - ("oidc-client-secret", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let devkey_fn = || "devkey".to_string(); - let livekit = secrets::get_or_create( - &bao, "livekit", - &[ - ("api-key", &devkey_fn as &(dyn Fn() -> String + Send + Sync)), - ("api-secret", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let people = secrets::get_or_create( - &bao, "people", - &[("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let login_ui = secrets::get_or_create( - &bao, "login-ui", - &[ - ("cookie-secret", &rand_token 
as &(dyn Fn() -> String + Send + Sync)), - ("csrf-cookie-secret", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let sw_access = seaweedfs.get("access-key").cloned().unwrap_or_default(); - let sw_secret = seaweedfs.get("secret-key").cloned().unwrap_or_default(); - let empty_fn = || String::new(); - let sw_access_fn = { let v = sw_access.clone(); move || v.clone() }; - let sw_secret_fn = { let v = sw_secret.clone(); move || v.clone() }; - - let kratos_admin = secrets::get_or_create( - &bao, "kratos-admin", - &[ - ("cookie-secret", &rand_token as &(dyn Fn() -> String + Send + Sync)), - ("csrf-cookie-secret", &rand_token), - ("admin-identity-ids", &empty_fn), - ("s3-access-key", &sw_access_fn), - ("s3-secret-key", &sw_secret_fn), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let docs = secrets::get_or_create( - &bao, "docs", - &[ - ("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), - ("collaboration-secret", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let meet = secrets::get_or_create( - &bao, "meet", - &[ - ("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), - ("application-jwt-secret-key", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let drive = secrets::get_or_create( - &bao, "drive", - &[("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let projects = secrets::get_or_create( - &bao, "projects", - &[("secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync))], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let cal_django_fn = || rand_token_n(50); - let calendars = secrets::get_or_create( - &bao, "calendars", - &[ - ("django-secret-key", &cal_django_fn as &(dyn Fn() -> String + Send + Sync)), - ("salt-key", &rand_token), 
- ("caldav-inbound-api-key", &rand_token), - ("caldav-outbound-api-key", &rand_token), - ("caldav-internal-api-key", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - // DKIM key pair - let existing_messages = bao.kv_get("secret", "messages").await - .map_err(|e| step_err(e.to_string()))? - .unwrap_or_default(); - let (dkim_private, dkim_public) = if existing_messages - .get("dkim-private-key").filter(|v| !v.is_empty()).is_some() - { - ( - existing_messages.get("dkim-private-key").cloned().unwrap_or_default(), - existing_messages.get("dkim-public-key").cloned().unwrap_or_default(), - ) - } else { - gen_dkim_key_pair() - }; - - let dkim_priv_fn = { let v = dkim_private.clone(); move || v.clone() }; - let dkim_pub_fn = { let v = dkim_public.clone(); move || v.clone() }; - let socks_proxy_fn = || format!("sunbeam:{}", rand_token()); - let sunbeam_fn = || "sunbeam".to_string(); - - let messages = secrets::get_or_create( - &bao, "messages", - &[ - ("django-secret-key", &rand_token as &(dyn Fn() -> String + Send + Sync)), - ("salt-key", &rand_token), - ("mda-api-secret", &rand_token), - ("oidc-refresh-token-key", &gen_fernet_key as &(dyn Fn() -> String + Send + Sync)), - ("dkim-private-key", &dkim_priv_fn), - ("dkim-public-key", &dkim_pub_fn), - ("rspamd-password", &rand_token), - ("socks-proxy-users", &socks_proxy_fn), - ("mta-out-smtp-username", &sunbeam_fn), - ("mta-out-smtp-password", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let admin_fn = || "admin".to_string(); - let collabora = secrets::get_or_create( - &bao, "collabora", - &[ - ("username", &admin_fn as &(dyn Fn() -> String + Send + Sync)), - ("password", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let tuwunel = secrets::get_or_create( - &bao, "tuwunel", - &[ - ("oidc-client-id", &empty_fn as &(dyn Fn() -> String + Send + Sync)), - ("oidc-client-secret", &empty_fn), - 
("turn-secret", &empty_fn), - ("registration-token", &rand_token), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let grafana = secrets::get_or_create( - &bao, "grafana", - &[("admin-password", &rand_token as &(dyn Fn() -> String + Send + Sync))], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - let scw_access_fn = || scw_config("access-key"); - let scw_secret_fn = || scw_config("secret-key"); - let scaleway_s3 = secrets::get_or_create( - &bao, "scaleway-s3", - &[ - ("access-key-id", &scw_access_fn as &(dyn Fn() -> String + Send + Sync)), - ("secret-access-key", &scw_secret_fn), - ], - &mut dirty_paths, - ).await.map_err(|e| step_err(e.to_string()))?; - - // Build credentials map - let mut creds = HashMap::new(); - let field_map: &[(&str, &str, &HashMap)] = &[ - ("hydra-system-secret", "system-secret", &hydra), - ("hydra-cookie-secret", "cookie-secret", &hydra), - ("hydra-pairwise-salt", "pairwise-salt", &hydra), - ("kratos-secrets-default", "secrets-default", &kratos), - ("kratos-secrets-cookie", "secrets-cookie", &kratos), - ("s3-access-key", "access-key", &seaweedfs), - ("s3-secret-key", "secret-key", &seaweedfs), - ("gitea-admin-password", "admin-password", &gitea), - ("hive-oidc-client-id", "oidc-client-id", &hive), - ("hive-oidc-client-secret", "oidc-client-secret", &hive), - ("people-django-secret", "django-secret-key", &people), - ("livekit-api-key", "api-key", &livekit), - ("livekit-api-secret", "api-secret", &livekit), - ("kratos-admin-cookie-secret", "cookie-secret", &kratos_admin), - ("messages-dkim-public-key", "dkim-public-key", &messages), - ]; - - for (cred_key, field_key, source) in field_map { - creds.insert(cred_key.to_string(), source.get(*field_key).cloned().unwrap_or_default()); - } - - // Store per-path data for WriteDirtyKVPaths - let all_paths: &[(&str, &HashMap)] = &[ - ("hydra", &hydra), ("kratos", &kratos), ("seaweedfs", &seaweedfs), - ("gitea", &gitea), ("hive", &hive), ("livekit", 
&livekit), - ("people", &people), ("login-ui", &login_ui), ("kratos-admin", &kratos_admin), - ("docs", &docs), ("meet", &meet), ("drive", &drive), - ("projects", &projects), ("calendars", &calendars), ("messages", &messages), - ("collabora", &collabora), ("tuwunel", &tuwunel), ("grafana", &grafana), - ("scaleway-s3", &scaleway_s3), - ]; - - for (path, data) in all_paths { - let json = serde_json::to_string(data).map_err(|e| step_err(e.to_string()))?; - creds.insert(format!("kv_data/{path}"), json); - } - - let dirty_vec: Vec = dirty_paths.into_iter().collect(); - - let mut result = ExecutionResult::next(); - result.output_data = Some(serde_json::json!({ - "creds": creds, - "dirty_paths": dirty_vec, - })); - Ok(result) - } -} - -// ── WriteDirtyKVPaths ─────────────────────────────────────────────────────── - -/// Write all modified KV paths to OpenBao. -/// -/// Reads: `skip_seed`, `ob_pod`, `root_token`, `dirty_paths`, `creds` -#[derive(Default)] -pub struct WriteDirtyKVPaths; - -#[async_trait::async_trait] -impl StepBody for WriteDirtyKVPaths { - async fn run( - &mut self, - ctx: &StepExecutionContext<'_>, - ) -> wfe_core::Result { - let data = &ctx.workflow.data; - - if json_bool(data, "skip_seed") { - return Ok(ExecutionResult::next()); - } - - let ob_pod = match json_str(data, "ob_pod") { - Some(p) => p, - None => return Ok(ExecutionResult::next()), - }; - let root_token = match json_str(data, "root_token") { - Some(t) => t, - None => return Ok(ExecutionResult::next()), - }; - - let dirty_paths: Vec = data.get("dirty_paths") - .and_then(|v| serde_json::from_value(v.clone()).ok()) - .unwrap_or_default(); - - if dirty_paths.is_empty() { - ok("All OpenBao KV secrets already present -- skipping writes."); - return Ok(ExecutionResult::next()); - } - - let mut sorted_paths = dirty_paths.clone(); - sorted_paths.sort(); - ok(&format!("Writing new secrets to OpenBao KV ({})...", sorted_paths.join(", "))); - - let pf = secrets::port_forward("data", &ob_pod, 8200).await 
- .map_err(|e| step_err(e.to_string()))?; - let bao = BaoClient::with_token( - &format!("http://127.0.0.1:{}", pf.local_port), - &root_token, - ); - - let creds: HashMap = data.get("creds") - .and_then(|v| serde_json::from_value(v.clone()).ok()) - .unwrap_or_default(); - - let dirty_set: HashSet<&str> = dirty_paths.iter().map(|s| s.as_str()).collect(); - - let kv_paths = [ - "hydra", "kratos", "seaweedfs", "gitea", "hive", "livekit", - "people", "login-ui", "kratos-admin", "docs", "meet", "drive", - "projects", "calendars", "messages", "collabora", "tuwunel", - "grafana", "scaleway-s3", "penpot", - ]; - - for path in kv_paths { - if dirty_set.contains(path) { - let json_key = format!("kv_data/{path}"); - if let Some(json_str) = creds.get(&json_key) { - let path_data: HashMap = serde_json::from_str(json_str) - .map_err(|e| step_err(e.to_string()))?; - bao.kv_patch("secret", path, &path_data).await - .map_err(|e| step_err(e.to_string()))?; - } - } - } - - Ok(ExecutionResult::next()) - } -} - -// ── ConfigureKubernetesAuth ───────────────────────────────────────────────── - -/// Enable Kubernetes auth, configure it, write VSO policy and role. 
-/// -/// Reads: `skip_seed`, `ob_pod`, `root_token` -#[derive(Default)] -pub struct ConfigureKubernetesAuth; - -#[async_trait::async_trait] -impl StepBody for ConfigureKubernetesAuth { - async fn run( - &mut self, - ctx: &StepExecutionContext<'_>, - ) -> wfe_core::Result { - let data = &ctx.workflow.data; - - if json_bool(data, "skip_seed") { - return Ok(ExecutionResult::next()); - } - - let ob_pod = match json_str(data, "ob_pod") { - Some(p) => p, - None => return Ok(ExecutionResult::next()), - }; - let root_token = match json_str(data, "root_token") { - Some(t) => t, - None => return Ok(ExecutionResult::next()), - }; - - ok("Configuring Kubernetes auth for VSO..."); - - let pf = secrets::port_forward("data", &ob_pod, 8200).await - .map_err(|e| step_err(e.to_string()))?; - let bao = BaoClient::with_token( - &format!("http://127.0.0.1:{}", pf.local_port), - &root_token, - ); - - let _ = bao.auth_enable("kubernetes", "kubernetes").await; - - bao.write( - "auth/kubernetes/config", - &serde_json::json!({ - "kubernetes_host": "https://kubernetes.default.svc.cluster.local" - }), - ).await.map_err(|e| step_err(e.to_string()))?; - - let policy_hcl = concat!( - "path \"secret/data/*\" { capabilities = [\"read\"] }\n", - "path \"secret/metadata/*\" { capabilities = [\"read\", \"list\"] }\n", - "path \"database/static-creds/*\" { capabilities = [\"read\"] }\n", - ); - bao.write_policy("vso-reader", policy_hcl).await - .map_err(|e| step_err(e.to_string()))?; - - bao.write( - "auth/kubernetes/role/vso", - &serde_json::json!({ - "bound_service_account_names": "default", - "bound_service_account_namespaces": "ory,devtools,storage,lasuite,matrix,media,data,monitoring,cert-manager", - "policies": "vso-reader", - "ttl": "1h" - }), - ).await.map_err(|e| step_err(e.to_string()))?; - - Ok(ExecutionResult::next()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::Duration; - use wfe::run_workflow_sync; - use wfe_core::builder::WorkflowBuilder; - use 
wfe_core::models::WorkflowStatus; - - async fn run_step( - data: serde_json::Value, - ) -> wfe_core::models::WorkflowInstance { - let host = crate::workflows::host::create_test_host().await.unwrap(); - host.register_step::().await; - let def = WorkflowBuilder::::new() - .start_with::() - .name("test-step") - .end_workflow() - .build("test-wf", 1); - host.register_workflow_definition(def).await; - let instance = run_workflow_sync(&host, "test-wf", 1, data, Duration::from_secs(5)) - .await - .unwrap(); - host.stop().await; - instance - } - - #[tokio::test] - async fn test_seed_all_kv_paths_skip_seed() { - let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; - assert_eq!(instance.status, WorkflowStatus::Complete); - } - - #[tokio::test] - async fn test_seed_all_kv_paths_no_ob_pod() { - let instance = run_step::(serde_json::json!({ "skip_seed": false })).await; - assert_eq!(instance.status, WorkflowStatus::Complete); - } - - #[tokio::test] - async fn test_seed_all_kv_paths_no_root_token() { - let instance = run_step::( - serde_json::json!({ "skip_seed": false, "ob_pod": "openbao-0" }) - ).await; - assert_eq!(instance.status, WorkflowStatus::Complete); - } - - #[tokio::test] - async fn test_write_dirty_kv_paths_skip_seed() { - let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; - assert_eq!(instance.status, WorkflowStatus::Complete); - } - - #[tokio::test] - async fn test_write_dirty_kv_paths_empty_dirty_paths() { - let instance = run_step::(serde_json::json!({ - "skip_seed": false, "ob_pod": "openbao-0", "root_token": "hvs.test", "dirty_paths": [], - })).await; - assert_eq!(instance.status, WorkflowStatus::Complete); - } - - #[tokio::test] - async fn test_configure_kubernetes_auth_skip_seed() { - let instance = run_step::(serde_json::json!({ "skip_seed": true })).await; - assert_eq!(instance.status, WorkflowStatus::Complete); - } - - #[tokio::test] - async fn test_configure_kubernetes_auth_no_ob_pod() { - let instance = 
run_step::(serde_json::json!({ "skip_seed": false })).await; - assert_eq!(instance.status, WorkflowStatus::Complete); - } -} +//! This module is kept for backward compatibility but contains no step structs. diff --git a/src/workflows/seed/steps/postgres.rs b/src/workflows/seed/steps/postgres.rs index aab54fc2..d080d1a7 100644 --- a/src/workflows/seed/steps/postgres.rs +++ b/src/workflows/seed/steps/postgres.rs @@ -13,7 +13,7 @@ use wfe_core::traits::{StepBody, StepExecutionContext}; use crate::kube as k; use crate::openbao::BaoClient; use crate::output::{ok, warn}; -use crate::secrets::{self, PG_USERS}; +use crate::secrets; fn step_err(msg: impl Into) -> wfe_core::WfeError { wfe_core::WfeError::StepExecution(msg.into()) @@ -38,6 +38,7 @@ pub(crate) fn pg_db_map() -> HashMap<&'static str, &'static str> { ("conversations", "conversations_db"), ("people", "people_db"), ("find", "find_db"), ("calendars", "calendars_db"), ("projects", "projects_db"), ("penpot", "penpot_db"), + ("stalwart", "stalwart_db"), ].into_iter().collect() } @@ -120,48 +121,6 @@ impl StepBody for WaitForPostgres { } } -// ── EnsurePGRolesAndDatabases ─────────────────────────────────────────────── - -/// Create all 13 users and databases. 
-///
-/// Reads: `skip_seed`, `pg_pod`
-#[derive(Default)]
-pub struct EnsurePGRolesAndDatabases;
-
-#[async_trait::async_trait]
-impl StepBody for EnsurePGRolesAndDatabases {
-    async fn run(
-        &mut self,
-        ctx: &StepExecutionContext<'_>,
-    ) -> wfe_core::Result<ExecutionResult> {
-        let data = &ctx.workflow.data;
-
-        if json_bool(data, "skip_seed") {
-            return Ok(ExecutionResult::next());
-        }
-
-        let pg_pod = match json_str(data, "pg_pod") {
-            Some(p) if !p.is_empty() => p,
-            _ => return Ok(ExecutionResult::next()),
-        };
-
-        ok("Ensuring postgres roles and databases exist...");
-
-        let db_map = pg_db_map();
-
-        for user in PG_USERS {
-            let sql = ensure_user_sql(user);
-            let _ = k::kube_exec("data", &pg_pod, &["psql", "-U", "postgres", "-c", &sql], Some("postgres")).await;
-
-            let db = db_map.get(user).copied().unwrap_or("unknown_db");
-            let sql = create_db_sql(db, user);
-            let _ = k::kube_exec("data", &pg_pod, &["psql", "-U", "postgres", "-c", &sql], Some("postgres")).await;
-        }
-
-        Ok(ExecutionResult::next())
-    }
-}
 
 // ── ConfigureDatabaseEngine ─────────────────────────────────────────────────
 
 /// Configure OpenBao database secrets engine.
@@ -244,18 +203,6 @@ mod tests {
         assert_eq!(instance.status, WorkflowStatus::Complete);
     }
 
-    #[tokio::test]
-    async fn test_ensure_pg_roles_skip_seed() {
-        let instance = run_step::<EnsurePGRolesAndDatabases>(serde_json::json!({ "skip_seed": true })).await;
-        assert_eq!(instance.status, WorkflowStatus::Complete);
-    }
-
-    #[tokio::test]
-    async fn test_ensure_pg_roles_no_pg_pod() {
-        let instance = run_step::<EnsurePGRolesAndDatabases>(serde_json::json!({ "skip_seed": false })).await;
-        assert_eq!(instance.status, WorkflowStatus::Complete);
-    }
-
     #[tokio::test]
     async fn test_configure_db_engine_skip_seed() {
         let instance = run_step::<ConfigureDatabaseEngine>(serde_json::json!({ "skip_seed": true })).await;
@@ -271,8 +218,8 @@ mod tests {
     #[test]
     fn test_pg_db_map_contains_all_users() {
         let map = pg_db_map();
-        assert_eq!(map.len(), 13);
-        for user in PG_USERS {
+        assert_eq!(map.len(), 15);
+        for user in crate::secrets::PG_USERS {
             assert!(map.contains_key(user), "pg_db_map missing key for: {user}");
         }
     }