From d62dc0f34963af463fb3182537e58bb533a802db Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Mon, 6 Apr 2026 16:41:55 +0100 Subject: [PATCH] feat(wfe-kubernetes): Job manifest builder and output parsing --- wfe-kubernetes/src/manifests.rs | 422 ++++++++++++++++++++++++++++++++ wfe-kubernetes/src/output.rs | 164 +++++++++++++ 2 files changed, 586 insertions(+) create mode 100644 wfe-kubernetes/src/manifests.rs create mode 100644 wfe-kubernetes/src/output.rs diff --git a/wfe-kubernetes/src/manifests.rs b/wfe-kubernetes/src/manifests.rs new file mode 100644 index 0000000..2166c0f --- /dev/null +++ b/wfe-kubernetes/src/manifests.rs @@ -0,0 +1,422 @@ +use std::collections::{BTreeMap, HashMap}; + +use k8s_openapi::api::batch::v1::{Job, JobSpec}; +use k8s_openapi::api::core::v1::{ + Container, EnvVar, LocalObjectReference, PodSpec, PodTemplateSpec, ResourceRequirements, +}; +use k8s_openapi::apimachinery::pkg::api::resource::Quantity; +use kube::api::ObjectMeta; + +use crate::config::{ClusterConfig, KubernetesStepConfig}; + +const LABEL_STEP_NAME: &str = "wfe.sunbeam.pt/step-name"; +const LABEL_MANAGED_BY: &str = "wfe.sunbeam.pt/managed-by"; + +/// Build a Kubernetes Job manifest from step configuration. +pub fn build_job( + config: &KubernetesStepConfig, + step_name: &str, + namespace: &str, + env_overrides: &HashMap, + cluster: &ClusterConfig, +) -> Job { + let job_name = sanitize_name(step_name); + + // Build container command. + let (command, args) = resolve_command(config); + + // Merge environment variables: overrides first, then config (config wins). + let env_vars = build_env_vars(env_overrides, &config.env); + + // Resource limits. + let resources = build_resources(&config.memory, &config.cpu); + + // Image pull policy. + let pull_policy = config + .pull_policy + .clone() + .unwrap_or_else(|| "IfNotPresent".to_string()); + + let mut labels = BTreeMap::new(); + labels.insert(LABEL_STEP_NAME.into(), step_name.to_string()); + labels.insert(LABEL_MANAGED_BY.into(), "wfe-kubernetes".into()); + + let container = Container { + name: "step".into(), + image: Some(config.image.clone()), + command, + args, + env: Some(env_vars), + working_dir: config.working_dir.clone(), + resources: Some(resources), + image_pull_policy: Some(pull_policy), + ..Default::default() + }; + + let image_pull_secrets = if cluster.image_pull_secrets.is_empty() { + None + } else { + Some( + cluster + .image_pull_secrets + .iter() + .map(|s| LocalObjectReference { + name: s.clone(), + }) + .collect(), + ) + }; + + let node_selector = if cluster.node_selector.is_empty() { + None + } else { + Some(cluster.node_selector.iter().map(|(k, v)| (k.clone(), v.clone())).collect::>()) + }; + + Job { + metadata: ObjectMeta { + name: Some(job_name), + namespace: Some(namespace.to_string()), + labels: Some(labels.clone()), + ..Default::default() + }, + spec: Some(JobSpec { + backoff_limit: Some(0), + ttl_seconds_after_finished: Some(300), + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(labels), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![container], + restart_policy: Some("Never".into()), + service_account_name: cluster.service_account.clone(), + image_pull_secrets, + node_selector, + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + } +} + +/// Resolve command and args from config. +fn resolve_command(config: &KubernetesStepConfig) -> (Option>, Option>) { + if let Some(ref cmd) = config.command { + (Some(cmd.clone()), None) + } else if let Some(ref run) = config.run { + ( + Some(vec!["/bin/sh".into(), "-c".into()]), + Some(vec![run.clone()]), + ) + } else { + (None, None) + } +} + +/// Build environment variables, merging overrides and config. +fn build_env_vars( + overrides: &HashMap, + config_env: &HashMap, +) -> Vec { + let mut merged: HashMap = overrides.clone(); + // Config env wins over overrides. + for (k, v) in config_env { + merged.insert(k.clone(), v.clone()); + } + let mut vars: Vec = merged + .into_iter() + .map(|(name, value)| EnvVar { + name, + value: Some(value), + ..Default::default() + }) + .collect(); + vars.sort_by(|a, b| a.name.cmp(&b.name)); + vars +} + +/// Build resource requirements from memory and CPU strings. +fn build_resources(memory: &Option, cpu: &Option) -> ResourceRequirements { + let mut limits = BTreeMap::new(); + let mut requests = BTreeMap::new(); + + if let Some(mem) = memory { + limits.insert("memory".into(), Quantity(mem.clone())); + requests.insert("memory".into(), Quantity(mem.clone())); + } + if let Some(cpu_val) = cpu { + limits.insert("cpu".into(), Quantity(cpu_val.clone())); + requests.insert("cpu".into(), Quantity(cpu_val.clone())); + } + + ResourceRequirements { + limits: if limits.is_empty() { + None + } else { + Some(limits) + }, + requests: if requests.is_empty() { + None + } else { + Some(requests) + }, + ..Default::default() + } +} + +/// Sanitize a step name for use as a K8s resource name. +/// Must be lowercase alphanumeric + hyphens, max 63 chars. +pub fn sanitize_name(name: &str) -> String { + let sanitized: String = name + .to_lowercase() + .chars() + .map(|c| if c.is_ascii_alphanumeric() || c == '-' { c } else { '-' }) + .take(63) + .collect(); + sanitized.trim_end_matches('-').to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + fn default_cluster() -> ClusterConfig { + ClusterConfig::default() + } + + #[test] + fn build_job_minimal() { + let config = KubernetesStepConfig { + image: "alpine:3.18".into(), + command: None, + run: None, + env: HashMap::new(), + working_dir: None, + memory: None, + cpu: None, + timeout_ms: None, + pull_policy: None, + namespace: None, + }; + let job = build_job(&config, "test-step", "wfe-abc", &HashMap::new(), &default_cluster()); + + assert_eq!(job.metadata.name, Some("test-step".into())); + assert_eq!(job.metadata.namespace, Some("wfe-abc".into())); + + let spec = job.spec.as_ref().unwrap(); + assert_eq!(spec.backoff_limit, Some(0)); + assert_eq!(spec.ttl_seconds_after_finished, Some(300)); + + let pod_spec = spec.template.spec.as_ref().unwrap(); + assert_eq!(pod_spec.restart_policy, Some("Never".into())); + assert_eq!(pod_spec.containers.len(), 1); + + let container = &pod_spec.containers[0]; + assert_eq!(container.image, Some("alpine:3.18".into())); + assert_eq!(container.image_pull_policy, Some("IfNotPresent".into())); + assert!(container.command.is_none()); + assert!(container.args.is_none()); + } + + #[test] + fn build_job_with_run_command() { + let config = KubernetesStepConfig { + image: "node:20".into(), + command: None, + run: Some("npm test".into()), + env: HashMap::new(), + working_dir: Some("/app".into()), + memory: None, + cpu: None, + timeout_ms: None, + pull_policy: None, + namespace: None, + }; + let job = build_job(&config, "test", "ns", &HashMap::new(), &default_cluster()); + let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; + + assert_eq!( + container.command, + Some(vec!["/bin/sh".into(), "-c".into()]) + ); + assert_eq!(container.args, Some(vec!["npm test".into()])); + assert_eq!(container.working_dir, Some("/app".into())); + } + + #[test] + fn build_job_with_explicit_command() { + let config = KubernetesStepConfig { + image: "gcc:latest".into(), + command: Some(vec!["make".into(), "build".into()]), + run: None, + env: HashMap::new(), + working_dir: None, + memory: None, + cpu: None, + timeout_ms: None, + pull_policy: None, + namespace: None, + }; + let job = build_job(&config, "build", "ns", &HashMap::new(), &default_cluster()); + let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; + + assert_eq!( + container.command, + Some(vec!["make".into(), "build".into()]) + ); + assert!(container.args.is_none()); + } + + #[test] + fn build_job_merges_env_vars() { + let config = KubernetesStepConfig { + image: "alpine".into(), + command: None, + run: None, + env: [("APP_ENV".into(), "production".into())].into(), + working_dir: None, + memory: None, + cpu: None, + timeout_ms: None, + pull_policy: None, + namespace: None, + }; + let overrides: HashMap = + [("WORKFLOW_ID".into(), "wf-123".into()), ("APP_ENV".into(), "staging".into())].into(); + + let job = build_job(&config, "step", "ns", &overrides, &default_cluster()); + let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; + let env = container.env.as_ref().unwrap(); + + // Config env wins over overrides. + let app_env = env.iter().find(|e| e.name == "APP_ENV").unwrap(); + assert_eq!(app_env.value, Some("production".into())); + + // Override still present. + let wf_id = env.iter().find(|e| e.name == "WORKFLOW_ID").unwrap(); + assert_eq!(wf_id.value, Some("wf-123".into())); + } + + #[test] + fn build_job_with_resources() { + let config = KubernetesStepConfig { + image: "alpine".into(), + command: None, + run: None, + env: HashMap::new(), + working_dir: None, + memory: Some("512Mi".into()), + cpu: Some("500m".into()), + timeout_ms: None, + pull_policy: None, + namespace: None, + }; + let job = build_job(&config, "step", "ns", &HashMap::new(), &default_cluster()); + let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; + let resources = container.resources.as_ref().unwrap(); + + let limits = resources.limits.as_ref().unwrap(); + assert_eq!(limits.get("memory"), Some(&Quantity("512Mi".into()))); + assert_eq!(limits.get("cpu"), Some(&Quantity("500m".into()))); + + let requests = resources.requests.as_ref().unwrap(); + assert_eq!(requests.get("memory"), Some(&Quantity("512Mi".into()))); + } + + #[test] + fn build_job_with_cluster_config() { + let cluster = ClusterConfig { + service_account: Some("wfe-runner".into()), + image_pull_secrets: vec!["ghcr-secret".into()], + node_selector: [("tier".into(), "compute".into())].into(), + ..Default::default() + }; + let config = KubernetesStepConfig { + image: "alpine".into(), + command: None, + run: None, + env: HashMap::new(), + working_dir: None, + memory: None, + cpu: None, + timeout_ms: None, + pull_policy: Some("Always".into()), + namespace: None, + }; + let job = build_job(&config, "step", "ns", &HashMap::new(), &cluster); + let pod_spec = job.spec.unwrap().template.spec.unwrap(); + + assert_eq!(pod_spec.service_account_name, Some("wfe-runner".into())); + assert_eq!(pod_spec.image_pull_secrets.unwrap().len(), 1); + assert_eq!( + pod_spec.node_selector.unwrap().get("tier"), + Some(&"compute".to_string()) + ); + + let container = &pod_spec.containers[0]; + assert_eq!(container.image_pull_policy, Some("Always".into())); + } + + #[test] + fn build_job_labels() { + let config = KubernetesStepConfig { + image: "alpine".into(), + command: None, + run: None, + env: HashMap::new(), + working_dir: None, + memory: None, + cpu: None, + timeout_ms: None, + pull_policy: None, + namespace: None, + }; + let job = build_job(&config, "my-step", "ns", &HashMap::new(), &default_cluster()); + let labels = job.metadata.labels.as_ref().unwrap(); + assert_eq!(labels.get(LABEL_STEP_NAME), Some(&"my-step".to_string())); + assert_eq!( + labels.get(LABEL_MANAGED_BY), + Some(&"wfe-kubernetes".to_string()) + ); + } + + #[test] + fn sanitize_name_basic() { + assert_eq!(sanitize_name("my-step"), "my-step"); + } + + #[test] + fn sanitize_name_special_chars() { + assert_eq!(sanitize_name("my_step.v1"), "my-step-v1"); + } + + #[test] + fn sanitize_name_uppercase() { + assert_eq!(sanitize_name("MyStep"), "mystep"); + } + + #[test] + fn sanitize_name_truncates() { + let long = "a".repeat(100); + assert!(sanitize_name(&long).len() <= 63); + } + + #[test] + fn sanitize_name_trims_trailing_hyphens() { + assert_eq!(sanitize_name("step---"), "step"); + } + + #[test] + fn env_vars_sorted_by_name() { + let overrides: HashMap = + [("ZZZ".into(), "1".into()), ("AAA".into(), "2".into())].into(); + let vars = build_env_vars(&overrides, &HashMap::new()); + assert_eq!(vars[0].name, "AAA"); + assert_eq!(vars[1].name, "ZZZ"); + } +} diff --git a/wfe-kubernetes/src/output.rs b/wfe-kubernetes/src/output.rs new file mode 100644 index 0000000..c9396fe --- /dev/null +++ b/wfe-kubernetes/src/output.rs @@ -0,0 +1,164 @@ +use std::collections::HashMap; + +/// Parse `##wfe[output key=value]` lines from stdout. +/// +/// This is the same output protocol used by wfe-containerd and wfe-yaml shell steps. +pub fn parse_outputs(stdout: &str) -> HashMap { + let mut outputs = HashMap::new(); + for line in stdout.lines() { + let trimmed = line.trim(); + if let Some(inner) = trimmed + .strip_prefix("##wfe[output ") + .and_then(|s| s.strip_suffix(']')) + { + if let Some(eq_pos) = inner.find('=') { + let key = inner[..eq_pos].trim().to_string(); + let value = inner[eq_pos + 1..].to_string(); + if !key.is_empty() { + outputs.insert(key, value); + } + } + } + } + outputs +} + +/// Build the output_data JSON object from step execution results. +/// +/// Includes parsed `##wfe[output]` values plus raw stdout/stderr/exit_code +/// prefixed with the step name. +pub fn build_output_data( + step_name: &str, + stdout: &str, + stderr: &str, + exit_code: i32, + parsed: &HashMap, +) -> serde_json::Value { + let mut map = serde_json::Map::new(); + + // Add parsed outputs. + for (key, value) in parsed { + // Try to parse as JSON value, fall back to string. + let json_val = serde_json::from_str(value) + .unwrap_or_else(|_| serde_json::Value::String(value.clone())); + map.insert(key.clone(), json_val); + } + + // Add raw stdout/stderr/exit_code with step name prefix. + map.insert( + format!("{step_name}.stdout"), + serde_json::Value::String(stdout.to_string()), + ); + map.insert( + format!("{step_name}.stderr"), + serde_json::Value::String(stderr.to_string()), + ); + map.insert( + format!("{step_name}.exit_code"), + serde_json::Value::Number(exit_code.into()), + ); + + serde_json::Value::Object(map) +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn parse_empty_stdout() { + assert!(parse_outputs("").is_empty()); + } + + #[test] + fn parse_no_output_markers() { + let stdout = "hello world\nsome other output\n"; + assert!(parse_outputs(stdout).is_empty()); + } + + #[test] + fn parse_single_output() { + let stdout = "building...\n##wfe[output version=1.2.3]\ndone\n"; + let outputs = parse_outputs(stdout); + assert_eq!(outputs.get("version"), Some(&"1.2.3".to_string())); + } + + #[test] + fn parse_multiple_outputs() { + let stdout = "##wfe[output digest=sha256:abc]\n##wfe[output tag=latest]\n"; + let outputs = parse_outputs(stdout); + assert_eq!(outputs.len(), 2); + assert_eq!(outputs.get("digest"), Some(&"sha256:abc".to_string())); + assert_eq!(outputs.get("tag"), Some(&"latest".to_string())); + } + + #[test] + fn parse_value_with_equals() { + let stdout = "##wfe[output url=https://example.com?a=1&b=2]\n"; + let outputs = parse_outputs(stdout); + assert_eq!( + outputs.get("url"), + Some(&"https://example.com?a=1&b=2".to_string()) + ); + } + + #[test] + fn parse_duplicate_key_last_wins() { + let stdout = "##wfe[output key=first]\n##wfe[output key=second]\n"; + let outputs = parse_outputs(stdout); + assert_eq!(outputs.get("key"), Some(&"second".to_string())); + } + + #[test] + fn parse_whitespace_in_key() { + let stdout = "##wfe[output key = value]\n"; + let outputs = parse_outputs(stdout); + assert_eq!(outputs.get("key"), Some(&" value".to_string())); + } + + #[test] + fn parse_empty_value() { + let stdout = "##wfe[output key=]\n"; + let outputs = parse_outputs(stdout); + assert_eq!(outputs.get("key"), Some(&"".to_string())); + } + + #[test] + fn parse_ignores_malformed() { + let stdout = "##wfe[output ]\n##wfe[output no_equals]\n##wfe[output =no_key]\n"; + let outputs = parse_outputs(stdout); + // "=no_key" has empty key, ignored. "no_equals" has no =, ignored. " " has no =, ignored. + assert!(outputs.is_empty()); + } + + #[test] + fn build_output_data_with_parsed() { + let parsed: HashMap = + [("version".into(), "1.0.0".into())].into_iter().collect(); + let data = build_output_data("test", "hello\n", "warn\n", 0, &parsed); + assert_eq!(data["version"], "1.0.0"); + assert_eq!(data["test.stdout"], "hello\n"); + assert_eq!(data["test.stderr"], "warn\n"); + assert_eq!(data["test.exit_code"], 0); + } + + #[test] + fn build_output_data_empty() { + let data = build_output_data("step", "", "", 0, &HashMap::new()); + assert_eq!(data["step.stdout"], ""); + assert_eq!(data["step.exit_code"], 0); + } + + #[test] + fn build_output_data_json_value() { + let parsed: HashMap = + [("count".into(), "42".into()), ("flag".into(), "true".into())] + .into_iter() + .collect(); + let data = build_output_data("s", "", "", 0, &parsed); + // Numbers and booleans should be parsed as JSON, not strings. + assert_eq!(data["count"], 42); + assert_eq!(data["flag"], true); + } +}