diff --git a/crates/app/src/engine_bridge.rs b/crates/app/src/engine_bridge.rs index ad225e0..c3b50e6 100644 --- a/crates/app/src/engine_bridge.rs +++ b/crates/app/src/engine_bridge.rs @@ -18,6 +18,8 @@ impl Plugin for EngineBridgePlugin { app.add_systems(Update, poll_engine_events); // Detect changes and send clock tick commands to engine app.add_systems(PostUpdate, detect_changes_and_tick); + // Handle app exit to stop networking gracefully + app.add_systems(Update, handle_app_exit); } } @@ -46,16 +48,35 @@ fn poll_engine_events( bridge: Res, mut current_session: ResMut, mut node_clock: ResMut, + mut networking_status: Option>, ) { let events = (*bridge).poll_events(); if !events.is_empty() { for event in events { match event { + EngineEvent::NetworkingInitializing { session_id, status } => { + info!("Networking initializing for session {}: {:?}", session_id.to_code(), status); + + // Update NetworkingStatus resource + if let Some(ref mut net_status) = networking_status { + net_status.latest_status = Some(status); + } + + // Update session state to Joining if not already + if matches!(current_session.session.state, SessionState::Created) { + current_session.session.state = SessionState::Joining; + } + } EngineEvent::NetworkingStarted { session_id, node_id, bridge: gossip_bridge } => { info!("Networking started: session={}, node={}", session_id.to_code(), node_id); + // Clear networking status + if let Some(ref mut net_status) = networking_status { + net_status.latest_status = None; + } + // Insert GossipBridge for Bevy systems to use commands.insert_resource(gossip_bridge); info!("Inserted GossipBridge resource"); @@ -71,12 +92,22 @@ fn poll_engine_events( EngineEvent::NetworkingFailed { error } => { error!("Networking failed: {}", error); + // Clear networking status + if let Some(ref mut net_status) = networking_status { + net_status.latest_status = None; + } + // Keep session state as Created current_session.session.state = SessionState::Created; } EngineEvent::NetworkingStopped => { info!("Networking stopped"); + // Clear networking status + if let Some(ref mut net_status) = networking_status { + net_status.latest_status = None; + } + // Update session state to Disconnected current_session.session.state = SessionState::Disconnected; } @@ -133,3 +164,21 @@ fn poll_engine_events( } } } + +/// Handle app exit to stop networking immediately +fn handle_app_exit( + mut exit_events: MessageReader, + bridge: Res, + current_session: Res, +) { + for _ in exit_events.read() { + // If networking is active, send stop command + // Don't wait - the task will be aborted when the runtime shuts down + if current_session.session.state == SessionState::Active + || current_session.session.state == SessionState::Joining { + info!("App exiting, aborting networking immediately"); + bridge.send_command(EngineCommand::StopNetworking); + // Don't sleep - just let the app exit. The tokio runtime will clean up. + } + } +} diff --git a/crates/libmarathon/src/debug_ui/input.rs b/crates/libmarathon/src/debug_ui/input.rs index 5c2b860..6827e79 100644 --- a/crates/libmarathon/src/debug_ui/input.rs +++ b/crates/libmarathon/src/debug_ui/input.rs @@ -1509,6 +1509,17 @@ pub fn custom_input_system( } } + InputEvent::Text { text } => { + // Send text input to egui + for (entity, _settings, _pointer_pos) in egui_contexts.iter() { + egui_input_message_writer.write(EguiInputEvent { + context: entity, + event: egui::Event::Text(text.clone()), + }); + messages_written += 1; + } + } + _ => { // Ignore stylus and touch events for now } diff --git a/crates/libmarathon/src/engine/core.rs b/crates/libmarathon/src/engine/core.rs index 4814f63..f11ad74 100644 --- a/crates/libmarathon/src/engine/core.rs +++ b/crates/libmarathon/src/engine/core.rs @@ -1,6 +1,7 @@ //! Core Engine event loop - runs on tokio outside Bevy use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; use uuid::Uuid; use super::{EngineCommand, EngineEvent, EngineHandle, NetworkingManager, PersistenceManager}; @@ -9,6 +10,7 @@ use crate::networking::{SessionId, VectorClock}; pub struct EngineCore { handle: EngineHandle, networking_task: Option>, + networking_cancel_token: Option, #[allow(dead_code)] persistence: PersistenceManager, @@ -28,6 +30,7 @@ impl EngineCore { Self { handle, networking_task: None, // Start offline + networking_cancel_token: None, persistence, node_id, clock, @@ -93,39 +96,73 @@ impl EngineCore { return; } - match NetworkingManager::new(session_id.clone()).await { - Ok((net_manager, bridge)) => { - let node_id = net_manager.node_id(); + tracing::info!("Starting networking initialization for session {}", session_id.to_code()); - // Spawn NetworkingManager in background task - let event_tx = self.handle.event_tx.clone(); - let task = tokio::spawn(async move { - net_manager.run(event_tx).await; + // Create cancellation token for graceful shutdown + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + + // Spawn NetworkingManager initialization in background to avoid blocking + // DHT peer discovery can take 15+ seconds with retries + let event_tx = self.handle.event_tx.clone(); + + // Create channel for progress updates + let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel(); + + // Spawn task to forward progress updates to Bevy + let event_tx_clone = event_tx.clone(); + let session_id_clone = session_id.clone(); + tokio::spawn(async move { + while let Some(status) = progress_rx.recv().await { + let _ = event_tx_clone.send(EngineEvent::NetworkingInitializing { + session_id: session_id_clone.clone(), + status, }); - - self.networking_task = Some(task); - - let _ = self.handle.event_tx.send(EngineEvent::NetworkingStarted { - session_id: session_id.clone(), - node_id, - bridge, - }); - tracing::info!("Networking started for session {}", session_id.to_code()); } - Err(e) => { - let _ = self.handle.event_tx.send(EngineEvent::NetworkingFailed { - error: e.to_string(), - }); - tracing::error!("Failed to start networking: {}", e); + }); + + let task = tokio::spawn(async move { + match NetworkingManager::new(session_id.clone(), Some(progress_tx), cancel_token_clone.clone()).await { + Ok((net_manager, bridge)) => { + let node_id = net_manager.node_id(); + + // Notify Bevy that networking started + let _ = event_tx.send(EngineEvent::NetworkingStarted { + session_id: session_id.clone(), + node_id, + bridge, + }); + tracing::info!("Networking started for session {}", session_id.to_code()); + + // Run the networking manager loop with cancellation support + net_manager.run(event_tx.clone(), cancel_token_clone).await; + } + Err(e) => { + let _ = event_tx.send(EngineEvent::NetworkingFailed { + error: e.to_string(), + }); + tracing::error!("Failed to start networking: {}", e); + } } - } + }); + + self.networking_task = Some(task); + self.networking_cancel_token = Some(cancel_token); } async fn stop_networking(&mut self) { + // Cancel the task gracefully + if let Some(cancel_token) = self.networking_cancel_token.take() { + cancel_token.cancel(); + tracing::info!("Networking cancellation requested"); + } + + // Abort the task immediately - don't wait for graceful shutdown + // This is fine because NetworkingManager doesn't hold critical resources if let Some(task) = self.networking_task.take() { - task.abort(); // Cancel the networking task + task.abort(); + tracing::info!("Networking task aborted"); let _ = self.handle.event_tx.send(EngineEvent::NetworkingStopped); - tracing::info!("Networking stopped"); } } diff --git a/crates/libmarathon/src/engine/networking.rs b/crates/libmarathon/src/engine/networking.rs index edf0de2..17831f6 100644 --- a/crates/libmarathon/src/engine/networking.rs +++ b/crates/libmarathon/src/engine/networking.rs @@ -12,6 +12,7 @@ use crate::networking::{ }; use super::EngineEvent; +use super::events::NetworkingInitStatus; pub struct NetworkingManager { session_id: SessionId, @@ -40,9 +41,19 @@ pub struct NetworkingManager { } impl NetworkingManager { - pub async fn new(session_id: SessionId) -> anyhow::Result<(Self, crate::networking::GossipBridge)> { + pub async fn new( + session_id: SessionId, + progress_tx: Option>, + cancel_token: tokio_util::sync::CancellationToken, + ) -> anyhow::Result<(Self, crate::networking::GossipBridge)> { + let send_progress = |status: NetworkingInitStatus| { + if let Some(ref tx) = progress_tx { + let _ = tx.send(status.clone()); + } + tracing::info!("Networking init: {:?}", status); + }; use iroh::{ - discovery::mdns::MdnsDiscovery, + discovery::pkarr::dht::DhtDiscovery, protocol::Router, Endpoint, }; @@ -51,12 +62,24 @@ impl NetworkingManager { proto::TopicId, }; - // Create iroh endpoint with mDNS discovery + // Check for cancellation at start + if cancel_token.is_cancelled() { + return Err(anyhow::anyhow!("Initialization cancelled before start")); + } + + send_progress(NetworkingInitStatus::CreatingEndpoint); + + // Create iroh endpoint with DHT discovery + // This allows peers to discover each other over the internet via Mainline DHT + // Security comes from the secret session-derived ALPN, not network isolation + let dht_discovery = DhtDiscovery::builder().build()?; let endpoint = Endpoint::builder() - .discovery(MdnsDiscovery::builder()) + .discovery(dht_discovery) .bind() .await?; + send_progress(NetworkingInitStatus::EndpointReady); + let endpoint_id = endpoint.addr().id; // Convert endpoint ID to NodeId (using first 16 bytes) @@ -65,20 +88,89 @@ impl NetworkingManager { node_id_bytes.copy_from_slice(&id_bytes[..16]); let node_id = NodeId::from_bytes(node_id_bytes); - // Create gossip protocol - let gossip = Gossip::builder().spawn(endpoint.clone()); + // Create pkarr client for DHT peer discovery + let pkarr_client = pkarr::Client::builder() + .no_default_network() + .dht(|x| x) + .build()?; + + // Discover existing peers from DHT with retries + // Retry immediately without delays - if peers aren't in DHT yet, they'll appear soon + let mut peer_endpoint_ids = vec![]; + for attempt in 1..=3 { + // Check for cancellation before each attempt + if cancel_token.is_cancelled() { + tracing::info!("Networking initialization cancelled during DHT discovery"); + return Err(anyhow::anyhow!("Initialization cancelled")); + } + + send_progress(NetworkingInitStatus::DiscoveringPeers { + session_code: session_id.to_code().to_string(), + attempt, + }); + match crate::engine::peer_discovery::discover_peers_from_dht(&session_id, &pkarr_client).await { + Ok(peers) if !peers.is_empty() => { + let count = peers.len(); + peer_endpoint_ids = peers; + send_progress(NetworkingInitStatus::PeersFound { + count, + }); + break; + } + Ok(_) if attempt == 3 => { + // Last attempt and no peers found + send_progress(NetworkingInitStatus::NoPeersFound); + } + Ok(_) => { + // No peers found, but will retry immediately + } + Err(e) => { + tracing::warn!("DHT query attempt {} failed: {}", attempt, e); + } + } + } + + // Check for cancellation before publishing + if cancel_token.is_cancelled() { + tracing::info!("Networking initialization cancelled before DHT publish"); + return Err(anyhow::anyhow!("Initialization cancelled")); + } + + // Publish our presence to DHT + send_progress(NetworkingInitStatus::PublishingToDHT); + if let Err(e) = crate::engine::peer_discovery::publish_peer_to_dht( + &session_id, + endpoint_id, + &pkarr_client, + ) + .await + { + tracing::warn!("Failed to publish to DHT: {}", e); + } + + // Check for cancellation before gossip initialization + if cancel_token.is_cancelled() { + tracing::info!("Networking initialization cancelled before gossip init"); + return Err(anyhow::anyhow!("Initialization cancelled")); + } // Derive session-specific ALPN for network isolation let session_alpn = session_id.to_alpn(); + // Create gossip protocol with custom session ALPN + send_progress(NetworkingInitStatus::InitializingGossip); + let gossip = Gossip::builder() + .alpn(&session_alpn) + .spawn(endpoint.clone()); + // Set up router to accept session ALPN let router = Router::builder(endpoint.clone()) .accept(session_alpn.as_slice(), gossip.clone()) .spawn(); - // Subscribe to topic derived from session ALPN + // Subscribe to topic with discovered peers as bootstrap let topic_id = TopicId::from_bytes(session_alpn); - let subscribe_handle = gossip.subscribe(topic_id, vec![]).await?; + let subscribe_handle = gossip.subscribe(topic_id, peer_endpoint_ids).await?; let (sender, receiver) = subscribe_handle.split(); @@ -91,6 +183,14 @@ impl NetworkingManager { // Create GossipBridge for Bevy integration let bridge = crate::networking::GossipBridge::new(node_id); + // Spawn background task to maintain DHT presence + let session_id_clone = session_id.clone(); + tokio::spawn(crate::engine::peer_discovery::maintain_dht_presence( + session_id_clone, + endpoint_id, + pkarr_client, + )); + let manager = Self { session_id, node_id, @@ -120,12 +220,17 @@ impl NetworkingManager { /// Process gossip events (unbounded) and periodic tasks (heartbeats, lock cleanup) /// Also bridges messages between iroh-gossip and Bevy's GossipBridge - pub async fn run(mut self, event_tx: mpsc::UnboundedSender) { + pub async fn run(mut self, event_tx: mpsc::UnboundedSender, cancel_token: tokio_util::sync::CancellationToken) { let mut heartbeat_interval = time::interval(Duration::from_secs(1)); let mut bridge_poll_interval = time::interval(Duration::from_millis(10)); loop { tokio::select! { + // Listen for shutdown signal + _ = cancel_token.cancelled() => { + tracing::info!("NetworkingManager received shutdown signal"); + break; + } // Process incoming gossip messages and forward to GossipBridge Some(result) = self.receiver.next() => { match result { diff --git a/crates/libmarathon/src/platform/desktop/input.rs b/crates/libmarathon/src/platform/desktop/input.rs index 1233aa1..4a7968e 100644 --- a/crates/libmarathon/src/platform/desktop/input.rs +++ b/crates/libmarathon/src/platform/desktop/input.rs @@ -437,25 +437,18 @@ pub fn push_device_event(event: &winit::event::DeviceEvent) { } } -/// Drain all buffered winit events and convert to InputEvents -/// -/// Call this from your engine's input processing to consume events. -/// This uses a lock-free channel so it never blocks and can't silently drop events. pub fn drain_as_input_events() -> Vec { let (_, receiver) = get_event_channel(); - // Drain all events from the channel + // Drain all events from the channel and convert to InputEvents + // Each raw event may generate multiple InputEvents (e.g., Keyboard + Text) receiver .try_iter() - .filter_map(raw_to_input_event) + .flat_map(raw_to_input_event) .collect() } -/// Convert a raw winit event to an engine InputEvent -/// -/// Only input-related events are converted. Other events (gestures, file drop, IME, etc.) -/// return None and should be handled by the Bevy event system directly. -fn raw_to_input_event(event: RawWinitEvent) -> Option { +fn raw_to_input_event(event: RawWinitEvent) -> Vec { match event { // === MOUSE INPUT === RawWinitEvent::MouseButton { button, state, position } => { @@ -464,55 +457,70 @@ fn raw_to_input_event(event: RawWinitEvent) -> Option { ElementState::Released => TouchPhase::Ended, }; - Some(InputEvent::Mouse { + vec![InputEvent::Mouse { pos: position, button, phase, - }) + }] } RawWinitEvent::CursorMoved { position } => { // Check if any button is pressed - let input_state = INPUT_STATE.lock().ok()?; + let Some(input_state) = INPUT_STATE.lock().ok() else { + return vec![]; + }; if input_state.left_pressed { - Some(InputEvent::Mouse { + vec![InputEvent::Mouse { pos: position, button: MouseButton::Left, phase: TouchPhase::Moved, - }) + }] } else if input_state.right_pressed { - Some(InputEvent::Mouse { + vec![InputEvent::Mouse { pos: position, button: MouseButton::Right, phase: TouchPhase::Moved, - }) + }] } else if input_state.middle_pressed { - Some(InputEvent::Mouse { + vec![InputEvent::Mouse { pos: position, button: MouseButton::Middle, phase: TouchPhase::Moved, - }) + }] } else { // No button pressed - hover tracking - Some(InputEvent::MouseMove { pos: position }) + vec![InputEvent::MouseMove { pos: position }] } } RawWinitEvent::MouseWheel { delta, position } => { - Some(InputEvent::MouseWheel { + vec![InputEvent::MouseWheel { delta, pos: position, - }) + }] } // === KEYBOARD INPUT === - RawWinitEvent::Keyboard { key, state, modifiers, .. } => { - Some(InputEvent::Keyboard { + RawWinitEvent::Keyboard { key, state, modifiers, text, .. } => { + let mut events = vec![InputEvent::Keyboard { key, pressed: state == ElementState::Pressed, modifiers, - }) + }]; + + // If there's text input and the key was pressed, send a Text event too + // But only for printable characters, not control characters (backspace, etc.) + if state == ElementState::Pressed { + if let Some(text) = text { + // Filter out control characters - only send printable text + if !text.is_empty() && text.chars().all(|c| !c.is_control()) { + events.push(InputEvent::Text { text }); + } + } + } + + events } // === TOUCH INPUT (APPLE PENCIL!) === @@ -543,55 +551,55 @@ fn raw_to_input_event(event: RawWinitEvent) -> Option { 0.0, // Azimuth not provided by winit Force::Calibrated ); - Some(InputEvent::Stylus { + vec![InputEvent::Stylus { pos: position, pressure, tilt, phase: touch_phase, timestamp: 0.0, // TODO: Get actual timestamp from winit when available - }) + }] } Some(WinitForce::Normalized(pressure)) => { // Normalized pressure (0.0-1.0), likely a stylus - Some(InputEvent::Stylus { + vec![InputEvent::Stylus { pos: position, pressure: pressure as f32, tilt: Vec2::ZERO, // No tilt data in normalized mode phase: touch_phase, timestamp: 0.0, - }) + }] } None => { // No force data - regular touch (finger) - Some(InputEvent::Touch { + vec![InputEvent::Touch { pos: position, phase: touch_phase, id, - }) + }] } } } // === GESTURE INPUT === RawWinitEvent::PinchGesture { delta } => { - Some(InputEvent::PinchGesture { delta }) + vec![InputEvent::PinchGesture { delta }] } RawWinitEvent::RotationGesture { delta } => { - Some(InputEvent::RotationGesture { delta }) + vec![InputEvent::RotationGesture { delta }] } RawWinitEvent::PanGesture { delta } => { - Some(InputEvent::PanGesture { delta }) + vec![InputEvent::PanGesture { delta }] } RawWinitEvent::DoubleTapGesture => { - Some(InputEvent::DoubleTapGesture) + vec![InputEvent::DoubleTapGesture] } // === MOUSE MOTION (RAW DELTA) === RawWinitEvent::MouseMotion { delta } => { - Some(InputEvent::MouseMotion { delta }) + vec![InputEvent::MouseMotion { delta }] } // === NON-INPUT EVENTS === @@ -611,7 +619,7 @@ fn raw_to_input_event(event: RawWinitEvent) -> Option { RawWinitEvent::Moved { .. } => { // These are window/UI events, should be sent to Bevy messages // (to be implemented when we add Bevy window event forwarding) - None + vec![] } } } diff --git a/crates/libmarathon/src/platform/input/controller.rs b/crates/libmarathon/src/platform/input/controller.rs index 15b0e65..501870a 100644 --- a/crates/libmarathon/src/platform/input/controller.rs +++ b/crates/libmarathon/src/platform/input/controller.rs @@ -245,6 +245,11 @@ impl InputController { } // In other contexts, ignore MouseMotion to avoid conflicts with cursor-based input } + + InputEvent::Text { text: _ } => { + // Text input is handled by egui, not by game actions + // This is for typing in text fields, not game controls + } } actions diff --git a/crates/libmarathon/src/platform/input/events.rs b/crates/libmarathon/src/platform/input/events.rs index f5255ba..1f00cc9 100644 --- a/crates/libmarathon/src/platform/input/events.rs +++ b/crates/libmarathon/src/platform/input/events.rs @@ -52,7 +52,7 @@ pub struct InputEventBuffer { /// /// Platform-specific code converts native input (UITouch, winit events) /// into these engine-agnostic events. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub enum InputEvent { /// Stylus input (Apple Pencil, Surface Pen, etc.) Stylus { @@ -108,6 +108,13 @@ pub enum InputEvent { modifiers: Modifiers, }, + /// Text input from keyboard + /// This is the actual character that was typed, after applying keyboard layout + Text { + /// The text/character that was entered + text: String, + }, + /// Mouse wheel scroll MouseWheel { /// Scroll delta (pixels or lines depending on device) @@ -155,6 +162,7 @@ impl InputEvent { InputEvent::Touch { pos, .. } => Some(*pos), InputEvent::MouseWheel { pos, .. } => Some(*pos), InputEvent::Keyboard { .. } | + InputEvent::Text { .. } | InputEvent::MouseMotion { .. } | InputEvent::PinchGesture { .. } | InputEvent::RotationGesture { .. } | @@ -170,6 +178,7 @@ impl InputEvent { InputEvent::Mouse { phase, .. } => Some(*phase), InputEvent::Touch { phase, .. } => Some(*phase), InputEvent::Keyboard { .. } | + InputEvent::Text { .. } | InputEvent::MouseWheel { .. } | InputEvent::MouseMove { .. } | InputEvent::MouseMotion { .. } |