feat: add executor tracing, auto-register primitives, and Default impls
- Add info!-level tracing to workflow executor: logs each execution round, each step run (with type and name), step completion, and workflow completion - WorkflowHost.start() now auto-registers all built-in primitive step types so users don't need to register them manually - Add #[derive(Default)] to all primitive steps and PollEndpointConfig - Add tracing-subscriber to wfe crate for the pizza example - Pizza example now shows full step-by-step execution logs
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use tracing::{debug, error, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
use super::error_handler;
|
use super::error_handler;
|
||||||
use super::result_processor;
|
use super::result_processor;
|
||||||
@@ -106,6 +106,12 @@ impl WorkflowExecutor {
|
|||||||
let mut execution_errors = Vec::new();
|
let mut execution_errors = Vec::new();
|
||||||
|
|
||||||
// 3. Find runnable execution pointers.
|
// 3. Find runnable execution pointers.
|
||||||
|
info!(
|
||||||
|
workflow_id,
|
||||||
|
definition_id = %workflow.workflow_definition_id,
|
||||||
|
pointers = workflow.execution_pointers.len(),
|
||||||
|
"Executing workflow"
|
||||||
|
);
|
||||||
let runnable_indices: Vec<usize> = workflow
|
let runnable_indices: Vec<usize> = workflow
|
||||||
.execution_pointers
|
.execution_pointers
|
||||||
.iter()
|
.iter()
|
||||||
@@ -125,6 +131,14 @@ impl WorkflowExecutor {
|
|||||||
.find(|s| s.id == step_id)
|
.find(|s| s.id == step_id)
|
||||||
.ok_or(WfeError::StepNotFound(step_id))?;
|
.ok_or(WfeError::StepNotFound(step_id))?;
|
||||||
|
|
||||||
|
info!(
|
||||||
|
workflow_id,
|
||||||
|
step_id,
|
||||||
|
step_type = %step.step_type,
|
||||||
|
step_name = step.name.as_deref().unwrap_or("(unnamed)"),
|
||||||
|
"Running step"
|
||||||
|
);
|
||||||
|
|
||||||
// b. Resolve the step body.
|
// b. Resolve the step body.
|
||||||
let mut step_body = step_registry
|
let mut step_body = step_registry
|
||||||
.resolve(&step.step_type)
|
.resolve(&step.step_type)
|
||||||
@@ -156,6 +170,15 @@ impl WorkflowExecutor {
|
|||||||
// Now we can mutate again since context is dropped.
|
// Now we can mutate again since context is dropped.
|
||||||
match step_result {
|
match step_result {
|
||||||
Ok(result) => {
|
Ok(result) => {
|
||||||
|
info!(
|
||||||
|
workflow_id,
|
||||||
|
step_id,
|
||||||
|
proceed = result.proceed,
|
||||||
|
has_sleep = result.sleep_for.is_some(),
|
||||||
|
has_event = result.event_name.is_some(),
|
||||||
|
has_branches = result.branch_values.is_some(),
|
||||||
|
"Step completed"
|
||||||
|
);
|
||||||
// e. Process the ExecutionResult.
|
// e. Process the ExecutionResult.
|
||||||
// Extract workflow_id before mutable borrow.
|
// Extract workflow_id before mutable borrow.
|
||||||
let wf_id = workflow.id.clone();
|
let wf_id = workflow.id.clone();
|
||||||
@@ -225,6 +248,7 @@ impl WorkflowExecutor {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if all_done && workflow.status == WorkflowStatus::Runnable {
|
if all_done && workflow.status == WorkflowStatus::Runnable {
|
||||||
|
info!(workflow_id, "All pointers complete, workflow finished");
|
||||||
workflow.status = WorkflowStatus::Complete;
|
workflow.status = WorkflowStatus::Complete;
|
||||||
workflow.complete_time = Some(Utc::now());
|
workflow.complete_time = Some(Utc::now());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,8 +3,9 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
|
||||||
pub enum HttpMethod {
|
pub enum HttpMethod {
|
||||||
|
#[default]
|
||||||
Get,
|
Get,
|
||||||
Post,
|
Post,
|
||||||
Put,
|
Put,
|
||||||
@@ -25,7 +26,13 @@ pub enum PollCondition {
|
|||||||
BodyContains(String),
|
BodyContains(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
impl Default for PollCondition {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::StatusCode(200)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
|
||||||
pub struct PollEndpointConfig {
|
pub struct PollEndpointConfig {
|
||||||
/// URL template. Supports `{placeholder}` interpolation from workflow data.
|
/// URL template. Supports `{placeholder}` interpolation from workflow data.
|
||||||
pub url: String,
|
pub url: String,
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use crate::models::ExecutionResult;
|
|||||||
use crate::traits::step::{StepBody, StepExecutionContext};
|
use crate::traits::step::{StepBody, StepExecutionContext};
|
||||||
|
|
||||||
/// A decision step that returns an outcome value for routing.
|
/// A decision step that returns an outcome value for routing.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct DecideStep {
|
pub struct DecideStep {
|
||||||
pub expression_value: serde_json::Value,
|
pub expression_value: serde_json::Value,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ use crate::models::ExecutionResult;
|
|||||||
use crate::traits::step::{StepBody, StepExecutionContext};
|
use crate::traits::step::{StepBody, StepExecutionContext};
|
||||||
|
|
||||||
/// A no-op marker step indicating the end of a workflow branch.
|
/// A no-op marker step indicating the end of a workflow branch.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct EndStep;
|
pub struct EndStep;
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use crate::models::ExecutionResult;
|
|||||||
use crate::traits::step::{StepBody, StepExecutionContext};
|
use crate::traits::step::{StepBody, StepExecutionContext};
|
||||||
|
|
||||||
/// A conditional step that branches execution based on a boolean condition.
|
/// A conditional step that branches execution based on a boolean condition.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct IfStep {
|
pub struct IfStep {
|
||||||
pub condition: bool,
|
pub condition: bool,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use crate::traits::step::{StepBody, StepExecutionContext};
|
|||||||
|
|
||||||
/// A step that polls an external HTTP endpoint until a condition is met.
|
/// A step that polls an external HTTP endpoint until a condition is met.
|
||||||
/// The actual HTTP polling is handled by the executor, not this step.
|
/// The actual HTTP polling is handled by the executor, not this step.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct PollEndpointStep {
|
pub struct PollEndpointStep {
|
||||||
pub config: PollEndpointConfig,
|
pub config: PollEndpointConfig,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ use crate::traits::step::{StepBody, StepExecutionContext};
|
|||||||
|
|
||||||
/// A step that repeatedly schedules child execution at an interval
|
/// A step that repeatedly schedules child execution at an interval
|
||||||
/// until a stop condition is met.
|
/// until a stop condition is met.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct RecurStep {
|
pub struct RecurStep {
|
||||||
pub interval: Duration,
|
pub interval: Duration,
|
||||||
pub stop_condition: bool,
|
pub stop_condition: bool,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ use crate::models::ExecutionResult;
|
|||||||
use crate::traits::step::{StepBody, StepExecutionContext};
|
use crate::traits::step::{StepBody, StepExecutionContext};
|
||||||
|
|
||||||
/// A step that schedules child execution after a delay.
|
/// A step that schedules child execution after a delay.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct ScheduleStep {
|
pub struct ScheduleStep {
|
||||||
pub interval: Duration,
|
pub interval: Duration,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use crate::traits::step::{StepBody, StepExecutionContext};
|
|||||||
|
|
||||||
/// A container step that executes its children sequentially.
|
/// A container step that executes its children sequentially.
|
||||||
/// Completes when all children have finished.
|
/// Completes when all children have finished.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct SequenceStep;
|
pub struct SequenceStep;
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ use crate::models::ExecutionResult;
|
|||||||
use crate::traits::step::{StepBody, StepExecutionContext};
|
use crate::traits::step::{StepBody, StepExecutionContext};
|
||||||
|
|
||||||
/// A looping step that repeats its children while a condition is true.
|
/// A looping step that repeats its children while a condition is true.
|
||||||
|
#[derive(Default)]
|
||||||
pub struct WhileStep {
|
pub struct WhileStep {
|
||||||
pub condition: bool,
|
pub condition: bool,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,6 +17,8 @@ thiserror = { workspace = true }
|
|||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
tokio-util = "0.7"
|
tokio-util = "0.7"
|
||||||
|
|
||||||
|
tracing-subscriber = { workspace = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
wfe-core = { workspace = true, features = ["test-support"] }
|
wfe-core = { workspace = true, features = ["test-support"] }
|
||||||
wfe-sqlite = { workspace = true }
|
wfe-sqlite = { workspace = true }
|
||||||
|
|||||||
@@ -444,6 +444,13 @@ fn build_pizza_workflow() -> WorkflowDefinition {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
|
||||||
|
// Set up tracing so we can see the executor's step-by-step logging.
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_target(false)
|
||||||
|
.with_timer(tracing_subscriber::fmt::time::uptime())
|
||||||
|
.with_env_filter("wfe_core=info,wfe=info")
|
||||||
|
.init();
|
||||||
|
|
||||||
println!("=== WFE Pizza Workflow Engine Demo ===\n");
|
println!("=== WFE Pizza Workflow Engine Demo ===\n");
|
||||||
|
|
||||||
// Build the host with in-memory providers (swap for SQLite/Postgres/Valkey in prod)
|
// Build the host with in-memory providers (swap for SQLite/Postgres/Valkey in prod)
|
||||||
|
|||||||
@@ -33,8 +33,27 @@ pub struct WorkflowHost {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl WorkflowHost {
|
impl WorkflowHost {
|
||||||
|
/// Register all built-in primitive step types.
|
||||||
|
async fn register_primitives(&self) {
|
||||||
|
use wfe_core::primitives::*;
|
||||||
|
let mut sr = self.step_registry.write().await;
|
||||||
|
sr.register::<decide::DecideStep>();
|
||||||
|
sr.register::<delay::DelayStep>();
|
||||||
|
sr.register::<end_step::EndStep>();
|
||||||
|
sr.register::<foreach_step::ForEachStep>();
|
||||||
|
sr.register::<if_step::IfStep>();
|
||||||
|
sr.register::<poll_endpoint::PollEndpointStep>();
|
||||||
|
sr.register::<recur::RecurStep>();
|
||||||
|
sr.register::<saga_container::SagaContainerStep>();
|
||||||
|
sr.register::<schedule::ScheduleStep>();
|
||||||
|
sr.register::<sequence::SequenceStep>();
|
||||||
|
sr.register::<wait_for::WaitForStep>();
|
||||||
|
sr.register::<while_step::WhileStep>();
|
||||||
|
}
|
||||||
|
|
||||||
/// Spawn background polling tasks for processing workflows and events.
|
/// Spawn background polling tasks for processing workflows and events.
|
||||||
pub async fn start(&self) -> Result<()> {
|
pub async fn start(&self) -> Result<()> {
|
||||||
|
self.register_primitives().await;
|
||||||
self.queue_provider.start().await?;
|
self.queue_provider.start().await?;
|
||||||
self.lock_provider.start().await?;
|
self.lock_provider.start().await?;
|
||||||
if let Some(ref search) = self.search {
|
if let Some(ref search) = self.search {
|
||||||
|
|||||||
Reference in New Issue
Block a user