Optimize OnceServices; simplify init.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -211,7 +211,7 @@ impl Service {
|
|||||||
.await
|
.await
|
||||||
.expect("Admin module is not loaded");
|
.expect("Admin module is not loaded");
|
||||||
|
|
||||||
handle(Arc::clone(self.services.get_services()), command).await
|
handle(Arc::clone(self.services.get()), command).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Checks whether a given user is an admin of this server
|
/// Checks whether a given user is an admin of this server
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
mod manager;
|
mod manager;
|
||||||
mod migrations;
|
mod migrations;
|
||||||
|
mod once_services;
|
||||||
mod service;
|
mod service;
|
||||||
pub mod services;
|
pub mod services;
|
||||||
|
|
||||||
@@ -29,6 +30,7 @@ pub mod transaction_ids;
|
|||||||
pub mod uiaa;
|
pub mod uiaa;
|
||||||
pub mod users;
|
pub mod users;
|
||||||
|
|
||||||
|
pub(crate) use once_services::OnceServices;
|
||||||
pub(crate) use service::{Args, Service};
|
pub(crate) use service::{Args, Service};
|
||||||
|
|
||||||
pub use crate::services::Services;
|
pub use crate::services::Services;
|
||||||
|
|||||||
52
src/service/once_services.rs
Normal file
52
src/service/once_services.rs
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
use std::{
|
||||||
|
ops::Deref,
|
||||||
|
sync::{Arc, OnceLock},
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::Services;
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub(crate) struct OnceServices {
|
||||||
|
lock: OnceLock<Arc<Services>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OnceServices {
|
||||||
|
pub(super) fn set(&self, services: Arc<Services>) -> Arc<Services> {
|
||||||
|
self.lock.get_or_init(move || services).clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn get(&self) -> &Arc<Services> {
|
||||||
|
self.lock
|
||||||
|
.get()
|
||||||
|
.expect("services must be initialized")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Deref for OnceServices {
|
||||||
|
type Target = Arc<Services>;
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
fn deref(&self) -> &Self::Target { self.get() }
|
||||||
|
}
|
||||||
|
|
||||||
|
// SAFETY: Services has a lot of circularity inherited from Conduit's original
|
||||||
|
// design. This stresses the trait solver which twists itself into a knot
|
||||||
|
// proving Sendness. This issue was a lot worse in conduwuit where we used an
|
||||||
|
// instance of `Dep` for each Service rather than a single instance of
|
||||||
|
// `OnceServices` like now. The problem still exists though greatly reduced, and
|
||||||
|
// the same solution now has greater impact because OnceServices is the single
|
||||||
|
// unified focal-point for the entire Services call-web.
|
||||||
|
//
|
||||||
|
// The prior incarnation required this unsafety or it would blow through the
|
||||||
|
// recursion_limit; that no longer happens. Nevertheless compile times are
|
||||||
|
// still substantially reduced by asserting Sendness here. Prove sendness
|
||||||
|
// by simply commenting this out, it will just take longer.
|
||||||
|
unsafe impl Send for OnceServices {}
|
||||||
|
|
||||||
|
// SAFETY: Similar to Send as explained above, we further reduce compile-times
|
||||||
|
// by manually asserting Syncness of this type. The only threading contention
|
||||||
|
// concerns for this would be on startup but this server has a very well defined
|
||||||
|
// initialization sequence. After that this structure is purely read-only shared
|
||||||
|
// without concern.
|
||||||
|
unsafe impl Sync for OnceServices {}
|
||||||
@@ -1,13 +1,13 @@
|
|||||||
use std::{
|
use std::sync::Arc;
|
||||||
ops::Deref,
|
|
||||||
sync::{Arc, OnceLock},
|
|
||||||
};
|
|
||||||
|
|
||||||
use futures::{StreamExt, TryStreamExt};
|
use futures::{StreamExt, TryStreamExt};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tuwunel_core::{Result, Server, debug, debug_info, err, info, trace};
|
use tuwunel_core::{
|
||||||
|
Result, Server, debug, debug_info, implement, info, trace, utils::stream::IterStream,
|
||||||
|
};
|
||||||
use tuwunel_database::Database;
|
use tuwunel_database::Database;
|
||||||
|
|
||||||
|
pub(crate) use crate::OnceServices;
|
||||||
use crate::{
|
use crate::{
|
||||||
account_data, admin, appservice, client, config, deactivate, emergency, federation, globals,
|
account_data, admin, appservice, client, config, deactivate, emergency, federation, globals,
|
||||||
key_backups,
|
key_backups,
|
||||||
@@ -65,202 +65,191 @@ pub struct Services {
|
|||||||
pub db: Arc<Database>,
|
pub db: Arc<Database>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct OnceServices {
|
#[implement(Services)]
|
||||||
lock: OnceLock<Arc<Services>>,
|
pub async fn build(server: Arc<Server>) -> Result<Arc<Self>> {
|
||||||
|
let db = Database::open(&server).await?;
|
||||||
|
let services = Arc::new(OnceServices::default());
|
||||||
|
macro_rules! build {
|
||||||
|
($tyname:ty) => {
|
||||||
|
<$tyname>::build(Args {
|
||||||
|
db: &db,
|
||||||
|
server: &server,
|
||||||
|
services: &services,
|
||||||
|
})?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let res = Arc::new(Self {
|
||||||
|
account_data: build!(account_data::Service),
|
||||||
|
admin: build!(admin::Service),
|
||||||
|
appservice: build!(appservice::Service),
|
||||||
|
resolver: build!(resolver::Service),
|
||||||
|
client: build!(client::Service),
|
||||||
|
config: build!(config::Service),
|
||||||
|
emergency: build!(emergency::Service),
|
||||||
|
globals: build!(globals::Service),
|
||||||
|
key_backups: build!(key_backups::Service),
|
||||||
|
media: build!(media::Service),
|
||||||
|
presence: build!(presence::Service),
|
||||||
|
pusher: build!(pusher::Service),
|
||||||
|
alias: build!(rooms::alias::Service),
|
||||||
|
auth_chain: build!(rooms::auth_chain::Service),
|
||||||
|
delete: build!(rooms::delete::Service),
|
||||||
|
directory: build!(rooms::directory::Service),
|
||||||
|
event_handler: build!(rooms::event_handler::Service),
|
||||||
|
lazy_loading: build!(rooms::lazy_loading::Service),
|
||||||
|
metadata: build!(rooms::metadata::Service),
|
||||||
|
pdu_metadata: build!(rooms::pdu_metadata::Service),
|
||||||
|
read_receipt: build!(rooms::read_receipt::Service),
|
||||||
|
search: build!(rooms::search::Service),
|
||||||
|
short: build!(rooms::short::Service),
|
||||||
|
spaces: build!(rooms::spaces::Service),
|
||||||
|
state: build!(rooms::state::Service),
|
||||||
|
state_accessor: build!(rooms::state_accessor::Service),
|
||||||
|
state_cache: build!(rooms::state_cache::Service),
|
||||||
|
state_compressor: build!(rooms::state_compressor::Service),
|
||||||
|
threads: build!(rooms::threads::Service),
|
||||||
|
timeline: build!(rooms::timeline::Service),
|
||||||
|
typing: build!(rooms::typing::Service),
|
||||||
|
user: build!(rooms::user::Service),
|
||||||
|
federation: build!(federation::Service),
|
||||||
|
sending: build!(sending::Service),
|
||||||
|
server_keys: build!(server_keys::Service),
|
||||||
|
sync: build!(sync::Service),
|
||||||
|
transaction_ids: build!(transaction_ids::Service),
|
||||||
|
uiaa: build!(uiaa::Service),
|
||||||
|
users: build!(users::Service),
|
||||||
|
membership: build!(membership::Service),
|
||||||
|
deactivate: build!(deactivate::Service),
|
||||||
|
|
||||||
|
manager: Mutex::new(None),
|
||||||
|
server,
|
||||||
|
db,
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(services.set(res))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OnceServices {
|
#[implement(Services)]
|
||||||
pub fn get_services(&self) -> &Arc<Services> {
|
pub(crate) fn services(&self) -> impl Iterator<Item = Arc<dyn Service>> + Send {
|
||||||
self.lock
|
macro_rules! cast {
|
||||||
.get()
|
($s:expr) => {
|
||||||
.expect("services must be initialized")
|
<Arc<dyn Service> as Into<_>>::into($s.clone())
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
[
|
||||||
|
cast!(self.account_data),
|
||||||
|
cast!(self.admin),
|
||||||
|
cast!(self.appservice),
|
||||||
|
cast!(self.resolver),
|
||||||
|
cast!(self.client),
|
||||||
|
cast!(self.config),
|
||||||
|
cast!(self.emergency),
|
||||||
|
cast!(self.globals),
|
||||||
|
cast!(self.key_backups),
|
||||||
|
cast!(self.media),
|
||||||
|
cast!(self.presence),
|
||||||
|
cast!(self.pusher),
|
||||||
|
cast!(self.alias),
|
||||||
|
cast!(self.auth_chain),
|
||||||
|
cast!(self.delete),
|
||||||
|
cast!(self.directory),
|
||||||
|
cast!(self.event_handler),
|
||||||
|
cast!(self.lazy_loading),
|
||||||
|
cast!(self.metadata),
|
||||||
|
cast!(self.pdu_metadata),
|
||||||
|
cast!(self.read_receipt),
|
||||||
|
cast!(self.search),
|
||||||
|
cast!(self.short),
|
||||||
|
cast!(self.spaces),
|
||||||
|
cast!(self.state),
|
||||||
|
cast!(self.state_accessor),
|
||||||
|
cast!(self.state_cache),
|
||||||
|
cast!(self.state_compressor),
|
||||||
|
cast!(self.threads),
|
||||||
|
cast!(self.timeline),
|
||||||
|
cast!(self.typing),
|
||||||
|
cast!(self.user),
|
||||||
|
cast!(self.federation),
|
||||||
|
cast!(self.sending),
|
||||||
|
cast!(self.server_keys),
|
||||||
|
cast!(self.sync),
|
||||||
|
cast!(self.transaction_ids),
|
||||||
|
cast!(self.uiaa),
|
||||||
|
cast!(self.users),
|
||||||
|
cast!(self.membership),
|
||||||
|
cast!(self.deactivate),
|
||||||
|
]
|
||||||
|
.into_iter()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(Services)]
|
||||||
|
pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
|
||||||
|
debug_info!("Starting services...");
|
||||||
|
|
||||||
|
super::migrations::migrations(self).await?;
|
||||||
|
self.manager
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.insert(Manager::new(self))
|
||||||
|
.clone()
|
||||||
|
.start()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
debug_info!("Services startup complete.");
|
||||||
|
|
||||||
|
Ok(Arc::clone(self))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(Services)]
|
||||||
|
pub async fn stop(&self) {
|
||||||
|
info!("Shutting down services...");
|
||||||
|
|
||||||
|
self.interrupt().await;
|
||||||
|
if let Some(manager) = self.manager.lock().await.as_ref() {
|
||||||
|
manager.stop().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
debug_info!("Services shutdown complete.");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(Services)]
|
||||||
|
pub(crate) async fn interrupt(&self) {
|
||||||
|
debug!("Interrupting services...");
|
||||||
|
for service in self.services() {
|
||||||
|
let name = service.name();
|
||||||
|
trace!("Interrupting {name}");
|
||||||
|
service.interrupt().await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Deref for OnceServices {
|
#[implement(Services)]
|
||||||
type Target = Arc<Services>;
|
pub async fn poll(&self) -> Result {
|
||||||
|
if let Some(manager) = self.manager.lock().await.as_ref() {
|
||||||
|
return manager.poll().await;
|
||||||
|
}
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target { self.get_services() }
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Services {
|
#[implement(Services)]
|
||||||
#[allow(clippy::cognitive_complexity)]
|
pub async fn clear_cache(&self) {
|
||||||
pub async fn build(server: Arc<Server>) -> Result<Arc<Self>> {
|
self.services()
|
||||||
let db = Database::open(&server).await?;
|
.stream()
|
||||||
let services = Arc::new(OnceServices { lock: OnceLock::new() });
|
.for_each(async |service| {
|
||||||
macro_rules! build {
|
service.clear_cache().await;
|
||||||
($tyname:ty) => {
|
})
|
||||||
<$tyname>::build(Args {
|
.await;
|
||||||
db: &db,
|
}
|
||||||
server: &server,
|
|
||||||
services: &services,
|
#[implement(Services)]
|
||||||
})?
|
pub async fn memory_usage(&self) -> Result<String> {
|
||||||
};
|
self.services()
|
||||||
}
|
.try_stream()
|
||||||
|
.try_fold(String::new(), async |mut out, service| {
|
||||||
let res = Arc::new(Self {
|
service.memory_usage(&mut out).await?;
|
||||||
account_data: build!(account_data::Service),
|
Ok(out)
|
||||||
admin: build!(admin::Service),
|
})
|
||||||
appservice: build!(appservice::Service),
|
.await
|
||||||
resolver: build!(resolver::Service),
|
|
||||||
client: build!(client::Service),
|
|
||||||
config: build!(config::Service),
|
|
||||||
emergency: build!(emergency::Service),
|
|
||||||
globals: build!(globals::Service),
|
|
||||||
key_backups: build!(key_backups::Service),
|
|
||||||
media: build!(media::Service),
|
|
||||||
presence: build!(presence::Service),
|
|
||||||
pusher: build!(pusher::Service),
|
|
||||||
alias: build!(rooms::alias::Service),
|
|
||||||
auth_chain: build!(rooms::auth_chain::Service),
|
|
||||||
delete: build!(rooms::delete::Service),
|
|
||||||
directory: build!(rooms::directory::Service),
|
|
||||||
event_handler: build!(rooms::event_handler::Service),
|
|
||||||
lazy_loading: build!(rooms::lazy_loading::Service),
|
|
||||||
metadata: build!(rooms::metadata::Service),
|
|
||||||
pdu_metadata: build!(rooms::pdu_metadata::Service),
|
|
||||||
read_receipt: build!(rooms::read_receipt::Service),
|
|
||||||
search: build!(rooms::search::Service),
|
|
||||||
short: build!(rooms::short::Service),
|
|
||||||
spaces: build!(rooms::spaces::Service),
|
|
||||||
state: build!(rooms::state::Service),
|
|
||||||
state_accessor: build!(rooms::state_accessor::Service),
|
|
||||||
state_cache: build!(rooms::state_cache::Service),
|
|
||||||
state_compressor: build!(rooms::state_compressor::Service),
|
|
||||||
threads: build!(rooms::threads::Service),
|
|
||||||
timeline: build!(rooms::timeline::Service),
|
|
||||||
typing: build!(rooms::typing::Service),
|
|
||||||
user: build!(rooms::user::Service),
|
|
||||||
federation: build!(federation::Service),
|
|
||||||
sending: build!(sending::Service),
|
|
||||||
server_keys: build!(server_keys::Service),
|
|
||||||
sync: build!(sync::Service),
|
|
||||||
transaction_ids: build!(transaction_ids::Service),
|
|
||||||
uiaa: build!(uiaa::Service),
|
|
||||||
users: build!(users::Service),
|
|
||||||
membership: build!(membership::Service),
|
|
||||||
deactivate: build!(deactivate::Service),
|
|
||||||
|
|
||||||
manager: Mutex::new(None),
|
|
||||||
server,
|
|
||||||
db,
|
|
||||||
});
|
|
||||||
|
|
||||||
services
|
|
||||||
.lock
|
|
||||||
.set(res.clone())
|
|
||||||
.map_err(|_| err!("couldn't set services lock"))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
|
|
||||||
debug_info!("Starting services...");
|
|
||||||
|
|
||||||
super::migrations::migrations(self).await?;
|
|
||||||
self.manager
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.insert(Manager::new(self))
|
|
||||||
.clone()
|
|
||||||
.start()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
debug_info!("Services startup complete.");
|
|
||||||
|
|
||||||
Ok(Arc::clone(self))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn stop(&self) {
|
|
||||||
info!("Shutting down services...");
|
|
||||||
|
|
||||||
self.interrupt().await;
|
|
||||||
if let Some(manager) = self.manager.lock().await.as_ref() {
|
|
||||||
manager.stop().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
debug_info!("Services shutdown complete.");
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn poll(&self) -> Result {
|
|
||||||
if let Some(manager) = self.manager.lock().await.as_ref() {
|
|
||||||
return manager.poll().await;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn services(&self) -> [Arc<dyn Service>; 41] {
|
|
||||||
[
|
|
||||||
self.account_data.clone(),
|
|
||||||
self.admin.clone(),
|
|
||||||
self.appservice.clone(),
|
|
||||||
self.resolver.clone(),
|
|
||||||
self.client.clone(),
|
|
||||||
self.config.clone(),
|
|
||||||
self.emergency.clone(),
|
|
||||||
self.globals.clone(),
|
|
||||||
self.key_backups.clone(),
|
|
||||||
self.media.clone(),
|
|
||||||
self.presence.clone(),
|
|
||||||
self.pusher.clone(),
|
|
||||||
self.alias.clone(),
|
|
||||||
self.auth_chain.clone(),
|
|
||||||
self.delete.clone(),
|
|
||||||
self.directory.clone(),
|
|
||||||
self.event_handler.clone(),
|
|
||||||
self.lazy_loading.clone(),
|
|
||||||
self.metadata.clone(),
|
|
||||||
self.pdu_metadata.clone(),
|
|
||||||
self.read_receipt.clone(),
|
|
||||||
self.search.clone(),
|
|
||||||
self.short.clone(),
|
|
||||||
self.spaces.clone(),
|
|
||||||
self.state.clone(),
|
|
||||||
self.state_accessor.clone(),
|
|
||||||
self.state_cache.clone(),
|
|
||||||
self.state_compressor.clone(),
|
|
||||||
self.threads.clone(),
|
|
||||||
self.timeline.clone(),
|
|
||||||
self.typing.clone(),
|
|
||||||
self.user.clone(),
|
|
||||||
self.federation.clone(),
|
|
||||||
self.sending.clone(),
|
|
||||||
self.server_keys.clone(),
|
|
||||||
self.sync.clone(),
|
|
||||||
self.transaction_ids.clone(),
|
|
||||||
self.uiaa.clone(),
|
|
||||||
self.users.clone(),
|
|
||||||
self.membership.clone(),
|
|
||||||
self.deactivate.clone(),
|
|
||||||
]
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn clear_cache(&self) {
|
|
||||||
futures::stream::iter(self.services())
|
|
||||||
.for_each(async |service| {
|
|
||||||
service.clear_cache().await;
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn memory_usage(&self) -> Result<String> {
|
|
||||||
futures::stream::iter(self.services())
|
|
||||||
.map(Ok)
|
|
||||||
.try_fold(String::new(), async |mut out, service| {
|
|
||||||
service.memory_usage(&mut out).await?;
|
|
||||||
Ok(out)
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn interrupt(&self) {
|
|
||||||
debug!("Interrupting services...");
|
|
||||||
for service in self.services() {
|
|
||||||
let name = service.name();
|
|
||||||
trace!("Interrupting {name}");
|
|
||||||
service.interrupt().await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user