Simplify admin channel type.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-02 23:45:02 +00:00
parent fd0082fd2c
commit 7d2d42542c
2 changed files with 34 additions and 19 deletions

View File

@@ -11,21 +11,20 @@ use std::{
use async_trait::async_trait; use async_trait::async_trait;
pub use create::create_admin_room; pub use create::create_admin_room;
use futures::{Future, FutureExt, TryFutureExt}; use futures::{Future, FutureExt, TryFutureExt};
use loole::{Receiver, Sender};
use ruma::{ use ruma::{
OwnedEventId, OwnedRoomId, RoomId, UserId, OwnedEventId, OwnedRoomId, RoomId, UserId,
events::room::message::{Relation, RoomMessageEventContent}, events::room::message::{Relation, RoomMessageEventContent},
}; };
use tokio::sync::RwLock; use tokio::sync::{RwLock, mpsc};
use tuwunel_core::{ use tuwunel_core::{
Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder, Err, Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
}; };
use crate::{Dep, account_data, globals, rooms, rooms::state::RoomMutexGuard}; use crate::{Dep, account_data, globals, rooms, rooms::state::RoomMutexGuard};
pub struct Service { pub struct Service {
services: Services, services: Services,
channel: (Sender<CommandInput>, Receiver<CommandInput>), channel: StdRwLock<Option<mpsc::Sender<CommandInput>>>,
pub handle: RwLock<Option<Processor>>, pub handle: RwLock<Option<Processor>>,
pub complete: StdRwLock<Option<Completer>>, pub complete: StdRwLock<Option<Completer>>,
#[cfg(feature = "console")] #[cfg(feature = "console")]
@@ -90,7 +89,7 @@ impl crate::Service for Service {
account_data: args.depend::<account_data::Service>("account_data"), account_data: args.depend::<account_data::Service>("account_data"),
services: None.into(), services: None.into(),
}, },
channel: loole::bounded(COMMAND_QUEUE_LIMIT), channel: StdRwLock::new(None),
handle: RwLock::new(None), handle: RwLock::new(None),
complete: StdRwLock::new(None), complete: StdRwLock::new(None),
#[cfg(feature = "console")] #[cfg(feature = "console")]
@@ -98,18 +97,23 @@ impl crate::Service for Service {
})) }))
} }
async fn worker(self: Arc<Self>) -> Result<()> { async fn worker(self: Arc<Self>) -> Result {
let mut signals = self.services.server.signal.subscribe(); let mut signals = self.services.server.signal.subscribe();
let receiver = self.channel.1.clone(); let (sender, mut receiver) = mpsc::channel(COMMAND_QUEUE_LIMIT);
_ = self
.channel
.write()
.expect("locked for writing")
.insert(sender);
self.startup_execute().await?; self.startup_execute().await?;
self.console_auto_start().await; self.console_auto_start().await;
loop { loop {
tokio::select! { tokio::select! {
command = receiver.recv_async() => match command { command = receiver.recv() => match command {
Ok(command) => self.handle_command(command).await, Some(command) => self.handle_command(command).await,
Err(_) => break, None => break,
}, },
sig = signals.recv() => match sig { sig = signals.recv() => match sig {
Ok(sig) => self.handle_signal(sig).await, Ok(sig) => self.handle_signal(sig).await,
@@ -127,10 +131,11 @@ impl crate::Service for Service {
#[cfg(feature = "console")] #[cfg(feature = "console")]
self.console.interrupt(); self.console.interrupt();
let (sender, _) = &self.channel; _ = self
if !sender.is_closed() { .channel
sender.close(); .write()
} .expect("locked for writing")
.take();
} }
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
@@ -154,7 +159,7 @@ impl Service {
/// Sends a message to the admin room as the admin user (see send_text() for /// Sends a message to the admin room as the admin user (see send_text() for
/// convenience). /// convenience).
pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result<()> { pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result {
let user_id = &self.services.globals.server_user; let user_id = &self.services.globals.server_user;
let room_id = self.get_admin_room().await?; let room_id = self.get_admin_room().await?;
self.respond_to_room(message_content, &room_id, user_id) self.respond_to_room(message_content, &room_id, user_id)
@@ -165,10 +170,19 @@ impl Service {
/// Posts a command to the command processor queue and returns. Processing /// Posts a command to the command processor queue and returns. Processing
/// will take place on the service worker's task asynchronously. Errors if /// will take place on the service worker's task asynchronously. Errors if
/// the queue is full. /// the queue is full.
pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> { pub async fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result {
self.channel let Some(sender) = self
.0 .channel
.read()
.expect("locked for reading")
.clone()
else {
return Err!("Admin command queue unavailable.");
};
sender
.send(CommandInput { command, reply_id }) .send(CommandInput { command, reply_id })
.await
.map_err(|e| err!("Failed to enqueue admin command: {e:?}")) .map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
} }

View File

@@ -374,7 +374,8 @@ where
{ {
self.services self.services
.admin .admin
.command(body, Some((pdu.event_id()).into()))?; .command(body, Some((pdu.event_id()).into()))
.await?;
} }
} }
}, },