diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 56fa0aa3..123f38dd 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -270,7 +270,7 @@ pub(super) async fn get_remote_pdu( })?; trace!("Attempting to parse PDU: {:?}", &response.pdu); - let _parsed_pdu = { + let (room_id, ..) = { let parsed_result = self .services .event_handler @@ -278,22 +278,20 @@ pub(super) async fn get_remote_pdu( .boxed() .await; - let (event_id, value, room_id) = match parsed_result { + match parsed_result { | Ok(t) => t, | Err(e) => { warn!("Failed to parse PDU: {e}"); info!("Full PDU: {:?}", &response.pdu); return Err!("Failed to parse PDU remote server {server} sent us: {e}"); }, - }; - - vec![(event_id, value, room_id)] + } }; info!("Attempting to handle event ID {event_id} as backfilled PDU"); self.services .timeline - .backfill_pdu(&server, response.pdu) + .backfill_pdu(&room_id, &server, response.pdu) .await?; let text = serde_json::to_string_pretty(&json)?; diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index ba20f89c..ab47f5b7 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -8,6 +8,7 @@ use tuwunel_core::{ debug::INFO_SPAN_LEVEL, err, implement, matrix::{Event, room_version}, + trace, utils::stream::IterStream, warn, }; @@ -59,6 +60,7 @@ pub async fn handle_incoming_pdu<'a>( ) -> Result> { // 1. Skip the PDU if we already have it as a timeline event if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await { + trace!(?event_id, "exists"); return Ok(Some(pdu_id)); } diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index e766347b..cd22b818 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -1,6 +1,9 @@ use std::iter::once; -use futures::{FutureExt, StreamExt}; +use futures::{ + FutureExt, StreamExt, TryFutureExt, + future::{join, try_join, try_join4}, +}; use ruma::{ CanonicalJsonObject, EventId, RoomId, ServerName, api::federation, @@ -11,12 +14,15 @@ use ruma::{ }; use serde_json::value::RawValue as RawJsonValue; use tuwunel_core::{ - Result, debug, debug_warn, implement, info, + Result, debug, debug_info, debug_warn, implement, is_false, matrix::{ event::Event, pdu::{PduCount, PduId, RawPduId}, }, - utils::{IterStream, ReadyExt}, + utils::{ + IterStream, ReadyExt, + future::{BoolExt, TryExtExt}, + }, validated, warn, }; use tuwunel_database::Json; @@ -26,39 +32,47 @@ use super::ExtractBody; #[implement(super::Service)] #[tracing::instrument(name = "backfill", level = "debug", skip(self))] pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result { - if self - .services - .state_cache - .room_joined_count(room_id) - .await - .is_ok_and(|count| count <= 1) - && !self - .services - .state_accessor - .is_world_readable(room_id) - .await - { - // Room is empty (1 user or none), there is no one that can backfill - return Ok(()); - } - let first_pdu = self .first_item_in_room(room_id) .await .expect("Room is not empty"); + // No backfill required, there are still events between them if first_pdu.0 < from { - // No backfill required, there are still events between them return Ok(()); } - let power_levels: RoomPowerLevelsEventContent = self + let empty_room = self + .services + .state_cache + .room_joined_count(room_id) + .map_ok_or(true, |count| count <= 1); + + let not_world_readable = self + .services + .state_accessor + .is_world_readable(room_id) + .map(is_false!()); + + // Room is empty (1 user or none), there is no one that can backfill + if empty_room.and(not_world_readable).await { + return Ok(()); + } + + let canonical_alias = self + .services + .state_accessor + .get_canonical_alias(room_id); + + let power_levels = self .services .state_accessor .room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "") - .await + .map_ok(|content: RoomPowerLevelsEventContent| content) .unwrap_or_default(); + let (canonical_alias, power_levels) = join(canonical_alias, power_levels).await; + let room_mods = power_levels .users .iter() @@ -72,12 +86,6 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re } }); - let canonical_alias = self - .services - .state_accessor - .get_canonical_alias(room_id) - .await; - let canonical_room_alias_server = once(canonical_alias) .filter_map(Result::ok) .map(|alias| alias.server_name().to_owned()) @@ -108,82 +116,85 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re .boxed(); while let Some(ref backfill_server) = servers.next().await { - info!("Asking {backfill_server} for backfill"); - let response = self + let request = federation::backfill::get_backfill::v1::Request { + room_id: room_id.to_owned(), + v: vec![first_pdu.1.event_id().to_owned()], + limit: uint!(100), + }; + + debug_info!("Asking {backfill_server} for backfill"); + if let Ok(response) = self .services .sending - .send_federation_request( - backfill_server, - federation::backfill::get_backfill::v1::Request { - room_id: room_id.to_owned(), - v: vec![first_pdu.1.event_id().to_owned()], - limit: uint!(100), - }, - ) - .await; - - match response { - | Ok(response) => { - for pdu in response.pdus { - if let Err(e) = self.backfill_pdu(backfill_server, pdu).await { + .send_federation_request(backfill_server, request) + .inspect_err(|e| { + warn!("{backfill_server} failed backfilling for room {room_id}: {e}"); + }) + .await + { + return response + .pdus + .into_iter() + .stream() + .for_each(async |pdu| { + if let Err(e) = self + .backfill_pdu(room_id, backfill_server, pdu) + .await + { debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}"); } - } - return Ok(()); - }, - | Err(e) => { - warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}"); - }, + }) + .map(Ok) + .await; } } - info!("No servers could backfill, but backfill was needed in room {room_id}"); + warn!("No servers could backfill, but backfill was needed in room {room_id}"); + Ok(()) } #[implement(super::Service)] #[tracing::instrument(skip(self, pdu), level = "debug")] -pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> Result { - let (room_id, event_id, value) = self +pub async fn backfill_pdu( + &self, + room_id: &RoomId, + origin: &ServerName, + pdu: Box, +) -> Result { + let parsed = self .services .event_handler - .parse_incoming_pdu(&pdu) - .await?; + .parse_incoming_pdu(&pdu); // Lock so we cannot backfill the same pdu twice at the same time let mutex_lock = self .services .event_handler .mutex_federation - .lock(&room_id) - .await; + .lock(room_id) + .map(Ok); - // Skip the PDU if we already have it as a timeline event - if let Ok(pdu_id) = self.get_pdu_id(&event_id).await { - debug!("We already know {event_id} at {pdu_id:?}"); - return Ok(()); - } + let ((_, event_id, value), mutex_lock) = try_join(parsed, mutex_lock).await?; self.services .event_handler - .handle_incoming_pdu(origin, &room_id, &event_id, value, false) + .handle_incoming_pdu(origin, room_id, &event_id, value, false) .boxed() .await?; - let value = self.get_pdu_json(&event_id).await?; + let pdu = self.get_pdu(&event_id); - let pdu = self.get_pdu(&event_id).await?; + let value = self.get_pdu_json(&event_id); - let shortroomid = self - .services - .short - .get_shortroomid(&room_id) - .await?; + let shortroomid = self.services.short.get_shortroomid(room_id); - let insert_lock = self.mutex_insert.lock(&room_id).await; + let insert_lock = self.mutex_insert.lock(room_id).map(Ok); + + let (pdu, value, shortroomid, insert_lock) = + try_join4(pdu, value, shortroomid, insert_lock).await?; let count = self.services.globals.next_count(); - let count: i64 = (*count).try_into()?; let pdu_id: RawPduId = PduId { shortroomid, @@ -193,7 +204,6 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> // Insert pdu self.prepend_backfill_pdu(&pdu_id, &event_id, &value); - drop(insert_lock); if pdu.kind == TimelineEventType::RoomMessage {