fix keyboard input and app shutdown freeze
Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
//! Core Engine event loop - runs on tokio outside Bevy
|
||||
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{EngineCommand, EngineEvent, EngineHandle, NetworkingManager, PersistenceManager};
|
||||
@@ -9,6 +10,7 @@ use crate::networking::{SessionId, VectorClock};
|
||||
pub struct EngineCore {
|
||||
handle: EngineHandle,
|
||||
networking_task: Option<JoinHandle<()>>,
|
||||
networking_cancel_token: Option<CancellationToken>,
|
||||
#[allow(dead_code)]
|
||||
persistence: PersistenceManager,
|
||||
|
||||
@@ -28,6 +30,7 @@ impl EngineCore {
|
||||
Self {
|
||||
handle,
|
||||
networking_task: None, // Start offline
|
||||
networking_cancel_token: None,
|
||||
persistence,
|
||||
node_id,
|
||||
clock,
|
||||
@@ -93,39 +96,73 @@ impl EngineCore {
|
||||
return;
|
||||
}
|
||||
|
||||
match NetworkingManager::new(session_id.clone()).await {
|
||||
Ok((net_manager, bridge)) => {
|
||||
let node_id = net_manager.node_id();
|
||||
tracing::info!("Starting networking initialization for session {}", session_id.to_code());
|
||||
|
||||
// Spawn NetworkingManager in background task
|
||||
let event_tx = self.handle.event_tx.clone();
|
||||
let task = tokio::spawn(async move {
|
||||
net_manager.run(event_tx).await;
|
||||
// Create cancellation token for graceful shutdown
|
||||
let cancel_token = CancellationToken::new();
|
||||
let cancel_token_clone = cancel_token.clone();
|
||||
|
||||
// Spawn NetworkingManager initialization in background to avoid blocking
|
||||
// DHT peer discovery can take 15+ seconds with retries
|
||||
let event_tx = self.handle.event_tx.clone();
|
||||
|
||||
// Create channel for progress updates
|
||||
let (progress_tx, mut progress_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||
|
||||
// Spawn task to forward progress updates to Bevy
|
||||
let event_tx_clone = event_tx.clone();
|
||||
let session_id_clone = session_id.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(status) = progress_rx.recv().await {
|
||||
let _ = event_tx_clone.send(EngineEvent::NetworkingInitializing {
|
||||
session_id: session_id_clone.clone(),
|
||||
status,
|
||||
});
|
||||
|
||||
self.networking_task = Some(task);
|
||||
|
||||
let _ = self.handle.event_tx.send(EngineEvent::NetworkingStarted {
|
||||
session_id: session_id.clone(),
|
||||
node_id,
|
||||
bridge,
|
||||
});
|
||||
tracing::info!("Networking started for session {}", session_id.to_code());
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = self.handle.event_tx.send(EngineEvent::NetworkingFailed {
|
||||
error: e.to_string(),
|
||||
});
|
||||
tracing::error!("Failed to start networking: {}", e);
|
||||
});
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
match NetworkingManager::new(session_id.clone(), Some(progress_tx), cancel_token_clone.clone()).await {
|
||||
Ok((net_manager, bridge)) => {
|
||||
let node_id = net_manager.node_id();
|
||||
|
||||
// Notify Bevy that networking started
|
||||
let _ = event_tx.send(EngineEvent::NetworkingStarted {
|
||||
session_id: session_id.clone(),
|
||||
node_id,
|
||||
bridge,
|
||||
});
|
||||
tracing::info!("Networking started for session {}", session_id.to_code());
|
||||
|
||||
// Run the networking manager loop with cancellation support
|
||||
net_manager.run(event_tx.clone(), cancel_token_clone).await;
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = event_tx.send(EngineEvent::NetworkingFailed {
|
||||
error: e.to_string(),
|
||||
});
|
||||
tracing::error!("Failed to start networking: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
self.networking_task = Some(task);
|
||||
self.networking_cancel_token = Some(cancel_token);
|
||||
}
|
||||
|
||||
async fn stop_networking(&mut self) {
|
||||
// Cancel the task gracefully
|
||||
if let Some(cancel_token) = self.networking_cancel_token.take() {
|
||||
cancel_token.cancel();
|
||||
tracing::info!("Networking cancellation requested");
|
||||
}
|
||||
|
||||
// Abort the task immediately - don't wait for graceful shutdown
|
||||
// This is fine because NetworkingManager doesn't hold critical resources
|
||||
if let Some(task) = self.networking_task.take() {
|
||||
task.abort(); // Cancel the networking task
|
||||
task.abort();
|
||||
tracing::info!("Networking task aborted");
|
||||
let _ = self.handle.event_tx.send(EngineEvent::NetworkingStopped);
|
||||
tracing::info!("Networking stopped");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ use crate::networking::{
|
||||
};
|
||||
|
||||
use super::EngineEvent;
|
||||
use super::events::NetworkingInitStatus;
|
||||
|
||||
pub struct NetworkingManager {
|
||||
session_id: SessionId,
|
||||
@@ -40,9 +41,19 @@ pub struct NetworkingManager {
|
||||
}
|
||||
|
||||
impl NetworkingManager {
|
||||
pub async fn new(session_id: SessionId) -> anyhow::Result<(Self, crate::networking::GossipBridge)> {
|
||||
pub async fn new(
|
||||
session_id: SessionId,
|
||||
progress_tx: Option<tokio::sync::mpsc::UnboundedSender<NetworkingInitStatus>>,
|
||||
cancel_token: tokio_util::sync::CancellationToken,
|
||||
) -> anyhow::Result<(Self, crate::networking::GossipBridge)> {
|
||||
let send_progress = |status: NetworkingInitStatus| {
|
||||
if let Some(ref tx) = progress_tx {
|
||||
let _ = tx.send(status.clone());
|
||||
}
|
||||
tracing::info!("Networking init: {:?}", status);
|
||||
};
|
||||
use iroh::{
|
||||
discovery::mdns::MdnsDiscovery,
|
||||
discovery::pkarr::dht::DhtDiscovery,
|
||||
protocol::Router,
|
||||
Endpoint,
|
||||
};
|
||||
@@ -51,12 +62,24 @@ impl NetworkingManager {
|
||||
proto::TopicId,
|
||||
};
|
||||
|
||||
// Create iroh endpoint with mDNS discovery
|
||||
// Check for cancellation at start
|
||||
if cancel_token.is_cancelled() {
|
||||
return Err(anyhow::anyhow!("Initialization cancelled before start"));
|
||||
}
|
||||
|
||||
send_progress(NetworkingInitStatus::CreatingEndpoint);
|
||||
|
||||
// Create iroh endpoint with DHT discovery
|
||||
// This allows peers to discover each other over the internet via Mainline DHT
|
||||
// Security comes from the secret session-derived ALPN, not network isolation
|
||||
let dht_discovery = DhtDiscovery::builder().build()?;
|
||||
let endpoint = Endpoint::builder()
|
||||
.discovery(MdnsDiscovery::builder())
|
||||
.discovery(dht_discovery)
|
||||
.bind()
|
||||
.await?;
|
||||
|
||||
send_progress(NetworkingInitStatus::EndpointReady);
|
||||
|
||||
let endpoint_id = endpoint.addr().id;
|
||||
|
||||
// Convert endpoint ID to NodeId (using first 16 bytes)
|
||||
@@ -65,20 +88,89 @@ impl NetworkingManager {
|
||||
node_id_bytes.copy_from_slice(&id_bytes[..16]);
|
||||
let node_id = NodeId::from_bytes(node_id_bytes);
|
||||
|
||||
// Create gossip protocol
|
||||
let gossip = Gossip::builder().spawn(endpoint.clone());
|
||||
// Create pkarr client for DHT peer discovery
|
||||
let pkarr_client = pkarr::Client::builder()
|
||||
.no_default_network()
|
||||
.dht(|x| x)
|
||||
.build()?;
|
||||
|
||||
// Discover existing peers from DHT with retries
|
||||
// Retry immediately without delays - if peers aren't in DHT yet, they'll appear soon
|
||||
let mut peer_endpoint_ids = vec![];
|
||||
for attempt in 1..=3 {
|
||||
// Check for cancellation before each attempt
|
||||
if cancel_token.is_cancelled() {
|
||||
tracing::info!("Networking initialization cancelled during DHT discovery");
|
||||
return Err(anyhow::anyhow!("Initialization cancelled"));
|
||||
}
|
||||
|
||||
send_progress(NetworkingInitStatus::DiscoveringPeers {
|
||||
session_code: session_id.to_code().to_string(),
|
||||
attempt,
|
||||
});
|
||||
match crate::engine::peer_discovery::discover_peers_from_dht(&session_id, &pkarr_client).await {
|
||||
Ok(peers) if !peers.is_empty() => {
|
||||
let count = peers.len();
|
||||
peer_endpoint_ids = peers;
|
||||
send_progress(NetworkingInitStatus::PeersFound {
|
||||
count,
|
||||
});
|
||||
break;
|
||||
}
|
||||
Ok(_) if attempt == 3 => {
|
||||
// Last attempt and no peers found
|
||||
send_progress(NetworkingInitStatus::NoPeersFound);
|
||||
}
|
||||
Ok(_) => {
|
||||
// No peers found, but will retry immediately
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("DHT query attempt {} failed: {}", attempt, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for cancellation before publishing
|
||||
if cancel_token.is_cancelled() {
|
||||
tracing::info!("Networking initialization cancelled before DHT publish");
|
||||
return Err(anyhow::anyhow!("Initialization cancelled"));
|
||||
}
|
||||
|
||||
// Publish our presence to DHT
|
||||
send_progress(NetworkingInitStatus::PublishingToDHT);
|
||||
if let Err(e) = crate::engine::peer_discovery::publish_peer_to_dht(
|
||||
&session_id,
|
||||
endpoint_id,
|
||||
&pkarr_client,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::warn!("Failed to publish to DHT: {}", e);
|
||||
}
|
||||
|
||||
// Check for cancellation before gossip initialization
|
||||
if cancel_token.is_cancelled() {
|
||||
tracing::info!("Networking initialization cancelled before gossip init");
|
||||
return Err(anyhow::anyhow!("Initialization cancelled"));
|
||||
}
|
||||
|
||||
// Derive session-specific ALPN for network isolation
|
||||
let session_alpn = session_id.to_alpn();
|
||||
|
||||
// Create gossip protocol with custom session ALPN
|
||||
send_progress(NetworkingInitStatus::InitializingGossip);
|
||||
let gossip = Gossip::builder()
|
||||
.alpn(&session_alpn)
|
||||
.spawn(endpoint.clone());
|
||||
|
||||
// Set up router to accept session ALPN
|
||||
let router = Router::builder(endpoint.clone())
|
||||
.accept(session_alpn.as_slice(), gossip.clone())
|
||||
.spawn();
|
||||
|
||||
// Subscribe to topic derived from session ALPN
|
||||
// Subscribe to topic with discovered peers as bootstrap
|
||||
let topic_id = TopicId::from_bytes(session_alpn);
|
||||
let subscribe_handle = gossip.subscribe(topic_id, vec![]).await?;
|
||||
let subscribe_handle = gossip.subscribe(topic_id, peer_endpoint_ids).await?;
|
||||
|
||||
let (sender, receiver) = subscribe_handle.split();
|
||||
|
||||
@@ -91,6 +183,14 @@ impl NetworkingManager {
|
||||
// Create GossipBridge for Bevy integration
|
||||
let bridge = crate::networking::GossipBridge::new(node_id);
|
||||
|
||||
// Spawn background task to maintain DHT presence
|
||||
let session_id_clone = session_id.clone();
|
||||
tokio::spawn(crate::engine::peer_discovery::maintain_dht_presence(
|
||||
session_id_clone,
|
||||
endpoint_id,
|
||||
pkarr_client,
|
||||
));
|
||||
|
||||
let manager = Self {
|
||||
session_id,
|
||||
node_id,
|
||||
@@ -120,12 +220,17 @@ impl NetworkingManager {
|
||||
|
||||
/// Process gossip events (unbounded) and periodic tasks (heartbeats, lock cleanup)
|
||||
/// Also bridges messages between iroh-gossip and Bevy's GossipBridge
|
||||
pub async fn run(mut self, event_tx: mpsc::UnboundedSender<EngineEvent>) {
|
||||
pub async fn run(mut self, event_tx: mpsc::UnboundedSender<EngineEvent>, cancel_token: tokio_util::sync::CancellationToken) {
|
||||
let mut heartbeat_interval = time::interval(Duration::from_secs(1));
|
||||
let mut bridge_poll_interval = time::interval(Duration::from_millis(10));
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Listen for shutdown signal
|
||||
_ = cancel_token.cancelled() => {
|
||||
tracing::info!("NetworkingManager received shutdown signal");
|
||||
break;
|
||||
}
|
||||
// Process incoming gossip messages and forward to GossipBridge
|
||||
Some(result) = self.receiver.next() => {
|
||||
match result {
|
||||
|
||||
Reference in New Issue
Block a user