diff --git a/sunbeam/checks.py b/sunbeam/checks.py new file mode 100644 index 0000000..dca3897 --- /dev/null +++ b/sunbeam/checks.py @@ -0,0 +1,269 @@ +"""Service-level health checks — functional probes beyond pod readiness.""" +import base64 +import json +import ssl +import subprocess +import urllib.error +import urllib.request +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from sunbeam.kube import get_domain, kube_exec, kube_out, parse_target +from sunbeam.output import ok, step, warn + + +@dataclass +class CheckResult: + name: str + ns: str + svc: str + passed: bool + detail: str = "" + + +def _ssl_ctx() -> ssl.SSLContext: + """Return an SSL context that trusts the mkcert local CA if available.""" + ctx = ssl.create_default_context() + try: + r = subprocess.run(["mkcert", "-CAROOT"], capture_output=True, text=True) + if r.returncode == 0: + ca_file = Path(r.stdout.strip()) / "rootCA.pem" + if ca_file.exists(): + ctx.load_verify_locations(cafile=str(ca_file)) + except FileNotFoundError: + pass + return ctx + + +def _kube_secret(ns: str, name: str, key: str) -> str: + """Read a base64-encoded K8s secret value and return the decoded string.""" + raw = kube_out("get", "secret", name, "-n", ns, f"-o=jsonpath={{.data.{key}}}") + if not raw: + return "" + try: + return base64.b64decode(raw + "==").decode() + except Exception: + return "" + + +class _NoRedirect(urllib.request.HTTPRedirectHandler): + """Prevent urllib from following redirects so we can inspect the status code.""" + def redirect_request(self, req, fp, code, msg, headers, newurl): + return None + + +def _opener(ssl_ctx: ssl.SSLContext) -> urllib.request.OpenerDirector: + return urllib.request.build_opener( + _NoRedirect(), + urllib.request.HTTPSHandler(context=ssl_ctx), + ) + + +def _http_get(url: str, opener: urllib.request.OpenerDirector, *, + headers: dict | None = None, timeout: int = 10) -> tuple[int, bytes]: + """Return (status_code, body). Redirects are not followed.""" + req = urllib.request.Request(url, headers=headers or {}) + try: + with opener.open(req, timeout=timeout) as resp: + return resp.status, resp.read() + except urllib.error.HTTPError as e: + return e.code, b"" + + +# ── Individual checks ───────────────────────────────────────────────────────── + +def check_gitea_version(domain: str, opener) -> CheckResult: + """GET /api/v1/version -> JSON with version field.""" + url = f"https://src.{domain}/api/v1/version" + try: + status, body = _http_get(url, opener) + if status == 200: + ver = json.loads(body).get("version", "?") + return CheckResult("gitea-version", "devtools", "gitea", True, f"v{ver}") + return CheckResult("gitea-version", "devtools", "gitea", False, f"HTTP {status}") + except urllib.error.URLError as e: + return CheckResult("gitea-version", "devtools", "gitea", False, str(e.reason)) + + +def check_gitea_auth(domain: str, opener) -> CheckResult: + """GET /api/v1/user with admin credentials -> 200 and login field.""" + username = _kube_secret("devtools", "gitea-admin-credentials", "admin-username") or "gitea_admin" + password = _kube_secret("devtools", "gitea-admin-credentials", "admin-password") + if not password: + return CheckResult("gitea-auth", "devtools", "gitea", False, + "admin-password not found in secret") + creds = base64.b64encode(f"{username}:{password}".encode()).decode() + url = f"https://src.{domain}/api/v1/user" + try: + status, body = _http_get(url, opener, headers={"Authorization": f"Basic {creds}"}) + if status == 200: + login = json.loads(body).get("login", "?") + return CheckResult("gitea-auth", "devtools", "gitea", True, f"user={login}") + return CheckResult("gitea-auth", "devtools", "gitea", False, f"HTTP {status}") + except urllib.error.URLError as e: + return CheckResult("gitea-auth", "devtools", "gitea", False, str(e.reason)) + + +def check_postgres(domain: str, opener) -> CheckResult: + """CNPG Cluster readyInstances == instances.""" + ready = kube_out("get", "cluster", "postgres", "-n", "data", + "-o=jsonpath={.status.readyInstances}") + total = kube_out("get", "cluster", "postgres", "-n", "data", + "-o=jsonpath={.status.instances}") + if ready and total and ready == total: + return CheckResult("postgres", "data", "postgres", True, f"{ready}/{total} ready") + detail = (f"{ready or '?'}/{total or '?'} ready" + if (ready or total) else "cluster not found") + return CheckResult("postgres", "data", "postgres", False, detail) + + +def check_valkey(domain: str, opener) -> CheckResult: + """kubectl exec valkey pod -- valkey-cli ping -> PONG.""" + pod = kube_out("get", "pods", "-n", "data", "-l", "app=valkey", + "--no-headers", "-o=custom-columns=NAME:.metadata.name") + pod = pod.splitlines()[0].strip() if pod else "" + if not pod: + return CheckResult("valkey", "data", "valkey", False, "no valkey pod") + _, out = kube_exec("data", pod, "valkey-cli", "ping") + return CheckResult("valkey", "data", "valkey", out == "PONG", out or "no response") + + +def check_openbao(domain: str, opener) -> CheckResult: + """kubectl exec openbao-0 -- bao status -format=json -> initialized + unsealed.""" + rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json") + if not out: + return CheckResult("openbao", "data", "openbao", False, "no response") + try: + data = json.loads(out) + init = data.get("initialized", False) + sealed = data.get("sealed", True) + return CheckResult("openbao", "data", "openbao", init and not sealed, + f"init={init}, sealed={sealed}") + except json.JSONDecodeError: + return CheckResult("openbao", "data", "openbao", False, out[:80]) + + +def check_seaweedfs(domain: str, opener) -> CheckResult: + """kubectl exec seaweedfs-filer pod -- wget /dir/status -> filer responding.""" + pod = kube_out("get", "pods", "-n", "storage", "-l", "app=seaweedfs-filer", + "--no-headers", "-o=custom-columns=NAME:.metadata.name") + pod = pod.splitlines()[0].strip() if pod else "" + if not pod: + return CheckResult("seaweedfs", "storage", "seaweedfs", False, "no seaweedfs-filer pod") + rc, out = kube_exec("storage", pod, "wget", "-qO-", "http://localhost:8888/dir/status") + if rc == 0 and out: + return CheckResult("seaweedfs", "storage", "seaweedfs", True, "filer responding") + return CheckResult("seaweedfs", "storage", "seaweedfs", False, "filer not responding") + + +def check_kratos(domain: str, opener) -> CheckResult: + """GET /kratos/health/ready -> 200.""" + url = f"https://auth.{domain}/kratos/health/ready" + try: + status, body = _http_get(url, opener) + ok_flag = status == 200 + detail = f"HTTP {status}" + if not ok_flag and body: + detail += f": {body.decode(errors='replace')[:80]}" + return CheckResult("kratos", "ory", "kratos", ok_flag, detail) + except urllib.error.URLError as e: + return CheckResult("kratos", "ory", "kratos", False, str(e.reason)) + + +def check_hydra_oidc(domain: str, opener) -> CheckResult: + """GET /.well-known/openid-configuration -> 200 with issuer field.""" + url = f"https://auth.{domain}/.well-known/openid-configuration" + try: + status, body = _http_get(url, opener) + if status == 200: + issuer = json.loads(body).get("issuer", "?") + return CheckResult("hydra-oidc", "ory", "hydra", True, f"issuer={issuer}") + return CheckResult("hydra-oidc", "ory", "hydra", False, f"HTTP {status}") + except urllib.error.URLError as e: + return CheckResult("hydra-oidc", "ory", "hydra", False, str(e.reason)) + + +def check_people(domain: str, opener) -> CheckResult: + """GET https://people.{domain}/ -> any response < 500 (302 to OIDC is fine).""" + url = f"https://people.{domain}/" + try: + status, _ = _http_get(url, opener) + return CheckResult("people", "lasuite", "people", status < 500, f"HTTP {status}") + except urllib.error.URLError as e: + return CheckResult("people", "lasuite", "people", False, str(e.reason)) + + +def check_people_api(domain: str, opener) -> CheckResult: + """GET /api/v1.0/config/ -> any response < 500 (401 auth-required is fine).""" + url = f"https://people.{domain}/api/v1.0/config/" + try: + status, _ = _http_get(url, opener) + return CheckResult("people-api", "lasuite", "people", status < 500, f"HTTP {status}") + except urllib.error.URLError as e: + return CheckResult("people-api", "lasuite", "people", False, str(e.reason)) + + +def check_livekit(domain: str, opener) -> CheckResult: + """kubectl exec livekit-server pod -- wget localhost:7880/ -> rc 0.""" + pod = kube_out("get", "pods", "-n", "media", "-l", "app.kubernetes.io/name=livekit-server", + "--no-headers", "-o=custom-columns=NAME:.metadata.name") + pod = pod.splitlines()[0].strip() if pod else "" + if not pod: + return CheckResult("livekit", "media", "livekit", False, "no livekit pod") + rc, _ = kube_exec("media", pod, "wget", "-qO-", "http://localhost:7880/") + if rc == 0: + return CheckResult("livekit", "media", "livekit", True, "server responding") + return CheckResult("livekit", "media", "livekit", False, "server not responding") + + +# ── Check registry ──────────────────────────────────────────────────────────── + +CHECKS: list[tuple[Any, str, str]] = [ + (check_gitea_version, "devtools", "gitea"), + (check_gitea_auth, "devtools", "gitea"), + (check_postgres, "data", "postgres"), + (check_valkey, "data", "valkey"), + (check_openbao, "data", "openbao"), + (check_seaweedfs, "storage", "seaweedfs"), + (check_kratos, "ory", "kratos"), + (check_hydra_oidc, "ory", "hydra"), + (check_people, "lasuite", "people"), + (check_people_api, "lasuite", "people"), + (check_livekit, "media", "livekit"), +] + + +def cmd_check(target: str | None) -> None: + """Run service-level health checks, optionally scoped to a namespace or service.""" + step("Service health checks...") + + domain = get_domain() + ssl_ctx = _ssl_ctx() + op = _opener(ssl_ctx) + + ns_filter, svc_filter = parse_target(target) if target else (None, None) + fns = [ + fn for fn, ns, svc in CHECKS + if (ns_filter is None or ns == ns_filter) + and (svc_filter is None or svc == svc_filter) + ] + + if not fns: + warn(f"No checks match target: {target}") + return + + results = [] + for fn in fns: + r = fn(domain, op) + results.append(r) + icon = "\u2713" if r.passed else "\u2717" + detail = f" ({r.detail})" if r.detail else "" + print(f" {icon} {r.ns}/{r.svc} [{r.name}]{detail}") + + print() + failed = [r for r in results if not r.passed] + if failed: + warn(f"{len(failed)} check(s) failed.") + else: + ok(f"All {len(results)} check(s) passed.") diff --git a/sunbeam/cli.py b/sunbeam/cli.py index 7511948..4376d1b 100644 --- a/sunbeam/cli.py +++ b/sunbeam/cli.py @@ -53,6 +53,11 @@ def main() -> None: p_build.add_argument("what", choices=["proxy"], help="What to build (proxy)") + # sunbeam check [ns[/name]] + p_check = sub.add_parser("check", help="Functional service health checks") + p_check.add_argument("target", nargs="?", default=None, + help="namespace or namespace/name") + # sunbeam mirror sub.add_parser("mirror", help="Mirror amd64-only La Suite images") @@ -106,6 +111,10 @@ def main() -> None: from sunbeam.images import cmd_build cmd_build(args.what) + elif args.verb == "check": + from sunbeam.checks import cmd_check + cmd_check(args.target) + elif args.verb == "mirror": from sunbeam.images import cmd_mirror cmd_mirror() diff --git a/sunbeam/kube.py b/sunbeam/kube.py index 9b264a6..0fc252b 100644 --- a/sunbeam/kube.py +++ b/sunbeam/kube.py @@ -93,6 +93,28 @@ def create_secret(ns: str, name: str, **literals) -> None: "--field-manager=sunbeam", "-f", "-", input=manifest) +def kube_exec(ns: str, pod: str, *cmd: str) -> tuple[int, str]: + """Run a command inside a pod. Returns (returncode, stdout).""" + r = run_tool("kubectl", "--context=sunbeam", "exec", "-n", ns, pod, + "--", *cmd, + capture_output=True, text=True, check=False) + return r.returncode, r.stdout.strip() + + +def get_domain() -> str: + """Discover the active domain from cluster state. + + Reads a known substituted configmap value; falls back to the Lima VM IP. + """ + raw = kube_out("get", "configmap", "lasuite-oidc-provider", "-n", "lasuite", + "-o=jsonpath={.data.OIDC_OP_JWKS_ENDPOINT}") + if raw and "https://auth." in raw: + # e.g. "https://auth.192.168.105.2.sslip.io/.well-known/jwks.json" + return raw.split("https://auth.")[1].split("/")[0] + ip = get_lima_ip() + return f"{ip}.sslip.io" + + def kustomize_build(overlay: Path, domain: str) -> str: """Run kustomize build --enable-helm and apply domain substitution.""" r = run_tool( diff --git a/sunbeam/tests/test_checks.py b/sunbeam/tests/test_checks.py new file mode 100644 index 0000000..5a2a4e4 --- /dev/null +++ b/sunbeam/tests/test_checks.py @@ -0,0 +1,284 @@ +"""Tests for checks.py — service-level health probes.""" +import json +import unittest +from unittest.mock import MagicMock, patch + + +class TestCheckGiteaVersion(unittest.TestCase): + def test_ok_returns_version(self): + body = json.dumps({"version": "1.21.0"}).encode() + with patch("sunbeam.checks._http_get", return_value=(200, body)): + from sunbeam import checks + r = checks.check_gitea_version("testdomain", None) + self.assertTrue(r.passed) + self.assertIn("1.21.0", r.detail) + + def test_non_200_fails(self): + with patch("sunbeam.checks._http_get", return_value=(502, b"")): + from sunbeam import checks + r = checks.check_gitea_version("testdomain", None) + self.assertFalse(r.passed) + self.assertIn("502", r.detail) + + def test_connection_error_fails(self): + import urllib.error + with patch("sunbeam.checks._http_get", side_effect=urllib.error.URLError("refused")): + from sunbeam import checks + r = checks.check_gitea_version("testdomain", None) + self.assertFalse(r.passed) + + +class TestCheckGiteaAuth(unittest.TestCase): + def _secret(self, key, val): + def side_effect(ns, name, k): + return val if k == key else "gitea_admin" + return side_effect + + def test_ok_returns_login(self): + body = json.dumps({"login": "gitea_admin"}).encode() + with patch("sunbeam.checks._kube_secret", + side_effect=self._secret("admin-password", "hunter2")): + with patch("sunbeam.checks._http_get", return_value=(200, body)): + from sunbeam import checks + r = checks.check_gitea_auth("testdomain", None) + self.assertTrue(r.passed) + self.assertIn("gitea_admin", r.detail) + + def test_missing_password_fails(self): + with patch("sunbeam.checks._kube_secret", return_value=""): + from sunbeam import checks + r = checks.check_gitea_auth("testdomain", None) + self.assertFalse(r.passed) + self.assertIn("secret", r.detail) + + def test_non_200_fails(self): + with patch("sunbeam.checks._kube_secret", + side_effect=self._secret("admin-password", "hunter2")): + with patch("sunbeam.checks._http_get", return_value=(401, b"")): + from sunbeam import checks + r = checks.check_gitea_auth("testdomain", None) + self.assertFalse(r.passed) + + +class TestCheckPostgres(unittest.TestCase): + def test_ready_passes(self): + with patch("sunbeam.checks.kube_out", side_effect=["1", "1"]): + from sunbeam import checks + r = checks.check_postgres("testdomain", None) + self.assertTrue(r.passed) + self.assertIn("1/1", r.detail) + + def test_not_ready_fails(self): + with patch("sunbeam.checks.kube_out", side_effect=["0", "1"]): + from sunbeam import checks + r = checks.check_postgres("testdomain", None) + self.assertFalse(r.passed) + + def test_cluster_not_found_fails(self): + with patch("sunbeam.checks.kube_out", return_value=""): + from sunbeam import checks + r = checks.check_postgres("testdomain", None) + self.assertFalse(r.passed) + + +class TestCheckValkey(unittest.TestCase): + def test_pong_passes(self): + with patch("sunbeam.checks.kube_out", return_value="valkey-abc"): + with patch("sunbeam.checks.kube_exec", return_value=(0, "PONG")): + from sunbeam import checks + r = checks.check_valkey("testdomain", None) + self.assertTrue(r.passed) + + def test_no_pod_fails(self): + with patch("sunbeam.checks.kube_out", return_value=""): + from sunbeam import checks + r = checks.check_valkey("testdomain", None) + self.assertFalse(r.passed) + + def test_no_pong_fails(self): + with patch("sunbeam.checks.kube_out", return_value="valkey-abc"): + with patch("sunbeam.checks.kube_exec", return_value=(1, "")): + from sunbeam import checks + r = checks.check_valkey("testdomain", None) + self.assertFalse(r.passed) + + +class TestCheckOpenbao(unittest.TestCase): + def test_unsealed_passes(self): + out = json.dumps({"initialized": True, "sealed": False}) + with patch("sunbeam.checks.kube_exec", return_value=(0, out)): + from sunbeam import checks + r = checks.check_openbao("testdomain", None) + self.assertTrue(r.passed) + + def test_sealed_fails(self): + out = json.dumps({"initialized": True, "sealed": True}) + with patch("sunbeam.checks.kube_exec", return_value=(2, out)): + from sunbeam import checks + r = checks.check_openbao("testdomain", None) + self.assertFalse(r.passed) + + def test_no_output_fails(self): + with patch("sunbeam.checks.kube_exec", return_value=(1, "")): + from sunbeam import checks + r = checks.check_openbao("testdomain", None) + self.assertFalse(r.passed) + + +class TestCheckSeaweedfs(unittest.TestCase): + def test_responding_passes(self): + with patch("sunbeam.checks.kube_out", return_value="seaweedfs-filer-abc"): + with patch("sunbeam.checks.kube_exec", return_value=(0, "filer status data")): + from sunbeam import checks + r = checks.check_seaweedfs("testdomain", None) + self.assertTrue(r.passed) + + def test_no_pod_fails(self): + with patch("sunbeam.checks.kube_out", return_value=""): + from sunbeam import checks + r = checks.check_seaweedfs("testdomain", None) + self.assertFalse(r.passed) + + def test_exec_fails(self): + with patch("sunbeam.checks.kube_out", return_value="seaweedfs-filer-abc"): + with patch("sunbeam.checks.kube_exec", return_value=(1, "")): + from sunbeam import checks + r = checks.check_seaweedfs("testdomain", None) + self.assertFalse(r.passed) + + +class TestCheckKratos(unittest.TestCase): + def test_200_passes(self): + with patch("sunbeam.checks._http_get", return_value=(200, b"")): + from sunbeam import checks + r = checks.check_kratos("testdomain", None) + self.assertTrue(r.passed) + + def test_503_fails(self): + with patch("sunbeam.checks._http_get", return_value=(503, b"not ready")): + from sunbeam import checks + r = checks.check_kratos("testdomain", None) + self.assertFalse(r.passed) + self.assertIn("503", r.detail) + + +class TestCheckHydraOidc(unittest.TestCase): + def test_200_with_issuer_passes(self): + body = json.dumps({"issuer": "https://auth.testdomain/"}).encode() + with patch("sunbeam.checks._http_get", return_value=(200, body)): + from sunbeam import checks + r = checks.check_hydra_oidc("testdomain", None) + self.assertTrue(r.passed) + self.assertIn("testdomain", r.detail) + + def test_502_fails(self): + with patch("sunbeam.checks._http_get", return_value=(502, b"")): + from sunbeam import checks + r = checks.check_hydra_oidc("testdomain", None) + self.assertFalse(r.passed) + + +class TestCheckPeople(unittest.TestCase): + def test_200_passes(self): + with patch("sunbeam.checks._http_get", return_value=(200, b"")): + from sunbeam import checks + r = checks.check_people("testdomain", None) + self.assertTrue(r.passed) + + def test_302_redirect_passes(self): + with patch("sunbeam.checks._http_get", return_value=(302, b"")): + from sunbeam import checks + r = checks.check_people("testdomain", None) + self.assertTrue(r.passed) + self.assertIn("302", r.detail) + + def test_502_fails(self): + with patch("sunbeam.checks._http_get", return_value=(502, b"")): + from sunbeam import checks + r = checks.check_people("testdomain", None) + self.assertFalse(r.passed) + self.assertIn("502", r.detail) + + +class TestCheckPeopleApi(unittest.TestCase): + def test_200_passes(self): + with patch("sunbeam.checks._http_get", return_value=(200, b"{}")): + from sunbeam import checks + r = checks.check_people_api("testdomain", None) + self.assertTrue(r.passed) + + def test_401_auth_required_passes(self): + with patch("sunbeam.checks._http_get", return_value=(401, b"")): + from sunbeam import checks + r = checks.check_people_api("testdomain", None) + self.assertTrue(r.passed) + + def test_502_fails(self): + with patch("sunbeam.checks._http_get", return_value=(502, b"")): + from sunbeam import checks + r = checks.check_people_api("testdomain", None) + self.assertFalse(r.passed) + + +class TestCheckLivekit(unittest.TestCase): + def test_responding_passes(self): + with patch("sunbeam.checks.kube_out", return_value="livekit-server-abc"): + with patch("sunbeam.checks.kube_exec", return_value=(0, "")): + from sunbeam import checks + r = checks.check_livekit("testdomain", None) + self.assertTrue(r.passed) + + def test_no_pod_fails(self): + with patch("sunbeam.checks.kube_out", return_value=""): + from sunbeam import checks + r = checks.check_livekit("testdomain", None) + self.assertFalse(r.passed) + + def test_exec_fails(self): + with patch("sunbeam.checks.kube_out", return_value="livekit-server-abc"): + with patch("sunbeam.checks.kube_exec", return_value=(1, "")): + from sunbeam import checks + r = checks.check_livekit("testdomain", None) + self.assertFalse(r.passed) + + +class TestCmdCheck(unittest.TestCase): + def _run(self, target, mock_list): + from sunbeam import checks + result = checks.CheckResult("x", "ns", "svc", True, "ok") + fns = [MagicMock(return_value=result) for _ in mock_list] + patched = list(zip(fns, [ns for _, ns, _ in mock_list], [s for _, _, s in mock_list])) + with patch("sunbeam.checks.get_domain", return_value="td"), \ + patch("sunbeam.checks._ssl_ctx", return_value=None), \ + patch("sunbeam.checks._opener", return_value=None), \ + patch.object(checks, "CHECKS", patched): + checks.cmd_check(target) + return fns + + def test_no_target_runs_all(self): + mock_list = [("unused", "devtools", "gitea"), ("unused", "data", "postgres")] + fns = self._run(None, mock_list) + fns[0].assert_called_once_with("td", None) + fns[1].assert_called_once_with("td", None) + + def test_ns_filter_skips_other_namespaces(self): + mock_list = [("unused", "devtools", "gitea"), ("unused", "data", "postgres")] + fns = self._run("devtools", mock_list) + fns[0].assert_called_once() + fns[1].assert_not_called() + + def test_svc_filter(self): + mock_list = [("unused", "ory", "kratos"), ("unused", "ory", "hydra")] + fns = self._run("ory/kratos", mock_list) + fns[0].assert_called_once() + fns[1].assert_not_called() + + def test_no_match_warns(self): + from sunbeam import checks + with patch("sunbeam.checks.get_domain", return_value="td"), \ + patch("sunbeam.checks._ssl_ctx", return_value=None), \ + patch("sunbeam.checks._opener", return_value=None), \ + patch.object(checks, "CHECKS", []), \ + patch("sunbeam.checks.warn") as mock_warn: + checks.cmd_check("nonexistent") + mock_warn.assert_called_once() diff --git a/sunbeam/tests/test_cli.py b/sunbeam/tests/test_cli.py index bf70729..b714081 100644 --- a/sunbeam/tests/test_cli.py +++ b/sunbeam/tests/test_cli.py @@ -31,6 +31,8 @@ class TestArgParsing(unittest.TestCase): p_build.add_argument("what", choices=["proxy"]) sub.add_parser("mirror") sub.add_parser("bootstrap") + p_check = sub.add_parser("check") + p_check.add_argument("target", nargs="?", default=None) return parser.parse_args(argv) def test_up(self): @@ -83,6 +85,21 @@ class TestArgParsing(unittest.TestCase): with self.assertRaises(SystemExit): self._parse(["get", "ory/kratos-abc", "-o", "toml"]) + def test_check_no_target(self): + args = self._parse(["check"]) + self.assertEqual(args.verb, "check") + self.assertIsNone(args.target) + + def test_check_with_namespace(self): + args = self._parse(["check", "devtools"]) + self.assertEqual(args.verb, "check") + self.assertEqual(args.target, "devtools") + + def test_check_with_service(self): + args = self._parse(["check", "lasuite/people"]) + self.assertEqual(args.verb, "check") + self.assertEqual(args.target, "lasuite/people") + def test_no_args_verb_is_none(self): args = self._parse([]) self.assertIsNone(args.verb) @@ -189,3 +206,27 @@ class TestCliDispatch(unittest.TestCase): except SystemExit: pass mock_build.assert_called_once_with("proxy") + + def test_check_no_target(self): + mock_check = MagicMock() + with patch.object(sys, "argv", ["sunbeam", "check"]): + with patch.dict("sys.modules", {"sunbeam.checks": MagicMock(cmd_check=mock_check)}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_check.assert_called_once_with(None) + + def test_check_with_target(self): + mock_check = MagicMock() + with patch.object(sys, "argv", ["sunbeam", "check", "lasuite/people"]): + with patch.dict("sys.modules", {"sunbeam.checks": MagicMock(cmd_check=mock_check)}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_check.assert_called_once_with("lasuite/people")