#!/usr/bin/env python3 """ sunbeam.py — Sunbeam local dev stack lifecycle manager. Idempotent: safe to run from any state (fresh Mac, existing VM, partial deploy). Consolidates local-up.sh + local-seed-secrets.sh into one place. Usage: ./scripts/sunbeam.py # full stack bring-up ./scripts/sunbeam.py --apply # re-apply manifests + mirror images ./scripts/sunbeam.py --seed # re-seed secrets only ./scripts/sunbeam.py --gitea # bootstrap Gitea orgs + mirror amd64 images ./scripts/sunbeam.py --restart # restart services only ./scripts/sunbeam.py --status # show pod health across all namespaces Requires: limactl mkcert kubectl kustomize linkerd jq yq """ import argparse import base64 import json import os import shutil import subprocess import sys import time from pathlib import Path # ── Paths ───────────────────────────────────────────────────────────────────── SCRIPT_DIR = Path(__file__).parent.resolve() REPO_ROOT = SCRIPT_DIR.parent SECRETS_DIR = REPO_ROOT / "secrets" / "local" # ── Config ──────────────────────────────────────────────────────────────────── LIMA_VM = "sunbeam" K8S_CTX = ["--context=sunbeam"] # Deterministic local-dev credentials (not for production) DB_PASSWORD = "localdev" S3_ACCESS_KEY = "minioadmin" S3_SECRET_KEY = "minioadmin" HYDRA_SYSTEM_SECRET = "local-hydra-system-secret-at-least-16" HYDRA_COOKIE_SECRET = "local-hydra-cookie-secret-at-least-16" HYDRA_PAIRWISE_SALT = "local-hydra-pairwise-salt-value-1" LIVEKIT_API_KEY = "devkey" LIVEKIT_API_SECRET = "secret-placeholder" PEOPLE_DJANGO_SECRET = "local-dev-people-django-secret-key-not-for-production" # Gitea admin (deterministic for local dev; also set in gitea-values.yaml) GITEA_ADMIN_USER = "gitea_admin" GITEA_ADMIN_PASS = "localdev" GITEA_ADMIN_EMAIL = "gitea@local.domain" # Images that only ship linux/amd64 builds — patched + mirrored to our Gitea registry. # Rosetta runs the amd64 binaries on arm64, but the CRI refuses to pull arm64-absent images. 
# Format: (source_ref, gitea_org, gitea_repo, tag)
AMD64_ONLY_IMAGES = [
    ("docker.io/lasuite/people-backend:latest", "studio", "people-backend", "latest"),
    ("docker.io/lasuite/people-frontend:latest", "studio", "people-frontend", "latest"),
]

# CLI tools that must be on PATH before anything runs (checked up front).
REQUIRED_TOOLS = ["limactl", "mkcert", "kubectl", "kustomize", "linkerd", "jq", "yq"]

# Postgres roles created/refreshed by seed_secrets(); each owns "<name>_db"
# (see db_map there for the exact role → database mapping).
PG_USERS = [
    "kratos",
    "hydra",
    "gitea",
    "hive",
    "docs",
    "meet",
    "drive",
    "messages",
    "conversations",
    "people",
    "find",
]

# (namespace, deployment) pairs restarted after secrets are (re-)seeded.
SERVICES_TO_RESTART = [
    ("ory", "hydra"),
    ("ory", "kratos"),
    ("ory", "login-ui"),
    ("devtools", "gitea"),
    ("storage", "seaweedfs-filer"),
    ("lasuite", "hive"),
    ("lasuite", "people-backend"),
    ("lasuite", "people-frontend"),
    ("lasuite", "people-celery-worker"),
    ("lasuite", "people-celery-beat"),
    ("media", "livekit-server"),
]

# ── Output ────────────────────────────────────────────────────────────────────

def step(msg):
    """Print a top-level progress heading."""
    print(f"\n==> {msg}", flush=True)

def ok(msg):
    """Print an indented success/progress detail line."""
    print(f"    {msg}", flush=True)

def warn(msg):
    """Print a non-fatal warning to stderr."""
    print(f"    WARN: {msg}", file=sys.stderr, flush=True)

def die(msg):
    """Print a fatal error to stderr and exit with status 1."""
    print(f"\nERROR: {msg}", file=sys.stderr)
    sys.exit(1)

# ── Subprocess helpers ────────────────────────────────────────────────────────

def run(cmd, *, check=True, input=None, capture=False, cwd=None):
    """Run *cmd*; text mode unless stdin *input* is bytes. Raises on failure when check=True."""
    text = not isinstance(input, bytes)
    return subprocess.run(cmd, check=check, text=text, input=input, capture_output=capture, cwd=cwd)

def capture_out(cmd, *, default=""):
    """Return stripped stdout of *cmd*, or *default* on non-zero exit (never raises)."""
    r = subprocess.run(cmd, capture_output=True, text=True)
    return r.stdout.strip() if r.returncode == 0 else default

def succeeds(cmd):
    """True when *cmd* exits 0; all output is suppressed."""
    return subprocess.run(cmd, capture_output=True).returncode == 0

# ── kubectl wrappers ──────────────────────────────────────────────────────────
# All wrappers inject K8S_CTX so every call targets the sunbeam context.

def kube(*args, input=None, check=True):
    """kubectl with optional stdin; raises on failure when check=True."""
    return run(["kubectl", *K8S_CTX, *args], input=input, check=check)

def kube_out(*args):
    """kubectl returning stripped stdout ('' on failure)."""
    return capture_out(["kubectl", *K8S_CTX, *args])

def kube_ok(*args):
    """True when the kubectl invocation exits 0."""
    return succeeds(["kubectl", *K8S_CTX, *args])

def kube_apply(manifest, *, server_side=True):
    """Apply a YAML manifest string via stdin (server-side + force-conflicts by default)."""
    args = ["apply", "-f", "-"]
    if server_side:
        args += ["--server-side", "--force-conflicts"]
    kube(*args, input=manifest)

