diff --git a/wfe/src/host.rs b/wfe/src/host.rs index 43847bc..c29bb23 100644 --- a/wfe/src/host.rs +++ b/wfe/src/host.rs @@ -6,18 +6,18 @@ use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; +use wfe_core::builder::WorkflowBuilder; use wfe_core::executor::{StepRegistry, WorkflowExecutor}; use wfe_core::models::{ Event, ExecutionPointer, LifecycleEvent, LifecycleEventType, PointerStatus, QueueType, WorkflowDefinition, WorkflowInstance, WorkflowStatus, }; +use wfe_core::traits::registry::WorkflowRegistry; use wfe_core::traits::{ DistributedLockProvider, HostContext, LifecyclePublisher, PersistenceProvider, QueueProvider, SearchIndex, ServiceProvider, StepBody, WorkflowData, }; -use wfe_core::traits::registry::WorkflowRegistry; use wfe_core::{Result, WfeError}; -use wfe_core::builder::WorkflowBuilder; use crate::registry::InMemoryWorkflowRegistry; @@ -41,12 +41,12 @@ impl HostContext for HostContextImpl { Box::pin(async move { // Look up the definition. let reg = self.registry.read().await; - let definition = reg - .get_definition(&def_id, Some(version)) - .ok_or_else(|| WfeError::DefinitionNotFound { + let definition = reg.get_definition(&def_id, Some(version)).ok_or_else(|| { + WfeError::DefinitionNotFound { id: def_id.clone(), version, - })?; + } + })?; // Create the child workflow instance. let mut instance = WorkflowInstance::new(&def_id, version, data); @@ -54,6 +54,16 @@ impl HostContext for HostContextImpl { instance.execution_pointers.push(ExecutionPointer::new(0)); } + // Auto-assign a human-friendly name before persisting so the + // child shows up as `{definition_id}-{N}` in lookups and logs. + // Sub-workflows always use the default; callers wanting a custom + // name should start the parent workflow directly. + let n = self + .persistence + .next_definition_sequence(&def_id) + .await?; + instance.name = format!("{def_id}-{n}"); + let id = self.persistence.create_new_workflow(&instance).await?; // Queue for execution. @@ -103,6 +113,7 @@ impl WorkflowHost { /// Spawn background polling tasks for processing workflows and events. pub async fn start(&self) -> Result<()> { self.register_primitives().await; + self.persistence.ensure_store_exists().await?; self.queue_provider.start().await?; self.lock_provider.start().await?; if let Some(ref search) = self.search { @@ -328,21 +339,40 @@ impl WorkflowHost { sr.register_factory(key, factory); } - /// Start a new workflow instance. - #[tracing::instrument( - name = "workflow.start", - skip(self, data), - fields( - workflow.definition_id = %definition_id, - workflow.version = version, - workflow.id, - ) - )] + /// Start a new workflow instance. The host auto-assigns a human-friendly + /// name of the form `{definition_id}-{N}`. Use `start_workflow_with_name` + /// to supply a caller-specified override. pub async fn start_workflow( &self, definition_id: &str, version: u32, data: serde_json::Value, + ) -> Result { + self.start_workflow_with_name(definition_id, version, data, None) + .await + } + + /// Start a new workflow instance with an optional caller-supplied + /// human-friendly name. When `name_override` is `None` the host + /// auto-assigns `{definition_id}-{N}` using a per-definition sequence. + #[tracing::instrument( + name = "workflow.start", + skip(self, data, name_override), + fields( + definition_id = %definition_id, + version, + workflow.definition_id = %definition_id, + workflow.version = version, + workflow.id = tracing::field::Empty, + workflow.name = tracing::field::Empty, + ) + )] + pub async fn start_workflow_with_name( + &self, + definition_id: &str, + version: u32, + data: serde_json::Value, + name_override: Option, ) -> Result { // Verify definition exists. let reg = self.registry.read().await; @@ -361,12 +391,37 @@ impl WorkflowHost { instance.execution_pointers.push(pointer); } + // Assign a human-friendly name. Callers may override (e.g. webhook + // handlers that want `ci-mainline-a1b2c3`); otherwise use the + // sequenced default. Validation: reject empty overrides so the name + // column invariant holds. + instance.name = match name_override { + Some(n) if !n.trim().is_empty() => n, + Some(_) => { + return Err(WfeError::StepExecution( + "workflow name override must be non-empty".to_string(), + )); + } + None => { + let n = self + .persistence + .next_definition_sequence(definition_id) + .await?; + format!("{definition_id}-{n}") + } + }; + // Persist the instance. let id = self.persistence.create_new_workflow(&instance).await?; instance.id = id.clone(); tracing::Span::current().record("workflow.id", id.as_str()); + tracing::Span::current().record("workflow.name", instance.name.as_str()); - info!(workflow_id = %id, "Workflow instance created"); + info!( + workflow_id = %id, + workflow_name = %instance.name, + "Workflow instance created" + ); // Queue for execution. self.queue_provider @@ -415,8 +470,8 @@ impl WorkflowHost { } /// Suspend a running workflow. - pub async fn suspend_workflow(&self, id: &str) -> Result { - let mut instance = self.persistence.get_workflow_instance(id).await?; + pub async fn suspend_workflow(&self, id_or_name: &str) -> Result { + let mut instance = self.get_workflow(id_or_name).await?; if instance.status != WorkflowStatus::Runnable { return Ok(false); } @@ -425,7 +480,7 @@ impl WorkflowHost { if let Some(ref publisher) = self.lifecycle { let _ = publisher .publish(LifecycleEvent::new( - id, + &instance.id, &instance.workflow_definition_id, instance.version, LifecycleEventType::Suspended, @@ -436,23 +491,24 @@ impl WorkflowHost { } /// Resume a suspended workflow. - pub async fn resume_workflow(&self, id: &str) -> Result { - let mut instance = self.persistence.get_workflow_instance(id).await?; + pub async fn resume_workflow(&self, id_or_name: &str) -> Result { + let mut instance = self.get_workflow(id_or_name).await?; if instance.status != WorkflowStatus::Suspended { return Ok(false); } instance.status = WorkflowStatus::Runnable; self.persistence.persist_workflow(&instance).await?; - // Re-queue for execution. + // Re-queue for execution using the canonical UUID (queue keys are + // always UUIDs, never names). self.queue_provider - .queue_work(id, QueueType::Workflow) + .queue_work(&instance.id, QueueType::Workflow) .await?; if let Some(ref publisher) = self.lifecycle { let _ = publisher .publish(LifecycleEvent::new( - id, + &instance.id, &instance.workflow_definition_id, instance.version, LifecycleEventType::Resumed, @@ -463,8 +519,8 @@ impl WorkflowHost { } /// Terminate a running workflow. - pub async fn terminate_workflow(&self, id: &str) -> Result { - let mut instance = self.persistence.get_workflow_instance(id).await?; + pub async fn terminate_workflow(&self, id_or_name: &str) -> Result { + let mut instance = self.get_workflow(id_or_name).await?; if instance.status == WorkflowStatus::Complete || instance.status == WorkflowStatus::Terminated { @@ -476,7 +532,7 @@ impl WorkflowHost { if let Some(ref publisher) = self.lifecycle { let _ = publisher .publish(LifecycleEvent::new( - id, + &instance.id, &instance.workflow_definition_id, instance.version, LifecycleEventType::Terminated, @@ -486,9 +542,28 @@ impl WorkflowHost { Ok(true) } - /// Fetch a workflow instance by ID. - pub async fn get_workflow(&self, id: &str) -> Result { - self.persistence.get_workflow_instance(id).await + /// Fetch a workflow instance by UUID or human-friendly name. + /// + /// Tries UUID lookup first for the common case. On `WorkflowNotFound`, + /// falls back to name lookup so callers can address instances + /// interchangeably (e.g. `ci-42` or the UUID it was assigned). + pub async fn get_workflow(&self, id_or_name: &str) -> Result { + match self.persistence.get_workflow_instance(id_or_name).await { + Ok(w) => Ok(w), + Err(WfeError::WorkflowNotFound(_)) => { + self.persistence + .get_workflow_instance_by_name(id_or_name) + .await + } + Err(e) => Err(e), + } + } + + /// Resolve an identifier (UUID or human-friendly name) to the canonical + /// UUID. Used by mutation APIs that still take `&str id` internally. + pub async fn resolve_workflow_id(&self, id_or_name: &str) -> Result { + let instance = self.get_workflow(id_or_name).await?; + Ok(instance.id) } /// Access the persistence provider. @@ -649,10 +724,7 @@ async fn inject_service_endpoints( } if let Some(data_obj) = instance.data.as_object_mut() { - data_obj.insert( - "services".into(), - serde_json::Value::Object(services_map), - ); + data_obj.insert("services".into(), serde_json::Value::Object(services_map)); } persistence.persist_workflow(&instance).await?;