feat: add OpenTelemetry tracing support behind otel feature flag

- Add tracing::instrument spans to executor (workflow.execute),
  host (workflow.start, event.publish, event.process)
- Add otel feature flag to wfe-core and wfe crates
- Add wfe/src/otel.rs helper for OTLP exporter initialization
- Dependencies: tracing-opentelemetry, opentelemetry, opentelemetry_sdk,
  opentelemetry-otlp (all optional behind otel feature)
- Step execution stays at info level, executor internals at debug
This commit is contained in:
2026-03-25 20:41:34 +00:00
parent bd51517e9f
commit c8582eb514
5 changed files with 73 additions and 1 deletions

View File

@@ -18,6 +18,10 @@ chrono = { version = "0.4", features = ["serde"] }
thiserror = "2"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-opentelemetry = "0.28"
opentelemetry = "0.27"
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.27", features = ["tonic"] }
# HTTP
reqwest = { version = "0.12", features = ["json"] }

View File

@@ -8,8 +8,11 @@ description = "Core traits, models, builder, and executor for the WFE workflow e
[features]
default = []
test-support = []
otel = ["tracing-opentelemetry", "opentelemetry"]
[dependencies]
tracing-opentelemetry = { workspace = true, optional = true }
opentelemetry = { workspace = true, optional = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

View File

@@ -59,6 +59,15 @@ impl WorkflowExecutor {
/// 6. Check for completion
/// 7. Persist
/// 8. Release lock
#[tracing::instrument(
name = "workflow.execute",
skip(self, definition, step_registry),
fields(
workflow.id = %workflow_id,
workflow.definition_id,
workflow.status,
)
)]
pub async fn execute(
    &self,
    workflow_id: &str,
@@ -96,6 +105,8 @@ impl WorkflowExecutor {
.get_workflow_instance(workflow_id)
.await?;
tracing::Span::current().record("workflow.definition_id", workflow.workflow_definition_id.as_str());
if workflow.status != WorkflowStatus::Runnable {
debug!(workflow_id, status = ?workflow.status, "Workflow not runnable, skipping");
return Ok(());
@@ -170,6 +181,15 @@ impl WorkflowExecutor {
// Now we can mutate again since context is dropped.
match step_result {
Ok(result) => {
let step_status = if result.sleep_for.is_some() {
"sleeping"
} else if result.event_name.is_some() {
"waiting_for_event"
} else {
"completed"
};
tracing::Span::current().record("step.status", step_status);
info!(
workflow_id,
step_id,
@@ -202,6 +222,7 @@ impl WorkflowExecutor {
Err(e) => {
// f. Handle error.
let error_msg = e.to_string();
tracing::Span::current().record("step.status", "failed");
warn!(workflow_id, step_id, error = %error_msg, "Step execution failed");
let pointer_id = workflow.execution_pointers[idx].id.clone();
@@ -253,6 +274,8 @@ impl WorkflowExecutor {
workflow.complete_time = Some(Utc::now());
}
tracing::Span::current().record("workflow.status", tracing::field::debug(&workflow.status));
// Determine next_execution.
let has_active = workflow.execution_pointers.iter().any(|p| p.active);
if has_active {

View File

@@ -5,6 +5,17 @@ edition.workspace = true
license.workspace = true
description = "WFE workflow engine - umbrella crate"
[features]
default = []
otel = [
"wfe-core/otel",
"opentelemetry",
"opentelemetry_sdk",
"opentelemetry-otlp",
"tracing-opentelemetry",
"tracing-subscriber/registry",
]
[dependencies]
wfe-core = { workspace = true }
tokio = { workspace = true }
@@ -19,6 +30,12 @@ tokio-util = "0.7"
tracing-subscriber = { workspace = true }
# OTel (optional, behind "otel" feature)
tracing-opentelemetry = { workspace = true, optional = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
opentelemetry-otlp = { workspace = true, optional = true }
[dev-dependencies]
wfe-core = { workspace = true, features = ["test-support"] }
wfe-sqlite = { workspace = true }

View File

@@ -2,7 +2,7 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
-use tracing::{debug, error, warn};
+use tracing::{debug, error, info, warn};
use wfe_core::executor::{StepRegistry, WorkflowExecutor};
use wfe_core::models::{
@@ -202,6 +202,15 @@ impl WorkflowHost {
}
/// Start a new workflow instance.
#[tracing::instrument(
name = "workflow.start",
skip(self, data),
fields(
workflow.definition_id = %definition_id,
workflow.version = version,
workflow.id,
)
)]
pub async fn start_workflow(
&self,
definition_id: &str,
@@ -226,6 +235,9 @@ impl WorkflowHost {
// Persist the instance.
let id = self.persistence.create_new_workflow(&instance).await?;
instance.id = id.clone();
tracing::Span::current().record("workflow.id", id.as_str());
info!(workflow_id = %id, "Workflow instance created");
// Queue for execution.
self.queue_provider
@@ -236,6 +248,14 @@ impl WorkflowHost {
}
/// Publish an event that may resume waiting workflows.
#[tracing::instrument(
name = "event.publish",
skip(self, data),
fields(
event.name = %event_name,
event.key = %event_key,
)
)]
pub async fn publish_event(
&self,
event_name: &str,
@@ -312,6 +332,11 @@ impl WorkflowHost {
}
/// Process an event: find matching subscriptions, set event_data on pointers, re-queue workflows.
#[tracing::instrument(
name = "event.process",
skip(persistence, lock_provider, queue),
fields(event.id = %event_id)
)]
async fn process_event(
persistence: &Arc<dyn PersistenceProvider>,
lock_provider: &Arc<dyn DistributedLockProvider>,