diff --git a/wfe-kubernetes/tests/integration.rs b/wfe-kubernetes/tests/integration.rs
index d739d84..e01c407 100644
--- a/wfe-kubernetes/tests/integration.rs
+++ b/wfe-kubernetes/tests/integration.rs
@@ -1,10 +1,13 @@
 use std::collections::HashMap;
+use wfe_core::models::service::{ReadinessCheck, ReadinessProbe, ServiceDefinition, ServicePort};
 use wfe_core::traits::step::StepBody;
+use wfe_core::traits::ServiceProvider;
 use wfe_kubernetes::config::{ClusterConfig, KubernetesStepConfig};
 use wfe_kubernetes::namespace;
 use wfe_kubernetes::cleanup;
 use wfe_kubernetes::client;
+use wfe_kubernetes::KubernetesServiceProvider;
 
 /// Path to the Lima sunbeam VM kubeconfig.
 fn kubeconfig_path() -> String {
@@ -20,6 +23,16 @@ fn cluster_config() -> ClusterConfig {
     }
 }
 
+/// Generate a unique workflow ID to avoid namespace collisions between test runs.
+fn unique_id(prefix: &str) -> String {
+    use std::time::{SystemTime, UNIX_EPOCH};
+    let ts = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap()
+        .as_millis();
+    format!("{prefix}-{ts}")
+}
+
 fn step_config(image: &str, run: &str) -> KubernetesStepConfig {
     KubernetesStepConfig {
         image: image.into(),
@@ -66,13 +79,13 @@ async fn run_echo_job() {
     let config = cluster_config();
     let k8s_client = client::create_client(&config).await.unwrap();
 
-    let step_cfg = step_config("alpine:3.18", "echo 'hello from k8s'");
+    let ns = unique_id("echo");
+    let mut step_cfg = step_config("alpine:3.18", "echo 'hello from k8s'");
+    step_cfg.namespace = Some(ns.clone());
 
     let mut step =
         wfe_kubernetes::KubernetesStep::new(step_cfg, config.clone(), k8s_client.clone());
 
-    let ns = namespace::namespace_name(&config.namespace_prefix, "echo-test");
-
     // Build a minimal StepExecutionContext.
     let instance = wfe_core::models::WorkflowInstance::new("echo-wf", 1, serde_json::json!({}));
     let mut ws = wfe_core::models::WorkflowStep::new(0, "alpine-echo");
@@ -109,16 +122,16 @@ async fn run_job_with_wfe_output() {
     let config = cluster_config();
     let k8s_client = client::create_client(&config).await.unwrap();
 
-    let step_cfg = step_config(
+    let ns = unique_id("output");
+    let mut step_cfg = step_config(
         "alpine:3.18",
         r#"echo '##wfe[output version=1.2.3]' && echo '##wfe[output status=ok]'"#,
     );
+    step_cfg.namespace = Some(ns.clone());
 
     let mut step =
         wfe_kubernetes::KubernetesStep::new(step_cfg, config.clone(), k8s_client.clone());
 
-    let ns = namespace::namespace_name(&config.namespace_prefix, "output-test");
-
     let instance = wfe_core::models::WorkflowInstance::new("output-wf", 1, serde_json::json!({}));
     let mut ws = wfe_core::models::WorkflowStep::new(0, "alpine-output");
@@ -151,14 +164,14 @@ async fn run_job_with_env_vars() {
     let config = cluster_config();
     let k8s_client = client::create_client(&config).await.unwrap();
 
+    let ns = unique_id("env");
     let mut step_cfg = step_config("alpine:3.18", "echo \"##wfe[output greeting=$GREETING]\"");
     step_cfg.env.insert("GREETING".into(), "hello".into());
+    step_cfg.namespace = Some(ns.clone());
 
     let mut step =
         wfe_kubernetes::KubernetesStep::new(step_cfg, config.clone(), k8s_client.clone());
 
-    let ns = namespace::namespace_name(&config.namespace_prefix, "env-test");
-
     let instance = wfe_core::models::WorkflowInstance::new(
         "env-wf",
         1,
@@ -193,13 +206,13 @@ async fn run_job_nonzero_exit_fails() {
     let config = cluster_config();
     let k8s_client = client::create_client(&config).await.unwrap();
 
-    let step_cfg = step_config("alpine:3.18", "exit 1");
+    let ns = unique_id("fail");
+    let mut step_cfg = step_config("alpine:3.18", "exit 1");
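+    // A unique per-test namespace keeps repeated runs from colliding.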
+    step_cfg.namespace = Some(ns.clone());
 
     let mut step =
         wfe_kubernetes::KubernetesStep::new(step_cfg, config.clone(), k8s_client.clone());
 
-    let ns = namespace::namespace_name(&config.namespace_prefix, "fail-test");
-
     let instance = wfe_core::models::WorkflowInstance::new("fail-wf", 1, serde_json::json!({}));
     let mut ws = wfe_core::models::WorkflowStep::new(0, "alpine-fail");
     ws.name = Some("fail-step".into());
@@ -229,14 +242,14 @@ async fn run_job_with_timeout() {
     let config = cluster_config();
     let k8s_client = client::create_client(&config).await.unwrap();
 
+    let ns = unique_id("timeout");
     let mut step_cfg = step_config("alpine:3.18", "sleep 60");
     step_cfg.timeout_ms = Some(5_000); // 5 second timeout
+    step_cfg.namespace = Some(ns.clone());
 
     let mut step =
         wfe_kubernetes::KubernetesStep::new(step_cfg, config.clone(), k8s_client.clone());
 
-    let ns = namespace::namespace_name(&config.namespace_prefix, "timeout-test");
-
     let instance = wfe_core::models::WorkflowInstance::new("timeout-wf", 1, serde_json::json!({}));
     let mut ws = wfe_core::models::WorkflowStep::new(0, "alpine-timeout");
@@ -298,3 +311,231 @@ async fn delete_nonexistent_job_is_ok() {
         .await
         .unwrap();
 }
+
+// ── Service Provider ─────────────────────────────────────────────────
+
+fn nginx_service() -> ServiceDefinition {
+    ServiceDefinition {
+        name: "nginx".into(),
+        image: "nginx:alpine".into(),
+        ports: vec![ServicePort::tcp(80)],
+        env: HashMap::new(),
+        readiness: Some(ReadinessProbe {
+            check: ReadinessCheck::TcpSocket(80),
+            interval_ms: 2000,
+            timeout_ms: 60000,
+            retries: 30,
+        }),
+        command: vec![],
+        args: vec![],
+        memory: Some("64Mi".into()),
+        cpu: Some("100m".into()),
+    }
+}
+
+fn redis_service() -> ServiceDefinition {
+    ServiceDefinition {
+        name: "redis".into(),
+        image: "redis:7-alpine".into(),
+        ports: vec![ServicePort::tcp(6379)],
+        env: HashMap::new(),
+        readiness: Some(ReadinessProbe {
+            check: ReadinessCheck::TcpSocket(6379),
+            interval_ms: 2000,
+            timeout_ms: 60000,
+            retries: 30,
+        }),
+        command: vec![],
+        args: vec![],
+        memory: Some("64Mi".into()),
+        cpu: None,
+    }
+}
+
+#[tokio::test]
+async fn service_provider_can_provision() {
+    let config = cluster_config();
+    let k8s_client = client::create_client(&config).await.unwrap();
+    let provider = KubernetesServiceProvider::new(k8s_client, config);
+
+    assert!(provider.can_provision(&[nginx_service()]));
+    assert!(provider.can_provision(&[]));
+    assert!(provider.can_provision(&[nginx_service(), redis_service()]));
+}
+
+#[tokio::test]
+async fn service_provider_provision_single_service() {
+    let config = cluster_config();
+    let k8s_client = client::create_client(&config).await.unwrap();
+    let provider = KubernetesServiceProvider::new(k8s_client, config);
+
+    let workflow_id = &unique_id("svc-single");
+    let services = vec![nginx_service()];
+
+    let endpoints = provider.provision(workflow_id, &services).await.unwrap();
+
+    assert_eq!(endpoints.len(), 1);
+    assert_eq!(endpoints[0].name, "nginx");
+    assert_eq!(endpoints[0].host, "nginx"); // K8s DNS name
+    assert_eq!(endpoints[0].ports.len(), 1);
+    assert_eq!(endpoints[0].ports[0].container_port, 80);
+
+    // Teardown.
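+    // (This deletes the per-workflow namespace, taking the pod and Service with it.)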
+    provider.teardown(workflow_id).await.unwrap();
+}
+
+#[tokio::test]
+async fn service_provider_provision_multiple_services() {
+    let config = cluster_config();
+    let k8s_client = client::create_client(&config).await.unwrap();
+    let provider = KubernetesServiceProvider::new(k8s_client, config);
+
+    let workflow_id = &unique_id("svc-multi");
+    let services = vec![nginx_service(), redis_service()];
+
+    let endpoints = provider.provision(workflow_id, &services).await.unwrap();
+
+    assert_eq!(endpoints.len(), 2);
+    let names: Vec<&str> = endpoints.iter().map(|e| e.name.as_str()).collect();
+    assert!(names.contains(&"nginx"));
+    assert!(names.contains(&"redis"));
+
+    provider.teardown(workflow_id).await.unwrap();
+}
+
+#[tokio::test]
+async fn service_provider_teardown_is_idempotent() {
+    let config = cluster_config();
+    let k8s_client = client::create_client(&config).await.unwrap();
+    let provider = KubernetesServiceProvider::new(k8s_client, config);
+
+    let workflow_id = &unique_id("svc-teardown");
+    let services = vec![nginx_service()];
+
+    provider.provision(workflow_id, &services).await.unwrap();
+    provider.teardown(workflow_id).await.unwrap();
+
+    // Give K8s a moment to process the first deletion.
+    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
+    // A second teardown may error (the namespace is already deleting or gone),
+    // but it must not panic.
+    let _ = provider.teardown(workflow_id).await;
+}
+
+#[tokio::test]
+async fn service_provider_provision_bad_image_fails() {
+    let config = cluster_config();
+    let k8s_client = client::create_client(&config).await.unwrap();
+    let provider = KubernetesServiceProvider::new(k8s_client, config);
+
+    let workflow_id = &unique_id("svc-badimg");
+    let services = vec![ServiceDefinition {
+        name: "bad".into(),
+        image: "nonexistent-registry.example.com/no-such-image:latest".into(),
+        ports: vec![ServicePort::tcp(9999)],
+        env: HashMap::new(),
+        readiness: Some(ReadinessProbe {
+            check: ReadinessCheck::TcpSocket(9999),
+            interval_ms: 1000,
+            timeout_ms: 8000, // Short timeout so the test doesn't hang.
+            retries: 4,
+        }),
+        command: vec![],
+        args: vec![],
+        memory: None,
+        cpu: None,
+    }];
+
+    // The pod will be created but never becomes ready (the image pull fails).
+    let result = provider.provision(workflow_id, &services).await;
+    assert!(result.is_err());
+    let err = format!("{}", result.unwrap_err());
+    assert!(
+        err.contains("timed out") || err.contains("failed"),
+        "expected timeout or failure error, got: {err}"
+    );
+
+    // Cleanup.
+    provider.teardown(workflow_id).await.ok();
+}
+
+#[tokio::test]
+async fn service_provider_provision_duplicate_name_fails() {
+    let config = cluster_config();
+    let k8s_client = client::create_client(&config).await.unwrap();
+    let provider = KubernetesServiceProvider::new(k8s_client, config);
+
+    let workflow_id = &unique_id("svc-dup");
+    let svc = ServiceDefinition {
+        name: "dupname".into(),
+        image: "nginx:alpine".into(),
+        ports: vec![ServicePort::tcp(80)],
+        env: HashMap::new(),
+        readiness: Some(ReadinessProbe {
+            check: ReadinessCheck::TcpSocket(80),
+            interval_ms: 2000,
+            timeout_ms: 30000,
+            retries: 15,
+        }),
+        command: vec![],
+        args: vec![],
+        memory: None,
+        cpu: None,
+    };
+
+    // First provision succeeds.
+    let endpoints = provider.provision(workflow_id, &[svc.clone()]).await.unwrap();
+    assert_eq!(endpoints.len(), 1);
+
+    // Second provision with the same name should fail (pod already exists).
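+    // The API server's AlreadyExists error should surface as a "failed to create" error.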
+    let result = provider.provision(workflow_id, &[svc]).await;
+    assert!(result.is_err());
+    let err = format!("{}", result.unwrap_err());
+    assert!(err.contains("failed to create"), "unexpected error: {err}");
+
+    provider.teardown(workflow_id).await.ok();
+}
+
+#[tokio::test]
+async fn service_provider_provision_service_object_conflict() {
+    // Pre-create a K8s Service to cause a conflict when the provider tries to create one.
+    let config = cluster_config();
+    let k8s_client = client::create_client(&config).await.unwrap();
+    let provider = KubernetesServiceProvider::new(k8s_client.clone(), config.clone());
+
+    let workflow_id = &unique_id("svc-conflict");
+    let ns = namespace::namespace_name(&config.namespace_prefix, workflow_id);
+    namespace::ensure_namespace(&k8s_client, &ns, workflow_id).await.unwrap();
+
+    // Pre-create just the K8s Service (not the pod).
+    let svc_def = nginx_service();
+    let svc_manifest = wfe_kubernetes::service_manifests::build_k8s_service(&svc_def, &ns);
+    let svcs: kube::Api<k8s_openapi::api::core::v1::Service> =
+        kube::Api::namespaced(k8s_client.clone(), &ns);
+    svcs.create(&kube::api::PostParams::default(), &svc_manifest)
+        .await
+        .unwrap();
+
+    // Now provision: the pod creates fine, but the Service will conflict.
+    let result = provider.provision(workflow_id, &[svc_def]).await;
+    assert!(result.is_err());
+    let err = format!("{}", result.unwrap_err());
+    assert!(
+        err.contains("failed to create k8s service"),
+        "unexpected error: {err}"
+    );
+
+    provider.teardown(workflow_id).await.ok();
+}
+
+#[tokio::test]
+async fn service_provider_teardown_without_provision() {
+    let config = cluster_config();
+    let k8s_client = client::create_client(&config).await.unwrap();
+    let provider = KubernetesServiceProvider::new(k8s_client, config);
+
+    // Tearing down a workflow that was never provisioned should error gracefully.
+    let result = provider.teardown("never-provisioned").await;
+    // The namespace doesn't exist, so delete_namespace returns an error.
+    assert!(result.is_err());
+}