From af7dfb31bca4dd182827de44ab6128ec50c930de Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 27 Apr 2025 09:34:07 +0000 Subject: [PATCH] Abstract Pdu filter matching into trait Event. Abstract Pdu unsigned accessors into trait Event. Abstract Pdu relation related into trait Event. Abstract PDU content into trait Event. Move event_id utils from pdu to event. Signed-off-by: Jason Volk --- src/admin/debug/commands.rs | 2 +- src/admin/user/commands.rs | 21 ++-- src/api/client/account.rs | 4 +- src/api/client/directory.rs | 5 +- src/api/client/membership/invite.rs | 2 +- src/api/client/membership/join.rs | 3 +- src/api/client/membership/knock.rs | 5 +- src/api/client/membership/leave.rs | 2 +- src/api/client/membership/members.rs | 13 +-- src/api/client/message.rs | 30 +++--- src/api/client/relations.rs | 21 ++-- src/api/client/room/initial_sync.rs | 4 +- src/api/client/room/upgrade.rs | 4 +- src/api/server/invite.rs | 7 +- src/api/server/send_join.rs | 2 +- src/api/server/send_knock.rs | 2 +- src/api/server/send_leave.rs | 2 +- src/core/matrix/event.rs | 90 ++++++++++++++-- src/core/matrix/event/filter.rs | 101 ++++++++++++++++++ .../matrix/{pdu/event_id.rs => event/id.rs} | 0 src/core/matrix/event/relation.rs | 28 +++++ src/core/matrix/event/unsigned.rs | 51 +++++++++ src/core/matrix/pdu.rs | 28 +++-- src/core/matrix/pdu/content.rs | 20 ---- src/core/matrix/pdu/filter.rs | 94 ---------------- src/core/matrix/pdu/redact.rs | 4 +- src/core/matrix/pdu/relation.rs | 22 ---- src/core/matrix/pdu/unsigned.rs | 44 +------- src/core/matrix/state_res/mod.rs | 10 +- src/service/admin/mod.rs | 6 +- src/service/pusher/mod.rs | 10 +- src/service/rooms/alias/mod.rs | 3 +- .../fetch_and_handle_outliers.rs | 40 ++++--- src/service/rooms/event_handler/fetch_prev.rs | 50 +++++---- .../rooms/event_handler/fetch_state.rs | 29 +++-- .../event_handler/handle_incoming_pdu.rs | 16 +-- .../rooms/event_handler/handle_outlier_pdu.rs | 40 ++++--- .../rooms/event_handler/handle_prev_pdu.rs | 20 ++-- src/service/rooms/event_handler/mod.rs | 16 +-- .../rooms/event_handler/parse_incoming_pdu.rs | 4 +- .../rooms/event_handler/state_at_incoming.rs | 48 +++++---- .../event_handler/upgrade_outlier_pdu.rs | 52 +++++---- src/service/rooms/outlier/mod.rs | 2 +- src/service/rooms/pdu_metadata/data.rs | 13 +-- src/service/rooms/pdu_metadata/mod.rs | 19 ++-- src/service/rooms/read_receipt/mod.rs | 10 +- src/service/rooms/search/mod.rs | 10 +- src/service/rooms/state/mod.rs | 4 +- .../rooms/state_accessor/room_state.rs | 8 +- src/service/rooms/state_accessor/state.rs | 16 ++- src/service/rooms/state_accessor/user_can.rs | 14 +-- src/service/rooms/threads/mod.rs | 13 ++- src/service/rooms/timeline/data.rs | 1 - src/service/rooms/timeline/mod.rs | 89 ++++++++------- src/service/sending/sender.rs | 2 +- src/service/server_keys/verify.rs | 2 +- 56 files changed, 666 insertions(+), 492 deletions(-) create mode 100644 src/core/matrix/event/filter.rs rename src/core/matrix/{pdu/event_id.rs => event/id.rs} (100%) create mode 100644 src/core/matrix/event/relation.rs create mode 100644 src/core/matrix/event/unsigned.rs delete mode 100644 src/core/matrix/pdu/content.rs delete mode 100644 src/core/matrix/pdu/filter.rs delete mode 100644 src/core/matrix/pdu/relation.rs diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index eb6647e2..d164c694 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -598,7 +598,7 @@ pub(super) async fn force_set_room_state_from_server( .sending .send_federation_request(&server_name, get_room_state::v1::Request { room_id: room_id.clone(), - event_id: first_pdu.event_id.clone(), + event_id: first_pdu.event_id().to_owned(), }) .await?; diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index 55e91773..5e866a19 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -738,7 +738,7 @@ pub(super) async fn force_demote(&self, user_id: String, room_id: OwnedRoomOrAli .state_accessor .room_state_get(&room_id, &StateEventType::RoomCreate, "") .await - .is_ok_and(|event| event.sender == user_id); + .is_ok_and(|event| event.sender() == user_id); if !user_can_demote_self { return Err!("User is not allowed to modify their own power levels in the room.",); @@ -895,10 +895,11 @@ pub(super) async fn redact_event(&self, event_id: OwnedEventId) -> Result { return Err!("Event is already redacted."); } - let room_id = event.room_id; - let sender_user = event.sender; - - if !self.services.globals.user_is_local(&sender_user) { + if !self + .services + .globals + .user_is_local(event.sender()) + { return Err!("This command only works on local users."); } @@ -913,7 +914,7 @@ pub(super) async fn redact_event(&self, event_id: OwnedEventId) -> Result { .rooms .state .mutex - .lock(&room_id) + .lock(event.room_id()) .await; self.services @@ -921,14 +922,14 @@ pub(super) async fn redact_event(&self, event_id: OwnedEventId) -> Result { .timeline .build_and_append_pdu( PduBuilder { - redacts: Some(event.event_id.clone()), + redacts: Some(event.event_id().to_owned()), ..PduBuilder::timeline(&RoomRedactionEventContent { - redacts: Some(event.event_id.clone()), + redacts: Some(event.event_id().to_owned()), reason: Some(reason), }) }, - &sender_user, - &room_id, + event.sender(), + event.room_id(), &state_lock, ) .await? diff --git a/src/api/client/account.rs b/src/api/client/account.rs index 6c283b10..bbfe9473 100644 --- a/src/api/client/account.rs +++ b/src/api/client/account.rs @@ -24,7 +24,7 @@ use ruma::{ }; use tuwunel_core::{ Err, Error, Result, debug_info, err, error, info, is_equal_to, - matrix::pdu::PduBuilder, + matrix::{Event, pdu::PduBuilder}, utils, utils::{ReadyExt, stream::BroadbandExt}, warn, @@ -934,7 +934,7 @@ pub async fn full_user_deactivate( .state_accessor .room_state_get(room_id, &StateEventType::RoomCreate, "") .await - .is_ok_and(|event| event.sender == user_id); + .is_ok_and(|event| event.sender() == user_id); if user_can_demote_self { let mut power_levels_content = room_power_levels.unwrap_or_default(); diff --git a/src/api/client/directory.rs b/src/api/client/directory.rs index b3569a71..92e2444e 100644 --- a/src/api/client/directory.rs +++ b/src/api/client/directory.rs @@ -28,6 +28,7 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, err, info, + matrix::Event, utils::{ TryFutureExtExt, math::Expected, @@ -389,7 +390,7 @@ async fn user_can_publish_room( .room_state_get(room_id, &StateEventType::RoomPowerLevels, "") .await { - | Ok(event) => serde_json::from_str(event.content.get()) + | Ok(event) => serde_json::from_str(event.content().get()) .map_err(|_| err!(Database("Invalid event content for m.room.power_levels"))) .map(|content: RoomPowerLevelsEventContent| { RoomPowerLevels::from(content) @@ -402,7 +403,7 @@ async fn user_can_publish_room( .room_state_get(room_id, &StateEventType::RoomCreate, "") .await { - | Ok(event) => Ok(event.sender == user_id), + | Ok(event) => Ok(event.sender() == user_id), | _ => Err!(Request(Forbidden("User is not allowed to publish this room"))), } }, diff --git a/src/api/client/membership/invite.rs b/src/api/client/membership/invite.rs index 16c39546..c510d077 100644 --- a/src/api/client/membership/invite.rs +++ b/src/api/client/membership/invite.rs @@ -8,7 +8,7 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, debug_error, err, info, - matrix::pdu::{PduBuilder, gen_event_id_canonical_json}, + matrix::{event::gen_event_id_canonical_json, pdu::PduBuilder}, }; use tuwunel_service::Services; diff --git a/src/api/client/membership/join.rs b/src/api/client/membership/join.rs index e5c4f8f8..d7f24014 100644 --- a/src/api/client/membership/join.rs +++ b/src/api/client/membership/join.rs @@ -26,7 +26,8 @@ use tuwunel_core::{ Err, Result, debug, debug_info, debug_warn, err, error, info, matrix::{ StateKey, - pdu::{PduBuilder, PduEvent, gen_event_id, gen_event_id_canonical_json}, + event::{gen_event_id, gen_event_id_canonical_json}, + pdu::{PduBuilder, PduEvent}, state_res, }, result::FlatOk, diff --git a/src/api/client/membership/knock.rs b/src/api/client/membership/knock.rs index 9820c1af..fb500601 100644 --- a/src/api/client/membership/knock.rs +++ b/src/api/client/membership/knock.rs @@ -18,7 +18,10 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, debug, debug_info, debug_warn, err, info, - matrix::pdu::{PduBuilder, PduEvent, gen_event_id}, + matrix::{ + event::{Event, gen_event_id}, + pdu::{PduBuilder, PduEvent}, + }, result::FlatOk, trace, utils::{self, shuffle, stream::IterStream}, diff --git a/src/api/client/membership/leave.rs b/src/api/client/membership/leave.rs index ff4cada9..84d77c73 100644 --- a/src/api/client/membership/leave.rs +++ b/src/api/client/membership/leave.rs @@ -15,7 +15,7 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, debug_info, debug_warn, err, - matrix::pdu::{PduBuilder, gen_event_id}, + matrix::{event::gen_event_id, pdu::PduBuilder}, utils::{self, FutureBoolExt, future::ReadyEqExt}, warn, }; diff --git a/src/api/client/membership/members.rs b/src/api/client/membership/members.rs index 410410f8..d9ae1a38 100644 --- a/src/api/client/membership/members.rs +++ b/src/api/client/membership/members.rs @@ -1,5 +1,5 @@ use axum::extract::State; -use futures::{StreamExt, future::join}; +use futures::{FutureExt, StreamExt, future::join}; use ruma::{ api::client::membership::{ get_member_events::{self, v3::MembershipEventFilter}, @@ -11,8 +11,8 @@ use ruma::{ }, }; use tuwunel_core::{ - Err, Event, Result, at, - matrix::pdu::PduEvent, + Err, Result, at, + matrix::Event, utils::{ future::TryExtExt, stream::{BroadbandExt, ReadyExt}, @@ -55,6 +55,7 @@ pub(crate) async fn get_member_events_route( .ready_filter_map(|pdu| membership_filter(pdu, membership, not_membership)) .map(Event::into_format) .collect() + .boxed() .await, }) } @@ -98,11 +99,11 @@ pub(crate) async fn joined_members_route( }) } -fn membership_filter( - pdu: PduEvent, +fn membership_filter( + pdu: Pdu, for_membership: Option<&MembershipEventFilter>, not_membership: Option<&MembershipEventFilter>, -) -> Option { +) -> Option { let membership_state_filter = match for_membership { | Some(MembershipEventFilter::Ban) => MembershipState::Ban, | Some(MembershipEventFilter::Invite) => MembershipState::Invite, diff --git a/src/api/client/message.rs b/src/api/client/message.rs index 32c56793..85f038e7 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -12,9 +12,10 @@ use ruma::{ use tuwunel_core::{ Err, Result, at, matrix::{ - Event, - pdu::{PduCount, PduEvent}, + event::{Event, Matches}, + pdu::PduCount, }, + ref_at, utils::{ IterStream, ReadyExt, result::{FlatOk, LogErr}, @@ -202,7 +203,9 @@ where pin_mut!(receipts); let witness: Witness = events .stream() - .map(|(_, pdu)| pdu.sender.clone()) + .map(ref_at!(1)) + .map(Event::sender) + .map(ToOwned::to_owned) .chain( receipts .ready_take_while(|(_, c, _)| *c <= newest.into_unsigned()) @@ -247,31 +250,34 @@ pub(crate) async fn ignored_filter( } #[inline] -pub(crate) async fn is_ignored_pdu( +pub(crate) async fn is_ignored_pdu( services: &Services, - pdu: &PduEvent, + event: &Pdu, user_id: &UserId, -) -> bool { +) -> bool +where + Pdu: Event + Send + Sync, +{ // exclude Synapse's dummy events from bloating up response bodies. clients // don't need to see this. - if pdu.kind.to_cow_str() == "org.matrix.dummy_event" { + if event.kind().to_cow_str() == "org.matrix.dummy_event" { return true; } let ignored_type = IGNORED_MESSAGE_TYPES - .binary_search(&pdu.kind) + .binary_search(event.kind()) .is_ok(); let ignored_server = services .config .forbidden_remote_server_names - .is_match(pdu.sender().server_name().host()); + .is_match(event.sender().server_name().host()); if ignored_type && (ignored_server || services .users - .user_is_ignored(&pdu.sender, user_id) + .user_is_ignored(event.sender(), user_id) .await) { return true; @@ -291,7 +297,7 @@ pub(crate) async fn visibility_filter( services .rooms .state_accessor - .user_can_see_event(user_id, &pdu.room_id, &pdu.event_id) + .user_can_see_event(user_id, pdu.room_id(), pdu.event_id()) .await .then_some(item) } @@ -299,7 +305,7 @@ pub(crate) async fn visibility_filter( #[inline] pub(crate) fn event_filter(item: PdusIterItem, filter: &RoomEventFilter) -> Option { let (_, pdu) = &item; - pdu.matches(filter).then_some(item) + filter.matches(pdu).then_some(item) } #[cfg_attr(debug_assertions, tuwunel_core::ctor)] diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index 42237992..5effde20 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -13,10 +13,13 @@ use ruma::{ }; use tuwunel_core::{ Result, at, - matrix::{Event, pdu::PduCount}, + matrix::{ + event::{Event, RelationTypeEqual}, + pdu::PduCount, + }, utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt}, }; -use tuwunel_service::{Services, rooms::timeline::PdusIterItem}; +use tuwunel_service::Services; use crate::Ruma; @@ -129,7 +132,7 @@ async fn paginate_relations_with_filter( // Spec (v1.10) recommends depth of at least 3 let depth: u8 = if recurse { 3 } else { 1 }; - let events: Vec = services + let events: Vec<_> = services .rooms .pdu_metadata .get_relations(sender_user, room_id, target, start, limit, depth, dir) @@ -138,12 +141,12 @@ async fn paginate_relations_with_filter( .filter(|(_, pdu)| { filter_event_type .as_ref() - .is_none_or(|kind| *kind == pdu.kind) + .is_none_or(|kind| kind == pdu.kind()) }) .filter(|(_, pdu)| { filter_rel_type .as_ref() - .is_none_or(|rel_type| pdu.relation_type_equal(rel_type)) + .is_none_or(|rel_type| rel_type.relation_type_equal(pdu)) }) .stream() .ready_take_while(|(count, _)| Some(*count) != to) @@ -172,17 +175,17 @@ async fn paginate_relations_with_filter( }) } -async fn visibility_filter( +async fn visibility_filter( services: &Services, sender_user: &UserId, - item: PdusIterItem, -) -> Option { + item: (PduCount, Pdu), +) -> Option<(PduCount, Pdu)> { let (_, pdu) = &item; services .rooms .state_accessor - .user_can_see_event(sender_user, &pdu.room_id, &pdu.event_id) + .user_can_see_event(sender_user, pdu.room_id(), pdu.event_id()) .await .then_some(item) } diff --git a/src/api/client/room/initial_sync.rs b/src/api/client/room/initial_sync.rs index a2c1c520..9360c5b5 100644 --- a/src/api/client/room/initial_sync.rs +++ b/src/api/client/room/initial_sync.rs @@ -53,7 +53,9 @@ pub(crate) async fn room_initial_sync_route( .try_collect::>(); let (membership, visibility, state, events) = - try_join4(membership, visibility, state, events).await?; + try_join4(membership, visibility, state, events) + .boxed() + .await?; let messages = PaginationChunk { start: events diff --git a/src/api/client/room/upgrade.rs b/src/api/client/room/upgrade.rs index d6172f80..f066d907 100644 --- a/src/api/client/room/upgrade.rs +++ b/src/api/client/room/upgrade.rs @@ -18,7 +18,7 @@ use ruma::{ use serde_json::{json, value::to_raw_value}; use tuwunel_core::{ Error, Result, err, info, - matrix::{StateKey, pdu::PduBuilder}, + matrix::{Event, StateKey, pdu::PduBuilder}, }; use crate::Ruma; @@ -226,7 +226,7 @@ pub(crate) async fn upgrade_room_route( .room_state_get(&body.room_id, event_type, "") .await { - | Ok(v) => v.content.clone(), + | Ok(v) => v.content().to_owned(), | Err(_) => continue, // Skipping missing events. }; diff --git a/src/api/server/invite.rs b/src/api/server/invite.rs index 5d1ee0c3..12fdbe35 100644 --- a/src/api/server/invite.rs +++ b/src/api/server/invite.rs @@ -8,8 +8,11 @@ use ruma::{ serde::JsonObject, }; use tuwunel_core::{ - Err, Error, PduEvent, Result, err, matrix::Event, pdu::gen_event_id, utils, - utils::hash::sha256, warn, + Err, Error, Result, err, + matrix::{Event, PduEvent, event::gen_event_id}, + utils, + utils::hash::sha256, + warn, }; use crate::Ruma; diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 58636887..088dfd83 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -16,7 +16,7 @@ use ruma::{ use serde_json::value::{RawValue as RawJsonValue, to_raw_value}; use tuwunel_core::{ Err, Result, at, err, - pdu::gen_event_id_canonical_json, + matrix::event::gen_event_id_canonical_json, utils::stream::{IterStream, TryBroadbandExt}, warn, }; diff --git a/src/api/server/send_knock.rs b/src/api/server/send_knock.rs index bf7cc811..b2b5425f 100644 --- a/src/api/server/send_knock.rs +++ b/src/api/server/send_knock.rs @@ -12,7 +12,7 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, err, - matrix::pdu::{PduEvent, gen_event_id_canonical_json}, + matrix::{event::gen_event_id_canonical_json, pdu::PduEvent}, warn, }; diff --git a/src/api/server/send_leave.rs b/src/api/server/send_leave.rs index dd98dccf..d32af5cc 100644 --- a/src/api/server/send_leave.rs +++ b/src/api/server/send_leave.rs @@ -11,7 +11,7 @@ use ruma::{ }, }; use serde_json::value::RawValue as RawJsonValue; -use tuwunel_core::{Err, Result, err, matrix::pdu::gen_event_id_canonical_json}; +use tuwunel_core::{Err, Result, err, matrix::event::gen_event_id_canonical_json}; use tuwunel_service::Services; use crate::Ruma; diff --git a/src/core/matrix/event.rs b/src/core/matrix/event.rs index 5b12770b..a1d1339e 100644 --- a/src/core/matrix/event.rs +++ b/src/core/matrix/event.rs @@ -1,21 +1,27 @@ mod content; +mod filter; mod format; +mod id; mod redact; +mod relation; mod type_ext; +mod unsigned; + +use std::fmt::Debug; use ruma::{ - EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, RoomVersionId, UserId, - events::TimelineEventType, + CanonicalJsonObject, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, + RoomVersionId, UserId, events::TimelineEventType, }; use serde::Deserialize; use serde_json::{Value as JsonValue, value::RawValue as RawJsonValue}; -pub use self::type_ext::TypeExt; -use super::state_key::StateKey; -use crate::Result; +pub use self::{filter::Matches, id::*, relation::RelationTypeEqual, type_ext::TypeExt}; +use super::{pdu::Pdu, state_key::StateKey}; +use crate::{Result, utils}; /// Abstraction of a PDU so users can have their own PDU types. -pub trait Event { +pub trait Event: Clone + Debug { /// Serialize into a Ruma JSON format, consuming. #[inline] fn into_format(self) -> T @@ -36,6 +42,41 @@ pub trait Event { format::Ref(self).into() } + #[inline] + fn contains_unsigned_property(&self, property: &str, is_type: T) -> bool + where + T: FnOnce(&JsonValue) -> bool, + Self: Sized, + { + unsigned::contains_unsigned_property::(self, property, is_type) + } + + #[inline] + fn get_unsigned_property(&self, property: &str) -> Result + where + T: for<'de> Deserialize<'de>, + Self: Sized, + { + unsigned::get_unsigned_property::(self, property) + } + + #[inline] + fn get_unsigned_as_value(&self) -> JsonValue + where + Self: Sized, + { + unsigned::get_unsigned_as_value(self) + } + + #[inline] + fn get_unsigned(&self) -> Result + where + T: for<'de> Deserialize<'de>, + Self: Sized, + { + unsigned::get_unsigned::(self) + } + #[inline] fn get_content_as_value(&self) -> JsonValue where @@ -69,6 +110,39 @@ pub trait Event { redact::is_redacted(self) } + #[inline] + fn into_canonical_object(self) -> CanonicalJsonObject + where + Self: Sized, + { + utils::to_canonical_object(self.into_pdu()).expect("failed to create Value::Object") + } + + #[inline] + fn to_canonical_object(&self) -> CanonicalJsonObject { + utils::to_canonical_object(self.as_pdu()).expect("failed to create Value::Object") + } + + #[inline] + fn into_value(self) -> JsonValue + where + Self: Sized, + { + serde_json::to_value(self.into_pdu()).expect("failed to create JSON Value") + } + + #[inline] + fn to_value(&self) -> JsonValue { + serde_json::to_value(self.as_pdu()).expect("failed to create JSON Value") + } + + #[inline] + fn as_mut_pdu(&mut self) -> &mut Pdu { unimplemented!("not a mutable Pdu") } + + fn as_pdu(&self) -> &Pdu; + + fn into_pdu(self) -> Pdu; + fn is_owned(&self) -> bool; // @@ -76,7 +150,7 @@ pub trait Event { // /// All the authenticating events for this event. - fn auth_events(&self) -> impl DoubleEndedIterator + Send + '_; + fn auth_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_; /// The event's content. fn content(&self) -> &RawJsonValue; @@ -88,7 +162,7 @@ pub trait Event { fn origin_server_ts(&self) -> MilliSecondsSinceUnixEpoch; /// The events before this event. - fn prev_events(&self) -> impl DoubleEndedIterator + Send + '_; + fn prev_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_; /// If this event is a redaction event this is the event it redacts. fn redacts(&self) -> Option<&EventId>; diff --git a/src/core/matrix/event/filter.rs b/src/core/matrix/event/filter.rs new file mode 100644 index 00000000..ef4660b6 --- /dev/null +++ b/src/core/matrix/event/filter.rs @@ -0,0 +1,101 @@ +use ruma::api::client::filter::{RoomEventFilter, UrlFilter}; +use serde_json::Value; + +use super::Event; +use crate::is_equal_to; + +pub trait Matches { + fn matches(&self, event: &E) -> bool; +} + +impl Matches for &RoomEventFilter { + #[inline] + fn matches(&self, event: &E) -> bool { + if !matches_sender(event, self) { + return false; + } + + if !matches_room(event, self) { + return false; + } + + if !matches_type(event, self) { + return false; + } + + if !matches_url(event, self) { + return false; + } + + true + } +} + +fn matches_room(event: &E, filter: &RoomEventFilter) -> bool { + if filter + .not_rooms + .iter() + .any(is_equal_to!(event.room_id())) + { + return false; + } + + if let Some(rooms) = filter.rooms.as_ref() { + if !rooms.iter().any(is_equal_to!(event.room_id())) { + return false; + } + } + + true +} + +fn matches_sender(event: &E, filter: &RoomEventFilter) -> bool { + if filter + .not_senders + .iter() + .any(is_equal_to!(event.sender())) + { + return false; + } + + if let Some(senders) = filter.senders.as_ref() { + if !senders.iter().any(is_equal_to!(event.sender())) { + return false; + } + } + + true +} + +fn matches_type(event: &E, filter: &RoomEventFilter) -> bool { + let kind = event.kind().to_cow_str(); + + if filter.not_types.iter().any(is_equal_to!(&kind)) { + return false; + } + + if let Some(types) = filter.types.as_ref() { + if !types.iter().any(is_equal_to!(&kind)) { + return false; + } + } + + true +} + +fn matches_url(event: &E, filter: &RoomEventFilter) -> bool { + let Some(url_filter) = filter.url_filter.as_ref() else { + return true; + }; + + //TODO: might be better to use Ruma's Raw rather than serde here + let url = event + .get_content_as_value() + .get("url") + .is_some_and(Value::is_string); + + match url_filter { + | UrlFilter::EventsWithUrl => url, + | UrlFilter::EventsWithoutUrl => !url, + } +} diff --git a/src/core/matrix/pdu/event_id.rs b/src/core/matrix/event/id.rs similarity index 100% rename from src/core/matrix/pdu/event_id.rs rename to src/core/matrix/event/id.rs diff --git a/src/core/matrix/event/relation.rs b/src/core/matrix/event/relation.rs new file mode 100644 index 00000000..58324e86 --- /dev/null +++ b/src/core/matrix/event/relation.rs @@ -0,0 +1,28 @@ +use ruma::events::relation::RelationType; +use serde::Deserialize; + +use super::Event; + +pub trait RelationTypeEqual { + fn relation_type_equal(&self, event: &E) -> bool; +} + +#[derive(Clone, Debug, Deserialize)] +struct ExtractRelatesToEventId { + #[serde(rename = "m.relates_to")] + relates_to: ExtractRelType, +} + +#[derive(Clone, Debug, Deserialize)] +struct ExtractRelType { + rel_type: RelationType, +} + +impl RelationTypeEqual for RelationType { + fn relation_type_equal(&self, event: &E) -> bool { + event + .get_content() + .map(|c: ExtractRelatesToEventId| c.relates_to.rel_type) + .is_ok_and(|r| r == *self) + } +} diff --git a/src/core/matrix/event/unsigned.rs b/src/core/matrix/event/unsigned.rs new file mode 100644 index 00000000..42928af4 --- /dev/null +++ b/src/core/matrix/event/unsigned.rs @@ -0,0 +1,51 @@ +use serde::Deserialize; +use serde_json::value::Value as JsonValue; + +use super::Event; +use crate::{Result, err, is_true}; + +pub(super) fn contains_unsigned_property(event: &E, property: &str, is_type: F) -> bool +where + F: FnOnce(&JsonValue) -> bool, + E: Event, +{ + get_unsigned_as_value(event) + .get(property) + .map(is_type) + .is_some_and(is_true!()) +} + +pub(super) fn get_unsigned_property(event: &E, property: &str) -> Result +where + T: for<'de> Deserialize<'de>, + E: Event, +{ + get_unsigned_as_value(event) + .get_mut(property) + .map(JsonValue::take) + .map(serde_json::from_value) + .ok_or(err!(Request(NotFound("property not found in unsigned object"))))? + .map_err(|e| err!(Database("Failed to deserialize unsigned.{property} into type: {e}"))) +} + +#[must_use] +pub(super) fn get_unsigned_as_value(event: &E) -> JsonValue +where + E: Event, +{ + get_unsigned::(event).unwrap_or_default() +} + +pub(super) fn get_unsigned(event: &E) -> Result +where + T: for<'de> Deserialize<'de>, + E: Event, +{ + event + .unsigned() + .as_ref() + .map(|raw| raw.get()) + .map(serde_json::from_str) + .ok_or(err!(Request(NotFound("\"unsigned\" property not found in pdu"))))? + .map_err(|e| err!(Database("Failed to deserialize \"unsigned\" into value: {e}"))) +} diff --git a/src/core/matrix/pdu.rs b/src/core/matrix/pdu.rs index e64baeb8..bff0c203 100644 --- a/src/core/matrix/pdu.rs +++ b/src/core/matrix/pdu.rs @@ -1,12 +1,8 @@ mod builder; -mod content; mod count; -mod event_id; -mod filter; mod id; mod raw_id; mod redact; -mod relation; #[cfg(test)] mod tests; mod unsigned; @@ -24,7 +20,6 @@ pub use self::{ Count as PduCount, Id as PduId, Pdu as PduEvent, RawId as RawPduId, builder::{Builder, Builder as PduBuilder}, count::Count, - event_id::*, id::{ShortId, *}, raw_id::*, }; @@ -91,7 +86,7 @@ impl Pdu { impl Event for Pdu { #[inline] - fn auth_events(&self) -> impl DoubleEndedIterator + Send + '_ { + fn auth_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_ { self.auth_events.iter().map(AsRef::as_ref) } @@ -107,7 +102,7 @@ impl Event for Pdu { } #[inline] - fn prev_events(&self) -> impl DoubleEndedIterator + Send + '_ { + fn prev_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_ { self.prev_events.iter().map(AsRef::as_ref) } @@ -129,13 +124,22 @@ impl Event for Pdu { #[inline] fn unsigned(&self) -> Option<&RawJsonValue> { self.unsigned.as_deref() } + #[inline] + fn as_mut_pdu(&mut self) -> &mut Pdu { self } + + #[inline] + fn as_pdu(&self) -> &Pdu { self } + + #[inline] + fn into_pdu(self) -> Pdu { self } + #[inline] fn is_owned(&self) -> bool { true } } impl Event for &Pdu { #[inline] - fn auth_events(&self) -> impl DoubleEndedIterator + Send + '_ { + fn auth_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_ { self.auth_events.iter().map(AsRef::as_ref) } @@ -151,7 +155,7 @@ impl Event for &Pdu { } #[inline] - fn prev_events(&self) -> impl DoubleEndedIterator + Send + '_ { + fn prev_events(&self) -> impl DoubleEndedIterator + Clone + Send + '_ { self.prev_events.iter().map(AsRef::as_ref) } @@ -173,6 +177,12 @@ impl Event for &Pdu { #[inline] fn unsigned(&self) -> Option<&RawJsonValue> { self.unsigned.as_deref() } + #[inline] + fn as_pdu(&self) -> &Pdu { self } + + #[inline] + fn into_pdu(self) -> Pdu { self.clone() } + #[inline] fn is_owned(&self) -> bool { false } } diff --git a/src/core/matrix/pdu/content.rs b/src/core/matrix/pdu/content.rs deleted file mode 100644 index 4e60ce6e..00000000 --- a/src/core/matrix/pdu/content.rs +++ /dev/null @@ -1,20 +0,0 @@ -use serde::Deserialize; -use serde_json::value::Value as JsonValue; - -use crate::{Result, err, implement}; - -#[must_use] -#[implement(super::Pdu)] -pub fn get_content_as_value(&self) -> JsonValue { - self.get_content() - .expect("pdu content must be a valid JSON value") -} - -#[implement(super::Pdu)] -pub fn get_content(&self) -> Result -where - T: for<'de> Deserialize<'de>, -{ - serde_json::from_str(self.content.get()) - .map_err(|e| err!(Database("Failed to deserialize pdu content into type: {e}"))) -} diff --git a/src/core/matrix/pdu/filter.rs b/src/core/matrix/pdu/filter.rs deleted file mode 100644 index d9c2c269..00000000 --- a/src/core/matrix/pdu/filter.rs +++ /dev/null @@ -1,94 +0,0 @@ -use ruma::api::client::filter::{RoomEventFilter, UrlFilter}; -use serde_json::Value; - -use crate::{implement, is_equal_to}; - -#[implement(super::Pdu)] -#[must_use] -pub fn matches(&self, filter: &RoomEventFilter) -> bool { - if !self.matches_sender(filter) { - return false; - } - - if !self.matches_room(filter) { - return false; - } - - if !self.matches_type(filter) { - return false; - } - - if !self.matches_url(filter) { - return false; - } - - true -} - -#[implement(super::Pdu)] -fn matches_room(&self, filter: &RoomEventFilter) -> bool { - if filter.not_rooms.contains(&self.room_id) { - return false; - } - - if let Some(rooms) = filter.rooms.as_ref() { - if !rooms.contains(&self.room_id) { - return false; - } - } - - true -} - -#[implement(super::Pdu)] -fn matches_sender(&self, filter: &RoomEventFilter) -> bool { - if filter.not_senders.contains(&self.sender) { - return false; - } - - if let Some(senders) = filter.senders.as_ref() { - if !senders.contains(&self.sender) { - return false; - } - } - - true -} - -#[implement(super::Pdu)] -fn matches_type(&self, filter: &RoomEventFilter) -> bool { - let event_type = &self.kind.to_cow_str(); - if filter - .not_types - .iter() - .any(is_equal_to!(event_type)) - { - return false; - } - - if let Some(types) = filter.types.as_ref() { - if !types.iter().any(is_equal_to!(event_type)) { - return false; - } - } - - true -} - -#[implement(super::Pdu)] -fn matches_url(&self, filter: &RoomEventFilter) -> bool { - let Some(url_filter) = filter.url_filter.as_ref() else { - return true; - }; - - //TODO: might be better to use Ruma's Raw rather than serde here - let url = serde_json::from_str::(self.content.get()) - .expect("parsing content JSON failed") - .get("url") - .is_some_and(Value::is_string); - - match url_filter { - | UrlFilter::EventsWithUrl => url, - | UrlFilter::EventsWithoutUrl => !url, - } -} diff --git a/src/core/matrix/pdu/redact.rs b/src/core/matrix/pdu/redact.rs index e6a03209..896e03f8 100644 --- a/src/core/matrix/pdu/redact.rs +++ b/src/core/matrix/pdu/redact.rs @@ -1,10 +1,10 @@ use ruma::{RoomVersionId, canonical_json::redact_content_in_place}; -use serde_json::{json, value::to_raw_value}; +use serde_json::{Value as JsonValue, json, value::to_raw_value}; use crate::{Error, Result, err, implement}; #[implement(super::Pdu)] -pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: &Self) -> Result { +pub fn redact(&mut self, room_version_id: &RoomVersionId, reason: JsonValue) -> Result { self.unsigned = None; let mut content = serde_json::from_str(self.content.get()) diff --git a/src/core/matrix/pdu/relation.rs b/src/core/matrix/pdu/relation.rs deleted file mode 100644 index 2968171e..00000000 --- a/src/core/matrix/pdu/relation.rs +++ /dev/null @@ -1,22 +0,0 @@ -use ruma::events::relation::RelationType; -use serde::Deserialize; - -use crate::implement; - -#[derive(Clone, Debug, Deserialize)] -struct ExtractRelType { - rel_type: RelationType, -} -#[derive(Clone, Debug, Deserialize)] -struct ExtractRelatesToEventId { - #[serde(rename = "m.relates_to")] - relates_to: ExtractRelType, -} - -#[implement(super::Pdu)] -#[must_use] -pub fn relation_type_equal(&self, rel_type: &RelationType) -> bool { - self.get_content() - .map(|c: ExtractRelatesToEventId| c.relates_to.rel_type) - .is_ok_and(|r| r == *rel_type) -} diff --git a/src/core/matrix/pdu/unsigned.rs b/src/core/matrix/pdu/unsigned.rs index 6139ec71..0c58bb68 100644 --- a/src/core/matrix/pdu/unsigned.rs +++ b/src/core/matrix/pdu/unsigned.rs @@ -1,11 +1,10 @@ use std::collections::BTreeMap; use ruma::MilliSecondsSinceUnixEpoch; -use serde::Deserialize; use serde_json::value::{RawValue as RawJsonValue, Value as JsonValue, to_raw_value}; use super::Pdu; -use crate::{Result, err, implement, is_true}; +use crate::{Result, err, implement}; #[implement(Pdu)] pub fn remove_transaction_id(&mut self) -> Result { @@ -74,44 +73,3 @@ pub fn add_relation(&mut self, name: &str, pdu: Option<&Pdu>) -> Result { Ok(()) } - -#[implement(Pdu)] -pub fn contains_unsigned_property(&self, property: &str, is_type: F) -> bool -where - F: FnOnce(&JsonValue) -> bool, -{ - self.get_unsigned_as_value() - .get(property) - .map(is_type) - .is_some_and(is_true!()) -} - -#[implement(Pdu)] -pub fn get_unsigned_property(&self, property: &str) -> Result -where - T: for<'de> Deserialize<'de>, -{ - self.get_unsigned_as_value() - .get_mut(property) - .map(JsonValue::take) - .map(serde_json::from_value) - .ok_or(err!(Request(NotFound("property not found in unsigned object"))))? - .map_err(|e| err!(Database("Failed to deserialize unsigned.{property} into type: {e}"))) -} - -#[implement(Pdu)] -#[must_use] -pub fn get_unsigned_as_value(&self) -> JsonValue { - self.get_unsigned::() - .unwrap_or_default() -} - -#[implement(Pdu)] -pub fn get_unsigned(&self) -> Result { - self.unsigned - .as_ref() - .map(|raw| raw.get()) - .map(serde_json::from_str) - .ok_or(err!(Request(NotFound("\"unsigned\" property not found in pdu"))))? - .map_err(|e| err!(Database("Failed to deserialize \"unsigned\" into value: {e}"))) -} diff --git a/src/core/matrix/state_res/mod.rs b/src/core/matrix/state_res/mod.rs index c0729811..ce299667 100644 --- a/src/core/matrix/state_res/mod.rs +++ b/src/core/matrix/state_res/mod.rs @@ -74,7 +74,7 @@ type Result = crate::Result; /// event is part of the same room. //#[tracing::instrument(level = "debug", skip(state_sets, auth_chain_sets, //#[tracing::instrument(level event_fetch))] -pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>( +pub async fn resolve<'a, Pdu, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, ExistsFut>( room_version: &RoomVersionId, state_sets: Sets, auth_chain_sets: &'a [HashSet], @@ -83,14 +83,14 @@ pub async fn resolve<'a, E, Sets, SetIter, Hasher, Fetch, FetchFut, Exists, Exis ) -> Result> where Fetch: Fn(OwnedEventId) -> FetchFut + Sync, - FetchFut: Future> + Send, + FetchFut: Future> + Send, Exists: Fn(OwnedEventId) -> ExistsFut + Sync, ExistsFut: Future + Send, Sets: IntoIterator + Send, SetIter: Iterator> + Clone + Send, Hasher: BuildHasher + Send + Sync, - E: Event + Clone + Send + Sync, - for<'b> &'b E: Event + Send, + Pdu: Event + Clone + Send + Sync, + for<'b> &'b Pdu: Event + Send, { debug!("State resolution starting"); @@ -227,6 +227,7 @@ where let state_sets_iter = state_sets_iter.inspect(|_| state_set_count = state_set_count.saturating_add(1)); + for (k, v) in state_sets_iter.flatten() { occurrences .entry(k) @@ -311,6 +312,7 @@ where let pl = get_power_level_for_sender(&event_id, fetch_event) .await .ok()?; + Some((event_id, pl)) }) .inspect(|(event_id, pl)| { diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index 8396a1d0..8e41650d 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -279,13 +279,13 @@ impl Service { return Ok(()); }; - let response_sender = if self.is_admin_room(&pdu.room_id).await { + let response_sender = if self.is_admin_room(pdu.room_id()).await { &self.services.globals.server_user } else { - &pdu.sender + pdu.sender() }; - self.respond_to_room(content, &pdu.room_id, response_sender) + self.respond_to_room(content, pdu.room_id(), response_sender) .boxed() .await } diff --git a/src/service/pusher/mod.rs b/src/service/pusher/mod.rs index 8b7aee1b..04921f34 100644 --- a/src/service/pusher/mod.rs +++ b/src/service/pusher/mod.rs @@ -24,7 +24,9 @@ use ruma::{ uint, }; use tuwunel_core::{ - Err, Event, Result, debug_warn, err, trace, + Err, Result, debug_warn, err, + matrix::Event, + trace, utils::{stream::TryIgnore, string_from_bytes}, warn, }; @@ -305,11 +307,7 @@ impl Service { .state_accessor .room_state_get(event.room_id(), &StateEventType::RoomPowerLevels, "") .await - .and_then(|ev| { - serde_json::from_str(ev.content.get()).map_err(|e| { - err!(Database(error!("invalid m.room.power_levels event: {e:?}"))) - }) - }) + .and_then(|event| event.get_content()) .unwrap_or_default(); let serialized = event.to_format(); diff --git a/src/service/rooms/alias/mod.rs b/src/service/rooms/alias/mod.rs index 4b51d331..e37fe553 100644 --- a/src/service/rooms/alias/mod.rs +++ b/src/service/rooms/alias/mod.rs @@ -12,6 +12,7 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, Server, err, + matrix::Event, utils::{ReadyExt, stream::TryIgnore}, }; use tuwunel_database::{Deserialized, Ignore, Interfix, Map}; @@ -247,7 +248,7 @@ impl Service { .room_state_get(&room_id, &StateEventType::RoomCreate, "") .await { - return Ok(event.sender == user_id); + return Ok(event.sender() == user_id); } Err!(Database("Room has no m.room.create event")) diff --git a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs index a1566040..714ede22 100644 --- a/src/service/rooms/event_handler/fetch_and_handle_outliers.rs +++ b/src/service/rooms/event_handler/fetch_and_handle_outliers.rs @@ -4,11 +4,18 @@ use std::{ }; use ruma::{ - CanonicalJsonValue, OwnedEventId, RoomId, ServerName, api::federation::event::get_event, + CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName, + api::federation::event::get_event, }; use tuwunel_core::{ - PduEvent, debug, debug_error, debug_warn, implement, pdu, trace, - utils::continue_exponential_backoff_secs, warn, + debug, debug_error, debug_warn, implement, + matrix::{ + PduEvent, + event::{Event, gen_event_id_canonical_json}, + }, + trace, + utils::continue_exponential_backoff_secs, + warn, }; use super::get_room_version_id; @@ -23,13 +30,17 @@ use super::get_room_version_id; /// c. Ask origin server over federation /// d. TODO: Ask other servers over federation? #[implement(super::Service)] -pub(super) async fn fetch_and_handle_outliers<'a>( +pub(super) async fn fetch_and_handle_outliers<'a, Pdu, Events>( &self, origin: &'a ServerName, - events: &'a [OwnedEventId], - create_event: &'a PduEvent, + events: Events, + create_event: &'a Pdu, room_id: &'a RoomId, -) -> Vec<(PduEvent, Option>)> { +) -> Vec<(PduEvent, Option>)> +where + Pdu: Event + Send + Sync, + Events: Iterator + Clone + Send, +{ let back_off = |id| match self .services .globals @@ -46,22 +57,23 @@ pub(super) async fn fetch_and_handle_outliers<'a>( }, }; - let mut events_with_auth_events = Vec::with_capacity(events.len()); + let mut events_with_auth_events = Vec::with_capacity(events.clone().count()); + for id in events { // a. Look in the main timeline (pduid_pdu tree) // b. Look at outlier pdu tree // (get_pdu_json checks both) if let Ok(local_pdu) = self.services.timeline.get_pdu(id).await { - trace!("Found {id} in db"); - events_with_auth_events.push((id, Some(local_pdu), vec![])); + events_with_auth_events.push((id.to_owned(), Some(local_pdu), vec![])); continue; } // c. Ask origin server over federation // We also handle its auth chain here so we don't get a stack overflow in // handle_outlier_pdu. - let mut todo_auth_events: VecDeque<_> = [id.clone()].into(); + let mut todo_auth_events: VecDeque<_> = [id.to_owned()].into(); let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len()); + let mut events_all = HashSet::with_capacity(todo_auth_events.len()); while let Some(next_id) = todo_auth_events.pop_front() { if let Some((time, tries)) = self @@ -117,7 +129,7 @@ pub(super) async fn fetch_and_handle_outliers<'a>( }; let Ok((calculated_event_id, value)) = - pdu::gen_event_id_canonical_json(&res.pdu, &room_version_id) + gen_event_id_canonical_json(&res.pdu, &room_version_id) else { back_off((*next_id).to_owned()); continue; @@ -160,7 +172,8 @@ pub(super) async fn fetch_and_handle_outliers<'a>( }, } } - events_with_auth_events.push((id, None, events_in_reverse_order)); + + events_with_auth_events.push((id.to_owned(), None, events_in_reverse_order)); } let mut pdus = Vec::with_capacity(events_with_auth_events.len()); @@ -217,5 +230,6 @@ pub(super) async fn fetch_and_handle_outliers<'a>( } } } + pdus } diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index 3c5a2cfd..5f469605 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -1,12 +1,16 @@ -use std::collections::{BTreeMap, HashMap, HashSet, VecDeque}; +use std::{ + collections::{BTreeMap, HashMap, HashSet, VecDeque}, + iter::once, +}; use futures::{FutureExt, future}; use ruma::{ - CanonicalJsonValue, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, UInt, int, - uint, + CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, + int, uint, }; use tuwunel_core::{ - PduEvent, Result, debug_warn, err, implement, + Result, debug_warn, err, implement, + matrix::{Event, PduEvent}, state_res::{self}, }; @@ -19,20 +23,26 @@ use super::check_room_id; fields(%origin), )] #[allow(clippy::type_complexity)] -pub(super) async fn fetch_prev( +pub(super) async fn fetch_prev<'a, Pdu, Events>( &self, origin: &ServerName, - create_event: &PduEvent, + create_event: &Pdu, room_id: &RoomId, - first_ts_in_room: UInt, - initial_set: Vec, + first_ts_in_room: MilliSecondsSinceUnixEpoch, + initial_set: Events, ) -> Result<( Vec, HashMap)>, -)> { - let mut graph: HashMap = HashMap::with_capacity(initial_set.len()); +)> +where + Pdu: Event + Send + Sync, + Events: Iterator + Clone + Send, +{ + let num_ids = initial_set.clone().count(); let mut eventid_info = HashMap::new(); - let mut todo_outlier_stack: VecDeque = initial_set.into(); + let mut graph: HashMap = HashMap::with_capacity(num_ids); + let mut todo_outlier_stack: VecDeque = + initial_set.map(ToOwned::to_owned).collect(); let mut amount = 0; @@ -40,7 +50,12 @@ pub(super) async fn fetch_prev( self.services.server.check_running()?; match self - .fetch_and_handle_outliers(origin, &[prev_event_id.clone()], create_event, room_id) + .fetch_and_handle_outliers( + origin, + once(prev_event_id.as_ref()), + create_event, + room_id, + ) .boxed() .await .pop() @@ -65,17 +80,17 @@ pub(super) async fn fetch_prev( } if let Some(json) = json_opt { - if pdu.origin_server_ts > first_ts_in_room { + if pdu.origin_server_ts() > first_ts_in_room { amount = amount.saturating_add(1); - for prev_prev in &pdu.prev_events { + for prev_prev in pdu.prev_events() { if !graph.contains_key(prev_prev) { - todo_outlier_stack.push_back(prev_prev.clone()); + todo_outlier_stack.push_back(prev_prev.to_owned()); } } graph.insert( prev_event_id.clone(), - pdu.prev_events.iter().cloned().collect(), + pdu.prev_events().map(ToOwned::to_owned).collect(), ); } else { // Time based check failed @@ -98,8 +113,7 @@ pub(super) async fn fetch_prev( let event_fetch = |event_id| { let origin_server_ts = eventid_info .get(&event_id) - .cloned() - .map_or_else(|| uint!(0), |info| info.0.origin_server_ts); + .map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get()); // This return value is the key used for sorting events, // events are then sorted by power level, time, diff --git a/src/service/rooms/event_handler/fetch_state.rs b/src/service/rooms/event_handler/fetch_state.rs index 4c2d86af..329ad9f1 100644 --- a/src/service/rooms/event_handler/fetch_state.rs +++ b/src/service/rooms/event_handler/fetch_state.rs @@ -5,7 +5,7 @@ use ruma::{ EventId, OwnedEventId, RoomId, ServerName, api::federation::event::get_room_state_ids, events::StateEventType, }; -use tuwunel_core::{Err, Error, PduEvent, Result, debug, debug_warn, implement}; +use tuwunel_core::{Err, Result, debug, debug_warn, err, implement, matrix::Event}; use crate::rooms::short::ShortStateKey; @@ -18,13 +18,16 @@ use crate::rooms::short::ShortStateKey; skip_all, fields(%origin), )] -pub(super) async fn fetch_state( +pub(super) async fn fetch_state( &self, origin: &ServerName, - create_event: &PduEvent, + create_event: &Pdu, room_id: &RoomId, event_id: &EventId, -) -> Result>> { +) -> Result>> +where + Pdu: Event + Send + Sync, +{ let res = self .services .sending @@ -36,27 +39,27 @@ pub(super) async fn fetch_state( .inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?; debug!("Fetching state events"); + let state_ids = res.pdu_ids.iter().map(AsRef::as_ref); let state_vec = self - .fetch_and_handle_outliers(origin, &res.pdu_ids, create_event, room_id) + .fetch_and_handle_outliers(origin, state_ids, create_event, room_id) .boxed() .await; let mut state: HashMap = HashMap::with_capacity(state_vec.len()); for (pdu, _) in state_vec { let state_key = pdu - .state_key - .clone() - .ok_or_else(|| Error::bad_database("Found non-state pdu in state events."))?; + .state_key() + .ok_or_else(|| err!(Database("Found non-state pdu in state events.")))?; let shortstatekey = self .services .short - .get_or_create_shortstatekey(&pdu.kind.to_string().into(), &state_key) + .get_or_create_shortstatekey(&pdu.kind().to_string().into(), state_key) .await; match state.entry(shortstatekey) { | hash_map::Entry::Vacant(v) => { - v.insert(pdu.event_id.clone()); + v.insert(pdu.event_id().to_owned()); }, | hash_map::Entry::Occupied(_) => { return Err!(Database( @@ -73,7 +76,11 @@ pub(super) async fn fetch_state( .get_shortstatekey(&StateEventType::RoomCreate, "") .await?; - if state.get(&create_shortstatekey) != Some(&create_event.event_id) { + if state + .get(&create_shortstatekey) + .map(AsRef::as_ref) + != Some(create_event.event_id()) + { return Err!(Database("Incoming event refers to wrong create event.")); } diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index 51a69d4c..acc8ee54 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -9,8 +9,8 @@ use futures::{ }; use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UserId, events::StateEventType}; use tuwunel_core::{ - Err, Result, debug, debug::INFO_SPAN_LEVEL, defer, err, implement, utils::stream::IterStream, - warn, + Err, Result, debug, debug::INFO_SPAN_LEVEL, defer, err, implement, matrix::Event, + utils::stream::IterStream, warn, }; use crate::rooms::timeline::RawPduId; @@ -125,22 +125,16 @@ pub async fn handle_incoming_pdu<'a>( .timeline .first_pdu_in_room(room_id) .await? - .origin_server_ts; + .origin_server_ts(); - if incoming_pdu.origin_server_ts < first_ts_in_room { + if incoming_pdu.origin_server_ts() < first_ts_in_room { return Ok(None); } // 9. Fetch any missing prev events doing all checks listed here starting at 1. // These are timeline events let (sorted_prev_events, mut eventid_info) = self - .fetch_prev( - origin, - create_event, - room_id, - first_ts_in_room, - incoming_pdu.prev_events.clone(), - ) + .fetch_prev(origin, create_event, room_id, first_ts_in_room, incoming_pdu.prev_events()) .await?; debug!( diff --git a/src/service/rooms/event_handler/handle_outlier_pdu.rs b/src/service/rooms/event_handler/handle_outlier_pdu.rs index ca415b5d..5b0261a1 100644 --- a/src/service/rooms/event_handler/handle_outlier_pdu.rs +++ b/src/service/rooms/event_handler/handle_outlier_pdu.rs @@ -2,26 +2,30 @@ use std::collections::{BTreeMap, HashMap, hash_map}; use futures::future::ready; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, EventId, RoomId, ServerName, - api::client::error::ErrorKind, events::StateEventType, + CanonicalJsonObject, CanonicalJsonValue, EventId, RoomId, ServerName, events::StateEventType, }; use tuwunel_core::{ - Err, Error, PduEvent, Result, debug, debug_info, err, implement, state_res, trace, warn, + Err, Result, debug, debug_info, err, implement, + matrix::{Event, PduEvent}, + state_res, trace, warn, }; use super::{check_room_id, get_room_version_id, to_room_version}; #[implement(super::Service)] #[allow(clippy::too_many_arguments)] -pub(super) async fn handle_outlier_pdu<'a>( +pub(super) async fn handle_outlier_pdu<'a, Pdu>( &self, origin: &'a ServerName, - create_event: &'a PduEvent, + create_event: &'a Pdu, event_id: &'a EventId, room_id: &'a RoomId, mut value: CanonicalJsonObject, auth_events_known: bool, -) -> Result<(PduEvent, BTreeMap)> { +) -> Result<(PduEvent, BTreeMap)> +where + Pdu: Event + Send + Sync, +{ // 1. Remove unsigned field value.remove("unsigned"); @@ -30,7 +34,7 @@ pub(super) async fn handle_outlier_pdu<'a>( // 2. Check signatures, otherwise drop // 3. check content hash, redact if doesn't match let room_version_id = get_room_version_id(create_event)?; - let mut val = match self + let mut incoming_pdu = match self .services .server_keys .verify_event(&value, Some(&room_version_id)) @@ -62,13 +66,15 @@ pub(super) async fn handle_outlier_pdu<'a>( // Now that we have checked the signature and hashes we can add the eventID and // convert to our PduEvent type - val.insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); - let incoming_pdu = serde_json::from_value::( - serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"), + incoming_pdu + .insert("event_id".to_owned(), CanonicalJsonValue::String(event_id.as_str().to_owned())); + + let pdu_event = serde_json::from_value::( + serde_json::to_value(&incoming_pdu).expect("CanonicalJsonObj is a valid JsonValue"), ) .map_err(|e| err!(Request(BadJson(debug_warn!("Event is not a valid PDU: {e}")))))?; - check_room_id(room_id, &incoming_pdu)?; + check_room_id(room_id, &pdu_event)?; if !auth_events_known { // 4. fetch any missing auth events doing all checks listed here starting at 1. @@ -79,7 +85,7 @@ pub(super) async fn handle_outlier_pdu<'a>( debug!("Fetching auth events"); Box::pin(self.fetch_and_handle_outliers( origin, - &incoming_pdu.auth_events, + pdu_event.auth_events(), create_event, room_id, )) @@ -90,8 +96,8 @@ pub(super) async fn handle_outlier_pdu<'a>( // auth events debug!("Checking based on auth events"); // Build map of auth events - let mut auth_events = HashMap::with_capacity(incoming_pdu.auth_events.len()); - for id in &incoming_pdu.auth_events { + let mut auth_events = HashMap::with_capacity(pdu_event.auth_events().count()); + for id in pdu_event.auth_events() { let Ok(auth_event) = self.services.timeline.get_pdu(id).await else { warn!("Could not find auth event {id}"); continue; @@ -132,7 +138,7 @@ pub(super) async fn handle_outlier_pdu<'a>( let auth_check = state_res::event_auth::auth_check( &to_room_version(&room_version_id), - &incoming_pdu, + &pdu_event, None, // TODO: third party invite state_fetch, ) @@ -148,9 +154,9 @@ pub(super) async fn handle_outlier_pdu<'a>( // 7. Persist the event as an outlier. self.services .outlier - .add_pdu_outlier(&incoming_pdu.event_id, &val); + .add_pdu_outlier(pdu_event.event_id(), &incoming_pdu); trace!("Added pdu as outlier."); - Ok((incoming_pdu, val)) + Ok((pdu_event, incoming_pdu)) } diff --git a/src/service/rooms/event_handler/handle_prev_pdu.rs b/src/service/rooms/event_handler/handle_prev_pdu.rs index eb08874c..1cd6df6a 100644 --- a/src/service/rooms/event_handler/handle_prev_pdu.rs +++ b/src/service/rooms/event_handler/handle_prev_pdu.rs @@ -1,8 +1,11 @@ use std::{collections::BTreeMap, time::Instant}; -use ruma::{CanonicalJsonValue, EventId, RoomId, ServerName, UInt}; +use ruma::{CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, RoomId, ServerName}; use tuwunel_core::{ - Err, PduEvent, Result, debug, debug::INFO_SPAN_LEVEL, defer, implement, + Err, Result, debug, + debug::INFO_SPAN_LEVEL, + defer, implement, + matrix::{Event, PduEvent}, utils::continue_exponential_backoff_secs, }; @@ -15,16 +18,19 @@ use tuwunel_core::{ skip_all, fields(%prev_id), )] -pub(super) async fn handle_prev_pdu<'a>( +pub(super) async fn handle_prev_pdu<'a, Pdu>( &self, origin: &'a ServerName, event_id: &'a EventId, room_id: &'a RoomId, eventid_info: Option<(PduEvent, BTreeMap)>, - create_event: &'a PduEvent, - first_ts_in_room: UInt, + create_event: &'a Pdu, + first_ts_in_room: MilliSecondsSinceUnixEpoch, prev_id: &'a EventId, -) -> Result { +) -> Result +where + Pdu: Event + Send + Sync, +{ // Check for disabled again because it might have changed if self.services.metadata.is_disabled(room_id).await { return Err!(Request(Forbidden(debug_warn!( @@ -59,7 +65,7 @@ pub(super) async fn handle_prev_pdu<'a>( }; // Skip old events - if pdu.origin_server_ts < first_ts_in_room { + if pdu.origin_server_ts() < first_ts_in_room { return Ok(()); } diff --git a/src/service/rooms/event_handler/mod.rs b/src/service/rooms/event_handler/mod.rs index 4e5eb31d..bfb3c290 100644 --- a/src/service/rooms/event_handler/mod.rs +++ b/src/service/rooms/event_handler/mod.rs @@ -22,7 +22,11 @@ use ruma::{ OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, events::room::create::RoomCreateEventContent, }; -use tuwunel_core::{Err, PduEvent, Result, RoomVersion, Server, utils::MutexMap}; +use tuwunel_core::{ + Err, Result, RoomVersion, Server, + matrix::{Event, PduEvent}, + utils::MutexMap, +}; use crate::{Dep, globals, rooms, sending, server_keys}; @@ -108,11 +112,11 @@ impl Service { } } -fn check_room_id(room_id: &RoomId, pdu: &PduEvent) -> Result { - if pdu.room_id != room_id { +fn check_room_id(room_id: &RoomId, pdu: &Pdu) -> Result { + if pdu.room_id() != room_id { return Err!(Request(InvalidParam(error!( - pdu_event_id = ?pdu.event_id, - pdu_room_id = ?pdu.room_id, + pdu_event_id = ?pdu.event_id(), + pdu_room_id = ?pdu.room_id(), ?room_id, "Found event from room in room", )))); @@ -121,7 +125,7 @@ fn check_room_id(room_id: &RoomId, pdu: &PduEvent) -> Result { Ok(()) } -fn get_room_version_id(create_event: &PduEvent) -> Result { +fn get_room_version_id(create_event: &Pdu) -> Result { let content: RoomCreateEventContent = create_event.get_content()?; let room_version = content.room_version; diff --git a/src/service/rooms/event_handler/parse_incoming_pdu.rs b/src/service/rooms/event_handler/parse_incoming_pdu.rs index 3cf126a7..58938fa3 100644 --- a/src/service/rooms/event_handler/parse_incoming_pdu.rs +++ b/src/service/rooms/event_handler/parse_incoming_pdu.rs @@ -1,6 +1,8 @@ use ruma::{CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedRoomId}; use serde_json::value::RawValue as RawJsonValue; -use tuwunel_core::{Result, err, implement, pdu::gen_event_id_canonical_json, result::FlatOk}; +use tuwunel_core::{ + Result, err, implement, matrix::event::gen_event_id_canonical_json, result::FlatOk, +}; type Parsed = (OwnedRoomId, OwnedEventId, CanonicalJsonObject); diff --git a/src/service/rooms/event_handler/state_at_incoming.rs b/src/service/rooms/event_handler/state_at_incoming.rs index 17b4389f..0542c70b 100644 --- a/src/service/rooms/event_handler/state_at_incoming.rs +++ b/src/service/rooms/event_handler/state_at_incoming.rs @@ -8,7 +8,7 @@ use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join use ruma::{OwnedEventId, RoomId, RoomVersionId}; use tuwunel_core::{ Result, debug, err, implement, - matrix::{PduEvent, StateMap}, + matrix::{Event, StateMap}, trace, utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, }; @@ -19,11 +19,18 @@ use crate::rooms::short::ShortStateHash; #[implement(super::Service)] // request and build the state from a known point and resolve if > 1 prev_event #[tracing::instrument(name = "state", level = "debug", skip_all)] -pub(super) async fn state_at_incoming_degree_one( +pub(super) async fn state_at_incoming_degree_one( &self, - incoming_pdu: &PduEvent, -) -> Result>> { - let prev_event = &incoming_pdu.prev_events[0]; + incoming_pdu: &Pdu, +) -> Result>> +where + Pdu: Event + Send + Sync, +{ + let prev_event = incoming_pdu + .prev_events() + .next() + .expect("at least one prev_event"); + let Ok(prev_event_sstatehash) = self .services .state_accessor @@ -55,7 +62,7 @@ pub(super) async fn state_at_incoming_degree_one( .get_or_create_shortstatekey(&prev_pdu.kind.to_string().into(), state_key) .await; - state.insert(shortstatekey, prev_event.clone()); + state.insert(shortstatekey, prev_event.to_owned()); // Now it's the state after the pdu } @@ -66,16 +73,18 @@ pub(super) async fn state_at_incoming_degree_one( #[implement(super::Service)] #[tracing::instrument(name = "state", level = "debug", skip_all)] -pub(super) async fn state_at_incoming_resolved( +pub(super) async fn state_at_incoming_resolved( &self, - incoming_pdu: &PduEvent, + incoming_pdu: &Pdu, room_id: &RoomId, room_version_id: &RoomVersionId, -) -> Result>> { +) -> Result>> +where + Pdu: Event + Send + Sync, +{ trace!("Calculating extremity statehashes..."); let Ok(extremity_sstatehashes) = incoming_pdu - .prev_events - .iter() + .prev_events() .try_stream() .broad_and_then(|prev_eventid| { self.services @@ -133,12 +142,15 @@ pub(super) async fn state_at_incoming_resolved( } #[implement(super::Service)] -async fn state_at_incoming_fork( +async fn state_at_incoming_fork( &self, room_id: &RoomId, sstatehash: ShortStateHash, - prev_event: PduEvent, -) -> Result<(StateMap, HashSet)> { + prev_event: Pdu, +) -> Result<(StateMap, HashSet)> +where + Pdu: Event, +{ let mut leaf_state: HashMap<_, _> = self .services .state_accessor @@ -146,15 +158,15 @@ async fn state_at_incoming_fork( .collect() .await; - if let Some(state_key) = &prev_event.state_key { + if let Some(state_key) = prev_event.state_key() { let shortstatekey = self .services .short - .get_or_create_shortstatekey(&prev_event.kind.to_string().into(), state_key) + .get_or_create_shortstatekey(&prev_event.kind().to_string().into(), state_key) .await; - let event_id = &prev_event.event_id; - leaf_state.insert(shortstatekey, event_id.clone()); + let event_id = prev_event.event_id(); + leaf_state.insert(shortstatekey, event_id.to_owned()); // Now it's the state after the pdu } diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index ba5b0a73..1dbe2619 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -3,7 +3,7 @@ use std::{borrow::Borrow, collections::BTreeMap, iter::once, sync::Arc, time::In use futures::{FutureExt, StreamExt, future::ready}; use ruma::{CanonicalJsonValue, RoomId, ServerName, events::StateEventType}; use tuwunel_core::{ - Err, Result, debug, debug_info, err, implement, + Err, Result, debug, debug_info, err, implement, is_equal_to, matrix::{Event, EventTypeExt, PduEvent, StateKey, state_res}, trace, utils::stream::{BroadbandExt, ReadyExt}, @@ -17,19 +17,22 @@ use crate::rooms::{ }; #[implement(super::Service)] -pub(super) async fn upgrade_outlier_to_timeline_pdu( +pub(super) async fn upgrade_outlier_to_timeline_pdu( &self, incoming_pdu: PduEvent, val: BTreeMap, - create_event: &PduEvent, + create_event: &Pdu, origin: &ServerName, room_id: &RoomId, -) -> Result> { +) -> Result> +where + Pdu: Event + Send + Sync, +{ // Skip the PDU if we already have it as a timeline event if let Ok(pduid) = self .services .timeline - .get_pdu_id(&incoming_pdu.event_id) + .get_pdu_id(incoming_pdu.event_id()) .await { return Ok(Some(pduid)); @@ -38,7 +41,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( if self .services .pdu_metadata - .is_event_soft_failed(&incoming_pdu.event_id) + .is_event_soft_failed(incoming_pdu.event_id()) .await { return Err!(Request(InvalidParam("Event has been soft failed"))); @@ -53,7 +56,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // These are not timeline events. debug!("Resolving state at event"); - let mut state_at_incoming_event = if incoming_pdu.prev_events.len() == 1 { + let mut state_at_incoming_event = if incoming_pdu.prev_events().count() == 1 { self.state_at_incoming_degree_one(&incoming_pdu) .await? } else { @@ -63,12 +66,13 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( if state_at_incoming_event.is_none() { state_at_incoming_event = self - .fetch_state(origin, create_event, room_id, &incoming_pdu.event_id) + .fetch_state(origin, create_event, room_id, incoming_pdu.event_id()) .await?; } let state_at_incoming_event = state_at_incoming_event.expect("we always set this to some above"); + let room_version = to_room_version(&room_version_id); debug!("Performing auth check"); @@ -109,10 +113,10 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .state .get_auth_events( room_id, - &incoming_pdu.kind, - &incoming_pdu.sender, - incoming_pdu.state_key.as_deref(), - &incoming_pdu.content, + incoming_pdu.kind(), + incoming_pdu.sender(), + incoming_pdu.state_key(), + incoming_pdu.content(), ) .await?; @@ -139,7 +143,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( !self .services .state_accessor - .user_can_redact(&redact_id, &incoming_pdu.sender, &incoming_pdu.room_id, true) + .user_can_redact(&redact_id, incoming_pdu.sender(), incoming_pdu.room_id(), true) .await?, }; @@ -159,7 +163,9 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .map(ToOwned::to_owned) .ready_filter(|event_id| { // Remove any that are referenced by this incoming event's prev_events - !incoming_pdu.prev_events.contains(event_id) + !incoming_pdu + .prev_events() + .any(is_equal_to!(event_id)) }) .broad_filter_map(|event_id| async move { // Only keep those extremities were not referenced yet @@ -176,7 +182,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( debug!( "Retained {} extremities checked against {} prev_events", extremities.len(), - incoming_pdu.prev_events.len() + incoming_pdu.prev_events().count() ); let state_ids_compressed: Arc = self @@ -191,20 +197,20 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( .map(Arc::new) .await; - if incoming_pdu.state_key.is_some() { + if incoming_pdu.state_key().is_some() { debug!("Event is a state-event. Deriving new room state"); // We also add state after incoming event to the fork states let mut state_after = state_at_incoming_event.clone(); - if let Some(state_key) = &incoming_pdu.state_key { + if let Some(state_key) = incoming_pdu.state_key() { let shortstatekey = self .services .short - .get_or_create_shortstatekey(&incoming_pdu.kind.to_string().into(), state_key) + .get_or_create_shortstatekey(&incoming_pdu.kind().to_string().into(), state_key) .await; - let event_id = &incoming_pdu.event_id; - state_after.insert(shortstatekey, event_id.clone()); + let event_id = incoming_pdu.event_id(); + state_after.insert(shortstatekey, event_id.to_owned()); } let new_room_state = self @@ -246,9 +252,9 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( // Soft fail, we keep the event as an outlier but don't add it to the timeline self.services .pdu_metadata - .mark_event_soft_failed(&incoming_pdu.event_id); + .mark_event_soft_failed(incoming_pdu.event_id()); - warn!("Event was soft failed: {incoming_pdu:?}"); + warn!("Event was soft failed: {:?}", incoming_pdu.event_id()); return Err!(Request(InvalidParam("Event has been soft failed"))); } @@ -259,7 +265,7 @@ pub(super) async fn upgrade_outlier_to_timeline_pdu( let extremities = extremities .iter() .map(Borrow::borrow) - .chain(once(incoming_pdu.event_id.borrow())); + .chain(once(incoming_pdu.event_id())); let pdu_id = self .services diff --git a/src/service/rooms/outlier/mod.rs b/src/service/rooms/outlier/mod.rs index 35472e3e..29dcb782 100644 --- a/src/service/rooms/outlier/mod.rs +++ b/src/service/rooms/outlier/mod.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use ruma::{CanonicalJsonObject, EventId}; -use tuwunel_core::{Result, implement, matrix::pdu::PduEvent}; +use tuwunel_core::{Result, implement, matrix::PduEvent}; use tuwunel_database::{Deserialized, Json, Map}; pub struct Service { diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index 734d0c47..a98b05ee 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -3,8 +3,8 @@ use std::{mem::size_of, sync::Arc}; use futures::{Stream, StreamExt}; use ruma::{EventId, RoomId, UserId, api::Direction}; use tuwunel_core::{ - PduCount, PduEvent, arrayvec::ArrayVec, + matrix::{Event, PduCount}, result::LogErr, utils::{ ReadyExt, @@ -33,8 +33,6 @@ struct Services { timeline: Dep, } -pub(super) type PdusIterItem = (PduCount, PduEvent); - impl Data { pub(super) fn new(args: &crate::Args<'_>) -> Self { let db = &args.db; @@ -63,7 +61,7 @@ impl Data { target: ShortEventId, from: PduCount, dir: Direction, - ) -> impl Stream + Send + '_ { + ) -> impl Stream + Send + '_ { let mut current = ArrayVec::::new(); current.extend(target.to_be_bytes()); current.extend( @@ -96,8 +94,11 @@ impl Data { .await .ok()?; - if pdu.sender != user_id { - pdu.remove_transaction_id().log_err().ok(); + if pdu.sender() != user_id { + pdu.as_mut_pdu() + .remove_transaction_id() + .log_err() + .ok(); } Some((shorteventid, pdu)) diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index 45fcbda0..35b58f46 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -3,9 +3,12 @@ use std::sync::Arc; use futures::{StreamExt, future::try_join}; use ruma::{EventId, RoomId, UserId, api::Direction}; -use tuwunel_core::{PduCount, Result}; +use tuwunel_core::{ + Result, + matrix::{Event, PduCount}, +}; -use self::data::{Data, PdusIterItem}; +use self::data::Data; use crate::{Dep, rooms}; pub struct Service { @@ -44,16 +47,16 @@ impl Service { } #[allow(clippy::too_many_arguments)] - pub async fn get_relations( - &self, - user_id: &UserId, - room_id: &RoomId, - target: &EventId, + pub async fn get_relations<'a>( + &'a self, + user_id: &'a UserId, + room_id: &'a RoomId, + target: &'a EventId, from: PduCount, limit: usize, max_depth: u8, dir: Direction, - ) -> Vec { + ) -> Vec<(PduCount, impl Event)> { let room_id = self.services.short.get_shortroomid(room_id); let target = self.services.timeline.get_pdu_count(target); diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index 14684612..a06afefc 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -13,7 +13,10 @@ use ruma::{ }; use tuwunel_core::{ Result, debug, err, - matrix::pdu::{PduCount, PduId, RawPduId}, + matrix::{ + Event, + pdu::{PduCount, PduId, RawPduId}, + }, warn, }; @@ -84,18 +87,17 @@ impl Service { "Short room ID does not exist in database for {room_id}: {e}" ))) }); - let (pdu_count, shortroomid) = try_join!(pdu_count, shortroomid)?; + let (pdu_count, shortroomid) = try_join!(pdu_count, shortroomid)?; let shorteventid = PduCount::Normal(pdu_count); let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into(); - let pdu = self .services .timeline .get_pdu_from_id(&pdu_id) .await?; - let event_id: OwnedEventId = pdu.event_id; + let event_id: OwnedEventId = pdu.event_id().to_owned(); let user_id: OwnedUserId = user_id.to_owned(); let content: BTreeMap = BTreeMap::from_iter([( event_id, diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index d407762a..eb1ac0e9 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -3,9 +3,10 @@ use std::sync::Arc; use futures::{Stream, StreamExt}; use ruma::{RoomId, UserId, api::client::search::search_events::v3::Criteria}; use tuwunel_core::{ - Event, PduCount, PduEvent, Result, + PduCount, Result, arrayvec::ArrayVec, implement, + matrix::event::{Event, Matches}, utils::{ ArrayVecExt, IterStream, ReadyExt, set, stream::{TryIgnore, WidebandExt}, @@ -103,9 +104,10 @@ pub fn deindex_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_b pub async fn search_pdus<'a>( &'a self, query: &'a RoomQuery<'a>, -) -> Result<(usize, impl Stream + Send + 'a)> { +) -> Result<(usize, impl Stream> + Send + '_)> { let pdu_ids: Vec<_> = self.search_pdu_ids(query).await?.collect().await; + let filter = &query.criteria.filter; let count = pdu_ids.len(); let pdus = pdu_ids .into_iter() @@ -118,11 +120,11 @@ pub async fn search_pdus<'a>( .ok() }) .ready_filter(|pdu| !pdu.is_redacted()) - .ready_filter(|pdu| pdu.matches(&query.criteria.filter)) + .ready_filter(move |pdu| filter.matches(pdu)) .wide_filter_map(move |pdu| async move { self.services .state_accessor - .user_can_see_event(query.user_id?, &pdu.room_id, &pdu.event_id) + .user_can_see_event(query.user_id?, pdu.room_id(), pdu.event_id()) .await .then_some(pdu) }) diff --git a/src/service/rooms/state/mod.rs b/src/service/rooms/state/mod.rs index cac4e9d2..9f1bfd2d 100644 --- a/src/service/rooms/state/mod.rs +++ b/src/service/rooms/state/mod.rs @@ -369,8 +369,8 @@ impl Service { &self, room_id: &RoomId, shortstatehash: u64, - _mutex_lock: &RoomMutexGuard, /* Take mutex guard to make sure users get the room - * state mutex */ + // Take mutex guard to make sure users get the room state mutex + _mutex_lock: &RoomMutexGuard, ) { const BUFSIZE: usize = size_of::(); diff --git a/src/service/rooms/state_accessor/room_state.rs b/src/service/rooms/state_accessor/room_state.rs index f4bde04a..d58acd39 100644 --- a/src/service/rooms/state_accessor/room_state.rs +++ b/src/service/rooms/state_accessor/room_state.rs @@ -5,7 +5,7 @@ use ruma::{EventId, RoomId, events::StateEventType}; use serde::Deserialize; use tuwunel_core::{ Result, err, implement, - matrix::{PduEvent, StateKey}, + matrix::{Event, StateKey}, }; /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). @@ -30,7 +30,7 @@ where pub fn room_state_full<'a>( &'a self, room_id: &'a RoomId, -) -> impl Stream> + Send + 'a { +) -> impl Stream> + Send + 'a { self.services .state .get_room_shortstatehash(room_id) @@ -45,7 +45,7 @@ pub fn room_state_full<'a>( pub fn room_state_full_pdus<'a>( &'a self, room_id: &'a RoomId, -) -> impl Stream> + Send + 'a { +) -> impl Stream> + Send + 'a { self.services .state .get_room_shortstatehash(room_id) @@ -88,7 +88,7 @@ pub async fn room_state_get( room_id: &RoomId, event_type: &StateEventType, state_key: &str, -) -> Result { +) -> Result { self.services .state .get_room_shortstatehash(room_id) diff --git a/src/service/rooms/state_accessor/state.rs b/src/service/rooms/state_accessor/state.rs index fc9234bd..b85d2075 100644 --- a/src/service/rooms/state_accessor/state.rs +++ b/src/service/rooms/state_accessor/state.rs @@ -11,7 +11,7 @@ use ruma::{ use serde::Deserialize; use tuwunel_core::{ Result, at, err, implement, - matrix::{PduEvent, StateKey}, + matrix::{Event, StateKey}, pair_of, utils::{ result::FlatOk, @@ -128,11 +128,9 @@ pub async fn state_get( shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, -) -> Result { +) -> Result { self.state_get_id(shortstatehash, event_type, state_key) - .and_then(|event_id: OwnedEventId| async move { - self.services.timeline.get_pdu(&event_id).await - }) + .and_then(async |event_id: OwnedEventId| self.services.timeline.get_pdu(&event_id).await) .await } @@ -321,18 +319,16 @@ pub fn state_added( pub fn state_full( &self, shortstatehash: ShortStateHash, -) -> impl Stream + Send + '_ { +) -> impl Stream + Send + '_ { self.state_full_pdus(shortstatehash) - .ready_filter_map(|pdu| { - Some(((pdu.kind.to_string().into(), pdu.state_key.clone()?), pdu)) - }) + .ready_filter_map(|pdu| Some(((pdu.kind().clone().into(), pdu.state_key()?.into()), pdu))) } #[implement(super::Service)] pub fn state_full_pdus( &self, shortstatehash: ShortStateHash, -) -> impl Stream + Send + '_ { +) -> impl Stream + Send + '_ { let short_ids = self .state_full_shortids(shortstatehash) .ignore_err() diff --git a/src/service/rooms/state_accessor/user_can.rs b/src/service/rooms/state_accessor/user_can.rs index 204b039f..75fbdd77 100644 --- a/src/service/rooms/state_accessor/user_can.rs +++ b/src/service/rooms/state_accessor/user_can.rs @@ -9,7 +9,7 @@ use ruma::{ }, }, }; -use tuwunel_core::{Err, Result, implement, pdu::PduBuilder}; +use tuwunel_core::{Err, Result, implement, matrix::Event, pdu::PduBuilder}; use crate::rooms::state::RoomMutexGuard; @@ -29,14 +29,14 @@ pub async fn user_can_redact( if redacting_event .as_ref() - .is_ok_and(|pdu| pdu.kind == TimelineEventType::RoomCreate) + .is_ok_and(|pdu| *pdu.kind() == TimelineEventType::RoomCreate) { return Err!(Request(Forbidden("Redacting m.room.create is not safe, forbidding."))); } if redacting_event .as_ref() - .is_ok_and(|pdu| pdu.kind == TimelineEventType::RoomServerAcl) + .is_ok_and(|pdu| *pdu.kind() == TimelineEventType::RoomServerAcl) { return Err!(Request(Forbidden( "Redacting m.room.server_acl will result in the room being inaccessible for \ @@ -59,9 +59,9 @@ pub async fn user_can_redact( && match redacting_event { | Ok(redacting_event) => if federation { - redacting_event.sender.server_name() == sender.server_name() + redacting_event.sender().server_name() == sender.server_name() } else { - redacting_event.sender == sender + redacting_event.sender() == sender }, | _ => false, }) @@ -72,10 +72,10 @@ pub async fn user_can_redact( .room_state_get(room_id, &StateEventType::RoomCreate, "") .await { - | Ok(room_create) => Ok(room_create.sender == sender + | Ok(room_create) => Ok(room_create.sender() == sender || redacting_event .as_ref() - .is_ok_and(|redacting_event| redacting_event.sender == sender)), + .is_ok_and(|redacting_event| redacting_event.sender() == sender)), | _ => Err!(Database( "No m.room.power_levels or m.room.create events in database for room" )), diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index 9fb93ba6..76526f51 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -49,10 +49,9 @@ impl crate::Service for Service { } impl Service { - pub async fn add_to_thread<'a, E>(&self, root_event_id: &EventId, event: &'a E) -> Result + pub async fn add_to_thread(&self, root_event_id: &EventId, event: &E) -> Result where E: Event + Send + Sync, - &'a E: Event + Send, { let root_id = self .services @@ -120,7 +119,7 @@ impl Service { self.services .timeline - .replace_pdu(&root_id, &root_pdu_json, &root_pdu) + .replace_pdu(&root_id, &root_pdu_json) .await?; } @@ -130,7 +129,7 @@ impl Service { users.extend_from_slice(&userids); }, | _ => { - users.push(root_pdu.sender); + users.push(root_pdu.sender().to_owned()); }, } users.push(event.sender().to_owned()); @@ -171,10 +170,10 @@ impl Service { .get_pdu_from_id(&pdu_id) .await .ok()?; - let pdu_id: PduId = pdu_id.into(); - if pdu.sender != user_id { - pdu.remove_transaction_id().ok(); + let pdu_id: PduId = pdu_id.into(); + if pdu.sender() != user_id { + pdu.as_mut_pdu().remove_transaction_id().ok(); } Some((pdu_id.shorteventid, pdu)) diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs index 643b1d7b..42bede4f 100644 --- a/src/service/rooms/timeline/data.rs +++ b/src/service/rooms/timeline/data.rs @@ -211,7 +211,6 @@ impl Data { &self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObject, - _pdu: &PduEvent, ) -> Result { if self.pduid_pdu.get(pdu_id).await.is_not_found() { return Err!(Request(NotFound("PDU does not exist."))); diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 7d89d855..28acf57b 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -38,8 +38,8 @@ pub use tuwunel_core::matrix::pdu::{PduId, RawPduId}; use tuwunel_core::{ Err, Error, Result, Server, at, debug, debug_warn, err, error, implement, info, matrix::{ - Event, - pdu::{EventHash, PduBuilder, PduCount, PduEvent, gen_event_id}, + event::{Event, gen_event_id}, + pdu::{EventHash, PduBuilder, PduCount, PduEvent}, state_res::{self, RoomVersion}, }, utils::{ @@ -159,12 +159,12 @@ impl crate::Service for Service { impl Service { #[tracing::instrument(skip(self), level = "debug")] - pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result { + pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result { self.first_item_in_room(room_id).await.map(at!(1)) } #[tracing::instrument(skip(self), level = "debug")] - pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> { + pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> { let pdus = self.pdus(None, room_id, None); pin_mut!(pdus); @@ -174,7 +174,7 @@ impl Service { } #[tracing::instrument(skip(self), level = "debug")] - pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { + pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { self.db.latest_pdu_in_room(None, room_id).await } @@ -218,13 +218,14 @@ impl Service { /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. #[inline] - pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result { + pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result { self.db.get_non_outlier_pdu(event_id).await } /// Returns the pdu. /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. + #[inline] pub async fn get_pdu(&self, event_id: &EventId) -> Result { self.db.get_pdu(event_id).await } @@ -232,11 +233,13 @@ impl Service { /// Returns the pdu. /// /// This does __NOT__ check the outliers `Tree`. + #[inline] pub async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result { self.db.get_pdu_from_id(pdu_id).await } /// Returns the pdu as a `BTreeMap`. + #[inline] pub async fn get_pdu_json_from_id(&self, pdu_id: &RawPduId) -> Result { self.db.get_pdu_json_from_id(pdu_id).await } @@ -244,6 +247,7 @@ impl Service { /// Checks if pdu exists /// /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. + #[inline] pub fn pdu_exists<'a>( &'a self, event_id: &'a EventId, @@ -253,13 +257,8 @@ impl Service { /// Removes a pdu and creates a new one with the same id. #[tracing::instrument(skip(self), level = "debug")] - pub async fn replace_pdu( - &self, - pdu_id: &RawPduId, - pdu_json: &CanonicalJsonObject, - pdu: &PduEvent, - ) -> Result<()> { - self.db.replace_pdu(pdu_id, pdu_json, pdu).await + pub async fn replace_pdu(&self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObject) -> Result { + self.db.replace_pdu(pdu_id, pdu_json).await } /// Creates a new persisted data unit and adds it to a room. @@ -312,25 +311,21 @@ impl Service { unsigned.insert( "prev_content".to_owned(), CanonicalJsonValue::Object( - utils::to_canonical_object(prev_state.content.clone()).map_err( - |e| { - error!( - "Failed to convert prev_state to canonical JSON: {e}" - ); - Error::bad_database( - "Failed to convert prev_state to canonical JSON.", - ) - }, - )?, + utils::to_canonical_object(prev_state.get_content_as_value()) + .map_err(|e| { + err!(Database(error!( + "Failed to convert prev_state to canonical JSON: {e}", + ))) + })?, ), ); unsigned.insert( String::from("prev_sender"), - CanonicalJsonValue::String(prev_state.sender.to_string()), + CanonicalJsonValue::String(prev_state.sender().to_string()), ); unsigned.insert( String::from("replaces_state"), - CanonicalJsonValue::String(prev_state.event_id.to_string()), + CanonicalJsonValue::String(prev_state.event_id().to_string()), ); } } @@ -736,14 +731,11 @@ impl Service { .await { unsigned.insert("prev_content".to_owned(), prev_pdu.get_content_as_value()); - unsigned.insert( - "prev_sender".to_owned(), - serde_json::to_value(&prev_pdu.sender) - .expect("UserId::to_value always works"), - ); + unsigned + .insert("prev_sender".to_owned(), serde_json::to_value(prev_pdu.sender())?); unsigned.insert( "replaces_state".to_owned(), - serde_json::to_value(&prev_pdu.event_id).expect("EventId is valid json"), + serde_json::to_value(prev_pdu.event_id())?, ); } } @@ -774,7 +766,7 @@ impl Service { unsigned: if unsigned.is_empty() { None } else { - Some(to_raw_value(&unsigned).expect("to_raw_value always works")) + Some(to_raw_value(&unsigned)?) }, hashes: EventHash { sha256: "aaa".to_owned() }, signatures: None, @@ -1072,10 +1064,10 @@ impl Service { /// Replace a PDU with the redacted form. #[tracing::instrument(name = "redact", level = "debug", skip(self))] - pub async fn redact_pdu( + pub async fn redact_pdu( &self, event_id: &EventId, - reason: &PduEvent, + reason: &Pdu, shortroomid: ShortRoomId, ) -> Result { // TODO: Don't reserialize, keep original json @@ -1084,9 +1076,13 @@ impl Service { return Ok(()); }; - let mut pdu = self.get_pdu_from_id(&pdu_id).await.map_err(|e| { - err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU."))) - })?; + let mut pdu = self + .get_pdu_from_id(&pdu_id) + .await + .map(Event::into_pdu) + .map_err(|e| { + err!(Database(error!(?pdu_id, ?event_id, ?e, "PDU ID points to invalid PDU."))) + })?; if let Ok(content) = pdu.get_content::() { if let Some(body) = content.body { @@ -1099,16 +1095,16 @@ impl Service { let room_version_id = self .services .state - .get_room_version(&pdu.room_id) + .get_room_version(pdu.room_id()) .await?; - pdu.redact(&room_version_id, reason)?; + pdu.redact(&room_version_id, reason.to_value())?; let obj = utils::to_canonical_object(&pdu).map_err(|e| { err!(Database(error!(?event_id, ?e, "Failed to convert PDU to canonical JSON"))) })?; - self.replace_pdu(&pdu_id, &obj, &pdu).await + self.replace_pdu(&pdu_id, &obj).await } #[tracing::instrument(name = "backfill", level = "debug", skip(self))] @@ -1201,7 +1197,7 @@ impl Service { backfill_server, federation::backfill::get_backfill::v1::Request { room_id: room_id.to_owned(), - v: vec![first_pdu.1.event_id.clone()], + v: vec![first_pdu.1.event_id().to_owned()], limit: uint!(100), }, ) @@ -1305,8 +1301,11 @@ impl Service { #[implement(Service)] #[tracing::instrument(skip_all, level = "debug")] -async fn check_pdu_for_admin_room(&self, pdu: &PduEvent, sender: &UserId) -> Result<()> { - match &pdu.kind { +async fn check_pdu_for_admin_room(&self, pdu: &Pdu, sender: &UserId) -> Result +where + Pdu: Event + Send + Sync, +{ + match pdu.kind() { | TimelineEventType::RoomEncryption => { return Err!(Request(Forbidden(error!("Encryption not supported in admins room.")))); }, @@ -1330,7 +1329,7 @@ async fn check_pdu_for_admin_room(&self, pdu: &PduEvent, sender: &UserId) -> Res let count = self .services .state_cache - .room_members(&pdu.room_id) + .room_members(pdu.room_id()) .ready_filter(|user| self.services.globals.user_is_local(user)) .ready_filter(|user| *user != target) .boxed() @@ -1354,7 +1353,7 @@ async fn check_pdu_for_admin_room(&self, pdu: &PduEvent, sender: &UserId) -> Res let count = self .services .state_cache - .room_members(&pdu.room_id) + .room_members(pdu.room_id()) .ready_filter(|user| self.services.globals.user_is_local(user)) .ready_filter(|user| *user != target) .boxed() diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 43245d6d..c5244b8c 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -842,7 +842,7 @@ impl Service { let unread: UInt = self .services .user - .notification_count(&user_id, &pdu.room_id) + .notification_count(&user_id, pdu.room_id()) .await .try_into() .expect("notification count can't go that high"); diff --git a/src/service/server_keys/verify.rs b/src/service/server_keys/verify.rs index 4fb60cc7..c1a5e06c 100644 --- a/src/service/server_keys/verify.rs +++ b/src/service/server_keys/verify.rs @@ -2,7 +2,7 @@ use ruma::{ CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, RoomVersionId, signatures::Verified, }; use serde_json::value::RawValue as RawJsonValue; -use tuwunel_core::{Err, Result, implement, pdu::gen_event_id_canonical_json}; +use tuwunel_core::{Err, Result, implement, matrix::event::gen_event_id_canonical_json}; #[implement(super::Service)] pub async fn validate_and_add_event_id(