feat(wfe): auto-assign workflow names + ensure store + name-or-UUID lookups

Three related host.rs changes that together make the 1.9 name support
end-to-end functional.

1. `WorkflowHost::start()` now calls `persistence.ensure_store_exists()`.
   The method existed on the trait and was implemented by every
   provider but nothing ever invoked it, so the Postgres/SQLite schema
   was never auto-created on startup — deployments failed on first
   persist with `relation "wfc.workflows" does not exist`.

2. New `start_workflow_with_name` entry point accepting an optional
   caller-supplied name override. The normal `start_workflow` is now a
   thin wrapper that passes `None` (auto-assign). The default path
   calls `next_definition_sequence(definition_id)` and formats the
   result as `{definition_id}-{N}` before persisting. Sub-workflow
   children also get auto-assigned names via HostContextImpl.

3. `get_workflow`/`suspend_workflow`/`resume_workflow`/
   `terminate_workflow` now accept either a UUID or a human-friendly
   name. `get_workflow` tries the UUID index first, then falls back to
   name lookup. A new `resolve_workflow_id` helper returns the
   canonical UUID so the gRPC log/lifecycle streams (which are keyed
   by UUID internally) can translate before subscribing.
This commit is contained in:
2026-04-07 19:01:02 +01:00
parent 9af1a0d276
commit be0b93e959

View File