def ns_exists(ns):
    """True when namespace *ns* exists in the cluster."""
    return kube_ok("get", "namespace", ns)

def ensure_ns(ns):
    """Idempotently create namespace *ns* (dry-run render, then apply)."""
    manifest = kube_out("create", "namespace", ns, "--dry-run=client", "-o=yaml")
    if manifest:
        kube_apply(manifest)

def create_secret(ns, name, **literals):
    """Create or update a generic secret idempotently.

    Uses --field-manager=sunbeam so kustomize apply (manager=kubectl) never
    overwrites data fields written by this function, even when the kustomize
    output includes a placeholder Secret with the same name.
    """
    args = ["create", "secret", "generic", name, f"-n={ns}"]
    for k, v in literals.items():
        args.append(f"--from-literal={k}={v}")
    args += ["--dry-run=client", "-o=yaml"]
    manifest = kube_out(*args)
    if manifest:
        kube("apply", "--server-side", "--field-manager=sunbeam", "-f", "-", input=manifest)

# ── 1. Prerequisites ──────────────────────────────────────────────────────────

def check_prerequisites():
    """Die early with a brew hint if any required CLI tool is missing."""
    step("Checking prerequisites...")
    missing = [t for t in REQUIRED_TOOLS if not shutil.which(t)]
    if missing:
        die(f"missing tools: {', '.join(missing)}\nInstall: brew install {' '.join(missing)}")
    ok("All tools present.")
# ── 2. Lima VM ────────────────────────────────────────────────────────────────

def ensure_lima_vm():
    """Bring the Lima VM to the Running state: create if absent, start if stopped."""
    step("Lima VM...")
    state = _lima_status()
    if state == "Running":
        ok("Already running.")
        return
    if state == "none":
        # Fresh machine: create the VM with Rosetta so amd64 binaries run on arm64.
        ok("Creating 'sunbeam' (k3s 6 CPU / 12 GB / 60 GB)...")
        run([
            "limactl", "start", "--name=sunbeam", "template:k3s",
            "--memory=12", "--cpus=6", "--disk=60",
            "--vm-type=vz", "--mount-type=virtiofs", "--rosetta",
        ])
        return
    # Exists but not running (Stopped, Broken, ...): just start it.
    ok(f"Starting (current status: {state})...")
    run(["limactl", "start", LIMA_VM])


def _lima_status():
    """Return the Lima VM status, handling both JSON-array and NDJSON output."""
    raw = capture_out(["limactl", "list", "--json"])
    if not raw:
        return "none"
    try:
        decoded = json.loads(raw)
        entries = decoded if isinstance(decoded, list) else [decoded]
    except json.JSONDecodeError:
        # Newer limactl emits one JSON object per line (NDJSON).
        entries = []
        for chunk in raw.splitlines():
            chunk = chunk.strip()
            if not chunk:
                continue
            try:
                entries.append(json.loads(chunk))
            except json.JSONDecodeError:
                pass
    for entry in entries:
        if entry.get("name") == LIMA_VM:
            return entry.get("status", "unknown")
    return "none"
# ── 3. Kubeconfig ─────────────────────────────────────────────────────────────

def merge_kubeconfig():
    """Extract the guest kubeconfig's certs and register the 'sunbeam' context locally.

    Reads the Lima-copied kubeconfig, decodes the embedded CA/client cert/key
    with yq + base64, and re-embeds them via `kubectl config set-*` so the host
    kubeconfig gains a self-contained 'sunbeam' context pointing at 127.0.0.1:6443.
    """
    step("Merging kubeconfig...")
    lima_kube = Path.home() / f".lima/{LIMA_VM}/copied-from-guest/kubeconfig.yaml"
    if not lima_kube.exists():
        die(f"Lima kubeconfig not found: {lima_kube}")
    tmp = Path("/tmp/sunbeam-kube")
    tmp.mkdir(exist_ok=True)
    try:
        # Decode each base64-embedded credential into a temp file for kubectl.
        for query, filename in [
            (".clusters[0].cluster.certificate-authority-data", "ca.crt"),
            (".users[0].user.client-certificate-data", "client.crt"),
            (".users[0].user.client-key-data", "client.key"),
        ]:
            b64 = capture_out(["yq", query, str(lima_kube)])
            (tmp / filename).write_bytes(base64.b64decode(b64))
        # --embed-certs=true copies the files into the kubeconfig, so the temp
        # dir can be deleted immediately afterwards.
        run(["kubectl", "config", "set-cluster", LIMA_VM,
             "--server=https://127.0.0.1:6443",
             f"--certificate-authority={tmp}/ca.crt", "--embed-certs=true"])
        run(["kubectl", "config", "set-credentials", f"{LIMA_VM}-admin",
             f"--client-certificate={tmp}/client.crt",
             f"--client-key={tmp}/client.key", "--embed-certs=true"])
        run(["kubectl", "config", "set-context", LIMA_VM,
             f"--cluster={LIMA_VM}", f"--user={LIMA_VM}-admin"])
    finally:
        shutil.rmtree(tmp, ignore_errors=True)
    ok("Context 'sunbeam' ready.")

# ── 4. Traefik ────────────────────────────────────────────────────────────────

def disable_traefik():
    """Remove k3s's bundled Traefik (replaced by Pingora in this stack).

    Deletes the HelmChart resources and removes the k3s auto-deploy manifest
    inside the VM so Traefik is not re-installed on the next k3s restart.
    """
    step("Traefik...")
    if kube_ok("get", "helmchart", "traefik", "-n", "kube-system"):
        ok("Removing (replaced by Pingora)...")
        kube("delete", "helmchart", "traefik", "traefik-crd", "-n", "kube-system", check=False)
        # Best-effort: drop the auto-deploy manifest so k3s doesn't resurrect it.
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "rm", "-f",
             "/var/lib/rancher/k3s/server/manifests/traefik.yaml"],
            capture_output=True,
        )
    ok("Done.")
