feat(wfe): add WorkflowHost, registry, sync runner, and purger

WorkflowHost orchestrates the engine: background workflow and event
consumers, start/stop lifecycle, workflow CRUD, event publishing.
WorkflowHostBuilder (owned-self, returns Result).

InMemoryWorkflowRegistry with version support. run_workflow_sync helper
for driving workflows to completion in tests. purge_workflows stub
(pending persistence-layer query support).

Event consumer acquires distributed locks before modifying workflows
to prevent lost updates from concurrent event processing.
This commit is contained in:
2026-03-25 20:11:06 +00:00
parent a61e68d2a9
commit 8b946e86e3
7 changed files with 712 additions and 0 deletions

26
wfe/Cargo.toml Normal file
View File

@@ -0,0 +1,26 @@
[package]
name = "wfe"
version.workspace = true
edition.workspace = true
license.workspace = true
description = "WFE workflow engine - umbrella crate"

[dependencies]
# Engine core: models, traits, executor, builder.
wfe-core = { workspace = true }
# Async runtime for the host's background consumer tasks.
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
uuid = { workspace = true }
chrono = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
# Provides CancellationToken, used for host shutdown signalling.
tokio-util = "0.7"

[dev-dependencies]
# "test-support" feature — presumably in-memory test providers; confirm in wfe-core.
wfe-core = { workspace = true, features = ["test-support"] }
wfe-sqlite = { workspace = true }
pretty_assertions = { workspace = true }
rstest = { workspace = true }
wiremock = { workspace = true }
# "test-util" enables tokio::time pause/advance in tests.
tokio = { workspace = true, features = ["test-util"] }

356
wfe/src/host.rs Normal file
View File

