diff --git a/src/orchestrator/engine.rs b/src/orchestrator/engine.rs index 4598885..719963d 100644 --- a/src/orchestrator/engine.rs +++ b/src/orchestrator/engine.rs @@ -1,206 +1,50 @@ //! The unified response generation engine. //! //! Single implementation of the Mistral Conversations API tool loop. -//! Emits `OrchestratorEvent`s instead of calling Matrix/gRPC directly. -//! Phase 2: replaces `responder.generate_response_conversations()`. +//! Emits `OrchestratorEvent`s — no transport knowledge. -use std::sync::Arc; use std::time::Duration; use mistralai_client::v1::conversations::{ - ConversationEntry, ConversationInput, ConversationResponse, FunctionResultEntry, + ConversationEntry, ConversationInput, ConversationResponse, + AppendConversationRequest, FunctionResultEntry, }; -use rand::Rng; -use tokio::sync::broadcast; use tracing::{debug, error, info, warn}; use super::event::*; use super::tool_dispatch; use super::Orchestrator; -use crate::brain::personality::Personality; -use crate::context::ResponseContext; -use crate::conversations::ConversationRegistry; -use crate::time_context::TimeContext; -/// Strip "Sol: " or "sol: " prefix that models sometimes prepend. -fn strip_sol_prefix(text: &str) -> String { - let trimmed = text.trim(); - if trimmed.starts_with("Sol: ") || trimmed.starts_with("sol: ") { - trimmed[5..].to_string() - } else if trimmed.starts_with("Sol:\n") || trimmed.starts_with("sol:\n") { - trimmed[4..].to_string() - } else { - trimmed.to_string() - } -} - -/// Generate a chat response through the Conversations API. -/// This is the unified path that replaces both the responder's conversations -/// method and the gRPC session's inline tool loop. -pub async fn generate_response( +/// Run the Mistral tool iteration loop on a conversation response. +/// Emits events for every state transition. Returns the final text + usage. +pub async fn run_tool_loop( orchestrator: &Orchestrator, - personality: &Personality, - request: &ChatRequest, - response_ctx: &ResponseContext, - conversation_registry: &ConversationRegistry, -) -> Option { - let request_id = &request.request_id; - - // Emit start - orchestrator.emit(OrchestratorEvent::ResponseStarted { - request_id: request_id.clone(), - mode: ResponseMode::Chat { - room_id: request.room_id.clone(), - is_spontaneous: request.is_spontaneous, - use_thread: request.use_thread, - trigger_event_id: request.trigger_event_id.clone(), - }, - }); - - // Apply response delay - if !orchestrator.config.behavior.instant_responses { - let delay = if request.is_spontaneous { - rand::thread_rng().gen_range( - orchestrator.config.behavior.spontaneous_delay_min_ms - ..=orchestrator.config.behavior.spontaneous_delay_max_ms, - ) - } else { - rand::thread_rng().gen_range( - orchestrator.config.behavior.response_delay_min_ms - ..=orchestrator.config.behavior.response_delay_max_ms, - ) - }; - tokio::time::sleep(Duration::from_millis(delay)).await; - } - - orchestrator.emit(OrchestratorEvent::Thinking { - request_id: request_id.clone(), - }); - - // Memory query - let memory_notes = load_memory_notes(orchestrator, response_ctx, &request.trigger_body).await; - - // Build context header - let tc = TimeContext::now(); - let mut context_header = format!( - "{}\n[room: {} ({})]", - tc.message_line(), - request.room_name, - request.room_id, - ); - - if let Some(ref notes) = memory_notes { - context_header.push('\n'); - context_header.push_str(notes); - } - - let user_msg = if request.is_dm { - request.trigger_body.clone() - } else { - format!("<{}> {}", response_ctx.matrix_user_id, request.trigger_body) - }; - - let input_text = format!("{context_header}\n{user_msg}"); - let input = ConversationInput::Text(input_text); - - // Send through conversation registry - let response = match conversation_registry - .send_message( - &request.room_id, - input, - request.is_dm, - &orchestrator.mistral, - request.context_hint.as_deref(), - ) - .await - { - Ok(r) => r, - Err(e) => { - error!("Conversation API failed: {e}"); - orchestrator.emit(OrchestratorEvent::ResponseFailed { - request_id: request_id.clone(), - error: e.clone(), - }); - return None; - } - }; - - // Tool loop - let result = run_tool_loop( - orchestrator, - request_id, - response, - response_ctx, - conversation_registry, - &request.room_id, - request.is_dm, - ) - .await; - - match result { - Some(text) => { - let text = strip_sol_prefix(&text); - if text.is_empty() { - orchestrator.emit(OrchestratorEvent::ResponseFailed { - request_id: request_id.clone(), - error: "Empty response from model".into(), - }); - return None; - } - - orchestrator.emit(OrchestratorEvent::ResponseReady { - request_id: request_id.clone(), - text: text.clone(), - prompt_tokens: 0, // TODO: extract from response - completion_tokens: 0, - tool_iterations: 0, - }); - - // Schedule memory extraction - orchestrator.emit(OrchestratorEvent::MemoryExtractionScheduled { - request_id: request_id.clone(), - user_msg: request.trigger_body.clone(), - response: text.clone(), - }); - - Some(text) - } - None => { - orchestrator.emit(OrchestratorEvent::ResponseFailed { - request_id: request_id.clone(), - error: "No response from model".into(), - }); - None - } - } -} - -/// The unified tool iteration loop. -/// Emits tool events and executes server-side tools. -/// Client-side tools are dispatched via the pending_client_tools oneshot map. -async fn run_tool_loop( - orchestrator: &Orchestrator, - request_id: &RequestId, + request: &GenerateRequest, initial_response: ConversationResponse, - response_ctx: &ResponseContext, - conversation_registry: &ConversationRegistry, - room_id: &str, - is_dm: bool, -) -> Option { +) -> Option<(String, TokenUsage)> { + let request_id = &request.request_id; let function_calls = initial_response.function_calls(); - // No tool calls — return the text directly + // No tool calls — return text directly if function_calls.is_empty() { - return initial_response.assistant_text(); + return initial_response.assistant_text().map(|text| { + (text, TokenUsage { + prompt_tokens: initial_response.usage.prompt_tokens, + completion_tokens: initial_response.usage.completion_tokens, + }) + }); } - orchestrator.emit(OrchestratorEvent::AgentProgressStarted { - request_id: request_id.clone(), - }); - + let conv_id = initial_response.conversation_id.clone(); let max_iterations = orchestrator.config.mistral.max_tool_iterations; let mut current_response = initial_response; + let tool_ctx = ToolContext { + user_id: request.user_id.clone(), + scope_key: request.conversation_key.clone(), + is_direct: request.is_direct, + }; + for iteration in 0..max_iterations { let calls = current_response.function_calls(); if calls.is_empty() { @@ -221,7 +65,7 @@ async fn run_tool_loop( side: side.clone(), }); - orchestrator.emit(OrchestratorEvent::ToolExecutionStarted { + orchestrator.emit(OrchestratorEvent::ToolStarted { request_id: request_id.clone(), call_id: call_id.into(), name: fc.name.clone(), @@ -229,31 +73,14 @@ async fn run_tool_loop( let result_str = match side { ToolSide::Server => { - // Execute server-side tool - let result = if fc.name == "research" { - // Research needs special handling (room + event context) - // For now, use the standard execute path - orchestrator - .tools - .execute(&fc.name, &fc.arguments, response_ctx) - .await - } else { - orchestrator - .tools - .execute(&fc.name, &fc.arguments, response_ctx) - .await - }; + let result = orchestrator + .tools + .execute_with_context(&fc.name, &fc.arguments, &tool_ctx) + .await; match result { Ok(s) => { - let preview: String = s.chars().take(500).collect(); - info!( - tool = fc.name.as_str(), - id = call_id, - result_len = s.len(), - result_preview = preview.as_str(), - "Tool result" - ); + info!(tool = fc.name.as_str(), id = call_id, result_len = s.len(), "Tool result"); s } Err(e) => { @@ -263,7 +90,7 @@ async fn run_tool_loop( } } ToolSide::Client => { - // Park on oneshot — gRPC bridge will deliver the result + // Park on oneshot — transport bridge delivers the result let rx = orchestrator.register_pending_tool(call_id).await; match tokio::time::timeout(Duration::from_secs(300), rx).await { Ok(Ok(payload)) => { @@ -281,22 +108,14 @@ async fn run_tool_loop( let success = !result_str.starts_with("Error:"); - orchestrator.emit(OrchestratorEvent::ToolExecutionCompleted { + orchestrator.emit(OrchestratorEvent::ToolCompleted { request_id: request_id.clone(), call_id: call_id.into(), name: fc.name.clone(), - result: result_str.chars().take(200).collect(), + result_preview: result_str.chars().take(200).collect(), success, }); - orchestrator.emit(OrchestratorEvent::AgentProgressStep { - request_id: request_id.clone(), - summary: crate::agent_ux::AgentProgress::format_tool_call( - &fc.name, - &fc.arguments, - ), - }); - result_entries.push(ConversationEntry::FunctionResult(FunctionResultEntry { tool_call_id: call_id.to_string(), result: result_str, @@ -307,17 +126,24 @@ async fn run_tool_loop( })); } - // Send function results back to conversation - current_response = match conversation_registry - .send_function_result(room_id, result_entries, &orchestrator.mistral) + // Send results back to Mistral conversation + let req = AppendConversationRequest { + inputs: ConversationInput::Entries(result_entries), + completion_args: None, + handoff_execution: None, + store: Some(true), + tool_confirmations: None, + stream: false, + }; + + current_response = match orchestrator + .mistral + .append_conversation_async(&conv_id, &req) .await { Ok(r) => r, Err(e) => { - error!("Failed to send function results: {e}"); - orchestrator.emit(OrchestratorEvent::AgentProgressDone { - request_id: request_id.clone(), - }); + error!("Failed to send function results: {}", e.message); return None; } }; @@ -325,23 +151,10 @@ async fn run_tool_loop( debug!(iteration, "Tool iteration complete"); } - orchestrator.emit(OrchestratorEvent::AgentProgressDone { - request_id: request_id.clone(), - }); - - current_response.assistant_text() -} - -/// Load memory notes relevant to the trigger message. -/// TODO (Phase 4): move the full memory::store query logic here -/// when the Responder is dissolved. For now returns None — the Matrix -/// bridge path still uses the responder which has memory loading. -async fn load_memory_notes( - _orchestrator: &Orchestrator, - _ctx: &ResponseContext, - _trigger_body: &str, -) -> Option { - // Memory loading is not yet migrated to the orchestrator. - // The responder's load_memory_notes() still handles this for now. - None + current_response.assistant_text().map(|text| { + (text, TokenUsage { + prompt_tokens: current_response.usage.prompt_tokens, + completion_tokens: current_response.usage.completion_tokens, + }) + }) } diff --git a/src/orchestrator/mod.rs b/src/orchestrator/mod.rs index 91fb28a..3d9fc2e 100644 --- a/src/orchestrator/mod.rs +++ b/src/orchestrator/mod.rs @@ -1,10 +1,12 @@ -//! Event-driven orchestrator — the single response generation pipeline. +//! Event-driven orchestrator — Sol's transport-agnostic response pipeline. //! -//! The orchestrator owns the Mistral tool loop and emits events through -//! a `tokio::broadcast` channel. Transport bridges (Matrix, gRPC, etc.) -//! subscribe to these events and translate them to their protocol. +//! The orchestrator receives a `GenerateRequest`, runs the Mistral +//! conversation + tool loop, and emits `OrchestratorEvent`s through a +//! `tokio::broadcast` channel. It has zero knowledge of Matrix, gRPC, +//! or any specific transport. //! -//! Phase 1: types + channel wiring only. No behavior change. +//! Transport bridges subscribe externally via `subscribe()` and translate +//! events to their protocol. pub mod engine; pub mod event; @@ -20,79 +22,152 @@ pub use event::*; use crate::config::Config; use crate::conversations::ConversationRegistry; -use crate::persistence::Store; use crate::tools::ToolRegistry; const EVENT_CHANNEL_CAPACITY: usize = 256; -/// Result payload from a client-side tool execution. -#[derive(Debug)] -pub struct ToolResultPayload { - pub text: String, - pub is_error: bool, -} - /// The orchestrator — Sol's response generation pipeline. /// -/// Owns the event broadcast channel. Bridges subscribe via `subscribe()`. -/// Phase 2+ will add `generate_chat_response()` and `generate_code_response()`. +/// Owns the event broadcast channel. Transport bridges subscribe via `subscribe()`. +/// Call `generate()` or `generate_from_response()` to run the pipeline. pub struct Orchestrator { pub config: Arc, pub tools: Arc, - pub store: Arc, pub mistral: Arc, - pub conversation_registry: Arc, + pub conversations: Arc, pub system_prompt: String, /// Broadcast sender — all orchestration events go here. event_tx: broadcast::Sender, - /// Pending client-side tool calls awaiting results from gRPC clients. - /// Key: call_id, Value: oneshot sender to unblock the engine. + /// Pending client-side tool calls awaiting results from external sources. pending_client_tools: Arc>>>, } impl Orchestrator { - /// Create a new orchestrator. Returns the orchestrator and an initial - /// event receiver (for the first subscriber, typically the Matrix bridge). pub fn new( config: Arc, tools: Arc, - store: Arc, mistral: Arc, - conversation_registry: Arc, + conversations: Arc, system_prompt: String, ) -> Self { let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY); - info!("Orchestrator initialized (event channel capacity: {EVENT_CHANNEL_CAPACITY})"); Self { config, tools, - store, mistral, - conversation_registry, + conversations, system_prompt, event_tx, pending_client_tools: Arc::new(Mutex::new(HashMap::new())), } } - /// Subscribe to the event stream. Each subscriber gets its own receiver - /// that independently tracks position in the broadcast buffer. + /// Subscribe to the event stream. pub fn subscribe(&self) -> broadcast::Receiver { self.event_tx.subscribe() } /// Emit an event to all subscribers. pub fn emit(&self, event: OrchestratorEvent) { - // Ignore send errors (no subscribers is fine during startup). let _ = self.event_tx.send(event); } - /// Submit a tool result from an external source (e.g., gRPC client). - /// Unblocks the engine's tool loop for the matching call_id. + /// Generate a response using the ConversationRegistry. + /// Creates or appends to a conversation keyed by `request.conversation_key`. + pub async fn generate(&self, request: &GenerateRequest) -> Option { + self.emit(OrchestratorEvent::Started { + request_id: request.request_id.clone(), + metadata: request.metadata.clone(), + }); + + self.emit(OrchestratorEvent::Thinking { + request_id: request.request_id.clone(), + }); + + let input = mistralai_client::v1::conversations::ConversationInput::Text( + request.text.clone(), + ); + + let response = match self.conversations + .send_message( + &request.conversation_key, + input, + request.is_direct, + &self.mistral, + None, + ) + .await + { + Ok(r) => r, + Err(e) => { + self.emit(OrchestratorEvent::Failed { + request_id: request.request_id.clone(), + error: e.clone(), + }); + return None; + } + }; + + self.run_and_emit(request, response).await + } + + /// Generate a response from a pre-built ConversationResponse. + /// The caller already created/appended the conversation externally. + /// The orchestrator only runs the tool loop and emits events. + pub async fn generate_from_response( + &self, + request: &GenerateRequest, + response: mistralai_client::v1::conversations::ConversationResponse, + ) -> Option { + self.emit(OrchestratorEvent::Started { + request_id: request.request_id.clone(), + metadata: request.metadata.clone(), + }); + + self.emit(OrchestratorEvent::Thinking { + request_id: request.request_id.clone(), + }); + + self.run_and_emit(request, response).await + } + + /// Run the tool loop and emit Done/Failed events. + async fn run_and_emit( + &self, + request: &GenerateRequest, + response: mistralai_client::v1::conversations::ConversationResponse, + ) -> Option { + let result = engine::run_tool_loop(self, request, response).await; + + match result { + Some((text, usage)) => { + info!( + prompt_tokens = usage.prompt_tokens, + completion_tokens = usage.completion_tokens, + "Response ready" + ); + self.emit(OrchestratorEvent::Done { + request_id: request.request_id.clone(), + text: text.clone(), + usage, + }); + Some(text) + } + None => { + self.emit(OrchestratorEvent::Failed { + request_id: request.request_id.clone(), + error: "No response from model".into(), + }); + None + } + } + } + + /// Submit a tool result from an external source. pub async fn submit_tool_result( &self, call_id: &str, @@ -112,8 +187,7 @@ impl Orchestrator { Ok(()) } - /// Register a pending client-side tool call. Returns a oneshot receiver - /// that the engine awaits for the result. + /// Register a pending client-side tool call. pub async fn register_pending_tool( &self, call_id: &str, @@ -145,35 +219,14 @@ mod tests { assert_eq!(received.request_id(), &id); } - #[tokio::test] - async fn test_multiple_subscribers() { - let (event_tx, _) = broadcast::channel(16); - let mut rx1 = event_tx.subscribe(); - let mut rx2 = event_tx.subscribe(); - - let id = RequestId::new(); - let _ = event_tx.send(OrchestratorEvent::Thinking { - request_id: id.clone(), - }); - - let r1 = rx1.recv().await.unwrap(); - let r2 = rx2.recv().await.unwrap(); - assert_eq!(r1.request_id(), &id); - assert_eq!(r2.request_id(), &id); - } - #[tokio::test] async fn test_submit_tool_result() { let pending: Arc>>> = Arc::new(Mutex::new(HashMap::new())); let (tx, rx) = oneshot::channel(); - pending - .lock() - .await - .insert("call-1".into(), tx); + pending.lock().await.insert("call-1".into(), tx); - // Simulate submitting result let sender = pending.lock().await.remove("call-1").unwrap(); sender .send(ToolResultPayload {