feat(wfe-buildkit, wfe-containerd): add container executor crates

Standalone workspace crates for BuildKit image building and containerd
container execution. Config types, YAML schema integration, compiler
dispatch, validation rules, and mock-based unit tests.

Current implementation shells out to buildctl/nerdctl — will be
replaced with proper gRPC clients (buildkit-client, containerd protos)
in a follow-up. Config types, YAML integration, and test infrastructure
are stable and reusable.

wfe-buildkit: 60 tests, 97.9% library coverage
wfe-containerd: 61 tests, 97.8% library coverage
447 total workspace tests.
This commit is contained in:
2026-03-26 10:28:53 +00:00
parent d4519e862f
commit 30b26ca5f0
15 changed files with 3056 additions and 51 deletions

View File

@@ -1,5 +1,5 @@
[workspace]
members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe", "wfe-yaml"]
members = ["wfe-core", "wfe-sqlite", "wfe-postgres", "wfe-opensearch", "wfe-valkey", "wfe", "wfe-yaml", "wfe-buildkit", "wfe-containerd"]
resolver = "2"
[workspace.package]
@@ -44,6 +44,8 @@ wfe-postgres = { version = "1.0.0", path = "wfe-postgres" }
wfe-opensearch = { version = "1.0.0", path = "wfe-opensearch" }
wfe-valkey = { version = "1.0.0", path = "wfe-valkey" }
wfe-yaml = { version = "1.0.0", path = "wfe-yaml" }
wfe-buildkit = { version = "1.0.0", path = "wfe-buildkit" }
wfe-containerd = { version = "1.0.0", path = "wfe-containerd" }
# YAML
serde_yaml = "0.9"

View File

@@ -19,4 +19,6 @@ regex = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tokio-util = "0.7"

View File

@@ -175,4 +175,184 @@ mod tests {
assert_eq!(auth.username, deserialized.username);
assert_eq!(auth.password, deserialized.password);
}
// Deserialization of individually-optional BuildkitConfig fields.

#[test]
fn serde_custom_addr() {
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"buildkit_addr": "tcp://remote:1234"
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert_eq!(cfg.buildkit_addr, "tcp://remote:1234");
}

#[test]
fn serde_with_timeout() {
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"timeout_ms": 60000
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert_eq!(cfg.timeout_ms, Some(60000));
}

#[test]
fn serde_with_tags_and_push() {
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"tags": ["myapp:latest", "myapp:v1.0"],
"push": true
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert!(cfg.push);
    assert_eq!(cfg.tags, vec!["myapp:latest", "myapp:v1.0"]);
}

#[test]
fn serde_with_build_args() {
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"build_args": {"VERSION": "1.0", "DEBUG": "false"}
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert_eq!(cfg.build_args.len(), 2);
    assert_eq!(cfg.build_args["VERSION"], "1.0");
    assert_eq!(cfg.build_args["DEBUG"], "false");
}

#[test]
fn serde_with_cache_config() {
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"cache_from": ["type=registry,ref=cache:latest"],
"cache_to": ["type=registry,ref=cache:latest,mode=max"]
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert_eq!(cfg.cache_from.len(), 1);
    assert_eq!(cfg.cache_to.len(), 1);
}

#[test]
fn serde_with_output_type() {
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"output_type": "tar"
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert_eq!(cfg.output_type.as_deref(), Some("tar"));
}
// Deserialization of nested registry-auth and TLS sub-objects.

#[test]
fn serde_with_registry_auth() {
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"registry_auth": {
"ghcr.io": {"username": "bot", "password": "tok"},
"docker.io": {"username": "u", "password": "p"}
}
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert_eq!(cfg.registry_auth.len(), 2);
    assert_eq!(cfg.registry_auth["ghcr.io"].username, "bot");
    assert_eq!(cfg.registry_auth["docker.io"].password, "p");
}

#[test]
fn serde_with_tls() {
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"tls": {
"ca": "/certs/ca.pem",
"cert": "/certs/cert.pem",
"key": "/certs/key.pem"
}
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert_eq!(cfg.tls.ca.as_deref(), Some("/certs/ca.pem"));
    assert_eq!(cfg.tls.cert.as_deref(), Some("/certs/cert.pem"));
    assert_eq!(cfg.tls.key.as_deref(), Some("/certs/key.pem"));
}

#[test]
fn serde_partial_tls() {
    // Omitted TLS members should default to None, not fail deserialization.
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"tls": {"ca": "/certs/ca.pem"}
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert_eq!(cfg.tls.ca.as_deref(), Some("/certs/ca.pem"));
    assert_eq!(cfg.tls.cert, None);
    assert_eq!(cfg.tls.key, None);
}

#[test]
fn serde_empty_tls_object() {
    let raw = r#"{
"dockerfile": "Dockerfile",
"context": ".",
"tls": {}
}"#;
    let cfg = serde_json::from_str::<BuildkitConfig>(raw).unwrap();
    assert_eq!(cfg.tls.ca, None);
    assert_eq!(cfg.tls.cert, None);
    assert_eq!(cfg.tls.key, None);
}
// Smoke tests for the derived Clone/Debug implementations.

#[test]
fn tls_config_clone() {
    let original = TlsConfig {
        ca: Some("ca".to_string()),
        cert: Some("cert".to_string()),
        key: Some("key".to_string()),
    };
    let copy = original.clone();
    assert_eq!(original.ca, copy.ca);
    assert_eq!(original.cert, copy.cert);
    assert_eq!(original.key, copy.key);
}

#[test]
fn tls_config_debug() {
    let rendered = format!("{:?}", TlsConfig::default());
    assert!(rendered.contains("TlsConfig"));
}

