From a5810dd8a7400d5e0b20f6199c702823198a6aa7 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Tue, 10 Mar 2026 23:38:20 +0000 Subject: [PATCH] feat: configurable k8s resources, CSIC training pipeline, unified Dockerfile - Make K8s namespace, TLS secret, and config ConfigMap names configurable via [kubernetes] config section (previously hardcoded to "ingress") - Add CSIC 2010 dataset converter and auto-download for scanner training - Unify Dockerfile for local and production builds (remove cross-compile path) - Bake ML models directory into container image - Update CSIC dataset URL to self-hosted mirror (src.sunbeam.pt) - Fix rate_limit pipeline log missing fields - Consolidate docs/README.md into root README.md Signed-off-by: Sienna Meridian Satterwhite --- AGENTS.md | 3 +- Cargo.toml | 2 +- Dockerfile | 18 +- README.md | 386 ++++++++++++++++++++++++++++++++---- docs/README.md | 406 -------------------------------------- models/.gitkeep | 0 scripts/convert_csic.py | 5 +- src/acme.rs | 17 +- src/cert.rs | 23 ++- src/config.rs | 30 +++ src/ddos/detector.rs | 1 + src/ddos/features.rs | 10 + src/ddos/train.rs | 2 +- src/main.rs | 37 +++- src/proxy.rs | 6 +- src/scanner/allowlist.rs | 2 +- src/scanner/csic.rs | 411 +++++++++++++++++++++++++++++++++++++++ src/scanner/detector.rs | 5 +- src/scanner/features.rs | 1 + src/scanner/mod.rs | 1 + src/scanner/train.rs | 50 +++++ src/ssh.rs | 4 +- src/watcher.rs | 40 ++-- 23 files changed, 946 insertions(+), 514 deletions(-) delete mode 100644 docs/README.md create mode 100644 models/.gitkeep create mode 100644 src/scanner/csic.rs diff --git a/AGENTS.md b/AGENTS.md index 9e8fc26..c021378 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -41,7 +41,7 @@ sunbeam-proxy is a TLS-terminating reverse proxy built on [Pingora](https://gith - **SSH TCP passthrough**: raw TCP proxy for SSH traffic (port 22 to Gitea) - **HTTP-to-HTTPS redirect**: with per-route opt-out via `disable_secure_redirection` -See 
[docs/README.md](docs/README.md) for full feature documentation and configuration reference. +See [README.md](README.md) for full feature documentation and configuration reference. ## Source Files @@ -64,7 +64,6 @@ src/rate_limit/ — Leaky bucket rate limiter (limiter, key extraction) src/dual_stack.rs — Dual-stack (IPv4+IPv6) TCP listener tests/e2e.rs — end-to-end test: real SunbeamProxy over plain HTTP with echo backend tests/proptest.rs — property-based tests for static files, rewrites, config, metrics, etc. -docs/README.md — comprehensive feature documentation ``` ## Architecture Invariants — Do Not Break These diff --git a/Cargo.toml b/Cargo.toml index 573e76e..3443b8b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ bytes = "1" regex = "1" # Auth subrequests -reqwest = { version = "0.12", features = ["rustls-tls"], default-features = false } +reqwest = { version = "0.12", features = ["rustls-tls", "blocking"], default-features = false } # Rustls crypto provider — must be installed before any TLS init rustls = { version = "0.23", features = ["aws-lc-rs"] } diff --git a/Dockerfile b/Dockerfile index 2d28ff8..2ba450f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,17 +1,10 @@ # ── Stage 1: build ────────────────────────────────────────────── -# rust:slim tracks the latest stable Rust release. -# Multi-arch image; Docker buildx selects the native platform image. FROM rust:slim AS builder ARG TARGETARCH -# musl-tools: musl-gcc for static linking. -# curl: download tini init binary. -# No cmake/go needed: we use the rustls feature flag (pure Rust TLS). RUN apt-get update && apt-get install -y musl-tools curl cmake && rm -rf /var/lib/apt/lists/* -# Map Docker TARGETARCH to the appropriate Rust musl target, -# then configure Cargo to use musl-gcc as the linker for that target. 
RUN case "${TARGETARCH}" in \ "amd64") RUST_TARGET="x86_64-unknown-linux-musl" ;; \ "arm64") RUST_TARGET="aarch64-unknown-linux-musl" ;; \ @@ -26,24 +19,17 @@ RUN case "${TARGETARCH}" in \ ENV RUSTFLAGS="-C target-feature=+crt-static" WORKDIR /build -# Cache dependency compilation separately from source changes. -# RUSTFLAGS must match the real build or Cargo will recompile everything. COPY Cargo.toml Cargo.lock ./ RUN mkdir src && \ echo 'fn main() {}' > src/main.rs && \ cargo build --release --target "$(cat /rust-target)" ; \ rm -rf src -# Build the real binary. COPY src/ ./src/ RUN touch src/main.rs && \ cargo build --release --target "$(cat /rust-target)" && \ cp "target/$(cat /rust-target)/release/sunbeam-proxy" /sunbeam-proxy -# Download tini static init binary (musl, no glibc dependency). -# tini as PID 1 ensures the container stays alive when Pingora re-execs itself -# during a graceful upgrade: the new process is re-parented to tini, and tini -# correctly reaps the old process when it exits after draining connections. RUN case "${TARGETARCH}" in \ "amd64") TINI_ARCH="amd64" ;; \ "arm64") TINI_ARCH="arm64" ;; \ @@ -54,14 +40,12 @@ RUN case "${TARGETARCH}" in \ chmod +x /tini # ── Stage 2: distroless final ──────────────────────────────────── -# cgr.dev/chainguard/static is multi-arch (amd64 + arm64). -# No shell, no package manager — minimal attack surface. FROM cgr.dev/chainguard/static:latest COPY --from=builder /tini /tini COPY --from=builder /sunbeam-proxy /usr/local/bin/sunbeam-proxy +COPY models/ /models/ EXPOSE 80 443 -# tini as PID 1 so Pingora's graceful-upgrade re-exec doesn't kill the container. ENTRYPOINT ["/tini", "--", "/usr/local/bin/sunbeam-proxy"] diff --git a/README.md b/README.md index 1315a87..975e04b 100644 --- a/README.md +++ b/README.md @@ -2,33 +2,32 @@ A cloud-native reverse proxy with adaptive ML threat detection. Built on [Pingora](https://github.com/cloudflare/pingora) by [Sunbeam Studios](https://sunbeam.pt). 
-Sunbeam Proxy learns what normal traffic looks like *for your infrastructure* and automatically adapts its defenses. Instead of relying on generic rulesets written for someone else's problems, it trains on your own audit logs to build behavioral models that protect against the threats you actually face. +Sunbeam Proxy learns what normal traffic looks like *for your infrastructure* and adapts its defenses automatically. Instead of relying on generic rulesets written for someone else's problems, it trains on your own audit logs to build behavioral models that protect against the threats you actually face. -## Why It Exists +## Why it exists -We are a small, women-led queer game studio and we need to be able to handle extraordinary threats in today's internet. We have a small team and a small budget, so we need to be able to do more with less. We also need to be able to scale up quickly when we need to without having to worry about the security of our infrastructure. However, the problems faced in different regions, and with different bot nets, DDoS attacks, and other threats, make it difficult to find a scalable solution. +We're a small, women-led queer game studio and we need to handle extraordinary threats on today's internet. Small team, small budget, but the same DDoS attacks, vulnerability scanners, and bot nets that hit everyone else. Off-the-shelf solutions either cost too much or apply someone else's rules to our traffic. So we built a proxy that learns from what it sees and gets better at protecting us over time — and we figured others could use it too. ## What it does -**Adaptive threat detection** — Two ML models run in the request pipeline. A KNN-based DDoS detector classifies per-IP behavior over sliding windows. A logistic regression scanner detector catches vulnerability probes, directory enumeration, and bot traffic per-request. Both models are trained on your logs, hot-reloaded without downtime, and continuously improvable as your traffic evolves. 
+**Adaptive threat detection** — Two ML models run in the request pipeline. A KNN-based DDoS detector classifies per-IP behavior over sliding windows. A logistic regression scanner detector catches vulnerability probes, directory enumeration, and bot traffic per-request. Both models train on your logs, hot-reload without downtime, and improve continuously as your traffic evolves. -**Rate limiting** — Leaky bucket throttling with identity-aware keys (session cookies, bearer tokens, or IP fallback). Separate limits for authenticated and unauthenticated traffic. 256-shard concurrent map, zero contention. +**Rate limiting** — Leaky bucket throttling with identity-aware keys (session cookies, bearer tokens, or IP fallback). Separate limits for authenticated and unauthenticated traffic. -**HTTP response caching** — Per-route in-memory cache backed by pingora-cache. Respects `Cache-Control`, supports `stale-while-revalidate`, and sits after the security pipeline so blocked requests never touch the cache. +**HTTP response caching** — Per-route in-memory cache backed by pingora-cache. Respects `Cache-Control`, supports `stale-while-revalidate`, sits after the security pipeline so blocked requests never touch the cache. -**Static file serving** — Serve frontends directly from the proxy with try_files chains, SPA fallback, content-type detection, and cache headers. Replace nginx/caddy sidecar containers with a single config block. +**Static file serving** — Serve frontends directly from the proxy with try_files chains, SPA fallback, content-type detection, and cache headers. Replaces nginx/caddy sidecar containers with a single config block. 
-**Everything else you need from a reverse proxy** — TLS termination with cert hot-reload, host-prefix routing, path sub-routes with prefix stripping, regex URL rewrites, response body rewriting (like nginx `sub_filter`), auth subrequests, WebSocket forwarding, SSH TCP passthrough, HTTP-to-HTTPS redirect, ACME HTTP-01 challenge routing, and Prometheus metrics with request tracing. +**Everything else** — TLS termination with cert hot-reload, host-prefix routing, path sub-routes with prefix stripping, regex URL rewrites, response body rewriting (nginx `sub_filter`), auth subrequests, WebSocket forwarding, SSH TCP passthrough, HTTP-to-HTTPS redirect, ACME HTTP-01 challenge routing, Prometheus metrics, and per-request tracing. ## Quick start ```sh cargo build SUNBEAM_CONFIG=dev.toml RUST_LOG=info cargo run +cargo test ``` -See [docs/](docs/README.md) for full configuration reference. - ## The self-learning loop ``` @@ -54,22 +53,22 @@ See [docs/](docs/README.md) for full configuration reference. (no restart needed) ``` -Every request produces a structured audit log with 15+ behavioral features. Feed those logs back into the training pipeline and the models get better at distinguishing your real users from threats — automatically, without manual rule-writing. +Every request produces a structured audit log with 15+ behavioral features. Feed those logs back into the training pipeline and the models get better at telling your real users apart from threats — no manual rule-writing required. 
```sh # Train DDoS model from your audit logs -cargo run -- train --input logs.jsonl --output ddos_model.bin --heuristics heuristics.toml +cargo run -- train-ddos --input logs.jsonl --output ddos_model.bin --heuristics heuristics.toml -# Train scanner model -cargo run -- train-scanner --input logs.jsonl --output scanner_model.bin +# Train scanner model (--csic mixes in the CSIC 2010 dataset as a base) +cargo run -- train-scanner --input logs.jsonl --output scanner_model.bin --csic # Replay logs to evaluate model accuracy -cargo run -- replay --input logs.jsonl --model ddos_model.bin +cargo run -- replay-ddos --input logs.jsonl --model ddos_model.bin ``` ## Detection pipeline -Every HTTPS request passes through three detection layers before reaching your backend: +Every HTTPS request passes through three layers before reaching your backend: | Layer | Model | Granularity | Response | |-------|-------|-------------|----------| @@ -79,45 +78,360 @@ Every HTTPS request passes through three detection layers before reaching your b Verified bots (Googlebot, Bingbot, etc.) bypass scanner detection via reverse-DNS verification and configurable allowlists. -## Configuration +``` +Request + │ + ├── DDoS detection (KNN per-IP) + │ └── blocked → 429 + │ + ├── Scanner detection (logistic regression per-request) + │ └── blocked → 403 + │ + ├── Rate limiting (leaky bucket per-identity) + │ └── blocked → 429 + │ + ├── Cache lookup + │ └── hit → serve cached response + │ + └── Upstream request + ├── Auth subrequest (if configured) + ├── Response body rewriting (if configured) + └── Response to client +``` -Everything is TOML. Here's a route that serves a frontend statically with an API backend, response body rewriting, caching, and custom headers: +--- + +## Configuration reference + +All configuration is TOML, loaded from `$SUNBEAM_CONFIG` or `/etc/pingora/config.toml`. 
+ +### Listeners and TLS + +```toml +[listen] +http = "0.0.0.0:80" +https = "0.0.0.0:443" + +[tls] +cert_path = "/etc/ssl/tls.crt" +key_path = "/etc/ssl/tls.key" +``` + +### Telemetry + +```toml +[telemetry] +otlp_endpoint = "" # OpenTelemetry OTLP endpoint (empty = disabled) +metrics_port = 9090 # Prometheus scrape port (0 = disabled) +``` + +### Kubernetes + +Resource names and namespaces for the cert/config watchers and ACME Ingress routing. Override these if you've renamed the namespace, TLS Secret, or ConfigMap from the defaults. + +```toml +[kubernetes] +namespace = "ingress" # namespace for Secret, ConfigMap, and Ingress watches +tls_secret = "pingora-tls" # TLS Secret name (watched for cert hot-reload) +config_configmap = "pingora-config" # ConfigMap name (watched for config hot-reload) +``` + +All three fields default to the values shown above, so the section can be omitted entirely if you're using the standard naming. + +### Routes + +Each route maps a host prefix to a backend. `host_prefix = "docs"` matches requests to `docs.`. ```toml [[routes]] -host_prefix = "docs" -backend = "http://docs-backend:8080" -static_root = "/srv/docs" -fallback = "index.html" +host_prefix = "docs" +backend = "http://docs-backend.default.svc.cluster.local:8080" +websocket = false # forward WebSocket upgrade headers +disable_secure_redirection = false # true = allow plain HTTP +``` -[routes.cache] -enabled = true -default_ttl_secs = 300 +#### Path sub-routes +Path sub-routes use longest-prefix matching within a host, so you can mix static file serving with API proxying on the same domain. + +```toml +[[routes.paths]] +prefix = "/api" +backend = "http://api-backend:8000" +strip_prefix = true # /api/users → /users +websocket = false +``` + +#### Static file serving + +When a route has `static_root` set, the proxy tries to serve files from disk before forwarding to the upstream backend. Candidates are checked in order: + +1. `$static_root/$uri` — exact file +2. 
`$static_root/$uri.html` — with `.html` extension +3. `$static_root/$uri/index.html` — directory index +4. `$static_root/$fallback` — SPA fallback + +If nothing matches, the request goes to the backend as usual. + +```toml +[[routes]] +host_prefix = "meet" +backend = "http://meet-backend:8080" +static_root = "/srv/meet" +fallback = "index.html" +``` + +Content types are detected by file extension: + +| Extensions | Content-Type | +|-----------|-------------| +| `html`, `htm` | `text/html; charset=utf-8` | +| `css` | `text/css; charset=utf-8` | +| `js`, `mjs` | `application/javascript; charset=utf-8` | +| `json` | `application/json; charset=utf-8` | +| `svg` | `image/svg+xml` | +| `png`, `jpg`, `gif`, `webp`, `avif` | `image/*` | +| `woff`, `woff2`, `ttf`, `otf` | `font/*` | +| `wasm` | `application/wasm` | + +Cache-control headers are set automatically: + +| Extensions | Cache-Control | +|-----------|-------------| +| `js`, `css`, `woff2`, `wasm` | `public, max-age=31536000, immutable` | +| `png`, `jpg`, `svg`, `ico` | `public, max-age=86400` | +| Everything else | `no-cache` | + +Path sub-routes always take priority over static serving. Path traversal (`..`) is rejected. + +#### URL rewrites + +Regex patterns are compiled at startup and applied before static file lookup. First match wins. + +```toml [[routes.rewrites]] -pattern = "^/docs/[0-9a-f-]+/?$" +pattern = "^/docs/[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}/?$" target = "/docs/[id]/index.html" +``` +#### Response body rewriting + +Find/replace on response bodies, like nginx `sub_filter`. Only applies to `text/html`, `application/javascript`, and `text/javascript` responses — binary responses pass through untouched. + +The full response is buffered before substitution (fine for HTML/JS, typically under 1MB). `Content-Length` is removed since the body size may change. 
+ +```toml [[routes.body_rewrites]] find = "old-domain.example.com" -replace = "docs.sunbeam.pt" +replace = "new-domain.sunbeam.pt" +``` +#### Custom response headers + +```toml [[routes.response_headers]] name = "X-Frame-Options" value = "DENY" - -[[routes.paths]] -prefix = "/api" -backend = "http://docs-api:8000" -strip_prefix = true ``` +#### Auth subrequests + +Path routes can require an HTTP auth check before forwarding upstream, similar to nginx `auth_request`. + +```toml +[[routes.paths]] +prefix = "/media" +backend = "http://seaweedfs-filer:8333" +strip_prefix = true +auth_request = "http://drive-backend/api/v1.0/items/media-auth/" +auth_capture_headers = ["Authorization", "X-Amz-Date", "X-Amz-Content-Sha256"] +upstream_path_prefix = "/sunbeam-drive/" +``` + +The proxy sends a GET to `auth_request` with the original `Cookie`, `Authorization`, and `X-Original-URI` headers. + +| Auth response | Result | +|--------------|--------| +| 2xx | Capture specified headers, forward to backend | +| Non-2xx | 403 to client | +| Network error | 502 to client | + +#### HTTP response cache + +Per-route in-memory cache backed by pingora-cache. + +```toml +[routes.cache] +enabled = true +default_ttl_secs = 60 # TTL when upstream has no Cache-Control +stale_while_revalidate_secs = 0 # serve stale while revalidating +max_file_size = 0 # max cacheable body size (0 = unlimited) +``` + +The cache sits after the security pipeline (`Request → DDoS → Scanner → Rate Limit → Cache → Upstream`), so blocked requests never populate it. + +- Only caches GET and HEAD requests +- Respects `Cache-Control: no-store` and `Cache-Control: private` +- TTL priority: `s-maxage` > `max-age` > `default_ttl_secs` +- Skips routes with body rewrites (content varies per-response) +- Skips requests with auth subrequest headers (content varies per-user) +- Cache key: `{host}{path}?{query}` + +### SSH passthrough + +Raw TCP proxy for SSH traffic. 
+ +```toml +[ssh] +listen = "0.0.0.0:22" +backend = "gitea-ssh.devtools.svc.cluster.local:2222" +``` + +### DDoS detection + +KNN-based per-IP behavioral classification over sliding windows. 14-feature vectors cover request rate, path diversity, error rate, cookie/referer presence, and more. + +```toml +[ddos] +enabled = true +model_path = "ddos_model.bin" +k = 5 +threshold = 0.6 +window_secs = 60 +window_capacity = 1000 +min_events = 10 +``` + +### Scanner detection + +Logistic regression per-request classification with verified bot allowlist and model hot-reload. + +```toml +[scanner] +enabled = true +model_path = "scanner_model.bin" +threshold = 0.5 +poll_interval_secs = 30 # hot-reload check interval (0 = disabled) +bot_cache_ttl_secs = 86400 # verified bot IP cache TTL + +[[scanner.allowlist]] +ua_prefix = "Googlebot" +reason = "Google crawler" +dns_suffixes = ["googlebot.com", "google.com"] +cidrs = ["66.249.64.0/19"] +``` + +### Rate limiting + +Leaky bucket per-identity throttling. Identity is resolved as: session cookie > bearer token > client IP. + +```toml +[rate_limit] +enabled = true +eviction_interval_secs = 300 +stale_after_secs = 600 +bypass_cidrs = ["10.42.0.0/16"] + +[rate_limit.authenticated] +burst = 200 +rate = 50.0 + +[rate_limit.unauthenticated] +burst = 50 +rate = 10.0 +``` + +--- + ## Observability -- **Request IDs**: UUID v4 per request, forwarded via `X-Request-Id` to upstreams and clients -- **Prometheus metrics**: `GET /metrics` on configurable port — request totals, latency histograms, detection decisions, cache hit rates, active connections -- **Health checks**: `GET /health` returns 200 for k8s probes -- **Structured audit logs**: JSON with request ID, client IP, timing, headers, backend, detection decisions +### Request IDs + +Every request gets a UUID v4 request ID, attached to a `tracing::info_span!` so all log lines within the request inherit it. The ID is forwarded upstream and returned to clients via the `X-Request-Id` header. 
+ +### Prometheus metrics + +Served at `GET /metrics` on `metrics_port` (default 9090). `GET /health` returns 200 for k8s probes. + +| Metric | Type | Labels | +|--------|------|--------| +| `sunbeam_requests_total` | Counter | `method`, `host`, `status`, `backend` | +| `sunbeam_request_duration_seconds` | Histogram | — | +| `sunbeam_ddos_decisions_total` | Counter | `decision` | +| `sunbeam_scanner_decisions_total` | Counter | `decision`, `reason` | +| `sunbeam_rate_limit_decisions_total` | Counter | `decision` | +| `sunbeam_cache_status_total` | Counter | `status` | +| `sunbeam_active_connections` | Gauge | — | + +```yaml +# Prometheus scrape config +- job_name: sunbeam-proxy + static_configs: + - targets: ['sunbeam-proxy.ingress.svc.cluster.local:9090'] +``` + +### Audit logs + +Every request produces a structured JSON log line (`target = "audit"`): + +```json +{ + "request_id": "550e8400-e29b-41d4-a716-446655440000", + "method": "GET", + "host": "docs.sunbeam.pt", + "path": "/api/v1/pages", + "query": "limit=10", + "client_ip": "203.0.113.42", + "status": 200, + "duration_ms": 23, + "content_length": 0, + "user_agent": "Mozilla/5.0 ...", + "referer": "https://docs.sunbeam.pt/", + "accept_language": "en-US", + "accept": "text/html", + "has_cookies": true, + "cf_country": "FR", + "backend": "http://docs-backend:8080", + "error": null +} +``` + +### Detection pipeline logs + +Each security layer logs its decision before acting, so the training pipeline always sees the full traffic picture: + +``` +layer=ddos → all HTTPS traffic +layer=scanner → traffic that passed DDoS +layer=rate_limit → traffic that passed scanner +``` + +--- + +## CLI commands + +```sh +# Start the proxy server +sunbeam-proxy serve [--upgrade] + +# Train DDoS model from audit logs +sunbeam-proxy train-ddos --input logs.jsonl --output ddos_model.bin \ + [--attack-ips ips.txt] [--normal-ips ips.txt] \ + [--heuristics heuristics.toml] [--k 5] [--threshold 0.6] + +# Replay logs through the DDoS 
detection pipeline +sunbeam-proxy replay-ddos --input logs.jsonl --model ddos_model.bin \ + [--config config.toml] [--rate-limit] + +# Train scanner model +sunbeam-proxy train-scanner --input logs.jsonl --output scanner_model.bin \ + [--wordlists path/to/wordlists] [--threshold 0.5] + +# Train scanner model with CSIC 2010 base dataset (auto-downloaded, cached locally) +sunbeam-proxy train-scanner --input logs.jsonl --output scanner_model.bin --csic +``` + +--- ## Building diff --git a/docs/README.md b/docs/README.md deleted file mode 100644 index ea13b0e..0000000 --- a/docs/README.md +++ /dev/null @@ -1,406 +0,0 @@ ---- -layout: default -title: Sunbeam Proxy Documentation -description: Configuration reference and feature documentation for Sunbeam Proxy -toc: true ---- - -# Sunbeam Proxy Documentation - -Complete reference for configuring and operating Sunbeam Proxy — a TLS-terminating reverse proxy built on [Pingora](https://github.com/cloudflare/pingora) 0.8. - -## Quick Start - -```sh -# Local development -SUNBEAM_CONFIG=dev.toml RUST_LOG=info cargo run - -# Run tests -cargo nextest run - -# Build release (linux-musl for containers) -cargo build --release --target x86_64-unknown-linux-musl -``` - ---- - -## Configuration Reference - -Configuration is TOML, loaded from `$SUNBEAM_CONFIG` or `/etc/pingora/config.toml`. - -### Listeners & TLS - -```toml -[listen] -http = "0.0.0.0:80" -https = "0.0.0.0:443" - -[tls] -cert_path = "/etc/ssl/tls.crt" -key_path = "/etc/ssl/tls.key" -``` - -### Telemetry - -```toml -[telemetry] -otlp_endpoint = "" # OpenTelemetry OTLP endpoint (empty = disabled) -metrics_port = 9090 # Prometheus scrape port (0 = disabled) -``` - -### Routes - -Each route maps a host prefix to a backend. `host_prefix = "docs"` matches requests to `docs.`. 
- -```toml -[[routes]] -host_prefix = "docs" -backend = "http://docs-backend.default.svc.cluster.local:8080" -websocket = false # forward WebSocket upgrade headers -disable_secure_redirection = false # true = allow plain HTTP -``` - -#### Path Sub-Routes - -Longest-prefix match within a host. Mix static serving with API proxying. - -```toml -[[routes.paths]] -prefix = "/api" -backend = "http://api-backend:8000" -strip_prefix = true # /api/users → /users -websocket = false -``` - -#### Static File Serving - -Serve frontends directly from the proxy. The try_files chain checks candidates in order: - -1. `$static_root/$uri` — exact file -2. `$static_root/$uri.html` — with `.html` extension -3. `$static_root/$uri/index.html` — directory index -4. `$static_root/$fallback` — SPA fallback - -If nothing matches, the request falls through to the upstream backend. - -```toml -[[routes]] -host_prefix = "meet" -backend = "http://meet-backend:8080" -static_root = "/srv/meet" -fallback = "index.html" -``` - -**Content-type detection** is based on file extension: - -| Extensions | Content-Type | -|-----------|-------------| -| `html`, `htm` | `text/html; charset=utf-8` | -| `css` | `text/css; charset=utf-8` | -| `js`, `mjs` | `application/javascript; charset=utf-8` | -| `json` | `application/json; charset=utf-8` | -| `svg` | `image/svg+xml` | -| `png`, `jpg`, `gif`, `webp`, `avif` | `image/*` | -| `woff`, `woff2`, `ttf`, `otf` | `font/*` | -| `wasm` | `application/wasm` | - -**Cache-control headers** are set per extension type: - -| Extensions | Cache-Control | -|-----------|-------------| -| `js`, `css`, `woff2`, `wasm` | `public, max-age=31536000, immutable` | -| `png`, `jpg`, `svg`, `ico` | `public, max-age=86400` | -| Everything else | `no-cache` | - -Path sub-routes take priority over static serving — if `/api` matches a path route, it goes to that backend even if a static file exists. - -Path traversal (`..`) is rejected and falls through to the upstream. 
- -#### URL Rewrites - -Regex patterns compiled at startup, applied before static file lookup. First match wins. - -```toml -[[routes.rewrites]] -pattern = "^/docs/[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}/?$" -target = "/docs/[id]/index.html" -``` - -#### Response Body Rewriting - -Find/replace in response bodies, like nginx `sub_filter`. Only applies to `text/html`, `application/javascript`, and `text/javascript` responses. Binary responses pass through untouched. - -The entire response is buffered in memory before substitution (fine for HTML/JS — typically <1MB). `Content-Length` is removed since the body size may change. - -```toml -[[routes.body_rewrites]] -find = "old-domain.example.com" -replace = "new-domain.sunbeam.pt" -``` - -#### Custom Response Headers - -```toml -[[routes.response_headers]] -name = "X-Frame-Options" -value = "DENY" -``` - -#### Auth Subrequests - -Gate path routes with an HTTP auth check before forwarding upstream. Similar to nginx `auth_request`. - -```toml -[[routes.paths]] -prefix = "/media" -backend = "http://seaweedfs-filer:8333" -strip_prefix = true -auth_request = "http://drive-backend/api/v1.0/items/media-auth/" -auth_capture_headers = ["Authorization", "X-Amz-Date", "X-Amz-Content-Sha256"] -upstream_path_prefix = "/sunbeam-drive/" -``` - -The auth subrequest sends a GET to `auth_request` with the original `Cookie`, `Authorization`, and `X-Original-URI` headers. - -| Auth response | Proxy behavior | -|--------------|----------------| -| 2xx | Capture specified headers, forward to backend | -| Non-2xx | Return 403 to client | -| Network error | Return 502 to client | - -#### HTTP Response Cache - -Per-route in-memory cache backed by pingora-cache. 
- -```toml -[routes.cache] -enabled = true -default_ttl_secs = 60 # TTL when upstream has no Cache-Control -stale_while_revalidate_secs = 0 # serve stale while revalidating -max_file_size = 0 # max cacheable body size (0 = unlimited) -``` - -**Pipeline position**: Cache runs after the security pipeline and before upstream modifications. - -``` -Request → DDoS → Scanner → Rate Limit → Cache → Upstream -``` - -Cache behavior: -- Only caches GET and HEAD requests -- Respects `Cache-Control: no-store` and `Cache-Control: private` -- TTL priority: `s-maxage` > `max-age` > `default_ttl_secs` -- Skips routes with body rewrites (content varies) -- Skips requests with auth subrequest headers (per-user content) -- Cache key: `{host}{path}?{query}` - -### SSH Passthrough - -Raw TCP proxy for SSH traffic. - -```toml -[ssh] -listen = "0.0.0.0:22" -backend = "gitea-ssh.devtools.svc.cluster.local:2222" -``` - -### DDoS Detection - -KNN-based per-IP behavioral classification over sliding windows. - -```toml -[ddos] -enabled = true -model_path = "ddos_model.bin" -k = 5 -threshold = 0.6 -window_secs = 60 -window_capacity = 1000 -min_events = 10 -``` - -### Scanner Detection - -Logistic regression per-request classification with verified bot allowlist. - -```toml -[scanner] -enabled = true -model_path = "scanner_model.bin" -threshold = 0.5 -poll_interval_secs = 30 # hot-reload check interval (0 = disabled) -bot_cache_ttl_secs = 86400 # verified bot IP cache TTL - -[[scanner.allowlist]] -ua_prefix = "Googlebot" -reason = "Google crawler" -dns_suffixes = ["googlebot.com", "google.com"] -cidrs = ["66.249.64.0/19"] -``` - -### Rate Limiting - -Leaky bucket per-identity throttling. Identity resolution: `ory_kratos_session` cookie > Bearer token > client IP. 
- -```toml -[rate_limit] -enabled = true -eviction_interval_secs = 300 -stale_after_secs = 600 -bypass_cidrs = ["10.42.0.0/16"] - -[rate_limit.authenticated] -burst = 200 -rate = 50.0 - -[rate_limit.unauthenticated] -burst = 50 -rate = 10.0 -``` - ---- - -## Observability - -### Request IDs - -Every request gets a UUID v4 request ID. It's: -- Attached to a `tracing::info_span!` so all log lines within the request inherit it -- Forwarded upstream via `X-Request-Id` -- Returned to clients via `X-Request-Id` -- Included in audit log lines - -### Prometheus Metrics - -Served at `GET /metrics` on `metrics_port` (default 9090). - -| Metric | Type | Labels | -|--------|------|--------| -| `sunbeam_requests_total` | Counter | `method`, `host`, `status`, `backend` | -| `sunbeam_request_duration_seconds` | Histogram | — | -| `sunbeam_ddos_decisions_total` | Counter | `decision` | -| `sunbeam_scanner_decisions_total` | Counter | `decision`, `reason` | -| `sunbeam_rate_limit_decisions_total` | Counter | `decision` | -| `sunbeam_cache_status_total` | Counter | `status` | -| `sunbeam_active_connections` | Gauge | — | - -`GET /health` returns 200 for k8s probes. 
- -```yaml -# Prometheus scrape config -- job_name: sunbeam-proxy - static_configs: - - targets: ['sunbeam-proxy.ingress.svc.cluster.local:9090'] -``` - -### Audit Logs - -Every request produces a structured JSON log line (`target = "audit"`): - -```json -{ - "request_id": "550e8400-e29b-41d4-a716-446655440000", - "method": "GET", - "host": "docs.sunbeam.pt", - "path": "/api/v1/pages", - "query": "limit=10", - "client_ip": "203.0.113.42", - "status": 200, - "duration_ms": 23, - "content_length": 0, - "user_agent": "Mozilla/5.0 ...", - "referer": "https://docs.sunbeam.pt/", - "accept_language": "en-US", - "accept": "text/html", - "has_cookies": true, - "cf_country": "FR", - "backend": "http://docs-backend:8080", - "error": null -} -``` - -### Detection Pipeline Logs - -Each security layer emits a `target = "pipeline"` log line before acting: - -``` -layer=ddos → all HTTPS traffic (scanner training data) -layer=scanner → traffic that passed DDoS (rate-limit training data) -layer=rate_limit → traffic that passed scanner -``` - -This guarantees training pipelines always see the full traffic picture. 
- ---- - -## CLI Commands - -```sh -# Start the proxy server -sunbeam-proxy serve [--upgrade] - -# Train DDoS model from audit logs -sunbeam-proxy train --input logs.jsonl --output ddos_model.bin \ - [--attack-ips ips.txt] [--normal-ips ips.txt] \ - [--heuristics heuristics.toml] [--k 5] [--threshold 0.6] - -# Replay logs through detection pipeline -sunbeam-proxy replay --input logs.jsonl --model ddos_model.bin \ - [--config config.toml] [--rate-limit] - -# Train scanner model -sunbeam-proxy train-scanner --input logs.jsonl --output scanner_model.bin \ - [--wordlists path/to/wordlists] [--threshold 0.5] -``` - ---- - -## Architecture - -### Source Files - -``` -src/main.rs — server bootstrap, watcher spawn, SSH spawn -src/lib.rs — library crate root -src/config.rs — TOML config deserialization -src/proxy.rs — ProxyHttp impl: routing, filtering, caching, logging -src/acme.rs — Ingress watcher for ACME HTTP-01 challenges -src/watcher.rs — Secret/ConfigMap watcher for cert + config hot-reload -src/cert.rs — K8s Secret → cert files on disk -src/telemetry.rs — JSON logging + OTEL tracing init -src/ssh.rs — TCP proxy for SSH passthrough -src/metrics.rs — Prometheus metrics and scrape endpoint -src/static_files.rs — Static file serving with try_files chain -src/cache.rs — pingora-cache MemCache backend -src/ddos/ — KNN-based DDoS detection -src/scanner/ — Logistic regression scanner detection -src/rate_limit/ — Leaky bucket rate limiter -src/dual_stack.rs — Dual-stack (IPv4+IPv6) TCP listener -``` - -### Runtime Model - -Pingora manages its own async runtime. K8s watchers (cert/config, Ingress) each run on separate OS threads with their own tokio runtimes. This isolation is deliberate — Pingora's internal runtime has specific constraints that don't mix with general-purpose async work. 
- -### Security Pipeline - -``` -Request - │ - ├── DDoS detection (KNN per-IP) - │ └── blocked → 429 - │ - ├── Scanner detection (logistic regression per-request) - │ └── blocked → 403 - │ - ├── Rate limiting (leaky bucket per-identity) - │ └── blocked → 429 - │ - ├── Cache lookup - │ └── hit → serve cached response - │ - └── Upstream request - ├── Auth subrequest (if configured) - ├── Response body rewriting (if configured) - └── Response to client -``` diff --git a/models/.gitkeep b/models/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/scripts/convert_csic.py b/scripts/convert_csic.py index a3dcd8f..278d7a3 100755 --- a/scripts/convert_csic.py +++ b/scripts/convert_csic.py @@ -7,7 +7,7 @@ Label is determined by which file it came from (normal vs anomalous). Usage: # Download the dataset first: - git clone https://github.com/msudol/Web-Application-Attack-Datasets.git /tmp/csic + git clone https://src.sunbeam.pt/studio/csic-dataset.git /tmp/csic # Convert all three files: python3 scripts/convert_csic.py \ @@ -20,8 +20,9 @@ Usage: # Merge with production logs: cat logs.jsonl csic_converted.jsonl > combined.jsonl - # Train: + # Train (or just use --csic flag which does this automatically): cargo run -- train-scanner --input combined.jsonl --output scanner_model.bin + # Simpler: cargo run -- train-scanner --input logs.jsonl --output scanner_model.bin --csic """ import argparse diff --git a/src/acme.rs b/src/acme.rs index 3e46bfd..afe39c5 100644 --- a/src/acme.rs +++ b/src/acme.rs @@ -6,7 +6,7 @@ use std::{collections::HashMap, sync::{Arc, RwLock}}; /// Maps a challenge path to the backend address that can answer it. /// /// Key: `/.well-known/acme-challenge/` -/// Value: `cm-acme-http-solver-.ingress.svc.cluster.local:8089` +/// Value: `cm-acme-http-solver-..svc.cluster.local:8089` /// /// cert-manager creates one Ingress per challenge domain with exactly this /// path and backend. 
Our proxy consults this table to route each challenge @@ -18,15 +18,15 @@ use std::{collections::HashMap, sync::{Arc, RwLock}}; /// can be written from the watcher runtime without cross-runtime waker issues. pub type AcmeRoutes = Arc>>; -/// Watch Ingress objects in the ingress namespace and maintain `routes`. +/// Watch Ingress objects and maintain `routes`. /// /// cert-manager creates an Ingress for each HTTP-01 challenge it manages. /// The Ingress contains a path rule for `/.well-known/acme-challenge/` /// pointing to a per-challenge solver Service. We populate the route table /// from these rules so the proxy can forward each challenge token to the /// correct solver pod without the nondeterminism of a shared stable Service. -pub async fn watch_ingresses(client: Client, routes: AcmeRoutes) { - let api: Api = Api::namespaced(client, "ingress"); +pub async fn watch_ingresses(client: Client, namespace: String, routes: AcmeRoutes) { + let api: Api = Api::namespaced(client, &namespace); // Verify Ingress API access before entering the watch loop. A failure here // almost always means cert-manager is not installed or RBAC is wrong. @@ -43,12 +43,9 @@ pub async fn watch_ingresses(client: Client, routes: AcmeRoutes) { while let Some(result) = stream.next().await { match result { - // InitApply fires for each Ingress during the initial list (kube v3+). - // Apply fires for subsequent creates/updates. - // Both must be handled to catch Ingresses that existed before the proxy started. 
Ok(watcher::Event::InitApply(ing)) | Ok(watcher::Event::Apply(ing)) => { let mut map = routes.write().unwrap_or_else(|e| e.into_inner()); - upsert_routes(&ing, &mut map); + upsert_routes(&ing, &namespace, &mut map); } Ok(watcher::Event::Delete(ing)) => { let mut map = routes.write().unwrap_or_else(|e| e.into_inner()); @@ -63,7 +60,7 @@ pub async fn watch_ingresses(client: Client, routes: AcmeRoutes) { } } -fn upsert_routes(ingress: &Ingress, map: &mut HashMap) { +fn upsert_routes(ingress: &Ingress, namespace: &str, map: &mut HashMap) { let Some(spec) = &ingress.spec else { return }; for rule in spec.rules.as_deref().unwrap_or(&[]) { let Some(http) = &rule.http else { continue }; @@ -75,7 +72,7 @@ fn upsert_routes(ingress: &Ingress, map: &mut HashMap) { let Some(svc) = p.backend.service.as_ref() else { continue }; let Some(port) = svc.port.as_ref().and_then(|p| p.number) else { continue }; let backend = format!( - "{}.ingress.svc.cluster.local:{port}", + "{}.{namespace}.svc.cluster.local:{port}", svc.name ); tracing::debug!(path, %backend, "added ACME challenge route"); diff --git a/src/cert.rs b/src/cert.rs index 95f328b..7e212e6 100644 --- a/src/cert.rs +++ b/src/cert.rs @@ -2,18 +2,23 @@ use anyhow::{Context, Result}; use k8s_openapi::api::core::v1::Secret; use kube::{Api, Client}; -/// Fetch the `pingora-tls` Secret from the ingress namespace and write -/// `tls.crt` / `tls.key` to the paths declared in config.toml. +/// Fetch the TLS Secret and write `tls.crt` / `tls.key` to the configured paths. /// /// Called at startup (non-upgrade) so the proxy never depends on kubelet /// volume-sync timing: the cert files are written directly from the K8s API /// before `svc.add_tls()` is called. 
-pub async fn fetch_and_write(client: &Client, cert_path: &str, key_path: &str) -> Result<()> { - let api: Api = Api::namespaced(client.clone(), "ingress"); +pub async fn fetch_and_write( + client: &Client, + namespace: &str, + secret_name: &str, + cert_path: &str, + key_path: &str, +) -> Result<()> { + let api: Api = Api::namespaced(client.clone(), namespace); let secret = api - .get("pingora-tls") + .get(secret_name) .await - .context("fetching pingora-tls Secret from K8s API")?; + .with_context(|| format!("fetching {secret_name} Secret from K8s API"))?; write_from_secret(&secret, cert_path, key_path) } @@ -27,14 +32,14 @@ pub fn write_from_secret(secret: &Secret, cert_path: &str, key_path: &str) -> Re let data = secret .data .as_ref() - .ok_or_else(|| anyhow::anyhow!("pingora-tls Secret has no data"))?; + .ok_or_else(|| anyhow::anyhow!("TLS Secret has no data"))?; let crt = data .get("tls.crt") - .ok_or_else(|| anyhow::anyhow!("pingora-tls missing tls.crt"))?; + .ok_or_else(|| anyhow::anyhow!("TLS Secret missing tls.crt"))?; let key = data .get("tls.key") - .ok_or_else(|| anyhow::anyhow!("pingora-tls missing tls.key"))?; + .ok_or_else(|| anyhow::anyhow!("TLS Secret missing tls.key"))?; // /etc/tls is an emptyDir; create it if the pod just started. if let Some(parent) = std::path::Path::new(cert_path).parent() { diff --git a/src/config.rs b/src/config.rs index 9dfef78..0498721 100644 --- a/src/config.rs +++ b/src/config.rs @@ -24,8 +24,38 @@ pub struct Config { pub rate_limit: Option, /// Optional per-request scanner detection. pub scanner: Option, + /// Kubernetes resource names and namespaces for watchers. + #[serde(default)] + pub kubernetes: KubernetesConfig, } +#[derive(Debug, Deserialize, Clone)] +pub struct KubernetesConfig { + /// Namespace where the proxy's resources live (Secret, ConfigMap, Ingresses). + #[serde(default = "default_k8s_namespace")] + pub namespace: String, + /// Name of the TLS Secret watched for cert hot-reload. 
+ #[serde(default = "default_tls_secret")] + pub tls_secret: String, + /// Name of the ConfigMap watched for config hot-reload. + #[serde(default = "default_config_configmap")] + pub config_configmap: String, +} + +impl Default for KubernetesConfig { + fn default() -> Self { + Self { + namespace: default_k8s_namespace(), + tls_secret: default_tls_secret(), + config_configmap: default_config_configmap(), + } + } +} + +fn default_k8s_namespace() -> String { "ingress".to_string() } +fn default_tls_secret() -> String { "pingora-tls".to_string() } +fn default_config_configmap() -> String { "pingora-config".to_string() } + #[derive(Debug, Deserialize, Clone)] pub struct DDoSConfig { pub model_path: String, diff --git a/src/ddos/detector.rs b/src/ddos/detector.rs index eedeba4..711893e 100644 --- a/src/ddos/detector.rs +++ b/src/ddos/detector.rs @@ -39,6 +39,7 @@ impl DDoSDetector { /// Record an incoming request and classify the IP. /// Called from request_filter (before upstream). + #[allow(clippy::too_many_arguments)] pub fn check( &self, ip: IpAddr, diff --git a/src/ddos/features.rs b/src/ddos/features.rs index 60a2fc1..ecff1bd 100644 --- a/src/ddos/features.rs +++ b/src/ddos/features.rs @@ -70,6 +70,10 @@ impl IpState { self.events.len() } + pub fn is_empty(&self) -> bool { + self.events.is_empty() + } + /// Prune events older than `window` from the logical view. /// Returns a slice of active events (not necessarily contiguous in ring buffer, /// so we collect into a Vec). 
@@ -274,6 +278,12 @@ pub struct LogIpState { pub suspicious_paths: Vec, } +impl Default for LogIpState { + fn default() -> Self { + Self::new() + } +} + impl LogIpState { pub fn new() -> Self { Self { diff --git a/src/ddos/train.rs b/src/ddos/train.rs index 99d6860..d9ea323 100644 --- a/src/ddos/train.rs +++ b/src/ddos/train.rs @@ -113,7 +113,7 @@ pub fn run(args: TrainArgs) -> Result<()> { let ip = audit_log::strip_port(&entry.fields.client_ip).to_string(); let ts = parse_timestamp(&entry.timestamp); - let state = ip_states.entry(ip).or_insert_with(LogIpState::new); + let state = ip_states.entry(ip).or_default(); state.timestamps.push(ts); state.methods.push(method_to_u8(&entry.fields.method)); state.path_hashes.push(fx_hash(&entry.fields.path)); diff --git a/src/main.rs b/src/main.rs index 9deeb63..4b5497f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,7 +33,7 @@ enum Commands { upgrade: bool, }, /// Replay audit logs through the DDoS detector and rate limiter - Replay { + ReplayDdos { /// Path to audit log JSONL file #[arg(short, long)] input: String, @@ -60,7 +60,7 @@ enum Commands { rate_limit: bool, }, /// Train a DDoS detection model from audit logs - Train { + TrainDdos { /// Path to audit log JSONL file #[arg(short, long)] input: String, @@ -103,6 +103,9 @@ enum Commands { /// Classification threshold #[arg(long, default_value = "0.5")] threshold: f64, + /// Include CSIC 2010 dataset as base training data (downloaded from GitHub, cached locally) + #[arg(long)] + csic: bool, }, } @@ -110,7 +113,7 @@ fn main() -> Result<()> { let cli = Cli::parse(); match cli.command.unwrap_or(Commands::Serve { upgrade: false }) { Commands::Serve { upgrade } => run_serve(upgrade), - Commands::Replay { + Commands::ReplayDdos { input, model, config, @@ -129,7 +132,7 @@ fn main() -> Result<()> { min_events, rate_limit, }), - Commands::Train { + Commands::TrainDdos { input, output, attack_ips, @@ -155,11 +158,13 @@ fn main() -> Result<()> { output, wordlists, threshold, + 
csic, } => scanner::train::run(scanner::train::TrainScannerArgs { input, output, wordlists, threshold, + csic, }), } } @@ -309,7 +314,13 @@ fn run_serve(upgrade: bool) -> Result<()> { Ok(c) => { if !upgrade { if let Err(e) = - cert::fetch_and_write(&c, &cfg.tls.cert_path, &cfg.tls.key_path).await + cert::fetch_and_write( + &c, + &cfg.kubernetes.namespace, + &cfg.kubernetes.tls_secret, + &cfg.tls.cert_path, + &cfg.tls.key_path, + ).await { tracing::warn!(error = %e, "cert fetch from K8s failed; using existing files"); } @@ -405,6 +416,7 @@ fn run_serve(upgrade: bool) -> Result<()> { // 6. Background K8s watchers on their own OS thread + tokio runtime. if k8s_available { + let k8s_cfg = cfg.kubernetes.clone(); let cert_path = cfg.tls.cert_path.clone(); let key_path = cfg.tls.key_path.clone(); std::thread::spawn(move || { @@ -421,8 +433,19 @@ fn run_serve(upgrade: bool) -> Result<()> { } }; tokio::join!( - acme::watch_ingresses(client.clone(), acme_routes), - watcher::run_watcher(client, cert_path, key_path), + acme::watch_ingresses( + client.clone(), + k8s_cfg.namespace.clone(), + acme_routes, + ), + watcher::run_watcher( + client, + k8s_cfg.namespace, + k8s_cfg.tls_secret, + k8s_cfg.config_configmap, + cert_path, + key_path, + ), ); }); }); diff --git a/src/proxy.rs b/src/proxy.rs index 2a4bf02..45360bb 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -410,7 +410,7 @@ impl ProxyHttp for SunbeamProxy { ); metrics::SCANNER_DECISIONS - .with_label_values(&[decision, &reason]) + .with_label_values(&[decision, reason]) .inc(); if decision == "block" { @@ -447,7 +447,11 @@ impl ProxyHttp for SunbeamProxy { path = %session.req_header().uri.path(), client_ip = %ip, user_agent = session.req_header().headers.get("user-agent").and_then(|v| v.to_str().ok()).unwrap_or("-"), + content_length = session.req_header().headers.get("content-length").and_then(|v| v.to_str().ok()).unwrap_or("0"), has_cookies = cookie.is_some(), + has_referer = 
session.req_header().headers.get("referer").is_some(),
+            has_accept_language = session.req_header().headers.get("accept-language").is_some(),
+            accept = session.req_header().headers.get("accept").and_then(|v| v.to_str().ok()).unwrap_or("-"),
             "pipeline"
         );
 
diff --git a/src/scanner/allowlist.rs b/src/scanner/allowlist.rs
index 964698b..f493700 100644
--- a/src/scanner/allowlist.rs
+++ b/src/scanner/allowlist.rs
@@ -214,7 +214,7 @@ fn verify_dns(ip: IpAddr, suffixes: &[String]) -> bool {
 
     // Step 3: forward DNS — the hostname must resolve back to our IP.
     match dns_lookup::lookup_host(&hostname) {
-        Ok(addrs) => addrs.iter().any(|a| *a == ip),
+        Ok(addrs) => addrs.contains(&ip),
         Err(_) => false,
     }
 }
diff --git a/src/scanner/csic.rs b/src/scanner/csic.rs
new file mode 100644
index 0000000..314a7fb
--- /dev/null
+++ b/src/scanner/csic.rs
@@ -0,0 +1,411 @@
+//! Fetch and convert the CSIC 2010 HTTP dataset into labeled training samples.
+//!
+//! The CSIC 2010 dataset contains raw HTTP/1.1 requests (normal + anomalous)
+//! from a web application. When `--csic` is passed to `train-scanner`, this
+//! module downloads the dataset from its mirror, parses the raw HTTP requests,
+//! and converts them into `AuditFields` entries with ground-truth labels.
+
+use crate::ddos::audit_log::AuditFields;
+use anyhow::{Context, Result};
+use std::path::PathBuf;
+
+const REPO_BASE: &str =
+    "https://src.sunbeam.pt/studio/csic-dataset/raw/branch/mainline";
+
+const FILES: &[(&str, &str)] = &[
+    ("normalTrafficTraining.txt", "normal"),
+    ("normalTrafficTest.txt", "normal"),
+    ("anomalousTrafficTest.txt", "anomalous"),
+];
+
+const DEFAULT_HOSTS: &[&str] = &[
+    "admin", "src", "docs", "auth", "drive", "grafana", "people", "meet", "s3", "livekit",
+];
+
+fn cache_dir() -> PathBuf {
+    let base = std::env::var("XDG_CACHE_HOME")
+        .map(PathBuf::from)
+        .unwrap_or_else(|_| {
+            let home = std::env::var("HOME").unwrap_or_else(|_| "/tmp".to_string());
+            PathBuf::from(home).join(".cache")
+        });
+    base.join("sunbeam").join("csic")
+}
+
+fn download_or_cached(filename: &str) -> Result<String> {
+    let dir = cache_dir();
+    let path = dir.join(filename);
+
+    if path.exists() {
+        eprintln!(" cached: {}", path.display());
+        return std::fs::read_to_string(&path)
+            .with_context(|| format!("reading cached {}", path.display()));
+    }
+
+    let url = format!("{REPO_BASE}/{filename}");
+    eprintln!(" downloading: {url}");
+    let body = reqwest::blocking::get(&url)
+        .with_context(|| format!("fetching {url}"))?
+        .error_for_status()
+        .with_context(|| format!("HTTP error for {url}"))?
+        .text()
+        .with_context(|| format!("reading body of {url}"))?;
+
+    std::fs::create_dir_all(&dir)?;
+    std::fs::write(&path, &body)?;
+    Ok(body)
+}
+
+struct ParsedRequest {
+    method: String,
+    path: String,
+    query: String,
+    user_agent: String,
+    has_cookies: bool,
+    content_length: u64,
+    referer: String,
+    accept_language: String,
+}
+
+fn parse_csic_content(content: &str) -> Vec<ParsedRequest> {
+    let mut requests = Vec::new();
+    let mut current_lines: Vec<&str> = Vec::new();
+
+    for line in content.lines() {
+        if line.is_empty() && !current_lines.is_empty() {
+            if let Some(req) = parse_single_request(&current_lines) {
+                requests.push(req);
+            }
+            current_lines.clear();
+        } else {
+            current_lines.push(line);
+        }
+    }
+    if !current_lines.is_empty() {
+        if let Some(req) = parse_single_request(&current_lines) {
+            requests.push(req);
+        }
+    }
+    requests
+}
+
+fn parse_single_request(lines: &[&str]) -> Option<ParsedRequest> {
+    if lines.is_empty() {
+        return None;
+    }
+
+    let parts: Vec<&str> = lines[0].splitn(3, ' ').collect();
+    if parts.len() < 2 {
+        return None;
+    }
+    let method = parts[0].to_string();
+    let raw_url = parts[1];
+
+    // Extract path and query — URL may be absolute (http://localhost:8080/path?q=1)
+    let (path, query) = if let Some(rest) = raw_url.strip_prefix("http://") {
+        // Skip host portion
+        let after_host = rest.find('/').map(|i| &rest[i..]).unwrap_or("/");
+        split_path_query(after_host)
+    } else if let Some(rest) = raw_url.strip_prefix("https://") {
+        let after_host = rest.find('/').map(|i| &rest[i..]).unwrap_or("/");
+        split_path_query(after_host)
+    } else {
+        split_path_query(raw_url)
+    };
+
+    // Parse headers
+    let mut headers: Vec<(&str, &str)> = Vec::new();
+    let mut body_start = None;
+    for (i, line) in lines[1..].iter().enumerate() {
+        if line.is_empty() {
+            body_start = Some(i + 2); // +2 because we started from lines[1..]
+            break;
+        }
+        if let Some(colon) = line.find(':') {
+            let key = line[..colon].trim();
+            let value = line[colon + 1..].trim();
+            headers.push((key, value));
+        }
+    }
+
+    let get_header = |name: &str| -> Option<&str> {
+        headers
+            .iter()
+            .find(|(k, _)| k.eq_ignore_ascii_case(name))
+            .map(|(_, v)| *v)
+    };
+
+    let body_len = if let Some(start) = body_start {
+        if start < lines.len() {
+            lines[start..].iter().map(|l| l.len()).sum::<usize>() as u64
+        } else {
+            0
+        }
+    } else {
+        0
+    };
+
+    let content_length = get_header("Content-Length")
+        .and_then(|v| v.parse().ok())
+        .unwrap_or(body_len);
+
+    Some(ParsedRequest {
+        method,
+        path,
+        query,
+        user_agent: get_header("User-Agent").unwrap_or("-").to_string(),
+        has_cookies: get_header("Cookie").is_some(),
+        content_length,
+        referer: get_header("Referer").unwrap_or("-").to_string(),
+        accept_language: get_header("Accept-Language").unwrap_or("-").to_string(),
+    })
+}
+
+fn split_path_query(url: &str) -> (String, String) {
+    if let Some(q) = url.find('?') {
+        (url[..q].to_string(), url[q + 1..].to_string())
+    } else {
+        (url.to_string(), String::new())
+    }
+}
+
+/// Simple deterministic LCG for reproducible randomness without pulling in `rand`.
+struct Rng(u64); + +impl Rng { + fn new(seed: u64) -> Self { + Self(seed) + } + fn next_u64(&mut self) -> u64 { + self.0 = self.0.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407); + self.0 + } + fn next_usize(&mut self, bound: usize) -> usize { + (self.next_u64() >> 33) as usize % bound + } + fn next_f64(&mut self) -> f64 { + (self.next_u64() >> 11) as f64 / (1u64 << 53) as f64 + } + fn choice<'a>(&mut self, items: &'a [&str]) -> &'a str { + items[self.next_usize(items.len())] + } +} + +fn to_audit_fields( + req: &ParsedRequest, + label: &str, + hosts: &[&str], + rng: &mut Rng, +) -> AuditFields { + let (host_prefix, status) = if label == "normal" { + let host = rng.choice(hosts).to_string(); + let statuses: &[u16] = &[200, 200, 200, 200, 301, 304]; + let status = statuses[rng.next_usize(statuses.len())]; + (host, status) + } else { + let host = if rng.next_f64() < 0.7 { + let unknown: &[&str] = &["unknown", "scanner", "probe", "test"]; + rng.choice(unknown).to_string() + } else { + rng.choice(hosts).to_string() + }; + let statuses: &[u16] = &[404, 404, 404, 400, 403, 500]; + let status = statuses[rng.next_usize(statuses.len())]; + (host, status) + }; + + let host = format!("{host_prefix}.sunbeam.pt"); + + // For anomalous samples, simulate real scanner behavior: + // strip cookies/referer/accept-language that CSIC attacks have from their session. 
+ let (has_cookies, referer, accept_language, user_agent) = if label != "normal" { + let referer = None; + let accept_language = if rng.next_f64() < 0.8 { + None + } else { + Some(req.accept_language.clone()).filter(|a| a != "-") + }; + let r = rng.next_f64(); + let user_agent = if r < 0.15 { + String::new() + } else if r < 0.25 { + "curl/7.68.0".to_string() + } else if r < 0.35 { + "python-requests/2.28.0".to_string() + } else if r < 0.40 { + "Go-http-client/1.1".to_string() + } else { + req.user_agent.clone() + }; + (false, referer, accept_language, user_agent) + } else { + ( + req.has_cookies, + Some(req.referer.clone()).filter(|r| r != "-"), + Some(req.accept_language.clone()).filter(|a| a != "-"), + req.user_agent.clone(), + ) + }; + + AuditFields { + method: req.method.clone(), + host, + path: req.path.clone(), + query: req.query.clone(), + client_ip: format!( + "{}.{}.{}.{}", + rng.next_usize(223) + 1, + rng.next_usize(256), + rng.next_usize(256), + rng.next_usize(254) + 1, + ), + status, + duration_ms: rng.next_usize(50) as u64 + 1, + content_length: req.content_length, + user_agent, + has_cookies: Some(has_cookies), + referer, + accept_language, + backend: if label == "normal" { + format!("{host_prefix}-svc:8080") + } else { + "-".to_string() + }, + label: Some( + if label == "normal" { "normal" } else { "attack" }.to_string(), + ), + } +} + +/// Download (or use cached) CSIC 2010 dataset, parse raw HTTP requests, +/// and convert into labeled `AuditFields` entries ready for scanner training. 
+pub fn fetch_csic_dataset() -> Result<Vec<(AuditFields, String)>> {
+    eprintln!("fetching CSIC 2010 dataset...");
+
+    let mut rng = Rng::new(42);
+    let mut all_entries: Vec<(AuditFields, String)> = Vec::new();
+
+    for (filename, label) in FILES {
+        let content = download_or_cached(filename)?;
+        let requests = parse_csic_content(&content);
+        eprintln!(" parsed {} {label} requests from {filename}", requests.len());
+
+        for req in &requests {
+            let fields = to_audit_fields(req, label, DEFAULT_HOSTS, &mut rng);
+            let host_prefix = fields.host.split('.').next().unwrap_or("").to_string();
+            all_entries.push((fields, host_prefix));
+        }
+    }
+
+    // Shuffle to interleave normal/attack
+    let n = all_entries.len();
+    for i in (1..n).rev() {
+        let j = rng.next_usize(i + 1);
+        all_entries.swap(i, j);
+    }
+
+    eprintln!(
+        "CSIC total: {} ({} normal, {} attack)",
+        all_entries.len(),
+        all_entries.iter().filter(|(f, _)| f.label.as_deref() == Some("normal")).count(),
+        all_entries.iter().filter(|(f, _)| f.label.as_deref() == Some("attack")).count(),
+    );
+
+    Ok(all_entries)
+}
+
+/// Check if cached CSIC files exist.
+pub fn csic_is_cached() -> bool {
+    let dir = cache_dir();
+    FILES.iter().all(|(f, _)| dir.join(f).exists())
+}
+
+/// Return the cache directory path for display.
+pub fn csic_cache_path() -> PathBuf { + cache_dir() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_single_http_request() { + let lines = vec![ + "GET /index.html HTTP/1.1", + "Host: localhost:8080", + "User-Agent: Mozilla/5.0", + "Cookie: session=abc", + "Accept: text/html", + ]; + let req = parse_single_request(&lines).unwrap(); + assert_eq!(req.method, "GET"); + assert_eq!(req.path, "/index.html"); + assert!(req.has_cookies); + assert_eq!(req.user_agent, "Mozilla/5.0"); + } + + #[test] + fn test_parse_absolute_url() { + let lines = vec!["POST http://localhost:8080/tienda1/miembros/editar.jsp?id=2 HTTP/1.1"]; + let req = parse_single_request(&lines).unwrap(); + assert_eq!(req.method, "POST"); + assert_eq!(req.path, "/tienda1/miembros/editar.jsp"); + assert_eq!(req.query, "id=2"); + } + + #[test] + fn test_parse_csic_content_multiple_requests() { + let content = "GET /page1 HTTP/1.1\nHost: localhost\n\nPOST /page2 HTTP/1.1\nHost: localhost\n\n"; + let reqs = parse_csic_content(content); + assert_eq!(reqs.len(), 2); + assert_eq!(reqs[0].method, "GET"); + assert_eq!(reqs[1].method, "POST"); + } + + #[test] + fn test_to_audit_fields_normal() { + let req = ParsedRequest { + method: "GET".to_string(), + path: "/index.html".to_string(), + query: String::new(), + user_agent: "Mozilla/5.0".to_string(), + has_cookies: true, + content_length: 100, + referer: "https://example.com".to_string(), + accept_language: "en-US".to_string(), + }; + let mut rng = Rng::new(42); + let fields = to_audit_fields(&req, "normal", DEFAULT_HOSTS, &mut rng); + assert_eq!(fields.label.as_deref(), Some("normal")); + assert!(fields.has_cookies.unwrap_or(false)); + assert!(fields.host.ends_with(".sunbeam.pt")); + } + + #[test] + fn test_to_audit_fields_anomalous_strips_cookies() { + let req = ParsedRequest { + method: "GET".to_string(), + path: "/.env".to_string(), + query: String::new(), + user_agent: "Mozilla/5.0".to_string(), + has_cookies: true, + content_length: 0, 
+ referer: "https://example.com".to_string(), + accept_language: "en-US".to_string(), + }; + let mut rng = Rng::new(42); + let fields = to_audit_fields(&req, "anomalous", DEFAULT_HOSTS, &mut rng); + assert_eq!(fields.label.as_deref(), Some("attack")); + assert!(!fields.has_cookies.unwrap_or(true)); + } + + #[test] + fn test_rng_deterministic() { + let mut a = Rng::new(42); + let mut b = Rng::new(42); + for _ in 0..100 { + assert_eq!(a.next_u64(), b.next_u64()); + } + } +} diff --git a/src/scanner/detector.rs b/src/scanner/detector.rs index 8c79ea0..63251f2 100644 --- a/src/scanner/detector.rs +++ b/src/scanner/detector.rs @@ -50,6 +50,7 @@ impl ScannerDetector { /// Returns a verdict with the action, raw score, and reason. /// The score and reason are captured in pipeline logs so the training /// pipeline always has unfiltered data to retrain from. + #[allow(clippy::too_many_arguments)] pub fn check( &self, method: &str, @@ -107,8 +108,8 @@ impl ScannerDetector { // 3. Compute score = bias + dot(weights, features) + interaction terms let mut score = self.weights[NUM_SCANNER_FEATURES + 2]; // bias (index 14) - for i in 0..NUM_SCANNER_FEATURES { - score += self.weights[i] * f[i]; + for (i, &fi) in f.iter().enumerate().take(NUM_SCANNER_FEATURES) { + score += self.weights[i] * fi; } // Interaction: suspicious_path AND no_cookies score += self.weights[12] * f[0] * (1.0 - f[3]); diff --git a/src/scanner/features.rs b/src/scanner/features.rs index 46bab8c..3a7b528 100644 --- a/src/scanner/features.rs +++ b/src/scanner/features.rs @@ -14,6 +14,7 @@ const TRAVERSAL_PATTERNS: &[&str] = &["..", "%00", "%0a", "%27", "%3c"]; /// Extract all 12 scanner features from a single request. /// No heap allocation — all work done on references and stack buffers. 
+#[allow(clippy::too_many_arguments)] pub fn extract_features( method: &str, path: &str, diff --git a/src/scanner/mod.rs b/src/scanner/mod.rs index 4008f4f..bfe6192 100644 --- a/src/scanner/mod.rs +++ b/src/scanner/mod.rs @@ -1,4 +1,5 @@ pub mod allowlist; +pub mod csic; pub mod detector; pub mod features; pub mod model; diff --git a/src/scanner/train.rs b/src/scanner/train.rs index 25c9a98..96e4d24 100644 --- a/src/scanner/train.rs +++ b/src/scanner/train.rs @@ -14,6 +14,7 @@ pub struct TrainScannerArgs { pub output: String, pub wordlists: Option, pub threshold: f64, + pub csic: bool, } /// Default suspicious fragments — matches the DDoS feature list plus extras. @@ -135,6 +136,54 @@ pub fn run(args: TrainScannerArgs) -> Result<()> { } } + // 1b. Optionally fetch CSIC 2010 dataset and add labeled entries + if args.csic { + let csic_entries = crate::scanner::csic::fetch_csic_dataset()?; + for (_, host_prefix) in &csic_entries { + log_hosts.insert(fx_hash_bytes(host_prefix.as_bytes())); + } + for (fields, host_prefix) in &csic_entries { + let has_cookies = fields.has_cookies.unwrap_or(false); + let has_referer = fields + .referer + .as_ref() + .map(|r| r != "-" && !r.is_empty()) + .unwrap_or(false); + let has_accept_language = fields + .accept_language + .as_ref() + .map(|a| a != "-" && !a.is_empty()) + .unwrap_or(false); + + let feats = features::extract_features( + &fields.method, + &fields.path, + host_prefix, + has_cookies, + has_referer, + has_accept_language, + "-", + &fields.user_agent, + fields.content_length, + &fragment_hashes, + &extension_hashes, + &log_hosts, + ); + + // CSIC entries always have a ground-truth label. 
+ let label = match fields.label.as_deref() { + Some("attack" | "anomalous") => 1.0, + Some("normal") => 0.0, + _ => continue, + }; + + samples.push(LabeledSample { + features: feats, + label, + }); + } + } + let log_sample_count = samples.len(); let log_attack_count = samples.iter().filter(|s| s.label > 0.5).count(); eprintln!( @@ -408,6 +457,7 @@ fn train_logistic_regression( weights } +#[allow(clippy::too_many_arguments)] fn label_request( path: &str, has_cookies: bool, diff --git a/src/ssh.rs b/src/ssh.rs index a7c7fd7..b23b594 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -11,12 +11,12 @@ pub async fn run_tcp_proxy(listen: &str, backend: &str) { let ipv6_addr = if listen.starts_with('[') { listen.to_string() } else { - format!("[::]:{}", listen.split(':').last().unwrap_or("22")) + format!("[::]:{}", listen.split(':').next_back().unwrap_or("22")) }; let ipv4_addr = if listen.contains(':') { // Extract port from the original address - let port = listen.split(':').last().unwrap_or("22"); + let port = listen.split(':').next_back().unwrap_or("22"); format!("0.0.0.0:{}", port) } else { "0.0.0.0:22".to_string() diff --git a/src/watcher.rs b/src/watcher.rs index 719ac74..3f66fed 100644 --- a/src/watcher.rs +++ b/src/watcher.rs @@ -3,7 +3,7 @@ use k8s_openapi::api::core::v1::{ConfigMap, Secret}; use kube::{runtime::watcher, Api, Client}; use tokio::sync::mpsc; -/// Watch `pingora-tls` and `pingora-config` in the ingress namespace. +/// Watch the TLS Secret and config ConfigMap for changes. /// /// On cert change: write new cert bytes from the Apply event directly to the /// configured paths (avoiding kubelet volume-sync delay), then trigger a @@ -15,14 +15,21 @@ use tokio::sync::mpsc; /// /// No-ops when no K8s client is available (e.g. ad-hoc local runs outside a /// cluster) so the binary works in both environments. 
-pub async fn run_watcher(client: Client, cert_path: String, key_path: String) { +pub async fn run_watcher( + client: Client, + namespace: String, + tls_secret: String, + config_configmap: String, + cert_path: String, + key_path: String, +) { let (tx, mut rx) = mpsc::channel::<()>(2); - let secret_api: Api = Api::namespaced(client.clone(), "ingress"); - let cm_api: Api = Api::namespaced(client.clone(), "ingress"); + let secret_api: Api = Api::namespaced(client.clone(), &namespace); + let cm_api: Api = Api::namespaced(client.clone(), &namespace); - tokio::spawn(watch_secret(secret_api, cert_path, key_path, tx.clone())); - tokio::spawn(watch_configmap(cm_api, tx)); + tokio::spawn(watch_secret(secret_api, tls_secret, cert_path, key_path, tx.clone())); + tokio::spawn(watch_configmap(cm_api, config_configmap, tx)); if rx.recv().await.is_some() { tracing::info!("initiating graceful upgrade"); @@ -32,11 +39,13 @@ pub async fn run_watcher(client: Client, cert_path: String, key_path: String) { async fn watch_secret( api: Api, + secret_name: String, cert_path: String, key_path: String, tx: mpsc::Sender<()>, ) { - let cfg = watcher::Config::default().fields("metadata.name=pingora-tls"); + let field_selector = format!("metadata.name={secret_name}"); + let cfg = watcher::Config::default().fields(&field_selector); let mut stream = Box::pin(watcher(api, cfg)); let mut initialized = false; @@ -44,14 +53,10 @@ async fn watch_secret( match result { Ok(watcher::Event::InitDone) => { initialized = true; - tracing::debug!("pingora-tls watcher ready"); + tracing::debug!(%secret_name, "TLS secret watcher ready"); } - // Write the new cert directly from the event object before triggering the - // upgrade. The Apply event carries the full updated Secret, so we don't - // need a separate API call and the cert files are ready before the new - // process's svc.add_tls() runs. 
Ok(watcher::Event::Apply(secret)) if initialized => { - tracing::info!("pingora-tls changed — writing new cert"); + tracing::info!(%secret_name, "TLS secret changed — writing new cert"); match crate::cert::write_from_secret(&secret, &cert_path, &key_path) { Ok(()) => { let _ = tx.send(()).await; @@ -69,8 +74,9 @@ async fn watch_secret( } } -async fn watch_configmap(api: Api, tx: mpsc::Sender<()>) { - let cfg = watcher::Config::default().fields("metadata.name=pingora-config"); +async fn watch_configmap(api: Api, configmap_name: String, tx: mpsc::Sender<()>) { + let field_selector = format!("metadata.name={configmap_name}"); + let cfg = watcher::Config::default().fields(&field_selector); let mut stream = Box::pin(watcher(api, cfg)); let mut initialized = false; @@ -78,10 +84,10 @@ async fn watch_configmap(api: Api, tx: mpsc::Sender<()>) { match result { Ok(watcher::Event::InitDone) => { initialized = true; - tracing::debug!("pingora-config watcher ready"); + tracing::debug!(%configmap_name, "config watcher ready"); } Ok(watcher::Event::Apply(_)) if initialized => { - tracing::info!("pingora-config changed — triggering upgrade"); + tracing::info!(%configmap_name, "config changed — triggering upgrade"); let _ = tx.send(()).await; return; }