Add sunbeam check verb with service-level health probes

11 checks across 7 namespaces: gitea version+auth, postgres CNPG
readiness, valkey PONG, openbao sealed state, seaweedfs filer,
kratos health, hydra OIDC discovery, people HTTP (catches 502s),
people API, and livekit. Supports ns and ns/svc scoping.

- checks.py: new module with _http_get (no-redirect opener + mkcert SSL),
  kube_exec-based exec checks, and cmd_check dispatch
- kube.py: add kube_exec() and get_domain() (reads from cluster configmap)
- cli.py: add 'check [target]' verb
- 103 tests, all passing
This commit is contained in:
2026-03-02 21:49:57 +00:00
parent cdc109d728
commit 1573faa0fd
5 changed files with 625 additions and 0 deletions

269
sunbeam/checks.py Normal file
View File

@@ -0,0 +1,269 @@
"""Service-level health checks — functional probes beyond pod readiness."""
import base64
import json
import ssl
import subprocess
import urllib.error
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from sunbeam.kube import get_domain, kube_exec, kube_out, parse_target
from sunbeam.output import ok, step, warn
@dataclass
class CheckResult:
name: str
ns: str
svc: str
passed: bool
detail: str = ""
def _ssl_ctx() -> ssl.SSLContext:
"""Return an SSL context that trusts the mkcert local CA if available."""
ctx = ssl.create_default_context()
try:
r = subprocess.run(["mkcert", "-CAROOT"], capture_output=True, text=True)
if r.returncode == 0:
ca_file = Path(r.stdout.strip()) / "rootCA.pem"
if ca_file.exists():
ctx.load_verify_locations(cafile=str(ca_file))
except FileNotFoundError:
pass
return ctx
def _kube_secret(ns: str, name: str, key: str) -> str:
"""Read a base64-encoded K8s secret value and return the decoded string."""
raw = kube_out("get", "secret", name, "-n", ns, f"-o=jsonpath={{.data.{key}}}")
if not raw:
return ""
try:
return base64.b64decode(raw + "==").decode()
except Exception:
return ""
class _NoRedirect(urllib.request.HTTPRedirectHandler):
"""Prevent urllib from following redirects so we can inspect the status code."""
def redirect_request(self, req, fp, code, msg, headers, newurl):
return None
def _opener(ssl_ctx: ssl.SSLContext) -> urllib.request.OpenerDirector:
return urllib.request.build_opener(
_NoRedirect(),
urllib.request.HTTPSHandler(context=ssl_ctx),
)
def _http_get(url: str, opener: urllib.request.OpenerDirector, *,
headers: dict | None = None, timeout: int = 10) -> tuple[int, bytes]:
"""Return (status_code, body). Redirects are not followed."""
req = urllib.request.Request(url, headers=headers or {})
try:
with opener.open(req, timeout=timeout) as resp:
return resp.status, resp.read()
except urllib.error.HTTPError as e:
return e.code, b""
# ── Individual checks ─────────────────────────────────────────────────────────
def check_gitea_version(domain: str, opener) -> CheckResult:
"""GET /api/v1/version -> JSON with version field."""
url = f"https://src.{domain}/api/v1/version"
try:
status, body = _http_get(url, opener)
if status == 200:
ver = json.loads(body).get("version", "?")
return CheckResult("gitea-version", "devtools", "gitea", True, f"v{ver}")
return CheckResult("gitea-version", "devtools", "gitea", False, f"HTTP {status}")
except urllib.error.URLError as e:
return CheckResult("gitea-version", "devtools", "gitea", False, str(e.reason))
def check_gitea_auth(domain: str, opener) -> CheckResult:
"""GET /api/v1/user with admin credentials -> 200 and login field."""
username = _kube_secret("devtools", "gitea-admin-credentials", "admin-username") or "gitea_admin"
password = _kube_secret("devtools", "gitea-admin-credentials", "admin-password")
if not password:
return CheckResult("gitea-auth", "devtools", "gitea", False,
"admin-password not found in secret")
creds = base64.b64encode(f"{username}:{password}".encode()).decode()
url = f"https://src.{domain}/api/v1/user"
try:
status, body = _http_get(url, opener, headers={"Authorization": f"Basic {creds}"})
if status == 200:
login = json.loads(body).get("login", "?")
return CheckResult("gitea-auth", "devtools", "gitea", True, f"user={login}")
return CheckResult("gitea-auth", "devtools", "gitea", False, f"HTTP {status}")
except urllib.error.URLError as e:
return CheckResult("gitea-auth", "devtools", "gitea", False, str(e.reason))
def check_postgres(domain: str, opener) -> CheckResult:
"""CNPG Cluster readyInstances == instances."""
ready = kube_out("get", "cluster", "postgres", "-n", "data",
"-o=jsonpath={.status.readyInstances}")
total = kube_out("get", "cluster", "postgres", "-n", "data",
"-o=jsonpath={.status.instances}")
if ready and total and ready == total:
return CheckResult("postgres", "data", "postgres", True, f"{ready}/{total} ready")
detail = (f"{ready or '?'}/{total or '?'} ready"
if (ready or total) else "cluster not found")
return CheckResult("postgres", "data", "postgres", False, detail)
def check_valkey(domain: str, opener) -> CheckResult:
"""kubectl exec valkey pod -- valkey-cli ping -> PONG."""
pod = kube_out("get", "pods", "-n", "data", "-l", "app=valkey",
"--no-headers", "-o=custom-columns=NAME:.metadata.name")
pod = pod.splitlines()[0].strip() if pod else ""
if not pod:
return CheckResult("valkey", "data", "valkey", False, "no valkey pod")
_, out = kube_exec("data", pod, "valkey-cli", "ping")
return CheckResult("valkey", "data", "valkey", out == "PONG", out or "no response")
def check_openbao(domain: str, opener) -> CheckResult:
"""kubectl exec openbao-0 -- bao status -format=json -> initialized + unsealed."""
rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json")
if not out:
return CheckResult("openbao", "data", "openbao", False, "no response")
try:
data = json.loads(out)
init = data.get("initialized", False)
sealed = data.get("sealed", True)
return CheckResult("openbao", "data", "openbao", init and not sealed,
f"init={init}, sealed={sealed}")
except json.JSONDecodeError:
return CheckResult("openbao", "data", "openbao", False, out[:80])
def check_seaweedfs(domain: str, opener) -> CheckResult:
"""kubectl exec seaweedfs-filer pod -- wget /dir/status -> filer responding."""
pod = kube_out("get", "pods", "-n", "storage", "-l", "app=seaweedfs-filer",
"--no-headers", "-o=custom-columns=NAME:.metadata.name")
pod = pod.splitlines()[0].strip() if pod else ""
if not pod:
return CheckResult("seaweedfs", "storage", "seaweedfs", False, "no seaweedfs-filer pod")
rc, out = kube_exec("storage", pod, "wget", "-qO-", "http://localhost:8888/dir/status")
if rc == 0 and out:
return CheckResult("seaweedfs", "storage", "seaweedfs", True, "filer responding")
return CheckResult("seaweedfs", "storage", "seaweedfs", False, "filer not responding")
def check_kratos(domain: str, opener) -> CheckResult:
"""GET /kratos/health/ready -> 200."""
url = f"https://auth.{domain}/kratos/health/ready"
try:
status, body = _http_get(url, opener)
ok_flag = status == 200
detail = f"HTTP {status}"
if not ok_flag and body:
detail += f": {body.decode(errors='replace')[:80]}"
return CheckResult("kratos", "ory", "kratos", ok_flag, detail)
except urllib.error.URLError as e:
return CheckResult("kratos", "ory", "kratos", False, str(e.reason))
def check_hydra_oidc(domain: str, opener) -> CheckResult:
"""GET /.well-known/openid-configuration -> 200 with issuer field."""
url = f"https://auth.{domain}/.well-known/openid-configuration"
try:
status, body = _http_get(url, opener)
if status == 200:
issuer = json.loads(body).get("issuer", "?")
return CheckResult("hydra-oidc", "ory", "hydra", True, f"issuer={issuer}")
return CheckResult("hydra-oidc", "ory", "hydra", False, f"HTTP {status}")
except urllib.error.URLError as e:
return CheckResult("hydra-oidc", "ory", "hydra", False, str(e.reason))
def check_people(domain: str, opener) -> CheckResult:
"""GET https://people.{domain}/ -> any response < 500 (302 to OIDC is fine)."""
url = f"https://people.{domain}/"
try:
status, _ = _http_get(url, opener)
return CheckResult("people", "lasuite", "people", status < 500, f"HTTP {status}")
except urllib.error.URLError as e:
return CheckResult("people", "lasuite", "people", False, str(e.reason))
def check_people_api(domain: str, opener) -> CheckResult:
"""GET /api/v1.0/config/ -> any response < 500 (401 auth-required is fine)."""
url = f"https://people.{domain}/api/v1.0/config/"
try:
status, _ = _http_get(url, opener)
return CheckResult("people-api", "lasuite", "people", status < 500, f"HTTP {status}")
except urllib.error.URLError as e:
return CheckResult("people-api", "lasuite", "people", False, str(e.reason))
def check_livekit(domain: str, opener) -> CheckResult:
"""kubectl exec livekit-server pod -- wget localhost:7880/ -> rc 0."""
pod = kube_out("get", "pods", "-n", "media", "-l", "app.kubernetes.io/name=livekit-server",
"--no-headers", "-o=custom-columns=NAME:.metadata.name")
pod = pod.splitlines()[0].strip() if pod else ""
if not pod:
return CheckResult("livekit", "media", "livekit", False, "no livekit pod")
rc, _ = kube_exec("media", pod, "wget", "-qO-", "http://localhost:7880/")
if rc == 0:
return CheckResult("livekit", "media", "livekit", True, "server responding")
return CheckResult("livekit", "media", "livekit", False, "server not responding")
# ── Check registry ────────────────────────────────────────────────────────────
CHECKS: list[tuple[Any, str, str]] = [
(check_gitea_version, "devtools", "gitea"),
(check_gitea_auth, "devtools", "gitea"),
(check_postgres, "data", "postgres"),
(check_valkey, "data", "valkey"),
(check_openbao, "data", "openbao"),
(check_seaweedfs, "storage", "seaweedfs"),
(check_kratos, "ory", "kratos"),
(check_hydra_oidc, "ory", "hydra"),
(check_people, "lasuite", "people"),
(check_people_api, "lasuite", "people"),
(check_livekit, "media", "livekit"),
]
def cmd_check(target: str | None) -> None:
"""Run service-level health checks, optionally scoped to a namespace or service."""
step("Service health checks...")
domain = get_domain()
ssl_ctx = _ssl_ctx()
op = _opener(ssl_ctx)
ns_filter, svc_filter = parse_target(target) if target else (None, None)
fns = [
fn for fn, ns, svc in CHECKS
if (ns_filter is None or ns == ns_filter)
and (svc_filter is None or svc == svc_filter)
]
if not fns:
warn(f"No checks match target: {target}")
return
results = []
for fn in fns:
r = fn(domain, op)
results.append(r)
icon = "\u2713" if r.passed else "\u2717"
detail = f" ({r.detail})" if r.detail else ""
print(f" {icon} {r.ns}/{r.svc} [{r.name}]{detail}")
print()
failed = [r for r in results if not r.passed]
if failed:
warn(f"{len(failed)} check(s) failed.")
else:
ok(f"All {len(results)} check(s) passed.")

