diff --git a/wfe-kubernetes/Cargo.toml b/wfe-kubernetes/Cargo.toml new file mode 100644 index 0000000..7d574dd --- /dev/null +++ b/wfe-kubernetes/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "wfe-kubernetes" +version.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true +description = "Kubernetes executor for WFE — runs workflow steps as K8s Jobs" + +[dependencies] +wfe-core = { workspace = true } +kube = { workspace = true } +k8s-openapi = { workspace = true } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +tracing = { workspace = true } +thiserror = { workspace = true } +chrono = { workspace = true } +futures = { workspace = true } + +[dev-dependencies] +pretty_assertions = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } +tokio-util = "0.7" +wfe-core = { workspace = true, features = ["test-support"] } diff --git a/wfe-kubernetes/src/client.rs b/wfe-kubernetes/src/client.rs new file mode 100644 index 0000000..f196502 --- /dev/null +++ b/wfe-kubernetes/src/client.rs @@ -0,0 +1,57 @@ +use kube::Client; +use wfe_core::WfeError; + +use crate::config::ClusterConfig; + +/// Create a Kubernetes API client from the cluster configuration. +/// +/// Resolution order: +/// 1. If `kubeconfig` path is set, load from that file. +/// 2. Try in-cluster config (for pods running inside K8s). +/// 3. Fall back to default kubeconfig (~/.kube/config). 
+pub async fn create_client(config: &ClusterConfig) -> Result { + let kube_config = if let Some(ref path) = config.kubeconfig { + kube::Config::from_custom_kubeconfig( + kube::config::Kubeconfig::read_from(path).map_err(|e| { + WfeError::StepExecution(format!("failed to read kubeconfig at '{path}': {e}")) + })?, + &kube::config::KubeConfigOptions::default(), + ) + .await + .map_err(|e| { + WfeError::StepExecution(format!( + "failed to build config from kubeconfig '{path}': {e}" + )) + })? + } else { + kube::Config::infer() + .await + .map_err(|e| WfeError::StepExecution(format!("failed to infer K8s config: {e}")))? + }; + + Client::try_from(kube_config) + .map_err(|e| WfeError::StepExecution(format!("failed to create K8s client: {e}"))) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn create_client_with_bad_kubeconfig_path() { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + let config = ClusterConfig { + kubeconfig: Some("/nonexistent/kubeconfig".into()), + ..Default::default() + }; + let result = rt.block_on(create_client(&config)); + let err = match result { + Err(e) => format!("{e}"), + Ok(_) => panic!("expected error for nonexistent kubeconfig"), + }; + assert!(err.contains("kubeconfig")); + } +} diff --git a/wfe-kubernetes/src/config.rs b/wfe-kubernetes/src/config.rs new file mode 100644 index 0000000..de23e1a --- /dev/null +++ b/wfe-kubernetes/src/config.rs @@ -0,0 +1,146 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +/// Cluster-level configuration shared across all Kubernetes steps. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterConfig { + /// Path to kubeconfig file. None uses in-cluster config or default ~/.kube/config. + #[serde(default)] + pub kubeconfig: Option, + /// Namespace prefix for auto-generated namespaces. Default: "wfe-". 
+ #[serde(default = "default_namespace_prefix")] + pub namespace_prefix: String, + /// ServiceAccount name for Job pods. + #[serde(default)] + pub service_account: Option, + /// Image pull secret names. + #[serde(default)] + pub image_pull_secrets: Vec, + /// Node selector labels for Job pods. + #[serde(default)] + pub node_selector: HashMap, +} + +impl Default for ClusterConfig { + fn default() -> Self { + Self { + kubeconfig: None, + namespace_prefix: default_namespace_prefix(), + service_account: None, + image_pull_secrets: Vec::new(), + node_selector: HashMap::new(), + } + } +} + +fn default_namespace_prefix() -> String { + "wfe-".to_string() +} + +/// Per-step configuration for a Kubernetes Job execution. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct KubernetesStepConfig { + /// Container image to run. + pub image: String, + /// Override entrypoint. + #[serde(default)] + pub command: Option>, + /// Shorthand: runs via `/bin/sh -c "..."`. Mutually exclusive with `command`. + #[serde(default)] + pub run: Option, + /// Environment variables injected into the container. + #[serde(default)] + pub env: HashMap, + /// Working directory inside the container. + #[serde(default)] + pub working_dir: Option, + /// Memory limit (e.g., "512Mi", "1Gi"). + #[serde(default)] + pub memory: Option, + /// CPU limit (e.g., "500m", "1"). + #[serde(default)] + pub cpu: Option, + /// Execution timeout in milliseconds. + #[serde(default)] + pub timeout_ms: Option, + /// Image pull policy: Always, IfNotPresent, Never. + #[serde(default)] + pub pull_policy: Option, + /// Override the auto-generated namespace. 
+ #[serde(default)] + pub namespace: Option, +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn cluster_config_defaults() { + let config = ClusterConfig::default(); + assert_eq!(config.namespace_prefix, "wfe-"); + assert!(config.kubeconfig.is_none()); + assert!(config.service_account.is_none()); + assert!(config.image_pull_secrets.is_empty()); + assert!(config.node_selector.is_empty()); + } + + #[test] + fn cluster_config_serde_round_trip() { + let config = ClusterConfig { + kubeconfig: Some("/home/user/.kube/config".into()), + namespace_prefix: "test-".into(), + service_account: Some("wfe-runner".into()), + image_pull_secrets: vec!["ghcr-secret".into()], + node_selector: [("tier".into(), "compute".into())].into(), + }; + let json = serde_json::to_string(&config).unwrap(); + let parsed: ClusterConfig = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.namespace_prefix, "test-"); + assert_eq!(parsed.service_account, Some("wfe-runner".into())); + assert_eq!(parsed.image_pull_secrets, vec!["ghcr-secret"]); + } + + #[test] + fn step_config_minimal() { + let json = r#"{"image": "alpine:3.18"}"#; + let config: KubernetesStepConfig = serde_json::from_str(json).unwrap(); + assert_eq!(config.image, "alpine:3.18"); + assert!(config.command.is_none()); + assert!(config.run.is_none()); + assert!(config.env.is_empty()); + } + + #[test] + fn step_config_full_serde_round_trip() { + let config = KubernetesStepConfig { + image: "node:20-alpine".into(), + command: None, + run: Some("npm test".into()), + env: [("NODE_ENV".into(), "test".into())].into(), + working_dir: Some("/app".into()), + memory: Some("512Mi".into()), + cpu: Some("500m".into()), + timeout_ms: Some(300_000), + pull_policy: Some("IfNotPresent".into()), + namespace: None, + }; + let json = serde_json::to_string(&config).unwrap(); + let parsed: KubernetesStepConfig = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.image, "node:20-alpine"); + 
/// Build a Kubernetes namespace name from `prefix` and `workflow_id`.
///
/// Namespace names must be RFC 1123 DNS labels: at most 63 characters, only
/// lowercase ASCII alphanumerics and `-`, beginning and ending with an
/// alphanumeric. To get there, the concatenated input is lowercased, every
/// character outside `[a-z0-9-]` is replaced by `-`, leading hyphens are
/// dropped, the result is cut to 63 characters, and trailing hyphens are
/// trimmed.
///
/// The result is empty when the input contains no ASCII alphanumeric
/// character, so callers must be prepared to reject an empty name. Inputs
/// that only differ after the 63-character cut produce the same name.
pub fn namespace_name(prefix: &str, workflow_id: &str) -> String {
    /// Maximum length of an RFC 1123 DNS label.
    const MAX_LABEL_LEN: usize = 63;

    let raw = format!("{prefix}{workflow_id}");
    let mut name: String = raw
        .to_lowercase()
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() || c == '-' { c } else { '-' })
        // A label must start with an alphanumeric. Drop leading hyphens before
        // truncating so they do not eat into the 63-character budget.
        .skip_while(|&c| c == '-')
        .take(MAX_LABEL_LEN)
        .collect();

    // A label must also end with an alphanumeric; truncate in place rather
    // than allocating a second string.
    let trimmed_len = name.trim_end_matches('-').len();
    name.truncate(trimmed_len);
    name
}
+ match api.get(name).await { + Ok(_) => return Ok(()), + Err(kube::Error::Api(err)) if err.code == 404 => {} + Err(e) => { + return Err(WfeError::StepExecution(format!( + "failed to check namespace '{name}': {e}" + ))); + } + } + + let mut labels = BTreeMap::new(); + labels.insert(LABEL_WORKFLOW_ID.into(), workflow_id.to_string()); + labels.insert(LABEL_MANAGED_BY.into(), MANAGED_BY_VALUE.into()); + + let ns = Namespace { + metadata: ObjectMeta { + name: Some(name.to_string()), + labels: Some(labels), + ..Default::default() + }, + ..Default::default() + }; + + api.create(&PostParams::default(), &ns) + .await + .map_err(|e| WfeError::StepExecution(format!("failed to create namespace '{name}': {e}")))?; + + Ok(()) +} + +/// Delete a namespace and all resources within it. +pub async fn delete_namespace(client: &Client, name: &str) -> Result<(), WfeError> { + let api: Api = Api::all(client.clone()); + api.delete(name, &Default::default()) + .await + .map_err(|e| WfeError::StepExecution(format!("failed to delete namespace '{name}': {e}")))?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn namespace_name_simple() { + assert_eq!(namespace_name("wfe-", "abc123"), "wfe-abc123"); + } + + #[test] + fn namespace_name_truncates_to_63() { + let long_id = "a".repeat(100); + let name = namespace_name("wfe-", &long_id); + assert!(name.len() <= 63); + } + + #[test] + fn namespace_name_sanitizes_special_chars() { + assert_eq!( + namespace_name("wfe-", "my_workflow.v1"), + "wfe-my-workflow-v1" + ); + } + + #[test] + fn namespace_name_lowercases() { + assert_eq!(namespace_name("WFE-", "MyWorkflow"), "wfe-myworkflow"); + } + + #[test] + fn namespace_name_trims_trailing_hyphens() { + let id = "a".repeat(59) + "____"; + let name = namespace_name("wfe-", &id); + assert!(!name.ends_with('-')); + } + + #[test] + fn namespace_name_empty_id() { + // Trailing hyphen is trimmed. + assert_eq!(namespace_name("wfe-", ""), "wfe"); + } +}