feat(wfe-yaml): add workflow step type, cross-ref validation, cycle detection
Compiler dispatches type: workflow to SubWorkflowStep. Validation detects circular workflow references via DFS with coloring. Cross- workflow reference checking for multi-workflow files. Duplicate workflow ID detection. 28 edge case tests for validation paths.
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::Serialize;
|
||||
use wfe_core::models::error_behavior::ErrorBehavior;
|
||||
use wfe_core::models::workflow_definition::{StepOutcome, WorkflowDefinition, WorkflowStep};
|
||||
use wfe_core::traits::StepBody;
|
||||
@@ -14,6 +15,38 @@ use wfe_buildkit::{BuildkitConfig, BuildkitStep};
|
||||
use wfe_containerd::{ContainerdConfig, ContainerdStep};
|
||||
use crate::schema::{WorkflowSpec, YamlErrorBehavior, YamlStep};
|
||||
|
||||
/// Configuration for a sub-workflow step.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct SubWorkflowConfig {
|
||||
pub workflow_id: String,
|
||||
pub version: u32,
|
||||
pub output_keys: Vec<String>,
|
||||
}
|
||||
|
||||
/// Placeholder step body for sub-workflow steps.
|
||||
///
|
||||
/// This is a compile-time placeholder. When wfe-core provides a real
|
||||
/// `SubWorkflowStep`, it should replace this. The placeholder always
|
||||
/// returns `ExecutionResult::Next` so compilation and basic tests work.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct SubWorkflowPlaceholderStep {
|
||||
pub workflow_id: String,
|
||||
pub version: u32,
|
||||
pub output_keys: Vec<String>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl StepBody for SubWorkflowPlaceholderStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
context: &wfe_core::traits::StepExecutionContext<'_>,
|
||||
) -> wfe_core::Result<wfe_core::models::ExecutionResult> {
|
||||
let _ = context;
|
||||
// Placeholder: a real implementation would start the child workflow.
|
||||
Ok(wfe_core::models::ExecutionResult::next())
|
||||
}
|
||||
}
|
||||
|
||||
/// Factory type alias for step creation closures.
|
||||
pub type StepFactory = Box<dyn Fn() -> Box<dyn StepBody> + Send + Sync>;
|
||||
|
||||
@@ -284,6 +317,43 @@ fn build_step_config_and_factory(
|
||||
});
|
||||
Ok((key, value, factory))
|
||||
}
|
||||
"workflow" => {
|
||||
let config = step.config.as_ref().ok_or_else(|| {
|
||||
YamlWorkflowError::Compilation(format!(
|
||||
"Workflow step '{}' is missing 'config' section",
|
||||
step.name
|
||||
))
|
||||
})?;
|
||||
let child_workflow_id = config.child_workflow.as_ref().ok_or_else(|| {
|
||||
YamlWorkflowError::Compilation(format!(
|
||||
"Workflow step '{}' must have 'config.workflow'",
|
||||
step.name
|
||||
))
|
||||
})?;
|
||||
let child_version = config.child_version.unwrap_or(1);
|
||||
|
||||
let sub_config = SubWorkflowConfig {
|
||||
workflow_id: child_workflow_id.clone(),
|
||||
version: child_version,
|
||||
output_keys: step.outputs.iter().map(|o| o.name.clone()).collect(),
|
||||
};
|
||||
|
||||
let key = format!("wfe_yaml::workflow::{}", step.name);
|
||||
let value = serde_json::to_value(&sub_config).map_err(|e| {
|
||||
YamlWorkflowError::Compilation(format!(
|
||||
"Failed to serialize workflow config: {e}"
|
||||
))
|
||||
})?;
|
||||
let config_clone = sub_config.clone();
|
||||
let factory: StepFactory = Box::new(move || {
|
||||
Box::new(SubWorkflowPlaceholderStep {
|
||||
workflow_id: config_clone.workflow_id.clone(),
|
||||
version: config_clone.version,
|
||||
output_keys: config_clone.output_keys.clone(),
|
||||
}) as Box<dyn StepBody>
|
||||
});
|
||||
Ok((key, value, factory))
|
||||
}
|
||||
other => Err(YamlWorkflowError::Compilation(format!(
|
||||
"Unknown step type: '{other}'"
|
||||
))),
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use crate::error::YamlWorkflowError;
|
||||
use crate::schema::{WorkflowSpec, YamlStep};
|
||||
@@ -22,6 +22,140 @@ pub fn validate(spec: &WorkflowSpec) -> Result<(), YamlWorkflowError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate multiple workflow specs from a multi-workflow file.
|
||||
/// Checks cross-workflow references and cycles in addition to per-workflow validation.
|
||||
pub fn validate_multi(specs: &[WorkflowSpec]) -> Result<(), YamlWorkflowError> {
|
||||
// Validate each workflow individually.
|
||||
for spec in specs {
|
||||
validate(spec)?;
|
||||
}
|
||||
|
||||
// Check for duplicate workflow IDs.
|
||||
let mut seen_ids = HashSet::new();
|
||||
for spec in specs {
|
||||
if !seen_ids.insert(&spec.id) {
|
||||
return Err(YamlWorkflowError::Validation(format!(
|
||||
"Duplicate workflow ID: '{}'",
|
||||
spec.id
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
// Validate cross-workflow references and detect cycles.
|
||||
validate_workflow_references(specs)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate that workflow step references point to known workflows
|
||||
/// and detect circular dependencies.
|
||||
fn validate_workflow_references(specs: &[WorkflowSpec]) -> Result<(), YamlWorkflowError> {
|
||||
let known_ids: HashSet<&str> = specs.iter().map(|s| s.id.as_str()).collect();
|
||||
|
||||
// Build a dependency graph: workflow_id -> set of referenced workflow_ids.
|
||||
let mut deps: HashMap<&str, HashSet<&str>> = HashMap::new();
|
||||
|
||||
for spec in specs {
|
||||
let mut spec_deps = HashSet::new();
|
||||
collect_workflow_refs(&spec.steps, &mut spec_deps);
|
||||
deps.insert(spec.id.as_str(), spec_deps);
|
||||
}
|
||||
|
||||
// Detect cycles using DFS with coloring.
|
||||
detect_cycles(&known_ids, &deps)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Collect all workflow IDs referenced by `type: workflow` steps.
|
||||
fn collect_workflow_refs<'a>(steps: &'a [YamlStep], refs: &mut HashSet<&'a str>) {
|
||||
for step in steps {
|
||||
if step.step_type.as_deref() == Some("workflow")
|
||||
&& let Some(ref config) = step.config
|
||||
&& let Some(ref wf_id) = config.child_workflow
|
||||
{
|
||||
refs.insert(wf_id.as_str());
|
||||
}
|
||||
if let Some(ref children) = step.parallel {
|
||||
collect_workflow_refs(children, refs);
|
||||
}
|
||||
if let Some(ref hook) = step.on_success {
|
||||
collect_workflow_refs(std::slice::from_ref(hook.as_ref()), refs);
|
||||
}
|
||||
if let Some(ref hook) = step.on_failure {
|
||||
collect_workflow_refs(std::slice::from_ref(hook.as_ref()), refs);
|
||||
}
|
||||
if let Some(ref hook) = step.ensure {
|
||||
collect_workflow_refs(std::slice::from_ref(hook.as_ref()), refs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Detect circular references in the workflow dependency graph.
|
||||
fn detect_cycles(
|
||||
known_ids: &HashSet<&str>,
|
||||
deps: &HashMap<&str, HashSet<&str>>,
|
||||
) -> Result<(), YamlWorkflowError> {
|
||||
#[derive(Clone, Copy, PartialEq)]
|
||||
enum Color {
|
||||
White,
|
||||
Gray,
|
||||
Black,
|
||||
}
|
||||
|
||||
let mut colors: HashMap<&str, Color> = known_ids.iter().map(|id| (*id, Color::White)).collect();
|
||||
|
||||
fn dfs<'a>(
|
||||
node: &'a str,
|
||||
deps: &HashMap<&str, HashSet<&'a str>>,
|
||||
colors: &mut HashMap<&'a str, Color>,
|
||||
path: &mut Vec<&'a str>,
|
||||
) -> Result<(), YamlWorkflowError> {
|
||||
colors.insert(node, Color::Gray);
|
||||
path.push(node);
|
||||
|
||||
if let Some(neighbors) = deps.get(node) {
|
||||
for &neighbor in neighbors {
|
||||
match colors.get(neighbor) {
|
||||
Some(Color::Gray) => {
|
||||
// Found a cycle. Build the cycle path for the error message.
|
||||
let cycle_start = path.iter().position(|&n| n == neighbor).unwrap();
|
||||
let cycle: Vec<&str> = path[cycle_start..].to_vec();
|
||||
return Err(YamlWorkflowError::Validation(format!(
|
||||
"Circular workflow reference detected: {} -> {}",
|
||||
cycle.join(" -> "),
|
||||
neighbor
|
||||
)));
|
||||
}
|
||||
Some(Color::White) | None => {
|
||||
// Only recurse into nodes that are in our known set.
|
||||
if colors.contains_key(neighbor) {
|
||||
dfs(neighbor, deps, colors, path)?;
|
||||
}
|
||||
}
|
||||
Some(Color::Black) => {
|
||||
// Already fully processed, skip.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
path.pop();
|
||||
colors.insert(node, Color::Black);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let nodes: Vec<&str> = known_ids.iter().copied().collect();
|
||||
for node in nodes {
|
||||
if colors.get(node) == Some(&Color::White) {
|
||||
let mut path = Vec::new();
|
||||
dfs(node, deps, &mut colors, &mut path)?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn validate_steps(
|
||||
steps: &[YamlStep],
|
||||
seen_names: &mut HashSet<String>,
|
||||
@@ -173,6 +307,24 @@ fn validate_steps(
|
||||
}
|
||||
}
|
||||
|
||||
// Workflow steps must have config.workflow.
|
||||
if let Some(ref step_type) = step.step_type
|
||||
&& step_type == "workflow"
|
||||
{
|
||||
let config = step.config.as_ref().ok_or_else(|| {
|
||||
YamlWorkflowError::Validation(format!(
|
||||
"Workflow step '{}' must have a 'config' section",
|
||||
step.name
|
||||
))
|
||||
})?;
|
||||
if config.child_workflow.is_none() {
|
||||
return Err(YamlWorkflowError::Validation(format!(
|
||||
"Workflow step '{}' must have 'config.workflow'",
|
||||
step.name
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
// Validate step-level error behavior.
|
||||
if let Some(ref eb) = step.error_behavior {
|
||||
validate_error_behavior_type(&eb.behavior_type)?;
|
||||
|
||||
Reference in New Issue
Block a user