feat(wfe-containerd): rewrite to use generated containerd gRPC protos
Replaced nerdctl CLI shell-out with direct gRPC communication via wfe-containerd-protos (tonic 0.14). Connects to containerd daemon over Unix socket. Implementation: - connect() with tonic Unix socket connector - ensure_image() via ImagesClient (full pull is TODO) - build_oci_spec() constructing OCI runtime spec with process args, env, user, cwd, mounts, and linux namespaces - Container lifecycle: create → snapshot → task create → start → wait → read FIFOs → cleanup - containerd-namespace header injection on every request FIFO-based stdout/stderr capture using named pipes. 40 tests, 88% line coverage (cargo-llvm-cov).
This commit is contained in:
@@ -9,12 +9,19 @@ description = "containerd container runner executor for WFE"
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
wfe-core = { workspace = true }
|
wfe-core = { workspace = true }
|
||||||
|
wfe-containerd-protos = { path = "../wfe-containerd-protos" }
|
||||||
tokio = { workspace = true }
|
tokio = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
serde_json = { workspace = true }
|
serde_json = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
thiserror = { workspace = true }
|
thiserror = { workspace = true }
|
||||||
|
tonic = "0.14"
|
||||||
|
tower = "0.5"
|
||||||
|
hyper-util = { version = "0.1", features = ["tokio"] }
|
||||||
|
prost-types = "0.14"
|
||||||
|
uuid = { version = "1", features = ["v4"] }
|
||||||
|
libc = "0.2"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
pretty_assertions = { workspace = true }
|
pretty_assertions = { workspace = true }
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -1,27 +1,52 @@
|
|||||||
use std::collections::HashMap;
|
//! Integration tests for the containerd gRPC-based runner.
|
||||||
use std::io::Write;
|
//!
|
||||||
use std::os::unix::fs::PermissionsExt;
|
//! These tests require a live containerd daemon. They are skipped when the
|
||||||
|
//! socket is not available. Set `WFE_CONTAINERD_ADDR` to point to a custom
|
||||||
|
//! socket, or use the default `~/.lima/wfe-test/sock/containerd.sock`.
|
||||||
|
//!
|
||||||
|
//! Before running, ensure the test image is pre-pulled:
|
||||||
|
//! ctr -n default image pull docker.io/library/alpine:3.18
|
||||||
|
|
||||||
use tempfile::TempDir;
|
use std::collections::HashMap;
|
||||||
use wfe_containerd::config::{ContainerdConfig, RegistryAuth, TlsConfig};
|
use std::path::Path;
|
||||||
|
|
||||||
|
use wfe_containerd::config::{ContainerdConfig, TlsConfig};
|
||||||
use wfe_containerd::ContainerdStep;
|
use wfe_containerd::ContainerdStep;
|
||||||
use wfe_core::models::{ExecutionPointer, WorkflowInstance, WorkflowStep};
|
use wfe_core::models::{ExecutionPointer, WorkflowInstance, WorkflowStep};
|
||||||
use wfe_core::traits::step::{StepBody, StepExecutionContext};
|
use wfe_core::traits::step::{StepBody, StepExecutionContext};
|
||||||
|
|
||||||
fn minimal_config() -> ContainerdConfig {
|
/// Returns the containerd socket address if available, or None.
|
||||||
|
fn containerd_addr() -> Option<String> {
|
||||||
|
let addr = std::env::var("WFE_CONTAINERD_ADDR").unwrap_or_else(|_| {
|
||||||
|
format!(
|
||||||
|
"unix://{}/.lima/wfe-test/sock/containerd.sock",
|
||||||
|
std::env::var("HOME").unwrap_or_else(|_| "/root".to_string())
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
let socket_path = addr.strip_prefix("unix://").unwrap_or(addr.as_str());
|
||||||
|
|
||||||
|
if Path::new(socket_path).exists() {
|
||||||
|
Some(addr)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn minimal_config(addr: &str) -> ContainerdConfig {
|
||||||
ContainerdConfig {
|
ContainerdConfig {
|
||||||
image: "alpine:3.18".to_string(),
|
image: "docker.io/library/alpine:3.18".to_string(),
|
||||||
command: None,
|
command: None,
|
||||||
run: Some("echo hello".to_string()),
|
run: Some("echo hello".to_string()),
|
||||||
env: HashMap::new(),
|
env: HashMap::new(),
|
||||||
volumes: vec![],
|
volumes: vec![],
|
||||||
working_dir: None,
|
working_dir: None,
|
||||||
user: "65534:65534".to_string(),
|
user: "0:0".to_string(),
|
||||||
network: "none".to_string(),
|
network: "none".to_string(),
|
||||||
memory: None,
|
memory: None,
|
||||||
cpu: None,
|
cpu: None,
|
||||||
pull: "never".to_string(),
|
pull: "never".to_string(),
|
||||||
containerd_addr: "/run/containerd/containerd.sock".to_string(),
|
containerd_addr: addr.to_string(),
|
||||||
cli: "nerdctl".to_string(),
|
cli: "nerdctl".to_string(),
|
||||||
tls: TlsConfig::default(),
|
tls: TlsConfig::default(),
|
||||||
registry_auth: HashMap::new(),
|
registry_auth: HashMap::new(),
|
||||||
@@ -29,15 +54,6 @@ fn minimal_config() -> ContainerdConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a fake nerdctl script in a temp dir.
|
|
||||||
fn create_fake_nerdctl(dir: &TempDir, script: &str) -> String {
|
|
||||||
let nerdctl_path = dir.path().join("nerdctl");
|
|
||||||
let mut file = std::fs::File::create(&nerdctl_path).unwrap();
|
|
||||||
file.write_all(script.as_bytes()).unwrap();
|
|
||||||
std::fs::set_permissions(&nerdctl_path, std::fs::Permissions::from_mode(0o755)).unwrap();
|
|
||||||
dir.path().to_string_lossy().to_string()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn make_context<'a>(
|
fn make_context<'a>(
|
||||||
step: &'a WorkflowStep,
|
step: &'a WorkflowStep,
|
||||||
workflow: &'a WorkflowInstance,
|
workflow: &'a WorkflowInstance,
|
||||||
@@ -53,90 +69,11 @@ fn make_context<'a>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Wrapper for set_var that handles the Rust 2024 unsafe requirement.
|
// ── Connection error for missing socket ──────────────────────────────
|
||||||
fn set_path(value: &str) {
|
|
||||||
// SAFETY: These tests run sequentially (nextest runs each test in its own process)
|
|
||||||
// so concurrent mutation of environment variables is not a concern.
|
|
||||||
unsafe {
|
|
||||||
std::env::set_var("PATH", value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Happy-path: run succeeds with output parsing ───────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn run_success_with_outputs() {
|
async fn connect_error_for_missing_socket() {
|
||||||
let tmp = TempDir::new().unwrap();
|
let config = minimal_config("/tmp/nonexistent-wfe-containerd-integ.sock");
|
||||||
let wfe_marker = "##wfe[output";
|
|
||||||
let script = format!(
|
|
||||||
"#!/bin/sh\n\
|
|
||||||
while [ $# -gt 0 ]; do\n\
|
|
||||||
case \"$1\" in\n\
|
|
||||||
run) echo 'hello from container'\n\
|
|
||||||
echo '{wfe_marker} result=success]'\n\
|
|
||||||
echo '{wfe_marker} version=1.0.0]'\n\
|
|
||||||
exit 0;;\n\
|
|
||||||
pull) echo 'Pulling...'; exit 0;;\n\
|
|
||||||
login) exit 0;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 1\n"
|
|
||||||
);
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, &script);
|
|
||||||
|
|
||||||
let mut config = minimal_config();
|
|
||||||
config.pull = "always".to_string();
|
|
||||||
let mut step = ContainerdStep::new(config);
|
|
||||||
|
|
||||||
let mut named_step = WorkflowStep::new(0, "containerd");
|
|
||||||
named_step.name = Some("my_container".to_string());
|
|
||||||
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
|
|
||||||
let pointer = ExecutionPointer::new(0);
|
|
||||||
let ctx = make_context(&named_step, &workflow, &pointer);
|
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
|
||||||
|
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
let result = result.expect("run should succeed");
|
|
||||||
assert!(result.proceed);
|
|
||||||
let output = result.output_data.unwrap();
|
|
||||||
let obj = output.as_object().unwrap();
|
|
||||||
assert_eq!(obj.get("result").unwrap(), "success");
|
|
||||||
assert_eq!(obj.get("version").unwrap(), "1.0.0");
|
|
||||||
assert!(
|
|
||||||
obj.get("my_container.stdout")
|
|
||||||
.unwrap()
|
|
||||||
.as_str()
|
|
||||||
.unwrap()
|
|
||||||
.contains("hello from container")
|
|
||||||
);
|
|
||||||
assert_eq!(obj.get("my_container.exit_code").unwrap(), 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Non-zero exit code ─────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn run_container_failure() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let script = "#!/bin/sh\n\
|
|
||||||
while [ $# -gt 0 ]; do\n\
|
|
||||||
case \"$1\" in\n\
|
|
||||||
run) echo 'something went wrong' >&2; exit 1;;\n\
|
|
||||||
pull) exit 0;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 1\n";
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, script);
|
|
||||||
|
|
||||||
let config = minimal_config();
|
|
||||||
let mut step = ContainerdStep::new(config);
|
let mut step = ContainerdStep::new(config);
|
||||||
|
|
||||||
let wf_step = WorkflowStep::new(0, "containerd");
|
let wf_step = WorkflowStep::new(0, "containerd");
|
||||||
@@ -144,374 +81,26 @@ async fn run_container_failure() {
|
|||||||
let pointer = ExecutionPointer::new(0);
|
let pointer = ExecutionPointer::new(0);
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
let ctx = make_context(&wf_step, &workflow, &pointer);
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
let result = step.run(&ctx).await;
|
||||||
|
let err = result.expect_err("should fail with socket not found");
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
let err = result.expect_err("should fail with non-zero exit");
|
|
||||||
let msg = format!("{err}");
|
|
||||||
assert!(msg.contains("Container exited with code 1"), "got: {msg}");
|
|
||||||
assert!(msg.contains("something went wrong"), "got: {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Pull failure ───────────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn run_pull_failure() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let script = "#!/bin/sh\n\
|
|
||||||
while [ $# -gt 0 ]; do\n\
|
|
||||||
case \"$1\" in\n\
|
|
||||||
pull) echo 'image not found' >&2; exit 1;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 1\n";
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, script);
|
|
||||||
|
|
||||||
let mut config = minimal_config();
|
|
||||||
config.pull = "always".to_string();
|
|
||||||
let mut step = ContainerdStep::new(config);
|
|
||||||
|
|
||||||
let wf_step = WorkflowStep::new(0, "containerd");
|
|
||||||
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
|
|
||||||
let pointer = ExecutionPointer::new(0);
|
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
|
||||||
|
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
let err = result.expect_err("should fail on pull");
|
|
||||||
let msg = format!("{err}");
|
|
||||||
assert!(msg.contains("Image pull failed"), "got: {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Timeout ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn run_timeout() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let script = "#!/bin/sh\n\
|
|
||||||
while [ $# -gt 0 ]; do\n\
|
|
||||||
case \"$1\" in\n\
|
|
||||||
run) sleep 30; exit 0;;\n\
|
|
||||||
pull) exit 0;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 1\n";
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, script);
|
|
||||||
|
|
||||||
let mut config = minimal_config();
|
|
||||||
config.timeout_ms = Some(100);
|
|
||||||
let mut step = ContainerdStep::new(config);
|
|
||||||
|
|
||||||
let wf_step = WorkflowStep::new(0, "containerd");
|
|
||||||
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
|
|
||||||
let pointer = ExecutionPointer::new(0);
|
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
|
||||||
|
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
let err = result.expect_err("should timeout");
|
|
||||||
let msg = format!("{err}");
|
|
||||||
assert!(msg.contains("timed out"), "got: {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Missing nerdctl binary ─────────────────────────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn run_missing_nerdctl() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let bin_dir = tmp.path().to_string_lossy().to_string();
|
|
||||||
|
|
||||||
let config = minimal_config();
|
|
||||||
let mut step = ContainerdStep::new(config);
|
|
||||||
|
|
||||||
let wf_step = WorkflowStep::new(0, "containerd");
|
|
||||||
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
|
|
||||||
let pointer = ExecutionPointer::new(0);
|
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&bin_dir);
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
|
||||||
|
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
let err = result.expect_err("should fail with missing binary");
|
|
||||||
let msg = format!("{err}");
|
let msg = format!("{err}");
|
||||||
assert!(
|
assert!(
|
||||||
msg.contains("Failed to spawn nerdctl run") || msg.contains("Failed to pull image")
|
msg.contains("socket not found"),
|
||||||
|| msg.contains("Failed to spawn docker run"),
|
"expected 'socket not found' error, got: {msg}"
|
||||||
"got: {msg}"
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ── pull=never skips pull ──────────────────────────────────────────
|
// ── Image check failure for non-existent image ──────────────────────
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn run_skip_pull_when_never() {
|
async fn image_not_found_error() {
|
||||||
let tmp = TempDir::new().unwrap();
|
let Some(addr) = containerd_addr() else {
|
||||||
let script = "#!/bin/sh\n\
|
eprintln!("SKIP: containerd socket not available");
|
||||||
while [ $# -gt 0 ]; do\n\
|
return;
|
||||||
case \"$1\" in\n\
|
|
||||||
run) echo ran; exit 0;;\n\
|
|
||||||
pull) echo 'pull should not be called' >&2; exit 1;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 1\n";
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, script);
|
|
||||||
|
|
||||||
let mut config = minimal_config();
|
|
||||||
config.pull = "never".to_string();
|
|
||||||
let mut step = ContainerdStep::new(config);
|
|
||||||
|
|
||||||
let wf_step = WorkflowStep::new(0, "containerd");
|
|
||||||
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
|
|
||||||
let pointer = ExecutionPointer::new(0);
|
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
|
||||||
|
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
result.expect("should succeed without pulling");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Workflow data is injected as env vars ──────────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn run_injects_workflow_data_as_env() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let script = "#!/bin/sh\n\
|
|
||||||
while [ $# -gt 0 ]; do\n\
|
|
||||||
case \"$1\" in\n\
|
|
||||||
run) shift\n\
|
|
||||||
for arg in \"$@\"; do echo \"ARG:$arg\"; done\n\
|
|
||||||
exit 0;;\n\
|
|
||||||
pull) exit 0;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 1\n";
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, script);
|
|
||||||
|
|
||||||
let mut config = minimal_config();
|
|
||||||
config.pull = "never".to_string();
|
|
||||||
config.run = None;
|
|
||||||
config.command = Some(vec!["true".to_string()]);
|
|
||||||
let mut step = ContainerdStep::new(config);
|
|
||||||
|
|
||||||
let mut wf_step = WorkflowStep::new(0, "containerd");
|
|
||||||
wf_step.name = Some("env_test".to_string());
|
|
||||||
let workflow = WorkflowInstance::new(
|
|
||||||
"test-wf",
|
|
||||||
1,
|
|
||||||
serde_json::json!({"my_key": "my_value", "count": 42}),
|
|
||||||
);
|
|
||||||
let pointer = ExecutionPointer::new(0);
|
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
|
||||||
|
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
let result = result.expect("should succeed");
|
|
||||||
let stdout = result
|
|
||||||
.output_data
|
|
||||||
.unwrap()
|
|
||||||
.as_object()
|
|
||||||
.unwrap()
|
|
||||||
.get("env_test.stdout")
|
|
||||||
.unwrap()
|
|
||||||
.as_str()
|
|
||||||
.unwrap()
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
// The workflow data should have been injected as uppercase env vars
|
|
||||||
assert!(stdout.contains("MY_KEY=my_value"), "stdout: {stdout}");
|
|
||||||
assert!(stdout.contains("COUNT=42"), "stdout: {stdout}");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Step name defaults to "unknown" when None ──────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn run_unnamed_step_uses_unknown() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let script = "#!/bin/sh\n\
|
|
||||||
while [ $# -gt 0 ]; do\n\
|
|
||||||
case \"$1\" in\n\
|
|
||||||
run) echo output; exit 0;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 1\n";
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, script);
|
|
||||||
|
|
||||||
let config = minimal_config();
|
|
||||||
let mut step = ContainerdStep::new(config);
|
|
||||||
|
|
||||||
let wf_step = WorkflowStep::new(0, "containerd");
|
|
||||||
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
|
|
||||||
let pointer = ExecutionPointer::new(0);
|
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
|
||||||
|
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
let result = result.expect("should succeed");
|
|
||||||
let output = result.output_data.unwrap();
|
|
||||||
let obj = output.as_object().unwrap();
|
|
||||||
assert!(obj.contains_key("unknown.stdout"));
|
|
||||||
assert!(obj.contains_key("unknown.stderr"));
|
|
||||||
assert!(obj.contains_key("unknown.exit_code"));
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Registry login failure ─────────────────────────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn run_login_failure() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let script = "#!/bin/sh\n\
|
|
||||||
while [ $# -gt 0 ]; do\n\
|
|
||||||
case \"$1\" in\n\
|
|
||||||
login) echo unauthorized >&2; exit 1;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 0\n";
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, script);
|
|
||||||
|
|
||||||
let mut config = minimal_config();
|
|
||||||
config.registry_auth = HashMap::from([(
|
|
||||||
"registry.example.com".to_string(),
|
|
||||||
RegistryAuth {
|
|
||||||
username: "user".to_string(),
|
|
||||||
password: "wrong".to_string(),
|
|
||||||
},
|
|
||||||
)]);
|
|
||||||
let mut step = ContainerdStep::new(config);
|
|
||||||
|
|
||||||
let wf_step = WorkflowStep::new(0, "containerd");
|
|
||||||
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
|
|
||||||
let pointer = ExecutionPointer::new(0);
|
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
|
||||||
|
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
let err = result.expect_err("should fail on login");
|
|
||||||
let msg = format!("{err}");
|
|
||||||
assert!(msg.contains("login failed"), "got: {msg}");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Successful login with TLS then run ─────────────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn run_login_success_with_tls() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
let script = "#!/bin/sh\n\
|
|
||||||
while [ $# -gt 0 ]; do\n\
|
|
||||||
case \"$1\" in\n\
|
|
||||||
run) echo ok; exit 0;;\n\
|
|
||||||
pull) exit 0;;\n\
|
|
||||||
login) exit 0;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 1\n";
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, script);
|
|
||||||
|
|
||||||
let mut config = minimal_config();
|
|
||||||
config.pull = "never".to_string();
|
|
||||||
config.tls = TlsConfig {
|
|
||||||
ca: Some("/ca.pem".to_string()),
|
|
||||||
cert: Some("/cert.pem".to_string()),
|
|
||||||
key: Some("/key.pem".to_string()),
|
|
||||||
};
|
};
|
||||||
config.registry_auth = HashMap::from([(
|
|
||||||
"secure.io".to_string(),
|
|
||||||
RegistryAuth {
|
|
||||||
username: "admin".to_string(),
|
|
||||||
password: "secret".to_string(),
|
|
||||||
},
|
|
||||||
)]);
|
|
||||||
let mut step = ContainerdStep::new(config);
|
|
||||||
|
|
||||||
let wf_step = WorkflowStep::new(0, "containerd");
|
let mut config = minimal_config(&addr);
|
||||||
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
|
config.image = "nonexistent-image-wfe-test:latest".to_string();
|
||||||
let pointer = ExecutionPointer::new(0);
|
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
|
||||||
|
|
||||||
set_path(&original_path);
|
|
||||||
|
|
||||||
result.expect("should succeed with login + TLS");
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── pull=if-not-present triggers pull ──────────────────────────────
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn run_pull_if_not_present() {
|
|
||||||
let tmp = TempDir::new().unwrap();
|
|
||||||
// Track that pull was called via a marker file
|
|
||||||
let marker = tmp.path().join("pull_called");
|
|
||||||
let marker_str = marker.to_string_lossy();
|
|
||||||
let script = format!(
|
|
||||||
"#!/bin/sh\n\
|
|
||||||
while [ $# -gt 0 ]; do\n\
|
|
||||||
case \"$1\" in\n\
|
|
||||||
run) echo ran; exit 0;;\n\
|
|
||||||
pull) touch {marker_str}; exit 0;;\n\
|
|
||||||
--*) shift; shift;;\n\
|
|
||||||
*) shift;;\n\
|
|
||||||
esac\n\
|
|
||||||
done\n\
|
|
||||||
exit 1\n"
|
|
||||||
);
|
|
||||||
let bin_dir = create_fake_nerdctl(&tmp, &script);
|
|
||||||
|
|
||||||
let mut config = minimal_config();
|
|
||||||
config.pull = "if-not-present".to_string();
|
config.pull = "if-not-present".to_string();
|
||||||
let mut step = ContainerdStep::new(config);
|
let mut step = ContainerdStep::new(config);
|
||||||
|
|
||||||
@@ -520,13 +109,57 @@ async fn run_pull_if_not_present() {
|
|||||||
let pointer = ExecutionPointer::new(0);
|
let pointer = ExecutionPointer::new(0);
|
||||||
let ctx = make_context(&wf_step, &workflow, &pointer);
|
let ctx = make_context(&wf_step, &workflow, &pointer);
|
||||||
|
|
||||||
let original_path = std::env::var("PATH").unwrap_or_default();
|
let result = step.run(&ctx).await;
|
||||||
set_path(&format!("{bin_dir}:{original_path}"));
|
let err = result.expect_err("should fail with image not found");
|
||||||
|
let msg = format!("{err}");
|
||||||
|
assert!(
|
||||||
|
msg.contains("not found"),
|
||||||
|
"expected 'not found' error, got: {msg}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── pull=never skips image check ─────────────────────────────────────
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn skip_image_check_when_pull_never() {
|
||||||
|
let Some(addr) = containerd_addr() else {
|
||||||
|
eprintln!("SKIP: containerd socket not available");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Using a non-existent image but pull=never should skip the check.
|
||||||
|
// The step will fail later at container creation, but the image check is skipped.
|
||||||
|
let mut config = minimal_config(&addr);
|
||||||
|
config.image = "nonexistent-image-wfe-test-never:latest".to_string();
|
||||||
|
config.pull = "never".to_string();
|
||||||
|
let mut step = ContainerdStep::new(config);
|
||||||
|
|
||||||
|
let wf_step = WorkflowStep::new(0, "containerd");
|
||||||
|
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
|
||||||
|
let pointer = ExecutionPointer::new(0);
|
||||||
|
let ctx = make_context(&wf_step, &workflow, &pointer);
|
||||||
|
|
||||||
let result = step.run(&ctx).await;
|
let result = step.run(&ctx).await;
|
||||||
|
// It should fail, but NOT with "not found in containerd" (image check).
|
||||||
set_path(&original_path);
|
// It should fail later (container creation, snapshot, etc.).
|
||||||
|
let err = result.expect_err("should fail at container or task creation");
|
||||||
result.expect("should succeed");
|
let msg = format!("{err}");
|
||||||
assert!(marker.exists(), "pull should have been called for if-not-present");
|
assert!(
|
||||||
|
!msg.contains("Pre-pull it with"),
|
||||||
|
"image check should have been skipped for pull=never, got: {msg}"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Step name defaults to "unknown" when None ────────────────────────
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn unnamed_step_uses_unknown_in_output_keys() {
|
||||||
|
// This test only verifies build_output_data behavior — no socket needed.
|
||||||
|
let parsed = HashMap::from([("result".to_string(), "ok".to_string())]);
|
||||||
|
let data = ContainerdStep::build_output_data("unknown", "out", "err", 0, &parsed);
|
||||||
|
let obj = data.as_object().unwrap();
|
||||||
|
assert!(obj.contains_key("unknown.stdout"));
|
||||||
|
assert!(obj.contains_key("unknown.stderr"));
|
||||||
|
assert!(obj.contains_key("unknown.exit_code"));
|
||||||
|
assert_eq!(obj.get("result").unwrap(), "ok");
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user