Additional state accessor iterations for pdus of a specific type.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-10-07 09:48:45 +00:00
parent 7fee459b1a
commit dff30e5924
3 changed files with 58 additions and 1 deletions

View File

@@ -1,4 +1,4 @@
#![type_length_limit = "8192"]
#![type_length_limit = "24576"]
#![allow(refining_impl_trait)]
mod manager;

View File

@@ -24,6 +24,26 @@ where
.and_then(|event| event.get_content())
}
/// Returns the room state events for a specific type.
#[implement(super::Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn room_state_type_pdus<'a>(
&'a self,
room_id: &'a RoomId,
event_type: &'a StateEventType,
) -> impl Stream<Item = Result<impl Event>> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)
.map_ok(|shortstatehash| {
self.state_type_pdus(shortstatehash, event_type)
.map(Ok)
.boxed()
})
.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
.try_flatten_stream()
}
/// Returns the full room state.
#[implement(super::Service)]
#[tracing::instrument(skip(self), level = "debug")]
@@ -79,6 +99,25 @@ where
.await
}
/// Iterates the state_keys for an event_type in the state
#[implement(super::Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn room_state_keys<'a>(
&'a self,
room_id: &'a RoomId,
event_type: &'a StateEventType,
) -> impl Stream<Item = Result<StateKey>> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)
.map_ok(|shortstatehash| {
self.state_keys(shortstatehash, event_type)
.map(Ok)
})
.map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}")))
.try_flatten_stream()
}
/// Returns a single PDU from `room_id` with key (`event_type`,
/// `state_key`).
#[implement(super::Service)]

View File

@@ -188,6 +188,24 @@ pub async fn state_get_shortid(
.await?
}
/// Iterates the events for an event_type in the state.
#[implement(super::Service)]
pub fn state_type_pdus<'a>(
&'a self,
shortstatehash: ShortStateHash,
event_type: &'a StateEventType,
) -> impl Stream<Item = impl Event> + Send + 'a {
self.state_keys_with_ids(shortstatehash, event_type)
.map(at!(1))
.broad_filter_map(async |event_id: OwnedEventId| {
self.services
.timeline
.get_pdu(&event_id)
.await
.ok()
})
}
/// Iterates the state_keys for an event_type in the state; current state
/// event_id included.
#[implement(super::Service)]