fix: rewrite users.rs to fully async (was blocking tokio runtime)
Replace all blocking I/O with async equivalents: - tokio::process::Command instead of std::process::Command - tokio::time::sleep instead of std::thread::sleep - reqwest::Client (async) instead of reqwest::blocking::Client - All helper functions (api, find_identity, generate_recovery, etc.) now async - PortForward::Drop uses start_kill() (sync SIGKILL) for cleanup - send_welcome_email wrapped in spawn_blocking for lettre sync transport
This commit is contained in:
203
src/users.rs
203
src/users.rs
@@ -12,57 +12,47 @@ const SMTP_LOCAL_PORT: u16 = 10025;
|
||||
// Port-forward helper
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Spawn a kubectl port-forward process and return (child, base_url).
|
||||
/// The caller **must** kill the child when done.
|
||||
fn spawn_port_forward(
|
||||
ns: &str,
|
||||
svc: &str,
|
||||
local_port: u16,
|
||||
remote_port: u16,
|
||||
) -> Result<(std::process::Child, String)> {
|
||||
let ctx = crate::kube::context();
|
||||
let child = std::process::Command::new("kubectl")
|
||||
.arg(format!("--context={ctx}"))
|
||||
.args([
|
||||
"-n",
|
||||
ns,
|
||||
"port-forward",
|
||||
&format!("svc/{svc}"),
|
||||
&format!("{local_port}:{remote_port}"),
|
||||
])
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
.with_ctx(|| format!("Failed to spawn port-forward to {ns}/svc/{svc}"))?;
|
||||
|
||||
// Give the port-forward time to bind
|
||||
std::thread::sleep(std::time::Duration::from_millis(1500));
|
||||
|
||||
Ok((child, format!("http://localhost:{local_port}")))
|
||||
}
|
||||
|
||||
/// RAII guard that terminates the port-forward on drop.
|
||||
struct PortForward {
|
||||
child: std::process::Child,
|
||||
child: tokio::process::Child,
|
||||
pub base_url: String,
|
||||
}
|
||||
|
||||
impl PortForward {
|
||||
fn new(ns: &str, svc: &str, local_port: u16, remote_port: u16) -> Result<Self> {
|
||||
let (child, base_url) = spawn_port_forward(ns, svc, local_port, remote_port)?;
|
||||
Ok(Self { child, base_url })
|
||||
async fn new(ns: &str, svc: &str, local_port: u16, remote_port: u16) -> Result<Self> {
|
||||
let ctx = crate::kube::context();
|
||||
let child = tokio::process::Command::new("kubectl")
|
||||
.arg(format!("--context={ctx}"))
|
||||
.args([
|
||||
"-n",
|
||||
ns,
|
||||
"port-forward",
|
||||
&format!("svc/{svc}"),
|
||||
&format!("{local_port}:{remote_port}"),
|
||||
])
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.spawn()
|
||||
.with_ctx(|| format!("Failed to spawn port-forward to {ns}/svc/{svc}"))?;
|
||||
|
||||
// Give the port-forward time to bind
|
||||
tokio::time::sleep(std::time::Duration::from_millis(1500)).await;
|
||||
|
||||
Ok(Self {
|
||||
child,
|
||||
base_url: format!("http://localhost:{local_port}"),
|
||||
})
|
||||
}
|
||||
|
||||
/// Convenience: Kratos admin (ory/kratos-admin 80 -> 4434).
|
||||
fn kratos() -> Result<Self> {
|
||||
Self::new("ory", "kratos-admin", 4434, 80)
|
||||
async fn kratos() -> Result<Self> {
|
||||
Self::new("ory", "kratos-admin", 4434, 80).await
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PortForward {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.child.kill();
|
||||
let _ = self.child.wait();
|
||||
let _ = self.child.start_kill();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,7 +61,7 @@ impl Drop for PortForward {
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Make an HTTP request to an admin API endpoint.
|
||||
fn api(
|
||||
async fn api(
|
||||
base_url: &str,
|
||||
path: &str,
|
||||
method: &str,
|
||||
@@ -80,7 +70,7 @@ fn api(
|
||||
ok_statuses: &[u16],
|
||||
) -> Result<Option<Value>> {
|
||||
let url = format!("{base_url}{prefix}{path}");
|
||||
let client = reqwest::blocking::Client::new();
|
||||
let client = reqwest::Client::new();
|
||||
|
||||
let mut req = match method {
|
||||
"GET" => client.get(&url),
|
||||
@@ -99,18 +89,21 @@ fn api(
|
||||
req = req.json(b);
|
||||
}
|
||||
|
||||
let resp = req.send().with_ctx(|| format!("HTTP {method} {url} failed"))?;
|
||||
let resp = req
|
||||
.send()
|
||||
.await
|
||||
.with_ctx(|| format!("HTTP {method} {url} failed"))?;
|
||||
let status = resp.status().as_u16();
|
||||
|
||||
if !resp.status().is_success() {
|
||||
if ok_statuses.contains(&status) {
|
||||
return Ok(None);
|
||||
}
|
||||
let err_text = resp.text().unwrap_or_default();
|
||||
let err_text = resp.text().await.unwrap_or_default();
|
||||
bail!("API error {status}: {err_text}");
|
||||
}
|
||||
|
||||
let text = resp.text().unwrap_or_default();
|
||||
let text = resp.text().await.unwrap_or_default();
|
||||
if text.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -120,14 +113,14 @@ fn api(
|
||||
}
|
||||
|
||||
/// Shorthand: Kratos admin API call (prefix = "/admin").
|
||||
fn kratos_api(
|
||||
async fn kratos_api(
|
||||
base_url: &str,
|
||||
path: &str,
|
||||
method: &str,
|
||||
body: Option<&Value>,
|
||||
ok_statuses: &[u16],
|
||||
) -> Result<Option<Value>> {
|
||||
api(base_url, path, method, body, "/admin", ok_statuses)
|
||||
api(base_url, path, method, body, "/admin", ok_statuses).await
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -135,10 +128,10 @@ fn kratos_api(
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Find identity by UUID or email search. Returns the identity JSON.
|
||||
fn find_identity(base_url: &str, target: &str, required: bool) -> Result<Option<Value>> {
|
||||
async fn find_identity(base_url: &str, target: &str, required: bool) -> Result<Option<Value>> {
|
||||
// Looks like a UUID?
|
||||
if target.len() == 36 && target.chars().filter(|&c| c == '-').count() == 4 {
|
||||
let result = kratos_api(base_url, &format!("/identities/{target}"), "GET", None, &[])?;
|
||||
let result = kratos_api(base_url, &format!("/identities/{target}"), "GET", None, &[]).await?;
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
@@ -149,7 +142,8 @@ fn find_identity(base_url: &str, target: &str, required: bool) -> Result<Option<
|
||||
"GET",
|
||||
None,
|
||||
&[],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(Value::Array(arr)) = &result {
|
||||
if let Some(first) = arr.first() {
|
||||
@@ -185,13 +179,13 @@ fn identity_put_body(identity: &Value, state: Option<&str>, extra: Option<Value>
|
||||
}
|
||||
|
||||
/// Generate a 24h recovery code. Returns (link, code).
|
||||
fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, String)> {
|
||||
async fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, String)> {
|
||||
let body = serde_json::json!({
|
||||
"identity_id": identity_id,
|
||||
"expires_in": "24h",
|
||||
});
|
||||
|
||||
let result = kratos_api(base_url, "/recovery/code", "POST", Some(&body), &[])?;
|
||||
let result = kratos_api(base_url, "/recovery/code", "POST", Some(&body), &[]).await?;
|
||||
|
||||
let recovery = result.unwrap_or_default();
|
||||
let link = recovery
|
||||
@@ -209,14 +203,15 @@ fn generate_recovery(base_url: &str, identity_id: &str) -> Result<(String, Strin
|
||||
}
|
||||
|
||||
/// Find the next sequential employee ID by scanning all employee identities.
|
||||
fn next_employee_id(base_url: &str) -> Result<String> {
|
||||
async fn next_employee_id(base_url: &str) -> Result<String> {
|
||||
let result = kratos_api(
|
||||
base_url,
|
||||
"/identities?page_size=200",
|
||||
"GET",
|
||||
None,
|
||||
&[],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
|
||||
let identities = match result {
|
||||
Some(Value::Array(arr)) => arr,
|
||||
@@ -300,12 +295,12 @@ fn identity_id(identity: &Value) -> Result<String> {
|
||||
pub async fn cmd_user_list(search: &str) -> Result<()> {
|
||||
step("Listing identities...");
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let pf = PortForward::kratos().await?;
|
||||
let mut path = "/identities?page_size=20".to_string();
|
||||
if !search.is_empty() {
|
||||
path.push_str(&format!("&credentials_identifier={search}"));
|
||||
}
|
||||
let result = kratos_api(&pf.base_url, &path, "GET", None, &[])?;
|
||||
let result = kratos_api(&pf.base_url, &path, "GET", None, &[]).await?;
|
||||
drop(pf);
|
||||
|
||||
let identities = match result {
|
||||
@@ -343,8 +338,9 @@ pub async fn cmd_user_list(search: &str) -> Result<()> {
|
||||
pub async fn cmd_user_get(target: &str) -> Result<()> {
|
||||
step(&format!("Getting identity: {target}"));
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let identity = find_identity(&pf.base_url, target, true)?
|
||||
let pf = PortForward::kratos().await?;
|
||||
let identity = find_identity(&pf.base_url, target, true)
|
||||
.await?
|
||||
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
|
||||
drop(pf);
|
||||
|
||||
@@ -370,14 +366,15 @@ pub async fn cmd_user_create(email: &str, name: &str, schema_id: &str) -> Result
|
||||
"state": "active",
|
||||
});
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])?
|
||||
let pf = PortForward::kratos().await?;
|
||||
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])
|
||||
.await?
|
||||
.ok_or_else(|| SunbeamError::identity("Failed to create identity"))?;
|
||||
|
||||
let iid = identity_id(&identity)?;
|
||||
ok(&format!("Created identity: {iid}"));
|
||||
|
||||
let (link, code) = generate_recovery(&pf.base_url, &iid)?;
|
||||
let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
|
||||
drop(pf);
|
||||
|
||||
ok("Recovery link (valid 24h):");
|
||||
@@ -399,8 +396,9 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let identity = find_identity(&pf.base_url, target, true)?
|
||||
let pf = PortForward::kratos().await?;
|
||||
let identity = find_identity(&pf.base_url, target, true)
|
||||
.await?
|
||||
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
|
||||
let iid = identity_id(&identity)?;
|
||||
kratos_api(
|
||||
@@ -409,7 +407,8 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> {
|
||||
"DELETE",
|
||||
None,
|
||||
&[],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
drop(pf);
|
||||
|
||||
ok("Deleted.");
|
||||
@@ -419,11 +418,12 @@ pub async fn cmd_user_delete(target: &str) -> Result<()> {
|
||||
pub async fn cmd_user_recover(target: &str) -> Result<()> {
|
||||
step(&format!("Generating recovery link for: {target}"));
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let identity = find_identity(&pf.base_url, target, true)?
|
||||
let pf = PortForward::kratos().await?;
|
||||
let identity = find_identity(&pf.base_url, target, true)
|
||||
.await?
|
||||
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
|
||||
let iid = identity_id(&identity)?;
|
||||
let (link, code) = generate_recovery(&pf.base_url, &iid)?;
|
||||
let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
|
||||
drop(pf);
|
||||
|
||||
ok("Recovery link (valid 24h):");
|
||||
@@ -436,8 +436,9 @@ pub async fn cmd_user_recover(target: &str) -> Result<()> {
|
||||
pub async fn cmd_user_disable(target: &str) -> Result<()> {
|
||||
step(&format!("Disabling identity: {target}"));
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let identity = find_identity(&pf.base_url, target, true)?
|
||||
let pf = PortForward::kratos().await?;
|
||||
let identity = find_identity(&pf.base_url, target, true)
|
||||
.await?
|
||||
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
|
||||
let iid = identity_id(&identity)?;
|
||||
|
||||
@@ -448,14 +449,16 @@ pub async fn cmd_user_disable(target: &str) -> Result<()> {
|
||||
"PUT",
|
||||
Some(&put_body),
|
||||
&[],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
kratos_api(
|
||||
&pf.base_url,
|
||||
&format!("/identities/{iid}/sessions"),
|
||||
"DELETE",
|
||||
None,
|
||||
&[],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
drop(pf);
|
||||
|
||||
ok(&format!(
|
||||
@@ -469,8 +472,9 @@ pub async fn cmd_user_disable(target: &str) -> Result<()> {
|
||||
pub async fn cmd_user_enable(target: &str) -> Result<()> {
|
||||
step(&format!("Enabling identity: {target}"));
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let identity = find_identity(&pf.base_url, target, true)?
|
||||
let pf = PortForward::kratos().await?;
|
||||
let identity = find_identity(&pf.base_url, target, true)
|
||||
.await?
|
||||
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
|
||||
let iid = identity_id(&identity)?;
|
||||
|
||||
@@ -481,7 +485,8 @@ pub async fn cmd_user_enable(target: &str) -> Result<()> {
|
||||
"PUT",
|
||||
Some(&put_body),
|
||||
&[],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
drop(pf);
|
||||
|
||||
ok(&format!("Identity {}... re-enabled.", short_id(&iid)));
|
||||
@@ -491,8 +496,9 @@ pub async fn cmd_user_enable(target: &str) -> Result<()> {
|
||||
pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> {
|
||||
step(&format!("Setting password for: {target}"));
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let identity = find_identity(&pf.base_url, target, true)?
|
||||
let pf = PortForward::kratos().await?;
|
||||
let identity = find_identity(&pf.base_url, target, true)
|
||||
.await?
|
||||
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
|
||||
let iid = identity_id(&identity)?;
|
||||
|
||||
@@ -512,7 +518,8 @@ pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> {
|
||||
"PUT",
|
||||
Some(&put_body),
|
||||
&[],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
drop(pf);
|
||||
|
||||
ok(&format!("Password set for {}...", short_id(&iid)));
|
||||
@@ -524,7 +531,7 @@ pub async fn cmd_user_set_password(target: &str, password: &str) -> Result<()> {
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Send a welcome email via cluster Postfix (port-forward to svc/postfix in lasuite).
|
||||
fn send_welcome_email(
|
||||
async fn send_welcome_email(
|
||||
domain: &str,
|
||||
email: &str,
|
||||
name: &str,
|
||||
@@ -589,15 +596,19 @@ Messages (Matrix):
|
||||
.body(body_text)
|
||||
.ctx("Failed to build email message")?;
|
||||
|
||||
let _pf = PortForward::new("lasuite", "postfix", SMTP_LOCAL_PORT, 25)?;
|
||||
let _pf = PortForward::new("lasuite", "postfix", SMTP_LOCAL_PORT, 25).await?;
|
||||
|
||||
let mailer = SmtpTransport::builder_dangerous("localhost")
|
||||
.port(SMTP_LOCAL_PORT)
|
||||
.build();
|
||||
|
||||
mailer
|
||||
.send(&message)
|
||||
.ctx("Failed to send welcome email via SMTP")?;
|
||||
tokio::task::spawn_blocking(move || {
|
||||
mailer
|
||||
.send(&message)
|
||||
.map_err(|e| SunbeamError::Other(format!("Failed to send welcome email via SMTP: {e}")))
|
||||
})
|
||||
.await
|
||||
.map_err(|e| SunbeamError::Other(format!("Email send task panicked: {e}")))??;
|
||||
|
||||
ok(&format!("Welcome email sent to {email}"));
|
||||
Ok(())
|
||||
@@ -618,16 +629,16 @@ pub async fn cmd_user_onboard(
|
||||
) -> Result<()> {
|
||||
step(&format!("Onboarding: {email}"));
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let pf = PortForward::kratos().await?;
|
||||
|
||||
let (iid, recovery_link, recovery_code) = {
|
||||
let existing = find_identity(&pf.base_url, email, false)?;
|
||||
let existing = find_identity(&pf.base_url, email, false).await?;
|
||||
|
||||
if let Some(existing) = existing {
|
||||
let iid = identity_id(&existing)?;
|
||||
warn(&format!("Identity already exists: {}...", short_id(&iid)));
|
||||
step("Generating fresh recovery link...");
|
||||
let (link, code) = generate_recovery(&pf.base_url, &iid)?;
|
||||
let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
|
||||
(iid, link, code)
|
||||
} else {
|
||||
let mut traits = serde_json::json!({ "email": email });
|
||||
@@ -640,7 +651,7 @@ pub async fn cmd_user_onboard(
|
||||
|
||||
let mut employee_id = String::new();
|
||||
if schema_id == "employee" {
|
||||
employee_id = next_employee_id(&pf.base_url)?;
|
||||
employee_id = next_employee_id(&pf.base_url).await?;
|
||||
traits["employee_id"] = Value::String(employee_id.clone());
|
||||
if !job_title.is_empty() {
|
||||
traits["job_title"] = Value::String(job_title.to_string());
|
||||
@@ -670,7 +681,8 @@ pub async fn cmd_user_onboard(
|
||||
}],
|
||||
});
|
||||
|
||||
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])?
|
||||
let identity = kratos_api(&pf.base_url, "/identities", "POST", Some(&body), &[])
|
||||
.await?
|
||||
.ok_or_else(|| SunbeamError::identity("Failed to create identity"))?;
|
||||
|
||||
let iid = identity_id(&identity)?;
|
||||
@@ -690,9 +702,10 @@ pub async fn cmd_user_onboard(
|
||||
"PATCH",
|
||||
Some(&patch_body),
|
||||
&[],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (link, code) = generate_recovery(&pf.base_url, &iid)?;
|
||||
let (link, code) = generate_recovery(&pf.base_url, &iid).await?;
|
||||
(iid, link, code)
|
||||
}
|
||||
};
|
||||
@@ -702,7 +715,7 @@ pub async fn cmd_user_onboard(
|
||||
if send_email {
|
||||
let domain = crate::kube::get_domain().await?;
|
||||
let recipient = if notify.is_empty() { email } else { notify };
|
||||
send_welcome_email(&domain, recipient, name, &recovery_link, &recovery_code)?;
|
||||
send_welcome_email(&domain, recipient, name, &recovery_link, &recovery_code).await?;
|
||||
}
|
||||
|
||||
ok(&format!("Identity ID: {iid}"));
|
||||
@@ -729,8 +742,9 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let pf = PortForward::kratos()?;
|
||||
let identity = find_identity(&pf.base_url, target, true)?
|
||||
let pf = PortForward::kratos().await?;
|
||||
let identity = find_identity(&pf.base_url, target, true)
|
||||
.await?
|
||||
.ok_or_else(|| SunbeamError::identity("Identity not found"))?;
|
||||
let iid = identity_id(&identity)?;
|
||||
|
||||
@@ -742,7 +756,8 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
|
||||
"PUT",
|
||||
Some(&put_body),
|
||||
&[],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
ok(&format!("Identity {}... disabled.", short_id(&iid)));
|
||||
|
||||
step("Revoking Kratos sessions...");
|
||||
@@ -752,12 +767,13 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
|
||||
"DELETE",
|
||||
None,
|
||||
&[404],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
ok("Kratos sessions revoked.");
|
||||
|
||||
step("Revoking Hydra consent sessions...");
|
||||
{
|
||||
let hydra_pf = PortForward::new("ory", "hydra-admin", 14445, 4445)?;
|
||||
let hydra_pf = PortForward::new("ory", "hydra-admin", 14445, 4445).await?;
|
||||
api(
|
||||
&hydra_pf.base_url,
|
||||
&format!("/oauth2/auth/sessions/consent?subject={iid}&all=true"),
|
||||
@@ -765,7 +781,8 @@ pub async fn cmd_user_offboard(target: &str) -> Result<()> {
|
||||
None,
|
||||
"/admin",
|
||||
&[404],
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
ok("Hydra consent sessions revoked.");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user