fix(wfe): propagate shared_volume to sub-workflows via instance data
Sub-workflow steps were not getting a PVC because the K8s executor only checked context.definition.shared_volume, and for a sub-workflow step context.definition is the child definition (e.g. lint) — not the root (ci) that declares shared_volume. Only root definitions carry the config; sub-workflow definitions don't. Fix: WorkflowHost::start_workflow_with_name now injects the config as _wfe_shared_volume in instance.data. SubWorkflowStep propagates the parent's data to its children, so the config reaches every descendant. The K8s executor falls back to reading it from workflow.data when definition.shared_volume is None. Adds a regression test that mirrors the real topology: a child workflow instance with root_workflow_id set, no shared_volume on its definition, and _wfe_shared_volume in its data — it must still get the PVC.
This commit is contained in:
@@ -93,14 +93,35 @@ impl StepBody for KubernetesStep {
|
||||
// 2. Ensure namespace exists.
|
||||
ensure_namespace(&client, &ns, workflow_id).await?;
|
||||
|
||||
// 2b. If the definition declares a shared volume, ensure the PVC
|
||||
// 2b. If the workflow declares a shared volume, ensure the PVC
|
||||
// exists in the namespace and compute the mount spec we'll inject
|
||||
// into the Job. The PVC is created once per namespace (one per
|
||||
// top-level workflow run) and reused by every step and
|
||||
// sub-workflow. Backends with no definition in the step context
|
||||
// (test fixtures) just skip the shared volume.
|
||||
let shared_mount = if let Some(def) = context.definition {
|
||||
if let Some(sv) = &def.shared_volume {
|
||||
// sub-workflow.
|
||||
//
|
||||
// Check two sources: the step's definition (top-level workflow)
|
||||
// and the inherited `_wfe_shared_volume` key in workflow.data
|
||||
// (sub-workflows inherit it via parent-data propagation). The
|
||||
// data path is the normal case for sub-workflows since only the
|
||||
// root definition declares `shared_volume:`.
|
||||
let shared_mount = {
|
||||
// Try definition first (top-level workflow running its own steps).
|
||||
let sv_from_def = context.definition.and_then(|d| d.shared_volume.as_ref());
|
||||
|
||||
// Fall back to inherited data (sub-workflow inherits parent data).
|
||||
let sv_from_data: Option<wfe_core::models::SharedVolume> = if sv_from_def.is_none() {
|
||||
context
|
||||
.workflow
|
||||
.data
|
||||
.get("_wfe_shared_volume")
|
||||
.and_then(|v| serde_json::from_value(v.clone()).ok())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let sv = sv_from_def.or(sv_from_data.as_ref());
|
||||
|
||||
if let Some(sv) = sv {
|
||||
let size = sv
|
||||
.size
|
||||
.as_deref()
|
||||
@@ -119,8 +140,6 @@ impl StepBody for KubernetesStep {
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// 3. Merge env vars: workflow.data (uppercased) + config.env.
|
||||
|
||||
@@ -791,3 +791,113 @@ echo "##wfe[output file_count=$COUNT]"
|
||||
// Explicit cleanup (the guard still runs on panic paths).
|
||||
namespace::delete_namespace(&client, &ns).await.ok();
|
||||
}
|
||||
|
||||
// ── Regression: sub-workflow inherits shared_volume via data ─────────
|
||||
//
|
||||
// The real CI topology is: ci (root, declares shared_volume) → checkout
|
||||
// sub-workflow (no shared_volume on its definition). The K8s executor
|
||||
// must pick up the shared_volume from the inherited `_wfe_shared_volume`
|
||||
// key in workflow.data, NOT from the sub-workflow's definition.
|
||||
//
|
||||
// This test was added after a production bug where the PVC was never
|
||||
// created because sub-workflow steps checked context.definition.shared_volume
|
||||
// which was None on the child definition.
|
||||
#[tokio::test]
|
||||
async fn sub_workflow_inherits_shared_volume_from_data() {
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use wfe_core::models::{
|
||||
ExecutionPointer, SharedVolume, WorkflowDefinition, WorkflowInstance, WorkflowStep,
|
||||
};
|
||||
use wfe_core::traits::step::{StepBody, StepExecutionContext};
|
||||
|
||||
let cluster = cluster_config();
|
||||
let client = client::create_client(&cluster).await.unwrap();
|
||||
let root_id = unique_id("subwf-pvc");
|
||||
|
||||
// The *child* definition does NOT declare shared_volume — this is
|
||||
// the whole point. Only the root ci workflow declares it, and the
|
||||
// config propagates via `_wfe_shared_volume` in workflow.data.
|
||||
let child_definition = WorkflowDefinition::new("lint", 1);
|
||||
|
||||
// Simulate the data a sub-workflow receives from a root that has
|
||||
// shared_volume. WorkflowHost::start_workflow_with_name injects
|
||||
// `_wfe_shared_volume` into instance.data; SubWorkflowStep copies
|
||||
// the parent's data into the child.
|
||||
let child_data = serde_json::json!({
|
||||
"repo_url": "https://example.com/repo.git",
|
||||
"_wfe_shared_volume": {
|
||||
"mount_path": "/workspace",
|
||||
"size": "1Gi"
|
||||
}
|
||||
});
|
||||
|
||||
let child_instance = WorkflowInstance {
|
||||
id: unique_id("child"),
|
||||
name: "lint-regression-1".into(),
|
||||
// Points at the root ci workflow — K8s executor derives the
|
||||
// namespace from this, placing us in the root's namespace.
|
||||
root_workflow_id: Some(root_id.clone()),
|
||||
workflow_definition_id: "lint".into(),
|
||||
version: 1,
|
||||
description: None,
|
||||
reference: None,
|
||||
execution_pointers: vec![],
|
||||
next_execution: None,
|
||||
status: wfe_core::models::WorkflowStatus::Runnable,
|
||||
data: child_data,
|
||||
create_time: chrono::Utc::now(),
|
||||
complete_time: None,
|
||||
};
|
||||
|
||||
let ns = namespace::namespace_name(&cluster.namespace_prefix, &root_id);
|
||||
|
||||
// Step config — write a file to /workspace and verify it persists.
|
||||
let mut step_cfg = step_config(
|
||||
"alpine:3.18",
|
||||
"echo pvc-ok > /workspace/pvc-test.txt && cat /workspace/pvc-test.txt",
|
||||
);
|
||||
step_cfg.namespace = Some(ns.clone());
|
||||
|
||||
let mut step = wfe_kubernetes::KubernetesStep::new(step_cfg, cluster.clone(), client.clone());
|
||||
|
||||
let mut ws = WorkflowStep::new(0, "pvc-check");
|
||||
ws.name = Some("pvc-check".into());
|
||||
let pointer = ExecutionPointer::new(0);
|
||||
|
||||
let ctx = StepExecutionContext {
|
||||
item: None,
|
||||
execution_pointer: &pointer,
|
||||
persistence_data: None,
|
||||
step: &ws,
|
||||
workflow: &child_instance,
|
||||
// definition has NO shared_volume — the executor must read
|
||||
// _wfe_shared_volume from workflow.data instead.
|
||||
definition: Some(&child_definition),
|
||||
cancellation_token: CancellationToken::new(),
|
||||
host_context: None,
|
||||
log_sink: None,
|
||||
};
|
||||
|
||||
let result = step.run(&ctx).await.unwrap_or_else(|e| {
|
||||
panic!("pvc-check step failed (regression: sub-workflow shared_volume not inherited): {e}");
|
||||
});
|
||||
assert!(result.proceed);
|
||||
|
||||
// Verify the PVC was actually created in the namespace.
|
||||
use k8s_openapi::api::core::v1::PersistentVolumeClaim;
|
||||
let pvcs: kube::Api<PersistentVolumeClaim> = kube::Api::namespaced(client.clone(), &ns);
|
||||
let pvc = pvcs.get("wfe-workspace").await;
|
||||
assert!(
|
||||
pvc.is_ok(),
|
||||
"PVC 'wfe-workspace' should exist in namespace {ns}"
|
||||
);
|
||||
|
||||
let output = result.output_data.unwrap();
|
||||
let stdout = output["pvc-check.stdout"].as_str().unwrap_or("");
|
||||
assert!(
|
||||
stdout.contains("pvc-ok"),
|
||||
"expected pvc-ok in stdout, got: {stdout}"
|
||||
);
|
||||
|
||||
namespace::delete_namespace(&client, &ns).await.ok();
|
||||
}
|
||||
|
||||
@@ -395,6 +395,20 @@ impl WorkflowHost {
|
||||
instance.execution_pointers.push(pointer);
|
||||
}
|
||||
|
||||
// If the definition declares a shared volume, stash it in the
|
||||
// instance's data under a reserved key so sub-workflows inherit it
|
||||
// automatically (via the parent-data-inheritance in SubWorkflowStep).
|
||||
// The K8s executor reads it from workflow.data when
|
||||
// context.definition.shared_volume is None.
|
||||
if let Some(sv) = &definition.shared_volume {
|
||||
if let Some(obj) = instance.data.as_object_mut() {
|
||||
obj.insert(
|
||||
"_wfe_shared_volume".to_string(),
|
||||
serde_json::to_value(sv).unwrap_or_default(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Assign a human-friendly name. Callers may override (e.g. webhook
|
||||
// handlers that want `ci-mainline-a1b2c3`); otherwise use the
|
||||
// sequenced default. Validation: reject empty overrides so the name
|
||||
|
||||
Reference in New Issue
Block a user