feat: initial sunbeam CLI package

stdlib-only Python CLI replacing infrastructure/scripts/sunbeam.py.
Verbs: up, down, status, apply, seed, verify, logs, restart, get,
build, mirror, bootstrap. Service scoping via ns/name target syntax.
Auto-bundled kubectl/kustomize/helm (SHA256-verified, cached in
~/.local/share/sunbeam/bin). 63 unittest tests, all passing.
This commit is contained in:
2026-03-02 20:59:57 +00:00
commit cdc109d728
20 changed files with 2803 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
__pycache__/
*.py[cod]
*.egg-info/
dist/
build/
.eggs/

12
pyproject.toml Normal file
View File

@@ -0,0 +1,12 @@
[project]
name = "sunbeam"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = []
[project.scripts]
sunbeam = "sunbeam.__main__:main"
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"

1
sunbeam/__init__.py Normal file
View File

@@ -0,0 +1 @@
# sunbeam CLI package

4
sunbeam/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
# Executable shim: makes `python -m sunbeam` behave like the `sunbeam`
# console script declared in pyproject.toml.
from sunbeam.cli import main

if __name__ == "__main__":
    main()

119
sunbeam/cli.py Normal file
View File

@@ -0,0 +1,119 @@
"""CLI entry point — argparse dispatch table for all sunbeam verbs."""
import argparse
import sys
def main() -> None:
    """Build the argparse verb tree, parse argv, and dispatch.

    Handlers import their implementation module lazily so that plain
    `sunbeam --help` never pays the cost of the heavier modules.
    Exits 0 (after printing help) when no verb is given.
    """
    parser = argparse.ArgumentParser(
        prog="sunbeam",
        description="Sunbeam local dev stack manager",
    )
    sub = parser.add_subparsers(dest="verb", metavar="verb")
    sub.add_parser("up", help="Full cluster bring-up")
    sub.add_parser("down", help="Tear down Lima VM")
    p_status = sub.add_parser("status", help="Pod health (optionally scoped)")
    p_status.add_argument("target", nargs="?", default=None,
                          help="namespace or namespace/name")
    sub.add_parser("apply", help="kustomize build + domain subst + kubectl apply")
    sub.add_parser("seed", help="Generate/store all credentials in OpenBao")
    sub.add_parser("verify", help="E2E VSO + OpenBao integration test")
    p_logs = sub.add_parser("logs", help="kubectl logs for a service")
    p_logs.add_argument("target", help="namespace/name")
    p_logs.add_argument("-f", "--follow", action="store_true",
                        help="Stream logs (--follow)")
    p_get = sub.add_parser("get", help="Raw kubectl get for a pod (ns/name)")
    p_get.add_argument("target", help="namespace/name")
    p_get.add_argument("-o", "--output", default="yaml",
                       choices=["yaml", "json", "wide"],
                       help="Output format (default: yaml)")
    p_restart = sub.add_parser("restart", help="Rolling restart of services")
    p_restart.add_argument("target", nargs="?", default=None,
                           help="namespace or namespace/name")
    p_build = sub.add_parser("build", help="Build and push an artifact")
    p_build.add_argument("what", choices=["proxy"],
                         help="What to build (proxy)")
    sub.add_parser("mirror", help="Mirror amd64-only La Suite images")
    sub.add_parser("bootstrap", help="Create Gitea orgs/repos; set up Lima registry")

    args = parser.parse_args()
    if args.verb is None:
        parser.print_help()
        sys.exit(0)

    # One closure per verb; each defers its import until actually invoked.
    def _up():
        from sunbeam.cluster import cmd_up
        cmd_up()

    def _down():
        from sunbeam.cluster import cmd_down
        cmd_down()

    def _status():
        from sunbeam.services import cmd_status
        cmd_status(args.target)

    def _apply():
        from sunbeam.manifests import cmd_apply
        cmd_apply()

    def _seed():
        from sunbeam.secrets import cmd_seed
        cmd_seed()

    def _verify():
        from sunbeam.secrets import cmd_verify
        cmd_verify()

    def _logs():
        from sunbeam.services import cmd_logs
        cmd_logs(args.target, follow=args.follow)

    def _get():
        from sunbeam.services import cmd_get
        cmd_get(args.target, output=args.output)

    def _restart():
        from sunbeam.services import cmd_restart
        cmd_restart(args.target)

    def _build():
        from sunbeam.images import cmd_build
        cmd_build(args.what)

    def _mirror():
        from sunbeam.images import cmd_mirror
        cmd_mirror()

    def _bootstrap():
        from sunbeam.gitea import cmd_bootstrap
        cmd_bootstrap()

    handlers = {
        "up": _up, "down": _down, "status": _status, "apply": _apply,
        "seed": _seed, "verify": _verify, "logs": _logs, "get": _get,
        "restart": _restart, "build": _build, "mirror": _mirror,
        "bootstrap": _bootstrap,
    }
    handler = handlers.get(args.verb)
    if handler is None:
        # Unreachable while subparsers constrain verbs, but kept as a guard.
        parser.print_help()
        sys.exit(1)
    handler()

300
sunbeam/cluster.py Normal file
View File

@@ -0,0 +1,300 @@
"""Cluster lifecycle — Lima VM, kubeconfig, Linkerd, TLS, core service readiness."""
import base64
import json
import shutil
import subprocess
import time
from pathlib import Path
from sunbeam.kube import (kube, kube_out, kube_ok, kube_apply,
kustomize_build, get_lima_ip, ensure_ns, create_secret, ns_exists)
from sunbeam.tools import run_tool, CACHE_DIR
from sunbeam.output import step, ok, warn, die
# Name of the Lima VM that hosts the k3s cluster.
LIMA_VM = "sunbeam"
# Local (gitignored) directory holding the mkcert wildcard TLS pair.
# NOTE(review): parents[3] assumes this package sits three levels below the
# directory containing infrastructure/; images.py uses parents[2] for a
# sibling path — verify both against the real checkout layout.
SECRETS_DIR = Path(__file__).parents[3] / "infrastructure" / "secrets" / "local"
# Built-in Gitea administrator account (shown in the final URL summary).
GITEA_ADMIN_USER = "gitea_admin"
# ---------------------------------------------------------------------------
# Lima VM
# ---------------------------------------------------------------------------
def _lima_status() -> str:
    """Return the Lima VM status, handling both JSON-array and NDJSON output."""
    proc = subprocess.run(["limactl", "list", "--json"],
                          capture_output=True, text=True)
    payload = proc.stdout.strip() if proc.returncode == 0 else ""
    if not payload:
        return "none"
    records: list[dict] = []
    try:
        decoded = json.loads(payload)
    except json.JSONDecodeError:
        # NDJSON form: one JSON object per line; ignore unparseable lines.
        for chunk in payload.splitlines():
            chunk = chunk.strip()
            if not chunk:
                continue
            try:
                records.append(json.loads(chunk))
            except json.JSONDecodeError:
                continue
    else:
        records = decoded if isinstance(decoded, list) else [decoded]
    # First record matching our VM wins; absent name field -> no match.
    return next((vm.get("status", "unknown")
                 for vm in records if vm.get("name") == LIMA_VM), "none")
def ensure_lima_vm():
    """Create or start the 'sunbeam' Lima VM.

    Three states: absent -> create from the k3s template with fixed sizing;
    Running -> no-op; anything else (Stopped, Broken, ...) -> plain start.
    """
    step("Lima VM...")
    status = _lima_status()
    if status == "none":
        ok("Creating 'sunbeam' (k3s 6 CPU / 12 GB / 60 GB)...")
        # vz + virtiofs + rosetta: Apple Virtualization framework with x86
        # translation — presumably so amd64-only images can run on Apple
        # Silicon hosts (see images.py mirroring); confirm on Intel Macs.
        subprocess.run(
            ["limactl", "start",
             "--name=sunbeam", "template:k3s",
             "--memory=12", "--cpus=6", "--disk=60",
             "--vm-type=vz", "--mount-type=virtiofs",
             "--rosetta"],
            check=True,
        )
    elif status == "Running":
        ok("Already running.")
    else:
        ok(f"Starting (current status: {status})...")
        subprocess.run(["limactl", "start", LIMA_VM], check=True)
# ---------------------------------------------------------------------------
# Kubeconfig
# ---------------------------------------------------------------------------
def merge_kubeconfig():
    """Register a 'sunbeam' context in the user's default kubeconfig.

    Extracts the CA / client cert / client key from the Lima-generated guest
    kubeconfig with yq, writes them to a private temp dir, and embeds them
    via `kubectl config set-*`.

    Improvements over the naive version: the temp dir is a fresh mode-0700
    `tempfile.mkdtemp` instead of a fixed, world-visible /tmp path, and an
    empty/failed yq extraction dies with a message instead of silently
    writing zero-byte cert files that kubectl then chokes on.
    """
    import tempfile  # local import: only needed here, keeps module deps flat

    step("Merging kubeconfig...")
    lima_kube = Path.home() / f".lima/{LIMA_VM}/copied-from-guest/kubeconfig.yaml"
    if not lima_kube.exists():
        die(f"Lima kubeconfig not found: {lima_kube}")
    tmp = Path(tempfile.mkdtemp(prefix="sunbeam-kube-"))
    try:
        for query, filename in [
            (".clusters[0].cluster.certificate-authority-data", "ca.crt"),
            (".users[0].user.client-certificate-data", "client.crt"),
            (".users[0].user.client-key-data", "client.key"),
        ]:
            r = subprocess.run(["yq", query, str(lima_kube)],
                               capture_output=True, text=True)
            b64 = r.stdout.strip() if r.returncode == 0 else ""
            # yq prints "null" for a missing key; treat that as failure too.
            if not b64 or b64 == "null":
                die(f"Could not extract {query!r} from {lima_kube} "
                    f"(is yq installed?)")
            (tmp / filename).write_bytes(base64.b64decode(b64))
        subprocess.run(
            ["kubectl", "config", "set-cluster", LIMA_VM,
             "--server=https://127.0.0.1:6443",
             f"--certificate-authority={tmp}/ca.crt", "--embed-certs=true"],
            check=True,
        )
        subprocess.run(
            ["kubectl", "config", "set-credentials", f"{LIMA_VM}-admin",
             f"--client-certificate={tmp}/client.crt",
             f"--client-key={tmp}/client.key", "--embed-certs=true"],
            check=True,
        )
        subprocess.run(
            ["kubectl", "config", "set-context", LIMA_VM,
             f"--cluster={LIMA_VM}", f"--user={LIMA_VM}-admin"],
            check=True,
        )
    finally:
        # Cert material must not linger on disk regardless of outcome.
        shutil.rmtree(tmp, ignore_errors=True)
    ok("Context 'sunbeam' ready.")
# ---------------------------------------------------------------------------
# Traefik
# ---------------------------------------------------------------------------
def disable_traefik():
    """Remove k3s's bundled Traefik ingress (replaced by the Pingora proxy).

    Deletes the HelmChart resources, removes k3s's auto-deploy manifest
    inside the VM, and writes /etc/rancher/k3s/config.yaml so k3s keeps
    Traefik disabled across restarts.
    """
    step("Traefik...")
    if kube_ok("get", "helmchart", "traefik", "-n", "kube-system"):
        ok("Removing (replaced by Pingora)...")
        kube("delete", "helmchart", "traefik", "traefik-crd",
             "-n", "kube-system", check=False)
        subprocess.run(
            ["limactl", "shell", LIMA_VM,
             "sudo", "rm", "-f",
             "/var/lib/rancher/k3s/server/manifests/traefik.yaml"],
            capture_output=True,
        )
        # Write k3s config so Traefik can never return after a k3s restart.
        # NOTE(review): rendering lost the original indentation — this write
        # is grouped under the removal branch here; confirm whether it should
        # run unconditionally (the end state is the same either way).
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "tee",
             "/etc/rancher/k3s/config.yaml"],
            input="disable:\n - traefik\n",
            text=True,
            capture_output=True,
        )
    ok("Done.")
# ---------------------------------------------------------------------------
# cert-manager
# ---------------------------------------------------------------------------
def ensure_cert_manager():
    """Install cert-manager v1.17.0, skipping when its namespace exists."""
    step("cert-manager...")
    if ns_exists("cert-manager"):
        ok("Already installed.")
        return
    ok("Installing...")
    kube("apply", "-f",
         "https://github.com/cert-manager/cert-manager/releases/download/v1.17.0/cert-manager.yaml")
    # Wait for all three control-plane deployments before moving on.
    deployments = ("cert-manager", "cert-manager-webhook", "cert-manager-cainjector")
    for name in deployments:
        kube("rollout", "status", f"deployment/{name}",
             "-n", "cert-manager", "--timeout=120s")
    ok("Installed.")
# ---------------------------------------------------------------------------
# Linkerd
# ---------------------------------------------------------------------------
def ensure_linkerd():
    """Install Gateway API CRDs, Linkerd CRDs, and the Linkerd control plane.

    Skips everything when the 'linkerd' namespace already exists.  Fix over
    the original: a failed `linkerd install` used to yield an empty string
    that was piped straight into `kubectl apply` (confusing downstream
    error); now we die immediately with the linkerd CLI's stderr.
    """
    step("Linkerd...")
    if ns_exists("linkerd"):
        ok("Already installed.")
        return
    ok("Installing Gateway API CRDs...")
    kube("apply", "--server-side", "-f",
         "https://github.com/kubernetes-sigs/gateway-api/releases/download/v1.4.0/standard-install.yaml")

    def _linkerd_manifest(*extra: str) -> str:
        # Render manifests with the linkerd CLI; die loudly on any failure.
        r = subprocess.run(["linkerd", "install", *extra],
                           capture_output=True, text=True)
        manifest = r.stdout.strip() if r.returncode == 0 else ""
        if not manifest:
            die(f"'linkerd install {' '.join(extra)}'.strip() produced no "
                f"manifest: {r.stderr.strip()}")
        return manifest

    ok("Installing Linkerd CRDs...")
    kube_apply(_linkerd_manifest("--crds"))
    ok("Installing Linkerd control plane...")
    kube_apply(_linkerd_manifest())
    for dep in ["linkerd-identity", "linkerd-destination", "linkerd-proxy-injector"]:
        kube("rollout", "status", f"deployment/{dep}",
             "-n", "linkerd", "--timeout=120s")
    ok("Installed.")
# ---------------------------------------------------------------------------
# TLS certificate
# ---------------------------------------------------------------------------
def ensure_tls_cert(domain: str | None = None) -> str:
    """Ensure a mkcert wildcard TLS pair exists for *domain*; return domain.

    When *domain* is None it is derived from the VM IP as <ip>.sslip.io.
    Fix over the original: regeneration is skipped only when BOTH tls.crt
    and tls.key exist — previously a stray cert without its key passed the
    check and broke TLS-secret creation later.
    """
    step("TLS certificate...")
    ip = get_lima_ip()
    if domain is None:
        domain = f"{ip}.sslip.io"
    cert = SECRETS_DIR / "tls.crt"
    key = SECRETS_DIR / "tls.key"
    if cert.exists() and key.exists():
        ok(f"Cert exists. Domain: {domain}")
        return domain
    ok(f"Generating wildcard cert for *.{domain}...")
    SECRETS_DIR.mkdir(parents=True, exist_ok=True)
    subprocess.run(["mkcert", f"*.{domain}"], cwd=SECRETS_DIR, check=True)
    # mkcert writes _wildcard.<domain>[-key].pem; normalize the names.
    for src, dst in [
        (f"_wildcard.{domain}.pem", "tls.crt"),
        (f"_wildcard.{domain}-key.pem", "tls.key"),
    ]:
        (SECRETS_DIR / src).rename(SECRETS_DIR / dst)
    ok(f"Cert generated. Domain: {domain}")
    return domain