#[test]
fn buildkit_config_debug() {
    let cfg: BuildkitConfig =
        serde_json::from_str(r#"{"dockerfile": "Dockerfile", "context": "."}"#).unwrap();
    assert!(format!("{:?}", cfg).contains("BuildkitConfig"));
}

#[test]
fn registry_auth_clone() {
    let auth = RegistryAuth {
        username: "u".to_string(),
        password: "p".to_string(),
    };
    let copy = auth.clone();
    assert_eq!(auth.username, copy.username);
    assert_eq!(auth.password, copy.password);
}

#[test]
fn buildkit_config_clone() {
    let cfg: BuildkitConfig =
        serde_json::from_str(r#"{"dockerfile": "Dockerfile", "context": "."}"#).unwrap();
    let copy = cfg.clone();
    assert_eq!(cfg.dockerfile, copy.dockerfile);
    assert_eq!(cfg.context, copy.context);
    assert_eq!(cfg.buildkit_addr, copy.buildkit_addr);
}
}

View File

@@ -2,4 +2,4 @@ pub mod config;
pub mod step;
pub use config::{BuildkitConfig, RegistryAuth, TlsConfig};
pub use step::BuildkitStep;
pub use step::{build_output_data, parse_digest, BuildkitStep};

View File

@@ -151,6 +151,49 @@ pub fn parse_digest(output: &str) -> Option<String> {
.map(|caps| format!("sha256:{}", &caps[1]))
}
/// Build the output data JSON object from step execution results.
///
/// Assembles a `serde_json::Value::Object` containing the step's stdout,
/// stderr, digest (if found), and tags (if any).
pub fn build_output_data(
step_name: &str,
stdout: &str,
stderr: &str,
digest: Option<&str>,
tags: &[String],
) -> serde_json::Value {
let mut outputs = serde_json::Map::new();
if let Some(digest) = digest {
outputs.insert(
format!("{step_name}.digest"),
serde_json::Value::String(digest.to_string()),
);
}
if !tags.is_empty() {
outputs.insert(
format!("{step_name}.tags"),
serde_json::Value::Array(
tags.iter()
.map(|t| serde_json::Value::String(t.clone()))
.collect(),
),
);
}
outputs.insert(
format!("{step_name}.stdout"),
serde_json::Value::String(stdout.to_string()),
);
outputs.insert(
format!("{step_name}.stderr"),
serde_json::Value::String(stderr.to_string()),
);
serde_json::Value::Object(outputs)
}
#[async_trait]
impl StepBody for BuildkitStep {
async fn run(
@@ -208,40 +251,17 @@ impl StepBody for BuildkitStep {
let combined_output = format!("{stdout}\n{stderr}");
let digest = parse_digest(&combined_output);
let mut outputs = serde_json::Map::new();
if let Some(ref digest) = digest {
outputs.insert(
format!("{step_name}.digest"),
serde_json::Value::String(digest.clone()),
);
}
if !self.config.tags.is_empty() {
outputs.insert(
format!("{step_name}.tags"),
serde_json::Value::Array(
self.config
.tags
.iter()
.map(|t| serde_json::Value::String(t.clone()))
.collect(),
),
);
}
outputs.insert(
format!("{step_name}.stdout"),
serde_json::Value::String(stdout),
);
outputs.insert(
format!("{step_name}.stderr"),
serde_json::Value::String(stderr),
let output_data = build_output_data(
step_name,
&stdout,
&stderr,
digest.as_deref(),
&self.config.tags,
);
Ok(ExecutionResult {
proceed: true,
output_data: Some(serde_json::Value::Object(outputs)),
output_data: Some(output_data),
..Default::default()
})
}
@@ -273,6 +293,10 @@ mod tests {
}
}
// ---------------------------------------------------------------
// build_command tests
// ---------------------------------------------------------------
#[test]
fn build_command_minimal() {
let step = BuildkitStep::new(minimal_config());
@@ -320,6 +344,22 @@ mod tests {
);
}
#[test]
fn build_command_tags_no_push() {
    // Tags without push=true must still produce an image output, marked push=false.
    let mut cfg = minimal_config();
    cfg.tags = vec!["myapp:latest".to_string()];
    cfg.push = false;
    let cmd = BuildkitStep::new(cfg).build_command();
    let idx = cmd.iter().position(|a| a == "--output").unwrap();
    assert_eq!(cmd[idx + 1], "type=image,name=myapp:latest,push=false");
}
#[test]
fn build_command_with_build_args() {
let mut config = minimal_config();
@@ -367,6 +407,40 @@ mod tests {
);
}
#[test]
fn build_command_with_multiple_cache_sources() {
    // Each cache_from/cache_to entry should emit its own flag, in order.
    let mut cfg = minimal_config();
    cfg.cache_from = vec![
        "type=registry,ref=myapp:cache".to_string(),
        "type=local,src=/tmp/cache".to_string(),
    ];
    cfg.cache_to = vec![
        "type=registry,ref=myapp:cache,mode=max".to_string(),
        "type=local,dest=/tmp/cache".to_string(),
    ];
    let cmd = BuildkitStep::new(cfg).build_command();
    let positions_of = |flag: &str| -> Vec<usize> {
        cmd.iter()
            .enumerate()
            .filter_map(|(i, a)| (a.as_str() == flag).then_some(i))
            .collect()
    };
    let imports = positions_of("--import-cache");
    assert_eq!(imports.len(), 2);
    assert_eq!(cmd[imports[0] + 1], "type=registry,ref=myapp:cache");
    assert_eq!(cmd[imports[1] + 1], "type=local,src=/tmp/cache");
    assert_eq!(positions_of("--export-cache").len(), 2);
}
#[test]
fn build_command_with_tls() {
let mut config = minimal_config();
@@ -395,6 +469,23 @@ mod tests {
assert!(key_idx < build_idx);
}
#[test]
fn build_command_with_partial_tls() {
    // Only the flags for populated TLS members should appear on the command line.
    let mut cfg = minimal_config();
    cfg.tls = TlsConfig {
        ca: Some("/certs/ca.pem".to_string()),
        cert: None,
        key: None,
    };
    let cmd = BuildkitStep::new(cfg).build_command();
    assert!(cmd.iter().any(|a| a == "--tlscacert"));
    assert!(!cmd.iter().any(|a| a == "--tlscert"));
    assert!(!cmd.iter().any(|a| a == "--tlskey"));
}
#[test]
fn build_command_with_registry_auth() {
let mut config = minimal_config();
@@ -451,6 +542,135 @@ mod tests {
assert_eq!(cmd[output_idx + 1], "type=local");
}
#[test]
fn build_command_output_type_tar() {
    let mut cfg = minimal_config();
    cfg.output_type = Some("tar".to_string());
    let cmd = BuildkitStep::new(cfg).build_command();
    let idx = cmd.iter().position(|a| a == "--output").unwrap();
    assert_eq!(cmd[idx + 1], "type=tar");
}

#[test]
fn build_command_dockerfile_at_root() {
    // A bare "Dockerfile" filename (no path component) should yield a "."
    // dockerfile directory and no filename opt, since "Dockerfile" is the default.
    let cmd = BuildkitStep::new(minimal_config()).build_command();
    assert!(cmd.iter().any(|a| a == "dockerfile=."));
    assert!(!cmd.iter().any(|a| a.starts_with("filename=")));
}

#[test]
fn build_command_custom_addr() {
    let mut cfg = minimal_config();
    cfg.buildkit_addr = "tcp://buildkitd:1234".to_string();
    let cmd = BuildkitStep::new(cfg).build_command();
    // --addr and its value come immediately after the subcommand.
    assert_eq!(cmd[1..3], ["--addr", "tcp://buildkitd:1234"]);
}

#[test]
fn build_command_all_options_combined() {
    let mut cfg = minimal_config();
    cfg.buildkit_addr = "tcp://remote:9999".to_string();
    cfg.dockerfile = "ci/Dockerfile.ci".to_string();
    cfg.context = "/workspace".to_string();
    cfg.target = Some("final".to_string());
    cfg.tags = vec!["img:v1".to_string()];
    cfg.push = true;
    cfg.build_args.insert("A".to_string(), "1".to_string());
    cfg.cache_from = vec!["type=local,src=/c".to_string()];
    cfg.cache_to = vec!["type=local,dest=/c".to_string()];
    cfg.tls = TlsConfig {
        ca: Some("ca".to_string()),
        cert: Some("cert".to_string()),
        key: Some("key".to_string()),
    };
    let cmd = BuildkitStep::new(cfg).build_command();
    // Spot-check that every configured option surfaced somewhere in the argv.
    let has = |needle: &str| cmd.iter().any(|a| a == needle);
    assert!(has("tcp://remote:9999"));
    assert!(has("context=/workspace"));
    assert!(has("dockerfile=ci"));
    assert!(has("filename=Dockerfile.ci"));
    assert!(has("target=final"));
    assert!(has("build-arg:A=1"));
    assert!(cmd.iter().any(|a| a.starts_with("type=image,name=img:v1,push=true")));
}
// ---------------------------------------------------------------
// build_registry_env tests
// ---------------------------------------------------------------

#[test]
fn build_registry_env_sanitizes_host() {
    // Dots and dashes in the registry hostname become underscores, uppercased.
    let mut cfg = minimal_config();
    cfg.registry_auth.insert(
        "my-registry.example.com".to_string(),
        RegistryAuth {
            username: "u".to_string(),
            password: "p".to_string(),
        },
    );
    let env = BuildkitStep::new(cfg).build_registry_env();
    for key in [
        "BUILDKIT_HOST_MY_REGISTRY_EXAMPLE_COM_USERNAME",
        "BUILDKIT_HOST_MY_REGISTRY_EXAMPLE_COM_PASSWORD",
    ] {
        assert!(env.contains_key(key));
    }
}

#[test]
fn build_registry_env_empty_when_no_auth() {
    let env = BuildkitStep::new(minimal_config()).build_registry_env();
    assert!(env.is_empty());
}

#[test]
fn build_registry_env_multiple_registries() {
    // Two registries => two username/password pairs, four variables total.
    let mut cfg = minimal_config();
    for (host, user, pass) in [
        ("ghcr.io", "gh_user", "gh_pass"),
        ("docker.io", "dh_user", "dh_pass"),
    ] {
        cfg.registry_auth.insert(
            host.to_string(),
            RegistryAuth {
                username: user.to_string(),
                password: pass.to_string(),
            },
        );
    }
    let env = BuildkitStep::new(cfg).build_registry_env();
    assert_eq!(env.len(), 4);
    assert_eq!(env["BUILDKIT_HOST_GHCR_IO_USERNAME"], "gh_user");
    assert_eq!(env["BUILDKIT_HOST_GHCR_IO_PASSWORD"], "gh_pass");
    assert_eq!(env["BUILDKIT_HOST_DOCKER_IO_USERNAME"], "dh_user");
    assert_eq!(env["BUILDKIT_HOST_DOCKER_IO_PASSWORD"], "dh_pass");
}
// ---------------------------------------------------------------
// parse_digest tests
// ---------------------------------------------------------------
#[test]
fn parse_digest_from_output() {
let output = "some build output\nexporting manifest sha256:abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789\ndone";
@@ -492,27 +712,387 @@ mod tests {
}
#[test]
fn build_registry_env_sanitizes_host() {
let mut config = minimal_config();
config.registry_auth.insert(
"my-registry.example.com".to_string(),
RegistryAuth {
username: "u".to_string(),
password: "p".to_string(),
},
);
let step = BuildkitStep::new(config);
let env = step.build_registry_env();
assert!(env.contains_key("BUILDKIT_HOST_MY_REGISTRY_EXAMPLE_COM_USERNAME"));
assert!(env.contains_key("BUILDKIT_HOST_MY_REGISTRY_EXAMPLE_COM_PASSWORD"));
fn parse_digest_empty_input() {
assert_eq!(parse_digest(""), None);
}
#[test]
fn build_registry_env_empty_when_no_auth() {
let step = BuildkitStep::new(minimal_config());
let env = step.build_registry_env();
assert!(env.is_empty());
fn parse_digest_wrong_prefix() {
// Has the hash but without a recognized prefix
let output =
"sha256:abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
assert_eq!(parse_digest(output), None);
}
#[test]
fn parse_digest_uppercase_hex_returns_none() {
    // The digest regex only accepts lowercase hex.
    let output = "exporting manifest sha256:ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789";
    assert!(parse_digest(output).is_none());
}

#[test]
fn parse_digest_multiline_with_noise() {
    // Digest should be extracted from realistic, noisy multi-line output.
    let output = r#"
[+] Building 12.3s (8/8) FINISHED
=> exporting to image
=> exporting manifest sha256:aabbccdd0011223344556677aabbccdd0011223344556677aabbccdd00112233
=> done
"#;
    assert_eq!(
        parse_digest(output).as_deref(),
        Some("sha256:aabbccdd0011223344556677aabbccdd0011223344556677aabbccdd00112233")
    );
}

#[test]
fn parse_digest_first_match_wins() {
    // With two candidate digests in the output, the first one is returned.
    let first = "a".repeat(64);
    let second = "b".repeat(64);
    let output = format!("exporting manifest sha256:{first}\ndigest: sha256:{second}");
    assert_eq!(parse_digest(&output), Some(format!("sha256:{first}")));
}
// ---------------------------------------------------------------
// build_output_data tests
// ---------------------------------------------------------------

#[test]
fn build_output_data_with_digest_and_tags() {
    let digest = "sha256:abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789";
    let tags = vec!["myapp:latest".to_string(), "myapp:v1".to_string()];
    let value = build_output_data("build", "out", "err", Some(digest), &tags);
    let fields = value.as_object().unwrap();
    assert_eq!(fields["build.digest"], digest);
    assert_eq!(
        fields["build.tags"],
        serde_json::json!(["myapp:latest", "myapp:v1"])
    );
    assert_eq!(fields["build.stdout"], "out");
    assert_eq!(fields["build.stderr"], "err");
}

#[test]
fn build_output_data_without_digest() {
    // Neither digest nor tags keys should exist when both are absent.
    let value = build_output_data("step1", "hello", "", None, &[]);
    let fields = value.as_object().unwrap();
    assert!(!fields.contains_key("step1.digest"));
    assert!(!fields.contains_key("step1.tags"));
    assert_eq!(fields["step1.stdout"], "hello");
    assert_eq!(fields["step1.stderr"], "");
}

#[test]
fn build_output_data_with_digest_no_tags() {
    let digest = "sha256:0000000000000000000000000000000000000000000000000000000000000000";
    let value = build_output_data("img", "ok", "warn", Some(digest), &[]);
    let fields = value.as_object().unwrap();
    assert_eq!(fields["img.digest"], digest);
    assert!(!fields.contains_key("img.tags"));
    assert_eq!(fields["img.stdout"], "ok");
    assert_eq!(fields["img.stderr"], "warn");
}

#[test]
fn build_output_data_no_digest_with_tags() {
    let tags = vec!["app:v2".to_string()];
    let value = build_output_data("s", "", "", None, &tags);
    let fields = value.as_object().unwrap();
    assert!(!fields.contains_key("s.digest"));
    assert_eq!(fields["s.tags"], serde_json::json!(["app:v2"]));
}

#[test]
fn build_output_data_empty_strings() {
    // Only stdout/stderr remain; both may legitimately be empty strings.
    let value = build_output_data("x", "", "", None, &[]);
    let fields = value.as_object().unwrap();
    assert_eq!(fields["x.stdout"], "");
    assert_eq!(fields["x.stderr"], "");
    assert_eq!(fields.len(), 2);
}
// ---------------------------------------------------------------
// Integration tests using mock buildctl
// ---------------------------------------------------------------
/// Helper to create a StepExecutionContext for testing.
///
/// Returns the three owned pieces (step, pointer, instance) that the caller
/// borrows when constructing a `StepExecutionContext`; `step.name` is set to
/// `step_name` so output keys are prefixed predictably.
fn make_test_context(
    step_name: &str,
) -> (
    wfe_core::models::WorkflowStep,
    wfe_core::models::ExecutionPointer,
    wfe_core::models::WorkflowInstance,
) {
    let mut step = wfe_core::models::WorkflowStep::new(0, "buildkit");
    step.name = Some(step_name.to_string());
    let pointer = wfe_core::models::ExecutionPointer::new(0);
    let instance =
        wfe_core::models::WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    (step, pointer, instance)
}
/// Write `script` as an executable fake `buildctl` into `dir`.
/// Unix-only: relies on mode bits to make the script runnable.
#[cfg(unix)]
fn write_mock_buildctl(dir: &std::path::Path, script: &str) {
    let path = dir.join("buildctl");
    std::fs::write(&path, script).unwrap();
    use std::os::unix::fs::PermissionsExt;
    // 0o755 so the shell can execute the mock.
    std::fs::set_permissions(&path, std::fs::Permissions::from_mode(0o755)).unwrap();
}
/// Return the current PATH with `prefix` prepended, so binaries in `prefix`
/// shadow the real ones.
#[cfg(unix)]
fn path_with_prefix(prefix: &std::path::Path) -> String {
    let current = std::env::var("PATH").unwrap_or_default();
    format!("{}:{current}", prefix.display())
}
/// Happy path: the mock buildctl prints a manifest digest, so the step should
/// proceed and surface digest, tags, stdout and stderr in its output map.
///
/// NOTE(review): `std::env::set_var` mutates process-global state and Rust
/// runs tests in parallel by default — the PATH-swapping tests here can race
/// each other. Consider serializing them or passing an explicit env to the
/// spawned command instead. TODO confirm.
#[cfg(unix)]
#[tokio::test]
async fn run_with_mock_buildctl_success_with_digest() {
    let tmp = tempfile::tempdir().unwrap();
    let digest_hash = "a".repeat(64);
    let script = format!(
        "#!/bin/sh\necho \"exporting manifest sha256:{digest_hash}\"\nexit 0\n"
    );
    write_mock_buildctl(tmp.path(), &script);
    let mut config = minimal_config();
    config.tags = vec!["myapp:latest".to_string()];
    let mut step = BuildkitStep::new(config);
    let (ws, pointer, instance) = make_test_context("build-img");
    let cancel = tokio_util::sync::CancellationToken::new();
    let ctx = wfe_core::traits::step::StepExecutionContext {
        item: None,
        execution_pointer: &pointer,
        persistence_data: None,
        step: &ws,
        workflow: &instance,
        cancellation_token: cancel,
    };
    // Override PATH so our mock is found first
    let new_path = path_with_prefix(tmp.path());
    unsafe { std::env::set_var("PATH", &new_path) };
    let result = step.run(&ctx).await.unwrap();
    assert!(result.proceed);
    let data = result.output_data.unwrap();
    let obj = data.as_object().unwrap();
    assert_eq!(
        obj["build-img.digest"],
        format!("sha256:{digest_hash}")
    );
    assert_eq!(
        obj["build-img.tags"],
        serde_json::json!(["myapp:latest"])
    );
    assert!(obj.contains_key("build-img.stdout"));
    assert!(obj.contains_key("build-img.stderr"));
}
/// When the mock prints no digest line, the step still proceeds but omits the
/// digest and tags keys from its output map.
/// NOTE(review): relies on the process-global PATH override; see the note on
/// the digest test above about parallel-test races — TODO confirm.
#[cfg(unix)]
#[tokio::test]
async fn run_with_mock_buildctl_success_no_digest() {
    let tmp = tempfile::tempdir().unwrap();
    write_mock_buildctl(tmp.path(), "#!/bin/sh\necho \"build complete\"\nexit 0\n");
    let mut step = BuildkitStep::new(minimal_config());
    let (ws, pointer, instance) = make_test_context("no-digest");
    let cancel = tokio_util::sync::CancellationToken::new();
    let ctx = wfe_core::traits::step::StepExecutionContext {
        item: None,
        execution_pointer: &pointer,
        persistence_data: None,
        step: &ws,
        workflow: &instance,
        cancellation_token: cancel,
    };
    let new_path = path_with_prefix(tmp.path());
    unsafe { std::env::set_var("PATH", &new_path) };
    let result = step.run(&ctx).await.unwrap();
    assert!(result.proceed);
    let data = result.output_data.unwrap();
    let obj = data.as_object().unwrap();
    assert!(!obj.contains_key("no-digest.digest"));
    assert!(!obj.contains_key("no-digest.tags"));
}
/// A non-zero exit from buildctl becomes an error whose message carries both
/// the exit code and the captured stderr text.
#[cfg(unix)]
#[tokio::test]
async fn run_with_mock_buildctl_nonzero_exit() {
    let tmp = tempfile::tempdir().unwrap();
    write_mock_buildctl(
        tmp.path(),
        "#!/bin/sh\necho \"error: something failed\" >&2\nexit 1\n",
    );
    let mut step = BuildkitStep::new(minimal_config());
    let (ws, pointer, instance) = make_test_context("fail-step");
    let cancel = tokio_util::sync::CancellationToken::new();
    let ctx = wfe_core::traits::step::StepExecutionContext {
        item: None,
        execution_pointer: &pointer,
        persistence_data: None,
        step: &ws,
        workflow: &instance,
        cancellation_token: cancel,
    };
    let new_path = path_with_prefix(tmp.path());
    unsafe { std::env::set_var("PATH", &new_path) };
    let err = step.run(&ctx).await.unwrap_err();
    let msg = format!("{err}");
    assert!(msg.contains("exited with code 1"), "got: {msg}");
    assert!(msg.contains("something failed"), "got: {msg}");
}
/// A configured timeout_ms shorter than the mock's sleep must abort the run
/// with a "timed out after ...ms" error.
#[cfg(unix)]
#[tokio::test]
async fn run_with_mock_buildctl_timeout() {
    let tmp = tempfile::tempdir().unwrap();
    write_mock_buildctl(tmp.path(), "#!/bin/sh\nsleep 60\n");
    let mut config = minimal_config();
    config.timeout_ms = Some(100); // 100ms timeout
    let mut step = BuildkitStep::new(config);
    let (ws, pointer, instance) = make_test_context("timeout-step");
    let cancel = tokio_util::sync::CancellationToken::new();
    let ctx = wfe_core::traits::step::StepExecutionContext {
        item: None,
        execution_pointer: &pointer,
        persistence_data: None,
        step: &ws,
        workflow: &instance,
        cancellation_token: cancel,
    };
    let new_path = path_with_prefix(tmp.path());
    unsafe { std::env::set_var("PATH", &new_path) };
    let err = step.run(&ctx).await.unwrap_err();
    let msg = format!("{err}");
    assert!(msg.contains("timed out after 100ms"), "got: {msg}");
}
/// With buildctl absent from PATH entirely, spawning must fail with a
/// descriptive error rather than hanging or panicking.
/// NOTE(review): this test REPLACES PATH (rather than prepending), which is
/// especially prone to racing the other PATH-swapping tests when run in
/// parallel — TODO confirm and consider serializing.
#[cfg(unix)]
#[tokio::test]
async fn run_with_missing_buildctl() {
    // Use a temp dir with no buildctl script and make it the only PATH entry
    let tmp = tempfile::tempdir().unwrap();
    let mut step = BuildkitStep::new(minimal_config());
    let (ws, pointer, instance) = make_test_context("missing");
    let cancel = tokio_util::sync::CancellationToken::new();
    let ctx = wfe_core::traits::step::StepExecutionContext {
        item: None,
        execution_pointer: &pointer,
        persistence_data: None,
        step: &ws,
        workflow: &instance,
        cancellation_token: cancel,
    };
    // Set PATH to empty dir so buildctl is not found
    unsafe { std::env::set_var("PATH", tmp.path()) };
    let err = step.run(&ctx).await.unwrap_err();
    let msg = format!("{err}");
    assert!(
        msg.contains("Failed to spawn buildctl"),
        "got: {msg}"
    );
}
/// The digest may arrive on stderr: the step searches the combined
/// stdout+stderr text, so a digest printed to stderr is still captured.
#[cfg(unix)]
#[tokio::test]
async fn run_with_mock_buildctl_stderr_output() {
    let tmp = tempfile::tempdir().unwrap();
    let digest_hash = "b".repeat(64);
    let script = format!(
        "#!/bin/sh\necho \"stdout line\" \necho \"digest: sha256:{digest_hash}\" >&2\nexit 0\n"
    );
    write_mock_buildctl(tmp.path(), &script);
    let mut config = minimal_config();
    config.tags = vec!["app:v2".to_string()];
    let mut step = BuildkitStep::new(config);
    let (ws, pointer, instance) = make_test_context("stderr-test");
    let cancel = tokio_util::sync::CancellationToken::new();
    let ctx = wfe_core::traits::step::StepExecutionContext {
        item: None,
        execution_pointer: &pointer,
        persistence_data: None,
        step: &ws,
        workflow: &instance,
        cancellation_token: cancel,
    };
    let new_path = path_with_prefix(tmp.path());
    unsafe { std::env::set_var("PATH", &new_path) };
    let result = step.run(&ctx).await.unwrap();
    let data = result.output_data.unwrap();
    let obj = data.as_object().unwrap();
    // Digest should be found from stderr (combined output is searched)
    assert_eq!(
        obj["stderr-test.digest"],
        format!("sha256:{digest_hash}")
    );
}
/// A step with no `name` falls back to the "unknown" prefix for output keys.
#[cfg(unix)]
#[tokio::test]
async fn run_with_unnamed_step_uses_unknown() {
    let tmp = tempfile::tempdir().unwrap();
    write_mock_buildctl(tmp.path(), "#!/bin/sh\necho ok\nexit 0\n");
    let mut step = BuildkitStep::new(minimal_config());
    // Create a step with no name
    let ws = wfe_core::models::WorkflowStep::new(0, "buildkit");
    let pointer = wfe_core::models::ExecutionPointer::new(0);
    let instance =
        wfe_core::models::WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let cancel = tokio_util::sync::CancellationToken::new();
    let ctx = wfe_core::traits::step::StepExecutionContext {
        item: None,
        execution_pointer: &pointer,
        persistence_data: None,
        step: &ws,
        workflow: &instance,
        cancellation_token: cancel,
    };
    let new_path = path_with_prefix(tmp.path());
    unsafe { std::env::set_var("PATH", &new_path) };
    let result = step.run(&ctx).await.unwrap();
    let data = result.output_data.unwrap();
    let obj = data.as_object().unwrap();
    // Should use "unknown" as step name
    assert!(obj.contains_key("unknown.stdout"));
    assert!(obj.contains_key("unknown.stderr"));
}
}

23
wfe-containerd/Cargo.toml Normal file
View File

@@ -0,0 +1,23 @@
[package]
name = "wfe-containerd"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description = "containerd container runner executor for WFE"
[dependencies]
wfe-core = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
tempfile = { workspace = true }
tokio-util = "0.7"

70
wfe-containerd/README.md Normal file
View File

@@ -0,0 +1,70 @@
# wfe-containerd
Containerd container runner executor for WFE.
## What it does
`wfe-containerd` runs containers via `nerdctl` as workflow steps. It pulls images, manages registry authentication, and executes containers with configurable networking, resource limits, volume mounts, and TLS settings. Output is captured and parsed for `##wfe[output key=value]` directives, following the same convention as the shell executor.
## Quick start
Add a containerd step to your YAML workflow:
```yaml
workflow:
id: container-pipeline
version: 1
steps:
- name: run-tests
type: containerd
config:
image: node:20-alpine
run: npm test
network: none
memory: 512m
cpu: "1.0"
timeout: 5m
env:
NODE_ENV: test
volumes:
- source: /workspace
target: /app
readonly: true
```
Enable the feature in `wfe-yaml`:
```toml
[dependencies]
wfe-yaml = { version = "1.0.0", features = ["containerd"] }
```
## Configuration
| Field | Type | Default | Description |
|---|---|---|---|
| `image` | `String` | required | Container image to run |
| `run` | `String` | - | Shell command (uses `sh -c`) |
| `command` | `Vec<String>` | - | Command array (mutually exclusive with `run`) |
| `env` | `HashMap` | `{}` | Environment variables |
| `volumes` | `Vec<VolumeMount>` | `[]` | Volume mounts |
| `working_dir` | `String` | - | Working directory inside container |
| `user` | `String` | `65534:65534` | User/group to run as (nobody by default) |
| `network` | `String` | `none` | Network mode: `none`, `host`, or `bridge` |
| `memory` | `String` | - | Memory limit (e.g. `512m`, `1g`) |
| `cpu` | `String` | - | CPU limit (e.g. `1.0`, `0.5`) |
| `pull` | `String` | `if-not-present` | Pull policy: `always`, `if-not-present`, `never` |
| `containerd_addr` | `String` | `/run/containerd/containerd.sock` | Containerd socket address |
| `tls` | `TlsConfig` | - | TLS configuration for containerd connection |
| `registry_auth` | `HashMap` | `{}` | Registry authentication per registry hostname |
| `cli` | `String` | `nerdctl` | CLI binary used to drive containerd (`nerdctl` or `docker`) |
| `timeout` | `String` | - | Execution timeout (e.g. `30s`, `5m`); stored on the config struct as `timeout_ms` (milliseconds) |
## Output parsing
The step captures stdout and stderr. Lines matching `##wfe[output key=value]` are extracted as workflow outputs. Raw stdout, stderr, and exit code are also available under `{step_name}.stdout`, `{step_name}.stderr`, and `{step_name}.exit_code`.
## Security defaults
- Runs as nobody (`65534:65534`) by default
- Network disabled (`none`) by default
- Containers are always `--rm` (removed after execution)

View File

@@ -0,0 +1,226 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
/// Configuration for a containerd-backed container execution step.
///
/// Deserialized from the step's config; defaults favor safety: run as
/// nobody, no network, pull only when the image is missing locally.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ContainerdConfig {
    /// Container image to run (required).
    pub image: String,
    /// Explicit argv-style command. The crate README documents this as
    /// mutually exclusive with `run` — validation presumably lives in the
    /// step/compiler layer; confirm.
    pub command: Option<Vec<String>>,
    /// Shell command string (README: executed via `sh -c`).
    pub run: Option<String>,
    /// Environment variables passed into the container.
    #[serde(default)]
    pub env: HashMap<String, String>,
    /// Volume mounts from host into the container.
    #[serde(default)]
    pub volumes: Vec<VolumeMountConfig>,
    /// Working directory inside the container.
    pub working_dir: Option<String>,
    /// User/group to run as; defaults to nobody (`65534:65534`).
    #[serde(default = "default_user")]
    pub user: String,
    /// Network mode; defaults to `none` (networking disabled).
    #[serde(default = "default_network")]
    pub network: String,
    /// Memory limit string (e.g. `512m`).
    pub memory: Option<String>,
    /// CPU limit string (e.g. `1.0`).
    pub cpu: Option<String>,
    /// Image pull policy; defaults to `if-not-present`.
    #[serde(default = "default_pull")]
    pub pull: String,
    /// Containerd socket address; defaults to `/run/containerd/containerd.sock`.
    #[serde(default = "default_containerd_addr")]
    pub containerd_addr: String,
    /// CLI binary name: "nerdctl" (default) or "docker".
    #[serde(default = "default_cli")]
    pub cli: String,
    /// TLS settings for the containerd connection.
    #[serde(default)]
    pub tls: TlsConfig,
    /// Per-registry credentials, keyed by registry hostname.
    #[serde(default)]
    pub registry_auth: HashMap<String, RegistryAuth>,
    /// Execution timeout in milliseconds, if any.
    pub timeout_ms: Option<u64>,
}
/// A single volume mount from host into the container.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VolumeMountConfig {
    /// Host path to mount.
    pub source: String,
    /// Mount point inside the container.
    pub target: String,
    /// Mount read-only when true; defaults to false.
    #[serde(default)]
    pub readonly: bool,
}
/// TLS material for the containerd connection; every member is optional so a
/// partially-specified (or empty) `tls` object deserializes cleanly.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TlsConfig {
    /// CA certificate path.
    pub ca: Option<String>,
    /// Client certificate path.
    pub cert: Option<String>,
    /// Client private key path.
    pub key: Option<String>,
}
/// Credentials for a single image registry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegistryAuth {
    /// Registry account name.
    pub username: String,
    /// Registry password or token.
    pub password: String,
}
/// Default `user`: the nobody uid:gid pair.
fn default_user() -> String {
    String::from("65534:65534")
}

/// Default `network`: networking disabled.
fn default_network() -> String {
    String::from("none")
}

/// Default image pull policy.
fn default_pull() -> String {
    String::from("if-not-present")
}

/// Default containerd socket path.
fn default_containerd_addr() -> String {
    String::from("/run/containerd/containerd.sock")
}

/// Default CLI binary used to drive containerd.
fn default_cli() -> String {
    String::from("nerdctl")
}
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// Fully-populated config must survive a JSON serialize → deserialize
    /// round trip with every field intact.
    #[test]
    fn serde_round_trip_full_config() {
        let config = ContainerdConfig {
            image: "alpine:3.18".to_string(),
            command: Some(vec!["echo".to_string(), "hello".to_string()]),
            run: None,
            env: HashMap::from([("FOO".to_string(), "bar".to_string())]),
            volumes: vec![VolumeMountConfig {
                source: "/host/path".to_string(),
                target: "/container/path".to_string(),
                readonly: true,
            }],
            working_dir: Some("/app".to_string()),
            user: "1000:1000".to_string(),
            network: "host".to_string(),
            memory: Some("512m".to_string()),
            cpu: Some("1.0".to_string()),
            pull: "always".to_string(),
            containerd_addr: "/custom/containerd.sock".to_string(),
            cli: "nerdctl".to_string(),
            tls: TlsConfig {
                ca: Some("/ca.pem".to_string()),
                cert: Some("/cert.pem".to_string()),
                key: Some("/key.pem".to_string()),
            },
            registry_auth: HashMap::from([(
                "registry.example.com".to_string(),
                RegistryAuth {
                    username: "user".to_string(),
                    password: "pass".to_string(),
                },
            )]),
            timeout_ms: Some(30000),
        };
        let json = serde_json::to_string(&config).unwrap();
        let deserialized: ContainerdConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.image, config.image);
        assert_eq!(deserialized.command, config.command);
        assert_eq!(deserialized.run, config.run);
        assert_eq!(deserialized.env, config.env);
        assert_eq!(deserialized.volumes.len(), 1);
        assert_eq!(deserialized.volumes[0].source, "/host/path");
        // Plain assert! for booleans (clippy::bool_assert_comparison).
        assert!(deserialized.volumes[0].readonly);
        assert_eq!(deserialized.working_dir, Some("/app".to_string()));
        assert_eq!(deserialized.user, "1000:1000");
        assert_eq!(deserialized.network, "host");
        assert_eq!(deserialized.memory, Some("512m".to_string()));
        assert_eq!(deserialized.cpu, Some("1.0".to_string()));
        assert_eq!(deserialized.pull, "always");
        assert_eq!(deserialized.containerd_addr, "/custom/containerd.sock");
        assert_eq!(deserialized.tls.ca, Some("/ca.pem".to_string()));
        assert_eq!(deserialized.tls.cert, Some("/cert.pem".to_string()));
        assert_eq!(deserialized.tls.key, Some("/key.pem".to_string()));
        assert!(deserialized.registry_auth.contains_key("registry.example.com"));
        assert_eq!(deserialized.timeout_ms, Some(30000));
    }

    /// A config with only `image` set must deserialize using every default.
    #[test]
    fn serde_round_trip_minimal_config() {
        let json = r#"{"image": "alpine:latest"}"#;
        let config: ContainerdConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.image, "alpine:latest");
        assert_eq!(config.command, None);
        assert_eq!(config.run, None);
        assert!(config.env.is_empty());
        assert!(config.volumes.is_empty());
        assert_eq!(config.working_dir, None);
        assert_eq!(config.user, "65534:65534");
        assert_eq!(config.network, "none");
        assert_eq!(config.memory, None);
        assert_eq!(config.cpu, None);
        assert_eq!(config.pull, "if-not-present");
        assert_eq!(config.containerd_addr, "/run/containerd/containerd.sock");
        assert_eq!(config.timeout_ms, None);
        // Round-trip
        let serialized = serde_json::to_string(&config).unwrap();
        let deserialized: ContainerdConfig = serde_json::from_str(&serialized).unwrap();
        assert_eq!(deserialized.image, "alpine:latest");
        assert_eq!(deserialized.user, "65534:65534");
    }

    /// Security-relevant defaults: nobody user, no network, local socket.
    #[test]
    fn default_values() {
        let json = r#"{"image": "busybox"}"#;
        let config: ContainerdConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.user, "65534:65534");
        assert_eq!(config.network, "none");
        assert_eq!(config.pull, "if-not-present");
        assert_eq!(config.containerd_addr, "/run/containerd/containerd.sock");
    }

    /// Volume mounts round-trip for both read-write and read-only mounts.
    #[test]
    fn volume_mount_serde() {
        let vol = VolumeMountConfig {
            source: "/data".to_string(),
            target: "/mnt/data".to_string(),
            readonly: false,
        };
        let json = serde_json::to_string(&vol).unwrap();
        let deserialized: VolumeMountConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.source, "/data");
        assert_eq!(deserialized.target, "/mnt/data");
        assert!(!deserialized.readonly);
        // With readonly=true
        let vol_ro = VolumeMountConfig {
            source: "/src".to_string(),
            target: "/dest".to_string(),
            readonly: true,
        };
        let json_ro = serde_json::to_string(&vol_ro).unwrap();
        let deserialized_ro: VolumeMountConfig = serde_json::from_str(&json_ro).unwrap();
        assert!(deserialized_ro.readonly);
    }

    /// `TlsConfig::default()` and an empty JSON object agree: all `None`.
    #[test]
    fn tls_config_defaults() {
        let tls = TlsConfig::default();
        assert_eq!(tls.ca, None);
        assert_eq!(tls.cert, None);
        assert_eq!(tls.key, None);
        let json = r#"{}"#;
        let deserialized: TlsConfig = serde_json::from_str(json).unwrap();
        assert_eq!(deserialized.ca, None);
        assert_eq!(deserialized.cert, None);
        assert_eq!(deserialized.key, None);
    }

    /// Registry credentials round-trip through JSON.
    #[test]
    fn registry_auth_serde() {
        let auth = RegistryAuth {
            username: "admin".to_string(),
            password: "secret123".to_string(),
        };
        let json = serde_json::to_string(&auth).unwrap();
        let deserialized: RegistryAuth = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.username, "admin");
        assert_eq!(deserialized.password, "secret123");
    }
}

