diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index a28cb887..98424954 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -1,20 +1,17 @@ use std::{collections::HashMap, fmt::Write, iter::once, sync::Arc}; use async_trait::async_trait; -use futures::{ - FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join_all, pin_mut, -}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::join_all}; use ruma::{ EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, UserId, - events::{ - AnyStrippedStateEvent, StateEventType, TimelineEventType, - room::member::RoomMemberEventContent, - }, + events::{AnyStrippedStateEvent, StateEventType, TimelineEventType}, room_version_rules::AuthorizationRules, serde::Raw, }; use tuwunel_core::{ - Event, PduEvent, Result, err, implement, + Event, PduEvent, Result, err, + error::inspect_debug_log, + implement, matrix::{RoomVersionRules, StateKey, TypeStateKey, room_version}, result::{AndThenRef, FlatOk}, state_res::{StateMap, auth_types_for_event}, @@ -22,7 +19,7 @@ use tuwunel_core::{ utils::{ IterStream, MutexMap, MutexMapGuard, ReadyExt, calculate_hash, mutex_map::Guard, - stream::{BroadbandExt, TryIgnore}, + stream::{BroadbandExt, TryIgnore, WidebandExt}, }, warn, }; @@ -94,24 +91,27 @@ pub async fn force_state( _statediffremoved: Arc, state_lock: &RoomMutexGuard, ) -> Result { - let event_ids = statediffnew + statediffnew .iter() .stream() .map(|&new| parse_compressed_state_event(new).1) - .then(|shorteventid| { - self.services + .wide_filter_map(async |shorteventid| { + let event_id: OwnedEventId = self + .services .short - .get_eventid_from_short::>(shorteventid) + .get_eventid_from_short(shorteventid) + .inspect_err(inspect_debug_log) + .await + .ok()?; + + self.services + .timeline + .get_pdu(&event_id) + .await + .ok() }) - .ignore_err(); - - pin_mut!(event_ids); - while let Some(event_id) = event_ids.next().await { - let Ok(pdu) = self.services.timeline.get_pdu(&event_id).await else { - continue; - }; - - match pdu.kind { + .map(Ok) + .try_for_each(async |pdu| match pdu.kind { | TimelineEventType::RoomMember => { let Some(user_id) = pdu .state_key @@ -119,11 +119,11 @@ pub async fn force_state( .map(UserId::parse) .flat_ok() else { - continue; + return Ok(()); }; - let Ok(membership_event) = pdu.get_content::() else { - continue; + let Ok(membership_event) = pdu.get_content() else { + return Ok(()); }; self.services @@ -137,7 +137,7 @@ pub async fn force_state( None, false, ) - .await?; + .await }, | TimelineEventType::SpaceChild => { self.services @@ -146,10 +146,13 @@ pub async fn force_state( .lock() .await .remove(&pdu.room_id); + + Ok(()) }, - | _ => continue, - } - } + | _ => Ok(()), + }) + .boxed() + .await?; self.services .state_cache