feat(users): add disable/enable lockout commands; fix table output

- Add cmd_user_disable: disables Kratos identity (state: inactive) and
  revokes all sessions. Provides emergency lockout — user cannot log in
  again; existing Django app sessions expire within SESSION_COOKIE_AGE (1h).

- Add cmd_user_enable: re-enables a previously disabled identity.

- Wire disable/enable as subcommands of 'sunbeam user'.

- Fix cmd_user_list: table() args were swapped and result was not printed.
This commit is contained in:
2026-03-03 18:07:51 +00:00
parent cb5a290b0c
commit c759f2c014
2 changed files with 53 additions and 2 deletions

View File

@@ -95,6 +95,12 @@ def main() -> None:
p_user_recover = user_sub.add_parser("recover", help="Generate recovery link") p_user_recover = user_sub.add_parser("recover", help="Generate recovery link")
p_user_recover.add_argument("target", help="Email or identity ID") p_user_recover.add_argument("target", help="Email or identity ID")
p_user_disable = user_sub.add_parser("disable", help="Disable identity + revoke sessions (lockout)")
p_user_disable.add_argument("target", help="Email or identity ID")
p_user_enable = user_sub.add_parser("enable", help="Re-enable a disabled identity")
p_user_enable.add_argument("target", help="Email or identity ID")
args = parser.parse_args() args = parser.parse_args()
if args.verb is None: if args.verb is None:
@@ -164,7 +170,8 @@ def main() -> None:
elif args.verb == "user": elif args.verb == "user":
from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create, from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create,
cmd_user_delete, cmd_user_recover) cmd_user_delete, cmd_user_recover,
cmd_user_disable, cmd_user_enable)
action = getattr(args, "user_action", None) action = getattr(args, "user_action", None)
if action is None: if action is None:
p_user.print_help() p_user.print_help()
@@ -179,6 +186,10 @@ def main() -> None:
cmd_user_delete(args.target) cmd_user_delete(args.target)
elif action == "recover": elif action == "recover":
cmd_user_recover(args.target) cmd_user_recover(args.target)
elif action == "disable":
cmd_user_disable(args.target)
elif action == "enable":
cmd_user_enable(args.target)
else: else:
parser.print_help() parser.print_help()

View File

@@ -75,7 +75,7 @@ def cmd_user_list(search=""):
display_name = str(name) if name else "" display_name = str(name) if name else ""
rows.append([i["id"][:8] + "...", email, display_name, i.get("state", "active")]) rows.append([i["id"][:8] + "...", email, display_name, i.get("state", "active")])
table(["ID", "Email", "Name", "State"], rows) print(table(rows, ["ID", "Email", "Name", "State"]))
def cmd_user_get(target): def cmd_user_get(target):
@@ -140,3 +140,43 @@ def cmd_user_recover(target):
print(recovery.get("recovery_link", "")) print(recovery.get("recovery_link", ""))
ok("Recovery code (enter on the page above):") ok("Recovery code (enter on the page above):")
print(recovery.get("recovery_code", "")) print(recovery.get("recovery_code", ""))
def cmd_user_disable(target):
"""Disable identity + revoke all Kratos sessions (emergency lockout).
After this:
- No new logins possible.
- Existing Hydra OAuth2 tokens are revoked.
- Django app sessions expire within SESSION_COOKIE_AGE (1h).
"""
step(f"Disabling identity: {target}")
with _port_forward() as base:
identity = _find_identity(base, target)
iid = identity["id"]
_api(base, f"/identities/{iid}", method="PUT", body={
"schema_id": identity["schema_id"],
"traits": identity["traits"],
"state": "inactive",
"metadata_public": identity.get("metadata_public"),
"metadata_admin": identity.get("metadata_admin"),
})
_api(base, f"/identities/{iid}/sessions", method="DELETE")
ok(f"Identity {iid[:8]}... disabled and all Kratos sessions revoked.")
warn("App sessions (docs/people) expire within SESSION_COOKIE_AGE — currently 1h.")
def cmd_user_enable(target):
"""Re-enable a previously disabled identity."""
step(f"Enabling identity: {target}")
with _port_forward() as base:
identity = _find_identity(base, target)
iid = identity["id"]
_api(base, f"/identities/{iid}", method="PUT", body={
"schema_id": identity["schema_id"],
"traits": identity["traits"],
"state": "active",
"metadata_public": identity.get("metadata_public"),
"metadata_admin": identity.get("metadata_admin"),
})
ok(f"Identity {iid[:8]}... re-enabled.")