From 48e5d9a26f326fb105034d6f629a46a18b92ca11 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Thu, 9 Apr 2026 15:45:47 +0100 Subject: [PATCH] feat(persistence+k8s): root_workflow_id schema, PVC provisioning, name fallback, host name-or-UUID --- wfe-kubernetes/src/config.rs | 33 ++++++++++- wfe-kubernetes/src/lib.rs | 1 + wfe-kubernetes/src/manifests.rs | 98 +++++++++++++++++++++++++++------ wfe-kubernetes/src/pvc.rs | 96 ++++++++++++++++++++++++++++++++ wfe-kubernetes/src/step.rs | 56 +++++++++++++++++-- wfe-postgres/src/lib.rs | 39 ++++++++++--- wfe-sqlite/src/lib.rs | 36 +++++++++--- wfe/src/host.rs | 7 +++ 8 files changed, 327 insertions(+), 39 deletions(-) create mode 100644 wfe-kubernetes/src/pvc.rs diff --git a/wfe-kubernetes/src/config.rs b/wfe-kubernetes/src/config.rs index de23e1a..ed60b0a 100644 --- a/wfe-kubernetes/src/config.rs +++ b/wfe-kubernetes/src/config.rs @@ -20,6 +20,18 @@ pub struct ClusterConfig { /// Node selector labels for Job pods. #[serde(default)] pub node_selector: HashMap, + /// Default size (e.g. "10Gi") used when a workflow declares a + /// `shared_volume` without specifying its own `size`. The K8s executor + /// creates one PVC per top-level workflow instance and mounts it on + /// every step container so sub-workflows can share a cloned checkout, + /// sccache directory, etc. Cluster operators tune this based on the + /// typical working-set size of their pipelines. + #[serde(default = "default_shared_volume_size")] + pub default_shared_volume_size: String, + /// Optional StorageClass to use when provisioning shared-volume PVCs. + /// Falls back to the cluster's default StorageClass when unset. 
+ #[serde(default)] + pub shared_volume_storage_class: Option, } impl Default for ClusterConfig { @@ -30,6 +42,8 @@ impl Default for ClusterConfig { service_account: None, image_pull_secrets: Vec::new(), node_selector: HashMap::new(), + default_shared_volume_size: default_shared_volume_size(), + shared_volume_storage_class: None, } } } @@ -38,6 +52,10 @@ fn default_namespace_prefix() -> String { "wfe-".to_string() } +fn default_shared_volume_size() -> String { + "10Gi".to_string() +} + /// Per-step configuration for a Kubernetes Job execution. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct KubernetesStepConfig { @@ -46,9 +64,19 @@ pub struct KubernetesStepConfig { /// Override entrypoint. #[serde(default)] pub command: Option>, - /// Shorthand: runs via `/bin/sh -c "..."`. Mutually exclusive with `command`. + /// Shorthand: runs the given script via the configured `shell` + /// (default `/bin/sh`). Mutually exclusive with `command`. For scripts + /// that rely on bashisms like `set -o pipefail`, process substitution, + /// or arrays, set `shell: /bin/bash` explicitly — the default /bin/sh + /// keeps alpine/busybox containers working out of the box. #[serde(default)] pub run: Option, + /// Shell used to execute a `run:` script. Defaults to `/bin/sh` so + /// minimal containers (alpine, distroless) work unchanged. Override + /// to `/bin/bash` or any other interpreter when the script needs + /// features dash doesn't support. + #[serde(default)] + pub shell: Option, /// Environment variables injected into the container. 
#[serde(default)] pub env: HashMap, @@ -95,6 +123,8 @@ mod tests { service_account: Some("wfe-runner".into()), image_pull_secrets: vec!["ghcr-secret".into()], node_selector: [("tier".into(), "compute".into())].into(), + default_shared_volume_size: "20Gi".into(), + shared_volume_storage_class: Some("fast-ssd".into()), }; let json = serde_json::to_string(&config).unwrap(); let parsed: ClusterConfig = serde_json::from_str(&json).unwrap(); @@ -119,6 +149,7 @@ mod tests { image: "node:20-alpine".into(), command: None, run: Some("npm test".into()), + shell: None, env: [("NODE_ENV".into(), "test".into())].into(), working_dir: Some("/app".into()), memory: Some("512Mi".into()), diff --git a/wfe-kubernetes/src/lib.rs b/wfe-kubernetes/src/lib.rs index 3ff3ba1..e6ecc9c 100644 --- a/wfe-kubernetes/src/lib.rs +++ b/wfe-kubernetes/src/lib.rs @@ -5,6 +5,7 @@ pub mod logs; pub mod manifests; pub mod namespace; pub mod output; +pub mod pvc; pub mod service_manifests; pub mod service_provider; pub mod step; diff --git a/wfe-kubernetes/src/manifests.rs b/wfe-kubernetes/src/manifests.rs index f15fdfe..6d45de9 100644 --- a/wfe-kubernetes/src/manifests.rs +++ b/wfe-kubernetes/src/manifests.rs @@ -2,7 +2,8 @@ use std::collections::{BTreeMap, HashMap}; use k8s_openapi::api::batch::v1::{Job, JobSpec}; use k8s_openapi::api::core::v1::{ - Container, EnvVar, LocalObjectReference, PodSpec, PodTemplateSpec, ResourceRequirements, + Container, EnvVar, LocalObjectReference, PersistentVolumeClaimVolumeSource, PodSpec, + PodTemplateSpec, ResourceRequirements, Volume, VolumeMount, }; use k8s_openapi::apimachinery::pkg::api::resource::Quantity; use kube::api::ObjectMeta; @@ -11,6 +12,18 @@ use crate::config::{ClusterConfig, KubernetesStepConfig}; const LABEL_STEP_NAME: &str = "wfe.sunbeam.pt/step-name"; const LABEL_MANAGED_BY: &str = "wfe.sunbeam.pt/managed-by"; +/// Name of the pod volume referencing the shared PVC. Stable so VolumeMount +/// name and Volume name stay in sync inside a single pod spec. 
+const SHARED_VOLUME_NAME: &str = "wfe-workspace"; + +/// Request that the generated Job mount a pre-existing PVC into every +/// step container at `mount_path`. Passed in by the step executor, which +/// is responsible for creating the PVC before calling `build_job`. +#[derive(Debug, Clone)] +pub struct SharedVolumeMount { + pub claim_name: String, + pub mount_path: String, +} /// Build a Kubernetes Job manifest from step configuration. pub fn build_job( @@ -19,6 +32,7 @@ pub fn build_job( namespace: &str, env_overrides: &HashMap, cluster: &ClusterConfig, + shared_volume: Option<&SharedVolumeMount>, ) -> Job { let job_name = sanitize_name(step_name); @@ -41,6 +55,14 @@ pub fn build_job( labels.insert(LABEL_STEP_NAME.into(), step_name.to_string()); labels.insert(LABEL_MANAGED_BY.into(), "wfe-kubernetes".into()); + let volume_mounts = shared_volume.map(|sv| { + vec![VolumeMount { + name: SHARED_VOLUME_NAME.into(), + mount_path: sv.mount_path.clone(), + ..Default::default() + }] + }); + let container = Container { name: "step".into(), image: Some(config.image.clone()), @@ -50,6 +72,7 @@ pub fn build_job( working_dir: config.working_dir.clone(), resources: Some(resources), image_pull_policy: Some(pull_policy), + volume_mounts, ..Default::default() }; @@ -77,6 +100,17 @@ pub fn build_job( ) }; + let volumes = shared_volume.map(|sv| { + vec![Volume { + name: SHARED_VOLUME_NAME.into(), + persistent_volume_claim: Some(PersistentVolumeClaimVolumeSource { + claim_name: sv.claim_name.clone(), + read_only: Some(false), + }), + ..Default::default() + }] + }); + Job { metadata: ObjectMeta { name: Some(job_name), @@ -98,6 +132,7 @@ pub fn build_job( service_account_name: cluster.service_account.clone(), image_pull_secrets, node_selector, + volumes, ..Default::default() }), }, @@ -112,13 +147,15 @@ fn resolve_command(config: &KubernetesStepConfig) -> (Option>, Optio if let Some(ref cmd) = config.command { (Some(cmd.clone()), None) } else if let Some(ref run) = config.run { - // 
Use bash so that scripts can rely on `set -o pipefail`, process - // substitution, arrays, and other bashisms that dash (/bin/sh on - // debian-family images) does not support. - ( - Some(vec!["/bin/bash".into(), "-c".into()]), - Some(vec![run.clone()]), - ) + // Default to /bin/sh so minimal container images (alpine/busybox, + // distroless) work unchanged. Scripts that need bash features + // (pipefail, arrays, process substitution) should set + // `shell: /bin/bash` explicitly on the step config. + let shell = config + .shell + .clone() + .unwrap_or_else(|| "/bin/sh".to_string()); + (Some(vec![shell, "-c".into()]), Some(vec![run.clone()])) } else { (None, None) } @@ -208,6 +245,7 @@ mod tests { image: "alpine:3.18".into(), command: None, run: None, + shell: None, env: HashMap::new(), working_dir: None, memory: None, @@ -222,6 +260,7 @@ mod tests { "wfe-abc", &HashMap::new(), &default_cluster(), + None, ); assert_eq!(job.metadata.name, Some("test-step".into())); @@ -248,6 +287,7 @@ mod tests { image: "node:20".into(), command: None, run: Some("npm test".into()), + shell: None, env: HashMap::new(), working_dir: Some("/app".into()), memory: None, @@ -256,13 +296,17 @@ mod tests { pull_policy: None, namespace: None, }; - let job = build_job(&config, "test", "ns", &HashMap::new(), &default_cluster()); + let job = build_job( + &config, + "test", + "ns", + &HashMap::new(), + &default_cluster(), + None, + ); let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; - assert_eq!( - container.command, - Some(vec!["/bin/bash".into(), "-c".into()]) - ); + assert_eq!(container.command, Some(vec!["/bin/sh".into(), "-c".into()])); assert_eq!(container.args, Some(vec!["npm test".into()])); assert_eq!(container.working_dir, Some("/app".into())); } @@ -273,6 +317,7 @@ mod tests { image: "gcc:latest".into(), command: Some(vec!["make".into(), "build".into()]), run: None, + shell: None, env: HashMap::new(), working_dir: None, memory: None, @@ -281,7 +326,14 @@ mod 
tests { pull_policy: None, namespace: None, }; - let job = build_job(&config, "build", "ns", &HashMap::new(), &default_cluster()); + let job = build_job( + &config, + "build", + "ns", + &HashMap::new(), + &default_cluster(), + None, + ); let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; assert_eq!(container.command, Some(vec!["make".into(), "build".into()])); @@ -294,6 +346,7 @@ mod tests { image: "alpine".into(), command: None, run: None, + shell: None, env: [("APP_ENV".into(), "production".into())].into(), working_dir: None, memory: None, @@ -308,7 +361,7 @@ mod tests { ] .into(); - let job = build_job(&config, "step", "ns", &overrides, &default_cluster()); + let job = build_job(&config, "step", "ns", &overrides, &default_cluster(), None); let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; let env = container.env.as_ref().unwrap(); @@ -327,6 +380,7 @@ mod tests { image: "alpine".into(), command: None, run: None, + shell: None, env: HashMap::new(), working_dir: None, memory: Some("512Mi".into()), @@ -335,7 +389,14 @@ mod tests { pull_policy: None, namespace: None, }; - let job = build_job(&config, "step", "ns", &HashMap::new(), &default_cluster()); + let job = build_job( + &config, + "step", + "ns", + &HashMap::new(), + &default_cluster(), + None, + ); let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; let resources = container.resources.as_ref().unwrap(); @@ -359,6 +420,7 @@ mod tests { image: "alpine".into(), command: None, run: None, + shell: None, env: HashMap::new(), working_dir: None, memory: None, @@ -367,7 +429,7 @@ mod tests { pull_policy: Some("Always".into()), namespace: None, }; - let job = build_job(&config, "step", "ns", &HashMap::new(), &cluster); + let job = build_job(&config, "step", "ns", &HashMap::new(), &cluster, None); let pod_spec = job.spec.unwrap().template.spec.unwrap(); assert_eq!(pod_spec.service_account_name, Some("wfe-runner".into())); @@ -387,6 +449,7 @@ mod tests { 
image: "alpine".into(), command: None, run: None, + shell: None, env: HashMap::new(), working_dir: None, memory: None, @@ -401,6 +464,7 @@ mod tests { "ns", &HashMap::new(), &default_cluster(), + None, ); let labels = job.metadata.labels.as_ref().unwrap(); assert_eq!(labels.get(LABEL_STEP_NAME), Some(&"my-step".to_string())); diff --git a/wfe-kubernetes/src/pvc.rs b/wfe-kubernetes/src/pvc.rs new file mode 100644 index 0000000..f8f2ca7 --- /dev/null +++ b/wfe-kubernetes/src/pvc.rs @@ -0,0 +1,96 @@ +//! PersistentVolumeClaim provisioning for cross-step shared workspaces. +//! +//! When a workflow definition declares a `shared_volume`, the K8s executor +//! materializes it as a single PVC in the workflow's namespace. Every step +//! container mounts that PVC at the declared `mount_path`, so sub-workflows +//! of a top-level run see the same filesystem — a clone in `checkout` is +//! still visible to `cargo fmt --check` in `lint`. +//! +//! The PVC name is a fixed constant (one PVC per namespace), so multiple +//! steps racing to create it are idempotent: the first +//! `ensure_shared_volume_pvc` call wins and later calls see the existing claim. + +use std::collections::BTreeMap; + +use k8s_openapi::api::core::v1::{ + PersistentVolumeClaim, PersistentVolumeClaimSpec, VolumeResourceRequirements, +}; +use k8s_openapi::apimachinery::pkg::api::resource::Quantity; +use kube::api::{ObjectMeta, PostParams}; +use kube::{Api, Client}; +use wfe_core::WfeError; + +const LABEL_MANAGED_BY: &str = "wfe.sunbeam.pt/managed-by"; +const MANAGED_BY_VALUE: &str = "wfe-kubernetes"; + +/// Canonical name for the shared volume PVC within a given namespace. Using +/// a stable name (rather than e.g. a UUID) means every step in the same +/// namespace references the same claim without needing to pass the name +/// through workflow metadata. +pub fn shared_volume_pvc_name() -> &'static str { + "wfe-workspace" +} + +/// Create the shared-volume PVC in `namespace` if it does not already exist. 
+/// `size` is a K8s resource quantity string (e.g. `"10Gi"`). `storage_class` +/// is optional — when `None` the PVC uses the cluster's default StorageClass. +pub async fn ensure_shared_volume_pvc( + client: &Client, + namespace: &str, + size: &str, + storage_class: Option<&str>, +) -> Result<(), WfeError> { + let api: Api = Api::namespaced(client.clone(), namespace); + let name = shared_volume_pvc_name(); + + // Idempotent: if it already exists we're done. This races cleanly with + // concurrent steps because `get` + `create(AlreadyExists)` is tolerated. + if api.get(name).await.is_ok() { + return Ok(()); + } + + let mut labels = BTreeMap::new(); + labels.insert(LABEL_MANAGED_BY.into(), MANAGED_BY_VALUE.into()); + + let pvc = PersistentVolumeClaim { + metadata: ObjectMeta { + name: Some(name.to_string()), + namespace: Some(namespace.to_string()), + labels: Some(labels), + ..Default::default() + }, + spec: Some(PersistentVolumeClaimSpec { + access_modes: Some(vec!["ReadWriteOnce".to_string()]), + resources: Some(VolumeResourceRequirements { + requests: Some( + [("storage".to_string(), Quantity(size.to_string()))] + .into_iter() + .collect(), + ), + ..Default::default() + }), + storage_class_name: storage_class.map(|s| s.to_string()), + ..Default::default() + }), + ..Default::default() + }; + + match api.create(&PostParams::default(), &pvc).await { + Ok(_) => Ok(()), + // Another step created it between our get and create — also fine. 
+ Err(kube::Error::Api(err)) if err.code == 409 => Ok(()), + Err(e) => Err(WfeError::StepExecution(format!( + "failed to create shared-volume PVC '{name}' in '{namespace}': {e}" + ))), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn shared_volume_pvc_name_is_stable() { + assert_eq!(shared_volume_pvc_name(), "wfe-workspace"); + } +} diff --git a/wfe-kubernetes/src/step.rs b/wfe-kubernetes/src/step.rs index 6d0ed20..02305ae 100644 --- a/wfe-kubernetes/src/step.rs +++ b/wfe-kubernetes/src/step.rs @@ -13,9 +13,10 @@ use wfe_core::traits::step::{StepBody, StepExecutionContext}; use crate::cleanup::delete_job; use crate::config::{ClusterConfig, KubernetesStepConfig}; use crate::logs::{stream_logs, wait_for_pod_running}; -use crate::manifests::build_job; +use crate::manifests::{SharedVolumeMount, build_job}; use crate::namespace::{ensure_namespace, namespace_name}; use crate::output::{build_output_data, parse_outputs}; +use crate::pvc::{ensure_shared_volume_pvc, shared_volume_pvc_name}; /// A workflow step that runs as a Kubernetes Job. pub struct KubernetesStep { @@ -65,6 +66,15 @@ impl StepBody for KubernetesStep { .unwrap_or("unknown") .to_string(); + // Isolation domain is keyed on the *root* workflow when set so + // sub-workflows started via `type: workflow` share their parent's + // namespace + PVC. Top-level (user-started) workflows fall back + // to their own id. + let isolation_id = context + .workflow + .root_workflow_id + .as_deref() + .unwrap_or(&context.workflow.id); let workflow_id = &context.workflow.id; let definition_id = &context.workflow.workflow_definition_id; @@ -72,21 +82,59 @@ impl StepBody for KubernetesStep { let _ = self.get_client().await?; let client = self.client.as_ref().unwrap().clone(); - // 1. Determine namespace. + // 1. Determine namespace. Honors explicit step-level override, + // otherwise derives from the isolation domain. 
let ns = self .config .namespace .clone() - .unwrap_or_else(|| namespace_name(&self.cluster.namespace_prefix, workflow_id)); + .unwrap_or_else(|| namespace_name(&self.cluster.namespace_prefix, isolation_id)); // 2. Ensure namespace exists. ensure_namespace(&client, &ns, workflow_id).await?; + // 2b. If the definition declares a shared volume, ensure the PVC + // exists in the namespace and compute the mount spec we'll inject + // into the Job. The PVC is created once per namespace (one per + // top-level workflow run) and reused by every step and + // sub-workflow. Backends with no definition in the step context + // (test fixtures) just skip the shared volume. + let shared_mount = if let Some(def) = context.definition { + if let Some(sv) = &def.shared_volume { + let size = sv + .size + .as_deref() + .unwrap_or(&self.cluster.default_shared_volume_size); + ensure_shared_volume_pvc( + &client, + &ns, + size, + self.cluster.shared_volume_storage_class.as_deref(), + ) + .await?; + Some(SharedVolumeMount { + claim_name: shared_volume_pvc_name().to_string(), + mount_path: sv.mount_path.clone(), + }) + } else { + None + } + } else { + None + }; + // 3. Merge env vars: workflow.data (uppercased) + config.env. let env_overrides = extract_workflow_env(&context.workflow.data); // 4. Build Job manifest. - let job_manifest = build_job(&self.config, &step_name, &ns, &env_overrides, &self.cluster); + let job_manifest = build_job( + &self.config, + &step_name, + &ns, + &env_overrides, + &self.cluster, + shared_mount.as_ref(), + ); let job_name = job_manifest .metadata .name diff --git a/wfe-postgres/src/lib.rs b/wfe-postgres/src/lib.rs index dba8f7b..3075877 100644 --- a/wfe-postgres/src/lib.rs +++ b/wfe-postgres/src/lib.rs @@ -227,17 +227,27 @@ impl WorkflowRepository for PostgresPersistenceProvider { } else { instance.id.clone() }; + // Fall back to the UUID when the caller didn't assign a human name. 
+ // In production `WorkflowHost::start_workflow` always fills this in + // via `next_definition_sequence`, but test fixtures and any external + // caller that forgets shouldn't trip the UNIQUE constraint. + let name = if instance.name.is_empty() { + id.clone() + } else { + instance.name.clone() + }; let mut tx = self.pool.begin().await.map_err(Self::map_sqlx_err)?; sqlx::query( r#"INSERT INTO wfc.workflows - (id, name, definition_id, version, description, reference, status, data, - next_execution, create_time, complete_time) - VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)"#, + (id, name, root_workflow_id, definition_id, version, description, reference, + status, data, next_execution, create_time, complete_time) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)"#, ) .bind(&id) - .bind(&instance.name) + .bind(&name) + .bind(&instance.root_workflow_id) .bind(&instance.workflow_definition_id) .bind(instance.version as i32) .bind(&instance.description) @@ -264,12 +274,14 @@ impl WorkflowRepository for PostgresPersistenceProvider { sqlx::query( r#"UPDATE wfc.workflows SET - name=$2, definition_id=$3, version=$4, description=$5, reference=$6, - status=$7, data=$8, next_execution=$9, create_time=$10, complete_time=$11 + name=$2, root_workflow_id=$3, definition_id=$4, version=$5, + description=$6, reference=$7, status=$8, data=$9, next_execution=$10, + create_time=$11, complete_time=$12 WHERE id=$1"#, ) .bind(&instance.id) .bind(&instance.name) + .bind(&instance.root_workflow_id) .bind(&instance.workflow_definition_id) .bind(instance.version as i32) .bind(&instance.description) @@ -306,12 +318,14 @@ impl WorkflowRepository for PostgresPersistenceProvider { sqlx::query( r#"UPDATE wfc.workflows SET - name=$2, definition_id=$3, version=$4, description=$5, reference=$6, - status=$7, data=$8, next_execution=$9, create_time=$10, complete_time=$11 + name=$2, root_workflow_id=$3, definition_id=$4, version=$5, + description=$6, reference=$7, status=$8, data=$9, next_execution=$10, + 
create_time=$11, complete_time=$12 WHERE id=$1"#, ) .bind(&instance.id) .bind(&instance.name) + .bind(&instance.root_workflow_id) .bind(&instance.workflow_definition_id) .bind(instance.version as i32) .bind(&instance.description) @@ -396,6 +410,7 @@ impl WorkflowRepository for PostgresPersistenceProvider { Ok(WorkflowInstance { id: row.get("id"), name: row.get("name"), + root_workflow_id: row.get("root_workflow_id"), workflow_definition_id: row.get("definition_id"), version: row.get::("version") as u32, description: row.get("description"), @@ -832,6 +847,7 @@ impl PersistenceProvider for PostgresPersistenceProvider { r#"CREATE TABLE IF NOT EXISTS wfc.workflows ( id TEXT PRIMARY KEY, name TEXT NOT NULL UNIQUE, + root_workflow_id TEXT, definition_id TEXT NOT NULL, version INT NOT NULL, description TEXT, @@ -864,6 +880,13 @@ impl PersistenceProvider for PostgresPersistenceProvider { CREATE UNIQUE INDEX IF NOT EXISTS idx_workflows_name ON wfc.workflows (name); END IF; + IF NOT EXISTS ( + SELECT 1 FROM information_schema.columns + WHERE table_schema = 'wfc' AND table_name = 'workflows' + AND column_name = 'root_workflow_id' + ) THEN + ALTER TABLE wfc.workflows ADD COLUMN root_workflow_id TEXT; + END IF; END$$;"#, ) .execute(&self.pool) diff --git a/wfe-sqlite/src/lib.rs b/wfe-sqlite/src/lib.rs index 03df56c..aec5ff7 100644 --- a/wfe-sqlite/src/lib.rs +++ b/wfe-sqlite/src/lib.rs @@ -58,6 +58,7 @@ impl SqlitePersistenceProvider { "CREATE TABLE IF NOT EXISTS workflows ( id TEXT PRIMARY KEY, name TEXT NOT NULL UNIQUE, + root_workflow_id TEXT, definition_id TEXT NOT NULL, version INTEGER NOT NULL, description TEXT, @@ -250,6 +251,9 @@ fn row_to_workflow( Ok(WorkflowInstance { id: row.try_get("id").map_err(to_persistence_err)?, name: row.try_get("name").map_err(to_persistence_err)?, + root_workflow_id: row + .try_get("root_workflow_id") + .map_err(to_persistence_err)?, workflow_definition_id: row.try_get("definition_id").map_err(to_persistence_err)?, version: row 
.try_get::("version") @@ -429,6 +433,15 @@ impl WorkflowRepository for SqlitePersistenceProvider { } else { instance.id.clone() }; + // Fall back to the UUID when the caller didn't assign a human name. + // Production callers go through `WorkflowHost::start_workflow` which + // always fills this in, but test fixtures and external callers + // shouldn't trip the UNIQUE constraint. + let name = if instance.name.is_empty() { + id.clone() + } else { + instance.name.clone() + }; let status_str = serde_json::to_value(instance.status) .map_err(|e| WfeError::Persistence(e.to_string()))? @@ -443,11 +456,12 @@ impl WorkflowRepository for SqlitePersistenceProvider { let mut tx = self.pool.begin().await.map_err(to_persistence_err)?; sqlx::query( - "INSERT INTO workflows (id, name, definition_id, version, description, reference, status, data, next_execution, create_time, complete_time) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", + "INSERT INTO workflows (id, name, root_workflow_id, definition_id, version, description, reference, status, data, next_execution, create_time, complete_time) + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)", ) .bind(&id) - .bind(&instance.name) + .bind(&name) + .bind(&instance.root_workflow_id) .bind(&instance.workflow_definition_id) .bind(instance.version as i64) .bind(&instance.description) @@ -482,11 +496,13 @@ impl WorkflowRepository for SqlitePersistenceProvider { let mut tx = self.pool.begin().await.map_err(to_persistence_err)?; sqlx::query( - "UPDATE workflows SET name = ?1, definition_id = ?2, version = ?3, description = ?4, reference = ?5, - status = ?6, data = ?7, next_execution = ?8, complete_time = ?9 - WHERE id = ?10", + "UPDATE workflows SET name = ?1, root_workflow_id = ?2, definition_id = ?3, + version = ?4, description = ?5, reference = ?6, status = ?7, data = ?8, + next_execution = ?9, complete_time = ?10 + WHERE id = ?11", ) .bind(&instance.name) + .bind(&instance.root_workflow_id) 
.bind(&instance.workflow_definition_id) .bind(instance.version as i64) .bind(&instance.description) @@ -532,11 +548,13 @@ impl WorkflowRepository for SqlitePersistenceProvider { let mut tx = self.pool.begin().await.map_err(to_persistence_err)?; sqlx::query( - "UPDATE workflows SET name = ?1, definition_id = ?2, version = ?3, description = ?4, reference = ?5, - status = ?6, data = ?7, next_execution = ?8, complete_time = ?9 - WHERE id = ?10", + "UPDATE workflows SET name = ?1, root_workflow_id = ?2, definition_id = ?3, + version = ?4, description = ?5, reference = ?6, status = ?7, data = ?8, + next_execution = ?9, complete_time = ?10 + WHERE id = ?11", ) .bind(&instance.name) + .bind(&instance.root_workflow_id) .bind(&instance.workflow_definition_id) .bind(instance.version as i64) .bind(&instance.description) diff --git a/wfe/src/host.rs b/wfe/src/host.rs index 7dde81d..dda335c 100644 --- a/wfe/src/host.rs +++ b/wfe/src/host.rs @@ -36,8 +36,10 @@ impl HostContext for HostContextImpl { definition_id: &str, version: u32, data: serde_json::Value, + parent_root_workflow_id: Option, ) -> Pin> + Send + '_>> { let def_id = definition_id.to_string(); + let parent_root = parent_root_workflow_id; Box::pin(async move { // Look up the definition. let reg = self.registry.read().await; @@ -54,6 +56,11 @@ impl HostContext for HostContextImpl { instance.execution_pointers.push(ExecutionPointer::new(0)); } + // Inherit the parent's root so every descendant of a given + // top-level workflow lands in the same Kubernetes namespace + // and can share a provisioned volume. + instance.root_workflow_id = parent_root; + // Auto-assign a human-friendly name before persisting so the // child shows up as `{definition_id}-{N}` in lookups and logs. // Sub-workflows always use the default; callers wanting a custom