diff --git a/sunbeam/cli.py b/sunbeam/cli.py index 8b75461..e6d94dc 100644 --- a/sunbeam/cli.py +++ b/sunbeam/cli.py @@ -3,11 +3,33 @@ import argparse import sys +ENV_CONTEXTS = { + "local": "sunbeam", + "production": "production", +} + + def main() -> None: parser = argparse.ArgumentParser( prog="sunbeam", description="Sunbeam local dev stack manager", ) + parser.add_argument( + "--env", choices=["local", "production"], default="local", + help="Target environment (default: local)", + ) + parser.add_argument( + "--context", default=None, + help="kubectl context override (default: sunbeam for local, default for production)", + ) + parser.add_argument( + "--domain", default="", + help="Domain suffix for production deploys (e.g. sunbeam.pt)", + ) + parser.add_argument( + "--email", default="", + help="ACME email for cert-manager (e.g. ops@sunbeam.pt)", + ) sub = parser.add_subparsers(dest="verb", metavar="verb") # sunbeam up @@ -21,8 +43,12 @@ def main() -> None: p_status.add_argument("target", nargs="?", default=None, help="namespace or namespace/name") - # sunbeam apply - sub.add_parser("apply", help="kustomize build + domain subst + kubectl apply") + # sunbeam apply [namespace] + p_apply = sub.add_parser("apply", help="kustomize build + domain subst + kubectl apply") + p_apply.add_argument("namespace", nargs="?", default="", + help="Limit apply to one namespace (e.g. lasuite, ingress, ory)") + p_apply.add_argument("--domain", default="", help="Domain suffix (e.g. sunbeam.pt)") + p_apply.add_argument("--email", default="", help="ACME email for cert-manager") # sunbeam seed sub.add_parser("seed", help="Generate/store all credentials in OpenBao") @@ -48,10 +74,16 @@ def main() -> None: p_restart.add_argument("target", nargs="?", default=None, help="namespace or namespace/name") - # sunbeam build - p_build = sub.add_parser("build", help="Build and push an artifact") - p_build.add_argument("what", choices=["proxy", "integration", "kratos-admin"], - help="What to build (proxy, integration, kratos-admin)") + # sunbeam build [--push] [--deploy] + p_build = sub.add_parser("build", help="Build an artifact (add --push to push, --deploy to apply+rollout)") + p_build.add_argument("what", + choices=["proxy", "integration", "kratos-admin", "meet", + "docs-frontend", "people-frontend"], + help="What to build") + p_build.add_argument("--push", action="store_true", + help="Push image to registry after building") + p_build.add_argument("--deploy", action="store_true", + help="Apply manifests and rollout restart after pushing (implies --push)") # sunbeam check [ns[/name]] p_check = sub.add_parser("check", help="Functional service health checks") @@ -101,8 +133,26 @@ def main() -> None: p_user_enable = user_sub.add_parser("enable", help="Re-enable a disabled identity") p_user_enable.add_argument("target", help="Email or identity ID") + p_user_set_pw = user_sub.add_parser("set-password", help="Set password for an identity") + p_user_set_pw.add_argument("target", help="Email or identity ID") + p_user_set_pw.add_argument("password", help="New password") + args = parser.parse_args() + # Set kubectl context before any kube calls. + # For production, also register the SSH host so the tunnel is opened on demand. + # SUNBEAM_SSH_HOST env var: e.g. "user@server.example.com" or just "server.example.com" + import os + from sunbeam.kube import set_context + ctx = args.context or ENV_CONTEXTS.get(args.env, "sunbeam") + ssh_host = "" + if args.env == "production": + ssh_host = os.environ.get("SUNBEAM_SSH_HOST", "") + if not ssh_host: + from sunbeam.output import die + die("SUNBEAM_SSH_HOST must be set for --env production (e.g. user@your-server.example.com)") + set_context(ctx, ssh_host=ssh_host) + if args.verb is None: parser.print_help() sys.exit(0) @@ -122,7 +172,11 @@ def main() -> None: elif args.verb == "apply": from sunbeam.manifests import cmd_apply - cmd_apply() + # --domain/--email can appear before OR after the verb; subparser wins if both set. + domain = getattr(args, "domain", "") or "" + email = getattr(args, "email", "") or "" + namespace = getattr(args, "namespace", "") or "" + cmd_apply(env=args.env, domain=domain, email=email, namespace=namespace) elif args.verb == "seed": from sunbeam.secrets import cmd_seed @@ -146,7 +200,8 @@ def main() -> None: elif args.verb == "build": from sunbeam.images import cmd_build - cmd_build(args.what) + push = args.push or args.deploy + cmd_build(args.what, push=push, deploy=args.deploy) elif args.verb == "check": from sunbeam.checks import cmd_check @@ -171,7 +226,8 @@ def main() -> None: elif args.verb == "user": from sunbeam.users import (cmd_user_list, cmd_user_get, cmd_user_create, cmd_user_delete, cmd_user_recover, - cmd_user_disable, cmd_user_enable) + cmd_user_disable, cmd_user_enable, + cmd_user_set_password) action = getattr(args, "user_action", None) if action is None: p_user.print_help() @@ -190,6 +246,8 @@ def main() -> None: cmd_user_disable(args.target) elif action == "enable": cmd_user_enable(args.target) + elif action == "set-password": + cmd_user_set_password(args.target, args.password) else: parser.print_help() diff --git a/sunbeam/manifests.py b/sunbeam/manifests.py index 5d8ca66..983940d 100644 --- a/sunbeam/manifests.py +++ b/sunbeam/manifests.py @@ -1,23 +1,27 @@ """Manifest build + apply — kustomize overlay with domain substitution.""" +import time from pathlib import Path -from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, kustomize_build, get_lima_ip +from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, kustomize_build, get_lima_ip, get_domain from sunbeam.output import step, ok, warn REPO_ROOT = Path(__file__).parents[2] / "infrastructure" -MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "ory", "storage", - "vault-secrets-operator"] +MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "monitoring", "ory", + "storage", "vault-secrets-operator"] -def pre_apply_cleanup(): +def pre_apply_cleanup(namespaces=None): """Delete immutable resources that must be re-created on each apply. Also prunes VaultStaticSecrets that share a name with a VaultDynamicSecret -- kubectl apply doesn't delete the old resource when a manifest switches kinds, and VSO refuses to overwrite a secret owned by a different resource type. + + namespaces: if given, only clean those namespaces; otherwise clean all MANAGED_NS. """ + ns_list = namespaces if namespaces is not None else MANAGED_NS ok("Cleaning up immutable Jobs and test Pods...") - for ns in MANAGED_NS: + for ns in ns_list: kube("delete", "jobs", "--all", "-n", ns, "--ignore-not-found", check=False) # Query all pods (no phase filter) — CrashLoopBackOff pods report phase=Running # so filtering on phase!=Running would silently skip them. @@ -32,7 +36,7 @@ def pre_apply_cleanup(): # the old VSS; it just creates the new VDS alongside it. VSO then errors # "not the owner" because the K8s secret's ownerRef still points to the VSS. ok("Pruning stale VaultStaticSecrets superseded by VaultDynamicSecrets...") - for ns in MANAGED_NS: + for ns in ns_list: vss_names = set(kube_out( "get", "vaultstaticsecret", "-n", ns, "-o=jsonpath={.items[*].metadata.name}", "--ignore-not-found", @@ -87,14 +91,120 @@ def _restart_for_changed_configmaps(before: dict, after: dict): kube("rollout", "restart", f"deployment/{dep}", "-n", ns, check=False) -def cmd_apply(): - """Get Lima IP, build domain, kustomize_build, kube_apply.""" - ip = get_lima_ip() - domain = f"{ip}.sslip.io" - step(f"Applying manifests (domain: {domain})...") - pre_apply_cleanup() +def _wait_for_webhook(ns: str, svc: str, timeout: int = 120) -> bool: + """Poll until a webhook service endpoint exists (signals webhook is ready). + + Returns True if the webhook is ready within timeout seconds. + """ + ok(f"Waiting for {ns}/{svc} webhook (up to {timeout}s)...") + deadline = time.time() + timeout + while time.time() < deadline: + eps = kube_out("get", "endpoints", svc, "-n", ns, + "-o=jsonpath={.subsets[0].addresses[0].ip}", "--ignore-not-found") + if eps: + ok(f" {ns}/{svc} ready.") + return True + time.sleep(3) + warn(f" {ns}/{svc} not ready after {timeout}s — continuing anyway.") + return False + + +def _apply_mkcert_ca_configmap(): + """Create/update gitea-mkcert-ca ConfigMap from the local mkcert root CA. + + Only called in local env. The ConfigMap is mounted into Gitea so Go's TLS + stack trusts the mkcert wildcard cert when making server-side OIDC calls. + """ + import subprocess, json + from pathlib import Path + caroot = subprocess.run(["mkcert", "-CAROOT"], capture_output=True, text=True).stdout.strip() + if not caroot: + warn("mkcert not found — skipping gitea-mkcert-ca ConfigMap.") + return + ca_pem = Path(caroot) / "rootCA.pem" + if not ca_pem.exists(): + warn(f"mkcert root CA not found at {ca_pem} — skipping.") + return + cm = json.dumps({ + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "gitea-mkcert-ca", "namespace": "devtools"}, + "data": {"ca.crt": ca_pem.read_text()}, + }) + kube("apply", "--server-side", "-f", "-", input=cm) + ok("gitea-mkcert-ca ConfigMap applied.") + + +def _filter_by_namespace(manifests: str, namespace: str) -> str: + """Return only the YAML documents that belong to the given namespace. + + Also includes the Namespace resource itself (safe to re-apply). + Uses simple string matching — namespace always appears as 'namespace: ' + in top-level metadata, so this is reliable without a full YAML parser. + """ + kept = [] + for doc in manifests.split("\n---"): + doc = doc.strip() + if not doc: + continue + if f"namespace: {namespace}" in doc: + kept.append(doc) + elif "kind: Namespace" in doc and f"name: {namespace}" in doc: + kept.append(doc) + if not kept: + return "" + return "---\n" + "\n---\n".join(kept) + "\n" + + +def cmd_apply(env: str = "local", domain: str = "", email: str = "", namespace: str = ""): + """Build kustomize overlay for env, substitute domain/email, kubectl apply. + + Runs a second convergence pass if cert-manager is present in the overlay — + cert-manager registers a ValidatingWebhook that must be running before + ClusterIssuer / Certificate resources can be created. + """ + if env == "production": + if not domain: + # Try to discover domain from running cluster + domain = get_domain() + if not domain: + from sunbeam.output import die + die("--domain is required for production apply on first deploy") + overlay = REPO_ROOT / "overlays" / "production" + else: + ip = get_lima_ip() + domain = f"{ip}.sslip.io" + overlay = REPO_ROOT / "overlays" / "local" + + scope = f" [{namespace}]" if namespace else "" + step(f"Applying manifests (env: {env}, domain: {domain}){scope}...") + if env == "local": + _apply_mkcert_ca_configmap() + ns_list = [namespace] if namespace else None + pre_apply_cleanup(namespaces=ns_list) before = _snapshot_configmaps() - manifests = kustomize_build(REPO_ROOT / "overlays" / "local", domain) - kube("apply", "--server-side", "--force-conflicts", "-f", "-", input=manifests) + manifests = kustomize_build(overlay, domain, email=email) + + if namespace: + manifests = _filter_by_namespace(manifests, namespace) + if not manifests.strip(): + warn(f"No resources found for namespace '{namespace}' — check the name and try again.") + return + + # First pass: may emit errors for resources that depend on webhooks not yet running + # (e.g. cert-manager ClusterIssuer/Certificate), which is expected on first deploy. + kube("apply", "--server-side", "--force-conflicts", "-f", "-", + input=manifests, check=False) + + # If cert-manager is in the overlay, wait for its webhook then re-apply + # so that ClusterIssuer and Certificate resources converge. + # Skip for partial applies unless the target IS cert-manager. + cert_manager_present = (overlay / "../../base/cert-manager").resolve().exists() + if cert_manager_present and not namespace: + if _wait_for_webhook("cert-manager", "cert-manager-webhook", timeout=120): + ok("Running convergence pass for cert-manager resources...") + manifests2 = kustomize_build(overlay, domain, email=email) + kube("apply", "--server-side", "--force-conflicts", "-f", "-", input=manifests2) + _restart_for_changed_configmaps(before, _snapshot_configmaps()) ok("Applied.") diff --git a/sunbeam/tests/test_cli.py b/sunbeam/tests/test_cli.py index b714081..5a42e1c 100644 --- a/sunbeam/tests/test_cli.py +++ b/sunbeam/tests/test_cli.py @@ -16,7 +16,10 @@ class TestArgParsing(unittest.TestCase): sub.add_parser("down") p_status = sub.add_parser("status") p_status.add_argument("target", nargs="?", default=None) - sub.add_parser("apply") + p_apply = sub.add_parser("apply") + p_apply.add_argument("namespace", nargs="?", default="") + p_apply.add_argument("--domain", default="") + p_apply.add_argument("--email", default="") sub.add_parser("seed") sub.add_parser("verify") p_logs = sub.add_parser("logs") @@ -28,11 +31,35 @@ class TestArgParsing(unittest.TestCase): p_restart = sub.add_parser("restart") p_restart.add_argument("target", nargs="?", default=None) p_build = sub.add_parser("build") - p_build.add_argument("what", choices=["proxy"]) + p_build.add_argument("what", choices=["proxy", "integration", "kratos-admin", "meet", + "docs-frontend", "people-frontend"]) + p_build.add_argument("--push", action="store_true") + p_build.add_argument("--deploy", action="store_true") sub.add_parser("mirror") sub.add_parser("bootstrap") p_check = sub.add_parser("check") p_check.add_argument("target", nargs="?", default=None) + p_user = sub.add_parser("user") + user_sub = p_user.add_subparsers(dest="user_action") + p_user_list = user_sub.add_parser("list") + p_user_list.add_argument("--search", default="") + p_user_get = user_sub.add_parser("get") + p_user_get.add_argument("target") + p_user_create = user_sub.add_parser("create") + p_user_create.add_argument("email") + p_user_create.add_argument("--name", default="") + p_user_create.add_argument("--schema", default="default") + p_user_delete = user_sub.add_parser("delete") + p_user_delete.add_argument("target") + p_user_recover = user_sub.add_parser("recover") + p_user_recover.add_argument("target") + p_user_disable = user_sub.add_parser("disable") + p_user_disable.add_argument("target") + p_user_enable = user_sub.add_parser("enable") + p_user_enable.add_argument("target") + p_user_set_pw = user_sub.add_parser("set-password") + p_user_set_pw.add_argument("target") + p_user_set_pw.add_argument("password") return parser.parse_args(argv) def test_up(self): @@ -66,11 +93,55 @@ class TestArgParsing(unittest.TestCase): def test_build_proxy(self): args = self._parse(["build", "proxy"]) self.assertEqual(args.what, "proxy") + self.assertFalse(args.push) + self.assertFalse(args.deploy) + + def test_build_integration(self): + args = self._parse(["build", "integration"]) + self.assertEqual(args.what, "integration") + + def test_build_push_flag(self): + args = self._parse(["build", "proxy", "--push"]) + self.assertTrue(args.push) + self.assertFalse(args.deploy) + + def test_build_deploy_flag(self): + args = self._parse(["build", "proxy", "--deploy"]) + self.assertFalse(args.push) + self.assertTrue(args.deploy) def test_build_invalid_target(self): with self.assertRaises(SystemExit): self._parse(["build", "notavalidtarget"]) + def test_user_set_password(self): + args = self._parse(["user", "set-password", "admin@example.com", "hunter2"]) + self.assertEqual(args.verb, "user") + self.assertEqual(args.user_action, "set-password") + self.assertEqual(args.target, "admin@example.com") + self.assertEqual(args.password, "hunter2") + + def test_user_disable(self): + args = self._parse(["user", "disable", "admin@example.com"]) + self.assertEqual(args.user_action, "disable") + self.assertEqual(args.target, "admin@example.com") + + def test_user_enable(self): + args = self._parse(["user", "enable", "admin@example.com"]) + self.assertEqual(args.user_action, "enable") + self.assertEqual(args.target, "admin@example.com") + + def test_user_list_search(self): + args = self._parse(["user", "list", "--search", "sienna"]) + self.assertEqual(args.user_action, "list") + self.assertEqual(args.search, "sienna") + + def test_user_create(self): + args = self._parse(["user", "create", "x@example.com", "--name", "X Y"]) + self.assertEqual(args.user_action, "create") + self.assertEqual(args.email, "x@example.com") + self.assertEqual(args.name, "X Y") + def test_get_with_target(self): args = self._parse(["get", "ory/kratos-abc"]) self.assertEqual(args.verb, "get") @@ -100,6 +171,24 @@ class TestArgParsing(unittest.TestCase): self.assertEqual(args.verb, "check") self.assertEqual(args.target, "lasuite/people") + def test_apply_no_namespace(self): + args = self._parse(["apply"]) + self.assertEqual(args.verb, "apply") + self.assertEqual(args.namespace, "") + + def test_apply_with_namespace(self): + args = self._parse(["apply", "lasuite"]) + self.assertEqual(args.verb, "apply") + self.assertEqual(args.namespace, "lasuite") + + def test_apply_ingress_namespace(self): + args = self._parse(["apply", "ingress"]) + self.assertEqual(args.namespace, "ingress") + + def test_build_meet(self): + args = self._parse(["build", "meet"]) + self.assertEqual(args.what, "meet") + def test_no_args_verb_is_none(self): args = self._parse([]) self.assertIsNone(args.verb) @@ -205,7 +294,122 @@ class TestCliDispatch(unittest.TestCase): cli_mod.main() except SystemExit: pass - mock_build.assert_called_once_with("proxy") + mock_build.assert_called_once_with("proxy", push=False, deploy=False) + + def test_build_with_push_flag(self): + mock_build = MagicMock() + with patch.object(sys, "argv", ["sunbeam", "build", "integration", "--push"]): + with patch.dict("sys.modules", {"sunbeam.images": MagicMock(cmd_build=mock_build)}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_build.assert_called_once_with("integration", push=True, deploy=False) + + def test_build_with_deploy_flag_implies_push(self): + mock_build = MagicMock() + with patch.object(sys, "argv", ["sunbeam", "build", "proxy", "--deploy"]): + with patch.dict("sys.modules", {"sunbeam.images": MagicMock(cmd_build=mock_build)}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_build.assert_called_once_with("proxy", push=True, deploy=True) + + def test_user_set_password_dispatches(self): + mock_set_pw = MagicMock() + mock_users = MagicMock( + cmd_user_list=MagicMock(), cmd_user_get=MagicMock(), + cmd_user_create=MagicMock(), cmd_user_delete=MagicMock(), + cmd_user_recover=MagicMock(), cmd_user_disable=MagicMock(), + cmd_user_enable=MagicMock(), cmd_user_set_password=mock_set_pw, + ) + with patch.object(sys, "argv", ["sunbeam", "user", "set-password", + "admin@sunbeam.pt", "s3cr3t"]): + with patch.dict("sys.modules", {"sunbeam.users": mock_users}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_set_pw.assert_called_once_with("admin@sunbeam.pt", "s3cr3t") + + def test_user_disable_dispatches(self): + mock_disable = MagicMock() + mock_users = MagicMock( + cmd_user_list=MagicMock(), cmd_user_get=MagicMock(), + cmd_user_create=MagicMock(), cmd_user_delete=MagicMock(), + cmd_user_recover=MagicMock(), cmd_user_disable=mock_disable, + cmd_user_enable=MagicMock(), cmd_user_set_password=MagicMock(), + ) + with patch.object(sys, "argv", ["sunbeam", "user", "disable", "x@sunbeam.pt"]): + with patch.dict("sys.modules", {"sunbeam.users": mock_users}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_disable.assert_called_once_with("x@sunbeam.pt") + + def test_user_enable_dispatches(self): + mock_enable = MagicMock() + mock_users = MagicMock( + cmd_user_list=MagicMock(), cmd_user_get=MagicMock(), + cmd_user_create=MagicMock(), cmd_user_delete=MagicMock(), + cmd_user_recover=MagicMock(), cmd_user_disable=MagicMock(), + cmd_user_enable=mock_enable, cmd_user_set_password=MagicMock(), + ) + with patch.object(sys, "argv", ["sunbeam", "user", "enable", "x@sunbeam.pt"]): + with patch.dict("sys.modules", {"sunbeam.users": mock_users}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_enable.assert_called_once_with("x@sunbeam.pt") + + def test_apply_full_dispatches_without_namespace(self): + mock_apply = MagicMock() + with patch.object(sys, "argv", ["sunbeam", "apply"]): + with patch.dict("sys.modules", {"sunbeam.manifests": MagicMock(cmd_apply=mock_apply)}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_apply.assert_called_once_with(env="local", domain="", email="", namespace="") + + def test_apply_partial_passes_namespace(self): + mock_apply = MagicMock() + with patch.object(sys, "argv", ["sunbeam", "apply", "lasuite"]): + with patch.dict("sys.modules", {"sunbeam.manifests": MagicMock(cmd_apply=mock_apply)}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_apply.assert_called_once_with(env="local", domain="", email="", namespace="lasuite") + + def test_build_meet_dispatches(self): + mock_build = MagicMock() + with patch.object(sys, "argv", ["sunbeam", "build", "meet"]): + with patch.dict("sys.modules", {"sunbeam.images": MagicMock(cmd_build=mock_build)}): + import importlib, sunbeam.cli as cli_mod + importlib.reload(cli_mod) + try: + cli_mod.main() + except SystemExit: + pass + mock_build.assert_called_once_with("meet", push=False, deploy=False) def test_check_no_target(self): mock_check = MagicMock() diff --git a/sunbeam/tests/test_manifests.py b/sunbeam/tests/test_manifests.py new file mode 100644 index 0000000..0048ade --- /dev/null +++ b/sunbeam/tests/test_manifests.py @@ -0,0 +1,99 @@ +"""Tests for manifests.py — primarily _filter_by_namespace.""" +import unittest + +from sunbeam.manifests import _filter_by_namespace + + +MULTI_DOC = """\ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: meet-config + namespace: lasuite +data: + FOO: bar +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: meet-backend + namespace: lasuite +spec: + replicas: 1 +--- +apiVersion: v1 +kind: Namespace +metadata: + name: lasuite +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: pingora-config + namespace: ingress +data: + config.toml: | + hello +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: pingora + namespace: ingress +spec: + replicas: 1 +""" + + +class TestFilterByNamespace(unittest.TestCase): + + def test_keeps_matching_namespace(self): + result = _filter_by_namespace(MULTI_DOC, "lasuite") + self.assertIn("name: meet-config", result) + self.assertIn("name: meet-backend", result) + + def test_excludes_other_namespaces(self): + result = _filter_by_namespace(MULTI_DOC, "lasuite") + self.assertNotIn("namespace: ingress", result) + self.assertNotIn("name: pingora-config", result) + self.assertNotIn("name: pingora", result) + + def test_includes_namespace_resource_itself(self): + result = _filter_by_namespace(MULTI_DOC, "lasuite") + self.assertIn("kind: Namespace", result) + + def test_ingress_filter(self): + result = _filter_by_namespace(MULTI_DOC, "ingress") + self.assertIn("name: pingora-config", result) + self.assertIn("name: pingora", result) + self.assertNotIn("namespace: lasuite", result) + + def test_unknown_namespace_returns_empty(self): + result = _filter_by_namespace(MULTI_DOC, "nonexistent") + self.assertEqual(result.strip(), "") + + def test_empty_input_returns_empty(self): + result = _filter_by_namespace("", "lasuite") + self.assertEqual(result.strip(), "") + + def test_result_is_valid_multidoc_yaml(self): + # Each non-empty doc in the result should start with '---' + result = _filter_by_namespace(MULTI_DOC, "lasuite") + self.assertTrue(result.startswith("---")) + + def test_does_not_include_namespace_resource_for_wrong_ns(self): + # The lasuite Namespace CR should NOT appear in an ingress-filtered result + result = _filter_by_namespace(MULTI_DOC, "ingress") + # There's no ingress Namespace CR in the fixture, so kind: Namespace should be absent + self.assertNotIn("kind: Namespace", result) + + def test_single_doc_matching(self): + doc = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: x\n namespace: ory\n" + result = _filter_by_namespace(doc, "ory") + self.assertIn("name: x", result) + + def test_single_doc_not_matching(self): + doc = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: x\n namespace: ory\n" + result = _filter_by_namespace(doc, "lasuite") + self.assertEqual(result.strip(), "")