feat: initial sunbeam-proxy implementation
Custom Pingora-based edge proxy for the Sunbeam infrastructure stack. - HTTPS termination: mkcert file-based (local dev) or rustls-acme ACME (production) - Host-prefix routing with path-based sub-routing (auth virtual host) - HTTP→HTTPS redirect, WebSocket passthrough - cert-manager HTTP-01 challenge routing via Kubernetes Ingress watcher - TLS cert auto-reload via K8s Secret watcher - JSON structured audit logging (tracing-subscriber) - OpenTelemetry OTLP stub (disabled by default) - Multi-stage Dockerfile: musl static binary on chainguard/static distroless image Signed-off-by: Sienna Meridian Satterwhite <sienna@sunbeam.pt>
This commit is contained in:
5
.dockerignore
Normal file
5
.dockerignore
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
target/
|
||||||
|
.git/
|
||||||
|
.gitignore
|
||||||
|
*.md
|
||||||
|
certs/
|
||||||
5
.gitignore
vendored
Normal file
5
.gitignore
vendored
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
/target/
|
||||||
|
certs/
|
||||||
|
*.pem
|
||||||
|
*.key
|
||||||
|
*.crt
|
||||||
4089
Cargo.lock
generated
Normal file
4089
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
47
Cargo.toml
Normal file
47
Cargo.toml
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
[package]
name = "sunbeam-proxy"
version = "0.1.0"
edition = "2021"

[dependencies]
# Pingora with rustls backend (pure Rust TLS, no BoringSSL C build)
pingora = { version = "0.7", features = ["rustls"] }
pingora-proxy = { version = "0.7", features = ["rustls"] }
pingora-core = { version = "0.7", features = ["rustls"] }
pingora-http = "0.7"

# HTTP header constants
http = "1"

# Config
serde = { version = "1", features = ["derive"] }
toml = "0.8"

# Async
tokio = { version = "1", features = ["full"] }
futures = "0.3"
async-trait = "0.1"

# Structured logging + OTEL
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["json", "env-filter"] }
tracing-opentelemetry = "0.28"
opentelemetry = { version = "0.27", features = ["trace"] }
opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.27", features = ["http-proto", "reqwest-client"] }
# JSON log serialization + application-level error context
serde_json = "1"
anyhow = "1"

# Rustls crypto provider — must be installed before any TLS init
rustls = { version = "0.23", features = ["aws-lc-rs"] }

# K8s watcher for cert/config hot-reload
kube = { version = "3", features = ["runtime", "client"] }
k8s-openapi = { version = "0.27", features = ["v1_35"] }
# NOTE(review): libc is declared but no use is visible in this commit's
# sources — confirm it is actually needed (possibly for signal/FD work).
libc = "0.2"

[profile.release]
opt-level = 3
lto = true
codegen-units = 1
strip = true
|
||||||
67
Dockerfile
Normal file
67
Dockerfile
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
# ── Stage 1: build ──────────────────────────────────────────────
# rust:slim tracks the latest stable Rust release.
# Multi-arch image; Docker buildx selects the native platform image.
FROM rust:slim AS builder

ARG TARGETARCH

