diff --git a/src/main/runtime.rs b/src/main/runtime.rs index 77476a65..bb7ed02f 100644 --- a/src/main/runtime.rs +++ b/src/main/runtime.rs @@ -22,11 +22,12 @@ use tuwunel_core::{ use crate::{Args, Server}; -const WORKER_NAME: &str = "tuwunel:worker"; -const WORKER_MIN: usize = 2; -const WORKER_KEEPALIVE: u64 = 36; -const MAX_BLOCKING_THREADS: usize = 1024; -const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(10000); +const WORKER_THREAD_NAME: &str = "tuwunel:worker"; +const WORKER_THREAD_MIN: usize = 2; +const BLOCKING_THREAD_KEEPALIVE: u64 = 36; +const BLOCKING_THREAD_NAME: &str = "tuwunel:spawned"; +const BLOCKING_THREAD_MAX: usize = 1024; +const RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(10000); #[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] const DISABLE_MUZZY_THRESHOLD: usize = 8; @@ -34,6 +35,9 @@ static WORKER_AFFINITY: OnceLock = OnceLock::new(); static GC_ON_PARK: OnceLock> = OnceLock::new(); static GC_MUZZY: OnceLock> = OnceLock::new(); +static CORES_OCCUPIED: AtomicUsize = AtomicUsize::new(0); +static THREAD_SPAWNS: AtomicUsize = AtomicUsize::new(0); + pub fn new(args: Option<&Args>) -> Result { let args_default = args.is_none().then(Args::default); let args = args.unwrap_or_else(|| args_default.as_ref().expect("default arguments")); @@ -54,16 +58,16 @@ pub fn new(args: Option<&Args>) -> Result { .expect("obtained RLIMIT_NPROC or default") .0 .saturating_div(3) - .clamp(WORKER_MIN, MAX_BLOCKING_THREADS); + .clamp(WORKER_THREAD_MIN, BLOCKING_THREAD_MAX); let mut builder = Builder::new_multi_thread(); builder .enable_io() .enable_time() - .thread_name(WORKER_NAME) - .worker_threads(args.worker_threads.max(WORKER_MIN)) + .thread_name_fn(thread_name) + .worker_threads(args.worker_threads.max(WORKER_THREAD_MIN)) .max_blocking_threads(max_blocking_threads) - .thread_keep_alive(Duration::from_secs(WORKER_KEEPALIVE)) + .thread_keep_alive(Duration::from_secs(BLOCKING_THREAD_KEEPALIVE)) .global_queue_interval(args.global_event_interval) .event_interval(args.kernel_event_interval) .max_io_events_per_tick(args.kernel_events_per_tick) @@ -132,11 +136,11 @@ pub fn shutdown(server: &Arc, runtime: Runtime) -> Result { fn wait_shutdown(_server: &Arc, runtime: Runtime) { debug!( - timeout = ?SHUTDOWN_TIMEOUT, + timeout = ?RUNTIME_SHUTDOWN_TIMEOUT, "Waiting for runtime..." ); - runtime.shutdown_timeout(SHUTDOWN_TIMEOUT); + runtime.shutdown_timeout(RUNTIME_SHUTDOWN_TIMEOUT); // Join any jemalloc threads so they don't appear in use at exit. #[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] @@ -145,33 +149,45 @@ fn wait_shutdown(_server: &Arc, runtime: Runtime) { .ok(); } +fn thread_name() -> String { + let handle = Handle::current(); + let num_workers = handle.metrics().num_workers(); + let i = THREAD_SPAWNS.load(Ordering::Acquire); + + if i >= num_workers { + BLOCKING_THREAD_NAME.into() + } else { + WORKER_THREAD_NAME.into() + } +} + #[tracing::instrument( name = "fork", level = "debug", skip_all, fields( - id = ?thread::current().id(), + tid = ?thread::current().id(), name = %thread::current().name().unwrap_or("None"), ), )] fn thread_start() { - debug_assert_eq!( - Some(WORKER_NAME), - thread::current().name(), + debug_assert!( + thread::current().name() == Some(WORKER_THREAD_NAME) + || thread::current().name() == Some(BLOCKING_THREAD_NAME), "tokio worker name mismatch at thread start" ); if WORKER_AFFINITY.get().is_some_and(is_true!()) { set_worker_affinity(); } + + THREAD_SPAWNS.fetch_add(1, Ordering::AcqRel); } fn set_worker_affinity() { - static CORES_OCCUPIED: AtomicUsize = AtomicUsize::new(0); - let handle = Handle::current(); let num_workers = handle.metrics().num_workers(); - let i = CORES_OCCUPIED.fetch_add(1, Ordering::Relaxed); + let i = CORES_OCCUPIED.fetch_add(1, Ordering::AcqRel); if i >= num_workers { return; } @@ -208,7 +224,7 @@ fn set_worker_mallctl(_: usize) {} level = "debug", skip_all, fields( - id = ?thread::current().id(), + tid = ?thread::current().id(), name = %thread::current().name().unwrap_or("None"), ), )] @@ -225,7 +241,7 @@ fn thread_stop() { level = "trace", skip_all, fields( - id = ?thread::current().id(), + tid = ?thread::current().id(), name = %thread::current().name().unwrap_or("None"), ), )] @@ -236,7 +252,7 @@ fn thread_unpark() {} level = "trace", skip_all, fields( - id = ?thread::current().id(), + tid = ?thread::current().id(), name = %thread::current().name().unwrap_or("None"), ), )] @@ -265,6 +281,7 @@ fn gc_on_park() { skip_all, fields( id = %meta.id(), + tid = ?thread::current().id(), ), )] fn task_spawn(meta: &tokio::runtime::TaskMeta<'_>) {} @@ -275,7 +292,8 @@ fn task_spawn(meta: &tokio::runtime::TaskMeta<'_>) {} level = "trace", skip_all, fields( - id = %meta.id() + id = %meta.id(), + tid = ?thread::current().id() ), )] fn task_terminate(meta: &tokio::runtime::TaskMeta<'_>) {} @@ -286,7 +304,8 @@ fn task_terminate(meta: &tokio::runtime::TaskMeta<'_>) {} level = "trace", skip_all, fields( - id = %meta.id() + id = %meta.id(), + tid = ?thread::current().id() ), )] fn task_enter(meta: &tokio::runtime::TaskMeta<'_>) {} @@ -297,7 +316,8 @@ fn task_enter(meta: &tokio::runtime::TaskMeta<'_>) {} level = "trace", skip_all, fields( - id = %meta.id() + id = %meta.id(), + tid = ?thread::current().id() ), )] fn task_leave(meta: &tokio::runtime::TaskMeta<'_>) {}