Fix timeline events order in /initialSync.
Implement acccount_data for response. Ensure consistency of revealed events. Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -1,15 +1,20 @@
|
||||
use axum::extract::State;
|
||||
use futures::{FutureExt, TryStreamExt, future::try_join4};
|
||||
use ruma::api::client::room::initial_sync::v3::{PaginationChunk, Request, Response};
|
||||
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join5};
|
||||
use ruma::{
|
||||
api::client::room::initial_sync::v3::{PaginationChunk, Request, Response},
|
||||
events::AnyRawAccountDataEvent,
|
||||
};
|
||||
use tuwunel_core::{
|
||||
Err, Event, Result, at,
|
||||
utils::{BoolExt, stream::TryTools},
|
||||
Err, Event, Result, at, extract_variant,
|
||||
matrix::PduCount,
|
||||
utils::stream::{ReadyExt, TryTools},
|
||||
};
|
||||
|
||||
use crate::Ruma;
|
||||
|
||||
const LIMIT_MAX: usize = 100;
|
||||
const LIMIT_MAX: usize = 50;
|
||||
|
||||
/// GET `/_matrix/client/v3/rooms/{roomId}/initialSync`
|
||||
pub(crate) async fn room_initial_sync_route(
|
||||
State(services): State<crate::State>,
|
||||
body: Ruma<Request>,
|
||||
@@ -24,13 +29,15 @@ pub(crate) async fn room_initial_sync_route(
|
||||
return Err!(Request(Forbidden("No room preview available.")));
|
||||
}
|
||||
|
||||
let next_batch = services.globals.current_count();
|
||||
|
||||
let visibility = services.directory.visibility(room_id).map(Ok);
|
||||
|
||||
let membership = services
|
||||
.state_cache
|
||||
.user_membership(body.sender_user(), room_id)
|
||||
.map(Ok);
|
||||
|
||||
let visibility = services.directory.visibility(room_id).map(Ok);
|
||||
|
||||
let state = services
|
||||
.state_accessor
|
||||
.room_state_full_pdus(room_id)
|
||||
@@ -40,42 +47,52 @@ pub(crate) async fn room_initial_sync_route(
|
||||
let limit = LIMIT_MAX;
|
||||
let events = services
|
||||
.timeline
|
||||
.pdus_rev(None, room_id, None)
|
||||
.pdus_rev(None, room_id, Some(PduCount::Normal(next_batch).saturating_add(1)))
|
||||
.try_take(limit)
|
||||
.try_collect::<Vec<_>>();
|
||||
.try_collect()
|
||||
.map_ok(|mut vec: Vec<_>| {
|
||||
vec.reverse();
|
||||
vec
|
||||
});
|
||||
|
||||
let (membership, visibility, state, events) =
|
||||
try_join4(membership, visibility, state, events)
|
||||
let account_data = services
|
||||
.account_data
|
||||
.changes_since(Some(room_id), body.sender_user(), 0, Some(next_batch))
|
||||
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
|
||||
.collect::<Vec<_>>()
|
||||
.map(Ok);
|
||||
|
||||
let (membership, visibility, state, events, account_data) =
|
||||
try_join5(membership, visibility, state, events, account_data)
|
||||
.boxed()
|
||||
.await?;
|
||||
|
||||
let messages = PaginationChunk {
|
||||
start: events
|
||||
.last()
|
||||
.map(at!(0))
|
||||
.as_ref()
|
||||
.map(ToString::to_string),
|
||||
|
||||
end: events
|
||||
.first()
|
||||
.map(at!(0))
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_default(),
|
||||
|
||||
chunk: events
|
||||
.into_iter()
|
||||
.map(at!(1))
|
||||
.map(Event::into_format)
|
||||
.collect(),
|
||||
};
|
||||
|
||||
Ok(Response {
|
||||
room_id: room_id.to_owned(),
|
||||
account_data: None,
|
||||
state: state.into(),
|
||||
messages: messages.chunk.is_empty().or_some(messages),
|
||||
visibility: visibility.into(),
|
||||
membership,
|
||||
visibility: visibility.into(),
|
||||
account_data: Some(account_data),
|
||||
state: state.into(),
|
||||
messages: PaginationChunk {
|
||||
start: events
|
||||
.first()
|
||||
.map(at!(0))
|
||||
.as_ref()
|
||||
.map(ToString::to_string),
|
||||
|
||||
end: events
|
||||
.last()
|
||||
.map(at!(0))
|
||||
.as_ref()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_default(),
|
||||
|
||||
chunk: events
|
||||
.into_iter()
|
||||
.map(at!(1))
|
||||
.map(Event::into_format)
|
||||
.collect(),
|
||||
}
|
||||
.into(),
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user