From dff30e59243365b6c45f0d484e9a8f05a42fa4f1 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 7 Oct 2025 09:48:45 +0000 Subject: [PATCH] Additional state accessor iterations for pdus of a specific type. Signed-off-by: Jason Volk --- src/service/mod.rs | 2 +- .../rooms/state_accessor/room_state.rs | 39 +++++++++++++++++++ src/service/rooms/state_accessor/state.rs | 18 +++++++++ 3 files changed, 58 insertions(+), 1 deletion(-) diff --git a/src/service/mod.rs b/src/service/mod.rs index fe8a4c09..36c51408 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -1,4 +1,4 @@ -#![type_length_limit = "8192"] +#![type_length_limit = "24576"] #![allow(refining_impl_trait)] mod manager; diff --git a/src/service/rooms/state_accessor/room_state.rs b/src/service/rooms/state_accessor/room_state.rs index 215ba448..0879c648 100644 --- a/src/service/rooms/state_accessor/room_state.rs +++ b/src/service/rooms/state_accessor/room_state.rs @@ -24,6 +24,26 @@ where .and_then(|event| event.get_content()) } +/// Returns the room state events for a specific type. +#[implement(super::Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub fn room_state_type_pdus<'a>( + &'a self, + room_id: &'a RoomId, + event_type: &'a StateEventType, +) -> impl Stream> + Send + 'a { + self.services + .state + .get_room_shortstatehash(room_id) + .map_ok(|shortstatehash| { + self.state_type_pdus(shortstatehash, event_type) + .map(Ok) + .boxed() + }) + .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}"))) + .try_flatten_stream() +} + /// Returns the full room state. #[implement(super::Service)] #[tracing::instrument(skip(self), level = "debug")] @@ -79,6 +99,25 @@ where .await } +/// Iterates the state_keys for an event_type in the state +#[implement(super::Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub fn room_state_keys<'a>( + &'a self, + room_id: &'a RoomId, + event_type: &'a StateEventType, +) -> impl Stream> + Send + 'a { + self.services + .state + .get_room_shortstatehash(room_id) + .map_ok(|shortstatehash| { + self.state_keys(shortstatehash, event_type) + .map(Ok) + }) + .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}"))) + .try_flatten_stream() +} + /// Returns a single PDU from `room_id` with key (`event_type`, /// `state_key`). #[implement(super::Service)] diff --git a/src/service/rooms/state_accessor/state.rs b/src/service/rooms/state_accessor/state.rs index 2b98e139..4621760c 100644 --- a/src/service/rooms/state_accessor/state.rs +++ b/src/service/rooms/state_accessor/state.rs @@ -188,6 +188,24 @@ pub async fn state_get_shortid( .await? } +/// Iterates the events for an event_type in the state. +#[implement(super::Service)] +pub fn state_type_pdus<'a>( + &'a self, + shortstatehash: ShortStateHash, + event_type: &'a StateEventType, +) -> impl Stream + Send + 'a { + self.state_keys_with_ids(shortstatehash, event_type) + .map(at!(1)) + .broad_filter_map(async |event_id: OwnedEventId| { + self.services + .timeline + .get_pdu(&event_id) + .await + .ok() + }) +} + /// Iterates the state_keys for an event_type in the state; current state /// event_id included. #[implement(super::Service)]