From 14b9c5df45a1cc7002748ba17c75e73d22ed8a2d Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 8 Mar 2026 05:08:12 +0000 Subject: [PATCH] Instrument recursion level in event_handler. Signed-off-by: Jason Volk --- src/service/rooms/event_handler/fetch_auth.rs | 12 +++++++++--- src/service/rooms/event_handler/fetch_prev.rs | 18 ++++++++++++++---- .../rooms/event_handler/fetch_state.rs | 3 ++- .../event_handler/handle_incoming_pdu.rs | 14 ++++++++++++-- .../rooms/event_handler/handle_outlier_pdu.rs | 19 +++++++++++++++++-- .../rooms/event_handler/handle_prev_pdu.rs | 2 ++ .../event_handler/upgrade_outlier_pdu.rs | 14 ++++++++++++-- 7 files changed, 68 insertions(+), 14 deletions(-) diff --git a/src/service/rooms/event_handler/fetch_auth.rs b/src/service/rooms/event_handler/fetch_auth.rs index 97b19710..4595dacb 100644 --- a/src/service/rooms/event_handler/fetch_auth.rs +++ b/src/service/rooms/event_handler/fetch_auth.rs @@ -10,7 +10,7 @@ use ruma::{ ServerName, api::federation::event::get_event, }; use tuwunel_core::{ - debug, debug_error, debug_warn, implement, + debug, debug_error, debug_warn, expected, implement, matrix::{PduEvent, event::gen_event_id_canonical_json, pdu::MAX_AUTH_EVENTS}, trace, utils::stream::{BroadbandExt, IterStream, ReadyExt}, @@ -30,7 +30,11 @@ use tuwunel_core::{ #[tracing::instrument( level = "debug", skip_all, - fields(%origin), + fields( + %origin, + events = %events.clone().count(), + lev = %recursion_level, + ), )] pub(super) async fn fetch_auth<'a, Events>( &self, @@ -38,9 +42,10 @@ pub(super) async fn fetch_auth<'a, Events>( room_id: &RoomId, events: Events, room_version: &RoomVersionId, + recursion_level: usize, ) -> Vec<(PduEvent, Option)> where - Events: Iterator + Send, + Events: Iterator + Clone + Send, { let events_with_auth_events: Vec<_> = events .stream() @@ -79,6 +84,7 @@ where &next_id, value.clone(), room_version, + expected!(recursion_level + 1), true, )); diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 738c6c68..884512c7 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -18,7 +18,10 @@ use crate::rooms::state_res; #[tracing::instrument( level = "debug", skip_all, - fields(%origin), + fields( + %origin, + events = %initial_set.clone().count(), + ), )] #[expect(clippy::type_complexity)] pub(super) async fn fetch_prev<'a, Events>( @@ -27,10 +30,11 @@ pub(super) async fn fetch_prev<'a, Events>( room_id: &RoomId, initial_set: Events, room_version: &RoomVersionId, + recursion_level: usize, first_ts_in_room: MilliSecondsSinceUnixEpoch, ) -> Result<(Vec, HashMap)> where - Events: Iterator + Send, + Events: Iterator + Clone + Send, { let mut todo_outlier_stack: FuturesOrdered<_> = initial_set .stream() @@ -46,7 +50,7 @@ where .map(async |event_id| { let events = once(event_id.as_ref()); let auth = self - .fetch_auth(origin, room_id, events, room_version) + .fetch_auth(origin, room_id, events, room_version, recursion_level) .await; (event_id, auth) @@ -104,7 +108,13 @@ where let prev_prev = prev_prev.to_owned(); let fetch = async move { let fetch = self - .fetch_auth(origin, room_id, once(prev_prev.as_ref()), room_version) + .fetch_auth( + origin, + room_id, + once(prev_prev.as_ref()), + room_version, + recursion_level, + ) .await; (prev_prev, fetch) diff --git a/src/service/rooms/event_handler/fetch_state.rs b/src/service/rooms/event_handler/fetch_state.rs index c95f4caa..faa78027 100644 --- a/src/service/rooms/event_handler/fetch_state.rs +++ b/src/service/rooms/event_handler/fetch_state.rs @@ -24,6 +24,7 @@ pub(super) async fn fetch_state( room_id: &RoomId, event_id: &EventId, room_version: &RoomVersionId, + recursion_level: usize, create_event_id: &EventId, ) -> Result>> { let res = self @@ -39,7 +40,7 @@ pub(super) async fn fetch_state( debug!("Fetching state events"); let state_ids = res.pdu_ids.iter().map(AsRef::as_ref); let state_vec = self - .fetch_auth(origin, room_id, state_ids, room_version) + .fetch_auth(origin, room_id, state_ids, room_version, recursion_level) .boxed() .await; diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 20a56183..8d50c5f4 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -124,9 +124,10 @@ pub async fn handle_incoming_pdu<'a>( } let room_version = room_version::from_create_event(create_event)?; + let recursion_level = 0; let (incoming_pdu, pdu) = self - .handle_outlier_pdu(origin, room_id, event_id, pdu, &room_version, false) + .handle_outlier_pdu(origin, room_id, event_id, pdu, &room_version, recursion_level, false) .await?; // 8. if not timeline event: stop @@ -158,7 +159,14 @@ pub async fn handle_incoming_pdu<'a>( // 9. Fetch any missing prev events doing all checks listed here starting at 1. // These are timeline events let (sorted_prev_events, mut eventid_info) = self - .fetch_prev(origin, room_id, incoming_pdu.prev_events(), &room_version, first_ts_in_room) + .fetch_prev( + origin, + room_id, + incoming_pdu.prev_events(), + &room_version, + recursion_level, + first_ts_in_room, + ) .await?; trace!( @@ -180,6 +188,7 @@ pub async fn handle_incoming_pdu<'a>( event_id, eventid_info, &room_version, + recursion_level, first_ts_in_room, &prev_id, create_event.event_id(), @@ -221,6 +230,7 @@ pub async fn handle_incoming_pdu<'a>( incoming_pdu, pdu, &room_version, + recursion_level, create_event.event_id(), ) .boxed() diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 8c3383a4..05383666 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -15,6 +15,13 @@ use super::check_room_id; use crate::rooms::state_res; #[implement(super::Service)] +#[cfg_attr(unabridged, tracing::instrument( + name = "outlier", + level = "debug", + skip_all, + fields(lev = %recursion_level) +))] +#[expect(clippy::too_many_arguments)] pub(super) async fn handle_outlier_pdu( &self, origin: &ServerName, @@ -22,9 +29,10 @@ pub(super) async fn handle_outlier_pdu( event_id: &EventId, mut pdu_json: CanonicalJsonObject, room_version: &RoomVersionId, + recursion_level: usize, auth_events_known: bool, ) -> Result<(PduEvent, CanonicalJsonObject)> { - debug!(?event_id, ?auth_events_known, "handle outlier"); + debug!(?event_id, ?auth_events_known, %recursion_level, "handle outlier"); // 1. Remove unsigned field pdu_json.remove("unsigned"); @@ -86,7 +94,14 @@ pub(super) async fn handle_outlier_pdu( // the auth events are also rejected "due to auth events" // NOTE: Step 5 is not applied anymore because it failed too often debug!("Fetching auth events"); - Box::pin(self.fetch_auth(origin, room_id, event.auth_events(), room_version)).await; + Box::pin(self.fetch_auth( + origin, + room_id, + event.auth_events(), + room_version, + recursion_level, + )) + .await; } // 6. Reject "due to auth events" if the event doesn't pass auth based on the diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index 0d117c16..b4b89684 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -26,6 +26,7 @@ pub(super) async fn handle_prev_pdu( event_id: &EventId, eventid_info: Option<(PduEvent, CanonicalJsonObject)>, room_version: &RoomVersionId, + recursion_level: usize, first_ts_in_room: MilliSecondsSinceUnixEpoch, prev_id: &EventId, create_event_id: &EventId, @@ -63,6 +64,7 @@ pub(super) async fn handle_prev_pdu( pdu, json, room_version, + recursion_level, create_event_id, ) .boxed() diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 2279989e..c826129c 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -24,8 +24,10 @@ use crate::rooms::{ name = "upgrade", level = "debug", ret(level = "debug"), - skip_all + skip_all, + fields(lev = %recursion_level) )] +#[expect(clippy::too_many_arguments)] pub(super) async fn upgrade_outlier_to_timeline_pdu( &self, origin: &ServerName, @@ -33,6 +35,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( incoming_pdu: PduEvent, val: CanonicalJsonObject, room_version: &RoomVersionId, + recursion_level: usize, create_event_id: &EventId, ) -> Result> { // Skip the PDU if we already have it as a timeline event @@ -79,7 +82,14 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( if state_at_incoming_event.is_none() { state_at_incoming_event = self - .fetch_state(origin, room_id, incoming_pdu.event_id(), room_version, create_event_id) + .fetch_state( + origin, + room_id, + incoming_pdu.event_id(), + room_version, + recursion_level, + create_event_id, + ) .boxed() .await?; }