feat: Python upstream — onboard/offboard, mailbox, Projects, --no-cache

Python changes that were ported to Rust in preceding commits:
- User onboard/offboard with mailbox + Projects provisioning
- Welcome email with job title/department
- --no-cache build flag
- Date validation, apply confirmation, build targets
This commit is contained in:
2026-03-20 21:32:23 +00:00
parent 8d6e815a91
commit b92c6ad18c
6 changed files with 696 additions and 89 deletions

View File

@@ -1,8 +1,20 @@
"""CLI entry point — argparse dispatch table for all sunbeam verbs.""" """CLI entry point — argparse dispatch table for all sunbeam verbs."""
import argparse import argparse
import datetime
import sys import sys
def _date_type(value):
"""Validate YYYY-MM-DD date format for argparse."""
if not value:
return value
try:
datetime.date.fromisoformat(value)
except ValueError:
raise argparse.ArgumentTypeError(f"Invalid date: {value!r} (expected YYYY-MM-DD)")
return value
ENV_CONTEXTS = { ENV_CONTEXTS = {
"local": "sunbeam", "local": "sunbeam",
"production": "production", "production": "production",
@@ -48,6 +60,8 @@ def main() -> None:
p_apply = sub.add_parser("apply", help="kustomize build + domain subst + kubectl apply") p_apply = sub.add_parser("apply", help="kustomize build + domain subst + kubectl apply")
p_apply.add_argument("namespace", nargs="?", default="", p_apply.add_argument("namespace", nargs="?", default="",
help="Limit apply to one namespace (e.g. lasuite, ingress, ory)") help="Limit apply to one namespace (e.g. lasuite, ingress, ory)")
p_apply.add_argument("--all", action="store_true", dest="apply_all",
help="Apply all namespaces without confirmation")
p_apply.add_argument("--domain", default="", help="Domain suffix (e.g. sunbeam.pt)") p_apply.add_argument("--domain", default="", help="Domain suffix (e.g. sunbeam.pt)")
p_apply.add_argument("--email", default="", help="ACME email for cert-manager") p_apply.add_argument("--email", default="", help="ACME email for cert-manager")
@@ -83,12 +97,14 @@ def main() -> None:
"messages", "messages-backend", "messages-frontend", "messages", "messages-backend", "messages-frontend",
"messages-mta-in", "messages-mta-out", "messages-mta-in", "messages-mta-out",
"messages-mpa", "messages-socks-proxy", "messages-mpa", "messages-socks-proxy",
"tuwunel"], "tuwunel", "calendars", "projects", "sol"],
help="What to build") help="What to build")
p_build.add_argument("--push", action="store_true", p_build.add_argument("--push", action="store_true",
help="Push image to registry after building") help="Push image to registry after building")
p_build.add_argument("--deploy", action="store_true", p_build.add_argument("--deploy", action="store_true",
help="Apply manifests and rollout restart after pushing (implies --push)") help="Apply manifests and rollout restart after pushing (implies --push)")
p_build.add_argument("--no-cache", action="store_true",
help="Disable buildkitd layer cache")
# sunbeam check [ns[/name]] # sunbeam check [ns[/name]]
p_check = sub.add_parser("check", help="Functional service health checks") p_check = sub.add_parser("check", help="Functional service health checks")
@@ -161,6 +177,21 @@ def main() -> None:
p_user_set_pw.add_argument("target", help="Email or identity ID") p_user_set_pw.add_argument("target", help="Email or identity ID")
p_user_set_pw.add_argument("password", help="New password") p_user_set_pw.add_argument("password", help="New password")
p_user_onboard = user_sub.add_parser("onboard", help="Onboard new user (create + welcome email)")
p_user_onboard.add_argument("email", help="Email address")
p_user_onboard.add_argument("--name", default="", help="Display name (First Last)")
p_user_onboard.add_argument("--schema", default="employee", help="Schema ID (default: employee)")
p_user_onboard.add_argument("--no-email", action="store_true", help="Skip sending welcome email")
p_user_onboard.add_argument("--notify", default="", help="Send welcome email to this address instead of identity email")
p_user_onboard.add_argument("--job-title", default="", help="Job title")
p_user_onboard.add_argument("--department", default="", help="Department")
p_user_onboard.add_argument("--office-location", default="", help="Office location")
p_user_onboard.add_argument("--hire-date", default="", type=_date_type, help="Hire date (YYYY-MM-DD)")
p_user_onboard.add_argument("--manager", default="", help="Manager name or email")
p_user_offboard = user_sub.add_parser("offboard", help="Offboard user (disable + revoke all)")
p_user_offboard.add_argument("target", help="Email or identity ID")
args = parser.parse_args() args = parser.parse_args()
@@ -199,11 +230,25 @@ def main() -> None:
cmd_status(args.target) cmd_status(args.target)
elif args.verb == "apply": elif args.verb == "apply":
from sunbeam.manifests import cmd_apply from sunbeam.manifests import cmd_apply, MANAGED_NS
# --domain/--email can appear before OR after the verb; subparser wins if both set. # --domain/--email can appear before OR after the verb; subparser wins if both set.
domain = getattr(args, "domain", "") or "" domain = getattr(args, "domain", "") or ""
email = getattr(args, "email", "") or "" email = getattr(args, "email", "") or ""
namespace = getattr(args, "namespace", "") or "" namespace = getattr(args, "namespace", "") or ""
apply_all = getattr(args, "apply_all", False)
# Full apply on production requires --all or interactive confirmation
if args.env == "production" and not namespace and not apply_all:
from sunbeam.output import warn
warn(f"This will apply ALL namespaces ({', '.join(MANAGED_NS)}) to production.")
try:
answer = input(" Continue? [y/N] ").strip().lower()
except (EOFError, KeyboardInterrupt):
answer = ""
if answer not in ("y", "yes"):
print("Aborted.")
sys.exit(0)
cmd_apply(env=args.env, domain=domain, email=email, namespace=namespace) cmd_apply(env=args.env, domain=domain, email=email, namespace=namespace)
elif args.verb == "seed": elif args.verb == "seed":
@@ -229,7 +274,7 @@ def main() -> None:
elif args.verb == "build": elif args.verb == "build":
from sunbeam.images import cmd_build from sunbeam.images import cmd_build
push = args.push or args.deploy push = args.push or args.deploy
cmd_build(args.what, push=push, deploy=args.deploy) cmd_build(args.what, push=push, deploy=args.deploy, no_cache=args.no_cache)
elif args.verb == "check": elif args.verb == "check":
from sunbeam.checks import cmd_check from sunbeam.checks import cmd_check
@@ -294,7 +339,8 @@ def main() -> None:
from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create, from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create,
cmd_user_delete, cmd_user_recover, cmd_user_delete, cmd_user_recover,
cmd_user_disable, cmd_user_enable, cmd_user_disable, cmd_user_enable,
cmd_user_set_password) cmd_user_set_password,
cmd_user_onboard, cmd_user_offboard)
action = getattr(args, "user_action", None) action = getattr(args, "user_action", None)
if action is None: if action is None:
p_user.print_help() p_user.print_help()
@@ -315,6 +361,14 @@ def main() -> None:
cmd_user_enable(args.target) cmd_user_enable(args.target)
elif action == "set-password": elif action == "set-password":
cmd_user_set_password(args.target, args.password) cmd_user_set_password(args.target, args.password)
elif action == "onboard":
cmd_user_onboard(args.email, name=args.name, schema_id=args.schema,
send_email=not args.no_email, notify=args.notify,
job_title=args.job_title, department=args.department,
office_location=args.office_location,
hire_date=args.hire_date, manager=args.manager)
elif action == "offboard":
cmd_user_offboard(args.target)
else: else:
parser.print_help() parser.print_help()

