From 6a341e426c24b9ad2236eac7b3ed51bb03923e26 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Mon, 6 Apr 2026 16:42:09 +0100 Subject: [PATCH] feat(wfe-kubernetes): KubernetesStep executor (StepBody impl) --- wfe-kubernetes/src/lib.rs | 11 ++ wfe-kubernetes/src/step.rs | 337 +++++++++++++++++++++++++++++++++++++ 2 files changed, 348 insertions(+) create mode 100644 wfe-kubernetes/src/lib.rs create mode 100644 wfe-kubernetes/src/step.rs diff --git a/wfe-kubernetes/src/lib.rs b/wfe-kubernetes/src/lib.rs new file mode 100644 index 0000000..83c48fa --- /dev/null +++ b/wfe-kubernetes/src/lib.rs @@ -0,0 +1,11 @@ +pub mod cleanup; +pub mod client; +pub mod config; +pub mod logs; +pub mod manifests; +pub mod namespace; +pub mod output; +pub mod step; + +pub use config::{ClusterConfig, KubernetesStepConfig}; +pub use step::KubernetesStep; diff --git a/wfe-kubernetes/src/step.rs b/wfe-kubernetes/src/step.rs new file mode 100644 index 0000000..9bacf54 --- /dev/null +++ b/wfe-kubernetes/src/step.rs @@ -0,0 +1,337 @@ +use std::collections::HashMap; +use std::time::Duration; + +use async_trait::async_trait; +use k8s_openapi::api::batch::v1::Job; +use k8s_openapi::api::core::v1::Pod; +use kube::api::{ListParams, PostParams}; +use kube::{Api, Client}; +use wfe_core::models::ExecutionResult; +use wfe_core::traits::step::{StepBody, StepExecutionContext}; +use wfe_core::WfeError; + +use crate::cleanup::delete_job; +use crate::config::{ClusterConfig, KubernetesStepConfig}; +use crate::logs::{stream_logs, wait_for_pod_running}; +use crate::manifests::build_job; +use crate::namespace::{ensure_namespace, namespace_name}; +use crate::output::{build_output_data, parse_outputs}; + +/// A workflow step that runs as a Kubernetes Job. 
+///
+/// Running the step creates a Job in the workflow's namespace, follows the
+/// Job's pod until it finishes, and then deletes the Job again.
+pub struct KubernetesStep {
+    /// Per-step settings; this file reads `namespace` and `timeout_ms` from it.
+    config: KubernetesStepConfig,
+    /// Cluster-wide settings; this file reads `namespace_prefix` from it.
+    cluster: ClusterConfig,
+    /// Kubernetes API client used for every request the step makes.
+    client: Client,
+}
+
+impl KubernetesStep {
+    /// Build a step from its own config, the cluster config and a `kube` client.
+    pub fn new(config: KubernetesStepConfig, cluster: ClusterConfig, client: Client) -> Self {
+        Self {
+            config,
+            cluster,
+            client,
+        }
+    }
+}
+
+#[async_trait]
+impl StepBody for KubernetesStep {
+    /// Run the step as a Kubernetes Job and report its result.
+    ///
+    /// The Job is deleted (best effort) once it has been created, whether it
+    /// succeeded, failed, or timed out.
+    ///
+    /// # Errors
+    ///
+    /// Propagates the error from `ensure_namespace`, and returns
+    /// [`WfeError::StepExecution`] when the Job cannot be created, when it
+    /// exceeds the configured `timeout_ms`, or when `execute_job` fails.
+    async fn run(
+        &mut self,
+        context: &StepExecutionContext<'_>,
+    ) -> wfe_core::Result<ExecutionResult> {
+        let step_name = context
+            .step
+            .name
+            .as_deref()
+            .unwrap_or("unknown")
+            .to_string();
+
+        let workflow_id = &context.workflow.id;
+        let definition_id = &context.workflow.workflow_definition_id;
+
+        // 1. Determine namespace: an explicit override wins, otherwise it is
+        //    derived from the cluster prefix and the workflow id.
+        let ns = self
+            .config
+            .namespace
+            .clone()
+            .unwrap_or_else(|| namespace_name(&self.cluster.namespace_prefix, workflow_id));
+
+        // 2. Ensure namespace exists.
+        ensure_namespace(&self.client, &ns, workflow_id).await?;
+
+        // 3. Workflow data (keys uppercased) becomes env overrides for the Job.
+        let env_overrides = extract_workflow_env(&context.workflow.data);
+
+        // 4. Build Job manifest.
+        let job_manifest = build_job(
+            &self.config,
+            &step_name,
+            &ns,
+            &env_overrides,
+            &self.cluster,
+        );
+        let job_name = job_manifest
+            .metadata
+            .name
+            .clone()
+            .unwrap_or_else(|| step_name.clone());
+
+        // 5. Create Job.
+        let jobs: Api<Job> = Api::namespaced(self.client.clone(), &ns);
+        jobs.create(&PostParams::default(), &job_manifest)
+            .await
+            .map_err(|e| {
+                WfeError::StepExecution(format!("failed to create job '{job_name}': {e}"))
+            })?;
+
+        // Follow the Job to completion, bounded by the timeout if one is set.
+        // Futures are lazy, so nothing runs until one of the arms awaits it.
+        let timeout_ms = self.config.timeout_ms;
+        let job_run =
+            self.execute_job(&ns, &job_name, &step_name, definition_id, workflow_id, context);
+        let result = match timeout_ms {
+            Some(timeout_ms) => tokio::time::timeout(Duration::from_millis(timeout_ms), job_run)
+                .await
+                .unwrap_or_else(|_elapsed| {
+                    Err(WfeError::StepExecution(format!(
+                        "kubernetes job '{job_name}' timed out after {timeout_ms}ms"
+                    )))
+                }),
+            None => job_run.await,
+        };
+
+        // Always attempt cleanup, exactly once, on every path. A failed delete
+        // is deliberately ignored so it cannot mask the step's own result.
+        delete_job(&self.client, &ns, &job_name).await.ok();
+
+        result
+    }
+}
+
+impl KubernetesStep {
+    /// Execute a Job after it's been created: wait for pod, stream logs, check exit code.
+    ///
+    /// On success the returned [`ExecutionResult`] has `proceed: true` and
+    /// carries the output data built from the captured stdout.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error when no pod appears for the Job, when waiting for the
+    /// pod or for Job completion fails, or when the exit code is non-zero.
+    async fn execute_job(
+        &self,
+        namespace: &str,
+        job_name: &str,
+        step_name: &str,
+        definition_id: &str,
+        workflow_id: &str,
+        context: &StepExecutionContext<'_>,
+    ) -> wfe_core::Result<ExecutionResult> {
+        // 6. Find the pod created by the Job.
+        let pod_name = wait_for_job_pod(&self.client, namespace, job_name).await?;
+
+        // 7. Wait for the pod container to start.
+        wait_for_pod_running(&self.client, namespace, &pod_name).await?;
+
+        // 8. Stream logs + capture stdout. Log streaming is best effort: a
+        //    failure here yields empty stdout rather than failing the step.
+        let stdout = stream_logs(
+            &self.client,
+            namespace,
+            &pod_name,
+            step_name,
+            definition_id,
+            workflow_id,
+            context.step.id,
+            context.log_sink,
+        )
+        .await
+        .unwrap_or_default();
+
+        // 9. Wait for Job completion and get exit code.
+        let (exit_code, stderr) =
+            wait_for_job_completion(&self.client, namespace, job_name, &pod_name).await?;
+
+        // 10. Check exit code.
+        if exit_code != 0 {
+            return Err(WfeError::StepExecution(format!(
+                "kubernetes job '{job_name}' exited with code {exit_code}\nstdout: {stdout}\nstderr: {stderr}"
+            )));
+        }
+
+        // 11. Parse outputs from stdout.
+        let parsed = parse_outputs(&stdout);
+        let output_data = build_output_data(step_name, &stdout, &stderr, exit_code, &parsed);
+
+        Ok(ExecutionResult {
+            proceed: true,
+            output_data: Some(output_data),
+            ..Default::default()
+        })
+    }
+}
+
+/// Wait for the Job to create a pod, returning the pod name.
+///
+/// Lists pods carrying the `job-name=<job_name>` label once per second, for at
+/// most 60 attempts, and returns the name of the first pod listed.
+///
+/// # Errors
+///
+/// Returns [`WfeError::StepExecution`] when listing pods fails or when no
+/// named pod shows up within the 60 attempts.
+async fn wait_for_job_pod(
+    client: &Client,
+    namespace: &str,
+    job_name: &str,
+) -> Result<String, WfeError> {
+    let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
+    let selector = format!("job-name={job_name}");
+    // The selector never changes, so build the list parameters once.
+    let params = ListParams::default().labels(&selector);
+
+    for _ in 0..60 {
+        let pod_list = pods.list(&params).await.map_err(|e| {
+            WfeError::StepExecution(format!(
+                "failed to list pods for job '{job_name}': {e}"
+            ))
+        })?;
+
+        // Take ownership of the first pod so its name can be moved out
+        // instead of cloned.
+        let first_name = pod_list
+            .items
+            .into_iter()
+            .next()
+            .and_then(|pod| pod.metadata.name);
+        if let Some(name) = first_name {
+            return Ok(name);
+        }
+
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
+
+    Err(WfeError::StepExecution(format!(
+        "no pod created for job '{job_name}' within 60s"
+    )))
+}
+
+/// Wait for a Job to reach a terminal state. Returns (exit_code, stderr).
+///
+/// The Job is polled once per second for at most 600 attempts. It counts as
+/// finished when it has a `Complete` or `Failed` condition with status
+/// `"True"`, or when its `succeeded` or `failed` counter is above zero.
+///
+/// NOTE(review): the 600-attempt cap is independent of the step's
+/// `timeout_ms`, so a longer configured timeout is still bounded here.
+///
+/// # Errors
+///
+/// Returns [`WfeError::StepExecution`] when fetching the Job fails or when it
+/// has not finished after the 600 attempts.
+async fn wait_for_job_completion(
+    client: &Client,
+    namespace: &str,
+    job_name: &str,
+    pod_name: &str,
+) -> Result<(i32, String), WfeError> {
+    let jobs: Api<Job> = Api::namespaced(client.clone(), namespace);
+    let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
+
+    // Poll Job status.
+    for _ in 0..600 {
+        let job = jobs.get(job_name).await.map_err(|e| {
+            WfeError::StepExecution(format!("failed to get job '{job_name}': {e}"))
+        })?;
+
+        // Success and failure are handled identically here: the caller tells
+        // them apart through the pod's exit code.
+        let finished = match &job.status {
+            Some(status) => {
+                let terminal_condition = status.conditions.iter().flatten().any(|cond| {
+                    cond.status == "True"
+                        && matches!(cond.type_.as_str(), "Complete" | "Failed")
+                });
+                // Also check succeeded/failed counts.
+                terminal_condition
+                    || status.succeeded.unwrap_or(0) > 0
+                    || status.failed.unwrap_or(0) > 0
+            }
+            None => false,
+        };
+
+        if finished {
+            let exit_code = get_pod_exit_code(&pods, pod_name).await;
+            let stderr = get_pod_stderr(&pods, pod_name).await;
+            return Ok((exit_code, stderr));
+        }
+
+        tokio::time::sleep(Duration::from_secs(1)).await;
+    }
+
+    Err(WfeError::StepExecution(format!(
+        "job '{job_name}' did not complete within 600s"
+    )))
+}
+
+/// Get the exit code from a pod's terminated container.
+///
+/// Only the first container status is inspected. Returns `-1` when the pod
+/// cannot be fetched or when that container has no terminated state.
+async fn get_pod_exit_code(pods: &Api<Pod>, pod_name: &str) -> i32 {
+    pods.get(pod_name)
+        .await
+        .ok()
+        .and_then(|pod| pod.status)
+        .and_then(|status| status.container_statuses)
+        .and_then(|statuses| statuses.into_iter().next())
+        .and_then(|first| first.state)
+        .and_then(|state| state.terminated)
+        .map_or(-1, |terminated| terminated.exit_code)
+}
+
+/// Get stderr from a pod's logs (separate container or log endpoint).
+/// K8s doesn't separate stdout/stderr in the log API, so we return empty.
+async fn get_pod_stderr(_pods: &Api<Pod>, _pod_name: &str) -> String {
+    // K8s log API interleaves stdout/stderr. Stderr extraction would require
+    // a sidecar pattern. For now, return empty — all output is in stdout.
+    String::new()
+}
+
+/// Extract workflow data as uppercase environment variables.
+///
+/// Only a top-level JSON object produces variables; any other value yields an
+/// empty map. Each key is uppercased. String values are used verbatim, and
+/// every other value is rendered as its JSON text. If two keys collide after
+/// uppercasing, the one iterated last wins.
+pub fn extract_workflow_env(data: &serde_json::Value) -> HashMap<String, String> {
+    data.as_object()
+        .into_iter()
+        .flatten()
+        .map(|(key, value)| {
+            let rendered = match value {
+                // Avoid the surrounding quotes that JSON rendering would add.
+                serde_json::Value::String(s) => s.clone(),
+                other => other.to_string(),
+            };
+            (key.to_uppercase(), rendered)
+        })
+        .collect()
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use pretty_assertions::assert_eq;
+
+    #[test]
+    fn extract_workflow_env_from_object() {
+        let data = serde_json::json!({
+            "name": "World",
+            "count": 42,
+            "flag": true,
+        });
+        let env = extract_workflow_env(&data);
+        assert_eq!(env.get("NAME"), Some(&"World".to_string()));
+        assert_eq!(env.get("COUNT"), Some(&"42".to_string()));
+        assert_eq!(env.get("FLAG"), Some(&"true".to_string()));
+    }
+
+    #[test]
+    fn extract_workflow_env_from_non_object() {
+        let data = serde_json::json!("just a string");
+        let env = extract_workflow_env(&data);
+        assert!(env.is_empty());
+    }
+
+    #[test]
+    fn extract_workflow_env_null() {
+        let data = serde_json::Value::Null;
+        let env = extract_workflow_env(&data);
+        assert!(env.is_empty());
+    }
+
+    #[test]
+    fn extract_workflow_env_nested_object() {
+        let data = serde_json::json!({"config": {"nested": true}});
+        let env = extract_workflow_env(&data);
+        // Nested object serialized as JSON string.
+        assert_eq!(
+            env.get("CONFIG"),
+            Some(&r#"{"nested":true}"#.to_string())
+        );
+    }
+}