fix(wfe-containerd): replace nerdctl CLI shelling with proper containerd gRPC API calls
This commit is contained in:
@@ -47,7 +47,7 @@ impl ContainerdStep {
|
|||||||
///
|
///
|
||||||
/// Supports Unix socket paths (bare `/path` or `unix:///path`) and
|
/// Supports Unix socket paths (bare `/path` or `unix:///path`) and
|
||||||
/// TCP/HTTP endpoints.
|
/// TCP/HTTP endpoints.
|
||||||
async fn connect(addr: &str) -> Result<Channel, WfeError> {
|
pub(crate) async fn connect(addr: &str) -> Result<Channel, WfeError> {
|
||||||
let channel = if addr.starts_with('/') || addr.starts_with("unix://") {
|
let channel = if addr.starts_with('/') || addr.starts_with("unix://") {
|
||||||
let socket_path = addr
|
let socket_path = addr
|
||||||
.strip_prefix("unix://")
|
.strip_prefix("unix://")
|
||||||
@@ -403,7 +403,7 @@ impl ContainerdStep {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Inject a `containerd-namespace` header into a tonic request.
|
/// Inject a `containerd-namespace` header into a tonic request.
|
||||||
fn with_namespace<T>(req: T, namespace: &str) -> tonic::Request<T> {
|
pub(crate) fn with_namespace<T>(req: T, namespace: &str) -> tonic::Request<T> {
|
||||||
let mut request = tonic::Request::new(req);
|
let mut request = tonic::Request::new(req);
|
||||||
request.metadata_mut().insert(
|
request.metadata_mut().insert(
|
||||||
"containerd-namespace",
|
"containerd-namespace",
|
||||||
@@ -412,64 +412,148 @@ impl ContainerdStep {
|
|||||||
request
|
request
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Start a long-running service container (does not wait for exit).
|
/// Start a long-running service container via the containerd gRPC API.
|
||||||
///
|
///
|
||||||
/// Used by `ContainerdServiceProvider` to provision infrastructure services.
|
/// Used by `ContainerdServiceProvider` to provision infrastructure services.
|
||||||
/// The container runs on the host network so its ports are accessible on 127.0.0.1.
|
/// The container runs on the host network so its ports are accessible on 127.0.0.1.
|
||||||
|
/// Unlike step execution, this does NOT wait for the container to exit.
|
||||||
pub async fn run_service(
|
pub async fn run_service(
|
||||||
_addr: &str,
|
addr: &str,
|
||||||
container_id: &str,
|
container_id: &str,
|
||||||
image: &str,
|
image: &str,
|
||||||
env: &std::collections::HashMap<String, String>,
|
env: &std::collections::HashMap<String, String>,
|
||||||
) -> Result<(), WfeError> {
|
) -> Result<(), WfeError> {
|
||||||
// TODO: Implement containerd service container lifecycle.
|
let namespace = DEFAULT_NAMESPACE;
|
||||||
// This requires refactoring the internal OCI spec builder and snapshot
|
let channel = Self::connect(addr).await?;
|
||||||
// preparation into reusable functions. For now, delegate to nerdctl CLI
|
|
||||||
// as a pragmatic fallback.
|
|
||||||
let mut cmd = tokio::process::Command::new("nerdctl");
|
|
||||||
cmd.arg("run")
|
|
||||||
.arg("-d")
|
|
||||||
.arg("--name")
|
|
||||||
.arg(container_id)
|
|
||||||
.arg("--network")
|
|
||||||
.arg("host");
|
|
||||||
|
|
||||||
for (k, v) in env {
|
// Verify image exists.
|
||||||
cmd.arg("-e").arg(format!("{k}={v}"));
|
Self::ensure_image(&channel, image, namespace).await?;
|
||||||
}
|
|
||||||
|
|
||||||
cmd.arg(image);
|
// Build a config for host-network service container.
|
||||||
|
let config = ContainerdConfig {
|
||||||
|
image: image.to_string(),
|
||||||
|
command: None,
|
||||||
|
run: None,
|
||||||
|
env: env.clone(),
|
||||||
|
volumes: vec![],
|
||||||
|
working_dir: None,
|
||||||
|
user: "0:0".to_string(),
|
||||||
|
network: "host".to_string(),
|
||||||
|
memory: None,
|
||||||
|
cpu: None,
|
||||||
|
pull: "if-not-present".to_string(),
|
||||||
|
containerd_addr: addr.to_string(),
|
||||||
|
cli: "nerdctl".to_string(),
|
||||||
|
tls: Default::default(),
|
||||||
|
registry_auth: Default::default(),
|
||||||
|
timeout_ms: None,
|
||||||
|
};
|
||||||
|
|
||||||
let output = cmd.output().await.map_err(|e| {
|
let step = Self::new(config);
|
||||||
WfeError::StepExecution(format!("failed to start service container via nerdctl: {e}"))
|
let oci_spec = step.build_oci_spec(env);
|
||||||
|
|
||||||
|
// Create container.
|
||||||
|
let mut containers_client = ContainersClient::new(channel.clone());
|
||||||
|
let create_req = Self::with_namespace(
|
||||||
|
CreateContainerRequest {
|
||||||
|
container: Some(Container {
|
||||||
|
id: container_id.to_string(),
|
||||||
|
image: image.to_string(),
|
||||||
|
runtime: Some(Runtime {
|
||||||
|
name: "io.containerd.runc.v2".to_string(),
|
||||||
|
options: None,
|
||||||
|
}),
|
||||||
|
spec: Some(oci_spec),
|
||||||
|
snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
|
||||||
|
snapshot_key: container_id.to_string(),
|
||||||
|
labels: HashMap::new(),
|
||||||
|
created_at: None,
|
||||||
|
updated_at: None,
|
||||||
|
extensions: HashMap::new(),
|
||||||
|
sandbox: String::new(),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
namespace,
|
||||||
|
);
|
||||||
|
containers_client.create(create_req).await.map_err(|e| {
|
||||||
|
WfeError::StepExecution(format!("failed to create service container: {e}"))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
if !output.status.success() {
|
// Prepare snapshot.
|
||||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
let mut snapshots_client = SnapshotsClient::new(channel.clone());
|
||||||
return Err(WfeError::StepExecution(format!(
|
let mounts = {
|
||||||
"nerdctl run failed for service '{}': {stderr}",
|
let mounts_req = Self::with_namespace(
|
||||||
container_id
|
MountsRequest {
|
||||||
)));
|
snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
|
||||||
}
|
key: container_id.to_string(),
|
||||||
|
},
|
||||||
|
namespace,
|
||||||
|
);
|
||||||
|
match snapshots_client.mounts(mounts_req).await {
|
||||||
|
Ok(resp) => resp.into_inner().mounts,
|
||||||
|
Err(_) => {
|
||||||
|
let parent =
|
||||||
|
Self::resolve_image_chain_id(&channel, image, namespace).await?;
|
||||||
|
let prepare_req = Self::with_namespace(
|
||||||
|
PrepareSnapshotRequest {
|
||||||
|
snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
|
||||||
|
key: container_id.to_string(),
|
||||||
|
parent,
|
||||||
|
labels: HashMap::new(),
|
||||||
|
},
|
||||||
|
namespace,
|
||||||
|
);
|
||||||
|
snapshots_client
|
||||||
|
.prepare(prepare_req)
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
WfeError::StepExecution(format!("failed to prepare snapshot: {e}"))
|
||||||
|
})?
|
||||||
|
.into_inner()
|
||||||
|
.mounts
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Create and start task (no stdout/stderr capture for services).
|
||||||
|
let mut tasks_client = TasksClient::new(channel.clone());
|
||||||
|
let create_task_req = Self::with_namespace(
|
||||||
|
CreateTaskRequest {
|
||||||
|
container_id: container_id.to_string(),
|
||||||
|
rootfs: mounts,
|
||||||
|
stdin: String::new(),
|
||||||
|
stdout: String::new(),
|
||||||
|
stderr: String::new(),
|
||||||
|
terminal: false,
|
||||||
|
checkpoint: None,
|
||||||
|
options: None,
|
||||||
|
runtime_path: String::new(),
|
||||||
|
},
|
||||||
|
namespace,
|
||||||
|
);
|
||||||
|
tasks_client.create(create_task_req).await.map_err(|e| {
|
||||||
|
WfeError::StepExecution(format!("failed to create service task: {e}"))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let start_req = Self::with_namespace(
|
||||||
|
StartRequest {
|
||||||
|
container_id: container_id.to_string(),
|
||||||
|
exec_id: String::new(),
|
||||||
|
},
|
||||||
|
namespace,
|
||||||
|
);
|
||||||
|
tasks_client.start(start_req).await.map_err(|e| {
|
||||||
|
WfeError::StepExecution(format!("failed to start service task: {e}"))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
tracing::info!(container_id = %container_id, image = %image, "service container started");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Stop and clean up a service container.
|
/// Stop and clean up a service container via the containerd gRPC API.
|
||||||
pub async fn cleanup_service(_addr: &str, container_id: &str) -> Result<(), WfeError> {
|
pub async fn cleanup_service(addr: &str, container_id: &str) -> Result<(), WfeError> {
|
||||||
// Stop the container.
|
let channel = Self::connect(addr).await?;
|
||||||
let _ = tokio::process::Command::new("nerdctl")
|
Self::cleanup(&channel, container_id, DEFAULT_NAMESPACE).await
|
||||||
.args(["stop", container_id])
|
|
||||||
.output()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// Remove the container.
|
|
||||||
let _ = tokio::process::Command::new("nerdctl")
|
|
||||||
.args(["rm", "-f", container_id])
|
|
||||||
.output()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Parse `##wfe[output key=value]` lines from stdout.
|
/// Parse `##wfe[output key=value]` lines from stdout.
|
||||||
@@ -816,7 +900,7 @@ impl StepBody for ContainerdStep {
|
|||||||
|
|
||||||
impl ContainerdStep {
|
impl ContainerdStep {
|
||||||
/// Delete the task and container, best-effort.
|
/// Delete the task and container, best-effort.
|
||||||
async fn cleanup(
|
pub(crate) async fn cleanup(
|
||||||
channel: &Channel,
|
channel: &Channel,
|
||||||
container_id: &str,
|
container_id: &str,
|
||||||
namespace: &str,
|
namespace: &str,
|
||||||
|
|||||||
Reference in New Issue
Block a user