Optimize backfill_if_required conditions.

Optimize backfill_pdu.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-29 04:46:11 +00:00
parent 54ce221e88
commit 12bdfe3b66
3 changed files with 89 additions and 79 deletions

View File

@@ -270,7 +270,7 @@ pub(super) async fn get_remote_pdu(
})?;
trace!("Attempting to parse PDU: {:?}", &response.pdu);
let _parsed_pdu = {
let (room_id, ..) = {
let parsed_result = self
.services
.event_handler
@@ -278,22 +278,20 @@ pub(super) async fn get_remote_pdu(
.boxed()
.await;
let (event_id, value, room_id) = match parsed_result {
match parsed_result {
| Ok(t) => t,
| Err(e) => {
warn!("Failed to parse PDU: {e}");
info!("Full PDU: {:?}", &response.pdu);
return Err!("Failed to parse PDU remote server {server} sent us: {e}");
},
};
vec![(event_id, value, room_id)]
}
};
info!("Attempting to handle event ID {event_id} as backfilled PDU");
self.services
.timeline
.backfill_pdu(&server, response.pdu)
.backfill_pdu(&room_id, &server, response.pdu)
.await?;
let text = serde_json::to_string_pretty(&json)?;

View File

@@ -8,6 +8,7 @@ use tuwunel_core::{
debug::INFO_SPAN_LEVEL,
err, implement,
matrix::{Event, room_version},
trace,
utils::stream::IterStream,
warn,
};
@@ -59,6 +60,7 @@ pub async fn handle_incoming_pdu<'a>(
) -> Result<Option<RawPduId>> {
// 1. Skip the PDU if we already have it as a timeline event
if let Ok(pdu_id) = self.services.timeline.get_pdu_id(event_id).await {
trace!(?event_id, "exists");
return Ok(Some(pdu_id));
}

View File

@@ -1,6 +1,9 @@
use std::iter::once;
use futures::{FutureExt, StreamExt};
use futures::{
FutureExt, StreamExt, TryFutureExt,
future::{join, try_join, try_join4},
};
use ruma::{
CanonicalJsonObject, EventId, RoomId, ServerName,
api::federation,
@@ -11,12 +14,15 @@ use ruma::{
};
use serde_json::value::RawValue as RawJsonValue;
use tuwunel_core::{
Result, debug, debug_warn, implement, info,
Result, debug, debug_info, debug_warn, implement, is_false,
matrix::{
event::Event,
pdu::{PduCount, PduId, RawPduId},
},
utils::{IterStream, ReadyExt},
utils::{
IterStream, ReadyExt,
future::{BoolExt, TryExtExt},
},
validated, warn,
};
use tuwunel_database::Json;
@@ -26,39 +32,47 @@ use super::ExtractBody;
#[implement(super::Service)]
#[tracing::instrument(name = "backfill", level = "debug", skip(self))]
pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result {
if self
.services
.state_cache
.room_joined_count(room_id)
.await
.is_ok_and(|count| count <= 1)
&& !self
.services
.state_accessor
.is_world_readable(room_id)
.await
{
// Room is empty (1 user or none), there is no one that can backfill
return Ok(());
}
let first_pdu = self
.first_item_in_room(room_id)
.await
.expect("Room is not empty");
// No backfill required, there are still events between them
if first_pdu.0 < from {
// No backfill required, there are still events between them
return Ok(());
}
let power_levels: RoomPowerLevelsEventContent = self
let empty_room = self
.services
.state_cache
.room_joined_count(room_id)
.map_ok_or(true, |count| count <= 1);
let not_world_readable = self
.services
.state_accessor
.is_world_readable(room_id)
.map(is_false!());
// Room is empty (1 user or none), there is no one that can backfill
if empty_room.and(not_world_readable).await {
return Ok(());
}
let canonical_alias = self
.services
.state_accessor
.get_canonical_alias(room_id);
let power_levels = self
.services
.state_accessor
.room_state_get_content(room_id, &StateEventType::RoomPowerLevels, "")
.await
.map_ok(|content: RoomPowerLevelsEventContent| content)
.unwrap_or_default();
let (canonical_alias, power_levels) = join(canonical_alias, power_levels).await;
let room_mods = power_levels
.users
.iter()
@@ -72,12 +86,6 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
}
});
let canonical_alias = self
.services
.state_accessor
.get_canonical_alias(room_id)
.await;
let canonical_room_alias_server = once(canonical_alias)
.filter_map(Result::ok)
.map(|alias| alias.server_name().to_owned())
@@ -108,82 +116,85 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re
.boxed();
while let Some(ref backfill_server) = servers.next().await {
info!("Asking {backfill_server} for backfill");
let response = self
let request = federation::backfill::get_backfill::v1::Request {
room_id: room_id.to_owned(),
v: vec![first_pdu.1.event_id().to_owned()],
limit: uint!(100),
};
debug_info!("Asking {backfill_server} for backfill");
if let Ok(response) = self
.services
.sending
.send_federation_request(
backfill_server,
federation::backfill::get_backfill::v1::Request {
room_id: room_id.to_owned(),
v: vec![first_pdu.1.event_id().to_owned()],
limit: uint!(100),
},
)
.await;
match response {
| Ok(response) => {
for pdu in response.pdus {
if let Err(e) = self.backfill_pdu(backfill_server, pdu).await {
.send_federation_request(backfill_server, request)
.inspect_err(|e| {
warn!("{backfill_server} failed backfilling for room {room_id}: {e}");
})
.await
{
return response
.pdus
.into_iter()
.stream()
.for_each(async |pdu| {
if let Err(e) = self
.backfill_pdu(room_id, backfill_server, pdu)
.await
{
debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}");
}
}
return Ok(());
},
| Err(e) => {
warn!("{backfill_server} failed to provide backfill for room {room_id}: {e}");
},
})
.map(Ok)
.await;
}
}
info!("No servers could backfill, but backfill was needed in room {room_id}");
warn!("No servers could backfill, but backfill was needed in room {room_id}");
Ok(())
}
#[implement(super::Service)]
#[tracing::instrument(skip(self, pdu), level = "debug")]
pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) -> Result {
let (room_id, event_id, value) = self
pub async fn backfill_pdu(
&self,
room_id: &RoomId,
origin: &ServerName,
pdu: Box<RawJsonValue>,
) -> Result {
let parsed = self
.services
.event_handler
.parse_incoming_pdu(&pdu)
.await?;
.parse_incoming_pdu(&pdu);
// Lock so we cannot backfill the same pdu twice at the same time
let mutex_lock = self
.services
.event_handler
.mutex_federation
.lock(&room_id)
.await;
.lock(room_id)
.map(Ok);
// Skip the PDU if we already have it as a timeline event
if let Ok(pdu_id) = self.get_pdu_id(&event_id).await {
debug!("We already know {event_id} at {pdu_id:?}");
return Ok(());
}
let ((_, event_id, value), mutex_lock) = try_join(parsed, mutex_lock).await?;
self.services
.event_handler
.handle_incoming_pdu(origin, &room_id, &event_id, value, false)
.handle_incoming_pdu(origin, room_id, &event_id, value, false)
.boxed()
.await?;
let value = self.get_pdu_json(&event_id).await?;
let pdu = self.get_pdu(&event_id);
let pdu = self.get_pdu(&event_id).await?;
let value = self.get_pdu_json(&event_id);
let shortroomid = self
.services
.short
.get_shortroomid(&room_id)
.await?;
let shortroomid = self.services.short.get_shortroomid(room_id);
let insert_lock = self.mutex_insert.lock(&room_id).await;
let insert_lock = self.mutex_insert.lock(room_id).map(Ok);
let (pdu, value, shortroomid, insert_lock) =
try_join4(pdu, value, shortroomid, insert_lock).await?;
let count = self.services.globals.next_count();
let count: i64 = (*count).try_into()?;
let pdu_id: RawPduId = PduId {
shortroomid,
@@ -193,7 +204,6 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) ->
// Insert pdu
self.prepend_backfill_pdu(&pdu_id, &event_id, &value);
drop(insert_lock);
if pdu.kind == TimelineEventType::RoomMessage {