check: rewrite seaweedfs probe with S3 SigV4 auth
Replaced the unauthenticated SeaweedFS probe (which accepted any HTTP < 500 as passing) with a signed S3 ListBuckets request using AWS Signature V4. Credentials are read from the seaweedfs-s3-credentials K8s secret; a 200 response confirms authentication is working. Updated tests to cover missing creds, 403 bad-creds, 502 gateway error, and URLError cases.
This commit is contained in:
@@ -1,5 +1,7 @@
|
|||||||
"""Service-level health checks — functional probes beyond pod readiness."""
|
"""Service-level health checks — functional probes beyond pod readiness."""
|
||||||
import base64
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
import json
|
import json
|
||||||
import ssl
|
import ssl
|
||||||
import subprocess
|
import subprocess
|
||||||
@@ -7,6 +9,7 @@ import urllib.error
|
|||||||
import urllib.request
|
import urllib.request
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
@@ -136,13 +139,13 @@ def check_valkey(domain: str, opener) -> CheckResult:
|
|||||||
pod = pod.splitlines()[0].strip() if pod else ""
|
pod = pod.splitlines()[0].strip() if pod else ""
|
||||||
if not pod:
|
if not pod:
|
||||||
return CheckResult("valkey", "data", "valkey", False, "no valkey pod")
|
return CheckResult("valkey", "data", "valkey", False, "no valkey pod")
|
||||||
_, out = kube_exec("data", pod, "valkey-cli", "ping")
|
_, out = kube_exec("data", pod, "valkey-cli", "ping", container="valkey")
|
||||||
return CheckResult("valkey", "data", "valkey", out == "PONG", out or "no response")
|
return CheckResult("valkey", "data", "valkey", out == "PONG", out or "no response")
|
||||||
|
|
||||||
|
|
||||||
def check_openbao(domain: str, opener) -> CheckResult:
|
def check_openbao(domain: str, opener) -> CheckResult:
|
||||||
"""kubectl exec openbao-0 -- bao status -format=json -> initialized + unsealed."""
|
"""kubectl exec openbao-0 -- bao status -format=json -> initialized + unsealed."""
|
||||||
rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json")
|
rc, out = kube_exec("data", "openbao-0", "bao", "status", "-format=json", container="openbao")
|
||||||
if not out:
|
if not out:
|
||||||
return CheckResult("openbao", "data", "openbao", False, "no response")
|
return CheckResult("openbao", "data", "openbao", False, "no response")
|
||||||
try:
|
try:
|
||||||
@@ -155,13 +158,52 @@ def check_openbao(domain: str, opener) -> CheckResult:
|
|||||||
return CheckResult("openbao", "data", "openbao", False, out[:80])
|
return CheckResult("openbao", "data", "openbao", False, out[:80])
|
||||||
|
|
||||||
|
|
||||||
|
def _s3_auth_headers(access_key: str, secret_key: str, host: str) -> dict:
|
||||||
|
"""Return Authorization + x-amz-date headers for an unsigned GET / S3 request."""
|
||||||
|
t = datetime.now(tz=timezone.utc)
|
||||||
|
amzdate = t.strftime("%Y%m%dT%H%M%SZ")
|
||||||
|
datestamp = t.strftime("%Y%m%d")
|
||||||
|
|
||||||
|
payload_hash = hashlib.sha256(b"").hexdigest()
|
||||||
|
canonical = f"GET\n/\n\nhost:{host}\nx-amz-date:{amzdate}\n\nhost;x-amz-date\n{payload_hash}"
|
||||||
|
credential_scope = f"{datestamp}/us-east-1/s3/aws4_request"
|
||||||
|
string_to_sign = (
|
||||||
|
f"AWS4-HMAC-SHA256\n{amzdate}\n{credential_scope}\n"
|
||||||
|
f"{hashlib.sha256(canonical.encode()).hexdigest()}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _sign(key: bytes, msg: str) -> bytes:
|
||||||
|
return hmac.new(key, msg.encode(), hashlib.sha256).digest()
|
||||||
|
|
||||||
|
k = _sign(f"AWS4{secret_key}".encode(), datestamp)
|
||||||
|
k = _sign(k, "us-east-1")
|
||||||
|
k = _sign(k, "s3")
|
||||||
|
k = _sign(k, "aws4_request")
|
||||||
|
sig = hmac.new(k, string_to_sign.encode(), hashlib.sha256).hexdigest()
|
||||||
|
|
||||||
|
auth = (
|
||||||
|
f"AWS4-HMAC-SHA256 Credential={access_key}/{credential_scope},"
|
||||||
|
f" SignedHeaders=host;x-amz-date, Signature={sig}"
|
||||||
|
)
|
||||||
|
return {"Authorization": auth, "x-amz-date": amzdate}
|
||||||
|
|
||||||
|
|
||||||
def check_seaweedfs(domain: str, opener) -> CheckResult:
|
def check_seaweedfs(domain: str, opener) -> CheckResult:
|
||||||
"""GET https://s3.{domain}/ -> any response from the S3 API (< 500)."""
|
"""GET https://s3.{domain}/ with S3 credentials -> 200 list-buckets response."""
|
||||||
url = f"https://s3.{domain}/"
|
access_key = _kube_secret("storage", "seaweedfs-s3-credentials", "S3_ACCESS_KEY")
|
||||||
|
secret_key = _kube_secret("storage", "seaweedfs-s3-credentials", "S3_SECRET_KEY")
|
||||||
|
if not access_key or not secret_key:
|
||||||
|
return CheckResult("seaweedfs", "storage", "seaweedfs", False,
|
||||||
|
"credentials not found in seaweedfs-s3-credentials secret")
|
||||||
|
|
||||||
|
host = f"s3.{domain}"
|
||||||
|
url = f"https://{host}/"
|
||||||
|
headers = _s3_auth_headers(access_key, secret_key, host)
|
||||||
try:
|
try:
|
||||||
status, _ = _http_get(url, opener)
|
status, _ = _http_get(url, opener, headers=headers)
|
||||||
# Unauthenticated S3 returns 403 (expected); 200 also ok; 5xx = problem.
|
if status == 200:
|
||||||
return CheckResult("seaweedfs", "storage", "seaweedfs", status < 500, f"HTTP {status}")
|
return CheckResult("seaweedfs", "storage", "seaweedfs", True, "S3 authenticated")
|
||||||
|
return CheckResult("seaweedfs", "storage", "seaweedfs", False, f"HTTP {status}")
|
||||||
except urllib.error.URLError as e:
|
except urllib.error.URLError as e:
|
||||||
return CheckResult("seaweedfs", "storage", "seaweedfs", False, str(e.reason))
|
return CheckResult("seaweedfs", "storage", "seaweedfs", False, str(e.reason))
|
||||||
|
|
||||||
|
|||||||
@@ -126,28 +126,54 @@ class TestCheckOpenbao(unittest.TestCase):
|
|||||||
|
|
||||||
|
|
||||||
class TestCheckSeaweedfs(unittest.TestCase):
|
class TestCheckSeaweedfs(unittest.TestCase):
|
||||||
def test_200_passes(self):
|
def _with_creds(self, http_result=None, http_error=None):
|
||||||
with patch("sunbeam.checks._http_get", return_value=(200, b"")):
|
"""Helper: patch both _kube_secret (returns creds) and _http_get."""
|
||||||
from sunbeam import checks
|
def secret_side_effect(ns, name, key):
|
||||||
r = checks.check_seaweedfs("testdomain", None)
|
return "testkey" if key == "S3_ACCESS_KEY" else "testsecret"
|
||||||
self.assertTrue(r.passed)
|
|
||||||
|
|
||||||
def test_403_unauthenticated_passes(self):
|
patches = [
|
||||||
# S3 returns 403 for unauthenticated requests — that means it's up.
|
patch("sunbeam.checks._kube_secret", side_effect=secret_side_effect),
|
||||||
with patch("sunbeam.checks._http_get", return_value=(403, b"")):
|
]
|
||||||
|
if http_error:
|
||||||
|
patches.append(patch("sunbeam.checks._http_get", side_effect=http_error))
|
||||||
|
else:
|
||||||
|
patches.append(patch("sunbeam.checks._http_get", return_value=http_result))
|
||||||
|
return patches
|
||||||
|
|
||||||
|
def test_200_authenticated_passes(self):
|
||||||
|
with patch("sunbeam.checks._kube_secret", return_value="val"), \
|
||||||
|
patch("sunbeam.checks._http_get", return_value=(200, b"")):
|
||||||
from sunbeam import checks
|
from sunbeam import checks
|
||||||
r = checks.check_seaweedfs("testdomain", None)
|
r = checks.check_seaweedfs("testdomain", None)
|
||||||
self.assertTrue(r.passed)
|
self.assertTrue(r.passed)
|
||||||
|
self.assertIn("authenticated", r.detail)
|
||||||
|
|
||||||
|
def test_missing_credentials_fails(self):
|
||||||
|
with patch("sunbeam.checks._kube_secret", return_value=""):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_seaweedfs("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
self.assertIn("secret", r.detail)
|
||||||
|
|
||||||
|
def test_403_bad_credentials_fails(self):
|
||||||
|
with patch("sunbeam.checks._kube_secret", return_value="val"), \
|
||||||
|
patch("sunbeam.checks._http_get", return_value=(403, b"")):
|
||||||
|
from sunbeam import checks
|
||||||
|
r = checks.check_seaweedfs("testdomain", None)
|
||||||
|
self.assertFalse(r.passed)
|
||||||
|
self.assertIn("403", r.detail)
|
||||||
|
|
||||||
def test_502_fails(self):
|
def test_502_fails(self):
|
||||||
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
|
with patch("sunbeam.checks._kube_secret", return_value="val"), \
|
||||||
|
patch("sunbeam.checks._http_get", return_value=(502, b"")):
|
||||||
from sunbeam import checks
|
from sunbeam import checks
|
||||||
r = checks.check_seaweedfs("testdomain", None)
|
r = checks.check_seaweedfs("testdomain", None)
|
||||||
self.assertFalse(r.passed)
|
self.assertFalse(r.passed)
|
||||||
|
|
||||||
def test_connection_error_fails(self):
|
def test_connection_error_fails(self):
|
||||||
import urllib.error
|
import urllib.error
|
||||||
with patch("sunbeam.checks._http_get",
|
with patch("sunbeam.checks._kube_secret", return_value="val"), \
|
||||||
|
patch("sunbeam.checks._http_get",
|
||||||
side_effect=urllib.error.URLError("refused")):
|
side_effect=urllib.error.URLError("refused")):
|
||||||
from sunbeam import checks
|
from sunbeam import checks
|
||||||
r = checks.check_seaweedfs("testdomain", None)
|
r = checks.check_seaweedfs("testdomain", None)
|
||||||
|
|||||||
Reference in New Issue
Block a user