diff --git a/Cargo.toml b/Cargo.toml index e14585c..b26925a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe"] +members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe", "wfe-yaml"] resolver = "2" [workspace.package] @@ -41,6 +41,11 @@ wfe-sqlite = { path = "wfe-sqlite" } wfe-postgres = { path = "wfe-postgres" } wfe-opensearch = { path = "wfe-opensearch" } wfe-valkey = { path = "wfe-valkey" } +wfe-yaml = { path = "wfe-yaml" } + +# YAML +serde_yaml = "0.9" +regex = "1" # Dev/Test pretty_assertions = "1" diff --git a/wfe-yaml/Cargo.toml b/wfe-yaml/Cargo.toml new file mode 100644 index 0000000..9601b1e --- /dev/null +++ b/wfe-yaml/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "wfe-yaml" +version.workspace = true +edition.workspace = true +description = "YAML workflow definitions for WFE" + +[dependencies] +wfe-core = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +serde_yaml = { workspace = true } +async-trait = { workspace = true } +tokio = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +regex = { workspace = true } + +[dev-dependencies] +pretty_assertions = { workspace = true } +tokio = { workspace = true, features = ["test-util", "process"] } +wfe-core = { workspace = true, features = ["test-support"] } +wfe = { path = "../wfe" } diff --git a/wfe-yaml/src/compiler.rs b/wfe-yaml/src/compiler.rs new file mode 100644 index 0000000..71fda3b --- /dev/null +++ b/wfe-yaml/src/compiler.rs @@ -0,0 +1,313 @@ +use std::time::Duration; + +use wfe_core::models::error_behavior::ErrorBehavior; +use wfe_core::models::workflow_definition::{StepOutcome, WorkflowDefinition, WorkflowStep}; +use wfe_core::traits::StepBody; + +use crate::error::YamlWorkflowError; +use crate::executors::shell::{ShellConfig, ShellStep}; +use crate::schema::{WorkflowSpec, 
YamlErrorBehavior, YamlStep};
+
+/// Factory type alias for step creation closures.
+pub type StepFactory = Box<dyn Fn() -> Box<dyn StepBody> + Send + Sync>;
+
+/// A compiled workflow ready to be registered with the WFE host.
+pub struct CompiledWorkflow {
+    pub definition: WorkflowDefinition,
+    pub step_factories: Vec<(String, StepFactory)>,
+}
+
+/// Compile a parsed WorkflowSpec into a CompiledWorkflow.
+pub fn compile(spec: &WorkflowSpec) -> Result<CompiledWorkflow, YamlWorkflowError> {
+    let mut definition = WorkflowDefinition::new(&spec.id, spec.version);
+    definition.description = spec.description.clone();
+
+    if let Some(ref eb) = spec.error_behavior {
+        definition.default_error_behavior = map_error_behavior(eb)?;
+    }
+
+    let mut factories: Vec<(String, StepFactory)> = Vec::new();
+    let mut next_id: usize = 0;
+
+    compile_steps(&spec.steps, &mut definition, &mut factories, &mut next_id)?;
+
+    Ok(CompiledWorkflow {
+        definition,
+        step_factories: factories,
+    })
+}
+
+fn compile_steps(
+    yaml_steps: &[YamlStep],
+    definition: &mut WorkflowDefinition,
+    factories: &mut Vec<(String, StepFactory)>,
+    next_id: &mut usize,
+) -> Result<Vec<usize>, YamlWorkflowError> {
+    let mut main_step_ids = Vec::new();
+
+    for yaml_step in yaml_steps {
+        if let Some(ref parallel_children) = yaml_step.parallel {
+            // Create a Sequence container step for the parallel block.
+            let container_id = *next_id;
+            *next_id += 1;
+
+            let mut container = WorkflowStep::new(
+                container_id,
+                "wfe_core::primitives::sequence::SequenceStep",
+            );
+            container.name = Some(yaml_step.name.clone());
+
+            if let Some(ref eb) = yaml_step.error_behavior {
+                container.error_behavior = Some(map_error_behavior(eb)?);
+            }
+
+            // Compile children.
+            let child_ids =
+                compile_steps(parallel_children, definition, factories, next_id)?;
+            container.children = child_ids;
+
+            definition.steps.push(container);
+            main_step_ids.push(container_id);
+        } else {
+            // Regular step (shell).
+ let step_id = *next_id; + *next_id += 1; + + let step_type_key = format!("wfe_yaml::shell::{}", yaml_step.name); + let config = build_shell_config(yaml_step)?; + + let mut wf_step = WorkflowStep::new(step_id, &step_type_key); + wf_step.name = Some(yaml_step.name.clone()); + wf_step.step_config = Some(serde_json::to_value(&config).map_err(|e| { + YamlWorkflowError::Compilation(format!( + "Failed to serialize shell config: {e}" + )) + })?); + + if let Some(ref eb) = yaml_step.error_behavior { + wf_step.error_behavior = Some(map_error_behavior(eb)?); + } + + // Handle on_failure: create compensation step. + if let Some(ref on_failure) = yaml_step.on_failure { + let comp_id = *next_id; + *next_id += 1; + + let comp_key = format!("wfe_yaml::shell::{}", on_failure.name); + let comp_config = build_shell_config(on_failure)?; + + let mut comp_step = WorkflowStep::new(comp_id, &comp_key); + comp_step.name = Some(on_failure.name.clone()); + comp_step.step_config = + Some(serde_json::to_value(&comp_config).map_err(|e| { + YamlWorkflowError::Compilation(format!( + "Failed to serialize shell config: {e}" + )) + })?); + + wf_step.compensation_step_id = Some(comp_id); + wf_step.error_behavior = Some(ErrorBehavior::Compensate); + + definition.steps.push(comp_step); + + let comp_config_clone = comp_config.clone(); + factories.push(( + comp_key, + Box::new(move || { + Box::new(ShellStep::new(comp_config_clone.clone())) + as Box + }), + )); + } + + // Handle on_success: insert between this step and the next. 
+ if let Some(ref on_success) = yaml_step.on_success { + let success_id = *next_id; + *next_id += 1; + + let success_key = format!("wfe_yaml::shell::{}", on_success.name); + let success_config = build_shell_config(on_success)?; + + let mut success_step = WorkflowStep::new(success_id, &success_key); + success_step.name = Some(on_success.name.clone()); + success_step.step_config = + Some(serde_json::to_value(&success_config).map_err(|e| { + YamlWorkflowError::Compilation(format!( + "Failed to serialize shell config: {e}" + )) + })?); + + // Wire main step -> on_success step. + wf_step.outcomes.push(StepOutcome { + next_step: success_id, + label: Some("success".to_string()), + value: None, + }); + + definition.steps.push(success_step); + + let success_config_clone = success_config.clone(); + factories.push(( + success_key, + Box::new(move || { + Box::new(ShellStep::new(success_config_clone.clone())) + as Box + }), + )); + } + + // Handle ensure: create an ensure step wired after both paths. + if let Some(ref ensure) = yaml_step.ensure { + let ensure_id = *next_id; + *next_id += 1; + + let ensure_key = format!("wfe_yaml::shell::{}", ensure.name); + let ensure_config = build_shell_config(ensure)?; + + let mut ensure_step = WorkflowStep::new(ensure_id, &ensure_key); + ensure_step.name = Some(ensure.name.clone()); + ensure_step.step_config = + Some(serde_json::to_value(&ensure_config).map_err(|e| { + YamlWorkflowError::Compilation(format!( + "Failed to serialize shell config: {e}" + )) + })?); + + // Wire main step -> ensure (if no on_success already). 
+ if yaml_step.on_success.is_none() { + wf_step.outcomes.push(StepOutcome { + next_step: ensure_id, + label: Some("ensure".to_string()), + value: None, + }); + } + + definition.steps.push(ensure_step); + + let ensure_config_clone = ensure_config.clone(); + factories.push(( + ensure_key, + Box::new(move || { + Box::new(ShellStep::new(ensure_config_clone.clone())) + as Box + }), + )); + } + + definition.steps.push(wf_step); + + // Register factory for main step. + let config_clone = config.clone(); + factories.push(( + step_type_key, + Box::new(move || { + Box::new(ShellStep::new(config_clone.clone())) as Box + }), + )); + + main_step_ids.push(step_id); + } + } + + // Wire sequential outcomes between main steps (step N -> step N+1). + for i in 0..main_step_ids.len().saturating_sub(1) { + let current_id = main_step_ids[i]; + let next_step_id = main_step_ids[i + 1]; + + if let Some(step) = definition.steps.iter_mut().find(|s| s.id == current_id) { + if step.outcomes.is_empty() { + step.outcomes.push(StepOutcome { + next_step: next_step_id, + label: None, + value: None, + }); + } else { + // Wire the last hook step to the next main step. 
+                let last_outcome_step = step.outcomes.last().unwrap().next_step;
+                if let Some(hook_step) = definition
+                    .steps
+                    .iter_mut()
+                    .find(|s| s.id == last_outcome_step)
+                    && hook_step.outcomes.is_empty()
+                {
+                    hook_step.outcomes.push(StepOutcome {
+                        next_step: next_step_id,
+                        label: None,
+                        value: None,
+                    });
+                }
+            }
+        }
+    }
+
+    Ok(main_step_ids)
+}
+
+fn build_shell_config(step: &YamlStep) -> Result<ShellConfig, YamlWorkflowError> {
+    let config = step.config.as_ref().ok_or_else(|| {
+        YamlWorkflowError::Compilation(format!(
+            "Step '{}' is missing 'config' section",
+            step.name
+        ))
+    })?;
+
+    let run = config
+        .run
+        .clone()
+        .or_else(|| config.file.as_ref().map(|f| format!("sh {f}")))
+        .or_else(|| config.script.clone())
+        .ok_or_else(|| {
+            YamlWorkflowError::Compilation(format!(
+                "Step '{}' must have 'run', 'file', or 'script' in config",
+                step.name
+            ))
+        })?;
+
+    let shell = config.shell.clone().unwrap_or_else(|| "sh".to_string());
+    let timeout_ms = config.timeout.as_ref().and_then(|t| parse_duration_ms(t));
+
+    Ok(ShellConfig {
+        run,
+        shell,
+        env: config.env.clone(),
+        working_dir: config.working_dir.clone(),
+        timeout_ms,
+    })
+}
+
+fn parse_duration_ms(s: &str) -> Option<u64> {
+    let s = s.trim();
+    if let Some(ms) = s.strip_suffix("ms") { // "ms" must be checked before bare 's'
+        ms.trim().parse::<u64>().ok()
+    } else if let Some(secs) = s.strip_suffix('s') {
+        secs.trim().parse::<u64>().ok().map(|v| v * 1000)
+    } else if let Some(mins) = s.strip_suffix('m') {
+        mins.trim().parse::<u64>().ok().map(|v| v * 60 * 1000)
+    } else {
+        s.parse::<u64>().ok()
+    }
+}
+
+fn map_error_behavior(eb: &YamlErrorBehavior) -> Result<ErrorBehavior, YamlWorkflowError> {
+    match eb.behavior_type.as_str() {
+        "retry" => {
+            let interval = eb
+                .interval
+                .as_ref()
+                .and_then(|i| parse_duration_ms(i))
+                .map(Duration::from_millis)
+                .unwrap_or(Duration::from_secs(60));
+            let max_retries = eb.max_retries.unwrap_or(3);
+            Ok(ErrorBehavior::Retry {
+                interval,
+                max_retries,
+            })
+        }
+        "suspend" => Ok(ErrorBehavior::Suspend),
+        "terminate" => Ok(ErrorBehavior::Terminate),
+        "compensate" =>
Ok(ErrorBehavior::Compensate), + other => Err(YamlWorkflowError::Compilation(format!( + "Unknown error behavior type: '{other}'" + ))), + } +} diff --git a/wfe-yaml/src/error.rs b/wfe-yaml/src/error.rs new file mode 100644 index 0000000..2104c6f --- /dev/null +++ b/wfe-yaml/src/error.rs @@ -0,0 +1,13 @@ +#[derive(Debug, thiserror::Error)] +pub enum YamlWorkflowError { + #[error("YAML parse error: {0}")] + Parse(#[from] serde_yaml::Error), + #[error("Interpolation error: unresolved variable '{0}'")] + UnresolvedVariable(String), + #[error("Validation error: {0}")] + Validation(String), + #[error("Compilation error: {0}")] + Compilation(String), + #[error("IO error: {0}")] + Io(#[from] std::io::Error), +} diff --git a/wfe-yaml/src/executors/mod.rs b/wfe-yaml/src/executors/mod.rs new file mode 100644 index 0000000..327cf1b --- /dev/null +++ b/wfe-yaml/src/executors/mod.rs @@ -0,0 +1 @@ +pub mod shell; diff --git a/wfe-yaml/src/executors/shell.rs b/wfe-yaml/src/executors/shell.rs new file mode 100644 index 0000000..da9e55d --- /dev/null +++ b/wfe-yaml/src/executors/shell.rs @@ -0,0 +1,121 @@ +use std::collections::HashMap; + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use wfe_core::models::ExecutionResult; +use wfe_core::traits::step::{StepBody, StepExecutionContext}; +use wfe_core::WfeError; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ShellConfig { + pub run: String, + pub shell: String, + pub env: HashMap, + pub working_dir: Option, + pub timeout_ms: Option, +} + +pub struct ShellStep { + config: ShellConfig, +} + +impl ShellStep { + pub fn new(config: ShellConfig) -> Self { + Self { config } + } +} + +#[async_trait] +impl StepBody for ShellStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> wfe_core::Result { + let mut cmd = tokio::process::Command::new(&self.config.shell); + cmd.arg("-c").arg(&self.config.run); + + // Inject workflow data as UPPER_CASE env vars (top-level keys only). 
+ if let Some(data_obj) = context.workflow.data.as_object() { + for (key, value) in data_obj { + let env_key = key.to_uppercase(); + let env_val = match value { + serde_json::Value::String(s) => s.clone(), + other => other.to_string(), + }; + cmd.env(&env_key, &env_val); + } + } + + // Add extra env from config. + for (key, value) in &self.config.env { + cmd.env(key, value); + } + + // Set working directory if specified. + if let Some(ref dir) = self.config.working_dir { + cmd.current_dir(dir); + } + + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::piped()); + + // Execute with optional timeout. + let output = if let Some(timeout_ms) = self.config.timeout_ms { + let duration = std::time::Duration::from_millis(timeout_ms); + match tokio::time::timeout(duration, cmd.output()).await { + Ok(result) => result.map_err(|e| WfeError::StepExecution(format!("Failed to spawn shell command: {e}")))?, + Err(_) => { + return Err(WfeError::StepExecution(format!( + "Shell command timed out after {}ms", + timeout_ms + ))); + } + } + } else { + cmd.output() + .await + .map_err(|e| WfeError::StepExecution(format!("Failed to spawn shell command: {e}")))? + }; + + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + + if !output.status.success() { + let code = output.status.code().unwrap_or(-1); + return Err(WfeError::StepExecution(format!( + "Shell command exited with code {code}\nstdout: {stdout}\nstderr: {stderr}" + ))); + } + + // Parse ##wfe[output name=value] lines from stdout. 
+ let mut outputs = serde_json::Map::new(); + for line in stdout.lines() { + if let Some(rest) = line.strip_prefix("##wfe[output ") + && let Some(rest) = rest.strip_suffix(']') + && let Some(eq_pos) = rest.find('=') + { + let name = rest[..eq_pos].trim().to_string(); + let value = rest[eq_pos + 1..].to_string(); + outputs.insert(name, serde_json::Value::String(value)); + } + } + + // Add raw stdout under the step name. + let step_name = context + .step + .name + .as_deref() + .unwrap_or("unknown"); + outputs.insert( + format!("{step_name}.stdout"), + serde_json::Value::String(stdout.clone()), + ); + outputs.insert( + format!("{step_name}.stderr"), + serde_json::Value::String(stderr), + ); + + Ok(ExecutionResult { + proceed: true, + output_data: Some(serde_json::Value::Object(outputs)), + ..Default::default() + }) + } +} diff --git a/wfe-yaml/src/interpolation.rs b/wfe-yaml/src/interpolation.rs new file mode 100644 index 0000000..d407625 --- /dev/null +++ b/wfe-yaml/src/interpolation.rs @@ -0,0 +1,64 @@ +use std::collections::HashMap; + +use regex::Regex; + +use crate::error::YamlWorkflowError; + +/// Resolve `((var.path))` expressions in a YAML string against a config map. +/// +/// Dot-path traversal: `((config.database.host))` resolves by walking +/// `config["config"]["database"]["host"]`. +pub fn interpolate( + yaml: &str, + config: &HashMap, +) -> Result { + let re = Regex::new(r"\(\(([a-zA-Z0-9_.]+)\)\)").expect("valid regex"); + + let mut result = String::with_capacity(yaml.len()); + let mut last_end = 0; + + for cap in re.captures_iter(yaml) { + let m = cap.get(0).unwrap(); + let var_path = &cap[1]; + + // Resolve the variable path. 
+ let value = resolve_path(var_path, config)?; + + result.push_str(&yaml[last_end..m.start()]); + result.push_str(&value); + last_end = m.end(); + } + + result.push_str(&yaml[last_end..]); + Ok(result) +} + +fn resolve_path( + path: &str, + config: &HashMap, +) -> Result { + let parts: Vec<&str> = path.split('.').collect(); + if parts.is_empty() { + return Err(YamlWorkflowError::UnresolvedVariable(path.to_string())); + } + + // The first segment is the top-level key in the config map. + let root = config + .get(parts[0]) + .ok_or_else(|| YamlWorkflowError::UnresolvedVariable(path.to_string()))?; + + // Walk remaining segments. + let mut current = root; + for &segment in &parts[1..] { + current = current + .get(segment) + .ok_or_else(|| YamlWorkflowError::UnresolvedVariable(path.to_string()))?; + } + + // Convert the final value to a string. + match current { + serde_json::Value::String(s) => Ok(s.clone()), + serde_json::Value::Null => Ok("null".to_string()), + other => Ok(other.to_string()), + } +} diff --git a/wfe-yaml/src/lib.rs b/wfe-yaml/src/lib.rs new file mode 100644 index 0000000..e094058 --- /dev/null +++ b/wfe-yaml/src/lib.rs @@ -0,0 +1,38 @@ +pub mod compiler; +pub mod error; +pub mod executors; +pub mod interpolation; +pub mod schema; +pub mod validation; + +use std::collections::HashMap; + +use crate::compiler::CompiledWorkflow; +use crate::error::YamlWorkflowError; + +/// Load a workflow from a YAML file path, applying variable interpolation. +pub fn load_workflow( + path: &std::path::Path, + config: &HashMap, +) -> Result { + let yaml = std::fs::read_to_string(path)?; + load_workflow_from_str(&yaml, config) +} + +/// Load a workflow from a YAML string, applying variable interpolation. +pub fn load_workflow_from_str( + yaml: &str, + config: &HashMap, +) -> Result { + // Interpolate variables. + let interpolated = interpolation::interpolate(yaml, config)?; + + // Parse YAML. 
+ let workflow: schema::YamlWorkflow = serde_yaml::from_str(&interpolated)?; + + // Validate. + validation::validate(&workflow.workflow)?; + + // Compile. + compiler::compile(&workflow.workflow) +} diff --git a/wfe-yaml/src/schema.rs b/wfe-yaml/src/schema.rs new file mode 100644 index 0000000..9e112c3 --- /dev/null +++ b/wfe-yaml/src/schema.rs @@ -0,0 +1,72 @@ +use std::collections::HashMap; + +use serde::Deserialize; + +#[derive(Debug, Deserialize)] +pub struct YamlWorkflow { + pub workflow: WorkflowSpec, +} + +#[derive(Debug, Deserialize)] +pub struct WorkflowSpec { + pub id: String, + pub version: u32, + #[serde(default)] + pub description: Option, + #[serde(default)] + pub error_behavior: Option, + pub steps: Vec, + /// Allow unknown top-level keys (e.g. `_templates`) for YAML anchors. + #[serde(flatten)] + pub _extra: HashMap, +} + +#[derive(Debug, Deserialize)] +pub struct YamlStep { + pub name: String, + #[serde(rename = "type")] + pub step_type: Option, + #[serde(default)] + pub config: Option, + #[serde(default)] + pub inputs: Vec, + #[serde(default)] + pub outputs: Vec, + #[serde(default)] + pub parallel: Option>, + #[serde(default)] + pub error_behavior: Option, + #[serde(default)] + pub on_success: Option>, + #[serde(default)] + pub on_failure: Option>, + #[serde(default)] + pub ensure: Option>, +} + +#[derive(Debug, Deserialize, Clone)] +pub struct StepConfig { + pub run: Option, + pub file: Option, + pub script: Option, + pub shell: Option, + #[serde(default)] + pub env: HashMap, + pub timeout: Option, + pub working_dir: Option, +} + +#[derive(Debug, Deserialize)] +pub struct DataRef { + pub name: String, + pub path: Option, + pub json_path: Option, +} + +#[derive(Debug, Deserialize)] +pub struct YamlErrorBehavior { + #[serde(rename = "type")] + pub behavior_type: String, + pub interval: Option, + pub max_retries: Option, +} diff --git a/wfe-yaml/src/validation.rs b/wfe-yaml/src/validation.rs new file mode 100644 index 0000000..e0f1cc3 --- /dev/null 
+++ b/wfe-yaml/src/validation.rs @@ -0,0 +1,106 @@ +use std::collections::HashSet; + +use crate::error::YamlWorkflowError; +use crate::schema::{WorkflowSpec, YamlStep}; + +/// Validate a parsed workflow spec. +pub fn validate(spec: &WorkflowSpec) -> Result<(), YamlWorkflowError> { + if spec.steps.is_empty() { + return Err(YamlWorkflowError::Validation( + "Workflow must have at least one step".to_string(), + )); + } + + let mut seen_names = HashSet::new(); + validate_steps(&spec.steps, &mut seen_names)?; + + // Validate workflow-level error behavior. + if let Some(ref eb) = spec.error_behavior { + validate_error_behavior_type(&eb.behavior_type)?; + } + + Ok(()) +} + +fn validate_steps( + steps: &[YamlStep], + seen_names: &mut HashSet, +) -> Result<(), YamlWorkflowError> { + for step in steps { + // Check for duplicate names. + if !seen_names.insert(step.name.clone()) { + return Err(YamlWorkflowError::Validation(format!( + "Duplicate step name: '{}'", + step.name + ))); + } + + // A step must have either (type + config) or parallel, but not both. + let has_type = step.step_type.is_some(); + let has_parallel = step.parallel.is_some(); + + if !has_type && !has_parallel { + return Err(YamlWorkflowError::Validation(format!( + "Step '{}' must have either 'type' + 'config' or 'parallel'", + step.name + ))); + } + + if has_type && has_parallel { + return Err(YamlWorkflowError::Validation(format!( + "Step '{}' cannot have both 'type' and 'parallel'", + step.name + ))); + } + + // Shell steps must have config.run or config.file. 
+ if let Some(ref step_type) = step.step_type + && step_type == "shell" + { + let config = step.config.as_ref().ok_or_else(|| { + YamlWorkflowError::Validation(format!( + "Shell step '{}' must have a 'config' section", + step.name + )) + })?; + if config.run.is_none() && config.file.is_none() { + return Err(YamlWorkflowError::Validation(format!( + "Shell step '{}' must have 'config.run' or 'config.file'", + step.name + ))); + } + } + + // Validate step-level error behavior. + if let Some(ref eb) = step.error_behavior { + validate_error_behavior_type(&eb.behavior_type)?; + } + + // Validate parallel children. + if let Some(ref children) = step.parallel { + validate_steps(children, seen_names)?; + } + + // Validate hook steps. + if let Some(ref hook) = step.on_success { + validate_steps(std::slice::from_ref(hook.as_ref()), seen_names)?; + } + if let Some(ref hook) = step.on_failure { + validate_steps(std::slice::from_ref(hook.as_ref()), seen_names)?; + } + if let Some(ref hook) = step.ensure { + validate_steps(std::slice::from_ref(hook.as_ref()), seen_names)?; + } + } + Ok(()) +} + +fn validate_error_behavior_type(behavior_type: &str) -> Result<(), YamlWorkflowError> { + match behavior_type { + "retry" | "suspend" | "terminate" | "compensate" => Ok(()), + other => Err(YamlWorkflowError::Validation(format!( + "Invalid error behavior type: '{}'. 
Must be retry, suspend, terminate, or compensate", + other + ))), + } +} diff --git a/wfe-yaml/tests/compiler.rs b/wfe-yaml/tests/compiler.rs new file mode 100644 index 0000000..c6b4a7f --- /dev/null +++ b/wfe-yaml/tests/compiler.rs @@ -0,0 +1,225 @@ +use std::collections::HashMap; +use std::time::Duration; + +use wfe_core::models::error_behavior::ErrorBehavior; +use wfe_yaml::load_workflow_from_str; + +#[test] +fn single_step_produces_one_workflow_step() { + let yaml = r#" +workflow: + id: single + version: 1 + steps: + - name: hello + type: shell + config: + run: echo hello +"#; + let compiled = load_workflow_from_str(yaml, &HashMap::new()).unwrap(); + // The definition should have exactly 1 main step. + let main_steps: Vec<_> = compiled + .definition + .steps + .iter() + .filter(|s| s.name.as_deref() == Some("hello")) + .collect(); + assert_eq!(main_steps.len(), 1); + assert_eq!(main_steps[0].id, 0); +} + +#[test] +fn two_sequential_steps_wired_correctly() { + let yaml = r#" +workflow: + id: sequential + version: 1 + steps: + - name: step-a + type: shell + config: + run: echo a + - name: step-b + type: shell + config: + run: echo b +"#; + let compiled = load_workflow_from_str(yaml, &HashMap::new()).unwrap(); + + let step_a = compiled + .definition + .steps + .iter() + .find(|s| s.name.as_deref() == Some("step-a")) + .unwrap(); + let step_b = compiled + .definition + .steps + .iter() + .find(|s| s.name.as_deref() == Some("step-b")) + .unwrap(); + + // step-a should have an outcome pointing to step-b. 
+ assert_eq!(step_a.outcomes.len(), 1); + assert_eq!(step_a.outcomes[0].next_step, step_b.id); +} + +#[test] +fn parallel_block_produces_container_with_children() { + let yaml = r#" +workflow: + id: parallel-wf + version: 1 + steps: + - name: parallel-group + parallel: + - name: task-a + type: shell + config: + run: echo a + - name: task-b + type: shell + config: + run: echo b +"#; + let compiled = load_workflow_from_str(yaml, &HashMap::new()).unwrap(); + + let container = compiled + .definition + .steps + .iter() + .find(|s| s.name.as_deref() == Some("parallel-group")) + .unwrap(); + + assert!( + container.step_type.contains("SequenceStep"), + "Container should be a SequenceStep, got: {}", + container.step_type + ); + assert_eq!(container.children.len(), 2); +} + +#[test] +fn on_failure_creates_compensation_step() { + let yaml = r#" +workflow: + id: compensation-wf + version: 1 + steps: + - name: deploy + type: shell + config: + run: deploy.sh + on_failure: + name: rollback + type: shell + config: + run: rollback.sh +"#; + let compiled = load_workflow_from_str(yaml, &HashMap::new()).unwrap(); + + let deploy = compiled + .definition + .steps + .iter() + .find(|s| s.name.as_deref() == Some("deploy")) + .unwrap(); + + assert!(deploy.compensation_step_id.is_some()); + assert_eq!(deploy.error_behavior, Some(ErrorBehavior::Compensate)); + + let rollback = compiled + .definition + .steps + .iter() + .find(|s| s.name.as_deref() == Some("rollback")) + .unwrap(); + + assert_eq!(deploy.compensation_step_id, Some(rollback.id)); +} + +#[test] +fn error_behavior_maps_correctly() { + let yaml = r#" +workflow: + id: retry-wf + version: 1 + error_behavior: + type: retry + interval: 5s + max_retries: 10 + steps: + - name: step1 + type: shell + config: + run: echo hi + error_behavior: + type: suspend +"#; + let compiled = load_workflow_from_str(yaml, &HashMap::new()).unwrap(); + + assert_eq!( + compiled.definition.default_error_behavior, + ErrorBehavior::Retry { + interval: 
Duration::from_secs(5), + max_retries: 10, + } + ); + + let step = compiled + .definition + .steps + .iter() + .find(|s| s.name.as_deref() == Some("step1")) + .unwrap(); + assert_eq!(step.error_behavior, Some(ErrorBehavior::Suspend)); +} + +#[test] +fn anchors_compile_correctly() { + let yaml = r#" +workflow: + id: anchor-wf + version: 1 + steps: + - name: build + type: shell + config: &default_config + shell: bash + timeout: 5m + run: cargo build + + - name: test + type: shell + config: *default_config +"#; + let compiled = load_workflow_from_str(yaml, &HashMap::new()).unwrap(); + + // Should have 2 main steps + factories. + let build_step = compiled + .definition + .steps + .iter() + .find(|s| s.name.as_deref() == Some("build")) + .unwrap(); + let test_step = compiled + .definition + .steps + .iter() + .find(|s| s.name.as_deref() == Some("test")) + .unwrap(); + + // Both should have step_config. + assert!(build_step.step_config.is_some()); + assert!(test_step.step_config.is_some()); + + // Build should wire to test. + assert_eq!(build_step.outcomes.len(), 1); + assert_eq!(build_step.outcomes[0].next_step, test_step.id); + + // Test uses the same config via alias - shell should be bash. 
+ let test_config: wfe_yaml::executors::shell::ShellConfig = + serde_json::from_value(test_step.step_config.clone().unwrap()).unwrap(); + assert_eq!(test_config.run, "cargo build"); + assert_eq!(test_config.shell, "bash", "shell should be inherited from YAML anchor alias"); +} diff --git a/wfe-yaml/tests/e2e_yaml.rs b/wfe-yaml/tests/e2e_yaml.rs new file mode 100644 index 0000000..318879a --- /dev/null +++ b/wfe-yaml/tests/e2e_yaml.rs @@ -0,0 +1,125 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use wfe::models::WorkflowStatus; +use wfe::{WorkflowHostBuilder, run_workflow_sync}; +use wfe_core::test_support::{ + InMemoryLockProvider, InMemoryPersistenceProvider, InMemoryQueueProvider, +}; +use wfe_yaml::load_workflow_from_str; + +async fn run_yaml_workflow(yaml: &str) -> wfe::models::WorkflowInstance { + let config = HashMap::new(); + let compiled = load_workflow_from_str(yaml, &config).unwrap(); + + let persistence = Arc::new(InMemoryPersistenceProvider::new()); + let lock = Arc::new(InMemoryLockProvider::new()); + let queue = Arc::new(InMemoryQueueProvider::new()); + + let host = WorkflowHostBuilder::new() + .use_persistence(persistence as Arc) + .use_lock_provider(lock as Arc) + .use_queue_provider(queue as Arc) + .build() + .unwrap(); + + // Register step factories. 
+ for (key, factory) in compiled.step_factories { + host.register_step_factory(&key, factory).await; + } + + host.register_workflow_definition(compiled.definition.clone()) + .await; + host.start().await.unwrap(); + + let instance = run_workflow_sync( + &host, + &compiled.definition.id, + compiled.definition.version, + serde_json::json!({}), + Duration::from_secs(10), + ) + .await + .unwrap(); + + host.stop().await; + instance +} + +#[tokio::test] +async fn simple_echo_workflow_runs_to_completion() { + let yaml = r#" +workflow: + id: echo-wf + version: 1 + steps: + - name: echo-step + type: shell + config: + run: echo "hello from wfe-yaml" +"#; + let instance = run_yaml_workflow(yaml).await; + assert_eq!(instance.status, WorkflowStatus::Complete); +} + +#[tokio::test] +async fn workflow_with_output_capture() { + let wfe_prefix = "##wfe"; + let yaml = format!( + r#" +workflow: + id: output-wf + version: 1 + steps: + - name: capture + type: shell + config: + run: | + echo "{wfe_prefix}[output greeting=hello]" + echo "{wfe_prefix}[output count=42]" +"# + ); + let instance = run_yaml_workflow(&yaml).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + + // Check that outputs were captured in the workflow data. + if let Some(data) = instance.data.as_object() { + // output_data gets merged into workflow.data by the executor. + // Check that our outputs exist. + if let Some(greeting) = data.get("greeting") { + assert_eq!(greeting.as_str(), Some("hello")); + } + if let Some(count) = data.get("count") { + assert_eq!(count.as_str(), Some("42")); + } + } +} + +#[tokio::test] +async fn two_sequential_steps_run_in_order() { + let yaml = r#" +workflow: + id: seq-wf + version: 1 + steps: + - name: step-one + type: shell + config: + run: echo step-one + - name: step-two + type: shell + config: + run: echo step-two +"#; + let instance = run_yaml_workflow(yaml).await; + assert_eq!(instance.status, WorkflowStatus::Complete); + + // Both steps should have completed. 
+ let complete_count = instance + .execution_pointers + .iter() + .filter(|p| p.status == wfe::models::PointerStatus::Complete) + .count(); + assert_eq!(complete_count, 2, "Expected 2 completed execution pointers"); +} diff --git a/wfe-yaml/tests/interpolation.rs b/wfe-yaml/tests/interpolation.rs new file mode 100644 index 0000000..c778855 --- /dev/null +++ b/wfe-yaml/tests/interpolation.rs @@ -0,0 +1,77 @@ +use std::collections::HashMap; + +use wfe_yaml::interpolation::interpolate; + +#[test] +fn simple_var_replacement() { + let mut config = HashMap::new(); + config.insert("name".to_string(), serde_json::json!("world")); + + let result = interpolate("hello ((name))", &config).unwrap(); + assert_eq!(result, "hello world"); +} + +#[test] +fn nested_path_replacement() { + let mut config = HashMap::new(); + config.insert( + "config".to_string(), + serde_json::json!({ + "database": { + "host": "localhost", + "port": 5432 + } + }), + ); + + let result = interpolate("host: ((config.database.host))", &config).unwrap(); + assert_eq!(result, "host: localhost"); + + let result = interpolate("port: ((config.database.port))", &config).unwrap(); + assert_eq!(result, "port: 5432"); +} + +#[test] +fn unresolved_var_returns_error() { + let config = HashMap::new(); + let result = interpolate("hello ((missing_var))", &config); + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err.to_string().contains("missing_var")); +} + +#[test] +fn no_vars_passes_through_unchanged() { + let config = HashMap::new(); + let input = "no variables here"; + let result = interpolate(input, &config).unwrap(); + assert_eq!(result, input); +} + +#[test] +fn multiple_vars_in_one_string() { + let mut config = HashMap::new(); + config.insert("first".to_string(), serde_json::json!("hello")); + config.insert("second".to_string(), serde_json::json!("world")); + + let result = interpolate("((first)) ((second))!", &config).unwrap(); + assert_eq!(result, "hello world!"); +} + +#[test] +fn 
interpolation_does_not_break_yaml_anchors() { + let mut config = HashMap::new(); + config.insert("version".to_string(), serde_json::json!("1.0")); + + // YAML anchor syntax should not be confused with ((var)) syntax. + let yaml = r#" +default: &default + version: ((version)) +merged: + <<: *default +"#; + let result = interpolate(yaml, &config).unwrap(); + assert!(result.contains("version: 1.0")); + assert!(result.contains("&default")); + assert!(result.contains("*default")); +} diff --git a/wfe-yaml/tests/schema.rs b/wfe-yaml/tests/schema.rs new file mode 100644 index 0000000..f329fb1 --- /dev/null +++ b/wfe-yaml/tests/schema.rs @@ -0,0 +1,194 @@ +use wfe_yaml::schema::YamlWorkflow; + +#[test] +fn parse_minimal_yaml() { + let yaml = r#" +workflow: + id: minimal + version: 1 + steps: + - name: hello + type: shell + config: + run: echo hello +"#; + let parsed: YamlWorkflow = serde_yaml::from_str(yaml).unwrap(); + assert_eq!(parsed.workflow.id, "minimal"); + assert_eq!(parsed.workflow.version, 1); + assert_eq!(parsed.workflow.steps.len(), 1); + assert_eq!(parsed.workflow.steps[0].name, "hello"); + assert_eq!( + parsed.workflow.steps[0].step_type.as_deref(), + Some("shell") + ); +} + +#[test] +fn parse_with_parallel_block() { + let yaml = r#" +workflow: + id: parallel-wf + version: 1 + steps: + - name: parallel-group + parallel: + - name: task-a + type: shell + config: + run: echo a + - name: task-b + type: shell + config: + run: echo b +"#; + let parsed: YamlWorkflow = serde_yaml::from_str(yaml).unwrap(); + let step = &parsed.workflow.steps[0]; + assert!(step.parallel.is_some()); + let children = step.parallel.as_ref().unwrap(); + assert_eq!(children.len(), 2); + assert_eq!(children[0].name, "task-a"); + assert_eq!(children[1].name, "task-b"); +} + +#[test] +fn parse_with_hooks() { + let yaml = r#" +workflow: + id: hooks-wf + version: 1 + steps: + - name: deploy + type: shell + config: + run: deploy.sh + on_failure: + name: rollback + type: shell + config: + run: 
rollback.sh + ensure: + name: cleanup + type: shell + config: + run: cleanup.sh +"#; + let parsed: YamlWorkflow = serde_yaml::from_str(yaml).unwrap(); + let step = &parsed.workflow.steps[0]; + assert!(step.on_failure.is_some()); + assert_eq!(step.on_failure.as_ref().unwrap().name, "rollback"); + assert!(step.ensure.is_some()); + assert_eq!(step.ensure.as_ref().unwrap().name, "cleanup"); +} + +#[test] +fn parse_with_error_behavior() { + let yaml = r#" +workflow: + id: retry-wf + version: 1 + error_behavior: + type: retry + interval: 5s + max_retries: 5 + steps: + - name: flaky + type: shell + config: + run: flaky-task.sh + error_behavior: + type: terminate +"#; + let parsed: YamlWorkflow = serde_yaml::from_str(yaml).unwrap(); + let eb = parsed.workflow.error_behavior.as_ref().unwrap(); + assert_eq!(eb.behavior_type, "retry"); + assert_eq!(eb.interval.as_deref(), Some("5s")); + assert_eq!(eb.max_retries, Some(5)); + + let step_eb = parsed.workflow.steps[0].error_behavior.as_ref().unwrap(); + assert_eq!(step_eb.behavior_type, "terminate"); +} + +#[test] +fn invalid_yaml_returns_error() { + let yaml = "this is not valid yaml: ["; + let result: Result = serde_yaml::from_str(yaml); + assert!(result.is_err()); +} + +#[test] +fn parse_with_yaml_anchors_and_aliases() { + // Direct anchor/alias: reuse entire config block via *alias. + let yaml = r#" +workflow: + id: test-anchors + version: 1 + steps: + - name: build + type: shell + config: &default_config + shell: bash + timeout: 5m + run: cargo build + + - name: test + type: shell + config: *default_config +"#; + let parsed: YamlWorkflow = serde_yaml::from_str(yaml).unwrap(); + assert_eq!(parsed.workflow.steps.len(), 2); + + // The build step has the original config. 
+ let build = &parsed.workflow.steps[0]; + let build_config = build.config.as_ref().unwrap(); + assert_eq!(build_config.shell.as_deref(), Some("bash")); + assert_eq!(build_config.timeout.as_deref(), Some("5m")); + assert_eq!(build_config.run.as_deref(), Some("cargo build")); + + // The test step gets the same config via alias. + let test = &parsed.workflow.steps[1]; + let test_config = test.config.as_ref().unwrap(); + assert_eq!(test_config.shell.as_deref(), Some("bash")); + assert_eq!(test_config.timeout.as_deref(), Some("5m")); + assert_eq!(test_config.run.as_deref(), Some("cargo build")); +} + +#[test] +fn parse_with_scalar_anchors() { + // Anchors on scalar values. + let yaml = r#" +workflow: + id: scalar-anchors + version: 1 + steps: + - name: step1 + type: &step_type shell + config: + run: echo hi + - name: step2 + type: *step_type + config: + run: echo bye +"#; + let parsed: YamlWorkflow = serde_yaml::from_str(yaml).unwrap(); + assert_eq!(parsed.workflow.steps[0].step_type.as_deref(), Some("shell")); + assert_eq!(parsed.workflow.steps[1].step_type.as_deref(), Some("shell")); +} + +#[test] +fn parse_with_extra_keys_for_templates() { + let yaml = r#" +workflow: + id: template-wf + version: 1 + _templates: + default_shell: bash + steps: + - name: step1 + type: shell + config: + run: echo hi +"#; + let parsed: YamlWorkflow = serde_yaml::from_str(yaml).unwrap(); + assert_eq!(parsed.workflow.id, "template-wf"); + assert_eq!(parsed.workflow.steps.len(), 1); +}