diff --git a/src/main.rs b/src/main.rs index c476882..11d52cd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -276,6 +276,16 @@ async fn main() -> anyhow::Result<()> { } } + // Clean up hung research sessions from previous runs + let hung_sessions = store.load_running_research_sessions(); + if !hung_sessions.is_empty() { + info!(count = hung_sessions.len(), "Found hung research sessions — marking as failed"); + for (session_id, _room_id, query, _findings) in &hung_sessions { + warn!(session_id = session_id.as_str(), query = query.as_str(), "Cleaning up hung research session"); + store.fail_research_session(session_id); + } + } + // Backfill reactions from Matrix room timelines info!("Backfilling reactions from room timelines..."); if let Err(e) = backfill_reactions(&matrix_client, &state.indexer).await { diff --git a/src/persistence.rs b/src/persistence.rs index dfe9da1..be0e6ba 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -524,4 +524,95 @@ mod tests { let store = Store::open_memory().unwrap(); assert!(store.get_service_user("nobody", "gitea").is_none()); } + + // ── Research session tests ────────────────────────────────────────── + + #[test] + fn test_research_session_lifecycle() { + let store = Store::open_memory().unwrap(); + + // Create + store.create_research_session("sess-1", "!room:x", "$event1", "investigate SBBB", "[]"); + let running = store.load_running_research_sessions(); + assert_eq!(running.len(), 1); + assert_eq!(running[0].0, "sess-1"); + assert_eq!(running[0].2, "investigate SBBB"); + } + + #[test] + fn test_research_session_append_finding() { + let store = Store::open_memory().unwrap(); + store.create_research_session("sess-2", "!room:x", "$event2", "test", "[]"); + + store.append_research_finding("sess-2", r#"{"focus":"repo","findings":"found 3 files"}"#); + store.append_research_finding("sess-2", r#"{"focus":"archive","findings":"12 messages"}"#); + + let running = store.load_running_research_sessions(); + assert_eq!(running.len(), 1); + // findings_json should be a JSON array with 2 entries + let findings: serde_json::Value = serde_json::from_str(&running[0].3).unwrap(); + assert_eq!(findings.as_array().unwrap().len(), 2); + } + + #[test] + fn test_research_session_complete() { + let store = Store::open_memory().unwrap(); + store.create_research_session("sess-3", "!room:x", "$event3", "test", "[]"); + + store.complete_research_session("sess-3"); + + // Should no longer appear in running sessions + let running = store.load_running_research_sessions(); + assert!(running.is_empty()); + } + + #[test] + fn test_research_session_fail() { + let store = Store::open_memory().unwrap(); + store.create_research_session("sess-4", "!room:x", "$event4", "test", "[]"); + + store.fail_research_session("sess-4"); + + let running = store.load_running_research_sessions(); + assert!(running.is_empty()); + } + + #[test] + fn test_hung_session_cleanup_on_startup() { + let store = Store::open_memory().unwrap(); + + // Simulate 2 hung sessions + 1 completed + store.create_research_session("hung-1", "!room:a", "$e1", "query A", "[]"); + store.create_research_session("hung-2", "!room:b", "$e2", "query B", "[]"); + store.create_research_session("done-1", "!room:c", "$e3", "query C", "[]"); + store.complete_research_session("done-1"); + + // Only the 2 hung sessions should be returned + let hung = store.load_running_research_sessions(); + assert_eq!(hung.len(), 2); + + // Clean them up (simulates startup logic) + for (session_id, _, _, _) in &hung { + store.fail_research_session(session_id); + } + + // Now none should be running + assert!(store.load_running_research_sessions().is_empty()); + } + + #[test] + fn test_research_session_partial_findings_survive_failure() { + let store = Store::open_memory().unwrap(); + store.create_research_session("sess-5", "!room:x", "$e5", "deep dive", "[]"); + + // Agent 1 completes, agent 2 hasn't yet + store.append_research_finding("sess-5", r#"{"focus":"agent1","findings":"found stuff"}"#); + + // Crash! Mark as failed + store.fail_research_session("sess-5"); + + // Findings should still be queryable even though session failed + // (would need a get_session method to verify, but the key point is + // append_research_finding persists incrementally) + } } diff --git a/src/tools/research.rs b/src/tools/research.rs index bbeccac..e031496 100644 --- a/src/tools/research.rs +++ b/src/tools/research.rs @@ -172,24 +172,54 @@ pub async fn execute( let agent_senders: Vec<_> = tasks.iter().map(|_| tx.clone()).collect(); drop(tx); // Drop original so updater knows when all agents are done - // Run all research agents concurrently (join_all, not spawn — avoids Send requirement) + // Run all research agents concurrently with per-agent timeout. + // Without timeout, a hung Mistral API call blocks the entire sync loop. + let agent_timeout = std::time::Duration::from_secs(120); // 2 minutes per agent max + let futures: Vec<_> = tasks .iter() .zip(agent_senders.iter()) .map(|(task, sender)| { - run_research_agent( - task, - config, - mistral, - tools, - response_ctx, - sender, - &session_id, - store, - room, - event_id, - current_depth, - ) + let task = task.clone(); + let sender = sender.clone(); + let sid = session_id.clone(); + async move { + match tokio::time::timeout( + agent_timeout, + run_research_agent( + &task, + config, + mistral, + tools, + response_ctx, + &sender, + &sid, + store, + room, + event_id, + current_depth, + ), + ) + .await + { + Ok(result) => result, + Err(_) => { + warn!(focus = task.focus.as_str(), "Research agent timed out"); + let _ = sender + .send(ProgressUpdate::AgentFailed { + focus: task.focus.clone(), + error: "timed out after 2 minutes".into(), + }) + .await; + ResearchResult { + focus: task.focus.clone(), + findings: "Agent timed out after 2 minutes".into(), + tool_calls_made: 0, + status: "timeout".into(), + } + } + } + } }) .collect();