Add count value to the to_device iter item.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -375,6 +375,7 @@ async fn build_sync_events(
|
|||||||
let to_device_events = services
|
let to_device_events = services
|
||||||
.users
|
.users
|
||||||
.get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
|
.get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
|
||||||
|
.map(at!(1))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let device_one_time_keys_count = services
|
let device_one_time_keys_count = services
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use ruma::api::client::sync::sync_events::v5::response;
|
use ruma::api::client::sync::sync_events::v5::response;
|
||||||
use tuwunel_core::{self, Result};
|
use tuwunel_core::{self, Result, at};
|
||||||
|
|
||||||
use super::{Connection, SyncInfo};
|
use super::{Connection, SyncInfo};
|
||||||
|
|
||||||
@@ -17,6 +17,7 @@ pub(super) async fn collect(
|
|||||||
let events: Vec<_> = services
|
let events: Vec<_> = services
|
||||||
.users
|
.users
|
||||||
.get_to_device_events(sender_user, sender_device, None, Some(conn.next_batch))
|
.get_to_device_events(sender_user, sender_device, None, Some(conn.next_batch))
|
||||||
|
.map(at!(1))
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
|
|||||||
@@ -319,7 +319,7 @@ pub fn get_to_device_events<'a>(
|
|||||||
device_id: &'a DeviceId,
|
device_id: &'a DeviceId,
|
||||||
since: Option<u64>,
|
since: Option<u64>,
|
||||||
to: Option<u64>,
|
to: Option<u64>,
|
||||||
) -> impl Stream<Item = Raw<AnyToDeviceEvent>> + Send + 'a {
|
) -> impl Stream<Item = (u64, Raw<AnyToDeviceEvent>)> + Send + 'a {
|
||||||
type Key<'a> = (&'a UserId, &'a DeviceId, u64);
|
type Key<'a> = (&'a UserId, &'a DeviceId, u64);
|
||||||
|
|
||||||
let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1)));
|
let from = (user_id, device_id, since.map_or(0, |since| since.saturating_add(1)));
|
||||||
@@ -331,7 +331,7 @@ pub fn get_to_device_events<'a>(
|
|||||||
.ready_take_while(move |((user_id_, device_id_, count), _): &(Key<'_>, _)| {
|
.ready_take_while(move |((user_id_, device_id_, count), _): &(Key<'_>, _)| {
|
||||||
user_id == *user_id_ && device_id == *device_id_ && to.is_none_or(|to| *count <= to)
|
user_id == *user_id_ && device_id == *device_id_ && to.is_none_or(|to| *count <= to)
|
||||||
})
|
})
|
||||||
.map(at!(1))
|
.map(|((_, _, count), event)| (count, event))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
|
|||||||
Reference in New Issue
Block a user