From 003257693bae10bb7952ead8b146eafebf218476 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 2 Aug 2025 07:39:38 +0000 Subject: [PATCH] Fetch outlier and auth chains concurrently. Signed-off-by: Jason Volk --- src/service/rooms/event_handler/fetch_auth.rs | 200 +++++++++--------- 1 file changed, 103 insertions(+), 97 deletions(-) diff --git a/src/service/rooms/event_handler/fetch_auth.rs b/src/service/rooms/event_handler/fetch_auth.rs index 1fdb2b49..045061b0 100644 --- a/src/service/rooms/event_handler/fetch_auth.rs +++ b/src/service/rooms/event_handler/fetch_auth.rs @@ -4,6 +4,7 @@ use std::{ time::Duration, }; +use futures::{FutureExt, StreamExt}; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, RoomVersionId, ServerName, api::federation::event::get_event, @@ -11,7 +12,9 @@ use ruma::{ use tuwunel_core::{ debug, debug_error, debug_warn, implement, matrix::{PduEvent, event::gen_event_id_canonical_json}, - trace, warn, + trace, + utils::stream::{BroadbandExt, IterStream, ReadyExt}, + warn, }; /// Find the event and auth it. Once the event is validated (steps 1 - 8) @@ -34,58 +37,62 @@ pub(super) async fn fetch_auth<'a, Events>( where Events: Iterator + Clone + Send, { - let mut events_with_auth_events = Vec::with_capacity(events.clone().count()); + let events_with_auth_events: Vec<_> = events + .stream() + .broad_then(|event_id| self.fetch_auth_chain(origin, room_id, event_id, room_version)) + .collect() + .boxed() + .await; - for event_id in events { - let outlier = self - .fetch_auth_chain(origin, room_id, event_id, room_version) - .await; - - events_with_auth_events.push(outlier); - } - - let mut pdus = Vec::with_capacity(events_with_auth_events.len()); - for (id, local_pdu, events_in_reverse_order) in events_with_auth_events { - // a. Look in the main timeline (pduid_pdu tree) - // b. Look at outlier pdu tree - // (get_pdu_json checks both) - if let Some(local_pdu) = local_pdu { - trace!("Found {id} in db"); - pdus.push((local_pdu.clone(), None)); - } - - for (next_id, value) in events_in_reverse_order.into_iter().rev() { - if self.is_backed_off(&next_id, Range { - start: Duration::from_secs(5 * 60), - end: Duration::from_secs(60 * 60 * 24), - }) { - debug_warn!("Backing off from {next_id}"); - continue; + events_with_auth_events + .into_iter() + .stream() + .fold(Vec::new(), async |mut pdus, (id, local_pdu, events_in_reverse_order)| { + // a. Look in the main timeline (pduid_pdu tree) + // b. Look at outlier pdu tree + // (get_pdu_json checks both) + if let Some(local_pdu) = local_pdu { + pdus.push((local_pdu, None)); } - match Box::pin(self.handle_outlier_pdu( - origin, - room_id, - &next_id, - value.clone(), - room_version, - true, - )) - .await - { - | Ok((pdu, json)) => - if next_id == *id { - pdus.push((pdu, Some(json))); - }, - | Err(e) => { - warn!("Authentication of event {next_id} failed: {e:?}"); - self.back_off(&next_id); - }, - } - } - } + events_in_reverse_order + .into_iter() + .rev() + .stream() + .ready_filter(|(next_id, _)| { + let backed_off = self.is_backed_off(next_id, Range { + start: Duration::from_secs(5 * 60), + end: Duration::from_secs(60 * 60 * 24), + }); - pdus + !backed_off + }) + .fold(pdus, async |mut pdus, (next_id, value)| { + let outlier = Box::pin(self.handle_outlier_pdu( + origin, + room_id, + &next_id, + value.clone(), + room_version, + true, + )); + + if let Ok((pdu, json)) = outlier + .await + .inspect_err(|e| warn!("Authentication of event {next_id} failed: {e:?}")) + { + if next_id == id { + pdus.push((pdu, Some(json))); + } + } else { + self.back_off(&next_id); + } + + pdus + }) + .await + }) + .await } #[implement(super::Service)] @@ -100,6 +107,7 @@ async fn fetch_auth_chain( // b. Look at outlier pdu tree // (get_pdu_json checks both) if let Ok(local_pdu) = self.services.timeline.get_pdu(event_id).await { + trace!(?event_id, "Found in database"); return (event_id.to_owned(), Some(local_pdu), vec![]); } @@ -111,6 +119,10 @@ async fn fetch_auth_chain( let mut events_all = HashSet::with_capacity(todo_auth_events.len()); while let Some(next_id) = todo_auth_events.pop_front() { + if events_all.contains(&next_id) { + continue; + } + if self.is_backed_off(&next_id, Range { start: Duration::from_secs(2 * 60), end: Duration::from_secs(60 * 60 * 8), @@ -119,68 +131,62 @@ async fn fetch_auth_chain( continue; } - if events_all.contains(&next_id) { - continue; - } - if self.services.timeline.pdu_exists(&next_id).await { - trace!("Found {next_id} in db"); + trace!(?next_id, "Found in database"); continue; } debug!("Fetching {next_id} over federation."); - match self + let Ok(res) = self .services .sending .send_federation_request(origin, get_event::v1::Request { - event_id: (*next_id).to_owned(), + event_id: next_id.clone(), include_unredacted_content: None, }) .await - { - | Ok(res) => { - debug!("Got {next_id} over federation"); - let Ok((calculated_event_id, value)) = - gen_event_id_canonical_json(&res.pdu, room_version) - else { - self.back_off(&next_id); - continue; - }; + .inspect_err(|e| debug_error!("Failed to fetch event {next_id}: {e}")) + else { + self.back_off(&next_id); + continue; + }; - if calculated_event_id != *next_id { - warn!( - "Server didn't return event id we requested: requested: {next_id}, we \ - got {calculated_event_id}. Event: {:?}", - &res.pdu - ); - } + debug!("Got {next_id} over federation"); + let Ok((calculated_event_id, value)) = + gen_event_id_canonical_json(&res.pdu, room_version) + else { + self.back_off(&next_id); + continue; + }; - if let Some(auth_events) = value - .get("auth_events") - .and_then(CanonicalJsonValue::as_array) - { - for auth_event in auth_events { - match serde_json::from_value::(auth_event.clone().into()) { - | Ok(auth_event) => { - todo_auth_events.push_back(auth_event); - }, - | _ => { - warn!("Auth event id is not valid"); - }, - } - } - } else { - warn!("Auth event list invalid"); - } - - events_in_reverse_order.push((next_id.clone(), value)); - events_all.insert(next_id); - }, - | Err(e) => { - debug_error!("Failed to fetch event {next_id}: {e}"); - self.back_off(&next_id); - }, + if calculated_event_id != next_id { + warn!( + "Server didn't return event id we requested: requested: {next_id}, we got \ + {calculated_event_id}. Event: {:?}", + &res.pdu + ); } + + if let Some(auth_events) = value + .get("auth_events") + .and_then(CanonicalJsonValue::as_array) + { + for auth_event in auth_events { + match serde_json::from_value::(auth_event.clone().into()) { + | Ok(auth_event) => { + todo_auth_events.push_back(auth_event); + }, + | _ => { + warn!("Auth event id is not valid"); + }, + } + } + } else { + warn!("Auth event list invalid"); + } + + events_in_reverse_order.push((next_id.clone(), value)); + events_all.insert(next_id); } (event_id.to_owned(), None, events_in_reverse_order)