diff --git a/crates/app/src/engine_bridge.rs b/crates/app/src/engine_bridge.rs index 84f0fd6..ad225e0 100644 --- a/crates/app/src/engine_bridge.rs +++ b/crates/app/src/engine_bridge.rs @@ -42,6 +42,7 @@ fn detect_changes_and_tick( /// 1. Polls all available events from the EngineBridge /// 2. Dispatches them to update Bevy resources and state fn poll_engine_events( + mut commands: Commands, bridge: Res, mut current_session: ResMut, mut node_clock: ResMut, @@ -51,10 +52,14 @@ fn poll_engine_events( if !events.is_empty() { for event in events { match event { - EngineEvent::NetworkingStarted { session_id, node_id } => { + EngineEvent::NetworkingStarted { session_id, node_id, bridge: gossip_bridge } => { info!("Networking started: session={}, node={}", session_id.to_code(), node_id); + // Insert GossipBridge for Bevy systems to use + commands.insert_resource(gossip_bridge); + info!("Inserted GossipBridge resource"); + // Update session to use the new session ID and set state to Active current_session.session = Session::new(session_id.clone()); current_session.session.state = SessionState::Active; diff --git a/crates/libmarathon/src/engine/commands.rs b/crates/libmarathon/src/engine/commands.rs index 3966ddb..0e13de7 100644 --- a/crates/libmarathon/src/engine/commands.rs +++ b/crates/libmarathon/src/engine/commands.rs @@ -4,7 +4,6 @@ use crate::networking::SessionId; use bevy::prelude::*; use uuid::Uuid; -/// Commands that Bevy sends to the Core Engine #[derive(Debug, Clone)] pub enum EngineCommand { // Networking lifecycle diff --git a/crates/libmarathon/src/engine/core.rs b/crates/libmarathon/src/engine/core.rs index 32fb004..4814f63 100644 --- a/crates/libmarathon/src/engine/core.rs +++ b/crates/libmarathon/src/engine/core.rs @@ -94,7 +94,7 @@ impl EngineCore { } match NetworkingManager::new(session_id.clone()).await { - Ok(net_manager) => { + Ok((net_manager, bridge)) => { let node_id = net_manager.node_id(); // Spawn NetworkingManager in background task @@ -108,6 +108,7 @@ impl EngineCore { let _ = self.handle.event_tx.send(EngineEvent::NetworkingStarted { session_id: session_id.clone(), node_id, + bridge, }); tracing::info!("Networking started for session {}", session_id.to_code()); } diff --git a/crates/libmarathon/src/engine/events.rs b/crates/libmarathon/src/engine/events.rs index b611ffd..465cf39 100644 --- a/crates/libmarathon/src/engine/events.rs +++ b/crates/libmarathon/src/engine/events.rs @@ -4,13 +4,13 @@ use crate::networking::{NodeId, SessionId, VectorClock}; use bevy::prelude::*; use uuid::Uuid; -/// Events that the Core Engine emits to Bevy #[derive(Debug, Clone)] pub enum EngineEvent { // Networking status NetworkingStarted { session_id: SessionId, node_id: NodeId, + bridge: crate::networking::GossipBridge, }, NetworkingFailed { error: String, diff --git a/crates/libmarathon/src/engine/networking.rs b/crates/libmarathon/src/engine/networking.rs index 1cf63fc..edf0de2 100644 --- a/crates/libmarathon/src/engine/networking.rs +++ b/crates/libmarathon/src/engine/networking.rs @@ -26,6 +26,9 @@ pub struct NetworkingManager { _router: iroh::protocol::Router, _gossip: iroh_gossip::net::Gossip, + // Bridge to Bevy for message passing + bridge: crate::networking::GossipBridge, + // CRDT state vector_clock: VectorClock, operation_log: OperationLog, @@ -37,7 +40,7 @@ pub struct NetworkingManager { } impl NetworkingManager { - pub async fn new(session_id: SessionId) -> anyhow::Result { + pub async fn new(session_id: SessionId) -> anyhow::Result<(Self, crate::networking::GossipBridge)> { use iroh::{ discovery::mdns::MdnsDiscovery, protocol::Router, @@ -85,6 +88,9 @@ impl NetworkingManager { node_id ); + // Create GossipBridge for Bevy integration + let bridge = crate::networking::GossipBridge::new(node_id); + let manager = Self { session_id, node_id, @@ -93,6 +99,7 @@ impl NetworkingManager { _endpoint: endpoint, _router: router, _gossip: gossip, + bridge: bridge.clone(), vector_clock: VectorClock::new(), operation_log: OperationLog::new(), tombstones: TombstoneRegistry::new(), @@ -100,7 +107,7 @@ impl NetworkingManager { our_locks: std::collections::HashSet::new(), }; - Ok(manager) + Ok((manager, bridge)) } pub fn node_id(&self) -> NodeId { @@ -112,20 +119,39 @@ impl NetworkingManager { } /// Process gossip events (unbounded) and periodic tasks (heartbeats, lock cleanup) + /// Also bridges messages between iroh-gossip and Bevy's GossipBridge pub async fn run(mut self, event_tx: mpsc::UnboundedSender) { let mut heartbeat_interval = time::interval(Duration::from_secs(1)); + let mut bridge_poll_interval = time::interval(Duration::from_millis(10)); loop { tokio::select! { - // Process gossip events unbounded (as fast as they arrive) + // Process incoming gossip messages and forward to GossipBridge Some(result) = self.receiver.next() => { match result { Ok(event) => { use iroh_gossip::api::Event; - if let Event::Received(msg) = event { - self.handle_sync_message(&msg.content, &event_tx).await; + match event { + Event::Received(msg) => { + // Deserialize and forward to GossipBridge for Bevy systems + if let Ok(versioned) = rkyv::from_bytes::(&msg.content) { + if let Err(e) = self.bridge.push_incoming(versioned) { + tracing::error!("Failed to push message to GossipBridge: {}", e); + } else { + tracing::debug!("Forwarded message to Bevy via GossipBridge"); + } + } + } + Event::NeighborUp(peer) => { + tracing::info!("Peer connected: {}", peer); + } + Event::NeighborDown(peer) => { + tracing::warn!("Peer disconnected: {}", peer); + } + Event::Lagged => { + tracing::warn!("Event stream lagged"); + } } - // Note: Neighbor events are not exposed in the current API } Err(e) => { tracing::warn!("Gossip receiver error: {}", e); @@ -133,6 +159,19 @@ impl NetworkingManager { } } + // Poll GossipBridge for outgoing messages and broadcast via iroh + _ = bridge_poll_interval.tick() => { + while let Some(msg) = self.bridge.try_recv_outgoing() { + if let Ok(bytes) = rkyv::to_bytes::(&msg).map(|b| b.to_vec()) { + if let Err(e) = self.sender.broadcast(Bytes::from(bytes)).await { + tracing::error!("Failed to broadcast message: {}", e); + } else { + tracing::debug!("Broadcast message from Bevy via iroh-gossip"); + } + } + } + } + // Periodic tasks: heartbeats and lock cleanup _ = heartbeat_interval.tick() => { self.broadcast_lock_heartbeats(&event_tx).await; diff --git a/crates/libmarathon/src/networking/gossip_bridge.rs b/crates/libmarathon/src/networking/gossip_bridge.rs index 119a0fc..1ec690b 100644 --- a/crates/libmarathon/src/networking/gossip_bridge.rs +++ b/crates/libmarathon/src/networking/gossip_bridge.rs @@ -43,6 +43,16 @@ pub struct GossipBridge { pub node_id: NodeId, } +impl std::fmt::Debug for GossipBridge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GossipBridge") + .field("node_id", &self.node_id) + .field("outgoing_len", &self.outgoing.lock().ok().map(|q| q.len())) + .field("incoming_len", &self.incoming.lock().ok().map(|q| q.len())) + .finish() + } +} + impl GossipBridge { /// Create a new gossip bridge pub fn new(node_id: NodeId) -> Self {