Files
sbbb/scripts/sunbeam.py
Sienna Meridian Satterwhite 6110c33b48 scripts: rename local-up.py → sunbeam.py; add Gitea bootstrap + registry mirroring
- Rename local-up.py → sunbeam.py; update docstring and argparser description
- Add setup_lima_vm_registry(): installs mkcert root CA into Lima VM system trust
  store and writes k3s registries.yaml (Gitea auth); restarts k3s if changed
- Add bootstrap_gitea(): waits for pod Running+Ready, sets admin password via
  gitea CLI, clears must_change_password via Postgres UPDATE (Gitea enforces
  this flag at API level regardless of auth method), creates studio/internal orgs
- Add mirror_amd64_images(): pulls amd64-only images, patches OCI index with an
  arm64 alias pointing at the same manifest (Rosetta runs it transparently),
  imports patched image into k3s containerd, pushes to Gitea container registry
- Add AMD64_ONLY_IMAGES list (currently: lasuite/people-{backend,frontend})
- Add --gitea partial flag: registry trust + Gitea bootstrap + mirror
- Add --status flag: pod health table across all managed namespaces
- Fix create_secret to use --field-manager=sunbeam so kustomize apply (manager
  kubectl) never wipes data fields written by the seed script
- Add people-frontend to SERVICES_TO_RESTART (was missing)
2026-03-01 21:04:39 +00:00

1028 lines
42 KiB
Python
Executable File

#!/usr/bin/env python3
"""
sunbeam.py — Sunbeam local dev stack lifecycle manager.
Idempotent: safe to run from any state (fresh Mac, existing VM, partial deploy).
Consolidates local-up.sh + local-seed-secrets.sh into one place.
Usage:
./scripts/sunbeam.py # full stack bring-up
./scripts/sunbeam.py --apply # re-apply manifests + mirror images
./scripts/sunbeam.py --seed # re-seed secrets only
./scripts/sunbeam.py --gitea # bootstrap Gitea orgs + mirror amd64 images
./scripts/sunbeam.py --restart # restart services only
./scripts/sunbeam.py --status # show pod health across all namespaces
Requires: limactl mkcert kubectl kustomize linkerd jq yq
"""
import argparse
import base64
import json
import os
import shutil
import subprocess
import sys
import time
from pathlib import Path
# ── Paths ─────────────────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).parent.resolve()
REPO_ROOT = SCRIPT_DIR.parent
# Local-only secret material (TLS cert/key) lives here; never committed.
SECRETS_DIR = REPO_ROOT / "secrets" / "local"
# ── Config ────────────────────────────────────────────────────────────────────
LIMA_VM = "sunbeam"
# Spliced into every kubectl invocation so we never touch another context.
K8S_CTX = ["--context=sunbeam"]
# Deterministic local-dev credentials (not for production)
DB_PASSWORD = "localdev"
S3_ACCESS_KEY = "minioadmin"
S3_SECRET_KEY = "minioadmin"
HYDRA_SYSTEM_SECRET = "local-hydra-system-secret-at-least-16"
HYDRA_COOKIE_SECRET = "local-hydra-cookie-secret-at-least-16"
HYDRA_PAIRWISE_SALT = "local-hydra-pairwise-salt-value-1"
LIVEKIT_API_KEY = "devkey"
LIVEKIT_API_SECRET = "secret-placeholder"
PEOPLE_DJANGO_SECRET = "local-dev-people-django-secret-key-not-for-production"
# Gitea admin (deterministic for local dev; also set in gitea-values.yaml)
GITEA_ADMIN_USER = "gitea_admin"
GITEA_ADMIN_PASS = "localdev"
GITEA_ADMIN_EMAIL = "gitea@local.domain"
# Images that only ship linux/amd64 builds — patched + mirrored to our Gitea registry.
# Rosetta runs the amd64 binaries on arm64, but the CRI refuses to pull arm64-absent images.
# Format: (source_ref, gitea_org, gitea_repo, tag)
AMD64_ONLY_IMAGES = [
("docker.io/lasuite/people-backend:latest", "studio", "people-backend", "latest"),
("docker.io/lasuite/people-frontend:latest", "studio", "people-frontend", "latest"),
]
# CLI tools that must be on PATH before anything else runs (see check_prerequisites).
REQUIRED_TOOLS = ["limactl", "mkcert", "kubectl", "kustomize", "linkerd", "jq", "yq"]
# Postgres roles created/updated by seed_secrets (one role per service).
PG_USERS = [
"kratos", "hydra", "gitea", "hive",
"docs", "meet", "drive", "messages", "conversations",
"people", "find",
]
# (namespace, deployment) pairs restarted after secrets are (re)seeded so pods
# pick up fresh Secret values.
SERVICES_TO_RESTART = [
("ory", "hydra"),
("ory", "kratos"),
("ory", "login-ui"),
("devtools", "gitea"),
("storage", "seaweedfs-filer"),
("lasuite", "hive"),
("lasuite", "people-backend"),
("lasuite", "people-frontend"),
("lasuite", "people-celery-worker"),
("lasuite", "people-celery-beat"),
("media", "livekit-server"),
]
# ── Output ────────────────────────────────────────────────────────────────────
def step(msg):
    """Print a section header for a major stage (flushed immediately)."""
    sys.stdout.write(f"\n==> {msg}\n")
    sys.stdout.flush()


def ok(msg):
    """Print an indented progress/success line (flushed immediately)."""
    sys.stdout.write(f" {msg}\n")
    sys.stdout.flush()


def warn(msg):
    """Print a non-fatal warning to stderr (flushed immediately)."""
    sys.stderr.write(f" WARN: {msg}\n")
    sys.stderr.flush()


def die(msg):
    """Print an error to stderr and abort with exit status 1."""
    sys.stderr.write(f"\nERROR: {msg}\n")
    raise SystemExit(1)
# ── Subprocess helpers ────────────────────────────────────────────────────────
def run(cmd, *, check=True, input=None, capture=False, cwd=None):
    """Run *cmd*; text mode unless *input* is bytes. Returns CompletedProcess."""
    return subprocess.run(
        cmd,
        check=check,
        text=not isinstance(input, bytes),
        input=input,
        capture_output=capture,
        cwd=cwd,
    )


