use std::{ ops::Deref, sync::{Arc, OnceLock}, }; use futures::{StreamExt, TryStreamExt}; use tokio::sync::Mutex; use tuwunel_core::{Result, Server, debug, debug_info, err, info, trace}; use tuwunel_database::Database; use crate::{ account_data, admin, appservice, client, config, deactivate, emergency, federation, globals, key_backups, manager::Manager, media, membership, presence, pusher, resolver, rooms, sending, server_keys, service::{Args, Service}, sync, transaction_ids, uiaa, users, }; pub struct Services { pub account_data: Arc, pub admin: Arc, pub appservice: Arc, pub config: Arc, pub client: Arc, pub emergency: Arc, pub globals: Arc, pub key_backups: Arc, pub media: Arc, pub presence: Arc, pub pusher: Arc, pub resolver: Arc, pub alias: Arc, pub auth_chain: Arc, pub directory: Arc, pub event_handler: Arc, pub lazy_loading: Arc, pub metadata: Arc, pub pdu_metadata: Arc, pub read_receipt: Arc, pub search: Arc, pub short: Arc, pub spaces: Arc, pub state: Arc, pub state_accessor: Arc, pub state_cache: Arc, pub state_compressor: Arc, pub threads: Arc, pub timeline: Arc, pub typing: Arc, pub user: Arc, pub federation: Arc, pub sending: Arc, pub server_keys: Arc, pub sync: Arc, pub transaction_ids: Arc, pub uiaa: Arc, pub users: Arc, pub membership: Arc, pub deactivate: Arc, manager: Mutex>>, pub server: Arc, pub db: Arc, } pub struct OnceServices { lock: OnceLock>, } impl OnceServices { pub fn get_services(&self) -> &Arc { self.lock .get() .expect("services must be initialized") } } impl Deref for OnceServices { type Target = Arc; fn deref(&self) -> &Self::Target { self.get_services() } } impl Services { #[allow(clippy::cognitive_complexity)] pub async fn build(server: Arc) -> Result> { let db = Database::open(&server).await?; let services = Arc::new(OnceServices { lock: OnceLock::new() }); macro_rules! build { ($tyname:ty) => { <$tyname>::build(Args { db: &db, server: &server, services: &services, })? }; } let res = Arc::new(Self { account_data: build!(account_data::Service), admin: build!(admin::Service), appservice: build!(appservice::Service), resolver: build!(resolver::Service), client: build!(client::Service), config: build!(config::Service), emergency: build!(emergency::Service), globals: build!(globals::Service), key_backups: build!(key_backups::Service), media: build!(media::Service), presence: build!(presence::Service), pusher: build!(pusher::Service), alias: build!(rooms::alias::Service), auth_chain: build!(rooms::auth_chain::Service), directory: build!(rooms::directory::Service), event_handler: build!(rooms::event_handler::Service), lazy_loading: build!(rooms::lazy_loading::Service), metadata: build!(rooms::metadata::Service), pdu_metadata: build!(rooms::pdu_metadata::Service), read_receipt: build!(rooms::read_receipt::Service), search: build!(rooms::search::Service), short: build!(rooms::short::Service), spaces: build!(rooms::spaces::Service), state: build!(rooms::state::Service), state_accessor: build!(rooms::state_accessor::Service), state_cache: build!(rooms::state_cache::Service), state_compressor: build!(rooms::state_compressor::Service), threads: build!(rooms::threads::Service), timeline: build!(rooms::timeline::Service), typing: build!(rooms::typing::Service), user: build!(rooms::user::Service), federation: build!(federation::Service), sending: build!(sending::Service), server_keys: build!(server_keys::Service), sync: build!(sync::Service), transaction_ids: build!(transaction_ids::Service), uiaa: build!(uiaa::Service), users: build!(users::Service), membership: build!(membership::Service), deactivate: build!(deactivate::Service), manager: Mutex::new(None), server, db, }); services .lock .set(res.clone()) .map_err(|_| err!("couldn't set services lock")) .unwrap(); Ok(res) } pub async fn start(self: &Arc) -> Result> { debug_info!("Starting services..."); super::migrations::migrations(self).await?; self.manager .lock() .await .insert(Manager::new(self)) .clone() .start() .await?; debug_info!("Services startup complete."); Ok(Arc::clone(self)) } pub async fn stop(&self) { info!("Shutting down services..."); self.interrupt().await; if let Some(manager) = self.manager.lock().await.as_ref() { manager.stop().await; } debug_info!("Services shutdown complete."); } pub async fn poll(&self) -> Result { if let Some(manager) = self.manager.lock().await.as_ref() { return manager.poll().await; } Ok(()) } pub(crate) fn services(&self) -> [Arc; 40] { [ self.account_data.clone(), self.admin.clone(), self.appservice.clone(), self.resolver.clone(), self.client.clone(), self.config.clone(), self.emergency.clone(), self.globals.clone(), self.key_backups.clone(), self.media.clone(), self.presence.clone(), self.pusher.clone(), self.alias.clone(), self.auth_chain.clone(), self.directory.clone(), self.event_handler.clone(), self.lazy_loading.clone(), self.metadata.clone(), self.pdu_metadata.clone(), self.read_receipt.clone(), self.search.clone(), self.short.clone(), self.spaces.clone(), self.state.clone(), self.state_accessor.clone(), self.state_cache.clone(), self.state_compressor.clone(), self.threads.clone(), self.timeline.clone(), self.typing.clone(), self.user.clone(), self.federation.clone(), self.sending.clone(), self.server_keys.clone(), self.sync.clone(), self.transaction_ids.clone(), self.uiaa.clone(), self.users.clone(), self.membership.clone(), self.deactivate.clone(), ] } pub async fn clear_cache(&self) { futures::stream::iter(self.services()) .for_each(async |service| { service.clear_cache().await; }) .await; } pub async fn memory_usage(&self) -> Result { futures::stream::iter(self.services()) .map(Ok) .try_fold(String::new(), async |mut out, service| { service.memory_usage(&mut out).await?; Ok(out) }) .await } async fn interrupt(&self) { debug!("Interrupting services..."); for service in self.services() { let name = service.name(); trace!("Interrupting {name}"); service.interrupt().await; } } }