# ---------------------------------------------------------------------------
# TLS secret
# ---------------------------------------------------------------------------
def ensure_tls_secret(domain: str):
    """Publish the mkcert pair as TLS secret 'pingora-tls' in ns 'ingress'.

    Idempotent: renders via --dry-run=client then server-side-applies.
    The *domain* parameter is currently unused here (the file paths are
    fixed); kept for interface symmetry with the other ensure_* steps.
    """
    step("TLS secret...")
    ensure_ns("ingress")
    manifest = kube_out(
        "create", "secret", "tls", "pingora-tls",
        f"--cert={SECRETS_DIR}/tls.crt",
        f"--key={SECRETS_DIR}/tls.key",
        "-n", "ingress",
        "--dry-run=client", "-o=yaml",
    )
    # kube_out returns "" on failure (e.g. missing files); skip apply then.
    if manifest:
        kube_apply(manifest)
    ok("Done.")
# ---------------------------------------------------------------------------
# Wait for core
# ---------------------------------------------------------------------------
def wait_for_core():
    """Best-effort wait for the core deployments to finish rolling out."""
    step("Waiting for core services...")
    core = (("data", "valkey"), ("ory", "kratos"), ("ory", "hydra"))
    for namespace, deployment in core:
        # check=False: a rollout timeout should not abort the bring-up.
        kube("rollout", "status", f"deployment/{deployment}",
             "-n", namespace, "--timeout=120s", check=False)
    ok("Core services ready.")
