feat(cluster): wire cluster into proxy lifecycle and request pipeline

Spawn cluster on dedicated thread in main.rs with graceful fallback to
standalone on failure. Add cluster field to SunbeamProxy, record
bandwidth in logging(), and enforce cluster-wide bandwidth cap in
request_filter with 429 JSON response.

Signed-off-by: Sienna Meridian Satterwhite <sienna@sunbeam.pt>
This commit is contained in:
2026-03-10 23:38:21 +00:00
parent 5d279f992b
commit 65516404e1
4 changed files with 63 additions and 3 deletions

View File

@@ -2,13 +2,14 @@
// integration tests in tests/ can construct and drive a SunbeamProxy
// without going through the binary entry point.
pub mod acme;
pub mod cache;
pub mod cluster;
pub mod config;
pub mod metrics;
pub mod ddos;
pub mod dual_stack;
pub mod metrics;
pub mod proxy;
pub mod rate_limit;
pub mod scanner;
pub mod cache;
pub mod ssh;
pub mod static_files;

View File

@@ -353,6 +353,31 @@ fn run_serve(upgrade: bool) -> Result<()> {
// Pingora's async proxy calls without cross-runtime waker concerns.
let acme_routes: acme::AcmeRoutes = Arc::new(RwLock::new(HashMap::new()));
// 2d. Spawn cluster gossip if configured.
let cluster_handle = if let Some(cc) = &cfg.cluster {
if cc.enabled {
match sunbeam_proxy::cluster::spawn_cluster(cc) {
Ok(handle) => {
tracing::info!(
endpoint_id = %handle.endpoint_id,
tenant = %cc.tenant,
port = cc.gossip_port,
"cluster gossip started"
);
Some(Arc::new(handle))
}
Err(e) => {
tracing::warn!(error = %e, "failed to start cluster; running standalone");
None
}
}
} else {
None
}
} else {
None
};
let compiled_rewrites = SunbeamProxy::compile_rewrites(&cfg.routes);
let http_client = reqwest::Client::new();
@@ -368,6 +393,7 @@ fn run_serve(upgrade: bool) -> Result<()> {
pipeline_bypass_cidrs: crate::rate_limit::cidr::parse_cidrs(
&cfg.rate_limit.as_ref().map(|rl| rl.bypass_cidrs.clone()).unwrap_or_default(),
),
cluster: cluster_handle,
};
let mut svc = http_proxy_service(&server.configuration, proxy);

View File

@@ -1,4 +1,5 @@
use crate::acme::AcmeRoutes;
use crate::cluster::ClusterHandle;
use crate::config::RouteConfig;
use crate::ddos::detector::DDoSDetector;
use crate::ddos::model::DDoSAction;
@@ -45,6 +46,8 @@ pub struct SunbeamProxy {
pub http_client: reqwest::Client,
/// Parsed bypass CIDRs — IPs in these ranges skip the detection pipeline.
pub pipeline_bypass_cidrs: Vec<crate::rate_limit::cidr::CidrBlock>,
/// Optional cluster handle for multi-node bandwidth tracking.
pub cluster: Option<Arc<ClusterHandle>>,
}
pub struct RequestCtx {
@@ -479,6 +482,24 @@ impl ProxyHttp for SunbeamProxy {
}
}
// Cluster-wide bandwidth cap enforcement.
if let Some(c) = &self.cluster {
use crate::cluster::bandwidth::BandwidthLimitResult;
let bw_result = c.limiter.check();
let decision = if bw_result == BandwidthLimitResult::Reject { "block" } else { "allow" };
metrics::BANDWIDTH_LIMIT_DECISIONS.with_label_values(&[decision]).inc();
if bw_result == BandwidthLimitResult::Reject {
let body = b"{\"error\":\"bandwidth_limit_exceeded\",\"message\":\"Request rate-limited: aggregate bandwidth capacity exceeded. Please try again shortly.\"}";
let mut resp = ResponseHeader::build(429, None)?;
resp.insert_header("Retry-After", "5")?;
resp.insert_header("Content-Type", "application/json")?;
resp.insert_header("Content-Length", body.len().to_string())?;
session.write_response_header(Box::new(resp), false).await?;
session.write_response_body(Some(Bytes::from_static(body)), true).await?;
return Ok(true);
}
}
// Reject unknown host prefixes with 404.
let host = extract_host(session);
let prefix = host.split('.').next().unwrap_or("");
@@ -1074,6 +1095,18 @@ impl ProxyHttp for SunbeamProxy {
.inc();
metrics::REQUEST_DURATION.observe(duration_secs);
// Record bandwidth for cluster aggregation.
if let Some(c) = &self.cluster {
let req_bytes: u64 = session
.req_header()
.headers
.get("content-length")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse().ok())
.unwrap_or(0);
c.bandwidth.record(req_bytes, session.body_bytes_sent() as u64);
}
let content_length: u64 = session
.req_header()
.headers

View File

@@ -108,7 +108,7 @@ fn start_proxy_once(backend_port: u16) {
}];
let acme_routes: AcmeRoutes = Arc::new(RwLock::new(HashMap::new()));
let compiled_rewrites = SunbeamProxy::compile_rewrites(&routes);
let proxy = SunbeamProxy { routes, acme_routes, ddos_detector: None, scanner_detector: None, bot_allowlist: None, rate_limiter: None, compiled_rewrites, http_client: reqwest::Client::new(), pipeline_bypass_cidrs: vec![] };
let proxy = SunbeamProxy { routes, acme_routes, ddos_detector: None, scanner_detector: None, bot_allowlist: None, rate_limiter: None, compiled_rewrites, http_client: reqwest::Client::new(), pipeline_bypass_cidrs: vec![], cluster: None };
let opt = Opt {
upgrade: false,