//! End-to-end integration tests against the real Mistral API. //! //! Requires SOL_MISTRAL_API_KEY in .env file. //! Run: cargo test integration_test -- --test-threads=1 #![cfg(test)] use std::sync::Arc; use std::time::Duration; use crate::config::Config; use crate::conversations::ConversationRegistry; use crate::orchestrator::event::*; use crate::orchestrator::Orchestrator; use crate::persistence::Store; use crate::tools::ToolRegistry; // ── Test harness ──────────────────────────────────────────────────────── struct TestHarness { orchestrator: Arc, event_rx: tokio::sync::broadcast::Receiver, } impl TestHarness { async fn new() -> Self { // Load .env from project root let env_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(".env"); if let Ok(contents) = std::fs::read_to_string(&env_path) { for line in contents.lines() { let line = line.trim(); if line.is_empty() || line.starts_with('#') { continue; } if let Some((key, value)) = line.split_once('=') { std::env::set_var(key.trim(), value.trim()); } } } let api_key = std::env::var("SOL_MISTRAL_API_KEY") .expect("SOL_MISTRAL_API_KEY must be set in .env"); let config = test_config(); let mistral = Arc::new( mistralai_client::v1::client::Client::new(Some(api_key), None, None, None) .expect("Failed to create Mistral client"), ); let store = Arc::new(Store::open_memory().expect("Failed to create in-memory store")); let conversations = Arc::new(ConversationRegistry::new( config.agents.orchestrator_model.clone(), config.agents.compaction_threshold, store, )); let tools = Arc::new(ToolRegistry::new_minimal(config.clone())); let orchestrator = Arc::new(Orchestrator::new( config, tools, mistral, conversations, "you are sol. respond briefly and concisely. lowercase only.".into(), )); let event_rx = orchestrator.subscribe(); Self { orchestrator, event_rx } } async fn collect_events_for( &mut self, request_id: &RequestId, timeout_secs: u64, ) -> Vec { let mut events = Vec::new(); let deadline = tokio::time::Instant::now() + Duration::from_secs(timeout_secs); loop { match tokio::time::timeout_at(deadline, self.event_rx.recv()).await { Ok(Ok(event)) => { if event.request_id() != request_id { continue; } let is_terminal = matches!( event, OrchestratorEvent::Done { .. } | OrchestratorEvent::Failed { .. } ); events.push(event); if is_terminal { break; } } Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(_))) => continue, Ok(Err(_)) => break, Err(_) => panic!("Timeout after {timeout_secs}s waiting for events"), } } events } } fn test_config() -> Arc { let toml = r#" [matrix] homeserver_url = "http://localhost:8008" user_id = "@test:localhost" state_store_path = "/tmp/sol-test-state" db_path = ":memory:" [opensearch] url = "http://localhost:9200" index = "sol_test" [mistral] default_model = "mistral-medium-latest" max_tool_iterations = 10 [behavior] instant_responses = true memory_extraction_enabled = false [agents] orchestrator_model = "mistral-medium-latest" use_conversations_api = true agent_prefix = "test" [grpc] listen_addr = "0.0.0.0:0" dev_mode = true "#; Arc::new(Config::from_str(toml).expect("Failed to parse test config")) } fn make_request(text: &str) -> GenerateRequest { GenerateRequest { request_id: RequestId::new(), text: text.into(), user_id: "test-user".into(), display_name: None, conversation_key: format!("test-{}", uuid::Uuid::new_v4()), is_direct: true, image: None, metadata: Metadata::new(), } } fn make_request_with_key(text: &str, conversation_key: &str) -> GenerateRequest { GenerateRequest { request_id: RequestId::new(), text: text.into(), user_id: "test-user".into(), display_name: None, conversation_key: conversation_key.into(), is_direct: true, image: None, metadata: Metadata::new(), } } // ── Test 1: Simple chat round-trip ────────────────────────────────────── #[tokio::test] async fn test_simple_chat_roundtrip() { let mut h = TestHarness::new().await; let request = make_request("what is 2+2? answer with just the number."); let rid = request.request_id.clone(); let orch = h.orchestrator.clone(); let gen = tokio::spawn(async move { orch.generate(&request).await }); let events = h.collect_events_for(&rid, 30).await; let result = gen.await.unwrap(); assert!(result.is_some(), "Expected a response"); let text = result.unwrap(); assert!(text.contains('4'), "Expected '4' in response, got: {text}"); assert!(events.iter().any(|e| matches!(e, OrchestratorEvent::Started { .. }))); assert!(events.iter().any(|e| matches!(e, OrchestratorEvent::Thinking { .. }))); let done = events.iter().find(|e| matches!(e, OrchestratorEvent::Done { .. })); assert!(done.is_some(), "Missing Done event"); if let Some(OrchestratorEvent::Done { usage, .. }) = done { assert!(usage.prompt_tokens > 0); assert!(usage.completion_tokens > 0); } } // ── Test 2: Conversation continuity ───────────────────────────────────── #[tokio::test] async fn test_conversation_continuity() { let mut h = TestHarness::new().await; let conv_key = format!("test-{}", uuid::Uuid::new_v4()); // Turn 1 let r1 = make_request_with_key("my favorite color is cerulean. just acknowledge.", &conv_key); let rid1 = r1.request_id.clone(); let orch1 = h.orchestrator.clone(); let gen1 = tokio::spawn(async move { orch1.generate(&r1).await }); h.collect_events_for(&rid1, 30).await; let result1 = gen1.await.unwrap(); assert!(result1.is_some(), "Turn 1 should get a response"); // Turn 2 let r2 = make_request_with_key("what is my favorite color?", &conv_key); let rid2 = r2.request_id.clone(); let orch2 = h.orchestrator.clone(); let gen2 = tokio::spawn(async move { orch2.generate(&r2).await }); h.collect_events_for(&rid2, 30).await; let result2 = gen2.await.unwrap(); assert!(result2.is_some(), "Turn 2 should get a response"); let text = result2.unwrap().to_lowercase(); assert!(text.contains("cerulean"), "Expected 'cerulean', got: {text}"); } // ── Test 3: Client-side tool dispatch ─────────────────────────────────── #[tokio::test] async fn test_client_tool_dispatch() { use mistralai_client::v1::conversations::{ ConversationInput, CreateConversationRequest, }; use mistralai_client::v1::agents::AgentTool; let mut h = TestHarness::new().await; // Create conversation directly with file_read tool defined let api_key = std::env::var("SOL_MISTRAL_API_KEY") .expect("SOL_MISTRAL_API_KEY must be set"); let mistral = mistralai_client::v1::client::Client::new(Some(api_key), None, None, None) .expect("Failed to create Mistral client"); let file_read_tool = AgentTool::function( "file_read".into(), "Read a file's contents. Use path for the file path.".into(), serde_json::json!({ "type": "object", "properties": { "path": { "type": "string", "description": "File path to read" } }, "required": ["path"] }), ); let req = CreateConversationRequest { inputs: ConversationInput::Text( "use your file_read tool to read the file at path 'README.md'".into(), ), model: Some("mistral-medium-latest".into()), agent_id: None, agent_version: None, name: Some("test-client-tool".into()), description: None, instructions: Some("you are a coding assistant. use tools when asked.".into()), completion_args: None, tools: Some(vec![file_read_tool]), handoff_execution: None, metadata: None, store: Some(true), stream: false, }; let conv_response = mistral .create_conversation_async(&req) .await .expect("Failed to create conversation"); // Now pass to orchestrator let request = make_request("use your file_read tool to read README.md"); let rid = request.request_id.clone(); let orch = h.orchestrator.clone(); let orch_submit = h.orchestrator.clone(); let gen = tokio::spawn(async move { orch.generate_from_response(&request, conv_response).await }); let mut got_client_tool = false; let deadline = tokio::time::Instant::now() + Duration::from_secs(60); loop { match tokio::time::timeout_at(deadline, h.event_rx.recv()).await { Ok(Ok(event)) => { if event.request_id() != &rid { continue; } match event { OrchestratorEvent::ToolCallDetected { side: ToolSide::Client, call_id, ref name, .. } => { assert_eq!(name, "file_read"); got_client_tool = true; orch_submit .submit_tool_result(&call_id, ToolResultPayload { text: "# Sol\n\nVirtual librarian for Sunbeam.".into(), is_error: false, }) .await .expect("Failed to submit tool result"); } OrchestratorEvent::Done { .. } | OrchestratorEvent::Failed { .. } => break, _ => continue, } } Ok(Err(_)) => break, Err(_) => panic!("Timeout waiting for client tool dispatch"), } } assert!(got_client_tool, "Expected client-side file_read tool call"); let result = gen.await.unwrap(); assert!(result.is_some(), "Expected response after tool execution"); } // ── Test 4: Event ordering ─────────────────────────────────────────────── #[tokio::test] async fn test_event_ordering() { let mut h = TestHarness::new().await; let request = make_request("say hello"); let rid = request.request_id.clone(); let orch = h.orchestrator.clone(); let gen = tokio::spawn(async move { orch.generate(&request).await }); let events = h.collect_events_for(&rid, 30).await; let _ = gen.await; // Verify strict ordering: Started → Thinking → Done assert!(events.len() >= 3, "Expected at least 3 events, got {}", events.len()); assert!(matches!(events[0], OrchestratorEvent::Started { .. }), "First event should be Started"); assert!(matches!(events[1], OrchestratorEvent::Thinking { .. }), "Second event should be Thinking"); assert!(matches!(events.last().unwrap(), OrchestratorEvent::Done { .. }), "Last event should be Done"); } // ── Test 5: Metadata pass-through ─────────────────────────────────────── #[tokio::test] async fn test_metadata_passthrough() { let mut h = TestHarness::new().await; let mut request = make_request("hi"); request.metadata.insert("room_id", "!test-room:localhost"); request.metadata.insert("custom_key", "custom_value"); let rid = request.request_id.clone(); let orch = h.orchestrator.clone(); let gen = tokio::spawn(async move { orch.generate(&request).await }); let events = h.collect_events_for(&rid, 30).await; let _ = gen.await; // Started event should carry metadata let started = events.iter().find(|e| matches!(e, OrchestratorEvent::Started { .. })); assert!(started.is_some(), "Missing Started event"); if let Some(OrchestratorEvent::Started { metadata, .. }) = started { assert_eq!(metadata.get("room_id"), Some("!test-room:localhost")); assert_eq!(metadata.get("custom_key"), Some("custom_value")); } } // ── Test 6: Token usage accuracy ──────────────────────────────────────── #[tokio::test] async fn test_token_usage_accuracy() { let mut h = TestHarness::new().await; // Short prompt → small token count let r1 = make_request("say ok"); let rid1 = r1.request_id.clone(); let orch1 = h.orchestrator.clone(); let gen1 = tokio::spawn(async move { orch1.generate(&r1).await }); let events1 = h.collect_events_for(&rid1, 30).await; let _ = gen1.await; let done1 = events1.iter().find_map(|e| match e { OrchestratorEvent::Done { usage, .. } => Some(usage.clone()), _ => None, }).expect("Missing Done event"); // Longer prompt → larger token count let r2 = make_request( "write a haiku about the sun setting over the ocean. include imagery of waves." ); let rid2 = r2.request_id.clone(); let orch2 = h.orchestrator.clone(); let gen2 = tokio::spawn(async move { orch2.generate(&r2).await }); let events2 = h.collect_events_for(&rid2, 30).await; let _ = gen2.await; let done2 = events2.iter().find_map(|e| match e { OrchestratorEvent::Done { usage, .. } => Some(usage.clone()), _ => None, }).expect("Missing Done event"); // Both should have non-zero tokens assert!(done1.prompt_tokens > 0); assert!(done1.completion_tokens > 0); assert!(done2.prompt_tokens > 0); assert!(done2.completion_tokens > 0); // The longer prompt should use more completion tokens (haiku vs "ok") assert!( done2.completion_tokens > done1.completion_tokens, "Longer request should produce more completion tokens: {} vs {}", done2.completion_tokens, done1.completion_tokens ); } // ── Test 7: Failed tool result ────────────────────────────────────────── #[tokio::test] async fn test_failed_tool_result() { use mistralai_client::v1::conversations::{ ConversationInput, CreateConversationRequest, }; use mistralai_client::v1::agents::AgentTool; let mut h = TestHarness::new().await; let api_key = std::env::var("SOL_MISTRAL_API_KEY").unwrap(); let mistral = mistralai_client::v1::client::Client::new(Some(api_key), None, None, None).unwrap(); let tool = AgentTool::function( "file_read".into(), "Read a file.".into(), serde_json::json!({ "type": "object", "properties": { "path": { "type": "string" } }, "required": ["path"] }), ); let req = CreateConversationRequest { inputs: ConversationInput::Text("read the file at /nonexistent/path".into()), model: Some("mistral-medium-latest".into()), agent_id: None, agent_version: None, name: Some("test-failed-tool".into()), description: None, instructions: Some("use tools when asked.".into()), completion_args: None, tools: Some(vec![tool]), handoff_execution: None, metadata: None, store: Some(true), stream: false, }; let conv_response = mistral.create_conversation_async(&req).await.unwrap(); let request = make_request("read /nonexistent/path"); let rid = request.request_id.clone(); let orch = h.orchestrator.clone(); let orch_submit = h.orchestrator.clone(); let gen = tokio::spawn(async move { orch.generate_from_response(&request, conv_response).await }); // Submit error result when tool is called let deadline = tokio::time::Instant::now() + Duration::from_secs(60); loop { match tokio::time::timeout_at(deadline, h.event_rx.recv()).await { Ok(Ok(event)) => { if event.request_id() != &rid { continue; } match event { OrchestratorEvent::ToolCallDetected { side: ToolSide::Client, call_id, .. } => { orch_submit.submit_tool_result(&call_id, ToolResultPayload { text: "Error: file not found".into(), is_error: true, }).await.unwrap(); } OrchestratorEvent::ToolCompleted { success, .. } => { assert!(!success, "Expected tool to report failure"); } OrchestratorEvent::Done { .. } | OrchestratorEvent::Failed { .. } => break, _ => continue, } } Ok(Err(_)) => break, Err(_) => panic!("Timeout"), } } // Model should still produce a response (explaining the error) let result = gen.await.unwrap(); assert!(result.is_some(), "Expected response even after tool error"); } // ── Test 8: Server-side tool execution (search_web) ───────────────────── // Note: run_script requires deno sandbox + tool definitions from the agent. // search_web is more reliably available in test conversations. #[tokio::test] async fn test_server_tool_execution() { use mistralai_client::v1::conversations::{ ConversationInput, CreateConversationRequest, }; use mistralai_client::v1::agents::AgentTool; let mut h = TestHarness::new().await; let api_key = std::env::var("SOL_MISTRAL_API_KEY").unwrap(); let mistral = mistralai_client::v1::client::Client::new(Some(api_key), None, None, None).unwrap(); // Create conversation with search_web tool let tool = AgentTool::function( "search_web".into(), "Search the web. Returns titles, URLs, and snippets.".into(), serde_json::json!({ "type": "object", "properties": { "query": { "type": "string", "description": "Search query" } }, "required": ["query"] }), ); let req = CreateConversationRequest { inputs: ConversationInput::Text("search the web for 'rust programming language'".into()), model: Some("mistral-medium-latest".into()), agent_id: None, agent_version: None, name: Some("test-server-tool".into()), description: None, instructions: Some("use tools when asked. always use the search_web tool for any web search request.".into()), completion_args: None, tools: Some(vec![tool]), handoff_execution: None, metadata: None, store: Some(true), stream: false, }; let conv_response = mistral.create_conversation_async(&req).await.unwrap(); let request = make_request("search the web for rust"); let rid = request.request_id.clone(); let orch = h.orchestrator.clone(); let gen = tokio::spawn(async move { orch.generate_from_response(&request, conv_response).await }); let events = h.collect_events_for(&rid, 60).await; let result = gen.await.unwrap(); // May or may not produce a result (search_web needs SearXNG running) // But we should at least see the tool call events let tool_detected = events.iter().find(|e| matches!(e, OrchestratorEvent::ToolCallDetected { .. })); assert!(tool_detected.is_some(), "Expected ToolCallDetected for search_web"); if let Some(OrchestratorEvent::ToolCallDetected { side, name, .. }) = tool_detected { assert_eq!(*side, ToolSide::Server); assert_eq!(name, "search_web"); } assert!(events.iter().any(|e| matches!(e, OrchestratorEvent::ToolStarted { .. }))); assert!(events.iter().any(|e| matches!(e, OrchestratorEvent::ToolCompleted { .. }))); } // ══════════════════════════════════════════════════════════════════════════ // gRPC integration tests — full round-trip through the gRPC server // ══════════════════════════════════════════════════════════════════════════ mod grpc_tests { use super::*; use crate::grpc::{self, GrpcState}; use crate::grpc::code_agent_client::CodeAgentClient; use crate::grpc::*; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; /// Start a gRPC server on a random port and return the endpoint URL. async fn start_test_server() -> (String, Arc) { let env_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(".env"); if let Ok(contents) = std::fs::read_to_string(&env_path) { for line in contents.lines() { let line = line.trim(); if line.is_empty() || line.starts_with('#') { continue; } if let Some((k, v)) = line.split_once('=') { std::env::set_var(k.trim(), v.trim()); } } } let api_key = std::env::var("SOL_MISTRAL_API_KEY") .expect("SOL_MISTRAL_API_KEY must be set"); let config = test_config(); let mistral = Arc::new( mistralai_client::v1::client::Client::new(Some(api_key), None, None, None).unwrap(), ); let store = Arc::new(Store::open_memory().unwrap()); let conversations = Arc::new(ConversationRegistry::new( config.agents.orchestrator_model.clone(), config.agents.compaction_threshold, store.clone(), )); let tools = Arc::new(ToolRegistry::new_minimal(config.clone())); let orch = Arc::new(Orchestrator::new( config.clone(), tools.clone(), mistral.clone(), conversations, "you are sol. respond briefly. lowercase only.".into(), )); let grpc_state = Arc::new(GrpcState { config: config.clone(), tools, store, mistral, matrix: None, opensearch: None, // breadcrumbs disabled in tests system_prompt: "you are sol. respond briefly. lowercase only.".into(), orchestrator_agent_id: String::new(), orchestrator: Some(orch), }); let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let endpoint = format!("http://{addr}"); let state = grpc_state.clone(); tokio::spawn(async move { let svc = crate::grpc::service::CodeAgentService::new(state); let incoming = tokio_stream::wrappers::TcpListenerStream::new(listener); tonic::transport::Server::builder() .add_service(CodeAgentServer::new(svc)) .serve_with_incoming(incoming) .await .unwrap(); }); tokio::time::sleep(Duration::from_millis(100)).await; (endpoint, grpc_state) } /// Connect a gRPC client and send StartSession. Returns (tx, rx, session_ready). async fn connect_session( endpoint: &str, ) -> ( mpsc::Sender, tonic::Streaming, SessionReady, ) { let mut client = CodeAgentClient::connect(endpoint.to_string()).await.unwrap(); let (tx, client_rx) = mpsc::channel::(32); let stream = ReceiverStream::new(client_rx); let response = client.session(stream).await.unwrap(); let mut rx = response.into_inner(); // Send StartSession tx.send(ClientMessage { payload: Some(client_message::Payload::Start(StartSession { project_path: "/tmp/test-project".into(), prompt_md: String::new(), config_toml: String::new(), git_branch: "main".into(), git_status: String::new(), file_tree: vec![], model: "mistral-medium-latest".into(), client_tools: vec![], capabilities: vec![], })), }) .await .unwrap(); // Wait for SessionReady let ready = loop { match rx.message().await.unwrap() { Some(ServerMessage { payload: Some(server_message::Payload::Ready(r)) }) => break r, Some(ServerMessage { payload: Some(server_message::Payload::Error(e)) }) => { panic!("Session start failed: {}", e.message); } _ => continue, } }; (tx, rx, ready) } #[tokio::test] async fn test_grpc_simple_roundtrip() { let (endpoint, _state) = start_test_server().await; let (tx, mut rx, ready) = connect_session(&endpoint).await; assert!(!ready.session_id.is_empty()); assert!(!ready.room_id.is_empty()); // Send a message tx.send(ClientMessage { payload: Some(client_message::Payload::Input(UserInput { text: "what is 3+3? answer with just the number.".into(), })), }) .await .unwrap(); // Collect server messages until TextDone let mut got_status = false; let mut got_done = false; let deadline = tokio::time::Instant::now() + Duration::from_secs(30); loop { match tokio::time::timeout_at(deadline, rx.message()).await { Ok(Ok(Some(msg))) => match msg.payload { Some(server_message::Payload::Status(_)) => got_status = true, Some(server_message::Payload::Done(d)) => { got_done = true; assert!(d.full_text.contains('6'), "Expected '6', got: {}", d.full_text); assert!(d.input_tokens > 0, "Expected non-zero input tokens"); assert!(d.output_tokens > 0, "Expected non-zero output tokens"); break; } Some(server_message::Payload::Error(e)) => { panic!("Server error: {}", e.message); } _ => continue, }, Ok(Ok(None)) => panic!("Stream closed before Done"), Ok(Err(e)) => panic!("Stream error: {e}"), Err(_) => panic!("Timeout waiting for gRPC response"), } } assert!(got_status, "Expected at least one Status message"); assert!(got_done, "Expected TextDone message"); // Clean disconnect tx.send(ClientMessage { payload: Some(client_message::Payload::End(EndSession {})), }) .await .unwrap(); } #[tokio::test] async fn test_grpc_client_tool_relay() { let (endpoint, _state) = start_test_server().await; let (tx, mut rx, _ready) = connect_session(&endpoint).await; // Send a message that should trigger file_read tx.send(ClientMessage { payload: Some(client_message::Payload::Input(UserInput { text: "use file_read to read README.md".into(), })), }) .await .unwrap(); let mut got_tool_call = false; let mut got_done = false; let deadline = tokio::time::Instant::now() + Duration::from_secs(60); loop { match tokio::time::timeout_at(deadline, rx.message()).await { Ok(Ok(Some(msg))) => match msg.payload { Some(server_message::Payload::ToolCall(tc)) => { assert!(tc.is_local, "Expected local tool, got: {}", tc.name); // Model may call file_read, list_directory, or other client tools got_tool_call = true; // Send tool result back tx.send(ClientMessage { payload: Some(client_message::Payload::ToolResult( crate::grpc::ToolResult { call_id: tc.call_id, result: "# Sol\nVirtual librarian.".into(), is_error: false, }, )), }) .await .unwrap(); } Some(server_message::Payload::Done(d)) => { got_done = true; assert!(!d.full_text.is_empty(), "Expected non-empty response"); break; } Some(server_message::Payload::Error(e)) => { panic!("Server error: {}", e.message); } _ => continue, }, Ok(Ok(None)) => break, Ok(Err(e)) => panic!("Stream error: {e}"), Err(_) => panic!("Timeout"), } } assert!(got_tool_call, "Expected ToolCall for file_read"); assert!(got_done, "Expected TextDone after tool execution"); } #[tokio::test] async fn test_grpc_token_counts() { let (endpoint, _state) = start_test_server().await; let (tx, mut rx, _ready) = connect_session(&endpoint).await; tx.send(ClientMessage { payload: Some(client_message::Payload::Input(UserInput { text: "say hello".into(), })), }) .await .unwrap(); let deadline = tokio::time::Instant::now() + Duration::from_secs(30); loop { match tokio::time::timeout_at(deadline, rx.message()).await { Ok(Ok(Some(msg))) => match msg.payload { Some(server_message::Payload::Done(d)) => { assert!(d.input_tokens > 0, "input_tokens should be > 0, got {}", d.input_tokens); assert!(d.output_tokens > 0, "output_tokens should be > 0, got {}", d.output_tokens); break; } Some(server_message::Payload::Error(e)) => panic!("Error: {}", e.message), _ => continue, }, Ok(Ok(None)) => panic!("Stream closed"), Ok(Err(e)) => panic!("Stream error: {e}"), Err(_) => panic!("Timeout"), } } } #[tokio::test] async fn test_grpc_session_resume() { let (endpoint, _state) = start_test_server().await; // Session 1: establish context let (tx1, mut rx1, ready1) = connect_session(&endpoint).await; tx1.send(ClientMessage { payload: Some(client_message::Payload::Input(UserInput { text: "my secret code is 42. remember it.".into(), })), }).await.unwrap(); // Wait for response let deadline = tokio::time::Instant::now() + Duration::from_secs(30); loop { match tokio::time::timeout_at(deadline, rx1.message()).await { Ok(Ok(Some(msg))) => match msg.payload { Some(server_message::Payload::Done(_)) => break, Some(server_message::Payload::Error(e)) => panic!("Error: {}", e.message), _ => continue, }, Ok(Ok(None)) => break, Ok(Err(e)) => panic!("Error: {e}"), Err(_) => panic!("Timeout"), } } // Disconnect (don't send End — keeps session active) drop(tx1); drop(rx1); // Session 2: reconnect — should resume the same session let (tx2, mut rx2, ready2) = connect_session(&endpoint).await; // Should be the same session (same project path → same room) assert_eq!(ready2.room_id, ready1.room_id, "Should resume same room"); assert!(ready2.resumed, "Should indicate resumed session"); // History requires Matrix (not available in tests) — just check session resumed // Ask for recall tx2.send(ClientMessage { payload: Some(client_message::Payload::Input(UserInput { text: "what is my secret code?".into(), })), }).await.unwrap(); let deadline = tokio::time::Instant::now() + Duration::from_secs(30); loop { match tokio::time::timeout_at(deadline, rx2.message()).await { Ok(Ok(Some(msg))) => match msg.payload { Some(server_message::Payload::Done(d)) => { assert!( d.full_text.contains("42"), "Expected model to recall '42', got: {}", d.full_text ); break; } Some(server_message::Payload::Error(e)) => panic!("Error: {}", e.message), _ => continue, }, Ok(Ok(None)) => panic!("Stream closed"), Ok(Err(e)) => panic!("Error: {e}"), Err(_) => panic!("Timeout"), } } } #[tokio::test] async fn test_grpc_clean_disconnect() { let (endpoint, _state) = start_test_server().await; let (tx, mut rx, ready) = connect_session(&endpoint).await; assert!(!ready.session_id.is_empty()); // Clean disconnect tx.send(ClientMessage { payload: Some(client_message::Payload::End(EndSession {})), }).await.unwrap(); // Should get SessionEnd let deadline = tokio::time::Instant::now() + Duration::from_secs(5); let mut got_end = false; loop { match tokio::time::timeout_at(deadline, rx.message()).await { Ok(Ok(Some(msg))) => match msg.payload { Some(server_message::Payload::End(_)) => { got_end = true; break; } _ => continue, }, Ok(Ok(None)) => break, Ok(Err(_)) => break, Err(_) => break, } } assert!(got_end, "Server should send SessionEnd on clean disconnect"); } } // ══════════════════════════════════════════════════════════════════════════ // Code index + breadcrumb integration tests (requires local OpenSearch) // ══════════════════════════════════════════════════════════════════════════ mod code_index_tests { use super::*; use crate::code_index::schema::{self, SymbolDocument}; use crate::code_index::indexer::CodeIndexer; use crate::breadcrumbs; pub(super) fn os_client() -> Option { use opensearch::http::transport::{SingleNodeConnectionPool, TransportBuilder}; let url = url::Url::parse("http://localhost:9200").ok()?; let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url)) .build() .ok()?; Some(opensearch::OpenSearch::new(transport)) } pub(super) async fn setup_test_index(client: &opensearch::OpenSearch) -> String { let index = format!("sol_code_test_{}", uuid::Uuid::new_v4().to_string().split('-').next().unwrap()); schema::create_index_if_not_exists(client, &index).await.unwrap(); index } pub(super) async fn refresh_index(client: &opensearch::OpenSearch, index: &str) { let _ = client .indices() .refresh(opensearch::indices::IndicesRefreshParts::Index(&[index])) .send() .await; } pub(super) async fn cleanup_index(client: &opensearch::OpenSearch, index: &str) { let _ = client .indices() .delete(opensearch::indices::IndicesDeleteParts::Index(&[index])) .send() .await; } fn sample_symbols() -> Vec { let now = chrono::Utc::now().timestamp_millis(); vec![ SymbolDocument { file_path: "src/orchestrator/mod.rs".into(), repo_owner: Some("studio".into()), repo_name: "sol".into(), language: "rust".into(), symbol_name: "generate".into(), symbol_kind: "function".into(), signature: "pub async fn generate(&self, req: &GenerateRequest) -> Option".into(), docstring: "Generate a response using the ConversationRegistry.".into(), start_line: 80, end_line: 120, content: "pub async fn generate(&self, req: &GenerateRequest) -> Option { ... }".into(), branch: "mainline".into(), source: "local".into(), indexed_at: now, }, SymbolDocument { file_path: "src/orchestrator/engine.rs".into(), repo_owner: Some("studio".into()), repo_name: "sol".into(), language: "rust".into(), symbol_name: "run_tool_loop".into(), symbol_kind: "function".into(), signature: "pub async fn run_tool_loop(orch: &Orchestrator, req: &GenerateRequest, resp: ConversationResponse) -> Option<(String, TokenUsage)>".into(), docstring: "Unified Mistral tool loop. Emits events for every state transition.".into(), start_line: 20, end_line: 160, content: "pub async fn run_tool_loop(...) { ... tool iteration ... }".into(), branch: "mainline".into(), source: "local".into(), indexed_at: now, }, SymbolDocument { file_path: "src/orchestrator/tool_dispatch.rs".into(), repo_owner: Some("studio".into()), repo_name: "sol".into(), language: "rust".into(), symbol_name: "route".into(), symbol_kind: "function".into(), signature: "pub fn route(tool_name: &str) -> ToolSide".into(), docstring: "Route a tool call to server or client.".into(), start_line: 17, end_line: 23, content: "pub fn route(tool_name: &str) -> ToolSide { if CLIENT_TOOLS.contains ... }".into(), branch: "mainline".into(), source: "local".into(), indexed_at: now, }, SymbolDocument { file_path: "src/orchestrator/event.rs".into(), repo_owner: Some("studio".into()), repo_name: "sol".into(), language: "rust".into(), symbol_name: "ToolSide".into(), symbol_kind: "enum".into(), signature: "pub enum ToolSide { Server, Client }".into(), docstring: "Whether a tool executes on the server or on a connected client.".into(), start_line: 68, end_line: 72, content: "pub enum ToolSide { Server, Client }".into(), branch: "mainline".into(), source: "local".into(), indexed_at: now, }, SymbolDocument { file_path: "src/orchestrator/event.rs".into(), repo_owner: Some("studio".into()), repo_name: "sol".into(), language: "rust".into(), symbol_name: "OrchestratorEvent".into(), symbol_kind: "enum".into(), signature: "pub enum OrchestratorEvent { Started, Thinking, ToolCallDetected, ToolStarted, ToolCompleted, Done, Failed }".into(), docstring: "An event emitted by the orchestrator during response generation.".into(), start_line: 110, end_line: 170, content: "pub enum OrchestratorEvent { ... }".into(), branch: "mainline".into(), source: "local".into(), indexed_at: now, }, // Feature branch symbol — should be preferred when querying feat/code SymbolDocument { file_path: "src/orchestrator/mod.rs".into(), repo_owner: Some("studio".into()), repo_name: "sol".into(), language: "rust".into(), symbol_name: "generate_from_response".into(), symbol_kind: "function".into(), signature: "pub async fn generate_from_response(&self, req: &GenerateRequest, resp: ConversationResponse) -> Option".into(), docstring: "Generate from a pre-built ConversationResponse. Caller manages conversation.".into(), start_line: 125, end_line: 160, content: "pub async fn generate_from_response(...) { ... }".into(), branch: "feat/code".into(), source: "local".into(), indexed_at: now, }, ] } #[tokio::test] async fn test_index_and_search_symbols() { let Some(client) = os_client() else { eprintln!("Skipping: OpenSearch not available at localhost:9200"); return; }; let index = setup_test_index(&client).await; let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100); for doc in sample_symbols() { indexer.add(doc).await; } indexer.flush().await; refresh_index(&client, &index).await; // Search for "tool loop" — should find run_tool_loop let results = crate::tools::code_search::search_code( &client, &index, r#"{"query": "tool loop"}"#, Some("sol"), Some("mainline"), ).await.unwrap(); assert!(results.contains("run_tool_loop"), "Expected run_tool_loop in results, got:\n{results}"); // Search for "ToolSide" — should find the enum let results = crate::tools::code_search::search_code( &client, &index, r#"{"query": "ToolSide"}"#, Some("sol"), None, ).await.unwrap(); assert!(results.contains("ToolSide"), "Expected ToolSide in results, got:\n{results}"); // Search for "generate response" — should find generate() let results = crate::tools::code_search::search_code( &client, &index, r#"{"query": "generate response"}"#, Some("sol"), None, ).await.unwrap(); assert!(results.contains("generate"), "Expected generate in results, got:\n{results}"); cleanup_index(&client, &index).await; } #[tokio::test] async fn test_breadcrumb_project_outline() { let Some(client) = os_client() else { eprintln!("Skipping: OpenSearch not available"); return; }; let index = setup_test_index(&client).await; let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100); for doc in sample_symbols() { indexer.add(doc).await; } indexer.flush().await; refresh_index(&client, &index).await; let result = breadcrumbs::build_breadcrumbs( &client, &index, "sol", "mainline", "hi", 4000 ).await; // Default outline should have project name assert!(result.outline.contains("sol"), "Outline should mention project name"); // Short message → no adaptive expansion assert!(result.relevant.is_empty(), "Short message should not trigger expansion"); cleanup_index(&client, &index).await; } #[tokio::test] async fn test_breadcrumb_adaptive_expansion() { let Some(client) = os_client() else { eprintln!("Skipping: OpenSearch not available"); return; }; let index = setup_test_index(&client).await; let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100); for doc in sample_symbols() { indexer.add(doc).await; } indexer.flush().await; refresh_index(&client, &index).await; let result = breadcrumbs::build_breadcrumbs( &client, &index, "sol", "mainline", "how does the tool loop handle client-side tools?", 4000, ).await; // Adaptive expansion should find relevant symbols assert!(!result.relevant.is_empty(), "Substantive message should trigger expansion"); // Formatted output should contain relevant context section assert!(result.formatted.contains("relevant context"), "Should have relevant context section"); // Should include tool-related symbols let symbol_names: Vec<&str> = result.relevant.iter().map(|s| s.symbol_name.as_str()).collect(); assert!( symbol_names.iter().any(|n| n.contains("tool") || n.contains("route") || n.contains("ToolSide")), "Expected tool-related symbols, got: {:?}", symbol_names ); cleanup_index(&client, &index).await; } #[tokio::test] async fn test_breadcrumb_token_budget() { let Some(client) = os_client() else { eprintln!("Skipping: OpenSearch not available"); return; }; let index = setup_test_index(&client).await; let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100); for doc in sample_symbols() { indexer.add(doc).await; } indexer.flush().await; refresh_index(&client, &index).await; // Very small budget — should only fit the outline let result = breadcrumbs::build_breadcrumbs( &client, &index, "sol", "mainline", "how does the tool loop work?", 100, // tiny budget ).await; assert!(result.formatted.len() <= 100, "Should respect token budget, got {} chars", result.formatted.len()); cleanup_index(&client, &index).await; } #[tokio::test] async fn test_branch_scoping() { let Some(client) = os_client() else { eprintln!("Skipping: OpenSearch not available"); return; }; let index = setup_test_index(&client).await; let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100); for doc in sample_symbols() { indexer.add(doc).await; } indexer.flush().await; refresh_index(&client, &index).await; // Search on feat/code branch — should find generate_from_response (branch-specific) let results = crate::tools::code_search::search_code( &client, &index, r#"{"query": "generate from response", "branch": "feat/code"}"#, Some("sol"), None, ).await.unwrap(); assert!( results.contains("generate_from_response"), "Should find branch-specific symbol, got:\n{results}" ); // Should also find mainline symbols as fallback assert!( results.contains("generate") || results.contains("run_tool_loop"), "Should also find mainline symbols as fallback" ); cleanup_index(&client, &index).await; } #[tokio::test] async fn test_delete_branch_symbols() { let Some(client) = os_client() else { eprintln!("Skipping: OpenSearch not available"); return; }; let index = setup_test_index(&client).await; let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100); for doc in sample_symbols() { indexer.add(doc).await; } indexer.flush().await; refresh_index(&client, &index).await; // Delete feat/code branch symbols indexer.delete_branch("sol", "feat/code").await; refresh_index(&client, &index).await; // Should no longer find generate_from_response let results = crate::tools::code_search::search_code( &client, &index, r#"{"query": "generate_from_response"}"#, Some("sol"), Some("feat/code"), ).await.unwrap(); // Mainline symbols should still exist let mainline_results = crate::tools::code_search::search_code( &client, &index, r#"{"query": "generate"}"#, Some("sol"), Some("mainline"), ).await.unwrap(); assert!(mainline_results.contains("generate"), "Mainline symbols should survive branch deletion"); cleanup_index(&client, &index).await; } } // ══════════════════════════════════════════════════════════════════════════ // Gitea SDK + devtools integration tests (requires local Gitea) // ══════════════════════════════════════════════════════════════════════════ mod gitea_tests { use super::*; use std::sync::Arc; pub(super) fn load_env() { let env_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(".env"); if let Ok(contents) = std::fs::read_to_string(&env_path) { for line in contents.lines() { let line = line.trim(); if line.is_empty() || line.starts_with('#') { continue; } if let Some((k, v)) = line.split_once('=') { std::env::set_var(k.trim(), v.trim()); } } } } pub(super) fn gitea_available() -> bool { load_env(); let url = std::env::var("GITEA_URL").unwrap_or_default(); if url.is_empty() { return false; } std::process::Command::new("curl") .args(["-sf", &format!("{url}/api/v1/version")]) .output() .map(|o| o.status.success()) .unwrap_or(false) } pub(super) fn gitea_client() -> Option> { if !gitea_available() { return None; } let url = std::env::var("GITEA_URL").ok()?; let user = std::env::var("GITEA_ADMIN_USERNAME").ok()?; let pass = std::env::var("GITEA_ADMIN_PASSWORD").ok()?; let store = Arc::new(Store::open_memory().unwrap()); // Create a minimal vault client (won't be used — admin uses basic auth) let vault = Arc::new(crate::sdk::vault::VaultClient::new( "http://localhost:8200".into(), "test".into(), "secret".into(), )); let token_store = Arc::new(crate::sdk::tokens::TokenStore::new(store, vault)); Some(Arc::new(crate::sdk::gitea::GiteaClient::new( url, user, pass, token_store, ))) } #[tokio::test] async fn test_list_repos() { let Some(gitea) = gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let repos = gitea.list_repos("sol", None, Some("studio"), Some(50)).await; assert!(repos.is_ok(), "list_repos should succeed: {:?}", repos.err()); let repos = repos.unwrap(); assert!(!repos.is_empty(), "Should find repos in studio org"); // Should find sol repo let sol = repos.iter().find(|r| r.full_name.contains("sol")); assert!(sol.is_some(), "Should find studio/sol repo"); } #[tokio::test] async fn test_get_repo() { let Some(gitea) = gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let repo = gitea.get_repo("sol", "studio", "sol").await; assert!(repo.is_ok(), "get_repo should succeed: {:?}", repo.err()); let repo = repo.unwrap(); assert!(!repo.default_branch.is_empty(), "Should have a default branch"); } #[tokio::test] async fn test_get_file_directory() { let Some(gitea) = gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; // List repo root — SDK returns parse error for directory listings (known issue), // but the API call itself should succeed let result = gitea.get_file("sol", "studio", "sol", "", None).await; // Directory listing returns an array, SDK expects single object — may error // Just verify we can call it without panic let _ = result; } #[tokio::test] async fn test_get_file_content() { let Some(gitea) = gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let result = gitea.get_file("sol", "studio", "sol", "Cargo.toml", None).await; assert!(result.is_ok(), "Should get Cargo.toml: {:?}", result.err()); } #[tokio::test] async fn test_gitea_code_indexing() { let Some(gitea) = gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let Some(os) = super::code_index_tests::os_client() else { eprintln!("Skipping: OpenSearch not available"); return; }; let index = super::code_index_tests::setup_test_index(&os).await; let mut indexer = crate::code_index::indexer::CodeIndexer::new( os.clone(), index.clone(), String::new(), 50, ); // Index the mistralai-client-rs repo (small, Rust) let result = crate::code_index::gitea::index_repo( &gitea, &mut indexer, "sol", "studio", "mistralai-client-rs", "main", ).await; assert!(result.is_ok(), "Indexing should succeed: {:?}", result.err()); let count = result.unwrap(); indexer.flush().await; // Should have found symbols assert!(count > 0, "Should extract symbols from Rust repo, got 0"); // Verify we can search them super::code_index_tests::refresh_index(&os, &index).await; let search_result = crate::tools::code_search::search_code( &os, &index, r#"{"query": "Client"}"#, Some("mistralai-client-rs"), None, ).await.unwrap(); assert!(!search_result.contains("No code results"), "Should find Client in results: {search_result}"); super::code_index_tests::cleanup_index(&os, &index).await; } } // ══════════════════════════════════════════════════════════════════════════ // Devtools (Gitea tool) integration tests // ══════════════════════════════════════════════════════════════════════════ mod devtools_tests { use super::*; #[tokio::test] async fn test_gitea_list_repos_tool() { gitea_tests::load_env(); let Some(gitea) = gitea_tests::gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let ctx = crate::context::ResponseContext { matrix_user_id: "@sol:sunbeam.local".into(), user_id: "sol".into(), display_name: None, is_dm: true, is_reply: false, room_id: "test".into(), }; let result = crate::tools::devtools::execute( &gitea, "gitea_list_repos", r#"{"org": "studio"}"#, &ctx, ).await; assert!(result.is_ok(), "list_repos tool should succeed: {:?}", result.err()); let text = result.unwrap(); assert!(text.contains("sol"), "Should find sol repo in results: {text}"); } #[tokio::test] async fn test_gitea_get_repo_tool() { gitea_tests::load_env(); let Some(gitea) = gitea_tests::gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let ctx = crate::context::ResponseContext { matrix_user_id: "@sol:sunbeam.local".into(), user_id: "sol".into(), display_name: None, is_dm: true, is_reply: false, room_id: "test".into(), }; let result = crate::tools::devtools::execute( &gitea, "gitea_get_repo", r#"{"owner": "studio", "repo": "sol"}"#, &ctx, ).await; assert!(result.is_ok(), "get_repo tool should succeed: {:?}", result.err()); let text = result.unwrap(); assert!(text.contains("sol"), "Should contain repo name: {text}"); } #[tokio::test] async fn test_gitea_get_file_tool() { gitea_tests::load_env(); let Some(gitea) = gitea_tests::gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let ctx = crate::context::ResponseContext { matrix_user_id: "@sol:sunbeam.local".into(), user_id: "sol".into(), display_name: None, is_dm: true, is_reply: false, room_id: "test".into(), }; let result = crate::tools::devtools::execute( &gitea, "gitea_get_file", r#"{"owner": "studio", "repo": "sol", "path": "Cargo.toml"}"#, &ctx, ).await; assert!(result.is_ok(), "get_file tool should succeed: {:?}", result.err()); } #[tokio::test] async fn test_gitea_list_branches_tool() { gitea_tests::load_env(); let Some(gitea) = gitea_tests::gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let ctx = crate::context::ResponseContext { matrix_user_id: "@sol:sunbeam.local".into(), user_id: "sol".into(), display_name: None, is_dm: true, is_reply: false, room_id: "test".into(), }; let result = crate::tools::devtools::execute( &gitea, "gitea_list_branches", r#"{"owner": "studio", "repo": "sol"}"#, &ctx, ).await; assert!(result.is_ok(), "list_branches tool should succeed: {:?}", result.err()); let text = result.unwrap(); assert!(text.contains("mainline") || text.contains("main"), "Should find default branch: {text}"); } #[tokio::test] async fn test_gitea_list_issues_tool() { gitea_tests::load_env(); let Some(gitea) = gitea_tests::gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let ctx = crate::context::ResponseContext { matrix_user_id: "@sol:sunbeam.local".into(), user_id: "sol".into(), display_name: None, is_dm: true, is_reply: false, room_id: "test".into(), }; let result = crate::tools::devtools::execute( &gitea, "gitea_list_issues", r#"{"owner": "studio", "repo": "sol"}"#, &ctx, ).await; assert!(result.is_ok(), "list_issues should succeed: {:?}", result.err()); } #[tokio::test] async fn test_gitea_list_orgs_tool() { gitea_tests::load_env(); let Some(gitea) = gitea_tests::gitea_client() else { eprintln!("Skipping: Gitea not available"); return; }; let ctx = crate::context::ResponseContext { matrix_user_id: "@sol:sunbeam.local".into(), user_id: "sol".into(), display_name: None, is_dm: true, is_reply: false, room_id: "test".into(), }; let result = crate::tools::devtools::execute( &gitea, "gitea_list_orgs", r#"{"username": "sol"}"#, &ctx, ).await; assert!(result.is_ok(), "list_orgs should succeed: {:?}", result.err()); let text = result.unwrap(); assert!(text.contains("studio"), "Should find studio org: {text}"); } } // ══════════════════════════════════════════════════════════════════════════ // Web search + conversation registry tests // ══════════════════════════════════════════════════════════════════════════ mod service_tests { use super::*; #[tokio::test] async fn test_web_search() { // Requires SearXNG at localhost:8888 let result = reqwest::get("http://localhost:8888/search?q=test&format=json").await; if result.is_err() { eprintln!("Skipping: SearXNG not available"); return; } let tool_result = crate::tools::web_search::search( "http://localhost:8888", r#"{"query": "rust programming language", "limit": 3}"#, ).await; assert!(tool_result.is_ok(), "Web search should succeed: {:?}", tool_result.err()); let text = tool_result.unwrap(); assert!(!text.is_empty(), "Should return results"); assert!(text.to_lowercase().contains("rust"), "Should mention Rust in results"); } #[tokio::test] async fn test_conversation_registry_lifecycle() { let store = Arc::new(Store::open_memory().unwrap()); let registry = crate::conversations::ConversationRegistry::new( "mistral-medium-latest".into(), 118000, store, ); // No conversation should exist yet let conv_id = registry.get_conversation_id("test-room").await; assert!(conv_id.is_none(), "Should have no conversation initially"); } #[tokio::test] async fn test_conversation_send_message() { let env_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join(".env"); if let Ok(contents) = std::fs::read_to_string(&env_path) { for line in contents.lines() { let line = line.trim(); if line.is_empty() || line.starts_with('#') { continue; } if let Some((k, v)) = line.split_once('=') { std::env::set_var(k.trim(), v.trim()); } } } let api_key = match std::env::var("SOL_MISTRAL_API_KEY") { Ok(k) => k, Err(_) => { eprintln!("Skipping: no API key"); return; } }; let mistral = Arc::new( mistralai_client::v1::client::Client::new(Some(api_key), None, None, None).unwrap(), ); let store = Arc::new(Store::open_memory().unwrap()); let registry = crate::conversations::ConversationRegistry::new( "mistral-medium-latest".into(), 118000, store, ); let conv_key = format!("test-{}", uuid::Uuid::new_v4()); let input = mistralai_client::v1::conversations::ConversationInput::Text("say hi".into()); let result = registry.send_message(&conv_key, input, true, &mistral, None).await; assert!(result.is_ok(), "send_message should succeed: {:?}", result.err()); // Conversation should now exist let conv_id = registry.get_conversation_id(&conv_key).await; assert!(conv_id.is_some(), "Conversation should be stored after first message"); } #[test] fn test_evaluator_rule_matching() { let config = test_config(); let evaluator = crate::brain::evaluator::Evaluator::new( config, "you are sol.".into(), ); // DM should trigger MustRespond let engagement = evaluator.evaluate_rules( "@alice:sunbeam.pt", "hey sol", true, // DM ); assert!(engagement.is_some(), "DM should trigger a rule"); // Own message should be Ignored let engagement = evaluator.evaluate_rules( "@test:localhost", // matches config user_id "hello", false, ); assert!(engagement.is_some(), "Own message should be Ignored"); } } // ══════════════════════════════════════════════════════════════════════════ // gRPC bridge unit tests // ══════════════════════════════════════════════════════════════════════════ mod bridge_tests { use crate::grpc::bridge; use crate::orchestrator::event::*; #[tokio::test] async fn test_bridge_thinking_event() { let (tx, mut rx) = tokio::sync::mpsc::channel(16); let (event_tx, event_rx) = tokio::sync::broadcast::channel(16); let rid = RequestId::new(); let rid2 = rid.clone(); let handle = tokio::spawn(async move { bridge::bridge_events_to_grpc(rid2, event_rx, tx).await; }); // Send Thinking + Done let _ = event_tx.send(OrchestratorEvent::Thinking { request_id: rid.clone() }); let _ = event_tx.send(OrchestratorEvent::Done { request_id: rid.clone(), text: "hello".into(), usage: TokenUsage::default(), }); // Collect messages let mut msgs = Vec::new(); while let Some(Ok(msg)) = rx.recv().await { msgs.push(msg); if msgs.len() >= 2 { break; } } assert_eq!(msgs.len(), 2, "Should get Status + TextDone"); let _ = handle.await; } #[tokio::test] async fn test_bridge_tool_call_client() { let (tx, mut rx) = tokio::sync::mpsc::channel(16); let (event_tx, event_rx) = tokio::sync::broadcast::channel(16); let rid = RequestId::new(); let rid2 = rid.clone(); let handle = tokio::spawn(async move { bridge::bridge_events_to_grpc(rid2, event_rx, tx).await; }); let _ = event_tx.send(OrchestratorEvent::ToolCallDetected { request_id: rid.clone(), call_id: "c1".into(), name: "file_read".into(), args: "{}".into(), side: ToolSide::Client, }); let _ = event_tx.send(OrchestratorEvent::Done { request_id: rid.clone(), text: "done".into(), usage: TokenUsage::default(), }); let mut msgs = Vec::new(); while let Some(Ok(msg)) = rx.recv().await { msgs.push(msg); if msgs.len() >= 2 { break; } } // First message should be ToolCall assert!(msgs.len() >= 1); let _ = handle.await; } #[tokio::test] async fn test_bridge_filters_by_request_id() { let (tx, mut rx) = tokio::sync::mpsc::channel(16); let (event_tx, event_rx) = tokio::sync::broadcast::channel(16); let rid = RequestId::new(); let other_rid = RequestId::new(); let rid2 = rid.clone(); let handle = tokio::spawn(async move { bridge::bridge_events_to_grpc(rid2, event_rx, tx).await; }); // Send event for different request — should be filtered out let _ = event_tx.send(OrchestratorEvent::Thinking { request_id: other_rid }); // Send Done for our request — should be forwarded let _ = event_tx.send(OrchestratorEvent::Done { request_id: rid.clone(), text: "hi".into(), usage: TokenUsage::default(), }); let msg = rx.recv().await; assert!(msg.is_some(), "Should get Done message (filtered correctly)"); let _ = handle.await; } }