refactor(wfe-kubernetes): add lazy client mode for factory-based step creation

This commit is contained in:
2026-04-06 17:09:11 +01:00
parent a9b0993ffb
commit 8f3539ed34

View File

@@ -21,17 +21,35 @@ use crate::output::{build_output_data, parse_outputs};
pub struct KubernetesStep { pub struct KubernetesStep {
config: KubernetesStepConfig, config: KubernetesStepConfig,
cluster: ClusterConfig, cluster: ClusterConfig,
client: Client, client: Option<Client>,
} }
impl KubernetesStep { impl KubernetesStep {
/// Create a step with a pre-built client.
pub fn new(config: KubernetesStepConfig, cluster: ClusterConfig, client: Client) -> Self { pub fn new(config: KubernetesStepConfig, cluster: ClusterConfig, client: Client) -> Self {
Self { Self {
config, config,
cluster, cluster,
client, client: Some(client),
} }
} }
/// Build a step that holds no client yet.
///
/// The `client` field starts out as `None`; the connection to the cluster is
/// deferred until the step first needs it.
pub fn lazy(config: KubernetesStepConfig, cluster: ClusterConfig) -> Self {
    let client = None;
    Self {
        client,
        cluster,
        config,
    }
}
/// Return the step's client, creating it first if none is stored.
///
/// When `self.client` is already populated the stored client is returned
/// as-is. Otherwise a client is built from `self.cluster` via
/// `crate::client::create_client` and cached for subsequent calls.
///
/// # Errors
///
/// Propagates any error returned by `crate::client::create_client`. In that
/// case `self.client` is left as `None`.
async fn get_client(&mut self) -> wfe_core::Result<&Client> {
    // Move the cached client out (if any) so the slot can be refilled below
    // without an `is_none()` check followed by `unwrap()`.
    let client = match self.client.take() {
        Some(existing) => existing,
        None => crate::client::create_client(&self.cluster).await?,
    };
    // `Option::insert` stores the value and hands back a reference to it,
    // so there is no panic path here.
    Ok(&*self.client.insert(client))
}
} }
#[async_trait] #[async_trait]
@@ -50,6 +68,10 @@ impl StepBody for KubernetesStep {
let workflow_id = &context.workflow.id; let workflow_id = &context.workflow.id;
let definition_id = &context.workflow.workflow_definition_id; let definition_id = &context.workflow.workflow_definition_id;
// 0. Ensure client is connected.
let _ = self.get_client().await?;
let client = self.client.as_ref().unwrap().clone();
// 1. Determine namespace. // 1. Determine namespace.
let ns = self let ns = self
.config .config
@@ -58,7 +80,7 @@ impl StepBody for KubernetesStep {
.unwrap_or_else(|| namespace_name(&self.cluster.namespace_prefix, workflow_id)); .unwrap_or_else(|| namespace_name(&self.cluster.namespace_prefix, workflow_id));
// 2. Ensure namespace exists. // 2. Ensure namespace exists.
ensure_namespace(&self.client, &ns, workflow_id).await?; ensure_namespace(&client, &ns, workflow_id).await?;
// 3. Merge env vars: workflow.data (uppercased) + config.env. // 3. Merge env vars: workflow.data (uppercased) + config.env.
let env_overrides = extract_workflow_env(&context.workflow.data); let env_overrides = extract_workflow_env(&context.workflow.data);
@@ -78,7 +100,7 @@ impl StepBody for KubernetesStep {
.unwrap_or_else(|| step_name.clone()); .unwrap_or_else(|| step_name.clone());
// 5. Create Job. // 5. Create Job.
let jobs: Api<Job> = Api::namespaced(self.client.clone(), &ns); let jobs: Api<Job> = Api::namespaced(client.clone(), &ns);
jobs.create(&PostParams::default(), &job_manifest) jobs.create(&PostParams::default(), &job_manifest)
.await .await
.map_err(|e| { .map_err(|e| {
@@ -89,26 +111,26 @@ impl StepBody for KubernetesStep {
let result = if let Some(timeout_ms) = self.config.timeout_ms { let result = if let Some(timeout_ms) = self.config.timeout_ms {
match tokio::time::timeout( match tokio::time::timeout(
Duration::from_millis(timeout_ms), Duration::from_millis(timeout_ms),
self.execute_job(&ns, &job_name, &step_name, definition_id, workflow_id, context), self.execute_job(&client, &ns, &job_name, &step_name, definition_id, workflow_id, context),
) )
.await .await
{ {
Ok(result) => result, Ok(result) => result,
Err(_) => { Err(_) => {
// Timeout — clean up and fail. // Timeout — clean up and fail.
delete_job(&self.client, &ns, &job_name).await.ok(); delete_job(&client, &ns, &job_name).await.ok();
return Err(WfeError::StepExecution(format!( return Err(WfeError::StepExecution(format!(
"kubernetes job '{job_name}' timed out after {timeout_ms}ms" "kubernetes job '{job_name}' timed out after {timeout_ms}ms"
))); )));
} }
} }
} else { } else {
self.execute_job(&ns, &job_name, &step_name, definition_id, workflow_id, context) self.execute_job(&client, &ns, &job_name, &step_name, definition_id, workflow_id, context)
.await .await
}; };
// Always attempt cleanup. // Always attempt cleanup.
delete_job(&self.client, &ns, &job_name).await.ok(); delete_job(&client, &ns, &job_name).await.ok();
result result
} }
@@ -118,6 +140,7 @@ impl KubernetesStep {
/// Execute a Job after it's been created: wait for pod, stream logs, check exit code. /// Execute a Job after it's been created: wait for pod, stream logs, check exit code.
async fn execute_job( async fn execute_job(
&self, &self,
client: &Client,
namespace: &str, namespace: &str,
job_name: &str, job_name: &str,
step_name: &str, step_name: &str,
@@ -126,14 +149,14 @@ impl KubernetesStep {
context: &StepExecutionContext<'_>, context: &StepExecutionContext<'_>,
) -> wfe_core::Result<ExecutionResult> { ) -> wfe_core::Result<ExecutionResult> {
// 6. Find the pod created by the Job. // 6. Find the pod created by the Job.
let pod_name = wait_for_job_pod(&self.client, namespace, job_name).await?; let pod_name = wait_for_job_pod(client, namespace, job_name).await?;
// 7. Wait for the pod container to start. // 7. Wait for the pod container to start.
wait_for_pod_running(&self.client, namespace, &pod_name).await?; wait_for_pod_running(client, namespace, &pod_name).await?;
// 8. Stream logs + capture stdout. // 8. Stream logs + capture stdout.
let stdout = stream_logs( let stdout = stream_logs(
&self.client, client,
namespace, namespace,
&pod_name, &pod_name,
step_name, step_name,
@@ -147,7 +170,7 @@ impl KubernetesStep {
// 9. Wait for Job completion and get exit code. // 9. Wait for Job completion and get exit code.
let (exit_code, stderr) = let (exit_code, stderr) =
wait_for_job_completion(&self.client, namespace, job_name, &pod_name).await?; wait_for_job_completion(client, namespace, job_name, &pod_name).await?;
// 10. Check exit code. // 10. Check exit code.
if exit_code != 0 { if exit_code != 0 {