diff --git a/wfe-core/src/executor/workflow_executor.rs b/wfe-core/src/executor/workflow_executor.rs index 66e07fc..f0f47a4 100644 --- a/wfe-core/src/executor/workflow_executor.rs +++ b/wfe-core/src/executor/workflow_executor.rs @@ -23,6 +23,7 @@ pub struct WorkflowExecutor { pub queue_provider: Arc, pub lifecycle: Option>, pub search: Option>, + pub log_sink: Option>, } impl WorkflowExecutor { @@ -37,9 +38,15 @@ impl WorkflowExecutor { queue_provider, lifecycle: None, search: None, + log_sink: None, } } + pub fn with_log_sink(mut self, sink: Arc) -> Self { + self.log_sink = Some(sink); + self + } + pub fn with_lifecycle(mut self, lifecycle: Arc) -> Self { self.lifecycle = Some(lifecycle); self @@ -50,6 +57,15 @@ impl WorkflowExecutor { self } + /// Publish a lifecycle event if a publisher is configured. + async fn publish_lifecycle(&self, event: crate::models::LifecycleEvent) { + if let Some(ref publisher) = self.lifecycle { + if let Err(e) = publisher.publish(event).await { + warn!(error = %e, "failed to publish lifecycle event"); + } + } + } + /// Execute a single workflow instance. /// /// 1. Acquire lock @@ -202,6 +218,16 @@ impl WorkflowExecutor { } workflow.execution_pointers[idx].status = PointerStatus::Running; + self.publish_lifecycle(crate::models::LifecycleEvent::new( + &workflow.id, + &workflow.workflow_definition_id, + workflow.version, + crate::models::LifecycleEventType::StepStarted { + step_id, + step_name: step.name.clone(), + }, + )).await; + // c. Build StepExecutionContext (borrows workflow immutably). let cancellation_token = tokio_util::sync::CancellationToken::new(); let context = StepExecutionContext { @@ -212,6 +238,7 @@ impl WorkflowExecutor { workflow: &workflow, cancellation_token, host_context, + log_sink: self.log_sink.as_deref(), }; // d. Call step.run(context). @@ -238,6 +265,17 @@ impl WorkflowExecutor { has_branches = result.branch_values.is_some(), "Step completed" ); + + self.publish_lifecycle(crate::models::LifecycleEvent::new( + &workflow.id, + &workflow.workflow_definition_id, + workflow.version, + crate::models::LifecycleEventType::StepCompleted { + step_id, + step_name: step.name.clone(), + }, + )).await; + // e. Process the ExecutionResult. // Extract workflow_id before mutable borrow. let wf_id = workflow.id.clone(); @@ -272,6 +310,15 @@ impl WorkflowExecutor { tracing::Span::current().record("step.status", "failed"); warn!(workflow_id, step_id, error = %error_msg, "Step execution failed"); + self.publish_lifecycle(crate::models::LifecycleEvent::new( + &workflow.id, + &workflow.workflow_definition_id, + workflow.version, + crate::models::LifecycleEventType::Error { + message: error_msg.clone(), + }, + )).await; + let pointer_id = workflow.execution_pointers[idx].id.clone(); execution_errors.push(ExecutionError::new( workflow_id, @@ -293,6 +340,12 @@ impl WorkflowExecutor { workflow.status = new_status; if new_status == WorkflowStatus::Terminated { workflow.complete_time = Some(Utc::now()); + self.publish_lifecycle(crate::models::LifecycleEvent::new( + &workflow.id, + &workflow.workflow_definition_id, + workflow.version, + crate::models::LifecycleEventType::Terminated, + )).await; } } @@ -321,6 +374,13 @@ impl WorkflowExecutor { workflow.status = WorkflowStatus::Complete; workflow.complete_time = Some(Utc::now()); + self.publish_lifecycle(crate::models::LifecycleEvent::new( + &workflow.id, + &workflow.workflow_definition_id, + workflow.version, + crate::models::LifecycleEventType::Completed, + )).await; + // Publish completion event for SubWorkflow parents. let completion_event = Event::new( "wfe.workflow.completed", diff --git a/wfe-core/src/primitives/mod.rs b/wfe-core/src/primitives/mod.rs index 32262e4..af9ebac 100644 --- a/wfe-core/src/primitives/mod.rs +++ b/wfe-core/src/primitives/mod.rs @@ -45,6 +45,7 @@ mod test_helpers { workflow, cancellation_token: CancellationToken::new(), host_context: None, + log_sink: None, } } diff --git a/wfe-core/src/primitives/sub_workflow.rs b/wfe-core/src/primitives/sub_workflow.rs index b3de2a1..7aa755e 100644 --- a/wfe-core/src/primitives/sub_workflow.rs +++ b/wfe-core/src/primitives/sub_workflow.rs @@ -212,6 +212,7 @@ mod tests { workflow, cancellation_token: tokio_util::sync::CancellationToken::new(), host_context: Some(host), + log_sink: None, } } diff --git a/wfe-core/src/traits/log_sink.rs b/wfe-core/src/traits/log_sink.rs new file mode 100644 index 0000000..3394240 --- /dev/null +++ b/wfe-core/src/traits/log_sink.rs @@ -0,0 +1,59 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; + +/// A chunk of log output from a step execution. +#[derive(Debug, Clone)] +pub struct LogChunk { + pub workflow_id: String, + pub definition_id: String, + pub step_id: usize, + pub step_name: String, + pub stream: LogStreamType, + pub data: Vec, + pub timestamp: DateTime, +} + +/// Whether a log chunk is from stdout or stderr. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum LogStreamType { + Stdout, + Stderr, +} + +/// Receives log chunks as they're produced during step execution. +/// +/// Implementations can broadcast to live subscribers, persist to a database, +/// index for search, or any combination. The trait is designed to be called +/// from within step executors (shell, containerd, etc.) as lines are produced. +#[async_trait] +pub trait LogSink: Send + Sync { + async fn write_chunk(&self, chunk: LogChunk); +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn log_stream_type_equality() { + assert_eq!(LogStreamType::Stdout, LogStreamType::Stdout); + assert_ne!(LogStreamType::Stdout, LogStreamType::Stderr); + } + + #[test] + fn log_chunk_clone() { + let chunk = LogChunk { + workflow_id: "wf-1".to_string(), + definition_id: "def-1".to_string(), + step_id: 0, + step_name: "build".to_string(), + stream: LogStreamType::Stdout, + data: b"hello\n".to_vec(), + timestamp: Utc::now(), + }; + let cloned = chunk.clone(); + assert_eq!(cloned.workflow_id, "wf-1"); + assert_eq!(cloned.stream, LogStreamType::Stdout); + assert_eq!(cloned.data, b"hello\n"); + } +} diff --git a/wfe-core/src/traits/middleware.rs b/wfe-core/src/traits/middleware.rs index dd6b9a7..0ea1863 100644 --- a/wfe-core/src/traits/middleware.rs +++ b/wfe-core/src/traits/middleware.rs @@ -69,6 +69,7 @@ mod tests { workflow: &instance, cancellation_token: tokio_util::sync::CancellationToken::new(), host_context: None, + log_sink: None, }; mw.pre_step(&ctx).await.unwrap(); } @@ -88,6 +89,7 @@ mod tests { workflow: &instance, cancellation_token: tokio_util::sync::CancellationToken::new(), host_context: None, + log_sink: None, }; let result = ExecutionResult::next(); mw.post_step(&ctx, &result).await.unwrap(); diff --git a/wfe-core/src/traits/mod.rs b/wfe-core/src/traits/mod.rs index 1a5d79f..46db3df 100644 --- a/wfe-core/src/traits/mod.rs +++ b/wfe-core/src/traits/mod.rs @@ -1,5 +1,6 @@ pub mod lifecycle; pub mod lock; +pub mod log_sink; pub mod middleware; pub mod persistence; pub mod queue; @@ -9,6 +10,7 @@ pub mod step; pub use lifecycle::LifecyclePublisher; pub use lock::DistributedLockProvider; +pub use log_sink::{LogChunk, LogSink, LogStreamType}; pub use middleware::{StepMiddleware, WorkflowMiddleware}; pub use persistence::{ EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository, diff --git a/wfe-core/src/traits/step.rs b/wfe-core/src/traits/step.rs index d6967e3..213a03c 100644 --- a/wfe-core/src/traits/step.rs +++ b/wfe-core/src/traits/step.rs @@ -38,6 +38,8 @@ pub struct StepExecutionContext<'a> { pub cancellation_token: tokio_util::sync::CancellationToken, /// Host context for starting child workflows. None if not available. pub host_context: Option<&'a dyn HostContext>, + /// Log sink for streaming step output. None if not configured. + pub log_sink: Option<&'a dyn super::LogSink>, } // Manual Debug impl since dyn HostContext is not Debug. @@ -50,6 +52,7 @@ impl<'a> std::fmt::Debug for StepExecutionContext<'a> { .field("step", &self.step) .field("workflow", &self.workflow) .field("host_context", &self.host_context.is_some()) + .field("log_sink", &self.log_sink.is_some()) .finish() } } diff --git a/wfe/src/host.rs b/wfe/src/host.rs index 1dbc8b2..16be00e 100644 --- a/wfe/src/host.rs +++ b/wfe/src/host.rs @@ -8,8 +8,8 @@ use tracing::{debug, error, info, warn}; use wfe_core::executor::{StepRegistry, WorkflowExecutor}; use wfe_core::models::{ - Event, ExecutionPointer, PointerStatus, QueueType, WorkflowDefinition, WorkflowInstance, - WorkflowStatus, + Event, ExecutionPointer, LifecycleEvent, LifecycleEventType, PointerStatus, QueueType, + WorkflowDefinition, WorkflowInstance, WorkflowStatus, }; use wfe_core::traits::{ DistributedLockProvider, HostContext, LifecyclePublisher, PersistenceProvider, QueueProvider, @@ -308,6 +308,18 @@ impl WorkflowHost { .queue_work(&id, QueueType::Workflow) .await?; + // Publish lifecycle event. + if let Some(ref publisher) = self.lifecycle { + let _ = publisher + .publish(LifecycleEvent::new( + &id, + definition_id, + version, + LifecycleEventType::Started, + )) + .await; + } + Ok(id) } @@ -345,6 +357,16 @@ impl WorkflowHost { } instance.status = WorkflowStatus::Suspended; self.persistence.persist_workflow(&instance).await?; + if let Some(ref publisher) = self.lifecycle { + let _ = publisher + .publish(LifecycleEvent::new( + id, + &instance.workflow_definition_id, + instance.version, + LifecycleEventType::Suspended, + )) + .await; + } Ok(true) } @@ -362,6 +384,16 @@ impl WorkflowHost { .queue_work(id, QueueType::Workflow) .await?; + if let Some(ref publisher) = self.lifecycle { + let _ = publisher + .publish(LifecycleEvent::new( + id, + &instance.workflow_definition_id, + instance.version, + LifecycleEventType::Resumed, + )) + .await; + } Ok(true) } @@ -376,6 +408,16 @@ impl WorkflowHost { instance.status = WorkflowStatus::Terminated; instance.complete_time = Some(chrono::Utc::now()); self.persistence.persist_workflow(&instance).await?; + if let Some(ref publisher) = self.lifecycle { + let _ = publisher + .publish(LifecycleEvent::new( + id, + &instance.workflow_definition_id, + instance.version, + LifecycleEventType::Terminated, + )) + .await; + } Ok(true) } diff --git a/wfe/src/host_builder.rs b/wfe/src/host_builder.rs index 18f6ea7..da1a566 100644 --- a/wfe/src/host_builder.rs +++ b/wfe/src/host_builder.rs @@ -21,6 +21,7 @@ pub struct WorkflowHostBuilder { queue_provider: Option>, lifecycle: Option>, search: Option>, + log_sink: Option>, } impl WorkflowHostBuilder { @@ -31,6 +32,7 @@ impl WorkflowHostBuilder { queue_provider: None, lifecycle: None, search: None, + log_sink: None, } } @@ -64,6 +66,12 @@ impl WorkflowHostBuilder { self } + /// Set an optional log sink for real-time step output streaming. + pub fn use_log_sink(mut self, sink: Arc) -> Self { + self.log_sink = Some(sink); + self + } + /// Build the `WorkflowHost`. /// /// Returns an error if persistence, lock_provider, or queue_provider have not been set. @@ -90,6 +98,9 @@ impl WorkflowHostBuilder { if let Some(ref search) = self.search { executor = executor.with_search(Arc::clone(search)); } + if let Some(ref log_sink) = self.log_sink { + executor = executor.with_log_sink(Arc::clone(log_sink)); + } Ok(WorkflowHost { persistence,