Files
sol/src/sync.rs

497 lines
17 KiB
Rust
Raw Normal View History

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use matrix_sdk::config::SyncSettings;
use matrix_sdk::room::Room;
use matrix_sdk::Client;
use ruma::events::reaction::OriginalSyncReactionEvent;
use ruma::events::room::member::StrippedRoomMemberEvent;
use ruma::events::room::message::OriginalSyncRoomMessageEvent;
use ruma::events::room::redaction::OriginalSyncRoomRedactionEvent;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use opensearch::OpenSearch;
use crate::agents::registry::AgentRegistry;
use crate::archive::indexer::Indexer;
use crate::archive::schema::ArchiveDocument;
use crate::brain::conversation::{ContextMessage, ConversationManager};
use crate::brain::evaluator::{Engagement, Evaluator};
use crate::brain::personality::Personality;
use crate::tools::ToolRegistry;
use crate::config::Config;
use crate::context::{self, ResponseContext};
use crate::conversations::ConversationRegistry;
use crate::matrix_utils;
use crate::memory;
/// Shared application state handed to every Matrix event handler.
///
/// Handlers receive this behind an `Arc`; the mutable pieces are wrapped in
/// async mutexes so they can be updated from concurrently running handlers.
pub struct AppState {
    /// Application configuration (behavior tunables, agent settings, Matrix identity).
    pub config: Arc<Config>,
    /// Archive indexer: receives new messages, edits, redactions and reactions.
    pub indexer: Arc<Indexer>,
    /// Decides how Sol should engage with each incoming message.
    pub evaluator: Arc<Evaluator>,
    /// Tool registry. Not referenced by the handlers in this file's visible code.
    pub tools: Arc<ToolRegistry>,
    /// Personality definition. Not referenced by the handlers in this file's visible code.
    pub personality: Arc<Personality>,
    /// Per-room rolling conversation context used for evaluation and context hints.
    pub conversations: Arc<Mutex<ConversationManager>>,
    /// Mistral API client, shared by the evaluator, conversation registry and memory extraction.
    pub mistral: Arc<mistralai_client::v1::client::Client>,
    /// OpenSearch client; cloned into the post-response memory-extraction task.
    pub opensearch: OpenSearch,
    /// Agent registry — manages Mistral agent lifecycle.
    pub agent_registry: Arc<AgentRegistry>,
    /// Conversation registry for the Mistral Conversations API.
    pub conversation_registry: Arc<ConversationRegistry>,
    /// When Sol last responded in each room (keyed by room id), used for the cooldown.
    pub last_response: Arc<Mutex<HashMap<String, Instant>>>,
    /// Rooms for which a response is currently being generated (in-flight guard).
    pub responding_in: Arc<Mutex<std::collections::HashSet<String>>>,
    /// Rooms where Sol has been told to be quiet — maps room id → instant the silence ends.
    pub silenced_until: Arc<Mutex<HashMap<String, Instant>>>,
}
/// Registers all Matrix event handlers on `client` and then runs the sync loop.
///
/// Handlers are registered for room messages, redactions, reactions and
/// stripped member events (invites). Each handler gets its own clone of the
/// shared `state`.
///
/// # Errors
///
/// Returns an error if `Client::sync` fails; on success the call only returns
/// once the sync loop itself returns.
pub async fn start_sync(client: Client, state: Arc<AppState>) -> anyhow::Result<()> {
    // Room messages: errors are logged here so a single bad event never
    // propagates out of the handler.
    let message_state = Arc::clone(&state);
    client.add_event_handler(move |event: OriginalSyncRoomMessageEvent, room: Room| {
        let state = Arc::clone(&message_state);
        async move {
            if let Err(e) = handle_message(event, room, state).await {
                error!("Error handling message: {e}");
            }
        }
    });

    // Redactions: forwarded to the archive indexer.
    let redaction_state = Arc::clone(&state);
    client.add_event_handler(move |event: OriginalSyncRoomRedactionEvent, _room: Room| {
        let state = Arc::clone(&redaction_state);
        async move {
            handle_redaction(event, &state).await;
        }
    });

    // Reactions: forwarded to the archive indexer.
    let reaction_state = Arc::clone(&state);
    client.add_event_handler(move |event: OriginalSyncReactionEvent, _room: Room| {
        let state = Arc::clone(&reaction_state);
        async move {
            handle_reaction(event, &state).await;
        }
    });

    // Invites: needs no shared state.
    client.add_event_handler(move |event: StrippedRoomMemberEvent, room: Room| async move {
        handle_invite(event, room).await;
    });

    info!("Starting Matrix sync loop");
    client.sync(SyncSettings::default()).await?;
    Ok(())
}
async fn handle_message(
event: OriginalSyncRoomMessageEvent,
room: Room,
state: Arc<AppState>,
) -> anyhow::Result<()> {
let sender = event.sender.to_string();
let room_id = room.room_id().to_string();
let event_id = event.event_id.to_string();
let timestamp = event.origin_server_ts.0.into();
// Check if this is an edit
if let Some((original_id, new_body)) = matrix_utils::extract_edit(&event) {
state
.indexer
.update_edit(&original_id.to_string(), &new_body)
.await;
return Ok(());
}
// Extract text body — or image caption for m.image events
let image_data_uri = matrix_utils::download_image_as_data_uri(
&room.client(),
&event,
)
.await;
let body = if let Some(ref _uri) = image_data_uri {
// For images, use the caption/filename as the text body
matrix_utils::extract_image(&event)
.map(|(_, _, caption)| caption)
.or_else(|| matrix_utils::extract_body(&event))
.unwrap_or_default()
} else {
match matrix_utils::extract_body(&event) {
Some(b) => b,
None => return Ok(()),
}
};
// Skip if we have neither text nor image
if body.is_empty() && image_data_uri.is_none() {
return Ok(());
}
let room_name = matrix_utils::room_display_name(&room);
let sender_name = room
.get_member_no_sync(&event.sender)
.await
.ok()
.flatten()
.and_then(|m| m.display_name().map(|s| s.to_string()));
let reply_to = matrix_utils::extract_reply_to(&event).map(|id| id.to_string());
let is_reply = reply_to.is_some();
let thread_id = matrix_utils::extract_thread_id(&event).map(|id| id.to_string());
// Archive the message
let doc = ArchiveDocument {
event_id: event_id.clone(),
room_id: room_id.clone(),
room_name: Some(room_name.clone()),
sender: sender.clone(),
sender_name: sender_name.clone(),
timestamp,
content: body.clone(),
reply_to,
thread_id,
media_urls: matrix_utils::extract_image(&event)
.map(|(url, _, _)| vec![url])
.unwrap_or_default(),
event_type: "m.room.message".into(),
edited: false,
redacted: false,
reactions: Vec::new(),
};
state.indexer.add(doc).await;
// Update conversation context
let is_dm = room.is_direct().await.unwrap_or(false);
let response_ctx = ResponseContext {
matrix_user_id: sender.clone(),
user_id: context::derive_user_id(&sender),
display_name: sender_name.clone(),
is_dm,
is_reply,
room_id: room_id.clone(),
};
{
let mut convs = state.conversations.lock().await;
convs.add_message(
&room_id,
is_dm,
ContextMessage {
sender: sender_name.clone().unwrap_or_else(|| sender.clone()),
content: body.clone(),
timestamp,
},
);
}
// Silence detection — if someone tells Sol to be quiet, set a per-room timer
{
let lower = body.to_lowercase();
let silence_phrases = [
"shut up", "be quiet", "shush", "silence", "stop talking",
"quiet down", "hush", "enough sol", "sol enough", "sol stop",
"sol shut up", "sol be quiet", "sol shush",
];
if silence_phrases.iter().any(|p| lower.contains(p)) {
let duration = std::time::Duration::from_millis(
state.config.behavior.silence_duration_ms,
);
let until = Instant::now() + duration;
let mut silenced = state.silenced_until.lock().await;
silenced.insert(room_id.clone(), until);
info!(
room = room_id.as_str(),
duration_mins = state.config.behavior.silence_duration_ms / 60_000,
"Silenced in room"
);
}
}
// Check if Sol is currently silenced in this room
let is_silenced = {
let silenced = state.silenced_until.lock().await;
silenced
.get(&room_id)
.map(|until| Instant::now() < *until)
.unwrap_or(false)
};
// Evaluate whether to respond
let recent: Vec<String> = {
let convs = state.conversations.lock().await;
convs
.get_context(&room_id)
.iter()
.map(|m| format!("{}: {}", m.sender, m.content))
.collect()
};
// A: Check if this message is a reply to another human (not Sol)
let is_reply_to_human = is_reply && !is_dm && {
// If it's a reply, check the conversation context for who the previous
// message was from. We don't have event IDs in context, so we use a
// heuristic: if the most recent message before this one was from a human
// (not Sol), this reply is likely directed at them.
let convs = state.conversations.lock().await;
let ctx = convs.get_context(&room_id);
let sol_id = &state.config.matrix.user_id;
// Check the message before the current one (last in context before we added ours)
ctx.iter().rev().skip(1).next()
.map(|m| m.sender != *sol_id)
.unwrap_or(false)
};
// B: Count messages since Sol last spoke in this room
let messages_since_sol = {
let convs = state.conversations.lock().await;
let ctx = convs.get_context(&room_id);
let sol_id = &state.config.matrix.user_id;
ctx.iter().rev().take_while(|m| m.sender != *sol_id).count()
};
let engagement = state
.evaluator
.evaluate(
&sender, &body, is_dm, &recent, &state.mistral,
is_reply_to_human, messages_since_sol, is_silenced,
)
.await;
// use_thread: if true, Sol responds in a thread instead of inline
let (should_respond, is_spontaneous, use_thread) = match engagement {
Engagement::MustRespond { reason } => {
info!(room = room_id.as_str(), ?reason, "Must respond");
// Direct mention breaks silence
if is_silenced {
let mut silenced = state.silenced_until.lock().await;
silenced.remove(&room_id);
info!(room = room_id.as_str(), "Silence broken by direct mention");
}
(true, false, false)
}
Engagement::Respond { relevance, hook } => {
info!(room = room_id.as_str(), relevance, hook = hook.as_str(), "Respond (spontaneous)");
(true, true, false)
}
Engagement::ThreadReply { relevance, hook } => {
info!(room = room_id.as_str(), relevance, hook = hook.as_str(), "Thread reply (spontaneous)");
(true, true, true)
}
Engagement::React { emoji, relevance } => {
info!(room = room_id.as_str(), relevance, emoji = emoji.as_str(), "Reacting with emoji");
if let Err(e) = matrix_utils::send_reaction(&room, event.event_id.clone().into(), &emoji).await {
error!("Failed to send reaction: {e}");
}
(false, false, false)
}
Engagement::Ignore => (false, false, false),
};
if !should_respond {
return Ok(());
}
// In-flight guard: skip if we're already generating a response for this room
{
let responding = state.responding_in.lock().await;
if responding.contains(&room_id) {
debug!(room = room_id.as_str(), "Skipping — response already in flight for this room");
return Ok(());
}
}
// Cooldown check: skip spontaneous if we responded recently
if is_spontaneous {
let last = state.last_response.lock().await;
if let Some(ts) = last.get(&room_id) {
let elapsed = ts.elapsed().as_millis() as u64;
let cooldown = state.config.behavior.cooldown_after_response_ms;
if elapsed < cooldown {
debug!(
room = room_id.as_str(),
elapsed_ms = elapsed,
cooldown_ms = cooldown,
"Skipping spontaneous — within cooldown period"
);
return Ok(());
}
}
}
// Mark room as in-flight
{
let mut responding = state.responding_in.lock().await;
responding.insert(room_id.clone());
}
let context = {
let convs = state.conversations.lock().await;
convs.get_context(&room_id)
};
let members = matrix_utils::room_member_names(&room).await;
let display_sender = sender_name.as_deref().unwrap_or(&sender);
// Build context hint for new conversations (last 50 messages for continuity)
let context_hint = if state.config.agents.use_conversations_api {
let conv_exists = state
.conversation_registry
.get_conversation_id(&room_id)
.await
.is_some();
if !conv_exists && !context.is_empty() {
let hint_messages: Vec<String> = context
.iter()
.rev()
.take(50)
.rev()
.map(|m| format!("{}: {}", m.sender, m.content))
.collect();
Some(hint_messages.join("\n"))
} else {
None
}
} else {
None
};
// Generate response via ConversationRegistry (Conversations API path).
// The legacy manual chat path has been removed — Conversations API is now mandatory.
let input_text = {
let tc = crate::time_context::TimeContext::now();
let mut header = format!("{}\n[room: {} ({})]", tc.message_line(), room_name, room_id);
// TODO: inject memory notes + breadcrumbs here (like the orchestrator does)
let user_msg = if is_dm {
body.clone()
} else {
format!("<{}> {}", response_ctx.matrix_user_id, body)
};
format!("{header}\n{user_msg}")
};
let input = mistralai_client::v1::conversations::ConversationInput::Text(input_text);
let conv_result = state
.conversation_registry
.send_message(&room_id, input, is_dm, &state.mistral, context_hint.as_deref())
.await;
let response = match conv_result {
Ok(conv_response) => {
// Simple path: extract text (no tool loop for Matrix — tools handled by orchestrator)
// TODO: wire full orchestrator + Matrix bridge for tool support
conv_response.assistant_text()
}
Err(e) => {
error!("Conversation API failed: {e}");
None
}
};
if let Some(text) = response {
let content = if use_thread {
// Thread reply — less intrusive, for tangential contributions
matrix_utils::make_thread_reply(&text, event.event_id.to_owned())
} else if !is_spontaneous && !is_dm {
// Direct reply — when explicitly addressed
matrix_utils::make_reply_content(&text, event.event_id.to_owned())
} else {
// Plain message — spontaneous or DM, feels more natural
ruma::events::room::message::RoomMessageEventContent::text_markdown(&text)
};
if let Err(e) = room.send(content).await {
error!("Failed to send response: {e}");
} else {
info!(room = room_id.as_str(), len = text.len(), is_dm, use_thread, "Response sent");
}
// Post-response memory extraction (fire-and-forget)
if state.config.behavior.memory_extraction_enabled {
let ctx = response_ctx.clone();
let mistral = state.mistral.clone();
let os = state.opensearch.clone();
let config = state.config.clone();
let user_msg = body.clone();
let sol_response = text.clone();
tokio::spawn(async move {
if let Err(e) = memory::extractor::extract_and_store(
&mistral, &os, &config, &ctx, &user_msg, &sol_response,
)
.await
{
warn!("Memory extraction failed (non-fatal): {e}");
}
});
}
// Update last response timestamp
let mut last = state.last_response.lock().await;
last.insert(room_id.clone(), Instant::now());
}
// Clear in-flight flag
{
let mut responding = state.responding_in.lock().await;
responding.remove(&room_id);
}
Ok(())
}
/// Records a reaction in the archive index.
///
/// Reads the target event id and reaction key from the event's relation,
/// logs them, and forwards them with the sender and server timestamp to
/// `Indexer::add_reaction`.
async fn handle_reaction(event: OriginalSyncReactionEvent, state: &AppState) {
    let relation = &event.content.relates_to;
    let target_id = relation.event_id.to_string();
    let reactor = event.sender.to_string();
    let key = &relation.key;
    let server_ts: i64 = event.origin_server_ts.0.into();

    info!(
        target = target_id.as_str(),
        sender = reactor.as_str(),
        emoji = key.as_str(),
        "Indexing reaction"
    );

    state
        .indexer
        .add_reaction(&target_id, &reactor, key, server_ts)
        .await;
}
/// Marks the redacted event as redacted in the archive index.
///
/// The id of the redacted event is taken from the event's top-level `redacts`
/// field when present, falling back to `content.redacts`, which is where room
/// version 11 carries it. If neither is set the event is ignored.
async fn handle_redaction(event: OriginalSyncRoomRedactionEvent, state: &AppState) {
    let redacted_id = event
        .redacts
        .as_ref()
        .or(event.content.redacts.as_ref());
    if let Some(redacted_id) = redacted_id {
        state.indexer.update_redaction(&redacted_id.to_string()).await;
    }
}
/// Auto-joins a room when a stripped member event addressed to our own user
/// arrives.
///
/// The join runs in a spawned task so the sync handler returns immediately.
/// It is attempted up to three times, sleeping 1s and then 2s between
/// attempts; there is no sleep after the final failure, so the terminal error
/// is logged straight away.
async fn handle_invite(event: StrippedRoomMemberEvent, room: Room) {
    // Only handle our own invites
    // NOTE(review): only the state key is checked here, not the membership
    // value — confirm no other stripped member event for our user can reach
    // this handler.
    if event.state_key != room.own_user_id() {
        return;
    }
    info!(room_id = %room.room_id(), "Received invite, auto-joining");
    tokio::spawn(async move {
        const MAX_ATTEMPTS: u32 = 3;
        for attempt in 0..MAX_ATTEMPTS {
            match room.join().await {
                Ok(_) => {
                    info!(room_id = %room.room_id(), "Joined room");
                    return;
                }
                Err(e) => {
                    warn!(room_id = %room.room_id(), attempt, "Failed to join: {e}");
                    // Back off only when another attempt will follow.
                    if attempt + 1 < MAX_ATTEMPTS {
                        tokio::time::sleep(std::time::Duration::from_secs(2u64.pow(attempt))).await;
                    }
                }
            }
        }
        error!(room_id = %room.room_id(), "Failed to join after retries");
    });
}