diff --git a/Cargo.lock b/Cargo.lock index f91cb77..fb3f4fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4652,6 +4652,7 @@ dependencies = [ "raw-window-handle", "rkyv", "rusqlite", + "rustc-hash 2.1.1", "serde", "serde_json", "sha2 0.10.9", diff --git a/crates/app/src/engine_bridge.rs b/crates/app/src/engine_bridge.rs index 26c9a89..dffa267 100644 --- a/crates/app/src/engine_bridge.rs +++ b/crates/app/src/engine_bridge.rs @@ -82,10 +82,12 @@ fn poll_engine_events( commands.insert_resource(gossip_bridge); info!("Inserted GossipBridge resource"); - // Update session to use the new session ID and set state to Active + // Update session to use the new session ID and set state to Joining + // The transition_session_state_system will handle Joining → Active + // after receiving FullState from peers current_session.session = Session::new(session_id.clone()); - current_session.session.state = SessionState::Active; - info!("Updated CurrentSession to Active: {}", session_id.to_code()); + current_session.session.state = SessionState::Joining; + info!("Updated CurrentSession to Joining: {}", session_id.to_code()); // Update node ID in clock node_clock.node_id = node_id; diff --git a/crates/libmarathon/Cargo.toml b/crates/libmarathon/Cargo.toml index e3d48f8..18c6baa 100644 --- a/crates/libmarathon/Cargo.toml +++ b/crates/libmarathon/Cargo.toml @@ -70,6 +70,7 @@ itertools = "0.14" rand = "0.8" raw-window-handle = "0.6" rusqlite = { version = "0.37.0", features = ["bundled"] } +rustc-hash = "2.1" serde = { version = "1.0", features = ["derive"] } serde_json.workspace = true sha2 = "0.10" diff --git a/crates/libmarathon/src/engine/core.rs b/crates/libmarathon/src/engine/core.rs index b6866e6..22eb10a 100644 --- a/crates/libmarathon/src/engine/core.rs +++ b/crates/libmarathon/src/engine/core.rs @@ -71,11 +71,22 @@ impl EngineCore { self.stop_networking().await; } EngineCommand::SaveSession => { - // TODO: Save current session state - tracing::debug!("SaveSession command received (stub)"); + // Session state is auto-saved by save_session_on_shutdown_system in Bevy + // This command is a no-op, as persistence is handled by Bevy systems + tracing::debug!("SaveSession command received (session auto-save handled by Bevy)"); } EngineCommand::LoadSession { session_id } => { - tracing::debug!("LoadSession command received for {} (stub)", session_id.to_code()); + // Loading a session means switching to a different session + // This requires restarting networking with the new session + tracing::info!("LoadSession command received for {}", session_id.to_code()); + + // Stop current networking if any + if self.networking_task.is_some() { + self.stop_networking().await; + } + + // Start networking with the new session + self.start_networking(session_id).await; } EngineCommand::TickClock => { self.tick_clock(); diff --git a/crates/libmarathon/src/engine/networking.rs b/crates/libmarathon/src/engine/networking.rs index b616d78..6e358ae 100644 --- a/crates/libmarathon/src/engine/networking.rs +++ b/crates/libmarathon/src/engine/networking.rs @@ -240,11 +240,43 @@ impl NetworkingManager { Event::Received(msg) => { // Deserialize and forward to GossipBridge for Bevy systems if let Ok(versioned) = rkyv::from_bytes::(&msg.content) { + // Diagnostic logging: track message type and nonce + let msg_type = match &versioned.message { + SyncMessage::EntityDelta { entity_id, .. } => { + format!("EntityDelta({})", entity_id) + } + SyncMessage::JoinRequest { node_id, .. } => { + format!("JoinRequest({})", node_id) + } + SyncMessage::FullState { entities, .. } => { + format!("FullState({} entities)", entities.len()) + } + SyncMessage::SyncRequest { node_id, .. } => { + format!("SyncRequest({})", node_id) + } + SyncMessage::MissingDeltas { deltas } => { + format!("MissingDeltas({} ops)", deltas.len()) + } + SyncMessage::Lock(lock_msg) => { + format!("Lock({:?})", lock_msg) + } + }; + + tracing::debug!( + "[NetworkingManager::receive] Node {} received from iroh-gossip: {} (nonce: {})", + self.node_id, msg_type, versioned.nonce + ); + if let Err(e) = self.bridge.push_incoming(versioned) { - tracing::error!("Failed to push message to GossipBridge: {}", e); + tracing::error!("Failed to forward {} to GossipBridge: {}", msg_type, e); } else { - tracing::debug!("Forwarded message to Bevy via GossipBridge"); + tracing::debug!( + "[NetworkingManager::receive] ✓ Forwarded {} to Bevy GossipBridge", + msg_type + ); } + } else { + tracing::warn!("Failed to deserialize message from iroh-gossip"); } } Event::NeighborUp(peer) => { @@ -288,15 +320,54 @@ impl NetworkingManager { // Poll GossipBridge for outgoing messages and broadcast via iroh _ = bridge_poll_interval.tick() => { + let mut sent_count = 0; while let Some(msg) = self.bridge.try_recv_outgoing() { + // Diagnostic logging: track message type and nonce + let msg_type = match &msg.message { + SyncMessage::EntityDelta { entity_id, .. } => { + format!("EntityDelta({})", entity_id) + } + SyncMessage::JoinRequest { node_id, .. } => { + format!("JoinRequest({})", node_id) + } + SyncMessage::FullState { entities, .. } => { + format!("FullState({} entities)", entities.len()) + } + SyncMessage::SyncRequest { node_id, .. } => { + format!("SyncRequest({})", node_id) + } + SyncMessage::MissingDeltas { deltas } => { + format!("MissingDeltas({} ops)", deltas.len()) + } + SyncMessage::Lock(lock_msg) => { + format!("Lock({:?})", lock_msg) + } + }; + + tracing::debug!( + "[NetworkingManager::broadcast] Node {} broadcasting: {} (nonce: {})", + self.node_id, msg_type, msg.nonce + ); + if let Ok(bytes) = rkyv::to_bytes::(&msg).map(|b| b.to_vec()) { if let Err(e) = self.sender.broadcast(Bytes::from(bytes)).await { - tracing::error!("Failed to broadcast message: {}", e); + tracing::error!("Failed to broadcast {} to iroh-gossip: {}", msg_type, e); } else { - tracing::debug!("Broadcast message from Bevy via iroh-gossip"); + sent_count += 1; + tracing::debug!( + "[NetworkingManager::broadcast] ✓ Sent {} to iroh-gossip network", + msg_type + ); } } } + + if sent_count > 0 { + tracing::info!( + "[NetworkingManager::broadcast] Node {} sent {} messages to iroh-gossip network", + self.node_id, sent_count + ); + } } // Periodic tasks: heartbeats and lock cleanup diff --git a/crates/libmarathon/src/networking/apply_ops.rs b/crates/libmarathon/src/networking/apply_ops.rs index dc323b6..7cec1a5 100644 --- a/crates/libmarathon/src/networking/apply_ops.rs +++ b/crates/libmarathon/src/networking/apply_ops.rs @@ -165,6 +165,24 @@ pub fn apply_entity_delta(delta: &EntityDelta, world: &mut World) { ); } } + + // CRITICAL: Add marker to prevent feedback loop + // + // When we apply remote operations, insert_fn() triggers Bevy's change detection. + // This causes auto_detect_transform_changes_system to mark NetworkedEntity as changed, + // which would normally trigger generate_delta_system to broadcast it back, creating + // an infinite feedback loop. + // + // By adding SkipNextDeltaGeneration marker, we tell generate_delta_system to skip + // this entity for one frame. A cleanup system removes the marker after delta + // generation runs, allowing future local changes to be broadcast normally. + if let Ok(mut entity_mut) = world.get_entity_mut(entity) { + entity_mut.insert(crate::networking::SkipNextDeltaGeneration); + debug!( + "Added SkipNextDeltaGeneration marker to entity {:?} to prevent feedback loop", + delta.entity_id + ); + } } /// Apply a single ComponentOp to an entity @@ -235,14 +253,18 @@ fn apply_set_operation_with_lww( ) { // Get component type name for logging and clock tracking let type_registry = { - let registry_resource = world.resource::(); + let registry_resource = + world.resource::(); registry_resource.0 }; - + let component_type_name = match type_registry.get_type_name(discriminant) { | Some(name) => name, | None => { - error!("Unknown discriminant {} - component not registered", discriminant); + error!( + "Unknown discriminant {} - component not registered", + discriminant + ); return; }, }; @@ -310,7 +332,10 @@ fn apply_set_operation_with_lww( } }, | crate::networking::merge::MergeDecision::Equal => { - debug!("Ignoring remote Set for {} (clocks equal)", component_type_name); + debug!( + "Ignoring remote Set for {} (clocks equal)", + component_type_name + ); false }, } @@ -355,14 +380,9 @@ fn apply_set_operation_with_lww( /// /// Deserializes the component and inserts/updates it on the entity. /// Handles both inline data and blob references. -fn apply_set_operation( - entity: Entity, - discriminant: u16, - data: &ComponentData, - world: &mut World, -) { +fn apply_set_operation(entity: Entity, discriminant: u16, data: &ComponentData, world: &mut World) { let blob_store = world.get_resource::(); - + // Get the actual data (resolve blob if needed) let data_bytes = match data { | ComponentData::Inline(bytes) => bytes.clone(), @@ -390,7 +410,8 @@ fn apply_set_operation( // Get component type registry let type_registry = { - let registry_resource = world.resource::(); + let registry_resource = + world.resource::(); registry_resource.0 }; @@ -401,7 +422,10 @@ fn apply_set_operation( let (deserialize_fn, insert_fn) = match (deserialize_fn, insert_fn) { | (Some(d), Some(i)) => (d, i), | _ => { - error!("Discriminant {} not registered in ComponentTypeRegistry", discriminant); + error!( + "Discriminant {} not registered in ComponentTypeRegistry", + discriminant + ); return; }, }; diff --git a/crates/libmarathon/src/networking/change_detection.rs b/crates/libmarathon/src/networking/change_detection.rs index 1adc263..2957b16 100644 --- a/crates/libmarathon/src/networking/change_detection.rs +++ b/crates/libmarathon/src/networking/change_detection.rs @@ -31,6 +31,7 @@ pub fn auto_detect_transform_changes_system( ( With, Or<(Changed, Changed)>, + Without, ), >, ) { diff --git a/crates/libmarathon/src/networking/components.rs b/crates/libmarathon/src/networking/components.rs index c26e871..941631a 100644 --- a/crates/libmarathon/src/networking/components.rs +++ b/crates/libmarathon/src/networking/components.rs @@ -11,6 +11,21 @@ use serde::{ use crate::networking::vector_clock::NodeId; +/// Marker component to skip delta generation for one frame after receiving remote updates +/// +/// When we apply remote operations via `apply_entity_delta()`, the `insert_fn()` call +/// triggers Bevy's change detection. This would normally cause `generate_delta_system` +/// to create and broadcast a new delta, creating an infinite feedback loop. +/// +/// By adding this marker when we apply remote updates, we tell `generate_delta_system` +/// to skip this entity for one frame. A cleanup system removes the marker after +/// delta generation runs, allowing future local changes to be broadcast normally. +/// +/// This is an implementation detail of the feedback loop prevention mechanism. +/// User code should never need to interact with this component. +#[derive(Component, Debug)] +pub struct SkipNextDeltaGeneration; + /// Marker component indicating an entity should be synchronized over the /// network /// diff --git a/crates/libmarathon/src/networking/delta_generation.rs b/crates/libmarathon/src/networking/delta_generation.rs index f009d34..e9d7f0a 100644 --- a/crates/libmarathon/src/networking/delta_generation.rs +++ b/crates/libmarathon/src/networking/delta_generation.rs @@ -70,8 +70,13 @@ pub fn generate_delta_system(world: &mut World) { // Broadcast only happens when online let changed_entities: Vec<(Entity, uuid::Uuid, uuid::Uuid)> = { - let mut query = - world.query_filtered::<(Entity, &NetworkedEntity), Or<(Added, Changed)>>(); + let mut query = world.query_filtered::< + (Entity, &NetworkedEntity), + ( + Or<(Added, Changed)>, + Without, + ), + >(); query .iter(world) .map(|(entity, networked)| (entity, networked.network_id, networked.owner_node_id)) @@ -98,22 +103,25 @@ pub fn generate_delta_system(world: &mut World) { Option>, )> = bevy::ecs::system::SystemState::new(world); - let (node_id, vector_clock, current_seq) = { + let (node_id, vector_clock, new_seq) = { let (_, _, mut node_clock, last_versions, _) = system_state.get_mut(world); - // Check if we should sync this entity + // Check if we should sync this entity with the NEXT sequence (after tick) + // This prevents duplicate sends when system runs multiple times per frame let current_seq = node_clock.sequence(); - if !last_versions.should_sync(network_id, current_seq) { + let next_seq = current_seq + 1; // What the sequence will be after tick + if !last_versions.should_sync(network_id, next_seq) { drop(last_versions); drop(node_clock); system_state.apply(world); continue; } - // Increment our vector clock - node_clock.tick(); + // Increment our vector clock and get the NEW sequence + let new_seq = node_clock.tick(); + debug_assert_eq!(new_seq, next_seq, "tick() should return next_seq"); - (node_clock.node_id, node_clock.clock.clone(), current_seq) + (node_clock.node_id, node_clock.clock.clone(), new_seq) }; // Phase 2: Build operations (needs world access without holding other borrows) @@ -174,8 +182,10 @@ pub fn generate_delta_system(world: &mut World) { ); } - // Update last sync version (both online and offline) - last_versions.update(network_id, current_seq); + // Update last sync version with NEW sequence (after tick) to prevent duplicates + // CRITICAL: Must use new_seq (after tick), not current_seq (before tick) + // This prevents sending duplicate deltas if system runs multiple times per frame + last_versions.update(network_id, new_seq); delta }; @@ -220,6 +230,46 @@ pub fn generate_delta_system(world: &mut World) { } } +/// Remove SkipNextDeltaGeneration markers after delta generation has run +/// +/// This system must run AFTER `generate_delta_system` to allow entities to be +/// synced again on the next actual local change. The marker prevents feedback +/// loops by skipping entities that just received remote updates, but we need +/// to remove it so future local changes get broadcast. +/// +/// Add this to your app after generate_delta_system: +/// +/// ```no_run +/// use bevy::prelude::*; +/// use libmarathon::networking::{generate_delta_system, cleanup_skip_delta_markers_system}; +/// +/// App::new().add_systems(PostUpdate, ( +/// generate_delta_system, +/// cleanup_skip_delta_markers_system, +/// ).chain()); +/// ``` +pub fn cleanup_skip_delta_markers_system(world: &mut World) { + // Use immediate removal (not deferred commands) to ensure markers are removed + // synchronously after generate_delta_system runs, not at the start of next frame + let entities_to_clean: Vec = { + let mut query = world.query_filtered::>(); + query.iter(world).collect() + }; + + for entity in &entities_to_clean { + if let Ok(mut entity_mut) = world.get_entity_mut(*entity) { + entity_mut.remove::(); + } + } + + if !entities_to_clean.is_empty() { + debug!( + "cleanup_skip_delta_markers_system: Removed markers from {} entities", + entities_to_clean.len() + ); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/libmarathon/src/networking/gossip_bridge.rs b/crates/libmarathon/src/networking/gossip_bridge.rs index 1ec690b..f1b3f49 100644 --- a/crates/libmarathon/src/networking/gossip_bridge.rs +++ b/crates/libmarathon/src/networking/gossip_bridge.rs @@ -65,6 +65,33 @@ impl GossipBridge { /// Send a message to the gossip network pub fn send(&self, message: VersionedMessage) -> Result<()> { + // Diagnostic logging: track message type and nonce + let msg_type = match &message.message { + crate::networking::SyncMessage::EntityDelta { entity_id, .. } => { + format!("EntityDelta({})", entity_id) + } + crate::networking::SyncMessage::JoinRequest { node_id, .. } => { + format!("JoinRequest({})", node_id) + } + crate::networking::SyncMessage::FullState { entities, .. } => { + format!("FullState({} entities)", entities.len()) + } + crate::networking::SyncMessage::SyncRequest { node_id, .. } => { + format!("SyncRequest({})", node_id) + } + crate::networking::SyncMessage::MissingDeltas { deltas } => { + format!("MissingDeltas({} ops)", deltas.len()) + } + crate::networking::SyncMessage::Lock(lock_msg) => { + format!("Lock({:?})", lock_msg) + } + }; + + debug!( + "[GossipBridge::send] Node {} queuing message: {} (nonce: {})", + self.node_id, msg_type, message.nonce + ); + self.outgoing .lock() .map_err(|e| NetworkingError::Gossip(format!("Failed to lock outgoing queue: {}", e)))? @@ -97,6 +124,33 @@ impl GossipBridge { /// Push a message to the incoming queue (for testing/integration) pub fn push_incoming(&self, message: VersionedMessage) -> Result<()> { + // Diagnostic logging: track incoming message type + let msg_type = match &message.message { + crate::networking::SyncMessage::EntityDelta { entity_id, .. } => { + format!("EntityDelta({})", entity_id) + } + crate::networking::SyncMessage::JoinRequest { node_id, .. } => { + format!("JoinRequest({})", node_id) + } + crate::networking::SyncMessage::FullState { entities, .. } => { + format!("FullState({} entities)", entities.len()) + } + crate::networking::SyncMessage::SyncRequest { node_id, .. } => { + format!("SyncRequest({})", node_id) + } + crate::networking::SyncMessage::MissingDeltas { deltas } => { + format!("MissingDeltas({} ops)", deltas.len()) + } + crate::networking::SyncMessage::Lock(lock_msg) => { + format!("Lock({:?})", lock_msg) + } + }; + + debug!( + "[GossipBridge::push_incoming] Node {} received from network: {} (nonce: {})", + self.node_id, msg_type, message.nonce + ); + self.incoming .lock() .map_err(|e| NetworkingError::Gossip(format!("Failed to lock incoming queue: {}", e)))? diff --git a/crates/libmarathon/src/networking/join_protocol.rs b/crates/libmarathon/src/networking/join_protocol.rs index ef44ffc..7cbb05d 100644 --- a/crates/libmarathon/src/networking/join_protocol.rs +++ b/crates/libmarathon/src/networking/join_protocol.rs @@ -17,6 +17,7 @@ use crate::networking::{ GossipBridge, NetworkedEntity, SessionId, + Synced, VectorClock, blob_support::BlobStore, delta_generation::NodeVectorClock, @@ -129,8 +130,9 @@ pub fn build_full_state( } info!( - "Built FullState with {} entities for new peer", - entities.len() + "Built FullState with {} entities ({} total networked entities queried) for new peer", + entities.len(), + networked_entities.iter().count() ); VersionedMessage::new(SyncMessage::FullState { @@ -168,12 +170,17 @@ pub fn apply_full_state( { let mut node_clock = world.resource_mut::(); node_clock.clock.merge(&remote_clock); + info!("Vector clock after merge: {:?}", node_clock.clock); } + let mut spawned_count = 0; + let mut tombstoned_count = 0; + // Spawn all entities and apply their state for entity_state in entities { // Handle deleted entities (tombstones) if entity_state.is_deleted { + tombstoned_count += 1; // Record tombstone if let Some(mut registry) = world.get_resource_mut::() { registry.record_deletion( @@ -185,20 +192,43 @@ pub fn apply_full_state( continue; } - // Spawn entity with NetworkedEntity and Persisted components - // This ensures entities received via FullState are persisted locally - let entity = world - .spawn(( - NetworkedEntity::with_id(entity_state.entity_id, entity_state.owner_node_id), - crate::persistence::Persisted::with_id(entity_state.entity_id), - )) - .id(); + // Check if entity already exists in the map + let entity = { + let entity_map = world.resource::(); + entity_map.get_entity(entity_state.entity_id) + }; - // Register in entity map - { - let mut entity_map = world.resource_mut::(); - entity_map.insert(entity_state.entity_id, entity); - } + let entity = match entity { + Some(existing_entity) => { + // Entity already exists - reuse it and update components + debug!( + "Entity {} already exists (local entity {:?}), updating components", + entity_state.entity_id, existing_entity + ); + existing_entity + } + None => { + // Spawn new entity with NetworkedEntity, Persisted, and Synced components + // This ensures entities received via FullState are persisted locally and + // will auto-sync their Transform if one is added + let entity = world + .spawn(( + NetworkedEntity::with_id(entity_state.entity_id, entity_state.owner_node_id), + crate::persistence::Persisted::with_id(entity_state.entity_id), + Synced, + )) + .id(); + + // Register in entity map + { + let mut entity_map = world.resource_mut::(); + entity_map.insert(entity_state.entity_id, entity); + } + + spawned_count += 1; + entity + } + }; let num_components = entity_state.components.len(); @@ -261,12 +291,31 @@ pub fn apply_full_state( } debug!( - "Spawned entity {:?} from FullState with {} components", + "Applied entity {:?} from FullState with {} components", entity_state.entity_id, num_components ); } - info!("FullState applied successfully"); + info!( + "FullState applied successfully: spawned {} entities, skipped {} tombstones", + spawned_count, tombstoned_count + ); + + // Send SyncRequest to catch any deltas that arrived during FullState transfer + // This implements the "Final Sync" step from RFC 0004 (Session Lifecycle) + if let Some(bridge) = world.get_resource::() { + let node_clock = world.resource::(); + let request = crate::networking::operation_log::build_sync_request( + node_clock.node_id, + node_clock.clock.clone(), + ); + + if let Err(e) = bridge.send(request) { + error!("Failed to send post-FullState SyncRequest: {}", e); + } else { + info!("Sent SyncRequest to catch deltas that arrived during FullState transfer"); + } + } } /// System to handle JoinRequest messages diff --git a/crates/libmarathon/src/networking/message_dispatcher.rs b/crates/libmarathon/src/networking/message_dispatcher.rs index 40c8237..f607056 100644 --- a/crates/libmarathon/src/networking/message_dispatcher.rs +++ b/crates/libmarathon/src/networking/message_dispatcher.rs @@ -64,8 +64,32 @@ pub fn message_dispatcher_system(world: &mut World) { bridge.drain_incoming() }; + if !messages.is_empty() { + let node_id = world.resource::().node_id; + info!( + "[message_dispatcher] Node {} processing {} messages", + node_id, + messages.len() + ); + } + // Dispatch each message (bridge is no longer borrowed) for message in messages { + let node_id = world.resource::().node_id; + let msg_type = match &message.message { + SyncMessage::EntityDelta { entity_id, .. } => format!("EntityDelta({})", entity_id), + SyncMessage::JoinRequest { node_id, .. } => format!("JoinRequest({})", node_id), + SyncMessage::FullState { entities, .. } => format!("FullState({} entities)", entities.len()), + SyncMessage::SyncRequest { node_id, .. } => format!("SyncRequest({})", node_id), + SyncMessage::MissingDeltas { deltas } => format!("MissingDeltas({} ops)", deltas.len()), + SyncMessage::Lock(_) => "Lock".to_string(), + }; + + debug!( + "[message_dispatcher] Node {} dispatching: {} (nonce: {})", + node_id, msg_type, message.nonce + ); + dispatch_message(world, message); } @@ -256,6 +280,18 @@ fn dispatch_message(world: &mut World, message: crate::networking::VersionedMess } => { debug!("Received SyncRequest from node {}", requesting_node); + // Merge the requesting node's vector clock into ours + // This ensures we learn about their latest sequence number + { + let mut node_clock = world.resource_mut::(); + node_clock.clock.merge(&their_clock); + debug!( + "Merged SyncRequest clock from node {} (seq: {})", + requesting_node, + their_clock.get(requesting_node) + ); + } + if let Some(op_log) = world.get_resource::() { // Find operations they're missing let missing_deltas = op_log.get_all_operations_newer_than(&their_clock); @@ -480,8 +516,10 @@ fn build_full_state_from_data( } info!( - "Built FullState with {} entities for new peer", - entities.len() + "Built FullState with {} entities ({} total queried, {} tombstoned) for new peer", + entities.len(), + networked_entities.len(), + networked_entities.len() - entities.len() ); crate::networking::VersionedMessage::new(SyncMessage::FullState { diff --git a/crates/libmarathon/src/networking/messages.rs b/crates/libmarathon/src/networking/messages.rs index 6f34f83..8894bcc 100644 --- a/crates/libmarathon/src/networking/messages.rs +++ b/crates/libmarathon/src/networking/messages.rs @@ -27,12 +27,13 @@ pub struct VersionedMessage { /// The actual sync message pub message: SyncMessage, - /// Timestamp (nanos since UNIX epoch) to make messages unique + /// Nonce for selective deduplication control /// - /// This prevents iroh-gossip from deduplicating identical messages sent at different times. - /// For example, releasing and re-acquiring a lock sends identical LockRequest messages, - /// but they need to be treated as separate events. - pub timestamp_nanos: u64, + /// - For Lock messages: Unique nonce (counter + timestamp hash) to prevent + /// iroh-gossip deduplication, allowing repeated heartbeats. + /// - For other messages: Constant nonce (0) to enable content-based deduplication + /// by iroh-gossip, preventing feedback loops. + pub nonce: u32, } impl VersionedMessage { @@ -40,18 +41,44 @@ impl VersionedMessage { pub const CURRENT_VERSION: u32 = 1; /// Create a new versioned message with the current protocol version + /// + /// For Lock messages: Generates a unique nonce to prevent deduplication, since + /// lock heartbeats need to be sent repeatedly even with identical content. + /// + /// For other messages: Uses a constant nonce (0) to enable iroh-gossip's + /// content-based deduplication. This prevents feedback loops where the same + /// EntityDelta gets broadcast repeatedly. pub fn new(message: SyncMessage) -> Self { - use std::time::{SystemTime, UNIX_EPOCH}; + // Only generate unique nonces for Lock messages (heartbeats need to bypass dedup) + let nonce = if matches!(message, SyncMessage::Lock(_)) { + use std::hash::Hasher; + use std::sync::atomic::{AtomicU32, Ordering}; + use std::time::{SystemTime, UNIX_EPOCH}; - let timestamp_nanos = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_nanos() as u64; + // Per-node rolling counter for sequential uniqueness + static COUNTER: AtomicU32 = AtomicU32::new(0); + let counter = COUNTER.fetch_add(1, Ordering::Relaxed); + + // Millisecond timestamp for temporal uniqueness + let timestamp_millis = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u32; + + // Hash counter + timestamp for final nonce + let mut hasher = rustc_hash::FxHasher::default(); + hasher.write_u32(counter); + hasher.write_u32(timestamp_millis); + hasher.finish() as u32 + } else { + // Use constant nonce for all other messages to enable content deduplication + 0 + }; Self { version: Self::CURRENT_VERSION, message, - timestamp_nanos, + nonce, } } } diff --git a/crates/libmarathon/src/networking/mod.rs b/crates/libmarathon/src/networking/mod.rs index ec30b2e..b1c80d2 100644 --- a/crates/libmarathon/src/networking/mod.rs +++ b/crates/libmarathon/src/networking/mod.rs @@ -54,6 +54,7 @@ mod plugin; mod rga; mod session; mod session_lifecycle; +mod session_sync; mod sync_component; mod tombstones; mod vector_clock; @@ -81,6 +82,7 @@ pub use plugin::*; pub use rga::*; pub use session::*; pub use session_lifecycle::*; +pub use session_sync::*; pub use sync_component::*; pub use tombstones::*; pub use vector_clock::*; diff --git a/crates/libmarathon/src/networking/operation_builder.rs b/crates/libmarathon/src/networking/operation_builder.rs index c16fa25..4c665cb 100644 --- a/crates/libmarathon/src/networking/operation_builder.rs +++ b/crates/libmarathon/src/networking/operation_builder.rs @@ -67,13 +67,12 @@ pub fn build_entity_operations( }; // Build the operation - let mut clock = vector_clock.clone(); - clock.increment(node_id); - + // Use the vector_clock as-is - it's already been incremented by the caller (delta_generation.rs:116) + // All operations in the same EntityDelta share the same vector clock (same logical timestamp) operations.push(ComponentOp::Set { discriminant, data, - vector_clock: clock.clone(), + vector_clock: vector_clock.clone(), }); debug!(" ✓ Added Set operation for discriminant {}", discriminant); @@ -86,3 +85,67 @@ pub fn build_entity_operations( ); operations } + +#[cfg(test)] +mod tests { + use super::*; + use bevy::prelude::*; + use crate::networking::NetworkedEntity; + use crate::persistence::{ComponentTypeRegistry, Persisted}; + + #[test] + fn test_operations_use_passed_vector_clock_without_extra_increment() { + // Setup: Create a minimal world with an entity + let mut world = World::new(); + let node_id = uuid::Uuid::new_v4(); + + // Use the global registry (Transform is already registered via inventory) + let registry = ComponentTypeRegistry::init(); + + // Create test entity with Transform + let entity_id = uuid::Uuid::new_v4(); + let entity = world.spawn(( + NetworkedEntity::with_id(entity_id, node_id), + Persisted::with_id(entity_id), + Transform::from_xyz(1.0, 2.0, 3.0), + )).id(); + + // Create a vector clock that's already been ticked + let mut vector_clock = VectorClock::new(); + vector_clock.increment(node_id); // Simulate the tick that delta_generation does + let expected_clock = vector_clock.clone(); + + // Build operations + let operations = build_entity_operations( + entity, + &world, + node_id, + vector_clock.clone(), + ®istry, + None, + ); + + // Verify: All operations should use the EXACT clock that was passed in + assert!(!operations.is_empty(), "Should have created at least one operation"); + + for op in &operations { + if let ComponentOp::Set { vector_clock: op_clock, .. } = op { + assert_eq!( + *op_clock, expected_clock, + "Operation clock should match the input clock exactly. \ + The bug was that operation_builder would increment the clock again, \ + causing EntityDelta.vector_clock and ComponentOp.vector_clock to be misaligned." + ); + + // Verify the sequence number matches + let op_seq = op_clock.get(node_id); + let expected_seq = expected_clock.get(node_id); + assert_eq!( + op_seq, expected_seq, + "Operation sequence should be {} (same as input clock), but got {}", + expected_seq, op_seq + ); + } + } + } +} diff --git a/crates/libmarathon/src/networking/operation_log.rs b/crates/libmarathon/src/networking/operation_log.rs index 4338f33..3cc2573 100644 --- a/crates/libmarathon/src/networking/operation_log.rs +++ b/crates/libmarathon/src/networking/operation_log.rs @@ -351,7 +351,7 @@ pub fn handle_missing_deltas_system(world: &mut World) { /// adaptive sync intervals based on network conditions. pub fn periodic_sync_system( bridge: Option>, - node_clock: Res, + mut node_clock: ResMut, time: Res