# musl-tools: musl-gcc for static linking.
# curl: download tini init binary.
# No cmake/go needed: we use the rustls feature flag (pure Rust TLS).
RUN apt-get update && apt-get install -y musl-tools curl && rm -rf /var/lib/apt/lists/*

# Map Docker TARGETARCH to the appropriate Rust musl target,
# then configure Cargo to use musl-gcc as the linker for that target.
RUN case "${TARGETARCH}" in \
        "amd64") RUST_TARGET="x86_64-unknown-linux-musl" ;; \
        "arm64") RUST_TARGET="aarch64-unknown-linux-musl" ;; \
        *) echo "Unsupported arch: ${TARGETARCH}" && exit 1 ;; \
    esac && \
    echo "${RUST_TARGET}" > /rust-target && \
    rustup target add "${RUST_TARGET}" && \
    mkdir -p /root/.cargo && \
    printf '[target.%s]\nlinker = "musl-gcc"\n' "${RUST_TARGET}" \
        >> /root/.cargo/config.toml

ENV RUSTFLAGS="-C target-feature=+crt-static"
WORKDIR /build

# Cache dependency compilation separately from source changes.
# RUSTFLAGS must match the real build or Cargo will recompile everything.
# The ';' (not '&&') is deliberate: this warm-up build is best-effort — if it
# fails, the layer still succeeds and the real build below reports the error.
COPY Cargo.toml Cargo.lock ./
RUN mkdir src && \
    echo 'fn main() {}' > src/main.rs && \
    cargo build --release --target "$(cat /rust-target)" ; \
    rm -rf src

# Build the real binary. touch src/main.rs invalidates the dummy main.rs
# fingerprint so cargo relinks against the real sources.
COPY src/ ./src/
RUN touch src/main.rs && \
    cargo build --release --target "$(cat /rust-target)" && \
    cp "target/$(cat /rust-target)/release/sunbeam-proxy" /sunbeam-proxy

# Download tini static init binary (musl, no glibc dependency).
# tini as PID 1 ensures the container stays alive when Pingora re-execs itself
# during a graceful upgrade: the new process is re-parented to tini, and tini
# correctly reaps the old process when it exits after draining connections.
RUN case "${TARGETARCH}" in \
        "amd64") TINI_ARCH="amd64" ;; \
        "arm64") TINI_ARCH="arm64" ;; \
        *) echo "Unsupported arch: ${TARGETARCH}" && exit 1 ;; \
    esac && \
    curl -fsSL -o /tini \
        "https://github.com/krallin/tini/releases/download/v0.19.0/tini-static-${TINI_ARCH}" && \
    chmod +x /tini

# ── Stage 2: distroless final ────────────────────────────────────
# cgr.dev/chainguard/static is multi-arch (amd64 + arm64).
# No shell, no package manager — minimal attack surface.
FROM cgr.dev/chainguard/static:latest

COPY --from=builder /tini /tini
COPY --from=builder /sunbeam-proxy /usr/local/bin/sunbeam-proxy

EXPOSE 80 443

# tini as PID 1 so Pingora's graceful-upgrade re-exec doesn't kill the container.
ENTRYPOINT ["/tini", "--", "/usr/local/bin/sunbeam-proxy"]
|
||||||
67
dev.toml
Normal file
67
dev.toml
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
# Local dev config for running sunbeam-proxy directly on macOS.
#
# Uses non-privileged ports (8080/8443) and a mkcert cert for localhost.
# Certs are generated once with:
#   mkcert -cert-file certs/tls.crt -key-file certs/tls.key localhost 127.0.0.1
#
# Run with:
#   SUNBEAM_CONFIG=dev.toml RUST_LOG=info cargo run
#
# Then test:
#   curl -v http://localhost:8080/                                   # → 301 to https
#   curl -vk https://localhost:8443/ -H "Host: docs.localhost"       # → 502 (backend unreachable, routing works)
#   curl -vk https://localhost:8443/.well-known/acme-challenge/test  # → 404 (no active challenge)

[listen]
http = "0.0.0.0:8080"
https = "0.0.0.0:8443"

[tls]
cert_path = "certs/tls.crt"
key_path = "certs/tls.key"

[telemetry]
# Empty endpoint leaves the OTLP exporter disabled.
otlp_endpoint = ""

# Dummy routes that mirror production — backends won't be reachable locally
# but routing, TLS termination, and redirect logic are fully exercised.

[[routes]]
host_prefix = "docs"
backend = "http://127.0.0.1:9001"
websocket = true

[[routes]]
host_prefix = "meet"
backend = "http://127.0.0.1:9002"
websocket = true

[[routes]]
host_prefix = "drive"
backend = "http://127.0.0.1:9003"

[[routes]]
host_prefix = "mail"
backend = "http://127.0.0.1:9004"

[[routes]]
host_prefix = "chat"
backend = "http://127.0.0.1:9005"
websocket = true

[[routes]]
host_prefix = "people"
backend = "http://127.0.0.1:9006"

[[routes]]
host_prefix = "src"
backend = "http://127.0.0.1:9007"
websocket = true

[[routes]]
host_prefix = "auth"
backend = "http://127.0.0.1:9008"

[[routes]]
host_prefix = "s3"
backend = "http://127.0.0.1:9009"
|
||||||
96
src/acme.rs
Normal file
96
src/acme.rs
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
use futures::StreamExt;
|
||||||
|
use k8s_openapi::api::networking::v1::Ingress;
|
||||||
|
use kube::{runtime::watcher, Api, Client};
|
||||||
|
use std::{collections::HashMap, sync::{Arc, RwLock}};
|
||||||
|
|
||||||
|
/// Maps a challenge path to the backend address that can answer it.
///
/// Key: `/.well-known/acme-challenge/<token>`
/// Value: `cm-acme-http-solver-<hash>.ingress.svc.cluster.local:8089`
///
/// cert-manager creates one Ingress per challenge domain with exactly this
/// path and backend. Our proxy consults this table to route each challenge
/// request to the specific solver pod that holds the matching token, which
/// is required for multi-SAN certificates (one solver pod per domain, all
/// running concurrently).
///
/// Uses std::sync::RwLock (not tokio) so reads are wait-free and the table
/// can be written from the watcher runtime without cross-runtime waker issues.
/// Readers must clone values out and drop the guard before any `.await`.
pub type AcmeRoutes = Arc<RwLock<HashMap<String, String>>>;
|
||||||
|
|
||||||
|
/// Watch Ingress objects in the ingress namespace and maintain `routes`.
|
||||||
|
///
|
||||||
|
/// cert-manager creates an Ingress for each HTTP-01 challenge it manages.
|
||||||
|
/// The Ingress contains a path rule for `/.well-known/acme-challenge/<token>`
|
||||||
|
/// pointing to a per-challenge solver Service. We populate the route table
|
||||||
|
/// from these rules so the proxy can forward each challenge token to the
|
||||||
|
/// correct solver pod without the nondeterminism of a shared stable Service.
|
||||||
|
pub async fn watch_ingresses(client: Client, routes: AcmeRoutes) {
|
||||||
|
let api: Api<Ingress> = Api::namespaced(client, "ingress");
|
||||||
|
|
||||||
|
// Verify Ingress API access before entering the watch loop. A failure here
|
||||||
|
// almost always means cert-manager is not installed or RBAC is wrong.
|
||||||
|
if let Err(e) = api.list(&Default::default()).await {
|
||||||
|
tracing::error!(
|
||||||
|
error = %e,
|
||||||
|
"initial Ingress list failed — is cert-manager installed? \
|
||||||
|
is the pingora-watcher Role bound correctly?"
|
||||||
|
);
|
||||||
|
// Continue into the watch loop; it will surface further errors.
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut stream = Box::pin(watcher(api, watcher::Config::default()));
|
||||||
|
|
||||||
|
while let Some(result) = stream.next().await {
|
||||||
|
match result {
|
||||||
|
Ok(watcher::Event::Apply(ing)) => {
|
||||||
|
let mut map = routes.write().unwrap_or_else(|e| e.into_inner());
|
||||||
|
upsert_routes(&ing, &mut map);
|
||||||
|
}
|
||||||
|
Ok(watcher::Event::Delete(ing)) => {
|
||||||
|
let mut map = routes.write().unwrap_or_else(|e| e.into_inner());
|
||||||
|
remove_routes(&ing, &mut map);
|
||||||
|
}
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "Ingress watcher error; retrying in 10s");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn upsert_routes(ingress: &Ingress, map: &mut HashMap<String, String>) {
|
||||||
|
let Some(spec) = &ingress.spec else { return };
|
||||||
|
for rule in spec.rules.as_deref().unwrap_or(&[]) {
|
||||||
|
let Some(http) = &rule.http else { continue };
|
||||||
|
for p in &http.paths {
|
||||||
|
let Some(path) = p.path.as_deref() else { continue };
|
||||||
|
if !path.starts_with("/.well-known/acme-challenge/") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let Some(svc) = p.backend.service.as_ref() else { continue };
|
||||||
|
let Some(port) = svc.port.as_ref().and_then(|p| p.number) else { continue };
|
||||||
|
let backend = format!(
|
||||||
|
"{}.ingress.svc.cluster.local:{port}",
|
||||||
|
svc.name
|
||||||
|
);
|
||||||
|
tracing::debug!(path, %backend, "added ACME challenge route");
|
||||||
|
map.insert(path.to_string(), backend);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn remove_routes(ingress: &Ingress, map: &mut HashMap<String, String>) {
|
||||||
|
let Some(spec) = &ingress.spec else { return };
|
||||||
|
for rule in spec.rules.as_deref().unwrap_or(&[]) {
|
||||||
|
let Some(http) = &rule.http else { continue };
|
||||||
|
for p in &http.paths {
|
||||||
|
let Some(path) = p.path.as_deref() else { continue };
|
||||||
|
if path.starts_with("/.well-known/acme-challenge/") {
|
||||||
|
tracing::debug!(path, "removed ACME challenge route");
|
||||||
|
map.remove(path);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
50
src/cert.rs
Normal file
50
src/cert.rs
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
use anyhow::{Context, Result};
|
||||||
|
use k8s_openapi::api::core::v1::Secret;
|
||||||
|
use kube::{Api, Client};
|
||||||
|
|
||||||
|
/// Fetch the `pingora-tls` Secret from the ingress namespace and write
|
||||||
|
/// `tls.crt` / `tls.key` to the paths declared in config.toml.
|
||||||
|
///
|
||||||
|
/// Called at startup (non-upgrade) so the proxy never depends on kubelet
|
||||||
|
/// volume-sync timing: the cert files are written directly from the K8s API
|
||||||
|
/// before `svc.add_tls()` is called.
|
||||||
|
pub async fn fetch_and_write(client: &Client, cert_path: &str, key_path: &str) -> Result<()> {
|
||||||
|
let api: Api<Secret> = Api::namespaced(client.clone(), "ingress");
|
||||||
|
let secret = api
|
||||||
|
.get("pingora-tls")
|
||||||
|
.await
|
||||||
|
.context("fetching pingora-tls Secret from K8s API")?;
|
||||||
|
write_from_secret(&secret, cert_path, key_path)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write `tls.crt` and `tls.key` from a Secret data map to the configured paths.
|
||||||
|
///
|
||||||
|
/// k8s-openapi base64-decodes Secret values automatically, so `data["tls.crt"].0`
|
||||||
|
/// is the raw PEM bytes ready to write. Called both from `fetch_and_write` at
|
||||||
|
/// startup and directly from the cert watcher when an `Apply` event delivers
|
||||||
|
/// the updated Secret object without requiring an additional API round-trip.
|
||||||
|
pub fn write_from_secret(secret: &Secret, cert_path: &str, key_path: &str) -> Result<()> {
|
||||||
|
let data = secret
|
||||||
|
.data
|
||||||
|
.as_ref()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("pingora-tls Secret has no data"))?;
|
||||||
|
|
||||||
|
let crt = data
|
||||||
|
.get("tls.crt")
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("pingora-tls missing tls.crt"))?;
|
||||||
|
let key = data
|
||||||
|
.get("tls.key")
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("pingora-tls missing tls.key"))?;
|
||||||
|
|
||||||
|
// /etc/tls is an emptyDir; create it if the pod just started.
|
||||||
|
if let Some(parent) = std::path::Path::new(cert_path).parent() {
|
||||||
|
std::fs::create_dir_all(parent)
|
||||||
|
.with_context(|| format!("creating cert dir {}", parent.display()))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::fs::write(cert_path, &crt.0).with_context(|| format!("writing {cert_path}"))?;
|
||||||
|
std::fs::write(key_path, &key.0).with_context(|| format!("writing {key_path}"))?;
|
||||||
|
|
||||||
|
tracing::info!(cert_path, key_path, "cert files written from K8s Secret");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
61
src/config.rs
Normal file
61
src/config.rs
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
use anyhow::{Context, Result};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use std::fs;
|
||||||
|
|
||||||
|
/// Top-level proxy configuration, deserialized from the TOML file named by
/// the `SUNBEAM_CONFIG` env var (defaults to `/etc/pingora/config.toml`).
#[derive(Debug, Deserialize, Clone)]
pub struct Config {
    /// Listener socket addresses for HTTP and HTTPS.
    pub listen: ListenConfig,
    /// Paths the TLS cert/key are read from (and written to by the cert watcher).
    pub tls: TlsFileConfig,
    /// Telemetry / OTLP exporter settings.
    pub telemetry: TelemetryConfig,
    /// Host-prefix virtual-host routes, in config order.
    pub routes: Vec<RouteConfig>,
}
|
||||||
|
|
||||||
|
/// Listener socket addresses, e.g. `"0.0.0.0:8080"`.
#[derive(Debug, Deserialize, Clone)]
pub struct ListenConfig {
    /// Plain-HTTP listener (serves the HTTPS redirect and ACME challenges).
    pub http: String,
    /// TLS-terminated listener.
    pub https: String,
}
|
||||||
|
|
||||||
|
/// Filesystem locations of the TLS certificate chain and private key (PEM).
#[derive(Debug, Deserialize, Clone)]
pub struct TlsFileConfig {
    /// Path to the PEM certificate (chain) file.
    pub cert_path: String,
    /// Path to the PEM private key file.
    pub key_path: String,
}
|
||||||
|
|
||||||
|
/// Telemetry settings passed to `telemetry::init`.
#[derive(Debug, Deserialize, Clone)]
pub struct TelemetryConfig {
    /// OTLP collector endpoint; an empty string disables the exporter
    /// (see dev.toml, which sets `otlp_endpoint = ""`).
    pub otlp_endpoint: String,
}
|
||||||
|
|
||||||
|
/// A path-prefix sub-route within a virtual host.
/// Matched longest-prefix-first when multiple entries share a prefix.
#[derive(Debug, Deserialize, Clone)]
pub struct PathRoute {
    /// Path prefix to match, e.g. `/kratos`.
    pub prefix: String,
    /// Upstream for requests matching this prefix (scheme stripped before dialing).
    pub backend: String,
    /// Strip the matched prefix before forwarding to the backend.
    #[serde(default)]
    pub strip_prefix: bool,
    /// Forward WebSocket upgrade headers for this sub-route.
    #[serde(default)]
    pub websocket: bool,
}
|
||||||
|
|
||||||
|
/// A virtual host keyed by the first DNS label of the request Host header
/// (e.g. `docs` matches `docs.example.com`).
#[derive(Debug, Deserialize, Clone)]
pub struct RouteConfig {
    /// First Host-header label this route serves.
    pub host_prefix: String,
    /// Default upstream for the host.
    pub backend: String,
    /// Forward WebSocket upgrade headers for this host.
    #[serde(default)]
    pub websocket: bool,
    /// Optional path-based sub-routes (longest prefix wins).
    /// If the request path matches a sub-route, its backend is used instead.
    #[serde(default)]
    pub paths: Vec<PathRoute>,
}
|
||||||
|
|
||||||
|
impl Config {
|
||||||
|
pub fn load(path: &str) -> Result<Self> {
|
||||||
|
let raw = fs::read_to_string(path)
|
||||||
|
.with_context(|| format!("reading config from {path}"))?;
|
||||||
|
toml::from_str(&raw).with_context(|| "parsing config.toml")
|
||||||
|
}
|
||||||
|
}
|
||||||
127
src/main.rs
Normal file
127
src/main.rs
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
mod acme;
|
||||||
|
mod cert;
|
||||||
|
mod config;
|
||||||
|
mod proxy;
|
||||||
|
mod telemetry;
|
||||||
|
mod watcher;
|
||||||
|
|
||||||
|
use std::{collections::HashMap, sync::Arc};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use kube::Client;
|
||||||
|
use pingora::server::{configuration::Opt, Server};
|
||||||
|
use pingora_proxy::http_proxy_service;
|
||||||
|
use proxy::SunbeamProxy;
|
||||||
|
use std::sync::RwLock;
|
||||||
|
|
||||||
|
/// Process entry point: load config, init telemetry, fetch TLS material,
/// build the Pingora server, start background K8s watchers, then block in
/// `run_forever`.
fn main() -> Result<()> {
    // Install the aws-lc-rs crypto provider for rustls before any TLS init.
    // Required because rustls 0.23 no longer auto-selects a provider at compile time.
    rustls::crypto::aws_lc_rs::default_provider()
        .install_default()
        .expect("crypto provider already installed");

    // Config path is overridable for local dev (see dev.toml).
    let config_path = std::env::var("SUNBEAM_CONFIG")
        .unwrap_or_else(|_| "/etc/pingora/config.toml".to_string());
    let cfg = config::Config::load(&config_path)?;

    // 1. Init telemetry (JSON logs + optional OTEL traces).
    telemetry::init(&cfg.telemetry.otlp_endpoint);

    // 2. Detect --upgrade flag. When present, Pingora inherits listening socket
    //    FDs from the upgrade Unix socket instead of binding fresh ports, enabling
    //    zero-downtime cert/config reloads triggered by the K8s watcher below.
    let upgrade = std::env::args().any(|a| a == "--upgrade");

    // 3. Fetch the TLS cert from K8s before Pingora binds the TLS port.
    //    The Client is created and dropped within this temp runtime — we do NOT
    //    carry it across runtime boundaries, which would kill its tower workers.
    //    The watcher thread creates its own fresh Client on its own runtime.
    let k8s_available = {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(async {
            match Client::try_default().await {
                Ok(c) => {
                    // Skip the fetch during an upgrade: the previous process
                    // already wrote current cert files.
                    if !upgrade {
                        if let Err(e) =
                            cert::fetch_and_write(&c, &cfg.tls.cert_path, &cfg.tls.key_path).await
                        {
                            // Non-fatal: Secret may not exist yet on first deploy (cert-manager
                            // is still issuing), or the Secret name may differ in dev.
                            tracing::warn!(error = %e, "cert fetch from K8s failed; using existing files");
                        }
                    }
                    true
                }
                Err(e) => {
                    tracing::warn!(error = %e, "no K8s client; cert auto-reload and ACME routing disabled");
                    false
                }
            }
        })
    };

    let opt = Opt {
        upgrade,
        daemon: false,
        nocapture: false,
        test: false,
        conf: None,
    };

    // 4. Create Pingora server and bootstrap (binds ports or inherits FDs).
    let mut server = Server::new(Some(opt))?;
    server.bootstrap();

    // 5. Shared ACME challenge route table. Populated by the Ingress watcher;
    //    consulted by the proxy for every /.well-known/acme-challenge/ request.
    //    Uses std::sync::RwLock so reads are sync and lock-guard-safe across
    //    Pingora's async proxy calls without cross-runtime waker concerns.
    let acme_routes: acme::AcmeRoutes = Arc::new(RwLock::new(HashMap::new()));

    let proxy = SunbeamProxy {
        routes: cfg.routes.clone(),
        acme_routes: acme_routes.clone(),
    };
    let mut svc = http_proxy_service(&server.configuration, proxy);

    // Port 80: plain HTTP — 301 → HTTPS, except for ACME HTTP-01 challenges.
    // Port 443: TLS-terminated HTTPS. Cert written to /etc/tls/ by cert::* above.
    svc.add_tcp(&cfg.listen.http);
    svc.add_tls(&cfg.listen.https, &cfg.tls.cert_path, &cfg.tls.key_path)?;

    server.add_service(svc);

    // 6. Background K8s watchers on their own OS thread + tokio runtime so they
    //    don't interfere with Pingora's internal runtime. A fresh Client is
    //    created here so its tower workers live on this runtime (not the
    //    now-dropped temp runtime from step 3).
    if k8s_available {
        let cert_path = cfg.tls.cert_path.clone();
        let key_path = cfg.tls.key_path.clone();
        std::thread::spawn(move || {
            let rt = tokio::runtime::Builder::new_multi_thread()
                .enable_all()
                .build()
                .expect("watcher runtime");
            rt.block_on(async move {
                let client = match Client::try_default().await {
                    Ok(c) => c,
                    Err(e) => {
                        tracing::error!(error = %e, "watcher: failed to create K8s client; watchers disabled");
                        return;
                    }
                };
                // Both watchers run until process exit; join! keeps this
                // runtime alive for as long as either is running.
                tokio::join!(
                    acme::watch_ingresses(client.clone(), acme_routes),
                    watcher::run_watcher(client, cert_path, key_path),
                );
            });
        });
    }

    tracing::info!(upgrade, "sunbeam-proxy starting");
    // Never returns: Pingora takes over the main thread.
    server.run_forever();
}
|
||||||
266
src/proxy.rs
Normal file
266
src/proxy.rs
Normal file
@@ -0,0 +1,266 @@
|
|||||||
|
use crate::acme::AcmeRoutes;
|
||||||
|
use crate::config::RouteConfig;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use http::header::{CONNECTION, HOST, UPGRADE};
|
||||||
|
use pingora_core::{upstreams::peer::HttpPeer, Result};
|
||||||
|
use pingora_http::{RequestHeader, ResponseHeader};
|
||||||
|
use pingora_proxy::{ProxyHttp, Session};
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
|
/// Pingora proxy service state shared across all requests.
pub struct SunbeamProxy {
    /// Static host-prefix routes loaded from the TOML config at startup.
    pub routes: Vec<RouteConfig>,
    /// Per-challenge route table populated by the Ingress watcher.
    /// Maps `/.well-known/acme-challenge/<token>` → solver service address.
    pub acme_routes: AcmeRoutes,
}
|
||||||
|
|
||||||
|
/// Per-request state threaded through the ProxyHttp callbacks.
pub struct RequestCtx {
    /// Route chosen in `upstream_peer`; consulted later for websocket
    /// header copying and audit logging.
    pub route: Option<RouteConfig>,
    /// Request arrival time, used to compute duration in `logging`.
    pub start_time: Instant,
    /// Resolved solver backend address for this ACME challenge, if applicable.
    pub acme_backend: Option<String>,
    /// Path prefix to strip before forwarding to the upstream (e.g. "/kratos").
    pub strip_prefix: Option<String>,
}
|
||||||
|
|
||||||
|
impl SunbeamProxy {
|
||||||
|
fn find_route(&self, prefix: &str) -> Option<&RouteConfig> {
|
||||||
|
self.routes.iter().find(|r| r.host_prefix == prefix)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extract_host(session: &Session) -> String {
|
||||||
|
session
|
||||||
|
.req_header()
|
||||||
|
.headers
|
||||||
|
.get(HOST)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.unwrap_or("")
|
||||||
|
.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Strip the scheme prefix from a backend URL like `http://host:port`.
///
/// Uses `strip_prefix` so at most one leading scheme is removed;
/// `trim_start_matches` (the previous implementation) strips its pattern
/// repeatedly, which would mangle a pathological value such as
/// `https://http://host`. A bare `host:port` is returned unchanged.
fn backend_addr(backend: &str) -> &str {
    backend
        .strip_prefix("https://")
        .or_else(|| backend.strip_prefix("http://"))
        .unwrap_or(backend)
}
|
||||||
|
|
||||||
|
/// Returns true if the downstream connection is plain HTTP (no TLS).
|
||||||
|
fn is_plain_http(session: &Session) -> bool {
|
||||||
|
session
|
||||||
|
.digest()
|
||||||
|
.map(|d| d.ssl_digest.is_none())
|
||||||
|
.unwrap_or(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
impl ProxyHttp for SunbeamProxy {
    type CTX = RequestCtx;

    /// Fresh per-request context; `start_time` anchors the audit-log duration.
    fn new_ctx(&self) -> RequestCtx {
        RequestCtx {
            route: None,
            start_time: Instant::now(),
            acme_backend: None,
            strip_prefix: None,
        }
    }

    /// HTTP → HTTPS redirect; ACME HTTP-01 challenges pass through on plain HTTP.
    ///
    /// Returns `Ok(true)` when a response was written here (request handled),
    /// `Ok(false)` to continue to `upstream_peer`.
    async fn request_filter(
        &self,
        session: &mut Session,
        ctx: &mut RequestCtx,
    ) -> Result<bool>
    where
        Self::CTX: Send + Sync,
    {
        if is_plain_http(session) {
            let path = session.req_header().uri.path().to_string();

            // cert-manager HTTP-01 challenge: look up the token path in the
            // Ingress-backed route table. Each challenge Ingress maps exactly
            // one token to exactly one solver Service, so this routes the request
            // to the right solver pod even when multiple challenges run in parallel.
            if path.starts_with("/.well-known/acme-challenge/") {
                // Drop the guard before any await point (RwLockReadGuard is !Send).
                let backend = self
                    .acme_routes
                    .read()
                    .unwrap_or_else(|e| e.into_inner())
                    .get(&path)
                    .cloned();
                if let Some(backend) = backend {
                    ctx.acme_backend = Some(backend);
                    return Ok(false); // pass to upstream_peer
                }
                // No route yet: challenge Ingress hasn't arrived from cert-manager.
                let mut resp = ResponseHeader::build(404, None)?;
                resp.insert_header("Content-Length", "0")?;
                session.write_response_header(Box::new(resp), true).await?;
                return Ok(true);
            }

            // All other plain-HTTP traffic: redirect to HTTPS.
            let host = extract_host(session);
            let location = format!("https://{host}{path}");
            let mut resp = ResponseHeader::build(301, None)?;
            resp.insert_header("Location", location)?;
            resp.insert_header("Content-Length", "0")?;
            session.write_response_header(Box::new(resp), true).await?;
            return Ok(true);
        }

        // Reject unknown host prefixes with 404. The prefix is the first DNS
        // label of the Host header (e.g. "docs" from "docs.example.com").
        let host = extract_host(session);
        let prefix = host.split('.').next().unwrap_or("");
        if self.find_route(prefix).is_none() {
            let mut resp = ResponseHeader::build(404, None)?;
            resp.insert_header("Content-Length", "0")?;
            session.write_response_header(Box::new(resp), true).await?;
            return Ok(true);
        }

        Ok(false)
    }

    /// Pick the upstream: ACME solver, path sub-route, or host default.
    async fn upstream_peer(
        &self,
        session: &mut Session,
        ctx: &mut RequestCtx,
    ) -> Result<Box<HttpPeer>> {
        // ACME challenge: backend was resolved in request_filter.
        if let Some(backend) = &ctx.acme_backend {
            return Ok(Box::new(HttpPeer::new(
                backend_addr(backend),
                false,
                String::new(),
            )));
        }

        let host = extract_host(session);
        let prefix = host.split('.').next().unwrap_or("");
        // Safe: request_filter already returned 404 for unknown prefixes,
        // so this lookup cannot fail for requests that reach this point.
        let route = self
            .find_route(prefix)
            .expect("route already validated in request_filter");

        let path = session.req_header().uri.path().to_string();

        // Check path sub-routes (longest matching prefix wins).
        let path_route = route
            .paths
            .iter()
            .filter(|p| path.starts_with(p.prefix.as_str()))
            .max_by_key(|p| p.prefix.len());

        if let Some(pr) = path_route {
            if pr.strip_prefix {
                ctx.strip_prefix = Some(pr.prefix.clone());
            }
            // Synthesize a flat RouteConfig for the sub-route so downstream
            // hooks (websocket copy, audit log) see the effective backend.
            ctx.route = Some(crate::config::RouteConfig {
                host_prefix: route.host_prefix.clone(),
                backend: pr.backend.clone(),
                websocket: pr.websocket || route.websocket,
                paths: vec![],
            });
            return Ok(Box::new(HttpPeer::new(
                backend_addr(&pr.backend),
                false,
                String::new(),
            )));
        }

        ctx.route = Some(route.clone());
        Ok(Box::new(HttpPeer::new(
            backend_addr(&route.backend),
            false,
            String::new(),
        )))
    }

    /// Copy WebSocket upgrade headers and apply path prefix stripping.
    async fn upstream_request_filter(
        &self,
        session: &mut Session,
        upstream_req: &mut RequestHeader,
        ctx: &mut RequestCtx,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        // Forward Connection/Upgrade only for routes flagged websocket = true.
        if ctx.route.as_ref().map(|r| r.websocket).unwrap_or(false) {
            for name in &[CONNECTION, UPGRADE] {
                if let Some(val) = session.req_header().headers.get(name.clone()) {
                    upstream_req.insert_header(name.clone(), val)?;
                }
            }
        }

        // Strip path prefix before forwarding (e.g. /kratos → /).
        if let Some(prefix) = &ctx.strip_prefix {
            let old_uri = upstream_req.uri.clone();
            let old_path = old_uri.path();
            if let Some(stripped) = old_path.strip_prefix(prefix.as_str()) {
                // Never forward an empty path; "/" is the minimal valid form.
                let new_path = if stripped.is_empty() { "/" } else { stripped };
                // Preserve the query string verbatim.
                let query_part = old_uri
                    .query()
                    .map(|q| format!("?{q}"))
                    .unwrap_or_default();
                let new_pq: http::uri::PathAndQuery =
                    format!("{new_path}{query_part}").parse().map_err(|e| {
                        pingora_core::Error::because(
                            pingora_core::ErrorType::InternalError,
                            "invalid uri after prefix strip",
                            e,
                        )
                    })?;
                let mut parts = old_uri.into_parts();
                parts.path_and_query = Some(new_pq);
                // expect is safe: parts came from a valid URI plus an
                // already-parsed PathAndQuery.
                upstream_req.set_uri(
                    http::Uri::from_parts(parts).expect("valid uri parts"),
                );
            }
        }

        Ok(())
    }

    /// Emit a structured JSON audit log line for every request.
    async fn logging(
        &self,
        session: &mut Session,
        error: Option<&pingora_core::Error>,
        ctx: &mut RequestCtx,
    ) where
        Self::CTX: Send + Sync,
    {
        // 0 means no response header was ever written (e.g. aborted connection).
        let status = session
            .response_written()
            .map_or(0, |r| r.status.as_u16());
        let duration_ms = ctx.start_time.elapsed().as_millis();
        // "-" placeholders keep the log schema stable when fields are absent.
        let backend = ctx
            .route
            .as_ref()
            .map(|r| r.backend.as_str())
            .unwrap_or("-");
        let client_ip = session
            .client_addr()
            .map(|a| a.to_string())
            .unwrap_or_else(|| "-".to_string());
        let error_str = error.map(|e| e.to_string());

        tracing::info!(
            target = "audit",
            method = %session.req_header().method,
            host = %extract_host(session),
            path = %session.req_header().uri.path(),
            client_ip,
            status,
            duration_ms,
            backend,
            error = error_str,
            "request"
        );
    }
}
|
||||||
40
src/telemetry.rs
Normal file
40
src/telemetry.rs
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
use opentelemetry::trace::TracerProvider as _;
|
||||||
|
use opentelemetry_otlp::WithExportConfig;
|
||||||
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
|
||||||
|
|
||||||
|
pub fn init(otlp_endpoint: &str) {
|
||||||
|
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||||
|
.json()
|
||||||
|
.with_current_span(true)
|
||||||
|
.with_target(true);
|
||||||
|
|
||||||
|
let env_filter = EnvFilter::try_from_default_env()
|
||||||
|
.unwrap_or_else(|_| EnvFilter::new("info"));
|
||||||
|
|
||||||
|
if otlp_endpoint.is_empty() {
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(env_filter)
|
||||||
|
.with(fmt_layer)
|
||||||
|
.init();
|
||||||
|
} else {
|
||||||
|
let exporter = opentelemetry_otlp::SpanExporter::builder()
|
||||||
|
.with_http()
|
||||||
|
.with_endpoint(otlp_endpoint)
|
||||||
|
.build()
|
||||||
|
.expect("failed to build OTLP span exporter");
|
||||||
|
|
||||||
|
let provider = opentelemetry_sdk::trace::TracerProvider::builder()
|
||||||
|
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
opentelemetry::global::set_tracer_provider(provider.clone());
|
||||||
|
let tracer = provider.tracer("sunbeam-proxy");
|
||||||
|
let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
|
||||||
|
|
||||||
|
tracing_subscriber::registry()
|
||||||
|
.with(env_filter)
|
||||||
|
.with(fmt_layer)
|
||||||
|
.with(otel_layer)
|
||||||
|
.init();
|
||||||
|
}
|
||||||
|
}
|
||||||
122
src/watcher.rs
Normal file
122
src/watcher.rs
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
use futures::StreamExt;
|
||||||
|
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
|
||||||
|
use kube::{runtime::watcher, Api, Client};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
/// Watch `pingora-tls` and `pingora-config` in the ingress namespace.
|
||||||
|
///
|
||||||
|
/// On cert change: write new cert bytes from the Apply event directly to the
|
||||||
|
/// configured paths (avoiding kubelet volume-sync delay), then trigger a
|
||||||
|
/// graceful upgrade so the new process reads the updated cert immediately.
|
||||||
|
///
|
||||||
|
/// On config change: trigger the upgrade immediately; the kubelet usually
|
||||||
|
/// syncs ConfigMap volumes within ~60s, so the new process reads the updated
|
||||||
|
/// config shortly after restarting.
|
||||||
|
///
|
||||||
|
/// No-ops when no K8s client is available (e.g. ad-hoc local runs outside a
|
||||||
|
/// cluster) so the binary works in both environments.
|
||||||
|
pub async fn run_watcher(client: Client, cert_path: String, key_path: String) {
|
||||||
|
let (tx, mut rx) = mpsc::channel::<()>(2);
|
||||||
|
|
||||||
|
let secret_api: Api<Secret> = Api::namespaced(client.clone(), "ingress");
|
||||||
|
let cm_api: Api<ConfigMap> = Api::namespaced(client.clone(), "ingress");
|
||||||
|
|
||||||
|
tokio::spawn(watch_secret(secret_api, cert_path, key_path, tx.clone()));
|
||||||
|
tokio::spawn(watch_configmap(cm_api, tx));
|
||||||
|
|
||||||
|
if rx.recv().await.is_some() {
|
||||||
|
tracing::info!("initiating graceful upgrade");
|
||||||
|
trigger_upgrade();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn watch_secret(
|
||||||
|
api: Api<Secret>,
|
||||||
|
cert_path: String,
|
||||||
|
key_path: String,
|
||||||
|
tx: mpsc::Sender<()>,
|
||||||
|
) {
|
||||||
|
let cfg = watcher::Config::default().fields("metadata.name=pingora-tls");
|
||||||
|
let mut stream = Box::pin(watcher(api, cfg));
|
||||||
|
let mut initialized = false;
|
||||||
|
|
||||||
|
while let Some(result) = stream.next().await {
|
||||||
|
match result {
|
||||||
|
Ok(watcher::Event::InitDone) => {
|
||||||
|
initialized = true;
|
||||||
|
tracing::debug!("pingora-tls watcher ready");
|
||||||
|
}
|
||||||
|
// Write the new cert directly from the event object before triggering the
|
||||||
|
// upgrade. The Apply event carries the full updated Secret, so we don't
|
||||||
|
// need a separate API call and the cert files are ready before the new
|
||||||
|
// process's svc.add_tls() runs.
|
||||||
|
Ok(watcher::Event::Apply(secret)) if initialized => {
|
||||||
|
tracing::info!("pingora-tls changed — writing new cert");
|
||||||
|
match crate::cert::write_from_secret(&secret, &cert_path, &key_path) {
|
||||||
|
Ok(()) => {
|
||||||
|
let _ = tx.send(()).await;
|
||||||
|
}
|
||||||
|
Err(e) => tracing::error!(error = %e, "cert write failed; skipping upgrade"),
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, resource = "pingora-tls", "watcher error; retrying in 10s");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn watch_configmap(api: Api<ConfigMap>, tx: mpsc::Sender<()>) {
|
||||||
|
let cfg = watcher::Config::default().fields("metadata.name=pingora-config");
|
||||||
|
let mut stream = Box::pin(watcher(api, cfg));
|
||||||
|
let mut initialized = false;
|
||||||
|
|
||||||
|
while let Some(result) = stream.next().await {
|
||||||
|
match result {
|
||||||
|
Ok(watcher::Event::InitDone) => {
|
||||||
|
initialized = true;
|
||||||
|
tracing::debug!("pingora-config watcher ready");
|
||||||
|
}
|
||||||
|
Ok(watcher::Event::Apply(_)) if initialized => {
|
||||||
|
tracing::info!("pingora-config changed — triggering upgrade");
|
||||||
|
let _ = tx.send(()).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Ok(_) => {}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, resource = "pingora-config", "watcher error; retrying in 10s");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawn a new process with `--upgrade`, then send SIGQUIT to self.
|
||||||
|
///
|
||||||
|
/// Pingora's SIGQUIT handler transfers all listening socket FDs to the new
|
||||||
|
/// process via a Unix socket and begins draining existing connections. The
|
||||||
|
/// new process calls `Server::new(Some(Opt { upgrade: true }))` in
|
||||||
|
/// `bootstrap()`, inherits the FDs, and takes over without dropping connections.
|
||||||
|
fn trigger_upgrade() {
|
||||||
|
let exe = match std::env::current_exe() {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, "cannot resolve current exe; upgrade aborted");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match std::process::Command::new(&exe).arg("--upgrade").spawn() {
|
||||||
|
Ok(child) => tracing::info!(pid = child.id(), "upgrade process spawned"),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, "failed to spawn upgrade process; upgrade aborted");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SAFETY: kill(getpid(), SIGQUIT) is always safe; we're only signalling ourselves.
|
||||||
|
unsafe { libc::kill(libc::getpid(), libc::SIGQUIT) };
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user