#!/usr/bin/env python3 """ sunbeam.py — Sunbeam local dev stack lifecycle manager. Idempotent: safe to run from any state (fresh Mac, existing VM, partial deploy). Consolidates local-up.sh + local-seed-secrets.sh into one place. Usage: ./scripts/sunbeam.py # full stack bring-up ./scripts/sunbeam.py --apply # re-apply manifests + mirror images ./scripts/sunbeam.py --seed # re-seed secrets only ./scripts/sunbeam.py --gitea # bootstrap Gitea orgs + mirror amd64 images ./scripts/sunbeam.py --restart # restart services only ./scripts/sunbeam.py --status # show pod health across all namespaces ./scripts/sunbeam.py --verify # E2E test VSO → OpenBao integration ./scripts/sunbeam.py --build # Build + push sunbeam-proxy to Gitea; redeploy Requires: limactl mkcert kubectl kustomize linkerd jq yq """ import argparse import base64 import json import os import secrets as _secrets import shutil import subprocess import sys import time from pathlib import Path # ── Paths ───────────────────────────────────────────────────────────────────── SCRIPT_DIR = Path(__file__).parent.resolve() REPO_ROOT = SCRIPT_DIR.parent SECRETS_DIR = REPO_ROOT / "secrets" / "local" PROXY_DIR = REPO_ROOT.parent / "proxy" # ── Config ──────────────────────────────────────────────────────────────────── LIMA_VM = "sunbeam" K8S_CTX = ["--context=sunbeam"] # Gitea admin username (not secret; password is generated and stored in OpenBao) GITEA_ADMIN_USER = "gitea_admin" GITEA_ADMIN_EMAIL = "gitea@local.domain" # Images that only ship linux/amd64 builds — patched + mirrored to our Gitea registry. # Rosetta runs the amd64 binaries on arm64, but the CRI refuses to pull arm64-absent images. 
# Format: (source_ref, gitea_org, gitea_repo, tag)
AMD64_ONLY_IMAGES = [
    ("docker.io/lasuite/people-backend:latest", "studio", "people-backend", "latest"),
    ("docker.io/lasuite/people-frontend:latest", "studio", "people-frontend", "latest"),
]

REQUIRED_TOOLS = ["limactl", "mkcert", "kubectl", "kustomize", "linkerd", "jq", "yq"]

# Postgres roles ensured at seed time; each owns a matching "<name>_db" database
# (see db_map in seed_secrets for the exceptions-free mapping).
PG_USERS = [
    "kratos", "hydra", "gitea", "hive", "docs", "meet", "drive",
    "messages", "conversations", "people", "find",
]

# (namespace, deployment) pairs bounced by --restart.
SERVICES_TO_RESTART = [
    ("ory", "hydra"),
    ("ory", "kratos"),
    ("ory", "login-ui"),
    ("devtools", "gitea"),
    ("storage", "seaweedfs-filer"),
    ("lasuite", "hive"),
    ("lasuite", "people-backend"),
    ("lasuite", "people-frontend"),
    ("lasuite", "people-celery-worker"),
    ("lasuite", "people-celery-beat"),
    ("media", "livekit-server"),
]

# ── Output ────────────────────────────────────────────────────────────────────

def step(msg):
    """Print a top-level progress heading."""
    print(f"\n==> {msg}", flush=True)

def ok(msg):
    """Print an indented info/success line."""
    print(f" {msg}", flush=True)

def warn(msg):
    """Print a non-fatal warning to stderr."""
    print(f" WARN: {msg}", file=sys.stderr, flush=True)

def die(msg):
    """Print a fatal error to stderr and exit with status 1."""
    print(f"\nERROR: {msg}", file=sys.stderr)
    sys.exit(1)

# ── Subprocess helpers ────────────────────────────────────────────────────────

def run(cmd, *, check=True, input=None, capture=False, cwd=None):
    """subprocess.run wrapper: text mode unless stdin is raw bytes."""
    text = not isinstance(input, bytes)
    return subprocess.run(cmd, check=check, text=text, input=input, capture_output=capture, cwd=cwd)

def capture_out(cmd, *, default=""):
    """Return stripped stdout of cmd, or `default` when it exits non-zero."""
    r = subprocess.run(cmd, capture_output=True, text=True)
    return r.stdout.strip() if r.returncode == 0 else default

def succeeds(cmd):
    """True iff cmd exits 0 (output discarded)."""
    return subprocess.run(cmd, capture_output=True).returncode == 0

# ── kubectl wrappers ──────────────────────────────────────────────────────────

def kube(*args, input=None, check=True):
    """kubectl against the sunbeam context; output goes to the terminal."""
    return run(["kubectl", *K8S_CTX, *args], input=input, check=check)

def kube_out(*args):
    """kubectl against the sunbeam context; returns stdout ('' on failure)."""
    return capture_out(["kubectl", *K8S_CTX, *args])

def kube_ok(*args):
    """True iff the kubectl command exits 0."""
    return succeeds(["kubectl", *K8S_CTX, *args])

def kube_apply(manifest, *, server_side=True):
    """Apply a manifest string from stdin (server-side by default)."""
    args = ["apply", "-f", "-"]
    if server_side:
        args += ["--server-side", "--force-conflicts"]
    kube(*args, input=manifest)

def ns_exists(ns):
    return kube_ok("get", "namespace", ns)

def ensure_ns(ns):
    # dry-run=client renders a Namespace manifest even when it already exists,
    # so apply is a no-op on re-runs.
    manifest = kube_out("create", "namespace", ns, "--dry-run=client", "-o=yaml")
    if manifest:
        kube_apply(manifest)

def create_secret(ns, name, **literals):
    """Create or update a generic secret idempotently.

    Uses --field-manager=sunbeam so kustomize apply (manager=kubectl) never
    overwrites data fields written by this function, even when the kustomize
    output includes a placeholder Secret with the same name.
    """
    args = ["create", "secret", "generic", name, f"-n={ns}"]
    for k, v in literals.items():
        args.append(f"--from-literal={k}={v}")
    args += ["--dry-run=client", "-o=yaml"]
    manifest = kube_out(*args)
    if manifest:
        kube("apply", "--server-side", "--force-conflicts", "--field-manager=sunbeam", "-f", "-", input=manifest)

# ── 1. Prerequisites ──────────────────────────────────────────────────────────

def check_prerequisites():
    """Die early with a brew one-liner if any required CLI tool is missing."""
    step("Checking prerequisites...")
    missing = [t for t in REQUIRED_TOOLS if not shutil.which(t)]
    if missing:
        die(f"missing tools: {', '.join(missing)}\nInstall: brew install {' '.join(missing)}")
    ok("All tools present.")

# ── 2. Lima VM ────────────────────────────────────────────────────────────────
def ensure_lima_vm():
    """Bring the 'sunbeam' Lima VM up: create it, start it, or leave it alone."""
    step("Lima VM...")
    state = _lima_status()
    if state == "Running":
        ok("Already running.")
    elif state == "none":
        ok("Creating 'sunbeam' (k3s 6 CPU / 12 GB / 60 GB)...")
        run([
            "limactl", "start", "--name=sunbeam", "template:k3s",
            "--memory=12", "--cpus=6", "--disk=60",
            "--vm-type=vz", "--mount-type=virtiofs", "--rosetta",
        ])
    else:
        # Exists but stopped/paused/broken — limactl start resumes it.
        ok(f"Starting (current status: {state})...")
        run(["limactl", "start", LIMA_VM])

def _lima_status():
    """Best-effort status of the sunbeam VM ('Running', 'Stopped', …) or 'none'.

    Handles both output shapes `limactl list --json` has used over time:
    a single JSON array, and newline-delimited JSON objects (NDJSON).
    """
    raw = capture_out(["limactl", "list", "--json"])
    if not raw:
        return "none"
    try:
        parsed = json.loads(raw)
        records = parsed if isinstance(parsed, list) else [parsed]
    except json.JSONDecodeError:
        # NDJSON fallback: decode line by line, skipping anything malformed.
        records = []
        for chunk in raw.splitlines():
            chunk = chunk.strip()
            if not chunk:
                continue
            try:
                records.append(json.loads(chunk))
            except json.JSONDecodeError:
                pass
    for rec in records:
        if rec.get("name") == LIMA_VM:
            return rec.get("status", "unknown")
    return "none"

