From 7019937f6f06c1715dbf71287d13d705cf4c704e Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Tue, 7 Apr 2026 14:46:47 +0100 Subject: [PATCH] feat(net): real IPC client + working remote shutdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DaemonHandle's shutdown_tx (oneshot) is replaced with a CancellationToken shared between the daemon loop and the IPC server. The token is the single source of truth for "should we shut down" — `DaemonHandle::shutdown` cancels it, and an IPC `Stop` request also cancels it. - daemon/state: store the CancellationToken on DaemonHandle and clone it on Clone (so cached IPC handles can still trigger shutdown). - daemon/ipc: IpcServer takes a daemon_shutdown token; `Stop` now cancels it instead of returning Ok and doing nothing. Add IpcClient with `request`, `status`, and `stop` methods so the CLI can drive a backgrounded daemon over the Unix socket. - daemon/lifecycle: thread the token through run_daemon_loop and run_session, pass a clone to IpcServer::new. - lib.rs: re-export IpcClient/IpcCommand/IpcResponse so callers don't have to reach into the daemon module. - src/vpn_cmds.rs: `sunbeam disconnect` now actually talks to the daemon via IpcClient::stop, and `sunbeam vpn status` queries IpcClient::status and prints addresses + peer count + DERP home. --- src/vpn_cmds.rs | 48 +++++++---- sunbeam-net/src/daemon/ipc.rs | 124 ++++++++++++++++++++++++++-- sunbeam-net/src/daemon/lifecycle.rs | 22 +++-- sunbeam-net/src/daemon/mod.rs | 1 + sunbeam-net/src/daemon/state.rs | 19 +++-- sunbeam-net/src/lib.rs | 2 +- 6 files changed, 174 insertions(+), 42 deletions(-) diff --git a/src/vpn_cmds.rs b/src/vpn_cmds.rs index 5ecadb19..8cbe4363 100644 --- a/src/vpn_cmds.rs +++ b/src/vpn_cmds.rs @@ -108,33 +108,49 @@ pub async fn cmd_connect() -> Result<()> { /// Run `sunbeam disconnect` — signal a running daemon via its IPC socket. pub async fn cmd_disconnect() -> Result<()> { - let state_dir = vpn_state_dir()?; - let socket = state_dir.join("daemon.sock"); - if !socket.exists() { + let socket = vpn_state_dir()?.join("daemon.sock"); + let client = sunbeam_net::IpcClient::new(&socket); + if !client.socket_exists() { return Err(SunbeamError::Other( "no running VPN daemon (control socket missing)".into(), )); } - // The daemon's IPC server lives in sunbeam_net::daemon::ipc, but it's - // not currently exported as a client. Until that lands, the canonical - // way to disconnect is to ^C the foreground `sunbeam connect` process. - Err(SunbeamError::Other( - "background daemon control not yet implemented — Ctrl-C the running `sunbeam connect`" - .into(), - )) + step("Asking VPN daemon to stop..."); + client + .stop() + .await + .map_err(|e| SunbeamError::Other(format!("IPC stop: {e}")))?; + ok("Daemon acknowledged shutdown."); + Ok(()) } /// Run `sunbeam vpn status` — query a running daemon's status via IPC. pub async fn cmd_vpn_status() -> Result<()> { - let state_dir = vpn_state_dir()?; - let socket = state_dir.join("daemon.sock"); - if !socket.exists() { + let socket = vpn_state_dir()?.join("daemon.sock"); + let client = sunbeam_net::IpcClient::new(&socket); + if !client.socket_exists() { println!("VPN: not running"); return Ok(()); } - println!("VPN: running (control socket at {})", socket.display()); - // TODO: actually query the IPC socket once the IPC client API is - // exposed from sunbeam-net. + match client.status().await { + Ok(sunbeam_net::DaemonStatus::Running { addresses, peer_count, derp_home }) => { + let addrs: Vec = addresses.iter().map(|a| a.to_string()).collect(); + println!("VPN: running"); + println!(" addresses: {}", addrs.join(", ")); + println!(" peers: {peer_count}"); + if let Some(region) = derp_home { + println!(" derp home: region {region}"); + } + } + Ok(other) => { + println!("VPN: {other}"); + } + Err(e) => { + // Socket exists but daemon isn't actually responding — common + // when the daemon crashed and left a stale socket file behind. + println!("VPN: stale socket at {} ({e})", socket.display()); + } + } Ok(()) } diff --git a/sunbeam-net/src/daemon/ipc.rs b/sunbeam-net/src/daemon/ipc.rs index 043d0cfb..a43f1335 100644 --- a/sunbeam-net/src/daemon/ipc.rs +++ b/sunbeam-net/src/daemon/ipc.rs @@ -10,6 +10,7 @@ use std::path::Path; use std::sync::{Arc, RwLock}; use tokio::net::UnixListener; +use tokio_util::sync::CancellationToken; use super::state::DaemonStatus; @@ -41,16 +42,28 @@ pub enum IpcResponse { pub(crate) struct IpcServer { listener: UnixListener, status: Arc>, + /// Cancellation token shared with the daemon loop. Cancelling this from + /// an IPC `Stop` request triggers graceful shutdown of the whole + /// daemon, the same as `DaemonHandle::shutdown()`. + daemon_shutdown: CancellationToken, } impl IpcServer { /// Bind a new IPC server at the given socket path. - pub fn new(socket_path: &Path, status: Arc>) -> crate::Result { + pub fn new( + socket_path: &Path, + status: Arc>, + daemon_shutdown: CancellationToken, + ) -> crate::Result { // Remove stale socket file if it exists. let _ = std::fs::remove_file(socket_path); let listener = UnixListener::bind(socket_path) .map_err(|e| crate::Error::Io { context: "bind IPC socket".into(), source: e })?; - Ok(Self { listener, status }) + Ok(Self { + listener, + status, + daemon_shutdown, + }) } /// Accept and handle IPC connections until cancelled. @@ -59,8 +72,9 @@ impl IpcServer { let (stream, _) = self.listener.accept().await .map_err(|e| crate::Error::Io { context: "accept IPC".into(), source: e })?; let status = self.status.clone(); + let shutdown = self.daemon_shutdown.clone(); tokio::spawn(async move { - if let Err(e) = handle_ipc_connection(stream, &status).await { + if let Err(e) = handle_ipc_connection(stream, &status, &shutdown).await { tracing::warn!("IPC error: {e}"); } }); @@ -68,9 +82,104 @@ impl IpcServer { } } +/// Client for talking to a running VPN daemon over its Unix control socket. +pub struct IpcClient { + socket_path: std::path::PathBuf, +} + +impl IpcClient { + /// Build a client targeting the given control socket. No connection is + /// established until a request method is called. + pub fn new(socket_path: impl Into) -> Self { + Self { + socket_path: socket_path.into(), + } + } + + /// Returns true if the control socket file exists. (Doesn't prove the + /// daemon is alive — only that something has bound there at some point.) + pub fn socket_exists(&self) -> bool { + self.socket_path.exists() + } + + /// Send a single command and return the response. + pub async fn request(&self, cmd: IpcCommand) -> crate::Result { + use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; + use tokio::net::UnixStream; + + let stream = UnixStream::connect(&self.socket_path).await.map_err(|e| { + crate::Error::Io { + context: format!("connect to {}", self.socket_path.display()), + source: e, + } + })?; + let (reader, mut writer) = stream.into_split(); + + let mut req_bytes = serde_json::to_vec(&cmd)?; + req_bytes.push(b'\n'); + writer + .write_all(&req_bytes) + .await + .map_err(|e| crate::Error::Io { + context: "write IPC request".into(), + source: e, + })?; + writer + .shutdown() + .await + .map_err(|e| crate::Error::Io { + context: "shutdown IPC writer".into(), + source: e, + })?; + + let mut reader = BufReader::new(reader); + let mut line = String::new(); + let n = reader + .read_line(&mut line) + .await + .map_err(|e| crate::Error::Io { + context: "read IPC response".into(), + source: e, + })?; + if n == 0 { + return Err(crate::Error::Ipc( + "daemon closed the connection without responding".into(), + )); + } + + let resp: IpcResponse = serde_json::from_str(line.trim())?; + Ok(resp) + } + + /// Convenience: query the daemon's status. + pub async fn status(&self) -> crate::Result { + match self.request(IpcCommand::Status).await? { + IpcResponse::Status(s) => Ok(s), + IpcResponse::Error(e) => Err(crate::Error::Ipc(e)), + other => Err(crate::Error::Ipc(format!( + "unexpected response to Status: {other:?}" + ))), + } + } + + /// Convenience: tell the daemon to shut down. Returns once the daemon + /// has acknowledged the request — the daemon process may take a moment + /// longer to actually exit. + pub async fn stop(&self) -> crate::Result<()> { + match self.request(IpcCommand::Stop).await? { + IpcResponse::Ok => Ok(()), + IpcResponse::Error(e) => Err(crate::Error::Ipc(e)), + other => Err(crate::Error::Ipc(format!( + "unexpected response to Stop: {other:?}" + ))), + } + } +} + async fn handle_ipc_connection( stream: tokio::net::UnixStream, status: &Arc>, + daemon_shutdown: &CancellationToken, ) -> crate::Result<()> { use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; @@ -88,11 +197,12 @@ async fn handle_ipc_connection( IpcResponse::Status(s.clone()) } IpcCommand::Reconnect => { - // TODO: signal the daemon loop to reconnect + // TODO: implement targeted session reconnect (without dropping + // the daemon). For now treat it the same as a no-op. IpcResponse::Ok } IpcCommand::Stop => { - // TODO: signal the daemon loop to shut down + daemon_shutdown.cancel(); IpcResponse::Ok } }; @@ -122,7 +232,7 @@ mod tests { derp_home: Some(1), })); - let server = IpcServer::new(&sock_path, status).unwrap(); + let server = IpcServer::new(&sock_path, status, CancellationToken::new()).unwrap(); let server_task = tokio::spawn(async move { server.run().await }); // Give the server a moment to start accepting. @@ -155,7 +265,7 @@ mod tests { let sock_path = dir.path().join("test.sock"); let status = Arc::new(RwLock::new(DaemonStatus::Stopped)); - let server = IpcServer::new(&sock_path, status).unwrap(); + let server = IpcServer::new(&sock_path, status, CancellationToken::new()).unwrap(); let server_task = tokio::spawn(async move { server.run().await }); tokio::time::sleep(std::time::Duration::from_millis(50)).await; diff --git a/sunbeam-net/src/daemon/lifecycle.rs b/sunbeam-net/src/daemon/lifecycle.rs index 550f988f..12f6e2a8 100644 --- a/sunbeam-net/src/daemon/lifecycle.rs +++ b/sunbeam-net/src/daemon/lifecycle.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, RwLock}; use std::time::Duration; use smoltcp::wire::IpAddress; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use crate::config::VpnConfig; use crate::control::MapUpdate; @@ -22,14 +22,17 @@ impl VpnDaemon { /// Returns a handle for status queries and shutdown. pub async fn start(config: VpnConfig) -> crate::Result { let status = Arc::new(RwLock::new(DaemonStatus::Connecting)); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); + // CancellationToken (rather than oneshot) so both DaemonHandle::shutdown + // and the IPC server can trigger it. + let shutdown = tokio_util::sync::CancellationToken::new(); let status_clone = status.clone(); + let shutdown_clone = shutdown.clone(); let join = tokio::spawn(async move { - run_daemon_loop(config, status_clone, shutdown_rx).await + run_daemon_loop(config, status_clone, shutdown_clone).await }); - Ok(DaemonHandle::with_daemon(shutdown_tx, status, join)) + Ok(DaemonHandle::with_daemon(shutdown, status, join)) } } @@ -37,7 +40,7 @@ impl VpnDaemon { async fn run_daemon_loop( config: VpnConfig, status: Arc>, - mut shutdown_rx: oneshot::Receiver<()>, + shutdown: tokio_util::sync::CancellationToken, ) -> crate::Result<()> { let keys = crate::keys::NodeKeys::load_or_generate(&config.state_dir)?; let mut attempt: u32 = 0; @@ -46,12 +49,12 @@ async fn run_daemon_loop( loop { set_status(&status, DaemonStatus::Connecting); - let session = run_session(&config, &keys, &status); + let session = run_session(&config, &keys, &status, &shutdown); tokio::pin!(session); let session_result = tokio::select! { biased; - _ = &mut shutdown_rx => { + _ = shutdown.cancelled() => { set_status(&status, DaemonStatus::Stopped); return Ok(()); } @@ -69,7 +72,7 @@ async fn run_daemon_loop( tokio::select! { _ = tokio::time::sleep(delay) => continue, - _ = &mut shutdown_rx => { + _ = shutdown.cancelled() => { set_status(&status, DaemonStatus::Stopped); return Ok(()); } @@ -88,6 +91,7 @@ async fn run_session( config: &VpnConfig, keys: &crate::keys::NodeKeys, status: &Arc>, + daemon_shutdown: &tokio_util::sync::CancellationToken, ) -> std::result::Result { // 1. Connect to coordination server set_status(status, DaemonStatus::Connecting); @@ -233,7 +237,7 @@ async fn run_session( }); // 11. Start IPC server - let ipc = IpcServer::new(&config.control_socket, status.clone())?; + let ipc = IpcServer::new(&config.control_socket, status.clone(), daemon_shutdown.clone())?; // Mark as ready let derp_home = derp_map.as_ref() diff --git a/sunbeam-net/src/daemon/mod.rs b/sunbeam-net/src/daemon/mod.rs index 6e113872..a89b6a5f 100644 --- a/sunbeam-net/src/daemon/mod.rs +++ b/sunbeam-net/src/daemon/mod.rs @@ -2,5 +2,6 @@ pub mod ipc; pub mod lifecycle; pub mod state; +pub use ipc::{IpcClient, IpcCommand, IpcResponse}; pub use lifecycle::VpnDaemon; pub use state::{DaemonHandle, DaemonStatus}; diff --git a/sunbeam-net/src/daemon/state.rs b/sunbeam-net/src/daemon/state.rs index 953df3b7..0968c929 100644 --- a/sunbeam-net/src/daemon/state.rs +++ b/sunbeam-net/src/daemon/state.rs @@ -2,8 +2,8 @@ use std::net::IpAddr; use std::path::PathBuf; use std::sync::{Arc, RwLock}; -use tokio::sync::oneshot; use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; /// Operational status of the VPN daemon. #[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] @@ -66,8 +66,9 @@ pub struct DaemonHandle { pub control_socket: PathBuf, /// Cached status (for IPC-based handles). pub status: DaemonStatus, - /// Shutdown signal sender (for in-process handles). - shutdown_tx: Option>, + /// Cancellation token shared with the daemon loop and the IPC server. + /// `shutdown()` cancels it; an IPC `Stop` command also cancels it. + shutdown: Option, /// Shared live status (for in-process handles). live_status: Option>>, /// Background task join handle. @@ -88,7 +89,7 @@ impl Clone for DaemonHandle { Self { control_socket: self.control_socket.clone(), status: self.status.clone(), - shutdown_tx: None, + shutdown: self.shutdown.clone(), live_status: self.live_status.clone(), join: None, } @@ -101,7 +102,7 @@ impl DaemonHandle { Self { control_socket, status: DaemonStatus::Stopped, - shutdown_tx: None, + shutdown: None, live_status: None, join: None, } @@ -109,14 +110,14 @@ impl DaemonHandle { /// Create a handle for an in-process daemon with shutdown and status tracking. pub(crate) fn with_daemon( - shutdown_tx: oneshot::Sender<()>, + shutdown: CancellationToken, status: Arc>, join: JoinHandle>, ) -> Self { Self { control_socket: PathBuf::new(), status: DaemonStatus::Starting, - shutdown_tx: Some(shutdown_tx), + shutdown: Some(shutdown), live_status: Some(status), join: Some(join), } @@ -134,8 +135,8 @@ impl DaemonHandle { /// Signal the daemon to shut down and wait for it to finish. pub async fn shutdown(self) -> crate::Result<()> { - if let Some(tx) = self.shutdown_tx { - let _ = tx.send(()); + if let Some(token) = self.shutdown { + token.cancel(); } if let Some(join) = self.join { match join.await { diff --git a/sunbeam-net/src/lib.rs b/sunbeam-net/src/lib.rs index 94c6ac4d..0a9d666c 100644 --- a/sunbeam-net/src/lib.rs +++ b/sunbeam-net/src/lib.rs @@ -12,5 +12,5 @@ pub mod wg; pub(crate) mod proto; pub use config::VpnConfig; -pub use daemon::{DaemonHandle, DaemonStatus, VpnDaemon}; +pub use daemon::{DaemonHandle, DaemonStatus, IpcClient, IpcCommand, IpcResponse, VpnDaemon}; pub use error::{Error, Result};