From ec4fde7b97bac18f48eae57801e5f0c484b62ae0 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Mon, 23 Mar 2026 17:30:36 +0000 Subject: [PATCH] =?UTF-8?q?feat(orchestrator):=20Phase=201=20=E2=80=94=20e?= =?UTF-8?q?vent=20types=20+=20broadcast=20channel=20foundation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces the orchestrator module with: - OrchestratorEvent enum: 11 event variants covering lifecycle, tools, progress, and side effects - RequestId (UUID per generation), ResponseMode (Chat/Code), ToolSide - ChatRequest/CodeRequest structs for transport-agnostic request input - Orchestrator struct with tokio::broadcast channel (capacity 256) - subscribe() for transport bridges, emit() for the engine - Client-side tool dispatch: pending_client_tools map with oneshot channels - submit_tool_result() to unblock engine from gRPC client responses Additive only — no behavior change. Existing responder + gRPC session paths are untouched. Phase 2 will migrate the Conversations API path. --- src/agents/definitions.rs | 2 +- src/main.rs | 1 + src/orchestrator/event.rs | 288 ++++++++++++++++++++++++++++++++++++++ src/orchestrator/mod.rs | 187 +++++++++++++++++++++++++ 4 files changed, 477 insertions(+), 1 deletion(-) create mode 100644 src/orchestrator/event.rs create mode 100644 src/orchestrator/mod.rs diff --git a/src/agents/definitions.rs b/src/agents/definitions.rs index e50cb76..cdba63e 100644 --- a/src/agents/definitions.rs +++ b/src/agents/definitions.rs @@ -171,7 +171,7 @@ mod tests { #[test] fn test_orchestrator_request() { - let req = orchestrator_request("test prompt", "mistral-medium-latest", vec![], &[]); + let req = orchestrator_request("test prompt", "mistral-medium-latest", vec![], &[], "sol-orchestrator"); assert_eq!(req.name, "sol-orchestrator"); assert_eq!(req.model, "mistral-medium-latest"); assert!(req.instructions.unwrap().contains("test prompt")); diff --git a/src/main.rs b/src/main.rs index f70b214..f324f9a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ mod matrix_utils; mod memory; mod persistence; mod grpc; +mod orchestrator; mod sdk; mod sync; mod time_context; diff --git a/src/orchestrator/event.rs b/src/orchestrator/event.rs new file mode 100644 index 0000000..cbaa46f --- /dev/null +++ b/src/orchestrator/event.rs @@ -0,0 +1,288 @@ +//! Orchestrator events — the contract between Sol's response pipeline +//! and its presentation layers (Matrix, gRPC, future web/API). +//! +//! Every state transition in the response lifecycle is an event. +//! Transport bridges subscribe to these and translate to their protocol. + +use std::fmt; + +/// Unique identifier for a response generation request. +/// One per user message → response cycle (including all tool iterations). +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct RequestId(pub String); + +impl RequestId { + pub fn new() -> Self { + Self(uuid::Uuid::new_v4().to_string()) + } +} + +impl fmt::Display for RequestId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +/// Which transport initiated this request. +#[derive(Debug, Clone)] +pub enum ResponseMode { + /// Standard Matrix chat (DM, room, thread). + Chat { + room_id: String, + is_spontaneous: bool, + use_thread: bool, + trigger_event_id: String, + }, + /// Coding session via gRPC (`sunbeam code`). + Code { + session_id: String, + room_id: String, + }, +} + +/// Whether a tool executes on the server or on a connected client. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ToolSide { + Server, + Client, +} + +/// An event emitted by the orchestrator during response generation. +/// Transport bridges subscribe to these and translate to their protocol. +#[derive(Debug, Clone)] +pub enum OrchestratorEvent { + // ── Lifecycle ────────────────────────────────────────────── + + /// Response generation has begun. + ResponseStarted { + request_id: RequestId, + mode: ResponseMode, + }, + + /// The model is generating (typing indicator equivalent). + Thinking { + request_id: RequestId, + }, + + /// Final response text is ready. + ResponseReady { + request_id: RequestId, + text: String, + prompt_tokens: u32, + completion_tokens: u32, + tool_iterations: u32, + }, + + /// Response generation failed. + ResponseFailed { + request_id: RequestId, + error: String, + }, + + // ── Tool execution ───────────────────────────────────────── + + /// A tool call was detected in the model's output. + ToolCallDetected { + request_id: RequestId, + call_id: String, + name: String, + args: String, + side: ToolSide, + }, + + /// A tool started executing. + ToolExecutionStarted { + request_id: RequestId, + call_id: String, + name: String, + }, + + /// A tool finished executing. + ToolExecutionCompleted { + request_id: RequestId, + call_id: String, + name: String, + result: String, + success: bool, + }, + + // ── Progress (presentation hints) ────────────────────────── + + /// Agentic progress started (first tool call in a response). + AgentProgressStarted { + request_id: RequestId, + }, + + /// A progress step (tool call summary for display). + AgentProgressStep { + request_id: RequestId, + summary: String, + }, + + /// Agentic progress complete (all tool iterations done). + AgentProgressDone { + request_id: RequestId, + }, + + // ── Side effects ─────────────────────────────────────────── + + /// Memory extraction should run for this exchange. + MemoryExtractionScheduled { + request_id: RequestId, + user_msg: String, + response: String, + }, +} + +impl OrchestratorEvent { + /// Get the request ID for any event variant. + pub fn request_id(&self) -> &RequestId { + match self { + Self::ResponseStarted { request_id, .. } + | Self::Thinking { request_id } + | Self::ResponseReady { request_id, .. } + | Self::ResponseFailed { request_id, .. } + | Self::ToolCallDetected { request_id, .. } + | Self::ToolExecutionStarted { request_id, .. } + | Self::ToolExecutionCompleted { request_id, .. } + | Self::AgentProgressStarted { request_id } + | Self::AgentProgressStep { request_id, .. } + | Self::AgentProgressDone { request_id } + | Self::MemoryExtractionScheduled { request_id, .. } => request_id, + } + } +} + +/// Request to generate a chat response (Matrix path). +#[derive(Debug, Clone)] +pub struct ChatRequest { + pub request_id: RequestId, + pub trigger_body: String, + pub trigger_sender: String, + pub room_id: String, + pub room_name: String, + pub is_dm: bool, + pub is_spontaneous: bool, + pub use_thread: bool, + pub trigger_event_id: String, + pub image_data_uri: Option, + pub context_hint: Option, +} + +/// Request to generate a coding response (gRPC path). +#[derive(Debug, Clone)] +pub struct CodeRequest { + pub request_id: RequestId, + pub session_id: String, + pub text: String, + pub project_name: String, + pub project_path: String, + pub prompt_md: String, + pub model: String, + pub room_id: String, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_request_id_unique() { + let a = RequestId::new(); + let b = RequestId::new(); + assert_ne!(a, b); + } + + #[test] + fn test_event_request_id_accessor() { + let id = RequestId::new(); + let event = OrchestratorEvent::Thinking { + request_id: id.clone(), + }; + assert_eq!(event.request_id(), &id); + } + + #[test] + fn test_response_mode_variants() { + let chat = ResponseMode::Chat { + room_id: "!room:test".into(), + is_spontaneous: false, + use_thread: true, + trigger_event_id: "$event".into(), + }; + assert!(matches!(chat, ResponseMode::Chat { .. })); + + let code = ResponseMode::Code { + session_id: "sess-1".into(), + room_id: "!room:test".into(), + }; + assert!(matches!(code, ResponseMode::Code { .. })); + } + + #[test] + fn test_tool_side() { + assert_ne!(ToolSide::Server, ToolSide::Client); + } + + #[test] + fn test_all_event_variants_have_request_id() { + let id = RequestId::new(); + let events = vec![ + OrchestratorEvent::ResponseStarted { + request_id: id.clone(), + mode: ResponseMode::Chat { + room_id: "!r:t".into(), + is_spontaneous: false, + use_thread: false, + trigger_event_id: "$e".into(), + }, + }, + OrchestratorEvent::Thinking { request_id: id.clone() }, + OrchestratorEvent::ResponseReady { + request_id: id.clone(), + text: "hi".into(), + prompt_tokens: 0, + completion_tokens: 0, + tool_iterations: 0, + }, + OrchestratorEvent::ResponseFailed { + request_id: id.clone(), + error: "err".into(), + }, + OrchestratorEvent::ToolCallDetected { + request_id: id.clone(), + call_id: "c1".into(), + name: "bash".into(), + args: "{}".into(), + side: ToolSide::Server, + }, + OrchestratorEvent::ToolExecutionStarted { + request_id: id.clone(), + call_id: "c1".into(), + name: "bash".into(), + }, + OrchestratorEvent::ToolExecutionCompleted { + request_id: id.clone(), + call_id: "c1".into(), + name: "bash".into(), + result: "ok".into(), + success: true, + }, + OrchestratorEvent::AgentProgressStarted { request_id: id.clone() }, + OrchestratorEvent::AgentProgressStep { + request_id: id.clone(), + summary: "step".into(), + }, + OrchestratorEvent::AgentProgressDone { request_id: id.clone() }, + OrchestratorEvent::MemoryExtractionScheduled { + request_id: id.clone(), + user_msg: "q".into(), + response: "a".into(), + }, + ]; + + for event in &events { + assert_eq!(event.request_id(), &id); + } + } +} diff --git a/src/orchestrator/mod.rs b/src/orchestrator/mod.rs new file mode 100644 index 0000000..bf636c2 --- /dev/null +++ b/src/orchestrator/mod.rs @@ -0,0 +1,187 @@ +//! Event-driven orchestrator — the single response generation pipeline. +//! +//! The orchestrator owns the Mistral tool loop and emits events through +//! a `tokio::broadcast` channel. Transport bridges (Matrix, gRPC, etc.) +//! subscribe to these events and translate them to their protocol. +//! +//! Phase 1: types + channel wiring only. No behavior change. + +pub mod event; + +use std::collections::HashMap; +use std::sync::Arc; + +use tokio::sync::{broadcast, oneshot, Mutex}; +use tracing::info; + +pub use event::*; + +use crate::config::Config; +use crate::conversations::ConversationRegistry; +use crate::persistence::Store; +use crate::tools::ToolRegistry; + +const EVENT_CHANNEL_CAPACITY: usize = 256; + +/// Result payload from a client-side tool execution. +#[derive(Debug)] +pub struct ToolResultPayload { + pub text: String, + pub is_error: bool, +} + +/// The orchestrator — Sol's response generation pipeline. +/// +/// Owns the event broadcast channel. Bridges subscribe via `subscribe()`. +/// Phase 2+ will add `generate_chat_response()` and `generate_code_response()`. +pub struct Orchestrator { + pub config: Arc, + pub tools: Arc, + pub store: Arc, + pub mistral: Arc, + pub conversation_registry: Arc, + pub system_prompt: String, + + /// Broadcast sender — all orchestration events go here. + event_tx: broadcast::Sender, + + /// Pending client-side tool calls awaiting results from gRPC clients. + /// Key: call_id, Value: oneshot sender to unblock the engine. + pending_client_tools: Arc>>>, +} + +impl Orchestrator { + /// Create a new orchestrator. Returns the orchestrator and an initial + /// event receiver (for the first subscriber, typically the Matrix bridge). + pub fn new( + config: Arc, + tools: Arc, + store: Arc, + mistral: Arc, + conversation_registry: Arc, + system_prompt: String, + ) -> Self { + let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY); + + info!("Orchestrator initialized (event channel capacity: {EVENT_CHANNEL_CAPACITY})"); + + Self { + config, + tools, + store, + mistral, + conversation_registry, + system_prompt, + event_tx, + pending_client_tools: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Subscribe to the event stream. Each subscriber gets its own receiver + /// that independently tracks position in the broadcast buffer. + pub fn subscribe(&self) -> broadcast::Receiver { + self.event_tx.subscribe() + } + + /// Emit an event to all subscribers. + pub fn emit(&self, event: OrchestratorEvent) { + // Ignore send errors (no subscribers is fine during startup). + let _ = self.event_tx.send(event); + } + + /// Submit a tool result from an external source (e.g., gRPC client). + /// Unblocks the engine's tool loop for the matching call_id. + pub async fn submit_tool_result( + &self, + call_id: &str, + result: ToolResultPayload, + ) -> anyhow::Result<()> { + let sender = self + .pending_client_tools + .lock() + .await + .remove(call_id) + .ok_or_else(|| anyhow::anyhow!("No pending tool call with id {call_id}"))?; + + sender + .send(result) + .map_err(|_| anyhow::anyhow!("Tool result receiver dropped for {call_id}"))?; + + Ok(()) + } + + /// Register a pending client-side tool call. Returns a oneshot receiver + /// that the engine awaits for the result. + pub async fn register_pending_tool( + &self, + call_id: &str, + ) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.pending_client_tools + .lock() + .await + .insert(call_id.to_string(), tx); + rx + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_subscribe_and_emit() { + let (event_tx, _) = broadcast::channel(16); + let mut rx = event_tx.subscribe(); + + let id = RequestId::new(); + let _ = event_tx.send(OrchestratorEvent::Thinking { + request_id: id.clone(), + }); + + let received = rx.recv().await.unwrap(); + assert_eq!(received.request_id(), &id); + } + + #[tokio::test] + async fn test_multiple_subscribers() { + let (event_tx, _) = broadcast::channel(16); + let mut rx1 = event_tx.subscribe(); + let mut rx2 = event_tx.subscribe(); + + let id = RequestId::new(); + let _ = event_tx.send(OrchestratorEvent::Thinking { + request_id: id.clone(), + }); + + let r1 = rx1.recv().await.unwrap(); + let r2 = rx2.recv().await.unwrap(); + assert_eq!(r1.request_id(), &id); + assert_eq!(r2.request_id(), &id); + } + + #[tokio::test] + async fn test_submit_tool_result() { + let pending: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + + let (tx, rx) = oneshot::channel(); + pending + .lock() + .await + .insert("call-1".into(), tx); + + // Simulate submitting result + let sender = pending.lock().await.remove("call-1").unwrap(); + sender + .send(ToolResultPayload { + text: "file contents".into(), + is_error: false, + }) + .unwrap(); + + let result = rx.await.unwrap(); + assert_eq!(result.text, "file contents"); + assert!(!result.is_error); + } +}