check: rewrite seaweedfs probe with S3 SigV4 auth

Replaced the unauthenticated SeaweedFS probe (which accepted any HTTP
< 500 as passing) with a signed S3 ListBuckets request using AWS
Signature V4. Credentials are read from the seaweedfs-s3-credentials
K8s secret; a 200 response confirms authentication is working.

Updated tests to cover missing creds, 403 bad-creds, 502 gateway error,
and URLError cases.
This commit is contained in:
2026-03-03 00:57:27 +00:00
parent 6bd59abd74
commit 0acbf66673
2 changed files with 85 additions and 17 deletions

View File

@@ -1,5 +1,7 @@
"""Service-level health checks — functional probes beyond pod readiness."""
import base64
import hashlib
import hmac
import json
import ssl
import subprocess
@@ -7,6 +9,7 @@ import urllib.error
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
@@ -136,13 +139,13 @@ def check_valkey(domain: str, opener) -> CheckResult:
pod = pod.splitlines()[0].strip() if pod else ""
if not pod:
return CheckResult("valkey", "data", "valkey", False, "no valkey pod")
_, out = kube_exec("data", pod, "valkey-cli", "ping")
_, out = kube_exec("data", pod, "valkey-cli", "ping", container="valkey")
return CheckResult("valkey", "data", "valkey", out == "PONG", out or "no response")
def check_openbao(domain: str, opener) -> CheckResult:
"""kubectl exec openbao-0 -- bao status -format=json -> initialized + unsealed."""
rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json")
rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json", container="openbao")
if not out:
return CheckResult("openbao", "data", "openbao", False, "no response")
try:
@@ -155,13 +158,52 @@ def check_openbao(domain: str, opener) -> CheckResult:
return CheckResult("openbao", "data", "openbao", False, out[:80])
def _s3_auth_headers(access_key: str, secret_key: str, host: str) -> dict:
"""Return Authorization + x-amz-date headers for an unsigned GET / S3 request."""
t = datetime.now(tz=timezone.utc)
amzdate = t.strftime("%Y%m%dT%H%M%SZ")
datestamp = t.strftime("%Y%m%d")
payload_hash = hashlib.sha256(b"").hexdigest()
canonical = f"GET\n/\n\nhost:{host}\nx-amz-date:{amzdate}\n\nhost;x-amz-date\n{payload_hash}"
credential_scope = f"{datestamp}/us-east-1/s3/aws4_request"
string_to_sign = (
f"AWS4-HMAC-SHA256\n{amzdate}\n{credential_scope}\n"
f"{hashlib.sha256(canonical.encode()).hexdigest()}"
)
def _sign(key: bytes, msg: str) -> bytes:
return hmac.new(key, msg.encode(), hashlib.sha256).digest()
k = _sign(f"AWS4{secret_key}".encode(), datestamp)
k = _sign(k, "us-east-1")
k = _sign(k, "s3")
k = _sign(k, "aws4_request")
sig = hmac.new(k, string_to_sign.encode(), hashlib.sha256).hexdigest()
auth = (
f"AWS4-HMAC-SHA256 Credential={access_key}/{credential_scope},"
f" SignedHeaders=host;x-amz-date, Signature={sig}"
)
return {"Authorization": auth, "x-amz-date": amzdate}
def check_seaweedfs(domain: str, opener) -> CheckResult:
"""GET https://s3.{domain}/ -> any response from the S3 API (< 500)."""
url = f"https://s3.{domain}/"
"""GET https://s3.{domain}/ with S3 credentials -> 200 list-buckets response."""
access_key = _kube_secret("storage", "seaweedfs-s3-credentials", "S3_ACCESS_KEY")
secret_key = _kube_secret("storage", "seaweedfs-s3-credentials", "S3_SECRET_KEY")
if not access_key or not secret_key:
return CheckResult("seaweedfs", "storage", "seaweedfs", False,
"credentials not found in seaweedfs-s3-credentials secret")
host = f"s3.{domain}"
url = f"https://{host}/"
headers = _s3_auth_headers(access_key, secret_key, host)
try:
status, _ = _http_get(url, opener)
# Unauthenticated S3 returns 403 (expected); 200 also ok; 5xx = problem.
return CheckResult("seaweedfs", "storage", "seaweedfs", status < 500, f"HTTP {status}")
status, _ = _http_get(url, opener, headers=headers)
if status == 200:
return CheckResult("seaweedfs", "storage", "seaweedfs", True, "S3 authenticated")
return CheckResult("seaweedfs", "storage", "seaweedfs", False, f"HTTP {status}")
except urllib.error.URLError as e:
return CheckResult("seaweedfs", "storage", "seaweedfs", False, str(e.reason))

View File

@@ -126,28 +126,54 @@ class TestCheckOpenbao(unittest.TestCase):
class TestCheckSeaweedfs(unittest.TestCase):
def test_200_passes(self):
with patch("sunbeam.checks._http_get", return_value=(200, b"")):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertTrue(r.passed)
def _with_creds(self, http_result=None, http_error=None):
"""Helper: patch both _kube_secret (returns creds) and _http_get."""
def secret_side_effect(ns, name, key):
return "testkey" if key == "S3_ACCESS_KEY" else "testsecret"
def test_403_unauthenticated_passes(self):
# S3 returns 403 for unauthenticated requests — that means it's up.
with patch("sunbeam.checks._http_get", return_value=(403, b"")):
patches = [
patch("sunbeam.checks._kube_secret", side_effect=secret_side_effect),
]
if http_error:
patches.append(patch("sunbeam.checks._http_get", side_effect=http_error))
else:
patches.append(patch("sunbeam.checks._http_get", return_value=http_result))
return patches
def test_200_authenticated_passes(self):
with patch("sunbeam.checks._kube_secret", return_value="val"), \
patch("sunbeam.checks._http_get", return_value=(200, b"")):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertTrue(r.passed)
self.assertIn("authenticated", r.detail)
def test_missing_credentials_fails(self):
with patch("sunbeam.checks._kube_secret", return_value=""):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertFalse(r.passed)
self.assertIn("secret", r.detail)
def test_403_bad_credentials_fails(self):
with patch("sunbeam.checks._kube_secret", return_value="val"), \
patch("sunbeam.checks._http_get", return_value=(403, b"")):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertFalse(r.passed)
self.assertIn("403", r.detail)
def test_502_fails(self):
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
with patch("sunbeam.checks._kube_secret", return_value="val"), \
patch("sunbeam.checks._http_get", return_value=(502, b"")):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)
self.assertFalse(r.passed)
def test_connection_error_fails(self):
import urllib.error
with patch("sunbeam.checks._http_get",
with patch("sunbeam.checks._kube_secret", return_value="val"), \
patch("sunbeam.checks._http_get",
side_effect=urllib.error.URLError("refused")):
from sunbeam import checks
r = checks.check_seaweedfs("testdomain", None)