def capture_out(cmd, *, default=""):
    """Return stripped stdout of *cmd*, or *default* on nonzero exit."""
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        return default
    return proc.stdout.strip()


def succeeds(cmd):
    """True when *cmd* exits 0 (all output suppressed)."""
    return subprocess.run(cmd, capture_output=True).returncode == 0
# ── kubectl wrappers ──────────────────────────────────────────────────────────
def kube(*args, input=None, check=True):
    """kubectl against the sunbeam context (checked by default)."""
    return run(["kubectl", *K8S_CTX, *args], input=input, check=check)


def kube_out(*args):
    """kubectl, returning stripped stdout ('' on failure)."""
    return capture_out(["kubectl", *K8S_CTX, *args])


def kube_ok(*args):
    """True when the kubectl invocation exits 0."""
    return succeeds(["kubectl", *K8S_CTX, *args])


def kube_apply(manifest, *, server_side=True):
    """Apply a YAML manifest piped on stdin (server-side by default)."""
    cmd = ["apply", "-f", "-"]
    if server_side:
        cmd.extend(["--server-side", "--force-conflicts"])
    kube(*cmd, input=manifest)


def ns_exists(ns):
    """True when the namespace already exists."""
    return kube_ok("get", "namespace", ns)


def ensure_ns(ns):
    """Create the namespace idempotently via client dry-run + apply."""
    rendered = kube_out("create", "namespace", ns, "--dry-run=client", "-o=yaml")
    if rendered:
        kube_apply(rendered)
def create_secret(ns, name, **literals):
    """Idempotently create or update a generic Secret from literal key/values.

    Applies with --field-manager=sunbeam so a later kustomize apply (field
    manager "kubectl") cannot clobber the data fields written here, even when
    the kustomize output ships a placeholder Secret of the same name.
    """
    cmd = ["create", "secret", "generic", name, f"-n={ns}"]
    cmd.extend(f"--from-literal={k}={v}" for k, v in literals.items())
    cmd += ["--dry-run=client", "-o=yaml"]
    rendered = kube_out(*cmd)
    if not rendered:
        return
    kube("apply", "--server-side", "--field-manager=sunbeam", "-f", "-", input=rendered)
# ── 1. Prerequisites ──────────────────────────────────────────────────────────
def check_prerequisites():
    """Abort with a brew-install hint if any required CLI tool is missing."""
    step("Checking prerequisites...")
    absent = [tool for tool in REQUIRED_TOOLS if shutil.which(tool) is None]
    if absent:
        die(f"missing tools: {', '.join(absent)}\nInstall: brew install {' '.join(absent)}")
    ok("All tools present.")
# ── 2. Lima VM ────────────────────────────────────────────────────────────────
def ensure_lima_vm():
    """Create or start the 'sunbeam' Lima VM as needed (no-op when running)."""
    step("Lima VM...")
    state = _lima_status()
    if state == "Running":
        ok("Already running.")
        return
    if state == "none":
        ok("Creating 'sunbeam' (k3s 6 CPU / 12 GB / 60 GB)...")
        run(["limactl", "start",
             "--name=sunbeam", "template:k3s",
             "--memory=12", "--cpus=6", "--disk=60",
             "--vm-type=vz", "--mount-type=virtiofs",
             "--rosetta"])
        return
    # Exists but stopped/paused — just start it.
    ok(f"Starting (current status: {state})...")
    run(["limactl", "start", LIMA_VM])
def _lima_status():
    """Return the Lima VM status, handling both JSON-array and NDJSON output."""
    raw = capture_out(["limactl", "list", "--json"])
    if not raw:
        return "none"
    try:
        parsed = json.loads(raw)
        records = parsed if isinstance(parsed, list) else [parsed]
    except json.JSONDecodeError:
        # Newer limactl emits NDJSON: one JSON object per line.
        records = []
        for chunk in raw.splitlines():
            chunk = chunk.strip()
            if not chunk:
                continue
            try:
                records.append(json.loads(chunk))
            except json.JSONDecodeError:
                pass
    for rec in records:
        if rec.get("name") == LIMA_VM:
            return rec.get("status", "unknown")
    return "none"
# ── 3. Kubeconfig ─────────────────────────────────────────────────────────────
def merge_kubeconfig():
    """Embed the Lima guest's k3s credentials into the host kubeconfig.

    Extracts the base64 CA / client cert / client key from the kubeconfig Lima
    copies out of the guest (using yq), writes them to a temp dir, then
    registers cluster/user/context 'sunbeam' pointing at https://127.0.0.1:6443
    with certs embedded. The decoded key material is always removed afterwards.
    """
    step("Merging kubeconfig...")
    lima_kube = Path.home() / f".lima/{LIMA_VM}/copied-from-guest/kubeconfig.yaml"
    if not lima_kube.exists():
        die(f"Lima kubeconfig not found: {lima_kube}")
    tmp = Path("/tmp/sunbeam-kube")
    tmp.mkdir(exist_ok=True)
    try:
        for query, filename in [
            (".clusters[0].cluster.certificate-authority-data", "ca.crt"),
            (".users[0].user.client-certificate-data", "client.crt"),
            (".users[0].user.client-key-data", "client.key"),
        ]:
            b64 = capture_out(["yq", query, str(lima_kube)])
            (tmp / filename).write_bytes(base64.b64decode(b64))
        run(["kubectl", "config", "set-cluster", LIMA_VM,
             "--server=https://127.0.0.1:6443",
             f"--certificate-authority={tmp}/ca.crt", "--embed-certs=true"])
        run(["kubectl", "config", "set-credentials", f"{LIMA_VM}-admin",
             f"--client-certificate={tmp}/client.crt",
             f"--client-key={tmp}/client.key", "--embed-certs=true"])
        run(["kubectl", "config", "set-context", LIMA_VM,
             f"--cluster={LIMA_VM}", f"--user={LIMA_VM}-admin"])
    finally:
        # Never leave decoded key material in /tmp, even on failure.
        shutil.rmtree(tmp, ignore_errors=True)
    ok("Context 'sunbeam' ready.")
