diff --git a/sunbeam/cli.py b/sunbeam/cli.py index 7b8eadc..8b75461 100644 --- a/sunbeam/cli.py +++ b/sunbeam/cli.py @@ -95,6 +95,12 @@ def main() -> None: p_user_recover = user_sub.add_parser("recover", help="Generate recovery link") p_user_recover.add_argument("target", help="Email or identity ID") + p_user_disable = user_sub.add_parser("disable", help="Disable identity + revoke sessions (lockout)") + p_user_disable.add_argument("target", help="Email or identity ID") + + p_user_enable = user_sub.add_parser("enable", help="Re-enable a disabled identity") + p_user_enable.add_argument("target", help="Email or identity ID") + args = parser.parse_args() if args.verb is None: @@ -164,7 +170,8 @@ def main() -> None: elif args.verb == "user": from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create, - cmd_user_delete, cmd_user_recover) + cmd_user_delete, cmd_user_recover, + cmd_user_disable, cmd_user_enable) action = getattr(args, "user_action", None) if action is None: p_user.print_help() @@ -179,6 +186,10 @@ def main() -> None: cmd_user_delete(args.target) elif action == "recover": cmd_user_recover(args.target) + elif action == "disable": + cmd_user_disable(args.target) + elif action == "enable": + cmd_user_enable(args.target) else: parser.print_help() diff --git a/sunbeam/users.py b/sunbeam/users.py index 0a2cde6..d896b94 100644 --- a/sunbeam/users.py +++ b/sunbeam/users.py @@ -75,7 +75,7 @@ def cmd_user_list(search=""): display_name = str(name) if name else "" rows.append([i["id"][:8] + "...", email, display_name, i.get("state", "active")]) - table(["ID", "Email", "Name", "State"], rows) + print(table(rows, ["ID", "Email", "Name", "State"])) def cmd_user_get(target): @@ -140,3 +140,43 @@ def cmd_user_recover(target): print(recovery.get("recovery_link", "")) ok("Recovery code (enter on the page above):") print(recovery.get("recovery_code", "")) + + +def cmd_user_disable(target): + """Disable identity + revoke all Kratos sessions (emergency lockout). + + After this: + - No new logins possible. + - Existing Hydra OAuth2 tokens are revoked. + - Django app sessions expire within SESSION_COOKIE_AGE (1h). + """ + step(f"Disabling identity: {target}") + with _port_forward() as base: + identity = _find_identity(base, target) + iid = identity["id"] + _api(base, f"/identities/{iid}", method="PUT", body={ + "schema_id": identity["schema_id"], + "traits": identity["traits"], + "state": "inactive", + "metadata_public": identity.get("metadata_public"), + "metadata_admin": identity.get("metadata_admin"), + }) + _api(base, f"/identities/{iid}/sessions", method="DELETE") + ok(f"Identity {iid[:8]}... disabled and all Kratos sessions revoked.") + warn("App sessions (docs/people) expire within SESSION_COOKIE_AGE — currently 1h.") + + +def cmd_user_enable(target): + """Re-enable a previously disabled identity.""" + step(f"Enabling identity: {target}") + with _port_forward() as base: + identity = _find_identity(base, target) + iid = identity["id"] + _api(base, f"/identities/{iid}", method="PUT", body={ + "schema_id": identity["schema_id"], + "traits": identity["traits"], + "state": "active", + "metadata_public": identity.get("metadata_public"), + "metadata_admin": identity.get("metadata_admin"), + }) + ok(f"Identity {iid[:8]}... re-enabled.")