115 lines
3.8 KiB
Rust
115 lines
3.8 KiB
Rust
use futures::io::AsyncBufReadExt;
|
|
use futures::StreamExt;
|
|
use k8s_openapi::api::core::v1::Pod;
|
|
use kube::api::LogParams;
|
|
use kube::{Api, Client};
|
|
use wfe_core::traits::log_sink::{LogChunk, LogSink, LogStreamType};
|
|
use wfe_core::WfeError;
|
|
|
|
/// Stream logs from a pod container, optionally forwarding to a LogSink.
|
|
///
|
|
/// Returns the full stdout content for output parsing.
|
|
/// Blocks until the container terminates and all logs are consumed.
|
|
pub async fn stream_logs(
|
|
client: &Client,
|
|
namespace: &str,
|
|
pod_name: &str,
|
|
step_name: &str,
|
|
definition_id: &str,
|
|
workflow_id: &str,
|
|
step_id: usize,
|
|
log_sink: Option<&dyn LogSink>,
|
|
) -> Result<String, WfeError> {
|
|
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
|
|
|
|
let params = LogParams {
|
|
follow: true,
|
|
container: Some("step".into()),
|
|
..Default::default()
|
|
};
|
|
|
|
let stream = pods.log_stream(pod_name, ¶ms).await.map_err(|e| {
|
|
WfeError::StepExecution(format!(
|
|
"failed to stream logs from pod '{pod_name}': {e}"
|
|
))
|
|
})?;
|
|
|
|
let mut stdout = String::new();
|
|
let reader = futures::io::BufReader::new(stream);
|
|
let mut lines = reader.lines();
|
|
|
|
while let Some(line_result) = lines.next().await {
|
|
let line: String = line_result.map_err(|e| {
|
|
WfeError::StepExecution(format!("log stream error for pod '{pod_name}': {e}"))
|
|
})?;
|
|
stdout.push_str(&line);
|
|
stdout.push('\n');
|
|
|
|
if let Some(sink) = log_sink {
|
|
let mut data = line.into_bytes();
|
|
data.push(b'\n');
|
|
sink.write_chunk(LogChunk {
|
|
workflow_id: workflow_id.to_string(),
|
|
definition_id: definition_id.to_string(),
|
|
step_id,
|
|
step_name: step_name.to_string(),
|
|
stream: LogStreamType::Stdout,
|
|
data,
|
|
timestamp: chrono::Utc::now(),
|
|
})
|
|
.await;
|
|
}
|
|
}
|
|
|
|
Ok(stdout)
|
|
}
|
|
|
|
/// Wait for a pod's container to be in a running or terminated state.
|
|
pub async fn wait_for_pod_running(
|
|
client: &Client,
|
|
namespace: &str,
|
|
pod_name: &str,
|
|
) -> Result<(), WfeError> {
|
|
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
|
|
|
|
for _ in 0..120 {
|
|
match pods.get(pod_name).await {
|
|
Ok(pod) => {
|
|
if let Some(status) = &pod.status {
|
|
if let Some(container_statuses) = &status.container_statuses {
|
|
for cs in container_statuses {
|
|
if let Some(state) = &cs.state {
|
|
if state.running.is_some() || state.terminated.is_some() {
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if let Some(conditions) = &status.conditions {
|
|
for cond in conditions {
|
|
if cond.type_ == "PodScheduled" && cond.status == "False" {
|
|
if let Some(ref msg) = cond.message {
|
|
return Err(WfeError::StepExecution(format!(
|
|
"pod '{pod_name}' scheduling failed: {msg}"
|
|
)));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Err(kube::Error::Api(err)) if err.code == 404 => {}
|
|
Err(e) => {
|
|
return Err(WfeError::StepExecution(format!(
|
|
"failed to get pod '{pod_name}': {e}"
|
|
)));
|
|
}
|
|
}
|
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
|
}
|
|
|
|
Err(WfeError::StepExecution(format!(
|
|
"pod '{pod_name}' did not start within 120s"
|
|
)))
|
|
}
|