Services refactor
Replace structs of Dep<Service> with OnceServices, so each service has a Services reference Remove service name => Service map Flatten Services.rooms Make reqwest Clients lazy initialized (client service)
This commit is contained in:
@@ -1,19 +1,18 @@
|
||||
use std::{
|
||||
any::Any,
|
||||
collections::BTreeMap,
|
||||
sync::{Arc, RwLock},
|
||||
ops::Deref,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
use futures::{Stream, StreamExt, TryStreamExt};
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use tokio::sync::Mutex;
|
||||
use tuwunel_core::{Result, Server, debug, debug_info, info, trace, utils::stream::IterStream};
|
||||
use tuwunel_core::{Result, Server, debug, debug_info, err, info, trace};
|
||||
use tuwunel_database::Database;
|
||||
|
||||
use crate::{
|
||||
account_data, admin, appservice, client, config, emergency, federation, globals, key_backups,
|
||||
manager::Manager,
|
||||
media, presence, pusher, resolver, rooms, sending, server_keys, service,
|
||||
service::{Args, Map, Service},
|
||||
media, presence, pusher, resolver, rooms, sending, server_keys,
|
||||
service::{Args, Service},
|
||||
sync, transaction_ids, uiaa, users,
|
||||
};
|
||||
|
||||
@@ -30,7 +29,25 @@ pub struct Services {
|
||||
pub presence: Arc<presence::Service>,
|
||||
pub pusher: Arc<pusher::Service>,
|
||||
pub resolver: Arc<resolver::Service>,
|
||||
pub rooms: rooms::Service,
|
||||
pub alias: Arc<rooms::alias::Service>,
|
||||
pub auth_chain: Arc<rooms::auth_chain::Service>,
|
||||
pub directory: Arc<rooms::directory::Service>,
|
||||
pub event_handler: Arc<rooms::event_handler::Service>,
|
||||
pub lazy_loading: Arc<rooms::lazy_loading::Service>,
|
||||
pub metadata: Arc<rooms::metadata::Service>,
|
||||
pub pdu_metadata: Arc<rooms::pdu_metadata::Service>,
|
||||
pub read_receipt: Arc<rooms::read_receipt::Service>,
|
||||
pub search: Arc<rooms::search::Service>,
|
||||
pub short: Arc<rooms::short::Service>,
|
||||
pub spaces: Arc<rooms::spaces::Service>,
|
||||
pub state: Arc<rooms::state::Service>,
|
||||
pub state_accessor: Arc<rooms::state_accessor::Service>,
|
||||
pub state_cache: Arc<rooms::state_cache::Service>,
|
||||
pub state_compressor: Arc<rooms::state_compressor::Service>,
|
||||
pub threads: Arc<rooms::threads::Service>,
|
||||
pub timeline: Arc<rooms::timeline::Service>,
|
||||
pub typing: Arc<rooms::typing::Service>,
|
||||
pub user: Arc<rooms::user::Service>,
|
||||
pub federation: Arc<federation::Service>,
|
||||
pub sending: Arc<sending::Service>,
|
||||
pub server_keys: Arc<server_keys::Service>,
|
||||
@@ -40,29 +57,44 @@ pub struct Services {
|
||||
pub users: Arc<users::Service>,
|
||||
|
||||
manager: Mutex<Option<Arc<Manager>>>,
|
||||
pub(crate) service: Arc<Map>,
|
||||
pub server: Arc<Server>,
|
||||
pub db: Arc<Database>,
|
||||
}
|
||||
|
||||
pub struct OnceServices {
|
||||
lock: OnceLock<Arc<Services>>,
|
||||
}
|
||||
|
||||
impl OnceServices {
|
||||
pub fn get_services(&self) -> &Arc<Services> {
|
||||
self.lock
|
||||
.get()
|
||||
.expect("services must be initialized")
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for OnceServices {
|
||||
type Target = Arc<Services>;
|
||||
|
||||
fn deref(&self) -> &Self::Target { self.get_services() }
|
||||
}
|
||||
|
||||
impl Services {
|
||||
#[allow(clippy::cognitive_complexity)]
|
||||
pub async fn build(server: Arc<Server>) -> Result<Arc<Self>> {
|
||||
let db = Database::open(&server).await?;
|
||||
let service: Arc<Map> = Arc::new(RwLock::new(BTreeMap::new()));
|
||||
let services = Arc::new(OnceServices { lock: OnceLock::new() });
|
||||
macro_rules! build {
|
||||
($tyname:ty) => {{
|
||||
let built = <$tyname>::build(Args {
|
||||
($tyname:ty) => {
|
||||
<$tyname>::build(Args {
|
||||
db: &db,
|
||||
server: &server,
|
||||
service: &service,
|
||||
})?;
|
||||
add_service(&service, built.clone(), built.clone());
|
||||
built
|
||||
}};
|
||||
services: &services,
|
||||
})?
|
||||
};
|
||||
}
|
||||
|
||||
Ok(Arc::new(Self {
|
||||
let res = Arc::new(Self {
|
||||
account_data: build!(account_data::Service),
|
||||
admin: build!(admin::Service),
|
||||
appservice: build!(appservice::Service),
|
||||
@@ -75,27 +107,25 @@ impl Services {
|
||||
media: build!(media::Service),
|
||||
presence: build!(presence::Service),
|
||||
pusher: build!(pusher::Service),
|
||||
rooms: rooms::Service {
|
||||
alias: build!(rooms::alias::Service),
|
||||
auth_chain: build!(rooms::auth_chain::Service),
|
||||
directory: build!(rooms::directory::Service),
|
||||
event_handler: build!(rooms::event_handler::Service),
|
||||
lazy_loading: build!(rooms::lazy_loading::Service),
|
||||
metadata: build!(rooms::metadata::Service),
|
||||
pdu_metadata: build!(rooms::pdu_metadata::Service),
|
||||
read_receipt: build!(rooms::read_receipt::Service),
|
||||
search: build!(rooms::search::Service),
|
||||
short: build!(rooms::short::Service),
|
||||
spaces: build!(rooms::spaces::Service),
|
||||
state: build!(rooms::state::Service),
|
||||
state_accessor: build!(rooms::state_accessor::Service),
|
||||
state_cache: build!(rooms::state_cache::Service),
|
||||
state_compressor: build!(rooms::state_compressor::Service),
|
||||
threads: build!(rooms::threads::Service),
|
||||
timeline: build!(rooms::timeline::Service),
|
||||
typing: build!(rooms::typing::Service),
|
||||
user: build!(rooms::user::Service),
|
||||
},
|
||||
alias: build!(rooms::alias::Service),
|
||||
auth_chain: build!(rooms::auth_chain::Service),
|
||||
directory: build!(rooms::directory::Service),
|
||||
event_handler: build!(rooms::event_handler::Service),
|
||||
lazy_loading: build!(rooms::lazy_loading::Service),
|
||||
metadata: build!(rooms::metadata::Service),
|
||||
pdu_metadata: build!(rooms::pdu_metadata::Service),
|
||||
read_receipt: build!(rooms::read_receipt::Service),
|
||||
search: build!(rooms::search::Service),
|
||||
short: build!(rooms::short::Service),
|
||||
spaces: build!(rooms::spaces::Service),
|
||||
state: build!(rooms::state::Service),
|
||||
state_accessor: build!(rooms::state_accessor::Service),
|
||||
state_cache: build!(rooms::state_cache::Service),
|
||||
state_compressor: build!(rooms::state_compressor::Service),
|
||||
threads: build!(rooms::threads::Service),
|
||||
timeline: build!(rooms::timeline::Service),
|
||||
typing: build!(rooms::typing::Service),
|
||||
user: build!(rooms::user::Service),
|
||||
federation: build!(federation::Service),
|
||||
sending: build!(sending::Service),
|
||||
server_keys: build!(server_keys::Service),
|
||||
@@ -105,18 +135,22 @@ impl Services {
|
||||
users: build!(users::Service),
|
||||
|
||||
manager: Mutex::new(None),
|
||||
service,
|
||||
server,
|
||||
db,
|
||||
}))
|
||||
});
|
||||
|
||||
services
|
||||
.lock
|
||||
.set(res.clone())
|
||||
.map_err(|_| err!("couldn't set services lock"))
|
||||
.unwrap();
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
|
||||
debug_info!("Starting services...");
|
||||
|
||||
self.admin
|
||||
.set_services(Some(Arc::clone(self)).as_ref());
|
||||
|
||||
super::migrations::migrations(self).await?;
|
||||
self.manager
|
||||
.lock()
|
||||
@@ -157,8 +191,6 @@ impl Services {
|
||||
manager.stop().await;
|
||||
}
|
||||
|
||||
self.admin.set_services(None);
|
||||
|
||||
debug_info!("Services shutdown complete.");
|
||||
}
|
||||
|
||||
@@ -170,8 +202,51 @@ impl Services {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn services(&self) -> [Arc<dyn Service>; 38] {
|
||||
[
|
||||
self.account_data.clone(),
|
||||
self.admin.clone(),
|
||||
self.appservice.clone(),
|
||||
self.resolver.clone(),
|
||||
self.client.clone(),
|
||||
self.config.clone(),
|
||||
self.emergency.clone(),
|
||||
self.globals.clone(),
|
||||
self.key_backups.clone(),
|
||||
self.media.clone(),
|
||||
self.presence.clone(),
|
||||
self.pusher.clone(),
|
||||
self.alias.clone(),
|
||||
self.auth_chain.clone(),
|
||||
self.directory.clone(),
|
||||
self.event_handler.clone(),
|
||||
self.lazy_loading.clone(),
|
||||
self.metadata.clone(),
|
||||
self.pdu_metadata.clone(),
|
||||
self.read_receipt.clone(),
|
||||
self.search.clone(),
|
||||
self.short.clone(),
|
||||
self.spaces.clone(),
|
||||
self.state.clone(),
|
||||
self.state_accessor.clone(),
|
||||
self.state_cache.clone(),
|
||||
self.state_compressor.clone(),
|
||||
self.threads.clone(),
|
||||
self.timeline.clone(),
|
||||
self.typing.clone(),
|
||||
self.user.clone(),
|
||||
self.federation.clone(),
|
||||
self.sending.clone(),
|
||||
self.server_keys.clone(),
|
||||
self.sync.clone(),
|
||||
self.transaction_ids.clone(),
|
||||
self.uiaa.clone(),
|
||||
self.users.clone(),
|
||||
]
|
||||
}
|
||||
|
||||
pub async fn clear_cache(&self) {
|
||||
self.services()
|
||||
futures::stream::iter(self.services())
|
||||
.for_each(async |service| {
|
||||
service.clear_cache().await;
|
||||
})
|
||||
@@ -179,7 +254,7 @@ impl Services {
|
||||
}
|
||||
|
||||
pub async fn memory_usage(&self) -> Result<String> {
|
||||
self.services()
|
||||
futures::stream::iter(self.services())
|
||||
.map(Ok)
|
||||
.try_fold(String::new(), async |mut out, service| {
|
||||
service.memory_usage(&mut out).await?;
|
||||
@@ -190,55 +265,10 @@ impl Services {
|
||||
|
||||
fn interrupt(&self) {
|
||||
debug!("Interrupting services...");
|
||||
for (name, (service, ..)) in self
|
||||
.service
|
||||
.read()
|
||||
.expect("locked for reading")
|
||||
.iter()
|
||||
{
|
||||
if let Some(service) = service.upgrade() {
|
||||
trace!("Interrupting {name}");
|
||||
service.interrupt();
|
||||
}
|
||||
for service in self.services() {
|
||||
let name = service.name();
|
||||
trace!("Interrupting {name}");
|
||||
service.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
/// Iterate from snapshot of the services map
|
||||
fn services(&self) -> impl Stream<Item = Arc<dyn Service>> + Send {
|
||||
self.service
|
||||
.read()
|
||||
.expect("locked for reading")
|
||||
.values()
|
||||
.filter_map(|val| val.0.upgrade())
|
||||
.collect::<Vec<_>>()
|
||||
.into_iter()
|
||||
.stream()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn try_get<T>(&self, name: &str) -> Result<Arc<T>>
|
||||
where
|
||||
T: Any + Send + Sync + Sized,
|
||||
{
|
||||
service::try_get::<T>(&self.service, name)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get<T>(&self, name: &str) -> Option<Arc<T>>
|
||||
where
|
||||
T: Any + Send + Sync + Sized,
|
||||
{
|
||||
service::get::<T>(&self.service, name)
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::needless_pass_by_value)]
|
||||
fn add_service(map: &Arc<Map>, s: Arc<dyn Service>, a: Arc<dyn Any + Send + Sync>) {
|
||||
let name = s.name();
|
||||
let len = map.read().expect("locked for reading").len();
|
||||
|
||||
trace!("built service #{len}: {name:?}");
|
||||
map.write()
|
||||
.expect("locked for writing")
|
||||
.insert(name.to_owned(), (Arc::downgrade(&s), Arc::downgrade(&a)));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user