From a0dc37e0243baf59d056af5c9db1e9676d361882 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 2 Aug 2025 09:32:21 +0000 Subject: [PATCH] Fetch prev_events concurrently. Signed-off-by: Jason Volk --- src/service/rooms/event_handler/fetch_auth.rs | 2 +- src/service/rooms/event_handler/fetch_prev.rs | 136 ++++++++++-------- 2 files changed, 74 insertions(+), 64 deletions(-) diff --git a/src/service/rooms/event_handler/fetch_auth.rs b/src/service/rooms/event_handler/fetch_auth.rs index 045061b0..4930d71c 100644 --- a/src/service/rooms/event_handler/fetch_auth.rs +++ b/src/service/rooms/event_handler/fetch_auth.rs @@ -35,7 +35,7 @@ pub(super) async fn fetch_auth<'a, Events>( room_version: &RoomVersionId, ) -> Vec<(PduEvent, Option)> where - Events: Iterator + Clone + Send, + Events: Iterator + Send, { let events_with_auth_events: Vec<_> = events .stream() diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 57574826..6b5426cb 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -1,9 +1,9 @@ use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet}, iter::once, }; -use futures::FutureExt; +use futures::{FutureExt, StreamExt, stream::FuturesOrdered}; use ruma::{ CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, RoomVersionId, ServerName, int, uint, @@ -32,73 +32,83 @@ pub(super) async fn fetch_prev<'a, Events>( first_ts_in_room: MilliSecondsSinceUnixEpoch, ) -> Result<(Vec, HashMap)> where - Events: Iterator + Clone + Send, + Events: Iterator + Send, { - let num_ids = initial_set.clone().count(); - let mut eventid_info = HashMap::new(); - let mut graph: HashMap = HashMap::with_capacity(num_ids); - let mut todo_outlier_stack: VecDeque = - initial_set.map(ToOwned::to_owned).collect(); + let mut todo_outlier_stack: FuturesOrdered<_> = initial_set + .map(ToOwned::to_owned) + .map(async |event_id| { + let fetch = self.fetch_auth(origin, room_id, once(event_id.as_ref()), room_version); + + (event_id.clone(), fetch.await) + }) + .map(FutureExt::boxed) + .collect(); let mut amount = 0; + let mut eventid_info = HashMap::new(); + let mut graph: HashMap = HashMap::with_capacity(todo_outlier_stack.len()); + while let Some((prev_event_id, mut outlier)) = todo_outlier_stack.next().await { + let Some((pdu, mut json_opt)) = outlier.pop() else { + // Fetch and handle failed + graph.insert(prev_event_id.clone(), HashSet::new()); + continue; + }; - while let Some(prev_event_id) = todo_outlier_stack.pop_front() { - self.services.server.check_running()?; + check_room_id(room_id, &pdu)?; - match self - .fetch_auth(origin, room_id, once(prev_event_id.as_ref()), room_version) - .boxed() - .await - .pop() - { - | Some((pdu, mut json_opt)) => { - check_room_id(room_id, &pdu)?; - - let limit = self.services.server.config.max_fetch_prev_events; - if amount > limit { - debug_warn!(?limit, "Max prev event limit reached!"); - graph.insert(prev_event_id.clone(), HashSet::new()); - continue; - } - - if json_opt.is_none() { - json_opt = self - .services - .timeline - .get_outlier_pdu_json(&prev_event_id) - .await - .ok(); - } - - if let Some(json) = json_opt { - if pdu.origin_server_ts() > first_ts_in_room { - amount = amount.saturating_add(1); - for prev_prev in pdu.prev_events() { - if !graph.contains_key(prev_prev) { - todo_outlier_stack.push_back(prev_prev.to_owned()); - } - } - - graph.insert( - prev_event_id.clone(), - pdu.prev_events().map(ToOwned::to_owned).collect(), - ); - } else { - // Time based check failed - graph.insert(prev_event_id.clone(), HashSet::new()); - } - - eventid_info.insert(prev_event_id.clone(), (pdu, json)); - } else { - // Get json failed, so this was not fetched over federation - graph.insert(prev_event_id.clone(), HashSet::new()); - } - }, - | _ => { - // Fetch and handle failed - graph.insert(prev_event_id.clone(), HashSet::new()); - }, + let limit = self.services.server.config.max_fetch_prev_events; + if amount > limit { + debug_warn!(?limit, "Max prev event limit reached!"); + graph.insert(prev_event_id.clone(), HashSet::new()); + continue; } + + if json_opt.is_none() { + json_opt = self + .services + .timeline + .get_outlier_pdu_json(&prev_event_id) + .await + .ok(); + } + + let Some(json) = json_opt else { + // Get json failed, so this was not fetched over federation + graph.insert(prev_event_id.clone(), HashSet::new()); + continue; + }; + + if pdu.origin_server_ts() > first_ts_in_room { + amount = amount.saturating_add(1); + for prev_prev in pdu.prev_events() { + if !graph.contains_key(prev_prev) { + let prev_prev = prev_prev.to_owned(); + let fetch = async move { + let fetch = self.fetch_auth( + origin, + room_id, + once(prev_prev.as_ref()), + room_version, + ); + + (prev_prev.clone(), fetch.await) + }; + + todo_outlier_stack.push_back(fetch.boxed()); + } + } + + graph.insert( + prev_event_id.clone(), + pdu.prev_events().map(ToOwned::to_owned).collect(), + ); + } else { + // Time based check failed + graph.insert(prev_event_id.clone(), HashSet::new()); + } + + eventid_info.insert(prev_event_id.clone(), (pdu, json)); + self.services.server.check_running()?; } let event_fetch = async |event_id: OwnedEventId| {