diff --git a/wfe-kubernetes/src/lib.rs b/wfe-kubernetes/src/lib.rs index 83c48fa..3ff3ba1 100644 --- a/wfe-kubernetes/src/lib.rs +++ b/wfe-kubernetes/src/lib.rs @@ -5,7 +5,10 @@ pub mod logs; pub mod manifests; pub mod namespace; pub mod output; +pub mod service_manifests; +pub mod service_provider; pub mod step; pub use config::{ClusterConfig, KubernetesStepConfig}; +pub use service_provider::KubernetesServiceProvider; pub use step::KubernetesStep; diff --git a/wfe-kubernetes/src/service_manifests.rs b/wfe-kubernetes/src/service_manifests.rs new file mode 100644 index 0000000..d336545 --- /dev/null +++ b/wfe-kubernetes/src/service_manifests.rs @@ -0,0 +1,261 @@ +use std::collections::BTreeMap; + +use k8s_openapi::api::core::v1::{ + Container, ContainerPort, EnvVar, Pod, PodSpec, ResourceRequirements, Service, ServicePort, + ServiceSpec, +}; +use k8s_openapi::apimachinery::pkg::api::resource::Quantity; +use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; +use kube::api::ObjectMeta; +use wfe_core::models::service::ServiceDefinition; + +const LABEL_SERVICE_NAME: &str = "wfe.sunbeam.pt/service-name"; +const LABEL_MANAGED_BY: &str = "wfe.sunbeam.pt/managed-by"; + +/// Build a Kubernetes Pod for a service definition. +pub fn build_service_pod(svc: &ServiceDefinition, namespace: &str) -> Pod { + let mut labels = BTreeMap::new(); + labels.insert(LABEL_SERVICE_NAME.into(), svc.name.clone()); + labels.insert(LABEL_MANAGED_BY.into(), "wfe-kubernetes".into()); + + let ports: Vec = svc + .ports + .iter() + .map(|p| ContainerPort { + container_port: p.container_port as i32, + name: p.name.clone(), + protocol: Some(p.protocol.clone()), + ..Default::default() + }) + .collect(); + + let env: Vec = svc + .env + .iter() + .map(|(k, v)| EnvVar { + name: k.clone(), + value: Some(v.clone()), + ..Default::default() + }) + .collect(); + + let mut limits = BTreeMap::new(); + let mut requests = BTreeMap::new(); + if let Some(ref mem) = svc.memory { + limits.insert("memory".into(), Quantity(mem.clone())); + requests.insert("memory".into(), Quantity(mem.clone())); + } + if let Some(ref cpu) = svc.cpu { + limits.insert("cpu".into(), Quantity(cpu.clone())); + requests.insert("cpu".into(), Quantity(cpu.clone())); + } + + let command = if svc.command.is_empty() { + None + } else { + Some(svc.command.clone()) + }; + let args = if svc.args.is_empty() { + None + } else { + Some(svc.args.clone()) + }; + + Pod { + metadata: ObjectMeta { + name: Some(svc.name.clone()), + namespace: Some(namespace.into()), + labels: Some(labels.clone()), + ..Default::default() + }, + spec: Some(PodSpec { + containers: vec![Container { + name: svc.name.clone(), + image: Some(svc.image.clone()), + ports: Some(ports), + env: Some(env), + command, + args, + resources: Some(ResourceRequirements { + limits: if limits.is_empty() { None } else { Some(limits) }, + requests: if requests.is_empty() { None } else { Some(requests) }, + ..Default::default() + }), + ..Default::default() + }], + restart_policy: Some("Always".into()), + ..Default::default() + }), + ..Default::default() + } +} + +/// Build a Kubernetes Service for DNS resolution of a service definition. +pub fn build_k8s_service(svc: &ServiceDefinition, namespace: &str) -> Service { + let mut selector = BTreeMap::new(); + selector.insert(LABEL_SERVICE_NAME.into(), svc.name.clone()); + + let ports: Vec = svc + .ports + .iter() + .enumerate() + .map(|(i, p)| ServicePort { + name: p.name.clone().or_else(|| Some(format!("port-{i}"))), + port: p.container_port as i32, + target_port: Some(IntOrString::Int(p.container_port as i32)), + protocol: Some(p.protocol.clone()), + ..Default::default() + }) + .collect(); + + Service { + metadata: ObjectMeta { + name: Some(svc.name.clone()), + namespace: Some(namespace.into()), + labels: Some({ + let mut labels = BTreeMap::new(); + labels.insert(LABEL_MANAGED_BY.into(), "wfe-kubernetes".into()); + labels + }), + ..Default::default() + }, + spec: Some(ServiceSpec { + selector: Some(selector), + ports: Some(ports), + ..Default::default() + }), + ..Default::default() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use wfe_core::models::service::ServicePort as WfeServicePort; + + fn postgres_service() -> ServiceDefinition { + ServiceDefinition { + name: "postgres".into(), + image: "postgres:15".into(), + ports: vec![WfeServicePort::tcp(5432)], + env: [ + ("POSTGRES_PASSWORD".into(), "test".into()), + ("POSTGRES_DB".into(), "myapp".into()), + ] + .into(), + readiness: None, + command: vec![], + args: vec![], + memory: Some("512Mi".into()), + cpu: Some("500m".into()), + } + } + + #[test] + fn build_pod_basic() { + let svc = postgres_service(); + let pod = build_service_pod(&svc, "wfe-test"); + + assert_eq!(pod.metadata.name, Some("postgres".into())); + assert_eq!(pod.metadata.namespace, Some("wfe-test".into())); + + let spec = pod.spec.as_ref().unwrap(); + assert_eq!(spec.restart_policy, Some("Always".into())); + assert_eq!(spec.containers.len(), 1); + + let container = &spec.containers[0]; + assert_eq!(container.name, "postgres"); + assert_eq!(container.image, Some("postgres:15".into())); + + let ports = container.ports.as_ref().unwrap(); + assert_eq!(ports.len(), 1); + assert_eq!(ports[0].container_port, 5432); + + let env = container.env.as_ref().unwrap(); + assert!(env.iter().any(|e| e.name == "POSTGRES_PASSWORD")); + } + + #[test] + fn build_pod_with_resources() { + let svc = postgres_service(); + let pod = build_service_pod(&svc, "ns"); + let container = &pod.spec.unwrap().containers[0]; + let resources = container.resources.as_ref().unwrap(); + let limits = resources.limits.as_ref().unwrap(); + assert_eq!(limits.get("memory"), Some(&Quantity("512Mi".into()))); + assert_eq!(limits.get("cpu"), Some(&Quantity("500m".into()))); + } + + #[test] + fn build_pod_labels() { + let svc = postgres_service(); + let pod = build_service_pod(&svc, "ns"); + let labels = pod.metadata.labels.as_ref().unwrap(); + assert_eq!(labels.get(LABEL_SERVICE_NAME), Some(&"postgres".into())); + assert_eq!(labels.get(LABEL_MANAGED_BY), Some(&"wfe-kubernetes".into())); + } + + #[test] + fn build_pod_with_command() { + let svc = ServiceDefinition { + name: "custom".into(), + image: "myimage".into(), + ports: vec![], + env: Default::default(), + readiness: None, + command: vec!["/usr/bin/server".into()], + args: vec!["--port".into(), "8080".into()], + memory: None, + cpu: None, + }; + let pod = build_service_pod(&svc, "ns"); + let container = &pod.spec.unwrap().containers[0]; + assert_eq!(container.command, Some(vec!["/usr/bin/server".into()])); + assert_eq!(container.args, Some(vec!["--port".into(), "8080".into()])); + } + + #[test] + fn build_k8s_service_basic() { + let svc = postgres_service(); + let k8s_svc = build_k8s_service(&svc, "wfe-test"); + + assert_eq!(k8s_svc.metadata.name, Some("postgres".into())); + assert_eq!(k8s_svc.metadata.namespace, Some("wfe-test".into())); + + let spec = k8s_svc.spec.as_ref().unwrap(); + let selector = spec.selector.as_ref().unwrap(); + assert_eq!(selector.get(LABEL_SERVICE_NAME), Some(&"postgres".into())); + + let ports = spec.ports.as_ref().unwrap(); + assert_eq!(ports.len(), 1); + assert_eq!(ports[0].port, 5432); + assert_eq!( + ports[0].target_port, + Some(IntOrString::Int(5432)) + ); + } + + #[test] + fn build_k8s_service_multiple_ports() { + let svc = ServiceDefinition { + name: "app".into(), + image: "myapp".into(), + ports: vec![ + WfeServicePort::tcp(8080), + WfeServicePort::tcp(8443), + ], + env: Default::default(), + readiness: None, + command: vec![], + args: vec![], + memory: None, + cpu: None, + }; + let k8s_svc = build_k8s_service(&svc, "ns"); + let ports = k8s_svc.spec.unwrap().ports.unwrap(); + assert_eq!(ports.len(), 2); + assert_eq!(ports[0].port, 8080); + assert_eq!(ports[1].port, 8443); + } +} diff --git a/wfe-kubernetes/src/service_provider.rs b/wfe-kubernetes/src/service_provider.rs new file mode 100644 index 0000000..a742e83 --- /dev/null +++ b/wfe-kubernetes/src/service_provider.rs @@ -0,0 +1,116 @@ +use std::time::Duration; + +use async_trait::async_trait; +use k8s_openapi::api::core::v1::Pod; +use kube::api::PostParams; +use kube::{Api, Client}; +use wfe_core::models::service::{ServiceDefinition, ServiceEndpoint}; +use wfe_core::traits::ServiceProvider; +use wfe_core::WfeError; + +use crate::config::ClusterConfig; +use crate::logs::wait_for_pod_running; +use crate::namespace::{delete_namespace, ensure_namespace, namespace_name}; +use crate::service_manifests::{build_k8s_service, build_service_pod}; + +/// Provisions infrastructure services as Kubernetes Pods + Services. +/// +/// Each workflow gets its own namespace. Services are accessible by name +/// via Kubernetes DNS within the namespace. +pub struct KubernetesServiceProvider { + client: Client, + config: ClusterConfig, +} + +impl KubernetesServiceProvider { + pub fn new(client: Client, config: ClusterConfig) -> Self { + Self { client, config } + } +} + +#[async_trait] +impl ServiceProvider for KubernetesServiceProvider { + fn can_provision(&self, _services: &[ServiceDefinition]) -> bool { + true // K8s can run any container image + } + + async fn provision( + &self, + workflow_id: &str, + services: &[ServiceDefinition], + ) -> wfe_core::Result> { + let ns = namespace_name(&self.config.namespace_prefix, workflow_id); + ensure_namespace(&self.client, &ns, workflow_id).await?; + + let mut endpoints = Vec::new(); + + for svc in services { + // Create Pod. + let pod_manifest = build_service_pod(svc, &ns); + let pods: Api = Api::namespaced(self.client.clone(), &ns); + pods.create(&PostParams::default(), &pod_manifest) + .await + .map_err(|e| { + WfeError::StepExecution(format!( + "failed to create service pod '{}': {e}", + svc.name + )) + })?; + + // Create K8s Service for DNS. + let svc_manifest = build_k8s_service(svc, &ns); + let svcs: Api = + Api::namespaced(self.client.clone(), &ns); + svcs.create(&PostParams::default(), &svc_manifest) + .await + .map_err(|e| { + WfeError::StepExecution(format!( + "failed to create k8s service '{}': {e}", + svc.name + )) + })?; + + // Wait for pod readiness. + let timeout = svc + .readiness + .as_ref() + .map(|r| Duration::from_millis(r.timeout_ms)) + .unwrap_or(Duration::from_secs(120)); + + match tokio::time::timeout( + timeout, + wait_for_pod_running(&self.client, &ns, &svc.name), + ) + .await + { + Ok(Ok(())) => {} + Ok(Err(e)) => { + return Err(WfeError::StepExecution(format!( + "service '{}' pod failed to start: {e}", + svc.name + ))); + } + Err(_) => { + return Err(WfeError::StepExecution(format!( + "service '{}' readiness timed out after {}ms", + svc.name, + timeout.as_millis() + ))); + } + } + + endpoints.push(ServiceEndpoint { + name: svc.name.clone(), + host: svc.name.clone(), // K8s DNS resolves within namespace + ports: svc.ports.clone(), + }); + } + + Ok(endpoints) + } + + async fn teardown(&self, workflow_id: &str) -> wfe_core::Result<()> { + let ns = namespace_name(&self.config.namespace_prefix, workflow_id); + delete_namespace(&self.client, &ns).await + } +}