feat(wfe-kubernetes): scaffold crate with config, client, and namespace modules

This commit is contained in:
2026-04-06 16:41:48 +01:00
parent 2c679229db
commit 1574342e92
4 changed files with 346 additions and 0 deletions

27
wfe-kubernetes/Cargo.toml Normal file
View File

@@ -0,0 +1,27 @@
# Manifest for the `wfe-kubernetes` crate.
# Package metadata marked `.workspace = true` is inherited from the workspace root.
[package]
name = "wfe-kubernetes"
version.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
description = "Kubernetes executor for WFE — runs workflow steps as K8s Jobs"
# Runtime dependencies; versions and features are taken from the workspace manifest.
[dependencies]
wfe-core = { workspace = true }
kube = { workspace = true }
k8s-openapi = { workspace = true }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
# Dependencies used only when building tests.
[dev-dependencies]
pretty_assertions = { workspace = true }
tokio = { workspace = true, features = ["test-util"] }
# NOTE(review): the only dependency with a version pinned here instead of
# inherited from the workspace — confirm this is intentional.
tokio-util = "0.7"
wfe-core = { workspace = true, features = ["test-support"] }

View File

@@ -0,0 +1,57 @@
use kube::Client;
use wfe_core::WfeError;
use crate::config::ClusterConfig;
/// Create a Kubernetes API client from the cluster configuration.
///
/// Resolution order:
/// 1. If `config.kubeconfig` is set, load exactly that file. There is no
///    fallback when the file cannot be read or turned into a configuration.
/// 2. Otherwise defer to [`kube::Config::infer`], which tries the local
///    kubeconfig (`$KUBECONFIG` or `~/.kube/config`) first and falls back to
///    the in-cluster environment only if that fails.
///
/// # Errors
///
/// Returns [`WfeError::StepExecution`] when the explicit kubeconfig cannot be
/// read, when a configuration cannot be built or inferred, or when the client
/// cannot be constructed from the resulting configuration.
pub async fn create_client(config: &ClusterConfig) -> Result<Client, WfeError> {
    let kube_config = match &config.kubeconfig {
        Some(path) => {
            let kubeconfig = kube::config::Kubeconfig::read_from(path).map_err(|e| {
                WfeError::StepExecution(format!("failed to read kubeconfig at '{path}': {e}"))
            })?;
            kube::Config::from_custom_kubeconfig(
                kubeconfig,
                &kube::config::KubeConfigOptions::default(),
            )
            .await
            .map_err(|e| {
                WfeError::StepExecution(format!(
                    "failed to build config from kubeconfig '{path}': {e}"
                ))
            })?
        }
        // No explicit path: let kube-rs discover the configuration itself.
        None => kube::Config::infer()
            .await
            .map_err(|e| WfeError::StepExecution(format!("failed to infer K8s config: {e}")))?,
    };

    Client::try_from(kube_config)
        .map_err(|e| WfeError::StepExecution(format!("failed to create K8s client: {e}")))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pointing at a kubeconfig file that does not exist must fail, and the
    /// error text must mention the kubeconfig.
    #[test]
    fn create_client_with_bad_kubeconfig_path() {
        let config = ClusterConfig {
            kubeconfig: Some("/nonexistent/kubeconfig".into()),
            ..Default::default()
        };

        // A single-threaded runtime is enough to drive the one future under test.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        let message = match runtime.block_on(create_client(&config)) {
            Ok(_) => panic!("expected error for nonexistent kubeconfig"),
            Err(e) => e.to_string(),
        };

        assert!(message.contains("kubeconfig"));
    }
}

View File

