feat(wfe-core): add workflow executor with result processing and error handling

WorkflowExecutor: acquire lock, load instance, run steps, process
results, persist, release lock. StepRegistry for resolving step types.

ResultProcessor handles: next, outcome, persist, branch, sleep,
wait_for_event, poll_endpoint. ErrorHandler implements Retry (with
max_retries), Suspend, Terminate, and Compensate behaviors.
This commit is contained in:
2026-03-25 20:10:45 +00:00
parent 456c3c5b2e
commit a61e68d2a9
5 changed files with 1458 additions and 0 deletions

View File

@@ -0,0 +1,81 @@
use chrono::Utc;
use crate::models::{
ErrorBehavior, ExecutionPointer, PointerStatus, WorkflowDefinition, WorkflowStatus,
};
/// Outcome of handling a step error.
pub struct ErrorHandlerResult {
pub new_pointers: Vec<ExecutionPointer>,
/// If set, the workflow status should be changed to this value.
pub workflow_status: Option<WorkflowStatus>,
}
/// Handle a step execution error by applying the appropriate ErrorBehavior.
///
/// Updates the pointer in place and returns new pointers and optional workflow status change.
pub fn handle_error(
_error_msg: &str,
pointer: &mut ExecutionPointer,
definition: &WorkflowDefinition,
) -> ErrorHandlerResult {
let mut new_pointers = Vec::new();
let mut workflow_status = None;
// Determine error behavior: step-level override or definition default.
let step = definition.steps.iter().find(|s| s.id == pointer.step_id);
let behavior = step
.and_then(|s| s.error_behavior.clone())
.unwrap_or_else(|| definition.default_error_behavior.clone());
match behavior {
ErrorBehavior::Retry { interval, max_retries } => {
if max_retries > 0 && pointer.retry_count >= max_retries {
// Exceeded max retries, suspend the workflow
pointer.status = PointerStatus::Failed;
pointer.active = false;
workflow_status = Some(WorkflowStatus::Suspended);
tracing::warn!(
retry_count = pointer.retry_count,
max_retries,
"Max retries exceeded, suspending workflow"
);
} else {
pointer.retry_count += 1;
pointer.status = PointerStatus::Sleeping;
pointer.active = true;
pointer.sleep_until = Some(
Utc::now() + chrono::Duration::milliseconds(interval.as_millis() as i64),
);
}
}
ErrorBehavior::Suspend => {
pointer.active = false;
pointer.status = PointerStatus::Failed;
workflow_status = Some(WorkflowStatus::Suspended);
}
ErrorBehavior::Terminate => {
pointer.active = false;
pointer.status = PointerStatus::Failed;
workflow_status = Some(WorkflowStatus::Terminated);
}
ErrorBehavior::Compensate => {
pointer.active = false;
pointer.status = PointerStatus::Failed;
if let Some(step) = step
&& let Some(comp_step_id) = step.compensation_step_id
{
let mut comp_pointer = ExecutionPointer::new(comp_step_id);
comp_pointer.predecessor_id = Some(pointer.id.clone());
comp_pointer.scope = pointer.scope.clone();
new_pointers.push(comp_pointer);
}
}
}
ErrorHandlerResult {
new_pointers,
workflow_status,
}
}

View File

@@ -0,0 +1,7 @@
mod error_handler;
mod result_processor;
mod step_registry;
mod workflow_executor;
pub use step_registry::StepRegistry;
pub use workflow_executor::WorkflowExecutor;

View File

