Additional logging/tracing tweaks around sliding-sync.

Additional spans around receipt service interface.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-10-22 20:30:43 +00:00
parent 050a1a350a
commit 9cd175b125
7 changed files with 25 additions and 24 deletions

View File

@@ -23,7 +23,7 @@ use ruma::{
}; };
use tokio::time::{Instant, timeout_at}; use tokio::time::{Instant, timeout_at};
use tuwunel_core::{ use tuwunel_core::{
Err, Result, apply, at, Err, Result, apply, at, debug,
debug::INFO_SPAN_LEVEL, debug::INFO_SPAN_LEVEL,
err, err,
error::inspect_log, error::inspect_log,
@@ -53,7 +53,6 @@ struct SyncInfo<'a> {
services: &'a Services, services: &'a Services,
sender_user: &'a UserId, sender_user: &'a UserId,
sender_device: &'a DeviceId, sender_device: &'a DeviceId,
request: &'a Request,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -152,13 +151,6 @@ pub(crate) async fn sync_events_v5_route(
conn.update_cache(request); conn.update_cache(request);
conn.update_rooms_prologue(advancing); conn.update_rooms_prologue(advancing);
let sync_info = SyncInfo {
services,
sender_user,
sender_device,
request,
};
let mut response = Response { let mut response = Response {
txn_id: request.txn_id.clone(), txn_id: request.txn_id.clone(),
lists: Default::default(), lists: Default::default(),
@@ -167,6 +159,7 @@ pub(crate) async fn sync_events_v5_route(
extensions: Default::default(), extensions: Default::default(),
}; };
let sync_info = SyncInfo { services, sender_user, sender_device };
loop { loop {
debug_assert!( debug_assert!(
conn.globalsince <= conn.next_batch, conn.globalsince <= conn.next_batch,
@@ -203,15 +196,15 @@ pub(crate) async fn sync_events_v5_route(
if timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() { if timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
response.pos = conn.next_batch.to_string().into(); response.pos = conn.next_batch.to_string().into();
trace!(conn.globalsince, conn.next_batch, "timeout; empty response"); trace!(conn.globalsince, conn.next_batch, "timeout; empty response {response:?}");
return Ok(response); return Ok(response);
} }
trace!( debug!(
conn.globalsince, ?timeout,
last_batch = ?conn.next_batch, last_since = conn.globalsince,
count = ?services.globals.pending_count(), last_batch = conn.next_batch,
stop_at = ?stop_at, pend_count = ?services.globals.pending_count(),
"notified by watcher" "notified by watcher"
); );

View File

@@ -94,7 +94,7 @@ pub(super) async fn collect(
}) })
} }
#[tracing::instrument(level = "trace", skip_all, fields(room_id))] #[tracing::instrument(level = "trace", skip_all, fields(room_id), ret)]
async fn collect_room( async fn collect_room(
SyncInfo { services, sender_user, .. }: SyncInfo<'_>, SyncInfo { services, sender_user, .. }: SyncInfo<'_>,
conn: &Connection, conn: &Connection,

View File

@@ -44,7 +44,7 @@ pub(super) async fn collect(
Ok(response::Receipts { rooms }) Ok(response::Receipts { rooms })
} }
#[tracing::instrument(level = "trace", skip_all, fields(room_id))] #[tracing::instrument(level = "trace", skip_all, fields(room_id), ret)]
async fn collect_room( async fn collect_room(
SyncInfo { services, sender_user, .. }: SyncInfo<'_>, SyncInfo { services, sender_user, .. }: SyncInfo<'_>,
conn: &Connection, conn: &Connection,

View File

@@ -29,9 +29,8 @@ pub(super) async fn selector(
) -> (Window, ResponseLists) { ) -> (Window, ResponseLists) {
use MembershipState::*; use MembershipState::*;
let SyncInfo { services, sender_user, request, .. } = sync_info; let SyncInfo { services, sender_user, .. } = sync_info;
trace!(?request);
let mut rooms = services let mut rooms = services
.state_cache .state_cache
.user_memberships(sender_user, Some(&[Join, Invite, Knock])) .user_memberships(sender_user, Some(&[Join, Invite, Knock]))
@@ -64,7 +63,12 @@ pub(super) async fn selector(
(window, lists) (window, lists)
} }
#[tracing::instrument(name = "window", level = "debug", skip_all)] #[tracing::instrument(
name = "window",
level = "debug",
skip_all,
fields(rooms = rooms.clone().count())
)]
async fn select_window<'a, Rooms>( async fn select_window<'a, Rooms>(
sync_info: SyncInfo<'_>, sync_info: SyncInfo<'_>,
conn: &Connection, conn: &Connection,

View File

@@ -4,7 +4,7 @@ use tuwunel_core::{self, Result};
use super::{Connection, SyncInfo}; use super::{Connection, SyncInfo};
#[tracing::instrument(name = "to_device", level = "trace", skip_all)] #[tracing::instrument(name = "to_device", level = "trace", skip_all, ret)]
pub(super) async fn collect( pub(super) async fn collect(
SyncInfo { services, sender_user, sender_device, .. }: SyncInfo<'_>, SyncInfo { services, sender_user, sender_device, .. }: SyncInfo<'_>,
conn: &Connection, conn: &Connection,

View File

@@ -13,7 +13,7 @@ use tuwunel_core::{
use super::{Connection, SyncInfo, Window, extension_rooms_selector}; use super::{Connection, SyncInfo, Window, extension_rooms_selector};
#[tracing::instrument(name = "typing", level = "trace", skip_all)] #[tracing::instrument(name = "typing", level = "trace", skip_all, ret)]
pub(super) async fn collect( pub(super) async fn collect(
sync_info: SyncInfo<'_>, sync_info: SyncInfo<'_>,
conn: &Connection, conn: &Connection,

View File

@@ -40,6 +40,7 @@ impl crate::Service for Service {
impl Service { impl Service {
/// Replaces the previous read receipt. /// Replaces the previous read receipt.
#[tracing::instrument(skip(self), level = "debug", name = "set_receipt")]
pub async fn readreceipt_update( pub async fn readreceipt_update(
&self, &self,
user_id: &UserId, user_id: &UserId,
@@ -58,6 +59,7 @@ impl Service {
} }
/// Gets the latest private read receipt from the user in the room /// Gets the latest private read receipt from the user in the room
#[tracing::instrument(skip(self), level = "debug", name = "get_private")]
pub async fn private_read_get( pub async fn private_read_get(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
@@ -122,13 +124,13 @@ impl Service {
} }
/// Sets a private read marker at PDU `count`. /// Sets a private read marker at PDU `count`.
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "debug", name = "set_private")]
pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) { pub fn private_read_set(&self, room_id: &RoomId, user_id: &UserId, count: u64) {
self.db.private_read_set(room_id, user_id, count); self.db.private_read_set(room_id, user_id, count);
} }
/// Returns the private read marker PDU count. /// Returns the private read marker PDU count.
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "debug", name = "get_private_count", ret)]
pub async fn private_read_get_count( pub async fn private_read_get_count(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
@@ -140,12 +142,14 @@ impl Service {
} }
/// Returns the PDU count of the last typing update in this room. /// Returns the PDU count of the last typing update in this room.
#[tracing::instrument(skip(self), level = "debug", name = "get_private_last", ret)]
pub async fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> u64 { pub async fn last_privateread_update(&self, user_id: &UserId, room_id: &RoomId) -> u64 {
self.db self.db
.last_privateread_update(user_id, room_id) .last_privateread_update(user_id, room_id)
.await .await
} }
#[tracing::instrument(skip(self), level = "debug", name = "get_receipt_last", ret)]
pub async fn last_receipt_count( pub async fn last_receipt_count(
&self, &self,
room_id: &RoomId, room_id: &RoomId,