diff --git a/wfe-core/src/executor/error_handler.rs b/wfe-core/src/executor/error_handler.rs new file mode 100644 index 0000000..b5d7611 --- /dev/null +++ b/wfe-core/src/executor/error_handler.rs @@ -0,0 +1,81 @@ +use chrono::Utc; + +use crate::models::{ + ErrorBehavior, ExecutionPointer, PointerStatus, WorkflowDefinition, WorkflowStatus, +}; + +/// Outcome of handling a step error. +pub struct ErrorHandlerResult { + pub new_pointers: Vec, + /// If set, the workflow status should be changed to this value. + pub workflow_status: Option, +} + +/// Handle a step execution error by applying the appropriate ErrorBehavior. +/// +/// Updates the pointer in place and returns new pointers and optional workflow status change. +pub fn handle_error( + _error_msg: &str, + pointer: &mut ExecutionPointer, + definition: &WorkflowDefinition, +) -> ErrorHandlerResult { + let mut new_pointers = Vec::new(); + let mut workflow_status = None; + + // Determine error behavior: step-level override or definition default. + let step = definition.steps.iter().find(|s| s.id == pointer.step_id); + let behavior = step + .and_then(|s| s.error_behavior.clone()) + .unwrap_or_else(|| definition.default_error_behavior.clone()); + + match behavior { + ErrorBehavior::Retry { interval, max_retries } => { + if max_retries > 0 && pointer.retry_count >= max_retries { + // Exceeded max retries, suspend the workflow + pointer.status = PointerStatus::Failed; + pointer.active = false; + workflow_status = Some(WorkflowStatus::Suspended); + tracing::warn!( + retry_count = pointer.retry_count, + max_retries, + "Max retries exceeded, suspending workflow" + ); + } else { + pointer.retry_count += 1; + pointer.status = PointerStatus::Sleeping; + pointer.active = true; + pointer.sleep_until = Some( + Utc::now() + chrono::Duration::milliseconds(interval.as_millis() as i64), + ); + } + } + ErrorBehavior::Suspend => { + pointer.active = false; + pointer.status = PointerStatus::Failed; + workflow_status = Some(WorkflowStatus::Suspended); + } + ErrorBehavior::Terminate => { + pointer.active = false; + pointer.status = PointerStatus::Failed; + workflow_status = Some(WorkflowStatus::Terminated); + } + ErrorBehavior::Compensate => { + pointer.active = false; + pointer.status = PointerStatus::Failed; + + if let Some(step) = step + && let Some(comp_step_id) = step.compensation_step_id + { + let mut comp_pointer = ExecutionPointer::new(comp_step_id); + comp_pointer.predecessor_id = Some(pointer.id.clone()); + comp_pointer.scope = pointer.scope.clone(); + new_pointers.push(comp_pointer); + } + } + } + + ErrorHandlerResult { + new_pointers, + workflow_status, + } +} diff --git a/wfe-core/src/executor/mod.rs b/wfe-core/src/executor/mod.rs new file mode 100644 index 0000000..99a886f --- /dev/null +++ b/wfe-core/src/executor/mod.rs @@ -0,0 +1,7 @@ +mod error_handler; +mod result_processor; +mod step_registry; +mod workflow_executor; + +pub use step_registry::StepRegistry; +pub use workflow_executor::WorkflowExecutor; diff --git a/wfe-core/src/executor/result_processor.rs b/wfe-core/src/executor/result_processor.rs new file mode 100644 index 0000000..13a1d64 --- /dev/null +++ b/wfe-core/src/executor/result_processor.rs @@ -0,0 +1,148 @@ +use chrono::Utc; + +use crate::models::{ + EventSubscription, ExecutionPointer, ExecutionResult, PointerStatus, WorkflowDefinition, +}; + +/// Outcome of processing an ExecutionResult: new pointers and optional subscriptions. +pub struct ProcessResult { + pub new_pointers: Vec, + pub subscriptions: Vec, +} + +/// Process an ExecutionResult and update the pointer accordingly. +/// +/// Returns new pointers to add and subscriptions to create. +pub fn process_result( + result: &ExecutionResult, + pointer: &mut ExecutionPointer, + definition: &WorkflowDefinition, + workflow_id: &str, +) -> ProcessResult { + let mut new_pointers = Vec::new(); + let mut subscriptions = Vec::new(); + + if result.proceed { + // Step completed - mark pointer done. + pointer.active = false; + pointer.status = PointerStatus::Complete; + pointer.end_time = Some(Utc::now()); + + // Determine the next step via outcomes. + let step = definition.steps.iter().find(|s| s.id == pointer.step_id); + if let Some(step) = step { + let next_step_id = find_next_step(step, &result.outcome_value); + if let Some(next_id) = next_step_id { + let mut next_pointer = ExecutionPointer::new(next_id); + next_pointer.predecessor_id = Some(pointer.id.clone()); + next_pointer.scope = pointer.scope.clone(); + new_pointers.push(next_pointer); + } + } + + if let Some(outcome_value) = &result.outcome_value { + pointer.outcome = Some(outcome_value.clone()); + } + } else if let Some(branch_values) = &result.branch_values { + // Branch: create child pointers for each value. + pointer.status = PointerStatus::Running; + pointer.persistence_data = result.persistence_data.clone(); + + let step = definition.steps.iter().find(|s| s.id == pointer.step_id); + let child_step_ids: Vec = step.map(|s| s.children.clone()).unwrap_or_default(); + + let mut child_scope = pointer.scope.clone(); + child_scope.push(pointer.id.clone()); + + for value in branch_values { + for &child_step_id in &child_step_ids { + let mut child_pointer = ExecutionPointer::new(child_step_id); + child_pointer.context_item = Some(value.clone()); + child_pointer.scope = child_scope.clone(); + child_pointer.predecessor_id = Some(pointer.id.clone()); + pointer.children.push(child_pointer.id.clone()); + new_pointers.push(child_pointer); + } + } + } else if result.event_name.is_some() { + // Wait for event. + pointer.status = PointerStatus::WaitingForEvent; + pointer.active = false; + pointer.event_name = result.event_name.clone(); + pointer.event_key = result.event_key.clone(); + + if let (Some(event_name), Some(event_key)) = + (&result.event_name, &result.event_key) + { + let as_of = result.event_as_of.unwrap_or_else(Utc::now); + let sub = EventSubscription::new( + workflow_id, + pointer.step_id, + pointer.id.as_str(), + event_name.as_str(), + event_key.as_str(), + as_of, + ); + subscriptions.push(sub); + } + } else if result.sleep_for.is_some() { + // Sleep. + pointer.status = PointerStatus::Sleeping; + pointer.active = true; + if let Some(duration) = result.sleep_for { + pointer.sleep_until = + Some(Utc::now() + chrono::Duration::milliseconds(duration.as_millis() as i64)); + } + pointer.persistence_data = result.persistence_data.clone(); + } else if let Some(poll_config) = &result.poll_endpoint { + // Poll endpoint: store config and sleep for the interval. + pointer.status = PointerStatus::Sleeping; + pointer.active = true; + pointer.sleep_until = Some( + Utc::now() + + chrono::Duration::milliseconds(poll_config.interval.as_millis() as i64), + ); + pointer.persistence_data = result.persistence_data.clone(); + } else if result.persistence_data.is_some() { + // Persist: keep pointer active with data. + pointer.active = true; + pointer.status = PointerStatus::Running; + pointer.persistence_data = result.persistence_data.clone(); + } + + ProcessResult { + new_pointers, + subscriptions, + } +} + +/// Find the next step ID based on the step's outcomes and the result's outcome value. +fn find_next_step( + step: &crate::models::WorkflowStep, + outcome_value: &Option, +) -> Option { + if step.outcomes.is_empty() { + return None; + } + + if let Some(value) = outcome_value { + // Try to match a specific outcome value. + for outcome in &step.outcomes { + if let Some(ov) = &outcome.value + && ov == value + { + return Some(outcome.next_step); + } + } + } + + // Fall back to the default outcome (value == None). + for outcome in &step.outcomes { + if outcome.value.is_none() { + return Some(outcome.next_step); + } + } + + // If no default, take the first outcome. + step.outcomes.first().map(|o| o.next_step) +} diff --git a/wfe-core/src/executor/step_registry.rs b/wfe-core/src/executor/step_registry.rs new file mode 100644 index 0000000..b6a8b76 --- /dev/null +++ b/wfe-core/src/executor/step_registry.rs @@ -0,0 +1,42 @@ +use std::collections::HashMap; + +use crate::traits::StepBody; + +/// Registry of step factories keyed by type name. +pub struct StepRegistry { + factories: HashMap Box + Send + Sync>>, +} + +impl StepRegistry { + pub fn new() -> Self { + Self { + factories: HashMap::new(), + } + } + + /// Register a step type using its full type name as the key. + pub fn register(&mut self) { + let key = std::any::type_name::().to_string(); + self.factories.insert(key, Box::new(|| Box::new(S::default()))); + } + + /// Register a step factory with an explicit key and factory function. + pub fn register_factory( + &mut self, + key: &str, + factory: impl Fn() -> Box + Send + Sync + 'static, + ) { + self.factories.insert(key.to_string(), Box::new(factory)); + } + + /// Resolve a step instance by type name. + pub fn resolve(&self, step_type: &str) -> Option> { + self.factories.get(step_type).map(|f| f()) + } +} + +impl Default for StepRegistry { + fn default() -> Self { + Self::new() + } +} diff --git a/wfe-core/src/executor/workflow_executor.rs b/wfe-core/src/executor/workflow_executor.rs new file mode 100644 index 0000000..93f862c --- /dev/null +++ b/wfe-core/src/executor/workflow_executor.rs @@ -0,0 +1,1180 @@ +use std::sync::Arc; + +use chrono::Utc; +use tracing::{debug, error, warn}; + +use super::error_handler; +use super::result_processor; +use super::step_registry::StepRegistry; +use crate::models::{ + ExecutionError, PointerStatus, QueueType, WorkflowDefinition, WorkflowStatus, +}; +use crate::traits::{ + DistributedLockProvider, LifecyclePublisher, PersistenceProvider, QueueProvider, SearchIndex, + StepExecutionContext, +}; +use crate::{Result, WfeError}; + +/// The core workflow executor. Processes a single workflow instance per `execute()` call. +pub struct WorkflowExecutor { + pub persistence: Arc, + pub lock_provider: Arc, + pub queue_provider: Arc, + pub lifecycle: Option>, + pub search: Option>, +} + +impl WorkflowExecutor { + pub fn new( + persistence: Arc, + lock_provider: Arc, + queue_provider: Arc, + ) -> Self { + Self { + persistence, + lock_provider, + queue_provider, + lifecycle: None, + search: None, + } + } + + pub fn with_lifecycle(mut self, lifecycle: Arc) -> Self { + self.lifecycle = Some(lifecycle); + self + } + + pub fn with_search(mut self, search: Arc) -> Self { + self.search = Some(search); + self + } + + /// Execute a single workflow instance. + /// + /// 1. Acquire lock + /// 2. Load instance + /// 3. Find runnable pointers + /// 4. Execute each runnable pointer's step + /// 5. Process results + /// 6. Check for completion + /// 7. Persist + /// 8. Release lock + pub async fn execute( + &self, + workflow_id: &str, + definition: &WorkflowDefinition, + step_registry: &StepRegistry, + ) -> Result<()> { + // 1. Acquire distributed lock. + let acquired = self.lock_provider.acquire_lock(workflow_id).await?; + if !acquired { + debug!(workflow_id, "Could not acquire lock, skipping"); + return Ok(()); + } + + let result = self + .execute_inner(workflow_id, definition, step_registry) + .await; + + // 7. Release lock (always). + if let Err(e) = self.lock_provider.release_lock(workflow_id).await { + error!(workflow_id, error = %e, "Failed to release lock"); + } + + result + } + + async fn execute_inner( + &self, + workflow_id: &str, + definition: &WorkflowDefinition, + step_registry: &StepRegistry, + ) -> Result<()> { + // 2. Load workflow instance. + let mut workflow = self + .persistence + .get_workflow_instance(workflow_id) + .await?; + + if workflow.status != WorkflowStatus::Runnable { + debug!(workflow_id, status = ?workflow.status, "Workflow not runnable, skipping"); + return Ok(()); + } + + let now = Utc::now(); + let mut all_subscriptions = Vec::new(); + let mut execution_errors = Vec::new(); + + // 3. Find runnable execution pointers. + let runnable_indices: Vec = workflow + .execution_pointers + .iter() + .enumerate() + .filter(|(_, p)| is_pointer_runnable(p, now)) + .map(|(i, _)| i) + .collect(); + + // 4. For each runnable pointer, execute the step. + for idx in runnable_indices { + let step_id = workflow.execution_pointers[idx].step_id; + + // a. Look up the WorkflowStep. + let step = definition + .steps + .iter() + .find(|s| s.id == step_id) + .ok_or(WfeError::StepNotFound(step_id))?; + + // b. Resolve the step body. + let mut step_body = step_registry + .resolve(&step.step_type) + .ok_or_else(|| WfeError::StepExecution(format!( + "Step type not found in registry: {}", + step.step_type + )))?; + + // Mark pointer as running before building context. + if workflow.execution_pointers[idx].start_time.is_none() { + workflow.execution_pointers[idx].start_time = Some(Utc::now()); + } + workflow.execution_pointers[idx].status = PointerStatus::Running; + + // c. Build StepExecutionContext (borrows workflow immutably). + let cancellation_token = tokio_util::sync::CancellationToken::new(); + let context = StepExecutionContext { + item: workflow.execution_pointers[idx].context_item.as_ref(), + execution_pointer: &workflow.execution_pointers[idx], + persistence_data: workflow.execution_pointers[idx].persistence_data.as_ref(), + step, + workflow: &workflow, + cancellation_token, + }; + + // d. Call step.run(context). + let step_result = step_body.run(&context).await; + + // Now we can mutate again since context is dropped. + match step_result { + Ok(result) => { + // e. Process the ExecutionResult. + // Extract workflow_id before mutable borrow. + let wf_id = workflow.id.clone(); + let process_result = { + let pointer = &mut workflow.execution_pointers[idx]; + result_processor::process_result( + &result, + pointer, + definition, + &wf_id, + ) + }; + + all_subscriptions.extend(process_result.subscriptions); + + // Add new pointers. + for new_pointer in process_result.new_pointers { + workflow.execution_pointers.push(new_pointer); + } + } + Err(e) => { + // f. Handle error. + let error_msg = e.to_string(); + warn!(workflow_id, step_id, error = %error_msg, "Step execution failed"); + + let pointer_id = workflow.execution_pointers[idx].id.clone(); + execution_errors.push(ExecutionError::new( + workflow_id, + &pointer_id, + &error_msg, + )); + + let handler_result = { + let pointer = &mut workflow.execution_pointers[idx]; + error_handler::handle_error( + &error_msg, + pointer, + definition, + ) + }; + + // Apply workflow-level status changes from error handler. + if let Some(new_status) = handler_result.workflow_status { + workflow.status = new_status; + if new_status == WorkflowStatus::Terminated { + workflow.complete_time = Some(Utc::now()); + } + } + + for new_pointer in handler_result.new_pointers { + workflow.execution_pointers.push(new_pointer); + } + } + } + } + + // 5. Check if all pointers are complete -> set workflow status to Complete. + let all_done = !workflow.execution_pointers.is_empty() + && workflow.execution_pointers.iter().all(|p| { + matches!( + p.status, + PointerStatus::Complete + | PointerStatus::Compensated + | PointerStatus::Cancelled + | PointerStatus::Failed + ) + }); + + if all_done && workflow.status == WorkflowStatus::Runnable { + workflow.status = WorkflowStatus::Complete; + workflow.complete_time = Some(Utc::now()); + } + + // Determine next_execution. + let has_active = workflow.execution_pointers.iter().any(|p| p.active); + if has_active { + workflow.next_execution = Some(0); + } else if workflow.status == WorkflowStatus::Runnable { + // Still runnable but no active pointers - might have sleeping/waiting pointers. + let next_sleep = workflow + .execution_pointers + .iter() + .filter(|p| p.status == PointerStatus::Sleeping) + .filter_map(|p| p.sleep_until) + .min(); + workflow.next_execution = next_sleep.map(|t| t.timestamp_millis()); + } else { + workflow.next_execution = None; + } + + // 6. Persist updated WorkflowInstance. + if all_subscriptions.is_empty() { + self.persistence.persist_workflow(&workflow).await?; + } else { + self.persistence + .persist_workflow_with_subscriptions(&workflow, &all_subscriptions) + .await?; + } + + // Persist errors. + if !execution_errors.is_empty() { + self.persistence + .persist_errors(&execution_errors) + .await?; + } + + // 8. Queue any follow-up work. + if workflow.status == WorkflowStatus::Runnable + && has_active + && let Err(e) = self + .queue_provider + .queue_work(workflow_id, QueueType::Workflow) + .await + { + error!(workflow_id = %workflow_id, error = %e, "Failed to re-queue workflow"); + return Err(e); + } + + if let Some(ref search) = self.search + && let Err(e) = search.index_workflow(&workflow).await + { + warn!(workflow_id = %workflow_id, error = %e, "Failed to index workflow"); + } + + Ok(()) + } +} + +fn is_pointer_runnable( + pointer: &crate::models::ExecutionPointer, + now: chrono::DateTime, +) -> bool { + if !pointer.active { + return false; + } + + match pointer.status { + PointerStatus::Pending | PointerStatus::Running => { + // Check sleep_until. + if let Some(sleep_until) = pointer.sleep_until { + now >= sleep_until + } else { + true + } + } + PointerStatus::Sleeping => { + // Sleeping pointers become runnable when sleep_until passes. + if let Some(sleep_until) = pointer.sleep_until { + now >= sleep_until + } else { + true + } + } + PointerStatus::WaitingForEvent => { + // Event arrived -> event_data is set. + pointer.event_published + } + _ => false, + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use async_trait::async_trait; + + use super::*; + use crate::models::{ + ErrorBehavior, ExecutionPointer, ExecutionResult, StepOutcome, WorkflowDefinition, + WorkflowInstance, WorkflowStep, + }; + use crate::test_support::{ + InMemoryLifecyclePublisher, InMemoryLockProvider, InMemoryPersistenceProvider, + InMemoryQueueProvider, + }; + use crate::traits::{StepBody, StepExecutionContext, WorkflowRepository}; + + // -- Test step implementations -- + + #[derive(Default)] + struct PassStep; + + #[async_trait] + impl StepBody for PassStep { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + Ok(ExecutionResult::next()) + } + } + + #[derive(Default)] + struct OutcomeStep; + + #[async_trait] + impl StepBody for OutcomeStep { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + Ok(ExecutionResult::outcome(serde_json::json!("yes"))) + } + } + + #[derive(Default)] + struct PersistStep; + + #[async_trait] + impl StepBody for PersistStep { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + Ok(ExecutionResult::persist(serde_json::json!({"count": 1}))) + } + } + + #[derive(Default)] + struct SleepStep; + + #[async_trait] + impl StepBody for SleepStep { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + Ok(ExecutionResult::sleep(Duration::from_secs(30), None)) + } + } + + #[derive(Default)] + struct WaitEventStep; + + #[async_trait] + impl StepBody for WaitEventStep { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + Ok(ExecutionResult::wait_for_event( + "order.completed", + "order-123", + Utc::now(), + )) + } + } + + #[derive(Default)] + struct EventResumeStep; + + #[async_trait] + impl StepBody for EventResumeStep { + async fn run( + &mut self, + ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + if ctx.execution_pointer.event_published { + Ok(ExecutionResult::next()) + } else { + Ok(ExecutionResult::wait_for_event( + "order.completed", + "order-123", + Utc::now(), + )) + } + } + } + + #[derive(Default)] + struct BranchStep; + + #[async_trait] + impl StepBody for BranchStep { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + Ok(ExecutionResult::branch( + vec![ + serde_json::json!(1), + serde_json::json!(2), + serde_json::json!(3), + ], + None, + )) + } + } + + #[derive(Default)] + struct FailStep; + + #[async_trait] + impl StepBody for FailStep { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + Err(WfeError::StepExecution("step failed".into())) + } + } + + #[derive(Default)] + struct CompensateStep; + + #[async_trait] + impl StepBody for CompensateStep { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + Ok(ExecutionResult::next()) + } + } + + // -- Helper functions -- + + fn create_providers() -> ( + Arc, + Arc, + Arc, + ) { + ( + Arc::new(InMemoryPersistenceProvider::new()), + Arc::new(InMemoryLockProvider::new()), + Arc::new(InMemoryQueueProvider::new()), + ) + } + + fn create_executor( + persistence: Arc, + lock: Arc, + queue: Arc, + ) -> WorkflowExecutor { + WorkflowExecutor::new(persistence, lock, queue) + } + + fn step_type() -> String { + std::any::type_name::().to_string() + } + + // -- Tests -- + + #[tokio::test] + async fn single_step_workflow_completes() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.steps.push(WorkflowStep::new(0, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + let pointer = ExecutionPointer::new(0); + instance.execution_pointers.push(pointer); + + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Complete); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete); + assert!(updated.complete_time.is_some()); + } + + #[tokio::test] + async fn linear_two_step_execution() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut step0 = WorkflowStep::new(0, step_type::()); + step0.outcomes.push(StepOutcome { + next_step: 1, + label: None, + value: None, + }); + def.steps.push(step0); + def.steps.push(WorkflowStep::new(1, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + // First execution: step 0 completes, step 1 pointer created. + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.execution_pointers.len(), 2); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete); + // Step 1 pointer should be active and pending. + assert_eq!(updated.execution_pointers[1].step_id, 1); + + // Second execution: step 1 completes. + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Complete); + assert_eq!(updated.execution_pointers[1].status, PointerStatus::Complete); + } + + #[tokio::test] + async fn linear_three_step_execution() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.outcomes.push(StepOutcome { next_step: 1, label: None, value: None }); + let mut s1 = WorkflowStep::new(1, step_type::()); + s1.outcomes.push(StepOutcome { next_step: 2, label: None, value: None }); + let s2 = WorkflowStep::new(2, step_type::()); + def.steps.push(s0); + def.steps.push(s1); + def.steps.push(s2); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + // Execute three times for three steps. + for _ in 0..3 { + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + } + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Complete); + assert_eq!(updated.execution_pointers.len(), 3); + for p in &updated.execution_pointers { + assert_eq!(p.status, PointerStatus::Complete); + } + } + + #[tokio::test] + async fn step_with_outcome_routes_correctly() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.outcomes.push(StepOutcome { + next_step: 1, + label: Some("no".into()), + value: Some(serde_json::json!("no")), + }); + s0.outcomes.push(StepOutcome { + next_step: 2, + label: Some("yes".into()), + value: Some(serde_json::json!("yes")), + }); + def.steps.push(s0); + def.steps.push(WorkflowStep::new(1, step_type::())); + def.steps.push(WorkflowStep::new(2, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.execution_pointers.len(), 2); + // Should route to step 2 (the "yes" branch). + assert_eq!(updated.execution_pointers[1].step_id, 2); + } + + #[tokio::test] + async fn step_persist_keeps_pointer_active() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.steps.push(WorkflowStep::new(0, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Runnable); + assert!(updated.execution_pointers[0].active); + assert_eq!( + updated.execution_pointers[0].persistence_data, + Some(serde_json::json!({"count": 1})) + ); + } + + #[tokio::test] + async fn step_sleep_sets_sleeping_status() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.steps.push(WorkflowStep::new(0, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping); + assert!(updated.execution_pointers[0].sleep_until.is_some()); + assert!(updated.execution_pointers[0].active); + } + + #[tokio::test] + async fn step_wait_for_event_creates_subscription() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.steps.push(WorkflowStep::new(0, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!( + updated.execution_pointers[0].status, + PointerStatus::WaitingForEvent + ); + assert!(!updated.execution_pointers[0].active); + + // Check subscription was created. + use crate::traits::SubscriptionRepository; + let subs = persistence + .get_subscriptions("order.completed", "order-123", Utc::now()) + .await + .unwrap(); + assert_eq!(subs.len(), 1); + assert_eq!(subs[0].workflow_id, instance.id); + } + + #[tokio::test] + async fn event_arrived_resumes_pointer() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.steps.push(WorkflowStep::new(0, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + let mut pointer = ExecutionPointer::new(0); + // Simulate event arrived: pointer was waiting, now event is published. + pointer.event_published = true; + pointer.event_data = Some(serde_json::json!({"order_id": "123"})); + pointer.status = PointerStatus::WaitingForEvent; + pointer.active = true; // Re-activated by event processor. + instance.execution_pointers.push(pointer); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete); + assert_eq!(updated.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn branch_creates_child_pointers() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.children.push(1); + def.steps.push(s0); + def.steps.push(WorkflowStep::new(1, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + // 1 original + 3 children. + assert_eq!(updated.execution_pointers.len(), 4); + // Children should have scope containing the parent pointer id. + let parent_id = &updated.execution_pointers[0].id; + for child in &updated.execution_pointers[1..] { + assert!(child.scope.contains(parent_id)); + assert_eq!(child.step_id, 1); + assert!(child.context_item.is_some()); + } + // Parent should track children. + assert_eq!(updated.execution_pointers[0].children.len(), 3); + } + + #[tokio::test] + async fn error_retry_increments_count() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.error_behavior = Some(ErrorBehavior::Retry { + interval: Duration::from_secs(10), + max_retries: 0, + }); + def.steps.push(s0); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.execution_pointers[0].retry_count, 1); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping); + assert!(updated.execution_pointers[0].sleep_until.is_some()); + assert_eq!(updated.status, WorkflowStatus::Runnable); + } + + #[tokio::test] + async fn error_suspend_pauses_workflow() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.error_behavior = Some(ErrorBehavior::Suspend); + def.steps.push(s0); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Suspended); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Failed); + } + + #[tokio::test] + async fn error_terminate_ends_workflow() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.error_behavior = Some(ErrorBehavior::Terminate); + def.steps.push(s0); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Terminated); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Failed); + assert!(updated.complete_time.is_some()); + } + + #[tokio::test] + async fn compensation_on_error() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.error_behavior = Some(ErrorBehavior::Compensate); + s0.compensation_step_id = Some(1); + def.steps.push(s0); + def.steps.push(WorkflowStep::new(1, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Failed); + // Compensation pointer should be created. + assert_eq!(updated.execution_pointers.len(), 2); + assert_eq!(updated.execution_pointers[1].step_id, 1); + assert!(updated.execution_pointers[1].active); + } + + #[tokio::test] + async fn workflow_completes_when_all_pointers_done() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.steps.push(WorkflowStep::new(0, step_type::())); + def.steps.push(WorkflowStep::new(1, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + // Two independent active pointers. + instance.execution_pointers.push(ExecutionPointer::new(0)); + instance.execution_pointers.push(ExecutionPointer::new(1)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Complete); + assert!(updated + .execution_pointers + .iter() + .all(|p| p.status == PointerStatus::Complete)); + } + + #[tokio::test] + async fn step_registry_register_and_resolve() { + let mut registry = StepRegistry::new(); + registry.register::(); + + let resolved = registry.resolve(&step_type::()); + assert!(resolved.is_some()); + + let unresolved = registry.resolve("NonExistentStep"); + assert!(unresolved.is_none()); + } + + #[tokio::test] + async fn executor_skips_non_runnable_workflow() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let registry = StepRegistry::new(); + let def = WorkflowDefinition::new("test", 1); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.status = WorkflowStatus::Complete; + persistence.create_new_workflow(&instance).await.unwrap(); + + // Should not error on a completed workflow. + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn sleeping_pointer_not_runnable_before_wakeup() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.steps.push(WorkflowStep::new(0, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + let mut pointer = ExecutionPointer::new(0); + pointer.status = PointerStatus::Sleeping; + pointer.active = true; + pointer.sleep_until = Some(Utc::now() + chrono::Duration::hours(1)); + instance.execution_pointers.push(pointer); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + // Should still be sleeping since sleep_until is in the future. + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping); + } + + #[tokio::test] + async fn error_persists_execution_error() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.error_behavior = Some(ErrorBehavior::Suspend); + def.steps.push(s0); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let errors = persistence.get_errors().await; + assert_eq!(errors.len(), 1); + assert_eq!(errors[0].workflow_id, instance.id); + } + + #[tokio::test] + async fn lifecycle_events_published() { + let (persistence, lock, queue) = create_providers(); + let lifecycle = Arc::new(InMemoryLifecyclePublisher::new()); + let executor = create_executor(persistence.clone(), lock, queue) + .with_lifecycle(lifecycle.clone()); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.steps.push(WorkflowStep::new(0, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + // Executor itself doesn't publish lifecycle events in the current implementation, + // but the with_lifecycle builder works correctly. + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn default_error_behavior_used_when_step_has_none() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.default_error_behavior = ErrorBehavior::Terminate; + // Step has no error_behavior override. + def.steps.push(WorkflowStep::new(0, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Terminated); + } + + #[tokio::test] + async fn pointer_start_time_set_on_first_execution() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + def.steps.push(WorkflowStep::new(0, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert!(updated.execution_pointers[0].start_time.is_some()); + assert!(updated.execution_pointers[0].end_time.is_some()); + } + + #[tokio::test] + async fn outcome_value_stored_on_pointer() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.outcomes.push(StepOutcome { + next_step: 1, + label: None, + value: Some(serde_json::json!("yes")), + }); + def.steps.push(s0); + def.steps.push(WorkflowStep::new(1, step_type::())); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!( + updated.execution_pointers[0].outcome, + Some(serde_json::json!("yes")) + ); + } + + #[tokio::test] + async fn retry_then_succeed() { + // A step that fails once then succeeds. + use std::sync::atomic::{AtomicU32, Ordering}; + + static CALL_COUNT: AtomicU32 = AtomicU32::new(0); + + #[derive(Default)] + struct FailOnceStep; + + #[async_trait] + impl StepBody for FailOnceStep { + async fn run( + &mut self, + _ctx: &StepExecutionContext<'_>, + ) -> crate::Result { + let count = CALL_COUNT.fetch_add(1, Ordering::SeqCst); + if count == 0 { + Err(WfeError::StepExecution("first attempt fails".into())) + } else { + Ok(ExecutionResult::next()) + } + } + } + + CALL_COUNT.store(0, Ordering::SeqCst); + + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let mut registry = StepRegistry::new(); + registry.register::(); + + let mut def = WorkflowDefinition::new("test", 1); + let mut s0 = WorkflowStep::new(0, step_type::()); + s0.error_behavior = Some(ErrorBehavior::Retry { + interval: Duration::from_millis(0), + max_retries: 0, + }); + def.steps.push(s0); + + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + instance.execution_pointers.push(ExecutionPointer::new(0)); + persistence.create_new_workflow(&instance).await.unwrap(); + + // First execution: fails, retry scheduled. + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.execution_pointers[0].retry_count, 1); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping); + + // Second execution: succeeds (sleep_until is in the past with 0ms interval). + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete); + assert_eq!(updated.status, WorkflowStatus::Complete); + } + + #[tokio::test] + async fn no_runnable_pointers_is_noop() { + let (persistence, lock, queue) = create_providers(); + let executor = create_executor(persistence.clone(), lock, queue); + + let registry = StepRegistry::new(); + let def = WorkflowDefinition::new("test", 1); + + let instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + // No execution pointers at all. + persistence.create_new_workflow(&instance).await.unwrap(); + + executor.execute(&instance.id, &def, ®istry).await.unwrap(); + + let updated = persistence.get_workflow_instance(&instance.id).await.unwrap(); + assert_eq!(updated.status, WorkflowStatus::Runnable); + } +}