From d665a34f30d8f8556cb4b4ef2d30af0cf5559a96 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 24 Dec 2025 16:36:38 +0000 Subject: [PATCH] Support mdraid hierarchies for storage topology detection. Signed-off-by: Jason Volk --- src/core/utils/sys.rs | 12 ++- src/core/utils/sys/compute.rs | 6 +- src/core/utils/sys/storage.rs | 76 ++++++++++++++--- src/database/pool/configure.rs | 148 ++++++++++++++++++++++++--------- 4 files changed, 188 insertions(+), 54 deletions(-) diff --git a/src/core/utils/sys.rs b/src/core/utils/sys.rs index 626df384..dc74f43f 100644 --- a/src/core/utils/sys.rs +++ b/src/core/utils/sys.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; pub use compute::available_parallelism; -use crate::{Result, debug}; +use crate::{Result, at, debug}; /// This is needed for opening lots of file descriptors, which tends to /// happen more often when using RocksDB and making lots of federation @@ -51,3 +51,13 @@ pub fn current_exe_deleted() -> bool { .is_some_and(|exe| exe.ends_with(" (deleted)")) }) } + +/// Parse the `KEY=VALUE` contents of a `uevent` file searching for `key` and +/// returning the `value`. +fn _uevent_get<'a>(uevent: &'a str, key: &'a str) -> Option<&'a str> { + uevent + .lines() + .filter_map(|line| line.split_once('=')) + .find(|&(key_, _)| key.eq(key_)) + .map(at!(1)) +} diff --git a/src/core/utils/sys/compute.rs b/src/core/utils/sys/compute.rs index 8bb4f704..72956db9 100644 --- a/src/core/utils/sys/compute.rs +++ b/src/core/utils/sys/compute.rs @@ -9,7 +9,11 @@ type Id = usize; type Mask = u128; type Masks = [Mask; MASK_BITS]; -const MASK_BITS: usize = 128; +const MASK_BITS: usize = CORES_MAX; + +/// Maximum number of cores we support; for now limited to bits of our mask +/// integral. +pub const CORES_MAX: usize = 128; /// The mask of logical cores available to the process (at startup). 
static CORES_AVAILABLE: LazyLock = LazyLock::new(|| into_mask(query_cores_available())); diff --git a/src/core/utils/sys/storage.rs b/src/core/utils/sys/storage.rs index 4f55a5e9..0224f28d 100644 --- a/src/core/utils/sys/storage.rs +++ b/src/core/utils/sys/storage.rs @@ -16,9 +16,22 @@ use crate::{ utils::{result::LogDebugErr, string::SplitInfallible}, }; -/// Device characteristics useful for random access throughput +/// Multi-Device (md) i.e. software raid properties. #[derive(Clone, Debug, Default)] -pub struct Parallelism { +pub struct MultiDevice { + /// Type of raid (i.e. `raid1`); None if no raid present or detected. + pub level: Option, + + /// Number of participating devices. + pub raid_disks: usize, + + /// The MQ's discovered on the devices; or empty. + pub md: Vec, +} + +/// Multi-Queue (mq) characteristics. +#[derive(Clone, Debug, Default)] +pub struct MultiQueue { /// Number of requests for the device. pub nr_requests: Option, @@ -26,7 +39,7 @@ pub struct Parallelism { pub mq: Vec, } -/// Device queue characteristics +/// Single-queue characteristics #[derive(Clone, Debug, Default)] pub struct Queue { /// Queue's indice. @@ -39,18 +52,59 @@ pub struct Queue { pub cpu_list: Vec, } -/// Get device characteristics useful for random access throughput by name. 
+/// Get properties of a MultiDevice (md) storage system #[must_use] -pub fn parallelism(path: &Path) -> Parallelism { +pub fn md_discover(path: &Path) -> MultiDevice { let dev_id = dev_from_path(path) .log_debug_err() .unwrap_or_default(); - let mq_path = block_path(dev_id).join("mq/"); + let md_path = block_path(dev_id).join("md/"); - let nr_requests_path = block_path(dev_id).join("queue/nr_requests"); + let raid_disks_path = md_path.join("raid_disks"); - Parallelism { + let raid_disks: usize = read_to_string(&raid_disks_path) + .ok() + .as_deref() + .map(str::trim) + .map(str::parse) + .flat_ok() + .unwrap_or(0); + + let single_fallback = raid_disks.eq(&0).then(|| block_path(dev_id)); + + MultiDevice { + raid_disks, + + level: read_to_string(md_path.join("level")) + .ok() + .as_deref() + .map(str::trim) + .map(ToOwned::to_owned), + + md: (0..raid_disks) + .map(|i| format!("rd{i}/block")) + .map(|path| md_path.join(&path)) + .filter_map(|ref path| path.canonicalize().ok()) + .map(|mut path| { + path.pop(); + path + }) + .chain(single_fallback) + .map(|path| mq_discover(&path)) + .filter(|mq| !mq.mq.is_empty()) + .collect(), + } +} + +/// Get properties of a MultiQueue within a MultiDevice. +#[must_use] +fn mq_discover(path: &Path) -> MultiQueue { + let mq_path = path.join("mq/"); + + let nr_requests_path = path.join("queue/nr_requests"); + + MultiQueue { nr_requests: read_to_string(&nr_requests_path) .ok() .as_deref() @@ -68,13 +122,13 @@ pub fn parallelism(path: &Path) -> Parallelism { .as_ref() .is_ok_and(FileType::is_dir) }) - .map(|dir| queue_parallelism(&dir.path())) + .map(|dir| queue_discover(&dir.path())) .collect(), } } -/// Get device queue characteristics by mq path on sysfs(5) -fn queue_parallelism(dir: &Path) -> Queue { +/// Get properties of a Queue within a MultiQueue. 
+fn queue_discover(dir: &Path) -> Queue { let queue_id = dir.file_name(); let nr_tags_path = dir.join("nr_tags"); diff --git a/src/database/pool/configure.rs b/src/database/pool/configure.rs index e5f158c2..c6836b6e 100644 --- a/src/database/pool/configure.rs +++ b/src/database/pool/configure.rs @@ -1,39 +1,69 @@ use std::{path::PathBuf, sync::Arc}; use tuwunel_core::{ - Server, debug, debug_info, expected, is_equal_to, + Server, debug, + debug::INFO_SPAN_LEVEL, + debug_info, debug_warn, expected, info, is_equal_to, utils::{ + BoolExt, math::usize_from_f64, result::LogDebugErr, stream, stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT}, - sys::{compute::is_core_available, storage}, + sys::{ + compute::{CORES_MAX, available_parallelism, is_core_available}, + storage, + }, }, }; use super::{QUEUE_LIMIT, WORKER_LIMIT}; +#[tracing::instrument( + level = INFO_SPAN_LEVEL, + skip_all, + ret(level = "trace"), +)] pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) { let config = &server.config; + let num_cores = available_parallelism(); // This finds the block device and gathers all the properties we need. let path: PathBuf = config.database_path.clone(); let device_name = storage::name_from_path(&path) .log_debug_err() .ok(); - let device_prop = storage::parallelism(&path); + + let devices = storage::md_discover(&path); + let topology_detected = devices.md.is_empty().is_false(); + debug!(?topology_detected, ?device_name, ?devices); // The default worker count is masked-on if we didn't find better information. - let default_worker_count = device_prop - .mq - .is_empty() + let default_worker_count = topology_detected + .is_false() .then_some(config.db_pool_workers); - // Determine the worker groupings. Each indice represents a hardware queue and - // contains the number of workers which will service it. - let worker_counts: Vec<_> = device_prop - .mq + // Sum the total number of possible tags. 
When no hardware detected this will + // default to the default_worker_count + let total_tags = devices + .md .iter() + .flat_map(|md| md.mq.iter()) + .filter(|mq| mq.cpu_list.iter().copied().any(is_core_available)) + .filter_map(|mq| mq.nr_tags) + .chain(default_worker_count) + .fold(0_usize, usize::saturating_add); + + // Determine the worker groupings. Each indice represents a hardware queue and + // contains the number of workers which will service it. This vector is + // truncated to the number of cores on systems which have multiple hardware + // queues per core. When no hardware is detected this defaults to one queue with + // a default count of workers. + let worker_counts: Vec<_> = devices + .md + .iter() + .inspect(|md| debug!(?md)) + .flat_map(|md| md.mq.iter()) .filter(|mq| mq.cpu_list.iter().copied().any(is_core_available)) .map(|mq| { let shares = mq @@ -47,13 +77,20 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) .db_pool_workers_limit .saturating_mul(shares); - let limit = device_prop - .nr_requests - .map_or(limit, |nr| nr.min(limit)); + let limit = devices.md.iter().fold(0_usize, |acc, mq| { + mq.nr_requests + .map(|nr| nr.min(limit)) + .or(Some(limit)) + .map(|nr| acc.saturating_add(nr)) + .unwrap_or(acc) + }); + + debug!(?mq, ?shares, ?limit); mq.nr_tags.unwrap_or(WORKER_LIMIT.0).min(limit) }) .chain(default_worker_count) + .take(num_cores) .collect(); // Determine our software queue size for each hardware queue. This is the mpmc @@ -67,14 +104,18 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) }) .collect(); - // Determine the CPU affinities of each hardware queue. Each indice is a cpu and - // each value is the associated hardware queue. There is a little shiftiness - // going on because cpu's which are not available to the process are filtered - // out, similar to the worker_counts. - let topology = device_prop - .mq + // Determine the CPU affinities of each hardware queue. 
Each indice is a core + and each value is the associated hardware queue. On systems which share + queues between cores some values will be repeated; on systems with multiple + queues per core the affinities are assumed to match and we don't require a + vector of vectors. There is a little shiftiness going on because cpu's which + are not available to the process are filtered out, similar to the + worker_counts. + let topology = devices + .md .iter() - .fold(vec![0; 128], |mut topology, mq| { + .flat_map(|md| md.mq.iter()) + .fold(vec![0; CORES_MAX], |mut topology, mq| { mq.cpu_list .iter() .filter(|&&id| is_core_available(id)) @@ -83,13 +124,17 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) }); topology - }); + }) + .into_iter() + .take(num_cores) + .collect(); // Regardless of the capacity of all queues we establish some limit on the total // number of workers; this is hopefully hinted by nr_requests. - let max_workers = device_prop - .mq + let max_workers = devices + .md .iter() + .flat_map(|md| md.mq.iter()) .filter_map(|mq| mq.nr_tags) .chain(default_worker_count) .fold(0_usize, usize::saturating_add) @@ -103,21 +148,32 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) // After computing all of the above we can update the global automatic stream // width, hopefully with a better value tailored to this system. 
+ let num_queues = queue_sizes.len(); if config.stream_width_scale > 0.0 { - let num_queues = queue_sizes.len().max(1); - update_stream_width(server, num_queues, total_workers); + update_stream_width(server, num_queues, total_workers, total_tags); } - debug_info!( - device_name = ?device_name - .as_deref() - .unwrap_or("None"), - ?worker_counts, - ?queue_sizes, - ?total_workers, - stream_width = ?stream::automatic_width(), - "Frontend topology", - ); + if topology_detected { + debug_info!(?topology, ?worker_counts, ?queue_sizes, "Frontend topology",); + info!( + device_name = ?device_name.as_deref().unwrap_or("None"), + ?num_cores, + ?num_queues, + ?total_workers, + ?total_tags, + stream_width = ?stream::automatic_width(), + amplification = ?stream::automatic_amplification(), + "Frontend topology", + ); + } else { + debug_warn!( + device_name = ?device_name.as_deref().unwrap_or("None"), + ?total_workers, + stream_width = ?stream::automatic_width(), + amplification = ?stream::automatic_amplification(), + "Storage hardware not detected for database directory; assuming defaults.", + ); + } assert!(total_workers > 0, "some workers expected"); assert!(!queue_sizes.is_empty(), "some queues expected"); @@ -130,26 +186,36 @@ pub(super) fn configure(server: &Arc) -> (usize, Vec, Vec) } #[allow(clippy::as_conversions, clippy::cast_precision_loss)] -fn update_stream_width(server: &Arc, num_queues: usize, total_workers: usize) { +fn update_stream_width( + server: &Arc, + num_queues: usize, + total_workers: usize, + total_tags: usize, +) { let config = &server.config; let scale: f64 = config.stream_width_scale.min(100.0).into(); + let auto_scale = total_tags as f64 / total_workers as f64; + let auto_scale_width = auto_scale / num_queues as f64; - let req_width = expected!(total_workers / num_queues).next_multiple_of(2); - let req_width = req_width as f64; + let req_width = expected!(total_workers / num_queues).next_multiple_of(8); + let req_width = req_width as f64 * 
auto_scale_width.clamp(1.0, 4.0); let req_width = usize_from_f64(req_width * scale) .expect("failed to convert f64 to usize") + .next_multiple_of(4) .clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1); - let req_amp = config.stream_amplification as f64; + let req_amp = config.stream_amplification as f64 * auto_scale.clamp(1.0, 4.0); let req_amp = usize_from_f64(req_amp * scale) .expect("failed to convert f64 to usize") + .next_multiple_of(64) .clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1); let (old_width, new_width) = stream::set_width(req_width); let (old_amp, new_amp) = stream::set_amplification(req_amp); debug!( - scale = ?config.stream_width_scale, - ?num_queues, + config_scale = ?config.stream_width_scale, + ?auto_scale, + ?auto_scale_width, ?req_width, ?old_width, ?new_width,