# ---------------------------------------------------------------------------
# Print URLs
# ---------------------------------------------------------------------------
def print_urls(domain: str, gitea_admin_pass: str = ""):
    """Print the post-bring-up summary: service URLs plus OpenBao access tips.

    Fix: the separator was `'' * 60` — an empty string repeated, printing a
    blank line (the original rule character was evidently lost to an
    encoding mangle).  Restored with a plain ASCII rule.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print(f" Stack is up. Domain: {domain}")
    print(rule)
    for name, url in [
        ("Auth", f"https://auth.{domain}/"),
        ("Docs", f"https://docs.{domain}/"),
        ("Meet", f"https://meet.{domain}/"),
        ("Drive", f"https://drive.{domain}/"),
        ("Chat", f"https://chat.{domain}/"),
        ("Mail", f"https://mail.{domain}/"),
        ("People", f"https://people.{domain}/"),
        ("Gitea", f"https://src.{domain}/ ({GITEA_ADMIN_USER} / {gitea_admin_pass})"),
    ]:
        print(f" {name:<10} {url}")
    print()
    print(" OpenBao UI:")
    # Constant strings: no f-prefix needed (the originals had none to format).
    print(" kubectl --context=sunbeam -n data port-forward svc/openbao 8200:8200")
    print(" http://localhost:8200")
    token_cmd = "kubectl --context=sunbeam -n data get secret openbao-keys -o jsonpath='{.data.root-token}' | base64 -d"
    print(f" token: {token_cmd}")
    print(f"{rule}\n")
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def cmd_up():
    """`sunbeam up`: full bring-up, ordered so each step feeds the next.

    VM -> kubeconfig -> strip Traefik -> cert-manager -> Linkerd -> TLS
    cert/secret -> manifests -> secret seeding -> VM registry auth ->
    Gitea bootstrap -> image mirroring -> readiness wait -> URL summary.
    """
    # Imported lazily (mirrors cli.py) so `import sunbeam.cluster` stays cheap.
    from sunbeam.manifests import cmd_apply
    from sunbeam.secrets import cmd_seed
    from sunbeam.gitea import cmd_bootstrap, setup_lima_vm_registry
    from sunbeam.images import cmd_mirror
    ensure_lima_vm()
    merge_kubeconfig()
    disable_traefik()
    ensure_cert_manager()
    ensure_linkerd()
    domain = ensure_tls_cert()
    ensure_tls_secret(domain)
    cmd_apply()
    creds = cmd_seed()
    # cmd_seed is expected to return a credentials dict; tolerate None/other.
    admin_pass = creds.get("gitea-admin-password", "") if isinstance(creds, dict) else ""
    setup_lima_vm_registry(domain, admin_pass)
    cmd_bootstrap()
    cmd_mirror()
    wait_for_core()
    print_urls(domain, admin_pass)
def cmd_down():
    """`sunbeam down`: stop the Lima VM.

    No check= on purpose: stopping an already-stopped VM is not fatal.
    """
    subprocess.run(["limactl", "stop", LIMA_VM])

205
sunbeam/gitea.py Normal file
View File

@@ -0,0 +1,205 @@
"""Gitea bootstrap — registry trust, admin setup, org creation."""
import base64
import json
import subprocess
import time
from sunbeam.kube import kube, kube_out
from sunbeam.output import step, ok, warn
# Lima VM hosting the k3s cluster (same VM cluster.py manages).
LIMA_VM = "sunbeam"
# Built-in Gitea administrator account.
GITEA_ADMIN_USER = "gitea_admin"
# NOTE(review): not referenced in this module as shown — confirm it is used
# elsewhere or by a later revision before removing.
GITEA_ADMIN_EMAIL = "gitea@local.domain"
# kubectl context args for the raw subprocess invocations below.
K8S_CTX = ["--context=sunbeam"]
def _capture_out(cmd, *, default=""):
    """Run *cmd*; return its stripped stdout on success, *default* otherwise."""
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        return default
    return proc.stdout.strip()
def _run(cmd, *, check=True, input=None, capture=False, cwd=None):
    """subprocess.run wrapper: text mode unless *input* is bytes."""
    as_text = not isinstance(input, bytes)
    return subprocess.run(cmd, check=check, text=as_text, input=input,
                          capture_output=capture, cwd=cwd)
def _kube_ok(*args):
    """True when `kubectl --context=sunbeam <args>` exits 0 (output discarded)."""
    proc = subprocess.run(["kubectl", *K8S_CTX, *args], capture_output=True)
    return proc.returncode == 0
def setup_lima_vm_registry(domain: str, gitea_admin_pass: str = ""):
    """Install mkcert root CA in the Lima VM and configure k3s to auth with Gitea.
    Restarts k3s if either configuration changes so pods don't fight TLS errors
    or get unauthenticated pulls on the first deploy.
    """
    step("Configuring Lima VM registry trust...")
    changed = False
    # Install mkcert root CA so containerd trusts our wildcard TLS cert
    caroot = _capture_out(["mkcert", "-CAROOT"])
    if caroot:
        from pathlib import Path
        ca_pem = Path(caroot) / "rootCA.pem"
        if ca_pem.exists():
            # Presence of the installed file is the idempotency marker.
            already = subprocess.run(
                ["limactl", "shell", LIMA_VM, "test", "-f",
                 "/usr/local/share/ca-certificates/mkcert-root.crt"],
                capture_output=True,
            ).returncode == 0
            if not already:
                _run(["limactl", "copy", str(ca_pem),
                      f"{LIMA_VM}:/tmp/mkcert-root.pem"])
                _run(["limactl", "shell", LIMA_VM, "sudo", "cp",
                      "/tmp/mkcert-root.pem",
                      "/usr/local/share/ca-certificates/mkcert-root.crt"])
                _run(["limactl", "shell", LIMA_VM, "sudo",
                      "update-ca-certificates"])
                ok("mkcert CA installed in VM.")
                changed = True
            else:
                ok("mkcert CA already installed.")
    # Write k3s registries.yaml (auth for Gitea container registry)
    registry_host = f"src.{domain}"
    # NOTE(review): YAML nesting below is reconstructed (rendering collapsed
    # whitespace); verify the indentation matches what k3s expects.  The
    # admin password is stored verbatim in this root-owned file — confirm
    # that is acceptable for the local dev threat model.
    want = (
        f'configs:\n'
        f'  "{registry_host}":\n'
        f'    auth:\n'
        f'      username: "{GITEA_ADMIN_USER}"\n'
        f'      password: "{gitea_admin_pass}"\n'
    )
    existing = _capture_out(["limactl", "shell", LIMA_VM,
                             "sudo", "cat",
                             "/etc/rancher/k3s/registries.yaml"])
    if existing.strip() != want.strip():
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "tee",
             "/etc/rancher/k3s/registries.yaml"],
            input=want, text=True, capture_output=True,
        )
        ok(f"Registry config written for {registry_host}.")
        changed = True
    else:
        ok("Registry config up to date.")
    if changed:
        ok("Restarting k3s to apply changes...")
        subprocess.run(
            ["limactl", "shell", LIMA_VM, "sudo", "systemctl", "restart",
             "k3s"],
            capture_output=True,
        )
        # Wait for API server to come back
        for _ in range(40):
            if _kube_ok("get", "nodes"):
                break
            time.sleep(3)
        # Extra settle time -- pods take a moment to start terminating/restarting
        time.sleep(15)
        ok("k3s restarted.")
def cmd_bootstrap(domain: str = "", gitea_admin_pass: str = ""):
    """Ensure Gitea admin has a known password and create the studio/internal orgs."""
    # Derive defaults from the live cluster when the caller supplied nothing.
    if not domain:
        from sunbeam.kube import get_lima_ip
        ip = get_lima_ip()
        domain = f"{ip}.sslip.io"
    if not gitea_admin_pass:
        # Password was seeded earlier into this secret by the seed step.
        b64 = kube_out("-n", "devtools", "get", "secret",
                       "gitea-admin-credentials",
                       "-o=jsonpath={.data.password}")
        if b64:
            gitea_admin_pass = base64.b64decode(b64).decode()
    step("Bootstrapping Gitea...")
    # Wait for a Running + Ready Gitea pod (60 polls x 3 s = 3 min budget).
    pod = ""
    for _ in range(60):
        candidate = kube_out(
            "-n", "devtools", "get", "pods",
            "-l=app.kubernetes.io/name=gitea",
            "--field-selector=status.phase=Running",
            "-o=jsonpath={.items[0].metadata.name}",
        )
        if candidate:
            # Running is not Ready — check the container status explicitly.
            ready = kube_out("-n", "devtools", "get", "pod", candidate,
                             "-o=jsonpath={.status.containerStatuses[0].ready}")
            if ready == "true":
                pod = candidate
                break
        time.sleep(3)
    if not pod:
        warn("Gitea pod not ready after 3 min -- skipping bootstrap.")
        return
    def gitea_exec(*args):
        # Run a command inside the 'gitea' container of the ready pod.
        return subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "devtools", "exec", pod, "-c",
             "gitea", "--"] + list(args),
            capture_output=True, text=True,
        )
    # Ensure admin has the generated password
    # NOTE(review): the password travels on the exec command line (visible in
    # `ps` inside the pod for the call's duration) — confirm acceptable.
    r = gitea_exec("gitea", "admin", "user", "change-password",
                   "--username", GITEA_ADMIN_USER, "--password",
                   gitea_admin_pass)
    # Some gitea versions exit non-zero yet still change the password; accept
    # any output mentioning "password" as success.
    if r.returncode == 0 or "password" in (r.stdout + r.stderr).lower():
        ok(f"Admin '{GITEA_ADMIN_USER}' password set.")
    else:
        warn(f"change-password: {r.stderr.strip()}")
    # Clear must_change_password via Postgres
    pg_pod = kube_out("-n", "data", "get", "pods",
                      "-l=cnpg.io/cluster=postgres,role=primary",
                      "-o=jsonpath={.items[0].metadata.name}")
    if pg_pod:
        # Direct SQL because the CLI offers no flag to clear this flag; the
        # interpolated value is a module constant, not user input.
        kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
             "psql", "-U", "postgres", "-d", "gitea_db", "-c",
             f'UPDATE "user" SET must_change_password = false'
             f" WHERE lower_name = '{GITEA_ADMIN_USER.lower()}';",
             check=False)
        ok("Cleared must-change-password flag.")
    else:
        warn("Postgres pod not found -- must-change-password may block API "
             "calls.")
    def api(method, path, data=None):
        # Call the Gitea REST API from inside the pod via curl + basic auth.
        args = [
            "curl", "-s", "-X", method,
            f"http://localhost:3000/api/v1{path}",
            "-H", "Content-Type: application/json",
            "-u", f"{GITEA_ADMIN_USER}:{gitea_admin_pass}",
        ]
        if data:
            args += ["-d", json.dumps(data)]
        r = gitea_exec(*args)
        try:
            return json.loads(r.stdout)
        except json.JSONDecodeError:
            # Non-JSON body (error page, empty) -> empty dict sentinel.
            return {}
    for org_name, visibility, desc in [
        ("studio", "public", "Public source code"),
        ("internal", "private", "Internal tools and services"),
    ]:
        result = api("POST", "/orgs", {
            "username": org_name,
            "visibility": visibility,
            "description": desc,
        })
        if "id" in result:
            ok(f"Created org '{org_name}'.")
        elif "already" in result.get("message", "").lower():
            ok(f"Org '{org_name}' already exists.")
        else:
            warn(f"Org '{org_name}': {result.get('message', result)}")
    ok(f"Gitea ready -- https://src.{domain} ({GITEA_ADMIN_USER} / <from "
       f"openbao>)")

326
sunbeam/images.py Normal file
View File

@@ -0,0 +1,326 @@
"""Image mirroring — patch amd64-only images + push to Gitea registry."""
import base64
import shutil
import subprocess
import sys
from pathlib import Path
from sunbeam.kube import kube, kube_out, get_lima_ip
from sunbeam.output import step, ok, warn, die
# Lima VM running k3s/containerd (where ctr operates on the content store).
LIMA_VM = "sunbeam"
# Separate Lima VM providing Docker/buildx for `sunbeam build`.
LIMA_DOCKER_VM = "docker"
# Gitea account used for registry auth.
GITEA_ADMIN_USER = "gitea_admin"
# Namespaces whose image-pull-error pods cmd_mirror clears after mirroring.
MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "ory", "storage",
              "vault-secrets-operator"]
# (source ref, Gitea org, repo, tag) for images published amd64-only upstream.
AMD64_ONLY_IMAGES = [
    ("docker.io/lasuite/people-backend:latest", "studio", "people-backend", "latest"),
    ("docker.io/lasuite/people-frontend:latest", "studio", "people-frontend", "latest"),
]
_MIRROR_SCRIPT_BODY = r'''
import json, hashlib, io, tarfile, os, subprocess, urllib.request
CONTENT_STORE = (
"/var/lib/rancher/k3s/agent/containerd"
"/io.containerd.content.v1.content/blobs/sha256"
)
def blob_path(h):
return os.path.join(CONTENT_STORE, h)
def blob_exists(h):
return os.path.exists(blob_path(h))
def read_blob(h):
with open(blob_path(h), "rb") as f:
return f.read()
def add_tar_entry(tar, name, data):
info = tarfile.TarInfo(name=name)
info.size = len(data)
tar.addfile(info, io.BytesIO(data))
def get_image_digest(ref):
r = subprocess.run(
["ctr", "-n", "k8s.io", "images", "ls", "name==" + ref],
capture_output=True, text=True,
)
for line in r.stdout.splitlines():
if ref in line:
for part in line.split():
if part.startswith("sha256:"):
return part[7:]
return None
def fetch_index_from_registry(repo, tag):
url = (
"https://auth.docker.io/token"
f"?service=registry.docker.io&scope=repository:{repo}:pull"
)
with urllib.request.urlopen(url) as resp:
token = json.loads(resp.read())["token"]
accept = ",".join([
"application/vnd.oci.image.index.v1+json",
"application/vnd.docker.distribution.manifest.list.v2+json",
])
req = urllib.request.Request(
f"https://registry-1.docker.io/v2/{repo}/manifests/{tag}",
headers={"Authorization": f"Bearer {token}", "Accept": accept},
)
with urllib.request.urlopen(req) as resp:
return json.loads(resp.read())
def make_oci_tar(ref, new_index_bytes, amd64_manifest_bytes):
ix_hex = hashlib.sha256(new_index_bytes).hexdigest()
amd64_hex = json.loads(new_index_bytes)["manifests"][0]["digest"].replace("sha256:", "")
layout = json.dumps({"imageLayoutVersion": "1.0.0"}).encode()
top = json.dumps({
"schemaVersion": 2,
"mediaType": "application/vnd.oci.image.index.v1+json",
"manifests": [{
"mediaType": "application/vnd.oci.image.index.v1+json",
"digest": f"sha256:{ix_hex}",
"size": len(new_index_bytes),
"annotations": {"org.opencontainers.image.ref.name": ref},
}],
}, separators=(",", ":")).encode()
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:") as tar:
add_tar_entry(tar, "oci-layout", layout)
add_tar_entry(tar, "index.json", top)
add_tar_entry(tar, f"blobs/sha256/{ix_hex}", new_index_bytes)
add_tar_entry(tar, f"blobs/sha256/{amd64_hex}", amd64_manifest_bytes)
return buf.getvalue()
def import_ref(ref, tar_bytes):
subprocess.run(["ctr", "-n", "k8s.io", "images", "rm", ref], capture_output=True)
r = subprocess.run(
["ctr", "-n", "k8s.io", "images", "import", "--all-platforms", "-"],
input=tar_bytes, capture_output=True,
)
if r.returncode:
print(f" import failed: {r.stderr.decode()}")
return False
subprocess.run(
["ctr", "-n", "k8s.io", "images", "label", ref, "io.cri-containerd.image=managed"],
capture_output=True,
)
return True
def process(src, tgt, user, pwd):
print(f" {src}")
# Pull by tag — may fail on arm64-only images but still puts the index blob in the store
subprocess.run(["ctr", "-n", "k8s.io", "images", "pull", src], capture_output=True)
ix_hex = get_image_digest(src)
if ix_hex and blob_exists(ix_hex):
index = json.loads(read_blob(ix_hex))
else:
print(" index not in content store — fetching from docker.io...")
no_prefix = src.replace("docker.io/", "")
parts = no_prefix.split(":", 1)
repo, tag = parts[0], (parts[1] if len(parts) > 1 else "latest")
index = fetch_index_from_registry(repo, tag)
amd64 = next(
(m for m in index.get("manifests", [])
if m.get("platform", {}).get("architecture") == "amd64"
and m.get("platform", {}).get("os") == "linux"),
None,
)
if not amd64:
print(" skip: no linux/amd64 entry in index")
return
amd64_hex = amd64["digest"].replace("sha256:", "")
if not blob_exists(amd64_hex):
print(" pulling amd64 manifest + layers by digest...")
repo_base = src.rsplit(":", 1)[0]
subprocess.run(
["ctr", "-n", "k8s.io", "images", "pull",
f"{repo_base}@sha256:{amd64_hex}"],
capture_output=True,
)
if not blob_exists(amd64_hex):
print(" failed: amd64 manifest blob missing after pull")
return
amd64_bytes = read_blob(amd64_hex)
# Patched index: keep amd64 + add arm64 alias pointing at same manifest
arm64 = {
"mediaType": amd64["mediaType"],
"digest": amd64["digest"],
"size": amd64["size"],
"platform": {"architecture": "arm64", "os": "linux"},
}
new_index = dict(index)
new_index["manifests"] = [amd64, arm64]
new_index_bytes = json.dumps(new_index, separators=(",", ":")).encode()
# Import with Gitea target name
if not import_ref(tgt, make_oci_tar(tgt, new_index_bytes, amd64_bytes)):
return
# Also patch the original source ref so pods still using docker.io name work
import_ref(src, make_oci_tar(src, new_index_bytes, amd64_bytes))
# Push to Gitea registry
print(f" pushing to registry...")
r = subprocess.run(
["ctr", "-n", "k8s.io", "images", "push",
"--user", f"{user}:{pwd}", tgt],
capture_output=True, text=True,
)
status = "OK" if r.returncode == 0 else f"PUSH FAILED: {r.stderr.strip()}"
print(f" {status}")
for _src, _tgt in TARGETS:
process(_src, _tgt, USER, PASS)
'''
def _capture_out(cmd, *, default=""):
    """Run *cmd*; return its stripped stdout on success, *default* otherwise."""
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        return default
    return proc.stdout.strip()
def _run(cmd, *, check=True, input=None, capture=False, cwd=None):
    """subprocess.run wrapper: text mode unless *input* is bytes."""
    as_text = not isinstance(input, bytes)
    return subprocess.run(cmd, check=check, text=as_text, input=input,
                          capture_output=capture, cwd=cwd)
def cmd_mirror(domain: str = "", gitea_admin_pass: str = ""):
    """Patch amd64-only images with an arm64 alias and push to Gitea registry."""
    # Derive defaults from the cluster when not supplied.
    if not domain:
        ip = get_lima_ip()
        domain = f"{ip}.sslip.io"
    if not gitea_admin_pass:
        b64 = kube_out("-n", "devtools", "get", "secret",
                       "gitea-admin-credentials", "-o=jsonpath={.data.password}")
        if b64:
            gitea_admin_pass = base64.b64decode(b64).decode()
    step("Mirroring amd64-only images to Gitea registry...")
    registry = f"src.{domain}"
    targets = [
        (src, f"{registry}/{org}/{repo}:{tag}")
        for src, org, repo, tag in AMD64_ONLY_IMAGES
    ]
    # The in-VM script is self-contained Python; inject its inputs as module
    # constants via repr() so quoting is always safe.
    # NOTE(review): the password is embedded in the `python3 -c` argument and
    # thus visible in the VM's process list while it runs — confirm this is
    # acceptable for the local dev threat model.
    header = (
        f"TARGETS = {repr(targets)}\n"
        f"USER = {repr(GITEA_ADMIN_USER)}\n"
        f"PASS = {repr(gitea_admin_pass)}\n"
    )
    script = header + _MIRROR_SCRIPT_BODY
    _run(["limactl", "shell", LIMA_VM, "sudo", "python3", "-c", script])
    # Delete any pods stuck in image-pull error states
    ok("Clearing image-pull-error pods...")
    error_reasons = {"ImagePullBackOff", "ErrImagePull", "ErrImageNeverPull"}
    for ns in MANAGED_NS:
        # name:waiting-reason per line; reason empty when not waiting.
        pods_raw = kube_out(
            "-n", ns, "get", "pods",
            "-o=jsonpath={range .items[*]}"
            "{.metadata.name}:{.status.containerStatuses[0].state.waiting.reason}\\n"
            "{end}",
        )
        for line in pods_raw.splitlines():
            if not line:
                continue
            parts = line.split(":", 1)
            if len(parts) == 2 and parts[1] in error_reasons:
                kube("delete", "pod", parts[0], "-n", ns,
                     "--ignore-not-found", check=False)
    ok("Done.")
def _trust_registry_in_docker_vm(registry: str):
    """Install the mkcert CA into the Lima Docker VM's per-registry cert dir.
    The Lima Docker VM runs rootless Docker, which reads custom CA certs from
    ~/.config/docker/certs.d/<registry>/ca.crt (not /etc/docker/certs.d/).
    No daemon restart required -- Docker reads the file per-connection.
    """
    caroot = _capture_out(["mkcert", "-CAROOT"])
    if not caroot:
        warn("mkcert -CAROOT returned nothing -- skipping Docker CA install.")
        return
    ca_pem = Path(caroot) / "rootCA.pem"
    if not ca_pem.exists():
        warn(f"mkcert CA not found at {ca_pem} -- skipping Docker CA install.")
        return
    # Copy to /tmp first; the in-VM shell then moves it into the user config.
    _run(["limactl", "copy", str(ca_pem), f"{LIMA_DOCKER_VM}:/tmp/registry-ca.pem"])
    _run(["limactl", "shell", LIMA_DOCKER_VM, "--", "sh", "-c",
          f"mkdir -p ~/.config/docker/certs.d/{registry} && "
          f"cp /tmp/registry-ca.pem ~/.config/docker/certs.d/{registry}/ca.crt"])
    ok(f"mkcert CA installed in Docker VM for {registry}.")
def cmd_build(what: str):
    """Build and push an image. Currently only supports 'proxy'."""
    if what != "proxy":
        die(f"Unknown build target: {what}")
    ip = get_lima_ip()
    domain = f"{ip}.sslip.io"
    # Registry password comes from the secret written by the seed step.
    b64 = kube_out("-n", "devtools", "get", "secret",
                   "gitea-admin-credentials", "-o=jsonpath={.data.password}")
    if not b64:
        die("gitea-admin-credentials secret not found -- run seed first.")
    admin_pass = base64.b64decode(b64).decode()
    if not shutil.which("docker"):
        die("docker not found -- is the Lima docker VM running?")
    # Proxy source lives adjacent to the infrastructure repo
    # NOTE(review): parents[2] here vs parents[3] in cluster/manifests —
    # verify the intended directory depth against the checkout layout.
    proxy_dir = Path(__file__).resolve().parents[2] / "proxy"
    if not proxy_dir.is_dir():
        die(f"Proxy source not found at {proxy_dir}")
    registry = f"src.{domain}"
    image = f"{registry}/studio/sunbeam-proxy:latest"
    step(f"Building sunbeam-proxy -> {image} ...")
    # Ensure the Lima Docker VM trusts our mkcert CA for this registry.
    _trust_registry_in_docker_vm(registry)
    # Authenticate Docker with Gitea before the build so --push succeeds.
    ok("Logging in to Gitea registry...")
    # --password-stdin keeps the password off the docker command line.
    r = subprocess.run(
        ["docker", "login", registry,
         "--username", GITEA_ADMIN_USER, "--password-stdin"],
        input=admin_pass, text=True, capture_output=True,
    )
    if r.returncode != 0:
        die(f"docker login failed:\n{r.stderr.strip()}")
    ok("Building image (linux/arm64, push)...")
    _run(["docker", "buildx", "build",
          "--platform", "linux/arm64",
          "--push",
          "-t", image,
          str(proxy_dir)])
    ok(f"Pushed {image}")
    # Apply manifests so the Deployment spec reflects the Gitea image ref.
    from sunbeam.manifests import cmd_apply
    cmd_apply()
    # Roll the pingora pod -- imagePullPolicy: Always ensures it pulls fresh.
    ok("Rolling pingora deployment...")
    kube("rollout", "restart", "deployment/pingora", "-n", "ingress")
    kube("rollout", "status", "deployment/pingora", "-n", "ingress",
         "--timeout=120s")
    ok("Pingora redeployed.")

105
sunbeam/kube.py Normal file
View File

@@ -0,0 +1,105 @@
"""Kubernetes interface — kubectl/kustomize wrappers, domain substitution, target parsing."""
import subprocess
from pathlib import Path
from sunbeam.tools import run_tool, CACHE_DIR
from sunbeam.output import die
def parse_target(s: str | None) -> tuple[str | None, str | None]:
"""Parse 'ns/name' -> ('ns', 'name'), 'ns' -> ('ns', None), None -> (None, None)."""
if s is None:
return (None, None)
parts = s.split("/")
if len(parts) == 1:
return (parts[0], None)
if len(parts) == 2:
return (parts[0], parts[1])
raise ValueError(f"Invalid target {s!r}: expected 'namespace' or 'namespace/name'")
def domain_replace(text: str, domain: str) -> str:
    """Substitute every DOMAIN_SUFFIX placeholder in *text* with *domain*."""
    # split/join is equivalent to str.replace over all occurrences.
    return domain.join(text.split("DOMAIN_SUFFIX"))
def get_lima_ip() -> str:
    """Get the socket_vmnet IP of the Lima sunbeam VM (192.168.105.x)."""
    shown = subprocess.run(
        ["limactl", "shell", "sunbeam", "ip", "-4", "addr", "show", "eth1"],
        capture_output=True, text=True,
    )
    for raw in shown.stdout.splitlines():
        if "inet " not in raw:
            continue
        # "inet A.B.C.D/prefix ..." -> take the address, drop the prefix.
        return raw.strip().split()[1].split("/")[0]
    # fallback: second IP from hostname -I
    fallback = subprocess.run(
        ["limactl", "shell", "sunbeam", "hostname", "-I"],
        capture_output=True, text=True,
    )
    addrs = fallback.stdout.strip().split()
    if len(addrs) >= 2:
        return addrs[-1]
    return addrs[0] if addrs else ""
def kube(*args, input=None, check=True) -> subprocess.CompletedProcess:
    """Run kubectl against the sunbeam context, streaming output through."""
    # Text mode unless the caller pipes raw bytes in.
    as_text = not isinstance(input, bytes)
    return run_tool("kubectl", "--context=sunbeam", *args,
                    capture_output=False, check=check,
                    input=input, text=as_text)
def kube_out(*args) -> str:
    """Run kubectl and return its trimmed stdout, or "" when it failed."""
    result = run_tool("kubectl", "--context=sunbeam", *args,
                      capture_output=True, text=True, check=False)
    if result.returncode != 0:
        return ""
    return result.stdout.strip()
def kube_ok(*args) -> bool:
    """True when the kubectl invocation exits with status 0."""
    proc = run_tool("kubectl", "--context=sunbeam", *args,
                    capture_output=True, check=False)
    return proc.returncode == 0
def kube_apply(manifest: str, *, server_side: bool = True) -> None:
    """Pipe manifest YAML into `kubectl apply -f -`."""
    cmd = ["apply", "-f", "-"]
    if server_side:
        # --force-conflicts: take ownership of fields other managers hold.
        cmd.extend(["--server-side", "--force-conflicts"])
    kube(*cmd, input=manifest)
def ns_exists(ns: str) -> bool:
    """True when namespace *ns* exists in the sunbeam cluster."""
    return kube_ok("get", "namespace", ns)
def ensure_ns(ns: str) -> None:
    """Create namespace *ns* idempotently (dry-run render + apply)."""
    manifest = kube_out("create", "namespace", ns, "--dry-run=client", "-o=yaml")
    # kube_out returns "" on failure; nothing to apply then.
    if manifest:
        kube_apply(manifest)
def create_secret(ns: str, name: str, **literals) -> None:
    """Create or update a K8s generic secret idempotently via server-side apply."""
    cmd = ["create", "secret", "generic", name, f"-n={ns}"]
    cmd.extend(f"--from-literal={k}={v}" for k, v in literals.items())
    cmd.extend(["--dry-run=client", "-o=yaml"])
    rendered = kube_out(*cmd)
    if not rendered:
        return
    # Dedicated field manager so repeated seeds own (and may update) the fields.
    kube("apply", "--server-side", "--force-conflicts",
         "--field-manager=sunbeam", "-f", "-", input=rendered)
def kustomize_build(overlay: Path, domain: str) -> str:
    """Run kustomize build --enable-helm and apply domain substitution."""
    result = run_tool(
        "kustomize", "build", "--enable-helm", str(overlay),
        capture_output=True, text=True, check=True,
    )
    rendered = domain_replace(result.stdout, domain)
    # Strip literal "annotations: null" lines that Helm-rendered charts emit.
    rendered = rendered.replace("\n annotations: null", "")
    return rendered

58
sunbeam/manifests.py Normal file
View File

@@ -0,0 +1,58 @@
"""Manifest build + apply — kustomize overlay with domain substitution."""
from pathlib import Path
from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, kustomize_build, get_lima_ip
from sunbeam.output import step, ok, warn
# Path to the repo's infrastructure/ tree, resolved relative to this file.
# NOTE(review): parents[3] assumes this package sits three levels below the
# repo root -- confirm against the actual checkout layout.
REPO_ROOT = Path(__file__).parents[3] / "infrastructure"
# Namespaces owned by sunbeam; cleanup and pruning iterate over these.
MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "ory", "storage",
              "vault-secrets-operator"]
def pre_apply_cleanup():
    """Delete immutable resources that must be re-created on each apply.

    Also prunes VaultStaticSecrets that share a name with a VaultDynamicSecret --
    kubectl apply doesn't delete the old resource when a manifest switches kinds,
    and VSO refuses to overwrite a secret owned by a different resource type.
    """
    test_suffixes = ("-test-connection", "-server-test", "-test")
    ok("Cleaning up immutable Jobs and test Pods...")
    for namespace in MANAGED_NS:
        kube("delete", "jobs", "--all", "-n", namespace, "--ignore-not-found", check=False)
        # List every pod regardless of phase: CrashLoopBackOff pods still report
        # phase=Running, so a phase!=Running filter would silently miss them.
        pod_names = kube_out("get", "pods", "-n", namespace,
                             "-o=jsonpath={.items[*].metadata.name}").split()
        for pod_name in pod_names:
            if not pod_name.endswith(test_suffixes):
                continue
            kube("delete", "pod", pod_name, "-n", namespace, "--ignore-not-found", check=False)
    # A manifest transition from VSS -> VDS leaves the old VSS behind after
    # apply; VSO then errors "not the owner" because the K8s secret's ownerRef
    # still points at the stale VSS. Delete any VSS shadowed by a same-named VDS.
    ok("Pruning stale VaultStaticSecrets superseded by VaultDynamicSecrets...")
    for namespace in MANAGED_NS:
        def _names(kind):
            # Names of the given CRD kind in this namespace, as a set.
            return set(kube_out(
                "get", kind, "-n", namespace,
                "-o=jsonpath={.items[*].metadata.name}", "--ignore-not-found",
            ).split())
        for stale in _names("vaultstaticsecret") & _names("vaultdynamicsecret"):
            ok(f" deleting stale VaultStaticSecret {namespace}/{stale}")
            kube("delete", "vaultstaticsecret", stale, "-n", namespace,
                 "--ignore-not-found", check=False)
def cmd_apply():
    """Get Lima IP, build domain, kustomize_build, kube_apply."""
    vm_ip = get_lima_ip()
    sslip_domain = f"{vm_ip}.sslip.io"
    step(f"Applying manifests (domain: {sslip_domain})...")
    pre_apply_cleanup()
    rendered = kustomize_build(REPO_ROOT / "overlays" / "local", sslip_domain)
    kube("apply", "--server-side", "--force-conflicts", "-f", "-", input=rendered)
    ok("Applied.")

47
sunbeam/output.py Normal file
View File

@@ -0,0 +1,47 @@
"""Output helpers — step/ok/warn/die + aligned text table."""
import sys
def step(msg: str) -> None:
    """Print a step header to stdout, flushed immediately."""
    sys.stdout.write(f"\n==> {msg}\n")
    sys.stdout.flush()
def ok(msg: str) -> None:
    """Print a success/info line to stdout, flushed immediately."""
    sys.stdout.write(f" {msg}\n")
    sys.stdout.flush()
def warn(msg: str) -> None:
    """Print a warning to stderr, flushed immediately."""
    sys.stderr.write(f" WARN: {msg}\n")
    sys.stderr.flush()
def die(msg: str) -> None:
    """Print an error to stderr and exit the process with status 1."""
    sys.stderr.write(f"\nERROR: {msg}\n")
    sys.stderr.flush()
    sys.exit(1)
def table(rows: list[list[str]], headers: list[str]) -> str:
    """Return an aligned text table. Columns padded to max width."""
    if not headers:
        return ""
    ncols = len(headers)
    # Each column is as wide as its widest cell, header included; cells
    # beyond the header count are ignored.
    widths = [len(h) for h in headers]
    for row in rows:
        for idx, cell in enumerate(row[:ncols]):
            widths[idx] = max(widths[idx], len(cell))

    def render(cells):
        # Pad every cell to its column width and join on a single space.
        return " ".join(c.ljust(w) for c, w in zip(cells, widths))

    lines = [render(headers), " ".join("-" * w for w in widths)]
    for row in rows:
        padded = [row[i] if i < len(row) else "" for i in range(ncols)]
        lines.append(render(padded))
    return "\n".join(lines)

602
sunbeam/secrets.py Normal file
View File

@@ -0,0 +1,602 @@
"""Secrets management — OpenBao KV seeding, DB engine config, VSO verification."""
import base64
import json
import secrets as _secrets
import subprocess
import time
from pathlib import Path
from sunbeam.kube import kube, kube_out, kube_ok, kube_apply, ensure_ns, create_secret
from sunbeam.output import step, ok, warn, die
# Name of the Lima VM hosting the local cluster.
LIMA_VM = "sunbeam"
# Fixed admin username seeded into Gitea.
GITEA_ADMIN_USER = "gitea_admin"
# PostgreSQL service users; their passwords are managed by OpenBao static roles.
PG_USERS = [
    "kratos", "hydra", "gitea", "hive",
    "docs", "meet", "drive", "messages", "conversations",
    "people", "find",
]
# kubectl context arguments used by direct subprocess invocations in this module.
K8S_CTX = ["--context=sunbeam"]
# ---------------------------------------------------------------------------
# OpenBao KV seeding
# ---------------------------------------------------------------------------
def _seed_openbao() -> dict:
    """Initialize/unseal OpenBao, generate/read credentials idempotently, configure VSO auth.
    Returns a dict of all generated credentials. Values are read from existing
    OpenBao KV entries when present -- re-running never rotates credentials.
    """
    # Locate the OpenBao server pod in the data namespace.
    ob_pod = kube_out(
        "-n", "data", "get", "pods",
        "-l=app.kubernetes.io/name=openbao,component=server",
        "-o=jsonpath={.items[0].metadata.name}",
    )
    if not ob_pod:
        ok("OpenBao pod not found -- skipping.")
        return {}
    ok(f"OpenBao ({ob_pod})...")
    # Best-effort wait for the pod to reach Running before exec'ing into it.
    kube("wait", "-n", "data", f"pod/{ob_pod}",
        "--for=jsonpath={.status.phase}=Running", "--timeout=120s", check=False)
    # Exec a shell command in the openbao container; returns stripped stdout.
    def bao(cmd):
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao",
            "--", "sh", "-c", cmd],
            capture_output=True, text=True,
        )
        return r.stdout.strip()
    # Parse `bao status` JSON; returns {} when the CLI errors or output is junk.
    def bao_status():
        out = bao("bao status -format=json 2>/dev/null || echo '{}'")
        try:
            return json.loads(out)
        except json.JSONDecodeError:
            return {}
    unseal_key = ""
    root_token = ""
    status = bao_status()
    already_initialized = status.get("initialized", False)
    if not already_initialized:
        # Cross-check the stored keys secret -- an unreachable/sealed bao can
        # report uninitialized even though keys were already generated.
        existing_key = kube_out("-n", "data", "get", "secret", "openbao-keys",
            "-o=jsonpath={.data.key}")
        already_initialized = bool(existing_key)
    if not already_initialized:
        ok("Initializing OpenBao...")
        init_json = bao("bao operator init -key-shares=1 -key-threshold=1 -format=json 2>/dev/null || echo '{}'")
        try:
            init = json.loads(init_json)
            unseal_key = init["unseal_keys_b64"][0]
            root_token = init["root_token"]
            # Persist unseal key + root token so later runs can reuse them.
            create_secret("data", "openbao-keys",
                key=unseal_key, **{"root-token": root_token})
            ok("Initialized -- keys stored in secret/openbao-keys.")
        except (json.JSONDecodeError, KeyError):
            # Init failed and no keys are recoverable: wipe storage so the next
            # run can re-initialize from scratch (acceptable for local dev only).
            warn("Init failed -- resetting OpenBao storage for local dev...")
            kube("delete", "pvc", "data-openbao-0", "-n", "data", "--ignore-not-found", check=False)
            kube("delete", "pod", ob_pod, "-n", "data", "--ignore-not-found", check=False)
            warn("OpenBao storage reset. Run --seed again after the pod restarts.")
            return {}
    else:
        ok("Already initialized.")
        # Recover the unseal key and root token from the stored K8s secret.
        existing_key = kube_out("-n", "data", "get", "secret", "openbao-keys",
            "-o=jsonpath={.data.key}")
        if existing_key:
            unseal_key = base64.b64decode(existing_key).decode()
        root_token_enc = kube_out("-n", "data", "get", "secret", "openbao-keys",
            "-o=jsonpath={.data.root-token}")
        if root_token_enc:
            root_token = base64.b64decode(root_token_enc).decode()
    if bao_status().get("sealed", False) and unseal_key:
        ok("Unsealing...")
        bao(f"bao operator unseal '{unseal_key}' 2>/dev/null")
    if not root_token:
        warn("No root token available -- skipping KV seeding.")
        return {}
    # Read-or-generate helper: preserves existing KV values; only generates missing ones.
    def get_or_create(path, **fields):
        raw = bao(
            f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
            f"bao kv get -format=json secret/{path} 2>/dev/null || echo '{{}}'"
        )
        existing = {}
        try:
            existing = json.loads(raw).get("data", {}).get("data", {})
        except (json.JSONDecodeError, AttributeError):
            pass
        result = {}
        for key, default_fn in fields.items():
            # Falsy/absent existing values are (re)generated via the factory.
            result[key] = existing.get(key) or default_fn()
        return result
    # Crypto-random URL-safe token factory for newly generated credentials.
    def rand():
        return _secrets.token_urlsafe(32)
    ok("Seeding KV (idempotent -- existing values preserved)...")
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"bao secrets enable -path=secret -version=2 kv 2>/dev/null || true")
    # DB passwords removed -- OpenBao database secrets engine manages them via static roles.
    hydra = get_or_create("hydra",
        **{"system-secret": rand,
        "cookie-secret": rand,
        "pairwise-salt": rand})
    SMTP_URI = "smtp://postfix.lasuite.svc.cluster.local:25/?skip_ssl_verify=true"
    kratos = get_or_create("kratos",
        **{"secrets-default": rand,
        "secrets-cookie": rand,
        "smtp-connection-uri": lambda: SMTP_URI})
    seaweedfs = get_or_create("seaweedfs",
        **{"access-key": rand, "secret-key": rand})
    gitea = get_or_create("gitea",
        **{"admin-username": lambda: GITEA_ADMIN_USER,
        "admin-password": rand})
    hive = get_or_create("hive",
        **{"oidc-client-id": lambda: "hive-local",
        "oidc-client-secret": rand})
    livekit = get_or_create("livekit",
        **{"api-key": lambda: "devkey",
        "api-secret": rand})
    people = get_or_create("people",
        **{"django-secret-key": rand})
    login_ui = get_or_create("login-ui",
        **{"cookie-secret": rand,
        "csrf-cookie-secret": rand})
    # Write all secrets to KV (idempotent -- puts same values back)
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' sh -c '"
        f"bao kv put secret/hydra system-secret=\"{hydra['system-secret']}\" cookie-secret=\"{hydra['cookie-secret']}\" pairwise-salt=\"{hydra['pairwise-salt']}\" && "
        f"bao kv put secret/kratos secrets-default=\"{kratos['secrets-default']}\" secrets-cookie=\"{kratos['secrets-cookie']}\" smtp-connection-uri=\"{kratos['smtp-connection-uri']}\" && "
        f"bao kv put secret/gitea admin-username=\"{gitea['admin-username']}\" admin-password=\"{gitea['admin-password']}\" && "
        f"bao kv put secret/seaweedfs access-key=\"{seaweedfs['access-key']}\" secret-key=\"{seaweedfs['secret-key']}\" && "
        f"bao kv put secret/hive oidc-client-id=\"{hive['oidc-client-id']}\" oidc-client-secret=\"{hive['oidc-client-secret']}\" && "
        f"bao kv put secret/livekit api-key=\"{livekit['api-key']}\" api-secret=\"{livekit['api-secret']}\" && "
        f"bao kv put secret/people django-secret-key=\"{people['django-secret-key']}\" && "
        f"bao kv put secret/login-ui cookie-secret=\"{login_ui['cookie-secret']}\" csrf-cookie-secret=\"{login_ui['csrf-cookie-secret']}\""
        f"'")
    # Configure Kubernetes auth method so VSO can authenticate with OpenBao
    ok("Configuring Kubernetes auth for VSO...")
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"bao auth enable kubernetes 2>/dev/null; true")
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"bao write auth/kubernetes/config "
        f"kubernetes_host=https://kubernetes.default.svc.cluster.local")
    # Read-only policy for VSO: KV v2 data + metadata, and DB static creds.
    policy_hcl = (
        'path "secret/data/*" { capabilities = ["read"] }\n'
        'path "secret/metadata/*" { capabilities = ["read", "list"] }\n'
        'path "database/static-creds/*" { capabilities = ["read"] }\n'
    )
    # Base64-encode the policy so it survives quoting inside the exec'd sh -c.
    policy_b64 = base64.b64encode(policy_hcl.encode()).decode()
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"sh -c 'echo {policy_b64} | base64 -d | bao policy write vso-reader -'")
    bao(f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}' "
        f"bao write auth/kubernetes/role/vso "
        f"bound_service_account_names=default "
        f"bound_service_account_namespaces=ory,devtools,storage,lasuite,media "
        f"policies=vso-reader "
        f"ttl=1h")
    # The _ob_pod/_root_token entries are internal; cmd_seed pops them off
    # before returning credentials to callers.
    return {
        "hydra-system-secret": hydra["system-secret"],
        "hydra-cookie-secret": hydra["cookie-secret"],
        "hydra-pairwise-salt": hydra["pairwise-salt"],
        "kratos-secrets-default": kratos["secrets-default"],
        "kratos-secrets-cookie": kratos["secrets-cookie"],
        "s3-access-key": seaweedfs["access-key"],
        "s3-secret-key": seaweedfs["secret-key"],
        "gitea-admin-password": gitea["admin-password"],
        "hive-oidc-client-id": hive["oidc-client-id"],
        "hive-oidc-client-secret": hive["oidc-client-secret"],
        "people-django-secret": people["django-secret-key"],
        "livekit-api-key": livekit["api-key"],
        "livekit-api-secret": livekit["api-secret"],
        "_ob_pod": ob_pod,
        "_root_token": root_token,
    }
# ---------------------------------------------------------------------------
# Database secrets engine
# ---------------------------------------------------------------------------
def _configure_db_engine(ob_pod, root_token, pg_user, pg_pass):
    """Enable OpenBao database secrets engine and create PostgreSQL static roles.
    Static roles cause OpenBao to immediately set (and later rotate) each service
    user's password via ALTER USER, eliminating hardcoded DB passwords.
    Idempotent: bao write overwrites existing config/roles safely.
    The `vault` PG user is created here (if absent) and used as the DB engine
    connection user. pg_user/pg_pass (the CNPG superuser) are kept for potential
    future use but are no longer used for the connection URL.
    """
    ok("Configuring OpenBao database secrets engine...")
    pg_rw = "postgres-rw.data.svc.cluster.local:5432"
    bao_env = f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}'"
    # Exec a shell command in the openbao container; raises on non-zero exit
    # when check=True, otherwise returns stripped stdout.
    def bao(cmd, check=True):
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao",
            "--", "sh", "-c", cmd],
            capture_output=True, text=True,
        )
        if check and r.returncode != 0:
            raise RuntimeError(f"bao command failed (exit {r.returncode}):\n{r.stderr.strip()}")
        return r.stdout.strip()
    # Enable database secrets engine -- tolerate "already enabled" error via || true.
    bao(f"{bao_env} bao secrets enable database 2>/dev/null || true", check=False)
    # -- vault PG user setup ---------------------------------------------------
    # Locate the CNPG primary pod for psql exec (peer auth -- no password needed).
    cnpg_pod = kube_out(
        "-n", "data", "get", "pods",
        "-l=cnpg.io/cluster=postgres,role=primary",
        "-o=jsonpath={.items[0].metadata.name}",
    )
    if not cnpg_pod:
        raise RuntimeError("Could not find CNPG primary pod for vault user setup.")
    # Run a SQL statement as the postgres superuser on the CNPG primary.
    def psql(sql):
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", cnpg_pod, "-c", "postgres",
            "--", "psql", "-U", "postgres", "-c", sql],
            capture_output=True, text=True,
        )
        if r.returncode != 0:
            raise RuntimeError(f"psql failed: {r.stderr.strip()}")
        return r.stdout.strip()
    # Read existing vault pg-password from OpenBao KV, or generate a new one.
    existing_vault_pass = bao(
        f"{bao_env} bao kv get -field=pg-password secret/vault 2>/dev/null || true",
        check=False,
    )
    vault_pg_pass = existing_vault_pass.strip() if existing_vault_pass.strip() else _secrets.token_urlsafe(32)
    # Store vault pg-password in OpenBao KV (idempotent).
    bao(f"{bao_env} bao kv put secret/vault pg-password=\"{vault_pg_pass}\"")
    ok("vault KV entry written.")
    # Create vault PG user if absent, set its password, grant ADMIN OPTION on all service users.
    create_vault_sql = (
        f"DO $$ BEGIN "
        f"IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname = 'vault') THEN "
        f"CREATE USER vault WITH LOGIN CREATEROLE; "
        f"END IF; "
        f"END $$;"
    )
    psql(create_vault_sql)
    psql(f"ALTER USER vault WITH PASSWORD '{vault_pg_pass}';")
    for user in PG_USERS:
        # ADMIN OPTION lets the vault user ALTER each service role's password.
        psql(f"GRANT {user} TO vault WITH ADMIN OPTION;")
    ok("vault PG user configured with ADMIN OPTION on all service roles.")
    # -- DB engine connection config (uses vault user) -------------------------
    # {{username}}/{{password}} are OpenBao template placeholders, not Python.
    conn_url = (
        "postgresql://{{username}}:{{password}}"
        f"@{pg_rw}/postgres?sslmode=disable"
    )
    bao(
        f"{bao_env} bao write database/config/cnpg-postgres"
        f" plugin_name=postgresql-database-plugin"
        f" allowed_roles='*'"
        f" connection_url='{conn_url}'"
        f" username='vault'"
        f" password='{vault_pg_pass}'"
    )
    ok("DB engine connection configured (vault user).")
    # Encode the rotation statement to avoid shell quoting issues with inner quotes.
    rotation_b64 = base64.b64encode(
        b"ALTER USER \"{{name}}\" WITH PASSWORD '{{password}}';"
    ).decode()
    for user in PG_USERS:
        bao(
            f"{bao_env} sh -c '"
            f"bao write database/static-roles/{user}"
            f" db_name=cnpg-postgres"
            f" username={user}"
            f" rotation_period=86400"
            f" \"rotation_statements=$(echo {rotation_b64} | base64 -d)\"'"
        )
        ok(f" static-role/{user}")
    ok("Database secrets engine configured.")
# ---------------------------------------------------------------------------
# cmd_seed — main entry point
# ---------------------------------------------------------------------------
def cmd_seed() -> dict:
    """Seed OpenBao KV with crypto-random credentials, then mirror to K8s Secrets.
    Returns a dict of credentials for use by callers (gitea admin pass, etc.).
    Idempotent: reads existing OpenBao values before generating; never rotates.
    """
    step("Seeding secrets...")
    creds = _seed_openbao()
    # Internal bookkeeping keys from _seed_openbao are stripped before returning.
    ob_pod = creds.pop("_ob_pod", "")
    root_token = creds.pop("_root_token", "")
    s3_access_key = creds.get("s3-access-key", "")
    s3_secret_key = creds.get("s3-secret-key", "")
    hydra_system = creds.get("hydra-system-secret", "")
    hydra_cookie = creds.get("hydra-cookie-secret", "")
    hydra_pairwise = creds.get("hydra-pairwise-salt", "")
    kratos_secrets_default = creds.get("kratos-secrets-default", "")
    kratos_secrets_cookie = creds.get("kratos-secrets-cookie", "")
    hive_oidc_id = creds.get("hive-oidc-client-id", "hive-local")
    hive_oidc_sec = creds.get("hive-oidc-client-secret", "")
    django_secret = creds.get("people-django-secret", "")
    gitea_admin_pass = creds.get("gitea-admin-password", "")
    ok("Waiting for postgres cluster...")
    pg_pod = ""
    # Poll the CNPG Cluster resource for up to ~5 minutes (60 x 5s).
    for _ in range(60):
        phase = kube_out("-n", "data", "get", "cluster", "postgres",
            "-o=jsonpath={.status.phase}")
        if phase == "Cluster in healthy state":
            pg_pod = kube_out("-n", "data", "get", "pods",
                "-l=cnpg.io/cluster=postgres,role=primary",
                "-o=jsonpath={.items[0].metadata.name}")
            ok(f"Postgres ready ({pg_pod}).")
            break
        time.sleep(5)
    else:
        warn("Postgres not ready after 5 min -- continuing anyway.")
    if pg_pod:
        ok("Ensuring postgres roles and databases exist...")
        # Service user -> owned database name.
        db_map = {
            "kratos": "kratos_db", "hydra": "hydra_db", "gitea": "gitea_db",
            "hive": "hive_db", "docs": "docs_db", "meet": "meet_db",
            "drive": "drive_db", "messages": "messages_db",
            "conversations": "conversations_db",
            "people": "people_db", "find": "find_db",
        }
        for user in PG_USERS:
            # Only CREATE if missing -- passwords are managed by OpenBao static roles.
            ensure_sql = (
                f"DO $$ BEGIN "
                f"IF NOT EXISTS (SELECT FROM pg_roles WHERE rolname='{user}') "
                f"THEN EXECUTE 'CREATE USER {user}'; END IF; END $$;"
            )
            kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
                "psql", "-U", "postgres", "-c", ensure_sql, check=False)
            db = db_map.get(user, f"{user}_db")
            # check=False: CREATE DATABASE fails harmlessly when it already exists.
            kube("exec", "-n", "data", pg_pod, "-c", "postgres", "--",
                "psql", "-U", "postgres", "-c",
                f"CREATE DATABASE {db} OWNER {user};", check=False)
    # Read CNPG superuser credentials and configure database secrets engine.
    pg_user_b64 = kube_out("-n", "data", "get", "secret", "postgres-superuser",
        "-o=jsonpath={.data.username}")
    pg_pass_b64 = kube_out("-n", "data", "get", "secret", "postgres-superuser",
        "-o=jsonpath={.data.password}")
    pg_user = base64.b64decode(pg_user_b64).decode() if pg_user_b64 else "postgres"
    pg_pass = base64.b64decode(pg_pass_b64).decode() if pg_pass_b64 else ""
    if ob_pod and root_token and pg_pass:
        try:
            _configure_db_engine(ob_pod, root_token, pg_user, pg_pass)
        except Exception as exc:
            # Best-effort: a failed engine config must not abort the whole seed.
            warn(f"DB engine config failed: {exc}")
    else:
        warn("Skipping DB engine config -- missing ob_pod, root_token, or pg_pass.")
    ok("Creating K8s secrets (VSO will overwrite on next sync)...")
    ensure_ns("ory")
    # Hydra app secrets -- DSN comes from VaultDynamicSecret hydra-db-creds.
    create_secret("ory", "hydra",
        secretsSystem=hydra_system,
        secretsCookie=hydra_cookie,
        **{"pairwise-salt": hydra_pairwise},
    )
    # Kratos non-rotating encryption keys -- DSN comes from VaultDynamicSecret kratos-db-creds.
    create_secret("ory", "kratos-app-secrets",
        secretsDefault=kratos_secrets_default,
        secretsCookie=kratos_secrets_cookie,
    )
    ensure_ns("devtools")
    # gitea-db-credentials comes from VaultDynamicSecret (static-creds/gitea).
    create_secret("devtools", "gitea-s3-credentials",
        **{"access-key": s3_access_key, "secret-key": s3_secret_key})
    create_secret("devtools", "gitea-admin-credentials",
        username=GITEA_ADMIN_USER, password=gitea_admin_pass)
    ensure_ns("storage")
    # SeaweedFS S3 identity file (single admin identity with full access).
    s3_json = (
        '{"identities":[{"name":"seaweed","credentials":[{"accessKey":"'
        + s3_access_key + '","secretKey":"' + s3_secret_key
        + '"}],"actions":["Admin","Read","Write","List","Tagging"]}]}'
    )
    create_secret("storage", "seaweedfs-s3-credentials",
        S3_ACCESS_KEY=s3_access_key, S3_SECRET_KEY=s3_secret_key)
    create_secret("storage", "seaweedfs-s3-json", **{"s3.json": s3_json})
    ensure_ns("lasuite")
    create_secret("lasuite", "seaweedfs-s3-credentials",
        S3_ACCESS_KEY=s3_access_key, S3_SECRET_KEY=s3_secret_key)
    # hive-db-url and people-db-credentials come from VaultDynamicSecrets.
    create_secret("lasuite", "hive-oidc",
        **{"client-id": hive_oidc_id, "client-secret": hive_oidc_sec})
    create_secret("lasuite", "people-django-secret",
        DJANGO_SECRET_KEY=django_secret)
    ensure_ns("media")
    ok("All secrets seeded.")
    return creds
# ---------------------------------------------------------------------------
# cmd_verify — VSO E2E verification
# ---------------------------------------------------------------------------
def cmd_verify():
    """End-to-end test of VSO -> OpenBao integration.
    1. Writes a random value to OpenBao KV at secret/vso-test.
    2. Creates a VaultAuth + VaultStaticSecret in the 'ory' namespace
    (already bound to the 'vso' Kubernetes auth role).
    3. Polls until VSO syncs the K8s Secret (up to 60s).
    4. Reads and base64-decodes the K8s Secret; compares to the expected value.
    5. Cleans up all test resources on both the success and failure paths.
    """
    step("Verifying VSO -> OpenBao integration (E2E)...")
    # Locate the OpenBao server pod; verification needs a running stack.
    ob_pod = kube_out(
        "-n", "data", "get", "pods",
        "-l=app.kubernetes.io/name=openbao,component=server",
        "-o=jsonpath={.items[0].metadata.name}",
    )
    if not ob_pod:
        die("OpenBao pod not found -- run full bring-up first.")
    root_token_enc = kube_out(
        "-n", "data", "get", "secret", "openbao-keys",
        "-o=jsonpath={.data.root-token}",
    )
    if not root_token_enc:
        die("Could not read openbao-keys secret.")
    root_token = base64.b64decode(root_token_enc).decode()
    bao_env = f"BAO_ADDR=http://127.0.0.1:8200 BAO_TOKEN='{root_token}'"
    # Exec a shell command in the openbao container; raises on non-zero exit
    # when check=True, otherwise returns stripped stdout.
    def bao(cmd, *, check=True):
        r = subprocess.run(
            ["kubectl", *K8S_CTX, "-n", "data", "exec", ob_pod, "-c", "openbao",
            "--", "sh", "-c", cmd],
            capture_output=True, text=True,
        )
        if check and r.returncode != 0:
            raise RuntimeError(f"bao failed (exit {r.returncode}): {r.stderr.strip()}")
        return r.stdout.strip()
    test_value = _secrets.token_urlsafe(16)
    test_ns = "ory"
    test_name = "vso-verify"
    # Best-effort removal of every test artifact (CRs, synced secret, KV entry).
    def cleanup():
        ok("Cleaning up test resources...")
        kube("delete", "vaultstaticsecret", test_name, f"-n={test_ns}",
            "--ignore-not-found", check=False)
        kube("delete", "vaultauth", test_name, f"-n={test_ns}",
            "--ignore-not-found", check=False)
        kube("delete", "secret", test_name, f"-n={test_ns}",
            "--ignore-not-found", check=False)
        bao(f"{bao_env} bao kv delete secret/vso-test 2>/dev/null || true", check=False)
    try:
        # 1. Write test value to OpenBao KV
        ok(f"Writing test sentinel to OpenBao secret/vso-test ...")
        bao(f"{bao_env} bao kv put secret/vso-test test-key='{test_value}'")
        # 2. Create VaultAuth in ory (already in vso role's bound namespaces)
        ok(f"Creating VaultAuth {test_ns}/{test_name} ...")
        kube_apply(f"""
