Split fetch_outlier; abstract backoff stanzas into fn.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-02 04:59:58 +00:00
parent 4b9b85f671
commit d217927000
7 changed files with 203 additions and 231 deletions

View File

@@ -1,10 +1,11 @@
use std::{
collections::{BTreeMap, HashSet, VecDeque, hash_map},
time::Instant,
collections::{HashSet, VecDeque},
ops::Range,
time::Duration,
};
use ruma::{
CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName,
api::federation::event::get_event,
};
use tuwunel_core::{
@@ -13,9 +14,7 @@ use tuwunel_core::{
PduEvent,
event::{Event, gen_event_id_canonical_json},
},
trace,
utils::continue_exponential_backoff_secs,
warn,
trace, warn,
};
use super::get_room_version_id;
@@ -36,144 +35,19 @@ pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>(
events: Events,
create_event: &'a Pdu,
room_id: &'a RoomId,
) -> Vec<(PduEvent, Option<BTreeMap<String, CanonicalJsonValue>>)>
) -> Vec<(PduEvent, Option<CanonicalJsonObject>)>
where
Pdu: Event,
Events: Iterator<Item = &'a EventId> + Clone + Send,
{
let back_off = |id| match self
.services
.globals
.bad_event_ratelimiter
.write()
.expect("locked")
.entry(id)
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
};
let mut events_with_auth_events = Vec::with_capacity(events.clone().count());
for id in events {
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await {
events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![]));
continue;
}
for event_id in events {
let outlier = self
.fetch_auth(room_id, event_id, origin, create_event)
.await;
// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.expect("locked")
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 60 * 2;
const MAX_DURATION: u64 = 60 * 60 * 8;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug_warn!(
tried = ?*tries,
elapsed = ?time.elapsed(),
"Backing off from {next_id}",
);
continue;
}
}
if events_all.contains(&next_id) {
continue;
}
if self.services.timeline.pdu_exists(&next_id).await {
trace!("Found {next_id} in db");
continue;
}
debug!("Fetching {next_id} over federation.");
match self
.services
.sending
.send_federation_request(origin, get_event::v1::Request {
event_id: (*next_id).to_owned(),
include_unredacted_content: None,
})
.await
{
| Ok(res) => {
debug!("Got {next_id} over federation");
let Ok(room_version_id) = get_room_version_id(create_event) else {
back_off((*next_id).to_owned());
continue;
};
let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_id)
else {
back_off((*next_id).to_owned());
continue;
};
if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: {next_id}, \
we got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}
if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(
auth_event.clone().into(),
) {
| Ok(auth_event) => {
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
debug_error!("Failed to fetch event {next_id}: {e}");
back_off((*next_id).to_owned());
},
}
}
events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order));
events_with_auth_events.push(outlier);
}
let mut pdus = Vec::with_capacity(events_with_auth_events.len());
@@ -187,26 +61,12 @@ where
}
for (next_id, value) in events_in_reverse_order.into_iter().rev() {
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.expect("locked")
.get(&*next_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(
MIN_DURATION,
MAX_DURATION,
time.elapsed(),
*tries,
) {
debug!("Backing off from {next_id}");
continue;
}
if self.is_backed_off(&next_id, Range {
start: Duration::from_secs(5 * 60),
end: Duration::from_secs(60 * 60 * 24),
}) {
debug_warn!("Backing off from {next_id}");
continue;
}
match Box::pin(self.handle_outlier_pdu(
@@ -225,7 +85,7 @@ where
},
| Err(e) => {
warn!("Authentication of event {next_id} failed: {e:?}");
back_off(next_id);
self.back_off(&next_id);
},
}
}
@@ -233,3 +93,109 @@ where
pdus
}
#[implement(super::Service)]
async fn fetch_auth<'a, Pdu>(
&self,
_room_id: &'a RoomId,
event_id: &'a EventId,
origin: &'a ServerName,
create_event: &'a Pdu,
) -> (OwnedEventId, Option<PduEvent>, Vec<(OwnedEventId, CanonicalJsonObject)>)
where
Pdu: Event,
{
// a. Look in the main timeline (pduid_pdu tree)
// b. Look at outlier pdu tree
// (get_pdu_json checks both)
if let Ok(local_pdu) = self.services.timeline.get_pdu(event_id).await {
return (event_id.to_owned(), Some(local_pdu), vec![]);
}
// c. Ask origin server over federation
// We also handle its auth chain here so we don't get a stack overflow in
// handle_outlier_pdu.
let mut todo_auth_events: VecDeque<_> = [event_id.to_owned()].into();
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
while let Some(next_id) = todo_auth_events.pop_front() {
if self.is_backed_off(&next_id, Range {
start: Duration::from_secs(2 * 60),
end: Duration::from_secs(60 * 60 * 8),
}) {
debug_warn!("Backing off from {next_id}");
continue;
}
if events_all.contains(&next_id) {
continue;
}
if self.services.timeline.pdu_exists(&next_id).await {
trace!("Found {next_id} in db");
continue;
}
debug!("Fetching {next_id} over federation.");
match self
.services
.sending
.send_federation_request(origin, get_event::v1::Request {
event_id: (*next_id).to_owned(),
include_unredacted_content: None,
})
.await
{
| Ok(res) => {
debug!("Got {next_id} over federation");
let Ok(room_version_id) = get_room_version_id(create_event) else {
self.back_off(&next_id);
continue;
};
let Ok((calculated_event_id, value)) =
gen_event_id_canonical_json(&res.pdu, &room_version_id)
else {
self.back_off(&next_id);
continue;
};
if calculated_event_id != *next_id {
warn!(
"Server didn't return event id we requested: requested: {next_id}, we \
got {calculated_event_id}. Event: {:?}",
&res.pdu
);
}
if let Some(auth_events) = value
.get("auth_events")
.and_then(CanonicalJsonValue::as_array)
{
for auth_event in auth_events {
match serde_json::from_value::<OwnedEventId>(auth_event.clone().into()) {
| Ok(auth_event) => {
todo_auth_events.push_back(auth_event);
},
| _ => {
warn!("Auth event id is not valid");
},
}
}
} else {
warn!("Auth event list invalid");
}
events_in_reverse_order.push((next_id.clone(), value));
events_all.insert(next_id);
},
| Err(e) => {
debug_error!("Failed to fetch event {next_id}: {e}");
self.back_off(&next_id);
},
}
}
(event_id.to_owned(), None, events_in_reverse_order)
}

