code review results

Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
2025-12-11 18:39:57 +00:00
parent 2847e7236f
commit 1504807afe
40 changed files with 2600 additions and 678 deletions

View File

@@ -5,12 +5,19 @@
use std::{
path::PathBuf,
time::{Duration, Instant},
time::{
Duration,
Instant,
},
};
use anyhow::Result;
use bevy::{
app::{App, ScheduleRunnerPlugin},
MinimalPlugins,
app::{
App,
ScheduleRunnerPlugin,
},
ecs::{
component::Component,
reflect::ReflectComponent,
@@ -18,23 +25,40 @@ use bevy::{
},
prelude::*,
reflect::Reflect,
MinimalPlugins,
};
use futures_lite::StreamExt;
use iroh::{Endpoint, protocol::Router};
use iroh::{
Endpoint,
protocol::Router,
};
use iroh_gossip::{
api::{GossipReceiver, GossipSender},
api::{
GossipReceiver,
GossipSender,
},
net::Gossip,
proto::TopicId,
};
use lib::{
networking::{
GossipBridge, NetworkedEntity, NetworkedTransform, NetworkingConfig, NetworkingPlugin,
Synced, VersionedMessage,
GossipBridge,
NetworkedEntity,
NetworkedTransform,
NetworkingConfig,
NetworkingPlugin,
Synced,
VersionedMessage,
},
persistence::{
Persisted,
PersistenceConfig,
PersistencePlugin,
},
persistence::{PersistenceConfig, PersistencePlugin, Persisted},
};
use serde::{Deserialize, Serialize};
use serde::{
Deserialize,
Serialize,
};
use sync_macros::Synced as SyncedDerive;
use tempfile::TempDir;
use uuid::Uuid;
@@ -68,8 +92,12 @@ struct TestHealth {
// ============================================================================
mod test_utils {
use rusqlite::{
Connection,
OptionalExtension,
};
use super::*;
use rusqlite::{Connection, OptionalExtension};
/// Test context that manages temporary directories with RAII cleanup
pub struct TestContext {
@@ -134,12 +162,11 @@ mod test_utils {
let conn = Connection::open(db_path)?;
let entity_id_bytes = entity_id.as_bytes();
let data_result: std::result::Result<Vec<u8>, rusqlite::Error> = conn
.query_row(
"SELECT data FROM components WHERE entity_id = ?1 AND component_type = ?2",
rusqlite::params![entity_id_bytes.as_slice(), component_type],
|row| row.get(0),
);
let data_result: std::result::Result<Vec<u8>, rusqlite::Error> = conn.query_row(
"SELECT data FROM components WHERE entity_id = ?1 AND component_type = ?2",
rusqlite::params![entity_id_bytes.as_slice(), component_type],
|row| row.get(0),
);
let data = data_result.optional()?;
@@ -161,11 +188,9 @@ mod test_utils {
pub fn create_test_app(node_id: Uuid, db_path: PathBuf, bridge: GossipBridge) -> App {
let mut app = App::new();
app.add_plugins(
MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(Duration::from_secs_f64(
1.0 / 60.0,
))),
)
app.add_plugins(MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(
Duration::from_secs_f64(1.0 / 60.0),
)))
.insert_resource(bridge)
.add_plugins(NetworkingPlugin::new(NetworkingConfig {
node_id,
@@ -233,8 +258,7 @@ mod test_utils {
check_fn: F,
) -> Result<()>
where
F: Fn(&mut World, &mut World) -> bool,
{
F: Fn(&mut World, &mut World) -> bool, {
let start = Instant::now();
let mut tick_count = 0;
@@ -245,12 +269,20 @@ mod test_utils {
tick_count += 1;
if tick_count % 50 == 0 {
println!("Waiting for sync... tick {} ({:.1}s elapsed)", tick_count, start.elapsed().as_secs_f32());
println!(
"Waiting for sync... tick {} ({:.1}s elapsed)",
tick_count,
start.elapsed().as_secs_f32()
);
}
// Check condition
if check_fn(app1.world_mut(), app2.world_mut()) {
println!("Sync completed after {} ticks ({:.3}s)", tick_count, start.elapsed().as_secs_f32());
println!(
"Sync completed after {} ticks ({:.3}s)",
tick_count,
start.elapsed().as_secs_f32()
);
return Ok(());
}
@@ -305,19 +337,27 @@ mod test_utils {
static_provider.add_endpoint_info(addr.clone());
}
endpoint.discovery().add(static_provider);
println!(" Added {} bootstrap peers to static discovery", bootstrap_count);
println!(
" Added {} bootstrap peers to static discovery",
bootstrap_count
);
// Explicitly connect to bootstrap peers
println!(" Connecting to bootstrap peers...");
for addr in &bootstrap_addrs {
match endpoint.connect(addr.clone(), iroh_gossip::ALPN).await {
Ok(_conn) => println!(" ✓ Connected to bootstrap peer: {}", addr.id),
Err(e) => println!(" ✗ Failed to connect to bootstrap peer {}: {}", addr.id, e),
| Ok(_conn) => println!(" ✓ Connected to bootstrap peer: {}", addr.id),
| Err(e) => {
println!(" ✗ Failed to connect to bootstrap peer {}: {}", addr.id, e)
},
}
}
}
println!(" Subscribing to topic with {} bootstrap peers...", bootstrap_count);
println!(
" Subscribing to topic with {} bootstrap peers...",
bootstrap_count
);
// Subscribe to the topic (the IDs now have addresses via discovery)
let subscribe_handle = gossip.subscribe(topic_id, bootstrap_ids).await?;
@@ -332,9 +372,11 @@ mod test_utils {
println!(" Waiting for join to complete (with timeout)...");
// Use a timeout in case mDNS discovery takes a while or fails
match tokio::time::timeout(Duration::from_secs(3), receiver.joined()).await {
Ok(Ok(())) => println!(" Join completed!"),
Ok(Err(e)) => println!(" Join error: {}", e),
Err(_) => println!(" Join timeout - proceeding anyway (mDNS may still connect later)"),
| Ok(Ok(())) => println!(" Join completed!"),
| Ok(Err(e)) => println!(" Join error: {}", e),
| Err(_) => {
println!(" Join timeout - proceeding anyway (mDNS may still connect later)")
},
}
} else {
println!(" No bootstrap peers - skipping join wait (first node in swarm)");
@@ -352,8 +394,7 @@ mod test_utils {
}
/// Setup a pair of iroh-gossip nodes connected to the same topic
pub async fn setup_gossip_pair(
) -> Result<(
pub async fn setup_gossip_pair() -> Result<(
Endpoint,
Endpoint,
Router,
@@ -370,13 +411,15 @@ mod test_utils {
let (ep1, _gossip1, router1, bridge1) = init_gossip_node(topic_id, vec![]).await?;
println!("Node 1 initialized with ID: {}", ep1.addr().id);
// Get node 1's full address (ID + network addresses) for node 2 to bootstrap from
// Get node 1's full address (ID + network addresses) for node 2 to bootstrap
// from
let node1_addr = ep1.addr().clone();
println!("Node 1 full address: {:?}", node1_addr);
// Initialize node 2 with node 1's full address as bootstrap peer
println!("Initializing node 2 with bootstrap peer: {}", node1_addr.id);
let (ep2, _gossip2, router2, bridge2) = init_gossip_node(topic_id, vec![node1_addr]).await?;
let (ep2, _gossip2, router2, bridge2) =
init_gossip_node(topic_id, vec![node1_addr]).await?;
println!("Node 2 initialized with ID: {}", ep2.addr().id);
// Give mDNS and gossip time to discover peers
@@ -387,7 +430,8 @@ mod test_utils {
Ok((ep1, ep2, router1, router2, bridge1, bridge2))
}
/// Spawn background tasks to forward messages between iroh-gossip and GossipBridge
/// Spawn background tasks to forward messages between iroh-gossip and
/// GossipBridge
fn spawn_gossip_bridge_tasks(
sender: GossipSender,
mut receiver: GossipReceiver,
@@ -403,18 +447,27 @@ mod test_utils {
// Poll the bridge's outgoing queue
if let Some(versioned_msg) = bridge_out.try_recv_outgoing() {
msg_count += 1;
println!("[Node {}] Sending message #{} via gossip", node_id, msg_count);
println!(
"[Node {}] Sending message #{} via gossip",
node_id, msg_count
);
// Serialize the message
match bincode::serialize(&versioned_msg) {
Ok(bytes) => {
| Ok(bytes) => {
// Broadcast via gossip
if let Err(e) = sender.broadcast(bytes.into()).await {
eprintln!("[Node {}] Failed to broadcast message: {}", node_id, e);
} else {
println!("[Node {}] Message #{} broadcasted successfully", node_id, msg_count);
println!(
"[Node {}] Message #{} broadcasted successfully",
node_id, msg_count
);
}
}
Err(e) => eprintln!("[Node {}] Failed to serialize message for broadcast: {}", node_id, e),
},
| Err(e) => eprintln!(
"[Node {}] Failed to serialize message for broadcast: {}",
node_id, e
),
}
}
@@ -431,34 +484,52 @@ mod test_utils {
loop {
// Receive from gossip (GossipReceiver is a Stream)
match tokio::time::timeout(Duration::from_millis(100), receiver.next()).await {
Ok(Some(Ok(event))) => {
println!("[Node {}] Received gossip event: {:?}", node_id, std::mem::discriminant(&event));
| Ok(Some(Ok(event))) => {
println!(
"[Node {}] Received gossip event: {:?}",
node_id,
std::mem::discriminant(&event)
);
if let iroh_gossip::api::Event::Received(msg) = event {
msg_count += 1;
println!("[Node {}] Received message #{} from gossip", node_id, msg_count);
println!(
"[Node {}] Received message #{} from gossip",
node_id, msg_count
);
// Deserialize the message
match bincode::deserialize::<VersionedMessage>(&msg.content) {
Ok(versioned_msg) => {
| Ok(versioned_msg) => {
// Push to bridge's incoming queue
if let Err(e) = bridge_in.push_incoming(versioned_msg) {
eprintln!("[Node {}] Failed to push to bridge incoming: {}", node_id, e);
eprintln!(
"[Node {}] Failed to push to bridge incoming: {}",
node_id, e
);
} else {
println!("[Node {}] Message #{} pushed to bridge incoming", node_id, msg_count);
println!(
"[Node {}] Message #{} pushed to bridge incoming",
node_id, msg_count
);
}
}
Err(e) => eprintln!("[Node {}] Failed to deserialize gossip message: {}", node_id, e),
},
| Err(e) => eprintln!(
"[Node {}] Failed to deserialize gossip message: {}",
node_id, e
),
}
}
}
Ok(Some(Err(e))) => eprintln!("[Node {}] Gossip receiver error: {}", node_id, e),
Ok(None) => {
},
| Ok(Some(Err(e))) => {
eprintln!("[Node {}] Gossip receiver error: {}", node_id, e)
},
| Ok(None) => {
// Stream ended
println!("[Node {}] Gossip stream ended", node_id);
break;
}
Err(_) => {
},
| Err(_) => {
// Timeout, no message available
}
},
}
}
});
@@ -500,12 +571,15 @@ async fn test_basic_entity_sync() -> Result<()> {
// Node 1 spawns entity
let entity_id = Uuid::new_v4();
println!("Spawning entity {} on node 1", entity_id);
let spawned_entity = app1.world_mut().spawn((
NetworkedEntity::with_id(entity_id, node1_id),
TestPosition { x: 10.0, y: 20.0 },
Persisted::with_id(entity_id),
Synced,
)).id();
let spawned_entity = app1
.world_mut()
.spawn((
NetworkedEntity::with_id(entity_id, node1_id),
TestPosition { x: 10.0, y: 20.0 },
Persisted::with_id(entity_id),
Synced,
))
.id();
// IMPORTANT: Trigger change detection for persistence
// Bevy only marks components as "changed" when mutated, not on spawn
@@ -536,7 +610,11 @@ async fn test_basic_entity_sync() -> Result<()> {
query.iter(w2).map(|ne| ne.network_id).collect()
};
if !all_networked.is_empty() {
println!(" Node 2 has {} networked entities: {:?}", all_networked.len(), all_networked);
println!(
" Node 2 has {} networked entities: {:?}",
all_networked.len(),
all_networked
);
println!(" Looking for: {}", entity_id);
}
false
@@ -565,7 +643,11 @@ async fn test_basic_entity_sync() -> Result<()> {
}
// Verify entity synced to node 2 (in-memory check)
assert_entity_synced(app2.world_mut(), entity_id, TestPosition { x: 10.0, y: 20.0 })?;
assert_entity_synced(
app2.world_mut(),
entity_id,
TestPosition { x: 10.0, y: 20.0 },
)?;
println!("✓ Entity synced in-memory on node 2");
// Give persistence system time to flush to disk
@@ -586,7 +668,11 @@ async fn test_basic_entity_sync() -> Result<()> {
);
assert!(
component_exists_in_db(&ctx1.db_path(), entity_id, "sync_integration_headless::TestPosition")?,
component_exists_in_db(
&ctx1.db_path(),
entity_id,
"sync_integration_headless::TestPosition"
)?,
"TestPosition component should exist in Node 1 database"
);
@@ -616,7 +702,11 @@ async fn test_basic_entity_sync() -> Result<()> {
);
assert!(
component_exists_in_db(&ctx2.db_path(), entity_id, "sync_integration_headless::TestPosition")?,
component_exists_in_db(
&ctx2.db_path(),
entity_id,
"sync_integration_headless::TestPosition"
)?,
"TestPosition component should exist in Node 2 database after sync"
);
@@ -666,12 +756,15 @@ async fn test_bidirectional_sync() -> Result<()> {
// Node 1 spawns entity A
let entity_a = Uuid::new_v4();
let entity_a_bevy = app1.world_mut().spawn((
NetworkedEntity::with_id(entity_a, node1_id),
TestPosition { x: 1.0, y: 2.0 },
Persisted::with_id(entity_a),
Synced,
)).id();
let entity_a_bevy = app1
.world_mut()
.spawn((
NetworkedEntity::with_id(entity_a, node1_id),
TestPosition { x: 1.0, y: 2.0 },
Persisted::with_id(entity_a),
Synced,
))
.id();
// Trigger persistence for entity A
{
@@ -685,12 +778,15 @@ async fn test_bidirectional_sync() -> Result<()> {
// Node 2 spawns entity B
let entity_b = Uuid::new_v4();
let entity_b_bevy = app2.world_mut().spawn((
NetworkedEntity::with_id(entity_b, node2_id),
TestPosition { x: 3.0, y: 4.0 },
Persisted::with_id(entity_b),
Synced,
)).id();
let entity_b_bevy = app2
.world_mut()
.spawn((
NetworkedEntity::with_id(entity_b, node2_id),
TestPosition { x: 3.0, y: 4.0 },
Persisted::with_id(entity_b),
Synced,
))
.id();
// Trigger persistence for entity B
{
@@ -709,10 +805,8 @@ async fn test_bidirectional_sync() -> Result<()> {
.await?;
// Verify both nodes have both entities
assert_entity_synced(app1.world_mut(), entity_b, TestPosition { x: 3.0, y: 4.0 })
?;
assert_entity_synced(app2.world_mut(), entity_a, TestPosition { x: 1.0, y: 2.0 })
?;
assert_entity_synced(app1.world_mut(), entity_b, TestPosition { x: 3.0, y: 4.0 })?;
assert_entity_synced(app2.world_mut(), entity_a, TestPosition { x: 1.0, y: 2.0 })?;
println!("✓ Bidirectional sync test passed");
@@ -742,13 +836,16 @@ async fn test_concurrent_conflict_resolution() -> Result<()> {
// Spawn shared entity on node 1 with Transform (which IS tracked for changes)
let entity_id = Uuid::new_v4();
let entity_bevy = app1.world_mut().spawn((
NetworkedEntity::with_id(entity_id, node1_id),
NetworkedTransform::default(),
Transform::from_xyz(0.0, 0.0, 0.0),
Persisted::with_id(entity_id),
Synced,
)).id();
let entity_bevy = app1
.world_mut()
.spawn((
NetworkedEntity::with_id(entity_id, node1_id),
NetworkedTransform::default(),
Transform::from_xyz(0.0, 0.0, 0.0),
Persisted::with_id(entity_id),
Synced,
))
.id();
// Trigger persistence
{
@@ -771,20 +868,44 @@ async fn test_concurrent_conflict_resolution() -> Result<()> {
// Check what components the entity has on each node
{
let world1 = app1.world_mut();
let mut query1 = world1.query::<(Entity, &NetworkedEntity, Option<&NetworkedTransform>, &Transform)>();
let mut query1 = world1.query::<(
Entity,
&NetworkedEntity,
Option<&NetworkedTransform>,
&Transform,
)>();
println!("Node 1 entities:");
for (entity, ne, nt, t) in query1.iter(world1) {
println!(" Entity {:?}: NetworkedEntity({:?}), NetworkedTransform={}, Transform=({}, {}, {})",
entity, ne.network_id, nt.is_some(), t.translation.x, t.translation.y, t.translation.z);
println!(
" Entity {:?}: NetworkedEntity({:?}), NetworkedTransform={}, Transform=({}, {}, {})",
entity,
ne.network_id,
nt.is_some(),
t.translation.x,
t.translation.y,
t.translation.z
);
}
}
{
let world2 = app2.world_mut();
let mut query2 = world2.query::<(Entity, &NetworkedEntity, Option<&NetworkedTransform>, &Transform)>();
let mut query2 = world2.query::<(
Entity,
&NetworkedEntity,
Option<&NetworkedTransform>,
&Transform,
)>();
println!("Node 2 entities:");
for (entity, ne, nt, t) in query2.iter(world2) {
println!(" Entity {:?}: NetworkedEntity({:?}), NetworkedTransform={}, Transform=({}, {}, {})",
entity, ne.network_id, nt.is_some(), t.translation.x, t.translation.y, t.translation.z);
println!(
" Entity {:?}: NetworkedEntity({:?}), NetworkedTransform={}, Transform=({}, {}, {})",
entity,
ne.network_id,
nt.is_some(),
t.translation.x,
t.translation.y,
t.translation.z
);
}
}
@@ -834,11 +955,14 @@ async fn test_concurrent_conflict_resolution() -> Result<()> {
let t2 = transforms2[0];
// Check if they converged (within floating point tolerance)
let converged = (t1.translation.x - t2.translation.x).abs() < 0.01
&& (t1.translation.y - t2.translation.y).abs() < 0.01;
let converged = (t1.translation.x - t2.translation.x).abs() < 0.01 &&
(t1.translation.y - t2.translation.y).abs() < 0.01;
if converged {
println!("✓ Nodes converged to: ({}, {})", t1.translation.x, t1.translation.y);
println!(
"✓ Nodes converged to: ({}, {})",
t1.translation.x, t1.translation.y
);
}
converged
@@ -876,10 +1000,7 @@ async fn test_persistence_crash_recovery() -> Result<()> {
app.world_mut().spawn((
NetworkedEntity::with_id(entity_id, node_id),
TestPosition {
x: 100.0,
y: 200.0,
},
TestPosition { x: 100.0, y: 200.0 },
Persisted::with_id(entity_id),
Synced,
));
@@ -905,8 +1026,12 @@ async fn test_persistence_crash_recovery() -> Result<()> {
app.update();
// Verify entity loaded from database
assert_entity_synced(app.world_mut(), entity_id, TestPosition { x: 100.0, y: 200.0 })
.map_err(|e| anyhow::anyhow!("Persistence recovery failed: {}", e))?;
assert_entity_synced(
app.world_mut(),
entity_id,
TestPosition { x: 100.0, y: 200.0 },
)
.map_err(|e| anyhow::anyhow!("Persistence recovery failed: {}", e))?;
println!("✓ Crash recovery test passed");
}