Reorganize main crate for testability.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -17,17 +17,11 @@ crate-type = [
|
||||
# "dylib",
|
||||
]
|
||||
|
||||
[[bench]]
|
||||
name = "state_res"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
brotli_compression = [
|
||||
"reqwest/brotli",
|
||||
]
|
||||
tuwunel_mods = [
|
||||
"dep:libloading"
|
||||
]
|
||||
console = []
|
||||
gzip_compression = [
|
||||
"reqwest/gzip",
|
||||
]
|
||||
@@ -53,6 +47,9 @@ release_max_log_level = [
|
||||
"log/release_max_level_info",
|
||||
]
|
||||
sentry_telemetry = []
|
||||
tuwunel_mods = [
|
||||
"dep:libloading"
|
||||
]
|
||||
zstd_compression = [
|
||||
"reqwest/zstd",
|
||||
]
|
||||
@@ -124,3 +121,7 @@ criterion.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[[bench]]
|
||||
name = "state_res"
|
||||
harness = false
|
||||
|
||||
207
src/core/args.rs
Normal file
207
src/core/args.rs
Normal file
@@ -0,0 +1,207 @@
|
||||
//! Integration with `clap`
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use clap::{ArgAction, Parser};
|
||||
|
||||
use crate::{
|
||||
Err, Result,
|
||||
config::{Figment, FigmentValue},
|
||||
err, toml,
|
||||
utils::available_parallelism,
|
||||
};
|
||||
|
||||
/// Commandline arguments
|
||||
#[derive(Parser, Debug)]
|
||||
#[clap(
|
||||
about,
|
||||
long_about = None,
|
||||
name = "tuwunel",
|
||||
version = crate::version(),
|
||||
)]
|
||||
pub struct Args {
|
||||
#[arg(short, long)]
|
||||
/// Path to the config TOML file (optional)
|
||||
pub config: Option<Vec<PathBuf>>,
|
||||
|
||||
/// Override a configuration variable using TOML 'key=value' syntax
|
||||
#[arg(long, short('O'))]
|
||||
pub option: Vec<String>,
|
||||
|
||||
/// Run in a stricter read-only --maintenance mode.
|
||||
#[arg(long)]
|
||||
pub read_only: bool,
|
||||
|
||||
/// Run in maintenance mode while refusing connections.
|
||||
#[arg(long)]
|
||||
pub maintenance: bool,
|
||||
|
||||
#[cfg(feature = "console")]
|
||||
/// Activate admin command console automatically after startup.
|
||||
#[arg(long, num_args(0))]
|
||||
pub console: bool,
|
||||
|
||||
/// Execute console command automatically after startup.
|
||||
#[arg(long)]
|
||||
pub execute: Vec<String>,
|
||||
|
||||
/// Set functional testing modes if available. Ex '--test=smoke'
|
||||
#[arg(long, hide(true))]
|
||||
pub test: Vec<String>,
|
||||
|
||||
/// Override the tokio worker_thread count.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "TOKIO_WORKER_THREADS",
|
||||
default_value = available_parallelism().to_string(),
|
||||
)]
|
||||
pub worker_threads: usize,
|
||||
|
||||
/// Override the tokio global_queue_interval.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "TOKIO_GLOBAL_QUEUE_INTERVAL",
|
||||
default_value = "192"
|
||||
)]
|
||||
pub global_event_interval: u32,
|
||||
|
||||
/// Override the tokio event_interval.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "TOKIO_EVENT_INTERVAL",
|
||||
default_value = "512"
|
||||
)]
|
||||
pub kernel_event_interval: u32,
|
||||
|
||||
/// Override the tokio max_io_events_per_tick.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "TOKIO_MAX_IO_EVENTS_PER_TICK",
|
||||
default_value = "512"
|
||||
)]
|
||||
pub kernel_events_per_tick: usize,
|
||||
|
||||
/// Set the histogram bucket size, in microseconds (tokio_unstable). Default
|
||||
/// is 25 microseconds. If the values of the histogram don't approach zero
|
||||
/// with the exception of the last bucket, try increasing this value to e.g.
|
||||
/// 50 or 100. Inversely, decrease to 10 etc if the histogram lacks
|
||||
/// resolution.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "TUWUNEL_RUNTIME_HISTOGRAM_INTERVAL",
|
||||
default_value = "25"
|
||||
)]
|
||||
pub worker_histogram_interval: u64,
|
||||
|
||||
/// Set the histogram bucket count (tokio_unstable). Default is 20.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "TUWUNEL_RUNTIME_HISTOGRAM_BUCKETS",
|
||||
default_value = "20"
|
||||
)]
|
||||
pub worker_histogram_buckets: usize,
|
||||
|
||||
/// Toggles worker affinity feature.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "TUWUNEL_RUNTIME_WORKER_AFFINITY",
|
||||
action = ArgAction::Set,
|
||||
num_args = 0..=1,
|
||||
require_equals(false),
|
||||
default_value = "true",
|
||||
default_missing_value = "true",
|
||||
)]
|
||||
pub worker_affinity: bool,
|
||||
|
||||
/// Toggles feature to promote memory reclamation by the operating system
|
||||
/// when tokio worker runs out of work.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "TUWUNEL_RUNTIME_GC_ON_PARK",
|
||||
action = ArgAction::Set,
|
||||
num_args = 0..=1,
|
||||
require_equals(false),
|
||||
)]
|
||||
pub gc_on_park: Option<bool>,
|
||||
|
||||
/// Toggles muzzy decay for jemalloc arenas associated with a tokio
|
||||
/// worker (when worker-affinity is enabled). Setting to false releases
|
||||
/// memory to the operating system using MADV_FREE without MADV_DONTNEED.
|
||||
/// Setting to false increases performance by reducing pagefaults, but
|
||||
/// resident memory usage appears high until there is memory pressure. The
|
||||
/// default is true unless the system has four or more cores.
|
||||
#[arg(
|
||||
long,
|
||||
hide(true),
|
||||
env = "TUWUNEL_RUNTIME_GC_MUZZY",
|
||||
action = ArgAction::Set,
|
||||
num_args = 0..=1,
|
||||
require_equals(false),
|
||||
)]
|
||||
pub gc_muzzy: Option<bool>,
|
||||
}
|
||||
|
||||
impl Default for Args {
|
||||
fn default() -> Self { Self::parse() }
|
||||
}
|
||||
|
||||
/// Parse commandline arguments into structured data
|
||||
#[must_use]
|
||||
pub fn parse() -> Args { Args::parse() }
|
||||
|
||||
/// Synthesize any command line options with configuration file options.
|
||||
pub fn update(mut config: Figment, args: &Args) -> Result<Figment> {
|
||||
if args.read_only {
|
||||
config = config.join(("rocksdb_read_only", true));
|
||||
}
|
||||
|
||||
if args.maintenance || args.read_only {
|
||||
config = config.join(("startup_netburst", false));
|
||||
config = config.join(("listening", false));
|
||||
}
|
||||
|
||||
#[cfg(feature = "console")]
|
||||
// Indicate the admin console should be spawned automatically if the
|
||||
// configuration file hasn't already.
|
||||
if args.console {
|
||||
config = config.join(("admin_console_automatic", true));
|
||||
}
|
||||
|
||||
// Execute commands after any commands listed in configuration file
|
||||
config = config.adjoin(("admin_execute", &args.execute));
|
||||
|
||||
// Update config with names of any functional-tests
|
||||
config = config.adjoin(("test", &args.test));
|
||||
|
||||
// All other individual overrides can go last in case we have options which
|
||||
// set multiple conf items at once and the user still needs granular overrides.
|
||||
for option in &args.option {
|
||||
let (path, val) = option
|
||||
.split_once('=')
|
||||
.ok_or_else(|| err!("Missing '=' in -O/--option: {option:?}"))?;
|
||||
|
||||
if path.is_empty() {
|
||||
return Err!("Missing key= in -O/--option: {option:?}");
|
||||
}
|
||||
|
||||
if val.is_empty() {
|
||||
return Err!("Missing =val in -O/--option: {option:?}");
|
||||
}
|
||||
|
||||
// The value has to pass for what would appear as a line in the TOML file.
|
||||
let val = toml::from_str::<FigmentValue>(option)?;
|
||||
|
||||
// Figment::merge() overrides existing
|
||||
config = config.merge((path, val.find(path)));
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
#![type_length_limit = "12288"]
|
||||
|
||||
pub mod alloc;
|
||||
pub mod args;
|
||||
pub mod config;
|
||||
pub mod debug;
|
||||
pub mod error;
|
||||
@@ -9,6 +10,7 @@ pub mod log;
|
||||
pub mod matrix;
|
||||
pub mod metrics;
|
||||
pub mod mods;
|
||||
pub mod runtime;
|
||||
pub mod server;
|
||||
pub mod utils;
|
||||
|
||||
@@ -20,12 +22,14 @@ pub use ::smallstr;
|
||||
pub use ::smallvec;
|
||||
pub use ::toml;
|
||||
pub use ::tracing;
|
||||
pub use args::Args;
|
||||
pub use config::Config;
|
||||
pub use error::Error;
|
||||
pub use info::{rustc_flags_capture, version, version::version};
|
||||
pub use matrix::{
|
||||
Event, EventTypeExt, Pdu, PduCount, PduEvent, PduId, RoomVersion, pdu, state_res,
|
||||
};
|
||||
pub use runtime::Runtime;
|
||||
pub use server::Server;
|
||||
pub use utils::{ctor, dtor, implement, result, result::Result};
|
||||
|
||||
|
||||
290
src/core/runtime.rs
Normal file
290
src/core/runtime.rs
Normal file
@@ -0,0 +1,290 @@
|
||||
use std::{
|
||||
iter::once,
|
||||
sync::{
|
||||
Arc, OnceLock,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
thread,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use tokio::runtime::Builder;
|
||||
pub use tokio::runtime::{Handle, Runtime};
|
||||
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
use crate::result::LogDebugErr;
|
||||
use crate::{
|
||||
Args, Result, Server, debug, is_true,
|
||||
utils::sys::compute::{nth_core_available, set_affinity},
|
||||
};
|
||||
|
||||
const WORKER_NAME: &str = "tuwunel:worker";
|
||||
const WORKER_MIN: usize = 2;
|
||||
const WORKER_KEEPALIVE: u64 = 36;
|
||||
const MAX_BLOCKING_THREADS: usize = 1024;
|
||||
const SHUTDOWN_TIMEOUT: Duration = Duration::from_millis(10000);
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
const DISABLE_MUZZY_THRESHOLD: usize = 4;
|
||||
|
||||
static WORKER_AFFINITY: OnceLock<bool> = OnceLock::new();
|
||||
static GC_ON_PARK: OnceLock<Option<bool>> = OnceLock::new();
|
||||
static GC_MUZZY: OnceLock<Option<bool>> = OnceLock::new();
|
||||
|
||||
pub fn new(args: Option<&Args>) -> Result<Runtime> {
|
||||
let args_default = args.is_none().then(Args::default);
|
||||
let args = args.unwrap_or_else(|| args_default.as_ref().expect("default arguments"));
|
||||
|
||||
WORKER_AFFINITY
|
||||
.set(args.worker_affinity)
|
||||
.expect("set WORKER_AFFINITY from program argument");
|
||||
|
||||
GC_ON_PARK
|
||||
.set(args.gc_on_park)
|
||||
.expect("set GC_ON_PARK from program argument");
|
||||
|
||||
GC_MUZZY
|
||||
.set(args.gc_muzzy)
|
||||
.expect("set GC_MUZZY from program argument");
|
||||
|
||||
let mut builder = Builder::new_multi_thread();
|
||||
builder
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.thread_name(WORKER_NAME)
|
||||
.worker_threads(args.worker_threads.max(WORKER_MIN))
|
||||
.max_blocking_threads(MAX_BLOCKING_THREADS)
|
||||
.thread_keep_alive(Duration::from_secs(WORKER_KEEPALIVE))
|
||||
.global_queue_interval(args.global_event_interval)
|
||||
.event_interval(args.kernel_event_interval)
|
||||
.max_io_events_per_tick(args.kernel_events_per_tick)
|
||||
.on_thread_start(thread_start)
|
||||
.on_thread_stop(thread_stop)
|
||||
.on_thread_unpark(thread_unpark)
|
||||
.on_thread_park(thread_park);
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
builder
|
||||
.on_task_spawn(task_spawn)
|
||||
.on_before_task_poll(task_enter)
|
||||
.on_after_task_poll(task_leave)
|
||||
.on_task_terminate(task_terminate);
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
enable_histogram(&mut builder, args);
|
||||
|
||||
builder.build().map_err(Into::into)
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
fn enable_histogram(builder: &mut Builder, args: &Args) {
|
||||
use tokio::runtime::HistogramConfiguration;
|
||||
|
||||
let buckets = args.worker_histogram_buckets;
|
||||
let interval = Duration::from_micros(args.worker_histogram_interval);
|
||||
let linear = HistogramConfiguration::linear(interval, buckets);
|
||||
builder
|
||||
.enable_metrics_poll_time_histogram()
|
||||
.metrics_poll_time_histogram_configuration(linear);
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
#[tracing::instrument(name = "stop", level = "info", skip_all)]
|
||||
pub fn shutdown(server: &Arc<Server>, runtime: Runtime) -> Result {
|
||||
use tracing::Level;
|
||||
|
||||
// The final metrics output is promoted to INFO when tokio_unstable is active in
|
||||
// a release/bench mode and DEBUG is likely optimized out
|
||||
const LEVEL: Level = if cfg!(debug_assertions) {
|
||||
Level::DEBUG
|
||||
} else {
|
||||
Level::INFO
|
||||
};
|
||||
|
||||
wait_shutdown(server, runtime);
|
||||
let runtime_metrics = server
|
||||
.metrics
|
||||
.runtime_interval()
|
||||
.unwrap_or_default();
|
||||
|
||||
crate::event!(LEVEL, ?runtime_metrics, "Final runtime metrics");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(tokio_unstable))]
|
||||
#[tracing::instrument(name = "stop", level = "info", skip_all)]
|
||||
pub fn shutdown(server: &Arc<Server>, runtime: Runtime) -> Result {
|
||||
wait_shutdown(server, runtime);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn wait_shutdown(_server: &Arc<Server>, runtime: Runtime) {
|
||||
debug!(
|
||||
timeout = ?SHUTDOWN_TIMEOUT,
|
||||
"Waiting for runtime..."
|
||||
);
|
||||
|
||||
runtime.shutdown_timeout(SHUTDOWN_TIMEOUT);
|
||||
|
||||
// Join any jemalloc threads so they don't appear in use at exit.
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
crate::alloc::je::background_thread_enable(false)
|
||||
.log_debug_err()
|
||||
.ok();
|
||||
}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "fork",
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(
|
||||
id = ?thread::current().id(),
|
||||
name = %thread::current().name().unwrap_or("None"),
|
||||
),
|
||||
)]
|
||||
fn thread_start() {
|
||||
debug_assert_eq!(
|
||||
Some(WORKER_NAME),
|
||||
thread::current().name(),
|
||||
"tokio worker name mismatch at thread start"
|
||||
);
|
||||
|
||||
if WORKER_AFFINITY.get().is_some_and(is_true!()) {
|
||||
set_worker_affinity();
|
||||
}
|
||||
}
|
||||
|
||||
fn set_worker_affinity() {
|
||||
static CORES_OCCUPIED: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
let handle = Handle::current();
|
||||
let num_workers = handle.metrics().num_workers();
|
||||
let i = CORES_OCCUPIED.fetch_add(1, Ordering::Relaxed);
|
||||
if i >= num_workers {
|
||||
return;
|
||||
}
|
||||
|
||||
let Some(id) = nth_core_available(i) else {
|
||||
return;
|
||||
};
|
||||
|
||||
set_affinity(once(id));
|
||||
set_worker_mallctl(id);
|
||||
}
|
||||
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
fn set_worker_mallctl(id: usize) {
|
||||
use crate::alloc::je::{
|
||||
is_affine_arena,
|
||||
this_thread::{set_arena, set_muzzy_decay},
|
||||
};
|
||||
|
||||
if is_affine_arena() {
|
||||
set_arena(id).log_debug_err().ok();
|
||||
}
|
||||
|
||||
let muzzy_option = GC_MUZZY
|
||||
.get()
|
||||
.expect("GC_MUZZY initialized by runtime::new()");
|
||||
|
||||
let muzzy_auto_disable = crate::utils::available_parallelism() >= DISABLE_MUZZY_THRESHOLD;
|
||||
|
||||
if matches!(muzzy_option, Some(false) | None if muzzy_auto_disable) {
|
||||
set_muzzy_decay(-1).log_debug_err().ok();
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(not(feature = "jemalloc"), target_env = "msvc"))]
|
||||
fn set_worker_mallctl(_: usize) {}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "join",
|
||||
level = "debug",
|
||||
skip_all,
|
||||
fields(
|
||||
id = ?thread::current().id(),
|
||||
name = %thread::current().name().unwrap_or("None"),
|
||||
),
|
||||
)]
|
||||
fn thread_stop() {}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "work",
|
||||
level = "trace",
|
||||
skip_all,
|
||||
fields(
|
||||
id = ?thread::current().id(),
|
||||
name = %thread::current().name().unwrap_or("None"),
|
||||
),
|
||||
)]
|
||||
fn thread_unpark() {}
|
||||
|
||||
#[tracing::instrument(
|
||||
name = "park",
|
||||
level = "trace",
|
||||
skip_all,
|
||||
fields(
|
||||
id = ?thread::current().id(),
|
||||
name = %thread::current().name().unwrap_or("None"),
|
||||
),
|
||||
)]
|
||||
fn thread_park() {
|
||||
match GC_ON_PARK
|
||||
.get()
|
||||
.as_ref()
|
||||
.expect("GC_ON_PARK initialized by runtime::new()")
|
||||
{
|
||||
| Some(true) | None if cfg!(feature = "jemalloc_conf") => gc_on_park(),
|
||||
| _ => (),
|
||||
}
|
||||
}
|
||||
|
||||
fn gc_on_park() {
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
crate::alloc::je::this_thread::decay()
|
||||
.log_debug_err()
|
||||
.ok();
|
||||
}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
#[tracing::instrument(
|
||||
name = "spawn",
|
||||
level = "trace",
|
||||
skip_all,
|
||||
fields(
|
||||
id = %meta.id(),
|
||||
),
|
||||
)]
|
||||
fn task_spawn(meta: &tokio::runtime::TaskMeta<'_>) {}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
#[tracing::instrument(
|
||||
name = "finish",
|
||||
level = "trace",
|
||||
skip_all,
|
||||
fields(
|
||||
id = %meta.id()
|
||||
),
|
||||
)]
|
||||
fn task_terminate(meta: &tokio::runtime::TaskMeta<'_>) {}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
#[tracing::instrument(
|
||||
name = "enter",
|
||||
level = "trace",
|
||||
skip_all,
|
||||
fields(
|
||||
id = %meta.id()
|
||||
),
|
||||
)]
|
||||
fn task_enter(meta: &tokio::runtime::TaskMeta<'_>) {}
|
||||
|
||||
#[cfg(tokio_unstable)]
|
||||
#[tracing::instrument(
|
||||
name = "leave",
|
||||
level = "trace",
|
||||
skip_all,
|
||||
fields(
|
||||
id = %meta.id()
|
||||
),
|
||||
)]
|
||||
fn task_leave(meta: &tokio::runtime::TaskMeta<'_>) {}
|
||||
Reference in New Issue
Block a user