From 8d0f83da3c6beafe3fdf7c8d9cd318cf73b1fc23 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Wed, 25 Mar 2026 21:16:09 +0000 Subject: [PATCH] feat(wfe-core): add output_data to ExecutionResult and register_step_factory to host Core plumbing for YAML workflow support: - Add output_data field to ExecutionResult for step output capture - Executor merges output_data into workflow.data after step completion - Add register_step_factory(key, factory) to WorkflowHost for dynamic step registration by external crates like wfe-yaml --- wfe-core/src/executor/result_processor.rs | 5 ++++- wfe-core/src/executor/workflow_executor.rs | 8 ++++++++ wfe-core/src/models/execution_result.rs | 3 +++ wfe/src/host.rs | 11 +++++++++++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/wfe-core/src/executor/result_processor.rs b/wfe-core/src/executor/result_processor.rs index 13a1d64..645e8af 100644 --- a/wfe-core/src/executor/result_processor.rs +++ b/wfe-core/src/executor/result_processor.rs @@ -4,10 +4,12 @@ use crate::models::{ EventSubscription, ExecutionPointer, ExecutionResult, PointerStatus, WorkflowDefinition, }; -/// Outcome of processing an ExecutionResult: new pointers and optional subscriptions. +/// Outcome of processing an ExecutionResult: new pointers, subscriptions, and output data. pub struct ProcessResult { pub new_pointers: Vec, pub subscriptions: Vec, + /// Output data to merge into workflow.data (from step's output_data field). + pub output_data: Option, } /// Process an ExecutionResult and update the pointer accordingly. @@ -113,6 +115,7 @@ pub fn process_result( ProcessResult { new_pointers, subscriptions, + output_data: result.output_data.clone(), } } diff --git a/wfe-core/src/executor/workflow_executor.rs b/wfe-core/src/executor/workflow_executor.rs index 0e705bf..11378d2 100644 --- a/wfe-core/src/executor/workflow_executor.rs +++ b/wfe-core/src/executor/workflow_executor.rs @@ -214,6 +214,14 @@ impl WorkflowExecutor { all_subscriptions.extend(process_result.subscriptions); + // Merge output data into workflow.data. + #[allow(clippy::collapsible_if)] + if let Some(serde_json::Value::Object(out_map)) = process_result.output_data { + if let serde_json::Value::Object(wf_map) = &mut workflow.data { + wf_map.extend(out_map); + } + } + // Add new pointers. for new_pointer in process_result.new_pointers { workflow.execution_pointers.push(new_pointer); diff --git a/wfe-core/src/models/execution_result.rs b/wfe-core/src/models/execution_result.rs index 9551e53..2e79296 100644 --- a/wfe-core/src/models/execution_result.rs +++ b/wfe-core/src/models/execution_result.rs @@ -26,6 +26,9 @@ pub struct ExecutionResult { pub branch_values: Option>, /// Poll endpoint configuration for external service polling. pub poll_endpoint: Option, + /// Output data to merge into workflow.data after step completion. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub output_data: Option, } impl ExecutionResult { diff --git a/wfe/src/host.rs b/wfe/src/host.rs index b0fffc5..0be340e 100644 --- a/wfe/src/host.rs +++ b/wfe/src/host.rs @@ -201,6 +201,17 @@ impl WorkflowHost { sr.register::(); } + /// Register a step factory with an explicit key and factory function. + /// Used by wfe-yaml and other dynamic step sources. + pub async fn register_step_factory( + &self, + key: &str, + factory: impl Fn() -> Box + Send + Sync + 'static, + ) { + let mut sr = self.step_registry.write().await; + sr.register_factory(key, factory); + } + /// Start a new workflow instance. #[tracing::instrument( name = "workflow.start",