Check and wait loop for admin module load on startup. (fixes #320)

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2026-03-11 14:16:56 +00:00
parent e9864bc4e7
commit a656aba615
4 changed files with 31 additions and 16 deletions

View File

@@ -1,5 +1,8 @@
use ruma::events::room::message::RoomMessageEventContent; use ruma::events::room::message::RoomMessageEventContent;
use tokio::time::{Duration, sleep}; use tokio::{
task::yield_now,
time::{Duration, sleep},
};
use tuwunel_core::{Err, Result, debug, debug_info, error, implement, info}; use tuwunel_core::{Err, Result, debug, debug_info, error, implement, info};
pub(super) const SIGNAL: &str = "SIGUSR2"; pub(super) const SIGNAL: &str = "SIGUSR2";
@@ -16,7 +19,7 @@ pub(super) async fn console_auto_start(&self) {
.admin_console_automatic .admin_console_automatic
{ {
// Allow more of the startup sequence to execute before spawning // Allow more of the startup sequence to execute before spawning
tokio::task::yield_now().await; yield_now().await;
self.console.start(); self.console.start();
} }
} }
@@ -31,7 +34,7 @@ pub(super) async fn console_auto_stop(&self) {
/// Execute admin commands after startup /// Execute admin commands after startup
#[implement(super::Service)] #[implement(super::Service)]
pub(super) async fn startup_execute(&self) -> Result { pub(crate) async fn startup_execute(&self) -> Result {
// List of commands to execute // List of commands to execute
let commands = &self.services.server.config.admin_execute; let commands = &self.services.server.config.admin_execute;
@@ -46,8 +49,16 @@ pub(super) async fn startup_execute(&self) -> Result {
.config .config
.admin_execute_errors_ignore; .admin_execute_errors_ignore;
//TODO: remove this after run-states are broadcast if !commands.is_empty() {
sleep(Duration::from_millis(500)).await; for i in 0..20 {
if self.handle.read().await.is_some() {
break;
}
debug!(?i, "Waiting for admin module to load for startup commands...");
sleep(Duration::from_millis(250)).await;
}
}
for (i, command) in commands.iter().enumerate() { for (i, command) in commands.iter().enumerate() {
if let Err(e) = self.execute_command(i, command.clone()).await if let Err(e) = self.execute_command(i, command.clone()).await
@@ -56,7 +67,7 @@ pub(super) async fn startup_execute(&self) -> Result {
return Err(e); return Err(e);
} }
tokio::task::yield_now().await; yield_now().await;
} }
// The smoketest functionality is placed here for now and simply initiates // The smoketest functionality is placed here for now and simply initiates
@@ -98,7 +109,7 @@ pub(super) async fn signal_execute(&self) -> Result {
return Err(e); return Err(e);
} }
tokio::task::yield_now().await; yield_now().await;
} }
Ok(()) Ok(())

View File

@@ -86,7 +86,6 @@ impl crate::Service for Service {
.expect("locked for writing") .expect("locked for writing")
.insert(sender); .insert(sender);
self.startup_execute().await?;
self.console_auto_start().await; self.console_auto_start().await;
loop { loop {

View File

@@ -3,7 +3,7 @@ use std::{panic::AssertUnwindSafe, sync::Arc, time::Duration};
use futures::{FutureExt, TryFutureExt}; use futures::{FutureExt, TryFutureExt};
use tokio::{ use tokio::{
sync::{Mutex, MutexGuard}, sync::{Mutex, MutexGuard},
task::{JoinHandle, JoinSet}, task::{JoinHandle, JoinSet, yield_now},
time::sleep, time::sleep,
}; };
use tuwunel_core::{ use tuwunel_core::{
@@ -60,6 +60,8 @@ impl Manager {
self.start_worker(&mut workers, &service)?; self.start_worker(&mut workers, &service)?;
} }
yield_now().await;
Ok(()) Ok(())
} }
@@ -72,7 +74,7 @@ impl Manager {
} }
} }
async fn worker(&self) -> Result { async fn worker(self: &Arc<Self>) -> Result {
loop { loop {
let mut workers = self.workers.lock().await; let mut workers = self.workers.lock().await;
tokio::select! { tokio::select! {
@@ -95,7 +97,7 @@ impl Manager {
} }
async fn handle_result( async fn handle_result(
&self, self: &Arc<Self>,
workers: &mut WorkersLocked<'_>, workers: &mut WorkersLocked<'_>,
result: WorkerResult, result: WorkerResult,
) -> Result { ) -> Result {
@@ -108,7 +110,7 @@ impl Manager {
#[expect(clippy::unused_self)] #[expect(clippy::unused_self)]
fn handle_finished( fn handle_finished(
&self, self: &Arc<Self>,
_workers: &mut WorkersLocked<'_>, _workers: &mut WorkersLocked<'_>,
service: &Arc<dyn Service>, service: &Arc<dyn Service>,
) -> Result { ) -> Result {
@@ -117,7 +119,7 @@ impl Manager {
} }
async fn handle_error( async fn handle_error(
&self, self: &Arc<Self>,
workers: &mut WorkersLocked<'_>, workers: &mut WorkersLocked<'_>,
service: &Arc<dyn Service>, service: &Arc<dyn Service>,
error: Error, error: Error,
@@ -143,7 +145,7 @@ impl Manager {
/// Start the worker in a task for the service. /// Start the worker in a task for the service.
fn start_worker( fn start_worker(
&self, self: &Arc<Self>,
workers: &mut WorkersLocked<'_>, workers: &mut WorkersLocked<'_>,
service: &Arc<dyn Service>, service: &Arc<dyn Service>,
) -> Result { ) -> Result {
@@ -155,7 +157,7 @@ impl Manager {
} }
debug!("Service {:?} worker starting...", service.name()); debug!("Service {:?} worker starting...", service.name());
workers.spawn_on(worker(service.clone()), self.server.runtime()); workers.spawn_on(worker(service.clone(), self.clone()), self.server.runtime());
Ok(()) Ok(())
} }
@@ -172,7 +174,7 @@ impl Manager {
skip_all, skip_all,
fields(service = %service.name()), fields(service = %service.name()),
)] )]
async fn worker(service: Arc<dyn Service>) -> WorkerResult { async fn worker(service: Arc<dyn Service>, _mgr: Arc<Manager>) -> WorkerResult {
let service_ = Arc::clone(&service); let service_ = Arc::clone(&service);
let result = AssertUnwindSafe(service_.worker()) let result = AssertUnwindSafe(service_.worker())
.catch_unwind() .catch_unwind()

View File

@@ -199,6 +199,7 @@ pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
debug_info!("Starting services..."); debug_info!("Starting services...");
super::migrations::migrations(self).await?; super::migrations::migrations(self).await?;
self.manager self.manager
.lock() .lock()
.await .await
@@ -207,6 +208,8 @@ pub async fn start(self: &Arc<Self>) -> Result<Arc<Self>> {
.start() .start()
.await?; .await?;
self.admin.startup_execute().await?;
debug_info!("Services startup complete."); debug_info!("Services startup complete.");
Ok(Arc::clone(self)) Ok(Arc::clone(self))