Optimize sequential auth_chain chasing in power_sort.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2026-02-26 12:08:50 +00:00
parent 9ede830ffe
commit 42570a5a7c

View File

@@ -1,9 +1,10 @@
use std::{
borrow::Borrow,
collections::{HashMap, HashSet},
iter::once,
};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use futures::{StreamExt, TryFutureExt, TryStreamExt, stream::FuturesUnordered};
use ruma::{
EventId, OwnedEventId,
events::{TimelineEventType, room::power_levels::UserPowerLevel},
@@ -64,9 +65,14 @@ where
let graph = full_conflicted_set
.iter()
.stream()
.broad_filter_map(async |id| is_power_event_id(id, fetch).await.then_some(id))
.fold(HashMap::new(), |graph, event_id| {
add_event_auth_chain(graph, full_conflicted_set, event_id, fetch)
.broad_filter_map(async |id| {
is_power_event_id(id, fetch)
.await
.then(|| id.clone())
})
.enumerate()
.fold(HashMap::new(), |graph, (i, event_id)| {
add_event_auth_chain(full_conflicted_set, graph, event_id, fetch, i)
})
.await;
@@ -103,58 +109,50 @@ where
level = "trace",
skip_all,
fields(
?event_id,
graph = graph.len(),
?event_id,
%i,
)
)]
async fn add_event_auth_chain<Fetch, Fut, Pdu>(
mut graph: HashMap<OwnedEventId, ReferencedIds>,
full_conflicted_set: &HashSet<OwnedEventId>,
event_id: &EventId,
mut graph: HashMap<OwnedEventId, ReferencedIds>,
event_id: OwnedEventId,
fetch: &Fetch,
i: usize,
) -> HashMap<OwnedEventId, ReferencedIds>
where
Fetch: Fn(OwnedEventId) -> Fut + Sync,
Fut: Future<Output = Result<Pdu>> + Send,
Pdu: Event,
{
let mut state = vec![event_id.to_owned()];
let mut todo: FuturesUnordered<Fut> = once(fetch(event_id)).collect();
// Iterate through the auth chain of the event.
while let Some(event_id) = state.pop() {
// Iterate through the auth events of this event.
let event = fetch(event_id.clone()).await.ok();
while let Some(event) = todo.next().await {
let Ok(event) = event else {
continue;
};
// Add the current event to the graph.
graph.entry(event_id).or_default();
let event_id = event.event_id().to_owned();
graph.entry(event_id.clone()).or_default();
event
.as_ref()
.map(Event::auth_events)
for auth_event_id in event
.auth_events_into()
.into_iter()
.flatten()
.map(ToOwned::to_owned)
.filter(|auth_event_id| full_conflicted_set.contains(auth_event_id))
.for_each(|auth_event_id| {
// If the auth event ID is not in the graph, check its auth events later.
if !graph.contains_key(&auth_event_id) {
state.push(auth_event_id.clone());
}
{
if !graph.contains_key(&auth_event_id) {
todo.push(fetch(auth_event_id.clone()));
}
let event_id = event
.as_ref()
.expect("event is Some if there are auth_events")
.event_id();
let references = graph
.get_mut(&event_id)
.expect("event_id present in graph");
// Add the auth event ID to the list of incoming edges.
let references = graph
.get_mut(event_id)
.expect("event_id must be added to graph");
if !references.contains(&auth_event_id) {
references.push(auth_event_id);
}
});
if !references.contains(&auth_event_id) {
references.push(auth_event_id);
}
}
}
graph
@@ -199,17 +197,18 @@ where
Fut: Future<Output = Result<Pdu>> + Send,
Pdu: Event,
{
let mut room_create_event = None;
let mut room_power_levels_event = None;
let event = fetch(event_id.to_owned()).await;
if let Ok(event) = &event
&& rules
.authorization
.room_create_event_id_as_room_id
{
let event = fetch(event_id.into()).await;
let hydra_room_id = rules
.authorization
.room_create_event_id_as_room_id;
let mut create_event = None;
let mut power_levels_event = None;
if hydra_room_id && let Ok(event) = event.as_ref() {
let create_id = event.room_id().as_event_id()?;
let fetched = fetch(create_id).await?;
room_create_event = Some(RoomCreateEvent::new(fetched));
_ = create_event.insert(RoomCreateEvent::new(fetched));
}
for auth_event_id in event
@@ -218,33 +217,32 @@ where
.into_iter()
.flatten()
{
if let Ok(auth_event) = fetch(auth_event_id.to_owned()).await {
if auth_event.is_type_and_state_key(&TimelineEventType::RoomPowerLevels, "") {
room_power_levels_event = Some(RoomPowerLevelsEvent::new(auth_event));
} else if !rules
.authorization
.room_create_event_id_as_room_id
&& auth_event.is_type_and_state_key(&TimelineEventType::RoomCreate, "")
{
room_create_event = Some(RoomCreateEvent::new(auth_event));
}
use TimelineEventType::{RoomCreate, RoomPowerLevels};
if room_power_levels_event.is_some() && room_create_event.is_some() {
break;
}
let Ok(auth_event) = fetch(auth_event_id.to_owned()).await else {
continue;
};
if !hydra_room_id && auth_event.is_type_and_state_key(&RoomCreate, "") {
_ = create_event.get_or_insert_with(|| RoomCreateEvent::new(auth_event));
} else if auth_event.is_type_and_state_key(&RoomPowerLevels, "") {
_ = power_levels_event.get_or_insert_with(|| RoomPowerLevelsEvent::new(auth_event));
}
if power_levels_event.is_some() && create_event.is_some() {
break;
}
}
let auth_rules = &rules.authorization;
let creators = room_create_event
let creators = create_event
.as_ref()
.and_then(|event| event.creators(auth_rules).ok());
.and_then(|event| event.creators(&rules.authorization).ok());
if let Some((event, creators)) = event.ok().zip(creators) {
room_power_levels_event.user_power_level(event.sender(), creators, auth_rules)
power_levels_event.user_power_level(event.sender(), creators, &rules.authorization)
} else {
room_power_levels_event
.get_as_int_or_default(RoomPowerLevelsIntField::UsersDefault, auth_rules)
power_levels_event
.get_as_int_or_default(RoomPowerLevelsIntField::UsersDefault, &rules.authorization)
.map(Into::into)
}
}