From 7d2d42542c7468273d50dfcc6202d500d85debec Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 2 Jul 2025 23:45:02 +0000 Subject: [PATCH] Simplify admin channel type. Signed-off-by: Jason Volk --- src/service/admin/mod.rs | 50 ++++++++++++++++++---------- src/service/rooms/timeline/append.rs | 3 +- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index b75f28e3..698e169a 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -11,21 +11,20 @@ use std::{ use async_trait::async_trait; pub use create::create_admin_room; use futures::{Future, FutureExt, TryFutureExt}; -use loole::{Receiver, Sender}; use ruma::{ OwnedEventId, OwnedRoomId, RoomId, UserId, events::room::message::{Relation, RoomMessageEventContent}, }; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, mpsc}; use tuwunel_core::{ - Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder, + Err, Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder, }; use crate::{Dep, account_data, globals, rooms, rooms::state::RoomMutexGuard}; pub struct Service { services: Services, - channel: (Sender, Receiver), + channel: StdRwLock>>, pub handle: RwLock>, pub complete: StdRwLock>, #[cfg(feature = "console")] @@ -90,7 +89,7 @@ impl crate::Service for Service { account_data: args.depend::("account_data"), services: None.into(), }, - channel: loole::bounded(COMMAND_QUEUE_LIMIT), + channel: StdRwLock::new(None), handle: RwLock::new(None), complete: StdRwLock::new(None), #[cfg(feature = "console")] @@ -98,18 +97,23 @@ impl crate::Service for Service { })) } - async fn worker(self: Arc) -> Result<()> { + async fn worker(self: Arc) -> Result { let mut signals = self.services.server.signal.subscribe(); - let receiver = self.channel.1.clone(); + let (sender, mut receiver) = mpsc::channel(COMMAND_QUEUE_LIMIT); + _ = self + .channel + .write() + .expect("locked for writing") + .insert(sender); self.startup_execute().await?; self.console_auto_start().await; loop { tokio::select! { - command = receiver.recv_async() => match command { - Ok(command) => self.handle_command(command).await, - Err(_) => break, + command = receiver.recv() => match command { + Some(command) => self.handle_command(command).await, + None => break, }, sig = signals.recv() => match sig { Ok(sig) => self.handle_signal(sig).await, @@ -127,10 +131,11 @@ impl crate::Service for Service { #[cfg(feature = "console")] self.console.interrupt(); - let (sender, _) = &self.channel; - if !sender.is_closed() { - sender.close(); - } + _ = self + .channel + .write() + .expect("locked for writing") + .take(); } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } @@ -154,7 +159,7 @@ impl Service { /// Sends a message to the admin room as the admin user (see send_text() for /// convenience). - pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result<()> { + pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result { let user_id = &self.services.globals.server_user; let room_id = self.get_admin_room().await?; self.respond_to_room(message_content, &room_id, user_id) @@ -165,10 +170,19 @@ impl Service { /// Posts a command to the command processor queue and returns. Processing /// will take place on the service worker's task asynchronously. Errors if /// the queue is full. - pub fn command(&self, command: String, reply_id: Option) -> Result<()> { - self.channel - .0 + pub async fn command(&self, command: String, reply_id: Option) -> Result { + let Some(sender) = self + .channel + .read() + .expect("locked for reading") + .clone() + else { + return Err!("Admin command queue unavailable."); + }; + + sender .send(CommandInput { command, reply_id }) + .await .map_err(|e| err!("Failed to enqueue admin command: {e:?}")) } diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index cc0154df..62454006 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -374,7 +374,8 @@ where { self.services .admin - .command(body, Some((pdu.event_id()).into()))?; + .command(body, Some((pdu.event_id()).into())) + .await?; } } },