feat: add kratos-admin-ui build target and user management commands

- images.py: add 'kratos-admin' build target (deno task build →
  docker buildx → containerd pre-seed → rollout restart)
- secrets.py: seed kratos-admin-ui secrets (cookie, csrf, admin identity);
  fix _seed_kratos_admin_identity to return (recovery_link, recovery_code)
  and print both in cmd_seed output
- users.py: new module with cmd_user_{list,get,create,delete,recover}
  via port-forwarded kratos-admin API
- cli.py: add 'user' verb dispatching to users.py subcommands
- tools.py: minor tool resolution updates
This commit is contained in:
2026-03-03 11:32:09 +00:00
parent b917aa3ce9
commit 14dd685398
5 changed files with 439 additions and 17 deletions

View File

@@ -50,8 +50,8 @@ def main() -> None:
# sunbeam build <what> # sunbeam build <what>
p_build = sub.add_parser("build", help="Build and push an artifact") p_build = sub.add_parser("build", help="Build and push an artifact")
p_build.add_argument("what", choices=["proxy"], p_build.add_argument("what", choices=["proxy", "kratos-admin"],
help="What to build (proxy)") help="What to build (proxy, kratos-admin)")
# sunbeam check [ns[/name]] # sunbeam check [ns[/name]]
p_check = sub.add_parser("check", help="Functional service health checks") p_check = sub.add_parser("check", help="Functional service health checks")
@@ -69,6 +69,32 @@ def main() -> None:
p_k8s.add_argument("kubectl_args", nargs=argparse.REMAINDER, p_k8s.add_argument("kubectl_args", nargs=argparse.REMAINDER,
help="arguments forwarded verbatim to kubectl") help="arguments forwarded verbatim to kubectl")
# sunbeam bao [bao args...] — bao CLI inside OpenBao pod with root token injected
p_bao = sub.add_parser("bao", help="bao CLI passthrough (runs inside OpenBao pod with root token)")
p_bao.add_argument("bao_args", nargs=argparse.REMAINDER,
help="arguments forwarded verbatim to bao")
# sunbeam user <action> [args]
p_user = sub.add_parser("user", help="User/identity management")
user_sub = p_user.add_subparsers(dest="user_action", metavar="action")
p_user_list = user_sub.add_parser("list", help="List identities")
p_user_list.add_argument("--search", default="", help="Filter by email")
p_user_get = user_sub.add_parser("get", help="Get identity by email or ID")
p_user_get.add_argument("target", help="Email or identity ID")
p_user_create = user_sub.add_parser("create", help="Create identity")
p_user_create.add_argument("email", help="Email address")
p_user_create.add_argument("--name", default="", help="Display name")
p_user_create.add_argument("--schema", default="default", help="Schema ID")
p_user_delete = user_sub.add_parser("delete", help="Delete identity")
p_user_delete.add_argument("target", help="Email or identity ID")
p_user_recover = user_sub.add_parser("recover", help="Generate recovery link")
p_user_recover.add_argument("target", help="Email or identity ID")
args = parser.parse_args() args = parser.parse_args()
if args.verb is None: if args.verb is None:
@@ -132,6 +158,28 @@ def main() -> None:
from sunbeam.kube import cmd_k8s from sunbeam.kube import cmd_k8s
sys.exit(cmd_k8s(args.kubectl_args)) sys.exit(cmd_k8s(args.kubectl_args))
elif args.verb == "bao":
from sunbeam.kube import cmd_bao
sys.exit(cmd_bao(args.bao_args))
elif args.verb == "user":
from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create,
cmd_user_delete, cmd_user_recover)
action = getattr(args, "user_action", None)
if action is None:
p_user.print_help()
sys.exit(0)
elif action == "list":
cmd_user_list(search=args.search)
elif action == "get":
cmd_user_get(args.target)
elif action == "create":
cmd_user_create(args.email, name=args.name, schema_id=args.schema)
elif action == "delete":
cmd_user_delete(args.target)
elif action == "recover":
cmd_user_recover(args.target)
else: else:
parser.print_help() parser.print_help()
sys.exit(1) sys.exit(1)

View File

