feat(wfe-valkey): add Valkey provider for locks, queues, and lifecycle events
ValkeyLockProvider: SET NX EX for acquisition, Lua script for safe release. ValkeyQueueProvider: LPUSH/RPOP for FIFO queues. ValkeyLifecyclePublisher: PUBLISH to per-instance and global channels. Connections obtained once during construction (no per-operation TCP handshakes).
This commit is contained in:
25
wfe-valkey/Cargo.toml
Normal file
25
wfe-valkey/Cargo.toml
Normal file
@@ -0,0 +1,25 @@
|
||||
[package]
|
||||
name = "wfe-valkey"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
description = "Valkey/Redis provider for distributed locking, queues, and lifecycle events in WFE"
|
||||
|
||||
[dependencies]
|
||||
wfe-core = { workspace = true }
|
||||
redis = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
wfe-core = { workspace = true, features = ["test-support"] }
|
||||
pretty_assertions = { workspace = true }
|
||||
rstest = { workspace = true }
|
||||
tokio = { workspace = true, features = ["test-util"] }
|
||||
tokio-stream = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
7
wfe-valkey/src/lib.rs
Normal file
7
wfe-valkey/src/lib.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
pub mod lifecycle;
|
||||
pub mod lock;
|
||||
pub mod queue;
|
||||
|
||||
pub use lifecycle::ValkeyLifecyclePublisher;
|
||||
pub use lock::ValkeyLockProvider;
|
||||
pub use queue::ValkeyQueueProvider;
|
||||
55
wfe-valkey/src/lifecycle.rs
Normal file
55
wfe-valkey/src/lifecycle.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
use async_trait::async_trait;
|
||||
use wfe_core::models::LifecycleEvent;
|
||||
use wfe_core::traits::LifecyclePublisher;
|
||||
|
||||
pub struct ValkeyLifecyclePublisher {
|
||||
conn: redis::aio::MultiplexedConnection,
|
||||
prefix: String,
|
||||
}
|
||||
|
||||
impl ValkeyLifecyclePublisher {
|
||||
pub async fn new(redis_url: &str, prefix: &str) -> wfe_core::Result<Self> {
|
||||
let client = redis::Client::open(redis_url)
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
let conn = client
|
||||
.get_multiplexed_tokio_connection()
|
||||
.await
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
Ok(Self {
|
||||
conn,
|
||||
prefix: prefix.to_string(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LifecyclePublisher for ValkeyLifecyclePublisher {
|
||||
async fn publish(&self, event: LifecycleEvent) -> wfe_core::Result<()> {
|
||||
let mut conn = self.conn.clone();
|
||||
let json = serde_json::to_string(&event)?;
|
||||
|
||||
let instance_channel = format!(
|
||||
"{}:lifecycle:{}",
|
||||
self.prefix, event.workflow_instance_id
|
||||
);
|
||||
let all_channel = format!("{}:lifecycle:all", self.prefix);
|
||||
|
||||
// Publish to the instance-specific channel.
|
||||
redis::cmd("PUBLISH")
|
||||
.arg(&instance_channel)
|
||||
.arg(&json)
|
||||
.query_async::<i64>(&mut conn)
|
||||
.await
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
|
||||
// Publish to the global "all" channel.
|
||||
redis::cmd("PUBLISH")
|
||||
.arg(&all_channel)
|
||||
.arg(&json)
|
||||
.query_async::<i64>(&mut conn)
|
||||
.await
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
98
wfe-valkey/src/lock.rs
Normal file
98
wfe-valkey/src/lock.rs
Normal file
@@ -0,0 +1,98 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use uuid::Uuid;
|
||||
use wfe_core::traits::DistributedLockProvider;
|
||||
|
||||
pub struct ValkeyLockProvider {
|
||||
conn: redis::aio::MultiplexedConnection,
|
||||
prefix: String,
|
||||
lock_duration: Duration,
|
||||
/// Unique identifier for this provider instance, used as the lock value.
|
||||
instance_id: String,
|
||||
}
|
||||
|
||||
impl ValkeyLockProvider {
|
||||
pub async fn new(redis_url: &str, prefix: &str) -> wfe_core::Result<Self> {
|
||||
let client = redis::Client::open(redis_url)
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
let conn = client
|
||||
.get_multiplexed_tokio_connection()
|
||||
.await
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
Ok(Self {
|
||||
conn,
|
||||
prefix: prefix.to_string(),
|
||||
lock_duration: Duration::from_secs(30),
|
||||
instance_id: Uuid::new_v4().to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Create a provider with a custom lock duration.
|
||||
pub fn with_lock_duration(mut self, duration: Duration) -> Self {
|
||||
self.lock_duration = duration;
|
||||
self
|
||||
}
|
||||
|
||||
fn lock_key(&self, resource: &str) -> String {
|
||||
format!("{}:lock:{}", self.prefix, resource)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl DistributedLockProvider for ValkeyLockProvider {
|
||||
async fn acquire_lock(&self, resource: &str) -> wfe_core::Result<bool> {
|
||||
let mut conn = self.conn.clone();
|
||||
let key = self.lock_key(resource);
|
||||
let seconds = self.lock_duration.as_secs();
|
||||
|
||||
// SET key value NX EX seconds
|
||||
let result: Option<String> = redis::cmd("SET")
|
||||
.arg(&key)
|
||||
.arg(&self.instance_id)
|
||||
.arg("NX")
|
||||
.arg("EX")
|
||||
.arg(seconds)
|
||||
.query_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
|
||||
Ok(result.is_some())
|
||||
}
|
||||
|
||||
async fn release_lock(&self, resource: &str) -> wfe_core::Result<()> {
|
||||
let mut conn = self.conn.clone();
|
||||
let key = self.lock_key(resource);
|
||||
|
||||
// Use a Lua script to only delete the key if the value matches our instance_id.
|
||||
// This prevents accidentally releasing a lock held by another instance.
|
||||
let script = redis::Script::new(
|
||||
r#"
|
||||
if redis.call("GET", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("DEL", KEYS[1])
|
||||
else
|
||||
return 0
|
||||
end
|
||||
"#,
|
||||
);
|
||||
|
||||
let _: i64 = script
|
||||
.key(&key)
|
||||
.arg(&self.instance_id)
|
||||
.invoke_async(&mut conn)
|
||||
.await
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn start(&self) -> wfe_core::Result<()> {
|
||||
// No-op: Redis/Valkey is always-on.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop(&self) -> wfe_core::Result<()> {
|
||||
// No-op.
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
73
wfe-valkey/src/queue.rs
Normal file
73
wfe-valkey/src/queue.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
use async_trait::async_trait;
|
||||
use redis::AsyncCommands;
|
||||
use wfe_core::models::QueueType;
|
||||
use wfe_core::traits::QueueProvider;
|
||||
|
||||
pub struct ValkeyQueueProvider {
|
||||
conn: redis::aio::MultiplexedConnection,
|
||||
prefix: String,
|
||||
}
|
||||
|
||||
impl ValkeyQueueProvider {
|
||||
pub async fn new(redis_url: &str, prefix: &str) -> wfe_core::Result<Self> {
|
||||
let client = redis::Client::open(redis_url)
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
let conn = client
|
||||
.get_multiplexed_tokio_connection()
|
||||
.await
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
Ok(Self {
|
||||
conn,
|
||||
prefix: prefix.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
fn queue_key(&self, queue_type: QueueType) -> String {
|
||||
let type_str = match queue_type {
|
||||
QueueType::Workflow => "workflow",
|
||||
QueueType::Event => "event",
|
||||
QueueType::Index => "index",
|
||||
};
|
||||
format!("{}:queue:{}", self.prefix, type_str)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl QueueProvider for ValkeyQueueProvider {
|
||||
async fn queue_work(&self, id: &str, queue: QueueType) -> wfe_core::Result<()> {
|
||||
let mut conn = self.conn.clone();
|
||||
let key = self.queue_key(queue);
|
||||
|
||||
conn.lpush::<_, _, ()>(&key, id)
|
||||
.await
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn dequeue_work(&self, queue: QueueType) -> wfe_core::Result<Option<String>> {
|
||||
let mut conn = self.conn.clone();
|
||||
let key = self.queue_key(queue);
|
||||
|
||||
let result: Option<String> = conn
|
||||
.rpop(&key, None)
|
||||
.await
|
||||
.map_err(|e| wfe_core::WfeError::Persistence(e.to_string()))?;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn is_dequeue_blocking(&self) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
async fn start(&self) -> wfe_core::Result<()> {
|
||||
// No-op.
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stop(&self) -> wfe_core::Result<()> {
|
||||
// No-op.
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
114
wfe-valkey/tests/lifecycle.rs
Normal file
114
wfe-valkey/tests/lifecycle.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use wfe_core::models::{LifecycleEvent, LifecycleEventType};
|
||||
use wfe_core::traits::LifecyclePublisher;
|
||||
|
||||
fn redis_available() -> bool {
|
||||
redis::Client::open("redis://localhost:6379")
|
||||
.and_then(|c| c.get_connection())
|
||||
.is_ok()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn publish_subscribe_round_trip() {
|
||||
if !redis_available() {
|
||||
eprintln!("Skipping test: Valkey/Redis not available");
|
||||
return;
|
||||
}
|
||||
|
||||
let prefix = format!("wfe_test_{}", uuid::Uuid::new_v4().simple());
|
||||
let publisher =
|
||||
wfe_valkey::ValkeyLifecyclePublisher::new("redis://localhost:6379", &prefix).await.unwrap();
|
||||
|
||||
let instance_id = "wf-lifecycle-test-1";
|
||||
let channel = format!("{}:lifecycle:{}", prefix, instance_id);
|
||||
|
||||
// Set up a subscriber in a background task.
|
||||
let sub_client = redis::Client::open("redis://localhost:6379").unwrap();
|
||||
let mut pubsub = sub_client.get_async_pubsub().await.unwrap();
|
||||
pubsub.subscribe(&channel).await.unwrap();
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(1);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut stream = pubsub.into_on_message();
|
||||
use tokio_stream::StreamExt;
|
||||
if let Some(msg) = stream.next().await {
|
||||
let payload: String = msg.get_payload().unwrap();
|
||||
let _ = tx.send(payload).await;
|
||||
}
|
||||
});
|
||||
|
||||
// Small delay to ensure the subscription is active before publishing.
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
let event = LifecycleEvent::new(
|
||||
instance_id,
|
||||
"def-1",
|
||||
1,
|
||||
LifecycleEventType::Started,
|
||||
);
|
||||
publisher.publish(event).await.unwrap();
|
||||
|
||||
// Wait for the message with a timeout.
|
||||
let received = tokio::time::timeout(Duration::from_secs(5), rx.recv())
|
||||
.await
|
||||
.expect("Timed out waiting for lifecycle event")
|
||||
.expect("Channel closed without receiving a message");
|
||||
|
||||
let deserialized: LifecycleEvent = serde_json::from_str(&received).unwrap();
|
||||
assert_eq!(deserialized.workflow_instance_id, instance_id);
|
||||
assert_eq!(deserialized.event_type, LifecycleEventType::Started);
|
||||
|
||||
handle.abort();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn publish_to_all_channel() {
|
||||
if !redis_available() {
|
||||
eprintln!("Skipping test: Valkey/Redis not available");
|
||||
return;
|
||||
}
|
||||
|
||||
let prefix = format!("wfe_test_{}", uuid::Uuid::new_v4().simple());
|
||||
let publisher =
|
||||
wfe_valkey::ValkeyLifecyclePublisher::new("redis://localhost:6379", &prefix).await.unwrap();
|
||||
|
||||
let all_channel = format!("{}:lifecycle:all", prefix);
|
||||
|
||||
let sub_client = redis::Client::open("redis://localhost:6379").unwrap();
|
||||
let mut pubsub = sub_client.get_async_pubsub().await.unwrap();
|
||||
pubsub.subscribe(&all_channel).await.unwrap();
|
||||
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(1);
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
let mut stream = pubsub.into_on_message();
|
||||
use tokio_stream::StreamExt;
|
||||
if let Some(msg) = stream.next().await {
|
||||
let payload: String = msg.get_payload().unwrap();
|
||||
let _ = tx.send(payload).await;
|
||||
}
|
||||
});
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
let event = LifecycleEvent::new(
|
||||
"wf-all-test",
|
||||
"def-1",
|
||||
1,
|
||||
LifecycleEventType::Completed,
|
||||
);
|
||||
publisher.publish(event).await.unwrap();
|
||||
|
||||
let received = tokio::time::timeout(Duration::from_secs(5), rx.recv())
|
||||
.await
|
||||
.expect("Timed out waiting for lifecycle event on 'all' channel")
|
||||
.expect("Channel closed");
|
||||
|
||||
let deserialized: LifecycleEvent = serde_json::from_str(&received).unwrap();
|
||||
assert_eq!(deserialized.workflow_instance_id, "wf-all-test");
|
||||
assert_eq!(deserialized.event_type, LifecycleEventType::Completed);
|
||||
|
||||
handle.abort();
|
||||
}
|
||||
8
wfe-valkey/tests/lock.rs
Normal file
8
wfe-valkey/tests/lock.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
use wfe_core::lock_suite;
|
||||
|
||||
async fn make_provider() -> wfe_valkey::ValkeyLockProvider {
|
||||
let prefix = format!("wfe_test_{}", uuid::Uuid::new_v4().simple());
|
||||
wfe_valkey::ValkeyLockProvider::new("redis://localhost:6379", &prefix).await.unwrap()
|
||||
}
|
||||
|
||||
lock_suite!(make_provider);
|
||||
8
wfe-valkey/tests/queue.rs
Normal file
8
wfe-valkey/tests/queue.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
use wfe_core::queue_suite;
|
||||
|
||||
async fn make_provider() -> wfe_valkey::ValkeyQueueProvider {
|
||||
let prefix = format!("wfe_test_{}", uuid::Uuid::new_v4().simple());
|
||||
wfe_valkey::ValkeyQueueProvider::new("redis://localhost:6379", &prefix).await.unwrap()
|
||||
}
|
||||
|
||||
queue_suite!(make_provider);
|
||||
Reference in New Issue
Block a user