diff --git a/wfe-core/src/builder/inline_step.rs b/wfe-core/src/builder/inline_step.rs new file mode 100644 index 0000000..9c1b5ea --- /dev/null +++ b/wfe-core/src/builder/inline_step.rs @@ -0,0 +1,30 @@ +use async_trait::async_trait; + +use crate::models::ExecutionResult; +use crate::traits::step::{StepBody, StepExecutionContext}; + +type InlineFn = Box ExecutionResult + Send + Sync>; + +/// A step that wraps an inline closure. +pub struct InlineStep { + body: InlineFn, +} + +impl InlineStep { + pub fn new(f: impl Fn() -> ExecutionResult + Send + Sync + 'static) -> Self { + Self { body: Box::new(f) } + } +} + +impl Default for InlineStep { + fn default() -> Self { + Self::new(ExecutionResult::next) + } +} + +#[async_trait] +impl StepBody for InlineStep { + async fn run(&mut self, _context: &StepExecutionContext<'_>) -> crate::Result { + Ok((self.body)()) + } +} diff --git a/wfe-core/src/builder/mod.rs b/wfe-core/src/builder/mod.rs new file mode 100644 index 0000000..f748733 --- /dev/null +++ b/wfe-core/src/builder/mod.rs @@ -0,0 +1,7 @@ +mod inline_step; +mod step_builder; +mod workflow_builder; + +pub use inline_step::InlineStep; +pub use step_builder::StepBuilder; +pub use workflow_builder::WorkflowBuilder; diff --git a/wfe-core/src/builder/step_builder.rs b/wfe-core/src/builder/step_builder.rs new file mode 100644 index 0000000..7c90502 --- /dev/null +++ b/wfe-core/src/builder/step_builder.rs @@ -0,0 +1,221 @@ +use crate::models::{ErrorBehavior, ExecutionResult}; +use crate::primitives; +use crate::traits::step::{StepBody, WorkflowData}; + +use super::inline_step::InlineStep; +use super::workflow_builder::WorkflowBuilder; + +/// Builder for configuring a single step in the workflow. +/// +/// Owns the WorkflowBuilder, consuming self on each method call. +/// This avoids all lifetime/borrow issues. +pub struct StepBuilder { + builder: WorkflowBuilder, + step_id: usize, +} + +/// Builder for parallel branches. +pub struct ParallelBuilder { + builder: WorkflowBuilder, + container_id: usize, +} + +impl StepBuilder { + pub(crate) fn new(builder: WorkflowBuilder, step_id: usize) -> Self { + Self { builder, step_id } + } + + /// Set the display name of the current step. + pub fn name(mut self, name: &str) -> Self { + self.builder.steps[self.step_id].name = Some(name.to_string()); + self + } + + /// Set an external ID for forward references. + pub fn id(mut self, external_id: &str) -> Self { + self.builder.steps[self.step_id].external_id = Some(external_id.to_string()); + self + } + + /// Set the error handling behavior for this step. + pub fn on_error(mut self, behavior: ErrorBehavior) -> Self { + self.builder.steps[self.step_id].error_behavior = Some(behavior); + self + } + + /// Add a compensation step for saga rollback. + pub fn compensate_with(mut self) -> Self { + let comp_id = self.builder.add_step(std::any::type_name::()); + self.builder.steps[self.step_id].compensation_step_id = Some(comp_id); + self + } + + /// Chain the next step. Wires an outcome from the current step to the new one. + pub fn then(mut self) -> StepBuilder { + let next_id = self.builder.add_step(std::any::type_name::()); + self.builder.wire_outcome(self.step_id, next_id, None); + self.builder.last_step = Some(next_id); + StepBuilder::new(self.builder, next_id) + } + + /// Chain an inline function step. + pub fn then_fn(mut self, f: impl Fn() -> ExecutionResult + Send + Sync + 'static) -> StepBuilder { + let next_id = self.builder.add_step(std::any::type_name::()); + self.builder.wire_outcome(self.step_id, next_id, None); + self.builder.last_step = Some(next_id); + self.builder.inline_closures.insert(next_id, Box::new(f)); + StepBuilder::new(self.builder, next_id) + } + + /// Insert a WaitFor step. + pub fn wait_for(mut self, event_name: &str, event_key: &str) -> StepBuilder { + let next_id = self.builder.add_step(std::any::type_name::()); + self.builder.wire_outcome(self.step_id, next_id, None); + self.builder.last_step = Some(next_id); + self.builder.steps[next_id].step_config = Some(serde_json::json!({ + "event_name": event_name, + "event_key": event_key, + })); + StepBuilder::new(self.builder, next_id) + } + + /// Insert a Delay step. + pub fn delay(mut self, duration: std::time::Duration) -> StepBuilder { + let next_id = self.builder.add_step(std::any::type_name::()); + self.builder.wire_outcome(self.step_id, next_id, None); + self.builder.last_step = Some(next_id); + self.builder.steps[next_id].step_config = Some(serde_json::json!({ + "duration_millis": duration.as_millis() as u64, + })); + StepBuilder::new(self.builder, next_id) + } + + /// Insert an If container step with child steps built by the closure. + /// The type parameter S is the step type used for the If condition evaluation. + pub fn if_do( + mut self, + build_children: impl FnOnce(&mut WorkflowBuilder), + ) -> StepBuilder { + let if_id = self.builder.add_step(std::any::type_name::()); + self.builder.wire_outcome(self.step_id, if_id, None); + + // Build children + let before_count = self.builder.steps.len(); + build_children(&mut self.builder); + let after_count = self.builder.steps.len(); + + // Register children with the If step + for child_id in before_count..after_count { + self.builder.add_child(if_id, child_id); + } + + self.builder.last_step = Some(if_id); + StepBuilder::new(self.builder, if_id) + } + + /// Insert a While container step. + pub fn while_do( + mut self, + build_children: impl FnOnce(&mut WorkflowBuilder), + ) -> StepBuilder { + let while_id = self.builder.add_step(std::any::type_name::()); + self.builder.wire_outcome(self.step_id, while_id, None); + + let before_count = self.builder.steps.len(); + build_children(&mut self.builder); + let after_count = self.builder.steps.len(); + + for child_id in before_count..after_count { + self.builder.add_child(while_id, child_id); + } + + self.builder.last_step = Some(while_id); + StepBuilder::new(self.builder, while_id) + } + + /// Insert a ForEach container step. + pub fn for_each( + mut self, + build_children: impl FnOnce(&mut WorkflowBuilder), + ) -> StepBuilder { + let fe_id = self.builder.add_step(std::any::type_name::()); + self.builder.wire_outcome(self.step_id, fe_id, None); + + let before_count = self.builder.steps.len(); + build_children(&mut self.builder); + let after_count = self.builder.steps.len(); + + for child_id in before_count..after_count { + self.builder.add_child(fe_id, child_id); + } + + self.builder.last_step = Some(fe_id); + StepBuilder::new(self.builder, fe_id) + } + + /// Insert a Saga container step with child steps. + pub fn saga( + mut self, + build_children: impl FnOnce(&mut WorkflowBuilder), + ) -> StepBuilder { + let saga_id = self.builder.add_step(std::any::type_name::()); + self.builder.steps[saga_id].saga = true; + self.builder.wire_outcome(self.step_id, saga_id, None); + + let before_count = self.builder.steps.len(); + build_children(&mut self.builder); + let after_count = self.builder.steps.len(); + + for child_id in before_count..after_count { + self.builder.add_child(saga_id, child_id); + } + + self.builder.last_step = Some(saga_id); + StepBuilder::new(self.builder, saga_id) + } + + /// Start a parallel block. + pub fn parallel( + mut self, + build_branches: impl FnOnce(ParallelBuilder) -> ParallelBuilder, + ) -> StepBuilder { + let seq_id = self.builder.add_step(std::any::type_name::()); + self.builder.wire_outcome(self.step_id, seq_id, None); + + let pb = ParallelBuilder { + builder: self.builder, + container_id: seq_id, + }; + let pb = build_branches(pb); + let mut builder = pb.builder; + builder.last_step = Some(seq_id); + StepBuilder::new(builder, seq_id) + } + + /// Mark this step as the terminal step (no further outcomes). + pub fn end_workflow(self) -> WorkflowBuilder { + self.builder + } + + /// Access the compiled definition directly (shortcut for end_workflow().build()). + pub fn build(self, id: impl Into, version: u32) -> crate::models::WorkflowDefinition { + self.builder.build(id, version) + } +} + +impl ParallelBuilder { + /// Add a parallel branch. + pub fn branch( + mut self, + build_branch: impl FnOnce(&mut WorkflowBuilder), + ) -> Self { + let before_count = self.builder.steps.len(); + build_branch(&mut self.builder); + let after_count = self.builder.steps.len(); + + for child_id in before_count..after_count { + self.builder.add_child(self.container_id, child_id); + } + self + } +} diff --git a/wfe-core/src/builder/workflow_builder.rs b/wfe-core/src/builder/workflow_builder.rs new file mode 100644 index 0000000..d67128b --- /dev/null +++ b/wfe-core/src/builder/workflow_builder.rs @@ -0,0 +1,356 @@ +use std::collections::HashMap; +use std::marker::PhantomData; + +use crate::models::{ + ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep, +}; +use crate::traits::step::{StepBody, WorkflowData}; + +use super::inline_step::InlineStep; +use super::step_builder::StepBuilder; + +/// Type alias for boxed inline step closures. +pub type InlineClosureBox = Box ExecutionResult + Send + Sync>; + +/// Fluent builder for constructing workflow definitions. +/// +/// Uses an owned-self pattern: each method consumes and returns the builder, +/// avoiding lifetime issues with mutable borrows. +/// +/// # Example +/// ```ignore +/// let def = WorkflowBuilder::::new() +/// .start_with::() +/// .name("Step A") +/// .then::() +/// .name("Step B") +/// .end_workflow() +/// .build("my-workflow", 1); +/// ``` +pub struct WorkflowBuilder { + pub(crate) steps: Vec, + pub(crate) last_step: Option, + /// Inline closures keyed by step id, stored for later registration. + pub(crate) inline_closures: HashMap, + _phantom: PhantomData, +} + +impl WorkflowBuilder { + pub fn new() -> Self { + Self { + steps: Vec::new(), + last_step: None, + inline_closures: HashMap::new(), + _phantom: PhantomData, + } + } + + /// Add the first step of the workflow. + pub fn start_with(mut self) -> StepBuilder { + let id = self.steps.len(); + let step = WorkflowStep::new(id, std::any::type_name::()); + self.steps.push(step); + self.last_step = Some(id); + StepBuilder::new(self, id) + } + + /// Add a step by type name. Used by container builder closures. + pub fn add_step(&mut self, step_type: &str) -> usize { + let id = self.steps.len(); + self.steps.push(WorkflowStep::new(id, step_type)); + id + } + + /// Wire an outcome from `from_step` to `to_step`. + pub(crate) fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option) { + if let Some(step) = self.steps.get_mut(from_step) { + step.outcomes.push(StepOutcome { + next_step: to_step, + label: None, + value, + }); + } + } + + /// Add a child step ID to a parent container step. + pub(crate) fn add_child(&mut self, parent: usize, child: usize) { + if let Some(step) = self.steps.get_mut(parent) { + step.children.push(child); + } + } + + /// Compile the builder into a WorkflowDefinition. + pub fn build(self, id: impl Into, version: u32) -> WorkflowDefinition { + let mut def = WorkflowDefinition::new(id, version); + def.steps = self.steps; + // Note: inline closures are dropped here. Use `build_with_closures` to retain them. + def + } + + /// Compile the builder into a WorkflowDefinition and return any inline closures + /// keyed by step id. + pub fn build_with_closures( + self, + id: impl Into, + version: u32, + ) -> (WorkflowDefinition, HashMap) { + let mut def = WorkflowDefinition::new(id, version); + def.steps = self.steps; + (def, self.inline_closures) + } + + /// Register all inline closures from this builder into the given step registry. + /// + /// Each inline closure is registered under a unique key derived from the + /// `InlineStep` type name and step id. + pub fn register_inline_steps( + self, + registry: &mut crate::executor::StepRegistry, + id: impl Into, + version: u32, + ) -> WorkflowDefinition { + let mut def = WorkflowDefinition::new(id, version); + def.steps = self.steps; + for (step_id, closure) in self.inline_closures { + let closure = std::sync::Arc::new(closure); + let key = format!("{}::{step_id}", std::any::type_name::()); + // Update the step_type so the executor resolves correctly. + if let Some(step) = def.steps.get_mut(step_id) { + step.step_type = key.clone(); + } + let closure = closure.clone(); + registry.register_factory(&key, move || { + let c = closure.clone(); + Box::new(InlineStep::new(move || (c)())) + }); + } + def + } +} + +impl Default for WorkflowBuilder { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{ErrorBehavior, ExecutionResult}; + use crate::traits::step::StepExecutionContext; + use pretty_assertions::assert_eq; + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Clone, Default, Serialize, Deserialize)] + struct TestData { + counter: i32, + } + + #[derive(Default)] + struct StepA; + + #[async_trait::async_trait] + impl StepBody for StepA { + async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result { + Ok(ExecutionResult::next()) + } + } + + #[derive(Default)] + struct StepB; + + #[async_trait::async_trait] + impl StepBody for StepB { + async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result { + Ok(ExecutionResult::next()) + } + } + + #[derive(Default)] + struct StepC; + + #[async_trait::async_trait] + impl StepBody for StepC { + async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result { + Ok(ExecutionResult::next()) + } + } + + #[test] + fn build_empty_workflow() { + let def = WorkflowBuilder::::new().build("empty", 1); + assert_eq!(def.id, "empty"); + assert_eq!(def.version, 1); + assert!(def.steps.is_empty()); + } + + #[test] + fn start_with_adds_first_step() { + let def = WorkflowBuilder::::new() + .start_with::() + .end_workflow() + .build("test", 1); + assert_eq!(def.steps.len(), 1); + assert!(def.steps[0].step_type.contains("StepA")); + } + + #[test] + fn then_chains_two_steps_with_outcome() { + let def = WorkflowBuilder::::new() + .start_with::() + .then::() + .end_workflow() + .build("test", 1); + assert_eq!(def.steps.len(), 2); + // Step 0 should have outcome pointing to step 1 + assert_eq!(def.steps[0].outcomes.len(), 1); + assert_eq!(def.steps[0].outcomes[0].next_step, 1); + } + + #[test] + fn then_chains_three_steps() { + let def = WorkflowBuilder::::new() + .start_with::() + .then::() + .then::() + .end_workflow() + .build("test", 1); + assert_eq!(def.steps.len(), 3); + assert_eq!(def.steps[0].outcomes[0].next_step, 1); + assert_eq!(def.steps[1].outcomes[0].next_step, 2); + assert!(def.steps[2].outcomes.is_empty()); + } + + #[test] + fn name_sets_step_name() { + let def = WorkflowBuilder::::new() + .start_with::() + .name("First Step") + .end_workflow() + .build("test", 1); + assert_eq!(def.steps[0].name, Some("First Step".into())); + } + + #[test] + fn on_error_sets_behavior() { + let def = WorkflowBuilder::::new() + .start_with::() + .on_error(ErrorBehavior::Suspend) + .end_workflow() + .build("test", 1); + assert_eq!(def.steps[0].error_behavior, Some(ErrorBehavior::Suspend)); + } + + #[test] + fn if_do_inserts_container_with_children() { + let def = WorkflowBuilder::::new() + .start_with::() + .if_do::(|b| { + let id = b.add_step(std::any::type_name::()); + b.last_step = Some(id); + }) + .end_workflow() + .build("test", 1); + + // Steps: 0=StepA, 1=IfStep, 2=StepC (child) + // StepA -> IfStep -> (after if) + assert!(def.steps.len() >= 3); + // The If step should have StepC as a child + assert!(def.steps[1].step_type.contains("IfStep")); + assert!(def.steps[1].children.contains(&2)); + } + + #[test] + fn while_do_inserts_container() { + let def = WorkflowBuilder::::new() + .start_with::() + .while_do::(|b| { + b.add_step(std::any::type_name::()); + }) + .end_workflow() + .build("test", 1); + + assert!(def.steps.len() >= 3); + assert!(def.steps[1].step_type.contains("WhileStep")); + } + + #[test] + fn for_each_inserts_container() { + let def = WorkflowBuilder::::new() + .start_with::() + .for_each::(|b| { + b.add_step(std::any::type_name::()); + }) + .end_workflow() + .build("test", 1); + + assert!(def.steps.len() >= 3); + assert!(def.steps[1].step_type.contains("ForEachStep")); + } + + #[test] + fn parallel_creates_branches() { + let def = WorkflowBuilder::::new() + .start_with::() + .parallel(|branches| { + branches + .branch(|b| { + b.add_step(std::any::type_name::()); + }) + .branch(|b| { + b.add_step(std::any::type_name::()); + }) + }) + .end_workflow() + .build("test", 1); + + // Steps: 0=StepA, 1=Sequence(parallel container), 2=StepB, 3=StepC + assert!(def.steps.len() >= 4); + assert!(def.steps[1].step_type.contains("SequenceStep")); + assert!(def.steps[1].children.len() >= 2); + } + + #[test] + fn saga_with_compensation() { + let def = WorkflowBuilder::::new() + .start_with::() + .saga(|b| { + b.add_step(std::any::type_name::()); + b.add_step(std::any::type_name::()); + }) + .end_workflow() + .build("test", 1); + + // Saga container should exist and have children + assert!(def.steps[1].step_type.contains("SagaContainerStep")); + assert!(def.steps[1].saga); + assert!(!def.steps[1].children.is_empty()); + } + + #[test] + fn compensate_with_sets_compensation_step() { + let def = WorkflowBuilder::::new() + .start_with::() + .compensate_with::() + .end_workflow() + .build("test", 1); + + // Step 0 (StepA) should have compensation pointing to step 1 (StepB) + assert_eq!(def.steps[0].compensation_step_id, Some(1)); + assert!(def.steps[1].step_type.contains("StepB")); + } + + #[test] + fn inline_step_via_then_fn() { + let def = WorkflowBuilder::::new() + .start_with::() + .then_fn(ExecutionResult::next) + .end_workflow() + .build("test", 1); + + assert_eq!(def.steps.len(), 2); + assert!(def.steps[1].step_type.contains("InlineStep")); + assert_eq!(def.steps[0].outcomes[0].next_step, 1); + } +}