# ── 3. Kubeconfig ─────────────────────────────────────────────────────────────
def merge_kubeconfig():
    """Extract certs from Lima's guest kubeconfig and register a 'sunbeam' context.

    Decodes the base64 CA/client cert/key via yq into a temp dir, then uses
    `kubectl config set-*` so the entries land in the user's default kubeconfig
    (idempotent — set-* overwrites existing entries of the same name).
    """
    step("Merging kubeconfig...")
    lima_kube = Path.home() / f".lima/{LIMA_VM}/copied-from-guest/kubeconfig.yaml"
    if not lima_kube.exists():
        die(f"Lima kubeconfig not found: {lima_kube}")
    tmp = Path("/tmp/sunbeam-kube")
    tmp.mkdir(exist_ok=True)
    try:
        for query, filename in [
            (".clusters[0].cluster.certificate-authority-data", "ca.crt"),
            (".users[0].user.client-certificate-data", "client.crt"),
            (".users[0].user.client-key-data", "client.key"),
        ]:
            b64 = capture_out(["yq", query, str(lima_kube)])
            (tmp / filename).write_bytes(base64.b64decode(b64))
        # 127.0.0.1:6443 is Lima's forwarded k3s API port on the host.
        run(["kubectl", "config", "set-cluster", LIMA_VM, "--server=https://127.0.0.1:6443",
             f"--certificate-authority={tmp}/ca.crt", "--embed-certs=true"])
        run(["kubectl", "config", "set-credentials", f"{LIMA_VM}-admin",
             f"--client-certificate={tmp}/client.crt", f"--client-key={tmp}/client.key",
             "--embed-certs=true"])
        run(["kubectl", "config", "set-context", LIMA_VM,
             f"--cluster={LIMA_VM}", f"--user={LIMA_VM}-admin"])
    finally:
        # --embed-certs copied the material into kubeconfig; the temp files can go.
        shutil.rmtree(tmp, ignore_errors=True)
    ok("Context 'sunbeam' ready.")

# ── 4. Traefik ────────────────────────────────────────────────────────────────

def disable_traefik():
    """Remove k3s's bundled Traefik (replaced by Pingora) and keep it removed."""
    step("Traefik...")
    if kube_ok("get", "helmchart", "traefik", "-n", "kube-system"):
        ok("Removing (replaced by Pingora)...")
        kube("delete", "helmchart", "traefik", "traefik-crd", "-n", "kube-system", check=False)
        # Delete the manifest k3s would otherwise re-apply on restart.
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "rm", "-f",
             "/var/lib/rancher/k3s/server/manifests/traefik.yaml"],
            capture_output=True,
        )
        # Write k3s config so Traefik can never return after a k3s restart.
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "tee", "/etc/rancher/k3s/config.yaml"],
            input="disable:\n - traefik\n",
            text=True,
            capture_output=True,
        )
    ok("Done.")

# ── 5. cert-manager ───────────────────────────────────────────────────────────
def ensure_cert_manager():
    """Install cert-manager v1.17.0 once; namespace presence is the install marker."""
    step("cert-manager...")
    if ns_exists("cert-manager"):
        ok("Already installed.")
        return
    ok("Installing...")
    kube("apply", "-f",
         "https://github.com/cert-manager/cert-manager/releases/download/v1.17.0/cert-manager.yaml")
    for dep in ["cert-manager", "cert-manager-webhook", "cert-manager-cainjector"]:
        kube("rollout", "status", f"deployment/{dep}", "-n", "cert-manager", "--timeout=120s")
    ok("Installed.")

# ── 6. Linkerd ────────────────────────────────────────────────────────────────

def ensure_linkerd():
    """Install Gateway API CRDs + Linkerd CRDs + control plane once."""
    step("Linkerd...")
    if ns_exists("linkerd"):
        ok("Already installed.")
        return
    ok("Installing Gateway API CRDs...")
    kube("apply", "--server-side", "-f",
         "https://github.com/kubernetes-sigs/gateway-api/releases/download/v1.4.0/standard-install.yaml")
    ok("Installing Linkerd CRDs...")
    crds = capture_out(["linkerd", "install", "--crds"])
    kube_apply(crds)
    ok("Installing Linkerd control plane...")
    cp = capture_out(["linkerd", "install"])
    kube_apply(cp)
    for dep in ["linkerd-identity", "linkerd-destination", "linkerd-proxy-injector"]:
        kube("rollout", "status", f"deployment/{dep}", "-n", "linkerd", "--timeout=120s")
    ok("Installed.")

# ── 7. TLS certificate ────────────────────────────────────────────────────────

def get_lima_ip():
    """Return the VM's IPv4 on eth1, falling back to `hostname -I`.

    Dies with a clear message instead of raising IndexError when neither
    source yields an address (e.g. the VM is not running).
    """
    raw = capture_out(["limactl", "shell", LIMA_VM, "ip", "-4", "addr", "show", "eth1"])
    for line in raw.splitlines():
        if "inet " in line:
            # "inet 192.168.x.y/24 ..." → strip the CIDR suffix.
            return line.strip().split()[1].split("/")[0]
    fallback = capture_out(["limactl", "shell", LIMA_VM, "hostname", "-I"]).split()
    if not fallback:
        die("could not determine Lima VM IP — is the VM running?")
    return fallback[0]

def ensure_tls_cert():
    """Mint (once) a mkcert wildcard cert for <vm-ip>.sslip.io; return the domain."""
    step("TLS certificate...")
    ip = get_lima_ip()
    domain = f"{ip}.sslip.io"
    cert = SECRETS_DIR / "tls.crt"
    if cert.exists():
        # NOTE(review): only file existence is checked — if the VM's IP (and thus
        # the sslip.io domain) changed since the cert was minted, this returns a
        # stale cert. Delete secrets/local/tls.* to force regeneration.
        ok(f"Cert exists. Domain: {domain}")
        return domain
    ok(f"Generating wildcard cert for *.{domain}...")
    SECRETS_DIR.mkdir(parents=True, exist_ok=True)
    run(["mkcert", f"*.{domain}"], cwd=SECRETS_DIR)
    # mkcert names its outputs after the domain; normalize to tls.crt/tls.key.
    for src, dst in [
        (f"_wildcard.{domain}.pem", "tls.crt"),
        (f"_wildcard.{domain}-key.pem", "tls.key"),
    ]:
        (SECRETS_DIR / src).rename(SECRETS_DIR / dst)
    ok(f"Cert generated. Domain: {domain}")
    return domain

# ── 8. TLS secret ─────────────────────────────────────────────────────────────

def ensure_tls_secret(domain):
    """Mirror the local wildcard cert into the ingress namespace for Pingora."""
    step("TLS secret...")
    ensure_ns("ingress")
    manifest = kube_out(
        "create", "secret", "tls", "pingora-tls",
        f"--cert={SECRETS_DIR}/tls.crt", f"--key={SECRETS_DIR}/tls.key",
        "-n", "ingress", "--dry-run=client", "-o=yaml",
    )
    if manifest:
        kube_apply(manifest)
    ok("Done.")

# ── 9. Lima VM registry trust + k3s config ────────────────────────────────────

def setup_lima_vm_registry(domain, gitea_admin_pass=""):
    """Install mkcert root CA in the Lima VM and configure k3s to auth with Gitea.

    Restarts k3s if either configuration changes so pods don't fight TLS errors
    or get unauthenticated pulls on the first deploy.
    """
    step("Configuring Lima VM registry trust...")
    changed = False
    # Install mkcert root CA so containerd trusts our wildcard TLS cert
    caroot = capture_out(["mkcert", "-CAROOT"])
    if caroot:
        ca_pem = Path(caroot) / "rootCA.pem"
        if ca_pem.exists():
            already = subprocess.run(
                ["limactl", "shell", LIMA_VM, "test", "-f",
                 "/usr/local/share/ca-certificates/mkcert-root.crt"],
                capture_output=True,
            ).returncode == 0
            if not already:
                run(["limactl", "copy", str(ca_pem), f"{LIMA_VM}:/tmp/mkcert-root.pem"])
                run(["limactl", "shell", LIMA_VM, "sudo", "cp", "/tmp/mkcert-root.pem",
                     "/usr/local/share/ca-certificates/mkcert-root.crt"])
                run(["limactl", "shell", LIMA_VM, "sudo", "update-ca-certificates"])
                ok("mkcert CA installed in VM.")
                changed = True
            else:
                ok("mkcert CA already installed.")
    # Write k3s registries.yaml (auth for Gitea container registry).
    # YAML nesting must be exact — this is compared verbatim (modulo outer
    # whitespace) against the file already in the VM.
    registry_host = f"src.{domain}"
    want = (
        f'configs:\n'
        f'  "{registry_host}":\n'
        f'    auth:\n'
        f'      username: "{GITEA_ADMIN_USER}"\n'
        f'      password: "{gitea_admin_pass}"\n'
    )
    existing = capture_out(["limactl", "shell", LIMA_VM, "sudo", "cat",
                            "/etc/rancher/k3s/registries.yaml"])
    if existing.strip() != want.strip():
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "tee", "/etc/rancher/k3s/registries.yaml"],
            input=want, text=True, capture_output=True,
        )
        ok(f"Registry config written for {registry_host}.")
        changed = True
    else:
        ok("Registry config up to date.")
    if changed:
        ok("Restarting k3s to apply changes...")
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "systemctl", "restart", "k3s"],
            capture_output=True,
        )
        # Wait for API server to come back
        for _ in range(40):
            if kube_ok("get", "nodes"):
                break
            time.sleep(3)
        # Extra settle time — pods take a moment to start terminating/restarting
        time.sleep(15)
        ok("k3s restarted.")

