fix: rewrite users.rs to fully async (was blocking tokio runtime)

Replace all blocking I/O with async equivalents:
- tokio::process::Command instead of std::process::Command
- tokio::time::sleep instead of std::thread::sleep
- reqwest::Client (async) instead of reqwest::blocking::Client
- All helper functions (api, find_identity, generate_recovery, etc.) now async
- PortForward::Drop uses start_kill() (sync SIGKILL) for cleanup
- send_welcome_email wrapped in spawn_blocking for lettre sync transport
This commit is contained in:
2026-03-20 13:31:45 +00:00
parent 24e98b4e7d
commit e95ee4f377

View File

@@ -12,16 +12,16 @@ const SMTP_LOCAL_PORT: u16 = 10025;
// Port-forward helper
// ---------------------------------------------------------------------------
/// Spawn a kubectl port-forward process and return (child, base_url).
/// The caller **must** kill the child when done.
fn spawn_port_forward(
ns: &str,
svc: &str,
local_port: u16,
remote_port: u16,
) -> Result<(std::process::Child, String)> {
/// RAII guard that terminates the port-forward on drop.
struct PortForward {
child: tokio::process::Child,
pub base_url: String,
}
impl PortForward {
async fn new(ns: &str, svc: &str, local_port: u16, remote_port: u16) -> Result<Self> {
let ctx = crate::kube::context();
let child = std::process::Command::new("kubectl")
let child = tokio::process::Command::new("kubectl")
.arg(format!("--context={ctx}"))
.args([
"-n",
@@ -36,33 +36,23 @@ fn spawn_port_forward(
.with_ctx(|| format!("Failed to spawn port-forward to {ns}/svc/{svc}"))?;
// Give the port-forward time to bind
std::thread::sleep(std::time::Duration::from_millis(1500));
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
Ok((child, format!("http://localhost:{local_port}")))
}
/// RAII guard that terminates the port-forward on drop.
struct PortForward {
child: std::process::Child,
pub base_url: String,
}
impl PortForward {
fn new(ns: &str, svc: &str, local_port: u16, remote_port: u16) -> Result<Self> {
let (child, base_url) = spawn_port_forward(ns, svc, local_port, remote_port)?;
Ok(Self { child, base_url })
Ok(Self {
child,
base_url: format!("http://localhost:{local_port}"),
})
}
/// Convenience: Kratos admin (ory/kratos-admin 80 -> 4434).
fn kratos() -> Result<Self> {
Self::new("ory", "kratos-admin", 4434, 80)
async fn kratos() -> Result<Self> {
Self::new("ory", "kratos-admin", 4434, 80).await
}
}
impl Drop for PortForward {
fn drop(&mut self) {
let _ = self.child.kill();
let _ = self.child.wait();
let _ = self.child.start_kill();
}
}
@@ -71,7 +61,7 @@ impl Drop for PortForward {
// ---------------------------------------------------------------------------
/// Make an HTTP request to an admin API endpoint.
fn api(
async fn api(
base_url: &str,
path: &str,
method: &str,
@@ -80,7 +70,7 @@ fn api(
ok_statuses: &[u16],
) -> Result<Option<Value>> {
let url = format!("{base_url}{prefix}{path}");
let client = reqwest::blocking::Client::new();
let client = reqwest::Client::new();
let mut req = match method {
"GET" => client.get(&url),
@@ -99,18 +89,21 @@ fn api(
req = req.json(b);
}
let resp = req.send().with_ctx(|| format!("HTTP {method} {url} failed"))?;
let resp = req
.send()
.await
.with_ctx(|| format!("HTTP {method} {url} failed"))?;
let status = resp.status().as_u16();
if !resp.status().is_success() {
if ok_statuses.contains(&status) {
return Ok(None);
}
let err_text = resp.text().unwrap_or_default();
let err_text = resp.text().await.unwrap_or_default();
bail!("API error {status}: {err_text}");
}
let text = resp.text().unwrap_or_default();
let text = resp.text().await.unwrap_or_default();
if text.is_empty() {
return Ok(None);
}
@@ -120,14 +113,14 @@ fn api(
}
/// Shorthand: Kratos admin API call (prefix = "/admin").
fn kratos_api(
async fn kratos_api(
base_url: &str,
path: &str,
method: &str,
body: Option<&Value>,
ok_statuses: &[u16],
) -> Result<Option<Value>> {
api(base_url, path, method, body, "/admin", ok_statuses)
api(base_url, path, method, body, "/admin", ok_statuses).await
}
// ---------------------------------------------------------------------------
@@ -135,10 +128,10 @@ fn kratos_api(
// ---------------------------------------------------------------------------
/// Find identity by UUID or email search. Returns the identity JSON.
fn find_identity(base_url: &str, target: &str, required: bool) -> Result<Option<Value>> {
async fn find_identity(base_url: &str, target: &str, required: bool) -> Result<Option<Value>> {
// Looks like a UUID?
if target.len() == 36 && target.chars().filter(|&c| c == '-').count() == 4 {
let result = kratos_api(base_url, &format!("/identities/{target}"), "GET", None, &[])?;
let result = kratos_api(base_url, &format!("/identities/{target}"), "GET", None, &[]).await?;
return Ok(result);
}
@@ -149,7 +142,8 @@ fn find_identity(base_url: &str, target: &str, required: bool) -> Result<Option<
"GET",
None,
&[],
)?;
)
.await?;
if let Some(Value::Array(arr)) = &result {
if let Some(first) = arr.first() {
@@ -185,13 +179,13 @@ fn identity_put_body(identity: &Value, state: Option<&str>, extra: Option<Value>
}
/// Generate a 24h recovery code. Returns (link, code).
fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, String)> {
async fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, String)> {
let body = serde_json::json!({
"identity_id": identity_id,
"expires_in": "24h",
});
let result = kratos_api(base_url, "/recovery/code", "POST", Some(&body), &[])?;
let result = kratos_api(base_url, "/recovery/code", "POST", Some(&body), &[]).await?;
let recovery = result.unwrap_or_default();
let link = recovery
@@ -209,14 +203,15 @@ fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, Strin
}
/// Find the next sequential employee ID by scanning all employee identities.
fn next_employee_id(base_url: &str) -> Result<String> {
async fn next_employee_id(base_url: &str) -> Result<String> {
let result = kratos_api(
base_url,
"/identities?page_size=200",
"GET",
None,
&[],
)?;
)
.await?;
let identities = match result {
Some(Value::Array(arr)) => arr,
@@ -300,12 +295,12 @@ fn identity_id(identity: &Value) -> Result<String> {
pub async fn cmd_user_list(search: &str) -> Result<()> {
step("Listing identities...");
let pf = PortForward::kratos()?;
let pf = PortForward::kratos().await?;
let mut path = "/identities?page_size=20".to_string();
if !search.is_empty() {
path.push_str(&format!("&credentials_identifier={search}"));
}
let result = kratos_api(&pf.base_url, &path, "GET", None, &[])?;
let result = kratos_api(&pf.base_url, &path, "GET", None, &[]).await?;
drop(pf);
let identities = match result {
@@ -343,8 +338,9 @@ pub async fn cmd_user_list(search: &str) -> Result<()> {
pub async fn cmd_user_get(target: &str) -> Result<()> {
step(&format!("Getting identity: {target}"));
let pf = PortForward::kratos()?;
let identity = find_identity(&pf.base_url, target, true)?
let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
drop(pf);
@@ -370,14 +366,15 @@ pub async fn cmd_user_create(email: &str, name: &str, schema_id: &str) -> Result
"state": "active",
});
let pf = PortForward::kratos()?;
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])?
let pf = PortForward::kratos().await?;
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])
.await?
.ok_or_else(|| SunbeamError::identity("Failed to create identity"))?;
let iid = identity_id(&identity)?;
ok(&format!("Created identity: {iid}"));
let (link, code) = generate_recovery(&pf.base_url, &iid)?;
let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
drop(pf);
ok("Recovery link (valid 24h):");
@@ -399,8 +396,9 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> {
return Ok(());
}
let pf = PortForward::kratos()?;
let identity = find_identity(&pf.base_url, target, true)?
let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?;
kratos_api(
@@ -409,7 +407,8 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> {
"DELETE",
None,
&[],
)?;
)
.await?;
drop(pf);
ok("Deleted.");
@@ -419,11 +418,12 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> {
pub async fn cmd_user_recover(target: &str) -> Result<()> {
step(&format!("Generating recovery link for: {target}"));
let pf = PortForward::kratos()?;
let identity = find_identity(&pf.base_url, target, true)?
let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?;
let (link, code) = generate_recovery(&pf.base_url, &iid)?;
let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
drop(pf);
ok("Recovery link (valid 24h):");
@@ -436,8 +436,9 @@ pub async fn cmd_user_recover(target: &str) -> Result<()> {
pub async fn cmd_user_disable(target: &str) -> Result<()> {
step(&format!("Disabling identity: {target}"));
let pf = PortForward::kratos()?;
let identity = find_identity(&pf.base_url, target, true)?
let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?;
@@ -448,14 +449,16 @@ pub async fn cmd_user_disable(target: &str) -> Result<()> {
"PUT",
Some(&put_body),
&[],
)?;
)
.await?;
kratos_api(
&pf.base_url,
&format!("/identities/{iid}/sessions"),
"DELETE",
None,
&[],
)?;
)
.await?;
drop(pf);
ok(&format!(
@@ -469,8 +472,9 @@ pub async fn cmd_user_disable(target: &str) -> Result<()> {
pub async fn cmd_user_enable(target: &str) -> Result<()> {
step(&format!("Enabling identity: {target}"));
let pf = PortForward::kratos()?;
let identity = find_identity(&pf.base_url, target, true)?
let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?;
@@ -481,7 +485,8 @@ pub async fn cmd_user_enable(target: &str) -> Result<()> {
"PUT",
Some(&put_body),
&[],
)?;
)
.await?;
drop(pf);
ok(&format!("Identity {}... re-enabled.", short_id(&iid)));
@@ -491,8 +496,9 @@ pub async fn cmd_user_enable(target: &str) -> Result<()> {
pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> {
step(&format!("Setting password for: {target}"));
let pf = PortForward::kratos()?;
let identity = find_identity(&pf.base_url, target, true)?
let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?;
@@ -512,7 +518,8 @@ pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> {
"PUT",
Some(&put_body),
&[],
)?;
)
.await?;
drop(pf);
ok(&format!("Password set for {}...", short_id(&iid)));
@@ -524,7 +531,7 @@ pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> {
// ---------------------------------------------------------------------------
/// Send a welcome email via cluster Postfix (port-forward to svc/postfix in lasuite).
fn send_welcome_email(
async fn send_welcome_email(
domain: &str,
email: &str,
name: &str,
@@ -589,15 +596,19 @@ Messages (Matrix):
.body(body_text)
.ctx("Failed to build email message")?;
let _pf = PortForward::new("lasuite", "postfix", SMTP_LOCAL_PORT, 25)?;
let _pf = PortForward::new("lasuite", "postfix", SMTP_LOCAL_PORT, 25).await?;
let mailer = SmtpTransport::builder_dangerous("localhost")
.port(SMTP_LOCAL_PORT)
.build();
tokio::task::spawn_blocking(move || {
mailer
.send(&message)
.ctx("Failed to send welcome email via SMTP")?;
.map_err(|e| SunbeamError::Other(format!("Failed to send welcome email via SMTP: {e}")))
})
.await
.map_err(|e| SunbeamError::Other(format!("Email send task panicked: {e}")))??;
ok(&format!("Welcome email sent to {email}"));
Ok(())
@@ -618,16 +629,16 @@ pub async fn cmd_user_onboard(
) -> Result<()> {
step(&format!("Onboarding: {email}"));
let pf = PortForward::kratos()?;
let pf = PortForward::kratos().await?;
let (iid, recovery_link, recovery_code) = {
let existing = find_identity(&pf.base_url, email, false)?;
let existing = find_identity(&pf.base_url, email, false).await?;
if let Some(existing) = existing {
let iid = identity_id(&existing)?;
warn(&format!("Identity already exists: {}...", short_id(&iid)));
step("Generating fresh recovery link...");
let (link, code) = generate_recovery(&pf.base_url, &iid)?;
let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
(iid, link, code)
} else {
let mut traits = serde_json::json!({ "email": email });
@@ -640,7 +651,7 @@ pub async fn cmd_user_onboard(
let mut employee_id = String::new();
if schema_id == "employee" {
employee_id = next_employee_id(&pf.base_url)?;
employee_id = next_employee_id(&pf.base_url).await?;
traits["employee_id"] = Value::String(employee_id.clone());
if !job_title.is_empty() {
traits["job_title"] = Value::String(job_title.to_string());
@@ -670,7 +681,8 @@ pub async fn cmd_user_onboard(
}],
});
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])?
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])
.await?
.ok_or_else(|| SunbeamError::identity("Failed to create identity"))?;
let iid = identity_id(&identity)?;
@@ -690,9 +702,10 @@ pub async fn cmd_user_onboard(
"PATCH",
Some(&patch_body),
&[],
)?;
)
.await?;
let (link, code) = generate_recovery(&pf.base_url, &iid)?;
let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
(iid, link, code)
}
};
@@ -702,7 +715,7 @@ pub async fn cmd_user_onboard(
if send_email {
let domain = crate::kube::get_domain().await?;
let recipient = if notify.is_empty() { email } else { notify };
send_welcome_email(&domain, recipient, name, &recovery_link, &recovery_code)?;
send_welcome_email(&domain, recipient, name, &recovery_link, &recovery_code).await?;
}
ok(&format!("Identity ID: {iid}"));
@@ -729,8 +742,9 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
return Ok(());
}
let pf = PortForward::kratos()?;
let identity = find_identity(&pf.base_url, target, true)?
let pf = PortForward::kratos().await?;
let identity = find_identity(&pf.base_url, target, true)
.await?
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
let iid = identity_id(&identity)?;
@@ -742,7 +756,8 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
"PUT",
Some(&put_body),
&[],
)?;
)
.await?;
ok(&format!("Identity {}... disabled.", short_id(&iid)));
step("Revoking Kratos sessions...");
@@ -752,12 +767,13 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
"DELETE",
None,
&[404],
)?;
)
.await?;
ok("Kratos sessions revoked.");
step("Revoking Hydra consent sessions...");
{
let hydra_pf = PortForward::new("ory", "hydra-admin", 14445, 4445)?;
let hydra_pf = PortForward::new("ory", "hydra-admin", 14445, 4445).await?;
api(
&hydra_pf.base_url,
&format!("/oauth2/auth/sessions/consent?subject={iid}&all=true"),
@@ -765,7 +781,8 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
None,
"/admin",
&[404],
)?;
)
.await?;
}
ok("Hydra consent sessions revoked.");