Add sunbeam check verb with service-level health probes
11 checks across 7 namespaces: gitea version+auth, postgres CNPG readiness, valkey PONG, openbao sealed state, seaweedfs filer, kratos health, hydra OIDC discovery, people HTTP (catches 502s), people API, and livekit. Supports ns and ns/svc scoping. - checks.py: new module with _http_get (no-redirect opener + mkcert SSL), kube_exec-based exec checks, and cmd_check dispatch - kube.py: add kube_exec() and get_domain() (reads from cluster configmap) - cli.py: add 'check [target]' verb - 103 tests, all passing
This commit is contained in:
269
sunbeam/checks.py
Normal file
269
sunbeam/checks.py
Normal file
@@ -0,0 +1,269 @@
|
|||||||
|
"""Service-level health checks — functional probes beyond pod readiness."""
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
import ssl
|
||||||
|
import subprocess
|
||||||
|
import urllib.error
|
||||||
|
import urllib.request
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from sunbeam.kube import get_domain, kube_exec, kube_out, parse_target
|
||||||
|
from sunbeam.output import ok, step, warn
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class CheckResult:
|
||||||
|
name: str
|
||||||
|
ns: str
|
||||||
|
svc: str
|
||||||
|
passed: bool
|
||||||
|
detail: str = ""
|
||||||
|
|
||||||
|
|
||||||
|
def _ssl_ctx() -> ssl.SSLContext:
|
||||||
|
"""Return an SSL context that trusts the mkcert local CA if available."""
|
||||||
|
ctx = ssl.create_default_context()
|
||||||
|
try:
|
||||||
|
r = subprocess.run(["mkcert", "-CAROOT"], capture_output=True, text=True)
|
||||||
|
if r.returncode == 0:
|
||||||
|
ca_file = Path(r.stdout.strip()) / "rootCA.pem"
|
||||||
|
if ca_file.exists():
|
||||||
|
ctx.load_verify_locations(cafile=str(ca_file))
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
return ctx
|
||||||
|
|
||||||
|
|
||||||
|
def _kube_secret(ns: str, name: str, key: str) -> str:
|
||||||
|
"""Read a base64-encoded K8s secret value and return the decoded string."""
|
||||||
|
raw = kube_out("get", "secret", name, "-n", ns, f"-o=jsonpath={{.data.{key}}}")
|
||||||
|
if not raw:
|
||||||
|
return ""
|
||||||
|
try:
|
||||||
|
return base64.b64decode(raw + "==").decode()
|
||||||
|
except Exception:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
|
||||||
|
class _NoRedirect(urllib.request.HTTPRedirectHandler):
|
||||||
|
"""Prevent urllib from following redirects so we can inspect the status code."""
|
||||||
|
def redirect_request(self, req, fp, code, msg, headers, newurl):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _opener(ssl_ctx: ssl.SSLContext) -> urllib.request.OpenerDirector:
|
||||||
|
return urllib.request.build_opener(
|
||||||
|
_NoRedirect(),
|
||||||
|
urllib.request.HTTPSHandler(context=ssl_ctx),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _http_get(url: str, opener: urllib.request.OpenerDirector, *,
|
||||||
|
headers: dict | None = None, timeout: int = 10) -> tuple[int, bytes]:
|
||||||
|
"""Return (status_code, body). Redirects are not followed."""
|
||||||
|
req = urllib.request.Request(url, headers=headers or {})
|
||||||
|
try:
|
||||||
|
with opener.open(req, timeout=timeout) as resp:
|
||||||
|
return resp.status, resp.read()
|
||||||
|
except urllib.error.HTTPError as e:
|
||||||
|
return e.code, b""
|
||||||
|
|
||||||
|
|
||||||
|
# ── Individual checks ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def check_gitea_version(domain: str, opener) -> CheckResult:
|
||||||
|
"""GET /api/v1/version -> JSON with version field."""
|
||||||
|
url = f"https://src.{domain}/api/v1/version"
|
||||||
|
try:
|
||||||
|
status, body = _http_get(url, opener)
|
||||||
|
if status == 200:
|
||||||
|
ver = json.loads(body).get("version", "?")
|
||||||
|
return CheckResult("gitea-version", "devtools", "gitea", True, f"v{ver}")
|
||||||
|
return CheckResult("gitea-version", "devtools", "gitea", False, f"HTTP {status}")
|
||||||
|
except urllib.error.URLError as e:
|
||||||
|
return CheckResult("gitea-version", "devtools", "gitea", False, str(e.reason))
|
||||||
|
|
||||||
|
|
||||||
|
def check_gitea_auth(domain: str, opener) -> CheckResult:
|
||||||
|
"""GET /api/v1/user with admin credentials -> 200 and login field."""
|
||||||
|
username = _kube_secret("devtools", "gitea-admin-credentials", "admin-username") or "gitea_admin"
|
||||||
|
password = _kube_secret("devtools", "gitea-admin-credentials", "admin-password")
|
||||||
|
if not password:
|
||||||
|
return CheckResult("gitea-auth", "devtools", "gitea", False,
|
||||||
|
"admin-password not found in secret")
|
||||||
|
creds = base64.b64encode(f"{username}:{password}".encode()).decode()
|
||||||
|
url = f"https://src.{domain}/api/v1/user"
|
||||||
|
try:
|
||||||
|
status, body = _http_get(url, opener, headers={"Authorization": f"Basic {creds}"})
|
||||||
|
if status == 200:
|
||||||
|
login = json.loads(body).get("login", "?")
|
||||||
|
return CheckResult("gitea-auth", "devtools", "gitea", True, f"user={login}")
|
||||||
|
return CheckResult("gitea-auth", "devtools", "gitea", False, f"HTTP {status}")
|
||||||
|
except urllib.error.URLError as e:
|
||||||
|
return CheckResult("gitea-auth", "devtools", "gitea", False, str(e.reason))
|
||||||
|
|
||||||
|
|
||||||
|
def check_postgres(domain: str, opener) -> CheckResult:
|
||||||
|
"""CNPG Cluster readyInstances == instances."""
|
||||||
|
ready = kube_out("get", "cluster", "postgres", "-n", "data",
|
||||||
|
"-o=jsonpath={.status.readyInstances}")
|
||||||
|
total = kube_out("get", "cluster", "postgres", "-n", "data",
|
||||||
|
"-o=jsonpath={.status.instances}")
|
||||||
|
if ready and total and ready == total:
|
||||||
|
return CheckResult("postgres", "data", "postgres", True, f"{ready}/{total} ready")
|
||||||
|
detail = (f"{ready or '?'}/{total or '?'} ready"
|
||||||
|
if (ready or total) else "cluster not found")
|
||||||
|
return CheckResult("postgres", "data", "postgres", False, detail)
|
||||||
|
|
||||||
|
|
||||||
|
def check_valkey(domain: str, opener) -> CheckResult:
|
||||||
|
"""kubectl exec valkey pod -- valkey-cli ping -> PONG."""
|
||||||
|
pod = kube_out("get", "pods", "-n", "data", "-l", "app=valkey",
|
||||||
|
"--no-headers", "-o=custom-columns=NAME:.metadata.name")
|
||||||
|
pod = pod.splitlines()[0].strip() if pod else ""
|
||||||
|
if not pod:
|
||||||
|
return CheckResult("valkey", "data", "valkey", False, "no valkey pod")
|
||||||
|
_, out = kube_exec("data", pod, "valkey-cli", "ping")
|
||||||
|
return CheckResult("valkey", "data", "valkey", out == "PONG", out or "no response")
|
||||||
|
|
||||||
|
|
||||||
|
def check_openbao(domain: str, opener) -> CheckResult:
|
||||||
|
"""kubectl exec openbao-0 -- bao status -format=json -> initialized + unsealed."""
|
||||||
|
rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json")
|
||||||
|
if not out:
|
||||||
|
return CheckResult("openbao", "data", "openbao", False, "no response")
|
||||||
|
try:
|
||||||
|
data = json.loads(out)
|
||||||
|
init = data.get("initialized", False)
|
||||||
|
sealed = data.get("sealed", True)
|
||||||
|
return CheckResult("openbao", "data", "openbao", init and not sealed,
|
||||||
|
f"init={init}, sealed={sealed}")
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return CheckResult("openbao", "data", "openbao", False, out[:80])
|
||||||
|
|
||||||
|
|
||||||
|
def check_seaweedfs(domain: str, opener) -> CheckResult:
|
||||||
|
"""kubectl exec seaweedfs-filer pod -- wget /dir/status -> filer responding."""
|
||||||
|
pod = kube_out("get", "pods", "-n", "storage", "-l", "app=seaweedfs-filer",
|
||||||
|
"--no-headers", "-o=custom-columns=NAME:.metadata.name")
|
||||||
|
pod = pod.splitlines()[0].strip() if pod else ""
|
||||||
|
if not pod:
|
||||||
|
return CheckResult("seaweedfs", "storage", "seaweedfs", False, "no seaweedfs-filer pod")
|
||||||
|
rc, out = kube_exec("storage", pod, "wget", "-qO-", "http://localhost:8888/dir/status")
|
||||||
|
if rc == 0 and out:
|
||||||
|
return CheckResult("seaweedfs", "storage", "seaweedfs", True, "filer responding")
|
||||||
|
return CheckResult("seaweedfs", "storage", "seaweedfs", False, "filer not responding")
|
||||||
|
|
||||||
|
|
||||||
|
def check_kratos(domain: str, opener) -> CheckResult:
|
||||||
|
"""GET /kratos/health/ready -> 200."""
|
||||||
|
url = f"https://auth.{domain}/kratos/health/ready"
|
||||||
|
try:
|
||||||
|
status, body = _http_get(url, opener)
|
||||||
|
ok_flag = status == 200
|
||||||
|
detail = f"HTTP {status}"
|
||||||
|
if not ok_flag and body:
|
||||||
|
detail += f": {body.decode(errors='replace')[:80]}"
|
||||||
|
return CheckResult("kratos", "ory", "kratos", ok_flag, detail)
|
||||||
|
except urllib.error.URLError as e:
|
||||||
|
return CheckResult("kratos", "ory", "kratos", False, str(e.reason))
|
||||||
|
|
||||||
|
|
||||||
|
def check_hydra_oidc(domain: str, opener) -> CheckResult:
|
||||||
|
"""GET /.well-known/openid-configuration -> 200 with issuer field."""
|
||||||
|
url = f"https://auth.{domain}/.well-known/openid-configuration"
|
||||||
|
try:
|
||||||
|
status, body = _http_get(url, opener)
|
||||||
|
if status == 200:
|
||||||
|
issuer = json.loads(body).get("issuer", "?")
|
||||||
|
return CheckResult("hydra-oidc", "ory", "hydra", True, f"issuer={issuer}")
|
||||||
|
return CheckResult("hydra-oidc", "ory", "hydra", False, f"HTTP {status}")
|
||||||
|
except urllib.error.URLError as e:
|
||||||
|
return CheckResult("hydra-oidc", "ory", "hydra", False, str(e.reason))
|
||||||
|
|
||||||
|
|
||||||
|
def check_people(domain: str, opener) -> CheckResult:
|
||||||
|
"""GET https://people.{domain}/ -> any response < 500 (302 to OIDC is fine)."""
|
||||||
|
url = f"https://people.{domain}/"
|
||||||
|
try:
|
||||||
|
status, _ = _http_get(url, opener)
|
||||||
|
return CheckResult("people", "lasuite", "people", status < 500, f"HTTP {status}")
|
||||||
|
except urllib.error.URLError as e:
|
||||||
|
return CheckResult("people", "lasuite", "people", False, str(e.reason))
|
||||||
|
|
||||||
|
|
||||||
|
def check_people_api(domain: str, opener) -> CheckResult:
|
||||||
|
"""GET /api/v1.0/config/ -> any response < 500 (401 auth-required is fine)."""
|
||||||
|
url = f"https://people.{domain}/api/v1.0/config/"
|
||||||
|
try:
|
||||||
|
status, _ = _http_get(url, opener)
|
||||||
|
return CheckResult("people-api", "lasuite", "people", status < 500, f"HTTP {status}")
|
||||||
|
except urllib.error.URLError as e:
|
||||||
|
return CheckResult("people-api", "lasuite", "people", False, str(e.reason))
|
||||||
|
|
||||||
|
|
||||||
|
def check_livekit(domain: str, opener) -> CheckResult:
|
||||||
|
"""kubectl exec livekit-server pod -- wget localhost:7880/ -> rc 0."""
|
||||||
|
pod = kube_out("get", "pods", "-n", "media", "-l", "app.kubernetes.io/name=livekit-server",
|
||||||
|
"--no-headers", "-o=custom-columns=NAME:.metadata.name")
|
||||||
|
pod = pod.splitlines()[0].strip() if pod else ""
|
||||||
|
if not pod:
|
||||||
|
return CheckResult("livekit", "media", "livekit", False, "no livekit pod")
|
||||||
|
rc, _ = kube_exec("media", pod, "wget", "-qO-", "http://localhost:7880/")
|
||||||
|
if rc == 0:
|
||||||
|
return CheckResult("livekit", "media", "livekit", True, "server responding")
|
||||||
|
return CheckResult("livekit", "media", "livekit", False, "server not responding")
|
||||||
|
|
||||||
|
|
||||||
|
# ── Check registry ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
CHECKS: list[tuple[Any, str, str]] = [
|
||||||
|
(check_gitea_version, "devtools", "gitea"),
|
||||||
|
(check_gitea_auth, "devtools", "gitea"),
|
||||||
|
(check_postgres, "data", "postgres"),
|
||||||
|
(check_valkey, "data", "valkey"),
|
||||||
|
(check_openbao, "data", "openbao"),
|
||||||
|
(check_seaweedfs, "storage", "seaweedfs"),
|
||||||
|
(check_kratos, "ory", "kratos"),
|
||||||
|
(check_hydra_oidc, "ory", "hydra"),
|
||||||
|
(check_people, "lasuite", "people"),
|
||||||
|
(check_people_api, "lasuite", "people"),
|
||||||
|
(check_livekit, "media", "livekit"),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def cmd_check(target: str | None) -> None:
|
||||||
|
"""Run service-level health checks, optionally scoped to a namespace or service."""
|
||||||
|
step("Service health checks...")
|
||||||
|
|
||||||
|
domain = get_domain()
|
||||||
|
ssl_ctx = _ssl_ctx()
|
||||||
|
op = _opener(ssl_ctx)
|
||||||
|
|
||||||
|
ns_filter, svc_filter = parse_target(target) if target else (None, None)
|
||||||
|
fns = [
|
||||||
|
fn for fn, ns, svc in CHECKS
|
||||||
|
if (ns_filter is None or ns == ns_filter)
|
||||||
|
and (svc_filter is None or svc == svc_filter)
|
||||||
|
]
|
||||||
|
|
||||||
|
if not fns:
|
||||||
|
warn(f"No checks match target: {target}")
|
||||||
|
return
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for fn in fns:
|
||||||
|
r = fn(domain, op)
|
||||||
|
results.append(r)
|
||||||
|
icon = "\u2713" if r.passed else "\u2717"
|
||||||
|
detail = f" ({r.detail})" if r.detail else ""
|
||||||
|
print(f" {icon} {r.ns}/{r.svc} [{r.name}]{detail}")
|
||||||
|
|
||||||
|
print()
|
||||||
|
failed = [r for r in results if not r.passed]
|
||||||
|
if failed:
|
||||||
|
warn(f"{len(failed)} check(s) failed.")
|
||||||
|
else:
|
||||||
|
ok(f"All {len(results)} check(s) passed.")
|
||||||
@@ -53,6 +53,11 @@ def main() -> None:
|
|||||||
p_build.add_argument("what", choices=["proxy"],
|
p_build.add_argument("what", choices=["proxy"],
|
||||||
help="What to build (proxy)")
|
help="What to build (proxy)")
|
||||||
|
|
||||||
|
# sunbeam check [ns[/name]]
|
||||||
|
p_check = sub.add_parser("check", help="Functional service health checks")
|
||||||
|
p_check.add_argument("target", nargs="?", default=None,
|
||||||
|
help="namespace or namespace/name")
|
||||||
|
|
||||||
# sunbeam mirror
|
# sunbeam mirror
|
||||||
sub.add_parser("mirror", help="Mirror amd64-only La Suite images")
|
sub.add_parser("mirror", help="Mirror amd64-only La Suite images")
|
||||||
|
|
||||||
@@ -106,6 +111,10 @@ def main() -> None:
|
|||||||
from sunbeam.images import cmd_build
|
from sunbeam.images import cmd_build
|
||||||
cmd_build(args.what)
|
cmd_build(args.what)
|
||||||
|
|
||||||
|
elif args.verb == "check":
|
||||||
|
from sunbeam.checks import cmd_check
|
||||||
|
cmd_check(args.target)
|
||||||
|
|
||||||
elif args.verb == "mirror":
|
elif args.verb == "mirror":
|
||||||
from sunbeam.images import cmd_mirror
|
from sunbeam.images import cmd_mirror
|
||||||
cmd_mirror()
|
cmd_mirror()
|
||||||
|
|||||||
@@ -93,6 +93,28 @@ def create_secret(ns: str, name: str, **literals) -> None:
|
|||||||
"--field-manager=sunbeam", "-f", "-", input=manifest)
|
"--field-manager=sunbeam", "-f", "-", input=manifest)
|
||||||
|
|
||||||
|
|
||||||
|
def kube_exec(ns: str, pod: str, *cmd: str) -> tuple[int, str]:
|
||||||
|
"""Run a command inside a pod. Returns (returncode, stdout)."""
|
||||||
|
r = run_tool("kubectl", "--context=sunbeam", "exec", "-n", ns, pod,
|
||||||
|
"--", *cmd,
|
||||||
|
capture_output=True, text=True, check=False)
|
||||||
|
return r.returncode, r.stdout.strip()
|
||||||
|
|
||||||
|
|
||||||
|
def get_domain() -> str:
|
||||||
|
"""Discover the active domain from cluster state.
|
||||||
|
|
||||||
|
Reads a known substituted configmap value; falls back to the Lima VM IP.
|
||||||
|
"""
|
||||||
|
raw = kube_out("get", "configmap", "lasuite-oidc-provider", "-n", "lasuite",
|
||||||
|
"-o=jsonpath={.data.OIDC_OP_JWKS_ENDPOINT}")
|
||||||
|
if raw and "https://auth." in raw:
|
||||||
|
# e.g. "https://auth.192.168.105.2.sslip.io/.well-known/jwks.json"
|
||||||
|
return raw.split("https://auth.")[1].split("/")[0]
|
||||||
|
ip = get_lima_ip()
|
||||||
|
return f"{ip}.sslip.io"
|
||||||
|
|
||||||
|
|
||||||
def kustomize_build(overlay: Path, domain: str) -> str:
|
def kustomize_build(overlay: Path, domain: str) -> str:
|
||||||
"""Run kustomize build --enable-helm and apply domain substitution."""
|
"""Run kustomize build --enable-helm and apply domain substitution."""
|
||||||
r = run_tool(
|
r = run_tool(
|
||||||
|
|||||||
284
sunbeam/tests/test_checks.py
Normal file
284
sunbeam/tests/test_checks.py
Normal file
@@ -0,0 +1,284 @@
|
|||||||
|
"""Tests for checks.py — service-level health probes."""
|
||||||
|
import json
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckGiteaVersion(unittest.TestCase):
|
||||||
|
def test_ok_returns_version(self):
|
||||||
|
body = json.dumps({"version": "1.21.0"}).encode()
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(200, body)):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_gitea_version("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
self.assertIn("1.21.0", r.detail)
|
||||||
|
|
||||||
|
def test_non_200_fails(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_gitea_version("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
self.assertIn("502", r.detail)
|
||||||
|
|
||||||
|
def test_connection_error_fails(self):
|
||||||
|
import urllib.error
|
||||||
|
with patch("sunbeam.checks._http_get", side_effect=urllib.error.URLError("refused")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_gitea_version("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckGiteaAuth(unittest.TestCase):
|
||||||
|
def _secret(self, key, val):
|
||||||
|
def side_effect(ns, name, k):
|
||||||
|
return val if k == key else "gitea_admin"
|
||||||
|
return side_effect
|
||||||
|
|
||||||
|
def test_ok_returns_login(self):
|
||||||
|
body = json.dumps({"login": "gitea_admin"}).encode()
|
||||||
|
with patch("sunbeam.checks._kube_secret",
|
||||||
|
side_effect=self._secret("admin-password", "hunter2")):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(200, body)):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_gitea_auth("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
self.assertIn("gitea_admin", r.detail)
|
||||||
|
|
||||||
|
def test_missing_password_fails(self):
|
||||||
|
with patch("sunbeam.checks._kube_secret", return_value=""):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_gitea_auth("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
self.assertIn("secret", r.detail)
|
||||||
|
|
||||||
|
def test_non_200_fails(self):
|
||||||
|
with patch("sunbeam.checks._kube_secret",
|
||||||
|
side_effect=self._secret("admin-password", "hunter2")):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(401, b"")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_gitea_auth("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckPostgres(unittest.TestCase):
|
||||||
|
def test_ready_passes(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", side_effect=["1", "1"]):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_postgres("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
self.assertIn("1/1", r.detail)
|
||||||
|
|
||||||
|
def test_not_ready_fails(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", side_effect=["0", "1"]):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_postgres("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
def test_cluster_not_found_fails(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value=""):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_postgres("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckValkey(unittest.TestCase):
|
||||||
|
def test_pong_passes(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value="valkey-abc"):
|
||||||
|
with patch("sunbeam.checks.kube_exec", return_value=(0, "PONG")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_valkey("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
|
||||||
|
def test_no_pod_fails(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value=""):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_valkey("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
def test_no_pong_fails(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value="valkey-abc"):
|
||||||
|
with patch("sunbeam.checks.kube_exec", return_value=(1, "")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_valkey("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckOpenbao(unittest.TestCase):
|
||||||
|
def test_unsealed_passes(self):
|
||||||
|
out = json.dumps({"initialized": True, "sealed": False})
|
||||||
|
with patch("sunbeam.checks.kube_exec", return_value=(0, out)):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_openbao("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
|
||||||
|
def test_sealed_fails(self):
|
||||||
|
out = json.dumps({"initialized": True, "sealed": True})
|
||||||
|
with patch("sunbeam.checks.kube_exec", return_value=(2, out)):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_openbao("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
def test_no_output_fails(self):
|
||||||
|
with patch("sunbeam.checks.kube_exec", return_value=(1, "")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_openbao("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckSeaweedfs(unittest.TestCase):
|
||||||
|
def test_responding_passes(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value="seaweedfs-filer-abc"):
|
||||||
|
with patch("sunbeam.checks.kube_exec", return_value=(0, "filer status data")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_seaweedfs("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
|
||||||
|
def test_no_pod_fails(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value=""):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_seaweedfs("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
def test_exec_fails(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value="seaweedfs-filer-abc"):
|
||||||
|
with patch("sunbeam.checks.kube_exec", return_value=(1, "")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_seaweedfs("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckKratos(unittest.TestCase):
|
||||||
|
def test_200_passes(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(200, b"")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_kratos("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
|
||||||
|
def test_503_fails(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(503, b"not ready")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_kratos("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
self.assertIn("503", r.detail)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckHydraOidc(unittest.TestCase):
|
||||||
|
def test_200_with_issuer_passes(self):
|
||||||
|
body = json.dumps({"issuer": "https://auth.testdomain/"}).encode()
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(200, body)):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_hydra_oidc("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
self.assertIn("testdomain", r.detail)
|
||||||
|
|
||||||
|
def test_502_fails(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_hydra_oidc("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckPeople(unittest.TestCase):
|
||||||
|
def test_200_passes(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(200, b"<html>")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_people("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
|
||||||
|
def test_302_redirect_passes(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(302, b"")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_people("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
self.assertIn("302", r.detail)
|
||||||
|
|
||||||
|
def test_502_fails(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_people("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
self.assertIn("502", r.detail)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckPeopleApi(unittest.TestCase):
|
||||||
|
def test_200_passes(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(200, b"{}")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_people_api("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
|
||||||
|
def test_401_auth_required_passes(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(401, b"")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_people_api("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
|
||||||
|
def test_502_fails(self):
|
||||||
|
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_people_api("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCheckLivekit(unittest.TestCase):
|
||||||
|
def test_responding_passes(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value="livekit-server-abc"):
|
||||||
|
with patch("sunbeam.checks.kube_exec", return_value=(0, "")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_livekit("testdomain", None)
|
||||||
|
self.assertTrue(r.passed)
|
||||||
|
|
||||||
|
def test_no_pod_fails(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value=""):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_livekit("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
def test_exec_fails(self):
|
||||||
|
with patch("sunbeam.checks.kube_out", return_value="livekit-server-abc"):
|
||||||
|
with patch("sunbeam.checks.kube_exec", return_value=(1, "")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_livekit("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCmdCheck(unittest.TestCase):
|
||||||
|
def _run(self, target, mock_list):
|
||||||
|
from sunbeam import checks
|
||||||
|
result = checks.CheckResult("x", "ns", "svc", True, "ok")
|
||||||
|
fns = [MagicMock(return_value=result) for _ in mock_list]
|
||||||
|
patched = list(zip(fns, [ns for _, ns, _ in mock_list], [s for _, _, s in mock_list]))
|
||||||
|
with patch("sunbeam.checks.get_domain", return_value="td"), \
|
||||||
|
patch("sunbeam.checks._ssl_ctx", return_value=None), \
|
||||||
|
patch("sunbeam.checks._opener", return_value=None), \
|
||||||
|
patch.object(checks, "CHECKS", patched):
|
||||||
|
checks.cmd_check(target)
|
||||||
|
return fns
|
||||||
|
|
||||||
|
def test_no_target_runs_all(self):
|
||||||
|
mock_list = [("unused", "devtools", "gitea"), ("unused", "data", "postgres")]
|
||||||
|
fns = self._run(None, mock_list)
|
||||||
|
fns[0].assert_called_once_with("td", None)
|
||||||
|
fns[1].assert_called_once_with("td", None)
|
||||||
|
|
||||||
|
def test_ns_filter_skips_other_namespaces(self):
|
||||||
|
mock_list = [("unused", "devtools", "gitea"), ("unused", "data", "postgres")]
|
||||||
|
fns = self._run("devtools", mock_list)
|
||||||
|
fns[0].assert_called_once()
|
||||||
|
fns[1].assert_not_called()
|
||||||
|
|
||||||
|
def test_svc_filter(self):
|
||||||
|
mock_list = [("unused", "ory", "kratos"), ("unused", "ory", "hydra")]
|
||||||
|
fns = self._run("ory/kratos", mock_list)
|
||||||
|
fns[0].assert_called_once()
|
||||||
|
fns[1].assert_not_called()
|
||||||
|
|
||||||
|
def test_no_match_warns(self):
|
||||||
|
from sunbeam import checks
|
||||||
|
with patch("sunbeam.checks.get_domain", return_value="td"), \
|
||||||
|
patch("sunbeam.checks._ssl_ctx", return_value=None), \
|
||||||
|
patch("sunbeam.checks._opener", return_value=None), \
|
||||||
|
patch.object(checks, "CHECKS", []), \
|
||||||
|
patch("sunbeam.checks.warn") as mock_warn:
|
||||||
|
checks.cmd_check("nonexistent")
|
||||||
|
mock_warn.assert_called_once()
|
||||||
@@ -31,6 +31,8 @@ class TestArgParsing(unittest.TestCase):
|
|||||||
p_build.add_argument("what", choices=["proxy"])
|
p_build.add_argument("what", choices=["proxy"])
|
||||||
sub.add_parser("mirror")
|
sub.add_parser("mirror")
|
||||||
sub.add_parser("bootstrap")
|
sub.add_parser("bootstrap")
|
||||||
|
p_check = sub.add_parser("check")
|
||||||
|
p_check.add_argument("target", nargs="?", default=None)
|
||||||
return parser.parse_args(argv)
|
return parser.parse_args(argv)
|
||||||
|
|
||||||
def test_up(self):
|
def test_up(self):
|
||||||
@@ -83,6 +85,21 @@ class TestArgParsing(unittest.TestCase):
|
|||||||
with self.assertRaises(SystemExit):
|
with self.assertRaises(SystemExit):
|
||||||
self._parse(["get", "ory/kratos-abc", "-o", "toml"])
|
self._parse(["get", "ory/kratos-abc", "-o", "toml"])
|
||||||
|
|
||||||
|
def test_check_no_target(self):
|
||||||
|
args = self._parse(["check"])
|
||||||
|
self.assertEqual(args.verb, "check")
|
||||||
|
self.assertIsNone(args.target)
|
||||||
|
|
||||||
|
def test_check_with_namespace(self):
|
||||||
|
args = self._parse(["check", "devtools"])
|
||||||
|
self.assertEqual(args.verb, "check")
|
||||||
|
self.assertEqual(args.target, "devtools")
|
||||||
|
|
||||||
|
def test_check_with_service(self):
|
||||||
|
args = self._parse(["check", "lasuite/people"])
|
||||||
|
self.assertEqual(args.verb, "check")
|
||||||
|
self.assertEqual(args.target, "lasuite/people")
|
||||||
|
|
||||||
def test_no_args_verb_is_none(self):
|
def test_no_args_verb_is_none(self):
|
||||||
args = self._parse([])
|
args = self._parse([])
|
||||||
self.assertIsNone(args.verb)
|
self.assertIsNone(args.verb)
|
||||||
@@ -189,3 +206,27 @@ class TestCliDispatch(unittest.TestCase):
|
|||||||
except SystemExit:
|
except SystemExit:
|
||||||
pass
|
pass
|
||||||
mock_build.assert_called_once_with("proxy")
|
mock_build.assert_called_once_with("proxy")
|
||||||
|
|
||||||
|
def test_check_no_target(self):
|
||||||
|
mock_check = MagicMock()
|
||||||
|
with patch.object(sys, "argv", ["sunbeam", "check"]):
|
||||||
|
with patch.dict("sys.modules", {"sunbeam.checks": MagicMock(cmd_check=mock_check)}):
|
||||||
|
import importlib, sunbeam.cli as cli_mod
|
||||||
|
importlib.reload(cli_mod)
|
||||||
|
try:
|
||||||
|
cli_mod.main()
|
||||||
|
except SystemExit:
|
||||||
|
pass
|
||||||
|
mock_check.assert_called_once_with(None)
|
||||||
|
|
||||||
|
def test_check_with_target(self):
|
||||||
|
mock_check = MagicMock()
|
||||||
|
with patch.object(sys, "argv", ["sunbeam", "check", "lasuite/people"]):
|
||||||
|
with patch.dict("sys.modules", {"sunbeam.checks": MagicMock(cmd_check=mock_check)}):
|
||||||
|
import importlib, sunbeam.cli as cli_mod
|
||||||
|
importlib.reload(cli_mod)
|
||||||
|
try:
|
||||||
|
cli_mod.main()
|
||||||
|
except SystemExit:
|
||||||
|
pass
|
||||||
|
mock_check.assert_called_once_with("lasuite/people")
|
||||||
|
|||||||
Reference in New Issue
Block a user