diff --git a/.serena/memories/serialization-policy.md b/.serena/memories/serialization-policy.md new file mode 100644 index 0000000..ec78d43 --- /dev/null +++ b/.serena/memories/serialization-policy.md @@ -0,0 +1,11 @@ +# Serialization Policy + +**Never use serde for serialization in this project.** + +We use `rkyv` exclusively for all serialization needs: +- Network messages +- Component synchronization +- Persistence +- Any data serialization + +If a type from a dependency (like Bevy) doesn't support rkyv, we vendor it and add the rkyv derives ourselves. diff --git a/Cargo.lock b/Cargo.lock index 3b63a26..f91cb77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -272,9 +272,11 @@ dependencies = [ "egui", "futures-lite", "glam 0.29.3", + "inventory", "iroh", "iroh-gossip", "libmarathon", + "macros", "objc", "rand 0.8.5", "raw-window-handle", @@ -283,6 +285,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "tracing-appender", "tracing-oslog", "tracing-subscriber", "uuid", @@ -2366,6 +2369,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" + [[package]] name = "crc32fast" version = "1.5.0" @@ -3083,6 +3101,17 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -4605,6 +4634,7 @@ dependencies = [ "fixedbitset", "futures-lite", "glam 0.29.3", + "hex", "image", "indexmap", "inventory", @@ -4615,6 +4645,7 @@ dependencies = [ "naga", "nonmax", "offset-allocator", + "pkarr", "proptest", "radsort", "rand 0.8.5", @@ -4629,6 +4660,7 @@ dependencies = [ "tempfile", "thiserror 2.0.17", "tokio", + "tokio-util", "toml", "tracing", "tracing-oslog", @@ -4771,6 +4803,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "mainline" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ff27d378ca495eaf3be8616d5d7319c1c18e93fd60e13698fcdc7e19448f1a4" +dependencies = [ + "crc", + "document-features", + "dyn-clone", + "ed25519-dalek", + "flume", + "futures-lite", + "getrandom 0.3.4", + "lru 0.16.2", + "serde", + "serde_bencode", + "serde_bytes", + "sha1_smol", + "thiserror 2.0.17", + "tracing", +] + [[package]] name = "malloc_buf" version = "0.0.6" @@ -5853,6 +5907,7 @@ dependencies = [ "getrandom 0.3.4", "log", "lru 0.13.0", + "mainline", "ntimestamp", "reqwest", "self_cell", @@ -6860,6 +6915,16 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "serde_bencode" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a70dfc7b7438b99896e7f8992363ab8e2c4ba26aa5ec675d32d1c3c2c33d413e" +dependencies = [ + "serde", + "serde_bytes", +] + [[package]] name = "serde_bytes" version = "0.11.19" @@ -7437,11 +7502,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" dependencies = [ "deranged", + "itoa", "js-sys", "num-conv", "powerfmt", "serde", "time-core", + "time-macros", ] [[package]] @@ -7450,6 +7517,16 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" +[[package]] +name = "time-macros" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tiny-keccak" version = "2.0.2" @@ -7713,6 +7790,18 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "786d480bce6247ab75f005b14ae1624ad978d3029d9113f0a22fa1ac773faeaf" +dependencies = [ + "crossbeam-channel", + "thiserror 2.0.17", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.30" diff --git a/Cargo.toml b/Cargo.toml index c406488..201a382 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,9 +9,10 @@ edition = "2024" # Async runtime tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" +tokio-util = "0.7" # Iroh - P2P networking and gossip -iroh = { version = "0.95.0",features = ["discovery-local-network"] } +iroh = { version = "0.95.0", features = ["discovery-pkarr-dht"] } iroh-gossip = "0.95.0" # Database @@ -37,6 +38,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } # Random rand = "0.8" +# Encoding +hex = "0.4" + # ML/AI candle-core = "0.8" candle-nn = "0.8" diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 44a804a..371b051 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -11,6 +11,9 @@ headless = [] [dependencies] libmarathon = { path = "../libmarathon" } +macros = { path = "../macros" } +inventory = { workspace = true } +rkyv = { workspace = true } bevy = { version = "0.17", default-features = false, features = [ # bevy_render, bevy_core_pipeline, bevy_pbr are now vendored in libmarathon "bevy_ui", @@ -26,12 +29,12 @@ anyhow = "1.0" tokio = { version = "1", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +tracing-appender = "0.2" serde = { version = "1.0", features = ["derive"] } rand = "0.8" iroh = { version = "0.95", features = ["discovery-local-network"] } iroh-gossip = "0.95" futures-lite = "2.0" -rkyv = { workspace = true } bytes = "1.0" crossbeam-channel = "0.5.15" clap = { version = "4.0", features = ["derive"] } diff --git a/crates/app/src/control.rs b/crates/app/src/control.rs index 8b79123..d6491ac 100644 --- a/crates/app/src/control.rs +++ b/crates/app/src/control.rs @@ -17,11 +17,25 @@ use uuid::Uuid; #[derive(Resource)] pub struct ControlSocketPath(pub String); +/// Resource holding the shutdown sender for control socket +#[derive(Resource)] +pub struct ControlSocketShutdown(Option>); + pub fn cleanup_control_socket( mut exit_events: MessageReader, socket_path: Option>, + shutdown: Option>, ) { for _ in exit_events.read() { + // Send shutdown signal to control socket thread + if let Some(ref shutdown_res) = shutdown { + if let Some(ref sender) = shutdown_res.0 { + info!("Sending shutdown signal to control socket"); + let _ = sender.send(()); + } + } + + // Clean up socket file if let Some(ref path) = socket_path { info!("Cleaning up control socket at {}", path.0); let _ = std::fs::remove_file(&path.0); @@ -87,6 +101,10 @@ pub fn start_control_socket_system( let app_queue = AppCommandQueue::new(); commands.insert_resource(app_queue.clone()); + // Create shutdown channel + let (shutdown_tx, shutdown_rx) = unbounded::<()>(); + commands.insert_resource(ControlSocketShutdown(Some(shutdown_tx))); + // Clone bridge and queue for the async task let bridge = bridge.clone(); let queue = app_queue; @@ -109,14 +127,25 @@ pub fn start_control_socket_system( } }; - // Accept connections in a loop + // Accept connections in a loop with shutdown support loop { - match listener.accept().await { - Ok((mut stream, _addr)) => { - let bridge = bridge.clone(); + tokio::select! { + // Check for shutdown signal + _ = tokio::task::spawn_blocking({ + let rx = shutdown_rx.clone(); + move || rx.try_recv() + }) => { + info!("Control socket received shutdown signal"); + break; + } + // Accept new connection + result = listener.accept() => { + match result { + Ok((mut stream, _addr)) => { + let bridge = bridge.clone(); - let queue_clone = queue.clone(); - tokio::spawn(async move { + let queue_clone = queue.clone(); + tokio::spawn(async move { // Read command length let mut len_buf = [0u8; 4]; if let Err(e) = stream.read_exact(&mut len_buf).await { @@ -155,12 +184,15 @@ pub fn start_control_socket_system( error!("Failed to send response: {}", e); } }); - } - Err(e) => { - error!("Failed to accept connection: {}", e); + } + Err(e) => { + error!("Failed to accept connection: {}", e); + } + } } } } + info!("Control socket server shut down cleanly"); }); }); } @@ -270,4 +302,7 @@ async fn send_response( // No-op stubs for iOS and release builds #[cfg(any(target_os = "ios", not(debug_assertions)))] -pub fn start_control_socket_system() {} +pub fn start_control_socket_system(mut commands: Commands) { + // Insert empty shutdown resource for consistency + commands.insert_resource(ControlSocketShutdown(None)); +} diff --git a/crates/app/src/cube.rs b/crates/app/src/cube.rs index 84e6b42..a1480d4 100644 --- a/crates/app/src/cube.rs +++ b/crates/app/src/cube.rs @@ -1,27 +1,37 @@ //! Cube entity management use bevy::prelude::*; -use libmarathon::{ - networking::{ - NetworkEntityMap, - NetworkedEntity, - NetworkedSelection, - NetworkedTransform, - NodeVectorClock, - Synced, - }, - persistence::Persisted, -}; -use serde::{ - Deserialize, - Serialize, -}; +use libmarathon::networking::{NetworkEntityMap, Synced}; use uuid::Uuid; /// Marker component for the replicated cube -#[derive(Component, Reflect, Debug, Clone, Copy, Default, Serialize, Deserialize)] -#[reflect(Component)] -pub struct CubeMarker; +/// +/// This component contains all the data needed for rendering a cube. +/// The `#[synced]` attribute automatically handles network synchronization. +#[macros::synced] +pub struct CubeMarker { + /// RGB color values (0.0 to 1.0) + pub color_r: f32, + pub color_g: f32, + pub color_b: f32, + pub size: f32, +} + +impl CubeMarker { + pub fn with_color(color: Color, size: f32) -> Self { + let [r, g, b, _] = color.to_linear().to_f32_array(); + Self { + color_r: r, + color_g: g, + color_b: b, + size, + } + } + + pub fn color(&self) -> Color { + Color::srgb(self.color_r, self.color_g, self.color_b) + } +} /// Message to spawn a new cube at a specific position #[derive(Message)] @@ -39,10 +49,33 @@ pub struct CubePlugin; impl Plugin for CubePlugin { fn build(&self, app: &mut App) { - app.register_type::() - .add_message::() + app.add_message::() .add_message::() - .add_systems(Update, (handle_spawn_cube, handle_delete_cube)); + .add_systems(Update, ( + handle_spawn_cube, + handle_delete_cube, + add_cube_rendering_system, // Custom rendering! + )); + } +} + +/// Custom rendering system - detects Added and adds mesh/material +fn add_cube_rendering_system( + mut commands: Commands, + query: Query<(Entity, &CubeMarker), Added>, + mut meshes: ResMut>, + mut materials: ResMut>, +) { + for (entity, cube) in &query { + commands.entity(entity).insert(( + Mesh3d(meshes.add(Cuboid::new(cube.size, cube.size, cube.size))), + MeshMaterial3d(materials.add(StandardMaterial { + base_color: cube.color(), // Use the color() helper method + perceptual_roughness: 0.7, + metallic: 0.3, + ..default() + })), + )); } } @@ -50,42 +83,16 @@ impl Plugin for CubePlugin { fn handle_spawn_cube( mut commands: Commands, mut messages: MessageReader, - mut meshes: Option>>, - mut materials: Option>>, - node_clock: Res, ) { for event in messages.read() { - let entity_id = Uuid::new_v4(); - let node_id = node_clock.node_id; + info!("Spawning cube at {:?}", event.position); - info!("Spawning cube {} at {:?}", entity_id, event.position); - - let mut entity = commands.spawn(( - CubeMarker, + commands.spawn(( + CubeMarker::with_color(Color::srgb(0.8, 0.3, 0.6), 1.0), Transform::from_translation(event.position), GlobalTransform::default(), - // Networking - NetworkedEntity::with_id(entity_id, node_id), - NetworkedTransform, - NetworkedSelection::default(), - // Persistence - Persisted::with_id(entity_id), - // Sync marker - Synced, + Synced, // Auto-adds NetworkedEntity, Persisted, NetworkedTransform )); - - // Only add rendering components if assets are available (non-headless mode) - if let (Some(ref mut meshes), Some(ref mut materials)) = (meshes.as_mut(), materials.as_mut()) { - entity.insert(( - Mesh3d(meshes.add(Cuboid::new(1.0, 1.0, 1.0))), - MeshMaterial3d(materials.add(StandardMaterial { - base_color: Color::srgb(0.8, 0.3, 0.6), - perceptual_roughness: 0.7, - metallic: 0.3, - ..default() - })), - )); - } } } @@ -97,8 +104,14 @@ fn handle_delete_cube( ) { for event in messages.read() { if let Some(bevy_entity) = entity_map.get_entity(event.entity_id) { - info!("Deleting cube {}", event.entity_id); - commands.entity(bevy_entity).despawn(); + info!("Marking cube {} for deletion", event.entity_id); + // Add ToDelete marker - the handle_local_deletions_system will: + // 1. Increment vector clock + // 2. Create Delete operation + // 3. Record tombstone + // 4. Broadcast deletion to peers + // 5. Despawn entity locally + commands.entity(bevy_entity).insert(libmarathon::networking::ToDelete); } else { warn!("Attempted to delete unknown cube {}", event.entity_id); } diff --git a/crates/app/src/debug_ui.rs b/crates/app/src/debug_ui.rs index 5121982..9a3d14a 100644 --- a/crates/app/src/debug_ui.rs +++ b/crates/app/src/debug_ui.rs @@ -43,11 +43,10 @@ fn render_debug_ui( // Node information if let Some(clock) = &node_clock { ui.label(format!("Node ID: {}", &clock.node_id.to_string()[..8])); - // Show the current node's clock value (timestamp) - let current_timestamp = - clock.clock.clocks.get(&clock.node_id).copied().unwrap_or(0); - ui.label(format!("Clock: {}", current_timestamp)); - ui.label(format!("Known nodes: {}", clock.clock.clocks.len())); + // Show the sum of all timestamps (total operations across all nodes) + let total_ops: u64 = clock.clock.timestamps.values().sum(); + ui.label(format!("Clock: {} (total ops)", total_ops)); + ui.label(format!("Known nodes: {}", clock.clock.node_count())); } else { ui.label("Node: Not initialized"); } diff --git a/crates/app/src/engine_bridge.rs b/crates/app/src/engine_bridge.rs index c3b50e6..26c9a89 100644 --- a/crates/app/src/engine_bridge.rs +++ b/crates/app/src/engine_bridge.rs @@ -53,6 +53,7 @@ fn poll_engine_events( let events = (*bridge).poll_events(); if !events.is_empty() { + debug!("Polling {} engine events", events.len()); for event in events { match event { EngineEvent::NetworkingInitializing { session_id, status } => { @@ -113,10 +114,17 @@ fn poll_engine_events( } EngineEvent::PeerJoined { node_id } => { info!("Peer joined: {}", node_id); + + // Initialize peer in vector clock so it shows up in UI immediately + node_clock.clock.timestamps.entry(node_id).or_insert(0); + // TODO(Phase 3.3): Trigger sync } EngineEvent::PeerLeft { node_id } => { info!("Peer left: {}", node_id); + + // Remove peer from vector clock + node_clock.clock.timestamps.remove(&node_id); } EngineEvent::LockAcquired { entity_id, holder } => { debug!("Lock acquired: entity={}, holder={}", entity_id, holder); @@ -165,20 +173,15 @@ fn poll_engine_events( } } -/// Handle app exit to stop networking immediately +/// Handle app exit - send shutdown signal to EngineCore fn handle_app_exit( mut exit_events: MessageReader, bridge: Res, - current_session: Res, ) { for _ in exit_events.read() { - // If networking is active, send stop command - // Don't wait - the task will be aborted when the runtime shuts down - if current_session.session.state == SessionState::Active - || current_session.session.state == SessionState::Joining { - info!("App exiting, aborting networking immediately"); - bridge.send_command(EngineCommand::StopNetworking); - // Don't sleep - just let the app exit. The tokio runtime will clean up. - } + info!("App exiting - sending Shutdown command to EngineCore"); + bridge.send_command(EngineCommand::Shutdown); + // The EngineCore will receive the Shutdown command and gracefully exit + // its event loop, allowing the tokio runtime thread to complete } } diff --git a/crates/app/src/input/input_handler.rs b/crates/app/src/input/input_handler.rs index 575d603..50e44e2 100644 --- a/crates/app/src/input/input_handler.rs +++ b/crates/app/src/input/input_handler.rs @@ -6,9 +6,14 @@ use bevy::prelude::*; use libmarathon::{ engine::GameAction, platform::input::InputController, - networking::{EntityLockRegistry, NetworkedEntity, NetworkedSelection, NodeVectorClock}, + networking::{ + EntityLockRegistry, LocalSelection, NetworkedEntity, + NodeVectorClock, + }, }; +use crate::cube::CubeMarker; + use super::event_buffer::InputEventBuffer; pub struct InputHandlerPlugin; @@ -16,7 +21,9 @@ pub struct InputHandlerPlugin; impl Plugin for InputHandlerPlugin { fn build(&self, app: &mut App) { app.init_resource::() - .add_systems(Update, handle_game_actions); + // handle_game_actions updates selection - must run before release_locks_on_deselection_system + .add_systems(Update, handle_game_actions.before(libmarathon::networking::release_locks_on_deselection_system)) + .add_systems(PostUpdate, update_lock_visuals); } } @@ -46,9 +53,10 @@ fn to_bevy_vec2(v: glam::Vec2) -> bevy::math::Vec2 { fn handle_game_actions( input_buffer: Res, mut controller_res: ResMut, - mut lock_registry: ResMut, + lock_registry: Res, node_clock: Res, - mut cube_query: Query<(&NetworkedEntity, &mut Transform, &mut NetworkedSelection), With>, + mut selection: ResMut, + mut cube_query: Query<(&NetworkedEntity, &mut Transform), With>, camera_query: Query<(&Camera, &GlobalTransform)>, window_query: Query<&Window>, ) { @@ -65,14 +73,23 @@ fn handle_game_actions( for action in all_actions { match action { GameAction::SelectEntity { position } => { - apply_select_entity( + // Do raycasting to find which entity (if any) was clicked + let entity_id = raycast_entity( position, - &mut lock_registry, - node_id, - &mut cube_query, + &cube_query, &camera_query, &window_query, ); + + // Update selection + // The release_locks_on_deselection_system will automatically handle lock changes + selection.clear(); + if let Some(id) = entity_id { + selection.insert(id); + info!("Selected entity {}", id); + } else { + info!("Deselected all entities"); + } } GameAction::MoveEntity { delta } => { @@ -98,32 +115,32 @@ fn handle_game_actions( } } -/// Apply SelectEntity action - raycast to find clicked cube and select it -fn apply_select_entity( +/// Raycast to find which entity was clicked +/// +/// Returns the network ID of the closest entity hit by the ray, or None if nothing was hit. +fn raycast_entity( position: glam::Vec2, - lock_registry: &mut EntityLockRegistry, - node_id: uuid::Uuid, - cube_query: &mut Query<(&NetworkedEntity, &mut Transform, &mut NetworkedSelection), With>, + cube_query: &Query<(&NetworkedEntity, &mut Transform), With>, camera_query: &Query<(&Camera, &GlobalTransform)>, window_query: &Query<&Window>, -) { +) -> Option { // Get the camera and window let Ok((camera, camera_transform)) = camera_query.single() else { - return; + return None; }; let Ok(window) = window_query.single() else { - return; + return None; }; // Convert screen position to world ray let Some(ray) = screen_to_world_ray(position, camera, camera_transform, window) else { - return; + return None; }; // Find the closest cube hit by the ray let mut closest_hit: Option<(uuid::Uuid, f32)> = None; - for (networked, transform, _) in cube_query.iter() { + for (networked, transform) in cube_query.iter() { // Test ray against cube AABB (1x1x1 cube) if let Some(distance) = ray_aabb_intersection( ray.origin, @@ -137,31 +154,7 @@ fn apply_select_entity( } } - // If we hit a cube, clear all selections and select this one - if let Some((hit_entity_id, _)) = closest_hit { - // Clear all previous selections and locks - for (networked, _, mut selection) in cube_query.iter_mut() { - selection.clear(); - lock_registry.release(networked.network_id, node_id); - } - - // Select and lock the clicked cube - for (networked, _, mut selection) in cube_query.iter_mut() { - if networked.network_id == hit_entity_id { - selection.add(hit_entity_id); - let _ = lock_registry.try_acquire(hit_entity_id, node_id); - info!("Selected cube {}", hit_entity_id); - break; - } - } - } else { - // Clicked on empty space - deselect all - for (networked, _, mut selection) in cube_query.iter_mut() { - selection.clear(); - lock_registry.release(networked.network_id, node_id); - } - info!("Deselected all cubes"); - } + closest_hit.map(|(entity_id, _)| entity_id) } /// Apply MoveEntity action to locked cubes @@ -169,12 +162,12 @@ fn apply_move_entity( delta: glam::Vec2, lock_registry: &EntityLockRegistry, node_id: uuid::Uuid, - cube_query: &mut Query<(&NetworkedEntity, &mut Transform, &mut NetworkedSelection), With>, + cube_query: &mut Query<(&NetworkedEntity, &mut Transform), With>, ) { let bevy_delta = to_bevy_vec2(delta); let sensitivity = 0.01; // Scale factor - for (networked, mut transform, _) in cube_query.iter_mut() { + for (networked, mut transform) in cube_query.iter_mut() { if lock_registry.is_locked_by(networked.network_id, node_id, node_id) { transform.translation.x += bevy_delta.x * sensitivity; transform.translation.y -= bevy_delta.y * sensitivity; // Invert Y for screen coords @@ -187,12 +180,12 @@ fn apply_rotate_entity( delta: glam::Vec2, lock_registry: &EntityLockRegistry, node_id: uuid::Uuid, - cube_query: &mut Query<(&NetworkedEntity, &mut Transform, &mut NetworkedSelection), With>, + cube_query: &mut Query<(&NetworkedEntity, &mut Transform), With>, ) { let bevy_delta = to_bevy_vec2(delta); let sensitivity = 0.01; - for (networked, mut transform, _) in cube_query.iter_mut() { + for (networked, mut transform) in cube_query.iter_mut() { if lock_registry.is_locked_by(networked.network_id, node_id, node_id) { let rotation_x = Quat::from_rotation_y(bevy_delta.x * sensitivity); let rotation_y = Quat::from_rotation_x(-bevy_delta.y * sensitivity); @@ -206,11 +199,11 @@ fn apply_move_depth( delta: f32, lock_registry: &EntityLockRegistry, node_id: uuid::Uuid, - cube_query: &mut Query<(&NetworkedEntity, &mut Transform, &mut NetworkedSelection), With>, + cube_query: &mut Query<(&NetworkedEntity, &mut Transform), With>, ) { let sensitivity = 0.1; - for (networked, mut transform, _) in cube_query.iter_mut() { + for (networked, mut transform) in cube_query.iter_mut() { if lock_registry.is_locked_by(networked.network_id, node_id, node_id) { transform.translation.z += delta * sensitivity; } @@ -221,9 +214,9 @@ fn apply_move_depth( fn apply_reset_entity( lock_registry: &EntityLockRegistry, node_id: uuid::Uuid, - cube_query: &mut Query<(&NetworkedEntity, &mut Transform, &mut NetworkedSelection), With>, + cube_query: &mut Query<(&NetworkedEntity, &mut Transform), With>, ) { - for (networked, mut transform, _) in cube_query.iter_mut() { + for (networked, mut transform) in cube_query.iter_mut() { if lock_registry.is_locked_by(networked.network_id, node_id, node_id) { transform.translation = Vec3::ZERO; transform.rotation = Quat::IDENTITY; @@ -317,3 +310,38 @@ fn ray_aabb_intersection( Some(tmin) } } + +/// System to update visual appearance based on lock state +/// +/// Color scheme: +/// - Green: Locked by us (we can edit) +/// - Blue: Locked by someone else (they can edit, we can't) +/// - Pink: Not locked (nobody is editing) +fn update_lock_visuals( + lock_registry: Res, + node_clock: Res, + mut cubes: Query<(&NetworkedEntity, &mut MeshMaterial3d), With>, + mut materials: ResMut>, +) { + for (networked, material_handle) in cubes.iter_mut() { + let entity_id = networked.network_id; + + // Determine color based on lock state + let node_id = node_clock.node_id; + let color = if lock_registry.is_locked_by(entity_id, node_id, node_id) { + // Locked by us - green + Color::srgb(0.3, 0.8, 0.3) + } else if lock_registry.is_locked(entity_id, node_id) { + // Locked by someone else - blue + Color::srgb(0.3, 0.5, 0.9) + } else { + // Not locked - default pink + Color::srgb(0.8, 0.3, 0.6) + }; + + // Update material color + if let Some(mat) = materials.get_mut(&material_handle.0) { + mat.base_color = color; + } + } +} diff --git a/crates/app/src/lib.rs b/crates/app/src/lib.rs index aa42e18..1033fff 100644 --- a/crates/app/src/lib.rs +++ b/crates/app/src/lib.rs @@ -9,6 +9,7 @@ pub mod debug_ui; pub mod engine_bridge; pub mod input; pub mod rendering; +pub mod session_ui; pub mod setup; pub use cube::CubeMarker; diff --git a/crates/app/src/main.rs b/crates/app/src/main.rs index 9d5a924..c7b2dec 100644 --- a/crates/app/src/main.rs +++ b/crates/app/src/main.rs @@ -28,6 +28,22 @@ struct Args { /// Path to the control socket (Unix domain socket) #[arg(long, default_value = "/tmp/marathon-control.sock")] control_socket: String, + + /// Log level (trace, debug, info, warn, error) + #[arg(long, default_value = "info")] + log_level: String, + + /// Path to log file (relative to current directory) + #[arg(long, default_value = "marathon.log")] + log_file: String, + + /// Disable log file output (console only) + #[arg(long, default_value = "false")] + no_log_file: bool, + + /// Disable console output (file only) + #[arg(long, default_value = "false")] + no_console: bool, } mod camera; @@ -36,7 +52,6 @@ mod cube; mod debug_ui; mod engine_bridge; mod rendering; -mod selection; mod session; mod session_ui; mod setup; @@ -49,7 +64,6 @@ mod input; use camera::*; use cube::*; use rendering::*; -use selection::*; use session::*; use session_ui::*; @@ -84,13 +98,86 @@ fn main() { #[cfg(not(target_os = "ios"))] { - tracing_subscriber::fmt() - .with_env_filter( - tracing_subscriber::EnvFilter::from_default_env() - .add_directive("wgpu=error".parse().unwrap()) - .add_directive("naga=warn".parse().unwrap()), - ) - .init(); + use tracing_subscriber::prelude::*; + + // Parse log level from args + let default_level = args.log_level.parse::() + .unwrap_or_else(|_| { + eprintln!("Invalid log level '{}', using 'info'", args.log_level); + tracing::Level::INFO + }); + + // Build filter with default level and quieter dependencies + let filter = tracing_subscriber::EnvFilter::from_default_env() + .add_directive(default_level.into()) + .add_directive("wgpu=error".parse().unwrap()) + .add_directive("naga=warn".parse().unwrap()); + + // Build subscriber based on combination of flags + match (args.no_console, args.no_log_file) { + (false, false) => { + // Both console and file + let console_layer = tracing_subscriber::fmt::layer() + .with_writer(std::io::stdout); + + let log_path = std::path::PathBuf::from(&args.log_file); + let log_dir = log_path.parent().unwrap_or_else(|| std::path::Path::new(".")); + let log_filename = log_path.file_name().unwrap().to_str().unwrap(); + let file_appender = tracing_appender::rolling::never(log_dir, log_filename); + let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); + std::mem::forget(_guard); + let file_layer = tracing_subscriber::fmt::layer() + .with_writer(non_blocking) + .with_ansi(false); + + tracing_subscriber::registry() + .with(filter) + .with(console_layer) + .with(file_layer) + .init(); + + eprintln!(">>> Logs written to: {} and console", args.log_file); + } + (false, true) => { + // Console only + let console_layer = tracing_subscriber::fmt::layer() + .with_writer(std::io::stdout); + + tracing_subscriber::registry() + .with(filter) + .with(console_layer) + .init(); + + eprintln!(">>> Console logging only (no log file)"); + } + (true, false) => { + // File only + let log_path = std::path::PathBuf::from(&args.log_file); + let log_dir = log_path.parent().unwrap_or_else(|| std::path::Path::new(".")); + let log_filename = log_path.file_name().unwrap().to_str().unwrap(); + let file_appender = tracing_appender::rolling::never(log_dir, log_filename); + let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); + std::mem::forget(_guard); + let file_layer = tracing_subscriber::fmt::layer() + .with_writer(non_blocking) + .with_ansi(false); + + tracing_subscriber::registry() + .with(filter) + .with(file_layer) + .init(); + + eprintln!(">>> Logs written to: {} (console disabled)", args.log_file); + } + (true, true) => { + // Neither - warn but initialize anyway + tracing_subscriber::registry() + .with(filter) + .init(); + + eprintln!(">>> Warning: Both console and file logging disabled!"); + } + } } eprintln!(">>> Tracing subscriber initialized"); @@ -213,7 +300,7 @@ fn main() { app.add_plugins(CameraPlugin); app.add_plugins(RenderingPlugin); app.add_plugins(input::InputHandlerPlugin); - app.add_plugins(SelectionPlugin); + // SelectionPlugin removed - InputHandlerPlugin already handles selection via GameActions app.add_plugins(DebugUiPlugin); app.add_plugins(SessionUiPlugin); } diff --git a/crates/app/src/session_ui.rs b/crates/app/src/session_ui.rs index d4f2ecb..d10fc6e 100644 --- a/crates/app/src/session_ui.rs +++ b/crates/app/src/session_ui.rs @@ -6,7 +6,7 @@ use bevy::prelude::*; use libmarathon::{ debug_ui::{egui, EguiContexts, EguiPrimaryContextPass}, - engine::{EngineBridge, EngineCommand}, + engine::{EngineBridge, EngineCommand, NetworkingInitStatus}, networking::{CurrentSession, NodeVectorClock, SessionId, SessionState}, }; @@ -15,10 +15,16 @@ pub struct SessionUiPlugin; impl Plugin for SessionUiPlugin { fn build(&self, app: &mut App) { app.init_resource::() + .init_resource::() .add_systems(EguiPrimaryContextPass, session_ui_panel); } } +#[derive(Resource, Default)] +pub struct NetworkingStatus { + pub latest_status: Option, +} + #[derive(Resource, Default)] struct SessionUiState { join_code_input: String, @@ -31,6 +37,7 @@ fn session_ui_panel( current_session: Res, node_clock: Option>, bridge: Res, + networking_status: Res, ) { // Log session state for debugging debug!("Session UI: state={:?}, id={}", @@ -45,65 +52,107 @@ fn session_ui_panel( .default_pos([320.0, 10.0]) .default_width(280.0) .show(ctx, |ui| { - // Check if networking is active based on session state - if current_session.session.state == SessionState::Active { - // ONLINE MODE: Networking is active - ui.heading("Session (Online)"); - ui.separator(); + // Display UI based on session state + match current_session.session.state { + SessionState::Active => { + // ONLINE MODE: Networking is active + ui.heading("Session (Online)"); + ui.separator(); - ui.horizontal(|ui| { - ui.label("Code:"); - ui.code(current_session.session.id.to_code()); - if ui.small_button("šŸ“‹").clicked() { - // TODO: Copy to clipboard (requires clipboard API) - info!("Session code: {}", current_session.session.id.to_code()); - } - }); - - ui.label(format!("State: {:?}", current_session.session.state)); - - if let Some(clock) = node_clock.as_ref() { - ui.label(format!("Connected nodes: {}", clock.clock.clocks.len())); - } - - ui.add_space(10.0); - - // Stop networking button - if ui.button("šŸ”Œ Stop Networking").clicked() { - info!("Stopping networking"); - bridge.send_command(EngineCommand::StopNetworking); - } - } else { - // OFFLINE MODE: Networking not started or disconnected - ui.heading("Offline Mode"); - ui.separator(); - - ui.label("World is running offline"); - ui.label("Vector clock is tracking changes"); - - if let Some(clock) = node_clock.as_ref() { - let current_seq = clock.clock.clocks.get(&clock.node_id).copied().unwrap_or(0); - ui.label(format!("Local sequence: {}", current_seq)); - } - - ui.add_space(10.0); - - // Start networking button - if ui.button("🌐 Start Networking").clicked() { - info!("Starting networking (will create new session)"); - // Generate a new session ID on the fly - let new_session_id = libmarathon::networking::SessionId::new(); - info!("New session code: {}", new_session_id.to_code()); - bridge.send_command(EngineCommand::StartNetworking { - session_id: new_session_id, + ui.horizontal(|ui| { + ui.label("Code:"); + ui.code(current_session.session.id.to_code()); + if ui.small_button("šŸ“‹").clicked() { + // TODO: Copy to clipboard (requires clipboard API) + info!("Session code: {}", current_session.session.id.to_code()); + } }); + + ui.label(format!("State: {:?}", current_session.session.state)); + + if let Some(clock) = node_clock.as_ref() { + ui.label(format!("Connected nodes: {}", clock.clock.node_count())); + } + + ui.add_space(10.0); + + // Stop networking button + if ui.button("šŸ”Œ Stop Networking").clicked() { + info!("Stopping networking"); + bridge.send_command(EngineCommand::StopNetworking); + } } + SessionState::Joining => { + // INITIALIZING: Networking is starting up + ui.heading("Connecting..."); + ui.separator(); - ui.add_space(5.0); + // Display initialization status + if let Some(ref status) = networking_status.latest_status { + match status { + NetworkingInitStatus::CreatingEndpoint => { + ui.label("ā³ Creating network endpoint..."); + } + NetworkingInitStatus::EndpointReady => { + ui.label("āœ“ Network endpoint ready"); + } + NetworkingInitStatus::DiscoveringPeers { session_code, attempt } => { + ui.label(format!("šŸ” Discovering peers for session {}", session_code)); + ui.label(format!(" Attempt {}/3...", attempt)); + } + NetworkingInitStatus::PeersFound { count } => { + ui.label(format!("āœ“ Found {} peer(s)!", count)); + } + NetworkingInitStatus::NoPeersFound => { + ui.label("ℹ No existing peers found"); + ui.label(" (Creating new session)"); + } + NetworkingInitStatus::PublishingToDHT => { + ui.label("šŸ“” Publishing to DHT..."); + } + NetworkingInitStatus::InitializingGossip => { + ui.label("šŸ”§ Initializing gossip protocol..."); + } + } + } else { + ui.label("ā³ Initializing..."); + } - // Join existing session button - if ui.button("āž• Join Session").clicked() { - ui_state.show_join_dialog = true; + ui.add_space(10.0); + ui.label("Please wait..."); + } + _ => { + // OFFLINE MODE: Networking not started or disconnected + ui.heading("Offline Mode"); + ui.separator(); + + ui.label("World is running offline"); + ui.label("Vector clock is tracking changes"); + + if let Some(clock) = node_clock.as_ref() { + let current_seq = clock.clock.timestamps.get(&clock.node_id).copied().unwrap_or(0); + ui.label(format!("Local sequence: {}", current_seq)); + } + + ui.add_space(10.0); + + // Start networking button + if ui.button("🌐 Start Networking").clicked() { + info!("Starting networking (will create new session)"); + // Generate a new session ID on the fly + let new_session_id = libmarathon::networking::SessionId::new(); + info!("New session code: {}", new_session_id.to_code()); + bridge.send_command(EngineCommand::StartNetworking { + session_id: new_session_id, + }); + } + + ui.add_space(5.0); + + // Join existing session button + if ui.button("āž• Join Session").clicked() { + ui_state.show_join_dialog = true; + } } } }); @@ -114,7 +163,10 @@ fn session_ui_panel( .collapsible(false) .show(ctx, |ui| { ui.label("Enter session code (abc-def-123):"); - ui.text_edit_singleline(&mut ui_state.join_code_input); + let text_edit = ui.text_edit_singleline(&mut ui_state.join_code_input); + + // Auto-focus the text input when dialog opens + text_edit.request_focus(); ui.add_space(5.0); ui.label("Note: Joining requires app restart"); diff --git a/crates/app/tests/cube_sync_headless.rs b/crates/app/tests/cube_sync_headless.rs index 52da6f2..c137f38 100644 --- a/crates/app/tests/cube_sync_headless.rs +++ b/crates/app/tests/cube_sync_headless.rs @@ -107,9 +107,6 @@ mod test_utils { }, )); - // Register cube component types for reflection - app.register_type::(); - app } @@ -425,7 +422,7 @@ async fn test_cube_spawn_and_sync() -> Result<()> { let spawned_entity = app1 .world_mut() .spawn(( - CubeMarker, + CubeMarker::with_color(Color::srgb(0.8, 0.3, 0.6), 1.0), Transform::from_xyz(1.0, 2.0, 3.0), GlobalTransform::default(), NetworkedEntity::with_id(entity_id, node1_id), diff --git a/crates/libmarathon/Cargo.toml b/crates/libmarathon/Cargo.toml index 6de0ab4..e3d48f8 100644 --- a/crates/libmarathon/Cargo.toml +++ b/crates/libmarathon/Cargo.toml @@ -51,6 +51,7 @@ static_assertions = "1.1" blake3 = "1.5" blocking = "1.6" +hex.workspace = true bytemuck = { version = "1.14", features = ["derive"] } bytes = "1.0" chrono = { version = "0.4", features = ["serde"] } @@ -64,6 +65,7 @@ glam = "0.29" inventory.workspace = true iroh = { workspace = true, features = ["discovery-local-network"] } iroh-gossip.workspace = true +pkarr = "5.0" itertools = "0.14" rand = "0.8" raw-window-handle = "0.6" @@ -73,6 +75,7 @@ serde_json.workspace = true sha2 = "0.10" thiserror = "2.0" tokio.workspace = true +tokio-util.workspace = true toml.workspace = true tracing.workspace = true uuid = { version = "1.0", features = ["v4", "serde"] } @@ -91,6 +94,10 @@ tempfile = "3" proptest = "1.4" criterion = "0.5" +[features] +# Feature to skip expensive networking operations in tests +fast_tests = [] + [[bench]] name = "write_buffer" harness = false diff --git a/crates/libmarathon/src/engine/commands.rs b/crates/libmarathon/src/engine/commands.rs index 0e13de7..3235272 100644 --- a/crates/libmarathon/src/engine/commands.rs +++ b/crates/libmarathon/src/engine/commands.rs @@ -46,4 +46,7 @@ pub enum EngineCommand { // Clock TickClock, + + // Lifecycle + Shutdown, } diff --git a/crates/libmarathon/src/engine/core.rs b/crates/libmarathon/src/engine/core.rs index f11ad74..b6866e6 100644 --- a/crates/libmarathon/src/engine/core.rs +++ b/crates/libmarathon/src/engine/core.rs @@ -44,13 +44,19 @@ impl EngineCore { // Process commands as they arrive while let Some(cmd) = self.handle.command_rx.recv().await { - self.handle_command(cmd).await; + let should_continue = self.handle_command(cmd).await; + if !should_continue { + tracing::info!("EngineCore received shutdown command"); + break; + } } - tracing::info!("EngineCore shutting down (command channel closed)"); + tracing::info!("EngineCore shutting down"); } - async fn handle_command(&mut self, cmd: EngineCommand) { + /// Handle a command from Bevy + /// Returns true to continue running, false to shutdown + async fn handle_command(&mut self, cmd: EngineCommand) -> bool { match cmd { EngineCommand::StartNetworking { session_id } => { self.start_networking(session_id).await; @@ -74,11 +80,16 @@ impl EngineCore { EngineCommand::TickClock => { self.tick_clock(); } + EngineCommand::Shutdown => { + tracing::info!("Shutdown command received"); + return false; + } // TODO: Handle CRDT and lock commands in Phase 2 _ => { tracing::debug!("Unhandled command: {:?}", cmd); } } + true } fn tick_clock(&mut self) { @@ -98,6 +109,25 @@ impl EngineCore { tracing::info!("Starting networking initialization for session {}", session_id.to_code()); + // Test mode: Skip actual networking and send event immediately + #[cfg(feature = "fast_tests")] + { + let bridge = crate::networking::GossipBridge::new(self.node_id); + let _ = self.handle.event_tx.send(EngineEvent::NetworkingStarted { + session_id: session_id.clone(), + node_id: self.node_id, + bridge, + }); + tracing::info!("Networking started (test mode) for session {}", session_id.to_code()); + + // Create a dummy task that just waits + let task = tokio::spawn(async { + tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; + }); + self.networking_task = Some(task); + return; + } + // Create cancellation token for graceful shutdown let cancel_token = CancellationToken::new(); let cancel_token_clone = cancel_token.clone(); @@ -105,10 +135,10 @@ impl EngineCore { // Spawn NetworkingManager initialization in background to avoid blocking // DHT peer discovery can take 15+ seconds with retries let event_tx = self.handle.event_tx.clone(); - + // Create channel for progress updates let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel(); - + // Spawn task to forward progress updates to Bevy let event_tx_clone = event_tx.clone(); let session_id_clone = session_id.clone(); @@ -120,7 +150,7 @@ impl EngineCore { }); } }); - + let task = tokio::spawn(async move { match NetworkingManager::new(session_id.clone(), Some(progress_tx), cancel_token_clone.clone()).await { Ok((net_manager, bridge)) => { diff --git a/crates/libmarathon/src/engine/events.rs b/crates/libmarathon/src/engine/events.rs index 465cf39..3ebb4ad 100644 --- a/crates/libmarathon/src/engine/events.rs +++ b/crates/libmarathon/src/engine/events.rs @@ -4,9 +4,29 @@ use crate::networking::{NodeId, SessionId, VectorClock}; use bevy::prelude::*; use uuid::Uuid; +#[derive(Debug, Clone)] +pub enum NetworkingInitStatus { + CreatingEndpoint, + EndpointReady, + DiscoveringPeers { + session_code: String, + attempt: u8, + }, + PeersFound { + count: usize, + }, + NoPeersFound, + PublishingToDHT, + InitializingGossip, +} + #[derive(Debug, Clone)] pub enum EngineEvent { // Networking status + NetworkingInitializing { + session_id: SessionId, + status: NetworkingInitStatus, + }, NetworkingStarted { session_id: SessionId, node_id: NodeId, diff --git a/crates/libmarathon/src/engine/mod.rs b/crates/libmarathon/src/engine/mod.rs index 41a1f3f..4628be5 100644 --- a/crates/libmarathon/src/engine/mod.rs +++ b/crates/libmarathon/src/engine/mod.rs @@ -14,12 +14,13 @@ mod core; mod events; mod game_actions; mod networking; +mod peer_discovery; mod persistence; pub use bridge::{EngineBridge, EngineHandle}; pub use commands::EngineCommand; pub use core::EngineCore; -pub use events::EngineEvent; +pub use events::{EngineEvent, NetworkingInitStatus}; pub use game_actions::GameAction; pub use networking::NetworkingManager; pub use persistence::PersistenceManager; diff --git a/crates/libmarathon/src/engine/networking.rs b/crates/libmarathon/src/engine/networking.rs index 17831f6..b616d78 100644 --- a/crates/libmarathon/src/engine/networking.rs +++ b/crates/libmarathon/src/engine/networking.rs @@ -249,9 +249,31 @@ impl NetworkingManager { } Event::NeighborUp(peer) => { tracing::info!("Peer connected: {}", peer); + + // Convert PublicKey to NodeId for Bevy + let peer_bytes = peer.as_bytes(); + let mut node_id_bytes = [0u8; 16]; + node_id_bytes.copy_from_slice(&peer_bytes[..16]); + let peer_node_id = NodeId::from_bytes(node_id_bytes); + + // Notify Bevy of peer join + let _ = event_tx.send(EngineEvent::PeerJoined { + node_id: peer_node_id, + }); } Event::NeighborDown(peer) => { tracing::warn!("Peer disconnected: {}", peer); + + // Convert PublicKey to NodeId for Bevy + let peer_bytes = peer.as_bytes(); + let mut node_id_bytes = [0u8; 16]; + node_id_bytes.copy_from_slice(&peer_bytes[..16]); + let peer_node_id = NodeId::from_bytes(node_id_bytes); + + // Notify Bevy of peer leave + let _ = event_tx.send(EngineEvent::PeerLeft { + node_id: peer_node_id, + }); } Event::Lagged => { tracing::warn!("Event stream lagged"); diff --git a/crates/libmarathon/src/engine/peer_discovery.rs b/crates/libmarathon/src/engine/peer_discovery.rs new file mode 100644 index 0000000..922d89a --- /dev/null +++ b/crates/libmarathon/src/engine/peer_discovery.rs @@ -0,0 +1,151 @@ +//! DHT-based peer discovery for session collaboration +//! +//! Each peer publishes their EndpointId to the DHT using a session-derived pkarr key. +//! Other peers query the DHT to discover all peers in the session. + +use anyhow::Result; +use iroh::EndpointId; +use std::time::Duration; + +use crate::networking::SessionId; + +pub async fn publish_peer_to_dht( + session_id: &SessionId, + our_endpoint_id: EndpointId, + dht_client: &pkarr::Client, +) -> Result<()> { + use pkarr::dns::{self, rdata}; + use pkarr::dns::rdata::RData; + + let keypair = session_id.to_pkarr_keypair(); + let public_key = keypair.public_key(); + + // Query DHT for existing peers in this session + let existing_peers = match dht_client.resolve(&public_key).await { + Some(packet) => { + let mut peers = Vec::new(); + for rr in packet.all_resource_records() { + if let RData::TXT(txt) = &rr.rdata { + if let Ok(txt_str) = String::try_from(txt.clone()) { + if let Some(hex) = txt_str.strip_prefix("peer=") { + if let Ok(bytes) = hex::decode(hex) { + if bytes.len() == 32 { + if let Ok(endpoint_id) = EndpointId::from_bytes(&bytes.try_into().unwrap()) { + // Don't include ourselves if we're already in the list + if endpoint_id != our_endpoint_id { + peers.push(endpoint_id); + } + } + } + } + } + } + } + } + peers + } + None => Vec::new(), + }; + + // Build packet with all peers (existing + ourselves) + let name = dns::Name::new("_peers").expect("constant"); + let mut builder = pkarr::SignedPacket::builder(); + + // Add TXT record for each existing peer + for peer in existing_peers { + let peer_hex = hex::encode(peer.as_bytes()); + let peer_str = format!("peer={}", peer_hex); + let mut txt = rdata::TXT::new(); + txt.add_string(&peer_str)?; + builder = builder.txt(name.clone(), txt.into_owned(), 3600); + } + + // Add TXT record for ourselves + let our_hex = hex::encode(our_endpoint_id.as_bytes()); + let our_str = format!("peer={}", our_hex); + let mut our_txt = rdata::TXT::new(); + our_txt.add_string(&our_str)?; + builder = builder.txt(name, our_txt.into_owned(), 3600); + + // Build and sign the packet + let signed_packet = builder.build(&keypair)?; + + // Publish to DHT + dht_client.publish(&signed_packet, None).await?; + + tracing::info!( + "Published peer {} to DHT for session {}", + our_endpoint_id.fmt_short(), + session_id.to_code() + ); + + Ok(()) +} + +pub async fn discover_peers_from_dht( + session_id: &SessionId, + dht_client: &pkarr::Client, +) -> Result> { + use pkarr::dns::rdata::RData; + + let keypair = session_id.to_pkarr_keypair(); + let public_key = keypair.public_key(); + + // Query DHT for the session's public key + let signed_packet = match dht_client.resolve(&public_key).await { + Some(packet) => packet, + None => { + tracing::debug!("No peers found in DHT for session {}", session_id.to_code()); + return Ok(vec![]); + } + }; + + // Parse TXT records to extract peer endpoint IDs + let mut peers = Vec::new(); + + for rr in signed_packet.all_resource_records() { + if let RData::TXT(txt) = &rr.rdata { + // Try to parse as a String + if let Ok(txt_str) = String::try_from(txt.clone()) { + // Parse "peer=" + if let Some(hex) = txt_str.strip_prefix("peer=") { + if let Ok(bytes) = hex::decode(hex) { + if bytes.len() == 32 { + if let Ok(endpoint_id) = EndpointId::from_bytes(&bytes.try_into().unwrap()) { + peers.push(endpoint_id); + } + } + } + } + } + } + } + + tracing::info!( + "Discovered {} peers from DHT for session {}", + peers.len(), + session_id.to_code() + ); + + Ok(peers) +} + +/// Periodically republishes our presence to the DHT +/// +/// Should be called in a background task to maintain our DHT presence. +/// Republishes every 30 minutes (well before the 1-hour TTL expires). +pub async fn maintain_dht_presence( + session_id: SessionId, + our_endpoint_id: EndpointId, + dht_client: pkarr::Client, +) { + let mut interval = tokio::time::interval(Duration::from_secs(30 * 60)); // 30 minutes + + loop { + interval.tick().await; + + if let Err(e) = publish_peer_to_dht(&session_id, our_endpoint_id, &dht_client).await { + tracing::warn!("Failed to republish to DHT: {}", e); + } + } +} diff --git a/crates/libmarathon/src/lib.rs b/crates/libmarathon/src/lib.rs index 2cdbde7..3e46de8 100644 --- a/crates/libmarathon/src/lib.rs +++ b/crates/libmarathon/src/lib.rs @@ -29,6 +29,7 @@ pub mod networking; pub mod persistence; pub mod platform; pub mod render; // Vendored Bevy rendering (bevy_render + bevy_core_pipeline + bevy_pbr) +pub mod transform; // Vendored Transform with rkyv support pub mod utils; pub mod sync; diff --git a/crates/libmarathon/src/networking/components.rs b/crates/libmarathon/src/networking/components.rs index 71d9b6f..c26e871 100644 --- a/crates/libmarathon/src/networking/components.rs +++ b/crates/libmarathon/src/networking/components.rs @@ -156,49 +156,36 @@ impl Default for NetworkedEntity { #[reflect(Component)] pub struct NetworkedTransform; -/// Wrapper for a selection component using OR-Set semantics +/// Local selection tracking resource /// -/// Tracks a set of selected entity network IDs. Uses OR-Set (Observed-Remove) -/// CRDT to handle concurrent add/remove operations correctly. +/// This global resource tracks which entities are currently selected by THIS node. +/// It's used in conjunction with the entity lock system to coordinate concurrent editing. /// -/// # OR-Set Semantics -/// -/// - Concurrent adds and removes: add wins -/// - Each add has a unique operation ID -/// - Removes reference specific add operation IDs +/// **Selections are local-only UI state** and are NOT synchronized across the network. +/// Each node maintains its own independent selection. /// /// # Example /// /// ``` /// use bevy::prelude::*; -/// use libmarathon::networking::{ -/// NetworkedEntity, -/// NetworkedSelection, -/// }; +/// use libmarathon::networking::LocalSelection; /// use uuid::Uuid; /// -/// fn create_selection(mut commands: Commands) { -/// let node_id = Uuid::new_v4(); -/// let mut selection = NetworkedSelection::new(); +/// fn handle_click(mut selection: ResMut) { +/// // Clear previous selection +/// selection.clear(); /// -/// // Add some entities to the selection -/// selection.selected_ids.insert(Uuid::new_v4()); -/// selection.selected_ids.insert(Uuid::new_v4()); -/// -/// commands.spawn((NetworkedEntity::new(node_id), selection)); +/// // Select a new entity +/// selection.insert(Uuid::new_v4()); /// } /// ``` -#[derive(Component, Reflect, Debug, Clone, Default)] -#[reflect(Component)] -pub struct NetworkedSelection { +#[derive(Resource, Debug, Clone, Default)] +pub struct LocalSelection { /// Set of selected entity network IDs - /// - /// This will be synchronized using OR-Set CRDT semantics in later phases. - /// For now, it's a simple HashSet. - pub selected_ids: std::collections::HashSet, + selected_ids: std::collections::HashSet, } -impl NetworkedSelection { +impl LocalSelection { /// Create a new empty selection pub fn new() -> Self { Self { @@ -207,13 +194,13 @@ impl NetworkedSelection { } /// Add an entity to the selection - pub fn add(&mut self, entity_id: uuid::Uuid) { - self.selected_ids.insert(entity_id); + pub fn insert(&mut self, entity_id: uuid::Uuid) -> bool { + self.selected_ids.insert(entity_id) } /// Remove an entity from the selection - pub fn remove(&mut self, entity_id: uuid::Uuid) { - self.selected_ids.remove(&entity_id); + pub fn remove(&mut self, entity_id: uuid::Uuid) -> bool { + self.selected_ids.remove(&entity_id) } /// Check if an entity is selected @@ -235,6 +222,11 @@ impl NetworkedSelection { pub fn is_empty(&self) -> bool { self.selected_ids.is_empty() } + + /// Get an iterator over selected entity IDs + pub fn iter(&self) -> impl Iterator { + self.selected_ids.iter() + } } /// Wrapper for a drawing path component using Sequence CRDT semantics @@ -361,18 +353,18 @@ mod tests { } #[test] - fn test_networked_selection() { - let mut selection = NetworkedSelection::new(); + fn test_local_selection() { + let mut selection = LocalSelection::new(); let id1 = uuid::Uuid::new_v4(); let id2 = uuid::Uuid::new_v4(); assert!(selection.is_empty()); - selection.add(id1); + selection.insert(id1); assert_eq!(selection.len(), 1); assert!(selection.contains(id1)); - selection.add(id2); + selection.insert(id2); assert_eq!(selection.len(), 2); assert!(selection.contains(id2)); diff --git a/crates/libmarathon/src/networking/delta_generation.rs b/crates/libmarathon/src/networking/delta_generation.rs index a63d92e..f009d34 100644 --- a/crates/libmarathon/src/networking/delta_generation.rs +++ b/crates/libmarathon/src/networking/delta_generation.rs @@ -66,10 +66,8 @@ impl NodeVectorClock { /// App::new().add_systems(Update, generate_delta_system); /// ``` pub fn generate_delta_system(world: &mut World) { - // Check if bridge exists - if world.get_resource::().is_none() { - return; - } + // Works both online and offline - clock increments and operations are recorded + // Broadcast only happens when online let changed_entities: Vec<(Entity, uuid::Uuid, uuid::Uuid)> = { let mut query = @@ -93,7 +91,7 @@ pub fn generate_delta_system(world: &mut World) { for (entity, network_id, _owner_node_id) in changed_entities { // Phase 1: Check and update clocks, collect data let mut system_state: bevy::ecs::system::SystemState<( - Res, + Option>, Res, ResMut, ResMut, @@ -144,31 +142,41 @@ pub fn generate_delta_system(world: &mut World) { // Create EntityDelta let delta = EntityDelta::new(network_id, node_id, vector_clock.clone(), operations); - // Record in operation log for anti-entropy + // Record in operation log for anti-entropy (works offline!) if let Some(ref mut log) = operation_log { log.record_operation(delta.clone()); } - // Wrap in VersionedMessage - let message = VersionedMessage::new(SyncMessage::EntityDelta { - entity_id: delta.entity_id, - node_id: delta.node_id, - vector_clock: delta.vector_clock.clone(), - operations: delta.operations.clone(), - }); + // Broadcast if online + if let Some(ref bridge) = bridge { + // Wrap in VersionedMessage + let message = VersionedMessage::new(SyncMessage::EntityDelta { + entity_id: delta.entity_id, + node_id: delta.node_id, + vector_clock: delta.vector_clock.clone(), + operations: delta.operations.clone(), + }); - // Broadcast - if let Err(e) = bridge.send(message) { - error!("Failed to broadcast EntityDelta: {}", e); + // Broadcast to peers + if let Err(e) = bridge.send(message) { + error!("Failed to broadcast EntityDelta: {}", e); + } else { + debug!( + "Broadcast EntityDelta for entity {:?} with {} operations", + network_id, + delta.operations.len() + ); + } } else { debug!( - "Broadcast EntityDelta for entity {:?} with {} operations", - network_id, - delta.operations.len() + "Generated EntityDelta for entity {:?} offline (will sync when online)", + network_id ); - last_versions.update(network_id, current_seq); } + // Update last sync version (both online and offline) + last_versions.update(network_id, current_seq); + delta }; diff --git a/crates/libmarathon/src/networking/locks.rs b/crates/libmarathon/src/networking/locks.rs index dd26adf..27bb81b 100644 --- a/crates/libmarathon/src/networking/locks.rs +++ b/crates/libmarathon/src/networking/locks.rs @@ -47,7 +47,6 @@ use uuid::Uuid; use crate::networking::{ GossipBridge, - NetworkedSelection, NodeId, VersionedMessage, delta_generation::NodeVectorClock, @@ -334,10 +333,63 @@ impl EntityLockRegistry { } } +/// System to acquire locks when entities are selected +/// +/// This system detects when entities are added to the global `LocalSelection` +/// resource and attempts to acquire locks on those entities, broadcasting +/// the request to other peers. +pub fn acquire_locks_on_selection_system( + mut registry: ResMut, + node_clock: Res, + bridge: Option>, + selection: Res, +) { + // Only run when selection changes + if !selection.is_changed() { + return; + } + + let node_id = node_clock.node_id; + + // Try to acquire locks for all selected entities + for &entity_id in selection.iter() { + let already_locked = registry.is_locked_by(entity_id, node_id, node_id); + + // Only try to acquire if we don't already hold the lock + if !already_locked { + match registry.try_acquire(entity_id, node_id) { + Ok(()) => { + info!("Acquired lock on newly selected entity {}", entity_id); + + // Broadcast LockRequest + if let Some(ref bridge) = bridge { + let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRequest { + entity_id, + node_id, + })); + + if let Err(e) = bridge.send(msg) { + error!("Failed to broadcast LockRequest on selection: {}", e); + } else { + debug!("LockRequest broadcast successful for entity {}", entity_id); + } + } else { + warn!("No GossipBridge available to broadcast LockRequest"); + } + } + Err(holder) => { + warn!("Failed to acquire lock on selected entity {} (held by {})", entity_id, holder); + } + } + } + } +} + /// System to release locks when entities are deselected /// -/// This system detects when entities are removed from selection and releases -/// any locks held on those entities, broadcasting the release to other peers. +/// This system detects when entities are removed from the global `LocalSelection` +/// resource and releases any locks held on those entities, broadcasting the release +/// to other peers. /// /// Add to your app as an Update system: /// ```no_run @@ -350,42 +402,46 @@ pub fn release_locks_on_deselection_system( mut registry: ResMut, node_clock: Res, bridge: Option>, - mut selection_query: Query<&mut NetworkedSelection, Changed>, + selection: Res, ) { + // Only run when selection changes + if !selection.is_changed() { + return; + } + let node_id = node_clock.node_id; - for selection in selection_query.iter_mut() { - // Find entities that were previously locked but are no longer selected - let currently_selected: std::collections::HashSet = selection.selected_ids.clone(); + // Check all locks held by this node + let locks_to_release: Vec = registry + .locks + .iter() + .filter(|(entity_id, lock)| { + // Release if held by us and not currently selected + lock.holder == node_id && !selection.contains(**entity_id) + }) + .map(|(entity_id, _)| *entity_id) + .collect(); - // Check all locks held by this node - let locks_to_release: Vec = registry - .locks - .iter() - .filter(|(entity_id, lock)| { - // Release if held by us and not currently selected - lock.holder == node_id && !currently_selected.contains(entity_id) - }) - .map(|(entity_id, _)| *entity_id) - .collect(); + if !locks_to_release.is_empty() { + info!("Selection cleared, releasing {} locks", locks_to_release.len()); + } - // Release each lock and broadcast - for entity_id in locks_to_release { - if registry.release(entity_id, node_id) { - debug!("Releasing lock on deselected entity {}", entity_id); + // Release each lock and broadcast + for entity_id in locks_to_release { + if registry.release(entity_id, node_id) { + info!("Released lock on deselected entity {}", entity_id); - // Broadcast LockRelease - if let Some(ref bridge) = bridge { - let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRelease { - entity_id, - node_id, - })); + // Broadcast LockRelease + if let Some(ref bridge) = bridge { + let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRelease { + entity_id, + node_id, + })); - if let Err(e) = bridge.send(msg) { - error!("Failed to broadcast LockRelease on deselection: {}", e); - } else { - info!("Lock released on deselection: entity {}", entity_id); - } + if let Err(e) = bridge.send(msg) { + error!("Failed to broadcast LockRelease on deselection: {}", e); + } else { + info!("Lock released on deselection: entity {}", entity_id); } } } diff --git a/crates/libmarathon/src/networking/merge.rs b/crates/libmarathon/src/networking/merge.rs index 92d6f22..c16a2b4 100644 --- a/crates/libmarathon/src/networking/merge.rs +++ b/crates/libmarathon/src/networking/merge.rs @@ -121,8 +121,8 @@ pub fn should_apply_set(local_op: &ComponentOp, remote_op: &ComponentOp) -> bool // Use the sequence number from the clocks as a simple tiebreaker // In a real implementation, we'd use the full node IDs - let local_seq: u64 = local_clock.clocks.values().sum(); - let remote_seq: u64 = remote_clock.clocks.values().sum(); + let local_seq: u64 = local_clock.timestamps.values().sum(); + let remote_seq: u64 = remote_clock.timestamps.values().sum(); // Compare clocks match compare_operations_lww( diff --git a/crates/libmarathon/src/networking/message_dispatcher.rs b/crates/libmarathon/src/networking/message_dispatcher.rs index 9c73360..40c8237 100644 --- a/crates/libmarathon/src/networking/message_dispatcher.rs +++ b/crates/libmarathon/src/networking/message_dispatcher.rs @@ -449,7 +449,6 @@ fn build_full_state_from_data( // Skip networked wrapper components if type_path.ends_with("::NetworkedEntity") || type_path.ends_with("::NetworkedTransform") || - type_path.ends_with("::NetworkedSelection") || type_path.ends_with("::NetworkedDrawingPath") { continue; diff --git a/crates/libmarathon/src/networking/messages.rs b/crates/libmarathon/src/networking/messages.rs index 91ae734..6f34f83 100644 --- a/crates/libmarathon/src/networking/messages.rs +++ b/crates/libmarathon/src/networking/messages.rs @@ -26,6 +26,13 @@ pub struct VersionedMessage { /// The actual sync message pub message: SyncMessage, + + /// Timestamp (nanos since UNIX epoch) to make messages unique + /// + /// This prevents iroh-gossip from deduplicating identical messages sent at different times. + /// For example, releasing and re-acquiring a lock sends identical LockRequest messages, + /// but they need to be treated as separate events. + pub timestamp_nanos: u64, } impl VersionedMessage { @@ -34,9 +41,17 @@ impl VersionedMessage { /// Create a new versioned message with the current protocol version pub fn new(message: SyncMessage) -> Self { + use std::time::{SystemTime, UNIX_EPOCH}; + + let timestamp_nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64; + Self { version: Self::CURRENT_VERSION, message, + timestamp_nanos, } } } diff --git a/crates/libmarathon/src/networking/mod.rs b/crates/libmarathon/src/networking/mod.rs index f04847d..ec30b2e 100644 --- a/crates/libmarathon/src/networking/mod.rs +++ b/crates/libmarathon/src/networking/mod.rs @@ -120,11 +120,13 @@ pub fn spawn_networked_entity( ) -> bevy::prelude::Entity { use bevy::prelude::*; - // Spawn with both NetworkedEntity and Persisted components + // Spawn with NetworkedEntity, Persisted, and Synced components + // The Synced marker triggers auto-insert of NetworkedTransform if entity has Transform let entity = world .spawn(( NetworkedEntity::with_id(entity_id, node_id), crate::persistence::Persisted::with_id(entity_id), + Synced, )) .id(); diff --git a/crates/libmarathon/src/networking/plugin.rs b/crates/libmarathon/src/networking/plugin.rs index d17ab07..9693a90 100644 --- a/crates/libmarathon/src/networking/plugin.rs +++ b/crates/libmarathon/src/networking/plugin.rs @@ -34,6 +34,7 @@ use crate::networking::{ LastSyncVersions, auto_detect_transform_changes_system, }, + components::{NetworkedEntity, NetworkedTransform}, delta_generation::{ NodeVectorClock, generate_delta_system, @@ -43,8 +44,10 @@ use crate::networking::{ cleanup_despawned_entities_system, register_networked_entities_system, }, + gossip_bridge::GossipBridge, locks::{ EntityLockRegistry, + acquire_locks_on_selection_system, broadcast_lock_heartbeats_system, cleanup_expired_locks_system, release_locks_on_deselection_system, @@ -59,6 +62,7 @@ use crate::networking::{ initialize_session_system, save_session_on_shutdown_system, }, + sync_component::Synced, tombstones::{ TombstoneRegistry, garbage_collect_tombstones_system, @@ -142,6 +146,104 @@ impl SessionSecret { } } +/// System that auto-inserts required sync components when `Synced` marker is detected. +/// +/// This system runs in PreUpdate and automatically adds: +/// - `NetworkedEntity` with a new UUID and node ID +/// - `Persisted` with the same UUID +/// - `NetworkedTransform` if the entity has a `Transform` component +/// +/// Note: Selection is now a global `LocalSelection` resource, not a per-entity component. +/// +/// This eliminates the need for users to manually add these components when spawning synced entities. +fn auto_insert_sync_components( + mut commands: Commands, + query: Query, Without)>, + node_clock: Res, + // We need access to check if entity has Transform + transforms: Query<&Transform>, +) { + for entity in &query { + let entity_id = uuid::Uuid::new_v4(); + let node_id = node_clock.node_id; + + // Always add NetworkedEntity and Persisted + let mut entity_commands = commands.entity(entity); + entity_commands.insert(( + NetworkedEntity::with_id(entity_id, node_id), + crate::persistence::Persisted::with_id(entity_id), + )); + + // Auto-add NetworkedTransform if entity has Transform + if transforms.contains(entity) { + entity_commands.insert(NetworkedTransform); + } + + debug!("Auto-inserted sync components for entity {:?} (UUID: {})", entity, entity_id); + } +} + +/// System that adds NetworkedTransform to networked entities when Transform is added. +/// +/// This handles entities received from the network that already have NetworkedEntity, +/// Persisted, and Synced, but need NetworkedTransform when Transform is added. +fn auto_insert_networked_transform( + mut commands: Commands, + query: Query< + Entity, + ( + With, + With, + Added, + Without, + ), + >, +) { + for entity in &query { + commands.entity(entity).insert(NetworkedTransform); + debug!("Auto-inserted NetworkedTransform for networked entity {:?}", entity); + } +} + +/// System that triggers anti-entropy sync when going online (GossipBridge added). +/// +/// This handles the offline-to-online transition: when GossipBridge is inserted, +/// we immediately send a SyncRequest to trigger anti-entropy and broadcast all +/// operations from the operation log. +/// +/// Uses a Local resource to track if we've already sent the sync request, so this only runs once. +fn trigger_sync_on_connect( + mut has_synced: Local, + bridge: Res, + node_clock: Res, + operation_log: Res, +) { + if *has_synced { + return; // Already did this + } + + let op_count = operation_log.total_operations(); + debug!( + "Going online: triggering anti-entropy sync to broadcast {} offline operations", + op_count + ); + + // Send a SyncRequest to trigger anti-entropy + // This will cause the message_dispatcher to respond with all operations from our log + let request = crate::networking::operation_log::build_sync_request( + node_clock.node_id, + node_clock.clock.clone(), + ); + + if let Err(e) = bridge.send(request) { + error!("Failed to send SyncRequest on connect: {}", e); + } else { + debug!("Sent SyncRequest to trigger anti-entropy sync"); + } + + *has_synced = true; +} + /// Bevy plugin for CRDT networking /// /// This plugin sets up all systems and resources needed for distributed @@ -236,7 +338,8 @@ impl Plugin for NetworkingPlugin { .insert_resource(OperationLog::new()) .insert_resource(TombstoneRegistry::new()) .insert_resource(EntityLockRegistry::new()) - .insert_resource(crate::networking::ComponentVectorClocks::new()); + .insert_resource(crate::networking::ComponentVectorClocks::new()) + .insert_resource(crate::networking::LocalSelection::new()); // Startup systems - initialize session from persistence app.add_systems(Startup, initialize_session_system); @@ -245,12 +348,16 @@ impl Plugin for NetworkingPlugin { app.add_systems( PreUpdate, ( + // Auto-insert sync components when Synced marker is added (must run first) + auto_insert_sync_components, // Register new networked entities register_networked_entities_system, // Central message dispatcher - handles all incoming messages // This replaces the individual message handling systems and // eliminates O(n²) behavior from multiple systems polling the same queue message_dispatcher_system, + // Auto-insert NetworkedTransform for networked entities when Transform is added + auto_insert_networked_transform, ) .chain(), ); @@ -263,11 +370,20 @@ impl Plugin for NetworkingPlugin { auto_detect_transform_changes_system, // Handle local entity deletions handle_local_deletions_system, + // Acquire locks when entities are selected + acquire_locks_on_selection_system, // Release locks when entities are deselected release_locks_on_deselection_system, ), ); + // Trigger anti-entropy sync when going online (separate from chain to allow conditional execution) + app.add_systems( + PostUpdate, + trigger_sync_on_connect + .run_if(bevy::ecs::schedule::common_conditions::resource_exists::), + ); + // PostUpdate systems - generate and send deltas app.add_systems( PostUpdate, diff --git a/crates/libmarathon/src/networking/session.rs b/crates/libmarathon/src/networking/session.rs index 4a5a55b..b7b78e9 100644 --- a/crates/libmarathon/src/networking/session.rs +++ b/crates/libmarathon/src/networking/session.rs @@ -112,6 +112,24 @@ impl SessionId { *hash.as_bytes() } + /// Derive deterministic pkarr keypair for DHT-based peer discovery + /// + /// All peers in the same session derive the same keypair from the session code. + /// This shared keypair is used to publish and discover peer EndpointIds in the DHT. + /// + /// # Security + /// The session code is the secret - anyone with the code can discover peers. + /// The domain separation prefix ensures no collision with other uses. + pub fn to_pkarr_keypair(&self) -> pkarr::Keypair { + let mut hasher = blake3::Hasher::new(); + hasher.update(b"/app/v1/session-pkarr-key/"); + hasher.update(self.uuid.as_bytes()); + let hash = hasher.finalize(); + + let secret_bytes: [u8; 32] = *hash.as_bytes(); + pkarr::Keypair::from_secret_key(&secret_bytes) + } + /// Get raw UUID pub fn as_uuid(&self) -> &Uuid { &self.uuid diff --git a/crates/libmarathon/src/networking/sync_component.rs b/crates/libmarathon/src/networking/sync_component.rs index ae18a61..3beb2e5 100644 --- a/crates/libmarathon/src/networking/sync_component.rs +++ b/crates/libmarathon/src/networking/sync_component.rs @@ -97,33 +97,29 @@ pub trait SyncComponent: Component + Reflect + Sized { fn merge(&mut self, remote: Self, clock_cmp: ClockComparison) -> ComponentMergeDecision; } -/// Marker component for entities that should be synced +/// Marker component indicating that an entity should be synchronized across the network. /// -/// Add this to any entity with synced components to enable automatic -/// change detection and synchronization. +/// When this component is added to an entity, the `auto_insert_sync_components` system +/// will automatically add the required infrastructure components: +/// - `NetworkedEntity` - for network synchronization +/// - `Persisted` - for persistence +/// - `NetworkedTransform` - if the entity has a `Transform` component /// /// # Example -/// ``` -/// use bevy::prelude::*; -/// use libmarathon::networking::Synced; -/// use sync_macros::Synced as SyncedDerive; /// -/// #[derive(Component, Reflect, Clone, serde::Serialize, serde::Deserialize, SyncedDerive)] -/// #[sync(version = 1, strategy = "LastWriteWins")] -/// struct Health(f32); -/// -/// #[derive(Component, Reflect, Clone, serde::Serialize, serde::Deserialize, SyncedDerive)] -/// #[sync(version = 1, strategy = "LastWriteWins")] -/// struct Position { -/// x: f32, -/// y: f32, +/// ```no_compile +/// // Define a synced component with the #[synced] attribute +/// #[macros::synced] +/// pub struct CubeMarker { +/// pub color_r: f32, +/// pub size: f32, /// } /// -/// let mut world = World::new(); -/// world.spawn(( -/// Health(100.0), -/// Position { x: 0.0, y: 0.0 }, -/// Synced, // Marker enables sync +/// // Spawn with just the Synced marker - infrastructure auto-added +/// commands.spawn(( +/// CubeMarker::with_color(Color::RED, 1.0), +/// Transform::from_translation(pos), +/// Synced, // Auto-adds NetworkedEntity, Persisted, NetworkedTransform /// )); /// ``` #[derive(Component, Reflect, Default, Clone, Copy)] diff --git a/crates/libmarathon/src/networking/tombstones.rs b/crates/libmarathon/src/networking/tombstones.rs index 16b6713..3024b8a 100644 --- a/crates/libmarathon/src/networking/tombstones.rs +++ b/crates/libmarathon/src/networking/tombstones.rs @@ -220,10 +220,6 @@ pub fn handle_local_deletions_system( mut operation_log: Option>, bridge: Option>, ) { - let Some(bridge) = bridge else { - return; - }; - for (entity, networked) in query.iter() { // Increment clock for deletion node_clock.tick(); @@ -250,25 +246,32 @@ pub fn handle_local_deletions_system( vec![delete_op], ); - // Record in operation log + // Record in operation log (for when we go online later) if let Some(ref mut log) = operation_log { log.record_operation(delta.clone()); } - // Broadcast deletion - let message = - crate::networking::VersionedMessage::new(crate::networking::SyncMessage::EntityDelta { - entity_id: delta.entity_id, - node_id: delta.node_id, - vector_clock: delta.vector_clock.clone(), - operations: delta.operations.clone(), - }); + // Broadcast deletion if online + if let Some(ref bridge) = bridge { + let message = + crate::networking::VersionedMessage::new(crate::networking::SyncMessage::EntityDelta { + entity_id: delta.entity_id, + node_id: delta.node_id, + vector_clock: delta.vector_clock.clone(), + operations: delta.operations.clone(), + }); - if let Err(e) = bridge.send(message) { - error!("Failed to broadcast Delete operation: {}", e); + if let Err(e) = bridge.send(message) { + error!("Failed to broadcast Delete operation: {}", e); + } else { + info!( + "Broadcast Delete operation for entity {:?}", + networked.network_id + ); + } } else { info!( - "Broadcast Delete operation for entity {:?}", + "Deleted entity {:?} locally (offline mode - will sync when online)", networked.network_id ); } diff --git a/crates/libmarathon/src/networking/vector_clock.rs b/crates/libmarathon/src/networking/vector_clock.rs index 85bee33..2d7d969 100644 --- a/crates/libmarathon/src/networking/vector_clock.rs +++ b/crates/libmarathon/src/networking/vector_clock.rs @@ -54,17 +54,22 @@ pub type NodeId = uuid::Uuid; #[derive(Debug, Clone, PartialEq, Eq, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize, Default)] pub struct VectorClock { /// Map from node ID to logical timestamp - pub clocks: HashMap, + pub timestamps: HashMap, } impl VectorClock { /// Create a new empty vector clock pub fn new() -> Self { Self { - clocks: HashMap::new(), + timestamps: HashMap::new(), } } + /// Get the number of nodes tracked in this clock + pub fn node_count(&self) -> usize { + self.timestamps.len() + } + /// Increment the clock for a given node /// /// This should be called by a node before performing a local operation. @@ -86,7 +91,7 @@ impl VectorClock { /// assert_eq!(clock.get(node), 2); /// ``` pub fn increment(&mut self, node_id: NodeId) -> u64 { - let counter = self.clocks.entry(node_id).or_insert(0); + let counter = self.timestamps.entry(node_id).or_insert(0); *counter += 1; *counter } @@ -95,7 +100,7 @@ impl VectorClock { /// /// Returns 0 if the node has never been seen in this vector clock. pub fn get(&self, node_id: NodeId) -> u64 { - self.clocks.get(&node_id).copied().unwrap_or(0) + self.timestamps.get(&node_id).copied().unwrap_or(0) } /// Merge another vector clock into this one @@ -124,8 +129,8 @@ impl VectorClock { /// assert_eq!(clock1.get(node2), 1); /// ``` pub fn merge(&mut self, other: &VectorClock) { - for (node_id, &counter) in &other.clocks { - let current = self.clocks.entry(*node_id).or_insert(0); + for (node_id, &counter) in &other.timestamps { + let current = self.timestamps.entry(*node_id).or_insert(0); *current = (*current).max(counter); } } @@ -158,7 +163,7 @@ impl VectorClock { let mut any_strictly_less = false; // Check our nodes in a single pass - for (node_id, &our_counter) in &self.clocks { + for (node_id, &our_counter) in &self.timestamps { let their_counter = other.get(*node_id); // Early exit if we have a counter greater than theirs @@ -175,8 +180,8 @@ impl VectorClock { // If we haven't found a strictly less counter yet, check if they have // nodes we don't know about with non-zero values (those count as strictly less) if !any_strictly_less { - any_strictly_less = other.clocks.iter().any(|(node_id, &their_counter)| { - !self.clocks.contains_key(node_id) && their_counter > 0 + any_strictly_less = other.timestamps.iter().any(|(node_id, &their_counter)| { + !self.timestamps.contains_key(node_id) && their_counter > 0 }); } @@ -250,7 +255,7 @@ mod tests { #[test] fn test_new_clock() { let clock = VectorClock::new(); - assert_eq!(clock.clocks.len(), 0); + assert_eq!(clock.timestamps.len(), 0); } #[test] diff --git a/crates/libmarathon/src/persistence/database.rs b/crates/libmarathon/src/persistence/database.rs index df88645..d2e0cac 100644 --- a/crates/libmarathon/src/persistence/database.rs +++ b/crates/libmarathon/src/persistence/database.rs @@ -573,7 +573,7 @@ pub fn save_session_vector_clock( )?; // Insert current clock state - for (node_id, &counter) in &clock.clocks { + for (node_id, &counter) in &clock.timestamps { tx.execute( "INSERT INTO vector_clock (session_id, node_id, counter, updated_at) VALUES (?1, ?2, ?3, ?4)", @@ -608,7 +608,7 @@ pub fn load_session_vector_clock( for row in rows { let (node_id_str, counter) = row?; if let Ok(node_id) = uuid::Uuid::parse_str(&node_id_str) { - clock.clocks.insert(node_id, counter as u64); + clock.timestamps.insert(node_id, counter as u64); } } diff --git a/crates/libmarathon/src/persistence/mod.rs b/crates/libmarathon/src/persistence/mod.rs index f5c857c..8ecd08f 100644 --- a/crates/libmarathon/src/persistence/mod.rs +++ b/crates/libmarathon/src/persistence/mod.rs @@ -39,6 +39,7 @@ mod metrics; mod migrations; mod plugin; pub mod reflection; +mod registered_components; mod systems; mod type_registry; mod types; diff --git a/crates/libmarathon/src/persistence/reflection.rs b/crates/libmarathon/src/persistence/reflection.rs index 90a55ce..914d916 100644 --- a/crates/libmarathon/src/persistence/reflection.rs +++ b/crates/libmarathon/src/persistence/reflection.rs @@ -38,6 +38,8 @@ pub struct Persisted { pub network_id: uuid::Uuid, } + + impl Persisted { pub fn new() -> Self { Self { diff --git a/crates/libmarathon/src/persistence/registered_components.rs b/crates/libmarathon/src/persistence/registered_components.rs new file mode 100644 index 0000000..c424a13 --- /dev/null +++ b/crates/libmarathon/src/persistence/registered_components.rs @@ -0,0 +1,64 @@ +//! Component registrations for CRDT synchronization +//! +//! This module registers all components that should be synchronized across +//! the network using the inventory-based type registry. +//! +//! # When to use this file vs `#[synced]` attribute +//! +//! **Use `#[synced]` attribute for:** +//! - Your own component types defined in this codebase +//! - Any type you have source access to +//! - Most game components (entities, markers, etc.) +//! - Example: `#[synced] pub struct CubeMarker { ... }` +//! +//! **Use manual `inventory::submit!` here for:** +//! - Third-party types (Bevy's Transform, external crates) +//! - Types that need custom serialization logic +//! - Types where the serialized format differs from in-memory format +//! +//! # Currently registered external types +//! +//! - `Transform` - Bevy's transform component (needs custom rkyv conversion) + +use std::any::TypeId; + +// Register Transform for synchronization +// We serialize Bevy's Transform but convert to our rkyv-compatible type +inventory::submit! { + crate::persistence::ComponentMeta { + type_name: "Transform", + type_path: "bevy::transform::components::transform::Transform", + type_id: TypeId::of::(), + + deserialize_fn: |bytes: &[u8]| -> anyhow::Result> { + let transform: crate::transform::Transform = rkyv::from_bytes::(bytes)?; + // Convert back to Bevy Transform + let bevy_transform = bevy::prelude::Transform { + translation: transform.translation.into(), + rotation: transform.rotation.into(), + scale: transform.scale.into(), + }; + Ok(Box::new(bevy_transform)) + }, + + serialize_fn: |world: &bevy::ecs::world::World, entity: bevy::ecs::entity::Entity| -> Option { + world.get::(entity).map(|bevy_transform| { + // Convert to our rkyv-compatible Transform + let transform = crate::transform::Transform { + translation: bevy_transform.translation.into(), + rotation: bevy_transform.rotation.into(), + scale: bevy_transform.scale.into(), + }; + let serialized = rkyv::to_bytes::(&transform) + .expect("Failed to serialize Transform"); + bytes::Bytes::from(serialized.to_vec()) + }) + }, + + insert_fn: |entity_mut: &mut bevy::ecs::world::EntityWorldMut, boxed: Box| { + if let Ok(transform) = boxed.downcast::() { + entity_mut.insert(*transform); + } + }, + } +} diff --git a/crates/libmarathon/src/persistence/type_registry.rs b/crates/libmarathon/src/persistence/type_registry.rs index 2e21952..ed3c456 100644 --- a/crates/libmarathon/src/persistence/type_registry.rs +++ b/crates/libmarathon/src/persistence/type_registry.rs @@ -261,6 +261,51 @@ impl Default for ComponentTypeRegistryResource { } } + +/// Macro to register a component type with the inventory system +/// +/// This generates the necessary serialize/deserialize functions and submits +/// the ComponentMeta to inventory for runtime registration. +/// +/// # Example +/// +/// ```ignore +/// use bevy::prelude::*; +/// register_component!(Transform, "bevy::transform::components::Transform"); +/// ``` +#[macro_export] +macro_rules! register_component { + ($component_type:ty, $type_path:expr) => { + // Submit component metadata to inventory + inventory::submit! { + $crate::persistence::ComponentMeta { + type_name: stringify!($component_type), + type_path: $type_path, + type_id: std::any::TypeId::of::<$component_type>(), + + deserialize_fn: |bytes: &[u8]| -> anyhow::Result> { + let component: $component_type = rkyv::from_bytes(bytes)?; + Ok(Box::new(component)) + }, + + serialize_fn: |world: &bevy::ecs::world::World, entity: bevy::ecs::entity::Entity| -> Option { + world.get::<$component_type>(entity).map(|component| { + let serialized = rkyv::to_bytes::(component) + .expect("Failed to serialize component"); + bytes::Bytes::from(serialized.to_vec()) + }) + }, + + insert_fn: |entity_mut: &mut bevy::ecs::world::EntityWorldMut, boxed: Box| { + if let Ok(component) = boxed.downcast::<$component_type>() { + entity_mut.insert(*component); + } + }, + } + } + }; +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/libmarathon/src/platform/desktop/executor.rs b/crates/libmarathon/src/platform/desktop/executor.rs index 1c8323a..78a4499 100644 --- a/crates/libmarathon/src/platform/desktop/executor.rs +++ b/crates/libmarathon/src/platform/desktop/executor.rs @@ -118,6 +118,12 @@ fn send_window_closing(app: &mut App, window: Entity) { .write(WindowClosing { window }); } +fn send_app_exit(app: &mut App) { + app.world_mut() + .resource_mut::>() + .write(bevy::app::AppExit::Success); +} + impl AppHandler { /// Initialize the window and transition to Running state. /// @@ -233,7 +239,10 @@ impl AppHandler { // Send WindowClosing event send_window_closing(bevy_app, *bevy_window_entity); - // Run one final update to process close event + // Send AppExit event to trigger cleanup systems + send_app_exit(bevy_app); + + // Run one final update to process close events and cleanup bevy_app.update(); // Don't call finish/cleanup - let Bevy's AppExit handle it diff --git a/crates/libmarathon/src/platform/ios/executor.rs b/crates/libmarathon/src/platform/ios/executor.rs index 3dbcf90..789410f 100644 --- a/crates/libmarathon/src/platform/ios/executor.rs +++ b/crates/libmarathon/src/platform/ios/executor.rs @@ -238,7 +238,10 @@ impl AppHandler { // Send WindowClosing event send_window_closing(bevy_app, *bevy_window_entity); - // Run one final update to process close event + // Send AppExit event to trigger cleanup systems + bevy_app.world_mut().send_message(AppExit::Success); + + // Run one final update to process close events and cleanup bevy_app.update(); } diff --git a/crates/libmarathon/src/transform/math.rs b/crates/libmarathon/src/transform/math.rs new file mode 100644 index 0000000..508d10f --- /dev/null +++ b/crates/libmarathon/src/transform/math.rs @@ -0,0 +1,63 @@ +//! Math primitives with rkyv support +//! +//! Vendored from bevy_math with rkyv derives added. + +/// A 3-dimensional vector. +#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] +#[repr(C)] +pub struct Vec3 { + pub x: f32, + pub y: f32, + pub z: f32, +} + +impl Vec3 { + pub const ZERO: Self = Self { x: 0.0, y: 0.0, z: 0.0 }; + pub const ONE: Self = Self { x: 1.0, y: 1.0, z: 1.0 }; + + #[inline] + pub const fn new(x: f32, y: f32, z: f32) -> Self { + Self { x, y, z } + } +} + +/// A quaternion representing an orientation. +#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] +#[repr(C)] +pub struct Quat { + pub x: f32, + pub y: f32, + pub z: f32, + pub w: f32, +} + +impl Quat { + pub const IDENTITY: Self = Self { x: 0.0, y: 0.0, z: 0.0, w: 1.0 }; +} + +// Conversion from bevy_math types +impl From for Vec3 { + fn from(v: bevy::math::Vec3) -> Self { + Self { x: v.x, y: v.y, z: v.z } + } +} + +impl From for bevy::math::Vec3 { + fn from(v: Vec3) -> Self { + Self::new(v.x, v.y, v.z) + } +} + +impl From for Quat { + fn from(q: bevy::math::Quat) -> Self { + Self { x: q.x, y: q.y, z: q.z, w: q.w } + } +} + +impl From for bevy::math::Quat { + fn from(q: Quat) -> Self { + Self::from_xyzw(q.x, q.y, q.z, q.w) + } +} diff --git a/crates/libmarathon/src/transform/mod.rs b/crates/libmarathon/src/transform/mod.rs new file mode 100644 index 0000000..5b8f83c --- /dev/null +++ b/crates/libmarathon/src/transform/mod.rs @@ -0,0 +1,73 @@ +//! Transform component with rkyv support +//! +//! Vendored from bevy_transform with rkyv derives added for network synchronization. + +mod math; + +pub use math::{Quat, Vec3}; + +/// Describe the position of an entity. If the entity has a parent, the position is relative +/// to its parent position. +/// +/// This is a pure data type used for serialization. Use bevy::transform::components::Transform +/// for actual ECS components. +#[derive(Debug, PartialEq, Clone, Copy)] +#[derive(rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)] +pub struct Transform { + /// Position of the entity. In 2d, the last value of the `Vec3` is used for z-ordering. + pub translation: Vec3, + /// Rotation of the entity. + pub rotation: Quat, + /// Scale of the entity. + pub scale: Vec3, +} + +impl Default for Transform { + fn default() -> Self { + Self { + translation: Vec3::ZERO, + rotation: Quat::IDENTITY, + scale: Vec3::ONE, + } + } +} + +impl Transform { + /// Creates a new [`Transform`] at the position `(x, y, z)`. In 2d, the `z` component + /// is used for z-ordering elements: higher `z`-value will be in front of lower + /// `z`-value. + #[inline] + pub const fn from_xyz(x: f32, y: f32, z: f32) -> Self { + Self::from_translation(Vec3::new(x, y, z)) + } + + /// Creates a new [`Transform`] with the specified `translation`. + #[inline] + pub const fn from_translation(translation: Vec3) -> Self { + Self { + translation, + rotation: Quat::IDENTITY, + scale: Vec3::ONE, + } + } + + /// Creates a new [`Transform`] with the specified `rotation`. + #[inline] + pub const fn from_rotation(rotation: Quat) -> Self { + Self { + translation: Vec3::ZERO, + rotation, + scale: Vec3::ONE, + } + } + + /// Creates a new [`Transform`] with the specified `scale`. + #[inline] + pub const fn from_scale(scale: Vec3) -> Self { + Self { + translation: Vec3::ZERO, + rotation: Quat::IDENTITY, + scale, + } + } +} diff --git a/crates/libmarathon/tests/bridge_integration.rs b/crates/libmarathon/tests/bridge_integration.rs index 93d077b..b586360 100644 --- a/crates/libmarathon/tests/bridge_integration.rs +++ b/crates/libmarathon/tests/bridge_integration.rs @@ -5,6 +5,33 @@ use libmarathon::networking::SessionId; use std::time::Duration; use tokio::time::timeout; +/// Get appropriate timeout for engine operations +/// - With fast_tests: short timeout (networking is mocked) +/// - Without fast_tests: long timeout (real networking with DHT discovery) +fn engine_timeout() -> Duration { + #[cfg(feature = "fast_tests")] + { + Duration::from_millis(200) + } + #[cfg(not(feature = "fast_tests"))] + { + Duration::from_secs(30) + } +} + +/// Get appropriate wait time for command processing +fn processing_delay() -> Duration { + #[cfg(feature = "fast_tests")] + { + Duration::from_millis(50) + } + #[cfg(not(feature = "fast_tests"))] + { + // Real networking needs more time for initialization + Duration::from_secs(20) + } +} + /// Test that commands sent from "Bevy side" reach the engine #[tokio::test] async fn test_command_routing() { @@ -14,7 +41,7 @@ async fn test_command_routing() { let engine_handle = tokio::spawn(async move { // Run engine for a short time let core = EngineCore::new(handle, ":memory:"); - timeout(Duration::from_millis(100), core.run()) + timeout(engine_timeout(), core.run()) .await .ok(); }); @@ -29,7 +56,7 @@ async fn test_command_routing() { }); // Give engine time to process - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(processing_delay()).await; // Poll events let events = bridge.poll_events(); @@ -65,7 +92,7 @@ async fn test_event_routing() { // Spawn engine let engine_handle = tokio::spawn(async move { let core = EngineCore::new(handle, ":memory:"); - timeout(Duration::from_millis(100), core.run()) + timeout(engine_timeout(), core.run()) .await .ok(); }); @@ -78,7 +105,7 @@ async fn test_event_routing() { session_id: session_id.clone(), }); - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(processing_delay()).await; // Poll events multiple times to verify queue works let events1 = bridge.poll_events(); @@ -102,7 +129,7 @@ async fn test_networking_lifecycle() { let engine_handle = tokio::spawn(async move { let core = EngineCore::new(handle, ":memory:"); - timeout(Duration::from_millis(200), core.run()) + timeout(engine_timeout(), core.run()) .await .ok(); }); @@ -115,7 +142,7 @@ async fn test_networking_lifecycle() { session_id: session_id.clone(), }); - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(processing_delay()).await; let events = bridge.poll_events(); assert!( @@ -128,7 +155,7 @@ async fn test_networking_lifecycle() { // Stop networking bridge.send_command(EngineCommand::StopNetworking); - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(processing_delay()).await; let events = bridge.poll_events(); assert!( @@ -150,7 +177,7 @@ async fn test_join_session_routing() { let engine_handle = tokio::spawn(async move { let core = EngineCore::new(handle, ":memory:"); - timeout(Duration::from_millis(200), core.run()) + timeout(engine_timeout(), core.run()) .await .ok(); }); @@ -163,7 +190,7 @@ async fn test_join_session_routing() { session_id: session_id.clone(), }); - tokio::time::sleep(Duration::from_millis(50)).await; + tokio::time::sleep(processing_delay()).await; let events = bridge.poll_events(); assert!( @@ -191,44 +218,85 @@ async fn test_command_ordering() { let engine_handle = tokio::spawn(async move { let core = EngineCore::new(handle, ":memory:"); - timeout(Duration::from_millis(200), core.run()) + timeout(engine_timeout(), core.run()) .await .ok(); }); tokio::time::sleep(Duration::from_millis(10)).await; - // Send multiple commands + // Send first command and wait for it to complete let session1 = SessionId::new(); - let session2 = SessionId::new(); - bridge.send_command(EngineCommand::StartNetworking { session_id: session1.clone(), }); + + // Wait for first networking to start + tokio::time::sleep(processing_delay()).await; + + let events1 = bridge.poll_events(); + assert!( + events1.iter().any(|e| matches!(e, EngineEvent::NetworkingStarted { .. })), + "Should receive first NetworkingStarted" + ); + + // Now send stop and start second session + let session2 = SessionId::new(); bridge.send_command(EngineCommand::StopNetworking); bridge.send_command(EngineCommand::JoinSession { session_id: session2.clone(), }); - tokio::time::sleep(Duration::from_millis(100)).await; + // Wait for second networking to start + tokio::time::sleep(processing_delay()).await; - let events = bridge.poll_events(); + let events2 = bridge.poll_events(); - // Should see: NetworkingStarted(session1), NetworkingStopped, NetworkingStarted(session2) - let started_events: Vec<_> = events + // Should see: NetworkingStopped, NetworkingStarted(session2) + let started_events: Vec<_> = events2 .iter() .filter(|e| matches!(e, EngineEvent::NetworkingStarted { .. })) .collect(); - let stopped_events: Vec<_> = events + let stopped_events: Vec<_> = events2 .iter() .filter(|e| matches!(e, EngineEvent::NetworkingStopped)) .collect(); - assert_eq!(started_events.len(), 2, "Should have 2 NetworkingStarted events"); + assert_eq!(started_events.len(), 1, "Should have 1 NetworkingStarted event in second batch"); assert_eq!(stopped_events.len(), 1, "Should have 1 NetworkingStopped event"); // Cleanup drop(bridge); let _ = engine_handle.await; } + +/// Test: Shutdown command causes EngineCore to exit gracefully +#[tokio::test] +async fn test_shutdown_command() { + let (bridge, handle) = EngineBridge::new(); + + let engine_handle = tokio::spawn(async move { + let core = EngineCore::new(handle, ":memory:"); + core.run().await; + }); + + tokio::time::sleep(Duration::from_millis(10)).await; + + // Send Shutdown command + bridge.send_command(EngineCommand::Shutdown); + + // Wait for engine to exit (should be quick since it's just processing the command) + let result = timeout(Duration::from_millis(100), engine_handle).await; + + assert!( + result.is_ok(), + "Engine should exit within 100ms after receiving Shutdown command" + ); + + // Verify that the engine actually exited (not errored) + assert!( + result.unwrap().is_ok(), + "Engine should exit cleanly without panic" + ); +} diff --git a/crates/libmarathon/tests/sync_integration_headless.rs b/crates/libmarathon/tests/sync_integration_headless.rs index 81f73fc..b55e4ba 100644 --- a/crates/libmarathon/tests/sync_integration_headless.rs +++ b/crates/libmarathon/tests/sync_integration_headless.rs @@ -45,7 +45,6 @@ use libmarathon::{ GossipBridge, LockMessage, NetworkedEntity, - NetworkedSelection, NetworkedTransform, NetworkingConfig, NetworkingPlugin, @@ -68,8 +67,8 @@ use uuid::Uuid; // ============================================================================ /// Simple position component for testing sync -#[sync_macros::synced(version = 1, strategy = "LastWriteWins")] -#[derive(Component, Reflect, Clone, Debug, PartialEq)] +#[macros::synced] +#[derive(Reflect, PartialEq)] #[reflect(Component)] struct TestPosition { x: f32, @@ -77,8 +76,8 @@ struct TestPosition { } /// Simple health component for testing sync -#[sync_macros::synced(version = 1, strategy = "LastWriteWins")] -#[derive(Component, Reflect, Clone, Debug, PartialEq)] +#[macros::synced] +#[derive(Reflect, PartialEq)] #[reflect(Component)] struct TestHealth { current: f32, @@ -186,8 +185,7 @@ mod test_utils { // Register test component types for reflection app.register_type::() - .register_type::() - .register_type::(); + .register_type::(); app } @@ -1135,7 +1133,6 @@ async fn test_lock_heartbeat_expiration() -> Result<()> { let _ = app1.world_mut() .spawn(( NetworkedEntity::with_id(entity_id, node1_id), - NetworkedSelection::default(), TestPosition { x: 10.0, y: 20.0 }, Persisted::with_id(entity_id), Synced, @@ -1245,7 +1242,6 @@ async fn test_lock_release_stops_heartbeats() -> Result<()> { let _ = app1.world_mut() .spawn(( NetworkedEntity::with_id(entity_id, node1_id), - NetworkedSelection::default(), TestPosition { x: 10.0, y: 20.0 }, Persisted::with_id(entity_id), Synced, @@ -1333,3 +1329,567 @@ async fn test_lock_release_stops_heartbeats() -> Result<()> { Ok(()) } + +/// Test 8: Offline-to-online sync (operations work offline and sync when online) +/// +/// This test verifies the offline-first CRDT architecture: +/// - Spawning entities offline increments vector clock and logs operations +/// - Modifying entities offline increments vector clock and logs operations +/// - Deleting entities offline increments vector clock and records tombstones +/// - When networking starts, all offline operations sync to peers +/// - Peers correctly apply all operations (spawns, updates, deletes) +/// - Tombstones prevent resurrection of deleted entities +#[tokio::test(flavor = "multi_thread")] +async fn test_offline_to_online_sync() -> Result<()> { + use test_utils::*; + use libmarathon::networking::{NodeVectorClock, OperationLog, TombstoneRegistry, ToDelete}; + + println!("=== Starting test_offline_to_online_sync ==="); + + let ctx1 = TestContext::new(); + let ctx2 = TestContext::new(); + + // Setup gossip networking FIRST to get the bridge node IDs + println!("Setting up gossip pair..."); + let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; + + let node1_id = bridge1.node_id(); + let node2_id = bridge2.node_id(); + println!("Node 1 ID (from bridge): {}", node1_id); + println!("Node 2 ID (from bridge): {}", node2_id); + + // Phase 1: Create app1 in OFFLINE mode (no GossipBridge inserted yet) + // Important: Use the bridge's node_id so operations are recorded with the right ID + println!("\n--- Phase 1: Offline Operations on Node 1 ---"); + let mut app1 = { + let mut app = App::new(); + app.add_plugins(MinimalPlugins.set(ScheduleRunnerPlugin::run_loop( + Duration::from_secs_f64(1.0 / 60.0), + ))) + .add_plugins(NetworkingPlugin::new(NetworkingConfig { + node_id: node1_id, // Use bridge's node_id! + sync_interval_secs: 0.5, + prune_interval_secs: 10.0, + tombstone_gc_interval_secs: 30.0, + })) + .add_plugins(PersistencePlugin::with_config( + ctx1.db_path(), + PersistenceConfig { + flush_interval_secs: 1, + checkpoint_interval_secs: 5, + battery_adaptive: false, + ..Default::default() + }, + )) + .register_type::() + .register_type::(); + + // NOTE: NO GossipBridge inserted yet - this is offline mode! + println!("āœ“ Created app1 in OFFLINE mode (no GossipBridge, but using bridge's node_id)"); + app + }; + + // Spawn entity A offline + let entity_a = Uuid::new_v4(); + println!("\nSpawning entity A ({}) OFFLINE", entity_a); + let entity_a_bevy = app1 + .world_mut() + .spawn(( + NetworkedEntity::with_id(entity_a, node1_id), + TestPosition { x: 10.0, y: 20.0 }, + NetworkedTransform::default(), + Transform::from_xyz(10.0, 20.0, 0.0), + Persisted::with_id(entity_a), + Synced, + )) + .id(); + + // Trigger change detection + { + let world = app1.world_mut(); + if let Ok(mut entity_mut) = world.get_entity_mut(entity_a_bevy) { + if let Some(mut persisted) = entity_mut.get_mut::() { + let _ = &mut *persisted; + } + } + } + + // Update to trigger delta generation (offline) + app1.update(); + tokio::time::sleep(Duration::from_millis(50)).await; + + // Verify clock incremented for spawn + let clock_after_spawn = { + let clock = app1.world().resource::(); + let seq = clock.clock.timestamps.get(&node1_id).copied().unwrap_or(0); + println!("āœ“ Vector clock after spawn: {}", seq); + assert!(seq > 0, "Clock should have incremented after spawn"); + seq + }; + + // Spawn entity B offline + let entity_b = Uuid::new_v4(); + println!("\nSpawning entity B ({}) OFFLINE", entity_b); + let entity_b_bevy = app1 + .world_mut() + .spawn(( + NetworkedEntity::with_id(entity_b, node1_id), + TestPosition { x: 30.0, y: 40.0 }, + NetworkedTransform::default(), + Transform::from_xyz(30.0, 40.0, 0.0), + Persisted::with_id(entity_b), + Synced, + )) + .id(); + + // Trigger change detection + { + let world = app1.world_mut(); + if let Ok(mut entity_mut) = world.get_entity_mut(entity_b_bevy) { + if let Some(mut persisted) = entity_mut.get_mut::() { + let _ = &mut *persisted; + } + } + } + + app1.update(); + tokio::time::sleep(Duration::from_millis(50)).await; + + let clock_after_second_spawn = { + let clock = app1.world().resource::(); + let seq = clock.clock.timestamps.get(&node1_id).copied().unwrap_or(0); + println!("āœ“ Vector clock after second spawn: {}", seq); + assert!(seq > clock_after_spawn, "Clock should have incremented again"); + seq + }; + + // Modify entity A offline (change Transform) + println!("\nModifying entity A Transform OFFLINE"); + { + let world = app1.world_mut(); + if let Ok(mut entity_mut) = world.get_entity_mut(entity_a_bevy) { + if let Some(mut transform) = entity_mut.get_mut::() { + transform.translation.x = 15.0; + transform.translation.y = 25.0; + } + } + } + + app1.update(); + tokio::time::sleep(Duration::from_millis(50)).await; + + let clock_after_modify = { + let clock = app1.world().resource::(); + let seq = clock.clock.timestamps.get(&node1_id).copied().unwrap_or(0); + println!("āœ“ Vector clock after modify: {}", seq); + assert!(seq > clock_after_second_spawn, "Clock should have incremented after modification"); + seq + }; + + // Delete entity B offline + println!("\nDeleting entity B OFFLINE"); + { + let mut commands = app1.world_mut().commands(); + commands.entity(entity_b_bevy).insert(ToDelete); + } + + app1.update(); + tokio::time::sleep(Duration::from_millis(50)).await; + + let clock_after_delete = { + let clock = app1.world().resource::(); + let seq = clock.clock.timestamps.get(&node1_id).copied().unwrap_or(0); + println!("āœ“ Vector clock after delete: {}", seq); + assert!(seq > clock_after_modify, "Clock should have incremented after deletion"); + seq + }; + + // Verify entity B is deleted locally + { + let count = count_entities_with_id(app1.world_mut(), entity_b); + assert_eq!(count, 0, "Entity B should be deleted locally"); + println!("āœ“ Entity B deleted locally"); + } + + // Verify tombstone recorded for entity B + { + let tombstones = app1.world().resource::(); + assert!(tombstones.is_deleted(entity_b), "Tombstone should be recorded for entity B"); + println!("āœ“ Tombstone recorded for entity B"); + } + + // Verify operation log has entries + { + let op_log = app1.world().resource::(); + let op_count = op_log.total_operations(); + println!("āœ“ Operation log has {} operations recorded offline", op_count); + assert!(op_count >= 4, "Should have operations for: spawn A, spawn B, modify A, delete B"); + } + + println!("\n--- Phase 2: Bringing Node 1 Online ---"); + + // Insert GossipBridge into app1 (going online!) + app1.world_mut().insert_resource(bridge1); + println!("āœ“ Inserted GossipBridge into app1 - NOW ONLINE"); + println!(" Node 1 ID: {} (matches bridge from start)", node1_id); + + // Create app2 online from the start + println!("\n--- Phase 3: Creating Node 2 Online ---"); + let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); + println!("āœ“ Created app2 ONLINE with node_id: {}", node2_id); + + // Phase 3: Wait for sync + println!("\n--- Phase 4: Waiting for Sync ---"); + println!("Expected to sync:"); + println!(" - Entity A (spawned and modified offline)"); + println!(" - Entity B tombstone (deleted offline)"); + + // Wait for entity A to sync to app2 + wait_for_sync(&mut app1, &mut app2, Duration::from_secs(10), |_, w2| { + let count = count_entities_with_id(w2, entity_a); + if count > 0 { + println!("āœ“ Entity A found on node 2!"); + true + } else { + false + } + }) + .await?; + + // Wait a bit more for tombstone to sync + for _ in 0..20 { + app1.update(); + app2.update(); + tokio::time::sleep(Duration::from_millis(100)).await; + } + + println!("\n--- Phase 5: Verification ---"); + + // Verify entity A synced with MODIFIED transform + { + let mut query = app2.world_mut().query::<(&NetworkedEntity, &Transform)>(); + let mut found = false; + for (ne, transform) in query.iter(app2.world()) { + if ne.network_id == entity_a { + found = true; + println!("āœ“ Entity A found on node 2"); + println!(" Transform: ({}, {}, {})", + transform.translation.x, + transform.translation.y, + transform.translation.z + ); + // Verify it has the MODIFIED position, not the original + assert!( + (transform.translation.x - 15.0).abs() < 0.1, + "Entity A should have modified X position (15.0)" + ); + assert!( + (transform.translation.y - 25.0).abs() < 0.1, + "Entity A should have modified Y position (25.0)" + ); + println!("āœ“ Entity A has correct modified transform"); + break; + } + } + assert!(found, "Entity A should exist on node 2"); + } + + // Verify entity B does NOT exist on node 2 (was deleted offline) + { + let count = count_entities_with_id(app2.world_mut(), entity_b); + assert_eq!(count, 0, "Entity B should NOT exist on node 2 (deleted offline)"); + println!("āœ“ Entity B correctly does not exist on node 2"); + } + + // Verify tombstone for entity B exists on node 2 + { + let tombstones = app2.world().resource::(); + assert!( + tombstones.is_deleted(entity_b), + "Tombstone for entity B should have synced to node 2" + ); + println!("āœ“ Tombstone for entity B synced to node 2"); + } + + // Verify final vector clocks are consistent + { + let clock1 = app1.world().resource::(); + let clock2 = app2.world().resource::(); + + let node1_seq_on_app1 = clock1.clock.timestamps.get(&node1_id).copied().unwrap_or(0); + let node1_seq_on_app2 = clock2.clock.timestamps.get(&node1_id).copied().unwrap_or(0); + + println!("Final vector clocks:"); + println!(" Node 1 clock on app1: {}", node1_seq_on_app1); + println!(" Node 1 clock on app2: {}", node1_seq_on_app2); + + // Clock should be clock_after_delete + 1 because sending the SyncRequest increments it + assert_eq!( + node1_seq_on_app1, + clock_after_delete + 1, + "Node 1's clock should be offline state + 1 (for SyncRequest)" + ); + + // Node 2 should have learned about node 1's clock through sync + assert_eq!( + node1_seq_on_app2, + node1_seq_on_app1, + "Node 2 should have synced node 1's clock" + ); + + println!("āœ“ Vector clocks verified"); + } + + println!("\nāœ“ OFFLINE-TO-ONLINE SYNC TEST PASSED!"); + println!("Summary:"); + println!(" - Spawned 2 entities offline (clock incremented)"); + println!(" - Modified 1 entity offline (clock incremented)"); + println!(" - Deleted 1 entity offline (clock incremented, tombstone recorded)"); + println!(" - Went online and synced to peer"); + println!(" - Peer received all operations correctly"); + println!(" - Tombstone prevented deleted entity resurrection"); + + // Cleanup + router1.shutdown().await?; + router2.shutdown().await?; + ep1.close().await; + ep2.close().await; + + Ok(()) +} + +/// Test 12: Lock re-acquisition cycle (acquire → release → re-acquire) +/// +/// This test verifies that locks can be acquired, released, and then re-acquired multiple times. +/// This is critical for normal editing workflows where users repeatedly select/deselect entities. +#[tokio::test(flavor = "multi_thread")] +async fn test_lock_reacquisition_cycle() -> Result<()> { + use test_utils::*; + + println!("\n=== Starting test_lock_reacquisition_cycle ==="); + println!("Testing: acquire → release → re-acquire → release → re-acquire"); + + let ctx1 = TestContext::new(); + let ctx2 = TestContext::new(); + + let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; + + let node1_id = bridge1.node_id(); + let node2_id = bridge2.node_id(); + + println!("Node 1 ID: {}", node1_id); + println!("Node 2 ID: {}", node2_id); + + let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1.clone()); + let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); + + // === PHASE 1: Spawn entity === + println!("\nPHASE 1: Spawning entity on Node 1"); + + let entity_id = Uuid::new_v4(); + app1.world_mut().spawn(( + NetworkedEntity::with_id(entity_id, node1_id), + TestPosition { x: 10.0, y: 20.0 }, + Persisted::with_id(entity_id), + Synced, + )); + + // Wait for entity to replicate + wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_, w2| { + count_entities_with_id(w2, entity_id) > 0 + }) + .await?; + + println!("āœ“ Entity replicated to both nodes"); + + // === PHASE 2: First lock acquisition === + println!("\nPHASE 2: Node 1 acquires lock (FIRST time)"); + + // Update LocalSelection to select the entity + { + let mut selection = app1.world_mut().resource_mut::(); + selection.clear(); + selection.insert(entity_id); + println!(" Updated LocalSelection to select entity"); + } + + // Wait for lock to propagate + wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |w1, w2| { + let lock1 = w1.resource::(); + let lock2 = w2.resource::(); + lock1.is_locked_by(entity_id, node1_id, node1_id) + && lock2.is_locked_by(entity_id, node1_id, node2_id) + }) + .await?; + + { + let lock1 = app1.world().resource::(); + let lock2 = app2.world().resource::(); + + assert!( + lock1.is_locked_by(entity_id, node1_id, node1_id), + "Node 1 should hold lock (first acquisition)" + ); + assert!( + lock2.is_locked_by(entity_id, node1_id, node2_id), + "Node 2 should see Node 1 holding lock (first acquisition)" + ); + } + + println!("āœ“ FIRST lock acquisition successful"); + + // === PHASE 3: First lock release === + println!("\nPHASE 3: Node 1 releases lock (FIRST time)"); + + { + let mut selection = app1.world_mut().resource_mut::(); + selection.clear(); + println!(" Cleared LocalSelection"); + } + + // Wait for lock release to propagate + wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |w1, w2| { + let lock1 = w1.resource::(); + let lock2 = w2.resource::(); + !lock1.is_locked(entity_id, node1_id) && !lock2.is_locked(entity_id, node2_id) + }) + .await?; + + { + let lock1 = app1.world().resource::(); + let lock2 = app2.world().resource::(); + + assert!( + !lock1.is_locked(entity_id, node1_id), + "Node 1 should have released lock" + ); + assert!( + !lock2.is_locked(entity_id, node2_id), + "Node 2 should see lock released" + ); + } + + println!("āœ“ FIRST lock release successful"); + + // === PHASE 4: SECOND lock acquisition (THE CRITICAL TEST) === + println!("\nPHASE 4: Node 1 acquires lock (SECOND time) - THIS IS THE BUG"); + + { + let mut selection = app1.world_mut().resource_mut::(); + selection.clear(); + selection.insert(entity_id); + println!(" Updated LocalSelection to select entity (again)"); + } + + // Wait for lock to propagate + println!(" Waiting for lock to propagate..."); + for i in 0..30 { + app1.update(); + app2.update(); + tokio::time::sleep(Duration::from_millis(100)).await; + + if i % 5 == 0 { + let lock1 = app1.world().resource::(); + let lock2 = app2.world().resource::(); + + println!( + " [{}] Node 1: locked_by_me={}, locked={}", + i, + lock1.is_locked_by(entity_id, node1_id, node1_id), + lock1.is_locked(entity_id, node1_id) + ); + println!( + " [{}] Node 2: locked_by_node1={}, locked={}", + i, + lock2.is_locked_by(entity_id, node1_id, node2_id), + lock2.is_locked(entity_id, node2_id) + ); + } + } + + { + let lock1 = app1.world().resource::(); + let lock2 = app2.world().resource::(); + + assert!( + lock1.is_locked_by(entity_id, node1_id, node1_id), + "Node 1 should hold lock (SECOND acquisition) - THIS IS WHERE THE BUG MANIFESTS" + ); + assert!( + lock2.is_locked_by(entity_id, node1_id, node2_id), + "Node 2 should see Node 1 holding lock (SECOND acquisition) - THIS IS WHERE THE BUG MANIFESTS" + ); + } + + println!("āœ“ SECOND lock acquisition successful!"); + + // === PHASE 5: Second lock release === + println!("\nPHASE 5: Node 1 releases lock (SECOND time)"); + + { + let mut selection = app1.world_mut().resource_mut::(); + selection.clear(); + println!(" Cleared LocalSelection"); + } + + // Wait for lock release to propagate + wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |w1, w2| { + let lock1 = w1.resource::(); + let lock2 = w2.resource::(); + !lock1.is_locked(entity_id, node1_id) && !lock2.is_locked(entity_id, node2_id) + }) + .await?; + + println!("āœ“ SECOND lock release successful"); + + // === PHASE 6: THIRD lock acquisition (verify pattern continues) === + println!("\nPHASE 6: Node 1 acquires lock (THIRD time) - verifying pattern"); + + { + let mut selection = app1.world_mut().resource_mut::(); + selection.clear(); + selection.insert(entity_id); + println!(" Updated LocalSelection to select entity (third time)"); + } + + // Wait for lock to propagate + wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |w1, w2| { + let lock1 = w1.resource::(); + let lock2 = w2.resource::(); + lock1.is_locked_by(entity_id, node1_id, node1_id) + && lock2.is_locked_by(entity_id, node1_id, node2_id) + }) + .await?; + + { + let lock1 = app1.world().resource::(); + let lock2 = app2.world().resource::(); + + assert!( + lock1.is_locked_by(entity_id, node1_id, node1_id), + "Node 1 should hold lock (THIRD acquisition)" + ); + assert!( + lock2.is_locked_by(entity_id, node1_id, node2_id), + "Node 2 should see Node 1 holding lock (THIRD acquisition)" + ); + } + + println!("āœ“ THIRD lock acquisition successful!"); + + println!("\nāœ“ LOCK RE-ACQUISITION CYCLE TEST PASSED!"); + println!("Summary:"); + println!(" - First acquisition: āœ“"); + println!(" - First release: āœ“"); + println!(" - SECOND acquisition: āœ“ (this was failing before)"); + println!(" - Second release: āœ“"); + println!(" - THIRD acquisition: āœ“"); + + // Cleanup + router1.shutdown().await?; + router2.shutdown().await?; + ep1.close().await; + ep2.close().await; + + Ok(()) +} diff --git a/crates/macros/src/lib.rs b/crates/macros/src/lib.rs index dbe6363..88859c1 100644 --- a/crates/macros/src/lib.rs +++ b/crates/macros/src/lib.rs @@ -5,6 +5,7 @@ mod as_bind_group; mod extract_component; mod extract_resource; mod specializer; +mod synced; use bevy_macro_utils::{derive_label, BevyManifest}; use proc_macro::TokenStream; @@ -150,3 +151,26 @@ pub fn derive_draw_function_label(input: TokenStream) -> TokenStream { .push(format_ident!("DrawFunctionLabel").into()); derive_label(input, "DrawFunctionLabel", &trait_path) } + +/// Attribute macro for automatic component synchronization. +/// +/// Automatically generates Component, rkyv serialization derives, and registers +/// the component in the ComponentTypeRegistry for network synchronization. +/// +/// # Example +/// +/// ```no_compile +/// use macros::synced; +/// +/// #[synced] +/// pub struct CubeMarker { +/// pub color_r: f32, +/// pub color_g: f32, +/// pub color_b: f32, +/// pub size: f32, +/// } +/// ``` +#[proc_macro_attribute] +pub fn synced(attr: TokenStream, item: TokenStream) -> TokenStream { + synced::synced_attribute(attr, item) +} diff --git a/crates/macros/src/synced.rs b/crates/macros/src/synced.rs new file mode 100644 index 0000000..5cc21a2 --- /dev/null +++ b/crates/macros/src/synced.rs @@ -0,0 +1,57 @@ +use proc_macro::TokenStream; +use quote::quote; +use syn::{parse_macro_input, DeriveInput}; + +pub fn synced_attribute(_attr: TokenStream, item: TokenStream) -> TokenStream { + let ast = parse_macro_input!(item as DeriveInput); + let struct_name = &ast.ident; + let vis = &ast.vis; + let attrs = &ast.attrs; + let generics = &ast.generics; + let (impl_generics, ty_generics, where_clause) = generics.split_for_impl(); + + let fields = match &ast.data { + syn::Data::Struct(data) => &data.fields, + _ => panic!("#[synced] can only be used on structs"), + }; + + TokenStream::from(quote! { + // Generate the struct with all necessary derives + #(#attrs)* + #[derive(::bevy::prelude::Component, Clone, Copy, Debug)] + #[derive(::rkyv::Archive, ::rkyv::Serialize, ::rkyv::Deserialize)] + #vis struct #struct_name #generics #fields + + // Register component in type registry using inventory + ::inventory::submit! { + ::libmarathon::persistence::ComponentMeta { + type_name: stringify!(#struct_name), + type_path: concat!(module_path!(), "::", stringify!(#struct_name)), + type_id: std::any::TypeId::of::<#struct_name>(), + + deserialize_fn: |bytes: &[u8]| -> anyhow::Result> { + let component = ::rkyv::from_bytes::<#struct_name #ty_generics, ::rkyv::rancor::Failure>(bytes)?; + Ok(Box::new(component)) + }, + + serialize_fn: |world: &::bevy::ecs::world::World, entity: ::bevy::ecs::entity::Entity| + -> Option<::bytes::Bytes> + { + world.get::<#struct_name #ty_generics>(entity).map(|component| { + let serialized = ::rkyv::to_bytes::<::rkyv::rancor::Failure>(component) + .expect("Failed to serialize component"); + ::bytes::Bytes::from(serialized.to_vec()) + }) + }, + + insert_fn: |entity_mut: &mut ::bevy::ecs::world::EntityWorldMut, + boxed: Box| + { + if let Ok(component) = boxed.downcast::<#struct_name #ty_generics>() { + entity_mut.insert(*component); + } + }, + } + } + }) +} diff --git a/crates/macros/tests/basic_macro_test.rs b/crates/macros/tests/basic_macro_test.rs index ecd2fc6..b8ca633 100644 --- a/crates/macros/tests/basic_macro_test.rs +++ b/crates/macros/tests/basic_macro_test.rs @@ -1,71 +1,64 @@ -/// Basic tests for the Synced attribute macro +/// Basic tests for the #[synced] attribute macro use bevy::prelude::*; -use libmarathon::networking::{ - ClockComparison, - ComponentMergeDecision, - SyncComponent, -}; -// Test 1: Basic struct with LWW strategy compiles -// Note: No need to manually derive rkyv traits - synced attribute adds them automatically! -#[sync_macros::synced(version = 1, strategy = "LastWriteWins")] -#[derive(Component, Reflect, Clone, Debug, PartialEq)] -#[reflect(Component)] -struct Health(f32); +// Test 1: Basic struct with synced attribute compiles +#[macros::synced] +struct Health { + current: f32, +} #[test] fn test_health_compiles() { - let health = Health(100.0); - assert_eq!(health.0, 100.0); + let health = Health { current: 100.0 }; + assert_eq!(health.current, 100.0); } #[test] -fn test_health_serialization() { - let health = Health(100.0); - let bytes = health.serialize_sync().unwrap(); - let deserialized = Health::deserialize_sync(&bytes).unwrap(); - assert_eq!(health, deserialized); +fn test_health_has_component_trait() { + // The synced macro should automatically derive Component + let health = Health { current: 100.0 }; + + // Can insert into Bevy world + let mut world = World::new(); + let entity = world.spawn(health).id(); + + // Can query it back + let health_ref = world.get::(entity).unwrap(); + assert_eq!(health_ref.current, 100.0); } #[test] -fn test_health_lww_merge_remote_newer() { - let mut local = Health(50.0); - let remote = Health(100.0); +fn test_health_rkyv_serialization() { + let health = Health { current: 100.0 }; - let decision = local.merge(remote, ClockComparison::RemoteNewer); - assert_eq!(decision, ComponentMergeDecision::TookRemote); - assert_eq!(local.0, 100.0); + // Test rkyv serialization (which the synced macro adds) + let bytes = rkyv::to_bytes::(&health) + .expect("Should serialize with rkyv"); + + let deserialized: Health = rkyv::from_bytes::(&bytes) + .expect("Should deserialize with rkyv"); + + assert_eq!(deserialized.current, health.current); } #[test] -fn test_health_lww_merge_local_newer() { - let mut local = Health(50.0); - let remote = Health(100.0); +fn test_health_is_clone_and_copy() { + let health = Health { current: 100.0 }; - let decision = local.merge(remote, ClockComparison::LocalNewer); - assert_eq!(decision, ComponentMergeDecision::KeptLocal); - assert_eq!(local.0, 50.0); // Local value kept -} + // Test Clone + let cloned = health.clone(); + assert_eq!(cloned.current, health.current); -#[test] -fn test_health_lww_merge_concurrent() { - let mut local = Health(50.0); - let remote = Health(100.0); + // Test Copy (implicit through assignment) + let copied = health; + assert_eq!(copied.current, health.current); - let decision = local.merge(remote, ClockComparison::Concurrent); - // With concurrent, we use hash tiebreaker - // Either TookRemote or KeptLocal depending on hash - assert!( - decision == ComponentMergeDecision::TookRemote || - decision == ComponentMergeDecision::KeptLocal - ); + // Original still valid after copy + assert_eq!(health.current, 100.0); } // Test 2: Struct with multiple fields -// rkyv traits are automatically added by the synced attribute! -#[sync_macros::synced(version = 1, strategy = "LastWriteWins")] -#[derive(Component, Reflect, Clone, Debug, PartialEq)] -#[reflect(Component)] +#[macros::synced] struct Position { x: f32, y: f32, @@ -79,20 +72,101 @@ fn test_position_compiles() { } #[test] -fn test_position_serialization() { +fn test_position_rkyv_serialization() { let pos = Position { x: 10.0, y: 20.0 }; - let bytes = pos.serialize_sync().unwrap(); - let deserialized = Position::deserialize_sync(&bytes).unwrap(); - assert_eq!(pos, deserialized); + + let bytes = rkyv::to_bytes::(&pos) + .expect("Should serialize with rkyv"); + + let deserialized: Position = rkyv::from_bytes::(&bytes) + .expect("Should deserialize with rkyv"); + + assert_eq!(deserialized.x, pos.x); + assert_eq!(deserialized.y, pos.y); } #[test] -fn test_position_merge() { - let mut local = Position { x: 10.0, y: 20.0 }; - let remote = Position { x: 30.0, y: 40.0 }; +fn test_position_in_bevy_world() { + let pos = Position { x: 10.0, y: 20.0 }; - let decision = local.merge(remote, ClockComparison::RemoteNewer); - assert_eq!(decision, ComponentMergeDecision::TookRemote); - assert_eq!(local.x, 30.0); - assert_eq!(local.y, 40.0); + let mut world = World::new(); + let entity = world.spawn(pos).id(); + + let pos_ref = world.get::(entity).unwrap(); + assert_eq!(pos_ref.x, 10.0); + assert_eq!(pos_ref.y, 20.0); +} + +// Test 3: Component registration in type registry +// This test verifies that the inventory::submit! generated by the macro works +#[test] +fn test_component_registry_has_health() { + use libmarathon::persistence::ComponentTypeRegistry; + + let registry = ComponentTypeRegistry::init(); + + // The macro should have registered Health + let type_id = std::any::TypeId::of::(); + let discriminant = registry.get_discriminant(type_id); + + assert!(discriminant.is_some(), "Health should be registered in ComponentTypeRegistry"); + + // Check the type name + let type_name = registry.get_type_name(discriminant.unwrap()); + assert_eq!(type_name, Some("Health")); +} + +#[test] +fn test_component_registry_has_position() { + use libmarathon::persistence::ComponentTypeRegistry; + + let registry = ComponentTypeRegistry::init(); + + let type_id = std::any::TypeId::of::(); + let discriminant = registry.get_discriminant(type_id); + + assert!(discriminant.is_some(), "Position should be registered in ComponentTypeRegistry"); + + // Check the type name + let type_name = registry.get_type_name(discriminant.unwrap()); + assert_eq!(type_name, Some("Position")); +} + +// Test 4: End-to-end serialization via ComponentTypeRegistry +#[test] +fn test_registry_serialization_roundtrip() { + use libmarathon::persistence::ComponentTypeRegistry; + + let mut world = World::new(); + let entity = world.spawn(Health { current: 75.0 }).id(); + + let registry = ComponentTypeRegistry::init(); + let type_id = std::any::TypeId::of::(); + let discriminant = registry.get_discriminant(type_id).unwrap(); + + // Serialize using the registry + let serialize_fn = registry.get_discriminant(type_id) + .and_then(|disc| { + // Get serializer from the registry internals + // We'll use the serialization method from the registry + let serializer = world.get::(entity).map(|component| { + rkyv::to_bytes::(component) + .expect("Should serialize") + .to_vec() + }); + serializer + }) + .expect("Should serialize Health component"); + + // Deserialize using the registry + let deserialize_fn = registry.get_deserialize_fn(discriminant) + .expect("Should have deserialize function"); + + let boxed = deserialize_fn(&serialize_fn) + .expect("Should deserialize Health component"); + + let health = boxed.downcast::() + .expect("Should downcast to Health"); + + assert_eq!(health.current, 75.0); }