diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index cc9897d2..21b7f82c 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -22,6 +22,7 @@ use tuwunel_core::{ Event, pdu::{PduEvent, PduId, RawPduId}, }, + tokio_metrics::TaskMonitor, trace, utils, utils::{ stream::{IterStream, ReadyExt}, @@ -790,6 +791,37 @@ pub(super) async fn runtime_interval(&self) -> Result { .await } +#[admin_command] +pub(super) async fn task_metrics(&self) -> Result { + let out = self + .services + .server + .metrics + .task_metrics() + .map(TaskMonitor::cumulative) + .map_or_else( + || "Task metrics are not available.".to_owned(), + |metrics| format!("```rs\n{metrics:#?}\n```"), + ); + + self.write_str(&out).await +} + +#[admin_command] +pub(super) async fn task_interval(&self) -> Result { + let out = self + .services + .server + .metrics + .task_interval() + .map_or_else( + || "Task metrics are not available.".to_owned(), + |metrics| format!("```rs\n{metrics:#?}\n```"), + ); + + self.write_str(&out).await +} + #[admin_command] pub(super) async fn time(&self) -> Result { let now = SystemTime::now(); diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs index 66c8a0fd..0aa02de2 100644 --- a/src/admin/debug/mod.rs +++ b/src/admin/debug/mod.rs @@ -197,6 +197,13 @@ pub(super) enum DebugCommand { /// invocation. RuntimeInterval, + /// - Print detailed tokio task metrics accumulated in total. + TaskMetrics, + + /// - Print detailed tokio task metrics accumulated since last command + /// invocation. + TaskInterval, + /// - Print the current time Time, diff --git a/src/core/metrics/mod.rs b/src/core/metrics/mod.rs index 95588caa..46e19e8d 100644 --- a/src/core/metrics/mod.rs +++ b/src/core/metrics/mod.rs @@ -1,9 +1,9 @@ use std::sync::atomic::{AtomicU32, AtomicU64}; use tokio::runtime; -use tokio_metrics::TaskMonitor; #[cfg(tokio_unstable)] use tokio_metrics::{RuntimeIntervals, RuntimeMonitor}; +use tokio_metrics::{TaskMetrics, TaskMonitor}; pub struct Metrics { _runtime: Option, @@ -12,6 +12,8 @@ pub struct Metrics { task_monitor: Option, + task_intervals: std::sync::Mutex + Send>>>, + #[cfg(tokio_unstable)] _runtime_monitor: Option, @@ -27,21 +29,37 @@ pub struct Metrics { impl Metrics { #[must_use] - pub fn new(runtime: Option) -> Self { + pub fn new(runtime: Option<&runtime::Handle>) -> Self { #[cfg(tokio_unstable)] - let runtime_monitor = runtime.as_ref().map(RuntimeMonitor::new); + let runtime_monitor = runtime.map(RuntimeMonitor::new); #[cfg(tokio_unstable)] let runtime_intervals = runtime_monitor .as_ref() .map(RuntimeMonitor::intervals); + let task_monitor = cfg!(tokio_unstable).then(|| { + TaskMonitor::builder() + .with_slow_poll_threshold(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD) + .with_long_delay_threshold(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD) + .clone() + .build() + }); + + let task_intervals = task_monitor.as_ref().map( + |task_monitor| -> Box + Send> { + Box::new(task_monitor.intervals()) + }, + ); + Self { - _runtime: runtime.clone(), + _runtime: runtime.cloned(), - runtime_metrics: runtime.as_ref().map(runtime::Handle::metrics), + runtime_metrics: runtime.map(runtime::Handle::metrics), - task_monitor: runtime.map(|_| TaskMonitor::new()), + task_monitor, + + task_intervals: task_intervals.into(), #[cfg(tokio_unstable)] _runtime_monitor: runtime_monitor, @@ -56,6 +74,26 @@ impl Metrics { } } + #[inline] + pub async fn instrument(&self, f: F) -> Output + where + F: Future, + { + if let Some(monitor) = self.task_metrics() { + monitor.instrument(f).await + } else { + f.await + } + } + + pub fn task_interval(&self) -> Option { + self.task_intervals + .lock() + .expect("locked") + .as_mut() + .and_then(Iterator::next) + } + #[cfg(tokio_unstable)] pub fn runtime_interval(&self) -> Option { self.runtime_intervals @@ -66,15 +104,15 @@ impl Metrics { .expect("next interval") } - #[inline] - pub fn task_root(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() } - #[inline] pub fn num_workers(&self) -> usize { self.runtime_metrics() .map_or(0, runtime::RuntimeMetrics::num_workers) } + #[inline] + pub fn task_metrics(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() } + #[inline] pub fn runtime_metrics(&self) -> Option<&runtime::RuntimeMetrics> { self.runtime_metrics.as_ref() diff --git a/src/core/mod.rs b/src/core/mod.rs index 8710e52c..ec0f559f 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -20,6 +20,7 @@ pub use ::jsonwebtoken as jwt; pub use ::ruma; pub use ::smallstr; pub use ::smallvec; +pub use ::tokio_metrics; pub use ::toml; pub use ::tracing; pub use config::Config; diff --git a/src/core/server.rs b/src/core/server.rs index fff6ba4c..7261bf7e 100644 --- a/src/core/server.rs +++ b/src/core/server.rs @@ -49,7 +49,7 @@ pub struct Server { impl Server { #[must_use] - pub fn new(config: Config, runtime: Option, log: Logging) -> Self { + pub fn new(config: Config, runtime: Option<&runtime::Handle>, log: Logging) -> Self { Self { name: config.server_name.clone(), config: config::Manager::new(config), @@ -57,7 +57,7 @@ impl Server { stopping: AtomicBool::new(false), reloading: AtomicBool::new(false), restarting: AtomicBool::new(false), - runtime: runtime.clone(), + runtime: runtime.cloned(), signal: broadcast::channel::<&'static str>(1).0, log, metrics: Metrics::new(runtime), diff --git a/src/main/server.rs b/src/main/server.rs index c707ac04..35e0bb6a 100644 --- a/src/main/server.rs +++ b/src/main/server.rs @@ -67,7 +67,7 @@ pub fn new(args: Option<&Args>, runtime: Option<&runtime::Handle>) -> Result