From e9864bc4e724689745f23412a8131d3e3878799c Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 11 Mar 2026 13:42:17 +0000 Subject: [PATCH] Encapsulate incoming pdu formatting and checks within constructor. Signed-off-by: Jason Volk --- src/admin/debug/commands.rs | 6 +- src/core/matrix/pdu.rs | 61 +++++++++++++++---- src/core/matrix/pdu/format.rs | 15 ++--- src/core/matrix/pdu/format/check.rs | 23 +++++-- src/service/federation/format.rs | 2 +- src/service/membership/join.rs | 16 ++--- src/service/membership/knock.rs | 4 +- src/service/membership/leave.rs | 4 +- src/service/rooms/event_handler/fetch_prev.rs | 8 ++- .../rooms/event_handler/handle_outlier_pdu.rs | 19 +++--- src/service/rooms/event_handler/mod.rs | 19 +----- .../event_handler/upgrade_outlier_pdu.rs | 8 +-- src/service/rooms/timeline/create.rs | 4 +- 13 files changed, 110 insertions(+), 79 deletions(-) diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 0d5b08af..bdd9529f 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -553,9 +553,11 @@ pub(super) async fn force_set_room_state_from_server( }; let pdu = if value["type"] == "m.room.create" { - PduEvent::from_rid_val(&room_id, &event_id, value.clone()).map_err(invalid_pdu_err)? + PduEvent::from_object_and_roomid_and_eventid(&room_id, &event_id, value.clone()) + .map_err(invalid_pdu_err)? } else { - PduEvent::from_id_val(&event_id, value.clone()).map_err(invalid_pdu_err)? + PduEvent::from_object_and_eventid(&event_id, value.clone()) + .map_err(invalid_pdu_err)? }; if !value.contains_key("room_id") { diff --git a/src/core/matrix/pdu.rs b/src/core/matrix/pdu.rs index 78e13053..0a8765c4 100644 --- a/src/core/matrix/pdu.rs +++ b/src/core/matrix/pdu.rs @@ -1,6 +1,6 @@ mod builder; mod count; -pub mod format; +mod format; mod hashes; mod id; mod raw_id; @@ -13,6 +13,7 @@ use std::cmp::Ordering; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, UInt, UserId, events::TimelineEventType, + room_version_rules::RoomVersionRules, }; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue as RawJsonValue; @@ -22,7 +23,10 @@ pub use self::{ Count as PduCount, Id as PduId, Pdu as PduEvent, RawId as RawPduId, builder::{Builder, Builder as PduBuilder}, count::Count, - format::check::check_pdu_format, + format::{ + check::{check_room_id, check_rules}, + from_incoming_federation, into_outgoing_federation, + }, hashes::EventHashes as EventHash, id::Id, raw_id::*, @@ -99,28 +103,61 @@ pub const MAX_PREV_EVENTS: usize = 20; pub const MAX_AUTH_EVENTS: usize = 10; impl Pdu { - pub fn from_rid_val( + pub fn from_object_and_roomid_and_eventid( room_id: &RoomId, event_id: &EventId, mut json: CanonicalJsonObject, ) -> Result { let room_id = CanonicalJsonValue::String(room_id.into()); json.insert("room_id".into(), room_id); - - Self::from_id_val(event_id, json) + Self::from_object_and_eventid(event_id, json) } - pub fn from_id_val(event_id: &EventId, mut json: CanonicalJsonObject) -> Result { + pub fn from_object_and_eventid( + event_id: &EventId, + mut json: CanonicalJsonObject, + ) -> Result { let event_id = CanonicalJsonValue::String(event_id.into()); json.insert("event_id".into(), event_id); - - Self::from_val(&json) + Self::from_object(json) } - pub fn from_val(json: &CanonicalJsonObject) -> Result { - serde_json::to_value(json) - .and_then(serde_json::from_value) - .map_err(Into::into) + pub fn from_object_federation( + room_id: &RoomId, + event_id: &EventId, + json: CanonicalJsonObject, + rules: &RoomVersionRules, + ) -> Result<(Self, CanonicalJsonObject)> { + let json = from_incoming_federation(room_id, event_id, json, rules); + let pdu = Self::from_object_checked(json.clone(), rules)?; + check_room_id(&pdu, room_id)?; + Ok((pdu, json)) + } + + pub fn from_object_checked( + json: CanonicalJsonObject, + rules: &RoomVersionRules, + ) -> Result { + check_rules(&json, &rules.event_format)?; + Self::from_object(json) + } + + pub fn from_object(json: CanonicalJsonObject) -> Result { + let json = CanonicalJsonValue::Object(json); + Self::from_value(json) + } + + pub fn from_raw_value(json: &RawJsonValue) -> Result { + let json: CanonicalJsonValue = json.into(); + Self::from_value(json) + } + + pub fn from_value(json: CanonicalJsonValue) -> Result { + serde_json::from_value(json.into()).map_err(Into::into) + } + + pub fn from_raw_json(json: &RawJsonValue) -> Result { + Self::deserialize(json).map_err(Into::into) } } diff --git a/src/core/matrix/pdu/format.rs b/src/core/matrix/pdu/format.rs index ac19d1c8..3b57e0be 100644 --- a/src/core/matrix/pdu/format.rs +++ b/src/core/matrix/pdu/format.rs @@ -5,11 +5,9 @@ use ruma::{ room_version_rules::{EventsReferenceFormatVersion, RoomVersionRules}, }; -use crate::{ - Result, extract_variant, is_equal_to, - matrix::{PduEvent, room_version}, -}; +use crate::{extract_variant, is_equal_to, matrix::room_version}; +#[must_use] pub fn into_outgoing_federation( mut pdu_json: CanonicalJsonObject, room_version: &RoomVersionId, @@ -68,12 +66,13 @@ fn mutate_outgoing_reference_format(value: &mut CanonicalJsonValue) { }); } +#[must_use] pub fn from_incoming_federation( room_id: &RoomId, event_id: &EventId, - pdu_json: &mut CanonicalJsonObject, + mut pdu_json: CanonicalJsonObject, room_rules: &RoomVersionRules, -) -> Result { +) -> CanonicalJsonObject { if matches!(room_rules.events_reference_format, EventsReferenceFormatVersion::V1) { if let Some(value) = pdu_json.get_mut("auth_events") { mutate_incoming_reference_format(value); @@ -95,9 +94,7 @@ pub fn from_incoming_federation( pdu_json.insert("event_id".into(), CanonicalJsonValue::String(event_id.into())); } - check::check_pdu_format(pdu_json, &room_rules.event_format)?; - - PduEvent::from_val(pdu_json) + pdu_json } fn mutate_incoming_reference_format(value: &mut CanonicalJsonValue) { diff --git a/src/core/matrix/pdu/format/check.rs b/src/core/matrix/pdu/format/check.rs index 69dacef9..a5ffbf51 100644 --- a/src/core/matrix/pdu/format/check.rs +++ b/src/core/matrix/pdu/format/check.rs @@ -4,10 +4,21 @@ use ruma::{ }; use serde_json::to_string as to_json_string; -use crate::{ - Err, Result, err, - matrix::pdu::{MAX_AUTH_EVENTS, MAX_PDU_BYTES, MAX_PREV_EVENTS}, -}; +use super::super::{MAX_AUTH_EVENTS, MAX_PDU_BYTES, MAX_PREV_EVENTS, Pdu}; +use crate::{Err, Result, err}; + +pub fn check_room_id(pdu: &Pdu, room_id: &RoomId) -> Result { + if pdu.room_id != room_id { + return Err!(Request(InvalidParam(error!( + pdu_event_id = ?pdu.event_id, + pdu_room_id = ?pdu.room_id, + ?room_id, + "Event in wrong room", + )))); + } + + Ok(()) +} /// Check that the given canonicalized PDU respects the event format of the room /// version and the [size limits] from the Matrix specification. @@ -31,7 +42,7 @@ use crate::{ /// /// [size limits]: https://spec.matrix.org/latest/client-server-api/#size-limits /// [checks performed on receipt of a PDU]: https://spec.matrix.org/latest/server-server-api/#checks-performed-on-receipt-of-a-pdu -pub fn check_pdu_format(pdu: &CanonicalJsonObject, rules: &EventFormatRules) -> Result { +pub fn check_rules(pdu: &CanonicalJsonObject, rules: &EventFormatRules) -> Result { // Check the PDU size, it must occur on the full PDU with signatures. let json = to_json_string(&pdu) .map_err(|e| err!(Request(BadJson("Failed to serialize canonical JSON: {e}"))))?; @@ -200,7 +211,7 @@ mod tests { }; use serde_json::{from_value as from_json_value, json}; - use super::check_pdu_format; + use super::check_rules as check_pdu_format; /// Construct a PDU valid for the event format of room v1. fn pdu_v1() -> CanonicalJsonObject { diff --git a/src/service/federation/format.rs b/src/service/federation/format.rs index 45cee589..956697f4 100644 --- a/src/service/federation/format.rs +++ b/src/service/federation/format.rs @@ -35,7 +35,7 @@ pub async fn format_pdu_into( .as_ref() .or(room_version) { - pdu_json = pdu::format::into_outgoing_federation(pdu_json, room_version); + pdu_json = pdu::into_outgoing_federation(pdu_json, room_version); } else { pdu_json.remove("event_id"); } diff --git a/src/service/membership/join.rs b/src/service/membership/join.rs index 0f195171..19cb212c 100644 --- a/src/service/membership/join.rs +++ b/src/service/membership/join.rs @@ -29,7 +29,7 @@ use serde_json::value::RawValue as RawJsonValue; use tuwunel_core::{ Err, Result, at, debug, debug_error, debug_info, debug_warn, err, error, implement, info, matrix::{event::gen_event_id_canonical_json, room_version}, - pdu::{PduBuilder, check_pdu_format, format::from_incoming_federation}, + pdu::{Pdu, PduBuilder, check_rules}, trace, utils::{self, BoolExt, IterStream, ReadyExt, future::TryExtExt, math::Expected, shuffle}, warn, @@ -310,8 +310,8 @@ pub async fn join_remote( %shortroomid, "Initialized room. Parsing join event..." ); - let parsed_join_pdu = - from_incoming_federation(room_id, &event_id, &mut join_event, &room_version_rules)?; + let (parsed_join_pdu, join_event) = + Pdu::from_object_federation(room_id, &event_id, join_event, &room_version_rules)?; let resp_state = &response.state; let resp_auth = &response.auth_chain; @@ -337,12 +337,12 @@ pub async fn join_remote( }) .inspect_err(|e| debug_error!("Invalid send_join state event: {e:?}")) .ready_filter_map(Result::ok) - .ready_filter_map(|(event_id, mut value)| { - from_incoming_federation(room_id, &event_id, &mut value, &room_version_rules) + .ready_filter_map(|(event_id, value)| { + Pdu::from_object_federation(room_id, &event_id, value, &room_version_rules) .inspect_err(|e| { - debug_warn!("Invalid PDU in send_join response: {e:?}: {value:#?}"); + debug_warn!("Invalid PDU {event_id:?} in send_join response: {e:?}"); }) - .map(move |pdu| (event_id, pdu, value)) + .map(move |(pdu, value)| (event_id, pdu, value)) .ok() }) .fold(HashMap::new(), async |mut state, (event_id, pdu, value)| { @@ -757,7 +757,7 @@ async fn create_join_event( .server_keys .gen_id_hash_and_sign_event(&mut event, room_version_id)?; - check_pdu_format(&event, &room_version_rules.event_format)?; + check_rules(&event, &room_version_rules.event_format)?; Ok((event, event_id, join_authorized_via_users_server)) } diff --git a/src/service/membership/knock.rs b/src/service/membership/knock.rs index 0d76306f..f7d8de66 100644 --- a/src/service/membership/knock.rs +++ b/src/service/membership/knock.rs @@ -308,7 +308,7 @@ async fn knock_room_helper_local( info!("Parsing knock event"); - let parsed_knock_pdu = PduEvent::from_id_val(&event_id, knock_event.clone()) + let parsed_knock_pdu = PduEvent::from_object_and_eventid(&event_id, knock_event.clone()) .map_err(|e| err!(BadServerResponse("Invalid knock event PDU: {e:?}")))?; info!("Updating membership locally to knock state with provided stripped state events"); @@ -480,7 +480,7 @@ async fn knock_room_helper_remote( .await; info!("Parsing knock event"); - let parsed_knock_pdu = PduEvent::from_id_val(&event_id, knock_event.clone()) + let parsed_knock_pdu = PduEvent::from_object_and_eventid(&event_id, knock_event.clone()) .map_err(|e| err!(BadServerResponse("Invalid knock event PDU: {e:?}")))?; info!("Going through send_knock response knock state events"); diff --git a/src/service/membership/leave.rs b/src/service/membership/leave.rs index 0292f303..16962013 100644 --- a/src/service/membership/leave.rs +++ b/src/service/membership/leave.rs @@ -16,7 +16,7 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, debug_info, debug_warn, err, implement, - matrix::{PduCount, pdu::check_pdu_format, room_version}, + matrix::{PduCount, pdu::check_rules, room_version}, pdu::PduBuilder, utils::{ self, FutureBoolExt, @@ -353,7 +353,7 @@ async fn remote_leave( .server_keys .gen_id_hash_and_sign_event(&mut event, &room_version_id)?; - check_pdu_format(&event, &room_version_rules.event_format)?; + check_rules(&event, &room_version_rules.event_format)?; self.services .federation diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 884512c7..671ed340 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -7,11 +7,13 @@ use ruma::{ }; use tuwunel_core::{ Result, debug_warn, err, implement, - matrix::{Event, PduEvent, pdu::MAX_PREV_EVENTS}, + matrix::{ + Event, PduEvent, + pdu::{MAX_PREV_EVENTS, check_room_id}, + }, utils::stream::IterStream, }; -use super::check_room_id; use crate::rooms::state_res; #[implement(super::Service)] @@ -69,7 +71,7 @@ where continue; }; - check_room_id(room_id, &pdu)?; + check_room_id(&pdu, room_id)?; let limit = self.services.server.config.max_fetch_prev_events; if amount > limit { diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index 05383666..33cd57cf 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -5,13 +5,11 @@ use ruma::{ use tuwunel_core::{ Err, Result, debug, debug_info, err, implement, matrix::{Event, PduEvent, event::TypeExt, room_version}, - pdu::{check_pdu_format, format::from_incoming_federation}, ref_at, trace, utils::{future::TryExtExt, stream::IterStream}, warn, }; -use super::check_room_id; use crate::rooms::state_res; #[implement(super::Service)] @@ -41,7 +39,7 @@ pub(super) async fn handle_outlier_pdu( // anywhere?: https://matrix.org/docs/spec/rooms/v6#canonical-json // 2. Check signatures, otherwise drop // 3. check content hash, redact if doesn't match - let mut pdu_json = match self + let pdu_json = match self .services .server_keys .verify_event(&pdu_json, Some(room_version)) @@ -57,7 +55,8 @@ pub(super) async fn handle_outlier_pdu( ))); }; - let Ok(obj) = ruma::canonical_json::redact(pdu_json, &rules.redaction, None) else { + let Ok(pdu_json) = ruma::canonical_json::redact(pdu_json, &rules.redaction, None) + else { return Err!(Request(InvalidParam("Redaction failed"))); }; @@ -68,7 +67,7 @@ pub(super) async fn handle_outlier_pdu( ))); } - obj + pdu_json }, | Err(e) => { return Err!(Request(InvalidParam(debug_error!( @@ -77,15 +76,11 @@ pub(super) async fn handle_outlier_pdu( }, }; - let room_rules = room_version::rules(room_version)?; - - check_pdu_format(&pdu_json, &room_rules.event_format)?; - // Now that we have checked the signature and hashes we can make mutations and // convert to our PduEvent type. - let event = from_incoming_federation(room_id, event_id, &mut pdu_json, &room_rules)?; - - check_room_id(room_id, &event)?; + let room_rules = room_version::rules(room_version)?; + let (event, pdu_json) = + PduEvent::from_object_federation(room_id, event_id, pdu_json, &room_rules)?; if !auth_events_known { // 4. fetch any missing auth events doing all checks listed here starting at 1. diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index ae77eae5..a92a4d31 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -19,10 +19,10 @@ use std::{ }; use async_trait::async_trait; -use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId}; +use ruma::{EventId, OwnedEventId, OwnedRoomId}; use tuwunel_core::{ - Err, Result, implement, - matrix::{Event, PduEvent}, + Result, implement, + matrix::PduEvent, utils::{MutexMap, bytes::pretty, continue_exponential_backoff}, }; @@ -146,16 +146,3 @@ async fn event_exists(&self, event_id: &EventId) -> bool { async fn event_fetch(&self, event_id: &EventId) -> Result { self.services.timeline.get_pdu(event_id).await } - -fn check_room_id(room_id: &RoomId, pdu: &Pdu) -> Result { - if pdu.room_id() != room_id { - return Err!(Request(InvalidParam(error!( - pdu_event_id = ?pdu.event_id(), - pdu_room_id = ?pdu.room_id(), - ?room_id, - "Found event from room in room", - )))); - } - - Ok(()) -} diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index c826129c..55ce83eb 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -7,7 +7,7 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, debug, debug_info, err, implement, is_equal_to, - matrix::{Event, EventTypeExt, PduEvent, StateKey, pdu::check_pdu_format, room_version}, + matrix::{Event, EventTypeExt, PduEvent, StateKey, pdu::check_rules, room_version}, trace, utils::stream::{BroadbandExt, ReadyExt}, warn, @@ -33,7 +33,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( origin: &ServerName, room_id: &RoomId, incoming_pdu: PduEvent, - val: CanonicalJsonObject, + pdu_json: CanonicalJsonObject, room_version: &RoomVersionId, recursion_level: usize, create_event_id: &EventId, @@ -64,7 +64,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( let room_rules = room_version::rules(room_version)?; trace!(format = ?room_rules.event_format, "Checking format"); - check_pdu_format(&val, &room_rules.event_format)?; + check_rules(&pdu_json, &room_rules.event_format)?; // 10. Fetch missing state and auth chain events by calling /state_ids at // backwards extremities doing all the checks in this list starting at 1. @@ -280,7 +280,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .timeline .append_incoming_pdu( &incoming_pdu, - val, + pdu_json, extremities, state_ids_compressed, soft_fail, diff --git a/src/service/rooms/timeline/create.rs b/src/service/rooms/timeline/create.rs index f8ceea9c..25a0c3a5 100644 --- a/src/service/rooms/timeline/create.rs +++ b/src/service/rooms/timeline/create.rs @@ -13,7 +13,7 @@ use tuwunel_core::{ Error, Result, err, implement, matrix::{ event::{Event, StateKey, TypeExt}, - pdu::{EventHash, PduBuilder, PduEvent, PrevEvents, check_pdu_format}, + pdu::{EventHash, PduBuilder, PduEvent, PrevEvents, check_rules}, room_version, }, utils::{ @@ -195,7 +195,7 @@ pub async fn create_hash_and_sign_event( pdu_json.insert("room_id".into(), CanonicalJsonValue::String(pdu.room_id.clone().into())); } - check_pdu_format(&pdu_json, &version_rules.event_format)?; + check_rules(&pdu_json, &version_rules.event_format)?; // Generate short event id let _shorteventid = self