fix(wfe-containerd): fix remote daemon support

Four bugs fixed in the containerd gRPC executor:

- Snapshot parent: resolve image chain ID from content store instead of
  using empty parent, which created rootless containers with no binaries
- I/O capture: replace FIFOs with regular files for stdout/stderr since
  FIFOs don't work across virtiofs filesystem boundaries (Lima VMs)
- Capabilities: grant Docker-default capability set (SETUID, SETGID,
  CHOWN, etc.) when running as root so apt-get and similar tools work
- Shell path: use /bin/sh instead of sh in process args since container
  PATH may be empty

Also adds WFE_IO_DIR env var for shared filesystem support with remote
daemons, and documents the remote daemon setup in lib.rs.
This commit is contained in:
2026-03-29 16:56:59 +01:00
parent b0bf71aa61
commit 272ddf17c2
4 changed files with 387 additions and 89 deletions

View File

@@ -9,7 +9,7 @@ description = "containerd container runner executor for WFE"
[dependencies]
wfe-core = { workspace = true }
wfe-containerd-protos = { version = "1.4.0", path = "../wfe-containerd-protos", registry = "sunbeam" }
wfe-containerd-protos = { version = "1.5.0", path = "../wfe-containerd-protos", registry = "sunbeam" }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
@@ -21,7 +21,8 @@ tower = "0.5"
hyper-util = { version = "0.1", features = ["tokio"] }
prost-types = "0.14"
uuid = { version = "1", features = ["v4"] }
libc = "0.2"
sha2 = "0.10"
tokio-stream = "0.1"
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -1,3 +1,50 @@
//! Containerd container executor for WFE.
//!
//! Runs workflow steps as isolated OCI containers via the containerd gRPC API.
//!
//! # Remote daemon support
//!
//! The executor creates regular files on the **local** filesystem for
//! stdout/stderr capture, then passes those paths to the containerd task spec.
//! The containerd shim opens the files from **its** side. This means the file
//! paths must be accessible to both the executor process and the containerd
//! daemon.
//!
//! When containerd runs on a different machine (e.g. a Lima VM), you need:
//!
//! 1. **Shared filesystem** — mount a host directory into the VM so both sides
//! see the same I/O files. With Lima + virtiofs:
//! ```yaml
//! # lima config
//! mounts:
//! - location: /tmp/wfe-io
//! mountPoint: /tmp/wfe-io
//! writable: true
//! ```
//!
//! 2. **`WFE_IO_DIR` env var** — point the executor at the shared directory:
//! ```sh
//! export WFE_IO_DIR=/tmp/wfe-io
//! ```
//! Without this, the stdout/stderr files are created under
//! `std::env::temp_dir()` which is only visible to the host.
//!
//! 3. **gRPC transport** — Lima's Unix socket forwarding is unreliable for
//! HTTP/2 (gRPC). Use a TCP socat proxy inside the VM instead:
//! ```sh
//! # Inside the VM:
//! socat TCP4-LISTEN:2500,fork,reuseaddr UNIX-CONNECT:/run/containerd/containerd.sock &
//! ```
//! Then connect via `WFE_CONTAINERD_ADDR=http://127.0.0.1:2500` (Lima
//! auto-forwards guest TCP ports).
//!
//! 4. **File permissions** — the stdout/stderr files are created with mode
//! `0666` so the remote shim (running as root) can open and write to them
//! through the shared mount.
//!
//! See `test/lima/wfe-test.yaml` for a complete VM configuration that sets all
//! of this up.
pub mod config;
pub mod step;

View File