@@ -0,0 +1,146 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
/// Cluster-level configuration shared across all Kubernetes steps.
///
/// Every field has a serde default, so any field may be omitted from the
/// serialized form; omitted fields take the value documented on the field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
/// Path to a kubeconfig file. When `None`, the client configuration is
/// inferred from the environment (default kubeconfig or in-cluster config).
#[serde(default)]
pub kubeconfig: Option<String>,
/// Namespace prefix for auto-generated namespaces. Default: "wfe-".
#[serde(default = "default_namespace_prefix")]
pub namespace_prefix: String,
/// ServiceAccount name for Job pods. Defaults to `None`.
#[serde(default)]
pub service_account: Option<String>,
/// Image pull secret names. Defaults to an empty list.
#[serde(default)]
pub image_pull_secrets: Vec<String>,
/// Node selector labels for Job pods. Defaults to an empty map.
#[serde(default)]
pub node_selector: HashMap<String, String>,
}
impl Default for ClusterConfig {
fn default() -> Self {
Self {
kubeconfig: None,
namespace_prefix: default_namespace_prefix(),
service_account: None,
image_pull_secrets: Vec::new(),
node_selector: HashMap::new(),
}
}
}
/// Prefix applied to auto-generated namespace names when none is configured.
fn default_namespace_prefix() -> String {
    String::from("wfe-")
}
/// Per-step configuration for a Kubernetes Job execution.
///
/// Only `image` is required when deserializing; every other field has a serde
/// default and may be omitted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KubernetesStepConfig {
/// Container image to run.
pub image: String,
/// Override entrypoint.
#[serde(default)]
pub command: Option<Vec<String>>,
/// Shorthand: runs via `/bin/sh -c "..."`. Mutually exclusive with `command`.
///
/// NOTE(review): the exclusivity is not enforced by this type — both fields
/// deserialize independently — so it must be validated elsewhere.
#[serde(default)]
pub run: Option<String>,
/// Environment variables injected into the container. Defaults to empty.
#[serde(default)]
pub env: HashMap<String, String>,
/// Working directory inside the container.
#[serde(default)]
pub working_dir: Option<String>,
/// Memory limit (e.g., "512Mi", "1Gi"). Stored as an unparsed string.
#[serde(default)]
pub memory: Option<String>,
/// CPU limit (e.g., "500m", "1"). Stored as an unparsed string.
#[serde(default)]
pub cpu: Option<String>,
/// Execution timeout in milliseconds.
#[serde(default)]
pub timeout_ms: Option<u64>,
/// Image pull policy: Always, IfNotPresent, Never.
///
/// Stored as a free-form string; the value is not validated by this type.
#[serde(default)]
pub pull_policy: Option<String>,
/// Override the auto-generated namespace.
#[serde(default)]
pub namespace: Option<String>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// `Default` yields the documented prefix and leaves everything else empty.
    #[test]
    fn cluster_config_defaults() {
        let config = ClusterConfig::default();
        assert_eq!(config.namespace_prefix, "wfe-");
        assert!(config.kubeconfig.is_none());
        assert!(config.service_account.is_none());
        assert!(config.image_pull_secrets.is_empty());
        assert!(config.node_selector.is_empty());
    }

    /// An empty document exercises every serde default, including the
    /// function-backed default for `namespace_prefix`.
    #[test]
    fn cluster_config_empty_json_uses_defaults() {
        let config: ClusterConfig = serde_json::from_str("{}").unwrap();
        assert_eq!(config.namespace_prefix, "wfe-");
        assert!(config.kubeconfig.is_none());
        assert!(config.service_account.is_none());
        assert!(config.image_pull_secrets.is_empty());
        assert!(config.node_selector.is_empty());
    }

    /// Every field survives a JSON round trip, not just a subset.
    #[test]
    fn cluster_config_serde_round_trip() {
        let config = ClusterConfig {
            kubeconfig: Some("/home/user/.kube/config".into()),
            namespace_prefix: "test-".into(),
            service_account: Some("wfe-runner".into()),
            image_pull_secrets: vec!["ghcr-secret".into()],
            node_selector: [("tier".into(), "compute".into())].into(),
        };
        let json = serde_json::to_string(&config).unwrap();
        let parsed: ClusterConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.kubeconfig, Some("/home/user/.kube/config".into()));
        assert_eq!(parsed.namespace_prefix, "test-");
        assert_eq!(parsed.service_account, Some("wfe-runner".into()));
        assert_eq!(parsed.image_pull_secrets, vec!["ghcr-secret"]);
        assert_eq!(parsed.node_selector.len(), 1);
        assert_eq!(
            parsed.node_selector.get("tier"),
            Some(&"compute".to_string())
        );
    }

    /// Only `image` is required; all optional fields fall back to empty values.
    #[test]
    fn step_config_minimal() {
        let json = r#"{"image": "alpine:3.18"}"#;
        let config: KubernetesStepConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.image, "alpine:3.18");
        assert!(config.command.is_none());
        assert!(config.run.is_none());
        assert!(config.env.is_empty());
        assert!(config.working_dir.is_none());
        assert!(config.memory.is_none());
        assert!(config.cpu.is_none());
        assert!(config.timeout_ms.is_none());
        assert!(config.pull_policy.is_none());
        assert!(config.namespace.is_none());
    }

    /// Every populated field survives a JSON round trip, and unset fields stay unset.
    #[test]
    fn step_config_full_serde_round_trip() {
        let config = KubernetesStepConfig {
            image: "node:20-alpine".into(),
            command: None,
            run: Some("npm test".into()),
            env: [("NODE_ENV".into(), "test".into())].into(),
            working_dir: Some("/app".into()),
            memory: Some("512Mi".into()),
            cpu: Some("500m".into()),
            timeout_ms: Some(300_000),
            pull_policy: Some("IfNotPresent".into()),
            namespace: None,
        };
        let json = serde_json::to_string(&config).unwrap();
        let parsed: KubernetesStepConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.image, "node:20-alpine");
        assert!(parsed.command.is_none());
        assert_eq!(parsed.run, Some("npm test".into()));
        assert_eq!(parsed.env.get("NODE_ENV"), Some(&"test".to_string()));
        assert_eq!(parsed.working_dir, Some("/app".into()));
        assert_eq!(parsed.memory, Some("512Mi".into()));
        assert_eq!(parsed.cpu, Some("500m".into()));
        assert_eq!(parsed.timeout_ms, Some(300_000));
        assert_eq!(parsed.pull_policy, Some("IfNotPresent".into()));
        assert!(parsed.namespace.is_none());
    }

    /// An explicit `command` array is parsed verbatim and leaves `run` unset.
    #[test]
    fn step_config_with_command() {
        let json = r#"{"image": "gcc:latest", "command": ["make", "build"]}"#;
        let config: KubernetesStepConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.command, Some(vec!["make".into(), "build".into()]));
        assert!(config.run.is_none());
    }
}

