diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index c10ba25a..0383e038 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -211,7 +211,7 @@ impl Service { .await .expect("Admin module is not loaded"); - handle(Arc::clone(self.services.get_services()), command).await + handle(Arc::clone(self.services.get()), command).await } /// Checks whether a given user is an admin of this server diff --git a/src/service/mod.rs b/src/service/mod.rs index a3093d6a..fe8a4c09 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -3,6 +3,7 @@ mod manager; mod migrations; +mod once_services; mod service; pub mod services; @@ -29,6 +30,7 @@ pub mod transaction_ids; pub mod uiaa; pub mod users; +pub(crate) use once_services::OnceServices; pub(crate) use service::{Args, Service}; pub use crate::services::Services; diff --git a/src/service/once_services.rs b/src/service/once_services.rs new file mode 100644 index 00000000..f8cb3986 --- /dev/null +++ b/src/service/once_services.rs @@ -0,0 +1,52 @@ +use std::{ + ops::Deref, + sync::{Arc, OnceLock}, +}; + +use crate::Services; + +#[derive(Default)] +pub(crate) struct OnceServices { + lock: OnceLock>, +} + +impl OnceServices { + pub(super) fn set(&self, services: Arc) -> Arc { + self.lock.get_or_init(move || services).clone() + } + + #[inline] + pub(crate) fn get(&self) -> &Arc { + self.lock + .get() + .expect("services must be initialized") + } +} + +impl Deref for OnceServices { + type Target = Arc; + + #[inline] + fn deref(&self) -> &Self::Target { self.get() } +} + +// SAFETY: Services has a lot of circularity inherited from Conduit's original +// design. This stresses the trait solver which twists itself into a knot +// proving Sendness. This issue was a lot worse in conduwuit where we used an +// instance of `Dep` for each Service rather than a single instance of +// `OnceServices` like now. The problem still exists though greatly reduced, and +// the same solution now has greater impact because OnceServices is the single +// unified focal-point for the entire Services call-web. +// +// The prior incarnation required this unsafety or it would blow through the +// recursion_limit; that no longer happens. Nevertheless compile times are +// still substantially reduced by asserting Sendness here. Prove sendness +// by simply commenting this out, it will just take longer. +unsafe impl Send for OnceServices {} + +// SAFETY: Similar to Send as explained above, we further reduce compile-times +// by manually asserting Syncness of this type. The only threading contention +// concerns for this would be on startup but this server has a very well defined +// initialization sequence. After that this structure is purely read-only shared +// without concern. +unsafe impl Sync for OnceServices {} diff --git a/src/service/services.rs b/src/service/services.rs index a2d009f3..22d82b13 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -1,13 +1,13 @@ -use std::{ - ops::Deref, - sync::{Arc, OnceLock}, -}; +use std::sync::Arc; use futures::{StreamExt, TryStreamExt}; use tokio::sync::Mutex; -use tuwunel_core::{Result, Server, debug, debug_info, err, info, trace}; +use tuwunel_core::{ + Result, Server, debug, debug_info, implement, info, trace, utils::stream::IterStream, +}; use tuwunel_database::Database; +pub(crate) use crate::OnceServices; use crate::{ account_data, admin, appservice, client, config, deactivate, emergency, federation, globals, key_backups, @@ -65,202 +65,191 @@ pub struct Services { pub db: Arc, } -pub struct OnceServices { - lock: OnceLock>, +#[implement(Services)] +pub async fn build(server: Arc) -> Result> { + let db = Database::open(&server).await?; + let services = Arc::new(OnceServices::default()); + macro_rules! build { + ($tyname:ty) => { + <$tyname>::build(Args { + db: &db, + server: &server, + services: &services, + })? + }; + } + + let res = Arc::new(Self { + account_data: build!(account_data::Service), + admin: build!(admin::Service), + appservice: build!(appservice::Service), + resolver: build!(resolver::Service), + client: build!(client::Service), + config: build!(config::Service), + emergency: build!(emergency::Service), + globals: build!(globals::Service), + key_backups: build!(key_backups::Service), + media: build!(media::Service), + presence: build!(presence::Service), + pusher: build!(pusher::Service), + alias: build!(rooms::alias::Service), + auth_chain: build!(rooms::auth_chain::Service), + delete: build!(rooms::delete::Service), + directory: build!(rooms::directory::Service), + event_handler: build!(rooms::event_handler::Service), + lazy_loading: build!(rooms::lazy_loading::Service), + metadata: build!(rooms::metadata::Service), + pdu_metadata: build!(rooms::pdu_metadata::Service), + read_receipt: build!(rooms::read_receipt::Service), + search: build!(rooms::search::Service), + short: build!(rooms::short::Service), + spaces: build!(rooms::spaces::Service), + state: build!(rooms::state::Service), + state_accessor: build!(rooms::state_accessor::Service), + state_cache: build!(rooms::state_cache::Service), + state_compressor: build!(rooms::state_compressor::Service), + threads: build!(rooms::threads::Service), + timeline: build!(rooms::timeline::Service), + typing: build!(rooms::typing::Service), + user: build!(rooms::user::Service), + federation: build!(federation::Service), + sending: build!(sending::Service), + server_keys: build!(server_keys::Service), + sync: build!(sync::Service), + transaction_ids: build!(transaction_ids::Service), + uiaa: build!(uiaa::Service), + users: build!(users::Service), + membership: build!(membership::Service), + deactivate: build!(deactivate::Service), + + manager: Mutex::new(None), + server, + db, + }); + + Ok(services.set(res)) } -impl OnceServices { - pub fn get_services(&self) -> &Arc { - self.lock - .get() - .expect("services must be initialized") +#[implement(Services)] +pub(crate) fn services(&self) -> impl Iterator> + Send { + macro_rules! cast { + ($s:expr) => { + as Into<_>>::into($s.clone()) + }; + } + + [ + cast!(self.account_data), + cast!(self.admin), + cast!(self.appservice), + cast!(self.resolver), + cast!(self.client), + cast!(self.config), + cast!(self.emergency), + cast!(self.globals), + cast!(self.key_backups), + cast!(self.media), + cast!(self.presence), + cast!(self.pusher), + cast!(self.alias), + cast!(self.auth_chain), + cast!(self.delete), + cast!(self.directory), + cast!(self.event_handler), + cast!(self.lazy_loading), + cast!(self.metadata), + cast!(self.pdu_metadata), + cast!(self.read_receipt), + cast!(self.search), + cast!(self.short), + cast!(self.spaces), + cast!(self.state), + cast!(self.state_accessor), + cast!(self.state_cache), + cast!(self.state_compressor), + cast!(self.threads), + cast!(self.timeline), + cast!(self.typing), + cast!(self.user), + cast!(self.federation), + cast!(self.sending), + cast!(self.server_keys), + cast!(self.sync), + cast!(self.transaction_ids), + cast!(self.uiaa), + cast!(self.users), + cast!(self.membership), + cast!(self.deactivate), + ] + .into_iter() +} + +#[implement(Services)] +pub async fn start(self: &Arc) -> Result> { + debug_info!("Starting services..."); + + super::migrations::migrations(self).await?; + self.manager + .lock() + .await + .insert(Manager::new(self)) + .clone() + .start() + .await?; + + debug_info!("Services startup complete."); + + Ok(Arc::clone(self)) +} + +#[implement(Services)] +pub async fn stop(&self) { + info!("Shutting down services..."); + + self.interrupt().await; + if let Some(manager) = self.manager.lock().await.as_ref() { + manager.stop().await; + } + + debug_info!("Services shutdown complete."); +} + +#[implement(Services)] +pub(crate) async fn interrupt(&self) { + debug!("Interrupting services..."); + for service in self.services() { + let name = service.name(); + trace!("Interrupting {name}"); + service.interrupt().await; } } -impl Deref for OnceServices { - type Target = Arc; +#[implement(Services)] +pub async fn poll(&self) -> Result { + if let Some(manager) = self.manager.lock().await.as_ref() { + return manager.poll().await; + } - fn deref(&self) -> &Self::Target { self.get_services() } + Ok(()) } -impl Services { - #[allow(clippy::cognitive_complexity)] - pub async fn build(server: Arc) -> Result> { - let db = Database::open(&server).await?; - let services = Arc::new(OnceServices { lock: OnceLock::new() }); - macro_rules! build { - ($tyname:ty) => { - <$tyname>::build(Args { - db: &db, - server: &server, - services: &services, - })? - }; - } - - let res = Arc::new(Self { - account_data: build!(account_data::Service), - admin: build!(admin::Service), - appservice: build!(appservice::Service), - resolver: build!(resolver::Service), - client: build!(client::Service), - config: build!(config::Service), - emergency: build!(emergency::Service), - globals: build!(globals::Service), - key_backups: build!(key_backups::Service), - media: build!(media::Service), - presence: build!(presence::Service), - pusher: build!(pusher::Service), - alias: build!(rooms::alias::Service), - auth_chain: build!(rooms::auth_chain::Service), - delete: build!(rooms::delete::Service), - directory: build!(rooms::directory::Service), - event_handler: build!(rooms::event_handler::Service), - lazy_loading: build!(rooms::lazy_loading::Service), - metadata: build!(rooms::metadata::Service), - pdu_metadata: build!(rooms::pdu_metadata::Service), - read_receipt: build!(rooms::read_receipt::Service), - search: build!(rooms::search::Service), - short: build!(rooms::short::Service), - spaces: build!(rooms::spaces::Service), - state: build!(rooms::state::Service), - state_accessor: build!(rooms::state_accessor::Service), - state_cache: build!(rooms::state_cache::Service), - state_compressor: build!(rooms::state_compressor::Service), - threads: build!(rooms::threads::Service), - timeline: build!(rooms::timeline::Service), - typing: build!(rooms::typing::Service), - user: build!(rooms::user::Service), - federation: build!(federation::Service), - sending: build!(sending::Service), - server_keys: build!(server_keys::Service), - sync: build!(sync::Service), - transaction_ids: build!(transaction_ids::Service), - uiaa: build!(uiaa::Service), - users: build!(users::Service), - membership: build!(membership::Service), - deactivate: build!(deactivate::Service), - - manager: Mutex::new(None), - server, - db, - }); - - services - .lock - .set(res.clone()) - .map_err(|_| err!("couldn't set services lock")) - .unwrap(); - - Ok(res) - } - - pub async fn start(self: &Arc) -> Result> { - debug_info!("Starting services..."); - - super::migrations::migrations(self).await?; - self.manager - .lock() - .await - .insert(Manager::new(self)) - .clone() - .start() - .await?; - - debug_info!("Services startup complete."); - - Ok(Arc::clone(self)) - } - - pub async fn stop(&self) { - info!("Shutting down services..."); - - self.interrupt().await; - if let Some(manager) = self.manager.lock().await.as_ref() { - manager.stop().await; - } - - debug_info!("Services shutdown complete."); - } - - pub async fn poll(&self) -> Result { - if let Some(manager) = self.manager.lock().await.as_ref() { - return manager.poll().await; - } - - Ok(()) - } - - pub(crate) fn services(&self) -> [Arc; 41] { - [ - self.account_data.clone(), - self.admin.clone(), - self.appservice.clone(), - self.resolver.clone(), - self.client.clone(), - self.config.clone(), - self.emergency.clone(), - self.globals.clone(), - self.key_backups.clone(), - self.media.clone(), - self.presence.clone(), - self.pusher.clone(), - self.alias.clone(), - self.auth_chain.clone(), - self.delete.clone(), - self.directory.clone(), - self.event_handler.clone(), - self.lazy_loading.clone(), - self.metadata.clone(), - self.pdu_metadata.clone(), - self.read_receipt.clone(), - self.search.clone(), - self.short.clone(), - self.spaces.clone(), - self.state.clone(), - self.state_accessor.clone(), - self.state_cache.clone(), - self.state_compressor.clone(), - self.threads.clone(), - self.timeline.clone(), - self.typing.clone(), - self.user.clone(), - self.federation.clone(), - self.sending.clone(), - self.server_keys.clone(), - self.sync.clone(), - self.transaction_ids.clone(), - self.uiaa.clone(), - self.users.clone(), - self.membership.clone(), - self.deactivate.clone(), - ] - } - - pub async fn clear_cache(&self) { - futures::stream::iter(self.services()) - .for_each(async |service| { - service.clear_cache().await; - }) - .await; - } - - pub async fn memory_usage(&self) -> Result { - futures::stream::iter(self.services()) - .map(Ok) - .try_fold(String::new(), async |mut out, service| { - service.memory_usage(&mut out).await?; - Ok(out) - }) - .await - } - - async fn interrupt(&self) { - debug!("Interrupting services..."); - for service in self.services() { - let name = service.name(); - trace!("Interrupting {name}"); - service.interrupt().await; - } - } +#[implement(Services)] +pub async fn clear_cache(&self) { + self.services() + .stream() + .for_each(async |service| { + service.clear_cache().await; + }) + .await; +} + +#[implement(Services)] +pub async fn memory_usage(&self) -> Result { + self.services() + .try_stream() + .try_fold(String::new(), async |mut out, service| { + service.memory_usage(&mut out).await?; + Ok(out) + }) + .await }