feat(wfe-core): add test support with in-memory providers and test suites

InMemoryPersistenceProvider, InMemoryLockProvider, InMemoryQueueProvider,
InMemoryLifecyclePublisher behind test-support feature flag.

Shared test suite macros: persistence_suite!, lock_suite!, queue_suite!
that run the same tests against any provider implementation.
This commit is contained in:
2026-03-25 20:09:17 +00:00
parent d87d888787
commit eea8bdb824
9 changed files with 1248 additions and 0 deletions

View File

@@ -0,0 +1,62 @@
use chrono::Utc;
use crate::models::{Event, EventSubscription, ExecutionError, WorkflowInstance};
/// Build a sample `WorkflowInstance` ("test-workflow", version 1) for tests.
pub fn sample_workflow_instance() -> WorkflowInstance {
    let data = serde_json::json!({"key": "value"});
    WorkflowInstance::new("test-workflow", 1, data)
}
/// Build a sample `Event` named "order.created" with key "order-123" for tests.
pub fn sample_event() -> Event {
    let payload = serde_json::json!({"amount": 42});
    Event::new("order.created", "order-123", payload)
}
/// Build a sample `EventSubscription` for "order.created"/"order-123",
/// subscribed as of the current time.
pub fn sample_subscription() -> EventSubscription {
    let as_of = Utc::now();
    EventSubscription::new("wf-1", 0, "ptr-1", "order.created", "order-123", as_of)
}
/// Build a sample `ExecutionError` for workflow "wf-1" / pointer "ptr-1".
pub fn sample_execution_error() -> ExecutionError {
    let message = "something went wrong";
    ExecutionError::new("wf-1", "ptr-1", message)
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sample_workflow_instance_has_expected_fields() {
        let wf = sample_workflow_instance();
        assert_eq!(wf.workflow_definition_id, "test-workflow");
        assert_eq!(wf.version, 1);
    }

    #[test]
    fn sample_event_has_expected_fields() {
        let evt = sample_event();
        assert_eq!(evt.event_name, "order.created");
        assert_eq!(evt.event_key, "order-123");
    }

    #[test]
    fn sample_subscription_has_expected_fields() {
        let subscription = sample_subscription();
        assert_eq!(subscription.workflow_id, "wf-1");
        assert_eq!(subscription.event_name, "order.created");
        assert_eq!(subscription.event_key, "order-123");
    }

    #[test]
    fn sample_execution_error_has_expected_fields() {
        let error = sample_execution_error();
        assert_eq!(error.workflow_id, "wf-1");
        assert_eq!(error.execution_pointer_id, "ptr-1");
        assert_eq!(error.message, "something went wrong");
    }
}

View File

@@ -0,0 +1,65 @@
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::models::LifecycleEvent;
use crate::traits::LifecyclePublisher;
use crate::Result;
/// An in-memory implementation of `LifecyclePublisher` for testing.
///
/// Clones share the same underlying event log via `Arc`.
#[derive(Debug, Clone)]
pub struct InMemoryLifecyclePublisher {
    // Mutex-guarded log of every event published so far, in publish order.
    events: Arc<Mutex<Vec<LifecycleEvent>>>,
}

impl InMemoryLifecyclePublisher {
    /// Create a publisher with an empty event log.
    pub fn new() -> Self {
        let events = Arc::new(Mutex::new(Vec::new()));
        Self { events }
    }

    /// Retrieve all published lifecycle events for assertions.
    pub async fn events(&self) -> Vec<LifecycleEvent> {
        let guard = self.events.lock().await;
        guard.clone()
    }
}

impl Default for InMemoryLifecyclePublisher {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl LifecyclePublisher for InMemoryLifecyclePublisher {
    /// Append the event to the in-memory log; never fails.
    async fn publish(&self, event: LifecycleEvent) -> Result<()> {
        let mut guard = self.events.lock().await;
        guard.push(event);
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::LifecycleEventType;
    use crate::traits::LifecyclePublisher;

    #[test]
    fn default_impl() {
        // `Default` must delegate to `new`; constructing and dropping is
        // enough to exercise it.
        let publisher = InMemoryLifecyclePublisher::default();
        drop(publisher);
    }

    #[tokio::test]
    async fn publish_and_retrieve_events() {
        let publisher = InMemoryLifecyclePublisher::new();
        let started = LifecycleEvent::new("wf-1", "def-1", 1, LifecycleEventType::Started);
        publisher.publish(started).await.unwrap();

        let recorded = publisher.events().await;
        assert_eq!(recorded.len(), 1);
        assert_eq!(recorded[0].event_type, LifecycleEventType::Started);
    }
}

View File

@@ -0,0 +1,50 @@
use std::collections::HashSet;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::traits::DistributedLockProvider;
use crate::Result;
/// An in-memory implementation of `DistributedLockProvider` for testing.
///
/// Clones share the same underlying lock table via `Arc`.
#[derive(Debug, Clone)]
pub struct InMemoryLockProvider {
    // Names of the resources currently locked.
    locks: Arc<Mutex<HashSet<String>>>,
}

impl InMemoryLockProvider {
    /// Create a provider with no locks held.
    pub fn new() -> Self {
        let locks = Arc::new(Mutex::new(HashSet::new()));
        Self { locks }
    }
}

impl Default for InMemoryLockProvider {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl DistributedLockProvider for InMemoryLockProvider {
    /// Try to take the lock; `Ok(true)` only when it was not already held.
    async fn acquire_lock(&self, resource: &str) -> Result<bool> {
        let mut held = self.locks.lock().await;
        // `HashSet::insert` returns false when the value was already present,
        // which is exactly the "lock already taken" signal we need.
        let newly_acquired = held.insert(resource.to_string());
        Ok(newly_acquired)
    }

