feat(wfe-yaml): add log streaming to shell executor + security hardening
Shell step streaming: when LogSink is present, uses cmd.spawn() with tokio::select! to interleave stdout/stderr line-by-line. Respects timeout_ms with child.kill() on timeout. Falls back to buffered mode when no LogSink. Security: block sensitive env var overrides (PATH, LD_PRELOAD, etc.) from workflow data injection. Proper error handling for pipe capture. 4 LogSink regression tests + 2 env var security regression tests.
This commit is contained in:
@@ -23,18 +23,23 @@ impl ShellStep {
|
||||
pub fn new(config: ShellConfig) -> Self {
|
||||
Self { config }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for ShellStep {
|
||||
async fn run(&mut self, context: &StepExecutionContext<'_>) -> wfe_core::Result<ExecutionResult> {
|
||||
fn build_command(&self, context: &StepExecutionContext<'_>) -> tokio::process::Command {
|
||||
let mut cmd = tokio::process::Command::new(&self.config.shell);
|
||||
cmd.arg("-c").arg(&self.config.run);
|
||||
|
||||
// Inject workflow data as UPPER_CASE env vars (top-level keys only).
|
||||
// Skip keys that would override security-sensitive environment variables.
|
||||
const BLOCKED_KEYS: &[&str] = &[
|
||||
"PATH", "LD_PRELOAD", "LD_LIBRARY_PATH", "DYLD_LIBRARY_PATH",
|
||||
"HOME", "SHELL", "USER", "LOGNAME", "TERM",
|
||||
];
|
||||
if let Some(data_obj) = context.workflow.data.as_object() {
|
||||
for (key, value) in data_obj {
|
||||
let env_key = key.to_uppercase();
|
||||
if BLOCKED_KEYS.contains(&env_key.as_str()) {
|
||||
continue;
|
||||
}
|
||||
let env_val = match value {
|
||||
serde_json::Value::String(s) => s.clone(),
|
||||
other => other.to_string(),
|
||||
@@ -43,12 +48,10 @@ impl StepBody for ShellStep {
|
||||
}
|
||||
}
|
||||
|
||||
// Add extra env from config.
|
||||
for (key, value) in &self.config.env {
|
||||
cmd.env(key, value);
|
||||
}
|
||||
|
||||
// Set working directory if specified.
|
||||
if let Some(ref dir) = self.config.working_dir {
|
||||
cmd.current_dir(dir);
|
||||
}
|
||||
@@ -56,15 +59,137 @@ impl StepBody for ShellStep {
|
||||
cmd.stdout(std::process::Stdio::piped());
|
||||
cmd.stderr(std::process::Stdio::piped());
|
||||
|
||||
// Execute with optional timeout.
|
||||
cmd
|
||||
}
|
||||
|
||||
/// Run with streaming output via LogSink.
|
||||
///
|
||||
/// Reads stdout and stderr line-by-line, streaming each line to the
|
||||
/// LogSink as it's produced. Uses `tokio::select!` to interleave both
|
||||
/// streams without spawning tasks (avoids lifetime issues with &dyn LogSink).
|
||||
async fn run_streaming(
|
||||
&self,
|
||||
context: &StepExecutionContext<'_>,
|
||||
) -> wfe_core::Result<(String, String, i32)> {
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use wfe_core::traits::{LogChunk, LogStreamType};
|
||||
|
||||
let log_sink = context.log_sink.unwrap();
|
||||
let workflow_id = context.workflow.id.clone();
|
||||
let definition_id = context.workflow.workflow_definition_id.clone();
|
||||
let step_id = context.step.id;
|
||||
let step_name = context.step.name.clone().unwrap_or_else(|| "unknown".to_string());
|
||||
|
||||
let mut cmd = self.build_command(context);
|
||||
let mut child = cmd.spawn().map_err(|e| {
|
||||
WfeError::StepExecution(format!("Failed to spawn shell command: {e}"))
|
||||
})?;
|
||||
|
||||
let stdout_pipe = child.stdout.take().ok_or_else(|| {
|
||||
WfeError::StepExecution("failed to capture stdout pipe".to_string())
|
||||
})?;
|
||||
let stderr_pipe = child.stderr.take().ok_or_else(|| {
|
||||
WfeError::StepExecution("failed to capture stderr pipe".to_string())
|
||||
})?;
|
||||
let mut stdout_lines = BufReader::new(stdout_pipe).lines();
|
||||
let mut stderr_lines = BufReader::new(stderr_pipe).lines();
|
||||
|
||||
let mut stdout_buf = Vec::new();
|
||||
let mut stderr_buf = Vec::new();
|
||||
let mut stdout_done = false;
|
||||
let mut stderr_done = false;
|
||||
|
||||
// Interleave stdout/stderr reads with optional timeout.
|
||||
let read_future = async {
|
||||
while !stdout_done || !stderr_done {
|
||||
tokio::select! {
|
||||
line = stdout_lines.next_line(), if !stdout_done => {
|
||||
match line {
|
||||
Ok(Some(line)) => {
|
||||
log_sink.write_chunk(LogChunk {
|
||||
workflow_id: workflow_id.clone(),
|
||||
definition_id: definition_id.clone(),
|
||||
step_id,
|
||||
step_name: step_name.clone(),
|
||||
stream: LogStreamType::Stdout,
|
||||
data: format!("{line}\n").into_bytes(),
|
||||
timestamp: chrono::Utc::now(),
|
||||
}).await;
|
||||
stdout_buf.push(line);
|
||||
}
|
||||
_ => stdout_done = true,
|
||||
}
|
||||
}
|
||||
line = stderr_lines.next_line(), if !stderr_done => {
|
||||
match line {
|
||||
Ok(Some(line)) => {
|
||||
log_sink.write_chunk(LogChunk {
|
||||
workflow_id: workflow_id.clone(),
|
||||
definition_id: definition_id.clone(),
|
||||
step_id,
|
||||
step_name: step_name.clone(),
|
||||
stream: LogStreamType::Stderr,
|
||||
data: format!("{line}\n").into_bytes(),
|
||||
timestamp: chrono::Utc::now(),
|
||||
}).await;
|
||||
stderr_buf.push(line);
|
||||
}
|
||||
_ => stderr_done = true,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
child.wait().await
|
||||
};
|
||||
|
||||
let status = if let Some(timeout_ms) = self.config.timeout_ms {
|
||||
let duration = std::time::Duration::from_millis(timeout_ms);
|
||||
match tokio::time::timeout(duration, read_future).await {
|
||||
Ok(result) => result.map_err(|e| {
|
||||
WfeError::StepExecution(format!("Failed to wait for shell command: {e}"))
|
||||
})?,
|
||||
Err(_) => {
|
||||
// Kill the child on timeout.
|
||||
let _ = child.kill().await;
|
||||
return Err(WfeError::StepExecution(format!(
|
||||
"Shell command timed out after {timeout_ms}ms"
|
||||
)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
read_future.await.map_err(|e| {
|
||||
WfeError::StepExecution(format!("Failed to wait for shell command: {e}"))
|
||||
})?
|
||||
};
|
||||
|
||||
let mut stdout = stdout_buf.join("\n");
|
||||
let mut stderr = stderr_buf.join("\n");
|
||||
if !stdout.is_empty() {
|
||||
stdout.push('\n');
|
||||
}
|
||||
if !stderr.is_empty() {
|
||||
stderr.push('\n');
|
||||
}
|
||||
|
||||
Ok((stdout, stderr, status.code().unwrap_or(-1)))
|
||||
}
|
||||
|
||||
/// Run with buffered output (original path, no LogSink).
|
||||
async fn run_buffered(
|
||||
&self,
|
||||
context: &StepExecutionContext<'_>,
|
||||
) -> wfe_core::Result<(String, String, i32)> {
|
||||
let mut cmd = self.build_command(context);
|
||||
|
||||
let output = if let Some(timeout_ms) = self.config.timeout_ms {
|
||||
let duration = std::time::Duration::from_millis(timeout_ms);
|
||||
match tokio::time::timeout(duration, cmd.output()).await {
|
||||
Ok(result) => result.map_err(|e| WfeError::StepExecution(format!("Failed to spawn shell command: {e}")))?,
|
||||
Ok(result) => result.map_err(|e| {
|
||||
WfeError::StepExecution(format!("Failed to spawn shell command: {e}"))
|
||||
})?,
|
||||
Err(_) => {
|
||||
return Err(WfeError::StepExecution(format!(
|
||||
"Shell command timed out after {}ms",
|
||||
timeout_ms
|
||||
"Shell command timed out after {timeout_ms}ms"
|
||||
)));
|
||||
}
|
||||
}
|
||||
@@ -76,11 +201,24 @@ impl StepBody for ShellStep {
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
|
||||
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
|
||||
let code = output.status.code().unwrap_or(-1);
|
||||
|
||||
if !output.status.success() {
|
||||
let code = output.status.code().unwrap_or(-1);
|
||||
Ok((stdout, stderr, code))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for ShellStep {
|
||||
async fn run(&mut self, context: &StepExecutionContext<'_>) -> wfe_core::Result<ExecutionResult> {
|
||||
let (stdout, stderr, exit_code) = if context.log_sink.is_some() {
|
||||
self.run_streaming(context).await?
|
||||
} else {
|
||||
self.run_buffered(context).await?
|
||||
};
|
||||
|
||||
if exit_code != 0 {
|
||||
return Err(WfeError::StepExecution(format!(
|
||||
"Shell command exited with code {code}\nstdout: {stdout}\nstderr: {stderr}"
|
||||
"Shell command exited with code {exit_code}\nstdout: {stdout}\nstderr: {stderr}"
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -93,7 +231,6 @@ impl StepBody for ShellStep {
|
||||
{
|
||||
let name = rest[..eq_pos].trim().to_string();
|
||||
let raw_value = rest[eq_pos + 1..].to_string();
|
||||
// Auto-convert typed values from string annotations
|
||||
let value = match raw_value.as_str() {
|
||||
"true" => serde_json::Value::Bool(true),
|
||||
"false" => serde_json::Value::Bool(false),
|
||||
@@ -110,15 +247,10 @@ impl StepBody for ShellStep {
|
||||
}
|
||||
}
|
||||
|
||||
// Add raw stdout under the step name.
|
||||
let step_name = context
|
||||
.step
|
||||
.name
|
||||
.as_deref()
|
||||
.unwrap_or("unknown");
|
||||
let step_name = context.step.name.as_deref().unwrap_or("unknown");
|
||||
outputs.insert(
|
||||
format!("{step_name}.stdout"),
|
||||
serde_json::Value::String(stdout.clone()),
|
||||
serde_json::Value::String(stdout),
|
||||
);
|
||||
outputs.insert(
|
||||
format!("{step_name}.stderr"),
|
||||
|
||||
Reference in New Issue
Block a user