# ── 5. cert-manager ───────────────────────────────────────────────────────────

def ensure_cert_manager():
    """Install cert-manager v1.17.0 once; presence of its namespace means installed."""
    step("cert-manager...")
    if ns_exists("cert-manager"):
        ok("Already installed.")
        return
    ok("Installing...")
    kube("apply", "-f",
         "https://github.com/cert-manager/cert-manager/releases/download/v1.17.0/cert-manager.yaml")
    for dep in ["cert-manager", "cert-manager-webhook", "cert-manager-cainjector"]:
        kube("rollout", "status", f"deployment/{dep}", "-n", "cert-manager", "--timeout=120s")
    ok("Installed.")

# ── 6. Linkerd ────────────────────────────────────────────────────────────────

def ensure_linkerd():
    """Install Gateway API CRDs, Linkerd CRDs, and the control plane once."""
    step("Linkerd...")
    if ns_exists("linkerd"):
        ok("Already installed.")
        return
    ok("Installing Gateway API CRDs...")
    kube("apply", "--server-side", "-f",
         "https://github.com/kubernetes-sigs/gateway-api/releases/download/v1.4.0/standard-install.yaml")
    ok("Installing Linkerd CRDs...")
    crds = capture_out(["linkerd", "install", "--crds"])
    kube_apply(crds)
    ok("Installing Linkerd control plane...")
    cp = capture_out(["linkerd", "install"])
    kube_apply(cp)
    for dep in ["linkerd-identity", "linkerd-destination", "linkerd-proxy-injector"]:
        kube("rollout", "status", f"deployment/{dep}", "-n", "linkerd", "--timeout=120s")
    ok("Installed.")

# ── 7. TLS certificate ────────────────────────────────────────────────────────

def get_lima_ip():
    """Return the VM's IPv4 on the shared network (eth1), or die with a clear error.

    Primary source is `ip -4 addr show eth1`; falls back to `hostname -I`.
    Fix: the fallback previously indexed `.split()[0]` unconditionally, raising
    an opaque IndexError when the VM was down or unreachable.
    """
    raw = capture_out(["limactl", "shell", LIMA_VM, "ip", "-4", "addr", "show", "eth1"])
    for line in raw.splitlines():
        if "inet " in line:
            # "inet 192.168.105.3/24 ..." → "192.168.105.3"
            return line.strip().split()[1].split("/")[0]
    fallback = capture_out(["limactl", "shell", LIMA_VM, "hostname", "-I"]).split()
    if not fallback:
        die(f"could not determine IP of Lima VM '{LIMA_VM}' (is it running?)")
    return fallback[0]

def ensure_tls_cert():
    """Generate (once) a mkcert wildcard cert for <vm-ip>.sslip.io; return the domain."""
    step("TLS certificate...")
    ip = get_lima_ip()
    domain = f"{ip}.sslip.io"
    cert = SECRETS_DIR / "tls.crt"
    if cert.exists():
        ok(f"Cert exists. Domain: {domain}")
        return domain
    ok(f"Generating wildcard cert for *.{domain}...")
    SECRETS_DIR.mkdir(parents=True, exist_ok=True)
    run(["mkcert", f"*.{domain}"], cwd=SECRETS_DIR)
    # Normalize mkcert's output filenames to tls.crt / tls.key.
    for src, dst in [
        (f"_wildcard.{domain}.pem", "tls.crt"),
        (f"_wildcard.{domain}-key.pem", "tls.key"),
    ]:
        (SECRETS_DIR / src).rename(SECRETS_DIR / dst)
    ok(f"Cert generated. Domain: {domain}")
    return domain

# ── 8. TLS secret ─────────────────────────────────────────────────────────────

def ensure_tls_secret(domain):
    """Publish the wildcard cert as TLS secret 'pingora-tls' in the ingress namespace."""
    step("TLS secret...")
    ensure_ns("ingress")
    manifest = kube_out(
        "create", "secret", "tls", "pingora-tls",
        f"--cert={SECRETS_DIR}/tls.crt", f"--key={SECRETS_DIR}/tls.key",
        "-n", "ingress", "--dry-run=client", "-o=yaml",
    )
    if manifest:
        kube_apply(manifest)
    ok("Done.")

# ── 9. Lima VM registry trust + k3s config ────────────────────────────────────

def setup_lima_vm_registry(domain):
    """Install mkcert root CA in the Lima VM and configure k3s to auth with Gitea.

    Restarts k3s if either configuration changes so pods don't fight TLS errors
    or get unauthenticated pulls on the first deploy.
    """
    step("Configuring Lima VM registry trust...")
    changed = False

    # Install mkcert root CA so containerd trusts our wildcard TLS cert
    caroot = capture_out(["mkcert", "-CAROOT"])
    if caroot:
        ca_pem = Path(caroot) / "rootCA.pem"
        if ca_pem.exists():
            already = subprocess.run(
                ["limactl", "shell", LIMA_VM, "test", "-f",
                 "/usr/local/share/ca-certificates/mkcert-root.crt"],
                capture_output=True,
            ).returncode == 0
            if not already:
                run(["limactl", "copy", str(ca_pem), f"{LIMA_VM}:/tmp/mkcert-root.pem"])
                run(["limactl", "shell", LIMA_VM, "sudo", "cp", "/tmp/mkcert-root.pem",
                     "/usr/local/share/ca-certificates/mkcert-root.crt"])
                run(["limactl", "shell", LIMA_VM, "sudo", "update-ca-certificates"])
                ok("mkcert CA installed in VM.")
                changed = True
            else:
                ok("mkcert CA already installed.")

    # Write k3s registries.yaml (auth for Gitea container registry)
    registry_host = f"src.{domain}"
    want = (
        f'configs:\n'
        f'  "{registry_host}":\n'
        f'    auth:\n'
        f'      username: "{GITEA_ADMIN_USER}"\n'
        f'      password: "{GITEA_ADMIN_PASS}"\n'
    )
    existing = capture_out(["limactl", "shell", LIMA_VM, "sudo", "cat",
                            "/etc/rancher/k3s/registries.yaml"])
    if existing.strip() != want.strip():
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "tee", "/etc/rancher/k3s/registries.yaml"],
            input=want, text=True, capture_output=True,
        )
        ok(f"Registry config written for {registry_host}.")
        changed = True
    else:
        ok("Registry config up to date.")

    if changed:
        ok("Restarting k3s to apply changes...")
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "systemctl", "restart", "k3s"],
            capture_output=True,
        )
        # Wait for API server to come back
        for _ in range(40):
            if kube_ok("get", "nodes"):
                break
            time.sleep(3)
        # Extra settle time — pods take a moment to start terminating/restarting
        time.sleep(15)
        ok("k3s restarted.")

