Initial commit for session and lock features

Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
2025-12-12 20:18:41 +00:00
parent 2dbbf28bc6
commit b098a19d6b
28 changed files with 3178 additions and 655 deletions

2
Cargo.lock generated
View File

@@ -4473,6 +4473,7 @@ dependencies = [
"anyhow",
"bevy",
"bincode",
"blake3",
"blocking",
"chrono",
"crdts",
@@ -4481,6 +4482,7 @@ dependencies = [
"iroh",
"iroh-gossip",
"proptest",
"rand 0.8.5",
"rusqlite",
"serde",
"serde_json",

View File

@@ -14,14 +14,14 @@ impl Plugin for CameraPlugin {
/// Set up the 3D camera
///
/// Camera is positioned at (4, 3, 6) looking at the cube's initial position (0, 0.5, 0).
/// This provides a good viewing angle to see the cube, ground plane, and any movements.
/// Camera is positioned at (4, 3, 6) looking at the cube's initial position (0,
/// 0.5, 0). This provides a good viewing angle to see the cube, ground plane,
/// and any movements.
fn setup_camera(mut commands: Commands) {
info!("Setting up camera");
commands.spawn((
Camera3d::default(),
Transform::from_xyz(4.0, 3.0, 6.0)
.looking_at(Vec3::new(0.0, 0.5, 0.0), Vec3::Y),
Transform::from_xyz(4.0, 3.0, 6.0).looking_at(Vec3::new(0.0, 0.5, 0.0), Vec3::Y),
));
}

View File

@@ -2,10 +2,17 @@
use bevy::prelude::*;
use lib::{
networking::{NetworkedEntity, NetworkedTransform, Synced},
networking::{
NetworkedEntity,
NetworkedTransform,
Synced,
},
persistence::Persisted,
};
use serde::{Deserialize, Serialize};
use serde::{
Deserialize,
Serialize,
};
use uuid::Uuid;
/// Marker component for the replicated cube

View File

@@ -1,8 +1,15 @@
//! Debug UI overlay using egui
use bevy::prelude::*;
use bevy_egui::{egui, EguiContexts, EguiPrimaryContextPass};
use lib::networking::{GossipBridge, NodeVectorClock};
use bevy_egui::{
egui,
EguiContexts,
EguiPrimaryContextPass,
};
use lib::networking::{
GossipBridge,
NodeVectorClock,
};
pub struct DebugUiPlugin;
@@ -17,7 +24,10 @@ fn render_debug_ui(
mut contexts: EguiContexts,
node_clock: Option<Res<NodeVectorClock>>,
gossip_bridge: Option<Res<GossipBridge>>,
cube_query: Query<(&Transform, &lib::networking::NetworkedEntity), With<crate::cube::CubeMarker>>,
cube_query: Query<
(&Transform, &lib::networking::NetworkedEntity),
With<crate::cube::CubeMarker>,
>,
) {
let Ok(ctx) = contexts.ctx_mut() else {
return;
@@ -35,7 +45,8 @@ fn render_debug_ui(
if let Some(clock) = &node_clock {
ui.label(format!("Node ID: {}", &clock.node_id.to_string()[..8]));
// Show the current node's clock value (timestamp)
let current_timestamp = clock.clock.clocks.get(&clock.node_id).copied().unwrap_or(0);
let current_timestamp =
clock.clock.clocks.get(&clock.node_id).copied().unwrap_or(0);
ui.label(format!("Clock: {}", current_timestamp));
ui.label(format!("Known nodes: {}", clock.clock.clocks.len()));
} else {
@@ -46,7 +57,10 @@ fn render_debug_ui(
// Gossip bridge status
if let Some(bridge) = &gossip_bridge {
ui.label(format!("Bridge Node: {}", &bridge.node_id().to_string()[..8]));
ui.label(format!(
"Bridge Node: {}",
&bridge.node_id().to_string()[..8]
));
ui.label("Status: Connected");
} else {
ui.label("Gossip: Not ready");
@@ -58,25 +72,38 @@ fn render_debug_ui(
// Cube information
match cube_query.iter().next() {
Some((transform, networked)) => {
| Some((transform, networked)) => {
let pos = transform.translation;
ui.label(format!("Position: ({:.2}, {:.2}, {:.2})", pos.x, pos.y, pos.z));
ui.label(format!(
"Position: ({:.2}, {:.2}, {:.2})",
pos.x, pos.y, pos.z
));
let (axis, angle) = transform.rotation.to_axis_angle();
let angle_deg: f32 = angle.to_degrees();
ui.label(format!("Rotation: {:.2}° around ({:.2}, {:.2}, {:.2})",
angle_deg, axis.x, axis.y, axis.z));
ui.label(format!(
"Rotation: {:.2}° around ({:.2}, {:.2}, {:.2})",
angle_deg, axis.x, axis.y, axis.z
));
ui.label(format!("Scale: ({:.2}, {:.2}, {:.2})",
transform.scale.x, transform.scale.y, transform.scale.z));
ui.label(format!(
"Scale: ({:.2}, {:.2}, {:.2})",
transform.scale.x, transform.scale.y, transform.scale.z
));
ui.add_space(5.0);
ui.label(format!("Network ID: {}", &networked.network_id.to_string()[..8]));
ui.label(format!("Owner: {}", &networked.owner_node_id.to_string()[..8]));
}
None => {
ui.label(format!(
"Network ID: {}",
&networked.network_id.to_string()[..8]
));
ui.label(format!(
"Owner: {}",
&networked.owner_node_id.to_string()[..8]
));
},
| None => {
ui.label("Cube: Not spawned yet");
}
},
}
ui.add_space(10.0);

View File

@@ -1,7 +1,8 @@
//! Gossip networking setup with dedicated tokio runtime
//!
//! This module manages iroh-gossip networking with a tokio runtime running as a sidecar to Bevy.
//! The tokio runtime runs in a dedicated background thread, separate from Bevy's ECS loop.
//! This module manages iroh-gossip networking with a tokio runtime running as a
//! sidecar to Bevy. The tokio runtime runs in a dedicated background thread,
//! separate from Bevy's ECS loop.
//!
//! # Architecture
//!
@@ -48,9 +49,16 @@
use anyhow::Result;
use bevy::prelude::*;
use lib::networking::GossipBridge;
use lib::networking::{GossipBridge, SessionId};
use uuid::Uuid;
/// Session ID to use for network initialization
///
/// This resource must be inserted before setup_gossip_networking runs.
/// It provides the session ID used to derive the session-specific ALPN.
#[derive(Resource, Clone)]
pub struct InitialSessionId(pub SessionId);
/// Channel for receiving the GossipBridge from the background thread
///
/// This resource exists temporarily during startup. Once the GossipBridge
@@ -69,8 +77,21 @@ pub struct GossipBridgeChannel(crossbeam_channel::Receiver<GossipBridge>);
///
/// - **macOS**: Full support with mDNS discovery
/// - **iOS**: Not yet implemented
pub fn setup_gossip_networking(mut commands: Commands) {
info!("Setting up gossip networking...");
///
/// # Requirements
///
/// The InitialSessionId resource must be inserted before this system runs.
/// If not present, an error is logged and networking is disabled.
pub fn setup_gossip_networking(
mut commands: Commands,
session_id: Option<Res<InitialSessionId>>,
) {
let Some(session_id) = session_id else {
error!("InitialSessionId resource not found - cannot initialize networking");
return;
};
info!("Setting up gossip networking for session {}...", session_id.0);
// Spawn dedicated thread with Tokio runtime for gossip initialization
#[cfg(not(target_os = "ios"))]
@@ -78,19 +99,20 @@ pub fn setup_gossip_networking(mut commands: Commands) {
let (sender, receiver) = crossbeam_channel::unbounded();
commands.insert_resource(GossipBridgeChannel(receiver));
let session_id = session_id.0.clone();
std::thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
match init_gossip().await {
Ok(bridge) => {
match init_gossip(session_id).await {
| Ok(bridge) => {
info!("Gossip bridge initialized successfully");
if let Err(e) = sender.send(bridge) {
error!("Failed to send bridge to main thread: {}", e);
}
}
Err(e) => {
},
| Err(e) => {
error!("Failed to initialize gossip: {}", e);
}
},
}
});
});
@@ -114,8 +136,7 @@ pub fn setup_gossip_networking(mut commands: Commands) {
/// - **iOS**: No-op (networking not implemented)
pub fn poll_gossip_bridge(
mut commands: Commands,
#[cfg(not(target_os = "ios"))]
channel: Option<Res<GossipBridgeChannel>>,
#[cfg(not(target_os = "ios"))] channel: Option<Res<GossipBridgeChannel>>,
) {
#[cfg(not(target_os = "ios"))]
if let Some(channel) = channel {
@@ -127,16 +148,21 @@ pub fn poll_gossip_bridge(
}
}
/// Initialize iroh-gossip networking stack
/// Initialize iroh-gossip networking stack with session-specific ALPN
///
/// This async function runs in the background tokio runtime and:
/// 1. Creates an iroh endpoint with mDNS discovery
/// 2. Spawns the gossip protocol
/// 3. Sets up the router to accept gossip connections
/// 4. Subscribes to a shared topic (ID: [42; 32])
/// 5. Waits for join with a 2-second timeout
/// 6. Creates and configures the GossipBridge
/// 7. Spawns forwarding tasks to bridge messages
/// 3. Derives session-specific ALPN from session ID (using BLAKE3)
/// 4. Sets up the router to accept connections on the session ALPN
/// 5. Subscribes to a topic derived from the session ALPN
/// 6. Waits for join with a 2-second timeout
/// 7. Creates and configures the GossipBridge
/// 8. Spawns forwarding tasks to bridge messages
///
/// # Parameters
///
/// - `session_id`: The session ID used to derive the ALPN for network isolation
///
/// # Returns
///
@@ -147,12 +173,16 @@ pub fn poll_gossip_bridge(
///
/// This function is only compiled on non-iOS platforms.
#[cfg(not(target_os = "ios"))]
async fn init_gossip() -> Result<GossipBridge> {
use iroh::discovery::mdns::MdnsDiscovery;
use iroh::protocol::Router;
use iroh::Endpoint;
use iroh_gossip::net::Gossip;
use iroh_gossip::proto::TopicId;
async fn init_gossip(session_id: SessionId) -> Result<GossipBridge> {
use iroh::{
discovery::mdns::MdnsDiscovery,
protocol::Router,
Endpoint,
};
use iroh_gossip::{
net::Gossip,
proto::TopicId,
};
info!("Creating endpoint with mDNS discovery...");
let endpoint = Endpoint::builder()
@@ -172,14 +202,21 @@ async fn init_gossip() -> Result<GossipBridge> {
info!("Spawning gossip protocol...");
let gossip = Gossip::builder().spawn(endpoint.clone());
// Derive session-specific ALPN for network isolation
let session_alpn = session_id.to_alpn();
info!(
"Using session-specific ALPN (session: {})",
session_id
);
info!("Setting up router...");
let router = Router::builder(endpoint.clone())
.accept(iroh_gossip::ALPN, gossip.clone())
.accept(session_alpn.as_slice(), gossip.clone())
.spawn();
// Subscribe to shared topic
let topic_id = TopicId::from_bytes([42; 32]);
info!("Subscribing to topic...");
// Subscribe to topic derived from session ALPN (use same bytes for consistency)
let topic_id = TopicId::from_bytes(session_alpn);
info!("Subscribing to session topic...");
let subscribe_handle = gossip.subscribe(topic_id, vec![]).await?;
let (sender, mut receiver) = subscribe_handle.split();
@@ -187,9 +224,9 @@ async fn init_gossip() -> Result<GossipBridge> {
// Wait for join (with timeout since we might be the first node)
info!("Waiting for gossip join...");
match tokio::time::timeout(std::time::Duration::from_secs(2), receiver.joined()).await {
Ok(Ok(())) => info!("Joined gossip swarm"),
Ok(Err(e)) => warn!("Join error: {} (proceeding anyway)", e),
Err(_) => info!("Join timeout (first node in swarm)"),
| Ok(Ok(())) => info!("Joined gossip swarm"),
| Ok(Err(e)) => warn!("Join error: {} (proceeding anyway)", e),
| Err(_) => info!("Join timeout (first node in swarm)"),
}
// Create bridge
@@ -204,16 +241,19 @@ async fn init_gossip() -> Result<GossipBridge> {
/// Spawn tokio tasks to forward messages between iroh-gossip and GossipBridge
///
/// This function spawns two concurrent tokio tasks that run for the lifetime of the application:
/// This function spawns two concurrent tokio tasks that run for the lifetime of
/// the application:
///
/// 1. **Outgoing Task**: Polls GossipBridge for outgoing messages and broadcasts them via gossip
/// 2. **Incoming Task**: Receives messages from gossip and pushes them into GossipBridge
/// 1. **Outgoing Task**: Polls GossipBridge for outgoing messages and
/// broadcasts them via gossip
/// 2. **Incoming Task**: Receives messages from gossip and pushes them into
/// GossipBridge
///
/// # Lifetime Management
///
/// The iroh resources (endpoint, router, gossip) are moved into the first task to keep them
/// alive for the application lifetime. Without this, they would be dropped immediately and
/// the gossip connection would close.
/// The iroh resources (endpoint, router, gossip) are moved into the first task
/// to keep them alive for the application lifetime. Without this, they would be
/// dropped immediately and the gossip connection would close.
///
/// # Platform Support
///
@@ -227,10 +267,11 @@ fn spawn_bridge_tasks(
_router: iroh::protocol::Router,
_gossip: iroh_gossip::net::Gossip,
) {
use std::time::Duration;
use bytes::Bytes;
use futures_lite::StreamExt;
use lib::networking::VersionedMessage;
use std::time::Duration;
let node_id = bridge.node_id();
@@ -259,7 +300,7 @@ fn spawn_bridge_tasks(
tokio::spawn(async move {
loop {
match tokio::time::timeout(Duration::from_millis(100), receiver.next()).await {
Ok(Some(Ok(event))) => {
| Ok(Some(Ok(event))) => {
if let iroh_gossip::api::Event::Received(msg) = event {
if let Ok(versioned_msg) =
bincode::deserialize::<VersionedMessage>(&msg.content)
@@ -269,10 +310,10 @@ fn spawn_bridge_tasks(
}
}
}
}
Ok(Some(Err(e))) => error!("[Node {}] Receiver error: {}", node_id, e),
Ok(None) => break,
Err(_) => {} // Timeout
},
| Ok(Some(Err(e))) => error!("[Node {}] Receiver error: {}", node_id, e),
| Ok(None) => break,
| Err(_) => {}, // Timeout
}
}
});

View File

@@ -5,31 +5,52 @@
use std::{
path::PathBuf,
time::{Duration, Instant},
time::{
Duration,
Instant,
},
};
use anyhow::Result;
use app::CubeMarker;
use bevy::{
app::{App, ScheduleRunnerPlugin},
app::{
App,
ScheduleRunnerPlugin,
},
ecs::world::World,
prelude::*,
MinimalPlugins,
};
use bytes::Bytes;
use futures_lite::StreamExt;
use iroh::{protocol::Router, Endpoint};
use iroh::{
protocol::Router,
Endpoint,
};
use iroh_gossip::{
api::{GossipReceiver, GossipSender},
api::{
GossipReceiver,
GossipSender,
},
net::Gossip,
proto::TopicId,
};
use lib::{
networking::{
GossipBridge, NetworkedEntity, NetworkedTransform, NetworkingConfig, NetworkingPlugin,
Synced, VersionedMessage,
GossipBridge,
NetworkedEntity,
NetworkedTransform,
NetworkingConfig,
NetworkingPlugin,
Synced,
VersionedMessage,
},
persistence::{
Persisted,
PersistenceConfig,
PersistencePlugin,
},
persistence::{Persisted, PersistenceConfig, PersistencePlugin},
};
use tempfile::TempDir;
use uuid::Uuid;
@@ -66,11 +87,9 @@ mod test_utils {
pub fn create_test_app(node_id: Uuid, db_path: PathBuf, bridge: GossipBridge) -> App {
let mut app = App::new();
app.add_plugins(
MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(Duration::from_secs_f64(
1.0 / 60.0,
))),
)
app.add_plugins(MinimalPlugins.set(ScheduleRunnerPlugin::run_loop(
Duration::from_secs_f64(1.0 / 60.0),
)))
.insert_resource(bridge)
.add_plugins(NetworkingPlugin::new(NetworkingConfig {
node_id,
@@ -117,8 +136,7 @@ mod test_utils {
check_fn: F,
) -> Result<()>
where
F: Fn(&mut World, &mut World) -> bool,
{
F: Fn(&mut World, &mut World) -> bool, {
let start = Instant::now();
let mut tick_count = 0;
@@ -200,10 +218,10 @@ mod test_utils {
println!(" Connecting to bootstrap peers...");
for addr in &bootstrap_addrs {
match endpoint.connect(addr.clone(), iroh_gossip::ALPN).await {
Ok(_conn) => println!(" ✓ Connected to bootstrap peer: {}", addr.id),
Err(e) => {
| Ok(_conn) => println!(" ✓ Connected to bootstrap peer: {}", addr.id),
| Err(e) => {
println!(" ✗ Failed to connect to bootstrap peer {}: {}", addr.id, e)
}
},
}
}
}
@@ -220,11 +238,11 @@ mod test_utils {
if has_bootstrap_peers {
println!(" Waiting for join to complete (with timeout)...");
match tokio::time::timeout(Duration::from_secs(3), receiver.joined()).await {
Ok(Ok(())) => println!(" Join completed!"),
Ok(Err(e)) => println!(" Join error: {}", e),
Err(_) => {
| Ok(Ok(())) => println!(" Join completed!"),
| Ok(Err(e)) => println!(" Join error: {}", e),
| Err(_) => {
println!(" Join timeout - proceeding anyway (mDNS may still connect later)")
}
},
}
} else {
println!(" No bootstrap peers - skipping join wait (first node in swarm)");
@@ -270,7 +288,8 @@ mod test_utils {
Ok((ep1, ep2, router1, router2, bridge1, bridge2))
}
/// Spawn background tasks to forward messages between iroh-gossip and GossipBridge
/// Spawn background tasks to forward messages between iroh-gossip and
/// GossipBridge
fn spawn_gossip_bridge_tasks(
sender: GossipSender,
mut receiver: GossipReceiver,
@@ -290,7 +309,7 @@ mod test_utils {
node_id, msg_count
);
match bincode::serialize(&versioned_msg) {
Ok(bytes) => {
| Ok(bytes) => {
if let Err(e) = sender.broadcast(Bytes::from(bytes)).await {
eprintln!("[Node {}] Failed to broadcast message: {}", node_id, e);
} else {
@@ -299,8 +318,8 @@ mod test_utils {
node_id, msg_count
);
}
}
Err(e) => eprintln!(
},
| Err(e) => eprintln!(
"[Node {}] Failed to serialize message for broadcast: {}",
node_id, e
),
@@ -318,7 +337,7 @@ mod test_utils {
println!("[Node {}] Gossip receiver task started", node_id);
loop {
match tokio::time::timeout(Duration::from_millis(100), receiver.next()).await {
Ok(Some(Ok(event))) => {
| Ok(Some(Ok(event))) => {
println!(
"[Node {}] Received gossip event: {:?}",
node_id,
@@ -331,7 +350,7 @@ mod test_utils {
node_id, msg_count
);
match bincode::deserialize::<VersionedMessage>(&msg.content) {
Ok(versioned_msg) => {
| Ok(versioned_msg) => {
if let Err(e) = bridge_in.push_incoming(versioned_msg) {
eprintln!(
"[Node {}] Failed to push to bridge incoming: {}",
@@ -343,24 +362,24 @@ mod test_utils {
node_id, msg_count
);
}
}
Err(e) => eprintln!(
},
| Err(e) => eprintln!(
"[Node {}] Failed to deserialize gossip message: {}",
node_id, e
),
}
}
}
Ok(Some(Err(e))) => {
},
| Ok(Some(Err(e))) => {
eprintln!("[Node {}] Gossip receiver error: {}", node_id, e)
}
Ok(None) => {
},
| Ok(None) => {
println!("[Node {}] Gossip stream ended", node_id);
break;
}
Err(_) => {
},
| Err(_) => {
// Timeout, no message available
}
},
}
}
});

View File

@@ -19,6 +19,8 @@ bevy.workspace = true
bincode = "1.3"
futures-lite = "2.0"
sha2 = "0.10"
blake3 = "1.5"
rand = "0.8"
tokio.workspace = true
blocking = "1.6"

View File

@@ -4,11 +4,11 @@
//! single-pass optimization in the happened_before() method.
use criterion::{
BenchmarkId,
Criterion,
black_box,
criterion_group,
criterion_main,
BenchmarkId,
Criterion,
};
use lib::networking::VectorClock;
@@ -109,7 +109,8 @@ fn bench_happened_before_large_clocks(c: &mut Criterion) {
/// Benchmark: happened_before with disjoint node sets
///
/// This is the scenario where early exit optimization provides the most benefit.
/// This is the scenario where early exit optimization provides the most
/// benefit.
fn bench_happened_before_disjoint(c: &mut Criterion) {
let mut group = c.benchmark_group("VectorClock::happened_before (disjoint)");
@@ -257,27 +258,27 @@ fn bench_realistic_workload(c: &mut Criterion) {
// Simulate 100 operations across 3 nodes
for i in 0..100 {
match i % 7 {
0 => {
| 0 => {
clock1.increment(node1);
}
1 => {
},
| 1 => {
clock2.increment(node2);
}
2 => {
},
| 2 => {
clock3.increment(node3);
}
3 => {
},
| 3 => {
clock1.merge(&clock2);
}
4 => {
},
| 4 => {
clock2.merge(&clock3);
}
5 => {
},
| 5 => {
let _ = clock1.happened_before(&clock2);
}
_ => {
},
| _ => {
let _ = clock2.is_concurrent_with(&clock3);
}
},
}
}

View File

@@ -4,18 +4,19 @@
//! optimization that replaced Vec::retain() with HashMap-based indexing.
use criterion::{
BenchmarkId,
Criterion,
black_box,
criterion_group,
criterion_main,
BenchmarkId,
Criterion,
};
use lib::persistence::{
PersistenceOp,
WriteBuffer,
};
/// Benchmark: Add many updates to the same component (worst case for old implementation)
/// Benchmark: Add many updates to the same component (worst case for old
/// implementation)
///
/// This scenario heavily stresses the deduplication logic. In the old O(n)
/// implementation, each add() would scan the entire buffer. With HashMap
@@ -105,9 +106,9 @@ fn bench_mixed_workload(c: &mut Criterion) {
// 70% updates to Transform, 20% to Material, 10% to unique components
for i in 0..num_ops {
let (component_type, data_size) = match i % 10 {
0..=6 => ("Transform".to_string(), 64), // 70%
7..=8 => ("Material".to_string(), 128), // 20%
_ => (format!("Component{}", i), 32), // 10%
| 0..=6 => ("Transform".to_string(), 64), // 70%
| 7..=8 => ("Material".to_string(), 128), // 20%
| _ => (format!("Component{}", i), 32), // 10%
};
let op = PersistenceOp::UpsertComponent {

View File

@@ -502,6 +502,10 @@ pub fn receive_and_apply_deltas_system(world: &mut World) {
// Handled by handle_missing_deltas_system
debug!("MissingDeltas handled by dedicated system");
},
| SyncMessage::Lock { .. } => {
// Handled by lock message dispatcher
debug!("Lock message handled by dedicated system");
},
}
}
}

View File

@@ -144,14 +144,22 @@ mod tests {
#[test]
fn test_send_message() {
use crate::networking::SyncMessage;
use crate::networking::{
JoinType,
SessionId,
SyncMessage,
};
let node_id = uuid::Uuid::new_v4();
let bridge = GossipBridge::new(node_id);
let session_id = SessionId::new();
let message = SyncMessage::JoinRequest {
node_id,
session_id,
session_secret: None,
last_known_clock: None,
join_type: JoinType::Fresh,
};
let versioned = VersionedMessage::new(message);

View File

@@ -19,11 +19,14 @@ use bevy::{
use crate::networking::{
GossipBridge,
NetworkedEntity,
SessionId,
VectorClock,
blob_support::BlobStore,
delta_generation::NodeVectorClock,
entity_map::NetworkEntityMap,
messages::{
EntityState,
JoinType,
SyncMessage,
VersionedMessage,
},
@@ -33,24 +36,34 @@ use crate::networking::{
///
/// # Arguments
/// * `node_id` - The UUID of the node requesting to join
/// * `session_id` - The session to join
/// * `session_secret` - Optional pre-shared secret for authentication
/// * `last_known_clock` - Optional vector clock from previous session (for rejoin)
/// * `join_type` - Whether this is a fresh join or rejoin
///
/// # Example
///
/// ```
/// use lib::networking::build_join_request;
/// use lib::networking::{build_join_request, SessionId, JoinType};
/// use uuid::Uuid;
///
/// let node_id = Uuid::new_v4();
/// let request = build_join_request(node_id, None);
/// let session_id = SessionId::new();
/// let request = build_join_request(node_id, session_id, None, None, JoinType::Fresh);
/// ```
pub fn build_join_request(
node_id: uuid::Uuid,
session_id: SessionId,
session_secret: Option<Vec<u8>>,
last_known_clock: Option<VectorClock>,
join_type: JoinType,
) -> VersionedMessage {
VersionedMessage::new(SyncMessage::JoinRequest {
node_id,
session_id,
session_secret,
last_known_clock,
join_type,
})
}
@@ -340,9 +353,15 @@ pub fn handle_join_requests_system(
match message.message {
| SyncMessage::JoinRequest {
node_id,
session_id,
session_secret,
last_known_clock: _,
join_type,
} => {
info!("Received JoinRequest from node {}", node_id);
info!(
"Received JoinRequest from node {} for session {} (type: {:?})",
node_id, session_id, join_type
);
// Validate session secret if configured
if let Some(expected) =
@@ -454,15 +473,22 @@ mod tests {
#[test]
fn test_build_join_request() {
let node_id = uuid::Uuid::new_v4();
let request = build_join_request(node_id, None);
let session_id = SessionId::new();
let request = build_join_request(node_id, session_id.clone(), None, None, JoinType::Fresh);
match request.message {
| SyncMessage::JoinRequest {
node_id: req_node_id,
session_id: req_session_id,
session_secret,
last_known_clock,
join_type,
} => {
assert_eq!(req_node_id, node_id);
assert_eq!(req_session_id, session_id);
assert!(session_secret.is_none());
assert!(last_known_clock.is_none());
assert!(matches!(join_type, JoinType::Fresh));
},
| _ => panic!("Expected JoinRequest"),
}
@@ -471,15 +497,64 @@ mod tests {
#[test]
fn test_build_join_request_with_secret() {
let node_id = uuid::Uuid::new_v4();
let session_id = SessionId::new();
let secret = vec![1, 2, 3, 4];
let request = build_join_request(node_id, Some(secret.clone()));
let request = build_join_request(
node_id,
session_id.clone(),
Some(secret.clone()),
None,
JoinType::Fresh,
);
match request.message {
| SyncMessage::JoinRequest {
node_id: _,
session_id: req_session_id,
session_secret,
last_known_clock,
join_type,
} => {
assert_eq!(req_session_id, session_id);
assert_eq!(session_secret, Some(secret));
assert!(last_known_clock.is_none());
assert!(matches!(join_type, JoinType::Fresh));
},
| _ => panic!("Expected JoinRequest"),
}
}
#[test]
fn test_build_join_request_rejoin() {
let node_id = uuid::Uuid::new_v4();
let session_id = SessionId::new();
let clock = VectorClock::new();
let join_type = JoinType::Rejoin {
last_active: 1234567890,
entity_count: 42,
};
let request = build_join_request(
node_id,
session_id.clone(),
None,
Some(clock.clone()),
join_type.clone(),
);
match request.message {
| SyncMessage::JoinRequest {
node_id: req_node_id,
session_id: req_session_id,
session_secret,
last_known_clock,
join_type: req_join_type,
} => {
assert_eq!(req_node_id, node_id);
assert_eq!(req_session_id, session_id);
assert!(session_secret.is_none());
assert_eq!(last_known_clock, Some(clock));
assert!(matches!(req_join_type, JoinType::Rejoin { .. }));
},
| _ => panic!("Expected JoinRequest"),
}

View File

@@ -0,0 +1,682 @@
//! Entity lock system for collaborative editing
//!
//! Provides optimistic entity locking to prevent concurrent modifications.
//! Locks are acquired when entities are selected and released when deselected.
//!
//! # Lock Protocol
//!
//! 1. **Acquisition**: User selects entity → broadcast `LockRequest`
//! 2. **Optimistic Apply**: All peers apply lock locally
//! 3. **Confirm**: Holder broadcasts `LockAcquired`
//! 4. **Conflict Resolution**: If two nodes acquire simultaneously, higher node ID wins
//! 5. **Release**: User deselects entity → broadcast `LockReleased`
//! 6. **Timeout**: 5-second timeout as crash recovery fallback
//!
//! # Example
//!
//! ```no_run
//! use bevy::prelude::*;
//! use lib::networking::{EntityLockRegistry, acquire_entity_lock, release_entity_lock};
//! use uuid::Uuid;
//!
//! fn my_system(world: &mut World) {
//! let entity_id = Uuid::new_v4();
//! let node_id = Uuid::new_v4();
//!
//! let mut registry = world.resource_mut::<EntityLockRegistry>();
//!
//! // Acquire lock when user selects entity
//! registry.try_acquire(entity_id, node_id);
//!
//! // Release lock when user deselects entity
//! registry.release(entity_id, node_id);
//! }
//! ```
use std::{
collections::HashMap,
time::{
Duration,
Instant,
},
};
use bevy::prelude::*;
use serde::{
Deserialize,
Serialize,
};
use uuid::Uuid;
use crate::networking::{
GossipBridge,
NetworkedSelection,
NodeId,
VersionedMessage,
delta_generation::NodeVectorClock,
messages::SyncMessage,
};
/// Duration before a lock automatically expires (crash recovery)
///
/// If no heartbeat arrives within this window the lock is considered expired
/// (see `EntityLock::is_expired`), allowing peers to reclaim locks held by a
/// crashed node without any explicit release message.
pub const LOCK_TIMEOUT: Duration = Duration::from_secs(5);

/// Maximum number of concurrent locks per node (rate limiting)
///
/// Enforced in `EntityLockRegistry::try_acquire` to bound per-node resource
/// usage in the shared registry.
pub const MAX_LOCKS_PER_NODE: usize = 100;
/// Lock acquisition/release messages
///
/// Wire messages for the entity lock protocol. See the module-level docs for
/// the full protocol (optimistic apply, higher-node-ID conflict resolution,
/// heartbeat-based expiry).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum LockMessage {
    /// Request to acquire a lock on an entity
    ///
    /// Broadcast by the node selecting the entity; peers apply the lock
    /// optimistically on receipt.
    LockRequest {
        entity_id: Uuid,
        node_id: NodeId,
    },
    /// Confirmation that a lock was successfully acquired
    ///
    /// Broadcast by `holder` once the lock is established locally.
    LockAcquired {
        entity_id: Uuid,
        holder: NodeId,
    },
    /// Lock acquisition failed (already locked by another node)
    ///
    /// Tells `requester` that `current_holder` already owns the lock.
    LockRejected {
        entity_id: Uuid,
        requester: NodeId,
        current_holder: NodeId,
    },
    /// Heartbeat to renew a held lock (sent ~1/sec by holder)
    ///
    /// If no heartbeat is received for 5 seconds, the lock expires.
    /// This provides automatic crash recovery without explicit timeouts.
    LockHeartbeat {
        entity_id: Uuid,
        holder: NodeId,
    },
    /// Request to release a lock
    ///
    /// Sent by `node_id` when it deselects the entity it holds.
    LockRelease {
        entity_id: Uuid,
        node_id: NodeId,
    },
    /// Confirmation that a lock was released
    LockReleased {
        entity_id: Uuid,
    },
}
/// Information about an active entity lock
#[derive(Debug, Clone)]
pub struct EntityLock {
    /// ID of the entity being locked
    pub entity_id: Uuid,
    /// Node that holds the lock
    pub holder: NodeId,
    /// When the last heartbeat was received (or when lock was acquired)
    pub last_heartbeat: Instant,
    /// Lock timeout duration (expires if no heartbeat for this long)
    pub timeout: Duration,
}

impl EntityLock {
    /// Construct a freshly-acquired lock on `entity_id` held by `holder`.
    ///
    /// The heartbeat clock starts at the current instant and the timeout
    /// defaults to [`LOCK_TIMEOUT`].
    pub fn new(entity_id: Uuid, holder: NodeId) -> Self {
        EntityLock {
            entity_id,
            holder,
            last_heartbeat: Instant::now(),
            timeout: LOCK_TIMEOUT,
        }
    }

    /// Record a heartbeat, pushing the expiry window forward.
    pub fn renew(&mut self) {
        self.last_heartbeat = Instant::now();
    }

    /// True when no heartbeat has arrived within the timeout window.
    pub fn is_expired(&self) -> bool {
        self.last_heartbeat.elapsed() >= self.timeout
    }

    /// True when `node_id` is the current holder of this lock.
    pub fn is_held_by(&self, node_id: NodeId) -> bool {
        node_id == self.holder
    }
}
/// Registry of all active entity locks
///
/// This resource tracks which entities are locked and by whom.
/// It's used to prevent concurrent modifications to the same entity.
///
/// Invariant: `locks_per_node[n]` equals the number of entries in `locks`
/// whose holder is `n`. All removals must therefore go through `remove_lock`
/// so the rate-limiting counts stay consistent.
#[derive(Resource, Default)]
pub struct EntityLockRegistry {
    /// Map of entity ID to lock info
    locks: HashMap<Uuid, EntityLock>,
    /// Count of locks held by each node (for rate limiting)
    locks_per_node: HashMap<NodeId, usize>,
}

impl EntityLockRegistry {
    /// Create a new empty lock registry
    pub fn new() -> Self {
        Self {
            locks: HashMap::new(),
            locks_per_node: HashMap::new(),
        }
    }

    /// Try to acquire a lock on an entity
    ///
    /// Returns `Ok(())` if the lock was acquired. Returns `Err(holder)` if the
    /// entity is already locked by another node, or `Err(node_id)` (the
    /// requester itself) when the requester is at its [`MAX_LOCKS_PER_NODE`]
    /// rate limit.
    pub fn try_acquire(&mut self, entity_id: Uuid, node_id: NodeId) -> Result<(), NodeId> {
        // Check if already locked
        if let Some(existing_lock) = self.locks.get(&entity_id) {
            // If expired, allow re-acquisition
            if !existing_lock.is_expired() {
                return Err(existing_lock.holder);
            }
            // Remove expired lock (keeps per-node counts consistent)
            self.remove_lock(entity_id);
        }

        // Check rate limit
        let node_lock_count = self.locks_per_node.get(&node_id).copied().unwrap_or(0);
        if node_lock_count >= MAX_LOCKS_PER_NODE {
            warn!(
                "Node {} at lock limit ({}/{}), rejecting acquisition",
                node_id, node_lock_count, MAX_LOCKS_PER_NODE
            );
            return Err(node_id); // Return self as "holder" to indicate rate limit
        }

        // Acquire the lock
        let lock = EntityLock::new(entity_id, node_id);
        self.locks.insert(entity_id, lock);

        // Update node lock count
        *self.locks_per_node.entry(node_id).or_insert(0) += 1;

        debug!("Lock acquired: entity {} by node {}", entity_id, node_id);
        Ok(())
    }

    /// Release a lock on an entity
    ///
    /// Only succeeds if the node currently holds the lock. Returns `true` on
    /// success, `false` if the entity is unlocked or held by another node.
    pub fn release(&mut self, entity_id: Uuid, node_id: NodeId) -> bool {
        if let Some(lock) = self.locks.get(&entity_id) {
            if lock.holder == node_id {
                self.remove_lock(entity_id);
                debug!("Lock released: entity {} by node {}", entity_id, node_id);
                return true;
            } else {
                warn!(
                    "Node {} tried to release lock held by node {}",
                    node_id, lock.holder
                );
            }
        }
        false
    }

    /// Force release a lock (for timeout cleanup)
    ///
    /// Goes through `remove_lock` (rather than removing from `locks`
    /// directly) so the per-node counts used for rate limiting are
    /// decremented; otherwise forced releases would leak counts and
    /// eventually lock the holder out of new acquisitions.
    pub fn force_release(&mut self, entity_id: Uuid) {
        if self.locks.contains_key(&entity_id) {
            self.remove_lock(entity_id);
            debug!("Lock force-released: entity {}", entity_id);
        }
    }

    /// Check if an entity is locked (expired locks count as unlocked)
    pub fn is_locked(&self, entity_id: Uuid) -> bool {
        self.locks.get(&entity_id).map_or(false, |lock| !lock.is_expired())
    }

    /// Check if an entity is locked by a specific node
    pub fn is_locked_by(&self, entity_id: Uuid, node_id: NodeId) -> bool {
        self.locks
            .get(&entity_id)
            .map_or(false, |lock| !lock.is_expired() && lock.holder == node_id)
    }

    /// Get the holder of a lock (if locked and not expired)
    pub fn get_holder(&self, entity_id: Uuid) -> Option<NodeId> {
        self.locks.get(&entity_id).and_then(|lock| {
            if !lock.is_expired() {
                Some(lock.holder)
            } else {
                None
            }
        })
    }

    /// Renew a lock's heartbeat
    ///
    /// Returns true if the heartbeat was renewed, false if lock doesn't exist
    /// or is held by a different node.
    pub fn renew_heartbeat(&mut self, entity_id: Uuid, node_id: NodeId) -> bool {
        if let Some(lock) = self.locks.get_mut(&entity_id) {
            if lock.holder == node_id {
                lock.renew();
                return true;
            }
        }
        false
    }

    /// Get all expired locks
    pub fn get_expired_locks(&self) -> Vec<Uuid> {
        self.locks
            .iter()
            .filter(|(_, lock)| lock.is_expired())
            .map(|(entity_id, _)| *entity_id)
            .collect()
    }

    /// Get number of locks held by a node
    pub fn get_node_lock_count(&self, node_id: NodeId) -> usize {
        self.locks_per_node.get(&node_id).copied().unwrap_or(0)
    }

    /// Get total number of active locks
    pub fn total_locks(&self) -> usize {
        self.locks.len()
    }

    /// Remove a lock and update bookkeeping
    ///
    /// Sole removal path for `locks`: decrements (and prunes at zero) the
    /// holder's entry in `locks_per_node`.
    fn remove_lock(&mut self, entity_id: Uuid) {
        if let Some(lock) = self.locks.remove(&entity_id) {
            // Decrement node lock count
            if let Some(count) = self.locks_per_node.get_mut(&lock.holder) {
                *count = count.saturating_sub(1);
                if *count == 0 {
                    self.locks_per_node.remove(&lock.holder);
                }
            }
        }
    }

    /// Test helper: Manually expire a lock by backdating its heartbeat
    ///
    /// This is only intended for testing purposes to simulate lock expiration
    /// without waiting.
    pub fn expire_lock_for_testing(&mut self, entity_id: Uuid) {
        if let Some(lock) = self.locks.get_mut(&entity_id) {
            // `Instant` subtraction can panic on platforms where the result
            // would precede the platform's epoch, so use checked_sub.
            match Instant::now().checked_sub(lock.timeout + Duration::from_secs(1)) {
                Some(past) => lock.last_heartbeat = past,
                // Fallback: zero the timeout so `elapsed() >= timeout` holds
                // immediately and the lock reads as expired.
                None => lock.timeout = Duration::ZERO,
            }
        }
    }
}
/// System to release locks when entities are deselected
///
/// This system detects when entities are removed from selection and releases
/// any locks held on those entities, broadcasting the release to other peers.
///
/// Add to your app as an Update system:
/// ```no_run
/// use bevy::prelude::*;
/// use lib::networking::release_locks_on_deselection_system;
///
/// App::new().add_systems(Update, release_locks_on_deselection_system);
/// ```
pub fn release_locks_on_deselection_system(
    mut registry: ResMut<EntityLockRegistry>,
    node_clock: Res<NodeVectorClock>,
    bridge: Option<Res<GossipBridge>>,
    // Read-only access: the system never mutates the selection, so there is
    // no need for `&mut NetworkedSelection` (which would also trigger change
    // detection for downstream systems).
    selection_query: Query<&NetworkedSelection, Changed<NetworkedSelection>>,
) {
    let node_id = node_clock.node_id;
    for selection in selection_query.iter() {
        // Borrow the current selection directly; cloning the whole set per
        // changed component was unnecessary for a read-only membership check.
        let currently_selected = &selection.selected_ids;
        // Locks held by this node on entities that are no longer selected.
        let locks_to_release: Vec<Uuid> = registry
            .locks
            .iter()
            .filter(|(entity_id, lock)| {
                lock.holder == node_id && !currently_selected.contains(entity_id)
            })
            .map(|(entity_id, _)| *entity_id)
            .collect();
        // Release each lock locally and broadcast the release to peers.
        for entity_id in locks_to_release {
            if registry.release(entity_id, node_id) {
                debug!("Releasing lock on deselected entity {}", entity_id);
                if let Some(ref bridge) = bridge {
                    let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRelease {
                        entity_id,
                        node_id,
                    }));
                    if let Err(e) = bridge.send(msg) {
                        error!("Failed to broadcast LockRelease on deselection: {}", e);
                    } else {
                        info!("Lock released on deselection: entity {}", entity_id);
                    }
                }
            }
        }
    }
}
/// System to clean up expired locks (crash recovery)
///
/// This system periodically removes locks that have exceeded their timeout
/// duration (default 5 seconds). This provides crash recovery - if a node
/// crashes while holding a lock, it will eventually expire.
///
/// Add to your app as an Update system:
/// ```no_run
/// use bevy::prelude::*;
/// use lib::networking::cleanup_expired_locks_system;
///
/// App::new().add_systems(Update, cleanup_expired_locks_system);
/// ```
pub fn cleanup_expired_locks_system(
    mut registry: ResMut<EntityLockRegistry>,
    bridge: Option<Res<GossipBridge>>,
) {
    let expired = registry.get_expired_locks();
    // Nothing to do most frames.
    if expired.is_empty() {
        return;
    }
    info!("Cleaning up {} expired locks", expired.len());
    for entity_id in expired {
        debug!("Force-releasing expired lock on entity {}", entity_id);
        registry.force_release(entity_id);
        // Tell peers the lock is gone so their registries converge.
        if let Some(ref bridge) = bridge {
            let msg =
                VersionedMessage::new(SyncMessage::Lock(LockMessage::LockReleased { entity_id }));
            if let Err(e) = bridge.send(msg) {
                error!("Failed to broadcast LockReleased for expired lock: {}", e);
            } else {
                info!("Expired lock cleaned up: entity {}", entity_id);
            }
        }
    }
}
/// System to broadcast heartbeats for all locks we currently hold
///
/// This system runs periodically (~1/sec) and broadcasts a heartbeat for each
/// lock this node holds. This keeps locks alive and provides crash detection -
/// if a node crashes, heartbeats stop and locks expire after 5 seconds.
///
/// Add to your app as an Update system with a run condition to throttle it:
/// ```no_run
/// use bevy::prelude::*;
/// use bevy::time::common_conditions::on_timer;
/// use std::time::Duration;
/// use lib::networking::broadcast_lock_heartbeats_system;
///
/// App::new().add_systems(Update,
///     broadcast_lock_heartbeats_system.run_if(on_timer(Duration::from_secs(1)))
/// );
/// ```
pub fn broadcast_lock_heartbeats_system(
    registry: Res<EntityLockRegistry>,
    node_clock: Res<NodeVectorClock>,
    bridge: Option<Res<GossipBridge>>,
) {
    // Without a bridge there is nobody to send to — bail out before doing the
    // collection work (the original collected our locks even when it could
    // never send them).
    let Some(bridge) = bridge else {
        return;
    };
    let node_id = node_clock.node_id;
    // All live (non-expired) locks held by this node.
    let our_locks: Vec<Uuid> = registry
        .locks
        .iter()
        .filter(|(_, lock)| lock.holder == node_id && !lock.is_expired())
        .map(|(entity_id, _)| *entity_id)
        .collect();
    if our_locks.is_empty() {
        return;
    }
    debug!("Broadcasting {} lock heartbeats", our_locks.len());
    // Broadcast heartbeat for each lock
    for entity_id in our_locks {
        let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockHeartbeat {
            entity_id,
            holder: node_id,
        }));
        if let Err(e) = bridge.send(msg) {
            error!(
                "Failed to broadcast heartbeat for entity {}: {}",
                entity_id, e
            );
        } else {
            trace!("Heartbeat sent for locked entity {}", entity_id);
        }
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for the entity lock registry (acquire/release/heartbeat
    // lifecycle, rate limiting, timeout expiry) and for the serialization of
    // the lock protocol messages.
    use super::*;
    #[test]
    fn test_lock_acquisition() {
        let mut registry = EntityLockRegistry::new();
        let entity_id = Uuid::new_v4();
        let node_id = Uuid::new_v4();
        // Should acquire successfully
        assert!(registry.try_acquire(entity_id, node_id).is_ok());
        assert!(registry.is_locked(entity_id));
        assert!(registry.is_locked_by(entity_id, node_id));
        assert_eq!(registry.get_holder(entity_id), Some(node_id));
    }
    #[test]
    fn test_lock_conflict() {
        let mut registry = EntityLockRegistry::new();
        let entity_id = Uuid::new_v4();
        let node1 = Uuid::new_v4();
        let node2 = Uuid::new_v4();
        // Node 1 acquires
        assert!(registry.try_acquire(entity_id, node1).is_ok());
        // Node 2 should be rejected; the error carries the current holder
        assert_eq!(registry.try_acquire(entity_id, node2), Err(node1));
    }
    #[test]
    fn test_lock_release() {
        let mut registry = EntityLockRegistry::new();
        let entity_id = Uuid::new_v4();
        let node_id = Uuid::new_v4();
        // Acquire and release
        registry.try_acquire(entity_id, node_id).unwrap();
        assert!(registry.release(entity_id, node_id));
        assert!(!registry.is_locked(entity_id));
    }
    #[test]
    fn test_wrong_node_cannot_release() {
        let mut registry = EntityLockRegistry::new();
        let entity_id = Uuid::new_v4();
        let node1 = Uuid::new_v4();
        let node2 = Uuid::new_v4();
        // Node 1 acquires
        registry.try_acquire(entity_id, node1).unwrap();
        // Node 2 cannot release; the lock must remain intact
        assert!(!registry.release(entity_id, node2));
        assert!(registry.is_locked(entity_id));
        assert!(registry.is_locked_by(entity_id, node1));
    }
    #[test]
    fn test_lock_timeout() {
        let mut registry = EntityLockRegistry::new();
        let entity_id = Uuid::new_v4();
        let node_id = Uuid::new_v4();
        // Acquire with very short timeout
        registry.try_acquire(entity_id, node_id).unwrap();
        // Manually set timeout to 0 for testing
        if let Some(lock) = registry.locks.get_mut(&entity_id) {
            lock.timeout = Duration::from_secs(0);
        }
        // Should be detected as expired
        let expired = registry.get_expired_locks();
        assert_eq!(expired.len(), 1);
        assert_eq!(expired[0], entity_id);
    }
    #[test]
    fn test_force_release() {
        let mut registry = EntityLockRegistry::new();
        let entity_id = Uuid::new_v4();
        let node_id = Uuid::new_v4();
        registry.try_acquire(entity_id, node_id).unwrap();
        // Force release works regardless of holder
        registry.force_release(entity_id);
        assert!(!registry.is_locked(entity_id));
    }
    #[test]
    fn test_rate_limiting() {
        let mut registry = EntityLockRegistry::new();
        let node_id = Uuid::new_v4();
        // Acquire MAX_LOCKS_PER_NODE locks
        for _ in 0..MAX_LOCKS_PER_NODE {
            let entity_id = Uuid::new_v4();
            assert!(registry.try_acquire(entity_id, node_id).is_ok());
        }
        // Next acquisition should fail (rate limit)
        let entity_id = Uuid::new_v4();
        assert!(registry.try_acquire(entity_id, node_id).is_err());
    }
    #[test]
    fn test_node_lock_count() {
        let mut registry = EntityLockRegistry::new();
        let node_id = Uuid::new_v4();
        assert_eq!(registry.get_node_lock_count(node_id), 0);
        // Acquire 3 locks
        for _ in 0..3 {
            let entity_id = Uuid::new_v4();
            registry.try_acquire(entity_id, node_id).unwrap();
        }
        assert_eq!(registry.get_node_lock_count(node_id), 3);
        assert_eq!(registry.total_locks(), 3);
    }
    #[test]
    fn test_lock_message_serialization() {
        // Round-trip every lock protocol variant through bincode
        let entity_id = Uuid::new_v4();
        let node_id = Uuid::new_v4();
        let messages = vec![
            LockMessage::LockRequest { entity_id, node_id },
            LockMessage::LockAcquired {
                entity_id,
                holder: node_id,
            },
            LockMessage::LockRejected {
                entity_id,
                requester: node_id,
                current_holder: Uuid::new_v4(),
            },
            LockMessage::LockHeartbeat {
                entity_id,
                holder: node_id,
            },
            LockMessage::LockRelease { entity_id, node_id },
            LockMessage::LockReleased { entity_id },
        ];
        for message in messages {
            let bytes = bincode::serialize(&message).unwrap();
            let deserialized: LockMessage = bincode::deserialize(&bytes).unwrap();
            assert_eq!(message, deserialized);
        }
    }
    #[test]
    fn test_heartbeat_renewal() {
        let mut registry = EntityLockRegistry::new();
        let entity_id = Uuid::new_v4();
        let node_id = Uuid::new_v4();
        // Acquire lock
        registry.try_acquire(entity_id, node_id).unwrap();
        // Get initial heartbeat time
        let initial_heartbeat = registry.locks.get(&entity_id).unwrap().last_heartbeat;
        // Sleep a bit to ensure time difference
        std::thread::sleep(std::time::Duration::from_millis(10));
        // Renew heartbeat
        assert!(registry.renew_heartbeat(entity_id, node_id));
        // Check that heartbeat was updated
        let updated_heartbeat = registry.locks.get(&entity_id).unwrap().last_heartbeat;
        assert!(updated_heartbeat > initial_heartbeat);
    }
    #[test]
    fn test_heartbeat_wrong_node() {
        let mut registry = EntityLockRegistry::new();
        let entity_id = Uuid::new_v4();
        let node1 = Uuid::new_v4();
        let node2 = Uuid::new_v4();
        // Node 1 acquires
        registry.try_acquire(entity_id, node1).unwrap();
        // Node 2 tries to renew heartbeat - should fail
        assert!(!registry.renew_heartbeat(entity_id, node2));
    }
    #[test]
    fn test_heartbeat_expiration() {
        let mut registry = EntityLockRegistry::new();
        let entity_id = Uuid::new_v4();
        let node_id = Uuid::new_v4();
        // Acquire with very short timeout
        registry.try_acquire(entity_id, node_id).unwrap();
        // Manually set timeout to 0 for testing
        if let Some(lock) = registry.locks.get_mut(&entity_id) {
            lock.timeout = Duration::from_secs(0);
        }
        // Should be detected as expired
        let expired = registry.get_expired_locks();
        assert_eq!(expired.len(), 1);
        assert_eq!(expired[0], entity_id);
    }
}

View File

@@ -11,8 +11,10 @@ use bevy::{
use crate::networking::{
GossipBridge,
JoinType,
NetworkedEntity,
TombstoneRegistry,
VersionedMessage,
apply_entity_delta,
apply_full_state,
blob_support::BlobStore,
@@ -101,12 +103,18 @@ fn dispatch_message(world: &mut World, message: crate::networking::VersionedMess
apply_entity_delta(&delta, world);
},
// JoinRequest - new peer joining
// JoinRequest - new peer joining (or rejoining)
| SyncMessage::JoinRequest {
node_id,
session_id,
session_secret,
last_known_clock,
join_type,
} => {
info!("Received JoinRequest from node {}", node_id);
info!(
"Received JoinRequest from node {} for session {} (type: {:?})",
node_id, session_id, join_type
);
// Validate session secret if configured
if let Some(expected) = world.get_resource::<SessionSecret>() {
@@ -133,14 +141,21 @@ fn dispatch_message(world: &mut World, message: crate::networking::VersionedMess
debug!("Session secret provided but none configured, accepting");
}
// Build and send full state
// We need to collect data in separate steps to avoid borrow conflicts
// Hybrid join protocol: decide between FullState and MissingDeltas
// Fresh joins always get FullState
// Rejoins get deltas if <1000 operations, otherwise FullState
let response = match (&join_type, &last_known_clock) {
// Fresh join or no clock provided → send FullState
| (JoinType::Fresh, _) | (_, None) => {
info!("Fresh join from node {} - sending FullState", node_id);
// Collect networked entities
let networked_entities = {
let mut query = world.query::<(Entity, &NetworkedEntity)>();
query.iter(world).collect::<Vec<_>>()
};
let full_state = {
// Build full state
let type_registry = world.resource::<AppTypeRegistry>().read();
let node_clock = world.resource::<NodeVectorClock>();
let blob_store = world.get_resource::<BlobStore>();
@@ -152,14 +167,67 @@ fn dispatch_message(world: &mut World, message: crate::networking::VersionedMess
&node_clock,
blob_store.map(|b| b as &BlobStore),
)
},
// Rejoin with known clock → check delta count
| (JoinType::Rejoin { .. }, Some(their_clock)) => {
info!(
"Rejoin from node {} - checking delta count since last known clock",
node_id
);
// Get operation log and check missing deltas
let operation_log = world.resource::<crate::networking::OperationLog>();
let missing_deltas =
operation_log.get_all_operations_newer_than(their_clock);
// If delta count is small (<= 1000 ops), send deltas
// Otherwise fall back to full state
if missing_deltas.len() <= 1000 {
info!(
"Rejoin from node {} - sending {} MissingDeltas (efficient rejoin)",
node_id,
missing_deltas.len()
);
VersionedMessage::new(SyncMessage::MissingDeltas {
deltas: missing_deltas,
})
} else {
info!(
"Rejoin from node {} - delta count {} exceeds threshold, sending FullState",
node_id,
missing_deltas.len()
);
// Collect networked entities
let networked_entities = {
let mut query = world.query::<(Entity, &NetworkedEntity)>();
query.iter(world).collect::<Vec<_>>()
};
// Get bridge to send response
// Build full state
let type_registry = world.resource::<AppTypeRegistry>().read();
let node_clock = world.resource::<NodeVectorClock>();
let blob_store = world.get_resource::<BlobStore>();
build_full_state_from_data(
world,
&networked_entities,
&type_registry,
&node_clock,
blob_store.map(|b| b as &BlobStore),
)
}
},
};
// Send response
if let Some(bridge) = world.get_resource::<GossipBridge>() {
if let Err(e) = bridge.send(full_state) {
error!("Failed to send FullState: {}", e);
if let Err(e) = bridge.send(response) {
error!("Failed to send join response: {}", e);
} else {
info!("Sent FullState to node {}", node_id);
info!("Sent join response to node {}", node_id);
}
}
},
@@ -252,6 +320,112 @@ fn dispatch_message(world: &mut World, message: crate::networking::VersionedMess
apply_entity_delta(&delta, world);
}
},
// Lock - entity lock protocol messages
| SyncMessage::Lock(lock_msg) => {
use crate::networking::LockMessage;
if let Some(mut registry) = world.get_resource_mut::<crate::networking::EntityLockRegistry>() {
match lock_msg {
| LockMessage::LockRequest { entity_id, node_id } => {
debug!("Received LockRequest for entity {} from node {}", entity_id, node_id);
match registry.try_acquire(entity_id, node_id) {
| Ok(()) => {
// Acquired successfully - broadcast confirmation
if let Some(bridge) = world.get_resource::<GossipBridge>() {
let msg = VersionedMessage::new(SyncMessage::Lock(
LockMessage::LockAcquired {
entity_id,
holder: node_id,
},
));
if let Err(e) = bridge.send(msg) {
error!("Failed to broadcast LockAcquired: {}", e);
} else {
info!("Lock acquired: entity {} by node {}", entity_id, node_id);
}
}
},
| Err(current_holder) => {
// Already locked - send rejection
if let Some(bridge) = world.get_resource::<GossipBridge>() {
let msg = VersionedMessage::new(SyncMessage::Lock(
LockMessage::LockRejected {
entity_id,
requester: node_id,
current_holder,
},
));
if let Err(e) = bridge.send(msg) {
error!("Failed to send LockRejected: {}", e);
} else {
debug!("Lock rejected: entity {} requested by {} (held by {})",
entity_id, node_id, current_holder);
}
}
},
}
},
| LockMessage::LockAcquired { entity_id, holder } => {
debug!("Received LockAcquired for entity {} by node {}", entity_id, holder);
// Lock already applied optimistically, just log confirmation
},
| LockMessage::LockRejected {
entity_id,
requester,
current_holder,
} => {
warn!(
"Lock rejected: entity {} requested by {} (held by {})",
entity_id, requester, current_holder
);
// Could trigger UI notification here
},
| LockMessage::LockHeartbeat { entity_id, holder } => {
trace!("Received LockHeartbeat for entity {} from node {}", entity_id, holder);
// Renew the lock's heartbeat timestamp
if registry.renew_heartbeat(entity_id, holder) {
trace!("Lock heartbeat renewed: entity {} by node {}", entity_id, holder);
} else {
debug!(
"Received heartbeat for entity {} from {}, but lock not found or holder mismatch",
entity_id, holder
);
}
},
| LockMessage::LockRelease { entity_id, node_id } => {
debug!("Received LockRelease for entity {} from node {}", entity_id, node_id);
if registry.release(entity_id, node_id) {
// Broadcast confirmation
if let Some(bridge) = world.get_resource::<GossipBridge>() {
let msg = VersionedMessage::new(SyncMessage::Lock(
LockMessage::LockReleased { entity_id },
));
if let Err(e) = bridge.send(msg) {
error!("Failed to broadcast LockReleased: {}", e);
} else {
info!("Lock released: entity {}", entity_id);
}
}
}
},
| LockMessage::LockReleased { entity_id } => {
debug!("Received LockReleased for entity {}", entity_id);
// Lock already released locally, just log confirmation
},
}
} else {
warn!("Received lock message but EntityLockRegistry not available");
}
},
}
}

View File

@@ -9,7 +9,9 @@ use serde::{
};
use crate::networking::{
locks::LockMessage,
operations::ComponentOp,
session::SessionId,
vector_clock::{
NodeId,
VectorClock,
@@ -42,6 +44,22 @@ impl VersionedMessage {
}
}
/// Join request type - distinguishes fresh joins from rejoin attempts
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JoinType {
/// Fresh join - never connected to this session before
Fresh,
/// Rejoin - returning to a session we left earlier
Rejoin {
/// When we were last active in this session (Unix timestamp)
last_active: i64,
/// Cached entity count from when we left
entity_count: usize,
},
}
/// CRDT synchronization protocol messages
///
/// These messages implement the sync protocol defined in RFC 0001.
@@ -56,14 +74,25 @@ impl VersionedMessage {
pub enum SyncMessage {
/// Request to join the network and receive full state
///
/// Sent by a new peer when it first connects. The response will be a
/// `FullState` message containing all entities and their components.
/// Sent by a new peer when it first connects. For fresh joins, the response
/// will be a `FullState` message. For rejoins with small deltas (<1000 ops),
/// the response will be `MissingDeltas`.
JoinRequest {
/// ID of the node requesting to join
node_id: NodeId,
/// Session ID to join
session_id: SessionId,
/// Optional session secret for authentication
session_secret: Option<Vec<u8>>,
/// Vector clock from when we last left this session
/// None = fresh join, Some = rejoin
last_known_clock: Option<VectorClock>,
/// Type of join (fresh or rejoin with metadata)
join_type: JoinType,
},
/// Complete world state sent to new peers
@@ -116,6 +145,12 @@ pub enum SyncMessage {
/// Entity deltas that the recipient is missing
deltas: Vec<EntityDelta>,
},
/// Entity lock protocol messages
///
/// Used for collaborative editing to prevent concurrent modifications.
/// Locks are acquired when entities are selected and released when deselected.
Lock(LockMessage),
}
/// Complete state of a single entity
@@ -262,9 +297,13 @@ mod tests {
#[test]
fn test_versioned_message_creation() {
let node_id = uuid::Uuid::new_v4();
let session_id = SessionId::new();
let message = SyncMessage::JoinRequest {
node_id,
session_id,
session_secret: None,
last_known_clock: None,
join_type: JoinType::Fresh,
};
let versioned = VersionedMessage::new(message);
@@ -306,9 +345,13 @@ mod tests {
#[test]
fn test_message_serialization() -> bincode::Result<()> {
let node_id = uuid::Uuid::new_v4();
let session_id = SessionId::new();
let message = SyncMessage::JoinRequest {
node_id,
session_id,
session_secret: None,
last_known_clock: None,
join_type: JoinType::Fresh,
};
let versioned = VersionedMessage::new(message);
@@ -343,4 +386,132 @@ mod tests {
Ok(())
}
#[test]
fn test_join_type_fresh() {
let join_type = JoinType::Fresh;
// Fresh join should serialize correctly
let bytes = bincode::serialize(&join_type).unwrap();
let deserialized: JoinType = bincode::deserialize(&bytes).unwrap();
assert!(matches!(deserialized, JoinType::Fresh));
}
#[test]
fn test_join_type_rejoin() {
let join_type = JoinType::Rejoin {
last_active: 1234567890,
entity_count: 42,
};
// Rejoin should serialize correctly
let bytes = bincode::serialize(&join_type).unwrap();
let deserialized: JoinType = bincode::deserialize(&bytes).unwrap();
match deserialized {
| JoinType::Rejoin {
last_active,
entity_count,
} => {
assert_eq!(last_active, 1234567890);
assert_eq!(entity_count, 42);
},
| _ => panic!("Expected JoinType::Rejoin"),
}
}
#[test]
fn test_hybrid_join_protocol_fresh() {
// Fresh join should have no last_known_clock
let node_id = uuid::Uuid::new_v4();
let session_id = SessionId::new();
let message = SyncMessage::JoinRequest {
node_id,
session_id,
session_secret: None,
last_known_clock: None,
join_type: JoinType::Fresh,
};
let bytes = bincode::serialize(&message).unwrap();
let deserialized: SyncMessage = bincode::deserialize(&bytes).unwrap();
match deserialized {
| SyncMessage::JoinRequest {
join_type,
last_known_clock,
..
} => {
assert!(matches!(join_type, JoinType::Fresh));
assert!(last_known_clock.is_none());
},
| _ => panic!("Expected JoinRequest"),
}
}
#[test]
fn test_hybrid_join_protocol_rejoin() {
// Rejoin should have last_known_clock
let node_id = uuid::Uuid::new_v4();
let session_id = SessionId::new();
let clock = VectorClock::new();
let message = SyncMessage::JoinRequest {
node_id,
session_id,
session_secret: None,
last_known_clock: Some(clock.clone()),
join_type: JoinType::Rejoin {
last_active: 1234567890,
entity_count: 100,
},
};
let bytes = bincode::serialize(&message).unwrap();
let deserialized: SyncMessage = bincode::deserialize(&bytes).unwrap();
match deserialized {
| SyncMessage::JoinRequest {
join_type,
last_known_clock,
..
} => {
assert!(matches!(join_type, JoinType::Rejoin { .. }));
assert_eq!(last_known_clock, Some(clock));
},
| _ => panic!("Expected JoinRequest"),
}
}
#[test]
fn test_missing_deltas_serialization() -> bincode::Result<()> {
// Test that MissingDeltas message serializes correctly
let node_id = uuid::Uuid::new_v4();
let entity_id = uuid::Uuid::new_v4();
let clock = VectorClock::new();
let delta = EntityDelta {
entity_id,
node_id,
vector_clock: clock,
operations: vec![],
};
let message = SyncMessage::MissingDeltas {
deltas: vec![delta],
};
let bytes = bincode::serialize(&message)?;
let deserialized: SyncMessage = bincode::deserialize(&bytes)?;
match deserialized {
| SyncMessage::MissingDeltas { deltas } => {
assert_eq!(deltas.len(), 1);
assert_eq!(deltas[0].entity_id, entity_id);
},
| _ => panic!("Expected MissingDeltas"),
}
Ok(())
}
}

View File

@@ -41,6 +41,7 @@ mod entity_map;
mod error;
mod gossip_bridge;
mod join_protocol;
mod locks;
mod merge;
mod message_dispatcher;
mod messages;
@@ -50,6 +51,8 @@ mod operations;
mod orset;
mod plugin;
mod rga;
mod session;
mod session_lifecycle;
mod sync_component;
mod tombstones;
mod vector_clock;
@@ -64,6 +67,7 @@ pub use entity_map::*;
pub use error::*;
pub use gossip_bridge::*;
pub use join_protocol::*;
pub use locks::*;
pub use merge::*;
pub use message_dispatcher::*;
pub use messages::*;
@@ -73,6 +77,8 @@ pub use operations::*;
pub use orset::*;
pub use plugin::*;
pub use rga::*;
pub use session::*;
pub use session_lifecycle::*;
pub use sync_component::*;
pub use tombstones::*;
pub use vector_clock::*;

View File

@@ -43,12 +43,22 @@ use crate::networking::{
cleanup_despawned_entities_system,
register_networked_entities_system,
},
locks::{
EntityLockRegistry,
broadcast_lock_heartbeats_system,
cleanup_expired_locks_system,
release_locks_on_deselection_system,
},
message_dispatcher::message_dispatcher_system,
operation_log::{
OperationLog,
periodic_sync_system,
prune_operation_log_system,
},
session_lifecycle::{
initialize_session_system,
save_session_on_shutdown_system,
},
tombstones::{
TombstoneRegistry,
garbage_collect_tombstones_system,
@@ -139,6 +149,9 @@ impl SessionSecret {
///
/// # Systems Added
///
/// ## Startup
/// - Initialize or restore session from persistence (auto-rejoin)
///
/// ## PreUpdate
/// - Register newly spawned networked entities
/// - **Central message dispatcher** (handles all incoming messages efficiently)
@@ -147,17 +160,25 @@ impl SessionSecret {
/// - FullState messages
/// - SyncRequest messages
/// - MissingDeltas messages
/// - Lock messages (LockRequest, LockAcquired, LockRejected, LockHeartbeat, LockRelease, LockReleased)
///
/// ## Update
/// - Auto-detect Transform changes
/// - Handle local entity deletions
/// - Release locks when entities are deselected
///
/// ## PostUpdate
/// - Generate and broadcast EntityDelta for changed entities
/// - Periodic SyncRequest for anti-entropy
/// - Broadcast lock heartbeats to maintain active locks
/// - Prune old operations from operation log
/// - Garbage collect tombstones
/// - Cleanup expired locks (5-second timeout)
/// - Cleanup despawned entities from entity map
///
/// ## Last
/// - Save session state and vector clock to persistence
///
/// # Resources Added
///
/// - `NodeVectorClock` - This node's vector clock
@@ -165,6 +186,7 @@ impl SessionSecret {
/// - `LastSyncVersions` - Change detection for entities
/// - `OperationLog` - Operation log for anti-entropy
/// - `TombstoneRegistry` - Tombstone tracking for deletions
/// - `EntityLockRegistry` - Entity lock registry with heartbeat tracking
///
/// # Example
///
@@ -213,8 +235,12 @@ impl Plugin for NetworkingPlugin {
.insert_resource(LastSyncVersions::default())
.insert_resource(OperationLog::new())
.insert_resource(TombstoneRegistry::new())
.insert_resource(EntityLockRegistry::new())
.insert_resource(crate::networking::ComponentVectorClocks::new());
// Startup systems - initialize session from persistence
app.add_systems(Startup, initialize_session_system);
// PreUpdate systems - handle incoming messages first
app.add_systems(
PreUpdate,
@@ -237,6 +263,8 @@ impl Plugin for NetworkingPlugin {
auto_detect_transform_changes_system,
// Handle local entity deletions
handle_local_deletions_system,
// Release locks when entities are deselected
release_locks_on_deselection_system,
),
);
@@ -251,11 +279,23 @@ impl Plugin for NetworkingPlugin {
// Maintenance tasks
prune_operation_log_system,
garbage_collect_tombstones_system,
cleanup_expired_locks_system,
// Cleanup despawned entities
cleanup_despawned_entities_system,
),
);
// Broadcast lock heartbeats every 1 second to maintain active locks
app.add_systems(
PostUpdate,
broadcast_lock_heartbeats_system.run_if(bevy::time::common_conditions::on_timer(
std::time::Duration::from_secs(1),
)),
);
// Last schedule - save session state on shutdown
app.add_systems(Last, save_session_on_shutdown_system);
info!(
"NetworkingPlugin initialized for node {}",
self.config.node_id
@@ -333,6 +373,7 @@ mod tests {
assert!(app.world().get_resource::<LastSyncVersions>().is_some());
assert!(app.world().get_resource::<OperationLog>().is_some());
assert!(app.world().get_resource::<TombstoneRegistry>().is_some());
assert!(app.world().get_resource::<EntityLockRegistry>().is_some());
}
#[test]

View File

@@ -0,0 +1,465 @@
use std::fmt;
// Session identification and lifecycle management.
//
// This module provides session-scoped collaborative sessions with
// human-readable session codes, ALPN-based network isolation, and persistent
// session tracking.
use bevy::prelude::*;
use serde::{
Deserialize,
Serialize,
};
use uuid::Uuid;
use crate::networking::VectorClock;
/// Session identifier - UUID internally, human-readable code for display
///
/// Session IDs provide both technical uniqueness (UUID) and human usability
/// (abc-def-123 codes). All peers in a session share the same session ID.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SessionId {
    // Deterministic UUID derived from the code by BLAKE3 hashing, so all
    // peers that enter the same code compute the same id.
    uuid: Uuid,
    // Human-readable code in `abc-def-123` format, stored lowercase.
    code: String,
}
impl SessionId {
    /// Characters allowed in session codes: lowercase letters and digits with
    /// visually ambiguous glyphs (i, l, o, 0, 1) removed.
    const CODE_CHARSET: &'static [u8] = b"abcdefghjkmnpqrstuvwxyz23456789";

    /// Create a new random session ID
    pub fn new() -> Self {
        // Generate a random 9-character code in abc-def-123 layout
        use rand::Rng;
        let mut rng = rand::thread_rng();
        let mut code = String::with_capacity(11);
        for i in 0..9 {
            let idx = rng.gen_range(0..Self::CODE_CHARSET.len());
            code.push(Self::CODE_CHARSET[idx] as char);
            // Insert separators after the 3rd and 6th characters
            if i == 2 || i == 5 {
                code.push('-');
            }
        }
        let uuid = Self::uuid_for_code(&code);
        Self { uuid, code }
    }

    /// Derive the deterministic UUID for a session code.
    ///
    /// Shared by `new` and `from_code` so both paths hash identically.
    /// The domain separation prefix keeps this hash space distinct from
    /// other BLAKE3 uses in the protocol.
    fn uuid_for_code(code: &str) -> Uuid {
        let mut hasher = blake3::Hasher::new();
        hasher.update(b"/app/v1/session-code/");
        hasher.update(code.as_bytes());
        let hash = hasher.finalize();
        let mut uuid_bytes = [0u8; 16];
        uuid_bytes.copy_from_slice(&hash.as_bytes()[..16]);
        Uuid::from_bytes(uuid_bytes)
    }

    /// Parse a session code (format: abc-def-123)
    ///
    /// Hashes the code to derive a deterministic UUID.
    /// Returns error if code format is invalid.
    pub fn from_code(code: &str) -> Result<Self, SessionError> {
        let code_lower = code.to_lowercase();
        let chars: Vec<char> = code_lower.chars().collect();
        // Validate format: xxx-yyy-zzz (11 chars, dashes at positions 3 and 7)
        if chars.len() != 11 || chars[3] != '-' || chars[7] != '-' {
            return Err(SessionError::InvalidCodeFormat);
        }
        // Every non-dash position must come from the code charset. Checking
        // positions individually (rather than a charset containing '-') also
        // rejects stray dashes inside the three-character segments; the
        // is_ascii guard prevents `as u8` truncation from accepting
        // non-ASCII characters.
        let valid = chars.iter().enumerate().all(|(i, &c)| {
            if i == 3 || i == 7 {
                c == '-'
            } else {
                c.is_ascii() && Self::CODE_CHARSET.contains(&(c as u8))
            }
        });
        if !valid {
            return Err(SessionError::InvalidCodeFormat);
        }
        Ok(Self {
            uuid: Self::uuid_for_code(&code_lower),
            code: code_lower,
        })
    }

    /// Convert to human-readable code (abc-def-123 format)
    pub fn to_code(&self) -> &str {
        &self.code
    }

    /// Derive ALPN identifier for network isolation
    ///
    /// Computes deterministic 32-byte BLAKE3 hash from session UUID.
    /// All peers independently compute the same ALPN from session code.
    ///
    /// # Security
    /// The domain separation prefix (`/app/v1/session-id/`) ensures ALPNs
    /// cannot collide with other protocol uses of the same hash space.
    pub fn to_alpn(&self) -> [u8; 32] {
        let mut hasher = blake3::Hasher::new();
        hasher.update(b"/app/v1/session-id/");
        hasher.update(self.uuid.as_bytes());
        let hash = hasher.finalize();
        *hash.as_bytes()
    }

    /// Get raw UUID
    pub fn as_uuid(&self) -> &Uuid {
        &self.uuid
    }
}
impl Default for SessionId {
fn default() -> Self {
Self::new()
}
}
/// Displays the human-readable session code (e.g. `abc-def-123`).
impl fmt::Display for SessionId {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str(&self.code)
    }
}
/// Session lifecycle states
///
/// NOTE(review): variant order determines bincode's enum discriminants —
/// append new states rather than reordering existing ones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionState {
    /// Session exists in database but hasn't connected to network yet
    Created,
    /// Currently attempting to join network and sync state
    Joining,
    /// Fully synchronized and actively collaborating
    Active,
    /// Temporarily offline, will attempt to rejoin when network restored
    Disconnected,
    /// User explicitly left the session (clean shutdown)
    Left,
}
/// Lowercase string form of the state; the inverse of `SessionState::from_str`.
impl fmt::Display for SessionState {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let name = match self {
            | SessionState::Created => "created",
            | SessionState::Joining => "joining",
            | SessionState::Active => "active",
            | SessionState::Disconnected => "disconnected",
            | SessionState::Left => "left",
        };
        f.write_str(name)
    }
}
impl SessionState {
    /// Parse from the lowercase string representation produced by `Display`.
    ///
    /// Returns `None` for unrecognized strings.
    pub fn from_str(s: &str) -> Option<Self> {
        let state = match s {
            | "created" => SessionState::Created,
            | "joining" => SessionState::Joining,
            | "active" => SessionState::Active,
            | "disconnected" => SessionState::Disconnected,
            | "left" => SessionState::Left,
            | _ => return None,
        };
        Some(state)
    }
}
/// Session metadata
///
/// Tracks session identity, creation time, entity count, and lifecycle state.
/// Persisted to database for crash recovery and auto-rejoin.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
    /// Unique session identifier
    pub id: SessionId,
    /// Optional human-readable name
    pub name: Option<String>,
    /// When the session was created (Unix timestamp)
    pub created_at: i64,
    /// When this node was last active in the session (Unix timestamp)
    pub last_active: i64,
    /// Cached count of entities in this session
    /// (a snapshot, not live — updated when the session is persisted)
    pub entity_count: usize,
    /// Current lifecycle state
    pub state: SessionState,
    /// Optional encrypted session secret for access control
    pub secret: Option<Vec<u8>>,
}
impl Session {
    /// Construct a fresh session in the [`SessionState::Created`] state,
    /// with `created_at` and `last_active` both set to "now".
    pub fn new(id: SessionId) -> Self {
        let timestamp = chrono::Utc::now().timestamp();
        Self {
            id,
            name: None,
            created_at: timestamp,
            last_active: timestamp,
            entity_count: 0,
            state: SessionState::Created,
            secret: None,
        }
    }

    /// Refresh `last_active` to the current wall-clock time.
    pub fn touch(&mut self) {
        self.last_active = chrono::Utc::now().timestamp();
    }

    /// Move the session into `new_state`, logging the transition and
    /// refreshing the activity timestamp.
    pub fn transition_to(&mut self, new_state: SessionState) {
        let previous = self.state;
        tracing::info!(
            "Session {} transitioning: {:?} -> {:?}",
            self.id,
            previous,
            new_state
        );
        self.state = new_state;
        self.touch();
    }
}
/// Current session resource for Bevy ECS
///
/// Contains both session metadata and the vector clock snapshot from when
/// we joined (for hybrid sync protocol). Inserted by
/// `initialize_session_system` at startup.
#[derive(Resource, Clone)]
pub struct CurrentSession {
    /// Session metadata
    pub session: Session,
    /// Vector clock when we last left/joined this session.
    /// Used for hybrid sync to request only missing deltas.
    pub last_known_clock: VectorClock,
}
impl CurrentSession {
/// Create a new current session
pub fn new(session: Session, last_known_clock: VectorClock) -> Self {
Self {
session,
last_known_clock,
}
}
/// Transition the session to a new state
pub fn transition_to(&mut self, new_state: SessionState) {
self.session.transition_to(new_state);
}
}
/// Session-related errors
#[derive(Debug, thiserror::Error)]
pub enum SessionError {
    /// The join code did not match the `xxx-yyy-zzz` shape.
    #[error("Invalid session code format (expected: abc-def-123)")]
    InvalidCodeFormat,
    /// No session with the given id/code exists in the database.
    #[error("Session not found")]
    NotFound,
    /// Underlying persistence failure, stringified for display.
    #[error("Database error: {0}")]
    Database(String),
}
#[cfg(test)]
mod tests {
    //! Unit tests for session identity, code parsing, ALPN derivation,
    //! and lifecycle transitions.
    use super::*;
    #[test]
    fn test_session_id_creation() {
        let id1 = SessionId::new();
        let id2 = SessionId::new();
        // Different session IDs should be different
        assert_ne!(id1, id2);
    }
    #[test]
    fn test_session_code_roundtrip() {
        let id = SessionId::new();
        let code = id.to_code();
        // Code should be 11 characters: xxx-yyy-zzz
        assert_eq!(code.len(), 11);
        assert_eq!(&code[3..4], "-");
        assert_eq!(&code[7..8], "-");
        // Parse back
        let parsed = SessionId::from_code(&code).expect("Failed to parse code");
        // Should get same session ID
        assert_eq!(id, parsed);
    }
    #[test]
    fn test_session_code_deterministic() {
        // Same code should always produce same SessionId
        let code = "abc-def-234";
        let id1 = SessionId::from_code(code).unwrap();
        let id2 = SessionId::from_code(code).unwrap();
        assert_eq!(id1, id2);
    }
    #[test]
    fn test_session_code_case_insensitive() {
        // Codes should be case-insensitive
        let id1 = SessionId::from_code("abc-def-234").unwrap();
        let id2 = SessionId::from_code("ABC-DEF-234").unwrap();
        assert_eq!(id1, id2);
    }
    #[test]
    fn test_session_code_invalid_format() {
        // Too short
        assert!(SessionId::from_code("abc-def").is_err());
        // Too long
        assert!(SessionId::from_code("abc-def-1234").is_err());
        // Missing dash
        assert!(SessionId::from_code("abcdef-123").is_err());
        assert!(SessionId::from_code("abc-def123").is_err());
        // Wrong dash positions
        assert!(SessionId::from_code("ab-cdef-123").is_err());
    }
    #[test]
    fn test_alpn_derivation_deterministic() {
        // Same session ID should always produce same ALPN
        let id = SessionId::new();
        let alpn1 = id.to_alpn();
        let alpn2 = id.to_alpn();
        assert_eq!(alpn1, alpn2);
    }
    #[test]
    fn test_alpn_derivation_unique() {
        // Different session IDs should produce different ALPNs
        let id1 = SessionId::new();
        let id2 = SessionId::new();
        let alpn1 = id1.to_alpn();
        let alpn2 = id2.to_alpn();
        assert_ne!(alpn1, alpn2);
    }
    #[test]
    fn test_alpn_length() {
        // ALPN should always be 32 bytes (a full blake3 hash)
        let id = SessionId::new();
        let alpn = id.to_alpn();
        assert_eq!(alpn.len(), 32);
    }
    #[test]
    fn test_session_state_display() {
        assert_eq!(SessionState::Created.to_string(), "created");
        assert_eq!(SessionState::Joining.to_string(), "joining");
        assert_eq!(SessionState::Active.to_string(), "active");
        assert_eq!(SessionState::Disconnected.to_string(), "disconnected");
        assert_eq!(SessionState::Left.to_string(), "left");
    }
    #[test]
    fn test_session_state_from_str() {
        // from_str must be the exact inverse of Display
        assert_eq!(
            SessionState::from_str("created"),
            Some(SessionState::Created)
        );
        assert_eq!(
            SessionState::from_str("joining"),
            Some(SessionState::Joining)
        );
        assert_eq!(SessionState::from_str("active"), Some(SessionState::Active));
        assert_eq!(
            SessionState::from_str("disconnected"),
            Some(SessionState::Disconnected)
        );
        assert_eq!(SessionState::from_str("left"), Some(SessionState::Left));
        assert_eq!(SessionState::from_str("invalid"), None);
    }
    #[test]
    fn test_session_creation() {
        let id = SessionId::new();
        let session = Session::new(id.clone());
        assert_eq!(session.id, id);
        assert_eq!(session.name, None);
        assert_eq!(session.entity_count, 0);
        assert_eq!(session.state, SessionState::Created);
        assert_eq!(session.secret, None);
        assert!(session.created_at > 0);
        assert_eq!(session.created_at, session.last_active);
    }
    #[test]
    fn test_session_transition() {
        let id = SessionId::new();
        let mut session = Session::new(id);
        let initial_state = session.state;
        let initial_time = session.last_active;
        session.transition_to(SessionState::Joining);
        assert_ne!(session.state, initial_state);
        assert_eq!(session.state, SessionState::Joining);
        // Timestamp should be updated (greater or equal due to precision)
        assert!(session.last_active >= initial_time);
    }
    #[test]
    fn test_session_display() {
        // Display must render exactly the join code
        let id = SessionId::new();
        let code = id.to_code();
        let display = format!("{}", id);
        assert_eq!(code, &display);
    }
    #[test]
    fn test_current_session_creation() {
        let id = SessionId::new();
        let session = Session::new(id);
        let clock = VectorClock::new();
        let current = CurrentSession::new(session.clone(), clock);
        assert_eq!(current.session.id, session.id);
        assert_eq!(current.session.state, SessionState::Created);
    }
    #[test]
    fn test_current_session_transition() {
        let id = SessionId::new();
        let session = Session::new(id);
        let clock = VectorClock::new();
        let mut current = CurrentSession::new(session, clock);
        current.transition_to(SessionState::Active);
        assert_eq!(current.session.state, SessionState::Active);
    }
}

View File

@@ -0,0 +1,257 @@
//! Session lifecycle management - startup and shutdown
//!
//! This module handles automatic session restoration on startup and clean
//! session persistence on shutdown. It enables seamless auto-rejoin after
//! app restarts.
//!
//! # Lifecycle Flow
//!
//! **Startup:**
//! 1. Check database for last active session
//! 2. If found and state is Active/Disconnected → auto-rejoin
//! 3. Load last known vector clock for hybrid sync
//! 4. Insert CurrentSession resource
//!
//! **Shutdown:**
//! 1. Update session metadata (state, last_active, entity_count)
//! 2. Save session to database
//! 3. Save current vector clock
//! 4. Mark clean shutdown in database
use bevy::prelude::*;
use crate::{
networking::{
CurrentSession,
Session,
SessionId,
SessionState,
VectorClock,
delta_generation::NodeVectorClock,
},
persistence::{
PersistenceDb,
get_last_active_session,
load_session_vector_clock,
save_session,
save_session_vector_clock,
},
};
/// System to initialize or restore session on startup
///
/// This system runs once at startup and either:
/// - Restores the last active session (auto-rejoin)
/// - Creates a new session
///
/// Auto-rejoin only happens for sessions found in the `Active` or
/// `Disconnected` states; `Created`, `Joining`, and `Left` all fall back
/// to creating a fresh session.
///
/// Add to your app as a Startup system AFTER setup_persistence:
/// ```no_run
/// use bevy::prelude::*;
/// use lib::networking::initialize_session_system;
///
/// App::new()
///     .add_systems(Startup, initialize_session_system);
/// ```
pub fn initialize_session_system(world: &mut World) {
    info!("Initializing session...");
    // Load session data in a scoped block to release the database lock
    // before we mutate `world` below — holding the mutex across
    // `insert_resource` is unnecessary and would extend the critical section.
    let session_data: Option<(Session, VectorClock)> = {
        // Get database connection
        let db = match world.get_resource::<PersistenceDb>() {
            | Some(db) => db,
            | None => {
                error!("PersistenceDb resource not found - cannot initialize session");
                return;
            },
        };
        // Lock the database connection (a poisoned mutex aborts initialization)
        let conn = match db.conn.lock() {
            | Ok(conn) => conn,
            | Err(e) => {
                error!("Failed to lock database connection: {}", e);
                return;
            },
        };
        // Try to load last active session
        match get_last_active_session(&conn) {
            | Ok(Some(mut session)) => {
                // Check if we should auto-rejoin
                match session.state {
                    | SessionState::Active | SessionState::Disconnected => {
                        info!(
                            "Found previous session {} in state {:?} - attempting auto-rejoin",
                            session.id, session.state
                        );
                        // Load last known vector clock; a missing/corrupt clock
                        // degrades to a full resync rather than failing startup.
                        let last_known_clock = match load_session_vector_clock(&conn, session.id.clone()) {
                            | Ok(clock) => clock,
                            | Err(e) => {
                                warn!(
                                    "Failed to load vector clock for session {}: {} - using empty clock",
                                    session.id, e
                                );
                                VectorClock::new()
                            },
                        };
                        // Transition to Joining state
                        session.transition_to(SessionState::Joining);
                        Some((session, last_known_clock))
                    },
                    | _ => {
                        // For Created, Left, or Joining states, create new session
                        None
                    },
                }
            },
            | Ok(None) => None,
            | Err(e) => {
                error!("Failed to load last active session: {}", e);
                None
            },
        }
    }; // conn and db are dropped here, releasing the lock
    // Now insert the session resource (no longer holding database lock)
    let current_session = match session_data {
        | Some((session, last_known_clock)) => {
            info!("Session initialized for auto-rejoin");
            CurrentSession::new(session, last_known_clock)
        },
        | None => {
            info!("Creating new session");
            let session_id = SessionId::new();
            let session = Session::new(session_id);
            CurrentSession::new(session, VectorClock::new())
        },
    };
    world.insert_resource(current_session);
}
/// System to save session state on shutdown
///
/// This system should run during app shutdown to persist session state
/// for auto-rejoin on next startup. It marks the session `Left`, refreshes
/// the entity count, and persists both the session row and the current
/// vector clock.
///
/// NOTE(review): saving state `Left` means a clean shutdown will NOT
/// auto-rejoin on next start (initialize_session_system only rejoins
/// Active/Disconnected) — confirm that is the intended UX.
///
/// Add to your app using the Last schedule:
/// ```no_run
/// use bevy::prelude::*;
/// use lib::networking::save_session_on_shutdown_system;
///
/// App::new()
///     .add_systems(Last, save_session_on_shutdown_system);
/// ```
pub fn save_session_on_shutdown_system(world: &mut World) {
    info!("Saving session state on shutdown...");
    // Get current session (cloned so we can mutate it without the resource borrow)
    let current_session = match world.get_resource::<CurrentSession>() {
        | Some(session) => session.clone(),
        | None => {
            warn!("No CurrentSession found - skipping session save");
            return;
        },
    };
    let mut session = current_session.session.clone();
    // Update session metadata
    session.touch();
    session.transition_to(SessionState::Left);
    // Count entities in the world so the cached count is fresh on reload
    let entity_count = world
        .query::<&crate::networking::NetworkedEntity>()
        .iter(world)
        .count();
    session.entity_count = entity_count;
    // Get current vector clock (may be absent if networking never started)
    let vector_clock = world
        .get_resource::<NodeVectorClock>()
        .map(|nc| nc.clock.clone());
    // Save to database in a scoped block so the mutex guard is dropped
    // before the final log statement.
    {
        // Get database connection
        let db = match world.get_resource::<PersistenceDb>() {
            | Some(db) => db,
            | None => {
                error!("PersistenceDb resource not found - cannot save session");
                return;
            },
        };
        // Lock the database connection
        let mut conn = match db.conn.lock() {
            | Ok(conn) => conn,
            | Err(e) => {
                error!("Failed to lock database connection: {}", e);
                return;
            },
        };
        // Save session to database; abort early on failure so we don't
        // persist a clock for a session row that wasn't written.
        match save_session(&mut conn, &session) {
            | Ok(()) => {
                info!("Session {} saved successfully", session.id);
            },
            | Err(e) => {
                error!("Failed to save session {}: {}", session.id, e);
                return;
            },
        }
        // Save current vector clock (best-effort: a failure here is logged
        // but does not undo the session save)
        if let Some(ref clock) = vector_clock {
            match save_session_vector_clock(&mut conn, session.id.clone(), clock) {
                | Ok(()) => {
                    info!("Vector clock saved for session {}", session.id);
                },
                | Err(e) => {
                    error!("Failed to save vector clock for session {}: {}", session.id, e);
                },
            }
        }
    } // conn and db are dropped here
    info!("Session state saved successfully");
}
#[cfg(test)]
mod tests {
    //! Lifecycle smoke tests — these run without a real database or network.
    use super::*;
    #[test]
    fn test_initialize_session_creates_new() {
        let mut app = App::new();
        // Run initialize without PersistenceDb - should handle gracefully
        initialize_session_system(&mut app.world_mut());
        // Should not have CurrentSession (no db)
        assert!(app.world().get_resource::<CurrentSession>().is_none());
    }
    #[test]
    fn test_session_roundtrip() {
        // Create a session
        let session_id = SessionId::new();
        let mut session = Session::new(session_id.clone());
        session.entity_count = 5;
        session.transition_to(SessionState::Active);
        // Session should have updated timestamp (or equal if sub-millisecond)
        assert!(session.last_active >= session.created_at);
        assert_eq!(session.state, SessionState::Active);
        assert_eq!(session.entity_count, 5);
    }
}

View File

@@ -32,10 +32,12 @@ fn current_timestamp() -> i64 {
/// Initialize SQLite connection with WAL mode and optimizations
pub fn initialize_persistence_db<P: AsRef<Path>>(path: P) -> Result<Connection> {
let conn = Connection::open(path)?;
let mut conn = Connection::open(path)?;
configure_sqlite_for_persistence(&conn)?;
create_persistence_schema(&conn)?;
// Run migrations to ensure schema is up to date
crate::persistence::run_migrations(&mut conn)?;
Ok(conn)
}
@@ -464,6 +466,153 @@ pub fn mark_clean_shutdown(conn: &mut Connection) -> Result<()> {
set_session_state(conn, "clean_shutdown", "true")
}
//
// ============================================================================
// Session Management Operations
// ============================================================================
//
/// Save session metadata to database
///
/// Upserts the full session row (keyed by the raw UUID bytes). The
/// human-readable code is stored redundantly so rows can be reloaded
/// without re-deriving it.
pub fn save_session(conn: &mut Connection, session: &crate::networking::Session) -> Result<()> {
    conn.execute(
        "INSERT OR REPLACE INTO sessions (id, code, name, created_at, last_active, entity_count, state, secret)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
        rusqlite::params![
            session.id.as_uuid().as_bytes(),
            session.id.to_code(),
            session.name,
            session.created_at,
            session.last_active,
            session.entity_count as i64, // SQLite has no unsigned integer type
            session.state.to_string(),
            session.secret,
        ],
    )?;
    Ok(())
}
/// Load session by ID
///
/// Returns `Ok(None)` when no row matches. The `SessionId` is rebuilt from
/// the stored `code` column rather than the `id` blob — this assumes
/// `SessionId::from_code` deterministically reproduces the persisted UUID
/// (TODO confirm against SessionId's constructors).
pub fn load_session(
    conn: &Connection,
    session_id: crate::networking::SessionId,
) -> Result<Option<crate::networking::Session>> {
    conn.query_row(
        "SELECT code, name, created_at, last_active, entity_count, state, secret
         FROM sessions WHERE id = ?1",
        [session_id.as_uuid().as_bytes()],
        |row| {
            let code: String = row.get(0)?;
            let state_str: String = row.get(5)?;
            // Unknown state strings degrade to Created rather than failing the load
            let state = crate::networking::SessionState::from_str(&state_str)
                .unwrap_or(crate::networking::SessionState::Created);
            // Reconstruct SessionId from the stored code
            let id = crate::networking::SessionId::from_code(&code)
                .map_err(|_| rusqlite::Error::InvalidQuery)?;
            Ok(crate::networking::Session {
                id,
                name: row.get(1)?,
                created_at: row.get(2)?,
                last_active: row.get(3)?,
                entity_count: row.get::<_, i64>(4)? as usize,
                state,
                secret: row.get(6)?,
            })
        },
    )
    .optional()
    .map_err(PersistenceError::from)
}
/// Get the most recently active session
///
/// Ordered by `last_active` (backed by `idx_sessions_last_active`).
/// Row mapping mirrors `load_session`; keep the two in sync if the
/// schema changes.
pub fn get_last_active_session(conn: &Connection) -> Result<Option<crate::networking::Session>> {
    conn.query_row(
        "SELECT code, name, created_at, last_active, entity_count, state, secret
         FROM sessions ORDER BY last_active DESC LIMIT 1",
        [],
        |row| {
            let code: String = row.get(0)?;
            let state_str: String = row.get(5)?;
            // Unknown state strings degrade to Created rather than failing the load
            let state = crate::networking::SessionState::from_str(&state_str)
                .unwrap_or(crate::networking::SessionState::Created);
            // Reconstruct SessionId from the stored code
            let id = crate::networking::SessionId::from_code(&code)
                .map_err(|_| rusqlite::Error::InvalidQuery)?;
            Ok(crate::networking::Session {
                id,
                name: row.get(1)?,
                created_at: row.get(2)?,
                last_active: row.get(3)?,
                entity_count: row.get::<_, i64>(4)? as usize,
                state,
                secret: row.get(6)?,
            })
        },
    )
    .optional()
    .map_err(PersistenceError::from)
}
/// Save session vector clock to database
///
/// Replaces the session's entire clock atomically: deletes the old rows and
/// re-inserts the current state inside one transaction.
///
/// NOTE(review): migration 001 declared `node_id` as the table's PRIMARY KEY
/// and migration 004 only ADDed a `session_id` column — inserting rows for
/// the same node under two different sessions would violate that key.
/// Confirm the table was rebuilt with a composite key.
pub fn save_session_vector_clock(
    conn: &mut Connection,
    session_id: crate::networking::SessionId,
    clock: &crate::networking::VectorClock,
) -> Result<()> {
    let tx = conn.transaction()?;
    // Delete old clock entries for this session
    tx.execute(
        "DELETE FROM vector_clock WHERE session_id = ?1",
        [session_id.as_uuid().as_bytes()],
    )?;
    // Insert current clock state, one row per node
    for (node_id, &counter) in &clock.clocks {
        tx.execute(
            "INSERT INTO vector_clock (session_id, node_id, counter, updated_at)
             VALUES (?1, ?2, ?3, ?4)",
            rusqlite::params![
                session_id.as_uuid().as_bytes(),
                node_id.to_string(),
                counter as i64, // u64 -> i64: wraps above i64::MAX — acceptable for counters
                current_timestamp(),
            ],
        )?;
    }
    tx.commit()?;
    Ok(())
}
/// Load session vector clock from database
///
/// Returns an empty clock when the session has no stored rows. Rows whose
/// `node_id` does not parse as a UUID are skipped rather than failing the
/// whole load.
pub fn load_session_vector_clock(
    conn: &Connection,
    session_id: crate::networking::SessionId,
) -> Result<crate::networking::VectorClock> {
    let mut stmt =
        conn.prepare("SELECT node_id, counter FROM vector_clock WHERE session_id = ?1")?;
    let entries = stmt.query_map([session_id.as_uuid().as_bytes()], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
    })?;
    let mut clock = crate::networking::VectorClock::new();
    for entry in entries {
        let (node, counter) = entry?;
        if let Ok(node_id) = uuid::Uuid::parse_str(&node) {
            clock.clocks.insert(node_id, counter as u64);
        }
    }
    Ok(clock)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -0,0 +1,189 @@
//! Database migration system
//!
//! Provides versioned schema migrations for SQLite database evolution.
use rusqlite::Connection;
use crate::persistence::error::Result;
/// Migration metadata
///
/// Each migration is a static SQL batch identified by a monotonically
/// increasing version number recorded in `schema_migrations`.
#[derive(Debug, Clone)]
pub struct Migration {
    /// Migration version number (must be unique and stable once shipped)
    pub version: i64,
    /// Migration name/description (recorded alongside the version)
    pub name: &'static str,
    /// SQL statements to apply (executed as one batch inside a transaction)
    pub up: &'static str,
}
/// All available migrations in order
///
/// Versions need not be contiguous (2 and 3 are reserved/unused here);
/// `run_migrations` applies any entry not yet recorded as applied.
pub const MIGRATIONS: &[Migration] = &[
    Migration {
        version: 1,
        name: "initial_schema",
        up: include_str!("migrations/001_initial_schema.sql"),
    },
    Migration {
        version: 4,
        name: "sessions",
        up: include_str!("migrations/004_sessions.sql"),
    },
];
/// Initialize the migrations table
///
/// Idempotent (`IF NOT EXISTS`); safe to call before every version query.
fn create_migrations_table(conn: &Connection) -> Result<()> {
    conn.execute(
        "CREATE TABLE IF NOT EXISTS schema_migrations (
            version INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            applied_at INTEGER NOT NULL
        )",
        [],
    )?;
    Ok(())
}
/// Get the current schema version
///
/// Returns 0 for a fresh database (no migrations recorded).
///
/// # Errors
/// Propagates any database error. Previously errors were swallowed with
/// `unwrap_or(0)`, which could silently report a fresh schema on a broken
/// connection and cause migrations to be re-attempted; since the COALESCE
/// query always yields exactly one row, any failure here is a genuine
/// database error and must surface.
pub fn get_current_version(conn: &Connection) -> Result<i64> {
    create_migrations_table(conn)?;
    let version = conn.query_row(
        "SELECT COALESCE(MAX(version), 0) FROM schema_migrations",
        [],
        |row| row.get(0),
    )?;
    Ok(version)
}
/// Check if a migration has been applied
///
/// Assumes `schema_migrations` already exists (callers go through
/// `run_migrations`, which creates it first).
fn is_migration_applied(conn: &Connection, version: i64) -> Result<bool> {
    let count: i64 = conn.query_row(
        "SELECT COUNT(*) FROM schema_migrations WHERE version = ?1",
        [version],
        |row| row.get(0),
    )?;
    Ok(count > 0)
}
/// Apply a single migration
///
/// The migration SQL and the bookkeeping insert run in one transaction,
/// so a failed migration leaves no partial schema change and no
/// `schema_migrations` record.
fn apply_migration(conn: &mut Connection, migration: &Migration) -> Result<()> {
    tracing::info!(
        "Applying migration {} ({})",
        migration.version,
        migration.name
    );
    let tx = conn.transaction()?;
    // Execute the migration SQL (execute_batch allows multiple statements)
    tx.execute_batch(migration.up)?;
    // Record that we applied this migration
    tx.execute(
        "INSERT INTO schema_migrations (version, name, applied_at)
         VALUES (?1, ?2, ?3)",
        rusqlite::params![
            migration.version,
            migration.name,
            chrono::Utc::now().timestamp(),
        ],
    )?;
    tx.commit()?;
    tracing::info!(
        "Migration {} ({}) applied successfully",
        migration.version,
        migration.name
    );
    Ok(())
}
/// Run all pending migrations
///
/// Ensures the bookkeeping table exists, then applies every entry in
/// [`MIGRATIONS`] that is not yet recorded. Idempotent: re-running is a
/// no-op once the schema is current.
pub fn run_migrations(conn: &mut Connection) -> Result<()> {
    create_migrations_table(conn)?;
    let current_version = get_current_version(conn)?;
    tracing::info!("Current schema version: {}", current_version);
    let mut applied_count = 0;
    for migration in MIGRATIONS {
        if is_migration_applied(conn, migration.version)? {
            continue;
        }
        apply_migration(conn, migration)?;
        applied_count += 1;
    }
    if applied_count == 0 {
        tracing::debug!("No pending migrations");
    } else {
        tracing::info!("Applied {} migration(s)", applied_count);
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    //! Migration-system tests against an in-memory SQLite database.
    use rusqlite::Connection;
    use super::*;
    #[test]
    fn test_migration_system() {
        let mut conn = Connection::open_in_memory().unwrap();
        // Initially at version 0
        assert_eq!(get_current_version(&conn).unwrap(), 0);
        // Run migrations
        run_migrations(&mut conn).unwrap();
        // Should be at latest version
        let latest_version = MIGRATIONS.last().unwrap().version;
        assert_eq!(get_current_version(&conn).unwrap(), latest_version);
        // Running again should be a no-op (idempotence)
        run_migrations(&mut conn).unwrap();
        assert_eq!(get_current_version(&conn).unwrap(), latest_version);
    }
    #[test]
    fn test_migrations_table_created() {
        let conn = Connection::open_in_memory().unwrap();
        create_migrations_table(&conn).unwrap();
        // Should be able to query the table
        let count: i64 = conn
            .query_row("SELECT COUNT(*) FROM schema_migrations", [], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(count, 0);
    }
    #[test]
    fn test_is_migration_applied() {
        let conn = Connection::open_in_memory().unwrap();
        create_migrations_table(&conn).unwrap();
        // Migration 1 should not be applied yet
        assert!(!is_migration_applied(&conn, 1).unwrap());
        // Apply migration 1 (bookkeeping row only; no schema change needed here)
        conn.execute(
            "INSERT INTO schema_migrations (version, name, applied_at) VALUES (1, 'test', 0)",
            [],
        )
        .unwrap();
        // Now it should be applied
        assert!(is_migration_applied(&conn, 1).unwrap());
    }
}

View File

@@ -0,0 +1,62 @@
-- Migration 001: Initial schema
-- Creates the base tables for entity persistence and CRDT sync

-- Entities table - stores entity metadata (ids are UUID blobs)
CREATE TABLE IF NOT EXISTS entities (
    id BLOB PRIMARY KEY,
    entity_type TEXT NOT NULL,
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL
);

-- Components table - stores serialized component data, one row per
-- (entity, component type); cascades away with its entity
CREATE TABLE IF NOT EXISTS components (
    entity_id BLOB NOT NULL,
    component_type TEXT NOT NULL,
    data BLOB NOT NULL,
    updated_at INTEGER NOT NULL,
    PRIMARY KEY (entity_id, component_type),
    FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE
);

-- Index for querying components by entity
CREATE INDEX IF NOT EXISTS idx_components_entity
    ON components(entity_id);

-- Operation log - for CRDT sync protocol; (node_id, sequence_number)
-- uniqueness guards against duplicate delivery
CREATE TABLE IF NOT EXISTS operation_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    node_id TEXT NOT NULL,
    sequence_number INTEGER NOT NULL,
    operation BLOB NOT NULL,
    timestamp INTEGER NOT NULL,
    UNIQUE(node_id, sequence_number)
);

-- Index for efficient operation log queries
CREATE INDEX IF NOT EXISTS idx_oplog_node_seq
    ON operation_log(node_id, sequence_number);

-- Vector clock table - for causality tracking (one counter per node)
CREATE TABLE IF NOT EXISTS vector_clock (
    node_id TEXT PRIMARY KEY,
    counter INTEGER NOT NULL,
    updated_at INTEGER NOT NULL
);

-- Session state table - generic key/value store used for crash detection
CREATE TABLE IF NOT EXISTS session_state (
    key TEXT PRIMARY KEY,
    value TEXT NOT NULL,
    updated_at INTEGER NOT NULL
);

-- WAL checkpoint tracking (single-row table, pinned to rowid 1)
CREATE TABLE IF NOT EXISTS checkpoint_state (
    last_checkpoint INTEGER NOT NULL,
    wal_size_bytes INTEGER NOT NULL
);

-- Initialize checkpoint state if not exists
INSERT OR IGNORE INTO checkpoint_state (rowid, last_checkpoint, wal_size_bytes)
VALUES (1, strftime('%s', 'now'), 0);

View File

@@ -0,0 +1,51 @@
-- Migration 004: Add session support
-- Adds session tables and session-scopes existing tables

-- Sessions table (id is the raw UUID blob; code is the human join code)
CREATE TABLE IF NOT EXISTS sessions (
    id BLOB PRIMARY KEY,
    code TEXT NOT NULL,
    name TEXT,
    created_at INTEGER NOT NULL,
    last_active INTEGER NOT NULL,
    entity_count INTEGER NOT NULL DEFAULT 0,
    state TEXT NOT NULL,
    secret BLOB,
    UNIQUE(id),
    UNIQUE(code)
);

-- Index for finding recent sessions (backs get_last_active_session)
CREATE INDEX IF NOT EXISTS idx_sessions_last_active
    ON sessions(last_active DESC);

-- Session membership (which node was in which session)
CREATE TABLE IF NOT EXISTS session_membership (
    session_id BLOB NOT NULL,
    node_id TEXT NOT NULL,
    joined_at INTEGER NOT NULL,
    left_at INTEGER,
    PRIMARY KEY (session_id, node_id),
    FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
);

-- Add session_id to entities table
ALTER TABLE entities ADD COLUMN session_id BLOB;

-- Index for session-scoped entity queries
CREATE INDEX IF NOT EXISTS idx_entities_session
    ON entities(session_id);

-- Add session_id to vector_clock
-- NOTE(review): migration 001 declared node_id as this table's PRIMARY KEY.
-- Adding session_id without rebuilding the table means the same node cannot
-- have clock rows in two sessions (PK conflict). SQLite cannot ALTER a
-- primary key in place — confirm whether a table rebuild with a composite
-- (session_id, node_id) key is required.
ALTER TABLE vector_clock ADD COLUMN session_id BLOB;

-- Composite index for session + node lookups
CREATE INDEX IF NOT EXISTS idx_vector_clock_session_node
    ON vector_clock(session_id, node_id);

-- Add session_id to operation_log
ALTER TABLE operation_log ADD COLUMN session_id BLOB;

-- Index for session-scoped operation queries
CREATE INDEX IF NOT EXISTS idx_operation_log_session
    ON operation_log(session_id, node_id, sequence_number);

View File

@@ -36,6 +36,7 @@ mod error;
mod health;
mod lifecycle;
mod metrics;
mod migrations;
mod plugin;
pub mod reflection;
mod systems;
@@ -47,6 +48,7 @@ pub use error::*;
pub use health::*;
pub use lifecycle::*;
pub use metrics::*;
pub use migrations::*;
pub use plugin::*;
pub use reflection::*;
pub use systems::*;

View File

@@ -195,18 +195,29 @@ mod tests {
let node1 = uuid::Uuid::from_u128(1);
let node2 = uuid::Uuid::from_u128(2);
// Create SyncedValue FIRST, then capture a timestamp that's guaranteed to be newer
// Create SyncedValue FIRST, then capture a timestamp that's guaranteed to be
// newer
let mut lww = SyncedValue::new(100, node1);
std::thread::sleep(std::time::Duration::from_millis(1)); // Ensure ts is after init
let ts = Utc::now();
// Apply update from node1 at timestamp ts
lww.apply_lww(100, ts, node1);
println!("After node1 update: value={}, ts={:?}, node={}", lww.get(), lww.timestamp, lww.node_id);
println!(
"After node1 update: value={}, ts={:?}, node={}",
lww.get(),
lww.timestamp,
lww.node_id
);
// Apply conflicting update from node2 at SAME timestamp
lww.apply_lww(200, ts, node2);
println!("After node2 update: value={}, ts={:?}, node={}", lww.get(), lww.timestamp, lww.node_id);
println!(
"After node2 update: value={}, ts={:?}, node={}",
lww.get(),
lww.timestamp,
lww.node_id
);
// node2 > node1, so value2 should win
assert_eq!(*lww.get(), 200, "Higher node_id should win tiebreaker");

View File

@@ -15,12 +15,21 @@ fn test_gossip_bridge_creation() {
#[test]
fn test_gossip_bridge_send() {
use lib::networking::{
JoinType,
SessionId,
};
let node_id = uuid::Uuid::new_v4();
let bridge = init_gossip_bridge(node_id);
let session_id = SessionId::new();
let message = SyncMessage::JoinRequest {
node_id,
session_id,
session_secret: None,
last_known_clock: None,
join_type: JoinType::Fresh,
};
let versioned = VersionedMessage::new(message);

View File

@@ -41,12 +41,16 @@ use iroh_gossip::{
};
use lib::{
networking::{
EntityLockRegistry,
GossipBridge,
LockMessage,
NetworkedEntity,
NetworkedSelection,
NetworkedTransform,
NetworkingConfig,
NetworkingPlugin,
Synced,
SyncMessage,
VersionedMessage,
},
persistence::{
@@ -210,7 +214,8 @@ mod test_utils {
// Register test component types for reflection
app.register_type::<TestPosition>()
.register_type::<TestHealth>();
.register_type::<TestHealth>()
.register_type::<NetworkedSelection>();
app
}
@@ -299,10 +304,10 @@ mod test_utils {
topic_id: TopicId,
bootstrap_addrs: Vec<iroh::EndpointAddr>,
) -> Result<(Endpoint, Gossip, Router, GossipBridge)> {
println!(" Creating endpoint with mDNS discovery...");
// Create the Iroh endpoint with mDNS local discovery
println!(" Creating endpoint (localhost only for fast testing)...");
// Create the Iroh endpoint bound to localhost only (no mDNS needed)
let endpoint = Endpoint::builder()
.discovery(iroh::discovery::mdns::MdnsDiscovery::builder())
.bind_addr_v4(std::net::SocketAddrV4::new(std::net::Ipv4Addr::LOCALHOST, 0))
.bind()
.await?;
let endpoint_id = endpoint.addr().id;
@@ -324,7 +329,7 @@ mod test_utils {
.accept(iroh_gossip::ALPN, gossip.clone())
.spawn();
// Add bootstrap peers to endpoint's discovery using StaticProvider
// Add bootstrap peers using StaticProvider for direct localhost connections
let bootstrap_count = bootstrap_addrs.len();
let has_bootstrap_peers = !bootstrap_addrs.is_empty();
@@ -337,49 +342,28 @@ mod test_utils {
static_provider.add_endpoint_info(addr.clone());
}
endpoint.discovery().add(static_provider);
println!(
" Added {} bootstrap peers to static discovery",
bootstrap_count
);
println!(" Added {} bootstrap peers to discovery", bootstrap_count);
// Explicitly connect to bootstrap peers
println!(" Connecting to bootstrap peers...");
// Connect to bootstrap peers (localhost connections are instant)
for addr in &bootstrap_addrs {
match endpoint.connect(addr.clone(), iroh_gossip::ALPN).await {
| Ok(_conn) => println!(" ✓ Connected to bootstrap peer: {}", addr.id),
| Err(e) => {
println!(" ✗ Failed to connect to bootstrap peer {}: {}", addr.id, e)
},
| Ok(_conn) => println!(" ✓ Connected to {}", addr.id),
| Err(e) => println!(" ✗ Connection failed: {}", e),
}
}
}
println!(
" Subscribing to topic with {} bootstrap peers...",
bootstrap_count
);
// Subscribe to the topic (the IDs now have addresses via discovery)
// Subscribe to the topic
let subscribe_handle = gossip.subscribe(topic_id, bootstrap_ids).await?;
println!(" Splitting sender/receiver...");
// Split into sender and receiver
let (sender, mut receiver) = subscribe_handle.split();
// Only wait for join if we have bootstrap peers
// receiver.joined() waits until we've connected to at least one peer
// If there are no bootstrap peers (first node), skip this step
// Wait for join if we have bootstrap peers (should be instant on localhost)
if has_bootstrap_peers {
println!(" Waiting for join to complete (with timeout)...");
// Use a timeout in case mDNS discovery takes a while or fails
match tokio::time::timeout(Duration::from_secs(3), receiver.joined()).await {
| Ok(Ok(())) => println!(" Join completed!"),
| Ok(Err(e)) => println!(" Join error: {}", e),
| Err(_) => {
println!(" Join timeout - proceeding anyway (mDNS may still connect later)")
},
match tokio::time::timeout(Duration::from_millis(500), receiver.joined()).await {
| Ok(Ok(())) => println!(" ✓ Join completed"),
| Ok(Err(e)) => println!(" ✗ Join error: {}", e),
| Err(_) => println!(" Join timeout (proceeding anyway)"),
}
} else {
println!(" No bootstrap peers - skipping join wait (first node in swarm)");
}
// Create bridge and wire it up
@@ -422,10 +406,8 @@ mod test_utils {
init_gossip_node(topic_id, vec![node1_addr]).await?;
println!("Node 2 initialized with ID: {}", ep2.addr().id);
// Give mDNS and gossip time to discover peers
println!("Waiting for mDNS/gossip peer discovery...");
tokio::time::sleep(Duration::from_secs(2)).await;
println!("Peer discovery wait complete");
// Brief wait for gossip protocol to stabilize (localhost is fast)
tokio::time::sleep(Duration::from_millis(200)).await;
Ok((ep1, ep2, router1, router2, bridge1, bridge2))
}
@@ -1038,3 +1020,346 @@ async fn test_persistence_crash_recovery() -> Result<()> {
Ok(())
}
/// Test 5: Lock heartbeat renewal mechanism
///
/// Two gossip-connected apps share one entity; node 1 acquires a lock,
/// then repeatedly renews it via heartbeats while verifying both
/// registries keep the lock alive. Timing-sensitive: the update/sleep
/// loops give the gossip bridge time to deliver each message.
#[tokio::test(flavor = "multi_thread")]
async fn test_lock_heartbeat_renewal() -> Result<()> {
    use test_utils::*;
    println!("=== Starting test_lock_heartbeat_renewal ===");
    let ctx1 = TestContext::new();
    let ctx2 = TestContext::new();
    let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?;
    let node1_id = bridge1.node_id();
    let node2_id = bridge2.node_id();
    let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1);
    let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2);
    // Spawn entity on node 1 and wait for it to replicate to node 2
    let entity_id = Uuid::new_v4();
    let _ = app1.world_mut()
        .spawn((
            NetworkedEntity::with_id(entity_id, node1_id),
            TestPosition { x: 10.0, y: 20.0 },
            Persisted::with_id(entity_id),
            Synced,
        ))
        .id();
    wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |_, w2| {
        count_entities_with_id(w2, entity_id) > 0
    })
    .await?;
    println!("✓ Entity synced");
    // Acquire lock locally on node 1...
    {
        let world = app1.world_mut();
        let mut registry = world.resource_mut::<EntityLockRegistry>();
        registry.try_acquire(entity_id, node1_id).ok();
    }
    // ...and broadcast the lock request so node 2 records it too
    {
        let bridge = app1.world().resource::<GossipBridge>();
        let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRequest {
            entity_id,
            node_id: node1_id,
        }));
        bridge.send(msg).ok();
    }
    // Pump both apps to let the lock message propagate
    for _ in 0..5 {
        app1.update();
        app2.update();
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    // Verify both nodes have the lock
    {
        let registry1 = app1.world().resource::<EntityLockRegistry>();
        let registry2 = app2.world().resource::<EntityLockRegistry>();
        assert!(registry1.is_locked(entity_id), "Lock should exist on node 1");
        assert!(registry2.is_locked(entity_id), "Lock should exist on node 2");
        println!("✓ Lock acquired on both nodes");
    }
    // Test heartbeat renewal: send a few heartbeats and verify locks persist
    for i in 0..3 {
        // Renew on node 1 (local registry)
        {
            let world = app1.world_mut();
            let mut registry = world.resource_mut::<EntityLockRegistry>();
            assert!(
                registry.renew_heartbeat(entity_id, node1_id),
                "Should successfully renew lock"
            );
        }
        // Send heartbeat to node 2 over gossip
        {
            let bridge = app1.world().resource::<GossipBridge>();
            let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockHeartbeat {
                entity_id,
                holder: node1_id,
            }));
            bridge.send(msg).ok();
        }
        // Process the heartbeat on both sides
        for _ in 0..3 {
            app1.update();
            app2.update();
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
        // Verify locks still exist after heartbeat
        {
            let registry1 = app1.world().resource::<EntityLockRegistry>();
            let registry2 = app2.world().resource::<EntityLockRegistry>();
            assert!(
                registry1.is_locked(entity_id),
                "Lock should persist on node 1 after heartbeat {}",
                i + 1
            );
            assert!(
                registry2.is_locked(entity_id),
                "Lock should persist on node 2 after heartbeat {}",
                i + 1
            );
        }
    }
    println!("✓ Heartbeat renewal mechanism working correctly");
    // Orderly teardown so background tasks don't leak between tests
    router1.shutdown().await?;
    router2.shutdown().await?;
    ep1.close().await;
    ep2.close().await;
    Ok(())
}
/// Test 6: Lock expires without heartbeats
///
/// Simulates a crash of the lock holder (node 1) and verifies that the
/// remote node (node 2) removes the lock once it is expired, instead of
/// keeping an orphaned lock forever.
#[tokio::test(flavor = "multi_thread")]
async fn test_lock_heartbeat_expiration() -> Result<()> {
    use test_utils::*;
    println!("=== Starting test_lock_heartbeat_expiration ===");
    let ctx1 = TestContext::new();
    let ctx2 = TestContext::new();
    let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?;
    let node1_id = bridge1.node_id();
    let node2_id = bridge2.node_id();
    let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1);
    let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2);
    // Node 1 spawns the entity (with a selection component) and syncs it.
    // The Entity handle is not needed, so the spawn result is dropped.
    let entity_id = Uuid::new_v4();
    app1.world_mut().spawn((
        NetworkedEntity::with_id(entity_id, node1_id),
        NetworkedSelection::default(),
        TestPosition { x: 10.0, y: 20.0 },
        Persisted::with_id(entity_id),
        Synced,
    ));
    // Wait for sync
    wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_, w2| {
        count_entities_with_id(w2, entity_id) > 0
    })
    .await?;
    // Acquire lock locally on node 1 (gossip doesn't loop back to sender).
    // The rest of the test depends on this succeeding, so fail loudly.
    {
        let world = app1.world_mut();
        let mut registry = world.resource_mut::<EntityLockRegistry>();
        registry
            .try_acquire(entity_id, node1_id)
            .expect("node 1 should acquire the lock on an unlocked entity");
    }
    // Broadcast LockRequest so other nodes apply optimistically
    {
        let bridge = app1.world().resource::<GossipBridge>();
        let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRequest {
            entity_id,
            node_id: node1_id,
        }));
        bridge.send(msg).ok();
    }
    // Update to allow lock propagation
    for _ in 0..10 {
        app1.update();
        app2.update();
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    // Verify lock acquired
    wait_for_sync(&mut app1, &mut app2, Duration::from_secs(2), |_, w2| {
        let registry2 = w2.resource::<EntityLockRegistry>();
        registry2.is_locked(entity_id)
    })
    .await?;
    println!("✓ Lock acquired and propagated");
    // Simulate node 1 crash: remove lock from node 1's registry without sending release.
    // This stops heartbeat broadcasts from node 1.
    {
        let mut registry = app1.world_mut().resource_mut::<EntityLockRegistry>();
        registry.force_release(entity_id);
        println!("✓ Simulated node 1 crash (stopped heartbeats)");
    }
    // Force the lock to expire on node 2 (simulating 5+ seconds passing without heartbeats)
    {
        let mut registry = app2.world_mut().resource_mut::<EntityLockRegistry>();
        registry.expire_lock_for_testing(entity_id);
        println!("✓ Forced lock to appear expired on node 2");
    }
    // Run cleanup system (which removes expired locks and broadcasts LockReleased)
    println!("Running cleanup to expire locks...");
    for _ in 0..10 {
        app2.update();
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    // Verify lock was removed from node 2
    {
        let registry = app2.world().resource::<EntityLockRegistry>();
        assert!(
            !registry.is_locked(entity_id),
            "Lock should be expired on node 2 after cleanup"
        );
        println!("✓ Lock expired on node 2 after 5 seconds without heartbeat");
    }
    println!("✓ Lock heartbeat expiration test passed");
    // Tear down networking cleanly so the test does not leak endpoints.
    router1.shutdown().await?;
    router2.shutdown().await?;
    ep1.close().await;
    ep2.close().await;
    Ok(())
}
/// Test 7: Lock release stops heartbeats
///
/// Verifies that an explicit lock release on the holder (node 1) propagates
/// to the remote node (node 2), which then no longer reports the entity as
/// locked.
#[tokio::test(flavor = "multi_thread")]
async fn test_lock_release_stops_heartbeats() -> Result<()> {
    use test_utils::*;
    println!("=== Starting test_lock_release_stops_heartbeats ===");
    let ctx1 = TestContext::new();
    let ctx2 = TestContext::new();
    let (ep1, ep2, router1, router2, bridge1, bridge2) = setup_gossip_pair().await?;
    let node1_id = bridge1.node_id();
    let node2_id = bridge2.node_id();
    let mut app1 = create_test_app(node1_id, ctx1.db_path(), bridge1);
    let mut app2 = create_test_app(node2_id, ctx2.db_path(), bridge2);
    // Node 1 spawns the entity (with a selection component) and syncs it.
    // The Entity handle is not needed, so the spawn result is dropped.
    let entity_id = Uuid::new_v4();
    app1.world_mut().spawn((
        NetworkedEntity::with_id(entity_id, node1_id),
        NetworkedSelection::default(),
        TestPosition { x: 10.0, y: 20.0 },
        Persisted::with_id(entity_id),
        Synced,
    ));
    // Wait for sync
    wait_for_sync(&mut app1, &mut app2, Duration::from_secs(5), |_, w2| {
        count_entities_with_id(w2, entity_id) > 0
    })
    .await?;
    // Acquire lock locally on node 1 (gossip doesn't loop back to sender).
    // The rest of the test depends on this succeeding, so fail loudly.
    {
        let world = app1.world_mut();
        let mut registry = world.resource_mut::<EntityLockRegistry>();
        registry
            .try_acquire(entity_id, node1_id)
            .expect("node 1 should acquire the lock on an unlocked entity");
    }
    // Broadcast LockRequest so other nodes apply optimistically
    {
        let bridge = app1.world().resource::<GossipBridge>();
        let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRequest {
            entity_id,
            node_id: node1_id,
        }));
        bridge.send(msg).ok();
    }
    // Update to allow lock propagation
    for _ in 0..10 {
        app1.update();
        app2.update();
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    // Wait for lock to propagate
    wait_for_sync(&mut app1, &mut app2, Duration::from_secs(2), |_, w2| {
        let registry2 = w2.resource::<EntityLockRegistry>();
        registry2.is_locked(entity_id)
    })
    .await?;
    println!("✓ Lock acquired and propagated");
    // Release lock on node 1. A failed release would invalidate the rest of
    // the test, so assert instead of silently continuing.
    {
        let world = app1.world_mut();
        let mut registry = world.resource_mut::<EntityLockRegistry>();
        assert!(
            registry.release(entity_id, node1_id),
            "node 1 should be able to release its own lock"
        );
        println!("✓ Lock released on node 1");
    }
    // Broadcast LockRelease message to other nodes
    {
        let bridge = app1.world().resource::<GossipBridge>();
        let msg = VersionedMessage::new(SyncMessage::Lock(LockMessage::LockRelease {
            entity_id,
            node_id: node1_id,
        }));
        bridge.send(msg).ok();
    }
    // Update to trigger lock release propagation
    for _ in 0..10 {
        app1.update();
        app2.update();
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    // Wait for release to propagate to node 2
    wait_for_sync(&mut app1, &mut app2, Duration::from_secs(3), |_, w2| {
        let registry2 = w2.resource::<EntityLockRegistry>();
        !registry2.is_locked(entity_id)
    })
    .await?;
    println!("✓ Lock release propagated to node 2");
    println!("✓ Lock release stops heartbeats test passed");
    // Tear down networking cleanly so the test does not leak endpoints.
    router1.shutdown().await?;
    router2.shutdown().await?;
    ep1.close().await;
    ep2.close().await;
    Ok(())
}

View File

@@ -68,45 +68,32 @@ The session data model defines how collaborative sessions are identified, tracke
### Session Identification
Each collaborative session needs a globally unique identifier that's both machine-readable and human-friendly. We use UUIDs internally for uniqueness while providing a human-readable "session code" format (like `abc-def-123`) that users can easily share and enter manually.
Each collaborative session needs a unique identifier that users can easily share and enter. The design prioritizes human usability while maintaining technical uniqueness and security.
The `SessionId` type wraps a UUID but provides bidirectional conversion to/from 6-character alphanumeric codes. This code format makes it easy to verbally communicate session IDs ("join session abc-def-one-two-three") or manually type them into a join dialog.
**User-Facing Session Codes**:
Additionally, each session ID can be deterministically converted to an ALPN (Application-Layer Protocol Negotiation) identifier using BLAKE3 hashing. This ensures that peers in different sessions are cryptographically isolated at the network transport layer - they literally cannot discover or communicate with each other.
Sessions are identified by short, memorable codes in the format `abc-def-123` (9 alphanumeric characters in three groups). This format is:
- **Easy to communicate verbally**: "Join session abc-def-one-two-three"
- **Simple to type**: No confusing characters (0 vs O, 1 vs l)
- **Shareable**: Can be sent via chat, email, or written down
```rust
/// Unique identifier for a collaborative session
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub struct SessionId(Uuid);
When a user creates a session, they see a code like `xyz-789-mno` that they can share with collaborators. When joining, they simply type this code into a dialog.
impl SessionId {
/// Create a new random session ID
pub fn new() -> Self {
Self(Uuid::new_v4())
}
**Technical Implementation**:
/// Create from a human-readable code (e.g., "abc-def-ghi")
pub fn from_code(code: &str) -> Result<Self> {
// Parse format: xxx-yyy-zzz (3 groups of 3 lowercase alphanumeric)
// Maps to deterministic UUID using hash
let uuid = generate_uuid_from_code(code)?;
Ok(Self(uuid))
}
Behind the scenes, each session code maps to a UUID (Universally Unique Identifier) that provides true global uniqueness. The `SessionId` type handles bidirectional conversion:
- User codes → UUIDs via deterministic hashing
- UUIDs → display codes via formatting
/// Get human-readable code (first 9 characters of UUID formatted)
pub fn to_code(&self) -> String {
format_uuid_as_code(&self.0)
}
**Network Isolation**:
/// Derive ALPN protocol identifier from session ID
pub fn to_alpn(&self) -> [u8; 32] {
let mut hasher = blake3::Hasher::new();
hasher.update(b"lonni-session-v1");
hasher.update(self.0.as_bytes());
*hasher.finalize().as_bytes()
}
}
```
Each session ID also derives a unique ALPN (Application-Layer Protocol Negotiation) identifier using BLAKE3 hashing. This provides cryptographic isolation at the transport layer - peers in different sessions literally cannot discover or communicate with each other, even if they're on the same local network.
**Key Operations**:
- `SessionId::new()`: Generates a random UUID v4 for new sessions
- `SessionId::from_code(code)`: Parses human-readable codes (format: `xxx-yyy-zzz`) into UUIDs
- `to_code()`: Converts UUID to a 9-character alphanumeric code for display
- `to_alpn()`: Derives a 32-byte ALPN identifier for network isolation
### Session Metadata
@@ -120,58 +107,23 @@ This metadata serves several purposes:
The `CurrentSession` resource represents the active session within the Bevy ECS world. It includes both the session metadata and the vector clock state at the time of joining, which is essential for the hybrid sync protocol.
```rust
/// Metadata about a collaborative session
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
/// Unique identifier for this session
pub id: SessionId,
**Session Structure**:
/// Human-readable name (optional)
pub name: Option<String>,
The `Session` struct contains:
- **id**: Unique `SessionId` identifying this session
- **name**: Optional human-readable label (e.g., "Monday Design Review")
- **created_at**: Timestamp of session creation
- **last_active**: When this node was last active in the session (for auto-rejoin)
- **entity_count**: Cached count of entities (for UI display)
- **state**: Current lifecycle state (see state machine below)
- **secret**: Optional encrypted password for session access control
/// When this session was created
pub created_at: DateTime<Utc>,
**Session States**:
/// Last time this node was active in this session
pub last_active: DateTime<Utc>,
Five states track the session lifecycle (see the "Session State Transitions" section below for the detailed state machine):
- `Created`, `Joining`, `Active`, `Disconnected`, `Left`
/// How many entities are in this session (cached)
pub entity_count: usize,
/// Session state
pub state: SessionState,
/// Optional session secret for authentication
pub secret: Option<Vec<u8>>,
}
/// Current state of a session
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SessionState {
/// Session created but not yet joined network
Created,
/// Currently joining (waiting for FullState or deltas)
Joining,
/// Fully synchronized and active
Active,
/// Temporarily disconnected (will attempt rejoin)
Disconnected,
/// Cleanly left (archived, can rejoin later)
Left,
}
/// Bevy resource tracking the current session
#[derive(Resource)]
pub struct CurrentSession {
pub session: Session,
pub vector_clock_at_join: VectorClock,
}
```
The `CurrentSession` Bevy resource wraps the session metadata along with the vector clock captured at join time. This clock snapshot enables the hybrid sync protocol to determine which deltas are needed when rejoining.
### Database Schema
@@ -272,15 +224,9 @@ Instead of using a single global gossip topic, each session gets its own ALPN (A
Each session derives a unique ALPN identifier using BLAKE3 cryptographic hashing. The derivation is deterministic - the same session ID always produces the same ALPN - which allows all peers to independently compute the correct ALPN for a session they want to join.
```rust
/// Derive a unique ALPN protocol identifier from a session ID
pub fn derive_alpn_from_session(session_id: &SessionId) -> [u8; 32] {
let mut hasher = blake3::Hasher::new();
hasher.update(b"/app/v1/session-id/"); // Domain separation prefix
hasher.update(session_id.0.as_bytes());
*hasher.finalize().as_bytes()
}
```
**Derivation Process**:
The ALPN is computed by hashing the session UUID with BLAKE3, using a domain separation prefix (`/app/v1/session-id/`) followed by the session ID bytes. This produces a deterministic 32-byte identifier that all peers independently compute from the same session code.
The design provides several security and isolation guarantees:
@@ -301,59 +247,29 @@ The gossip network initialization is modified to use session-specific ALPNs inst
The combination ensures both local and remote sessions work seamlessly.
```rust
/// Initialize gossip with session-specific ALPN
async fn init_gossip_for_session(session: &Session) -> Result<GossipBridge> {
info!("Creating endpoint with discovery...");
let endpoint = Endpoint::builder()
.discovery(MdnsDiscovery::builder()) // Local network
.discovery(PkarrDiscovery::builder()) // Internet-wide via pkarr DNS
.bind()
.await?;
**Initialization Flow**:
let endpoint_id = endpoint.addr().id;
let node_id = endpoint_id_to_uuid(&endpoint_id);
The initialization process has three temporal phases:
info!("Node ID: {}", node_id);
info!("Session ID: {}", session.id.to_code());
**One-time Endpoint Setup** (occurs once per application launch):
1. **Endpoint Creation**: Build an iroh `Endpoint` with both mDNS and Pkarr discovery mechanisms enabled
2. **Gossip Protocol**: Spawn the gossip protocol handler using `Gossip::builder().spawn(endpoint)`
// Derive session-specific ALPN
let alpn = session.id.to_alpn();
info!("Using session ALPN: {}", hex::encode(&alpn[..8]));
**Per-session Connection** (occurs when joining each session):
3. **ALPN Derivation**: Call `session.id.to_alpn()` to compute the 32-byte session-specific ALPN identifier
4. **Router Configuration**: Create a router that only accepts connections on the session's ALPN
- Critical: Use the derived ALPN, not the default `iroh_gossip::ALPN`
- This enforces network isolation at the transport layer
5. **Topic Subscription**: Subscribe to a gossip topic derived from the ALPN (can reuse the same bytes)
6. **Join Wait**: Wait up to 2 seconds for the join confirmation
- Timeout is expected for the first node in a session (no peers yet)
- Errors are logged but don't prevent continuing
info!("Spawning gossip protocol...");
let gossip = Gossip::builder().spawn(endpoint.clone());
**Background Operation** (runs continuously while session is active):
7. **Bridge Creation**: Create a `GossipBridge` that wraps the gossip channels and provides session context
8. **Task Spawning**: Launch background tasks to forward messages between gossip and application
// Router accepts connections with this specific ALPN only
info!("Setting up router with session ALPN...");
let router = Router::builder(endpoint.clone())
.accept(alpn, gossip.clone()) // NOTE: Use session ALPN, not iroh_gossip::ALPN
.spawn();
// Subscribe to topic (can use session ID directly as topic)
let topic_id = TopicId::from_bytes(alpn);
info!("Subscribing to session topic...");
let subscribe_handle = gossip.subscribe(topic_id, vec![]).await?;
let (sender, mut receiver) = subscribe_handle.split();
// Wait for join (with timeout)
info!("Waiting for gossip join...");
match tokio::time::timeout(Duration::from_secs(2), receiver.joined()).await {
Ok(Ok(())) => info!("Joined session gossip swarm"),
Ok(Err(e)) => warn!("Join error: {} (proceeding anyway)", e),
Err(_) => info!("Join timeout (first node in session)"),
}
// Create bridge with session context
let bridge = GossipBridge::new_with_session(node_id, session.id.clone());
// Spawn forwarding tasks
spawn_bridge_tasks(sender, receiver, bridge.clone(), endpoint, router, gossip);
Ok(bridge)
}
```
The key architectural decision is using the same ALPN bytes for both transport-layer connection acceptance and application-layer topic identification. This ensures consistent isolation across both layers.
### Session Discovery
@@ -405,42 +321,17 @@ The key fields are:
Existing peers use this information to decide: "Can I send just deltas, or do I need to send the full state?"
```rust
/// Request to join a session
JoinRequest {
/// ID of the node requesting to join
node_id: NodeId,
**JoinRequest Message Structure**:
/// Session ID we're trying to join
session_id: SessionId,
| Field | Type | Purpose |
|-------|------|---------|
| `node_id` | `NodeId` | Identifier of the joining node |
| `session_id` | `SessionId` | Target session UUID |
| `session_secret` | `Option<Vec<u8>>` | Authentication credential if session is password-protected |
| `last_known_clock` | `Option<VectorClock>` | Vector clock from previous participation; `None` indicates fresh join requiring full state |
| `join_type` | `JoinType` | Enum: `Fresh` or `Rejoin { last_active, entity_count }` |
/// Optional session secret for authentication
session_secret: Option<Vec<u8>>,
/// Our last known vector clock for this session (if rejoining)
/// None = fresh join (need full state)
/// Some = rejoin (only need deltas since this clock)
last_known_clock: Option<VectorClock>,
/// Are we rejoining or joining fresh?
join_type: JoinType,
},
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JoinType {
/// First time joining this session
Fresh,
/// Rejoining after disconnect/restart
Rejoin {
/// When we last left
last_active: DateTime<Utc>,
/// How many entities we had
entity_count: usize,
},
}
```
The `last_known_clock` field is the key discriminator: its presence signals that the node has persistent state and only needs deltas, while its absence triggers full state transfer.
### Join Flow: Fresh Join
@@ -532,103 +423,40 @@ sequenceDiagram
The join handler is a Bevy system that runs on existing peers and responds to incoming `JoinRequest` messages. Its responsibility is to decide whether to send full state or deltas based on the joining node's vector clock.
The handler performs several critical validations:
**Message Processing Loop**:
1. **Session ID Validation**: Ensures the request is for the current session (prevents cross-session pollution)
2. **Secret Validation**: If the session is password-protected, validates the provided secret using constant-time comparison
3. **Clock Analysis**: Compares the requester's `last_known_clock` with the operation log to determine if deltas are feasible
4. **Response Selection**: Chooses between sending `MissingDeltas` (for rejoins with <1000 operations) or `FullState` (for fresh joins or large deltas)
The system polls the gossip bridge for incoming messages and filters for `JoinRequest` messages. For each request, it performs a multi-stage validation and response pipeline:
The 1000-operation threshold is a heuristic: below this, sending individual deltas is more efficient than serializing the entire world. Above it, the overhead of transmitting many small deltas exceeds the cost of sending a snapshot.
**Stage 1: Security Validation**
```rust
pub fn handle_join_request_system(
world: &World,
bridge: Res<GossipBridge>,
current_session: Res<CurrentSession>,
operation_log: Res<OperationLog>,
networked_entities: Query<(Entity, &NetworkedEntity)>,
type_registry: Res<AppTypeRegistry>,
node_clock: Res<NodeVectorClock>,
) {
while let Some(message) = bridge.try_recv() {
match message.message {
SyncMessage::JoinRequest {
node_id,
session_id,
session_secret,
last_known_clock,
join_type,
} => {
// Validate session ID matches
if session_id != current_session.session.id {
warn!("JoinRequest for wrong session: expected {}, got {}",
current_session.session.id.to_code(),
session_id.to_code());
continue;
}
1. **Session ID Check**: Verify the request is for the current session
- Mismatched session IDs are logged and rejected
- Prevents cross-session message pollution
2. **Secret Validation**: If the session has a secret, validate the provided credential
- Uses constant-time comparison via `validate_session_secret()`
- Rejects requests with invalid or missing secrets
// Validate session secret if configured
if let Some(expected_secret) = &current_session.session.secret {
match &session_secret {
Some(provided) if validate_session_secret(provided, expected_secret).is_ok() => {
info!("Session secret validated for node {}", node_id);
}
_ => {
error!("JoinRequest from {} rejected: invalid secret", node_id);
continue;
}
}
}
**Stage 2: Delta Feasibility Analysis**
info!("Handling JoinRequest from {} ({:?})", node_id, join_type);
The handler examines the `join_type` and `last_known_clock` fields:
// Decide: send deltas or full state?
let response = match (join_type, last_known_clock) {
(JoinType::Rejoin { .. }, Some(their_clock)) => {
// Check if we can send deltas
let missing_deltas = operation_log.get_all_operations_newer_than(&their_clock);
- **Fresh Join** (`join_type: Fresh` or `last_known_clock: None`):
- Always send `FullState` - no other option available
- Call `build_full_state_for_session()` to serialize all entities and components
const MAX_DELTA_OPS: usize = 1000;
if missing_deltas.len() <= MAX_DELTA_OPS {
info!("Sending {} deltas to rejoining node {}", missing_deltas.len(), node_id);
VersionedMessage::new(SyncMessage::MissingDeltas {
deltas: missing_deltas,
})
} else {
info!("Too many deltas ({}), sending FullState instead", missing_deltas.len());
build_full_state_for_session(
world,
&networked_entities,
&type_registry.read(),
&node_clock,
&session_id,
)
}
}
_ => {
// Fresh join - send full state
info!("Sending FullState to node {}", node_id);
build_full_state_for_session(
world,
&networked_entities,
&type_registry.read(),
&node_clock,
&session_id,
)
}
};
- **Rejoin** (`join_type: Rejoin` with `last_known_clock: Some(clock)`):
- Query operation log: `get_all_operations_newer_than(their_clock)`
- Count missing operations
- If count ≤ 1000: Send `MissingDeltas` message with operation list
- If count > 1000: Send `FullState` instead (more efficient than 1000+ small messages)
// Send response
if let Err(e) = bridge.send(response) {
error!("Failed to send join response: {}", e);
}
}
_ => {}
}
}
}
```
**Stage 3: Response Transmission**
Send the constructed response message (`FullState` or `MissingDeltas`) over the gossip bridge. Log errors if transmission fails.
**Design Rationale**:
The 1000-operation threshold is a heuristic based on message overhead: below this, individual delta messages are smaller than a full world snapshot. Above it, the cost of serializing and transmitting 1000+ small messages exceeds the cost of sending one large snapshot. This threshold can be tuned based on profiling.
## Temporary Lock-based Ownership
@@ -643,18 +471,20 @@ To prevent CRDT conflicts on complex operations (e.g., multi-step drawing, entit
**Design Principles:**
- **Initiator-driven**: Locks are requested immediately when user interaction begins (e.g., clicking an object), not after waiting for server approval
- **Optimistic**: The local client assumes the lock will succeed and allows immediate interaction; conflicts are resolved asynchronously
- **Temporary**: Locks auto-expire after 5 seconds (default) to prevent orphaned locks from crashed nodes
- **Temporary**: Locks auto-expire after 5 seconds to prevent orphaned locks from crashed nodes
- **Advisory**: Locks are checked before delta generation, but the underlying CRDT still handles conflicts if locks fail
- **Deterministic conflict resolution**: When two nodes request the same lock simultaneously, the higher node ID wins
- **Auto-release**: Disconnected nodes automatically lose all their locks
**Note**: The 5-second lock timeout is fixed in the initial implementation. Future versions may make this configurable per-entity-type or per-session based on UX requirements.
### Lock State Model
The lock system is implemented as a simple in-memory registry that tracks which entities are currently locked and by whom. Each lock contains:
- **Entity ID**: Which entity is locked
- **Holder**: Which node owns the lock
- **Acquisition timestamp**: When the lock was acquired
- **Timeout duration**: How long until auto-expiry (default 5 seconds)
- **Timeout duration**: How long until auto-expiry (5 seconds)
The `EntityLockRegistry` resource maintains a HashMap of entity ID to lock state, plus an acquisition history queue for rate limiting.
@@ -841,187 +671,85 @@ The integration is implemented through Bevy systems that run at specific lifecyc
The session lifecycle is managed through two primary Bevy systems: one for initialization on startup, and one for persisting state on shutdown.
```rust
/// Load or create session on startup
pub fn initialize_session_system(
mut commands: Commands,
db: Res<PersistenceDb>,
) {
let conn = db.lock().unwrap();
**Startup: `initialize_session_system`**
// Check for previous session
let session = match get_last_active_session(&conn) {
Ok(Some(session)) => {
info!("Resuming previous session: {}", session.id.to_code());
session
}
Ok(None) | Err(_) => {
info!("No previous session, creating new one");
let session = Session {
id: SessionId::new(),
name: None,
created_at: Utc::now(),
last_active: Utc::now(),
entity_count: 0,
state: SessionState::Created,
secret: None,
};
On application startup, this system queries the database for the most recent active session:
// Persist to database
if let Err(e) = save_session(&conn, &session) {
error!("Failed to save new session: {}", e);
}
1. **Session Discovery**: Query `sessions` table ordered by `last_active DESC` to find the most recent session
2. **Decision Point**:
- If a session exists: Resume it (enables automatic rejoin after crashes)
- If no session exists: Create a new session with a random UUID and default state
3. **Vector Clock Loading**: Load the session's vector clock from the database, or initialize an empty clock for new sessions
4. **Resource Initialization**: Insert `CurrentSession` resource containing session metadata and the saved vector clock
session
}
};
This enables crash recovery: if the app crashes and restarts, it automatically resumes the previous session and rejoins the network.
// Load vector clock for this session
let vector_clock = load_session_vector_clock(&conn, &session.id)
.unwrap_or_else(|_| VectorClock::new());
**Shutdown: `save_session_state_system`**
// Insert as resource
commands.insert_resource(CurrentSession {
session: session.clone(),
vector_clock_at_join: vector_clock.clone(),
});
On clean shutdown, this system persists the current session state:
info!("Session initialized: {}", session.id.to_code());
}
1. **Update Metadata**: Set `last_active` timestamp, count current entities, mark state as `Left`
2. **Save Session**: Write session metadata to the `sessions` table using `INSERT OR REPLACE`
3. **Save Vector Clock**: Transaction-based save that clears old clock entries and inserts current state for all known nodes
/// Save session state on shutdown
pub fn save_session_state_system(
current_session: Res<CurrentSession>,
node_clock: Res<NodeVectorClock>,
networked_entities: Query<&NetworkedEntity>,
db: Res<PersistenceDb>,
) {
let mut conn = db.lock().unwrap();
// Update session metadata
let mut session = current_session.session.clone();
session.last_active = Utc::now();
session.entity_count = networked_entities.iter().count();
session.state = SessionState::Left;
if let Err(e) = save_session(&conn, &session) {
error!("Failed to save session state: {}", e);
}
// Save vector clock
if let Err(e) = save_session_vector_clock(&conn, &session.id, &node_clock.clock) {
error!("Failed to save vector clock: {}", e);
}
info!("Session state saved for {}", session.id.to_code());
}
```
The vector clock save uses a transaction to ensure atomic updates - either all clock entries are saved, or none are. This prevents partial clock states that could cause sync issues on rejoin.
### Database Operations
```rust
/// Load the most recent active session
pub fn get_last_active_session(conn: &Connection) -> Result<Option<Session>> {
let row = conn.query_row(
"SELECT id, name, created_at, last_active, entity_count, state, secret
FROM sessions
ORDER BY last_active DESC
LIMIT 1",
[],
|row| {
Ok(Session {
id: SessionId(Uuid::from_slice(&row.get::<_, Vec<u8>>(0)?)?),
name: row.get(1)?,
created_at: timestamp_to_datetime(row.get(2)?),
last_active: timestamp_to_datetime(row.get(3)?),
entity_count: row.get(4)?,
state: parse_session_state(&row.get::<_, String>(5)?),
secret: row.get(6)?,
})
},
).optional()?;
The persistence layer provides several key database operations:
Ok(row)
}
**Session Queries**:
- `get_last_active_session()`: Queries the most recent session by `last_active DESC`, returns `Option<Session>`
- `save_session()`: Upserts session metadata using `INSERT OR REPLACE`, persisting all session fields
/// Save session metadata
pub fn save_session(conn: &Connection, session: &Session) -> Result<()> {
conn.execute(
"INSERT OR REPLACE INTO sessions (id, name, created_at, last_active, entity_count, state, secret)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
rusqlite::params![
session.id.0.as_bytes(),
session.name,
session.created_at.timestamp(),
session.last_active.timestamp(),
session.entity_count,
format!("{:?}", session.state).to_lowercase(),
session.secret,
],
)?;
**Vector Clock Persistence**:
- `load_session_vector_clock()`: Queries all `node_id`/`counter` pairs for a session, rebuilding the HashMap
- `save_session_vector_clock()`: Transactional save that deletes old entries then inserts current clock state
Ok(())
}
/// Load vector clock for a session
pub fn load_session_vector_clock(conn: &Connection, session_id: &SessionId) -> Result<VectorClock> {
let mut clock = VectorClock::new();
let mut stmt = conn.prepare(
"SELECT node_id, counter
FROM vector_clock
WHERE session_id = ?1"
)?;
let rows = stmt.query_map([session_id.0.as_bytes()], |row| {
let node_str: String = row.get(0)?;
let counter: u64 = row.get(1)?;
Ok((Uuid::parse_str(&node_str).unwrap(), counter))
})?;
for row in rows {
let (node_id, counter) = row?;
clock.clocks.insert(node_id, counter);
}
Ok(clock)
}
/// Save vector clock for a session
pub fn save_session_vector_clock(
conn: &mut Connection,
session_id: &SessionId,
clock: &VectorClock,
) -> Result<()> {
let tx = conn.transaction()?;
// Clear old clocks for this session
tx.execute(
"DELETE FROM vector_clock WHERE session_id = ?1",
[session_id.0.as_bytes()],
)?;
// Insert current clocks
for (node_id, counter) in &clock.clocks {
tx.execute(
"INSERT INTO vector_clock (session_id, node_id, counter, updated_at)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![
session_id.0.as_bytes(),
node_id.to_string(),
counter,
Utc::now().timestamp(),
],
)?;
}
tx.commit()?;
Ok(())
}
```
All operations use parameterized queries to prevent SQL injection and handle optional fields (like `name` and `secret`) correctly. Session IDs are stored as 16-byte BLOBs for efficiency.
## Implementation Roadmap
### Documentation Standards
All public APIs should follow Rust documentation conventions with comprehensive docstrings. Expected format:
```rust
/// Derives a session-specific ALPN identifier for network isolation.
///
/// This function computes a deterministic 32-byte BLAKE3 hash from the session ID,
/// using a domain separation prefix to prevent collisions with other protocol uses.
/// All peers independently compute the same ALPN from a given session code, enabling
/// decentralized coordination without a central authority.
///
/// # Arguments
/// * `session_id` - The unique session identifier
///
/// # Returns
/// A 32-byte BLAKE3 hash suitable for use as an ALPN protocol identifier
///
/// # Example
/// ```
/// let session = SessionId::new();
/// let alpn = derive_alpn_from_session(&session);
/// assert_eq!(alpn.len(), 32);
/// ```
///
/// # Security
/// The domain separation prefix (`/app/v1/session-id/`) ensures ALPNs cannot
/// collide with other protocol uses of the same hash space.
pub fn derive_alpn_from_session(session_id: &SessionId) -> [u8; 32]
```
Key documentation elements:
- **Summary**: One-line description of purpose
- **Detailed explanation**: How it works and why
- **Arguments**: All parameters with types and descriptions
- **Returns**: What the function produces
- **Examples**: Working code demonstrating usage
- **Panics/Errors**: Document failure conditions
- **Security/Safety**: Highlight security-critical behavior
### Phase 1: Session Data Model & Persistence
- Create `SessionId`, `Session`, `SessionState` types
- Add database schema migration
@@ -1121,6 +849,24 @@ pub fn save_session_vector_clock(
- Vector clock + LWW determines final state
- Locks are advisory (CRDTs provide safety net)
**Convergence Behavior**:
When the partition heals, the gossip protocol reconnects and nodes exchange their full state. The CRDT merge process happens automatically:
1. Vector clocks from both partitions are compared
2. Operations with concurrent clocks are merged using Last-Write-Wins (LWW) based on timestamps
3. The final state reflects the operation with the highest timestamp
4. Convergence typically completes within 1-2 seconds after reconnection
**UX Implications**:
Users in the "losing" partition may see their changes overridden. To minimize surprise:
- Visual indicator shows when the app is disconnected (yellow/orange connection status)
- On reconnection, entities that changed display a brief animation/highlight
- A notification shows "Reconnected - syncing changes" with entity count
- Changes made during disconnection that were overridden could be logged for potential manual recovery (future enhancement)
The system prioritizes consistency over preserving every edit during splits, which is acceptable for collaborative creative work where real-time coordination is expected.
## Security Considerations
Security in a peer-to-peer collaborative environment requires careful balance between usability and protection. This RFC addresses two primary security concerns: session access control and protocol integrity.
@@ -1134,34 +880,22 @@ The secret is hashed using BLAKE3 before comparison, ensuring that:
- Timing analysis cannot reveal secret length or content
- Fast validation (BLAKE3 is extremely performant)
```rust
/// Validate session secret using constant-time comparison
///
/// Both inputs are hashed with BLAKE3 so the comparison always operates on
/// fixed-size digests regardless of secret length, then compared with
/// `subtle::ConstantTimeEq` to avoid timing side channels.
pub fn validate_session_secret(provided: &[u8], expected: &[u8]) -> Result<(), AuthError> {
    use subtle::ConstantTimeEq;

    let provided_digest = blake3::hash(provided);
    let expected_digest = blake3::hash(expected);

    // `ct_eq` runs in constant time; collapse the Choice to bool exactly once.
    let secrets_match: bool = provided_digest
        .as_bytes()
        .ct_eq(expected_digest.as_bytes())
        .into();

    if secrets_match {
        Ok(())
    } else {
        Err(AuthError::InvalidSecret)
    }
}
```
The validation function uses the `subtle` crate's `ConstantTimeEq` trait to perform constant-time comparison of the hashed secrets, preventing timing-based attacks that could leak information about the secret.
### Rate Limiting
```rust
// In EntityLockRegistry
const MAX_LOCKS_PER_NODE: usize = 100;
const MAX_LOCK_REQUESTS_PER_SEC: usize = 10;
To prevent abuse and buggy clients from monopolizing resources, the lock system implements two rate limits:
// Prevent lock spamming
if self.locks_held_by(node_id) >= MAX_LOCKS_PER_NODE {
return Err(LockError::TooManyLocks);
}
```
1. **Total Locks per Node**: Maximum 100 concurrent locks per node
- Prevents a single node from locking every entity in the session
- Ensures entities remain available for other participants
2. **Acquisition Rate**: Maximum 10 lock requests per second per node
- Prevents rapid lock spamming attacks
   - Tracked via a rolling queue of recent acquisition timestamps; the per-second rate is computed over this window
   - Entries older than 60 seconds are pruned so the queue's memory use stays bounded
When a rate limit is exceeded, the lock request returns a `LockError::RateLimited` error. The requesting node's UI should display appropriate feedback to the user.
## Performance Considerations
@@ -1210,6 +944,14 @@ This streaming approach provides several UX benefits:
The 100x bandwidth improvement for delta-based rejoins remains the primary optimization, but streaming ensures fresh joins also have good UX.
**Implementation Notes**:
Several implementation details are deferred to Phase 3 (Hybrid Join Protocol):
- **Dependency Ordering**: Entities with parent-child relationships or component references will be sorted before streaming to ensure dependencies arrive before dependents
- **Message Size Limits**: Batch size (50-100 entities) will be dynamically adjusted based on average entity serialization size to stay under QUIC message size limits (~1 MB)
- **Retry Mechanism**: Missing entity ranges are tracked and can be requested via `RequestMissingEntities { start_index, end_index }` if gaps are detected
- **Cancellation**: If the user abandons the join before completion, in-flight batches are discarded and the partial state is cleared
## Testing Strategy
### Unit Tests