Implement declarative appservices. (closes #67)
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -4,10 +4,10 @@ mod registration_info;
|
||||
use std::{collections::BTreeMap, iter::IntoIterator, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::{Future, FutureExt, Stream, TryStreamExt};
|
||||
use futures::{Future, FutureExt, Stream, StreamExt, TryStreamExt};
|
||||
use ruma::{RoomAliasId, RoomId, UserId, api::appservice::Registration};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard};
|
||||
use tuwunel_core::{Result, err, utils::stream::IterStream};
|
||||
use tuwunel_core::{Err, Result, Server, debug, err, utils::stream::IterStream};
|
||||
use tuwunel_database::Map;
|
||||
|
||||
pub use self::{namespace_regex::NamespaceRegex, registration_info::RegistrationInfo};
|
||||
@@ -21,6 +21,7 @@ pub struct Service {
|
||||
|
||||
struct Services {
|
||||
sending: Dep<sending::Service>,
|
||||
server: Arc<Server>,
|
||||
}
|
||||
|
||||
struct Data {
|
||||
@@ -36,6 +37,7 @@ impl crate::Service for Service {
|
||||
registration_info: RwLock::new(BTreeMap::new()),
|
||||
services: Services {
|
||||
sending: args.depend::<sending::Service>("sending"),
|
||||
server: args.server.clone(),
|
||||
},
|
||||
db: Data {
|
||||
id_appserviceregistrations: args.db["id_appserviceregistrations"].clone(),
|
||||
@@ -44,23 +46,48 @@ impl crate::Service for Service {
|
||||
}
|
||||
|
||||
async fn worker(self: Arc<Self>) -> Result {
|
||||
// Inserting registrations into cache
|
||||
self.iter_db_ids()
|
||||
.try_for_each(async |appservice| {
|
||||
self.registration_info
|
||||
.write()
|
||||
.await
|
||||
.insert(appservice.0, appservice.1.try_into()?);
|
||||
self.init_registrations().await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
|
||||
}
|
||||
|
||||
impl Service {
|
||||
#[tracing::instrument(name = "init", skip(self))]
|
||||
async fn init_registrations(&self) -> Result {
|
||||
// Registrations from configuration file
|
||||
let confs = self
|
||||
.services
|
||||
.server
|
||||
.config
|
||||
.appservice
|
||||
.clone()
|
||||
.into_iter()
|
||||
.stream()
|
||||
.map(|(id, mut reg)| {
|
||||
reg.id.clone_from(&id);
|
||||
reg.sender_localpart
|
||||
.get_or_insert_with(|| id.clone());
|
||||
|
||||
Ok((id, reg))
|
||||
});
|
||||
|
||||
// Registrations from database
|
||||
self.iter_db_ids()
|
||||
.chain(confs.map_ok(|(id, reg)| (id, reg.into())))
|
||||
.try_for_each(async |(id, reg): (_, Registration)| {
|
||||
debug!(?id, ?reg, "appservice registration");
|
||||
self.registration_info
|
||||
.write()
|
||||
.await
|
||||
.insert(id.clone(), reg.try_into()?)
|
||||
.map_or(Ok(()), |_| Err!("Conflicting Appservice ID: {id:?}"))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Registers an appservice and returns the ID to the caller
|
||||
pub async fn register_appservice(
|
||||
&self,
|
||||
|
||||
Reference in New Issue
Block a user