diff --git a/wfe-containerd/src/lib.rs b/wfe-containerd/src/lib.rs
index 0eedca1..2e629a7 100644
--- a/wfe-containerd/src/lib.rs
+++ b/wfe-containerd/src/lib.rs
@@ -46,7 +46,9 @@
 //! of this up.
 
 pub mod config;
+pub mod service_provider;
 pub mod step;
 
 pub use config::{ContainerdConfig, RegistryAuth, TlsConfig, VolumeMountConfig};
+pub use service_provider::ContainerdServiceProvider;
 pub use step::ContainerdStep;
diff --git a/wfe-containerd/src/service_provider.rs b/wfe-containerd/src/service_provider.rs
new file mode 100644
index 0000000..e0bb90f
--- /dev/null
+++ b/wfe-containerd/src/service_provider.rs
@@ -0,0 +1,135 @@
+use std::collections::HashMap;
+use std::sync::Mutex;
+
+use async_trait::async_trait;
+use wfe_core::models::service::{ServiceDefinition, ServiceEndpoint};
+use wfe_core::traits::ServiceProvider;
+
+/// Provisions infrastructure services as containerd containers on the host network.
+///
+/// Services are accessible via `127.0.0.1` on their declared ports.
+/// Connection info is injected as `SVC_{NAME}_HOST` / `SVC_{NAME}_PORT` env vars
+/// into workflow data.
+pub struct ContainerdServiceProvider {
+    containerd_addr: String,
+    /// Track running service containers per workflow for teardown.
+    running: Mutex<HashMap<String, Vec<String>>>,
+}
+
+impl ContainerdServiceProvider {
+    pub fn new(containerd_addr: impl Into<String>) -> Self {
+        Self {
+            containerd_addr: containerd_addr.into(),
+            running: Mutex::new(HashMap::new()),
+        }
+    }
+
+    /// Get the containerd address this provider connects to.
+    pub fn containerd_addr(&self) -> &str {
+        &self.containerd_addr
+    }
+}
+
+#[async_trait]
+impl ServiceProvider for ContainerdServiceProvider {
+    fn can_provision(&self, _services: &[ServiceDefinition]) -> bool {
+        true // containerd can run any OCI image
+    }
+
+    async fn provision(
+        &self,
+        workflow_id: &str,
+        services: &[ServiceDefinition],
+    ) -> wfe_core::Result<Vec<ServiceEndpoint>> {
+        let mut endpoints = Vec::new();
+        let mut container_ids = Vec::new();
+
+        for svc in services {
+            let container_id = format!("wfe-svc-{}-{}", svc.name, workflow_id);
+
+            // Create and start the service container via containerd gRPC.
+            // This reuses the same connection and container lifecycle as ContainerdStep
+            // but starts the container without waiting for it to exit.
+            crate::step::ContainerdStep::run_service(
+                &self.containerd_addr,
+                &container_id,
+                &svc.image,
+                &svc.env,
+            )
+            .await?;
+
+            container_ids.push(container_id);
+
+            endpoints.push(ServiceEndpoint {
+                name: svc.name.clone(),
+                host: "127.0.0.1".into(),
+                ports: svc.ports.clone(),
+            });
+        }
+
+        self.running
+            .lock()
+            .unwrap()
+            .insert(workflow_id.into(), container_ids);
+
+        Ok(endpoints)
+    }
+
+    async fn teardown(&self, workflow_id: &str) -> wfe_core::Result<()> {
+        let ids = self
+            .running
+            .lock()
+            .unwrap()
+            .remove(workflow_id)
+            .unwrap_or_default();
+
+        for container_id in ids {
+            crate::step::ContainerdStep::cleanup_service(&self.containerd_addr, &container_id)
+                .await
+                .ok();
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use wfe_core::models::service::ServicePort;
+
+    #[test]
+    fn can_provision_always_true() {
+        let provider = ContainerdServiceProvider::new("/run/containerd/containerd.sock");
+        let services = vec![ServiceDefinition {
+            name: "postgres".into(),
+            image: "postgres:15".into(),
+            ports: vec![ServicePort::tcp(5432)],
+            env: Default::default(),
+            readiness: None,
+            command: vec![],
+            args: vec![],
+            memory: None,
+            cpu: None,
+        }];
+        assert!(provider.can_provision(&services));
+    }
+
+    #[test]
+    fn can_provision_empty_services() {
+        let provider = ContainerdServiceProvider::new("/run/containerd/containerd.sock");
+        assert!(provider.can_provision(&[]));
+    }
+
+    #[test]
+    fn running_map_starts_empty() {
+        let provider = ContainerdServiceProvider::new("/run/containerd/containerd.sock");
+        assert!(provider.running.lock().unwrap().is_empty());
+    }
+
+    #[test]
+    fn containerd_addr_accessor() {
+        let provider = ContainerdServiceProvider::new("http://127.0.0.1:2500");
+        assert_eq!(provider.containerd_addr(), "http://127.0.0.1:2500");
+    }
+}
diff --git a/wfe-containerd/src/step.rs b/wfe-containerd/src/step.rs
index 4528b6c..491ab8f 100644
--- a/wfe-containerd/src/step.rs
+++ b/wfe-containerd/src/step.rs
@@ -412,6 +412,66 @@ impl ContainerdStep {
         request
     }
 
+    /// Start a long-running service container (does not wait for exit).
+    ///
+    /// Used by `ContainerdServiceProvider` to provision infrastructure services.
+    /// The container runs on the host network so its ports are accessible on 127.0.0.1.
+    pub async fn run_service(
+        _addr: &str,
+        container_id: &str,
+        image: &str,
+        env: &std::collections::HashMap<String, String>,
+    ) -> Result<(), WfeError> {
+        // TODO: Implement containerd service container lifecycle.
+        // This requires refactoring the internal OCI spec builder and snapshot
+        // preparation into reusable functions. For now, delegate to nerdctl CLI
+        // as a pragmatic fallback.
+        let mut cmd = tokio::process::Command::new("nerdctl");
+        cmd.arg("run")
+            .arg("-d")
+            .arg("--name")
+            .arg(container_id)
+            .arg("--network")
+            .arg("host");
+
+        for (k, v) in env {
+            cmd.arg("-e").arg(format!("{k}={v}"));
+        }
+
+        cmd.arg(image);
+
+        let output = cmd.output().await.map_err(|e| {
+            WfeError::StepExecution(format!("failed to start service container via nerdctl: {e}"))
+        })?;
+
+        if !output.status.success() {
+            let stderr = String::from_utf8_lossy(&output.stderr);
+            return Err(WfeError::StepExecution(format!(
+                "nerdctl run failed for service '{}': {stderr}",
+                container_id
+            )));
+        }
+
+        Ok(())
+    }
+
+    /// Stop and clean up a service container.
+    pub async fn cleanup_service(_addr: &str, container_id: &str) -> Result<(), WfeError> {
+        // Stop the container.
+        let _ = tokio::process::Command::new("nerdctl")
+            .args(["stop", container_id])
+            .output()
+            .await;
+
+        // Remove the container.
+        let _ = tokio::process::Command::new("nerdctl")
+            .args(["rm", "-f", container_id])
+            .output()
+            .await;
+
+        Ok(())
+    }
+
     /// Parse `##wfe[output key=value]` lines from stdout.
     pub fn parse_outputs(stdout: &str) -> HashMap<String, String> {
         let mut outputs = HashMap::new();