diff --git a/wfe-valkey/Cargo.toml b/wfe-valkey/Cargo.toml new file mode 100644 index 0000000..4c17990 --- /dev/null +++ b/wfe-valkey/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "wfe-valkey" +version.workspace = true +edition.workspace = true +license.workspace = true +description = "Valkey/Redis provider for distributed locking, queues, and lifecycle events in WFE" + +[dependencies] +wfe-core = { workspace = true } +redis = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +uuid = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +wfe-core = { workspace = true, features = ["test-support"] } +pretty_assertions = { workspace = true } +rstest = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } +tokio-stream = { workspace = true } +uuid = { workspace = true } diff --git a/wfe-valkey/src/lib.rs b/wfe-valkey/src/lib.rs new file mode 100644 index 0000000..1ab0ab8 --- /dev/null +++ b/wfe-valkey/src/lib.rs @@ -0,0 +1,7 @@ +pub mod lifecycle; +pub mod lock; +pub mod queue; + +pub use lifecycle::ValkeyLifecyclePublisher; +pub use lock::ValkeyLockProvider; +pub use queue::ValkeyQueueProvider; diff --git a/wfe-valkey/src/lifecycle.rs b/wfe-valkey/src/lifecycle.rs new file mode 100644 index 0000000..eaadf8c --- /dev/null +++ b/wfe-valkey/src/lifecycle.rs @@ -0,0 +1,55 @@ +use async_trait::async_trait; +use wfe_core::models::LifecycleEvent; +use wfe_core::traits::LifecyclePublisher; + +pub struct ValkeyLifecyclePublisher { + conn: redis::aio::MultiplexedConnection, + prefix: String, +} + +impl ValkeyLifecyclePublisher { + pub async fn new(redis_url: &str, prefix: &str) -> wfe_core::Result { + let client = redis::Client::open(redis_url) + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + let conn = client + .get_multiplexed_tokio_connection() + .await + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + Ok(Self { + conn, + prefix: prefix.to_string(), + }) + } +} + +#[async_trait] +impl LifecyclePublisher for ValkeyLifecyclePublisher { + async fn publish(&self, event: LifecycleEvent) -> wfe_core::Result<()> { + let mut conn = self.conn.clone(); + let json = serde_json::to_string(&event)?; + + let instance_channel = format!( + "{}:lifecycle:{}", + self.prefix, event.workflow_instance_id + ); + let all_channel = format!("{}:lifecycle:all", self.prefix); + + // Publish to the instance-specific channel. + redis::cmd("PUBLISH") + .arg(&instance_channel) + .arg(&json) + .query_async::(&mut conn) + .await + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + + // Publish to the global "all" channel. + redis::cmd("PUBLISH") + .arg(&all_channel) + .arg(&json) + .query_async::(&mut conn) + .await + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + + Ok(()) + } +} diff --git a/wfe-valkey/src/lock.rs b/wfe-valkey/src/lock.rs new file mode 100644 index 0000000..b49cbc8 --- /dev/null +++ b/wfe-valkey/src/lock.rs @@ -0,0 +1,98 @@ +use std::time::Duration; + +use async_trait::async_trait; +use uuid::Uuid; +use wfe_core::traits::DistributedLockProvider; + +pub struct ValkeyLockProvider { + conn: redis::aio::MultiplexedConnection, + prefix: String, + lock_duration: Duration, + /// Unique identifier for this provider instance, used as the lock value. + instance_id: String, +} + +impl ValkeyLockProvider { + pub async fn new(redis_url: &str, prefix: &str) -> wfe_core::Result { + let client = redis::Client::open(redis_url) + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + let conn = client + .get_multiplexed_tokio_connection() + .await + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + Ok(Self { + conn, + prefix: prefix.to_string(), + lock_duration: Duration::from_secs(30), + instance_id: Uuid::new_v4().to_string(), + }) + } + + /// Create a provider with a custom lock duration. + pub fn with_lock_duration(mut self, duration: Duration) -> Self { + self.lock_duration = duration; + self + } + + fn lock_key(&self, resource: &str) -> String { + format!("{}:lock:{}", self.prefix, resource) + } +} + +#[async_trait] +impl DistributedLockProvider for ValkeyLockProvider { + async fn acquire_lock(&self, resource: &str) -> wfe_core::Result { + let mut conn = self.conn.clone(); + let key = self.lock_key(resource); + let seconds = self.lock_duration.as_secs(); + + // SET key value NX EX seconds + let result: Option = redis::cmd("SET") + .arg(&key) + .arg(&self.instance_id) + .arg("NX") + .arg("EX") + .arg(seconds) + .query_async(&mut conn) + .await + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + + Ok(result.is_some()) + } + + async fn release_lock(&self, resource: &str) -> wfe_core::Result<()> { + let mut conn = self.conn.clone(); + let key = self.lock_key(resource); + + // Use a Lua script to only delete the key if the value matches our instance_id. + // This prevents accidentally releasing a lock held by another instance. + let script = redis::Script::new( + r#" + if redis.call("GET", KEYS[1]) == ARGV[1] then + return redis.call("DEL", KEYS[1]) + else + return 0 + end + "#, + ); + + let _: i64 = script + .key(&key) + .arg(&self.instance_id) + .invoke_async(&mut conn) + .await + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + + Ok(()) + } + + async fn start(&self) -> wfe_core::Result<()> { + // No-op: Redis/Valkey is always-on. + Ok(()) + } + + async fn stop(&self) -> wfe_core::Result<()> { + // No-op. + Ok(()) + } +} diff --git a/wfe-valkey/src/queue.rs b/wfe-valkey/src/queue.rs new file mode 100644 index 0000000..44f4a12 --- /dev/null +++ b/wfe-valkey/src/queue.rs @@ -0,0 +1,73 @@ +use async_trait::async_trait; +use redis::AsyncCommands; +use wfe_core::models::QueueType; +use wfe_core::traits::QueueProvider; + +pub struct ValkeyQueueProvider { + conn: redis::aio::MultiplexedConnection, + prefix: String, +} + +impl ValkeyQueueProvider { + pub async fn new(redis_url: &str, prefix: &str) -> wfe_core::Result { + let client = redis::Client::open(redis_url) + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + let conn = client + .get_multiplexed_tokio_connection() + .await + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + Ok(Self { + conn, + prefix: prefix.to_string(), + }) + } + + fn queue_key(&self, queue_type: QueueType) -> String { + let type_str = match queue_type { + QueueType::Workflow => "workflow", + QueueType::Event => "event", + QueueType::Index => "index", + }; + format!("{}:queue:{}", self.prefix, type_str) + } +} + +#[async_trait] +impl QueueProvider for ValkeyQueueProvider { + async fn queue_work(&self, id: &str, queue: QueueType) -> wfe_core::Result<()> { + let mut conn = self.conn.clone(); + let key = self.queue_key(queue); + + conn.lpush::<_, _, ()>(&key, id) + .await + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + + Ok(()) + } + + async fn dequeue_work(&self, queue: QueueType) -> wfe_core::Result> { + let mut conn = self.conn.clone(); + let key = self.queue_key(queue); + + let result: Option = conn + .rpop(&key, None) + .await + .map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?; + + Ok(result) + } + + fn is_dequeue_blocking(&self) -> bool { + false + } + + async fn start(&self) -> wfe_core::Result<()> { + // No-op. + Ok(()) + } + + async fn stop(&self) -> wfe_core::Result<()> { + // No-op. + Ok(()) + } +} diff --git a/wfe-valkey/tests/lifecycle.rs b/wfe-valkey/tests/lifecycle.rs new file mode 100644 index 0000000..a91a37b --- /dev/null +++ b/wfe-valkey/tests/lifecycle.rs @@ -0,0 +1,114 @@ +use std::time::Duration; + +use wfe_core::models::{LifecycleEvent, LifecycleEventType}; +use wfe_core::traits::LifecyclePublisher; + +fn redis_available() -> bool { + redis::Client::open("redis://localhost:6379") + .and_then(|c| c.get_connection()) + .is_ok() +} + +#[tokio::test] +async fn publish_subscribe_round_trip() { + if !redis_available() { + eprintln!("Skipping test: Valkey/Redis not available"); + return; + } + + let prefix = format!("wfe_test_{}", uuid::Uuid::new_v4().simple()); + let publisher = + wfe_valkey::ValkeyLifecyclePublisher::new("redis://localhost:6379", &prefix).await.unwrap(); + + let instance_id = "wf-lifecycle-test-1"; + let channel = format!("{}:lifecycle:{}", prefix, instance_id); + + // Set up a subscriber in a background task. + let sub_client = redis::Client::open("redis://localhost:6379").unwrap(); + let mut pubsub = sub_client.get_async_pubsub().await.unwrap(); + pubsub.subscribe(&channel).await.unwrap(); + + let (tx, mut rx) = tokio::sync::mpsc::channel::(1); + + let handle = tokio::spawn(async move { + let mut stream = pubsub.into_on_message(); + use tokio_stream::StreamExt; + if let Some(msg) = stream.next().await { + let payload: String = msg.get_payload().unwrap(); + let _ = tx.send(payload).await; + } + }); + + // Small delay to ensure the subscription is active before publishing. + tokio::time::sleep(Duration::from_millis(200)).await; + + let event = LifecycleEvent::new( + instance_id, + "def-1", + 1, + LifecycleEventType::Started, + ); + publisher.publish(event).await.unwrap(); + + // Wait for the message with a timeout. + let received = tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .expect("Timed out waiting for lifecycle event") + .expect("Channel closed without receiving a message"); + + let deserialized: LifecycleEvent = serde_json::from_str(&received).unwrap(); + assert_eq!(deserialized.workflow_instance_id, instance_id); + assert_eq!(deserialized.event_type, LifecycleEventType::Started); + + handle.abort(); +} + +#[tokio::test] +async fn publish_to_all_channel() { + if !redis_available() { + eprintln!("Skipping test: Valkey/Redis not available"); + return; + } + + let prefix = format!("wfe_test_{}", uuid::Uuid::new_v4().simple()); + let publisher = + wfe_valkey::ValkeyLifecyclePublisher::new("redis://localhost:6379", &prefix).await.unwrap(); + + let all_channel = format!("{}:lifecycle:all", prefix); + + let sub_client = redis::Client::open("redis://localhost:6379").unwrap(); + let mut pubsub = sub_client.get_async_pubsub().await.unwrap(); + pubsub.subscribe(&all_channel).await.unwrap(); + + let (tx, mut rx) = tokio::sync::mpsc::channel::(1); + + let handle = tokio::spawn(async move { + let mut stream = pubsub.into_on_message(); + use tokio_stream::StreamExt; + if let Some(msg) = stream.next().await { + let payload: String = msg.get_payload().unwrap(); + let _ = tx.send(payload).await; + } + }); + + tokio::time::sleep(Duration::from_millis(200)).await; + + let event = LifecycleEvent::new( + "wf-all-test", + "def-1", + 1, + LifecycleEventType::Completed, + ); + publisher.publish(event).await.unwrap(); + + let received = tokio::time::timeout(Duration::from_secs(5), rx.recv()) + .await + .expect("Timed out waiting for lifecycle event on 'all' channel") + .expect("Channel closed"); + + let deserialized: LifecycleEvent = serde_json::from_str(&received).unwrap(); + assert_eq!(deserialized.workflow_instance_id, "wf-all-test"); + assert_eq!(deserialized.event_type, LifecycleEventType::Completed); + + handle.abort(); +} diff --git a/wfe-valkey/tests/lock.rs b/wfe-valkey/tests/lock.rs new file mode 100644 index 0000000..681a504 --- /dev/null +++ b/wfe-valkey/tests/lock.rs @@ -0,0 +1,8 @@ +use wfe_core::lock_suite; + +async fn make_provider() -> wfe_valkey::ValkeyLockProvider { + let prefix = format!("wfe_test_{}", uuid::Uuid::new_v4().simple()); + wfe_valkey::ValkeyLockProvider::new("redis://localhost:6379", &prefix).await.unwrap() +} + +lock_suite!(make_provider); diff --git a/wfe-valkey/tests/queue.rs b/wfe-valkey/tests/queue.rs new file mode 100644 index 0000000..48f08e0 --- /dev/null +++ b/wfe-valkey/tests/queue.rs @@ -0,0 +1,8 @@ +use wfe_core::queue_suite; + +async fn make_provider() -> wfe_valkey::ValkeyQueueProvider { + let prefix = format!("wfe_test_{}", uuid::Uuid::new_v4().simple()); + wfe_valkey::ValkeyQueueProvider::new("redis://localhost:6379", &prefix).await.unwrap() +} + +queue_suite!(make_provider);