feat(wfe-kubernetes): Job manifest builder and output parsing

This commit is contained in:
2026-04-06 16:41:55 +01:00
parent 1574342e92
commit d62dc0f349
2 changed files with 586 additions and 0 deletions

View File

@@ -0,0 +1,422 @@
use std::collections::{BTreeMap, HashMap};
use k8s_openapi::api::batch::v1::{Job, JobSpec};
use k8s_openapi::api::core::v1::{
Container, EnvVar, LocalObjectReference, PodSpec, PodTemplateSpec, ResourceRequirements,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use kube::api::ObjectMeta;
use crate::config::{ClusterConfig, KubernetesStepConfig};
/// Label recording which workflow step a Job was built for.
const LABEL_STEP_NAME: &str = "wfe.sunbeam.pt/step-name";
/// Label marking a Job as created by this engine.
const LABEL_MANAGED_BY: &str = "wfe.sunbeam.pt/managed-by";
/// Build a Kubernetes Job manifest from step configuration.
pub fn build_job(
config: &KubernetesStepConfig,
step_name: &str,
namespace: &str,
env_overrides: &HashMap<String, String>,
cluster: &ClusterConfig,
) -> Job {
let job_name = sanitize_name(step_name);
// Build container command.
let (command, args) = resolve_command(config);
// Merge environment variables: overrides first, then config (config wins).
let env_vars = build_env_vars(env_overrides, &config.env);
// Resource limits.
let resources = build_resources(&config.memory, &config.cpu);
// Image pull policy.
let pull_policy = config
.pull_policy
.clone()
.unwrap_or_else(|| "IfNotPresent".to_string());
let mut labels = BTreeMap::new();
labels.insert(LABEL_STEP_NAME.into(), step_name.to_string());
labels.insert(LABEL_MANAGED_BY.into(), "wfe-kubernetes".into());
let container = Container {
name: "step".into(),
image: Some(config.image.clone()),
command,
args,
env: Some(env_vars),
working_dir: config.working_dir.clone(),
resources: Some(resources),
image_pull_policy: Some(pull_policy),
..Default::default()
};
let image_pull_secrets = if cluster.image_pull_secrets.is_empty() {
None
} else {
Some(
cluster
.image_pull_secrets
.iter()
.map(|s| LocalObjectReference {
name: s.clone(),
})
.collect(),
)
};
let node_selector = if cluster.node_selector.is_empty() {
None
} else {
Some(cluster.node_selector.iter().map(|(k, v)| (k.clone(), v.clone())).collect::<BTreeMap<_, _>>())
};
Job {
metadata: ObjectMeta {
name: Some(job_name),
namespace: Some(namespace.to_string()),
labels: Some(labels.clone()),
..Default::default()
},
spec: Some(JobSpec {
backoff_limit: Some(0),
ttl_seconds_after_finished: Some(300),
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(labels),
..Default::default()
}),
spec: Some(PodSpec {
containers: vec![container],
restart_policy: Some("Never".into()),
service_account_name: cluster.service_account.clone(),
image_pull_secrets,
node_selector,
..Default::default()
}),
},
..Default::default()
}),
..Default::default()
}
}
/// Resolve command and args from config.
fn resolve_command(config: &KubernetesStepConfig) -> (Option<Vec<String>>, Option<Vec<String>>) {
if let Some(ref cmd) = config.command {
(Some(cmd.clone()), None)
} else if let Some(ref run) = config.run {
(
Some(vec!["/bin/sh".into(), "-c".into()]),
Some(vec![run.clone()]),
)
} else {
(None, None)
}
}
/// Build environment variables, merging overrides and config.
fn build_env_vars(
overrides: &HashMap<String, String>,
config_env: &HashMap<String, String>,
) -> Vec<EnvVar> {
let mut merged: HashMap<String, String> = overrides.clone();
// Config env wins over overrides.
for (k, v) in config_env {
merged.insert(k.clone(), v.clone());
}
let mut vars: Vec<EnvVar> = merged
.into_iter()
.map(|(name, value)| EnvVar {
name,
value: Some(value),
..Default::default()
})
.collect();
vars.sort_by(|a, b| a.name.cmp(&b.name));
vars
}
/// Build resource requirements from memory and CPU strings.
///
/// Each value that is present is mirrored into both `limits` and
/// `requests`; when neither is set, both maps are `None` so the
/// manifest omits the resources section entirely.
fn build_resources(memory: &Option<String>, cpu: &Option<String>) -> ResourceRequirements {
    // limits and requests always carry identical contents, so build once.
    let mut quantities = BTreeMap::new();
    if let Some(mem) = memory {
        quantities.insert("memory".to_string(), Quantity(mem.clone()));
    }
    if let Some(cpu_val) = cpu {
        quantities.insert("cpu".to_string(), Quantity(cpu_val.clone()));
    }
    let map = if quantities.is_empty() {
        None
    } else {
        Some(quantities)
    };
    ResourceRequirements {
        limits: map.clone(),
        requests: map,
        ..Default::default()
    }
}
/// Sanitize a step name for use as a K8s resource name.
///
/// RFC 1123 names are lowercase alphanumeric + hyphens, at most 63 chars,
/// and must both START and end with an alphanumeric character. Invalid
/// characters are replaced with hyphens, the result is truncated to 63
/// chars, and hyphens are stripped from BOTH ends (trimming only trailing
/// hyphens would let e.g. "_step" become the invalid "-step").
///
/// Note: a name consisting entirely of invalid characters sanitizes to the
/// empty string; callers are expected to pass non-degenerate step names.
pub fn sanitize_name(name: &str) -> String {
    let sanitized: String = name
        .to_lowercase()
        .chars()
        // Replace anything outside [a-z0-9-] with a hyphen.
        .map(|c| if c.is_ascii_alphanumeric() || c == '-' { c } else { '-' })
        .take(63)
        .collect();
    sanitized.trim_matches('-').to_string()
}
#[cfg(test)]
mod tests {
    //! Unit tests for the Job manifest builder.
    use super::*;
    use pretty_assertions::assert_eq;