@@ -0,0 +1,356 @@
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, warn};
use wfe_core::executor::{StepRegistry, WorkflowExecutor};
use wfe_core::models::{
Event, ExecutionPointer, PointerStatus, QueueType, WorkflowDefinition, WorkflowInstance,
WorkflowStatus,
};
use wfe_core::traits::{
DistributedLockProvider, LifecyclePublisher, PersistenceProvider, QueueProvider, SearchIndex,
StepBody, WorkflowData,
};
use wfe_core::traits::registry::WorkflowRegistry;
use wfe_core::{Result, WfeError};
use wfe_core::builder::WorkflowBuilder;
use crate::registry::InMemoryWorkflowRegistry;
/// The main orchestrator that ties all workflow engine components together.
pub struct WorkflowHost {
    /// Durable storage for workflow instances, events, and subscriptions.
    pub(crate) persistence: Arc<dyn PersistenceProvider>,
    /// Distributed locks guarding per-workflow mutation during event processing.
    pub(crate) lock_provider: Arc<dyn DistributedLockProvider>,
    /// Work queues for workflow execution and event processing.
    pub(crate) queue_provider: Arc<dyn QueueProvider>,
    /// Optional publisher of workflow lifecycle notifications.
    pub(crate) lifecycle: Option<Arc<dyn LifecyclePublisher>>,
    /// Optional search index, started/stopped along with the host.
    pub(crate) search: Option<Arc<dyn SearchIndex>>,
    /// Registered workflow definitions, keyed by (id, version).
    pub(crate) registry: Arc<RwLock<InMemoryWorkflowRegistry>>,
    /// Registered step implementations consumed by the executor.
    pub(crate) step_registry: Arc<RwLock<StepRegistry>>,
    /// Executes runnable workflows against their definitions.
    pub(crate) executor: Arc<WorkflowExecutor>,
    /// Cancellation signal shared with the spawned consumer tasks.
    pub(crate) shutdown: CancellationToken,
}
impl WorkflowHost {
    /// Spawn background polling tasks for processing workflows and events.
    ///
    /// Starts the queue provider, lock provider, and (when configured) the
    /// search index, then spawns two detached tokio tasks:
    /// a workflow consumer that dequeues workflow ids and drives the
    /// executor, and an event consumer that dequeues event ids and applies
    /// them to waiting workflows via `process_event`. Both loops run until
    /// the shared shutdown token is cancelled by [`WorkflowHost::stop`].
    pub async fn start(&self) -> Result<()> {
        self.queue_provider.start().await?;
        self.lock_provider.start().await?;
        if let Some(ref search) = self.search {
            search.start().await?;
        }
        // Spawn workflow consumer task.
        let executor = Arc::clone(&self.executor);
        let registry = Arc::clone(&self.registry);
        let step_registry = Arc::clone(&self.step_registry);
        let queue = Arc::clone(&self.queue_provider);
        let shutdown = self.shutdown.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = shutdown.cancelled() => {
                        debug!("Workflow consumer shutting down");
                        break;
                    }
                    result = queue.dequeue_work(QueueType::Workflow) => {
                        match result {
                            Ok(Some(workflow_id)) => {
                                // Look up the workflow instance to find its definition.
                                let instance = match executor.persistence.get_workflow_instance(&workflow_id).await {
                                    Ok(inst) => inst,
                                    Err(e) => {
                                        // Load failure is logged and the item is dropped;
                                        // it is NOT re-queued here.
                                        error!(workflow_id = %workflow_id, error = %e, "Failed to load workflow instance");
                                        continue;
                                    }
                                };
                                let reg = registry.read().await;
                                let definition = reg.get_definition(
                                    &instance.workflow_definition_id,
                                    Some(instance.version),
                                );
                                match definition {
                                    Some(def) => {
                                        let def_clone = def.clone();
                                        // NOTE(review): both read guards (`reg` above and `sr`
                                        // here) are held across the `execute` await, so
                                        // `register_workflow`/`register_step` writers block
                                        // until this execution finishes — confirm intended.
                                        let sr = step_registry.read().await;
                                        if let Err(e) = executor.execute(&workflow_id, &def_clone, &sr).await {
                                            error!(workflow_id = %workflow_id, error = %e, "Workflow execution failed");
                                        }
                                    }
                                    None => {
                                        warn!(
                                            workflow_id = %workflow_id,
                                            definition_id = %instance.workflow_definition_id,
                                            version = instance.version,
                                            "Workflow definition not found"
                                        );
                                    }
                                }
                            }
                            Ok(None) => {
                                // No work available; sleep briefly before polling again.
                                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                            }
                            Err(e) => {
                                // Longer backoff on queue errors than on empty polls.
                                error!(error = %e, "Failed to dequeue workflow work");
                                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                            }
                        }
                    }
                }
            }
        });
        // Spawn event consumer task.
        let persistence = Arc::clone(&self.persistence);
        let lock_provider2 = Arc::clone(&self.lock_provider);
        let queue2 = Arc::clone(&self.queue_provider);
        let shutdown2 = self.shutdown.clone();
        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = shutdown2.cancelled() => {
                        debug!("Event consumer shutting down");
                        break;
                    }
                    result = queue2.dequeue_work(QueueType::Event) => {
                        match result {
                            Ok(Some(event_id)) => {
                                if let Err(e) = process_event(&persistence, &lock_provider2, &queue2, &event_id).await {
                                    error!(event_id = %event_id, error = %e, "Event processing failed");
                                }
                            }
                            Ok(None) => {
                                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                            }
                            Err(e) => {
                                error!(error = %e, "Failed to dequeue event work");
                                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                            }
                        }
                    }
                }
            }
        });
        Ok(())
    }
    /// Signal shutdown of all background tasks.
    ///
    /// Cancels the shared token (both consumer loops observe it on their
    /// next `select!`), then stops the queue, lock, and search providers.
    /// Provider stop failures are logged rather than propagated.
    pub async fn stop(&self) {
        self.shutdown.cancel();
        if let Err(e) = self.queue_provider.stop().await {
            warn!(error = %e, "Failed to stop queue provider");
        }
        if let Err(e) = self.lock_provider.stop().await {
            warn!(error = %e, "Failed to stop lock provider");
        }
        if let Some(ref search) = self.search
            && let Err(e) = search.stop().await
        {
            warn!(error = %e, "Failed to stop search index");
        }
    }
    /// Register a workflow definition built via a closure that configures a `WorkflowBuilder`.
    ///
    /// The built definition is cloned into the registry under `(id, version)`
    /// and also returned to the caller.
    pub async fn register_workflow<D: WorkflowData>(
        &self,
        builder_fn: &dyn Fn(WorkflowBuilder<D>) -> WorkflowBuilder<D>,
        id: &str,
        version: u32,
    ) -> WorkflowDefinition {
        let builder = WorkflowBuilder::<D>::new();
        let builder = builder_fn(builder);
        let definition = builder.build(id, version);
        let mut reg = self.registry.write().await;
        reg.register(definition.clone());
        definition
    }
    /// Register a pre-built `WorkflowDefinition` directly.
    pub async fn register_workflow_definition(&self, definition: WorkflowDefinition) {
        let mut reg = self.registry.write().await;
        reg.register(definition);
    }
    /// Register a step type with the step registry.
    pub async fn register_step<S: StepBody + Default + 'static>(&self) {
        let mut sr = self.step_registry.write().await;
        sr.register::<S>();
    }
    /// Start a new workflow instance.
    ///
    /// Persists a fresh instance (with a pointer at step 0 when the
    /// definition has steps) and queues it for execution, returning the
    /// persistence-assigned instance id.
    ///
    /// # Errors
    ///
    /// Returns `WfeError::DefinitionNotFound` if `(definition_id, version)`
    /// is not registered; otherwise propagates persistence/queue errors.
    pub async fn start_workflow(
        &self,
        definition_id: &str,
        version: u32,
        data: serde_json::Value,
    ) -> Result<String> {
        // Verify definition exists.
        let reg = self.registry.read().await;
        let definition = reg
            .get_definition(definition_id, Some(version))
            .ok_or_else(|| WfeError::DefinitionNotFound {
                id: definition_id.to_string(),
                version,
            })?;
        // Create initial execution pointer for step 0 if the definition has steps.
        let mut instance = WorkflowInstance::new(definition_id, version, data);
        if !definition.steps.is_empty() {
            instance.execution_pointers.push(ExecutionPointer::new(0));
        }
        // Persist the instance.
        let id = self.persistence.create_new_workflow(&instance).await?;
        // NOTE(review): `instance` is not used after this assignment, so the
        // write is currently a dead store — confirm whether it can be dropped.
        instance.id = id.clone();
        // Queue for execution.
        self.queue_provider
            .queue_work(&id, QueueType::Workflow)
            .await?;
        Ok(id)
    }
    /// Publish an event that may resume waiting workflows.
    ///
    /// The event is persisted first, then its id is queued; matching of
    /// name/key to subscriptions happens later in `process_event`.
    pub async fn publish_event(
        &self,
        event_name: &str,
        event_key: &str,
        data: serde_json::Value,
    ) -> Result<()> {
        let event = Event::new(event_name, event_key, data);
        let event_id = self.persistence.create_event(&event).await?;
        // Queue event for processing.
        self.queue_provider
            .queue_work(&event_id, QueueType::Event)
            .await?;
        Ok(())
    }
    /// Suspend a running workflow.
    ///
    /// Returns `Ok(false)` (and changes nothing) unless the workflow is
    /// currently `Runnable`.
    ///
    /// NOTE(review): this is a read-modify-write without the distributed
    /// lock the event consumer uses — concurrent writers could race; confirm
    /// whether that is acceptable here.
    pub async fn suspend_workflow(&self, id: &str) -> Result<bool> {
        let mut instance = self.persistence.get_workflow_instance(id).await?;
        if instance.status != WorkflowStatus::Runnable {
            return Ok(false);
        }
        instance.status = WorkflowStatus::Suspended;
        self.persistence.persist_workflow(&instance).await?;
        Ok(true)
    }
    /// Resume a suspended workflow.
    ///
    /// Returns `Ok(false)` unless the workflow is currently `Suspended`;
    /// on success the workflow is re-queued for execution.
    pub async fn resume_workflow(&self, id: &str) -> Result<bool> {
        let mut instance = self.persistence.get_workflow_instance(id).await?;
        if instance.status != WorkflowStatus::Suspended {
            return Ok(false);
        }
        instance.status = WorkflowStatus::Runnable;
        self.persistence.persist_workflow(&instance).await?;
        // Re-queue for execution.
        self.queue_provider
            .queue_work(id, QueueType::Workflow)
            .await?;
        Ok(true)
    }
    /// Terminate a running workflow.
    ///
    /// Returns `Ok(false)` if the workflow is already `Complete` or
    /// `Terminated`; otherwise marks it `Terminated` and stamps the
    /// completion time.
    pub async fn terminate_workflow(&self, id: &str) -> Result<bool> {
        let mut instance = self.persistence.get_workflow_instance(id).await?;
        if instance.status == WorkflowStatus::Complete
            || instance.status == WorkflowStatus::Terminated
        {
            return Ok(false);
        }
        instance.status = WorkflowStatus::Terminated;
        instance.complete_time = Some(chrono::Utc::now());
        self.persistence.persist_workflow(&instance).await?;
        Ok(true)
    }
    /// Fetch a workflow instance by ID.
    pub async fn get_workflow(&self, id: &str) -> Result<WorkflowInstance> {
        self.persistence.get_workflow_instance(id).await
    }
    /// Access the persistence provider.
    pub fn persistence(&self) -> &Arc<dyn PersistenceProvider> {
        &self.persistence
    }
    /// Access the lifecycle publisher, if configured.
    pub fn lifecycle(&self) -> Option<&Arc<dyn LifecyclePublisher>> {
        self.lifecycle.as_ref()
    }
}
/// Process an event: find matching subscriptions, set event_data on pointers, re-queue workflows.
///
/// Each matching workflow is mutated only while its distributed lock is
/// held, so concurrent event processors cannot lose each other's updates.
/// If a lock cannot be acquired the event is pushed back onto the queue
/// and processing stops; it will be retried in full later.
async fn process_event(
    persistence: &Arc<dyn PersistenceProvider>,
    lock_provider: &Arc<dyn DistributedLockProvider>,
    queue: &Arc<dyn QueueProvider>,
    event_id: &str,
) -> Result<()> {
    let event = persistence.get_event(event_id).await?;
    // Subscriptions whose name/key/time match this event.
    let matching = persistence
        .get_subscriptions(&event.event_name, &event.event_key, event.event_time)
        .await?;
    for subscription in &matching {
        // Guard the workflow so concurrent processors cannot clobber
        // each other's pointer updates.
        if !lock_provider.acquire_lock(&subscription.workflow_id).await? {
            // Another node holds the workflow: push the event back for retry.
            queue.queue_work(event_id, QueueType::Event).await?;
            return Ok(());
        }
        let outcome: Result<()> = async {
            // Load the workflow and stamp the event onto the waiting pointer.
            let mut workflow = persistence
                .get_workflow_instance(&subscription.workflow_id)
                .await?;
            for pointer in workflow.execution_pointers.iter_mut() {
                let is_target = pointer.id == subscription.execution_pointer_id
                    && pointer.status == PointerStatus::WaitingForEvent;
                if is_target {
                    pointer.event_data = Some(event.event_data.clone());
                    pointer.event_published = true;
                    pointer.active = true;
                }
            }
            workflow.next_execution = Some(0);
            persistence.persist_workflow(&workflow).await?;
            // Hand the workflow back to the executor.
            queue
                .queue_work(&subscription.workflow_id, QueueType::Workflow)
                .await?;
            // The subscription is one-shot: retire it.
            persistence.terminate_subscription(&subscription.id).await?;
            Ok(())
        }
        .await;
        // The lock must be released whether or not the update succeeded.
        lock_provider.release_lock(&subscription.workflow_id).await?;
        outcome?;
    }
    // Mark the event as processed.
    persistence.mark_event_processed(event_id).await?;
    Ok(())
}

