Optimize force_state query pattern.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-09-27 11:28:22 +00:00
parent 83afe81f60
commit ba12773a5a

View File

@@ -1,20 +1,17 @@
use std::{collections::HashMap, fmt::Write, iter::once, sync::Arc};
use async_trait::async_trait;
use futures::{
FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join_all, pin_mut,
};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join_all};
use ruma::{
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId,
events::{
AnyStrippedStateEvent, StateEventType, TimelineEventType,
room::member::RoomMemberEventContent,
},
events::{AnyStrippedStateEvent, StateEventType, TimelineEventType},
room_version_rules::AuthorizationRules,
serde::Raw,
};
use tuwunel_core::{
Event, PduEvent, Result, err, implement,
Event, PduEvent, Result, err,
error::inspect_debug_log,
implement,
matrix::{RoomVersionRules, StateKey, TypeStateKey, room_version},
result::{AndThenRef, FlatOk},
state_res::{StateMap, auth_types_for_event},
@@ -22,7 +19,7 @@ use tuwunel_core::{
utils::{
IterStream, MutexMap, MutexMapGuard, ReadyExt, calculate_hash,
mutex_map::Guard,
stream::{BroadbandExt, TryIgnore},
stream::{BroadbandExt, TryIgnore, WidebandExt},
},
warn,
};
@@ -94,24 +91,27 @@ pub async fn force_state(
_statediffremoved: Arc<CompressedState>,
state_lock: &RoomMutexGuard,
) -> Result {
let event_ids = statediffnew
statediffnew
.iter()
.stream()
.map(|&new| parse_compressed_state_event(new).1)
.then(|shorteventid| {
self.services
.wide_filter_map(async |shorteventid| {
let event_id: OwnedEventId = self
.services
.short
.get_eventid_from_short::<Box<_>>(shorteventid)
.get_eventid_from_short(shorteventid)
.inspect_err(inspect_debug_log)
.await
.ok()?;
self.services
.timeline
.get_pdu(&event_id)
.await
.ok()
})
.ignore_err();
pin_mut!(event_ids);
while let Some(event_id) = event_ids.next().await {
let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await else {
continue;
};
match pdu.kind {
.map(Ok)
.try_for_each(async |pdu| match pdu.kind {
| TimelineEventType::RoomMember => {
let Some(user_id) = pdu
.state_key
@@ -119,11 +119,11 @@ pub async fn force_state(
.map(UserId::parse)
.flat_ok()
else {
continue;
return Ok(());
};
let Ok(membership_event) = pdu.get_content::<RoomMemberEventContent>() else {
continue;
let Ok(membership_event) = pdu.get_content() else {
return Ok(());
};
self.services
@@ -137,7 +137,7 @@ pub async fn force_state(
None,
false,
)
.await?;
.await
},
| TimelineEventType::SpaceChild => {
self.services
@@ -146,10 +146,13 @@ pub async fn force_state(
.lock()
.await
.remove(&pdu.room_id);
Ok(())
},
| _ => continue,
}
}
| _ => Ok(()),
})
.boxed()
.await?;
self.services
.state_cache