# ── 10. Apply manifests ───────────────────────────────────────────────────────
# Namespaces this script owns; cleanup and pod sweeps iterate over these.
MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "ory", "storage", "vault-secrets-operator"]

def pre_apply_cleanup():
    """Delete immutable resources that must be re-created on each apply.

    Also prunes VaultStaticSecrets that share a name with a VaultDynamicSecret —
    kubectl apply doesn't delete the old resource when a manifest switches kinds,
    and VSO refuses to overwrite a secret owned by a different resource type.
    """
    ok("Cleaning up immutable Jobs and test Pods...")
    for ns in MANAGED_NS:
        kube("delete", "jobs", "--all", "-n", ns, "--ignore-not-found", check=False)
        # Sweep helm-test style pods that finished (phase != Running).
        pods_out = kube_out("get", "pods", "-n", ns, "--field-selector=status.phase!=Running",
                            "-o=jsonpath={.items[*].metadata.name}")
        for pod in pods_out.split():
            if pod.endswith(("-test-connection", "-server-test", "-test")):
                kube("delete", "pod", pod, "-n", ns, "--ignore-not-found", check=False)
    # Prune VaultStaticSecrets that were replaced by VaultDynamicSecrets.
    # When a manifest transitions a resource from VSS → VDS, apply won't delete
    # the old VSS; it just creates the new VDS alongside it. VSO then errors
    # "not the owner" because the K8s secret's ownerRef still points to the VSS.
    ok("Pruning stale VaultStaticSecrets superseded by VaultDynamicSecrets...")
    for ns in MANAGED_NS:
        vss_names = set(kube_out(
            "get", "vaultstaticsecret", "-n", ns,
            "-o=jsonpath={.items[*].metadata.name}", "--ignore-not-found",
        ).split())
        vds_names = set(kube_out(
            "get", "vaultdynamicsecret", "-n", ns,
            "-o=jsonpath={.items[*].metadata.name}", "--ignore-not-found",
        ).split())
        for stale in vss_names & vds_names:
            ok(f" deleting stale VaultStaticSecret {ns}/{stale}")
            kube("delete", "vaultstaticsecret", stale, "-n", ns, "--ignore-not-found", check=False)

def apply_manifests(domain):
    """kustomize-build the local overlay, substitute the domain, server-side apply."""
    step(f"Applying manifests (domain: {domain})...")
    pre_apply_cleanup()
    r = run(
        ["kustomize", "build", "--enable-helm", "overlays/local/"],
        capture=True,
        cwd=REPO_ROOT,
    )
    # DOMAIN_SUFFIX is the placeholder token used throughout the overlay.
    manifests = r.stdout.replace("DOMAIN_SUFFIX", domain)
    # Presumably scrubs "annotations: null" lines the chart rendering emits,
    # which server-side apply chokes on — verify against the overlay output.
    manifests = manifests.replace("\n annotations: null", "")
    kube("apply", "--server-side", "--force-conflicts", "-f", "-", input=manifests)
    ok("Applied.")

# ── 11. Gitea bootstrap ───────────────────────────────────────────────────────

def bootstrap_gitea(domain, gitea_admin_pass=""):
    """Ensure Gitea admin has a known password and create the studio/internal orgs."""
    step("Bootstrapping Gitea...")
    # Wait for a Running + Ready Gitea pod (more reliable than rollout status after a k3s restart)
    pod = ""
    for _ in range(60):
        candidate = kube_out(
            "-n", "devtools", "get", "pods", "-l=app.kubernetes.io/name=gitea",
            "--field-selector=status.phase=Running",
            "-o=jsonpath={.items[0].metadata.name}",
        )
        if candidate:
            ready = kube_out("-n", "devtools", "get", "pod", candidate,
                             "-o=jsonpath={.status.containerStatuses[0].ready}")
            if ready == "true":
                pod = candidate
                break
        time.sleep(3)
    if not pod:
        warn("Gitea pod not ready after 3 min — skipping bootstrap.")
        return

    def gitea_exec(*args):
        # Run a command inside the gitea container; output captured for parsing.
        return subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "devtools", "exec", pod, "-c", "gitea", "--"] + list(args),
            capture_output=True,
            text=True,
        )

    # Ensure admin has the generated password
    r = gitea_exec("gitea", "admin", "user", "change-password",
                   "--username", GITEA_ADMIN_USER, "--password", gitea_admin_pass)
    if r.returncode == 0 or "password" in (r.stdout + r.stderr).lower():
        ok(f"Admin '{GITEA_ADMIN_USER}' password set.")
    else:
        warn(f"change-password: {r.stderr.strip()}")
    # Clear must_change_password via Postgres — Gitea enforces this flag at the API
    # level for ALL auth methods (including API tokens), so we must clear it in the DB.
    pg_pod = kube_out("-n", "data", "get", "pods", "-l=cnpg.io/cluster=postgres,role=primary",
                      "-o=jsonpath={.items[0].metadata.name}")
    if pg_pod:
        kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
             "psql", "-U", "postgres", "-d", "gitea_db", "-c",
             f'UPDATE "user" SET must_change_password = false'
             f" WHERE lower_name = '{GITEA_ADMIN_USER.lower()}';",
             check=False)
        ok("Cleared must-change-password flag.")
    else:
        warn("Postgres pod not found — must-change-password may block API calls.")

    def api(method, path, data=None):
        # Call the Gitea REST API from inside the pod (no ingress dependency).
        # Returns the parsed JSON body, or {} when the response isn't JSON.
        args = [
            "curl", "-s", "-X", method, f"http://localhost:3000/api/v1{path}",
            "-H", "Content-Type: application/json",
            "-u", f"{GITEA_ADMIN_USER}:{gitea_admin_pass}",
        ]
        if data:
            args += ["-d", json.dumps(data)]
        r = gitea_exec(*args)
        try:
            return json.loads(r.stdout)
        except json.JSONDecodeError:
            return {}

    for org_name, visibility, desc in [
        ("studio", "public", "Public source code"),
        ("internal", "private", "Internal tools and services"),
    ]:
        result = api("POST", "/orgs", {
            "username": org_name,
            "visibility": visibility,
            "description": desc,
        })
        if "id" in result:
            ok(f"Created org '{org_name}'.")
        elif "already" in result.get("message", "").lower():
            ok(f"Org '{org_name}' already exists.")
        else:
            warn(f"Org '{org_name}': {result.get('message', result)}")
    ok(f"Gitea ready — https://src.{domain} ({GITEA_ADMIN_USER} / )")

