feat(orchestrator): Phase 1 — event types + broadcast channel foundation

Introduces the orchestrator module with:
- OrchestratorEvent enum: 11 event variants covering lifecycle, tools,
  progress, and side effects
- RequestId (UUID per generation), ResponseMode (Chat/Code), ToolSide
- ChatRequest/CodeRequest structs for transport-agnostic request input
- Orchestrator struct with tokio::broadcast channel (capacity 256)
- subscribe() for transport bridges, emit() for the engine
- Client-side tool dispatch: pending_client_tools map with oneshot channels
- submit_tool_result() to unblock engine from gRPC client responses

Additive only — no behavior change. Existing responder + gRPC session
paths are untouched. Phase 2 will migrate the Conversations API path.
This commit is contained in:
2026-03-23 17:30:36 +00:00
parent b8b76687a5
commit ec4fde7b97
4 changed files with 477 additions and 1 deletions

View File

@@ -171,7 +171,7 @@ mod tests {
#[test]
fn test_orchestrator_request() {
let req = orchestrator_request("test prompt", "mistral-medium-latest", vec![], &[]);
let req = orchestrator_request("test prompt", "mistral-medium-latest", vec![], &[], "sol-orchestrator");
assert_eq!(req.name, "sol-orchestrator");
assert_eq!(req.model, "mistral-medium-latest");
assert!(req.instructions.unwrap().contains("test prompt"));

View File

@@ -9,6 +9,7 @@ mod matrix_utils;
mod memory;
mod persistence;
mod grpc;
mod orchestrator;
mod sdk;
mod sync;
mod time_context;

288
src/orchestrator/event.rs Normal file
View File

@@ -0,0 +1,288 @@
//! Orchestrator events — the contract between Sol's response pipeline
//! and its presentation layers (Matrix, gRPC, future web/API).
//!
//! Every state transition in the response lifecycle is an event.
//! Transport bridges subscribe to these and translate to their protocol.
use std::fmt;
/// Unique identifier for a response generation request.
/// One per user message → response cycle (including all tool iterations).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct RequestId(pub String);
impl RequestId {
pub fn new() -> Self {
Self(uuid::Uuid::new_v4().to_string())
}
}
impl fmt::Display for RequestId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
/// Which transport initiated this request.
#[derive(Debug, Clone)]
pub enum ResponseMode {
/// Standard Matrix chat (DM, room, thread).
Chat {
room_id: String,
is_spontaneous: bool,
use_thread: bool,
trigger_event_id: String,
},
/// Coding session via gRPC (`sunbeam code`).
Code {
session_id: String,
room_id: String,
},
}
/// Whether a tool executes on the server or on a connected client.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ToolSide {
Server,
Client,
}
/// An event emitted by the orchestrator during response generation.
/// Transport bridges subscribe to these and translate to their protocol.
#[derive(Debug, Clone)]
pub enum OrchestratorEvent {
// ── Lifecycle ──────────────────────────────────────────────
/// Response generation has begun.
ResponseStarted {
request_id: RequestId,
mode: ResponseMode,
},
/// The model is generating (typing indicator equivalent).
Thinking {
request_id: RequestId,
},
/// Final response text is ready.
ResponseReady {
request_id: RequestId,
text: String,
prompt_tokens: u32,
completion_tokens: u32,
tool_iterations: u32,
},
/// Response generation failed.
ResponseFailed {
request_id: RequestId,
error: String,
},
// ── Tool execution ─────────────────────────────────────────
/// A tool call was detected in the model's output.
ToolCallDetected {
request_id: RequestId,
call_id: String,
name: String,
args: String,
side: ToolSide,
},
/// A tool started executing.
ToolExecutionStarted {
request_id: RequestId,
call_id: String,
name: String,
},
/// A tool finished executing.
ToolExecutionCompleted {
request_id: RequestId,
call_id: String,
name: String,
result: String,
success: bool,
},
// ── Progress (presentation hints) ──────────────────────────
/// Agentic progress started (first tool call in a response).
AgentProgressStarted {
request_id: RequestId,
},
/// A progress step (tool call summary for display).
AgentProgressStep {
request_id: RequestId,
summary: String,
},
/// Agentic progress complete (all tool iterations done).
AgentProgressDone {
request_id: RequestId,
},
// ── Side effects ───────────────────────────────────────────
/// Memory extraction should run for this exchange.
MemoryExtractionScheduled {
request_id: RequestId,
user_msg: String,
response: String,
},
}
impl OrchestratorEvent {
/// Get the request ID for any event variant.
pub fn request_id(&self) -> &RequestId {
match self {
Self::ResponseStarted { request_id, .. }
| Self::Thinking { request_id }
| Self::ResponseReady { request_id, .. }
| Self::ResponseFailed { request_id, .. }
| Self::ToolCallDetected { request_id, .. }
| Self::ToolExecutionStarted { request_id, .. }
| Self::ToolExecutionCompleted { request_id, .. }
| Self::AgentProgressStarted { request_id }
| Self::AgentProgressStep { request_id, .. }
| Self::AgentProgressDone { request_id }
| Self::MemoryExtractionScheduled { request_id, .. } => request_id,
}
}
}
/// Request to generate a chat response (Matrix path).
#[derive(Debug, Clone)]
pub struct ChatRequest {
pub request_id: RequestId,
pub trigger_body: String,
pub trigger_sender: String,
pub room_id: String,
pub room_name: String,
pub is_dm: bool,
pub is_spontaneous: bool,
pub use_thread: bool,
pub trigger_event_id: String,
pub image_data_uri: Option<String>,
pub context_hint: Option<String>,
}
/// Request to generate a coding response (gRPC path).
#[derive(Debug, Clone)]
pub struct CodeRequest {
pub request_id: RequestId,
pub session_id: String,
pub text: String,
pub project_name: String,
pub project_path: String,
pub prompt_md: String,
pub model: String,
pub room_id: String,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_request_id_unique() {
let a = RequestId::new();
let b = RequestId::new();
assert_ne!(a, b);
}
#[test]
fn test_event_request_id_accessor() {
let id = RequestId::new();
let event = OrchestratorEvent::Thinking {
request_id: id.clone(),
};
assert_eq!(event.request_id(), &id);
}
#[test]
fn test_response_mode_variants() {
let chat = ResponseMode::Chat {
room_id: "!room:test".into(),
is_spontaneous: false,
use_thread: true,
trigger_event_id: "$event".into(),
};
assert!(matches!(chat, ResponseMode::Chat { .. }));
let code = ResponseMode::Code {
session_id: "sess-1".into(),
room_id: "!room:test".into(),
};
assert!(matches!(code, ResponseMode::Code { .. }));
}
#[test]
fn test_tool_side() {
assert_ne!(ToolSide::Server, ToolSide::Client);
}
#[test]
fn test_all_event_variants_have_request_id() {
let id = RequestId::new();
let events = vec![
OrchestratorEvent::ResponseStarted {
request_id: id.clone(),
mode: ResponseMode::Chat {
room_id: "!r:t".into(),
is_spontaneous: false,
use_thread: false,
trigger_event_id: "$e".into(),
},
},
OrchestratorEvent::Thinking { request_id: id.clone() },
OrchestratorEvent::ResponseReady {
request_id: id.clone(),
text: "hi".into(),
prompt_tokens: 0,
completion_tokens: 0,
tool_iterations: 0,
},
OrchestratorEvent::ResponseFailed {
request_id: id.clone(),
error: "err".into(),
},
OrchestratorEvent::ToolCallDetected {
request_id: id.clone(),
call_id: "c1".into(),
name: "bash".into(),
args: "{}".into(),
side: ToolSide::Server,
},
OrchestratorEvent::ToolExecutionStarted {
request_id: id.clone(),
call_id: "c1".into(),
name: "bash".into(),
},
OrchestratorEvent::ToolExecutionCompleted {
request_id: id.clone(),
call_id: "c1".into(),
name: "bash".into(),
result: "ok".into(),
success: true,
},
OrchestratorEvent::AgentProgressStarted { request_id: id.clone() },
OrchestratorEvent::AgentProgressStep {
request_id: id.clone(),
summary: "step".into(),
},
OrchestratorEvent::AgentProgressDone { request_id: id.clone() },
OrchestratorEvent::MemoryExtractionScheduled {
request_id: id.clone(),
user_msg: "q".into(),
response: "a".into(),
},
];
for event in &events {
assert_eq!(event.request_id(), &id);
}
}
}

187
src/orchestrator/mod.rs Normal file
View File

@@ -0,0 +1,187 @@
//! Event-driven orchestrator — the single response generation pipeline.
//!
//! The orchestrator owns the Mistral tool loop and emits events through
//! a `tokio::broadcast` channel. Transport bridges (Matrix, gRPC, etc.)
//! subscribe to these events and translate them to their protocol.
//!
//! Phase 1: types + channel wiring only. No behavior change.
pub mod event;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{broadcast, oneshot, Mutex};
use tracing::info;
pub use event::*;
use crate::config::Config;
use crate::conversations::ConversationRegistry;
use crate::persistence::Store;
use crate::tools::ToolRegistry;
const EVENT_CHANNEL_CAPACITY: usize = 256;
/// Result payload from a client-side tool execution.
#[derive(Debug)]
pub struct ToolResultPayload {
pub text: String,
pub is_error: bool,
}
/// The orchestrator — Sol's response generation pipeline.
///
/// Owns the event broadcast channel. Bridges subscribe via `subscribe()`.
/// Phase 2+ will add `generate_chat_response()` and `generate_code_response()`.
pub struct Orchestrator {
pub config: Arc<Config>,
pub tools: Arc<ToolRegistry>,
pub store: Arc<Store>,
pub mistral: Arc<mistralai_client::v1::client::Client>,
pub conversation_registry: Arc<ConversationRegistry>,
pub system_prompt: String,
/// Broadcast sender — all orchestration events go here.
event_tx: broadcast::Sender<OrchestratorEvent>,
/// Pending client-side tool calls awaiting results from gRPC clients.
/// Key: call_id, Value: oneshot sender to unblock the engine.
pending_client_tools: Arc<Mutex<HashMap<String, oneshot::Sender<ToolResultPayload>>>>,
}
impl Orchestrator {
/// Create a new orchestrator. Returns the orchestrator and an initial
/// event receiver (for the first subscriber, typically the Matrix bridge).
pub fn new(
config: Arc<Config>,
tools: Arc<ToolRegistry>,
store: Arc<Store>,
mistral: Arc<mistralai_client::v1::client::Client>,
conversation_registry: Arc<ConversationRegistry>,
system_prompt: String,
) -> Self {
let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
info!("Orchestrator initialized (event channel capacity: {EVENT_CHANNEL_CAPACITY})");
Self {
config,
tools,
store,
mistral,
conversation_registry,
system_prompt,
event_tx,
pending_client_tools: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Subscribe to the event stream. Each subscriber gets its own receiver
/// that independently tracks position in the broadcast buffer.
pub fn subscribe(&self) -> broadcast::Receiver<OrchestratorEvent> {
self.event_tx.subscribe()
}
/// Emit an event to all subscribers.
pub fn emit(&self, event: OrchestratorEvent) {
// Ignore send errors (no subscribers is fine during startup).
let _ = self.event_tx.send(event);
}
/// Submit a tool result from an external source (e.g., gRPC client).
/// Unblocks the engine's tool loop for the matching call_id.
pub async fn submit_tool_result(
&self,
call_id: &str,
result: ToolResultPayload,
) -> anyhow::Result<()> {
let sender = self
.pending_client_tools
.lock()
.await
.remove(call_id)
.ok_or_else(|| anyhow::anyhow!("No pending tool call with id {call_id}"))?;
sender
.send(result)
.map_err(|_| anyhow::anyhow!("Tool result receiver dropped for {call_id}"))?;
Ok(())
}
/// Register a pending client-side tool call. Returns a oneshot receiver
/// that the engine awaits for the result.
pub async fn register_pending_tool(
&self,
call_id: &str,
) -> oneshot::Receiver<ToolResultPayload> {
let (tx, rx) = oneshot::channel();
self.pending_client_tools
.lock()
.await
.insert(call_id.to_string(), tx);
rx
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_subscribe_and_emit() {
let (event_tx, _) = broadcast::channel(16);
let mut rx = event_tx.subscribe();
let id = RequestId::new();
let _ = event_tx.send(OrchestratorEvent::Thinking {
request_id: id.clone(),
});
let received = rx.recv().await.unwrap();
assert_eq!(received.request_id(), &id);
}
#[tokio::test]
async fn test_multiple_subscribers() {
let (event_tx, _) = broadcast::channel(16);
let mut rx1 = event_tx.subscribe();
let mut rx2 = event_tx.subscribe();
let id = RequestId::new();
let _ = event_tx.send(OrchestratorEvent::Thinking {
request_id: id.clone(),
});
let r1 = rx1.recv().await.unwrap();
let r2 = rx2.recv().await.unwrap();
assert_eq!(r1.request_id(), &id);
assert_eq!(r2.request_id(), &id);
}
#[tokio::test]
async fn test_submit_tool_result() {
let pending: Arc<Mutex<HashMap<String, oneshot::Sender<ToolResultPayload>>>> =
Arc::new(Mutex::new(HashMap::new()));
let (tx, rx) = oneshot::channel();
pending
.lock()
.await
.insert("call-1".into(), tx);
// Simulate submitting result
let sender = pending.lock().await.remove("call-1").unwrap();
sender
.send(ToolResultPayload {
text: "file contents".into(),
is_error: false,
})
.unwrap();
let result = rx.await.unwrap();
assert_eq!(result.text, "file contents");
assert!(!result.is_error);
}
}