From 2aaf3c16c999cd7af1370f4ec9b22ad3b6b503e3 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Thu, 9 Apr 2026 15:44:59 +0100 Subject: [PATCH] feat(wfe-core): root_workflow_id, SharedVolume, configurable shell, StepExecutionContext.definition --- wfe-core/src/executor/workflow_executor.rs | 1 + wfe-core/src/models/mod.rs | 2 +- wfe-core/src/models/workflow_definition.rs | 38 +++++++++++++++++++ wfe-core/src/models/workflow_instance.rs | 16 ++++++++ wfe-core/src/primitives/mod.rs | 1 + wfe-core/src/primitives/sub_workflow.rs | 15 +++++++- .../src/test_support/in_memory_persistence.rs | 5 +++ wfe-core/src/traits/middleware.rs | 2 + wfe-core/src/traits/step.rs | 18 ++++++++- wfe-deno/src/bridge.rs | 6 +++ wfe-yaml/src/compiler.rs | 8 ++++ wfe-yaml/src/schema.rs | 25 ++++++++++++ wfe-yaml/tests/compiler.rs | 2 + wfe-yaml/tests/deno.rs | 1 + 14 files changed, 137 insertions(+), 3 deletions(-) diff --git a/wfe-core/src/executor/workflow_executor.rs b/wfe-core/src/executor/workflow_executor.rs index f976187..1be1e86 100644 --- a/wfe-core/src/executor/workflow_executor.rs +++ b/wfe-core/src/executor/workflow_executor.rs @@ -240,6 +240,7 @@ impl WorkflowExecutor { persistence_data: workflow.execution_pointers[idx].persistence_data.as_ref(), step, workflow: &workflow, + definition: Some(definition), cancellation_token, host_context, log_sink: self.log_sink.as_deref(), diff --git a/wfe-core/src/models/mod.rs b/wfe-core/src/models/mod.rs index 154dbe4..54c7d1f 100644 --- a/wfe-core/src/models/mod.rs +++ b/wfe-core/src/models/mod.rs @@ -29,7 +29,7 @@ pub use service::{ ReadinessCheck, ReadinessProbe, ServiceDefinition, ServiceEndpoint, ServicePort, }; pub use status::{PointerStatus, WorkflowStatus}; -pub use workflow_definition::{StepOutcome, WorkflowDefinition, WorkflowStep}; +pub use workflow_definition::{SharedVolume, StepOutcome, WorkflowDefinition, WorkflowStep}; pub use workflow_instance::WorkflowInstance; /// Serde helper for `Option` as milliseconds. diff --git a/wfe-core/src/models/workflow_definition.rs b/wfe-core/src/models/workflow_definition.rs index c2aaf6a..4c0e2ac 100644 --- a/wfe-core/src/models/workflow_definition.rs +++ b/wfe-core/src/models/workflow_definition.rs @@ -6,6 +6,37 @@ use super::condition::StepCondition; use super::error_behavior::ErrorBehavior; use super::service::ServiceDefinition; +/// Declaration of a volume that persists across every step in a workflow +/// run, including sub-workflows started via `type: workflow` steps. Backends +/// that support it (currently just Kubernetes) provision a single volume +/// per top-level workflow instance and mount it on every step container at +/// `mount_path`. Sub-workflows see the same volume because they share the +/// parent's isolation domain (namespace, in the K8s case). +/// +/// Declared once on the top-level workflow (e.g. `ci`) that orchestrates +/// the sub-workflows. Declarations on non-root workflows are ignored in +/// favor of the root's declaration. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct SharedVolume { + /// Absolute path the volume is mounted at inside every step container. + /// Typical value: `/workspace`. + pub mount_path: String, + /// Optional size override (e.g. `"20Gi"`). When unset the backend falls + /// back to its configured default (ClusterConfig::default_shared_volume_size + /// for the Kubernetes executor). + #[serde(default, skip_serializing_if = "Option::is_none")] + pub size: Option, +} + +impl Default for SharedVolume { + fn default() -> Self { + Self { + mount_path: "/workspace".to_string(), + size: None, + } + } +} + /// A compiled workflow definition ready for execution. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct WorkflowDefinition { @@ -26,6 +57,12 @@ pub struct WorkflowDefinition { /// Infrastructure services required by this workflow (databases, caches, etc.). #[serde(default, skip_serializing_if = "Vec::is_empty")] pub services: Vec, + /// When set, the backend provisions a single persistent volume for the + /// top-level workflow instance and mounts it on every step container. + /// All sub-workflows inherit the same volume through their shared + /// namespace/isolation domain. Sub-workflow declarations are ignored. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub shared_volume: Option, } impl WorkflowDefinition { @@ -39,6 +76,7 @@ impl WorkflowDefinition { default_error_behavior: ErrorBehavior::default(), default_error_retry_interval: None, services: Vec::new(), + shared_volume: None, } } diff --git a/wfe-core/src/models/workflow_instance.rs b/wfe-core/src/models/workflow_instance.rs index 932b5b9..47757de 100644 --- a/wfe-core/src/models/workflow_instance.rs +++ b/wfe-core/src/models/workflow_instance.rs @@ -14,6 +14,18 @@ pub struct WorkflowInstance { /// `id` in lookup APIs. Empty when the instance has not yet been /// persisted (the host fills it in before the first insert). pub name: String, + /// UUID of the top-level ancestor workflow. `None` on the root + /// (user-started) workflow; set to the parent's `root_workflow_id` + /// (or the parent's `id` if the parent is itself a root) on every + /// `SubWorkflowStep`-created child. + /// + /// Used by the Kubernetes executor to place all workflows in a tree + /// under a single namespace — siblings started via `type: workflow` + /// steps share the parent's namespace and any provisioned shared + /// volume. Backends that don't care about workflow topology can + /// ignore this field. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub root_workflow_id: Option, pub workflow_definition_id: String, pub version: u32, pub description: Option, @@ -36,6 +48,10 @@ impl WorkflowInstance { id: uuid::Uuid::new_v4().to_string(), // Filled in by WorkflowHost::start_workflow before persisting. name: String::new(), + // None by default — caller (HostContextImpl) sets this when + // starting a sub-workflow so children share the parent tree's + // namespace/volume. + root_workflow_id: None, workflow_definition_id: workflow_definition_id.into(), version, description: None, diff --git a/wfe-core/src/primitives/mod.rs b/wfe-core/src/primitives/mod.rs index 6d9fb60..2058451 100644 --- a/wfe-core/src/primitives/mod.rs +++ b/wfe-core/src/primitives/mod.rs @@ -38,6 +38,7 @@ mod test_helpers { workflow: &'a WorkflowInstance, ) -> StepExecutionContext<'a> { StepExecutionContext { + definition: None, item: None, execution_pointer: pointer, persistence_data: pointer.persistence_data.as_ref(), diff --git a/wfe-core/src/primitives/sub_workflow.rs b/wfe-core/src/primitives/sub_workflow.rs index 0213c25..6622fe3 100644 --- a/wfe-core/src/primitives/sub_workflow.rs +++ b/wfe-core/src/primitives/sub_workflow.rs @@ -123,8 +123,18 @@ impl StepBody for SubWorkflowStep { } else { serde_json::json!({}) }; + // Inherit the parent's root — or, if the parent is itself a root + // (has no root set), use the parent's own id as the root for the + // child. This makes every descendant of a top-level ci run share + // the same root_workflow_id and therefore the same namespace and + // shared volume on backends that care. + let parent_root = context + .workflow + .root_workflow_id + .clone() + .or_else(|| Some(context.workflow.id.clone())); let child_instance_id = host - .start_workflow(&self.workflow_id, self.version, child_data) + .start_workflow(&self.workflow_id, self.version, child_data, parent_root) .await?; Ok(ExecutionResult::wait_for_event( @@ -171,6 +181,7 @@ mod tests { definition_id: &str, version: u32, data: serde_json::Value, + _parent_root_workflow_id: Option, ) -> std::pin::Pin> + Send + '_>> { let def_id = definition_id.to_string(); @@ -191,6 +202,7 @@ mod tests { _definition_id: &str, _version: u32, _data: serde_json::Value, + _parent_root_workflow_id: Option, ) -> std::pin::Pin> + Send + '_>> { Box::pin(async { @@ -208,6 +220,7 @@ mod tests { host: &'a dyn HostContext, ) -> StepExecutionContext<'a> { StepExecutionContext { + definition: None, item: None, execution_pointer: pointer, persistence_data: pointer.persistence_data.as_ref(), diff --git a/wfe-core/src/test_support/in_memory_persistence.rs b/wfe-core/src/test_support/in_memory_persistence.rs index 3ce0ccc..87070c1 100644 --- a/wfe-core/src/test_support/in_memory_persistence.rs +++ b/wfe-core/src/test_support/in_memory_persistence.rs @@ -59,6 +59,11 @@ impl WorkflowRepository for InMemoryPersistenceProvider { }; let mut stored = instance.clone(); stored.id = id.clone(); + // Fall back to UUID when the caller didn't assign a human name, so + // name-based lookups work (the UUID is always unique). + if stored.name.is_empty() { + stored.name = id.clone(); + } self.workflows.write().await.insert(id.clone(), stored); Ok(id) } diff --git a/wfe-core/src/traits/middleware.rs b/wfe-core/src/traits/middleware.rs index 87b0853..84b0209 100644 --- a/wfe-core/src/traits/middleware.rs +++ b/wfe-core/src/traits/middleware.rs @@ -62,6 +62,7 @@ mod tests { let pointer = ExecutionPointer::new(0); let step = WorkflowStep::new(0, "test_step"); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -82,6 +83,7 @@ mod tests { let pointer = ExecutionPointer::new(0); let step = WorkflowStep::new(0, "test_step"); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, diff --git a/wfe-core/src/traits/step.rs b/wfe-core/src/traits/step.rs index 9ad1aef..39fefd8 100644 --- a/wfe-core/src/traits/step.rs +++ b/wfe-core/src/traits/step.rs @@ -2,7 +2,9 @@ use async_trait::async_trait; use serde::Serialize; use serde::de::DeserializeOwned; -use crate::models::{ExecutionPointer, ExecutionResult, WorkflowInstance, WorkflowStep}; +use crate::models::{ + ExecutionPointer, ExecutionResult, WorkflowDefinition, WorkflowInstance, WorkflowStep, +}; /// Marker trait for all data types that flow between workflow steps. /// Anything that is serializable and deserializable qualifies. @@ -13,12 +15,19 @@ impl WorkflowData for T where T: Serialize + DeserializeOwned + Send + Sync + /// Context for steps that need to interact with the workflow host. /// Implemented by WorkflowHost to allow steps like SubWorkflow to start child workflows. +/// +/// The `parent_root_workflow_id` argument carries the UUID of the top-level +/// ancestor workflow so backends (notably Kubernetes) can place every +/// descendant of a given root run in the same isolation domain — namespace, +/// shared volume, RBAC — so sub-workflows can share state like a cloned +/// repo checkout. Pass `None` when starting a brand-new root workflow. pub trait HostContext: Send + Sync { fn start_workflow( &self, definition_id: &str, version: u32, data: serde_json::Value, + parent_root_workflow_id: Option, ) -> std::pin::Pin> + Send + '_>>; } @@ -34,6 +43,12 @@ pub struct StepExecutionContext<'a> { pub step: &'a WorkflowStep, /// The running workflow instance. pub workflow: &'a WorkflowInstance, + /// The compiled workflow definition the instance was created from. + /// `None` on code paths that don't have it available (some test fixtures); + /// production execution always populates this so executor-specific + /// features (e.g. Kubernetes shared volumes) can inspect the + /// definition-level configuration. + pub definition: Option<&'a WorkflowDefinition>, /// Cancellation token. pub cancellation_token: tokio_util::sync::CancellationToken, /// Host context for starting child workflows. None if not available. @@ -51,6 +66,7 @@ impl<'a> std::fmt::Debug for StepExecutionContext<'a> { .field("persistence_data", &self.persistence_data) .field("step", &self.step) .field("workflow", &self.workflow) + .field("definition", &self.definition.is_some()) .field("host_context", &self.host_context.is_some()) .field("log_sink", &self.log_sink.is_some()) .finish() diff --git a/wfe-deno/src/bridge.rs b/wfe-deno/src/bridge.rs index 162ea8c..b7a6462 100644 --- a/wfe-deno/src/bridge.rs +++ b/wfe-deno/src/bridge.rs @@ -189,6 +189,7 @@ mod tests { let instance = WorkflowInstance { id: "wf-1".into(), name: "test-def-1".into(), + root_workflow_id: None, workflow_definition_id: "test-def".into(), version: 1, description: None, @@ -212,6 +213,7 @@ mod tests { step.step_config = Some(serde_json::json!({"key": "val"})); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -236,6 +238,7 @@ mod tests { let item = serde_json::json!({"id": 42}); let ctx = StepExecutionContext { + definition: None, item: Some(&item), execution_pointer: &pointer, persistence_data: Some(&serde_json::json!({"saved": true})), @@ -360,6 +363,7 @@ mod tests { let (instance, step, pointer) = make_test_context(); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -397,6 +401,7 @@ mod tests { let (instance, step, pointer) = make_test_context(); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -428,6 +433,7 @@ mod tests { let (instance, step, pointer) = make_test_context(); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, diff --git a/wfe-yaml/src/compiler.rs b/wfe-yaml/src/compiler.rs index 3d509d6..d9eb927 100644 --- a/wfe-yaml/src/compiler.rs +++ b/wfe-yaml/src/compiler.rs @@ -47,6 +47,13 @@ pub fn compile(spec: &WorkflowSpec) -> Result, + /// Optional persistent volume shared across every step in this workflow + /// run, including sub-workflows. Declared once on the top-level + /// orchestrator (e.g. `ci`); ignored on non-root workflows. The Kubernetes + /// executor provisions a single PVC per top-level run and mounts it on + /// every step container at `mount_path` so steps like `git clone` in one + /// sub-workflow are visible to `cargo fmt --check` in another. + #[serde(default)] + pub shared_volume: Option, /// Allow unknown top-level keys (e.g. `_templates`) for YAML anchors. #[serde(flatten)] #[schemars(skip)] pub _extra: HashMap, } +/// Shared volume declaration, YAML form. +#[derive(Debug, Deserialize, Serialize, JsonSchema)] +pub struct YamlSharedVolume { + /// Absolute path to mount the volume at inside every step container. + /// Defaults to `/workspace` when unset. + #[serde(default = "default_shared_volume_mount")] + pub mount_path: String, + /// Optional size (e.g. `"20Gi"`). When unset the backend falls back + /// to its configured default. + #[serde(default)] + pub size: Option, +} + +fn default_shared_volume_mount() -> String { + "/workspace".to_string() +} + /// A service definition in YAML format. #[derive(Debug, Deserialize, Serialize, JsonSchema)] pub struct YamlService { diff --git a/wfe-yaml/tests/compiler.rs b/wfe-yaml/tests/compiler.rs index 642cca0..f8b3c53 100644 --- a/wfe-yaml/tests/compiler.rs +++ b/wfe-yaml/tests/compiler.rs @@ -1092,6 +1092,7 @@ workflows: _def: &str, _ver: u32, _data: serde_json::Value, + _parent_root_workflow_id: Option, ) -> Pin> + Send + '_>> { *self.called.lock().unwrap() = true; Box::pin(async { Ok("child-instance-id".to_string()) }) @@ -1105,6 +1106,7 @@ workflows: let wf_step = WfStep::new(0, &factory_key); let workflow = WorkflowInstance::new("parent", 1, serde_json::json!({})); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, diff --git a/wfe-yaml/tests/deno.rs b/wfe-yaml/tests/deno.rs index a98e759..12bcd20 100644 --- a/wfe-yaml/tests/deno.rs +++ b/wfe-yaml/tests/deno.rs @@ -35,6 +35,7 @@ fn make_context<'a>( pointer: &'a ExecutionPointer, ) -> StepExecutionContext<'a> { StepExecutionContext { + definition: None, item: None, execution_pointer: pointer, persistence_data: None,