# ── 4. Traefik ────────────────────────────────────────────────────────────────
def disable_traefik():
    """Remove k3s's bundled Traefik HelmChart (Pingora serves ingress instead)."""
    step("Traefik...")
    if kube_ok("get", "helmchart", "traefik", "-n", "kube-system"):
        ok("Removing (replaced by Pingora)...")
        kube("delete", "helmchart", "traefik", "traefik-crd",
             "-n", "kube-system", check=False)
        # Drop the bundled manifest inside the VM so k3s does not re-create
        # the HelmChart on its next restart.
        subprocess.run(
            ["limactl", "shell", LIMA_VM,
             "sudo", "rm", "-f",
             "/var/lib/rancher/k3s/server/manifests/traefik.yaml"],
            capture_output=True,
        )
    ok("Done.")
# ── 5. cert-manager ───────────────────────────────────────────────────────────
def ensure_cert_manager():
    """Install cert-manager v1.17.0 unless its namespace already exists."""
    step("cert-manager...")
    if ns_exists("cert-manager"):
        ok("Already installed.")
        return
    ok("Installing...")
    kube("apply", "-f",
         "https://github.com/cert-manager/cert-manager/releases/download/v1.17.0/cert-manager.yaml")
    # Block until all three control-plane deployments finish rolling out.
    for name in ("cert-manager", "cert-manager-webhook", "cert-manager-cainjector"):
        kube("rollout", "status", f"deployment/{name}",
             "-n", "cert-manager", "--timeout=120s")
    ok("Installed.")
# ── 6. Linkerd ────────────────────────────────────────────────────────────────
def ensure_linkerd():
    """Install Gateway API CRDs, then Linkerd CRDs + control plane, if absent."""
    step("Linkerd...")
    if ns_exists("linkerd"):
        ok("Already installed.")
        return
    ok("Installing Gateway API CRDs...")
    kube("apply", "--server-side", "-f",
         "https://github.com/kubernetes-sigs/gateway-api/releases/download/v1.4.0/standard-install.yaml")
    ok("Installing Linkerd CRDs...")
    kube_apply(capture_out(["linkerd", "install", "--crds"]))
    ok("Installing Linkerd control plane...")
    kube_apply(capture_out(["linkerd", "install"]))
    # Block until the control-plane deployments finish rolling out.
    for name in ("linkerd-identity", "linkerd-destination", "linkerd-proxy-injector"):
        kube("rollout", "status", f"deployment/{name}",
             "-n", "linkerd", "--timeout=120s")
    ok("Installed.")
# ── 7. TLS certificate ────────────────────────────────────────────────────────
def get_lima_ip():
    """Return the Lima VM's IPv4 address.

    Prefers the eth1 address from `ip -4 addr show eth1`; falls back to the
    first address reported by `hostname -I`.

    Fix: the fallback previously did `.split()[0]` on the raw output, which
    raised an opaque IndexError when the command produced no output (e.g. the
    VM is not running). Now dies with a clear message instead.
    """
    raw = capture_out(["limactl", "shell", LIMA_VM,
                       "ip", "-4", "addr", "show", "eth1"])
    for line in raw.splitlines():
        if "inet " in line:
            # "inet 192.168.x.y/24 ..." → take the address, drop the prefix len.
            return line.strip().split()[1].split("/")[0]
    addrs = capture_out(["limactl", "shell", LIMA_VM, "hostname", "-I"]).split()
    if not addrs:
        die(f"could not determine IP of Lima VM '{LIMA_VM}' — is it running?")
    return addrs[0]
def ensure_tls_cert():
    """Generate (once) a mkcert wildcard cert for <vm-ip>.sslip.io; return the domain."""
    step("TLS certificate...")
    domain = f"{get_lima_ip()}.sslip.io"
    if (SECRETS_DIR / "tls.crt").exists():
        ok(f"Cert exists. Domain: {domain}")
        return domain
    ok(f"Generating wildcard cert for *.{domain}...")
    SECRETS_DIR.mkdir(parents=True, exist_ok=True)
    run(["mkcert", f"*.{domain}"], cwd=SECRETS_DIR)
    # mkcert writes _wildcard.<domain>[-key].pem; normalize to tls.crt/tls.key.
    renames = (
        (f"_wildcard.{domain}.pem", "tls.crt"),
        (f"_wildcard.{domain}-key.pem", "tls.key"),
    )
    for src, dst in renames:
        (SECRETS_DIR / src).rename(SECRETS_DIR / dst)
    ok(f"Cert generated. Domain: {domain}")
    return domain
# ── 8. TLS secret ─────────────────────────────────────────────────────────────
def ensure_tls_secret(domain):
    """Create/refresh the pingora-tls Secret in the ingress namespace."""
    step("TLS secret...")
    ensure_ns("ingress")
    rendered = kube_out(
        "create", "secret", "tls", "pingora-tls",
        f"--cert={SECRETS_DIR}/tls.crt",
        f"--key={SECRETS_DIR}/tls.key",
        "-n", "ingress",
        "--dry-run=client", "-o=yaml",
    )
    if rendered:
        kube_apply(rendered)
    ok("Done.")