View File

@@ -1,11 +1,11 @@
use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
collections::{HashMap, HashSet, VecDeque},
iter::once,
};
use futures::FutureExt;
use ruma::{
CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName,
int, uint,
};
use tuwunel_core::{
@@ -30,10 +30,7 @@ pub(super) async fn fetch_prev<'a, Pdu, Events>(
room_id: &RoomId,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
initial_set: Events,
) -> Result<(
Vec<OwnedEventId>,
HashMap<OwnedEventId, (PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
)>
) -> Result<(Vec<OwnedEventId>, HashMap<OwnedEventId, (PduEvent, CanonicalJsonObject)>)>
where
Pdu: Event,
Events: Iterator<Item = &'a EventId> + Clone + Send,

View File

@@ -1,13 +1,10 @@
use std::{
collections::{BTreeMap, hash_map},
time::Instant,
};
use std::time::Instant;
use futures::{
FutureExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, try_join5},
};
use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType};
use ruma::{CanonicalJsonObject, EventId, RoomId, ServerName, UserId, events::StateEventType};
use tuwunel_core::{
Err, Result, debug, debug::INFO_SPAN_LEVEL, defer, err, implement, matrix::Event,
utils::stream::IterStream, warn,
@@ -55,7 +52,7 @@ pub async fn handle_incoming_pdu<'a>(
origin: &'a ServerName,
room_id: &'a RoomId,
event_id: &'a EventId,
value: BTreeMap<String, CanonicalJsonValue>,
value: CanonicalJsonObject,
is_timeline_event: bool,
) -> Result<Option<RawPduId>> {
// 1. Skip the PDU if we already have it as a timeline event
@@ -160,22 +157,7 @@ pub async fn handle_incoming_pdu<'a>(
)
.inspect_err(move |e| {
warn!("Prev {prev_id} failed: {e}");
match self
.services
.globals
.bad_event_ratelimiter
.write()
.expect("locked")
.entry(prev_id.into())
{
| hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
| hash_map::Entry::Occupied(mut e) => {
let tries = e.get().1.saturating_add(1);
*e.get_mut() = (Instant::now(), tries);
},
}
self.back_off(prev_id);
})
.map(|_| self.services.server.check_running())
})

View File

