From 65516404e181f57a0ec4ce3293ecdec6197c4ce1 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Tue, 10 Mar 2026 23:38:21 +0000 Subject: [PATCH] feat(cluster): wire cluster into proxy lifecycle and request pipeline Spawn cluster on dedicated thread in main.rs with graceful fallback to standalone on failure. Add cluster field to SunbeamProxy, record bandwidth in logging(), and enforce cluster-wide bandwidth cap in request_filter with 429 JSON response. Signed-off-by: Sienna Meridian Satterwhite --- src/lib.rs | 5 +++-- src/main.rs | 26 ++++++++++++++++++++++++++ src/proxy.rs | 33 +++++++++++++++++++++++++++++++++ tests/e2e.rs | 2 +- 4 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4d16d5b..daa5a61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,13 +2,14 @@ // integration tests in tests/ can construct and drive a SunbeamProxy // without going through the binary entry point. pub mod acme; +pub mod cache; +pub mod cluster; pub mod config; -pub mod metrics; pub mod ddos; pub mod dual_stack; +pub mod metrics; pub mod proxy; pub mod rate_limit; pub mod scanner; -pub mod cache; pub mod ssh; pub mod static_files; diff --git a/src/main.rs b/src/main.rs index 868c271..3e10f1d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -353,6 +353,31 @@ fn run_serve(upgrade: bool) -> Result<()> { // Pingora's async proxy calls without cross-runtime waker concerns. let acme_routes: acme::AcmeRoutes = Arc::new(RwLock::new(HashMap::new())); + // 2d. Spawn cluster gossip if configured. + let cluster_handle = if let Some(cc) = &cfg.cluster { + if cc.enabled { + match sunbeam_proxy::cluster::spawn_cluster(cc) { + Ok(handle) => { + tracing::info!( + endpoint_id = %handle.endpoint_id, + tenant = %cc.tenant, + port = cc.gossip_port, + "cluster gossip started" + ); + Some(Arc::new(handle)) + } + Err(e) => { + tracing::warn!(error = %e, "failed to start cluster; running standalone"); + None + } + } + } else { + None + } + } else { + None + }; + let compiled_rewrites = SunbeamProxy::compile_rewrites(&cfg.routes); let http_client = reqwest::Client::new(); @@ -368,6 +393,7 @@ fn run_serve(upgrade: bool) -> Result<()> { pipeline_bypass_cidrs: crate::rate_limit::cidr::parse_cidrs( &cfg.rate_limit.as_ref().map(|rl| rl.bypass_cidrs.clone()).unwrap_or_default(), ), + cluster: cluster_handle, }; let mut svc = http_proxy_service(&server.configuration, proxy); diff --git a/src/proxy.rs b/src/proxy.rs index 3ce401f..b515a16 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,4 +1,5 @@ use crate::acme::AcmeRoutes; +use crate::cluster::ClusterHandle; use crate::config::RouteConfig; use crate::ddos::detector::DDoSDetector; use crate::ddos::model::DDoSAction; @@ -45,6 +46,8 @@ pub struct SunbeamProxy { pub http_client: reqwest::Client, /// Parsed bypass CIDRs — IPs in these ranges skip the detection pipeline. pub pipeline_bypass_cidrs: Vec, + /// Optional cluster handle for multi-node bandwidth tracking. + pub cluster: Option>, } pub struct RequestCtx { @@ -479,6 +482,24 @@ impl ProxyHttp for SunbeamProxy { } } + // Cluster-wide bandwidth cap enforcement. + if let Some(c) = &self.cluster { + use crate::cluster::bandwidth::BandwidthLimitResult; + let bw_result = c.limiter.check(); + let decision = if bw_result == BandwidthLimitResult::Reject { "block" } else { "allow" }; + metrics::BANDWIDTH_LIMIT_DECISIONS.with_label_values(&[decision]).inc(); + if bw_result == BandwidthLimitResult::Reject { + let body = b"{\"error\":\"bandwidth_limit_exceeded\",\"message\":\"Request rate-limited: aggregate bandwidth capacity exceeded. Please try again shortly.\"}"; + let mut resp = ResponseHeader::build(429, None)?; + resp.insert_header("Retry-After", "5")?; + resp.insert_header("Content-Type", "application/json")?; + resp.insert_header("Content-Length", body.len().to_string())?; + session.write_response_header(Box::new(resp), false).await?; + session.write_response_body(Some(Bytes::from_static(body)), true).await?; + return Ok(true); + } + } + // Reject unknown host prefixes with 404. let host = extract_host(session); let prefix = host.split('.').next().unwrap_or(""); @@ -1074,6 +1095,18 @@ impl ProxyHttp for SunbeamProxy { .inc(); metrics::REQUEST_DURATION.observe(duration_secs); + // Record bandwidth for cluster aggregation. + if let Some(c) = &self.cluster { + let req_bytes: u64 = session + .req_header() + .headers + .get("content-length") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + c.bandwidth.record(req_bytes, session.body_bytes_sent() as u64); + } + let content_length: u64 = session .req_header() .headers diff --git a/tests/e2e.rs b/tests/e2e.rs index ee7be37..d435f7e 100644 --- a/tests/e2e.rs +++ b/tests/e2e.rs @@ -108,7 +108,7 @@ fn start_proxy_once(backend_port: u16) { }]; let acme_routes: AcmeRoutes = Arc::new(RwLock::new(HashMap::new())); let compiled_rewrites = SunbeamProxy::compile_rewrites(&routes); - let proxy = SunbeamProxy { routes, acme_routes, ddos_detector: None, scanner_detector: None, bot_allowlist: None, rate_limiter: None, compiled_rewrites, http_client: reqwest::Client::new(), pipeline_bypass_cidrs: vec![] }; + let proxy = SunbeamProxy { routes, acme_routes, ddos_detector: None, scanner_detector: None, bot_allowlist: None, rate_limiter: None, compiled_rewrites, http_client: reqwest::Client::new(), pipeline_bypass_cidrs: vec![], cluster: None }; let opt = Opt { upgrade: false,