# ── 9. Lima VM registry trust + k3s config ────────────────────────────────────
def setup_lima_vm_registry(domain):
    """Install mkcert root CA in the Lima VM and configure k3s to auth with Gitea.

    Restarts k3s if either configuration changes so pods don't fight TLS errors
    or get unauthenticated pulls on the first deploy.
    """
    step("Configuring Lima VM registry trust...")
    changed = False
    # Install mkcert root CA so containerd trusts our wildcard TLS cert
    caroot = capture_out(["mkcert", "-CAROOT"])
    if caroot:
        ca_pem = Path(caroot) / "rootCA.pem"
        if ca_pem.exists():
            # Presence of the installed cert file marks a prior install.
            already = subprocess.run(
                ["limactl", "shell", LIMA_VM, "test", "-f",
                 "/usr/local/share/ca-certificates/mkcert-root.crt"],
                capture_output=True,
            ).returncode == 0
            if not already:
                run(["limactl", "copy", str(ca_pem), f"{LIMA_VM}:/tmp/mkcert-root.pem"])
                run(["limactl", "shell", LIMA_VM, "sudo", "cp",
                     "/tmp/mkcert-root.pem",
                     "/usr/local/share/ca-certificates/mkcert-root.crt"])
                run(["limactl", "shell", LIMA_VM, "sudo", "update-ca-certificates"])
                ok("mkcert CA installed in VM.")
                changed = True
            else:
                ok("mkcert CA already installed.")
    # Write k3s registries.yaml (auth for Gitea container registry).
    # Schema: configs.<host>.auth.{username,password}
    registry_host = f"src.{domain}"
    want = (
        f'configs:\n'
        f'  "{registry_host}":\n'
        f'    auth:\n'
        f'      username: "{GITEA_ADMIN_USER}"\n'
        f'      password: "{GITEA_ADMIN_PASS}"\n'
    )
    existing = capture_out(["limactl", "shell", LIMA_VM,
                            "sudo", "cat", "/etc/rancher/k3s/registries.yaml"])
    if existing.strip() != want.strip():
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "tee",
             "/etc/rancher/k3s/registries.yaml"],
            input=want, text=True, capture_output=True,
        )
        ok(f"Registry config written for {registry_host}.")
        changed = True
    else:
        ok("Registry config up to date.")
    if changed:
        ok("Restarting k3s to apply changes...")
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "systemctl", "restart", "k3s"],
            capture_output=True,
        )
        # Wait for API server to come back (up to 40 × 3 s = 2 min).
        for _ in range(40):
            if kube_ok("get", "nodes"):
                break
            time.sleep(3)
        # Extra settle time — pods take a moment to start terminating/restarting
        time.sleep(15)
        ok("k3s restarted.")
# ── 10. Apply manifests ────────────────────────────────────────────────────────
# Namespaces owned by this script; used for cleanup, mirroring and --status.
MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "ory", "storage"]
def pre_apply_cleanup():
    """Delete immutable resources that must be re-created on each apply."""
    ok("Cleaning up immutable Jobs and test Pods...")
    for ns in MANAGED_NS:
        # Job specs are immutable — delete so the next apply can re-create them.
        kube("delete", "jobs", "--all", "-n", ns, "--ignore-not-found", check=False)
        pods_out = kube_out("get", "pods", "-n", ns,
                            "--field-selector=status.phase!=Running",
                            "-o=jsonpath={.items[*].metadata.name}")
        # Clear finished Helm-style test pods that would block re-apply.
        for pod in pods_out.split():
            if pod.endswith(("-test-connection", "-server-test", "-test")):
                kube("delete", "pod", pod, "-n", ns, "--ignore-not-found", check=False)
def apply_manifests(domain):
    """Render overlays/local with kustomize and server-side apply the result.

    The rendered YAML has its DOMAIN_SUFFIX placeholder substituted with the
    live sslip.io domain before applying.
    """
    step(f"Applying manifests (domain: {domain})...")
    pre_apply_cleanup()
    build = run(
        ["kustomize", "build", "--enable-helm", "overlays/local/"],
        capture=True, cwd=REPO_ROOT,
    )
    rendered = build.stdout.replace("DOMAIN_SUFFIX", domain)
    rendered = rendered.replace("\n annotations: null", "")
    kube("apply", "--server-side", "--force-conflicts", "-f", "-", input=rendered)
    ok("Applied.")
# ── 11. Gitea bootstrap ────────────────────────────────────────────────────────
def bootstrap_gitea(domain):
    """Ensure Gitea admin has a known password and create the studio/internal orgs."""
    step("Bootstrapping Gitea...")
    # Wait for a Running + Ready Gitea pod (more reliable than rollout status after a k3s restart)
    pod = ""
    for _ in range(60):  # 60 × 3 s ≈ 3 min
        candidate = kube_out(
            "-n", "devtools", "get", "pods",
            "-l=app.kubernetes.io/name=gitea",
            "--field-selector=status.phase=Running",
            "-o=jsonpath={.items[0].metadata.name}",
        )
        if candidate:
            ready = kube_out("-n", "devtools", "get", "pod", candidate,
                             "-o=jsonpath={.status.containerStatuses[0].ready}")
            if ready == "true":
                pod = candidate
                break
        time.sleep(3)
    if not pod:
        warn("Gitea pod not ready after 3 min — skipping bootstrap.")
        return

    def gitea_exec(*args):
        # Run a command inside the gitea container of the ready pod.
        return subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "devtools", "exec", pod, "-c", "gitea", "--"]
            + list(args),
            capture_output=True, text=True,
        )

    # Ensure admin has our known password
    r = gitea_exec("gitea", "admin", "user", "change-password",
                   "--username", GITEA_ADMIN_USER, "--password", GITEA_ADMIN_PASS)
    # Treat "password"-mentioning output as success too (CLI wording varies by version).
    if r.returncode == 0 or "password" in (r.stdout + r.stderr).lower():
        ok(f"Admin '{GITEA_ADMIN_USER}' password set.")
    else:
        warn(f"change-password: {r.stderr.strip()}")
    # Clear must_change_password via Postgres — Gitea enforces this flag at the API
    # level for ALL auth methods (including API tokens), so we must clear it in the DB.
    pg_pod = kube_out("-n", "data", "get", "pods",
                      "-l=cnpg.io/cluster=postgres,role=primary",
                      "-o=jsonpath={.items[0].metadata.name}")
    if pg_pod:
        kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
             "psql", "-U", "postgres", "-d", "gitea_db", "-c",
             f'UPDATE "user" SET must_change_password = false'
             f" WHERE lower_name = '{GITEA_ADMIN_USER.lower()}';",
             check=False)
        ok("Cleared must-change-password flag.")
    else:
        warn("Postgres pod not found — must-change-password may block API calls.")

    def api(method, path, data=None):
        # Call the Gitea REST API from inside the pod (basic auth as admin);
        # returns parsed JSON or {} when the response isn't JSON.
        args = [
            "curl", "-s", "-X", method,
            f"http://localhost:3000/api/v1{path}",
            "-H", "Content-Type: application/json",
            "-u", f"{GITEA_ADMIN_USER}:{GITEA_ADMIN_PASS}",
        ]
        if data:
            args += ["-d", json.dumps(data)]
        r = gitea_exec(*args)
        try:
            return json.loads(r.stdout)
        except json.JSONDecodeError:
            return {}

    for org_name, visibility, desc in [
        ("studio", "public", "Public source code"),
        ("internal", "private", "Internal tools and services"),
    ]:
        result = api("POST", "/orgs", {
            "username": org_name,
            "visibility": visibility,
            "description": desc,
        })
        if "id" in result:
            ok(f"Created org '{org_name}'.")
        elif "already" in result.get("message", "").lower():
            ok(f"Org '{org_name}' already exists.")
        else:
            warn(f"Org '{org_name}': {result.get('message', result)}")
    ok(f"Gitea ready — https://src.{domain} (studio / internal orgs)")
