From aa37e32471bba847d8aec11e7356ae21dd8275b2 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 23 Sep 2025 18:12:06 +0000 Subject: [PATCH] Optimize Pdu prev_events and auth_events containers. Signed-off-by: Jason Volk --- src/api/client/state.rs | 1 + src/api/client/sync/v3.rs | 4 +- src/api/server/send.rs | 3 +- src/core/matrix/pdu.rs | 47 ++++++++++++------- src/core/matrix/state_res/benches.rs | 4 +- src/core/matrix/state_res/event_auth/tests.rs | 15 +++--- src/core/matrix/state_res/resolve/tests.rs | 4 +- src/core/matrix/state_res/test_utils.rs | 26 +++++----- .../rooms/event_handler/handle_prev_pdu.rs | 2 + .../event_handler/upgrade_outlier_pdu.rs | 2 - src/service/rooms/timeline/create.rs | 4 +- 11 files changed, 62 insertions(+), 50 deletions(-) diff --git a/src/api/client/state.rs b/src/api/client/state.rs index 812b883d..b8d11d01 100644 --- a/src/api/client/state.rs +++ b/src/api/client/state.rs @@ -195,6 +195,7 @@ async fn send_state_event_for_key_helper( room_id, &state_lock, ) + .boxed() .await?; Ok(event_id) diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 8ff33a77..e1364529 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -533,9 +533,9 @@ async fn handle_left_room( unsigned: None, // The following keys are dropped on conversion room_id: room_id.clone(), - prev_events: vec![], + prev_events: Default::default(), + auth_events: Default::default(), depth: uint!(1), - auth_events: vec![], redacts: None, hashes: EventHash::default(), signatures: None, diff --git a/src/api/server/send.rs b/src/api/server/send.rs index d975edc8..cf9dbc52 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -161,12 +161,10 @@ async fn handle( }) .try_flatten() .try_collect() - .boxed() .await?; // evaluate edus after pdus, at least for now. edus.for_each_concurrent(automatic_width(), |edu| handle_edu(services, client, origin, edu)) - .boxed() .await; Ok(results) @@ -205,6 +203,7 @@ async fn handle_room( Ok((event_id, result)) }) .try_collect() + .boxed() .await } diff --git a/src/core/matrix/pdu.rs b/src/core/matrix/pdu.rs index da7c03af..4ccf040d 100644 --- a/src/core/matrix/pdu.rs +++ b/src/core/matrix/pdu.rs @@ -17,6 +17,7 @@ use ruma::{ }; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue as RawJsonValue; +use smallvec::SmallVec; pub use self::{ Count as PduCount, Id as PduId, Pdu as PduEvent, RawId as RawPduId, @@ -32,39 +33,39 @@ use crate::Result; /// Persistent Data Unit (Event) #[derive(Clone, Deserialize, Serialize, Debug)] pub struct Pdu { + #[serde(rename = "type")] + pub kind: TimelineEventType, + + pub content: Box, + pub event_id: OwnedEventId, pub room_id: OwnedRoomId, pub sender: OwnedUserId, - #[serde(skip_serializing_if = "Option::is_none")] - pub origin: Option, - - pub origin_server_ts: UInt, - - #[serde(rename = "type")] - pub kind: TimelineEventType, - - pub content: Box, - #[serde(skip_serializing_if = "Option::is_none")] pub state_key: Option, - pub prev_events: Vec, - - pub depth: UInt, - - pub auth_events: Vec, - #[serde(skip_serializing_if = "Option::is_none")] pub redacts: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub unsigned: Option>, + pub prev_events: PrevEvents, + + pub auth_events: AuthEvents, + + pub origin_server_ts: UInt, + + pub depth: UInt, pub hashes: EventHash, + #[serde(skip_serializing_if = "Option::is_none")] + pub origin: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub unsigned: Option>, + // BTreeMap, BTreeMap> #[serde(default, skip_serializing_if = "Option::is_none")] pub signatures: Option>, @@ -75,6 +76,16 @@ pub struct Pdu { pub rejected: bool, } +/// Tuned prev_events vector. Most events have one prev_event. Many events have +/// more but allocations for all of those cases still beats allocations for all +/// cases. +pub type PrevEvents = SmallVec<[OwnedEventId; 1]>; + +/// Tuned auth_events vector. Average events have three auth events. It is +/// debatable whether this could be an ArrayVec but the realistic upper-bound is +/// too high and non-deterministic in the era of restricted-type rooms. +pub type AuthEvents = SmallVec<[OwnedEventId; 3]>; + /// The [maximum size allowed] for a PDU. /// [maximum size allowed]: https://spec.matrix.org/latest/client-server-api/#size-limits pub const MAX_PDU_BYTES: usize = 65_535; diff --git a/src/core/matrix/state_res/benches.rs b/src/core/matrix/state_res/benches.rs index 2abf9011..be81c81e 100644 --- a/src/core/matrix/state_res/benches.rs +++ b/src/core/matrix/state_res/benches.rs @@ -433,13 +433,13 @@ where .iter() .map(AsRef::as_ref) .map(event_id) - .collect::>(); + .collect(); let prev_events = prev_events .iter() .map(AsRef::as_ref) .map(event_id) - .collect::>(); + .collect(); let state_key = state_key.map(ToOwned::to_owned); PduEvent { diff --git a/src/core/matrix/state_res/event_auth/tests.rs b/src/core/matrix/state_res/event_auth/tests.rs index 1e746ffc..72181c51 100644 --- a/src/core/matrix/state_res/event_auth/tests.rs +++ b/src/core/matrix/state_res/event_auth/tests.rs @@ -669,8 +669,8 @@ async fn auth_event_in_different_room() { content: to_raw_json_value(&json!({ "users": { alice(): 100 } })).unwrap(), redacts: None, unsigned: None, - auth_events: vec![event_id("CREATE"), event_id("IMA")], - prev_events: vec![event_id("IMA")], + auth_events: vec![event_id("CREATE"), event_id("IMA")].into(), + prev_events: vec![event_id("IMA")].into(), depth: uint!(0), hashes: EventHash::default(), signatures: None, @@ -807,8 +807,8 @@ async fn rejected_auth_event() { content: to_raw_json_value(&json!({ "users": { alice(): 100 } })).unwrap(), redacts: None, unsigned: None, - auth_events: vec![event_id("CREATE"), event_id("IMA")], - prev_events: vec![event_id("IMA")], + auth_events: vec![event_id("CREATE"), event_id("IMA")].into(), + prev_events: vec![event_id("IMA")].into(), depth: uint!(0), hashes: EventHash::default(), signatures: None, @@ -893,12 +893,13 @@ async fn event_without_room_id() { content: to_raw_json_value(&RoomMessageEventContent::text_plain("Hi!")).unwrap(), redacts: None, unsigned: None, - auth_events: vec![ + auth_events: [ owned_event_id!("$CREATE"), owned_event_id!("$IMA"), owned_event_id!("$IPOWER"), - ], - prev_events: vec![owned_event_id!("$IPOWER")], + ] + .into(), + prev_events: [owned_event_id!("$IPOWER")].into(), depth: uint!(0), hashes: EventHash::default(), signatures: None, diff --git a/src/core/matrix/state_res/resolve/tests.rs b/src/core/matrix/state_res/resolve/tests.rs index cac26a6e..794a6ce7 100644 --- a/src/core/matrix/state_res/resolve/tests.rs +++ b/src/core/matrix/state_res/resolve/tests.rs @@ -163,7 +163,7 @@ async fn topic_basic() { .finish(), ); - let events = &[ + let events = vec![ to_init_pdu_event( "T1", alice(), @@ -219,7 +219,7 @@ async fn topic_basic() { .map(event_id) .collect::>(); - do_check(events, edges, expected_state_ids).await; + do_check(&events, edges, expected_state_ids).await; } #[tokio::test] diff --git a/src/core/matrix/state_res/test_utils.rs b/src/core/matrix/state_res/test_utils.rs index bbe592cc..b9a4697d 100644 --- a/src/core/matrix/state_res/test_utils.rs +++ b/src/core/matrix/state_res/test_utils.rs @@ -428,13 +428,13 @@ pub(super) fn to_init_pdu_event( content: Box, ) -> PduEvent { let ts = SERVER_TIMESTAMP.fetch_add(1, SeqCst); + let state_key = state_key.map(ToOwned::to_owned); let id = if id.contains('$') { id.to_owned() } else { format!("${id}:foo") }; - let state_key = state_key.map(ToOwned::to_owned); PduEvent { event_id: id.try_into().unwrap(), room_id: room_id().to_owned(), @@ -446,8 +446,8 @@ pub(super) fn to_init_pdu_event( content, redacts: None, unsigned: None, - auth_events: vec![], - prev_events: vec![], + auth_events: Default::default(), + prev_events: Default::default(), depth: uint!(0), hashes: EventHash::default(), signatures: None, @@ -468,6 +468,7 @@ where S: AsRef, { let ts = SERVER_TIMESTAMP.fetch_add(1, SeqCst); + let state_key = state_key.map(ToOwned::to_owned); let id = if id.contains('$') { id.to_owned() } else { @@ -477,14 +478,13 @@ where .iter() .map(AsRef::as_ref) .map(event_id) - .collect::>(); + .collect(); let prev_events = prev_events .iter() .map(AsRef::as_ref) .map(event_id) - .collect::>(); + .collect(); - let state_key = state_key.map(ToOwned::to_owned); PduEvent { event_id: id.try_into().unwrap(), room_id: room_id().to_owned(), @@ -528,18 +528,18 @@ where } let ts = SERVER_TIMESTAMP.fetch_add(1, SeqCst); + let state_key = state_key.map(ToOwned::to_owned); let auth_events = auth_events .iter() .map(AsRef::as_ref) .map(event_id) - .collect::>(); + .collect(); let prev_events = prev_events .iter() .map(AsRef::as_ref) .map(event_id) - .collect::>(); + .collect(); - let state_key = state_key.map(ToOwned::to_owned); PduEvent { event_id: event_id(id), room_id: hydra_room_id().to_owned(), @@ -581,12 +581,12 @@ where .iter() .map(AsRef::as_ref) .map(event_id) - .collect::>(); + .collect(); let prev_events = prev_events .iter() .map(AsRef::as_ref) .map(event_id) - .collect::>(); + .collect(); PduEvent { event_id: id.try_into().unwrap(), @@ -636,8 +636,8 @@ pub(super) fn room_create_hydra_pdu_event( content, redacts: None, unsigned: None, - auth_events: vec![], - prev_events: vec![], + auth_events: Default::default(), + prev_events: Default::default(), depth: uint!(0), hashes: EventHash::default(), signatures: None, diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index 6349f283..041630dc 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -1,5 +1,6 @@ use std::{ops::Range, time::Duration}; +use futures::FutureExt; use ruma::{ CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, RoomId, RoomVersionId, ServerName, }; @@ -62,6 +63,7 @@ pub(super) async fn handle_prev_pdu( room_version, create_event_id, ) + .boxed() .await?; Ok(()) diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index b7ba31fb..2c85b7b5 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -212,13 +212,11 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .services .state_compressor .save_state(room_id, new_room_state) - .boxed() .await?; self.services .state .force_state(room_id, shortstatehash, added, removed, &state_lock) - .boxed() .await?; } diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs index 1bcbee5c..bfd43e4e 100644 --- a/src/service/rooms/timeline/create.rs +++ b/src/service/rooms/timeline/create.rs @@ -13,7 +13,7 @@ use tuwunel_core::{ Error, Result, err, implement, matrix::{ event::{Event, StateKey, TypeExt}, - pdu::{EventHash, PduBuilder, PduEvent}, + pdu::{EventHash, PduBuilder, PduEvent, PrevEvents}, room_version, state_res::{self}, }, @@ -43,7 +43,7 @@ pub async fn create_hash_and_sign_event( timestamp, } = pdu_builder; - let prev_events: Vec = self + let prev_events: PrevEvents = self .services .state .get_forward_extremities(room_id)