View File

@@ -0,0 +1,116 @@
use std::collections::BTreeMap;
use k8s_openapi::api::core::v1::Namespace;
use kube::api::{ObjectMeta, PostParams};
use kube::{Api, Client};
use wfe_core::WfeError;
/// Label key under which the owning workflow's ID is recorded on created namespaces.
const LABEL_WORKFLOW_ID: &str = "wfe.sunbeam.pt/workflow-id";
/// Label key marking a namespace as created by this crate.
const LABEL_MANAGED_BY: &str = "wfe.sunbeam.pt/managed-by";
/// Value stored under [`LABEL_MANAGED_BY`].
const MANAGED_BY_VALUE: &str = "wfe-kubernetes";
/// Generate a namespace name from `prefix` followed by `workflow_id`.
///
/// The concatenation is lowercased, every character that is not an ASCII
/// letter or digit is replaced with `-`, leading hyphens are dropped, the
/// result is cut to at most 63 characters, and trailing hyphens are trimmed.
/// Kubernetes requires namespace names to start and end with an alphanumeric
/// character, so both ends are cleaned up.
///
/// The result is empty when the inputs contain no ASCII alphanumeric
/// characters; callers must treat that as an invalid name. Distinct inputs can
/// map to the same name, either through sanitization or through truncation.
pub fn namespace_name(prefix: &str, workflow_id: &str) -> String {
    // K8s namespace names are RFC 1123 labels: at most 63 characters.
    const MAX_LEN: usize = 63;

    let raw = format!("{prefix}{workflow_id}");
    let mut name: String = raw
        .to_lowercase()
        .chars()
        .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
        // Strip leading hyphens before truncating so they do not eat into the
        // 63-character budget.
        .skip_while(|&c| c == '-')
        .take(MAX_LEN)
        .collect();

    // Truncation may have exposed a hyphen at the end; drop any such run.
    let kept = name.trim_end_matches('-').len();
    name.truncate(kept);
    name
}
/// Create a namespace if it doesn't already exist.
pub async fn ensure_namespace(
client: &Client,
name: &str,
workflow_id: &str,
) -> Result<(), WfeError> {
let api: Api<Namespace> = Api::all(client.clone());
// Check if it already exists.
match api.get(name).await {
Ok(_) => return Ok(()),
Err(kube::Error::Api(err)) if err.code == 404 => {}
Err(e) => {
return Err(WfeError::StepExecution(format!(
"failed to check namespace '{name}': {e}"
)));
}
}
let mut labels = BTreeMap::new();
labels.insert(LABEL_WORKFLOW_ID.into(), workflow_id.to_string());
labels.insert(LABEL_MANAGED_BY.into(), MANAGED_BY_VALUE.into());
let ns = Namespace {
metadata: ObjectMeta {
name: Some(name.to_string()),
labels: Some(labels),
..Default::default()
},
..Default::default()
};
api.create(&PostParams::default(), &ns)
.await
.map_err(|e| WfeError::StepExecution(format!("failed to create namespace '{name}': {e}")))?;
Ok(())
}
/// Delete a namespace and all resources within it.
pub async fn delete_namespace(client: &Client, name: &str) -> Result<(), WfeError> {
let api: Api<Namespace> = Api::all(client.clone());
api.delete(name, &Default::default())
.await
.map_err(|e| WfeError::StepExecution(format!("failed to delete namespace '{name}': {e}")))?;
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// Already-valid input passes through unchanged.
    #[test]
    fn namespace_name_simple() {
        assert_eq!(namespace_name("wfe-", "abc123"), "wfe-abc123");
    }

    /// Over-long input is cut to exactly the 63-character limit.
    #[test]
    fn namespace_name_truncates_to_63() {
        let long_id = "a".repeat(100);
        let name = namespace_name("wfe-", &long_id);
        assert_eq!(name.len(), 63);
        assert!(name.starts_with("wfe-"));
    }

    /// Characters outside `[a-z0-9-]` are replaced with hyphens.
    #[test]
    fn namespace_name_sanitizes_special_chars() {
        assert_eq!(
            namespace_name("wfe-", "my_workflow.v1"),
            "wfe-my-workflow-v1"
        );
    }

    /// Both the prefix and the workflow ID are lowercased.
    #[test]
    fn namespace_name_lowercases() {
        assert_eq!(namespace_name("WFE-", "MyWorkflow"), "wfe-myworkflow");
    }

    /// Hyphens produced by sanitization at the end of the name are removed.
    #[test]
    fn namespace_name_trims_trailing_hyphens() {
        // Keep the total below 63 characters so the underscores survive
        // truncation and the trim itself is what removes them.
        let id = "a".repeat(50) + "____";
        let name = namespace_name("wfe-", &id);
        assert!(!name.ends_with('-'));
        assert_eq!(name, format!("wfe-{}", "a".repeat(50)));
    }

    /// Truncation that lands on a hyphen must not leave it at the end.
    #[test]
    fn namespace_name_trims_hyphen_exposed_by_truncation() {
        // "wfe-" + 58 letters + '_' is exactly 63 characters, ending in a hyphen.
        let id = "a".repeat(58) + "_tail";
        let name = namespace_name("wfe-", &id);
        assert_eq!(name, format!("wfe-{}", "a".repeat(58)));
    }

    /// With an empty workflow ID the prefix's own trailing hyphen is trimmed.
    #[test]
    fn namespace_name_empty_id() {
        assert_eq!(namespace_name("wfe-", ""), "wfe");
    }
}