diff --git a/Cargo.lock b/Cargo.lock index 05e1059..3b63a26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -267,6 +267,7 @@ dependencies = [ "anyhow", "bevy", "bytes", + "clap", "crossbeam-channel", "egui", "futures-lite", diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index e77d010..44a804a 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -34,6 +34,7 @@ futures-lite = "2.0" rkyv = { workspace = true } bytes = "1.0" crossbeam-channel = "0.5.15" +clap = { version = "4.0", features = ["derive"] } [target.'cfg(target_os = "ios")'.dependencies] objc = "0.2" diff --git a/crates/app/src/bin/marathonctl.rs b/crates/app/src/bin/marathonctl.rs new file mode 100644 index 0000000..6f86ef8 --- /dev/null +++ b/crates/app/src/bin/marathonctl.rs @@ -0,0 +1,193 @@ +//! Marathon control CLI +//! +//! Send control commands to a running Marathon instance via Unix domain socket. +//! +//! # Usage +//! +//! ```bash +//! # Get session status +//! marathonctl status +//! +//! # Start networking with a session +//! marathonctl start +//! +//! # Use custom socket +//! marathonctl --socket /tmp/marathon1.sock status +//! ``` + +use clap::{Parser, Subcommand}; +use std::io::{Read, Write}; +use std::os::unix::net::UnixStream; + +use libmarathon::networking::{ControlCommand, ControlResponse, SessionId}; + +/// Marathon control CLI +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to the control socket + #[arg(long, default_value = "/tmp/marathon-control.sock")] + socket: String, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand, Debug)] +enum Commands { + /// Start networking with a session + Start { + /// Session code (e.g., abc-def-123) + session_code: String, + }, + /// Stop networking + Stop, + /// Get current session status + Status, + /// Send a test message + Test { + /// Message content + content: String, + }, + /// Broadcast a ping message + Ping, +} + +fn main() { + let args = Args::parse(); + + // Build command from subcommand + let command = match args.command { + Commands::Start { session_code } => ControlCommand::JoinSession { session_code }, + Commands::Stop => ControlCommand::LeaveSession, + Commands::Status => ControlCommand::GetStatus, + Commands::Test { content } => ControlCommand::SendTestMessage { content }, + Commands::Ping => { + use libmarathon::networking::{SyncMessage, VectorClock}; + use uuid::Uuid; + + // For ping, we send a SyncRequest (lightweight ping-like message) + let node_id = Uuid::new_v4(); + ControlCommand::BroadcastMessage { + message: SyncMessage::SyncRequest { + node_id, + vector_clock: VectorClock::new(), + }, + } + } + }; + + // Connect to Unix socket + let socket_path = &args.socket; + let mut stream = match UnixStream::connect(&socket_path) { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to connect to {}: {}", socket_path, e); + eprintln!("Is the Marathon app running?"); + std::process::exit(1); + } + }; + + // Send command + if let Err(e) = send_command(&mut stream, &command) { + eprintln!("Failed to send command: {}", e); + std::process::exit(1); + } + + // Receive response + match receive_response(&mut stream) { + Ok(response) => { + print_response(response); + } + Err(e) => { + eprintln!("Failed to receive response: {}", e); + std::process::exit(1); + } + } +} + +fn send_command(stream: &mut UnixStream, command: &ControlCommand) -> Result<(), Box> { + let bytes = command.to_bytes()?; + let len = bytes.len() as u32; + + // Write length prefix + stream.write_all(&len.to_le_bytes())?; + // Write command bytes + stream.write_all(&bytes)?; + stream.flush()?; + + Ok(()) +} + +fn receive_response(stream: &mut UnixStream) -> Result> { + // Read length prefix + let mut len_buf = [0u8; 4]; + stream.read_exact(&mut len_buf)?; + let len = u32::from_le_bytes(len_buf) as usize; + + // Read response bytes + let mut response_buf = vec![0u8; len]; + stream.read_exact(&mut response_buf)?; + + // Deserialize response + let response = ControlResponse::from_bytes(&response_buf)?; + Ok(response) +} + +fn print_response(response: ControlResponse) { + use libmarathon::networking::{SessionInfo, PeerInfo}; + + match response { + ControlResponse::Status { + node_id, + session_id, + outgoing_queue_size, + incoming_queue_size, + connected_peers, + } => { + println!("Session Status:"); + println!(" Node ID: {}", node_id); + println!(" Session: {}", session_id); + println!(" Outgoing Queue: {} messages", outgoing_queue_size); + println!(" Incoming Queue: {} messages", incoming_queue_size); + if let Some(peers) = connected_peers { + println!(" Connected Peers: {}", peers); + } + } + ControlResponse::SessionInfo(info) => { + println!("Session Info:"); + println!(" ID: {}", info.session_id); + if let Some(ref name) = info.session_name { + println!(" Name: {}", name); + } + println!(" State: {:?}", info.state); + println!(" Entities: {}", info.entity_count); + println!(" Created: {}", info.created_at); + println!(" Last Active: {}", info.last_active); + } + ControlResponse::Sessions(sessions) => { + println!("Sessions ({} total):", sessions.len()); + for session in sessions { + println!(" {}: {:?} ({} entities)", session.session_id, session.state, session.entity_count); + } + } + ControlResponse::Peers(peers) => { + println!("Connected Peers ({} total):", peers.len()); + for peer in peers { + print!(" {}", peer.node_id); + if let Some(since) = peer.connected_since { + println!(" (connected since: {})", since); + } else { + println!(); + } + } + } + ControlResponse::Ok { message } => { + println!("Success: {}", message); + } + ControlResponse::Error { error } => { + eprintln!("Error: {}", error); + std::process::exit(1); + } + } +} diff --git a/crates/app/src/control.rs b/crates/app/src/control.rs new file mode 100644 index 0000000..4ae3e67 --- /dev/null +++ b/crates/app/src/control.rs @@ -0,0 +1,159 @@ +//! Standalone control socket for engine control +//! +//! This control socket starts at app launch and allows external control +//! of the engine, including starting/stopping networking, before any +//! networking is initialized. + +use anyhow::Result; +use bevy::prelude::*; +use libmarathon::{ + engine::{EngineBridge, EngineCommand}, + networking::{ControlCommand, ControlResponse, SessionId}, +}; + +/// Resource holding the control socket path +#[derive(Resource)] +pub struct ControlSocketPath(pub String); + +/// Startup system to launch the control socket server +#[cfg(not(target_os = "ios"))] +#[cfg(debug_assertions)] +pub fn start_control_socket_system(socket_path_res: Res, bridge: Res) { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::UnixListener; + + let socket_path = socket_path_res.0.clone(); + info!("Starting control socket at {}", socket_path); + + // Clone bridge for the async task + let bridge = bridge.clone(); + + // Spawn tokio runtime in background thread + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async move { + // Clean up any existing socket + let _ = std::fs::remove_file(&socket_path); + + let listener = match UnixListener::bind(&socket_path) { + Ok(l) => { + info!("Control socket listening at {}", socket_path); + l + } + Err(e) => { + error!("Failed to bind control socket: {}", e); + return; + } + }; + + // Accept connections in a loop + loop { + match listener.accept().await { + Ok((mut stream, _addr)) => { + let bridge = bridge.clone(); + + tokio::spawn(async move { + // Read command length + let mut len_buf = [0u8; 4]; + if let Err(e) = stream.read_exact(&mut len_buf).await { + error!("Failed to read command length: {}", e); + return; + } + let len = u32::from_le_bytes(len_buf) as usize; + + // Read command bytes + let mut cmd_buf = vec![0u8; len]; + if let Err(e) = stream.read_exact(&mut cmd_buf).await { + error!("Failed to read command: {}", e); + return; + } + + // Deserialize command + let command = match ControlCommand::from_bytes(&cmd_buf) { + Ok(cmd) => cmd, + Err(e) => { + error!("Failed to deserialize command: {}", e); + let response = ControlResponse::Error { + error: format!("Failed to deserialize: {}", e), + }; + let _ = send_response(&mut stream, response).await; + return; + } + }; + + info!("Received control command: {:?}", command); + + // Handle command + let response = handle_command(command, &bridge).await; + + // Send response + if let Err(e) = send_response(&mut stream, response).await { + error!("Failed to send response: {}", e); + } + }); + } + Err(e) => { + error!("Failed to accept connection: {}", e); + } + } + } + }); + }); +} + +/// Handle a control command and generate a response +#[cfg(not(target_os = "ios"))] +#[cfg(debug_assertions)] +async fn handle_command(command: ControlCommand, bridge: &EngineBridge) -> ControlResponse { + match command { + ControlCommand::JoinSession { session_code } => { + match SessionId::from_code(&session_code) { + Ok(session_id) => { + bridge.send_command(EngineCommand::StartNetworking { + session_id: session_id.clone(), + }); + ControlResponse::Ok { + message: format!("Starting networking with session: {}", session_id), + } + } + Err(e) => ControlResponse::Error { + error: format!("Invalid session code: {}", e), + }, + } + } + + ControlCommand::LeaveSession => { + bridge.send_command(EngineCommand::StopNetworking); + ControlResponse::Ok { + message: "Stopping networking".to_string(), + } + } + + _ => ControlResponse::Error { + error: format!("Command {:?} not yet implemented", command), + }, + } +} + +/// Send a response back through the Unix socket +#[cfg(not(target_os = "ios"))] +#[cfg(debug_assertions)] +async fn send_response( + stream: &mut tokio::net::UnixStream, + response: ControlResponse, +) -> Result<()> { + use tokio::io::AsyncWriteExt; + + let bytes = response.to_bytes()?; + let len = bytes.len() as u32; + + stream.write_all(&len.to_le_bytes()).await?; + stream.write_all(&bytes).await?; + stream.flush().await?; + + Ok(()) +} + +// No-op stubs for iOS and release builds +#[cfg(any(target_os = "ios", not(debug_assertions)))] +pub fn start_control_socket_system() {} diff --git a/crates/app/src/main.rs b/crates/app/src/main.rs index 037e9f7..13368d1 100644 --- a/crates/app/src/main.rs +++ b/crates/app/src/main.rs @@ -3,6 +3,7 @@ //! This demonstrates real-time CRDT synchronization with Apple Pencil input. use bevy::prelude::*; +use clap::Parser; use libmarathon::{ engine::{ EngineBridge, @@ -16,6 +17,19 @@ use bevy::app::ScheduleRunnerPlugin; #[cfg(feature = "headless")] use std::time::Duration; +/// Marathon - CRDT-based collaborative editing engine +#[derive(Parser, Debug)] +#[command(version, about, long_about = None)] +struct Args { + /// Path to the database file + #[arg(long, default_value = "marathon.db")] + db_path: String, + + /// Path to the control socket (Unix domain socket) + #[arg(long, default_value = "/tmp/marathon-control.sock")] + control_socket: String, +} + mod camera; mod control; mod cube; @@ -40,6 +54,9 @@ use session::*; use session_ui::*; fn main() { + // Parse command-line arguments + let args = Args::parse(); + // Note: eprintln doesn't work on iOS, but tracing-oslog will once initialized eprintln!(">>> RUST ENTRY: main() started"); @@ -81,9 +98,8 @@ fn main() { // Application configuration const APP_NAME: &str = "Aspen"; - // Get platform-appropriate database path - eprintln!(">>> Getting database path"); - let db_path = libmarathon::platform::get_database_path(APP_NAME); + // Use database path from CLI args + let db_path = std::path::PathBuf::from(&args.db_path); let db_path_str = db_path.to_str().unwrap().to_string(); info!("Database path: {}", db_path_str); eprintln!(">>> Database path: {}", db_path_str); @@ -185,6 +201,9 @@ fn main() { app.add_plugins(EngineBridgePlugin); app.add_plugins(CubePlugin); app.add_systems(Startup, initialize_offline_resources); + + // Insert control socket path as resource + app.insert_resource(control::ControlSocketPath(args.control_socket.clone())); app.add_systems(Startup, control::start_control_socket_system); // Rendering-only plugins