diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 2b738f02..dd4062d1 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -809,7 +809,8 @@ async fn load_joined_room( .map(ToOwned::to_owned) .collect::>(); - let send_notification_counts = last_notification_read.is_none_or(|count| count > since); + let send_notification_counts = + last_notification_read.is_none_or(|last_count| last_count.gt(&since)); let notification_count: OptionFuture<_> = send_notification_counts .then(|| { diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index a4073841..78b9535e 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -87,7 +87,7 @@ pub(crate) async fn sync_events_v5_route( // Setup watchers, so if there's no response, we can wait for them let watcher = services.sync.watch(sender_user, sender_device); - let next_batch = services.globals.current_count(); + let next_batch = services.globals.wait_pending().await?; // Get sticky parameters from cache let mut cached = body.body.clone(); @@ -133,9 +133,9 @@ pub(crate) async fn sync_events_v5_route( let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &cached); - let account_data = collect_account_data(services, sync_info).map(Ok); + let account_data = collect_account_data(services, sync_info, next_batch).map(Ok); - let e2ee = collect_e2ee(services, sync_info, all_joined_rooms.clone()); + let e2ee = collect_e2ee(services, sync_info, next_batch, all_joined_rooms.clone()); let to_device = collect_to_device(services, sync_info, next_batch).map(Ok); @@ -489,15 +489,15 @@ where .insert(room_id.clone(), pack_receipts(Box::new(receipts.into_iter()))); } - if roomsince != &0 + if *roomsince != 0 && timeline_pdus.is_empty() + && receipt_size == 0 && response .extensions .account_data .rooms .get(room_id) .is_none_or(Vec::is_empty) - && receipt_size == 0 { continue; } @@ -514,7 +514,7 @@ where })) })? .or_else(|| { - if roomsince != &0 { + if *roomsince != 0 { Some(roomsince.to_string()) } else { None @@ -698,6 +698,7 @@ where async fn collect_account_data( services: &Services, (sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request), + next_batch: u64, ) -> sync_events::v5::response::AccountData { let mut account_data = sync_events::v5::response::AccountData { global: Vec::new(), @@ -715,7 +716,7 @@ async fn collect_account_data( account_data.global = services .account_data - .changes_since(None, sender_user, globalsince, None) + .changes_since(None, sender_user, globalsince, Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) .collect() .await; @@ -726,7 +727,7 @@ async fn collect_account_data( room.clone(), services .account_data - .changes_since(Some(room), sender_user, globalsince, None) + .changes_since(Some(room), sender_user, globalsince, Some(next_batch)) .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) .collect() .await, @@ -745,6 +746,7 @@ async fn collect_e2ee<'a, Rooms>( u64, &sync_events::v5::Request, ), + next_batch: u64, all_joined_rooms: Rooms, ) -> Result where @@ -760,7 +762,7 @@ where device_list_changes.extend( services .users - .keys_changed(sender_user, globalsince, None) + .keys_changed(sender_user, globalsince, Some(next_batch)) .map(ToOwned::to_owned) .collect::>() .await, @@ -906,7 +908,7 @@ where device_list_changes.extend( services .users - .room_keys_changed(room_id, globalsince, None) + .room_keys_changed(room_id, globalsince, Some(next_batch)) .map(|(user_id, _)| user_id) .map(ToOwned::to_owned) .collect::>()