# ── 12. Mirror amd64-only images to Gitea registry ────────────────────────────
#
# Images like lasuite/people-backend only ship linux/amd64. Our Lima VM is arm64.
# Strategy: pull the amd64 manifest by digest, create a patched OCI index that
# adds an arm64 entry pointing to the same manifest (Rosetta runs it fine), then
# push to our Gitea registry. k8s manifests reference src.DOMAIN_SUFFIX/studio/…;
# k3s registries.yaml handles auth so no imagePullSecrets are needed.
#
# Runs inside the Lima VM via `limactl shell … sudo python3 -c …`.
# Stdlib-only — no pip install required.
#
# mirror_amd64_images() prepends TARGETS/USER/PASS definitions to this body
# before executing it, so those three names are deliberately undefined here.
_MIRROR_SCRIPT_BODY = r'''
import json, hashlib, io, tarfile, os, subprocess, urllib.request

# containerd's content-addressed blob store inside the k3s agent.
CONTENT_STORE = (
    "/var/lib/rancher/k3s/agent/containerd"
    "/io.containerd.content.v1.content/blobs/sha256"
)

def blob_path(h):
    return os.path.join(CONTENT_STORE, h)

def blob_exists(h):
    return os.path.exists(blob_path(h))

def read_blob(h):
    with open(blob_path(h), "rb") as f:
        return f.read()

def add_tar_entry(tar, name, data):
    # Append an in-memory file to the tar (used to build the OCI layout).
    info = tarfile.TarInfo(name=name)
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))

def get_image_digest(ref):
    # Ask ctr for the image's top-level digest; None when the ref is unknown.
    r = subprocess.run(
        ["ctr", "-n", "k8s.io", "images", "ls", "name==" + ref],
        capture_output=True, text=True,
    )
    for line in r.stdout.splitlines():
        if ref in line:
            for part in line.split():
                if part.startswith("sha256:"):
                    return part[7:]
    return None

def fetch_index_from_registry(repo, tag):
    # Anonymous docker.io pull: get a token, then fetch the manifest index.
    url = (
        "https://auth.docker.io/token"
        f"?service=registry.docker.io&scope=repository:{repo}:pull"
    )
    with urllib.request.urlopen(url) as resp:
        token = json.loads(resp.read())["token"]
    accept = ",".join([
        "application/vnd.oci.image.index.v1+json",
        "application/vnd.docker.distribution.manifest.list.v2+json",
    ])
    req = urllib.request.Request(
        f"https://registry-1.docker.io/v2/{repo}/manifests/{tag}",
        headers={"Authorization": f"Bearer {token}", "Accept": accept},
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

def make_oci_tar(ref, new_index_bytes, amd64_manifest_bytes):
    # Build a minimal OCI-layout tar importable by `ctr images import`:
    # oci-layout + index.json + the two blobs (patched index, amd64 manifest).
    # Layers/config are NOT included — they must already be in the content store.
    ix_hex = hashlib.sha256(new_index_bytes).hexdigest()
    amd64_hex = json.loads(new_index_bytes)["manifests"][0]["digest"].replace("sha256:", "")
    layout = json.dumps({"imageLayoutVersion": "1.0.0"}).encode()
    top = json.dumps({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.index.v1+json",
        "manifests": [{
            "mediaType": "application/vnd.oci.image.index.v1+json",
            "digest": f"sha256:{ix_hex}",
            "size": len(new_index_bytes),
            "annotations": {"org.opencontainers.image.ref.name": ref},
        }],
    }, separators=(",", ":")).encode()
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:") as tar:
        add_tar_entry(tar, "oci-layout", layout)
        add_tar_entry(tar, "index.json", top)
        add_tar_entry(tar, f"blobs/sha256/{ix_hex}", new_index_bytes)
        add_tar_entry(tar, f"blobs/sha256/{amd64_hex}", amd64_manifest_bytes)
    return buf.getvalue()

def import_ref(ref, tar_bytes):
    # Replace `ref` in containerd with the patched image; False on failure.
    subprocess.run(["ctr", "-n", "k8s.io", "images", "rm", ref], capture_output=True)
    r = subprocess.run(
        ["ctr", "-n", "k8s.io", "images", "import", "--all-platforms", "-"],
        input=tar_bytes, capture_output=True,
    )
    if r.returncode:
        print(f" import failed: {r.stderr.decode()}")
        return False
    # Label so the CRI treats the image as managed (protects it from GC pruning
    # of unlabeled images — NOTE(review): confirm this is the intent).
    subprocess.run(
        ["ctr", "-n", "k8s.io", "images", "label", ref, "io.cri-containerd.image=managed"],
        capture_output=True,
    )
    return True

def process(src, tgt, user, pwd):
    print(f" {src}")
    # Pull by tag — may fail on arm64-only images but still puts the index blob in the store
    subprocess.run(["ctr", "-n", "k8s.io", "images", "pull", src], capture_output=True)
    ix_hex = get_image_digest(src)
    if ix_hex and blob_exists(ix_hex):
        index = json.loads(read_blob(ix_hex))
    else:
        print(" index not in content store — fetching from docker.io...")
        no_prefix = src.replace("docker.io/", "")
        parts = no_prefix.split(":", 1)
        repo, tag = parts[0], (parts[1] if len(parts) > 1 else "latest")
        index = fetch_index_from_registry(repo, tag)
    amd64 = next(
        (m for m in index.get("manifests", [])
         if m.get("platform", {}).get("architecture") == "amd64"
         and m.get("platform", {}).get("os") == "linux"),
        None,
    )
    if not amd64:
        print(" skip: no linux/amd64 entry in index")
        return
    amd64_hex = amd64["digest"].replace("sha256:", "")
    if not blob_exists(amd64_hex):
        print(" pulling amd64 manifest + layers by digest...")
        repo_base = src.rsplit(":", 1)[0]
        subprocess.run(
            ["ctr", "-n", "k8s.io", "images", "pull", f"{repo_base}@sha256:{amd64_hex}"],
            capture_output=True,
        )
    if not blob_exists(amd64_hex):
        print(" failed: amd64 manifest blob missing after pull")
        return
    amd64_bytes = read_blob(amd64_hex)
    # Patched index: keep amd64 + add arm64 alias pointing at same manifest
    arm64 = {
        "mediaType": amd64["mediaType"],
        "digest": amd64["digest"],
        "size": amd64["size"],
        "platform": {"architecture": "arm64", "os": "linux"},
    }
    new_index = dict(index)
    new_index["manifests"] = [amd64, arm64]
    new_index_bytes = json.dumps(new_index, separators=(",", ":")).encode()
    # Import with Gitea target name
    if not import_ref(tgt, make_oci_tar(tgt, new_index_bytes, amd64_bytes)):
        return
    # Also patch the original source ref so pods still using docker.io name work
    import_ref(src, make_oci_tar(src, new_index_bytes, amd64_bytes))
    # Push to Gitea registry
    print(f" pushing to registry...")
    r = subprocess.run(
        ["ctr", "-n", "k8s.io", "images", "push", "--user", f"{user}:{pwd}", tgt],
        capture_output=True, text=True,
    )
    status = "OK" if r.returncode == 0 else f"PUSH FAILED: {r.stderr.strip()}"
    print(f" {status}")

for _src, _tgt in TARGETS:
    process(_src, _tgt, USER, PASS)
'''

def mirror_amd64_images(domain, gitea_admin_pass=""):
    """Patch amd64-only images with an arm64 alias and push them to our Gitea registry."""
    step("Mirroring amd64-only images to Gitea registry...")
    registry = f"src.{domain}"
    targets = [
        (src, f"{registry}/{org}/{repo}:{tag}")
        for src, org, repo, tag in AMD64_ONLY_IMAGES
    ]
    # Inject runtime parameters ahead of the script body; repr() keeps the
    # values safe inside the `python3 -c` payload.
    header = (
        f"TARGETS = {repr(targets)}\n"
        f"USER = {repr(GITEA_ADMIN_USER)}\n"
        f"PASS = {repr(gitea_admin_pass)}\n"
    )
    script = header + _MIRROR_SCRIPT_BODY
    run(["limactl", "shell", LIMA_VM, "sudo", "python3", "-c", script])
    # Delete any pods stuck in image-pull error states
    ok("Clearing image-pull-error pods...")
    error_reasons = {"ImagePullBackOff", "ErrImagePull", "ErrImageNeverPull"}
    for ns in MANAGED_NS:
        # name:waiting-reason per line; only the first container is inspected.
        pods_raw = kube_out(
            "-n", ns, "get", "pods",
            "-o=jsonpath={range .items[*]}"
            "{.metadata.name}:{.status.containerStatuses[0].state.waiting.reason}\\n"
            "{end}",
        )
        for line in pods_raw.splitlines():
            if not line:
                continue
            parts = line.split(":", 1)
            if len(parts) == 2 and parts[1] in error_reasons:
                kube("delete", "pod", parts[0], "-n", ns, "--ignore-not-found", check=False)
    ok("Done.")

# ── 13. Seed secrets ──────────────────────────────────────────────────────────

