diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_auth.rs similarity index 85% rename from src/service/rooms/event_handler/fetch_and_handle_outliers.rs rename to src/service/rooms/event_handler/fetch_auth.rs index 3a03a19d..1fdb2b49 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_auth.rs @@ -5,20 +5,15 @@ use std::{ }; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, - api::federation::event::get_event, + CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, RoomId, RoomVersionId, + ServerName, api::federation::event::get_event, }; use tuwunel_core::{ debug, debug_error, debug_warn, implement, - matrix::{ - PduEvent, - event::{Event, gen_event_id_canonical_json}, - }, + matrix::{PduEvent, event::gen_event_id_canonical_json}, trace, warn, }; -use super::get_room_version_id; - /// Find the event and auth it. Once the event is validated (steps 1 - 8) /// it is appended to the outliers Tree. /// @@ -29,22 +24,21 @@ use super::get_room_version_id; /// c. Ask origin server over federation /// d. TODO: Ask other servers over federation? #[implement(super::Service)] -pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>( +pub(super) async fn fetch_auth<'a, Events>( &self, - origin: &'a ServerName, + origin: &ServerName, + room_id: &RoomId, events: Events, - create_event: &'a Pdu, - room_id: &'a RoomId, + room_version: &RoomVersionId, ) -> Vec<(PduEvent, Option)> where - Pdu: Event, Events: Iterator + Clone + Send, { let mut events_with_auth_events = Vec::with_capacity(events.clone().count()); for event_id in events { let outlier = self - .fetch_auth(room_id, event_id, origin, create_event) + .fetch_auth_chain(origin, room_id, event_id, room_version) .await; events_with_auth_events.push(outlier); @@ -71,10 +65,10 @@ where match Box::pin(self.handle_outlier_pdu( origin, - create_event, - &next_id, room_id, + &next_id, value.clone(), + room_version, true, )) .await @@ -95,16 +89,13 @@ where } #[implement(super::Service)] -async fn fetch_auth<'a, Pdu>( +async fn fetch_auth_chain( &self, - _room_id: &'a RoomId, - event_id: &'a EventId, - origin: &'a ServerName, - create_event: &'a Pdu, -) -> (OwnedEventId, Option, Vec<(OwnedEventId, CanonicalJsonObject)>) -where - Pdu: Event, -{ + origin: &ServerName, + _room_id: &RoomId, + event_id: &EventId, + room_version: &RoomVersionId, +) -> (OwnedEventId, Option, Vec<(OwnedEventId, CanonicalJsonObject)>) { // a. Look in the main timeline (pduid_pdu tree) // b. Look at outlier pdu tree // (get_pdu_json checks both) @@ -149,13 +140,8 @@ where { | Ok(res) => { debug!("Got {next_id} over federation"); - let Ok(room_version_id) = get_room_version_id(create_event) else { - self.back_off(&next_id); - continue; - }; - let Ok((calculated_event_id, value)) = - gen_event_id_canonical_json(&res.pdu, &room_version_id) + gen_event_id_canonical_json(&res.pdu, room_version) else { self.back_off(&next_id); continue; diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 26830652..57574826 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -5,8 +5,8 @@ use std::{ use futures::FutureExt; use ruma::{ - CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, - int, uint, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, + RoomVersionId, ServerName, int, uint, }; use tuwunel_core::{ Result, debug_warn, err, implement, @@ -23,16 +23,15 @@ use super::check_room_id; fields(%origin), )] #[allow(clippy::type_complexity)] -pub(super) async fn fetch_prev<'a, Pdu, Events>( +pub(super) async fn fetch_prev<'a, Events>( &self, origin: &ServerName, - create_event: &Pdu, room_id: &RoomId, - first_ts_in_room: MilliSecondsSinceUnixEpoch, initial_set: Events, + room_version: &RoomVersionId, + first_ts_in_room: MilliSecondsSinceUnixEpoch, ) -> Result<(Vec, HashMap)> where - Pdu: Event, Events: Iterator + Clone + Send, { let num_ids = initial_set.clone().count(); @@ -47,12 +46,7 @@ where self.services.server.check_running()?; match self - .fetch_and_handle_outliers( - origin, - once(prev_event_id.as_ref()), - create_event, - room_id, - ) + .fetch_auth(origin, room_id, once(prev_event_id.as_ref()), room_version) .boxed() .await .pop() @@ -62,7 +56,7 @@ where let limit = self.services.server.config.max_fetch_prev_events; if amount > limit { - debug_warn!("Max prev event limit reached! Limit: {limit}"); + debug_warn!(?limit, "Max prev event limit reached!"); graph.insert(prev_event_id.clone(), HashSet::new()); continue; } diff --git a/src/service/rooms/event_handler/fetch_state.rs b/src/service/rooms/event_handler/fetch_state.rs index 82591631..d2cbe757 100644 --- a/src/service/rooms/event_handler/fetch_state.rs +++ b/src/service/rooms/event_handler/fetch_state.rs @@ -2,8 +2,8 @@ use std::collections::{HashMap, hash_map}; use futures::FutureExt; use ruma::{ - EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids, - events::StateEventType, + EventId, OwnedEventId, RoomId, RoomVersionId, ServerName, + api::federation::event::get_room_state_ids, events::StateEventType, }; use tuwunel_core::{Err, Result, debug, debug_warn, err, implement, matrix::Event}; @@ -18,16 +18,14 @@ use crate::rooms::short::ShortStateKey; skip_all, fields(%origin), )] -pub(super) async fn fetch_state( +pub(super) async fn fetch_state( &self, origin: &ServerName, - create_event: &Pdu, room_id: &RoomId, event_id: &EventId, -) -> Result>> -where - Pdu: Event, -{ + room_version: &RoomVersionId, + create_event_id: &EventId, +) -> Result>> { let res = self .services .sending @@ -41,7 +39,7 @@ where debug!("Fetching state events"); let state_ids = res.pdu_ids.iter().map(AsRef::as_ref); let state_vec = self - .fetch_and_handle_outliers(origin, state_ids, create_event, room_id) + .fetch_auth(origin, room_id, state_ids, room_version) .boxed() .await; @@ -79,7 +77,7 @@ where if state .get(&create_shortstatekey) .map(AsRef::as_ref) - != Some(create_event.event_id()) + != Some(create_event_id) { return Err!(Database("Incoming event refers to wrong create event.")); } diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 280b7aaf..e9ed07df 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -8,6 +8,7 @@ use tuwunel_core::{ utils::stream::IterStream, warn, }; +use super::get_room_version_id; use crate::rooms::timeline::RawPduId; /// When receiving an event one needs to: @@ -50,7 +51,7 @@ pub async fn handle_incoming_pdu<'a>( origin: &'a ServerName, room_id: &'a RoomId, event_id: &'a EventId, - value: CanonicalJsonObject, + pdu: CanonicalJsonObject, is_timeline_event: bool, ) -> Result> { // 1. Skip the PDU if we already have it as a timeline event @@ -72,7 +73,7 @@ pub async fn handle_incoming_pdu<'a>( let origin_acl_check = self.acl_check(origin, room_id); // 1.3.2 Check room ACL on sender's server name - let sender: &UserId = value + let sender: &UserId = pdu .get("sender") .try_into() .map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?; @@ -106,8 +107,10 @@ pub async fn handle_incoming_pdu<'a>( return Err!(Request(Forbidden("Federation of this room is disabled by this server."))); } + let room_version = get_room_version_id(create_event)?; + let (incoming_pdu, val) = self - .handle_outlier_pdu(origin, create_event, event_id, room_id, value, false) + .handle_outlier_pdu(origin, room_id, event_id, pdu, &room_version, false) .boxed() .await?; @@ -131,7 +134,7 @@ pub async fn handle_incoming_pdu<'a>( // 9. Fetch any missing prev events doing all checks listed here starting at 1. // These are timeline events let (sorted_prev_events, mut eventid_info) = self - .fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events()) + .fetch_prev(origin, room_id, incoming_pdu.prev_events(), &room_version, first_ts_in_room) .await?; debug!( @@ -146,12 +149,13 @@ pub async fn handle_incoming_pdu<'a>( .try_for_each(|prev_id| { self.handle_prev_pdu( origin, - event_id, room_id, + event_id, eventid_info.remove(prev_id), - create_event, + &room_version, first_ts_in_room, prev_id, + create_event.event_id(), ) .inspect_err(move |e| { warn!("Prev {prev_id} failed: {e}"); @@ -163,7 +167,14 @@ pub async fn handle_incoming_pdu<'a>( .await?; // Done with prev events, now handling the incoming event - self.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, create_event, origin, room_id) - .boxed() - .await + self.upgrade_outlier_to_timeline_pdu( + origin, + room_id, + incoming_pdu, + val, + &room_version, + create_event.event_id(), + ) + .boxed() + .await } diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 4fd9ae9c..7c8310fc 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -2,7 +2,8 @@ use std::collections::{HashMap, hash_map}; use futures::future::ready; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, EventId, RoomId, ServerName, events::StateEventType, + CanonicalJsonObject, CanonicalJsonValue, EventId, RoomId, RoomVersionId, ServerName, + events::StateEventType, }; use tuwunel_core::{ Err, Result, debug, debug_info, err, implement, @@ -10,41 +11,36 @@ use tuwunel_core::{ state_res, trace, warn, }; -use super::{check_room_id, get_room_version_id, to_room_version}; +use super::{check_room_id, to_room_version}; #[implement(super::Service)] -#[allow(clippy::too_many_arguments)] -pub(super) async fn handle_outlier_pdu<'a, Pdu>( +pub(super) async fn handle_outlier_pdu( &self, - origin: &'a ServerName, - create_event: &'a Pdu, - event_id: &'a EventId, - room_id: &'a RoomId, - mut value: CanonicalJsonObject, + origin: &ServerName, + room_id: &RoomId, + event_id: &EventId, + mut pdu_json: CanonicalJsonObject, + room_version: &RoomVersionId, auth_events_known: bool, -) -> Result<(PduEvent, CanonicalJsonObject)> -where - Pdu: Event, -{ +) -> Result<(PduEvent, CanonicalJsonObject)> { // 1. Remove unsigned field - value.remove("unsigned"); - - // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json + pdu_json.remove("unsigned"); + // TODO: For RoomVersion6 we must check that Raw<..> is canonical do we + // anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json // 2. Check signatures, otherwise drop // 3. check content hash, redact if doesn't match - let room_version_id = get_room_version_id(create_event)?; - let mut incoming_pdu = match self + let mut pdu_json = match self .services .server_keys - .verify_event(&value, Some(&room_version_id)) + .verify_event(&pdu_json, Some(room_version)) .await { - | Ok(ruma::signatures::Verified::All) => value, + | Ok(ruma::signatures::Verified::All) => pdu_json, | Ok(ruma::signatures::Verified::Signatures) => { // Redact debug_info!("Calculated hash does not match (redaction): {event_id}"); - let Ok(obj) = ruma::canonical_json::redact(value, &room_version_id, None) else { + let Ok(obj) = ruma::canonical_json::redact(pdu_json, room_version, None) else { return Err!(Request(InvalidParam("Redaction failed"))); }; @@ -66,13 +62,13 @@ where // Now that we have checked the signature and hashes we can add the eventID and // convert to our PduEvent type - incoming_pdu + pdu_json .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); - let pdu_event = serde_json::from_value::(serde_json::to_value(&incoming_pdu)?) + let event = serde_json::from_value::(serde_json::to_value(&pdu_json)?) .map_err(|e| err!(Request(BadJson(debug_warn!("Event is not a valid PDU: {e}")))))?; - check_room_id(room_id, &pdu_event)?; + check_room_id(room_id, &event)?; if !auth_events_known { // 4. fetch any missing auth events doing all checks listed here starting at 1. @@ -81,21 +77,15 @@ where // the auth events are also rejected "due to auth events" // NOTE: Step 5 is not applied anymore because it failed too often debug!("Fetching auth events"); - Box::pin(self.fetch_and_handle_outliers( - origin, - pdu_event.auth_events(), - create_event, - room_id, - )) - .await; + Box::pin(self.fetch_auth(origin, room_id, event.auth_events(), room_version)).await; } // 6. Reject "due to auth events" if the event doesn't pass auth based on the // auth events debug!("Checking based on auth events"); // Build map of auth events - let mut auth_events = HashMap::with_capacity(pdu_event.auth_events().count()); - for id in pdu_event.auth_events() { + let mut auth_events = HashMap::with_capacity(event.auth_events().count()); + for id in event.auth_events() { let Ok(auth_event) = self.services.timeline.get_pdu(id).await else { warn!("Could not find auth event {id}"); continue; @@ -135,8 +125,8 @@ where }; let auth_check = state_res::event_auth::auth_check( - &to_room_version(&room_version_id), - &pdu_event, + &to_room_version(room_version), + &event, None, // TODO: third party invite state_fetch, ) @@ -152,9 +142,9 @@ where // 7. Persist the event as an outlier. self.services .timeline - .add_pdu_outlier(pdu_event.event_id(), &incoming_pdu); + .add_pdu_outlier(event.event_id(), &pdu_json); trace!("Added pdu as outlier."); - Ok((pdu_event, incoming_pdu)) + Ok((event, pdu_json)) } diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index 9d1d5143..6349f283 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -1,6 +1,8 @@ use std::{ops::Range, time::Duration}; -use ruma::{CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName}; +use ruma::{ + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, +}; use tuwunel_core::{ Err, Result, debug, debug::INFO_SPAN_LEVEL, @@ -16,19 +18,17 @@ use tuwunel_core::{ skip_all, fields(%prev_id), )] -pub(super) async fn handle_prev_pdu<'a, Pdu>( +pub(super) async fn handle_prev_pdu( &self, - origin: &'a ServerName, - event_id: &'a EventId, - room_id: &'a RoomId, + origin: &ServerName, + room_id: &RoomId, + event_id: &EventId, eventid_info: Option<(PduEvent, CanonicalJsonObject)>, - create_event: &'a Pdu, + room_version: &RoomVersionId, first_ts_in_room: MilliSecondsSinceUnixEpoch, - prev_id: &'a EventId, -) -> Result -where - Pdu: Event, -{ + prev_id: &EventId, + create_event_id: &EventId, +) -> Result { // Check for disabled again because it might have changed if self.services.metadata.is_disabled(room_id).await { return Err!(Request(Forbidden(debug_warn!( @@ -54,8 +54,15 @@ where return Ok(()); } - self.upgrade_outlier_to_timeline_pdu(pdu, json, create_event, origin, room_id) - .await?; + self.upgrade_outlier_to_timeline_pdu( + origin, + room_id, + pdu, + json, + room_version, + create_event_id, + ) + .await?; Ok(()) } diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 0eea93b1..da9e36c3 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -1,5 +1,5 @@ mod acl_check; -mod fetch_and_handle_outliers; +mod fetch_auth; mod fetch_prev; mod fetch_state; mod handle_incoming_pdu; diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 814db63e..ddd61d49 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -1,7 +1,9 @@ use std::{borrow::Borrow, iter::once, sync::Arc, time::Instant}; use futures::{FutureExt, StreamExt, future::ready}; -use ruma::{CanonicalJsonObject, RoomId, ServerName, events::StateEventType}; +use ruma::{ + CanonicalJsonObject, EventId, RoomId, RoomVersionId, ServerName, events::StateEventType, +}; use tuwunel_core::{ Err, Result, debug, debug_info, err, implement, is_equal_to, matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, @@ -10,7 +12,7 @@ use tuwunel_core::{ warn, }; -use super::{get_room_version_id, to_room_version}; +use super::to_room_version; use crate::rooms::{ state_compressor::{CompressedState, HashSetCompressStateEvent}, timeline::RawPduId, @@ -18,17 +20,15 @@ use crate::rooms::{ #[implement(super::Service)] #[tracing::instrument(name = "upgrade", level = "debug", skip_all, ret(Debug))] -pub(super) async fn upgrade_outlier_to_timeline_pdu( +pub(super) async fn upgrade_outlier_to_timeline_pdu( &self, - incoming_pdu: PduEvent, - val: CanonicalJsonObject, - create_event: &Pdu, origin: &ServerName, room_id: &RoomId, -) -> Result> -where - Pdu: Event, -{ + incoming_pdu: PduEvent, + val: CanonicalJsonObject, + room_version: &RoomVersionId, + create_event_id: &EventId, +) -> Result> { // Skip the PDU if we already have it as a timeline event if let Ok(pduid) = self .services @@ -50,7 +50,6 @@ where debug!("Upgrading to timeline pdu"); let timer = Instant::now(); - let room_version_id = get_room_version_id(create_event)?; // 10. Fetch missing state and auth chain events by calling /state_ids at // backwards extremities doing all the checks in this list starting at 1. @@ -61,22 +60,21 @@ where self.state_at_incoming_degree_one(&incoming_pdu) .await? } else { - self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_id) + self.state_at_incoming_resolved(&incoming_pdu, room_id, room_version) .boxed() .await? }; if state_at_incoming_event.is_none() { state_at_incoming_event = self - .fetch_state(origin, create_event, room_id, incoming_pdu.event_id()) + .fetch_state(origin, room_id, incoming_pdu.event_id(), room_version, create_event_id) + .boxed() .await?; } let state_at_incoming_event = state_at_incoming_event.expect("we always set this to some above"); - let room_version = to_room_version(&room_version_id); - debug!("Performing auth check"); // 11. Check the auth of the event passes based on the state of the event let state_fetch_state = &state_at_incoming_event; @@ -97,7 +95,7 @@ where }; let auth_check = state_res::event_auth::auth_check( - &room_version, + &to_room_version(room_version), &incoming_pdu, None, // TODO: third party invite |ty, sk| state_fetch(ty.clone(), sk.into()), @@ -128,7 +126,7 @@ where }; let auth_check = state_res::event_auth::auth_check( - &room_version, + &to_room_version(room_version), &incoming_pdu, None, // third-party invite state_fetch, @@ -138,7 +136,7 @@ where // Soft fail check before doing state res debug!("Performing soft-fail check"); - let soft_fail = match (auth_check, incoming_pdu.redacts_id(&room_version_id)) { + let soft_fail = match (auth_check, incoming_pdu.redacts_id(room_version)) { | (false, _) => true, | (true, None) => false, | (true, Some(redact_id)) => @@ -216,7 +214,7 @@ where } let new_room_state = self - .resolve_state(room_id, &room_version_id, state_after) + .resolve_state(room_id, room_version, state_after) .boxed() .await?;