diff --git a/wfe-core/Cargo.toml b/wfe-core/Cargo.toml new file mode 100644 index 0000000..c74b99b --- /dev/null +++ b/wfe-core/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "wfe-core" +version.workspace = true +edition.workspace = true +license.workspace = true +description = "Core traits, models, builder, and executor for the WFE workflow engine" + +[features] +default = [] +test-support = [] + +[dependencies] +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +uuid = { workspace = true } +chrono = { workspace = true } +thiserror = { workspace = true } +tracing = { workspace = true } +tokio-util = "0.7" + +[dev-dependencies] +pretty_assertions = { workspace = true } +rstest = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } diff --git a/wfe-core/src/error.rs b/wfe-core/src/error.rs new file mode 100644 index 0000000..2700580 --- /dev/null +++ b/wfe-core/src/error.rs @@ -0,0 +1,39 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum WfeError { + #[error("Workflow not found: {0}")] + WorkflowNotFound(String), + + #[error("Workflow definition not found: {id} v{version}")] + DefinitionNotFound { id: String, version: u32 }, + + #[error("Event not found: {0}")] + EventNotFound(String), + + #[error("Subscription not found: {0}")] + SubscriptionNotFound(String), + + #[error("Step not found: {0}")] + StepNotFound(usize), + + #[error("Lock acquisition failed for: {0}")] + LockFailed(String), + + #[error("Persistence error: {0}")] + Persistence(String), + + #[error("Serialization error: {0}")] + Serialization(#[from] serde_json::Error), + + #[error("Step execution error: {0}")] + StepExecution(String), + + #[error("Workflow cancelled")] + Cancelled, + + #[error(transparent)] + Other(#[from] Box), +} + +pub type Result = std::result::Result; diff --git a/wfe-core/src/lib.rs b/wfe-core/src/lib.rs new file mode 100644 index 0000000..73ca959 --- /dev/null +++ b/wfe-core/src/lib.rs @@ -0,0 +1,11 @@ +pub mod builder; +pub mod error; +pub mod executor; +pub mod models; +pub mod primitives; +pub mod traits; + +#[cfg(any(test, feature = "test-support"))] +pub mod test_support; + +pub use error::{Result, WfeError}; diff --git a/wfe-core/src/models/error_behavior.rs b/wfe-core/src/models/error_behavior.rs new file mode 100644 index 0000000..68917b9 --- /dev/null +++ b/wfe-core/src/models/error_behavior.rs @@ -0,0 +1,80 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum ErrorBehavior { + Retry { + #[serde(with = "duration_millis")] + interval: Duration, + #[serde(default = "default_max_retries")] + max_retries: u32, + }, + Suspend, + Terminate, + Compensate, +} + +fn default_max_retries() -> u32 { + 3 +} + +impl Default for ErrorBehavior { + fn default() -> Self { + Self::Retry { + interval: Duration::from_secs(60), + max_retries: default_max_retries(), + } + } +} + +mod duration_millis { + use std::time::Duration; + + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(duration: &Duration, serializer: S) -> Result { + serializer.serialize_u64(duration.as_millis() as u64) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result { + let millis = u64::deserialize(deserializer)?; + Ok(Duration::from_millis(millis)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn default_is_retry_60s() { + let behavior = ErrorBehavior::default(); + assert_eq!( + behavior, + ErrorBehavior::Retry { + interval: Duration::from_secs(60), + max_retries: 3, + } + ); + } + + #[test] + fn serde_round_trip() { + let variants = vec![ + ErrorBehavior::Retry { + interval: Duration::from_secs(30), + max_retries: 3, + }, + ErrorBehavior::Suspend, + ErrorBehavior::Terminate, + ErrorBehavior::Compensate, + ]; + for variant in variants { + let json = serde_json::to_string(&variant).unwrap(); + let deserialized: ErrorBehavior = serde_json::from_str(&json).unwrap(); + assert_eq!(variant, deserialized); + } + } +} diff --git a/wfe-core/src/models/event.rs b/wfe-core/src/models/event.rs new file mode 100644 index 0000000..484ec9d --- /dev/null +++ b/wfe-core/src/models/event.rs @@ -0,0 +1,119 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Event { + pub id: String, + pub event_name: String, + pub event_key: String, + pub event_data: serde_json::Value, + pub event_time: DateTime, + pub is_processed: bool, +} + +impl Event { + pub fn new( + event_name: impl Into, + event_key: impl Into, + event_data: serde_json::Value, + ) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + event_name: event_name.into(), + event_key: event_key.into(), + event_data, + event_time: Utc::now(), + is_processed: false, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EventSubscription { + pub id: String, + pub workflow_id: String, + pub step_id: usize, + pub execution_pointer_id: String, + pub event_name: String, + pub event_key: String, + pub subscribe_as_of: DateTime, + pub subscription_data: Option, + pub external_token: Option, + pub external_worker_id: Option, + pub external_token_expiry: Option>, +} + +impl EventSubscription { + pub fn new( + workflow_id: impl Into, + step_id: usize, + execution_pointer_id: impl Into, + event_name: impl Into, + event_key: impl Into, + subscribe_as_of: DateTime, + ) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + workflow_id: workflow_id.into(), + step_id, + execution_pointer_id: execution_pointer_id.into(), + event_name: event_name.into(), + event_key: event_key.into(), + subscribe_as_of, + subscription_data: None, + external_token: None, + external_worker_id: None, + external_token_expiry: None, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn new_event_defaults() { + let event = Event::new("order.created", "order-456", serde_json::json!({"amount": 100})); + assert_eq!(event.event_name, "order.created"); + assert_eq!(event.event_key, "order-456"); + assert!(!event.is_processed); + } + + #[test] + fn new_event_generates_unique_ids() { + let e1 = Event::new("test", "key", serde_json::json!(null)); + let e2 = Event::new("test", "key", serde_json::json!(null)); + assert_ne!(e1.id, e2.id); + } + + #[test] + fn event_serde_round_trip() { + let event = Event::new("test", "key", serde_json::json!({"data": true})); + let json = serde_json::to_string(&event).unwrap(); + let deserialized: Event = serde_json::from_str(&json).unwrap(); + assert_eq!(event.id, deserialized.id); + assert_eq!(event.event_name, deserialized.event_name); + assert_eq!(event.event_data, deserialized.event_data); + } + + #[test] + fn new_subscription_defaults() { + let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", Utc::now()); + assert_eq!(sub.workflow_id, "wf-1"); + assert_eq!(sub.step_id, 0); + assert!(sub.external_token.is_none()); + assert!(sub.subscription_data.is_none()); + } + + #[test] + fn subscription_serde_round_trip() { + let sub = EventSubscription::new("wf-1", 2, "ptr-1", "evt", "key", Utc::now()); + let json = serde_json::to_string(&sub).unwrap(); + let deserialized: EventSubscription = serde_json::from_str(&json).unwrap(); + assert_eq!(sub.id, deserialized.id); + assert_eq!(sub.workflow_id, deserialized.workflow_id); + assert_eq!(sub.event_name, deserialized.event_name); + } +} diff --git a/wfe-core/src/models/execution_error.rs b/wfe-core/src/models/execution_error.rs new file mode 100644 index 0000000..73ac619 --- /dev/null +++ b/wfe-core/src/models/execution_error.rs @@ -0,0 +1,48 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExecutionError { + pub error_time: DateTime, + pub workflow_id: String, + pub execution_pointer_id: String, + pub message: String, +} + +impl ExecutionError { + pub fn new( + workflow_id: impl Into, + execution_pointer_id: impl Into, + message: impl Into, + ) -> Self { + Self { + error_time: Utc::now(), + workflow_id: workflow_id.into(), + execution_pointer_id: execution_pointer_id.into(), + message: message.into(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn new_error_captures_fields() { + let err = ExecutionError::new("wf-1", "ptr-1", "something went wrong"); + assert_eq!(err.workflow_id, "wf-1"); + assert_eq!(err.execution_pointer_id, "ptr-1"); + assert_eq!(err.message, "something went wrong"); + } + + #[test] + fn serde_round_trip() { + let err = ExecutionError::new("wf-1", "ptr-1", "fail"); + let json = serde_json::to_string(&err).unwrap(); + let deserialized: ExecutionError = serde_json::from_str(&json).unwrap(); + assert_eq!(err.workflow_id, deserialized.workflow_id); + assert_eq!(err.message, deserialized.message); + } +} diff --git a/wfe-core/src/models/execution_pointer.rs b/wfe-core/src/models/execution_pointer.rs new file mode 100644 index 0000000..2ed4714 --- /dev/null +++ b/wfe-core/src/models/execution_pointer.rs @@ -0,0 +1,101 @@ +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use super::status::PointerStatus; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ExecutionPointer { + pub id: String, + pub step_id: usize, + pub active: bool, + pub status: PointerStatus, + pub sleep_until: Option>, + pub persistence_data: Option, + pub start_time: Option>, + pub end_time: Option>, + pub event_name: Option, + pub event_key: Option, + pub event_published: bool, + pub event_data: Option, + pub step_name: Option, + pub retry_count: u32, + pub children: Vec, + pub context_item: Option, + pub predecessor_id: Option, + pub outcome: Option, + pub scope: Vec, + pub extension_attributes: HashMap, +} + +impl ExecutionPointer { + pub fn new(step_id: usize) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + step_id, + active: true, + status: PointerStatus::Pending, + sleep_until: None, + persistence_data: None, + start_time: None, + end_time: None, + event_name: None, + event_key: None, + event_published: false, + event_data: None, + step_name: None, + retry_count: 0, + children: Vec::new(), + context_item: None, + predecessor_id: None, + outcome: None, + scope: Vec::new(), + extension_attributes: HashMap::new(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn new_pointer_has_correct_defaults() { + let pointer = ExecutionPointer::new(0); + assert_eq!(pointer.step_id, 0); + assert!(pointer.active); + assert_eq!(pointer.status, PointerStatus::Pending); + assert_eq!(pointer.retry_count, 0); + assert!(pointer.children.is_empty()); + assert!(pointer.scope.is_empty()); + assert!(!pointer.event_published); + } + + #[test] + fn new_pointer_generates_unique_ids() { + let p1 = ExecutionPointer::new(0); + let p2 = ExecutionPointer::new(0); + assert_ne!(p1.id, p2.id); + } + + #[test] + fn serde_round_trip() { + let mut pointer = ExecutionPointer::new(3); + pointer.status = PointerStatus::Running; + pointer.retry_count = 2; + pointer.persistence_data = Some(serde_json::json!({"step_state": true})); + pointer.children = vec!["child-1".into(), "child-2".into()]; + + let json = serde_json::to_string(&pointer).unwrap(); + let deserialized: ExecutionPointer = serde_json::from_str(&json).unwrap(); + + assert_eq!(pointer.id, deserialized.id); + assert_eq!(pointer.step_id, deserialized.step_id); + assert_eq!(pointer.status, deserialized.status); + assert_eq!(pointer.retry_count, deserialized.retry_count); + assert_eq!(pointer.persistence_data, deserialized.persistence_data); + assert_eq!(pointer.children, deserialized.children); + } +} diff --git a/wfe-core/src/models/execution_result.rs b/wfe-core/src/models/execution_result.rs new file mode 100644 index 0000000..9551e53 --- /dev/null +++ b/wfe-core/src/models/execution_result.rs @@ -0,0 +1,188 @@ +use std::time::Duration; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use super::poll_config::PollEndpointConfig; + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct ExecutionResult { + /// Whether the workflow should proceed to the next step. + pub proceed: bool, + /// Outcome value for decision-based routing. + pub outcome_value: Option, + /// Duration to sleep before re-executing. + #[serde(default, with = "super::option_duration_millis")] + pub sleep_for: Option, + /// Step-specific state to persist between executions. + pub persistence_data: Option, + /// Event name to wait for. + pub event_name: Option, + /// Event key to match. + pub event_key: Option, + /// Only consider events published after this time. + pub event_as_of: Option>, + /// Values to branch execution on (for ForEach/Parallel). + pub branch_values: Option>, + /// Poll endpoint configuration for external service polling. + pub poll_endpoint: Option, +} + +impl ExecutionResult { + /// Continue to the next step. + pub fn next() -> Self { + Self { + proceed: true, + ..Default::default() + } + } + + /// Continue with an outcome value for decision routing. + pub fn outcome(value: impl Into) -> Self { + Self { + proceed: true, + outcome_value: Some(value.into()), + ..Default::default() + } + } + + /// Pause execution and persist step-specific data. + pub fn persist(data: serde_json::Value) -> Self { + Self { + proceed: false, + persistence_data: Some(data), + ..Default::default() + } + } + + /// Create child branches for parallel/foreach execution. + pub fn branch(values: Vec, persistence_data: Option) -> Self { + Self { + proceed: false, + branch_values: Some(values), + persistence_data, + ..Default::default() + } + } + + /// Sleep for a duration before re-executing. + pub fn sleep(duration: Duration, persistence_data: Option) -> Self { + Self { + proceed: false, + sleep_for: Some(duration), + persistence_data, + ..Default::default() + } + } + + /// Wait for an external event. + pub fn wait_for_event( + event_name: impl Into, + event_key: impl Into, + as_of: DateTime, + ) -> Self { + Self { + proceed: false, + event_name: Some(event_name.into()), + event_key: Some(event_key.into()), + event_as_of: Some(as_of), + ..Default::default() + } + } + + /// Poll an external endpoint until a condition is met. + pub fn poll_endpoint(config: PollEndpointConfig) -> Self { + Self { + proceed: false, + poll_endpoint: Some(config), + ..Default::default() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn next_proceeds_with_no_data() { + let result = ExecutionResult::next(); + assert!(result.proceed); + assert!(result.outcome_value.is_none()); + assert!(result.sleep_for.is_none()); + assert!(result.persistence_data.is_none()); + assert!(result.event_name.is_none()); + assert!(result.branch_values.is_none()); + assert!(result.poll_endpoint.is_none()); + } + + #[test] + fn outcome_proceeds_with_value() { + let result = ExecutionResult::outcome(serde_json::json!(42)); + assert!(result.proceed); + assert_eq!(result.outcome_value, Some(serde_json::json!(42))); + } + + #[test] + fn persist_does_not_proceed() { + let data = serde_json::json!({"counter": 5}); + let result = ExecutionResult::persist(data.clone()); + assert!(!result.proceed); + assert_eq!(result.persistence_data, Some(data)); + } + + #[test] + fn branch_creates_child_values() { + let values = vec![serde_json::json!(1), serde_json::json!(2), serde_json::json!(3)]; + let result = ExecutionResult::branch(values.clone(), None); + assert!(!result.proceed); + assert_eq!(result.branch_values, Some(values)); + } + + #[test] + fn sleep_sets_duration() { + let result = ExecutionResult::sleep(Duration::from_secs(30), None); + assert!(!result.proceed); + assert_eq!(result.sleep_for, Some(Duration::from_secs(30))); + } + + #[test] + fn wait_for_event_sets_event_fields() { + let now = Utc::now(); + let result = ExecutionResult::wait_for_event("order.completed", "order-123", now); + assert!(!result.proceed); + assert_eq!(result.event_name.as_deref(), Some("order.completed")); + assert_eq!(result.event_key.as_deref(), Some("order-123")); + assert_eq!(result.event_as_of, Some(now)); + } + + #[test] + fn poll_endpoint_sets_config() { + use super::super::poll_config::*; + use std::collections::HashMap; + + let config = PollEndpointConfig { + url: "https://api.example.com/status".into(), + method: HttpMethod::Get, + headers: HashMap::new(), + body: None, + interval: Duration::from_secs(10), + timeout: Duration::from_secs(300), + condition: PollCondition::StatusCode(200), + }; + let result = ExecutionResult::poll_endpoint(config.clone()); + assert!(!result.proceed); + assert_eq!(result.poll_endpoint, Some(config)); + } + + #[test] + fn serde_round_trip() { + let result = ExecutionResult::sleep(Duration::from_secs(30), Some(serde_json::json!({"x": 1}))); + let json = serde_json::to_string(&result).unwrap(); + let deserialized: ExecutionResult = serde_json::from_str(&json).unwrap(); + assert_eq!(result.proceed, deserialized.proceed); + assert_eq!(result.sleep_for, deserialized.sleep_for); + assert_eq!(result.persistence_data, deserialized.persistence_data); + } +} diff --git a/wfe-core/src/models/lifecycle.rs b/wfe-core/src/models/lifecycle.rs new file mode 100644 index 0000000..880cb55 --- /dev/null +++ b/wfe-core/src/models/lifecycle.rs @@ -0,0 +1,104 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LifecycleEvent { + pub event_time_utc: DateTime, + pub workflow_instance_id: String, + pub workflow_definition_id: String, + pub version: u32, + pub reference: Option, + pub event_type: LifecycleEventType, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum LifecycleEventType { + Started, + Resumed, + Suspended, + Completed, + Terminated, + Error { message: String }, + StepStarted { step_id: usize, step_name: Option }, + StepCompleted { step_id: usize, step_name: Option }, +} + +impl LifecycleEvent { + pub fn new( + workflow_instance_id: impl Into, + workflow_definition_id: impl Into, + version: u32, + event_type: LifecycleEventType, + ) -> Self { + Self { + event_time_utc: Utc::now(), + workflow_instance_id: workflow_instance_id.into(), + workflow_definition_id: workflow_definition_id.into(), + version, + reference: None, + event_type, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn lifecycle_event_types_are_distinct() { + assert_ne!(LifecycleEventType::Started, LifecycleEventType::Completed); + } + + #[test] + fn serde_round_trip_simple_variant() { + let event = LifecycleEvent::new("wf-1", "def-1", 1, LifecycleEventType::Started); + let json = serde_json::to_string(&event).unwrap(); + let deserialized: LifecycleEvent = serde_json::from_str(&json).unwrap(); + assert_eq!(event.workflow_instance_id, deserialized.workflow_instance_id); + assert_eq!(event.event_type, deserialized.event_type); + } + + #[test] + fn serde_round_trip_error_variant() { + let event = LifecycleEvent::new( + "wf-1", + "def-1", + 1, + LifecycleEventType::Error { + message: "boom".into(), + }, + ); + let json = serde_json::to_string(&event).unwrap(); + let deserialized: LifecycleEvent = serde_json::from_str(&json).unwrap(); + assert_eq!( + deserialized.event_type, + LifecycleEventType::Error { + message: "boom".into() + } + ); + } + + #[test] + fn serde_round_trip_step_variant() { + let event = LifecycleEvent::new( + "wf-1", + "def-1", + 1, + LifecycleEventType::StepStarted { + step_id: 3, + step_name: Some("ProcessOrder".into()), + }, + ); + let json = serde_json::to_string(&event).unwrap(); + let deserialized: LifecycleEvent = serde_json::from_str(&json).unwrap(); + assert_eq!( + deserialized.event_type, + LifecycleEventType::StepStarted { + step_id: 3, + step_name: Some("ProcessOrder".into()), + } + ); + } +} diff --git a/wfe-core/src/models/mod.rs b/wfe-core/src/models/mod.rs new file mode 100644 index 0000000..169099f --- /dev/null +++ b/wfe-core/src/models/mod.rs @@ -0,0 +1,65 @@ +pub mod error_behavior; +pub mod event; +pub mod execution_error; +pub mod execution_pointer; +pub mod execution_result; +pub mod lifecycle; +pub mod poll_config; +pub mod queue_type; +pub mod scheduled_command; +pub mod status; +pub mod workflow_definition; +pub mod workflow_instance; + +pub use error_behavior::ErrorBehavior; +pub use event::{Event, EventSubscription}; +pub use execution_error::ExecutionError; +pub use execution_pointer::ExecutionPointer; +pub use execution_result::ExecutionResult; +pub use lifecycle::{LifecycleEvent, LifecycleEventType}; +pub use poll_config::{HttpMethod, PollCondition, PollEndpointConfig}; +pub use queue_type::QueueType; +pub use scheduled_command::{CommandName, ScheduledCommand}; +pub use status::{PointerStatus, WorkflowStatus}; +pub use workflow_definition::{StepOutcome, WorkflowDefinition, WorkflowStep}; +pub use workflow_instance::WorkflowInstance; + +/// Serde helper for `Option` as milliseconds. +pub(crate) mod option_duration_millis { + use std::time::Duration; + + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize( + duration: &Option, + serializer: S, + ) -> Result { + match duration { + Some(d) => serializer.serialize_some(&(d.as_millis() as u64)), + None => serializer.serialize_none(), + } + } + + pub fn deserialize<'de, D: Deserializer<'de>>( + deserializer: D, + ) -> Result, D::Error> { + let millis: Option = Option::deserialize(deserializer)?; + Ok(millis.map(Duration::from_millis)) + } +} + +/// Serde helper for `Duration` as milliseconds (non-optional). +pub(crate) mod duration_millis { + use std::time::Duration; + + use serde::{Deserialize, Deserializer, Serializer}; + + pub fn serialize(duration: &Duration, serializer: S) -> Result { + serializer.serialize_u64(duration.as_millis() as u64) + } + + pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result { + let millis = u64::deserialize(deserializer)?; + Ok(Duration::from_millis(millis)) + } +} diff --git a/wfe-core/src/models/poll_config.rs b/wfe-core/src/models/poll_config.rs new file mode 100644 index 0000000..b8e7aa7 --- /dev/null +++ b/wfe-core/src/models/poll_config.rs @@ -0,0 +1,83 @@ +use std::collections::HashMap; +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum HttpMethod { + Get, + Post, + Put, + Patch, + Delete, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum PollCondition { + /// Check a JSON path equals a value: e.g. JsonPathEquals("$.status", "complete") + JsonPathEquals { + path: String, + value: serde_json::Value, + }, + /// Check HTTP status code + StatusCode(u16), + /// Check response body contains string + BodyContains(String), +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct PollEndpointConfig { + /// URL template. Supports `{placeholder}` interpolation from workflow data. + pub url: String, + pub method: HttpMethod, + #[serde(default)] + pub headers: HashMap, + #[serde(default)] + pub body: Option, + #[serde(with = "super::duration_millis")] + pub interval: Duration, + #[serde(with = "super::duration_millis")] + pub timeout: Duration, + pub condition: PollCondition, +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn poll_config_serde_round_trip() { + let config = PollEndpointConfig { + url: "https://api.example.com/status/{id}".into(), + method: HttpMethod::Get, + headers: HashMap::from([("Authorization".into(), "Bearer token123".into())]), + body: None, + interval: Duration::from_secs(30), + timeout: Duration::from_secs(3600), + condition: PollCondition::JsonPathEquals { + path: "$.status".into(), + value: serde_json::Value::String("complete".into()), + }, + }; + let json = serde_json::to_string(&config).unwrap(); + let deserialized: PollEndpointConfig = serde_json::from_str(&json).unwrap(); + assert_eq!(config, deserialized); + } + + #[test] + fn poll_condition_status_code() { + let cond = PollCondition::StatusCode(200); + let json = serde_json::to_string(&cond).unwrap(); + let deserialized: PollCondition = serde_json::from_str(&json).unwrap(); + assert_eq!(cond, deserialized); + } + + #[test] + fn poll_condition_body_contains() { + let cond = PollCondition::BodyContains("success".into()); + let json = serde_json::to_string(&cond).unwrap(); + let deserialized: PollCondition = serde_json::from_str(&json).unwrap(); + assert_eq!(cond, deserialized); + } +} diff --git a/wfe-core/src/models/queue_type.rs b/wfe-core/src/models/queue_type.rs new file mode 100644 index 0000000..c2804e1 --- /dev/null +++ b/wfe-core/src/models/queue_type.rs @@ -0,0 +1,30 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum QueueType { + Workflow, + Event, + Index, +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn queue_types_are_distinct() { + assert_ne!(QueueType::Workflow, QueueType::Event); + assert_ne!(QueueType::Event, QueueType::Index); + assert_ne!(QueueType::Workflow, QueueType::Index); + } + + #[test] + fn serde_round_trip() { + for qt in [QueueType::Workflow, QueueType::Event, QueueType::Index] { + let json = serde_json::to_string(&qt).unwrap(); + let deserialized: QueueType = serde_json::from_str(&json).unwrap(); + assert_eq!(qt, deserialized); + } + } +} diff --git a/wfe-core/src/models/scheduled_command.rs b/wfe-core/src/models/scheduled_command.rs new file mode 100644 index 0000000..dfc18ab --- /dev/null +++ b/wfe-core/src/models/scheduled_command.rs @@ -0,0 +1,65 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum CommandName { + ProcessWorkflow, + ProcessEvent, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScheduledCommand { + pub command_name: CommandName, + pub data: String, + /// Epoch milliseconds when the command should execute. + pub execute_time: i64, +} + +impl ScheduledCommand { + pub fn process_workflow(workflow_id: impl Into, execute_time: i64) -> Self { + Self { + command_name: CommandName::ProcessWorkflow, + data: workflow_id.into(), + execute_time, + } + } + + pub fn process_event(event_id: impl Into, execute_time: i64) -> Self { + Self { + command_name: CommandName::ProcessEvent, + data: event_id.into(), + execute_time, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn process_workflow_factory() { + let cmd = ScheduledCommand::process_workflow("wf-123", 1000); + assert_eq!(cmd.command_name, CommandName::ProcessWorkflow); + assert_eq!(cmd.data, "wf-123"); + assert_eq!(cmd.execute_time, 1000); + } + + #[test] + fn process_event_factory() { + let cmd = ScheduledCommand::process_event("evt-456", 2000); + assert_eq!(cmd.command_name, CommandName::ProcessEvent); + assert_eq!(cmd.data, "evt-456"); + assert_eq!(cmd.execute_time, 2000); + } + + #[test] + fn serde_round_trip() { + let cmd = ScheduledCommand::process_workflow("wf-1", 500); + let json = serde_json::to_string(&cmd).unwrap(); + let deserialized: ScheduledCommand = serde_json::from_str(&json).unwrap(); + assert_eq!(cmd.command_name, deserialized.command_name); + assert_eq!(cmd.data, deserialized.data); + assert_eq!(cmd.execute_time, deserialized.execute_time); + } +} diff --git a/wfe-core/src/models/status.rs b/wfe-core/src/models/status.rs new file mode 100644 index 0000000..c76aa3e --- /dev/null +++ b/wfe-core/src/models/status.rs @@ -0,0 +1,73 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum WorkflowStatus { + #[default] + Runnable, + Suspended, + Complete, + Terminated, +} + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum PointerStatus { + #[default] + Pending, + Running, + Complete, + Sleeping, + WaitingForEvent, + Failed, + Compensated, + Cancelled, + PendingPredecessor, +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn workflow_status_default_is_runnable() { + assert_eq!(WorkflowStatus::default(), WorkflowStatus::Runnable); + } + + #[test] + fn pointer_status_default_is_pending() { + assert_eq!(PointerStatus::default(), PointerStatus::Pending); + } + + #[test] + fn workflow_status_serde_round_trip() { + for status in [ + WorkflowStatus::Runnable, + WorkflowStatus::Suspended, + WorkflowStatus::Complete, + WorkflowStatus::Terminated, + ] { + let json = serde_json::to_string(&status).unwrap(); + let deserialized: WorkflowStatus = serde_json::from_str(&json).unwrap(); + assert_eq!(status, deserialized); + } + } + + #[test] + fn pointer_status_serde_round_trip() { + for status in [ + PointerStatus::Pending, + PointerStatus::Running, + PointerStatus::Complete, + PointerStatus::Sleeping, + PointerStatus::WaitingForEvent, + PointerStatus::Failed, + PointerStatus::Compensated, + PointerStatus::Cancelled, + PointerStatus::PendingPredecessor, + ] { + let json = serde_json::to_string(&status).unwrap(); + let deserialized: PointerStatus = serde_json::from_str(&json).unwrap(); + assert_eq!(status, deserialized); + } + } +} diff --git a/wfe-core/src/models/workflow_definition.rs b/wfe-core/src/models/workflow_definition.rs new file mode 100644 index 0000000..1ff89ca --- /dev/null +++ b/wfe-core/src/models/workflow_definition.rs @@ -0,0 +1,121 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +use super::error_behavior::ErrorBehavior; + +/// A compiled workflow definition ready for execution. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WorkflowDefinition { + pub id: String, + pub version: u32, + pub description: Option, + pub steps: Vec, + pub default_error_behavior: ErrorBehavior, + #[serde(default, with = "super::option_duration_millis")] + pub default_error_retry_interval: Option, +} + +impl WorkflowDefinition { + pub fn new(id: impl Into, version: u32) -> Self { + Self { + id: id.into(), + version, + description: None, + steps: Vec::new(), + default_error_behavior: ErrorBehavior::default(), + default_error_retry_interval: None, + } + } +} + +/// A single step in a workflow definition. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WorkflowStep { + pub id: usize, + pub name: Option, + pub external_id: Option, + pub step_type: String, + pub children: Vec, + pub outcomes: Vec, + pub error_behavior: Option, + pub compensation_step_id: Option, + pub do_compensate: bool, + #[serde(default)] + pub saga: bool, + /// Serializable configuration for primitive steps (e.g. event_name, duration). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub step_config: Option, +} + +impl WorkflowStep { + pub fn new(id: usize, step_type: impl Into) -> Self { + Self { + id, + name: None, + external_id: None, + step_type: step_type.into(), + children: Vec::new(), + outcomes: Vec::new(), + error_behavior: None, + compensation_step_id: None, + do_compensate: false, + saga: false, + step_config: None, + } + } +} + +/// Routing outcome from a step. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StepOutcome { + pub next_step: usize, + pub label: Option, + pub value: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn definition_defaults() { + let def = WorkflowDefinition::new("test-workflow", 1); + assert_eq!(def.id, "test-workflow"); + assert_eq!(def.version, 1); + assert!(def.steps.is_empty()); + assert_eq!(def.default_error_behavior, ErrorBehavior::default()); + assert!(def.default_error_retry_interval.is_none()); + } + + #[test] + fn step_defaults() { + let step = WorkflowStep::new(0, "MyStep"); + assert_eq!(step.id, 0); + assert_eq!(step.step_type, "MyStep"); + assert!(step.children.is_empty()); + assert!(step.outcomes.is_empty()); + assert!(step.error_behavior.is_none()); + assert!(step.compensation_step_id.is_none()); + } + + #[test] + fn definition_serde_round_trip() { + let mut def = WorkflowDefinition::new("wf", 3); + let mut step = WorkflowStep::new(0, "StepA"); + step.outcomes.push(StepOutcome { + next_step: 1, + label: Some("next".into()), + value: None, + }); + def.steps.push(step); + def.steps.push(WorkflowStep::new(1, "StepB")); + + let json = serde_json::to_string(&def).unwrap(); + let deserialized: WorkflowDefinition = serde_json::from_str(&json).unwrap(); + assert_eq!(def.id, deserialized.id); + assert_eq!(def.steps.len(), deserialized.steps.len()); + assert_eq!(def.steps[0].outcomes[0].next_step, 1); + } +} diff --git a/wfe-core/src/models/workflow_instance.rs b/wfe-core/src/models/workflow_instance.rs new file mode 100644 index 0000000..9fac9f4 --- /dev/null +++ b/wfe-core/src/models/workflow_instance.rs @@ -0,0 +1,141 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +use super::execution_pointer::ExecutionPointer; +use super::status::{PointerStatus, WorkflowStatus}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WorkflowInstance { + pub id: String, + pub workflow_definition_id: String, + pub version: u32, + pub description: Option, + pub reference: Option, + pub execution_pointers: Vec, + pub next_execution: Option, + pub status: WorkflowStatus, + pub data: serde_json::Value, + pub create_time: DateTime, + pub complete_time: Option>, +} + +impl WorkflowInstance { + pub fn new(workflow_definition_id: impl Into, version: u32, data: serde_json::Value) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + workflow_definition_id: workflow_definition_id.into(), + version, + description: None, + reference: None, + execution_pointers: Vec::new(), + next_execution: Some(0), + status: WorkflowStatus::Runnable, + data, + create_time: Utc::now(), + complete_time: None, + } + } + + /// Check if all execution pointers in a given scope have completed. + pub fn is_branch_complete(&self, scope: &[String]) -> bool { + self.execution_pointers + .iter() + .filter(|p| p.scope == scope) + .all(|p| { + matches!( + p.status, + PointerStatus::Complete + | PointerStatus::Compensated + | PointerStatus::Cancelled + | PointerStatus::Failed + ) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn new_instance_defaults() { + let instance = WorkflowInstance::new("test-workflow", 1, serde_json::json!({})); + assert_eq!(instance.workflow_definition_id, "test-workflow"); + assert_eq!(instance.version, 1); + assert_eq!(instance.status, WorkflowStatus::Runnable); + assert_eq!(instance.next_execution, Some(0)); + assert!(instance.execution_pointers.is_empty()); + assert!(instance.complete_time.is_none()); + } + + #[test] + fn is_branch_complete_empty_scope_returns_true() { + let instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + assert!(instance.is_branch_complete(&[])); + } + + #[test] + fn is_branch_complete_all_complete() { + let scope = vec!["parent-1".to_string()]; + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + + let mut p1 = ExecutionPointer::new(0); + p1.scope = scope.clone(); + p1.status = PointerStatus::Complete; + + let mut p2 = ExecutionPointer::new(1); + p2.scope = scope.clone(); + p2.status = PointerStatus::Compensated; + + instance.execution_pointers = vec![p1, p2]; + assert!(instance.is_branch_complete(&scope)); + } + + #[test] + fn is_branch_complete_with_active_pointer() { + let scope = vec!["parent-1".to_string()]; + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + + let mut p1 = ExecutionPointer::new(0); + p1.scope = scope.clone(); + p1.status = PointerStatus::Complete; + + let mut p2 = ExecutionPointer::new(1); + p2.scope = scope.clone(); + p2.status = PointerStatus::Running; + + instance.execution_pointers = vec![p1, p2]; + assert!(!instance.is_branch_complete(&scope)); + } + + #[test] + fn is_branch_complete_ignores_different_scope() { + let scope_a = vec!["parent-a".to_string()]; + let scope_b = vec!["parent-b".to_string()]; + let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({})); + + let mut p1 = ExecutionPointer::new(0); + p1.scope = scope_a.clone(); + p1.status = PointerStatus::Complete; + + let mut p2 = ExecutionPointer::new(1); + p2.scope = scope_b.clone(); + p2.status = PointerStatus::Running; + + instance.execution_pointers = vec![p1, p2]; + assert!(instance.is_branch_complete(&scope_a)); + } + + #[test] + fn serde_round_trip() { + let instance = WorkflowInstance::new("my-workflow", 2, serde_json::json!({"key": "value"})); + let json = serde_json::to_string(&instance).unwrap(); + let deserialized: WorkflowInstance = serde_json::from_str(&json).unwrap(); + assert_eq!(instance.id, deserialized.id); + assert_eq!(instance.workflow_definition_id, deserialized.workflow_definition_id); + assert_eq!(instance.version, deserialized.version); + assert_eq!(instance.status, deserialized.status); + assert_eq!(instance.data, deserialized.data); + } +} diff --git a/wfe-core/src/traits/lifecycle.rs b/wfe-core/src/traits/lifecycle.rs new file mode 100644 index 0000000..847582d --- /dev/null +++ b/wfe-core/src/traits/lifecycle.rs @@ -0,0 +1,9 @@ +use async_trait::async_trait; + +use crate::models::LifecycleEvent; + +/// Publishes lifecycle events for workflow state transitions. +#[async_trait] +pub trait LifecyclePublisher: Send + Sync { + async fn publish(&self, event: LifecycleEvent) -> crate::Result<()>; +} diff --git a/wfe-core/src/traits/lock.rs b/wfe-core/src/traits/lock.rs new file mode 100644 index 0000000..cd24d09 --- /dev/null +++ b/wfe-core/src/traits/lock.rs @@ -0,0 +1,10 @@ +use async_trait::async_trait; + +/// Distributed lock provider for preventing concurrent execution of the same workflow. +#[async_trait] +pub trait DistributedLockProvider: Send + Sync { + async fn acquire_lock(&self, resource: &str) -> crate::Result; + async fn release_lock(&self, resource: &str) -> crate::Result<()>; + async fn start(&self) -> crate::Result<()>; + async fn stop(&self) -> crate::Result<()>; +} diff --git a/wfe-core/src/traits/middleware.rs b/wfe-core/src/traits/middleware.rs new file mode 100644 index 0000000..7bb4152 --- /dev/null +++ b/wfe-core/src/traits/middleware.rs @@ -0,0 +1,93 @@ +use async_trait::async_trait; + +use crate::models::{ExecutionResult, WorkflowInstance}; +use crate::traits::step::StepExecutionContext; + +/// Workflow-level middleware with default no-op implementations. +#[async_trait] +pub trait WorkflowMiddleware: Send + Sync { + async fn pre_workflow(&self, _instance: &WorkflowInstance) -> crate::Result<()> { + Ok(()) + } + async fn post_workflow(&self, _instance: &WorkflowInstance) -> crate::Result<()> { + Ok(()) + } +} + +/// Step-level middleware with default no-op implementations. +#[async_trait] +pub trait StepMiddleware: Send + Sync { + async fn pre_step(&self, _context: &StepExecutionContext<'_>) -> crate::Result<()> { + Ok(()) + } + async fn post_step( + &self, + _context: &StepExecutionContext<'_>, + _result: &ExecutionResult, + ) -> crate::Result<()> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::models::{ExecutionPointer, ExecutionResult, WorkflowInstance}; + + struct NoOpWorkflowMiddleware; + impl WorkflowMiddleware for NoOpWorkflowMiddleware {} + + struct NoOpStepMiddleware; + impl StepMiddleware for NoOpStepMiddleware {} + + #[tokio::test] + async fn workflow_middleware_default_pre_workflow() { + let mw = NoOpWorkflowMiddleware; + let instance = WorkflowInstance::new("wf", 1, serde_json::json!({})); + mw.pre_workflow(&instance).await.unwrap(); + } + + #[tokio::test] + async fn workflow_middleware_default_post_workflow() { + let mw = NoOpWorkflowMiddleware; + let instance = WorkflowInstance::new("wf", 1, serde_json::json!({})); + mw.post_workflow(&instance).await.unwrap(); + } + + #[tokio::test] + async fn step_middleware_default_pre_step() { + use crate::models::WorkflowStep; + let mw = NoOpStepMiddleware; + let instance = WorkflowInstance::new("wf", 1, serde_json::json!({})); + let pointer = ExecutionPointer::new(0); + let step = WorkflowStep::new(0, "test_step"); + let ctx = StepExecutionContext { + item: None, + execution_pointer: &pointer, + persistence_data: None, + step: &step, + workflow: &instance, + cancellation_token: tokio_util::sync::CancellationToken::new(), + }; + mw.pre_step(&ctx).await.unwrap(); + } + + #[tokio::test] + async fn step_middleware_default_post_step() { + use crate::models::WorkflowStep; + let mw = NoOpStepMiddleware; + let instance = WorkflowInstance::new("wf", 1, serde_json::json!({})); + let pointer = ExecutionPointer::new(0); + let step = WorkflowStep::new(0, "test_step"); + let ctx = StepExecutionContext { + item: None, + execution_pointer: &pointer, + persistence_data: None, + step: &step, + workflow: &instance, + cancellation_token: tokio_util::sync::CancellationToken::new(), + }; + let result = ExecutionResult::next(); + mw.post_step(&ctx, &result).await.unwrap(); + } +} diff --git a/wfe-core/src/traits/mod.rs b/wfe-core/src/traits/mod.rs new file mode 100644 index 0000000..85151f2 --- /dev/null +++ b/wfe-core/src/traits/mod.rs @@ -0,0 +1,20 @@ +pub mod lifecycle; +pub mod lock; +pub mod middleware; +pub mod persistence; +pub mod queue; +pub mod registry; +pub mod search; +pub mod step; + +pub use lifecycle::LifecyclePublisher; +pub use lock::DistributedLockProvider; +pub use middleware::{StepMiddleware, WorkflowMiddleware}; +pub use persistence::{ + EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository, + WorkflowRepository, +}; +pub use queue::QueueProvider; +pub use registry::WorkflowRegistry; +pub use search::{Page, SearchFilter, SearchIndex, WorkflowSearchResult}; +pub use step::{StepBody, StepExecutionContext, WorkflowData}; diff --git a/wfe-core/src/traits/persistence.rs b/wfe-core/src/traits/persistence.rs new file mode 100644 index 0000000..9b00fe2 --- /dev/null +++ b/wfe-core/src/traits/persistence.rs @@ -0,0 +1,95 @@ +use async_trait::async_trait; +use chrono::{DateTime, Utc}; + +use crate::models::{ + Event, EventSubscription, ExecutionError, ScheduledCommand, WorkflowInstance, +}; + +/// Persistence for workflow instances. +#[async_trait] +pub trait WorkflowRepository: Send + Sync { + async fn create_new_workflow(&self, instance: &WorkflowInstance) -> crate::Result; + async fn persist_workflow(&self, instance: &WorkflowInstance) -> crate::Result<()>; + async fn persist_workflow_with_subscriptions( + &self, + instance: &WorkflowInstance, + subscriptions: &[EventSubscription], + ) -> crate::Result<()>; + async fn get_runnable_instances(&self, as_at: DateTime) -> crate::Result>; + async fn get_workflow_instance(&self, id: &str) -> crate::Result; + async fn get_workflow_instances(&self, ids: &[String]) -> crate::Result>; +} + +/// Persistence for event subscriptions. +#[async_trait] +pub trait SubscriptionRepository: Send + Sync { + async fn create_event_subscription( + &self, + subscription: &EventSubscription, + ) -> crate::Result; + async fn get_subscriptions( + &self, + event_name: &str, + event_key: &str, + as_of: DateTime, + ) -> crate::Result>; + async fn terminate_subscription(&self, subscription_id: &str) -> crate::Result<()>; + async fn get_subscription(&self, subscription_id: &str) -> crate::Result; + async fn get_first_open_subscription( + &self, + event_name: &str, + event_key: &str, + as_of: DateTime, + ) -> crate::Result>; + async fn set_subscription_token( + &self, + subscription_id: &str, + token: &str, + worker_id: &str, + expiry: DateTime, + ) -> crate::Result; + async fn clear_subscription_token( + &self, + subscription_id: &str, + token: &str, + ) -> crate::Result<()>; +} + +/// Persistence for events. +#[async_trait] +pub trait EventRepository: Send + Sync { + async fn create_event(&self, event: &Event) -> crate::Result; + async fn get_event(&self, id: &str) -> crate::Result; + async fn get_runnable_events(&self, as_at: DateTime) -> crate::Result>; + async fn get_events( + &self, + event_name: &str, + event_key: &str, + as_of: DateTime, + ) -> crate::Result>; + async fn mark_event_processed(&self, id: &str) -> crate::Result<()>; + async fn mark_event_unprocessed(&self, id: &str) -> crate::Result<()>; +} + +/// Persistence for scheduled commands. +#[async_trait] +pub trait ScheduledCommandRepository: Send + Sync { + fn supports_scheduled_commands(&self) -> bool; + async fn schedule_command(&self, command: &ScheduledCommand) -> crate::Result<()>; + async fn process_commands( + &self, + as_of: DateTime, + handler: &(dyn Fn(ScheduledCommand) -> std::pin::Pin> + Send>> + + Send + + Sync), + ) -> crate::Result<()>; +} + +/// Composite persistence provider combining all repository traits. +#[async_trait] +pub trait PersistenceProvider: + WorkflowRepository + EventRepository + SubscriptionRepository + ScheduledCommandRepository +{ + async fn persist_errors(&self, errors: &[ExecutionError]) -> crate::Result<()>; + async fn ensure_store_exists(&self) -> crate::Result<()>; +} diff --git a/wfe-core/src/traits/queue.rs b/wfe-core/src/traits/queue.rs new file mode 100644 index 0000000..c221f18 --- /dev/null +++ b/wfe-core/src/traits/queue.rs @@ -0,0 +1,13 @@ +use async_trait::async_trait; + +use crate::models::QueueType; + +/// Queue provider for distributing workflow execution across workers. +#[async_trait] +pub trait QueueProvider: Send + Sync { + async fn queue_work(&self, id: &str, queue: QueueType) -> crate::Result<()>; + async fn dequeue_work(&self, queue: QueueType) -> crate::Result>; + fn is_dequeue_blocking(&self) -> bool; + async fn start(&self) -> crate::Result<()>; + async fn stop(&self) -> crate::Result<()>; +} diff --git a/wfe-core/src/traits/registry.rs b/wfe-core/src/traits/registry.rs new file mode 100644 index 0000000..f9a6e7a --- /dev/null +++ b/wfe-core/src/traits/registry.rs @@ -0,0 +1,10 @@ +use crate::models::WorkflowDefinition; + +/// Registry for workflow definitions with version support. +pub trait WorkflowRegistry: Send + Sync { + fn register(&mut self, definition: WorkflowDefinition); + fn get_definition(&self, id: &str, version: Option) -> Option<&WorkflowDefinition>; + fn is_registered(&self, id: &str, version: u32) -> bool; + fn deregister(&mut self, id: &str, version: u32) -> bool; + fn get_all_definitions(&self) -> Vec<&WorkflowDefinition>; +} diff --git a/wfe-core/src/traits/search.rs b/wfe-core/src/traits/search.rs new file mode 100644 index 0000000..829410d --- /dev/null +++ b/wfe-core/src/traits/search.rs @@ -0,0 +1,48 @@ +use async_trait::async_trait; + +use crate::models::WorkflowInstance; + +/// Result from a search query. +#[derive(Debug, Clone)] +pub struct WorkflowSearchResult { + pub id: String, + pub workflow_definition_id: String, + pub version: u32, + pub status: crate::models::WorkflowStatus, + pub reference: Option, + pub description: Option, +} + +/// Filter for search queries. +#[derive(Debug, Clone)] +pub enum SearchFilter { + Status(crate::models::WorkflowStatus), + DateRange { + field: String, + before: Option>, + after: Option>, + }, + Reference(String), +} + +/// Paginated search results. +#[derive(Debug, Clone)] +pub struct Page { + pub data: Vec, + pub total: u64, +} + +/// Search index for querying workflows. +#[async_trait] +pub trait SearchIndex: Send + Sync { + async fn index_workflow(&self, instance: &WorkflowInstance) -> crate::Result<()>; + async fn search( + &self, + terms: &str, + skip: u64, + take: u64, + filters: &[SearchFilter], + ) -> crate::Result>; + async fn start(&self) -> crate::Result<()>; + async fn stop(&self) -> crate::Result<()>; +} diff --git a/wfe-core/src/traits/step.rs b/wfe-core/src/traits/step.rs new file mode 100644 index 0000000..47b4b14 --- /dev/null +++ b/wfe-core/src/traits/step.rs @@ -0,0 +1,35 @@ +use async_trait::async_trait; +use serde::de::DeserializeOwned; +use serde::Serialize; + +use crate::models::{ExecutionPointer, ExecutionResult, WorkflowInstance, WorkflowStep}; + +/// Marker trait for all data types that flow between workflow steps. +/// Anything that is serializable and deserializable qualifies. +pub trait WorkflowData: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {} + +/// Blanket implementation: any type satisfying the bounds is WorkflowData. +impl WorkflowData for T where T: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {} + +/// Context available to a step during execution. +#[derive(Debug)] +pub struct StepExecutionContext<'a> { + /// The current item when iterating (ForEach). + pub item: Option<&'a serde_json::Value>, + /// The current execution pointer. + pub execution_pointer: &'a ExecutionPointer, + /// Persistence data from a previous execution of this step. + pub persistence_data: Option<&'a serde_json::Value>, + /// The step definition. + pub step: &'a WorkflowStep, + /// The running workflow instance. + pub workflow: &'a WorkflowInstance, + /// Cancellation token. + pub cancellation_token: tokio_util::sync::CancellationToken, +} + +/// The core unit of work in a workflow. Each step implements this trait. +#[async_trait] +pub trait StepBody: Send + Sync { + async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result; +}