# ── 12. Mirror amd64-only images to Gitea registry ────────────────────────────
#
# Images like lasuite/people-backend only ship linux/amd64. Our Lima VM is arm64.
# Strategy: pull the amd64 manifest by digest, create a patched OCI index that
# adds an arm64 entry pointing to the same manifest (Rosetta runs it fine), then
# push to our Gitea registry. k8s manifests reference src.DOMAIN_SUFFIX/studio/…;
# k3s registries.yaml handles auth so no imagePullSecrets are needed.
#
# Runs inside the Lima VM via `limactl shell … sudo python3 -c …`.
# Stdlib-only — no pip install required.
_MIRROR_SCRIPT_BODY = r'''
import json, hashlib, io, tarfile, os, subprocess, urllib.request
CONTENT_STORE = (
"/var/lib/rancher/k3s/agent/containerd"
"/io.containerd.content.v1.content/blobs/sha256"
)
def blob_path(h):
return os.path.join(CONTENT_STORE, h)
def blob_exists(h):
return os.path.exists(blob_path(h))
def read_blob(h):
with open(blob_path(h), "rb") as f:
return f.read()
def add_tar_entry(tar, name, data):
info = tarfile.TarInfo(name=name)
info.size = len(data)
tar.addfile(info, io.BytesIO(data))
def get_image_digest(ref):
r = subprocess.run(
["ctr", "-n", "k8s.io", "images", "ls", "name==" + ref],
capture_output=True, text=True,
)
for line in r.stdout.splitlines():
if ref in line:
for part in line.split():
if part.startswith("sha256:"):
return part[7:]
return None
def fetch_index_from_registry(repo, tag):
url = (
"https://auth.docker.io/token"
f"?service=registry.docker.io&scope=repository:{repo}:pull"
)
with urllib.request.urlopen(url) as resp:
token = json.loads(resp.read())["token"]
accept = ",".join([
"application/vnd.oci.image.index.v1+json",
"application/vnd.docker.distribution.manifest.list.v2+json",
])
req = urllib.request.Request(
f"https://registry-1.docker.io/v2/{repo}/manifests/{tag}",
headers={"Authorization": f"Bearer {token}", "Accept": accept},
)
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read())
def make_oci_tar(ref, new_index_bytes, amd64_manifest_bytes):
ix_hex = hashlib.sha256(new_index_bytes).hexdigest()
amd64_hex = json.loads(new_index_bytes)["manifests"][0]["digest"].replace("sha256:", "")
layout = json.dumps({"imageLayoutVersion": "1.0.0"}).encode()
top = json.dumps({
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.index.v1+json",
"manifests": [{
"mediaType": "application/vnd.oci.image.index.v1+json",
"digest": f"sha256:{ix_hex}",
"size": len(new_index_bytes),
"annotations": {"org.opencontainers.image.ref.name": ref},
}],
}, separators=(",", ":")).encode()
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:") as tar:
add_tar_entry(tar, "oci-layout", layout)
add_tar_entry(tar, "index.json", top)
add_tar_entry(tar, f"blobs/sha256/{ix_hex}", new_index_bytes)
add_tar_entry(tar, f"blobs/sha256/{amd64_hex}", amd64_manifest_bytes)
return buf.getvalue()
def import_ref(ref, tar_bytes):
subprocess.run(["ctr", "-n", "k8s.io", "images", "rm", ref], capture_output=True)
r = subprocess.run(
["ctr", "-n", "k8s.io", "images", "import", "--all-platforms", "-"],
input=tar_bytes, capture_output=True,
)
if r.returncode:
print(f" import failed: {r.stderr.decode()}")
return False
subprocess.run(
["ctr", "-n", "k8s.io", "images", "label", ref, "io.cri-containerd.image=managed"],
capture_output=True,
)
return True
def process(src, tgt, user, pwd):
print(f" {src}")
# Pull by tag — may fail on arm64-only images but still puts the index blob in the store
subprocess.run(["ctr", "-n", "k8s.io", "images", "pull", src], capture_output=True)
ix_hex = get_image_digest(src)
if ix_hex and blob_exists(ix_hex):
index = json.loads(read_blob(ix_hex))
else:
print(" index not in content store — fetching from docker.io...")
no_prefix = src.replace("docker.io/", "")
parts = no_prefix.split(":", 1)
repo, tag = parts[0], (parts[1] if len(parts) > 1 else "latest")
index = fetch_index_from_registry(repo, tag)
amd64 = next(
(m for m in index.get("manifests", [])
if m.get("platform", {}).get("architecture") == "amd64"
and m.get("platform", {}).get("os") == "linux"),
None,
)
if not amd64:
print(" skip: no linux/amd64 entry in index")
return
amd64_hex = amd64["digest"].replace("sha256:", "")
if not blob_exists(amd64_hex):
print(" pulling amd64 manifest + layers by digest...")
repo_base = src.rsplit(":", 1)[0]
subprocess.run(
["ctr", "-n", "k8s.io", "images", "pull",
f"{repo_base}@sha256:{amd64_hex}"],
capture_output=True,
)
if not blob_exists(amd64_hex):
print(" failed: amd64 manifest blob missing after pull")
return
amd64_bytes = read_blob(amd64_hex)
# Patched index: keep amd64 + add arm64 alias pointing at same manifest
arm64 = {
"mediaType": amd64["mediaType"],
"digest": amd64["digest"],
"size": amd64["size"],
"platform": {"architecture": "arm64", "os": "linux"},
}
new_index = dict(index)
new_index["manifests"] = [amd64, arm64]
new_index_bytes = json.dumps(new_index, separators=(",", ":")).encode()
# Import with Gitea target name
if not import_ref(tgt, make_oci_tar(tgt, new_index_bytes, amd64_bytes)):
return
# Also patch the original source ref so pods still using docker.io name work
import_ref(src, make_oci_tar(src, new_index_bytes, amd64_bytes))
# Push to Gitea registry
print(f" pushing to registry...")
r = subprocess.run(
["ctr", "-n", "k8s.io", "images", "push",
"--user", f"{user}:{pwd}", tgt],
capture_output=True, text=True,
)
status = "OK" if r.returncode == 0 else f"PUSH FAILED: {r.stderr.strip()}"
print(f" {status}")
for _src, _tgt in TARGETS:
process(_src, _tgt, USER, PASS)
'''
def mirror_amd64_images(domain):
    """Patch amd64-only images with an arm64 alias and push them to the Gitea registry.

    Renders a stdlib-only Python script (constants prepended to
    _MIRROR_SCRIPT_BODY) and runs it as root inside the Lima VM, then deletes
    any pods stuck in image-pull error states so they reschedule against the
    freshly imported images.
    """
    step("Mirroring amd64-only images to Gitea registry...")
    registry = f"src.{domain}"
    pairs = [(src, f"{registry}/{org}/{repo}:{tag}")
             for src, org, repo, tag in AMD64_ONLY_IMAGES]
    preamble = "".join([
        f"TARGETS = {pairs!r}\n",
        f"USER = {GITEA_ADMIN_USER!r}\n",
        f"PASS = {GITEA_ADMIN_PASS!r}\n",
    ])
    run(["limactl", "shell", LIMA_VM, "sudo", "python3", "-c",
         preamble + _MIRROR_SCRIPT_BODY])
    # Delete any pods stuck in image-pull error states
    ok("Clearing image-pull-error pods...")
    pull_errors = {"ImagePullBackOff", "ErrImagePull", "ErrImageNeverPull"}
    for ns in MANAGED_NS:
        listing = kube_out(
            "-n", ns, "get", "pods",
            "-o=jsonpath={range .items[*]}"
            "{.metadata.name}:{.status.containerStatuses[0].state.waiting.reason}\\n"
            "{end}",
        )
        for entry in listing.splitlines():
            name, sep, reason = entry.partition(":")
            if sep and reason in pull_errors:
                kube("delete", "pod", name, "-n", ns, "--ignore-not-found", check=False)
    ok("Done.")
