feat(persistence+k8s): root_workflow_id schema, PVC provisioning, name fallback, host name-or-UUID

This commit is contained in:
2026-04-09 15:45:47 +01:00
parent 2aaf3c16c9
commit 48e5d9a26f
8 changed files with 327 additions and 39 deletions

View File

@@ -20,6 +20,18 @@ pub struct ClusterConfig {
/// Node selector labels for Job pods. /// Node selector labels for Job pods.
#[serde(default)] #[serde(default)]
pub node_selector: HashMap<String, String>, pub node_selector: HashMap<String, String>,
/// Default size (e.g. "10Gi") used when a workflow declares a
/// `shared_volume` without specifying its own `size`. The K8s executor
/// creates one PVC per top-level workflow instance and mounts it on
/// every step container so sub-workflows can share a cloned checkout,
/// sccache directory, etc. Cluster operators tune this based on the
/// typical working-set size of their pipelines.
#[serde(default = "default_shared_volume_size")]
pub default_shared_volume_size: String,
/// Optional StorageClass to use when provisioning shared-volume PVCs.
/// Falls back to the cluster's default StorageClass when unset.
#[serde(default)]
pub shared_volume_storage_class: Option<String>,
} }
impl Default for ClusterConfig { impl Default for ClusterConfig {
@@ -30,6 +42,8 @@ impl Default for ClusterConfig {
service_account: None, service_account: None,
image_pull_secrets: Vec::new(), image_pull_secrets: Vec::new(),
node_selector: HashMap::new(), node_selector: HashMap::new(),
default_shared_volume_size: default_shared_volume_size(),
shared_volume_storage_class: None,
} }
} }
} }
@@ -38,6 +52,10 @@ fn default_namespace_prefix() -> String {
"wfe-".to_string() "wfe-".to_string()
} }
/// Fallback capacity for shared-volume PVCs, used when a workflow's
/// `shared_volume` declaration omits an explicit `size`.
fn default_shared_volume_size() -> String {
    String::from("10Gi")
}
/// Per-step configuration for a Kubernetes Job execution. /// Per-step configuration for a Kubernetes Job execution.
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KubernetesStepConfig { pub struct KubernetesStepConfig {
@@ -46,9 +64,19 @@ pub struct KubernetesStepConfig {
/// Override entrypoint. /// Override entrypoint.
#[serde(default)] #[serde(default)]
pub command: Option<Vec<String>>, pub command: Option<Vec<String>>,
/// Shorthand: runs via `/bin/sh -c "..."`. Mutually exclusive with `command`. /// Shorthand: runs the given script via the configured `shell`
/// (default `/bin/sh`). Mutually exclusive with `command`. For scripts
/// that rely on bashisms like `set -o pipefail`, process substitution,
/// or arrays, set `shell: /bin/bash` explicitly — the default /bin/sh
/// keeps alpine/busybox containers working out of the box.
#[serde(default)] #[serde(default)]
pub run: Option<String>, pub run: Option<String>,
/// Shell used to execute a `run:` script. Defaults to `/bin/sh` so
/// minimal containers (alpine, distroless) work unchanged. Override
/// to `/bin/bash` or any other interpreter when the script needs
/// features dash doesn't support.
#[serde(default)]
pub shell: Option<String>,
/// Environment variables injected into the container. /// Environment variables injected into the container.
#[serde(default)] #[serde(default)]
pub env: HashMap<String, String>, pub env: HashMap<String, String>,
@@ -95,6 +123,8 @@ mod tests {
service_account: Some("wfe-runner".into()), service_account: Some("wfe-runner".into()),
image_pull_secrets: vec!["ghcr-secret".into()], image_pull_secrets: vec!["ghcr-secret".into()],
node_selector: [("tier".into(), "compute".into())].into(), node_selector: [("tier".into(), "compute".into())].into(),
default_shared_volume_size: "20Gi".into(),
shared_volume_storage_class: Some("fast-ssd".into()),
}; };
let json = serde_json::to_string(&config).unwrap(); let json = serde_json::to_string(&config).unwrap();
let parsed: ClusterConfig = serde_json::from_str(&json).unwrap(); let parsed: ClusterConfig = serde_json::from_str(&json).unwrap();
@@ -119,6 +149,7 @@ mod tests {
image: "node:20-alpine".into(), image: "node:20-alpine".into(),
command: None, command: None,
run: Some("npm test".into()), run: Some("npm test".into()),
shell: None,
env: [("NODE_ENV".into(), "test".into())].into(), env: [("NODE_ENV".into(), "test".into())].into(),
working_dir: Some("/app".into()), working_dir: Some("/app".into()),
memory: Some("512Mi".into()), memory: Some("512Mi".into()),

View File

@@ -5,6 +5,7 @@ pub mod logs;
pub mod manifests; pub mod manifests;
pub mod namespace; pub mod namespace;
pub mod output; pub mod output;
pub mod pvc;
pub mod service_manifests; pub mod service_manifests;
pub mod service_provider; pub mod service_provider;
pub mod step; pub mod step;

View File

@@ -2,7 +2,8 @@ use std::collections::{BTreeMap, HashMap};
use k8s_openapi::api::batch::v1::{Job, JobSpec}; use k8s_openapi::api::batch::v1::{Job, JobSpec};
use k8s_openapi::api::core::v1::{ use k8s_openapi::api::core::v1::{
Container, EnvVar, LocalObjectReference, PodSpec, PodTemplateSpec, ResourceRequirements, Container, EnvVar, LocalObjectReference, PersistentVolumeClaimVolumeSource, PodSpec,
PodTemplateSpec, ResourceRequirements, Volume, VolumeMount,
}; };
use k8s_openapi::apimachinery::pkg::api::resource::Quantity; use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use kube::api::ObjectMeta; use kube::api::ObjectMeta;
@@ -11,6 +12,18 @@ use crate::config::{ClusterConfig, KubernetesStepConfig};
const LABEL_STEP_NAME: &str = "wfe.sunbeam.pt/step-name"; const LABEL_STEP_NAME: &str = "wfe.sunbeam.pt/step-name";
const LABEL_MANAGED_BY: &str = "wfe.sunbeam.pt/managed-by"; const LABEL_MANAGED_BY: &str = "wfe.sunbeam.pt/managed-by";
/// Name of the pod volume referencing the shared PVC. Stable so VolumeMount
/// name and Volume name stay in sync inside a single pod spec.
const SHARED_VOLUME_NAME: &str = "wfe-workspace";
/// Request that the generated Job mount a pre-existing PVC into every
/// step container at `mount_path`. Passed in by the step executor, which
/// is responsible for creating the PVC before calling `build_job`.
#[derive(Debug, Clone)]
pub struct SharedVolumeMount {
    /// Name of the PersistentVolumeClaim to reference. The claim must
    /// already exist in the Job's namespace when the Job is submitted;
    /// `build_job` does not create it.
    pub claim_name: String,
    /// Absolute path inside every step container where the claim is
    /// mounted.
    pub mount_path: String,
}
/// Build a Kubernetes Job manifest from step configuration. /// Build a Kubernetes Job manifest from step configuration.
pub fn build_job( pub fn build_job(
@@ -19,6 +32,7 @@ pub fn build_job(
namespace: &str, namespace: &str,
env_overrides: &HashMap<String, String>, env_overrides: &HashMap<String, String>,
cluster: &ClusterConfig, cluster: &ClusterConfig,
shared_volume: Option<&SharedVolumeMount>,
) -> Job { ) -> Job {
let job_name = sanitize_name(step_name); let job_name = sanitize_name(step_name);
@@ -41,6 +55,14 @@ pub fn build_job(
labels.insert(LABEL_STEP_NAME.into(), step_name.to_string()); labels.insert(LABEL_STEP_NAME.into(), step_name.to_string());
labels.insert(LABEL_MANAGED_BY.into(), "wfe-kubernetes".into()); labels.insert(LABEL_MANAGED_BY.into(), "wfe-kubernetes".into());
let volume_mounts = shared_volume.map(|sv| {
vec![VolumeMount {
name: SHARED_VOLUME_NAME.into(),
mount_path: sv.mount_path.clone(),
..Default::default()
}]
});
let container = Container { let container = Container {
name: "step".into(), name: "step".into(),
image: Some(config.image.clone()), image: Some(config.image.clone()),
@@ -50,6 +72,7 @@ pub fn build_job(
working_dir: config.working_dir.clone(), working_dir: config.working_dir.clone(),
resources: Some(resources), resources: Some(resources),
image_pull_policy: Some(pull_policy), image_pull_policy: Some(pull_policy),
volume_mounts,
..Default::default() ..Default::default()
}; };
@@ -77,6 +100,17 @@ pub fn build_job(
) )
}; };
let volumes = shared_volume.map(|sv| {
vec![Volume {
name: SHARED_VOLUME_NAME.into(),
persistent_volume_claim: Some(PersistentVolumeClaimVolumeSource {
claim_name: sv.claim_name.clone(),
read_only: Some(false),
}),
..Default::default()
}]
});
Job { Job {
metadata: ObjectMeta { metadata: ObjectMeta {
name: Some(job_name), name: Some(job_name),
@@ -98,6 +132,7 @@ pub fn build_job(
service_account_name: cluster.service_account.clone(), service_account_name: cluster.service_account.clone(),
image_pull_secrets, image_pull_secrets,
node_selector, node_selector,
volumes,
..Default::default() ..Default::default()
}), }),
}, },
@@ -112,13 +147,15 @@ fn resolve_command(config: &KubernetesStepConfig) -> (Option<Vec<String>>, Optio
if let Some(ref cmd) = config.command { if let Some(ref cmd) = config.command {
(Some(cmd.clone()), None) (Some(cmd.clone()), None)
} else if let Some(ref run) = config.run { } else if let Some(ref run) = config.run {
// Use bash so that scripts can rely on `set -o pipefail`, process // Default to /bin/sh so minimal container images (alpine/busybox,
// substitution, arrays, and other bashisms that dash (/bin/sh on // distroless) work unchanged. Scripts that need bash features
// debian-family images) does not support. // (pipefail, arrays, process substitution) should set
( // `shell: /bin/bash` explicitly on the step config.
Some(vec!["/bin/bash".into(), "-c".into()]), let shell = config
Some(vec![run.clone()]), .shell
) .clone()
.unwrap_or_else(|| "/bin/sh".to_string());
(Some(vec![shell, "-c".into()]), Some(vec![run.clone()]))
} else { } else {
(None, None) (None, None)
} }
@@ -208,6 +245,7 @@ mod tests {
image: "alpine:3.18".into(), image: "alpine:3.18".into(),
command: None, command: None,
run: None, run: None,
shell: None,
env: HashMap::new(), env: HashMap::new(),
working_dir: None, working_dir: None,
memory: None, memory: None,
@@ -222,6 +260,7 @@ mod tests {
"wfe-abc", "wfe-abc",
&HashMap::new(), &HashMap::new(),
&default_cluster(), &default_cluster(),
None,
); );
assert_eq!(job.metadata.name, Some("test-step".into())); assert_eq!(job.metadata.name, Some("test-step".into()));
@@ -248,6 +287,7 @@ mod tests {
image: "node:20".into(), image: "node:20".into(),
command: None, command: None,
run: Some("npm test".into()), run: Some("npm test".into()),
shell: None,
env: HashMap::new(), env: HashMap::new(),
working_dir: Some("/app".into()), working_dir: Some("/app".into()),
memory: None, memory: None,
@@ -256,13 +296,17 @@ mod tests {
pull_policy: None, pull_policy: None,
namespace: None, namespace: None,
}; };
let job = build_job(&config, "test", "ns", &HashMap::new(), &default_cluster()); let job = build_job(
&config,
"test",
"ns",
&HashMap::new(),
&default_cluster(),
None,
);
let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; let container = &job.spec.unwrap().template.spec.unwrap().containers[0];
assert_eq!( assert_eq!(container.command, Some(vec!["/bin/sh".into(), "-c".into()]));
container.command,
Some(vec!["/bin/bash".into(), "-c".into()])
);
assert_eq!(container.args, Some(vec!["npm test".into()])); assert_eq!(container.args, Some(vec!["npm test".into()]));
assert_eq!(container.working_dir, Some("/app".into())); assert_eq!(container.working_dir, Some("/app".into()));
} }
@@ -273,6 +317,7 @@ mod tests {
image: "gcc:latest".into(), image: "gcc:latest".into(),
command: Some(vec!["make".into(), "build".into()]), command: Some(vec!["make".into(), "build".into()]),
run: None, run: None,
shell: None,
env: HashMap::new(), env: HashMap::new(),
working_dir: None, working_dir: None,
memory: None, memory: None,
@@ -281,7 +326,14 @@ mod tests {
pull_policy: None, pull_policy: None,
namespace: None, namespace: None,
}; };
let job = build_job(&config, "build", "ns", &HashMap::new(), &default_cluster()); let job = build_job(
&config,
"build",
"ns",
&HashMap::new(),
&default_cluster(),
None,
);
let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; let container = &job.spec.unwrap().template.spec.unwrap().containers[0];
assert_eq!(container.command, Some(vec!["make".into(), "build".into()])); assert_eq!(container.command, Some(vec!["make".into(), "build".into()]));
@@ -294,6 +346,7 @@ mod tests {
image: "alpine".into(), image: "alpine".into(),
command: None, command: None,
run: None, run: None,
shell: None,
env: [("APP_ENV".into(), "production".into())].into(), env: [("APP_ENV".into(), "production".into())].into(),
working_dir: None, working_dir: None,
memory: None, memory: None,
@@ -308,7 +361,7 @@ mod tests {
] ]
.into(); .into();
let job = build_job(&config, "step", "ns", &overrides, &default_cluster()); let job = build_job(&config, "step", "ns", &overrides, &default_cluster(), None);
let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; let container = &job.spec.unwrap().template.spec.unwrap().containers[0];
let env = container.env.as_ref().unwrap(); let env = container.env.as_ref().unwrap();
@@ -327,6 +380,7 @@ mod tests {
image: "alpine".into(), image: "alpine".into(),
command: None, command: None,
run: None, run: None,
shell: None,
env: HashMap::new(), env: HashMap::new(),
working_dir: None, working_dir: None,
memory: Some("512Mi".into()), memory: Some("512Mi".into()),
@@ -335,7 +389,14 @@ mod tests {
pull_policy: None, pull_policy: None,
namespace: None, namespace: None,
}; };
let job = build_job(&config, "step", "ns", &HashMap::new(), &default_cluster()); let job = build_job(
&config,
"step",
"ns",
&HashMap::new(),
&default_cluster(),
None,
);
let container = &job.spec.unwrap().template.spec.unwrap().containers[0]; let container = &job.spec.unwrap().template.spec.unwrap().containers[0];
let resources = container.resources.as_ref().unwrap(); let resources = container.resources.as_ref().unwrap();
@@ -359,6 +420,7 @@ mod tests {
image: "alpine".into(), image: "alpine".into(),
command: None, command: None,
run: None, run: None,
shell: None,
env: HashMap::new(), env: HashMap::new(),
working_dir: None, working_dir: None,
memory: None, memory: None,
@@ -367,7 +429,7 @@ mod tests {
pull_policy: Some("Always".into()), pull_policy: Some("Always".into()),
namespace: None, namespace: None,
}; };
let job = build_job(&config, "step", "ns", &HashMap::new(), &cluster); let job = build_job(&config, "step", "ns", &HashMap::new(), &cluster, None);
let pod_spec = job.spec.unwrap().template.spec.unwrap(); let pod_spec = job.spec.unwrap().template.spec.unwrap();
assert_eq!(pod_spec.service_account_name, Some("wfe-runner".into())); assert_eq!(pod_spec.service_account_name, Some("wfe-runner".into()));
@@ -387,6 +449,7 @@ mod tests {
image: "alpine".into(), image: "alpine".into(),
command: None, command: None,
run: None, run: None,
shell: None,
env: HashMap::new(), env: HashMap::new(),
working_dir: None, working_dir: None,
memory: None, memory: None,
@@ -401,6 +464,7 @@ mod tests {
"ns", "ns",
&HashMap::new(), &HashMap::new(),
&default_cluster(), &default_cluster(),
None,
); );
let labels = job.metadata.labels.as_ref().unwrap(); let labels = job.metadata.labels.as_ref().unwrap();
assert_eq!(labels.get(LABEL_STEP_NAME), Some(&"my-step".to_string())); assert_eq!(labels.get(LABEL_STEP_NAME), Some(&"my-step".to_string()));

96
wfe-kubernetes/src/pvc.rs Normal file
View File

@@ -0,0 +1,96 @@
//! PersistentVolumeClaim provisioning for cross-step shared workspaces.
//!
//! When a workflow definition declares a `shared_volume`, the K8s executor
//! materializes it as a single PVC in the workflow's namespace. Every step
//! container mounts that PVC at the declared `mount_path`, so sub-workflows
//! of a top-level run see the same filesystem — a clone in `checkout` is
//! still visible to `cargo fmt --check` in `lint`.
//!
//! The PVC name is derived from the namespace (one PVC per namespace) so
//! multiple steps racing to create it are idempotent: the first `ensure_pvc`
//! wins, and every subsequent call sees the existing claim.
use std::collections::BTreeMap;
use k8s_openapi::api::core::v1::{
PersistentVolumeClaim, PersistentVolumeClaimSpec, VolumeResourceRequirements,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use kube::api::{ObjectMeta, PostParams};
use kube::{Api, Client};
use wfe_core::WfeError;
const LABEL_MANAGED_BY: &str = "wfe.sunbeam.pt/managed-by";
const MANAGED_BY_VALUE: &str = "wfe-kubernetes";
/// Canonical name for the shared volume PVC within a given namespace. Using
/// a stable name (rather than e.g. a UUID) means every step in the same
/// namespace references the same claim without needing to pass the name
/// through workflow metadata.
/// Canonical name for the shared-volume PVC within a given namespace.
///
/// A stable, well-known name (rather than e.g. a UUID) lets every step in
/// the same namespace reference the same claim without threading the name
/// through workflow metadata.
pub fn shared_volume_pvc_name() -> &'static str {
    const NAME: &str = "wfe-workspace";
    NAME
}
/// Create the shared-volume PVC in `namespace` if it does not already exist.
/// `size` is a K8s resource quantity string (e.g. `"10Gi"`). `storage_class`
/// is optional — when `None` the PVC uses the cluster's default StorageClass.
/// Create the shared-volume PVC in `namespace` if it does not already exist.
///
/// `size` is a K8s resource quantity string (e.g. `"10Gi"`). `storage_class`
/// is optional — when `None` the PVC uses the cluster's default StorageClass.
///
/// Idempotency: we create unconditionally and treat 409 Conflict
/// (AlreadyExists) as success. This is race-free under concurrent steps —
/// unlike a get-before-create, there is no window where two callers both
/// observe "missing" — and it saves an API round trip on the common path.
///
/// # Errors
/// Returns `WfeError::StepExecution` when the API rejects the create for
/// any reason other than the claim already existing.
pub async fn ensure_shared_volume_pvc(
    client: &Client,
    namespace: &str,
    size: &str,
    storage_class: Option<&str>,
) -> Result<(), WfeError> {
    let api: Api<PersistentVolumeClaim> = Api::namespaced(client.clone(), namespace);
    let name = shared_volume_pvc_name();

    let mut labels = BTreeMap::new();
    labels.insert(LABEL_MANAGED_BY.into(), MANAGED_BY_VALUE.into());

    let pvc = PersistentVolumeClaim {
        metadata: ObjectMeta {
            name: Some(name.to_string()),
            namespace: Some(namespace.to_string()),
            labels: Some(labels),
            ..Default::default()
        },
        spec: Some(PersistentVolumeClaimSpec {
            // ReadWriteOnce: all step pods of one workflow run on the node
            // the volume is attached to; RWX would need an NFS-class driver.
            access_modes: Some(vec!["ReadWriteOnce".to_string()]),
            resources: Some(VolumeResourceRequirements {
                requests: Some(
                    [("storage".to_string(), Quantity(size.to_string()))]
                        .into_iter()
                        .collect(),
                ),
                ..Default::default()
            }),
            storage_class_name: storage_class.map(|s| s.to_string()),
            ..Default::default()
        }),
        ..Default::default()
    };

    match api.create(&PostParams::default(), &pvc).await {
        Ok(_) => Ok(()),
        // Another step created it first — the claim exists, which is all
        // this function guarantees.
        Err(kube::Error::Api(err)) if err.code == 409 => Ok(()),
        Err(e) => Err(WfeError::StepExecution(format!(
            "failed to create shared-volume PVC '{name}' in '{namespace}': {e}"
        ))),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The PVC name is an implicit contract with the Job manifests that
    /// mount it; pin it so a rename is a deliberate, reviewed change.
    #[test]
    fn shared_volume_pvc_name_is_stable() {
        let name = shared_volume_pvc_name();
        assert_eq!(name, "wfe-workspace");
    }
}

View File

@@ -13,9 +13,10 @@ use wfe_core::traits::step::{StepBody, StepExecutionContext};
use crate::cleanup::delete_job; use crate::cleanup::delete_job;
use crate::config::{ClusterConfig, KubernetesStepConfig}; use crate::config::{ClusterConfig, KubernetesStepConfig};
use crate::logs::{stream_logs, wait_for_pod_running}; use crate::logs::{stream_logs, wait_for_pod_running};
use crate::manifests::build_job; use crate::manifests::{SharedVolumeMount, build_job};
use crate::namespace::{ensure_namespace, namespace_name}; use crate::namespace::{ensure_namespace, namespace_name};
use crate::output::{build_output_data, parse_outputs}; use crate::output::{build_output_data, parse_outputs};
use crate::pvc::{ensure_shared_volume_pvc, shared_volume_pvc_name};
/// A workflow step that runs as a Kubernetes Job. /// A workflow step that runs as a Kubernetes Job.
pub struct KubernetesStep { pub struct KubernetesStep {
@@ -65,6 +66,15 @@ impl StepBody for KubernetesStep {
.unwrap_or("unknown") .unwrap_or("unknown")
.to_string(); .to_string();
// Isolation domain is keyed on the *root* workflow when set so
// sub-workflows started via `type: workflow` share their parent's
// namespace + PVC. Top-level (user-started) workflows fall back
// to their own id.
let isolation_id = context
.workflow
.root_workflow_id
.as_deref()
.unwrap_or(&context.workflow.id);
let workflow_id = &context.workflow.id; let workflow_id = &context.workflow.id;
let definition_id = &context.workflow.workflow_definition_id; let definition_id = &context.workflow.workflow_definition_id;
@@ -72,21 +82,59 @@ impl StepBody for KubernetesStep {
let _ = self.get_client().await?; let _ = self.get_client().await?;
let client = self.client.as_ref().unwrap().clone(); let client = self.client.as_ref().unwrap().clone();
// 1. Determine namespace. // 1. Determine namespace. Honors explicit step-level override,
// otherwise derives from the isolation domain.
let ns = self let ns = self
.config .config
.namespace .namespace
.clone() .clone()
.unwrap_or_else(|| namespace_name(&self.cluster.namespace_prefix, workflow_id)); .unwrap_or_else(|| namespace_name(&self.cluster.namespace_prefix, isolation_id));
// 2. Ensure namespace exists. // 2. Ensure namespace exists.
ensure_namespace(&client, &ns, workflow_id).await?; ensure_namespace(&client, &ns, workflow_id).await?;
// 2b. If the definition declares a shared volume, ensure the PVC
// exists in the namespace and compute the mount spec we'll inject
// into the Job. The PVC is created once per namespace (one per
// top-level workflow run) and reused by every step and
// sub-workflow. Backends with no definition in the step context
// (test fixtures) just skip the shared volume.
let shared_mount = if let Some(def) = context.definition {
if let Some(sv) = &def.shared_volume {
let size = sv
.size
.as_deref()
.unwrap_or(&self.cluster.default_shared_volume_size);
ensure_shared_volume_pvc(
&client,
&ns,
size,
self.cluster.shared_volume_storage_class.as_deref(),
)
.await?;
Some(SharedVolumeMount {
claim_name: shared_volume_pvc_name().to_string(),
mount_path: sv.mount_path.clone(),
})
} else {
None
}
} else {
None
};
// 3. Merge env vars: workflow.data (uppercased) + config.env. // 3. Merge env vars: workflow.data (uppercased) + config.env.
let env_overrides = extract_workflow_env(&context.workflow.data); let env_overrides = extract_workflow_env(&context.workflow.data);
// 4. Build Job manifest. // 4. Build Job manifest.
let job_manifest = build_job(&self.config, &step_name, &ns, &env_overrides, &self.cluster); let job_manifest = build_job(
&self.config,
&step_name,
&ns,
&env_overrides,
&self.cluster,
shared_mount.as_ref(),
);
let job_name = job_manifest let job_name = job_manifest
.metadata .metadata
.name .name

View File

@@ -227,17 +227,27 @@ impl WorkflowRepository for PostgresPersistenceProvider {
} else { } else {
instance.id.clone() instance.id.clone()
}; };
// Fall back to the UUID when the caller didn't assign a human name.
// In production `WorkflowHost::start_workflow` always fills this in
// via `next_definition_sequence`, but test fixtures and any external
// caller that forgets shouldn't trip the UNIQUE constraint.
let name = if instance.name.is_empty() {
id.clone()
} else {
instance.name.clone()
};
let mut tx = self.pool.begin().await.map_err(Self::map_sqlx_err)?; let mut tx = self.pool.begin().await.map_err(Self::map_sqlx_err)?;
sqlx::query( sqlx::query(
r#"INSERT INTO wfc.workflows r#"INSERT INTO wfc.workflows
(id, name, definition_id, version, description, reference, status, data, (id, name, root_workflow_id, definition_id, version, description, reference,
next_execution, create_time, complete_time) status, data, next_execution, create_time, complete_time)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11)"#, VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)"#,
) )
.bind(&id) .bind(&id)
.bind(&instance.name) .bind(&name)
.bind(&instance.root_workflow_id)
.bind(&instance.workflow_definition_id) .bind(&instance.workflow_definition_id)
.bind(instance.version as i32) .bind(instance.version as i32)
.bind(&instance.description) .bind(&instance.description)
@@ -264,12 +274,14 @@ impl WorkflowRepository for PostgresPersistenceProvider {
sqlx::query( sqlx::query(
r#"UPDATE wfc.workflows SET r#"UPDATE wfc.workflows SET
name=$2, definition_id=$3, version=$4, description=$5, reference=$6, name=$2, root_workflow_id=$3, definition_id=$4, version=$5,
status=$7, data=$8, next_execution=$9, create_time=$10, complete_time=$11 description=$6, reference=$7, status=$8, data=$9, next_execution=$10,
create_time=$11, complete_time=$12
WHERE id=$1"#, WHERE id=$1"#,
) )
.bind(&instance.id) .bind(&instance.id)
.bind(&instance.name) .bind(&instance.name)
.bind(&instance.root_workflow_id)
.bind(&instance.workflow_definition_id) .bind(&instance.workflow_definition_id)
.bind(instance.version as i32) .bind(instance.version as i32)
.bind(&instance.description) .bind(&instance.description)
@@ -306,12 +318,14 @@ impl WorkflowRepository for PostgresPersistenceProvider {
sqlx::query( sqlx::query(
r#"UPDATE wfc.workflows SET r#"UPDATE wfc.workflows SET
name=$2, definition_id=$3, version=$4, description=$5, reference=$6, name=$2, root_workflow_id=$3, definition_id=$4, version=$5,
status=$7, data=$8, next_execution=$9, create_time=$10, complete_time=$11 description=$6, reference=$7, status=$8, data=$9, next_execution=$10,
create_time=$11, complete_time=$12
WHERE id=$1"#, WHERE id=$1"#,
) )
.bind(&instance.id) .bind(&instance.id)
.bind(&instance.name) .bind(&instance.name)
.bind(&instance.root_workflow_id)
.bind(&instance.workflow_definition_id) .bind(&instance.workflow_definition_id)
.bind(instance.version as i32) .bind(instance.version as i32)
.bind(&instance.description) .bind(&instance.description)
@@ -396,6 +410,7 @@ impl WorkflowRepository for PostgresPersistenceProvider {
Ok(WorkflowInstance { Ok(WorkflowInstance {
id: row.get("id"), id: row.get("id"),
name: row.get("name"), name: row.get("name"),
root_workflow_id: row.get("root_workflow_id"),
workflow_definition_id: row.get("definition_id"), workflow_definition_id: row.get("definition_id"),
version: row.get::<i32, _>("version") as u32, version: row.get::<i32, _>("version") as u32,
description: row.get("description"), description: row.get("description"),
@@ -832,6 +847,7 @@ impl PersistenceProvider for PostgresPersistenceProvider {
r#"CREATE TABLE IF NOT EXISTS wfc.workflows ( r#"CREATE TABLE IF NOT EXISTS wfc.workflows (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE, name TEXT NOT NULL UNIQUE,
root_workflow_id TEXT,
definition_id TEXT NOT NULL, definition_id TEXT NOT NULL,
version INT NOT NULL, version INT NOT NULL,
description TEXT, description TEXT,
@@ -864,6 +880,13 @@ impl PersistenceProvider for PostgresPersistenceProvider {
CREATE UNIQUE INDEX IF NOT EXISTS idx_workflows_name CREATE UNIQUE INDEX IF NOT EXISTS idx_workflows_name
ON wfc.workflows (name); ON wfc.workflows (name);
END IF; END IF;
IF NOT EXISTS (
SELECT 1 FROM information_schema.columns
WHERE table_schema = 'wfc' AND table_name = 'workflows'
AND column_name = 'root_workflow_id'
) THEN
ALTER TABLE wfc.workflows ADD COLUMN root_workflow_id TEXT;
END IF;
END$$;"#, END$$;"#,
) )
.execute(&self.pool) .execute(&self.pool)

View File

@@ -58,6 +58,7 @@ impl SqlitePersistenceProvider {
"CREATE TABLE IF NOT EXISTS workflows ( "CREATE TABLE IF NOT EXISTS workflows (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE, name TEXT NOT NULL UNIQUE,
root_workflow_id TEXT,
definition_id TEXT NOT NULL, definition_id TEXT NOT NULL,
version INTEGER NOT NULL, version INTEGER NOT NULL,
description TEXT, description TEXT,
@@ -250,6 +251,9 @@ fn row_to_workflow(
Ok(WorkflowInstance { Ok(WorkflowInstance {
id: row.try_get("id").map_err(to_persistence_err)?, id: row.try_get("id").map_err(to_persistence_err)?,
name: row.try_get("name").map_err(to_persistence_err)?, name: row.try_get("name").map_err(to_persistence_err)?,
root_workflow_id: row
.try_get("root_workflow_id")
.map_err(to_persistence_err)?,
workflow_definition_id: row.try_get("definition_id").map_err(to_persistence_err)?, workflow_definition_id: row.try_get("definition_id").map_err(to_persistence_err)?,
version: row version: row
.try_get::<i64, _>("version") .try_get::<i64, _>("version")
@@ -429,6 +433,15 @@ impl WorkflowRepository for SqlitePersistenceProvider {
} else { } else {
instance.id.clone() instance.id.clone()
}; };
// Fall back to the UUID when the caller didn't assign a human name.
// Production callers go through `WorkflowHost::start_workflow` which
// always fills this in, but test fixtures and external callers
// shouldn't trip the UNIQUE constraint.
let name = if instance.name.is_empty() {
id.clone()
} else {
instance.name.clone()
};
let status_str = serde_json::to_value(instance.status) let status_str = serde_json::to_value(instance.status)
.map_err(|e| WfeError::Persistence(e.to_string()))? .map_err(|e| WfeError::Persistence(e.to_string()))?
@@ -443,11 +456,12 @@ impl WorkflowRepository for SqlitePersistenceProvider {
let mut tx = self.pool.begin().await.map_err(to_persistence_err)?; let mut tx = self.pool.begin().await.map_err(to_persistence_err)?;
sqlx::query( sqlx::query(
"INSERT INTO workflows (id, name, definition_id, version, description, reference, status, data, next_execution, create_time, complete_time) "INSERT INTO workflows (id, name, root_workflow_id, definition_id, version, description, reference, status, data, next_execution, create_time, complete_time)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)", VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
) )
.bind(&id) .bind(&id)
.bind(&instance.name) .bind(&name)
.bind(&instance.root_workflow_id)
.bind(&instance.workflow_definition_id) .bind(&instance.workflow_definition_id)
.bind(instance.version as i64) .bind(instance.version as i64)
.bind(&instance.description) .bind(&instance.description)
@@ -482,11 +496,13 @@ impl WorkflowRepository for SqlitePersistenceProvider {
let mut tx = self.pool.begin().await.map_err(to_persistence_err)?; let mut tx = self.pool.begin().await.map_err(to_persistence_err)?;
sqlx::query( sqlx::query(
"UPDATE workflows SET name = ?1, definition_id = ?2, version = ?3, description = ?4, reference = ?5, "UPDATE workflows SET name = ?1, root_workflow_id = ?2, definition_id = ?3,
status = ?6, data = ?7, next_execution = ?8, complete_time = ?9 version = ?4, description = ?5, reference = ?6, status = ?7, data = ?8,
WHERE id = ?10", next_execution = ?9, complete_time = ?10
WHERE id = ?11",
) )
.bind(&instance.name) .bind(&instance.name)
.bind(&instance.root_workflow_id)
.bind(&instance.workflow_definition_id) .bind(&instance.workflow_definition_id)
.bind(instance.version as i64) .bind(instance.version as i64)
.bind(&instance.description) .bind(&instance.description)
@@ -532,11 +548,13 @@ impl WorkflowRepository for SqlitePersistenceProvider {
let mut tx = self.pool.begin().await.map_err(to_persistence_err)?; let mut tx = self.pool.begin().await.map_err(to_persistence_err)?;
sqlx::query( sqlx::query(
"UPDATE workflows SET name = ?1, definition_id = ?2, version = ?3, description = ?4, reference = ?5, "UPDATE workflows SET name = ?1, root_workflow_id = ?2, definition_id = ?3,
status = ?6, data = ?7, next_execution = ?8, complete_time = ?9 version = ?4, description = ?5, reference = ?6, status = ?7, data = ?8,
WHERE id = ?10", next_execution = ?9, complete_time = ?10
WHERE id = ?11",
) )
.bind(&instance.name) .bind(&instance.name)
.bind(&instance.root_workflow_id)
.bind(&instance.workflow_definition_id) .bind(&instance.workflow_definition_id)
.bind(instance.version as i64) .bind(instance.version as i64)
.bind(&instance.description) .bind(&instance.description)

View File

@@ -36,8 +36,10 @@ impl HostContext for HostContextImpl {
definition_id: &str, definition_id: &str,
version: u32, version: u32,
data: serde_json::Value, data: serde_json::Value,
parent_root_workflow_id: Option<String>,
) -> Pin<Box<dyn Future<Output = Result<String>> + Send + '_>> { ) -> Pin<Box<dyn Future<Output = Result<String>> + Send + '_>> {
let def_id = definition_id.to_string(); let def_id = definition_id.to_string();
let parent_root = parent_root_workflow_id;
Box::pin(async move { Box::pin(async move {
// Look up the definition. // Look up the definition.
let reg = self.registry.read().await; let reg = self.registry.read().await;
@@ -54,6 +56,11 @@ impl HostContext for HostContextImpl {
instance.execution_pointers.push(ExecutionPointer::new(0)); instance.execution_pointers.push(ExecutionPointer::new(0));
} }
// Inherit the parent's root so every descendant of a given
// top-level workflow lands in the same Kubernetes namespace
// and can share a provisioned volume.
instance.root_workflow_id = parent_root;
// Auto-assign a human-friendly name before persisting so the // Auto-assign a human-friendly name before persisting so the
// child shows up as `{definition_id}-{N}` in lookups and logs. // child shows up as `{definition_id}-{N}` in lookups and logs.
// Sub-workflows always use the default; callers wanting a custom // Sub-workflows always use the default; callers wanting a custom