Improve tokio thread naming schema; cleanup.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2026-02-27 05:05:32 +00:00
parent 0933943dd6
commit cbbe370df2

View File

@@ -22,11 +22,12 @@ use tuwunel_core::{
use crate::{Args, Server}; use crate::{Args, Server};
const WORKER_NAME: &str = "tuwunel:worker"; const WORKER_THREAD_NAME: &str = "tuwunel:worker";
const WORKER_MIN: usize = 2; const WORKER_THREAD_MIN: usize = 2;
const WORKER_KEEPALIVE: u64 = 36; const BLOCKING_THREAD_KEEPALIVE: u64 = 36;
const MAX_BLOCKING_THREADS: usize = 1024; const BLOCKING_THREAD_NAME: &str = "tuwunel:spawned";
const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(10000); const BLOCKING_THREAD_MAX: usize = 1024;
const RUNTIME_SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(10000);
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] #[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
const DISABLE_MUZZY_THRESHOLD: usize = 8; const DISABLE_MUZZY_THRESHOLD: usize = 8;
@@ -34,6 +35,9 @@ static WORKER_AFFINITY: OnceLock<bool> = OnceLock::new();
static GC_ON_PARK: OnceLock<Option<bool>> = OnceLock::new(); static GC_ON_PARK: OnceLock<Option<bool>> = OnceLock::new();
static GC_MUZZY: OnceLock<Option<bool>> = OnceLock::new(); static GC_MUZZY: OnceLock<Option<bool>> = OnceLock::new();
static CORES_OCCUPIED: AtomicUsize = AtomicUsize::new(0);
static THREAD_SPAWNS: AtomicUsize = AtomicUsize::new(0);
pub fn new(args: Option<&Args>) -> Result<Runtime> { pub fn new(args: Option<&Args>) -> Result<Runtime> {
let args_default = args.is_none().then(Args::default); let args_default = args.is_none().then(Args::default);
let args = args.unwrap_or_else(|| args_default.as_ref().expect("default arguments")); let args = args.unwrap_or_else(|| args_default.as_ref().expect("default arguments"));
@@ -54,16 +58,16 @@ pub fn new(args: Option<&Args>) -> Result<Runtime> {
.expect("obtained RLIMIT_NPROC or default") .expect("obtained RLIMIT_NPROC or default")
.0 .0
.saturating_div(3) .saturating_div(3)
.clamp(WORKER_MIN, MAX_BLOCKING_THREADS); .clamp(WORKER_THREAD_MIN, BLOCKING_THREAD_MAX);
let mut builder = Builder::new_multi_thread(); let mut builder = Builder::new_multi_thread();
builder builder
.enable_io() .enable_io()
.enable_time() .enable_time()
.thread_name(WORKER_NAME) .thread_name_fn(thread_name)
.worker_threads(args.worker_threads.max(WORKER_MIN)) .worker_threads(args.worker_threads.max(WORKER_THREAD_MIN))
.max_blocking_threads(max_blocking_threads) .max_blocking_threads(max_blocking_threads)
.thread_keep_alive(Duration::from_secs(WORKER_KEEPALIVE)) .thread_keep_alive(Duration::from_secs(BLOCKING_THREAD_KEEPALIVE))
.global_queue_interval(args.global_event_interval) .global_queue_interval(args.global_event_interval)
.event_interval(args.kernel_event_interval) .event_interval(args.kernel_event_interval)
.max_io_events_per_tick(args.kernel_events_per_tick) .max_io_events_per_tick(args.kernel_events_per_tick)
@@ -132,11 +136,11 @@ pub fn shutdown(server: &Arc<Server>, runtime: Runtime) -> Result {
fn wait_shutdown(_server: &Arc<Server>, runtime: Runtime) { fn wait_shutdown(_server: &Arc<Server>, runtime: Runtime) {
debug!( debug!(
timeout = ?SHUTDOWN_TIMEOUT, timeout = ?RUNTIME_SHUTDOWN_TIMEOUT,
"Waiting for runtime..." "Waiting for runtime..."
); );
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT); runtime.shutdown_timeout(RUNTIME_SHUTDOWN_TIMEOUT);
// Join any jemalloc threads so they don't appear in use at exit. // Join any jemalloc threads so they don't appear in use at exit.
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] #[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
@@ -145,33 +149,45 @@ fn wait_shutdown(_server: &Arc<Server>, runtime: Runtime) {
.ok(); .ok();
} }
fn thread_name() -> String {
let handle = Handle::current();
let num_workers = handle.metrics().num_workers();
let i = THREAD_SPAWNS.load(Ordering::Acquire);
if i >= num_workers {
BLOCKING_THREAD_NAME.into()
} else {
WORKER_THREAD_NAME.into()
}
}
#[tracing::instrument( #[tracing::instrument(
name = "fork", name = "fork",
level = "debug", level = "debug",
skip_all, skip_all,
fields( fields(
id = ?thread::current().id(), tid = ?thread::current().id(),
name = %thread::current().name().unwrap_or("None"), name = %thread::current().name().unwrap_or("None"),
), ),
)] )]
fn thread_start() { fn thread_start() {
debug_assert_eq!( debug_assert!(
Some(WORKER_NAME), thread::current().name() == Some(WORKER_THREAD_NAME)
thread::current().name(), || thread::current().name() == Some(BLOCKING_THREAD_NAME),
"tokio worker name mismatch at thread start" "tokio worker name mismatch at thread start"
); );
if WORKER_AFFINITY.get().is_some_and(is_true!()) { if WORKER_AFFINITY.get().is_some_and(is_true!()) {
set_worker_affinity(); set_worker_affinity();
} }
THREAD_SPAWNS.fetch_add(1, Ordering::AcqRel);
} }
fn set_worker_affinity() { fn set_worker_affinity() {
static CORES_OCCUPIED: AtomicUsize = AtomicUsize::new(0);
let handle = Handle::current(); let handle = Handle::current();
let num_workers = handle.metrics().num_workers(); let num_workers = handle.metrics().num_workers();
let i = CORES_OCCUPIED.fetch_add(1, Ordering::Relaxed); let i = CORES_OCCUPIED.fetch_add(1, Ordering::AcqRel);
if i >= num_workers { if i >= num_workers {
return; return;
} }
@@ -208,7 +224,7 @@ fn set_worker_mallctl(_: usize) {}
level = "debug", level = "debug",
skip_all, skip_all,
fields( fields(
id = ?thread::current().id(), tid = ?thread::current().id(),
name = %thread::current().name().unwrap_or("None"), name = %thread::current().name().unwrap_or("None"),
), ),
)] )]
@@ -225,7 +241,7 @@ fn thread_stop() {
level = "trace", level = "trace",
skip_all, skip_all,
fields( fields(
id = ?thread::current().id(), tid = ?thread::current().id(),
name = %thread::current().name().unwrap_or("None"), name = %thread::current().name().unwrap_or("None"),
), ),
)] )]
@@ -236,7 +252,7 @@ fn thread_unpark() {}
level = "trace", level = "trace",
skip_all, skip_all,
fields( fields(
id = ?thread::current().id(), tid = ?thread::current().id(),
name = %thread::current().name().unwrap_or("None"), name = %thread::current().name().unwrap_or("None"),
), ),
)] )]
@@ -265,6 +281,7 @@ fn gc_on_park() {
skip_all, skip_all,
fields( fields(
id = %meta.id(), id = %meta.id(),
tid = ?thread::current().id(),
), ),
)] )]
fn task_spawn(meta: &tokio::runtime::TaskMeta<'_>) {} fn task_spawn(meta: &tokio::runtime::TaskMeta<'_>) {}
@@ -275,7 +292,8 @@ fn task_spawn(meta: &tokio::runtime::TaskMeta<'_>) {}
level = "trace", level = "trace",
skip_all, skip_all,
fields( fields(
id = %meta.id() id = %meta.id(),
tid = ?thread::current().id()
), ),
)] )]
fn task_terminate(meta: &tokio::runtime::TaskMeta<'_>) {} fn task_terminate(meta: &tokio::runtime::TaskMeta<'_>) {}
@@ -286,7 +304,8 @@ fn task_terminate(meta: &tokio::runtime::TaskMeta<'_>) {}
level = "trace", level = "trace",
skip_all, skip_all,
fields( fields(
id = %meta.id() id = %meta.id(),
tid = ?thread::current().id()
), ),
)] )]
fn task_enter(meta: &tokio::runtime::TaskMeta<'_>) {} fn task_enter(meta: &tokio::runtime::TaskMeta<'_>) {}
@@ -297,7 +316,8 @@ fn task_enter(meta: &tokio::runtime::TaskMeta<'_>) {}
level = "trace", level = "trace",
skip_all, skip_all,
fields( fields(
id = %meta.id() id = %meta.id(),
tid = ?thread::current().id()
), ),
)] )]
fn task_leave(meta: &tokio::runtime::TaskMeta<'_>) {} fn task_leave(meta: &tokio::runtime::TaskMeta<'_>) {}