# ── 13. Seed secrets ──────────────────────────────────────────────────────────
def seed_secrets():
    """Wait for Postgres, ensure roles/databases, create all K8s secrets, seed OpenBao."""
    step("Seeding secrets...")
    ok("Waiting for postgres cluster...")
    pg_pod = ""
    for _ in range(60):  # 60 × 5 s ≈ 5 min
        phase = kube_out("-n", "data", "get", "cluster", "postgres",
                         "-o=jsonpath={.status.phase}")
        if phase == "Cluster in healthy state":
            pg_pod = kube_out("-n", "data", "get", "pods",
                              "-l=cnpg.io/cluster=postgres,role=primary",
                              "-o=jsonpath={.items[0].metadata.name}")
            ok(f"Postgres ready ({pg_pod}).")
            break
        time.sleep(5)
    else:
        # for-else: only reached when the loop exhausted without break.
        warn("Postgres not ready after 5 min — continuing anyway.")
    if pg_pod:
        ok("Ensuring postgres roles and databases...")
        # Explicit user→database map (all currently follow the <user>_db pattern).
        db_map = {
            "kratos": "kratos_db", "hydra": "hydra_db", "gitea": "gitea_db",
            "hive": "hive_db", "docs": "docs_db", "meet": "meet_db",
            "drive": "drive_db", "messages": "messages_db",
            "conversations": "conversations_db",
            "people": "people_db", "find": "find_db",
        }
        for user in PG_USERS:
            # Create role if missing, then (re)set its password unconditionally.
            ensure_sql = (
                f"DO $$ BEGIN "
                f"IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname='{user}') "
                f"THEN EXECUTE 'CREATE USER {user}'; END IF; END $$; "
                f"ALTER USER {user} WITH PASSWORD '{DB_PASSWORD}';"
            )
            kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
                 "psql", "-U", "postgres", "-c", ensure_sql, check=False)
            db = db_map.get(user, f"{user}_db")
            # check=False: CREATE DATABASE fails harmlessly when it already exists.
            kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
                 "psql", "-U", "postgres", "-c",
                 f"CREATE DATABASE {db} OWNER {user};", check=False)
    ok("Creating K8s secrets...")
    ensure_ns("ory")
    create_secret("ory", "hydra",
                  dsn=(f"postgresql://hydra:{DB_PASSWORD}@"
                       "postgres-rw.data.svc.cluster.local:5432/hydra_db?sslmode=disable"),
                  secretsSystem=HYDRA_SYSTEM_SECRET,
                  secretsCookie=HYDRA_COOKIE_SECRET,
                  # Hyphenated key can't be a Python kwarg — pass via dict splat.
                  **{"pairwise-salt": HYDRA_PAIRWISE_SALT},
                  )
    ensure_ns("devtools")
    create_secret("devtools", "gitea-db-credentials", password=DB_PASSWORD)
    create_secret("devtools", "gitea-s3-credentials",
                  **{"access-key": S3_ACCESS_KEY, "secret-key": S3_SECRET_KEY})
    ensure_ns("storage")
    create_secret("storage", "seaweedfs-s3-credentials",
                  S3_ACCESS_KEY=S3_ACCESS_KEY, S3_SECRET_KEY=S3_SECRET_KEY)
    ensure_ns("lasuite")
    create_secret("lasuite", "seaweedfs-s3-credentials",
                  S3_ACCESS_KEY=S3_ACCESS_KEY, S3_SECRET_KEY=S3_SECRET_KEY)
    create_secret("lasuite", "hive-db-url",
                  url=(f"postgresql://hive:{DB_PASSWORD}@"
                       "postgres-rw.data.svc.cluster.local:5432/hive_db"))
    create_secret("lasuite", "hive-oidc",
                  **{"client-id": "hive-local", "client-secret": "hive-local-secret"})
    create_secret("lasuite", "people-db-credentials", password=DB_PASSWORD)
    create_secret("lasuite", "people-django-secret",
                  DJANGO_SECRET_KEY=PEOPLE_DJANGO_SECRET)
    ensure_ns("media")
    _seed_openbao()
    ok("All secrets seeded.")