    /// Cluster config with all optional settings unset.
    fn default_cluster() -> ClusterConfig {
        ClusterConfig::default()
    }

    // A minimal config yields the fixed defaults: backoff 0, 300s TTL,
    // restart policy "Never", pull policy "IfNotPresent", no command/args.
    #[test]
    fn build_job_minimal() {
        let config = KubernetesStepConfig {
            image: "alpine:3.18".into(),
            command: None,
            run: None,
            env: HashMap::new(),
            working_dir: None,
            memory: None,
            cpu: None,
            timeout_ms: None,
            pull_policy: None,
            namespace: None,
        };
        let job = build_job(&config, "test-step", "wfe-abc", &HashMap::new(), &default_cluster());
        assert_eq!(job.metadata.name, Some("test-step".into()));
        assert_eq!(job.metadata.namespace, Some("wfe-abc".into()));
        let spec = job.spec.as_ref().unwrap();
        assert_eq!(spec.backoff_limit, Some(0));
        assert_eq!(spec.ttl_seconds_after_finished, Some(300));
        let pod_spec = spec.template.spec.as_ref().unwrap();
        assert_eq!(pod_spec.restart_policy, Some("Never".into()));
        assert_eq!(pod_spec.containers.len(), 1);
        let container = &pod_spec.containers[0];
        assert_eq!(container.image, Some("alpine:3.18".into()));
        assert_eq!(container.image_pull_policy, Some("IfNotPresent".into()));
        assert!(container.command.is_none());
        assert!(container.args.is_none());
    }

    // A `run` script is wrapped in `/bin/sh -c` with the script as the sole arg.
    #[test]
    fn build_job_with_run_command() {
        let config = KubernetesStepConfig {
            image: "node:20".into(),
            command: None,
            run: Some("npm test".into()),
            env: HashMap::new(),
            working_dir: Some("/app".into()),
            memory: None,
            cpu: None,
            timeout_ms: None,
            pull_policy: None,
            namespace: None,
        };
        let job = build_job(&config, "test", "ns", &HashMap::new(), &default_cluster());
        let container = &job.spec.unwrap().template.spec.unwrap().containers[0];
        assert_eq!(
            container.command,
            Some(vec!["/bin/sh".into(), "-c".into()])
        );
        assert_eq!(container.args, Some(vec!["npm test".into()]));
        assert_eq!(container.working_dir, Some("/app".into()));
    }

