phase 2 server core: - CodeSession: create/resume sessions, Matrix room per project, Mistral conversation lifecycle, tool dispatch loop - agent loop: user input → Mistral → tool calls → route (client via gRPC / server via ToolRegistry) → collect results → respond - Matrix bridge: all messages posted to project room, accessible from any Matrix client - code_sessions SQLite table (Postgres-compatible schema) - coding mode context injection (project path, git info, prompt.md)
746 lines
26 KiB
Rust
746 lines
26 KiB
Rust
use rusqlite::{params, Connection, Result as SqlResult};
|
|
use std::path::Path;
|
|
use std::sync::Mutex;
|
|
use tracing::{info, warn};
|
|
|
|
/// SQLite-backed persistent state for Sol.
///
/// Stores:
/// - Conversation registry: room_id → Mistral conversation_id + token estimates
/// - Agent registry: agent_name → Mistral agent_id (+ model and instructions hash)
/// - Service users: (Matrix localpart, service) → service-specific username
/// - Code sessions: one row per coding session (user, room, project, model, status)
/// - Research sessions: query, plan, incrementally appended findings, status
///
/// ## Kubernetes mount
///
/// The database file should be on a persistent volume. Recommended setup:
///
/// ```yaml
/// # PersistentVolumeClaim (Longhorn)
/// apiVersion: v1
/// kind: PersistentVolumeClaim
/// metadata:
///   name: sol-data
///   namespace: matrix
/// spec:
///   accessModes: [ReadWriteOnce]
///   storageClassName: longhorn
///   resources:
///     requests:
///       storage: 1Gi
///
/// # Deployment volume mount
/// volumes:
///   - name: sol-data
///     persistentVolumeClaim:
///       claimName: sol-data
/// containers:
///   - name: sol
///     volumeMounts:
///       - name: sol-data
///         mountPath: /data
/// ```
///
/// Default path: `/data/sol.db` (configurable via `matrix.db_path` in sol.toml).
/// The `/data` mount also holds the Matrix SDK state store at `/data/matrix-state`.
pub struct Store {
    // The single SQLite connection; every method locks this mutex before
    // running its statement.
    conn: Mutex<Connection>,
}
|
|
|
|
impl Store {
|
|
/// Open or create the database at the given path.
|
|
/// Creates tables if they don't exist.
|
|
pub fn open(path: &str) -> anyhow::Result<Self> {
|
|
// Ensure parent directory exists
|
|
if let Some(parent) = Path::new(path).parent() {
|
|
std::fs::create_dir_all(parent)?;
|
|
}
|
|
|
|
let conn = Connection::open(path)?;
|
|
|
|
// Enable WAL mode for better concurrent read performance
|
|
conn.execute_batch("PRAGMA journal_mode=WAL;")?;
|
|
|
|
conn.execute_batch(
|
|
"CREATE TABLE IF NOT EXISTS conversations (
|
|
room_id TEXT PRIMARY KEY,
|
|
conversation_id TEXT NOT NULL,
|
|
estimated_tokens INTEGER NOT NULL DEFAULT 0,
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS agents (
|
|
name TEXT PRIMARY KEY,
|
|
agent_id TEXT NOT NULL,
|
|
model TEXT NOT NULL DEFAULT '',
|
|
instructions_hash TEXT NOT NULL DEFAULT '',
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS service_users (
|
|
localpart TEXT NOT NULL,
|
|
service TEXT NOT NULL,
|
|
service_username TEXT NOT NULL,
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
|
PRIMARY KEY (localpart, service)
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS code_sessions (
|
|
session_id TEXT PRIMARY KEY,
|
|
user_id TEXT NOT NULL,
|
|
room_id TEXT NOT NULL,
|
|
conversation_id TEXT,
|
|
project_path TEXT NOT NULL,
|
|
project_name TEXT NOT NULL,
|
|
model TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'active',
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
|
last_active TEXT NOT NULL DEFAULT (datetime('now'))
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS research_sessions (
|
|
session_id TEXT PRIMARY KEY,
|
|
room_id TEXT NOT NULL,
|
|
event_id TEXT NOT NULL,
|
|
status TEXT NOT NULL DEFAULT 'running',
|
|
query TEXT NOT NULL,
|
|
plan_json TEXT,
|
|
findings_json TEXT,
|
|
depth INTEGER NOT NULL DEFAULT 0,
|
|
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
|
completed_at TEXT
|
|
);
|
|
",
|
|
)?;
|
|
|
|
// Migration: add instructions_hash column to agents table if missing
|
|
let has_hash: bool = conn
|
|
.prepare("SELECT instructions_hash FROM agents LIMIT 0")
|
|
.is_ok();
|
|
if !has_hash {
|
|
conn.execute_batch(
|
|
"ALTER TABLE agents ADD COLUMN instructions_hash TEXT NOT NULL DEFAULT '';"
|
|
)?;
|
|
info!("Migrated agents table: added instructions_hash column");
|
|
}
|
|
|
|
info!(path, "Opened Sol state database");
|
|
Ok(Self {
|
|
conn: Mutex::new(conn),
|
|
})
|
|
}
|
|
|
|
/// Open an in-memory database (for tests).
|
|
pub fn open_memory() -> anyhow::Result<Self> {
|
|
Self::open(":memory:")
|
|
}
|
|
|
|
// =========================================================================
|
|
// Conversations
|
|
// =========================================================================
|
|
|
|
/// Get the conversation_id for a room, if one exists.
|
|
pub fn get_conversation(&self, room_id: &str) -> Option<(String, u32)> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.query_row(
|
|
"SELECT conversation_id, estimated_tokens FROM conversations WHERE room_id = ?1",
|
|
params![room_id],
|
|
|row| Ok((row.get::<_, String>(0)?, row.get::<_, u32>(1)?)),
|
|
)
|
|
.ok()
|
|
}
|
|
|
|
/// Store or update a conversation mapping.
|
|
pub fn upsert_conversation(
|
|
&self,
|
|
room_id: &str,
|
|
conversation_id: &str,
|
|
estimated_tokens: u32,
|
|
) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"INSERT INTO conversations (room_id, conversation_id, estimated_tokens)
|
|
VALUES (?1, ?2, ?3)
|
|
ON CONFLICT(room_id) DO UPDATE SET
|
|
conversation_id = excluded.conversation_id,
|
|
estimated_tokens = excluded.estimated_tokens",
|
|
params![room_id, conversation_id, estimated_tokens],
|
|
) {
|
|
warn!("Failed to upsert conversation: {e}");
|
|
}
|
|
}
|
|
|
|
/// Update token estimate for a conversation.
|
|
pub fn update_tokens(&self, room_id: &str, estimated_tokens: u32) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"UPDATE conversations SET estimated_tokens = ?1 WHERE room_id = ?2",
|
|
params![estimated_tokens, room_id],
|
|
) {
|
|
warn!("Failed to update token estimate: {e}");
|
|
}
|
|
}
|
|
|
|
/// Remove all conversation mappings (e.g., after agent recreation).
|
|
pub fn delete_all_conversations(&self) -> usize {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.execute("DELETE FROM conversations", []).unwrap_or(0)
|
|
}
|
|
|
|
/// Remove a conversation mapping (e.g., after compaction).
|
|
pub fn delete_conversation(&self, room_id: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"DELETE FROM conversations WHERE room_id = ?1",
|
|
params![room_id],
|
|
) {
|
|
warn!("Failed to delete conversation: {e}");
|
|
}
|
|
}
|
|
|
|
/// Load all conversation mappings (for startup recovery).
|
|
pub fn load_all_conversations(&self) -> Vec<(String, String, u32)> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let mut stmt = match conn.prepare(
|
|
"SELECT room_id, conversation_id, estimated_tokens FROM conversations",
|
|
) {
|
|
Ok(s) => s,
|
|
Err(_) => return Vec::new(),
|
|
};
|
|
|
|
stmt.query_map([], |row| {
|
|
Ok((
|
|
row.get::<_, String>(0)?,
|
|
row.get::<_, String>(1)?,
|
|
row.get::<_, u32>(2)?,
|
|
))
|
|
})
|
|
.ok()
|
|
.map(|rows| rows.filter_map(|r| r.ok()).collect())
|
|
.unwrap_or_default()
|
|
}
|
|
|
|
// =========================================================================
|
|
// Agents
|
|
// =========================================================================
|
|
|
|
/// Get the agent_id and instructions hash for a named agent.
|
|
pub fn get_agent(&self, name: &str) -> Option<(String, String)> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.query_row(
|
|
"SELECT agent_id, instructions_hash FROM agents WHERE name = ?1",
|
|
params![name],
|
|
|row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
|
|
)
|
|
.ok()
|
|
}
|
|
|
|
/// Store or update an agent mapping (with instructions hash for staleness detection).
|
|
pub fn upsert_agent(&self, name: &str, agent_id: &str, model: &str, instructions_hash: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"INSERT INTO agents (name, agent_id, model, instructions_hash)
|
|
VALUES (?1, ?2, ?3, ?4)
|
|
ON CONFLICT(name) DO UPDATE SET
|
|
agent_id = excluded.agent_id,
|
|
model = excluded.model,
|
|
instructions_hash = excluded.instructions_hash",
|
|
params![name, agent_id, model, instructions_hash],
|
|
) {
|
|
warn!("Failed to upsert agent: {e}");
|
|
}
|
|
}
|
|
|
|
/// Remove an agent mapping.
|
|
pub fn delete_agent(&self, name: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute("DELETE FROM agents WHERE name = ?1", params![name]) {
|
|
warn!("Failed to delete agent: {e}");
|
|
}
|
|
}
|
|
|
|
// =========================================================================
|
|
// Code Sessions (sunbeam code)
|
|
// =========================================================================
|
|
|
|
/// Find an active code session for a user + project.
|
|
pub fn find_code_session(
|
|
&self,
|
|
user_id: &str,
|
|
project_name: &str,
|
|
) -> Option<(String, String, String)> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.query_row(
|
|
"SELECT session_id, room_id, conversation_id FROM code_sessions
|
|
WHERE user_id = ?1 AND project_name = ?2 AND status = 'active'
|
|
ORDER BY last_active DESC LIMIT 1",
|
|
params![user_id, project_name],
|
|
|row| Ok((
|
|
row.get::<_, String>(0)?,
|
|
row.get::<_, String>(1)?,
|
|
row.get::<_, String>(2)?,
|
|
)),
|
|
)
|
|
.ok()
|
|
}
|
|
|
|
/// Create a new code session.
|
|
pub fn create_code_session(
|
|
&self,
|
|
session_id: &str,
|
|
user_id: &str,
|
|
room_id: &str,
|
|
project_path: &str,
|
|
project_name: &str,
|
|
model: &str,
|
|
) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"INSERT INTO code_sessions (session_id, user_id, room_id, project_path, project_name, model)
|
|
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
|
|
params![session_id, user_id, room_id, project_path, project_name, model],
|
|
) {
|
|
warn!("Failed to create code session: {e}");
|
|
}
|
|
}
|
|
|
|
/// Update the conversation_id for a code session.
|
|
pub fn set_code_session_conversation(
|
|
&self,
|
|
session_id: &str,
|
|
conversation_id: &str,
|
|
) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"UPDATE code_sessions SET conversation_id = ?1, last_active = datetime('now')
|
|
WHERE session_id = ?2",
|
|
params![conversation_id, session_id],
|
|
) {
|
|
warn!("Failed to update code session conversation: {e}");
|
|
}
|
|
}
|
|
|
|
/// Touch the last_active timestamp.
|
|
pub fn touch_code_session(&self, session_id: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"UPDATE code_sessions SET last_active = datetime('now') WHERE session_id = ?1",
|
|
params![session_id],
|
|
) {
|
|
warn!("Failed to touch code session: {e}");
|
|
}
|
|
}
|
|
|
|
/// End a code session.
|
|
pub fn end_code_session(&self, session_id: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"UPDATE code_sessions SET status = 'ended' WHERE session_id = ?1",
|
|
params![session_id],
|
|
) {
|
|
warn!("Failed to end code session: {e}");
|
|
}
|
|
}
|
|
|
|
/// Check if a room is a code session room.
|
|
pub fn is_code_room(&self, room_id: &str) -> bool {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.query_row(
|
|
"SELECT 1 FROM code_sessions WHERE room_id = ?1 AND status = 'active' LIMIT 1",
|
|
params![room_id],
|
|
|_| Ok(()),
|
|
)
|
|
.is_ok()
|
|
}
|
|
|
|
/// Get project context for a code room.
|
|
pub fn get_code_room_context(
|
|
&self,
|
|
room_id: &str,
|
|
) -> Option<(String, String, String)> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.query_row(
|
|
"SELECT project_name, project_path, model FROM code_sessions
|
|
WHERE room_id = ?1 AND status = 'active'
|
|
ORDER BY last_active DESC LIMIT 1",
|
|
params![room_id],
|
|
|row| Ok((
|
|
row.get::<_, String>(0)?,
|
|
row.get::<_, String>(1)?,
|
|
row.get::<_, String>(2)?,
|
|
)),
|
|
)
|
|
.ok()
|
|
}
|
|
|
|
// =========================================================================
|
|
// Service Users (OIDC → service username mapping)
|
|
// =========================================================================
|
|
|
|
/// Get the service-specific username for a Matrix localpart.
|
|
pub fn get_service_user(&self, localpart: &str, service: &str) -> Option<String> {
|
|
let conn = self.conn.lock().unwrap();
|
|
conn.query_row(
|
|
"SELECT service_username FROM service_users WHERE localpart = ?1 AND service = ?2",
|
|
params![localpart, service],
|
|
|row| row.get(0),
|
|
)
|
|
.ok()
|
|
}
|
|
|
|
/// Store or update a service user mapping.
|
|
pub fn upsert_service_user(&self, localpart: &str, service: &str, service_username: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"INSERT INTO service_users (localpart, service, service_username)
|
|
VALUES (?1, ?2, ?3)
|
|
ON CONFLICT(localpart, service) DO UPDATE SET
|
|
service_username = excluded.service_username",
|
|
params![localpart, service, service_username],
|
|
) {
|
|
warn!("Failed to upsert service user: {e}");
|
|
}
|
|
}
|
|
|
|
/// Remove a service user mapping.
|
|
pub fn delete_service_user(&self, localpart: &str, service: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"DELETE FROM service_users WHERE localpart = ?1 AND service = ?2",
|
|
params![localpart, service],
|
|
) {
|
|
warn!("Failed to delete service user: {e}");
|
|
}
|
|
}
|
|
|
|
// =========================================================================
|
|
// Research Sessions
|
|
// =========================================================================
|
|
|
|
/// Create a new research session.
|
|
pub fn create_research_session(
|
|
&self,
|
|
session_id: &str,
|
|
room_id: &str,
|
|
event_id: &str,
|
|
query: &str,
|
|
plan_json: &str,
|
|
) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"INSERT INTO research_sessions (session_id, room_id, event_id, query, plan_json, findings_json)
|
|
VALUES (?1, ?2, ?3, ?4, ?5, '[]')",
|
|
params![session_id, room_id, event_id, query, plan_json],
|
|
) {
|
|
warn!("Failed to create research session: {e}");
|
|
}
|
|
}
|
|
|
|
/// Append a finding to a research session.
|
|
pub fn append_research_finding(&self, session_id: &str, finding_json: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
// Append to the JSON array
|
|
if let Err(e) = conn.execute(
|
|
"UPDATE research_sessions
|
|
SET findings_json = json_insert(findings_json, '$[#]', json(?1))
|
|
WHERE session_id = ?2",
|
|
params![finding_json, session_id],
|
|
) {
|
|
warn!("Failed to append research finding: {e}");
|
|
}
|
|
}
|
|
|
|
/// Mark a research session as complete.
|
|
pub fn complete_research_session(&self, session_id: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"UPDATE research_sessions SET status = 'complete', completed_at = datetime('now')
|
|
WHERE session_id = ?1",
|
|
params![session_id],
|
|
) {
|
|
warn!("Failed to complete research session: {e}");
|
|
}
|
|
}
|
|
|
|
/// Mark a research session as failed.
|
|
pub fn fail_research_session(&self, session_id: &str) {
|
|
let conn = self.conn.lock().unwrap();
|
|
if let Err(e) = conn.execute(
|
|
"UPDATE research_sessions SET status = 'failed', completed_at = datetime('now')
|
|
WHERE session_id = ?1",
|
|
params![session_id],
|
|
) {
|
|
warn!("Failed to mark research session failed: {e}");
|
|
}
|
|
}
|
|
|
|
/// Load all running research sessions (for crash recovery on startup).
|
|
pub fn load_running_research_sessions(&self) -> Vec<(String, String, String, String)> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let mut stmt = match conn.prepare(
|
|
"SELECT session_id, room_id, query, findings_json
|
|
FROM research_sessions WHERE status = 'running'",
|
|
) {
|
|
Ok(s) => s,
|
|
Err(_) => return Vec::new(),
|
|
};
|
|
|
|
stmt.query_map([], |row| {
|
|
Ok((
|
|
row.get::<_, String>(0)?,
|
|
row.get::<_, String>(1)?,
|
|
row.get::<_, String>(2)?,
|
|
row.get::<_, String>(3)?,
|
|
))
|
|
})
|
|
.ok()
|
|
.map(|rows| rows.filter_map(|r| r.ok()).collect())
|
|
.unwrap_or_default()
|
|
}
|
|
|
|
/// Load all agent mappings (for startup recovery).
|
|
pub fn load_all_agents(&self) -> Vec<(String, String)> {
|
|
let conn = self.conn.lock().unwrap();
|
|
let mut stmt = match conn.prepare("SELECT name, agent_id FROM agents") {
|
|
Ok(s) => s,
|
|
Err(_) => return Vec::new(),
|
|
};
|
|
|
|
stmt.query_map([], |row| {
|
|
Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
|
|
})
|
|
.ok()
|
|
.map(|rows| rows.filter_map(|r| r.ok()).collect())
|
|
.unwrap_or_default()
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    //! Unit tests for `Store`, each running against a fresh in-memory database.

    use super::*;

    #[test]
    fn test_open_memory_db() {
        let store = Store::open_memory().unwrap();
        assert!(store.load_all_conversations().is_empty());
        assert!(store.load_all_agents().is_empty());
    }

    #[test]
    fn test_conversation_crud() {
        let store = Store::open_memory().unwrap();

        // Insert
        store.upsert_conversation("!room:x", "conv_abc", 100);
        let (conv_id, tokens) = store.get_conversation("!room:x").unwrap();
        assert_eq!(conv_id, "conv_abc");
        assert_eq!(tokens, 100);

        // Update tokens
        store.update_tokens("!room:x", 500);
        let (_, tokens) = store.get_conversation("!room:x").unwrap();
        assert_eq!(tokens, 500);

        // Upsert (replace conversation_id)
        store.upsert_conversation("!room:x", "conv_def", 0);
        let (conv_id, tokens) = store.get_conversation("!room:x").unwrap();
        assert_eq!(conv_id, "conv_def");
        assert_eq!(tokens, 0);

        // Delete
        store.delete_conversation("!room:x");
        assert!(store.get_conversation("!room:x").is_none());
    }

    #[test]
    fn test_agent_crud() {
        let store = Store::open_memory().unwrap();

        store.upsert_agent("sol-orchestrator", "ag_123", "mistral-medium-latest", "hash1");
        let (id, hash) = store.get_agent("sol-orchestrator").unwrap();
        assert_eq!(id, "ag_123");
        assert_eq!(hash, "hash1");

        // Update
        store.upsert_agent("sol-orchestrator", "ag_456", "mistral-medium-latest", "hash2");
        let (id, hash) = store.get_agent("sol-orchestrator").unwrap();
        assert_eq!(id, "ag_456");
        assert_eq!(hash, "hash2");

        // Delete
        store.delete_agent("sol-orchestrator");
        assert!(store.get_agent("sol-orchestrator").is_none());
    }

    #[test]
    fn test_load_all_conversations() {
        let store = Store::open_memory().unwrap();
        store.upsert_conversation("!a:x", "conv_1", 10);
        store.upsert_conversation("!b:x", "conv_2", 20);
        store.upsert_conversation("!c:x", "conv_3", 30);

        let all = store.load_all_conversations();
        assert_eq!(all.len(), 3);
    }

    #[test]
    fn test_load_all_agents() {
        let store = Store::open_memory().unwrap();
        store.upsert_agent("orch", "ag_1", "medium", "h1");
        store.upsert_agent("obs", "ag_2", "medium", "h2");

        let all = store.load_all_agents();
        assert_eq!(all.len(), 2);
    }

    #[test]
    fn test_nonexistent_keys_return_none() {
        let store = Store::open_memory().unwrap();
        assert!(store.get_conversation("!nope:x").is_none());
        assert!(store.get_agent("nope").is_none());
    }

    #[test]
    fn test_service_user_crud() {
        let store = Store::open_memory().unwrap();

        // Insert
        store.upsert_service_user("sienna", "gitea", "siennav");
        assert_eq!(
            store.get_service_user("sienna", "gitea").unwrap(),
            "siennav"
        );

        // Update
        store.upsert_service_user("sienna", "gitea", "sienna");
        assert_eq!(
            store.get_service_user("sienna", "gitea").unwrap(),
            "sienna"
        );

        // Different service
        store.upsert_service_user("sienna", "grafana", "sienna-g");
        assert_eq!(
            store.get_service_user("sienna", "grafana").unwrap(),
            "sienna-g"
        );

        // Delete
        store.delete_service_user("sienna", "gitea");
        assert!(store.get_service_user("sienna", "gitea").is_none());
        // Other service unaffected
        assert!(store.get_service_user("sienna", "grafana").is_some());
    }

    #[test]
    fn test_delete_all_conversations() {
        let store = Store::open_memory().unwrap();
        store.upsert_conversation("!a:x", "conv_1", 10);
        store.upsert_conversation("!b:x", "conv_2", 20);
        store.upsert_conversation("!c:x", "conv_3", 30);

        let deleted = store.delete_all_conversations();
        assert_eq!(deleted, 3);
        assert!(store.load_all_conversations().is_empty());

        // DB still works after bulk delete
        store.upsert_conversation("!d:x", "conv_4", 0);
        assert_eq!(store.load_all_conversations().len(), 1);
    }

    #[test]
    fn test_nonexistent_service_user() {
        let store = Store::open_memory().unwrap();
        assert!(store.get_service_user("nobody", "gitea").is_none());
    }

    // ── Research session tests ──────────────────────────────────────────

    #[test]
    fn test_research_session_lifecycle() {
        let store = Store::open_memory().unwrap();

        // Create
        store.create_research_session("sess-1", "!room:x", "$event1", "investigate SBBB", "[]");
        let running = store.load_running_research_sessions();
        assert_eq!(running.len(), 1);
        assert_eq!(running[0].0, "sess-1");
        assert_eq!(running[0].2, "investigate SBBB");
    }

    #[test]
    fn test_research_session_append_finding() {
        let store = Store::open_memory().unwrap();
        store.create_research_session("sess-2", "!room:x", "$event2", "test", "[]");

        store.append_research_finding("sess-2", r#"{"focus":"repo","findings":"found 3 files"}"#);
        store.append_research_finding("sess-2", r#"{"focus":"archive","findings":"12 messages"}"#);

        let running = store.load_running_research_sessions();
        assert_eq!(running.len(), 1);
        // findings_json should be a JSON array with 2 entries
        let findings: serde_json::Value = serde_json::from_str(&running[0].3).unwrap();
        assert_eq!(findings.as_array().unwrap().len(), 2);
    }

    #[test]
    fn test_research_session_complete() {
        let store = Store::open_memory().unwrap();
        store.create_research_session("sess-3", "!room:x", "$event3", "test", "[]");

        store.complete_research_session("sess-3");

        // Should no longer appear in running sessions
        let running = store.load_running_research_sessions();
        assert!(running.is_empty());
    }

    #[test]
    fn test_research_session_fail() {
        let store = Store::open_memory().unwrap();
        store.create_research_session("sess-4", "!room:x", "$event4", "test", "[]");

        store.fail_research_session("sess-4");

        let running = store.load_running_research_sessions();
        assert!(running.is_empty());
    }

    #[test]
    fn test_hung_session_cleanup_on_startup() {
        let store = Store::open_memory().unwrap();

        // Simulate 2 hung sessions + 1 completed
        store.create_research_session("hung-1", "!room:a", "$e1", "query A", "[]");
        store.create_research_session("hung-2", "!room:b", "$e2", "query B", "[]");
        store.create_research_session("done-1", "!room:c", "$e3", "query C", "[]");
        store.complete_research_session("done-1");

        // Only the 2 hung sessions should be returned
        let hung = store.load_running_research_sessions();
        assert_eq!(hung.len(), 2);

        // Clean them up (simulates startup logic)
        for (session_id, _, _, _) in &hung {
            store.fail_research_session(session_id);
        }

        // Now none should be running
        assert!(store.load_running_research_sessions().is_empty());
    }

    #[test]
    fn test_research_session_partial_findings_survive_failure() {
        let store = Store::open_memory().unwrap();
        store.create_research_session("sess-5", "!room:x", "$e5", "deep dive", "[]");

        // Agent 1 completes, agent 2 hasn't yet
        store.append_research_finding("sess-5", r#"{"focus":"agent1","findings":"found stuff"}"#);

        // Crash! Mark as failed
        store.fail_research_session("sess-5");

        // A failed session is no longer listed as running, so read the row
        // directly to prove the finding was persisted before the failure.
        assert!(store.load_running_research_sessions().is_empty());

        let conn = store.conn.lock().unwrap();
        let (status, findings_json): (String, String) = conn
            .query_row(
                "SELECT status, findings_json FROM research_sessions WHERE session_id = ?1",
                params!["sess-5"],
                |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
            )
            .unwrap();
        assert_eq!(status, "failed");

        let findings: serde_json::Value = serde_json::from_str(&findings_json).unwrap();
        let findings = findings.as_array().unwrap();
        assert_eq!(findings.len(), 1);
        assert_eq!(findings[0]["focus"], "agent1");
    }
}
|