feat(wfe-core): add output_data to ExecutionResult and register_step_factory to host
Core plumbing for YAML workflow support: - Add output_data field to ExecutionResult for step output capture - Executor merges output_data into workflow.data after step completion - Add register_step_factory(key, factory) to WorkflowHost for dynamic step registration by external crates like wfe-yaml
This commit is contained in:
@@ -4,10 +4,12 @@ use crate::models::{
|
|||||||
EventSubscription, ExecutionPointer, ExecutionResult, PointerStatus, WorkflowDefinition,
|
EventSubscription, ExecutionPointer, ExecutionResult, PointerStatus, WorkflowDefinition,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Outcome of processing an ExecutionResult: new pointers and optional subscriptions.
|
/// Outcome of processing an ExecutionResult: new pointers, subscriptions, and output data.
|
||||||
pub struct ProcessResult {
|
pub struct ProcessResult {
|
||||||
pub new_pointers: Vec<ExecutionPointer>,
|
pub new_pointers: Vec<ExecutionPointer>,
|
||||||
pub subscriptions: Vec<EventSubscription>,
|
pub subscriptions: Vec<EventSubscription>,
|
||||||
|
/// Output data to merge into workflow.data (from step's output_data field).
|
||||||
|
pub output_data: Option<serde_json::Value>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process an ExecutionResult and update the pointer accordingly.
|
/// Process an ExecutionResult and update the pointer accordingly.
|
||||||
@@ -113,6 +115,7 @@ pub fn process_result(
|
|||||||
ProcessResult {
|
ProcessResult {
|
||||||
new_pointers,
|
new_pointers,
|
||||||
subscriptions,
|
subscriptions,
|
||||||
|
output_data: result.output_data.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -214,6 +214,14 @@ impl WorkflowExecutor {
|
|||||||
|
|
||||||
all_subscriptions.extend(process_result.subscriptions);
|
all_subscriptions.extend(process_result.subscriptions);
|
||||||
|
|
||||||
|
// Merge output data into workflow.data.
|
||||||
|
#[allow(clippy::collapsible_if)]
|
||||||
|
if let Some(serde_json::Value::Object(out_map)) = process_result.output_data {
|
||||||
|
if let serde_json::Value::Object(wf_map) = &mut workflow.data {
|
||||||
|
wf_map.extend(out_map);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Add new pointers.
|
// Add new pointers.
|
||||||
for new_pointer in process_result.new_pointers {
|
for new_pointer in process_result.new_pointers {
|
||||||
workflow.execution_pointers.push(new_pointer);
|
workflow.execution_pointers.push(new_pointer);
|
||||||
|
|||||||
@@ -26,6 +26,9 @@ pub struct ExecutionResult {
|
|||||||
pub branch_values: Option<Vec<serde_json::Value>>,
|
pub branch_values: Option<Vec<serde_json::Value>>,
|
||||||
/// Poll endpoint configuration for external service polling.
|
/// Poll endpoint configuration for external service polling.
|
||||||
pub poll_endpoint: Option<PollEndpointConfig>,
|
pub poll_endpoint: Option<PollEndpointConfig>,
|
||||||
|
/// Output data to merge into workflow.data after step completion.
|
||||||
|
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||||
|
pub output_data: Option<serde_json::Value>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ExecutionResult {
|
impl ExecutionResult {
|
||||||
|
|||||||
@@ -201,6 +201,17 @@ impl WorkflowHost {
|
|||||||
sr.register::<S>();
|
sr.register::<S>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register a step factory with an explicit key and factory function.
|
||||||
|
/// Used by wfe-yaml and other dynamic step sources.
|
||||||
|
pub async fn register_step_factory(
|
||||||
|
&self,
|
||||||
|
key: &str,
|
||||||
|
factory: impl Fn() -> Box<dyn StepBody> + Send + Sync + 'static,
|
||||||
|
) {
|
||||||
|
let mut sr = self.step_registry.write().await;
|
||||||
|
sr.register_factory(key, factory);
|
||||||
|
}
|
||||||
|
|
||||||
/// Start a new workflow instance.
|
/// Start a new workflow instance.
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
name = "workflow.start",
|
name = "workflow.start",
|
||||||
|
|||||||
Reference in New Issue
Block a user