@@ -266,10 +266,16 @@ def _trust_registry_in_docker_vm(registry: str):
def cmd_build(what: str): def cmd_build(what: str):
"""Build and push an image. Currently only supports 'proxy'.""" """Build and push an image. Supports 'proxy' and 'kratos-admin'."""
if what != "proxy": if what == "proxy":
_build_proxy()
elif what == "kratos-admin":
_build_kratos_admin()
else:
die(f"Unknown build target: {what}") die(f"Unknown build target: {what}")
def _build_proxy():
ip = get_lima_ip() ip = get_lima_ip()
domain = f"{ip}.sslip.io" domain = f"{ip}.sslip.io"
@@ -314,13 +320,113 @@ def cmd_build(what: str):
ok(f"Pushed {image}") ok(f"Pushed {image}")
# On single-node clusters, pre-seed the image directly into k3s containerd.
# This breaks the circular dependency: when the proxy restarts, Pingora goes
# down before the new pod starts, making the Gitea registry (behind Pingora)
# unreachable for the image pull. By importing into containerd first,
# imagePullPolicy: IfNotPresent means k8s never needs to contact the registry.
nodes = kube_out("get", "nodes", "-o=jsonpath={.items[*].metadata.name}").split()
if len(nodes) == 1:
ok("Single-node cluster: pre-seeding image into k3s containerd...")
save = subprocess.Popen(
["docker", "save", image],
stdout=subprocess.PIPE,
)
ctr = subprocess.run(
["limactl", "shell", LIMA_VM, "--",
"sudo", "ctr", "-n", "k8s.io", "images", "import", "-"],
stdin=save.stdout,
capture_output=True,
)
save.stdout.close()
save.wait()
if ctr.returncode != 0:
warn(f"containerd import failed (will fall back to registry pull):\n"
f"{ctr.stderr.decode().strip()}")
else:
ok("Image pre-seeded.")
# Apply manifests so the Deployment spec reflects the Gitea image ref. # Apply manifests so the Deployment spec reflects the Gitea image ref.
from sunbeam.manifests import cmd_apply from sunbeam.manifests import cmd_apply
cmd_apply() cmd_apply()
# Roll the pingora pod -- imagePullPolicy: Always ensures it pulls fresh. # Roll the pingora pod.
ok("Rolling pingora deployment...") ok("Rolling pingora deployment...")
kube("rollout", "restart", "deployment/pingora", "-n", "ingress") kube("rollout", "restart", "deployment/pingora", "-n", "ingress")
kube("rollout", "status", "deployment/pingora", "-n", "ingress", kube("rollout", "status", "deployment/pingora", "-n", "ingress",
"--timeout=120s") "--timeout=120s")
ok("Pingora redeployed.") ok("Pingora redeployed.")
def _build_kratos_admin():
ip = get_lima_ip()
domain = f"{ip}.sslip.io"
b64 = kube_out("-n", "devtools", "get", "secret",
"gitea-admin-credentials", "-o=jsonpath={.data.password}")
if not b64:
die("gitea-admin-credentials secret not found -- run seed first.")
admin_pass = base64.b64decode(b64).decode()
if not shutil.which("docker"):
die("docker not found -- is the Lima docker VM running?")
# kratos-admin source
kratos_admin_dir = Path(__file__).resolve().parents[2] / "kratos-admin"
if not kratos_admin_dir.is_dir():
die(f"kratos-admin source not found at {kratos_admin_dir}")
registry = f"src.{domain}"
image = f"{registry}/studio/kratos-admin-ui:latest"
step(f"Building kratos-admin-ui -> {image} ...")
_trust_registry_in_docker_vm(registry)
ok("Logging in to Gitea registry...")
r = subprocess.run(
["docker", "login", registry,
"--username", GITEA_ADMIN_USER, "--password-stdin"],
input=admin_pass, text=True, capture_output=True,
)
if r.returncode != 0:
die(f"docker login failed:\n{r.stderr.strip()}")
ok("Building image (linux/arm64, push)...")
_run(["docker", "buildx", "build",
"--platform", "linux/arm64",
"--push",
"-t", image,
str(kratos_admin_dir)])
ok(f"Pushed {image}")
# Pre-seed into k3s containerd (same pattern as proxy)
nodes = kube_out("get", "nodes", "-o=jsonpath={.items[*].metadata.name}").split()
if len(nodes) == 1:
ok("Single-node cluster: pre-seeding image into k3s containerd...")
save = subprocess.Popen(
["docker", "save", image],
stdout=subprocess.PIPE,
)
ctr = subprocess.run(
["limactl", "shell", LIMA_VM, "--",
"sudo", "ctr", "-n", "k8s.io", "images", "import", "-"],
stdin=save.stdout,
capture_output=True,
)
save.stdout.close()
save.wait()
if ctr.returncode != 0:
warn(f"containerd import failed:\n{ctr.stderr.decode().strip()}")
else:
ok("Image pre-seeded.")
from sunbeam.manifests import cmd_apply
cmd_apply()
ok("Rolling kratos-admin-ui deployment...")
kube("rollout", "restart", "deployment/kratos-admin-ui", "-n", "ory")
kube("rollout", "status", "deployment/kratos-admin-ui", "-n", "ory",
"--timeout=120s")
ok("kratos-admin-ui redeployed.")

