Fetch prev_events concurrently.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-02 09:32:21 +00:00
parent 003257693b
commit a0dc37e024
2 changed files with 74 additions and 64 deletions

View File

@@ -35,7 +35,7 @@ pub(super) async fn fetch_auth<'a, Events>(
room_version: &RoomVersionId, room_version: &RoomVersionId,
) -> Vec<(PduEvent, Option<CanonicalJsonObject>)> ) -> Vec<(PduEvent, Option<CanonicalJsonObject>)>
where where
Events: Iterator<Item = &'a EventId> + Clone + Send, Events: Iterator<Item = &'a EventId> + Send,
{ {
let events_with_auth_events: Vec<_> = events let events_with_auth_events: Vec<_> = events
.stream() .stream()

View File

@@ -1,9 +1,9 @@
use std::{ use std::{
collections::{HashMap, HashSet, VecDeque}, collections::{HashMap, HashSet},
iter::once, iter::once,
}; };
use futures::FutureExt; use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
use ruma::{ use ruma::{
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId,
RoomVersionId, ServerName, int, uint, RoomVersionId, ServerName, int, uint,
@@ -32,73 +32,83 @@ pub(super) async fn fetch_prev<'a, Events>(
first_ts_in_room: MilliSecondsSinceUnixEpoch, first_ts_in_room: MilliSecondsSinceUnixEpoch,
) -> Result<(Vec<OwnedEventId>, HashMap<OwnedEventId, (PduEvent, CanonicalJsonObject)>)> ) -> Result<(Vec<OwnedEventId>, HashMap<OwnedEventId, (PduEvent, CanonicalJsonObject)>)>
where where
Events: Iterator<Item = &'a EventId> + Clone + Send, Events: Iterator<Item = &'a EventId> + Send,
{ {
let num_ids = initial_set.clone().count(); let mut todo_outlier_stack: FuturesOrdered<_> = initial_set
let mut eventid_info = HashMap::new(); .map(ToOwned::to_owned)
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(num_ids); .map(async |event_id| {
let mut todo_outlier_stack: VecDeque<OwnedEventId> = let fetch = self.fetch_auth(origin, room_id, once(event_id.as_ref()), room_version);
initial_set.map(ToOwned::to_owned).collect();
(event_id.clone(), fetch.await)
})
.map(FutureExt::boxed)
.collect();
let mut amount = 0; let mut amount = 0;
let mut eventid_info = HashMap::new();
let mut graph: HashMap<OwnedEventId, _> = HashMap::with_capacity(todo_outlier_stack.len());
while let Some((prev_event_id, mut outlier)) = todo_outlier_stack.next().await {
let Some((pdu, mut json_opt)) = outlier.pop() else {
// Fetch and handle failed
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
};
while let Some(prev_event_id) = todo_outlier_stack.pop_front() { check_room_id(room_id, &pdu)?;
self.services.server.check_running()?;
match self let limit = self.services.server.config.max_fetch_prev_events;
.fetch_auth(origin, room_id, once(prev_event_id.as_ref()), room_version) if amount > limit {
.boxed() debug_warn!(?limit, "Max prev event limit reached!");
.await graph.insert(prev_event_id.clone(), HashSet::new());
.pop() continue;
{
| Some((pdu, mut json_opt)) => {
check_room_id(room_id, &pdu)?;
let limit = self.services.server.config.max_fetch_prev_events;
if amount > limit {
debug_warn!(?limit, "Max prev event limit reached!");
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
}
if json_opt.is_none() {
json_opt = self
.services
.timeline
.get_outlier_pdu_json(&prev_event_id)
.await
.ok();
}
if let Some(json) = json_opt {
if pdu.origin_server_ts() > first_ts_in_room {
amount = amount.saturating_add(1);
for prev_prev in pdu.prev_events() {
if !graph.contains_key(prev_prev) {
todo_outlier_stack.push_back(prev_prev.to_owned());
}
}
graph.insert(
prev_event_id.clone(),
pdu.prev_events().map(ToOwned::to_owned).collect(),
);
} else {
// Time based check failed
graph.insert(prev_event_id.clone(), HashSet::new());
}
eventid_info.insert(prev_event_id.clone(), (pdu, json));
} else {
// Get json failed, so this was not fetched over federation
graph.insert(prev_event_id.clone(), HashSet::new());
}
},
| _ => {
// Fetch and handle failed
graph.insert(prev_event_id.clone(), HashSet::new());
},
} }
if json_opt.is_none() {
json_opt = self
.services
.timeline
.get_outlier_pdu_json(&prev_event_id)
.await
.ok();
}
let Some(json) = json_opt else {
// Get json failed, so this was not fetched over federation
graph.insert(prev_event_id.clone(), HashSet::new());
continue;
};
if pdu.origin_server_ts() > first_ts_in_room {
amount = amount.saturating_add(1);
for prev_prev in pdu.prev_events() {
if !graph.contains_key(prev_prev) {
let prev_prev = prev_prev.to_owned();
let fetch = async move {
let fetch = self.fetch_auth(
origin,
room_id,
once(prev_prev.as_ref()),
room_version,
);
(prev_prev.clone(), fetch.await)
};
todo_outlier_stack.push_back(fetch.boxed());
}
}
graph.insert(
prev_event_id.clone(),
pdu.prev_events().map(ToOwned::to_owned).collect(),
);
} else {
// Time based check failed
graph.insert(prev_event_id.clone(), HashSet::new());
}
eventid_info.insert(prev_event_id.clone(), (pdu, json));
self.services.server.check_running()?;
} }
let event_fetch = async |event_id: OwnedEventId| { let event_fetch = async |event_id: OwnedEventId| {