feat(libmarathon): added methods for #128

* migrated to `bytes` to ensure zero-copy

Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
2025-12-17 21:05:37 +00:00
parent 0bbc2c094a
commit bb4393bb9e
5 changed files with 407 additions and 28 deletions

View File

@@ -201,7 +201,7 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
rusqlite::params![
entity_id.as_bytes(),
component_type,
data,
data.as_ref(),
current_timestamp(),
],
)?;
@@ -219,7 +219,7 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
rusqlite::params![
&node_id.to_string(), // Convert UUID to string for SQLite TEXT column
sequence,
operation,
operation.as_ref(),
current_timestamp(),
],
)?;
@@ -613,6 +613,368 @@ pub fn load_session_vector_clock(
Ok(clock)
}
/// Loaded entity data from database
#[derive(Debug)]
pub struct LoadedEntity {
pub id: uuid::Uuid,
pub entity_type: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub updated_at: chrono::DateTime<chrono::Utc>,
pub components: Vec<LoadedComponent>,
}
/// Loaded component data from database
#[derive(Debug)]
pub struct LoadedComponent {
pub component_type: String,
pub data: bytes::Bytes,
}
/// Load all components for a single entity from the database
pub fn load_entity_components(
conn: &Connection,
entity_id: uuid::Uuid,
) -> Result<Vec<LoadedComponent>> {
let mut stmt = conn.prepare(
"SELECT component_type, data
FROM components
WHERE entity_id = ?1",
)?;
let components: Vec<LoadedComponent> = stmt
.query_map([entity_id.as_bytes()], |row| {
let data_vec: Vec<u8> = row.get(1)?;
Ok(LoadedComponent {
component_type: row.get(0)?,
data: bytes::Bytes::from(data_vec),
})
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(components)
}
/// Load a single entity by network ID from the database
///
/// Returns None if the entity doesn't exist.
pub fn load_entity_by_network_id(
conn: &Connection,
network_id: uuid::Uuid,
) -> Result<Option<LoadedEntity>> {
// Load entity metadata
let entity_data = conn
.query_row(
"SELECT id, entity_type, created_at, updated_at
FROM entities
WHERE id = ?1",
[network_id.as_bytes()],
|row| {
let id_bytes: Vec<u8> = row.get(0)?;
let mut id_array = [0u8; 16];
id_array.copy_from_slice(&id_bytes);
let id = uuid::Uuid::from_bytes(id_array);
Ok((
id,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i64>(3)?,
))
},
)
.optional()?;
let Some((id, entity_type, created_at_ts, updated_at_ts)) = entity_data else {
return Ok(None);
};
// Load all components for this entity
let components = load_entity_components(conn, id)?;
Ok(Some(LoadedEntity {
id,
entity_type,
created_at: chrono::DateTime::from_timestamp(created_at_ts, 0)
.unwrap_or_else(chrono::Utc::now),
updated_at: chrono::DateTime::from_timestamp(updated_at_ts, 0)
.unwrap_or_else(chrono::Utc::now),
components,
}))
}
/// Load all entities from the database
///
/// This loads all entity metadata and their components.
/// Used during startup to rehydrate the game state.
pub fn load_all_entities(conn: &Connection) -> Result<Vec<LoadedEntity>> {
let mut stmt = conn.prepare(
"SELECT id, entity_type, created_at, updated_at
FROM entities
ORDER BY created_at ASC",
)?;
let entity_rows = stmt.query_map([], |row| {
let id_bytes: Vec<u8> = row.get(0)?;
let mut id_array = [0u8; 16];
id_array.copy_from_slice(&id_bytes);
let id = uuid::Uuid::from_bytes(id_array);
Ok((
id,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i64>(3)?,
))
})?;
let mut entities = Vec::new();
for row in entity_rows {
let (id, entity_type, created_at_ts, updated_at_ts) = row?;
// Load all components for this entity
let components = load_entity_components(conn, id)?;
entities.push(LoadedEntity {
id,
entity_type,
created_at: chrono::DateTime::from_timestamp(created_at_ts, 0)
.unwrap_or_else(chrono::Utc::now),
updated_at: chrono::DateTime::from_timestamp(updated_at_ts, 0)
.unwrap_or_else(chrono::Utc::now),
components,
});
}
Ok(entities)
}
/// Load entities by entity type from the database
///
/// Returns all entities matching the specified entity_type.
pub fn load_entities_by_type(conn: &Connection, entity_type: &str) -> Result<Vec<LoadedEntity>> {
let mut stmt = conn.prepare(
"SELECT id, entity_type, created_at, updated_at
FROM entities
WHERE entity_type = ?1
ORDER BY created_at ASC",
)?;
let entity_rows = stmt.query_map([entity_type], |row| {
let id_bytes: Vec<u8> = row.get(0)?;
let mut id_array = [0u8; 16];
id_array.copy_from_slice(&id_bytes);
let id = uuid::Uuid::from_bytes(id_array);
Ok((
id,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)?,
row.get::<_, i64>(3)?,
))
})?;
let mut entities = Vec::new();
for row in entity_rows {
let (id, entity_type, created_at_ts, updated_at_ts) = row?;
// Load all components for this entity
let components = load_entity_components(conn, id)?;
entities.push(LoadedEntity {
id,
entity_type,
created_at: chrono::DateTime::from_timestamp(created_at_ts, 0)
.unwrap_or_else(chrono::Utc::now),
updated_at: chrono::DateTime::from_timestamp(updated_at_ts, 0)
.unwrap_or_else(chrono::Utc::now),
components,
});
}
Ok(entities)
}
/// Rehydrate a loaded entity into the Bevy world
///
/// Takes a `LoadedEntity` from the database and spawns it as a new Bevy entity,
/// deserializing and inserting all components using the ComponentTypeRegistry.
///
/// # Arguments
///
/// * `loaded_entity` - The entity data loaded from SQLite
/// * `world` - The Bevy world to spawn the entity into
/// * `component_registry` - Type registry for component deserialization
///
/// # Returns
///
/// The spawned Bevy `Entity` on success
///
/// # Errors
///
/// Returns an error if:
/// - Component deserialization fails
/// - Component type is not registered
/// - Component insertion fails
pub fn rehydrate_entity(
loaded_entity: LoadedEntity,
world: &mut bevy::prelude::World,
component_registry: &crate::persistence::ComponentTypeRegistry,
) -> Result<bevy::prelude::Entity> {
use bevy::prelude::*;
use crate::networking::NetworkedEntity;
// Spawn a new entity
let entity = world.spawn_empty().id();
info!(
"Rehydrating entity {:?} with type {} and {} components",
loaded_entity.id,
loaded_entity.entity_type,
loaded_entity.components.len()
);
// Deserialize and insert each component
for component in &loaded_entity.components {
// Get deserialization function for this component type
let deserialize_fn = component_registry
.get_deserialize_fn_by_path(&component.component_type)
.ok_or_else(|| {
PersistenceError::Deserialization(format!(
"No deserialize function registered for component type: {}",
component.component_type
))
})?;
// Get insert function for this component type
let insert_fn = component_registry
.get_insert_fn_by_path(&component.component_type)
.ok_or_else(|| {
PersistenceError::Deserialization(format!(
"No insert function registered for component type: {}",
component.component_type
))
})?;
// Deserialize the component from bytes
let deserialized = deserialize_fn(&component.data).map_err(|e| {
PersistenceError::Deserialization(format!(
"Failed to deserialize component {}: {}",
component.component_type, e
))
})?;
// Insert the component into the entity
// Get an EntityWorldMut to pass to the insert function
let mut entity_mut = world.entity_mut(entity);
insert_fn(&mut entity_mut, deserialized);
debug!(
"Inserted component {} into entity {:?}",
component.component_type, entity
);
}
// Add the NetworkedEntity component with the persisted network_id
// This ensures the entity maintains its identity across restarts
world.entity_mut(entity).insert(NetworkedEntity {
network_id: loaded_entity.id,
owner_node_id: uuid::Uuid::nil(), // Will be set by network system if needed
});
// Add the Persisted marker component
world
.entity_mut(entity)
.insert(crate::persistence::Persisted {
network_id: loaded_entity.id,
});
info!(
"Successfully rehydrated entity {:?} as Bevy entity {:?}",
loaded_entity.id, entity
);
Ok(entity)
}
/// Rehydrate all entities from the database into the Bevy world
///
/// This function is called during startup to restore the entire persisted
/// state. It loads all entities from SQLite and spawns them into the Bevy world
/// with all their components.
///
/// # Arguments
///
/// * `world` - The Bevy world to spawn entities into
///
/// # Errors
///
/// Returns an error if:
/// - Database connection fails
/// - Entity loading fails
/// - Entity rehydration fails
pub fn rehydrate_all_entities(world: &mut bevy::prelude::World) -> Result<()> {
use bevy::prelude::*;
// Get database connection from resource
let loaded_entities = {
let db_res = world.resource::<crate::persistence::PersistenceDb>();
let conn = db_res
.conn
.lock()
.map_err(|e| PersistenceError::Other(format!("Failed to lock database: {}", e)))?;
// Load all entities from database
load_all_entities(&conn)?
};
info!("Loaded {} entities from database", loaded_entities.len());
if loaded_entities.is_empty() {
info!("No entities to rehydrate");
return Ok(());
}
// Get component registry
let component_registry = {
let registry_res = world.resource::<crate::persistence::ComponentTypeRegistryResource>();
registry_res.0
};
// Rehydrate each entity
let mut rehydrated_count = 0;
let mut failed_count = 0;
for loaded_entity in loaded_entities {
match rehydrate_entity(loaded_entity, world, component_registry) {
| Ok(entity) => {
rehydrated_count += 1;
debug!("Rehydrated entity {:?}", entity);
},
| Err(e) => {
failed_count += 1;
error!("Failed to rehydrate entity: {}", e);
},
}
}
info!(
"Entity rehydration complete: {} succeeded, {} failed",
rehydrated_count, failed_count
);
if failed_count > 0 {
warn!(
"{} entities failed to rehydrate - check logs for details",
failed_count
);
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@@ -656,7 +1018,7 @@ mod tests {
PersistenceOp::UpsertComponent {
entity_id,
component_type: "Transform".to_string(),
data: vec![1, 2, 3, 4],
data: bytes::Bytes::from(vec![1, 2, 3, 4]),
},
];

View File

@@ -91,8 +91,12 @@ impl Plugin for PersistencePlugin {
.insert_resource(PendingFlushTasks::default())
.init_resource::<ComponentTypeRegistryResource>();
// Add startup system
app.add_systems(Startup, persistence_startup_system);
// Add startup systems
// First initialize the database, then rehydrate entities
app.add_systems(Startup, (
persistence_startup_system,
rehydrate_entities_system,
).chain());
// Add systems in the appropriate schedule
app.add_systems(
@@ -159,6 +163,19 @@ fn persistence_startup_system(db: Res<PersistenceDb>, mut metrics: ResMut<Persis
}
}
/// Exclusive startup system to rehydrate entities from database
///
/// This system runs after `persistence_startup_system` and loads all entities
/// from SQLite, deserializing and spawning them into the Bevy world with all
/// their components.
fn rehydrate_entities_system(world: &mut World) {
if let Err(e) = crate::persistence::database::rehydrate_all_entities(world) {
error!("Failed to rehydrate entities from database: {}", e);
} else {
info!("Successfully rehydrated entities from database");
}
}
/// System to collect dirty entities using Bevy's change detection
///
/// This system tracks changes to the `Persisted` component. When `Persisted` is

View File

@@ -479,7 +479,7 @@ mod tests {
.add(PersistenceOp::UpsertComponent {
entity_id,
component_type: "Transform".to_string(),
data: vec![1, 2, 3],
data: bytes::Bytes::from(vec![1, 2, 3]),
})
.unwrap();

View File

@@ -27,7 +27,7 @@ pub struct ComponentMeta {
pub deserialize_fn: fn(&[u8]) -> Result<Box<dyn std::any::Any>>,
/// Serialization function that reads from an entity (returns None if entity doesn't have this component)
pub serialize_fn: fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option<Vec<u8>>,
pub serialize_fn: fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option<bytes::Bytes>,
/// Insert function that takes a boxed component and inserts it into an entity
pub insert_fn: fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>),
@@ -47,7 +47,7 @@ pub struct ComponentTypeRegistry {
discriminant_to_deserializer: HashMap<u16, fn(&[u8]) -> Result<Box<dyn std::any::Any>>>,
/// Discriminant to serialization function
discriminant_to_serializer: HashMap<u16, fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option<Vec<u8>>>,
discriminant_to_serializer: HashMap<u16, fn(&bevy::ecs::world::World, bevy::ecs::entity::Entity) -> Option<bytes::Bytes>>,
/// Discriminant to insert function
discriminant_to_inserter: HashMap<u16, fn(&mut bevy::ecs::world::EntityWorldMut, Box<dyn std::any::Any>)>,
@@ -196,7 +196,7 @@ impl ComponentTypeRegistry {
&self,
world: &bevy::ecs::world::World,
entity: bevy::ecs::entity::Entity,
) -> Vec<(u16, &'static str, Vec<u8>)> {
) -> Vec<(u16, &'static str, bytes::Bytes)> {
let mut results = Vec::new();
for (&discriminant, &serialize_fn) in &self.discriminant_to_serializer {

View File

@@ -105,14 +105,14 @@ pub enum PersistenceOp {
UpsertComponent {
entity_id: EntityId,
component_type: String,
data: Vec<u8>,
data: bytes::Bytes,
},
/// Log an operation for CRDT sync
LogOperation {
node_id: NodeId,
sequence: u64,
operation: Vec<u8>,
operation: bytes::Bytes,
},
/// Update vector clock for causality tracking
@@ -473,7 +473,7 @@ mod tests {
buffer.add(PersistenceOp::UpsertComponent {
entity_id,
component_type: "Transform".to_string(),
data: vec![1, 2, 3],
data: bytes::Bytes::from(vec![1, 2, 3]),
})?;
assert_eq!(buffer.len(), 1);
@@ -481,7 +481,7 @@ mod tests {
buffer.add(PersistenceOp::UpsertComponent {
entity_id,
component_type: "Transform".to_string(),
data: vec![4, 5, 6],
data: bytes::Bytes::from(vec![4, 5, 6]),
})?;
assert_eq!(buffer.len(), 1);
@@ -489,7 +489,7 @@ mod tests {
let ops = buffer.take_operations();
assert_eq!(ops.len(), 1);
if let PersistenceOp::UpsertComponent { data, .. } = &ops[0] {
assert_eq!(data, &vec![4, 5, 6]);
assert_eq!(data.as_ref(), &[4, 5, 6]);
} else {
panic!("Expected UpsertComponent");
}
@@ -506,7 +506,7 @@ mod tests {
.add(PersistenceOp::UpsertComponent {
entity_id,
component_type: "Transform".to_string(),
data: vec![1, 2, 3],
data: bytes::Bytes::from(vec![1, 2, 3]),
})
.expect("Should successfully add Transform");
@@ -515,7 +515,7 @@ mod tests {
.add(PersistenceOp::UpsertComponent {
entity_id,
component_type: "Velocity".to_string(),
data: vec![4, 5, 6],
data: bytes::Bytes::from(vec![4, 5, 6]),
})
.expect("Should successfully add Velocity");
@@ -652,7 +652,7 @@ mod tests {
let log_op = PersistenceOp::LogOperation {
node_id,
sequence: 1,
operation: vec![1, 2, 3],
operation: bytes::Bytes::from(vec![1, 2, 3]),
};
let vector_clock_op = PersistenceOp::UpdateVectorClock {
@@ -689,7 +689,7 @@ mod tests {
buffer.add(PersistenceOp::UpsertComponent {
entity_id,
component_type: "Transform".to_string(),
data: vec![i],
data: bytes::Bytes::from(vec![i]),
})?;
}
@@ -700,7 +700,7 @@ mod tests {
let ops = buffer.take_operations();
assert_eq!(ops.len(), 1);
if let PersistenceOp::UpsertComponent { data, .. } = &ops[0] {
assert_eq!(data, &vec![9]);
assert_eq!(data.as_ref(), &[9]);
} else {
panic!("Expected UpsertComponent");
}
@@ -709,7 +709,7 @@ mod tests {
buffer.add(PersistenceOp::UpsertComponent {
entity_id,
component_type: "Transform".to_string(),
data: vec![100],
data: bytes::Bytes::from(vec![100]),
})?;
assert_eq!(buffer.len(), 1);
@@ -726,13 +726,13 @@ mod tests {
buffer.add(PersistenceOp::UpsertComponent {
entity_id: entity1,
component_type: "Transform".to_string(),
data: vec![1],
data: bytes::Bytes::from(vec![1]),
})?;
buffer.add(PersistenceOp::UpsertComponent {
entity_id: entity2,
component_type: "Transform".to_string(),
data: vec![2],
data: bytes::Bytes::from(vec![2]),
})?;
// Should have 2 operations (different entities)
@@ -742,7 +742,7 @@ mod tests {
buffer.add(PersistenceOp::UpsertComponent {
entity_id: entity1,
component_type: "Transform".to_string(),
data: vec![3],
data: bytes::Bytes::from(vec![3]),
})?;
// Still 2 operations (first was replaced in-place)
@@ -761,7 +761,7 @@ mod tests {
.add_with_default_priority(PersistenceOp::LogOperation {
node_id,
sequence: 1,
operation: vec![1, 2, 3],
operation: bytes::Bytes::from(vec![1, 2, 3]),
})
.unwrap();
@@ -776,7 +776,7 @@ mod tests {
let entity_id = EntityId::new_v4();
// Create 11MB component (exceeds 10MB limit)
let oversized_data = vec![0u8; 11 * 1024 * 1024];
let oversized_data = bytes::Bytes::from(vec![0u8; 11 * 1024 * 1024]);
let result = buffer.add(PersistenceOp::UpsertComponent {
entity_id,
@@ -809,7 +809,7 @@ mod tests {
let entity_id = EntityId::new_v4();
// Create exactly 10MB component (at limit)
let max_data = vec![0u8; 10 * 1024 * 1024];
let max_data = bytes::Bytes::from(vec![0u8; 10 * 1024 * 1024]);
let result = buffer.add(PersistenceOp::UpsertComponent {
entity_id,
@@ -824,7 +824,7 @@ mod tests {
#[test]
fn test_oversized_operation_returns_error() {
let mut buffer = WriteBuffer::new(100);
let oversized_op = vec![0u8; 11 * 1024 * 1024];
let oversized_op = bytes::Bytes::from(vec![0u8; 11 * 1024 * 1024]);
let result = buffer.add(PersistenceOp::LogOperation {
node_id: uuid::Uuid::new_v4(),
@@ -860,7 +860,7 @@ mod tests {
for size in sizes {
let mut buffer = WriteBuffer::new(100);
let data = vec![0u8; size];
let data = bytes::Bytes::from(vec![0u8; size]);
let result = buffer.add(PersistenceOp::UpsertComponent {
entity_id: uuid::Uuid::new_v4(),