View File

@@ -0,0 +1,5 @@
//! Containerd-backed container execution for WFE workflows.
//!
//! `config` holds the serde-facing configuration types; `step` holds the
//! `ContainerdStep` workflow step that executes them.
pub mod config;
pub mod step;
// Re-export the public surface at the crate root for convenient imports.
pub use config::{ContainerdConfig, RegistryAuth, TlsConfig, VolumeMountConfig};
pub use step::ContainerdStep;

1051
wfe-containerd/src/step.rs Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,532 @@
use std::collections::HashMap;
use std::io::Write;
use std::os::unix::fs::PermissionsExt;
use tempfile::TempDir;
use wfe_containerd::config::{ContainerdConfig, RegistryAuth, TlsConfig};
use wfe_containerd::ContainerdStep;
use wfe_core::models::{ExecutionPointer, WorkflowInstance, WorkflowStep};
use wfe_core::traits::step::{StepBody, StepExecutionContext};
/// Baseline config shared by most tests: run `echo hello` in alpine with
/// pulls disabled and every optional knob left unset.
fn minimal_config() -> ContainerdConfig {
    ContainerdConfig {
        image: String::from("alpine:3.18"),
        command: None,
        run: Some(String::from("echo hello")),
        env: HashMap::new(),
        volumes: Vec::new(),
        working_dir: None,
        user: String::from("65534:65534"),
        network: String::from("none"),
        memory: None,
        cpu: None,
        pull: String::from("never"),
        containerd_addr: String::from("/run/containerd/containerd.sock"),
        cli: String::from("nerdctl"),
        tls: TlsConfig::default(),
        registry_auth: HashMap::new(),
        timeout_ms: None,
    }
}
/// Write `script` as an executable `nerdctl` shim into `dir` and return the
/// directory path (suitable for prepending to `PATH`).
fn create_fake_nerdctl(dir: &TempDir, script: &str) -> String {
    let nerdctl_path = dir.path().join("nerdctl");
    // `expect` with context so a fixture failure is distinguishable from a
    // failure in the code under test.
    let mut file = std::fs::File::create(&nerdctl_path).expect("create fake nerdctl shim");
    file.write_all(script.as_bytes())
        .expect("write fake nerdctl script");
    // Mark the shim executable so the step can spawn it.
    std::fs::set_permissions(&nerdctl_path, std::fs::Permissions::from_mode(0o755))
        .expect("chmod fake nerdctl shim");
    dir.path().to_string_lossy().to_string()
}
/// Assemble a `StepExecutionContext` over the given step/workflow/pointer
/// with no input item, no persistence data, and a fresh cancellation token.
fn make_context<'a>(
    step: &'a WorkflowStep,
    workflow: &'a WorkflowInstance,
    pointer: &'a ExecutionPointer,
) -> StepExecutionContext<'a> {
    let cancellation_token = tokio_util::sync::CancellationToken::new();
    StepExecutionContext {
        step,
        workflow,
        execution_pointer: pointer,
        item: None,
        persistence_data: None,
        cancellation_token,
    }
}
/// Wrapper for set_var that handles the Rust 2024 unsafe requirement.
///
/// Replaces the process-wide `PATH` so the step under test resolves the fake
/// `nerdctl` shim instead of any real binary.
fn set_path(value: &str) {
    // SAFETY: These tests run sequentially (nextest runs each test in its own process)
    // so concurrent mutation of environment variables is not a concern.
    // NOTE(review): under plain `cargo test` all tests share one process and run on
    // parallel threads, which would make this racy — confirm the nextest assumption.
    unsafe {
        std::env::set_var("PATH", value);
    }
}
// ── Happy-path: run succeeds with output parsing ───────────────────
#[tokio::test]
async fn run_success_with_outputs() {
    let tmp = TempDir::new().unwrap();
    let wfe_marker = "##wfe[output";
    // Fake nerdctl: `run` prints two ##wfe output markers; `pull`/`login` succeed.
    let script = format!(
        "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         run) echo 'hello from container'\n\
         echo '{wfe_marker} result=success]'\n\
         echo '{wfe_marker} version=1.0.0]'\n\
         exit 0;;\n\
         pull) echo 'Pulling...'; exit 0;;\n\
         login) exit 0;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 1\n"
    );
    let bin_dir = create_fake_nerdctl(&tmp, &script);
    let mut config = minimal_config();
    config.pull = "always".to_string();
    let mut step = ContainerdStep::new(config);
    let mut named_step = WorkflowStep::new(0, "containerd");
    named_step.name = Some("my_container".to_string());
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&named_step, &workflow, &pointer);
    // Prepend the shim dir so the step finds the fake nerdctl; restore afterwards.
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    let result = result.expect("run should succeed");
    assert!(result.proceed);
    let output = result.output_data.unwrap();
    let obj = output.as_object().unwrap();
    // Marker lines become top-level workflow outputs.
    assert_eq!(obj.get("result").unwrap(), "success");
    assert_eq!(obj.get("version").unwrap(), "1.0.0");
    assert!(
        obj.get("my_container.stdout")
            .unwrap()
            .as_str()
            .unwrap()
            .contains("hello from container")
    );
    assert_eq!(obj.get("my_container.exit_code").unwrap(), 0);
}
// ── Non-zero exit code ─────────────────────────────────────────────
#[tokio::test]
async fn run_container_failure() {
    let tmp = TempDir::new().unwrap();
    let script = "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         run) echo 'something went wrong' >&2; exit 1;;\n\
         pull) exit 0;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 1\n";
    let bin_dir = create_fake_nerdctl(&tmp, script);
    let config = minimal_config();
    let mut step = ContainerdStep::new(config);
    let wf_step = WorkflowStep::new(0, "containerd");
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    let err = result.expect_err("should fail with non-zero exit");
    let msg = format!("{err}");
    // Error carries both the exit code and the container's stderr text.
    assert!(msg.contains("Container exited with code 1"), "got: {msg}");
    assert!(msg.contains("something went wrong"), "got: {msg}");
}
// ── Pull failure ───────────────────────────────────────────────────
#[tokio::test]
async fn run_pull_failure() {
    let tmp = TempDir::new().unwrap();
    let script = "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         pull) echo 'image not found' >&2; exit 1;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 1\n";
    let bin_dir = create_fake_nerdctl(&tmp, script);
    let mut config = minimal_config();
    config.pull = "always".to_string();
    let mut step = ContainerdStep::new(config);
    let wf_step = WorkflowStep::new(0, "containerd");
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    let err = result.expect_err("should fail on pull");
    let msg = format!("{err}");
    assert!(msg.contains("Image pull failed"), "got: {msg}");
}
// ── Timeout ────────────────────────────────────────────────────────
#[tokio::test]
async fn run_timeout() {
    let tmp = TempDir::new().unwrap();
    // Fake nerdctl whose `run` sleeps far past the configured 100ms timeout.
    let script = "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         run) sleep 30; exit 0;;\n\
         pull) exit 0;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 1\n";
    let bin_dir = create_fake_nerdctl(&tmp, script);
    let mut config = minimal_config();
    config.timeout_ms = Some(100);
    let mut step = ContainerdStep::new(config);
    let wf_step = WorkflowStep::new(0, "containerd");
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    let err = result.expect_err("should timeout");
    let msg = format!("{err}");
    assert!(msg.contains("timed out"), "got: {msg}");
}
// ── Missing nerdctl binary ─────────────────────────────────────────
#[tokio::test]
async fn run_missing_nerdctl() {
    let tmp = TempDir::new().unwrap();
    // Empty temp dir on PATH: no nerdctl shim exists at all.
    let bin_dir = tmp.path().to_string_lossy().to_string();
    let config = minimal_config();
    let mut step = ContainerdStep::new(config);
    let wf_step = WorkflowStep::new(0, "containerd");
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&bin_dir);
    let result = step.run(&ctx).await;
    set_path(&original_path);
    let err = result.expect_err("should fail with missing binary");
    let msg = format!("{err}");
    assert!(
        msg.contains("Failed to spawn nerdctl run") || msg.contains("Failed to pull image")
            || msg.contains("Failed to spawn docker run"),
        "got: {msg}"
    );
}
// ── pull=never skips pull ──────────────────────────────────────────
#[tokio::test]
async fn run_skip_pull_when_never() {
    let tmp = TempDir::new().unwrap();
    // Fake nerdctl fails loudly if `pull` is ever invoked.
    let script = "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         run) echo ran; exit 0;;\n\
         pull) echo 'pull should not be called' >&2; exit 1;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 1\n";
    let bin_dir = create_fake_nerdctl(&tmp, script);
    let mut config = minimal_config();
    config.pull = "never".to_string();
    let mut step = ContainerdStep::new(config);
    let wf_step = WorkflowStep::new(0, "containerd");
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    result.expect("should succeed without pulling");
}
// ── Workflow data is injected as env vars ──────────────────────────
#[tokio::test]
async fn run_injects_workflow_data_as_env() {
    let tmp = TempDir::new().unwrap();
    // Fake nerdctl echoes every argument after `run` so env flags are visible.
    let script = "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         run) shift\n\
         for arg in \"$@\"; do echo \"ARG:$arg\"; done\n\
         exit 0;;\n\
         pull) exit 0;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 1\n";
    let bin_dir = create_fake_nerdctl(&tmp, script);
    let mut config = minimal_config();
    config.pull = "never".to_string();
    config.run = None;
    config.command = Some(vec!["true".to_string()]);
    let mut step = ContainerdStep::new(config);
    let mut wf_step = WorkflowStep::new(0, "containerd");
    wf_step.name = Some("env_test".to_string());
    let workflow = WorkflowInstance::new(
        "test-wf",
        1,
        serde_json::json!({"my_key": "my_value", "count": 42}),
    );
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    let result = result.expect("should succeed");
    let stdout = result
        .output_data
        .unwrap()
        .as_object()
        .unwrap()
        .get("env_test.stdout")
        .unwrap()
        .as_str()
        .unwrap()
        .to_string();
    // The workflow data should have been injected as uppercase env vars
    assert!(stdout.contains("MY_KEY=my_value"), "stdout: {stdout}");
    assert!(stdout.contains("COUNT=42"), "stdout: {stdout}");
}
// ── Step name defaults to "unknown" when None ──────────────────────
#[tokio::test]
async fn run_unnamed_step_uses_unknown() {
    let tmp = TempDir::new().unwrap();
    let script = "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         run) echo output; exit 0;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 1\n";
    let bin_dir = create_fake_nerdctl(&tmp, script);
    let config = minimal_config();
    let mut step = ContainerdStep::new(config);
    let wf_step = WorkflowStep::new(0, "containerd");
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    let result = result.expect("should succeed");
    let output = result.output_data.unwrap();
    let obj = output.as_object().unwrap();
    // Raw stream keys fall back to the "unknown" step-name prefix.
    assert!(obj.contains_key("unknown.stdout"));
    assert!(obj.contains_key("unknown.stderr"));
    assert!(obj.contains_key("unknown.exit_code"));
}
// ── Registry login failure ─────────────────────────────────────────
#[tokio::test]
async fn run_login_failure() {
    let tmp = TempDir::new().unwrap();
    let script = "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         login) echo unauthorized >&2; exit 1;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 0\n";
    let bin_dir = create_fake_nerdctl(&tmp, script);
    let mut config = minimal_config();
    // Credentials configured → the step must attempt `login` first and fail.
    config.registry_auth = HashMap::from([(
        "registry.example.com".to_string(),
        RegistryAuth {
            username: "user".to_string(),
            password: "wrong".to_string(),
        },
    )]);
    let mut step = ContainerdStep::new(config);
    let wf_step = WorkflowStep::new(0, "containerd");
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    let err = result.expect_err("should fail on login");
    let msg = format!("{err}");
    assert!(msg.contains("login failed"), "got: {msg}");
}
// ── Successful login with TLS then run ─────────────────────────────
#[tokio::test]
async fn run_login_success_with_tls() {
    let tmp = TempDir::new().unwrap();
    let script = "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         run) echo ok; exit 0;;\n\
         pull) exit 0;;\n\
         login) exit 0;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 1\n";
    let bin_dir = create_fake_nerdctl(&tmp, script);
    let mut config = minimal_config();
    config.pull = "never".to_string();
    config.tls = TlsConfig {
        ca: Some("/ca.pem".to_string()),
        cert: Some("/cert.pem".to_string()),
        key: Some("/key.pem".to_string()),
    };
    config.registry_auth = HashMap::from([(
        "secure.io".to_string(),
        RegistryAuth {
            username: "admin".to_string(),
            password: "secret".to_string(),
        },
    )]);
    let mut step = ContainerdStep::new(config);
    let wf_step = WorkflowStep::new(0, "containerd");
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    result.expect("should succeed with login + TLS");
}
// ── pull=if-not-present triggers pull ──────────────────────────────
#[tokio::test]
async fn run_pull_if_not_present() {
    let tmp = TempDir::new().unwrap();
    // Track that pull was called via a marker file
    let marker = tmp.path().join("pull_called");
    let marker_str = marker.to_string_lossy();
    let script = format!(
        "#!/bin/sh\n\
         while [ $# -gt 0 ]; do\n\
         case \"$1\" in\n\
         run) echo ran; exit 0;;\n\
         pull) touch {marker_str}; exit 0;;\n\
         --*) shift; shift;;\n\
         *) shift;;\n\
         esac\n\
         done\n\
         exit 1\n"
    );
    let bin_dir = create_fake_nerdctl(&tmp, &script);
    let mut config = minimal_config();
    config.pull = "if-not-present".to_string();
    let mut step = ContainerdStep::new(config);
    let wf_step = WorkflowStep::new(0, "containerd");
    let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
    let pointer = ExecutionPointer::new(0);
    let ctx = make_context(&wf_step, &workflow, &pointer);
    let original_path = std::env::var("PATH").unwrap_or_default();
    set_path(&format!("{bin_dir}:{original_path}"));
    let result = step.run(&ctx).await;
    set_path(&original_path);
    result.expect("should succeed");
    assert!(marker.exists(), "pull should have been called for if-not-present");
}

