fix: rewrite users.rs to fully async (was blocking tokio runtime)

Replace all blocking I/O with async equivalents:
- tokio::process::Command instead of std::process::Command
- tokio::time::sleep instead of std::thread::sleep
- reqwest::Client (async) instead of reqwest::blocking::Client
- All helper functions (api, find_identity, generate_recovery, etc.) now async
- PortForward::Drop uses start_kill() (sync SIGKILL) for cleanup
- send_welcome_email wrapped in spawn_blocking for lettre sync transport
This commit is contained in:
2026-03-20 13:31:45 +00:00
parent 24e98b4e7d
commit e95ee4f377

View File

@@ -12,16 +12,16 @@ const SMTP_LOCAL_PORT: u16 = 10025;
// Port-forward helper // Port-forward helper
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Spawn a kubectl port-forward process and return (child, base_url). /// RAII guard that terminates the port-forward on drop.
/// The caller **must** kill the child when done. struct PortForward {
fn spawn_port_forward( child: tokio::process::Child,
ns: &str, pub base_url: String,
svc: &str, }
local_port: u16,
remote_port: u16, impl PortForward {
) -> Result<(std::process::Child, String)> { async fn new(ns: &str, svc: &str, local_port: u16, remote_port: u16) -> Result<Self> {
let ctx = crate::kube::context(); let ctx = crate::kube::context();
let child = std::process::Command::new("kubectl") let child = tokio::process::Command::new("kubectl")
.arg(format!("--context={ctx}")) .arg(format!("--context={ctx}"))
.args([ .args([
"-n", "-n",
@@ -36,33 +36,23 @@ fn spawn_port_forward(
.with_ctx(|| format!("Failed to spawn port-forward to {ns}/svc/{svc}"))?; .with_ctx(|| format!("Failed to spawn port-forward to {ns}/svc/{svc}"))?;
// Give the port-forward time to bind // Give the port-forward time to bind
std::thread::sleep(std::time::Duration::from_millis(1500)); tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
Ok((child, format!("http://localhost:{local_port}"))) Ok(Self {
} child,
base_url: format!("http://localhost:{local_port}"),
/// RAII guard that terminates the port-forward on drop. })
struct PortForward {
child: std::process::Child,
pub base_url: String,
}
impl PortForward {
fn new(ns: &str, svc: &str, local_port: u16, remote_port: u16) -> Result<Self> {
let (child, base_url) = spawn_port_forward(ns, svc, local_port, remote_port)?;
Ok(Self { child, base_url })
} }
/// Convenience: Kratos admin (ory/kratos-admin 80 -> 4434). /// Convenience: Kratos admin (ory/kratos-admin 80 -> 4434).
fn kratos() -> Result<Self> { async fn kratos() -> Result<Self> {
Self::new("ory", "kratos-admin", 4434, 80) Self::new("ory", "kratos-admin", 4434, 80).await
} }
} }
impl Drop for PortForward { impl Drop for PortForward {
fn drop(&mut self) { fn drop(&mut self) {
let _ = self.child.kill(); let _ = self.child.start_kill();
let _ = self.child.wait();
} }
} }
@@ -71,7 +61,7 @@ impl Drop for PortForward {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Make an HTTP request to an admin API endpoint. /// Make an HTTP request to an admin API endpoint.
fn api( async fn api(
base_url: &str, base_url: &str,
path: &str, path: &str,
method: &str, method: &str,
@@ -80,7 +70,7 @@ fn api(
ok_statuses: &[u16], ok_statuses: &[u16],
) -> Result<Option<Value>> { ) -> Result<Option<Value>> {
let url = format!("{base_url}{prefix}{path}"); let url = format!("{base_url}{prefix}{path}");
let client = reqwest::blocking::Client::new(); let client = reqwest::Client::new();
let mut req = match method { let mut req = match method {
"GET" => client.get(&url), "GET" => client.get(&url),
@@ -99,18 +89,21 @@ fn api(
req = req.json(b); req = req.json(b);
} }
let resp = req.send().with_ctx(|| format!("HTTP {method} {url} failed"))?; let resp = req
.send()
.await
.with_ctx(|| format!("HTTP {method} {url} failed"))?;
let status = resp.status().as_u16(); let status = resp.status().as_u16();
if !resp.status().is_success() { if !resp.status().is_success() {
if ok_statuses.contains(&status) { if ok_statuses.contains(&status) {
return Ok(None); return Ok(None);
} }
let err_text = resp.text().unwrap_or_default(); let err_text = resp.text().await.unwrap_or_default();
bail!("API error {status}: {err_text}"); bail!("API error {status}: {err_text}");
} }
let text = resp.text().unwrap_or_default(); let text = resp.text().await.unwrap_or_default();
if text.is_empty() { if text.is_empty() {
return Ok(None); return Ok(None);
} }
@@ -120,14 +113,14 @@ fn api(
} }
/// Shorthand: Kratos admin API call (prefix = "/admin"). /// Shorthand: Kratos admin API call (prefix = "/admin").
fn kratos_api( async fn kratos_api(
base_url: &str, base_url: &str,
path: &str, path: &str,
method: &str, method: &str,
body: Option<&Value>, body: Option<&Value>,
ok_statuses: &[u16], ok_statuses: &[u16],
) -> Result<Option<Value>> { ) -> Result<Option<Value>> {
api(base_url, path, method, body, "/admin", ok_statuses) api(base_url, path, method, body, "/admin", ok_statuses).await
} }
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -135,10 +128,10 @@ fn kratos_api(
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Find identity by UUID or email search. Returns the identity JSON. /// Find identity by UUID or email search. Returns the identity JSON.
fn find_identity(base_url: &str, target: &str, required: bool) -> Result<Option<Value>> { async fn find_identity(base_url: &str, target: &str, required: bool) -> Result<Option<Value>> {
// Looks like a UUID? // Looks like a UUID?
if target.len() == 36 && target.chars().filter(|&c| c == '-').count() == 4 { if target.len() == 36 && target.chars().filter(|&c| c == '-').count() == 4 {
let result = kratos_api(base_url, &format!("/identities/{target}"), "GET", None, &[])?; let result = kratos_api(base_url, &format!("/identities/{target}"), "GET", None, &[]).await?;
return Ok(result); return Ok(result);
} }
@@ -149,7 +142,8 @@ fn find_identity(base_url: &str, target: &str, required: bool) -> Result<Option<
"GET", "GET",
None, None,
&[], &[],
)?; )
.await?;
if let Some(Value::Array(arr)) = &result { if let Some(Value::Array(arr)) = &result {
if let Some(first) = arr.first() { if let Some(first) = arr.first() {
@@ -185,13 +179,13 @@ fn identity_put_body(identity: &Value, state: Option<&str>, extra: Option<Value>
} }
/// Generate a 24h recovery code. Returns (link, code). /// Generate a 24h recovery code. Returns (link, code).
fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, String)> { async fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, String)> {
let body = serde_json::json!({ let body = serde_json::json!({
"identity_id": identity_id, "identity_id": identity_id,
"expires_in": "24h", "expires_in": "24h",
}); });
let result = kratos_api(base_url, "/recovery/code", "POST", Some(&body), &[])?; let result = kratos_api(base_url, "/recovery/code", "POST", Some(&body), &[]).await?;
let recovery = result.unwrap_or_default(); let recovery = result.unwrap_or_default();
let link = recovery let link = recovery
@@ -209,14 +203,15 @@ fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, Strin
} }
/// Find the next sequential employee ID by scanning all employee identities. /// Find the next sequential employee ID by scanning all employee identities.
fn next_employee_id(base_url: &str) -> Result<String> { async fn next_employee_id(base_url: &str) -> Result<String> {
let result = kratos_api( let result = kratos_api(
base_url, base_url,
"/identities?page_size=200", "/identities?page_size=200",
"GET", "GET",
None, None,
&[], &[],
)?; )
.await?;
let identities = match result { let identities = match result {
Some(Value::Array(arr)) => arr, Some(Value::Array(arr)) => arr,
@@ -300,12 +295,12 @@ fn identity_id(identity: &Value) -> Result<String> {
pub async fn cmd_user_list(search: &str) -> Result<()> { pub async fn cmd_user_list(search: &str) -> Result<()> {
step("Listing identities..."); step("Listing identities...");
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let mut path = "/identities?page_size=20".to_string(); let mut path = "/identities?page_size=20".to_string();
if !search.is_empty() { if !search.is_empty() {
path.push_str(&format!("&credentials_identifier={search}")); path.push_str(&format!("&credentials_identifier={search}"));
} }
let result = kratos_api(&pf.base_url, &path, "GET", None, &[])?; let result = kratos_api(&pf.base_url, &path, "GET", None, &[]).await?;
drop(pf); drop(pf);
let identities = match result { let identities = match result {
@@ -343,8 +338,9 @@ pub async fn cmd_user_list(search: &str) -> Result<()> {
pub async fn cmd_user_get(target: &str) -> Result<()> { pub async fn cmd_user_get(target: &str) -> Result<()> {
step(&format!("Getting identity: {target}")); step(&format!("Getting identity: {target}"));
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)? let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?; .ok_or_else(|| SunbeamError::identity("Identity not found"))?;
drop(pf); drop(pf);
@@ -370,14 +366,15 @@ pub async fn cmd_user_create(email: &str, name: &str, schema_id: &str) -> Result
"state": "active", "state": "active",
}); });
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])? let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])
.await?
.ok_or_else(|| SunbeamError::identity("Failed to create identity"))?; .ok_or_else(|| SunbeamError::identity("Failed to create identity"))?;
let iid = identity_id(&identity)?; let iid = identity_id(&identity)?;
ok(&format!("Created identity: {iid}")); ok(&format!("Created identity: {iid}"));
let (link, code) = generate_recovery(&pf.base_url, &iid)?; let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
drop(pf); drop(pf);
ok("Recovery link (valid 24h):"); ok("Recovery link (valid 24h):");
@@ -399,8 +396,9 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> {
return Ok(()); return Ok(());
} }
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)? let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?; .ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?; let iid = identity_id(&identity)?;
kratos_api( kratos_api(
@@ -409,7 +407,8 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> {
"DELETE", "DELETE",
None, None,
&[], &[],
)?; )
.await?;
drop(pf); drop(pf);
ok("Deleted."); ok("Deleted.");
@@ -419,11 +418,12 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> {
pub async fn cmd_user_recover(target: &str) -> Result<()> { pub async fn cmd_user_recover(target: &str) -> Result<()> {
step(&format!("Generating recovery link for: {target}")); step(&format!("Generating recovery link for: {target}"));
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)? let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?; .ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?; let iid = identity_id(&identity)?;
let (link, code) = generate_recovery(&pf.base_url, &iid)?; let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
drop(pf); drop(pf);
ok("Recovery link (valid 24h):"); ok("Recovery link (valid 24h):");
@@ -436,8 +436,9 @@ pub async fn cmd_user_recover(target: &str) -> Result<()> {
pub async fn cmd_user_disable(target: &str) -> Result<()> { pub async fn cmd_user_disable(target: &str) -> Result<()> {
step(&format!("Disabling identity: {target}")); step(&format!("Disabling identity: {target}"));
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)? let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?; .ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?; let iid = identity_id(&identity)?;
@@ -448,14 +449,16 @@ pub async fn cmd_user_disable(target: &str) -> Result<()> {
"PUT", "PUT",
Some(&put_body), Some(&put_body),
&[], &[],
)?; )
.await?;
kratos_api( kratos_api(
&pf.base_url, &pf.base_url,
&format!("/identities/{iid}/sessions"), &format!("/identities/{iid}/sessions"),
"DELETE", "DELETE",
None, None,
&[], &[],
)?; )
.await?;
drop(pf); drop(pf);
ok(&format!( ok(&format!(
@@ -469,8 +472,9 @@ pub async fn cmd_user_disable(target: &str) -> Result<()> {
pub async fn cmd_user_enable(target: &str) -> Result<()> { pub async fn cmd_user_enable(target: &str) -> Result<()> {
step(&format!("Enabling identity: {target}")); step(&format!("Enabling identity: {target}"));
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)? let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?; .ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?; let iid = identity_id(&identity)?;
@@ -481,7 +485,8 @@ pub async fn cmd_user_enable(target: &str) -> Result<()> {
"PUT", "PUT",
Some(&put_body), Some(&put_body),
&[], &[],
)?; )
.await?;
drop(pf); drop(pf);
ok(&format!("Identity {}... re-enabled.", short_id(&iid))); ok(&format!("Identity {}... re-enabled.", short_id(&iid)));
@@ -491,8 +496,9 @@ pub async fn cmd_user_enable(target: &str) -> Result<()> {
pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> { pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> {
step(&format!("Setting password for: {target}")); step(&format!("Setting password for: {target}"));
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)? let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?; .ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?; let iid = identity_id(&identity)?;
@@ -512,7 +518,8 @@ pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> {
"PUT", "PUT",
Some(&put_body), Some(&put_body),
&[], &[],
)?; )
.await?;
drop(pf); drop(pf);
ok(&format!("Password set for {}...", short_id(&iid))); ok(&format!("Password set for {}...", short_id(&iid)));
@@ -524,7 +531,7 @@ pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
/// Send a welcome email via cluster Postfix (port-forward to svc/postfix in lasuite). /// Send a welcome email via cluster Postfix (port-forward to svc/postfix in lasuite).
fn send_welcome_email( async fn send_welcome_email(
domain: &str, domain: &str,
email: &str, email: &str,
name: &str, name: &str,
@@ -589,15 +596,19 @@ Messages (Matrix):
.body(body_text) .body(body_text)
.ctx("Failed to build email message")?; .ctx("Failed to build email message")?;
let _pf = PortForward::new("lasuite", "postfix", SMTP_LOCAL_PORT, 25)?; let _pf = PortForward::new("lasuite", "postfix", SMTP_LOCAL_PORT, 25).await?;
let mailer = SmtpTransport::builder_dangerous("localhost") let mailer = SmtpTransport::builder_dangerous("localhost")
.port(SMTP_LOCAL_PORT) .port(SMTP_LOCAL_PORT)
.build(); .build();
tokio::task::spawn_blocking(move || {
mailer mailer
.send(&message) .send(&message)
.ctx("Failed to send welcome email via SMTP")?; .map_err(|e| SunbeamError::Other(format!("Failed to send welcome email via SMTP: {e}")))
})
.await
.map_err(|e| SunbeamError::Other(format!("Email send task panicked: {e}")))??;
ok(&format!("Welcome email sent to {email}")); ok(&format!("Welcome email sent to {email}"));
Ok(()) Ok(())
@@ -618,16 +629,16 @@ pub async fn cmd_user_onboard(
) -> Result<()> { ) -> Result<()> {
step(&format!("Onboarding: {email}")); step(&format!("Onboarding: {email}"));
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let (iid, recovery_link, recovery_code) = { let (iid, recovery_link, recovery_code) = {
let existing = find_identity(&pf.base_url, email, false)?; let existing = find_identity(&pf.base_url, email, false).await?;
if let Some(existing) = existing { if let Some(existing) = existing {
let iid = identity_id(&existing)?; let iid = identity_id(&existing)?;
warn(&format!("Identity already exists: {}...", short_id(&iid))); warn(&format!("Identity already exists: {}...", short_id(&iid)));
step("Generating fresh recovery link..."); step("Generating fresh recovery link...");
let (link, code) = generate_recovery(&pf.base_url, &iid)?; let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
(iid, link, code) (iid, link, code)
} else { } else {
let mut traits = serde_json::json!({ "email": email }); let mut traits = serde_json::json!({ "email": email });
@@ -640,7 +651,7 @@ pub async fn cmd_user_onboard(
let mut employee_id = String::new(); let mut employee_id = String::new();
if schema_id == "employee" { if schema_id == "employee" {
employee_id = next_employee_id(&pf.base_url)?; employee_id = next_employee_id(&pf.base_url).await?;
traits["employee_id"] = Value::String(employee_id.clone()); traits["employee_id"] = Value::String(employee_id.clone());
if !job_title.is_empty() { if !job_title.is_empty() {
traits["job_title"] = Value::String(job_title.to_string()); traits["job_title"] = Value::String(job_title.to_string());
@@ -670,7 +681,8 @@ pub async fn cmd_user_onboard(
}], }],
}); });
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])? let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])
.await?
.ok_or_else(|| SunbeamError::identity("Failed to create identity"))?; .ok_or_else(|| SunbeamError::identity("Failed to create identity"))?;
let iid = identity_id(&identity)?; let iid = identity_id(&identity)?;
@@ -690,9 +702,10 @@ pub async fn cmd_user_onboard(
"PATCH", "PATCH",
Some(&patch_body), Some(&patch_body),
&[], &[],
)?; )
.await?;
let (link, code) = generate_recovery(&pf.base_url, &iid)?; let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
(iid, link, code) (iid, link, code)
} }
}; };
@@ -702,7 +715,7 @@ pub async fn cmd_user_onboard(
if send_email { if send_email {
let domain = crate::kube::get_domain().await?; let domain = crate::kube::get_domain().await?;
let recipient = if notify.is_empty() { email } else { notify }; let recipient = if notify.is_empty() { email } else { notify };
send_welcome_email(&domain, recipient, name, &recovery_link, &recovery_code)?; send_welcome_email(&domain, recipient, name, &recovery_link, &recovery_code).await?;
} }
ok(&format!("Identity ID: {iid}")); ok(&format!("Identity ID: {iid}"));
@@ -729,8 +742,9 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
return Ok(()); return Ok(());
} }
let pf = PortForward::kratos()?; let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)? let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?; .ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?; let iid = identity_id(&identity)?;
@@ -742,7 +756,8 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
"PUT", "PUT",
Some(&put_body), Some(&put_body),
&[], &[],
)?; )
.await?;
ok(&format!("Identity {}... disabled.", short_id(&iid))); ok(&format!("Identity {}... disabled.", short_id(&iid)));
step("Revoking Kratos sessions..."); step("Revoking Kratos sessions...");
@@ -752,12 +767,13 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
"DELETE", "DELETE",
None, None,
&[404], &[404],
)?; )
.await?;
ok("Kratos sessions revoked."); ok("Kratos sessions revoked.");
step("Revoking Hydra consent sessions..."); step("Revoking Hydra consent sessions...");
{ {
let hydra_pf = PortForward::new("ory", "hydra-admin", 14445, 4445)?; let hydra_pf = PortForward::new("ory", "hydra-admin", 14445, 4445).await?;
api( api(
&hydra_pf.base_url, &hydra_pf.base_url,
&format!("/oauth2/auth/sessions/consent?subject={iid}&all=true"), &format!("/oauth2/auth/sessions/consent?subject={iid}&all=true"),
@@ -765,7 +781,8 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
None, None,
"/admin", "/admin",
&[404], &[404],
)?; )
.await?;
} }
ok("Hydra consent sessions revoked."); ok("Hydra consent sessions revoked.");