apiVersion: secrets.hashicorp.com/v1beta1
kind: VaultAuth
metadata:
  name: {test_name}
  namespace: {test_ns}
spec:
  method: kubernetes
  mount: kubernetes
  kubernetes:
    role: vso
    serviceAccount: default
""")
        # 3. Create VaultStaticSecret pointing at our test KV path
        ok(f"Creating VaultStaticSecret {test_ns}/{test_name} ...")
        kube_apply(f"""
apiVersion: secrets.hashicorp.com/v1beta1
kind: VaultStaticSecret
metadata:
  name: {test_name}
  namespace: {test_ns}
spec:
  vaultAuthRef: {test_name}
  mount: secret
  type: kv-v2
  path: vso-test
  refreshAfter: 10s
  destination:
    name: {test_name}
    create: true
    overwrite: true
""")
        # 4. Poll until VSO sets secretMAC (= synced)
        ok("Waiting for VSO to sync (up to 60s) ...")
        deadline = time.time() + 60
        synced = False
        while time.time() < deadline:
            mac = kube_out(
                "get", "vaultstaticsecret", test_name, f"-n={test_ns}",
                "-o=jsonpath={.status.secretMAC}", "--ignore-not-found",
            )
            if mac and mac not in ("<none>", ""):
                synced = True
                break
            time.sleep(3)
        if not synced:
            # Surface the first status condition message for diagnosis.
            msg = kube_out(
                "get", "vaultstaticsecret", test_name, f"-n={test_ns}",
                "-o=jsonpath={.status.conditions[0].message}", "--ignore-not-found",
            )
            raise RuntimeError(f"VSO did not sync within 60s. Last status: {msg or 'unknown'}")
        # 5. Read and verify the K8s Secret value
        ok("Verifying K8s Secret contents ...")
        raw = kube_out(
            "get", "secret", test_name, f"-n={test_ns}",
            "-o=jsonpath={.data.test-key}", "--ignore-not-found",
        )
        if not raw:
            raise RuntimeError(
                f"K8s Secret {test_ns}/{test_name} not found or missing key 'test-key'."
            )
        actual = base64.b64decode(raw).decode()
        if actual != test_value:
            raise RuntimeError(
                f"Value mismatch!\n expected: {test_value!r}\n got: {actual!r}"
            )
        ok(f"Sentinel value matches -- VSO -> OpenBao integration is working.")
    except Exception as exc:
        # Always tear down test resources, then abort with a non-zero exit.
        cleanup()
        die(f"VSO verification FAILED: {exc}")
    cleanup()
    ok("VSO E2E verification passed.")

230
sunbeam/services.py Normal file
View File

@@ -0,0 +1,230 @@
"""Service management — status, logs, restart."""
import subprocess
import sys
from pathlib import Path
from sunbeam.kube import kube, kube_out, parse_target
from sunbeam.tools import ensure_tool
from sunbeam.output import step, ok, warn, die
# Namespaces owned by sunbeam; status reporting is limited to these.
MANAGED_NS = ["data", "devtools", "ingress", "lasuite", "media", "ory", "storage",
              "vault-secrets-operator"]
# (namespace, deployment) pairs eligible for `sunbeam restart`.
SERVICES_TO_RESTART = [
    ("ory", "hydra"),
    ("ory", "kratos"),
    ("ory", "login-ui"),
    ("devtools", "gitea"),
    ("storage", "seaweedfs-filer"),
    ("lasuite", "hive"),
    ("lasuite", "people-backend"),
    ("lasuite", "people-frontend"),
    ("lasuite", "people-celery-worker"),
    ("lasuite", "people-celery-beat"),
    ("media", "livekit-server"),
]
# kubectl context arguments used by direct subprocess invocations in this module.
K8S_CTX = ["--context=sunbeam"]
def _capture_out(cmd, *, default=""):
    """Run *cmd*; return stripped stdout, or *default* on non-zero exit."""
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        return default
    return proc.stdout.strip()
def _vso_sync_status():
    """Print VSO VaultStaticSecret and VaultDynamicSecret sync health.

    VSS synced = status.secretMAC is non-empty.
    VDS synced = status.lastRenewalTime is non-zero.
    """
    step("VSO secret sync status...")

    def render(raw, label, missing, is_synced):
        # Print one sorted, namespace-grouped section; return True when every
        # listed resource is synced according to is_synced(value).
        healthy = True
        last_ns = None
        for row in sorted(raw.splitlines()):
            fields = row.split()
            if len(fields) < 2:
                continue
            ns, name = fields[0], fields[1]
            value = fields[2] if len(fields) > 2 else missing
            synced = is_synced(value)
            if not synced:
                healthy = False
            mark = "\u2713" if synced else "\u2717"
            if ns != last_ns:
                print(f" {ns} ({label}):")
                last_ns = ns
            print(f" {mark} {name}")
        return healthy

    # VaultStaticSecrets: synced when secretMAC is populated.
    vss_raw = _capture_out([
        "kubectl", *K8S_CTX, "get", "vaultstaticsecret", "-A", "--no-headers",
        "-o=custom-columns="
        "NS:.metadata.namespace,NAME:.metadata.name,MAC:.status.secretMAC",
    ])
    all_ok = render(vss_raw, "VSS", "",
                    lambda mac: bool(mac and mac != "<none>"))
    # VaultDynamicSecrets: synced when lastRenewalTime is non-zero.
    vds_raw = _capture_out([
        "kubectl", *K8S_CTX, "get", "vaultdynamicsecret", "-A", "--no-headers",
        "-o=custom-columns="
        "NS:.metadata.namespace,NAME:.metadata.name,RENEWED:.status.lastRenewalTime",
    ])
    all_ok = render(vds_raw, "VDS", "0",
                    lambda ts: ts not in ("", "0", "<none>")) and all_ok
    print()
    if all_ok:
        ok("All VSO secrets synced.")
    else:
        warn("Some VSO secrets are not synced.")
def cmd_status(target: str | None):
    """Show pod health, optionally filtered by namespace or namespace/service.

    target: None lists pods across all MANAGED_NS; "ns" limits to one
    namespace; "ns/name" limits to pods labeled app=<name> in that namespace.
    Prints a per-namespace pod listing, then the VSO secret sync status.
    """
    step("Pod health across all namespaces...")
    ns_set = set(MANAGED_NS)
    if target is None:
        # All pods across managed namespaces.
        # Fix: the field selector previously contained a stray space
        # ("metadata.namespace!= kube-system"), which kubectl rejects as an
        # invalid selector -- this path then always reported "No pods found".
        raw = _capture_out([
            "kubectl", *K8S_CTX,
            "get", "pods",
            "--field-selector=metadata.namespace!=kube-system",
            "-A", "--no-headers",
        ])
        pods = []
        for line in raw.splitlines():
            cols = line.split()
            if len(cols) < 4:
                continue
            ns = cols[0]
            if ns not in ns_set:
                continue
            pods.append(cols)
    else:
        ns, name = parse_target(target)
        if name:
            # Specific service: namespace/service (label selector app=<name>).
            raw = _capture_out([
                "kubectl", *K8S_CTX,
                "get", "pods", "-n", ns, "-l", f"app={name}", "--no-headers",
            ])
        else:
            # Namespace only.
            raw = _capture_out([
                "kubectl", *K8S_CTX,
                "get", "pods", "-n", ns, "--no-headers",
            ])
        # Both scoped listings omit the namespace column; prepend it so the
        # rendering loop below can treat all rows uniformly.
        pods = []
        for line in raw.splitlines():
            cols = line.split()
            if len(cols) < 3:
                continue
            pods.append([ns] + cols)
    if not pods:
        warn("No pods found in managed namespaces.")
        return
    all_ok = True
    cur_ns = None
    icon_map = {"Running": "\u2713", "Completed": "\u2713", "Succeeded": "\u2713",
                "Pending": "\u25cb", "Failed": "\u2717", "Unknown": "?"}
    for cols in sorted(pods, key=lambda c: (c[0], c[1])):
        ns, name, ready, status = cols[0], cols[1], cols[2], cols[3]
        if ns != cur_ns:
            print(f" {ns}:")
            cur_ns = ns
        icon = icon_map.get(status, "?")
        unhealthy = status not in ("Running", "Completed", "Succeeded")
        # Only check ready ratio for Running pods — Completed/Succeeded pods
        # legitimately report 0/N containers ready.
        if not unhealthy and status == "Running" and "/" in ready:
            r, t = ready.split("/")
            unhealthy = r != t
        if unhealthy:
            all_ok = False
        print(f" {icon} {name:<50} {ready:<6} {status}")
    print()
    if all_ok:
        ok("All pods healthy.")
    else:
        warn("Some pods are not ready.")
    _vso_sync_status()
def cmd_logs(target: str, follow: bool):
    """Stream logs for a service. Target must include service name (e.g. ory/kratos)."""
    ns, name = parse_target(target)
    if not name:
        die("Logs require a service name, e.g. 'ory/kratos'.")
    cmd = [str(ensure_tool("kubectl")), "--context=sunbeam", "-n", ns, "logs",
           "-l", f"app={name}", "--tail=100"]
    if follow:
        cmd += ["--follow"]
    # Stream directly to the terminal and block until kubectl exits.
    subprocess.Popen(cmd).wait()
def cmd_get(target: str, output: str = "yaml"):
    """Print raw `kubectl get pod` output for a pod identified as ns/name.

    Usage: sunbeam get vault-secrets-operator/vault-secrets-operator-test
           sunbeam get ory/kratos-abc -o json
    """
    ns, name = parse_target(target)
    if not (ns and name):
        die("get requires namespace/name, e.g. 'sunbeam get ory/kratos-abc'")
    manifest = kube_out("get", "pod", name, "-n", ns, f"-o={output}")
    if not manifest:
        die(f"Pod {ns}/{name} not found.")
    print(manifest)
def cmd_restart(target: str | None):
    """Restart deployments. None=all, 'ory'=namespace, 'ory/kratos'=specific."""
    step("Restarting services...")
    if target is None:
        selected = list(SERVICES_TO_RESTART)
    else:
        ns, name = parse_target(target)
        # name is None for a bare-namespace target; then match the whole namespace.
        selected = [
            (svc_ns, svc_dep) for svc_ns, svc_dep in SERVICES_TO_RESTART
            if svc_ns == ns and (name is None or svc_dep == name)
        ]
    if not selected:
        warn(f"No matching services for target: {target}")
        return
    for svc_ns, svc_dep in selected:
        kube("-n", svc_ns, "rollout", "restart", f"deployment/{svc_dep}", check=False)
    ok("Done.")

View File

191
sunbeam/tests/test_cli.py Normal file
View File

@@ -0,0 +1,191 @@
"""Tests for CLI routing and argument validation."""
import sys
import unittest
from unittest.mock import MagicMock, patch
import argparse
class TestArgParsing(unittest.TestCase):
    """The CLI parser accepts every verb with its documented arguments."""

    def _parse(self, argv):
        """Construct a parser wired identically to main()'s and parse argv."""
        parser = argparse.ArgumentParser(prog="sunbeam")
        sub = parser.add_subparsers(dest="verb", metavar="verb")
        # Verbs that take no arguments.
        for verb in ("up", "down", "apply", "seed", "verify", "mirror", "bootstrap"):
            sub.add_parser(verb)
        # Verbs with positional/optional arguments.
        status = sub.add_parser("status")
        status.add_argument("target", nargs="?", default=None)
        logs = sub.add_parser("logs")
        logs.add_argument("target")
        logs.add_argument("-f", "--follow", action="store_true")
        get = sub.add_parser("get")
        get.add_argument("target")
        get.add_argument("-o", "--output", default="yaml", choices=["yaml", "json", "wide"])
        restart = sub.add_parser("restart")
        restart.add_argument("target", nargs="?", default=None)
        build = sub.add_parser("build")
        build.add_argument("what", choices=["proxy"])
        return parser.parse_args(argv)

    def test_up(self):
        self.assertEqual("up", self._parse(["up"]).verb)

    def test_status_no_target(self):
        parsed = self._parse(["status"])
        self.assertEqual("status", parsed.verb)
        self.assertIsNone(parsed.target)

    def test_status_with_namespace(self):
        parsed = self._parse(["status", "ory"])
        self.assertEqual("status", parsed.verb)
        self.assertEqual("ory", parsed.target)

    def test_logs_no_follow(self):
        parsed = self._parse(["logs", "ory/kratos"])
        self.assertEqual("logs", parsed.verb)
        self.assertEqual("ory/kratos", parsed.target)
        self.assertFalse(parsed.follow)

    def test_logs_follow_short(self):
        self.assertTrue(self._parse(["logs", "ory/kratos", "-f"]).follow)

    def test_logs_follow_long(self):
        self.assertTrue(self._parse(["logs", "ory/kratos", "--follow"]).follow)

    def test_build_proxy(self):
        self.assertEqual("proxy", self._parse(["build", "proxy"]).what)

    def test_build_invalid_target(self):
        self.assertRaises(SystemExit, self._parse, ["build", "notavalidtarget"])

    def test_get_with_target(self):
        parsed = self._parse(["get", "ory/kratos-abc"])
        self.assertEqual("get", parsed.verb)
        self.assertEqual("ory/kratos-abc", parsed.target)
        self.assertEqual("yaml", parsed.output)

    def test_get_json_output(self):
        self.assertEqual("json", self._parse(["get", "ory/kratos-abc", "-o", "json"]).output)

    def test_get_invalid_output_format(self):
        self.assertRaises(SystemExit, self._parse, ["get", "ory/kratos-abc", "-o", "toml"])

    def test_no_args_verb_is_none(self):
        self.assertIsNone(self._parse([]).verb)
