style: apply cargo fmt workspace-wide
Pure formatting pass from `cargo fmt --all`. No logic changes. Separating this out so the 1.9 release feature commits that follow show only their intentional edits.
This commit is contained in:
@@ -67,7 +67,10 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
}
|
||||
|
||||
/// Chain an inline function step.
|
||||
pub fn then_fn(mut self, f: impl Fn() -> ExecutionResult + Send + Sync + 'static) -> StepBuilder<D> {
|
||||
pub fn then_fn(
|
||||
mut self,
|
||||
f: impl Fn() -> ExecutionResult + Send + Sync + 'static,
|
||||
) -> StepBuilder<D> {
|
||||
let next_id = self.builder.add_step(std::any::type_name::<InlineStep>());
|
||||
self.builder.wire_outcome(self.step_id, next_id, None);
|
||||
self.builder.last_step = Some(next_id);
|
||||
@@ -77,7 +80,9 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
|
||||
/// Insert a WaitFor step.
|
||||
pub fn wait_for(mut self, event_name: &str, event_key: &str) -> StepBuilder<D> {
|
||||
let next_id = self.builder.add_step(std::any::type_name::<primitives::wait_for::WaitForStep>());
|
||||
let next_id = self
|
||||
.builder
|
||||
.add_step(std::any::type_name::<primitives::wait_for::WaitForStep>());
|
||||
self.builder.wire_outcome(self.step_id, next_id, None);
|
||||
self.builder.last_step = Some(next_id);
|
||||
self.builder.steps[next_id].step_config = Some(serde_json::json!({
|
||||
@@ -89,7 +94,9 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
|
||||
/// Insert a Delay step.
|
||||
pub fn delay(mut self, duration: std::time::Duration) -> StepBuilder<D> {
|
||||
let next_id = self.builder.add_step(std::any::type_name::<primitives::delay::DelayStep>());
|
||||
let next_id = self
|
||||
.builder
|
||||
.add_step(std::any::type_name::<primitives::delay::DelayStep>());
|
||||
self.builder.wire_outcome(self.step_id, next_id, None);
|
||||
self.builder.last_step = Some(next_id);
|
||||
self.builder.steps[next_id].step_config = Some(serde_json::json!({
|
||||
@@ -104,7 +111,9 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
mut self,
|
||||
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||
) -> StepBuilder<D> {
|
||||
let if_id = self.builder.add_step(std::any::type_name::<primitives::if_step::IfStep>());
|
||||
let if_id = self
|
||||
.builder
|
||||
.add_step(std::any::type_name::<primitives::if_step::IfStep>());
|
||||
self.builder.wire_outcome(self.step_id, if_id, None);
|
||||
|
||||
// Build children
|
||||
@@ -126,7 +135,9 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
mut self,
|
||||
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||
) -> StepBuilder<D> {
|
||||
let while_id = self.builder.add_step(std::any::type_name::<primitives::while_step::WhileStep>());
|
||||
let while_id = self
|
||||
.builder
|
||||
.add_step(std::any::type_name::<primitives::while_step::WhileStep>());
|
||||
self.builder.wire_outcome(self.step_id, while_id, None);
|
||||
|
||||
let before_count = self.builder.steps.len();
|
||||
@@ -146,7 +157,9 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
mut self,
|
||||
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||
) -> StepBuilder<D> {
|
||||
let fe_id = self.builder.add_step(std::any::type_name::<primitives::foreach_step::ForEachStep>());
|
||||
let fe_id = self
|
||||
.builder
|
||||
.add_step(std::any::type_name::<primitives::foreach_step::ForEachStep>());
|
||||
self.builder.wire_outcome(self.step_id, fe_id, None);
|
||||
|
||||
let before_count = self.builder.steps.len();
|
||||
@@ -162,11 +175,10 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
}
|
||||
|
||||
/// Insert a Saga container step with child steps.
|
||||
pub fn saga(
|
||||
mut self,
|
||||
build_children: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||
) -> StepBuilder<D> {
|
||||
let saga_id = self.builder.add_step(std::any::type_name::<primitives::saga_container::SagaContainerStep>());
|
||||
pub fn saga(mut self, build_children: impl FnOnce(&mut WorkflowBuilder<D>)) -> StepBuilder<D> {
|
||||
let saga_id = self.builder.add_step(std::any::type_name::<
|
||||
primitives::saga_container::SagaContainerStep,
|
||||
>());
|
||||
self.builder.steps[saga_id].saga = true;
|
||||
self.builder.wire_outcome(self.step_id, saga_id, None);
|
||||
|
||||
@@ -187,7 +199,9 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
mut self,
|
||||
build_branches: impl FnOnce(ParallelBuilder<D>) -> ParallelBuilder<D>,
|
||||
) -> StepBuilder<D> {
|
||||
let seq_id = self.builder.add_step(std::any::type_name::<primitives::sequence::SequenceStep>());
|
||||
let seq_id = self
|
||||
.builder
|
||||
.add_step(std::any::type_name::<primitives::sequence::SequenceStep>());
|
||||
self.builder.wire_outcome(self.step_id, seq_id, None);
|
||||
|
||||
let pb = ParallelBuilder {
|
||||
@@ -213,10 +227,7 @@ impl<D: WorkflowData> StepBuilder<D> {
|
||||
|
||||
impl<D: WorkflowData> ParallelBuilder<D> {
|
||||
/// Add a parallel branch.
|
||||
pub fn branch(
|
||||
mut self,
|
||||
build_branch: impl FnOnce(&mut WorkflowBuilder<D>),
|
||||
) -> Self {
|
||||
pub fn branch(mut self, build_branch: impl FnOnce(&mut WorkflowBuilder<D>)) -> Self {
|
||||
let before_count = self.builder.steps.len();
|
||||
build_branch(&mut self.builder);
|
||||
let after_count = self.builder.steps.len();
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use crate::models::{
|
||||
ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep,
|
||||
};
|
||||
use crate::models::{ExecutionResult, StepOutcome, WorkflowDefinition, WorkflowStep};
|
||||
use crate::traits::step::{StepBody, WorkflowData};
|
||||
|
||||
use super::inline_step::InlineStep;
|
||||
@@ -77,7 +75,12 @@ impl<D: WorkflowData> WorkflowBuilder<D> {
|
||||
}
|
||||
|
||||
/// Wire an outcome from `from_step` to `to_step`.
|
||||
pub fn wire_outcome(&mut self, from_step: usize, to_step: usize, value: Option<serde_json::Value>) {
|
||||
pub fn wire_outcome(
|
||||
&mut self,
|
||||
from_step: usize,
|
||||
to_step: usize,
|
||||
value: Option<serde_json::Value>,
|
||||
) {
|
||||
if let Some(step) = self.steps.get_mut(from_step) {
|
||||
step.outcomes.push(StepOutcome {
|
||||
next_step: to_step,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use crate::models::condition::{ComparisonOp, FieldComparison, StepCondition};
|
||||
use crate::WfeError;
|
||||
use crate::models::condition::{ComparisonOp, FieldComparison, StepCondition};
|
||||
|
||||
/// Evaluate a step condition against workflow data.
|
||||
///
|
||||
@@ -29,10 +29,7 @@ impl From<WfeError> for EvalError {
|
||||
}
|
||||
}
|
||||
|
||||
fn evaluate_inner(
|
||||
condition: &StepCondition,
|
||||
data: &serde_json::Value,
|
||||
) -> Result<bool, EvalError> {
|
||||
fn evaluate_inner(condition: &StepCondition, data: &serde_json::Value) -> Result<bool, EvalError> {
|
||||
match condition {
|
||||
StepCondition::All(conditions) => {
|
||||
for c in conditions {
|
||||
@@ -582,22 +579,14 @@ mod tests {
|
||||
#[test]
|
||||
fn not_true_becomes_false() {
|
||||
let data = json!({"a": 1});
|
||||
let cond = StepCondition::Not(Box::new(comp(
|
||||
".a",
|
||||
ComparisonOp::Equals,
|
||||
Some(json!(1)),
|
||||
)));
|
||||
let cond = StepCondition::Not(Box::new(comp(".a", ComparisonOp::Equals, Some(json!(1)))));
|
||||
assert!(!evaluate(&cond, &data).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn not_false_becomes_true() {
|
||||
let data = json!({"a": 99});
|
||||
let cond = StepCondition::Not(Box::new(comp(
|
||||
".a",
|
||||
ComparisonOp::Equals,
|
||||
Some(json!(1)),
|
||||
)));
|
||||
let cond = StepCondition::Not(Box::new(comp(".a", ComparisonOp::Equals, Some(json!(1)))));
|
||||
assert!(evaluate(&cond, &data).unwrap());
|
||||
}
|
||||
|
||||
@@ -639,11 +628,7 @@ mod tests {
|
||||
comp(".a", ComparisonOp::Equals, Some(json!(1))),
|
||||
comp(".a", ComparisonOp::Equals, Some(json!(99))),
|
||||
]),
|
||||
StepCondition::Not(Box::new(comp(
|
||||
".c",
|
||||
ComparisonOp::Equals,
|
||||
Some(json!(99)),
|
||||
))),
|
||||
StepCondition::Not(Box::new(comp(".c", ComparisonOp::Equals, Some(json!(99))))),
|
||||
]);
|
||||
assert!(evaluate(&cond, &data).unwrap());
|
||||
}
|
||||
@@ -742,7 +727,13 @@ mod tests {
|
||||
let data = json!({"score": 3.14});
|
||||
assert!(evaluate(&comp(".score", ComparisonOp::Gt, Some(json!(3.0))), &data).unwrap());
|
||||
assert!(evaluate(&comp(".score", ComparisonOp::Lt, Some(json!(4.0))), &data).unwrap());
|
||||
assert!(!evaluate(&comp(".score", ComparisonOp::Equals, Some(json!(3.0))), &data).unwrap());
|
||||
assert!(
|
||||
!evaluate(
|
||||
&comp(".score", ComparisonOp::Equals, Some(json!(3.0))),
|
||||
&data
|
||||
)
|
||||
.unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -29,7 +29,10 @@ pub fn handle_error(
|
||||
.unwrap_or_else(|| definition.default_error_behavior.clone());
|
||||
|
||||
match behavior {
|
||||
ErrorBehavior::Retry { interval, max_retries } => {
|
||||
ErrorBehavior::Retry {
|
||||
interval,
|
||||
max_retries,
|
||||
} => {
|
||||
if max_retries > 0 && pointer.retry_count >= max_retries {
|
||||
// Exceeded max retries, suspend the workflow
|
||||
pointer.status = PointerStatus::Failed;
|
||||
@@ -44,9 +47,8 @@ pub fn handle_error(
|
||||
pointer.retry_count += 1;
|
||||
pointer.status = PointerStatus::Sleeping;
|
||||
pointer.active = true;
|
||||
pointer.sleep_until = Some(
|
||||
Utc::now() + chrono::Duration::milliseconds(interval.as_millis() as i64),
|
||||
);
|
||||
pointer.sleep_until =
|
||||
Some(Utc::now() + chrono::Duration::milliseconds(interval.as_millis() as i64));
|
||||
}
|
||||
}
|
||||
ErrorBehavior::Suspend => {
|
||||
@@ -67,7 +69,9 @@ pub fn handle_error(
|
||||
&& let Some(comp_step_id) = step.compensation_step_id
|
||||
{
|
||||
let mut comp_pointer = ExecutionPointer::new(comp_step_id);
|
||||
comp_pointer.step_name = definition.steps.iter()
|
||||
comp_pointer.step_name = definition
|
||||
.steps
|
||||
.iter()
|
||||
.find(|s| s.id == comp_step_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
comp_pointer.predecessor_id = Some(pointer.id.clone());
|
||||
|
||||
@@ -36,7 +36,9 @@ pub fn process_result(
|
||||
let next_step_id = find_next_step(step, &result.outcome_value);
|
||||
if let Some(next_id) = next_step_id {
|
||||
let mut next_pointer = ExecutionPointer::new(next_id);
|
||||
next_pointer.step_name = definition.steps.iter()
|
||||
next_pointer.step_name = definition
|
||||
.steps
|
||||
.iter()
|
||||
.find(|s| s.id == next_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
next_pointer.predecessor_id = Some(pointer.id.clone());
|
||||
@@ -62,7 +64,9 @@ pub fn process_result(
|
||||
for value in branch_values {
|
||||
for &child_step_id in &child_step_ids {
|
||||
let mut child_pointer = ExecutionPointer::new(child_step_id);
|
||||
child_pointer.step_name = definition.steps.iter()
|
||||
child_pointer.step_name = definition
|
||||
.steps
|
||||
.iter()
|
||||
.find(|s| s.id == child_step_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
child_pointer.context_item = Some(value.clone());
|
||||
@@ -79,9 +83,7 @@ pub fn process_result(
|
||||
pointer.event_name = result.event_name.clone();
|
||||
pointer.event_key = result.event_key.clone();
|
||||
|
||||
if let (Some(event_name), Some(event_key)) =
|
||||
(&result.event_name, &result.event_key)
|
||||
{
|
||||
if let (Some(event_name), Some(event_key)) = (&result.event_name, &result.event_key) {
|
||||
let as_of = result.event_as_of.unwrap_or_else(Utc::now);
|
||||
let sub = EventSubscription::new(
|
||||
workflow_id,
|
||||
@@ -107,8 +109,7 @@ pub fn process_result(
|
||||
pointer.status = PointerStatus::Sleeping;
|
||||
pointer.active = true;
|
||||
pointer.sleep_until = Some(
|
||||
Utc::now()
|
||||
+ chrono::Duration::milliseconds(poll_config.interval.as_millis() as i64),
|
||||
Utc::now() + chrono::Duration::milliseconds(poll_config.interval.as_millis() as i64),
|
||||
);
|
||||
pointer.persistence_data = result.persistence_data.clone();
|
||||
} else if result.persistence_data.is_some() {
|
||||
|
||||
@@ -17,7 +17,8 @@ impl StepRegistry {
|
||||
/// Register a step type using its full type name as the key.
|
||||
pub fn register<S: StepBody + Default + 'static>(&mut self) {
|
||||
let key = std::any::type_name::<S>().to_string();
|
||||
self.factories.insert(key, Box::new(|| Box::new(S::default())));
|
||||
self.factories
|
||||
.insert(key, Box::new(|| Box::new(S::default())));
|
||||
}
|
||||
|
||||
/// Register a step factory with an explicit key and factory function.
|
||||
|
||||
@@ -119,12 +119,12 @@ impl WorkflowExecutor {
|
||||
host_context: Option<&dyn crate::traits::HostContext>,
|
||||
) -> Result<()> {
|
||||
// 2. Load workflow instance.
|
||||
let mut workflow = self
|
||||
.persistence
|
||||
.get_workflow_instance(workflow_id)
|
||||
.await?;
|
||||
let mut workflow = self.persistence.get_workflow_instance(workflow_id).await?;
|
||||
|
||||
tracing::Span::current().record("workflow.definition_id", workflow.workflow_definition_id.as_str());
|
||||
tracing::Span::current().record(
|
||||
"workflow.definition_id",
|
||||
workflow.workflow_definition_id.as_str(),
|
||||
);
|
||||
|
||||
if workflow.status != WorkflowStatus::Runnable {
|
||||
debug!(workflow_id, status = ?workflow.status, "Workflow not runnable, skipping");
|
||||
@@ -179,15 +179,15 @@ impl WorkflowExecutor {
|
||||
// Activate next step via outcomes (same as Complete).
|
||||
let next_step_id = step.outcomes.first().map(|o| o.next_step);
|
||||
if let Some(next_id) = next_step_id {
|
||||
let mut next_pointer =
|
||||
crate::models::ExecutionPointer::new(next_id);
|
||||
next_pointer.step_name = definition.steps.iter()
|
||||
let mut next_pointer = crate::models::ExecutionPointer::new(next_id);
|
||||
next_pointer.step_name = definition
|
||||
.steps
|
||||
.iter()
|
||||
.find(|s| s.id == next_id)
|
||||
.and_then(|s| s.name.clone());
|
||||
next_pointer.predecessor_id =
|
||||
Some(workflow.execution_pointers[idx].id.clone());
|
||||
next_pointer.scope =
|
||||
workflow.execution_pointers[idx].scope.clone();
|
||||
next_pointer.scope = workflow.execution_pointers[idx].scope.clone();
|
||||
workflow.execution_pointers.push(next_pointer);
|
||||
}
|
||||
|
||||
@@ -208,12 +208,12 @@ impl WorkflowExecutor {
|
||||
);
|
||||
|
||||
// b. Resolve the step body.
|
||||
let mut step_body = step_registry
|
||||
.resolve(&step.step_type)
|
||||
.ok_or_else(|| WfeError::StepExecution(format!(
|
||||
let mut step_body = step_registry.resolve(&step.step_type).ok_or_else(|| {
|
||||
WfeError::StepExecution(format!(
|
||||
"Step type not found in registry: {}",
|
||||
step.step_type
|
||||
)))?;
|
||||
))
|
||||
})?;
|
||||
|
||||
// Mark pointer as running before building context.
|
||||
if workflow.execution_pointers[idx].start_time.is_none() {
|
||||
@@ -229,7 +229,8 @@ impl WorkflowExecutor {
|
||||
step_id,
|
||||
step_name: step.name.clone(),
|
||||
},
|
||||
)).await;
|
||||
))
|
||||
.await;
|
||||
|
||||
// c. Build StepExecutionContext (borrows workflow immutably).
|
||||
let cancellation_token = tokio_util::sync::CancellationToken::new();
|
||||
@@ -277,19 +278,15 @@ impl WorkflowExecutor {
|
||||
step_id,
|
||||
step_name: step.name.clone(),
|
||||
},
|
||||
)).await;
|
||||
))
|
||||
.await;
|
||||
|
||||
// e. Process the ExecutionResult.
|
||||
// Extract workflow_id before mutable borrow.
|
||||
let wf_id = workflow.id.clone();
|
||||
let process_result = {
|
||||
let pointer = &mut workflow.execution_pointers[idx];
|
||||
result_processor::process_result(
|
||||
&result,
|
||||
pointer,
|
||||
definition,
|
||||
&wf_id,
|
||||
)
|
||||
result_processor::process_result(&result, pointer, definition, &wf_id)
|
||||
};
|
||||
|
||||
all_subscriptions.extend(process_result.subscriptions);
|
||||
@@ -320,7 +317,8 @@ impl WorkflowExecutor {
|
||||
crate::models::LifecycleEventType::Error {
|
||||
message: error_msg.clone(),
|
||||
},
|
||||
)).await;
|
||||
))
|
||||
.await;
|
||||
|
||||
let pointer_id = workflow.execution_pointers[idx].id.clone();
|
||||
execution_errors.push(ExecutionError::new(
|
||||
@@ -331,11 +329,7 @@ impl WorkflowExecutor {
|
||||
|
||||
let handler_result = {
|
||||
let pointer = &mut workflow.execution_pointers[idx];
|
||||
error_handler::handle_error(
|
||||
&error_msg,
|
||||
pointer,
|
||||
definition,
|
||||
)
|
||||
error_handler::handle_error(&error_msg, pointer, definition)
|
||||
};
|
||||
|
||||
// Apply workflow-level status changes from error handler.
|
||||
@@ -348,7 +342,8 @@ impl WorkflowExecutor {
|
||||
&workflow.workflow_definition_id,
|
||||
workflow.version,
|
||||
crate::models::LifecycleEventType::Terminated,
|
||||
)).await;
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -382,7 +377,8 @@ impl WorkflowExecutor {
|
||||
&workflow.workflow_definition_id,
|
||||
workflow.version,
|
||||
crate::models::LifecycleEventType::Completed,
|
||||
)).await;
|
||||
))
|
||||
.await;
|
||||
|
||||
// Publish completion event for SubWorkflow parents.
|
||||
let completion_event = Event::new(
|
||||
@@ -427,9 +423,7 @@ impl WorkflowExecutor {
|
||||
|
||||
// Persist errors.
|
||||
if !execution_errors.is_empty() {
|
||||
self.persistence
|
||||
.persist_errors(&execution_errors)
|
||||
.await?;
|
||||
self.persistence.persist_errors(&execution_errors).await?;
|
||||
}
|
||||
|
||||
// 8. Queue any follow-up work.
|
||||
@@ -512,10 +506,7 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for PassStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
_ctx: &StepExecutionContext<'_>,
|
||||
) -> crate::Result<ExecutionResult> {
|
||||
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||
Ok(ExecutionResult::next())
|
||||
}
|
||||
}
|
||||
@@ -525,10 +516,7 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for OutcomeStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
_ctx: &StepExecutionContext<'_>,
|
||||
) -> crate::Result<ExecutionResult> {
|
||||
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||
Ok(ExecutionResult::outcome(serde_json::json!("yes")))
|
||||
}
|
||||
}
|
||||
@@ -538,10 +526,7 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for PersistStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
_ctx: &StepExecutionContext<'_>,
|
||||
) -> crate::Result<ExecutionResult> {
|
||||
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||
Ok(ExecutionResult::persist(serde_json::json!({"count": 1})))
|
||||
}
|
||||
}
|
||||
@@ -551,10 +536,7 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for SleepStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
_ctx: &StepExecutionContext<'_>,
|
||||
) -> crate::Result<ExecutionResult> {
|
||||
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||
Ok(ExecutionResult::sleep(Duration::from_secs(30), None))
|
||||
}
|
||||
}
|
||||
@@ -564,10 +546,7 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for WaitEventStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
_ctx: &StepExecutionContext<'_>,
|
||||
) -> crate::Result<ExecutionResult> {
|
||||
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||
Ok(ExecutionResult::wait_for_event(
|
||||
"order.completed",
|
||||
"order-123",
|
||||
@@ -581,10 +560,7 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for EventResumeStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
ctx: &StepExecutionContext<'_>,
|
||||
) -> crate::Result<ExecutionResult> {
|
||||
async fn run(&mut self, ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||
if ctx.execution_pointer.event_published {
|
||||
Ok(ExecutionResult::next())
|
||||
} else {
|
||||
@@ -602,10 +578,7 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for BranchStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
_ctx: &StepExecutionContext<'_>,
|
||||
) -> crate::Result<ExecutionResult> {
|
||||
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||
Ok(ExecutionResult::branch(
|
||||
vec![
|
||||
serde_json::json!(1),
|
||||
@@ -622,10 +595,7 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for FailStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
_ctx: &StepExecutionContext<'_>,
|
||||
) -> crate::Result<ExecutionResult> {
|
||||
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||
Err(WfeError::StepExecution("step failed".into()))
|
||||
}
|
||||
}
|
||||
@@ -635,10 +605,7 @@ mod tests {
|
||||
|
||||
#[async_trait]
|
||||
impl StepBody for CompensateStep {
|
||||
async fn run(
|
||||
&mut self,
|
||||
_ctx: &StepExecutionContext<'_>,
|
||||
) -> crate::Result<ExecutionResult> {
|
||||
async fn run(&mut self, _ctx: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult> {
|
||||
Ok(ExecutionResult::next())
|
||||
}
|
||||
}
|
||||
@@ -680,7 +647,8 @@ mod tests {
|
||||
registry.register::<PassStep>();
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<PassStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
let pointer = ExecutionPointer::new(0);
|
||||
@@ -688,11 +656,20 @@ mod tests {
|
||||
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Complete);
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete);
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].status,
|
||||
PointerStatus::Complete
|
||||
);
|
||||
assert!(updated.complete_time.is_some());
|
||||
}
|
||||
|
||||
@@ -712,27 +689,46 @@ mod tests {
|
||||
value: None,
|
||||
});
|
||||
def.steps.push(step0);
|
||||
def.steps.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
// First execution: step 0 completes, step 1 pointer created.
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.execution_pointers.len(), 2);
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete);
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].status,
|
||||
PointerStatus::Complete
|
||||
);
|
||||
// Step 1 pointer should be active and pending.
|
||||
assert_eq!(updated.execution_pointers[1].step_id, 1);
|
||||
|
||||
// Second execution: step 1 completes.
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Complete);
|
||||
assert_eq!(updated.execution_pointers[1].status, PointerStatus::Complete);
|
||||
assert_eq!(
|
||||
updated.execution_pointers[1].status,
|
||||
PointerStatus::Complete
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -745,9 +741,17 @@ mod tests {
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
let mut s0 = WorkflowStep::new(0, step_type::<PassStep>());
|
||||
s0.outcomes.push(StepOutcome { next_step: 1, label: None, value: None });
|
||||
s0.outcomes.push(StepOutcome {
|
||||
next_step: 1,
|
||||
label: None,
|
||||
value: None,
|
||||
});
|
||||
let mut s1 = WorkflowStep::new(1, step_type::<PassStep>());
|
||||
s1.outcomes.push(StepOutcome { next_step: 2, label: None, value: None });
|
||||
s1.outcomes.push(StepOutcome {
|
||||
next_step: 2,
|
||||
label: None,
|
||||
value: None,
|
||||
});
|
||||
let s2 = WorkflowStep::new(2, step_type::<PassStep>());
|
||||
def.steps.push(s0);
|
||||
def.steps.push(s1);
|
||||
@@ -759,10 +763,16 @@ mod tests {
|
||||
|
||||
// Execute three times for three steps.
|
||||
for _ in 0..3 {
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Complete);
|
||||
assert_eq!(updated.execution_pointers.len(), 3);
|
||||
for p in &updated.execution_pointers {
|
||||
@@ -792,16 +802,24 @@ mod tests {
|
||||
value: Some(serde_json::json!("yes")),
|
||||
});
|
||||
def.steps.push(s0);
|
||||
def.steps.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
def.steps.push(WorkflowStep::new(2, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(2, step_type::<PassStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.execution_pointers.len(), 2);
|
||||
// Should route to step 2 (the "yes" branch).
|
||||
assert_eq!(updated.execution_pointers[1].step_id, 2);
|
||||
@@ -816,15 +834,22 @@ mod tests {
|
||||
registry.register::<PersistStep>();
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<PersistStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<PersistStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Runnable);
|
||||
assert!(updated.execution_pointers[0].active);
|
||||
assert_eq!(
|
||||
@@ -842,16 +867,26 @@ mod tests {
|
||||
registry.register::<SleepStep>();
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<SleepStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<SleepStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping);
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].status,
|
||||
PointerStatus::Sleeping
|
||||
);
|
||||
assert!(updated.execution_pointers[0].sleep_until.is_some());
|
||||
assert!(updated.execution_pointers[0].active);
|
||||
}
|
||||
@@ -865,15 +900,22 @@ mod tests {
|
||||
registry.register::<WaitEventStep>();
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<WaitEventStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<WaitEventStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].status,
|
||||
PointerStatus::WaitingForEvent
|
||||
@@ -899,7 +941,8 @@ mod tests {
|
||||
registry.register::<EventResumeStep>();
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<EventResumeStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<EventResumeStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
let mut pointer = ExecutionPointer::new(0);
|
||||
@@ -911,10 +954,19 @@ mod tests {
|
||||
instance.execution_pointers.push(pointer);
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete);
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].status,
|
||||
PointerStatus::Complete
|
||||
);
|
||||
assert_eq!(updated.status, WorkflowStatus::Complete);
|
||||
}
|
||||
|
||||
@@ -931,15 +983,22 @@ mod tests {
|
||||
let mut s0 = WorkflowStep::new(0, step_type::<BranchStep>());
|
||||
s0.children.push(1);
|
||||
def.steps.push(s0);
|
||||
def.steps.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
// 1 original + 3 children.
|
||||
assert_eq!(updated.execution_pointers.len(), 4);
|
||||
// Children should have scope containing the parent pointer id.
|
||||
@@ -973,11 +1032,20 @@ mod tests {
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.execution_pointers[0].retry_count, 1);
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping);
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].status,
|
||||
PointerStatus::Sleeping
|
||||
);
|
||||
assert!(updated.execution_pointers[0].sleep_until.is_some());
|
||||
assert_eq!(updated.status, WorkflowStatus::Runnable);
|
||||
}
|
||||
@@ -999,9 +1067,15 @@ mod tests {
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Suspended);
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Failed);
|
||||
}
|
||||
@@ -1023,9 +1097,15 @@ mod tests {
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Terminated);
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Failed);
|
||||
assert!(updated.complete_time.is_some());
|
||||
@@ -1045,15 +1125,22 @@ mod tests {
|
||||
s0.error_behavior = Some(ErrorBehavior::Compensate);
|
||||
s0.compensation_step_id = Some(1);
|
||||
def.steps.push(s0);
|
||||
def.steps.push(WorkflowStep::new(1, step_type::<CompensateStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(1, step_type::<CompensateStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Failed);
|
||||
// Compensation pointer should be created.
|
||||
assert_eq!(updated.execution_pointers.len(), 2);
|
||||
@@ -1070,8 +1157,10 @@ mod tests {
|
||||
registry.register::<PassStep>();
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<PassStep>()));
|
||||
def.steps.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
// Two independent active pointers.
|
||||
@@ -1079,14 +1168,22 @@ mod tests {
|
||||
instance.execution_pointers.push(ExecutionPointer::new(1));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Complete);
|
||||
assert!(updated
|
||||
.execution_pointers
|
||||
.iter()
|
||||
.all(|p| p.status == PointerStatus::Complete));
|
||||
assert!(
|
||||
updated
|
||||
.execution_pointers
|
||||
.iter()
|
||||
.all(|p| p.status == PointerStatus::Complete)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1114,9 +1211,15 @@ mod tests {
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
// Should not error on a completed workflow.
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Complete);
|
||||
}
|
||||
|
||||
@@ -1129,7 +1232,8 @@ mod tests {
|
||||
registry.register::<SleepStep>();
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<SleepStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<SleepStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
let mut pointer = ExecutionPointer::new(0);
|
||||
@@ -1139,11 +1243,20 @@ mod tests {
|
||||
instance.execution_pointers.push(pointer);
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
// Should still be sleeping since sleep_until is in the future.
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping);
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].status,
|
||||
PointerStatus::Sleeping
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1163,7 +1276,10 @@ mod tests {
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let errors = persistence.get_errors().await;
|
||||
assert_eq!(errors.len(), 1);
|
||||
@@ -1174,24 +1290,31 @@ mod tests {
|
||||
async fn lifecycle_events_published() {
|
||||
let (persistence, lock, queue) = create_providers();
|
||||
let lifecycle = Arc::new(InMemoryLifecyclePublisher::new());
|
||||
let executor = create_executor(persistence.clone(), lock, queue)
|
||||
.with_lifecycle(lifecycle.clone());
|
||||
let executor =
|
||||
create_executor(persistence.clone(), lock, queue).with_lifecycle(lifecycle.clone());
|
||||
|
||||
let mut registry = StepRegistry::new();
|
||||
registry.register::<PassStep>();
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<PassStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Executor itself doesn't publish lifecycle events in the current implementation,
|
||||
// but the with_lifecycle builder works correctly.
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Complete);
|
||||
}
|
||||
|
||||
@@ -1206,15 +1329,22 @@ mod tests {
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.default_error_behavior = ErrorBehavior::Terminate;
|
||||
// Step has no error_behavior override.
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<FailStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<FailStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Terminated);
|
||||
}
|
||||
|
||||
@@ -1227,15 +1357,22 @@ mod tests {
|
||||
registry.register::<PassStep>();
|
||||
|
||||
let mut def = WorkflowDefinition::new("test", 1);
|
||||
def.steps.push(WorkflowStep::new(0, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(0, step_type::<PassStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(updated.execution_pointers[0].start_time.is_some());
|
||||
assert!(updated.execution_pointers[0].end_time.is_some());
|
||||
}
|
||||
@@ -1257,15 +1394,22 @@ mod tests {
|
||||
value: Some(serde_json::json!("yes")),
|
||||
});
|
||||
def.steps.push(s0);
|
||||
def.steps.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
def.steps
|
||||
.push(WorkflowStep::new(1, step_type::<PassStep>()));
|
||||
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
instance.execution_pointers.push(ExecutionPointer::new(0));
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].outcome,
|
||||
Some(serde_json::json!("yes"))
|
||||
@@ -1318,15 +1462,33 @@ mod tests {
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
// First execution: fails, retry scheduled.
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.execution_pointers[0].retry_count, 1);
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Sleeping);
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].status,
|
||||
PointerStatus::Sleeping
|
||||
);
|
||||
|
||||
// Second execution: succeeds (sleep_until is in the past with 0ms interval).
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
assert_eq!(updated.execution_pointers[0].status, PointerStatus::Complete);
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
updated.execution_pointers[0].status,
|
||||
PointerStatus::Complete
|
||||
);
|
||||
assert_eq!(updated.status, WorkflowStatus::Complete);
|
||||
}
|
||||
|
||||
@@ -1342,9 +1504,15 @@ mod tests {
|
||||
// No execution pointers at all.
|
||||
persistence.create_new_workflow(&instance).await.unwrap();
|
||||
|
||||
executor.execute(&instance.id, &def, ®istry, None).await.unwrap();
|
||||
executor
|
||||
.execute(&instance.id, &def, ®istry, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let updated = persistence.get_workflow_instance(&instance.id).await.unwrap();
|
||||
let updated = persistence
|
||||
.get_workflow_instance(&instance.id)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(updated.status, WorkflowStatus::Runnable);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,13 +136,11 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn step_condition_any_serde_round_trip() {
|
||||
let condition = StepCondition::Any(vec![
|
||||
StepCondition::Comparison(FieldComparison {
|
||||
field: ".x".to_string(),
|
||||
operator: ComparisonOp::IsNull,
|
||||
value: None,
|
||||
}),
|
||||
]);
|
||||
let condition = StepCondition::Any(vec![StepCondition::Comparison(FieldComparison {
|
||||
field: ".x".to_string(),
|
||||
operator: ComparisonOp::IsNull,
|
||||
value: None,
|
||||
})]);
|
||||
let json_str = serde_json::to_string(&condition).unwrap();
|
||||
let deserialized: StepCondition = serde_json::from_str(&json_str).unwrap();
|
||||
assert_eq!(condition, deserialized);
|
||||
@@ -150,13 +148,11 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn step_condition_none_serde_round_trip() {
|
||||
let condition = StepCondition::None(vec![
|
||||
StepCondition::Comparison(FieldComparison {
|
||||
field: ".err".to_string(),
|
||||
operator: ComparisonOp::IsNotNull,
|
||||
value: None,
|
||||
}),
|
||||
]);
|
||||
let condition = StepCondition::None(vec![StepCondition::Comparison(FieldComparison {
|
||||
field: ".err".to_string(),
|
||||
operator: ComparisonOp::IsNotNull,
|
||||
value: None,
|
||||
})]);
|
||||
let json_str = serde_json::to_string(&condition).unwrap();
|
||||
let deserialized: StepCondition = serde_json::from_str(&json_str).unwrap();
|
||||
assert_eq!(condition, deserialized);
|
||||
|
||||
@@ -75,7 +75,11 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn new_event_defaults() {
|
||||
let event = Event::new("order.created", "order-456", serde_json::json!({"amount": 100}));
|
||||
let event = Event::new(
|
||||
"order.created",
|
||||
"order-456",
|
||||
serde_json::json!({"amount": 100}),
|
||||
);
|
||||
assert_eq!(event.event_name, "order.created");
|
||||
assert_eq!(event.event_key, "order-456");
|
||||
assert!(!event.is_processed);
|
||||
|
||||
@@ -59,7 +59,10 @@ impl ExecutionResult {
|
||||
}
|
||||
|
||||
/// Create child branches for parallel/foreach execution.
|
||||
pub fn branch(values: Vec<serde_json::Value>, persistence_data: Option<serde_json::Value>) -> Self {
|
||||
pub fn branch(
|
||||
values: Vec<serde_json::Value>,
|
||||
persistence_data: Option<serde_json::Value>,
|
||||
) -> Self {
|
||||
Self {
|
||||
proceed: false,
|
||||
branch_values: Some(values),
|
||||
@@ -137,7 +140,11 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn branch_creates_child_values() {
|
||||
let values = vec![serde_json::json!(1), serde_json::json!(2), serde_json::json!(3)];
|
||||
let values = vec![
|
||||
serde_json::json!(1),
|
||||
serde_json::json!(2),
|
||||
serde_json::json!(3),
|
||||
];
|
||||
let result = ExecutionResult::branch(values.clone(), None);
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.branch_values, Some(values));
|
||||
@@ -181,7 +188,8 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip() {
|
||||
let result = ExecutionResult::sleep(Duration::from_secs(30), Some(serde_json::json!({"x": 1})));
|
||||
let result =
|
||||
ExecutionResult::sleep(Duration::from_secs(30), Some(serde_json::json!({"x": 1})));
|
||||
let json = serde_json::to_string(&result).unwrap();
|
||||
let deserialized: ExecutionResult = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(result.proceed, deserialized.proceed);
|
||||
|
||||
@@ -18,9 +18,17 @@ pub enum LifecycleEventType {
|
||||
Suspended,
|
||||
Completed,
|
||||
Terminated,
|
||||
Error { message: String },
|
||||
StepStarted { step_id: usize, step_name: Option<String> },
|
||||
StepCompleted { step_id: usize, step_name: Option<String> },
|
||||
Error {
|
||||
message: String,
|
||||
},
|
||||
StepStarted {
|
||||
step_id: usize,
|
||||
step_name: Option<String>,
|
||||
},
|
||||
StepCompleted {
|
||||
step_id: usize,
|
||||
step_name: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
impl LifecycleEvent {
|
||||
@@ -56,7 +64,10 @@ mod tests {
|
||||
let event = LifecycleEvent::new("wf-1", "def-1", 1, LifecycleEventType::Started);
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let deserialized: LifecycleEvent = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(event.workflow_instance_id, deserialized.workflow_instance_id);
|
||||
assert_eq!(
|
||||
event.workflow_instance_id,
|
||||
deserialized.workflow_instance_id
|
||||
);
|
||||
assert_eq!(event.event_type, deserialized.event_type);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
pub mod condition;
|
||||
pub mod error_behavior;
|
||||
pub mod event;
|
||||
pub mod service;
|
||||
pub mod execution_error;
|
||||
pub mod execution_pointer;
|
||||
pub mod execution_result;
|
||||
@@ -10,6 +9,7 @@ pub mod poll_config;
|
||||
pub mod queue_type;
|
||||
pub mod scheduled_command;
|
||||
pub mod schema;
|
||||
pub mod service;
|
||||
pub mod status;
|
||||
pub mod workflow_definition;
|
||||
pub mod workflow_instance;
|
||||
@@ -25,9 +25,11 @@ pub use poll_config::{HttpMethod, PollCondition, PollEndpointConfig};
|
||||
pub use queue_type::QueueType;
|
||||
pub use scheduled_command::{CommandName, ScheduledCommand};
|
||||
pub use schema::{SchemaType, WorkflowSchema};
|
||||
pub use service::{
|
||||
ReadinessCheck, ReadinessProbe, ServiceDefinition, ServiceEndpoint, ServicePort,
|
||||
};
|
||||
pub use status::{PointerStatus, WorkflowStatus};
|
||||
pub use workflow_definition::{StepOutcome, WorkflowDefinition, WorkflowStep};
|
||||
pub use service::{ReadinessCheck, ReadinessProbe, ServiceDefinition, ServiceEndpoint, ServicePort};
|
||||
pub use workflow_instance::WorkflowInstance;
|
||||
|
||||
/// Serde helper for `Option<Duration>` as milliseconds.
|
||||
|
||||
@@ -63,9 +63,7 @@ pub fn parse_type(s: &str) -> crate::Result<SchemaType> {
|
||||
"integer" => Ok(SchemaType::Integer),
|
||||
"bool" => Ok(SchemaType::Bool),
|
||||
"any" => Ok(SchemaType::Any),
|
||||
_ => Err(crate::WfeError::StepExecution(format!(
|
||||
"Unknown type: {s}"
|
||||
))),
|
||||
_ => Err(crate::WfeError::StepExecution(format!("Unknown type: {s}"))),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,8 +108,7 @@ pub fn validate_value(value: &serde_json::Value, expected: &SchemaType) -> Resul
|
||||
SchemaType::List(inner) => {
|
||||
if let Some(arr) = value.as_array() {
|
||||
for (i, item) in arr.iter().enumerate() {
|
||||
validate_value(item, inner)
|
||||
.map_err(|e| format!("list element [{i}]: {e}"))?;
|
||||
validate_value(item, inner).map_err(|e| format!("list element [{i}]: {e}"))?;
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
@@ -121,8 +118,7 @@ pub fn validate_value(value: &serde_json::Value, expected: &SchemaType) -> Resul
|
||||
SchemaType::Map(inner) => {
|
||||
if let Some(obj) = value.as_object() {
|
||||
for (key, val) in obj {
|
||||
validate_value(val, inner)
|
||||
.map_err(|e| format!("map key \"{key}\": {e}"))?;
|
||||
validate_value(val, inner).map_err(|e| format!("map key \"{key}\": {e}"))?;
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
|
||||
@@ -130,7 +130,10 @@ mod tests {
|
||||
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.branch_values, Some(vec![json!(1), json!(2), json!(3)]));
|
||||
assert_eq!(
|
||||
result.branch_values,
|
||||
Some(vec![json!(1), json!(2), json!(3)])
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -60,7 +60,10 @@ mod tests {
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.branch_values, Some(vec![json!(null)]));
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -116,6 +119,9 @@ mod tests {
|
||||
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,7 +45,7 @@ mod test_helpers {
|
||||
workflow,
|
||||
cancellation_token: CancellationToken::new(),
|
||||
host_context: None,
|
||||
log_sink: None,
|
||||
log_sink: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::models::poll_config::PollEndpointConfig;
|
||||
use crate::models::ExecutionResult;
|
||||
use crate::models::poll_config::PollEndpointConfig;
|
||||
use crate::traits::step::{StepBody, StepExecutionContext};
|
||||
|
||||
/// A step that polls an external HTTP endpoint until a condition is met.
|
||||
@@ -21,8 +21,8 @@ impl StepBody for PollEndpointStep {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::models::poll_config::{HttpMethod, PollCondition};
|
||||
use crate::models::ExecutionPointer;
|
||||
use crate::models::poll_config::{HttpMethod, PollCondition};
|
||||
use crate::primitives::test_helpers::*;
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -85,7 +85,10 @@ mod tests {
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.sleep_for, Some(Duration::from_secs(10)));
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -130,6 +133,9 @@ mod tests {
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert!(result.sleep_for.is_none());
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,7 +89,10 @@ mod tests {
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.branch_values, Some(vec![json!(null)]));
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -60,7 +60,10 @@ mod tests {
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.sleep_for, Some(Duration::from_secs(30)));
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -101,6 +104,9 @@ mod tests {
|
||||
let ctx = make_context(&pointer, &wf_step, &workflow);
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -61,7 +61,10 @@ mod tests {
|
||||
let ctx = make_context(&pointer, &wf_step, &workflow);
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -69,7 +69,10 @@ mod tests {
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.branch_values, Some(vec![json!(null)]));
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -141,6 +144,9 @@ mod tests {
|
||||
let result = step.run(&ctx).await.unwrap();
|
||||
assert!(!result.proceed);
|
||||
assert!(result.branch_values.is_none());
|
||||
assert_eq!(result.persistence_data, Some(json!({"children_active": true})));
|
||||
assert_eq!(
|
||||
result.persistence_data,
|
||||
Some(json!({"children_active": true}))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,9 +3,9 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::Result;
|
||||
use crate::models::LifecycleEvent;
|
||||
use crate::traits::LifecyclePublisher;
|
||||
use crate::Result;
|
||||
|
||||
/// An in-memory implementation of `LifecyclePublisher` for testing.
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -4,8 +4,8 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::traits::DistributedLockProvider;
|
||||
use crate::Result;
|
||||
use crate::traits::DistributedLockProvider;
|
||||
|
||||
/// An in-memory implementation of `DistributedLockProvider` for testing.
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -4,9 +4,9 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::Result;
|
||||
use crate::models::QueueType;
|
||||
use crate::traits::QueueProvider;
|
||||
use crate::Result;
|
||||
|
||||
/// An in-memory implementation of `QueueProvider` for testing.
|
||||
#[derive(Debug, Clone)]
|
||||
|
||||
@@ -17,18 +17,9 @@ macro_rules! queue_suite {
|
||||
#[tokio::test]
|
||||
async fn enqueue_dequeue_fifo() {
|
||||
let provider = ($factory)().await;
|
||||
provider
|
||||
.queue_work("a", QueueType::Workflow)
|
||||
.await
|
||||
.unwrap();
|
||||
provider
|
||||
.queue_work("b", QueueType::Workflow)
|
||||
.await
|
||||
.unwrap();
|
||||
provider
|
||||
.queue_work("c", QueueType::Workflow)
|
||||
.await
|
||||
.unwrap();
|
||||
provider.queue_work("a", QueueType::Workflow).await.unwrap();
|
||||
provider.queue_work("b", QueueType::Workflow).await.unwrap();
|
||||
provider.queue_work("c", QueueType::Workflow).await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
provider
|
||||
@@ -94,16 +85,20 @@ macro_rules! queue_suite {
|
||||
);
|
||||
|
||||
// Both should now be empty
|
||||
assert!(provider
|
||||
.dequeue_work(QueueType::Event)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
assert!(provider
|
||||
.dequeue_work(QueueType::Workflow)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
assert!(
|
||||
provider
|
||||
.dequeue_work(QueueType::Event)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none()
|
||||
);
|
||||
assert!(
|
||||
provider
|
||||
.dequeue_work(QueueType::Workflow)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none()
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -69,7 +69,7 @@ mod tests {
|
||||
workflow: &instance,
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
host_context: None,
|
||||
log_sink: None,
|
||||
log_sink: None,
|
||||
};
|
||||
mw.pre_step(&ctx).await.unwrap();
|
||||
}
|
||||
@@ -89,7 +89,7 @@ mod tests {
|
||||
workflow: &instance,
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
host_context: None,
|
||||
log_sink: None,
|
||||
log_sink: None,
|
||||
};
|
||||
let result = ExecutionResult::next();
|
||||
mw.post_step(&ctx, &result).await.unwrap();
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
pub mod lifecycle;
|
||||
pub mod lock;
|
||||
pub mod service;
|
||||
pub mod log_sink;
|
||||
pub mod middleware;
|
||||
pub mod persistence;
|
||||
pub mod queue;
|
||||
pub mod registry;
|
||||
pub mod search;
|
||||
pub mod service;
|
||||
pub mod step;
|
||||
|
||||
pub use lifecycle::LifecyclePublisher;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use async_trait::async_trait;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
use crate::models::{ExecutionPointer, ExecutionResult, WorkflowInstance, WorkflowStep};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user