From a0b98fa575ffe198c257e4f4b8edff4225b79688 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 19 Jan 2026 05:02:55 +0000 Subject: [PATCH] Misc debug and trace log tweaks. Signed-off-by: Jason Volk --- src/api/server/send.rs | 52 +++++++++++----- src/core/matrix/state_res/resolve.rs | 15 ++++- .../state_res/resolve/conflicted_subgraph.rs | 8 ++- src/service/rooms/event_handler/fetch_auth.rs | 14 ++++- src/service/rooms/event_handler/fetch_prev.rs | 2 +- .../event_handler/handle_incoming_pdu.rs | 19 ++++-- .../rooms/event_handler/handle_outlier_pdu.rs | 2 + .../rooms/event_handler/state_at_incoming.rs | 40 +++++++++++-- .../event_handler/upgrade_outlier_pdu.rs | 60 +++++++++++++------ 9 files changed, 163 insertions(+), 49 deletions(-) diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 7a8cf153..8ba5a1d5 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -28,7 +28,9 @@ use tuwunel_core::{ result::LogErr, trace, utils::{ - IterStream, ReadyExt, millis_since_unix_epoch, + IterStream, ReadyExt, + debug::str_truncated, + millis_since_unix_epoch, stream::{BroadbandExt, TryBroadbandExt, automatic_width}, }, warn, @@ -52,7 +54,8 @@ type Pdu = (OwnedRoomId, OwnedEventId, CanonicalJsonObject); skip_all, fields( %client, - origin = body.origin().as_str() + origin = body.origin().as_str(), + txn = str_truncated(body.transaction_id.as_str(), 20), ), )] pub(crate) async fn send_transaction_message_route( @@ -83,8 +86,6 @@ pub(crate) async fn send_transaction_message_route( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, - origin =?body.origin(), "Starting txn", ); @@ -104,16 +105,20 @@ pub(crate) async fn send_transaction_message_route( .filter_map(Result::ok) .stream(); + trace!( + elapsed = ?txn_start_time.elapsed(), + "Parsed txn", + ); + let results = handle(&services, &client, body.origin(), txn_start_time, pdus, edus).await?; debug!( pdus = body.pdus.len(), edus = body.edus.len(), elapsed = ?txn_start_time.elapsed(), - id = ?body.transaction_id, - origin =?body.origin(), "Finished txn", ); + for (id, result) in &results { if let Err(e) = result && matches!(e, Error::BadRequest(ErrorKind::NotFound, _)) @@ -140,11 +145,12 @@ async fn handle( ) -> Result { // group pdus by room let pdus = pdus + .enumerate() .collect() .map(|mut pdus: Vec<_>| { - pdus.sort_by(|(room_a, ..), (room_b, ..)| room_a.cmp(room_b)); + pdus.sort_by(|(_, (room_a, ..)), (_, (room_b, ..))| room_a.cmp(room_b)); pdus.into_iter() - .into_grouping_map_by(|(room_id, ..)| room_id.clone()) + .into_grouping_map_by(|(_, (room_id, ..))| room_id.clone()) .collect() }) .await; @@ -164,7 +170,10 @@ async fn handle( .await?; // evaluate edus after pdus, at least for now. - edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) + edus.enumerate() + .for_each_concurrent(automatic_width(), |(i, edu)| { + handle_edu(services, client, origin, i, edu) + }) .await; Ok(results) @@ -176,7 +185,7 @@ async fn handle_room( origin: &ServerName, txn_start_time: &Instant, ref room_id: OwnedRoomId, - pdus: impl Iterator + Send, + pdus: impl Iterator + Send, ) -> Result { let _room_lock = services .event_handler @@ -184,8 +193,9 @@ async fn handle_room( .lock(room_id) .await; - pdus.try_stream() - .and_then(async |(room_id, event_id, value)| { + pdus.enumerate() + .try_stream() + .and_then(async |(ri, (ti, (room_id, event_id, value)))| { services.server.check_running()?; let pdu_start_time = Instant::now(); let result = services @@ -195,9 +205,13 @@ async fn handle_room( .await; debug!( + %event_id, + %room_id, + ri, + ti, pdu_elapsed = ?pdu_start_time.elapsed(), txn_elapsed = ?txn_start_time.elapsed(), - "Finished PDU {event_id}", + "Finished PDU", ); Ok((event_id, result)) @@ -207,7 +221,13 @@ async fn handle_room( .await } -async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, edu: Edu) { +async fn handle_edu( + services: &Services, + client: &IpAddr, + origin: &ServerName, + i: usize, + edu: Edu, +) { match edu { | Edu::Presence(presence) if services.server.config.allow_incoming_presence => handle_edu_presence(services, client, origin, presence).await, @@ -231,9 +251,9 @@ async fn handle_edu(services: &Services, client: &IpAddr, origin: &ServerName, e | Edu::SigningKeyUpdate(content) => handle_edu_signing_key_update(services, client, origin, content).await, - | Edu::_Custom(ref _custom) => debug_warn!(?edu, "received custom/unknown EDU"), + | Edu::_Custom(ref _custom) => debug_warn!(?i, ?edu, "received custom/unknown EDU"), - | _ => trace!(?edu, "skipped"), + | _ => trace!(?i, ?edu, "skipped"), } } diff --git a/src/core/matrix/state_res/resolve.rs b/src/core/matrix/state_res/resolve.rs index 48ec0395..a7003653 100644 --- a/src/core/matrix/state_res/resolve.rs +++ b/src/core/matrix/state_res/resolve.rs @@ -92,8 +92,19 @@ where // Split the unconflicted state map and the conflicted state set. let (unconflicted_state, conflicted_states) = split_conflicted_state(state_maps).await; - trace!(?unconflicted_state, unconflicted = unconflicted_state.len(), "unresolved state"); - debug!(?conflicted_states, conflicted = conflicted_states.len(), "unresolved states"); + debug!( + unconflicted = unconflicted_state.len(), + conflicted = conflicted_states.len(), + "unresolved states", + ); + + trace!( + ?unconflicted_state, + ?conflicted_states, + unconflicted = unconflicted_state.len(), + conflicted = conflicted_states.len(), + "unresolved states", + ); if conflicted_states.is_empty() { return Ok(unconflicted_state.into_iter().collect()); diff --git a/src/core/matrix/state_res/resolve/conflicted_subgraph.rs b/src/core/matrix/state_res/resolve/conflicted_subgraph.rs index f873bc02..41f051f9 100644 --- a/src/core/matrix/state_res/resolve/conflicted_subgraph.rs +++ b/src/core/matrix/state_res/resolve/conflicted_subgraph.rs @@ -8,7 +8,7 @@ use futures::{Future, FutureExt, Stream, StreamExt}; use ruma::OwnedEventId; use crate::{ - Result, + Result, debug, matrix::Event, utils::stream::{IterStream, automatic_width}, }; @@ -52,6 +52,12 @@ where .await; let mut state = state.subgraph.lock().expect("locked"); + debug!( + input_event = conflicted_event_ids.len(), + output_events = state.len(), + "conflicted subgraph state" + ); + take(&mut *state) }) .map(Set::into_iter) diff --git a/src/service/rooms/event_handler/fetch_auth.rs b/src/service/rooms/event_handler/fetch_auth.rs index a3b496ca..298f75f4 100644 --- a/src/service/rooms/event_handler/fetch_auth.rs +++ b/src/service/rooms/event_handler/fetch_auth.rs @@ -27,6 +27,11 @@ use tuwunel_core::{ /// c. Ask origin server over federation /// d. TODO: Ask other servers over federation? #[implement(super::Service)] +#[tracing::instrument( + level = "debug", + skip_all, + fields(%origin), +)] pub(super) async fn fetch_auth<'a, Events>( &self, origin: &ServerName, @@ -96,6 +101,12 @@ where } #[implement(super::Service)] +#[tracing::instrument( + name = "chain", + level = "trace", + skip_all, + fields(%event_id), +)] async fn fetch_auth_chain( &self, origin: &ServerName, @@ -127,7 +138,7 @@ async fn fetch_auth_chain( start: Duration::from_secs(2 * 60), end: Duration::from_secs(60 * 60 * 8), }) { - debug_warn!("Backing off from {next_id}"); + debug_warn!("Backed off from {next_id}"); continue; } @@ -144,6 +155,7 @@ async fn fetch_auth_chain( .await .inspect_err(|e| debug_error!("Failed to fetch event {next_id}: {e}")) else { + debug_warn!("Backing off from {next_id}"); self.back_off(&next_id); continue; }; diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index a1f9d88b..11cc6db3 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -18,7 +18,7 @@ use super::check_room_id; #[implement(super::Service)] #[tracing::instrument( - level = "debug", + level = "debug", skip_all, fields(%origin), )] diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 804b3010..a1fcf6bb 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -45,7 +45,7 @@ use crate::rooms::timeline::RawPduId; level = INFO_SPAN_LEVEL, skip_all, fields(%room_id, %event_id), - ret(Debug), + ret(level = "debug"), )] pub async fn handle_incoming_pdu<'a>( &'a self, @@ -57,7 +57,7 @@ pub async fn handle_incoming_pdu<'a>( ) -> Result> { // 1. Skip the PDU if we already have it as a timeline event if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await { - trace!(?event_id, "exists"); + debug!(?pdu_id, "Exists."); return Ok(Some((pdu_id, false))); } @@ -116,6 +116,10 @@ pub async fn handle_incoming_pdu<'a>( // 8. if not timeline event: stop if !is_timeline_event { + debug!( + kind = ?incoming_pdu.event_type(), + "Not a timeline event.", + ); return Ok(None); } @@ -128,6 +132,11 @@ pub async fn handle_incoming_pdu<'a>( .origin_server_ts(); if incoming_pdu.origin_server_ts() < first_ts_in_room { + debug!( + origin_server_ts = ?incoming_pdu.origin_server_ts(), + ?first_ts_in_room, + "Skipping old event." + ); return Ok(None); } @@ -137,11 +146,11 @@ pub async fn handle_incoming_pdu<'a>( .fetch_prev(origin, room_id, incoming_pdu.prev_events(), &room_version, first_ts_in_room) .await?; - debug!( - events = ?sorted_prev_events, + trace!( + events = sorted_prev_events.len(), + event_ids = ?sorted_prev_events, "Handling previous events" ); - sorted_prev_events .iter() .try_stream() diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 8e47e3b7..5a4d42b0 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -23,6 +23,8 @@ pub(super) async fn handle_outlier_pdu( room_version: &RoomVersionId, auth_events_known: bool, ) -> Result<(PduEvent, CanonicalJsonObject)> { + debug!(?event_id, ?auth_events_known, "handle outlier"); + // 1. Remove unsigned field pdu_json.remove("unsigned"); diff --git a/src/service/rooms/event_handler/state_at_incoming.rs b/src/service/rooms/event_handler/state_at_incoming.rs index 9d233a00..a94f0c18 100644 --- a/src/service/rooms/event_handler/state_at_incoming.rs +++ b/src/service/rooms/event_handler/state_at_incoming.rs @@ -6,7 +6,7 @@ use std::{ use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join}; use ruma::{OwnedEventId, RoomId, RoomVersionId}; use tuwunel_core::{ - Result, apply, err, implement, + Result, apply, debug, debug_warn, err, implement, matrix::{Event, StateMap, state_res::AuthSet}, ref_at, trace, utils::{ @@ -42,11 +42,14 @@ where .services .state .pdu_shortstatehash(prev_event_id) + .inspect_err(|e| debug_warn!(?prev_event_id, "Missing state at prev_event: {e}")) .await else { return Ok(None); }; + debug!(?prev_event_id, ?prev_event_sstatehash, "Resolving state at prev_event."); + let prev_event = self .services .timeline @@ -62,6 +65,13 @@ where let (prev_event, mut state) = try_join(prev_event, state).await?; + debug!( + ?prev_event_id, + ?prev_event_sstatehash, + state_ids = state.len(), + "Resolved state at prev_event.", + ); + if let Some(state_key) = prev_event.state_key() { let prev_event_type = prev_event.event_type().to_cow_str().into(); @@ -73,6 +83,14 @@ where state.insert(shortstatekey, prev_event.event_id().into()); // Now it's the state after the pdu + debug!( + ?prev_event_id, + ?prev_event_type, + ?prev_event_sstatehash, + ?shortstatekey, + state_ids = state.len(), + "Added prev_event to state.", + ); } debug_assert!(!state.is_empty(), "should be returning None for empty HashMap result"); @@ -101,14 +119,16 @@ where .prev_events() .try_stream() .broad_and_then(|prev_event_id| { - let prev_event = self.services.timeline.get_pdu(prev_event_id); - let sstatehash = self .services .state .pdu_shortstatehash(prev_event_id); - try_join(sstatehash, prev_event) + let prev_event = self.services.timeline.get_pdu(prev_event_id); + + try_join(sstatehash, prev_event).inspect_err(move |e| { + debug_warn!(?prev_event_id, "Missing state at prev_event: {e}"); + }) }) .try_collect::>() .await @@ -133,12 +153,12 @@ where trace!("Resolving state"); let Ok(new_state) = self .state_resolution(room_version_id, fork_states, auth_chain_sets) + .inspect_ok(|_| trace!("State resolution done.")) .await else { return Ok(None); }; - trace!("State resolution done."); new_state .into_iter() .stream() @@ -149,7 +169,8 @@ where .map(move |shortstatekey| (shortstatekey, event_id)) .await }) - .collect() + .collect::>() + .inspect(|state| trace!(state = state.len(), "Created shortstatekeys.")) .map(Some) .map(Ok) .await @@ -192,6 +213,13 @@ where .collect() .await; + trace!( + prev_event = ?prev_event.event_id(), + ?sstatehash, + leaf_states = leaf_state_after_event.len(), + "leaf state after event" + ); + let starting_events = leaf_state_after_event .iter() .map(ref_at!(1)) diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index dbc4d840..46faca11 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -19,7 +19,12 @@ use crate::rooms::{ }; #[implement(super::Service)] -#[tracing::instrument(name = "upgrade", level = "debug", skip_all, ret(Debug))] +#[tracing::instrument( + name = "upgrade", + level = "debug", + skip_all, + ret(level = "debug") +)] pub(super) async fn upgrade_outlier_to_timeline_pdu( &self, origin: &ServerName, @@ -30,13 +35,14 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( create_event_id: &EventId, ) -> Result> { // Skip the PDU if we already have it as a timeline event - if let Ok(pduid) = self + if let Ok(pdu_id) = self .services .timeline .get_pdu_id(incoming_pdu.event_id()) .await { - return Ok(Some((pduid, false))); + debug!(?pdu_id, "Exists."); + return Ok(Some((pdu_id, false))); } if self @@ -48,17 +54,19 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( return Err!(Request(InvalidParam("Event has been soft failed"))); } - debug!("Upgrading to timeline pdu"); + trace!("Upgrading to timeline pdu"); + let timer = Instant::now(); let room_rules = room_version::rules(room_version)?; + trace!(format = ?room_rules.event_format, "Checking format"); state_res::check_pdu_format(&val, &room_rules.event_format)?; // 10. Fetch missing state and auth chain events by calling /state_ids at // backwards extremities doing all the checks in this list starting at 1. // These are not timeline events. + trace!("Resolving state at event"); - debug!("Resolving state at event"); let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 { self.state_at_incoming_degree_one(&incoming_pdu) .await? @@ -78,8 +86,8 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( let state_at_incoming_event = state_at_incoming_event.expect("we always set this to some above"); - debug!("Performing auth check"); // 11. Check the auth of the event passes based on the state of the event + let state_fetch = async |k: StateEventType, s: StateKey| { let shortstatekey = self .services @@ -99,9 +107,11 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( }; let event_fetch = async |event_id: OwnedEventId| self.event_fetch(&event_id).await; + + trace!("Performing auth check"); state_res::auth_check(&room_rules, &incoming_pdu, &event_fetch, &state_fetch).await?; - debug!("Gathering auth events"); + trace!("Gathering auth events"); let auth_events = self .services .state @@ -123,10 +133,11 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .ok_or_else(|| err!(Request(NotFound("state event not found")))) }; + trace!("Performing auth check"); state_res::auth_check(&room_rules, &incoming_pdu, &event_fetch, &state_fetch).await?; // Soft fail check before doing state res - debug!("Performing soft-fail check"); + trace!("Performing soft-fail check"); let soft_fail = match incoming_pdu.redacts_id(room_version) { | None => false, | Some(redact_id) => @@ -138,7 +149,6 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( }; // 13. Use state resolution to find new room state - // We start looking at current room state now, so lets lock the room trace!("Locking the room"); let state_lock = self.services.state.mutex.lock(room_id).await; @@ -170,11 +180,12 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .await; debug!( - "Retained {} extremities checked against {} prev_events", - extremities.len(), - incoming_pdu.prev_events().count() + retained = extremities.len(), + prev_events = incoming_pdu.prev_events().count(), + "Retained extremities checked against prev_events.", ); + trace!("Compressing state..."); let state_ids_compressed: Arc = self .services .state_compressor @@ -188,34 +199,49 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .await; if incoming_pdu.state_key().is_some() { - debug!("Event is a state-event. Deriving new room state"); - // We also add state after incoming event to the fork states let mut state_after = state_at_incoming_event.clone(); if let Some(state_key) = incoming_pdu.state_key() { + let event_id = incoming_pdu.event_id(); + let event_type = incoming_pdu.kind(); let shortstatekey = self .services .short - .get_or_create_shortstatekey(&incoming_pdu.kind().to_string().into(), state_key) + .get_or_create_shortstatekey(&event_type.to_string().into(), state_key) .await; - let event_id = incoming_pdu.event_id(); state_after.insert(shortstatekey, event_id.to_owned()); + // Now it's the state after the event. + debug!( + ?event_id, + ?event_type, + ?state_key, + ?shortstatekey, + state_after = state_after.len(), + "Adding event to state." + ); } + trace!("Resolving new room state."); let new_room_state = self .resolve_state(room_id, room_version, state_after) .boxed() .await?; // Set the new room state to the resolved state - debug!("Forcing new room state"); + trace!("Saving resolved state."); let HashSetCompressStateEvent { shortstatehash, added, removed } = self .services .state_compressor .save_state(room_id, new_room_state) .await?; + debug!( + ?shortstatehash, + added = added.len(), + removed = removed.len(), + "Forcing new room state." + ); self.services .state .force_state(room_id, shortstatehash, added, removed, &state_lock)