From 8f3539ed34ef3d3f47d5653922db67d1f243efab Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Mon, 6 Apr 2026 17:09:11 +0100 Subject: [PATCH] refactor(wfe-kubernetes): add lazy client mode for factory-based step creation --- wfe-kubernetes/src/step.rs | 47 ++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/wfe-kubernetes/src/step.rs b/wfe-kubernetes/src/step.rs index 9bacf54..fe94d7e 100644 --- a/wfe-kubernetes/src/step.rs +++ b/wfe-kubernetes/src/step.rs @@ -21,17 +21,35 @@ use crate::output::{build_output_data, parse_outputs}; pub struct KubernetesStep { config: KubernetesStepConfig, cluster: ClusterConfig, - client: Client, + client: Option<Client>, } impl KubernetesStep { + /// Create a step with a pre-built client. pub fn new(config: KubernetesStepConfig, cluster: ClusterConfig, client: Client) -> Self { Self { config, cluster, - client, + client: Some(client), } } + + /// Create a step that will lazily connect to the cluster on first run. + pub fn lazy(config: KubernetesStepConfig, cluster: ClusterConfig) -> Self { + Self { + config, + cluster, + client: None, + } + } + + async fn get_client(&mut self) -> wfe_core::Result<&Client> { + if self.client.is_none() { + let client = crate::client::create_client(&self.cluster).await?; + self.client = Some(client); + } + Ok(self.client.as_ref().unwrap()) + } } #[async_trait] @@ -50,6 +68,10 @@ impl StepBody for KubernetesStep { let workflow_id = &context.workflow.id; let definition_id = &context.workflow.workflow_definition_id; + // 0. Ensure client is connected. + let _ = self.get_client().await?; + let client = self.client.as_ref().unwrap().clone(); + // 1. Determine namespace. let ns = self .config @@ -58,7 +80,7 @@ impl StepBody for KubernetesStep { .unwrap_or_else(|| namespace_name(&self.cluster.namespace_prefix, workflow_id)); // 2. Ensure namespace exists. 
- ensure_namespace(&self.client, &ns, workflow_id).await?; + ensure_namespace(&client, &ns, workflow_id).await?; // 3. Merge env vars: workflow.data (uppercased) + config.env. let env_overrides = extract_workflow_env(&context.workflow.data); @@ -78,7 +100,7 @@ impl StepBody for KubernetesStep { .unwrap_or_else(|| step_name.clone()); // 5. Create Job. - let jobs: Api = Api::namespaced(self.client.clone(), &ns); + let jobs: Api = Api::namespaced(client.clone(), &ns); jobs.create(&PostParams::default(), &job_manifest) .await .map_err(|e| { @@ -89,26 +111,26 @@ impl StepBody for KubernetesStep { let result = if let Some(timeout_ms) = self.config.timeout_ms { match tokio::time::timeout( Duration::from_millis(timeout_ms), - self.execute_job(&ns, &job_name, &step_name, definition_id, workflow_id, context), + self.execute_job(&client, &ns, &job_name, &step_name, definition_id, workflow_id, context), ) .await { Ok(result) => result, Err(_) => { // Timeout — clean up and fail. - delete_job(&self.client, &ns, &job_name).await.ok(); + delete_job(&client, &ns, &job_name).await.ok(); return Err(WfeError::StepExecution(format!( "kubernetes job '{job_name}' timed out after {timeout_ms}ms" ))); } } } else { - self.execute_job(&ns, &job_name, &step_name, definition_id, workflow_id, context) + self.execute_job(&client, &ns, &job_name, &step_name, definition_id, workflow_id, context) .await }; // Always attempt cleanup. - delete_job(&self.client, &ns, &job_name).await.ok(); + delete_job(&client, &ns, &job_name).await.ok(); result } @@ -118,6 +140,7 @@ impl KubernetesStep { /// Execute a Job after it's been created: wait for pod, stream logs, check exit code. async fn execute_job( &self, + client: &Client, namespace: &str, job_name: &str, step_name: &str, @@ -126,14 +149,14 @@ impl KubernetesStep { context: &StepExecutionContext<'_>, ) -> wfe_core::Result { // 6. Find the pod created by the Job. 
- let pod_name = wait_for_job_pod(&self.client, namespace, job_name).await?; + let pod_name = wait_for_job_pod(client, namespace, job_name).await?; // 7. Wait for the pod container to start. - wait_for_pod_running(&self.client, namespace, &pod_name).await?; + wait_for_pod_running(client, namespace, &pod_name).await?; // 8. Stream logs + capture stdout. let stdout = stream_logs( - &self.client, + client, namespace, &pod_name, step_name, @@ -147,7 +170,7 @@ impl KubernetesStep { // 9. Wait for Job completion and get exit code. let (exit_code, stderr) = - wait_for_job_completion(&self.client, namespace, job_name, &pod_name).await?; + wait_for_job_completion(client, namespace, job_name, &pod_name).await?; // 10. Check exit code. if exit_code != 0 {