Fix partial v3 syncs on post-timeout pass; fix partial state on room join.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-27 05:44:10 +00:00
parent 2e0b156de0
commit 59b62b1453
4 changed files with 103 additions and 53 deletions

View File

@@ -115,7 +115,6 @@ type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
skip_all,
fields(
user_id = %body.sender_user(),
since = %body.body.since.as_deref().unwrap_or_default(),
)
)]
pub(crate) async fn sync_events_route(
@@ -134,21 +133,29 @@ pub(crate) async fn sync_events_route(
// Setup watchers, so if there's no response, we can wait for them
let watcher = services.sync.watch(sender_user, sender_device);
let next_batch = services.globals.current_count();
let response = build_sync_events(&services, &body, next_batch).await?;
if body.body.full_state
|| !(response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty())
{
return Ok(response);
let since = body
.body
.since
.as_deref()
.map(str::parse)
.flat_ok()
.unwrap_or(0);
// Make an initial pass if there's a possibility of something to find.
if since < next_batch {
let response = build_sync_events(&services, &body, since, next_batch).await?;
if body.body.full_state
|| !(response.rooms.is_empty()
&& response.presence.is_empty()
&& response.account_data.is_empty()
&& response.device_lists.is_empty()
&& response.to_device.is_empty())
{
return Ok(response);
}
}
// Hang a few seconds so requests are not spammed
// Stop hanging if new info arrives
let timeout_default = services.config.client_sync_timeout_default;
let timeout_min = services.config.client_sync_timeout_min;
let timeout_max = services.config.client_sync_timeout_max;
@@ -158,27 +165,41 @@ pub(crate) async fn sync_events_route(
.unwrap_or_else(|| Duration::from_millis(timeout_default))
.clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max));
// Wait for activity notification.
_ = tokio::time::timeout(duration, watcher).await;
// Retry returning data
let next_batch = services.globals.current_count();
build_sync_events(&services, &body, next_batch).await
// Wait for synchronization of activity.
let next_batch = services.globals.wait_pending().await?;
debug_assert!(since <= next_batch, "next_batch is monotonic");
// Return an empty response when nothing has changed.
if since == next_batch {
return Ok(sync_events::v3::Response::new(next_batch.to_string()));
}
build_sync_events(&services, &body, since, next_batch)
.await
.map_err(Into::into)
}
pub(crate) async fn build_sync_events(
#[tracing::instrument(
name = "build",
level = "debug",
ret(level = "trace"),
skip_all,
fields(
%since,
%next_batch,
)
)]
async fn build_sync_events(
services: &Services,
body: &Ruma<sync_events::v3::Request>,
since: u64,
next_batch: u64,
) -> Result<sync_events::v3::Response, RumaResponse<UiaaResponse>> {
) -> Result<sync_events::v3::Response> {
let (sender_user, sender_device) = body.sender();
let since = body
.body
.since
.as_ref()
.and_then(|string| string.parse().ok())
.unwrap_or(0);
let full_state = body.body.full_state;
let filter = match body.body.filter.as_ref() {
| None => FilterDefinition::default(),
@@ -364,7 +385,7 @@ pub(crate) async fn build_sync_events(
.collect()
.await;
let response = sync_events::v3::Response {
Ok(sync_events::v3::Response {
account_data: GlobalAccountData { events: account_data },
device_lists: DeviceLists {
changed: device_list_updates.into_iter().collect(),
@@ -390,9 +411,7 @@ pub(crate) async fn build_sync_events(
knock: knocked_rooms,
},
to_device: ToDevice { events: to_device_events },
};
Ok(response)
})
}
#[tracing::instrument(name = "presence", level = "debug", skip_all)]
@@ -670,6 +689,25 @@ async fn load_joined_room(
.boxed()
.await?;
// State was changed after the cutoff for this sync; similar to other handlers.
if current_shortstatehash > next_batch {
// Transfer the since_shortstatehash not the current over to the next sync.
if let Some(since_shortstatehash) = since_shortstatehash {
services
.rooms
.user
.associate_token_shortstatehash(room_id, next_batch, since_shortstatehash)
.await;
}
return Ok((JoinedRoom::default(), HashSet::new(), HashSet::new()));
}
let associate_token = services
.rooms
.user
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash);
let initial = since_shortstatehash.is_none() || since == 0;
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter
@@ -739,9 +777,15 @@ async fn load_joined_room(
.read_receipt
.last_privateread_update(sender_user, room_id);
let (last_privateread_update, last_notification_read, since_sender_member, witness) =
join4(last_privateread_update, last_notification_read, since_sender_member, witness)
.await;
let (last_privateread_update, last_notification_read, since_sender_member, witness, ()) =
join5(
last_privateread_update,
last_notification_read,
since_sender_member,
witness,
associate_token,
)
.await;
let joined_since_last_sync =
since_sender_member
@@ -888,14 +932,6 @@ async fn load_joined_room(
.chain(private_read_event.flatten().into_iter())
.collect();
// Save the state after this sync so we can send the correct state diff next
// sync
services
.rooms
.user
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash)
.await;
let joined_room = JoinedRoom {
account_data: RoomAccountData { events: account_data_events },
summary: RoomSummary {