Simplify admin channel type.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -11,21 +11,20 @@ use std::{
|
||||
use async_trait::async_trait;
|
||||
pub use create::create_admin_room;
|
||||
use futures::{Future, FutureExt, TryFutureExt};
|
||||
use loole::{Receiver, Sender};
|
||||
use ruma::{
|
||||
OwnedEventId, OwnedRoomId, RoomId, UserId,
|
||||
events::room::message::{Relation, RoomMessageEventContent},
|
||||
};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{RwLock, mpsc};
|
||||
use tuwunel_core::{
|
||||
Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
|
||||
Err, Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
|
||||
};
|
||||
|
||||
use crate::{Dep, account_data, globals, rooms, rooms::state::RoomMutexGuard};
|
||||
|
||||
pub struct Service {
|
||||
services: Services,
|
||||
channel: (Sender<CommandInput>, Receiver<CommandInput>),
|
||||
channel: StdRwLock<Option<mpsc::Sender<CommandInput>>>,
|
||||
pub handle: RwLock<Option<Processor>>,
|
||||
pub complete: StdRwLock<Option<Completer>>,
|
||||
#[cfg(feature = "console")]
|
||||
@@ -90,7 +89,7 @@ impl crate::Service for Service {
|
||||
account_data: args.depend::<account_data::Service>("account_data"),
|
||||
services: None.into(),
|
||||
},
|
||||
channel: loole::bounded(COMMAND_QUEUE_LIMIT),
|
||||
channel: StdRwLock::new(None),
|
||||
handle: RwLock::new(None),
|
||||
complete: StdRwLock::new(None),
|
||||
#[cfg(feature = "console")]
|
||||
@@ -98,18 +97,23 @@ impl crate::Service for Service {
|
||||
}))
|
||||
}
|
||||
|
||||
async fn worker(self: Arc<Self>) -> Result<()> {
|
||||
async fn worker(self: Arc<Self>) -> Result {
|
||||
let mut signals = self.services.server.signal.subscribe();
|
||||
let receiver = self.channel.1.clone();
|
||||
let (sender, mut receiver) = mpsc::channel(COMMAND_QUEUE_LIMIT);
|
||||
_ = self
|
||||
.channel
|
||||
.write()
|
||||
.expect("locked for writing")
|
||||
.insert(sender);
|
||||
|
||||
self.startup_execute().await?;
|
||||
self.console_auto_start().await;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
command = receiver.recv_async() => match command {
|
||||
Ok(command) => self.handle_command(command).await,
|
||||
Err(_) => break,
|
||||
command = receiver.recv() => match command {
|
||||
Some(command) => self.handle_command(command).await,
|
||||
None => break,
|
||||
},
|
||||
sig = signals.recv() => match sig {
|
||||
Ok(sig) => self.handle_signal(sig).await,
|
||||
@@ -127,10 +131,11 @@ impl crate::Service for Service {
|
||||
#[cfg(feature = "console")]
|
||||
self.console.interrupt();
|
||||
|
||||
let (sender, _) = &self.channel;
|
||||
if !sender.is_closed() {
|
||||
sender.close();
|
||||
}
|
||||
_ = self
|
||||
.channel
|
||||
.write()
|
||||
.expect("locked for writing")
|
||||
.take();
|
||||
}
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
@@ -154,7 +159,7 @@ impl Service {
|
||||
|
||||
/// Sends a message to the admin room as the admin user (see send_text() for
|
||||
/// convenience).
|
||||
pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result<()> {
|
||||
pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result {
|
||||
let user_id = &self.services.globals.server_user;
|
||||
let room_id = self.get_admin_room().await?;
|
||||
self.respond_to_room(message_content, &room_id, user_id)
|
||||
@@ -165,10 +170,19 @@ impl Service {
|
||||
/// Posts a command to the command processor queue and returns. Processing
|
||||
/// will take place on the service worker's task asynchronously. Errors if
|
||||
/// the queue is full.
|
||||
pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> {
|
||||
self.channel
|
||||
.0
|
||||
pub async fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result {
|
||||
let Some(sender) = self
|
||||
.channel
|
||||
.read()
|
||||
.expect("locked for reading")
|
||||
.clone()
|
||||
else {
|
||||
return Err!("Admin command queue unavailable.");
|
||||
};
|
||||
|
||||
sender
|
||||
.send(CommandInput { command, reply_id })
|
||||
.await
|
||||
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
|
||||
}
|
||||
|
||||
|
||||
@@ -374,7 +374,8 @@ where
|
||||
{
|
||||
self.services
|
||||
.admin
|
||||
.command(body, Some((pdu.event_id()).into()))?;
|
||||
.command(body, Some((pdu.event_id()).into()))
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user