# ── 10. Apply manifests ───────────────────────────────────────────────────────
# Namespaces owned and cleaned up by this script.
MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "ory", "storage"]

def pre_apply_cleanup():
    """Delete immutable resources that must be re-created on each apply."""
    ok("Cleaning up immutable Jobs and test Pods...")
    for ns in MANAGED_NS:
        # Jobs have immutable specs — drop them wholesale before re-applying.
        kube("delete", "jobs", "--all", "-n", ns, "--ignore-not-found", check=False)
        # Helm-style test pods linger after completion; only delete pods whose
        # names match the test-suffix convention and that are no longer Running.
        pods_out = kube_out("get", "pods", "-n", ns,
                            "--field-selector=status.phase!=Running",
                            "-o=jsonpath={.items[*].metadata.name}")
        for pod in pods_out.split():
            if pod.endswith(("-test-connection", "-server-test", "-test")):
                kube("delete", "pod", pod, "-n", ns, "--ignore-not-found", check=False)

def apply_manifests(domain):
    """Render the local kustomize overlay and server-side apply it.

    The overlay uses DOMAIN_SUFFIX as a placeholder that is substituted with
    the live sslip.io domain before applying. The 'annotations: null' scrub
    works around helm/kustomize output that server-side apply rejects.
    """
    step(f"Applying manifests (domain: {domain})...")
    pre_apply_cleanup()
    r = run(
        ["kustomize", "build", "--enable-helm", "overlays/local/"],
        capture=True, cwd=REPO_ROOT,
    )
    manifests = r.stdout.replace("DOMAIN_SUFFIX", domain)
    manifests = manifests.replace("\n annotations: null", "")
    kube("apply", "--server-side", "--force-conflicts", "-f", "-", input=manifests)
    ok("Applied.")
# ── 11. Gitea bootstrap ───────────────────────────────────────────────────────

def bootstrap_gitea(domain):
    """Ensure Gitea admin has a known password and create the studio/internal orgs."""
    step("Bootstrapping Gitea...")

    # Wait for a Running + Ready Gitea pod (more reliable than rollout status after a k3s restart)
    pod = ""
    for _ in range(60):
        candidate = kube_out(
            "-n", "devtools", "get", "pods",
            "-l=app.kubernetes.io/name=gitea",
            "--field-selector=status.phase=Running",
            "-o=jsonpath={.items[0].metadata.name}",
        )
        if candidate:
            ready = kube_out("-n", "devtools", "get", "pod", candidate,
                             "-o=jsonpath={.status.containerStatuses[0].ready}")
            if ready == "true":
                pod = candidate
                break
        time.sleep(3)
    if not pod:
        warn("Gitea pod not ready after 3 min — skipping bootstrap.")
        return

    def gitea_exec(*args):
        # Run a command inside the gitea container; output captured, never raises.
        return subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "devtools", "exec", pod, "-c", "gitea", "--"] + list(args),
            capture_output=True, text=True,
        )

    # Ensure admin has our known password
    r = gitea_exec("gitea", "admin", "user", "change-password",
                   "--username", GITEA_ADMIN_USER, "--password", GITEA_ADMIN_PASS)
    if r.returncode == 0 or "password" in (r.stdout + r.stderr).lower():
        ok(f"Admin '{GITEA_ADMIN_USER}' password set.")
    else:
        warn(f"change-password: {r.stderr.strip()}")

    # Clear must_change_password via Postgres — Gitea enforces this flag at the API
    # level for ALL auth methods (including API tokens), so we must clear it in the DB.
    pg_pod = kube_out("-n", "data", "get", "pods",
                      "-l=cnpg.io/cluster=postgres,role=primary",
                      "-o=jsonpath={.items[0].metadata.name}")
    if pg_pod:
        kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
             "psql", "-U", "postgres", "-d", "gitea_db", "-c",
             f'UPDATE "user" SET must_change_password = false'
             f" WHERE lower_name = '{GITEA_ADMIN_USER.lower()}';",
             check=False)
        ok("Cleared must-change-password flag.")
    else:
        warn("Postgres pod not found — must-change-password may block API calls.")

    def api(method, path, data=None):
        # Call the Gitea REST API from inside the pod (basic auth as admin);
        # returns the decoded JSON body or {} on parse failure.
        args = [
            "curl", "-s", "-X", method,
            f"http://localhost:3000/api/v1{path}",
            "-H", "Content-Type: application/json",
            "-u", f"{GITEA_ADMIN_USER}:{GITEA_ADMIN_PASS}",
        ]
        if data:
            args += ["-d", json.dumps(data)]
        r = gitea_exec(*args)
        try:
            return json.loads(r.stdout)
        except json.JSONDecodeError:
            return {}

    # Idempotent org creation: "already exists" responses are treated as success.
    for org_name, visibility, desc in [
        ("studio", "public", "Public source code"),
        ("internal", "private", "Internal tools and services"),
    ]:
        result = api("POST", "/orgs", {
            "username": org_name,
            "visibility": visibility,
            "description": desc,
        })
        if "id" in result:
            ok(f"Created org '{org_name}'.")
        elif "already" in result.get("message", "").lower():
            ok(f"Org '{org_name}' already exists.")
        else:
            warn(f"Org '{org_name}': {result.get('message', result)}")

    ok(f"Gitea ready — https://src.{domain} (studio / internal orgs)")

