Update the since token to skip empty range from prior iteration.

Log and discard presence ping errors without preventing sync.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-05 23:35:31 +00:00
parent 00f11a9e8f
commit e106e50ed0

View File

@@ -49,6 +49,7 @@ use tuwunel_core::{
self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
future::{OptionStream, ReadyEqExt}, future::{OptionStream, ReadyEqExt},
math::ruma_from_u64, math::ruma_from_u64,
result::LogErr,
stream::{BroadbandExt, Tools, TryExpect, WidebandExt}, stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
}, },
warn, warn,
@@ -116,7 +117,6 @@ type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
skip_all, skip_all,
fields( fields(
user_id = %body.sender_user(), user_id = %body.sender_user(),
since = body.body.since.as_deref().unwrap_or("0"),
) )
)] )]
pub(crate) async fn sync_events_route( pub(crate) async fn sync_events_route(
@@ -130,10 +130,12 @@ pub(crate) async fn sync_events_route(
services services
.presence .presence
.ping_presence(sender_user, &body.body.set_presence) .ping_presence(sender_user, &body.body.set_presence)
.await?; .await
.log_err()
.ok();
} }
let since = body let mut since = body
.body .body
.since .since
.as_deref() .as_deref()
@@ -157,13 +159,11 @@ pub(crate) async fn sync_events_route(
.expect("configuration must limit maximum timeout"); .expect("configuration must limit maximum timeout");
loop { loop {
// Listen for activity
let watchers = services.sync.watch(sender_user, sender_device); let watchers = services.sync.watch(sender_user, sender_device);
let next_batch = services.globals.wait_pending().await?; let next_batch = services.globals.wait_pending().await?;
debug_assert!(since <= next_batch, "next_batch is monotonic"); debug_assert!(since <= next_batch, "next_batch is monotonic");
if since < next_batch || body.body.full_state { if since < next_batch || body.body.full_state {
// Check for activity
let response = build_sync_events(&services, &body, since, next_batch).await?; let response = build_sync_events(&services, &body, since, next_batch).await?;
let empty = response.rooms.is_empty() let empty = response.rooms.is_empty()
&& response.presence.is_empty() && response.presence.is_empty()
@@ -178,17 +178,19 @@ pub(crate) async fn sync_events_route(
// Wait for activity // Wait for activity
if time::timeout_at(stop_at, watchers).await.is_err() { if time::timeout_at(stop_at, watchers).await.is_err() {
trace!(next_batch, "empty response"); trace!(since, next_batch, "empty response");
return Ok(sync_events::v3::Response::new(next_batch.to_string())); return Ok(sync_events::v3::Response::new(next_batch.to_string()));
} }
trace!( trace!(
?since, since,
last_batch = ?next_batch, last_batch = ?next_batch,
count = ?services.globals.pending_count(), count = ?services.globals.pending_count(),
stop_at = ?stop_at, stop_at = ?stop_at,
"notified by watcher" "notified by watcher"
); );
since = next_batch;
} }
} }