feat(wfe-yaml): add YAML workflow definitions with shell executor

Concourse-CI-inspired YAML format for defining workflows. Compiles
to standard WorkflowDefinition + step factories.

Features:
- Schema parsing with serde_yaml (YamlWorkflow, YamlStep, StepConfig)
- ((var.path)) interpolation from config maps at load time
- YAML anchors (&anchor/*alias) fully supported
- Validation at load time (no runtime surprises)
- Shell executor: runs commands via tokio::process, captures stdout,
  parses ##wfe[output name=value] annotations for structured outputs
- Compiler: sequential wiring, parallel blocks, on_failure/on_success/
  ensure hooks, error behavior mapping
- Public API: load_workflow(), load_workflow_from_str()
- 23 tests (schema, interpolation, compiler, e2e)
This commit is contained in:
2026-03-25 21:32:00 +00:00
parent 8d0f83da3c
commit b89563af63
14 changed files with 1377 additions and 1 deletions

313
wfe-yaml/src/compiler.rs Normal file
View File

@@ -0,0 +1,313 @@
use std::time::Duration;
use wfe_core::models::error_behavior::ErrorBehavior;
use wfe_core::models::workflow_definition::{StepOutcome, WorkflowDefinition, WorkflowStep};
use wfe_core::traits::StepBody;
use crate::error::YamlWorkflowError;
use crate::executors::shell::{ShellConfig, ShellStep};
use crate::schema::{WorkflowSpec, YamlErrorBehavior, YamlStep};
/// Factory type alias for step creation closures.
///
/// Each invocation must yield a fresh `StepBody` instance; the host calls
/// the factory once per step execution.
pub type StepFactory = Box<dyn Fn() -> Box<dyn StepBody> + Send + Sync>;
/// A compiled workflow ready to be registered with the WFE host.
pub struct CompiledWorkflow {
    /// The wired step graph (ids, outcomes, error behavior).
    pub definition: WorkflowDefinition,
    /// (step type key, factory) pairs to register alongside the definition.
    pub step_factories: Vec<(String, StepFactory)>,
}
/// Compile a parsed WorkflowSpec into a CompiledWorkflow.
///
/// Copies workflow-level metadata, maps the optional default error
/// behavior, then recursively compiles the step tree.
pub fn compile(spec: &WorkflowSpec) -> Result<CompiledWorkflow, YamlWorkflowError> {
    let mut definition = WorkflowDefinition::new(&spec.id, spec.version);
    definition.description = spec.description.clone();
    // Workflow-level default applies to any step without its own override.
    if let Some(ref eb) = spec.error_behavior {
        definition.default_error_behavior = map_error_behavior(eb)?;
    }
    let mut step_factories: Vec<(String, StepFactory)> = Vec::new();
    let mut id_counter: usize = 0;
    compile_steps(&spec.steps, &mut definition, &mut step_factories, &mut id_counter)?;
    Ok(CompiledWorkflow {
        definition,
        step_factories,
    })
}
fn compile_steps(
yaml_steps: &[YamlStep],
definition: &mut WorkflowDefinition,
factories: &mut Vec<(String, StepFactory)>,
next_id: &mut usize,
) -> Result<Vec<usize>, YamlWorkflowError> {
let mut main_step_ids = Vec::new();
for yaml_step in yaml_steps {
if let Some(ref parallel_children) = yaml_step.parallel {
// Create a Sequence container step for the parallel block.
let container_id = *next_id;
*next_id += 1;
let mut container = WorkflowStep::new(
container_id,
"wfe_core::primitives::sequence::SequenceStep",
);
container.name = Some(yaml_step.name.clone());
if let Some(ref eb) = yaml_step.error_behavior {
container.error_behavior = Some(map_error_behavior(eb)?);
}
// Compile children.
let child_ids =
compile_steps(parallel_children, definition, factories, next_id)?;
container.children = child_ids;
definition.steps.push(container);
main_step_ids.push(container_id);
} else {
// Regular step (shell).
let step_id = *next_id;
*next_id += 1;
let step_type_key = format!("wfe_yaml::shell::{}", yaml_step.name);
let config = build_shell_config(yaml_step)?;
let mut wf_step = WorkflowStep::new(step_id, &step_type_key);
wf_step.name = Some(yaml_step.name.clone());
wf_step.step_config = Some(serde_json::to_value(&config).map_err(|e| {
YamlWorkflowError::Compilation(format!(
"Failed to serialize shell config: {e}"
))
})?);
if let Some(ref eb) = yaml_step.error_behavior {
wf_step.error_behavior = Some(map_error_behavior(eb)?);
}
// Handle on_failure: create compensation step.
if let Some(ref on_failure) = yaml_step.on_failure {
let comp_id = *next_id;
*next_id += 1;
let comp_key = format!("wfe_yaml::shell::{}", on_failure.name);
let comp_config = build_shell_config(on_failure)?;
let mut comp_step = WorkflowStep::new(comp_id, &comp_key);
comp_step.name = Some(on_failure.name.clone());
comp_step.step_config =
Some(serde_json::to_value(&comp_config).map_err(|e| {
YamlWorkflowError::Compilation(format!(
"Failed to serialize shell config: {e}"
))
})?);
wf_step.compensation_step_id = Some(comp_id);
wf_step.error_behavior = Some(ErrorBehavior::Compensate);
definition.steps.push(comp_step);
let comp_config_clone = comp_config.clone();
factories.push((
comp_key,
Box::new(move || {
Box::new(ShellStep::new(comp_config_clone.clone()))
as Box<dyn StepBody>
}),
));
}
// Handle on_success: insert between this step and the next.
if let Some(ref on_success) = yaml_step.on_success {
let success_id = *next_id;
*next_id += 1;
let success_key = format!("wfe_yaml::shell::{}", on_success.name);
let success_config = build_shell_config(on_success)?;
let mut success_step = WorkflowStep::new(success_id, &success_key);
success_step.name = Some(on_success.name.clone());
success_step.step_config =
Some(serde_json::to_value(&success_config).map_err(|e| {
YamlWorkflowError::Compilation(format!(
"Failed to serialize shell config: {e}"
))
})?);
// Wire main step -> on_success step.
wf_step.outcomes.push(StepOutcome {
next_step: success_id,
label: Some("success".to_string()),
value: None,
});
definition.steps.push(success_step);
let success_config_clone = success_config.clone();
factories.push((
success_key,
Box::new(move || {
Box::new(ShellStep::new(success_config_clone.clone()))
as Box<dyn StepBody>
}),
));
}
// Handle ensure: create an ensure step wired after both paths.
if let Some(ref ensure) = yaml_step.ensure {
let ensure_id = *next_id;
*next_id += 1;
let ensure_key = format!("wfe_yaml::shell::{}", ensure.name);
let ensure_config = build_shell_config(ensure)?;
let mut ensure_step = WorkflowStep::new(ensure_id, &ensure_key);
ensure_step.name = Some(ensure.name.clone());
ensure_step.step_config =
Some(serde_json::to_value(&ensure_config).map_err(|e| {
YamlWorkflowError::Compilation(format!(
"Failed to serialize shell config: {e}"
))
})?);
// Wire main step -> ensure (if no on_success already).
if yaml_step.on_success.is_none() {
wf_step.outcomes.push(StepOutcome {
next_step: ensure_id,
label: Some("ensure".to_string()),
value: None,
});
}
definition.steps.push(ensure_step);
let ensure_config_clone = ensure_config.clone();
factories.push((
ensure_key,
Box::new(move || {
Box::new(ShellStep::new(ensure_config_clone.clone()))
as Box<dyn StepBody>
}),
));
}
definition.steps.push(wf_step);
// Register factory for main step.
let config_clone = config.clone();
factories.push((
step_type_key,
Box::new(move || {
Box::new(ShellStep::new(config_clone.clone())) as Box<dyn StepBody>
}),
));
main_step_ids.push(step_id);
}
}
// Wire sequential outcomes between main steps (step N -> step N+1).
for i in 0..main_step_ids.len().saturating_sub(1) {
let current_id = main_step_ids[i];
let next_step_id = main_step_ids[i + 1];
if let Some(step) = definition.steps.iter_mut().find(|s| s.id == current_id) {
if step.outcomes.is_empty() {
step.outcomes.push(StepOutcome {
next_step: next_step_id,
label: None,
value: None,
});
} else {
// Wire the last hook step to the next main step.
let last_outcome_step = step.outcomes.last().unwrap().next_step;
if let Some(hook_step) = definition
.steps
.iter_mut()
.find(|s| s.id == last_outcome_step)
&& hook_step.outcomes.is_empty()
{
hook_step.outcomes.push(StepOutcome {
next_step: next_step_id,
label: None,
value: None,
});
}
}
}
}
Ok(main_step_ids)
}
fn build_shell_config(step: &YamlStep) -> Result<ShellConfig, YamlWorkflowError> {
let config = step.config.as_ref().ok_or_else(|| {
YamlWorkflowError::Compilation(format!(
"Step '{}' is missing 'config' section",
step.name
))
})?;
let run = config
.run
.clone()
.or_else(|| config.file.as_ref().map(|f| format!("sh {f}")))
.or_else(|| config.script.clone())
.ok_or_else(|| {
YamlWorkflowError::Compilation(format!(
"Step '{}' must have 'run', 'file', or 'script' in config",
step.name
))
})?;
let shell = config.shell.clone().unwrap_or_else(|| "sh".to_string());
let timeout_ms = config.timeout.as_ref().and_then(|t| parse_duration_ms(t));
Ok(ShellConfig {
run,
shell,
env: config.env.clone(),
working_dir: config.working_dir.clone(),
timeout_ms,
})
}
/// Parse a human-readable duration string into milliseconds.
///
/// Supported suffixes: `ms` (milliseconds), `s` (seconds), `m` (minutes);
/// a bare number is interpreted as milliseconds. Returns `None` when the
/// numeric part does not parse.
fn parse_duration_ms(s: &str) -> Option<u64> {
    let s = s.trim();
    // BUG FIX: check the two-character `ms` suffix before the one-character
    // `s` suffix. Previously "500ms" matched `strip_suffix('s')` first,
    // leaving the unparseable "500m" and yielding None.
    if let Some(ms) = s.strip_suffix("ms") {
        ms.trim().parse::<u64>().ok()
    } else if let Some(secs) = s.strip_suffix('s') {
        secs.trim().parse::<u64>().ok().map(|v| v * 1000)
    } else if let Some(mins) = s.strip_suffix('m') {
        mins.trim().parse::<u64>().ok().map(|v| v * 60 * 1000)
    } else {
        s.parse::<u64>().ok()
    }
}
fn map_error_behavior(eb: &YamlErrorBehavior) -> Result<ErrorBehavior, YamlWorkflowError> {
match eb.behavior_type.as_str() {
"retry" => {
let interval = eb
.interval
.as_ref()
.and_then(|i| parse_duration_ms(i))
.map(Duration::from_millis)
.unwrap_or(Duration::from_secs(60));
let max_retries = eb.max_retries.unwrap_or(3);
Ok(ErrorBehavior::Retry {
interval,
max_retries,
})
}
"suspend" => Ok(ErrorBehavior::Suspend),
"terminate" => Ok(ErrorBehavior::Terminate),
"compensate" => Ok(ErrorBehavior::Compensate),
other => Err(YamlWorkflowError::Compilation(format!(
"Unknown error behavior type: '{other}'"
))),
}
}

13
wfe-yaml/src/error.rs Normal file
View File

@@ -0,0 +1,13 @@
/// Errors produced while loading, validating, or compiling a YAML workflow.
///
/// Display strings come from the `thiserror` attributes on each variant.
#[derive(Debug, thiserror::Error)]
pub enum YamlWorkflowError {
    /// The YAML text could not be parsed (wraps `serde_yaml::Error`).
    #[error("YAML parse error: {0}")]
    Parse(#[from] serde_yaml::Error),
    /// A `((var.path))` expression had no matching entry in the config map.
    #[error("Interpolation error: unresolved variable '{0}'")]
    UnresolvedVariable(String),
    /// The parsed spec violated a structural rule at load time.
    #[error("Validation error: {0}")]
    Validation(String),
    /// The spec could not be turned into a runnable workflow.
    #[error("Compilation error: {0}")]
    Compilation(String),
    /// Reading the workflow file from disk failed.
    #[error("IO error: {0}")]
    Io(#[from] std::io::Error),
}

View File

@@ -0,0 +1 @@
/// Built-in step executors; currently only the shell executor.
pub mod shell;

View File

@@ -0,0 +1,121 @@
use std::collections::HashMap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use wfe_core::models::ExecutionResult;
use wfe_core::traits::step::{StepBody, StepExecutionContext};
use wfe_core::WfeError;
/// Serializable configuration for one shell step; stored as JSON on the
/// compiled WorkflowStep and handed back to `ShellStep` at execution time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ShellConfig {
    /// Command text executed via `{shell} -c {run}`.
    pub run: String,
    /// Shell binary used to run the command (e.g. "sh").
    pub shell: String,
    /// Extra environment variables set on the child process.
    pub env: HashMap<String, String>,
    /// Working directory for the child process, if any.
    pub working_dir: Option<String>,
    /// Optional wall-clock timeout in milliseconds.
    pub timeout_ms: Option<u64>,
}
/// StepBody implementation that executes a shell command.
pub struct ShellStep {
    // Per-step configuration captured at compile time.
    config: ShellConfig,
}
impl ShellStep {
    /// Create a shell step from its configuration.
    pub fn new(config: ShellConfig) -> Self {
        Self { config }
    }
}
#[async_trait]
impl StepBody for ShellStep {
    /// Run the configured command via `{shell} -c {run}`, injecting workflow
    /// data as environment variables and harvesting structured outputs from
    /// `##wfe[output name=value]` annotations on stdout.
    ///
    /// Errors on spawn failure, timeout, or non-zero exit status.
    async fn run(&mut self, context: &StepExecutionContext<'_>) -> wfe_core::Result<ExecutionResult> {
        let mut cmd = tokio::process::Command::new(&self.config.shell);
        cmd.arg("-c").arg(&self.config.run);
        // BUG FIX: when the timeout below fires, the `output()` future is
        // dropped; without kill_on_drop the child keeps running in the
        // background, leaking a process per timed-out step.
        cmd.kill_on_drop(true);
        // Inject workflow data as UPPER_CASE env vars (top-level keys only).
        if let Some(data_obj) = context.workflow.data.as_object() {
            for (key, value) in data_obj {
                let env_key = key.to_uppercase();
                // Strings are passed verbatim; other JSON values are passed
                // in their JSON text form.
                let env_val = match value {
                    serde_json::Value::String(s) => s.clone(),
                    other => other.to_string(),
                };
                cmd.env(&env_key, &env_val);
            }
        }
        // Config env is applied after workflow vars, so it wins on
        // identical keys.
        for (key, value) in &self.config.env {
            cmd.env(key, value);
        }
        // Set working directory if specified.
        if let Some(ref dir) = self.config.working_dir {
            cmd.current_dir(dir);
        }
        cmd.stdout(std::process::Stdio::piped());
        cmd.stderr(std::process::Stdio::piped());
        // Execute with optional timeout.
        let output = if let Some(timeout_ms) = self.config.timeout_ms {
            let duration = std::time::Duration::from_millis(timeout_ms);
            match tokio::time::timeout(duration, cmd.output()).await {
                Ok(result) => result.map_err(|e| WfeError::StepExecution(format!("Failed to spawn shell command: {e}")))?,
                Err(_) => {
                    return Err(WfeError::StepExecution(format!(
                        "Shell command timed out after {}ms",
                        timeout_ms
                    )));
                }
            }
        } else {
            cmd.output()
                .await
                .map_err(|e| WfeError::StepExecution(format!("Failed to spawn shell command: {e}")))?
        };
        let stdout = String::from_utf8_lossy(&output.stdout).to_string();
        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
        // Non-zero exit is a step failure; include both streams for triage.
        if !output.status.success() {
            let code = output.status.code().unwrap_or(-1);
            return Err(WfeError::StepExecution(format!(
                "Shell command exited with code {code}\nstdout: {stdout}\nstderr: {stderr}"
            )));
        }
        // Parse ##wfe[output name=value] lines from stdout. The annotation
        // must be the entire line and end with ']'; other lines are ignored.
        let mut outputs = serde_json::Map::new();
        for line in stdout.lines() {
            if let Some(rest) = line.strip_prefix("##wfe[output ")
                && let Some(rest) = rest.strip_suffix(']')
                && let Some(eq_pos) = rest.find('=')
            {
                let name = rest[..eq_pos].trim().to_string();
                let value = rest[eq_pos + 1..].to_string();
                outputs.insert(name, serde_json::Value::String(value));
            }
        }
        // Expose raw streams under "<step>.stdout" / "<step>.stderr".
        let step_name = context
            .step
            .name
            .as_deref()
            .unwrap_or("unknown");
        outputs.insert(
            format!("{step_name}.stdout"),
            serde_json::Value::String(stdout.clone()),
        );
        outputs.insert(
            format!("{step_name}.stderr"),
            serde_json::Value::String(stderr),
        );
        Ok(ExecutionResult {
            proceed: true,
            output_data: Some(serde_json::Value::Object(outputs)),
            ..Default::default()
        })
    }
}

View File

@@ -0,0 +1,64 @@
use std::collections::HashMap;
use regex::Regex;
use crate::error::YamlWorkflowError;
/// Resolve `((var.path))` expressions in a YAML string against a config map.
///
/// Dot-path traversal: `((config.database.host))` resolves by walking
/// `config["config"]["database"]["host"]`. Fails fast on the first
/// unresolvable placeholder.
pub fn interpolate(
    yaml: &str,
    config: &HashMap<String, serde_json::Value>,
) -> Result<String, YamlWorkflowError> {
    let pattern = Regex::new(r"\(\(([a-zA-Z0-9_.]+)\)\)").expect("valid regex");
    let mut rendered = String::with_capacity(yaml.len());
    let mut cursor = 0;
    for caps in pattern.captures_iter(yaml) {
        let whole = caps.get(0).unwrap();
        // Copy the literal text before this placeholder, then its value.
        rendered.push_str(&yaml[cursor..whole.start()]);
        rendered.push_str(&resolve_path(&caps[1], config)?);
        cursor = whole.end();
    }
    // Trailing text after the last placeholder (or the whole input).
    rendered.push_str(&yaml[cursor..]);
    Ok(rendered)
}
fn resolve_path(
path: &str,
config: &HashMap<String, serde_json::Value>,
) -> Result<String, YamlWorkflowError> {
let parts: Vec<&str> = path.split('.').collect();
if parts.is_empty() {
return Err(YamlWorkflowError::UnresolvedVariable(path.to_string()));
}
// The first segment is the top-level key in the config map.
let root = config
.get(parts[0])
.ok_or_else(|| YamlWorkflowError::UnresolvedVariable(path.to_string()))?;
// Walk remaining segments.
let mut current = root;
for &segment in &parts[1..] {
current = current
.get(segment)
.ok_or_else(|| YamlWorkflowError::UnresolvedVariable(path.to_string()))?;
}
// Convert the final value to a string.
match current {
serde_json::Value::String(s) => Ok(s.clone()),
serde_json::Value::Null => Ok("null".to_string()),
other => Ok(other.to_string()),
}
}

38
wfe-yaml/src/lib.rs Normal file
View File

@@ -0,0 +1,38 @@
pub mod compiler;
pub mod error;
pub mod executors;
pub mod interpolation;
pub mod schema;
pub mod validation;
use std::collections::HashMap;
use crate::compiler::CompiledWorkflow;
use crate::error::YamlWorkflowError;
/// Load a workflow from a YAML file path, applying variable interpolation.
///
/// Generalized to accept anything path-like (`&Path`, `&str`, `PathBuf`,
/// ...); existing `&Path` callers are unaffected.
///
/// # Errors
/// Returns `YamlWorkflowError::Io` when the file cannot be read, plus any
/// interpolation, parse, validation, or compilation error from the text.
pub fn load_workflow(
    path: impl AsRef<std::path::Path>,
    config: &HashMap<String, serde_json::Value>,
) -> Result<CompiledWorkflow, YamlWorkflowError> {
    let yaml = std::fs::read_to_string(path)?;
    load_workflow_from_str(&yaml, config)
}
/// Load a workflow from a YAML string, applying variable interpolation.
///
/// Pipeline: interpolate -> parse -> validate -> compile.
pub fn load_workflow_from_str(
    yaml: &str,
    config: &HashMap<String, serde_json::Value>,
) -> Result<CompiledWorkflow, YamlWorkflowError> {
    let rendered = interpolation::interpolate(yaml, config)?;
    let parsed: schema::YamlWorkflow = serde_yaml::from_str(&rendered)?;
    validation::validate(&parsed.workflow)?;
    compiler::compile(&parsed.workflow)
}

72
wfe-yaml/src/schema.rs Normal file
View File

@@ -0,0 +1,72 @@
use std::collections::HashMap;
use serde::Deserialize;
/// Top-level YAML document: the whole file is a single `workflow:` mapping.
#[derive(Debug, Deserialize)]
pub struct YamlWorkflow {
    pub workflow: WorkflowSpec,
}
/// The `workflow:` mapping of a YAML workflow document.
#[derive(Debug, Deserialize)]
pub struct WorkflowSpec {
    /// Workflow identifier registered with the host.
    pub id: String,
    /// Workflow definition version.
    pub version: u32,
    #[serde(default)]
    pub description: Option<String>,
    /// Default error behavior for steps without their own override.
    #[serde(default)]
    pub error_behavior: Option<YamlErrorBehavior>,
    /// Top-level steps, executed sequentially by the compiler's wiring.
    pub steps: Vec<YamlStep>,
    /// Allow unknown top-level keys (e.g. `_templates`) for YAML anchors.
    #[serde(flatten)]
    pub _extra: HashMap<String, serde_yaml::Value>,
}
/// One step in the YAML workflow tree; either a typed step (with `config`)
/// or a `parallel` block of child steps, plus optional lifecycle hooks.
#[derive(Debug, Deserialize)]
pub struct YamlStep {
    /// Step name; validation requires it to be unique across the tree.
    pub name: String,
    /// Step type (e.g. "shell"); absent for parallel blocks.
    #[serde(rename = "type")]
    pub step_type: Option<String>,
    #[serde(default)]
    pub config: Option<StepConfig>,
    #[serde(default)]
    pub inputs: Vec<DataRef>,
    #[serde(default)]
    pub outputs: Vec<DataRef>,
    /// Child steps run concurrently; mutually exclusive with `type`.
    #[serde(default)]
    pub parallel: Option<Vec<YamlStep>>,
    #[serde(default)]
    pub error_behavior: Option<YamlErrorBehavior>,
    /// Hook run after this step succeeds.
    #[serde(default)]
    pub on_success: Option<Box<YamlStep>>,
    /// Compensation hook run when this step fails.
    #[serde(default)]
    pub on_failure: Option<Box<YamlStep>>,
    /// Hook run regardless of outcome.
    #[serde(default)]
    pub ensure: Option<Box<YamlStep>>,
}
/// The `config:` section of a shell step. Exactly one of `run`, `file`, or
/// `script` is expected (the compiler picks them in that priority order).
#[derive(Debug, Deserialize, Clone)]
pub struct StepConfig {
    /// Inline command text.
    pub run: Option<String>,
    /// Path to a script file, executed via `sh`.
    pub file: Option<String>,
    /// Alternative inline command text (lowest priority).
    pub script: Option<String>,
    /// Shell binary; defaults to "sh" at compile time.
    pub shell: Option<String>,
    #[serde(default)]
    pub env: HashMap<String, String>,
    /// Human-readable duration, e.g. "500ms", "5s", "2m".
    pub timeout: Option<String>,
    pub working_dir: Option<String>,
}
/// Named reference to a piece of workflow data in a step's inputs/outputs.
// NOTE(review): `path`/`json_path` are parsed but not consumed by the
// compiler shown here — presumably for data mapping; confirm usage.
#[derive(Debug, Deserialize)]
pub struct DataRef {
    pub name: String,
    pub path: Option<String>,
    pub json_path: Option<String>,
}
/// YAML-level error behavior declaration, mapped onto the engine's
/// ErrorBehavior by the compiler.
#[derive(Debug, Deserialize)]
pub struct YamlErrorBehavior {
    /// One of: "retry", "suspend", "terminate", "compensate".
    #[serde(rename = "type")]
    pub behavior_type: String,
    /// Retry interval (duration string); retry only.
    pub interval: Option<String>,
    /// Maximum retry attempts; retry only.
    pub max_retries: Option<u32>,
}

106
wfe-yaml/src/validation.rs Normal file
View File

@@ -0,0 +1,106 @@
use std::collections::HashSet;
use crate::error::YamlWorkflowError;
use crate::schema::{WorkflowSpec, YamlStep};
/// Validate a parsed workflow spec: non-empty step list, globally unique
/// step names, well-formed step shapes, and known error-behavior types.
pub fn validate(spec: &WorkflowSpec) -> Result<(), YamlWorkflowError> {
    if spec.steps.is_empty() {
        return Err(YamlWorkflowError::Validation(
            "Workflow must have at least one step".to_string(),
        ));
    }
    let mut seen = HashSet::new();
    validate_steps(&spec.steps, &mut seen)?;
    // Workflow-level error behavior must name a known strategy.
    match spec.error_behavior.as_ref() {
        Some(eb) => validate_error_behavior_type(&eb.behavior_type),
        None => Ok(()),
    }
}
fn validate_steps(
steps: &[YamlStep],
seen_names: &mut HashSet<String>,
) -> Result<(), YamlWorkflowError> {
for step in steps {
// Check for duplicate names.
if !seen_names.insert(step.name.clone()) {
return Err(YamlWorkflowError::Validation(format!(
"Duplicate step name: '{}'",
step.name
)));
}
// A step must have either (type + config) or parallel, but not both.
let has_type = step.step_type.is_some();
let has_parallel = step.parallel.is_some();
if !has_type && !has_parallel {
return Err(YamlWorkflowError::Validation(format!(
"Step '{}' must have either 'type' + 'config' or 'parallel'",
step.name
)));
}
if has_type && has_parallel {
return Err(YamlWorkflowError::Validation(format!(
"Step '{}' cannot have both 'type' and 'parallel'",
step.name
)));
}
// Shell steps must have config.run or config.file.
if let Some(ref step_type) = step.step_type
&& step_type == "shell"
{
let config = step.config.as_ref().ok_or_else(|| {
YamlWorkflowError::Validation(format!(
"Shell step '{}' must have a 'config' section",
step.name
))
})?;
if config.run.is_none() && config.file.is_none() {
return Err(YamlWorkflowError::Validation(format!(
"Shell step '{}' must have 'config.run' or 'config.file'",
step.name
)));
}
}
// Validate step-level error behavior.
if let Some(ref eb) = step.error_behavior {
validate_error_behavior_type(&eb.behavior_type)?;
}
// Validate parallel children.
if let Some(ref children) = step.parallel {
validate_steps(children, seen_names)?;
}
// Validate hook steps.
if let Some(ref hook) = step.on_success {
validate_steps(std::slice::from_ref(hook.as_ref()), seen_names)?;
}
if let Some(ref hook) = step.on_failure {
validate_steps(std::slice::from_ref(hook.as_ref()), seen_names)?;
}
if let Some(ref hook) = step.ensure {
validate_steps(std::slice::from_ref(hook.as_ref()), seen_names)?;
}
}
Ok(())
}
/// Check that `behavior_type` names one of the supported error strategies.
fn validate_error_behavior_type(behavior_type: &str) -> Result<(), YamlWorkflowError> {
    const VALID: [&str; 4] = ["retry", "suspend", "terminate", "compensate"];
    if VALID.contains(&behavior_type) {
        Ok(())
    } else {
        Err(YamlWorkflowError::Validation(format!(
            "Invalid error behavior type: '{}'. Must be retry, suspend, terminate, or compensate",
            behavior_type
        )))
    }
}