From 85a83e75809c3c880e05bb2c3c14bc099d5752e8 Mon Sep 17 00:00:00 2001
From: Sienna Meridian Satterwhite
Date: Mon, 6 Apr 2026 16:42:04 +0100
Subject: [PATCH] feat(wfe-kubernetes): log streaming and resource cleanup

---
 wfe-kubernetes/src/cleanup.rs |  71 +++++++++++++++++++++
 wfe-kubernetes/src/logs.rs    | 114 ++++++++++++++++++++++++++++++++++
 2 files changed, 185 insertions(+)
 create mode 100644 wfe-kubernetes/src/cleanup.rs
 create mode 100644 wfe-kubernetes/src/logs.rs

diff --git a/wfe-kubernetes/src/cleanup.rs b/wfe-kubernetes/src/cleanup.rs
new file mode 100644
index 0000000..d27137c
--- /dev/null
+++ b/wfe-kubernetes/src/cleanup.rs
@@ -0,0 +1,71 @@
+use std::time::Duration;
+
+use k8s_openapi::api::batch::v1::Job;
+use k8s_openapi::api::core::v1::Namespace;
+use kube::api::{DeleteParams, ListParams};
+use kube::{Api, Client};
+use wfe_core::WfeError;
+
+/// Delete a Job by name (a 404 counts as success). Its Pod is cleaned up via owner references.
+pub async fn delete_job(client: &Client, namespace: &str, name: &str) -> Result<(), WfeError> {
+    let jobs: Api<Job> = Api::namespaced(client.clone(), namespace);
+
+    let dp = DeleteParams {
+        propagation_policy: Some(kube::api::PropagationPolicy::Background),
+        ..Default::default()
+    };
+
+    match jobs.delete(name, &dp).await {
+        Ok(_) => Ok(()),
+        Err(kube::Error::Api(err)) if err.code == 404 => Ok(()), // already gone
+        Err(e) => Err(WfeError::StepExecution(format!(
+            "failed to delete job '{name}' in namespace '{namespace}': {e}"
+        ))),
+    }
+}
+
+/// Delete WFE-labelled namespaces that start with `prefix` and are older than `older_than`.
+///
+/// Returns the number of namespaces deleted; a failed delete is logged and not counted.
+pub async fn cleanup_stale_namespaces(
+    client: &Client,
+    prefix: &str,
+    older_than: Duration,
+) -> Result<u32, WfeError> {
+    let namespaces: Api<Namespace> = Api::all(client.clone());
+    let lp = ListParams::default().labels("wfe.sunbeam.pt/managed-by=wfe-kubernetes");
+
+    let ns_list = namespaces
+        .list(&lp)
+        .await
+        .map_err(|e| WfeError::StepExecution(format!("failed to list namespaces: {e}")))?;
+
+    // Saturating math: an oversized `older_than` must not wrap and move the cutoff forward.
+    let max_age_secs = i64::try_from(older_than.as_secs()).unwrap_or(i64::MAX);
+    let cutoff_secs = chrono::Utc::now().timestamp().saturating_sub(max_age_secs);
+    let mut deleted = 0u32;
+    for ns in ns_list {
+        let name = ns.metadata.name.as_deref().unwrap_or("");
+        if !name.starts_with(prefix) {
+            continue;
+        }
+
+        // Unix seconds; a namespace without a creation timestamp gets i64::MAX and is kept.
+        let created_secs = ns
+            .metadata
+            .creation_timestamp
+            .as_ref()
+            .map_or(i64::MAX, |t| t.0.as_second());
+
+        if created_secs < cutoff_secs {
+            if let Err(e) = namespaces.delete(name, &Default::default()).await {
+                tracing::warn!("failed to delete stale namespace '{name}': {e}");
+            } else {
+                tracing::info!("cleaned up stale namespace '{name}'");
+                deleted += 1;
+            }
+        }
+    }
+
+    Ok(deleted)
+}
diff --git a/wfe-kubernetes/src/logs.rs b/wfe-kubernetes/src/logs.rs
new file mode 100644
index 0000000..e205b2a
--- /dev/null
+++ b/wfe-kubernetes/src/logs.rs
@@ -0,0 +1,114 @@
+use futures::io::AsyncBufReadExt;
+use futures::StreamExt;
+use k8s_openapi::api::core::v1::Pod;
+use kube::api::LogParams;
+use kube::{Api, Client};
+use wfe_core::traits::log_sink::{LogChunk, LogSink, LogStreamType};
+use wfe_core::WfeError;
+
+/// Follow the log of a pod's `step` container, optionally forwarding each line to a LogSink.
+///
+/// Returns every line read, each terminated by `\n`, concatenated for output parsing.
+/// Does not return until the log stream ends; a stream or read error is returned as `Err`.
+pub async fn stream_logs(
+    client: &Client,
+    namespace: &str,
+    pod_name: &str,
+    step_name: &str,
+    definition_id: &str,
+    workflow_id: &str,
+    step_id: usize,
+    log_sink: Option<&dyn LogSink>,
+) -> Result<String, WfeError> {
+    let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
+
+    let params = LogParams {
+        follow: true,
+        container: Some("step".into()),
+        ..Default::default()
+    };
+
+    let stream = pods.log_stream(pod_name, &params).await.map_err(|e| {
+        WfeError::StepExecution(format!(
+            "failed to stream logs from pod '{pod_name}': {e}"
+        ))
+    })?;
+
+    let mut stdout = String::new();
+    let mut lines = futures::io::BufReader::new(stream).lines();
+
+    while let Some(line_result) = lines.next().await {
+        let line: String = line_result.map_err(|e| {
+            WfeError::StepExecution(format!("log stream error for pod '{pod_name}': {e}"))
+        })?;
+        // `lines()` yields each line without its terminator, so `\n` is re-added below.
+        stdout.push_str(&line);
+        stdout.push('\n');
+
+        if let Some(sink) = log_sink {
+            let mut data = line.into_bytes();
+            data.push(b'\n');
+            sink.write_chunk(LogChunk {
+                workflow_id: workflow_id.to_string(),
+                definition_id: definition_id.to_string(),
+                step_id,
+                step_name: step_name.to_string(),
+                stream: LogStreamType::Stdout,
+                data,
+                timestamp: chrono::Utc::now(),
+            })
+            .await;
+        }
+    }
+
+    Ok(stdout)
+}
+
+/// Poll a pod once per second (up to 120 times) until a container is running or terminated.
+pub async fn wait_for_pod_running(
+    client: &Client,
+    namespace: &str,
+    pod_name: &str,
+) -> Result<(), WfeError> {
+    let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
+
+    for _ in 0..120 {
+        match pods.get(pod_name).await {
+            Ok(pod) => {
+                if let Some(status) = &pod.status {
+                    if let Some(container_statuses) = &status.container_statuses {
+                        for cs in container_statuses {
+                            if let Some(state) = &cs.state {
+                                if state.running.is_some() || state.terminated.is_some() {
+                                    return Ok(());
+                                }
+                            }
+                        }
+                    }
+                    if let Some(conditions) = &status.conditions {
+                        for cond in conditions {
+                            if cond.type_ == "PodScheduled" && cond.status == "False" {
+                                if let Some(ref msg) = cond.message {
+                                    return Err(WfeError::StepExecution(format!(
+                                        "pod '{pod_name}' scheduling failed: {msg}"
+                                    )));
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            Err(kube::Error::Api(err)) if err.code == 404 => {} // not found yet: poll again
+            Err(e) => {
+                return Err(WfeError::StepExecution(format!(
+                    "failed to get pod '{pod_name}': {e}"
+                )));
+            }
+        }
+        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+    }
+
+    Err(WfeError::StepExecution(format!(
+        "pod '{pod_name}' did not start within 120s"
+    )))
+}