View File

@@ -259,6 +259,7 @@ def _buildctl_build_and_push(
*, *,
target: str | None = None, target: str | None = None,
build_args: dict[str, str] | None = None, build_args: dict[str, str] | None = None,
no_cache: bool = False,
) -> None: ) -> None:
"""Build and push an image via buildkitd running in k3s. """Build and push an image via buildkitd running in k3s.
@@ -320,6 +321,8 @@ def _buildctl_build_and_push(
] ]
if target: if target:
cmd += ["--opt", f"target={target}"] cmd += ["--opt", f"target={target}"]
if no_cache:
cmd += ["--no-cache"]
if build_args: if build_args:
for k, v in build_args.items(): for k, v in build_args.items():
cmd += ["--opt", f"build-arg:{k}={v}"] cmd += ["--opt", f"build-arg:{k}={v}"]
@@ -343,6 +346,7 @@ def _build_image(
target: str | None = None, target: str | None = None,
build_args: dict[str, str] | None = None, build_args: dict[str, str] | None = None,
push: bool = False, push: bool = False,
no_cache: bool = False,
cleanup_paths: list[Path] | None = None, cleanup_paths: list[Path] | None = None,
) -> None: ) -> None:
"""Build a container image via buildkitd and push to the Gitea registry. """Build a container image via buildkitd and push to the Gitea registry.
@@ -364,6 +368,7 @@ def _build_image(
context_dir=context_dir, context_dir=context_dir,
target=target, target=target,
build_args=build_args, build_args=build_args,
no_cache=no_cache,
) )
finally: finally:
for p in (cleanup_paths or []): for p in (cleanup_paths or []):
@@ -514,16 +519,16 @@ def cmd_mirror(domain: str = "", gitea_admin_pass: str = ""):
# Build dispatch # Build dispatch
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def cmd_build(what: str, push: bool = False, deploy: bool = False, no_cache: bool = False):
    """Dispatch a container image build.

    Thin error-handling wrapper around ``_cmd_build``: a failed subprocess
    step is converted into a clean ``die()`` message (command + exit code)
    instead of a raw traceback.
    """
    try:
        _cmd_build(what, push=push, deploy=deploy, no_cache=no_cache)
    except subprocess.CalledProcessError as err:
        failed_cmd = " ".join(str(part) for part in err.cmd)
        die(f"Build step failed (exit {err.returncode}): {failed_cmd}")