@@ -11,6 +11,9 @@ use wfe_containerd_protos::containerd::services::containers::v1::{
containers_client::ContainersClient, Container, CreateContainerRequest,
DeleteContainerRequest, container::Runtime,
};
use wfe_containerd_protos::containerd::services::content::v1::{
content_client::ContentClient, ReadContentRequest,
};
use wfe_containerd_protos::containerd::services::images::v1::{
images_client::ImagesClient, GetImageRequest,
};
@@ -134,6 +137,153 @@ impl ContainerdStep {
}
}
/// Resolve the snapshot chain ID for an image.
///
/// This reads the image manifest and config from the content store to
/// compute the chain ID of the topmost layer. The chain ID is used as
/// the parent snapshot when preparing a writable rootfs for a container.
///
/// Chain ID computation follows the OCI image spec:
/// chain_id[0] = diff_id[0]
/// chain_id[n] = sha256(chain_id[n-1] + " " + diff_id[n])
///
/// # Errors
///
/// Returns `WfeError::StepExecution` when the image record, manifest, or
/// config cannot be fetched or parsed, or when an image index contains no
/// manifest for the host architecture.
async fn resolve_image_chain_id(
channel: &Channel,
image: &str,
namespace: &str,
) -> Result<String, WfeError> {
use sha2::{Sha256, Digest};
// 1. Get the image record to find the manifest digest.
let mut images_client = ImagesClient::new(channel.clone());
let req = Self::with_namespace(
GetImageRequest { name: image.to_string() },
namespace,
);
let image_resp = images_client.get(req).await.map_err(|e| {
WfeError::StepExecution(format!("failed to get image '{image}': {e}"))
})?;
let img = image_resp.into_inner().image.ok_or_else(|| {
WfeError::StepExecution(format!("image '{image}' has no record"))
})?;
let target = img.target.ok_or_else(|| {
WfeError::StepExecution(format!("image '{image}' has no target descriptor"))
})?;
// The target might be an index (multi-platform) or a manifest.
// Read the content and determine based on mediaType.
let manifest_digest = target.digest.clone();
let manifest_bytes = Self::read_content(channel, &manifest_digest, namespace).await?;
let manifest_json: serde_json::Value = serde_json::from_slice(&manifest_bytes)
.map_err(|e| WfeError::StepExecution(format!("failed to parse manifest: {e}")))?;
// 2. If it's an index, pick the matching platform manifest.
// (Detected structurally by the presence of a "manifests" array rather
// than by mediaType, so Docker manifest lists are handled the same way.)
let manifest_json = if manifest_json.get("manifests").is_some() {
// OCI image index — find the platform-matching manifest.
// Map Rust's target-arch names onto the GOARCH names used by OCI.
let arch = std::env::consts::ARCH;
let oci_arch = match arch {
"aarch64" => "arm64",
"x86_64" => "amd64",
other => other,
};
let manifests = manifest_json["manifests"].as_array().ok_or_else(|| {
WfeError::StepExecution("image index has no manifests array".to_string())
})?;
// NOTE(review): only `platform.architecture` is matched — `os` and
// `variant` are ignored. Confirm that is sufficient for the images
// this executor is expected to run.
let platform_manifest = manifests.iter().find(|m| {
m.get("platform")
.and_then(|p| p.get("architecture"))
.and_then(|a| a.as_str())
== Some(oci_arch)
}).ok_or_else(|| {
WfeError::StepExecution(format!(
"no manifest for architecture '{oci_arch}' in image index"
))
})?;
let digest = platform_manifest["digest"].as_str().ok_or_else(|| {
WfeError::StepExecution("platform manifest has no digest".to_string())
})?;
let bytes = Self::read_content(channel, digest, namespace).await?;
serde_json::from_slice(&bytes)
.map_err(|e| WfeError::StepExecution(format!("failed to parse platform manifest: {e}")))?
} else {
manifest_json
};
// 3. Get the config digest from the manifest.
let config_digest = manifest_json["config"]["digest"]
.as_str()
.ok_or_else(|| {
WfeError::StepExecution("manifest has no config.digest".to_string())
})?;
// 4. Read the image config.
let config_bytes = Self::read_content(channel, config_digest, namespace).await?;
let config_json: serde_json::Value = serde_json::from_slice(&config_bytes)
.map_err(|e| WfeError::StepExecution(format!("failed to parse image config: {e}")))?;
// 5. Extract diff_ids and compute chain ID.
let diff_ids = config_json["rootfs"]["diff_ids"]
.as_array()
.ok_or_else(|| {
WfeError::StepExecution("image config has no rootfs.diff_ids".to_string())
})?;
if diff_ids.is_empty() {
return Err(WfeError::StepExecution(
"image has no layers (empty diff_ids)".to_string(),
));
}
// Seed with diff_ids[0] verbatim — it keeps its "sha256:" prefix. The
// OCI spec hashes the full digest strings, prefix included, so the
// prefix must stay in the accumulator below as well.
let mut chain_id = diff_ids[0]
.as_str()
.ok_or_else(|| WfeError::StepExecution("diff_id is not a string".to_string()))?
.to_string();
for diff_id in &diff_ids[1..] {
let diff = diff_id.as_str().ok_or_else(|| {
WfeError::StepExecution("diff_id is not a string".to_string())
})?;
let mut hasher = Sha256::new();
hasher.update(format!("{chain_id} {diff}"));
chain_id = format!("sha256:{:x}", hasher.finalize());
}
tracing::debug!(image = image, chain_id = %chain_id, "resolved image chain ID");
Ok(chain_id)
}
/// Read content from the containerd content store by digest.
///
/// Streams the blob identified by `digest` from the content service and
/// concatenates the chunks into a single in-memory buffer. Callers here
/// only fetch manifests and image configs, which are small, so buffering
/// the whole blob is acceptable.
///
/// # Errors
///
/// Returns `WfeError::StepExecution` if the read request fails or any
/// chunk in the response stream is an error.
async fn read_content(
channel: &Channel,
digest: &str,
namespace: &str,
) -> Result<Vec<u8>, WfeError> {
use tokio_stream::StreamExt;
let mut client = ContentClient::new(channel.clone());
let req = Self::with_namespace(
ReadContentRequest {
digest: digest.to_string(),
offset: 0,
size: 0, // read all
},
namespace,
);
let mut stream = client.read(req).await.map_err(|e| {
WfeError::StepExecution(format!("failed to read content {digest}: {e}"))
})?.into_inner();
// Drain the gRPC stream, appending each chunk's payload in order.
let mut data = Vec::new();
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(|e| {
WfeError::StepExecution(format!("error reading content {digest}: {e}"))
})?;
data.extend_from_slice(&chunk.data);
}
Ok(data)
}
/// Build a minimal OCI runtime spec as a `prost_types::Any`.
///
/// The spec is serialized as JSON and wrapped in a protobuf Any with
@@ -144,7 +294,7 @@ impl ContainerdStep {
) -> prost_types::Any {
// Build the args array for the process.
let args: Vec<String> = if let Some(ref run) = self.config.run {
vec!["sh".to_string(), "-c".to_string(), run.clone()]
vec!["/bin/sh".to_string(), "-c".to_string(), run.clone()]
} else if let Some(ref command) = self.config.command {
command.clone()
} else {
@@ -206,13 +356,24 @@ impl ContainerdStep {
"cwd": self.config.working_dir.as_deref().unwrap_or("/"),
});
// Add capabilities (minimal set).
// Add capabilities. When running as root, grant the default Docker
// capability set so tools like apt-get work. Non-root gets nothing.
let caps = if uid == 0 {
serde_json::json!([
"CAP_AUDIT_WRITE", "CAP_CHOWN", "CAP_DAC_OVERRIDE",
"CAP_FOWNER", "CAP_FSETID", "CAP_KILL", "CAP_MKNOD",
"CAP_NET_BIND_SERVICE", "CAP_NET_RAW", "CAP_SETFCAP",
"CAP_SETGID", "CAP_SETPCAP", "CAP_SETUID", "CAP_SYS_CHROOT",
])
} else {
serde_json::json!([])
};
process["capabilities"] = serde_json::json!({
"bounding": [],
"effective": [],
"inheritable": [],
"permitted": [],
"ambient": [],
"bounding": caps,
"effective": caps,
"inheritable": caps,
"permitted": caps,
"ambient": caps,
});
let spec = serde_json::json!({
@@ -400,15 +561,11 @@ impl StepBody for ContainerdStep {
WfeError::StepExecution(format!("failed to create container: {e}"))
})?;
// 6. Prepare snapshot to get rootfs mounts.
// 6. Prepare snapshot with the image's layers as parent.
let mut snapshots_client = SnapshotsClient::new(channel.clone());
// Get the image's chain ID to use as parent for the snapshot.
// We try to get mounts from the snapshot (already committed by image unpack).
// If snapshot already exists, use Mounts; otherwise Prepare from the image's
// snapshot key (same as container_id for our flow).
let mounts = {
// First try: see if the snapshot was already prepared.
// First try: see if a snapshot was already prepared for this container.
let mounts_req = Self::with_namespace(
MountsRequest {
snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
@@ -420,12 +577,18 @@ impl StepBody for ContainerdStep {
match snapshots_client.mounts(mounts_req).await {
Ok(resp) => resp.into_inner().mounts,
Err(_) => {
// Try to prepare a fresh snapshot.
// Resolve the image's chain ID to use as snapshot parent.
let parent = if should_check {
Self::resolve_image_chain_id(&channel, &self.config.image, namespace).await?
} else {
String::new()
};
let prepare_req = Self::with_namespace(
PrepareSnapshotRequest {
snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
key: container_id.clone(),
parent: String::new(),
parent,
labels: HashMap::new(),
},
namespace,
@@ -445,7 +608,12 @@ impl StepBody for ContainerdStep {
};
// 7. Create I/O file paths for stdout/stderr capture.
let tmp_dir = std::env::temp_dir().join(format!("wfe-io-{container_id}"));
// Use WFE_IO_DIR if set (e.g., a shared mount with a remote containerd daemon),
// otherwise fall back to the system temp directory.
let io_base = std::env::var("WFE_IO_DIR")
.map(std::path::PathBuf::from)
.unwrap_or_else(|_| std::env::temp_dir());
let tmp_dir = io_base.join(format!("wfe-io-{container_id}"));
std::fs::create_dir_all(&tmp_dir).map_err(|e| {
WfeError::StepExecution(format!("failed to create IO temp dir: {e}"))
})?;
@@ -453,19 +621,26 @@ impl StepBody for ContainerdStep {
let stdout_path = tmp_dir.join("stdout");
let stderr_path = tmp_dir.join("stderr");
// Create named pipes (FIFOs) for the task I/O.
// Create empty files for the shim to write stdout/stderr to.
// We use regular files instead of FIFOs because FIFOs don't work
// across filesystem boundaries (e.g., virtiofs mounts with Lima VMs).
for path in [&stdout_path, &stderr_path] {
// Remove if exists from a previous run.
let _ = std::fs::remove_file(path);
nix_mkfifo(path).map_err(|e| {
WfeError::StepExecution(format!("failed to create FIFO {}: {e}", path.display()))
std::fs::File::create(path).map_err(|e| {
WfeError::StepExecution(format!("failed to create IO file {}: {e}", path.display()))
})?;
// Ensure the remote shim can write to it.
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o666)).ok();
}
}
let stdout_str = stdout_path.to_string_lossy().to_string();
let stderr_str = stderr_path.to_string_lossy().to_string();
// 8. Create and start task.
// 8. Create task.
let mut tasks_client = TasksClient::new(channel.clone());
let create_task_req = Self::with_namespace(
@@ -487,17 +662,6 @@ impl StepBody for ContainerdStep {
WfeError::StepExecution(format!("failed to create task: {e}"))
})?;
// Spawn readers for FIFOs before starting the task (FIFOs block on open
// until both ends connect).
let stdout_reader = {
let path = stdout_path.clone();
tokio::spawn(async move { read_fifo(&path).await })
};
let stderr_reader = {
let path = stderr_path.clone();
tokio::spawn(async move { read_fifo(&path).await })
};
// Start the task.
let start_req = Self::with_namespace(
StartRequest {
@@ -555,14 +719,12 @@ impl StepBody for ContainerdStep {
}
};
// 10. Read captured output.
let stdout_content = stdout_reader
// 10. Read captured output from files.
let stdout_content = tokio::fs::read_to_string(&stdout_path)
.await
.unwrap_or_else(|_| Ok(String::new()))
.unwrap_or_default();
let stderr_content = stderr_reader
let stderr_content = tokio::fs::read_to_string(&stderr_path)
.await
.unwrap_or_else(|_| Ok(String::new()))
.unwrap_or_default();
// 11. Cleanup: delete task, then container.
@@ -629,38 +791,6 @@ impl ContainerdStep {
}
}
/// Create a named pipe (FIFO) at the given path. This is a thin wrapper
/// around the `mkfifo` libc call, avoiding an extra dependency.
fn nix_mkfifo(path: &Path) -> std::io::Result<()> {
use std::ffi::CString;
use std::os::unix::ffi::OsStrExt;
let c_path = CString::new(path.as_os_str().as_bytes())
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
// SAFETY: c_path is a valid null-terminated C string and 0o622 is a
// standard FIFO permission mode.
let ret = unsafe { libc::mkfifo(c_path.as_ptr(), 0o622) };
if ret != 0 {
Err(std::io::Error::last_os_error())
} else {
Ok(())
}
}
/// Read the entire contents of a FIFO into a String. This opens the FIFO
/// in read mode (which blocks until a writer opens the other end) and reads
/// until EOF.
async fn read_fifo(path: &Path) -> Result<String, std::io::Error> {
use tokio::io::AsyncReadExt;
let file = tokio::fs::File::open(path).await?;
let mut reader = tokio::io::BufReader::new(file);
let mut buf = String::new();
reader.read_to_string(&mut buf).await?;
Ok(buf)
}
#[cfg(test)]
mod tests {
use super::*;
@@ -1033,22 +1163,6 @@ mod tests {
assert_eq!(step.config.containerd_addr, "/run/containerd/containerd.sock");
}
// ── nix_mkfifo ─────────────────────────────────────────────────────
#[test]
fn mkfifo_creates_and_removes_fifo() {
let tmp = tempfile::tempdir().unwrap();
let fifo_path = tmp.path().join("test.fifo");
nix_mkfifo(&fifo_path).unwrap();
assert!(fifo_path.exists());
std::fs::remove_file(&fifo_path).unwrap();
}
#[test]
fn mkfifo_invalid_path_returns_error() {
let result = nix_mkfifo(Path::new("/nonexistent-dir/fifo"));
assert!(result.is_err());
}
}
/// Integration tests that require a live containerd daemon.

View File

@@ -2,7 +2,7 @@
//!
//! These tests require a live containerd daemon. They are skipped when the
//! socket is not available. Set `WFE_CONTAINERD_ADDR` to point to a custom
//! socket, or use the default `~/.lima/wfe-test/sock/containerd.sock`.
//! socket, or use the default `~/.lima/wfe-test/containerd.sock`.
//!
//! Before running, ensure the test image is pre-pulled:
//! ctr -n default image pull docker.io/library/alpine:3.18
@@ -19,7 +19,7 @@ use wfe_core::traits::step::{StepBody, StepExecutionContext};
fn containerd_addr() -> Option<String> {
let addr = std::env::var("WFE_CONTAINERD_ADDR").unwrap_or_else(|_| {
format!(
"unix://{}/.lima/wfe-test/sock/containerd.sock",
"unix://{}/.lima/wfe-test/containerd.sock",
std::env::var("HOME").unwrap_or_else(|_| "/root".to_string())
)
});
@@ -151,6 +151,142 @@ async fn skip_image_check_when_pull_never() {
);
}
// ── Run a real container end-to-end ──────────────────────────────────
// End-to-end smoke test: runs `echo` in an alpine container via a live
// containerd daemon and asserts the captured stdout. Skipped (with a
// message on stderr) when no containerd socket is reachable.
#[tokio::test]
async fn run_echo_hello_in_container() {
let Some(addr) = containerd_addr() else {
eprintln!("SKIP: containerd socket not available");
return;
};
let mut config = minimal_config(&addr);
config.image = "docker.io/library/alpine:3.18".to_string();
config.run = Some("echo hello-from-container".to_string());
config.pull = "if-not-present".to_string();
config.user = "0:0".to_string();
config.timeout_ms = Some(30_000);
let mut step = ContainerdStep::new(config);
let mut wf_step = WorkflowStep::new(0, "containerd");
// The step name keys the output map ("<name>.stdout" below).
wf_step.name = Some("echo-test".to_string());
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
let pointer = ExecutionPointer::new(0);
let ctx = make_context(&wf_step, &workflow, &pointer);
let result = step.run(&ctx).await;
match &result {
Ok(r) => {
eprintln!("SUCCESS: {:?}", r.output_data);
let data = r.output_data.as_ref().unwrap().as_object().unwrap();
let stdout = data.get("echo-test.stdout").unwrap().as_str().unwrap();
assert!(stdout.contains("hello-from-container"), "stdout: {stdout}");
}
Err(e) => panic!("container step failed: {e}"),
}
}
// ── Run a container with a volume mount ──────────────────────────────
// Volume-mount round trip: mounts a directory under WFE_IO_DIR (or
// /tmp/wfe-io) into the container, writes a file through the mount, then
// reads it back and asserts the output. Skipped when no containerd socket
// is reachable.
#[tokio::test]
async fn run_container_with_volume_mount() {
let Some(addr) = containerd_addr() else {
eprintln!("SKIP: containerd socket not available");
return;
};
// Place the volume source under the shared dir so a remote daemon can
// see it too.
let shared_dir = std::env::var("WFE_IO_DIR")
.unwrap_or_else(|_| "/tmp/wfe-io".to_string());
let vol_dir = format!("{shared_dir}/test-vol");
std::fs::create_dir_all(&vol_dir).unwrap();
let mut config = minimal_config(&addr);
config.image = "docker.io/library/alpine:3.18".to_string();
config.run = Some("echo hello > /mnt/test/output.txt && cat /mnt/test/output.txt".to_string());
config.pull = "if-not-present".to_string();
config.user = "0:0".to_string();
config.timeout_ms = Some(30_000);
config.volumes = vec![wfe_containerd::VolumeMountConfig {
source: vol_dir.clone(),
target: "/mnt/test".to_string(),
readonly: false,
}];
let mut step = ContainerdStep::new(config);
let mut wf_step = WorkflowStep::new(0, "containerd");
wf_step.name = Some("vol-test".to_string());
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
let pointer = ExecutionPointer::new(0);
let ctx = make_context(&wf_step, &workflow, &pointer);
match step.run(&ctx).await {
Ok(r) => {
let data = r.output_data.as_ref().unwrap().as_object().unwrap();
let stdout = data.get("vol-test.stdout").unwrap().as_str().unwrap();
assert!(stdout.contains("hello"), "stdout: {stdout}");
}
Err(e) => panic!("container step with volume failed: {e}"),
}
// Best-effort cleanup of the shared-dir volume source.
std::fs::remove_dir_all(&vol_dir).ok();
}
// ── Run a container with volume mount and network (simulates install step) ──
// Simulates an install-style step: debian image, host networking, and two
// volume mounts (cargo/rustup homes) under the shared I/O dir. Only checks
// that the step succeeds — no stdout assertion. Skipped when no containerd
// socket is reachable.
#[tokio::test]
async fn run_debian_with_volume_and_network() {
let Some(addr) = containerd_addr() else {
eprintln!("SKIP: containerd socket not available");
return;
};
// Volume sources live under the shared dir so a remote daemon can mount
// them.
let shared_dir = std::env::var("WFE_IO_DIR")
.unwrap_or_else(|_| "/tmp/wfe-io".to_string());
let cargo_dir = format!("{shared_dir}/test-cargo");
let rustup_dir = format!("{shared_dir}/test-rustup");
std::fs::create_dir_all(&cargo_dir).unwrap();
std::fs::create_dir_all(&rustup_dir).unwrap();
let mut config = minimal_config(&addr);
config.image = "docker.io/library/debian:bookworm-slim".to_string();
config.run = Some("echo hello && ls /cargo && ls /rustup".to_string());
config.pull = "if-not-present".to_string();
config.user = "0:0".to_string();
config.network = "host".to_string();
config.timeout_ms = Some(30_000);
config.env.insert("CARGO_HOME".to_string(), "/cargo".to_string());
config.env.insert("RUSTUP_HOME".to_string(), "/rustup".to_string());
config.volumes = vec![
wfe_containerd::VolumeMountConfig {
source: cargo_dir.clone(),
target: "/cargo".to_string(),
readonly: false,
},
wfe_containerd::VolumeMountConfig {
source: rustup_dir.clone(),
target: "/rustup".to_string(),
readonly: false,
},
];
let mut step = ContainerdStep::new(config);
let mut wf_step = WorkflowStep::new(0, "containerd");
wf_step.name = Some("debian-test".to_string());
let workflow = WorkflowInstance::new("test-wf", 1, serde_json::json!({}));
let pointer = ExecutionPointer::new(0);
let ctx = make_context(&wf_step, &workflow, &pointer);
match step.run(&ctx).await {
Ok(r) => {
eprintln!("SUCCESS: {:?}", r.output_data);
}
Err(e) => panic!("debian container with volumes failed: {e}"),
}
// Best-effort cleanup of the shared-dir volume sources.
std::fs::remove_dir_all(&cargo_dir).ok();
std::fs::remove_dir_all(&rustup_dir).ok();
}
// ── Step name defaults to "unknown" when None ────────────────────────
#[tokio::test]