From d0a3f0e185b19fc3f0259edb8686cc4b97d8d607 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Wed, 25 Mar 2026 20:10:03 +0000 Subject: [PATCH] feat(wfe-core): add step primitives for workflow control flow 12 step primitives implementing StepBody: DecideStep, IfStep, WhileStep, ForEachStep, SequenceStep, DelayStep, WaitForStep, ScheduleStep, RecurStep, PollEndpointStep, SagaContainerStep, EndStep. Each primitive handles its state machine via persistence_data and branch creation for container steps. --- wfe-core/src/primitives/decide.rs | 54 ++++++ wfe-core/src/primitives/delay.rs | 76 ++++++++ wfe-core/src/primitives/end_step.rs | 35 ++++ wfe-core/src/primitives/foreach_step.rs | 223 ++++++++++++++++++++++ wfe-core/src/primitives/if_step.rs | 120 ++++++++++++ wfe-core/src/primitives/mod.rs | 55 ++++++ wfe-core/src/primitives/poll_endpoint.rs | 53 +++++ wfe-core/src/primitives/recur.rs | 134 +++++++++++++ wfe-core/src/primitives/saga_container.rs | 151 +++++++++++++++ wfe-core/src/primitives/schedule.rs | 105 ++++++++++ wfe-core/src/primitives/sequence.rs | 78 ++++++++ wfe-core/src/primitives/wait_for.rs | 98 ++++++++++ wfe-core/src/primitives/while_step.rs | 145 ++++++++++++++ 13 files changed, 1327 insertions(+) create mode 100644 wfe-core/src/primitives/decide.rs create mode 100644 wfe-core/src/primitives/delay.rs create mode 100644 wfe-core/src/primitives/end_step.rs create mode 100644 wfe-core/src/primitives/foreach_step.rs create mode 100644 wfe-core/src/primitives/if_step.rs create mode 100644 wfe-core/src/primitives/mod.rs create mode 100644 wfe-core/src/primitives/poll_endpoint.rs create mode 100644 wfe-core/src/primitives/recur.rs create mode 100644 wfe-core/src/primitives/saga_container.rs create mode 100644 wfe-core/src/primitives/schedule.rs create mode 100644 wfe-core/src/primitives/sequence.rs create mode 100644 wfe-core/src/primitives/wait_for.rs create mode 100644 wfe-core/src/primitives/while_step.rs diff --git a/wfe-core/src/primitives/decide.rs b/wfe-core/src/primitives/decide.rs new file mode 100644 index 0000000..63feeea --- /dev/null +++ b/wfe-core/src/primitives/decide.rs @@ -0,0 +1,54 @@ +use async_trait::async_trait; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A decision step that returns an outcome value for routing. +pub struct DecideStep { + pub expression_value: serde_json::Value, +} + +#[async_trait] +impl StepBody for DecideStep { + async fn run(&mut self, _context: &StepExecutionContext<'_>) -> crate::Result { + Ok(ExecutionResult::outcome(self.expression_value.clone())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::ExecutionPointer; + use crate::primitives::test_helpers::*; + use serde_json::json; + + #[tokio::test] + async fn returns_correct_outcome_value() { + let mut step = DecideStep { + expression_value: json!("route_a"), + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + assert_eq!(result.outcome_value, Some(json!("route_a"))); + } + + #[tokio::test] + async fn returns_numeric_outcome() { + let mut step = DecideStep { + expression_value: json!(42), + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + assert_eq!(result.outcome_value, Some(json!(42))); + } +} diff --git a/wfe-core/src/primitives/delay.rs b/wfe-core/src/primitives/delay.rs new file mode 100644 index 0000000..40f86e6 --- /dev/null +++ b/wfe-core/src/primitives/delay.rs @@ -0,0 +1,76 @@ +use std::time::Duration; + +use async_trait::async_trait; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A step that sleeps for a specified duration before proceeding. +pub struct DelayStep { + pub duration: Duration, +} + +impl Default for DelayStep { + fn default() -> Self { + Self { + duration: Duration::ZERO, + } + } +} + +#[async_trait] +impl StepBody for DelayStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result { + // Read duration from step_config if our field is zero. + let duration = if self.duration == Duration::ZERO { + context + .step + .step_config + .as_ref() + .and_then(|c| c.get("duration_millis")) + .and_then(|v| v.as_u64()) + .map(Duration::from_millis) + .unwrap_or(self.duration) + } else { + self.duration + }; + Ok(ExecutionResult::sleep(duration, None)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::ExecutionPointer; + use crate::primitives::test_helpers::*; + + #[tokio::test] + async fn returns_correct_sleep_duration() { + let mut step = DelayStep { + duration: Duration::from_secs(60), + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.sleep_for, Some(Duration::from_secs(60))); + assert!(result.persistence_data.is_none()); + } + + #[tokio::test] + async fn returns_zero_duration() { + let mut step = DelayStep { + duration: Duration::ZERO, + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert_eq!(result.sleep_for, Some(Duration::ZERO)); + } +} diff --git a/wfe-core/src/primitives/end_step.rs b/wfe-core/src/primitives/end_step.rs new file mode 100644 index 0000000..742dd72 --- /dev/null +++ b/wfe-core/src/primitives/end_step.rs @@ -0,0 +1,35 @@ +use async_trait::async_trait; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A no-op marker step indicating the end of a workflow branch. +pub struct EndStep; + +#[async_trait] +impl StepBody for EndStep { + async fn run(&mut self, _context: &StepExecutionContext<'_>) -> crate::Result { + Ok(ExecutionResult::next()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::ExecutionPointer; + use crate::primitives::test_helpers::*; + + #[tokio::test] + async fn always_returns_next() { + let mut step = EndStep; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + assert!(result.outcome_value.is_none()); + assert!(result.persistence_data.is_none()); + } +} diff --git a/wfe-core/src/primitives/foreach_step.rs b/wfe-core/src/primitives/foreach_step.rs new file mode 100644 index 0000000..08b01bc --- /dev/null +++ b/wfe-core/src/primitives/foreach_step.rs @@ -0,0 +1,223 @@ +use async_trait::async_trait; +use serde_json::json; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A step that iterates over a collection, branching for each element. +pub struct ForEachStep { + pub collection: Vec, + pub run_parallel: bool, +} + +impl Default for ForEachStep { + fn default() -> Self { + Self { + collection: Vec::new(), + run_parallel: true, + } + } +} + +#[async_trait] +impl StepBody for ForEachStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result { + if self.collection.is_empty() { + return Ok(ExecutionResult::next()); + } + + if self.run_parallel { + // Parallel: branch with all collection values at once. + let children_active = context + .persistence_data + .and_then(|d| d.get("children_active")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if children_active { + let mut scope = context.execution_pointer.scope.clone(); + scope.push(context.execution_pointer.id.clone()); + + if context.workflow.is_branch_complete(&scope) { + Ok(ExecutionResult::next()) + } else { + Ok(ExecutionResult::persist(json!({"children_active": true}))) + } + } else { + Ok(ExecutionResult::branch( + self.collection.clone(), + Some(json!({"children_active": true})), + )) + } + } else { + // Sequential: process one item at a time using current_index. + let current_index = context + .persistence_data + .and_then(|d| d.get("current_index")) + .and_then(|v| v.as_u64()) + .unwrap_or(0) as usize; + + let children_active = context + .persistence_data + .and_then(|d| d.get("children_active")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if children_active { + // Check if current child is complete. + let mut scope = context.execution_pointer.scope.clone(); + scope.push(context.execution_pointer.id.clone()); + + if context.workflow.is_branch_complete(&scope) { + let next_index = current_index + 1; + if next_index >= self.collection.len() { + // All items processed. + Ok(ExecutionResult::next()) + } else { + // Advance to next item. + Ok(ExecutionResult::branch( + vec![self.collection[next_index].clone()], + Some(json!({"children_active": true, "current_index": next_index})), + )) + } + } else { + Ok(ExecutionResult::persist( + json!({"children_active": true, "current_index": current_index}), + )) + } + } else { + // Start first item. + Ok(ExecutionResult::branch( + vec![self.collection[current_index].clone()], + Some(json!({"children_active": true, "current_index": current_index})), + )) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{ExecutionPointer, PointerStatus}; + use crate::primitives::test_helpers::*; + + #[tokio::test] + async fn empty_collection_proceeds() { + let mut step = ForEachStep { + collection: vec![], + run_parallel: true, + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } + + #[tokio::test] + async fn parallel_branches_all_items() { + let mut step = ForEachStep { + collection: vec![json!(1), json!(2), json!(3)], + run_parallel: true, + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.branch_values, Some(vec![json!(1), json!(2), json!(3)])); + } + + #[tokio::test] + async fn parallel_complete_proceeds() { + let mut step = ForEachStep { + collection: vec![json!(1), json!(2)], + run_parallel: true, + }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } + + #[tokio::test] + async fn sequential_starts_first_item() { + let mut step = ForEachStep { + collection: vec![json!("a"), json!("b"), json!("c")], + run_parallel: false, + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.branch_values, Some(vec![json!("a")])); + assert_eq!( + result.persistence_data, + Some(json!({"children_active": true, "current_index": 0})) + ); + } + + #[tokio::test] + async fn sequential_advances_to_next_item() { + let mut step = ForEachStep { + collection: vec![json!("a"), json!("b"), json!("c")], + run_parallel: false, + }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true, "current_index": 0})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.branch_values, Some(vec![json!("b")])); + assert_eq!( + result.persistence_data, + Some(json!({"children_active": true, "current_index": 1})) + ); + } + + #[tokio::test] + async fn sequential_completes_after_last_item() { + let mut step = ForEachStep { + collection: vec![json!("a"), json!("b")], + run_parallel: false, + }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true, "current_index": 1})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } +} diff --git a/wfe-core/src/primitives/if_step.rs b/wfe-core/src/primitives/if_step.rs new file mode 100644 index 0000000..f22a5d8 --- /dev/null +++ b/wfe-core/src/primitives/if_step.rs @@ -0,0 +1,120 @@ +use async_trait::async_trait; +use serde_json::json; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A conditional step that branches execution based on a boolean condition. +pub struct IfStep { + pub condition: bool, +} + +#[async_trait] +impl StepBody for IfStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result { + let children_active = context + .persistence_data + .and_then(|d| d.get("children_active")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if children_active { + // Subsequent run: check if branch is complete. + let mut scope = context.execution_pointer.scope.clone(); + scope.push(context.execution_pointer.id.clone()); + + if context.workflow.is_branch_complete(&scope) { + Ok(ExecutionResult::next()) + } else { + Ok(ExecutionResult::persist(json!({"children_active": true}))) + } + } else { + // First run. + if self.condition { + Ok(ExecutionResult::branch( + vec![json!(null)], + Some(json!({"children_active": true})), + )) + } else { + Ok(ExecutionResult::next()) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{ExecutionPointer, PointerStatus}; + use crate::primitives::test_helpers::*; + + #[tokio::test] + async fn condition_true_first_run_branches() { + let mut step = IfStep { condition: true }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.branch_values, Some(vec![json!(null)])); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } + + #[tokio::test] + async fn condition_false_first_run_proceeds() { + let mut step = IfStep { condition: false }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + assert!(result.branch_values.is_none()); + } + + #[tokio::test] + async fn children_active_and_complete_proceeds() { + let mut step = IfStep { condition: true }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + + // Create a workflow with child pointers that are all complete. + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } + + #[tokio::test] + async fn children_active_and_incomplete_persists() { + let mut step = IfStep { condition: true }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + + // Create a workflow with a child pointer that is still running. + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Running; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } +} diff --git a/wfe-core/src/primitives/mod.rs b/wfe-core/src/primitives/mod.rs new file mode 100644 index 0000000..6e6e9ea --- /dev/null +++ b/wfe-core/src/primitives/mod.rs @@ -0,0 +1,55 @@ +pub mod decide; +pub mod delay; +pub mod end_step; +pub mod foreach_step; +pub mod if_step; +pub mod poll_endpoint; +pub mod recur; +pub mod saga_container; +pub mod schedule; +pub mod sequence; +pub mod wait_for; +pub mod while_step; + +pub use decide::DecideStep; +pub use delay::DelayStep; +pub use end_step::EndStep; +pub use foreach_step::ForEachStep; +pub use if_step::IfStep; +pub use poll_endpoint::PollEndpointStep; +pub use recur::RecurStep; +pub use saga_container::SagaContainerStep; +pub use schedule::ScheduleStep; +pub use sequence::SequenceStep; +pub use wait_for::WaitForStep; +pub use while_step::WhileStep; + +#[cfg(test)] +mod test_helpers { + use crate::models::{ExecutionPointer, WorkflowInstance, WorkflowStep}; + use crate::traits::step::StepExecutionContext; + use tokio_util::sync::CancellationToken; + + pub fn make_context<'a>( + pointer: &'a ExecutionPointer, + step: &'a WorkflowStep, + workflow: &'a WorkflowInstance, + ) -> StepExecutionContext<'a> { + StepExecutionContext { + item: None, + execution_pointer: pointer, + persistence_data: pointer.persistence_data.as_ref(), + step, + workflow, + cancellation_token: CancellationToken::new(), + } + } + + pub fn default_workflow() -> WorkflowInstance { + WorkflowInstance::new("test-workflow", 1, serde_json::json!({})) + } + + pub fn default_step() -> WorkflowStep { + WorkflowStep::new(0, "TestStep") + } +} diff --git a/wfe-core/src/primitives/poll_endpoint.rs b/wfe-core/src/primitives/poll_endpoint.rs new file mode 100644 index 0000000..aabb9f4 --- /dev/null +++ b/wfe-core/src/primitives/poll_endpoint.rs @@ -0,0 +1,53 @@ +use async_trait::async_trait; + +use crate::models::poll_config::PollEndpointConfig; +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A step that polls an external HTTP endpoint until a condition is met. +/// The actual HTTP polling is handled by the executor, not this step. +pub struct PollEndpointStep { + pub config: PollEndpointConfig, +} + +#[async_trait] +impl StepBody for PollEndpointStep { + async fn run(&mut self, _context: &StepExecutionContext<'_>) -> crate::Result { + Ok(ExecutionResult::poll_endpoint(self.config.clone())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::poll_config::{HttpMethod, PollCondition}; + use crate::models::ExecutionPointer; + use crate::primitives::test_helpers::*; + use std::collections::HashMap; + use std::time::Duration; + + #[tokio::test] + async fn returns_poll_config() { + let config = PollEndpointConfig { + url: "https://api.example.com/status".into(), + method: HttpMethod::Get, + headers: HashMap::new(), + body: None, + interval: Duration::from_secs(10), + timeout: Duration::from_secs(300), + condition: PollCondition::StatusCode(200), + }; + + let mut step = PollEndpointStep { + config: config.clone(), + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.poll_endpoint, Some(config)); + } +} diff --git a/wfe-core/src/primitives/recur.rs b/wfe-core/src/primitives/recur.rs new file mode 100644 index 0000000..2125554 --- /dev/null +++ b/wfe-core/src/primitives/recur.rs @@ -0,0 +1,134 @@ +use std::time::Duration; + +use async_trait::async_trait; +use serde_json::json; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A step that repeatedly schedules child execution at an interval +/// until a stop condition is met. +pub struct RecurStep { + pub interval: Duration, + pub stop_condition: bool, +} + +#[async_trait] +impl StepBody for RecurStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result { + if self.stop_condition { + return Ok(ExecutionResult::next()); + } + + let children_active = context + .persistence_data + .and_then(|d| d.get("children_active")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if children_active { + let mut scope = context.execution_pointer.scope.clone(); + scope.push(context.execution_pointer.id.clone()); + + if context.workflow.is_branch_complete(&scope) { + // Re-arm: sleep again for the next iteration. + Ok(ExecutionResult::sleep( + self.interval, + Some(json!({"children_active": true})), + )) + } else { + Ok(ExecutionResult::persist(json!({"children_active": true}))) + } + } else { + // First run: sleep for the interval, then create children. + Ok(ExecutionResult::sleep( + self.interval, + Some(json!({"children_active": true})), + )) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{ExecutionPointer, PointerStatus}; + use crate::primitives::test_helpers::*; + + #[tokio::test] + async fn stop_condition_true_proceeds() { + let mut step = RecurStep { + interval: Duration::from_secs(10), + stop_condition: true, + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } + + #[tokio::test] + async fn first_run_sleeps() { + let mut step = RecurStep { + interval: Duration::from_secs(10), + stop_condition: false, + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.sleep_for, Some(Duration::from_secs(10))); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } + + #[tokio::test] + async fn children_complete_re_arms() { + let mut step = RecurStep { + interval: Duration::from_secs(10), + stop_condition: false, + }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.sleep_for, Some(Duration::from_secs(10))); + } + + #[tokio::test] + async fn children_incomplete_persists() { + let mut step = RecurStep { + interval: Duration::from_secs(10), + stop_condition: false, + }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Running; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert!(result.sleep_for.is_none()); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } +} diff --git a/wfe-core/src/primitives/saga_container.rs b/wfe-core/src/primitives/saga_container.rs new file mode 100644 index 0000000..fdc3e06 --- /dev/null +++ b/wfe-core/src/primitives/saga_container.rs @@ -0,0 +1,151 @@ +use async_trait::async_trait; +use serde_json::json; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A container step for saga transactions. +/// Manages child step execution and compensation on failure. +pub struct SagaContainerStep { + pub revert_children_after_compensation: bool, +} + +impl Default for SagaContainerStep { + fn default() -> Self { + Self { + revert_children_after_compensation: true, + } + } +} + +#[async_trait] +impl StepBody for SagaContainerStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result { + let children_active = context + .persistence_data + .and_then(|d| d.get("children_active")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + let compensating = context + .persistence_data + .and_then(|d| d.get("compensating")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if compensating { + // We are in compensation mode, check if compensation children are done. + let mut scope = context.execution_pointer.scope.clone(); + scope.push(context.execution_pointer.id.clone()); + + if context.workflow.is_branch_complete(&scope) { + Ok(ExecutionResult::next()) + } else { + Ok(ExecutionResult::persist( + json!({"children_active": true, "compensating": true}), + )) + } + } else if children_active { + let mut scope = context.execution_pointer.scope.clone(); + scope.push(context.execution_pointer.id.clone()); + + // Check if compensation is needed by looking for do_compensate on the step. + if context.step.do_compensate { + // Trigger compensation. + Ok(ExecutionResult::branch( + vec![json!(null)], + Some(json!({"children_active": true, "compensating": true})), + )) + } else if context.workflow.is_branch_complete(&scope) { + // Normal completion. + Ok(ExecutionResult::next()) + } else { + Ok(ExecutionResult::persist(json!({"children_active": true}))) + } + } else { + // First run: branch for children. + Ok(ExecutionResult::branch( + vec![json!(null)], + Some(json!({"children_active": true})), + )) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{ExecutionPointer, PointerStatus, WorkflowStep}; + use crate::primitives::test_helpers::*; + + #[tokio::test] + async fn first_run_branches_for_children() { + let mut step = SagaContainerStep::default(); + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.branch_values, Some(vec![json!(null)])); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } + + #[tokio::test] + async fn normal_completion_when_children_done() { + let mut step = SagaContainerStep::default(); + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } + + #[tokio::test] + async fn compensation_triggered_when_do_compensate() { + let mut step = SagaContainerStep::default(); + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let mut wf_step = WorkflowStep::new(0, "SagaContainer"); + wf_step.do_compensate = true; + + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.branch_values, Some(vec![json!(null)])); + assert_eq!( + result.persistence_data, + Some(json!({"children_active": true, "compensating": true})) + ); + } + + #[tokio::test] + async fn compensation_complete_proceeds() { + let mut step = SagaContainerStep::default(); + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true, "compensating": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Compensated; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } +} diff --git a/wfe-core/src/primitives/schedule.rs b/wfe-core/src/primitives/schedule.rs new file mode 100644 index 0000000..290587a --- /dev/null +++ b/wfe-core/src/primitives/schedule.rs @@ -0,0 +1,105 @@ +use std::time::Duration; + +use async_trait::async_trait; +use serde_json::json; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A step that schedules child execution after a delay. +pub struct ScheduleStep { + pub interval: Duration, +} + +#[async_trait] +impl StepBody for ScheduleStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result { + let children_active = context + .persistence_data + .and_then(|d| d.get("children_active")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if children_active { + // Children have been created, check if they are complete. + let mut scope = context.execution_pointer.scope.clone(); + scope.push(context.execution_pointer.id.clone()); + + if context.workflow.is_branch_complete(&scope) { + Ok(ExecutionResult::next()) + } else { + Ok(ExecutionResult::persist(json!({"children_active": true}))) + } + } else { + // First run: sleep for the interval, then create children. + Ok(ExecutionResult::sleep( + self.interval, + Some(json!({"children_active": true})), + )) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{ExecutionPointer, PointerStatus}; + use crate::primitives::test_helpers::*; + + #[tokio::test] + async fn first_run_schedules_sleep() { + let mut step = ScheduleStep { + interval: Duration::from_secs(30), + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.sleep_for, Some(Duration::from_secs(30))); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } + + #[tokio::test] + async fn children_complete_proceeds() { + let mut step = ScheduleStep { + interval: Duration::from_secs(30), + }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } + + #[tokio::test] + async fn children_incomplete_persists() { + let mut step = ScheduleStep { + interval: Duration::from_secs(30), + }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Running; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } +} diff --git a/wfe-core/src/primitives/sequence.rs b/wfe-core/src/primitives/sequence.rs new file mode 100644 index 0000000..f822d4d --- /dev/null +++ b/wfe-core/src/primitives/sequence.rs @@ -0,0 +1,78 @@ +use async_trait::async_trait; +use serde_json::json; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A container step that executes its children sequentially. +/// Completes when all children have finished. +pub struct SequenceStep; + +#[async_trait] +impl StepBody for SequenceStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result { + let mut scope = context.execution_pointer.scope.clone(); + scope.push(context.execution_pointer.id.clone()); + + if context.workflow.is_branch_complete(&scope) { + Ok(ExecutionResult::next()) + } else { + Ok(ExecutionResult::persist(json!({"children_active": true}))) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{ExecutionPointer, PointerStatus}; + use crate::primitives::test_helpers::*; + + #[tokio::test] + async fn children_complete_proceeds() { + let mut step = SequenceStep; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } + + #[tokio::test] + async fn children_incomplete_persists() { + let mut step = SequenceStep; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Running; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } + + #[tokio::test] + async fn no_children_in_scope_proceeds() { + let mut step = SequenceStep; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + + let ctx = make_context(&pointer, &wf_step, &workflow); + let result = step.run(&ctx).await.unwrap(); + // No children in scope means is_branch_complete returns true (vacuously). + assert!(result.proceed); + } +} diff --git a/wfe-core/src/primitives/wait_for.rs b/wfe-core/src/primitives/wait_for.rs new file mode 100644 index 0000000..8bc4ebd --- /dev/null +++ b/wfe-core/src/primitives/wait_for.rs @@ -0,0 +1,98 @@ +use async_trait::async_trait; +use chrono::Utc; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A step that waits for an external event before proceeding. +#[derive(Default)] +pub struct WaitForStep { + pub event_name: String, + pub event_key: String, +} + +#[async_trait] +impl StepBody for WaitForStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result { + // If event data has arrived, proceed. + if context.execution_pointer.event_data.is_some() { + return Ok(ExecutionResult::next()); + } + + // Read event_name/event_key from step_config if our fields are empty. + let event_name = if self.event_name.is_empty() { + context + .step + .step_config + .as_ref() + .and_then(|c| c.get("event_name")) + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string() + } else { + self.event_name.clone() + }; + let event_key = if self.event_key.is_empty() { + context + .step + .step_config + .as_ref() + .and_then(|c| c.get("event_key")) + .and_then(|v| v.as_str()) + .unwrap_or_default() + .to_string() + } else { + self.event_key.clone() + }; + + // Otherwise, subscribe and wait for the event. + Ok(ExecutionResult::wait_for_event( + event_name, + event_key, + Utc::now(), + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::ExecutionPointer; + use crate::primitives::test_helpers::*; + use serde_json::json; + + #[tokio::test] + async fn first_run_waits_for_event() { + let mut step = WaitForStep { + event_name: "order.completed".into(), + event_key: "order-123".into(), + }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.event_name.as_deref(), Some("order.completed")); + assert_eq!(result.event_key.as_deref(), Some("order-123")); + assert!(result.event_as_of.is_some()); + } + + #[tokio::test] + async fn event_arrived_proceeds() { + let mut step = WaitForStep { + event_name: "order.completed".into(), + event_key: "order-123".into(), + }; + let mut pointer = ExecutionPointer::new(0); + pointer.event_data = Some(json!({"status": "done"})); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + assert!(result.event_name.is_none()); + } +} diff --git a/wfe-core/src/primitives/while_step.rs b/wfe-core/src/primitives/while_step.rs new file mode 100644 index 0000000..2374638 --- /dev/null +++ b/wfe-core/src/primitives/while_step.rs @@ -0,0 +1,145 @@ +use async_trait::async_trait; +use serde_json::json; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +/// A looping step that repeats its children while a condition is true. +pub struct WhileStep { + pub condition: bool, +} + +#[async_trait] +impl StepBody for WhileStep { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result { + let children_active = context + .persistence_data + .and_then(|d| d.get("children_active")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if children_active { + // Subsequent run: check if the current iteration is complete. + let mut scope = context.execution_pointer.scope.clone(); + scope.push(context.execution_pointer.id.clone()); + + if context.workflow.is_branch_complete(&scope) { + // Iteration complete. Re-evaluate condition. + if self.condition { + // Start a new iteration. + Ok(ExecutionResult::branch( + vec![json!(null)], + Some(json!({"children_active": true})), + )) + } else { + Ok(ExecutionResult::next()) + } + } else { + Ok(ExecutionResult::persist(json!({"children_active": true}))) + } + } else { + // First run. + if self.condition { + Ok(ExecutionResult::branch( + vec![json!(null)], + Some(json!({"children_active": true})), + )) + } else { + Ok(ExecutionResult::next()) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{ExecutionPointer, PointerStatus}; + use crate::primitives::test_helpers::*; + + #[tokio::test] + async fn condition_true_first_run_branches() { + let mut step = WhileStep { condition: true }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.branch_values, Some(vec![json!(null)])); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } + + #[tokio::test] + async fn condition_false_first_run_proceeds() { + let mut step = WhileStep { condition: false }; + let pointer = ExecutionPointer::new(0); + let wf_step = default_step(); + let workflow = default_workflow(); + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } + + #[tokio::test] + async fn children_complete_and_condition_true_re_branches() { + let mut step = WhileStep { condition: true }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert_eq!(result.branch_values, Some(vec![json!(null)])); + } + + #[tokio::test] + async fn children_complete_and_condition_false_proceeds() { + let mut step = WhileStep { condition: false }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Complete; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(result.proceed); + } + + #[tokio::test] + async fn children_incomplete_persists() { + let mut step = WhileStep { condition: true }; + let mut pointer = ExecutionPointer::new(0); + pointer.persistence_data = Some(json!({"children_active": true})); + + let wf_step = default_step(); + let mut workflow = default_workflow(); + let mut child = ExecutionPointer::new(1); + child.scope = vec![pointer.id.clone()]; + child.status = PointerStatus::Running; + workflow.execution_pointers.push(child); + + let ctx = make_context(&pointer, &wf_step, &workflow); + + let result = step.run(&ctx).await.unwrap(); + assert!(!result.proceed); + assert!(result.branch_values.is_none()); + assert_eq!(result.persistence_data, Some(json!({"children_active": true}))); + } +}