Optimize Pdu prev_events and auth_events containers.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-09-23 18:12:06 +00:00
parent 887a22dabd
commit aa37e32471
11 changed files with 62 additions and 50 deletions

View File

@@ -195,6 +195,7 @@ async fn send_state_event_for_key_helper(
room_id, room_id,
&state_lock, &state_lock,
) )
.boxed()
.await?; .await?;
Ok(event_id) Ok(event_id)

View File

@@ -533,9 +533,9 @@ async fn handle_left_room(
unsigned: None, unsigned: None,
// The following keys are dropped on conversion // The following keys are dropped on conversion
room_id: room_id.clone(), room_id: room_id.clone(),
prev_events: vec![], prev_events: Default::default(),
auth_events: Default::default(),
depth: uint!(1), depth: uint!(1),
auth_events: vec![],
redacts: None, redacts: None,
hashes: EventHash::default(), hashes: EventHash::default(),
signatures: None, signatures: None,

View File

@@ -161,12 +161,10 @@ async fn handle(
}) })
.try_flatten() .try_flatten()
.try_collect() .try_collect()
.boxed()
.await?; .await?;
// evaluate edus after pdus, at least for now. // evaluate edus after pdus, at least for now.
edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu))
.boxed()
.await; .await;
Ok(results) Ok(results)
@@ -205,6 +203,7 @@ async fn handle_room(
Ok((event_id, result)) Ok((event_id, result))
}) })
.try_collect() .try_collect()
.boxed()
.await .await
} }

View File

