From 2b2c14513f9d4176719fac47f5f1e72c16ac64d6 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 3 Mar 2026 23:37:46 +0000 Subject: [PATCH] Constrain size of FuturesUnordered for conflicted-subgraph. Signed-off-by: Jason Volk --- .../state_res/resolve/conflicted_subgraph.rs | 47 +++++++++++++------ 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/src/service/rooms/state_res/resolve/conflicted_subgraph.rs b/src/service/rooms/state_res/resolve/conflicted_subgraph.rs index e47050a1..36a7b96c 100644 --- a/src/service/rooms/state_res/resolve/conflicted_subgraph.rs +++ b/src/service/rooms/state_res/resolve/conflicted_subgraph.rs @@ -13,7 +13,10 @@ use tuwunel_core::{ Result, implement, matrix::{Event, pdu::AuthEvents}, smallvec::SmallVec, - utils::{BoolExt, stream::IterStream}, + utils::{ + BoolExt, + stream::{IterStream, automatic_width}, + }, }; #[derive(Default)] @@ -43,6 +46,7 @@ type Frame = AuthEvents; const PATH_INLINE: usize = 32; const STACK_INLINE: usize = 32; +const CAPACITY_MULTIPLIER: usize = 4; #[tracing::instrument( name = "subgraph_dfs", @@ -61,21 +65,36 @@ where Fut: Future> + Send, Pdu: Event, { - let initial_cap = conflicted_set.len().saturating_mul(3); + let initial_capacity = conflicted_set + .len() + .saturating_mul(CAPACITY_MULTIPLIER); + let state = Global { - subgraph: Map::with_capacity(initial_cap), - todo: conflicted_set - .iter() - .map(Deref::deref) - .cloned() - .enumerate() - .map(Local::new) - .filter_map(Local::pop) - .map(|(local, event_id)| local.push(fetch, Some(event_id))) - .collect(), + subgraph: Map::with_capacity(initial_capacity), + todo: Todo::<_>::new(), }; - unfold(state, |mut state| async { + let inputs = conflicted_set + .iter() + .map(Deref::deref) + .cloned() + .enumerate() + .map(Local::new) + .filter_map(Local::pop) + .map(|(local, event_id)| local.push(fetch, Some(event_id))); + + unfold((inputs, state), async |(mut inputs, mut state)| { + debug_assert!( + state.todo.len() <= automatic_width(), + "Excessive items todo in FuturesUnordered" + ); + + while state.todo.len() < automatic_width() + && let Some(input) = inputs.next() + { + state.todo.push(input); + } + let outputs = state .todo .next() @@ -93,7 +112,7 @@ where .flatten() .stream(); - Some((outputs, state)) + Some((outputs, (inputs, state))) }) .flatten() }