# ── 12. Mirror amd64-only images to Gitea registry ────────────────────────────
#
# Images like lasuite/people-backend only ship linux/amd64. Our Lima VM is arm64.
# Strategy: pull the amd64 manifest by digest, create a patched OCI index that
# adds an arm64 entry pointing to the same manifest (Rosetta runs it fine), then
# push to our Gitea registry. k8s manifests reference src.DOMAIN_SUFFIX/studio/…;
# k3s registries.yaml handles auth so no imagePullSecrets are needed.
#
# Runs inside the Lima VM via `limactl shell … sudo python3 -c …`.
# Stdlib-only — no pip install required.
# NOTE: this is a runtime string executed inside the VM, not host code.
# TARGETS/USER/PASS are prepended by mirror_amd64_images() before execution.
_MIRROR_SCRIPT_BODY = r'''
import json, hashlib, io, tarfile, os, subprocess, urllib.request

CONTENT_STORE = (
    "/var/lib/rancher/k3s/agent/containerd"
    "/io.containerd.content.v1.content/blobs/sha256"
)

def blob_path(h):
    return os.path.join(CONTENT_STORE, h)

def blob_exists(h):
    return os.path.exists(blob_path(h))

def read_blob(h):
    with open(blob_path(h), "rb") as f:
        return f.read()

def add_tar_entry(tar, name, data):
    info = tarfile.TarInfo(name=name)
    info.size = len(data)
    tar.addfile(info, io.BytesIO(data))

def get_image_digest(ref):
    r = subprocess.run(
        ["ctr", "-n", "k8s.io", "images", "ls", "name==" + ref],
        capture_output=True, text=True,
    )
    for line in r.stdout.splitlines():
        if ref in line:
            for part in line.split():
                if part.startswith("sha256:"):
                    return part[7:]
    return None

def fetch_index_from_registry(repo, tag):
    url = (
        "https://auth.docker.io/token"
        f"?service=registry.docker.io&scope=repository:{repo}:pull"
    )
    with urllib.request.urlopen(url) as resp:
        token = json.loads(resp.read())["token"]
    accept = ",".join([
        "application/vnd.oci.image.index.v1+json",
        "application/vnd.docker.distribution.manifest.list.v2+json",
    ])
    req = urllib.request.Request(
        f"https://registry-1.docker.io/v2/{repo}/manifests/{tag}",
        headers={"Authorization": f"Bearer {token}", "Accept": accept},
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

def make_oci_tar(ref, new_index_bytes, amd64_manifest_bytes):
    ix_hex = hashlib.sha256(new_index_bytes).hexdigest()
    amd64_hex = json.loads(new_index_bytes)["manifests"][0]["digest"].replace("sha256:", "")
    layout = json.dumps({"imageLayoutVersion": "1.0.0"}).encode()
    top = json.dumps({
        "schemaVersion": 2,
        "mediaType": "application/vnd.oci.image.index.v1+json",
        "manifests": [{
            "mediaType": "application/vnd.oci.image.index.v1+json",
            "digest": f"sha256:{ix_hex}",
            "size": len(new_index_bytes),
            "annotations": {"org.opencontainers.image.ref.name": ref},
        }],
    }, separators=(",", ":")).encode()
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:") as tar:
        add_tar_entry(tar, "oci-layout", layout)
        add_tar_entry(tar, "index.json", top)
        add_tar_entry(tar, f"blobs/sha256/{ix_hex}", new_index_bytes)
        add_tar_entry(tar, f"blobs/sha256/{amd64_hex}", amd64_manifest_bytes)
    return buf.getvalue()

def import_ref(ref, tar_bytes):
    subprocess.run(["ctr", "-n", "k8s.io", "images", "rm", ref], capture_output=True)
    r = subprocess.run(
        ["ctr", "-n", "k8s.io", "images", "import", "--all-platforms", "-"],
        input=tar_bytes, capture_output=True,
    )
    if r.returncode:
        print(f" import failed: {r.stderr.decode()}")
        return False
    subprocess.run(
        ["ctr", "-n", "k8s.io", "images", "label", ref, "io.cri-containerd.image=managed"],
        capture_output=True,
    )
    return True

def process(src, tgt, user, pwd):
    print(f" {src}")
    # Pull by tag — may fail on arm64-only images but still puts the index blob in the store
    subprocess.run(["ctr", "-n", "k8s.io", "images", "pull", src], capture_output=True)
    ix_hex = get_image_digest(src)
    if ix_hex and blob_exists(ix_hex):
        index = json.loads(read_blob(ix_hex))
    else:
        print(" index not in content store — fetching from docker.io...")
        no_prefix = src.replace("docker.io/", "")
        parts = no_prefix.split(":", 1)
        repo, tag = parts[0], (parts[1] if len(parts) > 1 else "latest")
        index = fetch_index_from_registry(repo, tag)
    amd64 = next(
        (m for m in index.get("manifests", [])
         if m.get("platform", {}).get("architecture") == "amd64"
         and m.get("platform", {}).get("os") == "linux"),
        None,
    )
    if not amd64:
        print(" skip: no linux/amd64 entry in index")
        return
    amd64_hex = amd64["digest"].replace("sha256:", "")
    if not blob_exists(amd64_hex):
        print(" pulling amd64 manifest + layers by digest...")
        repo_base = src.rsplit(":", 1)[0]
        subprocess.run(
            ["ctr", "-n", "k8s.io", "images", "pull", f"{repo_base}@sha256:{amd64_hex}"],
            capture_output=True,
        )
    if not blob_exists(amd64_hex):
        print(" failed: amd64 manifest blob missing after pull")
        return
    amd64_bytes = read_blob(amd64_hex)
    # Patched index: keep amd64 + add arm64 alias pointing at same manifest
    arm64 = {
        "mediaType": amd64["mediaType"],
        "digest": amd64["digest"],
        "size": amd64["size"],
        "platform": {"architecture": "arm64", "os": "linux"},
    }
    new_index = dict(index)
    new_index["manifests"] = [amd64, arm64]
    new_index_bytes = json.dumps(new_index, separators=(",", ":")).encode()
    # Import with Gitea target name
    if not import_ref(tgt, make_oci_tar(tgt, new_index_bytes, amd64_bytes)):
        return
    # Also patch the original source ref so pods still using docker.io name work
    import_ref(src, make_oci_tar(src, new_index_bytes, amd64_bytes))
    # Push to Gitea registry
    print(f" pushing to registry...")
    r = subprocess.run(
        ["ctr", "-n", "k8s.io", "images", "push", "--user", f"{user}:{pwd}", tgt],
        capture_output=True, text=True,
    )
    status = "OK" if r.returncode == 0 else f"PUSH FAILED: {r.stderr.strip()}"
    print(f" {status}")

for _src, _tgt in TARGETS:
    process(_src, _tgt, USER, PASS)
'''

