check: rewrite seaweedfs probe with S3 SigV4 auth

Replaced the unauthenticated SeaweedFS probe (which accepted any HTTP
< 500 as passing) with a signed S3 ListBuckets request using AWS
Signature V4. Credentials are read from the seaweedfs-s3-credentials
K8s secret; a 200 response confirms authentication is working.

Updated tests to cover missing creds, 403 bad-creds, 502 gateway error,
and URLError cases.
This commit is contained in:
2026-03-03 00:57:27 +00:00
parent 6bd59abd74
commit 0acbf66673
2 changed files with 85 additions and 17 deletions

View File

@@ -1,5 +1,7 @@
"""Service-level health checks — functional probes beyond pod readiness.""" """Service-level health checks — functional probes beyond pod readiness."""
import base64 import base64
import hashlib
import hmac
import json import json
import ssl import ssl
import subprocess import subprocess
@@ -7,6 +9,7 @@ import urllib.error
import urllib.request import urllib.request
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -136,13 +139,13 @@ def check_valkey(domain: str, opener) -> CheckResult:
pod = pod.splitlines()[0].strip() if pod else "" pod = pod.splitlines()[0].strip() if pod else ""
if not pod: if not pod:
return CheckResult("valkey", "data", "valkey", False, "no valkey pod") return CheckResult("valkey", "data", "valkey", False, "no valkey pod")
_, out = kube_exec("data", pod, "valkey-cli", "ping") _, out = kube_exec("data", pod, "valkey-cli", "ping", container="valkey")
return CheckResult("valkey", "data", "valkey", out == "PONG", out or "no response") return CheckResult("valkey", "data", "valkey", out == "PONG", out or "no response")
def check_openbao(domain: str, opener) -> CheckResult: def check_openbao(domain: str, opener) -> CheckResult:
"""kubectl exec openbao-0 -- bao status -format=json -> initialized + unsealed.""" """kubectl exec openbao-0 -- bao status -format=json -> initialized + unsealed."""
rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json") rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json", container="openbao")
if not out: if not out:
return CheckResult("openbao", "data", "openbao", False, "no response") return CheckResult("openbao", "data", "openbao", False, "no response")
try: try:
@@ -155,13 +158,52 @@ def check_openbao(domain: str, opener) -> CheckResult:
return CheckResult("openbao", "data", "openbao", False, out[:80]) return CheckResult("openbao", "data", "openbao", False, out[:80])
def _s3_auth_headers(access_key: str, secret_key: str, host: str) -> dict:
"""Return Authorization + x-amz-date headers for an unsigned GET / S3 request."""
t = datetime.now(tz=timezone.utc)
amzdate = t.strftime("%Y%m%dT%H%M%SZ")
datestamp = t.strftime("%Y%m%d")
payload_hash = hashlib.sha256(b"").hexdigest()
canonical = f"GET\n/\n\nhost:{host}\nx-amz-date:{amzdate}\n\nhost;x-amz-date\n{payload_hash}"
credential_scope = f"{datestamp}/us-east-1/s3/aws4_request"
string_to_sign = (
f"AWS4-HMAC-SHA256\n{amzdate}\n{credential_scope}\n"
f"{hashlib.sha256(canonical.encode()).hexdigest()}"
)
def _sign(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode(), hashlib.sha256).digest()
k = _sign(f"AWS4{secret_key}".encode(), datestamp)
k = _sign(k, "us-east-1")
k = _sign(k, "s3")
k = _sign(k, "aws4_request")
sig = hmac.new(k, string_to_sign.encode(), hashlib.sha256).hexdigest()
auth = (
f"AWS4-HMAC-SHA256 Credential={access_key}/{credential_scope},"
f" SignedHeaders=host;x-amz-date, Signature={sig}"
)
return {"Authorization": auth, "x-amz-date": amzdate}
def check_seaweedfs(domain: str, opener) -> CheckResult: def check_seaweedfs(domain: str, opener) -> CheckResult:
"""GET https://s3.{domain}/ -> any response from the S3 API (< 500).""" """GET https://s3.{domain}/ with S3 credentials -> 200 list-buckets response."""
url = f"https://s3.{domain}/" access_key = _kube_secret("storage", "seaweedfs-s3-credentials", "S3_ACCESS_KEY")
secret_key = _kube_secret("storage", "seaweedfs-s3-credentials", "S3_SECRET_KEY")
if not access_key or not secret_key:
return CheckResult("seaweedfs", "storage", "seaweedfs", False,
"credentials not found in seaweedfs-s3-credentials secret")
host = f"s3.{domain}"
url = f"https://{host}/"
headers = _s3_auth_headers(access_key, secret_key, host)
try: try:
status, _ = _http_get(url, opener) status, _ = _http_get(url, opener, headers=headers)
# Unauthenticated S3 returns 403 (expected); 200 also ok; 5xx = problem. if status == 200:
return CheckResult("seaweedfs", "storage", "seaweedfs", status < 500, f"HTTP {status}") return CheckResult("seaweedfs", "storage", "seaweedfs", True, "S3 authenticated")
return CheckResult("seaweedfs", "storage", "seaweedfs", False, f"HTTP {status}")
except urllib.error.URLError as e: except urllib.error.URLError as e:
return CheckResult("seaweedfs", "storage", "seaweedfs", False, str(e.reason)) return CheckResult("seaweedfs", "storage", "seaweedfs", False, str(e.reason))

