feat: add kratos-admin-ui build target and user management commands
- images.py: add 'kratos-admin' build target (deno task build →
docker buildx → containerd pre-seed → rollout restart)
- secrets.py: seed kratos-admin-ui secrets (cookie, csrf, admin identity);
fix _seed_kratos_admin_identity to return (recovery_link, recovery_code)
and print both in cmd_seed output
- users.py: new module with cmd_user_{list,get,create,delete,recover}
via port-forwarded kratos-admin API
- cli.py: add 'user' verb dispatching to users.py subcommands
- tools.py: minor tool resolution updates
This commit is contained in:
@@ -50,8 +50,8 @@ def main() -> None:
|
||||
|
||||
# sunbeam build <what>
|
||||
p_build = sub.add_parser("build", help="Build and push an artifact")
|
||||
p_build.add_argument("what", choices=["proxy"],
|
||||
help="What to build (proxy)")
|
||||
p_build.add_argument("what", choices=["proxy", "kratos-admin"],
|
||||
help="What to build (proxy, kratos-admin)")
|
||||
|
||||
# sunbeam check [ns[/name]]
|
||||
p_check = sub.add_parser("check", help="Functional service health checks")
|
||||
@@ -69,6 +69,32 @@ def main() -> None:
|
||||
p_k8s.add_argument("kubectl_args", nargs=argparse.REMAINDER,
|
||||
help="arguments forwarded verbatim to kubectl")
|
||||
|
||||
# sunbeam bao [bao args...] — bao CLI inside OpenBao pod with root token injected
|
||||
p_bao = sub.add_parser("bao", help="bao CLI passthrough (runs inside OpenBao pod with root token)")
|
||||
p_bao.add_argument("bao_args", nargs=argparse.REMAINDER,
|
||||
help="arguments forwarded verbatim to bao")
|
||||
|
||||
# sunbeam user <action> [args]
|
||||
p_user = sub.add_parser("user", help="User/identity management")
|
||||
user_sub = p_user.add_subparsers(dest="user_action", metavar="action")
|
||||
|
||||
p_user_list = user_sub.add_parser("list", help="List identities")
|
||||
p_user_list.add_argument("--search", default="", help="Filter by email")
|
||||
|
||||
p_user_get = user_sub.add_parser("get", help="Get identity by email or ID")
|
||||
p_user_get.add_argument("target", help="Email or identity ID")
|
||||
|
||||
p_user_create = user_sub.add_parser("create", help="Create identity")
|
||||
p_user_create.add_argument("email", help="Email address")
|
||||
p_user_create.add_argument("--name", default="", help="Display name")
|
||||
p_user_create.add_argument("--schema", default="default", help="Schema ID")
|
||||
|
||||
p_user_delete = user_sub.add_parser("delete", help="Delete identity")
|
||||
p_user_delete.add_argument("target", help="Email or identity ID")
|
||||
|
||||
p_user_recover = user_sub.add_parser("recover", help="Generate recovery link")
|
||||
p_user_recover.add_argument("target", help="Email or identity ID")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.verb is None:
|
||||
@@ -132,6 +158,28 @@ def main() -> None:
|
||||
from sunbeam.kube import cmd_k8s
|
||||
sys.exit(cmd_k8s(args.kubectl_args))
|
||||
|
||||
elif args.verb == "bao":
|
||||
from sunbeam.kube import cmd_bao
|
||||
sys.exit(cmd_bao(args.bao_args))
|
||||
|
||||
elif args.verb == "user":
|
||||
from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create,
|
||||
cmd_user_delete, cmd_user_recover)
|
||||
action = getattr(args, "user_action", None)
|
||||
if action is None:
|
||||
p_user.print_help()
|
||||
sys.exit(0)
|
||||
elif action == "list":
|
||||
cmd_user_list(search=args.search)
|
||||
elif action == "get":
|
||||
cmd_user_get(args.target)
|
||||
elif action == "create":
|
||||
cmd_user_create(args.email, name=args.name, schema_id=args.schema)
|
||||
elif action == "delete":
|
||||
cmd_user_delete(args.target)
|
||||
elif action == "recover":
|
||||
cmd_user_recover(args.target)
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
@@ -266,10 +266,16 @@ def _trust_registry_in_docker_vm(registry: str):
|
||||
|
||||
|
||||
def cmd_build(what: str):
|
||||
"""Build and push an image. Currently only supports 'proxy'."""
|
||||
if what != "proxy":
|
||||
"""Build and push an image. Supports 'proxy' and 'kratos-admin'."""
|
||||
if what == "proxy":
|
||||
_build_proxy()
|
||||
elif what == "kratos-admin":
|
||||
_build_kratos_admin()
|
||||
else:
|
||||
die(f"Unknown build target: {what}")
|
||||
|
||||
|
||||
def _build_proxy():
|
||||
ip = get_lima_ip()
|
||||
domain = f"{ip}.sslip.io"
|
||||
|
||||
@@ -314,13 +320,113 @@ def cmd_build(what: str):
|
||||
|
||||
ok(f"Pushed {image}")
|
||||
|
||||
# On single-node clusters, pre-seed the image directly into k3s containerd.
|
||||
# This breaks the circular dependency: when the proxy restarts, Pingora goes
|
||||
# down before the new pod starts, making the Gitea registry (behind Pingora)
|
||||
# unreachable for the image pull. By importing into containerd first,
|
||||
# imagePullPolicy: IfNotPresent means k8s never needs to contact the registry.
|
||||
nodes = kube_out("get", "nodes", "-o=jsonpath={.items[*].metadata.name}").split()
|
||||
if len(nodes) == 1:
|
||||
ok("Single-node cluster: pre-seeding image into k3s containerd...")
|
||||
save = subprocess.Popen(
|
||||
["docker", "save", image],
|
||||
stdout=subprocess.PIPE,
|
||||
)
|
||||
ctr = subprocess.run(
|
||||
["limactl", "shell", LIMA_VM, "--",
|
||||
"sudo", "ctr", "-n", "k8s.io", "images", "import", "-"],
|
||||
stdin=save.stdout,
|
||||
capture_output=True,
|
||||
)
|
||||
save.stdout.close()
|
||||
save.wait()
|
||||
if ctr.returncode != 0:
|
||||
warn(f"containerd import failed (will fall back to registry pull):\n"
|
||||
f"{ctr.stderr.decode().strip()}")
|
||||
else:
|
||||
ok("Image pre-seeded.")
|
||||
|
||||
# Apply manifests so the Deployment spec reflects the Gitea image ref.
|
||||
from sunbeam.manifests import cmd_apply
|
||||
cmd_apply()
|
||||
|
||||
# Roll the pingora pod -- imagePullPolicy: Always ensures it pulls fresh.
|
||||
# Roll the pingora pod.
|
||||
ok("Rolling pingora deployment...")
|
||||
kube("rollout", "restart", "deployment/pingora", "-n", "ingress")
|
||||
kube("rollout", "status", "deployment/pingora", "-n", "ingress",
|
||||
"--timeout=120s")
|
||||
ok("Pingora redeployed.")
|
||||
|
||||
|
||||
def _build_kratos_admin():
|
||||
ip = get_lima_ip()
|
||||
domain = f"{ip}.sslip.io"
|
||||
|
||||
b64 = kube_out("-n", "devtools", "get", "secret",
|
||||
"gitea-admin-credentials", "-o=jsonpath={.data.password}")
|
||||
if not b64:
|
||||
die("gitea-admin-credentials secret not found -- run seed first.")
|
||||
admin_pass = base64.b64decode(b64).decode()
|
||||
|
||||
if not shutil.which("docker"):
|
||||
die("docker not found -- is the Lima docker VM running?")
|
||||
|
||||
# kratos-admin source
|
||||
kratos_admin_dir = Path(__file__).resolve().parents[2] / "kratos-admin"
|
||||
if not kratos_admin_dir.is_dir():
|
||||
die(f"kratos-admin source not found at {kratos_admin_dir}")
|
||||
|
||||
registry = f"src.{domain}"
|
||||
image = f"{registry}/studio/kratos-admin-ui:latest"
|
||||
|
||||
step(f"Building kratos-admin-ui -> {image} ...")
|
||||
|
||||
_trust_registry_in_docker_vm(registry)
|
||||
|
||||
ok("Logging in to Gitea registry...")
|
||||
r = subprocess.run(
|
||||
["docker", "login", registry,
|
||||
"--username", GITEA_ADMIN_USER, "--password-stdin"],
|
||||
input=admin_pass, text=True, capture_output=True,
|
||||
)
|
||||
if r.returncode != 0:
|
||||
die(f"docker login failed:\n{r.stderr.strip()}")
|
||||
|
||||
ok("Building image (linux/arm64, push)...")
|
||||
_run(["docker", "buildx", "build",
|
||||
"--platform", "linux/arm64",
|
||||
"--push",
|
||||
"-t", image,
|
||||
str(kratos_admin_dir)])
|
||||
|
||||
ok(f"Pushed {image}")
|
||||
|
||||
# Pre-seed into k3s containerd (same pattern as proxy)
|
||||
nodes = kube_out("get", "nodes", "-o=jsonpath={.items[*].metadata.name}").split()
|
||||
if len(nodes) == 1:
|
||||
ok("Single-node cluster: pre-seeding image into k3s containerd...")
|
||||
save = subprocess.Popen(
|
||||
["docker", "save", image],
|
||||
stdout=subprocess.PIPE,
|
||||
)
|
||||
ctr = subprocess.run(
|
||||
["limactl", "shell", LIMA_VM, "--",
|
||||
"sudo", "ctr", "-n", "k8s.io", "images", "import", "-"],
|
||||
stdin=save.stdout,
|
||||
capture_output=True,
|
||||
)
|
||||
save.stdout.close()
|
||||
save.wait()
|
||||
if ctr.returncode != 0:
|
||||
warn(f"containerd import failed:\n{ctr.stderr.decode().strip()}")
|
||||
else:
|
||||
ok("Image pre-seeded.")
|
||||
|
||||
from sunbeam.manifests import cmd_apply
|
||||
cmd_apply()
|
||||
|
||||
ok("Rolling kratos-admin-ui deployment...")
|
||||
kube("rollout", "restart", "deployment/kratos-admin-ui", "-n", "ory")
|
||||
kube("rollout", "status", "deployment/kratos-admin-ui", "-n", "ory",
|
||||
"--timeout=120s")
|
||||
ok("kratos-admin-ui redeployed.")
|
||||
|
||||
@@ -4,11 +4,16 @@ import json
|
||||
import secrets as _secrets
|
||||
import subprocess
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
|
||||
from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, ensure_ns, create_secret
|
||||
from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, ensure_ns, create_secret, get_domain
|
||||
from sunbeam.output import step, ok, warn, die
|
||||
|
||||
ADMIN_USERNAME = "estudio-admin"
|
||||
|
||||
LIMA_VM = "sunbeam"
|
||||
GITEA_ADMIN_USER = "gitea_admin"
|
||||
PG_USERS = [
|
||||
@@ -160,6 +165,11 @@ def _seed_openbao() -> dict:
|
||||
**{"cookie-secret": rand,
|
||||
"csrf-cookie-secret": rand})
|
||||
|
||||
kratos_admin = get_or_create("kratos-admin",
|
||||
**{"cookie-secret": rand,
|
||||
"csrf-cookie-secret": rand,
|
||||
"admin-identity-ids": lambda: ""})
|
||||
|
||||
# Write all secrets to KV (idempotent -- puts same values back)
|
||||
bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' sh -c '"
|
||||
f"bao kv put secret/hydra system-secret=\"{hydra['system-secret']}\" cookie-secret=\"{hydra['cookie-secret']}\" pairwise-salt=\"{hydra['pairwise-salt']}\" && "
|
||||
@@ -169,7 +179,8 @@ def _seed_openbao() -> dict:
|
||||
f"bao kv put secret/hive oidc-client-id=\"{hive['oidc-client-id']}\" oidc-client-secret=\"{hive['oidc-client-secret']}\" && "
|
||||
f"bao kv put secret/livekit api-key=\"{livekit['api-key']}\" api-secret=\"{livekit['api-secret']}\" && "
|
||||
f"bao kv put secret/people django-secret-key=\"{people['django-secret-key']}\" && "
|
||||
f"bao kv put secret/login-ui cookie-secret=\"{login_ui['cookie-secret']}\" csrf-cookie-secret=\"{login_ui['csrf-cookie-secret']}\""
|
||||
f"bao kv put secret/login-ui cookie-secret=\"{login_ui['cookie-secret']}\" csrf-cookie-secret=\"{login_ui['csrf-cookie-secret']}\" && "
|
||||
f"bao kv put secret/kratos-admin cookie-secret=\"{kratos_admin['cookie-secret']}\" csrf-cookie-secret=\"{kratos_admin['csrf-cookie-secret']}\" admin-identity-ids=\"{kratos_admin['admin-identity-ids']}\""
|
||||
f"'")
|
||||
|
||||
# Configure Kubernetes auth method so VSO can authenticate with OpenBao
|
||||
@@ -210,6 +221,7 @@ def _seed_openbao() -> dict:
|
||||
"people-django-secret": people["django-secret-key"],
|
||||
"livekit-api-key": livekit["api-key"],
|
||||
"livekit-api-secret": livekit["api-secret"],
|
||||
"kratos-admin-cookie-secret": kratos_admin["cookie-secret"],
|
||||
"_ob_pod": ob_pod,
|
||||
"_root_token": root_token,
|
||||
}
|
||||
@@ -330,6 +342,93 @@ def _configure_db_engine(ob_pod, root_token, pg_user, pg_pass):
|
||||
# cmd_seed — main entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@contextmanager
|
||||
def _kratos_admin_pf(local_port=14434):
|
||||
"""Port-forward directly to the Kratos admin API."""
|
||||
proc = subprocess.Popen(
|
||||
["kubectl", *K8S_CTX, "-n", "ory", "port-forward",
|
||||
"svc/kratos-admin", f"{local_port}:80"],
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
)
|
||||
time.sleep(1.5)
|
||||
try:
|
||||
yield f"http://localhost:{local_port}"
|
||||
finally:
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
|
||||
|
||||
def _kratos_api(base, path, method="GET", body=None):
|
||||
url = f"{base}/admin{path}"
|
||||
data = json.dumps(body).encode() if body is not None else None
|
||||
req = urllib.request.Request(
|
||||
url, data=data,
|
||||
headers={"Content-Type": "application/json", "Accept": "application/json"},
|
||||
method=method,
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
raw = resp.read()
|
||||
return json.loads(raw) if raw else None
|
||||
except urllib.error.HTTPError as e:
|
||||
raise RuntimeError(f"Kratos API {method} {url} → {e.code}: {e.read().decode()}")
|
||||
|
||||
|
||||
def _seed_kratos_admin_identity(ob_pod: str, root_token: str) -> tuple[str, str]:
|
||||
"""Ensure estudio-admin@<domain> exists in Kratos and is the only admin identity.
|
||||
|
||||
Returns (recovery_link, recovery_code), or ("", "") if Kratos is unreachable.
|
||||
Idempotent: if the identity already exists, skips creation and just returns
|
||||
a fresh recovery link+code.
|
||||
"""
|
||||
domain = get_domain()
|
||||
admin_email = f"{ADMIN_USERNAME}@{domain}"
|
||||
|
||||
ok(f"Ensuring Kratos admin identity ({admin_email})...")
|
||||
try:
|
||||
with _kratos_admin_pf() as base:
|
||||
# Check if the identity already exists by searching by email
|
||||
result = _kratos_api(base, f"/identities?credentials_identifier={admin_email}&page_size=1")
|
||||
existing = result[0] if isinstance(result, list) and result else None
|
||||
|
||||
if existing:
|
||||
identity_id = existing["id"]
|
||||
ok(f" admin identity exists ({identity_id[:8]}...)")
|
||||
else:
|
||||
identity = _kratos_api(base, "/identities", method="POST", body={
|
||||
"schema_id": "default",
|
||||
"traits": {"email": admin_email},
|
||||
"state": "active",
|
||||
})
|
||||
identity_id = identity["id"]
|
||||
ok(f" created admin identity ({identity_id[:8]}...)")
|
||||
|
||||
# Generate fresh recovery code + link
|
||||
recovery = _kratos_api(base, "/recovery/code", method="POST", body={
|
||||
"identity_id": identity_id,
|
||||
"expires_in": "24h",
|
||||
})
|
||||
recovery_link = recovery.get("recovery_link", "") if recovery else ""
|
||||
recovery_code = recovery.get("recovery_code", "") if recovery else ""
|
||||
except Exception as exc:
|
||||
warn(f"Could not seed Kratos admin identity (Kratos may not be ready): {exc}")
|
||||
return ("", "")
|
||||
|
||||
# Update admin-identity-ids in OpenBao KV so kratos-admin-ui enforces access
|
||||
bao_env = f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}'"
|
||||
|
||||
def _bao(cmd):
|
||||
return subprocess.run(
|
||||
["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao",
|
||||
"--", "sh", "-c", cmd],
|
||||
capture_output=True, text=True,
|
||||
)
|
||||
|
||||
_bao(f"{bao_env} bao kv patch secret/kratos-admin admin-identity-ids=\"{admin_email}\"")
|
||||
ok(f" ADMIN_IDENTITY_IDS set to {admin_email}")
|
||||
return (recovery_link, recovery_code)
|
||||
|
||||
|
||||
def cmd_seed() -> dict:
|
||||
"""Seed OpenBao KV with crypto-random credentials, then mirror to K8s Secrets.
|
||||
|
||||
@@ -452,6 +551,16 @@ def cmd_seed() -> dict:
|
||||
|
||||
ensure_ns("media")
|
||||
|
||||
# Ensure the Kratos admin identity exists and ADMIN_IDENTITY_IDS is set.
|
||||
# This runs after all other secrets are in place (Kratos must be up).
|
||||
recovery_link, recovery_code = _seed_kratos_admin_identity(ob_pod, root_token)
|
||||
if recovery_link:
|
||||
ok("Admin recovery link (valid 24h):")
|
||||
print(f" {recovery_link}")
|
||||
if recovery_code:
|
||||
ok("Admin recovery code (enter on the page above):")
|
||||
print(f" {recovery_code}")
|
||||
|
||||
ok("All secrets seeded.")
|
||||
return creds
|
||||
|
||||
|
||||
@@ -20,15 +20,15 @@ TOOLS: dict[str, dict] = {
|
||||
"sha256": "", # set to actual hash; empty = skip verify
|
||||
},
|
||||
"kustomize": {
|
||||
"version": "v5.6.0",
|
||||
"url": "https://github.com/kubernetes-sigs/kustomize/releases/download/kustomize%2Fv5.6.0/kustomize_v5.6.0_darwin_arm64.tar.gz",
|
||||
"version": "v5.8.1",
|
||||
"url": "https://github.com/kubernetes-sigs/kustomize/releases/download/kustomize%2Fv5.8.1/kustomize_v5.8.1_darwin_arm64.tar.gz",
|
||||
"sha256": "",
|
||||
"extract": "kustomize",
|
||||
},
|
||||
"helm": {
|
||||
"version": "v3.17.1",
|
||||
"url": "https://get.helm.sh/helm-v3.17.1-darwin-arm64.tar.gz",
|
||||
"sha256": "",
|
||||
"version": "v4.1.0",
|
||||
"url": "https://get.helm.sh/helm-v4.1.0-darwin-arm64.tar.gz",
|
||||
"sha256": "82f7065bf4e08d4c8d7881b85c0a080581ef4968a4ae6df4e7b432f8f7a88d0c",
|
||||
"extract": "darwin-arm64/helm",
|
||||
},
|
||||
}
|
||||
@@ -43,21 +43,35 @@ def _sha256(path: Path) -> str:
|
||||
|
||||
|
||||
def ensure_tool(name: str) -> Path:
|
||||
"""Return path to cached binary, downloading + verifying if needed."""
|
||||
"""Return path to cached binary, downloading + verifying if needed.
|
||||
|
||||
Re-downloads automatically when the pinned version in TOOLS changes.
|
||||
A <name>.version sidecar file records the version of the cached binary.
|
||||
"""
|
||||
if name not in TOOLS:
|
||||
raise ValueError(f"Unknown tool: {name}")
|
||||
spec = TOOLS[name]
|
||||
CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
dest = CACHE_DIR / name
|
||||
version_file = CACHE_DIR / f"{name}.version"
|
||||
|
||||
expected_sha = spec.get("sha256", "")
|
||||
expected_version = spec.get("version", "")
|
||||
|
||||
# Use cached binary if it exists and passes SHA check
|
||||
# Use cached binary if version matches (or no version pinned) and SHA passes
|
||||
if dest.exists():
|
||||
if not expected_sha or _sha256(dest) == expected_sha:
|
||||
version_ok = (
|
||||
not expected_version
|
||||
or (version_file.exists() and version_file.read_text().strip() == expected_version)
|
||||
)
|
||||
sha_ok = not expected_sha or _sha256(dest) == expected_sha
|
||||
if version_ok and sha_ok:
|
||||
return dest
|
||||
# SHA mismatch — re-download
|
||||
# Version mismatch or SHA mismatch — re-download
|
||||
if dest.exists():
|
||||
dest.unlink()
|
||||
if version_file.exists():
|
||||
version_file.unlink()
|
||||
|
||||
# Download
|
||||
url = spec["url"]
|
||||
@@ -88,6 +102,8 @@ def ensure_tool(name: str) -> Path:
|
||||
|
||||
# Make executable
|
||||
dest.chmod(dest.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
||||
# Record version so future calls skip re-download when version unchanged
|
||||
version_file.write_text(expected_version)
|
||||
return dest
|
||||
|
||||
|
||||
@@ -102,6 +118,7 @@ def run_tool(name: str, *args, **kwargs) -> subprocess.CompletedProcess:
|
||||
env = os.environ.copy()
|
||||
# kustomize needs helm on PATH for helm chart rendering
|
||||
if name == "kustomize":
|
||||
if "helm" in TOOLS:
|
||||
ensure_tool("helm") # ensure bundled helm is present before kustomize runs
|
||||
env["PATH"] = str(CACHE_DIR) + os.pathsep + env.get("PATH", "")
|
||||
return subprocess.run([str(bin_path), *args], env=env, **kwargs)
|
||||
|
||||
142
sunbeam/users.py
Normal file
142
sunbeam/users.py
Normal file
@@ -0,0 +1,142 @@
|
||||
"""User management — Kratos identity operations via port-forwarded admin API."""
|
||||
import json
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from contextlib import contextmanager
|
||||
|
||||
from sunbeam.output import step, ok, warn, die, table
|
||||
|
||||
K8S_CTX = ["--context=sunbeam"]
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _port_forward(ns="ory", svc="kratos-admin", local_port=4434, remote_port=80):
|
||||
"""Port-forward directly to the Kratos admin HTTP API and yield the local URL."""
|
||||
proc = subprocess.Popen(
|
||||
["kubectl", *K8S_CTX, "-n", ns, "port-forward",
|
||||
f"svc/{svc}", f"{local_port}:{remote_port}"],
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
||||
)
|
||||
# Wait for port-forward to be ready
|
||||
time.sleep(1.5)
|
||||
try:
|
||||
yield f"http://localhost:{local_port}"
|
||||
finally:
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
|
||||
|
||||
def _api(base_url, path, method="GET", body=None):
|
||||
"""Make a request to the Kratos admin API via port-forward."""
|
||||
url = f"{base_url}/admin{path}"
|
||||
data = json.dumps(body).encode() if body is not None else None
|
||||
headers = {"Content-Type": "application/json", "Accept": "application/json"}
|
||||
req = urllib.request.Request(url, data=data, headers=headers, method=method)
|
||||
try:
|
||||
with urllib.request.urlopen(req) as resp:
|
||||
body = resp.read()
|
||||
return json.loads(body) if body else None
|
||||
except urllib.error.HTTPError as e:
|
||||
body_text = e.read().decode()
|
||||
die(f"API error {e.code}: {body_text}")
|
||||
|
||||
|
||||
def _find_identity(base_url, target):
|
||||
"""Find identity by email or ID. Returns identity dict."""
|
||||
# Try as ID first
|
||||
if len(target) == 36 and target.count("-") == 4:
|
||||
return _api(base_url, f"/identities/{target}")
|
||||
# Search by email
|
||||
result = _api(base_url, f"/identities?credentials_identifier={target}&page_size=1")
|
||||
if isinstance(result, list) and result:
|
||||
return result[0]
|
||||
die(f"Identity not found: {target}")
|
||||
|
||||
|
||||
def cmd_user_list(search=""):
|
||||
step("Listing identities...")
|
||||
with _port_forward() as base:
|
||||
path = f"/identities?page_size=20"
|
||||
if search:
|
||||
path += f"&credentials_identifier={search}"
|
||||
identities = _api(base, path)
|
||||
|
||||
rows = []
|
||||
for i in identities or []:
|
||||
traits = i.get("traits", {})
|
||||
email = traits.get("email", "")
|
||||
name = traits.get("name", {})
|
||||
if isinstance(name, dict):
|
||||
display_name = f"{name.get('first', '')} {name.get('last', '')}".strip()
|
||||
else:
|
||||
display_name = str(name) if name else ""
|
||||
rows.append([i["id"][:8] + "...", email, display_name, i.get("state", "active")])
|
||||
|
||||
table(["ID", "Email", "Name", "State"], rows)
|
||||
|
||||
|
||||
def cmd_user_get(target):
|
||||
step(f"Getting identity: {target}")
|
||||
with _port_forward() as base:
|
||||
identity = _find_identity(base, target)
|
||||
print(json.dumps(identity, indent=2))
|
||||
|
||||
|
||||
def cmd_user_create(email, name="", schema_id="default"):
|
||||
step(f"Creating identity: {email}")
|
||||
traits = {"email": email}
|
||||
if name:
|
||||
parts = name.split(" ", 1)
|
||||
traits["name"] = {"first": parts[0], "last": parts[1] if len(parts) > 1 else ""}
|
||||
|
||||
body = {
|
||||
"schema_id": schema_id,
|
||||
"traits": traits,
|
||||
"state": "active",
|
||||
}
|
||||
|
||||
with _port_forward() as base:
|
||||
identity = _api(base, "/identities", method="POST", body=body)
|
||||
ok(f"Created identity: {identity['id']}")
|
||||
|
||||
# Generate recovery code (link is deprecated in Kratos v1.x)
|
||||
recovery = _api(base, "/recovery/code", method="POST", body={
|
||||
"identity_id": identity["id"],
|
||||
"expires_in": "24h",
|
||||
})
|
||||
|
||||
ok("Recovery link (valid 24h):")
|
||||
print(recovery.get("recovery_link", ""))
|
||||
ok("Recovery code (enter on the page above):")
|
||||
print(recovery.get("recovery_code", ""))
|
||||
|
||||
|
||||
def cmd_user_delete(target):
|
||||
step(f"Deleting identity: {target}")
|
||||
|
||||
confirm = input(f"Delete identity '{target}'? This cannot be undone. [y/N] ").strip().lower()
|
||||
if confirm != "y":
|
||||
ok("Cancelled.")
|
||||
return
|
||||
|
||||
with _port_forward() as base:
|
||||
identity = _find_identity(base, target)
|
||||
_api(base, f"/identities/{identity['id']}", method="DELETE")
|
||||
ok(f"Deleted.")
|
||||
|
||||
|
||||
def cmd_user_recover(target):
|
||||
step(f"Generating recovery link for: {target}")
|
||||
with _port_forward() as base:
|
||||
identity = _find_identity(base, target)
|
||||
recovery = _api(base, "/recovery/code", method="POST", body={
|
||||
"identity_id": identity["id"],
|
||||
"expires_in": "24h",
|
||||
})
|
||||
ok("Recovery link (valid 24h):")
|
||||
print(recovery.get("recovery_link", ""))
|
||||
ok("Recovery code (enter on the page above):")
|
||||
print(recovery.get("recovery_code", ""))
|
||||
Reference in New Issue
Block a user