def mirror_amd64_images(domain):
    """Patch amd64-only images with an arm64 alias and push them to our Gitea registry."""
    step("Mirroring amd64-only images to Gitea registry...")
    registry = f"src.{domain}"
    targets = [
        (src, f"{registry}/{org}/{repo}:{tag}")
        for src, org, repo, tag in AMD64_ONLY_IMAGES
    ]
    # Prepend the run-specific constants the VM-side script expects.
    header = (
        f"TARGETS = {repr(targets)}\n"
        f"USER = {repr(GITEA_ADMIN_USER)}\n"
        f"PASS = {repr(GITEA_ADMIN_PASS)}\n"
    )
    script = header + _MIRROR_SCRIPT_BODY
    run(["limactl", "shell", LIMA_VM, "sudo", "python3", "-c", script])

    # Delete any pods stuck in image-pull error states
    ok("Clearing image-pull-error pods...")
    error_reasons = {"ImagePullBackOff", "ErrImagePull", "ErrImageNeverPull"}
    for ns in MANAGED_NS:
        # name:waiting-reason per line; reason is empty for healthy pods.
        pods_raw = kube_out(
            "-n", ns, "get", "pods",
            "-o=jsonpath={range .items[*]}"
            "{.metadata.name}:{.status.containerStatuses[0].state.waiting.reason}\\n"
            "{end}",
        )
        for line in pods_raw.splitlines():
            if not line:
                continue
            parts = line.split(":", 1)
            if len(parts) == 2 and parts[1] in error_reasons:
                kube("delete", "pod", parts[0], "-n", ns, "--ignore-not-found", check=False)
    ok("Done.")

# ── 13. Seed secrets ──────────────────────────────────────────────────────────

def seed_secrets():
    """Create Postgres roles/databases, K8s secrets, and seed OpenBao KV."""
    step("Seeding secrets...")

    ok("Waiting for postgres cluster...")
    pg_pod = ""
    for _ in range(60):
        phase = kube_out("-n", "data", "get", "cluster", "postgres",
                         "-o=jsonpath={.status.phase}")
        if phase == "Cluster in healthy state":
            pg_pod = kube_out("-n", "data", "get", "pods",
                              "-l=cnpg.io/cluster=postgres,role=primary",
                              "-o=jsonpath={.items[0].metadata.name}")
            ok(f"Postgres ready ({pg_pod}).")
            break
        time.sleep(5)
    else:
        warn("Postgres not ready after 5 min — continuing anyway.")

    if pg_pod:
        ok("Ensuring postgres roles and databases...")
        db_map = {
            "kratos": "kratos_db",
            "hydra": "hydra_db",
            "gitea": "gitea_db",
            "hive": "hive_db",
            "docs": "docs_db",
            "meet": "meet_db",
            "drive": "drive_db",
            "messages": "messages_db",
            "conversations": "conversations_db",
            "people": "people_db",
            "find": "find_db",
        }
        for user in PG_USERS:
            # Idempotent: create role only if missing, always reset password.
            ensure_sql = (
                f"DO $$ BEGIN "
                f"IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname='{user}') "
                f"THEN EXECUTE 'CREATE USER {user}'; END IF; END $$; "
                f"ALTER USER {user} WITH PASSWORD '{DB_PASSWORD}';"
            )
            kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
                 "psql", "-U", "postgres", "-c", ensure_sql, check=False)
            # CREATE DATABASE can't run in a transaction/DO block; failure when
            # the DB already exists is expected and ignored (check=False).
            db = db_map.get(user, f"{user}_db")
            kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
                 "psql", "-U", "postgres", "-c",
                 f"CREATE DATABASE {db} OWNER {user};", check=False)

    ok("Creating K8s secrets...")
    ensure_ns("ory")
    create_secret("ory", "hydra",
                  dsn=(f"postgresql://hydra:{DB_PASSWORD}@"
                       "postgres-rw.data.svc.cluster.local:5432/hydra_db?sslmode=disable"),
                  secretsSystem=HYDRA_SYSTEM_SECRET,
                  secretsCookie=HYDRA_COOKIE_SECRET,
                  **{"pairwise-salt": HYDRA_PAIRWISE_SALT},
                  )
    ensure_ns("devtools")
    create_secret("devtools", "gitea-db-credentials", password=DB_PASSWORD)
    create_secret("devtools", "gitea-s3-credentials",
                  **{"access-key": S3_ACCESS_KEY, "secret-key": S3_SECRET_KEY})
    ensure_ns("storage")
    create_secret("storage", "seaweedfs-s3-credentials",
                  S3_ACCESS_KEY=S3_ACCESS_KEY, S3_SECRET_KEY=S3_SECRET_KEY)
    ensure_ns("lasuite")
    create_secret("lasuite", "seaweedfs-s3-credentials",
                  S3_ACCESS_KEY=S3_ACCESS_KEY, S3_SECRET_KEY=S3_SECRET_KEY)
    create_secret("lasuite", "hive-db-url",
                  url=(f"postgresql://hive:{DB_PASSWORD}@"
                       "postgres-rw.data.svc.cluster.local:5432/hive_db"))
    create_secret("lasuite", "hive-oidc",
                  **{"client-id": "hive-local", "client-secret": "hive-local-secret"})
    create_secret("lasuite", "people-db-credentials", password=DB_PASSWORD)
    create_secret("lasuite", "people-django-secret", DJANGO_SECRET_KEY=PEOPLE_DJANGO_SECRET)
    ensure_ns("media")
    _seed_openbao()
    ok("All secrets seeded.")