    // An explicit `command` is passed through verbatim, with no args.
    #[test]
    fn build_job_with_explicit_command() {
        let config = KubernetesStepConfig {
            image: "gcc:latest".into(),
            command: Some(vec!["make".into(), "build".into()]),
            run: None,
            env: HashMap::new(),
            working_dir: None,
            memory: None,
            cpu: None,
            timeout_ms: None,
            pull_policy: None,
            namespace: None,
        };
        let job = build_job(&config, "build", "ns", &HashMap::new(), &default_cluster());
        let container = &job.spec.unwrap().template.spec.unwrap().containers[0];
        assert_eq!(
            container.command,
            Some(vec!["make".into(), "build".into()])
        );
        assert!(container.args.is_none());
    }

    // On key collision, per-step config env wins over workflow overrides;
    // non-colliding overrides still come through.
    #[test]
    fn build_job_merges_env_vars() {
        let config = KubernetesStepConfig {
            image: "alpine".into(),
            command: None,
            run: None,
            env: [("APP_ENV".into(), "production".into())].into(),
            working_dir: None,
            memory: None,
            cpu: None,
            timeout_ms: None,
            pull_policy: None,
            namespace: None,
        };
        let overrides: HashMap<String, String> =
            [("WORKFLOW_ID".into(), "wf-123".into()), ("APP_ENV".into(), "staging".into())].into();
        let job = build_job(&config, "step", "ns", &overrides, &default_cluster());
        let container = &job.spec.unwrap().template.spec.unwrap().containers[0];
        let env = container.env.as_ref().unwrap();
        // Config env wins over overrides.
        let app_env = env.iter().find(|e| e.name == "APP_ENV").unwrap();
        assert_eq!(app_env.value, Some("production".into()));
        // Override still present.
        let wf_id = env.iter().find(|e| e.name == "WORKFLOW_ID").unwrap();
        assert_eq!(wf_id.value, Some("wf-123".into()));
    }

    // Memory/CPU strings land in both limits and requests.
    #[test]
    fn build_job_with_resources() {
        let config = KubernetesStepConfig {
            image: "alpine".into(),
            command: None,
            run: None,
            env: HashMap::new(),
            working_dir: None,
            memory: Some("512Mi".into()),
            cpu: Some("500m".into()),
            timeout_ms: None,
            pull_policy: None,
            namespace: None,
        };
        let job = build_job(&config, "step", "ns", &HashMap::new(), &default_cluster());
        let container = &job.spec.unwrap().template.spec.unwrap().containers[0];
        let resources = container.resources.as_ref().unwrap();
        let limits = resources.limits.as_ref().unwrap();
        assert_eq!(limits.get("memory"), Some(&Quantity("512Mi".into())));
        assert_eq!(limits.get("cpu"), Some(&Quantity("500m".into())));
        let requests = resources.requests.as_ref().unwrap();
        assert_eq!(requests.get("memory"), Some(&Quantity("512Mi".into())));
    }

    // Cluster-level settings (service account, pull secrets, node selector)
    // and a non-default pull policy flow into the pod spec / container.
    #[test]
    fn build_job_with_cluster_config() {
        let cluster = ClusterConfig {
            service_account: Some("wfe-runner".into()),
            image_pull_secrets: vec!["ghcr-secret".into()],
            node_selector: [("tier".into(), "compute".into())].into(),
            ..Default::default()
        };
        let config = KubernetesStepConfig {
            image: "alpine".into(),
            command: None,
            run: None,
            env: HashMap::new(),
            working_dir: None,
            memory: None,
            cpu: None,
            timeout_ms: None,
            pull_policy: Some("Always".into()),
            namespace: None,
        };
        let job = build_job(&config, "step", "ns", &HashMap::new(), &cluster);
        let pod_spec = job.spec.unwrap().template.spec.unwrap();
        assert_eq!(pod_spec.service_account_name, Some("wfe-runner".into()));
        assert_eq!(pod_spec.image_pull_secrets.unwrap().len(), 1);
        assert_eq!(
            pod_spec.node_selector.unwrap().get("tier"),
            Some(&"compute".to_string())
        );
        let container = &pod_spec.containers[0];
        assert_eq!(container.image_pull_policy, Some("Always".into()));
    }

