Simplify admin channel type.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-02 23:45:02 +00:00
parent fd0082fd2c
commit 7d2d42542c
2 changed files with 34 additions and 19 deletions

View File

@@ -11,21 +11,20 @@ use std::{
use async_trait::async_trait;
pub use create::create_admin_room;
use futures::{Future, FutureExt, TryFutureExt};
use loole::{Receiver, Sender};
use ruma::{
OwnedEventId, OwnedRoomId, RoomId, UserId,
events::room::message::{Relation, RoomMessageEventContent},
};
use tokio::sync::RwLock;
use tokio::sync::{RwLock, mpsc};
use tuwunel_core::{
Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
Err, Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
};
use crate::{Dep, account_data, globals, rooms, rooms::state::RoomMutexGuard};
pub struct Service {
services: Services,
channel: (Sender<CommandInput>, Receiver<CommandInput>),
channel: StdRwLock<Option<mpsc::Sender<CommandInput>>>,
pub handle: RwLock<Option<Processor>>,
pub complete: StdRwLock<Option<Completer>>,
#[cfg(feature = "console")]
@@ -90,7 +89,7 @@ impl crate::Service for Service {
account_data: args.depend::<account_data::Service>("account_data"),
services: None.into(),
},
channel: loole::bounded(COMMAND_QUEUE_LIMIT),
channel: StdRwLock::new(None),
handle: RwLock::new(None),
complete: StdRwLock::new(None),
#[cfg(feature = "console")]
@@ -98,18 +97,23 @@ impl crate::Service for Service {
}))
}
async fn worker(self: Arc<Self>) -> Result<()> {
async fn worker(self: Arc<Self>) -> Result {
let mut signals = self.services.server.signal.subscribe();
let receiver = self.channel.1.clone();
let (sender, mut receiver) = mpsc::channel(COMMAND_QUEUE_LIMIT);
_ = self
.channel
.write()
.expect("locked for writing")
.insert(sender);
self.startup_execute().await?;
self.console_auto_start().await;
loop {
tokio::select! {
command = receiver.recv_async() => match command {
Ok(command) => self.handle_command(command).await,
Err(_) => break,
command = receiver.recv() => match command {
Some(command) => self.handle_command(command).await,
None => break,
},
sig = signals.recv() => match sig {
Ok(sig) => self.handle_signal(sig).await,
@@ -127,10 +131,11 @@ impl crate::Service for Service {
#[cfg(feature = "console")]
self.console.interrupt();
let (sender, _) = &self.channel;
if !sender.is_closed() {
sender.close();
}
_ = self
.channel
.write()
.expect("locked for writing")
.take();
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
@@ -154,7 +159,7 @@ impl Service {
/// Sends a message to the admin room as the admin user (see send_text() for
/// convenience).
pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result<()> {
pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result {
let user_id = &self.services.globals.server_user;
let room_id = self.get_admin_room().await?;
self.respond_to_room(message_content, &room_id, user_id)
@@ -165,10 +170,19 @@ impl Service {
/// Posts a command to the command processor queue and returns. Processing
/// will take place on the service worker's task asynchronously. Errors if
/// the queue is full.
pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> {
self.channel
.0
pub async fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result {
let Some(sender) = self
.channel
.read()
.expect("locked for reading")
.clone()
else {
return Err!("Admin command queue unavailable.");
};
sender
.send(CommandInput { command, reply_id })
.await
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
}

View File

@@ -374,7 +374,8 @@ where
{
self.services
.admin
.command(body, Some((pdu.event_id()).into()))?;
.command(body, Some((pdu.event_id()).into()))
.await?;
}
}
},