def _seed_openbao():
    """Initialize/unseal OpenBao and seed its KV store with local-dev secrets.

    Keys are persisted in secret/openbao-keys so re-runs can unseal without
    re-initializing. A failed init (e.g. stale storage) resets the PVC so the
    next --seed run starts clean.
    """
    ob_pod = kube_out(
        "-n", "data", "get", "pods",
        "-l=app.kubernetes.io/name=openbao,component=server",
        "-o=jsonpath={.items[0].metadata.name}",
    )
    if not ob_pod:
        ok("OpenBao pod not found — skipping.")
        return
    ok(f"OpenBao ({ob_pod})...")
    kube("wait", "-n", "data", f"pod/{ob_pod}",
         "--for=jsonpath={.status.phase}=Running", "--timeout=120s", check=False)

    def bao(cmd):
        # Run a shell command inside the openbao container; returns stdout.
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao",
             "--", "sh", "-c", cmd],
            capture_output=True, text=True,
        )
        return r.stdout.strip()

    def bao_status():
        # `bao status` exits non-zero when sealed, hence the `|| echo '{}'`.
        out = bao("bao status -format=json 2>/dev/null || echo '{}'")
        try:
            return json.loads(out)
        except json.JSONDecodeError:
            return {}

    unseal_key = ""
    root_token = ""
    status = bao_status()
    already_initialized = status.get("initialized", False)
    if not already_initialized:
        # Status can lag after a restart — an existing keys secret also counts.
        existing_key = kube_out("-n", "data", "get", "secret", "openbao-keys",
                                "-o=jsonpath={.data.key}")
        already_initialized = bool(existing_key)

    if not already_initialized:
        ok("Initializing OpenBao...")
        init_json = bao("bao operator init -key-shares=1 -key-threshold=1 -format=json 2>/dev/null || echo '{}'")
        try:
            init = json.loads(init_json)
            unseal_key = init["unseal_keys_b64"][0]
            root_token = init["root_token"]
            create_secret("data", "openbao-keys",
                          key=unseal_key, **{"root-token": root_token})
            ok("Initialized — keys stored in secret/openbao-keys.")
        except (json.JSONDecodeError, KeyError):
            warn("Init failed — resetting OpenBao storage for local dev...")
            kube("delete", "pvc", "data-openbao-0", "-n", "data",
                 "--ignore-not-found", check=False)
            kube("delete", "pod", ob_pod, "-n", "data",
                 "--ignore-not-found", check=False)
            warn("OpenBao storage reset. Run --seed again after the pod restarts.")
            return
    else:
        ok("Already initialized.")
        existing_key = kube_out("-n", "data", "get", "secret", "openbao-keys",
                                "-o=jsonpath={.data.key}")
        if existing_key:
            unseal_key = base64.b64decode(existing_key).decode()
        root_token_enc = kube_out("-n", "data", "get", "secret", "openbao-keys",
                                  "-o=jsonpath={.data.root-token}")
        if root_token_enc:
            root_token = base64.b64decode(root_token_enc).decode()

    if bao_status().get("sealed", False) and unseal_key:
        ok("Unsealing...")
        bao(f"bao operator unseal '{unseal_key}' 2>/dev/null")

    if root_token:
        ok("Seeding KV...")
        pg_rw = "postgres-rw.data.svc.cluster.local:5432"
        bao(f"""
BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' sh -c '
bao secrets enable -path=secret -version=2 kv 2>/dev/null || true
bao kv put secret/postgres password="{DB_PASSWORD}"
bao kv put secret/hydra db-password="{DB_PASSWORD}" system-secret="{HYDRA_SYSTEM_SECRET}" cookie-secret="{HYDRA_COOKIE_SECRET}" pairwise-salt="{HYDRA_PAIRWISE_SALT}"
bao kv put secret/kratos db-password="{DB_PASSWORD}"
bao kv put secret/gitea db-password="{DB_PASSWORD}" s3-access-key="{S3_ACCESS_KEY}" s3-secret-key="{S3_SECRET_KEY}"
bao kv put secret/seaweedfs access-key="{S3_ACCESS_KEY}" secret-key="{S3_SECRET_KEY}"
bao kv put secret/hive db-url="postgresql://hive:{DB_PASSWORD}@{pg_rw}/hive_db" oidc-client-id="hive-local" oidc-client-secret="hive-local-secret"
bao kv put secret/livekit api-key="{LIVEKIT_API_KEY}" api-secret="{LIVEKIT_API_SECRET}"
bao kv put secret/people db-password="{DB_PASSWORD}" django-secret-key="{PEOPLE_DJANGO_SECRET}"
'
""")
# ── 14. Restart services ──────────────────────────────────────────────────────

def restart_services():
    """Trigger a rollout restart for every deployment that consumes seeded secrets."""
    step("Restarting services waiting for secrets...")
    for namespace, deployment in SERVICES_TO_RESTART:
        # Best-effort: a deployment that doesn't exist yet is not an error.
        kube("-n", namespace, "rollout", "restart", f"deployment/{deployment}", check=False)
    ok("Done.")

