diff --git a/sunbeam/checks.py b/sunbeam/checks.py index c6eb951..da9fcde 100644 --- a/sunbeam/checks.py +++ b/sunbeam/checks.py @@ -1,5 +1,7 @@ """Service-level health checks — functional probes beyond pod readiness.""" import base64 +import hashlib +import hmac import json import ssl import subprocess @@ -7,6 +9,7 @@ import urllib.error import urllib.request from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass +from datetime import datetime, timezone from pathlib import Path from typing import Any @@ -136,13 +139,13 @@ def check_valkey(domain: str, opener) -> CheckResult: pod = pod.splitlines()[0].strip() if pod else "" if not pod: return CheckResult("valkey", "data", "valkey", False, "no valkey pod") - _, out = kube_exec("data", pod, "valkey-cli", "ping") + _, out = kube_exec("data", pod, "valkey-cli", "ping", container="valkey") return CheckResult("valkey", "data", "valkey", out == "PONG", out or "no response") def check_openbao(domain: str, opener) -> CheckResult: """kubectl exec openbao-0 -- bao status -format=json -> initialized + unsealed.""" - rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json") + rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json", container="openbao") if not out: return CheckResult("openbao", "data", "openbao", False, "no response") try: @@ -155,13 +158,52 @@ def check_openbao(domain: str, opener) -> CheckResult: return CheckResult("openbao", "data", "openbao", False, out[:80]) +def _s3_auth_headers(access_key: str, secret_key: str, host: str) -> dict: + """Return Authorization + x-amz-date headers for an unsigned GET / S3 request.""" + t = datetime.now(tz=timezone.utc) + amzdate = t.strftime("%Y%m%dT%H%M%SZ") + datestamp = t.strftime("%Y%m%d") + + payload_hash = hashlib.sha256(b"").hexdigest() + canonical = f"GET\n/\n\nhost:{host}\nx-amz-date:{amzdate}\n\nhost;x-amz-date\n{payload_hash}" + credential_scope = f"{datestamp}/us-east-1/s3/aws4_request" + string_to_sign = ( + f"AWS4-HMAC-SHA256\n{amzdate}\n{credential_scope}\n" + f"{hashlib.sha256(canonical.encode()).hexdigest()}" + ) + + def _sign(key: bytes, msg: str) -> bytes: + return hmac.new(key, msg.encode(), hashlib.sha256).digest() + + k = _sign(f"AWS4{secret_key}".encode(), datestamp) + k = _sign(k, "us-east-1") + k = _sign(k, "s3") + k = _sign(k, "aws4_request") + sig = hmac.new(k, string_to_sign.encode(), hashlib.sha256).hexdigest() + + auth = ( + f"AWS4-HMAC-SHA256 Credential={access_key}/{credential_scope}," + f" SignedHeaders=host;x-amz-date, Signature={sig}" + ) + return {"Authorization": auth, "x-amz-date": amzdate} + + def check_seaweedfs(domain: str, opener) -> CheckResult: - """GET https://s3.{domain}/ -> any response from the S3 API (< 500).""" - url = f"https://s3.{domain}/" + """GET https://s3.{domain}/ with S3 credentials -> 200 list-buckets response.""" + access_key = _kube_secret("storage", "seaweedfs-s3-credentials", "S3_ACCESS_KEY") + secret_key = _kube_secret("storage", "seaweedfs-s3-credentials", "S3_SECRET_KEY") + if not access_key or not secret_key: + return CheckResult("seaweedfs", "storage", "seaweedfs", False, + "credentials not found in seaweedfs-s3-credentials secret") + + host = f"s3.{domain}" + url = f"https://{host}/" + headers = _s3_auth_headers(access_key, secret_key, host) try: - status, _ = _http_get(url, opener) - # Unauthenticated S3 returns 403 (expected); 200 also ok; 5xx = problem. - return CheckResult("seaweedfs", "storage", "seaweedfs", status < 500, f"HTTP {status}") + status, _ = _http_get(url, opener, headers=headers) + if status == 200: + return CheckResult("seaweedfs", "storage", "seaweedfs", True, "S3 authenticated") + return CheckResult("seaweedfs", "storage", "seaweedfs", False, f"HTTP {status}") except urllib.error.URLError as e: return CheckResult("seaweedfs", "storage", "seaweedfs", False, str(e.reason)) diff --git a/sunbeam/tests/test_checks.py b/sunbeam/tests/test_checks.py index b414c9c..7023188 100644 --- a/sunbeam/tests/test_checks.py +++ b/sunbeam/tests/test_checks.py @@ -126,28 +126,54 @@ class TestCheckOpenbao(unittest.TestCase): class TestCheckSeaweedfs(unittest.TestCase): - def test_200_passes(self): - with patch("sunbeam.checks._http_get", return_value=(200, b"")): - from sunbeam import checks - r = checks.check_seaweedfs("testdomain", None) - self.assertTrue(r.passed) + def _with_creds(self, http_result=None, http_error=None): + """Helper: patch both _kube_secret (returns creds) and _http_get.""" + def secret_side_effect(ns, name, key): + return "testkey" if key == "S3_ACCESS_KEY" else "testsecret" - def test_403_unauthenticated_passes(self): - # S3 returns 403 for unauthenticated requests — that means it's up. - with patch("sunbeam.checks._http_get", return_value=(403, b"")): + patches = [ + patch("sunbeam.checks._kube_secret", side_effect=secret_side_effect), + ] + if http_error: + patches.append(patch("sunbeam.checks._http_get", side_effect=http_error)) + else: + patches.append(patch("sunbeam.checks._http_get", return_value=http_result)) + return patches + + def test_200_authenticated_passes(self): + with patch("sunbeam.checks._kube_secret", return_value="val"), \ + patch("sunbeam.checks._http_get", return_value=(200, b"")): from sunbeam import checks r = checks.check_seaweedfs("testdomain", None) self.assertTrue(r.passed) + self.assertIn("authenticated", r.detail) + + def test_missing_credentials_fails(self): + with patch("sunbeam.checks._kube_secret", return_value=""): + from sunbeam import checks + r = checks.check_seaweedfs("testdomain", None) + self.assertFalse(r.passed) + self.assertIn("secret", r.detail) + + def test_403_bad_credentials_fails(self): + with patch("sunbeam.checks._kube_secret", return_value="val"), \ + patch("sunbeam.checks._http_get", return_value=(403, b"")): + from sunbeam import checks + r = checks.check_seaweedfs("testdomain", None) + self.assertFalse(r.passed) + self.assertIn("403", r.detail) def test_502_fails(self): - with patch("sunbeam.checks._http_get", return_value=(502, b"")): + with patch("sunbeam.checks._kube_secret", return_value="val"), \ + patch("sunbeam.checks._http_get", return_value=(502, b"")): from sunbeam import checks r = checks.check_seaweedfs("testdomain", None) self.assertFalse(r.passed) def test_connection_error_fails(self): import urllib.error - with patch("sunbeam.checks._http_get", + with patch("sunbeam.checks._kube_secret", return_value="val"), \ + patch("sunbeam.checks._http_get", side_effect=urllib.error.URLError("refused")): from sunbeam import checks r = checks.check_seaweedfs("testdomain", None)