def _cmd_build(what: str, push: bool = False, deploy: bool = False): def _cmd_build(what: str, push: bool = False, deploy: bool = False, no_cache: bool = False):
if what == "proxy": if what == "proxy":
_build_proxy(push=push, deploy=deploy) _build_proxy(push=push, deploy=deploy)
elif what == "integration": elif what == "integration":
@@ -553,6 +558,12 @@ def _cmd_build(what: str, push: bool = False, deploy: bool = False):
_build_messages(what, push=push, deploy=deploy) _build_messages(what, push=push, deploy=deploy)
elif what == "tuwunel": elif what == "tuwunel":
_build_tuwunel(push=push, deploy=deploy) _build_tuwunel(push=push, deploy=deploy)
elif what == "calendars":
_build_calendars(push=push, deploy=deploy)
elif what == "projects":
_build_projects(push=push, deploy=deploy)
elif what == "sol":
_build_sol(push=push, deploy=deploy)
else: else:
die(f"Unknown build target: {what}") die(f"Unknown build target: {what}")
@@ -923,3 +934,105 @@ def _patch_dockerfile_uv(
except Exception as exc: except Exception as exc:
warn(f"Failed to stage uv binaries: {exc}") warn(f"Failed to stage uv binaries: {exc}")
return (dockerfile_path, cleanup) return (dockerfile_path, cleanup)
def _build_projects(push: bool = False, deploy: bool = False):
    """Build the projects (Planka Kanban) image; optionally push and roll out."""
    env = _get_build_env()
    src_dir = _get_repo_root() / "projects"
    if not src_dir.is_dir():
        die(f"projects source not found at {src_dir}")
    tag = f"{env.registry}/studio/projects:latest"
    step(f"Building projects -> {tag} ...")
    _build_image(env, tag, src_dir / "Dockerfile", src_dir, push=push)
    if not deploy:
        return
    _deploy_rollout(env, ["projects"], "lasuite", timeout="180s",
                    images=[tag])
def _build_sol(push: bool = False, deploy: bool = False):
    """Build the Sol virtual-librarian image; optionally push and roll out."""
    env = _get_build_env()
    src_dir = _get_repo_root() / "sol"
    if not src_dir.is_dir():
        die(f"Sol source not found at {src_dir}")
    tag = f"{env.registry}/studio/sol:latest"
    step(f"Building sol -> {tag} ...")
    _build_image(env, tag, src_dir / "Dockerfile", src_dir, push=push)
    if not deploy:
        return
    # NOTE(review): unlike _build_projects, no images= is passed to
    # _deploy_rollout here -- confirm that is intentional.
    _deploy_rollout(env, ["sol"], "matrix", timeout="120s")
def _build_calendars(push: bool = False, deploy: bool = False):
    """Build the three calendars images (backend, caldav, frontend).

    The backend build stages a translations file into its context and
    appends a COPY line to a patched Dockerfile before building; the
    staged files are handed to _build_image as cleanup_paths so they are
    removed after the build. Pass push=True to push each image, and
    deploy=True to apply + rollout-restart the calendars deployments.
    """
    env = _get_build_env()
    cal_dir = _get_repo_root() / "calendars"
    if not cal_dir.is_dir():
        die(f"calendars source not found at {cal_dir}")
    # --- backend image -----------------------------------------------------
    backend_dir = cal_dir / "src" / "backend"
    backend_image = f"{env.registry}/studio/calendars-backend:latest"
    step(f"Building calendars-backend -> {backend_image} ...")
    # Stage translations.json into the build context so the production image
    # has it at /data/translations.json (Docker Compose mounts it; we bake it in).
    translations_src = (cal_dir / "src" / "frontend" / "apps" / "calendars"
                        / "src" / "features" / "i18n" / "translations.json")
    translations_dst = backend_dir / "_translations.json"
    cleanup: list[Path] = []
    dockerfile = backend_dir / "Dockerfile"
    # If the source file is missing, fall back to the unpatched Dockerfile.
    if translations_src.exists():
        shutil.copy(str(translations_src), str(translations_dst))
        cleanup.append(translations_dst)
        # Patch Dockerfile to COPY translations into production image
        patched = dockerfile.read_text() + (
            "\n# Sunbeam: bake translations.json for default calendar names\n"
            "COPY _translations.json /data/translations.json\n"
        )
        patched_df = backend_dir / "Dockerfile._sunbeam_patched"
        patched_df.write_text(patched)
        cleanup.append(patched_df)
        dockerfile = patched_df
    _build_image(env, backend_image,
                 dockerfile,
                 backend_dir,
                 target="backend-production",
                 push=push,
                 cleanup_paths=cleanup)
    # --- caldav image ------------------------------------------------------
    caldav_image = f"{env.registry}/studio/calendars-caldav:latest"
    step(f"Building calendars-caldav -> {caldav_image} ...")
    _build_image(env, caldav_image,
                 cal_dir / "src" / "caldav" / "Dockerfile",
                 cal_dir / "src" / "caldav",
                 push=push)
    # --- frontend image ----------------------------------------------------
    frontend_image = f"{env.registry}/studio/calendars-frontend:latest"
    step(f"Building calendars-frontend -> {frontend_image} ...")
    integration_base = f"https://integration.{env.domain}"
    # Environment-specific URLs are baked in as build args.
    _build_image(env, frontend_image,
                 cal_dir / "src" / "frontend" / "Dockerfile",
                 cal_dir / "src" / "frontend",
                 target="frontend-production",
                 build_args={
                     "VISIO_BASE_URL": f"https://meet.{env.domain}",
                     "GAUFRE_WIDGET_PATH": f"{integration_base}/api/v2/lagaufre.js",
                     "GAUFRE_API_URL": f"{integration_base}/api/v2/services.json",
                     "THEME_CSS_URL": f"{integration_base}/api/v2/theme.css",
                 },
                 push=push)
    if deploy:
        _deploy_rollout(env,
                        ["calendars-backend", "calendars-worker",
                         "calendars-caldav", "calendars-frontend"],
                        "lasuite", timeout="180s",
                        images=[backend_image, caldav_image, frontend_image])

View File

@@ -50,7 +50,7 @@ GITEA_ADMIN_USER = "gitea_admin"
PG_USERS = [ PG_USERS = [
"kratos", "hydra", "gitea", "hive", "kratos", "hydra", "gitea", "hive",
"docs", "meet", "drive", "messages", "conversations", "docs", "meet", "drive", "messages", "conversations",
"people", "find", "people", "find", "calendars", "projects",
] ]
@@ -221,6 +221,16 @@ def _seed_openbao() -> dict:
drive = get_or_create("drive", drive = get_or_create("drive",
**{"django-secret-key": rand}) **{"django-secret-key": rand})
projects = get_or_create("projects",
**{"secret-key": rand})
calendars = get_or_create("calendars",
**{"django-secret-key": lambda: _secrets.token_urlsafe(50),
"salt-key": rand,
"caldav-inbound-api-key": rand,
"caldav-outbound-api-key": rand,
"caldav-internal-api-key": rand})
# DKIM key pair -- generated together since private and public keys are coupled. # DKIM key pair -- generated together since private and public keys are coupled.
# Read existing keys first; only generate a new pair when absent. # Read existing keys first; only generate a new pair when absent.
existing_messages_raw = bao( existing_messages_raw = bao(
@@ -351,6 +361,14 @@ def _seed_openbao() -> dict:
"application-jwt-secret-key": meet["application-jwt-secret-key"]}) "application-jwt-secret-key": meet["application-jwt-secret-key"]})
if "drive" in _dirty_paths: if "drive" in _dirty_paths:
_kv_put("drive", **{"django-secret-key": drive["django-secret-key"]}) _kv_put("drive", **{"django-secret-key": drive["django-secret-key"]})
if "projects" in _dirty_paths:
_kv_put("projects", **{"secret-key": projects["secret-key"]})
if "calendars" in _dirty_paths:
_kv_put("calendars", **{"django-secret-key": calendars["django-secret-key"],
"salt-key": calendars["salt-key"],
"caldav-inbound-api-key": calendars["caldav-inbound-api-key"],
"caldav-outbound-api-key": calendars["caldav-outbound-api-key"],
"caldav-internal-api-key": calendars["caldav-internal-api-key"]})
if "collabora" in _dirty_paths: if "collabora" in _dirty_paths:
_kv_put("collabora", **{"username": collabora["username"], _kv_put("collabora", **{"username": collabora["username"],
"password": collabora["password"]}) "password": collabora["password"]})
@@ -660,6 +678,7 @@ def cmd_seed() -> dict:
"drive": "drive_db", "messages": "messages_db", "drive": "drive_db", "messages": "messages_db",
"conversations": "conversations_db", "conversations": "conversations_db",
"people": "people_db", "find": "find_db", "people": "people_db", "find": "find_db",
"calendars": "calendars_db", "projects": "projects_db",
} }
for user in PG_USERS: for user in PG_USERS:
# Only CREATE if missing -- passwords are managed by OpenBao static roles. # Only CREATE if missing -- passwords are managed by OpenBao static roles.

View File

@@ -22,6 +22,7 @@ SERVICES_TO_RESTART = [
("lasuite", "people-frontend"), ("lasuite", "people-frontend"),
("lasuite", "people-celery-worker"), ("lasuite", "people-celery-worker"),
("lasuite", "people-celery-beat"), ("lasuite", "people-celery-beat"),
("lasuite", "projects"),
("matrix", "tuwunel"), ("matrix", "tuwunel"),
("media", "livekit-server"), ("media", "livekit-server"),
] ]

View File

@@ -63,6 +63,19 @@ class TestArgParsing(unittest.TestCase):
p_user_set_pw = user_sub.add_parser("set-password") p_user_set_pw = user_sub.add_parser("set-password")
p_user_set_pw.add_argument("target") p_user_set_pw.add_argument("target")
p_user_set_pw.add_argument("password") p_user_set_pw.add_argument("password")
p_user_onboard = user_sub.add_parser("onboard")
p_user_onboard.add_argument("email")
p_user_onboard.add_argument("--name", default="")
p_user_onboard.add_argument("--schema", default="employee")
p_user_onboard.add_argument("--no-email", action="store_true")
p_user_onboard.add_argument("--notify", default="")
p_user_onboard.add_argument("--job-title", default="")
p_user_onboard.add_argument("--department", default="")
p_user_onboard.add_argument("--office-location", default="")
p_user_onboard.add_argument("--hire-date", default="")
p_user_onboard.add_argument("--manager", default="")
p_user_offboard = user_sub.add_parser("offboard")
p_user_offboard.add_argument("target")
# Add config subcommand for testing # Add config subcommand for testing
p_config = sub.add_parser("config") p_config = sub.add_parser("config")
@@ -155,6 +168,42 @@ class TestArgParsing(unittest.TestCase):
self.assertEqual(args.email, "x@example.com") self.assertEqual(args.email, "x@example.com")
self.assertEqual(args.name, "X Y") self.assertEqual(args.name, "X Y")
def test_user_onboard_basic(self):
    """Onboarding with only an email falls back to every default."""
    parsed = self._parse(["user", "onboard", "a@b.com"])
    self.assertEqual(parsed.user_action, "onboard")
    self.assertEqual(parsed.email, "a@b.com")
    self.assertFalse(parsed.no_email)
    for attr, expected in (("name", ""), ("schema", "employee"), ("notify", "")):
        self.assertEqual(getattr(parsed, attr), expected)
