diff --git a/src/service_cmds.rs b/src/service_cmds.rs new file mode 100644 index 00000000..73777570 --- /dev/null +++ b/src/service_cmds.rs @@ -0,0 +1,201 @@ +//! Service-oriented CLI commands: deploy, secrets, shell. +//! +//! These commands use the service registry for name resolution. + +use crate::cli::SecretsAction; +use crate::error::{Result, SunbeamError}; +use crate::output::{ok, step, warn}; +use sunbeam_sdk::registry::{self, ServiceRegistry}; + +/// Discover the service registry from the cluster. +async fn get_registry() -> Result { + let client = crate::kube::get_client().await?; + registry::discover(client).await + .map_err(|e| SunbeamError::Other(format!("service discovery failed: {e}"))) +} + +/// Deploy service(s) by name, category, or namespace. +/// +/// Applies manifests for each unique namespace, then rollout-restarts the +/// specific deployments that belong to the resolved services. +pub async fn cmd_deploy(target: &str, domain: &str, email: &str) -> Result<()> { + let reg = get_registry().await?; + let resolved = reg.resolve(target); + if resolved.is_empty() { + bail!("Unknown service: '{target}'. Try 'sunbeam deploy --all' or a service name like 'hydra'."); + } + + // Get unique namespaces + let mut namespaces: Vec<&str> = resolved.iter().map(|s| s.namespace.as_str()).collect(); + namespaces.sort_unstable(); + namespaces.dedup(); + + // Apply manifests for each namespace + let is_production = !crate::config::active_context().ssh_host.is_empty(); + let env_str = if is_production { "production" } else { "local" }; + for ns in &namespaces { + step(&format!("Applying manifests for {ns}...")); + crate::manifests::cmd_apply(env_str, domain, email, ns).await?; + } + + // Rollout restart the specific deployments + for svc in &resolved { + for deploy in &svc.deployments { + step(&format!("Restarting {}/{}...", svc.namespace, deploy)); + crate::kube::kube_rollout_restart(&svc.namespace, deploy).await?; + } + } + + ok("Deploy complete."); + Ok(()) +} + +/// View or get secrets for a service from OpenBao. +pub async fn cmd_secrets(service: &str, action: Option) -> Result<()> { + let reg = get_registry().await?; + let svc = reg.get(service) + .ok_or_else(|| SunbeamError::Other(format!("Unknown service: '{service}'")))?; + + let kv_path = svc + .kv_path + .as_deref() + .ok_or_else(|| SunbeamError::Other(format!("Service '{service}' has no secrets in OpenBao")))?; + + // Port-forward to OpenBao and read the secret + let ob_pod = crate::kube::find_pod_by_label( + "data", + "app.kubernetes.io/name=openbao,component=server", + ) + .await + .ok_or_else(|| SunbeamError::Other("OpenBao pod not found".into()))?; + + let pf = crate::secrets::port_forward("data", &ob_pod, 8200).await?; + let bao_url = format!("http://127.0.0.1:{}", pf.local_port); + + // Get root token from k8s secret + let token = crate::kube::kube_get_secret_field("data", "openbao-keys", "root-token") + .await + .map_err(|_| SunbeamError::Other("Failed to get OpenBao root token".into()))?; + + let bao = crate::openbao::BaoClient::with_token(&bao_url, &token); + + match action { + None => { + // List all fields for the service + match bao.kv_get("secret", kv_path).await? { + Some(data) => { + step(&format!("Secrets for {service} (secret/{kv_path}):")); + let mut keys: Vec<&String> = data.keys().collect(); + keys.sort(); + for key in keys { + let value = &data[key]; + // Mask values longer than 8 chars for security + let display = if value.len() > 8 { + format!("{}...{}", &value[..4], &value[value.len() - 4..]) + } else { + value.clone() + }; + println!(" {key}: {display}"); + } + } + None => { + warn(&format!("No secrets found at secret/{kv_path}")); + } + } + } + Some(SecretsAction::Get { key }) => { + let value = bao.kv_get_field("secret", kv_path, &key).await?; + if value.is_empty() { + warn(&format!("Field '{key}' not found in secret/{kv_path}")); + } else { + println!("{value}"); + } + } + } + + Ok(()) +} + +/// Interactive shell into a service pod. +/// +/// Special-cases postgres for psql; everything else gets `/bin/sh`. +pub async fn cmd_shell(service: &str) -> Result<()> { + let reg = get_registry().await?; + let svc = reg.get(service) + .ok_or_else(|| SunbeamError::Other(format!("Unknown service: '{service}'")))?; + + let context = crate::kube::context(); + + match service { + "postgres" => { + step("Connecting to PostgreSQL primary..."); + let pod = crate::kube::find_pod_by_label( + "data", + "cnpg.io/cluster=postgres,role=primary", + ) + .await + .ok_or_else(|| SunbeamError::Other("PostgreSQL primary pod not found".into()))?; + + let status = tokio::process::Command::new("kubectl") + .args([ + &format!("--context={context}"), + "exec", + "-it", + "-n", + "data", + &pod, + "--", + "psql", + "-U", + "postgres", + ]) + .stdin(std::process::Stdio::inherit()) + .stdout(std::process::Stdio::inherit()) + .stderr(std::process::Stdio::inherit()) + .status() + .await + .map_err(|e| SunbeamError::Other(format!("Failed to exec into pod: {e}")))?; + + if !status.success() { + warn("psql session ended with non-zero exit code"); + } + Ok(()) + } + _ => { + if svc.deployments.is_empty() { + bail!("Service '{service}' has no deployments"); + } + let deploy = &svc.deployments[0]; + let pod = crate::kube::find_pod_by_label( + &svc.namespace, + &format!("app={deploy}"), + ) + .await + .ok_or_else(|| SunbeamError::Other(format!("No pod found for {service}")))?; + + step(&format!("Connecting to {service} ({pod})...")); + let status = tokio::process::Command::new("kubectl") + .args([ + &format!("--context={context}"), + "exec", + "-it", + "-n", + &svc.namespace, + &pod, + "--", + "/bin/sh", + ]) + .stdin(std::process::Stdio::inherit()) + .stdout(std::process::Stdio::inherit()) + .stderr(std::process::Stdio::inherit()) + .status() + .await + .map_err(|e| SunbeamError::Other(format!("Failed to exec into pod: {e}")))?; + + if !status.success() { + warn("Shell session ended with non-zero exit code"); + } + Ok(()) + } + } +} diff --git a/src/services.rs b/src/services.rs index 1f691090..53c68460 100644 --- a/src/services.rs +++ b/src/services.rs @@ -4,27 +4,20 @@ use crate::error::{Result, SunbeamError}; use k8s_openapi::api::core::v1::Pod; use kube::api::{Api, DynamicObject, ListParams, LogParams}; use kube::ResourceExt; -use std::collections::BTreeMap; -use crate::constants::MANAGED_NS; use crate::kube::{get_client, kube_rollout_restart, parse_target}; use crate::output::{ok, step, warn}; +use sunbeam_sdk::registry::{self, Category, ServiceRegistry}; -/// Services that can be rollout-restarted, as (namespace, deployment) pairs. -pub const SERVICES_TO_RESTART: &[(&str, &str)] = &[ - ("ory", "hydra"), - ("ory", "kratos"), - ("ory", "login-ui"), - ("devtools", "gitea"), - ("storage", "seaweedfs-filer"), - ("lasuite", "hive"), - ("lasuite", "people-backend"), - ("lasuite", "people-frontend"), - ("lasuite", "people-celery-worker"), - ("lasuite", "people-celery-beat"), - ("lasuite", "projects"), - ("matrix", "tuwunel"), - ("media", "livekit-server"), -]; +// --------------------------------------------------------------------------- +// Registry helper +// --------------------------------------------------------------------------- + +/// Discover the service registry from the cluster. +async fn get_registry() -> Result { + let client = get_client().await?; + registry::discover(client).await + .map_err(|e| SunbeamError::Other(format!("service discovery failed: {e}"))) +} // --------------------------------------------------------------------------- // Status helpers @@ -118,7 +111,7 @@ async fn vso_sync_status() -> Result<()> { if let Ok(list) = list { // Group by namespace and sort - let mut grouped: BTreeMap> = BTreeMap::new(); + let mut grouped: std::collections::BTreeMap> = std::collections::BTreeMap::new(); for obj in &list.items { let ns = obj.namespace().unwrap_or_default(); let name = obj.name_any(); @@ -159,7 +152,7 @@ async fn vso_sync_status() -> Result<()> { let list = api.list(&ListParams::default()).await; if let Ok(list) = list { - let mut grouped: BTreeMap> = BTreeMap::new(); + let mut grouped: std::collections::BTreeMap> = std::collections::BTreeMap::new(); for obj in &list.items { let ns = obj.namespace().unwrap_or_default(); let name = obj.name_any(); @@ -199,21 +192,23 @@ async fn vso_sync_status() -> Result<()> { // Public commands // --------------------------------------------------------------------------- -/// Show pod health, optionally filtered by namespace or namespace/service. +/// Show pod health, optionally filtered by service name, category, namespace, +/// or legacy namespace/service syntax. pub async fn cmd_status(target: Option<&str>) -> Result<()> { step("Pod health across all namespaces..."); let client = get_client().await?; - let (ns_filter, svc_filter) = parse_target(target)?; + let reg = get_registry().await?; let mut pods: Vec = Vec::new(); - match (ns_filter, svc_filter) { - (None, _) => { - // All managed namespaces + match target { + None => { + // All managed namespaces (derived from registry) + let namespaces = reg.namespaces(); let ns_set: std::collections::HashSet<&str> = - MANAGED_NS.iter().copied().collect(); - for ns in MANAGED_NS { + namespaces.iter().copied().collect(); + for ns in &namespaces { let api: Api = Api::namespaced(client.clone(), ns); let lp = ListParams::default(); if let Ok(list) = api.list(&lp).await { @@ -232,33 +227,89 @@ pub async fn cmd_status(target: Option<&str>) -> Result<()> { } } } - (Some(ns), None) => { - // All pods in a namespace - let api: Api = Api::namespaced(client.clone(), ns); - let lp = ListParams::default(); - if let Ok(list) = api.list(&lp).await { - for pod in list.items { - pods.push(PodRow { - ns: ns.to_string(), - name: pod.name_any(), - ready: pod_ready_str(&pod), - status: pod_phase(&pod), - }); + Some(input) => { + // Try registry resolution first + let resolved = reg.resolve(input); + if !resolved.is_empty() { + // Collect unique namespaces from resolved services + let mut namespaces: Vec<&str> = resolved.iter() + .map(|s| s.namespace.as_str()) + .collect(); + namespaces.sort_unstable(); + namespaces.dedup(); + + // Collect deployment names for label filtering + let deploy_names: std::collections::HashSet<&str> = resolved.iter() + .flat_map(|s| s.deployments.iter().map(|d| d.as_str())) + .collect(); + + for ns in &namespaces { + let api: Api = Api::namespaced(client.clone(), ns); + if deploy_names.is_empty() { + // Services with no deployments: show all pods in namespace + let lp = ListParams::default(); + if let Ok(list) = api.list(&lp).await { + for pod in list.items { + pods.push(PodRow { + ns: ns.to_string(), + name: pod.name_any(), + ready: pod_ready_str(&pod), + status: pod_phase(&pod), + }); + } + } + } else { + // Filter by app label for each deployment + for deploy in &deploy_names { + let lp = ListParams::default() + .labels(&format!("app={deploy}")); + if let Ok(list) = api.list(&lp).await { + for pod in list.items { + pods.push(PodRow { + ns: ns.to_string(), + name: pod.name_any(), + ready: pod_ready_str(&pod), + status: pod_phase(&pod), + }); + } + } + } + } } - } - } - (Some(ns), Some(svc)) => { - // Specific service: filter by app label - let api: Api = Api::namespaced(client.clone(), ns); - let lp = ListParams::default().labels(&format!("app={svc}")); - if let Ok(list) = api.list(&lp).await { - for pod in list.items { - pods.push(PodRow { - ns: ns.to_string(), - name: pod.name_any(), - ready: pod_ready_str(&pod), - status: pod_phase(&pod), - }); + } else { + // Fallback: parse as namespace or namespace/name + let (ns_filter, svc_filter) = parse_target(Some(input))?; + match (ns_filter, svc_filter) { + (Some(ns), None) => { + let api: Api = Api::namespaced(client.clone(), ns); + let lp = ListParams::default(); + if let Ok(list) = api.list(&lp).await { + for pod in list.items { + pods.push(PodRow { + ns: ns.to_string(), + name: pod.name_any(), + ready: pod_ready_str(&pod), + status: pod_phase(&pod), + }); + } + } + } + (Some(ns), Some(svc)) => { + let api: Api = Api::namespaced(client.clone(), ns); + let lp = ListParams::default() + .labels(&format!("app={svc}")); + if let Ok(list) = api.list(&lp).await { + for pod in list.items { + pods.push(PodRow { + ns: ns.to_string(), + name: pod.name_any(), + ready: pod_ready_str(&pod), + status: pod_phase(&pod), + }); + } + } + } + _ => {} } } } @@ -308,13 +359,28 @@ pub async fn cmd_status(target: Option<&str>) -> Result<()> { Ok(()) } -/// Stream logs for a service. Target must include service name (e.g. ory/kratos). +/// Stream logs for a service. Accepts a service name (e.g. "hydra") or legacy +/// namespace/name syntax (e.g. "ory/kratos"). pub async fn cmd_logs(target: &str, follow: bool) -> Result<()> { - let (ns_opt, name_opt) = parse_target(Some(target))?; - let ns = ns_opt.unwrap_or(""); - let name = match name_opt { - Some(n) => n, - None => bail!("Logs require a service name, e.g. 'ory/kratos'."), + // Try registry first for exact service name match + let reg = get_registry().await?; + let (ns, name) = if let Some(svc) = reg.get(target) { + if svc.deployments.is_empty() { + bail!("{target} has no deployments to show logs for"); + } + (svc.namespace.as_str(), svc.deployments[0].as_str()) + } else { + // Fallback: parse as namespace/name + let (ns_opt, name_opt) = parse_target(Some(target))?; + let ns = ns_opt.unwrap_or(""); + let name = match name_opt { + Some(n) => n, + None => bail!( + "No service '{target}' found in registry. Use namespace/name syntax \ + (e.g. 'ory/kratos') or a known service name." + ), + }; + (ns, name) }; let client = get_client().await?; @@ -395,27 +461,50 @@ pub async fn cmd_get(target: &str, output: &str) -> Result<()> { Ok(()) } -/// Restart deployments. None=all, 'ory'=namespace, 'ory/kratos'=specific. +/// Restart deployments. Accepts service names, categories, namespaces, or +/// legacy namespace/name syntax. None restarts all non-infra services with +/// deployments. pub async fn cmd_restart(target: Option<&str>) -> Result<()> { step("Restarting services..."); - let (ns_filter, svc_filter) = parse_target(target)?; + let reg = get_registry().await?; - let matched: Vec<(&str, &str)> = match (ns_filter, svc_filter) { - (None, _) => SERVICES_TO_RESTART.to_vec(), - (Some(ns), None) => SERVICES_TO_RESTART - .iter() - .filter(|(n, _)| *n == ns) - .copied() - .collect(), - (Some(ns), Some(name)) => SERVICES_TO_RESTART - .iter() - .filter(|(n, d)| *n == ns && *d == name) - .copied() - .collect(), + // Collect (namespace, deployment) pairs to restart. + let pairs: Vec<(String, String)> = match target { + None => { + // All non-infra services with deployments + reg.all().iter() + .filter(|s| !s.deployments.is_empty()) + .filter(|s| s.category != Category::Infra) + .flat_map(|s| s.deployments.iter().map(move |d| (s.namespace.clone(), d.clone()))) + .collect() + } + Some(input) => { + let resolved = reg.resolve(input); + if !resolved.is_empty() { + resolved.iter() + .flat_map(|s| s.deployments.iter().map(move |d| (s.namespace.clone(), d.clone()))) + .collect() + } else { + // Fallback: parse as namespace or namespace/name + let (ns_filter, svc_filter) = parse_target(Some(input))?; + match (ns_filter, svc_filter) { + (Some(ns), None) => { + // Restart all deployments in this namespace from registry + reg.by_namespace(ns).iter() + .flat_map(|s| s.deployments.iter().map(move |d| (s.namespace.clone(), d.clone()))) + .collect() + } + (Some(ns), Some(name)) => { + vec![(ns.to_string(), name.to_string())] + } + _ => vec![], + } + } + } }; - if matched.is_empty() { + if pairs.is_empty() { warn(&format!( "No matching services for target: {}", target.unwrap_or("(none)") @@ -423,7 +512,7 @@ pub async fn cmd_restart(target: Option<&str>) -> Result<()> { return Ok(()); } - for (ns, dep) in &matched { + for (ns, dep) in &pairs { if let Err(e) = kube_rollout_restart(ns, dep).await { warn(&format!("Failed to restart {ns}/{dep}: {e}")); } @@ -440,34 +529,6 @@ pub async fn cmd_restart(target: Option<&str>) -> Result<()> { mod tests { use super::*; - #[test] - fn test_managed_ns_contains_expected() { - assert!(MANAGED_NS.contains(&"ory")); - assert!(MANAGED_NS.contains(&"data")); - assert!(MANAGED_NS.contains(&"devtools")); - assert!(MANAGED_NS.contains(&"ingress")); - assert!(MANAGED_NS.contains(&"lasuite")); - assert!(MANAGED_NS.contains(&"matrix")); - assert!(MANAGED_NS.contains(&"media")); - assert!(MANAGED_NS.contains(&"storage")); - assert!(MANAGED_NS.contains(&"monitoring")); - assert!(MANAGED_NS.contains(&"vault-secrets-operator")); - assert_eq!(MANAGED_NS.len(), 10); - } - - #[test] - fn test_services_to_restart_contains_expected() { - assert!(SERVICES_TO_RESTART.contains(&("ory", "hydra"))); - assert!(SERVICES_TO_RESTART.contains(&("ory", "kratos"))); - assert!(SERVICES_TO_RESTART.contains(&("ory", "login-ui"))); - assert!(SERVICES_TO_RESTART.contains(&("devtools", "gitea"))); - assert!(SERVICES_TO_RESTART.contains(&("storage", "seaweedfs-filer"))); - assert!(SERVICES_TO_RESTART.contains(&("lasuite", "hive"))); - assert!(SERVICES_TO_RESTART.contains(&("matrix", "tuwunel"))); - assert!(SERVICES_TO_RESTART.contains(&("media", "livekit-server"))); - assert_eq!(SERVICES_TO_RESTART.len(), 13); - } - #[test] fn test_icon_for_status() { assert_eq!(icon_for_status("Running"), "\u{2713}"); @@ -479,58 +540,16 @@ mod tests { assert_eq!(icon_for_status("CrashLoopBackOff"), "?"); } - #[test] - fn test_restart_filter_namespace() { - let matched: Vec<(&str, &str)> = SERVICES_TO_RESTART - .iter() - .filter(|(n, _)| *n == "ory") - .copied() - .collect(); - assert_eq!(matched.len(), 3); - assert!(matched.contains(&("ory", "hydra"))); - assert!(matched.contains(&("ory", "kratos"))); - assert!(matched.contains(&("ory", "login-ui"))); - } - - #[test] - fn test_restart_filter_specific() { - let matched: Vec<(&str, &str)> = SERVICES_TO_RESTART - .iter() - .filter(|(n, d)| *n == "ory" && *d == "kratos") - .copied() - .collect(); - assert_eq!(matched.len(), 1); - assert_eq!(matched[0], ("ory", "kratos")); - } - - #[test] - fn test_restart_filter_no_match() { - let matched: Vec<(&str, &str)> = SERVICES_TO_RESTART - .iter() - .filter(|(n, d)| *n == "nonexistent" && *d == "nosuch") - .copied() - .collect(); - assert!(matched.is_empty()); - } - - #[test] - fn test_restart_filter_all() { - let matched: Vec<(&str, &str)> = SERVICES_TO_RESTART.to_vec(); - assert_eq!(matched.len(), 13); - } - #[test] fn test_pod_ready_string_format() { - // Verify format: "N/M" let ready = "2/3"; let parts: Vec<&str> = ready.split('/').collect(); assert_eq!(parts.len(), 2); - assert_ne!(parts[0], parts[1]); // unhealthy + assert_ne!(parts[0], parts[1]); } #[test] fn test_unhealthy_detection_by_ready_ratio() { - // Simulate the ready ratio check used in cmd_status let ready = "1/2"; let status = "Running"; let mut unhealthy = !matches!(status, "Running" | "Completed" | "Succeeded");