diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index f4fe5fb7..3a03a19d 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -1,10 +1,11 @@ use std::{ - collections::{BTreeMap, HashSet, VecDeque, hash_map}, - time::Instant, + collections::{HashSet, VecDeque}, + ops::Range, + time::Duration, }; use ruma::{ - CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, + CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_event, }; use tuwunel_core::{ @@ -13,9 +14,7 @@ use tuwunel_core::{ PduEvent, event::{Event, gen_event_id_canonical_json}, }, - trace, - utils::continue_exponential_backoff_secs, - warn, + trace, warn, }; use super::get_room_version_id; @@ -36,144 +35,19 @@ pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>( events: Events, create_event: &'a Pdu, room_id: &'a RoomId, -) -> Vec<(PduEvent, Option>)> +) -> Vec<(PduEvent, Option)> where Pdu: Event, Events: Iterator + Clone + Send, { - let back_off = |id| match self - .services - .globals - .bad_event_ratelimiter - .write() - .expect("locked") - .entry(id) - { - | hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - }, - | hash_map::Entry::Occupied(mut e) => { - *e.get_mut() = (Instant::now(), e.get().1.saturating_add(1)); - }, - }; - let mut events_with_auth_events = Vec::with_capacity(events.clone().count()); - for id in events { - // a. Look in the main timeline (pduid_pdu tree) - // b. Look at outlier pdu tree - // (get_pdu_json checks both) - if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await { - events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![])); - continue; - } + for event_id in events { + let outlier = self + .fetch_auth(room_id, event_id, origin, create_event) + .await; - // c. Ask origin server over federation - // We also handle its auth chain here so we don't get a stack overflow in - // handle_outlier_pdu. - let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into(); - let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len()); - - let mut events_all = HashSet::with_capacity(todo_auth_events.len()); - while let Some(next_id) = todo_auth_events.pop_front() { - if let Some((time, tries)) = self - .services - .globals - .bad_event_ratelimiter - .read() - .expect("locked") - .get(&*next_id) - { - // Exponential backoff - const MIN_DURATION: u64 = 60 * 2; - const MAX_DURATION: u64 = 60 * 60 * 8; - if continue_exponential_backoff_secs( - MIN_DURATION, - MAX_DURATION, - time.elapsed(), - *tries, - ) { - debug_warn!( - tried = ?*tries, - elapsed = ?time.elapsed(), - "Backing off from {next_id}", - ); - continue; - } - } - - if events_all.contains(&next_id) { - continue; - } - - if self.services.timeline.pdu_exists(&next_id).await { - trace!("Found {next_id} in db"); - continue; - } - - debug!("Fetching {next_id} over federation."); - match self - .services - .sending - .send_federation_request(origin, get_event::v1::Request { - event_id: (*next_id).to_owned(), - include_unredacted_content: None, - }) - .await - { - | Ok(res) => { - debug!("Got {next_id} over federation"); - let Ok(room_version_id) = get_room_version_id(create_event) else { - back_off((*next_id).to_owned()); - continue; - }; - - let Ok((calculated_event_id, value)) = - gen_event_id_canonical_json(&res.pdu, &room_version_id) - else { - back_off((*next_id).to_owned()); - continue; - }; - - if calculated_event_id != *next_id { - warn!( - "Server didn't return event id we requested: requested: {next_id}, \ - we got {calculated_event_id}. Event: {:?}", - &res.pdu - ); - } - - if let Some(auth_events) = value - .get("auth_events") - .and_then(CanonicalJsonValue::as_array) - { - for auth_event in auth_events { - match serde_json::from_value::( - auth_event.clone().into(), - ) { - | Ok(auth_event) => { - todo_auth_events.push_back(auth_event); - }, - | _ => { - warn!("Auth event id is not valid"); - }, - } - } - } else { - warn!("Auth event list invalid"); - } - - events_in_reverse_order.push((next_id.clone(), value)); - events_all.insert(next_id); - }, - | Err(e) => { - debug_error!("Failed to fetch event {next_id}: {e}"); - back_off((*next_id).to_owned()); - }, - } - } - - events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order)); + events_with_auth_events.push(outlier); } let mut pdus = Vec::with_capacity(events_with_auth_events.len()); @@ -187,26 +61,12 @@ where } for (next_id, value) in events_in_reverse_order.into_iter().rev() { - if let Some((time, tries)) = self - .services - .globals - .bad_event_ratelimiter - .read() - .expect("locked") - .get(&*next_id) - { - // Exponential backoff - const MIN_DURATION: u64 = 5 * 60; - const MAX_DURATION: u64 = 60 * 60 * 24; - if continue_exponential_backoff_secs( - MIN_DURATION, - MAX_DURATION, - time.elapsed(), - *tries, - ) { - debug!("Backing off from {next_id}"); - continue; - } + if self.is_backed_off(&next_id, Range { + start: Duration::from_secs(5 * 60), + end: Duration::from_secs(60 * 60 * 24), + }) { + debug_warn!("Backing off from {next_id}"); + continue; } match Box::pin(self.handle_outlier_pdu( @@ -225,7 +85,7 @@ where }, | Err(e) => { warn!("Authentication of event {next_id} failed: {e:?}"); - back_off(next_id); + self.back_off(&next_id); }, } } @@ -233,3 +93,109 @@ where pdus } + +#[implement(super::Service)] +async fn fetch_auth<'a, Pdu>( + &self, + _room_id: &'a RoomId, + event_id: &'a EventId, + origin: &'a ServerName, + create_event: &'a Pdu, +) -> (OwnedEventId, Option, Vec<(OwnedEventId, CanonicalJsonObject)>) +where + Pdu: Event, +{ + // a. Look in the main timeline (pduid_pdu tree) + // b. Look at outlier pdu tree + // (get_pdu_json checks both) + if let Ok(local_pdu) = self.services.timeline.get_pdu(event_id).await { + return (event_id.to_owned(), Some(local_pdu), vec![]); + } + + // c. Ask origin server over federation + // We also handle its auth chain here so we don't get a stack overflow in + // handle_outlier_pdu. + let mut todo_auth_events: VecDeque<_> = [event_id.to_owned()].into(); + let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len()); + + let mut events_all = HashSet::with_capacity(todo_auth_events.len()); + while let Some(next_id) = todo_auth_events.pop_front() { + if self.is_backed_off(&next_id, Range { + start: Duration::from_secs(2 * 60), + end: Duration::from_secs(60 * 60 * 8), + }) { + debug_warn!("Backing off from {next_id}"); + continue; + } + + if events_all.contains(&next_id) { + continue; + } + + if self.services.timeline.pdu_exists(&next_id).await { + trace!("Found {next_id} in db"); + continue; + } + + debug!("Fetching {next_id} over federation."); + match self + .services + .sending + .send_federation_request(origin, get_event::v1::Request { + event_id: (*next_id).to_owned(), + include_unredacted_content: None, + }) + .await + { + | Ok(res) => { + debug!("Got {next_id} over federation"); + let Ok(room_version_id) = get_room_version_id(create_event) else { + self.back_off(&next_id); + continue; + }; + + let Ok((calculated_event_id, value)) = + gen_event_id_canonical_json(&res.pdu, &room_version_id) + else { + self.back_off(&next_id); + continue; + }; + + if calculated_event_id != *next_id { + warn!( + "Server didn't return event id we requested: requested: {next_id}, we \ + got {calculated_event_id}. Event: {:?}", + &res.pdu + ); + } + + if let Some(auth_events) = value + .get("auth_events") + .and_then(CanonicalJsonValue::as_array) + { + for auth_event in auth_events { + match serde_json::from_value::(auth_event.clone().into()) { + | Ok(auth_event) => { + todo_auth_events.push_back(auth_event); + }, + | _ => { + warn!("Auth event id is not valid"); + }, + } + } + } else { + warn!("Auth event list invalid"); + } + + events_in_reverse_order.push((next_id.clone(), value)); + events_all.insert(next_id); + }, + | Err(e) => { + debug_error!("Failed to fetch event {next_id}: {e}"); + self.back_off(&next_id); + }, + } + } + + (event_id.to_owned(), None, events_in_reverse_order) +} diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index f57768bc..26830652 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -1,11 +1,11 @@ use std::{ - collections::{BTreeMap, HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, iter::once, }; use futures::FutureExt; use ruma::{ - CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, int, uint, }; use tuwunel_core::{ @@ -30,10 +30,7 @@ pub(super) async fn fetch_prev<'a, Pdu, Events>( room_id: &RoomId, first_ts_in_room: MilliSecondsSinceUnixEpoch, initial_set: Events, -) -> Result<( - Vec, - HashMap)>, -)> +) -> Result<(Vec, HashMap)> where Pdu: Event, Events: Iterator + Clone + Send, diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index d366da75..2aea704f 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -1,13 +1,10 @@ -use std::{ - collections::{BTreeMap, hash_map}, - time::Instant, -}; +use std::time::Instant; use futures::{ FutureExt, TryFutureExt, TryStreamExt, future::{OptionFuture, try_join5}, }; -use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType}; +use ruma::{CanonicalJsonObject, EventId, RoomId, ServerName, UserId, events::StateEventType}; use tuwunel_core::{ Err, Result, debug, debug::INFO_SPAN_LEVEL, defer, err, implement, matrix::Event, utils::stream::IterStream, warn, @@ -55,7 +52,7 @@ pub async fn handle_incoming_pdu<'a>( origin: &'a ServerName, room_id: &'a RoomId, event_id: &'a EventId, - value: BTreeMap, + value: CanonicalJsonObject, is_timeline_event: bool, ) -> Result> { // 1. Skip the PDU if we already have it as a timeline event @@ -160,22 +157,7 @@ pub async fn handle_incoming_pdu<'a>( ) .inspect_err(move |e| { warn!("Prev {prev_id} failed: {e}"); - match self - .services - .globals - .bad_event_ratelimiter - .write() - .expect("locked") - .entry(prev_id.into()) - { - | hash_map::Entry::Vacant(e) => { - e.insert((Instant::now(), 1)); - }, - | hash_map::Entry::Occupied(mut e) => { - let tries = e.get().1.saturating_add(1); - *e.get_mut() = (Instant::now(), tries); - }, - } + self.back_off(prev_id); }) .map(|_| self.services.server.check_running()) }) diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 58021d09..4fd9ae9c 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap, hash_map}; +use std::collections::{HashMap, hash_map}; use futures::future::ready; use ruma::{ @@ -22,7 +22,7 @@ pub(super) async fn handle_outlier_pdu<'a, Pdu>( room_id: &'a RoomId, mut value: CanonicalJsonObject, auth_events_known: bool, -) -> Result<(PduEvent, BTreeMap)> +) -> Result<(PduEvent, CanonicalJsonObject)> where Pdu: Event, { diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index 59f536a0..359578d4 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -1,16 +1,17 @@ -use std::{collections::BTreeMap, time::Instant}; +use std::{ + ops::Range, + time::{Duration, Instant}, +}; -use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName}; +use ruma::{CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName}; use tuwunel_core::{ Err, Result, debug, debug::INFO_SPAN_LEVEL, defer, implement, matrix::{Event, PduEvent}, - utils::continue_exponential_backoff_secs, }; #[implement(super::Service)] -#[allow(clippy::type_complexity)] #[allow(clippy::too_many_arguments)] #[tracing::instrument( name = "prev", @@ -23,7 +24,7 @@ pub(super) async fn handle_prev_pdu<'a, Pdu>( origin: &'a ServerName, event_id: &'a EventId, room_id: &'a RoomId, - eventid_info: Option<(PduEvent, BTreeMap)>, + eventid_info: Option<(PduEvent, CanonicalJsonObject)>, create_event: &'a Pdu, first_ts_in_room: MilliSecondsSinceUnixEpoch, prev_id: &'a EventId, @@ -39,25 +40,12 @@ where )))); } - if let Some((time, tries)) = self - .services - .globals - .bad_event_ratelimiter - .read() - .expect("locked") - .get(prev_id) - { - // Exponential backoff - const MIN_DURATION: u64 = 5 * 60; - const MAX_DURATION: u64 = 60 * 60 * 24; - if continue_exponential_backoff_secs(MIN_DURATION, MAX_DURATION, time.elapsed(), *tries) { - debug!( - ?tries, - duration = ?time.elapsed(), - "Backing off from prev_event" - ); - return Ok(()); - } + if self.is_backed_off(prev_id, Range { + start: Duration::from_secs(5 * 60), + end: Duration::from_secs(60 * 60 * 24), + }) { + debug!(?prev_id, "Backing off from prev_event"); + return Ok(()); } let Some((pdu, json)) = eventid_info else { @@ -73,7 +61,7 @@ where self.federation_handletime .write() .expect("locked") - .insert(room_id.into(), ((*prev_id).to_owned(), start_time)); + .insert(room_id.into(), (prev_id.to_owned(), start_time)); defer! {{ self.federation_handletime diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index d37316bf..62c3cb68 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -11,21 +11,22 @@ mod state_at_incoming; mod upgrade_outlier_pdu; use std::{ - collections::HashMap, + collections::{HashMap, hash_map}, fmt::Write, + ops::Range, sync::{Arc, RwLock as StdRwLock}, - time::Instant, + time::{Duration, Instant}, }; use async_trait::async_trait; use ruma::{ - OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, + EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, events::room::create::RoomCreateEventContent, }; use tuwunel_core::{ - Err, Result, RoomVersion, Server, + Err, Result, RoomVersion, Server, implement, matrix::{Event, PduEvent}, - utils::MutexMap, + utils::{MutexMap, continue_exponential_backoff}, }; use crate::{Dep, globals, rooms, sending, server_keys}; @@ -97,20 +98,58 @@ impl crate::Service for Service { fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } -impl Service { - async fn event_exists(&self, event_id: OwnedEventId) -> bool { - self.services.timeline.pdu_exists(&event_id).await - } +#[implement(Service)] +fn back_off(&self, event_id: &EventId) { + use hash_map::Entry::{Occupied, Vacant}; - async fn event_fetch(&self, event_id: OwnedEventId) -> Option { - self.services - .timeline - .get_pdu(&event_id) - .await - .ok() + match self + .services + .globals + .bad_event_ratelimiter + .write() + .expect("locked") + .entry(event_id.into()) + { + | Vacant(e) => { + e.insert((Instant::now(), 1)); + }, + | Occupied(mut e) => { + *e.get_mut() = (Instant::now(), e.get().1.saturating_add(1)); + }, } } +#[implement(Service)] +fn is_backed_off(&self, event_id: &EventId, range: Range) -> bool { + let Some((time, tries)) = self + .services + .globals + .bad_event_ratelimiter + .read() + .expect("locked") + .get(event_id) + .copied() + else { + return false; + }; + + continue_exponential_backoff(range.start, range.end, time.elapsed(), tries) +} + +#[implement(Service)] +async fn event_exists(&self, event_id: OwnedEventId) -> bool { + self.services.timeline.pdu_exists(&event_id).await +} + +#[implement(Service)] +async fn event_fetch(&self, event_id: OwnedEventId) -> Option { + self.services + .timeline + .get_pdu(&event_id) + .await + .ok() +} + fn check_room_id(room_id: &RoomId, pdu: &Pdu) -> Result { if pdu.room_id() != room_id { return Err!(Request(InvalidParam(error!( diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index d16996cf..814db63e 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,7 +1,7 @@ -use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::Instant}; +use std::{borrow::Borrow, iter::once, sync::Arc, time::Instant}; use futures::{FutureExt, StreamExt, future::ready}; -use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; +use ruma::{CanonicalJsonObject, RoomId, ServerName, events::StateEventType}; use tuwunel_core::{ Err, Result, debug, debug_info, err, implement, is_equal_to, matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, @@ -21,7 +21,7 @@ use crate::rooms::{ pub(super) async fn upgrade_outlier_to_timeline_pdu( &self, incoming_pdu: PduEvent, - val: BTreeMap, + val: CanonicalJsonObject, create_event: &Pdu, origin: &ServerName, room_id: &RoomId,