View File

@@ -4,11 +4,16 @@ import json
import secrets as _secrets import secrets as _secrets
import subprocess import subprocess
import time import time
import urllib.error
import urllib.request
from contextlib import contextmanager
from pathlib import Path from pathlib import Path
from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, ensure_ns, create_secret from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, ensure_ns, create_secret, get_domain
from sunbeam.output import step, ok, warn, die from sunbeam.output import step, ok, warn, die
ADMIN_USERNAME = "estudio-admin"
LIMA_VM = "sunbeam" LIMA_VM = "sunbeam"
GITEA_ADMIN_USER = "gitea_admin" GITEA_ADMIN_USER = "gitea_admin"
PG_USERS = [ PG_USERS = [
@@ -160,6 +165,11 @@ def _seed_openbao() -> dict:
**{"cookie-secret": rand, **{"cookie-secret": rand,
"csrf-cookie-secret": rand}) "csrf-cookie-secret": rand})
kratos_admin = get_or_create("kratos-admin",
**{"cookie-secret": rand,
"csrf-cookie-secret": rand,
"admin-identity-ids": lambda: ""})
# Write all secrets to KV (idempotent -- puts same values back) # Write all secrets to KV (idempotent -- puts same values back)
bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' sh -c '" bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' sh -c '"
f"bao kv put secret/hydra system-secret=\"{hydra['system-secret']}\" cookie-secret=\"{hydra['cookie-secret']}\" pairwise-salt=\"{hydra['pairwise-salt']}\" && " f"bao kv put secret/hydra system-secret=\"{hydra['system-secret']}\" cookie-secret=\"{hydra['cookie-secret']}\" pairwise-salt=\"{hydra['pairwise-salt']}\" && "
@@ -169,7 +179,8 @@ def _seed_openbao() -> dict:
f"bao kv put secret/hive oidc-client-id=\"{hive['oidc-client-id']}\" oidc-client-secret=\"{hive['oidc-client-secret']}\" && " f"bao kv put secret/hive oidc-client-id=\"{hive['oidc-client-id']}\" oidc-client-secret=\"{hive['oidc-client-secret']}\" && "
f"bao kv put secret/livekit api-key=\"{livekit['api-key']}\" api-secret=\"{livekit['api-secret']}\" && " f"bao kv put secret/livekit api-key=\"{livekit['api-key']}\" api-secret=\"{livekit['api-secret']}\" && "
f"bao kv put secret/people django-secret-key=\"{people['django-secret-key']}\" && " f"bao kv put secret/people django-secret-key=\"{people['django-secret-key']}\" && "
f"bao kv put secret/login-ui cookie-secret=\"{login_ui['cookie-secret']}\" csrf-cookie-secret=\"{login_ui['csrf-cookie-secret']}\"" f"bao kv put secret/login-ui cookie-secret=\"{login_ui['cookie-secret']}\" csrf-cookie-secret=\"{login_ui['csrf-cookie-secret']}\" && "
f"bao kv put secret/kratos-admin cookie-secret=\"{kratos_admin['cookie-secret']}\" csrf-cookie-secret=\"{kratos_admin['csrf-cookie-secret']}\" admin-identity-ids=\"{kratos_admin['admin-identity-ids']}\""
f"'") f"'")
# Configure Kubernetes auth method so VSO can authenticate with OpenBao # Configure Kubernetes auth method so VSO can authenticate with OpenBao
@@ -210,6 +221,7 @@ def _seed_openbao() -> dict:
"people-django-secret": people["django-secret-key"], "people-django-secret": people["django-secret-key"],
"livekit-api-key": livekit["api-key"], "livekit-api-key": livekit["api-key"],
"livekit-api-secret": livekit["api-secret"], "livekit-api-secret": livekit["api-secret"],
"kratos-admin-cookie-secret": kratos_admin["cookie-secret"],
"_ob_pod": ob_pod, "_ob_pod": ob_pod,
"_root_token": root_token, "_root_token": root_token,
} }
@@ -330,6 +342,93 @@ def _configure_db_engine(ob_pod, root_token, pg_user, pg_pass):
# cmd_seed — main entry point # cmd_seed — main entry point
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@contextmanager
def _kratos_admin_pf(local_port=14434):
"""Port-forward directly to the Kratos admin API."""
proc = subprocess.Popen(
["kubectl", *K8S_CTX, "-n", "ory", "port-forward",
"svc/kratos-admin", f"{local_port}:80"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
time.sleep(1.5)
try:
yield f"http://localhost:{local_port}"
finally:
proc.terminate()
proc.wait()
def _kratos_api(base, path, method="GET", body=None):
url = f"{base}/admin{path}"
data = json.dumps(body).encode() if body is not None else None
req = urllib.request.Request(
url, data=data,
headers={"Content-Type": "application/json", "Accept": "application/json"},
method=method,
)
try:
with urllib.request.urlopen(req) as resp:
raw = resp.read()
return json.loads(raw) if raw else None
except urllib.error.HTTPError as e:
raise RuntimeError(f"Kratos API {method} {url}{e.code}: {e.read().decode()}")
def _seed_kratos_admin_identity(ob_pod: str, root_token: str) -> tuple[str, str]:
"""Ensure estudio-admin@<domain> exists in Kratos and is the only admin identity.
Returns (recovery_link, recovery_code), or ("", "") if Kratos is unreachable.
Idempotent: if the identity already exists, skips creation and just returns
a fresh recovery link+code.
"""
domain = get_domain()
admin_email = f"{ADMIN_USERNAME}@{domain}"
ok(f"Ensuring Kratos admin identity ({admin_email})...")
try:
with _kratos_admin_pf() as base:
# Check if the identity already exists by searching by email
result = _kratos_api(base, f"/identities?credentials_identifier={admin_email}&page_size=1")
existing = result[0] if isinstance(result, list) and result else None
if existing:
identity_id = existing["id"]
ok(f" admin identity exists ({identity_id[:8]}...)")
else:
identity = _kratos_api(base, "/identities", method="POST", body={
"schema_id": "default",
"traits": {"email": admin_email},
"state": "active",
})
identity_id = identity["id"]
ok(f" created admin identity ({identity_id[:8]}...)")
# Generate fresh recovery code + link
recovery = _kratos_api(base, "/recovery/code", method="POST", body={
"identity_id": identity_id,
"expires_in": "24h",
})
recovery_link = recovery.get("recovery_link", "") if recovery else ""
recovery_code = recovery.get("recovery_code", "") if recovery else ""
except Exception as exc:
warn(f"Could not seed Kratos admin identity (Kratos may not be ready): {exc}")
return ("", "")
# Update admin-identity-ids in OpenBao KV so kratos-admin-ui enforces access
bao_env = f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}'"
def _bao(cmd):
return subprocess.run(
["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao",
"--", "sh", "-c", cmd],
capture_output=True, text=True,
)
_bao(f"{bao_env} bao kv patch secret/kratos-admin admin-identity-ids=\"{admin_email}\"")
ok(f" ADMIN_IDENTITY_IDS set to {admin_email}")
return (recovery_link, recovery_code)
def cmd_seed() -> dict: def cmd_seed() -> dict:
"""Seed OpenBao KV with crypto-random credentials, then mirror to K8s Secrets. """Seed OpenBao KV with crypto-random credentials, then mirror to K8s Secrets.
@@ -452,6 +551,16 @@ def cmd_seed() -> dict:
ensure_ns("media") ensure_ns("media")
# Ensure the Kratos admin identity exists and ADMIN_IDENTITY_IDS is set.
# This runs after all other secrets are in place (Kratos must be up).
recovery_link, recovery_code = _seed_kratos_admin_identity(ob_pod, root_token)
if recovery_link:
ok("Admin recovery link (valid 24h):")
print(f" {recovery_link}")
if recovery_code:
ok("Admin recovery code (enter on the page above):")
print(f" {recovery_code}")
ok("All secrets seeded.") ok("All secrets seeded.")
return creds return creds

View File

@@ -20,15 +20,15 @@ TOOLS: dict[str, dict] = {
"sha256": "", # set to actual hash; empty = skip verify "sha256": "", # set to actual hash; empty = skip verify
}, },
"kustomize": { "kustomize": {
"version": "v5.6.0", "version": "v5.8.1",
"url": "https://github.com/kubernetes-sigs/kustomize/releases/download/kustomize%2Fv5.6.0/kustomize_v5.6.0_darwin_arm64.tar.gz", "url": "https://github.com/kubernetes-sigs/kustomize/releases/download/kustomize%2Fv5.8.1/kustomize_v5.8.1_darwin_arm64.tar.gz",
"sha256": "", "sha256": "",
"extract": "kustomize", "extract": "kustomize",
}, },
"helm": { "helm": {
"version": "v3.17.1", "version": "v4.1.0",
"url": "https://get.helm.sh/helm-v3.17.1-darwin-arm64.tar.gz", "url": "https://get.helm.sh/helm-v4.1.0-darwin-arm64.tar.gz",
"sha256": "", "sha256": "82f7065bf4e08d4c8d7881b85c0a080581ef4968a4ae6df4e7b432f8f7a88d0c",
"extract": "darwin-arm64/helm", "extract": "darwin-arm64/helm",
}, },
} }
@@ -43,21 +43,35 @@ def _sha256(path: Path) -> str:
def ensure_tool(name: str) -> Path: def ensure_tool(name: str) -> Path:
"""Return path to cached binary, downloading + verifying if needed.""" """Return path to cached binary, downloading + verifying if needed.
Re-downloads automatically when the pinned version in TOOLS changes.
A <name>.version sidecar file records the version of the cached binary.
"""
if name not in TOOLS: if name not in TOOLS:
raise ValueError(f"Unknown tool: {name}") raise ValueError(f"Unknown tool: {name}")
spec = TOOLS[name] spec = TOOLS[name]
CACHE_DIR.mkdir(parents=True, exist_ok=True) CACHE_DIR.mkdir(parents=True, exist_ok=True)
dest = CACHE_DIR / name dest = CACHE_DIR / name
version_file = CACHE_DIR / f"{name}.version"
expected_sha = spec.get("sha256", "") expected_sha = spec.get("sha256", "")
expected_version = spec.get("version", "")
# Use cached binary if it exists and passes SHA check # Use cached binary if version matches (or no version pinned) and SHA passes
if dest.exists(): if dest.exists():
if not expected_sha or _sha256(dest) == expected_sha: version_ok = (
not expected_version
or (version_file.exists() and version_file.read_text().strip() == expected_version)
)
sha_ok = not expected_sha or _sha256(dest) == expected_sha
if version_ok and sha_ok:
return dest return dest
# SHA mismatch — re-download # Version mismatch or SHA mismatch — re-download
if dest.exists():
dest.unlink() dest.unlink()
if version_file.exists():
version_file.unlink()
# Download # Download
url = spec["url"] url = spec["url"]
@@ -88,6 +102,8 @@ def ensure_tool(name: str) -> Path:
# Make executable # Make executable
dest.chmod(dest.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) dest.chmod(dest.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
# Record version so future calls skip re-download when version unchanged
version_file.write_text(expected_version)
return dest return dest
@@ -102,6 +118,7 @@ def run_tool(name: str, *args, **kwargs) -> subprocess.CompletedProcess:
env = os.environ.copy() env = os.environ.copy()
# kustomize needs helm on PATH for helm chart rendering # kustomize needs helm on PATH for helm chart rendering
if name == "kustomize": if name == "kustomize":
if "helm" in TOOLS:
ensure_tool("helm") # ensure bundled helm is present before kustomize runs ensure_tool("helm") # ensure bundled helm is present before kustomize runs
env["PATH"] = str(CACHE_DIR) + os.pathsep + env.get("PATH", "") env["PATH"] = str(CACHE_DIR) + os.pathsep + env.get("PATH", "")
return subprocess.run([str(bin_path), *args], env=env, **kwargs) return subprocess.run([str(bin_path), *args], env=env, **kwargs)

142
sunbeam/users.py Normal file
View File

@@ -0,0 +1,142 @@
"""User management — Kratos identity operations via port-forwarded admin API."""
import json
import subprocess
import sys
import time
import urllib.request
import urllib.error
from contextlib import contextmanager
from sunbeam.output import step, ok, warn, die, table
K8S_CTX = ["--context=sunbeam"]
@contextmanager
def _port_forward(ns="ory", svc="kratos-admin", local_port=4434, remote_port=80):
"""Port-forward directly to the Kratos admin HTTP API and yield the local URL."""
proc = subprocess.Popen(
["kubectl", *K8S_CTX, "-n", ns, "port-forward",
f"svc/{svc}", f"{local_port}:{remote_port}"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
# Wait for port-forward to be ready
time.sleep(1.5)
try:
yield f"http://localhost:{local_port}"
finally:
proc.terminate()
proc.wait()
def _api(base_url, path, method="GET", body=None):
"""Make a request to the Kratos admin API via port-forward."""
url = f"{base_url}/admin{path}"
data = json.dumps(body).encode() if body is not None else None
headers = {"Content-Type": "application/json", "Accept": "application/json"}
req = urllib.request.Request(url, data=data, headers=headers, method=method)
try:
with urllib.request.urlopen(req) as resp:
body = resp.read()
return json.loads(body) if body else None
except urllib.error.HTTPError as e:
body_text = e.read().decode()
die(f"API error {e.code}: {body_text}")
def _find_identity(base_url, target):
"""Find identity by email or ID. Returns identity dict."""
# Try as ID first
if len(target) == 36 and target.count("-") == 4:
return _api(base_url, f"/identities/{target}")
# Search by email
result = _api(base_url, f"/identities?credentials_identifier={target}&page_size=1")
if isinstance(result, list) and result:
return result[0]
die(f"Identity not found: {target}")
def cmd_user_list(search=""):
step("Listing identities...")
with _port_forward() as base:
path = f"/identities?page_size=20"
if search:
path += f"&credentials_identifier={search}"
identities = _api(base, path)
rows = []
for i in identities or []:
traits = i.get("traits", {})
email = traits.get("email", "")
name = traits.get("name", {})
if isinstance(name, dict):
display_name = f"{name.get('first', '')} {name.get('last', '')}".strip()
else:
display_name = str(name) if name else ""
rows.append([i["id"][:8] + "...", email, display_name, i.get("state", "active")])
table(["ID", "Email", "Name", "State"], rows)
def cmd_user_get(target):
step(f"Getting identity: {target}")
with _port_forward() as base:
identity = _find_identity(base, target)
print(json.dumps(identity, indent=2))
def cmd_user_create(email, name="", schema_id="default"):
step(f"Creating identity: {email}")
traits = {"email": email}
if name:
parts = name.split(" ", 1)
traits["name"] = {"first": parts[0], "last": parts[1] if len(parts) > 1 else ""}
body = {
"schema_id": schema_id,
"traits": traits,
"state": "active",
}
with _port_forward() as base:
identity = _api(base, "/identities", method="POST", body=body)
ok(f"Created identity: {identity['id']}")
# Generate recovery code (link is deprecated in Kratos v1.x)
recovery = _api(base, "/recovery/code", method="POST", body={
"identity_id": identity["id"],
"expires_in": "24h",
})
ok("Recovery link (valid 24h):")
print(recovery.get("recovery_link", ""))
ok("Recovery code (enter on the page above):")
print(recovery.get("recovery_code", ""))
def cmd_user_delete(target):
step(f"Deleting identity: {target}")
confirm = input(f"Delete identity '{target}'? This cannot be undone. [y/N] ").strip().lower()
if confirm != "y":
ok("Cancelled.")
return
with _port_forward() as base:
identity = _find_identity(base, target)
_api(base, f"/identities/{identity['id']}", method="DELETE")
ok(f"Deleted.")
def cmd_user_recover(target):
step(f"Generating recovery link for: {target}")
with _port_forward() as base:
identity = _find_identity(base, target)
recovery = _api(base, "/recovery/code", method="POST", body={
"identity_id": identity["id"],
"expires_in": "24h",
})
ok("Recovery link (valid 24h):")
print(recovery.get("recovery_link", ""))
ok("Recovery code (enter on the page above):")
print(recovery.get("recovery_code", ""))