    /// Release the lock; releasing a lock that was never held is a no-op.
    async fn release_lock(&self, resource: &str) -> Result<()> {
        self.locks.lock().await.remove(resource);
        Ok(())
    }

    /// No startup work is needed for the in-memory provider.
    async fn start(&self) -> Result<()> {
        Ok(())
    }

    /// No shutdown work is needed for the in-memory provider.
    async fn stop(&self) -> Result<()> {
        Ok(())
    }
}

View File

@@ -0,0 +1,585 @@
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use tokio::sync::RwLock;
use crate::models::{
Event, EventSubscription, ExecutionError, ScheduledCommand, WorkflowInstance,
};
use crate::traits::{
EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository,
WorkflowRepository,
};
use crate::{Result, WfeError};
/// An in-memory implementation of `PersistenceProvider` for testing.
#[derive(Debug, Clone)]
pub struct InMemoryPersistenceProvider {
workflows: Arc<RwLock<HashMap<String, WorkflowInstance>>>,
events: Arc<RwLock<HashMap<String, Event>>>,
subscriptions: Arc<RwLock<HashMap<String, EventSubscription>>>,
errors: Arc<RwLock<Vec<ExecutionError>>>,
scheduled_commands: Arc<RwLock<Vec<ScheduledCommand>>>,
}
impl InMemoryPersistenceProvider {
pub fn new() -> Self {
Self {
workflows: Arc::new(RwLock::new(HashMap::new())),
events: Arc::new(RwLock::new(HashMap::new())),
subscriptions: Arc::new(RwLock::new(HashMap::new())),
errors: Arc::new(RwLock::new(Vec::new())),
scheduled_commands: Arc::new(RwLock::new(Vec::new())),
}
}
/// Retrieve all stored errors (for test assertions).
pub async fn get_errors(&self) -> Vec<ExecutionError> {
self.errors.read().await.clone()
}
}
impl Default for InMemoryPersistenceProvider {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl WorkflowRepository for InMemoryPersistenceProvider {
    /// Store a new workflow, generating a UUID id when the instance has none.
    /// Returns the id under which the workflow was stored.
    async fn create_new_workflow(&self, instance: &WorkflowInstance) -> Result<String> {
        let mut stored = instance.clone();
        if stored.id.is_empty() {
            stored.id = uuid::Uuid::new_v4().to_string();
        }
        let id = stored.id.clone();
        self.workflows.write().await.insert(id.clone(), stored);
        Ok(id)
    }

    /// Upsert a workflow under its current id.
    async fn persist_workflow(&self, instance: &WorkflowInstance) -> Result<()> {
        let mut workflows = self.workflows.write().await;
        workflows.insert(instance.id.clone(), instance.clone());
        Ok(())
    }

    /// Upsert a workflow together with a batch of subscriptions.
    ///
    /// NOTE(review): the two writes are not atomic with respect to concurrent
    /// readers; acceptable for a test-only provider.
    async fn persist_workflow_with_subscriptions(
        &self,
        instance: &WorkflowInstance,
        subscriptions: &[EventSubscription],
    ) -> Result<()> {
        self.persist_workflow(instance).await?;
        let mut stored = self.subscriptions.write().await;
        for subscription in subscriptions {
            stored.insert(subscription.id.clone(), subscription.clone());
        }
        Ok(())
    }

    /// Ids of all `Runnable` workflows whose `next_execution` (epoch millis)
    /// is due at or before `as_at`; workflows without a `next_execution` are
    /// never considered due.
    async fn get_runnable_instances(&self, as_at: DateTime<Utc>) -> Result<Vec<String>> {
        let cutoff = as_at.timestamp_millis();
        let workflows = self.workflows.read().await;
        let mut ids = Vec::new();
        for workflow in workflows.values() {
            let runnable = workflow.status == crate::models::WorkflowStatus::Runnable;
            let due = matches!(workflow.next_execution, Some(next) if next <= cutoff);
            if runnable && due {
                ids.push(workflow.id.clone());
            }
        }
        Ok(ids)
    }

    /// Look up a single workflow by id.
    async fn get_workflow_instance(&self, id: &str) -> Result<WorkflowInstance> {
        let workflows = self.workflows.read().await;
        match workflows.get(id) {
            Some(workflow) => Ok(workflow.clone()),
            None => Err(WfeError::WorkflowNotFound(id.to_string())),
        }
    }

    /// Look up a batch of workflows; unknown ids are silently skipped.
    async fn get_workflow_instances(&self, ids: &[String]) -> Result<Vec<WorkflowInstance>> {
        let workflows = self.workflows.read().await;
        let found = ids
            .iter()
            .filter_map(|id| workflows.get(id).cloned())
            .collect();
        Ok(found)
    }
}
#[async_trait]
impl SubscriptionRepository for InMemoryPersistenceProvider {
    /// Store a subscription, generating a UUID id when the given one is empty.
    /// Returns the id under which the subscription was stored.
    async fn create_event_subscription(
        &self,
        subscription: &EventSubscription,
    ) -> Result<String> {
        let id = if subscription.id.is_empty() {
            uuid::Uuid::new_v4().to_string()
        } else {
            subscription.id.clone()
        };
        let mut stored = subscription.clone();
        stored.id = id.clone();
        self.subscriptions.write().await.insert(id.clone(), stored);
        Ok(id)
    }