@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap, hash_map};
use std::collections::{HashMap, hash_map};
use futures::future::ready;
use ruma::{
@@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>(
room_id: &'a RoomId,
mut value: CanonicalJsonObject,
auth_events_known: bool,
) -> Result<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>
) -> Result<(PduEvent, CanonicalJsonObject)>
where
Pdu: Event,
{

View File

@@ -1,16 +1,17 @@
use std::{collections::BTreeMap, time::Instant};
use std::{
ops::Range,
time::{Duration, Instant},
};
use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
use ruma::{CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName};
use tuwunel_core::{
Err, Result, debug,
debug::INFO_SPAN_LEVEL,
defer, implement,
matrix::{Event, PduEvent},
utils::continue_exponential_backoff_secs,
};
#[implement(super::Service)]
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "prev",
@@ -23,7 +24,7 @@ pub(super) async fn handle_prev_pdu<'a, Pdu>(
origin: &'a ServerName,
event_id: &'a EventId,
room_id: &'a RoomId,
eventid_info: Option<(PduEvent, BTreeMap<String, CanonicalJsonValue>)>,
eventid_info: Option<(PduEvent, CanonicalJsonObject)>,
create_event: &'a Pdu,
first_ts_in_room: MilliSecondsSinceUnixEpoch,
prev_id: &'a EventId,
@@ -39,25 +40,12 @@ where
))));
}
if let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.expect("locked")
.get(prev_id)
{
// Exponential backoff
const MIN_DURATION: u64 = 5 * 60;
const MAX_DURATION: u64 = 60 * 60 * 24;
if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) {
debug!(
?tries,
duration = ?time.elapsed(),
"Backing off from prev_event"
);
return Ok(());
}
if self.is_backed_off(prev_id, Range {
start: Duration::from_secs(5 * 60),
end: Duration::from_secs(60 * 60 * 24),
}) {
debug!(?prev_id, "Backing off from prev_event");
return Ok(());
}
let Some((pdu, json)) = eventid_info else {
@@ -73,7 +61,7 @@ where
self.federation_handletime
.write()
.expect("locked")
.insert(room_id.into(), ((*prev_id).to_owned(), start_time));
.insert(room_id.into(), (prev_id.to_owned(), start_time));
defer! {{
self.federation_handletime

View File

@@ -11,21 +11,22 @@ mod state_at_incoming;
mod upgrade_outlier_pdu;
use std::{
collections::HashMap,
collections::{HashMap, hash_map},
fmt::Write,
ops::Range,
sync::{Arc, RwLock as StdRwLock},
time::Instant,
time::{Duration, Instant},
};
use async_trait::async_trait;
use ruma::{
OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
events::room::create::RoomCreateEventContent,
};
use tuwunel_core::{
Err, Result, RoomVersion, Server,
Err, Result, RoomVersion, Server, implement,
matrix::{Event, PduEvent},
utils::MutexMap,
utils::{MutexMap, continue_exponential_backoff},
};
use crate::{Dep, globals, rooms, sending, server_keys};
@@ -97,20 +98,58 @@ impl crate::Service for Service {
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
async fn event_exists(&self, event_id: OwnedEventId) -> bool {
self.services.timeline.pdu_exists(&event_id).await
}
#[implement(Service)]
fn back_off(&self, event_id: &EventId) {
use hash_map::Entry::{Occupied, Vacant};
async fn event_fetch(&self, event_id: OwnedEventId) -> Option<PduEvent> {
self.services
.timeline
.get_pdu(&event_id)
.await
.ok()
match self
.services
.globals
.bad_event_ratelimiter
.write()
.expect("locked")
.entry(event_id.into())
{
| Vacant(e) => {
e.insert((Instant::now(), 1));
},
| Occupied(mut e) => {
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
},
}
}
#[implement(Service)]
fn is_backed_off(&self, event_id: &EventId, range: Range<Duration>) -> bool {
let Some((time, tries)) = self
.services
.globals
.bad_event_ratelimiter
.read()
.expect("locked")
.get(event_id)
.copied()
else {
return false;
};
continue_exponential_backoff(range.start, range.end, time.elapsed(), tries)
}
#[implement(Service)]
async fn event_exists(&self, event_id: OwnedEventId) -> bool {
self.services.timeline.pdu_exists(&event_id).await
}
#[implement(Service)]
async fn event_fetch(&self, event_id: OwnedEventId) -> Option<PduEvent> {
self.services
.timeline
.get_pdu(&event_id)
.await
.ok()
}
fn check_room_id<Pdu: Event>(room_id: &RoomId, pdu: &Pdu) -> Result {
if pdu.room_id() != room_id {
return Err!(Request(InvalidParam(error!(

View File

@@ -1,7 +1,7 @@
use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant};
use std::{borrow::Borrow, iter::once, sync::Arc, time::Instant};
use futures::{FutureExt, StreamExt, future::ready};
use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType};
use ruma::{CanonicalJsonObject, RoomId, ServerName, events::StateEventType};
use tuwunel_core::{
Err, Result, debug, debug_info, err, implement, is_equal_to,
matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res},
@@ -21,7 +21,7 @@ use crate::rooms::{
pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
&self,
incoming_pdu: PduEvent,
val: BTreeMap<String, CanonicalJsonValue>,
val: CanonicalJsonObject,
create_event: &Pdu,
origin: &ServerName,
room_id: &RoomId,