Simplify admin channel type.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -11,21 +11,20 @@ use std::{
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
pub use create::create_admin_room;
|
pub use create::create_admin_room;
|
||||||
use futures::{Future, FutureExt, TryFutureExt};
|
use futures::{Future, FutureExt, TryFutureExt};
|
||||||
use loole::{Receiver, Sender};
|
|
||||||
use ruma::{
|
use ruma::{
|
||||||
OwnedEventId, OwnedRoomId, RoomId, UserId,
|
OwnedEventId, OwnedRoomId, RoomId, UserId,
|
||||||
events::room::message::{Relation, RoomMessageEventContent},
|
events::room::message::{Relation, RoomMessageEventContent},
|
||||||
};
|
};
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::{RwLock, mpsc};
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
|
Err, Error, Event, Result, Server, debug, err, error, error::default_log, pdu::PduBuilder,
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::{Dep, account_data, globals, rooms, rooms::state::RoomMutexGuard};
|
use crate::{Dep, account_data, globals, rooms, rooms::state::RoomMutexGuard};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
services: Services,
|
services: Services,
|
||||||
channel: (Sender<CommandInput>, Receiver<CommandInput>),
|
channel: StdRwLock<Option<mpsc::Sender<CommandInput>>>,
|
||||||
pub handle: RwLock<Option<Processor>>,
|
pub handle: RwLock<Option<Processor>>,
|
||||||
pub complete: StdRwLock<Option<Completer>>,
|
pub complete: StdRwLock<Option<Completer>>,
|
||||||
#[cfg(feature = "console")]
|
#[cfg(feature = "console")]
|
||||||
@@ -90,7 +89,7 @@ impl crate::Service for Service {
|
|||||||
account_data: args.depend::<account_data::Service>("account_data"),
|
account_data: args.depend::<account_data::Service>("account_data"),
|
||||||
services: None.into(),
|
services: None.into(),
|
||||||
},
|
},
|
||||||
channel: loole::bounded(COMMAND_QUEUE_LIMIT),
|
channel: StdRwLock::new(None),
|
||||||
handle: RwLock::new(None),
|
handle: RwLock::new(None),
|
||||||
complete: StdRwLock::new(None),
|
complete: StdRwLock::new(None),
|
||||||
#[cfg(feature = "console")]
|
#[cfg(feature = "console")]
|
||||||
@@ -98,18 +97,23 @@ impl crate::Service for Service {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn worker(self: Arc<Self>) -> Result<()> {
|
async fn worker(self: Arc<Self>) -> Result {
|
||||||
let mut signals = self.services.server.signal.subscribe();
|
let mut signals = self.services.server.signal.subscribe();
|
||||||
let receiver = self.channel.1.clone();
|
let (sender, mut receiver) = mpsc::channel(COMMAND_QUEUE_LIMIT);
|
||||||
|
_ = self
|
||||||
|
.channel
|
||||||
|
.write()
|
||||||
|
.expect("locked for writing")
|
||||||
|
.insert(sender);
|
||||||
|
|
||||||
self.startup_execute().await?;
|
self.startup_execute().await?;
|
||||||
self.console_auto_start().await;
|
self.console_auto_start().await;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
command = receiver.recv_async() => match command {
|
command = receiver.recv() => match command {
|
||||||
Ok(command) => self.handle_command(command).await,
|
Some(command) => self.handle_command(command).await,
|
||||||
Err(_) => break,
|
None => break,
|
||||||
},
|
},
|
||||||
sig = signals.recv() => match sig {
|
sig = signals.recv() => match sig {
|
||||||
Ok(sig) => self.handle_signal(sig).await,
|
Ok(sig) => self.handle_signal(sig).await,
|
||||||
@@ -127,10 +131,11 @@ impl crate::Service for Service {
|
|||||||
#[cfg(feature = "console")]
|
#[cfg(feature = "console")]
|
||||||
self.console.interrupt();
|
self.console.interrupt();
|
||||||
|
|
||||||
let (sender, _) = &self.channel;
|
_ = self
|
||||||
if !sender.is_closed() {
|
.channel
|
||||||
sender.close();
|
.write()
|
||||||
}
|
.expect("locked for writing")
|
||||||
|
.take();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||||
@@ -154,7 +159,7 @@ impl Service {
|
|||||||
|
|
||||||
/// Sends a message to the admin room as the admin user (see send_text() for
|
/// Sends a message to the admin room as the admin user (see send_text() for
|
||||||
/// convenience).
|
/// convenience).
|
||||||
pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result<()> {
|
pub async fn send_message(&self, message_content: RoomMessageEventContent) -> Result {
|
||||||
let user_id = &self.services.globals.server_user;
|
let user_id = &self.services.globals.server_user;
|
||||||
let room_id = self.get_admin_room().await?;
|
let room_id = self.get_admin_room().await?;
|
||||||
self.respond_to_room(message_content, &room_id, user_id)
|
self.respond_to_room(message_content, &room_id, user_id)
|
||||||
@@ -165,10 +170,19 @@ impl Service {
|
|||||||
/// Posts a command to the command processor queue and returns. Processing
|
/// Posts a command to the command processor queue and returns. Processing
|
||||||
/// will take place on the service worker's task asynchronously. Errors if
|
/// will take place on the service worker's task asynchronously. Errors if
|
||||||
/// the queue is full.
|
/// the queue is full.
|
||||||
pub fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result<()> {
|
pub async fn command(&self, command: String, reply_id: Option<OwnedEventId>) -> Result {
|
||||||
self.channel
|
let Some(sender) = self
|
||||||
.0
|
.channel
|
||||||
|
.read()
|
||||||
|
.expect("locked for reading")
|
||||||
|
.clone()
|
||||||
|
else {
|
||||||
|
return Err!("Admin command queue unavailable.");
|
||||||
|
};
|
||||||
|
|
||||||
|
sender
|
||||||
.send(CommandInput { command, reply_id })
|
.send(CommandInput { command, reply_id })
|
||||||
|
.await
|
||||||
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
|
.map_err(|e| err!("Failed to enqueue admin command: {e:?}"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -374,7 +374,8 @@ where
|
|||||||
{
|
{
|
||||||
self.services
|
self.services
|
||||||
.admin
|
.admin
|
||||||
.command(body, Some((pdu.event_id()).into()))?;
|
.command(body, Some((pdu.event_id()).into()))
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|||||||
Reference in New Issue
Block a user