add token infrastructure: service_users table, localpart helper
new SQLite table service_users maps OIDC identities (matrix localpart) to service-specific usernames, handling auth boundary mismatches. localpart() extracts the username from a matrix user ID. delete_all_conversations() added for bulk reset after agent recreation. all delete_* methods now log failures instead of silently discarding. removed dead user_tokens table (tokens now live in vault).
This commit is contained in:
@@ -16,6 +16,14 @@ pub struct ResponseContext {
|
|||||||
pub room_id: String,
|
pub room_id: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Extract the localpart from a Matrix user ID.
|
||||||
|
///
|
||||||
|
/// `@sienna:sunbeam.pt` → `sienna`
|
||||||
|
pub fn localpart(matrix_user_id: &str) -> &str {
|
||||||
|
let stripped = matrix_user_id.strip_prefix('@').unwrap_or(matrix_user_id);
|
||||||
|
stripped.split(':').next().unwrap_or(stripped)
|
||||||
|
}
|
||||||
|
|
||||||
/// Derive a portable user ID from a Matrix user ID.
|
/// Derive a portable user ID from a Matrix user ID.
|
||||||
///
|
///
|
||||||
/// `@sienna:sunbeam.pt` → `sienna@sunbeam.pt`
|
/// `@sienna:sunbeam.pt` → `sienna@sunbeam.pt`
|
||||||
@@ -53,4 +61,24 @@ mod tests {
|
|||||||
"user@server:8448"
|
"user@server:8448"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_localpart_standard() {
|
||||||
|
assert_eq!(localpart("@sienna:sunbeam.pt"), "sienna");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_localpart_no_at_prefix() {
|
||||||
|
assert_eq!(localpart("sienna:sunbeam.pt"), "sienna");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_localpart_no_colon() {
|
||||||
|
assert_eq!(localpart("@sienna"), "sienna");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_localpart_complex() {
|
||||||
|
assert_eq!(localpart("@user.name:matrix.org"), "user.name");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -68,13 +68,35 @@ impl Store {
|
|||||||
);
|
);
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS agents (
|
CREATE TABLE IF NOT EXISTS agents (
|
||||||
name TEXT PRIMARY KEY,
|
name TEXT PRIMARY KEY,
|
||||||
agent_id TEXT NOT NULL,
|
agent_id TEXT NOT NULL,
|
||||||
model TEXT NOT NULL DEFAULT '',
|
model TEXT NOT NULL DEFAULT '',
|
||||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
instructions_hash TEXT NOT NULL DEFAULT '',
|
||||||
);",
|
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS service_users (
|
||||||
|
localpart TEXT NOT NULL,
|
||||||
|
service TEXT NOT NULL,
|
||||||
|
service_username TEXT NOT NULL,
|
||||||
|
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||||
|
PRIMARY KEY (localpart, service)
|
||||||
|
);
|
||||||
|
|
||||||
|
",
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Migration: add instructions_hash column to agents table if missing
|
||||||
|
let has_hash: bool = conn
|
||||||
|
.prepare("SELECT instructions_hash FROM agents LIMIT 0")
|
||||||
|
.is_ok();
|
||||||
|
if !has_hash {
|
||||||
|
conn.execute_batch(
|
||||||
|
"ALTER TABLE agents ADD COLUMN instructions_hash TEXT NOT NULL DEFAULT '';"
|
||||||
|
)?;
|
||||||
|
info!("Migrated agents table: added instructions_hash column");
|
||||||
|
}
|
||||||
|
|
||||||
info!(path, "Opened Sol state database");
|
info!(path, "Opened Sol state database");
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
conn: Mutex::new(conn),
|
conn: Mutex::new(conn),
|
||||||
@@ -124,19 +146,29 @@ impl Store {
|
|||||||
/// Update token estimate for a conversation.
|
/// Update token estimate for a conversation.
|
||||||
pub fn update_tokens(&self, room_id: &str, estimated_tokens: u32) {
|
pub fn update_tokens(&self, room_id: &str, estimated_tokens: u32) {
|
||||||
let conn = self.conn.lock().unwrap();
|
let conn = self.conn.lock().unwrap();
|
||||||
let _ = conn.execute(
|
if let Err(e) = conn.execute(
|
||||||
"UPDATE conversations SET estimated_tokens = ?1 WHERE room_id = ?2",
|
"UPDATE conversations SET estimated_tokens = ?1 WHERE room_id = ?2",
|
||||||
params![estimated_tokens, room_id],
|
params![estimated_tokens, room_id],
|
||||||
);
|
) {
|
||||||
|
warn!("Failed to update token estimate: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove all conversation mappings (e.g., after agent recreation).
|
||||||
|
pub fn delete_all_conversations(&self) -> usize {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
conn.execute("DELETE FROM conversations", []).unwrap_or(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a conversation mapping (e.g., after compaction).
|
/// Remove a conversation mapping (e.g., after compaction).
|
||||||
pub fn delete_conversation(&self, room_id: &str) {
|
pub fn delete_conversation(&self, room_id: &str) {
|
||||||
let conn = self.conn.lock().unwrap();
|
let conn = self.conn.lock().unwrap();
|
||||||
let _ = conn.execute(
|
if let Err(e) = conn.execute(
|
||||||
"DELETE FROM conversations WHERE room_id = ?1",
|
"DELETE FROM conversations WHERE room_id = ?1",
|
||||||
params![room_id],
|
params![room_id],
|
||||||
);
|
) {
|
||||||
|
warn!("Failed to delete conversation: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load all conversation mappings (for startup recovery).
|
/// Load all conversation mappings (for startup recovery).
|
||||||
@@ -165,27 +197,28 @@ impl Store {
|
|||||||
// Agents
|
// Agents
|
||||||
// =========================================================================
|
// =========================================================================
|
||||||
|
|
||||||
/// Get the agent_id for a named agent.
|
/// Get the agent_id and instructions hash for a named agent.
|
||||||
pub fn get_agent(&self, name: &str) -> Option<String> {
|
pub fn get_agent(&self, name: &str) -> Option<(String, String)> {
|
||||||
let conn = self.conn.lock().unwrap();
|
let conn = self.conn.lock().unwrap();
|
||||||
conn.query_row(
|
conn.query_row(
|
||||||
"SELECT agent_id FROM agents WHERE name = ?1",
|
"SELECT agent_id, instructions_hash FROM agents WHERE name = ?1",
|
||||||
params![name],
|
params![name],
|
||||||
|row| row.get(0),
|
|row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
|
||||||
)
|
)
|
||||||
.ok()
|
.ok()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Store or update an agent mapping.
|
/// Store or update an agent mapping (with instructions hash for staleness detection).
|
||||||
pub fn upsert_agent(&self, name: &str, agent_id: &str, model: &str) {
|
pub fn upsert_agent(&self, name: &str, agent_id: &str, model: &str, instructions_hash: &str) {
|
||||||
let conn = self.conn.lock().unwrap();
|
let conn = self.conn.lock().unwrap();
|
||||||
if let Err(e) = conn.execute(
|
if let Err(e) = conn.execute(
|
||||||
"INSERT INTO agents (name, agent_id, model)
|
"INSERT INTO agents (name, agent_id, model, instructions_hash)
|
||||||
VALUES (?1, ?2, ?3)
|
VALUES (?1, ?2, ?3, ?4)
|
||||||
ON CONFLICT(name) DO UPDATE SET
|
ON CONFLICT(name) DO UPDATE SET
|
||||||
agent_id = excluded.agent_id,
|
agent_id = excluded.agent_id,
|
||||||
model = excluded.model",
|
model = excluded.model,
|
||||||
params![name, agent_id, model],
|
instructions_hash = excluded.instructions_hash",
|
||||||
|
params![name, agent_id, model, instructions_hash],
|
||||||
) {
|
) {
|
||||||
warn!("Failed to upsert agent: {e}");
|
warn!("Failed to upsert agent: {e}");
|
||||||
}
|
}
|
||||||
@@ -194,7 +227,49 @@ impl Store {
|
|||||||
/// Remove an agent mapping.
|
/// Remove an agent mapping.
|
||||||
pub fn delete_agent(&self, name: &str) {
|
pub fn delete_agent(&self, name: &str) {
|
||||||
let conn = self.conn.lock().unwrap();
|
let conn = self.conn.lock().unwrap();
|
||||||
let _ = conn.execute("DELETE FROM agents WHERE name = ?1", params![name]);
|
if let Err(e) = conn.execute("DELETE FROM agents WHERE name = ?1", params![name]) {
|
||||||
|
warn!("Failed to delete agent: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// =========================================================================
|
||||||
|
// Service Users (OIDC → service username mapping)
|
||||||
|
// =========================================================================
|
||||||
|
|
||||||
|
/// Get the service-specific username for a Matrix localpart.
|
||||||
|
pub fn get_service_user(&self, localpart: &str, service: &str) -> Option<String> {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
conn.query_row(
|
||||||
|
"SELECT service_username FROM service_users WHERE localpart = ?1 AND service = ?2",
|
||||||
|
params![localpart, service],
|
||||||
|
|row| row.get(0),
|
||||||
|
)
|
||||||
|
.ok()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Store or update a service user mapping.
|
||||||
|
pub fn upsert_service_user(&self, localpart: &str, service: &str, service_username: &str) {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
if let Err(e) = conn.execute(
|
||||||
|
"INSERT INTO service_users (localpart, service, service_username)
|
||||||
|
VALUES (?1, ?2, ?3)
|
||||||
|
ON CONFLICT(localpart, service) DO UPDATE SET
|
||||||
|
service_username = excluded.service_username",
|
||||||
|
params![localpart, service, service_username],
|
||||||
|
) {
|
||||||
|
warn!("Failed to upsert service user: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a service user mapping.
|
||||||
|
pub fn delete_service_user(&self, localpart: &str, service: &str) {
|
||||||
|
let conn = self.conn.lock().unwrap();
|
||||||
|
if let Err(e) = conn.execute(
|
||||||
|
"DELETE FROM service_users WHERE localpart = ?1 AND service = ?2",
|
||||||
|
params![localpart, service],
|
||||||
|
) {
|
||||||
|
warn!("Failed to delete service user: {e}");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Load all agent mappings (for startup recovery).
|
/// Load all agent mappings (for startup recovery).
|
||||||
@@ -255,18 +330,16 @@ mod tests {
|
|||||||
fn test_agent_crud() {
|
fn test_agent_crud() {
|
||||||
let store = Store::open_memory().unwrap();
|
let store = Store::open_memory().unwrap();
|
||||||
|
|
||||||
store.upsert_agent("sol-orchestrator", "ag_123", "mistral-medium-latest");
|
store.upsert_agent("sol-orchestrator", "ag_123", "mistral-medium-latest", "hash1");
|
||||||
assert_eq!(
|
let (id, hash) = store.get_agent("sol-orchestrator").unwrap();
|
||||||
store.get_agent("sol-orchestrator").unwrap(),
|
assert_eq!(id, "ag_123");
|
||||||
"ag_123"
|
assert_eq!(hash, "hash1");
|
||||||
);
|
|
||||||
|
|
||||||
// Update
|
// Update
|
||||||
store.upsert_agent("sol-orchestrator", "ag_456", "mistral-medium-latest");
|
store.upsert_agent("sol-orchestrator", "ag_456", "mistral-medium-latest", "hash2");
|
||||||
assert_eq!(
|
let (id, hash) = store.get_agent("sol-orchestrator").unwrap();
|
||||||
store.get_agent("sol-orchestrator").unwrap(),
|
assert_eq!(id, "ag_456");
|
||||||
"ag_456"
|
assert_eq!(hash, "hash2");
|
||||||
);
|
|
||||||
|
|
||||||
// Delete
|
// Delete
|
||||||
store.delete_agent("sol-orchestrator");
|
store.delete_agent("sol-orchestrator");
|
||||||
@@ -287,8 +360,8 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_load_all_agents() {
|
fn test_load_all_agents() {
|
||||||
let store = Store::open_memory().unwrap();
|
let store = Store::open_memory().unwrap();
|
||||||
store.upsert_agent("orch", "ag_1", "medium");
|
store.upsert_agent("orch", "ag_1", "medium", "h1");
|
||||||
store.upsert_agent("obs", "ag_2", "medium");
|
store.upsert_agent("obs", "ag_2", "medium", "h2");
|
||||||
|
|
||||||
let all = store.load_all_agents();
|
let all = store.load_all_agents();
|
||||||
assert_eq!(all.len(), 2);
|
assert_eq!(all.len(), 2);
|
||||||
@@ -300,4 +373,58 @@ mod tests {
|
|||||||
assert!(store.get_conversation("!nope:x").is_none());
|
assert!(store.get_conversation("!nope:x").is_none());
|
||||||
assert!(store.get_agent("nope").is_none());
|
assert!(store.get_agent("nope").is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_service_user_crud() {
|
||||||
|
let store = Store::open_memory().unwrap();
|
||||||
|
|
||||||
|
// Insert
|
||||||
|
store.upsert_service_user("sienna", "gitea", "siennav");
|
||||||
|
assert_eq!(
|
||||||
|
store.get_service_user("sienna", "gitea").unwrap(),
|
||||||
|
"siennav"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update
|
||||||
|
store.upsert_service_user("sienna", "gitea", "sienna");
|
||||||
|
assert_eq!(
|
||||||
|
store.get_service_user("sienna", "gitea").unwrap(),
|
||||||
|
"sienna"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Different service
|
||||||
|
store.upsert_service_user("sienna", "grafana", "sienna-g");
|
||||||
|
assert_eq!(
|
||||||
|
store.get_service_user("sienna", "grafana").unwrap(),
|
||||||
|
"sienna-g"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Delete
|
||||||
|
store.delete_service_user("sienna", "gitea");
|
||||||
|
assert!(store.get_service_user("sienna", "gitea").is_none());
|
||||||
|
// Other service unaffected
|
||||||
|
assert!(store.get_service_user("sienna", "grafana").is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_delete_all_conversations() {
|
||||||
|
let store = Store::open_memory().unwrap();
|
||||||
|
store.upsert_conversation("!a:x", "conv_1", 10);
|
||||||
|
store.upsert_conversation("!b:x", "conv_2", 20);
|
||||||
|
store.upsert_conversation("!c:x", "conv_3", 30);
|
||||||
|
|
||||||
|
let deleted = store.delete_all_conversations();
|
||||||
|
assert_eq!(deleted, 3);
|
||||||
|
assert!(store.load_all_conversations().is_empty());
|
||||||
|
|
||||||
|
// DB still works after bulk delete
|
||||||
|
store.upsert_conversation("!d:x", "conv_4", 0);
|
||||||
|
assert_eq!(store.load_all_conversations().len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_nonexistent_service_user() {
|
||||||
|
let store = Store::open_memory().unwrap();
|
||||||
|
assert!(store.get_service_user("nobody", "gitea").is_none());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user