def seed_secrets():
    """Seed OpenBao KV with crypto-random credentials, then mirror to K8s Secrets.

    Returns a dict of credentials for use by callers (gitea admin pass, etc.).
    Idempotent: reads existing OpenBao values before generating; never rotates.
    """
    step("Seeding secrets...")
    creds = _seed_openbao()
    # Internal plumbing keys are popped so they never leak to callers.
    ob_pod = creds.pop("_ob_pod", "")
    root_token = creds.pop("_root_token", "")
    s3_access_key = creds.get("s3-access-key", "")
    s3_secret_key = creds.get("s3-secret-key", "")
    hydra_system = creds.get("hydra-system-secret", "")
    hydra_cookie = creds.get("hydra-cookie-secret", "")
    hydra_pairwise = creds.get("hydra-pairwise-salt", "")
    kratos_secrets_default = creds.get("kratos-secrets-default", "")
    kratos_secrets_cookie = creds.get("kratos-secrets-cookie", "")
    hive_oidc_id = creds.get("hive-oidc-client-id", "hive-local")
    hive_oidc_sec = creds.get("hive-oidc-client-secret", "")
    django_secret = creds.get("people-django-secret", "")
    gitea_admin_pass = creds.get("gitea-admin-password", "")
    ok("Waiting for postgres cluster...")
    pg_pod = ""
    for _ in range(60):
        phase = kube_out("-n", "data", "get", "cluster", "postgres",
                         "-o=jsonpath={.status.phase}")
        if phase == "Cluster in healthy state":
            pg_pod = kube_out("-n", "data", "get", "pods",
                              "-l=cnpg.io/cluster=postgres,role=primary",
                              "-o=jsonpath={.items[0].metadata.name}")
            ok(f"Postgres ready ({pg_pod}).")
            break
        time.sleep(5)
    else:
        # for/else: runs only when the loop exhausted without breaking.
        warn("Postgres not ready after 5 min — continuing anyway.")
    if pg_pod:
        ok("Ensuring postgres roles and databases exist...")
        db_map = {
            "kratos": "kratos_db",
            "hydra": "hydra_db",
            "gitea": "gitea_db",
            "hive": "hive_db",
            "docs": "docs_db",
            "meet": "meet_db",
            "drive": "drive_db",
            "messages": "messages_db",
            "conversations": "conversations_db",
            "people": "people_db",
            "find": "find_db",
        }
        for user in PG_USERS:
            # Only CREATE if missing — passwords are managed by OpenBao static roles.
            ensure_sql = (
                f"DO $$ BEGIN "
                f"IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname='{user}') "
                f"THEN EXECUTE 'CREATE USER {user}'; END IF; END $$;"
            )
            kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
                 "psql", "-U", "postgres", "-c", ensure_sql, check=False)
            db = db_map.get(user, f"{user}_db")
            # CREATE DATABASE fails harmlessly (check=False) when it already exists.
            kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
                 "psql", "-U", "postgres", "-c", f"CREATE DATABASE {db} OWNER {user};",
                 check=False)
    # Read CNPG superuser credentials and configure database secrets engine.
    pg_user_b64 = kube_out("-n", "data", "get", "secret", "postgres-superuser",
                           "-o=jsonpath={.data.username}")
    pg_pass_b64 = kube_out("-n", "data", "get", "secret", "postgres-superuser",
                           "-o=jsonpath={.data.password}")
    pg_user = base64.b64decode(pg_user_b64).decode() if pg_user_b64 else "postgres"
    pg_pass = base64.b64decode(pg_pass_b64).decode() if pg_pass_b64 else ""
    if ob_pod and root_token and pg_pass:
        try:
            _configure_db_engine(ob_pod, root_token, pg_user, pg_pass)
        except Exception as exc:
            # Best-effort: DB engine config failure shouldn't abort the seed.
            warn(f"DB engine config failed: {exc}")
    else:
        warn("Skipping DB engine config — missing ob_pod, root_token, or pg_pass.")
    ok("Creating K8s secrets (VSO will overwrite on next sync)...")
    ensure_ns("ory")
    # Hydra app secrets — DSN comes from VaultDynamicSecret hydra-db-creds.
    create_secret("ory", "hydra",
                  secretsSystem=hydra_system,
                  secretsCookie=hydra_cookie,
                  **{"pairwise-salt": hydra_pairwise},
                  )
    # Kratos non-rotating encryption keys — DSN comes from VaultDynamicSecret kratos-db-creds.
    create_secret("ory", "kratos-app-secrets",
                  secretsDefault=kratos_secrets_default,
                  secretsCookie=kratos_secrets_cookie,
                  )
    ensure_ns("devtools")
    # gitea-db-credentials comes from VaultDynamicSecret (static-creds/gitea).
    create_secret("devtools", "gitea-s3-credentials",
                  **{"access-key": s3_access_key, "secret-key": s3_secret_key})
    create_secret("devtools", "gitea-admin-credentials",
                  username=GITEA_ADMIN_USER, password=gitea_admin_pass)
    ensure_ns("storage")
    # SeaweedFS identity file, assembled by concatenation to avoid f-string
    # brace escaping in the embedded JSON.
    s3_json = (
        '{"identities":[{"name":"seaweed","credentials":[{"accessKey":"'
        + s3_access_key + '","secretKey":"' + s3_secret_key +
        '"}],"actions":["Admin","Read","Write","List","Tagging"]}]}'
    )
    create_secret("storage", "seaweedfs-s3-credentials",
                  S3_ACCESS_KEY=s3_access_key, S3_SECRET_KEY=s3_secret_key)
    create_secret("storage", "seaweedfs-s3-json", **{"s3.json": s3_json})
    ensure_ns("lasuite")
    create_secret("lasuite", "seaweedfs-s3-credentials",
                  S3_ACCESS_KEY=s3_access_key, S3_SECRET_KEY=s3_secret_key)
    # hive-db-url and people-db-credentials come from VaultDynamicSecrets.
    create_secret("lasuite", "hive-oidc",
                  **{"client-id": hive_oidc_id, "client-secret": hive_oidc_sec})
    create_secret("lasuite", "people-django-secret", DJANGO_SECRET_KEY=django_secret)
    ensure_ns("media")
    ok("All secrets seeded.")
    return creds

