Signed-off-by: Sienna Meridian Satterwhite <sienna@r3t.io>
This commit is contained in:
2025-11-16 11:50:49 +00:00
parent 1bd664fd2a
commit 888e5d303c
33 changed files with 766 additions and 460 deletions

View File

@@ -1,6 +1,14 @@
use crate::error::Result;
use crate::models::*;
use rusqlite::{Connection, OpenFlags, Row, params};
use rusqlite::{
Connection,
OpenFlags,
Row,
params,
};
use crate::{
error::Result,
models::*,
};
pub struct ChatDb {
conn: Connection,
@@ -27,7 +35,10 @@ impl ChatDb {
start_date: Option<chrono::DateTime<chrono::Utc>>,
end_date: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<Vec<Message>> {
use chrono::{TimeZone, Utc};
use chrono::{
TimeZone,
Utc,
};
// Default date range: January 1, 2024 to now
let start =
@@ -84,7 +95,7 @@ impl ChatDb {
WHERE h.id = ?
GROUP BY c.ROWID
ORDER BY msg_count DESC
LIMIT 1"
LIMIT 1",
)?;
let chat = stmt.query_row(params![phone_number], |row| {
@@ -98,7 +109,9 @@ impl ChatDb {
room_name: row.get(6)?,
is_archived: row.get::<_, i64>(7)? != 0,
is_filtered: row.get::<_, i64>(8)? != 0,
last_read_message_timestamp: row.get::<_, Option<i64>>(9)?.map(apple_timestamp_to_datetime),
last_read_message_timestamp: row
.get::<_, Option<i64>>(9)?
.map(apple_timestamp_to_datetime),
})
})?;

View File

@@ -1,6 +1,7 @@
//! Data access layer for iMessage chat.db
//!
//! This library provides a read-only interface to query messages from a specific conversation.
//! This library provides a read-only interface to query messages from a
//! specific conversation.
//!
//! # Safety
//!
@@ -20,12 +21,18 @@
//! # Ok::<(), lib::ChatDbError>(())
//! ```
mod db;
mod error;
mod models;
mod db;
pub mod sync;
pub mod persistence;
pub mod sync;
pub use error::{ChatDbError, Result};
pub use models::{Message, Chat};
pub use db::ChatDb;
pub use error::{
ChatDbError,
Result,
};
pub use models::{
Chat,
Message,
};

View File

@@ -1,5 +1,11 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use chrono::{
DateTime,
Utc,
};
use serde::{
Deserialize,
Serialize,
};
/// Represents a message in the iMessage database
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -41,7 +47,8 @@ pub struct Chat {
pub last_read_message_timestamp: Option<DateTime<Utc>>,
}
/// Helper function to convert Apple's Cocoa timestamp (seconds since 2001-01-01) to DateTime
/// Helper function to convert Apple's Cocoa timestamp (seconds since
/// 2001-01-01) to DateTime
pub fn apple_timestamp_to_datetime(timestamp: i64) -> DateTime<Utc> {
// Apple's Cocoa timestamps are in nanoseconds since 2001-01-01 00:00:00 UTC
// Convert to Unix timestamp (seconds since 1970-01-01 00:00:00 UTC)
@@ -50,7 +57,8 @@ pub fn apple_timestamp_to_datetime(timestamp: i64) -> DateTime<Utc> {
let seconds = timestamp / 1_000_000_000 + APPLE_EPOCH_OFFSET;
let nanos = (timestamp % 1_000_000_000) as u32;
DateTime::from_timestamp(seconds, nanos).unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap())
DateTime::from_timestamp(seconds, nanos)
.unwrap_or_else(|| DateTime::from_timestamp(0, 0).unwrap())
}
/// Helper function to convert DateTime to Apple's Cocoa timestamp
@@ -65,8 +73,13 @@ pub fn datetime_to_apple_timestamp(dt: DateTime<Utc>) -> i64 {
#[cfg(test)]
mod tests {
use chrono::{
Datelike,
TimeZone,
Timelike,
};
use super::*;
use chrono::{Datelike, TimeZone, Timelike};
#[test]
fn test_apple_timestamp_to_datetime_zero() {

View File

@@ -1,9 +1,14 @@
//! Configuration for the persistence layer
use crate::persistence::error::Result;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use serde::{
Deserialize,
Serialize,
};
use crate::persistence::error::Result;
/// Default critical flush delay in milliseconds
const DEFAULT_CRITICAL_FLUSH_DELAY_MS: u64 = 1000;

View File

@@ -1,11 +1,21 @@
//! Database schema and operations for persistence layer
use crate::persistence::types::*;
use crate::persistence::error::{PersistenceError, Result};
use chrono::Utc;
use rusqlite::{Connection, OptionalExtension};
use std::path::Path;
use chrono::Utc;
use rusqlite::{
Connection,
OptionalExtension,
};
use crate::persistence::{
error::{
PersistenceError,
Result,
},
types::*,
};
/// Default SQLite page size in bytes (4KB)
const DEFAULT_PAGE_SIZE: i64 = 4096;
@@ -164,7 +174,7 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
for op in ops {
match op {
PersistenceOp::UpsertEntity { id, data } => {
| PersistenceOp::UpsertEntity { id, data } => {
tx.execute(
"INSERT OR REPLACE INTO entities (id, entity_type, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4)",
@@ -176,9 +186,9 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
],
)?;
count += 1;
}
},
PersistenceOp::UpsertComponent {
| PersistenceOp::UpsertComponent {
entity_id,
component_type,
data,
@@ -194,9 +204,9 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
],
)?;
count += 1;
}
},
PersistenceOp::LogOperation {
| PersistenceOp::LogOperation {
node_id,
sequence,
operation,
@@ -212,23 +222,26 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
],
)?;
count += 1;
}
},
PersistenceOp::UpdateVectorClock { node_id, counter } => {
| PersistenceOp::UpdateVectorClock { node_id, counter } => {
tx.execute(
"INSERT OR REPLACE INTO vector_clock (node_id, counter, updated_at)
VALUES (?1, ?2, ?3)",
rusqlite::params![node_id, counter, current_timestamp()],
)?;
count += 1;
}
},
PersistenceOp::DeleteEntity { id } => {
tx.execute("DELETE FROM entities WHERE id = ?1", rusqlite::params![id.as_bytes()])?;
| PersistenceOp::DeleteEntity { id } => {
tx.execute(
"DELETE FROM entities WHERE id = ?1",
rusqlite::params![id.as_bytes()],
)?;
count += 1;
}
},
PersistenceOp::DeleteComponent {
| PersistenceOp::DeleteComponent {
entity_id,
component_type,
} => {
@@ -237,7 +250,7 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
rusqlite::params![entity_id.as_bytes(), component_type],
)?;
count += 1;
}
},
}
}
@@ -255,7 +268,8 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection
/// - `mode`: Checkpoint mode controlling blocking behavior (see [`CheckpointMode`])
/// - `mode`: Checkpoint mode controlling blocking behavior (see
/// [`CheckpointMode`])
///
/// # Returns
/// - `Ok(CheckpointInfo)`: Information about the checkpoint operation
@@ -276,17 +290,19 @@ pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<u
/// ```
pub fn checkpoint_wal(conn: &mut Connection, mode: CheckpointMode) -> Result<CheckpointInfo> {
let mode_str = match mode {
CheckpointMode::Passive => "PASSIVE",
CheckpointMode::Full => "FULL",
CheckpointMode::Restart => "RESTART",
CheckpointMode::Truncate => "TRUNCATE",
| CheckpointMode::Passive => "PASSIVE",
| CheckpointMode::Full => "FULL",
| CheckpointMode::Restart => "RESTART",
| CheckpointMode::Truncate => "TRUNCATE",
};
let query = format!("PRAGMA wal_checkpoint({})", mode_str);
// Returns (busy, log_pages, checkpointed_pages)
let (busy, log_pages, checkpointed_pages): (i32, i32, i32) =
conn.query_row(&query, [], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
conn.query_row(&query, [], |row| {
Ok((row.get(0)?, row.get(1)?, row.get(2)?))
})?;
// Update checkpoint state
conn.execute(
@@ -303,15 +319,16 @@ pub fn checkpoint_wal(conn: &mut Connection, mode: CheckpointMode) -> Result<Che
/// Get the size of the WAL file in bytes
///
/// This checks the actual WAL file size on disk without triggering a checkpoint.
/// Large WAL files consume disk space and can slow down recovery, so monitoring
/// size helps maintain optimal performance.
/// This checks the actual WAL file size on disk without triggering a
/// checkpoint. Large WAL files consume disk space and can slow down recovery,
/// so monitoring size helps maintain optimal performance.
///
/// # Parameters
/// - `conn`: Reference to the SQLite connection
///
/// # Returns
/// - `Ok(i64)`: WAL file size in bytes (0 if no WAL exists or in-memory database)
/// - `Ok(i64)`: WAL file size in bytes (0 if no WAL exists or in-memory
/// database)
/// - `Err`: If the database path query fails
///
/// # Note
@@ -332,8 +349,8 @@ pub fn get_wal_size(conn: &Connection) -> Result<i64> {
// Check if WAL file exists and get its size
match std::fs::metadata(&wal_path) {
Ok(metadata) => Ok(metadata.len() as i64),
Err(_) => Ok(0), // WAL doesn't exist yet
| Ok(metadata) => Ok(metadata.len() as i64),
| Err(_) => Ok(0), // WAL doesn't exist yet
}
}
@@ -360,8 +377,9 @@ pub struct CheckpointInfo {
/// Set a session state value in the database
///
/// Session state is used to track application lifecycle events and detect crashes.
/// Values persist across restarts, enabling crash detection and recovery.
/// Session state is used to track application lifecycle events and detect
/// crashes. Values persist across restarts, enabling crash detection and
/// recovery.
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection
@@ -404,12 +422,13 @@ pub fn get_session_state(conn: &Connection, key: &str) -> Result<Option<String>>
/// Check if the previous session had a clean shutdown
///
/// This is critical for crash detection. When the application starts, this checks
/// if the previous session ended cleanly. If not, it indicates a crash occurred,
/// and recovery procedures may be needed.
/// This is critical for crash detection. When the application starts, this
/// checks if the previous session ended cleanly. If not, it indicates a crash
/// occurred, and recovery procedures may be needed.
///
/// **Side effect**: Resets the clean_shutdown flag to "false" for the current session.
/// Call [`mark_clean_shutdown`] during normal shutdown to set it back to "true".
/// **Side effect**: Resets the clean_shutdown flag to "false" for the current
/// session. Call [`mark_clean_shutdown`] during normal shutdown to set it back
/// to "true".
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection (mutates session state)
@@ -537,7 +556,11 @@ mod tests {
// After checking clean shutdown, flag should be reset to false
// So if we check again without marking, it should report as crash
let value = get_session_state(&conn, "clean_shutdown")?;
assert_eq!(value, Some("false".to_string()), "Flag should be reset after check");
assert_eq!(
value,
Some("false".to_string()),
"Flag should be reset after check"
);
Ok(())
}

View File

@@ -42,16 +42,16 @@ pub enum PersistenceError {
impl fmt::Display for PersistenceError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Database(err) => write!(f, "Database error: {}", err),
Self::Serialization(err) => write!(f, "Serialization error: {}", err),
Self::Deserialization(msg) => write!(f, "Deserialization error: {}", msg),
Self::Config(msg) => write!(f, "Configuration error: {}", msg),
Self::Io(err) => write!(f, "I/O error: {}", err),
Self::TypeNotRegistered(type_name) => {
| Self::Database(err) => write!(f, "Database error: {}", err),
| Self::Serialization(err) => write!(f, "Serialization error: {}", err),
| Self::Deserialization(msg) => write!(f, "Deserialization error: {}", msg),
| Self::Config(msg) => write!(f, "Configuration error: {}", msg),
| Self::Io(err) => write!(f, "I/O error: {}", err),
| Self::TypeNotRegistered(type_name) => {
write!(f, "Type not registered in type registry: {}", type_name)
}
Self::NotFound(msg) => write!(f, "Not found: {}", msg),
Self::CircuitBreakerOpen {
},
| Self::NotFound(msg) => write!(f, "Not found: {}", msg),
| Self::CircuitBreakerOpen {
consecutive_failures,
retry_after_secs,
} => write!(
@@ -59,7 +59,7 @@ impl fmt::Display for PersistenceError {
"Circuit breaker open after {} consecutive failures, retry after {} seconds",
consecutive_failures, retry_after_secs
),
Self::Other(msg) => write!(f, "{}", msg),
| Self::Other(msg) => write!(f, "{}", msg),
}
}
}
@@ -67,10 +67,10 @@ impl fmt::Display for PersistenceError {
impl std::error::Error for PersistenceError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Database(err) => Some(err),
Self::Serialization(err) => Some(err),
Self::Io(err) => Some(err),
_ => None,
| Self::Database(err) => Some(err),
| Self::Serialization(err) => Some(err),
| Self::Io(err) => Some(err),
| _ => None,
}
}
}

View File

@@ -1,7 +1,11 @@
//! Health monitoring and error recovery for persistence layer
use std::time::{
Duration,
Instant,
};
use bevy::prelude::*;
use std::time::{Duration, Instant};
/// Base delay for exponential backoff in milliseconds
const BASE_RETRY_DELAY_MS: u64 = 1000; // 1 second
@@ -52,11 +56,10 @@ impl Default for PersistenceHealth {
}
impl PersistenceHealth {
/// Circuit breaker threshold - open after this many consecutive failures
pub const CIRCUIT_BREAKER_THRESHOLD: u32 = 5;
/// How long to keep circuit breaker open before attempting recovery
pub const CIRCUIT_BREAKER_COOLDOWN: Duration = Duration::from_secs(60);
/// Circuit breaker threshold - open after this many consecutive failures
pub const CIRCUIT_BREAKER_THRESHOLD: u32 = 5;
/// Record a successful flush
pub fn record_flush_success(&mut self) {
@@ -102,9 +105,9 @@ impl PersistenceHealth {
/// Check if we should attempt operations (circuit breaker state)
///
/// **CRITICAL FIX**: Now takes `&mut self` to properly reset the circuit breaker
/// after cooldown expires. This prevents the circuit breaker from remaining
/// permanently open after one post-cooldown failure.
/// **CRITICAL FIX**: Now takes `&mut self` to properly reset the circuit
/// breaker after cooldown expires. This prevents the circuit breaker
/// from remaining permanently open after one post-cooldown failure.
pub fn should_attempt_operation(&mut self) -> bool {
if !self.circuit_breaker_open {
return true;
@@ -114,7 +117,9 @@ impl PersistenceHealth {
if let Some(opened_at) = self.circuit_breaker_opened_at {
if opened_at.elapsed() >= Self::CIRCUIT_BREAKER_COOLDOWN {
// Transition to half-open state by resetting the breaker
info!("Circuit breaker cooldown elapsed - entering half-open state (testing recovery)");
info!(
"Circuit breaker cooldown elapsed - entering half-open state (testing recovery)"
);
self.circuit_breaker_open = false;
self.circuit_breaker_opened_at = None;
// consecutive_flush_failures is kept to track if this probe succeeds
@@ -128,7 +133,8 @@ impl PersistenceHealth {
/// Get exponential backoff delay based on consecutive failures
pub fn get_retry_delay(&self) -> Duration {
// Exponential backoff: 1s, 2s, 4s, 8s, 16s, max 30s
let delay_ms = BASE_RETRY_DELAY_MS * 2u64.pow(self.consecutive_flush_failures.min(MAX_BACKOFF_EXPONENT));
let delay_ms = BASE_RETRY_DELAY_MS *
2u64.pow(self.consecutive_flush_failures.min(MAX_BACKOFF_EXPONENT));
Duration::from_millis(delay_ms.min(MAX_RETRY_DELAY_MS))
}
}

View File

@@ -18,9 +18,10 @@
//! }
//! ```
use crate::persistence::*;
use bevy::prelude::*;
use crate::persistence::*;
/// Application lifecycle events that require persistence handling
///
/// These events are critical moments where data must be flushed immediately
@@ -39,9 +40,11 @@ pub enum AppLifecycleEvent {
/// 5 seconds to complete critical tasks before suspension.
DidEnterBackground,
/// Application will enter foreground (iOS: `applicationWillEnterForeground`)
/// Application will enter foreground (iOS:
/// `applicationWillEnterForeground`)
///
/// Sent when the app is about to enter the foreground (user returning to app).
/// Sent when the app is about to enter the foreground (user returning to
/// app).
WillEnterForeground,
/// Application did become active (iOS: `applicationDidBecomeActive`)
@@ -51,7 +54,8 @@ pub enum AppLifecycleEvent {
/// Application will terminate (iOS: `applicationWillTerminate`)
///
/// Sent when the app is about to terminate. Similar to shutdown but from OS.
/// Sent when the app is about to terminate. Similar to shutdown but from
/// OS.
WillTerminate,
}
@@ -69,7 +73,7 @@ pub fn lifecycle_event_system(
) {
for event in events.read() {
match event {
AppLifecycleEvent::WillResignActive => {
| AppLifecycleEvent::WillResignActive => {
// App is becoming inactive - perform immediate flush
info!("App will resign active - performing immediate flush");
@@ -79,9 +83,9 @@ pub fn lifecycle_event_system(
} else {
health.record_flush_success();
}
}
},
AppLifecycleEvent::DidEnterBackground => {
| AppLifecycleEvent::DidEnterBackground => {
// App entered background - perform immediate flush and checkpoint
info!("App entered background - performing immediate flush and checkpoint");
@@ -96,47 +100,50 @@ pub fn lifecycle_event_system(
// Also checkpoint the WAL to ensure durability
let start = std::time::Instant::now();
match db.lock() {
Ok(mut conn) => {
match checkpoint_wal(&mut conn, CheckpointMode::Passive) {
Ok(_) => {
let duration = start.elapsed();
metrics.record_checkpoint(duration);
health.record_checkpoint_success();
info!("Background checkpoint completed successfully");
}
Err(e) => {
error!("Failed to checkpoint on background: {}", e);
health.record_checkpoint_failure();
}
}
}
Err(e) => {
| Ok(mut conn) => match checkpoint_wal(&mut conn, CheckpointMode::Passive) {
| Ok(_) => {
let duration = start.elapsed();
metrics.record_checkpoint(duration);
health.record_checkpoint_success();
info!("Background checkpoint completed successfully");
},
| Err(e) => {
error!("Failed to checkpoint on background: {}", e);
health.record_checkpoint_failure();
},
},
| Err(e) => {
error!("Failed to acquire database lock for checkpoint: {}", e);
health.record_checkpoint_failure();
}
},
}
}
},
AppLifecycleEvent::WillTerminate => {
| AppLifecycleEvent::WillTerminate => {
// App will terminate - perform shutdown sequence
warn!("App will terminate - performing shutdown sequence");
if let Err(e) = shutdown_system(&mut write_buffer, &db, &mut metrics, Some(&mut pending_tasks)) {
if let Err(e) = shutdown_system(
&mut write_buffer,
&db,
&mut metrics,
Some(&mut pending_tasks),
) {
error!("Failed to perform shutdown on terminate: {}", e);
} else {
info!("Clean shutdown completed on terminate");
}
}
},
AppLifecycleEvent::WillEnterForeground => {
| AppLifecycleEvent::WillEnterForeground => {
// App returning from background - no immediate action needed
info!("App will enter foreground");
}
},
AppLifecycleEvent::DidBecomeActive => {
| AppLifecycleEvent::DidBecomeActive => {
// App became active - no immediate action needed
info!("App did become active");
}
},
}
}
}
@@ -149,10 +156,10 @@ mod tests {
fn test_lifecycle_event_creation() {
let event = AppLifecycleEvent::WillResignActive;
match event {
AppLifecycleEvent::WillResignActive => {
| AppLifecycleEvent::WillResignActive => {
// Success
}
_ => panic!("Event type mismatch"),
},
| _ => panic!("Event type mismatch"),
}
}
}

View File

@@ -142,19 +142,19 @@ pub enum HealthWarning {
impl std::fmt::Display for HealthWarning {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HealthWarning::SlowFlush(duration) => {
| HealthWarning::SlowFlush(duration) => {
write!(f, "Flush duration ({:?}) exceeds 50ms threshold", duration)
},
| HealthWarning::LargeWal(size) => {
write!(f, "WAL size ({} bytes) exceeds 5MB threshold", size)
},
| HealthWarning::HighCrashRate(rate) => {
write!(
f,
"Flush duration ({:?}) exceeds 50ms threshold",
duration
"Crash recovery rate ({:.1}%) exceeds 10% threshold",
rate * 100.0
)
}
HealthWarning::LargeWal(size) => {
write!(f, "WAL size ({} bytes) exceeds 5MB threshold", size)
}
HealthWarning::HighCrashRate(rate) => {
write!(f, "Crash recovery rate ({:.1}%) exceeds 10% threshold", rate * 100.0)
}
},
}
}
}

View File

@@ -1,17 +1,19 @@
//! Persistence layer for battery-efficient state management
//!
//! This module implements the persistence strategy defined in RFC 0002.
//! It provides a three-tier system to minimize disk I/O while maintaining data durability:
//! It provides a three-tier system to minimize disk I/O while maintaining data
//! durability:
//!
//! 1. **In-Memory Dirty Tracking** - Track changes without writing immediately
//! 2. **Write Buffer** - Batch and coalesce operations before writing
//! 3. **SQLite with WAL Mode** - Controlled checkpoints to minimize fsync() calls
//! 3. **SQLite with WAL Mode** - Controlled checkpoints to minimize fsync()
//! calls
//!
//! # Example
//!
//! ```no_run
//! use lib::persistence::*;
//! use bevy::prelude::*;
//! use lib::persistence::*;
//!
//! fn setup(mut commands: Commands) {
//! // Spawn an entity with the Persisted marker
@@ -28,24 +30,24 @@
//! }
//! ```
mod types;
mod database;
mod systems;
mod config;
mod database;
mod error;
mod health;
mod lifecycle;
mod metrics;
mod plugin;
mod reflection;
mod health;
mod error;
mod lifecycle;
mod systems;
mod types;
pub use types::*;
pub use database::*;
pub use systems::*;
pub use config::*;
pub use database::*;
pub use error::*;
pub use health::*;
pub use lifecycle::*;
pub use metrics::*;
pub use plugin::*;
pub use reflection::*;
pub use health::*;
pub use error::*;
pub use lifecycle::*;
pub use systems::*;
pub use types::*;

View File

@@ -3,10 +3,17 @@
//! This module provides a Bevy plugin that sets up all the necessary resources
//! and systems for the persistence layer.
use crate::persistence::*;
use std::{
ops::{
Deref,
DerefMut,
},
path::PathBuf,
};
use bevy::prelude::*;
use std::path::PathBuf;
use std::ops::{Deref, DerefMut};
use crate::persistence::*;
/// Bevy plugin for persistence
///
@@ -143,10 +150,7 @@ impl std::ops::DerefMut for WriteBufferResource {
}
/// Startup system to initialize persistence
fn persistence_startup_system(
db: Res<PersistenceDb>,
mut metrics: ResMut<PersistenceMetrics>,
) {
fn persistence_startup_system(db: Res<PersistenceDb>, mut metrics: ResMut<PersistenceMetrics>) {
if let Err(e) = startup_system(db.deref(), metrics.deref_mut()) {
error!("Failed to initialize persistence: {}", e);
} else {
@@ -192,10 +196,12 @@ fn collect_dirty_entities_bevy_system(
/// System to automatically track changes to common Bevy components
///
/// This system detects changes to Transform, automatically triggering persistence
/// by accessing `Persisted` mutably (which marks it as changed via Bevy's change detection).
/// This system detects changes to Transform, automatically triggering
/// persistence by accessing `Persisted` mutably (which marks it as changed via
/// Bevy's change detection).
///
/// Add this system to your app if you want automatic persistence of Transform changes:
/// Add this system to your app if you want automatic persistence of Transform
/// changes:
///
/// ```no_run
/// # use bevy::prelude::*;
@@ -214,7 +220,6 @@ pub fn auto_track_transform_changes_system(
}
}
/// System to checkpoint the WAL
fn checkpoint_bevy_system(
db: Res<PersistenceDb>,
@@ -223,18 +228,22 @@ fn checkpoint_bevy_system(
mut metrics: ResMut<PersistenceMetrics>,
mut health: ResMut<PersistenceHealth>,
) {
match checkpoint_system(db.deref(), config.deref(), timer.deref_mut(), metrics.deref_mut()) {
Ok(_) => {
match checkpoint_system(
db.deref(),
config.deref(),
timer.deref_mut(),
metrics.deref_mut(),
) {
| Ok(_) => {
health.record_checkpoint_success();
}
Err(e) => {
},
| Err(e) => {
health.record_checkpoint_failure();
error!(
"Failed to checkpoint WAL (attempt {}): {}",
health.consecutive_checkpoint_failures,
e
health.consecutive_checkpoint_failures, e
);
}
},
}
}

View File

@@ -4,21 +4,33 @@
//! using reflection, allowing the persistence layer to work with any component
//! that implements Reflect.
use bevy::prelude::*;
use bevy::reflect::serde::{ReflectSerializer, ReflectDeserializer};
use bevy::reflect::TypeRegistry;
use crate::persistence::error::{PersistenceError, Result};
use bevy::{
prelude::*,
reflect::{
TypeRegistry,
serde::{
ReflectDeserializer,
ReflectSerializer,
},
},
};
use crate::persistence::error::{
PersistenceError,
Result,
};
/// Marker component to indicate that an entity should be persisted
///
/// Add this component to any entity that should have its state persisted to disk.
/// The persistence system will automatically serialize all components on entities
/// with this marker when they change.
/// Add this component to any entity that should have its state persisted to
/// disk. The persistence system will automatically serialize all components on
/// entities with this marker when they change.
///
/// # Triggering Persistence
///
/// To trigger persistence after modifying components on an entity, access `Persisted`
/// mutably through a query. Bevy's change detection will automatically mark it as changed:
/// To trigger persistence after modifying components on an entity, access
/// `Persisted` mutably through a query. Bevy's change detection will
/// automatically mark it as changed:
///
/// ```no_run
/// # use bevy::prelude::*;
@@ -31,8 +43,8 @@ use crate::persistence::error::{PersistenceError, Result};
/// }
/// ```
///
/// Alternatively, use `auto_track_transform_changes_system` for automatic persistence
/// of Transform changes without manual queries.
/// Alternatively, use `auto_track_transform_changes_system` for automatic
/// persistence of Transform changes without manual queries.
#[derive(Component, Reflect, Default)]
#[reflect(Component)]
pub struct Persisted {
@@ -103,7 +115,8 @@ pub fn serialize_component(
///
/// # Returns
/// - `Ok(Box<dyn PartialReflect>)`: Deserialized component (needs downcasting)
/// - `Err`: If deserialization fails (e.g., type not registered, data corruption)
/// - `Err`: If deserialization fails (e.g., type not registered, data
/// corruption)
///
/// # Examples
/// ```no_run
@@ -137,7 +150,8 @@ pub fn deserialize_component(
///
/// # Parameters
/// - `entity`: Bevy entity to read the component from
/// - `component_type`: Type path string (e.g., "bevy_transform::components::Transform")
/// - `component_type`: Type path string (e.g.,
/// "bevy_transform::components::Transform")
/// - `world`: Bevy world containing the entity
/// - `type_registry`: Bevy's type registry for reflection metadata
///
@@ -155,7 +169,7 @@ pub fn deserialize_component(
/// entity,
/// "bevy_transform::components::Transform",
/// world,
/// &registry
/// &registry,
/// )?;
/// # Some(())
/// # }
@@ -192,7 +206,8 @@ pub fn serialize_component_from_entity(
/// - `type_registry`: Bevy's type registry for reflection metadata
///
/// # Returns
/// Vector of tuples containing (component_type_path, serialized_data) for each component
/// Vector of tuples containing (component_type_path, serialized_data) for each
/// component
pub fn serialize_all_components_from_entity(
entity: Entity,
world: &World,

View File

@@ -1,16 +1,31 @@
//! Bevy systems for the persistence layer
//!
//! This module provides systems that integrate the persistence layer with Bevy's ECS.
//! These systems handle dirty tracking, write buffering, and flushing to SQLite.
//! This module provides systems that integrate the persistence layer with
//! Bevy's ECS. These systems handle dirty tracking, write buffering, and
//! flushing to SQLite.
use crate::persistence::*;
use crate::persistence::error::Result;
use bevy::prelude::*;
use bevy::tasks::{IoTaskPool, Task};
use std::{
sync::{
Arc,
Mutex,
},
time::Instant,
};
use bevy::{
prelude::*,
tasks::{
IoTaskPool,
Task,
},
};
use futures_lite::future;
use rusqlite::Connection;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use crate::persistence::{
error::Result,
*,
};
/// Resource wrapping the SQLite connection
#[derive(Clone, bevy::prelude::Resource)]
@@ -48,8 +63,9 @@ impl PersistenceDb {
/// - `Ok(MutexGuard<Connection>)`: Locked connection ready for use
/// - `Err(PersistenceError)`: If mutex is poisoned
pub fn lock(&self) -> Result<std::sync::MutexGuard<'_, Connection>> {
self.conn.lock()
.map_err(|e| PersistenceError::Other(format!("Database connection mutex poisoned: {}", e)))
self.conn.lock().map_err(|e| {
PersistenceError::Other(format!("Database connection mutex poisoned: {}", e))
})
}
}
@@ -85,9 +101,9 @@ pub struct FlushResult {
fn calculate_bytes_written(ops: &[PersistenceOp]) -> u64 {
ops.iter()
.map(|op| match op {
PersistenceOp::UpsertComponent { data, .. } => data.len() as u64,
PersistenceOp::LogOperation { operation, .. } => operation.len() as u64,
_ => 0,
| PersistenceOp::UpsertComponent { data, .. } => data.len() as u64,
| PersistenceOp::LogOperation { operation, .. } => operation.len() as u64,
| _ => 0,
})
.sum()
}
@@ -120,10 +136,7 @@ fn perform_flush_sync(
/// Helper function to perform a flush asynchronously (for normal operations)
///
/// This runs on the I/O task pool to avoid blocking the main thread
fn perform_flush_async(
ops: Vec<PersistenceOp>,
db: PersistenceDb,
) -> Result<FlushResult> {
fn perform_flush_async(ops: Vec<PersistenceOp>, db: PersistenceDb) -> Result<FlushResult> {
if ops.is_empty() {
return Ok(FlushResult {
operations_count: 0,
@@ -151,10 +164,12 @@ fn perform_flush_async(
/// System to flush the write buffer to SQLite asynchronously
///
/// This system runs on a schedule based on the configuration and battery status.
/// It spawns async tasks to avoid blocking the main thread and handles errors gracefully.
/// This system runs on a schedule based on the configuration and battery
/// status. It spawns async tasks to avoid blocking the main thread and handles
/// errors gracefully.
///
/// The system also polls pending flush tasks and updates metrics when they complete.
/// The system also polls pending flush tasks and updates metrics when they
/// complete.
pub fn flush_system(
mut write_buffer: ResMut<WriteBufferResource>,
db: Res<PersistenceDb>,
@@ -170,7 +185,7 @@ pub fn flush_system(
pending_tasks.tasks.retain_mut(|task| {
if let Some(result) = future::block_on(future::poll_once(task)) {
match result {
Ok(flush_result) => {
| Ok(flush_result) => {
let previous_failures = health.consecutive_flush_failures;
health.record_flush_success();
@@ -183,12 +198,10 @@ pub fn flush_system(
// Emit recovery event if we recovered from failures
if previous_failures > 0 {
recovery_events.write(PersistenceRecoveryEvent {
previous_failures,
});
recovery_events.write(PersistenceRecoveryEvent { previous_failures });
}
}
Err(e) => {
},
| Err(e) => {
health.record_flush_failure();
let error_msg = format!("{}", e);
@@ -205,7 +218,7 @@ pub fn flush_system(
consecutive_failures: health.consecutive_flush_failures,
circuit_breaker_open: health.circuit_breaker_open,
});
}
},
}
false // Remove completed task
} else {
@@ -235,9 +248,7 @@ pub fn flush_system(
let task_pool = IoTaskPool::get();
let db_clone = db.clone();
let task = task_pool.spawn(async move {
perform_flush_async(ops, db_clone.clone())
});
let task = task_pool.spawn(async move { perform_flush_async(ops, db_clone.clone()) });
pending_tasks.tasks.push(task);
@@ -247,7 +258,8 @@ pub fn flush_system(
/// System to checkpoint the WAL file
///
/// This runs less frequently than flush_system to merge the WAL into the main database.
/// This runs less frequently than flush_system to merge the WAL into the main
/// database.
pub fn checkpoint_system(
db: &PersistenceDb,
config: &PersistenceConfig,
@@ -308,24 +320,30 @@ pub fn shutdown_system(
// CRITICAL: Wait for all pending async flushes to complete
// This prevents data loss from in-flight operations
if let Some(pending) = pending_tasks {
info!("Waiting for {} pending flush tasks to complete before shutdown", pending.tasks.len());
info!(
"Waiting for {} pending flush tasks to complete before shutdown",
pending.tasks.len()
);
for task in pending.tasks.drain(..) {
// Block on each pending task to ensure completion
match future::block_on(task) {
Ok(flush_result) => {
| Ok(flush_result) => {
// Update metrics for completed flush
metrics.record_flush(
flush_result.operations_count,
flush_result.duration,
flush_result.bytes_written,
);
debug!("Pending flush completed: {} operations", flush_result.operations_count);
}
Err(e) => {
debug!(
"Pending flush completed: {} operations",
flush_result.operations_count
);
},
| Err(e) => {
error!("Pending flush failed during shutdown: {}", e);
// Continue with shutdown even if a task failed
}
},
}
}

View File

@@ -1,13 +1,26 @@
//! Core types for the persistence layer
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::time::Instant;
use std::{
collections::{
HashMap,
HashSet,
},
time::Instant,
};
use bevy::prelude::Resource;
use chrono::{
DateTime,
Utc,
};
use serde::{
Deserialize,
Serialize,
};
/// Maximum size for a single component in bytes (10MB)
/// Components larger than this may indicate serialization issues or unbounded data growth
/// Components larger than this may indicate serialization issues or unbounded
/// data growth
const MAX_COMPONENT_SIZE_BYTES: usize = 10 * 1024 * 1024;
/// Critical flush deadline in milliseconds (1 second for tier-1 operations)
@@ -23,7 +36,8 @@ pub type NodeId = String;
///
/// Determines how quickly an operation should be flushed to disk:
/// - **Normal**: Regular batched flushing (5-60s intervals based on battery)
/// - **Critical**: Flush within 1 second (tier-1 operations like user actions, CRDT ops)
/// - **Critical**: Flush within 1 second (tier-1 operations like user actions,
/// CRDT ops)
/// - **Immediate**: Flush immediately (shutdown, background suspension)
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum FlushPriority {
@@ -85,10 +99,7 @@ impl DirtyEntities {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PersistenceOp {
/// Insert or update an entity's existence
UpsertEntity {
id: EntityId,
data: EntityData,
},
UpsertEntity { id: EntityId, data: EntityData },
/// Insert or update a component on an entity
UpsertComponent {
@@ -105,15 +116,10 @@ pub enum PersistenceOp {
},
/// Update vector clock for causality tracking
UpdateVectorClock {
node_id: NodeId,
counter: u64,
},
UpdateVectorClock { node_id: NodeId, counter: u64 },
/// Delete an entity
DeleteEntity {
id: EntityId,
},
DeleteEntity { id: EntityId },
/// Delete a component from an entity
DeleteComponent {
@@ -125,17 +131,18 @@ pub enum PersistenceOp {
impl PersistenceOp {
/// Get the default priority for this operation type
///
/// CRDT operations (LogOperation, UpdateVectorClock) are critical tier-1 operations
/// that should be flushed within 1 second to maintain causality across nodes.
/// Other operations use normal priority by default.
/// CRDT operations (LogOperation, UpdateVectorClock) are critical tier-1
/// operations that should be flushed within 1 second to maintain
/// causality across nodes. Other operations use normal priority by
/// default.
pub fn default_priority(&self) -> FlushPriority {
match self {
// CRDT operations are tier-1 (critical)
PersistenceOp::LogOperation { .. } | PersistenceOp::UpdateVectorClock { .. } => {
| PersistenceOp::LogOperation { .. } | PersistenceOp::UpdateVectorClock { .. } => {
FlushPriority::Critical
}
},
// All other operations are normal priority by default
_ => FlushPriority::Normal,
| _ => FlushPriority::Normal,
}
}
}
@@ -181,7 +188,8 @@ impl WriteBuffer {
/// Add an operation to the write buffer with normal priority
///
/// This is a convenience method that calls `add_with_priority` with `FlushPriority::Normal`.
/// This is a convenience method that calls `add_with_priority` with
/// `FlushPriority::Normal`.
///
/// # Panics
/// Panics if component data exceeds MAX_COMPONENT_SIZE_BYTES (10MB)
@@ -191,8 +199,9 @@ impl WriteBuffer {
/// Add an operation using its default priority
///
/// Uses `PersistenceOp::default_priority()` to determine priority automatically.
/// CRDT operations will be added as Critical, others as Normal.
/// Uses `PersistenceOp::default_priority()` to determine priority
/// automatically. CRDT operations will be added as Critical, others as
/// Normal.
///
/// # Panics
/// Panics if component data exceeds MAX_COMPONENT_SIZE_BYTES (10MB)
@@ -212,7 +221,11 @@ impl WriteBuffer {
pub fn add_with_priority(&mut self, op: PersistenceOp, priority: FlushPriority) {
// Validate component size to prevent unbounded memory growth
match &op {
PersistenceOp::UpsertComponent { data, component_type, .. } => {
| PersistenceOp::UpsertComponent {
data,
component_type,
..
} => {
if data.len() > MAX_COMPONENT_SIZE_BYTES {
panic!(
"Component {} size ({} bytes) exceeds maximum ({} bytes). \
@@ -222,8 +235,8 @@ impl WriteBuffer {
MAX_COMPONENT_SIZE_BYTES
);
}
}
PersistenceOp::LogOperation { operation, .. } => {
},
| PersistenceOp::LogOperation { operation, .. } => {
if operation.len() > MAX_COMPONENT_SIZE_BYTES {
panic!(
"Operation size ({} bytes) exceeds maximum ({} bytes)",
@@ -231,12 +244,16 @@ impl WriteBuffer {
MAX_COMPONENT_SIZE_BYTES
);
}
}
_ => {}
},
| _ => {},
}
match &op {
PersistenceOp::UpsertComponent { entity_id, component_type, .. } => {
| PersistenceOp::UpsertComponent {
entity_id,
component_type,
..
} => {
// Remove any existing pending write for this entity+component
self.pending_operations.retain(|existing_op| {
!matches!(existing_op,
@@ -247,8 +264,8 @@ impl WriteBuffer {
} if e_id == entity_id && c_type == component_type
)
});
}
PersistenceOp::UpsertEntity { id, .. } => {
},
| PersistenceOp::UpsertEntity { id, .. } => {
// Remove any existing pending write for this entity
self.pending_operations.retain(|existing_op| {
!matches!(existing_op,
@@ -256,10 +273,10 @@ impl WriteBuffer {
if e_id == id
)
});
}
_ => {
},
| _ => {
// Other operations don't need coalescing
}
},
}
// Track priority for flush urgency
@@ -308,8 +325,8 @@ impl WriteBuffer {
}
// Normal flushing conditions
self.pending_operations.len() >= self.max_operations
|| self.last_flush.elapsed() >= flush_interval
self.pending_operations.len() >= self.max_operations ||
self.last_flush.elapsed() >= flush_interval
}
/// Get the number of pending operations
@@ -370,7 +387,8 @@ impl BatteryStatus {
/// Check if the device is in a battery-critical state
///
/// Returns true if battery is low (<20%) and not charging, or low power mode is enabled.
/// Returns true if battery is low (<20%) and not charging, or low power
/// mode is enabled.
pub fn is_battery_critical(&self) -> bool {
(self.level < 0.2 && !self.is_charging) || self.is_low_power_mode
}
@@ -521,8 +539,9 @@ mod tests {
assert!(!buffer.should_flush(std::time::Duration::from_secs(100)));
// Simulate deadline passing by manually setting the time
buffer.first_critical_time =
Some(Instant::now() - std::time::Duration::from_millis(CRITICAL_FLUSH_DEADLINE_MS + 100));
buffer.first_critical_time = Some(
Instant::now() - std::time::Duration::from_millis(CRITICAL_FLUSH_DEADLINE_MS + 100),
);
// Now should flush due to deadline
assert!(buffer.should_flush(std::time::Duration::from_secs(100)));

View File

@@ -1,24 +1,37 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::ops::{Deref, DerefMut};
// Re-export the macros
pub use sync_macros::{synced, Synced};
use std::ops::{
Deref,
DerefMut,
};
use chrono::{
DateTime,
Utc,
};
// Re-export common CRDT types from the crdts library
pub use crdts::{
CmRDT,
CvRDT,
ctx::ReadCtx,
lwwreg::LWWReg,
map::Map,
orswot::Orswot,
CmRDT, CvRDT,
};
use serde::{
Deserialize,
Serialize,
};
// Re-export the macros
pub use sync_macros::{
Synced,
synced,
};
pub type NodeId = String;
/// Transparent wrapper for synced values
///
/// This wraps any value with LWW semantics but allows you to use it like a normal value
/// This wraps any value with LWW semantics but allows you to use it like a
/// normal value
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncedValue<T: Clone> {
value: T,
@@ -55,8 +68,8 @@ impl<T: Clone> SyncedValue<T> {
pub fn merge(&mut self, other: &Self) {
// Only clone if we're actually going to use the values (when other is newer)
if other.timestamp > self.timestamp
|| (other.timestamp == self.timestamp && other.node_id > self.node_id)
if other.timestamp > self.timestamp ||
(other.timestamp == self.timestamp && other.node_id > self.node_id)
{
self.value = other.value.clone();
self.timestamp = other.timestamp;
@@ -95,7 +108,10 @@ pub struct SyncMessage<T> {
impl<T: Serialize> SyncMessage<T> {
pub fn new(node_id: NodeId, operation: T) -> Self {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{
AtomicU64,
Ordering,
};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let seq = COUNTER.fetch_add(1, Ordering::SeqCst);
@@ -134,7 +150,6 @@ pub trait Syncable: Sized {
}
}
#[cfg(test)]
mod tests {
use super::*;