feat(cache): add pingora-cache integration with per-route config
Add in-memory HTTP response cache using pingora-cache MemCache backend. Cache runs after the detection pipeline so cache hits bypass upstream request modifications and body rewriting. Respects Cache-Control (no-store, private, s-maxage, max-age), skips caching for routes with body rewrites or auth subrequest headers, and supports configurable default TTL, stale-while-revalidate, and max file size per route. Signed-off-by: Sienna Meridian Satterwhite <sienna@sunbeam.pt>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3584,6 +3584,7 @@ dependencies = [
|
|||||||
"opentelemetry_sdk",
|
"opentelemetry_sdk",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"pingora",
|
"pingora",
|
||||||
|
"pingora-cache",
|
||||||
"pingora-core",
|
"pingora-core",
|
||||||
"pingora-http",
|
"pingora-http",
|
||||||
"pingora-proxy",
|
"pingora-proxy",
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ pingora = { version = "0.8", features = ["rustls"] }
|
|||||||
pingora-proxy = { version = "0.8", features = ["rustls"] }
|
pingora-proxy = { version = "0.8", features = ["rustls"] }
|
||||||
pingora-core = { version = "0.8", features = ["rustls"] }
|
pingora-core = { version = "0.8", features = ["rustls"] }
|
||||||
pingora-http = "0.8"
|
pingora-http = "0.8"
|
||||||
|
pingora-cache = { version = "0.8", features = ["rustls"] }
|
||||||
|
|
||||||
# HTTP header constants
|
# HTTP header constants
|
||||||
http = "1"
|
http = "1"
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ fn make_detector() -> ScannerDetector {
|
|||||||
rewrites: vec![],
|
rewrites: vec![],
|
||||||
body_rewrites: vec![],
|
body_rewrites: vec![],
|
||||||
response_headers: vec![],
|
response_headers: vec![],
|
||||||
|
cache: None,
|
||||||
},
|
},
|
||||||
RouteConfig {
|
RouteConfig {
|
||||||
host_prefix: "src".into(),
|
host_prefix: "src".into(),
|
||||||
@@ -62,6 +63,7 @@ fn make_detector() -> ScannerDetector {
|
|||||||
rewrites: vec![],
|
rewrites: vec![],
|
||||||
body_rewrites: vec![],
|
body_rewrites: vec![],
|
||||||
response_headers: vec![],
|
response_headers: vec![],
|
||||||
|
cache: None,
|
||||||
},
|
},
|
||||||
RouteConfig {
|
RouteConfig {
|
||||||
host_prefix: "docs".into(),
|
host_prefix: "docs".into(),
|
||||||
@@ -74,6 +76,7 @@ fn make_detector() -> ScannerDetector {
|
|||||||
rewrites: vec![],
|
rewrites: vec![],
|
||||||
body_rewrites: vec![],
|
body_rewrites: vec![],
|
||||||
response_headers: vec![],
|
response_headers: vec![],
|
||||||
|
cache: None,
|
||||||
},
|
},
|
||||||
];
|
];
|
||||||
|
|
||||||
|
|||||||
50
src/cache.rs
Normal file
50
src/cache.rs
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
use pingora_cache::MemCache;
|
||||||
|
use std::sync::LazyLock;
|
||||||
|
|
||||||
|
/// In-memory cache backend shared across all requests.
|
||||||
|
/// `Storage` requires `&'static self`, so we store it in a static.
|
||||||
|
pub static CACHE_BACKEND: LazyLock<MemCache> = LazyLock::new(MemCache::new);
|
||||||
|
|
||||||
|
/// Extract the freshness lifetime (in seconds) from a Cache-Control value.
///
/// `cc` must already be lowercased: directive names are matched literally
/// against `s-maxage=` and `max-age=`. When both directives carry a usable
/// value, `s-maxage` wins because it is the shared-cache directive.
///
/// The argument may be a bare token (`max-age=60`) or a quoted string
/// (`max-age="60"`); RFC 9111 asks recipients to accept both forms.
/// Lifetimes larger than 2^31 seconds, including all-digit values that do
/// not fit in a `u64`, are clamped to 2^31. This is the overflow rule of
/// RFC 9111 and keeps later `SystemTime` arithmetic from overflowing.
///
/// Returns `None` when neither directive is present with a numeric value.
pub fn parse_cache_ttl(cc: &str) -> Option<u64> {
    /// Largest lifetime ever reported (2^31 seconds, roughly 68 years).
    const MAX_TTL_SECS: u64 = 1 << 31;

    /// Parse one directive argument, tolerating surrounding quotes.
    fn parse_delta_seconds(raw: &str) -> Option<u64> {
        let raw = raw.trim();
        let digits = raw
            .strip_prefix('"')
            .and_then(|inner| inner.strip_suffix('"'))
            .unwrap_or(raw)
            .trim();
        match digits.parse::<u64>() {
            Ok(secs) => Some(secs.min(MAX_TTL_SECS)),
            // A non-empty, all-digit string can only fail by overflowing.
            Err(_) if !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()) => {
                Some(MAX_TTL_SECS)
            }
            Err(_) => None,
        }
    }

    let mut max_age = None;
    let mut s_maxage = None;
    for directive in cc.split(',').map(str::trim) {
        if let Some(value) = directive.strip_prefix("s-maxage=") {
            s_maxage = parse_delta_seconds(value);
        } else if let Some(value) = directive.strip_prefix("max-age=") {
            max_age = parse_delta_seconds(value);
        }
    }
    s_maxage.or(max_age)
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::parse_cache_ttl;

    /// A lone `max-age` directive is returned unchanged.
    #[test]
    fn parse_max_age_only() {
        let ttl = parse_cache_ttl("public, max-age=3600");
        assert_eq!(ttl, Some(3600));
    }

    /// `s-maxage` overrides `max-age` regardless of where it appears.
    #[test]
    fn parse_s_maxage_takes_priority() {
        let ttl = parse_cache_ttl("public, max-age=3600, s-maxage=60");
        assert_eq!(ttl, Some(60));
    }

    /// Without any age directive there is no lifetime to report.
    #[test]
    fn parse_no_age_directives() {
        let ttl = parse_cache_ttl("no-cache");
        assert_eq!(ttl, None);
    }

    /// A zero lifetime is reported as `Some(0)`, not as "absent".
    #[test]
    fn parse_zero_max_age() {
        let ttl = parse_cache_ttl("max-age=0");
        assert_eq!(ttl, Some(0));
    }
}
|
||||||
@@ -190,6 +190,25 @@ pub struct HeaderRule {
|
|||||||
pub value: String,
|
pub value: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Per-route HTTP response cache configuration.
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct CacheConfig {
|
||||||
|
#[serde(default = "default_cache_enabled")]
|
||||||
|
pub enabled: bool,
|
||||||
|
/// Default TTL in seconds when the upstream response has no Cache-Control header.
|
||||||
|
#[serde(default = "default_cache_ttl")]
|
||||||
|
pub default_ttl_secs: u64,
|
||||||
|
/// Seconds to serve stale content while revalidating in the background.
|
||||||
|
#[serde(default)]
|
||||||
|
pub stale_while_revalidate_secs: u32,
|
||||||
|
/// Max cacheable response body size in bytes (0 = no limit).
|
||||||
|
#[serde(default)]
|
||||||
|
pub max_file_size: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_cache_enabled() -> bool { true }
|
||||||
|
fn default_cache_ttl() -> u64 { 60 }
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Clone)]
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
pub struct RouteConfig {
|
pub struct RouteConfig {
|
||||||
pub host_prefix: String,
|
pub host_prefix: String,
|
||||||
@@ -220,6 +239,9 @@ pub struct RouteConfig {
|
|||||||
/// Extra response headers added to every response for this route.
|
/// Extra response headers added to every response for this route.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub response_headers: Vec<HeaderRule>,
|
pub response_headers: Vec<HeaderRule>,
|
||||||
|
/// HTTP response cache configuration for this route.
|
||||||
|
#[serde(default)]
|
||||||
|
pub cache: Option<CacheConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
|||||||
@@ -9,5 +9,6 @@ pub mod dual_stack;
|
|||||||
pub mod proxy;
|
pub mod proxy;
|
||||||
pub mod rate_limit;
|
pub mod rate_limit;
|
||||||
pub mod scanner;
|
pub mod scanner;
|
||||||
|
pub mod cache;
|
||||||
pub mod ssh;
|
pub mod ssh;
|
||||||
pub mod static_files;
|
pub mod static_files;
|
||||||
|
|||||||
@@ -72,6 +72,19 @@ pub static RATE_LIMIT_DECISIONS: LazyLock<IntCounterVec> = LazyLock::new(|| {
|
|||||||
c
|
c
|
||||||
});
|
});
|
||||||
|
|
||||||
|
pub static CACHE_STATUS: LazyLock<IntCounterVec> = LazyLock::new(|| {
|
||||||
|
let c = IntCounterVec::new(
|
||||||
|
Opts::new(
|
||||||
|
"sunbeam_cache_status_total",
|
||||||
|
"Cache hit/miss counts",
|
||||||
|
),
|
||||||
|
&["status"],
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
REGISTRY.register(Box::new(c.clone())).unwrap();
|
||||||
|
c
|
||||||
|
});
|
||||||
|
|
||||||
pub static ACTIVE_CONNECTIONS: LazyLock<Gauge> = LazyLock::new(|| {
|
pub static ACTIVE_CONNECTIONS: LazyLock<Gauge> = LazyLock::new(|| {
|
||||||
let g = Gauge::new(
|
let g = Gauge::new(
|
||||||
"sunbeam_active_connections",
|
"sunbeam_active_connections",
|
||||||
|
|||||||
155
src/proxy.rs
155
src/proxy.rs
@@ -12,6 +12,7 @@ use arc_swap::ArcSwap;
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use http::header::{CONNECTION, EXPECT, HOST, UPGRADE};
|
use http::header::{CONNECTION, EXPECT, HOST, UPGRADE};
|
||||||
|
use pingora_cache::{CacheKey, CacheMeta, ForcedFreshness, HitHandler, NoCacheReason, RespCacheable};
|
||||||
use pingora_core::{upstreams::peer::HttpPeer, Result};
|
use pingora_core::{upstreams::peer::HttpPeer, Result};
|
||||||
use pingora_http::{RequestHeader, ResponseHeader};
|
use pingora_http::{RequestHeader, ResponseHeader};
|
||||||
use pingora_proxy::{ProxyHttp, Session};
|
use pingora_proxy::{ProxyHttp, Session};
|
||||||
@@ -47,7 +48,7 @@ pub struct SunbeamProxy {
|
|||||||
pub struct RequestCtx {
|
pub struct RequestCtx {
|
||||||
pub route: Option<RouteConfig>,
|
pub route: Option<RouteConfig>,
|
||||||
pub start_time: Instant,
|
pub start_time: Instant,
|
||||||
/// Unique request identifier (UUID v4).
|
/// Unique request identifier (monotonic hex counter).
|
||||||
pub request_id: String,
|
pub request_id: String,
|
||||||
/// Tracing span for this request.
|
/// Tracing span for this request.
|
||||||
pub span: tracing::Span,
|
pub span: tracing::Span,
|
||||||
@@ -477,7 +478,7 @@ impl ProxyHttp for SunbeamProxy {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Store route early so downstream hooks can access it.
|
// Store route early so request_cache_filter can access it.
|
||||||
ctx.route = Some(route.clone());
|
ctx.route = Some(route.clone());
|
||||||
|
|
||||||
// ── Static file serving ──────────────────────────────────────────
|
// ── Static file serving ──────────────────────────────────────────
|
||||||
@@ -592,11 +593,13 @@ impl ProxyHttp for SunbeamProxy {
|
|||||||
|
|
||||||
// Prepare body rewrite rules if the route has them.
|
// Prepare body rewrite rules if the route has them.
|
||||||
if !route.body_rewrites.is_empty() {
|
if !route.body_rewrites.is_empty() {
|
||||||
|
// We'll check content-type in upstream_response_filter; store rules now.
|
||||||
ctx.body_rewrite_rules = route
|
ctx.body_rewrite_rules = route
|
||||||
.body_rewrites
|
.body_rewrites
|
||||||
.iter()
|
.iter()
|
||||||
.map(|br| (br.find.clone(), br.replace.clone()))
|
.map(|br| (br.find.clone(), br.replace.clone()))
|
||||||
.collect();
|
.collect();
|
||||||
|
// Store the content-type filter info on the route for later.
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle Expect: 100-continue before connecting to upstream.
|
// Handle Expect: 100-continue before connecting to upstream.
|
||||||
@@ -614,6 +617,152 @@ impl ProxyHttp for SunbeamProxy {
|
|||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ── Cache hooks ────────────────────────────────────────────────────
|
||||||
|
// Runs AFTER request_filter (detection pipeline) and BEFORE upstream.
|
||||||
|
// On cache hit, the response is served directly — no upstream request,
|
||||||
|
// no request modifications, no body rewriting.
|
||||||
|
|
||||||
|
fn request_cache_filter(
|
||||||
|
&self,
|
||||||
|
session: &mut Session,
|
||||||
|
ctx: &mut RequestCtx,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
Self::CTX: Send + Sync,
|
||||||
|
{
|
||||||
|
// Only cache GET/HEAD.
|
||||||
|
let method = &session.req_header().method;
|
||||||
|
if method != http::Method::GET && method != http::Method::HEAD {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let cache_cfg = match ctx.route.as_ref().and_then(|r| r.cache.as_ref()) {
|
||||||
|
Some(c) if c.enabled => c,
|
||||||
|
_ => return Ok(()),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Skip cache if body rewrites are active (need per-response rewriting).
|
||||||
|
if !ctx.body_rewrite_rules.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip cache if auth subrequest captured headers (per-user content).
|
||||||
|
if !ctx.auth_headers.is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
session.cache.enable(
|
||||||
|
&*crate::cache::CACHE_BACKEND,
|
||||||
|
None, // no eviction manager
|
||||||
|
None, // no predictor
|
||||||
|
None, // no cache lock
|
||||||
|
None, // no option overrides
|
||||||
|
);
|
||||||
|
|
||||||
|
if cache_cfg.max_file_size > 0 {
|
||||||
|
session
|
||||||
|
.cache
|
||||||
|
.set_max_file_size_bytes(cache_cfg.max_file_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cache_key_callback(
|
||||||
|
&self,
|
||||||
|
session: &Session,
|
||||||
|
_ctx: &mut RequestCtx,
|
||||||
|
) -> Result<CacheKey> {
|
||||||
|
let req = session.req_header();
|
||||||
|
let host = req
|
||||||
|
.headers
|
||||||
|
.get(HOST)
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
.unwrap_or("");
|
||||||
|
let path = req.uri.path();
|
||||||
|
let key = match req.uri.query() {
|
||||||
|
Some(q) => format!("{host}{path}?{q}"),
|
||||||
|
None => format!("{host}{path}"),
|
||||||
|
};
|
||||||
|
Ok(CacheKey::new("", key, ""))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn response_cache_filter(
|
||||||
|
&self,
|
||||||
|
_session: &Session,
|
||||||
|
resp: &ResponseHeader,
|
||||||
|
ctx: &mut RequestCtx,
|
||||||
|
) -> Result<RespCacheable> {
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
|
// Only cache 2xx responses.
|
||||||
|
if !resp.status.is_success() {
|
||||||
|
return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
|
||||||
|
}
|
||||||
|
|
||||||
|
let cache_cfg = match ctx.route.as_ref().and_then(|r| r.cache.as_ref()) {
|
||||||
|
Some(c) => c,
|
||||||
|
None => {
|
||||||
|
return Ok(RespCacheable::Uncacheable(NoCacheReason::NeverEnabled));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Respect Cache-Control: no-store, private.
|
||||||
|
if let Some(cc) = resp
|
||||||
|
.headers
|
||||||
|
.get("cache-control")
|
||||||
|
.and_then(|v| v.to_str().ok())
|
||||||
|
{
|
||||||
|
let cc_lower = cc.to_ascii_lowercase();
|
||||||
|
if cc_lower.contains("no-store") || cc_lower.contains("private") {
|
||||||
|
return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
|
||||||
|
}
|
||||||
|
if let Some(ttl) = crate::cache::parse_cache_ttl(&cc_lower) {
|
||||||
|
if ttl == 0 {
|
||||||
|
return Ok(RespCacheable::Uncacheable(NoCacheReason::OriginNotCache));
|
||||||
|
}
|
||||||
|
let meta = CacheMeta::new(
|
||||||
|
SystemTime::now() + Duration::from_secs(ttl),
|
||||||
|
SystemTime::now(),
|
||||||
|
cache_cfg.stale_while_revalidate_secs,
|
||||||
|
0,
|
||||||
|
resp.clone(),
|
||||||
|
);
|
||||||
|
return Ok(RespCacheable::Cacheable(meta));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// No Cache-Control or no max-age: use route's default TTL.
|
||||||
|
let meta = CacheMeta::new(
|
||||||
|
SystemTime::now() + Duration::from_secs(cache_cfg.default_ttl_secs),
|
||||||
|
SystemTime::now(),
|
||||||
|
cache_cfg.stale_while_revalidate_secs,
|
||||||
|
0,
|
||||||
|
resp.clone(),
|
||||||
|
);
|
||||||
|
Ok(RespCacheable::Cacheable(meta))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn cache_hit_filter(
|
||||||
|
&self,
|
||||||
|
_session: &mut Session,
|
||||||
|
_meta: &CacheMeta,
|
||||||
|
_hit_handler: &mut HitHandler,
|
||||||
|
_is_fresh: bool,
|
||||||
|
_ctx: &mut RequestCtx,
|
||||||
|
) -> Result<Option<ForcedFreshness>>
|
||||||
|
where
|
||||||
|
Self::CTX: Send + Sync,
|
||||||
|
{
|
||||||
|
metrics::CACHE_STATUS.with_label_values(&["hit"]).inc();
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Count a cache miss in the metrics, then call `cache_miss()` on the
/// session's cache.
fn cache_miss(&self, session: &mut Session, _ctx: &mut RequestCtx) {
    let misses = metrics::CACHE_STATUS.with_label_values(&["miss"]);
    misses.inc();
    session.cache.cache_miss();
}
|
||||||
|
|
||||||
async fn upstream_peer(
|
async fn upstream_peer(
|
||||||
&self,
|
&self,
|
||||||
session: &mut Session,
|
session: &mut Session,
|
||||||
@@ -661,6 +810,7 @@ impl ProxyHttp for SunbeamProxy {
|
|||||||
rewrites: vec![],
|
rewrites: vec![],
|
||||||
body_rewrites: vec![],
|
body_rewrites: vec![],
|
||||||
response_headers: vec![],
|
response_headers: vec![],
|
||||||
|
cache: None,
|
||||||
});
|
});
|
||||||
return Ok(Box::new(HttpPeer::new(
|
return Ok(Box::new(HttpPeer::new(
|
||||||
backend_addr(&pr.backend),
|
backend_addr(&pr.backend),
|
||||||
@@ -1081,6 +1231,7 @@ mod tests {
|
|||||||
}],
|
}],
|
||||||
body_rewrites: vec![],
|
body_rewrites: vec![],
|
||||||
response_headers: vec![],
|
response_headers: vec![],
|
||||||
|
cache: None,
|
||||||
}];
|
}];
|
||||||
let compiled = SunbeamProxy::compile_rewrites(&routes);
|
let compiled = SunbeamProxy::compile_rewrites(&routes);
|
||||||
assert_eq!(compiled.len(), 1);
|
assert_eq!(compiled.len(), 1);
|
||||||
|
|||||||
@@ -166,6 +166,7 @@ mod tests {
|
|||||||
rewrites: vec![],
|
rewrites: vec![],
|
||||||
body_rewrites: vec![],
|
body_rewrites: vec![],
|
||||||
response_headers: vec![],
|
response_headers: vec![],
|
||||||
|
cache: None,
|
||||||
}];
|
}];
|
||||||
ScannerDetector::new(&model, &routes)
|
ScannerDetector::new(&model, &routes)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -104,6 +104,7 @@ fn start_proxy_once(backend_port: u16) {
|
|||||||
rewrites: vec![],
|
rewrites: vec![],
|
||||||
body_rewrites: vec![],
|
body_rewrites: vec![],
|
||||||
response_headers: vec![],
|
response_headers: vec![],
|
||||||
|
cache: None,
|
||||||
}];
|
}];
|
||||||
let acme_routes: AcmeRoutes = Arc::new(RwLock::new(HashMap::new()));
|
let acme_routes: AcmeRoutes = Arc::new(RwLock::new(HashMap::new()));
|
||||||
let compiled_rewrites = SunbeamProxy::compile_rewrites(&routes);
|
let compiled_rewrites = SunbeamProxy::compile_rewrites(&routes);
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ fn test_routes() -> Vec<RouteConfig> {
|
|||||||
rewrites: vec![],
|
rewrites: vec![],
|
||||||
body_rewrites: vec![],
|
body_rewrites: vec![],
|
||||||
response_headers: vec![],
|
response_headers: vec![],
|
||||||
|
cache: None,
|
||||||
},
|
},
|
||||||
RouteConfig {
|
RouteConfig {
|
||||||
host_prefix: "api".into(),
|
host_prefix: "api".into(),
|
||||||
@@ -30,6 +31,7 @@ fn test_routes() -> Vec<RouteConfig> {
|
|||||||
rewrites: vec![],
|
rewrites: vec![],
|
||||||
body_rewrites: vec![],
|
body_rewrites: vec![],
|
||||||
response_headers: vec![],
|
response_headers: vec![],
|
||||||
|
cache: None,
|
||||||
},
|
},
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user