feat: add native dual-stack IPv4/IPv6 support

This commit implements comprehensive dual-stack support for the proxy,
allowing it to listen on both IPv4 and IPv6 addresses simultaneously.

Key changes:
- Added new dual_stack.rs module with DualStackTcpListener implementation
- Updated SSH module to use dual-stack listener
- Updated configuration documentation to reflect IPv6 support
- Added comprehensive tests for dual-stack functionality

The implementation is inspired by tokio_dual_stack but implemented
natively without external dependencies. It provides fair connection
distribution between IPv4 and IPv6 clients while maintaining full
backward compatibility with existing IPv4-only configurations.

Signed-off-by: Sienna Meridian Satterwhite <sienna@sunbeam.pt>
Date: 2026-03-10 23:38:19 +00:00
Parent: 41cf6ccc49
Commit: e16299068f
9 changed files with 396 additions and 4 deletions

.gitignore (vendored) — 1 line changed

@@ -1,3 +1,4 @@
.fastembed_cache/
/target/
certs/
*.pem

AGENTS.md (new file) — 112 lines added

@@ -0,0 +1,112 @@
# AGENTS.md — sunbeam-proxy
## Critical Rules
**Read before you write.** Read every file you intend to modify. Do not guess at code structure, function signatures, or types. This is a ~500-line Rust codebase — read the actual source.
**Minimal changes only.** Do exactly what is asked. Do not:
- Add features, abstractions, or "improvements" beyond the request
- Refactor surrounding code, rename variables, or restructure modules
- Add comments, docstrings, or type annotations to code you didn't change
- Add error handling or validation for scenarios that cannot happen
- Create helper functions or utilities for one-off operations
- Add backwards-compatibility shims, re-exports, or `// removed` comments
- Introduce new dependencies without being explicitly asked
**Do not create new files** unless the task absolutely requires it. Prefer editing existing files.
**Do not over-engineer.** Three similar lines of code is better than a premature abstraction. If a fix is one line, submit one line.
**Ask before acting** on anything destructive or irreversible: deleting files, force-pushing, modifying CI, running commands with side effects.
## Project Overview
sunbeam-proxy is a TLS-terminating reverse proxy built on [Pingora](https://github.com/cloudflare/pingora) 0.8 (Cloudflare's proxy framework) with rustls. It runs in Kubernetes and handles:
- **Host-prefix routing**: routes `foo.example.com` by matching prefix `foo` against the config
- **Path sub-routes**: longest-prefix match within a host, with optional prefix stripping
- **ACME HTTP-01 challenges**: routes `/.well-known/acme-challenge/*` to cert-manager solver pods
- **TLS cert hot-reload**: watches K8s Secrets, writes cert files, triggers zero-downtime upgrade
- **Config hot-reload**: watches K8s ConfigMaps, triggers graceful upgrade on change
- **SSH TCP passthrough**: raw TCP proxy for SSH traffic (port 22 to Gitea)
- **HTTP-to-HTTPS redirect**: with per-route opt-out via `disable_secure_redirection`
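The longest-prefix path matching described above can be sketched in isolation. This is a hypothetical simplification with made-up names (`match_route`, the `(prefix, strip)` tuples), not the actual `proxy.rs` code:

```rust
// Hypothetical sketch of longest-prefix sub-route matching with optional
// prefix stripping; illustrative only, not the real proxy.rs implementation.
fn match_route(path: &str, routes: &[(&str, bool)]) -> Option<(usize, String)> {
    // Pick the route whose prefix is the longest among all prefixes of `path`.
    let mut best: Option<usize> = None;
    for (i, (prefix, _)) in routes.iter().enumerate() {
        if path.starts_with(prefix) && best.map_or(true, |b| prefix.len() > routes[b].0.len()) {
            best = Some(i);
        }
    }
    let i = best?;
    let (prefix, strip) = routes[i];
    // Optionally strip the matched prefix before forwarding upstream,
    // falling back to "/" when nothing remains.
    let upstream = if strip {
        let rest = &path[prefix.len()..];
        if rest.is_empty() { "/".to_string() } else { rest.to_string() }
    } else {
        path.to_string()
    };
    Some((i, upstream))
}

fn main() {
    let routes = [("/api/v2", true), ("/api", false), ("/", false)];
    // "/api/v2" is the longest matching prefix and it strips.
    let (idx, upstream) = match_route("/api/v2/users", &routes).unwrap();
    assert_eq!((idx, upstream.as_str()), (0, "/users"));
    // Without stripping, the original path is forwarded unchanged.
    let (idx, upstream) = match_route("/api/users", &routes).unwrap();
    assert_eq!((idx, upstream.as_str()), (1, "/api/users"));
    println!("ok");
}
```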
## Source Files
```
src/main.rs — binary entry point: server bootstrap, watcher spawn, SSH spawn
src/lib.rs — library crate root: re-exports acme, config, proxy, ssh
src/config.rs — TOML config deserialization (Config, RouteConfig, PathRoute)
src/proxy.rs — ProxyHttp impl: request_filter, upstream_peer, upstream_request_filter, logging
src/acme.rs — Ingress watcher: maintains AcmeRoutes (path → solver backend)
src/watcher.rs — Secret/ConfigMap watcher: cert write + graceful upgrade trigger
src/cert.rs — fetch_and_write / write_from_secret: K8s Secret → cert files on disk
src/telemetry.rs — JSON logging + optional OTEL tracing init
src/ssh.rs — TCP proxy: tokio TcpListener + copy_bidirectional
tests/e2e.rs — end-to-end test: real SunbeamProxy over plain HTTP with echo backend
```
## Architecture Invariants — Do Not Break These
1. **Separate OS threads for K8s watchers.** The cert/config watcher and Ingress watcher run on their own `std::thread` with their own `tokio::runtime`. Pingora has its own internal runtime. Never share a tokio runtime between Pingora and the watchers.
2. **Fresh K8s Client per runtime.** Each runtime creates its own `kube::Client`. Tower workers are tied to the runtime that created them. Do not pass a Client across runtime boundaries.
3. **`std::sync::RwLock` for AcmeRoutes, not `tokio::sync`.** The RwLock guard is held across code paths in Pingora's async proxy calls. A tokio `RwLock` guard would be `Send`, but cross-runtime waker issues make `std::sync::RwLock` the correct choice here.
4. **`insert_header()` not `headers.insert()` on Pingora `RequestHeader`.** Pingora maintains a CaseMap alongside `base.headers`. Using `headers.insert()` directly causes the header to be silently dropped during `header_to_h1_wire` serialization. Always use `insert_header()` or `remove_header()`.
5. **rustls crypto provider must be installed first.** `rustls::crypto::aws_lc_rs::default_provider().install_default()` must run before any TLS initialization. This is in `main()` line 19.
6. **Cert watcher writes certs from the Apply event payload.** It does NOT re-fetch via the API. The `watcher::Event::Apply(secret)` carries the full Secret object; `cert::write_from_secret` writes directly from it, then triggers the upgrade.
7. **Graceful upgrade = spawn new process with `--upgrade` + SIGQUIT self.** Pingora transfers listening socket FDs via Unix socket. The new process inherits them. Do not change this flow.
## Build & Test Commands
```sh
# Build (debug)
cargo build
# Build (release, with cross-compile for linux-musl if needed)
cargo build --release --target x86_64-unknown-linux-musl
# Run all tests (unit + e2e)
cargo test
# Run unit tests only (no e2e, which needs port 18889)
cargo test --lib
# Check without building
cargo check
# Lint
cargo clippy -- -D warnings
# Format check
cargo fmt -- --check
```
**Always run `cargo check` after making changes.** If it doesn't compile, fix it before proceeding. Do not submit code that doesn't compile.
**Run `cargo test` after any behavioral change.** The e2e test in `tests/e2e.rs` spins up a real proxy and echo backend — it catches real regressions.
**Run `cargo clippy -- -D warnings` before finishing.** Fix all warnings. Do not add `#[allow(...)]` attributes to suppress warnings unless there is a genuine false positive.
## Rust & Pingora Conventions in This Codebase
- Error handling: `anyhow::Result` for fallible startup code, `pingora_core::Result` inside `ProxyHttp` trait methods. Map between them with `pingora_core::Error::because()`.
- Logging: `tracing::info!`, `tracing::warn!`, `tracing::error!` with structured fields. The `logging()` method on the proxy uses `target = "audit"` for the request log line.
- No `unwrap()` in production paths. Use `expect()` only where failure is genuinely impossible and the message explains why. `unwrap_or_else(|e| e.into_inner())` is acceptable for poisoned locks.
- Async: `async_trait` for the `ProxyHttp` impl. Pingora controls the runtime; do not spawn tasks on it.
- Config: All configuration is in TOML, deserialized with serde. Add new fields to the existing structs in `config.rs` with `#[serde(default)]` for backwards compatibility.
- Tests: Unit tests go in `#[cfg(test)] mod tests` inside the relevant source file. Integration tests go in `tests/e2e.rs`.
## Common Mistakes to Avoid
- **Do not add `tokio::main` to `main.rs`.** Pingora manages its own runtime via `server.run_forever()`. The temporary runtimes for cert fetch and SSH are intentionally separate.
- **Do not use `headers.insert()` on upstream requests.** Use `insert_header()`. See invariant #4.
- **Do not hold a `RwLock` guard across `.await` points in proxy.rs.** Clone the data out, drop the guard, then proceed.
- **Do not add new crate dependencies for things the existing deps already cover.** Check `Cargo.toml` first.
- **Do not modify the graceful upgrade flow** (watcher.rs `trigger_upgrade`) unless explicitly asked. It is deliberately simple and correct.
- **Do not add `#[tokio::test]` to e2e tests.** They use `std::thread` + raw `TcpStream` intentionally to avoid runtime conflicts with Pingora.
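The "clone the data out, drop the guard" rule can be sketched with std types only. Everything here is a hypothetical stand-in (`ROUTES`, `lookup`) for the real `AcmeRoutes` handling, not code from this repository:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Hypothetical stand-in for the shared AcmeRoutes map (path -> solver backend).
static ROUTES: RwLock<Option<HashMap<String, String>>> = RwLock::new(None);

fn lookup(path: &str) -> Option<String> {
    // Take the read guard, recovering from poisoning as the conventions allow.
    let guard = ROUTES.read().unwrap_or_else(|e| e.into_inner());
    // Clone the value out, then drop the guard immediately so no std lock
    // would ever be held across a later .await point.
    let backend = guard.as_ref()?.get(path).cloned();
    drop(guard);
    backend
}

fn main() {
    {
        let mut w = ROUTES.write().unwrap_or_else(|e| e.into_inner());
        *w = Some(HashMap::from([(
            "/.well-known/acme-challenge/tok".to_string(),
            "10.0.0.5:8089".to_string(),
        )]));
    } // write guard dropped here
    assert_eq!(
        lookup("/.well-known/acme-challenge/tok").as_deref(),
        Some("10.0.0.5:8089")
    );
    assert_eq!(lookup("/other"), None);
    println!("ok");
}
```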

Cargo.lock (generated) — 1 line changed

@@ -3098,6 +3098,7 @@ dependencies = [
  "opentelemetry",
  "opentelemetry-otlp",
  "opentelemetry_sdk",
+ "pin-project-lite",
  "pingora",
  "pingora-core",
  "pingora-http",

Cargo.toml

@@ -25,6 +25,7 @@ toml = "0.8"
 tokio = { version = "1", features = ["full"] }
 futures = "0.3"
 async-trait = "0.1"
+pin-project-lite = "0.2"
 # Structured logging + OTEL
 tracing = "0.1"

src/config.rs

@@ -4,7 +4,7 @@ use std::fs;
 #[derive(Debug, Deserialize, Clone)]
 pub struct SshConfig {
-    /// Address to bind the SSH listener on, e.g. "0.0.0.0:22".
+    /// Address to bind the SSH listener on, e.g. "0.0.0.0:22" or "[::]:22".
     pub listen: String,
     /// Upstream backend address, e.g. "gitea-ssh.devtools.svc.cluster.local:2222".
     pub backend: String,
@@ -22,7 +22,9 @@ pub struct Config {
 #[derive(Debug, Deserialize, Clone)]
 pub struct ListenConfig {
+    /// HTTP listener address, e.g., "0.0.0.0:80" or "[::]:80".
     pub http: String,
+    /// HTTPS listener address, e.g., "0.0.0.0:443" or "[::]:443".
     pub https: String,
 }

src/dual_stack.rs (new file) — 133 lines added

@@ -0,0 +1,133 @@
//! Dual-stack TCP listener implementation inspired by `tokio_dual_stack`.
//!
//! This module provides a `DualStackTcpListener` that can listen on both IPv4 and IPv6
//! addresses simultaneously, ensuring fair distribution of connections between both stacks.

use std::io::{Error, ErrorKind, Result};
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::task::{Context, Poll};

use tokio::net::{TcpListener, TcpStream};

pin_project_lite::pin_project! {
    /// Future returned by [`DualStackTcpListener::accept`].
    struct AcceptFut<
        F: std::future::Future<Output = Result<(TcpStream, SocketAddr)>>,
        F2: std::future::Future<Output = Result<(TcpStream, SocketAddr)>>,
    > {
        #[pin]
        fut_1: F,
        #[pin]
        fut_2: F2,
    }
}

impl<
        F: std::future::Future<Output = Result<(TcpStream, SocketAddr)>>,
        F2: std::future::Future<Output = Result<(TcpStream, SocketAddr)>>,
    > std::future::Future for AcceptFut<F, F2>
{
    type Output = Result<(TcpStream, SocketAddr)>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        match this.fut_1.poll(cx) {
            Poll::Ready(res) => Poll::Ready(res),
            Poll::Pending => this.fut_2.poll(cx),
        }
    }
}

/// Dual-stack TCP listener that can handle both IPv4 and IPv6 connections.
#[derive(Debug)]
pub struct DualStackTcpListener {
    /// IPv6 TCP listener.
    ip6: TcpListener,
    /// IPv4 TCP listener.
    ip4: TcpListener,
    /// Alternates between IPv6 and IPv4 to ensure fair distribution of connections.
    ip6_first: AtomicBool,
}

impl DualStackTcpListener {
    /// Creates a new dual-stack listener by binding to both IPv4 and IPv6 addresses.
    ///
    /// # Arguments
    /// * `ipv6_addr` - The IPv6 address to bind to (e.g., "[::]:80").
    /// * `ipv4_addr` - The IPv4 address to bind to (e.g., "0.0.0.0:80").
    ///
    /// # Returns
    /// A `Result` containing the dual-stack listener if successful.
    pub async fn bind(ipv6_addr: &str, ipv4_addr: &str) -> Result<Self> {
        let ip6 = TcpListener::bind(ipv6_addr).await?;
        let ip4 = TcpListener::bind(ipv4_addr).await?;
        Ok(Self {
            ip6,
            ip4,
            ip6_first: AtomicBool::new(true),
        })
    }

    /// Accepts a new incoming connection from either the IPv4 or IPv6 listener.
    ///
    /// This method alternates between the IPv6 and IPv4 listeners to ensure
    /// fair distribution of connections.
    pub async fn accept(&self) -> Result<(TcpStream, SocketAddr)> {
        if self.ip6_first.swap(false, Ordering::Relaxed) {
            AcceptFut {
                fut_1: self.ip6.accept(),
                fut_2: self.ip4.accept(),
            }
            .await
        } else {
            self.ip6_first.store(true, Ordering::Relaxed);
            AcceptFut {
                fut_1: self.ip4.accept(),
                fut_2: self.ip6.accept(),
            }
            .await
        }
    }

    /// Returns the local addresses of the IPv6 and IPv4 listeners.
    pub fn local_addr(&self) -> Result<(SocketAddrV6, SocketAddrV4)> {
        let ip6_addr = self.ip6.local_addr()?;
        let ip4_addr = self.ip4.local_addr()?;
        match (ip6_addr, ip4_addr) {
            (SocketAddr::V6(ip6), SocketAddr::V4(ip4)) => Ok((ip6, ip4)),
            _ => Err(Error::new(
                ErrorKind::InvalidData,
                "Unexpected address types for dual-stack listener",
            )),
        }
    }
}

/// Extension trait to add dual-stack binding capability to configuration structs.
pub trait DualStackBind {
    /// Binds to both IPv4 and IPv6 addresses for dual-stack support.
    ///
    /// # Arguments
    /// * `ipv6_addr` - The IPv6 address to bind to.
    /// * `ipv4_addr` - The IPv4 address to bind to.
    ///
    /// # Returns
    /// A `Result` containing the dual-stack listener if successful.
    fn bind_dual_stack(
        ipv6_addr: &str,
        ipv4_addr: &str,
    ) -> impl std::future::Future<Output = Result<DualStackTcpListener>> + Send;
}

impl DualStackBind for str {
    fn bind_dual_stack(
        ipv6_addr: &str,
        ipv4_addr: &str,
    ) -> impl std::future::Future<Output = Result<DualStackTcpListener>> + Send {
        DualStackTcpListener::bind(ipv6_addr, ipv4_addr)
    }
}
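The fairness mechanism in `accept` reduces to one `AtomicBool` that flips on every call. A standalone sketch of just that alternation (no sockets; `preferred` is a hypothetical name mirroring the `ip6_first` logic above):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Mirrors the `ip6_first` flip in `DualStackTcpListener::accept`:
/// each call polls the opposite stack first compared to the previous call.
fn preferred(ip6_first: &AtomicBool) -> &'static str {
    if ip6_first.swap(false, Ordering::Relaxed) {
        "ip6" // IPv6 listener polled first this round
    } else {
        ip6_first.store(true, Ordering::Relaxed);
        "ip4" // IPv4 listener polled first this round
    }
}

fn main() {
    let flag = AtomicBool::new(true);
    let order: Vec<&str> = (0..4).map(|_| preferred(&flag)).collect();
    // Strict alternation: ip6, ip4, ip6, ip4, ...
    assert_eq!(order, ["ip6", "ip4", "ip6", "ip4"]);
    println!("{:?}", order);
}
```

Note that this only alternates which listener is *polled first*; if the preferred stack has no pending connection, `AcceptFut` falls through to the other one, so neither stack can starve the other.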

src/lib.rs

@@ -3,5 +3,6 @@
 // without going through the binary entry point.
 pub mod acme;
 pub mod config;
+pub mod dual_stack;
 pub mod proxy;
 pub mod ssh;

src/ssh.rs

@@ -1,13 +1,30 @@
 use tokio::io::copy_bidirectional;
-use tokio::net::{TcpListener, TcpStream};
+use tokio::net::TcpStream;
+
+use crate::dual_stack::DualStackTcpListener;
 
 /// Listens on `listen` and proxies every TCP connection to `backend`.
 /// Runs forever; intended to be spawned on a dedicated OS thread + Tokio runtime,
 /// matching the pattern used for the cert/ingress watcher.
 pub async fn run_tcp_proxy(listen: &str, backend: &str) {
-    let listener = match TcpListener::bind(listen).await {
+    // Parse the listen address to determine if it's IPv6 or IPv4
+    let ipv6_addr = if listen.starts_with('[') {
+        listen.to_string()
+    } else {
+        format!("[::]:{}", listen.split(':').last().unwrap_or("22"))
+    };
+    let ipv4_addr = if listen.contains(':') {
+        // Extract port from the original address
+        let port = listen.split(':').last().unwrap_or("22");
+        format!("0.0.0.0:{}", port)
+    } else {
+        "0.0.0.0:22".to_string()
+    };
+    let listener = match DualStackTcpListener::bind(&ipv6_addr, &ipv4_addr).await {
         Ok(l) => {
-            tracing::info!(%listen, %backend, "SSH TCP proxy listening");
+            tracing::info!(%listen, %backend, "SSH TCP proxy listening (dual-stack)");
             l
         }
         Err(e) => {
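The address derivation added to `run_tcp_proxy` can be exercised in isolation. This restates the same string handling under a hypothetical function name (`derive_bind_addrs`), so the behavior is easy to check:

```rust
/// Derives the (IPv6, IPv4) bind addresses the same way the new
/// run_tcp_proxy code does; `derive_bind_addrs` is a hypothetical name.
fn derive_bind_addrs(listen: &str) -> (String, String) {
    // A bracketed address is taken as the IPv6 listen address verbatim;
    // otherwise the port is reused on the IPv6 wildcard.
    let ipv6_addr = if listen.starts_with('[') {
        listen.to_string()
    } else {
        format!("[::]:{}", listen.split(':').last().unwrap_or("22"))
    };
    // The IPv4 side always binds the wildcard on the extracted port,
    // defaulting to 22 when no port is present.
    let ipv4_addr = if listen.contains(':') {
        let port = listen.split(':').last().unwrap_or("22");
        format!("0.0.0.0:{}", port)
    } else {
        "0.0.0.0:22".to_string()
    };
    (ipv6_addr, ipv4_addr)
}

fn main() {
    assert_eq!(
        derive_bind_addrs("0.0.0.0:2222"),
        ("[::]:2222".to_string(), "0.0.0.0:2222".to_string())
    );
    assert_eq!(
        derive_bind_addrs("[::]:22"),
        ("[::]:22".to_string(), "0.0.0.0:22".to_string())
    );
    println!("ok");
}
```

One consequence of this scheme: any existing IPv4-only `listen` value keeps working unchanged, which is what gives the commit its backward compatibility.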

tests/dual_stack_test.rs (new file) — 124 lines added

@@ -0,0 +1,124 @@
//! Integration test for dual-stack TCP listener functionality.

use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::{timeout, Duration};

#[tokio::test]
async fn test_dual_stack_listener_creation() {
    // This test verifies that the dual-stack listener can be created
    // and that it properly binds to both IPv4 and IPv6 addresses.
    let listener = sunbeam_proxy::dual_stack::DualStackTcpListener::bind(
        "[::]:0",    // IPv6 wildcard on a random port
        "0.0.0.0:0", // IPv4 wildcard on a random port
    )
    .await
    .expect("Failed to create dual-stack listener");

    let (ipv6_addr, ipv4_addr) = listener
        .local_addr()
        .expect("Failed to get local addresses");

    // Verify that we got valid addresses
    assert!(ipv6_addr.port() > 0, "IPv6 port should be valid");
    assert!(ipv4_addr.port() > 0, "IPv4 port should be valid");

    println!(
        "Dual-stack listener created successfully: IPv6={}, IPv4={}",
        ipv6_addr, ipv4_addr
    );
}

#[tokio::test]
async fn test_dual_stack_listener_with_connection() {
    // This test verifies that the dual-stack listener can accept connections
    // and communicate with clients. We use a timeout to prevent hanging.
    let listener = sunbeam_proxy::dual_stack::DualStackTcpListener::bind(
        "[::]:0",    // IPv6 wildcard on a random port
        "0.0.0.0:0", // IPv4 wildcard on a random port
    )
    .await
    .expect("Failed to create dual-stack listener");

    let (_, ipv4_addr) = listener.local_addr().expect("Failed to get local addresses");

    // Create a channel to signal when the server has received the message
    let (tx, rx) = tokio::sync::oneshot::channel();

    // Spawn a task to accept the connection
    let server_task = tokio::spawn(async move {
        let (mut socket, peer_addr) = timeout(Duration::from_secs(2), listener.accept())
            .await
            .expect("Accept timed out")
            .expect("Failed to accept connection");

        let mut buf = [0u8; 1024];
        let n = timeout(Duration::from_secs(1), socket.read(&mut buf))
            .await
            .expect("Read timed out")
            .expect("Failed to read from socket");
        let request = String::from_utf8_lossy(&buf[..n]);
        assert_eq!(request, "test");

        timeout(Duration::from_secs(1), socket.write_all(b"response"))
            .await
            .expect("Write timed out")
            .expect("Failed to write response");

        // Drop the socket to close the connection
        drop(socket);

        tx.send(peer_addr).expect("Failed to send peer address");
    });

    // Give the server a moment to start listening
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Connect to the IPv4 address
    let ipv4_socket_addr = SocketAddr::V4(SocketAddrV4::new(
        Ipv4Addr::new(127, 0, 0, 1),
        ipv4_addr.port(),
    ));
    let client_task = tokio::spawn(async move {
        let mut client = timeout(Duration::from_secs(2), TcpStream::connect(ipv4_socket_addr))
            .await
            .expect("Connect timed out")
            .expect("Failed to connect to IPv4 address");

        // Send a test message
        timeout(Duration::from_secs(1), client.write_all(b"test"))
            .await
            .expect("Write timed out")
            .expect("Failed to write to socket");

        // Read the response
        let mut response = Vec::new();
        timeout(Duration::from_secs(1), client.read_to_end(&mut response))
            .await
            .expect("Read timed out")
            .expect("Failed to read response");
        assert_eq!(response, b"response");

        // Close the client connection
        client.shutdown().await.expect("Failed to shutdown client");
    });

    // Wait for both tasks to complete with a timeout
    timeout(Duration::from_secs(5), async {
        let (server_result, client_result) = tokio::join!(server_task, client_task);
        server_result.expect("Server task failed");
        client_result.expect("Client task failed");
    })
    .await
    .expect("Test timed out");

    // Verify the server received the connection
    let peer_addr = rx.await.expect("Failed to receive peer address");
    println!("Successfully accepted connection from: {}", peer_addr);
}