112
wfe/src/host_builder.rs Normal file
View File

@@ -0,0 +1,112 @@
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use wfe_core::executor::{StepRegistry, WorkflowExecutor};
use wfe_core::traits::{
DistributedLockProvider, LifecyclePublisher, PersistenceProvider, QueueProvider, SearchIndex,
};
use wfe_core::WfeError;
use crate::host::WorkflowHost;
use crate::registry::InMemoryWorkflowRegistry;
/// Fluent builder for constructing a `WorkflowHost`.
///
/// Uses the owned-self pattern: each method consumes and returns the builder.
pub struct WorkflowHostBuilder {
    /// Durable storage backend; required before `build()`.
    persistence: Option<Arc<dyn PersistenceProvider>>,
    /// Distributed lock backend; required before `build()`.
    lock_provider: Option<Arc<dyn DistributedLockProvider>>,
    /// Work-queue backend; required before `build()`.
    queue_provider: Option<Arc<dyn QueueProvider>>,
    /// Optional lifecycle event publisher.
    lifecycle: Option<Arc<dyn LifecyclePublisher>>,
    /// Optional search index.
    search: Option<Arc<dyn SearchIndex>>,
}
impl WorkflowHostBuilder {
    /// Create a builder with no providers configured.
    pub fn new() -> Self {
        Self {
            persistence: None,
            lock_provider: None,
            queue_provider: None,
            lifecycle: None,
            search: None,
        }
    }

