//! Headless integration tests for networking and persistence //! //! These tests validate end-to-end CRDT synchronization and persistence //! using multiple headless Bevy apps with real iroh-gossip networking. mod test_utils; use std::{ path::PathBuf, time::{ Duration, Instant, }, }; use anyhow::Result; use test_utils::{setup_gossip_pair, TestContext, wait_for_sync}; use bevy::{ MinimalPlugins, app::{ App, ScheduleRunnerPlugin, }, ecs::{ component::Component, reflect::ReflectComponent, world::World, }, prelude::*, reflect::Reflect, }; use futures_lite::StreamExt; use iroh::{ Endpoint, protocol::Router, }; use iroh_gossip::{ api::{ GossipReceiver, GossipSender, }, net::Gossip, proto::TopicId, }; use libmarathon::{ networking::{ EntityLockRegistry, GossipBridge, LockMessage, NetworkedEntity, NetworkedTransform, NetworkingConfig, NetworkingPlugin, Synced, SyncMessage, VersionedMessage, }, persistence::{ Persisted, PersistenceConfig, PersistencePlugin, }, }; // Note: Test components use rkyv instead of serde use tempfile::TempDir; use uuid::Uuid; // ============================================================================ // Test Components // ============================================================================ /// Simple position component for testing sync #[libmarathon_macros::synced] #[derive(Reflect, PartialEq)] #[reflect(Component)] struct TestPosition { x: f32, y: f32, } /// Simple health component for testing sync #[libmarathon_macros::synced] #[derive(Reflect, PartialEq)] #[reflect(Component)] struct TestHealth { current: f32, max: f32, } // ============================================================================ // Test-Specific Utilities // ============================================================================ // Common utilities (TestContext, wait_for_sync, gossip setup) are in shared test_utils // These are specific to this test file (DB checks, TestPosition assertions) use rusqlite::Connection; /// Helper to ensure FixedUpdate and FixedPostUpdate run (since they're on a fixed timestep) fn update_with_fixed(app: &mut App) { use bevy::prelude::{FixedUpdate, FixedPostUpdate}; // Run Main schedule (which includes Update) app.update(); // Explicitly run FixedUpdate to ensure systems there execute app.world_mut().run_schedule(FixedUpdate); // Explicitly run FixedPostUpdate to ensure delta generation executes app.world_mut().run_schedule(FixedPostUpdate); } /// Check if an entity exists in the database fn entity_exists_in_db(db_path: &PathBuf, entity_id: Uuid) -> Result { let conn = Connection::open(db_path)?; let entity_id_bytes = entity_id.as_bytes(); let exists: bool = conn.query_row( "SELECT COUNT(*) > 0 FROM entities WHERE id = ?1", [entity_id_bytes.as_slice()], |row| row.get(0), )?; Ok(exists) } /// Check if a component exists for an entity in the database fn component_exists_in_db( db_path: &PathBuf, entity_id: Uuid, component_type: &str, ) -> Result { let conn = Connection::open(db_path)?; let entity_id_bytes = entity_id.as_bytes(); let exists: bool = conn.query_row( "SELECT COUNT(*) > 0 FROM components WHERE entity_id = ?1 AND component_type = ?2", rusqlite::params![entity_id_bytes.as_slice(), component_type], |row| row.get(0), )?; Ok(exists) } /// Load a component from the database and deserialize it /// TODO: Rewrite to use ComponentTypeRegistry instead of reflection #[allow(dead_code)] fn load_component_from_db( _db_path: &PathBuf, _entity_id: Uuid, _component_type: &str, ) -> Result> { // This function needs to be rewritten to use ComponentTypeRegistry // For now, return None to allow tests to compile Ok(None) } /// Create a test app with TestPosition and TestHealth registered fn create_test_app(node_id: Uuid, db_path: PathBuf, bridge: GossipBridge) -> App { let mut app = test_utils::create_test_app(node_id, db_path, bridge); // Register test-specific component types app.register_type::() .register_type::(); app } /// Assert that an entity with specific network ID and position exists fn assert_entity_synced( world: &mut World, network_id: Uuid, expected_position: TestPosition, ) -> Result<()> { let mut query = world.query::<(&NetworkedEntity, &TestPosition)>(); for (entity, position) in query.iter(world) { if entity.network_id == network_id { if position == &expected_position { return Ok(()); } else { anyhow::bail!( "Position mismatch for entity {}: expected {:?}, got {:?}", network_id, expected_position, position ); } } } anyhow::bail!("Entity {} not found in world", network_id) } // ============================================================================ // Integration Tests // ============================================================================ use test_utils::count_entities_with_id; #[tokio::test(flavor = "multi_thread")] async fn test_basic_entity_sync() -> Result<()> { println!("=== Starting test_basic_entity_sync ==="); let ctx1 = TestContext::new(); let ctx2 = TestContext::new(); let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; let node1_id = bridge1.node_id(); let node2_id = bridge2.node_id(); let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1); let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); println!("Node1 ID: {}", node1_id); println!("Node2 ID: {}", node2_id); println!("=== Starting test_basic_entity_sync ==="); // Setup contexts println!("Creating test contexts..."); let ctx1 = TestContext::new(); let ctx2 = TestContext::new(); // Setup gossip networking println!("Setting up gossip pair..."); let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; let node1_id = bridge1.node_id(); let node2_id = bridge2.node_id(); // Create headless apps println!("Creating Bevy apps..."); let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1); let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); println!("Apps created successfully"); println!("Node 1 ID: {}", node1_id); println!("Node 2 ID: {}", node2_id); // Node 1 spawns entity let entity_id = Uuid::new_v4(); println!("Spawning entity {} on node 1", entity_id); let spawned_entity = app1 .world_mut() .spawn(( NetworkedEntity::with_id(entity_id, node1_id), TestPosition { x: 10.0, y: 20.0 }, Persisted::with_id(entity_id), Synced, )) .id(); // IMPORTANT: Trigger change detection for persistence // Bevy only marks components as "changed" when mutated, not on spawn // Access Persisted mutably to trigger the persistence system { let world = app1.world_mut(); if let Ok(mut entity_mut) = world.get_entity_mut(spawned_entity) { if let Some(mut persisted) = entity_mut.get_mut::() { // Dereferencing the mutable borrow triggers change detection let _ = &mut *persisted; } } } println!("Entity spawned, triggered persistence"); println!("Entity spawned, starting sync wait..."); // Wait for sync wait_for_sync(&mut app1, &mut app2, Duration::from_secs(10), |_, w2| { let count = count_entities_with_id(w2, entity_id); if count > 0 { println!("✓ Entity found on node 2!"); true } else { // Debug: print what entities we DO have let all_networked: Vec = { let mut query = w2.query::<&NetworkedEntity>(); query.iter(w2).map(|ne| ne.network_id).collect() }; if !all_networked.is_empty() { println!( " Node 2 has {} networked entities: {:?}", all_networked.len(), all_networked ); println!(" Looking for: {}", entity_id); } false } }) .await?; // Update app2 one more time to ensure queued commands are applied println!("Running final update to flush commands..."); app2.update(); // Debug: Check what components the entity has { let world = app2.world_mut(); let mut query = world.query::<(&NetworkedEntity, Option<&TestPosition>)>(); for (ne, pos) in query.iter(world) { if ne.network_id == entity_id { println!("Debug: Entity {} has NetworkedEntity", entity_id); if let Some(pos) = pos { println!("Debug: Entity has TestPosition: {:?}", pos); } else { println!("Debug: Entity MISSING TestPosition component!"); } } } } // Verify entity synced to node 2 (in-memory check) assert_entity_synced( app2.world_mut(), entity_id, TestPosition { x: 10.0, y: 20.0 }, )?; println!("✓ Entity synced in-memory on node 2"); // Give persistence system time to flush to disk // The persistence system runs on Update with a 1-second flush interval println!("Waiting for persistence to flush..."); for _ in 0..15 { app1.update(); app2.update(); tokio::time::sleep(Duration::from_millis(100)).await; } // Verify persistence on Node 1 (originating node) println!("Checking Node 1 database persistence..."); assert!( entity_exists_in_db(&ctx1.db_path(), entity_id)?, "Entity {} should exist in Node 1 database", entity_id ); assert!( component_exists_in_db( &ctx1.db_path(), entity_id, "sync_integration_headless::TestPosition" )?, "TestPosition component should exist in Node 1 database" ); // TODO: Rewrite this test to use ComponentTypeRegistry instead of reflection // let node1_position = { // load_component_from_db::( // &ctx1.db_path(), // entity_id, // "sync_integration_headless::TestPosition", // )? // }; // assert_eq!( // node1_position, // Some(TestPosition { x: 10.0, y: 20.0 }), // "TestPosition data should be correctly persisted in Node 1 database" // ); println!("✓ Node 1 persistence verified"); // Verify persistence on Node 2 (receiving node after sync) println!("Checking Node 2 database persistence..."); assert!( entity_exists_in_db(&ctx2.db_path(), entity_id)?, "Entity {} should exist in Node 2 database after sync", entity_id ); assert!( component_exists_in_db( &ctx2.db_path(), entity_id, "sync_integration_headless::TestPosition" )?, "TestPosition component should exist in Node 2 database after sync" ); // TODO: Rewrite this test to use ComponentTypeRegistry instead of reflection // let node2_position = { // load_component_from_db::( // &ctx2.db_path(), // entity_id, // "sync_integration_headless::TestPosition", // )? // }; // assert_eq!( // node2_position, // Some(TestPosition { x: 10.0, y: 20.0 }), // "TestPosition data should be correctly persisted in Node 2 database after sync" // ); println!("✓ Node 2 persistence verified"); println!("✓ Full sync and persistence test passed!"); // Cleanup router1.shutdown().await?; router2.shutdown().await?; ep1.close().await; ep2.close().await; Ok(()) } /// Test 2: Bidirectional sync (both nodes modify different entities) #[tokio::test(flavor = "multi_thread")] async fn test_bidirectional_sync() -> Result<()> { use test_utils::*; let ctx1 = TestContext::new(); let ctx2 = TestContext::new(); let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; let node1_id = bridge1.node_id(); let node2_id = bridge2.node_id(); let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1); let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); // Node 1 spawns entity A let entity_a = Uuid::new_v4(); let entity_a_bevy = app1 .world_mut() .spawn(( NetworkedEntity::with_id(entity_a, node1_id), TestPosition { x: 1.0, y: 2.0 }, Persisted::with_id(entity_a), Synced, )) .id(); // Trigger persistence for entity A { let world = app1.world_mut(); if let Ok(mut entity_mut) = world.get_entity_mut(entity_a_bevy) { if let Some(mut persisted) = entity_mut.get_mut::() { let _ = &mut *persisted; } } } // Node 2 spawns entity B let entity_b = Uuid::new_v4(); let entity_b_bevy = app2 .world_mut() .spawn(( NetworkedEntity::with_id(entity_b, node2_id), TestPosition { x: 3.0, y: 4.0 }, Persisted::with_id(entity_b), Synced, )) .id(); // Trigger persistence for entity B { let world = app2.world_mut(); if let Ok(mut entity_mut) = world.get_entity_mut(entity_b_bevy) { if let Some(mut persisted) = entity_mut.get_mut::() { let _ = &mut *persisted; } } } // Wait for bidirectional sync wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |w1, w2| { count_entities_with_id(w1, entity_b) > 0 && count_entities_with_id(w2, entity_a) > 0 }) .await?; // Verify both nodes have both entities assert_entity_synced(app1.world_mut(), entity_b, TestPosition { x: 3.0, y: 4.0 })?; assert_entity_synced(app2.world_mut(), entity_a, TestPosition { x: 1.0, y: 2.0 })?; println!("✓ Bidirectional sync test passed"); router1.shutdown().await?; router2.shutdown().await?; ep1.close().await; ep2.close().await; Ok(()) } /// Test 3: Concurrent conflict resolution (LWW merge semantics) #[tokio::test(flavor = "multi_thread")] async fn test_concurrent_conflict_resolution() -> Result<()> { use test_utils::*; let ctx1 = TestContext::new(); let ctx2 = TestContext::new(); let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; let node1_id = bridge1.node_id(); let node2_id = bridge2.node_id(); let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1); let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); // Spawn shared entity on node 1 with Transform (which IS tracked for changes) let entity_id = Uuid::new_v4(); let entity_bevy = app1 .world_mut() .spawn(( NetworkedEntity::with_id(entity_id, node1_id), NetworkedTransform::default(), Transform::from_xyz(0.0, 0.0, 0.0), Persisted::with_id(entity_id), Synced, )) .id(); // Trigger persistence { let world = app1.world_mut(); if let Ok(mut entity_mut) = world.get_entity_mut(entity_bevy) { if let Some(mut persisted) = entity_mut.get_mut::() { let _ = &mut *persisted; } } } // Wait for initial sync wait_for_sync(&mut app1, &mut app2, Duration::from_secs(2), |_, w2| { count_entities_with_id(w2, entity_id) > 0 }) .await?; println!("✓ Initial sync complete, both nodes have the entity"); // Check what components the entity has on each node { let world1 = app1.world_mut(); let mut query1 = world1.query::<( Entity, &NetworkedEntity, Option<&NetworkedTransform>, &Transform, )>(); println!("Node 1 entities:"); for (entity, ne, nt, t) in query1.iter(world1) { println!( " Entity {:?}: NetworkedEntity({:?}), NetworkedTransform={}, Transform=({}, {}, {})", entity, ne.network_id, nt.is_some(), t.translation.x, t.translation.y, t.translation.z ); } } { let world2 = app2.world_mut(); let mut query2 = world2.query::<( Entity, &NetworkedEntity, Option<&NetworkedTransform>, &Transform, )>(); println!("Node 2 entities:"); for (entity, ne, nt, t) in query2.iter(world2) { println!( " Entity {:?}: NetworkedEntity({:?}), NetworkedTransform={}, Transform=({}, {}, {})", entity, ne.network_id, nt.is_some(), t.translation.x, t.translation.y, t.translation.z ); } } // Both nodes modify the same entity concurrently // Node 1 update (earlier timestamp) { let mut query1 = app1.world_mut().query::<&mut Transform>(); for mut transform in query1.iter_mut(app1.world_mut()) { println!("Node 1: Modifying Transform to (10, 10)"); transform.translation.x = 10.0; transform.translation.y = 10.0; } } app1.update(); // Generate and send delta // Small delay to ensure node 2's change has a later vector clock tokio::time::sleep(Duration::from_millis(100)).await; // Node 2 update (later timestamp, should win with LWW) { let mut query2 = app2.world_mut().query::<&mut Transform>(); let count = query2.iter(app2.world()).count(); println!("Node 2: Found {} entities with Transform", count); for mut transform in query2.iter_mut(app2.world_mut()) { println!("Node 2: Modifying Transform to (20, 20)"); transform.translation.x = 20.0; transform.translation.y = 20.0; } } app2.update(); // Generate and send delta println!("Both nodes modified the entity, waiting for convergence..."); // Wait for convergence - both nodes should have the same Transform value wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |w1, w2| { let mut query1 = w1.query::<&Transform>(); let mut query2 = w2.query::<&Transform>(); let transforms1: Vec<_> = query1.iter(w1).collect(); let transforms2: Vec<_> = query2.iter(w2).collect(); if transforms1.is_empty() || transforms2.is_empty() { return false; } let t1 = transforms1[0]; let t2 = transforms2[0]; // Check if they converged (within floating point tolerance) let converged = (t1.translation.x - t2.translation.x).abs() < 0.01 && (t1.translation.y - t2.translation.y).abs() < 0.01; if converged { println!( "✓ Nodes converged to: ({}, {})", t1.translation.x, t1.translation.y ); } converged }) .await?; println!("✓ Conflict resolution test passed (converged)"); router1.shutdown().await?; router2.shutdown().await?; ep1.close().await; ep2.close().await; Ok(()) } /// Test 4: Persistence crash recovery /// /// NOTE: This test is expected to initially fail as the persistence system /// doesn't currently have entity loading on startup. This test documents /// the gap and will drive future feature implementation. #[tokio::test(flavor = "multi_thread")] #[ignore = "Persistence loading not yet implemented - documents gap"] async fn test_persistence_crash_recovery() -> Result<()> { use test_utils::*; let ctx = TestContext::new(); let node_id = Uuid::new_v4(); let entity_id = Uuid::new_v4(); // Phase 1: Create entity, persist, "crash" { let bridge = GossipBridge::new(node_id); let mut app = create_test_app(node_id, ctx.db_path(), bridge); app.world_mut().spawn(( NetworkedEntity::with_id(entity_id, node_id), TestPosition { x: 100.0, y: 200.0 }, Persisted::with_id(entity_id), Synced, )); // Tick until flushed (2 seconds at 60 FPS) for _ in 0..120 { app.update(); tokio::time::sleep(Duration::from_millis(16)).await; } println!("Phase 1: Entity persisted, simulating crash..."); // App drops here (crash simulation) } // Phase 2: Restart app, verify state restored { let bridge = GossipBridge::new(node_id); let mut app = create_test_app(node_id, ctx.db_path(), bridge); // TODO: Need startup system to load entities from persistence // This is currently missing from the implementation app.update(); // Verify entity loaded from database assert_entity_synced( app.world_mut(), entity_id, TestPosition { x: 100.0, y: 200.0 }, ) .map_err(|e| anyhow::anyhow!("Persistence recovery failed: {}", e))?; println!("✓ Crash recovery test passed"); } Ok(()) } /// Test 5: Lock heartbeat renewal mechanism #[tokio::test(flavor = "multi_thread")] async fn test_lock_heartbeat_renewal() -> Result<()> { use test_utils::*; println!("=== Starting test_lock_heartbeat_renewal ==="); let ctx1 = TestContext::new(); let ctx2 = TestContext::new(); let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; let node1_id = bridge1.node_id(); let node2_id = bridge2.node_id(); let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1); let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); // Spawn entity let entity_id = Uuid::new_v4(); let _ = app1.world_mut() .spawn(( NetworkedEntity::with_id(entity_id, node1_id), TestPosition { x: 10.0, y: 20.0 }, Persisted::with_id(entity_id), Synced, )) .id(); wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |_, w2| { count_entities_with_id(w2, entity_id) > 0 }) .await?; println!("✓ Entity synced"); // Acquire lock on both nodes { let world = app1.world_mut(); let mut registry = world.resource_mut::(); registry.try_acquire(entity_id, node1_id).ok(); } { let bridge = app1.world().resource::(); let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRequest { entity_id, node_id: node1_id, })); bridge.send(msg).ok(); } for _ in 0..5 { app1.update(); app2.update(); tokio::time::sleep(Duration::from_millis(100)).await; } // Verify both nodes have the lock { let registry1 = app1.world().resource::(); let registry2 = app2.world().resource::(); assert!(registry1.is_locked(entity_id, node1_id), "Lock should exist on node 1"); assert!(registry2.is_locked(entity_id, node2_id), "Lock should exist on node 2"); println!("✓ Lock acquired on both nodes"); } // Test heartbeat renewal: send a few heartbeats and verify locks persist for i in 0..3 { // Renew on node 1 { let world = app1.world_mut(); let mut registry = world.resource_mut::(); assert!( registry.renew_heartbeat(entity_id, node1_id), "Should successfully renew lock" ); } // Send heartbeat to node 2 { let bridge = app1.world().resource::(); let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockHeartbeat { entity_id, holder: node1_id, })); bridge.send(msg).ok(); } // Process for _ in 0..3 { app1.update(); app2.update(); tokio::time::sleep(Duration::from_millis(50)).await; } // Verify locks still exist after heartbeat { let registry1 = app1.world().resource::(); let registry2 = app2.world().resource::(); assert!( registry1.is_locked(entity_id, node1_id), "Lock should persist on node 1 after heartbeat {}", i + 1 ); assert!( registry2.is_locked(entity_id, node2_id), "Lock should persist on node 2 after heartbeat {}", i + 1 ); } } println!("✓ Heartbeat renewal mechanism working correctly"); router1.shutdown().await?; router2.shutdown().await?; ep1.close().await; ep2.close().await; Ok(()) } /// Test 6: Lock expires without heartbeats #[tokio::test(flavor = "multi_thread")] async fn test_lock_heartbeat_expiration() -> Result<()> { use test_utils::*; println!("=== Starting test_lock_heartbeat_expiration ==="); let ctx1 = TestContext::new(); let ctx2 = TestContext::new(); let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; let node1_id = bridge1.node_id(); let node2_id = bridge2.node_id(); let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1); let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); // Node 1 spawns entity and selects it let entity_id = Uuid::new_v4(); let _ = app1.world_mut() .spawn(( NetworkedEntity::with_id(entity_id, node1_id), TestPosition { x: 10.0, y: 20.0 }, Persisted::with_id(entity_id), Synced, )) .id(); // Wait for sync wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_, w2| { count_entities_with_id(w2, entity_id) > 0 }) .await?; // Acquire lock locally on node 1 (gossip doesn't loop back to sender) { let world = app1.world_mut(); let mut registry = world.resource_mut::(); registry.try_acquire(entity_id, node1_id).ok(); } // Broadcast LockRequest so other nodes apply optimistically { let bridge = app1.world().resource::(); let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRequest { entity_id, node_id: node1_id, })); bridge.send(msg).ok(); } // Update to allow lock propagation for _ in 0..10 { update_with_fixed(&mut app1); update_with_fixed(&mut app2); tokio::time::sleep(Duration::from_millis(100)).await; } // Verify lock acquired wait_for_sync(&mut app1, &mut app2, Duration::from_secs(2), |_, w2| { let registry2 = w2.resource::(); registry2.is_locked(entity_id, node2_id) }) .await?; println!("✓ Lock acquired and propagated"); // Simulate node 1 crash: remove lock from node 1's registry without sending release // This stops heartbeat broadcasts from node 1 { let mut registry = app1.world_mut().resource_mut::(); registry.force_release(entity_id); println!("✓ Simulated node 1 crash (stopped heartbeats)"); } // Force the lock to expire on node 2 (simulating 5+ seconds passing without heartbeats) { let mut registry = app2.world_mut().resource_mut::(); registry.expire_lock_for_testing(entity_id); println!("✓ Forced lock to appear expired on node 2"); } // Run cleanup system (which removes expired locks and broadcasts LockReleased) println!("Running cleanup to expire locks..."); for _ in 0..10 { update_with_fixed(&mut app2); tokio::time::sleep(Duration::from_millis(100)).await; } // Verify lock was removed from node 2 { let registry = app2.world().resource::(); assert!( !registry.is_locked(entity_id, node2_id), "Lock should be expired on node 2 after cleanup" ); println!("✓ Lock expired on node 2 after 5 seconds without heartbeat"); } println!("✓ Lock heartbeat expiration test passed"); router1.shutdown().await?; router2.shutdown().await?; ep1.close().await; ep2.close().await; Ok(()) } /// Test 7: Lock release stops heartbeats #[tokio::test(flavor = "multi_thread")] async fn test_lock_release_stops_heartbeats() -> Result<()> { use test_utils::*; println!("=== Starting test_lock_release_stops_heartbeats ==="); let ctx1 = TestContext::new(); let ctx2 = TestContext::new(); let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; let node1_id = bridge1.node_id(); let node2_id = bridge2.node_id(); let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1); let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); // Node 1 spawns entity and selects it let entity_id = Uuid::new_v4(); let _ = app1.world_mut() .spawn(( NetworkedEntity::with_id(entity_id, node1_id), TestPosition { x: 10.0, y: 20.0 }, Persisted::with_id(entity_id), Synced, )) .id(); // Wait for sync wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_, w2| { count_entities_with_id(w2, entity_id) > 0 }) .await?; // Acquire lock locally on node 1 (gossip doesn't loop back to sender) { let world = app1.world_mut(); let mut registry = world.resource_mut::(); registry.try_acquire(entity_id, node1_id).ok(); } // Broadcast LockRequest so other nodes apply optimistically { let bridge = app1.world().resource::(); let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRequest { entity_id, node_id: node1_id, })); bridge.send(msg).ok(); } // Update to allow lock propagation for _ in 0..10 { app1.update(); app2.update(); tokio::time::sleep(Duration::from_millis(100)).await; } // Wait for lock to propagate wait_for_sync(&mut app1, &mut app2, Duration::from_secs(2), |_, w2| { let registry2 = w2.resource::(); registry2.is_locked(entity_id, node2_id) }) .await?; println!("✓ Lock acquired and propagated"); // Release lock on node 1 { let world = app1.world_mut(); let mut registry = world.resource_mut::(); if registry.release(entity_id, node1_id) { println!("✓ Lock released on node 1"); } } // Broadcast LockRelease message to other nodes { let bridge = app1.world().resource::(); let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRelease { entity_id, node_id: node1_id, })); bridge.send(msg).ok(); } // Update to trigger lock release propagation for _ in 0..10 { app1.update(); app2.update(); tokio::time::sleep(Duration::from_millis(100)).await; } // Wait for release to propagate to node 2 wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |_, w2| { let registry2 = w2.resource::(); !registry2.is_locked(entity_id, node2_id) }) .await?; println!("✓ Lock release propagated to node 2"); println!("✓ Lock release stops heartbeats test passed"); router1.shutdown().await?; router2.shutdown().await?; ep1.close().await; ep2.close().await; Ok(()) } /// Test 8: Offline-to-online sync (operations work offline and sync when online) /// /// This test verifies the offline-first CRDT architecture: /// - Spawning entities offline increments vector clock and logs operations /// - Modifying entities offline increments vector clock and logs operations /// - Deleting entities offline increments vector clock and records tombstones /// - When networking starts, all offline operations sync to peers /// - Peers correctly apply all operations (spawns, updates, deletes) /// - Tombstones prevent resurrection of deleted entities #[tokio::test(flavor = "multi_thread")] async fn test_offline_to_online_sync() -> Result<()> { use test_utils::*; use libmarathon::networking::{NodeVectorClock, OperationLog, TombstoneRegistry, ToDelete}; println!("=== Starting test_offline_to_online_sync ==="); let ctx1 = TestContext::new(); let ctx2 = TestContext::new(); // Setup gossip networking FIRST to get the bridge node IDs println!("Setting up gossip pair..."); let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; let node1_id = bridge1.node_id(); let node2_id = bridge2.node_id(); println!("Node 1 ID (from bridge): {}", node1_id); println!("Node 2 ID (from bridge): {}", node2_id); // Phase 1: Create app1 in OFFLINE mode (no GossipBridge inserted yet) // Important: Use the bridge's node_id so operations are recorded with the right ID println!("\n--- Phase 1: Offline Operations on Node 1 ---"); let mut app1 = { let mut app = App::new(); app.add_plugins(MinimalPlugins.set(ScheduleRunnerPlugin::run_loop( Duration::from_secs_f64(1.0 / 60.0), ))) .add_plugins(NetworkingPlugin::new(NetworkingConfig { node_id: node1_id, // Use bridge's node_id! sync_interval_secs: 0.5, prune_interval_secs: 10.0, tombstone_gc_interval_secs: 30.0, })) .add_plugins(PersistencePlugin::with_config( ctx1.db_path(), PersistenceConfig { flush_interval_secs: 1, checkpoint_interval_secs: 5, battery_adaptive: false, ..Default::default() }, )) .register_type::() .register_type::(); // NOTE: NO GossipBridge inserted yet - this is offline mode! println!("✓ Created app1 in OFFLINE mode (no GossipBridge, but using bridge's node_id)"); app }; // Spawn entity A offline let entity_a = Uuid::new_v4(); println!("\nSpawning entity A ({}) OFFLINE", entity_a); let entity_a_bevy = app1 .world_mut() .spawn(( NetworkedEntity::with_id(entity_a, node1_id), TestPosition { x: 10.0, y: 20.0 }, NetworkedTransform::default(), Transform::from_xyz(10.0, 20.0, 0.0), Persisted::with_id(entity_a), Synced, )) .id(); // Trigger change detection { let world = app1.world_mut(); if let Ok(mut entity_mut) = world.get_entity_mut(entity_a_bevy) { if let Some(mut persisted) = entity_mut.get_mut::() { let _ = &mut *persisted; } } } // Update to trigger delta generation (offline) update_with_fixed(&mut app1); tokio::time::sleep(Duration::from_millis(50)).await; // Verify clock incremented for spawn let clock_after_spawn = { let clock = app1.world().resource::(); let seq = clock.clock.timestamps.get(&node1_id).copied().unwrap_or(0); println!("✓ Vector clock after spawn: {}", seq); assert!(seq > 0, "Clock should have incremented after spawn"); seq }; // Spawn entity B offline let entity_b = Uuid::new_v4(); println!("\nSpawning entity B ({}) OFFLINE", entity_b); let entity_b_bevy = app1 .world_mut() .spawn(( NetworkedEntity::with_id(entity_b, node1_id), TestPosition { x: 30.0, y: 40.0 }, NetworkedTransform::default(), Transform::from_xyz(30.0, 40.0, 0.0), Persisted::with_id(entity_b), Synced, )) .id(); // Trigger change detection { let world = app1.world_mut(); if let Ok(mut entity_mut) = world.get_entity_mut(entity_b_bevy) { if let Some(mut persisted) = entity_mut.get_mut::() { let _ = &mut *persisted; } } } update_with_fixed(&mut app1); tokio::time::sleep(Duration::from_millis(50)).await; let clock_after_second_spawn = { let clock = app1.world().resource::(); let seq = clock.clock.timestamps.get(&node1_id).copied().unwrap_or(0); println!("✓ Vector clock after second spawn: {}", seq); assert!(seq > clock_after_spawn, "Clock should have incremented again"); seq }; // Modify entity A offline (change Transform) println!("\nModifying entity A Transform OFFLINE"); { let world = app1.world_mut(); if let Ok(mut entity_mut) = world.get_entity_mut(entity_a_bevy) { if let Some(mut transform) = entity_mut.get_mut::() { transform.translation.x = 15.0; transform.translation.y = 25.0; } } } update_with_fixed(&mut app1); tokio::time::sleep(Duration::from_millis(50)).await; let clock_after_modify = { let clock = app1.world().resource::(); let seq = clock.clock.timestamps.get(&node1_id).copied().unwrap_or(0); println!("✓ Vector clock after modify: {}", seq); assert!(seq > clock_after_second_spawn, "Clock should have incremented after modification"); seq }; // Delete entity B offline println!("\nDeleting entity B OFFLINE"); { let mut commands = app1.world_mut().commands(); commands.entity(entity_b_bevy).insert(ToDelete); } update_with_fixed(&mut app1); tokio::time::sleep(Duration::from_millis(50)).await; let clock_after_delete = { let clock = app1.world().resource::(); let seq = clock.clock.timestamps.get(&node1_id).copied().unwrap_or(0); println!("✓ Vector clock after delete: {}", seq); assert!(seq > clock_after_modify, "Clock should have incremented after deletion"); seq }; // Verify entity B is deleted locally { let count = count_entities_with_id(app1.world_mut(), entity_b); assert_eq!(count, 0, "Entity B should be deleted locally"); println!("✓ Entity B deleted locally"); } // Verify tombstone recorded for entity B { let tombstones = app1.world().resource::(); assert!(tombstones.is_deleted(entity_b), "Tombstone should be recorded for entity B"); println!("✓ Tombstone recorded for entity B"); } // Verify operation log has entries { let op_log = app1.world().resource::(); let op_count = op_log.total_operations(); println!("✓ Operation log has {} operations recorded offline", op_count); assert!(op_count >= 4, "Should have operations for: spawn A, spawn B, modify A, delete B"); } println!("\n--- Phase 2: Bringing Node 1 Online ---"); // Insert GossipBridge into app1 (going online!) app1.world_mut().insert_resource(bridge1); println!("✓ Inserted GossipBridge into app1 - NOW ONLINE"); println!(" Node 1 ID: {} (matches bridge from start)", node1_id); // Create app2 online from the start println!("\n--- Phase 3: Creating Node 2 Online ---"); let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); println!("✓ Created app2 ONLINE with node_id: {}", node2_id); // Phase 3: Wait for sync println!("\n--- Phase 4: Waiting for Sync ---"); println!("Expected to sync:"); println!(" - Entity A (spawned and modified offline)"); println!(" - Entity B tombstone (deleted offline)"); // Wait for entity A to sync to app2 wait_for_sync(&mut app1, &mut app2, Duration::from_secs(10), |_, w2| { let count = count_entities_with_id(w2, entity_a); if count > 0 { println!("✓ Entity A found on node 2!"); true } else { false } }) .await?; // Wait a bit more for tombstone to sync for _ in 0..20 { update_with_fixed(&mut app1); update_with_fixed(&mut app2); tokio::time::sleep(Duration::from_millis(100)).await; } println!("\n--- Phase 5: Verification ---"); // Verify entity A synced with MODIFIED transform { let mut query = app2.world_mut().query::<(&NetworkedEntity, &Transform)>(); let mut found = false; for (ne, transform) in query.iter(app2.world()) { if ne.network_id == entity_a { found = true; println!("✓ Entity A found on node 2"); println!(" Transform: ({}, {}, {})", transform.translation.x, transform.translation.y, transform.translation.z ); // Verify it has the MODIFIED position, not the original assert!( (transform.translation.x - 15.0).abs() < 0.1, "Entity A should have modified X position (15.0)" ); assert!( (transform.translation.y - 25.0).abs() < 0.1, "Entity A should have modified Y position (25.0)" ); println!("✓ Entity A has correct modified transform"); break; } } assert!(found, "Entity A should exist on node 2"); } // Verify entity B does NOT exist on node 2 (was deleted offline) { let count = count_entities_with_id(app2.world_mut(), entity_b); assert_eq!(count, 0, "Entity B should NOT exist on node 2 (deleted offline)"); println!("✓ Entity B correctly does not exist on node 2"); } // Verify tombstone for entity B exists on node 2 { let tombstones = app2.world().resource::(); assert!( tombstones.is_deleted(entity_b), "Tombstone for entity B should have synced to node 2" ); println!("✓ Tombstone for entity B synced to node 2"); } // Verify final vector clocks are consistent { let clock1 = app1.world().resource::(); let clock2 = app2.world().resource::(); let node1_seq_on_app1 = clock1.clock.timestamps.get(&node1_id).copied().unwrap_or(0); let node1_seq_on_app2 = clock2.clock.timestamps.get(&node1_id).copied().unwrap_or(0); println!("Final vector clocks:"); println!(" Node 1 clock on app1: {}", node1_seq_on_app1); println!(" Node 1 clock on app2: {}", node1_seq_on_app2); // Clock should be clock_after_delete + 1 because sending the SyncRequest increments it assert_eq!( node1_seq_on_app1, clock_after_delete + 1, "Node 1's clock should be offline state + 1 (for SyncRequest)" ); // Node 2 should have learned about node 1's clock through sync assert_eq!( node1_seq_on_app2, node1_seq_on_app1, "Node 2 should have synced node 1's clock" ); println!("✓ Vector clocks verified"); } println!("\n✓ OFFLINE-TO-ONLINE SYNC TEST PASSED!"); println!("Summary:"); println!(" - Spawned 2 entities offline (clock incremented)"); println!(" - Modified 1 entity offline (clock incremented)"); println!(" - Deleted 1 entity offline (clock incremented, tombstone recorded)"); println!(" - Went online and synced to peer"); println!(" - Peer received all operations correctly"); println!(" - Tombstone prevented deleted entity resurrection"); // Cleanup router1.shutdown().await?; router2.shutdown().await?; ep1.close().await; ep2.close().await; Ok(()) } /// Test 12: Lock re-acquisition cycle (acquire → release → re-acquire) /// /// This test verifies that locks can be acquired, released, and then re-acquired multiple times. /// This is critical for normal editing workflows where users repeatedly select/deselect entities. #[tokio::test(flavor = "multi_thread")] async fn test_lock_reacquisition_cycle() -> Result<()> { use test_utils::*; println!("\n=== Starting test_lock_reacquisition_cycle ==="); println!("Testing: acquire → release → re-acquire → release → re-acquire"); let ctx1 = TestContext::new(); let ctx2 = TestContext::new(); let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?; let node1_id = bridge1.node_id(); let node2_id = bridge2.node_id(); println!("Node 1 ID: {}", node1_id); println!("Node 2 ID: {}", node2_id); let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1.clone()); let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2); // === PHASE 1: Spawn entity === println!("\nPHASE 1: Spawning entity on Node 1"); let entity_id = Uuid::new_v4(); app1.world_mut().spawn(( NetworkedEntity::with_id(entity_id, node1_id), TestPosition { x: 10.0, y: 20.0 }, Persisted::with_id(entity_id), Synced, )); // Wait for entity to replicate wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_, w2| { count_entities_with_id(w2, entity_id) > 0 }) .await?; println!("✓ Entity replicated to both nodes"); // === PHASE 2: First lock acquisition === println!("\nPHASE 2: Node 1 acquires lock (FIRST time)"); // Update LocalSelection to select the entity { let mut selection = app1.world_mut().resource_mut::(); selection.clear(); selection.insert(entity_id); println!(" Updated LocalSelection to select entity"); } // Wait for lock to propagate wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |w1, w2| { let lock1 = w1.resource::(); let lock2 = w2.resource::(); lock1.is_locked_by(entity_id, node1_id, node1_id) && lock2.is_locked_by(entity_id, node1_id, node2_id) }) .await?; { let lock1 = app1.world().resource::(); let lock2 = app2.world().resource::(); assert!( lock1.is_locked_by(entity_id, node1_id, node1_id), "Node 1 should hold lock (first acquisition)" ); assert!( lock2.is_locked_by(entity_id, node1_id, node2_id), "Node 2 should see Node 1 holding lock (first acquisition)" ); } println!("✓ FIRST lock acquisition successful"); // === PHASE 3: First lock release === println!("\nPHASE 3: Node 1 releases lock (FIRST time)"); { let mut selection = app1.world_mut().resource_mut::(); selection.clear(); println!(" Cleared LocalSelection"); } // Wait for lock release to propagate wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |w1, w2| { let lock1 = w1.resource::(); let lock2 = w2.resource::(); !lock1.is_locked(entity_id, node1_id) && !lock2.is_locked(entity_id, node2_id) }) .await?; { let lock1 = app1.world().resource::(); let lock2 = app2.world().resource::(); assert!( !lock1.is_locked(entity_id, node1_id), "Node 1 should have released lock" ); assert!( !lock2.is_locked(entity_id, node2_id), "Node 2 should see lock released" ); } println!("✓ FIRST lock release successful"); // === PHASE 4: SECOND lock acquisition (THE CRITICAL TEST) === println!("\nPHASE 4: Node 1 acquires lock (SECOND time) - THIS IS THE BUG"); { let mut selection = app1.world_mut().resource_mut::(); selection.clear(); selection.insert(entity_id); println!(" Updated LocalSelection to select entity (again)"); } // Wait for lock to propagate println!(" Waiting for lock to propagate..."); for i in 0..30 { app1.update(); app2.update(); tokio::time::sleep(Duration::from_millis(100)).await; if i % 5 == 0 { let lock1 = app1.world().resource::(); let lock2 = app2.world().resource::(); println!( " [{}] Node 1: locked_by_me={}, locked={}", i, lock1.is_locked_by(entity_id, node1_id, node1_id), lock1.is_locked(entity_id, node1_id) ); println!( " [{}] Node 2: locked_by_node1={}, locked={}", i, lock2.is_locked_by(entity_id, node1_id, node2_id), lock2.is_locked(entity_id, node2_id) ); } } { let lock1 = app1.world().resource::(); let lock2 = app2.world().resource::(); assert!( lock1.is_locked_by(entity_id, node1_id, node1_id), "Node 1 should hold lock (SECOND acquisition) - THIS IS WHERE THE BUG MANIFESTS" ); assert!( lock2.is_locked_by(entity_id, node1_id, node2_id), "Node 2 should see Node 1 holding lock (SECOND acquisition) - THIS IS WHERE THE BUG MANIFESTS" ); } println!("✓ SECOND lock acquisition successful!"); // === PHASE 5: Second lock release === println!("\nPHASE 5: Node 1 releases lock (SECOND time)"); { let mut selection = app1.world_mut().resource_mut::(); selection.clear(); println!(" Cleared LocalSelection"); } // Wait for lock release to propagate wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |w1, w2| { let lock1 = w1.resource::(); let lock2 = w2.resource::(); !lock1.is_locked(entity_id, node1_id) && !lock2.is_locked(entity_id, node2_id) }) .await?; println!("✓ SECOND lock release successful"); // === PHASE 6: THIRD lock acquisition (verify pattern continues) === println!("\nPHASE 6: Node 1 acquires lock (THIRD time) - verifying pattern"); { let mut selection = app1.world_mut().resource_mut::(); selection.clear(); selection.insert(entity_id); println!(" Updated LocalSelection to select entity (third time)"); } // Wait for lock to propagate wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |w1, w2| { let lock1 = w1.resource::(); let lock2 = w2.resource::(); lock1.is_locked_by(entity_id, node1_id, node1_id) && lock2.is_locked_by(entity_id, node1_id, node2_id) }) .await?; { let lock1 = app1.world().resource::(); let lock2 = app2.world().resource::(); assert!( lock1.is_locked_by(entity_id, node1_id, node1_id), "Node 1 should hold lock (THIRD acquisition)" ); assert!( lock2.is_locked_by(entity_id, node1_id, node2_id), "Node 2 should see Node 1 holding lock (THIRD acquisition)" ); } println!("✓ THIRD lock acquisition successful!"); println!("\n✓ LOCK RE-ACQUISITION CYCLE TEST PASSED!"); println!("Summary:"); println!(" - First acquisition: ✓"); println!(" - First release: ✓"); println!(" - SECOND acquisition: ✓ (this was failing before)"); println!(" - Second release: ✓"); println!(" - THIRD acquisition: ✓"); // Cleanup router1.shutdown().await?; router2.shutdown().await?; ep1.close().await; ep2.close().await; Ok(()) }