From eea8bdb82409ed3acbb8bdf61e67a8c0a094c562 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Wed, 25 Mar 2026 20:09:17 +0000 Subject: [PATCH] feat(wfe-core): add test support with in-memory providers and test suites InMemoryPersistenceProvider, InMemoryLockProvider, InMemoryQueueProvider, InMemoryLifecyclePublisher behind test-support feature flag. Shared test suite macros: persistence_suite!, lock_suite!, queue_suite! that run the same tests against any provider implementation. --- wfe-core/src/test_support/fixtures.rs | 62 ++ .../src/test_support/in_memory_lifecycle.rs | 65 ++ wfe-core/src/test_support/in_memory_lock.rs | 50 ++ .../src/test_support/in_memory_persistence.rs | 585 ++++++++++++++++++ wfe-core/src/test_support/in_memory_queue.rs | 58 ++ wfe-core/src/test_support/lock_suite.rs | 49 ++ wfe-core/src/test_support/mod.rs | 26 + .../src/test_support/persistence_suite.rs | 243 ++++++++ wfe-core/src/test_support/queue_suite.rs | 110 ++++ 9 files changed, 1248 insertions(+) create mode 100644 wfe-core/src/test_support/fixtures.rs create mode 100644 wfe-core/src/test_support/in_memory_lifecycle.rs create mode 100644 wfe-core/src/test_support/in_memory_lock.rs create mode 100644 wfe-core/src/test_support/in_memory_persistence.rs create mode 100644 wfe-core/src/test_support/in_memory_queue.rs create mode 100644 wfe-core/src/test_support/lock_suite.rs create mode 100644 wfe-core/src/test_support/mod.rs create mode 100644 wfe-core/src/test_support/persistence_suite.rs create mode 100644 wfe-core/src/test_support/queue_suite.rs diff --git a/wfe-core/src/test_support/fixtures.rs b/wfe-core/src/test_support/fixtures.rs new file mode 100644 index 0000000..6740feb --- /dev/null +++ b/wfe-core/src/test_support/fixtures.rs @@ -0,0 +1,62 @@ +use chrono::Utc; + +use crate::models::{Event, EventSubscription, ExecutionError, WorkflowInstance}; + +/// Create a sample `WorkflowInstance` for testing. +pub fn sample_workflow_instance() -> WorkflowInstance { + WorkflowInstance::new("test-workflow", 1, serde_json::json!({"key": "value"})) +} + +/// Create a sample `Event` for testing. +pub fn sample_event() -> Event { + Event::new( + "order.created", + "order-123", + serde_json::json!({"amount": 42}), + ) +} + +/// Create a sample `EventSubscription` for testing. +pub fn sample_subscription() -> EventSubscription { + EventSubscription::new("wf-1", 0, "ptr-1", "order.created", "order-123", Utc::now()) +} + +/// Create a sample `ExecutionError` for testing. +pub fn sample_execution_error() -> ExecutionError { + ExecutionError::new("wf-1", "ptr-1", "something went wrong") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn sample_workflow_instance_has_expected_fields() { + let instance = sample_workflow_instance(); + assert_eq!(instance.workflow_definition_id, "test-workflow"); + assert_eq!(instance.version, 1); + } + + #[test] + fn sample_event_has_expected_fields() { + let event = sample_event(); + assert_eq!(event.event_name, "order.created"); + assert_eq!(event.event_key, "order-123"); + } + + #[test] + fn sample_subscription_has_expected_fields() { + let sub = sample_subscription(); + assert_eq!(sub.workflow_id, "wf-1"); + assert_eq!(sub.event_name, "order.created"); + assert_eq!(sub.event_key, "order-123"); + } + + #[test] + fn sample_execution_error_has_expected_fields() { + let err = sample_execution_error(); + assert_eq!(err.workflow_id, "wf-1"); + assert_eq!(err.execution_pointer_id, "ptr-1"); + assert_eq!(err.message, "something went wrong"); + } +} diff --git a/wfe-core/src/test_support/in_memory_lifecycle.rs b/wfe-core/src/test_support/in_memory_lifecycle.rs new file mode 100644 index 0000000..39295d7 --- /dev/null +++ b/wfe-core/src/test_support/in_memory_lifecycle.rs @@ -0,0 +1,65 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::Mutex; + +use crate::models::LifecycleEvent; +use crate::traits::LifecyclePublisher; +use crate::Result; + +/// An in-memory implementation of `LifecyclePublisher` for testing. +#[derive(Debug, Clone)] +pub struct InMemoryLifecyclePublisher { + events: Arc>>, +} + +impl InMemoryLifecyclePublisher { + pub fn new() -> Self { + Self { + events: Arc::new(Mutex::new(Vec::new())), + } + } + + /// Retrieve all published lifecycle events for assertions. + pub async fn events(&self) -> Vec { + self.events.lock().await.clone() + } +} + +impl Default for InMemoryLifecyclePublisher { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl LifecyclePublisher for InMemoryLifecyclePublisher { + async fn publish(&self, event: LifecycleEvent) -> Result<()> { + self.events.lock().await.push(event); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::LifecycleEventType; + use crate::traits::LifecyclePublisher; + + #[test] + fn default_impl() { + let lc = InMemoryLifecyclePublisher::default(); + drop(lc); + } + + #[tokio::test] + async fn publish_and_retrieve_events() { + let lc = InMemoryLifecyclePublisher::new(); + let event = LifecycleEvent::new("wf-1", "def-1", 1, LifecycleEventType::Started); + lc.publish(event).await.unwrap(); + + let events = lc.events().await; + assert_eq!(events.len(), 1); + assert_eq!(events[0].event_type, LifecycleEventType::Started); + } +} diff --git a/wfe-core/src/test_support/in_memory_lock.rs b/wfe-core/src/test_support/in_memory_lock.rs new file mode 100644 index 0000000..75352d7 --- /dev/null +++ b/wfe-core/src/test_support/in_memory_lock.rs @@ -0,0 +1,50 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::Mutex; + +use crate::traits::DistributedLockProvider; +use crate::Result; + +/// An in-memory implementation of `DistributedLockProvider` for testing. +#[derive(Debug, Clone)] +pub struct InMemoryLockProvider { + locks: Arc>>, +} + +impl InMemoryLockProvider { + pub fn new() -> Self { + Self { + locks: Arc::new(Mutex::new(HashSet::new())), + } + } +} + +impl Default for InMemoryLockProvider { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl DistributedLockProvider for InMemoryLockProvider { + async fn acquire_lock(&self, resource: &str) -> Result { + let mut locks = self.locks.lock().await; + Ok(locks.insert(resource.to_string())) + } + + async fn release_lock(&self, resource: &str) -> Result<()> { + let mut locks = self.locks.lock().await; + locks.remove(resource); + Ok(()) + } + + async fn start(&self) -> Result<()> { + Ok(()) + } + + async fn stop(&self) -> Result<()> { + Ok(()) + } +} diff --git a/wfe-core/src/test_support/in_memory_persistence.rs b/wfe-core/src/test_support/in_memory_persistence.rs new file mode 100644 index 0000000..bc8c89f --- /dev/null +++ b/wfe-core/src/test_support/in_memory_persistence.rs @@ -0,0 +1,585 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use tokio::sync::RwLock; + +use crate::models::{ + Event, EventSubscription, ExecutionError, ScheduledCommand, WorkflowInstance, +}; +use crate::traits::{ + EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository, + WorkflowRepository, +}; +use crate::{Result, WfeError}; + +/// An in-memory implementation of `PersistenceProvider` for testing. +#[derive(Debug, Clone)] +pub struct InMemoryPersistenceProvider { + workflows: Arc>>, + events: Arc>>, + subscriptions: Arc>>, + errors: Arc>>, + scheduled_commands: Arc>>, +} + +impl InMemoryPersistenceProvider { + pub fn new() -> Self { + Self { + workflows: Arc::new(RwLock::new(HashMap::new())), + events: Arc::new(RwLock::new(HashMap::new())), + subscriptions: Arc::new(RwLock::new(HashMap::new())), + errors: Arc::new(RwLock::new(Vec::new())), + scheduled_commands: Arc::new(RwLock::new(Vec::new())), + } + } + + /// Retrieve all stored errors (for test assertions). + pub async fn get_errors(&self) -> Vec { + self.errors.read().await.clone() + } +} + +impl Default for InMemoryPersistenceProvider { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl WorkflowRepository for InMemoryPersistenceProvider { + async fn create_new_workflow(&self, instance: &WorkflowInstance) -> Result { + let id = if instance.id.is_empty() { + uuid::Uuid::new_v4().to_string() + } else { + instance.id.clone() + }; + let mut stored = instance.clone(); + stored.id = id.clone(); + self.workflows.write().await.insert(id.clone(), stored); + Ok(id) + } + + async fn persist_workflow(&self, instance: &WorkflowInstance) -> Result<()> { + self.workflows + .write() + .await + .insert(instance.id.clone(), instance.clone()); + Ok(()) + } + + async fn persist_workflow_with_subscriptions( + &self, + instance: &WorkflowInstance, + subscriptions: &[EventSubscription], + ) -> Result<()> { + self.persist_workflow(instance).await?; + let mut subs = self.subscriptions.write().await; + for sub in subscriptions { + subs.insert(sub.id.clone(), sub.clone()); + } + Ok(()) + } + + async fn get_runnable_instances(&self, as_at: DateTime) -> Result> { + let workflows = self.workflows.read().await; + let as_at_millis = as_at.timestamp_millis(); + let ids = workflows + .values() + .filter(|w| { + w.status == crate::models::WorkflowStatus::Runnable + && w.next_execution + .map(|ne| ne <= as_at_millis) + .unwrap_or(false) + }) + .map(|w| w.id.clone()) + .collect(); + Ok(ids) + } + + async fn get_workflow_instance(&self, id: &str) -> Result { + self.workflows + .read() + .await + .get(id) + .cloned() + .ok_or_else(|| WfeError::WorkflowNotFound(id.to_string())) + } + + async fn get_workflow_instances(&self, ids: &[String]) -> Result> { + let workflows = self.workflows.read().await; + let mut result = Vec::new(); + for id in ids { + if let Some(w) = workflows.get(id) { + result.push(w.clone()); + } + } + Ok(result) + } +} + +#[async_trait] +impl SubscriptionRepository for InMemoryPersistenceProvider { + async fn create_event_subscription( + &self, + subscription: &EventSubscription, + ) -> Result { + let id = if subscription.id.is_empty() { + uuid::Uuid::new_v4().to_string() + } else { + subscription.id.clone() + }; + let mut stored = subscription.clone(); + stored.id = id.clone(); + self.subscriptions.write().await.insert(id.clone(), stored); + Ok(id) + } + + async fn get_subscriptions( + &self, + event_name: &str, + event_key: &str, + as_of: DateTime, + ) -> Result> { + let subs = self.subscriptions.read().await; + let result = subs + .values() + .filter(|s| { + s.event_name == event_name + && s.event_key == event_key + && s.subscribe_as_of <= as_of + && s.external_token.is_none() // not terminated + }) + .cloned() + .collect(); + Ok(result) + } + + async fn terminate_subscription(&self, subscription_id: &str) -> Result<()> { + let mut subs = self.subscriptions.write().await; + match subs.get_mut(subscription_id) { + Some(sub) => { + sub.external_token = Some("__terminated__".to_string()); + Ok(()) + } + None => Err(WfeError::SubscriptionNotFound(subscription_id.to_string())), + } + } + + async fn get_subscription(&self, subscription_id: &str) -> Result { + self.subscriptions + .read() + .await + .get(subscription_id) + .cloned() + .ok_or_else(|| WfeError::SubscriptionNotFound(subscription_id.to_string())) + } + + async fn get_first_open_subscription( + &self, + event_name: &str, + event_key: &str, + as_of: DateTime, + ) -> Result> { + let subs = self.subscriptions.read().await; + let result = subs + .values() + .find(|s| { + s.event_name == event_name + && s.event_key == event_key + && s.subscribe_as_of <= as_of + && s.external_token.is_none() + }) + .cloned(); + Ok(result) + } + + async fn set_subscription_token( + &self, + subscription_id: &str, + token: &str, + worker_id: &str, + expiry: DateTime, + ) -> Result { + let mut subs = self.subscriptions.write().await; + match subs.get_mut(subscription_id) { + Some(sub) => { + if sub.external_token.is_some() { + return Ok(false); + } + sub.external_token = Some(token.to_string()); + sub.external_worker_id = Some(worker_id.to_string()); + sub.external_token_expiry = Some(expiry); + Ok(true) + } + None => Err(WfeError::SubscriptionNotFound(subscription_id.to_string())), + } + } + + async fn clear_subscription_token( + &self, + subscription_id: &str, + token: &str, + ) -> Result<()> { + let mut subs = self.subscriptions.write().await; + match subs.get_mut(subscription_id) { + Some(sub) => { + if sub.external_token.as_deref() != Some(token) { + return Err(WfeError::Persistence(format!( + "Token mismatch for subscription {subscription_id}" + ))); + } + sub.external_token = None; + sub.external_worker_id = None; + sub.external_token_expiry = None; + Ok(()) + } + None => Err(WfeError::SubscriptionNotFound(subscription_id.to_string())), + } + } +} + +#[async_trait] +impl EventRepository for InMemoryPersistenceProvider { + async fn create_event(&self, event: &Event) -> Result { + let id = if event.id.is_empty() { + uuid::Uuid::new_v4().to_string() + } else { + event.id.clone() + }; + let mut stored = event.clone(); + stored.id = id.clone(); + self.events.write().await.insert(id.clone(), stored); + Ok(id) + } + + async fn get_event(&self, id: &str) -> Result { + self.events + .read() + .await + .get(id) + .cloned() + .ok_or_else(|| WfeError::EventNotFound(id.to_string())) + } + + async fn get_runnable_events(&self, as_at: DateTime) -> Result> { + let events = self.events.read().await; + let ids = events + .values() + .filter(|e| !e.is_processed && e.event_time <= as_at) + .map(|e| e.id.clone()) + .collect(); + Ok(ids) + } + + async fn get_events( + &self, + event_name: &str, + event_key: &str, + as_of: DateTime, + ) -> Result> { + let events = self.events.read().await; + let ids = events + .values() + .filter(|e| e.event_name == event_name && e.event_key == event_key && e.event_time <= as_of) + .map(|e| e.id.clone()) + .collect(); + Ok(ids) + } + + async fn mark_event_processed(&self, id: &str) -> Result<()> { + let mut events = self.events.write().await; + match events.get_mut(id) { + Some(event) => { + event.is_processed = true; + Ok(()) + } + None => Err(WfeError::EventNotFound(id.to_string())), + } + } + + async fn mark_event_unprocessed(&self, id: &str) -> Result<()> { + let mut events = self.events.write().await; + match events.get_mut(id) { + Some(event) => { + event.is_processed = false; + Ok(()) + } + None => Err(WfeError::EventNotFound(id.to_string())), + } + } +} + +#[async_trait] +impl ScheduledCommandRepository for InMemoryPersistenceProvider { + fn supports_scheduled_commands(&self) -> bool { + true + } + + async fn schedule_command(&self, command: &ScheduledCommand) -> Result<()> { + self.scheduled_commands.write().await.push(command.clone()); + Ok(()) + } + + async fn process_commands( + &self, + as_of: DateTime, + handler: &(dyn Fn(ScheduledCommand) -> std::pin::Pin> + Send>> + + Send + + Sync), + ) -> Result<()> { + let as_of_millis = as_of.timestamp_millis(); + let due: Vec = { + let mut cmds = self.scheduled_commands.write().await; + let (due, remaining): (Vec<_>, Vec<_>) = + cmds.drain(..).partition(|c| c.execute_time <= as_of_millis); + *cmds = remaining; + due + }; + for cmd in due { + handler(cmd).await?; + } + Ok(()) + } +} + +#[async_trait] +impl PersistenceProvider for InMemoryPersistenceProvider { + async fn persist_errors(&self, errors: &[ExecutionError]) -> Result<()> { + let mut stored = self.errors.write().await; + stored.extend(errors.iter().cloned()); + Ok(()) + } + + async fn ensure_store_exists(&self) -> Result<()> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{Event, EventSubscription, ExecutionError, ScheduledCommand, CommandName}; + use crate::traits::{ + EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository, + WorkflowRepository, + }; + use chrono::{Duration, Utc}; + + #[tokio::test] + async fn default_impl() { + let p = InMemoryPersistenceProvider::default(); + p.ensure_store_exists().await.unwrap(); + } + + #[tokio::test] + async fn get_workflow_instances_batch() { + let p = InMemoryPersistenceProvider::new(); + let w1 = WorkflowInstance::new("wf", 1, serde_json::json!({})); + let id1 = p.create_new_workflow(&w1).await.unwrap(); + let w2 = WorkflowInstance::new("wf", 1, serde_json::json!({})); + let id2 = p.create_new_workflow(&w2).await.unwrap(); + + let ids = vec![id1.clone(), id2.clone(), "nonexistent".to_string()]; + let result = p.get_workflow_instances(&ids).await.unwrap(); + // Only existing workflows are returned. + assert_eq!(result.len(), 2); + } + + #[tokio::test] + async fn get_first_open_subscription_returns_match() { + let p = InMemoryPersistenceProvider::new(); + let now = Utc::now(); + let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", now); + let id = p.create_event_subscription(&sub).await.unwrap(); + + let found = p + .get_first_open_subscription("evt", "key", now + Duration::seconds(1)) + .await + .unwrap(); + assert!(found.is_some()); + assert_eq!(found.unwrap().id, id); + } + + #[tokio::test] + async fn get_first_open_subscription_returns_none() { + let p = InMemoryPersistenceProvider::new(); + let now = Utc::now(); + let found = p + .get_first_open_subscription("evt", "key", now) + .await + .unwrap(); + assert!(found.is_none()); + } + + #[tokio::test] + async fn set_subscription_token_success() { + let p = InMemoryPersistenceProvider::new(); + let now = Utc::now(); + let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", now); + let id = p.create_event_subscription(&sub).await.unwrap(); + + let expiry = now + Duration::hours(1); + let result = p + .set_subscription_token(&id, "token-1", "worker-1", expiry) + .await + .unwrap(); + assert!(result); + + // Setting token again on already-tokened subscription returns false. + let result2 = p + .set_subscription_token(&id, "token-2", "worker-2", expiry) + .await + .unwrap(); + assert!(!result2); + } + + #[tokio::test] + async fn set_subscription_token_not_found() { + let p = InMemoryPersistenceProvider::new(); + let result = p + .set_subscription_token("nonexistent", "t", "w", Utc::now()) + .await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn clear_subscription_token_success() { + let p = InMemoryPersistenceProvider::new(); + let now = Utc::now(); + let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", now); + let id = p.create_event_subscription(&sub).await.unwrap(); + + let expiry = now + Duration::hours(1); + p.set_subscription_token(&id, "token-1", "worker-1", expiry) + .await + .unwrap(); + + p.clear_subscription_token(&id, "token-1").await.unwrap(); + + // After clearing, the subscription should be open again. + let retrieved = p.get_subscription(&id).await.unwrap(); + assert!(retrieved.external_token.is_none()); + } + + #[tokio::test] + async fn clear_subscription_token_not_found() { + let p = InMemoryPersistenceProvider::new(); + let result = p.clear_subscription_token("nonexistent", "t").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn terminate_subscription_not_found() { + let p = InMemoryPersistenceProvider::new(); + let result = p.terminate_subscription("nonexistent").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn get_events_by_name_and_key() { + let p = InMemoryPersistenceProvider::new(); + let now = Utc::now(); + let e1 = Event::new("evt-a", "key-1", serde_json::json!({})); + let id1 = p.create_event(&e1).await.unwrap(); + let e2 = Event::new("evt-a", "key-2", serde_json::json!({})); + let _id2 = p.create_event(&e2).await.unwrap(); + let e3 = Event::new("evt-b", "key-1", serde_json::json!({})); + let _id3 = p.create_event(&e3).await.unwrap(); + + let ids = p + .get_events("evt-a", "key-1", now + Duration::seconds(1)) + .await + .unwrap(); + assert_eq!(ids.len(), 1); + assert_eq!(ids[0], id1); + } + + #[tokio::test] + async fn mark_event_unprocessed() { + let p = InMemoryPersistenceProvider::new(); + let e = Event::new("evt", "key", serde_json::json!({})); + let id = p.create_event(&e).await.unwrap(); + + p.mark_event_processed(&id).await.unwrap(); + let event = p.get_event(&id).await.unwrap(); + assert!(event.is_processed); + + p.mark_event_unprocessed(&id).await.unwrap(); + let event = p.get_event(&id).await.unwrap(); + assert!(!event.is_processed); + } + + #[tokio::test] + async fn mark_event_unprocessed_not_found() { + let p = InMemoryPersistenceProvider::new(); + let result = p.mark_event_unprocessed("nonexistent").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn mark_event_processed_not_found() { + let p = InMemoryPersistenceProvider::new(); + let result = p.mark_event_processed("nonexistent").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn get_errors_returns_persisted_errors() { + let p = InMemoryPersistenceProvider::new(); + let errors = vec![ + ExecutionError::new("wf-1", "ptr-1", "err1"), + ExecutionError::new("wf-2", "ptr-2", "err2"), + ]; + p.persist_errors(&errors).await.unwrap(); + let stored = p.get_errors().await; + assert_eq!(stored.len(), 2); + } + + #[tokio::test] + async fn supports_scheduled_commands_returns_true() { + let p = InMemoryPersistenceProvider::new(); + assert!(p.supports_scheduled_commands()); + } + + #[tokio::test] + async fn schedule_and_process_commands() { + let p = InMemoryPersistenceProvider::new(); + let now = Utc::now(); + let cmd = ScheduledCommand { + command_name: CommandName::ProcessEvent, + data: "test-event-id".to_string(), + execute_time: now.timestamp_millis() - 1000, + }; + p.schedule_command(&cmd).await.unwrap(); + + p.process_commands(now, &|c: ScheduledCommand| { + let _data = c.data; + Box::pin(async move { Ok(()) }) + }) + .await + .unwrap(); + + // After processing, no more due commands remain. + let remaining_count = p.scheduled_commands.read().await.len(); + assert_eq!(remaining_count, 0); + } + + #[tokio::test] + async fn persist_workflow_with_subscriptions() { + let p = InMemoryPersistenceProvider::new(); + let w = WorkflowInstance::new("wf", 1, serde_json::json!({})); + let id = p.create_new_workflow(&w).await.unwrap(); + let mut updated = p.get_workflow_instance(&id).await.unwrap(); + updated.description = Some("updated".to_string()); + + let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", Utc::now()); + p.persist_workflow_with_subscriptions(&updated, &[sub]) + .await + .unwrap(); + + let retrieved = p.get_workflow_instance(&id).await.unwrap(); + assert_eq!(retrieved.description.as_deref(), Some("updated")); + } +} diff --git a/wfe-core/src/test_support/in_memory_queue.rs b/wfe-core/src/test_support/in_memory_queue.rs new file mode 100644 index 0000000..e1f023d --- /dev/null +++ b/wfe-core/src/test_support/in_memory_queue.rs @@ -0,0 +1,58 @@ +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; + +use async_trait::async_trait; +use tokio::sync::Mutex; + +use crate::models::QueueType; +use crate::traits::QueueProvider; +use crate::Result; + +/// An in-memory implementation of `QueueProvider` for testing. +#[derive(Debug, Clone)] +pub struct InMemoryQueueProvider { + queues: Arc>>>, +} + +impl InMemoryQueueProvider { + pub fn new() -> Self { + Self { + queues: Arc::new(Mutex::new(HashMap::new())), + } + } +} + +impl Default for InMemoryQueueProvider { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl QueueProvider for InMemoryQueueProvider { + async fn queue_work(&self, id: &str, queue: QueueType) -> Result<()> { + let mut queues = self.queues.lock().await; + queues + .entry(queue) + .or_insert_with(VecDeque::new) + .push_back(id.to_string()); + Ok(()) + } + + async fn dequeue_work(&self, queue: QueueType) -> Result> { + let mut queues = self.queues.lock().await; + Ok(queues.get_mut(&queue).and_then(|q| q.pop_front())) + } + + fn is_dequeue_blocking(&self) -> bool { + false + } + + async fn start(&self) -> Result<()> { + Ok(()) + } + + async fn stop(&self) -> Result<()> { + Ok(()) + } +} diff --git a/wfe-core/src/test_support/lock_suite.rs b/wfe-core/src/test_support/lock_suite.rs new file mode 100644 index 0000000..5084138 --- /dev/null +++ b/wfe-core/src/test_support/lock_suite.rs @@ -0,0 +1,49 @@ +/// Generates a test suite for any `DistributedLockProvider` implementation. +/// +/// The macro takes a factory expression that returns an `impl DistributedLockProvider`. +/// +/// # Example +/// ```ignore +/// lock_suite!(|| async { InMemoryLockProvider::new() }); +/// ``` +#[macro_export] +macro_rules! lock_suite { + ($factory:expr) => { + mod lock_suite { + use super::*; + use $crate::traits::DistributedLockProvider; + + #[tokio::test] + async fn acquire_lock_succeeds() { + let provider = ($factory)().await; + let acquired = provider.acquire_lock("resource-1").await.unwrap(); + assert!(acquired); + } + + #[tokio::test] + async fn double_acquire_fails() { + let provider = ($factory)().await; + let first = provider.acquire_lock("resource-1").await.unwrap(); + assert!(first); + let second = provider.acquire_lock("resource-1").await.unwrap(); + assert!(!second); + } + + #[tokio::test] + async fn release_then_reacquire() { + let provider = ($factory)().await; + provider.acquire_lock("resource-1").await.unwrap(); + provider.release_lock("resource-1").await.unwrap(); + let reacquired = provider.acquire_lock("resource-1").await.unwrap(); + assert!(reacquired); + } + + #[tokio::test] + async fn release_nonexistent_ok() { + let provider = ($factory)().await; + // Should not error even if lock was never acquired + provider.release_lock("nonexistent").await.unwrap(); + } + } + }; +} diff --git a/wfe-core/src/test_support/mod.rs b/wfe-core/src/test_support/mod.rs new file mode 100644 index 0000000..94746ca --- /dev/null +++ b/wfe-core/src/test_support/mod.rs @@ -0,0 +1,26 @@ +pub mod fixtures; +pub mod in_memory_lifecycle; +pub mod in_memory_lock; +pub mod in_memory_persistence; +pub mod in_memory_queue; + +// Test suite macros (exported via #[macro_export] at crate level) +mod lock_suite; +mod persistence_suite; +mod queue_suite; + +pub use fixtures::*; +pub use in_memory_lifecycle::InMemoryLifecyclePublisher; +pub use in_memory_lock::InMemoryLockProvider; +pub use in_memory_persistence::InMemoryPersistenceProvider; +pub use in_memory_queue::InMemoryQueueProvider; + +// Run the test suites against the in-memory providers. +#[cfg(test)] +mod tests { + use super::*; + + crate::persistence_suite!(|| async { InMemoryPersistenceProvider::new() }); + crate::lock_suite!(|| async { InMemoryLockProvider::new() }); + crate::queue_suite!(|| async { InMemoryQueueProvider::new() }); +} diff --git a/wfe-core/src/test_support/persistence_suite.rs b/wfe-core/src/test_support/persistence_suite.rs new file mode 100644 index 0000000..c6f51ed --- /dev/null +++ b/wfe-core/src/test_support/persistence_suite.rs @@ -0,0 +1,243 @@ +/// Generates a test suite for any `PersistenceProvider` implementation. +/// +/// The macro takes a factory expression that returns an `impl PersistenceProvider`. +/// The factory expression must be async-compatible (it will be `.await`ed). +/// +/// # Example +/// ```ignore +/// persistence_suite!(|| async { InMemoryPersistenceProvider::new() }); +/// ``` +#[macro_export] +macro_rules! persistence_suite { + ($factory:expr) => { + mod persistence_suite { + use super::*; + use chrono::{Duration, Utc}; + use $crate::models::{ + Event, EventSubscription, ExecutionError, WorkflowInstance, WorkflowStatus, + }; + use $crate::traits::{ + EventRepository, PersistenceProvider, SubscriptionRepository, WorkflowRepository, + }; + + #[tokio::test] + async fn create_new_workflow_generates_id() { + let provider = ($factory)().await; + let instance = WorkflowInstance::new("test-wf", 1, serde_json::json!({})); + let id = provider.create_new_workflow(&instance).await.unwrap(); + assert!(!id.is_empty()); + } + + #[tokio::test] + async fn get_workflow_instance_retrieves_workflow() { + let provider = ($factory)().await; + let instance = WorkflowInstance::new("test-wf", 1, serde_json::json!({"x": 1})); + let id = provider.create_new_workflow(&instance).await.unwrap(); + let retrieved = provider.get_workflow_instance(&id).await.unwrap(); + assert_eq!(retrieved.id, id); + assert_eq!(retrieved.workflow_definition_id, "test-wf"); + assert_eq!(retrieved.data, serde_json::json!({"x": 1})); + } + + #[tokio::test] + async fn persist_workflow_updates_fields() { + let provider = ($factory)().await; + let mut instance = + WorkflowInstance::new("test-wf", 1, serde_json::json!({})); + let id = provider.create_new_workflow(&instance).await.unwrap(); + instance.id = id.clone(); + instance.description = Some("updated".to_string()); + instance.status = WorkflowStatus::Suspended; + provider.persist_workflow(&instance).await.unwrap(); + + let retrieved = provider.get_workflow_instance(&id).await.unwrap(); + assert_eq!(retrieved.description.as_deref(), Some("updated")); + assert_eq!(retrieved.status, WorkflowStatus::Suspended); + } + + #[tokio::test] + async fn get_runnable_instances_filters_by_time() { + let provider = ($factory)().await; + + // Runnable with next_execution in the past + let mut w1 = WorkflowInstance::new("wf", 1, serde_json::json!({})); + w1.next_execution = Some(0); + let id1 = provider.create_new_workflow(&w1).await.unwrap(); + + // Runnable with next_execution far in the future + let mut w2 = WorkflowInstance::new("wf", 1, serde_json::json!({})); + w2.next_execution = Some(i64::MAX); + let _id2 = provider.create_new_workflow(&w2).await.unwrap(); + + // Suspended workflow + let mut w3 = WorkflowInstance::new("wf", 1, serde_json::json!({})); + w3.next_execution = Some(0); + w3.status = WorkflowStatus::Suspended; + let id3 = provider.create_new_workflow(&w3).await.unwrap(); + // Need to persist updated status + w3.id = id3; + provider.persist_workflow(&w3).await.unwrap(); + + let runnable = provider.get_runnable_instances(Utc::now()).await.unwrap(); + assert!(runnable.contains(&id1)); + assert_eq!(runnable.len(), 1); + } + + #[tokio::test] + async fn persist_errors_stores_errors() { + let provider = ($factory)().await; + let errors = vec![ + ExecutionError::new("wf-1", "ptr-1", "error one"), + ExecutionError::new("wf-1", "ptr-2", "error two"), + ]; + provider.persist_errors(&errors).await.unwrap(); + + // Persist more errors + let more = vec![ExecutionError::new("wf-2", "ptr-3", "error three")]; + provider.persist_errors(&more).await.unwrap(); + + // We can't directly read errors from the trait, but persist_errors should not fail + } + + #[tokio::test] + async fn create_and_get_subscription() { + let provider = ($factory)().await; + let sub = EventSubscription::new( + "wf-1", + 0, + "ptr-1", + "order.created", + "order-123", + Utc::now(), + ); + let id = provider.create_event_subscription(&sub).await.unwrap(); + let retrieved = provider.get_subscription(&id).await.unwrap(); + assert_eq!(retrieved.id, id); + assert_eq!(retrieved.event_name, "order.created"); + assert_eq!(retrieved.event_key, "order-123"); + } + + #[tokio::test] + async fn get_subscriptions_by_event() { + let provider = ($factory)().await; + let now = Utc::now(); + + let sub1 = EventSubscription::new( + "wf-1", 0, "ptr-1", "order.created", "key-A", now, + ); + provider.create_event_subscription(&sub1).await.unwrap(); + + let sub2 = EventSubscription::new( + "wf-2", 1, "ptr-2", "order.created", "key-A", now, + ); + provider.create_event_subscription(&sub2).await.unwrap(); + + let sub3 = EventSubscription::new( + "wf-3", 0, "ptr-3", "order.created", "key-B", now, + ); + provider.create_event_subscription(&sub3).await.unwrap(); + + let result = provider + .get_subscriptions("order.created", "key-A", now + Duration::seconds(1)) + .await + .unwrap(); + assert_eq!(result.len(), 2); + } + + #[tokio::test] + async fn terminate_subscription() { + let provider = ($factory)().await; + let now = Utc::now(); + let sub = EventSubscription::new( + "wf-1", 0, "ptr-1", "evt", "key", now, + ); + let id = provider.create_event_subscription(&sub).await.unwrap(); + + provider.terminate_subscription(&id).await.unwrap(); + + // Terminated subscriptions should not appear in get_subscriptions + let subs = provider + .get_subscriptions("evt", "key", now + Duration::seconds(1)) + .await + .unwrap(); + assert!(subs.is_empty()); + } + + #[tokio::test] + async fn create_and_get_event() { + let provider = ($factory)().await; + let event = + Event::new("order.created", "order-123", serde_json::json!({"x": 1})); + let id = provider.create_event(&event).await.unwrap(); + let retrieved = provider.get_event(&id).await.unwrap(); + assert_eq!(retrieved.id, id); + assert_eq!(retrieved.event_name, "order.created"); + assert_eq!(retrieved.event_data, serde_json::json!({"x": 1})); + } + + #[tokio::test] + async fn get_runnable_events() { + let provider = ($factory)().await; + let event = Event::new("evt", "key", serde_json::json!(null)); + let id = provider.create_event(&event).await.unwrap(); + + let runnable = provider + .get_runnable_events(Utc::now() + Duration::seconds(1)) + .await + .unwrap(); + assert!(runnable.contains(&id)); + } + + #[tokio::test] + async fn mark_event_processed() { + let provider = ($factory)().await; + let event = Event::new("evt", "key", serde_json::json!(null)); + let id = provider.create_event(&event).await.unwrap(); + + provider.mark_event_processed(&id).await.unwrap(); + + let runnable = provider + .get_runnable_events(Utc::now() + Duration::seconds(1)) + .await + .unwrap(); + assert!(!runnable.contains(&id)); + + let retrieved = provider.get_event(&id).await.unwrap(); + assert!(retrieved.is_processed); + } + + #[tokio::test] + async fn concurrent_persist_workflow_no_data_race() { + let provider = std::sync::Arc::new(($factory)().await); + let mut handles = Vec::new(); + + for i in 0..30 { + let p = provider.clone(); + handles.push(tokio::spawn(async move { + let instance = WorkflowInstance::new( + format!("wf-{i}"), + 1, + serde_json::json!({"i": i}), + ); + p.create_new_workflow(&instance).await.unwrap() + })); + } + + let mut ids = Vec::new(); + for handle in handles { + ids.push(handle.await.unwrap()); + } + + // All 30 should be unique + let unique: std::collections::HashSet<_> = ids.iter().collect(); + assert_eq!(unique.len(), 30); + + // All should be retrievable + for id in &ids { + let w = provider.get_workflow_instance(id).await.unwrap(); + assert_eq!(w.id, *id); + } + } + } + }; +} diff --git a/wfe-core/src/test_support/queue_suite.rs b/wfe-core/src/test_support/queue_suite.rs new file mode 100644 index 0000000..9f5e549 --- /dev/null +++ b/wfe-core/src/test_support/queue_suite.rs @@ -0,0 +1,110 @@ +/// Generates a test suite for any `QueueProvider` implementation. +/// +/// The macro takes a factory expression that returns an `impl QueueProvider`. +/// +/// # Example +/// ```ignore +/// queue_suite!(|| async { InMemoryQueueProvider::new() }); +/// ``` +#[macro_export] +macro_rules! queue_suite { + ($factory:expr) => { + mod queue_suite { + use super::*; + use $crate::models::QueueType; + use $crate::traits::QueueProvider; + + #[tokio::test] + async fn enqueue_dequeue_fifo() { + let provider = ($factory)().await; + provider + .queue_work("a", QueueType::Workflow) + .await + .unwrap(); + provider + .queue_work("b", QueueType::Workflow) + .await + .unwrap(); + provider + .queue_work("c", QueueType::Workflow) + .await + .unwrap(); + + assert_eq!( + provider + .dequeue_work(QueueType::Workflow) + .await + .unwrap() + .as_deref(), + Some("a") + ); + assert_eq!( + provider + .dequeue_work(QueueType::Workflow) + .await + .unwrap() + .as_deref(), + Some("b") + ); + assert_eq!( + provider + .dequeue_work(QueueType::Workflow) + .await + .unwrap() + .as_deref(), + Some("c") + ); + } + + #[tokio::test] + async fn dequeue_empty_returns_none() { + let provider = ($factory)().await; + let result = provider.dequeue_work(QueueType::Workflow).await.unwrap(); + assert!(result.is_none()); + } + + #[tokio::test] + async fn multiple_queue_types_independent() { + let provider = ($factory)().await; + provider + .queue_work("wf-1", QueueType::Workflow) + .await + .unwrap(); + provider + .queue_work("evt-1", QueueType::Event) + .await + .unwrap(); + + // Dequeue from Event queue should get evt-1, not wf-1 + assert_eq!( + provider + .dequeue_work(QueueType::Event) + .await + .unwrap() + .as_deref(), + Some("evt-1") + ); + assert_eq!( + provider + .dequeue_work(QueueType::Workflow) + .await + .unwrap() + .as_deref(), + Some("wf-1") + ); + + // Both should now be empty + assert!(provider + .dequeue_work(QueueType::Event) + .await + .unwrap() + .is_none()); + assert!(provider + .dequeue_work(QueueType::Workflow) + .await + .unwrap() + .is_none()); + } + } + }; +}