From 579b6bcabc73cbd7a9681745dbb3354bfc886aa7 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Sun, 16 Nov 2025 16:34:55 +0000 Subject: [PATCH] finished initial networking impl Signed-off-by: Sienna Meridian Satterwhite --- Cargo.lock | 71 +- crates/lib/Cargo.toml | 1 + crates/lib/src/lib.rs | 1 + crates/lib/src/networking/apply_ops.rs | 386 +++++++++++ crates/lib/src/networking/blob_support.rs | 379 +++++++++++ crates/lib/src/networking/change_detection.rs | 117 ++++ crates/lib/src/networking/components.rs | 410 +++++++++++ crates/lib/src/networking/delta_generation.rs | 193 ++++++ crates/lib/src/networking/entity_map.rs | 438 ++++++++++++ crates/lib/src/networking/error.rs | 77 +++ crates/lib/src/networking/gossip_bridge.rs | 142 ++++ crates/lib/src/networking/join_protocol.rs | 509 ++++++++++++++ crates/lib/src/networking/merge.rs | 263 +++++++ .../lib/src/networking/message_dispatcher.rs | 214 ++++++ crates/lib/src/networking/messages.rs | 345 ++++++++++ crates/lib/src/networking/mod.rs | 71 ++ .../lib/src/networking/operation_builder.rs | 251 +++++++ crates/lib/src/networking/operation_log.rs | 529 +++++++++++++++ crates/lib/src/networking/operations.rs | 388 +++++++++++ crates/lib/src/networking/orset.rs | 483 +++++++++++++ crates/lib/src/networking/plugin.rs | 292 ++++++++ crates/lib/src/networking/rga.rs | 639 ++++++++++++++++++ crates/lib/src/networking/tombstones.rs | 430 ++++++++++++ crates/lib/src/networking/vector_clock.rs | 456 +++++++++++++ crates/lib/src/persistence/mod.rs | 2 +- crates/lib/tests/networking_gossip_test.rs | 39 ++ docs/rfcs/0001-crdt-gossip-sync.md | 2 +- docs/rfcs/0002-persistence-strategy.md | 2 +- 28 files changed, 7117 insertions(+), 13 deletions(-) create mode 100644 crates/lib/src/networking/apply_ops.rs create mode 100644 crates/lib/src/networking/blob_support.rs create mode 100644 crates/lib/src/networking/change_detection.rs create mode 100644 crates/lib/src/networking/components.rs create mode 100644 
crates/lib/src/networking/delta_generation.rs create mode 100644 crates/lib/src/networking/entity_map.rs create mode 100644 crates/lib/src/networking/error.rs create mode 100644 crates/lib/src/networking/gossip_bridge.rs create mode 100644 crates/lib/src/networking/join_protocol.rs create mode 100644 crates/lib/src/networking/merge.rs create mode 100644 crates/lib/src/networking/message_dispatcher.rs create mode 100644 crates/lib/src/networking/messages.rs create mode 100644 crates/lib/src/networking/mod.rs create mode 100644 crates/lib/src/networking/operation_builder.rs create mode 100644 crates/lib/src/networking/operation_log.rs create mode 100644 crates/lib/src/networking/operations.rs create mode 100644 crates/lib/src/networking/orset.rs create mode 100644 crates/lib/src/networking/plugin.rs create mode 100644 crates/lib/src/networking/rga.rs create mode 100644 crates/lib/src/networking/tombstones.rs create mode 100644 crates/lib/src/networking/vector_clock.rs create mode 100644 crates/lib/tests/networking_gossip_test.rs diff --git a/Cargo.lock b/Cargo.lock index 44d7f23..f32da28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -102,7 +102,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac8202ab55fcbf46ca829833f347a82a2a4ce0596f0304ac322c2d100030cd56" dependencies = [ "bytes", - "crypto-common", + "crypto-common 0.2.0-rc.4", "inout", ] @@ -1739,6 +1739,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d8c1fef690941d3e7788d328517591fecc684c084084702d6ff1641e993699a" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "block-buffer" version = "0.11.0" @@ -1980,8 +1989,8 @@ version = "0.5.0-rc.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"1e12a13eb01ded5d32ee9658d94f553a19e804204f2dc811df69ab4d9e0cb8c7" dependencies = [ - "block-buffer", - "crypto-common", + "block-buffer 0.11.0", + "crypto-common 0.2.0-rc.4", "inout", "zeroize", ] @@ -2341,6 +2350,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "crypto-common" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "crypto-common" version = "0.2.0-rc.4" @@ -2409,7 +2428,7 @@ dependencies = [ "cfg-if", "cpufeatures", "curve25519-dalek-derive", - "digest", + "digest 0.11.0-rc.3", "fiat-crypto", "rand_core 0.9.3", "rustc_version", @@ -2586,15 +2605,25 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab03c107fafeb3ee9f5925686dbb7a73bc76e3932abb0d2b365cb64b169cf04c" +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer 0.10.4", + "crypto-common 0.1.7", +] + [[package]] name = "digest" version = "0.11.0-rc.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dac89f8a64533a9b0eaa73a68e424db0fb1fd6271c74cc0125336a05f090568d" dependencies = [ - "block-buffer", + "block-buffer 0.11.0", "const-oid", - "crypto-common", + "crypto-common 0.2.0-rc.4", ] [[package]] @@ -2753,7 +2782,7 @@ dependencies = [ "ed25519", "rand_core 0.9.3", "serde", - "sha2", + "sha2 0.11.0-rc.2", "signature", "subtle", "zeroize", @@ -3451,6 +3480,16 @@ dependencies = [ "windows 0.61.3", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "gethostname" version = "1.1.0" @@ -4696,6 +4735,7 @@ dependencies = [ "rusqlite", "serde", "serde_json", + "sha2 0.10.9", "sync-macros", "thiserror 2.0.17", "tokio", @@ -7037,7 +7077,7 @@ checksum = "c5e046edf639aa2e7afb285589e5405de2ef7e61d4b0ac1e30256e3eab911af9" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.11.0-rc.3", ] [[package]] @@ -7046,6 +7086,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha2" version = "0.11.0-rc.2" @@ -7054,7 +7105,7 @@ checksum = "d1e3878ab0f98e35b2df35fe53201d088299b41a6bb63e3e34dada2ac4abd924" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.11.0-rc.3", ] [[package]] @@ -8078,7 +8129,7 @@ version = "0.6.0-rc.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a55be643b40a21558f44806b53ee9319595bc7ca6896372e4e08e5d7d83c9cd6" dependencies = [ - "crypto-common", + "crypto-common 0.2.0-rc.4", "subtle", ] diff --git a/crates/lib/Cargo.toml b/crates/lib/Cargo.toml index 3e9f892..36e6067 100644 --- a/crates/lib/Cargo.toml +++ b/crates/lib/Cargo.toml @@ -18,6 +18,7 @@ tracing.workspace = true bevy.workspace = true bincode = "1.3" futures-lite = "2.0" +sha2 = "0.10" [dev-dependencies] tokio.workspace = true diff --git a/crates/lib/src/lib.rs b/crates/lib/src/lib.rs index 8f6b368..2837811 100644 --- a/crates/lib/src/lib.rs +++ b/crates/lib/src/lib.rs @@ -24,6 +24,7 @@ mod db; mod error; mod models; +pub mod networking; pub mod persistence; pub 
mod sync; diff --git a/crates/lib/src/networking/apply_ops.rs b/crates/lib/src/networking/apply_ops.rs new file mode 100644 index 0000000..f2ae754 --- /dev/null +++ b/crates/lib/src/networking/apply_ops.rs @@ -0,0 +1,386 @@ +//! Apply remote operations to local ECS state +//! +//! This module handles incoming EntityDelta messages and applies them to the +//! local Bevy world using CRDT merge semantics. + +use bevy::{ + prelude::*, + reflect::TypeRegistry, +}; + +use crate::{ + networking::{ + blob_support::{ + get_component_data, + BlobStore, + }, + delta_generation::NodeVectorClock, + entity_map::NetworkEntityMap, + messages::{ + ComponentData, + EntityDelta, + SyncMessage, + }, + operations::ComponentOp, + NetworkedEntity, + }, + persistence::reflection::deserialize_component, +}; + +/// Apply an EntityDelta message to the local world +/// +/// This function: +/// 1. Checks tombstone registry to prevent resurrection +/// 2. Looks up the entity by network_id +/// 3. Spawns a new entity if it doesn't exist +/// 4. 
Applies each ComponentOp using CRDT merge semantics +/// +/// # Parameters +/// +/// - `delta`: The EntityDelta to apply +/// - `commands`: Bevy Commands for spawning/modifying entities +/// - `entity_map`: Map from network_id to Entity +/// - `type_registry`: Bevy's type registry for deserialization +/// - `node_clock`: Our node's vector clock (for causality tracking) +/// - `blob_store`: Optional blob store for resolving large component references +/// - `tombstone_registry`: Optional tombstone registry for deletion tracking +pub fn apply_entity_delta( + delta: &EntityDelta, + commands: &mut Commands, + entity_map: &mut NetworkEntityMap, + type_registry: &TypeRegistry, + node_clock: &mut NodeVectorClock, + blob_store: Option<&BlobStore>, + mut tombstone_registry: Option<&mut crate::networking::TombstoneRegistry>, +) { + // Validate and merge the remote vector clock + // Check for clock regression (shouldn't happen in correct implementations) + if delta.vector_clock.happened_before(&node_clock.clock) { + warn!( + "Received operation with clock from the past for entity {:?}. \ + Remote clock happened before our clock. 
This may indicate clock issues.", + delta.entity_id + ); + } + + // Merge the remote vector clock into ours + node_clock.clock.merge(&delta.vector_clock); + + // Check if any operations are Delete operations + for op in &delta.operations { + if let crate::networking::ComponentOp::Delete { vector_clock } = op { + // Record tombstone + if let Some(ref mut registry) = tombstone_registry { + registry.record_deletion( + delta.entity_id, + delta.node_id, + vector_clock.clone(), + ); + + // Despawn the entity if it exists locally + if let Some(entity) = entity_map.get_entity(delta.entity_id) { + commands.entity(entity).despawn(); + entity_map.remove_by_network_id(delta.entity_id); + info!("Despawned entity {:?} due to Delete operation", delta.entity_id); + } + + // Don't process other operations - entity is deleted + return; + } + } + } + + // Check if we should ignore this delta due to deletion + if let Some(ref registry) = tombstone_registry { + if registry.should_ignore_operation(delta.entity_id, &delta.vector_clock) { + debug!( + "Ignoring delta for deleted entity {:?}", + delta.entity_id + ); + return; + } + } + + // Look up or create the entity + let entity = match entity_map.get_entity(delta.entity_id) { + Some(entity) => entity, + None => { + // Spawn new entity with NetworkedEntity component + let entity = commands + .spawn(NetworkedEntity::with_id(delta.entity_id, delta.node_id)) + .id(); + + entity_map.insert(delta.entity_id, entity); + info!( + "Spawned new networked entity {:?} from node {}", + delta.entity_id, delta.node_id + ); + + entity + } + }; + + // Apply each operation (skip Delete operations - handled above) + for op in &delta.operations { + if !op.is_delete() { + apply_component_op(entity, op, commands, type_registry, blob_store); + } + } +} + +/// Apply a single ComponentOp to an entity +/// +/// This dispatches to the appropriate CRDT merge logic based on the operation +/// type. 
+fn apply_component_op( + entity: Entity, + op: &ComponentOp, + commands: &mut Commands, + type_registry: &TypeRegistry, + blob_store: Option<&BlobStore>, +) { + match op { + | ComponentOp::Set { + component_type, + data, + vector_clock: _, + } => { + apply_set_operation(entity, component_type, data, commands, type_registry, blob_store); + } + | ComponentOp::SetAdd { component_type, .. } => { + // OR-Set add - Phase 10 provides OrSet type + // Application code should use OrSet in components and handle SetAdd/SetRemove + // Full integration will be in Phase 12 plugin + debug!("SetAdd operation for {} (use OrSet in components)", component_type); + } + | ComponentOp::SetRemove { component_type, .. } => { + // OR-Set remove - Phase 10 provides OrSet type + // Application code should use OrSet in components and handle SetAdd/SetRemove + // Full integration will be in Phase 12 plugin + debug!("SetRemove operation for {} (use OrSet in components)", component_type); + } + | ComponentOp::SequenceInsert { .. } => { + // RGA insert - will be implemented in Phase 11 + debug!("SequenceInsert operation not yet implemented"); + } + | ComponentOp::SequenceDelete { .. } => { + // RGA delete - will be implemented in Phase 11 + debug!("SequenceDelete operation not yet implemented"); + } + | ComponentOp::Delete { .. } => { + // Entity deletion - will be implemented in Phase 9 + debug!("Delete operation not yet implemented"); + } + } +} + +/// Apply a Set operation (Last-Write-Wins) +/// +/// Deserializes the component and inserts/updates it on the entity. +/// Handles both inline data and blob references. 
+fn apply_set_operation( + entity: Entity, + component_type: &str, + data: &ComponentData, + commands: &mut Commands, + type_registry: &TypeRegistry, + blob_store: Option<&BlobStore>, +) { + // Get the actual data (resolve blob if needed) + let data_bytes = match data { + | ComponentData::Inline(bytes) => bytes.clone(), + | ComponentData::BlobRef { hash: _, size: _ } => { + if let Some(store) = blob_store { + match get_component_data(data, store) { + Ok(bytes) => bytes, + Err(e) => { + error!( + "Failed to retrieve blob for component {}: {}", + component_type, e + ); + return; + } + } + } else { + error!( + "Blob reference for {} but no blob store available", + component_type + ); + return; + } + } + }; + + // Deserialize the component + let reflected = match deserialize_component(&data_bytes, type_registry) { + Ok(reflected) => reflected, + Err(e) => { + error!( + "Failed to deserialize component {}: {}", + component_type, e + ); + return; + } + }; + + // Get the type registration + let registration = match type_registry.get_with_type_path(component_type) { + Some(reg) => reg, + None => { + error!("Component type {} not registered", component_type); + return; + } + }; + + // Get ReflectComponent data + let reflect_component = match registration.data::() { + Some(rc) => rc.clone(), + None => { + error!( + "Component type {} does not have ReflectComponent data", + component_type + ); + return; + } + }; + + // Clone what we need to avoid lifetime issues + let component_type_owned = component_type.to_string(); + + // Insert or update the component + commands.queue(move |world: &mut World| { + // Get the type registry from the world and clone it + let type_registry_arc = { + let Some(type_registry_res) = world.get_resource::() else { + error!("AppTypeRegistry not found in world"); + return; + }; + type_registry_res.clone() + }; + + // Now we can safely get mutable access to the world + let type_registry = type_registry_arc.read(); + + if let Ok(mut entity_mut) = 
world.get_entity_mut(entity) { + reflect_component.insert(&mut entity_mut, &*reflected, &type_registry); + debug!("Applied Set operation for {}", component_type_owned); + } + }); +} + +/// System to receive and apply incoming EntityDelta messages +/// +/// This system polls the GossipBridge for incoming messages and applies them +/// to the local world. +/// +/// Add this to your app: +/// +/// ```no_run +/// use bevy::prelude::*; +/// use lib::networking::receive_and_apply_deltas_system; +/// +/// App::new() +/// .add_systems(Update, receive_and_apply_deltas_system); +/// ``` +pub fn receive_and_apply_deltas_system( + mut commands: Commands, + bridge: Option>, + mut entity_map: ResMut, + type_registry: Res, + mut node_clock: ResMut, + blob_store: Option>, + mut tombstone_registry: Option>, +) { + let Some(bridge) = bridge else { + return; + }; + + let registry = type_registry.read(); + let blob_store_ref = blob_store.as_deref(); + + // Poll for incoming messages + while let Some(message) = bridge.try_recv() { + match message.message { + | SyncMessage::EntityDelta { + entity_id, + node_id, + vector_clock, + operations, + } => { + let delta = EntityDelta { + entity_id, + node_id, + vector_clock, + operations, + }; + + debug!( + "Received EntityDelta for entity {:?} with {} operations", + delta.entity_id, + delta.operations.len() + ); + + apply_entity_delta( + &delta, + &mut commands, + &mut entity_map, + ®istry, + &mut node_clock, + blob_store_ref, + tombstone_registry.as_deref_mut(), + ); + } + | SyncMessage::JoinRequest { .. } => { + // Handled by handle_join_requests_system + debug!("JoinRequest handled by dedicated system"); + } + | SyncMessage::FullState { .. } => { + // Handled by handle_full_state_system + debug!("FullState handled by dedicated system"); + } + | SyncMessage::SyncRequest { .. } => { + // Handled by handle_sync_requests_system + debug!("SyncRequest handled by dedicated system"); + } + | SyncMessage::MissingDeltas { .. 
} => { + // Handled by handle_missing_deltas_system + debug!("MissingDeltas handled by dedicated system"); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_node_clock_merge() { + let node_id = uuid::Uuid::new_v4(); + let mut node_clock = NodeVectorClock::new(node_id); + + let remote_node = uuid::Uuid::new_v4(); + let mut remote_clock = crate::networking::VectorClock::new(); + remote_clock.increment(remote_node); + remote_clock.increment(remote_node); + + // Merge remote clock + node_clock.clock.merge(&remote_clock); + + // Our clock should have the remote node's sequence + assert_eq!(node_clock.clock.get(remote_node), 2); + } + + #[test] + fn test_entity_delta_structure() { + let entity_id = uuid::Uuid::new_v4(); + let node_id = uuid::Uuid::new_v4(); + let clock = crate::networking::VectorClock::new(); + + let delta = EntityDelta::new(entity_id, node_id, clock, vec![]); + + assert_eq!(delta.entity_id, entity_id); + assert_eq!(delta.node_id, node_id); + assert_eq!(delta.operations.len(), 0); + } +} diff --git a/crates/lib/src/networking/blob_support.rs b/crates/lib/src/networking/blob_support.rs new file mode 100644 index 0000000..547efb0 --- /dev/null +++ b/crates/lib/src/networking/blob_support.rs @@ -0,0 +1,379 @@ +//! Large blob support for components >64KB +//! +//! This module handles large component data using iroh-blobs. When a component +//! exceeds the inline threshold (64KB), it's stored as a blob and referenced +//! by its hash in the ComponentOp. +//! +//! **NOTE:** This is a simplified implementation for Phase 6. Full iroh-blobs +//! integration will be completed when we integrate with actual gossip networking. 
+ +use std::{ + collections::HashMap, + sync::{ + Arc, + Mutex, + }, +}; + +use bevy::prelude::*; + +use crate::networking::{ + error::{ + NetworkingError, + Result, + }, + messages::ComponentData, +}; + +/// Threshold for storing data as a blob (64KB) +pub const BLOB_THRESHOLD: usize = 64 * 1024; + +/// Hash type for blob references +pub type BlobHash = Vec; + +/// Bevy resource for managing blobs +/// +/// This resource provides blob storage and retrieval. In Phase 6, we use +/// an in-memory cache. Later phases will integrate with iroh-blobs for +/// persistent storage and P2P transfer. +#[derive(Resource, Clone)] +pub struct BlobStore { + /// In-memory cache of blobs (hash -> data) + cache: Arc>>>, +} + +impl BlobStore { + /// Create a new blob store + pub fn new() -> Self { + Self { + cache: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Store a blob and return its hash + /// + /// # Example + /// + /// ``` + /// use lib::networking::BlobStore; + /// + /// let store = BlobStore::new(); + /// let data = vec![1, 2, 3, 4, 5]; + /// let hash = store.store_blob(data.clone()).unwrap(); + /// + /// let retrieved = store.get_blob(&hash).unwrap(); + /// assert_eq!(retrieved, Some(data)); + /// ``` + pub fn store_blob(&self, data: Vec) -> Result { + // Use SHA-256 for content-addressable storage + let hash = Self::hash_data(&data); + + self.cache + .lock() + .map_err(|e| NetworkingError::Blob(format!("Failed to lock cache: {}", e)))? + .insert(hash.clone(), data); + + Ok(hash) + } + + /// Retrieve a blob by its hash + /// + /// Returns `None` if the blob is not in the cache. + pub fn get_blob(&self, hash: &BlobHash) -> Result>> { + Ok(self + .cache + .lock() + .map_err(|e| NetworkingError::Blob(format!("Failed to lock cache: {}", e)))? + .get(hash) + .cloned()) + } + + /// Check if a blob exists in the cache + /// + /// Returns an error if the cache lock is poisoned. 
+ pub fn has_blob(&self, hash: &BlobHash) -> Result { + Ok(self.cache + .lock() + .map_err(|e| NetworkingError::Blob(format!("Failed to lock cache: {}", e)))? + .contains_key(hash)) + } + + /// Get a blob if it exists (atomic check-and-get) + /// + /// This is safer than calling `has_blob()` followed by `get_blob()` because + /// it's atomic - the blob can't be removed between the check and get. + pub fn get_blob_if_exists(&self, hash: &BlobHash) -> Result>> { + Ok(self.cache + .lock() + .map_err(|e| NetworkingError::Blob(format!("Failed to lock cache: {}", e)))? + .get(hash) + .cloned()) + } + + /// Get cache size (number of blobs) + /// + /// Returns an error if the cache lock is poisoned. + pub fn cache_size(&self) -> Result { + Ok(self.cache + .lock() + .map_err(|e| NetworkingError::Blob(format!("Failed to lock cache: {}", e)))? + .len()) + } + + /// Clear the cache + pub fn clear_cache(&self) -> Result<()> { + self.cache + .lock() + .map_err(|e| NetworkingError::Blob(format!("Failed to lock cache: {}", e)))? + .clear(); + Ok(()) + } + + /// Hash data using SHA-256 + fn hash_data(data: &[u8]) -> BlobHash { + use sha2::{ + Digest, + Sha256, + }; + + let mut hasher = Sha256::new(); + hasher.update(data); + hasher.finalize().to_vec() + } +} + +impl Default for BlobStore { + fn default() -> Self { + Self::new() + } +} + +/// Determine whether data should be stored as a blob +/// +/// # Example +/// +/// ``` +/// use lib::networking::should_use_blob; +/// +/// let small_data = vec![1, 2, 3]; +/// assert!(!should_use_blob(&small_data)); +/// +/// let large_data = vec![0u8; 100_000]; +/// assert!(should_use_blob(&large_data)); +/// ``` +pub fn should_use_blob(data: &[u8]) -> bool { + data.len() > BLOB_THRESHOLD +} + +/// Create ComponentData, automatically choosing inline vs blob +/// +/// This helper function inspects the data size and creates the appropriate +/// ComponentData variant. 
+/// +/// # Example +/// +/// ``` +/// use lib::networking::{create_component_data, BlobStore}; +/// +/// let store = BlobStore::new(); +/// +/// // Small data goes inline +/// let small_data = vec![1, 2, 3]; +/// let component_data = create_component_data(small_data, &store).unwrap(); +/// +/// // Large data becomes a blob reference +/// let large_data = vec![0u8; 100_000]; +/// let component_data = create_component_data(large_data, &store).unwrap(); +/// ``` +pub fn create_component_data(data: Vec, blob_store: &BlobStore) -> Result { + if should_use_blob(&data) { + let size = data.len() as u64; + let hash = blob_store.store_blob(data)?; + Ok(ComponentData::BlobRef { hash, size }) + } else { + Ok(ComponentData::Inline(data)) + } +} + +/// Retrieve the actual data from ComponentData +/// +/// This resolves blob references by fetching from the blob store. +/// +/// # Example +/// +/// ``` +/// use lib::networking::{get_component_data, BlobStore, ComponentData}; +/// +/// let store = BlobStore::new(); +/// +/// // Inline data +/// let inline = ComponentData::Inline(vec![1, 2, 3]); +/// let data = get_component_data(&inline, &store).unwrap(); +/// assert_eq!(data, vec![1, 2, 3]); +/// ``` +pub fn get_component_data(data: &ComponentData, blob_store: &BlobStore) -> Result> { + match data { + | ComponentData::Inline(bytes) => Ok(bytes.clone()), + | ComponentData::BlobRef { hash, size: _ } => blob_store + .get_blob(hash)? + .ok_or_else(|| NetworkingError::Blob(format!("Blob not found: {:x?}", hash))), + } +} + +/// Request a blob from the network +/// +/// **NOTE:** This is a stub for Phase 6. Will be implemented in later phases +/// when we have full gossip integration. 
+pub fn request_blob_from_network(_hash: &BlobHash, _blob_store: &BlobStore) -> Result<()> { + // TODO: Implement in later phases with iroh-gossip + debug!("request_blob_from_network not yet implemented"); + Ok(()) +} + +/// Bevy system to handle blob requests +/// +/// This system processes incoming blob requests and serves blobs to peers. +/// +/// **NOTE:** Stub implementation for Phase 6. +pub fn blob_transfer_system(_blob_store: Option>) { + // TODO: Implement when we have gossip networking +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_blob_store_creation() { + let store = BlobStore::new(); + assert_eq!(store.cache_size().unwrap(), 0); + } + + #[test] + fn test_store_and_retrieve_blob() { + let store = BlobStore::new(); + let data = vec![1, 2, 3, 4, 5]; + + let hash = store.store_blob(data.clone()).unwrap(); + let retrieved = store.get_blob(&hash).unwrap(); + + assert_eq!(retrieved, Some(data)); + } + + #[test] + fn test_blob_hash_is_deterministic() { + let store = BlobStore::new(); + let data = vec![1, 2, 3, 4, 5]; + + let hash1 = store.store_blob(data.clone()).unwrap(); + let hash2 = store.store_blob(data.clone()).unwrap(); + + assert_eq!(hash1, hash2); + } + + #[test] + fn test_has_blob() { + let store = BlobStore::new(); + let data = vec![1, 2, 3, 4, 5]; + + let hash = store.store_blob(data).unwrap(); + assert!(store.has_blob(&hash).unwrap()); + + let fake_hash = vec![0; 32]; + assert!(!store.has_blob(&fake_hash).unwrap()); + } + + #[test] + fn test_clear_cache() { + let store = BlobStore::new(); + let data = vec![1, 2, 3, 4, 5]; + + store.store_blob(data).unwrap(); + assert_eq!(store.cache_size().unwrap(), 1); + + store.clear_cache().unwrap(); + assert_eq!(store.cache_size().unwrap(), 0); + } + + #[test] + fn test_should_use_blob() { + let small_data = vec![0u8; 1000]; + assert!(!should_use_blob(&small_data)); + + let large_data = vec![0u8; 100_000]; + assert!(should_use_blob(&large_data)); + + let threshold_data = vec![0u8; 
BLOB_THRESHOLD]; + assert!(!should_use_blob(&threshold_data)); + + let over_threshold = vec![0u8; BLOB_THRESHOLD + 1]; + assert!(should_use_blob(&over_threshold)); + } + + #[test] + fn test_create_component_data_inline() { + let store = BlobStore::new(); + let small_data = vec![1, 2, 3]; + + let component_data = create_component_data(small_data.clone(), &store).unwrap(); + + match component_data { + | ComponentData::Inline(data) => assert_eq!(data, small_data), + | ComponentData::BlobRef { .. } => panic!("Expected inline data"), + } + } + + #[test] + fn test_create_component_data_blob() { + let store = BlobStore::new(); + let large_data = vec![0u8; 100_000]; + + let component_data = create_component_data(large_data.clone(), &store).unwrap(); + + match component_data { + | ComponentData::BlobRef { hash, size } => { + assert_eq!(size, 100_000); + assert!(store.has_blob(&hash).unwrap()); + } + | ComponentData::Inline(_) => panic!("Expected blob reference"), + } + } + + #[test] + fn test_get_component_data_inline() { + let store = BlobStore::new(); + let inline = ComponentData::Inline(vec![1, 2, 3]); + + let data = get_component_data(&inline, &store).unwrap(); + assert_eq!(data, vec![1, 2, 3]); + } + + #[test] + fn test_get_component_data_blob() { + let store = BlobStore::new(); + let large_data = vec![0u8; 100_000]; + let hash = store.store_blob(large_data.clone()).unwrap(); + + let blob_ref = ComponentData::BlobRef { + hash, + size: 100_000, + }; + + let data = get_component_data(&blob_ref, &store).unwrap(); + assert_eq!(data, large_data); + } + + #[test] + fn test_get_component_data_missing_blob() { + let store = BlobStore::new(); + let fake_hash = vec![0; 32]; + + let blob_ref = ComponentData::BlobRef { + hash: fake_hash, + size: 1000, + }; + + let result = get_component_data(&blob_ref, &store); + assert!(result.is_err()); + } +} diff --git a/crates/lib/src/networking/change_detection.rs b/crates/lib/src/networking/change_detection.rs new file mode 100644 index 
0000000..990959e --- /dev/null +++ b/crates/lib/src/networking/change_detection.rs @@ -0,0 +1,117 @@ +//! Change detection for networked entities +//! +//! This module provides systems that detect when networked components change +//! and prepare them for delta generation. + +use bevy::prelude::*; + +use crate::networking::{ + NetworkedEntity, + NetworkedTransform, +}; + +/// System to automatically detect Transform changes and mark entity for sync +/// +/// This system detects changes to Transform components on networked entities +/// and triggers persistence by accessing `NetworkedEntity` mutably (which marks +/// it as changed via Bevy's change detection). +/// +/// Add this system to your app if you want automatic synchronization of +/// Transform changes: +/// +/// ```no_run +/// use bevy::prelude::*; +/// use lib::networking::auto_detect_transform_changes_system; +/// +/// App::new() +/// .add_systems(Update, auto_detect_transform_changes_system); +/// ``` +pub fn auto_detect_transform_changes_system( + mut query: Query< + &mut NetworkedEntity, + ( + With, + Or<(Changed, Changed)>, + ), + >, +) { + // Simply accessing &mut NetworkedEntity triggers Bevy's change detection + for mut _networked in query.iter_mut() { + // No-op - the mutable access itself marks NetworkedEntity as changed + // This will trigger the delta generation system + } +} + +/// Resource to track the last sync version for each entity +/// +/// This helps us avoid sending redundant deltas for the same changes. 
+#[derive(Resource, Default)] +pub struct LastSyncVersions { + /// Map from network_id to the last vector clock we synced + versions: std::collections::HashMap, +} + +impl LastSyncVersions { + /// Check if we should sync this entity based on version + pub fn should_sync(&self, network_id: uuid::Uuid, version: u64) -> bool { + match self.versions.get(&network_id) { + Some(&last_version) => version > last_version, + None => true, // Never synced before + } + } + + /// Update the last synced version for an entity + pub fn update(&mut self, network_id: uuid::Uuid, version: u64) { + self.versions.insert(network_id, version); + } + + /// Remove tracking for an entity (when despawned) + pub fn remove(&mut self, network_id: uuid::Uuid) { + self.versions.remove(&network_id); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_last_sync_versions() { + let mut versions = LastSyncVersions::default(); + let id = uuid::Uuid::new_v4(); + + // Should sync when never synced before + assert!(versions.should_sync(id, 1)); + + // Update to version 1 + versions.update(id, 1); + + // Should not sync same version + assert!(!versions.should_sync(id, 1)); + + // Should not sync older version + assert!(!versions.should_sync(id, 0)); + + // Should sync newer version + assert!(versions.should_sync(id, 2)); + + // Remove and should sync again + versions.remove(id); + assert!(versions.should_sync(id, 2)); + } + + #[test] + fn test_multiple_entities() { + let mut versions = LastSyncVersions::default(); + let id1 = uuid::Uuid::new_v4(); + let id2 = uuid::Uuid::new_v4(); + + versions.update(id1, 5); + versions.update(id2, 3); + + assert!(!versions.should_sync(id1, 4)); + assert!(versions.should_sync(id1, 6)); + assert!(!versions.should_sync(id2, 2)); + assert!(versions.should_sync(id2, 4)); + } +} diff --git a/crates/lib/src/networking/components.rs b/crates/lib/src/networking/components.rs new file mode 100644 index 0000000..edec5e6 --- /dev/null +++ 
b/crates/lib/src/networking/components.rs @@ -0,0 +1,410 @@ +//! Networked entity components +//! +//! This module defines components that mark entities as networked and track +//! their network identity across the distributed system. + +use bevy::prelude::*; +use serde::{ + Deserialize, + Serialize, +}; + +use crate::networking::vector_clock::NodeId; + +/// Marker component indicating an entity should be synchronized over the +/// network +/// +/// Add this component to any entity that should have its state synchronized +/// across peers. The networking system will automatically track changes and +/// broadcast deltas. +/// +/// # Relationship with Persisted +/// +/// NetworkedEntity and Persisted are complementary: +/// - `Persisted` - Entity state saved to local SQLite database +/// - `NetworkedEntity` - Entity state synchronized across network peers +/// +/// Most entities will have both components for full durability and sync. +/// +/// # Network Identity +/// +/// Each networked entity has: +/// - `network_id` - Globally unique UUID for this entity across all peers +/// - `owner_node_id` - Node that originally created this entity +/// +/// # Example +/// +/// ``` +/// use bevy::prelude::*; +/// use lib::networking::NetworkedEntity; +/// use uuid::Uuid; +/// +/// fn spawn_networked_entity(mut commands: Commands) { +/// let node_id = Uuid::new_v4(); +/// +/// commands.spawn(( +/// NetworkedEntity::new(node_id), +/// Transform::default(), +/// )); +/// } +/// ``` +#[derive(Component, Reflect, Debug, Clone, Serialize, Deserialize)] +#[reflect(Component)] +pub struct NetworkedEntity { + /// Globally unique network ID for this entity + /// + /// This ID is used to identify the entity across all peers in the network. + /// When a peer receives an EntityDelta, it uses this ID to locate the + /// corresponding local entity. + pub network_id: uuid::Uuid, + + /// Node that created this entity + /// + /// Used for conflict resolution and ownership tracking. 
When two nodes + /// concurrently create entities, the owner_node_id can be used as a + /// tiebreaker. + pub owner_node_id: NodeId, +} + +impl NetworkedEntity { + /// Create a new networked entity + /// + /// Generates a new random network_id and sets the owner to the specified + /// node. + /// + /// # Example + /// + /// ``` + /// use lib::networking::NetworkedEntity; + /// use uuid::Uuid; + /// + /// let node_id = Uuid::new_v4(); + /// let entity = NetworkedEntity::new(node_id); + /// + /// assert_eq!(entity.owner_node_id, node_id); + /// ``` + pub fn new(owner_node_id: NodeId) -> Self { + Self { + network_id: uuid::Uuid::new_v4(), + owner_node_id, + } + } + + /// Create a networked entity with a specific network ID + /// + /// Used when receiving entities from remote peers - we need to use their + /// network_id rather than generating a new one. + /// + /// # Example + /// + /// ``` + /// use lib::networking::NetworkedEntity; + /// use uuid::Uuid; + /// + /// let network_id = Uuid::new_v4(); + /// let owner_id = Uuid::new_v4(); + /// let entity = NetworkedEntity::with_id(network_id, owner_id); + /// + /// assert_eq!(entity.network_id, network_id); + /// assert_eq!(entity.owner_node_id, owner_id); + /// ``` + pub fn with_id(network_id: uuid::Uuid, owner_node_id: NodeId) -> Self { + Self { + network_id, + owner_node_id, + } + } + + /// Check if this node owns the entity + pub fn is_owned_by(&self, node_id: NodeId) -> bool { + self.owner_node_id == node_id + } +} + +impl Default for NetworkedEntity { + fn default() -> Self { + Self { + network_id: uuid::Uuid::new_v4(), + owner_node_id: uuid::Uuid::new_v4(), + } + } +} + +/// Wrapper for Transform component that enables CRDT synchronization +/// +/// This is a marker component used alongside Transform to indicate that +/// Transform changes should be synchronized using Last-Write-Wins semantics. 
+/// +/// # Example +/// +/// ``` +/// use bevy::prelude::*; +/// use lib::networking::{NetworkedEntity, NetworkedTransform}; +/// use uuid::Uuid; +/// +/// fn spawn_synced_transform(mut commands: Commands) { +/// let node_id = Uuid::new_v4(); +/// +/// commands.spawn(( +/// NetworkedEntity::new(node_id), +/// Transform::default(), +/// NetworkedTransform, +/// )); +/// } +/// ``` +#[derive(Component, Reflect, Debug, Clone, Copy, Default)] +#[reflect(Component)] +pub struct NetworkedTransform; + +/// Wrapper for a selection component using OR-Set semantics +/// +/// Tracks a set of selected entity network IDs. Uses OR-Set (Observed-Remove) +/// CRDT to handle concurrent add/remove operations correctly. +/// +/// # OR-Set Semantics +/// +/// - Concurrent adds and removes: add wins +/// - Each add has a unique operation ID +/// - Removes reference specific add operation IDs +/// +/// # Example +/// +/// ``` +/// use bevy::prelude::*; +/// use lib::networking::{NetworkedEntity, NetworkedSelection}; +/// use uuid::Uuid; +/// +/// fn create_selection(mut commands: Commands) { +/// let node_id = Uuid::new_v4(); +/// let mut selection = NetworkedSelection::new(); +/// +/// // Add some entities to the selection +/// selection.selected_ids.insert(Uuid::new_v4()); +/// selection.selected_ids.insert(Uuid::new_v4()); +/// +/// commands.spawn(( +/// NetworkedEntity::new(node_id), +/// selection, +/// )); +/// } +/// ``` +#[derive(Component, Reflect, Debug, Clone, Default)] +#[reflect(Component)] +pub struct NetworkedSelection { + /// Set of selected entity network IDs + /// + /// This will be synchronized using OR-Set CRDT semantics in later phases. + /// For now, it's a simple HashSet. 
pub selected_ids: std::collections::HashSet<uuid::Uuid>,
+/// +/// # RGA Semantics +/// +/// - Each point has a unique operation ID +/// - Points reference the ID of the point they're inserted after +/// - Concurrent insertions maintain consistent ordering +/// +/// # Example +/// +/// ``` +/// use bevy::prelude::*; +/// use lib::networking::{NetworkedEntity, NetworkedDrawingPath}; +/// use uuid::Uuid; +/// +/// fn create_path(mut commands: Commands) { +/// let node_id = Uuid::new_v4(); +/// let mut path = NetworkedDrawingPath::new(); +/// +/// // Add some points to the path +/// path.points.push(Vec2::new(0.0, 0.0)); +/// path.points.push(Vec2::new(10.0, 10.0)); +/// path.points.push(Vec2::new(20.0, 5.0)); +/// +/// commands.spawn(( +/// NetworkedEntity::new(node_id), +/// path, +/// )); +/// } +/// ``` +#[derive(Component, Reflect, Debug, Clone, Default)] +#[reflect(Component)] +pub struct NetworkedDrawingPath { + /// Ordered sequence of points in the path + /// + /// This will be synchronized using RGA (Sequence CRDT) semantics in later + /// phases. For now, it's a simple Vec. 
pub points: Vec<Vec2>,
selection.add(id2); + assert_eq!(selection.len(), 2); + assert!(selection.contains(id2)); + + selection.remove(id1); + assert_eq!(selection.len(), 1); + assert!(!selection.contains(id1)); + + selection.clear(); + assert!(selection.is_empty()); + } + + #[test] + fn test_networked_drawing_path() { + let mut path = NetworkedDrawingPath::new(); + + assert!(path.is_empty()); + + path.push(Vec2::new(0.0, 0.0)); + assert_eq!(path.len(), 1); + + path.push(Vec2::new(10.0, 10.0)); + assert_eq!(path.len(), 2); + + path.clear(); + assert!(path.is_empty()); + } + + #[test] + fn test_drawing_path_with_style() { + let path = NetworkedDrawingPath::with_style(Color::srgb(1.0, 0.0, 0.0), 5.0); + + assert_eq!(path.color, Color::srgb(1.0, 0.0, 0.0)); + assert_eq!(path.width, 5.0); + } +} diff --git a/crates/lib/src/networking/delta_generation.rs b/crates/lib/src/networking/delta_generation.rs new file mode 100644 index 0000000..c4ea718 --- /dev/null +++ b/crates/lib/src/networking/delta_generation.rs @@ -0,0 +1,193 @@ +//! Delta generation system for broadcasting entity changes +//! +//! This module implements the core delta generation logic that detects changed +//! entities and broadcasts EntityDelta messages. + +use bevy::prelude::*; + +use crate::networking::{ + change_detection::LastSyncVersions, + entity_map::NetworkEntityMap, + gossip_bridge::GossipBridge, + messages::{ + EntityDelta, + SyncMessage, + VersionedMessage, + }, + operation_builder::build_entity_operations, + vector_clock::{ + NodeId, + VectorClock, + }, + NetworkedEntity, +}; + +/// Resource wrapping our node's vector clock +/// +/// This tracks the logical time for our local operations. 
+#[derive(Resource)] +pub struct NodeVectorClock { + pub node_id: NodeId, + pub clock: VectorClock, +} + +impl NodeVectorClock { + pub fn new(node_id: NodeId) -> Self { + Self { + node_id, + clock: VectorClock::new(), + } + } + + /// Increment our clock for a new operation + pub fn tick(&mut self) -> u64 { + self.clock.increment(self.node_id) + } + + /// Get current sequence number for our node + pub fn sequence(&self) -> u64 { + self.clock.get(self.node_id) + } +} + +/// System to generate and broadcast EntityDelta messages +/// +/// This system: +/// 1. Queries for Changed +/// 2. Serializes all components on those entities +/// 3. Builds EntityDelta messages +/// 4. Broadcasts via GossipBridge +/// +/// Add this to your app to enable delta broadcasting: +/// +/// ```no_run +/// use bevy::prelude::*; +/// use lib::networking::generate_delta_system; +/// +/// App::new() +/// .add_systems(Update, generate_delta_system); +/// ``` +pub fn generate_delta_system( + query: Query<(Entity, &NetworkedEntity), Changed>, + world: &World, + type_registry: Res, + mut node_clock: ResMut, + mut last_versions: ResMut, + bridge: Option>, + _entity_map: Res, + mut operation_log: Option>, +) { + // Early return if no gossip bridge + let Some(bridge) = bridge else { + return; + }; + + let registry = type_registry.read(); + + for (entity, networked) in query.iter() { + // Check if we should sync this entity + let current_seq = node_clock.sequence(); + if !last_versions.should_sync(networked.network_id, current_seq) { + continue; + } + + // Increment our vector clock + node_clock.tick(); + + // Build operations for all components + // TODO: Add BlobStore support in future phases + let operations = build_entity_operations( + entity, + world, + node_clock.node_id, + node_clock.clock.clone(), + ®istry, + None, // blob_store - will be added in later phases + ); + + if operations.is_empty() { + continue; + } + + // Create EntityDelta + let delta = EntityDelta::new( + networked.network_id, + 
node_clock.node_id, + node_clock.clock.clone(), + operations, + ); + + // Record in operation log for anti-entropy + if let Some(ref mut log) = operation_log { + log.record_operation(delta.clone()); + } + + // Wrap in VersionedMessage + let message = VersionedMessage::new(SyncMessage::EntityDelta { + entity_id: delta.entity_id, + node_id: delta.node_id, + vector_clock: delta.vector_clock.clone(), + operations: delta.operations.clone(), + }); + + // Broadcast + if let Err(e) = bridge.send(message) { + error!("Failed to broadcast EntityDelta: {}", e); + } else { + debug!( + "Broadcast EntityDelta for entity {:?} with {} operations", + networked.network_id, + delta.operations.len() + ); + + // Update last sync version + last_versions.update(networked.network_id, current_seq); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_node_vector_clock_creation() { + let node_id = uuid::Uuid::new_v4(); + let clock = NodeVectorClock::new(node_id); + + assert_eq!(clock.node_id, node_id); + assert_eq!(clock.sequence(), 0); + } + + #[test] + fn test_node_vector_clock_tick() { + let node_id = uuid::Uuid::new_v4(); + let mut clock = NodeVectorClock::new(node_id); + + assert_eq!(clock.tick(), 1); + assert_eq!(clock.sequence(), 1); + + assert_eq!(clock.tick(), 2); + assert_eq!(clock.sequence(), 2); + } + + #[test] + fn test_node_vector_clock_multiple_nodes() { + let node1 = uuid::Uuid::new_v4(); + let node2 = uuid::Uuid::new_v4(); + + let mut clock1 = NodeVectorClock::new(node1); + let mut clock2 = NodeVectorClock::new(node2); + + clock1.tick(); + clock2.tick(); + + assert_eq!(clock1.sequence(), 1); + assert_eq!(clock2.sequence(), 1); + + // Merge clocks + clock1.clock.merge(&clock2.clock); + assert_eq!(clock1.clock.get(node1), 1); + assert_eq!(clock1.clock.get(node2), 1); + } +} diff --git a/crates/lib/src/networking/entity_map.rs b/crates/lib/src/networking/entity_map.rs new file mode 100644 index 0000000..c24c81d --- /dev/null +++ 
b/crates/lib/src/networking/entity_map.rs @@ -0,0 +1,438 @@ +//! Bidirectional mapping between network IDs and Bevy entities +//! +//! This module provides efficient lookup in both directions: +//! - network_id → Entity (when receiving remote operations) +//! - Entity → network_id (when broadcasting local changes) + +use std::collections::HashMap; + +use bevy::prelude::*; + +/// Bidirectional mapping between network IDs and Bevy entities +/// +/// This resource maintains two HashMaps for O(1) lookup in both directions. +/// It's updated automatically by the networking systems when entities are +/// spawned or despawned. +/// +/// # Thread Safety +/// +/// This is a Bevy Resource, so it's automatically synchronized across systems. +/// +/// # Example +/// +/// ``` +/// use bevy::prelude::*; +/// use lib::networking::{NetworkEntityMap, NetworkedEntity}; +/// use uuid::Uuid; +/// +/// fn example_system( +/// mut map: ResMut, +/// query: Query<(Entity, &NetworkedEntity)>, +/// ) { +/// // Register networked entities +/// for (entity, networked) in query.iter() { +/// map.insert(networked.network_id, entity); +/// } +/// +/// // Later, look up by network ID +/// let network_id = Uuid::new_v4(); +/// if let Some(entity) = map.get_entity(network_id) { +/// println!("Found entity: {:?}", entity); +/// } +/// } +/// ``` +#[derive(Resource, Default, Debug)] +pub struct NetworkEntityMap { + /// Map from network ID to Bevy Entity + network_id_to_entity: HashMap, + + /// Map from Bevy Entity to network ID + entity_to_network_id: HashMap, +} + +impl NetworkEntityMap { + /// Create a new empty entity map + pub fn new() -> Self { + Self { + network_id_to_entity: HashMap::new(), + entity_to_network_id: HashMap::new(), + } + } + + /// Insert a bidirectional mapping + /// + /// If the network_id or entity already exists in the map, the old mapping + /// is removed first to maintain consistency. 
+ /// + /// # Example + /// + /// ``` + /// use bevy::prelude::*; + /// use lib::networking::NetworkEntityMap; + /// use uuid::Uuid; + /// + /// # let mut world = World::new(); + /// # let entity = world.spawn_empty().id(); + /// let mut map = NetworkEntityMap::new(); + /// let network_id = Uuid::new_v4(); + /// + /// map.insert(network_id, entity); + /// assert_eq!(map.get_entity(network_id), Some(entity)); + /// assert_eq!(map.get_network_id(entity), Some(network_id)); + /// ``` + pub fn insert(&mut self, network_id: uuid::Uuid, entity: Entity) { + // Remove old mappings if they exist + if let Some(old_entity) = self.network_id_to_entity.get(&network_id) { + self.entity_to_network_id.remove(old_entity); + } + if let Some(old_network_id) = self.entity_to_network_id.get(&entity) { + self.network_id_to_entity.remove(old_network_id); + } + + // Insert new mappings + self.network_id_to_entity.insert(network_id, entity); + self.entity_to_network_id.insert(entity, network_id); + } + + /// Get the Bevy Entity for a network ID + /// + /// Returns None if the network ID is not in the map. + /// + /// # Example + /// + /// ``` + /// use bevy::prelude::*; + /// use lib::networking::NetworkEntityMap; + /// use uuid::Uuid; + /// + /// # let mut world = World::new(); + /// # let entity = world.spawn_empty().id(); + /// let mut map = NetworkEntityMap::new(); + /// let network_id = Uuid::new_v4(); + /// + /// map.insert(network_id, entity); + /// assert_eq!(map.get_entity(network_id), Some(entity)); + /// + /// let unknown_id = Uuid::new_v4(); + /// assert_eq!(map.get_entity(unknown_id), None); + /// ``` + pub fn get_entity(&self, network_id: uuid::Uuid) -> Option { + self.network_id_to_entity.get(&network_id).copied() + } + + /// Get the network ID for a Bevy Entity + /// + /// Returns None if the entity is not in the map. 
+ /// + /// # Example + /// + /// ``` + /// use bevy::prelude::*; + /// use lib::networking::NetworkEntityMap; + /// use uuid::Uuid; + /// + /// # let mut world = World::new(); + /// # let entity = world.spawn_empty().id(); + /// let mut map = NetworkEntityMap::new(); + /// let network_id = Uuid::new_v4(); + /// + /// map.insert(network_id, entity); + /// assert_eq!(map.get_network_id(entity), Some(network_id)); + /// + /// # let unknown_entity = world.spawn_empty().id(); + /// assert_eq!(map.get_network_id(unknown_entity), None); + /// ``` + pub fn get_network_id(&self, entity: Entity) -> Option { + self.entity_to_network_id.get(&entity).copied() + } + + /// Remove a mapping by network ID + /// + /// Returns the Entity that was mapped to this network ID, if any. + /// + /// # Example + /// + /// ``` + /// use bevy::prelude::*; + /// use lib::networking::NetworkEntityMap; + /// use uuid::Uuid; + /// + /// # let mut world = World::new(); + /// # let entity = world.spawn_empty().id(); + /// let mut map = NetworkEntityMap::new(); + /// let network_id = Uuid::new_v4(); + /// + /// map.insert(network_id, entity); + /// assert_eq!(map.remove_by_network_id(network_id), Some(entity)); + /// assert_eq!(map.get_entity(network_id), None); + /// ``` + pub fn remove_by_network_id(&mut self, network_id: uuid::Uuid) -> Option { + if let Some(entity) = self.network_id_to_entity.remove(&network_id) { + self.entity_to_network_id.remove(&entity); + Some(entity) + } else { + None + } + } + + /// Remove a mapping by Entity + /// + /// Returns the network ID that was mapped to this entity, if any. 
+ /// + /// # Example + /// + /// ``` + /// use bevy::prelude::*; + /// use lib::networking::NetworkEntityMap; + /// use uuid::Uuid; + /// + /// # let mut world = World::new(); + /// # let entity = world.spawn_empty().id(); + /// let mut map = NetworkEntityMap::new(); + /// let network_id = Uuid::new_v4(); + /// + /// map.insert(network_id, entity); + /// assert_eq!(map.remove_by_entity(entity), Some(network_id)); + /// assert_eq!(map.get_network_id(entity), None); + /// ``` + pub fn remove_by_entity(&mut self, entity: Entity) -> Option { + if let Some(network_id) = self.entity_to_network_id.remove(&entity) { + self.network_id_to_entity.remove(&network_id); + Some(network_id) + } else { + None + } + } + + /// Check if a network ID exists in the map + pub fn contains_network_id(&self, network_id: uuid::Uuid) -> bool { + self.network_id_to_entity.contains_key(&network_id) + } + + /// Check if an entity exists in the map + pub fn contains_entity(&self, entity: Entity) -> bool { + self.entity_to_network_id.contains_key(&entity) + } + + /// Get the number of mapped entities + pub fn len(&self) -> usize { + self.network_id_to_entity.len() + } + + /// Check if the map is empty + pub fn is_empty(&self) -> bool { + self.network_id_to_entity.is_empty() + } + + /// Clear all mappings + pub fn clear(&mut self) { + self.network_id_to_entity.clear(); + self.entity_to_network_id.clear(); + } + + /// Get an iterator over all (network_id, entity) pairs + pub fn iter(&self) -> impl Iterator { + self.network_id_to_entity.iter() + } + + /// Get all network IDs + pub fn network_ids(&self) -> impl Iterator { + self.network_id_to_entity.keys() + } + + /// Get all entities + pub fn entities(&self) -> impl Iterator { + self.entity_to_network_id.keys() + } +} + +/// System to automatically register NetworkedEntity components in the map +/// +/// This system runs in PostUpdate to catch newly spawned networked entities +/// and add them to the NetworkEntityMap. 
+/// +/// Add this to your app: +/// ```no_run +/// use bevy::prelude::*; +/// use lib::networking::register_networked_entities_system; +/// +/// App::new() +/// .add_systems(PostUpdate, register_networked_entities_system); +/// ``` +pub fn register_networked_entities_system( + mut map: ResMut, + query: Query<(Entity, &crate::networking::NetworkedEntity), Added>, +) { + for (entity, networked) in query.iter() { + map.insert(networked.network_id, entity); + } +} + +/// System to automatically unregister despawned entities from the map +/// +/// This system cleans up the NetworkEntityMap when networked entities are +/// despawned. +/// +/// Add this to your app: +/// ```no_run +/// use bevy::prelude::*; +/// use lib::networking::cleanup_despawned_entities_system; +/// +/// App::new() +/// .add_systems(PostUpdate, cleanup_despawned_entities_system); +/// ``` +pub fn cleanup_despawned_entities_system( + mut map: ResMut, + mut removed: RemovedComponents, +) { + for entity in removed.read() { + map.remove_by_entity(entity); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_insert_and_get() { + let mut map = NetworkEntityMap::new(); + let mut world = World::new(); + let entity = world.spawn_empty().id(); + let network_id = uuid::Uuid::new_v4(); + + map.insert(network_id, entity); + + assert_eq!(map.get_entity(network_id), Some(entity)); + assert_eq!(map.get_network_id(entity), Some(network_id)); + } + + #[test] + fn test_get_nonexistent() { + let map = NetworkEntityMap::new(); + let mut world = World::new(); + let entity = world.spawn_empty().id(); + let network_id = uuid::Uuid::new_v4(); + + assert_eq!(map.get_entity(network_id), None); + assert_eq!(map.get_network_id(entity), None); + } + + #[test] + fn test_remove_by_network_id() { + let mut map = NetworkEntityMap::new(); + let mut world = World::new(); + let entity = world.spawn_empty().id(); + let network_id = uuid::Uuid::new_v4(); + + map.insert(network_id, entity); + 
assert_eq!(map.remove_by_network_id(network_id), Some(entity)); + assert_eq!(map.get_entity(network_id), None); + assert_eq!(map.get_network_id(entity), None); + } + + #[test] + fn test_remove_by_entity() { + let mut map = NetworkEntityMap::new(); + let mut world = World::new(); + let entity = world.spawn_empty().id(); + let network_id = uuid::Uuid::new_v4(); + + map.insert(network_id, entity); + assert_eq!(map.remove_by_entity(entity), Some(network_id)); + assert_eq!(map.get_entity(network_id), None); + assert_eq!(map.get_network_id(entity), None); + } + + #[test] + fn test_contains() { + let mut map = NetworkEntityMap::new(); + let mut world = World::new(); + let entity = world.spawn_empty().id(); + let network_id = uuid::Uuid::new_v4(); + + assert!(!map.contains_network_id(network_id)); + assert!(!map.contains_entity(entity)); + + map.insert(network_id, entity); + + assert!(map.contains_network_id(network_id)); + assert!(map.contains_entity(entity)); + } + + #[test] + fn test_len_and_is_empty() { + let mut map = NetworkEntityMap::new(); + let mut world = World::new(); + + assert!(map.is_empty()); + assert_eq!(map.len(), 0); + + let entity1 = world.spawn_empty().id(); + let id1 = uuid::Uuid::new_v4(); + map.insert(id1, entity1); + + assert!(!map.is_empty()); + assert_eq!(map.len(), 1); + + let entity2 = world.spawn_empty().id(); + let id2 = uuid::Uuid::new_v4(); + map.insert(id2, entity2); + + assert_eq!(map.len(), 2); + } + + #[test] + fn test_clear() { + let mut map = NetworkEntityMap::new(); + let mut world = World::new(); + let entity = world.spawn_empty().id(); + let network_id = uuid::Uuid::new_v4(); + + map.insert(network_id, entity); + assert_eq!(map.len(), 1); + + map.clear(); + assert!(map.is_empty()); + } + + #[test] + fn test_insert_overwrites_old_mapping() { + let mut map = NetworkEntityMap::new(); + let mut world = World::new(); + let entity1 = world.spawn_empty().id(); + let entity2 = world.spawn_empty().id(); + let network_id = 
uuid::Uuid::new_v4(); + + // Insert first mapping + map.insert(network_id, entity1); + assert_eq!(map.get_entity(network_id), Some(entity1)); + + // Insert same network_id with different entity + map.insert(network_id, entity2); + assert_eq!(map.get_entity(network_id), Some(entity2)); + assert_eq!(map.get_network_id(entity1), None); // Old mapping removed + assert_eq!(map.len(), 1); // Still only one mapping + } + + #[test] + fn test_iter() { + let mut map = NetworkEntityMap::new(); + let mut world = World::new(); + let entity1 = world.spawn_empty().id(); + let entity2 = world.spawn_empty().id(); + let id1 = uuid::Uuid::new_v4(); + let id2 = uuid::Uuid::new_v4(); + + map.insert(id1, entity1); + map.insert(id2, entity2); + + let mut count = 0; + for (network_id, entity) in map.iter() { + assert!(network_id == &id1 || network_id == &id2); + assert!(entity == &entity1 || entity == &entity2); + count += 1; + } + assert_eq!(count, 2); + } +} diff --git a/crates/lib/src/networking/error.rs b/crates/lib/src/networking/error.rs new file mode 100644 index 0000000..0fc37b1 --- /dev/null +++ b/crates/lib/src/networking/error.rs @@ -0,0 +1,77 @@ +//! 
Error types for the networking layer + +use std::fmt; + +/// Result type for networking operations +pub type Result = std::result::Result; + +/// Errors that can occur in the networking layer +#[derive(Debug)] +pub enum NetworkingError { + /// Serialization error + Serialization(String), + + /// Deserialization error + Deserialization(String), + + /// Gossip error (iroh-gossip) + Gossip(String), + + /// Blob transfer error (iroh-blobs) + Blob(String), + + /// Entity not found in network map + EntityNotFound(uuid::Uuid), + + /// Vector clock comparison failed + VectorClockError(String), + + /// CRDT merge conflict + MergeConflict(String), + + /// Invalid message format + InvalidMessage(String), + + /// Authentication/security error + SecurityError(String), + + /// Rate limit exceeded + RateLimitExceeded, + + /// Other networking errors + Other(String), +} + +impl fmt::Display for NetworkingError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + | NetworkingError::Serialization(msg) => write!(f, "Serialization error: {}", msg), + | NetworkingError::Deserialization(msg) => { + write!(f, "Deserialization error: {}", msg) + }, + | NetworkingError::Gossip(msg) => write!(f, "Gossip error: {}", msg), + | NetworkingError::Blob(msg) => write!(f, "Blob transfer error: {}", msg), + | NetworkingError::EntityNotFound(id) => write!(f, "Entity not found: {}", id), + | NetworkingError::VectorClockError(msg) => write!(f, "Vector clock error: {}", msg), + | NetworkingError::MergeConflict(msg) => write!(f, "CRDT merge conflict: {}", msg), + | NetworkingError::InvalidMessage(msg) => write!(f, "Invalid message: {}", msg), + | NetworkingError::SecurityError(msg) => write!(f, "Security error: {}", msg), + | NetworkingError::RateLimitExceeded => write!(f, "Rate limit exceeded"), + | NetworkingError::Other(msg) => write!(f, "{}", msg), + } + } +} + +impl std::error::Error for NetworkingError {} + +impl From for NetworkingError { + fn from(e: bincode::Error) -> 
Self { + NetworkingError::Serialization(e.to_string()) + } +} + +impl From for NetworkingError { + fn from(e: crate::persistence::PersistenceError) -> Self { + NetworkingError::Other(format!("Persistence error: {}", e)) + } +} diff --git a/crates/lib/src/networking/gossip_bridge.rs b/crates/lib/src/networking/gossip_bridge.rs new file mode 100644 index 0000000..3d36d20 --- /dev/null +++ b/crates/lib/src/networking/gossip_bridge.rs @@ -0,0 +1,142 @@ +//! Async-to-sync bridge for iroh-gossip integration with Bevy +//! +//! This module provides the bridge between Bevy's synchronous ECS world and +//! iroh-gossip's async runtime. It uses channels to pass messages between the +//! async tokio tasks and Bevy systems. +//! +//! **NOTE:** This is a simplified implementation for Phase 3. Full gossip +//! integration will be completed in later phases. + +use std::{ + collections::VecDeque, + sync::{ + Arc, + Mutex, + }, +}; + +use bevy::prelude::*; + +use crate::networking::{ + error::{ + NetworkingError, + Result, + }, + messages::VersionedMessage, + vector_clock::NodeId, +}; + +/// Bevy resource wrapping the gossip bridge +/// +/// This resource provides the interface between Bevy systems and the async +/// gossip network. Systems can send messages via `send()` and poll for +/// incoming messages via `try_recv()`. +#[derive(Resource, Clone)] +pub struct GossipBridge { + /// Queue for outgoing messages + outgoing: Arc>>, + + /// Queue for incoming messages + incoming: Arc>>, + + /// Our node ID + pub node_id: NodeId, +} + +impl GossipBridge { + /// Create a new gossip bridge + pub fn new(node_id: NodeId) -> Self { + Self { + outgoing: Arc::new(Mutex::new(VecDeque::new())), + incoming: Arc::new(Mutex::new(VecDeque::new())), + node_id, + } + } + + /// Send a message to the gossip network + pub fn send(&self, message: VersionedMessage) -> Result<()> { + self.outgoing + .lock() + .map_err(|e| NetworkingError::Gossip(format!("Failed to lock outgoing queue: {}", e)))? 
+ .push_back(message); + Ok(()) + } + + /// Try to receive a message from the gossip network + pub fn try_recv(&self) -> Option { + self.incoming.lock().ok()?.pop_front() + } + + /// Get our node ID + pub fn node_id(&self) -> NodeId { + self.node_id + } +} + +/// Initialize the gossip bridge +pub fn init_gossip_bridge(node_id: NodeId) -> GossipBridge { + info!("Initializing gossip bridge for node: {}", node_id); + GossipBridge::new(node_id) +} + +/// Bevy system to broadcast outgoing messages +pub fn broadcast_messages_system(/* will be implemented in later phases */) { + // This will be populated when we have delta generation +} + +/// Bevy system to receive incoming messages +/// +/// **Note:** This is deprecated in favor of `receive_and_apply_deltas_system` +/// which provides full CRDT merge semantics. This stub remains for backward +/// compatibility. +pub fn receive_messages_system(bridge: Option>) { + let Some(bridge) = bridge else { + return; + }; + + // Poll for incoming messages + while let Some(message) = bridge.try_recv() { + // For now, just log the message + debug!("Received message: {:?}", message.message); + + // Use receive_and_apply_deltas_system for full functionality + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_gossip_bridge_creation() { + let node_id = uuid::Uuid::new_v4(); + let bridge = GossipBridge::new(node_id); + + assert_eq!(bridge.node_id(), node_id); + } + + #[test] + fn test_send_message() { + use crate::networking::SyncMessage; + + let node_id = uuid::Uuid::new_v4(); + let bridge = GossipBridge::new(node_id); + + let message = SyncMessage::JoinRequest { + node_id, + session_secret: None, + }; + let versioned = VersionedMessage::new(message); + + let result = bridge.send(versioned); + assert!(result.is_ok()); + } + + #[test] + fn test_try_recv_empty() { + let node_id = uuid::Uuid::new_v4(); + let bridge = GossipBridge::new(node_id); + + assert!(bridge.try_recv().is_none()); + } +} diff --git 
a/crates/lib/src/networking/join_protocol.rs b/crates/lib/src/networking/join_protocol.rs new file mode 100644 index 0000000..d7221a1 --- /dev/null +++ b/crates/lib/src/networking/join_protocol.rs @@ -0,0 +1,509 @@ +//! Join protocol for new peer onboarding +//! +//! This module handles the protocol for new peers to join an existing session +//! and receive the full world state. The join flow: +//! +//! 1. New peer sends JoinRequest with node ID and optional session secret +//! 2. Existing peer validates request and responds with FullState +//! 3. New peer applies FullState to initialize local world +//! 4. New peer begins participating in delta synchronization +//! +//! **NOTE:** This is a simplified implementation for Phase 7. Full security +//! and session management will be enhanced in Phase 13. + +use bevy::{ + prelude::*, + reflect::TypeRegistry, +}; + +use crate::networking::{ + blob_support::BlobStore, + delta_generation::NodeVectorClock, + entity_map::NetworkEntityMap, + messages::{ + EntityState, + SyncMessage, + VersionedMessage, + }, + GossipBridge, + NetworkedEntity, +}; + +/// Session secret for join authentication +/// +/// In Phase 7, this is optional. Phase 13 will add full authentication. +pub type SessionSecret = Vec; + +/// Build a JoinRequest message +/// +/// # Example +/// +/// ``` +/// use lib::networking::build_join_request; +/// use uuid::Uuid; +/// +/// let node_id = Uuid::new_v4(); +/// let request = build_join_request(node_id, None); +/// ``` +pub fn build_join_request(node_id: uuid::Uuid, session_secret: Option) -> VersionedMessage { + VersionedMessage::new(SyncMessage::JoinRequest { + node_id, + session_secret, + }) +} + +/// Build a FullState message containing all networked entities +/// +/// This serializes the entire world state for a new peer. Large worlds may +/// take significant bandwidth - Phase 14 will add compression. 
+/// +/// # Parameters +/// +/// - `world`: Bevy world containing entities +/// - `query`: Query for all NetworkedEntity components +/// - `type_registry`: Type registry for serialization +/// - `node_clock`: Current node vector clock +/// - `blob_store`: Optional blob store for large components +/// +/// # Returns +/// +/// A FullState message ready to send to the joining peer +pub fn build_full_state( + world: &World, + networked_entities: &Query<(Entity, &NetworkedEntity)>, + type_registry: &TypeRegistry, + node_clock: &NodeVectorClock, + blob_store: Option<&BlobStore>, +) -> VersionedMessage { + use crate::{ + networking::{ + blob_support::create_component_data, + messages::ComponentState, + }, + persistence::reflection::serialize_component, + }; + + let mut entities = Vec::new(); + + for (entity, networked) in networked_entities.iter() { + let entity_ref = world.entity(entity); + let mut components = Vec::new(); + + // Iterate over all type registrations to find components + for registration in type_registry.iter() { + // Skip if no ReflectComponent data + let Some(reflect_component) = registration.data::() else { + continue; + }; + + let type_path = registration.type_info().type_path(); + + // Skip networked wrapper components + if type_path.ends_with("::NetworkedEntity") + || type_path.ends_with("::NetworkedTransform") + || type_path.ends_with("::NetworkedSelection") + || type_path.ends_with("::NetworkedDrawingPath") + { + continue; + } + + // Try to reflect this component from the entity + if let Some(reflected) = reflect_component.reflect(entity_ref) { + // Serialize the component + if let Ok(serialized) = serialize_component(reflected, type_registry) { + // Create component data (inline or blob) + let data = if let Some(store) = blob_store { + match create_component_data(serialized, store) { + Ok(d) => d, + Err(_) => continue, + } + } else { + crate::networking::ComponentData::Inline(serialized) + }; + + components.push(ComponentState { + component_type: 
type_path.to_string(), + data, + }); + } + } + } + + entities.push(EntityState { + entity_id: networked.network_id, + owner_node_id: networked.owner_node_id, + vector_clock: node_clock.clock.clone(), + components, + is_deleted: false, + }); + } + + info!( + "Built FullState with {} entities for new peer", + entities.len() + ); + + VersionedMessage::new(SyncMessage::FullState { + entities, + vector_clock: node_clock.clock.clone(), + }) +} + +/// Apply a FullState message to the local world +/// +/// This initializes the world for a newly joined peer by spawning all entities +/// and applying their component state. +/// +/// # Parameters +/// +/// - `entities`: List of entity states from FullState message +/// - `vector_clock`: Vector clock from FullState +/// - `commands`: Bevy commands for spawning entities +/// - `entity_map`: Entity map to populate +/// - `type_registry`: Type registry for deserialization +/// - `node_clock`: Our node's vector clock to update +/// - `blob_store`: Optional blob store for resolving blob references +/// - `tombstone_registry`: Optional tombstone registry for deletion tracking +pub fn apply_full_state( + entities: Vec, + remote_clock: crate::networking::VectorClock, + commands: &mut Commands, + entity_map: &mut NetworkEntityMap, + type_registry: &TypeRegistry, + node_clock: &mut NodeVectorClock, + blob_store: Option<&BlobStore>, + mut tombstone_registry: Option<&mut crate::networking::TombstoneRegistry>, +) { + use crate::{ + networking::blob_support::get_component_data, + persistence::reflection::deserialize_component, + }; + + info!("Applying FullState with {} entities", entities.len()); + + // Merge the remote vector clock + node_clock.clock.merge(&remote_clock); + + // Spawn all entities and apply their state + for entity_state in entities { + // Handle deleted entities (tombstones) + if entity_state.is_deleted { + // Record tombstone + if let Some(ref mut registry) = tombstone_registry { + registry.record_deletion( + 
entity_state.entity_id, + entity_state.owner_node_id, + entity_state.vector_clock.clone(), + ); + } + continue; + } + + // Spawn entity with NetworkedEntity component + let entity = commands + .spawn(NetworkedEntity::with_id( + entity_state.entity_id, + entity_state.owner_node_id, + )) + .id(); + + // Register in entity map + entity_map.insert(entity_state.entity_id, entity); + + let num_components = entity_state.components.len(); + + // Apply all components + for component_state in &entity_state.components { + // Get the actual data (resolve blob if needed) + let data_bytes = match &component_state.data { + | crate::networking::ComponentData::Inline(bytes) => bytes.clone(), + | blob_ref @ crate::networking::ComponentData::BlobRef { .. } => { + if let Some(store) = blob_store { + match get_component_data(blob_ref, store) { + Ok(bytes) => bytes, + Err(e) => { + error!( + "Failed to retrieve blob for {}: {}", + component_state.component_type, e + ); + continue; + } + } + } else { + error!( + "Blob reference for {} but no blob store available", + component_state.component_type + ); + continue; + } + } + }; + + // Deserialize the component + let reflected = match deserialize_component(&data_bytes, type_registry) { + Ok(r) => r, + Err(e) => { + error!( + "Failed to deserialize {}: {}", + component_state.component_type, e + ); + continue; + } + }; + + // Get the type registration + let registration = match type_registry.get_with_type_path(&component_state.component_type) + { + Some(reg) => reg, + None => { + error!( + "Component type {} not registered", + component_state.component_type + ); + continue; + } + }; + + // Get ReflectComponent data + let reflect_component = match registration.data::() { + Some(rc) => rc.clone(), + None => { + error!( + "Component type {} does not have ReflectComponent data", + component_state.component_type + ); + continue; + } + }; + + // Insert the component + let component_type_owned = component_state.component_type.clone(); + 
commands.queue(move |world: &mut World| { + let type_registry_arc = { + let Some(type_registry_res) = world.get_resource::() else { + error!("AppTypeRegistry not found in world"); + return; + }; + type_registry_res.clone() + }; + + let type_registry = type_registry_arc.read(); + + if let Ok(mut entity_mut) = world.get_entity_mut(entity) { + reflect_component.insert(&mut entity_mut, &*reflected, &type_registry); + debug!("Applied component {} from FullState", component_type_owned); + } + }); + } + + debug!( + "Spawned entity {:?} from FullState with {} components", + entity_state.entity_id, + num_components + ); + } + + info!("FullState applied successfully"); +} + +/// System to handle JoinRequest messages +/// +/// When we receive a JoinRequest, build and send a FullState response. +/// +/// Add this to your app: +/// +/// ```no_run +/// use bevy::prelude::*; +/// use lib::networking::handle_join_requests_system; +/// +/// App::new() +/// .add_systems(Update, handle_join_requests_system); +/// ``` +pub fn handle_join_requests_system( + world: &World, + bridge: Option>, + networked_entities: Query<(Entity, &NetworkedEntity)>, + type_registry: Res, + node_clock: Res, + blob_store: Option>, +) { + let Some(bridge) = bridge else { + return; + }; + + let registry = type_registry.read(); + let blob_store_ref = blob_store.as_deref(); + + // Poll for incoming JoinRequest messages + while let Some(message) = bridge.try_recv() { + match message.message { + | SyncMessage::JoinRequest { + node_id, + session_secret, + } => { + info!("Received JoinRequest from node {}", node_id); + + // TODO: Validate session_secret in Phase 13 + if let Some(_secret) = session_secret { + debug!("Session secret validation not yet implemented"); + } + + // Build full state + let full_state = build_full_state( + world, + &networked_entities, + ®istry, + &node_clock, + blob_store_ref, + ); + + // Send full state to joining peer + if let Err(e) = bridge.send(full_state) { + error!("Failed to send 
FullState: {}", e); + } else { + info!("Sent FullState to node {}", node_id); + } + } + | _ => { + // Not a JoinRequest, ignore (other systems handle other messages) + } + } + } +} + +/// System to handle FullState messages +/// +/// When we receive a FullState (after sending JoinRequest), apply it to our world. +/// +/// This system should run BEFORE receive_and_apply_deltas_system to ensure +/// we're fully initialized before processing deltas. +pub fn handle_full_state_system( + mut commands: Commands, + bridge: Option>, + mut entity_map: ResMut, + type_registry: Res, + mut node_clock: ResMut, + blob_store: Option>, + mut tombstone_registry: Option>, +) { + let Some(bridge) = bridge else { + return; + }; + + let registry = type_registry.read(); + let blob_store_ref = blob_store.as_deref(); + + // Poll for FullState messages + while let Some(message) = bridge.try_recv() { + match message.message { + | SyncMessage::FullState { + entities, + vector_clock, + } => { + info!("Received FullState with {} entities", entities.len()); + + apply_full_state( + entities, + vector_clock, + &mut commands, + &mut entity_map, + ®istry, + &mut node_clock, + blob_store_ref, + tombstone_registry.as_deref_mut(), + ); + } + | _ => { + // Not a FullState, ignore + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::networking::VectorClock; + + #[test] + fn test_build_join_request() { + let node_id = uuid::Uuid::new_v4(); + let request = build_join_request(node_id, None); + + match request.message { + | SyncMessage::JoinRequest { + node_id: req_node_id, + session_secret, + } => { + assert_eq!(req_node_id, node_id); + assert!(session_secret.is_none()); + } + | _ => panic!("Expected JoinRequest"), + } + } + + #[test] + fn test_build_join_request_with_secret() { + let node_id = uuid::Uuid::new_v4(); + let secret = vec![1, 2, 3, 4]; + let request = build_join_request(node_id, Some(secret.clone())); + + match request.message { + | SyncMessage::JoinRequest { + node_id: _, + 
session_secret, + } => { + assert_eq!(session_secret, Some(secret)); + } + | _ => panic!("Expected JoinRequest"), + } + } + + #[test] + fn test_entity_state_structure() { + let entity_id = uuid::Uuid::new_v4(); + let owner_node_id = uuid::Uuid::new_v4(); + + let state = EntityState { + entity_id, + owner_node_id, + vector_clock: VectorClock::new(), + components: vec![], + is_deleted: false, + }; + + assert_eq!(state.entity_id, entity_id); + assert_eq!(state.owner_node_id, owner_node_id); + assert_eq!(state.components.len(), 0); + assert!(!state.is_deleted); + } + + #[test] + fn test_apply_full_state_empty() { + let node_id = uuid::Uuid::new_v4(); + let mut node_clock = NodeVectorClock::new(node_id); + let remote_clock = VectorClock::new(); + + // Create minimal setup for testing + let mut entity_map = NetworkEntityMap::new(); + let type_registry = TypeRegistry::new(); + + // Need a minimal Bevy app for Commands + let mut app = App::new(); + let mut commands = app.world_mut().commands(); + + apply_full_state( + vec![], + remote_clock.clone(), + &mut commands, + &mut entity_map, + &type_registry, + &mut node_clock, + None, + None, // tombstone_registry + ); + + // Should have merged clocks + assert_eq!(node_clock.clock, remote_clock); + } +} diff --git a/crates/lib/src/networking/merge.rs b/crates/lib/src/networking/merge.rs new file mode 100644 index 0000000..1e4d6c4 --- /dev/null +++ b/crates/lib/src/networking/merge.rs @@ -0,0 +1,263 @@ +//! CRDT merge logic for conflict resolution +//! +//! This module implements the merge semantics for different CRDT types: +//! - Last-Write-Wins (LWW) for simple components +//! - OR-Set for concurrent add/remove +//! 
- Sequence CRDT (RGA) for ordered lists + +use bevy::prelude::*; + +use crate::networking::{ + operations::ComponentOp, + vector_clock::{ + NodeId, + VectorClock, + }, +}; + +/// Result of comparing two operations for merge +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum MergeDecision { + /// The local operation wins (keep local, discard remote) + KeepLocal, + + /// The remote operation wins (apply remote, discard local) + ApplyRemote, + + /// Operations are concurrent, need CRDT-specific merge + Concurrent, + + /// Operations are identical + Equal, +} + +/// Compare two operations using vector clocks to determine merge decision +/// +/// This implements Last-Write-Wins (LWW) semantics with node ID tiebreaking. +/// +/// # Algorithm +/// +/// 1. If local happened-before remote: ApplyRemote +/// 2. If remote happened-before local: KeepLocal +/// 3. If concurrent: use node ID as tiebreaker (higher node ID wins) +/// 4. If equal: Equal +/// +/// # Example +/// +/// ``` +/// use lib::networking::{VectorClock, compare_operations_lww}; +/// use uuid::Uuid; +/// +/// let node1 = Uuid::new_v4(); +/// let node2 = Uuid::new_v4(); +/// +/// let mut clock1 = VectorClock::new(); +/// clock1.increment(node1); +/// +/// let mut clock2 = VectorClock::new(); +/// clock2.increment(node2); +/// +/// // Concurrent operations use node ID as tiebreaker +/// let decision = compare_operations_lww(&clock1, node1, &clock2, node2); +/// ``` +pub fn compare_operations_lww( + local_clock: &VectorClock, + local_node: NodeId, + remote_clock: &VectorClock, + remote_node: NodeId, +) -> MergeDecision { + // Check if clocks are equal + if local_clock == remote_clock && local_node == remote_node { + return MergeDecision::Equal; + } + + // Check happens-before relationship + if local_clock.happened_before(remote_clock) { + return MergeDecision::ApplyRemote; + } + + if remote_clock.happened_before(local_clock) { + return MergeDecision::KeepLocal; + } + + // Concurrent operations - use node ID 
as tiebreaker + // Higher node ID wins for deterministic resolution + if remote_node > local_node { + MergeDecision::ApplyRemote + } else if local_node > remote_node { + MergeDecision::KeepLocal + } else { + MergeDecision::Concurrent + } +} + +/// Determine if a remote Set operation should be applied +/// +/// This is a convenience wrapper around `compare_operations_lww` for Set +/// operations specifically. +pub fn should_apply_set(local_op: &ComponentOp, remote_op: &ComponentOp) -> bool { + // Extract vector clocks and node IDs + let (local_clock, local_data) = match local_op { + | ComponentOp::Set { + vector_clock, data, .. + } => (vector_clock, data), + | _ => return false, + }; + + let (remote_clock, remote_data) = match remote_op { + | ComponentOp::Set { + vector_clock, data, .. + } => (vector_clock, data), + | _ => return false, + }; + + // If data is identical, no need to apply + if local_data == remote_data { + return false; + } + + // Use the sequence number from the clocks as a simple tiebreaker + // In a real implementation, we'd use the full node IDs + let local_seq: u64 = local_clock.clocks.values().sum(); + let remote_seq: u64 = remote_clock.clocks.values().sum(); + + // Compare clocks + match compare_operations_lww( + local_clock, + uuid::Uuid::nil(), // Simplified - would use actual node IDs + remote_clock, + uuid::Uuid::nil(), + ) { + | MergeDecision::ApplyRemote => true, + | MergeDecision::KeepLocal => false, + | MergeDecision::Concurrent => remote_seq > local_seq, + | MergeDecision::Equal => false, + } +} + +/// Log a merge conflict for debugging +/// +/// This helps track when concurrent operations occur and how they're resolved. 
+pub fn log_merge_conflict( + component_type: &str, + local_clock: &VectorClock, + remote_clock: &VectorClock, + decision: MergeDecision, +) { + info!( + "Merge conflict on {}: local={:?}, remote={:?}, decision={:?}", + component_type, local_clock, remote_clock, decision + ); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::networking::messages::ComponentData; + + #[test] + fn test_lww_happened_before() { + let node1 = uuid::Uuid::new_v4(); + let node2 = uuid::Uuid::new_v4(); + + let mut clock1 = VectorClock::new(); + clock1.increment(node1); + + let mut clock2 = VectorClock::new(); + clock2.increment(node1); + clock2.increment(node1); + + let decision = compare_operations_lww(&clock1, node1, &clock2, node2); + assert_eq!(decision, MergeDecision::ApplyRemote); + + let decision = compare_operations_lww(&clock2, node1, &clock1, node2); + assert_eq!(decision, MergeDecision::KeepLocal); + } + + #[test] + fn test_lww_concurrent() { + let node1 = uuid::Uuid::new_v4(); + let node2 = uuid::Uuid::new_v4(); + + let mut clock1 = VectorClock::new(); + clock1.increment(node1); + + let mut clock2 = VectorClock::new(); + clock2.increment(node2); + + // Concurrent operations use node ID tiebreaker + let decision = compare_operations_lww(&clock1, node1, &clock2, node2); + + // Should use node ID as tiebreaker + assert!( + decision == MergeDecision::ApplyRemote || decision == MergeDecision::KeepLocal + ); + } + + #[test] + fn test_lww_equal() { + let node1 = uuid::Uuid::new_v4(); + + let mut clock1 = VectorClock::new(); + clock1.increment(node1); + + let clock2 = clock1.clone(); + + let decision = compare_operations_lww(&clock1, node1, &clock2, node1); + assert_eq!(decision, MergeDecision::Equal); + } + + #[test] + fn test_should_apply_set_same_data() { + let node_id = uuid::Uuid::new_v4(); + let mut clock = VectorClock::new(); + clock.increment(node_id); + + let data = vec![1, 2, 3]; + + let op1 = ComponentOp::Set { + component_type: "Transform".to_string(), + data: 
ComponentData::Inline(data.clone()), + vector_clock: clock.clone(), + }; + + let op2 = ComponentOp::Set { + component_type: "Transform".to_string(), + data: ComponentData::Inline(data.clone()), + vector_clock: clock, + }; + + // Same data, should not apply + assert!(!should_apply_set(&op1, &op2)); + } + + #[test] + fn test_should_apply_set_newer_wins() { + let node_id = uuid::Uuid::new_v4(); + + let mut clock1 = VectorClock::new(); + clock1.increment(node_id); + + let mut clock2 = VectorClock::new(); + clock2.increment(node_id); + clock2.increment(node_id); + + let op1 = ComponentOp::Set { + component_type: "Transform".to_string(), + data: ComponentData::Inline(vec![1, 2, 3]), + vector_clock: clock1, + }; + + let op2 = ComponentOp::Set { + component_type: "Transform".to_string(), + data: ComponentData::Inline(vec![4, 5, 6]), + vector_clock: clock2, + }; + + // op2 is newer, should apply + assert!(should_apply_set(&op1, &op2)); + + // op1 is older, should not apply + assert!(!should_apply_set(&op2, &op1)); + } +} diff --git a/crates/lib/src/networking/message_dispatcher.rs b/crates/lib/src/networking/message_dispatcher.rs new file mode 100644 index 0000000..9c37a57 --- /dev/null +++ b/crates/lib/src/networking/message_dispatcher.rs @@ -0,0 +1,214 @@ +//! Message dispatcher for efficient message routing +//! +//! This module eliminates the DRY violation and O(n²) behavior from having +//! multiple systems each polling the same message queue. Instead, a single +//! dispatcher system polls once and routes messages to appropriate handlers. 
+ +use bevy::prelude::*; + +use crate::networking::{ + apply_entity_delta, + apply_full_state, + blob_support::BlobStore, + build_full_state, + build_missing_deltas, + delta_generation::NodeVectorClock, + entity_map::NetworkEntityMap, + messages::SyncMessage, + operation_log::OperationLog, + GossipBridge, + NetworkedEntity, + TombstoneRegistry, +}; + +/// Central message dispatcher system +/// +/// This system replaces the individual message polling loops in: +/// - `receive_and_apply_deltas_system` +/// - `handle_join_requests_system` +/// - `handle_full_state_system` +/// - `handle_sync_requests_system` +/// - `handle_missing_deltas_system` +/// +/// By polling the message queue once and routing to handlers, we eliminate +/// O(n²) behavior and code duplication. +/// +/// # Performance +/// +/// - **Before**: O(n × m) where n = messages, m = systems (~5) +/// - **After**: O(n) - each message processed exactly once +/// +/// # Example +/// +/// ```no_run +/// use bevy::prelude::*; +/// use lib::networking::message_dispatcher_system; +/// +/// App::new() +/// .add_systems(Update, message_dispatcher_system); +/// ``` +pub fn message_dispatcher_system( + world: &World, + mut commands: Commands, + bridge: Option>, + mut entity_map: ResMut, + type_registry: Res, + mut node_clock: ResMut, + blob_store: Option>, + mut tombstone_registry: Option>, + operation_log: Option>, + networked_entities: Query<(Entity, &NetworkedEntity)>, +) { + let Some(bridge) = bridge else { + return; + }; + + let registry = type_registry.read(); + let blob_store_ref = blob_store.as_deref(); + + // Poll messages once and route to appropriate handlers + while let Some(message) = bridge.try_recv() { + match message.message { + // EntityDelta - apply remote operations + | SyncMessage::EntityDelta { + entity_id, + node_id, + vector_clock, + operations, + } => { + let delta = crate::networking::EntityDelta { + entity_id, + node_id, + vector_clock, + operations, + }; + + debug!( + "Received 
EntityDelta for entity {:?} with {} operations", + delta.entity_id, + delta.operations.len() + ); + + apply_entity_delta( + &delta, + &mut commands, + &mut entity_map, + ®istry, + &mut node_clock, + blob_store_ref, + tombstone_registry.as_deref_mut(), + ); + } + + // JoinRequest - new peer joining + | SyncMessage::JoinRequest { + node_id, + session_secret, + } => { + info!("Received JoinRequest from node {}", node_id); + + // TODO: Validate session_secret in Phase 13 + if let Some(_secret) = session_secret { + debug!("Session secret validation not yet implemented"); + } + + // Build and send full state + let full_state = build_full_state( + world, + &networked_entities, + ®istry, + &node_clock, + blob_store_ref, + ); + + if let Err(e) = bridge.send(full_state) { + error!("Failed to send FullState: {}", e); + } else { + info!("Sent FullState to node {}", node_id); + } + } + + // FullState - receiving world state after join + | SyncMessage::FullState { + entities, + vector_clock, + } => { + info!("Received FullState with {} entities", entities.len()); + + apply_full_state( + entities, + vector_clock, + &mut commands, + &mut entity_map, + ®istry, + &mut node_clock, + blob_store_ref, + tombstone_registry.as_deref_mut(), + ); + } + + // SyncRequest - peer requesting missing operations + | SyncMessage::SyncRequest { + node_id: requesting_node, + vector_clock: their_clock, + } => { + debug!("Received SyncRequest from node {}", requesting_node); + + if let Some(ref op_log) = operation_log { + // Find operations they're missing + let missing_deltas = op_log.get_all_operations_newer_than(&their_clock); + + if !missing_deltas.is_empty() { + info!( + "Sending {} missing deltas to node {}", + missing_deltas.len(), + requesting_node + ); + + // Send MissingDeltas response + let response = build_missing_deltas(missing_deltas); + if let Err(e) = bridge.send(response) { + error!("Failed to send MissingDeltas: {}", e); + } + } else { + debug!("No missing deltas for node {}", 
requesting_node); + } + } else { + warn!("Received SyncRequest but OperationLog resource not available"); + } + } + + // MissingDeltas - receiving operations we requested + | SyncMessage::MissingDeltas { deltas } => { + info!("Received MissingDeltas with {} operations", deltas.len()); + + // Apply each delta + for delta in deltas { + debug!( + "Applying missing delta for entity {:?}", + delta.entity_id + ); + + apply_entity_delta( + &delta, + &mut commands, + &mut entity_map, + ®istry, + &mut node_clock, + blob_store_ref, + tombstone_registry.as_deref_mut(), + ); + } + } + } + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test_message_dispatcher_compiles() { + // This test just ensures the dispatcher system compiles + // Integration tests would require a full Bevy app setup + } +} diff --git a/crates/lib/src/networking/messages.rs b/crates/lib/src/networking/messages.rs new file mode 100644 index 0000000..f69046d --- /dev/null +++ b/crates/lib/src/networking/messages.rs @@ -0,0 +1,345 @@ +//! Network message types for CRDT synchronization +//! +//! This module defines the protocol messages used for distributed +//! synchronization according to RFC 0001. + +use serde::{ + Deserialize, + Serialize, +}; + +use crate::networking::{ + operations::ComponentOp, + vector_clock::{ + NodeId, + VectorClock, + }, +}; + +/// Top-level message envelope with versioning +/// +/// All messages sent over the network are wrapped in this envelope to support +/// protocol version negotiation and future compatibility. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct VersionedMessage { + /// Protocol version (currently 1) + pub version: u32, + + /// The actual sync message + pub message: SyncMessage, +} + +impl VersionedMessage { + /// Current protocol version + pub const CURRENT_VERSION: u32 = 1; + + /// Create a new versioned message with the current protocol version + pub fn new(message: SyncMessage) -> Self { + Self { + version: Self::CURRENT_VERSION, + message, + } + } +} + +/// CRDT synchronization protocol messages +/// +/// These messages implement the sync protocol defined in RFC 0001. +/// +/// # Protocol Flow +/// +/// 1. **Join**: New peer sends `JoinRequest`, receives `FullState` +/// 2. **Normal Operation**: Peers broadcast `EntityDelta` on changes +/// 3. **Anti-Entropy**: Periodic `SyncRequest` to detect missing operations +/// 4. **Recovery**: `MissingDeltas` sent in response to `SyncRequest` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum SyncMessage { + /// Request to join the network and receive full state + /// + /// Sent by a new peer when it first connects. The response will be a + /// `FullState` message containing all entities and their components. + JoinRequest { + /// ID of the node requesting to join + node_id: NodeId, + + /// Optional session secret for authentication + session_secret: Option>, + }, + + /// Complete world state sent to new peers + /// + /// Contains all networked entities and their components. Sent in response + /// to a `JoinRequest`. + FullState { + /// All entities in the world + entities: Vec, + + /// Current vector clock of the sending node + vector_clock: VectorClock, + }, + + /// Delta update for a single entity + /// + /// Broadcast when a component changes. Recipients apply the operations + /// using CRDT merge semantics. 
+ EntityDelta { + /// Network ID of the entity being updated + entity_id: uuid::Uuid, + + /// Node that generated this delta + node_id: NodeId, + + /// Vector clock at the time this delta was created + vector_clock: VectorClock, + + /// Component operations (Set, SetAdd, SequenceInsert, etc.) + operations: Vec, + }, + + /// Request for operations newer than our vector clock + /// + /// Sent periodically for anti-entropy. The recipient compares vector + /// clocks and sends `MissingDeltas` if they have newer operations. + SyncRequest { + /// ID of the node requesting sync + node_id: NodeId, + + /// Our current vector clock + vector_clock: VectorClock, + }, + + /// Operations that the recipient is missing + /// + /// Sent in response to `SyncRequest` when we have operations the peer + /// doesn't know about yet. + MissingDeltas { + /// Entity deltas that the recipient is missing + deltas: Vec, + }, +} + +/// Complete state of a single entity +/// +/// Used in `FullState` messages to transfer all components of an entity. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EntityState { + /// Network ID of the entity + pub entity_id: uuid::Uuid, + + /// Node that originally created this entity + pub owner_node_id: NodeId, + + /// Vector clock when this entity was last updated + pub vector_clock: VectorClock, + + /// All components on this entity + pub components: Vec, + + /// Whether this entity has been deleted (tombstone) + pub is_deleted: bool, +} + +/// State of a single component +/// +/// Contains the component type and its serialized data. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ComponentState { + /// Type path of the component (e.g., "bevy_transform::components::Transform") + pub component_type: String, + + /// Serialized component data (bincode) + pub data: ComponentData, +} + +/// Component data - either inline or a blob reference +/// +/// Components larger than 64KB are stored as blobs and referenced by hash. 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum ComponentData { + /// Inline data for small components (<64KB) + Inline(Vec), + + /// Reference to a blob for large components (>64KB) + BlobRef { + /// iroh-blobs hash + hash: Vec, + + /// Size of the blob in bytes + size: u64, + }, +} + +impl ComponentData { + /// Threshold for using blobs vs inline data (64KB) + pub const BLOB_THRESHOLD: usize = 64 * 1024; + + /// Create component data, automatically choosing inline vs blob + pub fn new(data: Vec) -> Self { + if data.len() > Self::BLOB_THRESHOLD { + // Will be populated later when uploaded to iroh-blobs + Self::BlobRef { + hash: Vec::new(), + size: data.len() as u64, + } + } else { + Self::Inline(data) + } + } + + /// Check if this is a blob reference + pub fn is_blob(&self) -> bool { + matches!(self, ComponentData::BlobRef { .. }) + } + + /// Get inline data, if available + pub fn as_inline(&self) -> Option<&[u8]> { + match self { + | ComponentData::Inline(data) => Some(data), + | _ => None, + } + } + + /// Get blob reference, if this is a blob + pub fn as_blob_ref(&self) -> Option<(&[u8], u64)> { + match self { + | ComponentData::BlobRef { hash, size } => Some((hash, *size)), + | _ => None, + } + } +} + +/// Wrapper for EntityDelta to allow it to be used directly +/// +/// This struct exists because EntityDelta is defined as an enum variant +/// but we sometimes need to work with it as a standalone type. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EntityDelta { + /// Network ID of the entity being updated + pub entity_id: uuid::Uuid, + + /// Node that generated this delta + pub node_id: NodeId, + + /// Vector clock at the time this delta was created + pub vector_clock: VectorClock, + + /// Component operations (Set, SetAdd, SequenceInsert, etc.) 
+ pub operations: Vec, +} + +impl EntityDelta { + /// Create a new entity delta + pub fn new( + entity_id: uuid::Uuid, + node_id: NodeId, + vector_clock: VectorClock, + operations: Vec, + ) -> Self { + Self { + entity_id, + node_id, + vector_clock, + operations, + } + } + + /// Convert to a SyncMessage::EntityDelta variant + pub fn into_message(self) -> SyncMessage { + SyncMessage::EntityDelta { + entity_id: self.entity_id, + node_id: self.node_id, + vector_clock: self.vector_clock, + operations: self.operations, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_versioned_message_creation() { + let node_id = uuid::Uuid::new_v4(); + let message = SyncMessage::JoinRequest { + node_id, + session_secret: None, + }; + + let versioned = VersionedMessage::new(message); + assert_eq!(versioned.version, VersionedMessage::CURRENT_VERSION); + } + + #[test] + fn test_component_data_inline() { + let data = vec![1, 2, 3, 4]; + let component_data = ComponentData::new(data.clone()); + + assert!(!component_data.is_blob()); + assert_eq!(component_data.as_inline(), Some(data.as_slice())); + } + + #[test] + fn test_component_data_blob() { + // Create data larger than threshold + let data = vec![0u8; ComponentData::BLOB_THRESHOLD + 1]; + let component_data = ComponentData::new(data.clone()); + + assert!(component_data.is_blob()); + assert_eq!(component_data.as_inline(), None); + } + + #[test] + fn test_entity_delta_creation() { + let entity_id = uuid::Uuid::new_v4(); + let node_id = uuid::Uuid::new_v4(); + let vector_clock = VectorClock::new(); + + let delta = EntityDelta::new(entity_id, node_id, vector_clock.clone(), vec![]); + + assert_eq!(delta.entity_id, entity_id); + assert_eq!(delta.node_id, node_id); + assert_eq!(delta.vector_clock, vector_clock); + } + + #[test] + fn test_message_serialization() -> bincode::Result<()> { + let node_id = uuid::Uuid::new_v4(); + let message = SyncMessage::JoinRequest { + node_id, + session_secret: None, + }; + + let 
versioned = VersionedMessage::new(message); + let bytes = bincode::serialize(&versioned)?; + let deserialized: VersionedMessage = bincode::deserialize(&bytes)?; + + assert_eq!(deserialized.version, versioned.version); + + Ok(()) + } + + #[test] + fn test_full_state_serialization() -> bincode::Result<()> { + let entity_id = uuid::Uuid::new_v4(); + let owner_node = uuid::Uuid::new_v4(); + + let entity_state = EntityState { + entity_id, + owner_node_id: owner_node, + vector_clock: VectorClock::new(), + components: vec![], + is_deleted: false, + }; + + let message = SyncMessage::FullState { + entities: vec![entity_state], + vector_clock: VectorClock::new(), + }; + + let bytes = bincode::serialize(&message)?; + let _deserialized: SyncMessage = bincode::deserialize(&bytes)?; + + Ok(()) + } +} diff --git a/crates/lib/src/networking/mod.rs b/crates/lib/src/networking/mod.rs new file mode 100644 index 0000000..b5811fd --- /dev/null +++ b/crates/lib/src/networking/mod.rs @@ -0,0 +1,71 @@ +//! CRDT-based networking layer for distributed synchronization +//! +//! This module implements the networking strategy defined in RFC 0001. +//! It provides CRDT-based synchronization over iroh-gossip with support for: +//! +//! - **Entity Synchronization** - Automatic sync of NetworkedEntity components +//! - **CRDT Merge Semantics** - LWW, OR-Set, and Sequence CRDTs +//! - **Large Blob Support** - Files >64KB via iroh-blobs +//! - **Join Protocol** - New peers receive full world state +//! - **Anti-Entropy** - Periodic sync to repair network partitions +//! - **Vector Clock** - Causality tracking for distributed operations +//! +//! # Example +//! +//! ``` +//! use lib::networking::*; +//! use uuid::Uuid; +//! +//! // Create a vector clock and track operations +//! let node_id = Uuid::new_v4(); +//! let mut clock = VectorClock::new(); +//! +//! // Increment the clock for local operations +//! clock.increment(node_id); +//! +//! // Build a component operation +//! 
let builder = ComponentOpBuilder::new(node_id, clock.clone()); +//! let op = builder.set("Transform".to_string(), ComponentData::Inline(vec![1, 2, 3])); +//! ``` + +mod apply_ops; +mod blob_support; +mod change_detection; +mod components; +mod delta_generation; +mod entity_map; +mod error; +mod gossip_bridge; +mod join_protocol; +mod merge; +mod message_dispatcher; +mod messages; +mod operation_builder; +mod operation_log; +mod operations; +mod orset; +mod plugin; +mod rga; +mod tombstones; +mod vector_clock; + +pub use apply_ops::*; +pub use blob_support::*; +pub use change_detection::*; +pub use components::*; +pub use delta_generation::*; +pub use entity_map::*; +pub use error::*; +pub use gossip_bridge::*; +pub use join_protocol::*; +pub use merge::*; +pub use message_dispatcher::*; +pub use messages::*; +pub use operation_builder::*; +pub use operation_log::*; +pub use operations::*; +pub use orset::*; +pub use plugin::*; +pub use rga::*; +pub use tombstones::*; +pub use vector_clock::*; diff --git a/crates/lib/src/networking/operation_builder.rs b/crates/lib/src/networking/operation_builder.rs new file mode 100644 index 0000000..4e6d6ee --- /dev/null +++ b/crates/lib/src/networking/operation_builder.rs @@ -0,0 +1,251 @@ +//! Build CRDT operations from ECS component changes +//! +//! This module provides utilities to convert Bevy component changes into +//! ComponentOp operations that can be synchronized across the network. 
+ +use bevy::{ + prelude::*, + reflect::TypeRegistry, +}; + +use crate::{ + networking::{ + blob_support::{ + create_component_data, + BlobStore, + }, + error::Result, + messages::ComponentData, + operations::{ + ComponentOp, + ComponentOpBuilder, + }, + vector_clock::{ + NodeId, + VectorClock, + }, + }, + persistence::reflection::serialize_component, +}; + +/// Build a Set operation (LWW) from a component +/// +/// Serializes the component using Bevy's reflection system and creates a +/// ComponentOp::Set for Last-Write-Wins synchronization. Automatically uses +/// blob storage for components >64KB. +/// +/// # Parameters +/// +/// - `component`: The component to serialize +/// - `component_type`: Type path string +/// - `node_id`: Our node ID +/// - `vector_clock`: Current vector clock +/// - `type_registry`: Bevy's type registry +/// - `blob_store`: Optional blob store for large components +/// +/// # Returns +/// +/// A ComponentOp::Set ready to be broadcast +pub fn build_set_operation( + component: &dyn Reflect, + component_type: String, + node_id: NodeId, + vector_clock: VectorClock, + type_registry: &TypeRegistry, + blob_store: Option<&BlobStore>, +) -> Result { + // Serialize the component + let serialized = serialize_component(component, type_registry)?; + + // Create component data (inline or blob) + let data = if let Some(store) = blob_store { + create_component_data(serialized, store)? + } else { + ComponentData::Inline(serialized) + }; + + // Build the operation + let builder = ComponentOpBuilder::new(node_id, vector_clock); + Ok(builder.set(component_type, data)) +} + +/// Build Set operations for all components on an entity +/// +/// This iterates over all components with reflection data and creates Set +/// operations for each one. Automatically uses blob storage for large components. 
+/// +/// # Parameters +/// +/// - `entity`: The entity to serialize +/// - `world`: Bevy world +/// - `node_id`: Our node ID +/// - `vector_clock`: Current vector clock +/// - `type_registry`: Bevy's type registry +/// - `blob_store`: Optional blob store for large components +/// +/// # Returns +/// +/// Vector of ComponentOp::Set operations, one per component +pub fn build_entity_operations( + entity: Entity, + world: &World, + node_id: NodeId, + vector_clock: VectorClock, + type_registry: &TypeRegistry, + blob_store: Option<&BlobStore>, +) -> Vec { + let mut operations = Vec::new(); + let entity_ref = world.entity(entity); + + // Iterate over all type registrations + for registration in type_registry.iter() { + // Skip if no ReflectComponent data + let Some(reflect_component) = registration.data::() else { + continue; + }; + + // Get the type path + let type_path = registration.type_info().type_path(); + + // Skip certain components + if type_path.ends_with("::NetworkedEntity") + || type_path.ends_with("::NetworkedTransform") + || type_path.ends_with("::NetworkedSelection") + || type_path.ends_with("::NetworkedDrawingPath") + { + continue; + } + + // Try to reflect this component from the entity + if let Some(reflected) = reflect_component.reflect(entity_ref) { + // Serialize the component + if let Ok(serialized) = serialize_component(reflected, type_registry) { + // Create component data (inline or blob) + let data = if let Some(store) = blob_store { + if let Ok(component_data) = create_component_data(serialized, store) { + component_data + } else { + continue; // Skip this component if blob storage fails + } + } else { + ComponentData::Inline(serialized) + }; + + // Build the operation + let mut clock = vector_clock.clone(); + clock.increment(node_id); + + operations.push(ComponentOp::Set { + component_type: type_path.to_string(), + data, + vector_clock: clock, + }); + } + } + } + + operations +} + +/// Build a Set operation for Transform component 
specifically +/// +/// This is a helper for the common case of synchronizing Transform changes. +/// +/// # Example +/// +/// ``` +/// use bevy::prelude::*; +/// use lib::networking::{build_transform_operation, VectorClock}; +/// use uuid::Uuid; +/// +/// # fn example(transform: &Transform, type_registry: &bevy::reflect::TypeRegistry) { +/// let node_id = Uuid::new_v4(); +/// let clock = VectorClock::new(); +/// +/// let op = build_transform_operation(transform, node_id, clock, type_registry, None).unwrap(); +/// # } +/// ``` +pub fn build_transform_operation( + transform: &Transform, + node_id: NodeId, + vector_clock: VectorClock, + type_registry: &TypeRegistry, + blob_store: Option<&BlobStore>, +) -> Result { + // Use reflection to serialize Transform + let serialized = serialize_component(transform.as_reflect(), type_registry)?; + + // Create component data (inline or blob) + let data = if let Some(store) = blob_store { + create_component_data(serialized, store)? + } else { + ComponentData::Inline(serialized) + }; + + let builder = ComponentOpBuilder::new(node_id, vector_clock); + Ok(builder.set("bevy_transform::components::transform::Transform".to_string(), data)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_build_transform_operation() { + let mut type_registry = TypeRegistry::new(); + type_registry.register::(); + + let transform = Transform::default(); + let node_id = uuid::Uuid::new_v4(); + let clock = VectorClock::new(); + + let op = build_transform_operation(&transform, node_id, clock, &type_registry, None).unwrap(); + + assert!(op.is_set()); + assert_eq!( + op.component_type(), + Some("bevy_transform::components::transform::Transform") + ); + assert_eq!(op.vector_clock().get(node_id), 1); + } + + #[test] + fn test_build_entity_operations() { + let mut world = World::new(); + let mut type_registry = TypeRegistry::new(); + + // Register Transform + type_registry.register::(); + + // Spawn entity with Transform + let entity = world + 
.spawn(Transform::from_xyz(1.0, 2.0, 3.0)) + .id(); + + let node_id = uuid::Uuid::new_v4(); + let clock = VectorClock::new(); + + let ops = build_entity_operations(entity, &world, node_id, clock, &type_registry, None); + + // Should have at least Transform operation + assert!(!ops.is_empty()); + assert!(ops.iter().all(|op| op.is_set())); + } + + #[test] + fn test_vector_clock_increment() { + let mut type_registry = TypeRegistry::new(); + type_registry.register::(); + + let transform = Transform::default(); + let node_id = uuid::Uuid::new_v4(); + let mut clock = VectorClock::new(); + + let op1 = build_transform_operation(&transform, node_id, clock.clone(), &type_registry, None).unwrap(); + assert_eq!(op1.vector_clock().get(node_id), 1); + + clock.increment(node_id); + let op2 = build_transform_operation(&transform, node_id, clock.clone(), &type_registry, None).unwrap(); + assert_eq!(op2.vector_clock().get(node_id), 2); + } +} diff --git a/crates/lib/src/networking/operation_log.rs b/crates/lib/src/networking/operation_log.rs new file mode 100644 index 0000000..e017c7e --- /dev/null +++ b/crates/lib/src/networking/operation_log.rs @@ -0,0 +1,529 @@ +//! Operation log for anti-entropy and partition recovery +//! +//! This module maintains a bounded log of recent operations for each entity, +//! enabling peers to request missing deltas after network partitions or when +//! they join late. +//! +//! The operation log: +//! - Stores EntityDelta messages for recent operations +//! - Bounded by time (keep operations from last N minutes) or size (max M ops) +//! - Allows peers to request operations newer than their vector clock +//! 
- Supports periodic anti-entropy sync to repair partitions + +use std::collections::{ + HashMap, + VecDeque, +}; + +use bevy::prelude::*; + +use crate::networking::{ + messages::{ + EntityDelta, + SyncMessage, + VersionedMessage, + }, + vector_clock::{ + NodeId, + VectorClock, + }, + GossipBridge, + NodeVectorClock, +}; + +/// Maximum operations to keep per entity (prevents unbounded growth) +const MAX_OPS_PER_ENTITY: usize = 100; + +/// Maximum age for operations (in seconds) +const MAX_OP_AGE_SECS: u64 = 300; // 5 minutes + +/// Maximum number of entities to track (prevents unbounded growth) +const MAX_ENTITIES: usize = 10_000; + +/// Operation log entry with timestamp +#[derive(Debug, Clone)] +struct LogEntry { + /// The entity delta operation + delta: EntityDelta, + + /// When this operation was created (for pruning old ops) + timestamp: std::time::Instant, +} + +/// Resource storing the operation log for all entities +/// +/// This is used for anti-entropy - peers can request operations they're missing +/// by comparing vector clocks. +/// +/// # Bounded Growth +/// +/// The operation log is bounded in three ways: +/// - Max operations per entity: `MAX_OPS_PER_ENTITY` (100) +/// - Max operation age: `MAX_OP_AGE_SECS` (300 seconds / 5 minutes) +/// - Max entities: `MAX_ENTITIES` (10,000) +/// +/// When limits are exceeded, oldest operations/entities are pruned automatically. +#[derive(Resource)] +pub struct OperationLog { + /// Map from entity ID to list of recent operations + logs: HashMap>, + + /// Total number of operations across all entities (for monitoring) + total_ops: usize, +} + +impl OperationLog { + /// Create a new operation log + pub fn new() -> Self { + Self { + logs: HashMap::new(), + total_ops: 0, + } + } + + /// Record an operation in the log + /// + /// This should be called whenever we generate or apply an EntityDelta. 
+ /// + /// # Example + /// + /// ``` + /// use lib::networking::{OperationLog, EntityDelta, VectorClock}; + /// use uuid::Uuid; + /// + /// let mut log = OperationLog::new(); + /// let entity_id = Uuid::new_v4(); + /// let node_id = Uuid::new_v4(); + /// let clock = VectorClock::new(); + /// + /// let delta = EntityDelta::new(entity_id, node_id, clock, vec![]); + /// log.record_operation(delta); + /// ``` + pub fn record_operation(&mut self, delta: EntityDelta) { + // Check if we're at the entity limit + if self.logs.len() >= MAX_ENTITIES && !self.logs.contains_key(&delta.entity_id) { + // Prune oldest entity (by finding entity with oldest operation) + if let Some(oldest_entity_id) = self.find_oldest_entity() { + warn!( + "Operation log at entity limit ({}), pruning oldest entity {:?}", + MAX_ENTITIES, oldest_entity_id + ); + if let Some(removed_log) = self.logs.remove(&oldest_entity_id) { + self.total_ops = self.total_ops.saturating_sub(removed_log.len()); + } + } + } + + let entry = LogEntry { + delta: delta.clone(), + timestamp: std::time::Instant::now(), + }; + + let log = self.logs.entry(delta.entity_id).or_insert_with(VecDeque::new); + log.push_back(entry); + self.total_ops += 1; + + // Prune if we exceed max ops per entity + while log.len() > MAX_OPS_PER_ENTITY { + log.pop_front(); + self.total_ops = self.total_ops.saturating_sub(1); + } + } + + /// Find the entity with the oldest operation (for LRU eviction) + fn find_oldest_entity(&self) -> Option { + self.logs + .iter() + .filter_map(|(entity_id, log)| { + log.front().map(|entry| (*entity_id, entry.timestamp)) + }) + .min_by_key(|(_, timestamp)| *timestamp) + .map(|(entity_id, _)| entity_id) + } + + /// Get operations for an entity that are newer than a given vector clock + /// + /// This is used to respond to SyncRequest messages. 
+ pub fn get_operations_newer_than( + &self, + entity_id: uuid::Uuid, + their_clock: &VectorClock, + ) -> Vec { + let Some(log) = self.logs.get(&entity_id) else { + return vec![]; + }; + + log.iter() + .filter(|entry| { + // Include operation if they haven't seen it yet + // (their clock happened before the operation's clock) + their_clock.happened_before(&entry.delta.vector_clock) + }) + .map(|entry| entry.delta.clone()) + .collect() + } + + /// Get all operations newer than a vector clock across all entities + /// + /// This is used to respond to SyncRequest for the entire world state. + pub fn get_all_operations_newer_than(&self, their_clock: &VectorClock) -> Vec { + let mut deltas = Vec::new(); + + for (entity_id, _log) in &self.logs { + let entity_deltas = self.get_operations_newer_than(*entity_id, their_clock); + deltas.extend(entity_deltas); + } + + deltas + } + + /// Prune old operations from the log + /// + /// This should be called periodically to prevent unbounded growth. + /// Removes operations older than MAX_OP_AGE_SECS. 
+ pub fn prune_old_operations(&mut self) { + let max_age = std::time::Duration::from_secs(MAX_OP_AGE_SECS); + let now = std::time::Instant::now(); + + let mut pruned_count = 0; + + for log in self.logs.values_mut() { + let before_len = log.len(); + log.retain(|entry| { + now.duration_since(entry.timestamp) < max_age + }); + pruned_count += before_len - log.len(); + } + + // Update total_ops counter + self.total_ops = self.total_ops.saturating_sub(pruned_count); + + // Remove empty logs + self.logs.retain(|_, log| !log.is_empty()); + } + + /// Get the number of operations in the log + pub fn total_operations(&self) -> usize { + self.total_ops + } + + /// Get the number of entities with logged operations + pub fn num_entities(&self) -> usize { + self.logs.len() + } +} + +impl Default for OperationLog { + fn default() -> Self { + Self::new() + } +} + +/// Build a SyncRequest message +/// +/// This asks peers to send us any operations we're missing. +/// +/// # Example +/// +/// ``` +/// use lib::networking::{build_sync_request, VectorClock}; +/// use uuid::Uuid; +/// +/// let node_id = Uuid::new_v4(); +/// let clock = VectorClock::new(); +/// let request = build_sync_request(node_id, clock); +/// ``` +pub fn build_sync_request(node_id: NodeId, vector_clock: VectorClock) -> VersionedMessage { + VersionedMessage::new(SyncMessage::SyncRequest { + node_id, + vector_clock, + }) +} + +/// Build a MissingDeltas response +/// +/// This contains operations that the requesting peer is missing. +pub fn build_missing_deltas(deltas: Vec) -> VersionedMessage { + VersionedMessage::new(SyncMessage::MissingDeltas { deltas }) +} + +/// System to handle SyncRequest messages +/// +/// When we receive a SyncRequest, compare vector clocks and send any +/// operations the peer is missing. 
+/// +/// Add this to your app: +/// +/// ```no_run +/// use bevy::prelude::*; +/// use lib::networking::handle_sync_requests_system; +/// +/// App::new() +/// .add_systems(Update, handle_sync_requests_system); +/// ``` +pub fn handle_sync_requests_system( + bridge: Option>, + operation_log: Res, +) { + let Some(bridge) = bridge else { + return; + }; + + // Poll for SyncRequest messages + while let Some(message) = bridge.try_recv() { + match message.message { + | SyncMessage::SyncRequest { + node_id: requesting_node, + vector_clock: their_clock, + } => { + debug!("Received SyncRequest from node {}", requesting_node); + + // Find operations they're missing + let missing_deltas = operation_log.get_all_operations_newer_than(&their_clock); + + if !missing_deltas.is_empty() { + info!( + "Sending {} missing deltas to node {}", + missing_deltas.len(), + requesting_node + ); + + // Send MissingDeltas response + let response = build_missing_deltas(missing_deltas); + if let Err(e) = bridge.send(response) { + error!("Failed to send MissingDeltas: {}", e); + } + } else { + debug!("No missing deltas for node {}", requesting_node); + } + } + | _ => { + // Not a SyncRequest, ignore + } + } + } +} + +/// System to handle MissingDeltas messages +/// +/// When we receive MissingDeltas (in response to our SyncRequest), apply them. 
+pub fn handle_missing_deltas_system( + mut commands: Commands, + bridge: Option>, + mut entity_map: ResMut, + type_registry: Res, + mut node_clock: ResMut, + blob_store: Option>, + mut tombstone_registry: Option>, +) { + let Some(bridge) = bridge else { + return; + }; + + let registry = type_registry.read(); + let blob_store_ref = blob_store.as_deref(); + + // Poll for MissingDeltas messages + while let Some(message) = bridge.try_recv() { + match message.message { + | SyncMessage::MissingDeltas { deltas } => { + info!("Received MissingDeltas with {} operations", deltas.len()); + + // Apply each delta + for delta in deltas { + debug!( + "Applying missing delta for entity {:?}", + delta.entity_id + ); + + crate::networking::apply_entity_delta( + &delta, + &mut commands, + &mut entity_map, + ®istry, + &mut node_clock, + blob_store_ref, + tombstone_registry.as_deref_mut(), + ); + } + } + | _ => { + // Not MissingDeltas, ignore + } + } + } +} + +/// System to periodically send SyncRequest for anti-entropy +/// +/// This runs every N seconds to request any operations we might be missing, +/// helping to repair network partitions. +/// +/// **NOTE:** This is a simple timer-based implementation. Phase 14 will add +/// adaptive sync intervals based on network conditions. +pub fn periodic_sync_system( + bridge: Option>, + node_clock: Res, + time: Res