Collect v5 typing events concurrent with other extensions.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -8,7 +8,7 @@ use std::{
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use futures::{
|
use futures::{
|
||||||
FutureExt, Stream, StreamExt, TryFutureExt,
|
FutureExt, Stream, StreamExt, TryFutureExt,
|
||||||
future::{OptionFuture, join3, try_join4},
|
future::{OptionFuture, join3, try_join5},
|
||||||
pin_mut,
|
pin_mut,
|
||||||
};
|
};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
@@ -139,17 +139,19 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
|
|
||||||
let to_device = collect_to_device(services, sync_info, next_batch).map(Ok);
|
let to_device = collect_to_device(services, sync_info, next_batch).map(Ok);
|
||||||
|
|
||||||
let receipts = collect_receipts(services).map(Ok);
|
let receipts = collect_receipts(services, sync_info, next_batch).map(Ok);
|
||||||
|
|
||||||
let (account_data, e2ee, to_device, receipts) =
|
let typing = collect_typing_events(services, sync_info, next_batch, all_joined_rooms.clone());
|
||||||
try_join4(account_data, e2ee, to_device, receipts).await?;
|
|
||||||
|
let (account_data, e2ee, to_device, receipts, typing) =
|
||||||
|
try_join5(account_data, e2ee, to_device, receipts, typing).await?;
|
||||||
|
|
||||||
let extensions = sync_events::v5::response::Extensions {
|
let extensions = sync_events::v5::response::Extensions {
|
||||||
account_data,
|
account_data,
|
||||||
e2ee,
|
e2ee,
|
||||||
to_device,
|
to_device,
|
||||||
receipts,
|
receipts,
|
||||||
typing: sync_events::v5::response::Typing::default(),
|
typing,
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut response = sync_events::v5::Response {
|
let mut response = sync_events::v5::Response {
|
||||||
@@ -172,9 +174,6 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
response.extensions.typing =
|
|
||||||
collect_typing_events(services, sender_user, &cached, all_joined_rooms.clone()).await?;
|
|
||||||
|
|
||||||
fetch_subscriptions(services, sync_info, &known_rooms, &mut todo_rooms).await;
|
fetch_subscriptions(services, sync_info, &known_rooms, &mut todo_rooms).await;
|
||||||
|
|
||||||
response.rooms = process_rooms(
|
response.rooms = process_rooms(
|
||||||
@@ -968,8 +967,8 @@ async fn collect_to_device(
|
|||||||
|
|
||||||
async fn collect_typing_events<'a, Rooms>(
|
async fn collect_typing_events<'a, Rooms>(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
sender_user: &UserId,
|
(sender_user, _, _, body): SyncInfo<'_>,
|
||||||
body: &sync_events::v5::Request,
|
_next_batch: u64,
|
||||||
rooms: Rooms,
|
rooms: Rooms,
|
||||||
) -> Result<sync_events::v5::response::Typing>
|
) -> Result<sync_events::v5::response::Typing>
|
||||||
where
|
where
|
||||||
@@ -1007,7 +1006,11 @@ where
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn collect_receipts(_services: &Services) -> sync_events::v5::response::Receipts {
|
async fn collect_receipts(
|
||||||
|
_services: &Services,
|
||||||
|
(_sender_user, _, _globalsince, _body): SyncInfo<'_>,
|
||||||
|
_next_batch: u64,
|
||||||
|
) -> sync_events::v5::response::Receipts {
|
||||||
sync_events::v5::response::Receipts { rooms: BTreeMap::new() }
|
sync_events::v5::response::Receipts { rooms: BTreeMap::new() }
|
||||||
// TODO: get explicitly requested read receipts
|
// TODO: get explicitly requested read receipts
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user