From 42570a5a7c459f4d5c65595e217013f8491119cf Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 26 Feb 2026 12:08:50 +0000 Subject: [PATCH] Optimize sequential auth_chain chasing in power_sort. Signed-off-by: Jason Volk --- .../rooms/state_res/resolve/power_sort.rs | 128 +++++++++--------- 1 file changed, 63 insertions(+), 65 deletions(-) diff --git a/src/service/rooms/state_res/resolve/power_sort.rs b/src/service/rooms/state_res/resolve/power_sort.rs index 4d92431f..5c7b0a2e 100644 --- a/src/service/rooms/state_res/resolve/power_sort.rs +++ b/src/service/rooms/state_res/resolve/power_sort.rs @@ -1,9 +1,10 @@ use std::{ borrow::Borrow, collections::{HashMap, HashSet}, + iter::once, }; -use futures::{StreamExt, TryFutureExt, TryStreamExt}; +use futures::{StreamExt, TryFutureExt, TryStreamExt, stream::FuturesUnordered}; use ruma::{ EventId, OwnedEventId, events::{TimelineEventType, room::power_levels::UserPowerLevel}, @@ -64,9 +65,14 @@ where let graph = full_conflicted_set .iter() .stream() - .broad_filter_map(async |id| is_power_event_id(id, fetch).await.then_some(id)) - .fold(HashMap::new(), |graph, event_id| { - add_event_auth_chain(graph, full_conflicted_set, event_id, fetch) + .broad_filter_map(async |id| { + is_power_event_id(id, fetch) + .await + .then(|| id.clone()) + }) + .enumerate() + .fold(HashMap::new(), |graph, (i, event_id)| { + add_event_auth_chain(full_conflicted_set, graph, event_id, fetch, i) }) .await; @@ -103,58 +109,50 @@ where level = "trace", skip_all, fields( - ?event_id, graph = graph.len(), + ?event_id, + %i, ) )] async fn add_event_auth_chain( - mut graph: HashMap, full_conflicted_set: &HashSet, - event_id: &EventId, + mut graph: HashMap, + event_id: OwnedEventId, fetch: &Fetch, + i: usize, ) -> HashMap where Fetch: Fn(OwnedEventId) -> Fut + Sync, Fut: Future> + Send, Pdu: Event, { - let mut state = vec![event_id.to_owned()]; + let mut todo: FuturesUnordered = once(fetch(event_id)).collect(); - // Iterate through the auth chain of the event. - while let Some(event_id) = state.pop() { - // Iterate through the auth events of this event. - let event = fetch(event_id.clone()).await.ok(); + while let Some(event) = todo.next().await { + let Ok(event) = event else { + continue; + }; - // Add the current event to the graph. - graph.entry(event_id).or_default(); + let event_id = event.event_id().to_owned(); + graph.entry(event_id.clone()).or_default(); - event - .as_ref() - .map(Event::auth_events) + for auth_event_id in event + .auth_events_into() .into_iter() - .flatten() - .map(ToOwned::to_owned) .filter(|auth_event_id| full_conflicted_set.contains(auth_event_id)) - .for_each(|auth_event_id| { - // If the auth event ID is not in the graph, check its auth events later. - if !graph.contains_key(&auth_event_id) { - state.push(auth_event_id.clone()); - } + { + if !graph.contains_key(&auth_event_id) { + todo.push(fetch(auth_event_id.clone())); + } - let event_id = event - .as_ref() - .expect("event is Some if there are auth_events") - .event_id(); + let references = graph + .get_mut(&event_id) + .expect("event_id present in graph"); - // Add the auth event ID to the list of incoming edges. - let references = graph - .get_mut(event_id) - .expect("event_id must be added to graph"); - - if !references.contains(&auth_event_id) { - references.push(auth_event_id); - } - }); + if !references.contains(&auth_event_id) { + references.push(auth_event_id); + } + } } graph @@ -199,17 +197,18 @@ where Fut: Future> + Send, Pdu: Event, { - let mut room_create_event = None; - let mut room_power_levels_event = None; - let event = fetch(event_id.to_owned()).await; - if let Ok(event) = &event - && rules - .authorization - .room_create_event_id_as_room_id - { + let event = fetch(event_id.into()).await; + let hydra_room_id = rules + .authorization + .room_create_event_id_as_room_id; + + let mut create_event = None; + let mut power_levels_event = None; + if hydra_room_id && let Ok(event) = event.as_ref() { let create_id = event.room_id().as_event_id()?; let fetched = fetch(create_id).await?; - room_create_event = Some(RoomCreateEvent::new(fetched)); + + _ = create_event.insert(RoomCreateEvent::new(fetched)); } for auth_event_id in event @@ -218,33 +217,32 @@ where .into_iter() .flatten() { - if let Ok(auth_event) = fetch(auth_event_id.to_owned()).await { - if auth_event.is_type_and_state_key(&TimelineEventType::RoomPowerLevels, "") { - room_power_levels_event = Some(RoomPowerLevelsEvent::new(auth_event)); - } else if !rules - .authorization - .room_create_event_id_as_room_id - && auth_event.is_type_and_state_key(&TimelineEventType::RoomCreate, "") - { - room_create_event = Some(RoomCreateEvent::new(auth_event)); - } + use TimelineEventType::{RoomCreate, RoomPowerLevels}; - if room_power_levels_event.is_some() && room_create_event.is_some() { - break; - } + let Ok(auth_event) = fetch(auth_event_id.to_owned()).await else { + continue; + }; + + if !hydra_room_id && auth_event.is_type_and_state_key(&RoomCreate, "") { + _ = create_event.get_or_insert_with(|| RoomCreateEvent::new(auth_event)); + } else if auth_event.is_type_and_state_key(&RoomPowerLevels, "") { + _ = power_levels_event.get_or_insert_with(|| RoomPowerLevelsEvent::new(auth_event)); + } + + if power_levels_event.is_some() && create_event.is_some() { + break; } } - let auth_rules = &rules.authorization; - let creators = room_create_event + let creators = create_event .as_ref() - .and_then(|event| event.creators(auth_rules).ok()); + .and_then(|event| event.creators(&rules.authorization).ok()); if let Some((event, creators)) = event.ok().zip(creators) { - room_power_levels_event.user_power_level(event.sender(), creators, auth_rules) + power_levels_event.user_power_level(event.sender(), creators, &rules.authorization) } else { - room_power_levels_event - .get_as_int_or_default(RoomPowerLevelsIntField::UsersDefault, auth_rules) + power_levels_event + .get_as_int_or_default(RoomPowerLevelsIntField::UsersDefault, &rules.authorization) .map(Into::into) } }