From bd0c3e33e2a2b8a46ad1b95c586e71aeab2ca0d9 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 30 Jul 2025 05:27:12 +0000 Subject: [PATCH] Loop syncv3 for robust response without re-request overhead. Signed-off-by: Jason Volk --- src/api/client/sync/v3.rs | 89 +++++++++++++++++++++------------------ 1 file changed, 48 insertions(+), 41 deletions(-) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 86319d78..b1b4cb51 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -34,6 +34,7 @@ use ruma::{ serde::Raw, uint, }; +use tokio::time; use tuwunel_core::{ Result, at, err, error, extract_variant, is_equal_to, matrix::{ @@ -133,9 +134,6 @@ pub(crate) async fn sync_events_route( .await?; } - // Setup watchers, so if there's no response, we can wait for them - let watcher = services.sync.watch(sender_user, sender_device); - let next_batch = services.globals.current_count(); let since = body .body .since @@ -144,50 +142,59 @@ pub(crate) async fn sync_events_route( .flat_ok() .unwrap_or(0); - // Make an initial pass if there's a possibility of something to find. - if since < next_batch { - let response = build_sync_events(&services, &body, since, next_batch).await?; - if body.body.full_state - || !(response.rooms.is_empty() + let timeout = body + .body + .timeout + .as_ref() + .map(Duration::as_millis) + .map(TryInto::try_into) + .flat_ok() + .unwrap_or(services.config.client_sync_timeout_default) + .max(services.config.client_sync_timeout_min) + .min(services.config.client_sync_timeout_max); + + let stop_at = time::Instant::now() + .checked_add(Duration::from_millis(timeout)) + .expect("configuration must limit maximum timeout"); + + let mut next_batch = services.globals.current_count(); + loop { + // Listen for activity + let watchers = services.sync.watch(sender_user, sender_device); + if since < next_batch || body.body.full_state { + // Check for activity + let response = build_sync_events(&services, &body, since, next_batch).await?; + let empty = response.rooms.is_empty() && response.presence.is_empty() && response.account_data.is_empty() && response.device_lists.is_empty() - && response.to_device.is_empty()) - { - return Ok(response); + && response.to_device.is_empty(); + + if !empty || body.body.full_state { + return Ok(response); + } } + + // Wait for activity + if time::timeout_at(stop_at, watchers).await.is_err() { + break; + } + + trace!( + ?since, + last_batch = ?next_batch, + count = ?services.globals.pending_count(), + stop_at = ?stop_at, + "waited for watchers" + ); + + // Synchronize activity + next_batch = services.globals.wait_pending().await?; + debug_assert!(since <= next_batch, "next_batch is monotonic"); } - let timeout_default = services.config.client_sync_timeout_default; - let timeout_min = services.config.client_sync_timeout_min; - let timeout_max = services.config.client_sync_timeout_max; - let duration = body - .body - .timeout - .unwrap_or_else(|| Duration::from_millis(timeout_default)) - .clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max)); - - // Wait for activity notification. - let timeout = tokio::time::timeout(duration, watcher).await; - trace!( - count = ?services.globals.pending_count(), - timedout = timeout.is_err(), - "waited for watchers" - ); - - // Wait for synchronization of activity. - let next_batch = services.globals.wait_pending().await?; - debug_assert!(since <= next_batch, "next_batch is monotonic"); - - // Return an empty response when nothing has changed. - if since == next_batch { - trace!(next_batch, "empty response"); - return Ok(sync_events::v3::Response::new(next_batch.to_string())); - } - - build_sync_events(&services, &body, since, next_batch) - .await - .map_err(Into::into) + trace!(next_batch, "empty response"); + return Ok(sync_events::v3::Response::new(next_batch.to_string())); } #[tracing::instrument(