diff --git a/wfe-postgres/src/lib.rs b/wfe-postgres/src/lib.rs index 09a0a19..dba8f7b 100644 --- a/wfe-postgres/src/lib.rs +++ b/wfe-postgres/src/lib.rs @@ -6,8 +6,8 @@ use sqlx::postgres::PgPoolOptions; use sqlx::{PgPool, Row}; use wfe_core::models::{ - CommandName, Event, EventSubscription, ExecutionError, ExecutionPointer, ScheduledCommand, - WorkflowInstance, WorkflowStatus, PointerStatus, + CommandName, Event, EventSubscription, ExecutionError, ExecutionPointer, PointerStatus, + ScheduledCommand, WorkflowInstance, WorkflowStatus, }; use wfe_core::traits::{ EventRepository, PersistenceProvider, ScheduledCommandRepository, SubscriptionRepository, @@ -57,7 +57,9 @@ impl PostgresPersistenceProvider { "Suspended" => Ok(WorkflowStatus::Suspended), "Complete" => Ok(WorkflowStatus::Complete), "Terminated" => Ok(WorkflowStatus::Terminated), - other => Err(WfeError::Persistence(format!("Unknown workflow status: {other}"))), + other => Err(WfeError::Persistence(format!( + "Unknown workflow status: {other}" + ))), } } @@ -88,7 +90,9 @@ impl PostgresPersistenceProvider { "Compensated" => Ok(PointerStatus::Compensated), "Cancelled" => Ok(PointerStatus::Cancelled), "PendingPredecessor" => Ok(PointerStatus::PendingPredecessor), - other => Err(WfeError::Persistence(format!("Unknown pointer status: {other}"))), + other => Err(WfeError::Persistence(format!( + "Unknown pointer status: {other}" + ))), } } @@ -103,7 +107,9 @@ impl PostgresPersistenceProvider { match s { "ProcessWorkflow" => Ok(CommandName::ProcessWorkflow), "ProcessEvent" => Ok(CommandName::ProcessEvent), - other => Err(WfeError::Persistence(format!("Unknown command name: {other}"))), + other => Err(WfeError::Persistence(format!( + "Unknown command name: {other}" + ))), } } @@ -118,8 +124,9 @@ impl PostgresPersistenceProvider { .map_err(|e| WfeError::Persistence(format!("Failed to serialize children: {e}")))?; let scope_json = serde_json::to_value(&p.scope) .map_err(|e| WfeError::Persistence(format!("Failed to serialize scope: {e}")))?; - let ext_json = serde_json::to_value(&p.extension_attributes) - .map_err(|e| WfeError::Persistence(format!("Failed to serialize extension_attributes: {e}")))?; + let ext_json = serde_json::to_value(&p.extension_attributes).map_err(|e| { + WfeError::Persistence(format!("Failed to serialize extension_attributes: {e}")) + })?; sqlx::query( r#"INSERT INTO wfc.execution_pointers @@ -158,13 +165,11 @@ impl PostgresPersistenceProvider { } async fn load_pointers(&self, workflow_id: &str) -> Result> { - let rows = sqlx::query( - "SELECT * FROM wfc.execution_pointers WHERE workflow_id = $1", - ) - .bind(workflow_id) - .fetch_all(&self.pool) - .await - .map_err(Self::map_sqlx_err)?; + let rows = sqlx::query("SELECT * FROM wfc.execution_pointers WHERE workflow_id = $1") + .bind(workflow_id) + .fetch_all(&self.pool) + .await + .map_err(Self::map_sqlx_err)?; let mut pointers = Vec::with_capacity(rows.len()); for row in &rows { @@ -183,8 +188,9 @@ impl PostgresPersistenceProvider { let scope: Vec = serde_json::from_value(scope_json) .map_err(|e| WfeError::Persistence(format!("Failed to deserialize scope: {e}")))?; let extension_attributes: HashMap = - serde_json::from_value(ext_json) - .map_err(|e| WfeError::Persistence(format!("Failed to deserialize extension_attributes: {e}")))?; + serde_json::from_value(ext_json).map_err(|e| { + WfeError::Persistence(format!("Failed to deserialize extension_attributes: {e}")) + })?; let status_str: String = row.get("status"); @@ -226,11 +232,12 @@ impl WorkflowRepository for PostgresPersistenceProvider { sqlx::query( r#"INSERT INTO wfc.workflows - (id, definition_id, version, description, reference, status, data, + (id, name, definition_id, version, description, reference, status, data, next_execution, create_time, complete_time) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)"#, + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)"#, ) .bind(&id) + .bind(&instance.name) .bind(&instance.workflow_definition_id) .bind(instance.version as i32) .bind(&instance.description) @@ -245,7 +252,8 @@ impl WorkflowRepository for PostgresPersistenceProvider { .map_err(Self::map_sqlx_err)?; // Insert execution pointers - self.insert_pointers(&mut tx, &id, &instance.execution_pointers).await?; + self.insert_pointers(&mut tx, &id, &instance.execution_pointers) + .await?; tx.commit().await.map_err(Self::map_sqlx_err)?; Ok(id) @@ -256,11 +264,12 @@ impl WorkflowRepository for PostgresPersistenceProvider { sqlx::query( r#"UPDATE wfc.workflows SET - definition_id=$2, version=$3, description=$4, reference=$5, - status=$6, data=$7, next_execution=$8, create_time=$9, complete_time=$10 + name=$2, definition_id=$3, version=$4, description=$5, reference=$6, + status=$7, data=$8, next_execution=$9, create_time=$10, complete_time=$11 WHERE id=$1"#, ) .bind(&instance.id) + .bind(&instance.name) .bind(&instance.workflow_definition_id) .bind(instance.version as i32) .bind(&instance.description) @@ -297,11 +306,12 @@ impl WorkflowRepository for PostgresPersistenceProvider { sqlx::query( r#"UPDATE wfc.workflows SET - definition_id=$2, version=$3, description=$4, reference=$5, - status=$6, data=$7, next_execution=$8, create_time=$9, complete_time=$10 + name=$2, definition_id=$3, version=$4, description=$5, reference=$6, + status=$7, data=$8, next_execution=$9, create_time=$10, complete_time=$11 WHERE id=$1"#, ) .bind(&instance.id) + .bind(&instance.name) .bind(&instance.workflow_definition_id) .bind(instance.version as i32) .bind(&instance.description) @@ -385,6 +395,7 @@ impl WorkflowRepository for PostgresPersistenceProvider { Ok(WorkflowInstance { id: row.get("id"), + name: row.get("name"), workflow_definition_id: row.get("definition_id"), version: row.get::("version") as u32, description: row.get("description"), @@ -398,6 +409,35 @@ impl WorkflowRepository for PostgresPersistenceProvider { }) } + async fn get_workflow_instance_by_name(&self, name: &str) -> Result { + let row = sqlx::query("SELECT id FROM wfc.workflows WHERE name = $1") + .bind(name) + .fetch_optional(&self.pool) + .await + .map_err(Self::map_sqlx_err)? + .ok_or_else(|| WfeError::WorkflowNotFound(name.to_string()))?; + let id: String = row.get("id"); + self.get_workflow_instance(&id).await + } + + async fn next_definition_sequence(&self, definition_id: &str) -> Result { + // UPSERT the counter atomically and return the new value. `RETURNING` + // gives us the post-increment number in a single round trip. + let row = sqlx::query( + r#"INSERT INTO wfc.definition_sequences (definition_id, next_num) + VALUES ($1, 1) + ON CONFLICT (definition_id) DO UPDATE + SET next_num = wfc.definition_sequences.next_num + 1 + RETURNING next_num"#, + ) + .bind(definition_id) + .fetch_one(&self.pool) + .await + .map_err(Self::map_sqlx_err)?; + let next: i64 = row.get("next_num"); + Ok(next as u64) + } + async fn get_workflow_instances(&self, ids: &[String]) -> Result> { let mut result = Vec::new(); for id in ids { @@ -413,10 +453,7 @@ impl WorkflowRepository for PostgresPersistenceProvider { #[async_trait] impl SubscriptionRepository for PostgresPersistenceProvider { - async fn create_event_subscription( - &self, - subscription: &EventSubscription, - ) -> Result { + async fn create_event_subscription(&self, subscription: &EventSubscription) -> Result { let id = if subscription.id.is_empty() { uuid::Uuid::new_v4().to_string() } else { @@ -471,18 +508,14 @@ impl SubscriptionRepository for PostgresPersistenceProvider { } async fn terminate_subscription(&self, subscription_id: &str) -> Result<()> { - let result = sqlx::query( - "DELETE FROM wfc.event_subscriptions WHERE id = $1", - ) - .bind(subscription_id) - .execute(&self.pool) - .await - .map_err(Self::map_sqlx_err)?; + let result = sqlx::query("DELETE FROM wfc.event_subscriptions WHERE id = $1") + .bind(subscription_id) + .execute(&self.pool) + .await + .map_err(Self::map_sqlx_err)?; if result.rows_affected() == 0 { - return Err(WfeError::SubscriptionNotFound( - subscription_id.to_string(), - )); + return Err(WfeError::SubscriptionNotFound(subscription_id.to_string())); } Ok(()) } @@ -550,20 +583,14 @@ impl SubscriptionRepository for PostgresPersistenceProvider { .await .map_err(Self::map_sqlx_err)?; if exists.is_none() { - return Err(WfeError::SubscriptionNotFound( - subscription_id.to_string(), - )); + return Err(WfeError::SubscriptionNotFound(subscription_id.to_string())); } return Ok(false); } Ok(true) } - async fn clear_subscription_token( - &self, - subscription_id: &str, - token: &str, - ) -> Result<()> { + async fn clear_subscription_token(&self, subscription_id: &str, token: &str) -> Result<()> { let result = sqlx::query( r#"UPDATE wfc.event_subscriptions SET external_token = NULL, external_worker_id = NULL, external_token_expiry = NULL @@ -576,9 +603,7 @@ impl SubscriptionRepository for PostgresPersistenceProvider { .map_err(Self::map_sqlx_err)?; if result.rows_affected() == 0 { - return Err(WfeError::SubscriptionNotFound( - subscription_id.to_string(), - )); + return Err(WfeError::SubscriptionNotFound(subscription_id.to_string())); } Ok(()) } @@ -731,20 +756,23 @@ impl ScheduledCommandRepository for PostgresPersistenceProvider { async fn process_commands( &self, as_of: DateTime, - handler: &(dyn Fn(ScheduledCommand) -> std::pin::Pin> + Send>> - + Send - + Sync), + handler: &( + dyn Fn( + ScheduledCommand, + ) + -> std::pin::Pin> + Send>> + + Send + + Sync + ), ) -> Result<()> { let as_of_millis = as_of.timestamp_millis(); // 1. SELECT due commands (do not delete yet) - let rows = sqlx::query( - "SELECT * FROM wfc.scheduled_commands WHERE execute_time <= $1", - ) - .bind(as_of_millis) - .fetch_all(&self.pool) - .await - .map_err(Self::map_sqlx_err)?; + let rows = sqlx::query("SELECT * FROM wfc.scheduled_commands WHERE execute_time <= $1") + .bind(as_of_millis) + .fetch_all(&self.pool) + .await + .map_err(Self::map_sqlx_err)?; let commands: Vec = rows .iter() @@ -803,6 +831,7 @@ impl PersistenceProvider for PostgresPersistenceProvider { sqlx::query( r#"CREATE TABLE IF NOT EXISTS wfc.workflows ( id TEXT PRIMARY KEY, + name TEXT NOT NULL UNIQUE, definition_id TEXT NOT NULL, version INT NOT NULL, description TEXT, @@ -818,6 +847,39 @@ impl PersistenceProvider for PostgresPersistenceProvider { .await .map_err(Self::map_sqlx_err)?; + // Upgrade older databases that lack the `name` column. Back-fill with + // the UUID so the NOT NULL + UNIQUE invariant holds retroactively; + // callers can re-run with a real name on the next persist. + sqlx::query( + r#"DO $$ + BEGIN + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'wfc' AND table_name = 'workflows' + AND column_name = 'name' + ) THEN + ALTER TABLE wfc.workflows ADD COLUMN name TEXT; + UPDATE wfc.workflows SET name = id WHERE name IS NULL; + ALTER TABLE wfc.workflows ALTER COLUMN name SET NOT NULL; + CREATE UNIQUE INDEX IF NOT EXISTS idx_workflows_name + ON wfc.workflows (name); + END IF; + END$$;"#, + ) + .execute(&self.pool) + .await + .map_err(Self::map_sqlx_err)?; + + sqlx::query( + r#"CREATE TABLE IF NOT EXISTS wfc.definition_sequences ( + definition_id TEXT PRIMARY KEY, + next_num BIGINT NOT NULL + )"#, + ) + .execute(&self.pool) + .await + .map_err(Self::map_sqlx_err)?; + sqlx::query( r#"CREATE TABLE IF NOT EXISTS wfc.execution_pointers ( id TEXT PRIMARY KEY, diff --git a/wfe-sqlite/src/lib.rs b/wfe-sqlite/src/lib.rs index c038fc7..03df56c 100644 --- a/wfe-sqlite/src/lib.rs +++ b/wfe-sqlite/src/lib.rs @@ -57,6 +57,7 @@ impl SqlitePersistenceProvider { sqlx::query( "CREATE TABLE IF NOT EXISTS workflows ( id TEXT PRIMARY KEY, + name TEXT NOT NULL UNIQUE, definition_id TEXT NOT NULL, version INTEGER NOT NULL, description TEXT, @@ -71,6 +72,17 @@ impl SqlitePersistenceProvider { .execute(&self.pool) .await?; + // Per-definition monotonic counter used to generate human-friendly + // instance names of the form `{definition_id}-{N}`. + sqlx::query( + "CREATE TABLE IF NOT EXISTS definition_sequences ( + definition_id TEXT PRIMARY KEY, + next_num INTEGER NOT NULL + )", + ) + .execute(&self.pool) + .await?; + sqlx::query( "CREATE TABLE IF NOT EXISTS execution_pointers ( id TEXT PRIMARY KEY, @@ -157,30 +169,28 @@ impl SqlitePersistenceProvider { .await?; // Indexes - sqlx::query("CREATE INDEX IF NOT EXISTS idx_workflows_next_execution ON workflows(next_execution)") - .execute(&self.pool) - .await?; sqlx::query( - "CREATE INDEX IF NOT EXISTS idx_workflows_status ON workflows(status)", + "CREATE INDEX IF NOT EXISTS idx_workflows_next_execution ON workflows(next_execution)", ) .execute(&self.pool) .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_workflows_status ON workflows(status)") + .execute(&self.pool) + .await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_execution_pointers_workflow_id ON execution_pointers(workflow_id)") .execute(&self.pool) .await?; - sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_name_key ON events(event_name, event_key)") + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_events_name_key ON events(event_name, event_key)", + ) + .execute(&self.pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_is_processed ON events(is_processed)") + .execute(&self.pool) + .await?; + sqlx::query("CREATE INDEX IF NOT EXISTS idx_events_event_time ON events(event_time)") .execute(&self.pool) .await?; - sqlx::query( - "CREATE INDEX IF NOT EXISTS idx_events_is_processed ON events(is_processed)", - ) - .execute(&self.pool) - .await?; - sqlx::query( - "CREATE INDEX IF NOT EXISTS idx_events_event_time ON events(event_time)", - ) - .execute(&self.pool) - .await?; sqlx::query("CREATE INDEX IF NOT EXISTS idx_event_subscriptions_name_key ON event_subscriptions(event_name, event_key)") .execute(&self.pool) .await?; @@ -226,10 +236,8 @@ fn row_to_workflow( pointers: Vec, ) -> std::result::Result { let status_str: String = row.try_get("status").map_err(to_persistence_err)?; - let status: WorkflowStatus = - serde_json::from_str(&format!("\"{status_str}\"")).map_err(|e| { - WfeError::Persistence(format!("Failed to deserialize WorkflowStatus: {e}")) - })?; + let status: WorkflowStatus = serde_json::from_str(&format!("\"{status_str}\"")) + .map_err(|e| WfeError::Persistence(format!("Failed to deserialize WorkflowStatus: {e}")))?; let data_str: String = row.try_get("data").map_err(to_persistence_err)?; let data: serde_json::Value = serde_json::from_str(&data_str) @@ -241,6 +249,7 @@ fn row_to_workflow( Ok(WorkflowInstance { id: row.try_get("id").map_err(to_persistence_err)?, + name: row.try_get("name").map_err(to_persistence_err)?, workflow_definition_id: row.try_get("definition_id").map_err(to_persistence_err)?, version: row .try_get::("version") @@ -272,10 +281,11 @@ fn row_to_pointer( .as_deref() .map(serde_json::from_str) .transpose() - .map_err(|e| WfeError::Persistence(format!("Failed to deserialize persistence_data: {e}")))?; + .map_err(|e| { + WfeError::Persistence(format!("Failed to deserialize persistence_data: {e}")) + })?; - let event_data_str: Option = - row.try_get("event_data").map_err(to_persistence_err)?; + let event_data_str: Option = row.try_get("event_data").map_err(to_persistence_err)?; let event_data: Option = event_data_str .as_deref() .map(serde_json::from_str) @@ -308,15 +318,13 @@ fn row_to_pointer( let ext_str: String = row .try_get("extension_attributes") .map_err(to_persistence_err)?; - let extension_attributes: HashMap = - serde_json::from_str(&ext_str).map_err(|e| { - WfeError::Persistence(format!("Failed to deserialize extension_attributes: {e}")) - })?; + let extension_attributes: HashMap = serde_json::from_str(&ext_str) + .map_err(|e| { + WfeError::Persistence(format!("Failed to deserialize extension_attributes: {e}")) + })?; - let sleep_until_str: Option = - row.try_get("sleep_until").map_err(to_persistence_err)?; - let start_time_str: Option = - row.try_get("start_time").map_err(to_persistence_err)?; + let sleep_until_str: Option = row.try_get("sleep_until").map_err(to_persistence_err)?; + let start_time_str: Option = row.try_get("start_time").map_err(to_persistence_err)?; let end_time_str: Option = row.try_get("end_time").map_err(to_persistence_err)?; Ok(ExecutionPointer { @@ -373,8 +381,7 @@ fn row_to_event(row: &sqlx::sqlite::SqliteRow) -> std::result::Result std::result::Result { - let subscribe_as_of_str: String = - row.try_get("subscribe_as_of").map_err(to_persistence_err)?; + let subscribe_as_of_str: String = row.try_get("subscribe_as_of").map_err(to_persistence_err)?; let subscription_data_str: Option = row .try_get("subscription_data") @@ -436,10 +443,11 @@ impl WorkflowRepository for SqlitePersistenceProvider { let mut tx = self.pool.begin().await.map_err(to_persistence_err)?; sqlx::query( - "INSERT INTO workflows (id, definition_id, version, description, reference, status, data, next_execution, create_time, complete_time) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", + "INSERT INTO workflows (id, name, definition_id, version, description, reference, status, data, next_execution, create_time, complete_time) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", ) .bind(&id) + .bind(&instance.name) .bind(&instance.workflow_definition_id) .bind(instance.version as i64) .bind(&instance.description) @@ -474,10 +482,11 @@ impl WorkflowRepository for SqlitePersistenceProvider { let mut tx = self.pool.begin().await.map_err(to_persistence_err)?; sqlx::query( - "UPDATE workflows SET definition_id = ?1, version = ?2, description = ?3, reference = ?4, - status = ?5, data = ?6, next_execution = ?7, complete_time = ?8 - WHERE id = ?9", + "UPDATE workflows SET name = ?1, definition_id = ?2, version = ?3, description = ?4, reference = ?5, + status = ?6, data = ?7, next_execution = ?8, complete_time = ?9 + WHERE id = ?10", ) + .bind(&instance.name) .bind(&instance.workflow_definition_id) .bind(instance.version as i64) .bind(&instance.description) @@ -523,10 +532,11 @@ impl WorkflowRepository for SqlitePersistenceProvider { let mut tx = self.pool.begin().await.map_err(to_persistence_err)?; sqlx::query( - "UPDATE workflows SET definition_id = ?1, version = ?2, description = ?3, reference = ?4, - status = ?5, data = ?6, next_execution = ?7, complete_time = ?8 - WHERE id = ?9", + "UPDATE workflows SET name = ?1, definition_id = ?2, version = ?3, description = ?4, reference = ?5, + status = ?6, data = ?7, next_execution = ?8, complete_time = ?9 + WHERE id = ?10", ) + .bind(&instance.name) .bind(&instance.workflow_definition_id) .bind(instance.version as i64) .bind(&instance.description) @@ -583,12 +593,11 @@ impl WorkflowRepository for SqlitePersistenceProvider { .map_err(to_persistence_err)? .ok_or_else(|| WfeError::WorkflowNotFound(id.to_string()))?; - let pointer_rows = - sqlx::query("SELECT * FROM execution_pointers WHERE workflow_id = ?1") - .bind(id) - .fetch_all(&self.pool) - .await - .map_err(to_persistence_err)?; + let pointer_rows = sqlx::query("SELECT * FROM execution_pointers WHERE workflow_id = ?1") + .bind(id) + .fetch_all(&self.pool) + .await + .map_err(to_persistence_err)?; let pointers = pointer_rows .iter() @@ -598,6 +607,36 @@ impl WorkflowRepository for SqlitePersistenceProvider { row_to_workflow(&row, pointers) } + async fn get_workflow_instance_by_name(&self, name: &str) -> Result { + let row = sqlx::query("SELECT id FROM workflows WHERE name = ?1") + .bind(name) + .fetch_optional(&self.pool) + .await + .map_err(to_persistence_err)? + .ok_or_else(|| WfeError::WorkflowNotFound(name.to_string()))?; + let id: String = row.try_get("id").map_err(to_persistence_err)?; + self.get_workflow_instance(&id).await + } + + async fn next_definition_sequence(&self, definition_id: &str) -> Result { + // SQLite doesn't support `INSERT ... ON CONFLICT ... RETURNING` prior + // to 3.35, but sqlx bundles a new-enough build. Emulate an atomic + // increment via UPSERT + RETURNING so concurrent callers don't collide. + let row = sqlx::query( + "INSERT INTO definition_sequences (definition_id, next_num) + VALUES (?1, 1) + ON CONFLICT(definition_id) DO UPDATE + SET next_num = next_num + 1 + RETURNING next_num", + ) + .bind(definition_id) + .fetch_one(&self.pool) + .await + .map_err(to_persistence_err)?; + let next: i64 = row.try_get("next_num").map_err(to_persistence_err)?; + Ok(next as u64) + } + async fn get_workflow_instances(&self, ids: &[String]) -> Result> { if ids.is_empty() { return Ok(Vec::new()); @@ -735,10 +774,7 @@ async fn insert_subscription( #[async_trait] impl SubscriptionRepository for SqlitePersistenceProvider { - async fn create_event_subscription( - &self, - subscription: &EventSubscription, - ) -> Result { + async fn create_event_subscription(&self, subscription: &EventSubscription) -> Result { let id = if subscription.id.is_empty() { uuid::Uuid::new_v4().to_string() } else { @@ -776,18 +812,14 @@ impl SubscriptionRepository for SqlitePersistenceProvider { } async fn terminate_subscription(&self, subscription_id: &str) -> Result<()> { - let result = sqlx::query( - "UPDATE event_subscriptions SET terminated = 1 WHERE id = ?1", - ) - .bind(subscription_id) - .execute(&self.pool) - .await - .map_err(to_persistence_err)?; + let result = sqlx::query("UPDATE event_subscriptions SET terminated = 1 WHERE id = ?1") + .bind(subscription_id) + .execute(&self.pool) + .await + .map_err(to_persistence_err)?; if result.rows_affected() == 0 { - return Err(WfeError::SubscriptionNotFound( - subscription_id.to_string(), - )); + return Err(WfeError::SubscriptionNotFound(subscription_id.to_string())); } Ok(()) } @@ -860,20 +892,14 @@ impl SubscriptionRepository for SqlitePersistenceProvider { .await .map_err(to_persistence_err)?; if exists.is_none() { - return Err(WfeError::SubscriptionNotFound( - subscription_id.to_string(), - )); + return Err(WfeError::SubscriptionNotFound(subscription_id.to_string())); } return Ok(false); } Ok(true) } - async fn clear_subscription_token( - &self, - subscription_id: &str, - token: &str, - ) -> Result<()> { + async fn clear_subscription_token(&self, subscription_id: &str, token: &str) -> Result<()> { let result = sqlx::query( "UPDATE event_subscriptions SET external_token = NULL, external_worker_id = NULL, external_token_expiry = NULL @@ -886,9 +912,7 @@ impl SubscriptionRepository for SqlitePersistenceProvider { .map_err(to_persistence_err)?; if result.rows_affected() == 0 { - return Err(WfeError::SubscriptionNotFound( - subscription_id.to_string(), - )); + return Err(WfeError::SubscriptionNotFound(subscription_id.to_string())); } Ok(()) } @@ -937,13 +961,11 @@ impl EventRepository for SqlitePersistenceProvider { async fn get_runnable_events(&self, as_at: DateTime) -> Result> { let as_at_str = dt_to_string(&as_at); - let rows = sqlx::query( - "SELECT id FROM events WHERE is_processed = 0 AND event_time <= ?1", - ) - .bind(&as_at_str) - .fetch_all(&self.pool) - .await - .map_err(to_persistence_err)?; + let rows = sqlx::query("SELECT id FROM events WHERE is_processed = 0 AND event_time <= ?1") + .bind(&as_at_str) + .fetch_all(&self.pool) + .await + .map_err(to_persistence_err)?; rows.iter() .map(|r| r.try_get("id").map_err(to_persistence_err)) @@ -1029,9 +1051,14 @@ impl ScheduledCommandRepository for SqlitePersistenceProvider { async fn process_commands( &self, as_of: DateTime, - handler: &(dyn Fn(ScheduledCommand) -> std::pin::Pin> + Send>> - + Send - + Sync), + handler: &( + dyn Fn( + ScheduledCommand, + ) + -> std::pin::Pin> + Send>> + + Send + + Sync + ), ) -> Result<()> { let as_of_millis = as_of.timestamp_millis();