diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index e16d0c6b..96628c95 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -49,6 +49,7 @@ use tuwunel_core::{ self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, future::{OptionStream, ReadyEqExt}, math::ruma_from_u64, + result::LogErr, stream::{BroadbandExt, Tools, TryExpect, WidebandExt}, }, warn, @@ -116,7 +117,6 @@ type PresenceUpdates = HashMap; skip_all, fields( user_id = %body.sender_user(), - since = body.body.since.as_deref().unwrap_or("0"), ) )] pub(crate) async fn sync_events_route( @@ -130,10 +130,12 @@ pub(crate) async fn sync_events_route( services .presence .ping_presence(sender_user, &body.body.set_presence) - .await?; + .await + .log_err() + .ok(); } - let since = body + let mut since = body .body .since .as_deref() @@ -157,13 +159,11 @@ pub(crate) async fn sync_events_route( .expect("configuration must limit maximum timeout"); loop { - // Listen for activity let watchers = services.sync.watch(sender_user, sender_device); let next_batch = services.globals.wait_pending().await?; debug_assert!(since <= next_batch, "next_batch is monotonic"); if since < next_batch || body.body.full_state { - // Check for activity let response = build_sync_events(&services, &body, since, next_batch).await?; let empty = response.rooms.is_empty() && response.presence.is_empty() @@ -178,17 +178,19 @@ pub(crate) async fn sync_events_route( // Wait for activity if time::timeout_at(stop_at, watchers).await.is_err() { - trace!(next_batch, "empty response"); + trace!(since, next_batch, "empty response"); return Ok(sync_events::v3::Response::new(next_batch.to_string())); } trace!( - ?since, + since, last_batch = ?next_batch, count = ?services.globals.pending_count(), stop_at = ?stop_at, "notified by watcher" ); + + since = next_batch; } }