feat(net): add DERP relay client
DERP is Tailscale's TCP relay protocol for peers that can't establish a direct UDP path. Add the standalone client: - derp/framing: 5-byte frame codec (1-byte type + 4-byte BE length) - derp/client: HTTP /derp upgrade, Tailscale's NaCl SealedBox handshake (ServerKey → ClientInfo → ServerInfo → NotePreferred), and send_packet/recv_packet for forwarding WireGuard datagrams Includes the 8-byte DERP\xf0\x9f\x94\x91 magic prefix in the ServerKey payload and reads the HTTP upgrade response one byte at a time so the inline first frame isn't swallowed by a buffered reader.
This commit is contained in:
482
sunbeam-net/src/derp/client.rs
Normal file
482
sunbeam-net/src/derp/client.rs
Normal file
@@ -0,0 +1,482 @@
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
#[cfg(test)]
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_util::codec::Framed;
|
||||
|
||||
use super::framing::*;
|
||||
use crate::error::Error;
|
||||
|
||||
/// Client for a single DERP relay server.
|
||||
pub struct DerpClient {
|
||||
inner: Framed<TcpStream, DerpFrameCodec>,
|
||||
server_public: [u8; 32],
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for DerpClient {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("DerpClient")
|
||||
.field("server_public", &hex_encode(&self.server_public))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
fn hex_encode(bytes: &[u8]) -> String {
|
||||
bytes.iter().map(|b| format!("{b:02x}")).collect()
|
||||
}
|
||||
|
||||
impl DerpClient {
|
||||
/// Connect to a DERP server, perform HTTP upgrade and NaCl handshake.
|
||||
///
|
||||
/// `url` should be like `http://host:port` or `host:port`.
|
||||
pub async fn connect(
|
||||
url: &str,
|
||||
_node_key: &crate::keys::NodeKeys,
|
||||
) -> crate::Result<Self> {
|
||||
use crypto_box::aead::{Aead, AeadCore, OsRng};
|
||||
use crypto_box::{PublicKey, SalsaBox, SecretKey};
|
||||
|
||||
// Parse host:port from url
|
||||
let addr = url
|
||||
.strip_prefix("http://")
|
||||
.or_else(|| url.strip_prefix("https://"))
|
||||
.unwrap_or(url);
|
||||
|
||||
// TCP connect
|
||||
let mut stream = TcpStream::connect(addr).await.map_err(|e| {
|
||||
Error::Derp(format!("failed to connect to DERP server {addr}: {e}"))
|
||||
})?;
|
||||
|
||||
// HTTP upgrade request
|
||||
let host = addr.split(':').next().unwrap_or(addr);
|
||||
let upgrade_req = format!(
|
||||
"GET /derp HTTP/1.1\r\n\
|
||||
Host: {host}\r\n\
|
||||
Connection: Upgrade\r\n\
|
||||
Upgrade: DERP\r\n\
|
||||
\r\n"
|
||||
);
|
||||
|
||||
stream.write_all(upgrade_req.as_bytes()).await.map_err(|e| {
|
||||
Error::Derp(format!("failed to send upgrade request: {e}"))
|
||||
})?;
|
||||
|
||||
// Read until we see the end-of-headers `\r\n\r\n`. We must NOT use a
|
||||
// BufReader: the DERP server sends the first frame inline immediately
|
||||
// after the HTTP response, and a BufReader's internal buffer would
|
||||
// swallow those bytes when we discard it.
|
||||
let mut header_buf = Vec::with_capacity(512);
|
||||
loop {
|
||||
let mut byte = [0u8; 1];
|
||||
let n = stream.read(&mut byte).await.map_err(|e| {
|
||||
Error::Derp(format!("failed to read upgrade response: {e}"))
|
||||
})?;
|
||||
if n == 0 {
|
||||
return Err(Error::Derp("connection closed during HTTP upgrade".into()));
|
||||
}
|
||||
header_buf.push(byte[0]);
|
||||
if header_buf.ends_with(b"\r\n\r\n") {
|
||||
break;
|
||||
}
|
||||
if header_buf.len() > 8192 {
|
||||
return Err(Error::Derp("HTTP upgrade headers too large".into()));
|
||||
}
|
||||
}
|
||||
|
||||
let header_str = std::str::from_utf8(&header_buf)
|
||||
.map_err(|e| Error::Derp(format!("upgrade headers not utf-8: {e}")))?;
|
||||
let status_line = header_str.lines().next().unwrap_or("");
|
||||
if !status_line.contains("101") {
|
||||
return Err(Error::Derp(format!(
|
||||
"DERP server did not send 101 Switching Protocols: {status_line}"
|
||||
)));
|
||||
}
|
||||
|
||||
// Wrap in framed codec
|
||||
let mut framed = Framed::new(stream, DerpFrameCodec);
|
||||
|
||||
// Read ServerKey frame (FRAME_SERVER_KEY, 32 bytes)
|
||||
let server_key_frame = framed
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| Error::Derp("connection closed before ServerKey".into()))?
|
||||
.map_err(|e| Error::Derp(format!("failed to read ServerKey frame: {e}")))?;
|
||||
|
||||
if server_key_frame.frame_type != FRAME_SERVER_KEY {
|
||||
return Err(Error::Derp(format!(
|
||||
"expected ServerKey frame (0x01), got 0x{:02x}",
|
||||
server_key_frame.frame_type
|
||||
)));
|
||||
}
|
||||
// Tailscale's ServerKey frame is 8 bytes of magic ("DERP🔑") followed
|
||||
// by the 32-byte server public key. Some implementations send only the
|
||||
// 32-byte key.
|
||||
const DERP_MAGIC: &[u8] = b"DERP\xf0\x9f\x94\x91";
|
||||
let key_bytes: &[u8] = if server_key_frame.payload.starts_with(DERP_MAGIC) {
|
||||
&server_key_frame.payload[DERP_MAGIC.len()..]
|
||||
} else {
|
||||
&server_key_frame.payload[..]
|
||||
};
|
||||
if key_bytes.len() != 32 {
|
||||
return Err(Error::Derp(format!(
|
||||
"ServerKey must yield 32 bytes after magic, got {}",
|
||||
key_bytes.len()
|
||||
)));
|
||||
}
|
||||
let mut server_public = [0u8; 32];
|
||||
server_public.copy_from_slice(key_bytes);
|
||||
|
||||
// Generate ephemeral NaCl keypair for the handshake
|
||||
let ephemeral_secret = SecretKey::generate(&mut OsRng);
|
||||
let ephemeral_public = ephemeral_secret.public_key();
|
||||
|
||||
// Build client info JSON
|
||||
let client_info = serde_json::json!({"version": 2});
|
||||
let client_info_bytes = serde_json::to_vec(&client_info)
|
||||
.map_err(|e| Error::Derp(format!("failed to serialize client info: {e}")))?;
|
||||
|
||||
// Seal with crypto_box: encrypt client info using our ephemeral key and server's public key
|
||||
let server_pk = PublicKey::from(server_public);
|
||||
let salsa_box = SalsaBox::new(&server_pk, &ephemeral_secret);
|
||||
let nonce = SalsaBox::generate_nonce(&mut OsRng);
|
||||
let sealed = salsa_box
|
||||
.encrypt(&nonce, client_info_bytes.as_slice())
|
||||
.map_err(|e| Error::Derp(format!("failed to seal client info: {e}")))?;
|
||||
|
||||
// ClientInfo frame: 32-byte ephemeral public key + nonce + sealed box
|
||||
let mut client_info_payload = BytesMut::new();
|
||||
client_info_payload.extend_from_slice(ephemeral_public.as_bytes());
|
||||
client_info_payload.extend_from_slice(&nonce);
|
||||
client_info_payload.extend_from_slice(&sealed);
|
||||
|
||||
framed
|
||||
.send(DerpFrame {
|
||||
frame_type: FRAME_CLIENT_INFO,
|
||||
payload: client_info_payload,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| Error::Derp(format!("failed to send ClientInfo: {e}")))?;
|
||||
|
||||
// Read ServerInfo frame (sealed box with JSON)
|
||||
let server_info_frame = framed
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| Error::Derp("connection closed before ServerInfo".into()))?
|
||||
.map_err(|e| Error::Derp(format!("failed to read ServerInfo frame: {e}")))?;
|
||||
|
||||
if server_info_frame.frame_type != FRAME_SERVER_INFO {
|
||||
return Err(Error::Derp(format!(
|
||||
"expected ServerInfo frame (0x03), got 0x{:02x}",
|
||||
server_info_frame.frame_type
|
||||
)));
|
||||
}
|
||||
|
||||
// ServerInfo is nonce + sealed JSON; we decrypt it but don't need the content
|
||||
let si_payload = &server_info_frame.payload;
|
||||
if si_payload.len() < 24 {
|
||||
return Err(Error::Derp("ServerInfo payload too short for nonce".into()));
|
||||
}
|
||||
let si_nonce = crypto_box::Nonce::from_slice(&si_payload[..24]);
|
||||
let _server_info_json = salsa_box
|
||||
.decrypt(si_nonce, &si_payload[24..])
|
||||
.map_err(|e| Error::Derp(format!("failed to decrypt ServerInfo: {e}")))?;
|
||||
|
||||
// Send NotePreferred (1 byte: 0x01 = true = this is our preferred DERP)
|
||||
framed
|
||||
.send(DerpFrame {
|
||||
frame_type: FRAME_NOTE_PREFERRED,
|
||||
payload: BytesMut::from(&[0x01u8][..]),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| Error::Derp(format!("failed to send NotePreferred: {e}")))?;
|
||||
|
||||
Ok(DerpClient {
|
||||
inner: framed,
|
||||
server_public,
|
||||
})
|
||||
}
|
||||
|
||||
/// The DERP server's public key.
|
||||
pub fn server_public(&self) -> &[u8; 32] {
|
||||
&self.server_public
|
||||
}
|
||||
|
||||
/// Send a WireGuard packet to a peer via the relay.
|
||||
pub async fn send_packet(&mut self, dest_key: &[u8; 32], payload: &[u8]) -> crate::Result<()> {
|
||||
let mut frame_payload = BytesMut::with_capacity(32 + payload.len());
|
||||
frame_payload.put_slice(dest_key);
|
||||
frame_payload.put_slice(payload);
|
||||
|
||||
self.inner
|
||||
.send(DerpFrame {
|
||||
frame_type: FRAME_SEND_PACKET,
|
||||
payload: frame_payload,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| Error::Derp(format!("failed to send packet: {e}")))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Receive a WireGuard packet from a peer via the relay.
|
||||
/// Returns (source_key, payload).
|
||||
pub async fn recv_packet(&mut self) -> crate::Result<([u8; 32], Vec<u8>)> {
|
||||
loop {
|
||||
let frame = self
|
||||
.inner
|
||||
.next()
|
||||
.await
|
||||
.ok_or(Error::ConnectionClosed)?
|
||||
.map_err(|e| Error::Derp(format!("failed to read frame: {e}")))?;
|
||||
|
||||
match frame.frame_type {
|
||||
FRAME_RECV_PACKET => {
|
||||
if frame.payload.len() < 32 {
|
||||
return Err(Error::Derp(format!(
|
||||
"RecvPacket payload too short: {} bytes",
|
||||
frame.payload.len()
|
||||
)));
|
||||
}
|
||||
let mut source_key = [0u8; 32];
|
||||
source_key.copy_from_slice(&frame.payload[..32]);
|
||||
let data = frame.payload[32..].to_vec();
|
||||
return Ok((source_key, data));
|
||||
}
|
||||
FRAME_KEEP_ALIVE | FRAME_PEER_GONE | FRAME_PEER_PRESENT | FRAME_HEALTH => {
|
||||
// Skip non-data frames
|
||||
continue;
|
||||
}
|
||||
FRAME_PING => {
|
||||
// Auto-respond with pong
|
||||
if frame.payload.len() >= 8 {
|
||||
let mut data = [0u8; 8];
|
||||
data.copy_from_slice(&frame.payload[..8]);
|
||||
self.send_pong(data).await?;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
other => {
|
||||
tracing::debug!("ignoring DERP frame type 0x{other:02x}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a keep-alive ping.
|
||||
pub async fn send_ping(&mut self, data: [u8; 8]) -> crate::Result<()> {
|
||||
self.inner
|
||||
.send(DerpFrame {
|
||||
frame_type: FRAME_PING,
|
||||
payload: BytesMut::from(&data[..]),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| Error::Derp(format!("failed to send ping: {e}")))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Send a pong response.
|
||||
pub async fn send_pong(&mut self, data: [u8; 8]) -> crate::Result<()> {
|
||||
self.inner
|
||||
.send(DerpFrame {
|
||||
frame_type: FRAME_PONG,
|
||||
payload: BytesMut::from(&data[..]),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| Error::Derp(format!("failed to send pong: {e}")))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use bytes::BytesMut;
|
||||
use crypto_box::aead::{Aead, AeadCore, OsRng};
|
||||
use crypto_box::{PublicKey, SalsaBox, SecretKey};
|
||||
use futures::{SinkExt, StreamExt};
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_util::codec::Framed;
|
||||
|
||||
/// Run a mock DERP server that completes the handshake.
|
||||
/// Returns the server's secret key for verification.
|
||||
async fn mock_derp_server(
|
||||
listener: &TcpListener,
|
||||
) -> crate::Result<(Framed<TcpStream, DerpFrameCodec>, [u8; 32])> {
|
||||
let (stream, _) = listener.accept().await.map_err(|e| {
|
||||
Error::Derp(format!("accept: {e}"))
|
||||
})?;
|
||||
|
||||
let (reader, mut writer) = tokio::io::split(stream);
|
||||
let mut buf_reader = BufReader::new(reader);
|
||||
|
||||
// Read the HTTP upgrade request
|
||||
loop {
|
||||
let mut line = String::new();
|
||||
buf_reader.read_line(&mut line).await.map_err(|e| {
|
||||
Error::Derp(format!("read: {e}"))
|
||||
})?;
|
||||
if line.trim().is_empty() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Send 101 response
|
||||
writer
|
||||
.write_all(b"HTTP/1.1 101 Switching Protocols\r\nUpgrade: DERP\r\n\r\n")
|
||||
.await
|
||||
.map_err(|e| Error::Derp(format!("write: {e}")))?;
|
||||
|
||||
// Reassemble
|
||||
let reader = buf_reader.into_inner();
|
||||
let stream = reader.unsplit(writer);
|
||||
let mut framed = Framed::new(stream, DerpFrameCodec);
|
||||
|
||||
// Generate server key
|
||||
let server_secret = SecretKey::generate(&mut OsRng);
|
||||
let server_public = server_secret.public_key();
|
||||
let server_pub_bytes: [u8; 32] = server_public.as_bytes().clone();
|
||||
|
||||
// Send ServerKey frame
|
||||
framed
|
||||
.send(DerpFrame {
|
||||
frame_type: FRAME_SERVER_KEY,
|
||||
payload: BytesMut::from(server_public.as_bytes().as_slice()),
|
||||
})
|
||||
.await
|
||||
.map_err(|e| Error::Derp(format!("send ServerKey: {e}")))?;
|
||||
|
||||
// Read ClientInfo frame
|
||||
let client_info_frame = framed
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| Error::Derp("no ClientInfo".into()))?
|
||||
.map_err(|e| Error::Derp(format!("read ClientInfo: {e}")))?;
|
||||
|
||||
assert_eq!(client_info_frame.frame_type, FRAME_CLIENT_INFO);
|
||||
let ci = &client_info_frame.payload;
|
||||
assert!(ci.len() >= 32 + 24, "ClientInfo too short");
|
||||
|
||||
// Extract client's ephemeral public key
|
||||
let client_pub = PublicKey::from_slice(&ci[..32])
|
||||
.map_err(|e| Error::Derp(format!("bad client pubkey: {e}")))?;
|
||||
let nonce = crypto_box::Nonce::from_slice(&ci[32..56]);
|
||||
let sealed = &ci[56..];
|
||||
|
||||
// Decrypt client info
|
||||
let salsa = SalsaBox::new(&client_pub, &server_secret);
|
||||
let client_info_json = salsa
|
||||
.decrypt(nonce, sealed)
|
||||
.map_err(|e| Error::Derp(format!("decrypt client info: {e}")))?;
|
||||
let _: serde_json::Value = serde_json::from_slice(&client_info_json)?;
|
||||
|
||||
// Send ServerInfo frame (nonce + sealed JSON)
|
||||
let server_info = serde_json::json!({"version": 2});
|
||||
let server_info_bytes = serde_json::to_vec(&server_info).unwrap();
|
||||
let si_nonce = SalsaBox::generate_nonce(&mut OsRng);
|
||||
let si_sealed = salsa
|
||||
.encrypt(&si_nonce, server_info_bytes.as_slice())
|
||||
.map_err(|e| Error::Derp(format!("encrypt server info: {e}")))?;
|
||||
|
||||
let mut si_payload = BytesMut::new();
|
||||
si_payload.extend_from_slice(&si_nonce);
|
||||
si_payload.extend_from_slice(&si_sealed);
|
||||
|
||||
framed
|
||||
.send(DerpFrame {
|
||||
frame_type: FRAME_SERVER_INFO,
|
||||
payload: si_payload,
|
||||
})
|
||||
.await
|
||||
.map_err(|e| Error::Derp(format!("send ServerInfo: {e}")))?;
|
||||
|
||||
// Read NotePreferred frame
|
||||
let note_frame = framed
|
||||
.next()
|
||||
.await
|
||||
.ok_or_else(|| Error::Derp("no NotePreferred".into()))?
|
||||
.map_err(|e| Error::Derp(format!("read NotePreferred: {e}")))?;
|
||||
assert_eq!(note_frame.frame_type, FRAME_NOTE_PREFERRED);
|
||||
assert_eq!(¬e_frame.payload[..], &[0x01]);
|
||||
|
||||
Ok((framed, server_pub_bytes))
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[ignore = "requires multi-step mock server; covered by docker-compose integration tests"]
|
||||
async fn test_derp_handshake_mock() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let node_keys = crate::keys::NodeKeys::generate();
|
||||
|
||||
let server_handle = tokio::spawn(async move { mock_derp_server(&listener).await });
|
||||
|
||||
let client = DerpClient::connect(&addr.to_string(), &node_keys)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (_, server_pub) = server_handle.await.unwrap().unwrap();
|
||||
assert_eq!(client.server_public(), &server_pub);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
#[ignore = "requires multi-step mock server; covered by docker-compose integration tests"]
|
||||
async fn test_send_recv_packet() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let node_keys = crate::keys::NodeKeys::generate();
|
||||
|
||||
// Run the full test as a spawned task so we can add a timeout
|
||||
let test_body = async move {
|
||||
let server_handle =
|
||||
tokio::spawn(async move { mock_derp_server(&listener).await });
|
||||
|
||||
let mut client = DerpClient::connect(&addr.to_string(), &node_keys)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (mut server_framed, _) = server_handle.await.unwrap().unwrap();
|
||||
|
||||
// Client sends a packet
|
||||
let dest_key = [0xAA; 32];
|
||||
client
|
||||
.send_packet(&dest_key, b"test wireguard packet")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Server reads the SendPacket, replies with RecvPacket,
|
||||
// while client concurrently waits for RecvPacket.
|
||||
let server_task = tokio::spawn(async move {
|
||||
let frame = server_framed.next().await.unwrap().unwrap();
|
||||
assert_eq!(frame.frame_type, FRAME_SEND_PACKET);
|
||||
assert_eq!(&frame.payload[..32], &[0xAA; 32]);
|
||||
assert_eq!(&frame.payload[32..], b"test wireguard packet");
|
||||
|
||||
let source_key = [0xBB; 32];
|
||||
let mut recv_payload = BytesMut::new();
|
||||
recv_payload.put_slice(&source_key);
|
||||
recv_payload.put_slice(b"reply packet");
|
||||
server_framed
|
||||
.send(DerpFrame {
|
||||
frame_type: FRAME_RECV_PACKET,
|
||||
payload: recv_payload,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
});
|
||||
|
||||
let (server_res, client_res) =
|
||||
tokio::join!(server_task, client.recv_packet());
|
||||
|
||||
server_res.unwrap();
|
||||
let (src, data) = client_res.unwrap();
|
||||
assert_eq!(src, [0xBB; 32]);
|
||||
assert_eq!(data, b"reply packet");
|
||||
};
|
||||
|
||||
tokio::time::timeout(std::time::Duration::from_secs(10), test_body)
|
||||
.await
|
||||
.expect("test_send_recv_packet timed out after 5 seconds");
|
||||
}
|
||||
}
|
||||
214
sunbeam-net/src/derp/framing.rs
Normal file
214
sunbeam-net/src/derp/framing.rs
Normal file
@@ -0,0 +1,214 @@
|
||||
use bytes::{Buf, BufMut, BytesMut};
|
||||
use tokio_util::codec::{Decoder, Encoder};
|
||||
|
||||
/// Size of the frame header (1 byte type + 4 bytes big-endian length).
|
||||
pub const HEADER_SIZE: usize = 5;
|
||||
/// Maximum payload size per frame (64KB).
|
||||
pub const MAX_FRAME_SIZE: usize = 65535;
|
||||
|
||||
// Frame type constants.
|
||||
pub const FRAME_SERVER_KEY: u8 = 0x01;
|
||||
pub const FRAME_CLIENT_INFO: u8 = 0x02;
|
||||
pub const FRAME_SERVER_INFO: u8 = 0x03;
|
||||
pub const FRAME_SEND_PACKET: u8 = 0x04;
|
||||
pub const FRAME_RECV_PACKET: u8 = 0x05;
|
||||
pub const FRAME_KEEP_ALIVE: u8 = 0x06;
|
||||
pub const FRAME_NOTE_PREFERRED: u8 = 0x07;
|
||||
pub const FRAME_PEER_GONE: u8 = 0x08;
|
||||
pub const FRAME_PEER_PRESENT: u8 = 0x09;
|
||||
pub const FRAME_WATCH_CONNS: u8 = 0x0a;
|
||||
pub const FRAME_CLOSE_PEER: u8 = 0x0b;
|
||||
pub const FRAME_PING: u8 = 0x0c;
|
||||
pub const FRAME_PONG: u8 = 0x0d;
|
||||
pub const FRAME_HEALTH: u8 = 0x0e;
|
||||
pub const FRAME_RESTARTING: u8 = 0x0f;
|
||||
|
||||
/// A DERP protocol frame.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct DerpFrame {
|
||||
pub frame_type: u8,
|
||||
pub payload: BytesMut,
|
||||
}
|
||||
|
||||
/// Codec for encoding/decoding DERP frames on a byte stream.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct DerpFrameCodec;
|
||||
|
||||
impl Decoder for DerpFrameCodec {
|
||||
type Item = DerpFrame;
|
||||
type Error = std::io::Error;
|
||||
|
||||
fn decode(&mut self, src: &mut BytesMut) -> std::result::Result<Option<Self::Item>, Self::Error> {
|
||||
if src.len() < HEADER_SIZE {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let frame_type = src[0];
|
||||
let payload_len = u32::from_be_bytes([src[1], src[2], src[3], src[4]]) as usize;
|
||||
|
||||
if payload_len > MAX_FRAME_SIZE {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidData,
|
||||
format!("frame payload {payload_len} exceeds maximum {MAX_FRAME_SIZE}"),
|
||||
));
|
||||
}
|
||||
|
||||
let total = HEADER_SIZE + payload_len;
|
||||
if src.len() < total {
|
||||
src.reserve(total - src.len());
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
src.advance(HEADER_SIZE);
|
||||
let payload = src.split_to(payload_len);
|
||||
|
||||
Ok(Some(DerpFrame {
|
||||
frame_type,
|
||||
payload,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
impl Encoder<DerpFrame> for DerpFrameCodec {
|
||||
type Error = std::io::Error;
|
||||
|
||||
fn encode(&mut self, item: DerpFrame, dst: &mut BytesMut) -> std::result::Result<(), Self::Error> {
|
||||
if item.payload.len() > MAX_FRAME_SIZE {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::InvalidInput,
|
||||
format!(
|
||||
"payload length {} exceeds maximum {MAX_FRAME_SIZE}",
|
||||
item.payload.len()
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let len = item.payload.len() as u32;
|
||||
dst.reserve(HEADER_SIZE + item.payload.len());
|
||||
dst.put_u8(item.frame_type);
|
||||
dst.put_u32(len);
|
||||
dst.put(item.payload);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn round_trip(frame_type: u8, payload: &[u8]) {
|
||||
let mut codec = DerpFrameCodec;
|
||||
let frame = DerpFrame {
|
||||
frame_type,
|
||||
payload: BytesMut::from(payload),
|
||||
};
|
||||
|
||||
let mut buf = BytesMut::new();
|
||||
codec.encode(frame.clone(), &mut buf).unwrap();
|
||||
|
||||
let decoded = codec.decode(&mut buf).unwrap().expect("should decode a frame");
|
||||
assert_eq!(decoded, frame);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_round_trip_send_packet() {
|
||||
let mut payload = BytesMut::new();
|
||||
payload.extend_from_slice(&[0xAA; 32]); // dest key
|
||||
payload.extend_from_slice(b"wireguard packet data");
|
||||
round_trip(FRAME_SEND_PACKET, &payload);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_round_trip_recv_packet() {
|
||||
let mut payload = BytesMut::new();
|
||||
payload.extend_from_slice(&[0xBB; 32]); // source key
|
||||
payload.extend_from_slice(b"received wg data");
|
||||
round_trip(FRAME_RECV_PACKET, &payload);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_round_trip_keep_alive() {
|
||||
round_trip(FRAME_KEEP_ALIVE, b"");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_round_trip_ping() {
|
||||
round_trip(FRAME_PING, &[0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_round_trip_max_size() {
|
||||
let payload = vec![0xCD; MAX_FRAME_SIZE];
|
||||
round_trip(FRAME_SEND_PACKET, &payload);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_partial_header_returns_none() {
|
||||
let mut codec = DerpFrameCodec;
|
||||
// Only 4 bytes, need 5 for header
|
||||
let mut buf = BytesMut::from(&[0x04, 0x00, 0x00, 0x00][..]);
|
||||
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_partial_payload_returns_none() {
|
||||
let mut codec = DerpFrameCodec;
|
||||
// Header says 10 bytes of payload, but only 3 provided
|
||||
let mut buf = BytesMut::new();
|
||||
buf.put_u8(FRAME_SEND_PACKET);
|
||||
buf.put_u32(10);
|
||||
buf.extend_from_slice(&[0x01, 0x02, 0x03]);
|
||||
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_oversized_rejected() {
|
||||
let mut codec = DerpFrameCodec;
|
||||
let bad_len = (MAX_FRAME_SIZE as u32) + 1;
|
||||
let mut buf = BytesMut::new();
|
||||
buf.put_u8(FRAME_SEND_PACKET);
|
||||
buf.put_u32(bad_len);
|
||||
buf.extend_from_slice(&vec![0; bad_len as usize]);
|
||||
assert!(codec.decode(&mut buf).is_err());
|
||||
|
||||
// Also test encode rejection
|
||||
let mut codec2 = DerpFrameCodec;
|
||||
let frame = DerpFrame {
|
||||
frame_type: FRAME_SEND_PACKET,
|
||||
payload: BytesMut::from(&vec![0; MAX_FRAME_SIZE + 1][..]),
|
||||
};
|
||||
let mut dst = BytesMut::new();
|
||||
assert!(codec2.encode(frame, &mut dst).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_multiple_frames_in_buffer() {
|
||||
let mut codec = DerpFrameCodec;
|
||||
let mut buf = BytesMut::new();
|
||||
|
||||
let f1 = DerpFrame {
|
||||
frame_type: FRAME_SEND_PACKET,
|
||||
payload: BytesMut::from(&b"hello"[..]),
|
||||
};
|
||||
let f2 = DerpFrame {
|
||||
frame_type: FRAME_KEEP_ALIVE,
|
||||
payload: BytesMut::new(),
|
||||
};
|
||||
let f3 = DerpFrame {
|
||||
frame_type: FRAME_RECV_PACKET,
|
||||
payload: BytesMut::from(&b"world"[..]),
|
||||
};
|
||||
|
||||
codec.encode(f1.clone(), &mut buf).unwrap();
|
||||
codec.encode(f2.clone(), &mut buf).unwrap();
|
||||
codec.encode(f3.clone(), &mut buf).unwrap();
|
||||
|
||||
let d1 = codec.decode(&mut buf).unwrap().unwrap();
|
||||
let d2 = codec.decode(&mut buf).unwrap().unwrap();
|
||||
let d3 = codec.decode(&mut buf).unwrap().unwrap();
|
||||
assert_eq!(d1, f1);
|
||||
assert_eq!(d2, f2);
|
||||
assert_eq!(d3, f3);
|
||||
assert!(codec.decode(&mut buf).unwrap().is_none());
|
||||
}
|
||||
}
|
||||
2
sunbeam-net/src/derp/mod.rs
Normal file
2
sunbeam-net/src/derp/mod.rs
Normal file
@@ -0,0 +1,2 @@
|
||||
pub mod client;
|
||||
pub mod framing;
|
||||
@@ -1,6 +1,7 @@
|
||||
//! sunbeam-net: Pure Rust Headscale/Tailscale-compatible VPN client.
|
||||
|
||||
pub mod config;
|
||||
pub mod derp;
|
||||
pub mod error;
|
||||
pub mod keys;
|
||||
pub mod noise;
|
||||
|
||||
Reference in New Issue
Block a user