diff --git a/src/users.rs b/src/users.rs index 0b33387..789a431 100644 --- a/src/users.rs +++ b/src/users.rs @@ -12,57 +12,47 @@ const SMTP_LOCAL_PORT: u16 = 10025; // Port-forward helper // --------------------------------------------------------------------------- -/// Spawn a kubectl port-forward process and return (child, base_url). -/// The caller **must** kill the child when done. -fn spawn_port_forward( - ns: &str, - svc: &str, - local_port: u16, - remote_port: u16, -) -> Result<(std::process::Child, String)> { - let ctx = crate::kube::context(); - let child = std::process::Command::new("kubectl") - .arg(format!("--context={ctx}")) - .args([ - "-n", - ns, - "port-forward", - &format!("svc/{svc}"), - &format!("{local_port}:{remote_port}"), - ]) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .spawn() - .with_ctx(|| format!("Failed to spawn port-forward to {ns}/svc/{svc}"))?; - - // Give the port-forward time to bind - std::thread::sleep(std::time::Duration::from_millis(1500)); - - Ok((child, format!("http://localhost:{local_port}"))) -} - /// RAII guard that terminates the port-forward on drop. struct PortForward { - child: std::process::Child, + child: tokio::process::Child, pub base_url: String, } impl PortForward { - fn new(ns: &str, svc: &str, local_port: u16, remote_port: u16) -> Result { - let (child, base_url) = spawn_port_forward(ns, svc, local_port, remote_port)?; - Ok(Self { child, base_url }) + async fn new(ns: &str, svc: &str, local_port: u16, remote_port: u16) -> Result { + let ctx = crate::kube::context(); + let child = tokio::process::Command::new("kubectl") + .arg(format!("--context={ctx}")) + .args([ + "-n", + ns, + "port-forward", + &format!("svc/{svc}"), + &format!("{local_port}:{remote_port}"), + ]) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .with_ctx(|| format!("Failed to spawn port-forward to {ns}/svc/{svc}"))?; + + // Give the port-forward time to bind + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + + Ok(Self { + child, + base_url: format!("http://localhost:{local_port}"), + }) } /// Convenience: Kratos admin (ory/kratos-admin 80 -> 4434). - fn kratos() -> Result { - Self::new("ory", "kratos-admin", 4434, 80) + async fn kratos() -> Result { + Self::new("ory", "kratos-admin", 4434, 80).await } } impl Drop for PortForward { fn drop(&mut self) { - let _ = self.child.kill(); - let _ = self.child.wait(); + let _ = self.child.start_kill(); } } @@ -71,7 +61,7 @@ impl Drop for PortForward { // --------------------------------------------------------------------------- /// Make an HTTP request to an admin API endpoint. -fn api( +async fn api( base_url: &str, path: &str, method: &str, @@ -80,7 +70,7 @@ fn api( ok_statuses: &[u16], ) -> Result> { let url = format!("{base_url}{prefix}{path}"); - let client = reqwest::blocking::Client::new(); + let client = reqwest::Client::new(); let mut req = match method { "GET" => client.get(&url), @@ -99,18 +89,21 @@ fn api( req = req.json(b); } - let resp = req.send().with_ctx(|| format!("HTTP {method} {url} failed"))?; + let resp = req + .send() + .await + .with_ctx(|| format!("HTTP {method} {url} failed"))?; let status = resp.status().as_u16(); if !resp.status().is_success() { if ok_statuses.contains(&status) { return Ok(None); } - let err_text = resp.text().unwrap_or_default(); + let err_text = resp.text().await.unwrap_or_default(); bail!("API error {status}: {err_text}"); } - let text = resp.text().unwrap_or_default(); + let text = resp.text().await.unwrap_or_default(); if text.is_empty() { return Ok(None); } @@ -120,14 +113,14 @@ fn api( } /// Shorthand: Kratos admin API call (prefix = "/admin"). -fn kratos_api( +async fn kratos_api( base_url: &str, path: &str, method: &str, body: Option<&Value>, ok_statuses: &[u16], ) -> Result> { - api(base_url, path, method, body, "/admin", ok_statuses) + api(base_url, path, method, body, "/admin", ok_statuses).await } // --------------------------------------------------------------------------- @@ -135,10 +128,10 @@ fn kratos_api( // --------------------------------------------------------------------------- /// Find identity by UUID or email search. Returns the identity JSON. -fn find_identity(base_url: &str, target: &str, required: bool) -> Result> { +async fn find_identity(base_url: &str, target: &str, required: bool) -> Result> { // Looks like a UUID? if target.len() == 36 && target.chars().filter(|&c| c == '-').count() == 4 { - let result = kratos_api(base_url, &format!("/identities/{target}"), "GET", None, &[])?; + let result = kratos_api(base_url, &format!("/identities/{target}"), "GET", None, &[]).await?; return Ok(result); } @@ -149,7 +142,8 @@ fn find_identity(base_url: &str, target: &str, required: bool) -> Result, extra: Option } /// Generate a 24h recovery code. Returns (link, code). -fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, String)> { +async fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, String)> { let body = serde_json::json!({ "identity_id": identity_id, "expires_in": "24h", }); - let result = kratos_api(base_url, "/recovery/code", "POST", Some(&body), &[])?; + let result = kratos_api(base_url, "/recovery/code", "POST", Some(&body), &[]).await?; let recovery = result.unwrap_or_default(); let link = recovery @@ -209,14 +203,15 @@ fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, Strin } /// Find the next sequential employee ID by scanning all employee identities. -fn next_employee_id(base_url: &str) -> Result { +async fn next_employee_id(base_url: &str) -> Result { let result = kratos_api( base_url, "/identities?page_size=200", "GET", None, &[], - )?; + ) + .await?; let identities = match result { Some(Value::Array(arr)) => arr, @@ -300,12 +295,12 @@ fn identity_id(identity: &Value) -> Result { pub async fn cmd_user_list(search: &str) -> Result<()> { step("Listing identities..."); - let pf = PortForward::kratos()?; + let pf = PortForward::kratos().await?; let mut path = "/identities?page_size=20".to_string(); if !search.is_empty() { path.push_str(&format!("&credentials_identifier={search}")); } - let result = kratos_api(&pf.base_url, &path, "GET", None, &[])?; + let result = kratos_api(&pf.base_url, &path, "GET", None, &[]).await?; drop(pf); let identities = match result { @@ -343,8 +338,9 @@ pub async fn cmd_user_list(search: &str) -> Result<()> { pub async fn cmd_user_get(target: &str) -> Result<()> { step(&format!("Getting identity: {target}")); - let pf = PortForward::kratos()?; - let identity = find_identity(&pf.base_url, target, true)? + let pf = PortForward::kratos().await?; + let identity = find_identity(&pf.base_url, target, true) + .await? .ok_or_else(|| SunbeamError::identity("Identity not found"))?; drop(pf); @@ -370,14 +366,15 @@ pub async fn cmd_user_create(email: &str, name: &str, schema_id: &str) -> Result "state": "active", }); - let pf = PortForward::kratos()?; - let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])? + let pf = PortForward::kratos().await?; + let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[]) + .await? .ok_or_else(|| SunbeamError::identity("Failed to create identity"))?; let iid = identity_id(&identity)?; ok(&format!("Created identity: {iid}")); - let (link, code) = generate_recovery(&pf.base_url, &iid)?; + let (link, code) = generate_recovery(&pf.base_url, &iid).await?; drop(pf); ok("Recovery link (valid 24h):"); @@ -399,8 +396,9 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> { return Ok(()); } - let pf = PortForward::kratos()?; - let identity = find_identity(&pf.base_url, target, true)? + let pf = PortForward::kratos().await?; + let identity = find_identity(&pf.base_url, target, true) + .await? .ok_or_else(|| SunbeamError::identity("Identity not found"))?; let iid = identity_id(&identity)?; kratos_api( @@ -409,7 +407,8 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> { "DELETE", None, &[], - )?; + ) + .await?; drop(pf); ok("Deleted."); @@ -419,11 +418,12 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> { pub async fn cmd_user_recover(target: &str) -> Result<()> { step(&format!("Generating recovery link for: {target}")); - let pf = PortForward::kratos()?; - let identity = find_identity(&pf.base_url, target, true)? + let pf = PortForward::kratos().await?; + let identity = find_identity(&pf.base_url, target, true) + .await? .ok_or_else(|| SunbeamError::identity("Identity not found"))?; let iid = identity_id(&identity)?; - let (link, code) = generate_recovery(&pf.base_url, &iid)?; + let (link, code) = generate_recovery(&pf.base_url, &iid).await?; drop(pf); ok("Recovery link (valid 24h):"); @@ -436,8 +436,9 @@ pub async fn cmd_user_recover(target: &str) -> Result<()> { pub async fn cmd_user_disable(target: &str) -> Result<()> { step(&format!("Disabling identity: {target}")); - let pf = PortForward::kratos()?; - let identity = find_identity(&pf.base_url, target, true)? + let pf = PortForward::kratos().await?; + let identity = find_identity(&pf.base_url, target, true) + .await? .ok_or_else(|| SunbeamError::identity("Identity not found"))?; let iid = identity_id(&identity)?; @@ -448,14 +449,16 @@ pub async fn cmd_user_disable(target: &str) -> Result<()> { "PUT", Some(&put_body), &[], - )?; + ) + .await?; kratos_api( &pf.base_url, &format!("/identities/{iid}/sessions"), "DELETE", None, &[], - )?; + ) + .await?; drop(pf); ok(&format!( @@ -469,8 +472,9 @@ pub async fn cmd_user_disable(target: &str) -> Result<()> { pub async fn cmd_user_enable(target: &str) -> Result<()> { step(&format!("Enabling identity: {target}")); - let pf = PortForward::kratos()?; - let identity = find_identity(&pf.base_url, target, true)? + let pf = PortForward::kratos().await?; + let identity = find_identity(&pf.base_url, target, true) + .await? .ok_or_else(|| SunbeamError::identity("Identity not found"))?; let iid = identity_id(&identity)?; @@ -481,7 +485,8 @@ pub async fn cmd_user_enable(target: &str) -> Result<()> { "PUT", Some(&put_body), &[], - )?; + ) + .await?; drop(pf); ok(&format!("Identity {}... re-enabled.", short_id(&iid))); @@ -491,8 +496,9 @@ pub async fn cmd_user_enable(target: &str) -> Result<()> { pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> { step(&format!("Setting password for: {target}")); - let pf = PortForward::kratos()?; - let identity = find_identity(&pf.base_url, target, true)? + let pf = PortForward::kratos().await?; + let identity = find_identity(&pf.base_url, target, true) + .await? .ok_or_else(|| SunbeamError::identity("Identity not found"))?; let iid = identity_id(&identity)?; @@ -512,7 +518,8 @@ pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> { "PUT", Some(&put_body), &[], - )?; + ) + .await?; drop(pf); ok(&format!("Password set for {}...", short_id(&iid))); @@ -524,7 +531,7 @@ pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> { // --------------------------------------------------------------------------- /// Send a welcome email via cluster Postfix (port-forward to svc/postfix in lasuite). -fn send_welcome_email( +async fn send_welcome_email( domain: &str, email: &str, name: &str, @@ -589,15 +596,19 @@ Messages (Matrix): .body(body_text) .ctx("Failed to build email message")?; - let _pf = PortForward::new("lasuite", "postfix", SMTP_LOCAL_PORT, 25)?; + let _pf = PortForward::new("lasuite", "postfix", SMTP_LOCAL_PORT, 25).await?; let mailer = SmtpTransport::builder_dangerous("localhost") .port(SMTP_LOCAL_PORT) .build(); - mailer - .send(&message) - .ctx("Failed to send welcome email via SMTP")?; + tokio::task::spawn_blocking(move || { + mailer + .send(&message) + .map_err(|e| SunbeamError::Other(format!("Failed to send welcome email via SMTP: {e}"))) + }) + .await + .map_err(|e| SunbeamError::Other(format!("Email send task panicked: {e}")))??; ok(&format!("Welcome email sent to {email}")); Ok(()) @@ -618,16 +629,16 @@ pub async fn cmd_user_onboard( ) -> Result<()> { step(&format!("Onboarding: {email}")); - let pf = PortForward::kratos()?; + let pf = PortForward::kratos().await?; let (iid, recovery_link, recovery_code) = { - let existing = find_identity(&pf.base_url, email, false)?; + let existing = find_identity(&pf.base_url, email, false).await?; if let Some(existing) = existing { let iid = identity_id(&existing)?; warn(&format!("Identity already exists: {}...", short_id(&iid))); step("Generating fresh recovery link..."); - let (link, code) = generate_recovery(&pf.base_url, &iid)?; + let (link, code) = generate_recovery(&pf.base_url, &iid).await?; (iid, link, code) } else { let mut traits = serde_json::json!({ "email": email }); @@ -640,7 +651,7 @@ pub async fn cmd_user_onboard( let mut employee_id = String::new(); if schema_id == "employee" { - employee_id = next_employee_id(&pf.base_url)?; + employee_id = next_employee_id(&pf.base_url).await?; traits["employee_id"] = Value::String(employee_id.clone()); if !job_title.is_empty() { traits["job_title"] = Value::String(job_title.to_string()); @@ -670,7 +681,8 @@ pub async fn cmd_user_onboard( }], }); - let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])? + let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[]) + .await? .ok_or_else(|| SunbeamError::identity("Failed to create identity"))?; let iid = identity_id(&identity)?; @@ -690,9 +702,10 @@ pub async fn cmd_user_onboard( "PATCH", Some(&patch_body), &[], - )?; + ) + .await?; - let (link, code) = generate_recovery(&pf.base_url, &iid)?; + let (link, code) = generate_recovery(&pf.base_url, &iid).await?; (iid, link, code) } }; @@ -702,7 +715,7 @@ pub async fn cmd_user_onboard( if send_email { let domain = crate::kube::get_domain().await?; let recipient = if notify.is_empty() { email } else { notify }; - send_welcome_email(&domain, recipient, name, &recovery_link, &recovery_code)?; + send_welcome_email(&domain, recipient, name, &recovery_link, &recovery_code).await?; } ok(&format!("Identity ID: {iid}")); @@ -729,8 +742,9 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> { return Ok(()); } - let pf = PortForward::kratos()?; - let identity = find_identity(&pf.base_url, target, true)? + let pf = PortForward::kratos().await?; + let identity = find_identity(&pf.base_url, target, true) + .await? .ok_or_else(|| SunbeamError::identity("Identity not found"))?; let iid = identity_id(&identity)?; @@ -742,7 +756,8 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> { "PUT", Some(&put_body), &[], - )?; + ) + .await?; ok(&format!("Identity {}... disabled.", short_id(&iid))); step("Revoking Kratos sessions..."); @@ -752,12 +767,13 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> { "DELETE", None, &[404], - )?; + ) + .await?; ok("Kratos sessions revoked."); step("Revoking Hydra consent sessions..."); { - let hydra_pf = PortForward::new("ory", "hydra-admin", 14445, 4445)?; + let hydra_pf = PortForward::new("ory", "hydra-admin", 14445, 4445).await?; api( &hydra_pf.base_url, &format!("/oauth2/auth/sessions/consent?subject={iid}&all=true"), @@ -765,7 +781,8 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> { None, "/admin", &[404], - )?; + ) + .await?; } ok("Hydra consent sessions revoked.");