Files
marathon/crates/lib/src/persistence/database.rs

717 lines
23 KiB
Rust
Raw Normal View History

//! Database schema and operations for persistence layer
use std::path::Path;
use chrono::Utc;
use rusqlite::{
Connection,
OptionalExtension,
};
use crate::persistence::{
error::{
PersistenceError,
Result,
},
types::*,
};
/// Default SQLite page size in bytes (4 KiB).
///
/// Compared against the value reported by `PRAGMA page_size` to decide whether
/// a larger page size should be requested for the database.
const DEFAULT_PAGE_SIZE: i64 = 4096;
/// Value handed to `PRAGMA cache_size`.
///
/// SQLite reads a negative N as a memory budget of roughly `|N| * 1024` bytes
/// instead of a page count, so `-20000` asks for about 20 MB of page cache.
const CACHE_SIZE_KB: i64 = -20000; // ~20 MB (20000 KiB)
/// Current wall-clock time as whole seconds since the Unix epoch (UTC).
///
/// Single place for the `Utc::now().timestamp()` idiom so every `updated_at`
/// style column in this module is stamped the same way.
#[inline]
fn current_timestamp() -> i64 {
    let now = Utc::now();
    now.timestamp()
}
/// Open the persistence database at `path` and bring it up to date.
///
/// The fresh connection is tuned with [`configure_sqlite_for_persistence`]
/// and then passed to `crate::persistence::run_migrations` before being
/// returned to the caller.
///
/// # Errors
/// Returns an error if the database cannot be opened, if any configuration
/// PRAGMA fails, or if running the migrations fails.
pub fn initialize_persistence_db<P: AsRef<Path>>(path: P) -> Result<Connection> {
    let mut db = Connection::open(path)?;
    configure_sqlite_for_persistence(&db)?;
    // Run migrations to ensure schema is up to date
    crate::persistence::run_migrations(&mut db)?;
    Ok(db)
}
/// Configure SQLite with WAL mode and battery-friendly settings
///
/// Applies, in order:
/// 1. an 8 KiB page size (brand-new databases only, best effort),
/// 2. `journal_mode = WAL`,
/// 3. `wal_autocheckpoint = 0` (checkpoints are triggered manually),
/// 4. `synchronous = NORMAL`,
/// 5. the page-cache budget from `CACHE_SIZE_KB`,
/// 6. `temp_store = MEMORY`.
///
/// # Parameters
/// - `conn`: Connection to configure; settings apply to this connection
///
/// # Errors
/// Returns an error if any PRAGMA other than the best-effort page-size
/// request fails.
pub fn configure_sqlite_for_persistence(conn: &Connection) -> Result<()> {
    // Larger page size for better sequential write performance on mobile.
    //
    // This has to happen BEFORE switching to WAL: SQLite cannot change the
    // page size once a database is in WAL mode, so issuing the PRAGMA after
    // `journal_mode = WAL` silently does nothing. It is also only requested
    // for a database that has no pages yet, so an existing database never
    // gets a pending page-size change that a later VACUUM would apply.
    let page_size: i64 = conn.query_row("PRAGMA page_size", [], |row| row.get(0))?;
    let page_count: i64 = conn.query_row("PRAGMA page_count", [], |row| row.get(0))?;
    if page_size == DEFAULT_PAGE_SIZE && page_count == 0 {
        // Deliberately best effort: a refused page size is not fatal.
        let _ = conn.execute_batch("PRAGMA page_size = 8192;");
    }
    // Enable Write-Ahead Logging for better concurrency and fewer fsyncs
    conn.execute_batch("PRAGMA journal_mode = WAL;")?;
    // Don't auto-checkpoint on every transaction - we'll control this manually
    conn.execute_batch("PRAGMA wal_autocheckpoint = 0;")?;
    // NORMAL synchronous mode - fsync WAL on commit, but not every write
    // This is a good balance between durability and performance
    conn.execute_batch("PRAGMA synchronous = NORMAL;")?;
    // Increase cache size for better performance (negative value = KiB budget
    // rather than a number of pages)
    conn.execute_batch(&format!("PRAGMA cache_size = {};", CACHE_SIZE_KB))?;
    // Use memory for temp tables (faster, we don't need temp table durability)
    conn.execute_batch("PRAGMA temp_store = MEMORY;")?;
    Ok(())
}
/// Create every table and index used by the persistence layer.
///
/// All statements use `IF NOT EXISTS` / `INSERT OR IGNORE`, so calling this on
/// an already-initialised database leaves existing data in place.
///
/// # Parameters
/// - `conn`: Connection on which the schema is created
///
/// # Errors
/// Returns the first SQLite error encountered; statements that already ran
/// are not rolled back.
pub fn create_persistence_schema(conn: &Connection) -> Result<()> {
    // DDL in dependency order (components references entities).
    const SCHEMA_STATEMENTS: [&str; 8] = [
        // Entities table - stores entity metadata
        "CREATE TABLE IF NOT EXISTS entities (\n\
         id BLOB PRIMARY KEY,\n\
         entity_type TEXT NOT NULL,\n\
         created_at INTEGER NOT NULL,\n\
         updated_at INTEGER NOT NULL\n\
         )",
        // Components table - stores serialized component data
        "CREATE TABLE IF NOT EXISTS components (\n\
         entity_id BLOB NOT NULL,\n\
         component_type TEXT NOT NULL,\n\
         data BLOB NOT NULL,\n\
         updated_at INTEGER NOT NULL,\n\
         PRIMARY KEY (entity_id, component_type),\n\
         FOREIGN KEY (entity_id) REFERENCES entities(id) ON DELETE CASCADE\n\
         )",
        // Index for querying components by entity
        "CREATE INDEX IF NOT EXISTS idx_components_entity\n\
         ON components(entity_id)",
        // Operation log - for CRDT sync protocol
        "CREATE TABLE IF NOT EXISTS operation_log (\n\
         id INTEGER PRIMARY KEY AUTOINCREMENT,\n\
         node_id TEXT NOT NULL,\n\
         sequence_number INTEGER NOT NULL,\n\
         operation BLOB NOT NULL,\n\
         timestamp INTEGER NOT NULL,\n\
         UNIQUE(node_id, sequence_number)\n\
         )",
        // Index for efficient operation log queries
        "CREATE INDEX IF NOT EXISTS idx_oplog_node_seq\n\
         ON operation_log(node_id, sequence_number)",
        // Vector clock table - for causality tracking
        "CREATE TABLE IF NOT EXISTS vector_clock (\n\
         node_id TEXT PRIMARY KEY,\n\
         counter INTEGER NOT NULL,\n\
         updated_at INTEGER NOT NULL\n\
         )",
        // Session state table - for crash detection
        "CREATE TABLE IF NOT EXISTS session_state (\n\
         key TEXT PRIMARY KEY,\n\
         value TEXT NOT NULL,\n\
         updated_at INTEGER NOT NULL\n\
         )",
        // WAL checkpoint tracking
        "CREATE TABLE IF NOT EXISTS checkpoint_state (\n\
         last_checkpoint INTEGER NOT NULL,\n\
         wal_size_bytes INTEGER NOT NULL\n\
         )",
    ];
    for ddl in SCHEMA_STATEMENTS {
        conn.execute(ddl, [])?;
    }
    // Seed the single checkpoint_state row (rowid 1) unless it is already there
    let seeded_at = current_timestamp();
    conn.execute(
        "INSERT OR IGNORE INTO checkpoint_state (rowid, last_checkpoint, wal_size_bytes)\n\
         VALUES (1, ?, 0)",
        [seeded_at],
    )?;
    Ok(())
}
/// Flush a batch of operations to SQLite in a single transaction
///
/// Every operation in `ops` is translated into exactly one SQL statement. The
/// statements run inside one transaction, so either the whole batch becomes
/// visible or (on error) none of it does.
///
/// # Parameters
/// - `ops`: Operations to apply, in order
/// - `conn`: Connection on which the transaction is opened
///
/// # Returns
/// - `Ok(n)`: Number of operations applied (`0` for an empty batch, in which
///   case no transaction is opened)
/// - `Err`: The first statement or commit failure; the transaction is rolled
///   back when it is dropped
pub fn flush_to_sqlite(ops: &[PersistenceOp], conn: &mut Connection) -> Result<usize> {
    if ops.is_empty() {
        return Ok(0);
    }
    let tx = conn.transaction()?;
    for op in ops {
        match op {
            | PersistenceOp::UpsertEntity { id, data } => {
                // A real UPSERT instead of `INSERT OR REPLACE`: REPLACE resolves
                // the conflict by deleting the existing row first, and when
                // foreign-key enforcement is on that delete fires the
                // `ON DELETE CASCADE` on `components`, wiping every component
                // of the entity being updated.
                tx.execute(
                    "INSERT INTO entities (id, entity_type, created_at, updated_at)\n\
                     VALUES (?1, ?2, ?3, ?4)\n\
                     ON CONFLICT(id) DO UPDATE SET\n\
                     entity_type = excluded.entity_type,\n\
                     created_at = excluded.created_at,\n\
                     updated_at = excluded.updated_at",
                    rusqlite::params![
                        id.as_bytes(),
                        data.entity_type,
                        data.created_at.timestamp(),
                        data.updated_at.timestamp(),
                    ],
                )?;
            },
            | PersistenceOp::UpsertComponent {
                entity_id,
                component_type,
                data,
            } => {
                tx.execute(
                    "INSERT OR REPLACE INTO components (entity_id, component_type, data, updated_at)\n\
                     VALUES (?1, ?2, ?3, ?4)",
                    rusqlite::params![
                        entity_id.as_bytes(),
                        component_type,
                        data,
                        current_timestamp(),
                    ],
                )?;
            },
            | PersistenceOp::LogOperation {
                node_id,
                sequence,
                operation,
            } => {
                tx.execute(
                    "INSERT OR REPLACE INTO operation_log (node_id, sequence_number, operation, timestamp)\n\
                     VALUES (?1, ?2, ?3, ?4)",
                    rusqlite::params![
                        &node_id.to_string(), // Convert UUID to string for SQLite TEXT column
                        sequence,
                        operation,
                        current_timestamp(),
                    ],
                )?;
            },
            | PersistenceOp::UpdateVectorClock { node_id, counter } => {
                tx.execute(
                    "INSERT OR REPLACE INTO vector_clock (node_id, counter, updated_at)\n\
                     VALUES (?1, ?2, ?3)",
                    rusqlite::params![&node_id.to_string(), counter, current_timestamp()], // Convert UUID to string
                )?;
            },
            | PersistenceOp::DeleteEntity { id } => {
                tx.execute(
                    "DELETE FROM entities WHERE id = ?1",
                    rusqlite::params![id.as_bytes()],
                )?;
            },
            | PersistenceOp::DeleteComponent {
                entity_id,
                component_type,
            } => {
                tx.execute(
                    "DELETE FROM components WHERE entity_id = ?1 AND component_type = ?2",
                    rusqlite::params![entity_id.as_bytes(), component_type],
                )?;
            },
        }
    }
    tx.commit()?;
    // Every variant issues exactly one statement, so the number of applied
    // operations is simply the batch length.
    Ok(ops.len())
}
/// Run a WAL checkpoint in the requested mode and record when it happened.
///
/// Issues `PRAGMA wal_checkpoint(<MODE>)`, which asks SQLite to copy frames
/// from the write-ahead log back into the main database file, and then stamps
/// `checkpoint_state.last_checkpoint` (row 1) with the current time.
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection
/// - `mode`: Checkpoint mode controlling blocking behavior (see
///   [`CheckpointMode`])
///
/// # Returns
/// - `Ok(CheckpointInfo)`: The three values reported by the PRAGMA
/// - `Err`: If the checkpoint PRAGMA or the state update fails
///
/// # Examples
/// ```no_run
/// # use rusqlite::Connection;
/// # use lib::persistence::*;
/// # fn example() -> anyhow::Result<()> {
/// let mut conn = Connection::open("app.db")?;
/// let info = checkpoint_wal(&mut conn, CheckpointMode::Passive)?;
/// if info.busy {
///     // Some pages couldn't be checkpointed due to active readers
/// }
/// # Ok(())
/// # }
/// ```
pub fn checkpoint_wal(conn: &mut Connection, mode: CheckpointMode) -> Result<CheckpointInfo> {
    let pragma = match mode {
        | CheckpointMode::Passive => "PRAGMA wal_checkpoint(PASSIVE)",
        | CheckpointMode::Full => "PRAGMA wal_checkpoint(FULL)",
        | CheckpointMode::Restart => "PRAGMA wal_checkpoint(RESTART)",
        | CheckpointMode::Truncate => "PRAGMA wal_checkpoint(TRUNCATE)",
    };
    // The PRAGMA yields one row: (busy, log_pages, checkpointed_pages)
    let (busy_flag, log_pages, checkpointed_pages) = conn.query_row(pragma, [], |row| {
        let busy_flag: i32 = row.get(0)?;
        let log_pages: i32 = row.get(1)?;
        let checkpointed_pages: i32 = row.get(2)?;
        Ok((busy_flag, log_pages, checkpointed_pages))
    })?;
    // Remember when the last checkpoint ran
    let finished_at = current_timestamp();
    conn.execute(
        "UPDATE checkpoint_state SET last_checkpoint = ?1 WHERE rowid = 1",
        [finished_at],
    )?;
    Ok(CheckpointInfo {
        busy: busy_flag != 0,
        log_pages,
        checkpointed_pages,
    })
}
/// Get the size of the WAL file in bytes
///
/// This checks the actual WAL file size on disk without triggering a
/// checkpoint. The path is derived from the first row of
/// `PRAGMA database_list` with a `-wal` suffix appended.
///
/// # Parameters
/// - `conn`: Reference to the SQLite connection
///
/// # Returns
/// - `Ok(i64)`: WAL file size in bytes; `0` if the database has no backing
///   file (in-memory / temporary), or if the WAL file cannot be inspected
///   (typically because it does not exist yet)
/// - `Err`: If the database path query fails
pub fn get_wal_size(conn: &Connection) -> Result<i64> {
    // Column 2 of `PRAGMA database_list` is the file backing the database.
    // In-memory and temporary databases report an empty (or NULL) name rather
    // than returning no row, so both cases are folded into `None` here.
    let db_path = conn
        .query_row("PRAGMA database_list", [], |row| {
            row.get::<_, Option<String>>(2)
        })
        .optional()?
        .flatten()
        .filter(|path| !path.is_empty());
    // No backing file: there cannot be a WAL. Without this guard an empty
    // path would make us stat a stray file literally named "-wal" in the
    // current working directory.
    let Some(db_path) = db_path else {
        return Ok(0);
    };
    // WAL file has same name as database but with -wal suffix
    let wal_path = format!("{}-wal", db_path);
    match std::fs::metadata(&wal_path) {
        // Saturate instead of wrapping if the length ever exceeds i64::MAX
        | Ok(metadata) => Ok(i64::try_from(metadata.len()).unwrap_or(i64::MAX)),
        // Best effort: an unreadable or missing WAL is reported as empty
        | Err(_) => Ok(0),
    }
}
/// Checkpoint mode for WAL
///
/// Each variant maps to the identically named argument of SQLite's
/// `PRAGMA wal_checkpoint(...)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CheckpointMode {
    /// Passive checkpoint - doesn't block readers/writers
    Passive,
    /// Full checkpoint - waits for writers to finish
    Full,
    /// Restart checkpoint - like Full, but restarts WAL file
    Restart,
    /// Truncate checkpoint - like Restart, but truncates WAL file to 0 bytes
    Truncate,
}
/// Information about a checkpoint operation
///
/// Mirrors the single result row returned by `PRAGMA wal_checkpoint`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointInfo {
    /// `true` when the first result column was non-zero, i.e. SQLite reported
    /// that the checkpoint was blocked from completing.
    pub busy: bool,
    /// Second result column: number of pages in the write-ahead log
    /// (SQLite reports `-1` when the database is not in WAL mode).
    pub log_pages: i32,
    /// Third result column: number of WAL pages moved back into the database
    /// file (SQLite reports `-1` when the database is not in WAL mode).
    pub checkpointed_pages: i32,
}
/// Set a session state value in the database
///
/// Session state is used to track application lifecycle events and detect
/// crashes. Values persist across restarts, enabling crash detection and
/// recovery.
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection
/// - `key`: State key (e.g., "clean_shutdown", "session_id")
/// - `value`: State value to store
///
/// # Returns
/// - `Ok(())`: State was successfully saved
/// - `Err`: If the database write fails
pub fn set_session_state(conn: &mut Connection, key: &str, value: &str) -> Result<()> {
conn.execute(
"INSERT OR REPLACE INTO session_state (key, value, updated_at)
VALUES (?1, ?2, ?3)",
rusqlite::params![key, value, current_timestamp()],
)?;
Ok(())
}
/// Look up the value stored under `key` in the `session_state` table.
///
/// # Parameters
/// - `conn`: Reference to the SQLite connection
/// - `key`: State key to retrieve
///
/// # Returns
/// - `Ok(Some(value))`: State exists and was retrieved
/// - `Ok(None)`: State key doesn't exist
/// - `Err`: If the database query fails (wrapped in
///   `PersistenceError::Database`)
pub fn get_session_state(conn: &Connection, key: &str) -> Result<Option<String>> {
    let lookup = conn.query_row(
        "SELECT value FROM session_state WHERE key = ?1",
        rusqlite::params![key],
        |row| row.get::<_, String>(0),
    );
    // "No rows" is an expected outcome here, not an error
    lookup.optional().map_err(PersistenceError::Database)
}
/// Report whether the previous session ended with a clean shutdown.
///
/// Reads the `clean_shutdown` session-state entry and treats anything other
/// than the exact string `"true"` (including a missing entry) as "not clean".
///
/// **Side effect**: the entry is then overwritten with `"false"` for the
/// current session. Call [`mark_clean_shutdown`] during normal shutdown to
/// set it back to `"true"`.
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection (mutates session state)
///
/// # Returns
/// - `Ok(true)`: Previous session shut down cleanly
/// - `Ok(false)`: Previous session crashed or this is first run
/// - `Err`: If database operations fail
pub fn check_clean_shutdown(conn: &mut Connection) -> Result<bool> {
    let previous = get_session_state(conn, "clean_shutdown")?;
    let was_clean = matches!(previous.as_deref(), Some("true"));
    // Arm the flag for this session: it stays "false" unless we exit cleanly
    set_session_state(conn, "clean_shutdown", "false")?;
    Ok(was_clean)
}
/// Record that the current session is shutting down cleanly.
///
/// Writes `"true"` to the `clean_shutdown` session-state entry so the next
/// call to [`check_clean_shutdown`] reports that no crash occurred.
///
/// # Parameters
/// - `conn`: Mutable reference to the SQLite connection
///
/// # Returns
/// - `Ok(())`: Clean shutdown flag was set
/// - `Err`: If the database write fails
pub fn mark_clean_shutdown(conn: &mut Connection) -> Result<()> {
    set_session_state(conn, "clean_shutdown", "true")?;
    Ok(())
}
//
// ============================================================================
// Session Management Operations
// ============================================================================
//
/// Insert or overwrite the `sessions` row for `session`.
///
/// The row is keyed by the raw bytes of the session's UUID; the human-readable
/// code produced by `to_code()` is stored alongside it together with the
/// session's name, timestamps, entity count, state (as text) and secret.
///
/// # Errors
/// Returns an error if the database write fails.
pub fn save_session(conn: &mut Connection, session: &crate::networking::Session) -> Result<()> {
    let session_uuid = session.id.as_uuid();
    let code = session.id.to_code();
    let entity_count = session.entity_count as i64;
    let state_text = session.state.to_string();
    conn.execute(
        "INSERT OR REPLACE INTO sessions (id, code, name, created_at, last_active, entity_count, state, secret)\n\
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
        rusqlite::params![
            session_uuid.as_bytes(),
            code,
            session.name,
            session.created_at,
            session.last_active,
            entity_count,
            state_text,
            session.secret,
        ],
    )?;
    Ok(())
}
/// Load session by ID
pub fn load_session(
conn: &Connection,
session_id: crate::networking::SessionId,
) -> Result<Option<crate::networking::Session>> {
conn.query_row(
"SELECT code, name, created_at, last_active, entity_count, state, secret
FROM sessions WHERE id = ?1",
[session_id.as_uuid().as_bytes()],
|row| {
let code: String = row.get(0)?;
let state_str: String = row.get(5)?;
let state = crate::networking::SessionState::from_str(&state_str)
.unwrap_or(crate::networking::SessionState::Created);
// Reconstruct SessionId from the stored code
let id = crate::networking::SessionId::from_code(&code)
.map_err(|_| rusqlite::Error::InvalidQuery)?;
Ok(crate::networking::Session {
id,
name: row.get(1)?,
created_at: row.get(2)?,
last_active: row.get(3)?,
entity_count: row.get::<_, i64>(4)? as usize,
state,
secret: row.get(6)?,
})
},
)
.optional()
.map_err(PersistenceError::from)
}
/// Get the most recently active session
pub fn get_last_active_session(conn: &Connection) -> Result<Option<crate::networking::Session>> {
conn.query_row(
"SELECT code, name, created_at, last_active, entity_count, state, secret
FROM sessions ORDER BY last_active DESC LIMIT 1",
[],
|row| {
let code: String = row.get(0)?;
let state_str: String = row.get(5)?;
let state = crate::networking::SessionState::from_str(&state_str)
.unwrap_or(crate::networking::SessionState::Created);
// Reconstruct SessionId from the stored code
let id = crate::networking::SessionId::from_code(&code)
.map_err(|_| rusqlite::Error::InvalidQuery)?;
Ok(crate::networking::Session {
id,
name: row.get(1)?,
created_at: row.get(2)?,
last_active: row.get(3)?,
entity_count: row.get::<_, i64>(4)? as usize,
state,
secret: row.get(6)?,
})
},
)
.optional()
.map_err(PersistenceError::from)
}
/// Replace the stored vector clock of a session with `clock`.
///
/// Inside one transaction, all existing `vector_clock` rows for the session
/// are deleted and one row per entry of `clock.clocks` is inserted, so readers
/// never observe a half-written clock.
///
/// # Errors
/// Returns an error if opening the transaction, any statement, or the commit
/// fails; in that case nothing is changed.
pub fn save_session_vector_clock(
    conn: &mut Connection,
    session_id: crate::networking::SessionId,
    clock: &crate::networking::VectorClock,
) -> Result<()> {
    let txn = conn.transaction()?;
    // Start from a clean slate for this session
    txn.execute(
        "DELETE FROM vector_clock WHERE session_id = ?1",
        [session_id.as_uuid().as_bytes()],
    )?;
    // Write the current counters, one row per node
    for (node, count) in clock.clocks.iter() {
        let node_text = node.to_string();
        txn.execute(
            "INSERT INTO vector_clock (session_id, node_id, counter, updated_at)\n\
             VALUES (?1, ?2, ?3, ?4)",
            rusqlite::params![
                session_id.as_uuid().as_bytes(),
                node_text,
                *count as i64,
                current_timestamp(),
            ],
        )?;
    }
    txn.commit()?;
    Ok(())
}
/// Read the stored vector clock of a session.
///
/// Every `vector_clock` row for the session contributes one entry to the
/// returned clock. Rows whose `node_id` text is not a valid UUID are skipped
/// silently; a session without rows yields an empty clock.
///
/// # Errors
/// Returns an error if the query cannot be prepared or a row cannot be read.
pub fn load_session_vector_clock(
    conn: &Connection,
    session_id: crate::networking::SessionId,
) -> Result<crate::networking::VectorClock> {
    let mut stmt =
        conn.prepare("SELECT node_id, counter FROM vector_clock WHERE session_id = ?1")?;
    let mut clock = crate::networking::VectorClock::new();
    let mut rows = stmt.query([session_id.as_uuid().as_bytes()])?;
    while let Some(row) = rows.next()? {
        let node_text: String = row.get(0)?;
        let stored: i64 = row.get(1)?;
        // Ignore rows whose node id is not a well-formed UUID
        if let Ok(node) = uuid::Uuid::parse_str(&node_text) {
            clock.clocks.insert(node, stored as u64);
        }
    }
    Ok(clock)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The schema can be created on a configured in-memory database, contains
    /// every table this module relies on, and is safe to create twice.
    #[test]
    fn test_database_initialization() -> Result<()> {
        let conn = Connection::open_in_memory()?;
        configure_sqlite_for_persistence(&conn)?;
        create_persistence_schema(&conn)?;
        // Verify tables exist
        let tables: Vec<String> = conn
            .prepare("SELECT name FROM sqlite_master WHERE type='table'")?
            .query_map([], |row| row.get(0))?
            .collect::<std::result::Result<Vec<_>, _>>()?;
        assert!(tables.contains(&"entities".to_string()));
        assert!(tables.contains(&"components".to_string()));
        assert!(tables.contains(&"operation_log".to_string()));
        assert!(tables.contains(&"vector_clock".to_string()));
        assert!(tables.contains(&"session_state".to_string()));
        assert!(tables.contains(&"checkpoint_state".to_string()));
        // Creating the schema again must not duplicate the checkpoint row
        create_persistence_schema(&conn)?;
        let checkpoint_rows: i64 =
            conn.query_row("SELECT COUNT(*) FROM checkpoint_state", [], |row| row.get(0))?;
        assert_eq!(checkpoint_rows, 1);
        Ok(())
    }

    /// A batch is applied as a whole and reports how many operations it held.
    #[test]
    fn test_flush_operations() -> Result<()> {
        let mut conn = Connection::open_in_memory()?;
        create_persistence_schema(&conn)?;
        // An empty batch is a no-op
        assert_eq!(flush_to_sqlite(&[], &mut conn)?, 0);
        let entity_id = uuid::Uuid::new_v4();
        let ops = vec![
            PersistenceOp::UpsertEntity {
                id: entity_id,
                data: EntityData {
                    id: entity_id,
                    created_at: Utc::now(),
                    updated_at: Utc::now(),
                    entity_type: "TestEntity".to_string(),
                },
            },
            PersistenceOp::UpsertComponent {
                entity_id,
                component_type: "Transform".to_string(),
                data: vec![1, 2, 3, 4],
            },
        ];
        let count = flush_to_sqlite(&ops, &mut conn)?;
        assert_eq!(count, 2);
        // Verify entity exists
        let exists: bool = conn.query_row(
            "SELECT COUNT(*) > 0 FROM entities WHERE id = ?1",
            rusqlite::params![entity_id.as_bytes()],
            |row| row.get(0),
        )?;
        assert!(exists);
        // Verify the component payload was stored unchanged
        let stored: Vec<u8> = conn.query_row(
            "SELECT data FROM components WHERE entity_id = ?1 AND component_type = ?2",
            rusqlite::params![entity_id.as_bytes(), "Transform"],
            |row| row.get(0),
        )?;
        assert_eq!(stored, vec![1, 2, 3, 4]);
        Ok(())
    }

    /// Session state round-trips, overwrites in place, and reports missing keys.
    #[test]
    fn test_session_state() -> Result<()> {
        let mut conn = Connection::open_in_memory()?;
        create_persistence_schema(&conn)?;
        assert_eq!(get_session_state(&conn, "test_key")?, None);
        set_session_state(&mut conn, "test_key", "test_value")?;
        let value = get_session_state(&conn, "test_key")?;
        assert_eq!(value, Some("test_value".to_string()));
        set_session_state(&mut conn, "test_key", "other_value")?;
        let value = get_session_state(&conn, "test_key")?;
        assert_eq!(value, Some("other_value".to_string()));
        Ok(())
    }

    /// The clean-shutdown flag is consumed by each check and must be re-armed
    /// by an explicit `mark_clean_shutdown`.
    #[test]
    fn test_crash_recovery() -> Result<()> {
        let mut conn = Connection::open_in_memory()?;
        create_persistence_schema(&conn)?;
        // Simulate first startup - should report as crash (no clean shutdown marker)
        let clean = check_clean_shutdown(&mut conn)?;
        assert!(!clean, "First startup should be detected as crash");
        // Mark clean shutdown
        mark_clean_shutdown(&mut conn)?;
        // Next startup should report clean shutdown
        let clean = check_clean_shutdown(&mut conn)?;
        assert!(clean, "Should detect clean shutdown");
        // After checking clean shutdown, flag should be reset to false
        let value = get_session_state(&conn, "clean_shutdown")?;
        assert_eq!(
            value,
            Some("false".to_string()),
            "Flag should be reset after check"
        );
        // So if we check again without marking, it should report as crash
        let clean = check_clean_shutdown(&mut conn)?;
        assert!(!clean, "Startup without a prior clean shutdown is a crash");
        Ok(())
    }
}