diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index 8f125f87..d756120e 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -1,15 +1,12 @@ -use std::iter::once; +use std::{collections::HashSet, iter::once}; use futures::{ FutureExt, StreamExt, TryFutureExt, future::{join, try_join, try_join4}, }; +use rand::seq::SliceRandom; use ruma::{ - CanonicalJsonObject, EventId, RoomId, ServerName, - api::federation, - events::{ - StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, - }, + CanonicalJsonObject, EventId, RoomId, ServerName, api::federation, events::TimelineEventType, uint, }; use serde_json::value::RawValue as RawJsonValue; @@ -20,8 +17,8 @@ use tuwunel_core::{ pdu::{PduCount, PduId, RawPduId}, }, utils::{ - IterStream, ReadyExt, - future::{BoolExt, TryExtExt}, + BoolExt, IterStream, ReadyExt, + future::{BoolExt as FutureBoolExt, TryExtExt}, }, validated, warn, }; @@ -72,19 +69,43 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re let power_levels = self .services .state_accessor - .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") - .map_ok(|content: RoomPowerLevelsEventContent| content) - .unwrap_or_default(); + .get_power_levels(room_id); let (canonical_alias, power_levels) = join(canonical_alias, power_levels).await; - let room_mods = power_levels - .users + let power_servers = power_levels .iter() - .filter_map(|(user_id, level)| { - (*level > power_levels.users_default && !self.services.globals.user_is_local(user_id)) + .flat_map(|power| { + power + .rules + .privileged_creators + .iter() + .flat_map(|creators| creators.iter()) + }) + .chain(power_levels.iter().flat_map(|power| { + power + .users + .iter() + .filter_map(|(user_id, level)| level.gt(&power.users_default).then_some(user_id)) + })) + .filter_map(|user_id| { + self.services + .globals + .user_is_local(user_id) + .is_false() .then_some(user_id.server_name()) - }); + }) + .collect::>(); + + let power_servers = { + let mut vec: Vec<_> = power_servers + .into_iter() + .map(ToOwned::to_owned) + .collect(); + + vec.shuffle(&mut rand::thread_rng()); + vec.into_iter().stream() + }; let canonical_room_alias_server = once(canonical_alias) .filter_map(Result::ok) @@ -100,9 +121,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re .map(ToOwned::to_owned) .stream(); - let mut servers = room_mods - .stream() - .map(ToOwned::to_owned) + let mut servers = power_servers .chain(canonical_room_alias_server) .chain(trusted_servers) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name))