def _seed_openbao():
    """Initialize/unseal OpenBao (keys cached in secret/openbao-keys) and seed its KV."""
    ob_pod = kube_out(
        "-n", "data", "get", "pods",
        "-l=app.kubernetes.io/name=openbao,component=server",
        "-o=jsonpath={.items[0].metadata.name}",
    )
    if not ob_pod:
        ok("OpenBao pod not found — skipping.")
        return
    ok(f"OpenBao ({ob_pod})...")
    kube("wait", "-n", "data", f"pod/{ob_pod}",
         "--for=jsonpath={.status.phase}=Running", "--timeout=120s", check=False)

    def bao(cmd):
        # Run a shell command inside the openbao container; return stripped stdout.
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao",
             "--", "sh", "-c", cmd],
            capture_output=True, text=True,
        )
        return r.stdout.strip()

    def bao_status():
        # Parsed `bao status` JSON, or {} when unreachable/unparsable.
        out = bao("bao status -format=json 2>/dev/null || echo '{}'")
        try:
            return json.loads(out)
        except json.JSONDecodeError:
            return {}

    unseal_key = ""
    root_token = ""
    status = bao_status()
    already_initialized = status.get("initialized", False)
    if not already_initialized:
        # Server may claim uninitialized while our cached keys secret exists
        # (e.g. just after a pod restart) — trust the stored keys.
        existing_key = kube_out("-n", "data", "get", "secret", "openbao-keys",
                                "-o=jsonpath={.data.key}")
        already_initialized = bool(existing_key)
    if not already_initialized:
        ok("Initializing OpenBao...")
        init_json = bao("bao operator init -key-shares=1 -key-threshold=1 -format=json 2>/dev/null || echo '{}'")
        try:
            init = json.loads(init_json)
            unseal_key = init["unseal_keys_b64"][0]
            root_token = init["root_token"]
            # Persist keys so later runs can unseal without re-initializing.
            create_secret("data", "openbao-keys",
                          key=unseal_key, **{"root-token": root_token})
            ok("Initialized — keys stored in secret/openbao-keys.")
        except (json.JSONDecodeError, KeyError):
            # Init failed and no cached keys exist: wipe storage so the
            # next run can start from a clean slate (local dev only).
            warn("Init failed — resetting OpenBao storage for local dev...")
            kube("delete", "pvc", "data-openbao-0", "-n", "data", "--ignore-not-found", check=False)
            kube("delete", "pod", ob_pod, "-n", "data", "--ignore-not-found", check=False)
            warn("OpenBao storage reset. Run --seed again after the pod restarts.")
            return
    else:
        ok("Already initialized.")
        existing_key = kube_out("-n", "data", "get", "secret", "openbao-keys",
                                "-o=jsonpath={.data.key}")
        if existing_key:
            unseal_key = base64.b64decode(existing_key).decode()
        root_token_enc = kube_out("-n", "data", "get", "secret", "openbao-keys",
                                  "-o=jsonpath={.data.root-token}")
        if root_token_enc:
            root_token = base64.b64decode(root_token_enc).decode()
    if bao_status().get("sealed", False) and unseal_key:
        ok("Unsealing...")
        bao(f"bao operator unseal '{unseal_key}' 2>/dev/null")
    if root_token:
        ok("Seeding KV...")
        pg_rw = "postgres-rw.data.svc.cluster.local:5432"
        bao(f"""
BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' sh -c '
bao secrets enable -path=secret -version=2 kv 2>/dev/null || true
bao kv put secret/postgres password="{DB_PASSWORD}"
bao kv put secret/hydra db-password="{DB_PASSWORD}" system-secret="{HYDRA_SYSTEM_SECRET}" cookie-secret="{HYDRA_COOKIE_SECRET}" pairwise-salt="{HYDRA_PAIRWISE_SALT}"
bao kv put secret/kratos db-password="{DB_PASSWORD}"
bao kv put secret/gitea db-password="{DB_PASSWORD}" s3-access-key="{S3_ACCESS_KEY}" s3-secret-key="{S3_SECRET_KEY}"
bao kv put secret/seaweedfs access-key="{S3_ACCESS_KEY}" secret-key="{S3_SECRET_KEY}"
bao kv put secret/hive db-url="postgresql://hive:{DB_PASSWORD}@{pg_rw}/hive_db" oidc-client-id="hive-local" oidc-client-secret="hive-local-secret"
bao kv put secret/livekit api-key="{LIVEKIT_API_KEY}" api-secret="{LIVEKIT_API_SECRET}"
bao kv put secret/people db-password="{DB_PASSWORD}" django-secret-key="{PEOPLE_DJANGO_SECRET}"
'
""")
# ── 14. Restart services ──────────────────────────────────────────────────────
def restart_services():
    """Trigger a rollout restart for every deployment in SERVICES_TO_RESTART."""
    step("Restarting services waiting for secrets...")
    for namespace, deployment in SERVICES_TO_RESTART:
        kube("-n", namespace, "rollout", "restart",
             f"deployment/{deployment}", check=False)
    ok("Done.")
