diff --git a/src/lib.rs b/src/lib.rs index 4d16d5b..daa5a61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,13 +2,14 @@ // integration tests in tests/ can construct and drive a SunbeamProxy // without going through the binary entry point. pub mod acme; +pub mod cache; +pub mod cluster; pub mod config; -pub mod metrics; pub mod ddos; pub mod dual_stack; +pub mod metrics; pub mod proxy; pub mod rate_limit; pub mod scanner; -pub mod cache; pub mod ssh; pub mod static_files; diff --git a/src/main.rs b/src/main.rs index 868c271..3e10f1d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -353,6 +353,31 @@ fn run_serve(upgrade: bool) -> Result<()> { // Pingora's async proxy calls without cross-runtime waker concerns. let acme_routes: acme::AcmeRoutes = Arc::new(RwLock::new(HashMap::new())); + // 2d. Spawn cluster gossip if configured. + let cluster_handle = if let Some(cc) = &cfg.cluster { + if cc.enabled { + match sunbeam_proxy::cluster::spawn_cluster(cc) { + Ok(handle) => { + tracing::info!( + endpoint_id = %handle.endpoint_id, + tenant = %cc.tenant, + port = cc.gossip_port, + "cluster gossip started" + ); + Some(Arc::new(handle)) + } + Err(e) => { + tracing::warn!(error = %e, "failed to start cluster; running standalone"); + None + } + } + } else { + None + } + } else { + None + }; + let compiled_rewrites = SunbeamProxy::compile_rewrites(&cfg.routes); let http_client = reqwest::Client::new(); @@ -368,6 +393,7 @@ fn run_serve(upgrade: bool) -> Result<()> { pipeline_bypass_cidrs: crate::rate_limit::cidr::parse_cidrs( &cfg.rate_limit.as_ref().map(|rl| rl.bypass_cidrs.clone()).unwrap_or_default(), ), + cluster: cluster_handle, }; let mut svc = http_proxy_service(&server.configuration, proxy); diff --git a/src/proxy.rs b/src/proxy.rs index 3ce401f..b515a16 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -1,4 +1,5 @@ use crate::acme::AcmeRoutes; +use crate::cluster::ClusterHandle; use crate::config::RouteConfig; use crate::ddos::detector::DDoSDetector; use crate::ddos::model::DDoSAction; @@ -45,6 +46,8 @@ pub struct SunbeamProxy { pub http_client: reqwest::Client, /// Parsed bypass CIDRs — IPs in these ranges skip the detection pipeline. pub pipeline_bypass_cidrs: Vec, + /// Optional cluster handle for multi-node bandwidth tracking. + pub cluster: Option>, } pub struct RequestCtx { @@ -479,6 +482,24 @@ impl ProxyHttp for SunbeamProxy { } } + // Cluster-wide bandwidth cap enforcement. + if let Some(c) = &self.cluster { + use crate::cluster::bandwidth::BandwidthLimitResult; + let bw_result = c.limiter.check(); + let decision = if bw_result == BandwidthLimitResult::Reject { "block" } else { "allow" }; + metrics::BANDWIDTH_LIMIT_DECISIONS.with_label_values(&[decision]).inc(); + if bw_result == BandwidthLimitResult::Reject { + let body = b"{\"error\":\"bandwidth_limit_exceeded\",\"message\":\"Request rate-limited: aggregate bandwidth capacity exceeded. Please try again shortly.\"}"; + let mut resp = ResponseHeader::build(429, None)?; + resp.insert_header("Retry-After", "5")?; + resp.insert_header("Content-Type", "application/json")?; + resp.insert_header("Content-Length", body.len().to_string())?; + session.write_response_header(Box::new(resp), false).await?; + session.write_response_body(Some(Bytes::from_static(body)), true).await?; + return Ok(true); + } + } + // Reject unknown host prefixes with 404. let host = extract_host(session); let prefix = host.split('.').next().unwrap_or(""); @@ -1074,6 +1095,18 @@ impl ProxyHttp for SunbeamProxy { .inc(); metrics::REQUEST_DURATION.observe(duration_secs); + // Record bandwidth for cluster aggregation. + if let Some(c) = &self.cluster { + let req_bytes: u64 = session + .req_header() + .headers + .get("content-length") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse().ok()) + .unwrap_or(0); + c.bandwidth.record(req_bytes, session.body_bytes_sent() as u64); + } + let content_length: u64 = session .req_header() .headers diff --git a/tests/e2e.rs b/tests/e2e.rs index ee7be37..d435f7e 100644 --- a/tests/e2e.rs +++ b/tests/e2e.rs @@ -108,7 +108,7 @@ fn start_proxy_once(backend_port: u16) { }]; let acme_routes: AcmeRoutes = Arc::new(RwLock::new(HashMap::new())); let compiled_rewrites = SunbeamProxy::compile_rewrites(&routes); - let proxy = SunbeamProxy { routes, acme_routes, ddos_detector: None, scanner_detector: None, bot_allowlist: None, rate_limiter: None, compiled_rewrites, http_client: reqwest::Client::new(), pipeline_bypass_cidrs: vec![] }; + let proxy = SunbeamProxy { routes, acme_routes, ddos_detector: None, scanner_detector: None, bot_allowlist: None, rate_limiter: None, compiled_rewrites, http_client: reqwest::Client::new(), pipeline_bypass_cidrs: vec![], cluster: None }; let opt = Opt { upgrade: false,