diff --git a/wfe-yaml/src/compiler.rs b/wfe-yaml/src/compiler.rs index bf74fde..52f721c 100644 --- a/wfe-yaml/src/compiler.rs +++ b/wfe-yaml/src/compiler.rs @@ -13,6 +13,7 @@ use crate::executors::deno::{DenoConfig, DenoPermissions, DenoStep}; use wfe_buildkit::{BuildkitConfig, BuildkitStep}; #[cfg(feature = "containerd")] use wfe_containerd::{ContainerdConfig, ContainerdStep}; +use wfe_core::primitives::sub_workflow::SubWorkflowStep; use crate::schema::{WorkflowSpec, YamlErrorBehavior, YamlStep}; /// Configuration for a sub-workflow step. @@ -23,30 +24,6 @@ pub struct SubWorkflowConfig { pub output_keys: Vec, } -/// Placeholder step body for sub-workflow steps. -/// -/// This is a compile-time placeholder. When wfe-core provides a real -/// `SubWorkflowStep`, it should replace this. The placeholder always -/// returns `ExecutionResult::Next` so compilation and basic tests work. -#[derive(Debug, Default)] -pub struct SubWorkflowPlaceholderStep { - pub workflow_id: String, - pub version: u32, - pub output_keys: Vec, -} - -#[async_trait::async_trait] -impl StepBody for SubWorkflowPlaceholderStep { - async fn run( - &mut self, - context: &wfe_core::traits::StepExecutionContext<'_>, - ) -> wfe_core::Result { - let _ = context; - // Placeholder: a real implementation would start the child workflow. - Ok(wfe_core::models::ExecutionResult::next()) - } -} - /// Factory type alias for step creation closures. pub type StepFactory = Box Box + Send + Sync>; @@ -346,10 +323,13 @@ fn build_step_config_and_factory( })?; let config_clone = sub_config.clone(); let factory: StepFactory = Box::new(move || { - Box::new(SubWorkflowPlaceholderStep { + Box::new(SubWorkflowStep { workflow_id: config_clone.workflow_id.clone(), version: config_clone.version, output_keys: config_clone.output_keys.clone(), + inputs: serde_json::Value::Null, + input_schema: None, + output_schema: None, }) as Box }); Ok((key, value, factory)) diff --git a/wfe-yaml/tests/compiler.rs b/wfe-yaml/tests/compiler.rs index c2e4acc..eec271e 100644 --- a/wfe-yaml/tests/compiler.rs +++ b/wfe-yaml/tests/compiler.rs @@ -1013,3 +1013,92 @@ workflow: assert_eq!(setup.outcomes[0].next_step, run_child.id); assert_eq!(run_child.outcomes[0].next_step, cleanup.id); } + +/// Regression test: SubWorkflowStep must actually wait for child completion, +/// not return next() immediately. The compiled factory must produce a real +/// SubWorkflowStep (from wfe-core), not a placeholder. +#[tokio::test] +async fn workflow_step_factory_produces_real_sub_workflow_step() { + use wfe_core::models::{ExecutionPointer, WorkflowInstance, WorkflowStep as WfStep}; + use wfe_core::traits::step::{HostContext, StepExecutionContext}; + use std::pin::Pin; + use std::future::Future; + use std::sync::Mutex; + + let yaml = r#" +workflows: + - id: child + version: 1 + steps: + - name: do-work + type: shell + config: + run: echo done + + - id: parent + version: 1 + steps: + - name: run-child + type: workflow + config: + workflow: child +"#; + let config = HashMap::new(); + let workflows = load_workflow_from_str(yaml, &config).unwrap(); + + // Find the parent workflow's factory for the "run-child" step + let parent = workflows.iter().find(|w| w.definition.id == "parent").unwrap(); + let factory_key = parent.step_factories.iter() + .find(|(k, _)| k.contains("run-child")) + .map(|(k, _)| k.clone()) + .expect("run-child factory should exist"); + + // Create a step from the factory + let factory = &parent.step_factories.iter() + .find(|(k, _)| *k == factory_key) + .unwrap().1; + let mut step = factory(); + + // Mock host context that records the start_workflow call + struct MockHost { called: Mutex } + impl HostContext for MockHost { + fn start_workflow(&self, _def: &str, _ver: u32, _data: serde_json::Value) + -> Pin> + Send + '_>> + { + *self.called.lock().unwrap() = true; + Box::pin(async { Ok("child-instance-id".to_string()) }) + } + } + + let host = MockHost { called: Mutex::new(false) }; + let pointer = ExecutionPointer::new(0); + let wf_step = WfStep::new(0, &factory_key); + let workflow = WorkflowInstance::new("parent", 1, serde_json::json!({})); + let ctx = StepExecutionContext { + item: None, + execution_pointer: &pointer, + persistence_data: None, + step: &wf_step, + workflow: &workflow, + cancellation_token: tokio_util::sync::CancellationToken::new(), + host_context: Some(&host), + }; + + let result = step.run(&ctx).await.unwrap(); + + // THE KEY ASSERTION: must NOT proceed immediately. + // It must return wait_for_event so the parent waits for the child. + assert!( + !result.proceed, + "SubWorkflowStep must NOT proceed immediately — it should wait for child completion" + ); + assert_eq!( + result.event_name.as_deref(), + Some("wfe.workflow.completed"), + "SubWorkflowStep must wait for wfe.workflow.completed event" + ); + assert!( + *host.called.lock().unwrap(), + "SubWorkflowStep must call host_context.start_workflow()" + ); +} diff --git a/wfe-yaml/tests/deno.rs b/wfe-yaml/tests/deno.rs index e5ec102..c0a005c 100644 --- a/wfe-yaml/tests/deno.rs +++ b/wfe-yaml/tests/deno.rs @@ -41,6 +41,7 @@ fn make_context<'a>( step, workflow, cancellation_token: tokio_util::sync::CancellationToken::new(), + host_context: None, } }