def test_user_onboard_full(self):
    """Every onboarding flag round-trips through the parser."""
    argv = ["user", "onboard", "a@b.com", "--name", "A B", "--schema", "default",
            "--no-email", "--job-title", "Engineer", "--department", "Dev",
            "--office-location", "Paris", "--hire-date", "2026-01-15",
            "--manager", "boss@b.com"]
    parsed = self._parse(argv)
    self.assertEqual(parsed.user_action, "onboard")
    self.assertTrue(parsed.no_email)
    expected = {
        "email": "a@b.com",
        "name": "A B",
        "schema": "default",
        "job_title": "Engineer",
        "department": "Dev",
        "office_location": "Paris",
        "hire_date": "2026-01-15",
        "manager": "boss@b.com",
    }
    for attr, value in expected.items():
        self.assertEqual(getattr(parsed, attr), value)
def test_user_onboard_notify(self):
    """--notify redirects the welcome email without disabling it."""
    parsed = self._parse(["user", "onboard", "a@work.com",
                          "--notify", "a@personal.com"])
    self.assertEqual(parsed.email, "a@work.com")
    self.assertEqual(parsed.notify, "a@personal.com")
    self.assertFalse(parsed.no_email)
def test_user_offboard(self):
    """`user offboard TARGET` parses the action and the positional target."""
    parsed = self._parse(["user", "offboard", "a@b.com"])
    self.assertEqual(parsed.user_action, "offboard")
    self.assertEqual(parsed.target, "a@b.com")
def test_get_with_target(self): def test_get_with_target(self):
args = self._parse(["get", "ory/kratos-abc"]) args = self._parse(["get", "ory/kratos-abc"])
self.assertEqual(args.verb, "get") self.assertEqual(args.verb, "get")
@@ -259,6 +308,16 @@ class TestArgParsing(unittest.TestCase):
class TestCliDispatch(unittest.TestCase): class TestCliDispatch(unittest.TestCase):
"""Test that main() dispatches to the correct command function.""" """Test that main() dispatches to the correct command function."""
@staticmethod
def _mock_users(**overrides):
    """Build a fake ``sunbeam.users`` module whose commands are all MagicMocks.

    Keyword arguments replace individual command mocks so a test can
    inject just the one mock it asserts on.
    """
    command_names = (
        "cmd_user_list", "cmd_user_get", "cmd_user_create", "cmd_user_delete",
        "cmd_user_recover", "cmd_user_disable", "cmd_user_enable",
        "cmd_user_set_password", "cmd_user_onboard", "cmd_user_offboard",
    )
    attrs = {name: MagicMock() for name in command_names}
    attrs.update(overrides)
    return MagicMock(**attrs)
def test_no_verb_exits_0(self): def test_no_verb_exits_0(self):
with patch.object(sys, "argv", ["sunbeam"]): with patch.object(sys, "argv", ["sunbeam"]):
from sunbeam import cli from sunbeam import cli
@@ -356,7 +415,7 @@ class TestCliDispatch(unittest.TestCase):
cli_mod.main() cli_mod.main()
except SystemExit: except SystemExit:
pass pass
mock_build.assert_called_once_with("proxy", push=False, deploy=False) mock_build.assert_called_once_with("proxy", push=False, deploy=False, no_cache=False)
def test_build_with_push_flag(self): def test_build_with_push_flag(self):
mock_build = MagicMock() mock_build = MagicMock()
@@ -368,7 +427,7 @@ class TestCliDispatch(unittest.TestCase):
cli_mod.main() cli_mod.main()
except SystemExit: except SystemExit:
pass pass
mock_build.assert_called_once_with("integration", push=True, deploy=False) mock_build.assert_called_once_with("integration", push=True, deploy=False, no_cache=False)
def test_build_with_deploy_flag_implies_push(self): def test_build_with_deploy_flag_implies_push(self):
mock_build = MagicMock() mock_build = MagicMock()
@@ -380,16 +439,11 @@ class TestCliDispatch(unittest.TestCase):
cli_mod.main() cli_mod.main()
except SystemExit: except SystemExit:
pass pass
mock_build.assert_called_once_with("proxy", push=True, deploy=True) mock_build.assert_called_once_with("proxy", push=True, deploy=True, no_cache=False)
def test_user_set_password_dispatches(self): def test_user_set_password_dispatches(self):
mock_set_pw = MagicMock() mock_set_pw = MagicMock()
mock_users = MagicMock( mock_users = self._mock_users(cmd_user_set_password=mock_set_pw)
cmd_user_list=MagicMock(), cmd_user_get=MagicMock(),
cmd_user_create=MagicMock(), cmd_user_delete=MagicMock(),
cmd_user_recover=MagicMock(), cmd_user_disable=MagicMock(),
cmd_user_enable=MagicMock(), cmd_user_set_password=mock_set_pw,
)
with patch.object(sys, "argv", ["sunbeam", "user", "set-password", with patch.object(sys, "argv", ["sunbeam", "user", "set-password",
"admin@sunbeam.pt", "s3cr3t"]): "admin@sunbeam.pt", "s3cr3t"]):
with patch.dict("sys.modules", {"sunbeam.users": mock_users}): with patch.dict("sys.modules", {"sunbeam.users": mock_users}):
@@ -403,12 +457,7 @@ class TestCliDispatch(unittest.TestCase):
def test_user_disable_dispatches(self): def test_user_disable_dispatches(self):
mock_disable = MagicMock() mock_disable = MagicMock()
mock_users = MagicMock( mock_users = self._mock_users(cmd_user_disable=mock_disable)
cmd_user_list=MagicMock(), cmd_user_get=MagicMock(),
cmd_user_create=MagicMock(), cmd_user_delete=MagicMock(),
cmd_user_recover=MagicMock(), cmd_user_disable=mock_disable,
cmd_user_enable=MagicMock(), cmd_user_set_password=MagicMock(),
)
with patch.object(sys, "argv", ["sunbeam", "user", "disable", "x@sunbeam.pt"]): with patch.object(sys, "argv", ["sunbeam", "user", "disable", "x@sunbeam.pt"]):
with patch.dict("sys.modules", {"sunbeam.users": mock_users}): with patch.dict("sys.modules", {"sunbeam.users": mock_users}):
import importlib, sunbeam.cli as cli_mod import importlib, sunbeam.cli as cli_mod
@@ -421,12 +470,7 @@ class TestCliDispatch(unittest.TestCase):
def test_user_enable_dispatches(self): def test_user_enable_dispatches(self):
mock_enable = MagicMock() mock_enable = MagicMock()
mock_users = MagicMock( mock_users = self._mock_users(cmd_user_enable=mock_enable)
cmd_user_list=MagicMock(), cmd_user_get=MagicMock(),
cmd_user_create=MagicMock(), cmd_user_delete=MagicMock(),
cmd_user_recover=MagicMock(), cmd_user_disable=MagicMock(),
cmd_user_enable=mock_enable, cmd_user_set_password=MagicMock(),
)
with patch.object(sys, "argv", ["sunbeam", "user", "enable", "x@sunbeam.pt"]): with patch.object(sys, "argv", ["sunbeam", "user", "enable", "x@sunbeam.pt"]):
with patch.dict("sys.modules", {"sunbeam.users": mock_users}): with patch.dict("sys.modules", {"sunbeam.users": mock_users}):
import importlib, sunbeam.cli as cli_mod import importlib, sunbeam.cli as cli_mod
@@ -471,7 +515,7 @@ class TestCliDispatch(unittest.TestCase):
cli_mod.main() cli_mod.main()
except SystemExit: except SystemExit:
pass pass
mock_build.assert_called_once_with("people", push=False, deploy=False) mock_build.assert_called_once_with("people", push=False, deploy=False, no_cache=False)
def test_build_people_push_dispatches(self): def test_build_people_push_dispatches(self):
mock_build = MagicMock() mock_build = MagicMock()
@@ -483,7 +527,7 @@ class TestCliDispatch(unittest.TestCase):
cli_mod.main() cli_mod.main()
except SystemExit: except SystemExit:
pass pass
mock_build.assert_called_once_with("people", push=True, deploy=False) mock_build.assert_called_once_with("people", push=True, deploy=False, no_cache=False)
def test_build_people_deploy_implies_push(self): def test_build_people_deploy_implies_push(self):
mock_build = MagicMock() mock_build = MagicMock()
@@ -495,7 +539,7 @@ class TestCliDispatch(unittest.TestCase):
cli_mod.main() cli_mod.main()
except SystemExit: except SystemExit:
pass pass
mock_build.assert_called_once_with("people", push=True, deploy=True) mock_build.assert_called_once_with("people", push=True, deploy=True, no_cache=False)
def test_build_meet_dispatches(self): def test_build_meet_dispatches(self):
mock_build = MagicMock() mock_build = MagicMock()
@@ -507,7 +551,7 @@ class TestCliDispatch(unittest.TestCase):
cli_mod.main() cli_mod.main()
except SystemExit: except SystemExit:
pass pass
mock_build.assert_called_once_with("meet", push=False, deploy=False) mock_build.assert_called_once_with("meet", push=False, deploy=False, no_cache=False)
def test_check_no_target(self): def test_check_no_target(self):
mock_check = MagicMock() mock_check = MagicMock()
@@ -534,6 +578,56 @@ class TestCliDispatch(unittest.TestCase):
mock_check.assert_called_once_with("lasuite/people") mock_check.assert_called_once_with("lasuite/people")
def test_user_onboard_dispatches(self):
    """`user onboard` forwards the email plus defaulted flags to cmd_user_onboard."""
    mock_onboard = MagicMock()
    fake_users = self._mock_users(cmd_user_onboard=mock_onboard)
    argv = ["sunbeam", "user", "onboard", "new@sunbeam.pt", "--name", "New User"]
    with patch.object(sys, "argv", argv), \
         patch.dict("sys.modules", {"sunbeam.users": fake_users}):
        import importlib, sunbeam.cli as cli_mod
        importlib.reload(cli_mod)
        try:
            cli_mod.main()
        except SystemExit:
            pass
    mock_onboard.assert_called_once_with("new@sunbeam.pt", name="New User",
                                         schema_id="employee", send_email=True,
                                         notify="", job_title="", department="",
                                         office_location="", hire_date="",
                                         manager="")