@@ -17,6 +17,7 @@ use ruma::{
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::value::RawValue as RawJsonValue; use serde_json::value::RawValue as RawJsonValue;
use smallvec::SmallVec;
pub use self::{ pub use self::{
Count as PduCount, Id as PduId, Pdu as PduEvent, RawId as RawPduId, Count as PduCount, Id as PduId, Pdu as PduEvent, RawId as RawPduId,
@@ -32,39 +33,39 @@ use crate::Result;
/// Persistent Data Unit (Event) /// Persistent Data Unit (Event)
#[derive(Clone, Deserialize, Serialize, Debug)] #[derive(Clone, Deserialize, Serialize, Debug)]
pub struct Pdu { pub struct Pdu {
#[serde(rename = "type")]
pub kind: TimelineEventType,
pub content: Box<RawJsonValue>,
pub event_id: OwnedEventId, pub event_id: OwnedEventId,
pub room_id: OwnedRoomId, pub room_id: OwnedRoomId,
pub sender: OwnedUserId, pub sender: OwnedUserId,
#[serde(skip_serializing_if = "Option::is_none")]
pub origin: Option<OwnedServerName>,
pub origin_server_ts: UInt,
#[serde(rename = "type")]
pub kind: TimelineEventType,
pub content: Box<RawJsonValue>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub state_key: Option<StateKey>, pub state_key: Option<StateKey>,
pub prev_events: Vec<OwnedEventId>,
pub depth: UInt,
pub auth_events: Vec<OwnedEventId>,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub redacts: Option<OwnedEventId>, pub redacts: Option<OwnedEventId>,
#[serde(default, skip_serializing_if = "Option::is_none")] pub prev_events: PrevEvents,
pub unsigned: Option<Box<RawJsonValue>>,
pub auth_events: AuthEvents,
pub origin_server_ts: UInt,
pub depth: UInt,
pub hashes: EventHash, pub hashes: EventHash,
#[serde(skip_serializing_if = "Option::is_none")]
pub origin: Option<OwnedServerName>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub unsigned: Option<Box<RawJsonValue>>,
// BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, String>> // BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, String>>
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
pub signatures: Option<Box<RawJsonValue>>, pub signatures: Option<Box<RawJsonValue>>,
@@ -75,6 +76,16 @@ pub struct Pdu {
pub rejected: bool, pub rejected: bool,
} }
/// Tuned prev_events vector. Most events have one prev_event. Many events have
/// more but allocations for all of those cases still beats allocations for all
/// cases.
pub type PrevEvents = SmallVec<[OwnedEventId; 1]>;
/// Tuned auth_events vector. Average events have three auth events. It is
/// debatable whether this could be an ArrayVec but the realistic upper-bound is
/// too high and non-deterministic in the era of restricted-type rooms.
pub type AuthEvents = SmallVec<[OwnedEventId; 3]>;
/// The [maximum size allowed] for a PDU. /// The [maximum size allowed] for a PDU.
/// [maximum size allowed]: https://spec.matrix.org/latest/client-server-api/#size-limits /// [maximum size allowed]: https://spec.matrix.org/latest/client-server-api/#size-limits
pub const MAX_PDU_BYTES: usize = 65_535; pub const MAX_PDU_BYTES: usize = 65_535;

View File

@@ -433,13 +433,13 @@ where
.iter() .iter()
.map(AsRef::as_ref) .map(AsRef::as_ref)
.map(event_id) .map(event_id)
.collect::<Vec<_>>(); .collect();
let prev_events = prev_events let prev_events = prev_events
.iter() .iter()
.map(AsRef::as_ref) .map(AsRef::as_ref)
.map(event_id) .map(event_id)
.collect::<Vec<_>>(); .collect();
let state_key = state_key.map(ToOwned::to_owned); let state_key = state_key.map(ToOwned::to_owned);
PduEvent { PduEvent {

View File

@@ -669,8 +669,8 @@ async fn auth_event_in_different_room() {
content: to_raw_json_value(&json!({ "users": { alice(): 100 } })).unwrap(), content: to_raw_json_value(&json!({ "users": { alice(): 100 } })).unwrap(),
redacts: None, redacts: None,
unsigned: None, unsigned: None,
auth_events: vec![event_id("CREATE"), event_id("IMA")], auth_events: vec![event_id("CREATE"), event_id("IMA")].into(),
prev_events: vec![event_id("IMA")], prev_events: vec![event_id("IMA")].into(),
depth: uint!(0), depth: uint!(0),
hashes: EventHash::default(), hashes: EventHash::default(),
signatures: None, signatures: None,
@@ -807,8 +807,8 @@ async fn rejected_auth_event() {
content: to_raw_json_value(&json!({ "users": { alice(): 100 } })).unwrap(), content: to_raw_json_value(&json!({ "users": { alice(): 100 } })).unwrap(),
redacts: None, redacts: None,
unsigned: None, unsigned: None,
auth_events: vec![event_id("CREATE"), event_id("IMA")], auth_events: vec![event_id("CREATE"), event_id("IMA")].into(),
prev_events: vec![event_id("IMA")], prev_events: vec![event_id("IMA")].into(),
depth: uint!(0), depth: uint!(0),
hashes: EventHash::default(), hashes: EventHash::default(),
signatures: None, signatures: None,
@@ -893,12 +893,13 @@ async fn event_without_room_id() {
content: to_raw_json_value(&RoomMessageEventContent::text_plain("Hi!")).unwrap(), content: to_raw_json_value(&RoomMessageEventContent::text_plain("Hi!")).unwrap(),
redacts: None, redacts: None,
unsigned: None, unsigned: None,
auth_events: vec![ auth_events: [
owned_event_id!("$CREATE"), owned_event_id!("$CREATE"),
owned_event_id!("$IMA"), owned_event_id!("$IMA"),
owned_event_id!("$IPOWER"), owned_event_id!("$IPOWER"),
], ]
prev_events: vec![owned_event_id!("$IPOWER")], .into(),
prev_events: [owned_event_id!("$IPOWER")].into(),
depth: uint!(0), depth: uint!(0),
hashes: EventHash::default(), hashes: EventHash::default(),
signatures: None, signatures: None,

View File

@@ -163,7 +163,7 @@ async fn topic_basic() {
.finish(), .finish(),
); );
let events = &[ let events = vec![
to_init_pdu_event( to_init_pdu_event(
"T1", "T1",
alice(), alice(),
@@ -219,7 +219,7 @@ async fn topic_basic() {
.map(event_id) .map(event_id)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
do_check(events, edges, expected_state_ids).await; do_check(&events, edges, expected_state_ids).await;
} }
#[tokio::test] #[tokio::test]

View File

@@ -428,13 +428,13 @@ pub(super) fn to_init_pdu_event(
content: Box<RawJsonValue>, content: Box<RawJsonValue>,
) -> PduEvent { ) -> PduEvent {
let ts = SERVER_TIMESTAMP.fetch_add(1, SeqCst); let ts = SERVER_TIMESTAMP.fetch_add(1, SeqCst);
let state_key = state_key.map(ToOwned::to_owned);
let id = if id.contains('$') { let id = if id.contains('$') {
id.to_owned() id.to_owned()
} else { } else {
format!("${id}:foo") format!("${id}:foo")
}; };
let state_key = state_key.map(ToOwned::to_owned);
PduEvent { PduEvent {
event_id: id.try_into().unwrap(), event_id: id.try_into().unwrap(),
room_id: room_id().to_owned(), room_id: room_id().to_owned(),
@@ -446,8 +446,8 @@ pub(super) fn to_init_pdu_event(
content, content,
redacts: None, redacts: None,
unsigned: None, unsigned: None,
auth_events: vec![], auth_events: Default::default(),
prev_events: vec![], prev_events: Default::default(),
depth: uint!(0), depth: uint!(0),
hashes: EventHash::default(), hashes: EventHash::default(),
signatures: None, signatures: None,
@@ -468,6 +468,7 @@ where
S: AsRef<str>, S: AsRef<str>,
{ {
let ts = SERVER_TIMESTAMP.fetch_add(1, SeqCst); let ts = SERVER_TIMESTAMP.fetch_add(1, SeqCst);
let state_key = state_key.map(ToOwned::to_owned);
let id = if id.contains('$') { let id = if id.contains('$') {
id.to_owned() id.to_owned()
} else { } else {
@@ -477,14 +478,13 @@ where
.iter() .iter()
.map(AsRef::as_ref) .map(AsRef::as_ref)
.map(event_id) .map(event_id)
.collect::<Vec<_>>(); .collect();
let prev_events = prev_events let prev_events = prev_events
.iter() .iter()
.map(AsRef::as_ref) .map(AsRef::as_ref)
.map(event_id) .map(event_id)
.collect::<Vec<_>>(); .collect();
let state_key = state_key.map(ToOwned::to_owned);
PduEvent { PduEvent {
event_id: id.try_into().unwrap(), event_id: id.try_into().unwrap(),
room_id: room_id().to_owned(), room_id: room_id().to_owned(),
@@ -528,18 +528,18 @@ where
} }
let ts = SERVER_TIMESTAMP.fetch_add(1, SeqCst); let ts = SERVER_TIMESTAMP.fetch_add(1, SeqCst);
let state_key = state_key.map(ToOwned::to_owned);
let auth_events = auth_events let auth_events = auth_events
.iter() .iter()
.map(AsRef::as_ref) .map(AsRef::as_ref)
.map(event_id) .map(event_id)
.collect::<Vec<_>>(); .collect();
let prev_events = prev_events let prev_events = prev_events
.iter() .iter()
.map(AsRef::as_ref) .map(AsRef::as_ref)
.map(event_id) .map(event_id)
.collect::<Vec<_>>(); .collect();
let state_key = state_key.map(ToOwned::to_owned);
PduEvent { PduEvent {
event_id: event_id(id), event_id: event_id(id),
room_id: hydra_room_id().to_owned(), room_id: hydra_room_id().to_owned(),
@@ -581,12 +581,12 @@ where
.iter() .iter()
.map(AsRef::as_ref) .map(AsRef::as_ref)
.map(event_id) .map(event_id)
.collect::<Vec<_>>(); .collect();
let prev_events = prev_events let prev_events = prev_events
.iter() .iter()
.map(AsRef::as_ref) .map(AsRef::as_ref)
.map(event_id) .map(event_id)
.collect::<Vec<_>>(); .collect();
PduEvent { PduEvent {
event_id: id.try_into().unwrap(), event_id: id.try_into().unwrap(),
@@ -636,8 +636,8 @@ pub(super) fn room_create_hydra_pdu_event(
content, content,
redacts: None, redacts: None,
unsigned: None, unsigned: None,
auth_events: vec![], auth_events: Default::default(),
prev_events: vec![], prev_events: Default::default(),
depth: uint!(0), depth: uint!(0),
hashes: EventHash::default(), hashes: EventHash::default(),
signatures: None, signatures: None,

View File

@@ -1,5 +1,6 @@
use std::{ops::Range, time::Duration}; use std::{ops::Range, time::Duration};
use futures::FutureExt;
use ruma::{ use ruma::{
CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName,
}; };
@@ -62,6 +63,7 @@ pub(super) async fn handle_prev_pdu(
room_version, room_version,
create_event_id, create_event_id,
) )
.boxed()
.await?; .await?;
Ok(()) Ok(())

View File

@@ -212,13 +212,11 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu(
.services .services
.state_compressor .state_compressor
.save_state(room_id, new_room_state) .save_state(room_id, new_room_state)
.boxed()
.await?; .await?;
self.services self.services
.state .state
.force_state(room_id, shortstatehash, added, removed, &state_lock) .force_state(room_id, shortstatehash, added, removed, &state_lock)
.boxed()
.await?; .await?;
} }

View File

@@ -13,7 +13,7 @@ use tuwunel_core::{
Error, Result, err, implement, Error, Result, err, implement,
matrix::{ matrix::{
event::{Event, StateKey, TypeExt}, event::{Event, StateKey, TypeExt},
pdu::{EventHash, PduBuilder, PduEvent}, pdu::{EventHash, PduBuilder, PduEvent, PrevEvents},
room_version, room_version,
state_res::{self}, state_res::{self},
}, },
@@ -43,7 +43,7 @@ pub async fn create_hash_and_sign_event(
timestamp, timestamp,
} = pdu_builder; } = pdu_builder;
let prev_events: Vec<OwnedEventId> = self let prev_events: PrevEvents = self
.services .services
.state .state
.get_forward_extremities(room_id) .get_forward_extremities(room_id)