From 88fc6bf7ad1e524c0a784298ddc4a8331a1422fb Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Wed, 25 Mar 2026 20:32:47 +0000 Subject: [PATCH] feat: add executor tracing, auto-register primitives, and Default impls - Add info!-level tracing to workflow executor: logs each execution round, each step run (with type and name), step completion, and workflow completion - WorkflowHost.start() now auto-registers all built-in primitive step types so users don't need to register them manually - Add #[derive(Default)] to all primitive steps and PollEndpointConfig - Add tracing-subscriber to wfe crate for the pizza example - Pizza example now shows full step-by-step execution logs --- wfe-core/src/executor/workflow_executor.rs | 26 +++++++++++++++++++++- wfe-core/src/models/poll_config.rs | 11 +++++++-- wfe-core/src/primitives/decide.rs | 1 + wfe-core/src/primitives/end_step.rs | 1 + wfe-core/src/primitives/if_step.rs | 1 + wfe-core/src/primitives/poll_endpoint.rs | 1 + wfe-core/src/primitives/recur.rs | 1 + wfe-core/src/primitives/schedule.rs | 1 + wfe-core/src/primitives/sequence.rs | 1 + wfe-core/src/primitives/while_step.rs | 1 + wfe/Cargo.toml | 2 ++ wfe/examples/pizza.rs | 7 ++++++ wfe/src/host.rs | 19 ++++++++++++++++ 13 files changed, 70 insertions(+), 3 deletions(-) diff --git a/wfe-core/src/executor/workflow_executor.rs b/wfe-core/src/executor/workflow_executor.rs index 93f862c..ef0fb90 100644 --- a/wfe-core/src/executor/workflow_executor.rs +++ b/wfe-core/src/executor/workflow_executor.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use chrono::Utc; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use super::error_handler; use super::result_processor; @@ -106,6 +106,12 @@ impl WorkflowExecutor { let mut execution_errors = Vec::new(); // 3. Find runnable execution pointers. + info!( + workflow_id, + definition_id = %workflow.workflow_definition_id, + pointers = workflow.execution_pointers.len(), + "Executing workflow" + ); let runnable_indices: Vec = workflow .execution_pointers .iter() @@ -125,6 +131,14 @@ impl WorkflowExecutor { .find(|s| s.id == step_id) .ok_or(WfeError::StepNotFound(step_id))?; + info!( + workflow_id, + step_id, + step_type = %step.step_type, + step_name = step.name.as_deref().unwrap_or("(unnamed)"), + "Running step" + ); + // b. Resolve the step body. let mut step_body = step_registry .resolve(&step.step_type) @@ -156,6 +170,15 @@ impl WorkflowExecutor { // Now we can mutate again since context is dropped. match step_result { Ok(result) => { + info!( + workflow_id, + step_id, + proceed = result.proceed, + has_sleep = result.sleep_for.is_some(), + has_event = result.event_name.is_some(), + has_branches = result.branch_values.is_some(), + "Step completed" + ); // e. Process the ExecutionResult. // Extract workflow_id before mutable borrow. let wf_id = workflow.id.clone(); @@ -225,6 +248,7 @@ impl WorkflowExecutor { }); if all_done && workflow.status == WorkflowStatus::Runnable { + info!(workflow_id, "All pointers complete, workflow finished"); workflow.status = WorkflowStatus::Complete; workflow.complete_time = Some(Utc::now()); } diff --git a/wfe-core/src/models/poll_config.rs b/wfe-core/src/models/poll_config.rs index b8e7aa7..f5ba4db 100644 --- a/wfe-core/src/models/poll_config.rs +++ b/wfe-core/src/models/poll_config.rs @@ -3,8 +3,9 @@ use std::time::Duration; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub enum HttpMethod { + #[default] Get, Post, Put, @@ -25,7 +26,13 @@ pub enum PollCondition { BodyContains(String), } -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +impl Default for PollCondition { + fn default() -> Self { + Self::StatusCode(200) + } +} + +#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)] pub struct PollEndpointConfig { /// URL template. Supports `{placeholder}` interpolation from workflow data. pub url: String, diff --git a/wfe-core/src/primitives/decide.rs b/wfe-core/src/primitives/decide.rs index 63feeea..c0bdf73 100644 --- a/wfe-core/src/primitives/decide.rs +++ b/wfe-core/src/primitives/decide.rs @@ -4,6 +4,7 @@ use crate::models::ExecutionResult; use crate::traits::step::{StepBody, StepExecutionContext}; /// A decision step that returns an outcome value for routing. +#[derive(Default)] pub struct DecideStep { pub expression_value: serde_json::Value, } diff --git a/wfe-core/src/primitives/end_step.rs b/wfe-core/src/primitives/end_step.rs index 742dd72..4893cbc 100644 --- a/wfe-core/src/primitives/end_step.rs +++ b/wfe-core/src/primitives/end_step.rs @@ -4,6 +4,7 @@ use crate::models::ExecutionResult; use crate::traits::step::{StepBody, StepExecutionContext}; /// A no-op marker step indicating the end of a workflow branch. +#[derive(Default)] pub struct EndStep; #[async_trait] diff --git a/wfe-core/src/primitives/if_step.rs b/wfe-core/src/primitives/if_step.rs index f22a5d8..250c2b5 100644 --- a/wfe-core/src/primitives/if_step.rs +++ b/wfe-core/src/primitives/if_step.rs @@ -5,6 +5,7 @@ use crate::models::ExecutionResult; use crate::traits::step::{StepBody, StepExecutionContext}; /// A conditional step that branches execution based on a boolean condition. +#[derive(Default)] pub struct IfStep { pub condition: bool, } diff --git a/wfe-core/src/primitives/poll_endpoint.rs b/wfe-core/src/primitives/poll_endpoint.rs index aabb9f4..a47d45e 100644 --- a/wfe-core/src/primitives/poll_endpoint.rs +++ b/wfe-core/src/primitives/poll_endpoint.rs @@ -6,6 +6,7 @@ use crate::traits::step::{StepBody, StepExecutionContext}; /// A step that polls an external HTTP endpoint until a condition is met. /// The actual HTTP polling is handled by the executor, not this step. +#[derive(Default)] pub struct PollEndpointStep { pub config: PollEndpointConfig, } diff --git a/wfe-core/src/primitives/recur.rs b/wfe-core/src/primitives/recur.rs index 2125554..535aa56 100644 --- a/wfe-core/src/primitives/recur.rs +++ b/wfe-core/src/primitives/recur.rs @@ -8,6 +8,7 @@ use crate::traits::step::{StepBody, StepExecutionContext}; /// A step that repeatedly schedules child execution at an interval /// until a stop condition is met. +#[derive(Default)] pub struct RecurStep { pub interval: Duration, pub stop_condition: bool, diff --git a/wfe-core/src/primitives/schedule.rs b/wfe-core/src/primitives/schedule.rs index 290587a..1f4b12f 100644 --- a/wfe-core/src/primitives/schedule.rs +++ b/wfe-core/src/primitives/schedule.rs @@ -7,6 +7,7 @@ use crate::models::ExecutionResult; use crate::traits::step::{StepBody, StepExecutionContext}; /// A step that schedules child execution after a delay. +#[derive(Default)] pub struct ScheduleStep { pub interval: Duration, } diff --git a/wfe-core/src/primitives/sequence.rs b/wfe-core/src/primitives/sequence.rs index f822d4d..74da8e3 100644 --- a/wfe-core/src/primitives/sequence.rs +++ b/wfe-core/src/primitives/sequence.rs @@ -6,6 +6,7 @@ use crate::traits::step::{StepBody, StepExecutionContext}; /// A container step that executes its children sequentially. /// Completes when all children have finished. +#[derive(Default)] pub struct SequenceStep; #[async_trait] diff --git a/wfe-core/src/primitives/while_step.rs b/wfe-core/src/primitives/while_step.rs index 2374638..0210706 100644 --- a/wfe-core/src/primitives/while_step.rs +++ b/wfe-core/src/primitives/while_step.rs @@ -5,6 +5,7 @@ use crate::models::ExecutionResult; use crate::traits::step::{StepBody, StepExecutionContext}; /// A looping step that repeats its children while a condition is true. +#[derive(Default)] pub struct WhileStep { pub condition: bool, } diff --git a/wfe/Cargo.toml b/wfe/Cargo.toml index 0b69b0f..e68bdac 100644 --- a/wfe/Cargo.toml +++ b/wfe/Cargo.toml @@ -17,6 +17,8 @@ thiserror = { workspace = true } tracing = { workspace = true } tokio-util = "0.7" +tracing-subscriber = { workspace = true } + [dev-dependencies] wfe-core = { workspace = true, features = ["test-support"] } wfe-sqlite = { workspace = true } diff --git a/wfe/examples/pizza.rs b/wfe/examples/pizza.rs index 5dfc36a..bdbdeb3 100644 --- a/wfe/examples/pizza.rs +++ b/wfe/examples/pizza.rs @@ -444,6 +444,13 @@ fn build_pizza_workflow() -> WorkflowDefinition { #[tokio::main] async fn main() -> std::result::Result<(), Box> { + // Set up tracing so we can see the executor's step-by-step logging. + tracing_subscriber::fmt() + .with_target(false) + .with_timer(tracing_subscriber::fmt::time::uptime()) + .with_env_filter("wfe_core=info,wfe=info") + .init(); + println!("=== WFE Pizza Workflow Engine Demo ===\n"); // Build the host with in-memory providers (swap for SQLite/Postgres/Valkey in prod) diff --git a/wfe/src/host.rs b/wfe/src/host.rs index 95f445f..73c89e8 100644 --- a/wfe/src/host.rs +++ b/wfe/src/host.rs @@ -33,8 +33,27 @@ pub struct WorkflowHost { } impl WorkflowHost { + /// Register all built-in primitive step types. + async fn register_primitives(&self) { + use wfe_core::primitives::*; + let mut sr = self.step_registry.write().await; + sr.register::(); + sr.register::(); + sr.register::(); + sr.register::(); + sr.register::(); + sr.register::(); + sr.register::(); + sr.register::(); + sr.register::(); + sr.register::(); + sr.register::(); + sr.register::(); + } + /// Spawn background polling tasks for processing workflows and events. pub async fn start(&self) -> Result<()> { + self.register_primitives().await; self.queue_provider.start().await?; self.lock_provider.start().await?; if let Some(ref search) = self.search {