# ── 15. Wait for core ─────────────────────────────────────────────────────────
def wait_for_core():
    """Block (up to 120 s each, best-effort) on valkey, kratos and hydra rollouts."""
    step("Waiting for core services...")
    core = (("data", "valkey"), ("ory", "kratos"), ("ory", "hydra"))
    for namespace, deployment in core:
        kube("rollout", "status", f"deployment/{deployment}",
             "-n", namespace, "--timeout=120s", check=False)
    ok("Core services ready.")
# ── 16. Print URLs ────────────────────────────────────────────────────────────
def print_urls(domain):
    """Print service URLs, Gitea credentials and OpenBao access hints.

    Fix: the separator lines were `f"{''*60}"` — an empty string repeated 60
    times (the rule character was evidently lost), so they printed nothing.
    Use a real horizontal rule instead.
    """
    rule = "─" * 60
    print(f"\n{rule}")
    print(f" Stack is up. Domain: {domain}")
    print(rule)
    for name, url in [
        ("Auth", f"https://auth.{domain}/"),
        ("Docs", f"https://docs.{domain}/"),
        ("Meet", f"https://meet.{domain}/"),
        ("Drive", f"https://drive.{domain}/"),
        ("Chat", f"https://chat.{domain}/"),
        ("Mail", f"https://mail.{domain}/"),
        ("People", f"https://people.{domain}/"),
        ("Gitea", f"https://src.{domain}/ ({GITEA_ADMIN_USER} / {GITEA_ADMIN_PASS})"),
    ]:
        print(f" {name:<10} {url}")
    print()
    print(" OpenBao UI:")
    print(" kubectl --context=sunbeam -n data port-forward svc/openbao 8200:8200")
    print(" http://localhost:8200")
    token_cmd = "kubectl --context=sunbeam -n data get secret openbao-keys -o jsonpath='{.data.root-token}' | base64 -d"
    print(f" token: {token_cmd}")
    print(f"{rule}\n")
# ── 16. Status check ──────────────────────────────────────────────────────────
def status_check():
    """Print a concise pod health table grouped by namespace.

    Fixes: the field selector contained a stray space ("!= kube-system"),
    which never matches the real "kube-system" namespace name; and the
    status icons were empty strings, so rows carried no health marker.
    """
    step("Pod health across all namespaces...")
    # One API call for every namespace; kube-system is excluded server-side
    # and everything else is narrowed to MANAGED_NS below.
    raw = capture_out([
        "kubectl", *K8S_CTX,
        "get", "pods",
        "--field-selector=metadata.namespace!=kube-system",
        "-A", "--no-headers",
    ])
    # Filter to our namespaces only
    ns_set = set(MANAGED_NS)
    pods = []
    for line in raw.splitlines():
        cols = line.split()
        if len(cols) < 4 or cols[0] not in ns_set:
            continue
        pods.append(cols)
    if not pods:
        warn("No pods found in managed namespaces.")
        return
    all_ok = True
    cur_ns = None
    icon_map = {"Running": "✓", "Completed": "✓", "Succeeded": "✓",
                "Pending": "…", "Failed": "✗", "Unknown": "?"}
    for cols in sorted(pods, key=lambda c: (c[0], c[1])):
        ns, name, ready, status = cols[0], cols[1], cols[2], cols[3]
        if ns != cur_ns:
            print(f" {ns}:")
            cur_ns = ns
        icon = icon_map.get(status, "?")
        unhealthy = status not in ("Running", "Completed", "Succeeded")
        # A Running pod with e.g. 0/1 ready containers is still unhealthy.
        if not unhealthy and "/" in ready:
            r, t = ready.split("/")
            unhealthy = r != t
        if unhealthy:
            all_ok = False
        print(f" {icon} {name:<50} {ready:<6} {status}")
    print()
    if all_ok:
        ok("All pods healthy.")
    else:
        warn("Some pods are not ready.")
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
    """CLI entry point: full bring-up by default, or targeted partial modes."""
    parser = argparse.ArgumentParser(description="Sunbeam local dev stack manager")
    parser.add_argument("--seed", action="store_true", help="Re-seed secrets only")
    parser.add_argument("--apply", action="store_true", help="Re-apply manifests + mirror images")
    parser.add_argument("--gitea", action="store_true", help="Bootstrap Gitea orgs + mirror images")
    parser.add_argument("--restart", action="store_true", help="Restart services only")
    parser.add_argument("--status", action="store_true", help="Show pod health across all namespaces")
    args = parser.parse_args()
    check_prerequisites()
    if args.status:
        # Read-only mode; nothing else runs.
        status_check()
        return
    # Partial-run modes — run in logical order: apply → gitea → seed → restart
    if args.apply or args.gitea or args.seed or args.restart:
        # Every partial mode needs the domain derived from the VM's current IP.
        ip = get_lima_ip()
        domain = f"{ip}.sslip.io"
        if args.apply:
            setup_lima_vm_registry(domain)
            apply_manifests(domain)
            bootstrap_gitea(domain)
            mirror_amd64_images(domain)
        if args.gitea:
            setup_lima_vm_registry(domain)
            bootstrap_gitea(domain)
            mirror_amd64_images(domain)
        if args.seed:
            seed_secrets()
        # Every partial mode ends with a restart (this is also what makes a
        # bare --restart invocation do its work).
        restart_services()
        return
    # Full bring-up
    ensure_lima_vm()
    merge_kubeconfig()
    disable_traefik()
    ensure_cert_manager()
    ensure_linkerd()
    domain = ensure_tls_cert()
    ensure_tls_secret(domain)
    setup_lima_vm_registry(domain)  # mkcert CA + registries.yaml + k3s restart if needed
    apply_manifests(domain)
    bootstrap_gitea(domain)  # create studio/internal orgs
    mirror_amd64_images(domain)  # patch + push amd64-only images
    seed_secrets()
    restart_services()
    wait_for_core()
    print_urls(domain)
if __name__ == "__main__":
    main()