Handle prev_events without interruption by sibling failure.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2026-03-05 07:44:35 +00:00
parent 3fa22ea9d9
commit 4b03feef85
3 changed files with 85 additions and 44 deletions

View File

@@ -1,17 +1,32 @@
use futures::{FutureExt, TryFutureExt, TryStreamExt, future::try_join5};
use ruma::{CanonicalJsonObject, EventId, RoomId, ServerName, UserId, events::StateEventType};
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join5};
use ruma::{
CanonicalJsonObject, EventId, OwnedEventId, RoomId, ServerName, UserId,
events::StateEventType,
};
use tuwunel_core::{
Err, Result, debug,
debug::INFO_SPAN_LEVEL,
err, implement,
matrix::{Event, room_version},
debug_warn, err, implement,
matrix::{Event, pdu::MAX_PREV_EVENTS, room_version},
smallvec::SmallVec,
trace,
utils::{BoolExt, stream::IterStream},
utils::{
BoolExt,
stream::{IterStream, TryWidebandExt},
},
warn,
};
use crate::rooms::timeline::RawPduId;
type PrevResultsHandled = SmallVec<[PrevHandled; MAX_PREV_EVENTS]>;
type PrevHandled = (OwnedEventId, Handled);
type PrevResults = SmallVec<[PrevResult; MAX_PREV_EVENTS]>;
type PrevResult = (OwnedEventId, Result<Handled>);
type Handled = Option<(RawPduId, bool)>;
/// When receiving an event one needs to:
/// 0. Check the server is in the room
/// 1. Skip the PDU if we already know about it
@@ -54,7 +69,7 @@ pub async fn handle_incoming_pdu<'a>(
event_id: &'a EventId,
pdu: CanonicalJsonObject,
is_timeline_event: bool,
) -> Result<Option<(RawPduId, bool)>> {
) -> Result<Handled> {
// 1. Skip the PDU if we already have it as a timeline event
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
debug!(?pdu_id, "Exists.");
@@ -151,30 +166,51 @@ pub async fn handle_incoming_pdu<'a>(
event_ids = ?sorted_prev_events,
"Handling previous events"
);
sorted_prev_events
.iter()
let _prev_handles: PrevResultsHandled = sorted_prev_events
.into_iter()
.enumerate()
.try_stream()
.map_ok(AsRef::as_ref)
.try_for_each(|prev_id| {
self.handle_prev_pdu(
origin,
room_id,
event_id,
eventid_info.remove(prev_id),
&room_version,
first_ts_in_room,
prev_id,
create_event.event_id(),
)
.inspect_err(move |e| {
warn!("Prev {prev_id} failed: {e}");
self.back_off(prev_id);
})
.inspect_ok(|()| {
self.cancel_back_off(prev_id);
})
.map(|_| self.services.server.check_running())
.map_ok(|(i, prev_id)| (i, eventid_info.remove(&prev_id), prev_id))
.widen_and_then(MAX_PREV_EVENTS, async |(i, eventid_info, prev_id)| {
self.services.server.check_running()?;
match self
.handle_prev_pdu(
origin,
room_id,
event_id,
eventid_info,
&room_version,
first_ts_in_room,
&prev_id,
create_event.event_id(),
)
.await
{
| Ok(Some(handled)) => {
self.cancel_back_off(&prev_id);
debug!(?i, ?prev_id, ?handled, "Prev event processed.");
Ok((prev_id, Ok(Some(handled))))
},
| Ok(None) => {
debug_warn!(?i, ?prev_id, "Prev event not processed.");
Ok((prev_id, Ok(None)))
},
| Err(e) => {
self.back_off(&prev_id);
warn!(?i, ?prev_id, "Prev event processing failed: {e}");
Ok((prev_id, Err(e)))
},
}
})
.try_collect::<PrevResults>()
.map_ok(PrevResults::into_iter)
.map_ok(IterStream::stream)
.map_ok(|s| s.map(|(id, res)| res.map(|res| (id, res))))
.try_flatten_stream()
.try_collect()
.boxed()
.await?;

View File

@@ -7,8 +7,8 @@ use ruma::{
use tuwunel_core::{
Err, Result, debug,
debug::INFO_SPAN_LEVEL,
implement,
matrix::{Event, PduEvent},
debug_warn, implement,
matrix::{Event, PduEvent, pdu::RawPduId},
};
#[implement(super::Service)]
@@ -29,7 +29,7 @@ pub(super) async fn handle_prev_pdu(
first_ts_in_room: MilliSecondsSinceUnixEpoch,
prev_id: &EventId,
create_event_id: &EventId,
) -> Result {
) -> Result<Option<(RawPduId, bool)>> {
// Check for disabled again because it might have changed
if self.services.metadata.is_disabled(room_id).await {
return Err!(Request(Forbidden(debug_warn!(
@@ -38,21 +38,23 @@ pub(super) async fn handle_prev_pdu(
))));
}
let Some((pdu, json)) = eventid_info else {
debug!(?prev_id, "Missing eventid_info.");
return Ok(None);
};
// Skip old events
if pdu.origin_server_ts() < first_ts_in_room {
debug_warn!(?prev_id, "origin_server_ts older than room");
return Ok(None);
}
if self.is_backed_off(prev_id, Range {
start: Duration::from_secs(5 * 60),
end: Duration::from_secs(60 * 60 * 24),
}) {
debug!(?prev_id, "Backing off from prev_event");
return Ok(());
}
let Some((pdu, json)) = eventid_info else {
return Ok(());
};
// Skip old events
if pdu.origin_server_ts() < first_ts_in_room {
return Ok(());
return Ok(None);
}
self.upgrade_outlier_to_timeline_pdu(
@@ -64,7 +66,5 @@ pub(super) async fn handle_prev_pdu(
create_event_id,
)
.boxed()
.await?;
Ok(())
.await
}

View File

@@ -278,6 +278,11 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
)
.await?;
debug_assert!(
pdu_id.is_some() || soft_fail,
"Ok(None) returned by timeline for soft-failed PDU's"
);
if soft_fail {
self.services
.pdu_metadata