    /// Set the persistence provider (required).
    pub fn use_persistence(self, persistence: Arc<dyn PersistenceProvider>) -> Self {
        Self {
            persistence: Some(persistence),
            ..self
        }
    }

    /// Set the distributed lock provider (required).
    pub fn use_lock_provider(self, lock_provider: Arc<dyn DistributedLockProvider>) -> Self {
        Self {
            lock_provider: Some(lock_provider),
            ..self
        }
    }

    /// Set the queue provider (required).
    pub fn use_queue_provider(self, queue_provider: Arc<dyn QueueProvider>) -> Self {
        Self {
            queue_provider: Some(queue_provider),
            ..self
        }
    }

    /// Set an optional lifecycle publisher.
    pub fn use_lifecycle(self, lifecycle: Arc<dyn LifecyclePublisher>) -> Self {
        Self {
            lifecycle: Some(lifecycle),
            ..self
        }
    }

    /// Set an optional search index.
    pub fn use_search(self, search: Arc<dyn SearchIndex>) -> Self {
        Self {
            search: Some(search),
            ..self
        }
    }

    /// Build the `WorkflowHost`.
    ///
    /// # Errors
    ///
    /// Returns an error if persistence, lock_provider, or queue_provider
    /// have not been set.
    pub fn build(self) -> wfe_core::Result<WorkflowHost> {
        let Some(persistence) = self.persistence else {
            return Err(WfeError::Other(
                "PersistenceProvider is required. Call .use_persistence() before .build().".into(),
            ));
        };
        let Some(lock_provider) = self.lock_provider else {
            return Err(WfeError::Other(
                "DistributedLockProvider is required. Call .use_lock_provider() before .build().".into(),
            ));
        };
        let Some(queue_provider) = self.queue_provider else {
            return Err(WfeError::Other(
                "QueueProvider is required. Call .use_queue_provider() before .build().".into(),
            ));
        };
        // The executor shares the same providers the host will own.
        let mut executor = WorkflowExecutor::new(
            Arc::clone(&persistence),
            Arc::clone(&lock_provider),
            Arc::clone(&queue_provider),
        );
        if let Some(lifecycle) = self.lifecycle.as_ref() {
            executor = executor.with_lifecycle(Arc::clone(lifecycle));
        }
        if let Some(search) = self.search.as_ref() {
            executor = executor.with_search(Arc::clone(search));
        }
        Ok(WorkflowHost {
            persistence,
            lock_provider,
            queue_provider,
            lifecycle: self.lifecycle,
            search: self.search,
            registry: Arc::new(RwLock::new(InMemoryWorkflowRegistry::new())),
            step_registry: Arc::new(RwLock::new(StepRegistry::new())),
            executor: Arc::new(executor),
            shutdown: CancellationToken::new(),
        })
    }
}
impl Default for WorkflowHostBuilder {
fn default() -> Self {
Self::new()
}
}