@@ -0,0 +1,148 @@
use chrono::Utc;
use crate::models::{
EventSubscription, ExecutionPointer, ExecutionResult, PointerStatus, WorkflowDefinition,
};
/// Outcome of processing an ExecutionResult: new pointers and optional subscriptions.
pub struct ProcessResult {
pub new_pointers: Vec<ExecutionPointer>,
pub subscriptions: Vec<EventSubscription>,
}
/// Process an ExecutionResult and update the pointer accordingly.
///
/// Returns new pointers to add and subscriptions to create.
pub fn process_result(
result: &ExecutionResult,
pointer: &mut ExecutionPointer,
definition: &WorkflowDefinition,
workflow_id: &str,
) -> ProcessResult {
let mut new_pointers = Vec::new();
let mut subscriptions = Vec::new();
if result.proceed {
// Step completed - mark pointer done.
pointer.active = false;
pointer.status = PointerStatus::Complete;
pointer.end_time = Some(Utc::now());
// Determine the next step via outcomes.
let step = definition.steps.iter().find(|s| s.id == pointer.step_id);
if let Some(step) = step {
let next_step_id = find_next_step(step, &result.outcome_value);
if let Some(next_id) = next_step_id {
let mut next_pointer = ExecutionPointer::new(next_id);
next_pointer.predecessor_id = Some(pointer.id.clone());
next_pointer.scope = pointer.scope.clone();
new_pointers.push(next_pointer);
}
}
if let Some(outcome_value) = &result.outcome_value {
pointer.outcome = Some(outcome_value.clone());
}
} else if let Some(branch_values) = &result.branch_values {
// Branch: create child pointers for each value.
pointer.status = PointerStatus::Running;
pointer.persistence_data = result.persistence_data.clone();
let step = definition.steps.iter().find(|s| s.id == pointer.step_id);
let child_step_ids: Vec<usize> = step.map(|s| s.children.clone()).unwrap_or_default();
let mut child_scope = pointer.scope.clone();
child_scope.push(pointer.id.clone());
for value in branch_values {
for &child_step_id in &child_step_ids {
let mut child_pointer = ExecutionPointer::new(child_step_id);
child_pointer.context_item = Some(value.clone());
child_pointer.scope = child_scope.clone();
child_pointer.predecessor_id = Some(pointer.id.clone());
pointer.children.push(child_pointer.id.clone());
new_pointers.push(child_pointer);
}
}
} else if result.event_name.is_some() {
// Wait for event.
pointer.status = PointerStatus::WaitingForEvent;
pointer.active = false;
pointer.event_name = result.event_name.clone();
pointer.event_key = result.event_key.clone();
if let (Some(event_name), Some(event_key)) =
(&result.event_name, &result.event_key)
{
let as_of = result.event_as_of.unwrap_or_else(Utc::now);
let sub = EventSubscription::new(
workflow_id,
pointer.step_id,
pointer.id.as_str(),
event_name.as_str(),
event_key.as_str(),
as_of,
);
subscriptions.push(sub);
}
} else if result.sleep_for.is_some() {
// Sleep.
pointer.status = PointerStatus::Sleeping;
pointer.active = true;
if let Some(duration) = result.sleep_for {
pointer.sleep_until =
Some(Utc::now() + chrono::Duration::milliseconds(duration.as_millis() as i64));
}
pointer.persistence_data = result.persistence_data.clone();
} else if let Some(poll_config) = &result.poll_endpoint {
// Poll endpoint: store config and sleep for the interval.
pointer.status = PointerStatus::Sleeping;
pointer.active = true;
pointer.sleep_until = Some(
Utc::now()
+ chrono::Duration::milliseconds(poll_config.interval.as_millis() as i64),
);
pointer.persistence_data = result.persistence_data.clone();
} else if result.persistence_data.is_some() {
// Persist: keep pointer active with data.
pointer.active = true;
pointer.status = PointerStatus::Running;
pointer.persistence_data = result.persistence_data.clone();
}
ProcessResult {
new_pointers,
subscriptions,
}
}
/// Find the next step ID based on the step's outcomes and the result's outcome value.
fn find_next_step(
step: &crate::models::WorkflowStep,
outcome_value: &Option<serde_json::Value>,
) -> Option<usize> {
if step.outcomes.is_empty() {
return None;
}
if let Some(value) = outcome_value {
// Try to match a specific outcome value.
for outcome in &step.outcomes {
if let Some(ov) = &outcome.value
&& ov == value
{
return Some(outcome.next_step);
}
}
}
// Fall back to the default outcome (value == None).
for outcome in &step.outcomes {
if outcome.value.is_none() {
return Some(outcome.next_step);
}
}
// If no default, take the first outcome.
step.outcomes.first().map(|o| o.next_step)
}

View File

@@ -0,0 +1,42 @@
use std::collections::HashMap;
use crate::traits::StepBody;
/// Registry of step factories keyed by type name.
pub struct StepRegistry {
factories: HashMap<String, Box<dyn Fn() -> Box<dyn StepBody> + Send + Sync>>,
}
impl StepRegistry {
pub fn new() -> Self {
Self {
factories: HashMap::new(),
}
}
/// Register a step type using its full type name as the key.
pub fn register<S: StepBody + Default + 'static>(&mut self) {
let key = std::any::type_name::<S>().to_string();
self.factories.insert(key, Box::new(|| Box::new(S::default())));
}
/// Register a step factory with an explicit key and factory function.
pub fn register_factory(
&mut self,
key: &str,
factory: impl Fn() -> Box<dyn StepBody> + Send + Sync + 'static,
) {
self.factories.insert(key.to_string(), Box::new(factory));
}
/// Resolve a step instance by type name.
pub fn resolve(&self, step_type: &str) -> Option<Box<dyn StepBody>> {
self.factories.get(step_type).map(|f| f())
}
}
impl Default for StepRegistry {
fn default() -> Self {
Self::new()
}
}

File diff suppressed because it is too large Load Diff