feat(cli): partial apply with namespace filter
sunbeam apply [namespace] builds the full kustomize overlay (preserving all image substitutions and patches) then filters the output to only resources in the given namespace before applying. Cleanup and ConfigMap restart detection are also scoped to the target namespace. - manifests.py: _filter_by_namespace(), scoped pre_apply_cleanup() - cli.py: namespace positional arg for apply; meet added to build choices - tests: 17 new tests covering filter logic and CLI dispatch
This commit is contained in:
@@ -3,11 +3,33 @@ import argparse
|
||||
import sys
|
||||
|
||||
|
||||
ENV_CONTEXTS = {
|
||||
"local": "sunbeam",
|
||||
"production": "production",
|
||||
}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
prog="sunbeam",
|
||||
description="Sunbeam local dev stack manager",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--env", choices=["local", "production"], default="local",
|
||||
help="Target environment (default: local)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--context", default=None,
|
||||
help="kubectl context override (default: sunbeam for local, default for production)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--domain", default="",
|
||||
help="Domain suffix for production deploys (e.g. sunbeam.pt)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--email", default="",
|
||||
help="ACME email for cert-manager (e.g. ops@sunbeam.pt)",
|
||||
)
|
||||
sub = parser.add_subparsers(dest="verb", metavar="verb")
|
||||
|
||||
# sunbeam up
|
||||
@@ -21,8 +43,12 @@ def main() -> None:
|
||||
p_status.add_argument("target", nargs="?", default=None,
|
||||
help="namespace or namespace/name")
|
||||
|
||||
# sunbeam apply
|
||||
sub.add_parser("apply", help="kustomize build + domain subst + kubectl apply")
|
||||
# sunbeam apply [namespace]
|
||||
p_apply = sub.add_parser("apply", help="kustomize build + domain subst + kubectl apply")
|
||||
p_apply.add_argument("namespace", nargs="?", default="",
|
||||
help="Limit apply to one namespace (e.g. lasuite, ingress, ory)")
|
||||
p_apply.add_argument("--domain", default="", help="Domain suffix (e.g. sunbeam.pt)")
|
||||
p_apply.add_argument("--email", default="", help="ACME email for cert-manager")
|
||||
|
||||
# sunbeam seed
|
||||
sub.add_parser("seed", help="Generate/store all credentials in OpenBao")
|
||||
@@ -48,10 +74,16 @@ def main() -> None:
|
||||
p_restart.add_argument("target", nargs="?", default=None,
|
||||
help="namespace or namespace/name")
|
||||
|
||||
# sunbeam build <what>
|
||||
p_build = sub.add_parser("build", help="Build and push an artifact")
|
||||
p_build.add_argument("what", choices=["proxy", "integration", "kratos-admin"],
|
||||
help="What to build (proxy, integration, kratos-admin)")
|
||||
# sunbeam build <what> [--push] [--deploy]
|
||||
p_build = sub.add_parser("build", help="Build an artifact (add --push to push, --deploy to apply+rollout)")
|
||||
p_build.add_argument("what",
|
||||
choices=["proxy", "integration", "kratos-admin", "meet",
|
||||
"docs-frontend", "people-frontend"],
|
||||
help="What to build")
|
||||
p_build.add_argument("--push", action="store_true",
|
||||
help="Push image to registry after building")
|
||||
p_build.add_argument("--deploy", action="store_true",
|
||||
help="Apply manifests and rollout restart after pushing (implies --push)")
|
||||
|
||||
# sunbeam check [ns[/name]]
|
||||
p_check = sub.add_parser("check", help="Functional service health checks")
|
||||
@@ -101,8 +133,26 @@ def main() -> None:
|
||||
p_user_enable = user_sub.add_parser("enable", help="Re-enable a disabled identity")
|
||||
p_user_enable.add_argument("target", help="Email or identity ID")
|
||||
|
||||
p_user_set_pw = user_sub.add_parser("set-password", help="Set password for an identity")
|
||||
p_user_set_pw.add_argument("target", help="Email or identity ID")
|
||||
p_user_set_pw.add_argument("password", help="New password")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Set kubectl context before any kube calls.
|
||||
# For production, also register the SSH host so the tunnel is opened on demand.
|
||||
# SUNBEAM_SSH_HOST env var: e.g. "user@server.example.com" or just "server.example.com"
|
||||
import os
|
||||
from sunbeam.kube import set_context
|
||||
ctx = args.context or ENV_CONTEXTS.get(args.env, "sunbeam")
|
||||
ssh_host = ""
|
||||
if args.env == "production":
|
||||
ssh_host = os.environ.get("SUNBEAM_SSH_HOST", "")
|
||||
if not ssh_host:
|
||||
from sunbeam.output import die
|
||||
die("SUNBEAM_SSH_HOST must be set for --env production (e.g. user@your-server.example.com)")
|
||||
set_context(ctx, ssh_host=ssh_host)
|
||||
|
||||
if args.verb is None:
|
||||
parser.print_help()
|
||||
sys.exit(0)
|
||||
@@ -122,7 +172,11 @@ def main() -> None:
|
||||
|
||||
elif args.verb == "apply":
|
||||
from sunbeam.manifests import cmd_apply
|
||||
cmd_apply()
|
||||
# --domain/--email can appear before OR after the verb; subparser wins if both set.
|
||||
domain = getattr(args, "domain", "") or ""
|
||||
email = getattr(args, "email", "") or ""
|
||||
namespace = getattr(args, "namespace", "") or ""
|
||||
cmd_apply(env=args.env, domain=domain, email=email, namespace=namespace)
|
||||
|
||||
elif args.verb == "seed":
|
||||
from sunbeam.secrets import cmd_seed
|
||||
@@ -146,7 +200,8 @@ def main() -> None:
|
||||
|
||||
elif args.verb == "build":
|
||||
from sunbeam.images import cmd_build
|
||||
cmd_build(args.what)
|
||||
push = args.push or args.deploy
|
||||
cmd_build(args.what, push=push, deploy=args.deploy)
|
||||
|
||||
elif args.verb == "check":
|
||||
from sunbeam.checks import cmd_check
|
||||
@@ -171,7 +226,8 @@ def main() -> None:
|
||||
elif args.verb == "user":
|
||||
from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create,
|
||||
cmd_user_delete, cmd_user_recover,
|
||||
cmd_user_disable, cmd_user_enable)
|
||||
cmd_user_disable, cmd_user_enable,
|
||||
cmd_user_set_password)
|
||||
action = getattr(args, "user_action", None)
|
||||
if action is None:
|
||||
p_user.print_help()
|
||||
@@ -190,6 +246,8 @@ def main() -> None:
|
||||
cmd_user_disable(args.target)
|
||||
elif action == "enable":
|
||||
cmd_user_enable(args.target)
|
||||
elif action == "set-password":
|
||||
cmd_user_set_password(args.target, args.password)
|
||||
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
@@ -1,23 +1,27 @@
|
||||
"""Manifest build + apply — kustomize overlay with domain substitution."""
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, kustomize_build, get_lima_ip
|
||||
from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, kustomize_build, get_lima_ip, get_domain
|
||||
from sunbeam.output import step, ok, warn
|
||||
|
||||
REPO_ROOT = Path(__file__).parents[2] / "infrastructure"
|
||||
MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "ory", "storage",
|
||||
"vault-secrets-operator"]
|
||||
MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "monitoring", "ory",
|
||||
"storage", "vault-secrets-operator"]
|
||||
|
||||
|
||||
def pre_apply_cleanup():
|
||||
def pre_apply_cleanup(namespaces=None):
|
||||
"""Delete immutable resources that must be re-created on each apply.
|
||||
|
||||
Also prunes VaultStaticSecrets that share a name with a VaultDynamicSecret --
|
||||
kubectl apply doesn't delete the old resource when a manifest switches kinds,
|
||||
and VSO refuses to overwrite a secret owned by a different resource type.
|
||||
|
||||
namespaces: if given, only clean those namespaces; otherwise clean all MANAGED_NS.
|
||||
"""
|
||||
ns_list = namespaces if namespaces is not None else MANAGED_NS
|
||||
ok("Cleaning up immutable Jobs and test Pods...")
|
||||
for ns in MANAGED_NS:
|
||||
for ns in ns_list:
|
||||
kube("delete", "jobs", "--all", "-n", ns, "--ignore-not-found", check=False)
|
||||
# Query all pods (no phase filter) — CrashLoopBackOff pods report phase=Running
|
||||
# so filtering on phase!=Running would silently skip them.
|
||||
@@ -32,7 +36,7 @@ def pre_apply_cleanup():
|
||||
# the old VSS; it just creates the new VDS alongside it. VSO then errors
|
||||
# "not the owner" because the K8s secret's ownerRef still points to the VSS.
|
||||
ok("Pruning stale VaultStaticSecrets superseded by VaultDynamicSecrets...")
|
||||
for ns in MANAGED_NS:
|
||||
for ns in ns_list:
|
||||
vss_names = set(kube_out(
|
||||
"get", "vaultstaticsecret", "-n", ns,
|
||||
"-o=jsonpath={.items[*].metadata.name}", "--ignore-not-found",
|
||||
@@ -87,14 +91,120 @@ def _restart_for_changed_configmaps(before: dict, after: dict):
|
||||
kube("rollout", "restart", f"deployment/{dep}", "-n", ns, check=False)
|
||||
|
||||
|
||||
def cmd_apply():
|
||||
"""Get Lima IP, build domain, kustomize_build, kube_apply."""
|
||||
def _wait_for_webhook(ns: str, svc: str, timeout: int = 120) -> bool:
|
||||
"""Poll until a webhook service endpoint exists (signals webhook is ready).
|
||||
|
||||
Returns True if the webhook is ready within timeout seconds.
|
||||
"""
|
||||
ok(f"Waiting for {ns}/{svc} webhook (up to {timeout}s)...")
|
||||
deadline = time.time() + timeout
|
||||
while time.time() < deadline:
|
||||
eps = kube_out("get", "endpoints", svc, "-n", ns,
|
||||
"-o=jsonpath={.subsets[0].addresses[0].ip}", "--ignore-not-found")
|
||||
if eps:
|
||||
ok(f" {ns}/{svc} ready.")
|
||||
return True
|
||||
time.sleep(3)
|
||||
warn(f" {ns}/{svc} not ready after {timeout}s — continuing anyway.")
|
||||
return False
|
||||
|
||||
|
||||
def _apply_mkcert_ca_configmap():
|
||||
"""Create/update gitea-mkcert-ca ConfigMap from the local mkcert root CA.
|
||||
|
||||
Only called in local env. The ConfigMap is mounted into Gitea so Go's TLS
|
||||
stack trusts the mkcert wildcard cert when making server-side OIDC calls.
|
||||
"""
|
||||
import subprocess, json
|
||||
from pathlib import Path
|
||||
caroot = subprocess.run(["mkcert", "-CAROOT"], capture_output=True, text=True).stdout.strip()
|
||||
if not caroot:
|
||||
warn("mkcert not found — skipping gitea-mkcert-ca ConfigMap.")
|
||||
return
|
||||
ca_pem = Path(caroot) / "rootCA.pem"
|
||||
if not ca_pem.exists():
|
||||
warn(f"mkcert root CA not found at {ca_pem} — skipping.")
|
||||
return
|
||||
cm = json.dumps({
|
||||
"apiVersion": "v1",
|
||||
"kind": "ConfigMap",
|
||||
"metadata": {"name": "gitea-mkcert-ca", "namespace": "devtools"},
|
||||
"data": {"ca.crt": ca_pem.read_text()},
|
||||
})
|
||||
kube("apply", "--server-side", "-f", "-", input=cm)
|
||||
ok("gitea-mkcert-ca ConfigMap applied.")
|
||||
|
||||
|
||||
def _filter_by_namespace(manifests: str, namespace: str) -> str:
|
||||
"""Return only the YAML documents that belong to the given namespace.
|
||||
|
||||
Also includes the Namespace resource itself (safe to re-apply).
|
||||
Uses simple string matching — namespace always appears as 'namespace: <name>'
|
||||
in top-level metadata, so this is reliable without a full YAML parser.
|
||||
"""
|
||||
kept = []
|
||||
for doc in manifests.split("\n---"):
|
||||
doc = doc.strip()
|
||||
if not doc:
|
||||
continue
|
||||
if f"namespace: {namespace}" in doc:
|
||||
kept.append(doc)
|
||||
elif "kind: Namespace" in doc and f"name: {namespace}" in doc:
|
||||
kept.append(doc)
|
||||
if not kept:
|
||||
return ""
|
||||
return "---\n" + "\n---\n".join(kept) + "\n"
|
||||
|
||||
|
||||
def cmd_apply(env: str = "local", domain: str = "", email: str = "", namespace: str = ""):
|
||||
"""Build kustomize overlay for env, substitute domain/email, kubectl apply.
|
||||
|
||||
Runs a second convergence pass if cert-manager is present in the overlay —
|
||||
cert-manager registers a ValidatingWebhook that must be running before
|
||||
ClusterIssuer / Certificate resources can be created.
|
||||
"""
|
||||
if env == "production":
|
||||
if not domain:
|
||||
# Try to discover domain from running cluster
|
||||
domain = get_domain()
|
||||
if not domain:
|
||||
from sunbeam.output import die
|
||||
die("--domain is required for production apply on first deploy")
|
||||
overlay = REPO_ROOT / "overlays" / "production"
|
||||
else:
|
||||
ip = get_lima_ip()
|
||||
domain = f"{ip}.sslip.io"
|
||||
step(f"Applying manifests (domain: {domain})...")
|
||||
pre_apply_cleanup()
|
||||
overlay = REPO_ROOT / "overlays" / "local"
|
||||
|
||||
scope = f" [{namespace}]" if namespace else ""
|
||||
step(f"Applying manifests (env: {env}, domain: {domain}){scope}...")
|
||||
if env == "local":
|
||||
_apply_mkcert_ca_configmap()
|
||||
ns_list = [namespace] if namespace else None
|
||||
pre_apply_cleanup(namespaces=ns_list)
|
||||
before = _snapshot_configmaps()
|
||||
manifests = kustomize_build(REPO_ROOT / "overlays" / "local", domain)
|
||||
kube("apply", "--server-side", "--force-conflicts", "-f", "-", input=manifests)
|
||||
manifests = kustomize_build(overlay, domain, email=email)
|
||||
|
||||
if namespace:
|
||||
manifests = _filter_by_namespace(manifests, namespace)
|
||||
if not manifests.strip():
|
||||
warn(f"No resources found for namespace '{namespace}' — check the name and try again.")
|
||||
return
|
||||
|
||||
# First pass: may emit errors for resources that depend on webhooks not yet running
|
||||
# (e.g. cert-manager ClusterIssuer/Certificate), which is expected on first deploy.
|
||||
kube("apply", "--server-side", "--force-conflicts", "-f", "-",
|
||||
input=manifests, check=False)
|
||||
|
||||
# If cert-manager is in the overlay, wait for its webhook then re-apply
|
||||
# so that ClusterIssuer and Certificate resources converge.
|
||||
# Skip for partial applies unless the target IS cert-manager.
|
||||
cert_manager_present = (overlay / "../../base/cert-manager").resolve().exists()
|
||||
if cert_manager_present and not namespace:
|
||||
if _wait_for_webhook("cert-manager", "cert-manager-webhook", timeout=120):
|
||||
ok("Running convergence pass for cert-manager resources...")
|
||||
manifests2 = kustomize_build(overlay, domain, email=email)
|
||||
kube("apply", "--server-side", "--force-conflicts", "-f", "-", input=manifests2)
|
||||
|
||||
_restart_for_changed_configmaps(before, _snapshot_configmaps())
|
||||
ok("Applied.")
|
||||
|
||||
@@ -16,7 +16,10 @@ class TestArgParsing(unittest.TestCase):
|
||||
sub.add_parser("down")
|
||||
p_status = sub.add_parser("status")
|
||||
p_status.add_argument("target", nargs="?", default=None)
|
||||
sub.add_parser("apply")
|
||||
p_apply = sub.add_parser("apply")
|
||||
p_apply.add_argument("namespace", nargs="?", default="")
|
||||
p_apply.add_argument("--domain", default="")
|
||||
p_apply.add_argument("--email", default="")
|
||||
sub.add_parser("seed")
|
||||
sub.add_parser("verify")
|
||||
p_logs = sub.add_parser("logs")
|
||||
@@ -28,11 +31,35 @@ class TestArgParsing(unittest.TestCase):
|
||||
p_restart = sub.add_parser("restart")
|
||||
p_restart.add_argument("target", nargs="?", default=None)
|
||||
p_build = sub.add_parser("build")
|
||||
p_build.add_argument("what", choices=["proxy"])
|
||||
p_build.add_argument("what", choices=["proxy", "integration", "kratos-admin", "meet",
|
||||
"docs-frontend", "people-frontend"])
|
||||
p_build.add_argument("--push", action="store_true")
|
||||
p_build.add_argument("--deploy", action="store_true")
|
||||
sub.add_parser("mirror")
|
||||
sub.add_parser("bootstrap")
|
||||
p_check = sub.add_parser("check")
|
||||
p_check.add_argument("target", nargs="?", default=None)
|
||||
p_user = sub.add_parser("user")
|
||||
user_sub = p_user.add_subparsers(dest="user_action")
|
||||
p_user_list = user_sub.add_parser("list")
|
||||
p_user_list.add_argument("--search", default="")
|
||||
p_user_get = user_sub.add_parser("get")
|
||||
p_user_get.add_argument("target")
|
||||
p_user_create = user_sub.add_parser("create")
|
||||
p_user_create.add_argument("email")
|
||||
p_user_create.add_argument("--name", default="")
|
||||
p_user_create.add_argument("--schema", default="default")
|
||||
p_user_delete = user_sub.add_parser("delete")
|
||||
p_user_delete.add_argument("target")
|
||||
p_user_recover = user_sub.add_parser("recover")
|
||||
p_user_recover.add_argument("target")
|
||||
p_user_disable = user_sub.add_parser("disable")
|
||||
p_user_disable.add_argument("target")
|
||||
p_user_enable = user_sub.add_parser("enable")
|
||||
p_user_enable.add_argument("target")
|
||||
p_user_set_pw = user_sub.add_parser("set-password")
|
||||
p_user_set_pw.add_argument("target")
|
||||
p_user_set_pw.add_argument("password")
|
||||
return parser.parse_args(argv)
|
||||
|
||||
def test_up(self):
|
||||
@@ -66,11 +93,55 @@ class TestArgParsing(unittest.TestCase):
|
||||
def test_build_proxy(self):
|
||||
args = self._parse(["build", "proxy"])
|
||||
self.assertEqual(args.what, "proxy")
|
||||
self.assertFalse(args.push)
|
||||
self.assertFalse(args.deploy)
|
||||
|
||||
def test_build_integration(self):
|
||||
args = self._parse(["build", "integration"])
|
||||
self.assertEqual(args.what, "integration")
|
||||
|
||||
def test_build_push_flag(self):
|
||||
args = self._parse(["build", "proxy", "--push"])
|
||||
self.assertTrue(args.push)
|
||||
self.assertFalse(args.deploy)
|
||||
|
||||
def test_build_deploy_flag(self):
|
||||
args = self._parse(["build", "proxy", "--deploy"])
|
||||
self.assertFalse(args.push)
|
||||
self.assertTrue(args.deploy)
|
||||
|
||||
def test_build_invalid_target(self):
|
||||
with self.assertRaises(SystemExit):
|
||||
self._parse(["build", "notavalidtarget"])
|
||||
|
||||
def test_user_set_password(self):
|
||||
args = self._parse(["user", "set-password", "admin@example.com", "hunter2"])
|
||||
self.assertEqual(args.verb, "user")
|
||||
self.assertEqual(args.user_action, "set-password")
|
||||
self.assertEqual(args.target, "admin@example.com")
|
||||
self.assertEqual(args.password, "hunter2")
|
||||
|
||||
def test_user_disable(self):
|
||||
args = self._parse(["user", "disable", "admin@example.com"])
|
||||
self.assertEqual(args.user_action, "disable")
|
||||
self.assertEqual(args.target, "admin@example.com")
|
||||
|
||||
def test_user_enable(self):
|
||||
args = self._parse(["user", "enable", "admin@example.com"])
|
||||
self.assertEqual(args.user_action, "enable")
|
||||
self.assertEqual(args.target, "admin@example.com")
|
||||
|
||||
def test_user_list_search(self):
|
||||
args = self._parse(["user", "list", "--search", "sienna"])
|
||||
self.assertEqual(args.user_action, "list")
|
||||
self.assertEqual(args.search, "sienna")
|
||||
|
||||
def test_user_create(self):
|
||||
args = self._parse(["user", "create", "x@example.com", "--name", "X Y"])
|
||||
self.assertEqual(args.user_action, "create")
|
||||
self.assertEqual(args.email, "x@example.com")
|
||||
self.assertEqual(args.name, "X Y")
|
||||
|
||||
def test_get_with_target(self):
|
||||
args = self._parse(["get", "ory/kratos-abc"])
|
||||
self.assertEqual(args.verb, "get")
|
||||
@@ -100,6 +171,24 @@ class TestArgParsing(unittest.TestCase):
|
||||
self.assertEqual(args.verb, "check")
|
||||
self.assertEqual(args.target, "lasuite/people")
|
||||
|
||||
def test_apply_no_namespace(self):
|
||||
args = self._parse(["apply"])
|
||||
self.assertEqual(args.verb, "apply")
|
||||
self.assertEqual(args.namespace, "")
|
||||
|
||||
def test_apply_with_namespace(self):
|
||||
args = self._parse(["apply", "lasuite"])
|
||||
self.assertEqual(args.verb, "apply")
|
||||
self.assertEqual(args.namespace, "lasuite")
|
||||
|
||||
def test_apply_ingress_namespace(self):
|
||||
args = self._parse(["apply", "ingress"])
|
||||
self.assertEqual(args.namespace, "ingress")
|
||||
|
||||
def test_build_meet(self):
|
||||
args = self._parse(["build", "meet"])
|
||||
self.assertEqual(args.what, "meet")
|
||||
|
||||
def test_no_args_verb_is_none(self):
|
||||
args = self._parse([])
|
||||
self.assertIsNone(args.verb)
|
||||
@@ -205,7 +294,122 @@ class TestCliDispatch(unittest.TestCase):
|
||||
cli_mod.main()
|
||||
except SystemExit:
|
||||
pass
|
||||
mock_build.assert_called_once_with("proxy")
|
||||
mock_build.assert_called_once_with("proxy", push=False, deploy=False)
|
||||
|
||||
def test_build_with_push_flag(self):
|
||||
mock_build = MagicMock()
|
||||
with patch.object(sys, "argv", ["sunbeam", "build", "integration", "--push"]):
|
||||
with patch.dict("sys.modules", {"sunbeam.images": MagicMock(cmd_build=mock_build)}):
|
||||
import importlib, sunbeam.cli as cli_mod
|
||||
importlib.reload(cli_mod)
|
||||
try:
|
||||
cli_mod.main()
|
||||
except SystemExit:
|
||||
pass
|
||||
mock_build.assert_called_once_with("integration", push=True, deploy=False)
|
||||
|
||||
def test_build_with_deploy_flag_implies_push(self):
|
||||
mock_build = MagicMock()
|
||||
with patch.object(sys, "argv", ["sunbeam", "build", "proxy", "--deploy"]):
|
||||
with patch.dict("sys.modules", {"sunbeam.images": MagicMock(cmd_build=mock_build)}):
|
||||
import importlib, sunbeam.cli as cli_mod
|
||||
importlib.reload(cli_mod)
|
||||
try:
|
||||
cli_mod.main()
|
||||
except SystemExit:
|
||||
pass
|
||||
mock_build.assert_called_once_with("proxy", push=True, deploy=True)
|
||||
|
||||
def test_user_set_password_dispatches(self):
|
||||
mock_set_pw = MagicMock()
|
||||
mock_users = MagicMock(
|
||||
cmd_user_list=MagicMock(), cmd_user_get=MagicMock(),
|
||||
cmd_user_create=MagicMock(), cmd_user_delete=MagicMock(),
|
||||
cmd_user_recover=MagicMock(), cmd_user_disable=MagicMock(),
|
||||
cmd_user_enable=MagicMock(), cmd_user_set_password=mock_set_pw,
|
||||
)
|
||||
with patch.object(sys, "argv", ["sunbeam", "user", "set-password",
|
||||
"admin@sunbeam.pt", "s3cr3t"]):
|
||||
with patch.dict("sys.modules", {"sunbeam.users": mock_users}):
|
||||
import importlib, sunbeam.cli as cli_mod
|
||||
importlib.reload(cli_mod)
|
||||
try:
|
||||
cli_mod.main()
|
||||
except SystemExit:
|
||||
pass
|
||||
mock_set_pw.assert_called_once_with("admin@sunbeam.pt", "s3cr3t")
|
||||
|
||||
def test_user_disable_dispatches(self):
|
||||
mock_disable = MagicMock()
|
||||
mock_users = MagicMock(
|
||||
cmd_user_list=MagicMock(), cmd_user_get=MagicMock(),
|
||||
cmd_user_create=MagicMock(), cmd_user_delete=MagicMock(),
|
||||
cmd_user_recover=MagicMock(), cmd_user_disable=mock_disable,
|
||||
cmd_user_enable=MagicMock(), cmd_user_set_password=MagicMock(),
|
||||
)
|
||||
with patch.object(sys, "argv", ["sunbeam", "user", "disable", "x@sunbeam.pt"]):
|
||||
with patch.dict("sys.modules", {"sunbeam.users": mock_users}):
|
||||
import importlib, sunbeam.cli as cli_mod
|
||||
importlib.reload(cli_mod)
|
||||
try:
|
||||
cli_mod.main()
|
||||
except SystemExit:
|
||||
pass
|
||||
mock_disable.assert_called_once_with("x@sunbeam.pt")
|
||||
|
||||
def test_user_enable_dispatches(self):
|
||||
mock_enable = MagicMock()
|
||||
mock_users = MagicMock(
|
||||
cmd_user_list=MagicMock(), cmd_user_get=MagicMock(),
|
||||
cmd_user_create=MagicMock(), cmd_user_delete=MagicMock(),
|
||||
cmd_user_recover=MagicMock(), cmd_user_disable=MagicMock(),
|
||||
cmd_user_enable=mock_enable, cmd_user_set_password=MagicMock(),
|
||||
)
|
||||
with patch.object(sys, "argv", ["sunbeam", "user", "enable", "x@sunbeam.pt"]):
|
||||
with patch.dict("sys.modules", {"sunbeam.users": mock_users}):
|
||||
import importlib, sunbeam.cli as cli_mod
|
||||
importlib.reload(cli_mod)
|
||||
try:
|
||||
cli_mod.main()
|
||||
except SystemExit:
|
||||
pass
|
||||
mock_enable.assert_called_once_with("x@sunbeam.pt")
|
||||
|
||||
def test_apply_full_dispatches_without_namespace(self):
|
||||
mock_apply = MagicMock()
|
||||
with patch.object(sys, "argv", ["sunbeam", "apply"]):
|
||||
with patch.dict("sys.modules", {"sunbeam.manifests": MagicMock(cmd_apply=mock_apply)}):
|
||||
import importlib, sunbeam.cli as cli_mod
|
||||
importlib.reload(cli_mod)
|
||||
try:
|
||||
cli_mod.main()
|
||||
except SystemExit:
|
||||
pass
|
||||
mock_apply.assert_called_once_with(env="local", domain="", email="", namespace="")
|
||||
|
||||
def test_apply_partial_passes_namespace(self):
|
||||
mock_apply = MagicMock()
|
||||
with patch.object(sys, "argv", ["sunbeam", "apply", "lasuite"]):
|
||||
with patch.dict("sys.modules", {"sunbeam.manifests": MagicMock(cmd_apply=mock_apply)}):
|
||||
import importlib, sunbeam.cli as cli_mod
|
||||
importlib.reload(cli_mod)
|
||||
try:
|
||||
cli_mod.main()
|
||||
except SystemExit:
|
||||
pass
|
||||
mock_apply.assert_called_once_with(env="local", domain="", email="", namespace="lasuite")
|
||||
|
||||
def test_build_meet_dispatches(self):
|
||||
mock_build = MagicMock()
|
||||
with patch.object(sys, "argv", ["sunbeam", "build", "meet"]):
|
||||
with patch.dict("sys.modules", {"sunbeam.images": MagicMock(cmd_build=mock_build)}):
|
||||
import importlib, sunbeam.cli as cli_mod
|
||||
importlib.reload(cli_mod)
|
||||
try:
|
||||
cli_mod.main()
|
||||
except SystemExit:
|
||||
pass
|
||||
mock_build.assert_called_once_with("meet", push=False, deploy=False)
|
||||
|
||||
def test_check_no_target(self):
|
||||
mock_check = MagicMock()
|
||||
|
||||
99
sunbeam/tests/test_manifests.py
Normal file
99
sunbeam/tests/test_manifests.py
Normal file
@@ -0,0 +1,99 @@
|
||||
"""Tests for manifests.py — primarily _filter_by_namespace."""
|
||||
import unittest
|
||||
|
||||
from sunbeam.manifests import _filter_by_namespace
|
||||
|
||||
|
||||
MULTI_DOC = """\
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: meet-config
|
||||
namespace: lasuite
|
||||
data:
|
||||
FOO: bar
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: meet-backend
|
||||
namespace: lasuite
|
||||
spec:
|
||||
replicas: 1
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Namespace
|
||||
metadata:
|
||||
name: lasuite
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ConfigMap
|
||||
metadata:
|
||||
name: pingora-config
|
||||
namespace: ingress
|
||||
data:
|
||||
config.toml: |
|
||||
hello
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
metadata:
|
||||
name: pingora
|
||||
namespace: ingress
|
||||
spec:
|
||||
replicas: 1
|
||||
"""
|
||||
|
||||
|
||||
class TestFilterByNamespace(unittest.TestCase):
|
||||
|
||||
def test_keeps_matching_namespace(self):
|
||||
result = _filter_by_namespace(MULTI_DOC, "lasuite")
|
||||
self.assertIn("name: meet-config", result)
|
||||
self.assertIn("name: meet-backend", result)
|
||||
|
||||
def test_excludes_other_namespaces(self):
|
||||
result = _filter_by_namespace(MULTI_DOC, "lasuite")
|
||||
self.assertNotIn("namespace: ingress", result)
|
||||
self.assertNotIn("name: pingora-config", result)
|
||||
self.assertNotIn("name: pingora", result)
|
||||
|
||||
def test_includes_namespace_resource_itself(self):
|
||||
result = _filter_by_namespace(MULTI_DOC, "lasuite")
|
||||
self.assertIn("kind: Namespace", result)
|
||||
|
||||
def test_ingress_filter(self):
|
||||
result = _filter_by_namespace(MULTI_DOC, "ingress")
|
||||
self.assertIn("name: pingora-config", result)
|
||||
self.assertIn("name: pingora", result)
|
||||
self.assertNotIn("namespace: lasuite", result)
|
||||
|
||||
def test_unknown_namespace_returns_empty(self):
|
||||
result = _filter_by_namespace(MULTI_DOC, "nonexistent")
|
||||
self.assertEqual(result.strip(), "")
|
||||
|
||||
def test_empty_input_returns_empty(self):
|
||||
result = _filter_by_namespace("", "lasuite")
|
||||
self.assertEqual(result.strip(), "")
|
||||
|
||||
def test_result_is_valid_multidoc_yaml(self):
|
||||
# Each non-empty doc in the result should start with '---'
|
||||
result = _filter_by_namespace(MULTI_DOC, "lasuite")
|
||||
self.assertTrue(result.startswith("---"))
|
||||
|
||||
def test_does_not_include_namespace_resource_for_wrong_ns(self):
|
||||
# The lasuite Namespace CR should NOT appear in an ingress-filtered result
|
||||
result = _filter_by_namespace(MULTI_DOC, "ingress")
|
||||
# There's no ingress Namespace CR in the fixture, so kind: Namespace should be absent
|
||||
self.assertNotIn("kind: Namespace", result)
|
||||
|
||||
def test_single_doc_matching(self):
|
||||
doc = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: x\n namespace: ory\n"
|
||||
result = _filter_by_namespace(doc, "ory")
|
||||
self.assertIn("name: x", result)
|
||||
|
||||
def test_single_doc_not_matching(self):
|
||||
doc = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: x\n namespace: ory\n"
|
||||
result = _filter_by_namespace(doc, "lasuite")
|
||||
self.assertEqual(result.strip(), "")
|
||||
Reference in New Issue
Block a user