Add missing upper-bounded calls; improve snake-sync windowing.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -809,7 +809,8 @@ async fn load_joined_room(
|
|||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let send_notification_counts = last_notification_read.is_none_or(|count| count > since);
|
let send_notification_counts =
|
||||||
|
last_notification_read.is_none_or(|last_count| last_count.gt(&since));
|
||||||
|
|
||||||
let notification_count: OptionFuture<_> = send_notification_counts
|
let notification_count: OptionFuture<_> = send_notification_counts
|
||||||
.then(|| {
|
.then(|| {
|
||||||
|
|||||||
@@ -87,7 +87,7 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
// Setup watchers, so if there's no response, we can wait for them
|
// Setup watchers, so if there's no response, we can wait for them
|
||||||
let watcher = services.sync.watch(sender_user, sender_device);
|
let watcher = services.sync.watch(sender_user, sender_device);
|
||||||
|
|
||||||
let next_batch = services.globals.current_count();
|
let next_batch = services.globals.wait_pending().await?;
|
||||||
|
|
||||||
// Get sticky parameters from cache
|
// Get sticky parameters from cache
|
||||||
let mut cached = body.body.clone();
|
let mut cached = body.body.clone();
|
||||||
@@ -133,9 +133,9 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
|
|
||||||
let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &cached);
|
let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &cached);
|
||||||
|
|
||||||
let account_data = collect_account_data(services, sync_info).map(Ok);
|
let account_data = collect_account_data(services, sync_info, next_batch).map(Ok);
|
||||||
|
|
||||||
let e2ee = collect_e2ee(services, sync_info, all_joined_rooms.clone());
|
let e2ee = collect_e2ee(services, sync_info, next_batch, all_joined_rooms.clone());
|
||||||
|
|
||||||
let to_device = collect_to_device(services, sync_info, next_batch).map(Ok);
|
let to_device = collect_to_device(services, sync_info, next_batch).map(Ok);
|
||||||
|
|
||||||
@@ -489,15 +489,15 @@ where
|
|||||||
.insert(room_id.clone(), pack_receipts(Box::new(receipts.into_iter())));
|
.insert(room_id.clone(), pack_receipts(Box::new(receipts.into_iter())));
|
||||||
}
|
}
|
||||||
|
|
||||||
if roomsince != &0
|
if *roomsince != 0
|
||||||
&& timeline_pdus.is_empty()
|
&& timeline_pdus.is_empty()
|
||||||
|
&& receipt_size == 0
|
||||||
&& response
|
&& response
|
||||||
.extensions
|
.extensions
|
||||||
.account_data
|
.account_data
|
||||||
.rooms
|
.rooms
|
||||||
.get(room_id)
|
.get(room_id)
|
||||||
.is_none_or(Vec::is_empty)
|
.is_none_or(Vec::is_empty)
|
||||||
&& receipt_size == 0
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -514,7 +514,7 @@ where
|
|||||||
}))
|
}))
|
||||||
})?
|
})?
|
||||||
.or_else(|| {
|
.or_else(|| {
|
||||||
if roomsince != &0 {
|
if *roomsince != 0 {
|
||||||
Some(roomsince.to_string())
|
Some(roomsince.to_string())
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
@@ -698,6 +698,7 @@ where
|
|||||||
async fn collect_account_data(
|
async fn collect_account_data(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
(sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request),
|
(sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request),
|
||||||
|
next_batch: u64,
|
||||||
) -> sync_events::v5::response::AccountData {
|
) -> sync_events::v5::response::AccountData {
|
||||||
let mut account_data = sync_events::v5::response::AccountData {
|
let mut account_data = sync_events::v5::response::AccountData {
|
||||||
global: Vec::new(),
|
global: Vec::new(),
|
||||||
@@ -715,7 +716,7 @@ async fn collect_account_data(
|
|||||||
|
|
||||||
account_data.global = services
|
account_data.global = services
|
||||||
.account_data
|
.account_data
|
||||||
.changes_since(None, sender_user, globalsince, None)
|
.changes_since(None, sender_user, globalsince, Some(next_batch))
|
||||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
|
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
@@ -726,7 +727,7 @@ async fn collect_account_data(
|
|||||||
room.clone(),
|
room.clone(),
|
||||||
services
|
services
|
||||||
.account_data
|
.account_data
|
||||||
.changes_since(Some(room), sender_user, globalsince, None)
|
.changes_since(Some(room), sender_user, globalsince, Some(next_batch))
|
||||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
|
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
|
||||||
.collect()
|
.collect()
|
||||||
.await,
|
.await,
|
||||||
@@ -745,6 +746,7 @@ async fn collect_e2ee<'a, Rooms>(
|
|||||||
u64,
|
u64,
|
||||||
&sync_events::v5::Request,
|
&sync_events::v5::Request,
|
||||||
),
|
),
|
||||||
|
next_batch: u64,
|
||||||
all_joined_rooms: Rooms,
|
all_joined_rooms: Rooms,
|
||||||
) -> Result<sync_events::v5::response::E2EE>
|
) -> Result<sync_events::v5::response::E2EE>
|
||||||
where
|
where
|
||||||
@@ -760,7 +762,7 @@ where
|
|||||||
device_list_changes.extend(
|
device_list_changes.extend(
|
||||||
services
|
services
|
||||||
.users
|
.users
|
||||||
.keys_changed(sender_user, globalsince, None)
|
.keys_changed(sender_user, globalsince, Some(next_batch))
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await,
|
.await,
|
||||||
@@ -906,7 +908,7 @@ where
|
|||||||
device_list_changes.extend(
|
device_list_changes.extend(
|
||||||
services
|
services
|
||||||
.users
|
.users
|
||||||
.room_keys_changed(room_id, globalsince, None)
|
.room_keys_changed(room_id, globalsince, Some(next_batch))
|
||||||
.map(|(user_id, _)| user_id)
|
.map(|(user_id, _)| user_id)
|
||||||
.map(ToOwned::to_owned)
|
.map(ToOwned::to_owned)
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
|
|||||||
Reference in New Issue
Block a user