diff --git a/wfe-core/src/models/mod.rs b/wfe-core/src/models/mod.rs index a0eb83b..8149850 100644 --- a/wfe-core/src/models/mod.rs +++ b/wfe-core/src/models/mod.rs @@ -1,6 +1,7 @@ pub mod condition; pub mod error_behavior; pub mod event; +pub mod service; pub mod execution_error; pub mod execution_pointer; pub mod execution_result; @@ -26,6 +27,7 @@ pub use scheduled_command::{CommandName, ScheduledCommand}; pub use schema::{SchemaType, WorkflowSchema}; pub use status::{PointerStatus, WorkflowStatus}; pub use workflow_definition::{StepOutcome, WorkflowDefinition, WorkflowStep}; +pub use service::{ReadinessCheck, ReadinessProbe, ServiceDefinition, ServiceEndpoint, ServicePort}; pub use workflow_instance::WorkflowInstance; /// Serde helper for `Option` as milliseconds. diff --git a/wfe-core/src/models/service.rs b/wfe-core/src/models/service.rs new file mode 100644 index 0000000..7197a36 --- /dev/null +++ b/wfe-core/src/models/service.rs @@ -0,0 +1,283 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +/// An infrastructure service that runs alongside workflow steps. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ServiceDefinition { + /// Service name -- used as DNS hostname (K8s) or env prefix (containerd). + pub name: String, + /// Container image to run. + pub image: String, + /// Ports exposed by the service. + #[serde(default)] + pub ports: Vec, + /// Environment variables for the service container. + #[serde(default)] + pub env: HashMap, + /// How to check if the service is ready to accept connections. + #[serde(default)] + pub readiness: Option, + /// Override the container entrypoint. + #[serde(default)] + pub command: Vec, + /// Override the container command/args. + #[serde(default)] + pub args: Vec, + /// Memory limit (e.g., "512Mi"). + #[serde(default)] + pub memory: Option, + /// CPU limit (e.g., "500m"). + #[serde(default)] + pub cpu: Option, +} + +/// A port exposed by a service. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ServicePort { + pub container_port: u16, + #[serde(default)] + pub name: Option, + /// Protocol: "TCP" (default) or "UDP". + #[serde(default = "default_protocol")] + pub protocol: String, +} + +impl ServicePort { + pub fn tcp(port: u16) -> Self { + Self { + container_port: port, + name: None, + protocol: "TCP".into(), + } + } +} + +fn default_protocol() -> String { + "TCP".into() +} + +/// How to determine if a service is ready. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReadinessProbe { + pub check: ReadinessCheck, + /// Poll interval in milliseconds. + #[serde(default = "default_5000")] + pub interval_ms: u64, + /// Total timeout in milliseconds. + #[serde(default = "default_60000")] + pub timeout_ms: u64, + /// Maximum number of retries before giving up. + #[serde(default = "default_12")] + pub retries: u32, +} + +/// The type of readiness check to perform. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum ReadinessCheck { + /// Run a command inside the container. + Exec(Vec), + /// Check if a TCP port is accepting connections. + TcpSocket(u16), + /// Make an HTTP GET request. + HttpGet { port: u16, path: String }, +} + +/// Runtime endpoint info for a provisioned service. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ServiceEndpoint { + pub name: String, + pub host: String, + pub ports: Vec, +} + +fn default_5000() -> u64 { + 5000 +} +fn default_60000() -> u64 { + 60000 +} +fn default_12() -> u32 { + 12 +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn service_definition_minimal_serde() { + let json = r#"{"name":"postgres","image":"postgres:15"}"#; + let svc: ServiceDefinition = serde_json::from_str(json).unwrap(); + assert_eq!(svc.name, "postgres"); + assert_eq!(svc.image, "postgres:15"); + assert!(svc.ports.is_empty()); + assert!(svc.env.is_empty()); + assert!(svc.readiness.is_none()); + assert!(svc.command.is_empty()); + assert!(svc.memory.is_none()); + } + + #[test] + fn service_definition_full_round_trip() { + let svc = ServiceDefinition { + name: "redis".into(), + image: "redis:7-alpine".into(), + ports: vec![ServicePort::tcp(6379)], + env: [("REDIS_PASSWORD".into(), "secret".into())].into(), + readiness: Some(ReadinessProbe { + check: ReadinessCheck::TcpSocket(6379), + interval_ms: 2000, + timeout_ms: 30000, + retries: 15, + }), + command: vec![], + args: vec!["--requirepass".into(), "secret".into()], + memory: Some("256Mi".into()), + cpu: Some("250m".into()), + }; + let json = serde_json::to_string(&svc).unwrap(); + let parsed: ServiceDefinition = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.name, "redis"); + assert_eq!(parsed.ports.len(), 1); + assert_eq!(parsed.ports[0].container_port, 6379); + assert_eq!(parsed.args, vec!["--requirepass", "secret"]); + assert_eq!(parsed.memory, Some("256Mi".into())); + } + + #[test] + fn service_port_tcp_helper() { + let port = ServicePort::tcp(5432); + assert_eq!(port.container_port, 5432); + assert_eq!(port.protocol, "TCP"); + assert!(port.name.is_none()); + } + + #[test] + fn service_port_default_protocol() { + let json = r#"{"container_port": 8080}"#; + let port: ServicePort = serde_json::from_str(json).unwrap(); + assert_eq!(port.protocol, "TCP"); + } + + #[test] + fn readiness_probe_exec() { + let probe = ReadinessProbe { + check: ReadinessCheck::Exec(vec!["pg_isready".into(), "-U".into(), "postgres".into()]), + interval_ms: 5000, + timeout_ms: 60000, + retries: 12, + }; + let json = serde_json::to_string(&probe).unwrap(); + let parsed: ReadinessProbe = serde_json::from_str(&json).unwrap(); + match parsed.check { + ReadinessCheck::Exec(cmd) => assert_eq!(cmd, vec!["pg_isready", "-U", "postgres"]), + _ => panic!("expected Exec"), + } + } + + #[test] + fn readiness_probe_tcp_socket() { + let probe = ReadinessProbe { + check: ReadinessCheck::TcpSocket(6379), + interval_ms: 2000, + timeout_ms: 30000, + retries: 15, + }; + let json = serde_json::to_string(&probe).unwrap(); + let parsed: ReadinessProbe = serde_json::from_str(&json).unwrap(); + match parsed.check { + ReadinessCheck::TcpSocket(port) => assert_eq!(port, 6379), + _ => panic!("expected TcpSocket"), + } + } + + #[test] + fn readiness_probe_http_get() { + let probe = ReadinessProbe { + check: ReadinessCheck::HttpGet { + port: 8080, + path: "/health".into(), + }, + interval_ms: 5000, + timeout_ms: 60000, + retries: 12, + }; + let json = serde_json::to_string(&probe).unwrap(); + let parsed: ReadinessProbe = serde_json::from_str(&json).unwrap(); + match parsed.check { + ReadinessCheck::HttpGet { port, path } => { + assert_eq!(port, 8080); + assert_eq!(path, "/health"); + } + _ => panic!("expected HttpGet"), + } + } + + #[test] + fn readiness_probe_defaults() { + let json = r#"{"check": {"tcp_socket": 5432}}"#; + let probe: ReadinessProbe = serde_json::from_str(json).unwrap(); + assert_eq!(probe.interval_ms, 5000); + assert_eq!(probe.timeout_ms, 60000); + assert_eq!(probe.retries, 12); + } + + #[test] + fn service_endpoint_serde() { + let ep = ServiceEndpoint { + name: "postgres".into(), + host: "postgres.wfe-abc.svc.cluster.local".into(), + ports: vec![ServicePort::tcp(5432)], + }; + let json = serde_json::to_string(&ep).unwrap(); + let parsed: ServiceEndpoint = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.name, "postgres"); + assert_eq!(parsed.host, "postgres.wfe-abc.svc.cluster.local"); + assert_eq!(parsed.ports.len(), 1); + } + + #[test] + fn service_definition_with_env() { + let svc = ServiceDefinition { + name: "postgres".into(), + image: "postgres:15".into(), + ports: vec![ServicePort::tcp(5432)], + env: [ + ("POSTGRES_PASSWORD".into(), "test".into()), + ("POSTGRES_DB".into(), "myapp".into()), + ] + .into(), + readiness: None, + command: vec![], + args: vec![], + memory: None, + cpu: None, + }; + let json = serde_json::to_string(&svc).unwrap(); + let parsed: ServiceDefinition = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.env.get("POSTGRES_PASSWORD"), Some(&"test".into())); + assert_eq!(parsed.env.get("POSTGRES_DB"), Some(&"myapp".into())); + } + + #[test] + fn service_definition_with_command() { + let svc = ServiceDefinition { + name: "custom".into(), + image: "myimage:latest".into(), + ports: vec![], + env: HashMap::new(), + readiness: None, + command: vec!["/usr/bin/myserver".into()], + args: vec!["--config".into(), "/etc/config.yaml".into()], + memory: None, + cpu: None, + }; + let json = serde_json::to_string(&svc).unwrap(); + let parsed: ServiceDefinition = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.command, vec!["/usr/bin/myserver"]); + assert_eq!(parsed.args, vec!["--config", "/etc/config.yaml"]); + } +} diff --git a/wfe-core/src/models/workflow_definition.rs b/wfe-core/src/models/workflow_definition.rs index 0c98db2..a59099a 100644 --- a/wfe-core/src/models/workflow_definition.rs +++ b/wfe-core/src/models/workflow_definition.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize}; use super::condition::StepCondition; use super::error_behavior::ErrorBehavior; +use super::service::ServiceDefinition; /// A compiled workflow definition ready for execution. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -15,6 +16,9 @@ pub struct WorkflowDefinition { pub default_error_behavior: ErrorBehavior, #[serde(default, with = "super::option_duration_millis")] pub default_error_retry_interval: Option, + /// Infrastructure services required by this workflow (databases, caches, etc.). + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub services: Vec, } impl WorkflowDefinition { @@ -26,6 +30,7 @@ impl WorkflowDefinition { steps: Vec::new(), default_error_behavior: ErrorBehavior::default(), default_error_retry_interval: None, + services: Vec::new(), } } } diff --git a/wfe-core/src/traits/mod.rs b/wfe-core/src/traits/mod.rs index 46db3df..399cf8a 100644 --- a/wfe-core/src/traits/mod.rs +++ b/wfe-core/src/traits/mod.rs @@ -1,5 +1,6 @@ pub mod lifecycle; pub mod lock; +pub mod service; pub mod log_sink; pub mod middleware; pub mod persistence; @@ -19,4 +20,5 @@ pub use persistence::{ pub use queue::QueueProvider; pub use registry::WorkflowRegistry; pub use search::{Page, SearchFilter, SearchIndex, WorkflowSearchResult}; +pub use service::ServiceProvider; pub use step::{HostContext, StepBody, StepExecutionContext, WorkflowData}; diff --git a/wfe-core/src/traits/service.rs b/wfe-core/src/traits/service.rs new file mode 100644 index 0000000..baef339 --- /dev/null +++ b/wfe-core/src/traits/service.rs @@ -0,0 +1,31 @@ +use async_trait::async_trait; + +use crate::models::service::{ServiceDefinition, ServiceEndpoint}; + +/// Provides infrastructure services (databases, caches, etc.) for workflows. +/// +/// Implementations handle provisioning, readiness checking, and teardown +/// of services on a specific platform (Kubernetes, containerd, etc.). +#[async_trait] +pub trait ServiceProvider: Send + Sync { + /// Check if this provider can provision the given services. + /// + /// Returns false if any service cannot be handled by this provider. + fn can_provision(&self, services: &[ServiceDefinition]) -> bool; + + /// Provision all services for a workflow. + /// + /// Creates the service containers/pods, waits for them to be ready, + /// and returns endpoint information for each service. + async fn provision( + &self, + workflow_id: &str, + services: &[ServiceDefinition], + ) -> crate::Result>; + + /// Tear down all services for a workflow. + /// + /// Called on workflow completion, failure, or termination. + /// Should be idempotent -- safe to call multiple times. + async fn teardown(&self, workflow_id: &str) -> crate::Result<()>; +}