diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 951ab520..34efd80b 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -8,6 +8,7 @@ use ruma::{ use tuwunel_core::{ Result, debug_warn, err, implement, matrix::{Event, PduEvent}, + utils::stream::IterStream, }; use super::check_room_id; @@ -32,14 +33,27 @@ where Events: Iterator + Send, { let mut todo_outlier_stack: FuturesOrdered<_> = initial_set + .stream() .map(ToOwned::to_owned) + .filter_map(async |event_id| { + self.services + .timeline + .non_outlier_pdu_exists(&event_id) + .await + .is_err() + .then_some(event_id) + }) .map(async |event_id| { - let fetch = self.fetch_auth(origin, room_id, once(event_id.as_ref()), room_version); + let events = once(event_id.as_ref()); + let auth = self + .fetch_auth(origin, room_id, events, room_version) + .await; - (event_id.clone(), fetch.await) + (event_id, auth) }) .map(FutureExt::boxed) - .collect(); + .collect() + .await; let mut amount = 0; let mut eventid_info = HashMap::new(); @@ -78,21 +92,20 @@ where if pdu.origin_server_ts() > first_ts_in_room { amount = amount.saturating_add(1); for prev_prev in pdu.prev_events() { - if !graph.contains_key(prev_prev) { - let prev_prev = prev_prev.to_owned(); - let fetch = async move { - let fetch = self.fetch_auth( - origin, - room_id, - once(prev_prev.as_ref()), - room_version, - ); - - (prev_prev.clone(), fetch.await) - }; - - todo_outlier_stack.push_back(fetch.boxed()); + if graph.contains_key(prev_prev) { + continue; } + + let prev_prev = prev_prev.to_owned(); + let fetch = async move { + let fetch = self + .fetch_auth(origin, room_id, once(prev_prev.as_ref()), room_version) + .await; + + (prev_prev, fetch) + }; + + todo_outlier_stack.push_back(fetch.boxed()); } graph.insert(