class TestCliDispatch(unittest.TestCase):
    """Test that main() dispatches to the correct command function.

    Dispatch tests inject a fake command module into sys.modules and then
    importlib.reload() the cli module so that any import it performed at
    module load time re-binds to the mock before main() runs.
    """

    def test_no_verb_exits_0(self):
        # Bare `sunbeam` with no subcommand should exit cleanly (code 0).
        with patch.object(sys, "argv", ["sunbeam"]):
            from sunbeam import cli
            with self.assertRaises(SystemExit) as ctx:
                cli.main()
            self.assertEqual(ctx.exception.code, 0)

    def test_unknown_verb_exits_nonzero(self):
        # argparse rejects unrecognized subcommands with a non-zero code.
        with patch.object(sys, "argv", ["sunbeam", "unknown-verb"]):
            from sunbeam import cli
            with self.assertRaises(SystemExit) as ctx:
                cli.main()
            self.assertNotEqual(ctx.exception.code, 0)

    def test_up_calls_cmd_up(self):
        mock_up = MagicMock()
        with patch.object(sys, "argv", ["sunbeam", "up"]):
            with patch.dict("sys.modules", {"sunbeam.cluster": MagicMock(cmd_up=mock_up)}):
                import importlib
                import sunbeam.cli as cli_mod
                # Reload so cli's reference to sunbeam.cluster is the mock.
                importlib.reload(cli_mod)
                try:
                    cli_mod.main()
                except SystemExit:
                    pass
                mock_up.assert_called_once()

    def test_status_no_target(self):
        mock_status = MagicMock()
        with patch.object(sys, "argv", ["sunbeam", "status"]):
            with patch.dict("sys.modules", {"sunbeam.services": MagicMock(cmd_status=mock_status)}):
                import importlib, sunbeam.cli as cli_mod
                importlib.reload(cli_mod)
                try:
                    cli_mod.main()
                except SystemExit:
                    pass
                # No target on the command line → cmd_status(None).
                mock_status.assert_called_once_with(None)

    def test_status_with_namespace(self):
        mock_status = MagicMock()
        with patch.object(sys, "argv", ["sunbeam", "status", "ory"]):
            with patch.dict("sys.modules", {"sunbeam.services": MagicMock(cmd_status=mock_status)}):
                import importlib, sunbeam.cli as cli_mod
                importlib.reload(cli_mod)
                try:
                    cli_mod.main()
                except SystemExit:
                    pass
                mock_status.assert_called_once_with("ory")

    def test_logs_with_target(self):
        mock_logs = MagicMock()
        with patch.object(sys, "argv", ["sunbeam", "logs", "ory/kratos"]):
            with patch.dict("sys.modules", {"sunbeam.services": MagicMock(cmd_logs=mock_logs)}):
                import importlib, sunbeam.cli as cli_mod
                importlib.reload(cli_mod)
                try:
                    cli_mod.main()
                except SystemExit:
                    pass
                # follow defaults to False when -f/--follow is absent.
                mock_logs.assert_called_once_with("ory/kratos", follow=False)

    def test_logs_follow_flag(self):
        mock_logs = MagicMock()
        with patch.object(sys, "argv", ["sunbeam", "logs", "ory/kratos", "-f"]):
            with patch.dict("sys.modules", {"sunbeam.services": MagicMock(cmd_logs=mock_logs)}):
                import importlib, sunbeam.cli as cli_mod
                importlib.reload(cli_mod)
                try:
                    cli_mod.main()
                except SystemExit:
                    pass
                mock_logs.assert_called_once_with("ory/kratos", follow=True)

    def test_get_dispatches_with_target_and_output(self):
        mock_get = MagicMock()
        with patch.object(sys, "argv", ["sunbeam", "get", "ory/kratos-abc"]):
            with patch.dict("sys.modules", {"sunbeam.services": MagicMock(cmd_get=mock_get)}):
                import importlib, sunbeam.cli as cli_mod
                importlib.reload(cli_mod)
                try:
                    cli_mod.main()
                except SystemExit:
                    pass
                # Output format defaults to yaml.
                mock_get.assert_called_once_with("ory/kratos-abc", output="yaml")

    def test_build_proxy(self):
        mock_build = MagicMock()
        with patch.object(sys, "argv", ["sunbeam", "build", "proxy"]):
            with patch.dict("sys.modules", {"sunbeam.images": MagicMock(cmd_build=mock_build)}):
                import importlib, sunbeam.cli as cli_mod
                importlib.reload(cli_mod)
                try:
                    cli_mod.main()
                except SystemExit:
                    pass
                mock_build.assert_called_once_with("proxy")