def _seed_openbao():
    """Initialize/unseal OpenBao, generate/read credentials idempotently, configure VSO auth.

    Returns a dict of all generated credentials. Values are read from existing
    OpenBao KV entries when present — re-running never rotates credentials.
    """
    ob_pod = kube_out(
        "-n", "data", "get", "pods",
        "-l=app.kubernetes.io/name=openbao,component=server",
        "-o=jsonpath={.items[0].metadata.name}",
    )
    if not ob_pod:
        ok("OpenBao pod not found — skipping.")
        return {}
    ok(f"OpenBao ({ob_pod})...")
    kube("wait", "-n", "data", f"pod/{ob_pod}",
         "--for=jsonpath={.status.phase}=Running", "--timeout=120s", check=False)

    def bao(cmd):
        # Run a shell command inside the openbao container; returns stdout.
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao", "--",
             "sh", "-c", cmd],
            capture_output=True,
            text=True,
        )
        return r.stdout.strip()

    def bao_status():
        # `bao status` exits non-zero when sealed — the `|| echo '{}'` keeps
        # output parseable either way.
        out = bao("bao status -format=json 2>/dev/null || echo '{}'")
        try:
            return json.loads(out)
        except json.JSONDecodeError:
            return {}

    unseal_key = ""
    root_token = ""
    status = bao_status()
    already_initialized = status.get("initialized", False)
    if not already_initialized:
        # A stored key means a previous run initialized it (pod may just be sealed).
        existing_key = kube_out("-n", "data", "get", "secret", "openbao-keys",
                                "-o=jsonpath={.data.key}")
        already_initialized = bool(existing_key)
    if not already_initialized:
        ok("Initializing OpenBao...")
        init_json = bao("bao operator init -key-shares=1 -key-threshold=1 -format=json 2>/dev/null || echo '{}'")
        try:
            init = json.loads(init_json)
            unseal_key = init["unseal_keys_b64"][0]
            root_token = init["root_token"]
            create_secret("data", "openbao-keys", key=unseal_key, **{"root-token": root_token})
            ok("Initialized — keys stored in secret/openbao-keys.")
        except (json.JSONDecodeError, KeyError):
            # Init failed with no recoverable keys — local dev only: wipe storage
            # and let the pod re-create it from scratch.
            warn("Init failed — resetting OpenBao storage for local dev...")
            kube("delete", "pvc", "data-openbao-0", "-n", "data", "--ignore-not-found", check=False)
            kube("delete", "pod", ob_pod, "-n", "data", "--ignore-not-found", check=False)
            warn("OpenBao storage reset. Run --seed again after the pod restarts.")
            return {}
    else:
        ok("Already initialized.")
        existing_key = kube_out("-n", "data", "get", "secret", "openbao-keys",
                                "-o=jsonpath={.data.key}")
        if existing_key:
            unseal_key = base64.b64decode(existing_key).decode()
        root_token_enc = kube_out("-n", "data", "get", "secret", "openbao-keys",
                                  "-o=jsonpath={.data.root-token}")
        if root_token_enc:
            root_token = base64.b64decode(root_token_enc).decode()
    if bao_status().get("sealed", False) and unseal_key:
        ok("Unsealing...")
        bao(f"bao operator unseal '{unseal_key}' 2>/dev/null")
    if not root_token:
        warn("No root token available — skipping KV seeding.")
        return {}

    # Read-or-generate helper: preserves existing KV values; only generates missing ones.
    def get_or_create(path, **fields):
        raw = bao(
            f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
            f"bao kv get -format=json secret/{path} 2>/dev/null || echo '{{}}'"
        )
        existing = {}
        try:
            existing = json.loads(raw).get("data", {}).get("data", {})
        except (json.JSONDecodeError, AttributeError):
            pass
        result = {}
        for key, default_fn in fields.items():
            result[key] = existing.get(key) or default_fn()
        return result

    def rand():
        # URL-safe so the values survive shell quoting and YAML below.
        return _secrets.token_urlsafe(32)

    ok("Seeding KV (idempotent — existing values preserved)...")
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"bao secrets enable -path=secret -version=2 kv 2>/dev/null || true")
    # DB passwords removed — OpenBao database secrets engine manages them via static roles.
    hydra = get_or_create("hydra", **{"system-secret": rand, "cookie-secret": rand, "pairwise-salt": rand})
    SMTP_URI = "smtp://postfix.lasuite.svc.cluster.local:25/?skip_ssl_verify=true"
    kratos = get_or_create("kratos", **{"secrets-default": rand, "secrets-cookie": rand,
                                        "smtp-connection-uri": lambda: SMTP_URI})
    seaweedfs = get_or_create("seaweedfs", **{"access-key": rand, "secret-key": rand})
    gitea = get_or_create("gitea", **{"admin-username": lambda: GITEA_ADMIN_USER, "admin-password": rand})
    hive = get_or_create("hive", **{"oidc-client-id": lambda: "hive-local", "oidc-client-secret": rand})
    livekit = get_or_create("livekit", **{"api-key": lambda: "devkey", "api-secret": rand})
    people = get_or_create("people", **{"django-secret-key": rand})
    login_ui = get_or_create("login-ui", **{"cookie-secret": rand, "csrf-cookie-secret": rand})
    # Write all secrets to KV (idempotent — puts same values back)
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' sh -c '"
        f"bao kv put secret/hydra system-secret=\"{hydra['system-secret']}\" cookie-secret=\"{hydra['cookie-secret']}\" pairwise-salt=\"{hydra['pairwise-salt']}\" && "
        f"bao kv put secret/kratos secrets-default=\"{kratos['secrets-default']}\" secrets-cookie=\"{kratos['secrets-cookie']}\" smtp-connection-uri=\"{kratos['smtp-connection-uri']}\" && "
        f"bao kv put secret/gitea admin-username=\"{gitea['admin-username']}\" admin-password=\"{gitea['admin-password']}\" && "
        f"bao kv put secret/seaweedfs access-key=\"{seaweedfs['access-key']}\" secret-key=\"{seaweedfs['secret-key']}\" && "
        f"bao kv put secret/hive oidc-client-id=\"{hive['oidc-client-id']}\" oidc-client-secret=\"{hive['oidc-client-secret']}\" && "
        f"bao kv put secret/livekit api-key=\"{livekit['api-key']}\" api-secret=\"{livekit['api-secret']}\" && "
        f"bao kv put secret/people django-secret-key=\"{people['django-secret-key']}\" && "
        f"bao kv put secret/login-ui cookie-secret=\"{login_ui['cookie-secret']}\" csrf-cookie-secret=\"{login_ui['csrf-cookie-secret']}\""
        f"'")
    # Configure Kubernetes auth method so VSO can authenticate with OpenBao
    ok("Configuring Kubernetes auth for VSO...")
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"bao auth enable kubernetes 2>/dev/null; true")
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"bao write auth/kubernetes/config "
        f"kubernetes_host=https://kubernetes.default.svc.cluster.local")
    policy_hcl = (
        'path "secret/data/*" { capabilities = ["read"] }\n'
        'path "secret/metadata/*" { capabilities = ["read", "list"] }\n'
        'path "database/static-creds/*" { capabilities = ["read"] }\n'
    )
    # base64 round-trip sidesteps shell quoting of the multi-line HCL document.
    policy_b64 = base64.b64encode(policy_hcl.encode()).decode()
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"sh -c 'echo {policy_b64} | base64 -d | bao policy write vso-reader -'")
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"bao write auth/kubernetes/role/vso "
        f"bound_service_account_names=default "
        f"bound_service_account_namespaces=ory,devtools,storage,lasuite,media "
        f"policies=vso-reader "
        f"ttl=1h")
    return {
        "hydra-system-secret": hydra["system-secret"],
        "hydra-cookie-secret": hydra["cookie-secret"],
        "hydra-pairwise-salt": hydra["pairwise-salt"],
        "kratos-secrets-default": kratos["secrets-default"],
        "kratos-secrets-cookie": kratos["secrets-cookie"],
        "s3-access-key": seaweedfs["access-key"],
        "s3-secret-key": seaweedfs["secret-key"],
        "gitea-admin-password": gitea["admin-password"],
        "hive-oidc-client-id": hive["oidc-client-id"],
        "hive-oidc-client-secret": hive["oidc-client-secret"],
        "people-django-secret": people["django-secret-key"],
        "livekit-api-key": livekit["api-key"],
        "livekit-api-secret": livekit["api-secret"],
        # Plumbing for seed_secrets (popped there, never returned to callers):
        "_ob_pod": ob_pod,
        "_root_token": root_token,
    }

# ── 13b.
# Configure OpenBao database secrets engine ───────────────────────────

def _configure_db_engine(ob_pod: str, root_token: str, pg_user: str, pg_pass: str) -> None:
    """Enable OpenBao database secrets engine and create PostgreSQL static roles.

    Static roles cause OpenBao to immediately set (and later rotate) each
    service user's password via ALTER USER, eliminating hardcoded DB passwords.
    Idempotent: bao write overwrites existing config/roles safely.

    The `vault` PG user is created here (if absent) and used as the DB engine
    connection user. pg_user/pg_pass (the CNPG superuser) are kept for
    potential future use but are no longer used for the connection URL.

    Args:
        ob_pod: name of the OpenBao server pod in the `data` namespace.
        root_token: OpenBao root token; exported into each bao CLI call.
        pg_user: CNPG superuser name (currently unused — see above).
        pg_pass: CNPG superuser password (currently unused — see above).

    Raises:
        RuntimeError: if the CNPG primary pod cannot be found, or a checked
            bao/psql exec command exits non-zero.
    """
    ok("Configuring OpenBao database secrets engine...")
    pg_rw = "postgres-rw.data.svc.cluster.local:5432"
    bao_env = f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}'"

    # Run a shell command inside the OpenBao container; raises on non-zero
    # exit unless check=False (used for idempotent "|| true" commands).
    def bao(cmd, check=True):
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao",
             "--", "sh", "-c", cmd],
            capture_output=True, text=True,
        )
        if check and r.returncode != 0:
            raise RuntimeError(f"bao command failed (exit {r.returncode}):\n{r.stderr.strip()}")
        return r.stdout.strip()

    # Enable database secrets engine — tolerate "already enabled" error via || true.
    bao(f"{bao_env} bao secrets enable database 2>/dev/null || true", check=False)

    # ── vault PG user setup ────────────────────────────────────────────────────
    # Locate the CNPG primary pod for psql exec (peer auth — no password needed).
    cnpg_pod = kube_out(
        "-n", "data", "get", "pods",
        "-l=cnpg.io/cluster=postgres,role=primary",
        "-o=jsonpath={.items[0].metadata.name}",
    )
    if not cnpg_pod:
        raise RuntimeError("Could not find CNPG primary pod for vault user setup.")

    # Execute one SQL statement as the `postgres` superuser on the primary.
    def psql(sql):
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", cnpg_pod, "-c", "postgres",
             "--", "psql", "-U", "postgres", "-c", sql],
            capture_output=True, text=True,
        )
        if r.returncode != 0:
            raise RuntimeError(f"psql failed: {r.stderr.strip()}")
        return r.stdout.strip()

    # Read existing vault pg-password from OpenBao KV, or generate a new one.
    existing_vault_pass = bao(
        f"{bao_env} bao kv get -field=pg-password secret/vault 2>/dev/null || true",
        check=False,
    )
    # token_urlsafe output contains no quotes, so it is safe to splice into
    # both the shell command below and the SQL ALTER USER statement.
    vault_pg_pass = existing_vault_pass.strip() if existing_vault_pass.strip() else _secrets.token_urlsafe(32)

    # Store vault pg-password in OpenBao KV (idempotent).
    bao(f"{bao_env} bao kv put secret/vault pg-password=\"{vault_pg_pass}\"")
    ok("vault KV entry written.")

    # Create vault PG user if absent, set its password, grant ADMIN OPTION on all service users.
    create_vault_sql = (
        f"DO $$ BEGIN "
        f"IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'vault') THEN "
        f"CREATE USER vault WITH LOGIN CREATEROLE; "
        f"END IF; "
        f"END $$;"
    )
    psql(create_vault_sql)
    psql(f"ALTER USER vault WITH PASSWORD '{vault_pg_pass}';")
    # WITH ADMIN OPTION lets the vault user ALTER each service role's password.
    for user in PG_USERS:
        psql(f"GRANT {user} TO vault WITH ADMIN OPTION;")
    ok("vault PG user configured with ADMIN OPTION on all service roles.")

    # ── DB engine connection config (uses vault user) ─────────────────────────
    # NOTE: the first fragment is deliberately NOT an f-string — OpenBao needs
    # the literal {{username}}/{{password}} template placeholders.
    conn_url = (
        "postgresql://{{username}}:{{password}}"
        f"@{pg_rw}/postgres?sslmode=disable"
    )
    bao(
        f"{bao_env} bao write database/config/cnpg-postgres"
        f" plugin_name=postgresql-database-plugin"
        f" allowed_roles='*'"
        f" connection_url='{conn_url}'"
        f" username='vault'"
        f" password='{vault_pg_pass}'"
    )
    ok("DB engine connection configured (vault user).")

    # Encode the rotation statement to avoid shell quoting issues with inner quotes.
    rotation_b64 = base64.b64encode(
        b"ALTER USER \"{{name}}\" WITH PASSWORD '{{password}}';"
    ).decode()
    # One static role per service user; OpenBao rotates each password daily.
    for user in PG_USERS:
        bao(
            f"{bao_env} sh -c '"
            f"bao write database/static-roles/{user}"
            f" db_name=cnpg-postgres"
            f" username={user}"
            f" rotation_period=86400"
            f" \"rotation_statements=$(echo {rotation_b64} | base64 -d)\"'"
        )
        ok(f" static-role/{user}")
    ok("Database secrets engine configured.")

