2026-03-10 23:38:19 +00:00
|
|
|
use crate::acme::AcmeRoutes;
|
|
|
|
|
use crate::config::RouteConfig;
|
|
|
|
|
use async_trait::async_trait;
|
2026-03-10 23:38:19 +00:00
|
|
|
use http::header::{CONNECTION, EXPECT, HOST, UPGRADE};
|
2026-03-10 23:38:19 +00:00
|
|
|
use pingora_core::{upstreams::peer::HttpPeer, Result};
|
|
|
|
|
use pingora_http::{RequestHeader, ResponseHeader};
|
|
|
|
|
use pingora_proxy::{ProxyHttp, Session};
|
|
|
|
|
use std::time::Instant;
|
|
|
|
|
|
|
|
|
|
pub struct SunbeamProxy {
|
|
|
|
|
pub routes: Vec<RouteConfig>,
|
|
|
|
|
/// Per-challenge route table populated by the Ingress watcher.
|
|
|
|
|
/// Maps `/.well-known/acme-challenge/<token>` → solver service address.
|
|
|
|
|
pub acme_routes: AcmeRoutes,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub struct RequestCtx {
|
|
|
|
|
pub route: Option<RouteConfig>,
|
|
|
|
|
pub start_time: Instant,
|
|
|
|
|
/// Resolved solver backend address for this ACME challenge, if applicable.
|
|
|
|
|
pub acme_backend: Option<String>,
|
|
|
|
|
/// Path prefix to strip before forwarding to the upstream (e.g. "/kratos").
|
|
|
|
|
pub strip_prefix: Option<String>,
|
2026-03-10 23:38:19 +00:00
|
|
|
/// Original downstream scheme ("http" or "https"), captured in request_filter.
|
|
|
|
|
pub downstream_scheme: &'static str,
|
2026-03-10 23:38:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl SunbeamProxy {
|
|
|
|
|
fn find_route(&self, prefix: &str) -> Option<&RouteConfig> {
|
|
|
|
|
self.routes.iter().find(|r| r.host_prefix == prefix)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn extract_host(session: &Session) -> String {
|
|
|
|
|
session
|
|
|
|
|
.req_header()
|
|
|
|
|
.headers
|
|
|
|
|
.get(HOST)
|
|
|
|
|
.and_then(|v| v.to_str().ok())
|
|
|
|
|
.unwrap_or("")
|
|
|
|
|
.to_string()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Strip the scheme prefix from a backend URL like `http://host:port`.
|
|
|
|
|
fn backend_addr(backend: &str) -> &str {
|
|
|
|
|
backend
|
|
|
|
|
.trim_start_matches("https://")
|
|
|
|
|
.trim_start_matches("http://")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Returns true if the downstream connection is plain HTTP (no TLS).
|
|
|
|
|
fn is_plain_http(session: &Session) -> bool {
|
|
|
|
|
session
|
|
|
|
|
.digest()
|
|
|
|
|
.map(|d| d.ssl_digest.is_none())
|
|
|
|
|
.unwrap_or(true)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[async_trait]
|
|
|
|
|
impl ProxyHttp for SunbeamProxy {
|
|
|
|
|
type CTX = RequestCtx;
|
|
|
|
|
|
|
|
|
|
fn new_ctx(&self) -> RequestCtx {
|
|
|
|
|
RequestCtx {
|
|
|
|
|
route: None,
|
|
|
|
|
start_time: Instant::now(),
|
|
|
|
|
acme_backend: None,
|
2026-03-10 23:38:19 +00:00
|
|
|
downstream_scheme: "https",
|
2026-03-10 23:38:19 +00:00
|
|
|
strip_prefix: None,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// HTTP → HTTPS redirect; ACME HTTP-01 challenges pass through on plain HTTP.
|
|
|
|
|
async fn request_filter(
|
|
|
|
|
&self,
|
|
|
|
|
session: &mut Session,
|
|
|
|
|
ctx: &mut RequestCtx,
|
|
|
|
|
) -> Result<bool>
|
|
|
|
|
where
|
|
|
|
|
Self::CTX: Send + Sync,
|
|
|
|
|
{
|
2026-03-10 23:38:19 +00:00
|
|
|
ctx.downstream_scheme = if is_plain_http(session) { "http" } else { "https" };
|
|
|
|
|
|
2026-03-10 23:38:19 +00:00
|
|
|
if is_plain_http(session) {
|
|
|
|
|
let path = session.req_header().uri.path().to_string();
|
|
|
|
|
|
|
|
|
|
// cert-manager HTTP-01 challenge: look up the token path in the
|
|
|
|
|
// Ingress-backed route table. Each challenge Ingress maps exactly
|
|
|
|
|
// one token to exactly one solver Service, so this routes the request
|
|
|
|
|
// to the right solver pod even when multiple challenges run in parallel.
|
|
|
|
|
if path.starts_with("/.well-known/acme-challenge/") {
|
|
|
|
|
// Drop the guard before any await point (RwLockReadGuard is !Send).
|
|
|
|
|
let backend = self
|
|
|
|
|
.acme_routes
|
|
|
|
|
.read()
|
|
|
|
|
.unwrap_or_else(|e| e.into_inner())
|
|
|
|
|
.get(&path)
|
|
|
|
|
.cloned();
|
|
|
|
|
if let Some(backend) = backend {
|
|
|
|
|
ctx.acme_backend = Some(backend);
|
|
|
|
|
return Ok(false); // pass to upstream_peer
|
|
|
|
|
}
|
|
|
|
|
// No route yet: challenge Ingress hasn't arrived from cert-manager.
|
|
|
|
|
let mut resp = ResponseHeader::build(404, None)?;
|
|
|
|
|
resp.insert_header("Content-Length", "0")?;
|
|
|
|
|
session.write_response_header(Box::new(resp), true).await?;
|
|
|
|
|
return Ok(true);
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-10 23:38:19 +00:00
|
|
|
// All other plain-HTTP traffic.
|
2026-03-10 23:38:19 +00:00
|
|
|
let host = extract_host(session);
|
2026-03-10 23:38:19 +00:00
|
|
|
let prefix = host.split('.').next().unwrap_or("");
|
|
|
|
|
|
|
|
|
|
// Routes that explicitly opt out of HTTPS enforcement pass through.
|
|
|
|
|
// All other requests — including unknown hosts — are redirected.
|
|
|
|
|
// This is as close to an L4 redirect as HTTP allows: the upstream is
|
|
|
|
|
// never contacted; the 301 is written directly to the downstream socket.
|
|
|
|
|
if self
|
|
|
|
|
.find_route(prefix)
|
|
|
|
|
.map(|r| r.disable_secure_redirection)
|
|
|
|
|
.unwrap_or(false)
|
|
|
|
|
{
|
|
|
|
|
return Ok(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let query = session
|
|
|
|
|
.req_header()
|
|
|
|
|
.uri
|
|
|
|
|
.query()
|
|
|
|
|
.map(|q| format!("?{q}"))
|
|
|
|
|
.unwrap_or_default();
|
|
|
|
|
let location = format!("https://{host}{path}{query}");
|
2026-03-10 23:38:19 +00:00
|
|
|
let mut resp = ResponseHeader::build(301, None)?;
|
|
|
|
|
resp.insert_header("Location", location)?;
|
|
|
|
|
resp.insert_header("Content-Length", "0")?;
|
|
|
|
|
session.write_response_header(Box::new(resp), true).await?;
|
|
|
|
|
return Ok(true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Reject unknown host prefixes with 404.
|
|
|
|
|
let host = extract_host(session);
|
|
|
|
|
let prefix = host.split('.').next().unwrap_or("");
|
|
|
|
|
if self.find_route(prefix).is_none() {
|
|
|
|
|
let mut resp = ResponseHeader::build(404, None)?;
|
|
|
|
|
resp.insert_header("Content-Length", "0")?;
|
|
|
|
|
session.write_response_header(Box::new(resp), true).await?;
|
|
|
|
|
return Ok(true);
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-10 23:38:19 +00:00
|
|
|
// Handle Expect: 100-continue before connecting to upstream.
|
|
|
|
|
//
|
|
|
|
|
// Docker's OCI distribution protocol sends Expect: 100-continue for
|
|
|
|
|
// large layer blob uploads (typically > 5 MB). Without this, Pingora
|
|
|
|
|
// forwards the header to the upstream (e.g. Gitea), the upstream
|
|
|
|
|
// responds with 100 Continue, and Pingora must then proxy that
|
|
|
|
|
// informational response back to the client. Pingora's handling of
|
|
|
|
|
// upstream informational responses is unreliable and can cause the
|
|
|
|
|
// upload to fail with a spurious 400 for the client.
|
|
|
|
|
//
|
|
|
|
|
// By responding with 100 Continue here — before upstream_peer is
|
|
|
|
|
// even called — we unblock the client immediately. The Expect header
|
|
|
|
|
// is stripped in upstream_request_filter so the upstream never sends
|
|
|
|
|
// its own 100 Continue.
|
|
|
|
|
if session
|
|
|
|
|
.req_header()
|
|
|
|
|
.headers
|
|
|
|
|
.get(EXPECT)
|
|
|
|
|
.and_then(|v| v.to_str().ok())
|
|
|
|
|
.map(|v| v.eq_ignore_ascii_case("100-continue"))
|
|
|
|
|
.unwrap_or(false)
|
|
|
|
|
{
|
|
|
|
|
session.write_continue_response().await?;
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-10 23:38:19 +00:00
|
|
|
Ok(false)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn upstream_peer(
|
|
|
|
|
&self,
|
|
|
|
|
session: &mut Session,
|
|
|
|
|
ctx: &mut RequestCtx,
|
|
|
|
|
) -> Result<Box<HttpPeer>> {
|
|
|
|
|
// ACME challenge: backend was resolved in request_filter.
|
|
|
|
|
if let Some(backend) = &ctx.acme_backend {
|
|
|
|
|
return Ok(Box::new(HttpPeer::new(
|
|
|
|
|
backend_addr(backend),
|
|
|
|
|
false,
|
|
|
|
|
String::new(),
|
|
|
|
|
)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
let host = extract_host(session);
|
|
|
|
|
let prefix = host.split('.').next().unwrap_or("");
|
|
|
|
|
let route = self
|
|
|
|
|
.find_route(prefix)
|
|
|
|
|
.expect("route already validated in request_filter");
|
|
|
|
|
|
|
|
|
|
let path = session.req_header().uri.path().to_string();
|
|
|
|
|
|
|
|
|
|
// Check path sub-routes (longest matching prefix wins).
|
|
|
|
|
let path_route = route
|
|
|
|
|
.paths
|
|
|
|
|
.iter()
|
|
|
|
|
.filter(|p| path.starts_with(p.prefix.as_str()))
|
|
|
|
|
.max_by_key(|p| p.prefix.len());
|
|
|
|
|
|
|
|
|
|
if let Some(pr) = path_route {
|
|
|
|
|
if pr.strip_prefix {
|
|
|
|
|
ctx.strip_prefix = Some(pr.prefix.clone());
|
|
|
|
|
}
|
|
|
|
|
ctx.route = Some(crate::config::RouteConfig {
|
|
|
|
|
host_prefix: route.host_prefix.clone(),
|
|
|
|
|
backend: pr.backend.clone(),
|
|
|
|
|
websocket: pr.websocket || route.websocket,
|
2026-03-10 23:38:19 +00:00
|
|
|
disable_secure_redirection: route.disable_secure_redirection,
|
2026-03-10 23:38:19 +00:00
|
|
|
paths: vec![],
|
|
|
|
|
});
|
|
|
|
|
return Ok(Box::new(HttpPeer::new(
|
|
|
|
|
backend_addr(&pr.backend),
|
|
|
|
|
false,
|
|
|
|
|
String::new(),
|
|
|
|
|
)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctx.route = Some(route.clone());
|
|
|
|
|
Ok(Box::new(HttpPeer::new(
|
|
|
|
|
backend_addr(&route.backend),
|
|
|
|
|
false,
|
|
|
|
|
String::new(),
|
|
|
|
|
)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Copy WebSocket upgrade headers and apply path prefix stripping.
|
|
|
|
|
async fn upstream_request_filter(
|
|
|
|
|
&self,
|
|
|
|
|
session: &mut Session,
|
|
|
|
|
upstream_req: &mut RequestHeader,
|
|
|
|
|
ctx: &mut RequestCtx,
|
|
|
|
|
) -> Result<()>
|
|
|
|
|
where
|
|
|
|
|
Self::CTX: Send + Sync,
|
|
|
|
|
{
|
2026-03-10 23:38:19 +00:00
|
|
|
// Inform backends of the original downstream scheme so they can construct
|
|
|
|
|
// correct absolute URLs (e.g. OIDC redirect_uri, CSRF checks).
|
|
|
|
|
// Must use insert_header (not headers.insert) so that both base.headers
|
|
|
|
|
// and the CaseMap are updated together — header_to_h1_wire zips them
|
|
|
|
|
// and silently drops headers only present in base.headers.
|
|
|
|
|
upstream_req
|
|
|
|
|
.insert_header("x-forwarded-proto", ctx.downstream_scheme)
|
|
|
|
|
.map_err(|e| {
|
|
|
|
|
pingora_core::Error::because(
|
|
|
|
|
pingora_core::ErrorType::InternalError,
|
|
|
|
|
"failed to insert x-forwarded-proto",
|
|
|
|
|
e,
|
|
|
|
|
)
|
|
|
|
|
})?;
|
|
|
|
|
|
2026-03-10 23:38:19 +00:00
|
|
|
if ctx.route.as_ref().map(|r| r.websocket).unwrap_or(false) {
|
|
|
|
|
for name in &[CONNECTION, UPGRADE] {
|
|
|
|
|
if let Some(val) = session.req_header().headers.get(name.clone()) {
|
|
|
|
|
upstream_req.insert_header(name.clone(), val)?;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2026-03-10 23:38:19 +00:00
|
|
|
// Strip Expect: 100-continue — the proxy already sent 100 Continue to
|
|
|
|
|
// the downstream client in request_filter, so we must not forward the
|
|
|
|
|
// header to the upstream. If the upstream also sees Expect it will
|
|
|
|
|
// send its own 100 Continue, which Pingora cannot reliably proxy back
|
|
|
|
|
// (it has already been consumed) and which can corrupt the response.
|
|
|
|
|
upstream_req.remove_header("expect");
|
|
|
|
|
|
2026-03-10 23:38:19 +00:00
|
|
|
// Strip path prefix before forwarding (e.g. /kratos → /).
|
|
|
|
|
if let Some(prefix) = &ctx.strip_prefix {
|
|
|
|
|
let old_uri = upstream_req.uri.clone();
|
|
|
|
|
let old_path = old_uri.path();
|
|
|
|
|
if let Some(stripped) = old_path.strip_prefix(prefix.as_str()) {
|
|
|
|
|
let new_path = if stripped.is_empty() { "/" } else { stripped };
|
|
|
|
|
let query_part = old_uri
|
|
|
|
|
.query()
|
|
|
|
|
.map(|q| format!("?{q}"))
|
|
|
|
|
.unwrap_or_default();
|
|
|
|
|
let new_pq: http::uri::PathAndQuery =
|
|
|
|
|
format!("{new_path}{query_part}").parse().map_err(|e| {
|
|
|
|
|
pingora_core::Error::because(
|
|
|
|
|
pingora_core::ErrorType::InternalError,
|
|
|
|
|
"invalid uri after prefix strip",
|
|
|
|
|
e,
|
|
|
|
|
)
|
|
|
|
|
})?;
|
|
|
|
|
let mut parts = old_uri.into_parts();
|
|
|
|
|
parts.path_and_query = Some(new_pq);
|
|
|
|
|
upstream_req.set_uri(
|
|
|
|
|
http::Uri::from_parts(parts).expect("valid uri parts"),
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Emit a structured JSON audit log line for every request.
|
|
|
|
|
async fn logging(
|
|
|
|
|
&self,
|
|
|
|
|
session: &mut Session,
|
|
|
|
|
error: Option<&pingora_core::Error>,
|
|
|
|
|
ctx: &mut RequestCtx,
|
|
|
|
|
) where
|
|
|
|
|
Self::CTX: Send + Sync,
|
|
|
|
|
{
|
|
|
|
|
let status = session
|
|
|
|
|
.response_written()
|
|
|
|
|
.map_or(0, |r| r.status.as_u16());
|
|
|
|
|
let duration_ms = ctx.start_time.elapsed().as_millis();
|
|
|
|
|
let backend = ctx
|
|
|
|
|
.route
|
|
|
|
|
.as_ref()
|
|
|
|
|
.map(|r| r.backend.as_str())
|
|
|
|
|
.unwrap_or("-");
|
|
|
|
|
let client_ip = session
|
|
|
|
|
.client_addr()
|
|
|
|
|
.map(|a| a.to_string())
|
|
|
|
|
.unwrap_or_else(|| "-".to_string());
|
|
|
|
|
let error_str = error.map(|e| e.to_string());
|
|
|
|
|
|
|
|
|
|
tracing::info!(
|
|
|
|
|
target = "audit",
|
|
|
|
|
method = %session.req_header().method,
|
|
|
|
|
host = %extract_host(session),
|
|
|
|
|
path = %session.req_header().uri.path(),
|
|
|
|
|
client_ip,
|
|
|
|
|
status,
|
|
|
|
|
duration_ms,
|
|
|
|
|
backend,
|
|
|
|
|
error = error_str,
|
|
|
|
|
"request"
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-03-10 23:38:19 +00:00
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
use http::header::HeaderValue;
|
|
|
|
|
|
|
|
|
|
/// insert_header keeps CaseMap and base.headers in sync so the header
|
|
|
|
|
/// survives header_to_h1_wire serialization.
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_x_forwarded_proto_https_roundtrips_through_insert_header() {
|
|
|
|
|
let mut req = RequestHeader::build("GET", b"/", None).unwrap();
|
|
|
|
|
req.insert_header("x-forwarded-proto", "https").unwrap();
|
|
|
|
|
assert_eq!(
|
|
|
|
|
req.headers.get("x-forwarded-proto"),
|
|
|
|
|
Some(&HeaderValue::from_static("https")),
|
|
|
|
|
);
|
|
|
|
|
// Verify it survives wire serialization (CaseMap + base.headers in sync).
|
|
|
|
|
let mut buf = Vec::new();
|
|
|
|
|
req.header_to_h1_wire(&mut buf);
|
|
|
|
|
let wire = String::from_utf8(buf).unwrap();
|
|
|
|
|
assert!(wire.contains("x-forwarded-proto: https"), "wire: {wire:?}");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_x_forwarded_proto_http_roundtrips_through_insert_header() {
|
|
|
|
|
let mut req = RequestHeader::build("GET", b"/", None).unwrap();
|
|
|
|
|
req.insert_header("x-forwarded-proto", "http").unwrap();
|
|
|
|
|
assert_eq!(
|
|
|
|
|
req.headers.get("x-forwarded-proto"),
|
|
|
|
|
Some(&HeaderValue::from_static("http")),
|
|
|
|
|
);
|
|
|
|
|
let mut buf = Vec::new();
|
|
|
|
|
req.header_to_h1_wire(&mut buf);
|
|
|
|
|
let wire = String::from_utf8(buf).unwrap();
|
|
|
|
|
assert!(wire.contains("x-forwarded-proto: http"), "wire: {wire:?}");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// ctx.downstream_scheme defaults to "https" and is readable.
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_ctx_default_scheme_is_https() {
|
|
|
|
|
let ctx = RequestCtx {
|
|
|
|
|
route: None,
|
|
|
|
|
start_time: Instant::now(),
|
|
|
|
|
acme_backend: None,
|
|
|
|
|
strip_prefix: None,
|
|
|
|
|
downstream_scheme: "https",
|
|
|
|
|
};
|
|
|
|
|
assert_eq!(ctx.downstream_scheme, "https");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_backend_addr_strips_scheme() {
|
|
|
|
|
assert_eq!(backend_addr("http://svc.ns.svc.cluster.local:80"), "svc.ns.svc.cluster.local:80");
|
|
|
|
|
assert_eq!(backend_addr("https://svc.ns.svc.cluster.local:443"), "svc.ns.svc.cluster.local:443");
|
|
|
|
|
}
|
2026-03-10 23:38:19 +00:00
|
|
|
|
|
|
|
|
/// remove_header("expect") strips the header from the upstream request.
|
|
|
|
|
/// This is tested independently of the async proxy logic because
|
|
|
|
|
/// upstream_request_filter requires a live session.
|
|
|
|
|
#[test]
|
|
|
|
|
fn test_expect_header_stripped_before_upstream() {
|
|
|
|
|
let mut req = RequestHeader::build("PUT", b"/v2/studio/image/blobs/uploads/uuid", None).unwrap();
|
|
|
|
|
req.insert_header("expect", "100-continue").unwrap();
|
|
|
|
|
req.insert_header("content-length", "188000000").unwrap();
|
|
|
|
|
assert!(req.headers.get("expect").is_some(), "expect header should be present before stripping");
|
|
|
|
|
req.remove_header("expect");
|
|
|
|
|
assert!(req.headers.get("expect").is_none(), "expect header should be gone after remove_header");
|
|
|
|
|
// Content-Length must survive the strip.
|
|
|
|
|
assert!(req.headers.get("content-length").is_some());
|
|
|
|
|
}
|
2026-03-10 23:38:19 +00:00
|
|
|
}
|