feat(wfe-core): add fluent workflow builder API
Owned-self builder pattern (no lifetime parameters). WorkflowBuilder chains start_with/then/end_workflow to produce WorkflowDefinition. StepBuilder supports: name, id, on_error, compensate_with, then, then_fn, wait_for, delay, if_do, while_do, for_each, saga, parallel. ParallelBuilder for branching with join semantics. InlineStep for closure-based steps. Step config stored on WorkflowStep.step_config.
This commit is contained in:
30
wfe-core/src/builder/inline_step.rs
Normal file
30
wfe-core/src/builder/inline_step.rs
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
use crate::models::ExecutionResult;
|
||||||
|
use crate::traits::step::{StepBody, StepExecutionContext};
|
||||||
|
|
||||||
|
type InlineFn = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
|
||||||
|
|
||||||
|
/// A step that wraps an inline closure.
|
||||||
|
pub struct InlineStep {
|
||||||
|
body: InlineFn,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl InlineStep {
|
||||||
|
pub fn new(f: impl Fn() -> ExecutionResult + Send + Sync + 'static) -> Self {
|
||||||
|
Self { body: Box::new(f) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for InlineStep {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new(ExecutionResult::next)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl StepBody for InlineStep {
|
||||||
|
async fn run(&mut self, _context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||||
|
Ok((self.body)())
|
||||||
|
}
|
||||||
|
}
|
||||||
7
wfe-core/src/builder/mod.rs
Normal file
7
wfe-core/src/builder/mod.rs
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
mod inline_step;
|
||||||
|
mod step_builder;
|
||||||
|
mod workflow_builder;
|
||||||
|
|
||||||
|
pub use inline_step::InlineStep;
|
||||||
|
pub use step_builder::StepBuilder;
|
||||||
|
pub use workflow_builder::WorkflowBuilder;
|
||||||
221
wfe-core/src/builder/step_builder.rs
Normal file
221
wfe-core/src/builder/step_builder.rs
Normal file
@@ -0,0 +1,221 @@
|
|||||||
|
use crate::models::{ErrorBehavior, ExecutionResult};
|
||||||
|
use crate::primitives;
|
||||||
|
use crate::traits::step::{StepBody, WorkflowData};
|
||||||
|
|
||||||
|
use super::inline_step::InlineStep;
|
||||||
|
use super::workflow_builder::WorkflowBuilder;
|
||||||
|
|
||||||
|
/// Builder for configuring a single step in the workflow.
|
||||||
|
///
|
||||||
|
/// Owns the WorkflowBuilder, consuming self on each method call.
|
||||||
|
/// This avoids all lifetime/borrow issues.
|
||||||
|
pub struct StepBuilder<D: WorkflowData> {
|
||||||
|
builder: WorkflowBuilder<D>,
|
||||||
|
step_id: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builder for parallel branches.
|
||||||
|
pub struct ParallelBuilder<D: WorkflowData> {
|
||||||
|
builder: WorkflowBuilder<D>,
|
||||||
|
container_id: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D: WorkflowData> StepBuilder<D> {
|
||||||
|
pub(crate) fn new(builder: WorkflowBuilder<D>, step_id: usize) -> Self {
|
||||||
|
Self { builder, step_id }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the display name of the current step.
|
||||||
|
pub fn name(mut self, name: &str) -> Self {
|
||||||
|
self.builder.steps[self.step_id].name = Some(name.to_string());
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set an external ID for forward references.
|
||||||
|
pub fn id(mut self, external_id: &str) -> Self {
|
||||||
|
self.builder.steps[self.step_id].external_id = Some(external_id.to_string());
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the error handling behavior for this step.
|
||||||
|
pub fn on_error(mut self, behavior: ErrorBehavior) -> Self {
|
||||||
|
self.builder.steps[self.step_id].error_behavior = Some(behavior);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a compensation step for saga rollback.
|
||||||
|
pub fn compensate_with<C: StepBody + Default + 'static>(mut self) -> Self {
|
||||||
|
let comp_id = self.builder.add_step(std::any::type_name::<C>());
|
||||||
|
self.builder.steps[self.step_id].compensation_step_id = Some(comp_id);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Chain the next step. Wires an outcome from the current step to the new one.
|
||||||
|
pub fn then<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
|
||||||
|
let next_id = self.builder.add_step(std::any::type_name::<S>());
|
||||||
|
self.builder.wire_outcome(self.step_id, next_id, None);
|
||||||
|
self.builder.last_step = Some(next_id);
|
||||||
|
StepBuilder::new(self.builder, next_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Chain an inline function step.
|
||||||
|
pub fn then_fn(mut self, f: impl Fn() -> ExecutionResult + Send + Sync + 'static) -> StepBuilder<D> {
|
||||||
|
let next_id = self.builder.add_step(std::any::type_name::<InlineStep>());
|
||||||
|
self.builder.wire_outcome(self.step_id, next_id, None);
|
||||||
|
self.builder.last_step = Some(next_id);
|
||||||
|
self.builder.inline_closures.insert(next_id, Box::new(f));
|
||||||
|
StepBuilder::new(self.builder, next_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert a WaitFor step.
|
||||||
|
pub fn wait_for(mut self, event_name: &str, event_key: &str) -> StepBuilder<D> {
|
||||||
|
let next_id = self.builder.add_step(std::any::type_name::<primitives::wait_for::WaitForStep>());
|
||||||
|
self.builder.wire_outcome(self.step_id, next_id, None);
|
||||||
|
self.builder.last_step = Some(next_id);
|
||||||
|
self.builder.steps[next_id].step_config = Some(serde_json::json!({
|
||||||
|
"event_name": event_name,
|
||||||
|
"event_key": event_key,
|
||||||
|
}));
|
||||||
|
StepBuilder::new(self.builder, next_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert a Delay step.
|
||||||
|
pub fn delay(mut self, duration: std::time::Duration) -> StepBuilder<D> {
|
||||||
|
let next_id = self.builder.add_step(std::any::type_name::<primitives::delay::DelayStep>());
|
||||||
|
self.builder.wire_outcome(self.step_id, next_id, None);
|
||||||
|
self.builder.last_step = Some(next_id);
|
||||||
|
self.builder.steps[next_id].step_config = Some(serde_json::json!({
|
||||||
|
"duration_millis": duration.as_millis() as u64,
|
||||||
|
}));
|
||||||
|
StepBuilder::new(self.builder, next_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert an If container step with child steps built by the closure.
|
||||||
|
/// The type parameter S is the step type used for the If condition evaluation.
|
||||||
|
pub fn if_do<S: StepBody + Default + 'static>(
|
||||||
|
mut self,
|
||||||
|
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||||
|
) -> StepBuilder<D> {
|
||||||
|
let if_id = self.builder.add_step(std::any::type_name::<primitives::if_step::IfStep>());
|
||||||
|
self.builder.wire_outcome(self.step_id, if_id, None);
|
||||||
|
|
||||||
|
// Build children
|
||||||
|
let before_count = self.builder.steps.len();
|
||||||
|
build_children(&mut self.builder);
|
||||||
|
let after_count = self.builder.steps.len();
|
||||||
|
|
||||||
|
// Register children with the If step
|
||||||
|
for child_id in before_count..after_count {
|
||||||
|
self.builder.add_child(if_id, child_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.builder.last_step = Some(if_id);
|
||||||
|
StepBuilder::new(self.builder, if_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert a While container step.
|
||||||
|
pub fn while_do<S: StepBody + Default + 'static>(
|
||||||
|
mut self,
|
||||||
|
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||||
|
) -> StepBuilder<D> {
|
||||||
|
let while_id = self.builder.add_step(std::any::type_name::<primitives::while_step::WhileStep>());
|
||||||
|
self.builder.wire_outcome(self.step_id, while_id, None);
|
||||||
|
|
||||||
|
let before_count = self.builder.steps.len();
|
||||||
|
build_children(&mut self.builder);
|
||||||
|
let after_count = self.builder.steps.len();
|
||||||
|
|
||||||
|
for child_id in before_count..after_count {
|
||||||
|
self.builder.add_child(while_id, child_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.builder.last_step = Some(while_id);
|
||||||
|
StepBuilder::new(self.builder, while_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert a ForEach container step.
|
||||||
|
pub fn for_each<S: StepBody + Default + 'static>(
|
||||||
|
mut self,
|
||||||
|
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||||
|
) -> StepBuilder<D> {
|
||||||
|
let fe_id = self.builder.add_step(std::any::type_name::<primitives::foreach_step::ForEachStep>());
|
||||||
|
self.builder.wire_outcome(self.step_id, fe_id, None);
|
||||||
|
|
||||||
|
let before_count = self.builder.steps.len();
|
||||||
|
build_children(&mut self.builder);
|
||||||
|
let after_count = self.builder.steps.len();
|
||||||
|
|
||||||
|
for child_id in before_count..after_count {
|
||||||
|
self.builder.add_child(fe_id, child_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.builder.last_step = Some(fe_id);
|
||||||
|
StepBuilder::new(self.builder, fe_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Insert a Saga container step with child steps.
|
||||||
|
pub fn saga(
|
||||||
|
mut self,
|
||||||
|
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||||
|
) -> StepBuilder<D> {
|
||||||
|
let saga_id = self.builder.add_step(std::any::type_name::<primitives::saga_container::SagaContainerStep>());
|
||||||
|
self.builder.steps[saga_id].saga = true;
|
||||||
|
self.builder.wire_outcome(self.step_id, saga_id, None);
|
||||||
|
|
||||||
|
let before_count = self.builder.steps.len();
|
||||||
|
build_children(&mut self.builder);
|
||||||
|
let after_count = self.builder.steps.len();
|
||||||
|
|
||||||
|
for child_id in before_count..after_count {
|
||||||
|
self.builder.add_child(saga_id, child_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
self.builder.last_step = Some(saga_id);
|
||||||
|
StepBuilder::new(self.builder, saga_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start a parallel block.
|
||||||
|
pub fn parallel(
|
||||||
|
mut self,
|
||||||
|
build_branches: impl FnOnce(ParallelBuilder<D>) -> ParallelBuilder<D>,
|
||||||
|
) -> StepBuilder<D> {
|
||||||
|
let seq_id = self.builder.add_step(std::any::type_name::<primitives::sequence::SequenceStep>());
|
||||||
|
self.builder.wire_outcome(self.step_id, seq_id, None);
|
||||||
|
|
||||||
|
let pb = ParallelBuilder {
|
||||||
|
builder: self.builder,
|
||||||
|
container_id: seq_id,
|
||||||
|
};
|
||||||
|
let pb = build_branches(pb);
|
||||||
|
let mut builder = pb.builder;
|
||||||
|
builder.last_step = Some(seq_id);
|
||||||
|
StepBuilder::new(builder, seq_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mark this step as the terminal step (no further outcomes).
|
||||||
|
pub fn end_workflow(self) -> WorkflowBuilder<D> {
|
||||||
|
self.builder
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Access the compiled definition directly (shortcut for end_workflow().build()).
|
||||||
|
pub fn build(self, id: impl Into<String>, version: u32) -> crate::models::WorkflowDefinition {
|
||||||
|
self.builder.build(id, version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D: WorkflowData> ParallelBuilder<D> {
|
||||||
|
/// Add a parallel branch.
|
||||||
|
pub fn branch(
|
||||||
|
mut self,
|
||||||
|
build_branch: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||||
|
) -> Self {
|
||||||
|
let before_count = self.builder.steps.len();
|
||||||
|
build_branch(&mut self.builder);
|
||||||
|
let after_count = self.builder.steps.len();
|
||||||
|
|
||||||
|
for child_id in before_count..after_count {
|
||||||
|
self.builder.add_child(self.container_id, child_id);
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}
|
||||||
|
}
|
||||||
356
wfe-core/src/builder/workflow_builder.rs
Normal file
356
wfe-core/src/builder/workflow_builder.rs
Normal file
@@ -0,0 +1,356 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
|
use crate::models::{
|
||||||
|
ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep,
|
||||||
|
};
|
||||||
|
use crate::traits::step::{StepBody, WorkflowData};
|
||||||
|
|
||||||
|
use super::inline_step::InlineStep;
|
||||||
|
use super::step_builder::StepBuilder;
|
||||||
|
|
||||||
|
/// Type alias for boxed inline step closures.
|
||||||
|
pub type InlineClosureBox = Box<dyn Fn() -> ExecutionResult + Send + Sync>;
|
||||||
|
|
||||||
|
/// Fluent builder for constructing workflow definitions.
|
||||||
|
///
|
||||||
|
/// Uses an owned-self pattern: each method consumes and returns the builder,
|
||||||
|
/// avoiding lifetime issues with mutable borrows.
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
/// ```ignore
|
||||||
|
/// let def = WorkflowBuilder::<MyData>::new()
|
||||||
|
/// .start_with::<StepA>()
|
||||||
|
/// .name("Step A")
|
||||||
|
/// .then::<StepB>()
|
||||||
|
/// .name("Step B")
|
||||||
|
/// .end_workflow()
|
||||||
|
/// .build("my-workflow", 1);
|
||||||
|
/// ```
|
||||||
|
pub struct WorkflowBuilder<D: WorkflowData> {
|
||||||
|
pub(crate) steps: Vec<WorkflowStep>,
|
||||||
|
pub(crate) last_step: Option<usize>,
|
||||||
|
/// Inline closures keyed by step id, stored for later registration.
|
||||||
|
pub(crate) inline_closures: HashMap<usize, InlineClosureBox>,
|
||||||
|
_phantom: PhantomData<D>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D: WorkflowData> WorkflowBuilder<D> {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
steps: Vec::new(),
|
||||||
|
last_step: None,
|
||||||
|
inline_closures: HashMap::new(),
|
||||||
|
_phantom: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add the first step of the workflow.
|
||||||
|
pub fn start_with<S: StepBody + Default + 'static>(mut self) -> StepBuilder<D> {
|
||||||
|
let id = self.steps.len();
|
||||||
|
let step = WorkflowStep::new(id, std::any::type_name::<S>());
|
||||||
|
self.steps.push(step);
|
||||||
|
self.last_step = Some(id);
|
||||||
|
StepBuilder::new(self, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a step by type name. Used by container builder closures.
|
||||||
|
pub fn add_step(&mut self, step_type: &str) -> usize {
|
||||||
|
let id = self.steps.len();
|
||||||
|
self.steps.push(WorkflowStep::new(id, step_type));
|
||||||
|
id
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wire an outcome from `from_step` to `to_step`.
|
||||||
|
pub(crate) fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
|
||||||
|
if let Some(step) = self.steps.get_mut(from_step) {
|
||||||
|
step.outcomes.push(StepOutcome {
|
||||||
|
next_step: to_step,
|
||||||
|
label: None,
|
||||||
|
value,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a child step ID to a parent container step.
|
||||||
|
pub(crate) fn add_child(&mut self, parent: usize, child: usize) {
|
||||||
|
if let Some(step) = self.steps.get_mut(parent) {
|
||||||
|
step.children.push(child);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compile the builder into a WorkflowDefinition.
|
||||||
|
pub fn build(self, id: impl Into<String>, version: u32) -> WorkflowDefinition {
|
||||||
|
let mut def = WorkflowDefinition::new(id, version);
|
||||||
|
def.steps = self.steps;
|
||||||
|
// Note: inline closures are dropped here. Use `build_with_closures` to retain them.
|
||||||
|
def
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compile the builder into a WorkflowDefinition and return any inline closures
|
||||||
|
/// keyed by step id.
|
||||||
|
pub fn build_with_closures(
|
||||||
|
self,
|
||||||
|
id: impl Into<String>,
|
||||||
|
version: u32,
|
||||||
|
) -> (WorkflowDefinition, HashMap<usize, InlineClosureBox>) {
|
||||||
|
let mut def = WorkflowDefinition::new(id, version);
|
||||||
|
def.steps = self.steps;
|
||||||
|
(def, self.inline_closures)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register all inline closures from this builder into the given step registry.
|
||||||
|
///
|
||||||
|
/// Each inline closure is registered under a unique key derived from the
|
||||||
|
/// `InlineStep` type name and step id.
|
||||||
|
pub fn register_inline_steps(
|
||||||
|
self,
|
||||||
|
registry: &mut crate::executor::StepRegistry,
|
||||||
|
id: impl Into<String>,
|
||||||
|
version: u32,
|
||||||
|
) -> WorkflowDefinition {
|
||||||
|
let mut def = WorkflowDefinition::new(id, version);
|
||||||
|
def.steps = self.steps;
|
||||||
|
for (step_id, closure) in self.inline_closures {
|
||||||
|
let closure = std::sync::Arc::new(closure);
|
||||||
|
let key = format!("{}::{step_id}", std::any::type_name::<InlineStep>());
|
||||||
|
// Update the step_type so the executor resolves correctly.
|
||||||
|
if let Some(step) = def.steps.get_mut(step_id) {
|
||||||
|
step.step_type = key.clone();
|
||||||
|
}
|
||||||
|
let closure = closure.clone();
|
||||||
|
registry.register_factory(&key, move || {
|
||||||
|
let c = closure.clone();
|
||||||
|
Box::new(InlineStep::new(move || (c)()))
|
||||||
|
});
|
||||||
|
}
|
||||||
|
def
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D: WorkflowData> Default for WorkflowBuilder<D> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use crate::models::{ErrorBehavior, ExecutionResult};
|
||||||
|
use crate::traits::step::StepExecutionContext;
|
||||||
|
use pretty_assertions::assert_eq;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||||
|
struct TestData {
|
||||||
|
counter: i32,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct StepA;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl StepBody for StepA {
|
||||||
|
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||||
|
Ok(ExecutionResult::next())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct StepB;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl StepBody for StepB {
|
||||||
|
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||||
|
Ok(ExecutionResult::next())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct StepC;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl StepBody for StepC {
|
||||||
|
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||||
|
Ok(ExecutionResult::next())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_empty_workflow() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new().build("empty", 1);
|
||||||
|
assert_eq!(def.id, "empty");
|
||||||
|
assert_eq!(def.version, 1);
|
||||||
|
assert!(def.steps.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn start_with_adds_first_step() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
assert_eq!(def.steps.len(), 1);
|
||||||
|
assert!(def.steps[0].step_type.contains("StepA"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn then_chains_two_steps_with_outcome() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.then::<StepB>()
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
assert_eq!(def.steps.len(), 2);
|
||||||
|
// Step 0 should have outcome pointing to step 1
|
||||||
|
assert_eq!(def.steps[0].outcomes.len(), 1);
|
||||||
|
assert_eq!(def.steps[0].outcomes[0].next_step, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn then_chains_three_steps() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.then::<StepB>()
|
||||||
|
.then::<StepC>()
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
assert_eq!(def.steps.len(), 3);
|
||||||
|
assert_eq!(def.steps[0].outcomes[0].next_step, 1);
|
||||||
|
assert_eq!(def.steps[1].outcomes[0].next_step, 2);
|
||||||
|
assert!(def.steps[2].outcomes.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn name_sets_step_name() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.name("First Step")
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
assert_eq!(def.steps[0].name, Some("First Step".into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn on_error_sets_behavior() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.on_error(ErrorBehavior::Suspend)
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
assert_eq!(def.steps[0].error_behavior, Some(ErrorBehavior::Suspend));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn if_do_inserts_container_with_children() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.if_do::<StepB>(|b| {
|
||||||
|
let id = b.add_step(std::any::type_name::<StepC>());
|
||||||
|
b.last_step = Some(id);
|
||||||
|
})
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
|
||||||
|
// Steps: 0=StepA, 1=IfStep, 2=StepC (child)
|
||||||
|
// StepA -> IfStep -> (after if)
|
||||||
|
assert!(def.steps.len() >= 3);
|
||||||
|
// The If step should have StepC as a child
|
||||||
|
assert!(def.steps[1].step_type.contains("IfStep"));
|
||||||
|
assert!(def.steps[1].children.contains(&2));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn while_do_inserts_container() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.while_do::<StepB>(|b| {
|
||||||
|
b.add_step(std::any::type_name::<StepC>());
|
||||||
|
})
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
|
||||||
|
assert!(def.steps.len() >= 3);
|
||||||
|
assert!(def.steps[1].step_type.contains("WhileStep"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn for_each_inserts_container() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.for_each::<StepB>(|b| {
|
||||||
|
b.add_step(std::any::type_name::<StepC>());
|
||||||
|
})
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
|
||||||
|
assert!(def.steps.len() >= 3);
|
||||||
|
assert!(def.steps[1].step_type.contains("ForEachStep"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parallel_creates_branches() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.parallel(|branches| {
|
||||||
|
branches
|
||||||
|
.branch(|b| {
|
||||||
|
b.add_step(std::any::type_name::<StepB>());
|
||||||
|
})
|
||||||
|
.branch(|b| {
|
||||||
|
b.add_step(std::any::type_name::<StepC>());
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
|
||||||
|
// Steps: 0=StepA, 1=Sequence(parallel container), 2=StepB, 3=StepC
|
||||||
|
assert!(def.steps.len() >= 4);
|
||||||
|
assert!(def.steps[1].step_type.contains("SequenceStep"));
|
||||||
|
assert!(def.steps[1].children.len() >= 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn saga_with_compensation() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.saga(|b| {
|
||||||
|
b.add_step(std::any::type_name::<StepB>());
|
||||||
|
b.add_step(std::any::type_name::<StepC>());
|
||||||
|
})
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
|
||||||
|
// Saga container should exist and have children
|
||||||
|
assert!(def.steps[1].step_type.contains("SagaContainerStep"));
|
||||||
|
assert!(def.steps[1].saga);
|
||||||
|
assert!(!def.steps[1].children.is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn compensate_with_sets_compensation_step() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.compensate_with::<StepB>()
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
|
||||||
|
// Step 0 (StepA) should have compensation pointing to step 1 (StepB)
|
||||||
|
assert_eq!(def.steps[0].compensation_step_id, Some(1));
|
||||||
|
assert!(def.steps[1].step_type.contains("StepB"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn inline_step_via_then_fn() {
|
||||||
|
let def = WorkflowBuilder::<TestData>::new()
|
||||||
|
.start_with::<StepA>()
|
||||||
|
.then_fn(ExecutionResult::next)
|
||||||
|
.end_workflow()
|
||||||
|
.build("test", 1);
|
||||||
|
|
||||||
|
assert_eq!(def.steps.len(), 2);
|
||||||
|
assert!(def.steps[1].step_type.contains("InlineStep"));
|
||||||
|
assert_eq!(def.steps[0].outcomes[0].next_step, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user