# ── 15. Wait for core ─────────────────────────────────────────────────────────

def wait_for_core():
    """Block (up to 120s each, best-effort) until the core auth/data deployments roll out."""
    step("Waiting for core services...")
    core = [("data", "valkey"), ("ory", "kratos"), ("ory", "hydra")]
    for namespace, deployment in core:
        kube("rollout", "status", f"deployment/{deployment}",
             "-n", namespace, "--timeout=120s", check=False)
    ok("Core services ready.")

# ── 16. Print URLs ────────────────────────────────────────────────────────────

def print_urls(domain):
    """Print the service URL summary plus OpenBao port-forward instructions."""
    rule = "─" * 60
    entries = [
        ("Auth", f"https://auth.{domain}/"),
        ("Docs", f"https://docs.{domain}/"),
        ("Meet", f"https://meet.{domain}/"),
        ("Drive", f"https://drive.{domain}/"),
        ("Chat", f"https://chat.{domain}/"),
        ("Mail", f"https://mail.{domain}/"),
        ("People", f"https://people.{domain}/"),
        ("Gitea", f"https://src.{domain}/ ({GITEA_ADMIN_USER} / {GITEA_ADMIN_PASS})"),
    ]
    print(f"\n{rule}")
    print(f" Stack is up. Domain: {domain}")
    print(rule)
    for label, link in entries:
        print(f" {label:<10} {link}")
    print()
    print(" OpenBao UI:")
    print(" kubectl --context=sunbeam -n data port-forward svc/openbao 8200:8200")
    print(" http://localhost:8200")
    token_cmd = "kubectl --context=sunbeam -n data get secret openbao-keys -o jsonpath='{.data.root-token}' | base64 -d"
    print(f" token: {token_cmd}")
    print(f"{rule}\n")
# ── 17. Status check ──────────────────────────────────────────────────────────

def status_check():
    """Print a concise pod health table grouped by namespace."""
    step("Pod health across all namespaces...")
    # Fetch all pods across managed namespaces in one call.
    # Fix: the selector previously read "metadata.namespace!= kube-system" —
    # the stray space made the compared value " kube-system", so kube-system
    # was never actually excluded server-side.
    raw = capture_out([
        "kubectl", *K8S_CTX, "get", "pods",
        "--field-selector=metadata.namespace!=kube-system",
        "-A", "--no-headers",
    ])
    # Filter to our namespaces only
    ns_set = set(MANAGED_NS)
    pods = []
    for line in raw.splitlines():
        cols = line.split()
        if len(cols) < 4:
            continue
        ns = cols[0]
        if ns not in ns_set:
            continue
        pods.append(cols)
    if not pods:
        warn("No pods found in managed namespaces.")
        return

    all_ok = True
    cur_ns = None
    icon_map = {"Running": "✓", "Completed": "✓", "Succeeded": "✓",
                "Pending": "○", "Failed": "✗", "Unknown": "?"}
    for cols in sorted(pods, key=lambda c: (c[0], c[1])):
        ns, name, ready, status = cols[0], cols[1], cols[2], cols[3]
        if ns != cur_ns:
            print(f"  {ns}:")
            cur_ns = ns
        icon = icon_map.get(status, "?")
        unhealthy = status not in ("Running", "Completed", "Succeeded")
        # A Running pod is still unhealthy when not all containers are ready (e.g. 1/2).
        if not unhealthy and "/" in ready:
            r, t = ready.split("/")
            unhealthy = r != t
        if unhealthy:
            all_ok = False
        print(f"    {icon} {name:<50} {ready:<6} {status}")
    print()
    if all_ok:
        ok("All pods healthy.")
    else:
        warn("Some pods are not ready.")

# ── Main ──────────────────────────────────────────────────────────────────────

def main():
    """CLI entry point: dispatch partial-run flags or run the full bring-up."""
    parser = argparse.ArgumentParser(description="Sunbeam local dev stack manager")
    parser.add_argument("--seed", action="store_true", help="Re-seed secrets only")
    parser.add_argument("--apply", action="store_true", help="Re-apply manifests + mirror images")
    parser.add_argument("--gitea", action="store_true", help="Bootstrap Gitea orgs + mirror images")
    parser.add_argument("--restart", action="store_true", help="Restart services only")
    parser.add_argument("--status", action="store_true", help="Show pod health across all namespaces")
    args = parser.parse_args()

    check_prerequisites()

    if args.status:
        status_check()
        return

    # Partial-run modes — run in logical order: apply → gitea → seed → restart.
    # All of them assume the VM is already up and always finish with a restart
    # so services pick up whatever changed.
    if args.apply or args.gitea or args.seed or args.restart:
        ip = get_lima_ip()
        domain = f"{ip}.sslip.io"
        if args.apply:
            setup_lima_vm_registry(domain)
            apply_manifests(domain)
            bootstrap_gitea(domain)
            mirror_amd64_images(domain)
        if args.gitea:
            setup_lima_vm_registry(domain)
            bootstrap_gitea(domain)
            mirror_amd64_images(domain)
        if args.seed:
            seed_secrets()
        restart_services()
        return

    # Full bring-up
    ensure_lima_vm()
    merge_kubeconfig()
    disable_traefik()
    ensure_cert_manager()
    ensure_linkerd()
    domain = ensure_tls_cert()
    ensure_tls_secret(domain)
    setup_lima_vm_registry(domain)  # mkcert CA + registries.yaml + k3s restart if needed
    apply_manifests(domain)
    bootstrap_gitea(domain)         # create studio/internal orgs
    mirror_amd64_images(domain)     # patch + push amd64-only images
    seed_secrets()
    restart_services()
    wait_for_core()
    print_urls(domain)

if __name__ == "__main__":
    main()