feat(marathonctl): Add CLI arguments for multi-instance support
Added --db-path and --control-socket arguments to app binary to enable running multiple instances simultaneously. Updated marathonctl to use clap with --socket argument for targeting different instances. Enables multi-client testing with isolated databases and control sockets. Refs #131, #132 Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -267,6 +267,7 @@ dependencies = [
|
|||||||
"anyhow",
|
"anyhow",
|
||||||
"bevy",
|
"bevy",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
"clap",
|
||||||
"crossbeam-channel",
|
"crossbeam-channel",
|
||||||
"egui",
|
"egui",
|
||||||
"futures-lite",
|
"futures-lite",
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ futures-lite = "2.0"
|
|||||||
rkyv = { workspace = true }
|
rkyv = { workspace = true }
|
||||||
bytes = "1.0"
|
bytes = "1.0"
|
||||||
crossbeam-channel = "0.5.15"
|
crossbeam-channel = "0.5.15"
|
||||||
|
clap = { version = "4.0", features = ["derive"] }
|
||||||
|
|
||||||
[target.'cfg(target_os = "ios")'.dependencies]
|
[target.'cfg(target_os = "ios")'.dependencies]
|
||||||
objc = "0.2"
|
objc = "0.2"
|
||||||
|
|||||||
193
crates/app/src/bin/marathonctl.rs
Normal file
193
crates/app/src/bin/marathonctl.rs
Normal file
@@ -0,0 +1,193 @@
|
|||||||
|
//! Marathon control CLI
|
||||||
|
//!
|
||||||
|
//! Send control commands to a running Marathon instance via Unix domain socket.
|
||||||
|
//!
|
||||||
|
//! # Usage
|
||||||
|
//!
|
||||||
|
//! ```bash
|
||||||
|
//! # Get session status
|
||||||
|
//! marathonctl status
|
||||||
|
//!
|
||||||
|
//! # Start networking with a session
|
||||||
|
//! marathonctl start <session-code>
|
||||||
|
//!
|
||||||
|
//! # Use custom socket
|
||||||
|
//! marathonctl --socket /tmp/marathon1.sock status
|
||||||
|
//! ```
|
||||||
|
|
||||||
|
use clap::{Parser, Subcommand};
|
||||||
|
use std::io::{Read, Write};
|
||||||
|
use std::os::unix::net::UnixStream;
|
||||||
|
|
||||||
|
use libmarathon::networking::{ControlCommand, ControlResponse, SessionId};
|
||||||
|
|
||||||
|
/// Marathon control CLI
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
#[command(version, about, long_about = None)]
|
||||||
|
struct Args {
|
||||||
|
/// Path to the control socket
|
||||||
|
#[arg(long, default_value = "/tmp/marathon-control.sock")]
|
||||||
|
socket: String,
|
||||||
|
|
||||||
|
#[command(subcommand)]
|
||||||
|
command: Commands,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Subcommand, Debug)]
|
||||||
|
enum Commands {
|
||||||
|
/// Start networking with a session
|
||||||
|
Start {
|
||||||
|
/// Session code (e.g., abc-def-123)
|
||||||
|
session_code: String,
|
||||||
|
},
|
||||||
|
/// Stop networking
|
||||||
|
Stop,
|
||||||
|
/// Get current session status
|
||||||
|
Status,
|
||||||
|
/// Send a test message
|
||||||
|
Test {
|
||||||
|
/// Message content
|
||||||
|
content: String,
|
||||||
|
},
|
||||||
|
/// Broadcast a ping message
|
||||||
|
Ping,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
let args = Args::parse();
|
||||||
|
|
||||||
|
// Build command from subcommand
|
||||||
|
let command = match args.command {
|
||||||
|
Commands::Start { session_code } => ControlCommand::JoinSession { session_code },
|
||||||
|
Commands::Stop => ControlCommand::LeaveSession,
|
||||||
|
Commands::Status => ControlCommand::GetStatus,
|
||||||
|
Commands::Test { content } => ControlCommand::SendTestMessage { content },
|
||||||
|
Commands::Ping => {
|
||||||
|
use libmarathon::networking::{SyncMessage, VectorClock};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
// For ping, we send a SyncRequest (lightweight ping-like message)
|
||||||
|
let node_id = Uuid::new_v4();
|
||||||
|
ControlCommand::BroadcastMessage {
|
||||||
|
message: SyncMessage::SyncRequest {
|
||||||
|
node_id,
|
||||||
|
vector_clock: VectorClock::new(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Connect to Unix socket
|
||||||
|
let socket_path = &args.socket;
|
||||||
|
let mut stream = match UnixStream::connect(&socket_path) {
|
||||||
|
Ok(s) => s,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Failed to connect to {}: {}", socket_path, e);
|
||||||
|
eprintln!("Is the Marathon app running?");
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Send command
|
||||||
|
if let Err(e) = send_command(&mut stream, &command) {
|
||||||
|
eprintln!("Failed to send command: {}", e);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Receive response
|
||||||
|
match receive_response(&mut stream) {
|
||||||
|
Ok(response) => {
|
||||||
|
print_response(response);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Failed to receive response: {}", e);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_command(stream: &mut UnixStream, command: &ControlCommand) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let bytes = command.to_bytes()?;
|
||||||
|
let len = bytes.len() as u32;
|
||||||
|
|
||||||
|
// Write length prefix
|
||||||
|
stream.write_all(&len.to_le_bytes())?;
|
||||||
|
// Write command bytes
|
||||||
|
stream.write_all(&bytes)?;
|
||||||
|
stream.flush()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn receive_response(stream: &mut UnixStream) -> Result<ControlResponse, Box<dyn std::error::Error>> {
|
||||||
|
// Read length prefix
|
||||||
|
let mut len_buf = [0u8; 4];
|
||||||
|
stream.read_exact(&mut len_buf)?;
|
||||||
|
let len = u32::from_le_bytes(len_buf) as usize;
|
||||||
|
|
||||||
|
// Read response bytes
|
||||||
|
let mut response_buf = vec![0u8; len];
|
||||||
|
stream.read_exact(&mut response_buf)?;
|
||||||
|
|
||||||
|
// Deserialize response
|
||||||
|
let response = ControlResponse::from_bytes(&response_buf)?;
|
||||||
|
Ok(response)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn print_response(response: ControlResponse) {
|
||||||
|
use libmarathon::networking::{SessionInfo, PeerInfo};
|
||||||
|
|
||||||
|
match response {
|
||||||
|
ControlResponse::Status {
|
||||||
|
node_id,
|
||||||
|
session_id,
|
||||||
|
outgoing_queue_size,
|
||||||
|
incoming_queue_size,
|
||||||
|
connected_peers,
|
||||||
|
} => {
|
||||||
|
println!("Session Status:");
|
||||||
|
println!(" Node ID: {}", node_id);
|
||||||
|
println!(" Session: {}", session_id);
|
||||||
|
println!(" Outgoing Queue: {} messages", outgoing_queue_size);
|
||||||
|
println!(" Incoming Queue: {} messages", incoming_queue_size);
|
||||||
|
if let Some(peers) = connected_peers {
|
||||||
|
println!(" Connected Peers: {}", peers);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ControlResponse::SessionInfo(info) => {
|
||||||
|
println!("Session Info:");
|
||||||
|
println!(" ID: {}", info.session_id);
|
||||||
|
if let Some(ref name) = info.session_name {
|
||||||
|
println!(" Name: {}", name);
|
||||||
|
}
|
||||||
|
println!(" State: {:?}", info.state);
|
||||||
|
println!(" Entities: {}", info.entity_count);
|
||||||
|
println!(" Created: {}", info.created_at);
|
||||||
|
println!(" Last Active: {}", info.last_active);
|
||||||
|
}
|
||||||
|
ControlResponse::Sessions(sessions) => {
|
||||||
|
println!("Sessions ({} total):", sessions.len());
|
||||||
|
for session in sessions {
|
||||||
|
println!(" {}: {:?} ({} entities)", session.session_id, session.state, session.entity_count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ControlResponse::Peers(peers) => {
|
||||||
|
println!("Connected Peers ({} total):", peers.len());
|
||||||
|
for peer in peers {
|
||||||
|
print!(" {}", peer.node_id);
|
||||||
|
if let Some(since) = peer.connected_since {
|
||||||
|
println!(" (connected since: {})", since);
|
||||||
|
} else {
|
||||||
|
println!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ControlResponse::Ok { message } => {
|
||||||
|
println!("Success: {}", message);
|
||||||
|
}
|
||||||
|
ControlResponse::Error { error } => {
|
||||||
|
eprintln!("Error: {}", error);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
159
crates/app/src/control.rs
Normal file
159
crates/app/src/control.rs
Normal file
@@ -0,0 +1,159 @@
|
|||||||
|
//! Standalone control socket for engine control
|
||||||
|
//!
|
||||||
|
//! This control socket starts at app launch and allows external control
|
||||||
|
//! of the engine, including starting/stopping networking, before any
|
||||||
|
//! networking is initialized.
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use bevy::prelude::*;
|
||||||
|
use libmarathon::{
|
||||||
|
engine::{EngineBridge, EngineCommand},
|
||||||
|
networking::{ControlCommand, ControlResponse, SessionId},
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Resource holding the control socket path
|
||||||
|
#[derive(Resource)]
|
||||||
|
pub struct ControlSocketPath(pub String);
|
||||||
|
|
||||||
|
/// Startup system to launch the control socket server
|
||||||
|
#[cfg(not(target_os = "ios"))]
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
pub fn start_control_socket_system(socket_path_res: Res<ControlSocketPath>, bridge: Res<EngineBridge>) {
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
use tokio::net::UnixListener;
|
||||||
|
|
||||||
|
let socket_path = socket_path_res.0.clone();
|
||||||
|
info!("Starting control socket at {}", socket_path);
|
||||||
|
|
||||||
|
// Clone bridge for the async task
|
||||||
|
let bridge = bridge.clone();
|
||||||
|
|
||||||
|
// Spawn tokio runtime in background thread
|
||||||
|
std::thread::spawn(move || {
|
||||||
|
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||||
|
rt.block_on(async move {
|
||||||
|
// Clean up any existing socket
|
||||||
|
let _ = std::fs::remove_file(&socket_path);
|
||||||
|
|
||||||
|
let listener = match UnixListener::bind(&socket_path) {
|
||||||
|
Ok(l) => {
|
||||||
|
info!("Control socket listening at {}", socket_path);
|
||||||
|
l
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to bind control socket: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Accept connections in a loop
|
||||||
|
loop {
|
||||||
|
match listener.accept().await {
|
||||||
|
Ok((mut stream, _addr)) => {
|
||||||
|
let bridge = bridge.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Read command length
|
||||||
|
let mut len_buf = [0u8; 4];
|
||||||
|
if let Err(e) = stream.read_exact(&mut len_buf).await {
|
||||||
|
error!("Failed to read command length: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
let len = u32::from_le_bytes(len_buf) as usize;
|
||||||
|
|
||||||
|
// Read command bytes
|
||||||
|
let mut cmd_buf = vec![0u8; len];
|
||||||
|
if let Err(e) = stream.read_exact(&mut cmd_buf).await {
|
||||||
|
error!("Failed to read command: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deserialize command
|
||||||
|
let command = match ControlCommand::from_bytes(&cmd_buf) {
|
||||||
|
Ok(cmd) => cmd,
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to deserialize command: {}", e);
|
||||||
|
let response = ControlResponse::Error {
|
||||||
|
error: format!("Failed to deserialize: {}", e),
|
||||||
|
};
|
||||||
|
let _ = send_response(&mut stream, response).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
info!("Received control command: {:?}", command);
|
||||||
|
|
||||||
|
// Handle command
|
||||||
|
let response = handle_command(command, &bridge).await;
|
||||||
|
|
||||||
|
// Send response
|
||||||
|
if let Err(e) = send_response(&mut stream, response).await {
|
||||||
|
error!("Failed to send response: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to accept connection: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle a control command and generate a response
|
||||||
|
#[cfg(not(target_os = "ios"))]
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
async fn handle_command(command: ControlCommand, bridge: &EngineBridge) -> ControlResponse {
|
||||||
|
match command {
|
||||||
|
ControlCommand::JoinSession { session_code } => {
|
||||||
|
match SessionId::from_code(&session_code) {
|
||||||
|
Ok(session_id) => {
|
||||||
|
bridge.send_command(EngineCommand::StartNetworking {
|
||||||
|
session_id: session_id.clone(),
|
||||||
|
});
|
||||||
|
ControlResponse::Ok {
|
||||||
|
message: format!("Starting networking with session: {}", session_id),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => ControlResponse::Error {
|
||||||
|
error: format!("Invalid session code: {}", e),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ControlCommand::LeaveSession => {
|
||||||
|
bridge.send_command(EngineCommand::StopNetworking);
|
||||||
|
ControlResponse::Ok {
|
||||||
|
message: "Stopping networking".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => ControlResponse::Error {
|
||||||
|
error: format!("Command {:?} not yet implemented", command),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a response back through the Unix socket
|
||||||
|
#[cfg(not(target_os = "ios"))]
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
async fn send_response(
|
||||||
|
stream: &mut tokio::net::UnixStream,
|
||||||
|
response: ControlResponse,
|
||||||
|
) -> Result<()> {
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
|
||||||
|
let bytes = response.to_bytes()?;
|
||||||
|
let len = bytes.len() as u32;
|
||||||
|
|
||||||
|
stream.write_all(&len.to_le_bytes()).await?;
|
||||||
|
stream.write_all(&bytes).await?;
|
||||||
|
stream.flush().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// No-op stubs for iOS and release builds
|
||||||
|
#[cfg(any(target_os = "ios", not(debug_assertions)))]
|
||||||
|
pub fn start_control_socket_system() {}
|
||||||
@@ -3,6 +3,7 @@
|
|||||||
//! This demonstrates real-time CRDT synchronization with Apple Pencil input.
|
//! This demonstrates real-time CRDT synchronization with Apple Pencil input.
|
||||||
|
|
||||||
use bevy::prelude::*;
|
use bevy::prelude::*;
|
||||||
|
use clap::Parser;
|
||||||
use libmarathon::{
|
use libmarathon::{
|
||||||
engine::{
|
engine::{
|
||||||
EngineBridge,
|
EngineBridge,
|
||||||
@@ -16,6 +17,19 @@ use bevy::app::ScheduleRunnerPlugin;
|
|||||||
#[cfg(feature = "headless")]
|
#[cfg(feature = "headless")]
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
/// Marathon - CRDT-based collaborative editing engine
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
#[command(version, about, long_about = None)]
|
||||||
|
struct Args {
|
||||||
|
/// Path to the database file
|
||||||
|
#[arg(long, default_value = "marathon.db")]
|
||||||
|
db_path: String,
|
||||||
|
|
||||||
|
/// Path to the control socket (Unix domain socket)
|
||||||
|
#[arg(long, default_value = "/tmp/marathon-control.sock")]
|
||||||
|
control_socket: String,
|
||||||
|
}
|
||||||
|
|
||||||
mod camera;
|
mod camera;
|
||||||
mod control;
|
mod control;
|
||||||
mod cube;
|
mod cube;
|
||||||
@@ -40,6 +54,9 @@ use session::*;
|
|||||||
use session_ui::*;
|
use session_ui::*;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
// Parse command-line arguments
|
||||||
|
let args = Args::parse();
|
||||||
|
|
||||||
// Note: eprintln doesn't work on iOS, but tracing-oslog will once initialized
|
// Note: eprintln doesn't work on iOS, but tracing-oslog will once initialized
|
||||||
eprintln!(">>> RUST ENTRY: main() started");
|
eprintln!(">>> RUST ENTRY: main() started");
|
||||||
|
|
||||||
@@ -81,9 +98,8 @@ fn main() {
|
|||||||
// Application configuration
|
// Application configuration
|
||||||
const APP_NAME: &str = "Aspen";
|
const APP_NAME: &str = "Aspen";
|
||||||
|
|
||||||
// Get platform-appropriate database path
|
// Use database path from CLI args
|
||||||
eprintln!(">>> Getting database path");
|
let db_path = std::path::PathBuf::from(&args.db_path);
|
||||||
let db_path = libmarathon::platform::get_database_path(APP_NAME);
|
|
||||||
let db_path_str = db_path.to_str().unwrap().to_string();
|
let db_path_str = db_path.to_str().unwrap().to_string();
|
||||||
info!("Database path: {}", db_path_str);
|
info!("Database path: {}", db_path_str);
|
||||||
eprintln!(">>> Database path: {}", db_path_str);
|
eprintln!(">>> Database path: {}", db_path_str);
|
||||||
@@ -185,6 +201,9 @@ fn main() {
|
|||||||
app.add_plugins(EngineBridgePlugin);
|
app.add_plugins(EngineBridgePlugin);
|
||||||
app.add_plugins(CubePlugin);
|
app.add_plugins(CubePlugin);
|
||||||
app.add_systems(Startup, initialize_offline_resources);
|
app.add_systems(Startup, initialize_offline_resources);
|
||||||
|
|
||||||
|
// Insert control socket path as resource
|
||||||
|
app.insert_resource(control::ControlSocketPath(args.control_socket.clone()));
|
||||||
app.add_systems(Startup, control::start_control_socket_system);
|
app.add_systems(Startup, control::start_control_socket_system);
|
||||||
|
|
||||||
// Rendering-only plugins
|
// Rendering-only plugins
|
||||||
|
|||||||
Reference in New Issue
Block a user