108
sunbeam/tests/test_kube.py Normal file
View File

@@ -0,0 +1,108 @@
"""Tests for kube.py — domain substitution, target parsing, kubectl wrappers."""
import unittest
from unittest.mock import MagicMock, patch
class TestParseTarget(unittest.TestCase):
    """parse_target() splits 'ns[/name]' strings into a (ns, name) pair."""

    def setUp(self):
        from sunbeam.kube import parse_target
        self.parse = parse_target

    def test_none(self):
        self.assertEqual((None, None), self.parse(None))

    def test_namespace_only(self):
        self.assertEqual(("ory", None), self.parse("ory"))

    def test_namespace_and_name(self):
        self.assertEqual(("ory", "kratos"), self.parse("ory/kratos"))

    def test_too_many_parts_raises(self):
        # More than one slash is malformed input.
        self.assertRaises(ValueError, self.parse, "too/many/parts")

    def test_empty_string(self):
        # Empty string is treated as an (empty) namespace, not None.
        self.assertEqual(("", None), self.parse(""))
class TestDomainReplace(unittest.TestCase):
    """domain_replace() substitutes every DOMAIN_SUFFIX placeholder."""

    def setUp(self):
        from sunbeam.kube import domain_replace
        self.replace = domain_replace

    def test_single_occurrence(self):
        out = self.replace("src.DOMAIN_SUFFIX/foo", "192.168.1.1.sslip.io")
        self.assertEqual("src.192.168.1.1.sslip.io/foo", out)

    def test_multiple_occurrences(self):
        out = self.replace("DOMAIN_SUFFIX and DOMAIN_SUFFIX", "x.sslip.io")
        self.assertEqual("x.sslip.io and x.sslip.io", out)

    def test_no_occurrence(self):
        # Text without the placeholder passes through untouched.
        self.assertEqual("no match here", self.replace("no match here", "x.sslip.io"))
