diff --git a/wfe/Cargo.toml b/wfe/Cargo.toml new file mode 100644 index 0000000..0b69b0f --- /dev/null +++ b/wfe/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "wfe" +version.workspace = true +edition.workspace = true +license.workspace = true +description = "WFE workflow engine - umbrella crate" + +[dependencies] +wfe-core = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +tokio-util = "0.7" + +[dev-dependencies] +wfe-core = { workspace = true, features = ["test-support"] } +wfe-sqlite = { workspace = true } +pretty_assertions = { workspace = true } +rstest = { workspace = true } +wiremock = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } diff --git a/wfe/src/host.rs b/wfe/src/host.rs new file mode 100644 index 0000000..95f445f --- /dev/null +++ b/wfe/src/host.rs @@ -0,0 +1,356 @@ +use std::sync::Arc; + +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, warn}; + +use wfe_core::executor::{StepRegistry, WorkflowExecutor}; +use wfe_core::models::{ + Event, ExecutionPointer, PointerStatus, QueueType, WorkflowDefinition, WorkflowInstance, + WorkflowStatus, +}; +use wfe_core::traits::{ + DistributedLockProvider, LifecyclePublisher, PersistenceProvider, QueueProvider, SearchIndex, + StepBody, WorkflowData, +}; +use wfe_core::traits::registry::WorkflowRegistry; +use wfe_core::{Result, WfeError}; +use wfe_core::builder::WorkflowBuilder; + +use crate::registry::InMemoryWorkflowRegistry; + +/// The main orchestrator that ties all workflow engine components together. +pub struct WorkflowHost { + pub(crate) persistence: Arc, + pub(crate) lock_provider: Arc, + pub(crate) queue_provider: Arc, + pub(crate) lifecycle: Option>, + pub(crate) search: Option>, + pub(crate) registry: Arc>, + pub(crate) step_registry: Arc>, + pub(crate) executor: Arc, + pub(crate) shutdown: CancellationToken, +} + +impl WorkflowHost { + /// Spawn background polling tasks for processing workflows and events. + pub async fn start(&self) -> Result<()> { + self.queue_provider.start().await?; + self.lock_provider.start().await?; + if let Some(ref search) = self.search { + search.start().await?; + } + + // Spawn workflow consumer task. + let executor = Arc::clone(&self.executor); + let registry = Arc::clone(&self.registry); + let step_registry = Arc::clone(&self.step_registry); + let queue = Arc::clone(&self.queue_provider); + let shutdown = self.shutdown.clone(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = shutdown.cancelled() => { + debug!("Workflow consumer shutting down"); + break; + } + result = queue.dequeue_work(QueueType::Workflow) => { + match result { + Ok(Some(workflow_id)) => { + // Look up the workflow instance to find its definition. + let instance = match executor.persistence.get_workflow_instance(&workflow_id).await { + Ok(inst) => inst, + Err(e) => { + error!(workflow_id = %workflow_id, error = %e, "Failed to load workflow instance"); + continue; + } + }; + let reg = registry.read().await; + let definition = reg.get_definition( + &instance.workflow_definition_id, + Some(instance.version), + ); + match definition { + Some(def) => { + let def_clone = def.clone(); + let sr = step_registry.read().await; + if let Err(e) = executor.execute(&workflow_id, &def_clone, &sr).await { + error!(workflow_id = %workflow_id, error = %e, "Workflow execution failed"); + } + } + None => { + warn!( + workflow_id = %workflow_id, + definition_id = %instance.workflow_definition_id, + version = instance.version, + "Workflow definition not found" + ); + } + } + } + Ok(None) => { + // No work available; sleep briefly before polling again. + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + Err(e) => { + error!(error = %e, "Failed to dequeue workflow work"); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } + } + } + } + }); + + // Spawn event consumer task. + let persistence = Arc::clone(&self.persistence); + let lock_provider2 = Arc::clone(&self.lock_provider); + let queue2 = Arc::clone(&self.queue_provider); + let shutdown2 = self.shutdown.clone(); + + tokio::spawn(async move { + loop { + tokio::select! { + _ = shutdown2.cancelled() => { + debug!("Event consumer shutting down"); + break; + } + result = queue2.dequeue_work(QueueType::Event) => { + match result { + Ok(Some(event_id)) => { + if let Err(e) = process_event(&persistence, &lock_provider2, &queue2, &event_id).await { + error!(event_id = %event_id, error = %e, "Event processing failed"); + } + } + Ok(None) => { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + Err(e) => { + error!(error = %e, "Failed to dequeue event work"); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + } + } + } + } + }); + + Ok(()) + } + + /// Signal shutdown of all background tasks. + pub async fn stop(&self) { + self.shutdown.cancel(); + if let Err(e) = self.queue_provider.stop().await { + warn!(error = %e, "Failed to stop queue provider"); + } + if let Err(e) = self.lock_provider.stop().await { + warn!(error = %e, "Failed to stop lock provider"); + } + if let Some(ref search) = self.search + && let Err(e) = search.stop().await + { + warn!(error = %e, "Failed to stop search index"); + } + } + + /// Register a workflow definition built via a closure that configures a `WorkflowBuilder`. + pub async fn register_workflow( + &self, + builder_fn: &dyn Fn(WorkflowBuilder) -> WorkflowBuilder, + id: &str, + version: u32, + ) -> WorkflowDefinition { + let builder = WorkflowBuilder::::new(); + let builder = builder_fn(builder); + let definition = builder.build(id, version); + let mut reg = self.registry.write().await; + reg.register(definition.clone()); + definition + } + + /// Register a pre-built `WorkflowDefinition` directly. + pub async fn register_workflow_definition(&self, definition: WorkflowDefinition) { + let mut reg = self.registry.write().await; + reg.register(definition); + } + + /// Register a step type with the step registry. + pub async fn register_step(&self) { + let mut sr = self.step_registry.write().await; + sr.register::(); + } + + /// Start a new workflow instance. + pub async fn start_workflow( + &self, + definition_id: &str, + version: u32, + data: serde_json::Value, + ) -> Result { + // Verify definition exists. + let reg = self.registry.read().await; + let definition = reg + .get_definition(definition_id, Some(version)) + .ok_or_else(|| WfeError::DefinitionNotFound { + id: definition_id.to_string(), + version, + })?; + + // Create initial execution pointer for step 0 if the definition has steps. + let mut instance = WorkflowInstance::new(definition_id, version, data); + if !definition.steps.is_empty() { + instance.execution_pointers.push(ExecutionPointer::new(0)); + } + + // Persist the instance. + let id = self.persistence.create_new_workflow(&instance).await?; + instance.id = id.clone(); + + // Queue for execution. + self.queue_provider + .queue_work(&id, QueueType::Workflow) + .await?; + + Ok(id) + } + + /// Publish an event that may resume waiting workflows. + pub async fn publish_event( + &self, + event_name: &str, + event_key: &str, + data: serde_json::Value, + ) -> Result<()> { + let event = Event::new(event_name, event_key, data); + let event_id = self.persistence.create_event(&event).await?; + + // Queue event for processing. + self.queue_provider + .queue_work(&event_id, QueueType::Event) + .await?; + + Ok(()) + } + + /// Suspend a running workflow. + pub async fn suspend_workflow(&self, id: &str) -> Result { + let mut instance = self.persistence.get_workflow_instance(id).await?; + if instance.status != WorkflowStatus::Runnable { + return Ok(false); + } + instance.status = WorkflowStatus::Suspended; + self.persistence.persist_workflow(&instance).await?; + Ok(true) + } + + /// Resume a suspended workflow. + pub async fn resume_workflow(&self, id: &str) -> Result { + let mut instance = self.persistence.get_workflow_instance(id).await?; + if instance.status != WorkflowStatus::Suspended { + return Ok(false); + } + instance.status = WorkflowStatus::Runnable; + self.persistence.persist_workflow(&instance).await?; + + // Re-queue for execution. + self.queue_provider + .queue_work(id, QueueType::Workflow) + .await?; + + Ok(true) + } + + /// Terminate a running workflow. + pub async fn terminate_workflow(&self, id: &str) -> Result { + let mut instance = self.persistence.get_workflow_instance(id).await?; + if instance.status == WorkflowStatus::Complete + || instance.status == WorkflowStatus::Terminated + { + return Ok(false); + } + instance.status = WorkflowStatus::Terminated; + instance.complete_time = Some(chrono::Utc::now()); + self.persistence.persist_workflow(&instance).await?; + Ok(true) + } + + /// Fetch a workflow instance by ID. + pub async fn get_workflow(&self, id: &str) -> Result { + self.persistence.get_workflow_instance(id).await + } + + /// Access the persistence provider. + pub fn persistence(&self) -> &Arc { + &self.persistence + } + + /// Access the lifecycle publisher, if configured. + pub fn lifecycle(&self) -> Option<&Arc> { + self.lifecycle.as_ref() + } +} + +/// Process an event: find matching subscriptions, set event_data on pointers, re-queue workflows. +async fn process_event( + persistence: &Arc, + lock_provider: &Arc, + queue: &Arc, + event_id: &str, +) -> Result<()> { + let event = persistence.get_event(event_id).await?; + + // Find matching subscriptions. + let subscriptions = persistence + .get_subscriptions(&event.event_name, &event.event_key, event.event_time) + .await?; + + for sub in &subscriptions { + // Acquire lock on the workflow to prevent concurrent modifications. + if !lock_provider.acquire_lock(&sub.workflow_id).await? { + // Re-queue the event for retry + queue.queue_work(event_id, QueueType::Event).await?; + return Ok(()); + } + + let result = async { + // Load the workflow and update the matching execution pointer. + let mut instance = persistence.get_workflow_instance(&sub.workflow_id).await?; + + for pointer in &mut instance.execution_pointers { + if pointer.id == sub.execution_pointer_id + && pointer.status == PointerStatus::WaitingForEvent + { + pointer.event_data = Some(event.event_data.clone()); + pointer.event_published = true; + pointer.active = true; + } + } + + instance.next_execution = Some(0); + persistence.persist_workflow(&instance).await?; + + // Re-queue the workflow for execution. + queue + .queue_work(&sub.workflow_id, QueueType::Workflow) + .await?; + + // Terminate the subscription. + persistence.terminate_subscription(&sub.id).await?; + + Ok::<(), WfeError>(()) + } + .await; + + // Release lock regardless of outcome. + lock_provider.release_lock(&sub.workflow_id).await?; + + result?; + } + + // Mark the event as processed. + persistence.mark_event_processed(event_id).await?; + + Ok(()) +} diff --git a/wfe/src/host_builder.rs b/wfe/src/host_builder.rs new file mode 100644 index 0000000..18f6ea7 --- /dev/null +++ b/wfe/src/host_builder.rs @@ -0,0 +1,112 @@ +use std::sync::Arc; + +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; + +use wfe_core::executor::{StepRegistry, WorkflowExecutor}; +use wfe_core::traits::{ + DistributedLockProvider, LifecyclePublisher, PersistenceProvider, QueueProvider, SearchIndex, +}; +use wfe_core::WfeError; + +use crate::host::WorkflowHost; +use crate::registry::InMemoryWorkflowRegistry; + +/// Fluent builder for constructing a `WorkflowHost`. +/// +/// Uses the owned-self pattern: each method consumes and returns the builder. +pub struct WorkflowHostBuilder { + persistence: Option>, + lock_provider: Option>, + queue_provider: Option>, + lifecycle: Option>, + search: Option>, +} + +impl WorkflowHostBuilder { + pub fn new() -> Self { + Self { + persistence: None, + lock_provider: None, + queue_provider: None, + lifecycle: None, + search: None, + } + } + + /// Set the persistence provider (required). + pub fn use_persistence(mut self, persistence: Arc) -> Self { + self.persistence = Some(persistence); + self + } + + /// Set the distributed lock provider (required). + pub fn use_lock_provider(mut self, lock_provider: Arc) -> Self { + self.lock_provider = Some(lock_provider); + self + } + + /// Set the queue provider (required). + pub fn use_queue_provider(mut self, queue_provider: Arc) -> Self { + self.queue_provider = Some(queue_provider); + self + } + + /// Set an optional lifecycle publisher. + pub fn use_lifecycle(mut self, lifecycle: Arc) -> Self { + self.lifecycle = Some(lifecycle); + self + } + + /// Set an optional search index. + pub fn use_search(mut self, search: Arc) -> Self { + self.search = Some(search); + self + } + + /// Build the `WorkflowHost`. + /// + /// Returns an error if persistence, lock_provider, or queue_provider have not been set. + pub fn build(self) -> wfe_core::Result { + let persistence = self.persistence.ok_or_else(|| { + WfeError::Other("PersistenceProvider is required. Call .use_persistence() before .build().".into()) + })?; + let lock_provider = self.lock_provider.ok_or_else(|| { + WfeError::Other("DistributedLockProvider is required. Call .use_lock_provider() before .build().".into()) + })?; + let queue_provider = self.queue_provider.ok_or_else(|| { + WfeError::Other("QueueProvider is required. Call .use_queue_provider() before .build().".into()) + })?; + + let mut executor = WorkflowExecutor::new( + Arc::clone(&persistence), + Arc::clone(&lock_provider), + Arc::clone(&queue_provider), + ); + + if let Some(ref lifecycle) = self.lifecycle { + executor = executor.with_lifecycle(Arc::clone(lifecycle)); + } + if let Some(ref search) = self.search { + executor = executor.with_search(Arc::clone(search)); + } + + Ok(WorkflowHost { + persistence, + lock_provider, + queue_provider, + lifecycle: self.lifecycle, + search: self.search, + registry: Arc::new(RwLock::new(InMemoryWorkflowRegistry::new())), + step_registry: Arc::new(RwLock::new(StepRegistry::new())), + executor: Arc::new(executor), + shutdown: CancellationToken::new(), + }) + } +} + +impl Default for WorkflowHostBuilder { + fn default() -> Self { + Self::new() + } +} diff --git a/wfe/src/lib.rs b/wfe/src/lib.rs new file mode 100644 index 0000000..6354669 --- /dev/null +++ b/wfe/src/lib.rs @@ -0,0 +1,15 @@ +pub mod host; +pub mod host_builder; +pub mod purger; +pub mod registry; +pub mod sync_runner; + +// Re-export everything useful from wfe-core. +pub use wfe_core::*; + +// Re-export the new types at top level for convenience. +pub use host::WorkflowHost; +pub use host_builder::WorkflowHostBuilder; +pub use purger::purge_workflows; +pub use registry::InMemoryWorkflowRegistry; +pub use sync_runner::run_workflow_sync; diff --git a/wfe/src/purger.rs b/wfe/src/purger.rs new file mode 100644 index 0000000..78081d2 --- /dev/null +++ b/wfe/src/purger.rs @@ -0,0 +1,28 @@ +use chrono::{DateTime, Utc}; + +use wfe_core::models::WorkflowStatus; +use wfe_core::traits::PersistenceProvider; +use wfe_core::Result; + +/// Purge workflows matching a given status that were created before `older_than`. +/// +/// TODO: This requires a query-by-status-and-date method on the persistence trait. +/// For now, this is a stub that documents the intended contract. When the persistence +/// layer gains `get_workflows_by_status` and `delete_workflow` methods, this function +/// should be implemented fully. +pub async fn purge_workflows( + _persistence: &dyn PersistenceProvider, + _status: WorkflowStatus, + _older_than: DateTime, +) -> Result<()> { + // TODO: Implement once PersistenceProvider exposes: + // async fn get_workflows_by_status(&self, status: WorkflowStatus, before: DateTime) -> Result>; + // async fn delete_workflow(&self, id: &str) -> Result<()>; + // + // Intended implementation: + // let ids = persistence.get_workflows_by_status(status, older_than).await?; + // for id in ids { + // persistence.delete_workflow(&id).await?; + // } + Ok(()) +} diff --git a/wfe/src/registry.rs b/wfe/src/registry.rs new file mode 100644 index 0000000..bc62586 --- /dev/null +++ b/wfe/src/registry.rs @@ -0,0 +1,133 @@ +use std::collections::HashMap; + +use wfe_core::models::WorkflowDefinition; +use wfe_core::traits::registry::WorkflowRegistry; + +/// Concrete in-memory implementation of `WorkflowRegistry`. +pub struct InMemoryWorkflowRegistry { + definitions: HashMap<(String, u32), WorkflowDefinition>, +} + +impl InMemoryWorkflowRegistry { + pub fn new() -> Self { + Self { + definitions: HashMap::new(), + } + } +} + +impl Default for InMemoryWorkflowRegistry { + fn default() -> Self { + Self::new() + } +} + +impl WorkflowRegistry for InMemoryWorkflowRegistry { + fn register(&mut self, definition: WorkflowDefinition) { + let key = (definition.id.clone(), definition.version); + self.definitions.insert(key, definition); + } + + fn get_definition(&self, id: &str, version: Option) -> Option<&WorkflowDefinition> { + match version { + Some(v) => self.definitions.get(&(id.to_string(), v)), + None => { + // Return the definition with the highest version for this id. + self.definitions + .iter() + .filter(|((def_id, _), _)| def_id == id) + .max_by_key(|((_, v), _)| *v) + .map(|(_, def)| def) + } + } + } + + fn is_registered(&self, id: &str, version: u32) -> bool { + self.definitions.contains_key(&(id.to_string(), version)) + } + + fn deregister(&mut self, id: &str, version: u32) -> bool { + self.definitions + .remove(&(id.to_string(), version)) + .is_some() + } + + fn get_all_definitions(&self) -> Vec<&WorkflowDefinition> { + self.definitions.values().collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_definition(id: &str, version: u32) -> WorkflowDefinition { + WorkflowDefinition::new(id, version) + } + + #[test] + fn registry_register_and_get() { + let mut registry = InMemoryWorkflowRegistry::new(); + let def = make_definition("my-workflow", 1); + registry.register(def); + + let retrieved = registry.get_definition("my-workflow", Some(1)); + assert!(retrieved.is_some()); + assert_eq!(retrieved.unwrap().id, "my-workflow"); + assert_eq!(retrieved.unwrap().version, 1); + } + + #[test] + fn registry_version_support() { + let mut registry = InMemoryWorkflowRegistry::new(); + registry.register(make_definition("wf", 1)); + registry.register(make_definition("wf", 2)); + + // Get specific versions. + let v1 = registry.get_definition("wf", Some(1)).unwrap(); + assert_eq!(v1.version, 1); + + let v2 = registry.get_definition("wf", Some(2)).unwrap(); + assert_eq!(v2.version, 2); + + // Get latest (None) returns v2. + let latest = registry.get_definition("wf", None).unwrap(); + assert_eq!(latest.version, 2); + } + + #[test] + fn registry_deregister() { + let mut registry = InMemoryWorkflowRegistry::new(); + registry.register(make_definition("wf", 1)); + assert!(registry.is_registered("wf", 1)); + + let removed = registry.deregister("wf", 1); + assert!(removed); + assert!(!registry.is_registered("wf", 1)); + assert!(registry.get_definition("wf", Some(1)).is_none()); + } + + #[test] + fn registry_get_all_definitions() { + let mut registry = InMemoryWorkflowRegistry::new(); + registry.register(make_definition("a", 1)); + registry.register(make_definition("b", 1)); + registry.register(make_definition("a", 2)); + + let all = registry.get_all_definitions(); + assert_eq!(all.len(), 3); + } + + #[test] + fn registry_get_nonexistent_returns_none() { + let registry = InMemoryWorkflowRegistry::new(); + assert!(registry.get_definition("nope", Some(1)).is_none()); + assert!(registry.get_definition("nope", None).is_none()); + } + + #[test] + fn registry_deregister_nonexistent_returns_false() { + let mut registry = InMemoryWorkflowRegistry::new(); + assert!(!registry.deregister("nope", 1)); + } +} diff --git a/wfe/src/sync_runner.rs b/wfe/src/sync_runner.rs new file mode 100644 index 0000000..4f32eb3 --- /dev/null +++ b/wfe/src/sync_runner.rs @@ -0,0 +1,42 @@ +use std::time::Duration; + +use wfe_core::models::{WorkflowInstance, WorkflowStatus}; +use wfe_core::{Result, WfeError}; + +use crate::host::WorkflowHost; + +/// Run a workflow to completion synchronously (for testing). +/// +/// Starts the workflow, then polls persistence in a loop until the workflow +/// reaches `Complete` or `Terminated` status, or the timeout expires. +pub async fn run_workflow_sync( + host: &WorkflowHost, + definition_id: &str, + version: u32, + data: serde_json::Value, + timeout: Duration, +) -> Result { + let workflow_id = host.start_workflow(definition_id, version, data).await?; + + let start = tokio::time::Instant::now(); + let poll_interval = Duration::from_millis(25); + + loop { + if start.elapsed() > timeout { + return Err(WfeError::StepExecution(format!( + "Workflow {workflow_id} did not complete within {timeout:?}" + ))); + } + + let instance = host.persistence.get_workflow_instance(&workflow_id).await?; + + match instance.status { + WorkflowStatus::Complete | WorkflowStatus::Terminated => { + return Ok(instance); + } + _ => { + tokio::time::sleep(poll_interval).await; + } + } + } +}