Add configurable concurrent batch requests to notary.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2026-02-17 18:35:03 +00:00
parent 6f93436eff
commit 7df373524e
3 changed files with 100 additions and 39 deletions

View File

@@ -870,10 +870,17 @@ pub struct Config {
/// Maximum number of keys to request in each trusted server batch query. /// Maximum number of keys to request in each trusted server batch query.
/// ///
/// default: 1024 /// default: 192
#[serde(default = "default_trusted_server_batch_size")] #[serde(default = "default_trusted_server_batch_size")]
pub trusted_server_batch_size: usize, pub trusted_server_batch_size: usize,
/// Maximum number of request batches in flight simultaneously when querying
/// a trusted server.
///
/// default: 2
#[serde(default = "default_trusted_server_batch_concurrency")]
pub trusted_server_batch_concurrency: usize,
/// Max log level for tuwunel. Allows debug, info, warn, or error. /// Max log level for tuwunel. Allows debug, info, warn, or error.
/// ///
/// See also: /// See also:
@@ -3339,7 +3346,9 @@ fn parallelism_scaled_u32(val: u32) -> u32 {
fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) }
fn default_trusted_server_batch_size() -> usize { 256 } fn default_trusted_server_batch_size() -> usize { 192 }
fn default_trusted_server_batch_concurrency() -> usize { 2 }
fn default_db_pool_workers() -> usize { fn default_db_pool_workers() -> usize {
sys::available_parallelism() sys::available_parallelism()

View File

@@ -1,5 +1,6 @@
use std::{collections::BTreeMap, fmt::Debug}; use std::{collections::BTreeMap, convert::identity, fmt::Debug};
use futures::{FutureExt, StreamExt, TryFutureExt};
use ruma::{ use ruma::{
OwnedServerName, OwnedServerSigningKeyId, ServerName, ServerSigningKeyId, OwnedServerName, OwnedServerSigningKeyId, ServerName, ServerSigningKeyId,
api::federation::discovery::{ api::federation::discovery::{
@@ -8,7 +9,10 @@ use ruma::{
get_server_keys, get_server_keys,
}, },
}; };
use tuwunel_core::{Err, Result, debug, implement, info}; use tuwunel_core::{
Err, Result, error, implement, info, trace,
utils::stream::{IterStream, ReadyExt, TryBroadbandExt, TryReadyExt},
};
#[implement(super::Service)] #[implement(super::Service)]
pub(super) async fn batch_notary_request<'a, S, K>( pub(super) async fn batch_notary_request<'a, S, K>(
@@ -36,7 +40,11 @@ where
batch batch
}); });
let total_keys = server_keys.len(); let total_keys = server_keys
.iter()
.flat_map(|(_, ids)| ids.iter())
.count();
debug_assert!(total_keys > 0, "empty batch request to notary"); debug_assert!(total_keys > 0, "empty batch request to notary");
let batch_max = self let batch_max = self
@@ -45,47 +53,86 @@ where
.config .config
.trusted_server_batch_size; .trusted_server_batch_size;
let mut results = Vec::new(); let batch_concurrency = self
while let Some(batch) = server_keys .services
.server
.config
.trusted_server_batch_concurrency;
let batches: Vec<_> = server_keys
.keys() .keys()
.rev() .rev()
.take(batch_max) .step_by(batch_max.saturating_sub(1))
.next_back() .skip(1)
.chain(server_keys.keys().next().into_iter())
.cloned() .cloned()
{ .collect();
let request = Request {
server_keys: server_keys.split_off(&batch),
};
debug!( batches
?notary, .iter()
?batch, .stream()
remaining = %server_keys.len(), .enumerate()
requesting = ?request.server_keys.keys(), .map(|(i, batch)| {
"notary request" let request = Request {
); server_keys: server_keys.split_off(batch),
};
let response = self if request.server_keys.is_empty() {
.services return None;
.federation }
.execute_synapse(notary, request)
.await?
.server_keys
.into_iter()
.map(|key| key.deserialize())
.filter_map(Result::ok);
results.extend(response); trace!(
%i, %notary, ?batch,
remaining = ?server_keys,
requesting = ?request.server_keys.keys(),
"Request to notary server."
);
info!( info!(
"obtained {0} of {1} results with {2} more to request", %notary,
results.len(), remaining = %server_keys.len(),
total_keys, requesting = %request.server_keys.len(),
server_keys.len(), "Sending request to notary server..."
); );
}
Ok(results) Some(Ok(request))
})
.ready_filter_map(identity)
.broadn_and_then(batch_concurrency, |request| {
self.services
.federation
.execute_synapse(notary, request)
})
.ready_try_fold(Vec::new(), |mut results, response| {
let response = response
.server_keys
.into_iter()
.map(|key| key.deserialize())
.filter_map(Result::ok);
trace!(
%notary, ?response,
"Response from notary server."
);
results.extend(response);
info!(
"Received {0} keys out of {1} from notary server so far...",
results.len(),
total_keys,
);
Ok(results)
})
.inspect_err(|e| {
error!(
?notary, %batch_max, %batch_concurrency, %total_keys,
"Requesting keys from notary server failed: {e}",
);
})
.boxed()
.await
} }
#[implement(super::Service)] #[implement(super::Service)]

View File

@@ -720,7 +720,12 @@
# Maximum number of keys to request in each trusted server batch query. # Maximum number of keys to request in each trusted server batch query.
# #
#trusted_server_batch_size = 1024 #trusted_server_batch_size = 192
# Maximum number of request batches in flight simultaneously when querying
# a trusted server.
#
#trusted_server_batch_concurrency = 2
# Max log level for tuwunel. Allows debug, info, warn, or error. # Max log level for tuwunel. Allows debug, info, warn, or error.
# #