class TestKustomizeBuild(unittest.TestCase):
    """kustomize_build() wraps run_tool and post-processes the YAML."""

    def test_calls_run_tool_and_applies_domain_replace(self):
        from pathlib import Path
        mock_result = MagicMock()
        mock_result.stdout = "image: src.DOMAIN_SUFFIX/foo\nimage: src.DOMAIN_SUFFIX/bar"
        with patch("sunbeam.kube.run_tool", return_value=mock_result) as mock_rt:
            from sunbeam.kube import kustomize_build
            result = kustomize_build(Path("/some/overlay"), "192.168.1.1.sslip.io")
            mock_rt.assert_called_once()
            call_args = mock_rt.call_args[0]
            # Must invoke `kustomize build --enable-helm ...`.
            self.assertEqual(call_args[0], "kustomize")
            self.assertIn("build", call_args)
            self.assertIn("--enable-helm", call_args)
            # Every DOMAIN_SUFFIX placeholder is replaced with the domain.
            self.assertIn("192.168.1.1.sslip.io", result)
            self.assertNotIn("DOMAIN_SUFFIX", result)

    def test_strips_null_annotations(self):
        from pathlib import Path
        mock_result = MagicMock()
        mock_result.stdout = "metadata:\n annotations: null\n name: test"
        with patch("sunbeam.kube.run_tool", return_value=mock_result):
            from sunbeam.kube import kustomize_build
            result = kustomize_build(Path("/overlay"), "x.sslip.io")
            # `annotations: null` lines must be scrubbed from the output.
            self.assertNotIn("annotations: null", result)
class TestKubeWrappers(unittest.TestCase):
    """kube()/kube_out()/kube_ok() thin wrappers over run_tool."""

    def test_kube_passes_context(self):
        with patch("sunbeam.kube.run_tool") as mock_rt:
            mock_rt.return_value = MagicMock(returncode=0)
            from sunbeam.kube import kube
            kube("get", "pods")
            call_args = mock_rt.call_args[0]
            # Every kubectl invocation is pinned to the sunbeam context.
            self.assertEqual(call_args[0], "kubectl")
            self.assertIn("--context=sunbeam", call_args)

    def test_kube_out_returns_stdout_on_success(self):
        with patch("sunbeam.kube.run_tool") as mock_rt:
            mock_rt.return_value = MagicMock(returncode=0, stdout=" output ")
            from sunbeam.kube import kube_out
            result = kube_out("get", "pods")
            # stdout is whitespace-stripped before being returned.
            self.assertEqual(result, "output")

    def test_kube_out_returns_empty_on_failure(self):
        with patch("sunbeam.kube.run_tool") as mock_rt:
            mock_rt.return_value = MagicMock(returncode=1, stdout="error text")
            from sunbeam.kube import kube_out
            result = kube_out("get", "pods")
            # Non-zero exit yields "" rather than the error text.
            self.assertEqual(result, "")

    def test_kube_ok_returns_true_on_zero(self):
        with patch("sunbeam.kube.run_tool") as mock_rt:
            mock_rt.return_value = MagicMock(returncode=0)
            from sunbeam.kube import kube_ok
            self.assertTrue(kube_ok("get", "ns", "default"))

    def test_kube_ok_returns_false_on_nonzero(self):
        with patch("sunbeam.kube.run_tool") as mock_rt:
            mock_rt.return_value = MagicMock(returncode=1)
            from sunbeam.kube import kube_ok
            self.assertFalse(kube_ok("get", "ns", "missing"))

View File

@@ -0,0 +1,93 @@
"""Tests for secrets.py — seed idempotency, verify flow."""
import base64
import unittest
from unittest.mock import MagicMock, patch, call
class TestSeedIdempotency(unittest.TestCase):
    """_seed_openbao() must read existing values before writing (never rotates)."""

    def test_get_or_create_skips_existing(self):
        """If OpenBao already has a value, it's reused not regenerated."""
        # NOTE(review): this patches _seed_openbao itself, so the assertion
        # only checks the mock's canned return value — it does not exercise
        # the real idempotency logic. Consider driving the real function
        # with mocked kube/kube_out calls instead.
        with patch("sunbeam.secrets._seed_openbao") as mock_seed:
            mock_seed.return_value = {
                "hydra-system-secret": "existingvalue",
                "_ob_pod": "openbao-0",
                "_root_token": "token123",
            }
            from sunbeam import secrets
            result = secrets._seed_openbao()
            self.assertIn("hydra-system-secret", result)
class TestCmdVerify(unittest.TestCase):
    """cmd_verify() end-to-end flow against a fully mocked kubectl layer."""

    def _mock_kube_out(self, ob_pod="openbao-0", root_token="testtoken", mac=""):
        """Create a side_effect function for kube_out that simulates verify flow."""
        encoded_token = base64.b64encode(root_token.encode()).decode()
        def side_effect(*args, **kwargs):
            # Route each kube_out() call on a distinguishing substring of
            # its argument list, mimicking cluster responses.
            args_str = " ".join(str(a) for a in args)
            if "app.kubernetes.io/name=openbao" in args_str:
                return ob_pod
            if "root-token" in args_str:
                return encoded_token
            if "secretMAC" in args_str:
                return mac
            if "conditions" in args_str:
                return "unknown"
            if ".data.test-key" in args_str:
                return ""
            return ""
        return side_effect

    def test_verify_cleans_up_on_timeout(self):
        """cmd_verify() must clean up test resources even when VSO doesn't sync."""
        kube_out_fn = self._mock_kube_out(mac="")  # MAC never set -> timeout
        with patch("sunbeam.secrets.kube_out", side_effect=kube_out_fn):
            with patch("sunbeam.secrets.kube") as mock_kube:
                with patch("sunbeam.secrets.kube_apply"):
                    with patch("subprocess.run") as mock_run:
                        mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
                        with patch("time.time") as mock_time:
                            # start=0, first check=0, second check past deadline
                            mock_time.side_effect = [0, 0, 100]
                            with patch("time.sleep"):
                                from sunbeam import secrets
                                with self.assertRaises(SystemExit):
                                    secrets.cmd_verify()
        # Cleanup should have been called (delete calls)
        delete_calls = [c for c in mock_kube.call_args_list
                        if "delete" in str(c)]
        self.assertGreater(len(delete_calls), 0)

    def test_verify_succeeds_when_synced(self):
        """cmd_verify() succeeds when VSO syncs the secret and value matches."""
        # We need a fixed test_value. Patch _secrets.token_urlsafe to return known value.
        test_val = "fixed-test-value"
        encoded_val = base64.b64encode(test_val.encode()).decode()
        encoded_token = base64.b64encode(b"testtoken").decode()
        call_count = [0]

        def kube_out_fn(*args, **kwargs):
            args_str = " ".join(str(a) for a in args)
            if "app.kubernetes.io/name=openbao" in args_str:
                return "openbao-0"
            if "root-token" in args_str:
                return encoded_token
            if "secretMAC" in args_str:
                # Report the secret as synced on the first status poll.
                call_count[0] += 1
                return "somemac" if call_count[0] >= 1 else ""
            if ".data.test-key" in args_str:
                return encoded_val
            return ""

        with patch("sunbeam.secrets.kube_out", side_effect=kube_out_fn):
            with patch("sunbeam.secrets.kube") as mock_kube:
                with patch("sunbeam.secrets.kube_apply"):
                    with patch("subprocess.run") as mock_run:
                        mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
                        with patch("sunbeam.secrets._secrets.token_urlsafe", return_value=test_val):
                            with patch("time.time", return_value=0):
                                with patch("time.sleep"):
                                    from sunbeam import secrets
                                    # Should not raise
                                    secrets.cmd_verify()

View File

@@ -0,0 +1,128 @@
"""Tests for services.py — status scoping, log command construction, restart."""
import unittest
from unittest.mock import MagicMock, patch, call
class TestCmdStatus(unittest.TestCase):
    """cmd_status() scoping: all namespaces, one namespace, or one pod."""

    def test_all_namespaces_when_no_target(self):
        # No target: cmd_status should render cluster-wide pod health
        # from the fake kubectl output without raising.
        fake_output = (
            "ory hydra-abc 1/1 Running 0 1d\n"
            "data valkey-xyz 1/1 Running 0 1d\n"
        )
        with patch("sunbeam.services._capture_out", return_value=fake_output):
            from sunbeam import services
            services.cmd_status(None)

    def test_namespace_scoped(self):
        fake_output = "ory kratos-abc 1/1 Running 0 1d\n"
        with patch("sunbeam.services._capture_out", return_value=fake_output) as mock_co:
            from sunbeam import services
            services.cmd_status("ory")
            # Should have called _capture_out with -n ory
            calls_str = str(mock_co.call_args_list)
            self.assertIn("ory", calls_str)

    def test_pod_scoped(self):
        fake_output = "kratos-abc 1/1 Running 0 1d\n"
        with patch("sunbeam.services._capture_out", return_value=fake_output) as mock_co:
            from sunbeam import services
            services.cmd_status("ory/kratos")
            # Both the namespace and the pod-name filter reach kubectl.
            calls_str = str(mock_co.call_args_list)
            self.assertIn("ory", calls_str)
            self.assertIn("kratos", calls_str)
class TestCmdLogs(unittest.TestCase):
    """cmd_logs() kubectl command construction and input validation."""

    def test_logs_no_follow(self):
        with patch("subprocess.Popen") as mock_popen:
            mock_proc = MagicMock()
            mock_proc.wait.return_value = 0
            mock_popen.return_value = mock_proc
            with patch("sunbeam.tools.ensure_tool", return_value="/fake/kubectl"):
                from sunbeam import services
                services.cmd_logs("ory/kratos", follow=False)
                # Inspect the argv list handed to Popen.
                args = mock_popen.call_args[0][0]
                self.assertIn("-n", args)
                self.assertIn("ory", args)
                self.assertNotIn("--follow", args)

    def test_logs_follow(self):
        with patch("subprocess.Popen") as mock_popen:
            mock_proc = MagicMock()
            mock_proc.wait.return_value = 0
            mock_popen.return_value = mock_proc
            with patch("sunbeam.tools.ensure_tool", return_value="/fake/kubectl"):
                from sunbeam import services
                services.cmd_logs("ory/kratos", follow=True)
                args = mock_popen.call_args[0][0]
                self.assertIn("--follow", args)

    def test_logs_requires_service_name(self):
        """Passing just a namespace (no service) should die()."""
        with self.assertRaises(SystemExit):
            from sunbeam import services
            services.cmd_logs("ory", follow=False)
