From 6b8d6956a3e8a3fcb7ac410a0443b1ed19ab99cc Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 2 Nov 2025 06:32:52 +0000 Subject: [PATCH] Fix timeline events order in /initialSync. Implement acccount_data for response. Ensure consistency of revealed events. Signed-off-by: Jason Volk --- src/api/client/room/initial_sync.rs | 89 +++++++++++++++++------------ 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index 58611969..e0c6c9c6 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -1,15 +1,20 @@ use axum::extract::State; -use futures::{FutureExt, TryStreamExt, future::try_join4}; -use ruma::api::client::room::initial_sync::v3::{PaginationChunk, Request, Response}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join5}; +use ruma::{ + api::client::room::initial_sync::v3::{PaginationChunk, Request, Response}, + events::AnyRawAccountDataEvent, +}; use tuwunel_core::{ - Err, Event, Result, at, - utils::{BoolExt, stream::TryTools}, + Err, Event, Result, at, extract_variant, + matrix::PduCount, + utils::stream::{ReadyExt, TryTools}, }; use crate::Ruma; -const LIMIT_MAX: usize = 100; +const LIMIT_MAX: usize = 50; +/// GET `/_matrix/client/v3/rooms/{roomId}/initialSync` pub(crate) async fn room_initial_sync_route( State(services): State, body: Ruma, @@ -24,13 +29,15 @@ pub(crate) async fn room_initial_sync_route( return Err!(Request(Forbidden("No room preview available."))); } + let next_batch = services.globals.current_count(); + + let visibility = services.directory.visibility(room_id).map(Ok); + let membership = services .state_cache .user_membership(body.sender_user(), room_id) .map(Ok); - let visibility = services.directory.visibility(room_id).map(Ok); - let state = services .state_accessor .room_state_full_pdus(room_id) @@ -40,42 +47,52 @@ pub(crate) async fn room_initial_sync_route( let limit = LIMIT_MAX; let events = services .timeline - .pdus_rev(None, room_id, None) + .pdus_rev(None, room_id, Some(PduCount::Normal(next_batch).saturating_add(1))) .try_take(limit) - .try_collect::>(); + .try_collect() + .map_ok(|mut vec: Vec<_>| { + vec.reverse(); + vec + }); - let (membership, visibility, state, events) = - try_join4(membership, visibility, state, events) + let account_data = services + .account_data + .changes_since(Some(room_id), body.sender_user(), 0, Some(next_batch)) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect::>() + .map(Ok); + + let (membership, visibility, state, events, account_data) = + try_join5(membership, visibility, state, events, account_data) .boxed() .await?; - let messages = PaginationChunk { - start: events - .last() - .map(at!(0)) - .as_ref() - .map(ToString::to_string), - - end: events - .first() - .map(at!(0)) - .as_ref() - .map(ToString::to_string) - .unwrap_or_default(), - - chunk: events - .into_iter() - .map(at!(1)) - .map(Event::into_format) - .collect(), - }; - Ok(Response { room_id: room_id.to_owned(), - account_data: None, - state: state.into(), - messages: messages.chunk.is_empty().or_some(messages), - visibility: visibility.into(), membership, + visibility: visibility.into(), + account_data: Some(account_data), + state: state.into(), + messages: PaginationChunk { + start: events + .first() + .map(at!(0)) + .as_ref() + .map(ToString::to_string), + + end: events + .last() + .map(at!(0)) + .as_ref() + .map(ToString::to_string) + .unwrap_or_default(), + + chunk: events + .into_iter() + .map(at!(1)) + .map(Event::into_format) + .collect(), + } + .into(), }) }