checkpoint for the demo. almost!!
Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4652,6 +4652,7 @@ dependencies = [
|
||||
"raw-window-handle",
|
||||
"rkyv",
|
||||
"rusqlite",
|
||||
"rustc-hash 2.1.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.10.9",
|
||||
|
||||
@@ -82,10 +82,12 @@ fn poll_engine_events(
|
||||
commands.insert_resource(gossip_bridge);
|
||||
info!("Inserted GossipBridge resource");
|
||||
|
||||
// Update session to use the new session ID and set state to Active
|
||||
// Update session to use the new session ID and set state to Joining
|
||||
// The transition_session_state_system will handle Joining → Active
|
||||
// after receiving FullState from peers
|
||||
current_session.session = Session::new(session_id.clone());
|
||||
current_session.session.state = SessionState::Active;
|
||||
info!("Updated CurrentSession to Active: {}", session_id.to_code());
|
||||
current_session.session.state = SessionState::Joining;
|
||||
info!("Updated CurrentSession to Joining: {}", session_id.to_code());
|
||||
|
||||
// Update node ID in clock
|
||||
node_clock.node_id = node_id;
|
||||
|
||||
@@ -70,6 +70,7 @@ itertools = "0.14"
|
||||
rand = "0.8"
|
||||
raw-window-handle = "0.6"
|
||||
rusqlite = { version = "0.37.0", features = ["bundled"] }
|
||||
rustc-hash = "2.1"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json.workspace = true
|
||||
sha2 = "0.10"
|
||||
|
||||
@@ -71,11 +71,22 @@ impl EngineCore {
|
||||
self.stop_networking().await;
|
||||
}
|
||||
EngineCommand::SaveSession => {
|
||||
// TODO: Save current session state
|
||||
tracing::debug!("SaveSession command received (stub)");
|
||||
// Session state is auto-saved by save_session_on_shutdown_system in Bevy
|
||||
// This command is a no-op, as persistence is handled by Bevy systems
|
||||
tracing::debug!("SaveSession command received (session auto-save handled by Bevy)");
|
||||
}
|
||||
EngineCommand::LoadSession { session_id } => {
|
||||
tracing::debug!("LoadSession command received for {} (stub)", session_id.to_code());
|
||||
// Loading a session means switching to a different session
|
||||
// This requires restarting networking with the new session
|
||||
tracing::info!("LoadSession command received for {}", session_id.to_code());
|
||||
|
||||
// Stop current networking if any
|
||||
if self.networking_task.is_some() {
|
||||
self.stop_networking().await;
|
||||
}
|
||||
|
||||
// Start networking with the new session
|
||||
self.start_networking(session_id).await;
|
||||
}
|
||||
EngineCommand::TickClock => {
|
||||
self.tick_clock();
|
||||
|
||||
@@ -240,11 +240,43 @@ impl NetworkingManager {
|
||||
Event::Received(msg) => {
|
||||
// Deserialize and forward to GossipBridge for Bevy systems
|
||||
if let Ok(versioned) = rkyv::from_bytes::<VersionedMessage, rkyv::rancor::Failure>(&msg.content) {
|
||||
if let Err(e) = self.bridge.push_incoming(versioned) {
|
||||
tracing::error!("Failed to push message to GossipBridge: {}", e);
|
||||
} else {
|
||||
tracing::debug!("Forwarded message to Bevy via GossipBridge");
|
||||
// Diagnostic logging: track message type and nonce
|
||||
let msg_type = match &versioned.message {
|
||||
SyncMessage::EntityDelta { entity_id, .. } => {
|
||||
format!("EntityDelta({})", entity_id)
|
||||
}
|
||||
SyncMessage::JoinRequest { node_id, .. } => {
|
||||
format!("JoinRequest({})", node_id)
|
||||
}
|
||||
SyncMessage::FullState { entities, .. } => {
|
||||
format!("FullState({} entities)", entities.len())
|
||||
}
|
||||
SyncMessage::SyncRequest { node_id, .. } => {
|
||||
format!("SyncRequest({})", node_id)
|
||||
}
|
||||
SyncMessage::MissingDeltas { deltas } => {
|
||||
format!("MissingDeltas({} ops)", deltas.len())
|
||||
}
|
||||
SyncMessage::Lock(lock_msg) => {
|
||||
format!("Lock({:?})", lock_msg)
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
"[NetworkingManager::receive] Node {} received from iroh-gossip: {} (nonce: {})",
|
||||
self.node_id, msg_type, versioned.nonce
|
||||
);
|
||||
|
||||
if let Err(e) = self.bridge.push_incoming(versioned) {
|
||||
tracing::error!("Failed to forward {} to GossipBridge: {}", msg_type, e);
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"[NetworkingManager::receive] ✓ Forwarded {} to Bevy GossipBridge",
|
||||
msg_type
|
||||
);
|
||||
}
|
||||
} else {
|
||||
tracing::warn!("Failed to deserialize message from iroh-gossip");
|
||||
}
|
||||
}
|
||||
Event::NeighborUp(peer) => {
|
||||
@@ -288,15 +320,54 @@ impl NetworkingManager {
|
||||
|
||||
// Poll GossipBridge for outgoing messages and broadcast via iroh
|
||||
_ = bridge_poll_interval.tick() => {
|
||||
let mut sent_count = 0;
|
||||
while let Some(msg) = self.bridge.try_recv_outgoing() {
|
||||
// Diagnostic logging: track message type and nonce
|
||||
let msg_type = match &msg.message {
|
||||
SyncMessage::EntityDelta { entity_id, .. } => {
|
||||
format!("EntityDelta({})", entity_id)
|
||||
}
|
||||
SyncMessage::JoinRequest { node_id, .. } => {
|
||||
format!("JoinRequest({})", node_id)
|
||||
}
|
||||
SyncMessage::FullState { entities, .. } => {
|
||||
format!("FullState({} entities)", entities.len())
|
||||
}
|
||||
SyncMessage::SyncRequest { node_id, .. } => {
|
||||
format!("SyncRequest({})", node_id)
|
||||
}
|
||||
SyncMessage::MissingDeltas { deltas } => {
|
||||
format!("MissingDeltas({} ops)", deltas.len())
|
||||
}
|
||||
SyncMessage::Lock(lock_msg) => {
|
||||
format!("Lock({:?})", lock_msg)
|
||||
}
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
"[NetworkingManager::broadcast] Node {} broadcasting: {} (nonce: {})",
|
||||
self.node_id, msg_type, msg.nonce
|
||||
);
|
||||
|
||||
if let Ok(bytes) = rkyv::to_bytes::<rkyv::rancor::Failure>(&msg).map(|b| b.to_vec()) {
|
||||
if let Err(e) = self.sender.broadcast(Bytes::from(bytes)).await {
|
||||
tracing::error!("Failed to broadcast message: {}", e);
|
||||
tracing::error!("Failed to broadcast {} to iroh-gossip: {}", msg_type, e);
|
||||
} else {
|
||||
tracing::debug!("Broadcast message from Bevy via iroh-gossip");
|
||||
sent_count += 1;
|
||||
tracing::debug!(
|
||||
"[NetworkingManager::broadcast] ✓ Sent {} to iroh-gossip network",
|
||||
msg_type
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if sent_count > 0 {
|
||||
tracing::info!(
|
||||
"[NetworkingManager::broadcast] Node {} sent {} messages to iroh-gossip network",
|
||||
self.node_id, sent_count
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Periodic tasks: heartbeats and lock cleanup
|
||||
|
||||
@@ -165,6 +165,24 @@ pub fn apply_entity_delta(delta: &EntityDelta, world: &mut World) {
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// CRITICAL: Add marker to prevent feedback loop
|
||||
//
|
||||
// When we apply remote operations, insert_fn() triggers Bevy's change detection.
|
||||
// This causes auto_detect_transform_changes_system to mark NetworkedEntity as changed,
|
||||
// which would normally trigger generate_delta_system to broadcast it back, creating
|
||||
// an infinite feedback loop.
|
||||
//
|
||||
// By adding SkipNextDeltaGeneration marker, we tell generate_delta_system to skip
|
||||
// this entity for one frame. A cleanup system removes the marker after delta
|
||||
// generation runs, allowing future local changes to be broadcast normally.
|
||||
if let Ok(mut entity_mut) = world.get_entity_mut(entity) {
|
||||
entity_mut.insert(crate::networking::SkipNextDeltaGeneration);
|
||||
debug!(
|
||||
"Added SkipNextDeltaGeneration marker to entity {:?} to prevent feedback loop",
|
||||
delta.entity_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a single ComponentOp to an entity
|
||||
@@ -235,14 +253,18 @@ fn apply_set_operation_with_lww(
|
||||
) {
|
||||
// Get component type name for logging and clock tracking
|
||||
let type_registry = {
|
||||
let registry_resource = world.resource::<crate::persistence::ComponentTypeRegistryResource>();
|
||||
let registry_resource =
|
||||
world.resource::<crate::persistence::ComponentTypeRegistryResource>();
|
||||
registry_resource.0
|
||||
};
|
||||
|
||||
let component_type_name = match type_registry.get_type_name(discriminant) {
|
||||
| Some(name) => name,
|
||||
| None => {
|
||||
error!("Unknown discriminant {} - component not registered", discriminant);
|
||||
error!(
|
||||
"Unknown discriminant {} - component not registered",
|
||||
discriminant
|
||||
);
|
||||
return;
|
||||
},
|
||||
};
|
||||
@@ -310,7 +332,10 @@ fn apply_set_operation_with_lww(
|
||||
}
|
||||
},
|
||||
| crate::networking::merge::MergeDecision::Equal => {
|
||||
debug!("Ignoring remote Set for {} (clocks equal)", component_type_name);
|
||||
debug!(
|
||||
"Ignoring remote Set for {} (clocks equal)",
|
||||
component_type_name
|
||||
);
|
||||
false
|
||||
},
|
||||
}
|
||||
@@ -355,12 +380,7 @@ fn apply_set_operation_with_lww(
|
||||
///
|
||||
/// Deserializes the component and inserts/updates it on the entity.
|
||||
/// Handles both inline data and blob references.
|
||||
fn apply_set_operation(
|
||||
entity: Entity,
|
||||
discriminant: u16,
|
||||
data: &ComponentData,
|
||||
world: &mut World,
|
||||
) {
|
||||
fn apply_set_operation(entity: Entity, discriminant: u16, data: &ComponentData, world: &mut World) {
|
||||
let blob_store = world.get_resource::<BlobStore>();
|
||||
|
||||
// Get the actual data (resolve blob if needed)
|
||||
@@ -390,7 +410,8 @@ fn apply_set_operation(
|
||||
|
||||
// Get component type registry
|
||||
let type_registry = {
|
||||
let registry_resource = world.resource::<crate::persistence::ComponentTypeRegistryResource>();
|
||||
let registry_resource =
|
||||
world.resource::<crate::persistence::ComponentTypeRegistryResource>();
|
||||
registry_resource.0
|
||||
};
|
||||
|
||||
@@ -401,7 +422,10 @@ fn apply_set_operation(
|
||||
let (deserialize_fn, insert_fn) = match (deserialize_fn, insert_fn) {
|
||||
| (Some(d), Some(i)) => (d, i),
|
||||
| _ => {
|
||||
error!("Discriminant {} not registered in ComponentTypeRegistry", discriminant);
|
||||
error!(
|
||||
"Discriminant {} not registered in ComponentTypeRegistry",
|
||||
discriminant
|
||||
);
|
||||
return;
|
||||
},
|
||||
};
|
||||
|
||||
@@ -31,6 +31,7 @@ pub fn auto_detect_transform_changes_system(
|
||||
(
|
||||
With<NetworkedTransform>,
|
||||
Or<(Changed<Transform>, Changed<GlobalTransform>)>,
|
||||
Without<crate::networking::SkipNextDeltaGeneration>,
|
||||
),
|
||||
>,
|
||||
) {
|
||||
|
||||
@@ -11,6 +11,21 @@ use serde::{
|
||||
|
||||
use crate::networking::vector_clock::NodeId;
|
||||
|
||||
/// Marker component to skip delta generation for one frame after receiving remote updates
|
||||
///
|
||||
/// When we apply remote operations via `apply_entity_delta()`, the `insert_fn()` call
|
||||
/// triggers Bevy's change detection. This would normally cause `generate_delta_system`
|
||||
/// to create and broadcast a new delta, creating an infinite feedback loop.
|
||||
///
|
||||
/// By adding this marker when we apply remote updates, we tell `generate_delta_system`
|
||||
/// to skip this entity for one frame. A cleanup system removes the marker after
|
||||
/// delta generation runs, allowing future local changes to be broadcast normally.
|
||||
///
|
||||
/// This is an implementation detail of the feedback loop prevention mechanism.
|
||||
/// User code should never need to interact with this component.
|
||||
#[derive(Component, Debug)]
|
||||
pub struct SkipNextDeltaGeneration;
|
||||
|
||||
/// Marker component indicating an entity should be synchronized over the
|
||||
/// network
|
||||
///
|
||||
|
||||
@@ -70,8 +70,13 @@ pub fn generate_delta_system(world: &mut World) {
|
||||
// Broadcast only happens when online
|
||||
|
||||
let changed_entities: Vec<(Entity, uuid::Uuid, uuid::Uuid)> = {
|
||||
let mut query =
|
||||
world.query_filtered::<(Entity, &NetworkedEntity), Or<(Added<NetworkedEntity>, Changed<NetworkedEntity>)>>();
|
||||
let mut query = world.query_filtered::<
|
||||
(Entity, &NetworkedEntity),
|
||||
(
|
||||
Or<(Added<NetworkedEntity>, Changed<NetworkedEntity>)>,
|
||||
Without<crate::networking::SkipNextDeltaGeneration>,
|
||||
),
|
||||
>();
|
||||
query
|
||||
.iter(world)
|
||||
.map(|(entity, networked)| (entity, networked.network_id, networked.owner_node_id))
|
||||
@@ -98,22 +103,25 @@ pub fn generate_delta_system(world: &mut World) {
|
||||
Option<ResMut<crate::networking::OperationLog>>,
|
||||
)> = bevy::ecs::system::SystemState::new(world);
|
||||
|
||||
let (node_id, vector_clock, current_seq) = {
|
||||
let (node_id, vector_clock, new_seq) = {
|
||||
let (_, _, mut node_clock, last_versions, _) = system_state.get_mut(world);
|
||||
|
||||
// Check if we should sync this entity
|
||||
// Check if we should sync this entity with the NEXT sequence (after tick)
|
||||
// This prevents duplicate sends when system runs multiple times per frame
|
||||
let current_seq = node_clock.sequence();
|
||||
if !last_versions.should_sync(network_id, current_seq) {
|
||||
let next_seq = current_seq + 1; // What the sequence will be after tick
|
||||
if !last_versions.should_sync(network_id, next_seq) {
|
||||
drop(last_versions);
|
||||
drop(node_clock);
|
||||
system_state.apply(world);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Increment our vector clock
|
||||
node_clock.tick();
|
||||
// Increment our vector clock and get the NEW sequence
|
||||
let new_seq = node_clock.tick();
|
||||
debug_assert_eq!(new_seq, next_seq, "tick() should return next_seq");
|
||||
|
||||
(node_clock.node_id, node_clock.clock.clone(), current_seq)
|
||||
(node_clock.node_id, node_clock.clock.clone(), new_seq)
|
||||
};
|
||||
|
||||
// Phase 2: Build operations (needs world access without holding other borrows)
|
||||
@@ -174,8 +182,10 @@ pub fn generate_delta_system(world: &mut World) {
|
||||
);
|
||||
}
|
||||
|
||||
// Update last sync version (both online and offline)
|
||||
last_versions.update(network_id, current_seq);
|
||||
// Update last sync version with NEW sequence (after tick) to prevent duplicates
|
||||
// CRITICAL: Must use new_seq (after tick), not current_seq (before tick)
|
||||
// This prevents sending duplicate deltas if system runs multiple times per frame
|
||||
last_versions.update(network_id, new_seq);
|
||||
|
||||
delta
|
||||
};
|
||||
@@ -220,6 +230,46 @@ pub fn generate_delta_system(world: &mut World) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove SkipNextDeltaGeneration markers after delta generation has run
|
||||
///
|
||||
/// This system must run AFTER `generate_delta_system` to allow entities to be
|
||||
/// synced again on the next actual local change. The marker prevents feedback
|
||||
/// loops by skipping entities that just received remote updates, but we need
|
||||
/// to remove it so future local changes get broadcast.
|
||||
///
|
||||
/// Add this to your app after generate_delta_system:
|
||||
///
|
||||
/// ```no_run
|
||||
/// use bevy::prelude::*;
|
||||
/// use libmarathon::networking::{generate_delta_system, cleanup_skip_delta_markers_system};
|
||||
///
|
||||
/// App::new().add_systems(PostUpdate, (
|
||||
/// generate_delta_system,
|
||||
/// cleanup_skip_delta_markers_system,
|
||||
/// ).chain());
|
||||
/// ```
|
||||
pub fn cleanup_skip_delta_markers_system(world: &mut World) {
|
||||
// Use immediate removal (not deferred commands) to ensure markers are removed
|
||||
// synchronously after generate_delta_system runs, not at the start of next frame
|
||||
let entities_to_clean: Vec<Entity> = {
|
||||
let mut query = world.query_filtered::<Entity, With<crate::networking::SkipNextDeltaGeneration>>();
|
||||
query.iter(world).collect()
|
||||
};
|
||||
|
||||
for entity in &entities_to_clean {
|
||||
if let Ok(mut entity_mut) = world.get_entity_mut(*entity) {
|
||||
entity_mut.remove::<crate::networking::SkipNextDeltaGeneration>();
|
||||
}
|
||||
}
|
||||
|
||||
if !entities_to_clean.is_empty() {
|
||||
debug!(
|
||||
"cleanup_skip_delta_markers_system: Removed markers from {} entities",
|
||||
entities_to_clean.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -65,6 +65,33 @@ impl GossipBridge {
|
||||
|
||||
/// Send a message to the gossip network
|
||||
pub fn send(&self, message: VersionedMessage) -> Result<()> {
|
||||
// Diagnostic logging: track message type and nonce
|
||||
let msg_type = match &message.message {
|
||||
crate::networking::SyncMessage::EntityDelta { entity_id, .. } => {
|
||||
format!("EntityDelta({})", entity_id)
|
||||
}
|
||||
crate::networking::SyncMessage::JoinRequest { node_id, .. } => {
|
||||
format!("JoinRequest({})", node_id)
|
||||
}
|
||||
crate::networking::SyncMessage::FullState { entities, .. } => {
|
||||
format!("FullState({} entities)", entities.len())
|
||||
}
|
||||
crate::networking::SyncMessage::SyncRequest { node_id, .. } => {
|
||||
format!("SyncRequest({})", node_id)
|
||||
}
|
||||
crate::networking::SyncMessage::MissingDeltas { deltas } => {
|
||||
format!("MissingDeltas({} ops)", deltas.len())
|
||||
}
|
||||
crate::networking::SyncMessage::Lock(lock_msg) => {
|
||||
format!("Lock({:?})", lock_msg)
|
||||
}
|
||||
};
|
||||
|
||||
debug!(
|
||||
"[GossipBridge::send] Node {} queuing message: {} (nonce: {})",
|
||||
self.node_id, msg_type, message.nonce
|
||||
);
|
||||
|
||||
self.outgoing
|
||||
.lock()
|
||||
.map_err(|e| NetworkingError::Gossip(format!("Failed to lock outgoing queue: {}", e)))?
|
||||
@@ -97,6 +124,33 @@ impl GossipBridge {
|
||||
|
||||
/// Push a message to the incoming queue (for testing/integration)
|
||||
pub fn push_incoming(&self, message: VersionedMessage) -> Result<()> {
|
||||
// Diagnostic logging: track incoming message type
|
||||
let msg_type = match &message.message {
|
||||
crate::networking::SyncMessage::EntityDelta { entity_id, .. } => {
|
||||
format!("EntityDelta({})", entity_id)
|
||||
}
|
||||
crate::networking::SyncMessage::JoinRequest { node_id, .. } => {
|
||||
format!("JoinRequest({})", node_id)
|
||||
}
|
||||
crate::networking::SyncMessage::FullState { entities, .. } => {
|
||||
format!("FullState({} entities)", entities.len())
|
||||
}
|
||||
crate::networking::SyncMessage::SyncRequest { node_id, .. } => {
|
||||
format!("SyncRequest({})", node_id)
|
||||
}
|
||||
crate::networking::SyncMessage::MissingDeltas { deltas } => {
|
||||
format!("MissingDeltas({} ops)", deltas.len())
|
||||
}
|
||||
crate::networking::SyncMessage::Lock(lock_msg) => {
|
||||
format!("Lock({:?})", lock_msg)
|
||||
}
|
||||
};
|
||||
|
||||
debug!(
|
||||
"[GossipBridge::push_incoming] Node {} received from network: {} (nonce: {})",
|
||||
self.node_id, msg_type, message.nonce
|
||||
);
|
||||
|
||||
self.incoming
|
||||
.lock()
|
||||
.map_err(|e| NetworkingError::Gossip(format!("Failed to lock incoming queue: {}", e)))?
|
||||
|
||||
@@ -17,6 +17,7 @@ use crate::networking::{
|
||||
GossipBridge,
|
||||
NetworkedEntity,
|
||||
SessionId,
|
||||
Synced,
|
||||
VectorClock,
|
||||
blob_support::BlobStore,
|
||||
delta_generation::NodeVectorClock,
|
||||
@@ -129,8 +130,9 @@ pub fn build_full_state(
|
||||
}
|
||||
|
||||
info!(
|
||||
"Built FullState with {} entities for new peer",
|
||||
entities.len()
|
||||
"Built FullState with {} entities ({} total networked entities queried) for new peer",
|
||||
entities.len(),
|
||||
networked_entities.iter().count()
|
||||
);
|
||||
|
||||
VersionedMessage::new(SyncMessage::FullState {
|
||||
@@ -168,12 +170,17 @@ pub fn apply_full_state(
|
||||
{
|
||||
let mut node_clock = world.resource_mut::<NodeVectorClock>();
|
||||
node_clock.clock.merge(&remote_clock);
|
||||
info!("Vector clock after merge: {:?}", node_clock.clock);
|
||||
}
|
||||
|
||||
let mut spawned_count = 0;
|
||||
let mut tombstoned_count = 0;
|
||||
|
||||
// Spawn all entities and apply their state
|
||||
for entity_state in entities {
|
||||
// Handle deleted entities (tombstones)
|
||||
if entity_state.is_deleted {
|
||||
tombstoned_count += 1;
|
||||
// Record tombstone
|
||||
if let Some(mut registry) = world.get_resource_mut::<crate::networking::TombstoneRegistry>() {
|
||||
registry.record_deletion(
|
||||
@@ -185,12 +192,30 @@ pub fn apply_full_state(
|
||||
continue;
|
||||
}
|
||||
|
||||
// Spawn entity with NetworkedEntity and Persisted components
|
||||
// This ensures entities received via FullState are persisted locally
|
||||
// Check if entity already exists in the map
|
||||
let entity = {
|
||||
let entity_map = world.resource::<NetworkEntityMap>();
|
||||
entity_map.get_entity(entity_state.entity_id)
|
||||
};
|
||||
|
||||
let entity = match entity {
|
||||
Some(existing_entity) => {
|
||||
// Entity already exists - reuse it and update components
|
||||
debug!(
|
||||
"Entity {} already exists (local entity {:?}), updating components",
|
||||
entity_state.entity_id, existing_entity
|
||||
);
|
||||
existing_entity
|
||||
}
|
||||
None => {
|
||||
// Spawn new entity with NetworkedEntity, Persisted, and Synced components
|
||||
// This ensures entities received via FullState are persisted locally and
|
||||
// will auto-sync their Transform if one is added
|
||||
let entity = world
|
||||
.spawn((
|
||||
NetworkedEntity::with_id(entity_state.entity_id, entity_state.owner_node_id),
|
||||
crate::persistence::Persisted::with_id(entity_state.entity_id),
|
||||
Synced,
|
||||
))
|
||||
.id();
|
||||
|
||||
@@ -200,6 +225,11 @@ pub fn apply_full_state(
|
||||
entity_map.insert(entity_state.entity_id, entity);
|
||||
}
|
||||
|
||||
spawned_count += 1;
|
||||
entity
|
||||
}
|
||||
};
|
||||
|
||||
let num_components = entity_state.components.len();
|
||||
|
||||
// Apply all components
|
||||
@@ -261,12 +291,31 @@ pub fn apply_full_state(
|
||||
}
|
||||
|
||||
debug!(
|
||||
"Spawned entity {:?} from FullState with {} components",
|
||||
"Applied entity {:?} from FullState with {} components",
|
||||
entity_state.entity_id, num_components
|
||||
);
|
||||
}
|
||||
|
||||
info!("FullState applied successfully");
|
||||
info!(
|
||||
"FullState applied successfully: spawned {} entities, skipped {} tombstones",
|
||||
spawned_count, tombstoned_count
|
||||
);
|
||||
|
||||
// Send SyncRequest to catch any deltas that arrived during FullState transfer
|
||||
// This implements the "Final Sync" step from RFC 0004 (Session Lifecycle)
|
||||
if let Some(bridge) = world.get_resource::<GossipBridge>() {
|
||||
let node_clock = world.resource::<NodeVectorClock>();
|
||||
let request = crate::networking::operation_log::build_sync_request(
|
||||
node_clock.node_id,
|
||||
node_clock.clock.clone(),
|
||||
);
|
||||
|
||||
if let Err(e) = bridge.send(request) {
|
||||
error!("Failed to send post-FullState SyncRequest: {}", e);
|
||||
} else {
|
||||
info!("Sent SyncRequest to catch deltas that arrived during FullState transfer");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// System to handle JoinRequest messages
|
||||
|
||||
@@ -64,8 +64,32 @@ pub fn message_dispatcher_system(world: &mut World) {
|
||||
bridge.drain_incoming()
|
||||
};
|
||||
|
||||
if !messages.is_empty() {
|
||||
let node_id = world.resource::<GossipBridge>().node_id;
|
||||
info!(
|
||||
"[message_dispatcher] Node {} processing {} messages",
|
||||
node_id,
|
||||
messages.len()
|
||||
);
|
||||
}
|
||||
|
||||
// Dispatch each message (bridge is no longer borrowed)
|
||||
for message in messages {
|
||||
let node_id = world.resource::<GossipBridge>().node_id;
|
||||
let msg_type = match &message.message {
|
||||
SyncMessage::EntityDelta { entity_id, .. } => format!("EntityDelta({})", entity_id),
|
||||
SyncMessage::JoinRequest { node_id, .. } => format!("JoinRequest({})", node_id),
|
||||
SyncMessage::FullState { entities, .. } => format!("FullState({} entities)", entities.len()),
|
||||
SyncMessage::SyncRequest { node_id, .. } => format!("SyncRequest({})", node_id),
|
||||
SyncMessage::MissingDeltas { deltas } => format!("MissingDeltas({} ops)", deltas.len()),
|
||||
SyncMessage::Lock(_) => "Lock".to_string(),
|
||||
};
|
||||
|
||||
debug!(
|
||||
"[message_dispatcher] Node {} dispatching: {} (nonce: {})",
|
||||
node_id, msg_type, message.nonce
|
||||
);
|
||||
|
||||
dispatch_message(world, message);
|
||||
}
|
||||
|
||||
@@ -256,6 +280,18 @@ fn dispatch_message(world: &mut World, message: crate::networking::VersionedMess
|
||||
} => {
|
||||
debug!("Received SyncRequest from node {}", requesting_node);
|
||||
|
||||
// Merge the requesting node's vector clock into ours
|
||||
// This ensures we learn about their latest sequence number
|
||||
{
|
||||
let mut node_clock = world.resource_mut::<NodeVectorClock>();
|
||||
node_clock.clock.merge(&their_clock);
|
||||
debug!(
|
||||
"Merged SyncRequest clock from node {} (seq: {})",
|
||||
requesting_node,
|
||||
their_clock.get(requesting_node)
|
||||
);
|
||||
}
|
||||
|
||||
if let Some(op_log) = world.get_resource::<OperationLog>() {
|
||||
// Find operations they're missing
|
||||
let missing_deltas = op_log.get_all_operations_newer_than(&their_clock);
|
||||
@@ -480,8 +516,10 @@ fn build_full_state_from_data(
|
||||
}
|
||||
|
||||
info!(
|
||||
"Built FullState with {} entities for new peer",
|
||||
entities.len()
|
||||
"Built FullState with {} entities ({} total queried, {} tombstoned) for new peer",
|
||||
entities.len(),
|
||||
networked_entities.len(),
|
||||
networked_entities.len() - entities.len()
|
||||
);
|
||||
|
||||
crate::networking::VersionedMessage::new(SyncMessage::FullState {
|
||||
|
||||
@@ -27,12 +27,13 @@ pub struct VersionedMessage {
|
||||
/// The actual sync message
|
||||
pub message: SyncMessage,
|
||||
|
||||
/// Timestamp (nanos since UNIX epoch) to make messages unique
|
||||
/// Nonce for selective deduplication control
|
||||
///
|
||||
/// This prevents iroh-gossip from deduplicating identical messages sent at different times.
|
||||
/// For example, releasing and re-acquiring a lock sends identical LockRequest messages,
|
||||
/// but they need to be treated as separate events.
|
||||
pub timestamp_nanos: u64,
|
||||
/// - For Lock messages: Unique nonce (counter + timestamp hash) to prevent
|
||||
/// iroh-gossip deduplication, allowing repeated heartbeats.
|
||||
/// - For other messages: Constant nonce (0) to enable content-based deduplication
|
||||
/// by iroh-gossip, preventing feedback loops.
|
||||
pub nonce: u32,
|
||||
}
|
||||
|
||||
impl VersionedMessage {
|
||||
@@ -40,18 +41,44 @@ impl VersionedMessage {
|
||||
pub const CURRENT_VERSION: u32 = 1;
|
||||
|
||||
/// Create a new versioned message with the current protocol version
|
||||
///
|
||||
/// For Lock messages: Generates a unique nonce to prevent deduplication, since
|
||||
/// lock heartbeats need to be sent repeatedly even with identical content.
|
||||
///
|
||||
/// For other messages: Uses a constant nonce (0) to enable iroh-gossip's
|
||||
/// content-based deduplication. This prevents feedback loops where the same
|
||||
/// EntityDelta gets broadcast repeatedly.
|
||||
pub fn new(message: SyncMessage) -> Self {
|
||||
// Only generate unique nonces for Lock messages (heartbeats need to bypass dedup)
|
||||
let nonce = if matches!(message, SyncMessage::Lock(_)) {
|
||||
use std::hash::Hasher;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
let timestamp_nanos = SystemTime::now()
|
||||
// Per-node rolling counter for sequential uniqueness
|
||||
static COUNTER: AtomicU32 = AtomicU32::new(0);
|
||||
let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Millisecond timestamp for temporal uniqueness
|
||||
let timestamp_millis = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_nanos() as u64;
|
||||
.as_millis() as u32;
|
||||
|
||||
// Hash counter + timestamp for final nonce
|
||||
let mut hasher = rustc_hash::FxHasher::default();
|
||||
hasher.write_u32(counter);
|
||||
hasher.write_u32(timestamp_millis);
|
||||
hasher.finish() as u32
|
||||
} else {
|
||||
// Use constant nonce for all other messages to enable content deduplication
|
||||
0
|
||||
};
|
||||
|
||||
Self {
|
||||
version: Self::CURRENT_VERSION,
|
||||
message,
|
||||
timestamp_nanos,
|
||||
nonce,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@ mod plugin;
|
||||
mod rga;
|
||||
mod session;
|
||||
mod session_lifecycle;
|
||||
mod session_sync;
|
||||
mod sync_component;
|
||||
mod tombstones;
|
||||
mod vector_clock;
|
||||
@@ -81,6 +82,7 @@ pub use plugin::*;
|
||||
pub use rga::*;
|
||||
pub use session::*;
|
||||
pub use session_lifecycle::*;
|
||||
pub use session_sync::*;
|
||||
pub use sync_component::*;
|
||||
pub use tombstones::*;
|
||||
pub use vector_clock::*;
|
||||
|
||||
@@ -67,13 +67,12 @@ pub fn build_entity_operations(
|
||||
};
|
||||
|
||||
// Build the operation
|
||||
let mut clock = vector_clock.clone();
|
||||
clock.increment(node_id);
|
||||
|
||||
// Use the vector_clock as-is - it's already been incremented by the caller (delta_generation.rs:116)
|
||||
// All operations in the same EntityDelta share the same vector clock (same logical timestamp)
|
||||
operations.push(ComponentOp::Set {
|
||||
discriminant,
|
||||
data,
|
||||
vector_clock: clock.clone(),
|
||||
vector_clock: vector_clock.clone(),
|
||||
});
|
||||
|
||||
debug!(" ✓ Added Set operation for discriminant {}", discriminant);
|
||||
@@ -86,3 +85,67 @@ pub fn build_entity_operations(
|
||||
);
|
||||
operations
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use bevy::prelude::*;
|
||||
use crate::networking::NetworkedEntity;
|
||||
use crate::persistence::{ComponentTypeRegistry, Persisted};
|
||||
|
||||
#[test]
|
||||
fn test_operations_use_passed_vector_clock_without_extra_increment() {
|
||||
// Setup: Create a minimal world with an entity
|
||||
let mut world = World::new();
|
||||
let node_id = uuid::Uuid::new_v4();
|
||||
|
||||
// Use the global registry (Transform is already registered via inventory)
|
||||
let registry = ComponentTypeRegistry::init();
|
||||
|
||||
// Create test entity with Transform
|
||||
let entity_id = uuid::Uuid::new_v4();
|
||||
let entity = world.spawn((
|
||||
NetworkedEntity::with_id(entity_id, node_id),
|
||||
Persisted::with_id(entity_id),
|
||||
Transform::from_xyz(1.0, 2.0, 3.0),
|
||||
)).id();
|
||||
|
||||
// Create a vector clock that's already been ticked
|
||||
let mut vector_clock = VectorClock::new();
|
||||
vector_clock.increment(node_id); // Simulate the tick that delta_generation does
|
||||
let expected_clock = vector_clock.clone();
|
||||
|
||||
// Build operations
|
||||
let operations = build_entity_operations(
|
||||
entity,
|
||||
&world,
|
||||
node_id,
|
||||
vector_clock.clone(),
|
||||
®istry,
|
||||
None,
|
||||
);
|
||||
|
||||
// Verify: All operations should use the EXACT clock that was passed in
|
||||
assert!(!operations.is_empty(), "Should have created at least one operation");
|
||||
|
||||
for op in &operations {
|
||||
if let ComponentOp::Set { vector_clock: op_clock, .. } = op {
|
||||
assert_eq!(
|
||||
*op_clock, expected_clock,
|
||||
"Operation clock should match the input clock exactly. \
|
||||
The bug was that operation_builder would increment the clock again, \
|
||||
causing EntityDelta.vector_clock and ComponentOp.vector_clock to be misaligned."
|
||||
);
|
||||
|
||||
// Verify the sequence number matches
|
||||
let op_seq = op_clock.get(node_id);
|
||||
let expected_seq = expected_clock.get(node_id);
|
||||
assert_eq!(
|
||||
op_seq, expected_seq,
|
||||
"Operation sequence should be {} (same as input clock), but got {}",
|
||||
expected_seq, op_seq
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -351,7 +351,7 @@ pub fn handle_missing_deltas_system(world: &mut World) {
|
||||
/// adaptive sync intervals based on network conditions.
|
||||
pub fn periodic_sync_system(
|
||||
bridge: Option<Res<GossipBridge>>,
|
||||
node_clock: Res<NodeVectorClock>,
|
||||
mut node_clock: ResMut<NodeVectorClock>,
|
||||
time: Res<Time>,
|
||||
mut last_sync: Local<f32>,
|
||||
) {
|
||||
@@ -369,6 +369,9 @@ pub fn periodic_sync_system(
|
||||
|
||||
debug!("Sending periodic SyncRequest for anti-entropy");
|
||||
|
||||
// Increment clock for sending SyncRequest (this is a local operation)
|
||||
node_clock.tick();
|
||||
|
||||
let request = build_sync_request(node_clock.node_id, node_clock.clock.clone());
|
||||
if let Err(e) = bridge.send(request) {
|
||||
error!("Failed to send SyncRequest: {}", e);
|
||||
|
||||
@@ -37,6 +37,7 @@ use crate::networking::{
|
||||
components::{NetworkedEntity, NetworkedTransform},
|
||||
delta_generation::{
|
||||
NodeVectorClock,
|
||||
cleanup_skip_delta_markers_system,
|
||||
generate_delta_system,
|
||||
},
|
||||
entity_map::{
|
||||
@@ -53,6 +54,10 @@ use crate::networking::{
|
||||
release_locks_on_deselection_system,
|
||||
},
|
||||
message_dispatcher::message_dispatcher_system,
|
||||
messages::{
|
||||
SyncMessage,
|
||||
VersionedMessage,
|
||||
},
|
||||
operation_log::{
|
||||
OperationLog,
|
||||
periodic_sync_system,
|
||||
@@ -62,13 +67,21 @@ use crate::networking::{
|
||||
initialize_session_system,
|
||||
save_session_on_shutdown_system,
|
||||
},
|
||||
session_sync::{
|
||||
JoinRequestSent,
|
||||
send_join_request_once_system,
|
||||
transition_session_state_system,
|
||||
},
|
||||
sync_component::Synced,
|
||||
tombstones::{
|
||||
TombstoneRegistry,
|
||||
garbage_collect_tombstones_system,
|
||||
handle_local_deletions_system,
|
||||
},
|
||||
vector_clock::NodeId,
|
||||
vector_clock::{
|
||||
NodeId,
|
||||
VectorClock,
|
||||
},
|
||||
};
|
||||
|
||||
/// Configuration for the networking plugin
|
||||
@@ -179,7 +192,17 @@ fn auto_insert_sync_components(
|
||||
entity_commands.insert(NetworkedTransform);
|
||||
}
|
||||
|
||||
debug!("Auto-inserted sync components for entity {:?} (UUID: {})", entity, entity_id);
|
||||
info!(
|
||||
"[auto_insert_sync] Entity {:?} → NetworkedEntity({}), Persisted, {} auto-added",
|
||||
entity,
|
||||
entity_id,
|
||||
if transforms.contains(entity) { "NetworkedTransform" } else { "no transform" }
|
||||
);
|
||||
}
|
||||
|
||||
let count = query.iter().count();
|
||||
if count > 0 {
|
||||
debug!("[auto_insert_sync] Processed {} newly synced entities this frame", count);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -215,7 +238,7 @@ fn auto_insert_networked_transform(
|
||||
fn trigger_sync_on_connect(
|
||||
mut has_synced: Local<bool>,
|
||||
bridge: Res<GossipBridge>,
|
||||
node_clock: Res<NodeVectorClock>,
|
||||
mut node_clock: ResMut<NodeVectorClock>,
|
||||
operation_log: Res<OperationLog>,
|
||||
) {
|
||||
if *has_synced {
|
||||
@@ -224,12 +247,39 @@ fn trigger_sync_on_connect(
|
||||
|
||||
let op_count = operation_log.total_operations();
|
||||
debug!(
|
||||
"Going online: triggering anti-entropy sync to broadcast {} offline operations",
|
||||
"Going online: broadcasting {} offline operations to peers",
|
||||
op_count
|
||||
);
|
||||
|
||||
// Send a SyncRequest to trigger anti-entropy
|
||||
// This will cause the message_dispatcher to respond with all operations from our log
|
||||
// Broadcast all our stored operations to peers
|
||||
// Use an empty vector clock to get ALL operations (not just newer ones)
|
||||
let all_operations = operation_log.get_all_operations_newer_than(&VectorClock::new());
|
||||
|
||||
for delta in all_operations {
|
||||
// Wrap in VersionedMessage
|
||||
let message = VersionedMessage::new(SyncMessage::EntityDelta {
|
||||
entity_id: delta.entity_id,
|
||||
node_id: delta.node_id,
|
||||
vector_clock: delta.vector_clock.clone(),
|
||||
operations: delta.operations.clone(),
|
||||
});
|
||||
|
||||
// Broadcast to peers
|
||||
if let Err(e) = bridge.send(message) {
|
||||
error!("Failed to broadcast offline EntityDelta: {}", e);
|
||||
} else {
|
||||
debug!(
|
||||
"Broadcast offline EntityDelta for entity {:?} with {} operations",
|
||||
delta.entity_id,
|
||||
delta.operations.len()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Also send a SyncRequest to get any operations we're missing from peers
|
||||
// Increment clock for sending SyncRequest (this is a local operation)
|
||||
node_clock.tick();
|
||||
|
||||
let request = crate::networking::operation_log::build_sync_request(
|
||||
node_clock.node_id,
|
||||
node_clock.clock.clone(),
|
||||
@@ -238,7 +288,7 @@ fn trigger_sync_on_connect(
|
||||
if let Err(e) = bridge.send(request) {
|
||||
error!("Failed to send SyncRequest on connect: {}", e);
|
||||
} else {
|
||||
debug!("Sent SyncRequest to trigger anti-entropy sync");
|
||||
debug!("Sent SyncRequest to get missing operations from peers");
|
||||
}
|
||||
|
||||
*has_synced = true;
|
||||
@@ -267,7 +317,10 @@ fn trigger_sync_on_connect(
|
||||
/// ## Update
|
||||
/// - Auto-detect Transform changes
|
||||
/// - Handle local entity deletions
|
||||
/// - Acquire locks when entities are selected
|
||||
/// - Release locks when entities are deselected
|
||||
/// - Send JoinRequest when networking starts (one-shot)
|
||||
/// - Transition session state (Joining → Active)
|
||||
///
|
||||
/// ## PostUpdate
|
||||
/// - Generate and broadcast EntityDelta for changed entities
|
||||
@@ -289,6 +342,7 @@ fn trigger_sync_on_connect(
|
||||
/// - `OperationLog` - Operation log for anti-entropy
|
||||
/// - `TombstoneRegistry` - Tombstone tracking for deletions
|
||||
/// - `EntityLockRegistry` - Entity lock registry with heartbeat tracking
|
||||
/// - `JoinRequestSent` - Tracks if JoinRequest has been sent (session sync)
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
@@ -338,6 +392,7 @@ impl Plugin for NetworkingPlugin {
|
||||
.insert_resource(OperationLog::new())
|
||||
.insert_resource(TombstoneRegistry::new())
|
||||
.insert_resource(EntityLockRegistry::new())
|
||||
.insert_resource(JoinRequestSent::default())
|
||||
.insert_resource(crate::networking::ComponentVectorClocks::new())
|
||||
.insert_resource(crate::networking::LocalSelection::new());
|
||||
|
||||
@@ -374,6 +429,10 @@ impl Plugin for NetworkingPlugin {
|
||||
acquire_locks_on_selection_system,
|
||||
// Release locks when entities are deselected
|
||||
release_locks_on_deselection_system,
|
||||
// Session sync: send JoinRequest when networking starts
|
||||
send_join_request_once_system,
|
||||
// Session sync: transition session state based on sync completion
|
||||
transition_session_state_system,
|
||||
),
|
||||
);
|
||||
|
||||
@@ -388,8 +447,10 @@ impl Plugin for NetworkingPlugin {
|
||||
app.add_systems(
|
||||
PostUpdate,
|
||||
(
|
||||
// Generate deltas for changed entities
|
||||
generate_delta_system,
|
||||
// Generate deltas for changed entities, then cleanup markers
|
||||
// CRITICAL: cleanup_skip_delta_markers_system must run immediately after
|
||||
// generate_delta_system to remove SkipNextDeltaGeneration markers
|
||||
(generate_delta_system, cleanup_skip_delta_markers_system).chain(),
|
||||
// Periodic anti-entropy sync
|
||||
periodic_sync_system,
|
||||
// Maintenance tasks
|
||||
|
||||
399
crates/libmarathon/src/networking/session_sync.rs
Normal file
399
crates/libmarathon/src/networking/session_sync.rs
Normal file
@@ -0,0 +1,399 @@
|
||||
//! Session synchronization systems
|
||||
//!
|
||||
//! This module handles automatic session lifecycle:
|
||||
//! - Sending JoinRequest when networking starts
|
||||
//! - Transitioning session state when receiving FullState
|
||||
//! - Persisting session state changes
|
||||
|
||||
use bevy::prelude::*;
|
||||
|
||||
use crate::networking::{
|
||||
CurrentSession,
|
||||
GossipBridge,
|
||||
JoinType,
|
||||
NodeVectorClock,
|
||||
SessionState,
|
||||
build_join_request,
|
||||
plugin::SessionSecret,
|
||||
};
|
||||
|
||||
/// System to send JoinRequest when networking comes online
|
||||
///
|
||||
/// This system detects when GossipBridge is added and sends a JoinRequest
|
||||
/// to discover peers and sync state. It only runs once when networking starts.
|
||||
///
|
||||
/// Add to your app as a Startup system AFTER GossipBridge is created:
|
||||
/// ```no_run
|
||||
/// use bevy::prelude::*;
|
||||
/// use libmarathon::networking::send_join_request_on_connect_system;
|
||||
///
|
||||
/// App::new()
|
||||
/// .add_systems(Update, send_join_request_on_connect_system);
|
||||
/// ```
|
||||
pub fn send_join_request_on_connect_system(
|
||||
current_session: ResMut<CurrentSession>,
|
||||
node_clock: Res<NodeVectorClock>,
|
||||
bridge: Option<Res<GossipBridge>>,
|
||||
session_secret: Option<Res<SessionSecret>>,
|
||||
) {
|
||||
// Only run when bridge exists and session is in Joining state
|
||||
let Some(bridge) = bridge else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Only send JoinRequest when in Joining state
|
||||
if current_session.session.state != SessionState::Joining {
|
||||
return;
|
||||
}
|
||||
|
||||
let node_id = node_clock.node_id;
|
||||
let session_id = current_session.session.id.clone();
|
||||
|
||||
// Determine join type based on whether we have a last known clock
|
||||
let join_type = if current_session.last_known_clock.node_count() > 0 {
|
||||
// Rejoin - we have a previous clock snapshot
|
||||
JoinType::Rejoin {
|
||||
last_active: current_session.session.last_active,
|
||||
entity_count: current_session.session.entity_count,
|
||||
}
|
||||
} else {
|
||||
// Fresh join - no previous state
|
||||
JoinType::Fresh
|
||||
};
|
||||
|
||||
// Get session secret if configured
|
||||
let secret = session_secret.as_ref().map(|s| bytes::Bytes::from(s.as_bytes().to_vec()));
|
||||
|
||||
// Build JoinRequest
|
||||
let last_known_clock = if current_session.last_known_clock.node_count() > 0 {
|
||||
Some(current_session.last_known_clock.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let request = build_join_request(
|
||||
node_id,
|
||||
session_id.clone(),
|
||||
secret,
|
||||
last_known_clock,
|
||||
join_type.clone(),
|
||||
);
|
||||
|
||||
// Send JoinRequest
|
||||
match bridge.send(request) {
|
||||
Ok(()) => {
|
||||
info!(
|
||||
"Sent JoinRequest for session {} (type: {:?})",
|
||||
session_id.to_code(),
|
||||
join_type
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to send JoinRequest: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// System to transition session to Active when sync completes
///
/// This system monitors for session state changes and handles transitions:
/// - Joining → Active: When we receive FullState or initial sync completes
///
/// Transition criteria while `Joining`:
/// - Immediately, once networked entities exist AND the vector clock contains
///   more than one node (evidence that a peer's FullState was applied), OR
/// - 3 seconds after the JoinRequest was sent — either sync finished with
///   entities, or no peers exist and we are the first node in the session.
///
/// This is an exclusive system to allow world queries.
///
/// Add to your app as an Update system:
/// ```no_run
/// use bevy::prelude::*;
/// use libmarathon::networking::transition_session_state_system;
///
/// App::new()
///     .add_systems(Update, transition_session_state_system);
/// ```
pub fn transition_session_state_system(world: &mut World) {
    // Networking not running → nothing to transition.
    if world.get_resource::<GossipBridge>().is_none() {
        return;
    }

    // Snapshot the values we need up front (copied/cloned) so no resource
    // borrows are held across the mutable world accesses below.
    let (session_state, session_id, join_request_sent, clock_node_count) = {
        let Some(current_session) = world.get_resource::<CurrentSession>() else {
            return;
        };

        let Some(node_clock) = world.get_resource::<NodeVectorClock>() else {
            return;
        };

        let Some(join_sent) = world.get_resource::<JoinRequestSent>() else {
            return;
        };

        (
            current_session.session.state,
            current_session.session.id.clone(),
            join_sent.sent,
            node_clock.clock.node_count(),
        )
    };

    // Function-local resource type for the join timer: stored in this world,
    // invisible outside this system. (It is a regular `Resource`, not a
    // non-send resource — the old comment was misleading.)
    #[derive(Default, Resource)]
    struct JoinTimer(Option<std::time::Instant>);

    if !world.contains_resource::<JoinTimer>() {
        world.insert_resource(JoinTimer::default());
    }

    match session_state {
        SessionState::Joining => {
            // Start the timeout timer on the first frame after the JoinRequest
            // actually went out (scoped block so the borrow ends before the
            // entity query below).
            {
                let mut timer = world.resource_mut::<JoinTimer>();
                if join_request_sent && timer.0.is_none() {
                    timer.0 = Some(std::time::Instant::now());
                    debug!("Started join timer - will transition to Active after timeout if no peers respond");
                }
            }

            // Count entities in world
            let entity_count = world
                .query::<&crate::networking::NetworkedEntity>()
                .iter(world)
                .count();

            // Transition to Active if:
            // 1. We have received entities (entity_count > 0) AND have multiple nodes in clock
            //    This ensures FullState was received and applied, OR
            // 2. We've waited 3 seconds and either:
            //    a) We have entities (sync completed), OR
            //    b) No entities exist yet (we're the first node in session)
            let should_transition = if entity_count > 0 && clock_node_count > 1 {
                // We've received and applied FullState with entities
                info!(
                    "Session {} transitioning to Active (received {} entities from {} peers)",
                    session_id.to_code(),
                    entity_count,
                    clock_node_count - 1
                );
                true
            } else {
                let timer = world.resource::<JoinTimer>();
                if let Some(start_time) = timer.0 {
                    // Check if 3 seconds have passed since JoinRequest
                    if start_time.elapsed().as_secs() >= 3 {
                        if entity_count > 0 {
                            info!(
                                "Session {} transitioning to Active (timeout reached, have {} entities)",
                                session_id.to_code(),
                                entity_count
                            );
                        } else {
                            info!(
                                "Session {} transitioning to Active (timeout - no peers or empty session, first node)",
                                session_id.to_code()
                            );
                        }
                        true
                    } else {
                        false
                    }
                } else {
                    // Timer not started yet (JoinRequest not sent) — keep waiting.
                    false
                }
            };

            if should_transition {
                let mut current_session = world.resource_mut::<CurrentSession>();
                current_session.transition_to(SessionState::Active);

                // Reset timer
                let mut timer = world.resource_mut::<JoinTimer>();
                timer.0 = None;
            }
        }
        SessionState::Active => {
            // Already active, reset timer
            if let Some(mut timer) = world.get_resource_mut::<JoinTimer>() {
                timer.0 = None;
            }
        }
        SessionState::Disconnected => {
            // If we reconnected (bridge exists), transition to Joining
            // This is handled by the networking startup logic
            if let Some(mut timer) = world.get_resource_mut::<JoinTimer>() {
                timer.0 = None;
            }
        }
        SessionState::Created | SessionState::Left => {
            // Should not be in these states when networking is active
            if let Some(mut timer) = world.get_resource_mut::<JoinTimer>() {
                timer.0 = None;
            }
        }
    }
}
|
||||
|
||||
/// Resource to track if we've sent the initial JoinRequest
|
||||
///
|
||||
/// This prevents sending multiple JoinRequests on subsequent frame updates
|
||||
#[derive(Resource)]
|
||||
pub struct JoinRequestSent {
|
||||
pub sent: bool,
|
||||
/// Timer to wait for peers before sending JoinRequest
|
||||
/// If no peers connect after 1 second, send anyway (we're first node)
|
||||
pub wait_started: Option<std::time::Instant>,
|
||||
}
|
||||
|
||||
impl Default for JoinRequestSent {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
sent: false,
|
||||
wait_started: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// One-shot system to send JoinRequest only once when networking starts
|
||||
///
|
||||
/// CRITICAL: Waits for at least one peer to connect via pkarr+DHT before sending
|
||||
/// JoinRequest. This prevents broadcasting to an empty network.
|
||||
///
|
||||
/// Timing:
|
||||
/// - If peers connect: Send JoinRequest immediately (they'll receive it)
|
||||
/// - If no peers after 1 second: Send anyway (we're probably first node in session)
|
||||
pub fn send_join_request_once_system(
|
||||
mut join_sent: ResMut<JoinRequestSent>,
|
||||
current_session: ResMut<CurrentSession>,
|
||||
node_clock: Res<NodeVectorClock>,
|
||||
bridge: Option<Res<GossipBridge>>,
|
||||
session_secret: Option<Res<SessionSecret>>,
|
||||
) {
|
||||
// Skip if already sent
|
||||
if join_sent.sent {
|
||||
return;
|
||||
}
|
||||
|
||||
// Only run when bridge exists and session is in Joining state
|
||||
let Some(bridge) = bridge else {
|
||||
return;
|
||||
};
|
||||
|
||||
if current_session.session.state != SessionState::Joining {
|
||||
return;
|
||||
}
|
||||
|
||||
// Start wait timer when conditions are met
|
||||
if join_sent.wait_started.is_none() {
|
||||
join_sent.wait_started = Some(std::time::Instant::now());
|
||||
debug!("Started waiting for peers before sending JoinRequest (max 1 second)");
|
||||
}
|
||||
|
||||
// Check if we have any peers connected (node_count > 1 means we + at least 1 peer)
|
||||
let peer_count = node_clock.clock.node_count().saturating_sub(1);
|
||||
let wait_elapsed = join_sent.wait_started.unwrap().elapsed();
|
||||
|
||||
// Send JoinRequest if:
|
||||
// 1. At least one peer has connected (they'll receive our JoinRequest), OR
|
||||
// 2. We've waited 1 second with no peers (we're probably the first node)
|
||||
let should_send = if peer_count > 0 {
|
||||
debug!(
|
||||
"Sending JoinRequest now - {} peer(s) connected (waited {:?})",
|
||||
peer_count, wait_elapsed
|
||||
);
|
||||
true
|
||||
} else if wait_elapsed.as_millis() >= 1000 {
|
||||
debug!(
|
||||
"Sending JoinRequest after timeout - no peers connected, assuming first node (waited {:?})",
|
||||
wait_elapsed
|
||||
);
|
||||
true
|
||||
} else {
|
||||
// Still waiting for peers
|
||||
false
|
||||
};
|
||||
|
||||
if !should_send {
|
||||
return;
|
||||
}
|
||||
|
||||
let node_id = node_clock.node_id;
|
||||
let session_id = current_session.session.id.clone();
|
||||
|
||||
// Determine join type
|
||||
let join_type = if current_session.last_known_clock.node_count() > 0 {
|
||||
JoinType::Rejoin {
|
||||
last_active: current_session.session.last_active,
|
||||
entity_count: current_session.session.entity_count,
|
||||
}
|
||||
} else {
|
||||
JoinType::Fresh
|
||||
};
|
||||
|
||||
// Get session secret if configured
|
||||
let secret = session_secret.as_ref().map(|s| bytes::Bytes::from(s.as_bytes().to_vec()));
|
||||
|
||||
// Build JoinRequest
|
||||
let last_known_clock = if current_session.last_known_clock.node_count() > 0 {
|
||||
Some(current_session.last_known_clock.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let request = build_join_request(
|
||||
node_id,
|
||||
session_id.clone(),
|
||||
secret,
|
||||
last_known_clock,
|
||||
join_type.clone(),
|
||||
);
|
||||
|
||||
// Send JoinRequest
|
||||
match bridge.send(request) {
|
||||
Ok(()) => {
|
||||
info!(
|
||||
"Sent JoinRequest for session {} (type: {:?})",
|
||||
session_id.to_code(),
|
||||
join_type
|
||||
);
|
||||
join_sent.sent = true;
|
||||
|
||||
// Transition to Active immediately if we're the first node
|
||||
// (Otherwise we'll wait for FullState)
|
||||
// Actually, let's always wait a bit for potential peers
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to send JoinRequest: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    use crate::networking::{Session, SessionId, VectorClock};

    /// A fresh `JoinRequestSent` starts unsent, and the flag sticks once set.
    #[test]
    fn test_join_request_sent_tracking() {
        let mut tracker = JoinRequestSent::default();
        assert!(!tracker.sent);

        tracker.sent = true;
        assert!(tracker.sent);
    }

    /// A session walks Created → Joining → Active via `transition_to`.
    #[test]
    fn test_session_state_transitions() {
        let mut current =
            CurrentSession::new(Session::new(SessionId::new()), VectorClock::new());
        assert_eq!(current.session.state, SessionState::Created);

        for next in [SessionState::Joining, SessionState::Active] {
            current.transition_to(next);
            assert_eq!(current.session.state, next);
        }
    }
}
|
||||
@@ -219,6 +219,7 @@ pub fn handle_local_deletions_system(
|
||||
mut tombstone_registry: ResMut<TombstoneRegistry>,
|
||||
mut operation_log: Option<ResMut<crate::networking::OperationLog>>,
|
||||
bridge: Option<Res<GossipBridge>>,
|
||||
mut write_buffer: Option<ResMut<crate::persistence::WriteBufferResource>>,
|
||||
) {
|
||||
for (entity, networked) in query.iter() {
|
||||
// Increment clock for deletion
|
||||
@@ -231,13 +232,34 @@ pub fn handle_local_deletions_system(
|
||||
)
|
||||
.delete();
|
||||
|
||||
// Record tombstone
|
||||
// Record tombstone in memory
|
||||
tombstone_registry.record_deletion(
|
||||
networked.network_id,
|
||||
node_clock.node_id,
|
||||
node_clock.clock.clone(),
|
||||
);
|
||||
|
||||
// Persist tombstone to database
|
||||
if let Some(ref mut buffer) = write_buffer {
|
||||
// Serialize the vector clock using rkyv
|
||||
match rkyv::to_bytes::<rkyv::rancor::Failure>(&node_clock.clock).map(|b| b.to_vec()) {
|
||||
Ok(clock_bytes) => {
|
||||
if let Err(e) = buffer.add(crate::persistence::PersistenceOp::RecordTombstone {
|
||||
entity_id: networked.network_id,
|
||||
deleting_node: node_clock.node_id,
|
||||
deletion_clock: bytes::Bytes::from(clock_bytes),
|
||||
}) {
|
||||
error!("Failed to persist tombstone for entity {:?}: {}", networked.network_id, e);
|
||||
} else {
|
||||
debug!("Persisted tombstone for entity {:?} to database", networked.network_id);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Failed to serialize vector clock for tombstone persistence: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create EntityDelta with Delete operation
|
||||
let delta = crate::networking::EntityDelta::new(
|
||||
networked.network_id,
|
||||
|
||||
@@ -253,6 +253,24 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
|
||||
)?;
|
||||
count += 1;
|
||||
},
|
||||
|
||||
| PersistenceOp::RecordTombstone {
|
||||
entity_id,
|
||||
deleting_node,
|
||||
deletion_clock,
|
||||
} => {
|
||||
tx.execute(
|
||||
"INSERT OR REPLACE INTO tombstones (entity_id, deleting_node, deletion_clock, created_at)
|
||||
VALUES (?1, ?2, ?3, ?4)",
|
||||
rusqlite::params![
|
||||
entity_id.as_bytes(),
|
||||
&deleting_node.to_string(),
|
||||
deletion_clock.as_ref(),
|
||||
current_timestamp(),
|
||||
],
|
||||
)?;
|
||||
count += 1;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -977,6 +995,117 @@ pub fn rehydrate_all_entities(world: &mut bevy::prelude::World) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Load all tombstones from the database into the TombstoneRegistry
|
||||
///
|
||||
/// This function is called during startup to restore deletion tombstones
|
||||
/// from the database, preventing resurrection of deleted entities after
|
||||
/// application restart.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `world` - The Bevy world containing the TombstoneRegistry resource
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// Returns an error if:
|
||||
/// - Database connection fails
|
||||
/// - Tombstone loading fails
|
||||
/// - Vector clock deserialization fails
|
||||
pub fn load_tombstones(world: &mut bevy::prelude::World) -> Result<()> {
|
||||
use bevy::prelude::*;
|
||||
|
||||
// Get database connection and load tombstones
|
||||
let tombstone_rows = {
|
||||
let db_res = world.resource::<crate::persistence::PersistenceDb>();
|
||||
let conn = db_res
|
||||
.conn
|
||||
.lock()
|
||||
.map_err(|e| PersistenceError::Other(format!("Failed to lock database: {}", e)))?;
|
||||
|
||||
// Load all tombstones from database
|
||||
let mut stmt = conn.prepare(
|
||||
"SELECT entity_id, deleting_node, deletion_clock, created_at
|
||||
FROM tombstones
|
||||
ORDER BY created_at ASC",
|
||||
)?;
|
||||
|
||||
let rows = stmt.query_map([], |row| {
|
||||
let entity_id_bytes: std::borrow::Cow<'_, [u8]> = row.get(0)?;
|
||||
let mut entity_id_array = [0u8; 16];
|
||||
entity_id_array.copy_from_slice(&entity_id_bytes);
|
||||
let entity_id = uuid::Uuid::from_bytes(entity_id_array);
|
||||
|
||||
let deleting_node_str: String = row.get(1)?;
|
||||
let deletion_clock_bytes: std::borrow::Cow<'_, [u8]> = row.get(2)?;
|
||||
let created_at_ts: i64 = row.get(3)?;
|
||||
|
||||
Ok((
|
||||
entity_id,
|
||||
deleting_node_str,
|
||||
deletion_clock_bytes.to_vec(),
|
||||
created_at_ts,
|
||||
))
|
||||
})?;
|
||||
|
||||
rows.collect::<std::result::Result<Vec<_>, _>>()?
|
||||
};
|
||||
|
||||
info!("Loaded {} tombstones from database", tombstone_rows.len());
|
||||
|
||||
if tombstone_rows.is_empty() {
|
||||
info!("No tombstones to restore");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Restore tombstones into TombstoneRegistry
|
||||
let mut loaded_count = 0;
|
||||
let mut failed_count = 0;
|
||||
|
||||
{
|
||||
let mut tombstone_registry = world.resource_mut::<crate::networking::TombstoneRegistry>();
|
||||
|
||||
for (entity_id, deleting_node_str, deletion_clock_bytes, _created_at_ts) in tombstone_rows {
|
||||
// Parse node ID
|
||||
let deleting_node = match uuid::Uuid::parse_str(&deleting_node_str) {
|
||||
Ok(id) => id,
|
||||
Err(e) => {
|
||||
error!("Failed to parse deleting_node UUID for entity {:?}: {}", entity_id, e);
|
||||
failed_count += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Deserialize vector clock
|
||||
let deletion_clock = match rkyv::from_bytes::<crate::networking::VectorClock, rkyv::rancor::Failure>(&deletion_clock_bytes) {
|
||||
Ok(clock) => clock,
|
||||
Err(e) => {
|
||||
error!("Failed to deserialize vector clock for tombstone {:?}: {:?}", entity_id, e);
|
||||
failed_count += 1;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Record the tombstone in the registry
|
||||
tombstone_registry.record_deletion(entity_id, deleting_node, deletion_clock);
|
||||
loaded_count += 1;
|
||||
}
|
||||
}
|
||||
|
||||
info!(
|
||||
"Tombstone restoration complete: {} succeeded, {} failed",
|
||||
loaded_count, failed_count
|
||||
);
|
||||
|
||||
if failed_count > 0 {
|
||||
warn!(
|
||||
"{} tombstones failed to restore - check logs for details",
|
||||
failed_count
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -29,6 +29,11 @@ pub const MIGRATIONS: &[Migration] = &[
|
||||
name: "sessions",
|
||||
up: include_str!("migrations/004_sessions.sql"),
|
||||
},
|
||||
Migration {
|
||||
version: 5,
|
||||
name: "tombstones",
|
||||
up: include_str!("migrations/005_tombstones.sql"),
|
||||
},
|
||||
];
|
||||
|
||||
/// Initialize the migrations table
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
-- Migration 005: Add tombstones table
-- Stores deletion tombstones to prevent resurrection of deleted entities

CREATE TABLE IF NOT EXISTS tombstones (
    -- 16-byte UUID of the deleted entity; PRIMARY KEY → one tombstone per entity
    entity_id BLOB PRIMARY KEY,
    -- UUID (string form) of the node that performed the deletion
    deleting_node TEXT NOT NULL,
    -- Serialized vector clock captured at deletion time
    deletion_clock BLOB NOT NULL,
    -- Insertion timestamp; loading replays tombstones ordered by this column
    created_at INTEGER NOT NULL
);

-- Index for querying tombstones by session (for future session scoping)
CREATE INDEX IF NOT EXISTS idx_tombstones_created
    ON tombstones(created_at DESC);
|
||||
@@ -92,10 +92,11 @@ impl Plugin for PersistencePlugin {
|
||||
.init_resource::<ComponentTypeRegistryResource>();
|
||||
|
||||
// Add startup systems
|
||||
// First initialize the database, then rehydrate entities
|
||||
// First initialize the database, then rehydrate entities and tombstones
|
||||
app.add_systems(Startup, (
|
||||
persistence_startup_system,
|
||||
rehydrate_entities_system,
|
||||
load_tombstones_system,
|
||||
).chain());
|
||||
|
||||
// Add systems in the appropriate schedule
|
||||
@@ -168,7 +169,43 @@ fn persistence_startup_system(db: Res<PersistenceDb>, mut metrics: ResMut<Persis
|
||||
/// This system runs after `persistence_startup_system` and loads all entities
|
||||
/// from SQLite, deserializing and spawning them into the Bevy world with all
|
||||
/// their components.
|
||||
///
|
||||
/// **Important**: Only rehydrates entities when rejoining an existing session.
|
||||
/// New sessions start with 0 entities to avoid loading entities from previous
|
||||
/// sessions.
|
||||
fn rehydrate_entities_system(world: &mut World) {
|
||||
// Check if we're rejoining an existing session
|
||||
let should_rehydrate = {
|
||||
let current_session = world.get_resource::<crate::networking::CurrentSession>();
|
||||
match current_session {
|
||||
Some(session) => {
|
||||
// Only rehydrate if we have a last_known_clock (indicates we're rejoining)
|
||||
let is_rejoin = session.last_known_clock.node_count() > 0;
|
||||
if is_rejoin {
|
||||
info!(
|
||||
"Rejoining session {} - will rehydrate persisted entities",
|
||||
session.session.id.to_code()
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
"New session {} - starting with 0 entities",
|
||||
session.session.id.to_code()
|
||||
);
|
||||
}
|
||||
is_rejoin
|
||||
}
|
||||
None => {
|
||||
warn!("No CurrentSession found - skipping entity rehydration");
|
||||
false
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if !should_rehydrate {
|
||||
info!("Skipping entity rehydration for new session");
|
||||
return;
|
||||
}
|
||||
|
||||
if let Err(e) = crate::persistence::database::rehydrate_all_entities(world) {
|
||||
error!("Failed to rehydrate entities from database: {}", e);
|
||||
} else {
|
||||
@@ -176,6 +213,19 @@ fn rehydrate_entities_system(world: &mut World) {
|
||||
}
|
||||
}
|
||||
|
||||
/// Exclusive startup system to load tombstones from database
|
||||
///
|
||||
/// This system runs after `rehydrate_entities_system` and loads all tombstones
|
||||
/// from SQLite, deserializing them into the TombstoneRegistry to prevent
|
||||
/// resurrection of deleted entities.
|
||||
fn load_tombstones_system(world: &mut World) {
|
||||
if let Err(e) = crate::persistence::database::load_tombstones(world) {
|
||||
error!("Failed to load tombstones from database: {}", e);
|
||||
} else {
|
||||
info!("Successfully loaded tombstones from database");
|
||||
}
|
||||
}
|
||||
|
||||
/// System to collect dirty entities using Bevy's change detection
|
||||
///
|
||||
/// This system tracks changes to the `Persisted` component. When `Persisted` is
|
||||
|
||||
@@ -126,6 +126,13 @@ pub enum PersistenceOp {
|
||||
entity_id: EntityId,
|
||||
component_type: String,
|
||||
},
|
||||
|
||||
/// Record a tombstone for a deleted entity
|
||||
RecordTombstone {
|
||||
entity_id: EntityId,
|
||||
deleting_node: NodeId,
|
||||
deletion_clock: bytes::Bytes, // Serialized VectorClock
|
||||
},
|
||||
}
|
||||
|
||||
impl PersistenceOp {
|
||||
|
||||
652
crates/libmarathon/tests/multi_node_sync_test.rs
Normal file
652
crates/libmarathon/tests/multi_node_sync_test.rs
Normal file
@@ -0,0 +1,652 @@
|
||||
//! Multi-node sync integration tests with real iroh-gossip networking
|
||||
//!
|
||||
//! These tests verify actual message passing and synchronization between nodes
|
||||
//! using real iroh-gossip with localhost connections.
|
||||
|
||||
mod test_utils;
|
||||
|
||||
use anyhow::Result;
|
||||
use test_utils::{TestContext, create_test_app, wait_for_sync, count_entities_with_id, setup_gossip_pair, setup_gossip_trio};
|
||||
use bevy::prelude::*;
|
||||
use iroh::{Endpoint, protocol::Router};
|
||||
use libmarathon::networking::{
|
||||
CurrentSession,
|
||||
NetworkEntityMap,
|
||||
NetworkedEntity,
|
||||
SessionState,
|
||||
Synced,
|
||||
};
|
||||
use libmarathon::persistence::Persisted;
|
||||
use std::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use uuid::Uuid;
|
||||
|
||||
// ============================================================================
|
||||
// Integration Tests
|
||||
// ============================================================================
|
||||
// Note: All test utilities (gossip setup, test app creation, sync helpers)
|
||||
// are now in the shared test_utils module to avoid duplication
|
||||
|
||||
/// Test: Two nodes can synchronize a cube spawn using real iroh-gossip
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_two_nodes_sync_cube_spawn() -> Result<()> {
|
||||
println!("=== Starting test_two_nodes_sync_cube_spawn ===");
|
||||
|
||||
let ctx1 = TestContext::new();
|
||||
let ctx2 = TestContext::new();
|
||||
|
||||
let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?;
|
||||
|
||||
let node1_id = bridge1.node_id();
|
||||
let node2_id = bridge2.node_id();
|
||||
|
||||
let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1);
|
||||
let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2);
|
||||
|
||||
println!("Node1 ID: {}", node1_id);
|
||||
println!("Node2 ID: {}", node2_id);
|
||||
|
||||
// Initialize both apps
|
||||
app1.update();
|
||||
app2.update();
|
||||
|
||||
// Connect both to the same session
|
||||
use libmarathon::networking::SessionId;
|
||||
let session_id = SessionId::new();
|
||||
|
||||
{
|
||||
let mut session1 = app1.world_mut().resource_mut::<CurrentSession>();
|
||||
session1.session.id = session_id.clone();
|
||||
session1.transition_to(SessionState::Active);
|
||||
}
|
||||
{
|
||||
let mut session2 = app2.world_mut().resource_mut::<CurrentSession>();
|
||||
session2.session.id = session_id.clone();
|
||||
session2.transition_to(SessionState::Active);
|
||||
}
|
||||
|
||||
// Node 1: Spawn a cube
|
||||
let cube_id = Uuid::new_v4();
|
||||
{
|
||||
app1.world_mut().spawn((
|
||||
NetworkedEntity::with_id(cube_id, node1_id),
|
||||
Persisted::with_id(cube_id),
|
||||
Synced,
|
||||
Transform::from_xyz(1.0, 2.0, 3.0),
|
||||
));
|
||||
}
|
||||
|
||||
println!("Node1: Spawned cube {}", cube_id);
|
||||
|
||||
// Wait for sync
|
||||
wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_w1, w2| {
|
||||
let entity_map = w2.resource::<NetworkEntityMap>();
|
||||
entity_map.get_entity(cube_id).is_some()
|
||||
})
|
||||
.await?;
|
||||
|
||||
println!("✓ Node2 successfully received cube from Node1");
|
||||
|
||||
// Verify vector clocks converged
|
||||
{
|
||||
use libmarathon::networking::NodeVectorClock;
|
||||
|
||||
let clock1 = app1.world().resource::<NodeVectorClock>();
|
||||
let clock2 = app2.world().resource::<NodeVectorClock>();
|
||||
|
||||
println!("Node1 clock: {:?}", clock1.clock);
|
||||
println!("Node2 clock: {:?}", clock2.clock);
|
||||
|
||||
// Both clocks should know about both nodes
|
||||
assert!(
|
||||
clock2.clock.node_count() >= 2,
|
||||
"Node2 should know about both nodes in its clock"
|
||||
);
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
router1.shutdown().await?;
|
||||
router2.shutdown().await?;
|
||||
ep1.close().await;
|
||||
ep2.close().await;
|
||||
|
||||
println!("✓ Two nodes sync cube spawn test passed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test: Three nodes maintain consistent state using real iroh-gossip
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_three_nodes_consistency() -> Result<()> {
|
||||
println!("=== Starting test_three_nodes_consistency ===");
|
||||
|
||||
let ctx1 = TestContext::new();
|
||||
let ctx2 = TestContext::new();
|
||||
let ctx3 = TestContext::new();
|
||||
|
||||
let (ep1, ep2, ep3, router1, router2, router3, bridge1, bridge2, bridge3) =
|
||||
setup_gossip_trio().await?;
|
||||
|
||||
let node1_id = bridge1.node_id();
|
||||
let node2_id = bridge2.node_id();
|
||||
let node3_id = bridge3.node_id();
|
||||
|
||||
let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1);
|
||||
let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2);
|
||||
let mut app3 = create_test_app(node3_id, ctx3.db_path(), bridge3);
|
||||
|
||||
println!("Node1 ID: {}", node1_id);
|
||||
println!("Node2 ID: {}", node2_id);
|
||||
println!("Node3 ID: {}", node3_id);
|
||||
|
||||
// Initialize all apps
|
||||
app1.update();
|
||||
app2.update();
|
||||
app3.update();
|
||||
|
||||
// Connect all to the same session
|
||||
use libmarathon::networking::SessionId;
|
||||
let session_id = SessionId::new();
|
||||
|
||||
for app in [&mut app1, &mut app2, &mut app3] {
|
||||
let mut session = app.world_mut().resource_mut::<CurrentSession>();
|
||||
session.session.id = session_id.clone();
|
||||
session.transition_to(SessionState::Active);
|
||||
}
|
||||
|
||||
// Each node spawns a cube
|
||||
let cube1_id = Uuid::new_v4();
|
||||
let cube2_id = Uuid::new_v4();
|
||||
let cube3_id = Uuid::new_v4();
|
||||
|
||||
app1.world_mut().spawn((
|
||||
NetworkedEntity::with_id(cube1_id, node1_id),
|
||||
Persisted::with_id(cube1_id),
|
||||
Synced,
|
||||
Transform::from_xyz(1.0, 0.0, 0.0),
|
||||
));
|
||||
|
||||
app2.world_mut().spawn((
|
||||
NetworkedEntity::with_id(cube2_id, node2_id),
|
||||
Persisted::with_id(cube2_id),
|
||||
Synced,
|
||||
Transform::from_xyz(0.0, 2.0, 0.0),
|
||||
));
|
||||
|
||||
app3.world_mut().spawn((
|
||||
NetworkedEntity::with_id(cube3_id, node3_id),
|
||||
Persisted::with_id(cube3_id),
|
||||
Synced,
|
||||
Transform::from_xyz(0.0, 0.0, 3.0),
|
||||
));
|
||||
|
||||
println!("All nodes spawned their cubes");
|
||||
|
||||
// Wait for convergence - all nodes should have all 3 cubes
|
||||
let start = Instant::now();
|
||||
let mut converged = false;
|
||||
|
||||
while start.elapsed() < Duration::from_secs(10) {
|
||||
app1.update();
|
||||
app2.update();
|
||||
app3.update();
|
||||
|
||||
let map1 = app1.world().resource::<NetworkEntityMap>();
|
||||
let map2 = app2.world().resource::<NetworkEntityMap>();
|
||||
let map3 = app3.world().resource::<NetworkEntityMap>();
|
||||
|
||||
let count1 = [cube1_id, cube2_id, cube3_id].iter().filter(|id| map1.get_entity(**id).is_some()).count();
|
||||
let count2 = [cube1_id, cube2_id, cube3_id].iter().filter(|id| map2.get_entity(**id).is_some()).count();
|
||||
let count3 = [cube1_id, cube2_id, cube3_id].iter().filter(|id| map3.get_entity(**id).is_some()).count();
|
||||
|
||||
if count1 == 3 && count2 == 3 && count3 == 3 {
|
||||
println!("✓ All nodes converged to 3 cubes");
|
||||
converged = true;
|
||||
break;
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(16)).await;
|
||||
}
|
||||
|
||||
assert!(converged, "Nodes did not converge to consistent state");
|
||||
|
||||
// Cleanup
|
||||
router1.shutdown().await?;
|
||||
router2.shutdown().await?;
|
||||
router3.shutdown().await?;
|
||||
ep1.close().await;
|
||||
ep2.close().await;
|
||||
ep3.close().await;
|
||||
|
||||
println!("✓ Three nodes consistency test passed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test: FullState sync does not create duplicate entities
|
||||
/// This tests the fix for the bug where apply_full_state() would spawn
|
||||
/// duplicate entities instead of reusing existing ones.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_fullstate_no_duplicate_entities() -> Result<()> {
|
||||
println!("=== Starting test_fullstate_no_duplicate_entities ===");
|
||||
|
||||
let ctx1 = TestContext::new();
|
||||
let ctx2 = TestContext::new();
|
||||
|
||||
let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?;
|
||||
|
||||
let node1_id = bridge1.node_id();
|
||||
let node2_id = bridge2.node_id();
|
||||
|
||||
let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1);
|
||||
let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2);
|
||||
|
||||
println!("Node1 ID: {}", node1_id);
|
||||
println!("Node2 ID: {}", node2_id);
|
||||
|
||||
// Initialize both apps
|
||||
app1.update();
|
||||
app2.update();
|
||||
|
||||
// Connect both to the same session
|
||||
use libmarathon::networking::SessionId;
|
||||
let session_id = SessionId::new();
|
||||
|
||||
{
|
||||
let mut session1 = app1.world_mut().resource_mut::<CurrentSession>();
|
||||
session1.session.id = session_id.clone();
|
||||
session1.transition_to(SessionState::Active);
|
||||
}
|
||||
{
|
||||
let mut session2 = app2.world_mut().resource_mut::<CurrentSession>();
|
||||
session2.session.id = session_id.clone();
|
||||
session2.transition_to(SessionState::Active);
|
||||
}
|
||||
|
||||
// Node 1: Spawn multiple cubes
|
||||
let cube1_id = Uuid::new_v4();
|
||||
let cube2_id = Uuid::new_v4();
|
||||
let cube3_id = Uuid::new_v4();
|
||||
|
||||
{
|
||||
app1.world_mut().spawn((
|
||||
NetworkedEntity::with_id(cube1_id, node1_id),
|
||||
Persisted::with_id(cube1_id),
|
||||
Synced,
|
||||
Transform::from_xyz(1.0, 0.0, 0.0),
|
||||
));
|
||||
|
||||
app1.world_mut().spawn((
|
||||
NetworkedEntity::with_id(cube2_id, node1_id),
|
||||
Persisted::with_id(cube2_id),
|
||||
Synced,
|
||||
Transform::from_xyz(2.0, 0.0, 0.0),
|
||||
));
|
||||
|
||||
app1.world_mut().spawn((
|
||||
NetworkedEntity::with_id(cube3_id, node1_id),
|
||||
Persisted::with_id(cube3_id),
|
||||
Synced,
|
||||
Transform::from_xyz(3.0, 0.0, 0.0),
|
||||
));
|
||||
}
|
||||
|
||||
println!("Node1: Spawned 3 cubes: {}, {}, {}", cube1_id, cube2_id, cube3_id);
|
||||
|
||||
// Wait for sync - Node2 should receive all 3 cubes via FullState
|
||||
wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_w1, w2| {
|
||||
let entity_map = w2.resource::<NetworkEntityMap>();
|
||||
entity_map.get_entity(cube1_id).is_some()
|
||||
&& entity_map.get_entity(cube2_id).is_some()
|
||||
&& entity_map.get_entity(cube3_id).is_some()
|
||||
})
|
||||
.await?;
|
||||
|
||||
println!("✓ Node2 received all cubes from Node1");
|
||||
|
||||
// CRITICAL CHECK: Verify no duplicate entities were created
|
||||
// Each unique network_id should appear exactly once
|
||||
{
|
||||
let count1 = count_entities_with_id(app2.world_mut(), cube1_id);
|
||||
let count2 = count_entities_with_id(app2.world_mut(), cube2_id);
|
||||
let count3 = count_entities_with_id(app2.world_mut(), cube3_id);
|
||||
|
||||
println!("Entity counts in Node2:");
|
||||
println!(" Cube1 ({}): {}", cube1_id, count1);
|
||||
println!(" Cube2 ({}): {}", cube2_id, count2);
|
||||
println!(" Cube3 ({}): {}", cube3_id, count3);
|
||||
|
||||
assert_eq!(
|
||||
count1, 1,
|
||||
"Cube1 should appear exactly once, found {} instances",
|
||||
count1
|
||||
);
|
||||
assert_eq!(
|
||||
count2, 1,
|
||||
"Cube2 should appear exactly once, found {} instances",
|
||||
count2
|
||||
);
|
||||
assert_eq!(
|
||||
count3, 1,
|
||||
"Cube3 should appear exactly once, found {} instances",
|
||||
count3
|
||||
);
|
||||
}
|
||||
|
||||
// Also verify total entity count matches expected
|
||||
{
|
||||
use libmarathon::networking::NetworkedEntity;
|
||||
let mut query = app2.world_mut().query::<&NetworkedEntity>();
|
||||
let total_count = query.iter(app2.world()).count();
|
||||
|
||||
println!("Total NetworkedEntity count in Node2: {}", total_count);
|
||||
|
||||
assert_eq!(
|
||||
total_count, 3,
|
||||
"Node2 should have exactly 3 networked entities, found {}",
|
||||
total_count
|
||||
);
|
||||
}
|
||||
|
||||
println!("✓ No duplicate entities created - FullState correctly reused existing entities");
|
||||
|
||||
// Continue syncing for a bit to ensure no duplicates appear over time
|
||||
let start = Instant::now();
|
||||
while start.elapsed() < Duration::from_secs(2) {
|
||||
app1.update();
|
||||
app2.update();
|
||||
tokio::time::sleep(Duration::from_millis(16)).await;
|
||||
}
|
||||
|
||||
// Final verification - counts should still be correct
|
||||
{
|
||||
let count1 = count_entities_with_id(app2.world_mut(), cube1_id);
|
||||
let count2 = count_entities_with_id(app2.world_mut(), cube2_id);
|
||||
let count3 = count_entities_with_id(app2.world_mut(), cube3_id);
|
||||
|
||||
assert_eq!(count1, 1, "Cube1 count changed during continued sync");
|
||||
assert_eq!(count2, 1, "Cube2 count changed during continued sync");
|
||||
assert_eq!(count3, 1, "Cube3 count changed during continued sync");
|
||||
}
|
||||
|
||||
println!("✓ Entity counts remained stable during continued sync");
|
||||
|
||||
// Cleanup
|
||||
router1.shutdown().await?;
|
||||
router2.shutdown().await?;
|
||||
ep1.close().await;
|
||||
ep2.close().await;
|
||||
|
||||
println!("✓ FullState no duplicate entities test passed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test: Remote delta does not cause feedback loop
|
||||
/// Verifies that applying a remote operation doesn't trigger re-broadcasting
|
||||
/// the same change back to the network (runaway vector clock bug)
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_remote_delta_no_feedback_loop() -> Result<()> {
|
||||
println!("=== Starting test_remote_delta_no_feedback_loop ===");
|
||||
|
||||
let ctx1 = TestContext::new();
|
||||
let ctx2 = TestContext::new();
|
||||
|
||||
let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?;
|
||||
|
||||
let node1_id = bridge1.node_id();
|
||||
let node2_id = bridge2.node_id();
|
||||
|
||||
let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1);
|
||||
let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2);
|
||||
|
||||
println!("Node1 ID: {}", node1_id);
|
||||
println!("Node2 ID: {}", node2_id);
|
||||
|
||||
// Initialize both apps
|
||||
app1.update();
|
||||
app2.update();
|
||||
|
||||
// Connect both to the same session
|
||||
use libmarathon::networking::SessionId;
|
||||
let session_id = SessionId::new();
|
||||
|
||||
{
|
||||
let mut session1 = app1.world_mut().resource_mut::<CurrentSession>();
|
||||
session1.session.id = session_id.clone();
|
||||
session1.transition_to(SessionState::Active);
|
||||
}
|
||||
{
|
||||
let mut session2 = app2.world_mut().resource_mut::<CurrentSession>();
|
||||
session2.session.id = session_id.clone();
|
||||
session2.transition_to(SessionState::Active);
|
||||
}
|
||||
|
||||
// Node 1: Spawn a cube
|
||||
let cube_id = Uuid::new_v4();
|
||||
{
|
||||
app1.world_mut().spawn((
|
||||
NetworkedEntity::with_id(cube_id, node1_id),
|
||||
Persisted::with_id(cube_id),
|
||||
Synced,
|
||||
Transform::from_xyz(1.0, 2.0, 3.0),
|
||||
));
|
||||
}
|
||||
|
||||
println!("Node1: Spawned cube {}", cube_id);
|
||||
|
||||
// Wait for initial sync
|
||||
wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_w1, w2| {
|
||||
let entity_map = w2.resource::<NetworkEntityMap>();
|
||||
entity_map.get_entity(cube_id).is_some()
|
||||
})
|
||||
.await?;
|
||||
|
||||
println!("✓ Node2 received cube from Node1");
|
||||
|
||||
// Get initial clock sequences
|
||||
let node1_initial_seq = {
|
||||
let clock1 = app1.world().resource::<libmarathon::networking::NodeVectorClock>();
|
||||
clock1.sequence()
|
||||
};
|
||||
let node2_initial_seq = {
|
||||
let clock2 = app2.world().resource::<libmarathon::networking::NodeVectorClock>();
|
||||
clock2.sequence()
|
||||
};
|
||||
|
||||
println!("Initial clocks: Node1={}, Node2={}", node1_initial_seq, node2_initial_seq);
|
||||
|
||||
// Run both apps for a few seconds to see if clocks stabilize
|
||||
// If there's a feedback loop, clocks will keep incrementing rapidly
|
||||
let start = Instant::now();
|
||||
while start.elapsed() < Duration::from_secs(2) {
|
||||
app1.update();
|
||||
app2.update();
|
||||
tokio::time::sleep(Duration::from_millis(16)).await;
|
||||
}
|
||||
|
||||
// Check final clock sequences
|
||||
let node1_final_seq = {
|
||||
let clock1 = app1.world().resource::<libmarathon::networking::NodeVectorClock>();
|
||||
clock1.sequence()
|
||||
};
|
||||
let node2_final_seq = {
|
||||
let clock2 = app2.world().resource::<libmarathon::networking::NodeVectorClock>();
|
||||
clock2.sequence()
|
||||
};
|
||||
|
||||
println!("Final clocks: Node1={}, Node2={}", node1_final_seq, node2_final_seq);
|
||||
|
||||
// Calculate clock growth
|
||||
let node1_growth = node1_final_seq - node1_initial_seq;
|
||||
let node2_growth = node2_final_seq - node2_initial_seq;
|
||||
|
||||
println!("Clock growth: Node1=+{}, Node2=+{}", node1_growth, node2_growth);
|
||||
|
||||
// With feedback loop: clocks would grow by 100s (every frame generates delta)
|
||||
// Without feedback loop: clocks should grow by 0-5 (only periodic sync)
|
||||
assert!(
|
||||
node1_growth < 10,
|
||||
"Node1 clock grew too much ({}) - indicates feedback loop",
|
||||
node1_growth
|
||||
);
|
||||
assert!(
|
||||
node2_growth < 10,
|
||||
"Node2 clock grew too much ({}) - indicates feedback loop",
|
||||
node2_growth
|
||||
);
|
||||
|
||||
println!("✓ No runaway vector clock - feedback loop prevented");
|
||||
|
||||
// Cleanup
|
||||
router1.shutdown().await?;
|
||||
router2.shutdown().await?;
|
||||
ep1.close().await;
|
||||
ep2.close().await;
|
||||
|
||||
println!("✓ Remote delta feedback loop prevention test passed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Test: Local change after remote delta gets broadcast
|
||||
/// Verifies that making a local change after receiving a remote delta
|
||||
/// still results in broadcasting the local change (with one frame delay)
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_local_change_after_remote_delta() -> Result<()> {
|
||||
println!("=== Starting test_local_change_after_remote_delta ===");
|
||||
|
||||
let ctx1 = TestContext::new();
|
||||
let ctx2 = TestContext::new();
|
||||
|
||||
let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?;
|
||||
|
||||
let node1_id = bridge1.node_id();
|
||||
let node2_id = bridge2.node_id();
|
||||
|
||||
let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1);
|
||||
let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2);
|
||||
|
||||
println!("Node1 ID: {}", node1_id);
|
||||
println!("Node2 ID: {}", node2_id);
|
||||
|
||||
// Initialize both apps
|
||||
app1.update();
|
||||
app2.update();
|
||||
|
||||
// Connect both to the same session
|
||||
use libmarathon::networking::SessionId;
|
||||
let session_id = SessionId::new();
|
||||
|
||||
{
|
||||
let mut session1 = app1.world_mut().resource_mut::<CurrentSession>();
|
||||
session1.session.id = session_id.clone();
|
||||
session1.transition_to(SessionState::Active);
|
||||
}
|
||||
{
|
||||
let mut session2 = app2.world_mut().resource_mut::<CurrentSession>();
|
||||
session2.session.id = session_id.clone();
|
||||
session2.transition_to(SessionState::Active);
|
||||
}
|
||||
|
||||
// Node 1: Spawn a cube at position (1, 2, 3)
|
||||
let cube_id = Uuid::new_v4();
|
||||
{
|
||||
app1.world_mut().spawn((
|
||||
NetworkedEntity::with_id(cube_id, node1_id),
|
||||
Persisted::with_id(cube_id),
|
||||
Synced,
|
||||
Transform::from_xyz(1.0, 2.0, 3.0),
|
||||
));
|
||||
}
|
||||
|
||||
println!("Node1: Spawned cube {} at (1, 2, 3)", cube_id);
|
||||
|
||||
// Wait for sync
|
||||
wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_w1, w2| {
|
||||
let entity_map = w2.resource::<NetworkEntityMap>();
|
||||
entity_map.get_entity(cube_id).is_some()
|
||||
})
|
||||
.await?;
|
||||
|
||||
println!("✓ Node2 received cube");
|
||||
|
||||
// Node 2: Make a local change (move cube to 10, 20, 30)
|
||||
{
|
||||
let entity_map = app2.world().resource::<NetworkEntityMap>();
|
||||
let entity = entity_map.get_entity(cube_id).expect("Cube should exist on Node2");
|
||||
|
||||
if let Ok(mut entity_mut) = app2.world_mut().get_entity_mut(entity) {
|
||||
if let Some(mut transform) = entity_mut.get_mut::<Transform>() {
|
||||
transform.translation = Vec3::new(10.0, 20.0, 30.0);
|
||||
println!("Node2: Moved cube to (10, 20, 30)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for Node1 to receive the update from Node2
|
||||
wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |w1, _w2| {
|
||||
let entity_map = w1.resource::<NetworkEntityMap>();
|
||||
if let Some(entity) = entity_map.get_entity(cube_id) {
|
||||
if let Ok(entity_ref) = w1.get_entity(entity) {
|
||||
if let Some(transform) = entity_ref.get::<Transform>() {
|
||||
// Check if position is close to (10, 20, 30)
|
||||
let pos = transform.translation;
|
||||
(pos.x - 10.0).abs() < 0.1 &&
|
||||
(pos.y - 20.0).abs() < 0.1 &&
|
||||
(pos.z - 30.0).abs() < 0.1
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
|
||||
println!("✓ Node1 received local change from Node2");
|
||||
|
||||
// Verify final position on Node1
|
||||
{
|
||||
let entity_map = app1.world().resource::<NetworkEntityMap>();
|
||||
let entity = entity_map.get_entity(cube_id).expect("Cube should exist on Node1");
|
||||
|
||||
if let Ok(entity_ref) = app1.world().get_entity(entity) {
|
||||
if let Some(transform) = entity_ref.get::<Transform>() {
|
||||
let pos = transform.translation;
|
||||
println!("Node1 final position: ({}, {}, {})", pos.x, pos.y, pos.z);
|
||||
|
||||
assert!(
|
||||
(pos.x - 10.0).abs() < 0.1,
|
||||
"X position should be ~10.0, got {}",
|
||||
pos.x
|
||||
);
|
||||
assert!(
|
||||
(pos.y - 20.0).abs() < 0.1,
|
||||
"Y position should be ~20.0, got {}",
|
||||
pos.y
|
||||
);
|
||||
assert!(
|
||||
(pos.z - 30.0).abs() < 0.1,
|
||||
"Z position should be ~30.0, got {}",
|
||||
pos.z
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!("✓ Local change after remote delta was successfully broadcast");
|
||||
|
||||
// Cleanup
|
||||
router1.shutdown().await?;
|
||||
router2.shutdown().await?;
|
||||
ep1.close().await;
|
||||
ep2.close().await;
|
||||
|
||||
println!("✓ Local change after remote delta test passed");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
1022
crates/libmarathon/tests/session_sync_test.rs
Normal file
1022
crates/libmarathon/tests/session_sync_test.rs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -3,6 +3,8 @@
|
||||
//! These tests validate end-to-end CRDT synchronization and persistence
|
||||
//! using multiple headless Bevy apps with real iroh-gossip networking.
|
||||
|
||||
mod test_utils;
|
||||
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
time::{
|
||||
@@ -12,6 +14,7 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use test_utils::{setup_gossip_pair, TestContext, wait_for_sync};
|
||||
use bevy::{
|
||||
MinimalPlugins,
|
||||
app::{
|
||||
@@ -85,37 +88,15 @@ struct TestHealth {
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Test Utilities
|
||||
// Test-Specific Utilities
|
||||
// ============================================================================
|
||||
// Common utilities (TestContext, wait_for_sync, gossip setup) are in shared test_utils
|
||||
// These are specific to this test file (DB checks, TestPosition assertions)
|
||||
|
||||
mod test_utils {
|
||||
use rusqlite::Connection;
|
||||
|
||||
use super::*;
|
||||
|
||||
/// Test context that manages temporary directories with RAII cleanup
|
||||
pub struct TestContext {
|
||||
_temp_dir: TempDir,
|
||||
db_path: PathBuf,
|
||||
}
|
||||
|
||||
impl TestContext {
|
||||
pub fn new() -> Self {
|
||||
let temp_dir = TempDir::new().expect("Failed to create temp directory");
|
||||
let db_path = temp_dir.path().join("test.db");
|
||||
Self {
|
||||
_temp_dir: temp_dir,
|
||||
db_path,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn db_path(&self) -> PathBuf {
|
||||
self.db_path.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if an entity exists in the database
|
||||
pub fn entity_exists_in_db(db_path: &PathBuf, entity_id: Uuid) -> Result<bool> {
|
||||
fn entity_exists_in_db(db_path: &PathBuf, entity_id: Uuid) -> Result<bool> {
|
||||
let conn = Connection::open(db_path)?;
|
||||
let entity_id_bytes = entity_id.as_bytes();
|
||||
|
||||
@@ -129,7 +110,7 @@ mod test_utils {
|
||||
}
|
||||
|
||||
/// Check if a component exists for an entity in the database
|
||||
pub fn component_exists_in_db(
|
||||
fn component_exists_in_db(
|
||||
db_path: &PathBuf,
|
||||
entity_id: Uuid,
|
||||
component_type: &str,
|
||||
@@ -149,7 +130,7 @@ mod test_utils {
|
||||
/// Load a component from the database and deserialize it
|
||||
/// TODO: Rewrite to use ComponentTypeRegistry instead of reflection
|
||||
#[allow(dead_code)]
|
||||
pub fn load_component_from_db<T: Component + Clone>(
|
||||
fn load_component_from_db<T: Component + Clone>(
|
||||
_db_path: &PathBuf,
|
||||
_entity_id: Uuid,
|
||||
_component_type: &str,
|
||||
@@ -159,48 +140,19 @@ mod test_utils {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Create a headless Bevy app configured for testing
|
||||
pub fn create_test_app(node_id: Uuid, db_path: PathBuf, bridge: GossipBridge) -> App {
|
||||
let mut app = App::new();
|
||||
/// Create a test app with TestPosition and TestHealth registered
|
||||
fn create_test_app(node_id: Uuid, db_path: PathBuf, bridge: GossipBridge) -> App {
|
||||
let mut app = test_utils::create_test_app(node_id, db_path, bridge);
|
||||
|
||||
app.add_plugins(MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(
|
||||
Duration::from_secs_f64(1.0 / 60.0),
|
||||
)))
|
||||
.insert_resource(bridge)
|
||||
.add_plugins(NetworkingPlugin::new(NetworkingConfig {
|
||||
node_id,
|
||||
sync_interval_secs: 0.5, // Fast for testing
|
||||
prune_interval_secs: 10.0,
|
||||
tombstone_gc_interval_secs: 30.0,
|
||||
}))
|
||||
.add_plugins(PersistencePlugin::with_config(
|
||||
db_path,
|
||||
PersistenceConfig {
|
||||
flush_interval_secs: 1,
|
||||
checkpoint_interval_secs: 5,
|
||||
battery_adaptive: false,
|
||||
..Default::default()
|
||||
},
|
||||
));
|
||||
|
||||
// Register test component types for reflection
|
||||
// Register test-specific component types
|
||||
app.register_type::<TestPosition>()
|
||||
.register_type::<TestHealth>();
|
||||
|
||||
app
|
||||
}
|
||||
|
||||
/// Count entities with a specific network ID
|
||||
pub fn count_entities_with_id(world: &mut World, network_id: Uuid) -> usize {
|
||||
let mut query = world.query::<&NetworkedEntity>();
|
||||
query
|
||||
.iter(world)
|
||||
.filter(|entity| entity.network_id == network_id)
|
||||
.count()
|
||||
}
|
||||
|
||||
/// Assert that an entity with specific network ID and position exists
|
||||
pub fn assert_entity_synced(
|
||||
fn assert_entity_synced(
|
||||
world: &mut World,
|
||||
network_id: Uuid,
|
||||
expected_position: TestPosition,
|
||||
@@ -225,277 +177,29 @@ mod test_utils {
|
||||
anyhow::bail!("Entity {} not found in world", network_id)
|
||||
}
|
||||
|
||||
/// Wait for sync condition to be met, polling both apps
|
||||
pub async fn wait_for_sync<F>(
|
||||
app1: &mut App,
|
||||
app2: &mut App,
|
||||
timeout: Duration,
|
||||
check_fn: F,
|
||||
) -> Result<()>
|
||||
where
|
||||
F: Fn(&mut World, &mut World) -> bool, {
|
||||
let start = Instant::now();
|
||||
let mut tick_count = 0;
|
||||
|
||||
while start.elapsed() < timeout {
|
||||
// Tick both apps
|
||||
app1.update();
|
||||
app2.update();
|
||||
tick_count += 1;
|
||||
|
||||
if tick_count % 50 == 0 {
|
||||
println!(
|
||||
"Waiting for sync... tick {} ({:.1}s elapsed)",
|
||||
tick_count,
|
||||
start.elapsed().as_secs_f32()
|
||||
);
|
||||
}
|
||||
|
||||
// Check condition
|
||||
if check_fn(app1.world_mut(), app2.world_mut()) {
|
||||
println!(
|
||||
"Sync completed after {} ticks ({:.3}s)",
|
||||
tick_count,
|
||||
start.elapsed().as_secs_f32()
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Small delay to avoid spinning
|
||||
tokio::time::sleep(Duration::from_millis(16)).await;
|
||||
}
|
||||
|
||||
println!("Sync timeout after {} ticks", tick_count);
|
||||
anyhow::bail!("Sync timeout after {:?}. Condition not met.", timeout)
|
||||
}
|
||||
|
||||
/// Initialize a single iroh-gossip node
|
||||
async fn init_gossip_node(
|
||||
topic_id: TopicId,
|
||||
bootstrap_addrs: Vec<iroh::EndpointAddr>,
|
||||
) -> Result<(Endpoint, Gossip, Router, GossipBridge)> {
|
||||
println!(" Creating endpoint (localhost only for fast testing)...");
|
||||
// Create the Iroh endpoint bound to localhost only (no mDNS needed)
|
||||
let endpoint = Endpoint::builder()
|
||||
.bind_addr_v4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, 0))
|
||||
.bind()
|
||||
.await?;
|
||||
let endpoint_id = endpoint.addr().id;
|
||||
println!(" Endpoint created: {}", endpoint_id);
|
||||
|
||||
// Convert 32-byte endpoint ID to 16-byte UUID by taking first 16 bytes
|
||||
let id_bytes = endpoint_id.as_bytes();
|
||||
let mut uuid_bytes = [0u8; 16];
|
||||
uuid_bytes.copy_from_slice(&id_bytes[..16]);
|
||||
let node_id = Uuid::from_bytes(uuid_bytes);
|
||||
|
||||
println!(" Spawning gossip protocol...");
|
||||
// Build the gossip protocol
|
||||
let gossip = Gossip::builder().spawn(endpoint.clone());
|
||||
|
||||
println!(" Setting up router...");
|
||||
// Setup the router to handle incoming connections
|
||||
let router = Router::builder(endpoint.clone())
|
||||
.accept(iroh_gossip::ALPN, gossip.clone())
|
||||
.spawn();
|
||||
|
||||
// Add bootstrap peers using StaticProvider for direct localhost connections
|
||||
let bootstrap_count = bootstrap_addrs.len();
|
||||
let has_bootstrap_peers = !bootstrap_addrs.is_empty();
|
||||
|
||||
// Collect bootstrap IDs before moving the addresses
|
||||
let bootstrap_ids: Vec<_> = bootstrap_addrs.iter().map(|a| a.id).collect();
|
||||
|
||||
if has_bootstrap_peers {
|
||||
let static_provider = iroh::discovery::static_provider::StaticProvider::default();
|
||||
for addr in &bootstrap_addrs {
|
||||
static_provider.add_endpoint_info(addr.clone());
|
||||
}
|
||||
endpoint.discovery().add(static_provider);
|
||||
println!(" Added {} bootstrap peers to discovery", bootstrap_count);
|
||||
|
||||
// Connect to bootstrap peers (localhost connections are instant)
|
||||
for addr in &bootstrap_addrs {
|
||||
match endpoint.connect(addr.clone(), iroh_gossip::ALPN).await {
|
||||
| Ok(_conn) => println!(" ✓ Connected to {}", addr.id),
|
||||
| Err(e) => println!(" ✗ Connection failed: {}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe to the topic
|
||||
let subscribe_handle = gossip.subscribe(topic_id, bootstrap_ids).await?;
|
||||
let (sender, mut receiver) = subscribe_handle.split();
|
||||
|
||||
// Wait for join if we have bootstrap peers (should be instant on localhost)
|
||||
if has_bootstrap_peers {
|
||||
match tokio::time::timeout(Duration::from_millis(500), receiver.joined()).await {
|
||||
| Ok(Ok(())) => println!(" ✓ Join completed"),
|
||||
| Ok(Err(e)) => println!(" ✗ Join error: {}", e),
|
||||
| Err(_) => println!(" ⚠ Join timeout (proceeding anyway)"),
|
||||
}
|
||||
}
|
||||
|
||||
// Create bridge and wire it up
|
||||
let bridge = GossipBridge::new(node_id);
|
||||
println!(" Spawning bridge tasks...");
|
||||
|
||||
// Spawn background tasks to forward messages between gossip and bridge
|
||||
spawn_gossip_bridge_tasks(sender, receiver, bridge.clone());
|
||||
|
||||
println!(" Node initialization complete");
|
||||
Ok((endpoint, gossip, router, bridge))
|
||||
}
|
||||
|
||||
/// Setup a pair of iroh-gossip nodes connected to the same topic
///
/// Node 1 starts with no bootstrap peers; node 2 bootstraps from node 1's
/// full address. Both nodes subscribe to the same hard-coded topic
/// (`[42; 32]`) over localhost.
///
/// Returns `(endpoint1, endpoint2, router1, router2, bridge1, bridge2)`.
/// The routers must be kept alive by the caller or incoming connections stop
/// being accepted.
///
/// NOTE(review): this appears to duplicate
/// `test_utils::gossip::setup_gossip_pair` (added in the same change) —
/// consider calling the shared helper instead of keeping a local copy.
pub async fn setup_gossip_pair() -> Result<(
    Endpoint,
    Endpoint,
    Router,
    Router,
    GossipBridge,
    GossipBridge,
)> {
    // Use a shared topic for both nodes
    let topic_id = TopicId::from_bytes([42; 32]);
    println!("Using topic ID: {:?}", topic_id);

    // Initialize node 1 with no bootstrap peers
    println!("Initializing node 1...");
    let (ep1, _gossip1, router1, bridge1) = init_gossip_node(topic_id, vec![]).await?;
    println!("Node 1 initialized with ID: {}", ep1.addr().id);

    // Get node 1's full address (ID + network addresses) for node 2 to bootstrap
    // from
    let node1_addr = ep1.addr().clone();
    println!("Node 1 full address: {:?}", node1_addr);

    // Initialize node 2 with node 1's full address as bootstrap peer
    println!("Initializing node 2 with bootstrap peer: {}", node1_addr.id);
    let (ep2, _gossip2, router2, bridge2) =
        init_gossip_node(topic_id, vec![node1_addr]).await?;
    println!("Node 2 initialized with ID: {}", ep2.addr().id);

    // Brief wait for gossip protocol to stabilize (localhost is fast)
    tokio::time::sleep(Duration::from_millis(200)).await;

    Ok((ep1, ep2, router1, router2, bridge1, bridge2))
}
|
||||
|
||||
/// Spawn background tasks to forward messages between iroh-gossip and
/// GossipBridge
///
/// Spawns two detached tokio tasks:
/// 1. outgoing: polls `bridge.try_recv_outgoing()` every 10 ms, serializes
///    each message with rkyv, and broadcasts it via the gossip sender. This
///    task never terminates on its own (the loop has no break).
/// 2. incoming: awaits gossip events with a 100 ms timeout per poll,
///    deserializes `Received` payloads, and pushes them into the bridge's
///    incoming queue. Exits only when the gossip stream ends.
///
/// Serialization/deserialization failures are logged and the message is
/// dropped; they do not stop either task.
///
/// NOTE(review): this appears to duplicate
/// `test_utils::gossip::spawn_gossip_bridge_tasks` — consider using the
/// shared helper instead of this private copy.
fn spawn_gossip_bridge_tasks(
    sender: GossipSender,
    mut receiver: GossipReceiver,
    bridge: GossipBridge,
) {
    let node_id = bridge.node_id();

    // Task 1: Forward from bridge.outgoing → gossip sender
    let bridge_out = bridge.clone();
    tokio::spawn(async move {
        let mut msg_count = 0;
        loop {
            // Poll the bridge's outgoing queue (non-blocking).
            if let Some(versioned_msg) = bridge_out.try_recv_outgoing() {
                msg_count += 1;
                println!(
                    "[Node {}] Sending message #{} via gossip",
                    node_id, msg_count
                );
                // Serialize the message with rkyv before broadcasting.
                match rkyv::to_bytes::<rkyv::rancor::Failure>(&versioned_msg).map(|b| b.to_vec()) {
                    | Ok(bytes) => {
                        // Broadcast via gossip; errors are logged, not propagated.
                        if let Err(e) = sender.broadcast(bytes.into()).await {
                            eprintln!("[Node {}] Failed to broadcast message: {}", node_id, e);
                        } else {
                            println!(
                                "[Node {}] Message #{} broadcasted successfully",
                                node_id, msg_count
                            );
                        }
                    },
                    | Err(e) => eprintln!(
                        "[Node {}] Failed to serialize message for broadcast: {}",
                        node_id, e
                    ),
                }
            }

            // Small delay to avoid spinning.
            // NOTE(review): this is a 10 ms busy-poll rather than an async
            // recv; adds up to 10 ms latency per message — acceptable for
            // tests, revisit if reused outside them.
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    });

    // Task 2: Forward from gossip receiver → bridge.incoming
    let bridge_in = bridge.clone();
    tokio::spawn(async move {
        let mut msg_count = 0;
        println!("[Node {}] Gossip receiver task started", node_id);
        loop {
            // Receive from gossip (GossipReceiver is a Stream); the 100 ms
            // timeout keeps the loop responsive even when no traffic arrives.
            match tokio::time::timeout(Duration::from_millis(100), receiver.next()).await {
                | Ok(Some(Ok(event))) => {
                    println!(
                        "[Node {}] Received gossip event: {:?}",
                        node_id,
                        std::mem::discriminant(&event)
                    );
                    // Only `Received` events carry payloads; membership
                    // events are logged above and otherwise ignored.
                    if let iroh_gossip::api::Event::Received(msg) = event {
                        msg_count += 1;
                        println!(
                            "[Node {}] Received message #{} from gossip",
                            node_id, msg_count
                        );
                        // Deserialize the message from its rkyv wire form.
                        match rkyv::from_bytes::<VersionedMessage, rkyv::rancor::Failure>(&msg.content) {
                            | Ok(versioned_msg) => {
                                // Push to bridge's incoming queue.
                                if let Err(e) = bridge_in.push_incoming(versioned_msg) {
                                    eprintln!(
                                        "[Node {}] Failed to push to bridge incoming: {}",
                                        node_id, e
                                    );
                                } else {
                                    println!(
                                        "[Node {}] Message #{} pushed to bridge incoming",
                                        node_id, msg_count
                                    );
                                }
                            },
                            | Err(e) => eprintln!(
                                "[Node {}] Failed to deserialize gossip message: {}",
                                node_id, e
                            ),
                        }
                    }
                },
                | Ok(Some(Err(e))) => {
                    // Receiver error: log and keep polling.
                    eprintln!("[Node {}] Gossip receiver error: {}", node_id, e)
                },
                | Ok(None) => {
                    // Stream ended — the only way this task terminates.
                    println!("[Node {}] Gossip stream ended", node_id);
                    break;
                },
                | Err(_) => {
                    // Timeout, no message available; loop again.
                },
            }
        }
    });
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Integration Tests
|
||||
// ============================================================================
|
||||
|
||||
/// Test 1: Basic entity sync (Node A spawns → Node B receives)
|
||||
use test_utils::count_entities_with_id;
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_basic_entity_sync() -> Result<()> {
|
||||
use test_utils::*;
|
||||
println!("=== Starting test_basic_entity_sync ===");
|
||||
|
||||
let ctx1 = TestContext::new();
|
||||
let ctx2 = TestContext::new();
|
||||
|
||||
let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?;
|
||||
|
||||
let node1_id = bridge1.node_id();
|
||||
let node2_id = bridge2.node_id();
|
||||
|
||||
let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1);
|
||||
let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2);
|
||||
|
||||
println!("Node1 ID: {}", node1_id);
|
||||
println!("Node2 ID: {}", node2_id);
|
||||
|
||||
println!("=== Starting test_basic_entity_sync ===");
|
||||
|
||||
|
||||
315
crates/libmarathon/tests/test_utils/gossip.rs
Normal file
315
crates/libmarathon/tests/test_utils/gossip.rs
Normal file
@@ -0,0 +1,315 @@
|
||||
//! Shared iroh-gossip setup utilities for integration tests
|
||||
//!
|
||||
//! This module provides real iroh-gossip networking infrastructure that all
|
||||
//! integration tests should use. No shortcuts - always use real localhost connections.
|
||||
|
||||
use anyhow::Result;
|
||||
use futures_lite::StreamExt;
|
||||
use iroh::{
|
||||
Endpoint,
|
||||
discovery::static_provider::StaticProvider,
|
||||
protocol::Router,
|
||||
};
|
||||
use iroh_gossip::{
|
||||
api::{GossipReceiver, GossipSender},
|
||||
net::Gossip,
|
||||
proto::TopicId,
|
||||
};
|
||||
use libmarathon::networking::{GossipBridge, VersionedMessage};
|
||||
use std::time::Duration;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Initialize a single iroh-gossip node
///
/// Creates a real iroh endpoint bound to localhost, spawns the gossip protocol,
/// sets up routing, and optionally connects to bootstrap peers.
///
/// # Arguments
/// * `topic_id` - The gossip topic to subscribe to
/// * `bootstrap_addrs` - Optional bootstrap peers to connect to
///
/// # Returns
/// * Endpoint - The iroh endpoint for this node
/// * Gossip - The gossip protocol handler
/// * Router - The router handling incoming connections
/// * GossipBridge - The bridge for Bevy ECS integration
///
/// Bootstrap connection failures and join timeouts are logged but not
/// treated as errors — initialization proceeds regardless.
pub async fn init_gossip_node(
    topic_id: TopicId,
    bootstrap_addrs: Vec<iroh::EndpointAddr>,
) -> Result<(Endpoint, Gossip, Router, GossipBridge)> {
    println!(" Creating endpoint (localhost only for fast testing)...");
    // Create the Iroh endpoint bound to localhost only (no mDNS needed);
    // port 0 lets the OS pick a free port so parallel tests don't collide.
    let endpoint = Endpoint::builder()
        .bind_addr_v4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, 0))
        .bind()
        .await?;
    let endpoint_id = endpoint.addr().id;
    println!(" Endpoint created: {}", endpoint_id);

    // Convert 32-byte endpoint ID to 16-byte UUID by taking first 16 bytes.
    // NOTE(review): truncation means two endpoint IDs sharing a 16-byte
    // prefix would map to the same node UUID — vanishingly unlikely and
    // fine for tests, but don't reuse this mapping in production code.
    let id_bytes = endpoint_id.as_bytes();
    let mut uuid_bytes = [0u8; 16];
    uuid_bytes.copy_from_slice(&id_bytes[..16]);
    let node_id = Uuid::from_bytes(uuid_bytes);

    println!(" Spawning gossip protocol...");
    // Build the gossip protocol
    let gossip = Gossip::builder().spawn(endpoint.clone());

    println!(" Setting up router...");
    // Setup the router to handle incoming connections on the gossip ALPN.
    let router = Router::builder(endpoint.clone())
        .accept(iroh_gossip::ALPN, gossip.clone())
        .spawn();

    // Add bootstrap peers using StaticProvider for direct localhost connections
    let bootstrap_count = bootstrap_addrs.len();
    let has_bootstrap_peers = !bootstrap_addrs.is_empty();

    // Collect bootstrap IDs before moving the addresses
    let bootstrap_ids: Vec<_> = bootstrap_addrs.iter().map(|a| a.id).collect();

    if has_bootstrap_peers {
        // Register each peer's full address so the endpoint can dial by ID
        // without any external discovery service.
        let static_provider = StaticProvider::default();
        for addr in &bootstrap_addrs {
            static_provider.add_endpoint_info(addr.clone());
        }
        endpoint.discovery().add(static_provider);
        println!(" Added {} bootstrap peers to discovery", bootstrap_count);

        // Connect to bootstrap peers (localhost connections are instant).
        // Best-effort: failures are printed and otherwise ignored.
        for addr in &bootstrap_addrs {
            match endpoint.connect(addr.clone(), iroh_gossip::ALPN).await {
                Ok(_conn) => println!(" ✓ Connected to {}", addr.id),
                Err(e) => println!(" ✗ Connection failed: {}", e),
            }
        }
    }

    // Subscribe to the topic
    let subscribe_handle = gossip.subscribe(topic_id, bootstrap_ids).await?;
    let (sender, mut receiver) = subscribe_handle.split();

    // Wait for join if we have bootstrap peers (should be instant on localhost).
    // A timeout here is non-fatal: the node proceeds and may join later.
    if has_bootstrap_peers {
        match tokio::time::timeout(Duration::from_millis(500), receiver.joined()).await {
            Ok(Ok(())) => println!(" ✓ Join completed"),
            Ok(Err(e)) => println!(" ✗ Join error: {}", e),
            Err(_) => println!(" ⚠ Join timeout (proceeding anyway)"),
        }
    }

    // Create bridge and wire it up
    let bridge = GossipBridge::new(node_id);
    println!(" Spawning bridge tasks...");

    // Spawn background tasks to forward messages between gossip and bridge
    spawn_gossip_bridge_tasks(sender, receiver, bridge.clone());

    println!(" Node initialization complete");
    Ok((endpoint, gossip, router, bridge))
}
|
||||
|
||||
/// Spawn background tasks to forward messages between iroh-gossip and GossipBridge
///
/// This creates two tokio tasks:
/// 1. Forward from bridge.outgoing → gossip sender (broadcasts to peers)
/// 2. Forward from gossip receiver → bridge.incoming (receives from peers)
///
/// These tasks run indefinitely and handle serialization/deserialization.
/// Task 1 never terminates on its own (its loop has no break); task 2 exits
/// only when the gossip stream ends. Serialization and deserialization
/// failures are logged and the message dropped — they never abort a task.
pub fn spawn_gossip_bridge_tasks(
    sender: GossipSender,
    mut receiver: GossipReceiver,
    bridge: GossipBridge,
) {
    let node_id = bridge.node_id();

    // Task 1: Forward from bridge.outgoing → gossip sender
    let bridge_out = bridge.clone();
    tokio::spawn(async move {
        let mut msg_count = 0;
        loop {
            // Poll the bridge's outgoing queue (non-blocking).
            if let Some(versioned_msg) = bridge_out.try_recv_outgoing() {
                msg_count += 1;
                println!(
                    "[Node {}] Sending message #{} via gossip",
                    node_id, msg_count
                );
                // Serialize the message into its rkyv wire form.
                match rkyv::to_bytes::<rkyv::rancor::Failure>(&versioned_msg).map(|b| b.to_vec()) {
                    Ok(bytes) => {
                        // Broadcast via gossip; errors are logged, not propagated.
                        if let Err(e) = sender.broadcast(bytes.into()).await {
                            eprintln!("[Node {}] Failed to broadcast message: {}", node_id, e);
                        } else {
                            println!(
                                "[Node {}] Message #{} broadcasted successfully",
                                node_id, msg_count
                            );
                        }
                    }
                    Err(e) => eprintln!(
                        "[Node {}] Failed to serialize message for broadcast: {}",
                        node_id, e
                    ),
                }
            }

            // Small delay to avoid spinning.
            // NOTE(review): 10 ms busy-poll adds up to 10 ms of send latency
            // per message; fine for tests, revisit if reused elsewhere.
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    });

    // Task 2: Forward from gossip receiver → bridge.incoming
    let bridge_in = bridge.clone();
    tokio::spawn(async move {
        let mut msg_count = 0;
        println!("[Node {}] Gossip receiver task started", node_id);
        loop {
            // Receive from gossip (GossipReceiver is a Stream); the 100 ms
            // timeout keeps the loop responsive when no traffic arrives.
            match tokio::time::timeout(Duration::from_millis(100), receiver.next()).await {
                Ok(Some(Ok(event))) => {
                    println!(
                        "[Node {}] Received gossip event: {:?}",
                        node_id,
                        std::mem::discriminant(&event)
                    );
                    // Only `Received` events carry payloads; other events
                    // (membership changes) are logged above and ignored.
                    if let iroh_gossip::api::Event::Received(msg) = event {
                        msg_count += 1;
                        println!(
                            "[Node {}] Received message #{} from gossip",
                            node_id, msg_count
                        );
                        // Deserialize the message from its rkyv wire form.
                        match rkyv::from_bytes::<VersionedMessage, rkyv::rancor::Failure>(&msg.content) {
                            Ok(versioned_msg) => {
                                // Push to bridge's incoming queue.
                                if let Err(e) = bridge_in.push_incoming(versioned_msg) {
                                    eprintln!(
                                        "[Node {}] Failed to push to bridge incoming: {}",
                                        node_id, e
                                    );
                                } else {
                                    println!(
                                        "[Node {}] Message #{} pushed to bridge incoming",
                                        node_id, msg_count
                                    );
                                }
                            }
                            Err(e) => eprintln!(
                                "[Node {}] Failed to deserialize gossip message: {}",
                                node_id, e
                            ),
                        }
                    }
                }
                Ok(Some(Err(e))) => {
                    // Receiver error: log and keep polling.
                    eprintln!("[Node {}] Gossip receiver error: {}", node_id, e)
                }
                Ok(None) => {
                    // Stream ended — the only way this task terminates.
                    println!("[Node {}] Gossip stream ended", node_id);
                    break;
                }
                Err(_) => {
                    // Timeout, no message available; loop again.
                }
            }
        }
    });
}
|
||||
|
||||
/// Setup a pair of iroh-gossip nodes connected to the same topic
|
||||
///
|
||||
/// This creates two nodes:
|
||||
/// - Node 1: Initialized first with no bootstrap peers
|
||||
/// - Node 2: Bootstraps from Node 1's address
|
||||
///
|
||||
/// Both nodes are subscribed to the same topic and connected via localhost.
|
||||
///
|
||||
/// # Returns
|
||||
/// Tuple of (endpoint1, endpoint2, router1, router2, bridge1, bridge2)
|
||||
pub async fn setup_gossip_pair() -> Result<(
|
||||
Endpoint,
|
||||
Endpoint,
|
||||
Router,
|
||||
Router,
|
||||
GossipBridge,
|
||||
GossipBridge,
|
||||
)> {
|
||||
// Use a shared topic for both nodes
|
||||
let topic_id = TopicId::from_bytes([42; 32]);
|
||||
println!("Using topic ID: {:?}", topic_id);
|
||||
|
||||
// Initialize node 1 with no bootstrap peers
|
||||
println!("Initializing node 1...");
|
||||
let (ep1, _gossip1, router1, bridge1) = init_gossip_node(topic_id, vec![]).await?;
|
||||
println!("Node 1 initialized with ID: {}", ep1.addr().id);
|
||||
|
||||
// Get node 1's full address (ID + network addresses) for node 2 to bootstrap from
|
||||
let node1_addr = ep1.addr().clone();
|
||||
println!("Node 1 full address: {:?}", node1_addr);
|
||||
|
||||
// Initialize node 2 with node 1's full address as bootstrap peer
|
||||
println!("Initializing node 2 with bootstrap peer: {}", node1_addr.id);
|
||||
let (ep2, _gossip2, router2, bridge2) =
|
||||
init_gossip_node(topic_id, vec![node1_addr]).await?;
|
||||
println!("Node 2 initialized with ID: {}", ep2.addr().id);
|
||||
|
||||
// Brief wait for gossip protocol to stabilize (localhost is fast)
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
Ok((ep1, ep2, router1, router2, bridge1, bridge2))
|
||||
}
|
||||
|
||||
/// Setup three iroh-gossip nodes connected to the same topic
|
||||
///
|
||||
/// This creates three nodes:
|
||||
/// - Node 1: Initialized first with no bootstrap peers
|
||||
/// - Node 2: Bootstraps from Node 1
|
||||
/// - Node 3: Bootstraps from both Node 1 and Node 2
|
||||
///
|
||||
/// All nodes are subscribed to the same topic and connected via localhost.
|
||||
///
|
||||
/// # Returns
|
||||
/// Tuple of (ep1, ep2, ep3, router1, router2, router3, bridge1, bridge2, bridge3)
|
||||
pub async fn setup_gossip_trio() -> Result<(
|
||||
Endpoint,
|
||||
Endpoint,
|
||||
Endpoint,
|
||||
Router,
|
||||
Router,
|
||||
Router,
|
||||
GossipBridge,
|
||||
GossipBridge,
|
||||
GossipBridge,
|
||||
)> {
|
||||
let topic_id = TopicId::from_bytes([42; 32]);
|
||||
println!("Using topic ID: {:?}", topic_id);
|
||||
|
||||
// Initialize node 1
|
||||
println!("Initializing node 1...");
|
||||
let (ep1, _gossip1, router1, bridge1) = init_gossip_node(topic_id, vec![]).await?;
|
||||
println!("Node 1 initialized with ID: {}", ep1.addr().id);
|
||||
|
||||
let node1_addr = ep1.addr().clone();
|
||||
|
||||
// Initialize node 2 with node 1 as bootstrap
|
||||
println!("Initializing node 2 with bootstrap peer: {}", node1_addr.id);
|
||||
let (ep2, _gossip2, router2, bridge2) =
|
||||
init_gossip_node(topic_id, vec![node1_addr.clone()]).await?;
|
||||
println!("Node 2 initialized with ID: {}", ep2.addr().id);
|
||||
|
||||
// Initialize node 3 with both node 1 and node 2 as bootstrap
|
||||
let node2_addr = ep2.addr().clone();
|
||||
println!("Initializing node 3 with bootstrap peers: {} and {}", node1_addr.id, node2_addr.id);
|
||||
let (ep3, _gossip3, router3, bridge3) =
|
||||
init_gossip_node(topic_id, vec![node1_addr, node2_addr]).await?;
|
||||
println!("Node 3 initialized with ID: {}", ep3.addr().id);
|
||||
|
||||
// Brief wait for gossip protocol to stabilize
|
||||
tokio::time::sleep(Duration::from_millis(300)).await;
|
||||
|
||||
Ok((ep1, ep2, ep3, router1, router2, router3, bridge1, bridge2, bridge3))
|
||||
}
|
||||
144
crates/libmarathon/tests/test_utils/mod.rs
Normal file
144
crates/libmarathon/tests/test_utils/mod.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
//! Shared test utilities for integration tests
|
||||
//!
|
||||
//! This module provides common test infrastructure that all integration tests use:
|
||||
//! - Real iroh-gossip setup with localhost connections
|
||||
//! - Test app creation with networking + persistence
|
||||
//! - Wait helpers for async sync verification
|
||||
|
||||
pub mod gossip;
|
||||
|
||||
pub use gossip::{init_gossip_node, setup_gossip_pair, setup_gossip_trio, spawn_gossip_bridge_tasks};
|
||||
|
||||
use anyhow::Result;
|
||||
use bevy::{
|
||||
MinimalPlugins,
|
||||
app::{App, ScheduleRunnerPlugin},
|
||||
prelude::*,
|
||||
};
|
||||
use libmarathon::{
|
||||
networking::{
|
||||
GossipBridge,
|
||||
NetworkingConfig,
|
||||
NetworkingPlugin,
|
||||
},
|
||||
persistence::{
|
||||
PersistenceConfig,
|
||||
PersistencePlugin,
|
||||
},
|
||||
};
|
||||
use std::{
|
||||
path::PathBuf,
|
||||
time::Duration,
|
||||
};
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::Instant;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Test context that manages temporary directories with RAII cleanup
|
||||
pub struct TestContext {
|
||||
temp_dir: TempDir,
|
||||
}
|
||||
|
||||
impl TestContext {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
temp_dir: TempDir::new().expect("Failed to create temp directory"),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn db_path(&self) -> PathBuf {
|
||||
self.temp_dir.path().join("test.db")
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a test app with networking and persistence
|
||||
pub fn create_test_app(node_id: Uuid, db_path: PathBuf, bridge: GossipBridge) -> App {
|
||||
create_test_app_maybe_offline(node_id, db_path, Some(bridge))
|
||||
}
|
||||
|
||||
/// Create a test app with optional bridge (for testing offline scenarios)
///
/// Builds a headless Bevy app (MinimalPlugins, no window/render) ticking at
/// ~60 Hz, with the networking and persistence plugins configured with
/// test-tuned intervals (short so sync/flush happen within test timeouts).
///
/// # Arguments
/// * `node_id` - this node's identity for the networking layer
/// * `db_path` - SQLite file the persistence plugin writes to
/// * `bridge` - `Some` for online mode (gossip resource inserted),
///   `None` for offline mode (no bridge resource)
pub fn create_test_app_maybe_offline(node_id: Uuid, db_path: PathBuf, bridge: Option<GossipBridge>) -> App {
    let mut app = App::new();

    // Fixed 60 Hz schedule so tick counts map predictably to wall time.
    app.add_plugins(MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(
        Duration::from_secs_f64(1.0 / 60.0),
    )))
    .add_plugins(NetworkingPlugin::new(NetworkingConfig {
        node_id,
        // Aggressive sync interval so tests observe convergence quickly.
        sync_interval_secs: 0.5,
        prune_interval_secs: 10.0,
        tombstone_gc_interval_secs: 30.0,
    }))
    .add_plugins(PersistencePlugin::with_config(
        db_path,
        PersistenceConfig {
            // Short flush/checkpoint intervals keep persistence observable
            // within test timeouts; battery adaptation is irrelevant in CI.
            flush_interval_secs: 1,
            checkpoint_interval_secs: 5,
            battery_adaptive: false,
            ..Default::default()
        },
    ));

    // Insert bridge if provided (online mode); offline apps simply lack the
    // resource.
    if let Some(bridge) = bridge {
        app.insert_resource(bridge);
    }

    app
}
|
||||
|
||||
/// Wait for sync condition to be met, polling both apps
|
||||
pub async fn wait_for_sync<F>(
|
||||
app1: &mut App,
|
||||
app2: &mut App,
|
||||
timeout: Duration,
|
||||
check_fn: F,
|
||||
) -> Result<()>
|
||||
where
|
||||
F: Fn(&mut World, &mut World) -> bool,
|
||||
{
|
||||
let start = Instant::now();
|
||||
let mut tick_count = 0;
|
||||
|
||||
while start.elapsed() < timeout {
|
||||
// Tick both apps
|
||||
app1.update();
|
||||
app2.update();
|
||||
tick_count += 1;
|
||||
|
||||
if tick_count % 50 == 0 {
|
||||
println!(
|
||||
"Waiting for sync... tick {} ({:.1}s elapsed)",
|
||||
tick_count,
|
||||
start.elapsed().as_secs_f32()
|
||||
);
|
||||
}
|
||||
|
||||
// Check condition
|
||||
if check_fn(app1.world_mut(), app2.world_mut()) {
|
||||
println!(
|
||||
"Sync completed after {} ticks ({:.3}s)",
|
||||
tick_count,
|
||||
start.elapsed().as_secs_f32()
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Small delay to avoid spinning
|
||||
tokio::time::sleep(Duration::from_millis(16)).await;
|
||||
}
|
||||
|
||||
println!("Sync timeout after {} ticks", tick_count);
|
||||
anyhow::bail!("Sync timeout after {:?}. Condition not met.", timeout)
|
||||
}
|
||||
|
||||
/// Count entities with a specific network_id
|
||||
pub fn count_entities_with_id(world: &mut World, network_id: Uuid) -> usize {
|
||||
use libmarathon::networking::NetworkedEntity;
|
||||
|
||||
let mut query = world.query::<&NetworkedEntity>();
|
||||
query
|
||||
.iter(world)
|
||||
.filter(|ne| ne.network_id == network_id)
|
||||
.count()
|
||||
}
|
||||
Reference in New Issue
Block a user