diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 21c954ab..86319d78 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -42,6 +42,7 @@ use tuwunel_core::{ }, pair_of, ref_at, result::FlatOk, + trace, utils::{ self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, future::{OptionStream, ReadyEqExt}, @@ -115,6 +116,7 @@ type PresenceUpdates = HashMap; skip_all, fields( user_id = %body.sender_user(), + since = body.body.since.as_deref().unwrap_or("0"), ) )] pub(crate) async fn sync_events_route( @@ -166,7 +168,12 @@ pub(crate) async fn sync_events_route( .clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max)); // Wait for activity notification. - _ = tokio::time::timeout(duration, watcher).await; + let timeout = tokio::time::timeout(duration, watcher).await; + trace!( + count = ?services.globals.pending_count(), + timedout = timeout.is_err(), + "waited for watchers" + ); // Wait for synchronization of activity. let next_batch = services.globals.wait_pending().await?; @@ -174,6 +181,7 @@ pub(crate) async fn sync_events_route( // Return an empty response when nothing has changed. if since == next_batch { + trace!(next_batch, "empty response"); return Ok(sync_events::v3::Response::new(next_batch.to_string())); } @@ -190,6 +198,7 @@ pub(crate) async fn sync_events_route( fields( %since, %next_batch, + count = ?services.globals.pending_count(), ) )] async fn build_sync_events( diff --git a/src/core/utils/two_phase_counter.rs b/src/core/utils/two_phase_counter.rs index 51888000..82ad34f9 100644 --- a/src/core/utils/two_phase_counter.rs +++ b/src/core/utils/two_phase_counter.rs @@ -2,8 +2,7 @@ use std::{ collections::VecDeque, - fmt::Debug, - ops::Deref, + ops::{Deref, Range}, sync::{Arc, RwLock}, }; @@ -23,14 +22,12 @@ use crate::{Result, checked, is_equal_to}; /// value, but that value has no Pdu found because its write has not been /// completed with global visibility. Client-sync will then move on to the next /// counter value having missed the data from the current one. -#[derive(Debug)] pub struct Counter Result + Sync> { /// Self is intended to be Arc with inner state mutable via Lock. inner: RwLock>, } /// Inner protected state for Two-Phase Counter. -#[derive(Debug)] pub struct State Result + Sync> { /// Monotonic counter. The next sequence number is drawn by adding one to /// this value. That number will be persisted and added to `pending`. @@ -50,7 +47,6 @@ pub struct State Result + Sync> { release: F, } -#[derive(Debug)] pub struct Permit Result + Sync> { /// Link back to the shared-state. state: Arc>, @@ -80,6 +76,17 @@ impl Result + Sync> Counter { Ok(Permit:: { state: self.clone(), retired, id }) } + /// Load the current and dispatched values simultaneously + #[inline] + pub fn range(&self) -> Range { + let inner = self.inner.read().expect("locked for reading"); + + Range { + start: inner.retired(), + end: inner.dispatched, + } + } + /// Load the highest sequence number safe for reading, also known as the /// retirement value with writes "globally visible." #[inline] diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index c32bc50f..fe9b3c0a 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{ops::Range, sync::Arc}; use tokio::sync::{watch, watch::Sender}; use tuwunel_core::{ @@ -53,7 +53,7 @@ impl Data { .wait_for(|retired| retired.ge(count)) .await .map(|retired| *retired) - .map_err(|e| err!("counter channel error {e:?}")) + .map_err(|e| err!(debug_error!("counter channel error {e:?}"))) } #[inline] @@ -66,6 +66,9 @@ impl Data { #[inline] pub fn current_count(&self) -> u64 { self.counter.current() } + #[inline] + pub fn pending_count(&self) -> Range { self.counter.range() } + fn handle_retire(sender: &Sender, count: u64) -> Result { let _prev = sender.send_replace(count); diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 7d934056..90cb50a4 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -3,6 +3,7 @@ mod data; use std::{ collections::HashMap, fmt::Write, + ops::Range, sync::{Arc, RwLock}, time::Instant, }; @@ -105,22 +106,40 @@ impl crate::Service for Service { impl Service { #[inline] - #[tracing::instrument(level = "trace", skip(self))] + #[tracing::instrument( + level = "trace", + skip_all, + ret, + fields(pending = ?self.pending_count()), + )] pub async fn wait_pending(&self) -> Result { self.db.wait_pending().await } #[inline] - #[tracing::instrument(level = "trace", skip(self))] + #[tracing::instrument( + level = "trace", + skip_all, + ret, + fields(pending = ?self.pending_count()), + )] pub async fn wait_count(&self, count: &u64) -> Result { self.db.wait_count(count).await } #[inline] #[must_use] - #[tracing::instrument(level = "debug", skip(self))] + #[tracing::instrument( + level = "debug", + skip_all, + fields(pending = ?self.pending_count()), + )] pub fn next_count(&self) -> data::Permit { self.db.next_count() } #[inline] #[must_use] pub fn current_count(&self) -> u64 { self.db.current_count() } + #[inline] + #[must_use] + pub fn pending_count(&self) -> Range { self.db.pending_count() } + #[inline] #[must_use] pub fn server_name(&self) -> &ServerName { self.server.name.as_ref() } diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 4b95b63a..d366da75 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -48,6 +48,7 @@ use crate::rooms::timeline::RawPduId; level = INFO_SPAN_LEVEL, skip_all, fields(%room_id, %event_id), + ret(Debug), )] pub async fn handle_incoming_pdu<'a>( &self, diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index c4102c7e..d16996cf 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -17,6 +17,7 @@ use crate::rooms::{ }; #[implement(super::Service)] +#[tracing::instrument(name = "upgrade", level = "debug", skip_all, ret(Debug))] pub(super) async fn upgrade_outlier_to_timeline_pdu( &self, incoming_pdu: PduEvent, diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index 0101d103..cc85fd10 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -93,6 +93,15 @@ impl crate::Service for Service { impl Service { /// Set the room to the given statehash and update caches. + #[tracing::instrument( + name = "force", + level = "debug", + skip_all, + fields( + count = ?self.services.globals.pending_count(), + %shortstatehash, + ) + )] pub async fn force_state( &self, room_id: &RoomId, @@ -172,7 +181,14 @@ impl Service { /// /// This adds all current state events (not including the incoming event) /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`. - #[tracing::instrument(skip(self, state_ids_compressed), level = "debug")] + #[tracing::instrument( + name = "set", + level = "debug", + skip(self, state_ids_compressed), + fields( + count = ?self.services.globals.pending_count(), + ) + )] pub async fn set_event_state( &self, event_id: &EventId, @@ -248,7 +264,14 @@ impl Service { /// /// This adds all current state events (not including the incoming event) /// to `stateid_pduid` and adds the incoming event to `eventid_statehash`. - #[tracing::instrument(skip(self, new_pdu), level = "debug")] + #[tracing::instrument( + name = "set", + level = "debug", + skip(self, new_pdu), + fields( + count = ?self.services.globals.pending_count(), + ) + )] pub async fn append_to_state(&self, new_pdu: &PduEvent) -> Result { const BUFSIZE: usize = size_of::(); @@ -332,7 +355,7 @@ impl Service { } } - #[tracing::instrument(skip_all, level = "debug")] + #[tracing::instrument(skip_all, level = "trace")] pub async fn summary_stripped(&self, event: &Pdu) -> Vec> where Pdu: Event, @@ -380,7 +403,11 @@ impl Service { } /// Returns the room's version. - #[tracing::instrument(skip(self), level = "debug")] + #[tracing::instrument( + level = "trace" + skip(self), + ret, + )] pub async fn get_room_version(&self, room_id: &RoomId) -> Result { self.services .state_accessor @@ -390,6 +417,11 @@ impl Service { .map_err(|e| err!(Request(NotFound("No create event found: {e:?}")))) } + #[tracing::instrument( + level = "trace" + skip(self), + ret, + )] pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result { self.db .roomid_shortstatehash @@ -411,6 +443,11 @@ impl Service { .ignore_err() } + #[tracing::instrument( + level = "debug" + skip_all, + fields(%room_id), + )] pub async fn set_forward_extremities<'a, I>( &'a self, room_id: &'a RoomId, @@ -434,7 +471,7 @@ impl Service { } /// This fetches auth events from the current state. - #[tracing::instrument(skip(self, content), level = "debug")] + #[tracing::instrument(skip(self, content), level = "trace")] pub async fn get_auth_events( &self, room_id: &RoomId, diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index e2335151..4d06d399 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -33,7 +33,12 @@ use crate::{appservice::NamespaceRegex, rooms::state_compressor::CompressedState /// Append the incoming event setting the state snapshot to the state from /// the server that sent the event. #[implement(super::Service)] -#[tracing::instrument(level = "debug", skip_all)] +#[tracing::instrument( + name = "append_incoming", + level = "debug", + skip_all, + ret(Debug) +)] pub async fn append_incoming_pdu<'a, Leafs>( &'a self, pdu: &'a PduEvent, @@ -81,7 +86,7 @@ where /// /// Returns pdu id #[implement(super::Service)] -#[tracing::instrument(level = "debug", skip_all)] +#[tracing::instrument(name = "append", level = "debug", skip_all, ret(Debug))] pub async fn append_pdu<'a, Leafs>( &'a self, pdu: &'a PduEvent, diff --git a/src/service/rooms/timeline/build.rs b/src/service/rooms/timeline/build.rs index a30807ec..ff097793 100644 --- a/src/service/rooms/timeline/build.rs +++ b/src/service/rooms/timeline/build.rs @@ -23,7 +23,7 @@ use super::RoomMutexGuard; /// takes a roomid_mutex_state, meaning that only this function is able to /// mutate the room state. #[implement(super::Service)] -#[tracing::instrument(skip(self, state_lock), level = "debug")] +#[tracing::instrument(skip(self, state_lock), level = "debug", ret)] pub async fn build_and_append_pdu( &self, pdu_builder: PduBuilder,