check: rewrite seaweedfs probe with S3 SigV4 auth
Replaced the unauthenticated SeaweedFS probe (which accepted any HTTP < 500 as passing) with a signed S3 ListBuckets request using AWS Signature V4. Credentials are read from the seaweedfs-s3-credentials K8s secret; a 200 response confirms authentication is working. Updated tests to cover missing creds, 403 bad-creds, 502 gateway error, and URLError cases.
This commit is contained in:
@@ -126,28 +126,54 @@ class TestCheckOpenbao(unittest.TestCase):
|
||||
|
||||
|
||||
class TestCheckSeaweedfs(unittest.TestCase):
|
||||
def test_200_passes(self):
|
||||
with patch("sunbeam.checks._http_get", return_value=(200, b"")):
|
||||
from sunbeam import checks
|
||||
r = checks.check_seaweedfs("testdomain", None)
|
||||
self.assertTrue(r.passed)
|
||||
def _with_creds(self, http_result=None, http_error=None):
|
||||
"""Helper: patch both _kube_secret (returns creds) and _http_get."""
|
||||
def secret_side_effect(ns, name, key):
|
||||
return "testkey" if key == "S3_ACCESS_KEY" else "testsecret"
|
||||
|
||||
def test_403_unauthenticated_passes(self):
|
||||
# S3 returns 403 for unauthenticated requests — that means it's up.
|
||||
with patch("sunbeam.checks._http_get", return_value=(403, b"")):
|
||||
patches = [
|
||||
patch("sunbeam.checks._kube_secret", side_effect=secret_side_effect),
|
||||
]
|
||||
if http_error:
|
||||
patches.append(patch("sunbeam.checks._http_get", side_effect=http_error))
|
||||
else:
|
||||
patches.append(patch("sunbeam.checks._http_get", return_value=http_result))
|
||||
return patches
|
||||
|
||||
def test_200_authenticated_passes(self):
|
||||
with patch("sunbeam.checks._kube_secret", return_value="val"), \
|
||||
patch("sunbeam.checks._http_get", return_value=(200, b"")):
|
||||
from sunbeam import checks
|
||||
r = checks.check_seaweedfs("testdomain", None)
|
||||
self.assertTrue(r.passed)
|
||||
self.assertIn("authenticated", r.detail)
|
||||
|
||||
def test_missing_credentials_fails(self):
|
||||
with patch("sunbeam.checks._kube_secret", return_value=""):
|
||||
from sunbeam import checks
|
||||
r = checks.check_seaweedfs("testdomain", None)
|
||||
self.assertFalse(r.passed)
|
||||
self.assertIn("secret", r.detail)
|
||||
|
||||
def test_403_bad_credentials_fails(self):
|
||||
with patch("sunbeam.checks._kube_secret", return_value="val"), \
|
||||
patch("sunbeam.checks._http_get", return_value=(403, b"")):
|
||||
from sunbeam import checks
|
||||
r = checks.check_seaweedfs("testdomain", None)
|
||||
self.assertFalse(r.passed)
|
||||
self.assertIn("403", r.detail)
|
||||
|
||||
def test_502_fails(self):
|
||||
with patch("sunbeam.checks._http_get", return_value=(502, b"")):
|
||||
with patch("sunbeam.checks._kube_secret", return_value="val"), \
|
||||
patch("sunbeam.checks._http_get", return_value=(502, b"")):
|
||||
from sunbeam import checks
|
||||
r = checks.check_seaweedfs("testdomain", None)
|
||||
self.assertFalse(r.passed)
|
||||
|
||||
def test_connection_error_fails(self):
|
||||
import urllib.error
|
||||
with patch("sunbeam.checks._http_get",
|
||||
with patch("sunbeam.checks._kube_secret", return_value="val"), \
|
||||
patch("sunbeam.checks._http_get",
|
||||
side_effect=urllib.error.URLError("refused")):
|
||||
from sunbeam import checks
|
||||
r = checks.check_seaweedfs("testdomain", None)
|
||||
|
||||
Reference in New Issue
Block a user