diff --git a/src/breadcrumbs/mod.rs b/src/breadcrumbs/mod.rs
index 7d4b468..060dd82 100644
--- a/src/breadcrumbs/mod.rs
+++ b/src/breadcrumbs/mod.rs
@@ -190,17 +190,15 @@ async fn hybrid_symbol_search(
 
     // Step 2: Build hybrid query
     let mut should_clauses = vec![
-        serde_json::json!({ "match": { "content": user_message } }),
+        serde_json::json!({ "match": { "content": { "query": user_message, "boost": 1.0 } } }),
         serde_json::json!({ "match": { "signature": { "query": user_message, "boost": 2.0 } } }),
         serde_json::json!({ "match": { "docstring": { "query": user_message, "boost": 1.5 } } }),
     ];
 
-    // Add symbol name term matching from analyzed tokens
-    if !tokens.is_empty() {
-        // Build wildcard patterns from tokens for symbol name matching
-        let patterns: Vec<String> = tokens.iter().map(|t| format!(".*{t}.*")).collect();
+    // Add wildcard queries on symbol_name for each analyzed token
+    for token in &tokens {
         should_clauses.push(serde_json::json!({
-            "regexp": { "symbol_name": { "value": patterns.join("|"), "boost": 3.0 } }
+            "wildcard": { "symbol_name": { "value": format!("*{token}*"), "boost": 3.0 } }
         }));
     }
 
diff --git a/src/code_index/indexer.rs b/src/code_index/indexer.rs
index da760be..f86db92 100644
--- a/src/code_index/indexer.rs
+++ b/src/code_index/indexer.rs
@@ -49,13 +49,11 @@ impl CodeIndexer {
             body.push(serde_json::to_value(doc).unwrap_or_default().into());
         }
 
-        match self
-            .client
-            .bulk(opensearch::BulkParts::None)
-            .pipeline(&self.pipeline)
-            .body(body)
-            .send()
-            .await
+        let mut req = self.client.bulk(opensearch::BulkParts::None).body(body);
+        if !self.pipeline.is_empty() {
+            req = req.pipeline(&self.pipeline);
+        }
+        match req.send().await
         {
             Ok(response) => {
                 let count = self.buffer.len();
diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs
index 071d47e..f407ca7 100644
--- a/src/grpc/mod.rs
+++ b/src/grpc/mod.rs
@@ -26,11 +26,20 @@ pub struct GrpcState {
     pub store: Arc,
     pub mistral: Arc,
     pub matrix: Option,
+    pub opensearch: Option<opensearch::OpenSearch>,
     pub system_prompt: String,
     pub orchestrator_agent_id: String,
     pub orchestrator: Option>,
 }
 
+impl GrpcState {
+    /// Name of the OpenSearch code index. Hardcoded to "sol_code" until it is configurable.
+    pub fn code_index_name(&self) -> String {
+        // TODO: add to config. For now, hardcode.
+        "sol_code".into()
+    }
+}
+
 /// Start the gRPC server. Call from main.rs alongside the Matrix sync loop.
 pub async fn start_server(state: Arc) -> anyhow::Result<()> {
     let addr = state
diff --git a/src/grpc/session.rs b/src/grpc/session.rs
index cf5a15d..02fcb8a 100644
--- a/src/grpc/session.rs
+++ b/src/grpc/session.rs
@@ -225,20 +225,47 @@ you also have access to server-side tools: search_archive, search_web, research,
     }
 
     /// Build the per-message context header for coding mode.
-    fn build_context_header(&self) -> String {
+    /// Includes time context, project info, instructions, and adaptive breadcrumbs.
+    async fn build_context_header(&self, user_message: &str) -> String {
         let tc = TimeContext::now();
-        format!(
-            "{}\n[project: {} | path: {} | model: {}]\n{}",
+        let mut header = format!(
+            "{}\n[project: {} | path: {} | model: {}]",
             tc.message_line(),
             self.project_name,
             self.project_path,
             self.model,
-            if self.prompt_md.is_empty() {
-                String::new()
-            } else {
-                format!("## project instructions\n{}\n", self.prompt_md)
-            },
-        )
+        );
+
+        if !self.prompt_md.is_empty() {
+            header.push_str(&format!("\n## project instructions\n{}", self.prompt_md));
+        }
+
+        // Inject adaptive breadcrumbs from the code index (if OpenSearch available)
+        if let Some(ref os) = self.state.opensearch {
+            let breadcrumbs = crate::breadcrumbs::build_breadcrumbs(
+                os,
+                &self.state.code_index_name(),
+                &self.project_name,
+                &self.git_branch(),
+                user_message,
+                4000, // ~1000 tokens budget
+            )
+            .await;
+
+            if !breadcrumbs.formatted.is_empty() {
+                header.push('\n');
+                header.push_str(&breadcrumbs.formatted);
+            }
+        }
+
+        header.push('\n');
+        header
+    }
+
+    fn git_branch(&self) -> String {
+        // Always "mainline" for now: StartSession.git_branch is not kept on the session yet.
+        // TODO: store git_branch in CodeSession struct
+        "mainline".into()
     }
 
     /// Send a user message and run the agent loop.
@@ -250,7 +277,7 @@ you also have access to server-side tools: search_archive, search_web, research,
         client_tx: &mpsc::Sender>,
         client_rx: &mut tonic::Streaming,
     ) -> anyhow::Result<()> {
-        let context_header = self.build_context_header();
+        let context_header = self.build_context_header(text).await;
         let input_text = format!("{context_header}\n{text}");
 
         // Post user message to Matrix room (as m.notice to distinguish from assistant)
@@ -486,7 +513,7 @@ you also have access to server-side tools: search_archive, search_web, research,
         &mut self,
         text: &str,
     ) -> anyhow::Result {
-        let context_header = self.build_context_header();
+        let context_header = self.build_context_header(text).await;
         let input_text = format!("{context_header}\n{text}");
 
         if let Some(ref conv_id) = self.conversation_id {
diff --git a/src/integration_test.rs b/src/integration_test.rs
index 2dc06f0..df4edca 100644
--- a/src/integration_test.rs
+++ b/src/integration_test.rs
@@ -606,7 +606,8 @@ mod grpc_tests {
             tools,
             store,
             mistral,
-            matrix: None, // not needed for tests
+            matrix: None,
+            opensearch: None, // breadcrumbs disabled in tests
             system_prompt: "you are sol. respond briefly.
lowercase only.".into(),
             orchestrator_agent_id: String::new(),
             orchestrator: Some(orch),
@@ -921,3 +922,366 @@ mod grpc_tests {
         assert!(got_end, "Server should send SessionEnd on clean disconnect");
     }
 }
+
+// ══════════════════════════════════════════════════════════════════════════
+// Code index + breadcrumb integration tests (requires local OpenSearch)
+// ══════════════════════════════════════════════════════════════════════════
+
+mod code_index_tests {
+    use super::*;
+    use crate::code_index::schema::{self, SymbolDocument};
+    use crate::code_index::indexer::CodeIndexer;
+    use crate::breadcrumbs;
+
+    /// Connect to the local OpenSearch node, or return `None` when it cannot be
+    /// reached so that callers skip the test instead of panicking during setup.
+    async fn os_client() -> Option<opensearch::OpenSearch> {
+        use opensearch::http::transport::{SingleNodeConnectionPool, TransportBuilder};
+        let url = url::Url::parse("http://localhost:9200").ok()?;
+        let transport = TransportBuilder::new(SingleNodeConnectionPool::new(url))
+            .build()
+            .ok()?;
+        let client = opensearch::OpenSearch::new(transport);
+        // Building the transport never touches the network; ping to find out
+        // whether a node is actually listening.
+        let ping = client.ping().send().await.ok()?;
+        ping.status_code().is_success().then_some(client)
+    }
+
+    /// Create a uniquely named test index and return its name.
+    async fn setup_test_index(client: &opensearch::OpenSearch) -> String {
+        let index = format!("sol_code_test_{}", uuid::Uuid::new_v4().to_string().split('-').next().unwrap());
+        schema::create_index_if_not_exists(client, &index).await.unwrap();
+        index
+    }
+
+    /// Refresh the index so freshly written documents are searchable (best effort).
+    async fn refresh_index(client: &opensearch::OpenSearch, index: &str) {
+        let _ = client
+            .indices()
+            .refresh(opensearch::indices::IndicesRefreshParts::Index(&[index]))
+            .send()
+            .await;
+    }
+
+    /// Delete the test index (best effort; errors are ignored).
+    async fn cleanup_index(client: &opensearch::OpenSearch, index: &str) {
+        let _ = client
+            .indices()
+            .delete(opensearch::indices::IndicesDeleteParts::Index(&[index]))
+            .send()
+            .await;
+    }
+
+    /// Fixture: five symbols on "mainline" plus one that exists only on "feat/code".
+    fn sample_symbols() -> Vec<SymbolDocument> {
+        let now = chrono::Utc::now().timestamp_millis();
+        vec![
+            SymbolDocument {
+                file_path: "src/orchestrator/mod.rs".into(),
+                repo_owner: Some("studio".into()),
+                repo_name: "sol".into(),
+                language: "rust".into(),
+                symbol_name: "generate".into(),
+                symbol_kind: "function".into(),
+                signature: "pub async fn generate(&self, req: &GenerateRequest) -> Option".into(),
+                docstring: "Generate a response using the ConversationRegistry.".into(),
+                start_line: 80,
+                end_line: 120,
+                content: "pub async fn generate(&self, req: &GenerateRequest) -> Option { ... }".into(),
+                branch: "mainline".into(),
+                source: "local".into(),
+                indexed_at: now,
+            },
+            SymbolDocument {
+                file_path: "src/orchestrator/engine.rs".into(),
+                repo_owner: Some("studio".into()),
+                repo_name: "sol".into(),
+                language: "rust".into(),
+                symbol_name: "run_tool_loop".into(),
+                symbol_kind: "function".into(),
+                signature: "pub async fn run_tool_loop(orch: &Orchestrator, req: &GenerateRequest, resp: ConversationResponse) -> Option<(String, TokenUsage)>".into(),
+                docstring: "Unified Mistral tool loop. Emits events for every state transition.".into(),
+                start_line: 20,
+                end_line: 160,
+                content: "pub async fn run_tool_loop(...) { ... tool iteration ... }".into(),
+                branch: "mainline".into(),
+                source: "local".into(),
+                indexed_at: now,
+            },
+            SymbolDocument {
+                file_path: "src/orchestrator/tool_dispatch.rs".into(),
+                repo_owner: Some("studio".into()),
+                repo_name: "sol".into(),
+                language: "rust".into(),
+                symbol_name: "route".into(),
+                symbol_kind: "function".into(),
+                signature: "pub fn route(tool_name: &str) -> ToolSide".into(),
+                docstring: "Route a tool call to server or client.".into(),
+                start_line: 17,
+                end_line: 23,
+                content: "pub fn route(tool_name: &str) -> ToolSide { if CLIENT_TOOLS.contains ... }".into(),
+                branch: "mainline".into(),
+                source: "local".into(),
+                indexed_at: now,
+            },
+            SymbolDocument {
+                file_path: "src/orchestrator/event.rs".into(),
+                repo_owner: Some("studio".into()),
+                repo_name: "sol".into(),
+                language: "rust".into(),
+                symbol_name: "ToolSide".into(),
+                symbol_kind: "enum".into(),
+                signature: "pub enum ToolSide { Server, Client }".into(),
+                docstring: "Whether a tool executes on the server or on a connected client.".into(),
+                start_line: 68,
+                end_line: 72,
+                content: "pub enum ToolSide { Server, Client }".into(),
+                branch: "mainline".into(),
+                source: "local".into(),
+                indexed_at: now,
+            },
+            SymbolDocument {
+                file_path: "src/orchestrator/event.rs".into(),
+                repo_owner: Some("studio".into()),
+                repo_name: "sol".into(),
+                language: "rust".into(),
+                symbol_name: "OrchestratorEvent".into(),
+                symbol_kind: "enum".into(),
+                signature: "pub enum OrchestratorEvent { Started, Thinking, ToolCallDetected, ToolStarted, ToolCompleted, Done, Failed }".into(),
+                docstring: "An event emitted by the orchestrator during response generation.".into(),
+                start_line: 110,
+                end_line: 170,
+                content: "pub enum OrchestratorEvent { ... }".into(),
+                branch: "mainline".into(),
+                source: "local".into(),
+                indexed_at: now,
+            },
+            // Feature branch symbol — should be preferred when querying feat/code
+            SymbolDocument {
+                file_path: "src/orchestrator/mod.rs".into(),
+                repo_owner: Some("studio".into()),
+                repo_name: "sol".into(),
+                language: "rust".into(),
+                symbol_name: "generate_from_response".into(),
+                symbol_kind: "function".into(),
+                signature: "pub async fn generate_from_response(&self, req: &GenerateRequest, resp: ConversationResponse) -> Option".into(),
+                docstring: "Generate from a pre-built ConversationResponse. Caller manages conversation.".into(),
+                start_line: 125,
+                end_line: 160,
+                content: "pub async fn generate_from_response(...) { ... }".into(),
+                branch: "feat/code".into(),
+                source: "local".into(),
+                indexed_at: now,
+            },
+        ]
+    }
+
+    #[tokio::test]
+    async fn test_index_and_search_symbols() {
+        let Some(client) = os_client().await else {
+            eprintln!("Skipping: OpenSearch not available at localhost:9200");
+            return;
+        };
+
+        let index = setup_test_index(&client).await;
+        let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100);
+
+        for doc in sample_symbols() {
+            indexer.add(doc).await;
+        }
+        indexer.flush().await;
+
+        refresh_index(&client, &index).await;
+
+        // Search for "tool loop" — should find run_tool_loop
+        let results = crate::tools::code_search::search_code(
+            &client, &index,
+            r#"{"query": "tool loop"}"#,
+            Some("sol"), Some("mainline"),
+        ).await.unwrap();
+        assert!(results.contains("run_tool_loop"), "Expected run_tool_loop in results, got:\n{results}");
+
+        // Search for "ToolSide" — should find the enum
+        let results = crate::tools::code_search::search_code(
+            &client, &index,
+            r#"{"query": "ToolSide"}"#,
+            Some("sol"), None,
+        ).await.unwrap();
+        assert!(results.contains("ToolSide"), "Expected ToolSide in results, got:\n{results}");
+
+        // Search for "generate response" — should find generate()
+        let results = crate::tools::code_search::search_code(
+            &client, &index,
+            r#"{"query": "generate response"}"#,
+            Some("sol"), None,
+        ).await.unwrap();
+        assert!(results.contains("generate"), "Expected generate in results, got:\n{results}");
+
+        cleanup_index(&client, &index).await;
+    }
+
+    #[tokio::test]
+    async fn test_breadcrumb_project_outline() {
+        let Some(client) = os_client().await else {
+            eprintln!("Skipping: OpenSearch not available");
+            return;
+        };
+
+        let index = setup_test_index(&client).await;
+        let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100);
+        for doc in sample_symbols() {
+            indexer.add(doc).await;
+        }
+        indexer.flush().await;
+        refresh_index(&client, &index).await;
+
+        let result = breadcrumbs::build_breadcrumbs(
+            &client, &index, "sol", "mainline", "hi", 4000
+        ).await;
+
+        // Default outline should have project name
+        assert!(result.outline.contains("sol"), "Outline should mention project name");
+        // Short message → no adaptive expansion
+        assert!(result.relevant.is_empty(), "Short message should not trigger expansion");
+
+        cleanup_index(&client, &index).await;
+    }
+
+    #[tokio::test]
+    async fn test_breadcrumb_adaptive_expansion() {
+        let Some(client) = os_client().await else {
+            eprintln!("Skipping: OpenSearch not available");
+            return;
+        };
+
+        let index = setup_test_index(&client).await;
+        let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100);
+        for doc in sample_symbols() {
+            indexer.add(doc).await;
+        }
+        indexer.flush().await;
+        refresh_index(&client, &index).await;
+
+        let result = breadcrumbs::build_breadcrumbs(
+            &client, &index, "sol", "mainline",
+            "how does the tool loop handle client-side tools?",
+            4000,
+        ).await;
+
+        // Adaptive expansion should find relevant symbols
+        assert!(!result.relevant.is_empty(), "Substantive message should trigger expansion");
+
+        // Formatted output should contain relevant context section
+        assert!(result.formatted.contains("relevant context"), "Should have relevant context section");
+
+        // Should include tool-related symbols
+        let symbol_names: Vec<&str> = result.relevant.iter().map(|s| s.symbol_name.as_str()).collect();
+        assert!(
+            symbol_names.iter().any(|n| n.contains("tool") || n.contains("route") || n.contains("ToolSide")),
+            "Expected tool-related symbols, got: {:?}", symbol_names
+        );
+
+        cleanup_index(&client, &index).await;
+    }
+
+    #[tokio::test]
+    async fn test_breadcrumb_token_budget() {
+        let Some(client) = os_client().await else {
+            eprintln!("Skipping: OpenSearch not available");
+            return;
+        };
+
+        let index = setup_test_index(&client).await;
+        let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100);
+        for doc in sample_symbols() {
+            indexer.add(doc).await;
+        }
+        indexer.flush().await;
+        refresh_index(&client, &index).await;
+
+        // Very small budget — should only fit the outline
+        let result = breadcrumbs::build_breadcrumbs(
+            &client, &index, "sol", "mainline",
+            "how does the tool loop work?",
+            100, // tiny budget
+        ).await;
+
+        assert!(result.formatted.len() <= 100, "Should respect token budget, got {} chars", result.formatted.len());
+
+        cleanup_index(&client, &index).await;
+    }
+
+    #[tokio::test]
+    async fn test_branch_scoping() {
+        let Some(client) = os_client().await else {
+            eprintln!("Skipping: OpenSearch not available");
+            return;
+        };
+
+        let index = setup_test_index(&client).await;
+        let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100);
+        for doc in sample_symbols() {
+            indexer.add(doc).await;
+        }
+        indexer.flush().await;
+        refresh_index(&client, &index).await;
+
+        // Search on feat/code branch — should find generate_from_response (branch-specific)
+        let results = crate::tools::code_search::search_code(
+            &client, &index,
+            r#"{"query": "generate from response", "branch": "feat/code"}"#,
+            Some("sol"), None,
+        ).await.unwrap();
+        assert!(
+            results.contains("generate_from_response"),
+            "Should find branch-specific symbol, got:\n{results}"
+        );
+
+        // Should also find mainline symbols as fallback. Match the whole entry
+        // header: a bare "generate" is a substring of "generate_from_response".
+        assert!(
+            results.contains("function generate\n") || results.contains("run_tool_loop"),
+            "Should also find mainline symbols as fallback, got:\n{results}"
+        );
+
+        cleanup_index(&client, &index).await;
+    }
+
+    #[tokio::test]
+    async fn test_delete_branch_symbols() {
+        let Some(client) = os_client().await else {
+            eprintln!("Skipping: OpenSearch not available");
+            return;
+        };
+
+        let index = setup_test_index(&client).await;
+        let mut indexer = CodeIndexer::new(client.clone(), index.clone(), "".into(), 100);
+        for doc in sample_symbols() {
+            indexer.add(doc).await;
+        }
+        indexer.flush().await;
+        refresh_index(&client, &index).await;
+
+        // Delete feat/code branch symbols
+        indexer.delete_branch("sol", "feat/code").await;
+
refresh_index(&client, &index).await;
+
+        // Should no longer find generate_from_response
+        let results = crate::tools::code_search::search_code(
+            &client, &index, r#"{"query": "generate_from_response"}"#,
+            Some("sol"), Some("feat/code"),
+        ).await.unwrap();
+        assert!(!results.contains("generate_from_response"), "Deleted branch symbol still found:\n{results}");
+
+        // Mainline symbols should still exist
+        let mainline_results = crate::tools::code_search::search_code(
+            &client, &index,
+            r#"{"query": "generate"}"#,
+            Some("sol"), Some("mainline"),
+        ).await.unwrap();
+        assert!(mainline_results.contains("generate"), "Mainline symbols should survive branch deletion");
+
+        cleanup_index(&client, &index).await;
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 77466a3..b7ebacf 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -324,6 +324,21 @@ async fn main() -> anyhow::Result<()> {
         store: store.clone(),
         mistral: state.mistral.clone(),
         matrix: Some(matrix_client.clone()),
+        opensearch: {
+            // Rebuild a fresh OpenSearch client (os_client was moved into AppState).
+            // A bad URL or transport error only disables breadcrumbs rather
+            // than panicking during startup.
+            url::Url::parse(&config.opensearch.url).ok().and_then(|u| {
+                let pool = opensearch::http::transport::SingleNodeConnectionPool::new(u);
+                match opensearch::http::transport::TransportBuilder::new(pool).build() {
+                    Ok(transport) => Some(opensearch::OpenSearch::new(transport)),
+                    Err(e) => {
+                        tracing::warn!("OpenSearch unavailable for gRPC breadcrumbs: {e}");
+                        None
+                    }
+                }
+            })
+        },
         system_prompt: system_prompt_text.clone(),
         orchestrator_agent_id: orchestrator_id,
         orchestrator: Some(orch),
diff --git a/src/tools/code_search.rs b/src/tools/code_search.rs
new file mode 100644
index 0000000..6523746
--- /dev/null
+++ b/src/tools/code_search.rs
@@ -0,0 +1,172 @@
+//! search_code tool — semantic + keyword search over the code index.
+
+use opensearch::OpenSearch;
+use serde::Deserialize;
+use tracing::warn;
+
+/// Arguments of the `search_code` tool call, deserialized from its JSON string.
+#[derive(Debug, Deserialize)]
+struct SearchCodeArgs {
+    /// Free-text query matched against content, signature, docstring and symbol name.
+    query: String,
+    /// Restrict hits to one language.
+    #[serde(default)]
+    language: Option<String>,
+    /// Repository name; takes precedence over the caller's default.
+    #[serde(default)]
+    repo: Option<String>,
+    /// Branch to search; takes precedence over the caller's default.
+    #[serde(default)]
+    branch: Option<String>,
+    /// Accepted but not used yet (see the neural-search TODO in `search_code`).
+    #[serde(default)]
+    semantic: Option<bool>,
+    /// Maximum number of hits to return; 10 when omitted.
+    #[serde(default)]
+    limit: Option<usize>,
+}
+
+/// Search the code index and render the hits as a plain-text list.
+///
+/// `arguments` is the tool-call JSON (see `SearchCodeArgs`). `default_repo` and
+/// `default_branch` are used only when the arguments leave `repo` / `branch` unset.
+///
+/// # Errors
+///
+/// Fails when `arguments` does not parse, when the request cannot be sent, when
+/// OpenSearch answers with a non-success status, or when the body is not JSON.
+pub async fn search_code(
+    client: &OpenSearch,
+    index: &str,
+    arguments: &str,
+    default_repo: Option<&str>,
+    default_branch: Option<&str>,
+) -> anyhow::Result<String> {
+    let args: SearchCodeArgs = serde_json::from_str(arguments)?;
+    let limit = args.limit.unwrap_or(10);
+    let repo = args.repo.as_deref().or(default_repo);
+    let branch = args.branch.as_deref().or(default_branch);
+
+    let mut filters = Vec::new();
+    // Scoring-only clauses: they reorder hits but never add or remove any.
+    let mut preferences = Vec::new();
+    if let Some(repo) = repo {
+        filters.push(serde_json::json!({ "term": { "repo_name": repo } }));
+    }
+    if let Some(lang) = &args.language {
+        filters.push(serde_json::json!({ "term": { "language": lang } }));
+    }
+    if let Some(branch) = branch {
+        // Admit the requested branch, with the default branches as fallback.
+        filters.push(serde_json::json!({
+            "bool": { "should": [
+                { "term": { "branch": branch } },
+                { "term": { "branch": "mainline" } },
+                { "term": { "branch": "main" } }
+            ]}
+        }));
+        // Filter clauses are not scored, so the preference for the requested
+        // branch has to be a separate scoring clause.
+        preferences.push(serde_json::json!({
+            "term": { "branch": { "value": branch, "boost": 2.0 } }
+        }));
+    }
+
+    // `must` decides which documents are hits (at least one text clause has to
+    // match); the outer `should` only adjusts their ranking.
+    let query = serde_json::json!({
+        "size": limit,
+        "_source": ["file_path", "symbol_name", "symbol_kind", "signature", "docstring", "start_line", "end_line", "language", "branch"],
+        "query": {
+            "bool": {
+                "must": [{
+                    "bool": {
+                        "should": [
+                            { "match": { "content": { "query": &args.query, "boost": 1.0 } } },
+                            { "match": { "signature": { "query": &args.query, "boost": 2.0 } } },
+                            { "match": { "docstring": { "query": &args.query, "boost": 1.5 } } },
+                            { "match": { "symbol_name": { "query": &args.query, "boost": 3.0 } } }
+                        ],
+                        "minimum_should_match": 1
+                    }
+                }],
+                "should": preferences,
+                "filter": filters
+            }
+        }
+    });
+
+    // TODO: add neural search component when kNN is available
+    // The hybrid pipeline will combine BM25 + neural for best results.
+
+    let response = client
+        .search(opensearch::SearchParts::Index(&[index]))
+        .body(query)
+        .send()
+        .await?
+        // Surface HTTP errors instead of reporting them as "no results".
+        .error_for_status_code()?;
+
+    let body: serde_json::Value = response.json().await?;
+
+    let hits = match body["hits"]["hits"].as_array() {
+        Some(hits) if !hits.is_empty() => hits,
+        Some(_) => return Ok("No code results found.".into()),
+        None => {
+            warn!("search_code: response from index {index} has no hits array");
+            return Ok("No code results found.".into());
+        }
+    };
+
+    let mut results = Vec::with_capacity(hits.len());
+    for hit in hits {
+        let src = &hit["_source"];
+        let file_path = src["file_path"].as_str().unwrap_or("?");
+        let name = src["symbol_name"].as_str().unwrap_or("?");
+        let kind = src["symbol_kind"].as_str().unwrap_or("?");
+        let sig = src["signature"].as_str().unwrap_or("");
+        let doc = src["docstring"].as_str().unwrap_or("");
+        let start = src["start_line"].as_u64().unwrap_or(0);
+        let end = src["end_line"].as_u64().unwrap_or(0);
+        let lang = src["language"].as_str().unwrap_or("?");
+
+        let mut entry = format!("{file_path}:{start}-{end} ({lang}) {kind} {name}");
+        if !sig.is_empty() {
+            entry.push_str(&format!("\n {sig}"));
+        }
+        if !doc.is_empty() {
+            let first_line = doc.lines().next().unwrap_or("");
+            entry.push_str(&format!("\n /// {first_line}"));
+        }
+        results.push(entry);
+    }
+
+    Ok(results.join("\n\n"))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_parse_search_code_args() {
+        let args: SearchCodeArgs = serde_json::from_str(r#"{"query": "tool loop"}"#).unwrap();
+        assert_eq!(args.query, "tool loop");
+        assert!(args.language.is_none());
+        assert!(args.repo.is_none());
+        assert!(args.limit.is_none());
+    }
+
+    #[test]
+    fn test_parse_search_code_args_full() {
+        let args: SearchCodeArgs = serde_json::from_str(
+            r#"{"query": "auth", "language": "rust", "repo": "sol", "branch": "feat/code", "semantic": true, "limit": 5}"#
+        ).unwrap();
+        assert_eq!(args.query, "auth");
+        assert_eq!(args.language.as_deref(), Some("rust"));
+        assert_eq!(args.repo.as_deref(), Some("sol"));
+        assert_eq!(args.branch.as_deref(), Some("feat/code"));
+        assert_eq!(args.semantic, Some(true));
+        assert_eq!(args.limit, Some(5));
+    }
+}
diff --git a/src/tools/mod.rs b/src/tools/mod.rs
index 82c61bd..aa5bb21 100644
--- a/src/tools/mod.rs
+++ b/src/tools/mod.rs
@@ -1,4 +1,5 @@
 pub mod bridge;
+pub mod code_search;
 pub mod devtools;
 pub mod identity;
 pub mod research;
@@ -374,6 +375,13 @@ impl ToolRegistry {
                     anyhow::bail!("Identity (Kratos) integration not configured")
                 }
             }
+            "search_code" => {
+                if let Some(ref os) = self.opensearch {
+                    code_search::search_code(os, "sol_code", arguments, None, None).await
+                } else {
+                    anyhow::bail!("Code search not available (OpenSearch not configured)")
+                }
+            }
             "search_web" => {
                 if let Some(ref searxng) = self.config.services.searxng {
                     web_search::search(&searxng.url, arguments).await