diff --git a/wfe-core/src/models/workflow_definition.rs b/wfe-core/src/models/workflow_definition.rs index a59099a..c2aaf6a 100644 --- a/wfe-core/src/models/workflow_definition.rs +++ b/wfe-core/src/models/workflow_definition.rs @@ -9,7 +9,14 @@ use super::service::ServiceDefinition; /// A compiled workflow definition ready for execution. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowDefinition { + /// Stable slug used as the primary key (e.g. "ci", "checkout"). Must be + /// unique within a host. Referenced by other workflows, webhooks, and + /// clients when starting new instances. pub id: String, + /// Optional human-friendly display name surfaced in UIs, listings, and + /// logs (e.g. "Continuous Integration"). Falls back to `id` when unset. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub name: Option, pub version: u32, pub description: Option, pub steps: Vec, @@ -25,6 +32,7 @@ impl WorkflowDefinition { pub fn new(id: impl Into, version: u32) -> Self { Self { id: id.into(), + name: None, version, description: None, steps: Vec::new(), @@ -33,6 +41,11 @@ impl WorkflowDefinition { services: Vec::new(), } } + + /// Return the display name when set, otherwise fall back to the slug id. + pub fn display_name(&self) -> &str { + self.name.as_deref().unwrap_or(&self.id) + } } /// A single step in a workflow definition. diff --git a/wfe-core/src/models/workflow_instance.rs b/wfe-core/src/models/workflow_instance.rs index 0bc1882..932b5b9 100644 --- a/wfe-core/src/models/workflow_instance.rs +++ b/wfe-core/src/models/workflow_instance.rs @@ -6,7 +6,14 @@ use super::status::{PointerStatus, WorkflowStatus}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowInstance { + /// UUID — the primary key, always unique, never changes. pub id: String, + /// Human-friendly unique name, e.g. "ci-42". Auto-assigned as + /// `{definition_id}-{N}` via a per-definition monotonic counter when + /// the caller does not supply an override. Used interchangeably with + /// `id` in lookup APIs. Empty when the instance has not yet been + /// persisted (the host fills it in before the first insert). + pub name: String, pub workflow_definition_id: String, pub version: u32, pub description: Option, @@ -20,9 +27,15 @@ pub struct WorkflowInstance { } impl WorkflowInstance { - pub fn new(workflow_definition_id: impl Into, version: u32, data: serde_json::Value) -> Self { + pub fn new( + workflow_definition_id: impl Into, + version: u32, + data: serde_json::Value, + ) -> Self { Self { id: uuid::Uuid::new_v4().to_string(), + // Filled in by WorkflowHost::start_workflow before persisting. + name: String::new(), workflow_definition_id: workflow_definition_id.into(), version, description: None, @@ -134,7 +147,10 @@ mod tests { let json = serde_json::to_string(&instance).unwrap(); let deserialized: WorkflowInstance = serde_json::from_str(&json).unwrap(); assert_eq!(instance.id, deserialized.id); - assert_eq!(instance.workflow_definition_id, deserialized.workflow_definition_id); + assert_eq!( + instance.workflow_definition_id, + deserialized.workflow_definition_id + ); assert_eq!(instance.version, deserialized.version); assert_eq!(instance.status, deserialized.status); assert_eq!(instance.data, deserialized.data); diff --git a/wfe-core/src/test_support/in_memory_persistence.rs b/wfe-core/src/test_support/in_memory_persistence.rs index bc8c89f..3ce0ccc 100644 --- a/wfe-core/src/test_support/in_memory_persistence.rs +++ b/wfe-core/src/test_support/in_memory_persistence.rs @@ -5,9 +5,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; use tokio::sync::RwLock; -use crate::models::{ - Event, EventSubscription, ExecutionError, ScheduledCommand, WorkflowInstance, -}; +use crate::models::{Event, EventSubscription, ExecutionError, ScheduledCommand, WorkflowInstance}; use crate::traits::{ EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository, WorkflowRepository, @@ -22,6 +20,9 @@ pub struct InMemoryPersistenceProvider { subscriptions: Arc>>, errors: Arc>>, scheduled_commands: Arc>>, + /// Per-definition monotonic counter used to generate human-friendly + /// workflow instance names of the form `{definition_id}-{N}`. + sequences: Arc>>, } impl InMemoryPersistenceProvider { @@ -32,6 +33,7 @@ impl InMemoryPersistenceProvider { subscriptions: Arc::new(RwLock::new(HashMap::new())), errors: Arc::new(RwLock::new(Vec::new())), scheduled_commands: Arc::new(RwLock::new(Vec::new())), + sequences: Arc::new(RwLock::new(HashMap::new())), } } @@ -107,6 +109,23 @@ impl WorkflowRepository for InMemoryPersistenceProvider { .ok_or_else(|| WfeError::WorkflowNotFound(id.to_string())) } + async fn get_workflow_instance_by_name(&self, name: &str) -> Result { + self.workflows + .read() + .await + .values() + .find(|w| w.name == name) + .cloned() + .ok_or_else(|| WfeError::WorkflowNotFound(name.to_string())) + } + + async fn next_definition_sequence(&self, definition_id: &str) -> Result { + let mut seqs = self.sequences.write().await; + let next = seqs.get(definition_id).copied().unwrap_or(0) + 1; + seqs.insert(definition_id.to_string(), next); + Ok(next) + } + async fn get_workflow_instances(&self, ids: &[String]) -> Result> { let workflows = self.workflows.read().await; let mut result = Vec::new(); @@ -121,10 +140,7 @@ impl WorkflowRepository for InMemoryPersistenceProvider { #[async_trait] impl SubscriptionRepository for InMemoryPersistenceProvider { - async fn create_event_subscription( - &self, - subscription: &EventSubscription, - ) -> Result { + async fn create_event_subscription(&self, subscription: &EventSubscription) -> Result { let id = if subscription.id.is_empty() { uuid::Uuid::new_v4().to_string() } else { @@ -217,11 +233,7 @@ impl SubscriptionRepository for InMemoryPersistenceProvider { } } - async fn clear_subscription_token( - &self, - subscription_id: &str, - token: &str, - ) -> Result<()> { + async fn clear_subscription_token(&self, subscription_id: &str, token: &str) -> Result<()> { let mut subs = self.subscriptions.write().await; match subs.get_mut(subscription_id) { Some(sub) => { @@ -282,7 +294,9 @@ impl EventRepository for InMemoryPersistenceProvider { let events = self.events.read().await; let ids = events .values() - .filter(|e| e.event_name == event_name && e.event_key == event_key && e.event_time <= as_of) + .filter(|e| { + e.event_name == event_name && e.event_key == event_key && e.event_time <= as_of + }) .map(|e| e.id.clone()) .collect(); Ok(ids) @@ -325,9 +339,14 @@ impl ScheduledCommandRepository for InMemoryPersistenceProvider { async fn process_commands( &self, as_of: DateTime, - handler: &(dyn Fn(ScheduledCommand) -> std::pin::Pin> + Send>> - + Send - + Sync), + handler: &( + dyn Fn( + ScheduledCommand, + ) + -> std::pin::Pin> + Send>> + + Send + + Sync + ), ) -> Result<()> { let as_of_millis = as_of.timestamp_millis(); let due: Vec = { @@ -360,7 +379,7 @@ impl PersistenceProvider for InMemoryPersistenceProvider { #[cfg(test)] mod tests { use super::*; - use crate::models::{Event, EventSubscription, ExecutionError, ScheduledCommand, CommandName}; + use crate::models::{CommandName, Event, EventSubscription, ExecutionError, ScheduledCommand}; use crate::traits::{ EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository, WorkflowRepository, diff --git a/wfe-core/src/traits/persistence.rs b/wfe-core/src/traits/persistence.rs index 9b00fe2..3cafbc8 100644 --- a/wfe-core/src/traits/persistence.rs +++ b/wfe-core/src/traits/persistence.rs @@ -1,9 +1,7 @@ use async_trait::async_trait; use chrono::{DateTime, Utc}; -use crate::models::{ - Event, EventSubscription, ExecutionError, ScheduledCommand, WorkflowInstance, -}; +use crate::models::{Event, EventSubscription, ExecutionError, ScheduledCommand, WorkflowInstance}; /// Persistence for workflow instances. #[async_trait] @@ -17,7 +15,15 @@ pub trait WorkflowRepository: Send + Sync { ) -> crate::Result<()>; async fn get_runnable_instances(&self, as_at: DateTime) -> crate::Result>; async fn get_workflow_instance(&self, id: &str) -> crate::Result; + async fn get_workflow_instance_by_name(&self, name: &str) + -> crate::Result; async fn get_workflow_instances(&self, ids: &[String]) -> crate::Result>; + + /// Atomically allocate the next sequence number for a given workflow + /// definition id. Used by the host to assign human-friendly names of the + /// form `{definition_id}-{N}` before inserting a new workflow instance. + /// Guaranteed monotonic per definition_id; no guarantees across definitions. + async fn next_definition_sequence(&self, definition_id: &str) -> crate::Result; } /// Persistence for event subscriptions. @@ -79,9 +85,14 @@ pub trait ScheduledCommandRepository: Send + Sync { async fn process_commands( &self, as_of: DateTime, - handler: &(dyn Fn(ScheduledCommand) -> std::pin::Pin> + Send>> - + Send - + Sync), + handler: &( + dyn Fn( + ScheduledCommand, + ) -> std::pin::Pin< + Box> + Send>, + > + Send + + Sync + ), ) -> crate::Result<()>; } diff --git a/wfe-deno/src/bridge.rs b/wfe-deno/src/bridge.rs index 7d29d3b..162ea8c 100644 --- a/wfe-deno/src/bridge.rs +++ b/wfe-deno/src/bridge.rs @@ -3,9 +3,9 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::time::Duration; use tokio::sync::{mpsc, oneshot}; +use wfe_core::WfeError; use wfe_core::models::ExecutionResult; use wfe_core::traits::step::{StepBody, StepExecutionContext}; -use wfe_core::WfeError; /// A request sent from the executor (tokio) to the V8 thread. pub struct StepRequest { @@ -160,7 +160,9 @@ pub fn deserialize_execution_result( value: &serde_json::Value, ) -> wfe_core::Result { let js_result: JsExecutionResult = serde_json::from_value(value.clone()).map_err(|e| { - WfeError::StepExecution(format!("failed to deserialize ExecutionResult from JS: {e}")) + WfeError::StepExecution(format!( + "failed to deserialize ExecutionResult from JS: {e}" + )) })?; Ok(ExecutionResult { @@ -186,6 +188,7 @@ mod tests { fn make_test_context() -> (WorkflowInstance, WorkflowStep, ExecutionPointer) { let instance = WorkflowInstance { id: "wf-1".into(), + name: "test-def-1".into(), workflow_definition_id: "test-def".into(), version: 1, description: None, @@ -373,7 +376,9 @@ mod tests { assert_eq!(req.step_type, "MyStep"); assert_eq!(req.request_id, 0); req.response_tx - .send(Ok(serde_json::json!({"proceed": true, "outputData": {"done": true}}))) + .send(Ok( + serde_json::json!({"proceed": true, "outputData": {"done": true}}), + )) .unwrap(); });