    // The step-name and managed-by labels are set on the Job metadata.
    #[test]
    fn build_job_labels() {
        let config = KubernetesStepConfig {
            image: "alpine".into(),
            command: None,
            run: None,
            env: HashMap::new(),
            working_dir: None,
            memory: None,
            cpu: None,
            timeout_ms: None,
            pull_policy: None,
            namespace: None,
        };
        let job = build_job(&config, "my-step", "ns", &HashMap::new(), &default_cluster());
        let labels = job.metadata.labels.as_ref().unwrap();
        assert_eq!(labels.get(LABEL_STEP_NAME), Some(&"my-step".to_string()));
        assert_eq!(
            labels.get(LABEL_MANAGED_BY),
            Some(&"wfe-kubernetes".to_string())
        );
    }

    #[test]
    fn sanitize_name_basic() {
        assert_eq!(sanitize_name("my-step"), "my-step");
    }

    // Underscores and dots are replaced with hyphens.
    #[test]
    fn sanitize_name_special_chars() {
        assert_eq!(sanitize_name("my_step.v1"), "my-step-v1");
    }

    #[test]
    fn sanitize_name_uppercase() {
        assert_eq!(sanitize_name("MyStep"), "mystep");
    }

    // K8s resource names are capped at 63 characters.
    #[test]
    fn sanitize_name_truncates() {
        let long = "a".repeat(100);
        assert!(sanitize_name(&long).len() <= 63);
    }

    #[test]
    fn sanitize_name_trims_trailing_hyphens() {
        assert_eq!(sanitize_name("step---"), "step");
    }

    // Output ordering is deterministic: sorted by variable name.
    #[test]
    fn env_vars_sorted_by_name() {
        let overrides: HashMap<String, String> =
            [("ZZZ".into(), "1".into()), ("AAA".into(), "2".into())].into();
        let vars = build_env_vars(&overrides, &HashMap::new());
        assert_eq!(vars[0].name, "AAA");
        assert_eq!(vars[1].name, "ZZZ");
    }
}

View File

@@ -0,0 +1,164 @@
use std::collections::HashMap;
/// Parse `##wfe[output key=value]` lines from stdout.
///
/// This is the same output protocol used by wfe-containerd and wfe-yaml shell steps.
///
/// Each line is trimmed before matching. The key is the text before the
/// first `=` (trimmed; empty keys are skipped); the value is everything
/// after it, kept verbatim. When a key repeats, the last occurrence wins.
pub fn parse_outputs(stdout: &str) -> HashMap<String, String> {
    stdout
        .lines()
        .filter_map(|line| {
            // Lines not matching the marker shape are silently ignored.
            let payload = line
                .trim()
                .strip_prefix("##wfe[output ")?
                .strip_suffix(']')?;
            let (raw_key, value) = payload.split_once('=')?;
            let key = raw_key.trim();
            // An empty key (e.g. "##wfe[output =x]") is malformed; drop it.
            (!key.is_empty()).then(|| (key.to_string(), value.to_string()))
        })
        // HashMap collection keeps the last value for duplicate keys.
        .collect()
}
/// Build the output_data JSON object from step execution results.
///
/// Includes parsed `##wfe[output]` values plus raw stdout/stderr/exit_code
/// prefixed with the step name.
pub fn build_output_data(
step_name: &str,
stdout: &str,
stderr: &str,
exit_code: i32,
parsed: &HashMap<String, String>,
) -> serde_json::Value {
let mut map = serde_json::Map::new();
// Add parsed outputs.
for (key, value) in parsed {
// Try to parse as JSON value, fall back to string.
let json_val = serde_json::from_str(value)
.unwrap_or_else(|_| serde_json::Value::String(value.clone()));
map.insert(key.clone(), json_val);
}
// Add raw stdout/stderr/exit_code with step name prefix.
map.insert(
format!("{step_name}.stdout"),
serde_json::Value::String(stdout.to_string()),
);
map.insert(
format!("{step_name}.stderr"),
serde_json::Value::String(stderr.to_string()),
);
map.insert(
format!("{step_name}.exit_code"),
serde_json::Value::Number(exit_code.into()),
);
serde_json::Value::Object(map)
}
#[cfg(test)]
mod tests {
    //! Unit tests for the `##wfe[output]` protocol parser and output_data builder.
    use super::*;
    use pretty_assertions::assert_eq;

