feat(users): add disable/enable lockout commands; fix table output
- Add cmd_user_disable: disables Kratos identity (state: inactive) and revokes all sessions. Provides emergency lockout — user cannot log in again; existing Django app sessions expire within SESSION_COOKIE_AGE (1h). - Add cmd_user_enable: re-enables a previously disabled identity. - Wire disable/enable as subcommands of 'sunbeam user'. - Fix cmd_user_list: table() args were swapped and result was not printed.
This commit is contained in:
@@ -95,6 +95,12 @@ def main() -> None:
|
|||||||
p_user_recover = user_sub.add_parser("recover", help="Generate recovery link")
|
p_user_recover = user_sub.add_parser("recover", help="Generate recovery link")
|
||||||
p_user_recover.add_argument("target", help="Email or identity ID")
|
p_user_recover.add_argument("target", help="Email or identity ID")
|
||||||
|
|
||||||
|
p_user_disable = user_sub.add_parser("disable", help="Disable identity + revoke sessions (lockout)")
|
||||||
|
p_user_disable.add_argument("target", help="Email or identity ID")
|
||||||
|
|
||||||
|
p_user_enable = user_sub.add_parser("enable", help="Re-enable a disabled identity")
|
||||||
|
p_user_enable.add_argument("target", help="Email or identity ID")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
if args.verb is None:
|
if args.verb is None:
|
||||||
@@ -164,7 +170,8 @@ def main() -> None:
|
|||||||
|
|
||||||
elif args.verb == "user":
|
elif args.verb == "user":
|
||||||
from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create,
|
from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create,
|
||||||
cmd_user_delete, cmd_user_recover)
|
cmd_user_delete, cmd_user_recover,
|
||||||
|
cmd_user_disable, cmd_user_enable)
|
||||||
action = getattr(args, "user_action", None)
|
action = getattr(args, "user_action", None)
|
||||||
if action is None:
|
if action is None:
|
||||||
p_user.print_help()
|
p_user.print_help()
|
||||||
@@ -179,6 +186,10 @@ def main() -> None:
|
|||||||
cmd_user_delete(args.target)
|
cmd_user_delete(args.target)
|
||||||
elif action == "recover":
|
elif action == "recover":
|
||||||
cmd_user_recover(args.target)
|
cmd_user_recover(args.target)
|
||||||
|
elif action == "disable":
|
||||||
|
cmd_user_disable(args.target)
|
||||||
|
elif action == "enable":
|
||||||
|
cmd_user_enable(args.target)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
parser.print_help()
|
parser.print_help()
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ def cmd_user_list(search=""):
|
|||||||
display_name = str(name) if name else ""
|
display_name = str(name) if name else ""
|
||||||
rows.append([i["id"][:8] + "...", email, display_name, i.get("state", "active")])
|
rows.append([i["id"][:8] + "...", email, display_name, i.get("state", "active")])
|
||||||
|
|
||||||
table(["ID", "Email", "Name", "State"], rows)
|
print(table(rows, ["ID", "Email", "Name", "State"]))
|
||||||
|
|
||||||
|
|
||||||
def cmd_user_get(target):
|
def cmd_user_get(target):
|
||||||
@@ -140,3 +140,43 @@ def cmd_user_recover(target):
|
|||||||
print(recovery.get("recovery_link", ""))
|
print(recovery.get("recovery_link", ""))
|
||||||
ok("Recovery code (enter on the page above):")
|
ok("Recovery code (enter on the page above):")
|
||||||
print(recovery.get("recovery_code", ""))
|
print(recovery.get("recovery_code", ""))
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_user_disable(target):
|
||||||
|
"""Disable identity + revoke all Kratos sessions (emergency lockout).
|
||||||
|
|
||||||
|
After this:
|
||||||
|
- No new logins possible.
|
||||||
|
- Existing Hydra OAuth2 tokens are revoked.
|
||||||
|
- Django app sessions expire within SESSION_COOKIE_AGE (1h).
|
||||||
|
"""
|
||||||
|
step(f"Disabling identity: {target}")
|
||||||
|
with _port_forward() as base:
|
||||||
|
identity = _find_identity(base, target)
|
||||||
|
iid = identity["id"]
|
||||||
|
_api(base, f"/identities/{iid}", method="PUT", body={
|
||||||
|
"schema_id": identity["schema_id"],
|
||||||
|
"traits": identity["traits"],
|
||||||
|
"state": "inactive",
|
||||||
|
"metadata_public": identity.get("metadata_public"),
|
||||||
|
"metadata_admin": identity.get("metadata_admin"),
|
||||||
|
})
|
||||||
|
_api(base, f"/identities/{iid}/sessions", method="DELETE")
|
||||||
|
ok(f"Identity {iid[:8]}... disabled and all Kratos sessions revoked.")
|
||||||
|
warn("App sessions (docs/people) expire within SESSION_COOKIE_AGE — currently 1h.")
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_user_enable(target):
|
||||||
|
"""Re-enable a previously disabled identity."""
|
||||||
|
step(f"Enabling identity: {target}")
|
||||||
|
with _port_forward() as base:
|
||||||
|
identity = _find_identity(base, target)
|
||||||
|
iid = identity["id"]
|
||||||
|
_api(base, f"/identities/{iid}", method="PUT", body={
|
||||||
|
"schema_id": identity["schema_id"],
|
||||||
|
"traits": identity["traits"],
|
||||||
|
"state": "active",
|
||||||
|
"metadata_public": identity.get("metadata_public"),
|
||||||
|
"metadata_admin": identity.get("metadata_admin"),
|
||||||
|
})
|
||||||
|
ok(f"Identity {iid[:8]}... re-enabled.")
|
||||||
|
|||||||
Reference in New Issue
Block a user