Optimize spaces pageload; pipeline outer loop.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-07 03:16:32 +00:00
parent e106e50ed0
commit d7e5c0ccf3
2 changed files with 53 additions and 26 deletions

View File

@@ -1,21 +1,21 @@
use std::{
collections::{BTreeSet, VecDeque},
str::FromStr,
};
use std::{collections::BTreeSet, iter::once, str::FromStr};
use axum::extract::State;
use futures::{StreamExt, TryFutureExt, future::OptionFuture};
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture, stream::FuturesOrdered};
use ruma::{
OwnedRoomId, OwnedServerName, RoomId, UInt, UserId, api::client::space::get_hierarchy,
};
use tuwunel_core::{
Err, Result,
Err, Result, debug_error,
utils::{future::TryExtExt, stream::IterStream},
};
use tuwunel_service::{
Services,
rooms::spaces::{
PaginationToken, SummaryAccessibility, get_parent_children_via, summary_to_chunk,
rooms::{
short::ShortRoomId,
spaces::{
PaginationToken, SummaryAccessibility, get_parent_children_via, summary_to_chunk,
},
},
};
@@ -77,30 +77,39 @@ async fn get_client_hierarchy<'a, ShortRoomIds>(
short_room_ids: ShortRoomIds,
) -> Result<get_hierarchy::v1::Response>
where
ShortRoomIds: Iterator<Item = &'a u64> + Clone + Send + Sync + 'a,
ShortRoomIds: Iterator<Item = &'a ShortRoomId> + Clone + Send + Sync + 'a,
{
type Via = Vec<OwnedServerName>;
type Entry = (OwnedRoomId, Via);
type Rooms = VecDeque<Entry>;
type Via = Vec<OwnedServerName>;
let mut queue: Rooms = [(
room_id.to_owned(),
room_id
let initial = async move {
let via = room_id
.server_name()
.map(ToOwned::to_owned)
.into_iter()
.collect(),
)]
.into();
.collect::<Vec<_>>();
let mut rooms = Vec::with_capacity(limit);
let mut parents = BTreeSet::new();
while let Some((current_room, via)) = queue.pop_front() {
let summary = services
.rooms
.spaces
.get_summary_and_children_client(&current_room, suggested_only, sender_user, &via)
.await?;
.get_summary_and_children_client(room_id, suggested_only, sender_user, &via)
.await;
(room_id.to_owned(), via, summary)
};
let mut parents = BTreeSet::new();
let mut rooms = Vec::with_capacity(limit);
let mut queue: FuturesOrdered<_> = once(initial.boxed()).collect();
while let Some((current_room, via, summary)) = queue.next().await {
let summary = match summary {
| Ok(summary) => summary,
| Err(e) => {
debug_error!(?current_room, ?via, ?e, "error getting summary");
continue;
},
};
match (summary, current_room == room_id) {
| (None | Some(SummaryAccessibility::Inaccessible), false) => {
@@ -159,14 +168,32 @@ where
continue;
}
queue.extend(children);
children
.into_iter()
.map(|(room_id, via)| async move {
let summary = services
.rooms
.spaces
.get_summary_and_children_client(
&room_id,
suggested_only,
sender_user,
&via,
)
.await;
(room_id, via, summary)
})
.map(FutureExt::boxed)
.for_each(|entry| queue.push_back(entry));
},
}
}
let next_batch: OptionFuture<_> = queue
.pop_front()
.map(async |(room, _)| {
.next()
.await
.map(async |(room, ..)| {
parents.insert(room);
let next_short_room_ids: Vec<_> = parents

View File

@@ -298,7 +298,7 @@ fn get_space_child_events<'a>(
#[implement(Service)]
pub async fn get_summary_and_children_client(
&self,
current_room: &OwnedRoomId,
current_room: &RoomId,
suggested_only: bool,
user_id: &UserId,
via: &[OwnedServerName],