    /// All open subscriptions for `event_name`/`event_key` subscribed at or
    /// before `as_of`.
    ///
    /// Any subscription with an `external_token` set (claimed by a worker or
    /// terminated via the sentinel below) is excluded. Backed by a `HashMap`,
    /// so the result order is unspecified.
    async fn get_subscriptions(
        &self,
        event_name: &str,
        event_key: &str,
        as_of: DateTime<Utc>,
    ) -> Result<Vec<EventSubscription>> {
        let subs = self.subscriptions.read().await;
        let result = subs
            .values()
            .filter(|s| {
                s.event_name == event_name
                    && s.event_key == event_key
                    && s.subscribe_as_of <= as_of
                    && s.external_token.is_none() // not terminated
            })
            .cloned()
            .collect();
        Ok(result)
    }

    /// Mark a subscription as terminated.
    ///
    /// Termination is modelled by installing a sentinel external token, which
    /// hides the subscription from the "open subscription" queries above.
    async fn terminate_subscription(&self, subscription_id: &str) -> Result<()> {
        let mut subs = self.subscriptions.write().await;
        match subs.get_mut(subscription_id) {
            Some(sub) => {
                sub.external_token = Some("__terminated__".to_string());
                Ok(())
            }
            None => Err(WfeError::SubscriptionNotFound(subscription_id.to_string())),
        }
    }

    /// Look up a subscription by id; errors when the id is unknown.
    async fn get_subscription(&self, subscription_id: &str) -> Result<EventSubscription> {
        self.subscriptions
            .read()
            .await
            .get(subscription_id)
            .cloned()
            .ok_or_else(|| WfeError::SubscriptionNotFound(subscription_id.to_string()))
    }

    /// First open subscription matching name/key at or before `as_of`, or
    /// `None`. "First" follows `HashMap` iteration order, i.e. arbitrary.
    async fn get_first_open_subscription(
        &self,
        event_name: &str,
        event_key: &str,
        as_of: DateTime<Utc>,
    ) -> Result<Option<EventSubscription>> {
        let subs = self.subscriptions.read().await;
        let result = subs
            .values()
            .find(|s| {
                s.event_name == event_name
                    && s.event_key == event_key
                    && s.subscribe_as_of <= as_of
                    && s.external_token.is_none()
            })
            .cloned();
        Ok(result)
    }

    /// Claim a subscription for an external worker.
    ///
    /// Returns `Ok(false)` when the subscription is already claimed (or
    /// terminated), `Ok(true)` on success, and an error for unknown ids.
    async fn set_subscription_token(
        &self,
        subscription_id: &str,
        token: &str,
        worker_id: &str,
        expiry: DateTime<Utc>,
    ) -> Result<bool> {
        let mut subs = self.subscriptions.write().await;
        match subs.get_mut(subscription_id) {
            Some(sub) => {
                if sub.external_token.is_some() {
                    return Ok(false);
                }
                sub.external_token = Some(token.to_string());
                sub.external_worker_id = Some(worker_id.to_string());
                sub.external_token_expiry = Some(expiry);
                Ok(true)
            }
            None => Err(WfeError::SubscriptionNotFound(subscription_id.to_string())),
        }
    }

    /// Release a previously claimed subscription.
    ///
    /// The caller must present the token that was set; a mismatch is reported
    /// as a persistence error so a stale worker cannot release another
    /// worker's claim.
    async fn clear_subscription_token(
        &self,
        subscription_id: &str,
        token: &str,
    ) -> Result<()> {
        let mut subs = self.subscriptions.write().await;
        match subs.get_mut(subscription_id) {
            Some(sub) => {
                if sub.external_token.as_deref() != Some(token) {
                    return Err(WfeError::Persistence(format!(
                        "Token mismatch for subscription {subscription_id}"
                    )));
                }
                sub.external_token = None;
                sub.external_worker_id = None;
                sub.external_token_expiry = None;
                Ok(())
            }
            None => Err(WfeError::SubscriptionNotFound(subscription_id.to_string())),
        }
    }
}
#[async_trait]
impl EventRepository for InMemoryPersistenceProvider {
    /// Store an event, generating a UUID id when the given one is empty.
    /// Returns the id under which the event was stored.
    async fn create_event(&self, event: &Event) -> Result<String> {
        let mut stored = event.clone();
        if stored.id.is_empty() {
            stored.id = uuid::Uuid::new_v4().to_string();
        }
        let id = stored.id.clone();
        self.events.write().await.insert(id.clone(), stored);
        Ok(id)
    }

    /// Look up an event by id; errors when the id is unknown.
    async fn get_event(&self, id: &str) -> Result<Event> {
        let events = self.events.read().await;
        match events.get(id) {
            Some(event) => Ok(event.clone()),
            None => Err(WfeError::EventNotFound(id.to_string())),
        }
    }

    /// Ids of every unprocessed event whose `event_time` is at or before
    /// `as_at`; order is unspecified (HashMap iteration).
    async fn get_runnable_events(&self, as_at: DateTime<Utc>) -> Result<Vec<String>> {
        let events = self.events.read().await;
        let mut ids = Vec::new();
        for event in events.values() {
            if !event.is_processed && event.event_time <= as_at {
                ids.push(event.id.clone());
            }
        }
        Ok(ids)
    }

