From 0f31c7645cde2064dc0197e4d1fad1ab698bf077 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Tue, 10 Mar 2026 23:38:20 +0000 Subject: [PATCH] feat(cache): add pingora-cache integration with per-route config Add in-memory HTTP response cache using pingora-cache MemCache backend. Cache runs after the detection pipeline so cache hits bypass upstream request modifications and body rewriting. Respects Cache-Control (no-store, private, s-maxage, max-age), skips caching for routes with body rewrites or auth subrequest headers, and supports configurable default TTL, stale-while-revalidate, and max file size per route. Signed-off-by: Sienna Meridian Satterwhite --- Cargo.lock | 1 + Cargo.toml | 3 +- benches/scanner_bench.rs | 3 + src/cache.rs | 50 +++++++++++++ src/config.rs | 22 ++++++ src/lib.rs | 1 + src/metrics.rs | 13 ++++ src/proxy.rs | 155 ++++++++++++++++++++++++++++++++++++++- src/scanner/detector.rs | 1 + tests/e2e.rs | 1 + tests/scanner_test.rs | 2 + 11 files changed, 249 insertions(+), 3 deletions(-) create mode 100644 src/cache.rs diff --git a/Cargo.lock b/Cargo.lock index 8593e5f..42719d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3584,6 +3584,7 @@ dependencies = [ "opentelemetry_sdk", "pin-project-lite", "pingora", + "pingora-cache", "pingora-core", "pingora-http", "pingora-proxy", diff --git a/Cargo.toml b/Cargo.toml index ca64aef..3d01e48 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,8 @@ path = "src/lib.rs" pingora = { version = "0.8", features = ["rustls"] } pingora-proxy = { version = "0.8", features = ["rustls"] } pingora-core = { version = "0.8", features = ["rustls"] } -pingora-http = "0.8" +pingora-http = "0.8" +pingora-cache = { version = "0.8", features = ["rustls"] } # HTTP header constants http = "1" diff --git a/benches/scanner_bench.rs b/benches/scanner_bench.rs index 4918bfe..ffbfe30 100644 --- a/benches/scanner_bench.rs +++ b/benches/scanner_bench.rs @@ -50,6 +50,7 @@ fn make_detector() -> ScannerDetector { 
rewrites: vec![], body_rewrites: vec![], response_headers: vec![], + cache: None, }, RouteConfig { host_prefix: "src".into(), @@ -62,6 +63,7 @@ fn make_detector() -> ScannerDetector { rewrites: vec![], body_rewrites: vec![], response_headers: vec![], + cache: None, }, RouteConfig { host_prefix: "docs".into(), @@ -74,6 +76,7 @@ fn make_detector() -> ScannerDetector { rewrites: vec![], body_rewrites: vec![], response_headers: vec![], + cache: None, }, ]; diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 0000000..00b2b2f --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,50 @@ +use pingora_cache::MemCache; +use std::sync::LazyLock; + +/// In-memory cache backend shared across all requests. +/// `Storage` requires `&'static self`, so we store it in a static. +pub static CACHE_BACKEND: LazyLock = LazyLock::new(MemCache::new); + +/// Parse s-maxage or max-age from a (lowercased) Cache-Control header value. +/// s-maxage takes priority (shared cache directive). +pub fn parse_cache_ttl(cc: &str) -> Option { + let mut max_age = None; + let mut s_maxage = None; + for part in cc.split(',') { + let part = part.trim(); + if let Some(val) = part.strip_prefix("s-maxage=") { + s_maxage = val.trim().parse().ok(); + } else if let Some(val) = part.strip_prefix("max-age=") { + max_age = val.trim().parse().ok(); + } + } + s_maxage.or(max_age) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_max_age_only() { + assert_eq!(parse_cache_ttl("public, max-age=3600"), Some(3600)); + } + + #[test] + fn parse_s_maxage_takes_priority() { + assert_eq!( + parse_cache_ttl("public, max-age=3600, s-maxage=60"), + Some(60) + ); + } + + #[test] + fn parse_no_age_directives() { + assert_eq!(parse_cache_ttl("no-cache"), None); + } + + #[test] + fn parse_zero_max_age() { + assert_eq!(parse_cache_ttl("max-age=0"), Some(0)); + } +} diff --git a/src/config.rs b/src/config.rs index a2f4155..9dfef78 100644 --- a/src/config.rs +++ b/src/config.rs @@ -190,6 +190,25 @@ pub struct 
HeaderRule { pub value: String, } +/// Per-route HTTP response cache configuration. +#[derive(Debug, Deserialize, Clone)] +pub struct CacheConfig { + #[serde(default = "default_cache_enabled")] + pub enabled: bool, + /// Default TTL in seconds when the upstream response has no Cache-Control header. + #[serde(default = "default_cache_ttl")] + pub default_ttl_secs: u64, + /// Seconds to serve stale content while revalidating in the background. + #[serde(default)] + pub stale_while_revalidate_secs: u32, + /// Max cacheable response body size in bytes (0 = no limit). + #[serde(default)] + pub max_file_size: usize, +} + +fn default_cache_enabled() -> bool { true } +fn default_cache_ttl() -> u64 { 60 } + #[derive(Debug, Deserialize, Clone)] pub struct RouteConfig { pub host_prefix: String, @@ -220,6 +239,9 @@ pub struct RouteConfig { /// Extra response headers added to every response for this route. #[serde(default)] pub response_headers: Vec, + /// HTTP response cache configuration for this route. 
+ #[serde(default)] + pub cache: Option<CacheConfig>, } impl Config { diff --git a/src/lib.rs b/src/lib.rs index 508dfd5..4d16d5b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,5 +9,6 @@ pub mod dual_stack; pub mod proxy; pub mod rate_limit; pub mod scanner; +pub mod cache; pub mod ssh; pub mod static_files; diff --git a/src/metrics.rs b/src/metrics.rs index 061ed58..c6da4ab 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -72,6 +72,19 @@ pub static RATE_LIMIT_DECISIONS: LazyLock = LazyLock::new(|| { c }); +pub static CACHE_STATUS: LazyLock<IntCounterVec> = LazyLock::new(|| { + let c = IntCounterVec::new( + Opts::new( + "sunbeam_cache_status_total", + "Cache hit/miss counts", + ), + &["status"], + ) + .unwrap(); + REGISTRY.register(Box::new(c.clone())).unwrap(); + c +}); + pub static ACTIVE_CONNECTIONS: LazyLock<Gauge> = LazyLock::new(|| { let g = Gauge::new( "sunbeam_active_connections", diff --git a/src/proxy.rs b/src/proxy.rs index f8c180b..2a4bf02 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -12,6 +12,7 @@ use arc_swap::ArcSwap; use async_trait::async_trait; use bytes::Bytes; use http::header::{CONNECTION, EXPECT, HOST, UPGRADE}; +use pingora_cache::{CacheKey, CacheMeta, ForcedFreshness, HitHandler, NoCacheReason, RespCacheable}; use pingora_core::{upstreams::peer::HttpPeer, Result}; use pingora_http::{RequestHeader, ResponseHeader}; use pingora_proxy::{ProxyHttp, Session}; @@ -47,7 +48,7 @@ pub struct SunbeamProxy { pub struct RequestCtx { pub route: Option, pub start_time: Instant, - /// Unique request identifier (UUID v4). + /// Unique request identifier (monotonic hex counter). pub request_id: String, /// Tracing span for this request. pub span: tracing::Span, @@ -477,7 +478,7 @@ impl ProxyHttp for SunbeamProxy { } }; - // Store route early so downstream hooks can access it. + // Store route early so request_cache_filter can access it. 
ctx.route = Some(route.clone()); // ── Static file serving ────────────────────────────────────────── @@ -592,11 +593,13 @@ impl ProxyHttp for SunbeamProxy { // Prepare body rewrite rules if the route has them. if !route.body_rewrites.is_empty() { + // We'll check content-type in upstream_response_filter; store rules now. ctx.body_rewrite_rules = route .body_rewrites .iter() .map(|br| (br.find.clone(), br.replace.clone())) .collect(); + // Store the content-type filter info on the route for later. } // Handle Expect: 100-continue before connecting to upstream. @@ -614,6 +617,152 @@ impl ProxyHttp for SunbeamProxy { Ok(false) } + // ── Cache hooks ──────────────────────────────────────────────────── + // Runs AFTER request_filter (detection pipeline) and BEFORE upstream. + // On cache hit, the response is served directly — no upstream request, + // no request modifications, no body rewriting. + + fn request_cache_filter( + &self, + session: &mut Session, + ctx: &mut RequestCtx, + ) -> Result<()> + where + Self::CTX: Send + Sync, + { + // Only cache GET/HEAD. + let method = &session.req_header().method; + if method != http::Method::GET && method != http::Method::HEAD { + return Ok(()); + } + + let cache_cfg = match ctx.route.as_ref().and_then(|r| r.cache.as_ref()) { + Some(c) if c.enabled => c, + _ => return Ok(()), + }; + + // Skip cache if body rewrites are active (need per-response rewriting). + if !ctx.body_rewrite_rules.is_empty() { + return Ok(()); + } + + // Skip cache if auth subrequest captured headers (per-user content). 
+ if !ctx.auth_headers.is_empty() { + return Ok(()); + } + + session.cache.enable( + &*crate::cache::CACHE_BACKEND, + None, // no eviction manager + None, // no predictor + None, // no cache lock + None, // no option overrides + ); + + if cache_cfg.max_file_size > 0 { + session + .cache + .set_max_file_size_bytes(cache_cfg.max_file_size); + } + + Ok(()) + } + + fn cache_key_callback( + &self, + session: &Session, + _ctx: &mut RequestCtx, + ) -> Result<CacheKey> { + let req = session.req_header(); + let host = req + .headers + .get(HOST) + .and_then(|v| v.to_str().ok()) + .unwrap_or(""); + let path = req.uri.path(); + let key = match req.uri.query() { + Some(q) => format!("{host}{path}?{q}"), + None => format!("{host}{path}"), + }; + Ok(CacheKey::new("", key, "")) + } + + fn response_cache_filter( + &self, + _session: &Session, + resp: &ResponseHeader, + ctx: &mut RequestCtx, + ) -> Result<RespCacheable> { + use std::time::{Duration, SystemTime}; + + // Only cache 2xx responses. + if !resp.status.is_success() { + return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache)); + } + + let cache_cfg = match ctx.route.as_ref().and_then(|r| r.cache.as_ref()) { + Some(c) => c, + None => { + return Ok(RespCacheable::Uncacheable(NoCacheReason::NeverEnabled)); + } + }; + + // Respect Cache-Control: no-store, private. 
+ if let Some(cc) = resp + .headers + .get("cache-control") + .and_then(|v| v.to_str().ok()) + { + let cc_lower = cc.to_ascii_lowercase(); + if cc_lower.contains("no-store") || cc_lower.contains("private") { + return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache)); + } + if let Some(ttl) = crate::cache::parse_cache_ttl(&cc_lower) { + if ttl == 0 { + return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache)); + } + let meta = CacheMeta::new( + SystemTime::now() + Duration::from_secs(ttl), + SystemTime::now(), + cache_cfg.stale_while_revalidate_secs, + 0, + resp.clone(), + ); + return Ok(RespCacheable::Cacheable(meta)); + } + } + + // No Cache-Control or no max-age: use route's default TTL. + let meta = CacheMeta::new( + SystemTime::now() + Duration::from_secs(cache_cfg.default_ttl_secs), + SystemTime::now(), + cache_cfg.stale_while_revalidate_secs, + 0, + resp.clone(), + ); + Ok(RespCacheable::Cacheable(meta)) + } + + async fn cache_hit_filter( + &self, + _session: &mut Session, + _meta: &CacheMeta, + _hit_handler: &mut HitHandler, + _is_fresh: bool, + _ctx: &mut RequestCtx, + ) -> Result<Option<ForcedFreshness>> + where + Self::CTX: Send + Sync, + { + metrics::CACHE_STATUS.with_label_values(&["hit"]).inc(); + Ok(None) + } + + fn cache_miss(&self, session: &mut Session, _ctx: &mut RequestCtx) { + metrics::CACHE_STATUS.with_label_values(&["miss"]).inc(); + session.cache.cache_miss(); + } + async fn upstream_peer( &self, session: &mut Session, @@ -661,6 +810,7 @@ impl ProxyHttp for SunbeamProxy { rewrites: vec![], body_rewrites: vec![], response_headers: vec![], + cache: None, }); return Ok(Box::new(HttpPeer::new( backend_addr(&pr.backend), @@ -1081,6 +1231,7 @@ mod tests { }], body_rewrites: vec![], response_headers: vec![], + cache: None, }]; let compiled = SunbeamProxy::compile_rewrites(&routes); assert_eq!(compiled.len(), 1); diff --git a/src/scanner/detector.rs b/src/scanner/detector.rs index 66a7def..8c79ea0 100644 --- a/src/scanner/detector.rs +++ 
b/src/scanner/detector.rs @@ -166,6 +166,7 @@ mod tests { rewrites: vec![], body_rewrites: vec![], response_headers: vec![], + cache: None, }]; ScannerDetector::new(&model, &routes) } diff --git a/tests/e2e.rs b/tests/e2e.rs index 039c8ea..0edb517 100644 --- a/tests/e2e.rs +++ b/tests/e2e.rs @@ -104,6 +104,7 @@ fn start_proxy_once(backend_port: u16) { rewrites: vec![], body_rewrites: vec![], response_headers: vec![], + cache: None, }]; let acme_routes: AcmeRoutes = Arc::new(RwLock::new(HashMap::new())); let compiled_rewrites = SunbeamProxy::compile_rewrites(&routes); diff --git a/tests/scanner_test.rs b/tests/scanner_test.rs index 311f2fd..086523e 100644 --- a/tests/scanner_test.rs +++ b/tests/scanner_test.rs @@ -18,6 +18,7 @@ fn test_routes() -> Vec<RouteConfig> { rewrites: vec![], body_rewrites: vec![], response_headers: vec![], + cache: None, }, RouteConfig { host_prefix: "api".into(), @@ -30,6 +31,7 @@ fn test_routes() -> Vec<RouteConfig> { rewrites: vec![], body_rewrites: vec![], response_headers: vec![], + cache: None, }, ] }