feat: Add spawn/delete commands, fix session state and entity broadcast
- marathonctl now supports spawn/delete entity commands - Fixed session state bug (was transitioning to Left every 5s) - Fixed entity broadcast to detect Added<NetworkedEntity> - Added AppCommandQueue pattern for app-level control commands References: #131, #132
This commit is contained in:
@@ -19,7 +19,7 @@ use clap::{Parser, Subcommand};
|
||||
use std::io::{Read, Write};
|
||||
use std::os::unix::net::UnixStream;
|
||||
|
||||
use libmarathon::networking::{ControlCommand, ControlResponse, SessionId};
|
||||
use libmarathon::networking::{ControlCommand, ControlResponse};
|
||||
|
||||
/// Marathon control CLI
|
||||
#[derive(Parser, Debug)]
|
||||
@@ -51,6 +51,25 @@ enum Commands {
|
||||
},
|
||||
/// Broadcast a ping message
|
||||
Ping,
|
||||
/// Spawn an entity
|
||||
Spawn {
|
||||
/// Entity type (e.g., "cube")
|
||||
entity_type: String,
|
||||
/// X position
|
||||
#[arg(short, long, default_value = "0.0")]
|
||||
x: f32,
|
||||
/// Y position
|
||||
#[arg(short, long, default_value = "0.0")]
|
||||
y: f32,
|
||||
/// Z position
|
||||
#[arg(short, long, default_value = "0.0")]
|
||||
z: f32,
|
||||
},
|
||||
/// Delete an entity by UUID
|
||||
Delete {
|
||||
/// Entity UUID
|
||||
entity_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
fn main() {
|
||||
@@ -75,6 +94,22 @@ fn main() {
|
||||
},
|
||||
}
|
||||
}
|
||||
Commands::Spawn { entity_type, x, y, z } => {
|
||||
ControlCommand::SpawnEntity {
|
||||
entity_type,
|
||||
position: [x, y, z],
|
||||
}
|
||||
}
|
||||
Commands::Delete { entity_id } => {
|
||||
use uuid::Uuid;
|
||||
match Uuid::parse_str(&entity_id) {
|
||||
Ok(uuid) => ControlCommand::DeleteEntity { entity_id: uuid },
|
||||
Err(e) => {
|
||||
eprintln!("Invalid UUID '{}': {}", entity_id, e);
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Connect to Unix socket
|
||||
@@ -135,8 +170,6 @@ fn receive_response(stream: &mut UnixStream) -> Result<ControlResponse, Box<dyn
|
||||
}
|
||||
|
||||
fn print_response(response: ControlResponse) {
|
||||
use libmarathon::networking::{SessionInfo, PeerInfo};
|
||||
|
||||
match response {
|
||||
ControlResponse::Status {
|
||||
node_id,
|
||||
|
||||
@@ -6,27 +6,90 @@
|
||||
|
||||
use anyhow::Result;
|
||||
use bevy::prelude::*;
|
||||
use crossbeam_channel::{Receiver, Sender, unbounded};
|
||||
use libmarathon::{
|
||||
engine::{EngineBridge, EngineCommand},
|
||||
networking::{ControlCommand, ControlResponse, SessionId},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Resource holding the control socket path
|
||||
#[derive(Resource)]
|
||||
pub struct ControlSocketPath(pub String);
|
||||
|
||||
pub fn cleanup_control_socket(
|
||||
mut exit_events: MessageReader<bevy::app::AppExit>,
|
||||
socket_path: Option<Res<ControlSocketPath>>,
|
||||
) {
|
||||
for _ in exit_events.read() {
|
||||
if let Some(ref path) = socket_path {
|
||||
info!("Cleaning up control socket at {}", path.0);
|
||||
let _ = std::fs::remove_file(&path.0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Commands that can be sent from the control socket to the app
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AppCommand {
|
||||
SpawnEntity {
|
||||
entity_type: String,
|
||||
position: Vec3,
|
||||
},
|
||||
DeleteEntity {
|
||||
entity_id: Uuid,
|
||||
},
|
||||
}
|
||||
|
||||
/// Queue for app-level commands from control socket
|
||||
#[derive(Resource, Clone)]
|
||||
pub struct AppCommandQueue {
|
||||
sender: Sender<AppCommand>,
|
||||
receiver: Receiver<AppCommand>,
|
||||
}
|
||||
|
||||
impl AppCommandQueue {
|
||||
pub fn new() -> Self {
|
||||
let (sender, receiver) = unbounded();
|
||||
Self { sender, receiver }
|
||||
}
|
||||
|
||||
pub fn send(&self, command: AppCommand) {
|
||||
let _ = self.sender.send(command);
|
||||
}
|
||||
|
||||
pub fn try_recv(&self) -> Option<AppCommand> {
|
||||
self.receiver.try_recv().ok()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for AppCommandQueue {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Startup system to launch the control socket server
|
||||
#[cfg(not(target_os = "ios"))]
|
||||
#[cfg(debug_assertions)]
|
||||
pub fn start_control_socket_system(socket_path_res: Res<ControlSocketPath>, bridge: Res<EngineBridge>) {
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
pub fn start_control_socket_system(
|
||||
mut commands: Commands,
|
||||
socket_path_res: Res<ControlSocketPath>,
|
||||
bridge: Res<EngineBridge>,
|
||||
) {
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::net::UnixListener;
|
||||
|
||||
let socket_path = socket_path_res.0.clone();
|
||||
info!("Starting control socket at {}", socket_path);
|
||||
|
||||
// Clone bridge for the async task
|
||||
// Create app command queue
|
||||
let app_queue = AppCommandQueue::new();
|
||||
commands.insert_resource(app_queue.clone());
|
||||
|
||||
// Clone bridge and queue for the async task
|
||||
let bridge = bridge.clone();
|
||||
let queue = app_queue;
|
||||
|
||||
// Spawn tokio runtime in background thread
|
||||
std::thread::spawn(move || {
|
||||
@@ -52,6 +115,7 @@ pub fn start_control_socket_system(socket_path_res: Res<ControlSocketPath>, brid
|
||||
Ok((mut stream, _addr)) => {
|
||||
let bridge = bridge.clone();
|
||||
|
||||
let queue_clone = queue.clone();
|
||||
tokio::spawn(async move {
|
||||
// Read command length
|
||||
let mut len_buf = [0u8; 4];
|
||||
@@ -84,7 +148,7 @@ pub fn start_control_socket_system(socket_path_res: Res<ControlSocketPath>, brid
|
||||
info!("Received control command: {:?}", command);
|
||||
|
||||
// Handle command
|
||||
let response = handle_command(command, &bridge).await;
|
||||
let response = handle_command(command, &bridge, &queue_clone).await;
|
||||
|
||||
// Send response
|
||||
if let Err(e) = send_response(&mut stream, response).await {
|
||||
@@ -104,7 +168,11 @@ pub fn start_control_socket_system(socket_path_res: Res<ControlSocketPath>, brid
|
||||
/// Handle a control command and generate a response
|
||||
#[cfg(not(target_os = "ios"))]
|
||||
#[cfg(debug_assertions)]
|
||||
async fn handle_command(command: ControlCommand, bridge: &EngineBridge) -> ControlResponse {
|
||||
async fn handle_command(
|
||||
command: ControlCommand,
|
||||
bridge: &EngineBridge,
|
||||
app_queue: &AppCommandQueue,
|
||||
) -> ControlResponse {
|
||||
match command {
|
||||
ControlCommand::JoinSession { session_code } => {
|
||||
match SessionId::from_code(&session_code) {
|
||||
@@ -129,12 +197,58 @@ async fn handle_command(command: ControlCommand, bridge: &EngineBridge) -> Contr
|
||||
}
|
||||
}
|
||||
|
||||
ControlCommand::SpawnEntity { entity_type, position } => {
|
||||
app_queue.send(AppCommand::SpawnEntity {
|
||||
entity_type,
|
||||
position: Vec3::from_array(position),
|
||||
});
|
||||
ControlResponse::Ok {
|
||||
message: "Entity spawn command queued".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
ControlCommand::DeleteEntity { entity_id } => {
|
||||
app_queue.send(AppCommand::DeleteEntity { entity_id });
|
||||
ControlResponse::Ok {
|
||||
message: format!("Entity delete command queued for {}", entity_id),
|
||||
}
|
||||
}
|
||||
|
||||
_ => ControlResponse::Error {
|
||||
error: format!("Command {:?} not yet implemented", command),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// System to process app commands from the control socket
|
||||
pub fn process_app_commands(
|
||||
queue: Option<Res<AppCommandQueue>>,
|
||||
mut spawn_cube_writer: MessageWriter<crate::cube::SpawnCubeEvent>,
|
||||
mut delete_cube_writer: MessageWriter<crate::cube::DeleteCubeEvent>,
|
||||
) {
|
||||
let Some(queue) = queue else { return };
|
||||
|
||||
while let Some(command) = queue.try_recv() {
|
||||
match command {
|
||||
AppCommand::SpawnEntity { entity_type, position } => {
|
||||
match entity_type.as_str() {
|
||||
"cube" => {
|
||||
info!("Spawning cube at {:?}", position);
|
||||
spawn_cube_writer.write(crate::cube::SpawnCubeEvent { position });
|
||||
}
|
||||
_ => {
|
||||
warn!("Unknown entity type: {}", entity_type);
|
||||
}
|
||||
}
|
||||
}
|
||||
AppCommand::DeleteEntity { entity_id } => {
|
||||
info!("Deleting entity {}", entity_id);
|
||||
delete_cube_writer.write(crate::cube::DeleteCubeEvent { entity_id });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a response back through the Unix socket
|
||||
#[cfg(not(target_os = "ios"))]
|
||||
#[cfg(debug_assertions)]
|
||||
|
||||
@@ -205,6 +205,7 @@ fn main() {
|
||||
// Insert control socket path as resource
|
||||
app.insert_resource(control::ControlSocketPath(args.control_socket.clone()));
|
||||
app.add_systems(Startup, control::start_control_socket_system);
|
||||
app.add_systems(Update, (control::process_app_commands, control::cleanup_control_socket));
|
||||
|
||||
// Rendering-only plugins
|
||||
#[cfg(not(feature = "headless"))]
|
||||
|
||||
253
crates/app/src/setup/control_socket.rs
Normal file
253
crates/app/src/setup/control_socket.rs
Normal file
@@ -0,0 +1,253 @@
|
||||
//! Unix domain socket control server for remote engine control
|
||||
//!
|
||||
//! This module provides a Unix socket server for controlling the engine
|
||||
//! programmatically without needing screen access or network ports.
|
||||
//!
|
||||
//! # Security
|
||||
//!
|
||||
//! Currently debug-only. See issue #135 for production security requirements.
|
||||
|
||||
use anyhow::Result;
|
||||
use bevy::prelude::*;
|
||||
use libmarathon::networking::{ControlCommand, ControlResponse, GossipBridge, SessionId};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Spawn Unix domain socket control server for remote engine control
|
||||
///
|
||||
/// This spawns a tokio task that listens on a Unix socket for control commands.
|
||||
/// The socket path is `/tmp/marathon-{session_id}.sock`.
|
||||
///
|
||||
/// **Security Note**: This is currently debug-only. See issue #135 for production
|
||||
/// security requirements (authentication, rate limiting, etc.).
|
||||
///
|
||||
/// # Platform Support
|
||||
///
|
||||
/// This function is only compiled on non-iOS platforms.
|
||||
#[cfg(not(target_os = "ios"))]
|
||||
#[cfg(debug_assertions)]
|
||||
pub fn spawn_control_socket(session_id: SessionId, bridge: GossipBridge, node_id: Uuid) {
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::net::UnixListener;
|
||||
|
||||
let socket_path = format!("/tmp/marathon-{}.sock", session_id);
|
||||
|
||||
tokio::spawn(async move {
|
||||
// Clean up any existing socket
|
||||
let _ = std::fs::remove_file(&socket_path);
|
||||
|
||||
let listener = match UnixListener::bind(&socket_path) {
|
||||
Ok(l) => {
|
||||
info!("Control socket listening at {}", socket_path);
|
||||
l
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to bind control socket at {}: {}", socket_path, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Accept connections in a loop
|
||||
loop {
|
||||
match listener.accept().await {
|
||||
Ok((mut stream, _addr)) => {
|
||||
let bridge = bridge.clone();
|
||||
let session_id = session_id.clone();
|
||||
|
||||
// Spawn a task to handle this connection
|
||||
tokio::spawn(async move {
|
||||
// Read command length (4 bytes)
|
||||
let mut len_buf = [0u8; 4];
|
||||
if let Err(e) = stream.read_exact(&mut len_buf).await {
|
||||
error!("Failed to read command length: {}", e);
|
||||
return;
|
||||
}
|
||||
let len = u32::from_le_bytes(len_buf) as usize;
|
||||
|
||||
// Read command bytes
|
||||
let mut cmd_buf = vec![0u8; len];
|
||||
if let Err(e) = stream.read_exact(&mut cmd_buf).await {
|
||||
error!("Failed to read command: {}", e);
|
||||
return;
|
||||
}
|
||||
|
||||
// Deserialize command
|
||||
let command = match ControlCommand::from_bytes(&cmd_buf) {
|
||||
Ok(cmd) => cmd,
|
||||
Err(e) => {
|
||||
error!("Failed to deserialize command: {}", e);
|
||||
let response = ControlResponse::Error {
|
||||
error: format!("Failed to deserialize command: {}", e),
|
||||
};
|
||||
let _ = send_response(&mut stream, response).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
info!("Received control command: {:?}", command);
|
||||
|
||||
// Execute command
|
||||
let response = handle_control_command(command, &bridge, session_id, node_id).await;
|
||||
|
||||
// Send response
|
||||
if let Err(e) = send_response(&mut stream, response).await {
|
||||
error!("Failed to send response: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to accept control socket connection: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Handle a control command and return a response
|
||||
#[cfg(not(target_os = "ios"))]
|
||||
#[cfg(debug_assertions)]
|
||||
async fn handle_control_command(
|
||||
command: ControlCommand,
|
||||
bridge: &GossipBridge,
|
||||
session_id: SessionId,
|
||||
node_id: Uuid,
|
||||
) -> ControlResponse {
|
||||
match command {
|
||||
ControlCommand::GetStatus => {
|
||||
// Get queue sizes from bridge
|
||||
let outgoing_size = bridge.try_recv_outgoing().map(|msg| {
|
||||
// Put it back
|
||||
let _ = bridge.send(msg);
|
||||
1
|
||||
}).unwrap_or(0);
|
||||
|
||||
ControlResponse::Status {
|
||||
node_id,
|
||||
session_id,
|
||||
outgoing_queue_size: outgoing_size,
|
||||
incoming_queue_size: 0, // We'd need to peek without consuming
|
||||
connected_peers: None, // Not easily available from bridge
|
||||
}
|
||||
}
|
||||
ControlCommand::SendTestMessage { content } => {
|
||||
use libmarathon::networking::{VersionedMessage, VectorClock, SyncMessage};
|
||||
|
||||
// Send a SyncRequest as a test message (lightweight ping-like message)
|
||||
let message = SyncMessage::SyncRequest {
|
||||
node_id,
|
||||
vector_clock: VectorClock::new(),
|
||||
};
|
||||
let versioned = VersionedMessage::new(message);
|
||||
|
||||
match bridge.send(versioned) {
|
||||
Ok(_) => ControlResponse::Ok {
|
||||
message: format!("Sent test message: {}", content),
|
||||
},
|
||||
Err(e) => ControlResponse::Error {
|
||||
error: format!("Failed to send: {}", e),
|
||||
},
|
||||
}
|
||||
}
|
||||
ControlCommand::InjectMessage { message } => {
|
||||
match bridge.push_incoming(message) {
|
||||
Ok(_) => ControlResponse::Ok {
|
||||
message: "Message injected into incoming queue".to_string(),
|
||||
},
|
||||
Err(e) => ControlResponse::Error {
|
||||
error: format!("Failed to inject message: {}", e),
|
||||
},
|
||||
}
|
||||
}
|
||||
ControlCommand::BroadcastMessage { message } => {
|
||||
use libmarathon::networking::VersionedMessage;
|
||||
|
||||
let versioned = VersionedMessage::new(message);
|
||||
match bridge.send(versioned) {
|
||||
Ok(_) => ControlResponse::Ok {
|
||||
message: "Message broadcast".to_string(),
|
||||
},
|
||||
Err(e) => ControlResponse::Error {
|
||||
error: format!("Failed to broadcast: {}", e),
|
||||
},
|
||||
}
|
||||
}
|
||||
ControlCommand::Shutdown => {
|
||||
warn!("Shutdown command received via control socket");
|
||||
ControlResponse::Ok {
|
||||
message: "Shutdown not yet implemented".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
// Session lifecycle commands (TODO: implement these properly)
|
||||
ControlCommand::JoinSession { session_code } => {
|
||||
ControlResponse::Error {
|
||||
error: format!("JoinSession not yet implemented (requested: {})", session_code),
|
||||
}
|
||||
}
|
||||
ControlCommand::LeaveSession => {
|
||||
ControlResponse::Error {
|
||||
error: "LeaveSession not yet implemented".to_string(),
|
||||
}
|
||||
}
|
||||
ControlCommand::GetSessionInfo => {
|
||||
ControlResponse::Error {
|
||||
error: "GetSessionInfo not yet implemented".to_string(),
|
||||
}
|
||||
}
|
||||
ControlCommand::ListSessions => {
|
||||
ControlResponse::Error {
|
||||
error: "ListSessions not yet implemented".to_string(),
|
||||
}
|
||||
}
|
||||
ControlCommand::DeleteSession { session_code } => {
|
||||
ControlResponse::Error {
|
||||
error: format!("DeleteSession not yet implemented (requested: {})", session_code),
|
||||
}
|
||||
}
|
||||
ControlCommand::ListPeers => {
|
||||
ControlResponse::Error {
|
||||
error: "ListPeers not yet implemented".to_string(),
|
||||
}
|
||||
}
|
||||
ControlCommand::SpawnEntity { .. } => {
|
||||
ControlResponse::Error {
|
||||
error: "SpawnEntity not available on session-level socket. Use app-level socket.".to_string(),
|
||||
}
|
||||
}
|
||||
ControlCommand::DeleteEntity { .. } => {
|
||||
ControlResponse::Error {
|
||||
error: "DeleteEntity not available on session-level socket. Use app-level socket.".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a response back through the Unix socket
|
||||
#[cfg(not(target_os = "ios"))]
|
||||
#[cfg(debug_assertions)]
|
||||
async fn send_response(
|
||||
stream: &mut tokio::net::UnixStream,
|
||||
response: ControlResponse,
|
||||
) -> Result<()> {
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
let bytes = response.to_bytes()?;
|
||||
let len = bytes.len() as u32;
|
||||
|
||||
// Write length prefix
|
||||
stream.write_all(&len.to_le_bytes()).await?;
|
||||
// Write response bytes
|
||||
stream.write_all(&bytes).await?;
|
||||
stream.flush().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// No-op stub for iOS builds
|
||||
#[cfg(target_os = "ios")]
|
||||
pub fn spawn_control_socket(_session_id: SessionId, _bridge: GossipBridge, _node_id: Uuid) {}
|
||||
|
||||
// No-op stub for release builds
|
||||
#[cfg(all(not(target_os = "ios"), not(debug_assertions)))]
|
||||
pub fn spawn_control_socket(_session_id: SessionId, _bridge: GossipBridge, _node_id: Uuid) {
|
||||
// TODO(#135): Implement secure control socket for release builds with authentication
|
||||
}
|
||||
@@ -47,11 +47,15 @@
|
||||
//! 2. **Tokio → Bevy**: GossipBridge's internal queue (push_incoming)
|
||||
//! 3. **Thread handoff**: crossbeam_channel (one-time GossipBridge transfer)
|
||||
|
||||
mod control_socket;
|
||||
|
||||
use anyhow::Result;
|
||||
use bevy::prelude::*;
|
||||
use libmarathon::networking::{GossipBridge, SessionId};
|
||||
use uuid::Uuid;
|
||||
|
||||
use control_socket::spawn_control_socket;
|
||||
|
||||
/// Session ID to use for network initialization
|
||||
///
|
||||
/// This resource must be inserted before setup_gossip_networking runs.
|
||||
@@ -222,11 +226,12 @@ async fn init_gossip(session_id: SessionId) -> Result<GossipBridge> {
|
||||
let (sender, mut receiver) = subscribe_handle.split();
|
||||
|
||||
// Wait for join (with timeout since we might be the first node)
|
||||
info!("Waiting for gossip join...");
|
||||
match tokio::time::timeout(std::time::Duration::from_secs(2), receiver.joined()).await {
|
||||
| Ok(Ok(())) => info!("Joined gossip swarm"),
|
||||
// Increased timeout to 10s to allow mDNS discovery to work
|
||||
info!("Waiting for gossip join (10s timeout for mDNS discovery)...");
|
||||
match tokio::time::timeout(std::time::Duration::from_secs(10), receiver.joined()).await {
|
||||
| Ok(Ok(())) => info!("Joined gossip swarm successfully"),
|
||||
| Ok(Err(e)) => warn!("Join error: {} (proceeding anyway)", e),
|
||||
| Err(_) => info!("Join timeout (first node in swarm)"),
|
||||
| Err(_) => info!("Join timeout - likely first node in swarm (proceeding anyway)"),
|
||||
}
|
||||
|
||||
// Create bridge
|
||||
@@ -236,6 +241,9 @@ async fn init_gossip(session_id: SessionId) -> Result<GossipBridge> {
|
||||
// Spawn forwarding tasks - pass endpoint, router, gossip to keep them alive
|
||||
spawn_bridge_tasks(sender, receiver, bridge.clone(), endpoint, router, gossip);
|
||||
|
||||
// Spawn control socket server for remote control (debug only)
|
||||
spawn_control_socket(session_id, bridge.clone(), node_id);
|
||||
|
||||
Ok(bridge)
|
||||
}
|
||||
|
||||
@@ -301,14 +309,26 @@ fn spawn_bridge_tasks(
|
||||
loop {
|
||||
match tokio::time::timeout(Duration::from_millis(100), receiver.next()).await {
|
||||
| Ok(Some(Ok(event))) => {
|
||||
if let iroh_gossip::api::Event::Received(msg) = event {
|
||||
if let Ok(versioned_msg) =
|
||||
rkyv::from_bytes::<VersionedMessage, rkyv::rancor::Failure>(&msg.content)
|
||||
{
|
||||
if let Err(e) = bridge_in.push_incoming(versioned_msg) {
|
||||
error!("[Node {}] Push incoming failed: {}", node_id, e);
|
||||
match event {
|
||||
| iroh_gossip::api::Event::Received(msg) => {
|
||||
info!("[Node {}] Received message from gossip", node_id);
|
||||
if let Ok(versioned_msg) =
|
||||
rkyv::from_bytes::<VersionedMessage, rkyv::rancor::Failure>(&msg.content)
|
||||
{
|
||||
if let Err(e) = bridge_in.push_incoming(versioned_msg) {
|
||||
error!("[Node {}] Push incoming failed: {}", node_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
| iroh_gossip::api::Event::NeighborUp(peer_id) => {
|
||||
info!("[Node {}] Peer connected: {}", node_id, peer_id);
|
||||
},
|
||||
| iroh_gossip::api::Event::NeighborDown(peer_id) => {
|
||||
warn!("[Node {}] Peer disconnected: {}", node_id, peer_id);
|
||||
},
|
||||
| iroh_gossip::api::Event::Lagged => {
|
||||
warn!("[Node {}] Event stream lagged - some events may have been missed", node_id);
|
||||
},
|
||||
}
|
||||
},
|
||||
| Ok(Some(Err(e))) => error!("[Node {}] Receiver error: {}", node_id, e),
|
||||
170
crates/libmarathon/src/networking/control.rs
Normal file
170
crates/libmarathon/src/networking/control.rs
Normal file
@@ -0,0 +1,170 @@
|
||||
//! Control socket protocol for remote engine control
|
||||
//!
|
||||
//! This module defines the message protocol for controlling the engine via
|
||||
//! Unix domain sockets without exposing network ports. Used for testing,
|
||||
//! validation, and programmatic control of sessions.
|
||||
//!
|
||||
//! # Security
|
||||
//!
|
||||
//! Currently debug-only. See issue #135 for production security requirements.
|
||||
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::networking::{
|
||||
SessionId,
|
||||
SessionState,
|
||||
SyncMessage,
|
||||
VersionedMessage,
|
||||
};
|
||||
|
||||
/// Control command sent to the engine
|
||||
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
|
||||
pub enum ControlCommand {
|
||||
/// Get current session status
|
||||
GetStatus,
|
||||
|
||||
/// Send a test message through gossip
|
||||
SendTestMessage { content: String },
|
||||
|
||||
/// Inject a message directly into the incoming queue (for testing)
|
||||
InjectMessage { message: VersionedMessage },
|
||||
|
||||
/// Broadcast a full sync message through gossip
|
||||
BroadcastMessage { message: SyncMessage },
|
||||
|
||||
/// Request graceful shutdown
|
||||
Shutdown,
|
||||
|
||||
// Session lifecycle commands
|
||||
|
||||
/// Join a specific session by code
|
||||
JoinSession { session_code: String },
|
||||
|
||||
/// Leave the current session gracefully
|
||||
LeaveSession,
|
||||
|
||||
/// Get detailed current session information
|
||||
GetSessionInfo,
|
||||
|
||||
/// List all sessions in the database
|
||||
ListSessions,
|
||||
|
||||
/// Delete a session from the database
|
||||
DeleteSession { session_code: String },
|
||||
|
||||
/// Get list of connected peers in current session
|
||||
ListPeers,
|
||||
|
||||
// Entity commands
|
||||
|
||||
/// Spawn an entity with a given type and position
|
||||
SpawnEntity {
|
||||
entity_type: String,
|
||||
position: [f32; 3],
|
||||
},
|
||||
|
||||
/// Delete an entity by its UUID
|
||||
DeleteEntity { entity_id: Uuid },
|
||||
}
|
||||
|
||||
/// Detailed session information
|
||||
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
|
||||
pub struct SessionInfo {
|
||||
pub session_id: SessionId,
|
||||
pub session_name: Option<String>,
|
||||
pub state: SessionState,
|
||||
pub created_at: i64,
|
||||
pub last_active: i64,
|
||||
pub entity_count: usize,
|
||||
}
|
||||
|
||||
/// Peer information
|
||||
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
|
||||
pub struct PeerInfo {
|
||||
pub node_id: Uuid,
|
||||
pub connected_since: Option<i64>,
|
||||
}
|
||||
|
||||
/// Response from the engine to a control command
|
||||
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
|
||||
pub enum ControlResponse {
|
||||
/// Session status information
|
||||
Status {
|
||||
node_id: Uuid,
|
||||
session_id: SessionId,
|
||||
outgoing_queue_size: usize,
|
||||
incoming_queue_size: usize,
|
||||
/// Number of connected peers (if available from gossip)
|
||||
connected_peers: Option<usize>,
|
||||
},
|
||||
|
||||
/// Detailed session information
|
||||
SessionInfo(SessionInfo),
|
||||
|
||||
/// List of sessions
|
||||
Sessions(Vec<SessionInfo>),
|
||||
|
||||
/// List of connected peers
|
||||
Peers(Vec<PeerInfo>),
|
||||
|
||||
/// Acknowledgment of command execution
|
||||
Ok { message: String },
|
||||
|
||||
/// Error occurred during command execution
|
||||
Error { error: String },
|
||||
}
|
||||
|
||||
impl ControlCommand {
|
||||
/// Serialize a command to bytes using rkyv
|
||||
pub fn to_bytes(&self) -> Result<Vec<u8>, rkyv::rancor::Error> {
|
||||
rkyv::to_bytes::<rkyv::rancor::Error>(self).map(|b| b.to_vec())
|
||||
}
|
||||
|
||||
/// Deserialize a command from bytes using rkyv
|
||||
pub fn from_bytes(bytes: &[u8]) -> Result<Self, rkyv::rancor::Error> {
|
||||
rkyv::from_bytes::<Self, rkyv::rancor::Error>(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl ControlResponse {
|
||||
/// Serialize a response to bytes using rkyv
|
||||
pub fn to_bytes(&self) -> Result<Vec<u8>, rkyv::rancor::Error> {
|
||||
rkyv::to_bytes::<rkyv::rancor::Error>(self).map(|b| b.to_vec())
|
||||
}
|
||||
|
||||
/// Deserialize a response from bytes using rkyv
|
||||
pub fn from_bytes(bytes: &[u8]) -> Result<Self, rkyv::rancor::Error> {
|
||||
rkyv::from_bytes::<Self, rkyv::rancor::Error>(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_command_roundtrip() {
|
||||
let cmd = ControlCommand::GetStatus;
|
||||
let bytes = cmd.to_bytes().unwrap();
|
||||
let decoded = ControlCommand::from_bytes(&bytes).unwrap();
|
||||
|
||||
match decoded {
|
||||
| ControlCommand::GetStatus => {},
|
||||
| _ => panic!("Failed to decode GetStatus"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_response_roundtrip() {
|
||||
let resp = ControlResponse::Ok {
|
||||
message: "Test".to_string(),
|
||||
};
|
||||
let bytes = resp.to_bytes().unwrap();
|
||||
let decoded = ControlResponse::from_bytes(&bytes).unwrap();
|
||||
|
||||
match decoded {
|
||||
| ControlResponse::Ok { message } => assert_eq!(message, "Test"),
|
||||
| _ => panic!("Failed to decode Ok response"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -52,7 +52,7 @@ impl NodeVectorClock {
|
||||
/// System to generate and broadcast EntityDelta messages
|
||||
///
|
||||
/// This system:
|
||||
/// 1. Queries for Changed<NetworkedEntity>
|
||||
/// 1. Queries for Added<NetworkedEntity> or Changed<NetworkedEntity>
|
||||
/// 2. Serializes all components on those entities
|
||||
/// 3. Builds EntityDelta messages
|
||||
/// 4. Broadcasts via GossipBridge
|
||||
@@ -73,7 +73,7 @@ pub fn generate_delta_system(world: &mut World) {
|
||||
|
||||
let changed_entities: Vec<(Entity, uuid::Uuid, uuid::Uuid)> = {
|
||||
let mut query =
|
||||
world.query_filtered::<(Entity, &NetworkedEntity), Changed<NetworkedEntity>>();
|
||||
world.query_filtered::<(Entity, &NetworkedEntity), Or<(Added<NetworkedEntity>, Changed<NetworkedEntity>)>>();
|
||||
query
|
||||
.iter(world)
|
||||
.map(|(entity, networked)| (entity, networked.network_id, networked.owner_node_id))
|
||||
|
||||
@@ -36,6 +36,7 @@ mod auth;
|
||||
mod blob_support;
|
||||
mod change_detection;
|
||||
mod components;
|
||||
mod control;
|
||||
mod delta_generation;
|
||||
mod entity_map;
|
||||
mod error;
|
||||
@@ -62,6 +63,7 @@ pub use auth::*;
|
||||
pub use blob_support::*;
|
||||
pub use change_detection::*;
|
||||
pub use components::*;
|
||||
pub use control::*;
|
||||
pub use delta_generation::*;
|
||||
pub use entity_map::*;
|
||||
pub use error::*;
|
||||
|
||||
@@ -168,7 +168,8 @@ pub fn save_session_on_shutdown_system(world: &mut World) {
|
||||
|
||||
// Update session metadata
|
||||
session.touch();
|
||||
session.transition_to(SessionState::Left);
|
||||
// Note: We don't transition to Left here - that only happens on actual shutdown
|
||||
// This periodic save just persists the current state
|
||||
|
||||
// Count entities in the world
|
||||
let entity_count = world
|
||||
|
||||
Reference in New Issue
Block a user