    /// Ids of every event matching `event_name`/`event_key` whose
    /// `event_time` is at or before `as_of`, processed or not.
    async fn get_events(
        &self,
        event_name: &str,
        event_key: &str,
        as_of: DateTime<Utc>,
    ) -> Result<Vec<String>> {
        let events = self.events.read().await;
        let mut ids = Vec::new();
        for event in events.values() {
            let is_match = event.event_name == event_name
                && event.event_key == event_key
                && event.event_time <= as_of;
            if is_match {
                ids.push(event.id.clone());
            }
        }
        Ok(ids)
    }

    /// Flag an event as processed; errors when the id is unknown.
    async fn mark_event_processed(&self, id: &str) -> Result<()> {
        let mut events = self.events.write().await;
        let event = events
            .get_mut(id)
            .ok_or_else(|| WfeError::EventNotFound(id.to_string()))?;
        event.is_processed = true;
        Ok(())
    }

    /// Flag an event as unprocessed again; errors when the id is unknown.
    async fn mark_event_unprocessed(&self, id: &str) -> Result<()> {
        let mut events = self.events.write().await;
        let event = events
            .get_mut(id)
            .ok_or_else(|| WfeError::EventNotFound(id.to_string()))?;
        event.is_processed = false;
        Ok(())
    }
}
#[async_trait]
impl ScheduledCommandRepository for InMemoryPersistenceProvider {
    /// This provider implements command scheduling natively.
    fn supports_scheduled_commands(&self) -> bool {
        true
    }

    /// Append a command to the in-memory schedule.
    async fn schedule_command(&self, command: &ScheduledCommand) -> Result<()> {
        self.scheduled_commands.write().await.push(command.clone());
        Ok(())
    }

    /// Invoke `handler` for every command whose `execute_time` (epoch millis)
    /// is at or before `as_of`.
    ///
    /// Due commands are removed from the schedule before the handler runs, so
    /// the handler may itself schedule new commands without deadlocking on
    /// the write lock. If the handler fails, the due commands that were never
    /// attempted are put back on the schedule instead of being silently
    /// dropped (the failing command itself is consumed — at-most-once
    /// delivery, matching the previous behavior for the failed command).
    async fn process_commands(
        &self,
        as_of: DateTime<Utc>,
        handler: &(dyn Fn(ScheduledCommand) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
            + Send
            + Sync),
    ) -> Result<()> {
        let as_of_millis = as_of.timestamp_millis();
        // Split the schedule into due and not-yet-due; keep the latter.
        let due: Vec<ScheduledCommand> = {
            let mut cmds = self.scheduled_commands.write().await;
            let (due, remaining): (Vec<_>, Vec<_>) =
                cmds.drain(..).partition(|c| c.execute_time <= as_of_millis);
            *cmds = remaining;
            due
        };
        let mut pending = due.into_iter();
        while let Some(cmd) = pending.next() {
            if let Err(err) = handler(cmd).await {
                // Previously a handler error lost every remaining due command
                // (they had already been drained). Requeue the unattempted
                // ones so they can be processed on a later call.
                self.scheduled_commands.write().await.extend(pending);
                return Err(err);
            }
        }
        Ok(())
    }
}
#[async_trait]
impl PersistenceProvider for InMemoryPersistenceProvider {
    /// Append the given errors to the in-memory error log.
    async fn persist_errors(&self, errors: &[ExecutionError]) -> Result<()> {
        let mut log = self.errors.write().await;
        for error in errors {
            log.push(error.clone());
        }
        Ok(())
    }

    /// The in-memory store always exists; nothing to create.
    async fn ensure_store_exists(&self) -> Result<()> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    // Covers the behaviours the shared `persistence_suite!` macro does not:
    // error paths (unknown ids), batch lookups, subscription tokens,
    // scheduled-command processing, and the provider-specific error log.
    use super::*;
    use crate::models::{Event, EventSubscription, ExecutionError, ScheduledCommand, CommandName};
    use crate::traits::{
        EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository,
        WorkflowRepository,
    };
    use chrono::{Duration, Utc};

    #[tokio::test]
    async fn default_impl() {
        // `Default` must behave like `new`; `ensure_store_exists` is a no-op.
        let p = InMemoryPersistenceProvider::default();
        p.ensure_store_exists().await.unwrap();
    }

    #[tokio::test]
    async fn get_workflow_instances_batch() {
        let p = InMemoryPersistenceProvider::new();
        let w1 = WorkflowInstance::new("wf", 1, serde_json::json!({}));
        let id1 = p.create_new_workflow(&w1).await.unwrap();
        let w2 = WorkflowInstance::new("wf", 1, serde_json::json!({}));
        let id2 = p.create_new_workflow(&w2).await.unwrap();
        let ids = vec![id1.clone(), id2.clone(), "nonexistent".to_string()];
        let result = p.get_workflow_instances(&ids).await.unwrap();
        // Only existing workflows are returned.
        assert_eq!(result.len(), 2);
    }

    #[tokio::test]
    async fn get_first_open_subscription_returns_match() {
        let p = InMemoryPersistenceProvider::new();
        let now = Utc::now();
        let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", now);
        let id = p.create_event_subscription(&sub).await.unwrap();
        // Query slightly in the future so `subscribe_as_of <= as_of` holds.
        let found = p
            .get_first_open_subscription("evt", "key", now + Duration::seconds(1))
            .await
            .unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().id, id);
    }