def test_user_onboard_no_email_dispatches(self):
    """--no-email flips send_email to False in the dispatched call."""
    mock_onboard = MagicMock()
    fake_users = self._mock_users(cmd_user_onboard=mock_onboard)
    argv = ["sunbeam", "user", "onboard", "new@sunbeam.pt", "--no-email"]
    with patch.object(sys, "argv", argv), \
         patch.dict("sys.modules", {"sunbeam.users": fake_users}):
        import importlib, sunbeam.cli as cli_mod
        importlib.reload(cli_mod)
        try:
            cli_mod.main()
        except SystemExit:
            pass
    mock_onboard.assert_called_once_with("new@sunbeam.pt", name="",
                                         schema_id="employee", send_email=False,
                                         notify="", job_title="", department="",
                                         office_location="", hire_date="",
                                         manager="")
def test_user_offboard_dispatches(self):
    """`user offboard` routes straight to cmd_user_offboard with the target."""
    mock_offboard = MagicMock()
    fake_users = self._mock_users(cmd_user_offboard=mock_offboard)
    with patch.object(sys, "argv", ["sunbeam", "user", "offboard", "x@sunbeam.pt"]), \
         patch.dict("sys.modules", {"sunbeam.users": fake_users}):
        import importlib, sunbeam.cli as cli_mod
        importlib.reload(cli_mod)
        try:
            cli_mod.main()
        except SystemExit:
            pass
    mock_offboard.assert_called_once_with("x@sunbeam.pt")
class TestConfigCli(unittest.TestCase): class TestConfigCli(unittest.TestCase):
"""Test config subcommand functionality.""" """Test config subcommand functionality."""

View File