15
wfe/src/lib.rs Normal file
View File

@@ -0,0 +1,15 @@
//! Umbrella crate for the WFE workflow engine.
//!
//! Layers host orchestration on top of `wfe-core`: the `WorkflowHost`,
//! its builder, an in-memory workflow registry, a synchronous runner for
//! tests, and a (stub) purger.

/// `WorkflowHost` and its background workflow/event consumers.
pub mod host;
/// Fluent owned-self builder for `WorkflowHost`.
pub mod host_builder;
/// Workflow purging (currently a documented no-op stub).
pub mod purger;
/// In-memory `WorkflowRegistry` implementation.
pub mod registry;
/// Synchronous run-to-completion helper for tests.
pub mod sync_runner;
// Re-export everything useful from wfe-core.
pub use wfe_core::*;
// Re-export the new types at top level for convenience.
pub use host::WorkflowHost;
pub use host_builder::WorkflowHostBuilder;
pub use purger::purge_workflows;
pub use registry::InMemoryWorkflowRegistry;
pub use sync_runner::run_workflow_sync;

28
wfe/src/purger.rs Normal file
View File

@@ -0,0 +1,28 @@
use chrono::{DateTime, Utc};
use wfe_core::models::WorkflowStatus;
use wfe_core::traits::PersistenceProvider;
use wfe_core::Result;
/// Purge workflows matching a given status that were created before `older_than`.
///
/// TODO: This requires a query-by-status-and-date method on the persistence trait.
/// For now, this is a stub that documents the intended contract. When the persistence
/// layer gains `get_workflows_by_status` and `delete_workflow` methods, this function
/// should be implemented fully.
///
/// WARNING: until then this function is a no-op — it returns `Ok(())` without
/// inspecting or deleting anything. Callers must not assume data was purged.
pub async fn purge_workflows(
    _persistence: &dyn PersistenceProvider,
    _status: WorkflowStatus,
    _older_than: DateTime<Utc>,
) -> Result<()> {
    // TODO: Implement once PersistenceProvider exposes:
    //   async fn get_workflows_by_status(&self, status: WorkflowStatus, before: DateTime<Utc>) -> Result<Vec<String>>;
    //   async fn delete_workflow(&self, id: &str) -> Result<()>;
    //
    // Intended implementation:
    //   let ids = persistence.get_workflows_by_status(status, older_than).await?;
    //   for id in ids {
    //       persistence.delete_workflow(&id).await?;
    //   }
    Ok(())
}