    #[tokio::test]
    async fn get_first_open_subscription_returns_none() {
        let p = InMemoryPersistenceProvider::new();
        let now = Utc::now();
        let found = p
            .get_first_open_subscription("evt", "key", now)
            .await
            .unwrap();
        assert!(found.is_none());
    }

    #[tokio::test]
    async fn set_subscription_token_success() {
        let p = InMemoryPersistenceProvider::new();
        let now = Utc::now();
        let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", now);
        let id = p.create_event_subscription(&sub).await.unwrap();
        let expiry = now + Duration::hours(1);
        let result = p
            .set_subscription_token(&id, "token-1", "worker-1", expiry)
            .await
            .unwrap();
        assert!(result);
        // Setting token again on already-tokened subscription returns false.
        let result2 = p
            .set_subscription_token(&id, "token-2", "worker-2", expiry)
            .await
            .unwrap();
        assert!(!result2);
    }

    #[tokio::test]
    async fn set_subscription_token_not_found() {
        let p = InMemoryPersistenceProvider::new();
        let result = p
            .set_subscription_token("nonexistent", "t", "w", Utc::now())
            .await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn clear_subscription_token_success() {
        let p = InMemoryPersistenceProvider::new();
        let now = Utc::now();
        let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", now);
        let id = p.create_event_subscription(&sub).await.unwrap();
        let expiry = now + Duration::hours(1);
        p.set_subscription_token(&id, "token-1", "worker-1", expiry)
            .await
            .unwrap();
        p.clear_subscription_token(&id, "token-1").await.unwrap();
        // After clearing, the subscription should be open again.
        let retrieved = p.get_subscription(&id).await.unwrap();
        assert!(retrieved.external_token.is_none());
    }

    #[tokio::test]
    async fn clear_subscription_token_not_found() {
        let p = InMemoryPersistenceProvider::new();
        let result = p.clear_subscription_token("nonexistent", "t").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn terminate_subscription_not_found() {
        let p = InMemoryPersistenceProvider::new();
        let result = p.terminate_subscription("nonexistent").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn get_events_by_name_and_key() {
        let p = InMemoryPersistenceProvider::new();
        let now = Utc::now();
        let e1 = Event::new("evt-a", "key-1", serde_json::json!({}));
        let id1 = p.create_event(&e1).await.unwrap();
        let e2 = Event::new("evt-a", "key-2", serde_json::json!({}));
        let _id2 = p.create_event(&e2).await.unwrap();
        let e3 = Event::new("evt-b", "key-1", serde_json::json!({}));
        let _id3 = p.create_event(&e3).await.unwrap();
        // Only the event matching BOTH name and key should come back.
        let ids = p
            .get_events("evt-a", "key-1", now + Duration::seconds(1))
            .await
            .unwrap();
        assert_eq!(ids.len(), 1);
        assert_eq!(ids[0], id1);
    }

    #[tokio::test]
    async fn mark_event_unprocessed() {
        let p = InMemoryPersistenceProvider::new();
        let e = Event::new("evt", "key", serde_json::json!({}));
        let id = p.create_event(&e).await.unwrap();
        p.mark_event_processed(&id).await.unwrap();
        let event = p.get_event(&id).await.unwrap();
        assert!(event.is_processed);
        // The flag must be reversible.
        p.mark_event_unprocessed(&id).await.unwrap();
        let event = p.get_event(&id).await.unwrap();
        assert!(!event.is_processed);
    }

    #[tokio::test]
    async fn mark_event_unprocessed_not_found() {
        let p = InMemoryPersistenceProvider::new();
        let result = p.mark_event_unprocessed("nonexistent").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn mark_event_processed_not_found() {
        let p = InMemoryPersistenceProvider::new();
        let result = p.mark_event_processed("nonexistent").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn get_errors_returns_persisted_errors() {
        let p = InMemoryPersistenceProvider::new();
        let errors = vec![
            ExecutionError::new("wf-1", "ptr-1", "err1"),
            ExecutionError::new("wf-2", "ptr-2", "err2"),
        ];
        p.persist_errors(&errors).await.unwrap();
        let stored = p.get_errors().await;
        assert_eq!(stored.len(), 2);
    }

    #[tokio::test]
    async fn supports_scheduled_commands_returns_true() {
        let p = InMemoryPersistenceProvider::new();
        assert!(p.supports_scheduled_commands());
    }

    #[tokio::test]
    async fn schedule_and_process_commands() {
        let p = InMemoryPersistenceProvider::new();
        let now = Utc::now();
        // `execute_time` is one second in the past, so the command is due.
        let cmd = ScheduledCommand {
            command_name: CommandName::ProcessEvent,
            data: "test-event-id".to_string(),
            execute_time: now.timestamp_millis() - 1000,
        };
        p.schedule_command(&cmd).await.unwrap();
        p.process_commands(now, &|c: ScheduledCommand| {
            let _data = c.data;
            Box::pin(async move { Ok(()) })
        })
        .await
        .unwrap();
        // After processing, no more due commands remain.
        let remaining_count = p.scheduled_commands.read().await.len();
        assert_eq!(remaining_count, 0);
    }

    #[tokio::test]
    async fn persist_workflow_with_subscriptions() {
        let p = InMemoryPersistenceProvider::new();
        let w = WorkflowInstance::new("wf", 1, serde_json::json!({}));
        let id = p.create_new_workflow(&w).await.unwrap();
        let mut updated = p.get_workflow_instance(&id).await.unwrap();
        updated.description = Some("updated".to_string());
        let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", Utc::now());
        p.persist_workflow_with_subscriptions(&updated, &[sub])
            .await
            .unwrap();
        let retrieved = p.get_workflow_instance(&id).await.unwrap();
        assert_eq!(retrieved.description.as_deref(), Some("updated"));
    }
}

View File

@@ -0,0 +1,58 @@
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::models::QueueType;
use crate::traits::QueueProvider;
use crate::Result;
/// An in-memory implementation of `QueueProvider` for testing.
///
/// Clones share the same underlying queues via `Arc`.
#[derive(Debug, Clone)]
pub struct InMemoryQueueProvider {
    // One FIFO queue per `QueueType`, created lazily on first enqueue.
    queues: Arc<Mutex<HashMap<QueueType, VecDeque<String>>>>,
}

impl InMemoryQueueProvider {
    /// Create a provider with no queued work.
    pub fn new() -> Self {
        let queues = Arc::new(Mutex::new(HashMap::new()));
        Self { queues }
    }
}

impl Default for InMemoryQueueProvider {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl QueueProvider for InMemoryQueueProvider {
    /// Append `id` to the back of the FIFO for `queue`.
    async fn queue_work(&self, id: &str, queue: QueueType) -> Result<()> {
        let mut queues = self.queues.lock().await;
        let fifo = queues.entry(queue).or_default();
        fifo.push_back(id.to_string());
        Ok(())
    }

    /// Pop the oldest item from the FIFO for `queue`, or `None` if empty.
    async fn dequeue_work(&self, queue: QueueType) -> Result<Option<String>> {
        let mut queues = self.queues.lock().await;
        let next = match queues.get_mut(&queue) {
            Some(fifo) => fifo.pop_front(),
            None => None,
        };
        Ok(next)
    }

    /// Dequeueing never blocks; callers are expected to poll.
    fn is_dequeue_blocking(&self) -> bool {
        false
    }

    /// No startup work is needed for the in-memory provider.
    async fn start(&self) -> Result<()> {
        Ok(())
    }

    /// No shutdown work is needed for the in-memory provider.
    async fn stop(&self) -> Result<()> {
        Ok(())
    }
}

View File

@@ -0,0 +1,49 @@
/// Generates a test suite for any `DistributedLockProvider` implementation.
///
/// The macro takes a factory expression that returns an
/// `impl DistributedLockProvider`; the factory is called and `.await`ed at
/// the start of every generated test, so each test gets a fresh provider.
///
/// # Example
/// ```ignore
/// lock_suite!(|| async { InMemoryLockProvider::new() });
/// ```
#[macro_export]
macro_rules! lock_suite {
    ($factory:expr) => {
        mod lock_suite {
            use super::*;
            use $crate::traits::DistributedLockProvider;

            #[tokio::test]
            async fn acquire_lock_succeeds() {
                let lock = ($factory)().await;
                assert!(lock.acquire_lock("resource-1").await.unwrap());
            }

            #[tokio::test]
            async fn double_acquire_fails() {
                let lock = ($factory)().await;
                assert!(lock.acquire_lock("resource-1").await.unwrap());
                // A second acquisition of a held lock must be refused.
                assert!(!lock.acquire_lock("resource-1").await.unwrap());
            }

            #[tokio::test]
            async fn release_then_reacquire() {
                let lock = ($factory)().await;
                lock.acquire_lock("resource-1").await.unwrap();
                lock.release_lock("resource-1").await.unwrap();
                // Once released, the lock must be acquirable again.
                assert!(lock.acquire_lock("resource-1").await.unwrap());
            }

            #[tokio::test]
            async fn release_nonexistent_ok() {
                let lock = ($factory)().await;
                // Should not error even if lock was never acquired
                lock.release_lock("nonexistent").await.unwrap();
            }
        }
    };
}

View File

@@ -0,0 +1,26 @@
// Test-support entry point: in-memory providers, sample-data fixtures, and
// reusable provider test-suite macros (gated behind the test-support feature).
pub mod fixtures;
pub mod in_memory_lifecycle;
pub mod in_memory_lock;
pub mod in_memory_persistence;
pub mod in_memory_queue;
// Test suite macros (exported via #[macro_export] at crate level)
mod lock_suite;
mod persistence_suite;
mod queue_suite;
pub use fixtures::*;
pub use in_memory_lifecycle::InMemoryLifecyclePublisher;
pub use in_memory_lock::InMemoryLockProvider;
pub use in_memory_persistence::InMemoryPersistenceProvider;
pub use in_memory_queue::InMemoryQueueProvider;
// Run the test suites against the in-memory providers.
// This validates the providers AND keeps the suite macros compiling/honest.
#[cfg(test)]
mod tests {
    use super::*;
    crate::persistence_suite!(|| async { InMemoryPersistenceProvider::new() });
    crate::lock_suite!(|| async { InMemoryLockProvider::new() });
    crate::queue_suite!(|| async { InMemoryQueueProvider::new() });
}

View File

@@ -0,0 +1,243 @@
/// Generates a test suite for any `PersistenceProvider` implementation.
///
/// The macro takes a factory expression that returns an `impl PersistenceProvider`.
/// The factory expression must be async-compatible (it will be `.await`ed).
/// Each generated test calls the factory, so every test runs against a fresh
/// provider instance.
///
/// # Example
/// ```ignore
/// persistence_suite!(|| async { InMemoryPersistenceProvider::new() });
/// ```
#[macro_export]
macro_rules! persistence_suite {
    ($factory:expr) => {
        mod persistence_suite {
            use super::*;
            use chrono::{Duration, Utc};
            use $crate::models::{
                Event, EventSubscription, ExecutionError, WorkflowInstance, WorkflowStatus,
            };
            use $crate::traits::{
                EventRepository, PersistenceProvider, SubscriptionRepository, WorkflowRepository,
            };

            #[tokio::test]
            async fn create_new_workflow_generates_id() {
                let provider = ($factory)().await;
                let instance = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
                let id = provider.create_new_workflow(&instance).await.unwrap();
                assert!(!id.is_empty());
            }

            #[tokio::test]
            async fn get_workflow_instance_retrieves_workflow() {
                let provider = ($factory)().await;
                let instance = WorkflowInstance::new("test-wf", 1, serde_json::json!({"x": 1}));
                let id = provider.create_new_workflow(&instance).await.unwrap();
                let retrieved = provider.get_workflow_instance(&id).await.unwrap();
                assert_eq!(retrieved.id, id);
                assert_eq!(retrieved.workflow_definition_id, "test-wf");
                assert_eq!(retrieved.data, serde_json::json!({"x": 1}));
            }

            #[tokio::test]
            async fn persist_workflow_updates_fields() {
                let provider = ($factory)().await;
                let mut instance =
                    WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
                let id = provider.create_new_workflow(&instance).await.unwrap();
                instance.id = id.clone();
                instance.description = Some("updated".to_string());
                instance.status = WorkflowStatus::Suspended;
                provider.persist_workflow(&instance).await.unwrap();
                let retrieved = provider.get_workflow_instance(&id).await.unwrap();
                assert_eq!(retrieved.description.as_deref(), Some("updated"));
                assert_eq!(retrieved.status, WorkflowStatus::Suspended);
            }

            #[tokio::test]
            async fn get_runnable_instances_filters_by_time() {
                let provider = ($factory)().await;
                // Runnable with next_execution in the past
                let mut w1 = WorkflowInstance::new("wf", 1, serde_json::json!({}));
                w1.next_execution = Some(0);
                let id1 = provider.create_new_workflow(&w1).await.unwrap();
                // Runnable with next_execution far in the future
                let mut w2 = WorkflowInstance::new("wf", 1, serde_json::json!({}));
                w2.next_execution = Some(i64::MAX);
                let _id2 = provider.create_new_workflow(&w2).await.unwrap();
                // Suspended workflow
                let mut w3 = WorkflowInstance::new("wf", 1, serde_json::json!({}));
                w3.next_execution = Some(0);
                w3.status = WorkflowStatus::Suspended;
                let id3 = provider.create_new_workflow(&w3).await.unwrap();
                // Need to persist updated status
                w3.id = id3;
                provider.persist_workflow(&w3).await.unwrap();
                // Only the due, runnable workflow should be reported.
                let runnable = provider.get_runnable_instances(Utc::now()).await.unwrap();
                assert!(runnable.contains(&id1));
                assert_eq!(runnable.len(), 1);
            }

            #[tokio::test]
            async fn persist_errors_stores_errors() {
                let provider = ($factory)().await;
                let errors = vec![
                    ExecutionError::new("wf-1", "ptr-1", "error one"),
                    ExecutionError::new("wf-1", "ptr-2", "error two"),
                ];
                provider.persist_errors(&errors).await.unwrap();
                // Persist more errors
                let more = vec![ExecutionError::new("wf-2", "ptr-3", "error three")];
                provider.persist_errors(&more).await.unwrap();
                // We can't directly read errors from the trait, but persist_errors should not fail
            }

            #[tokio::test]
            async fn create_and_get_subscription() {
                let provider = ($factory)().await;
                let sub = EventSubscription::new(
                    "wf-1",
                    0,
                    "ptr-1",
                    "order.created",
                    "order-123",
                    Utc::now(),
                );
                let id = provider.create_event_subscription(&sub).await.unwrap();
                let retrieved = provider.get_subscription(&id).await.unwrap();
                assert_eq!(retrieved.id, id);
                assert_eq!(retrieved.event_name, "order.created");
                assert_eq!(retrieved.event_key, "order-123");
            }

            #[tokio::test]
            async fn get_subscriptions_by_event() {
                let provider = ($factory)().await;
                let now = Utc::now();
                let sub1 = EventSubscription::new(
                    "wf-1", 0, "ptr-1", "order.created", "key-A", now,
                );
                provider.create_event_subscription(&sub1).await.unwrap();
                let sub2 = EventSubscription::new(
                    "wf-2", 1, "ptr-2", "order.created", "key-A", now,
                );
                provider.create_event_subscription(&sub2).await.unwrap();
                let sub3 = EventSubscription::new(
                    "wf-3", 0, "ptr-3", "order.created", "key-B", now,
                );
                provider.create_event_subscription(&sub3).await.unwrap();
                // Only the two key-A subscriptions should match.
                let result = provider
                    .get_subscriptions("order.created", "key-A", now + Duration::seconds(1))
                    .await
                    .unwrap();
                assert_eq!(result.len(), 2);
            }

            #[tokio::test]
            async fn terminate_subscription() {
                let provider = ($factory)().await;
                let now = Utc::now();
                let sub = EventSubscription::new(
                    "wf-1", 0, "ptr-1", "evt", "key", now,
                );
                let id = provider.create_event_subscription(&sub).await.unwrap();
                provider.terminate_subscription(&id).await.unwrap();
                // Terminated subscriptions should not appear in get_subscriptions
                let subs = provider
                    .get_subscriptions("evt", "key", now + Duration::seconds(1))
                    .await
                    .unwrap();
                assert!(subs.is_empty());
            }

            #[tokio::test]
            async fn create_and_get_event() {
                let provider = ($factory)().await;
                let event =
                    Event::new("order.created", "order-123", serde_json::json!({"x": 1}));
                let id = provider.create_event(&event).await.unwrap();
                let retrieved = provider.get_event(&id).await.unwrap();
                assert_eq!(retrieved.id, id);
                assert_eq!(retrieved.event_name, "order.created");
                assert_eq!(retrieved.event_data, serde_json::json!({"x": 1}));
            }

            #[tokio::test]
            async fn get_runnable_events() {
                let provider = ($factory)().await;
                let event = Event::new("evt", "key", serde_json::json!(null));
                let id = provider.create_event(&event).await.unwrap();
                // Query slightly in the future so the fresh event is due.
                let runnable = provider
                    .get_runnable_events(Utc::now() + Duration::seconds(1))
                    .await
                    .unwrap();
                assert!(runnable.contains(&id));
            }

            #[tokio::test]
            async fn mark_event_processed() {
                let provider = ($factory)().await;
                let event = Event::new("evt", "key", serde_json::json!(null));
                let id = provider.create_event(&event).await.unwrap();
                provider.mark_event_processed(&id).await.unwrap();
                // A processed event must no longer be reported as runnable.
                let runnable = provider
                    .get_runnable_events(Utc::now() + Duration::seconds(1))
                    .await
                    .unwrap();
                assert!(!runnable.contains(&id));
                let retrieved = provider.get_event(&id).await.unwrap();
                assert!(retrieved.is_processed);
            }

            #[tokio::test]
            async fn concurrent_persist_workflow_no_data_race() {
                let provider = std::sync::Arc::new(($factory)().await);
                let mut handles = Vec::new();
                for i in 0..30 {
                    let p = provider.clone();
                    handles.push(tokio::spawn(async move {
                        let instance = WorkflowInstance::new(
                            format!("wf-{i}"),
                            1,
                            serde_json::json!({"i": i}),
                        );
                        p.create_new_workflow(&instance).await.unwrap()
                    }));
                }
                let mut ids = Vec::new();
                for handle in handles {
                    ids.push(handle.await.unwrap());
                }
                // All 30 should be unique
                let unique: std::collections::HashSet<_> = ids.iter().collect();
                assert_eq!(unique.len(), 30);
                // All should be retrievable
                for id in &ids {
                    let w = provider.get_workflow_instance(id).await.unwrap();
                    assert_eq!(w.id, *id);
                }
            }
        }
    };
}

View File

@@ -0,0 +1,110 @@
/// Generates a test suite for any `QueueProvider` implementation.
///
/// The macro takes a factory expression that returns an `impl QueueProvider`.
/// Each generated test calls and `.await`s the factory, so every test runs
/// against a fresh provider instance.
///
/// # Example
/// ```ignore
/// queue_suite!(|| async { InMemoryQueueProvider::new() });
/// ```
#[macro_export]
macro_rules! queue_suite {
    ($factory:expr) => {
        mod queue_suite {
            use super::*;
            use $crate::models::QueueType;
            use $crate::traits::QueueProvider;

            #[tokio::test]
            async fn enqueue_dequeue_fifo() {
                let provider = ($factory)().await;
                provider
                    .queue_work("a", QueueType::Workflow)
                    .await
                    .unwrap();
                provider
                    .queue_work("b", QueueType::Workflow)
                    .await
                    .unwrap();
                provider
                    .queue_work("c", QueueType::Workflow)
                    .await
                    .unwrap();
                // Items must come back in exactly the order they were queued.
                assert_eq!(
                    provider
                        .dequeue_work(QueueType::Workflow)
                        .await
                        .unwrap()
                        .as_deref(),
                    Some("a")
                );
                assert_eq!(
                    provider
                        .dequeue_work(QueueType::Workflow)
                        .await
                        .unwrap()
                        .as_deref(),
                    Some("b")
                );
                assert_eq!(
                    provider
                        .dequeue_work(QueueType::Workflow)
                        .await
                        .unwrap()
                        .as_deref(),
                    Some("c")
                );
            }

            #[tokio::test]
            async fn dequeue_empty_returns_none() {
                let provider = ($factory)().await;
                let result = provider.dequeue_work(QueueType::Workflow).await.unwrap();
                assert!(result.is_none());
            }

            #[tokio::test]
            async fn multiple_queue_types_independent() {
                let provider = ($factory)().await;
                provider
                    .queue_work("wf-1", QueueType::Workflow)
                    .await
                    .unwrap();
                provider
                    .queue_work("evt-1", QueueType::Event)
                    .await
                    .unwrap();
                // Dequeue from Event queue should get evt-1, not wf-1
                assert_eq!(
                    provider
                        .dequeue_work(QueueType::Event)
                        .await
                        .unwrap()
                        .as_deref(),
                    Some("evt-1")
                );
                assert_eq!(
                    provider
                        .dequeue_work(QueueType::Workflow)
                        .await
                        .unwrap()
                        .as_deref(),
                    Some("wf-1")
                );
                // Both should now be empty
                assert!(provider
                    .dequeue_work(QueueType::Event)
                    .await
                    .unwrap()
                    .is_none());
                assert!(provider
                    .dequeue_work(QueueType::Workflow)
                    .await
                    .unwrap()
                    .is_none());
            }
        }
    };
}