@@ -1,19 +1,23 @@
"""User management — Kratos identity operations via port-forwarded admin API.""" """User management — Kratos identity operations via port-forwarded admin API."""
import json import json
import smtplib
import subprocess import subprocess
import sys import sys
import time import time
import urllib.request import urllib.request
import urllib.error import urllib.error
from contextlib import contextmanager from contextlib import contextmanager
from email.message import EmailMessage
import sunbeam.kube as _kube_mod import sunbeam.kube as _kube_mod
from sunbeam.output import step, ok, warn, die, table from sunbeam.output import step, ok, warn, die, table
_SMTP_LOCAL_PORT = 10025
@contextmanager @contextmanager
def _port_forward(ns="ory", svc="kratos-admin", local_port=4434, remote_port=80): def _port_forward(ns="ory", svc="kratos-admin", local_port=4434, remote_port=80):
"""Port-forward directly to the Kratos admin HTTP API and yield the local URL.""" """Port-forward to a cluster service and yield the local base URL."""
proc = subprocess.Popen( proc = subprocess.Popen(
["kubectl", _kube_mod.context_arg(), "-n", ns, "port-forward", ["kubectl", _kube_mod.context_arg(), "-n", ns, "port-forward",
f"svc/{svc}", f"{local_port}:{remote_port}"], f"svc/{svc}", f"{local_port}:{remote_port}"],
@@ -28,23 +32,25 @@ def _port_forward(ns="ory", svc="kratos-admin", local_port=4434, remote_port=80)
proc.wait() proc.wait()
def _api(base_url, path, method="GET", body=None): def _api(base_url, path, method="GET", body=None, prefix="/admin", ok_statuses=()):
"""Make a request to the Kratos admin API via port-forward.""" """Make a request to an admin API via port-forward."""
url = f"{base_url}/admin{path}" url = f"{base_url}{prefix}{path}"
data = json.dumps(body).encode() if body is not None else None data = json.dumps(body).encode() if body is not None else None
headers = {"Content-Type": "application/json", "Accept": "application/json"} headers = {"Content-Type": "application/json", "Accept": "application/json"}
req = urllib.request.Request(url, data=data, headers=headers, method=method) req = urllib.request.Request(url, data=data, headers=headers, method=method)
try: try:
with urllib.request.urlopen(req) as resp: with urllib.request.urlopen(req) as resp:
body = resp.read() resp_body = resp.read()
return json.loads(body) if body else None return json.loads(resp_body) if resp_body else None
except urllib.error.HTTPError as e: except urllib.error.HTTPError as e:
body_text = e.read().decode() if e.code in ok_statuses:
die(f"API error {e.code}: {body_text}") return None
err_text = e.read().decode()
die(f"API error {e.code}: {err_text}")
def _find_identity(base_url, target, required=True):
    """Find identity by email or ID. Returns identity dict or None if not required.

    Args:
        target: Either a 36-char UUID identity ID or a credentials
            identifier (email).
        required: When True, die() on a miss; when False, return None so the
            caller can branch on existence (used by onboarding).
    """
    # Try as ID first
    if len(target) == 36 and target.count("-") == 4:
        return _api(base_url, f"/identities/{target}")
    # Fall back to lookup by credentials identifier (email)
    result = _api(base_url, f"/identities?credentials_identifier={target}&page_size=1")
    if isinstance(result, list) and result:
        return result[0]
    if required:
        die(f"Identity not found: {target}")
    return None
def _identity_put_body(identity, state=None, **extra):
"""Build the PUT body for updating an identity, preserving all required fields."""
body = {
"schema_id": identity["schema_id"],
"traits": identity["traits"],
"state": state or identity.get("state", "active"),
"metadata_public": identity.get("metadata_public"),
"metadata_admin": identity.get("metadata_admin"),
}
body.update(extra)
return body
def _generate_recovery(base_url, identity_id):
    """Request a 24-hour recovery code for *identity_id*; return (link, code)."""
    payload = {"identity_id": identity_id, "expires_in": "24h"}
    response = _api(base_url, "/recovery/code", method="POST", body=payload)
    link = response.get("recovery_link", "")
    code = response.get("recovery_code", "")
    return link, code
def cmd_user_list(search=""): def cmd_user_list(search=""):
@@ -67,6 +97,12 @@ def cmd_user_list(search=""):
for i in identities or []: for i in identities or []:
traits = i.get("traits", {}) traits = i.get("traits", {})
email = traits.get("email", "") email = traits.get("email", "")
# Support both employee (given_name/family_name) and default (name.first/last) schemas
given = traits.get("given_name", "")
family = traits.get("family_name", "")
if given or family:
display_name = f"{given} {family}".strip()
else:
name = traits.get("name", {}) name = traits.get("name", {})
if isinstance(name, dict): if isinstance(name, dict):
display_name = f"{name.get('first', '')} {name.get('last', '')}".strip() display_name = f"{name.get('first', '')} {name.get('last', '')}".strip()
@@ -100,17 +136,12 @@ def cmd_user_create(email, name="", schema_id="default"):
with _port_forward() as base: with _port_forward() as base:
identity = _api(base, "/identities", method="POST", body=body) identity = _api(base, "/identities", method="POST", body=body)
ok(f"Created identity: {identity['id']}") ok(f"Created identity: {identity['id']}")
link, code = _generate_recovery(base, identity["id"])
# Generate recovery code (link is deprecated in Kratos v1.x)
recovery = _api(base, "/recovery/code", method="POST", body={
"identity_id": identity["id"],
"expires_in": "24h",
})
ok("Recovery link (valid 24h):") ok("Recovery link (valid 24h):")
print(recovery.get("recovery_link", "")) print(link)
ok("Recovery code (enter on the page above):") ok("Recovery code (enter on the page above):")
print(recovery.get("recovery_code", "")) print(code)
def cmd_user_delete(target): def cmd_user_delete(target):
@@ -131,14 +162,11 @@ def cmd_user_recover(target):
step(f"Generating recovery link for: {target}") step(f"Generating recovery link for: {target}")
with _port_forward() as base: with _port_forward() as base:
identity = _find_identity(base, target) identity = _find_identity(base, target)
recovery = _api(base, "/recovery/code", method="POST", body={ link, code = _generate_recovery(base, identity["id"])
"identity_id": identity["id"],
"expires_in": "24h",
})
ok("Recovery link (valid 24h):") ok("Recovery link (valid 24h):")
print(recovery.get("recovery_link", "")) print(link)
ok("Recovery code (enter on the page above):") ok("Recovery code (enter on the page above):")
print(recovery.get("recovery_code", "")) print(code)
def cmd_user_disable(target): def cmd_user_disable(target):
@@ -153,13 +181,8 @@ def cmd_user_disable(target):
with _port_forward() as base: with _port_forward() as base:
identity = _find_identity(base, target) identity = _find_identity(base, target)
iid = identity["id"] iid = identity["id"]
_api(base, f"/identities/{iid}", method="PUT", body={ _api(base, f"/identities/{iid}", method="PUT",
"schema_id": identity["schema_id"], body=_identity_put_body(identity, state="inactive"))
"traits": identity["traits"],
"state": "inactive",
"metadata_public": identity.get("metadata_public"),
"metadata_admin": identity.get("metadata_admin"),
})
_api(base, f"/identities/{iid}/sessions", method="DELETE") _api(base, f"/identities/{iid}/sessions", method="DELETE")
ok(f"Identity {iid[:8]}... disabled and all Kratos sessions revoked.") ok(f"Identity {iid[:8]}... disabled and all Kratos sessions revoked.")
warn("App sessions (docs/people) expire within SESSION_COOKIE_AGE — currently 1h.") warn("App sessions (docs/people) expire within SESSION_COOKIE_AGE — currently 1h.")
@@ -171,18 +194,10 @@ def cmd_user_set_password(target, password):
with _port_forward() as base: with _port_forward() as base:
identity = _find_identity(base, target) identity = _find_identity(base, target)
iid = identity["id"] iid = identity["id"]
_api(base, f"/identities/{iid}", method="PUT", body={ _api(base, f"/identities/{iid}", method="PUT",
"schema_id": identity["schema_id"], body=_identity_put_body(identity, credentials={
"traits": identity["traits"], "password": {"config": {"password": password}},
"state": identity.get("state", "active"), }))
"metadata_public": identity.get("metadata_public"),
"metadata_admin": identity.get("metadata_admin"),
"credentials": {
"password": {
"config": {"password": password},
},
},
})
ok(f"Password set for {iid[:8]}...") ok(f"Password set for {iid[:8]}...")
@@ -192,11 +207,322 @@ def cmd_user_enable(target):
with _port_forward() as base: with _port_forward() as base:
identity = _find_identity(base, target) identity = _find_identity(base, target)
iid = identity["id"] iid = identity["id"]
_api(base, f"/identities/{iid}", method="PUT", body={ _api(base, f"/identities/{iid}", method="PUT",
"schema_id": identity["schema_id"], body=_identity_put_body(identity, state="active"))
"traits": identity["traits"],
"state": "active",
"metadata_public": identity.get("metadata_public"),
"metadata_admin": identity.get("metadata_admin"),
})
ok(f"Identity {iid[:8]}... re-enabled.") ok(f"Identity {iid[:8]}... re-enabled.")
def _send_welcome_email(domain, email, name, recovery_link, recovery_code,
                        job_title="", department=""):
    """Send a welcome email via cluster Postfix (port-forward to svc/postfix in lasuite).

    Args:
        domain: Deployment domain suffix used in all service URLs.
        email: Recipient address.
        name: Used in the greeting; falls back to a generic "Hi" when empty.
        recovery_link, recovery_code: 24h Kratos recovery credentials.
        job_title, department: When BOTH are set, an intro sentence mentions
            the role; otherwise it is omitted entirely.
    """
    greeting = f"Hi {name}" if name else "Hi"
    # Body lines sit at column 0 inside the f-string so the email carries no
    # stray indentation.
    body_text = f"""{greeting},
Welcome to Sunbeam Studios!{f" You're joining as {job_title} in the {department} department." if job_title and department else ""} Your account has been created.
To set your password, open this link and enter the recovery code below:
Link: {recovery_link}
Code: {recovery_code}
This link expires in 24 hours.
Once signed in you will be prompted to set up 2FA (mandatory).
After that, head to https://auth.{domain}/settings to set up your
profile — add your name, profile picture, and any other details.
Your services:
Calendar: https://cal.{domain}
Drive: https://drive.{domain}
Mail: https://mail.{domain}
Meet: https://meet.{domain}
Projects: https://projects.{domain}
Source Code: https://src.{domain}
Messages (Matrix):
Download Element for your platform:
Desktop: https://element.io/download
iOS: https://apps.apple.com/app/element-messenger/id1083446067
Android: https://play.google.com/store/apps/details?id=im.vector.app
Setup:
1. Open Element and tap "Sign in"
2. Tap "Edit" next to the homeserver field (matrix.org)
3. Enter: https://messages.{domain}
4. Tap "Continue" — you'll be redirected to Sunbeam Studios SSO
5. Sign in with your {domain} email and password
\u2014 With Love & Warmth, Sunbeam Studios
"""
    msg = EmailMessage()
    msg["Subject"] = "Welcome to Sunbeam Studios — Set Your Password"
    msg["From"] = f"Sunbeam Studios <noreply@{domain}>"
    msg["To"] = email
    msg.set_content(body_text)
    # Deliver through the in-cluster Postfix over a temporary port-forward.
    with _port_forward(ns="lasuite", svc="postfix", local_port=_SMTP_LOCAL_PORT, remote_port=25):
        with smtplib.SMTP("localhost", _SMTP_LOCAL_PORT) as smtp:
            smtp.send_message(msg)
    ok(f"Welcome email sent to {email}")
def _next_employee_id(base_url):
    """Return the next sequential employee ID (as a string) after the current max.

    NOTE(review): only the first page of identities (page_size=200) is
    scanned — with more than 200 identities the true maximum could be
    missed; confirm before the org grows past that.
    """
    identities = _api(base_url, "/identities?page_size=200") or []
    highest = 0
    for identity in identities:
        raw = identity.get("traits", {}).get("employee_id", "")
        if raw and raw.isdigit():
            highest = max(highest, int(raw))
    return str(highest + 1)
def _create_mailbox(email, name=""):
    """Create a mailbox in Messages via kubectl exec into the backend.

    Best-effort: failure is reported with warn() rather than raised so
    onboarding can continue when the Messages backend is unavailable.

    Args:
        email: Full address; split into local part and mail domain.
        name: Accepted for signature symmetry with the other provisioners
            but currently unused — the mailbox gets no display name.
            NOTE(review): confirm whether a display name should be stored.
    """
    local_part, domain_part = email.split("@", 1)
    step(f"Creating mailbox: {email}")
    # Run a get_or_create inside the backend pod's Django shell.
    # NOTE(review): the snippet uses Mailbox/MailDomain without imports —
    # assumes the shell auto-imports models; confirm against the image.
    result = _kube_mod.kube_out(
        "exec", "deployment/messages-backend", "-n", "lasuite",
        "-c", "messages-backend", "--",
        "python", "manage.py", "shell", "-c",
        f"""
mb, created = Mailbox.objects.get_or_create(
    local_part="{local_part}",
    domain=MailDomain.objects.get(name="{domain_part}"),
)
print("created" if created else "exists")
""",
    )
    if "created" in (result or ""):
        ok(f"Mailbox {email} created.")
    elif "exists" in (result or ""):
        ok(f"Mailbox {email} already exists.")
    else:
        warn(f"Could not create mailbox (Messages backend may not be running): {result}")
def _delete_mailbox(email):
    """Delete a mailbox and associated Django user in Messages.

    Best-effort: failures are reported with warn() rather than raised so
    offboarding continues even if the Messages backend is down.
    """
    local_part, domain_part = email.split("@", 1)
    step(f"Cleaning up mailbox: {email}")
    # Run cleanup inside the backend pod's Django shell via kubectl exec.
    # NOTE(review): the snippet references Mailbox without an import —
    # assumes the shell auto-imports models; confirm against the image.
    result = _kube_mod.kube_out(
        "exec", "deployment/messages-backend", "-n", "lasuite",
        "-c", "messages-backend", "--",
        "python", "manage.py", "shell", "-c",
        f"""
from django.contrib.auth import get_user_model
User = get_user_model()
# Delete mailbox + access + contacts
deleted = 0
for mb in Mailbox.objects.filter(local_part="{local_part}", domain__name="{domain_part}"):
    mb.delete()
    deleted += 1
# Delete Django user
try:
    u = User.objects.get(email="{email}")
    u.delete()
    deleted += 1
except User.DoesNotExist:
    pass
print(f"deleted {{deleted}}")
""",
    )
    if "deleted" in (result or ""):
        ok(f"Mailbox and user cleaned up.")
    else:
        warn(f"Could not clean up mailbox: {result}")
def _setup_projects_user(email, name=""):
    """Create a Projects (Planka) user and add them as manager of the Default project."""
    step(f"Setting up Projects user: {email}")
    # Node snippet executed inside the Projects container: writes directly to
    # the Planka Postgres DB through knex using the pod's DATABASE_URL.
    # NOTE(review): email/name are interpolated into JS string literals — a
    # quote in either would break the snippet; inputs are operator-supplied.
    js = f"""
const knex = require('knex')({{client: 'pg', connection: process.env.DATABASE_URL}});
async function go() {{
  // Create or find user
  let user = await knex('user_account').where({{email: '{email}'}}).first();
  if (!user) {{
    const id = Date.now().toString();
    await knex('user_account').insert({{
      id, email: '{email}', name: '{name}', password: '',
      is_admin: true, is_sso: true, language: 'en-US',
      created_at: new Date(), updated_at: new Date()
    }});
    user = {{id}};
    console.log('user_created');
  }} else {{
    console.log('user_exists');
  }}
  // Add to Default project
  const project = await knex('project').where({{name: 'Default'}}).first();
  if (project) {{
    const exists = await knex('project_manager').where({{project_id: project.id, user_id: user.id}}).first();
    if (!exists) {{
      await knex('project_manager').insert({{
        id: (Date.now()+1).toString(), project_id: project.id,
        user_id: user.id, created_at: new Date()
      }});
      console.log('manager_added');
    }} else {{
      console.log('manager_exists');
    }}
  }} else {{
    console.log('no_default_project');
  }}
}}
go().then(() => process.exit(0)).catch(e => {{ console.error(e.message); process.exit(1); }});
"""
    result = _kube_mod.kube_out(
        "exec", "deployment/projects", "-n", "lasuite",
        "-c", "projects", "--", "node", "-e", js,
    )
    # Branch on the sentinel strings the snippet prints.
    if "manager_added" in (result or "") or "manager_exists" in (result or ""):
        ok(f"Projects user ready.")
    elif "no_default_project" in (result or ""):
        warn("No Default project found in Projects — skip.")
    else:
        warn(f"Could not set up Projects user: {result}")
def _cleanup_projects_user(email):
    """Remove a user from Projects (Planka) — delete memberships and user record.

    Memberships are hard-deleted; the user row is soft-deleted via
    deleted_at so historical activity keeps a valid author reference.
    """
    step(f"Cleaning up Projects user: {email}")
    js = f"""
const knex = require('knex')({{client: 'pg', connection: process.env.DATABASE_URL}});
async function go() {{
  const user = await knex('user_account').where({{email: '{email}'}}).first();
  if (!user) {{ console.log('not_found'); return; }}
  await knex('board_membership').where({{user_id: user.id}}).del();
  await knex('project_manager').where({{user_id: user.id}}).del();
  await knex('user_account').where({{id: user.id}}).update({{deleted_at: new Date()}});
  console.log('cleaned');
}}
go().then(() => process.exit(0)).catch(e => {{ console.error(e.message); process.exit(1); }});
"""
    result = _kube_mod.kube_out(
        "exec", "deployment/projects", "-n", "lasuite",
        "-c", "projects", "--", "node", "-e", js,
    )
    if "cleaned" in (result or ""):
        ok("Projects user cleaned up.")
    else:
        warn(f"Could not clean up Projects user: {result}")
def cmd_user_onboard(email, name="", schema_id="employee", send_email=True,
                     notify="", job_title="", department="", office_location="",
                     hire_date="", manager=""):
    """Onboard a new user: create identity, generate recovery link, optionally send welcome email.

    If the identity already exists, only a fresh recovery link is generated —
    mailbox and Projects provisioning are skipped.

    Args:
        email: Primary address; also the Kratos credentials identifier.
        name: Full name; first token becomes given_name, the rest family_name.
        schema_id: Kratos identity schema; "employee" auto-assigns employee_id.
        send_email: When True, deliver the welcome email via cluster Postfix.
        notify: Alternate recipient for the welcome email (defaults to email).
        job_title, department, office_location, hire_date, manager:
            Optional employee traits stored on the identity when non-empty.
    """
    step(f"Onboarding: {email}")
    with _port_forward() as base:
        existing = _find_identity(base, email, required=False)
        if existing:
            warn(f"Identity already exists: {existing['id'][:8]}...")
            step("Generating fresh recovery link...")
            iid = existing["id"]
            recovery_link, recovery_code = _generate_recovery(base, iid)
        else:
            traits = {"email": email}
            if name:
                parts = name.split(" ", 1)
                traits["given_name"] = parts[0]
                traits["family_name"] = parts[1] if len(parts) > 1 else ""
            # Auto-assign employee ID if not provided and using employee schema
            employee_id = ""
            if schema_id == "employee":
                employee_id = _next_employee_id(base)
                traits["employee_id"] = employee_id
            if job_title:
                traits["job_title"] = job_title
            if department:
                traits["department"] = department
            if office_location:
                traits["office_location"] = office_location
            if hire_date:
                traits["hire_date"] = hire_date
            if manager:
                traits["manager"] = manager
            identity = _api(base, "/identities", method="POST", body={
                "schema_id": schema_id,
                "traits": traits,
                "state": "active",
                "verifiable_addresses": [{
                    "value": email,
                    "verified": True,
                    "via": "email",
                }],
            })
            iid = identity["id"]
            ok(f"Created identity: {iid}")
            if employee_id:
                ok(f"Employee #{employee_id}")
            # Kratos ignores verifiable_addresses on POST — PATCH is required
            _api(base, f"/identities/{iid}", method="PATCH", body=[
                {"op": "replace", "path": "/verifiable_addresses/0/verified", "value": True},
                {"op": "replace", "path": "/verifiable_addresses/0/status", "value": "completed"},
            ])
            recovery_link, recovery_code = _generate_recovery(base, iid)
        # Provision app-level accounts
        if not existing:
            _create_mailbox(email, name)
            _setup_projects_user(email, name)
        if send_email:
            domain = _kube_mod.get_domain()
            recipient = notify or email
            _send_welcome_email(domain, recipient, name, recovery_link, recovery_code,
                                job_title=job_title, department=department)
    # Always echo the credentials so the operator can relay them manually.
    ok(f"Identity ID: {iid}")
    ok("Recovery link (valid 24h):")
    print(recovery_link)
    ok("Recovery code:")
    print(recovery_code)
def cmd_user_offboard(target):
    """Offboard a user: disable identity, revoke all Kratos + Hydra sessions.

    Also best-effort cleans up the Messages mailbox/Django user and the
    Projects (Planka) account. Prompts for interactive confirmation first.
    """
    step(f"Offboarding: {target}")
    confirm = input(f"Offboard '{target}'? This will disable the account and revoke all sessions. [y/N] ").strip().lower()
    if confirm != "y":
        ok("Cancelled.")
        return
    with _port_forward() as base:
        identity = _find_identity(base, target)
        iid = identity["id"]
        step("Disabling identity...")
        _api(base, f"/identities/{iid}", method="PUT",
             body=_identity_put_body(identity, state="inactive"))
        ok(f"Identity {iid[:8]}... disabled.")
        step("Revoking Kratos sessions...")
        # 404 tolerated: the user may simply have no active sessions.
        _api(base, f"/identities/{iid}/sessions", method="DELETE", ok_statuses=(404,))
        ok("Kratos sessions revoked.")
    step("Revoking Hydra consent sessions...")
    # Separate forward to the Hydra admin API (distinct local port).
    with _port_forward(svc="hydra-admin", local_port=14445, remote_port=4445) as hydra_base:
        _api(hydra_base, f"/oauth2/auth/sessions/consent?subject={iid}&all=true",
             method="DELETE", prefix="/admin", ok_statuses=(404,))
    ok("Hydra consent sessions revoked.")
    # Clean up Messages Django user and mailbox
    email = identity.get("traits", {}).get("email", "")
    if email:
        _delete_mailbox(email)
        _cleanup_projects_user(email)
    ok(f"Offboarding complete for {iid[:8]}...")
    warn("Existing access tokens expire within ~1h (Hydra TTL).")
    warn("App sessions (docs/people) expire within SESSION_COOKIE_AGE (~1h).")