diff --git a/src/service/admin/execute.rs b/src/service/admin/execute.rs index a46c53d7..1f7ba2e3 100644 --- a/src/service/admin/execute.rs +++ b/src/service/admin/execute.rs @@ -1,5 +1,8 @@ use ruma::events::room::message::RoomMessageEventContent; -use tokio::time::{Duration, sleep}; +use tokio::{ + task::yield_now, + time::{Duration, sleep}, +}; use tuwunel_core::{Err, Result, debug, debug_info, error, implement, info}; pub(super) const SIGNAL: &str = "SIGUSR2"; @@ -16,7 +19,7 @@ pub(super) async fn console_auto_start(&self) { .admin_console_automatic { // Allow more of the startup sequence to execute before spawning - tokio::task::yield_now().await; + yield_now().await; self.console.start(); } } @@ -31,7 +34,7 @@ pub(super) async fn console_auto_stop(&self) { /// Execute admin commands after startup #[implement(super::Service)] -pub(super) async fn startup_execute(&self) -> Result { +pub(crate) async fn startup_execute(&self) -> Result { // List of commands to execute let commands = &self.services.server.config.admin_execute; @@ -46,8 +49,16 @@ pub(super) async fn startup_execute(&self) -> Result { .config .admin_execute_errors_ignore; - //TODO: remove this after run-states are broadcast - sleep(Duration::from_millis(500)).await; + if !commands.is_empty() { + for i in 0..20 { + if self.handle.read().await.is_some() { + break; + } + + debug!(?i, "Waiting for admin module to load for startup commands..."); + sleep(Duration::from_millis(250)).await; + } + } for (i, command) in commands.iter().enumerate() { if let Err(e) = self.execute_command(i, command.clone()).await @@ -56,7 +67,7 @@ pub(super) async fn startup_execute(&self) -> Result { return Err(e); } - tokio::task::yield_now().await; + yield_now().await; } // The smoketest functionality is placed here for now and simply initiates @@ -98,7 +109,7 @@ pub(super) async fn signal_execute(&self) -> Result { return Err(e); } - tokio::task::yield_now().await; + yield_now().await; } Ok(()) diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 51304c1f..09718bf4 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -86,7 +86,6 @@ impl crate::Service for Service { .expect("locked for writing") .insert(sender); - self.startup_execute().await?; self.console_auto_start().await; loop { diff --git a/src/service/manager.rs b/src/service/manager.rs index 32e09ac6..d0a8d66d 100644 --- a/src/service/manager.rs +++ b/src/service/manager.rs @@ -3,7 +3,7 @@ use std::{panic::AssertUnwindSafe, sync::Arc, time::Duration}; use futures::{FutureExt, TryFutureExt}; use tokio::{ sync::{Mutex, MutexGuard}, - task::{JoinHandle, JoinSet}, + task::{JoinHandle, JoinSet, yield_now}, time::sleep, }; use tuwunel_core::{ @@ -60,6 +60,8 @@ impl Manager { self.start_worker(&mut workers, &service)?; } + yield_now().await; + Ok(()) } @@ -72,7 +74,7 @@ impl Manager { } } - async fn worker(&self) -> Result { + async fn worker(self: &Arc) -> Result { loop { let mut workers = self.workers.lock().await; tokio::select! { @@ -95,7 +97,7 @@ impl Manager { } async fn handle_result( - &self, + self: &Arc, workers: &mut WorkersLocked<'_>, result: WorkerResult, ) -> Result { @@ -108,7 +110,7 @@ impl Manager { #[expect(clippy::unused_self)] fn handle_finished( - &self, + self: &Arc, _workers: &mut WorkersLocked<'_>, service: &Arc, ) -> Result { @@ -117,7 +119,7 @@ impl Manager { } async fn handle_error( - &self, + self: &Arc, workers: &mut WorkersLocked<'_>, service: &Arc, error: Error, @@ -143,7 +145,7 @@ impl Manager { /// Start the worker in a task for the service. fn start_worker( - &self, + self: &Arc, workers: &mut WorkersLocked<'_>, service: &Arc, ) -> Result { @@ -155,7 +157,7 @@ impl Manager { } debug!("Service {:?} worker starting...", service.name()); - workers.spawn_on(worker(service.clone()), self.server.runtime()); + workers.spawn_on(worker(service.clone(), self.clone()), self.server.runtime()); Ok(()) } @@ -172,7 +174,7 @@ impl Manager { skip_all, fields(service = %service.name()), )] -async fn worker(service: Arc) -> WorkerResult { +async fn worker(service: Arc, _mgr: Arc) -> WorkerResult { let service_ = Arc::clone(&service); let result = AssertUnwindSafe(service_.worker()) .catch_unwind() diff --git a/src/service/services.rs b/src/service/services.rs index 0ab66689..2c32e357 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -199,6 +199,7 @@ pub async fn start(self: &Arc) -> Result> { debug_info!("Starting services..."); super::migrations::migrations(self).await?; + self.manager .lock() .await @@ -207,6 +208,8 @@ pub async fn start(self: &Arc) -> Result> { .start() .await?; + self.admin.startup_execute().await?; + debug_info!("Services startup complete."); Ok(Arc::clone(self))