diff --git a/wfe-kubernetes/src/step.rs b/wfe-kubernetes/src/step.rs index 02305ae..811980e 100644 --- a/wfe-kubernetes/src/step.rs +++ b/wfe-kubernetes/src/step.rs @@ -93,14 +93,35 @@ impl StepBody for KubernetesStep { // 2. Ensure namespace exists. ensure_namespace(&client, &ns, workflow_id).await?; - // 2b. If the definition declares a shared volume, ensure the PVC + // 2b. If the workflow declares a shared volume, ensure the PVC // exists in the namespace and compute the mount spec we'll inject // into the Job. The PVC is created once per namespace (one per // top-level workflow run) and reused by every step and - // sub-workflow. Backends with no definition in the step context - // (test fixtures) just skip the shared volume. - let shared_mount = if let Some(def) = context.definition { - if let Some(sv) = &def.shared_volume { + // sub-workflow. + // + // Check two sources: the step's definition (top-level workflow) + // and the inherited `_wfe_shared_volume` key in workflow.data + // (sub-workflows inherit it via parent-data propagation). The + // data path is the normal case for sub-workflows since only the + // root definition declares `shared_volume:`. + let shared_mount = { + // Try definition first (top-level workflow running its own steps). + let sv_from_def = context.definition.and_then(|d| d.shared_volume.as_ref()); + + // Fall back to inherited data (sub-workflow inherits parent data). + let sv_from_data: Option<SharedVolume> = if sv_from_def.is_none() { + context + .workflow + .data + .get("_wfe_shared_volume") + .and_then(|v| serde_json::from_value(v.clone()).ok()) + } else { + None + }; + + let sv = sv_from_def.or(sv_from_data.as_ref()); + + if let Some(sv) = sv { let size = sv .size .as_deref() @@ -119,8 +140,6 @@ impl StepBody for KubernetesStep { } else { None } - } else { - None }; // 3. Merge env vars: workflow.data (uppercased) + config.env. 
diff --git a/wfe-kubernetes/tests/integration.rs b/wfe-kubernetes/tests/integration.rs index 0fbc276..21b6139 100644 --- a/wfe-kubernetes/tests/integration.rs +++ b/wfe-kubernetes/tests/integration.rs @@ -791,3 +791,113 @@ echo "##wfe[output file_count=$COUNT]" // Explicit cleanup (the guard still runs on panic paths). namespace::delete_namespace(&client, &ns).await.ok(); } + +// ── Regression: sub-workflow inherits shared_volume via data ───────── +// +// The real CI topology is: ci (root, declares shared_volume) → checkout +// sub-workflow (no shared_volume on its definition). The K8s executor +// must pick up the shared_volume from the inherited `_wfe_shared_volume` +// key in workflow.data, NOT from the sub-workflow's definition. +// +// This test was added after a production bug where the PVC was never +// created because sub-workflow steps checked context.definition.shared_volume +// which was None on the child definition. +#[tokio::test] +async fn sub_workflow_inherits_shared_volume_from_data() { + use tokio_util::sync::CancellationToken; + use wfe_core::models::{ + ExecutionPointer, SharedVolume, WorkflowDefinition, WorkflowInstance, WorkflowStep, + }; + use wfe_core::traits::step::{StepBody, StepExecutionContext}; + + let cluster = cluster_config(); + let client = client::create_client(&cluster).await.unwrap(); + let root_id = unique_id("subwf-pvc"); + + // The *child* definition does NOT declare shared_volume — this is + // the whole point. Only the root ci workflow declares it, and the + // config propagates via `_wfe_shared_volume` in workflow.data. + let child_definition = WorkflowDefinition::new("lint", 1); + + // Simulate the data a sub-workflow receives from a root that has + // shared_volume. WorkflowHost::start_workflow_with_name injects + // `_wfe_shared_volume` into instance.data; SubWorkflowStep copies + // the parent's data into the child. 
+ let child_data = serde_json::json!({ + "repo_url": "https://example.com/repo.git", + "_wfe_shared_volume": { + "mount_path": "/workspace", + "size": "1Gi" + } + }); + + let child_instance = WorkflowInstance { + id: unique_id("child"), + name: "lint-regression-1".into(), + // Points at the root ci workflow — K8s executor derives the + // namespace from this, placing us in the root's namespace. + root_workflow_id: Some(root_id.clone()), + workflow_definition_id: "lint".into(), + version: 1, + description: None, + reference: None, + execution_pointers: vec![], + next_execution: None, + status: wfe_core::models::WorkflowStatus::Runnable, + data: child_data, + create_time: chrono::Utc::now(), + complete_time: None, + }; + + let ns = namespace::namespace_name(&cluster.namespace_prefix, &root_id); + + // Step config — write a file to /workspace and verify it persists. + let mut step_cfg = step_config( + "alpine:3.18", + "echo pvc-ok > /workspace/pvc-test.txt && cat /workspace/pvc-test.txt", + ); + step_cfg.namespace = Some(ns.clone()); + + let mut step = wfe_kubernetes::KubernetesStep::new(step_cfg, cluster.clone(), client.clone()); + + let mut ws = WorkflowStep::new(0, "pvc-check"); + ws.name = Some("pvc-check".into()); + let pointer = ExecutionPointer::new(0); + + let ctx = StepExecutionContext { + item: None, + execution_pointer: &pointer, + persistence_data: None, + step: &ws, + workflow: &child_instance, + // definition has NO shared_volume — the executor must read + // _wfe_shared_volume from workflow.data instead. + definition: Some(&child_definition), + cancellation_token: CancellationToken::new(), + host_context: None, + log_sink: None, + }; + + let result = step.run(&ctx).await.unwrap_or_else(|e| { + panic!("pvc-check step failed (regression: sub-workflow shared_volume not inherited): {e}"); + }); + assert!(result.proceed); + + // Verify the PVC was actually created in the namespace. 
+ use k8s_openapi::api::core::v1::PersistentVolumeClaim; + let pvcs: kube::Api<PersistentVolumeClaim> = kube::Api::namespaced(client.clone(), &ns); + let pvc = pvcs.get("wfe-workspace").await; + assert!( + pvc.is_ok(), + "PVC 'wfe-workspace' should exist in namespace {ns}" + ); + + let output = result.output_data.unwrap(); + let stdout = output["pvc-check.stdout"].as_str().unwrap_or(""); + assert!( + stdout.contains("pvc-ok"), + "expected pvc-ok in stdout, got: {stdout}" + ); + + namespace::delete_namespace(&client, &ns).await.ok(); +} diff --git a/wfe/src/host.rs b/wfe/src/host.rs index dda335c..003f714 100644 --- a/wfe/src/host.rs +++ b/wfe/src/host.rs @@ -395,6 +395,20 @@ impl WorkflowHost { instance.execution_pointers.push(pointer); } + // If the definition declares a shared volume, stash it in the + // instance's data under a reserved key so sub-workflows inherit it + // automatically (via the parent-data-inheritance in SubWorkflowStep). + // The K8s executor reads it from workflow.data when + // context.definition.shared_volume is None. + if let Some(sv) = &definition.shared_volume { + if let Some(obj) = instance.data.as_object_mut() { + obj.insert( + "_wfe_shared_volume".to_string(), + serde_json::to_value(sv).unwrap_or_default(), + ); + } + } + + // Assign a human-friendly name. Callers may override (e.g. webhook // handlers that want `ci-mainline-a1b2c3`); otherwise use the // sequenced default. Validation: reject empty overrides so the name