diff --git a/wfe-yaml/Cargo.toml b/wfe-yaml/Cargo.toml index 10d17fb..aba7893 100644 --- a/wfe-yaml/Cargo.toml +++ b/wfe-yaml/Cargo.toml @@ -21,6 +21,7 @@ async-trait = { workspace = true } tokio = { workspace = true } thiserror = { workspace = true } tracing = { workspace = true } +chrono = { workspace = true } regex = { workspace = true } deno_core = { workspace = true, optional = true } deno_error = { workspace = true, optional = true } diff --git a/wfe-yaml/src/executors/shell.rs b/wfe-yaml/src/executors/shell.rs index 6db5a43..539c206 100644 --- a/wfe-yaml/src/executors/shell.rs +++ b/wfe-yaml/src/executors/shell.rs @@ -23,18 +23,23 @@ impl ShellStep { pub fn new(config: ShellConfig) -> Self { Self { config } } -} -#[async_trait] -impl StepBody for ShellStep { - async fn run(&mut self, context: &StepExecutionContext<'_>) -> wfe_core::Result { + fn build_command(&self, context: &StepExecutionContext<'_>) -> tokio::process::Command { let mut cmd = tokio::process::Command::new(&self.config.shell); cmd.arg("-c").arg(&self.config.run); // Inject workflow data as UPPER_CASE env vars (top-level keys only). + // Skip keys that would override security-sensitive environment variables. + const BLOCKED_KEYS: &[&str] = &[ + "PATH", "LD_PRELOAD", "LD_LIBRARY_PATH", "DYLD_LIBRARY_PATH", + "HOME", "SHELL", "USER", "LOGNAME", "TERM", + ]; if let Some(data_obj) = context.workflow.data.as_object() { for (key, value) in data_obj { let env_key = key.to_uppercase(); + if BLOCKED_KEYS.contains(&env_key.as_str()) { + continue; + } let env_val = match value { serde_json::Value::String(s) => s.clone(), other => other.to_string(), @@ -43,12 +48,10 @@ impl StepBody for ShellStep { } } - // Add extra env from config. for (key, value) in &self.config.env { cmd.env(key, value); } - // Set working directory if specified. 
if let Some(ref dir) = self.config.working_dir { cmd.current_dir(dir); } @@ -56,15 +59,137 @@ impl StepBody for ShellStep { cmd.stdout(std::process::Stdio::piped()); cmd.stderr(std::process::Stdio::piped()); - // Execute with optional timeout. + cmd + } + + /// Run with streaming output via LogSink. + /// + /// Reads stdout and stderr line-by-line, streaming each line to the + /// LogSink as it's produced. Uses `tokio::select!` to interleave both + /// streams without spawning tasks (avoids lifetime issues with &dyn LogSink). + async fn run_streaming( + &self, + context: &StepExecutionContext<'_>, + ) -> wfe_core::Result<(String, String, i32)> { + use tokio::io::{AsyncBufReadExt, BufReader}; + use wfe_core::traits::{LogChunk, LogStreamType}; + + let log_sink = context.log_sink.unwrap(); + let workflow_id = context.workflow.id.clone(); + let definition_id = context.workflow.workflow_definition_id.clone(); + let step_id = context.step.id; + let step_name = context.step.name.clone().unwrap_or_else(|| "unknown".to_string()); + + let mut cmd = self.build_command(context); + let mut child = cmd.spawn().map_err(|e| { + WfeError::StepExecution(format!("Failed to spawn shell command: {e}")) + })?; + + let stdout_pipe = child.stdout.take().ok_or_else(|| { + WfeError::StepExecution("failed to capture stdout pipe".to_string()) + })?; + let stderr_pipe = child.stderr.take().ok_or_else(|| { + WfeError::StepExecution("failed to capture stderr pipe".to_string()) + })?; + let mut stdout_lines = BufReader::new(stdout_pipe).lines(); + let mut stderr_lines = BufReader::new(stderr_pipe).lines(); + + let mut stdout_buf = Vec::new(); + let mut stderr_buf = Vec::new(); + let mut stdout_done = false; + let mut stderr_done = false; + + // Interleave stdout/stderr reads with optional timeout. + let read_future = async { + while !stdout_done || !stderr_done { + tokio::select! 
{ + line = stdout_lines.next_line(), if !stdout_done => { + match line { + Ok(Some(line)) => { + log_sink.write_chunk(LogChunk { + workflow_id: workflow_id.clone(), + definition_id: definition_id.clone(), + step_id, + step_name: step_name.clone(), + stream: LogStreamType::Stdout, + data: format!("{line}\n").into_bytes(), + timestamp: chrono::Utc::now(), + }).await; + stdout_buf.push(line); + } + _ => stdout_done = true, + } + } + line = stderr_lines.next_line(), if !stderr_done => { + match line { + Ok(Some(line)) => { + log_sink.write_chunk(LogChunk { + workflow_id: workflow_id.clone(), + definition_id: definition_id.clone(), + step_id, + step_name: step_name.clone(), + stream: LogStreamType::Stderr, + data: format!("{line}\n").into_bytes(), + timestamp: chrono::Utc::now(), + }).await; + stderr_buf.push(line); + } + _ => stderr_done = true, + } + } + } + } + child.wait().await + }; + + let status = if let Some(timeout_ms) = self.config.timeout_ms { + let duration = std::time::Duration::from_millis(timeout_ms); + match tokio::time::timeout(duration, read_future).await { + Ok(result) => result.map_err(|e| { + WfeError::StepExecution(format!("Failed to wait for shell command: {e}")) + })?, + Err(_) => { + // Kill the child on timeout. + let _ = child.kill().await; + return Err(WfeError::StepExecution(format!( + "Shell command timed out after {timeout_ms}ms" + ))); + } + } + } else { + read_future.await.map_err(|e| { + WfeError::StepExecution(format!("Failed to wait for shell command: {e}")) + })? + }; + + let mut stdout = stdout_buf.join("\n"); + let mut stderr = stderr_buf.join("\n"); + if !stdout.is_empty() { + stdout.push('\n'); + } + if !stderr.is_empty() { + stderr.push('\n'); + } + + Ok((stdout, stderr, status.code().unwrap_or(-1))) + } + + /// Run with buffered output (original path, no LogSink). 
+ async fn run_buffered( + &self, + context: &StepExecutionContext<'_>, + ) -> wfe_core::Result<(String, String, i32)> { + let mut cmd = self.build_command(context); + let output = if let Some(timeout_ms) = self.config.timeout_ms { let duration = std::time::Duration::from_millis(timeout_ms); match tokio::time::timeout(duration, cmd.output()).await { - Ok(result) => result.map_err(|e| WfeError::StepExecution(format!("Failed to spawn shell command: {e}")))?, + Ok(result) => result.map_err(|e| { + WfeError::StepExecution(format!("Failed to spawn shell command: {e}")) + })?, Err(_) => { return Err(WfeError::StepExecution(format!( - "Shell command timed out after {}ms", - timeout_ms + "Shell command timed out after {timeout_ms}ms" ))); } } @@ -76,11 +201,24 @@ impl StepBody for ShellStep { let stdout = String::from_utf8_lossy(&output.stdout).to_string(); let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let code = output.status.code().unwrap_or(-1); - if !output.status.success() { - let code = output.status.code().unwrap_or(-1); + Ok((stdout, stderr, code)) + } +} + +#[async_trait] +impl StepBody for ShellStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> wfe_core::Result { + let (stdout, stderr, exit_code) = if context.log_sink.is_some() { + self.run_streaming(context).await? + } else { + self.run_buffered(context).await? 
+ }; + + if exit_code != 0 { return Err(WfeError::StepExecution(format!( - "Shell command exited with code {code}\nstdout: {stdout}\nstderr: {stderr}" + "Shell command exited with code {exit_code}\nstdout: {stdout}\nstderr: {stderr}" ))); } @@ -93,7 +231,6 @@ impl StepBody for ShellStep { { let name = rest[..eq_pos].trim().to_string(); let raw_value = rest[eq_pos + 1..].to_string(); - // Auto-convert typed values from string annotations let value = match raw_value.as_str() { "true" => serde_json::Value::Bool(true), "false" => serde_json::Value::Bool(false), @@ -110,15 +247,10 @@ impl StepBody for ShellStep { } } - // Add raw stdout under the step name. - let step_name = context - .step - .name - .as_deref() - .unwrap_or("unknown"); + let step_name = context.step.name.as_deref().unwrap_or("unknown"); outputs.insert( format!("{step_name}.stdout"), - serde_json::Value::String(stdout.clone()), + serde_json::Value::String(stdout), ); outputs.insert( format!("{step_name}.stderr"), diff --git a/wfe-yaml/tests/compiler.rs b/wfe-yaml/tests/compiler.rs index 1746076..0899e3c 100644 --- a/wfe-yaml/tests/compiler.rs +++ b/wfe-yaml/tests/compiler.rs @@ -1082,6 +1082,7 @@ workflows: workflow: &workflow, cancellation_token: tokio_util::sync::CancellationToken::new(), host_context: Some(&host), + log_sink: None, }; let result = step.run(&ctx).await.unwrap(); diff --git a/wfe-yaml/tests/deno.rs b/wfe-yaml/tests/deno.rs index c0a005c..71e7972 100644 --- a/wfe-yaml/tests/deno.rs +++ b/wfe-yaml/tests/deno.rs @@ -42,6 +42,7 @@ fn make_context<'a>( workflow, cancellation_token: tokio_util::sync::CancellationToken::new(), host_context: None, + log_sink: None, } } diff --git a/wfe-yaml/tests/shell.rs b/wfe-yaml/tests/shell.rs index 6b2de05..ea0e8d2 100644 --- a/wfe-yaml/tests/shell.rs +++ b/wfe-yaml/tests/shell.rs @@ -53,6 +53,70 @@ async fn run_yaml_workflow(yaml: &str) -> wfe::models::WorkflowInstance { run_yaml_workflow_with_data(yaml, serde_json::json!({})).await } +/// A test LogSink 
that collects all chunks.
+struct CollectingLogSink {
+    chunks: tokio::sync::Mutex<Vec<wfe_core::traits::LogChunk>>,
+}
+
+impl CollectingLogSink {
+    fn new() -> Self {
+        Self { chunks: tokio::sync::Mutex::new(Vec::new()) }
+    }
+
+    async fn chunks(&self) -> Vec<wfe_core::traits::LogChunk> {
+        self.chunks.lock().await.clone()
+    }
+}
+
+#[async_trait::async_trait]
+impl wfe_core::traits::LogSink for CollectingLogSink {
+    async fn write_chunk(&self, chunk: wfe_core::traits::LogChunk) {
+        self.chunks.lock().await.push(chunk);
+    }
+}
+
+/// Run a workflow with a LogSink to verify log streaming works end-to-end.
+async fn run_yaml_workflow_with_log_sink(
+    yaml: &str,
+    log_sink: Arc<CollectingLogSink>,
+) -> wfe::models::WorkflowInstance {
+    let config = HashMap::new();
+    let compiled = load_single_workflow_from_str(yaml, &config).unwrap();
+
+    let persistence = Arc::new(InMemoryPersistenceProvider::new());
+    let lock = Arc::new(InMemoryLockProvider::new());
+    let queue = Arc::new(InMemoryQueueProvider::new());
+
+    let host = WorkflowHostBuilder::new()
+        .use_persistence(persistence as Arc<dyn wfe_core::traits::PersistenceProvider>)
+        .use_lock_provider(lock as Arc<dyn wfe_core::traits::LockProvider>)
+        .use_queue_provider(queue as Arc<dyn wfe_core::traits::QueueProvider>)
+        .use_log_sink(log_sink as Arc<dyn wfe_core::traits::LogSink>)
+        .build()
+        .unwrap();
+
+    for (key, factory) in compiled.step_factories {
+        host.register_step_factory(&key, factory).await;
+    }
+
+    host.register_workflow_definition(compiled.definition.clone())
+        .await;
+    host.start().await.unwrap();
+
+    let instance = run_workflow_sync(
+        &host,
+        &compiled.definition.id,
+        compiled.definition.version,
+        serde_json::json!({}),
+        Duration::from_secs(10),
+    )
+    .await
+    .unwrap();
+
+    host.stop().await;
+    instance
+}
+
 #[tokio::test]
 async fn simple_echo_captures_stdout() {
     let yaml = r#"
@@ -236,3 +300,176 @@ workflow:
     let instance = run_yaml_workflow(yaml).await;
     assert_eq!(instance.status, WorkflowStatus::Complete);
 }
+
+// ── LogSink regression tests ─────────────────────────────────────────
+
+#[tokio::test]
+async fn log_sink_receives_stdout_chunks() {
+    let log_sink = Arc::new(CollectingLogSink::new());
+    let yaml =
r#" +workflow: + id: logsink-stdout-wf + version: 1 + steps: + - name: echo-step + type: shell + config: + run: echo "line one" && echo "line two" +"#; + let instance = run_yaml_workflow_with_log_sink(yaml, log_sink.clone()).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + + let chunks = log_sink.chunks().await; + assert!(chunks.len() >= 2, "expected at least 2 stdout chunks, got {}", chunks.len()); + + let stdout_chunks: Vec<_> = chunks + .iter() + .filter(|c| c.stream == wfe_core::traits::LogStreamType::Stdout) + .collect(); + assert!(stdout_chunks.len() >= 2, "expected at least 2 stdout chunks"); + + let all_data: String = stdout_chunks.iter() + .map(|c| String::from_utf8_lossy(&c.data).to_string()) + .collect(); + assert!(all_data.contains("line one"), "stdout should contain 'line one', got: {all_data}"); + assert!(all_data.contains("line two"), "stdout should contain 'line two', got: {all_data}"); + + // Verify chunk metadata. + for chunk in &stdout_chunks { + assert!(!chunk.workflow_id.is_empty()); + assert_eq!(chunk.step_name, "echo-step"); + } +} + +#[tokio::test] +async fn log_sink_receives_stderr_chunks() { + let log_sink = Arc::new(CollectingLogSink::new()); + let yaml = r#" +workflow: + id: logsink-stderr-wf + version: 1 + steps: + - name: err-step + type: shell + config: + run: echo "stderr output" >&2 +"#; + let instance = run_yaml_workflow_with_log_sink(yaml, log_sink.clone()).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + + let chunks = log_sink.chunks().await; + let stderr_chunks: Vec<_> = chunks + .iter() + .filter(|c| c.stream == wfe_core::traits::LogStreamType::Stderr) + .collect(); + assert!(!stderr_chunks.is_empty(), "expected stderr chunks"); + + let stderr_data: String = stderr_chunks.iter() + .map(|c| String::from_utf8_lossy(&c.data).to_string()) + .collect(); + assert!(stderr_data.contains("stderr output"), "stderr should contain 'stderr output', got: {stderr_data}"); +} + +#[tokio::test] +async fn 
log_sink_captures_multi_step_workflow() { + let log_sink = Arc::new(CollectingLogSink::new()); + let yaml = r#" +workflow: + id: logsink-multi-wf + version: 1 + steps: + - name: step-a + type: shell + config: + run: echo "from step a" + - name: step-b + type: shell + config: + run: echo "from step b" +"#; + let instance = run_yaml_workflow_with_log_sink(yaml, log_sink.clone()).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + + let chunks = log_sink.chunks().await; + let step_names: Vec<_> = chunks.iter().map(|c| c.step_name.as_str()).collect(); + assert!(step_names.contains(&"step-a"), "should have chunks from step-a"); + assert!(step_names.contains(&"step-b"), "should have chunks from step-b"); +} + +#[tokio::test] +async fn log_sink_not_configured_still_works() { + // Without a log_sink, the buffered path should still work. + let yaml = r#" +workflow: + id: no-logsink-wf + version: 1 + steps: + - name: echo-step + type: shell + config: + run: echo "no sink" +"#; + let instance = run_yaml_workflow(yaml).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + let data = instance.data.as_object().unwrap(); + assert!(data.get("echo-step.stdout").unwrap().as_str().unwrap().contains("no sink")); +} + +// ── Security regression tests ──────────────────────────────────────── + +#[tokio::test] +async fn security_blocked_env_vars_not_injected() { + // MEDIUM-22: Workflow data keys like "path" must NOT override PATH. + let yaml = r#" +workflow: + id: sec-env-wf + version: 1 + steps: + - name: check-path + type: shell + config: + run: echo "$PATH" +"#; + // Set a workflow data key "path" that would override PATH if not blocked. 
+ let instance = run_yaml_workflow_with_data( + yaml, + serde_json::json!({"path": "/attacker/bin"}), + ) + .await; + assert_eq!(instance.status, WorkflowStatus::Complete); + + let data = instance.data.as_object().unwrap(); + let stdout = data.get("check-path.stdout").unwrap().as_str().unwrap(); + // PATH should NOT contain /attacker/bin. + assert!( + !stdout.contains("/attacker/bin"), + "PATH should not be overridden by workflow data, got: {stdout}" + ); +} + +#[tokio::test] +async fn security_safe_env_vars_still_injected() { + // Verify non-blocked keys still work after the security fix. + let wfe_prefix = "##wfe"; + let yaml = format!( + r#" +workflow: + id: sec-safe-env-wf + version: 1 + steps: + - name: check-var + type: shell + config: + run: echo "{wfe_prefix}[output val=$MY_CUSTOM_VAR]" +"# + ); + let instance = run_yaml_workflow_with_data( + &yaml, + serde_json::json!({"my_custom_var": "works"}), + ) + .await; + assert_eq!(instance.status, WorkflowStatus::Complete); + + let data = instance.data.as_object().unwrap(); + assert_eq!(data.get("val").and_then(|v| v.as_str()), Some("works")); +}