feat(wfe-core): add models, traits, and error types
Core domain models: WorkflowInstance, ExecutionPointer, WorkflowDefinition, WorkflowStep, Event, EventSubscription, ScheduledCommand, ExecutionError, LifecycleEvent, PollEndpointConfig. All serde-serializable. Provider traits: PersistenceProvider (composite of WorkflowRepository, EventRepository, SubscriptionRepository, ScheduledCommandRepository), DistributedLockProvider, QueueProvider, SearchIndex, LifecyclePublisher, WorkflowMiddleware, StepMiddleware, WorkflowRegistry. StepBody trait with StepExecutionContext for workflow step implementations. WorkflowData marker trait (blanket impl for Serialize + DeserializeOwned).
This commit is contained in:
26
wfe-core/Cargo.toml
Normal file
26
wfe-core/Cargo.toml
Normal file
@@ -0,0 +1,26 @@
|
||||
[package]
|
||||
name = "wfe-core"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
description = "Core traits, models, builder, and executor for the WFE workflow engine"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
test-support = []
|
||||
|
||||
[dependencies]
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
tokio-util = "0.7"
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
rstest = { workspace = true }
|
||||
tokio = { workspace = true, features = ["test-util"] }
|
||||
39
wfe-core/src/error.rs
Normal file
39
wfe-core/src/error.rs
Normal file
@@ -0,0 +1,39 @@
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum WfeError {
|
||||
#[error("Workflow not found: {0}")]
|
||||
WorkflowNotFound(String),
|
||||
|
||||
#[error("Workflow definition not found: {id} v{version}")]
|
||||
DefinitionNotFound { id: String, version: u32 },
|
||||
|
||||
#[error("Event not found: {0}")]
|
||||
EventNotFound(String),
|
||||
|
||||
#[error("Subscription not found: {0}")]
|
||||
SubscriptionNotFound(String),
|
||||
|
||||
#[error("Step not found: {0}")]
|
||||
StepNotFound(usize),
|
||||
|
||||
#[error("Lock acquisition failed for: {0}")]
|
||||
LockFailed(String),
|
||||
|
||||
#[error("Persistence error: {0}")]
|
||||
Persistence(String),
|
||||
|
||||
#[error("Serialization error: {0}")]
|
||||
Serialization(#[from] serde_json::Error),
|
||||
|
||||
#[error("Step execution error: {0}")]
|
||||
StepExecution(String),
|
||||
|
||||
#[error("Workflow cancelled")]
|
||||
Cancelled,
|
||||
|
||||
#[error(transparent)]
|
||||
Other(#[from] Box<dyn std::error::Error + Send + Sync>),
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, WfeError>;
|
||||
11
wfe-core/src/lib.rs
Normal file
11
wfe-core/src/lib.rs
Normal file
@@ -0,0 +1,11 @@
|
||||
pub mod builder;
|
||||
pub mod error;
|
||||
pub mod executor;
|
||||
pub mod models;
|
||||
pub mod primitives;
|
||||
pub mod traits;
|
||||
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
pub mod test_support;
|
||||
|
||||
pub use error::{Result, WfeError};
|
||||
80
wfe-core/src/models/error_behavior.rs
Normal file
80
wfe-core/src/models/error_behavior.rs
Normal file
@@ -0,0 +1,80 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub enum ErrorBehavior {
|
||||
Retry {
|
||||
#[serde(with = "duration_millis")]
|
||||
interval: Duration,
|
||||
#[serde(default = "default_max_retries")]
|
||||
max_retries: u32,
|
||||
},
|
||||
Suspend,
|
||||
Terminate,
|
||||
Compensate,
|
||||
}
|
||||
|
||||
fn default_max_retries() -> u32 {
|
||||
3
|
||||
}
|
||||
|
||||
impl Default for ErrorBehavior {
|
||||
fn default() -> Self {
|
||||
Self::Retry {
|
||||
interval: Duration::from_secs(60),
|
||||
max_retries: default_max_retries(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mod duration_millis {
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Deserializer, Serializer};
|
||||
|
||||
pub fn serialize<S: Serializer>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
serializer.serialize_u64(duration.as_millis() as u64)
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Duration, D::Error> {
|
||||
let millis = u64::deserialize(deserializer)?;
|
||||
Ok(Duration::from_millis(millis))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn default_is_retry_60s() {
|
||||
let behavior = ErrorBehavior::default();
|
||||
assert_eq!(
|
||||
behavior,
|
||||
ErrorBehavior::Retry {
|
||||
interval: Duration::from_secs(60),
|
||||
max_retries: 3,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip() {
|
||||
let variants = vec![
|
||||
ErrorBehavior::Retry {
|
||||
interval: Duration::from_secs(30),
|
||||
max_retries: 3,
|
||||
},
|
||||
ErrorBehavior::Suspend,
|
||||
ErrorBehavior::Terminate,
|
||||
ErrorBehavior::Compensate,
|
||||
];
|
||||
for variant in variants {
|
||||
let json = serde_json::to_string(&variant).unwrap();
|
||||
let deserialized: ErrorBehavior = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(variant, deserialized);
|
||||
}
|
||||
}
|
||||
}
|
||||
119
wfe-core/src/models/event.rs
Normal file
119
wfe-core/src/models/event.rs
Normal file
@@ -0,0 +1,119 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Event {
|
||||
pub id: String,
|
||||
pub event_name: String,
|
||||
pub event_key: String,
|
||||
pub event_data: serde_json::Value,
|
||||
pub event_time: DateTime<Utc>,
|
||||
pub is_processed: bool,
|
||||
}
|
||||
|
||||
impl Event {
|
||||
pub fn new(
|
||||
event_name: impl Into<String>,
|
||||
event_key: impl Into<String>,
|
||||
event_data: serde_json::Value,
|
||||
) -> Self {
|
||||
Self {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
event_name: event_name.into(),
|
||||
event_key: event_key.into(),
|
||||
event_data,
|
||||
event_time: Utc::now(),
|
||||
is_processed: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct EventSubscription {
|
||||
pub id: String,
|
||||
pub workflow_id: String,
|
||||
pub step_id: usize,
|
||||
pub execution_pointer_id: String,
|
||||
pub event_name: String,
|
||||
pub event_key: String,
|
||||
pub subscribe_as_of: DateTime<Utc>,
|
||||
pub subscription_data: Option<serde_json::Value>,
|
||||
pub external_token: Option<String>,
|
||||
pub external_worker_id: Option<String>,
|
||||
pub external_token_expiry: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl EventSubscription {
|
||||
pub fn new(
|
||||
workflow_id: impl Into<String>,
|
||||
step_id: usize,
|
||||
execution_pointer_id: impl Into<String>,
|
||||
event_name: impl Into<String>,
|
||||
event_key: impl Into<String>,
|
||||
subscribe_as_of: DateTime<Utc>,
|
||||
) -> Self {
|
||||
Self {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
workflow_id: workflow_id.into(),
|
||||
step_id,
|
||||
execution_pointer_id: execution_pointer_id.into(),
|
||||
event_name: event_name.into(),
|
||||
event_key: event_key.into(),
|
||||
subscribe_as_of,
|
||||
subscription_data: None,
|
||||
external_token: None,
|
||||
external_worker_id: None,
|
||||
external_token_expiry: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn new_event_defaults() {
|
||||
let event = Event::new("order.created", "order-456", serde_json::json!({"amount": 100}));
|
||||
assert_eq!(event.event_name, "order.created");
|
||||
assert_eq!(event.event_key, "order-456");
|
||||
assert!(!event.is_processed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_event_generates_unique_ids() {
|
||||
let e1 = Event::new("test", "key", serde_json::json!(null));
|
||||
let e2 = Event::new("test", "key", serde_json::json!(null));
|
||||
assert_ne!(e1.id, e2.id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_serde_round_trip() {
|
||||
let event = Event::new("test", "key", serde_json::json!({"data": true}));
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let deserialized: Event = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(event.id, deserialized.id);
|
||||
assert_eq!(event.event_name, deserialized.event_name);
|
||||
assert_eq!(event.event_data, deserialized.event_data);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_subscription_defaults() {
|
||||
let sub = EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", Utc::now());
|
||||
assert_eq!(sub.workflow_id, "wf-1");
|
||||
assert_eq!(sub.step_id, 0);
|
||||
assert!(sub.external_token.is_none());
|
||||
assert!(sub.subscription_data.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn subscription_serde_round_trip() {
|
||||
let sub = EventSubscription::new("wf-1", 2, "ptr-1", "evt", "key", Utc::now());
|
||||
let json = serde_json::to_string(&sub).unwrap();
|
||||
let deserialized: EventSubscription = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(sub.id, deserialized.id);
|
||||
assert_eq!(sub.workflow_id, deserialized.workflow_id);
|
||||
assert_eq!(sub.event_name, deserialized.event_name);
|
||||
}
|
||||
}
|
||||
48
wfe-core/src/models/execution_error.rs
Normal file
48
wfe-core/src/models/execution_error.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ExecutionError {
|
||||
pub error_time: DateTime<Utc>,
|
||||
pub workflow_id: String,
|
||||
pub execution_pointer_id: String,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
impl ExecutionError {
|
||||
pub fn new(
|
||||
workflow_id: impl Into<String>,
|
||||
execution_pointer_id: impl Into<String>,
|
||||
message: impl Into<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
error_time: Utc::now(),
|
||||
workflow_id: workflow_id.into(),
|
||||
execution_pointer_id: execution_pointer_id.into(),
|
||||
message: message.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn new_error_captures_fields() {
|
||||
let err = ExecutionError::new("wf-1", "ptr-1", "something went wrong");
|
||||
assert_eq!(err.workflow_id, "wf-1");
|
||||
assert_eq!(err.execution_pointer_id, "ptr-1");
|
||||
assert_eq!(err.message, "something went wrong");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip() {
|
||||
let err = ExecutionError::new("wf-1", "ptr-1", "fail");
|
||||
let json = serde_json::to_string(&err).unwrap();
|
||||
let deserialized: ExecutionError = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(err.workflow_id, deserialized.workflow_id);
|
||||
assert_eq!(err.message, deserialized.message);
|
||||
}
|
||||
}
|
||||
101
wfe-core/src/models/execution_pointer.rs
Normal file
101
wfe-core/src/models/execution_pointer.rs
Normal file
@@ -0,0 +1,101 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::status::PointerStatus;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ExecutionPointer {
|
||||
pub id: String,
|
||||
pub step_id: usize,
|
||||
pub active: bool,
|
||||
pub status: PointerStatus,
|
||||
pub sleep_until: Option<DateTime<Utc>>,
|
||||
pub persistence_data: Option<serde_json::Value>,
|
||||
pub start_time: Option<DateTime<Utc>>,
|
||||
pub end_time: Option<DateTime<Utc>>,
|
||||
pub event_name: Option<String>,
|
||||
pub event_key: Option<String>,
|
||||
pub event_published: bool,
|
||||
pub event_data: Option<serde_json::Value>,
|
||||
pub step_name: Option<String>,
|
||||
pub retry_count: u32,
|
||||
pub children: Vec<String>,
|
||||
pub context_item: Option<serde_json::Value>,
|
||||
pub predecessor_id: Option<String>,
|
||||
pub outcome: Option<serde_json::Value>,
|
||||
pub scope: Vec<String>,
|
||||
pub extension_attributes: HashMap<String, serde_json::Value>,
|
||||
}
|
||||
|
||||
impl ExecutionPointer {
|
||||
pub fn new(step_id: usize) -> Self {
|
||||
Self {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
step_id,
|
||||
active: true,
|
||||
status: PointerStatus::Pending,
|
||||
sleep_until: None,
|
||||
persistence_data: None,
|
||||
start_time: None,
|
||||
end_time: None,
|
||||
event_name: None,
|
||||
event_key: None,
|
||||
event_published: false,
|
||||
event_data: None,
|
||||
step_name: None,
|
||||
retry_count: 0,
|
||||
children: Vec::new(),
|
||||
context_item: None,
|
||||
predecessor_id: None,
|
||||
outcome: None,
|
||||
scope: Vec::new(),
|
||||
extension_attributes: HashMap::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn new_pointer_has_correct_defaults() {
|
||||
let pointer = ExecutionPointer::new(0);
|
||||
assert_eq!(pointer.step_id, 0);
|
||||
assert!(pointer.active);
|
||||
assert_eq!(pointer.status, PointerStatus::Pending);
|
||||
assert_eq!(pointer.retry_count, 0);
|
||||
assert!(pointer.children.is_empty());
|
||||
assert!(pointer.scope.is_empty());
|
||||
assert!(!pointer.event_published);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn new_pointer_generates_unique_ids() {
|
||||
let p1 = ExecutionPointer::new(0);
|
||||
let p2 = ExecutionPointer::new(0);
|
||||
assert_ne!(p1.id, p2.id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip() {
|
||||
let mut pointer = ExecutionPointer::new(3);
|
||||
pointer.status = PointerStatus::Running;
|
||||
pointer.retry_count = 2;
|
||||
pointer.persistence_data = Some(serde_json::json!({"step_state": true}));
|
||||
pointer.children = vec!["child-1".into(), "child-2".into()];
|
||||
|
||||
let json = serde_json::to_string(&pointer).unwrap();
|
||||
let deserialized: ExecutionPointer = serde_json::from_str(&json).unwrap();
|
||||
|
||||
assert_eq!(pointer.id, deserialized.id);
|
||||
assert_eq!(pointer.step_id, deserialized.step_id);
|
||||
assert_eq!(pointer.status, deserialized.status);
|
||||
assert_eq!(pointer.retry_count, deserialized.retry_count);
|
||||
assert_eq!(pointer.persistence_data, deserialized.persistence_data);
|
||||
assert_eq!(pointer.children, deserialized.children);
|
||||
}
|
||||
}
|
||||
188
wfe-core/src/models/execution_result.rs
Normal file
188
wfe-core/src/models/execution_result.rs
Normal file
@@ -0,0 +1,188 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::poll_config::PollEndpointConfig;
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
||||
pub struct ExecutionResult {
|
||||
/// Whether the workflow should proceed to the next step.
|
||||
pub proceed: bool,
|
||||
/// Outcome value for decision-based routing.
|
||||
pub outcome_value: Option<serde_json::Value>,
|
||||
/// Duration to sleep before re-executing.
|
||||
#[serde(default, with = "super::option_duration_millis")]
|
||||
pub sleep_for: Option<Duration>,
|
||||
/// Step-specific state to persist between executions.
|
||||
pub persistence_data: Option<serde_json::Value>,
|
||||
/// Event name to wait for.
|
||||
pub event_name: Option<String>,
|
||||
/// Event key to match.
|
||||
pub event_key: Option<String>,
|
||||
/// Only consider events published after this time.
|
||||
pub event_as_of: Option<DateTime<Utc>>,
|
||||
/// Values to branch execution on (for ForEach/Parallel).
|
||||
pub branch_values: Option<Vec<serde_json::Value>>,
|
||||
/// Poll endpoint configuration for external service polling.
|
||||
pub poll_endpoint: Option<PollEndpointConfig>,
|
||||
}
|
||||
|
||||
impl ExecutionResult {
|
||||
/// Continue to the next step.
|
||||
pub fn next() -> Self {
|
||||
Self {
|
||||
proceed: true,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Continue with an outcome value for decision routing.
|
||||
pub fn outcome(value: impl Into<serde_json::Value>) -> Self {
|
||||
Self {
|
||||
proceed: true,
|
||||
outcome_value: Some(value.into()),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Pause execution and persist step-specific data.
|
||||
pub fn persist(data: serde_json::Value) -> Self {
|
||||
Self {
|
||||
proceed: false,
|
||||
persistence_data: Some(data),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Create child branches for parallel/foreach execution.
|
||||
pub fn branch(values: Vec<serde_json::Value>, persistence_data: Option<serde_json::Value>) -> Self {
|
||||
Self {
|
||||
proceed: false,
|
||||
branch_values: Some(values),
|
||||
persistence_data,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Sleep for a duration before re-executing.
|
||||
pub fn sleep(duration: Duration, persistence_data: Option<serde_json::Value>) -> Self {
|
||||
Self {
|
||||
proceed: false,
|
||||
sleep_for: Some(duration),
|
||||
persistence_data,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Wait for an external event.
|
||||
pub fn wait_for_event(
|
||||
event_name: impl Into<String>,
|
||||
event_key: impl Into<String>,
|
||||
as_of: DateTime<Utc>,
|
||||
) -> Self {
|
||||
Self {
|
||||
proceed: false,
|
||||
event_name: Some(event_name.into()),
|
||||
event_key: Some(event_key.into()),
|
||||
event_as_of: Some(as_of),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll an external endpoint until a condition is met.
|
||||
pub fn poll_endpoint(config: PollEndpointConfig) -> Self {
|
||||
Self {
|
||||
proceed: false,
|
||||
poll_endpoint: Some(config),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn next_proceeds_with_no_data() {
|
||||
let result = ExecutionResult::next();
|
||||
assert!(result.proceed);
|
||||
assert!(result.outcome_value.is_none());
|
||||
assert!(result.sleep_for.is_none());
|
||||
assert!(result.persistence_data.is_none());
|
||||
assert!(result.event_name.is_none());
|
||||
assert!(result.branch_values.is_none());
|
||||
assert!(result.poll_endpoint.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn outcome_proceeds_with_value() {
|
||||
let result = ExecutionResult::outcome(serde_json::json!(42));
|
||||
assert!(result.proceed);
|
||||
assert_eq!(result.outcome_value, Some(serde_json::json!(42)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persist_does_not_proceed() {
|
||||
let data = serde_json::json!({"counter": 5});
|
||||
let result = ExecutionResult::persist(data.clone());
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.persistence_data, Some(data));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn branch_creates_child_values() {
|
||||
let values = vec![serde_json::json!(1), serde_json::json!(2), serde_json::json!(3)];
|
||||
let result = ExecutionResult::branch(values.clone(), None);
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.branch_values, Some(values));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn sleep_sets_duration() {
|
||||
let result = ExecutionResult::sleep(Duration::from_secs(30), None);
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.sleep_for, Some(Duration::from_secs(30)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn wait_for_event_sets_event_fields() {
|
||||
let now = Utc::now();
|
||||
let result = ExecutionResult::wait_for_event("order.completed", "order-123", now);
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.event_name.as_deref(), Some("order.completed"));
|
||||
assert_eq!(result.event_key.as_deref(), Some("order-123"));
|
||||
assert_eq!(result.event_as_of, Some(now));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poll_endpoint_sets_config() {
|
||||
use super::super::poll_config::*;
|
||||
use std::collections::HashMap;
|
||||
|
||||
let config = PollEndpointConfig {
|
||||
url: "https://api.example.com/status".into(),
|
||||
method: HttpMethod::Get,
|
||||
headers: HashMap::new(),
|
||||
body: None,
|
||||
interval: Duration::from_secs(10),
|
||||
timeout: Duration::from_secs(300),
|
||||
condition: PollCondition::StatusCode(200),
|
||||
};
|
||||
let result = ExecutionResult::poll_endpoint(config.clone());
|
||||
assert!(!result.proceed);
|
||||
assert_eq!(result.poll_endpoint, Some(config));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip() {
|
||||
let result = ExecutionResult::sleep(Duration::from_secs(30), Some(serde_json::json!({"x": 1})));
|
||||
let json = serde_json::to_string(&result).unwrap();
|
||||
let deserialized: ExecutionResult = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(result.proceed, deserialized.proceed);
|
||||
assert_eq!(result.sleep_for, deserialized.sleep_for);
|
||||
assert_eq!(result.persistence_data, deserialized.persistence_data);
|
||||
}
|
||||
}
|
||||
104
wfe-core/src/models/lifecycle.rs
Normal file
104
wfe-core/src/models/lifecycle.rs
Normal file
@@ -0,0 +1,104 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LifecycleEvent {
|
||||
pub event_time_utc: DateTime<Utc>,
|
||||
pub workflow_instance_id: String,
|
||||
pub workflow_definition_id: String,
|
||||
pub version: u32,
|
||||
pub reference: Option<String>,
|
||||
pub event_type: LifecycleEventType,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum LifecycleEventType {
|
||||
Started,
|
||||
Resumed,
|
||||
Suspended,
|
||||
Completed,
|
||||
Terminated,
|
||||
Error { message: String },
|
||||
StepStarted { step_id: usize, step_name: Option<String> },
|
||||
StepCompleted { step_id: usize, step_name: Option<String> },
|
||||
}
|
||||
|
||||
impl LifecycleEvent {
|
||||
pub fn new(
|
||||
workflow_instance_id: impl Into<String>,
|
||||
workflow_definition_id: impl Into<String>,
|
||||
version: u32,
|
||||
event_type: LifecycleEventType,
|
||||
) -> Self {
|
||||
Self {
|
||||
event_time_utc: Utc::now(),
|
||||
workflow_instance_id: workflow_instance_id.into(),
|
||||
workflow_definition_id: workflow_definition_id.into(),
|
||||
version,
|
||||
reference: None,
|
||||
event_type,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn lifecycle_event_types_are_distinct() {
|
||||
assert_ne!(LifecycleEventType::Started, LifecycleEventType::Completed);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip_simple_variant() {
|
||||
let event = LifecycleEvent::new("wf-1", "def-1", 1, LifecycleEventType::Started);
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let deserialized: LifecycleEvent = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(event.workflow_instance_id, deserialized.workflow_instance_id);
|
||||
assert_eq!(event.event_type, deserialized.event_type);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip_error_variant() {
|
||||
let event = LifecycleEvent::new(
|
||||
"wf-1",
|
||||
"def-1",
|
||||
1,
|
||||
LifecycleEventType::Error {
|
||||
message: "boom".into(),
|
||||
},
|
||||
);
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let deserialized: LifecycleEvent = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(
|
||||
deserialized.event_type,
|
||||
LifecycleEventType::Error {
|
||||
message: "boom".into()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip_step_variant() {
|
||||
let event = LifecycleEvent::new(
|
||||
"wf-1",
|
||||
"def-1",
|
||||
1,
|
||||
LifecycleEventType::StepStarted {
|
||||
step_id: 3,
|
||||
step_name: Some("ProcessOrder".into()),
|
||||
},
|
||||
);
|
||||
let json = serde_json::to_string(&event).unwrap();
|
||||
let deserialized: LifecycleEvent = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(
|
||||
deserialized.event_type,
|
||||
LifecycleEventType::StepStarted {
|
||||
step_id: 3,
|
||||
step_name: Some("ProcessOrder".into()),
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
65
wfe-core/src/models/mod.rs
Normal file
65
wfe-core/src/models/mod.rs
Normal file
@@ -0,0 +1,65 @@
|
||||
pub mod error_behavior;
|
||||
pub mod event;
|
||||
pub mod execution_error;
|
||||
pub mod execution_pointer;
|
||||
pub mod execution_result;
|
||||
pub mod lifecycle;
|
||||
pub mod poll_config;
|
||||
pub mod queue_type;
|
||||
pub mod scheduled_command;
|
||||
pub mod status;
|
||||
pub mod workflow_definition;
|
||||
pub mod workflow_instance;
|
||||
|
||||
pub use error_behavior::ErrorBehavior;
|
||||
pub use event::{Event, EventSubscription};
|
||||
pub use execution_error::ExecutionError;
|
||||
pub use execution_pointer::ExecutionPointer;
|
||||
pub use execution_result::ExecutionResult;
|
||||
pub use lifecycle::{LifecycleEvent, LifecycleEventType};
|
||||
pub use poll_config::{HttpMethod, PollCondition, PollEndpointConfig};
|
||||
pub use queue_type::QueueType;
|
||||
pub use scheduled_command::{CommandName, ScheduledCommand};
|
||||
pub use status::{PointerStatus, WorkflowStatus};
|
||||
pub use workflow_definition::{StepOutcome, WorkflowDefinition, WorkflowStep};
|
||||
pub use workflow_instance::WorkflowInstance;
|
||||
|
||||
/// Serde helper for `Option<Duration>` as milliseconds.
|
||||
pub(crate) mod option_duration_millis {
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Deserializer, Serializer};
|
||||
|
||||
pub fn serialize<S: Serializer>(
|
||||
duration: &Option<Duration>,
|
||||
serializer: S,
|
||||
) -> Result<S::Ok, S::Error> {
|
||||
match duration {
|
||||
Some(d) => serializer.serialize_some(&(d.as_millis() as u64)),
|
||||
None => serializer.serialize_none(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D: Deserializer<'de>>(
|
||||
deserializer: D,
|
||||
) -> Result<Option<Duration>, D::Error> {
|
||||
let millis: Option<u64> = Option::deserialize(deserializer)?;
|
||||
Ok(millis.map(Duration::from_millis))
|
||||
}
|
||||
}
|
||||
|
||||
/// Serde helper for `Duration` as milliseconds (non-optional).
|
||||
pub(crate) mod duration_millis {
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Deserializer, Serializer};
|
||||
|
||||
pub fn serialize<S: Serializer>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> {
|
||||
serializer.serialize_u64(duration.as_millis() as u64)
|
||||
}
|
||||
|
||||
pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Duration, D::Error> {
|
||||
let millis = u64::deserialize(deserializer)?;
|
||||
Ok(Duration::from_millis(millis))
|
||||
}
|
||||
}
|
||||
83
wfe-core/src/models/poll_config.rs
Normal file
83
wfe-core/src/models/poll_config.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum HttpMethod {
|
||||
Get,
|
||||
Post,
|
||||
Put,
|
||||
Patch,
|
||||
Delete,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub enum PollCondition {
|
||||
/// Check a JSON path equals a value: e.g. JsonPathEquals("$.status", "complete")
|
||||
JsonPathEquals {
|
||||
path: String,
|
||||
value: serde_json::Value,
|
||||
},
|
||||
/// Check HTTP status code
|
||||
StatusCode(u16),
|
||||
/// Check response body contains string
|
||||
BodyContains(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
pub struct PollEndpointConfig {
|
||||
/// URL template. Supports `{placeholder}` interpolation from workflow data.
|
||||
pub url: String,
|
||||
pub method: HttpMethod,
|
||||
#[serde(default)]
|
||||
pub headers: HashMap<String, String>,
|
||||
#[serde(default)]
|
||||
pub body: Option<serde_json::Value>,
|
||||
#[serde(with = "super::duration_millis")]
|
||||
pub interval: Duration,
|
||||
#[serde(with = "super::duration_millis")]
|
||||
pub timeout: Duration,
|
||||
pub condition: PollCondition,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn poll_config_serde_round_trip() {
|
||||
let config = PollEndpointConfig {
|
||||
url: "https://api.example.com/status/{id}".into(),
|
||||
method: HttpMethod::Get,
|
||||
headers: HashMap::from([("Authorization".into(), "Bearer token123".into())]),
|
||||
body: None,
|
||||
interval: Duration::from_secs(30),
|
||||
timeout: Duration::from_secs(3600),
|
||||
condition: PollCondition::JsonPathEquals {
|
||||
path: "$.status".into(),
|
||||
value: serde_json::Value::String("complete".into()),
|
||||
},
|
||||
};
|
||||
let json = serde_json::to_string(&config).unwrap();
|
||||
let deserialized: PollEndpointConfig = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(config, deserialized);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poll_condition_status_code() {
|
||||
let cond = PollCondition::StatusCode(200);
|
||||
let json = serde_json::to_string(&cond).unwrap();
|
||||
let deserialized: PollCondition = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(cond, deserialized);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn poll_condition_body_contains() {
|
||||
let cond = PollCondition::BodyContains("success".into());
|
||||
let json = serde_json::to_string(&cond).unwrap();
|
||||
let deserialized: PollCondition = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(cond, deserialized);
|
||||
}
|
||||
}
|
||||
30
wfe-core/src/models/queue_type.rs
Normal file
30
wfe-core/src/models/queue_type.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub enum QueueType {
|
||||
Workflow,
|
||||
Event,
|
||||
Index,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn queue_types_are_distinct() {
|
||||
assert_ne!(QueueType::Workflow, QueueType::Event);
|
||||
assert_ne!(QueueType::Event, QueueType::Index);
|
||||
assert_ne!(QueueType::Workflow, QueueType::Index);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip() {
|
||||
for qt in [QueueType::Workflow, QueueType::Event, QueueType::Index] {
|
||||
let json = serde_json::to_string(&qt).unwrap();
|
||||
let deserialized: QueueType = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(qt, deserialized);
|
||||
}
|
||||
}
|
||||
}
|
||||
65
wfe-core/src/models/scheduled_command.rs
Normal file
65
wfe-core/src/models/scheduled_command.rs
Normal file
@@ -0,0 +1,65 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub enum CommandName {
|
||||
ProcessWorkflow,
|
||||
ProcessEvent,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ScheduledCommand {
|
||||
pub command_name: CommandName,
|
||||
pub data: String,
|
||||
/// Epoch milliseconds when the command should execute.
|
||||
pub execute_time: i64,
|
||||
}
|
||||
|
||||
impl ScheduledCommand {
|
||||
pub fn process_workflow(workflow_id: impl Into<String>, execute_time: i64) -> Self {
|
||||
Self {
|
||||
command_name: CommandName::ProcessWorkflow,
|
||||
data: workflow_id.into(),
|
||||
execute_time,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn process_event(event_id: impl Into<String>, execute_time: i64) -> Self {
|
||||
Self {
|
||||
command_name: CommandName::ProcessEvent,
|
||||
data: event_id.into(),
|
||||
execute_time,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn process_workflow_factory() {
|
||||
let cmd = ScheduledCommand::process_workflow("wf-123", 1000);
|
||||
assert_eq!(cmd.command_name, CommandName::ProcessWorkflow);
|
||||
assert_eq!(cmd.data, "wf-123");
|
||||
assert_eq!(cmd.execute_time, 1000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn process_event_factory() {
|
||||
let cmd = ScheduledCommand::process_event("evt-456", 2000);
|
||||
assert_eq!(cmd.command_name, CommandName::ProcessEvent);
|
||||
assert_eq!(cmd.data, "evt-456");
|
||||
assert_eq!(cmd.execute_time, 2000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip() {
|
||||
let cmd = ScheduledCommand::process_workflow("wf-1", 500);
|
||||
let json = serde_json::to_string(&cmd).unwrap();
|
||||
let deserialized: ScheduledCommand = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(cmd.command_name, deserialized.command_name);
|
||||
assert_eq!(cmd.data, deserialized.data);
|
||||
assert_eq!(cmd.execute_time, deserialized.execute_time);
|
||||
}
|
||||
}
|
||||
73
wfe-core/src/models/status.rs
Normal file
73
wfe-core/src/models/status.rs
Normal file
@@ -0,0 +1,73 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub enum WorkflowStatus {
|
||||
#[default]
|
||||
Runnable,
|
||||
Suspended,
|
||||
Complete,
|
||||
Terminated,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
pub enum PointerStatus {
|
||||
#[default]
|
||||
Pending,
|
||||
Running,
|
||||
Complete,
|
||||
Sleeping,
|
||||
WaitingForEvent,
|
||||
Failed,
|
||||
Compensated,
|
||||
Cancelled,
|
||||
PendingPredecessor,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn workflow_status_default_is_runnable() {
|
||||
assert_eq!(WorkflowStatus::default(), WorkflowStatus::Runnable);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pointer_status_default_is_pending() {
|
||||
assert_eq!(PointerStatus::default(), PointerStatus::Pending);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn workflow_status_serde_round_trip() {
|
||||
for status in [
|
||||
WorkflowStatus::Runnable,
|
||||
WorkflowStatus::Suspended,
|
||||
WorkflowStatus::Complete,
|
||||
WorkflowStatus::Terminated,
|
||||
] {
|
||||
let json = serde_json::to_string(&status).unwrap();
|
||||
let deserialized: WorkflowStatus = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(status, deserialized);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn pointer_status_serde_round_trip() {
|
||||
for status in [
|
||||
PointerStatus::Pending,
|
||||
PointerStatus::Running,
|
||||
PointerStatus::Complete,
|
||||
PointerStatus::Sleeping,
|
||||
PointerStatus::WaitingForEvent,
|
||||
PointerStatus::Failed,
|
||||
PointerStatus::Compensated,
|
||||
PointerStatus::Cancelled,
|
||||
PointerStatus::PendingPredecessor,
|
||||
] {
|
||||
let json = serde_json::to_string(&status).unwrap();
|
||||
let deserialized: PointerStatus = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(status, deserialized);
|
||||
}
|
||||
}
|
||||
}
|
||||
121
wfe-core/src/models/workflow_definition.rs
Normal file
121
wfe-core/src/models/workflow_definition.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::error_behavior::ErrorBehavior;
|
||||
|
||||
/// A compiled workflow definition ready for execution.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WorkflowDefinition {
|
||||
pub id: String,
|
||||
pub version: u32,
|
||||
pub description: Option<String>,
|
||||
pub steps: Vec<WorkflowStep>,
|
||||
pub default_error_behavior: ErrorBehavior,
|
||||
#[serde(default, with = "super::option_duration_millis")]
|
||||
pub default_error_retry_interval: Option<Duration>,
|
||||
}
|
||||
|
||||
impl WorkflowDefinition {
|
||||
pub fn new(id: impl Into<String>, version: u32) -> Self {
|
||||
Self {
|
||||
id: id.into(),
|
||||
version,
|
||||
description: None,
|
||||
steps: Vec::new(),
|
||||
default_error_behavior: ErrorBehavior::default(),
|
||||
default_error_retry_interval: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A single step in a workflow definition.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WorkflowStep {
|
||||
pub id: usize,
|
||||
pub name: Option<String>,
|
||||
pub external_id: Option<String>,
|
||||
pub step_type: String,
|
||||
pub children: Vec<usize>,
|
||||
pub outcomes: Vec<StepOutcome>,
|
||||
pub error_behavior: Option<ErrorBehavior>,
|
||||
pub compensation_step_id: Option<usize>,
|
||||
pub do_compensate: bool,
|
||||
#[serde(default)]
|
||||
pub saga: bool,
|
||||
/// Serializable configuration for primitive steps (e.g. event_name, duration).
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub step_config: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
impl WorkflowStep {
|
||||
pub fn new(id: usize, step_type: impl Into<String>) -> Self {
|
||||
Self {
|
||||
id,
|
||||
name: None,
|
||||
external_id: None,
|
||||
step_type: step_type.into(),
|
||||
children: Vec::new(),
|
||||
outcomes: Vec::new(),
|
||||
error_behavior: None,
|
||||
compensation_step_id: None,
|
||||
do_compensate: false,
|
||||
saga: false,
|
||||
step_config: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Routing outcome from a step.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct StepOutcome {
|
||||
pub next_step: usize,
|
||||
pub label: Option<String>,
|
||||
pub value: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn definition_defaults() {
|
||||
let def = WorkflowDefinition::new("test-workflow", 1);
|
||||
assert_eq!(def.id, "test-workflow");
|
||||
assert_eq!(def.version, 1);
|
||||
assert!(def.steps.is_empty());
|
||||
assert_eq!(def.default_error_behavior, ErrorBehavior::default());
|
||||
assert!(def.default_error_retry_interval.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn step_defaults() {
|
||||
let step = WorkflowStep::new(0, "MyStep");
|
||||
assert_eq!(step.id, 0);
|
||||
assert_eq!(step.step_type, "MyStep");
|
||||
assert!(step.children.is_empty());
|
||||
assert!(step.outcomes.is_empty());
|
||||
assert!(step.error_behavior.is_none());
|
||||
assert!(step.compensation_step_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn definition_serde_round_trip() {
|
||||
let mut def = WorkflowDefinition::new("wf", 3);
|
||||
let mut step = WorkflowStep::new(0, "StepA");
|
||||
step.outcomes.push(StepOutcome {
|
||||
next_step: 1,
|
||||
label: Some("next".into()),
|
||||
value: None,
|
||||
});
|
||||
def.steps.push(step);
|
||||
def.steps.push(WorkflowStep::new(1, "StepB"));
|
||||
|
||||
let json = serde_json::to_string(&def).unwrap();
|
||||
let deserialized: WorkflowDefinition = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(def.id, deserialized.id);
|
||||
assert_eq!(def.steps.len(), deserialized.steps.len());
|
||||
assert_eq!(def.steps[0].outcomes[0].next_step, 1);
|
||||
}
|
||||
}
|
||||
141
wfe-core/src/models/workflow_instance.rs
Normal file
141
wfe-core/src/models/workflow_instance.rs
Normal file
@@ -0,0 +1,141 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::execution_pointer::ExecutionPointer;
|
||||
use super::status::{PointerStatus, WorkflowStatus};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WorkflowInstance {
|
||||
pub id: String,
|
||||
pub workflow_definition_id: String,
|
||||
pub version: u32,
|
||||
pub description: Option<String>,
|
||||
pub reference: Option<String>,
|
||||
pub execution_pointers: Vec<ExecutionPointer>,
|
||||
pub next_execution: Option<i64>,
|
||||
pub status: WorkflowStatus,
|
||||
pub data: serde_json::Value,
|
||||
pub create_time: DateTime<Utc>,
|
||||
pub complete_time: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl WorkflowInstance {
|
||||
pub fn new(workflow_definition_id: impl Into<String>, version: u32, data: serde_json::Value) -> Self {
|
||||
Self {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
workflow_definition_id: workflow_definition_id.into(),
|
||||
version,
|
||||
description: None,
|
||||
reference: None,
|
||||
execution_pointers: Vec::new(),
|
||||
next_execution: Some(0),
|
||||
status: WorkflowStatus::Runnable,
|
||||
data,
|
||||
create_time: Utc::now(),
|
||||
complete_time: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if all execution pointers in a given scope have completed.
|
||||
pub fn is_branch_complete(&self, scope: &[String]) -> bool {
|
||||
self.execution_pointers
|
||||
.iter()
|
||||
.filter(|p| p.scope == scope)
|
||||
.all(|p| {
|
||||
matches!(
|
||||
p.status,
|
||||
PointerStatus::Complete
|
||||
| PointerStatus::Compensated
|
||||
| PointerStatus::Cancelled
|
||||
| PointerStatus::Failed
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn new_instance_defaults() {
|
||||
let instance = WorkflowInstance::new("test-workflow", 1, serde_json::json!({}));
|
||||
assert_eq!(instance.workflow_definition_id, "test-workflow");
|
||||
assert_eq!(instance.version, 1);
|
||||
assert_eq!(instance.status, WorkflowStatus::Runnable);
|
||||
assert_eq!(instance.next_execution, Some(0));
|
||||
assert!(instance.execution_pointers.is_empty());
|
||||
assert!(instance.complete_time.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_branch_complete_empty_scope_returns_true() {
|
||||
let instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
assert!(instance.is_branch_complete(&[]));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_branch_complete_all_complete() {
|
||||
let scope = vec!["parent-1".to_string()];
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
|
||||
let mut p1 = ExecutionPointer::new(0);
|
||||
p1.scope = scope.clone();
|
||||
p1.status = PointerStatus::Complete;
|
||||
|
||||
let mut p2 = ExecutionPointer::new(1);
|
||||
p2.scope = scope.clone();
|
||||
p2.status = PointerStatus::Compensated;
|
||||
|
||||
instance.execution_pointers = vec![p1, p2];
|
||||
assert!(instance.is_branch_complete(&scope));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_branch_complete_with_active_pointer() {
|
||||
let scope = vec!["parent-1".to_string()];
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
|
||||
let mut p1 = ExecutionPointer::new(0);
|
||||
p1.scope = scope.clone();
|
||||
p1.status = PointerStatus::Complete;
|
||||
|
||||
let mut p2 = ExecutionPointer::new(1);
|
||||
p2.scope = scope.clone();
|
||||
p2.status = PointerStatus::Running;
|
||||
|
||||
instance.execution_pointers = vec![p1, p2];
|
||||
assert!(!instance.is_branch_complete(&scope));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn is_branch_complete_ignores_different_scope() {
|
||||
let scope_a = vec!["parent-a".to_string()];
|
||||
let scope_b = vec!["parent-b".to_string()];
|
||||
let mut instance = WorkflowInstance::new("test", 1, serde_json::json!({}));
|
||||
|
||||
let mut p1 = ExecutionPointer::new(0);
|
||||
p1.scope = scope_a.clone();
|
||||
p1.status = PointerStatus::Complete;
|
||||
|
||||
let mut p2 = ExecutionPointer::new(1);
|
||||
p2.scope = scope_b.clone();
|
||||
p2.status = PointerStatus::Running;
|
||||
|
||||
instance.execution_pointers = vec![p1, p2];
|
||||
assert!(instance.is_branch_complete(&scope_a));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn serde_round_trip() {
|
||||
let instance = WorkflowInstance::new("my-workflow", 2, serde_json::json!({"key": "value"}));
|
||||
let json = serde_json::to_string(&instance).unwrap();
|
||||
let deserialized: WorkflowInstance = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(instance.id, deserialized.id);
|
||||
assert_eq!(instance.workflow_definition_id, deserialized.workflow_definition_id);
|
||||
assert_eq!(instance.version, deserialized.version);
|
||||
assert_eq!(instance.status, deserialized.status);
|
||||
assert_eq!(instance.data, deserialized.data);
|
||||
}
|
||||
}
|
||||
9
wfe-core/src/traits/lifecycle.rs
Normal file
9
wfe-core/src/traits/lifecycle.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::models::LifecycleEvent;
|
||||
|
||||
/// Publishes lifecycle events for workflow state transitions.
|
||||
#[async_trait]
|
||||
pub trait LifecyclePublisher: Send + Sync {
|
||||
async fn publish(&self, event: LifecycleEvent) -> crate::Result<()>;
|
||||
}
|
||||
10
wfe-core/src/traits/lock.rs
Normal file
10
wfe-core/src/traits/lock.rs
Normal file
@@ -0,0 +1,10 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
/// Distributed lock provider for preventing concurrent execution of the same workflow.
|
||||
#[async_trait]
|
||||
pub trait DistributedLockProvider: Send + Sync {
|
||||
async fn acquire_lock(&self, resource: &str) -> crate::Result<bool>;
|
||||
async fn release_lock(&self, resource: &str) -> crate::Result<()>;
|
||||
async fn start(&self) -> crate::Result<()>;
|
||||
async fn stop(&self) -> crate::Result<()>;
|
||||
}
|
||||
93
wfe-core/src/traits/middleware.rs
Normal file
93
wfe-core/src/traits/middleware.rs
Normal file
@@ -0,0 +1,93 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::models::{ExecutionResult, WorkflowInstance};
|
||||
use crate::traits::step::StepExecutionContext;
|
||||
|
||||
/// Workflow-level middleware with default no-op implementations.
|
||||
#[async_trait]
|
||||
pub trait WorkflowMiddleware: Send + Sync {
|
||||
async fn pre_workflow(&self, _instance: &WorkflowInstance) -> crate::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
async fn post_workflow(&self, _instance: &WorkflowInstance) -> crate::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Step-level middleware with default no-op implementations.
|
||||
#[async_trait]
|
||||
pub trait StepMiddleware: Send + Sync {
|
||||
async fn pre_step(&self, _context: &StepExecutionContext<'_>) -> crate::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
async fn post_step(
|
||||
&self,
|
||||
_context: &StepExecutionContext<'_>,
|
||||
_result: &ExecutionResult,
|
||||
) -> crate::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::models::{ExecutionPointer, ExecutionResult, WorkflowInstance};
|
||||
|
||||
struct NoOpWorkflowMiddleware;
|
||||
impl WorkflowMiddleware for NoOpWorkflowMiddleware {}
|
||||
|
||||
struct NoOpStepMiddleware;
|
||||
impl StepMiddleware for NoOpStepMiddleware {}
|
||||
|
||||
#[tokio::test]
|
||||
async fn workflow_middleware_default_pre_workflow() {
|
||||
let mw = NoOpWorkflowMiddleware;
|
||||
let instance = WorkflowInstance::new("wf", 1, serde_json::json!({}));
|
||||
mw.pre_workflow(&instance).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn workflow_middleware_default_post_workflow() {
|
||||
let mw = NoOpWorkflowMiddleware;
|
||||
let instance = WorkflowInstance::new("wf", 1, serde_json::json!({}));
|
||||
mw.post_workflow(&instance).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn step_middleware_default_pre_step() {
|
||||
use crate::models::WorkflowStep;
|
||||
let mw = NoOpStepMiddleware;
|
||||
let instance = WorkflowInstance::new("wf", 1, serde_json::json!({}));
|
||||
let pointer = ExecutionPointer::new(0);
|
||||
let step = WorkflowStep::new(0, "test_step");
|
||||
let ctx = StepExecutionContext {
|
||||
item: None,
|
||||
execution_pointer: &pointer,
|
||||
persistence_data: None,
|
||||
step: &step,
|
||||
workflow: &instance,
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
};
|
||||
mw.pre_step(&ctx).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn step_middleware_default_post_step() {
|
||||
use crate::models::WorkflowStep;
|
||||
let mw = NoOpStepMiddleware;
|
||||
let instance = WorkflowInstance::new("wf", 1, serde_json::json!({}));
|
||||
let pointer = ExecutionPointer::new(0);
|
||||
let step = WorkflowStep::new(0, "test_step");
|
||||
let ctx = StepExecutionContext {
|
||||
item: None,
|
||||
execution_pointer: &pointer,
|
||||
persistence_data: None,
|
||||
step: &step,
|
||||
workflow: &instance,
|
||||
cancellation_token: tokio_util::sync::CancellationToken::new(),
|
||||
};
|
||||
let result = ExecutionResult::next();
|
||||
mw.post_step(&ctx, &result).await.unwrap();
|
||||
}
|
||||
}
|
||||
20
wfe-core/src/traits/mod.rs
Normal file
20
wfe-core/src/traits/mod.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
pub mod lifecycle;
|
||||
pub mod lock;
|
||||
pub mod middleware;
|
||||
pub mod persistence;
|
||||
pub mod queue;
|
||||
pub mod registry;
|
||||
pub mod search;
|
||||
pub mod step;
|
||||
|
||||
pub use lifecycle::LifecyclePublisher;
|
||||
pub use lock::DistributedLockProvider;
|
||||
pub use middleware::{StepMiddleware, WorkflowMiddleware};
|
||||
pub use persistence::{
|
||||
EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository,
|
||||
WorkflowRepository,
|
||||
};
|
||||
pub use queue::QueueProvider;
|
||||
pub use registry::WorkflowRegistry;
|
||||
pub use search::{Page, SearchFilter, SearchIndex, WorkflowSearchResult};
|
||||
pub use step::{StepBody, StepExecutionContext, WorkflowData};
|
||||
95
wfe-core/src/traits/persistence.rs
Normal file
95
wfe-core/src/traits/persistence.rs
Normal file
@@ -0,0 +1,95 @@
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
use crate::models::{
|
||||
Event, EventSubscription, ExecutionError, ScheduledCommand, WorkflowInstance,
|
||||
};
|
||||
|
||||
/// Persistence for workflow instances.
|
||||
#[async_trait]
|
||||
pub trait WorkflowRepository: Send + Sync {
|
||||
async fn create_new_workflow(&self, instance: &WorkflowInstance) -> crate::Result<String>;
|
||||
async fn persist_workflow(&self, instance: &WorkflowInstance) -> crate::Result<()>;
|
||||
async fn persist_workflow_with_subscriptions(
|
||||
&self,
|
||||
instance: &WorkflowInstance,
|
||||
subscriptions: &[EventSubscription],
|
||||
) -> crate::Result<()>;
|
||||
async fn get_runnable_instances(&self, as_at: DateTime<Utc>) -> crate::Result<Vec<String>>;
|
||||
async fn get_workflow_instance(&self, id: &str) -> crate::Result<WorkflowInstance>;
|
||||
async fn get_workflow_instances(&self, ids: &[String]) -> crate::Result<Vec<WorkflowInstance>>;
|
||||
}
|
||||
|
||||
/// Persistence for event subscriptions.
|
||||
#[async_trait]
|
||||
pub trait SubscriptionRepository: Send + Sync {
|
||||
async fn create_event_subscription(
|
||||
&self,
|
||||
subscription: &EventSubscription,
|
||||
) -> crate::Result<String>;
|
||||
async fn get_subscriptions(
|
||||
&self,
|
||||
event_name: &str,
|
||||
event_key: &str,
|
||||
as_of: DateTime<Utc>,
|
||||
) -> crate::Result<Vec<EventSubscription>>;
|
||||
async fn terminate_subscription(&self, subscription_id: &str) -> crate::Result<()>;
|
||||
async fn get_subscription(&self, subscription_id: &str) -> crate::Result<EventSubscription>;
|
||||
async fn get_first_open_subscription(
|
||||
&self,
|
||||
event_name: &str,
|
||||
event_key: &str,
|
||||
as_of: DateTime<Utc>,
|
||||
) -> crate::Result<Option<EventSubscription>>;
|
||||
async fn set_subscription_token(
|
||||
&self,
|
||||
subscription_id: &str,
|
||||
token: &str,
|
||||
worker_id: &str,
|
||||
expiry: DateTime<Utc>,
|
||||
) -> crate::Result<bool>;
|
||||
async fn clear_subscription_token(
|
||||
&self,
|
||||
subscription_id: &str,
|
||||
token: &str,
|
||||
) -> crate::Result<()>;
|
||||
}
|
||||
|
||||
/// Persistence for events.
|
||||
#[async_trait]
|
||||
pub trait EventRepository: Send + Sync {
|
||||
async fn create_event(&self, event: &Event) -> crate::Result<String>;
|
||||
async fn get_event(&self, id: &str) -> crate::Result<Event>;
|
||||
async fn get_runnable_events(&self, as_at: DateTime<Utc>) -> crate::Result<Vec<String>>;
|
||||
async fn get_events(
|
||||
&self,
|
||||
event_name: &str,
|
||||
event_key: &str,
|
||||
as_of: DateTime<Utc>,
|
||||
) -> crate::Result<Vec<String>>;
|
||||
async fn mark_event_processed(&self, id: &str) -> crate::Result<()>;
|
||||
async fn mark_event_unprocessed(&self, id: &str) -> crate::Result<()>;
|
||||
}
|
||||
|
||||
/// Persistence for scheduled commands.
|
||||
#[async_trait]
|
||||
pub trait ScheduledCommandRepository: Send + Sync {
|
||||
fn supports_scheduled_commands(&self) -> bool;
|
||||
async fn schedule_command(&self, command: &ScheduledCommand) -> crate::Result<()>;
|
||||
async fn process_commands(
|
||||
&self,
|
||||
as_of: DateTime<Utc>,
|
||||
handler: &(dyn Fn(ScheduledCommand) -> std::pin::Pin<Box<dyn std::future::Future<Output = crate::Result<()>> + Send>>
|
||||
+ Send
|
||||
+ Sync),
|
||||
) -> crate::Result<()>;
|
||||
}
|
||||
|
||||
/// Composite persistence provider combining all repository traits.
|
||||
#[async_trait]
|
||||
pub trait PersistenceProvider:
|
||||
WorkflowRepository + EventRepository + SubscriptionRepository + ScheduledCommandRepository
|
||||
{
|
||||
async fn persist_errors(&self, errors: &[ExecutionError]) -> crate::Result<()>;
|
||||
async fn ensure_store_exists(&self) -> crate::Result<()>;
|
||||
}
|
||||
13
wfe-core/src/traits/queue.rs
Normal file
13
wfe-core/src/traits/queue.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::models::QueueType;
|
||||
|
||||
/// Queue provider for distributing workflow execution across workers.
|
||||
#[async_trait]
|
||||
pub trait QueueProvider: Send + Sync {
|
||||
async fn queue_work(&self, id: &str, queue: QueueType) -> crate::Result<()>;
|
||||
async fn dequeue_work(&self, queue: QueueType) -> crate::Result<Option<String>>;
|
||||
fn is_dequeue_blocking(&self) -> bool;
|
||||
async fn start(&self) -> crate::Result<()>;
|
||||
async fn stop(&self) -> crate::Result<()>;
|
||||
}
|
||||
10
wfe-core/src/traits/registry.rs
Normal file
10
wfe-core/src/traits/registry.rs
Normal file
@@ -0,0 +1,10 @@
|
||||
use crate::models::WorkflowDefinition;
|
||||
|
||||
/// Registry for workflow definitions with version support.
|
||||
pub trait WorkflowRegistry: Send + Sync {
|
||||
fn register(&mut self, definition: WorkflowDefinition);
|
||||
fn get_definition(&self, id: &str, version: Option<u32>) -> Option<&WorkflowDefinition>;
|
||||
fn is_registered(&self, id: &str, version: u32) -> bool;
|
||||
fn deregister(&mut self, id: &str, version: u32) -> bool;
|
||||
fn get_all_definitions(&self) -> Vec<&WorkflowDefinition>;
|
||||
}
|
||||
48
wfe-core/src/traits/search.rs
Normal file
48
wfe-core/src/traits/search.rs
Normal file
@@ -0,0 +1,48 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::models::WorkflowInstance;
|
||||
|
||||
/// Result from a search query.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WorkflowSearchResult {
|
||||
pub id: String,
|
||||
pub workflow_definition_id: String,
|
||||
pub version: u32,
|
||||
pub status: crate::models::WorkflowStatus,
|
||||
pub reference: Option<String>,
|
||||
pub description: Option<String>,
|
||||
}
|
||||
|
||||
/// Filter for search queries.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum SearchFilter {
|
||||
Status(crate::models::WorkflowStatus),
|
||||
DateRange {
|
||||
field: String,
|
||||
before: Option<chrono::DateTime<chrono::Utc>>,
|
||||
after: Option<chrono::DateTime<chrono::Utc>>,
|
||||
},
|
||||
Reference(String),
|
||||
}
|
||||
|
||||
/// Paginated search results.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Page<T> {
|
||||
pub data: Vec<T>,
|
||||
pub total: u64,
|
||||
}
|
||||
|
||||
/// Search index for querying workflows.
|
||||
#[async_trait]
|
||||
pub trait SearchIndex: Send + Sync {
|
||||
async fn index_workflow(&self, instance: &WorkflowInstance) -> crate::Result<()>;
|
||||
async fn search(
|
||||
&self,
|
||||
terms: &str,
|
||||
skip: u64,
|
||||
take: u64,
|
||||
filters: &[SearchFilter],
|
||||
) -> crate::Result<Page<WorkflowSearchResult>>;
|
||||
async fn start(&self) -> crate::Result<()>;
|
||||
async fn stop(&self) -> crate::Result<()>;
|
||||
}
|
||||
35
wfe-core/src/traits/step.rs
Normal file
35
wfe-core/src/traits/step.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use async_trait::async_trait;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::models::{ExecutionPointer, ExecutionResult, WorkflowInstance, WorkflowStep};
|
||||
|
||||
/// Marker trait for all data types that flow between workflow steps.
|
||||
/// Anything that is serializable and deserializable qualifies.
|
||||
pub trait WorkflowData: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {}
|
||||
|
||||
/// Blanket implementation: any type satisfying the bounds is WorkflowData.
|
||||
impl<T> WorkflowData for T where T: Serialize + DeserializeOwned + Send + Sync + Clone + 'static {}
|
||||
|
||||
/// Context available to a step during execution.
|
||||
#[derive(Debug)]
|
||||
pub struct StepExecutionContext<'a> {
|
||||
/// The current item when iterating (ForEach).
|
||||
pub item: Option<&'a serde_json::Value>,
|
||||
/// The current execution pointer.
|
||||
pub execution_pointer: &'a ExecutionPointer,
|
||||
/// Persistence data from a previous execution of this step.
|
||||
pub persistence_data: Option<&'a serde_json::Value>,
|
||||
/// The step definition.
|
||||
pub step: &'a WorkflowStep,
|
||||
/// The running workflow instance.
|
||||
pub workflow: &'a WorkflowInstance,
|
||||
/// Cancellation token.
|
||||
pub cancellation_token: tokio_util::sync::CancellationToken,
|
||||
}
|
||||
|
||||
/// The core unit of work in a workflow. Each step implements this trait.
|
||||
#[async_trait]
|
||||
pub trait StepBody: Send + Sync {
|
||||
async fn run(&mut self, context: &StepExecutionContext<'_>) -> crate::Result<ExecutionResult>;
|
||||
}
|
||||
Reference in New Issue
Block a user