Reapply "Support mdraid hierarchies for storage topology detection."
This reverts commit 121aa9e39d.
Fixes panics on systems with sparse core topologies.
This commit is contained in:
@@ -5,7 +5,7 @@ use std::path::PathBuf;
|
|||||||
|
|
||||||
pub use compute::available_parallelism;
|
pub use compute::available_parallelism;
|
||||||
|
|
||||||
use crate::{Result, debug};
|
use crate::{Result, at, debug};
|
||||||
|
|
||||||
/// This is needed for opening lots of file descriptors, which tends to
|
/// This is needed for opening lots of file descriptors, which tends to
|
||||||
/// happen more often when using RocksDB and making lots of federation
|
/// happen more often when using RocksDB and making lots of federation
|
||||||
@@ -51,3 +51,15 @@ pub fn current_exe_deleted() -> bool {
|
|||||||
.is_some_and(|exe| exe.ends_with(" (deleted)"))
|
.is_some_and(|exe| exe.ends_with(" (deleted)"))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Parse the `KEY=VALUE` contents of a `uevent` file searching for `key` and
|
||||||
|
/// returning the `value`.
|
||||||
|
#[inline]
|
||||||
|
#[must_use]
|
||||||
|
pub fn uevent_find<'a>(uevent: &'a str, key: &'a str) -> Option<&'a str> {
|
||||||
|
uevent
|
||||||
|
.lines()
|
||||||
|
.filter_map(|line| line.split_once('='))
|
||||||
|
.find(|&(key_, _)| key.eq(key_))
|
||||||
|
.map(at!(1))
|
||||||
|
}
|
||||||
|
|||||||
@@ -9,7 +9,11 @@ type Id = usize;
|
|||||||
type Mask = u128;
|
type Mask = u128;
|
||||||
type Masks = [Mask; MASK_BITS];
|
type Masks = [Mask; MASK_BITS];
|
||||||
|
|
||||||
const MASK_BITS: usize = 128;
|
const MASK_BITS: usize = CORES_MAX;
|
||||||
|
|
||||||
|
/// Maximum number of cores we support; for now limited to bits of our mask
|
||||||
|
/// integral.
|
||||||
|
pub const CORES_MAX: usize = 128;
|
||||||
|
|
||||||
/// The mask of logical cores available to the process (at startup).
|
/// The mask of logical cores available to the process (at startup).
|
||||||
static CORES_AVAILABLE: LazyLock<Mask> = LazyLock::new(|| into_mask(query_cores_available()));
|
static CORES_AVAILABLE: LazyLock<Mask> = LazyLock::new(|| into_mask(query_cores_available()));
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ use std::{
|
|||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use itertools::Itertools;
|
||||||
use libc::dev_t;
|
use libc::dev_t;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -16,9 +17,22 @@ use crate::{
|
|||||||
utils::{result::LogDebugErr, string::SplitInfallible},
|
utils::{result::LogDebugErr, string::SplitInfallible},
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Device characteristics useful for random access throughput
|
/// Multi-Device (md) i.e. software raid properties.
|
||||||
#[derive(Clone, Debug, Default)]
|
#[derive(Clone, Debug, Default)]
|
||||||
pub struct Parallelism {
|
pub struct MultiDevice {
|
||||||
|
/// Type of raid (i.e. `raid1`); None if no raid present or detected.
|
||||||
|
pub level: Option<String>,
|
||||||
|
|
||||||
|
/// Number of participating devices.
|
||||||
|
pub raid_disks: usize,
|
||||||
|
|
||||||
|
/// The MQ's discovered on the devices; or empty.
|
||||||
|
pub md: Vec<MultiQueue>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Multi-Queue (mq) characteristics.
|
||||||
|
#[derive(Clone, Debug, Default)]
|
||||||
|
pub struct MultiQueue {
|
||||||
/// Number of requests for the device.
|
/// Number of requests for the device.
|
||||||
pub nr_requests: Option<usize>,
|
pub nr_requests: Option<usize>,
|
||||||
|
|
||||||
@@ -26,7 +40,7 @@ pub struct Parallelism {
|
|||||||
pub mq: Vec<Queue>,
|
pub mq: Vec<Queue>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Device queue characteristics
|
/// Single-queue characteristics
|
||||||
#[derive(Clone, Debug, Default)]
|
#[derive(Clone, Debug, Default)]
|
||||||
pub struct Queue {
|
pub struct Queue {
|
||||||
/// Queue's indice.
|
/// Queue's indice.
|
||||||
@@ -39,18 +53,59 @@ pub struct Queue {
|
|||||||
pub cpu_list: Vec<usize>,
|
pub cpu_list: Vec<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get device characteristics useful for random access throughput by name.
|
/// Get properties of a MultiDevice (md) storage system
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn parallelism(path: &Path) -> Parallelism {
|
pub fn md_discover(path: &Path) -> MultiDevice {
|
||||||
let dev_id = dev_from_path(path)
|
let dev_id = dev_from_path(path)
|
||||||
.log_debug_err()
|
.log_debug_err()
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
let mq_path = block_path(dev_id).join("mq/");
|
let md_path = block_path(dev_id).join("md/");
|
||||||
|
|
||||||
let nr_requests_path = block_path(dev_id).join("queue/nr_requests");
|
let raid_disks_path = md_path.join("raid_disks");
|
||||||
|
|
||||||
Parallelism {
|
let raid_disks: usize = read_to_string(&raid_disks_path)
|
||||||
|
.ok()
|
||||||
|
.as_deref()
|
||||||
|
.map(str::trim)
|
||||||
|
.map(str::parse)
|
||||||
|
.flat_ok()
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
let single_fallback = raid_disks.eq(&0).then(|| block_path(dev_id));
|
||||||
|
|
||||||
|
MultiDevice {
|
||||||
|
raid_disks,
|
||||||
|
|
||||||
|
level: read_to_string(md_path.join("level"))
|
||||||
|
.ok()
|
||||||
|
.as_deref()
|
||||||
|
.map(str::trim)
|
||||||
|
.map(ToOwned::to_owned),
|
||||||
|
|
||||||
|
md: (0..raid_disks)
|
||||||
|
.map(|i| format!("rd{i}/block"))
|
||||||
|
.map(|path| md_path.join(&path))
|
||||||
|
.filter_map(|ref path| path.canonicalize().ok())
|
||||||
|
.map(|mut path| {
|
||||||
|
path.pop();
|
||||||
|
path
|
||||||
|
})
|
||||||
|
.chain(single_fallback)
|
||||||
|
.map(|path| mq_discover(&path))
|
||||||
|
.filter(|mq| !mq.mq.is_empty())
|
||||||
|
.collect(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get properties of a MultiQueue within a MultiDevice.
|
||||||
|
#[must_use]
|
||||||
|
fn mq_discover(path: &Path) -> MultiQueue {
|
||||||
|
let mq_path = path.join("mq/");
|
||||||
|
|
||||||
|
let nr_requests_path = path.join("queue/nr_requests");
|
||||||
|
|
||||||
|
MultiQueue {
|
||||||
nr_requests: read_to_string(&nr_requests_path)
|
nr_requests: read_to_string(&nr_requests_path)
|
||||||
.ok()
|
.ok()
|
||||||
.as_deref()
|
.as_deref()
|
||||||
@@ -68,13 +123,14 @@ pub fn parallelism(path: &Path) -> Parallelism {
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
.is_ok_and(FileType::is_dir)
|
.is_ok_and(FileType::is_dir)
|
||||||
})
|
})
|
||||||
.map(|dir| queue_parallelism(&dir.path()))
|
.map(|dir| queue_discover(&dir.path()))
|
||||||
.collect(),
|
.sorted_by_key(|mq| mq.id)
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get device queue characteristics by mq path on sysfs(5)
|
/// Get properties of a Queue within a MultiQueue.
|
||||||
fn queue_parallelism(dir: &Path) -> Queue {
|
fn queue_discover(dir: &Path) -> Queue {
|
||||||
let queue_id = dir.file_name();
|
let queue_id = dir.file_name();
|
||||||
|
|
||||||
let nr_tags_path = dir.join("nr_tags");
|
let nr_tags_path = dir.join("nr_tags");
|
||||||
|
|||||||
@@ -19,7 +19,7 @@ use tuwunel_core::{
|
|||||||
result::DebugInspect,
|
result::DebugInspect,
|
||||||
smallvec::SmallVec,
|
smallvec::SmallVec,
|
||||||
trace,
|
trace,
|
||||||
utils::sys::compute::{get_affinity, nth_core_available, set_affinity},
|
utils::sys::compute::{get_affinity, set_affinity},
|
||||||
};
|
};
|
||||||
|
|
||||||
use self::configure::configure;
|
use self::configure::configure;
|
||||||
@@ -76,10 +76,11 @@ const WORKER_NAME: &str = "tuwunel:db";
|
|||||||
pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
|
pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
|
||||||
const CHAN_SCHED: (QueueStrategy, QueueStrategy) = (QueueStrategy::Fifo, QueueStrategy::Lifo);
|
const CHAN_SCHED: (QueueStrategy, QueueStrategy) = (QueueStrategy::Fifo, QueueStrategy::Lifo);
|
||||||
|
|
||||||
let (total_workers, queue_sizes, topology) = configure(server);
|
let (topology, workers, queues) = configure(server);
|
||||||
|
|
||||||
let (senders, receivers): (Vec<_>, Vec<_>) = queue_sizes
|
let (senders, receivers): (Vec<_>, Vec<_>) = queues
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
.map(|cap| cap.max(QUEUE_LIMIT.0))
|
||||||
.map(|cap| async_channel::bounded_with_queue_strategy(cap, CHAN_SCHED))
|
.map(|cap| async_channel::bounded_with_queue_strategy(cap, CHAN_SCHED))
|
||||||
.unzip();
|
.unzip();
|
||||||
|
|
||||||
@@ -92,7 +93,9 @@ pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
|
|||||||
queued_max: AtomicUsize::default(),
|
queued_max: AtomicUsize::default(),
|
||||||
});
|
});
|
||||||
|
|
||||||
pool.spawn_until(&receivers, total_workers)?;
|
for (chan_id, &count) in workers.iter().enumerate() {
|
||||||
|
pool.spawn_group(&receivers, chan_id, count)?;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(pool)
|
Ok(pool)
|
||||||
}
|
}
|
||||||
@@ -157,10 +160,11 @@ pub(crate) fn close(&self) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Pool)]
|
#[implement(Pool)]
|
||||||
fn spawn_until(self: &Arc<Self>, recv: &[Receiver<Cmd>], count: usize) -> Result {
|
fn spawn_group(self: &Arc<Self>, recv: &[Receiver<Cmd>], chan_id: usize, count: usize) -> Result {
|
||||||
let mut workers = self.workers.lock().expect("locked");
|
let mut workers = self.workers.lock().expect("locked");
|
||||||
while workers.len() < count {
|
for _ in 0..count {
|
||||||
self.clone().spawn_one(&mut workers, recv)?;
|
self.clone()
|
||||||
|
.spawn_one(&mut workers, recv, chan_id)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@@ -177,18 +181,18 @@ fn spawn_one(
|
|||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
workers: &mut Vec<JoinHandle<()>>,
|
workers: &mut Vec<JoinHandle<()>>,
|
||||||
recv: &[Receiver<Cmd>],
|
recv: &[Receiver<Cmd>],
|
||||||
|
chan_id: usize,
|
||||||
) -> Result {
|
) -> Result {
|
||||||
debug_assert!(!self.queues.is_empty(), "Must have at least one queue");
|
debug_assert!(!self.queues.is_empty(), "Must have at least one queue");
|
||||||
debug_assert!(!recv.is_empty(), "Must have at least one receiver");
|
debug_assert!(!recv.is_empty(), "Must have at least one receiver");
|
||||||
|
|
||||||
let id = workers.len();
|
let id = workers.len();
|
||||||
let group = id.overflowing_rem(self.queues.len()).0;
|
let recv = recv[chan_id].clone();
|
||||||
let recv = recv[group].clone();
|
|
||||||
|
|
||||||
let handle = thread::Builder::new()
|
let handle = thread::Builder::new()
|
||||||
.name(WORKER_NAME.into())
|
.name(WORKER_NAME.into())
|
||||||
.stack_size(WORKER_STACK_SIZE)
|
.stack_size(WORKER_STACK_SIZE)
|
||||||
.spawn(move || self.worker(id, &recv))?;
|
.spawn(move || self.worker(id, chan_id, &recv))?;
|
||||||
|
|
||||||
workers.push(handle);
|
workers.push(handle);
|
||||||
|
|
||||||
@@ -227,8 +231,12 @@ pub(crate) async fn execute_iter(self: &Arc<Self>, mut cmd: Seek) -> Result<stre
|
|||||||
|
|
||||||
#[implement(Pool)]
|
#[implement(Pool)]
|
||||||
fn select_queue(&self) -> &Sender<Cmd> {
|
fn select_queue(&self) -> &Sender<Cmd> {
|
||||||
let core_id = get_affinity().next().unwrap_or(0);
|
let core_id = get_affinity()
|
||||||
|
.next()
|
||||||
|
.expect("Affinity mask should be available.");
|
||||||
|
|
||||||
let chan_id = self.topology[core_id];
|
let chan_id = self.topology[core_id];
|
||||||
|
|
||||||
self.queues
|
self.queues
|
||||||
.get(chan_id)
|
.get(chan_id)
|
||||||
.unwrap_or_else(|| &self.queues[0])
|
.unwrap_or_else(|| &self.queues[0])
|
||||||
@@ -262,33 +270,33 @@ async fn execute(&self, queue: &Sender<Cmd>, cmd: Cmd) -> Result {
|
|||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
parent = None,
|
parent = None,
|
||||||
level = "debug",
|
level = "debug",
|
||||||
skip(self, recv),
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
tid = ?thread::current().id(),
|
id,
|
||||||
|
chan_id,
|
||||||
|
thread_id = ?thread::current().id(),
|
||||||
),
|
),
|
||||||
)]
|
)]
|
||||||
fn worker(self: Arc<Self>, id: usize, recv: &Receiver<Cmd>) {
|
fn worker(self: Arc<Self>, id: usize, chan_id: usize, recv: &Receiver<Cmd>) {
|
||||||
self.worker_init(id);
|
self.worker_init(id, chan_id);
|
||||||
self.worker_loop(recv);
|
self.worker_loop(recv);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Pool)]
|
#[implement(Pool)]
|
||||||
fn worker_init(&self, id: usize) {
|
fn worker_init(&self, id: usize, chan_id: usize) {
|
||||||
let group = id.overflowing_rem(self.queues.len()).0;
|
|
||||||
let affinity = self
|
let affinity = self
|
||||||
.topology
|
.topology
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.filter(|_| self.queues.len() > 1)
|
|
||||||
.filter(|_| self.server.config.db_pool_affinity)
|
.filter(|_| self.server.config.db_pool_affinity)
|
||||||
.filter_map(|(core_id, &queue_id)| (group == queue_id).then_some(core_id))
|
.filter_map(|(core_id, &queue_id)| (chan_id == queue_id).then_some(core_id));
|
||||||
.filter_map(nth_core_available);
|
|
||||||
|
|
||||||
// affinity is empty (no-op) if there's only one queue
|
// affinity is empty (no-op) if there's only one queue
|
||||||
set_affinity(affinity.clone());
|
set_affinity(affinity.clone());
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
?group,
|
?id,
|
||||||
|
?chan_id,
|
||||||
affinity = ?affinity.collect::<Vec<_>>(),
|
affinity = ?affinity.collect::<Vec<_>>(),
|
||||||
"worker ready"
|
"worker ready"
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -1,157 +1,300 @@
|
|||||||
use std::{path::PathBuf, sync::Arc};
|
use std::{path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Server, debug, debug_info, expected, is_equal_to,
|
Server, debug,
|
||||||
|
debug::INFO_SPAN_LEVEL,
|
||||||
|
debug_info, debug_warn, expected, info, is_equal_to,
|
||||||
utils::{
|
utils::{
|
||||||
|
BoolExt,
|
||||||
math::usize_from_f64,
|
math::usize_from_f64,
|
||||||
result::LogDebugErr,
|
result::LogDebugErr,
|
||||||
stream,
|
stream,
|
||||||
stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT},
|
stream::{AMPLIFICATION_LIMIT, WIDTH_LIMIT},
|
||||||
sys::{compute::is_core_available, storage},
|
sys::{
|
||||||
|
compute::{available_parallelism, cores_available, is_core_available},
|
||||||
|
storage,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{QUEUE_LIMIT, WORKER_LIMIT};
|
use super::{QUEUE_LIMIT, WORKER_LIMIT};
|
||||||
|
|
||||||
pub(super) fn configure(server: &Arc<Server>) -> (usize, Vec<usize>, Vec<usize>) {
|
/// Determine storage hardware capabilities of the system for configuring the
|
||||||
|
/// shape of the database frontend threadpool.
|
||||||
|
///
|
||||||
|
/// Returns a tuple of:
|
||||||
|
/// - `topology` Vector mapping hardware cores to hardware queues. Systems with
|
||||||
|
/// fewer queues than cores will see queue ID's repeated. Systems with the
|
||||||
|
/// same or more queues as cores will usually see a 1:1 association of core
|
||||||
|
/// ID's to queue ID's. Systems with sparse core assignments will see 0 for
|
||||||
|
/// core ID positions not available to the process. Systems where detection
|
||||||
|
/// failed will see a default of 1:1 core identity as a best-guess maintaining
|
||||||
|
/// core locality.
|
||||||
|
/// - `workers` Vector mapping hardware queues to the number of threads to spawn
|
||||||
|
/// in service of that queue. Systems with fewer queues than cores will set an
|
||||||
|
/// affinity mask for each thread to multiple cores based on the topology.
|
||||||
|
/// Systems with equal or more hardware queue to cores will set a simple
|
||||||
|
/// affinity for each thread in a pool.
|
||||||
|
/// - `queues` Vector of software mpmc queues to create and the size of each
|
||||||
|
/// queue. Each indice is associated with a thread-pool of workers which it
|
||||||
|
/// feeds from requests from various tokio tasks. When this queue reaches
|
||||||
|
/// capacity the tokio task must yield.
|
||||||
|
#[tracing::instrument(
|
||||||
|
level = INFO_SPAN_LEVEL,
|
||||||
|
skip_all,
|
||||||
|
ret(level = "trace"),
|
||||||
|
)]
|
||||||
|
pub(super) fn configure(server: &Arc<Server>) -> (Vec<usize>, Vec<usize>, Vec<usize>) {
|
||||||
let config = &server.config;
|
let config = &server.config;
|
||||||
|
let num_cores = available_parallelism();
|
||||||
|
|
||||||
|
// Determine the maximum number of cores. The total number of cores available to
|
||||||
|
// the process may be less on systems with sparse core assignments, but this
|
||||||
|
// still serves as an upper-bound.
|
||||||
|
let cores_max = cores_available()
|
||||||
|
.last()
|
||||||
|
.unwrap_or(0)
|
||||||
|
.saturating_add(1);
|
||||||
|
|
||||||
// This finds the block device and gathers all the properties we need.
|
// This finds the block device and gathers all the properties we need.
|
||||||
let path: PathBuf = config.database_path.clone();
|
let path: PathBuf = config.database_path.clone();
|
||||||
let device_name = storage::name_from_path(&path)
|
let device_name = storage::name_from_path(&path)
|
||||||
.log_debug_err()
|
.log_debug_err()
|
||||||
.ok();
|
.ok();
|
||||||
let device_prop = storage::parallelism(&path);
|
|
||||||
|
let devices = storage::md_discover(&path);
|
||||||
|
let topology_detected = devices.md.is_empty().is_false();
|
||||||
|
debug!(?topology_detected, ?device_name, ?devices);
|
||||||
|
|
||||||
// The default worker count is masked-on if we didn't find better information.
|
// The default worker count is masked-on if we didn't find better information.
|
||||||
let default_worker_count = device_prop
|
let default_worker_count = topology_detected
|
||||||
.mq
|
.is_false()
|
||||||
.is_empty()
|
|
||||||
.then_some(config.db_pool_workers);
|
.then_some(config.db_pool_workers);
|
||||||
|
|
||||||
// Determine the worker groupings. Each indice represents a hardware queue and
|
// Sum the total number of possible tags. When no hardware detected this will
|
||||||
// contains the number of workers which will service it.
|
// default to the default_worker_count. Note well that the thread-worker model
|
||||||
let worker_counts: Vec<_> = device_prop
|
// we use will never approach actual NVMe capacity as with io_uring or even
|
||||||
.mq
|
// close to userspace drivers. We still take some cues from this value which
|
||||||
|
// does give us actual request capacity.
|
||||||
|
let total_tags = devices
|
||||||
|
.md
|
||||||
.iter()
|
.iter()
|
||||||
|
.flat_map(|md| md.mq.iter())
|
||||||
.filter(|mq| mq.cpu_list.iter().copied().any(is_core_available))
|
.filter(|mq| mq.cpu_list.iter().copied().any(is_core_available))
|
||||||
.map(|mq| {
|
.filter_map(|mq| mq.nr_tags)
|
||||||
let shares = mq
|
|
||||||
.cpu_list
|
|
||||||
.iter()
|
|
||||||
.filter(|&&id| is_core_available(id))
|
|
||||||
.count()
|
|
||||||
.max(1);
|
|
||||||
|
|
||||||
let limit = config
|
|
||||||
.db_pool_workers_limit
|
|
||||||
.saturating_mul(shares);
|
|
||||||
|
|
||||||
let limit = device_prop
|
|
||||||
.nr_requests
|
|
||||||
.map_or(limit, |nr| nr.min(limit));
|
|
||||||
|
|
||||||
mq.nr_tags.unwrap_or(WORKER_LIMIT.0).min(limit)
|
|
||||||
})
|
|
||||||
.chain(default_worker_count)
|
.chain(default_worker_count)
|
||||||
.collect();
|
.fold(0_usize, usize::saturating_add);
|
||||||
|
|
||||||
// Determine our software queue size for each hardware queue. This is the mpmc
|
// Determine the CPU affinities of each hardware queue. Each indice is a core
|
||||||
// between the tokio worker and the pool worker.
|
// and each value is the associated hardware queue. On systems which share
|
||||||
let queue_sizes: Vec<_> = worker_counts
|
// queues between cores some values will be repeated; on systems with multiple
|
||||||
|
// queues per core the affinities are assumed to match and we don't require a
|
||||||
|
// vector of vectors. Sparse unavailable cores default to 0. Undetected hardware
|
||||||
|
// defaults to the core identity as a best-guess.
|
||||||
|
let topology: Vec<usize> = devices
|
||||||
|
.md
|
||||||
.iter()
|
.iter()
|
||||||
.map(|worker_count| {
|
.flat_map(|md| md.mq.iter())
|
||||||
worker_count
|
.fold(vec![0; cores_max], |mut topology, mq| {
|
||||||
.saturating_mul(config.db_pool_queue_mult)
|
|
||||||
.clamp(QUEUE_LIMIT.0, QUEUE_LIMIT.1)
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// Determine the CPU affinities of each hardware queue. Each indice is a cpu and
|
|
||||||
// each value is the associated hardware queue. There is a little shiftiness
|
|
||||||
// going on because cpu's which are not available to the process are filtered
|
|
||||||
// out, similar to the worker_counts.
|
|
||||||
let topology = device_prop
|
|
||||||
.mq
|
|
||||||
.iter()
|
|
||||||
.fold(vec![0; 128], |mut topology, mq| {
|
|
||||||
mq.cpu_list
|
mq.cpu_list
|
||||||
.iter()
|
.iter()
|
||||||
|
.filter(|&&id| id < cores_max)
|
||||||
.filter(|&&id| is_core_available(id))
|
.filter(|&&id| is_core_available(id))
|
||||||
.for_each(|&id| {
|
.for_each(|&id| {
|
||||||
topology[id] = mq.id;
|
topology[id] = mq.id;
|
||||||
});
|
});
|
||||||
|
|
||||||
topology
|
topology
|
||||||
});
|
})
|
||||||
|
.into_iter()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(core_id, queue_id)| {
|
||||||
|
topology_detected
|
||||||
|
.then_some(queue_id)
|
||||||
|
.unwrap_or(core_id)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
// Regardless of the capacity of all queues we establish some limit on the total
|
// Determine an ideal max worker count based on true capacity. As stated prior
|
||||||
// number of workers; this is hopefully hinted by nr_requests.
|
// the true value is rarely attainable in any thread-worker model, and clamped.
|
||||||
let max_workers = device_prop
|
let max_workers = devices
|
||||||
.mq
|
.md
|
||||||
.iter()
|
.iter()
|
||||||
|
.flat_map(|md| md.mq.iter())
|
||||||
.filter_map(|mq| mq.nr_tags)
|
.filter_map(|mq| mq.nr_tags)
|
||||||
.chain(default_worker_count)
|
.chain(default_worker_count.into_iter())
|
||||||
.fold(0_usize, usize::saturating_add)
|
.fold(0_usize, usize::saturating_add)
|
||||||
.clamp(WORKER_LIMIT.0, WORKER_LIMIT.1);
|
.clamp(WORKER_LIMIT.0, WORKER_LIMIT.1);
|
||||||
|
|
||||||
// Determine the final worker count which we'll be spawning.
|
// Tamper for the total number of workers by reducing the count for each group.
|
||||||
let total_workers = worker_counts
|
let chan_limit = expected!(max_workers / num_cores)
|
||||||
|
.saturating_sub(8)
|
||||||
|
.saturating_add(1)
|
||||||
|
.next_multiple_of(8);
|
||||||
|
|
||||||
|
// Default workers vector without detection.
|
||||||
|
let default_workers = default_worker_count
|
||||||
|
.into_iter()
|
||||||
|
.cycle()
|
||||||
|
.enumerate()
|
||||||
|
.map(|(core_id, count)| {
|
||||||
|
is_core_available(core_id)
|
||||||
|
.then_some(count)
|
||||||
|
.unwrap_or(0)
|
||||||
|
.min(chan_limit)
|
||||||
|
});
|
||||||
|
|
||||||
|
// Determine the worker groupings. Each indice represents a hardware queue and
|
||||||
|
// contains the number of workers which will service it. This vector is
|
||||||
|
// truncated to the number of cores on systems which have multiple hardware
|
||||||
|
// queues per core. The number of workers is then truncated to a maximum for
|
||||||
|
// each pool; as stated prior, this will usually be less than NVMe capacity.
|
||||||
|
let workers: Vec<usize> = devices
|
||||||
|
.md
|
||||||
.iter()
|
.iter()
|
||||||
.sum::<usize>()
|
.inspect(|md| debug!(?md))
|
||||||
.clamp(WORKER_LIMIT.0, max_workers);
|
.flat_map(|md| md.mq.iter())
|
||||||
|
.map(|mq| {
|
||||||
|
let shares = mq
|
||||||
|
.cpu_list
|
||||||
|
.iter()
|
||||||
|
.filter(|&&id| is_core_available(id))
|
||||||
|
.count();
|
||||||
|
|
||||||
|
let conf_limit = config
|
||||||
|
.db_pool_workers_limit
|
||||||
|
.saturating_mul(shares);
|
||||||
|
|
||||||
|
let hard_limit = devices
|
||||||
|
.md
|
||||||
|
.iter()
|
||||||
|
.filter(|_| shares > 0)
|
||||||
|
.fold(0_usize, |acc, mq| {
|
||||||
|
mq.nr_requests
|
||||||
|
.map(|nr| nr.min(conf_limit))
|
||||||
|
.or(Some(conf_limit))
|
||||||
|
.map(|nr| acc.saturating_add(nr))
|
||||||
|
.unwrap_or(acc)
|
||||||
|
});
|
||||||
|
|
||||||
|
let tags = mq
|
||||||
|
.nr_tags
|
||||||
|
.unwrap_or(WORKER_LIMIT.0)
|
||||||
|
.min(hard_limit)
|
||||||
|
.min(chan_limit);
|
||||||
|
|
||||||
|
debug!(?mq, ?shares, ?tags, ?conf_limit, ?hard_limit, ?chan_limit);
|
||||||
|
|
||||||
|
tags
|
||||||
|
})
|
||||||
|
.chain(default_workers)
|
||||||
|
.take(topology.len())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Determine our software queue size for each hardware queue. This is the mpmc
|
||||||
|
// between the tokio worker and the pool worker.
|
||||||
|
let queues: Vec<usize> = workers
|
||||||
|
.iter()
|
||||||
|
.map(|count| {
|
||||||
|
count
|
||||||
|
.saturating_mul(config.db_pool_queue_mult)
|
||||||
|
.min(QUEUE_LIMIT.1)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Total number of workers to spawn.
|
||||||
|
let total_workers = workers.iter().sum::<usize>();
|
||||||
|
|
||||||
|
// Total capacity of all software qeueus.
|
||||||
|
let total_capacity = queues.iter().sum::<usize>();
|
||||||
|
|
||||||
|
// Discount queues with zero capacity for a proper denominator.
|
||||||
|
let num_queues = queues.iter().filter(|&&cap| cap > 0).count();
|
||||||
|
|
||||||
// After computing all of the above we can update the global automatic stream
|
// After computing all of the above we can update the global automatic stream
|
||||||
// width, hopefully with a better value tailored to this system.
|
// width, hopefully with a better value tailored to this system.
|
||||||
if config.stream_width_scale > 0.0 {
|
if config.stream_width_scale > 0.0 {
|
||||||
let num_queues = queue_sizes.len().max(1);
|
update_stream_width(server, num_queues, total_workers, total_capacity);
|
||||||
update_stream_width(server, num_queues, total_workers);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
debug_info!(
|
if topology_detected {
|
||||||
device_name = ?device_name
|
debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology",);
|
||||||
.as_deref()
|
info!(
|
||||||
.unwrap_or("None"),
|
device_name = ?device_name.as_deref().unwrap_or("None"),
|
||||||
?worker_counts,
|
?num_queues,
|
||||||
?queue_sizes,
|
?total_workers,
|
||||||
?total_workers,
|
?total_tags,
|
||||||
stream_width = ?stream::automatic_width(),
|
?total_capacity,
|
||||||
"Frontend topology",
|
stream_width = ?stream::automatic_width(),
|
||||||
);
|
amplification = ?stream::automatic_amplification(),
|
||||||
|
"Frontend topology",
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
debug_info!(?num_cores, ?topology, ?workers, ?queues, "Frontend topology (defaults)");
|
||||||
|
debug_warn!(
|
||||||
|
device_name = ?device_name.as_deref().unwrap_or("None"),
|
||||||
|
?total_workers,
|
||||||
|
?total_capacity,
|
||||||
|
stream_width = ?stream::automatic_width(),
|
||||||
|
amplification = ?stream::automatic_amplification(),
|
||||||
|
"Storage hardware not detected for database directory; assuming defaults.",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
assert!(total_workers > 0, "some workers expected");
|
assert!(total_workers > 0, "some workers expected");
|
||||||
assert!(!queue_sizes.is_empty(), "some queues expected");
|
debug_assert!(
|
||||||
assert!(
|
total_workers <= max_workers || !topology_detected,
|
||||||
!queue_sizes.iter().copied().any(is_equal_to!(0)),
|
"spawning too many workers"
|
||||||
"positive queue sizes expected"
|
|
||||||
);
|
);
|
||||||
|
|
||||||
(total_workers, queue_sizes, topology)
|
assert!(!queues.is_empty(), "some queues expected");
|
||||||
|
assert!(!queues.iter().copied().all(is_equal_to!(0)), "positive queue capacity expected");
|
||||||
|
|
||||||
|
(topology, workers, queues)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[allow(clippy::as_conversions, clippy::cast_precision_loss)]
|
#[allow(clippy::as_conversions, clippy::cast_precision_loss)]
|
||||||
fn update_stream_width(server: &Arc<Server>, num_queues: usize, total_workers: usize) {
|
fn update_stream_width(
|
||||||
|
server: &Arc<Server>,
|
||||||
|
num_queues: usize,
|
||||||
|
total_workers: usize,
|
||||||
|
_total_capacity: usize,
|
||||||
|
) {
|
||||||
|
assert!(num_queues > 0, "Expected at least one queue.");
|
||||||
|
assert!(total_workers > 0, "Expected some workers.");
|
||||||
|
|
||||||
let config = &server.config;
|
let config = &server.config;
|
||||||
let scale: f64 = config.stream_width_scale.min(100.0).into();
|
let scale: f64 = config.stream_width_scale.min(100.0).into();
|
||||||
|
let max_width = expected!(total_workers / num_queues);
|
||||||
|
|
||||||
let req_width = expected!(total_workers / num_queues).next_multiple_of(2);
|
let old_width = stream::automatic_width();
|
||||||
let req_width = req_width as f64;
|
let old_scale_width = expected!(old_width * num_queues);
|
||||||
let req_width = usize_from_f64(req_width * scale)
|
|
||||||
|
let new_scale = total_workers as f64 / old_scale_width as f64;
|
||||||
|
let new_scale = new_scale.clamp(1.0, 4.0);
|
||||||
|
let new_scale_width = new_scale * old_width as f64;
|
||||||
|
let new_scale_width = usize_from_f64(new_scale_width)
|
||||||
.expect("failed to convert f64 to usize")
|
.expect("failed to convert f64 to usize")
|
||||||
|
.next_multiple_of(8);
|
||||||
|
|
||||||
|
let req_width = usize_from_f64(scale * new_scale_width as f64)
|
||||||
|
.expect("failed to convert f64 to usize")
|
||||||
|
.next_multiple_of(4)
|
||||||
|
.min(max_width)
|
||||||
.clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);
|
.clamp(WIDTH_LIMIT.0, WIDTH_LIMIT.1);
|
||||||
|
|
||||||
let req_amp = config.stream_amplification as f64;
|
let req_amp = new_scale * config.stream_amplification as f64;
|
||||||
let req_amp = usize_from_f64(req_amp * scale)
|
let req_amp = usize_from_f64(req_amp * scale)
|
||||||
.expect("failed to convert f64 to usize")
|
.expect("failed to convert f64 to usize")
|
||||||
|
.next_multiple_of(64)
|
||||||
.clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1);
|
.clamp(AMPLIFICATION_LIMIT.0, AMPLIFICATION_LIMIT.1);
|
||||||
|
|
||||||
let (old_width, new_width) = stream::set_width(req_width);
|
let (old_width, new_width) = stream::set_width(req_width);
|
||||||
let (old_amp, new_amp) = stream::set_amplification(req_amp);
|
let (old_amp, new_amp) = stream::set_amplification(req_amp);
|
||||||
debug!(
|
debug!(
|
||||||
scale = ?config.stream_width_scale,
|
config_scale = ?config.stream_width_scale,
|
||||||
?num_queues,
|
|
||||||
?req_width,
|
|
||||||
?old_width,
|
?old_width,
|
||||||
|
?new_scale,
|
||||||
?new_width,
|
?new_width,
|
||||||
?old_amp,
|
?old_amp,
|
||||||
?new_amp,
|
?new_amp,
|
||||||
|
|||||||
Reference in New Issue
Block a user