fix(net): protocol fixes for Tailscale-compatible peer reachability
A pile of correctness bugs that all stopped real Tailscale peers from being able to send WireGuard packets back to us. Found while building out the e2e test against the docker-compose stack.

1. WireGuard static key was wrong (lifecycle.rs)

   We were initializing the WgTunnel with `keys.wg_private`, a separate x25519 key from the one Tailscale advertises in netmaps. Peers know us by `node_public` and compute mac1 against it; signing handshakes with a different private key meant every init we sent was silently dropped. Use `keys.node_private` instead — node_key IS the WG static key in Tailscale.

2. DERP relay couldn't route packets to us (derp/client.rs)

   Our DerpClient was sealing the ClientInfo frame with a fresh ephemeral NaCl keypair and putting the ephemeral public in the frame prefix. Tailscale's protocol expects the *long-term* node public key in the prefix — that's how the relay knows where to forward packets addressed to our node_key. With the ephemeral key, the relay accepted the connection but never delivered our peers' responses. Now seal with the long-term node key.

3. Headscale never persisted our DiscoKey (proto/types.rs, control/*)

   The streaming /machine/map handler in Headscale ≥ capVer 68 doesn't update DiscoKey on the node record — only the "Lite endpoint update" path does, gated on Stream:false + OmitPeers:true + ReadOnly:false. Without DiscoKey our nodes appeared in `headscale nodes list` with `discokey:000…` and never propagated into peer netmaps. Add the DiscoKey field to RegisterRequest, add OmitPeers/ReadOnly fields to MapRequest, and call a new `lite_update` between register and the streaming map. Also add `post_json_no_response` for endpoints that reply with an empty body.

4. EncapAction is now a struct instead of an enum (wg/tunnel.rs)

   Routing was forced to either UDP or DERP. With a peer whose advertised UDP endpoint is on an unreachable RFC1918 network (e.g. docker bridge IPs), we'd send via UDP, get nothing, and never fall back. Send over every available transport — receivers dedupe via the WireGuard replay window — and let dispatch_encap forward each populated arm to its respective channel.

5. Drop the dead PacketRouter (wg/router.rs)

   Skeleton from an earlier design that never got wired up; it's been accumulating dead-code warnings.
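The dual-transport send in item 4 is safe because WireGuard receivers track a sliding window of recently seen packet counters and silently drop anything already accepted. This is a minimal standalone sketch of that dedupe idea (not boringtun's actual implementation, which uses a larger bitmap); it shows why the same ciphertext arriving once via UDP and once via DERP is harmless:

```rust
/// Simplified 64-entry sliding replay window, as used conceptually by
/// WireGuard receivers to reject duplicate or replayed ciphertexts.
struct ReplayWindow {
    highest: u64, // highest counter accepted so far
    bitmap: u64,  // bit i set => counter (highest - i) was already seen
}

impl ReplayWindow {
    fn new() -> Self {
        Self { highest: 0, bitmap: 0 }
    }

    /// Returns true if this counter is fresh; false for duplicates
    /// (e.g. the same packet arriving via both UDP and DERP) or
    /// counters too old to track.
    fn check_and_update(&mut self, counter: u64) -> bool {
        if counter > self.highest {
            let shift = counter - self.highest;
            // Slide the window forward; bits that fall off are "too old".
            self.bitmap = if shift >= 64 { 0 } else { self.bitmap << shift };
            self.bitmap |= 1; // mark the new highest as seen
            self.highest = counter;
            return true;
        }
        let offset = self.highest - counter;
        if offset >= 64 {
            return false; // beyond the window: reject
        }
        let mask = 1u64 << offset;
        if self.bitmap & mask != 0 {
            return false; // duplicate ciphertext: drop
        }
        self.bitmap |= mask;
        true
    }
}

fn main() {
    let mut w = ReplayWindow::new();
    assert!(w.check_and_update(1));  // first copy accepted (say, via UDP)
    assert!(!w.check_and_update(1)); // DERP copy of the same packet dropped
    assert!(w.check_and_update(3));  // out-of-order but fresh
    assert!(w.check_and_update(2));  // earlier counter still inside the window
    assert!(!w.check_and_update(2)); // second copy dropped
    println!("ok");
}
```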
@@ -86,6 +86,58 @@ impl ControlClient {
         })
     }
+
+    /// Send a POST request with a JSON body and discard the response body.
+    /// Used for endpoints that return an empty 200 (e.g. Headscale's Lite
+    /// endpoint update).
+    pub(crate) async fn post_json_no_response<Req: serde::Serialize>(
+        &mut self,
+        path: &str,
+        body: &Req,
+    ) -> crate::Result<()> {
+        let body_bytes = serde_json::to_vec(body)?;
+        let request = http::Request::builder()
+            .method("POST")
+            .uri(path)
+            .header("content-type", "application/json")
+            .body(())
+            .map_err(|e| crate::Error::Control(e.to_string()))?;
+
+        let (response_future, mut send_stream) = self
+            .sender
+            .send_request(request, false)
+            .map_err(|e| crate::Error::Control(format!("send request: {e}")))?;
+
+        send_stream
+            .send_data(Bytes::from(body_bytes), true)
+            .map_err(|e| crate::Error::Control(format!("send body: {e}")))?;
+
+        let response = response_future
+            .await
+            .map_err(|e| crate::Error::Control(format!("response: {e}")))?;
+
+        let status = response.status();
+        let mut body = response.into_body();
+
+        // Drain the response body so flow control completes cleanly, but
+        // don't try to parse it.
+        let mut response_bytes = Vec::new();
+        while let Some(chunk) = body.data().await {
+            let chunk = chunk.map_err(|e| crate::Error::Control(format!("read body: {e}")))?;
+            response_bytes.extend_from_slice(&chunk);
+            body.flow_control()
+                .release_capacity(chunk.len())
+                .map_err(|e| crate::Error::Control(format!("flow control: {e}")))?;
+        }
+
+        if !status.is_success() {
+            let text = String::from_utf8_lossy(&response_bytes);
+            return Err(crate::Error::Control(format!(
+                "{path} returned {status}: {text}"
+            )));
+        }
+        Ok(())
+    }
 
     /// Send a POST request with a JSON body and parse a JSON response.
     pub(crate) async fn post_json<Req: serde::Serialize, Resp: serde::de::DeserializeOwned>(
         &mut self,
@@ -155,6 +155,33 @@ impl super::client::ControlClient {
     /// Sends a `MapRequest` to `POST /machine/map` and returns a [`MapStream`]
     /// that yields successive [`MapUpdate`]s as the coordination server pushes
     /// network map changes.
+    /// Send a non-streaming "Lite endpoint update" to Headscale so it
+    /// persists our DiscoKey on the node record. Headscale's
+    /// `serveLongPoll()` (the streaming map handler) doesn't update
+    /// DiscoKey for capability versions ≥ 68 — only the Lite update path
+    /// does, gated on `Stream: false` + `OmitPeers: true` + `ReadOnly: false`.
+    /// Without this our peers see us as having a zero disco_key and never
+    /// add us to their netmaps.
+    pub async fn lite_update(
+        &mut self,
+        keys: &crate::keys::NodeKeys,
+        hostname: &str,
+        endpoints: Option<Vec<String>>,
+    ) -> crate::Result<()> {
+        let req = MapRequest {
+            version: 74,
+            node_key: keys.node_key_str(),
+            disco_key: keys.disco_key_str(),
+            stream: false,
+            omit_peers: true,
+            read_only: false,
+            hostinfo: super::register::build_hostinfo(hostname),
+            endpoints,
+        };
+        // Lite update returns an empty body.
+        self.post_json_no_response("/machine/map", &req).await
+    }
+
     pub async fn map_stream(
         &mut self,
         keys: &crate::keys::NodeKeys,
@@ -165,6 +192,8 @@ impl super::client::ControlClient {
             node_key: keys.node_key_str(),
             disco_key: keys.disco_key_str(),
             stream: true,
+            omit_peers: false,
+            read_only: false,
             hostinfo: super::register::build_hostinfo(hostname),
             endpoints: None,
         };
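The doc comment above spells out Headscale's gating for the Lite path: it is taken only for `Stream: false` + `OmitPeers: true` + `ReadOnly: false`. A tiny standalone predicate (a hypothetical helper, not code from this repo, using this repo's snake_case field names) makes the truth table explicit:

```rust
// Hypothetical helper mirroring the gating described in lite_update's docs:
// Headscale takes the non-streaming "Lite endpoint update" path only when
// Stream is false, OmitPeers is true, and ReadOnly is false.
#[derive(Debug)]
struct MapFlags {
    stream: bool,
    omit_peers: bool,
    read_only: bool,
}

fn takes_lite_update_path(f: &MapFlags) -> bool {
    !f.stream && f.omit_peers && !f.read_only
}

fn main() {
    // The flags lite_update() sends in this commit hit the lite path:
    assert!(takes_lite_update_path(&MapFlags {
        stream: false,
        omit_peers: true,
        read_only: false,
    }));
    // The streaming map request does not:
    assert!(!takes_lite_update_path(&MapFlags {
        stream: true,
        omit_peers: false,
        read_only: false,
    }));
    println!("ok");
}
```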
@@ -17,6 +17,7 @@ impl super::client::ControlClient {
             version: 74, // capability version
             node_key: keys.node_key_str(),
             old_node_key: format!("nodekey:{}", "0".repeat(64)),
+            disco_key: keys.disco_key_str(),
             auth: Some(AuthInfo {
                 auth_key: Some(auth_key.to_string()),
             }),
@@ -68,6 +69,7 @@ mod tests {
             version: 74,
             node_key: keys.node_key_str(),
             old_node_key: String::new(),
+            disco_key: keys.disco_key_str(),
             auth: Some(AuthInfo {
                 auth_key: Some("tskey-auth-test123".to_string()),
             }),
@@ -98,6 +98,12 @@ async fn run_session(
     set_status(status, DaemonStatus::Registering);
     let _reg = control.register(&config.auth_key, &config.hostname, keys).await?;
 
+    // 2a. Send a Lite endpoint update so Headscale persists our DiscoKey on
+    //     the node record. The streaming /machine/map handler doesn't
+    //     update DiscoKey at capability versions ≥ 68 — only the Lite path
+    //     does, and without it our peers can't see us in their netmaps.
+    control.lite_update(keys, &config.hostname, None).await?;
+
     // 3. Start map stream
     let mut map_stream = control.map_stream(keys, &config.hostname).await?;
 
@@ -119,8 +125,12 @@ async fn run_session(
 
     let peer_count = peers.len();
 
-    // 5. Initialize WireGuard tunnel with our WG private key
-    let mut wg_tunnel = WgTunnel::new(keys.wg_private.clone());
+    // 5. Initialize WireGuard tunnel. Tailscale uses the node_key as the
+    //    WireGuard static key — they are the same key, not separate. Peers
+    //    only know our node_public from the netmap, so boringtun must be
+    //    signing with the matching private key or peers will drop our
+    //    handshakes for failing mac1 validation.
+    let mut wg_tunnel = WgTunnel::new(keys.node_private.clone());
     wg_tunnel.update_peers(&peers);
 
     // 6. Set up NetworkEngine with our VPN IP
@@ -342,6 +352,7 @@ async fn run_wg_loop(
             incoming = derp_in_rx.recv() => {
                 match incoming {
                     Some((src_key, data)) => {
+                        tracing::trace!("WG ← DERP ({} bytes)", data.len());
                         let action = tunnel.decapsulate(&src_key, &data);
                         handle_decap(action, src_key, &to_engine, &derp_out_tx).await;
                     }
@@ -351,6 +362,7 @@ async fn run_wg_loop(
             incoming = udp_in_rx.recv() => {
                 match incoming {
                     Some((src_addr, data)) => {
+                        tracing::trace!("WG ← UDP {src_addr} ({} bytes)", data.len());
                         let Some(peer_key) = identify_udp_peer(&tunnel, src_addr, &data) else {
                             tracing::trace!("UDP packet from {src_addr}: no peer match");
                             continue;
@@ -371,22 +383,21 @@ async fn run_wg_loop(
     }
 }
 
-/// Dispatch a WG encap action to the appropriate transport.
+/// Dispatch a WG encap action to whichever transports it carries. We send
+/// over both UDP and DERP when both are populated; the remote peer dedupes
+/// duplicate ciphertexts via the WireGuard replay window.
 async fn dispatch_encap(
     action: crate::wg::tunnel::EncapAction,
     derp_out_tx: &mpsc::Sender<([u8; 32], Vec<u8>)>,
     udp_out_tx: &mpsc::Sender<(std::net::SocketAddr, Vec<u8>)>,
 ) {
-    match action {
-        crate::wg::tunnel::EncapAction::SendUdp { endpoint, data } => {
-            tracing::trace!("WG → UDP {endpoint} ({} bytes)", data.len());
-            let _ = udp_out_tx.send((endpoint, data)).await;
-        }
-        crate::wg::tunnel::EncapAction::SendDerp { dest_key, data } => {
-            tracing::trace!("WG → DERP ({} bytes)", data.len());
-            let _ = derp_out_tx.send((dest_key, data)).await;
-        }
-        crate::wg::tunnel::EncapAction::Nothing => {}
+    if let Some((endpoint, data)) = action.udp {
+        tracing::trace!("WG → UDP {endpoint} ({} bytes)", data.len());
+        let _ = udp_out_tx.send((endpoint, data)).await;
+    }
+    if let Some((dest_key, data)) = action.derp {
+        tracing::trace!("WG → DERP ({} bytes)", data.len());
+        let _ = derp_out_tx.send((dest_key, data)).await;
     }
 }
 
@@ -523,6 +534,7 @@ async fn run_derp_loop(
             incoming = client.recv_packet() => {
                 match incoming {
                     Ok((src_key, data)) => {
+                        tracing::trace!("DERP recv ({} bytes)", data.len());
                         if in_tx.send((src_key, data)).await.is_err() {
                             return;
                         }
@@ -33,7 +33,7 @@ impl DerpClient {
     /// `url` should be like `http://host:port` or `host:port`.
     pub async fn connect(
         url: &str,
-        _node_key: &crate::keys::NodeKeys,
+        node_keys: &crate::keys::NodeKeys,
     ) -> crate::Result<Self> {
         use crypto_box::aead::{Aead, AeadCore, OsRng};
         use crypto_box::{PublicKey, SalsaBox, SecretKey};
@@ -128,26 +128,32 @@ impl DerpClient {
         let mut server_public = [0u8; 32];
         server_public.copy_from_slice(key_bytes);
 
-        // Generate ephemeral NaCl keypair for the handshake
-        let ephemeral_secret = SecretKey::generate(&mut OsRng);
-        let ephemeral_public = ephemeral_secret.public_key();
+        // The DERP relay needs to know our LONG-TERM node public key so it
+        // can route inbound packets addressed to us. Tailscale's protocol
+        // sends the node public key as the first 32 bytes of the ClientInfo
+        // frame, then a NaCl-sealed JSON blob signed with the node's
+        // long-term private key (so the server can verify ownership).
+        let node_secret_bytes: [u8; 32] = node_keys.node_private.to_bytes();
+        let node_public_bytes: [u8; 32] = *node_keys.node_public.as_bytes();
+        let node_secret = SecretKey::from(node_secret_bytes);
+        let node_public_nacl = PublicKey::from(node_public_bytes);
 
         // Build client info JSON
         let client_info = serde_json::json!({"version": 2});
         let client_info_bytes = serde_json::to_vec(&client_info)
             .map_err(|e| Error::Derp(format!("failed to serialize client info: {e}")))?;
 
-        // Seal with crypto_box: encrypt client info using our ephemeral key and server's public key
+        // Seal with crypto_box: encrypt with OUR long-term private + server public.
         let server_pk = PublicKey::from(server_public);
-        let salsa_box = SalsaBox::new(&server_pk, &ephemeral_secret);
+        let salsa_box = SalsaBox::new(&server_pk, &node_secret);
         let nonce = SalsaBox::generate_nonce(&mut OsRng);
         let sealed = salsa_box
             .encrypt(&nonce, client_info_bytes.as_slice())
             .map_err(|e| Error::Derp(format!("failed to seal client info: {e}")))?;
 
-        // ClientInfo frame: 32-byte ephemeral public key + nonce + sealed box
+        // ClientInfo frame: 32-byte long-term public key + nonce + sealed box
         let mut client_info_payload = BytesMut::new();
-        client_info_payload.extend_from_slice(ephemeral_public.as_bytes());
+        client_info_payload.extend_from_slice(node_public_nacl.as_bytes());
         client_info_payload.extend_from_slice(&nonce);
         client_info_payload.extend_from_slice(&sealed);
 
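The ClientInfo frame built above has a fixed byte layout: 32-byte long-term node public key, then the NaCl box nonce (24 bytes for crypto_box/SalsaBox), then the sealed blob (ciphertext plus a 16-byte Poly1305 tag). A layout-only sketch with no real crypto, using dummy values, shows the framing the relay parses:

```rust
// Layout-only sketch of the ClientInfo frame body (no real crypto):
// [32-byte node public key][24-byte NaCl nonce][sealed box].
// The relay routes inbound packets to us by the leading public key,
// which is why it must be the long-term node key, not an ephemeral one.
fn build_client_info_frame(node_public: &[u8; 32], nonce: &[u8; 24], sealed: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(32 + 24 + sealed.len());
    frame.extend_from_slice(node_public);
    frame.extend_from_slice(nonce);
    frame.extend_from_slice(sealed);
    frame
}

fn main() {
    let node_public = [0x11u8; 32];
    let nonce = [0x22u8; 24];
    let plaintext = br#"{"version":2}"#;
    // A real sealed box is ciphertext plus a 16-byte tag; fake it with zeros.
    let sealed = vec![0u8; plaintext.len() + 16];

    let frame = build_client_info_frame(&node_public, &nonce, &sealed);
    assert_eq!(frame.len(), 32 + 24 + plaintext.len() + 16);
    assert_eq!(&frame[..32], &node_public); // relay reads our key from the prefix
    println!("frame is {} bytes", frame.len());
}
```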
@@ -8,6 +8,10 @@ pub struct RegisterRequest {
     pub version: u16,
     pub node_key: String,
     pub old_node_key: String,
+    /// Curve25519 disco public key. Headscale persists this on the node
+    /// record and uses it for peer-to-peer discovery — if it's zero, peers
+    /// won't include us in their netmaps.
+    pub disco_key: String,
     pub auth: Option<AuthInfo>,
     pub hostinfo: HostInfo,
     #[serde(skip_serializing_if = "Option::is_none")]
@@ -93,6 +97,14 @@ pub struct MapRequest {
     pub node_key: String,
     pub disco_key: String,
     pub stream: bool,
+    /// "Lite update" flag — set together with `Stream: false` and
+    /// `ReadOnly: false` to make Headscale persist DiscoKey + endpoints
+    /// without sending a full netmap response (the "Lite endpoint update"
+    /// path in Headscale's poll.go).
+    #[serde(default)]
+    pub omit_peers: bool,
+    #[serde(default)]
+    pub read_only: bool,
     pub hostinfo: HostInfo,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub endpoints: Option<Vec<String>>,
@@ -237,6 +249,7 @@ mod tests {
             version: 74,
             node_key: "nodekey:aabb".into(),
             old_node_key: "".into(),
+            disco_key: "discokey:ccdd".into(),
             auth: Some(AuthInfo {
                 auth_key: Some("tskey-abc".into()),
             }),
@@ -1,3 +1,2 @@
-pub mod router;
 pub mod socket;
 pub mod tunnel;
@@ -1,203 +0,0 @@
-use std::net::SocketAddr;
-
-use tokio::net::UdpSocket;
-use tokio::sync::mpsc;
-
-use super::tunnel::{DecapAction, EncapAction, WgTunnel};
-
-/// Routes packets between WireGuard tunnels, UDP sockets, and DERP relays.
-pub(crate) struct PacketRouter {
-    tunnel: WgTunnel,
-    udp: Option<UdpSocket>,
-    ingress_tx: mpsc::Sender<IngressPacket>,
-    ingress_rx: mpsc::Receiver<IngressPacket>,
-}
-
-/// An inbound WireGuard packet from either UDP or DERP.
-pub(crate) enum IngressPacket {
-    /// WireGuard packet received over UDP.
-    Udp { src: SocketAddr, data: Vec<u8> },
-    /// WireGuard packet received via DERP relay.
-    Derp { src_key: [u8; 32], data: Vec<u8> },
-}
-
-impl PacketRouter {
-    pub fn new(tunnel: WgTunnel) -> Self {
-        let (tx, rx) = mpsc::channel(256);
-        Self {
-            tunnel,
-            udp: None,
-            ingress_tx: tx,
-            ingress_rx: rx,
-        }
-    }
-
-    /// Bind the UDP socket for direct WireGuard traffic.
-    pub async fn bind_udp(&mut self, addr: SocketAddr) -> crate::Result<()> {
-        let sock = UdpSocket::bind(addr)
-            .await
-            .map_err(|e| crate::Error::WireGuard(format!("failed to bind UDP {addr}: {e}")))?;
-        self.udp = Some(sock);
-        Ok(())
-    }
-
-    /// Get a clone of the ingress sender (for DERP recv loops to feed packets in).
-    pub fn ingress_sender(&self) -> mpsc::Sender<IngressPacket> {
-        self.ingress_tx.clone()
-    }
-
-    /// Process one ingress packet: decapsulate and return any decrypted IP packet.
-    /// If decapsulation produces a response (handshake, cookie), it is sent back
-    /// to the peer over the appropriate transport.
-    pub fn process_ingress(&mut self, packet: IngressPacket) -> Option<Vec<u8>> {
-        match packet {
-            IngressPacket::Udp { src, data } => {
-                // We need to figure out which peer sent this. For UDP, we look up
-                // the peer by source address. If we can't find it, try all peers.
-                let peer_key = self.find_peer_by_endpoint(src);
-                let key = match peer_key {
-                    Some(k) => k,
-                    None => return None,
-                };
-
-                match self.tunnel.decapsulate(&key, &data) {
-                    DecapAction::Packet(ip_packet) => Some(ip_packet),
-                    DecapAction::Response(response) => {
-                        // Queue the response to send back — best-effort.
-                        self.try_send_udp_sync(src, &response);
-                        None
-                    }
-                    DecapAction::Nothing => None,
-                }
-            }
-            IngressPacket::Derp { src_key, data } => {
-                match self.tunnel.decapsulate(&src_key, &data) {
-                    DecapAction::Packet(ip_packet) => Some(ip_packet),
-                    DecapAction::Response(_response) => {
-                        // DERP responses would need to be sent back via DERP.
-                        // The caller should handle this via the DERP send path.
-                        None
-                    }
-                    DecapAction::Nothing => None,
-                }
-            }
-        }
-    }
-
-    /// Encapsulate an outbound IP packet and send it via the appropriate transport.
-    pub async fn send_outbound(&mut self, ip_packet: &[u8]) -> crate::Result<()> {
-        // Parse the destination IP from the IP packet header.
-        let dst_ip = match parse_dst_ip(ip_packet) {
-            Some(ip) => ip,
-            None => return Ok(()),
-        };
-
-        match self.tunnel.encapsulate(dst_ip, ip_packet) {
-            EncapAction::SendUdp { endpoint, data } => {
-                if let Some(ref udp) = self.udp {
-                    udp.send_to(&data, endpoint).await.map_err(|e| {
-                        crate::Error::WireGuard(format!("UDP send to {endpoint}: {e}"))
-                    })?;
-                }
-                Ok(())
-            }
-            EncapAction::SendDerp { dest_key: _, data: _ } => {
-                // DERP sending would be handled by the caller via a DERP client.
-                // This layer just signals the intent; the daemon wires it up.
-                Ok(())
-            }
-            EncapAction::Nothing => Ok(()),
-        }
-    }
-
-    /// Run timer ticks on all tunnels, send any resulting packets.
-    pub async fn tick(&mut self) -> crate::Result<()> {
-        let actions = self.tunnel.tick();
-        for ta in actions {
-            match ta.action {
-                EncapAction::SendUdp { endpoint, data } => {
-                    if let Some(ref udp) = self.udp {
-                        let _ = udp.send_to(&data, endpoint).await;
-                    }
-                }
-                EncapAction::SendDerp { .. } => {
-                    // DERP timer packets handled by daemon layer.
-                }
-                EncapAction::Nothing => {}
-            }
-        }
-        Ok(())
-    }
-
-    /// Update peers from netmap.
-    pub fn update_peers(&mut self, peers: &[crate::proto::types::Node]) {
-        self.tunnel.update_peers(peers);
-    }
-
-    /// Find a peer key by its known UDP endpoint.
-    fn find_peer_by_endpoint(&self, _src: SocketAddr) -> Option<[u8; 32]> {
-        // In a full implementation, we'd maintain an endpoint→key index.
-        // For now, we expose internal peer iteration via the tunnel.
-        // This is a simplification — a production router would have a reverse map.
-        // We try all peers since the tunnel has the endpoint info.
-        None
-    }
-
-    /// Best-effort synchronous UDP send (used in process_ingress which is sync).
-    fn try_send_udp_sync(&self, _dst: SocketAddr, _data: &[u8]) {
-        // UdpSocket::try_send_to is not available on tokio UdpSocket without
-        // prior connect. In a real implementation, we'd use a std UdpSocket
-        // or queue the response for async sending. For now, responses from
-        // process_ingress are dropped — the caller should handle them.
-    }
-}
-
-/// Parse the destination IP address from a raw IP packet.
-fn parse_dst_ip(packet: &[u8]) -> Option<std::net::IpAddr> {
-    if packet.is_empty() {
-        return None;
-    }
-    let version = packet[0] >> 4;
-    match version {
-        4 if packet.len() >= 20 => {
-            let dst = std::net::Ipv4Addr::new(packet[16], packet[17], packet[18], packet[19]);
-            Some(std::net::IpAddr::V4(dst))
-        }
-        6 if packet.len() >= 40 => {
-            let mut octets = [0u8; 16];
-            octets.copy_from_slice(&packet[24..40]);
-            Some(std::net::IpAddr::V6(std::net::Ipv6Addr::from(octets)))
-        }
-        _ => None,
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use x25519_dalek::StaticSecret;
-
-    fn make_tunnel() -> WgTunnel {
-        let secret = StaticSecret::random_from_rng(rand::rngs::OsRng);
-        WgTunnel::new(secret)
-    }
-
-    #[test]
-    fn test_new_router() {
-        let tunnel = make_tunnel();
-        let router = PacketRouter::new(tunnel);
-        // Verify the ingress channel is functional by checking the sender isn't closed.
-        assert!(!router.ingress_tx.is_closed());
-    }
-
-    #[test]
-    fn test_ingress_sender_clone() {
-        let tunnel = make_tunnel();
-        let router = PacketRouter::new(tunnel);
-        let sender1 = router.ingress_sender();
-        let sender2 = router.ingress_sender();
-        // Both senders should be live.
-        assert!(!sender1.is_closed());
-        assert!(!sender2.is_closed());
-    }
-}
@@ -25,13 +25,27 @@ struct PeerTunnel {
 }
 
 /// Result of encapsulating an outbound IP packet.
-pub(crate) enum EncapAction {
-    /// Send these bytes over UDP to the given endpoint.
-    SendUdp { endpoint: SocketAddr, data: Vec<u8> },
-    /// Send these bytes via DERP relay.
-    SendDerp { dest_key: [u8; 32], data: Vec<u8> },
-    /// Nothing to send (handshake pending, etc.)
-    Nothing,
+///
+/// May contain zero, one, or two transport hints. We always prefer to send
+/// over every available transport — boringtun's replay protection on the
+/// receiver dedupes any duplicates by counter, and going dual-path lets us
+/// transparently fall back when one direction is broken (e.g. peer's
+/// advertised UDP endpoint is on an unreachable RFC1918 network).
+pub(crate) struct EncapAction {
+    /// If set, send these bytes via UDP to the given endpoint.
+    pub udp: Option<(SocketAddr, Vec<u8>)>,
+    /// If set, send these bytes via DERP relay to the given peer key.
+    pub derp: Option<([u8; 32], Vec<u8>)>,
+}
+
+impl EncapAction {
+    pub fn nothing() -> Self {
+        Self { udp: None, derp: None }
+    }
+
+    pub fn is_nothing(&self) -> bool {
+        self.udp.is_none() && self.derp.is_none()
+    }
 }
 
 /// Result of decapsulating an inbound WireGuard packet.
@@ -113,12 +127,12 @@ impl WgTunnel {
     pub fn encapsulate(&mut self, dst_ip: IpAddr, payload: &[u8]) -> EncapAction {
         let peer_key = match self.find_peer_for_ip(dst_ip) {
             Some(k) => *k,
-            None => return EncapAction::Nothing,
+            None => return EncapAction::nothing(),
         };
 
         let peer = match self.peers.get_mut(&peer_key) {
             Some(p) => p,
-            None => return EncapAction::Nothing,
+            None => return EncapAction::nothing(),
         };
 
         let mut buf = vec![0u8; BUF_SIZE];
@@ -127,10 +141,10 @@ impl WgTunnel {
                 let packet = data.to_vec();
                 route_packet(peer, &peer_key, packet)
             }
-            TunnResult::Err(_) | TunnResult::Done => EncapAction::Nothing,
+            TunnResult::Err(_) | TunnResult::Done => EncapAction::nothing(),
             // These shouldn't happen during encapsulate, but handle gracefully.
             TunnResult::WriteToTunnelV4(_, _) | TunnResult::WriteToTunnelV6(_, _) => {
-                EncapAction::Nothing
+                EncapAction::nothing()
             }
         }
     }
@@ -242,18 +256,17 @@ impl WgTunnel {
     }
 }
 
-/// Decide how to route a WireGuard packet for a given peer.
+/// Decide how to route a WireGuard packet for a given peer. Returns both
+/// transports when both are available — the receiver dedupes via WG counter.
 fn route_packet(peer: &PeerTunnel, peer_key: &[u8; 32], data: Vec<u8>) -> EncapAction {
-    if let Some(endpoint) = peer.endpoint {
-        EncapAction::SendUdp { endpoint, data }
-    } else if peer.derp_region.is_some() {
-        EncapAction::SendDerp {
-            dest_key: *peer_key,
-            data,
-        }
-    } else {
-        EncapAction::Nothing
+    let mut action = EncapAction::nothing();
+    if peer.derp_region.is_some() {
+        action.derp = Some((*peer_key, data.clone()));
     }
+    if let Some(endpoint) = peer.endpoint {
+        action.udp = Some((endpoint, data));
+    }
+    action
 }
 
 /// Parse a "nodekey:<hex>" string into raw 32-byte key.
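The struct-based EncapAction above can be exercised end to end in a standalone sketch. This mirrors the new route_packet/dispatch_encap shape, with the async mpsc channels replaced by plain Vec sinks so it runs without tokio; channel names and types otherwise follow the diff:

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

// Matches the struct introduced in this commit: zero, one, or two
// transport hints per encapsulated packet.
#[derive(Default)]
struct EncapAction {
    udp: Option<(SocketAddr, Vec<u8>)>,
    derp: Option<([u8; 32], Vec<u8>)>,
}

// Forwards each populated arm to its sink. Sending the same ciphertext
// over both paths is fine: the receiver's replay window drops the copy.
fn dispatch_encap(
    action: EncapAction,
    udp_out: &mut Vec<(SocketAddr, Vec<u8>)>,
    derp_out: &mut Vec<([u8; 32], Vec<u8>)>,
) {
    if let Some((endpoint, data)) = action.udp {
        udp_out.push((endpoint, data));
    }
    if let Some((dest_key, data)) = action.derp {
        derp_out.push((dest_key, data));
    }
}

fn main() {
    // A peer advertising both a (possibly unreachable) UDP endpoint and a
    // DERP home region gets both arms populated.
    let endpoint = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(172, 18, 0, 2)), 41641);
    let peer_key = [7u8; 32];
    let packet = vec![1, 2, 3];

    let action = EncapAction {
        udp: Some((endpoint, packet.clone())),
        derp: Some((peer_key, packet)),
    };

    let (mut udp_out, mut derp_out) = (Vec::new(), Vec::new());
    dispatch_encap(action, &mut udp_out, &mut derp_out);
    assert_eq!(udp_out.len(), 1);  // sent via UDP
    assert_eq!(derp_out.len(), 1); // and via DERP; the receiver dedupes
    println!("ok");
}
```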