# ── 14. Restart services ──────────────────────────────────────────────────────

def restart_services() -> None:
    """Best-effort rollout-restart of every deployment in SERVICES_TO_RESTART."""
    step("Restarting services waiting for secrets...")
    for ns, dep in SERVICES_TO_RESTART:
        # check=False: a missing deployment must not abort the whole run.
        kube("-n", ns, "rollout", "restart", f"deployment/{dep}", check=False)
    ok("Done.")

# ── 15. Wait for core ─────────────────────────────────────────────────────────

def wait_for_core() -> None:
    """Block (up to 120s each) until the core auth/data deployments are rolled out."""
    step("Waiting for core services...")
    for ns, dep in [("data", "valkey"), ("ory", "kratos"), ("ory", "hydra")]:
        kube("rollout", "status", f"deployment/{dep}", "-n", ns, "--timeout=120s", check=False)
    ok("Core services ready.")

# ── 16. Print URLs ────────────────────────────────────────────────────────────

def print_urls(domain: str, gitea_admin_pass: str = "") -> None:
    """Print the service URL table plus OpenBao port-forward/token instructions."""
    print(f"\n{'─'*60}")
    print(f" Stack is up. Domain: {domain}")
    print(f"{'─'*60}")
    for name, url in [
        ("Auth", f"https://auth.{domain}/"),
        ("Docs", f"https://docs.{domain}/"),
        ("Meet", f"https://meet.{domain}/"),
        ("Drive", f"https://drive.{domain}/"),
        ("Chat", f"https://chat.{domain}/"),
        ("Mail", f"https://mail.{domain}/"),
        ("People", f"https://people.{domain}/"),
        ("Gitea", f"https://src.{domain}/ ({GITEA_ADMIN_USER} / {gitea_admin_pass})"),
    ]:
        print(f" {name:<10} {url}")
    print()
    print(" OpenBao UI:")
    print(f" kubectl --context=sunbeam -n data port-forward svc/openbao 8200:8200")
    print(f" http://localhost:8200")
    token_cmd = "kubectl --context=sunbeam -n data get secret openbao-keys -o jsonpath='{.data.root-token}' | base64 -d"
    print(f" token: {token_cmd}")
    print(f"{'─'*60}\n")

# ── 16.
# Status check ────────────────────────────────────────────────────────────────

def status_check():
    """Print a concise pod health table grouped by namespace, then VSO sync state."""
    step("Pod health across all namespaces...")
    # Fetch all pods in one call; kube-system excluded server-side.
    # BUG FIX: the selector previously read "!= kube-system" (stray space), so
    # it compared against the nonexistent namespace " kube-system" and
    # excluded nothing.
    raw = capture_out([
        "kubectl", *K8S_CTX, "get", "pods",
        "--field-selector=metadata.namespace!=kube-system",
        "-A", "--no-headers",
    ])
    # Filter to our managed namespaces only.
    ns_set = set(MANAGED_NS)
    pods = []
    for line in raw.splitlines():
        cols = line.split()
        if len(cols) < 4:
            continue
        ns = cols[0]
        if ns not in ns_set:
            continue
        pods.append(cols)
    if not pods:
        warn("No pods found in managed namespaces.")
        return
    all_ok = True
    cur_ns = None
    icon_map = {"Running": "✓", "Completed": "✓", "Succeeded": "✓",
                "Pending": "○", "Failed": "✗", "Unknown": "?"}
    for cols in sorted(pods, key=lambda c: (c[0], c[1])):
        ns, name, ready, status = cols[0], cols[1], cols[2], cols[3]
        if ns != cur_ns:
            print(f" {ns}:")
            cur_ns = ns
        icon = icon_map.get(status, "?")
        unhealthy = status not in ("Running", "Completed", "Succeeded")
        # Only check ready ratio for Running pods — Completed/Succeeded pods
        # legitimately report 0/N containers ready.
        if not unhealthy and status == "Running" and "/" in ready:
            r, t = ready.split("/")
            unhealthy = r != t
        if unhealthy:
            all_ok = False
        print(f" {icon} {name:<50} {ready:<6} {status}")
    print()
    if all_ok:
        ok("All pods healthy.")
    else:
        warn("Some pods are not ready.")
    _vso_sync_status()


def _vso_sync_status():
    """Print VSO VaultStaticSecret and VaultDynamicSecret sync health.

    VSS synced = status.secretMAC is non-empty.
    VDS synced = status.lastRenewalTime is non-zero.

    kubectl's custom-columns output prints the literal "<none>" for absent
    fields, so that placeholder must count as NOT synced.
    """
    step("VSO secret sync status...")
    all_ok = True
    # VaultStaticSecrets: synced when secretMAC is populated.
    vss_raw = capture_out([
        "kubectl", *K8S_CTX, "get", "vaultstaticsecret", "-A", "--no-headers",
        "-o=custom-columns="
        "NS:.metadata.namespace,NAME:.metadata.name,MAC:.status.secretMAC",
    ])
    cur_ns = None
    for line in sorted(vss_raw.splitlines()):
        cols = line.split()
        if len(cols) < 2:
            continue
        ns, name = cols[0], cols[1]
        mac = cols[2] if len(cols) > 2 else ""
        # BUG FIX: "<none>" (kubectl's nil placeholder) previously counted as
        # synced, masking unsynced secrets.
        synced = mac not in ("", "<none>")
        if not synced:
            all_ok = False
        icon = "✓" if synced else "✗"
        if ns != cur_ns:
            print(f" {ns} (VSS):")
            cur_ns = ns
        print(f" {icon} {name}")
    # VaultDynamicSecrets: synced when lastRenewalTime is non-zero.
    vds_raw = capture_out([
        "kubectl", *K8S_CTX, "get", "vaultdynamicsecret", "-A", "--no-headers",
        "-o=custom-columns="
        "NS:.metadata.namespace,NAME:.metadata.name,RENEWED:.status.lastRenewalTime",
    ])
    cur_ns = None
    for line in sorted(vds_raw.splitlines()):
        cols = line.split()
        if len(cols) < 2:
            continue
        ns, name = cols[0], cols[1]
        renewed = cols[2] if len(cols) > 2 else "0"
        # BUG FIX: same "<none>" placeholder handling; also removes the
        # duplicate "" that was in the original tuple.
        synced = renewed not in ("", "0", "<none>")
        if not synced:
            all_ok = False
        icon = "✓" if synced else "✗"
        if ns != cur_ns:
            print(f" {ns} (VDS):")
            cur_ns = ns
        print(f" {icon} {name}")
    print()
    if all_ok:
        ok("All VSO secrets synced.")
    else:
        warn("Some VSO secrets are not synced.")

# ── 17. VSO E2E verification ──────────────────────────────────────────────────

