From 4b03feef8568ad1d7853d56291454338642c0a5c Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 5 Mar 2026 07:44:35 +0000 Subject: [PATCH] Handle prev_events without interruption by sibling failure. Signed-off-by: Jason Volk --- .../event_handler/handle_incoming_pdu.rs | 92 +++++++++++++------ .../rooms/event_handler/handle_prev_pdu.rs | 32 +++---- .../event_handler/upgrade_outlier_pdu.rs | 5 + 3 files changed, 85 insertions(+), 44 deletions(-) diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 6b811da9..20a56183 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -1,17 +1,32 @@ -use futures::{FutureExt, TryFutureExt, TryStreamExt, future::try_join5}; -use ruma::{CanonicalJsonObject, EventId, RoomId, ServerName, UserId, events::StateEventType}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join5}; +use ruma::{ + CanonicalJsonObject, EventId, OwnedEventId, RoomId, ServerName, UserId, + events::StateEventType, +}; use tuwunel_core::{ Err, Result, debug, debug::INFO_SPAN_LEVEL, - err, implement, - matrix::{Event, room_version}, + debug_warn, err, implement, + matrix::{Event, pdu::MAX_PREV_EVENTS, room_version}, + smallvec::SmallVec, trace, - utils::{BoolExt, stream::IterStream}, + utils::{ + BoolExt, + stream::{IterStream, TryWidebandExt}, + }, warn, }; use crate::rooms::timeline::RawPduId; +type PrevResultsHandled = SmallVec<[PrevHandled; MAX_PREV_EVENTS]>; +type PrevHandled = (OwnedEventId, Handled); + +type PrevResults = SmallVec<[PrevResult; MAX_PREV_EVENTS]>; +type PrevResult = (OwnedEventId, Result<Handled>); + +type Handled = Option<(RawPduId, bool)>; + /// When receiving an event one needs to: /// 0. Check the server is in the room /// 1.
Skip the PDU if we already know about it @@ -54,7 +69,7 @@ pub async fn handle_incoming_pdu<'a>( event_id: &'a EventId, pdu: CanonicalJsonObject, is_timeline_event: bool, -) -> Result<Option<(RawPduId, bool)>> { +) -> Result<Handled> { // 1. Skip the PDU if we already have it as a timeline event if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await { debug!(?pdu_id, "Exists."); @@ -151,30 +166,51 @@ pub async fn handle_incoming_pdu<'a>( event_ids = ?sorted_prev_events, "Handling previous events" ); - sorted_prev_events - .iter() + let _prev_handles: PrevResultsHandled = sorted_prev_events + .into_iter() + .enumerate() .try_stream() - .map_ok(AsRef::as_ref) - .try_for_each(|prev_id| { - self.handle_prev_pdu( - origin, - room_id, - event_id, - eventid_info.remove(prev_id), - &room_version, - first_ts_in_room, - prev_id, - create_event.event_id(), - ) - .inspect_err(move |e| { - warn!("Prev {prev_id} failed: {e}"); - self.back_off(prev_id); - }) - .inspect_ok(|()| { - self.cancel_back_off(prev_id); - }) - .map(|_| self.services.server.check_running()) + .map_ok(|(i, prev_id)| (i, eventid_info.remove(&prev_id), prev_id)) + .widen_and_then(MAX_PREV_EVENTS, async |(i, eventid_info, prev_id)| { + self.services.server.check_running()?; + match self + .handle_prev_pdu( + origin, + room_id, + event_id, + eventid_info, + &room_version, + first_ts_in_room, + &prev_id, + create_event.event_id(), + ) + .await + { + | Ok(Some(handled)) => { + self.cancel_back_off(&prev_id); + debug!(?i, ?prev_id, ?handled, "Prev event processed."); + + Ok((prev_id, Ok(Some(handled)))) + }, + | Ok(None) => { + debug_warn!(?i, ?prev_id, "Prev event not processed."); + + Ok((prev_id, Ok(None))) + }, + | Err(e) => { + self.back_off(&prev_id); + warn!(?i, ?prev_id, "Prev event processing failed: {e}"); + + Ok((prev_id, Err(e))) + }, + } }) + .try_collect::<PrevResults>() + .map_ok(PrevResults::into_iter) + .map_ok(IterStream::stream) + .map_ok(|s| s.map(|(id, res)| res.map(|res| (id, res)))) + .try_flatten_stream() + .try_collect()
.boxed() .await?; diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index 35b0a24d..5c045d29 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -7,8 +7,8 @@ use ruma::{ use tuwunel_core::{ Err, Result, debug, debug::INFO_SPAN_LEVEL, - implement, - matrix::{Event, PduEvent}, + debug_warn, implement, + matrix::{Event, PduEvent, pdu::RawPduId}, }; #[implement(super::Service)] @@ -29,7 +29,7 @@ pub(super) async fn handle_prev_pdu( first_ts_in_room: MilliSecondsSinceUnixEpoch, prev_id: &EventId, create_event_id: &EventId, -) -> Result { +) -> Result<Option<(RawPduId, bool)>> { // Check for disabled again because it might have changed if self.services.metadata.is_disabled(room_id).await { return Err!(Request(Forbidden(debug_warn!( @@ -38,21 +38,23 @@ pub(super) async fn handle_prev_pdu( )))); } + let Some((pdu, json)) = eventid_info else { + debug!(?prev_id, "Missing eventid_info."); + return Ok(None); + }; + + // Skip old events + if pdu.origin_server_ts() < first_ts_in_room { + debug_warn!(?prev_id, "origin_server_ts older than room"); + return Ok(None); + } + if self.is_backed_off(prev_id, Range { start: Duration::from_secs(5 * 60), end: Duration::from_secs(60 * 60 * 24), }) { debug!(?prev_id, "Backing off from prev_event"); - return Ok(()); - } - - let Some((pdu, json)) = eventid_info else { - return Ok(()); - }; - - // Skip old events - if pdu.origin_server_ts() < first_ts_in_room { - return Ok(()); + return Ok(None); } self.upgrade_outlier_to_timeline_pdu( @@ -64,7 +66,5 @@ pub(super) async fn handle_prev_pdu( create_event_id, ) .boxed() - .await?; - - Ok(()) + .await } diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 7dabfff3..996f39cf 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@
-278,6 +278,11 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( ) .await?; + debug_assert!( + pdu_id.is_some() || soft_fail, + "Ok(None) returned by timeline for soft-failed PDU's" + ); + if soft_fail { self.services .pdu_metadata