From 64facb33443d241e48da0a1f52b8615068dc5989 Mon Sep 17 00:00:00 2001 From: Sienna Meridian Satterwhite Date: Mon, 23 Mar 2026 19:22:22 +0000 Subject: [PATCH] refactor(grpc): move bridge out of orchestrator into grpc module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit grpc_bridge.rs → grpc/bridge.rs. The bridge maps OrchestratorEvents to protobuf ServerMessages — it imports from orchestrator (downstream) and grpc protos (sibling). The orchestrator never imports from grpc. --- src/grpc/bridge.rs | 130 +++++++++++++++++++++++++++++++++++++++++++++ src/grpc/mod.rs | 2 + 2 files changed, 132 insertions(+) create mode 100644 src/grpc/bridge.rs diff --git a/src/grpc/bridge.rs b/src/grpc/bridge.rs new file mode 100644 index 0000000..0ad867f --- /dev/null +++ b/src/grpc/bridge.rs @@ -0,0 +1,130 @@ +//! gRPC bridge — maps OrchestratorEvents to protobuf ServerMessages. +//! Lives in the gRPC module (NOT in the orchestrator) — the orchestrator +//! has zero knowledge of gRPC proto types. + +use tokio::sync::{broadcast, mpsc}; +use tracing::warn; + +use crate::orchestrator::event::*; + +// Proto types from sibling module +use super::{ + ServerMessage, TextDone, ToolCall, Error, + StatusKind, server_message, +}; + +// Proto Status (not tonic::Status) +type ProtoStatus = super::Status; + +/// Forward orchestrator events for a specific request to the gRPC client stream. +/// Exits when a terminal event (Done/Failed) is received. +pub async fn bridge_events_to_grpc( + request_id: RequestId, + mut event_rx: broadcast::Receiver, + client_tx: mpsc::Sender>, +) { + loop { + match event_rx.recv().await { + Ok(event) => { + if event.request_id() != &request_id { + continue; + } + + let (msg, terminal) = map_event(event); + + if let Some(msg) = msg { + if client_tx.send(Ok(msg)).await.is_err() { + warn!("gRPC client disconnected"); + break; + } + } + + if terminal { + break; + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!(n, "gRPC bridge lagged"); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } +} + +/// Map an orchestrator event to a ServerMessage. Returns (message, is_terminal). +fn map_event(event: OrchestratorEvent) -> (Option, bool) { + match event { + OrchestratorEvent::Started { .. } => (None, false), + + OrchestratorEvent::Thinking { .. } => ( + Some(ServerMessage { + payload: Some(server_message::Payload::Status(ProtoStatus { + message: "generating…".into(), + kind: StatusKind::Thinking.into(), + })), + }), + false, + ), + + OrchestratorEvent::ToolCallDetected { call_id, name, args, side, .. } => { + if side == ToolSide::Client { + ( + Some(ServerMessage { + payload: Some(server_message::Payload::ToolCall(ToolCall { + call_id, + name, + args_json: args, + is_local: true, + needs_approval: true, + })), + }), + false, + ) + } else { + ( + Some(ServerMessage { + payload: Some(server_message::Payload::Status(ProtoStatus { + message: format!("executing {name}…"), + kind: StatusKind::ToolRunning.into(), + })), + }), + false, + ) + } + } + + OrchestratorEvent::ToolCompleted { name, success, .. } => ( + Some(ServerMessage { + payload: Some(server_message::Payload::Status(ProtoStatus { + message: if success { format!("{name} done") } else { format!("{name} failed") }, + kind: StatusKind::ToolDone.into(), + })), + }), + false, + ), + + OrchestratorEvent::Done { text, usage, .. } => ( + Some(ServerMessage { + payload: Some(server_message::Payload::Done(TextDone { + full_text: text, + input_tokens: usage.prompt_tokens, + output_tokens: usage.completion_tokens, + })), + }), + true, + ), + + OrchestratorEvent::Failed { error, .. } => ( + Some(ServerMessage { + payload: Some(server_message::Payload::Error(Error { + message: error, + fatal: false, + })), + }), + true, + ), + + // ToolStarted — not forwarded (ToolCallDetected is sufficient) + _ => (None, false), + } +} diff --git a/src/grpc/mod.rs b/src/grpc/mod.rs index dc3170a..6f3ba9b 100644 --- a/src/grpc/mod.rs +++ b/src/grpc/mod.rs @@ -1,4 +1,5 @@ pub mod auth; +pub mod bridge; pub mod router; pub mod service; pub mod session; @@ -27,6 +28,7 @@ pub struct GrpcState { pub matrix: matrix_sdk::Client, pub system_prompt: String, pub orchestrator_agent_id: String, + pub orchestrator: Option>, } /// Start the gRPC server. Call from main.rs alongside the Matrix sync loop.