feat(persistence): name column, name lookup, definition sequence counter

Land the `name` field and `next_definition_sequence` counter in the
two real persistence backends. Both providers:

* Add `name TEXT NOT NULL UNIQUE` to the `workflows` table.
* Add a `definition_sequences` table (`definition_id, next_num`) with
  an atomic UPSERT + RETURNING to give the host a race-free monotonic
  counter for `{def_id}-{N}` name generation.
* INSERT/UPDATE queries now include `name`; SELECT row parsers hydrate
  it back onto `WorkflowInstance`.
* New `get_workflow_instance_by_name` method for name-based lookups
  used by grpc handlers.

Postgres includes a DO-block migration that back-fills `name` from
`id` on pre-existing deployments so the NOT NULL + UNIQUE invariant
holds retroactively; callers can overwrite with a real name on the
next persist.
This commit is contained in:
2026-04-07 18:58:25 +01:00
parent d9b9c5651e
commit 9af1a0d276
2 changed files with 228 additions and 139 deletions

View File

@@ -6,8 +6,8 @@ use sqlx::postgres::PgPoolOptions;
use sqlx::{PgPool, Row};
use wfe_core::models::{
CommandName, Event, EventSubscription, ExecutionError, ExecutionPointer, ScheduledCommand,
WorkflowInstance, WorkflowStatus, PointerStatus,
CommandName, Event, EventSubscription, ExecutionError, ExecutionPointer, PointerStatus,
ScheduledCommand, WorkflowInstance, WorkflowStatus,
};
use wfe_core::traits::{
EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository,
@@ -57,7 +57,9 @@ impl PostgresPersistenceProvider {
"Suspended" => Ok(WorkflowStatus::Suspended),
"Complete" => Ok(WorkflowStatus::Complete),
"Terminated" => Ok(WorkflowStatus::Terminated),
other => Err(WfeError::Persistence(format!("Unknown workflow status: {other}"))),
other => Err(WfeError::Persistence(format!(
"Unknown workflow status: {other}"
))),
}
}
@@ -88,7 +90,9 @@ impl PostgresPersistenceProvider {
"Compensated" => Ok(PointerStatus::Compensated),
"Cancelled" => Ok(PointerStatus::Cancelled),
"PendingPredecessor" => Ok(PointerStatus::PendingPredecessor),
other => Err(WfeError::Persistence(format!("Unknown pointer status: {other}"))),
other => Err(WfeError::Persistence(format!(
"Unknown pointer status: {other}"
))),
}
}
@@ -103,7 +107,9 @@ impl PostgresPersistenceProvider {
match s {
"ProcessWorkflow" => Ok(CommandName::ProcessWorkflow),
"ProcessEvent" => Ok(CommandName::ProcessEvent),
other => Err(WfeError::Persistence(format!("Unknown command name: {other}"))),
other => Err(WfeError::Persistence(format!(
"Unknown command name: {other}"
))),
}
}
@@ -118,8 +124,9 @@ impl PostgresPersistenceProvider {
.map_err(|e| WfeError::Persistence(format!("Failed to serialize children: {e}")))?;
let scope_json = serde_json::to_value(&p.scope)
.map_err(|e| WfeError::Persistence(format!("Failed to serialize scope: {e}")))?;
let ext_json = serde_json::to_value(&p.extension_attributes)
.map_err(|e| WfeError::Persistence(format!("Failed to serialize extension_attributes: {e}")))?;
let ext_json = serde_json::to_value(&p.extension_attributes).map_err(|e| {
WfeError::Persistence(format!("Failed to serialize extension_attributes: {e}"))
})?;
sqlx::query(
r#"INSERT INTO wfc.execution_pointers
@@ -158,13 +165,11 @@ impl PostgresPersistenceProvider {
}
async fn load_pointers(&self, workflow_id: &str) -> Result<Vec<ExecutionPointer>> {
let rows = sqlx::query(
"SELECT * FROM wfc.execution_pointers WHERE workflow_id = $1",
)
.bind(workflow_id)
.fetch_all(&self.pool)
.await
.map_err(Self::map_sqlx_err)?;
let rows = sqlx::query("SELECT * FROM wfc.execution_pointers WHERE workflow_id = $1")
.bind(workflow_id)
.fetch_all(&self.pool)
.await
.map_err(Self::map_sqlx_err)?;
let mut pointers = Vec::with_capacity(rows.len());
for row in &rows {
@@ -183,8 +188,9 @@ impl PostgresPersistenceProvider {
let scope: Vec<String> = serde_json::from_value(scope_json)
.map_err(|e| WfeError::Persistence(format!("Failed to deserialize scope: {e}")))?;
let extension_attributes: HashMap<String, serde_json::Value> =
serde_json::from_value(ext_json)
.map_err(|e| WfeError::Persistence(format!("Failed to deserialize extension_attributes: {e}")))?;
serde_json::from_value(ext_json).map_err(|e| {
WfeError::Persistence(format!("Failed to deserialize extension_attributes: {e}"))
})?;
let status_str: String = row.get("status");
@@ -226,11 +232,12 @@ impl WorkflowRepository for PostgresPersistenceProvider {
sqlx::query(
r#"INSERT INTO wfc.workflows
(id, definition_id, version, description, reference, status, data,
(id, name, definition_id, version, description, reference, status, data,
next_execution, create_time, complete_time)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)"#,
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)"#,
)
.bind(&id)
.bind(&instance.name)
.bind(&instance.workflow_definition_id)
.bind(instance.version as i32)
.bind(&instance.description)
@@ -245,7 +252,8 @@ impl WorkflowRepository for PostgresPersistenceProvider {
.map_err(Self::map_sqlx_err)?;
// Insert execution pointers
self.insert_pointers(&mut tx, &id, &instance.execution_pointers).await?;
self.insert_pointers(&mut tx, &id, &instance.execution_pointers)
.await?;
tx.commit().await.map_err(Self::map_sqlx_err)?;
Ok(id)
@@ -256,11 +264,12 @@ impl WorkflowRepository for PostgresPersistenceProvider {
sqlx::query(
r#"UPDATE wfc.workflows SET
definition_id=$2, version=$3, description=$4, reference=$5,
status=$6, data=$7, next_execution=$8, create_time=$9, complete_time=$10
name=$2, definition_id=$3, version=$4, description=$5, reference=$6,
status=$7, data=$8, next_execution=$9, create_time=$10, complete_time=$11
WHERE id=$1"#,
)
.bind(&instance.id)
.bind(&instance.name)
.bind(&instance.workflow_definition_id)
.bind(instance.version as i32)
.bind(&instance.description)
@@ -297,11 +306,12 @@ impl WorkflowRepository for PostgresPersistenceProvider {
sqlx::query(
r#"UPDATE wfc.workflows SET
definition_id=$2, version=$3, description=$4, reference=$5,
status=$6, data=$7, next_execution=$8, create_time=$9, complete_time=$10
name=$2, definition_id=$3, version=$4, description=$5, reference=$6,
status=$7, data=$8, next_execution=$9, create_time=$10, complete_time=$11
WHERE id=$1"#,
)
.bind(&instance.id)
.bind(&instance.name)
.bind(&instance.workflow_definition_id)
.bind(instance.version as i32)
.bind(&instance.description)
@@ -385,6 +395,7 @@ impl WorkflowRepository for PostgresPersistenceProvider {
Ok(WorkflowInstance {
id: row.get("id"),
name: row.get("name"),
workflow_definition_id: row.get("definition_id"),
version: row.get::<i32, _>("version") as u32,
description: row.get("description"),
@@ -398,6 +409,35 @@ impl WorkflowRepository for PostgresPersistenceProvider {
})
}
async fn get_workflow_instance_by_name(&self, name: &str) -> Result<WorkflowInstance> {
let row = sqlx::query("SELECT id FROM wfc.workflows WHERE name = $1")
.bind(name)
.fetch_optional(&self.pool)
.await
.map_err(Self::map_sqlx_err)?
.ok_or_else(|| WfeError::WorkflowNotFound(name.to_string()))?;
let id: String = row.get("id");
self.get_workflow_instance(&id).await
}
async fn next_definition_sequence(&self, definition_id: &str) -> Result<u64> {
// UPSERT the counter atomically and return the new value. `RETURNING`
// gives us the post-increment number in a single round trip.
let row = sqlx::query(
r#"INSERT INTO wfc.definition_sequences (definition_id, next_num)
VALUES ($1, 1)
ON CONFLICT (definition_id) DO UPDATE
SET next_num = wfc.definition_sequences.next_num + 1
RETURNING next_num"#,
)
.bind(definition_id)
.fetch_one(&self.pool)
.await
.map_err(Self::map_sqlx_err)?;
let next: i64 = row.get("next_num");
Ok(next as u64)
}
async fn get_workflow_instances(&self, ids: &[String]) -> Result<Vec<WorkflowInstance>> {
let mut result = Vec::new();
for id in ids {
@@ -413,10 +453,7 @@ impl WorkflowRepository for PostgresPersistenceProvider {
#[async_trait]
impl SubscriptionRepository for PostgresPersistenceProvider {
async fn create_event_subscription(
&self,
subscription: &EventSubscription,
) -> Result<String> {
async fn create_event_subscription(&self, subscription: &EventSubscription) -> Result<String> {
let id = if subscription.id.is_empty() {
uuid::Uuid::new_v4().to_string()
} else {
@@ -471,18 +508,14 @@ impl SubscriptionRepository for PostgresPersistenceProvider {
}
async fn terminate_subscription(&self, subscription_id: &str) -> Result<()> {
let result = sqlx::query(
"DELETE FROM wfc.event_subscriptions WHERE id = $1",
)
.bind(subscription_id)
.execute(&self.pool)
.await
.map_err(Self::map_sqlx_err)?;
let result = sqlx::query("DELETE FROM wfc.event_subscriptions WHERE id = $1")
.bind(subscription_id)
.execute(&self.pool)
.await
.map_err(Self::map_sqlx_err)?;
if result.rows_affected() == 0 {
return Err(WfeError::SubscriptionNotFound(
subscription_id.to_string(),
));
return Err(WfeError::SubscriptionNotFound(subscription_id.to_string()));
}
Ok(())
}
@@ -550,20 +583,14 @@ impl SubscriptionRepository for PostgresPersistenceProvider {
.await
.map_err(Self::map_sqlx_err)?;
if exists.is_none() {
return Err(WfeError::SubscriptionNotFound(
subscription_id.to_string(),
));
return Err(WfeError::SubscriptionNotFound(subscription_id.to_string()));
}
return Ok(false);
}
Ok(true)
}
async fn clear_subscription_token(
&self,
subscription_id: &str,
token: &str,
) -> Result<()> {
async fn clear_subscription_token(&self, subscription_id: &str, token: &str) -> Result<()> {
let result = sqlx::query(
r#"UPDATE wfc.event_subscriptions
SET external_token = NULL, external_worker_id = NULL, external_token_expiry = NULL
@@ -576,9 +603,7 @@ impl SubscriptionRepository for PostgresPersistenceProvider {
.map_err(Self::map_sqlx_err)?;
if result.rows_affected() == 0 {
return Err(WfeError::SubscriptionNotFound(
subscription_id.to_string(),
));
return Err(WfeError::SubscriptionNotFound(subscription_id.to_string()));
}
Ok(())
}
@@ -731,20 +756,23 @@ impl ScheduledCommandRepository for PostgresPersistenceProvider {
async fn process_commands(
&self,
as_of: DateTime<Utc>,
handler: &(dyn Fn(ScheduledCommand) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
+ Send
+ Sync),
handler: &(
dyn Fn(
ScheduledCommand,
)
-> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>>
+ Send
+ Sync
),
) -> Result<()> {
let as_of_millis = as_of.timestamp_millis();
// 1. SELECT due commands (do not delete yet)
let rows = sqlx::query(
"SELECT * FROM wfc.scheduled_commands WHERE execute_time <= $1",
)
.bind(as_of_millis)
.fetch_all(&self.pool)
.await
.map_err(Self::map_sqlx_err)?;
let rows = sqlx::query("SELECT * FROM wfc.scheduled_commands WHERE execute_time <= $1")
.bind(as_of_millis)
.fetch_all(&self.pool)
.await
.map_err(Self::map_sqlx_err)?;
let commands: Vec<ScheduledCommand> = rows
.iter()
@@ -803,6 +831,7 @@ impl PersistenceProvider for PostgresPersistenceProvider {
sqlx::query(
r#"CREATE TABLE IF NOT EXISTS wfc.workflows (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
definition_id TEXT NOT NULL,
version INT NOT NULL,
description TEXT,
@@ -818,6 +847,39 @@ impl PersistenceProvider for PostgresPersistenceProvider {
.await
.map_err(Self::map_sqlx_err)?;
// Upgrade older databases that lack the `name` column. Back-fill with
// the UUID so the NOT NULL + UNIQUE invariant holds retroactively;
// callers can re-run with a real name on the next persist.
sqlx::query(
r#"DO $$
BEGIN
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'wfc' AND table_name = 'workflows'
AND column_name = 'name'
) THEN
ALTER TABLE wfc.workflows ADD COLUMN name TEXT;
UPDATE wfc.workflows SET name = id WHERE name IS NULL;
ALTER TABLE wfc.workflows ALTER COLUMN name SET NOT NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_workflows_name
ON wfc.workflows (name);
END IF;
END$$;"#,
)
.execute(&self.pool)
.await
.map_err(Self::map_sqlx_err)?;
sqlx::query(
r#"CREATE TABLE IF NOT EXISTS wfc.definition_sequences (
definition_id TEXT PRIMARY KEY,
next_num BIGINT NOT NULL
)"#,
)
.execute(&self.pool)
.await
.map_err(Self::map_sqlx_err)?;
sqlx::query(
r#"CREATE TABLE IF NOT EXISTS wfc.execution_pointers (
id TEXT PRIMARY KEY,