Parallelize state_at_incoming sstatehash/prev_event fetches.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-08 22:51:38 +00:00
parent 190269f616
commit ed0b3c764e

View File

@@ -7,7 +7,7 @@ use std::{
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join}; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join};
use ruma::{OwnedEventId, RoomId, RoomVersionId}; use ruma::{OwnedEventId, RoomId, RoomVersionId};
use tuwunel_core::{ use tuwunel_core::{
Result, debug, err, implement, Result, err, implement,
matrix::{Event, StateMap}, matrix::{Event, StateMap},
trace, trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
@@ -26,7 +26,12 @@ pub(super) async fn state_at_incoming_degree_one<Pdu>(
where where
Pdu: Event, Pdu: Event,
{ {
let prev_event = incoming_pdu debug_assert!(
incoming_pdu.prev_events().count() == 1,
"Incoming PDU must have one prev_event to make this call"
);
let prev_event_id = incoming_pdu
.prev_events() .prev_events()
.next() .next()
.expect("at least one prev_event"); .expect("at least one prev_event");
@@ -34,35 +39,35 @@ where
let Ok(prev_event_sstatehash) = self let Ok(prev_event_sstatehash) = self
.services .services
.state_accessor .state_accessor
.pdu_shortstatehash(prev_event) .pdu_shortstatehash(prev_event_id)
.await .await
else { else {
return Ok(None); return Ok(None);
}; };
let mut state: HashMap<_, _> = self let prev_event = self
.services
.timeline
.get_pdu(prev_event_id)
.map_err(|e| err!(Database("Could not find prev_event, but we know the state: {e:?}")));
let state = self
.services .services
.state_accessor .state_accessor
.state_full_ids(prev_event_sstatehash) .state_full_ids(prev_event_sstatehash)
.collect() .collect::<HashMap<_, _>>()
.await; .map(Ok);
debug!("Using cached state"); let (prev_event, mut state) = try_join(prev_event, state).await?;
let prev_pdu = self
.services
.timeline
.get_pdu(prev_event)
.await
.map_err(|e| err!(Database("Could not find prev event, but we know the state: {e:?}")))?;
if let Some(state_key) = &prev_pdu.state_key { if let Some(state_key) = prev_event.state_key {
let shortstatekey = self let shortstatekey = self
.services .services
.short .short
.get_or_create_shortstatekey(&prev_pdu.kind.to_string().into(), state_key) .get_or_create_shortstatekey(&prev_event.kind.into(), &state_key)
.await; .await;
state.insert(shortstatekey, prev_event.to_owned()); state.insert(shortstatekey, prev_event.event_id);
// Now it's the state after the pdu // Now it's the state after the pdu
} }
@@ -82,21 +87,24 @@ pub(super) async fn state_at_incoming_resolved<Pdu>(
where where
Pdu: Event, Pdu: Event,
{ {
debug_assert!(
incoming_pdu.prev_events().count() > 1,
"Incoming PDU should have more than one prev_event for this codepath"
);
trace!("Calculating extremity statehashes..."); trace!("Calculating extremity statehashes...");
let Ok(extremity_sstatehashes) = incoming_pdu let Ok(extremity_sstatehashes) = incoming_pdu
.prev_events() .prev_events()
.try_stream() .try_stream()
.broad_and_then(|prev_eventid| { .broad_and_then(|prev_event_id| {
self.services let prev_event = self.services.timeline.get_pdu(prev_event_id);
.timeline
.get_pdu(prev_eventid) let sstatehash = self
.map_ok(move |prev_event| (prev_eventid, prev_event)) .services
})
.broad_and_then(|(prev_eventid, prev_event)| {
self.services
.state_accessor .state_accessor
.pdu_shortstatehash(prev_eventid) .pdu_shortstatehash(prev_event_id);
.map_ok(move |sstatehash| (sstatehash, prev_event))
try_join(sstatehash, prev_event)
}) })
.try_collect::<HashMap<_, _>>() .try_collect::<HashMap<_, _>>()
.await .await