View File

@@ -7,6 +7,8 @@ description = "YAML workflow definitions for WFE"
[features]
default = []
deno = ["deno_core", "deno_error", "url", "reqwest"]
buildkit = ["wfe-buildkit"]
containerd = ["wfe-containerd"]
[dependencies]
wfe-core = { workspace = true }
@@ -22,6 +24,8 @@ deno_core = { workspace = true, optional = true }
deno_error = { workspace = true, optional = true }
url = { workspace = true, optional = true }
reqwest = { workspace = true, optional = true }
wfe-buildkit = { workspace = true, optional = true }
wfe-containerd = { workspace = true, optional = true }
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -8,6 +8,10 @@ use crate::error::YamlWorkflowError;
use crate::executors::shell::{ShellConfig, ShellStep};
#[cfg(feature = "deno")]
use crate::executors::deno::{DenoConfig, DenoPermissions, DenoStep};
#[cfg(feature = "buildkit")]
use wfe_buildkit::{BuildkitConfig, BuildkitStep};
#[cfg(feature = "containerd")]
use wfe_containerd::{ContainerdConfig, ContainerdStep};
use crate::schema::{WorkflowSpec, YamlErrorBehavior, YamlStep};
/// Factory type alias for step creation closures.
@@ -250,6 +254,36 @@ fn build_step_config_and_factory(
});
Ok((key, value, factory))
}
#[cfg(feature = "buildkit")]
"buildkit" => {
let config = build_buildkit_config(step)?;
let key = format!("wfe_yaml::buildkit::{}", step.name);
let value = serde_json::to_value(&config).map_err(|e| {
YamlWorkflowError::Compilation(format!(
"Failed to serialize buildkit config: {e}"
))
})?;
let config_clone = config.clone();
let factory: StepFactory = Box::new(move || {
Box::new(BuildkitStep::new(config_clone.clone())) as Box<dyn StepBody>
});
Ok((key, value, factory))
}
#[cfg(feature = "containerd")]
"containerd" => {
let config = build_containerd_config(step)?;
let key = format!("wfe_yaml::containerd::{}", step.name);
let value = serde_json::to_value(&config).map_err(|e| {
YamlWorkflowError::Compilation(format!(
"Failed to serialize containerd config: {e}"
))
})?;
let config_clone = config.clone();
let factory: StepFactory = Box::new(move || {
Box::new(ContainerdStep::new(config_clone.clone())) as Box<dyn StepBody>
});
Ok((key, value, factory))
}
other => Err(YamlWorkflowError::Compilation(format!(
"Unknown step type: '{other}'"
))),
@@ -346,6 +380,162 @@ fn parse_duration_ms(s: &str) -> Option<u64> {
}
}
/// Translate a YAML `buildkit` step into a `BuildkitConfig`.
///
/// `config.dockerfile` and `config.context` are mandatory; everything else
/// falls back to the crate defaults (notably the local buildkitd socket).
#[cfg(feature = "buildkit")]
fn build_buildkit_config(
    step: &YamlStep,
) -> Result<BuildkitConfig, YamlWorkflowError> {
    let yaml = step.config.as_ref().ok_or_else(|| {
        YamlWorkflowError::Compilation(format!(
            "BuildKit step '{}' is missing 'config' section",
            step.name
        ))
    })?;
    // Shared error constructor for the two mandatory fields.
    let missing = |field: &str| {
        YamlWorkflowError::Compilation(format!(
            "BuildKit step '{}' must have 'config.{field}'",
            step.name
        ))
    };
    let dockerfile = yaml.dockerfile.clone().ok_or_else(|| missing("dockerfile"))?;
    let context = yaml.context.clone().ok_or_else(|| missing("context"))?;
    // Optional human-readable duration → milliseconds.
    let timeout_ms = yaml.timeout.as_ref().and_then(|t| parse_duration_ms(t));
    let tls = match yaml.tls.as_ref() {
        Some(t) => wfe_buildkit::TlsConfig {
            ca: t.ca.clone(),
            cert: t.cert.clone(),
            key: t.key.clone(),
        },
        None => wfe_buildkit::TlsConfig::default(),
    };
    let registry_auth = yaml
        .registry_auth
        .as_ref()
        .map(|entries| {
            entries
                .iter()
                .map(|(registry, auth)| {
                    (
                        registry.clone(),
                        wfe_buildkit::RegistryAuth {
                            username: auth.username.clone(),
                            password: auth.password.clone(),
                        },
                    )
                })
                .collect()
        })
        .unwrap_or_default();
    Ok(BuildkitConfig {
        dockerfile,
        context,
        target: yaml.target.clone(),
        tags: yaml.tags.clone(),
        build_args: yaml.build_args.clone(),
        cache_from: yaml.cache_from.clone(),
        cache_to: yaml.cache_to.clone(),
        // push defaults to false when omitted in YAML.
        push: yaml.push.unwrap_or(false),
        output_type: None,
        buildkit_addr: yaml
            .buildkit_addr
            .clone()
            .unwrap_or_else(|| "unix:///run/buildkit/buildkitd.sock".to_string()),
        tls,
        registry_auth,
        timeout_ms,
    })
}
/// Translate a YAML `containerd` step into a `ContainerdConfig`, applying
/// the crate's security defaults (nobody user, no network, if-not-present
/// pulls, local containerd socket, nerdctl CLI).
#[cfg(feature = "containerd")]
fn build_containerd_config(
    step: &YamlStep,
) -> Result<ContainerdConfig, YamlWorkflowError> {
    let yaml = step.config.as_ref().ok_or_else(|| {
        YamlWorkflowError::Compilation(format!(
            "Containerd step '{}' is missing 'config' section",
            step.name
        ))
    })?;
    let image = yaml.image.clone().ok_or_else(|| {
        YamlWorkflowError::Compilation(format!(
            "Containerd step '{}' must have 'config.image'",
            step.name
        ))
    })?;
    // Optional human-readable duration → milliseconds.
    let timeout_ms = yaml.timeout.as_ref().and_then(|t| parse_duration_ms(t));
    let tls = match yaml.tls.as_ref() {
        Some(t) => wfe_containerd::TlsConfig {
            ca: t.ca.clone(),
            cert: t.cert.clone(),
            key: t.key.clone(),
        },
        None => wfe_containerd::TlsConfig::default(),
    };
    let registry_auth = yaml
        .registry_auth
        .as_ref()
        .map(|entries| {
            entries
                .iter()
                .map(|(registry, auth)| {
                    (
                        registry.clone(),
                        wfe_containerd::RegistryAuth {
                            username: auth.username.clone(),
                            password: auth.password.clone(),
                        },
                    )
                })
                .collect()
        })
        .unwrap_or_default();
    let volumes = yaml
        .volumes
        .iter()
        .map(|mount| wfe_containerd::VolumeMountConfig {
            source: mount.source.clone(),
            target: mount.target.clone(),
            readonly: mount.readonly,
        })
        .collect();
    // Clone an optional string field, substituting the crate default.
    let or_default =
        |value: &Option<String>, fallback: &str| value.clone().unwrap_or_else(|| fallback.to_string());
    Ok(ContainerdConfig {
        image,
        command: yaml.command.clone(),
        run: yaml.run.clone(),
        env: yaml.env.clone(),
        volumes,
        working_dir: yaml.working_dir.clone(),
        user: or_default(&yaml.user, "65534:65534"),
        network: or_default(&yaml.network, "none"),
        memory: yaml.memory.clone(),
        cpu: yaml.cpu.clone(),
        pull: or_default(&yaml.pull, "if-not-present"),
        containerd_addr: or_default(&yaml.containerd_addr, "/run/containerd/containerd.sock"),
        cli: or_default(&yaml.cli, "nerdctl"),
        tls,
        registry_auth,
        timeout_ms,
    })
}
fn map_error_behavior(eb: &YamlErrorBehavior) -> Result<ErrorBehavior, YamlWorkflowError> {
match eb.behavior_type.as_str() {
"retry" => {

View File

@@ -58,6 +58,38 @@ pub struct StepConfig {
pub permissions: Option<DenoPermissionsYaml>,
#[serde(default)]
pub modules: Vec<String>,
// BuildKit fields
pub dockerfile: Option<String>,
pub context: Option<String>,
pub target: Option<String>,
#[serde(default)]
pub tags: Vec<String>,
#[serde(default)]
pub build_args: HashMap<String, String>,
#[serde(default)]
pub cache_from: Vec<String>,
#[serde(default)]
pub cache_to: Vec<String>,
pub push: Option<bool>,
pub buildkit_addr: Option<String>,
#[serde(default)]
pub tls: Option<TlsConfigYaml>,
#[serde(default)]
pub registry_auth: Option<HashMap<String, RegistryAuthYaml>>,
// Containerd fields
pub image: Option<String>,
#[serde(default)]
pub command: Option<Vec<String>>,
#[serde(default)]
pub volumes: Vec<VolumeMountYaml>,
pub user: Option<String>,
pub network: Option<String>,
pub memory: Option<String>,
pub cpu: Option<String>,
pub pull: Option<String>,
pub containerd_addr: Option<String>,
/// CLI binary name for containerd steps: "nerdctl" (default) or "docker".
pub cli: Option<String>,
}
/// YAML-level permission configuration for Deno steps.
@@ -84,6 +116,30 @@ pub struct DataRef {
pub json_path: Option<String>,
}
/// YAML-level TLS configuration for BuildKit and containerd steps.
#[derive(Debug, Deserialize, Clone)]
pub struct TlsConfigYaml {
    /// Path to the CA certificate.
    pub ca: Option<String>,
    /// Path to the client certificate.
    pub cert: Option<String>,
    /// Path to the client private key.
    pub key: Option<String>,
}
/// YAML-level registry auth configuration for BuildKit and containerd steps.
#[derive(Debug, Deserialize, Clone)]
pub struct RegistryAuthYaml {
    /// Registry username.
    pub username: String,
    /// Registry password.
    pub password: String,
}
/// YAML-level volume mount configuration for containerd steps.
#[derive(Debug, Deserialize, Clone)]
pub struct VolumeMountYaml {
    /// Host path to mount.
    pub source: String,
    /// Mount point inside the container.
    pub target: String,
    /// Mount read-only; defaults to `false` when omitted.
    #[serde(default)]
    pub readonly: bool,
}
#[derive(Debug, Deserialize)]
pub struct YamlErrorBehavior {
#[serde(rename = "type")]

View File

@@ -89,6 +89,90 @@ fn validate_steps(
}
}
// BuildKit steps must have config with dockerfile and context.
if let Some(ref step_type) = step.step_type
&& step_type == "buildkit"
{
let config = step.config.as_ref().ok_or_else(|| {
YamlWorkflowError::Validation(format!(
"BuildKit step '{}' must have a 'config' section",
step.name
))
})?;
if config.dockerfile.is_none() {
return Err(YamlWorkflowError::Validation(format!(
"BuildKit step '{}' must have 'config.dockerfile'",
step.name
)));
}
if config.context.is_none() {
return Err(YamlWorkflowError::Validation(format!(
"BuildKit step '{}' must have 'config.context'",
step.name
)));
}
if config.push.unwrap_or(false) && config.tags.is_empty() {
return Err(YamlWorkflowError::Validation(format!(
"BuildKit step '{}' has push=true but no tags specified",
step.name
)));
}
}
// Containerd steps must have config with image and exactly one of run or command.
if let Some(ref step_type) = step.step_type
&& step_type == "containerd"
{
let config = step.config.as_ref().ok_or_else(|| {
YamlWorkflowError::Validation(format!(
"Containerd step '{}' must have a 'config' section",
step.name
))
})?;
if config.image.is_none() {
return Err(YamlWorkflowError::Validation(format!(
"Containerd step '{}' must have 'config.image'",
step.name
)));
}
let has_run = config.run.is_some();
let has_command = config.command.is_some();
if !has_run && !has_command {
return Err(YamlWorkflowError::Validation(format!(
"Containerd step '{}' must have 'config.run' or 'config.command'",
step.name
)));
}
if has_run && has_command {
return Err(YamlWorkflowError::Validation(format!(
"Containerd step '{}' cannot have both 'config.run' and 'config.command'",
step.name
)));
}
if let Some(ref network) = config.network {
match network.as_str() {
"none" | "host" | "bridge" => {}
other => {
return Err(YamlWorkflowError::Validation(format!(
"Containerd step '{}' has invalid network '{}'. Must be none, host, or bridge",
step.name, other
)));
}
}
}
if let Some(ref pull) = config.pull {
match pull.as_str() {
"always" | "if-not-present" | "never" => {}
other => {
return Err(YamlWorkflowError::Validation(format!(
"Containerd step '{}' has invalid pull policy '{}'. Must be always, if-not-present, or never",
step.name, other
)));
}
}
}
}
// Validate step-level error behavior.
if let Some(ref eb) = step.error_behavior {
validate_error_behavior_type(&eb.behavior_type)?;