    #[test]
    fn parse_empty_stdout() {
        assert!(parse_outputs("").is_empty());
    }

    #[test]
    fn parse_no_output_markers() {
        let stdout = "hello world\nsome other output\n";
        assert!(parse_outputs(stdout).is_empty());
    }

    // Marker lines are extracted; surrounding ordinary output is ignored.
    #[test]
    fn parse_single_output() {
        let stdout = "building...\n##wfe[output version=1.2.3]\ndone\n";
        let outputs = parse_outputs(stdout);
        assert_eq!(outputs.get("version"), Some(&"1.2.3".to_string()));
    }

    #[test]
    fn parse_multiple_outputs() {
        let stdout = "##wfe[output digest=sha256:abc]\n##wfe[output tag=latest]\n";
        let outputs = parse_outputs(stdout);
        assert_eq!(outputs.len(), 2);
        assert_eq!(outputs.get("digest"), Some(&"sha256:abc".to_string()));
        assert_eq!(outputs.get("tag"), Some(&"latest".to_string()));
    }

    // Only the FIRST '=' splits key from value; later ones stay in the value.
    #[test]
    fn parse_value_with_equals() {
        let stdout = "##wfe[output url=https://example.com?a=1&b=2]\n";
        let outputs = parse_outputs(stdout);
        assert_eq!(
            outputs.get("url"),
            Some(&"https://example.com?a=1&b=2".to_string())
        );
    }

    #[test]
    fn parse_duplicate_key_last_wins() {
        let stdout = "##wfe[output key=first]\n##wfe[output key=second]\n";
        let outputs = parse_outputs(stdout);
        assert_eq!(outputs.get("key"), Some(&"second".to_string()));
    }

    // Keys are trimmed but values are kept verbatim (leading space preserved).
    #[test]
    fn parse_whitespace_in_key() {
        let stdout = "##wfe[output key = value]\n";
        let outputs = parse_outputs(stdout);
        assert_eq!(outputs.get("key"), Some(&" value".to_string()));
    }

    #[test]
    fn parse_empty_value() {
        let stdout = "##wfe[output key=]\n";
        let outputs = parse_outputs(stdout);
        assert_eq!(outputs.get("key"), Some(&"".to_string()));
    }

    #[test]
    fn parse_ignores_malformed() {
        let stdout = "##wfe[output ]\n##wfe[output no_equals]\n##wfe[output =no_key]\n";
        let outputs = parse_outputs(stdout);
        // "=no_key" has empty key, ignored. "no_equals" has no =, ignored. " " has no =, ignored.
        assert!(outputs.is_empty());
    }

    // Parsed outputs and the step-name-prefixed raw streams coexist.
    #[test]
    fn build_output_data_with_parsed() {
        let parsed: HashMap<String, String> =
            [("version".into(), "1.0.0".into())].into_iter().collect();
        let data = build_output_data("test", "hello\n", "warn\n", 0, &parsed);
        assert_eq!(data["version"], "1.0.0");
        assert_eq!(data["test.stdout"], "hello\n");
        assert_eq!(data["test.stderr"], "warn\n");
        assert_eq!(data["test.exit_code"], 0);
    }

    #[test]
    fn build_output_data_empty() {
        let data = build_output_data("step", "", "", 0, &HashMap::new());
        assert_eq!(data["step.stdout"], "");
        assert_eq!(data["step.exit_code"], 0);
    }

    // Values that parse as JSON keep their JSON type rather than staying strings.
    #[test]
    fn build_output_data_json_value() {
        let parsed: HashMap<String, String> =
            [("count".into(), "42".into()), ("flag".into(), "true".into())]
                .into_iter()
                .collect();
        let data = build_output_data("s", "", "", 0, &parsed);
        // Numbers and booleans should be parsed as JSON, not strings.
        assert_eq!(data["count"], 42);
        assert_eq!(data["flag"], true);
    }
}