Add iteration of space children; simplify space child event fetcher.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
#![type_length_limit = "24576"]
|
||||
#![type_length_limit = "98304"]
|
||||
#![allow(refining_impl_trait)]
|
||||
|
||||
mod manager;
|
||||
|
||||
@@ -27,7 +27,7 @@ use tuwunel_core::{
|
||||
IterStream,
|
||||
future::{BoolExt, TryExtExt},
|
||||
math::usize_from_f64,
|
||||
stream::{BroadbandExt, ReadyExt},
|
||||
stream::{BroadbandExt, ReadyExt, TryReadyExt},
|
||||
},
|
||||
};
|
||||
|
||||
@@ -225,48 +225,6 @@ async fn get_summary_and_children_federation(
|
||||
Ok(Some(accessibility))
|
||||
}
|
||||
|
||||
/// Simply returns the stripped m.space.child events of a room
|
||||
#[implement(Service)]
|
||||
fn get_space_child_events<'a>(
|
||||
&'a self,
|
||||
room_id: &'a RoomId,
|
||||
) -> impl Stream<Item = PduEvent> + Send + 'a {
|
||||
self.services
|
||||
.state
|
||||
.get_room_shortstatehash(room_id)
|
||||
.map_ok(|current_shortstatehash| {
|
||||
self.services
|
||||
.state_accessor
|
||||
.state_keys_with_ids(current_shortstatehash, &StateEventType::SpaceChild)
|
||||
.boxed()
|
||||
})
|
||||
.map(Result::into_iter)
|
||||
.map(IterStream::stream)
|
||||
.map(StreamExt::flatten)
|
||||
.flatten_stream()
|
||||
.broad_filter_map(async move |(state_key, event_id): (_, OwnedEventId)| {
|
||||
self.services
|
||||
.timeline
|
||||
.get_pdu(&event_id)
|
||||
.map_ok(move |pdu| (state_key, pdu))
|
||||
.ok()
|
||||
.await
|
||||
})
|
||||
.ready_filter_map(move |(state_key, pdu)| {
|
||||
if let Ok(content) = pdu.get_content::<SpaceChildEventContent>() {
|
||||
if content.via.is_empty() {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
if RoomId::parse(&state_key).is_err() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(pdu)
|
||||
})
|
||||
}
|
||||
|
||||
/// Gets the summary of a space using either local or remote (federation)
|
||||
/// sources
|
||||
#[implement(Service)]
|
||||
@@ -550,6 +508,52 @@ async fn cache_insert(
|
||||
cache.insert(current_room.to_owned(), Some(CachedSpaceHierarchySummary { summary }));
|
||||
}
|
||||
|
||||
/// Simply returns the stripped m.space.child events of a room
|
||||
#[implement(Service)]
|
||||
fn get_space_child_events<'a>(
|
||||
&'a self,
|
||||
room_id: &'a RoomId,
|
||||
) -> impl Stream<Item = PduEvent> + Send + 'a {
|
||||
self.services
|
||||
.state_accessor
|
||||
.room_state_keys_with_ids(room_id, &StateEventType::SpaceChild)
|
||||
.ready_filter_map(Result::ok)
|
||||
.broad_filter_map(async move |(state_key, event_id): (_, OwnedEventId)| {
|
||||
self.services
|
||||
.timeline
|
||||
.get_pdu(&event_id)
|
||||
.map_ok(move |pdu| (state_key, pdu))
|
||||
.ok()
|
||||
.await
|
||||
})
|
||||
.ready_filter_map(move |(state_key, pdu)| {
|
||||
if let Ok(content) = pdu.get_content::<SpaceChildEventContent>() {
|
||||
if content.via.is_empty() {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
|
||||
if RoomId::parse(&state_key).is_err() {
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(pdu)
|
||||
})
|
||||
}
|
||||
|
||||
/// Simply returns the stripped m.space.child events of a room
|
||||
#[implement(Service)]
|
||||
pub fn get_space_children<'a>(
|
||||
&'a self,
|
||||
room_id: &'a RoomId,
|
||||
) -> impl Stream<Item = OwnedRoomId> + Send + 'a {
|
||||
self.services
|
||||
.state_accessor
|
||||
.room_state_keys(room_id, &StateEventType::SpaceChild)
|
||||
.ready_and_then(|state_key| OwnedRoomId::parse(state_key.as_str()).map_err(Into::into))
|
||||
.ready_filter_map(Result::ok)
|
||||
}
|
||||
|
||||
// Here because cannot implement `From` across ruma-federation-api and
|
||||
// ruma-client-api types
|
||||
impl From<CachedSpaceHierarchySummary> for SpaceHierarchyRoomsChunk {
|
||||
|
||||
Reference in New Issue
Block a user