diff --git a/src/api/client/space.rs b/src/api/client/space.rs index 12d7f597..b63a4a2f 100644 --- a/src/api/client/space.rs +++ b/src/api/client/space.rs @@ -1,21 +1,21 @@ -use std::{ - collections::{BTreeSet, VecDeque}, - str::FromStr, -}; +use std::{collections::BTreeSet, iter::once, str::FromStr}; use axum::extract::State; -use futures::{StreamExt, TryFutureExt, future::OptionFuture}; +use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture, stream::FuturesOrdered}; use ruma::{ OwnedRoomId, OwnedServerName, RoomId, UInt, UserId, api::client::space::get_hierarchy, }; use tuwunel_core::{ - Err, Result, + Err, Result, debug_error, utils::{future::TryExtExt, stream::IterStream}, }; use tuwunel_service::{ Services, - rooms::spaces::{ - PaginationToken, SummaryAccessibility, get_parent_children_via, summary_to_chunk, + rooms::{ + short::ShortRoomId, + spaces::{ + PaginationToken, SummaryAccessibility, get_parent_children_via, summary_to_chunk, + }, }, }; @@ -77,30 +77,39 @@ async fn get_client_hierarchy<'a, ShortRoomIds>( short_room_ids: ShortRoomIds, ) -> Result where - ShortRoomIds: Iterator + Clone + Send + Sync + 'a, + ShortRoomIds: Iterator + Clone + Send + Sync + 'a, { - type Via = Vec; type Entry = (OwnedRoomId, Via); - type Rooms = VecDeque; + type Via = Vec; - let mut queue: Rooms = [( - room_id.to_owned(), - room_id + let initial = async move { + let via = room_id .server_name() .map(ToOwned::to_owned) .into_iter() - .collect(), - )] - .into(); + .collect::>(); - let mut rooms = Vec::with_capacity(limit); - let mut parents = BTreeSet::new(); - while let Some((current_room, via)) = queue.pop_front() { let summary = services .rooms .spaces - .get_summary_and_children_client(¤t_room, suggested_only, sender_user, &via) - .await?; + .get_summary_and_children_client(room_id, suggested_only, sender_user, &via) + .await; + + (room_id.to_owned(), via, summary) + }; + + let mut parents = BTreeSet::new(); + let mut rooms = Vec::with_capacity(limit); + let mut queue: FuturesOrdered<_> = once(initial.boxed()).collect(); + + while let Some((current_room, via, summary)) = queue.next().await { + let summary = match summary { + | Ok(summary) => summary, + | Err(e) => { + debug_error!(?current_room, ?via, ?e, "error getting summary"); + continue; + }, + }; match (summary, current_room == room_id) { | (None | Some(SummaryAccessibility::Inaccessible), false) => { @@ -159,14 +168,32 @@ where continue; } - queue.extend(children); + children + .into_iter() + .map(|(room_id, via)| async move { + let summary = services + .rooms + .spaces + .get_summary_and_children_client( + &room_id, + suggested_only, + sender_user, + &via, + ) + .await; + + (room_id, via, summary) + }) + .map(FutureExt::boxed) + .for_each(|entry| queue.push_back(entry)); }, } } let next_batch: OptionFuture<_> = queue - .pop_front() - .map(async |(room, _)| { + .next() + .await + .map(async |(room, ..)| { parents.insert(room); let next_short_room_ids: Vec<_> = parents diff --git a/src/service/rooms/spaces/mod.rs b/src/service/rooms/spaces/mod.rs index a37253da..84329c5d 100644 --- a/src/service/rooms/spaces/mod.rs +++ b/src/service/rooms/spaces/mod.rs @@ -298,7 +298,7 @@ fn get_space_child_events<'a>( #[implement(Service)] pub async fn get_summary_and_children_client( &self, - current_room: &OwnedRoomId, + current_room: &RoomId, suggested_only: bool, user_id: &UserId, via: &[OwnedServerName],