def verify_vso():
    """End-to-end test of VSO → OpenBao integration.

    1. Writes a random value to OpenBao KV at secret/vso-test.
    2. Creates a VaultAuth + VaultStaticSecret in the 'ory' namespace
       (already bound to the 'vso' Kubernetes auth role).
    3. Polls until VSO syncs the K8s Secret (up to 60s).
    4. Reads and base64-decodes the K8s Secret; compares to the expected value.
    5. Cleans up all test resources whether the check passed or failed.
    """
    step("Verifying VSO → OpenBao integration (E2E)...")
    ob_pod = kube_out(
        "-n", "data", "get", "pods",
        "-l=app.kubernetes.io/name=openbao,component=server",
        "-o=jsonpath={.items[0].metadata.name}",
    )
    if not ob_pod:
        die("OpenBao pod not found — run full bring-up first.")
    root_token_enc = kube_out(
        "-n", "data", "get", "secret", "openbao-keys",
        "-o=jsonpath={.data.root-token}",
    )
    if not root_token_enc:
        die("Could not read openbao-keys secret.")
    root_token = base64.b64decode(root_token_enc).decode()
    bao_env = f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}'"

    # Exec a shell command in the OpenBao container; raises unless check=False.
    def bao(cmd, *, check=True):
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao",
             "--", "sh", "-c", cmd],
            capture_output=True, text=True,
        )
        if check and r.returncode != 0:
            raise RuntimeError(f"bao failed (exit {r.returncode}): {r.stderr.strip()}")
        return r.stdout.strip()

    test_value = _secrets.token_urlsafe(16)
    test_ns = "ory"
    test_name = "vso-verify"

    # Best-effort removal of every test artifact (CRs, Secret, KV entry).
    def cleanup():
        ok("Cleaning up test resources...")
        kube("delete", "vaultstaticsecret", test_name, f"-n={test_ns}", "--ignore-not-found", check=False)
        kube("delete", "vaultauth", test_name, f"-n={test_ns}", "--ignore-not-found", check=False)
        kube("delete", "secret", test_name, f"-n={test_ns}", "--ignore-not-found", check=False)
        bao(f"{bao_env} bao kv delete secret/vso-test 2>/dev/null || true", check=False)

    try:
        # 1. Write test value to OpenBao KV
        ok("Writing test sentinel to OpenBao secret/vso-test ...")
        bao(f"{bao_env} bao kv put secret/vso-test test-key='{test_value}'")
        # 2. Create VaultAuth in ory (already in vso role's bound namespaces)
        ok(f"Creating VaultAuth {test_ns}/{test_name} ...")
        kube_apply(f"""
apiVersion: secrets.hashicorp.com/v1beta1
kind: VaultAuth
metadata:
  name: {test_name}
  namespace: {test_ns}
spec:
  method: kubernetes
  mount: kubernetes
  kubernetes:
    role: vso
    serviceAccount: default
""")
        # 3. Create VaultStaticSecret pointing at our test KV path
        ok(f"Creating VaultStaticSecret {test_ns}/{test_name} ...")
        kube_apply(f"""
apiVersion: secrets.hashicorp.com/v1beta1
kind: VaultStaticSecret
metadata:
  name: {test_name}
  namespace: {test_ns}
spec:
  vaultAuthRef: {test_name}
  mount: secret
  type: kv-v2
  path: vso-test
  refreshAfter: 10s
  destination:
    name: {test_name}
    create: true
    overwrite: true
""")
        # 4. Poll until VSO sets secretMAC (= synced); jsonpath yields "" when
        # the field is absent, so a plain truthiness check suffices.
        ok("Waiting for VSO to sync (up to 60s) ...")
        deadline = time.time() + 60
        synced = False
        while time.time() < deadline:
            mac = kube_out(
                "get", "vaultstaticsecret", test_name, f"-n={test_ns}",
                "-o=jsonpath={.status.secretMAC}", "--ignore-not-found",
            )
            if mac:
                synced = True
                break
            time.sleep(3)
        if not synced:
            msg = kube_out(
                "get", "vaultstaticsecret", test_name, f"-n={test_ns}",
                "-o=jsonpath={.status.conditions[0].message}", "--ignore-not-found",
            )
            raise RuntimeError(f"VSO did not sync within 60s. Last status: {msg or 'unknown'}")
        # 5. Read and verify the K8s Secret value
        ok("Verifying K8s Secret contents ...")
        raw = kube_out(
            "get", "secret", test_name, f"-n={test_ns}",
            "-o=jsonpath={.data.test-key}", "--ignore-not-found",
        )
        if not raw:
            raise RuntimeError(
                f"K8s Secret {test_ns}/{test_name} not found or missing key 'test-key'."
            )
        actual = base64.b64decode(raw).decode()
        if actual != test_value:
            raise RuntimeError(
                f"Value mismatch!\n expected: {test_value!r}\n got: {actual!r}"
            )
        ok("✓ Sentinel value matches — VSO → OpenBao integration is working.")
    except Exception as exc:
        cleanup()
        die(f"VSO verification FAILED: {exc}")
    cleanup()
    ok("VSO E2E verification passed.")

# ── 18. Build + push sunbeam-proxy ───────────────────────────────────────────

def build_proxy(domain, admin_pass):
    """Build sunbeam-proxy for linux/arm64 and push to our Gitea registry.

    Requires Docker (buildx) on the host. The mkcert CA must already be
    trusted by Docker (docker-desktop uses the macOS Keychain, so
    `mkcert -install` is sufficient). After pushing, applies manifests so the
    Deployment picks up the updated image reference, then rolls the pingora
    pod to trigger a pull.
    """
    if not shutil.which("docker"):
        die("docker not found — install Docker Desktop to use --build.")
    if not PROXY_DIR.is_dir():
        die(f"Proxy source not found at {PROXY_DIR}")
    registry = f"src.{domain}"
    image = f"{registry}/studio/sunbeam-proxy:latest"
    step(f"Building sunbeam-proxy → {image} ...")
    # Authenticate Docker with Gitea before the build so --push succeeds.
    ok("Logging in to Gitea registry...")
    r = subprocess.run(
        ["docker", "login", registry, "--username", GITEA_ADMIN_USER, "--password-stdin"],
        input=admin_pass, text=True, capture_output=True,
    )
    if r.returncode != 0:
        die(f"docker login failed:\n{r.stderr.strip()}")
    ok("Building image (linux/arm64, push)...")
    run(["docker", "buildx", "build", "--platform", "linux/arm64", "--push",
         "-t", image, str(PROXY_DIR)])
    ok(f"Pushed {image}")
    # Apply manifests so the Deployment spec reflects the Gitea image ref.
    apply_manifests(domain)
    # Roll the pingora pod — imagePullPolicy: Always ensures it pulls fresh.
    ok("Rolling pingora deployment...")
    kube("rollout", "restart", "deployment/pingora", "-n", "ingress")
    kube("rollout", "status", "deployment/pingora", "-n", "ingress", "--timeout=120s")
    ok("Pingora redeployed.")

# ── Main ──────────────────────────────────────────────────────────────────────

def main():
    """CLI entry point: dispatch partial-run flags, or run the full bring-up."""
    parser = argparse.ArgumentParser(description="Sunbeam local dev stack manager")
    parser.add_argument("--seed", action="store_true", help="Re-seed secrets only")
    parser.add_argument("--apply", action="store_true", help="Re-apply manifests + mirror images")
    parser.add_argument("--gitea", action="store_true", help="Bootstrap Gitea orgs + mirror images")
    parser.add_argument("--restart", action="store_true", help="Restart services only")
    parser.add_argument("--status", action="store_true", help="Show pod health across all namespaces")
    parser.add_argument("--verify", action="store_true", help="E2E test VSO → OpenBao integration")
    parser.add_argument("--build", action="store_true", help="Build + push sunbeam-proxy to Gitea; redeploy")
    args = parser.parse_args()
    check_prerequisites()
    # Partial-run modes — run in logical order: apply → seed → gitea → restart
    if args.status:
        status_check()
        return
    if args.verify:
        verify_vso()
        return
    if args.build:
        ip = get_lima_ip()
        domain = f"{ip}.sslip.io"
        admin_pass_b64 = kube_out(
            "-n", "devtools", "get", "secret", "gitea-admin-credentials",
            "-o=jsonpath={.data.password}",
        )
        if not admin_pass_b64:
            die("gitea-admin-credentials secret not found — run --seed first.")
        admin_pass = base64.b64decode(admin_pass_b64).decode()
        build_proxy(domain, admin_pass)
        return
    if args.apply or args.gitea or args.seed or args.restart:
        ip = get_lima_ip()
        domain = f"{ip}.sslip.io"
        creds = {}
        if args.apply:
            apply_manifests(domain)
        if args.apply or args.gitea or args.seed:
            creds = seed_secrets()
        if args.apply or args.gitea:
            admin_pass = creds.get("gitea-admin-password", "")
            setup_lima_vm_registry(domain, admin_pass)
            bootstrap_gitea(domain, admin_pass)
            mirror_amd64_images(domain, admin_pass)
        restart_services()
        return
    # Full bring-up
    ensure_lima_vm()
    merge_kubeconfig()
    disable_traefik()
    ensure_cert_manager()
    ensure_linkerd()
    domain = ensure_tls_cert()
    ensure_tls_secret(domain)
    apply_manifests(domain)
    creds = seed_secrets()  # waits for OpenBao; generates/reads all credentials
    admin_pass = creds.get("gitea-admin-password", "")
    setup_lima_vm_registry(domain, admin_pass)  # mkcert CA + registries.yaml + k3s restart
    bootstrap_gitea(domain, admin_pass)  # create studio/internal orgs
    mirror_amd64_images(domain, admin_pass)  # patch + push amd64-only images
    restart_services()
    wait_for_core()
    print_urls(domain, admin_pass)


if __name__ == "__main__":
    main()