refactor(grpc): move bridge out of orchestrator into grpc module
grpc_bridge.rs → grpc/bridge.rs. The bridge maps OrchestratorEvents to protobuf ServerMessages — it imports from orchestrator (downstream) and grpc protos (sibling). The orchestrator never imports from grpc.
This commit is contained in:
130
src/grpc/bridge.rs
Normal file
130
src/grpc/bridge.rs
Normal file
@@ -0,0 +1,130 @@
|
||||
//! gRPC bridge — maps OrchestratorEvents to protobuf ServerMessages.
|
||||
//! Lives in the gRPC module (NOT in the orchestrator) — the orchestrator
|
||||
//! has zero knowledge of gRPC proto types.
|
||||
|
||||
use tokio::sync::{broadcast, mpsc};
|
||||
use tracing::warn;
|
||||
|
||||
use crate::orchestrator::event::*;
|
||||
|
||||
// Proto types from sibling module
|
||||
use super::{
|
||||
ServerMessage, TextDone, ToolCall, Error,
|
||||
StatusKind, server_message,
|
||||
};
|
||||
|
||||
// Proto Status (not tonic::Status)
|
||||
type ProtoStatus = super::Status;
|
||||
|
||||
/// Forward orchestrator events for a specific request to the gRPC client stream.
|
||||
/// Exits when a terminal event (Done/Failed) is received.
|
||||
pub async fn bridge_events_to_grpc(
|
||||
request_id: RequestId,
|
||||
mut event_rx: broadcast::Receiver<OrchestratorEvent>,
|
||||
client_tx: mpsc::Sender<Result<ServerMessage, tonic::Status>>,
|
||||
) {
|
||||
loop {
|
||||
match event_rx.recv().await {
|
||||
Ok(event) => {
|
||||
if event.request_id() != &request_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let (msg, terminal) = map_event(event);
|
||||
|
||||
if let Some(msg) = msg {
|
||||
if client_tx.send(Ok(msg)).await.is_err() {
|
||||
warn!("gRPC client disconnected");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if terminal {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
warn!(n, "gRPC bridge lagged");
|
||||
}
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Map an orchestrator event to a ServerMessage. Returns (message, is_terminal).
|
||||
fn map_event(event: OrchestratorEvent) -> (Option<ServerMessage>, bool) {
|
||||
match event {
|
||||
OrchestratorEvent::Started { .. } => (None, false),
|
||||
|
||||
OrchestratorEvent::Thinking { .. } => (
|
||||
Some(ServerMessage {
|
||||
payload: Some(server_message::Payload::Status(ProtoStatus {
|
||||
message: "generating…".into(),
|
||||
kind: StatusKind::Thinking.into(),
|
||||
})),
|
||||
}),
|
||||
false,
|
||||
),
|
||||
|
||||
OrchestratorEvent::ToolCallDetected { call_id, name, args, side, .. } => {
|
||||
if side == ToolSide::Client {
|
||||
(
|
||||
Some(ServerMessage {
|
||||
payload: Some(server_message::Payload::ToolCall(ToolCall {
|
||||
call_id,
|
||||
name,
|
||||
args_json: args,
|
||||
is_local: true,
|
||||
needs_approval: true,
|
||||
})),
|
||||
}),
|
||||
false,
|
||||
)
|
||||
} else {
|
||||
(
|
||||
Some(ServerMessage {
|
||||
payload: Some(server_message::Payload::Status(ProtoStatus {
|
||||
message: format!("executing {name}…"),
|
||||
kind: StatusKind::ToolRunning.into(),
|
||||
})),
|
||||
}),
|
||||
false,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
OrchestratorEvent::ToolCompleted { name, success, .. } => (
|
||||
Some(ServerMessage {
|
||||
payload: Some(server_message::Payload::Status(ProtoStatus {
|
||||
message: if success { format!("{name} done") } else { format!("{name} failed") },
|
||||
kind: StatusKind::ToolDone.into(),
|
||||
})),
|
||||
}),
|
||||
false,
|
||||
),
|
||||
|
||||
OrchestratorEvent::Done { text, usage, .. } => (
|
||||
Some(ServerMessage {
|
||||
payload: Some(server_message::Payload::Done(TextDone {
|
||||
full_text: text,
|
||||
input_tokens: usage.prompt_tokens,
|
||||
output_tokens: usage.completion_tokens,
|
||||
})),
|
||||
}),
|
||||
true,
|
||||
),
|
||||
|
||||
OrchestratorEvent::Failed { error, .. } => (
|
||||
Some(ServerMessage {
|
||||
payload: Some(server_message::Payload::Error(Error {
|
||||
message: error,
|
||||
fatal: false,
|
||||
})),
|
||||
}),
|
||||
true,
|
||||
),
|
||||
|
||||
// ToolStarted — not forwarded (ToolCallDetected is sufficient)
|
||||
_ => (None, false),
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod auth;
|
||||
pub mod bridge;
|
||||
pub mod router;
|
||||
pub mod service;
|
||||
pub mod session;
|
||||
@@ -27,6 +28,7 @@ pub struct GrpcState {
|
||||
pub matrix: matrix_sdk::Client,
|
||||
pub system_prompt: String,
|
||||
pub orchestrator_agent_id: String,
|
||||
pub orchestrator: Option<Arc<crate::orchestrator::Orchestrator>>,
|
||||
}
|
||||
|
||||
/// Start the gRPC server. Call from main.rs alongside the Matrix sync loop.
|
||||
|
||||
Reference in New Issue
Block a user