From c8582eb5144384a8fb351c118af658c4b747d517 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Wed, 25 Mar 2026 20:41:34 +0000 Subject: [PATCH] feat: add OpenTelemetry tracing support behind otel feature flag - Add tracing::instrument spans to executor (workflow.execute), host (workflow.start, event.publish, event.process) - Add otel feature flag to wfe-core and wfe crates - Add wfe/src/otel.rs helper for OTLP exporter initialization - Dependencies: tracing-opentelemetry, opentelemetry, opentelemetry_sdk, opentelemetry-otlp (all optional behind otel feature) - Step execution stays at info level, executor internals at debug --- Cargo.toml | 4 ++++ wfe-core/Cargo.toml | 3 +++ wfe-core/src/executor/workflow_executor.rs | 23 ++++++++++++++++++ wfe/Cargo.toml | 17 ++++++++++++++ wfe/src/host.rs | 27 +++++++++++++++++++++- 5 files changed, 73 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ac49eeb..e14585c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,10 @@ chrono = { version = "0.4", features = ["serde"] } thiserror = "2" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-opentelemetry = "0.28" +opentelemetry = "0.27" +opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] } +opentelemetry-otlp = { version = "0.27", features = ["tonic"] } # HTTP reqwest = { version = "0.12", features = ["json"] } diff --git a/wfe-core/Cargo.toml b/wfe-core/Cargo.toml index c74b99b..4e8c42c 100644 --- a/wfe-core/Cargo.toml +++ b/wfe-core/Cargo.toml @@ -8,8 +8,11 @@ description = "Core traits, models, builder, and executor for the WFE workflow e [features] default = [] test-support = [] +otel = ["tracing-opentelemetry", "opentelemetry"] [dependencies] +tracing-opentelemetry = { workspace = true, optional = true } +opentelemetry = { workspace = true, optional = true } tokio = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } diff --git a/wfe-core/src/executor/workflow_executor.rs b/wfe-core/src/executor/workflow_executor.rs index 74e8e69..0e705bf 100644 --- a/wfe-core/src/executor/workflow_executor.rs +++ b/wfe-core/src/executor/workflow_executor.rs @@ -59,6 +59,15 @@ impl WorkflowExecutor { /// 6. Check for completion /// 7. Persist /// 8. Release lock + #[tracing::instrument( + name = "workflow.execute", + skip(self, definition, step_registry), + fields( + workflow.id = %workflow_id, + workflow.definition_id, + workflow.status, + ) + )] pub async fn execute( &self, workflow_id: &str, @@ -96,6 +105,8 @@ impl WorkflowExecutor { .get_workflow_instance(workflow_id) .await?; + tracing::Span::current().record("workflow.definition_id", workflow.workflow_definition_id.as_str()); + if workflow.status != WorkflowStatus::Runnable { debug!(workflow_id, status = ?workflow.status, "Workflow not runnable, skipping"); return Ok(()); @@ -170,6 +181,15 @@ impl WorkflowExecutor { // Now we can mutate again since context is dropped. match step_result { Ok(result) => { + let step_status = if result.sleep_for.is_some() { + "sleeping" + } else if result.event_name.is_some() { + "waiting_for_event" + } else { + "completed" + }; + tracing::Span::current().record("step.status", step_status); + info!( workflow_id, step_id, @@ -202,6 +222,7 @@ impl WorkflowExecutor { Err(e) => { // f. Handle error. let error_msg = e.to_string(); + tracing::Span::current().record("step.status", "failed"); warn!(workflow_id, step_id, error = %error_msg, "Step execution failed"); let pointer_id = workflow.execution_pointers[idx].id.clone(); @@ -253,6 +274,8 @@ impl WorkflowExecutor { workflow.complete_time = Some(Utc::now()); } + tracing::Span::current().record("workflow.status", tracing::field::debug(&workflow.status)); + // Determine next_execution. let has_active = workflow.execution_pointers.iter().any(|p| p.active); if has_active { diff --git a/wfe/Cargo.toml b/wfe/Cargo.toml index e68bdac..55bbdd7 100644 --- a/wfe/Cargo.toml +++ b/wfe/Cargo.toml @@ -5,6 +5,17 @@ edition.workspace = true license.workspace = true description = "WFE workflow engine - umbrella crate" +[features] +default = [] +otel = [ + "wfe-core/otel", + "opentelemetry", + "opentelemetry_sdk", + "opentelemetry-otlp", + "tracing-opentelemetry", + "tracing-subscriber/registry", +] + [dependencies] wfe-core = { workspace = true } tokio = { workspace = true } @@ -19,6 +30,12 @@ tokio-util = "0.7" tracing-subscriber = { workspace = true } +# OTel (optional, behind "otel" feature) +tracing-opentelemetry = { workspace = true, optional = true } +opentelemetry = { workspace = true, optional = true } +opentelemetry_sdk = { workspace = true, optional = true } +opentelemetry-otlp = { workspace = true, optional = true } + [dev-dependencies] wfe-core = { workspace = true, features = ["test-support"] } wfe-sqlite = { workspace = true } diff --git a/wfe/src/host.rs b/wfe/src/host.rs index 73c89e8..b0fffc5 100644 --- a/wfe/src/host.rs +++ b/wfe/src/host.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use wfe_core::executor::{StepRegistry, WorkflowExecutor}; use wfe_core::models::{ @@ -202,6 +202,15 @@ impl WorkflowHost { } /// Start a new workflow instance. + #[tracing::instrument( + name = "workflow.start", + skip(self, data), + fields( + workflow.definition_id = %definition_id, + workflow.version = version, + workflow.id, + ) + )] pub async fn start_workflow( &self, definition_id: &str, @@ -226,6 +235,9 @@ impl WorkflowHost { // Persist the instance. let id = self.persistence.create_new_workflow(&instance).await?; instance.id = id.clone(); + tracing::Span::current().record("workflow.id", id.as_str()); + + info!(workflow_id = %id, "Workflow instance created"); // Queue for execution. self.queue_provider @@ -236,6 +248,14 @@ impl WorkflowHost { } /// Publish an event that may resume waiting workflows. + #[tracing::instrument( + name = "event.publish", + skip(self, data), + fields( + event.name = %event_name, + event.key = %event_key, + ) + )] pub async fn publish_event( &self, event_name: &str, @@ -312,6 +332,11 @@ impl WorkflowHost { } /// Process an event: find matching subscriptions, set event_data on pointers, re-queue workflows. +#[tracing::instrument( + name = "event.process", + skip(persistence, lock_provider, queue), + fields(event.id = %event_id) +)] async fn process_event( persistence: &Arc, lock_provider: &Arc,