Fetch outlier and auth chains concurrently.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-02 07:39:38 +00:00
parent bc898efcce
commit 003257693b

View File

@@ -4,6 +4,7 @@ use std::{
time::Duration, time::Duration,
}; };
use futures::{FutureExt, StreamExt};
use ruma::{ use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, RoomVersionId, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, RoomVersionId,
ServerName, api::federation::event::get_event, ServerName, api::federation::event::get_event,
@@ -11,7 +12,9 @@ use ruma::{
use tuwunel_core::{ use tuwunel_core::{
debug, debug_error, debug_warn, implement, debug, debug_error, debug_warn, implement,
matrix::{PduEvent, event::gen_event_id_canonical_json}, matrix::{PduEvent, event::gen_event_id_canonical_json},
trace, warn, trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt},
warn,
}; };
/// Find the event and auth it. Once the event is validated (steps 1 - 8) /// Find the event and auth it. Once the event is validated (steps 1 - 8)
@@ -34,58 +37,62 @@ pub(super) async fn fetch_auth<'a, Events>(
where where
Events: Iterator<Item = &'a EventId> + Clone + Send, Events: Iterator<Item = &'a EventId> + Clone + Send,
{ {
let mut events_with_auth_events = Vec::with_capacity(events.clone().count()); let events_with_auth_events: Vec<_> = events
.stream()
.broad_then(|event_id| self.fetch_auth_chain(origin, room_id, event_id, room_version))
.collect()
.boxed()
.await;
for event_id in events { events_with_auth_events
let outlier = self .into_iter()
.fetch_auth_chain(origin, room_id, event_id, room_version) .stream()
.await; .fold(Vec::new(), async |mut pdus, (id, local_pdu, events_in_reverse_order)| {
// a. Look in the main timeline (pduid_pdu tree)
events_with_auth_events.push(outlier); // b. Look at outlier pdu tree
} // (get_pdu_json checks both)
if let Some(local_pdu) = local_pdu {
let mut pdus = Vec::with_capacity(events_with_auth_events.len()); pdus.push((local_pdu, None));
for (id, local_pdu, events_in_reverse_order) in events_with_auth_events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Some(local_pdu) = local_pdu {
trace!("Found {id} in db");
pdus.push((local_pdu.clone(), None));
}
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
if self.is_backed_off(&next_id, Range {
start: Duration::from_secs(5 * 60),
end: Duration::from_secs(60 * 60 * 24),
}) {
debug_warn!("Backing off from {next_id}");
continue;
} }
match Box::pin(self.handle_outlier_pdu( events_in_reverse_order
origin, .into_iter()
room_id, .rev()
&next_id, .stream()
value.clone(), .ready_filter(|(next_id, _)| {
room_version, let backed_off = self.is_backed_off(next_id, Range {
true, start: Duration::from_secs(5 * 60),
)) end: Duration::from_secs(60 * 60 * 24),
.await });
{
| Ok((pdu, json)) =>
if next_id == *id {
pdus.push((pdu, Some(json)));
},
| Err(e) => {
warn!("Authentication of event {next_id} failed: {e:?}");
self.back_off(&next_id);
},
}
}
}
pdus !backed_off
})
.fold(pdus, async |mut pdus, (next_id, value)| {
let outlier = Box::pin(self.handle_outlier_pdu(
origin,
room_id,
&next_id,
value.clone(),
room_version,
true,
));
if let Ok((pdu, json)) = outlier
.await
.inspect_err(|e| warn!("Authentication of event {next_id} failed: {e:?}"))
{
if next_id == id {
pdus.push((pdu, Some(json)));
}
} else {
self.back_off(&next_id);
}
pdus
})
.await
})
.await
} }
#[implement(super::Service)] #[implement(super::Service)]
@@ -100,6 +107,7 @@ async fn fetch_auth_chain(
// b. Look at outlier pdu tree // b. Look at outlier pdu tree
// (get_pdu_json checks both) // (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(event_id).await { if let Ok(local_pdu) = self.services.timeline.get_pdu(event_id).await {
trace!(?event_id, "Found in database");
return (event_id.to_owned(), Some(local_pdu), vec![]); return (event_id.to_owned(), Some(local_pdu), vec![]);
} }
@@ -111,6 +119,10 @@ async fn fetch_auth_chain(
let mut events_all = HashSet::with_capacity(todo_auth_events.len()); let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() { while let Some(next_id) = todo_auth_events.pop_front() {
if events_all.contains(&next_id) {
continue;
}
if self.is_backed_off(&next_id, Range { if self.is_backed_off(&next_id, Range {
start: Duration::from_secs(2 * 60), start: Duration::from_secs(2 * 60),
end: Duration::from_secs(60 * 60 * 8), end: Duration::from_secs(60 * 60 * 8),
@@ -119,68 +131,62 @@ async fn fetch_auth_chain(
continue; continue;
} }
if events_all.contains(&next_id) {
continue;
}
if self.services.timeline.pdu_exists(&next_id).await { if self.services.timeline.pdu_exists(&next_id).await {
trace!("Found {next_id} in db"); trace!(?next_id, "Found in database");
continue; continue;
} }
debug!("Fetching {next_id} over federation."); debug!("Fetching {next_id} over federation.");
match self let Ok(res) = self
.services .services
.sending .sending
.send_federation_request(origin, get_event::v1::Request { .send_federation_request(origin, get_event::v1::Request {
event_id: (*next_id).to_owned(), event_id: next_id.clone(),
include_unredacted_content: None, include_unredacted_content: None,
}) })
.await .await
{ .inspect_err(|e| debug_error!("Failed to fetch event {next_id}: {e}"))
| Ok(res) => { else {
debug!("Got {next_id} over federation"); self.back_off(&next_id);
let Ok((calculated_event_id, value)) = continue;
gen_event_id_canonical_json(&res.pdu, room_version) };
else {
self.back_off(&next_id);
continue;
};
if calculated_event_id != *next_id { debug!("Got {next_id} over federation");
warn!( let Ok((calculated_event_id, value)) =
"Server didn't return event id we requested: requested: {next_id}, we \ gen_event_id_canonical_json(&res.pdu, room_version)
got {calculated_event_id}. Event: {:?}", else {
&res.pdu self.back_off(&next_id);
); continue;
} };
if let Some(auth_events) = value if calculated_event_id != next_id {
.get("auth_events") warn!(
.and_then(CanonicalJsonValue::as_array) "Server didn't return event id we requested: requested: {next_id}, we got \
{ {calculated_event_id}. Event: {:?}",
for auth_event in auth_events { &res.pdu
match serde_json::from_value::<OwnedEventId>(auth_event.clone().into()) { );
| Ok(auth_event) => {
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
debug_error!("Failed to fetch event {next_id}: {e}");
self.back_off(&next_id);
},
} }
if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(auth_event.clone().into()) {
| Ok(auth_event) => {
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
} }
(event_id.to_owned(), None, events_in_reverse_order) (event_id.to_owned(), None, events_in_reverse_order)