From ae049cf2d3ce41c1efb0575e8742d98d9bc9f45a Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Mon, 6 Apr 2026 18:09:48 +0100 Subject: [PATCH] fix(wfe-containerd): replace nerdctl CLI shelling with proper containerd gRPC API calls --- wfe-containerd/src/step.rs | 172 +++++++++++++++++++++++++++---------- 1 file changed, 128 insertions(+), 44 deletions(-) diff --git a/wfe-containerd/src/step.rs b/wfe-containerd/src/step.rs index 491ab8f..2d13ddc 100644 --- a/wfe-containerd/src/step.rs +++ b/wfe-containerd/src/step.rs @@ -47,7 +47,7 @@ impl ContainerdStep { /// /// Supports Unix socket paths (bare `/path` or `unix:///path`) and /// TCP/HTTP endpoints. - async fn connect(addr: &str) -> Result { + pub(crate) async fn connect(addr: &str) -> Result { let channel = if addr.starts_with('/') || addr.starts_with("unix://") { let socket_path = addr .strip_prefix("unix://") @@ -403,7 +403,7 @@ impl ContainerdStep { } /// Inject a `containerd-namespace` header into a tonic request. - fn with_namespace(req: T, namespace: &str) -> tonic::Request { + pub(crate) fn with_namespace(req: T, namespace: &str) -> tonic::Request { let mut request = tonic::Request::new(req); request.metadata_mut().insert( "containerd-namespace", @@ -412,64 +412,148 @@ impl ContainerdStep { request } - /// Start a long-running service container (does not wait for exit). + /// Start a long-running service container via the containerd gRPC API. /// /// Used by `ContainerdServiceProvider` to provision infrastructure services. /// The container runs on the host network so its ports are accessible on 127.0.0.1. + /// Unlike step execution, this does NOT wait for the container to exit. pub async fn run_service( - _addr: &str, + addr: &str, container_id: &str, image: &str, env: &std::collections::HashMap, ) -> Result<(), WfeError> { - // TODO: Implement containerd service container lifecycle. - // This requires refactoring the internal OCI spec builder and snapshot - // preparation into reusable functions. For now, delegate to nerdctl CLI - // as a pragmatic fallback. - let mut cmd = tokio::process::Command::new("nerdctl"); - cmd.arg("run") - .arg("-d") - .arg("--name") - .arg(container_id) - .arg("--network") - .arg("host"); + let namespace = DEFAULT_NAMESPACE; + let channel = Self::connect(addr).await?; - for (k, v) in env { - cmd.arg("-e").arg(format!("{k}={v}")); - } + // Verify image exists. + Self::ensure_image(&channel, image, namespace).await?; - cmd.arg(image); + // Build a config for host-network service container. + let config = ContainerdConfig { + image: image.to_string(), + command: None, + run: None, + env: env.clone(), + volumes: vec![], + working_dir: None, + user: "0:0".to_string(), + network: "host".to_string(), + memory: None, + cpu: None, + pull: "if-not-present".to_string(), + containerd_addr: addr.to_string(), + cli: "nerdctl".to_string(), + tls: Default::default(), + registry_auth: Default::default(), + timeout_ms: None, + }; - let output = cmd.output().await.map_err(|e| { - WfeError::StepExecution(format!("failed to start service container via nerdctl: {e}")) + let step = Self::new(config); + let oci_spec = step.build_oci_spec(env); + + // Create container. + let mut containers_client = ContainersClient::new(channel.clone()); + let create_req = Self::with_namespace( + CreateContainerRequest { + container: Some(Container { + id: container_id.to_string(), + image: image.to_string(), + runtime: Some(Runtime { + name: "io.containerd.runc.v2".to_string(), + options: None, + }), + spec: Some(oci_spec), + snapshotter: DEFAULT_SNAPSHOTTER.to_string(), + snapshot_key: container_id.to_string(), + labels: HashMap::new(), + created_at: None, + updated_at: None, + extensions: HashMap::new(), + sandbox: String::new(), + }), + }, + namespace, + ); + containers_client.create(create_req).await.map_err(|e| { + WfeError::StepExecution(format!("failed to create service container: {e}")) })?; - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - return Err(WfeError::StepExecution(format!( - "nerdctl run failed for service '{}': {stderr}", - container_id - ))); - } + // Prepare snapshot. + let mut snapshots_client = SnapshotsClient::new(channel.clone()); + let mounts = { + let mounts_req = Self::with_namespace( + MountsRequest { + snapshotter: DEFAULT_SNAPSHOTTER.to_string(), + key: container_id.to_string(), + }, + namespace, + ); + match snapshots_client.mounts(mounts_req).await { + Ok(resp) => resp.into_inner().mounts, + Err(_) => { + let parent = + Self::resolve_image_chain_id(&channel, image, namespace).await?; + let prepare_req = Self::with_namespace( + PrepareSnapshotRequest { + snapshotter: DEFAULT_SNAPSHOTTER.to_string(), + key: container_id.to_string(), + parent, + labels: HashMap::new(), + }, + namespace, + ); + snapshots_client + .prepare(prepare_req) + .await + .map_err(|e| { + WfeError::StepExecution(format!("failed to prepare snapshot: {e}")) + })? + .into_inner() + .mounts + } + } + }; + // Create and start task (no stdout/stderr capture for services). + let mut tasks_client = TasksClient::new(channel.clone()); + let create_task_req = Self::with_namespace( + CreateTaskRequest { + container_id: container_id.to_string(), + rootfs: mounts, + stdin: String::new(), + stdout: String::new(), + stderr: String::new(), + terminal: false, + checkpoint: None, + options: None, + runtime_path: String::new(), + }, + namespace, + ); + tasks_client.create(create_task_req).await.map_err(|e| { + WfeError::StepExecution(format!("failed to create service task: {e}")) + })?; + + let start_req = Self::with_namespace( + StartRequest { + container_id: container_id.to_string(), + exec_id: String::new(), + }, + namespace, + ); + tasks_client.start(start_req).await.map_err(|e| { + WfeError::StepExecution(format!("failed to start service task: {e}")) + })?; + + tracing::info!(container_id = %container_id, image = %image, "service container started"); Ok(()) } - /// Stop and clean up a service container. - pub async fn cleanup_service(_addr: &str, container_id: &str) -> Result<(), WfeError> { - // Stop the container. - let _ = tokio::process::Command::new("nerdctl") - .args(["stop", container_id]) - .output() - .await; - - // Remove the container. - let _ = tokio::process::Command::new("nerdctl") - .args(["rm", "-f", container_id]) - .output() - .await; - - Ok(()) + /// Stop and clean up a service container via the containerd gRPC API. + pub async fn cleanup_service(addr: &str, container_id: &str) -> Result<(), WfeError> { + let channel = Self::connect(addr).await?; + Self::cleanup(&channel, container_id, DEFAULT_NAMESPACE).await } /// Parse `##wfe[output key=value]` lines from stdout. @@ -816,7 +900,7 @@ impl StepBody for ContainerdStep { impl ContainerdStep { /// Delete the task and container, best-effort. - async fn cleanup( + pub(crate) async fn cleanup( channel: &Channel, container_id: &str, namespace: &str,