133
wfe/src/registry.rs Normal file
View File

@@ -0,0 +1,133 @@
use std::collections::HashMap;
use wfe_core::models::WorkflowDefinition;
use wfe_core::traits::registry::WorkflowRegistry;
/// Concrete in-memory implementation of `WorkflowRegistry`.
pub struct InMemoryWorkflowRegistry {
    /// Definitions keyed by `(workflow id, version)`.
    definitions: HashMap<(String, u32), WorkflowDefinition>,
}

impl InMemoryWorkflowRegistry {
    /// Create an empty registry.
    pub fn new() -> Self {
        Self {
            definitions: HashMap::default(),
        }
    }
}
impl Default for InMemoryWorkflowRegistry {
fn default() -> Self {
Self::new()
}
}
impl WorkflowRegistry for InMemoryWorkflowRegistry {
    /// Store (or replace) a definition under its `(id, version)` key.
    fn register(&mut self, definition: WorkflowDefinition) {
        self.definitions
            .insert((definition.id.clone(), definition.version), definition);
    }

    /// Look up a definition; `None` version resolves to the latest.
    fn get_definition(&self, id: &str, version: Option<u32>) -> Option<&WorkflowDefinition> {
        if let Some(wanted) = version {
            return self.definitions.get(&(id.to_string(), wanted));
        }
        // No version requested: return the highest registered version for `id`.
        self.definitions
            .iter()
            .filter(|((def_id, _), _)| def_id == id)
            .max_by_key(|((_, ver), _)| *ver)
            .map(|(_, def)| def)
    }

    /// Whether an exact `(id, version)` pair is present.
    fn is_registered(&self, id: &str, version: u32) -> bool {
        let key = (id.to_string(), version);
        self.definitions.contains_key(&key)
    }