@@ -6,18 +6,18 @@ use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};
use wfe_core::builder::WorkflowBuilder;
use wfe_core::executor::{StepRegistry, WorkflowExecutor};
use wfe_core::models::{
Event, ExecutionPointer, LifecycleEvent, LifecycleEventType, PointerStatus, QueueType,
WorkflowDefinition, WorkflowInstance, WorkflowStatus,
};
use wfe_core::traits::registry::WorkflowRegistry;
use wfe_core::traits::{
DistributedLockProvider, HostContext, LifecyclePublisher, PersistenceProvider, QueueProvider,
SearchIndex, ServiceProvider, StepBody, WorkflowData,
};
use wfe_core::traits::registry::WorkflowRegistry;
use wfe_core::{Result, WfeError};
use wfe_core::builder::WorkflowBuilder;
use crate::registry::InMemoryWorkflowRegistry;
@@ -41,12 +41,12 @@ impl HostContext for HostContextImpl {
Box::pin(async move {
// Look up the definition.
let reg = self.registry.read().await;
let definition = reg
.get_definition(&def_id, Some(version))
.ok_or_else(|| WfeError::DefinitionNotFound {
let definition = reg.get_definition(&def_id, Some(version)).ok_or_else(|| {
WfeError::DefinitionNotFound {
id: def_id.clone(),
version,
})?;
}
})?;
// Create the child workflow instance.
let mut instance = WorkflowInstance::new(&def_id, version, data);
@@ -54,6 +54,16 @@ impl HostContext for HostContextImpl {
instance.execution_pointers.push(ExecutionPointer::new(0));
}
// Auto-assign a human-friendly name before persisting so the
// child shows up as `{definition_id}-{N}` in lookups and logs.
// Sub-workflows always use the default; callers wanting a custom
// name should start the parent workflow directly.
let n = self
.persistence
.next_definition_sequence(&def_id)
.await?;
instance.name = format!("{def_id}-{n}");
let id = self.persistence.create_new_workflow(&instance).await?;
// Queue for execution.
@@ -103,6 +113,7 @@ impl WorkflowHost {
/// Spawn background polling tasks for processing workflows and events.
pub async fn start(&self) -> Result<()> {
self.register_primitives().await;
self.persistence.ensure_store_exists().await?;
self.queue_provider.start().await?;
self.lock_provider.start().await?;
if let Some(ref search) = self.search {
@@ -328,21 +339,40 @@ impl WorkflowHost {
sr.register_factory(key, factory);
}
/// Start a new workflow instance.
#[tracing::instrument(
name = "workflow.start",
skip(self, data),
fields(
workflow.definition_id = %definition_id,
workflow.version = version,
workflow.id,
)
)]
/// Start a new workflow instance. The host auto-assigns a human-friendly
/// name of the form `{definition_id}-{N}`. Use `start_workflow_with_name`
/// to supply a caller-specified override.
pub async fn start_workflow(
&self,
definition_id: &str,
version: u32,
data: serde_json::Value,
) -> Result<String> {
self.start_workflow_with_name(definition_id, version, data, None)
.await
}
/// Start a new workflow instance with an optional caller-supplied
/// human-friendly name. When `name_override` is `None` the host
/// auto-assigns `{definition_id}-{N}` using a per-definition sequence.
#[tracing::instrument(
name = "workflow.start",
skip(self, data, name_override),
fields(
definition_id = %definition_id,
version,
workflow.definition_id = %definition_id,
workflow.version = version,
workflow.id = tracing::field::Empty,
workflow.name = tracing::field::Empty,
)
)]
pub async fn start_workflow_with_name(
&self,
definition_id: &str,
version: u32,
data: serde_json::Value,
name_override: Option<String>,
) -> Result<String> {
// Verify definition exists.
let reg = self.registry.read().await;
@@ -361,12 +391,37 @@ impl WorkflowHost {
instance.execution_pointers.push(pointer);
}
// Assign a human-friendly name. Callers may override (e.g. webhook
// handlers that want `ci-mainline-a1b2c3`); otherwise use the
// sequenced default. Validation: reject empty overrides so the name
// column invariant holds.
instance.name = match name_override {
Some(n) if !n.trim().is_empty() => n,
Some(_) => {
return Err(WfeError::StepExecution(
"workflow name override must be non-empty".to_string(),
));
}
None => {
let n = self
.persistence
.next_definition_sequence(definition_id)
.await?;
format!("{definition_id}-{n}")
}
};
// Persist the instance.
let id = self.persistence.create_new_workflow(&instance).await?;
instance.id = id.clone();
tracing::Span::current().record("workflow.id", id.as_str());
tracing::Span::current().record("workflow.name", instance.name.as_str());
info!(workflow_id = %id, "Workflow instance created");
info!(
workflow_id = %id,
workflow_name = %instance.name,
"Workflow instance created"
);
// Queue for execution.
self.queue_provider
@@ -415,8 +470,8 @@ impl WorkflowHost {
}
/// Suspend a running workflow.
pub async fn suspend_workflow(&self, id: &str) -> Result<bool> {
let mut instance = self.persistence.get_workflow_instance(id).await?;
pub async fn suspend_workflow(&self, id_or_name: &str) -> Result<bool> {
let mut instance = self.get_workflow(id_or_name).await?;
if instance.status != WorkflowStatus::Runnable {
return Ok(false);
}
@@ -425,7 +480,7 @@ impl WorkflowHost {
if let Some(ref publisher) = self.lifecycle {
let _ = publisher
.publish(LifecycleEvent::new(
id,
&instance.id,
&instance.workflow_definition_id,
instance.version,
LifecycleEventType::Suspended,
@@ -436,23 +491,24 @@ impl WorkflowHost {
}
/// Resume a suspended workflow.
pub async fn resume_workflow(&self, id: &str) -> Result<bool> {
let mut instance = self.persistence.get_workflow_instance(id).await?;
pub async fn resume_workflow(&self, id_or_name: &str) -> Result<bool> {
let mut instance = self.get_workflow(id_or_name).await?;
if instance.status != WorkflowStatus::Suspended {
return Ok(false);
}
instance.status = WorkflowStatus::Runnable;
self.persistence.persist_workflow(&instance).await?;
// Re-queue for execution.
// Re-queue for execution using the canonical UUID (queue keys are
// always UUIDs, never names).
self.queue_provider
.queue_work(id, QueueType::Workflow)
.queue_work(&instance.id, QueueType::Workflow)
.await?;
if let Some(ref publisher) = self.lifecycle {
let _ = publisher
.publish(LifecycleEvent::new(
id,
&instance.id,
&instance.workflow_definition_id,
instance.version,
LifecycleEventType::Resumed,
@@ -463,8 +519,8 @@ impl WorkflowHost {
}
/// Terminate a running workflow.
pub async fn terminate_workflow(&self, id: &str) -> Result<bool> {
let mut instance = self.persistence.get_workflow_instance(id).await?;
pub async fn terminate_workflow(&self, id_or_name: &str) -> Result<bool> {
let mut instance = self.get_workflow(id_or_name).await?;
if instance.status == WorkflowStatus::Complete
|| instance.status == WorkflowStatus::Terminated
{
@@ -476,7 +532,7 @@ impl WorkflowHost {
if let Some(ref publisher) = self.lifecycle {
let _ = publisher
.publish(LifecycleEvent::new(
id,
&instance.id,
&instance.workflow_definition_id,
instance.version,
LifecycleEventType::Terminated,
@@ -486,9 +542,28 @@ impl WorkflowHost {
Ok(true)
}
/// Fetch a workflow instance by ID.
pub async fn get_workflow(&self, id: &str) -> Result<WorkflowInstance> {
self.persistence.get_workflow_instance(id).await
/// Fetch a workflow instance by UUID or human-friendly name.
///
/// Tries UUID lookup first for the common case. On `WorkflowNotFound`,
/// falls back to name lookup so callers can address instances
/// interchangeably (e.g. `ci-42` or the UUID it was assigned).
pub async fn get_workflow(&self, id_or_name: &str) -> Result<WorkflowInstance> {
match self.persistence.get_workflow_instance(id_or_name).await {
Ok(w) => Ok(w),
Err(WfeError::WorkflowNotFound(_)) => {
self.persistence
.get_workflow_instance_by_name(id_or_name)
.await
}
Err(e) => Err(e),
}
}
/// Resolve an identifier (UUID or human-friendly name) to the canonical
/// UUID. Used by mutation APIs that still take `&str id` internally.
pub async fn resolve_workflow_id(&self, id_or_name: &str) -> Result<String> {
let instance = self.get_workflow(id_or_name).await?;
Ok(instance.id)
}
/// Access the persistence provider.
@@ -649,10 +724,7 @@ async fn inject_service_endpoints(
}
if let Some(data_obj) = instance.data.as_object_mut() {
data_obj.insert(
"services".into(),
serde_json::Value::Object(services_map),
);
data_obj.insert("services".into(), serde_json::Value::Object(services_map));
}
persistence.persist_workflow(&instance).await?;