diff --git a/src/core/utils/sys.rs b/src/core/utils/sys.rs index 626df384..541ee810 100644 --- a/src/core/utils/sys.rs +++ b/src/core/utils/sys.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; pub use compute::available_parallelism; -use crate::{Result, debug}; +use crate::{Result, at, debug}; /// This is needed for opening lots of file descriptors, which tends to /// happen more often when using RocksDB and making lots of federation @@ -51,3 +51,15 @@ pub fn current_exe_deleted() -> bool { .is_some_and(|exe| exe.ends_with(" (deleted)")) }) } + +/// Parse the `KEY=VALUE` contents of a `uevent` file searching for `key` and +/// returning the `value`. +#[inline] +#[must_use] +pub fn uevent_find<'a>(uevent: &'a str, key: &'a str) -> Option<&'a str> { + uevent + .lines() + .filter_map(|line| line.split_once('=')) + .find(|&(key_, _)| key.eq(key_)) + .map(at!(1)) +} diff --git a/src/core/utils/sys/compute.rs b/src/core/utils/sys/compute.rs index 68cfdf0e..bc3b71ac 100644 --- a/src/core/utils/sys/compute.rs +++ b/src/core/utils/sys/compute.rs @@ -9,7 +9,11 @@ type Id = usize; type Mask = u128; type Masks = [Mask; MASK_BITS]; -const MASK_BITS: usize = 128; +const MASK_BITS: usize = CORES_MAX; + +/// Maximum number of cores we support; for now limited to bits of our mask +/// integral. +pub const CORES_MAX: usize = 128; /// The mask of logical cores available to the process (at startup). static CORES_AVAILABLE: LazyLock<Mask> = LazyLock::new(|| into_mask(query_cores_available())); diff --git a/src/core/utils/sys/storage.rs b/src/core/utils/sys/storage.rs index 4f55a5e9..36b57e60 100644 --- a/src/core/utils/sys/storage.rs +++ b/src/core/utils/sys/storage.rs @@ -8,6 +8,7 @@ use std::{ path::{Path, PathBuf}, }; +use itertools::Itertools; use libc::dev_t; use crate::{ @@ -16,9 +17,22 @@ use crate::{ utils::{result::LogDebugErr, string::SplitInfallible}, }; -/// Device characteristics useful for random access throughput +/// Multi-Device (md) i.e. software raid properties. 
#[derive(Clone, Debug, Default)] -pub struct Parallelism { +pub struct MultiDevice { + /// Type of raid (i.e. `raid1`); None if no raid present or detected. + pub level: Option<String>, + + /// Number of participating devices. + pub raid_disks: usize, + + /// The MQ's discovered on the devices; or empty. + pub md: Vec<MultiQueue>, +} + +/// Multi-Queue (mq) characteristics. +#[derive(Clone, Debug, Default)] +pub struct MultiQueue { /// Number of requests for the device. pub nr_requests: Option<usize>, @@ -26,7 +40,7 @@ pub struct Parallelism { pub mq: Vec<Queue>, } -/// Device queue characteristics +/// Single-queue characteristics #[derive(Clone, Debug, Default)] pub struct Queue { /// Queue's indice. @@ -39,18 +53,59 @@ pub struct Queue { pub cpu_list: Vec<usize>, } -/// Get device characteristics useful for random access throughput by name. +/// Get properties of a MultiDevice (md) storage system #[must_use] -pub fn parallelism(path: &Path) -> Parallelism { +pub fn md_discover(path: &Path) -> MultiDevice { let dev_id = dev_from_path(path) .log_debug_err() .unwrap_or_default(); - let mq_path = block_path(dev_id).join("mq/"); + let md_path = block_path(dev_id).join("md/"); - let nr_requests_path = block_path(dev_id).join("queue/nr_requests"); + let raid_disks_path = md_path.join("raid_disks"); - Parallelism { + let raid_disks: usize = read_to_string(&raid_disks_path) + .ok() + .as_deref() + .map(str::trim) + .map(str::parse) + .flat_ok() + .unwrap_or(0); + + let single_fallback = raid_disks.eq(&0).then(|| block_path(dev_id)); + + MultiDevice { + raid_disks, + + level: read_to_string(md_path.join("level")) + .ok() + .as_deref() + .map(str::trim) + .map(ToOwned::to_owned), + + md: (0..raid_disks) + .map(|i| format!("rd{i}/block")) + .map(|path| md_path.join(&path)) + .filter_map(|ref path| path.canonicalize().ok()) + .map(|mut path| { + path.pop(); + path + }) + .chain(single_fallback) + .map(|path| mq_discover(&path)) + .filter(|mq| !mq.mq.is_empty()) + .collect(), + } +} + +/// Get properties of a 
MultiQueue within a MultiDevice. +#[must_use] +fn mq_discover(path: &Path) -> MultiQueue { + let mq_path = path.join("mq/"); + + let nr_requests_path = path.join("queue/nr_requests"); + + MultiQueue { nr_requests: read_to_string(&nr_requests_path) .ok() .as_deref() @@ -68,13 +123,14 @@ pub fn parallelism(path: &Path) -> Parallelism { .as_ref() .is_ok_and(FileType::is_dir) }) - .map(|dir| queue_parallelism(&dir.path())) - .collect(), + .map(|dir| queue_discover(&dir.path())) + .sorted_by_key(|mq| mq.id) + .collect::<Vec<_>>(), } } -/// Get device queue characteristics by mq path on sysfs(5) -fn queue_parallelism(dir: &Path) -> Queue { +/// Get properties of a Queue within a MultiQueue. +fn queue_discover(dir: &Path) -> Queue { let queue_id = dir.file_name(); let nr_tags_path = dir.join("nr_tags"); diff --git a/src/database/pool.rs b/src/database/pool.rs index ae77c122..3ecb9036 100644 --- a/src/database/pool.rs +++ b/src/database/pool.rs @@ -19,7 +19,7 @@ use tuwunel_core::{ result::DebugInspect, smallvec::SmallVec, trace, - utils::sys::compute::{get_affinity, nth_core_available, set_affinity}, + utils::sys::compute::{get_affinity, set_affinity}, }; use self::configure::configure; @@ -76,10 +76,11 @@ const WORKER_NAME: &str = "tuwunel:db"; pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> { const CHAN_SCHED: (QueueStrategy, QueueStrategy) = (QueueStrategy::Fifo, QueueStrategy::Lifo); - let (total_workers, queue_sizes, topology) = configure(server); + let (topology, workers, queues) = configure(server); - let (senders, receivers): (Vec<_>, Vec<_>) = queue_sizes + let (senders, receivers): (Vec<_>, Vec<_>) = queues .into_iter() + .map(|cap| cap.max(QUEUE_LIMIT.0)) .map(|cap| async_channel::bounded_with_queue_strategy(cap, CHAN_SCHED)) .unzip(); @@ -92,7 +93,9 @@ pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> { queued_max: AtomicUsize::default(), }); - pool.spawn_until(&receivers, total_workers)?; + for (chan_id, &count) in workers.iter().enumerate() { + pool.spawn_group(&receivers, 
chan_id, count)?; + } Ok(pool) } @@ -157,10 +160,11 @@ pub(crate) fn close(&self) { } #[implement(Pool)] -fn spawn_until(self: &Arc<Self>, recv: &[Receiver<Cmd>], count: usize) -> Result { +fn spawn_group(self: &Arc<Self>, recv: &[Receiver<Cmd>], chan_id: usize, count: usize) -> Result { let mut workers = self.workers.lock().expect("locked"); - while workers.len() < count { - self.clone().spawn_one(&mut workers, recv)?; + for _ in 0..count { + self.clone() + .spawn_one(&mut workers, recv, chan_id)?; } Ok(()) } @@ -177,18 +181,18 @@ fn spawn_one( self: Arc<Self>, workers: &mut Vec<JoinHandle<()>>, recv: &[Receiver<Cmd>], + chan_id: usize, ) -> Result { debug_assert!(!self.queues.is_empty(), "Must have at least one queue"); debug_assert!(!recv.is_empty(), "Must have at least one receiver"); let id = workers.len(); - let group = id.overflowing_rem(self.queues.len()).0; - let recv = recv[group].clone(); + let recv = recv[chan_id].clone(); let handle = thread::Builder::new() .name(WORKER_NAME.into()) .stack_size(WORKER_STACK_SIZE) - .spawn(move || self.worker(id, &recv))?; + .spawn(move || self.worker(id, chan_id, &recv))?; workers.push(handle); @@ -227,8 +231,12 @@ pub(crate) async fn execute_iter(self: &Arc<Self>, mut cmd: Seek) -> Result fn queue(&self) -> &Sender<Cmd> { - let core_id = get_affinity().next().unwrap_or(0); + let core_id = get_affinity() + .next() + .expect("Affinity mask should be available."); + + let chan_id = self.topology[core_id]; + + self.queues .get(chan_id) .unwrap_or_else(|| &self.queues[0]) @@ -262,33 +270,33 @@ async fn execute(&self, queue: &Sender<Cmd>, cmd: Cmd) -> Result { #[tracing::instrument( parent = None, level = "debug", - skip(self, recv), + skip_all, fields( - tid = ?thread::current().id(), + id, + chan_id, + thread_id = ?thread::current().id(), ), )] -fn worker(self: Arc<Self>, id: usize, recv: &Receiver<Cmd>) { - self.worker_init(id); +fn worker(self: Arc<Self>, id: usize, chan_id: usize, recv: &Receiver<Cmd>) { + self.worker_init(id, chan_id); self.worker_loop(recv); } #[implement(Pool)] -fn worker_init(&self, id: usize) { - let group 
= id.overflowing_rem(self.queues.len()).0; +fn worker_init(&self, id: usize, chan_id: usize) { let affinity = self .topology .iter() .enumerate() - .filter(|_| self.queues.len() > 1) .filter(|_| self.server.config.db_pool_affinity) - .filter_map(|(core_id, &queue_id)| (group == queue_id).then_some(core_id)) - .filter_map(nth_core_available); + .filter_map(|(core_id, &queue_id)| (chan_id == queue_id).then_some(core_id)); // affinity is empty (no-op) if there's only one queue set_affinity(affinity.clone()); trace!( - ?group, + ?id, + ?chan_id, affinity = ?affinity.collect::<Vec<_>>(), "worker ready" ); diff --git a/src/database/pool/configure.rs b/src/database/pool/configure.rs index e5f158c2..2b058a53 100644 --- a/src/database/pool/configure.rs +++ b/src/database/pool/configure.rs @@ -1,157 +1,300 @@ use std::{path::PathBuf, sync::Arc}; use tuwunel_core::{ - Server, debug, debug_info, expected, is_equal_to, + Server, debug, + debug::INFO_SPAN_LEVEL, + debug_info, debug_warn, expected, info, is_equal_to, utils::{ + BoolExt, math::usize_from_f64, result::LogDebugErr, stream, stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT}, - sys::{compute::is_core_available, storage}, + sys::{ + compute::{available_parallelism, cores_available, is_core_available}, + storage, + }, }, }; use super::{QUEUE_LIMIT, WORKER_LIMIT}; -pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>) { +/// Determine storage hardware capabilities of the system for configuring the +/// shape of the database frontend threadpool. +/// +/// Returns a tuple of: +/// - `topology` Vector mapping hardware cores to hardware queues. Systems with +/// fewer queues than cores will see queue ID's repeated. Systems with the +/// same or more queues as cores will usually see a 1:1 association of core +/// ID's to queue ID's. Systems with sparse core assignments will see 0 for +/// core ID positions not available to the process. 
Systems where detection +/// failed will see a default of 1:1 core identity as a best-guess maintaining +/// core locality. +/// - `workers` Vector mapping hardware queues to the number of threads to spawn +/// in service of that queue. Systems with fewer queues than cores will set an +/// affinity mask for each thread to multiple cores based on the topology. +/// Systems with equal or more hardware queue to cores will set a simple +/// affinity for each thread in a pool. +/// - `queues` Vector of software mpmc queues to create and the size of each +/// queue. Each indice is associated with a thread-pool of workers which it +/// feeds from requests from various tokio tasks. When this queue reaches +/// capacity the tokio task must yield. +#[tracing::instrument( + level = INFO_SPAN_LEVEL, + skip_all, + ret(level = "trace"), +)] +pub(super) fn configure(server: &Arc<Server>) -> (Vec<usize>, Vec<usize>, Vec<usize>) { let config = &server.config; + let num_cores = available_parallelism(); + + // Determine the maximum number of cores. The total number of cores available to + // the process may be less on systems with sparse core assignments, but this + // still serves as an upper-bound. + let cores_max = cores_available() + .last() + .unwrap_or(0) + .saturating_add(1); // This finds the block device and gathers all the properties we need. let path: PathBuf = config.database_path.clone(); let device_name = storage::name_from_path(&path) .log_debug_err() .ok(); - let device_prop = storage::parallelism(&path); + + let devices = storage::md_discover(&path); + let topology_detected = devices.md.is_empty().is_false(); + debug!(?topology_detected, ?device_name, ?devices); // The default worker count is masked-on if we didn't find better information. - let default_worker_count = device_prop - .mq - .is_empty() + let default_worker_count = topology_detected + .is_false() .then_some(config.db_pool_workers); - // Determine the worker groupings. 
Each indice represents a hardware queue and - // contains the number of workers which will service it. - let worker_counts: Vec<_> = device_prop - .mq + // Sum the total number of possible tags. When no hardware detected this will + // default to the default_worker_count. Note well that the thread-worker model + // we use will never approach actual NVMe capacity as with io_uring or even + // close to userspace drivers. We still take some cues from this value which + // does give us actual request capacity. + let total_tags = devices + .md .iter() + .flat_map(|md| md.mq.iter()) .filter(|mq| mq.cpu_list.iter().copied().any(is_core_available)) - .map(|mq| { let shares = mq - .cpu_list - .iter() - .filter(|&&id| is_core_available(id)) - .count() - .max(1); - - let limit = config - .db_pool_workers_limit - .saturating_mul(shares); - - let limit = device_prop - .nr_requests - .map_or(limit, |nr| nr.min(limit)); - - mq.nr_tags.unwrap_or(WORKER_LIMIT.0).min(limit) - }) + .filter_map(|mq| mq.nr_tags) .chain(default_worker_count) - .collect(); + .fold(0_usize, usize::saturating_add); - // Determine our software queue size for each hardware queue. This is the mpmc - // between the tokio worker and the pool worker. - let queue_sizes: Vec<_> = worker_counts + // Determine the CPU affinities of each hardware queue. Each indice is a core + // and each value is the associated hardware queue. On systems which share + // queues between cores some values will be repeated; on systems with multiple + // queues per core the affinities are assumed to match and we don't require a + // vector of vectors. Sparse unavailable cores default to 0. Undetected hardware + // defaults to the core identity as a best-guess. + let topology: Vec<usize> = devices .md .iter() - .map(|worker_count| { worker_count - .saturating_mul(config.db_pool_queue_mult) - .clamp(QUEUE_LIMIT.0, QUEUE_LIMIT.1) - }) - .collect(); - - // Determine the CPU affinities of each hardware queue. 
Each indice is a cpu and - // each value is the associated hardware queue. There is a little shiftiness - // going on because cpu's which are not available to the process are filtered - // out, similar to the worker_counts. - let topology = device_prop - .mq - .iter() - .fold(vec![0; 128], |mut topology, mq| { + .flat_map(|md| md.mq.iter()) + .fold(vec![0; cores_max], |mut topology, mq| { mq.cpu_list .iter() + .filter(|&&id| id < cores_max) .filter(|&&id| is_core_available(id)) .for_each(|&id| { topology[id] = mq.id; }); topology - }); + }) + .into_iter() + .enumerate() + .map(|(core_id, queue_id)| { + topology_detected + .then_some(queue_id) + .unwrap_or(core_id) + }) + .collect(); - // Regardless of the capacity of all queues we establish some limit on the total - // number of workers; this is hopefully hinted by nr_requests. - let max_workers = device_prop - .mq + // Determine an ideal max worker count based on true capacity. As stated prior + // the true value is rarely attainable in any thread-worker model, and clamped. + let max_workers = devices + .md .iter() + .flat_map(|md| md.mq.iter()) .filter_map(|mq| mq.nr_tags) - .chain(default_worker_count) + .chain(default_worker_count.into_iter()) .fold(0_usize, usize::saturating_add) .clamp(WORKER_LIMIT.0, WORKER_LIMIT.1); - // Determine the final worker count which we'll be spawning. - let total_workers = worker_counts + // Tamper for the total number of workers by reducing the count for each group. + let chan_limit = expected!(max_workers / num_cores) + .saturating_sub(8) + .saturating_add(1) + .next_multiple_of(8); + + // Default workers vector without detection. + let default_workers = default_worker_count + .into_iter() + .cycle() + .enumerate() + .map(|(core_id, count)| { + is_core_available(core_id) + .then_some(count) + .unwrap_or(0) + .min(chan_limit) + }); + + // Determine the worker groupings. Each indice represents a hardware queue and + // contains the number of workers which will service it. 
This vector is + // truncated to the number of cores on systems which have multiple hardware + // queues per core. The number of workers is then truncated to a maximum for + // each pool; as stated prior, this will usually be less than NVMe capacity. + let workers: Vec<usize> = devices .md .iter() - .sum::<usize>() - .clamp(WORKER_LIMIT.0, max_workers); + .inspect(|md| debug!(?md)) + .flat_map(|md| md.mq.iter()) + .map(|mq| { + let shares = mq + .cpu_list + .iter() + .filter(|&&id| is_core_available(id)) + .count(); + + let conf_limit = config + .db_pool_workers_limit + .saturating_mul(shares); + + let hard_limit = devices + .md + .iter() + .filter(|_| shares > 0) + .fold(0_usize, |acc, mq| { + mq.nr_requests + .map(|nr| nr.min(conf_limit)) + .or(Some(conf_limit)) + .map(|nr| acc.saturating_add(nr)) + .unwrap_or(acc) + }); + + let tags = mq + .nr_tags + .unwrap_or(WORKER_LIMIT.0) + .min(hard_limit) + .min(chan_limit); + + debug!(?mq, ?shares, ?tags, ?conf_limit, ?hard_limit, ?chan_limit); + + tags + }) + .chain(default_workers) + .take(topology.len()) + .collect(); + + // Determine our software queue size for each hardware queue. This is the mpmc + // between the tokio worker and the pool worker. + let queues: Vec<usize> = workers + .iter() + .map(|count| { + count + .saturating_mul(config.db_pool_queue_mult) + .min(QUEUE_LIMIT.1) + }) + .collect(); + + // Total number of workers to spawn. + let total_workers = workers.iter().sum::<usize>(); + + // Total capacity of all software queues. + let total_capacity = queues.iter().sum::<usize>(); + + // Discount queues with zero capacity for a proper denominator. + let num_queues = queues.iter().filter(|&&cap| cap > 0).count(); // After computing all of the above we can update the global automatic stream // width, hopefully with a better value tailored to this system. 
if config.stream_width_scale > 0.0 { - let num_queues = queue_sizes.len().max(1); - update_stream_width(server, num_queues, total_workers); + update_stream_width(server, num_queues, total_workers, total_capacity); } - debug_info!( - device_name = ?device_name - .as_deref() - .unwrap_or("None"), - ?worker_counts, - ?queue_sizes, - ?total_workers, - stream_width = ?stream::automatic_width(), - "Frontend topology", - ); + if topology_detected { + debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology",); + info!( + device_name = ?device_name.as_deref().unwrap_or("None"), + ?num_queues, + ?total_workers, + ?total_tags, + ?total_capacity, + stream_width = ?stream::automatic_width(), + amplification = ?stream::automatic_amplification(), + "Frontend topology", + ); + } else { + debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology (defaults)"); + debug_warn!( + device_name = ?device_name.as_deref().unwrap_or("None"), + ?total_workers, + ?total_capacity, + stream_width = ?stream::automatic_width(), + amplification = ?stream::automatic_amplification(), + "Storage hardware not detected for database directory; assuming defaults.", + ); + } assert!(total_workers > 0, "some workers expected"); - assert!(!queue_sizes.is_empty(), "some queues expected"); - assert!( - !queue_sizes.iter().copied().any(is_equal_to!(0)), - "positive queue sizes expected" + debug_assert!( + total_workers <= max_workers || !topology_detected, + "spawning too many workers" ); - (total_workers, queue_sizes, topology) + assert!(!queues.is_empty(), "some queues expected"); + assert!(!queues.iter().copied().all(is_equal_to!(0)), "positive queue capacity expected"); + + (topology, workers, queues) } #[allow(clippy::as_conversions, clippy::cast_precision_loss)] -fn update_stream_width(server: &Arc<Server>, num_queues: usize, total_workers: usize) { +fn update_stream_width( + server: &Arc<Server>, + num_queues: usize, + total_workers: usize, + _total_capacity: usize, +) { + 
assert!(num_queues > 0, "Expected at least one queue."); + assert!(total_workers > 0, "Expected some workers."); + let config = &server.config; let scale: f64 = config.stream_width_scale.min(100.0).into(); + let max_width = expected!(total_workers / num_queues); - let req_width = expected!(total_workers / num_queues).next_multiple_of(2); - let req_width = req_width as f64; - let req_width = usize_from_f64(req_width * scale) + let old_width = stream::automatic_width(); + let old_scale_width = expected!(old_width * num_queues); + + let new_scale = total_workers as f64 / old_scale_width as f64; + let new_scale = new_scale.clamp(1.0, 4.0); + let new_scale_width = new_scale * old_width as f64; + let new_scale_width = usize_from_f64(new_scale_width) .expect("failed to convert f64 to usize") + .next_multiple_of(8); + + let req_width = usize_from_f64(scale * new_scale_width as f64) + .expect("failed to convert f64 to usize") + .next_multiple_of(4) + .min(max_width) .clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1); - let req_amp = config.stream_amplification as f64; + let req_amp = new_scale * config.stream_amplification as f64; let req_amp = usize_from_f64(req_amp * scale) .expect("failed to convert f64 to usize") + .next_multiple_of(64) .clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1); let (old_width, new_width) = stream::set_width(req_width); let (old_amp, new_amp) = stream::set_amplification(req_amp); debug!( - scale = ?config.stream_width_scale, - ?num_queues, - ?req_width, + config_scale = ?config.stream_width_scale, ?old_width, + ?new_scale, ?new_width, ?old_amp, ?new_amp,