Add TaskMonitor interval metrics w/ admin command.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2026-03-02 02:10:55 +00:00
parent bf8ae48ec2
commit 9e75453303
6 changed files with 90 additions and 12 deletions

View File

@@ -22,6 +22,7 @@ use tuwunel_core::{
Event, Event,
pdu::{PduEvent, PduId, RawPduId}, pdu::{PduEvent, PduId, RawPduId},
}, },
tokio_metrics::TaskMonitor,
trace, utils, trace, utils,
utils::{ utils::{
stream::{IterStream, ReadyExt}, stream::{IterStream, ReadyExt},
@@ -790,6 +791,37 @@ pub(super) async fn runtime_interval(&self) -> Result {
.await .await
} }
#[admin_command]
pub(super) async fn task_metrics(&self) -> Result {
let out = self
.services
.server
.metrics
.task_metrics()
.map(TaskMonitor::cumulative)
.map_or_else(
|| "Task metrics are not available.".to_owned(),
|metrics| format!("```rs\n{metrics:#?}\n```"),
);
self.write_str(&out).await
}
#[admin_command]
pub(super) async fn task_interval(&self) -> Result {
let out = self
.services
.server
.metrics
.task_interval()
.map_or_else(
|| "Task metrics are not available.".to_owned(),
|metrics| format!("```rs\n{metrics:#?}\n```"),
);
self.write_str(&out).await
}
#[admin_command] #[admin_command]
pub(super) async fn time(&self) -> Result { pub(super) async fn time(&self) -> Result {
let now = SystemTime::now(); let now = SystemTime::now();

View File

@@ -197,6 +197,13 @@ pub(super) enum DebugCommand {
/// invocation. /// invocation.
RuntimeInterval, RuntimeInterval,
/// - Print detailed tokio task metrics accumulated in total.
TaskMetrics,
/// - Print detailed tokio task metrics accumulated since last command
/// invocation.
TaskInterval,
/// - Print the current time /// - Print the current time
Time, Time,

View File

@@ -1,9 +1,9 @@
use std::sync::atomic::{AtomicU32, AtomicU64}; use std::sync::atomic::{AtomicU32, AtomicU64};
use tokio::runtime; use tokio::runtime;
use tokio_metrics::TaskMonitor;
#[cfg(tokio_unstable)] #[cfg(tokio_unstable)]
use tokio_metrics::{RuntimeIntervals, RuntimeMonitor}; use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};
use tokio_metrics::{TaskMetrics, TaskMonitor};
pub struct Metrics { pub struct Metrics {
_runtime: Option<runtime::Handle>, _runtime: Option<runtime::Handle>,
@@ -12,6 +12,8 @@ pub struct Metrics {
task_monitor: Option<TaskMonitor>, task_monitor: Option<TaskMonitor>,
task_intervals: std::sync::Mutex<Option<Box<dyn Iterator<Item = TaskMetrics> + Send>>>,
#[cfg(tokio_unstable)] #[cfg(tokio_unstable)]
_runtime_monitor: Option<RuntimeMonitor>, _runtime_monitor: Option<RuntimeMonitor>,
@@ -27,21 +29,37 @@ pub struct Metrics {
impl Metrics { impl Metrics {
#[must_use] #[must_use]
pub fn new(runtime: Option<runtime::Handle>) -> Self { pub fn new(runtime: Option<&runtime::Handle>) -> Self {
#[cfg(tokio_unstable)] #[cfg(tokio_unstable)]
let runtime_monitor = runtime.as_ref().map(RuntimeMonitor::new); let runtime_monitor = runtime.map(RuntimeMonitor::new);
#[cfg(tokio_unstable)] #[cfg(tokio_unstable)]
let runtime_intervals = runtime_monitor let runtime_intervals = runtime_monitor
.as_ref() .as_ref()
.map(RuntimeMonitor::intervals); .map(RuntimeMonitor::intervals);
let task_monitor = cfg!(tokio_unstable).then(|| {
TaskMonitor::builder()
.with_slow_poll_threshold(TaskMonitor::DEFAULT_SLOW_POLL_THRESHOLD)
.with_long_delay_threshold(TaskMonitor::DEFAULT_LONG_DELAY_THRESHOLD)
.clone()
.build()
});
let task_intervals = task_monitor.as_ref().map(
|task_monitor| -> Box<dyn Iterator<Item = TaskMetrics> + Send> {
Box::new(task_monitor.intervals())
},
);
Self { Self {
_runtime: runtime.clone(), _runtime: runtime.cloned(),
runtime_metrics: runtime.as_ref().map(runtime::Handle::metrics), runtime_metrics: runtime.map(runtime::Handle::metrics),
task_monitor: runtime.map(|_| TaskMonitor::new()), task_monitor,
task_intervals: task_intervals.into(),
#[cfg(tokio_unstable)] #[cfg(tokio_unstable)]
_runtime_monitor: runtime_monitor, _runtime_monitor: runtime_monitor,
@@ -56,6 +74,26 @@ impl Metrics {
} }
} }
#[inline]
pub async fn instrument<F, Output>(&self, f: F) -> Output
where
F: Future<Output = Output>,
{
if let Some(monitor) = self.task_metrics() {
monitor.instrument(f).await
} else {
f.await
}
}
pub fn task_interval(&self) -> Option<TaskMetrics> {
self.task_intervals
.lock()
.expect("locked")
.as_mut()
.and_then(Iterator::next)
}
#[cfg(tokio_unstable)] #[cfg(tokio_unstable)]
pub fn runtime_interval(&self) -> Option<tokio_metrics::RuntimeMetrics> { pub fn runtime_interval(&self) -> Option<tokio_metrics::RuntimeMetrics> {
self.runtime_intervals self.runtime_intervals
@@ -66,15 +104,15 @@ impl Metrics {
.expect("next interval") .expect("next interval")
} }
#[inline]
pub fn task_root(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() }
#[inline] #[inline]
pub fn num_workers(&self) -> usize { pub fn num_workers(&self) -> usize {
self.runtime_metrics() self.runtime_metrics()
.map_or(0, runtime::RuntimeMetrics::num_workers) .map_or(0, runtime::RuntimeMetrics::num_workers)
} }
#[inline]
pub fn task_metrics(&self) -> Option<&TaskMonitor> { self.task_monitor.as_ref() }
#[inline] #[inline]
pub fn runtime_metrics(&self) -> Option<&runtime::RuntimeMetrics> { pub fn runtime_metrics(&self) -> Option<&runtime::RuntimeMetrics> {
self.runtime_metrics.as_ref() self.runtime_metrics.as_ref()

View File

@@ -20,6 +20,7 @@ pub use ::jsonwebtoken as jwt;
pub use ::ruma; pub use ::ruma;
pub use ::smallstr; pub use ::smallstr;
pub use ::smallvec; pub use ::smallvec;
pub use ::tokio_metrics;
pub use ::toml; pub use ::toml;
pub use ::tracing; pub use ::tracing;
pub use config::Config; pub use config::Config;

View File

@@ -49,7 +49,7 @@ pub struct Server {
impl Server { impl Server {
#[must_use] #[must_use]
pub fn new(config: Config, runtime: Option<runtime::Handle>, log: Logging) -> Self { pub fn new(config: Config, runtime: Option<&runtime::Handle>, log: Logging) -> Self {
Self { Self {
name: config.server_name.clone(), name: config.server_name.clone(),
config: config::Manager::new(config), config: config::Manager::new(config),
@@ -57,7 +57,7 @@ impl Server {
stopping: AtomicBool::new(false), stopping: AtomicBool::new(false),
reloading: AtomicBool::new(false), reloading: AtomicBool::new(false),
restarting: AtomicBool::new(false), restarting: AtomicBool::new(false),
runtime: runtime.clone(), runtime: runtime.cloned(),
signal: broadcast::channel::<&'static str>(1).0, signal: broadcast::channel::<&'static str>(1).0,
log, log,
metrics: Metrics::new(runtime), metrics: Metrics::new(runtime),

View File

@@ -67,7 +67,7 @@ pub fn new(args: Option<&Args>, runtime: Option<&runtime::Handle>) -> Result<Arc
); );
Ok(Arc::new(Self { Ok(Arc::new(Self {
server: Arc::new(tuwunel_core::Server::new(config, runtime.cloned(), logger)), server: Arc::new(tuwunel_core::Server::new(config, runtime, logger)),
services: None.into(), services: None.into(),