From 6d6c5a3a9b77757470acdda4e79e313dd6602507 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 3 Mar 2026 11:36:06 +0000 Subject: [PATCH] Optimize conflicted-subgraph with single state container. Signed-off-by: Jason Volk --- .../state_res/resolve/conflicted_subgraph.rs | 104 +++++++++++++----- 1 file changed, 74 insertions(+), 30 deletions(-) diff --git a/src/service/rooms/state_res/resolve/conflicted_subgraph.rs b/src/service/rooms/state_res/resolve/conflicted_subgraph.rs index 1d39b7a3..e47050a1 100644 --- a/src/service/rooms/state_res/resolve/conflicted_subgraph.rs +++ b/src/service/rooms/state_res/resolve/conflicted_subgraph.rs @@ -1,4 +1,8 @@ -use std::{collections::HashSet as Set, iter::once, ops::Deref}; +use std::{ + collections::{HashMap as Map, hash_map::Entry}, + iter::once, + ops::Deref, +}; use futures::{ Future, Stream, StreamExt, @@ -9,29 +13,36 @@ use tuwunel_core::{ Result, implement, matrix::{Event, pdu::AuthEvents}, smallvec::SmallVec, - utils::stream::IterStream, + utils::{BoolExt, stream::IterStream}, }; #[derive(Default)] struct Global { - subgraph: Set, - seen: Set, + subgraph: Subgraph, todo: Todo, } #[derive(Default, Debug)] struct Local { + id: usize, path: Path, stack: Stack, } +#[derive(Default, Debug)] +struct Substate { + subgraph: bool, + seen: bool, +} + type Todo = FuturesUnordered; +type Subgraph = Map; type Path = SmallVec<[OwnedEventId; PATH_INLINE]>; type Stack = SmallVec<[Frame; STACK_INLINE]>; type Frame = AuthEvents; -const PATH_INLINE: usize = 16; -const STACK_INLINE: usize = 16; +const PATH_INLINE: usize = 32; +const STACK_INLINE: usize = 32; #[tracing::instrument( name = "subgraph_dfs", @@ -50,13 +61,14 @@ where Fut: Future> + Send, Pdu: Event, { + let initial_cap = conflicted_set.len().saturating_mul(3); let state = Global { - subgraph: Default::default(), - seen: Default::default(), + subgraph: Map::with_capacity(initial_cap), todo: conflicted_set .iter() .map(Deref::deref) .cloned() + .enumerate() .map(Local::new) .filter_map(Local::pop) .map(|(local, event_id)| local.push(fetch, Some(event_id))) @@ -92,11 +104,18 @@ where level = "trace", skip_all, fields( + subgraph = ?state + .subgraph + .values() + .fold((0_u64, 0_u64), |(a, b), v| { + (a.saturating_add(u64::from(v.subgraph)), b.saturating_add(u64::from(v.seen))) + }), + ?event_id, + id = self.id, path = self.path.len(), - stack = self.stack.len(), - subgraph = state.subgraph.len(), - seen = state.seen.len(), + stack = self.stack.iter().flatten().count(), + frames = self.stack.len(), ) )] fn eval( @@ -105,42 +124,65 @@ fn eval( conflicted_event_ids: &Vec<&OwnedEventId>, event_id: OwnedEventId, ) -> (Self, Option, Path) { - let Global { subgraph, seen, .. } = state; + let Global { subgraph, .. } = state; - let insert_path = |global: &mut Global, local: &Local| { + let insert_path_filter = |subgraph: &mut Subgraph, event_id: &OwnedEventId| match subgraph + .entry(event_id.clone()) + { + | Entry::Occupied(state) if state.get().subgraph => false, + | Entry::Occupied(mut state) => { + state.get_mut().subgraph = true; + state.get().subgraph + }, + | Entry::Vacant(state) => + state + .insert(Substate { subgraph: true, seen: false }) + .subgraph, + }; + + let insert_path = |subgraph: &mut Subgraph, local: &Local| { local .path .iter() - .filter(|&event_id| global.subgraph.insert(event_id.clone())) + .filter(|&event_id| insert_path_filter(subgraph, event_id)) .cloned() .collect() }; - if subgraph.contains(&event_id) { - if self.path.len() <= 1 { - self.path.pop(); - return (self, None, Path::new()); - } + let is_conflicted = |event_id: &OwnedEventId| { + conflicted_event_ids + .binary_search(&event_id) + .is_ok() + }; + + let mut entry = subgraph.entry(event_id.clone()); + + if let Entry::Occupied(state) = &entry + && state.get().subgraph + { + let path = (self.path.len() > 1) + .then(|| insert_path(subgraph, &self)) + .unwrap_or_default(); - let path = insert_path(state, &self); self.path.pop(); return (self, None, path); } - if !seen.insert(event_id.clone()) { + if let Entry::Occupied(state) = &mut entry { + state.get_mut().seen = true; return (self, None, Path::new()); } - if self.path.len() > 1 - && conflicted_event_ids - .binary_search(&&event_id) - .is_ok() - { - let path = insert_path(state, &self); - return (self, Some(event_id), path); + if let Entry::Vacant(state) = entry { + state.insert(Substate { subgraph: false, seen: true }); } - (self, Some(event_id), Path::new()) + let path = (self.path.len() > 1) + .and_if(|| is_conflicted(&event_id)) + .then(|| insert_path(subgraph, &self)) + .unwrap_or_default(); + + (self, Some(event_id), path) } #[implement(Local)] @@ -176,9 +218,11 @@ fn pop(mut self) -> Option<(Self, OwnedEventId)> { #[implement(Local)] #[allow(clippy::redundant_clone)] // buggy, nursery -fn new(conflicted_event_id: OwnedEventId) -> Self { +fn new((id, conflicted_event_id): (usize, OwnedEventId)) -> Self { Self { + id, path: once(conflicted_event_id.clone()).collect(), stack: once(once(conflicted_event_id).collect()).collect(), + ..Default::default() } }