View File

@@ -126,28 +126,54 @@ class TestCheckOpenbao(unittest.TestCase):
class TestCheckSeaweedfs(unittest.TestCase): class TestCheckSeaweedfs(unittest.TestCase):
def test_200_passes(self): def _with_creds(self, http_result=None, http_error=None):
with patch("sunbeam.checks._http_get", return_value=(200, b"")): """Helper: patch both _kube_secret (returns creds) and _http_get."""
from sunbeam import checks def secret_side_effect(ns, name, key):
r = checks.check_seaweedfs("testdomain", None) return "testkey" if key == "S3_ACCESS_KEY" else "testsecret"
self.assertTrue(r.passed)
def test_403_unauthenticated_passes(self): patches = [
# S3 returns 403 for unauthenticated requests — that means it's up. patch("sunbeam.checks._kube_secret", side_effect=secret_side_effect),
with patch("sunbeam.checks._http_get", return_value=(403, b"")): ]
if http_error:
patches.append(patch("sunbeam.checks._http_get", side_effect=http_error))
else:
patches.append(patch("sunbeam.checks._http_get", return_value=http_result))
return patches
def test_200_authenticated_passes(self):
with patch("sunbeam.checks._kube_secret", return_value="val"), \
patch("sunbeam.checks._http_get", return_value=(200, b"")):
from sunbeam import checks from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None) r = checks.check_seaweedfs("testdomain", None)
self.assertTrue(r.passed) self.assertTrue(r.passed)
self.assertIn("authenticated", r.detail)
def test_missing_credentials_fails(self):
with patch("sunbeam.checks._kube_secret", return_value=""):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertFalse(r.passed)
self.assertIn("secret", r.detail)
def test_403_bad_credentials_fails(self):
with patch("sunbeam.checks._kube_secret", return_value="val"), \
patch("sunbeam.checks._http_get", return_value=(403, b"")):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertFalse(r.passed)
self.assertIn("403", r.detail)
def test_502_fails(self): def test_502_fails(self):
with patch("sunbeam.checks._http_get", return_value=(502, b"")): with patch("sunbeam.checks._kube_secret", return_value="val"), \
patch("sunbeam.checks._http_get", return_value=(502, b"")):
from sunbeam import checks from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None) r = checks.check_seaweedfs("testdomain", None)
self.assertFalse(r.passed) self.assertFalse(r.passed)
def test_connection_error_fails(self): def test_connection_error_fails(self):
import urllib.error import urllib.error
with patch("sunbeam.checks._http_get", with patch("sunbeam.checks._kube_secret", return_value="val"), \
patch("sunbeam.checks._http_get",
side_effect=urllib.error.URLError("refused")): side_effect=urllib.error.URLError("refused")):
from sunbeam import checks from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None) r = checks.check_seaweedfs("testdomain", None)