    /// Remove an exact `(id, version)` pair; returns whether it existed.
    fn deregister(&mut self, id: &str, version: u32) -> bool {
        let key = (id.to_string(), version);
        self.definitions.remove(&key).is_some()
    }

    /// All registered definitions, in arbitrary order.
    fn get_all_definitions(&self) -> Vec<&WorkflowDefinition> {
        self.definitions.values().collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal definition for the given id/version pair.
    fn def_of(id: &str, version: u32) -> WorkflowDefinition {
        WorkflowDefinition::new(id, version)
    }

    #[test]
    fn registry_register_and_get() {
        let mut registry = InMemoryWorkflowRegistry::new();
        registry.register(def_of("my-workflow", 1));
        let found = registry
            .get_definition("my-workflow", Some(1))
            .expect("definition should be retrievable after register");
        assert_eq!(found.id, "my-workflow");
        assert_eq!(found.version, 1);
    }

    #[test]
    fn registry_version_support() {
        let mut registry = InMemoryWorkflowRegistry::new();
        for version in [1, 2] {
            registry.register(def_of("wf", version));
        }
        // Specific versions resolve exactly.
        assert_eq!(registry.get_definition("wf", Some(1)).unwrap().version, 1);
        assert_eq!(registry.get_definition("wf", Some(2)).unwrap().version, 2);
        // A `None` version resolves to the highest registered one.
        assert_eq!(registry.get_definition("wf", None).unwrap().version, 2);
    }

    #[test]
    fn registry_deregister() {
        let mut registry = InMemoryWorkflowRegistry::new();
        registry.register(def_of("wf", 1));
        assert!(registry.is_registered("wf", 1));
        assert!(registry.deregister("wf", 1));
        assert!(!registry.is_registered("wf", 1));
        assert!(registry.get_definition("wf", Some(1)).is_none());
    }

    #[test]
    fn registry_get_all_definitions() {
        let mut registry = InMemoryWorkflowRegistry::new();
        registry.register(def_of("a", 1));
        registry.register(def_of("b", 1));
        registry.register(def_of("a", 2));
        assert_eq!(registry.get_all_definitions().len(), 3);
    }

    #[test]
    fn registry_get_nonexistent_returns_none() {
        let registry = InMemoryWorkflowRegistry::new();
        assert!(registry.get_definition("nope", Some(1)).is_none());
        assert!(registry.get_definition("nope", None).is_none());
    }

    #[test]
    fn registry_deregister_nonexistent_returns_false() {
        let mut registry = InMemoryWorkflowRegistry::new();
        assert!(!registry.deregister("nope", 1));
    }
}

42
wfe/src/sync_runner.rs Normal file
View File

@@ -0,0 +1,42 @@
use std::time::Duration;
use wfe_core::models::{WorkflowInstance, WorkflowStatus};
use wfe_core::{Result, WfeError};
use crate::host::WorkflowHost;
/// Run a workflow to completion synchronously (for testing).
///
/// Starts the workflow, then polls persistence every 25ms until the
/// workflow reaches `Complete` or `Terminated` status, returning the final
/// instance. Errors with a timeout message once `timeout` has elapsed.
pub async fn run_workflow_sync(
    host: &WorkflowHost,
    definition_id: &str,
    version: u32,
    data: serde_json::Value,
    timeout: Duration,
) -> Result<WorkflowInstance> {
    const POLL_INTERVAL: Duration = Duration::from_millis(25);
    let workflow_id = host.start_workflow(definition_id, version, data).await?;
    // tokio's Instant respects paused time under the "test-util" feature.
    let started_at = tokio::time::Instant::now();
    loop {
        // Timeout is checked before each poll so a stalled workflow
        // cannot spin forever.
        if started_at.elapsed() > timeout {
            return Err(WfeError::StepExecution(format!(
                "Workflow {workflow_id} did not complete within {timeout:?}"
            )));
        }
        let instance = host.persistence.get_workflow_instance(&workflow_id).await?;
        if matches!(
            instance.status,
            WorkflowStatus::Complete | WorkflowStatus::Terminated
        ) {
            return Ok(instance);
        }
        tokio::time::sleep(POLL_INTERVAL).await;
    }
}