Files
sol/src/sync.rs

352 lines
11 KiB
Rust
Raw Normal View History

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use matrix_sdk::config::SyncSettings;
use matrix_sdk::room::Room;
use matrix_sdk::Client;
use ruma::events::reaction::OriginalSyncReactionEvent;
use ruma::events::room::member::StrippedRoomMemberEvent;
use ruma::events::room::message::OriginalSyncRoomMessageEvent;
use ruma::events::room::redaction::OriginalSyncRoomRedactionEvent;
use tokio::sync::Mutex;
use tracing::{debug, error, info, warn};
use opensearch::OpenSearch;
use crate::archive::indexer::Indexer;
use crate::archive::schema::ArchiveDocument;
use crate::brain::conversation::{ContextMessage, ConversationManager};
use crate::brain::evaluator::{Engagement, Evaluator};
use crate::brain::responder::Responder;
use crate::config::Config;
use crate::context::{self, ResponseContext};
use crate::matrix_utils;
use crate::memory;
/// Shared state used by the Matrix event handlers in this module.
///
/// A single instance is wrapped in an `Arc` and cloned into each handler that
/// needs it. Fields that are mutated at runtime sit behind a `tokio` mutex so
/// they can be locked from async code.
pub struct AppState {
/// Application configuration. The message handler reads
/// `behavior.cooldown_after_response_ms` and
/// `behavior.memory_extraction_enabled` from it.
pub config: Arc<Config>,
/// Archive indexer: receives new messages, edits, redactions and reactions.
pub indexer: Arc<Indexer>,
/// Decides, per incoming message, how Sol should engage (see `Engagement`).
pub evaluator: Arc<Evaluator>,
/// Generates the response text once the evaluator decides to engage.
pub responder: Arc<Responder>,
/// Per-room conversation context, appended to on every message and read back
/// when evaluating and when generating a response.
pub conversations: Arc<Mutex<ConversationManager>>,
/// Mistral API client, passed to the evaluator, the responder and the memory
/// extractor.
pub mistral: Arc<mistralai_client::v1::client::Client>,
/// OpenSearch client; cloned into the background memory-extraction task.
pub opensearch: OpenSearch,
/// Tracks when Sol last responded in each room (for cooldown)
pub last_response: Arc<Mutex<HashMap<String, Instant>>>,
/// Tracks rooms where a response is currently being generated (in-flight guard)
pub responding_in: Arc<Mutex<std::collections::HashSet<String>>>,
}
pub async fn start_sync(client: Client, state: Arc<AppState>) -> anyhow::Result<()> {
// Register event handlers
let s = state.clone();
client.add_event_handler(
move |event: OriginalSyncRoomMessageEvent, room: Room| {
let state = s.clone();
async move {
if let Err(e) = handle_message(event, room, state).await {
error!("Error handling message: {e}");
}
}
},
);
let s = state.clone();
client.add_event_handler(
move |event: OriginalSyncRoomRedactionEvent, _room: Room| {
let state = s.clone();
async move {
handle_redaction(event, &state).await;
}
},
);
let s = state.clone();
client.add_event_handler(
move |event: OriginalSyncReactionEvent, _room: Room| {
let state = s.clone();
async move {
handle_reaction(event, &state).await;
}
},
);
client.add_event_handler(
move |event: StrippedRoomMemberEvent, room: Room| async move {
handle_invite(event, room).await;
},
);
info!("Starting Matrix sync loop");
let settings = SyncSettings::default();
client.sync(settings).await?;
Ok(())
}
async fn handle_message(
event: OriginalSyncRoomMessageEvent,
room: Room,
state: Arc<AppState>,
) -> anyhow::Result<()> {
let sender = event.sender.to_string();
let room_id = room.room_id().to_string();
let event_id = event.event_id.to_string();
let timestamp = event.origin_server_ts.0.into();
// Check if this is an edit
if let Some((original_id, new_body)) = matrix_utils::extract_edit(&event) {
state
.indexer
.update_edit(&original_id.to_string(), &new_body)
.await;
return Ok(());
}
let Some(body) = matrix_utils::extract_body(&event) else {
return Ok(());
};
let room_name = matrix_utils::room_display_name(&room);
let sender_name = room
.get_member_no_sync(&event.sender)
.await
.ok()
.flatten()
.and_then(|m| m.display_name().map(|s| s.to_string()));
let reply_to = matrix_utils::extract_reply_to(&event).map(|id| id.to_string());
let is_reply = reply_to.is_some();
let thread_id = matrix_utils::extract_thread_id(&event).map(|id| id.to_string());
// Archive the message
let doc = ArchiveDocument {
event_id: event_id.clone(),
room_id: room_id.clone(),
room_name: Some(room_name.clone()),
sender: sender.clone(),
sender_name: sender_name.clone(),
timestamp,
content: body.clone(),
reply_to,
thread_id,
media_urls: Vec::new(),
event_type: "m.room.message".into(),
edited: false,
redacted: false,
reactions: Vec::new(),
};
state.indexer.add(doc).await;
// Update conversation context
let is_dm = room.is_direct().await.unwrap_or(false);
let response_ctx = ResponseContext {
matrix_user_id: sender.clone(),
user_id: context::derive_user_id(&sender),
display_name: sender_name.clone(),
is_dm,
is_reply,
room_id: room_id.clone(),
};
{
let mut convs = state.conversations.lock().await;
convs.add_message(
&room_id,
is_dm,
ContextMessage {
sender: sender_name.clone().unwrap_or_else(|| sender.clone()),
content: body.clone(),
timestamp,
},
);
}
// Evaluate whether to respond
let recent: Vec<String> = {
let convs = state.conversations.lock().await;
convs
.get_context(&room_id)
.iter()
.map(|m| format!("{}: {}", m.sender, m.content))
.collect()
};
let engagement = state
.evaluator
.evaluate(&sender, &body, is_dm, &recent, &state.mistral)
.await;
let (should_respond, is_spontaneous) = match engagement {
Engagement::MustRespond { reason } => {
info!(room = room_id.as_str(), ?reason, "Must respond");
(true, false)
}
Engagement::MaybeRespond { relevance, hook } => {
info!(room = room_id.as_str(), relevance, hook = hook.as_str(), "Maybe respond (spontaneous)");
(true, true)
}
Engagement::React { emoji, relevance } => {
info!(room = room_id.as_str(), relevance, emoji = emoji.as_str(), "Reacting with emoji");
if let Err(e) = matrix_utils::send_reaction(&room, event.event_id.clone().into(), &emoji).await {
error!("Failed to send reaction: {e}");
}
(false, false)
}
Engagement::Ignore => (false, false),
};
if !should_respond {
return Ok(());
}
// In-flight guard: skip if we're already generating a response for this room
{
let responding = state.responding_in.lock().await;
if responding.contains(&room_id) {
debug!(room = room_id.as_str(), "Skipping — response already in flight for this room");
return Ok(());
}
}
// Cooldown check: skip spontaneous if we responded recently
if is_spontaneous {
let last = state.last_response.lock().await;
if let Some(ts) = last.get(&room_id) {
let elapsed = ts.elapsed().as_millis() as u64;
let cooldown = state.config.behavior.cooldown_after_response_ms;
if elapsed < cooldown {
debug!(
room = room_id.as_str(),
elapsed_ms = elapsed,
cooldown_ms = cooldown,
"Skipping spontaneous — within cooldown period"
);
return Ok(());
}
}
}
// Mark room as in-flight
{
let mut responding = state.responding_in.lock().await;
responding.insert(room_id.clone());
}
let context = {
let convs = state.conversations.lock().await;
convs.get_context(&room_id)
};
let members = matrix_utils::room_member_names(&room).await;
let display_sender = sender_name.as_deref().unwrap_or(&sender);
let response = state
.responder
.generate_response(
&context,
&body,
display_sender,
&room_name,
&members,
is_spontaneous,
&state.mistral,
&room,
&response_ctx,
)
.await;
if let Some(text) = response {
// Reply with reference only when directly addressed. Spontaneous
// and DM messages are sent as plain content — feels more natural.
let content = if !is_spontaneous && !is_dm {
matrix_utils::make_reply_content(&text, event.event_id.to_owned())
} else {
ruma::events::room::message::RoomMessageEventContent::text_markdown(&text)
};
if let Err(e) = room.send(content).await {
error!("Failed to send response: {e}");
} else {
info!(room = room_id.as_str(), len = text.len(), is_dm, "Response sent");
}
// Post-response memory extraction (fire-and-forget)
if state.config.behavior.memory_extraction_enabled {
let ctx = response_ctx.clone();
let mistral = state.mistral.clone();
let os = state.opensearch.clone();
let config = state.config.clone();
let user_msg = body.clone();
let sol_response = text.clone();
tokio::spawn(async move {
if let Err(e) = memory::extractor::extract_and_store(
&mistral, &os, &config, &ctx, &user_msg, &sol_response,
)
.await
{
warn!("Memory extraction failed (non-fatal): {e}");
}
});
}
// Update last response timestamp
let mut last = state.last_response.lock().await;
last.insert(room_id.clone(), Instant::now());
}
// Clear in-flight flag
{
let mut responding = state.responding_in.lock().await;
responding.remove(&room_id);
}
Ok(())
}
/// Records a reaction in the archive against the event it annotates.
///
/// Reads the target event ID and the reaction key from the event's relation,
/// logs them, and forwards them with the sender and the origin-server
/// timestamp to the indexer.
async fn handle_reaction(event: OriginalSyncReactionEvent, state: &AppState) {
    let relation = &event.content.relates_to;
    let target_event_id = relation.event_id.to_string();
    let emoji = &relation.key;
    let sender = event.sender.to_string();
    let timestamp = i64::from(event.origin_server_ts.0);

    info!(
        target = target_event_id.as_str(),
        sender = sender.as_str(),
        emoji = emoji.as_str(),
        "Indexing reaction"
    );

    state
        .indexer
        .add_reaction(&target_event_id, &sender, emoji, timestamp)
        .await;
}
/// Flags the archived copy of a redacted event via the indexer.
///
/// Redaction events that carry no `redacts` target are ignored.
async fn handle_redaction(event: OriginalSyncRoomRedactionEvent, state: &AppState) {
    let Some(target) = event.redacts.as_ref() else {
        return;
    };
    let target_id = target.to_string();
    state.indexer.update_redaction(&target_id).await;
}
/// Auto-joins a room when the stripped member event targets our own user.
///
/// The join runs in a detached task so the handler returns immediately. It is
/// attempted up to three times with exponential backoff (1 s, then 2 s)
/// between attempts; each failure is logged as a warning and a final error is
/// logged once all attempts are exhausted.
async fn handle_invite(event: StrippedRoomMemberEvent, room: Room) {
    /// Total number of join attempts before giving up.
    const MAX_JOIN_ATTEMPTS: u32 = 3;

    // Only handle our own invites
    if event.state_key != room.own_user_id() {
        return;
    }
    info!(room_id = %room.room_id(), "Received invite, auto-joining");

    tokio::spawn(async move {
        for attempt in 0..MAX_JOIN_ATTEMPTS {
            match room.join().await {
                Ok(_) => {
                    info!(room_id = %room.room_id(), "Joined room");
                    return;
                }
                Err(e) => {
                    warn!(room_id = %room.room_id(), attempt, "Failed to join: {e}");
                    // Back off only when another attempt follows; sleeping
                    // after the last failure would just delay the final error.
                    if attempt + 1 < MAX_JOIN_ATTEMPTS {
                        tokio::time::sleep(std::time::Duration::from_secs(2u64.pow(attempt))).await;
                    }
                }
            }
        }
        error!(room_id = %room.room_id(), "Failed to join after retries");
    });
}