Optimize filter fetch in syncv3 prologue.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -128,16 +128,33 @@ pub(crate) async fn sync_events_route(
|
||||
) -> Result<sync_events::v3::Response> {
|
||||
let (sender_user, sender_device) = body.sender();
|
||||
|
||||
let filter: OptionFuture<_> = body
|
||||
.body
|
||||
.filter
|
||||
.as_ref()
|
||||
.map(async |filter| match filter {
|
||||
| Filter::FilterDefinition(filter) => filter.clone(),
|
||||
| Filter::FilterId(filter_id) => services
|
||||
.users
|
||||
.get_filter(sender_user, filter_id)
|
||||
.await
|
||||
.unwrap_or_default(),
|
||||
})
|
||||
.into();
|
||||
|
||||
let filter = filter.map(Option::unwrap_or_default);
|
||||
let full_state = body.body.full_state;
|
||||
let set_presence = &body.body.set_presence;
|
||||
let ping_presence = services
|
||||
.presence
|
||||
.maybe_ping_presence(sender_user, Some(sender_device), &body.body.set_presence)
|
||||
.maybe_ping_presence(sender_user, body.sender_device.as_deref(), set_presence)
|
||||
.inspect_err(inspect_log)
|
||||
.ok();
|
||||
|
||||
// Record user as actively syncing for push suppression heuristic.
|
||||
let note_sync = services.presence.note_sync(sender_user);
|
||||
|
||||
join(ping_presence, note_sync).await;
|
||||
let (filter, ..) = join3(filter, ping_presence, note_sync).await;
|
||||
|
||||
let mut since = body
|
||||
.body
|
||||
@@ -175,22 +192,31 @@ pub(crate) async fn sync_events_route(
|
||||
let next_batch = services.globals.wait_pending().await?;
|
||||
debug_assert!(since <= next_batch, "next_batch is monotonic");
|
||||
|
||||
if since < next_batch || body.body.full_state {
|
||||
let response = build_sync_events(&services, &body, since, next_batch).await?;
|
||||
if since < next_batch || full_state {
|
||||
let response = build_sync_events(
|
||||
&services,
|
||||
body.sender(),
|
||||
since,
|
||||
next_batch,
|
||||
full_state,
|
||||
&filter,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let empty = response.rooms.is_empty()
|
||||
&& response.presence.is_empty()
|
||||
&& response.account_data.is_empty()
|
||||
&& response.device_lists.is_empty()
|
||||
&& response.to_device.is_empty();
|
||||
|
||||
if !empty || body.body.full_state {
|
||||
if !empty || full_state {
|
||||
return Ok(response);
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for activity
|
||||
if time::timeout_at(stop_at, watchers).await.is_err() || services.server.is_stopping() {
|
||||
let response = build_empty_response(&services, &body, next_batch).await;
|
||||
let response = build_empty_response(&services, body.sender(), next_batch).await;
|
||||
trace!(since, next_batch, "empty response");
|
||||
return Ok(response);
|
||||
}
|
||||
@@ -209,13 +235,13 @@ pub(crate) async fn sync_events_route(
|
||||
|
||||
async fn build_empty_response(
|
||||
services: &Services,
|
||||
body: &Ruma<sync_events::v3::Request>,
|
||||
(sender_user, sender_device): (&UserId, &DeviceId),
|
||||
next_batch: u64,
|
||||
) -> sync_events::v3::Response {
|
||||
sync_events::v3::Response {
|
||||
device_one_time_keys_count: services
|
||||
.users
|
||||
.count_one_time_keys(body.sender_user(), body.sender_device())
|
||||
.count_one_time_keys(sender_user, sender_device)
|
||||
.await,
|
||||
|
||||
..sync_events::v3::Response::new(to_small_string(next_batch))
|
||||
@@ -235,23 +261,12 @@ async fn build_empty_response(
|
||||
)]
|
||||
async fn build_sync_events(
|
||||
services: &Services,
|
||||
body: &Ruma<sync_events::v3::Request>,
|
||||
(sender_user, sender_device): (&UserId, &DeviceId),
|
||||
since: u64,
|
||||
next_batch: u64,
|
||||
full_state: bool,
|
||||
filter: &FilterDefinition,
|
||||
) -> Result<sync_events::v3::Response> {
|
||||
let (sender_user, sender_device) = body.sender();
|
||||
|
||||
let full_state = body.body.full_state;
|
||||
let filter = match body.body.filter.as_ref() {
|
||||
| None => FilterDefinition::default(),
|
||||
| Some(Filter::FilterDefinition(filter)) => filter.clone(),
|
||||
| Some(Filter::FilterId(filter_id)) => services
|
||||
.users
|
||||
.get_filter(sender_user, filter_id)
|
||||
.await
|
||||
.unwrap_or_default(),
|
||||
};
|
||||
|
||||
let joined_rooms = services
|
||||
.state_cache
|
||||
.rooms_joined(sender_user)
|
||||
@@ -266,7 +281,7 @@ async fn build_sync_events(
|
||||
since,
|
||||
next_batch,
|
||||
full_state,
|
||||
&filter,
|
||||
filter,
|
||||
)
|
||||
.map_ok(move |(joined_room, dlu, jeu)| (room_id, joined_room, dlu, jeu))
|
||||
.ok()
|
||||
@@ -297,7 +312,7 @@ async fn build_sync_events(
|
||||
sender_user,
|
||||
next_batch,
|
||||
full_state,
|
||||
&filter,
|
||||
filter,
|
||||
)
|
||||
.map_ok(move |left_room| (room_id, left_room))
|
||||
.ok()
|
||||
@@ -356,7 +371,7 @@ async fn build_sync_events(
|
||||
let presence_updates: OptionFuture<_> = services
|
||||
.config
|
||||
.allow_local_presence
|
||||
.then(|| process_presence_updates(services, since, next_batch, sender_user, &filter))
|
||||
.then(|| process_presence_updates(services, since, next_batch, sender_user, filter))
|
||||
.into();
|
||||
|
||||
let account_data = services
|
||||
|
||||
Reference in New Issue
Block a user