class TestCmdGet(unittest.TestCase):
    """cmd_get() argument forwarding and failure exits."""

    def test_prints_yaml_for_pod(self):
        with patch("sunbeam.services.kube_out", return_value="apiVersion: v1\nkind: Pod") as mock_ko:
            from sunbeam import services
            services.cmd_get("ory/kratos-abc")
            # Exact kubectl argument order matters here.
            mock_ko.assert_called_once_with("get", "pod", "kratos-abc", "-n", "ory", "-o=yaml")

    def test_default_output_is_yaml(self):
        with patch("sunbeam.services.kube_out", return_value="kind: Pod"):
            from sunbeam import services
            # no output kwarg → defaults to yaml
            services.cmd_get("ory/kratos-abc")

    def test_json_output_format(self):
        with patch("sunbeam.services.kube_out", return_value='{"kind":"Pod"}') as mock_ko:
            from sunbeam import services
            services.cmd_get("ory/kratos-abc", output="json")
            mock_ko.assert_called_once_with("get", "pod", "kratos-abc", "-n", "ory", "-o=json")

    def test_missing_name_exits(self):
        with self.assertRaises(SystemExit):
            from sunbeam import services
            services.cmd_get("ory")  # namespace-only, no pod name

    def test_not_found_exits(self):
        # An empty kube_out result means the pod lookup failed → die().
        with patch("sunbeam.services.kube_out", return_value=""):
            with self.assertRaises(SystemExit):
                from sunbeam import services
                services.cmd_get("ory/nonexistent")
class TestCmdRestart(unittest.TestCase):
    """cmd_restart() scoping: all services, one namespace, one deployment."""

    def test_restart_all(self):
        with patch("sunbeam.services.kube") as mock_kube:
            from sunbeam import services
            services.cmd_restart(None)
            # Should restart all SERVICES_TO_RESTART
            self.assertGreater(mock_kube.call_count, 0)

    def test_restart_namespace_scoped(self):
        with patch("sunbeam.services.kube") as mock_kube:
            from sunbeam import services
            services.cmd_restart("ory")
            calls_str = str(mock_kube.call_args_list)
            # Should only restart ory/* services
            # NOTE(review): assumes SERVICES_TO_RESTART contains both 'ory'
            # and 'devtools' entries — verify against services.py.
            self.assertIn("ory", calls_str)
            self.assertNotIn("devtools", calls_str)

    def test_restart_specific_service(self):
        with patch("sunbeam.services.kube") as mock_kube:
            from sunbeam import services
            services.cmd_restart("ory/kratos")
            # Should restart exactly deployment/kratos in ory
            calls_str = str(mock_kube.call_args_list)
            self.assertIn("kratos", calls_str)

    def test_restart_unknown_service_warns(self):
        with patch("sunbeam.services.kube") as mock_kube:
            from sunbeam import services
            services.cmd_restart("nonexistent/nosuch")
            # kube should not be called since no match
            mock_kube.assert_not_called()

162
sunbeam/tests/test_tools.py Normal file
View File

@@ -0,0 +1,162 @@
"""Tests for tools.py binary bundler."""
import hashlib
import stat
import unittest
from pathlib import Path
from unittest.mock import MagicMock, patch
import tempfile
import shutil
class TestSha256(unittest.TestCase):
    """_sha256() must agree with hashlib's digest for the same bytes."""

    def test_computes_correct_hash(self):
        from sunbeam.tools import _sha256
        payload = b"hello world"
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp.write(payload)
            tmp.flush()
            path = Path(tmp.name)
        try:
            self.assertEqual(hashlib.sha256(payload).hexdigest(), _sha256(path))
        finally:
            # Always remove the scratch file, pass or fail.
            path.unlink()
class TestEnsureTool(unittest.TestCase):
    """ensure_tool() caching, download, extraction, and SHA256 behavior."""

    def setUp(self):
        # Point the cache at a throwaway directory so tests never touch
        # the real ~/.local/share/sunbeam/bin.
        self.tmpdir = tempfile.mkdtemp()
        self.cache_patcher = patch("sunbeam.tools.CACHE_DIR", Path(self.tmpdir))
        self.cache_patcher.start()

    def tearDown(self):
        self.cache_patcher.stop()
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def _fake_urlopen_response(self, payload: bytes):
        """Build a context-manager mock whose read() returns *payload*."""
        mock_resp = MagicMock()
        mock_resp.read.return_value = payload
        mock_resp.__enter__ = lambda s: s
        mock_resp.__exit__ = MagicMock(return_value=False)
        return mock_resp

    def test_returns_cached_if_sha_matches(self):
        """A cached binary with the pinned hash is returned without download."""
        binary_data = b"#!/bin/sh\necho kubectl"
        dest = Path(self.tmpdir) / "kubectl"
        dest.write_bytes(binary_data)
        dest.chmod(dest.stat().st_mode | stat.S_IXUSR)
        expected_sha = hashlib.sha256(binary_data).hexdigest()
        tools_spec = {"kubectl": {"url": "http://x", "sha256": expected_sha}}
        with patch("sunbeam.tools.TOOLS", tools_spec):
            from sunbeam import tools
            result = tools.ensure_tool("kubectl")
            self.assertEqual(result, dest)

    def test_returns_cached_if_sha_empty(self):
        """An empty pinned hash means 'skip verification' — cache is trusted."""
        binary_data = b"#!/bin/sh\necho kubectl"
        dest = Path(self.tmpdir) / "kubectl"
        dest.write_bytes(binary_data)
        dest.chmod(dest.stat().st_mode | stat.S_IXUSR)
        tools_spec = {"kubectl": {"url": "http://x", "sha256": ""}}
        with patch("sunbeam.tools.TOOLS", tools_spec):
            from sunbeam import tools
            result = tools.ensure_tool("kubectl")
            self.assertEqual(result, dest)

    def test_downloads_on_cache_miss(self):
        """A missing cache entry triggers a download and an executable write."""
        binary_data = b"#!/bin/sh\necho kubectl"
        tools_spec = {"kubectl": {"url": "http://example.com/kubectl", "sha256": ""}}
        with patch("sunbeam.tools.TOOLS", tools_spec):
            with patch("urllib.request.urlopen") as mock_url:
                mock_url.return_value = self._fake_urlopen_response(binary_data)
                from sunbeam import tools
                result = tools.ensure_tool("kubectl")
                dest = Path(self.tmpdir) / "kubectl"
                # The returned path was previously computed but never checked.
                self.assertEqual(result, dest)
                self.assertTrue(dest.exists())
                self.assertEqual(dest.read_bytes(), binary_data)
                # Should be executable
                self.assertTrue(dest.stat().st_mode & stat.S_IXUSR)

    def test_raises_on_sha256_mismatch(self):
        """A bad downloaded hash raises and removes the tainted binary."""
        binary_data = b"#!/bin/sh\necho fake"
        tools_spec = {"kubectl": {
            "url": "http://example.com/kubectl",
            "sha256": "a" * 64,  # wrong hash
        }}
        with patch("sunbeam.tools.TOOLS", tools_spec):
            with patch("urllib.request.urlopen") as mock_url:
                mock_url.return_value = self._fake_urlopen_response(binary_data)
                from sunbeam import tools
                with self.assertRaises(RuntimeError) as ctx:
                    tools.ensure_tool("kubectl")
                self.assertIn("SHA256 mismatch", str(ctx.exception))
                # Binary should be cleaned up
                self.assertFalse((Path(self.tmpdir) / "kubectl").exists())

    def test_redownloads_on_sha_mismatch_cached(self):
        """If cached binary has wrong hash, it's deleted and re-downloaded."""
        old_data = b"old binary"
        new_data = b"new binary"
        dest = Path(self.tmpdir) / "kubectl"
        dest.write_bytes(old_data)
        new_sha = hashlib.sha256(new_data).hexdigest()
        tools_spec = {"kubectl": {"url": "http://x/kubectl", "sha256": new_sha}}
        with patch("sunbeam.tools.TOOLS", tools_spec):
            with patch("urllib.request.urlopen") as mock_url:
                mock_url.return_value = self._fake_urlopen_response(new_data)
                from sunbeam import tools
                result = tools.ensure_tool("kubectl")
                # `result` was previously unused; assert the returned path too.
                self.assertEqual(result, dest)
                self.assertEqual(dest.read_bytes(), new_data)

    def test_unknown_tool_raises_value_error(self):
        """Names not present in TOOLS are rejected."""
        from sunbeam import tools
        with self.assertRaises(ValueError):
            tools.ensure_tool("notarealtool")
class TestRunTool(unittest.TestCase):
    """run_tool() PATH handling: kustomize gets the cache dir, others don't."""

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()
        self.cache_patcher = patch("sunbeam.tools.CACHE_DIR", Path(self.tmpdir))
        self.cache_patcher.start()

    def tearDown(self):
        self.cache_patcher.stop()
        shutil.rmtree(self.tmpdir, ignore_errors=True)

    def _install_fake_binary(self, name: str) -> Path:
        """Drop an executable stub into the fake cache dir."""
        dest = Path(self.tmpdir) / name
        dest.write_bytes(b"#!/bin/sh")
        dest.chmod(dest.stat().st_mode | stat.S_IXUSR)
        return dest

    def test_kustomize_prepends_cache_dir_to_path(self):
        """kustomize runs with CACHE_DIR first on PATH so helm is found."""
        self._install_fake_binary("kustomize")
        tools_spec = {"kustomize": {"url": "http://x", "sha256": ""}}
        with patch("sunbeam.tools.TOOLS", tools_spec):
            with patch("subprocess.run") as mock_run:
                mock_run.return_value = MagicMock(returncode=0)
                from sunbeam import tools
                tools.run_tool("kustomize", "build", ".")
                call_kwargs = mock_run.call_args[1]
                env = call_kwargs.get("env", {})
                self.assertTrue(env.get("PATH", "").startswith(str(self.tmpdir)))

    def test_non_kustomize_does_not_modify_path(self):
        """Other tools inherit PATH without the cache-dir prefix."""
        # (A dead `original_path = os.environ.get(...)` capture was removed —
        # it was never used by the assertion.)
        self._install_fake_binary("kubectl")
        tools_spec = {"kubectl": {"url": "http://x", "sha256": ""}}
        with patch("sunbeam.tools.TOOLS", tools_spec):
            with patch("subprocess.run") as mock_run:
                mock_run.return_value = MagicMock(returncode=0)
                from sunbeam import tools
                tools.run_tool("kubectl", "get", "pods")
                call_kwargs = mock_run.call_args[1]
                env = call_kwargs.get("env", {})
                # PATH should not be modified (starts same as original)
                self.assertFalse(env.get("PATH", "").startswith(str(self.tmpdir)))

106
sunbeam/tools.py Normal file
View File

@@ -0,0 +1,106 @@
"""Binary bundler — downloads kubectl, kustomize, helm at pinned versions.
Binaries are cached in ~/.local/share/sunbeam/bin/ and SHA256-verified.
"""
import hashlib
import io
import os
import stat
import subprocess
import tarfile
import urllib.request
from pathlib import Path
# Cache directory for downloaded tool binaries.
CACHE_DIR = Path.home() / ".local/share/sunbeam/bin"

# Pinned tool versions and download URLs (darwin/arm64 builds).
# "sha256" is the expected hash of the final binary (checked after any
# archive extraction); an empty string skips verification.
# "extract" names the member to pull out of a tar.gz archive.
TOOLS: dict[str, dict] = {
    "kubectl": {
        "version": "v1.32.2",
        "url": "https://dl.k8s.io/release/v1.32.2/bin/darwin/arm64/kubectl",
        "sha256": "",  # set to actual hash; empty = skip verify
    },
    "kustomize": {
        "version": "v5.6.0",
        "url": "https://github.com/kubernetes-sigs/kustomize/releases/download/kustomize%2Fv5.6.0/kustomize_v5.6.0_darwin_arm64.tar.gz",
        "sha256": "",
        "extract": "kustomize",
    },
    "helm": {
        "version": "v3.17.1",
        "url": "https://get.helm.sh/helm-v3.17.1-darwin-arm64.tar.gz",
        "sha256": "",
        "extract": "darwin-arm64/helm",
    },
}
def _sha256(path: Path) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(65536), b""):
h.update(chunk)
return h.hexdigest()
def ensure_tool(name: str) -> Path:
    """Return the path to the cached binary for *name*, downloading if needed.

    The binary is cached under CACHE_DIR and checked against the pinned
    SHA256 from TOOLS; an empty pinned hash skips verification.

    Raises:
        ValueError: *name* is not a known tool.
        RuntimeError: the downloaded binary failed SHA256 verification, or
            the expected member could not be extracted from the archive.
    """
    if name not in TOOLS:
        raise ValueError(f"Unknown tool: {name}")
    spec = TOOLS[name]
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    dest = CACHE_DIR / name
    expected_sha = spec.get("sha256", "")
    # Use cached binary if it exists and passes SHA check
    if dest.exists():
        if not expected_sha or _sha256(dest) == expected_sha:
            return dest
        # SHA mismatch — discard the stale cache entry and re-download
        dest.unlink()
    # Download
    url = spec["url"]
    with urllib.request.urlopen(url) as resp:  # noqa: S310
        data = resp.read()
    # Extract from tar.gz if needed
    extract_path = spec.get("extract")
    if extract_path:
        with tarfile.open(fileobj=io.BytesIO(data)) as tf:
            member = tf.getmember(extract_path)
            fobj = tf.extractfile(member)
            if fobj is None:
                # extractfile() returns None for non-regular members
                # (directories, links) — treat that as a corrupt archive
                # instead of failing later with AttributeError.
                raise RuntimeError(
                    f"Could not extract {extract_path!r} from {name} archive"
                )
            binary_data = fobj.read()
    else:
        binary_data = data
    # Write to cache
    dest.write_bytes(binary_data)
    # Verify SHA256 of the final binary (i.e. after extraction)
    if expected_sha:
        actual = _sha256(dest)
        if actual != expected_sha:
            dest.unlink()
            raise RuntimeError(
                f"SHA256 mismatch for {name}: expected {expected_sha}, got {actual}"
            )
    # Make executable for all users
    dest.chmod(dest.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
    return dest
def run_tool(name: str, *args, **kwargs) -> subprocess.CompletedProcess:
    """Run a bundled tool by name, ensuring it is downloaded first.

    kustomize is special-cased: CACHE_DIR is prepended to PATH so its
    helm chart rendering can locate the bundled helm binary.
    """
    tool_path = ensure_tool(name)
    env = kwargs.pop("env", None)
    if env is None:
        env = os.environ.copy()
    if name == "kustomize":
        # helm must be discoverable for kustomize's chart rendering
        env["PATH"] = os.pathsep.join([str(CACHE_DIR), env.get("PATH", "")])
    argv = [str(tool_path), *args]
    return subprocess.run(argv, env=env, **kwargs)