View File

@@ -53,6 +53,11 @@ def main() -> None:
p_build.add_argument("what", choices=["proxy"], p_build.add_argument("what", choices=["proxy"],
help="What to build (proxy)") help="What to build (proxy)")
# sunbeam check [ns[/name]]
p_check = sub.add_parser("check", help="Functional service health checks")
p_check.add_argument("target", nargs="?", default=None,
help="namespace or namespace/name")
# sunbeam mirror # sunbeam mirror
sub.add_parser("mirror", help="Mirror amd64-only La Suite images") sub.add_parser("mirror", help="Mirror amd64-only La Suite images")
@@ -106,6 +111,10 @@ def main() -> None:
from sunbeam.images import cmd_build from sunbeam.images import cmd_build
cmd_build(args.what) cmd_build(args.what)
elif args.verb == "check":
from sunbeam.checks import cmd_check
cmd_check(args.target)
elif args.verb == "mirror": elif args.verb == "mirror":
from sunbeam.images import cmd_mirror from sunbeam.images import cmd_mirror
cmd_mirror() cmd_mirror()

View File

@@ -93,6 +93,28 @@ def create_secret(ns: str, name: str, **literals) -> None:
"--field-manager=sunbeam", "-f", "-", input=manifest) "--field-manager=sunbeam", "-f", "-", input=manifest)
def kube_exec(ns: str, pod: str, *cmd: str) -> tuple[int, str]:
"""Run a command inside a pod. Returns (returncode, stdout)."""
r = run_tool("kubectl", "--context=sunbeam", "exec", "-n", ns, pod,
"--", *cmd,
capture_output=True, text=True, check=False)
return r.returncode, r.stdout.strip()
def get_domain() -> str:
"""Discover the active domain from cluster state.
Reads a known substituted configmap value; falls back to the Lima VM IP.
"""
raw = kube_out("get", "configmap", "lasuite-oidc-provider", "-n", "lasuite",
"-o=jsonpath={.data.OIDC_OP_JWKS_ENDPOINT}")
if raw and "https://auth." in raw:
# e.g. "https://auth.192.168.105.2.sslip.io/.well-known/jwks.json"
return raw.split("https://auth.")[1].split("/")[0]
ip = get_lima_ip()
return f"{ip}.sslip.io"
def kustomize_build(overlay: Path, domain: str) -> str: def kustomize_build(overlay: Path, domain: str) -> str:
"""Run kustomize build --enable-helm and apply domain substitution.""" """Run kustomize build --enable-helm and apply domain substitution."""
r = run_tool( r = run_tool(

View File

@@ -0,0 +1,284 @@
"""Tests for checks.py — service-level health probes."""
import json
import unittest
from unittest.mock import MagicMock, patch
class TestCheckGiteaVersion(unittest.TestCase):
def test_ok_returns_version(self):
body = json.dumps({"version": "1.21.0"}).encode()
with patch("sunbeam.checks._http_get", return_value=(200, body)):
from sunbeam import checks
r = checks.check_gitea_version("testdomain", None)
self.assertTrue(r.passed)
self.assertIn("1.21.0", r.detail)
def test_non_200_fails(self):
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
from sunbeam import checks
r = checks.check_gitea_version("testdomain", None)
self.assertFalse(r.passed)
self.assertIn("502", r.detail)
def test_connection_error_fails(self):
import urllib.error
with patch("sunbeam.checks._http_get", side_effect=urllib.error.URLError("refused")):
from sunbeam import checks
r = checks.check_gitea_version("testdomain", None)
self.assertFalse(r.passed)
class TestCheckGiteaAuth(unittest.TestCase):
def _secret(self, key, val):
def side_effect(ns, name, k):
return val if k == key else "gitea_admin"
return side_effect
def test_ok_returns_login(self):
body = json.dumps({"login": "gitea_admin"}).encode()
with patch("sunbeam.checks._kube_secret",
side_effect=self._secret("admin-password", "hunter2")):
with patch("sunbeam.checks._http_get", return_value=(200, body)):
from sunbeam import checks
r = checks.check_gitea_auth("testdomain", None)
self.assertTrue(r.passed)
self.assertIn("gitea_admin", r.detail)
def test_missing_password_fails(self):
with patch("sunbeam.checks._kube_secret", return_value=""):
from sunbeam import checks
r = checks.check_gitea_auth("testdomain", None)
self.assertFalse(r.passed)
self.assertIn("secret", r.detail)
def test_non_200_fails(self):
with patch("sunbeam.checks._kube_secret",
side_effect=self._secret("admin-password", "hunter2")):
with patch("sunbeam.checks._http_get", return_value=(401, b"")):
from sunbeam import checks
r = checks.check_gitea_auth("testdomain", None)
self.assertFalse(r.passed)
class TestCheckPostgres(unittest.TestCase):
def test_ready_passes(self):
with patch("sunbeam.checks.kube_out", side_effect=["1", "1"]):
from sunbeam import checks
r = checks.check_postgres("testdomain", None)
self.assertTrue(r.passed)
self.assertIn("1/1", r.detail)
def test_not_ready_fails(self):
with patch("sunbeam.checks.kube_out", side_effect=["0", "1"]):
from sunbeam import checks
r = checks.check_postgres("testdomain", None)
self.assertFalse(r.passed)
def test_cluster_not_found_fails(self):
with patch("sunbeam.checks.kube_out", return_value=""):
from sunbeam import checks
r = checks.check_postgres("testdomain", None)
self.assertFalse(r.passed)
class TestCheckValkey(unittest.TestCase):
def test_pong_passes(self):
with patch("sunbeam.checks.kube_out", return_value="valkey-abc"):
with patch("sunbeam.checks.kube_exec", return_value=(0, "PONG")):
from sunbeam import checks
r = checks.check_valkey("testdomain", None)
self.assertTrue(r.passed)
def test_no_pod_fails(self):
with patch("sunbeam.checks.kube_out", return_value=""):
from sunbeam import checks
r = checks.check_valkey("testdomain", None)
self.assertFalse(r.passed)
def test_no_pong_fails(self):
with patch("sunbeam.checks.kube_out", return_value="valkey-abc"):
with patch("sunbeam.checks.kube_exec", return_value=(1, "")):
from sunbeam import checks
r = checks.check_valkey("testdomain", None)
self.assertFalse(r.passed)
class TestCheckOpenbao(unittest.TestCase):
def test_unsealed_passes(self):
out = json.dumps({"initialized": True, "sealed": False})
with patch("sunbeam.checks.kube_exec", return_value=(0, out)):
from sunbeam import checks
r = checks.check_openbao("testdomain", None)
self.assertTrue(r.passed)
def test_sealed_fails(self):
out = json.dumps({"initialized": True, "sealed": True})
with patch("sunbeam.checks.kube_exec", return_value=(2, out)):
from sunbeam import checks
r = checks.check_openbao("testdomain", None)
self.assertFalse(r.passed)
def test_no_output_fails(self):
with patch("sunbeam.checks.kube_exec", return_value=(1, "")):
from sunbeam import checks
r = checks.check_openbao("testdomain", None)
self.assertFalse(r.passed)
class TestCheckSeaweedfs(unittest.TestCase):
def test_responding_passes(self):
with patch("sunbeam.checks.kube_out", return_value="seaweedfs-filer-abc"):
with patch("sunbeam.checks.kube_exec", return_value=(0, "filer status data")):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertTrue(r.passed)
def test_no_pod_fails(self):
with patch("sunbeam.checks.kube_out", return_value=""):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertFalse(r.passed)
def test_exec_fails(self):
with patch("sunbeam.checks.kube_out", return_value="seaweedfs-filer-abc"):
with patch("sunbeam.checks.kube_exec", return_value=(1, "")):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertFalse(r.passed)
class TestCheckKratos(unittest.TestCase):
def test_200_passes(self):
with patch("sunbeam.checks._http_get", return_value=(200, b"")):
from sunbeam import checks
r = checks.check_kratos("testdomain", None)
self.assertTrue(r.passed)
def test_503_fails(self):
with patch("sunbeam.checks._http_get", return_value=(503, b"not ready")):
from sunbeam import checks
r = checks.check_kratos("testdomain", None)
self.assertFalse(r.passed)
self.assertIn("503", r.detail)
class TestCheckHydraOidc(unittest.TestCase):
def test_200_with_issuer_passes(self):
body = json.dumps({"issuer": "https://auth.testdomain/"}).encode()
with patch("sunbeam.checks._http_get", return_value=(200, body)):
from sunbeam import checks
r = checks.check_hydra_oidc("testdomain", None)
self.assertTrue(r.passed)
self.assertIn("testdomain", r.detail)
def test_502_fails(self):
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
from sunbeam import checks
r = checks.check_hydra_oidc("testdomain", None)
self.assertFalse(r.passed)
class TestCheckPeople(unittest.TestCase):
def test_200_passes(self):
with patch("sunbeam.checks._http_get", return_value=(200, b"<html>")):
from sunbeam import checks
r = checks.check_people("testdomain", None)
self.assertTrue(r.passed)
def test_302_redirect_passes(self):
with patch("sunbeam.checks._http_get", return_value=(302, b"")):
from sunbeam import checks
r = checks.check_people("testdomain", None)
self.assertTrue(r.passed)
self.assertIn("302", r.detail)
def test_502_fails(self):
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
from sunbeam import checks
r = checks.check_people("testdomain", None)
self.assertFalse(r.passed)
self.assertIn("502", r.detail)
class TestCheckPeopleApi(unittest.TestCase):
def test_200_passes(self):
with patch("sunbeam.checks._http_get", return_value=(200, b"{}")):
from sunbeam import checks
r = checks.check_people_api("testdomain", None)
self.assertTrue(r.passed)
def test_401_auth_required_passes(self):
with patch("sunbeam.checks._http_get", return_value=(401, b"")):
from sunbeam import checks
r = checks.check_people_api("testdomain", None)
self.assertTrue(r.passed)
def test_502_fails(self):
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
from sunbeam import checks
r = checks.check_people_api("testdomain", None)
self.assertFalse(r.passed)
class TestCheckLivekit(unittest.TestCase):
def test_responding_passes(self):
with patch("sunbeam.checks.kube_out", return_value="livekit-server-abc"):
with patch("sunbeam.checks.kube_exec", return_value=(0, "")):
from sunbeam import checks
r = checks.check_livekit("testdomain", None)
self.assertTrue(r.passed)
def test_no_pod_fails(self):
with patch("sunbeam.checks.kube_out", return_value=""):
from sunbeam import checks
r = checks.check_livekit("testdomain", None)
self.assertFalse(r.passed)
def test_exec_fails(self):
with patch("sunbeam.checks.kube_out", return_value="livekit-server-abc"):
with patch("sunbeam.checks.kube_exec", return_value=(1, "")):
from sunbeam import checks
r = checks.check_livekit("testdomain", None)
self.assertFalse(r.passed)
class TestCmdCheck(unittest.TestCase):
def _run(self, target, mock_list):
from sunbeam import checks
result = checks.CheckResult("x", "ns", "svc", True, "ok")
fns = [MagicMock(return_value=result) for _ in mock_list]
patched = list(zip(fns, [ns for _, ns, _ in mock_list], [s for _, _, s in mock_list]))
with patch("sunbeam.checks.get_domain", return_value="td"), \
patch("sunbeam.checks._ssl_ctx", return_value=None), \
patch("sunbeam.checks._opener", return_value=None), \
patch.object(checks, "CHECKS", patched):
checks.cmd_check(target)
return fns
def test_no_target_runs_all(self):
mock_list = [("unused", "devtools", "gitea"), ("unused", "data", "postgres")]
fns = self._run(None, mock_list)
fns[0].assert_called_once_with("td", None)
fns[1].assert_called_once_with("td", None)
def test_ns_filter_skips_other_namespaces(self):
mock_list = [("unused", "devtools", "gitea"), ("unused", "data", "postgres")]
fns = self._run("devtools", mock_list)
fns[0].assert_called_once()
fns[1].assert_not_called()
def test_svc_filter(self):
mock_list = [("unused", "ory", "kratos"), ("unused", "ory", "hydra")]
fns = self._run("ory/kratos", mock_list)
fns[0].assert_called_once()
fns[1].assert_not_called()
def test_no_match_warns(self):
from sunbeam import checks
with patch("sunbeam.checks.get_domain", return_value="td"), \
patch("sunbeam.checks._ssl_ctx", return_value=None), \
patch("sunbeam.checks._opener", return_value=None), \
patch.object(checks, "CHECKS", []), \
patch("sunbeam.checks.warn") as mock_warn:
checks.cmd_check("nonexistent")
mock_warn.assert_called_once()

View File

@@ -31,6 +31,8 @@ class TestArgParsing(unittest.TestCase):
p_build.add_argument("what", choices=["proxy"]) p_build.add_argument("what", choices=["proxy"])
sub.add_parser("mirror") sub.add_parser("mirror")
sub.add_parser("bootstrap") sub.add_parser("bootstrap")
p_check = sub.add_parser("check")
p_check.add_argument("target", nargs="?", default=None)
return parser.parse_args(argv) return parser.parse_args(argv)
def test_up(self): def test_up(self):
@@ -83,6 +85,21 @@ class TestArgParsing(unittest.TestCase):
with self.assertRaises(SystemExit): with self.assertRaises(SystemExit):
self._parse(["get", "ory/kratos-abc", "-o", "toml"]) self._parse(["get", "ory/kratos-abc", "-o", "toml"])
def test_check_no_target(self):
args = self._parse(["check"])
self.assertEqual(args.verb, "check")
self.assertIsNone(args.target)
def test_check_with_namespace(self):
args = self._parse(["check", "devtools"])
self.assertEqual(args.verb, "check")
self.assertEqual(args.target, "devtools")
def test_check_with_service(self):
args = self._parse(["check", "lasuite/people"])
self.assertEqual(args.verb, "check")
self.assertEqual(args.target, "lasuite/people")
def test_no_args_verb_is_none(self): def test_no_args_verb_is_none(self):
args = self._parse([]) args = self._parse([])
self.assertIsNone(args.verb) self.assertIsNone(args.verb)
@@ -189,3 +206,27 @@ class TestCliDispatch(unittest.TestCase):
except SystemExit: except SystemExit:
pass pass
mock_build.assert_called_once_with("proxy") mock_build.assert_called_once_with("proxy")
def test_check_no_target(self):
mock_check = MagicMock()
with patch.object(sys, "argv", ["sunbeam", "check"]):
with patch.dict("sys.modules", {"sunbeam.checks": MagicMock(cmd_check=mock_check)}):
import importlib, sunbeam.cli as cli_mod
importlib.reload(cli_mod)
try:
cli_mod.main()
except SystemExit:
pass
mock_check.assert_called_once_with(None)
def test_check_with_target(self):
mock_check = MagicMock()
with patch.object(sys, "argv", ["sunbeam", "check", "lasuite/people"]):
with patch.dict("sys.modules", {"sunbeam.checks": MagicMock(cmd_check=mock_check)}):
import importlib, sunbeam.cli as cli_mod
importlib.reload(cli_mod)
try:
cli_mod.main()
except SystemExit:
pass
mock_check.assert_called_once_with("lasuite/people")