diff --git a/wfe-buildkit/tests/integration_test.rs b/wfe-buildkit/tests/integration_test.rs index fd09ef5..7fc1610 100644 --- a/wfe-buildkit/tests/integration_test.rs +++ b/wfe-buildkit/tests/integration_test.rs @@ -73,6 +73,7 @@ async fn build_simple_dockerfile_via_grpc() { let (ws, pointer, instance) = make_test_context("integration-build"); let cancel = tokio_util::sync::CancellationToken::new(); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -157,6 +158,7 @@ async fn build_with_build_args() { let (ws, pointer, instance) = make_test_context("build-args-test"); let cancel = tokio_util::sync::CancellationToken::new(); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -208,6 +210,7 @@ async fn connect_to_unavailable_daemon_returns_error() { let (ws, pointer, instance) = make_test_context("error-test"); let cancel = tokio_util::sync::CancellationToken::new(); let ctx = StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, diff --git a/wfe-containerd/tests/integration.rs b/wfe-containerd/tests/integration.rs index f7bd062..28a71b6 100644 --- a/wfe-containerd/tests/integration.rs +++ b/wfe-containerd/tests/integration.rs @@ -68,6 +68,7 @@ fn make_context<'a>( pointer: &'a ExecutionPointer, ) -> StepExecutionContext<'a> { StepExecutionContext { + definition: None, item: None, execution_pointer: pointer, persistence_data: None, diff --git a/wfe-core/src/test_support/lock_suite.rs b/wfe-core/src/test_support/lock_suite.rs index 5084138..744e35d 100644 --- a/wfe-core/src/test_support/lock_suite.rs +++ b/wfe-core/src/test_support/lock_suite.rs @@ -44,6 +44,39 @@ macro_rules! lock_suite { // Should not error even if lock was never acquired provider.release_lock("nonexistent").await.unwrap(); } + + #[tokio::test] + async fn different_resources_are_independent() { + let provider = ($factory)().await; + assert!(provider.acquire_lock("resource-a").await.unwrap()); + // Different resource id doesn't block on the first. + assert!(provider.acquire_lock("resource-b").await.unwrap()); + // Now trying to reacquire either fails while held. + assert!(!provider.acquire_lock("resource-a").await.unwrap()); + assert!(!provider.acquire_lock("resource-b").await.unwrap()); + provider.release_lock("resource-a").await.unwrap(); + provider.release_lock("resource-b").await.unwrap(); + } + + #[tokio::test] + async fn start_and_stop_lifecycle_are_idempotent() { + let provider = ($factory)().await; + provider.start().await.unwrap(); + provider.start().await.unwrap(); + provider.stop().await.unwrap(); + provider.stop().await.unwrap(); + } + + #[tokio::test] + async fn acquire_release_acquire_roundtrip() { + let provider = ($factory)().await; + for _ in 0..5 { + assert!(provider.acquire_lock("cycling").await.unwrap()); + provider.release_lock("cycling").await.unwrap(); + } + assert!(provider.acquire_lock("cycling").await.unwrap()); + assert!(!provider.acquire_lock("cycling").await.unwrap()); + } } }; } diff --git a/wfe-core/src/test_support/persistence_suite.rs b/wfe-core/src/test_support/persistence_suite.rs index c6f51ed..22230c6 100644 --- a/wfe-core/src/test_support/persistence_suite.rs +++ b/wfe-core/src/test_support/persistence_suite.rs @@ -238,6 +238,379 @@ macro_rules! persistence_suite { assert_eq!(w.id, *id); } } + + // ─── 1.9 name / sequence / root_workflow_id coverage ──────── + + #[tokio::test] + async fn next_definition_sequence_is_monotonic_per_definition() { + let provider = ($factory)().await; + + // Persistent backends (postgres) keep the sequence counter + // table across test runs, so we need unique definition ids + // per test invocation to get deterministic starting values. + let id_a = format!( + "ci-{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + ); + let id_b = format!("{id_a}-other"); + + // First definition counter starts at 1 and increments. + assert_eq!(provider.next_definition_sequence(&id_a).await.unwrap(), 1); + assert_eq!(provider.next_definition_sequence(&id_a).await.unwrap(), 2); + assert_eq!(provider.next_definition_sequence(&id_a).await.unwrap(), 3); + + // Second definition has an independent counter. + assert_eq!(provider.next_definition_sequence(&id_b).await.unwrap(), 1); + assert_eq!(provider.next_definition_sequence(&id_b).await.unwrap(), 2); + + // First definition's counter unaffected. + assert_eq!(provider.next_definition_sequence(&id_a).await.unwrap(), 4); + } + + #[tokio::test] + async fn get_workflow_instance_by_name_resolves_human_name() { + let provider = ($factory)().await; + + let mut w = WorkflowInstance::new("test-wf", 1, serde_json::json!({})); + w.name = "ci-42".into(); + let id = provider.create_new_workflow(&w).await.unwrap(); + + // Fetch by human name returns the same row as fetch-by-id. + let by_name = provider.get_workflow_instance_by_name("ci-42").await.unwrap(); + assert_eq!(by_name.id, id); + assert_eq!(by_name.name, "ci-42"); + + // Nonexistent name surfaces WorkflowNotFound. + let missing = provider + .get_workflow_instance_by_name("no-such-name") + .await; + assert!(missing.is_err()); + } + + #[tokio::test] + async fn root_workflow_id_persists_across_save_and_load() { + let provider = ($factory)().await; + + let parent_id = { + let mut p = WorkflowInstance::new("parent", 1, serde_json::json!({})); + p.name = "parent-1".into(); + provider.create_new_workflow(&p).await.unwrap() + }; + + let mut child = WorkflowInstance::new("child", 1, serde_json::json!({})); + child.name = "child-1".into(); + child.root_workflow_id = Some(parent_id.clone()); + let child_id = provider.create_new_workflow(&child).await.unwrap(); + + let loaded = provider.get_workflow_instance(&child_id).await.unwrap(); + assert_eq!(loaded.root_workflow_id.as_deref(), Some(parent_id.as_str())); + + // Round-trip through persist_workflow too. + let mut updated = loaded.clone(); + updated.description = Some("updated".into()); + provider.persist_workflow(&updated).await.unwrap(); + let reloaded = provider.get_workflow_instance(&child_id).await.unwrap(); + assert_eq!( + reloaded.root_workflow_id.as_deref(), + Some(parent_id.as_str()) + ); + assert_eq!(reloaded.description.as_deref(), Some("updated")); + } + + // ─── Additional SubscriptionRepository coverage ──────────── + + #[tokio::test] + async fn subscription_token_lifecycle() { + let provider = ($factory)().await; + let now = Utc::now(); + let sub = + EventSubscription::new("wf-1", 0, "ptr-1", "evt", "key", now); + let id = provider.create_event_subscription(&sub).await.unwrap(); + + // Claim the subscription with a token — returns true on success. + let claimed = provider + .set_subscription_token(&id, "tok-a", "worker-1", now + Duration::seconds(30)) + .await + .unwrap(); + assert!(claimed); + + // A second set_subscription_token with a different token while + // the first is still held should fail to claim. + let reclaimed = provider + .set_subscription_token(&id, "tok-b", "worker-2", now + Duration::seconds(30)) + .await + .unwrap_or(false); + assert!(!reclaimed, "token should not be reclaimed while still held"); + + // Clearing with the correct token releases the subscription. + provider.clear_subscription_token(&id, "tok-a").await.unwrap(); + + // Now another worker can claim it. + let re = provider + .set_subscription_token(&id, "tok-b", "worker-2", now + Duration::seconds(30)) + .await + .unwrap(); + assert!(re); + } + + #[tokio::test] + async fn get_first_open_subscription_returns_unlocked_only() { + let provider = ($factory)().await; + let now = Utc::now(); + + // Two subscriptions matching the same (event_name, event_key) + // — the first gets claimed, then get_first_open should return + // the second. + let sub1 = + EventSubscription::new("wf-1", 0, "p1", "order.created", "k", now); + let id1 = provider.create_event_subscription(&sub1).await.unwrap(); + let sub2 = + EventSubscription::new("wf-2", 0, "p2", "order.created", "k", now); + let _id2 = provider.create_event_subscription(&sub2).await.unwrap(); + + provider + .set_subscription_token(&id1, "tok", "w", now + Duration::seconds(30)) + .await + .unwrap(); + + let first_open = provider + .get_first_open_subscription("order.created", "k", now + Duration::seconds(1)) + .await + .unwrap(); + assert!(first_open.is_some()); + // The open one is the un-claimed wf-2, not the claimed wf-1. + let open = first_open.unwrap(); + assert_eq!(open.workflow_id, "wf-2"); + } + + #[tokio::test] + async fn persist_workflow_with_subscriptions_round_trip() { + let provider = ($factory)().await; + let mut w = WorkflowInstance::new("sub-wf", 1, serde_json::json!({})); + let id = provider.create_new_workflow(&w).await.unwrap(); + w.id = id.clone(); + + let now = Utc::now(); + let subs = vec![ + EventSubscription::new(&id, 0, "p-0", "a.evt", "k1", now), + EventSubscription::new(&id, 1, "p-1", "b.evt", "k2", now), + ]; + provider + .persist_workflow_with_subscriptions(&w, &subs) + .await + .unwrap(); + + let fetched = provider + .get_subscriptions("a.evt", "k1", now + Duration::seconds(1)) + .await + .unwrap(); + assert_eq!(fetched.len(), 1); + assert_eq!(fetched[0].workflow_id, id); + } + + // ─── Additional EventRepository coverage ──────────────────── + + #[tokio::test] + async fn mark_event_unprocessed_reverses_processed_flag() { + let provider = ($factory)().await; + let event = Event::new("evt", "key", serde_json::json!(null)); + let id = provider.create_event(&event).await.unwrap(); + + provider.mark_event_processed(&id).await.unwrap(); + let processed = provider.get_event(&id).await.unwrap(); + assert!(processed.is_processed); + + provider.mark_event_unprocessed(&id).await.unwrap(); + let unprocessed = provider.get_event(&id).await.unwrap(); + assert!(!unprocessed.is_processed); + } + + #[tokio::test] + async fn get_events_returns_matching_ids() { + let provider = ($factory)().await; + let now = Utc::now(); + + let e1 = Event::new("foo.created", "abc", serde_json::json!({})); + let id1 = provider.create_event(&e1).await.unwrap(); + let e2 = Event::new("foo.created", "xyz", serde_json::json!({})); + let _id2 = provider.create_event(&e2).await.unwrap(); + let e3 = Event::new("bar.created", "abc", serde_json::json!({})); + let _id3 = provider.create_event(&e3).await.unwrap(); + + let matching = provider + .get_events("foo.created", "abc", now + Duration::seconds(1)) + .await + .unwrap(); + assert!(matching.contains(&id1)); + assert_eq!(matching.len(), 1); + } + + // ─── get_workflow_instances (batch fetch) ───────────────── + + #[tokio::test] + async fn get_workflow_instances_fetches_multiple_by_id() { + let provider = ($factory)().await; + + let w1 = WorkflowInstance::new("a", 1, serde_json::json!({})); + let id1 = provider.create_new_workflow(&w1).await.unwrap(); + let w2 = WorkflowInstance::new("b", 1, serde_json::json!({})); + let id2 = provider.create_new_workflow(&w2).await.unwrap(); + let w3 = WorkflowInstance::new("c", 1, serde_json::json!({})); + let id3 = provider.create_new_workflow(&w3).await.unwrap(); + + let fetched = provider + .get_workflow_instances(&[id1.clone(), id2.clone(), id3.clone()]) + .await + .unwrap(); + assert_eq!(fetched.len(), 3); + + // Missing ids are silently filtered out. + let partial = provider + .get_workflow_instances(&[id1.clone(), "never".into()]) + .await + .unwrap(); + assert_eq!(partial.len(), 1); + assert_eq!(partial[0].id, id1); + } + + // ─── WorkflowNotFound on bogus id ───────────────────────── + + #[tokio::test] + async fn get_workflow_instance_missing_is_workflow_not_found() { + let provider = ($factory)().await; + let err = provider + .get_workflow_instance("definitely-not-an-id") + .await + .unwrap_err(); + assert!(matches!(err, $crate::WfeError::WorkflowNotFound(_))); + } + + // ─── ensure_store_exists idempotency ────────────────────── + + #[tokio::test] + async fn ensure_store_exists_is_idempotent() { + let provider = ($factory)().await; + // Calling twice in a row should not error (schema already there). + provider.ensure_store_exists().await.unwrap(); + provider.ensure_store_exists().await.unwrap(); + } + + // ─── Execution pointer round-trip ────────────────────────── + // + // Pointers carry the bulk of the per-step state and touch the + // trickiest serialization paths (persistence_data, event_data, + // scope, children, extension_attributes). Explicitly round-trip + // one through create → update → fetch to catch marshalling bugs. + + #[tokio::test] + async fn execution_pointer_round_trip() { + use $crate::models::{ExecutionPointer, PointerStatus}; + + let provider = ($factory)().await; + + let mut instance = + WorkflowInstance::new("ptr-test", 1, serde_json::json!({})); + let mut ptr = ExecutionPointer::new(0); + ptr.status = PointerStatus::Running; + ptr.step_name = Some("first".into()); + ptr.persistence_data = Some(serde_json::json!({"cursor": 7})); + ptr.event_name = Some("order.paid".into()); + ptr.event_key = Some("order-42".into()); + ptr.event_published = false; + ptr.retry_count = 2; + ptr.scope = vec!["parent-scope".into()]; + ptr.children = vec!["child-a".into(), "child-b".into()]; + ptr.extension_attributes = { + let mut m = std::collections::HashMap::new(); + m.insert("owner".to_string(), serde_json::json!("alice")); + m + }; + instance.execution_pointers.push(ptr); + + let id = provider.create_new_workflow(&instance).await.unwrap(); + let fetched = provider.get_workflow_instance(&id).await.unwrap(); + + assert_eq!(fetched.execution_pointers.len(), 1); + let out = &fetched.execution_pointers[0]; + assert_eq!(out.status, PointerStatus::Running); + assert_eq!(out.step_name.as_deref(), Some("first")); + assert_eq!( + out.persistence_data.as_ref().map(|v| v["cursor"].as_u64()), + Some(Some(7)) + ); + assert_eq!(out.event_name.as_deref(), Some("order.paid")); + assert_eq!(out.retry_count, 2); + assert_eq!(out.scope, vec!["parent-scope".to_string()]); + assert_eq!(out.children.len(), 2); + assert_eq!( + out.extension_attributes.get("owner"), + Some(&serde_json::json!("alice")) + ); + } + + // ─── ScheduledCommandRepository ──────────────────────────── + + #[tokio::test] + async fn scheduled_commands_round_trip_when_supported() { + use $crate::models::{CommandName, ScheduledCommand}; + use $crate::traits::ScheduledCommandRepository; + + let provider = ($factory)().await; + + // Some backends (postgres, sqlite) support scheduled + // commands; others don't. Skip the test cleanly on backends + // that report no support rather than forcing a hard-coded + // opt-in list here. + if !provider.supports_scheduled_commands() { + return; + } + + // Use a unique data payload so the UNIQUE(command_name, data) + // index doesn't collide with previous runs on persistent + // backends. + let unique = format!( + "payload-{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos() + ); + let cmd = ScheduledCommand { + command_name: CommandName::ProcessWorkflow, + data: unique.clone(), + execute_time: 0, + }; + provider.schedule_command(&cmd).await.unwrap(); + + // Double-scheduling the same (command_name, data) must not + // blow up — the implementation uses ON CONFLICT DO NOTHING + // semantics so this is idempotent. + provider.schedule_command(&cmd).await.unwrap(); + + // Process due commands at a point well past execute_time. + let processed = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let counter = processed.clone(); + provider + .process_commands( + Utc::now() + Duration::seconds(1), + &|_c: ScheduledCommand| { + let counter = counter.clone(); + Box::pin(async move { + counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(()) + }) + }, + ) + .await + .unwrap(); + assert!( + processed.load(std::sync::atomic::Ordering::SeqCst) >= 1, + "expected at least one scheduled command to be processed" + ); + } } }; } diff --git a/wfe-core/src/test_support/queue_suite.rs b/wfe-core/src/test_support/queue_suite.rs index 9dbddbf..4954be6 100644 --- a/wfe-core/src/test_support/queue_suite.rs +++ b/wfe-core/src/test_support/queue_suite.rs @@ -100,6 +100,96 @@ macro_rules! queue_suite { .is_none() ); } + + #[tokio::test] + async fn index_queue_type_is_isolated() { + let provider = ($factory)().await; + provider + .queue_work("idx-1", QueueType::Index) + .await + .unwrap(); + provider + .queue_work("idx-2", QueueType::Index) + .await + .unwrap(); + provider + .queue_work("wf-1", QueueType::Workflow) + .await + .unwrap(); + + // Index queue drains in FIFO order... + assert_eq!( + provider + .dequeue_work(QueueType::Index) + .await + .unwrap() + .as_deref(), + Some("idx-1") + ); + assert_eq!( + provider + .dequeue_work(QueueType::Index) + .await + .unwrap() + .as_deref(), + Some("idx-2") + ); + // ...and doesn't disturb the Workflow queue. + assert_eq!( + provider + .dequeue_work(QueueType::Workflow) + .await + .unwrap() + .as_deref(), + Some("wf-1") + ); + } + + #[tokio::test] + async fn start_and_stop_lifecycle_are_idempotent() { + let provider = ($factory)().await; + // Both start and stop should be no-ops that can be called + // multiple times without error regardless of backend. + provider.start().await.unwrap(); + provider.start().await.unwrap(); + provider.stop().await.unwrap(); + provider.stop().await.unwrap(); + } + + #[tokio::test] + async fn is_dequeue_blocking_is_stable() { + let provider = ($factory)().await; + // Pure property — just make sure it doesn't panic and is + // consistent between calls. Different backends return + // different values; we only care the call works. + let a = provider.is_dequeue_blocking(); + let b = provider.is_dequeue_blocking(); + assert_eq!(a, b); + } + + #[tokio::test] + async fn enqueue_many_then_drain() { + let provider = ($factory)().await; + for i in 0..20u32 { + provider + .queue_work(&format!("item-{i}"), QueueType::Workflow) + .await + .unwrap(); + } + + for i in 0..20u32 { + let got = provider.dequeue_work(QueueType::Workflow).await.unwrap(); + assert_eq!(got.as_deref(), Some(format!("item-{i}").as_str())); + } + + assert!( + provider + .dequeue_work(QueueType::Workflow) + .await + .unwrap() + .is_none() + ); + } } }; } diff --git a/wfe-kubernetes/tests/integration.rs b/wfe-kubernetes/tests/integration.rs index 344434e..0fbc276 100644 --- a/wfe-kubernetes/tests/integration.rs +++ b/wfe-kubernetes/tests/integration.rs @@ -38,6 +38,7 @@ fn step_config(image: &str, run: &str) -> KubernetesStepConfig { image: image.into(), command: None, run: Some(run.into()), + shell: None, env: HashMap::new(), working_dir: None, memory: None, @@ -97,6 +98,7 @@ async fn run_echo_job() { let pointer = wfe_core::models::ExecutionPointer::new(0); let ctx = wfe_core::traits::step::StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -131,7 +133,7 @@ async fn run_job_with_wfe_output() { let ns = unique_id("output"); let mut step_cfg = step_config( "alpine:3.18", - r#"echo '##wfe[output version=1.2.3]' && echo '##wfe[output status=ok]'"#, + r###"echo '##wfe[output version=1.2.3]' && echo '##wfe[output status=ok]'"###, ); step_cfg.namespace = Some(ns.clone()); @@ -144,6 +146,7 @@ async fn run_job_with_wfe_output() { let pointer = wfe_core::models::ExecutionPointer::new(0); let ctx = wfe_core::traits::step::StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -187,6 +190,7 @@ async fn run_job_with_env_vars() { let pointer = wfe_core::models::ExecutionPointer::new(0); let ctx = wfe_core::traits::step::StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -224,6 +228,7 @@ async fn run_job_nonzero_exit_fails() { let pointer = wfe_core::models::ExecutionPointer::new(0); let ctx = wfe_core::traits::step::StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -261,6 +266,7 @@ async fn run_job_with_timeout() { let pointer = wfe_core::models::ExecutionPointer::new(0); let ctx = wfe_core::traits::step::StepExecutionContext { + definition: None, item: None, execution_pointer: &pointer, persistence_data: None, @@ -548,3 +554,240 @@ async fn service_provider_teardown_without_provision() { // Namespace doesn't exist, so delete_namespace returns an error. assert!(result.is_err()); } + +// ── End-to-end: multi-step workflow with shared volume ────────────── +// +// The tests above exercise the K8s step executor one step at a time. This +// test drives a realistic multi-step pipeline end-to-end, covering every +// feature that real CI workflows depend on: +// +// * multiple step containers in the same workflow (different images +// per step so we can confirm cross-image sharing through the PVC) +// * a `shared_volume` PVC provisioned on the first step, persisting +// to every subsequent step +// * the `/bin/bash` shell override so a step can use bash-only features +// * `extract_workflow_env` threading inputs through as uppercase env +// vars to every step +// * `##wfe[output ...]` capture from a later step's stdout +// * a deliberate non-zero exit on a trailing step proving downstream +// error propagation +// * namespace lifecycle: created on first step, reused by all +// siblings, deleted at the end +// +// The pipeline simulates a tiny CI run: +// +// 1. `write-files` (alpine:3.18) → writes `version.txt` + `input.sh` +// to /workspace via `/bin/sh` +// 2. `compute-hash` (busybox:1.36) → reads files from /workspace, +// computes a sha256 with `sha256sum`, +// emits ##wfe[output] lines +// 3. `verify-bash` (alpine:3.18+bash) → uses `set -o pipefail` and +// arrays to verify the hash +// 4. `inject-env` (alpine:3.18) → echoes workflow-data env vars +// (REPO, BRANCH) through outputs +// +// Without the shared volume, step 2 couldn't see step 1's files. Without +// the shell override, step 3 would fail at `set -o pipefail`. Without +// inputs → env mapping, step 4's $REPO would be empty. +#[tokio::test] +async fn multi_step_workflow_with_shared_volume() { + use tokio_util::sync::CancellationToken; + use wfe_core::models::{ + ExecutionPointer, SharedVolume, WorkflowDefinition, WorkflowInstance, WorkflowStep, + }; + use wfe_core::traits::step::{StepBody, StepExecutionContext}; + + let cluster = cluster_config(); + let client = client::create_client(&cluster).await.unwrap(); + + let root_id = unique_id("multistep"); + + // The definition declares a shared /workspace volume. The K8s + // executor reads this from `ctx.definition` and provisions a PVC + // on the first step; every subsequent step in the same namespace + // mounts the same claim. + let definition = WorkflowDefinition { + id: "multistep-ci".into(), + name: Some("Multi-Step Integration Test".into()), + version: 1, + description: None, + steps: vec![], + default_error_behavior: Default::default(), + default_error_retry_interval: None, + services: vec![], + shared_volume: Some(SharedVolume { + mount_path: "/workspace".into(), + size: Some("1Gi".into()), + }), + }; + + // Single WorkflowInstance for the whole pipeline. Each step below + // reuses it so they all share the same namespace (derived from + // root_workflow_id → id fallback) and therefore the same PVC. + let instance = WorkflowInstance { + id: root_id.clone(), + name: "multistep-1".into(), + root_workflow_id: None, + workflow_definition_id: "multistep-ci".into(), + version: 1, + description: None, + reference: None, + execution_pointers: vec![], + next_execution: None, + status: wfe_core::models::WorkflowStatus::Runnable, + data: serde_json::json!({"repo": "wfe", "branch": "mainline"}), + create_time: chrono::Utc::now(), + complete_time: None, + }; + + let ns = crate::namespace::namespace_name(&cluster.namespace_prefix, &root_id); + + // Shared helper to run one step and assert the proceed flag + return + // the captured output JSON. + async fn run_step( + step_cfg: KubernetesStepConfig, + step_name: &str, + instance: &WorkflowInstance, + definition: &WorkflowDefinition, + cluster: &wfe_kubernetes::config::ClusterConfig, + client: &kube::Client, + ) -> serde_json::Value { + let mut step = + wfe_kubernetes::KubernetesStep::new(step_cfg, cluster.clone(), client.clone()); + let mut ws = WorkflowStep::new(0, step_name); + ws.name = Some(step_name.into()); + let pointer = ExecutionPointer::new(0); + + let ctx = StepExecutionContext { + item: None, + execution_pointer: &pointer, + persistence_data: None, + step: &ws, + workflow: instance, + definition: Some(definition), + cancellation_token: CancellationToken::new(), + host_context: None, + log_sink: None, + }; + + let result = step.run(&ctx).await.unwrap_or_else(|e| { + panic!("step '{step_name}' failed: {e}"); + }); + assert!(result.proceed, "step '{step_name}' did not proceed"); + result.output_data.expect("output_data missing") + } + + // The final cleanup call at the bottom of this function handles the + // happy-path teardown. If any assertion below panics the namespace + // will be left behind; the test harness runs `cleanup_stale_namespaces` + // to reap those on the next run. + let _ = &client; // acknowledge unused in guard-less form + + // ── Step 1: write files to /workspace via /bin/sh on alpine ──── + let mut s1 = step_config( + "alpine:3.18", + r###" +mkdir -p /workspace/pipeline +echo "1.9.0-test" > /workspace/pipeline/version.txt +printf 'hello from step 1\n' > /workspace/pipeline/input.sh +ls -la /workspace/pipeline +echo "##wfe[output step1_ok=true]" +"###, + ); + s1.namespace = Some(ns.clone()); + let out1 = run_step(s1, "write-files", &instance, &definition, &cluster, &client).await; + // `true` parses as a JSON boolean in build_output_data, not a string. + assert_eq!(out1["step1_ok"], serde_json::Value::Bool(true)); + + // ── Step 2: read the files written by step 1, hash them ──────── + // Uses a DIFFERENT image (busybox) so we prove cross-image + // /workspace sharing through the PVC, not just container layer + // reuse. sha256sum output is emitted as ##wfe[output hash=...]. + let mut s2 = step_config( + "busybox:1.36", + r###" +cd /workspace/pipeline +test -f version.txt || { echo "version.txt missing" >&2; exit 1; } +test -f input.sh || { echo "input.sh missing" >&2; exit 1; } +HASH=$(sha256sum version.txt | cut -c1-16) +VERSION=$(cat version.txt) +echo "##wfe[output hash=$HASH]" +echo "##wfe[output version=$VERSION]" +"###, + ); + s2.namespace = Some(ns.clone()); + let out2 = run_step( + s2, + "compute-hash", + &instance, + &definition, + &cluster, + &client, + ) + .await; + assert_eq!(out2["version"], "1.9.0-test"); + let hash = out2["hash"].as_str().expect("hash in output"); + assert_eq!(hash.len(), 16, "hash should be 16 hex chars: {hash}"); + assert!( + hash.chars().all(|c| c.is_ascii_hexdigit()), + "hash not hex: {hash}" + ); + + // ── Step 3: bash-only features (pipefail + arrays) ────────────── + // `alpine:3.18` doesn't have bash; use the bash-tagged image and + // explicit shell override to prove the `shell:` config works + // end-to-end. + // Use debian:bookworm-slim — the `bash:5` image on docker hub mangles + // its entrypoint such that `/bin/bash -c