diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 5e2afa77..b0d16b1e 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -870,10 +870,17 @@ pub struct Config { /// Maximum number of keys to request in each trusted server batch query. /// - /// default: 1024 + /// default: 192 #[serde(default = "default_trusted_server_batch_size")] pub trusted_server_batch_size: usize, + /// Maximum number of request batches in flight simultaneously when querying + /// a trusted server. + /// + /// default: 2 + #[serde(default = "default_trusted_server_batch_concurrency")] + pub trusted_server_batch_concurrency: usize, + /// Max log level for tuwunel. Allows debug, info, warn, or error. /// /// See also: @@ -3339,7 +3346,9 @@ fn parallelism_scaled_u32(val: u32) -> u32 { fn parallelism_scaled(val: usize) -> usize { val.saturating_mul(sys::available_parallelism()) } -fn default_trusted_server_batch_size() -> usize { 256 } +fn default_trusted_server_batch_size() -> usize { 192 } + +fn default_trusted_server_batch_concurrency() -> usize { 2 } fn default_db_pool_workers() -> usize { sys::available_parallelism() diff --git a/src/service/server_keys/request.rs b/src/service/server_keys/request.rs index b7dfa9b6..a3757d32 100644 --- a/src/service/server_keys/request.rs +++ b/src/service/server_keys/request.rs @@ -1,5 +1,6 @@ -use std::{collections::BTreeMap, fmt::Debug}; +use std::{collections::BTreeMap, convert::identity, fmt::Debug}; +use futures::{FutureExt, StreamExt, TryFutureExt}; use ruma::{ OwnedServerName, OwnedServerSigningKeyId, ServerName, ServerSigningKeyId, api::federation::discovery::{ @@ -8,7 +9,10 @@ use ruma::{ get_server_keys, }, }; -use tuwunel_core::{Err, Result, debug, implement, info}; +use tuwunel_core::{ + Err, Result, error, implement, info, trace, + utils::stream::{IterStream, ReadyExt, TryBroadbandExt, TryReadyExt}, +}; #[implement(super::Service)] pub(super) async fn batch_notary_request<'a, S, K>( @@ -36,7 +40,11 @@ where batch }); - let total_keys = server_keys.len(); + let total_keys = server_keys + .iter() + .flat_map(|(_, ids)| ids.iter()) + .count(); + debug_assert!(total_keys > 0, "empty batch request to notary"); let batch_max = self @@ -45,47 +53,86 @@ where .config .trusted_server_batch_size; - let mut results = Vec::new(); - while let Some(batch) = server_keys + let batch_concurrency = self + .services + .server + .config + .trusted_server_batch_concurrency; + + let batches: Vec<_> = server_keys .keys() .rev() - .take(batch_max) - .next_back() + .step_by(batch_max.saturating_sub(1)) + .skip(1) + .chain(server_keys.keys().next().into_iter()) .cloned() - { - let request = Request { - server_keys: server_keys.split_off(&batch), - }; + .collect(); - debug!( - ?notary, - ?batch, - remaining = %server_keys.len(), - requesting = ?request.server_keys.keys(), - "notary request" - ); + batches + .iter() + .stream() + .enumerate() + .map(|(i, batch)| { + let request = Request { + server_keys: server_keys.split_off(batch), + }; - let response = self - .services - .federation - .execute_synapse(notary, request) - .await? - .server_keys - .into_iter() - .map(|key| key.deserialize()) - .filter_map(Result::ok); + if request.server_keys.is_empty() { + return None; + } - results.extend(response); + trace!( + %i, %notary, ?batch, + remaining = ?server_keys, + requesting = ?request.server_keys.keys(), + "Request to notary server." + ); - info!( - "obtained {0} of {1} results with {2} more to request", - results.len(), - total_keys, - server_keys.len(), - ); - } + info!( + %notary, + remaining = %server_keys.len(), + requesting = %request.server_keys.len(), + "Sending request to notary server..." + ); - Ok(results) + Some(Ok(request)) + }) + .ready_filter_map(identity) + .broadn_and_then(batch_concurrency, |request| { + self.services + .federation + .execute_synapse(notary, request) + }) + .ready_try_fold(Vec::new(), |mut results, response| { + let response = response + .server_keys + .into_iter() + .map(|key| key.deserialize()) + .filter_map(Result::ok); + + trace!( + %notary, ?response, + "Response from notary server." + ); + + results.extend(response); + + info!( + "Received {0} keys out of {1} from notary server so far...", + results.len(), + total_keys, + ); + + Ok(results) + }) + .inspect_err(|e| { + error!( + ?notary, %batch_max, %batch_concurrency, %total_keys, + "Requesting keys from notary server failed: {e}", + ); + }) + .boxed() + .await } #[implement(super::Service)] diff --git a/tuwunel-example.toml b/tuwunel-example.toml index 24cb35f3..ac5a4011 100644 --- a/tuwunel-example.toml +++ b/tuwunel-example.toml @@ -720,7 +720,12 @@ # Maximum number of keys to request in each trusted server batch query. # -#trusted_server_batch_size = 1024 +#trusted_server_batch_size = 192 + +# Maximum number of request batches in flight simultaneously when querying +# a trusted server. +# +#trusted_server_batch_concurrency = 2 # Max log level for tuwunel. Allows debug, info, warn, or error. #