From e6c85c97c66b0a3ebd7736722c4a501c7c24338f Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 29 Sep 2025 09:44:45 +0000 Subject: [PATCH] Fix names and types misrepresenting PduCount as ShortEventId. Add get_shorteventid_from_pdu_id() conversion. Fix prev/next nearest-state interface (dev branch 642086ecfcfa). Signed-off-by: Jason Volk --- src/admin/debug/commands.rs | 17 +--- src/admin/debug/mod.rs | 6 +- src/core/matrix/mod.rs | 7 +- src/core/matrix/pdu.rs | 4 +- src/core/matrix/pdu/count.rs | 4 + src/core/matrix/pdu/id.rs | 11 +-- src/core/matrix/pdu/raw_id.rs | 33 +++---- src/service/rooms/pdu_metadata/data.rs | 16 ++-- src/service/rooms/pdu_metadata/mod.rs | 4 +- src/service/rooms/read_receipt/mod.rs | 8 +- src/service/rooms/search/mod.rs | 6 +- src/service/rooms/short/mod.rs | 2 +- src/service/rooms/threads/mod.rs | 6 +- src/service/rooms/timeline/append.rs | 2 +- src/service/rooms/timeline/backfill.rs | 2 +- src/service/rooms/timeline/mod.rs | 116 +++++++++++++++++++++---- 16 files changed, 157 insertions(+), 87 deletions(-) diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs index 00ded5f4..3442ae09 100644 --- a/src/admin/debug/commands.rs +++ b/src/admin/debug/commands.rs @@ -28,10 +28,7 @@ use tuwunel_core::{ }, warn, }; -use tuwunel_service::rooms::{ - short::{ShortEventId, ShortRoomId}, - state_compressor::HashSetCompressStateEvent, -}; +use tuwunel_service::rooms::{short::ShortRoomId, state_compressor::HashSetCompressStateEvent}; use crate::admin_command; @@ -138,16 +135,8 @@ pub(super) async fn get_pdu(&self, event_id: OwnedEventId) -> Result { } #[admin_command] -pub(super) async fn get_short_pdu( - &self, - shortroomid: ShortRoomId, - shorteventid: ShortEventId, -) -> Result { - let pdu_id: RawPduId = PduId { - shortroomid, - shorteventid: shorteventid.into(), - } - .into(); +pub(super) async fn get_short_pdu(&self, shortroomid: ShortRoomId, count: i64) -> Result { + let pdu_id: RawPduId = PduId { shortroomid, count: count.into() }.into(); let pdu_json = self .services diff --git a/src/admin/debug/mod.rs b/src/admin/debug/mod.rs index 36d50ca0..4db0acf9 100644 --- a/src/admin/debug/mod.rs +++ b/src/admin/debug/mod.rs @@ -4,7 +4,7 @@ pub(crate) mod tester; use clap::Subcommand; use ruma::{OwnedEventId, OwnedRoomId, OwnedRoomOrAliasId, OwnedServerName}; use tuwunel_core::Result; -use tuwunel_service::rooms::short::{ShortEventId, ShortRoomId}; +use tuwunel_service::rooms::short::ShortRoomId; use self::tester::TesterCommand; use crate::admin_command_dispatch; @@ -43,8 +43,8 @@ pub(super) enum DebugCommand { /// Shortroomid integer shortroomid: ShortRoomId, - /// Shorteventid integer - shorteventid: ShortEventId, + /// PduCount integer + count: i64, }, /// - Attempts to retrieve a PDU from a remote server. Inserts it into our diff --git a/src/core/matrix/mod.rs b/src/core/matrix/mod.rs index aab42398..11e5484d 100644 --- a/src/core/matrix/mod.rs +++ b/src/core/matrix/mod.rs @@ -6,6 +6,11 @@ pub mod room_version; pub mod state_res; pub use event::{Event, StateKey, TypeExt as EventTypeExt, TypeStateKey, state_key}; -pub use pdu::{EventHash, Pdu, PduBuilder, PduCount, PduEvent, PduId, RawPduId, ShortId}; +pub use pdu::{EventHash, Pdu, PduBuilder, PduCount, PduEvent, PduId, RawPduId}; pub use room_version::{RoomVersion, RoomVersionRules}; pub use state_res::{StateMap, events}; + +pub type ShortStateKey = ShortId; +pub type ShortEventId = ShortId; +pub type ShortRoomId = ShortId; +pub type ShortId = u64; diff --git a/src/core/matrix/pdu.rs b/src/core/matrix/pdu.rs index 4ccf040d..47f59d22 100644 --- a/src/core/matrix/pdu.rs +++ b/src/core/matrix/pdu.rs @@ -24,10 +24,10 @@ pub use self::{ builder::{Builder, Builder as PduBuilder}, count::Count, hashes::EventHashes as EventHash, - id::{ShortId, *}, + id::Id, raw_id::*, }; -use super::{Event, StateKey}; +use super::{Event, ShortRoomId, StateKey}; use crate::Result; /// Persistent Data Unit (Event) diff --git a/src/core/matrix/pdu/count.rs b/src/core/matrix/pdu/count.rs index 9397109a..fb64d267 100644 --- a/src/core/matrix/pdu/count.rs +++ b/src/core/matrix/pdu/count.rs @@ -17,6 +17,10 @@ pub enum Count { } impl Count { + #[inline] + #[must_use] + pub fn to_be_bytes(self) -> [u8; size_of::()] { self.into_unsigned().to_be_bytes() } + #[inline] #[must_use] pub fn from_unsigned(unsigned: u64) -> Self { Self::from_signed(unsigned as i64) } diff --git a/src/core/matrix/pdu/id.rs b/src/core/matrix/pdu/id.rs index d024ad07..e5dcc885 100644 --- a/src/core/matrix/pdu/id.rs +++ b/src/core/matrix/pdu/id.rs @@ -1,14 +1,9 @@ -use super::{Count, RawId}; - -pub type ShortRoomId = ShortId; -pub type ShortEventId = ShortId; -pub type ShortStateKey = ShortId; -pub type ShortId = u64; +use super::{Count, RawId, ShortRoomId}; #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct Id { pub shortroomid: ShortRoomId, - pub shorteventid: Count, + pub count: Count, } impl From for Id { @@ -16,7 +11,7 @@ impl From for Id { fn from(raw: RawId) -> Self { Self { shortroomid: u64::from_be_bytes(raw.shortroomid()), - shorteventid: Count::from_unsigned(u64::from_be_bytes(raw.shorteventid())), + count: Count::from_unsigned(u64::from_be_bytes(raw.count())), } } } diff --git a/src/core/matrix/pdu/raw_id.rs b/src/core/matrix/pdu/raw_id.rs index 35e37575..e9028125 100644 --- a/src/core/matrix/pdu/raw_id.rs +++ b/src/core/matrix/pdu/raw_id.rs @@ -3,7 +3,10 @@ use std::fmt; use arrayvec::ArrayVec; use serde::{Deserialize, Deserializer}; -use super::{Count, Id, ShortEventId, ShortId, ShortRoomId}; +use super::{ + super::{ShortId, ShortRoomId}, + Count, Id, +}; #[derive(Clone, Copy, Eq, Hash, PartialEq)] pub enum RawId { @@ -19,9 +22,9 @@ struct RawIdVisitor; const INT_LEN: usize = size_of::(); impl RawId { - const BACKFILLED_LEN: usize = size_of::() + INT_LEN + size_of::(); + const BACKFILLED_LEN: usize = size_of::() + INT_LEN + size_of::(); const MAX_LEN: usize = Self::BACKFILLED_LEN; - const NORMAL_LEN: usize = size_of::() + size_of::(); + const NORMAL_LEN: usize = size_of::() + size_of::(); #[inline] #[must_use] @@ -29,9 +32,9 @@ impl RawId { #[inline] #[must_use] - pub fn pdu_count(&self) -> Count { - let id: Id = (*self).into(); - id.shorteventid + pub fn pdu_count(self) -> Count { + let id: Id = self.into(); + id.count } #[inline] @@ -49,14 +52,14 @@ impl RawId { #[inline] #[must_use] - pub fn shorteventid(self) -> [u8; INT_LEN] { + pub fn count(self) -> [u8; INT_LEN] { match self { | Self::Normal(raw) => raw[INT_LEN..INT_LEN * 2] .try_into() - .expect("normal raw shorteventid array from slice"), + .expect("normal raw indice array from slice"), | Self::Backfilled(raw) => raw[INT_LEN * 2..INT_LEN * 3] .try_into() - .expect("backfilled raw shorteventid array from slice"), + .expect("backfilled raw indice array from slice"), } } @@ -103,19 +106,19 @@ impl From for RawId { let mut vec = RawVec::new(); vec.extend(id.shortroomid.to_be_bytes()); - id.shorteventid.debug_assert_valid(); - match id.shorteventid { - | Count::Normal(shorteventid) => { - vec.extend(shorteventid.to_be_bytes()); + id.count.debug_assert_valid(); + match id.count { + | Count::Normal(count) => { + vec.extend(count.to_be_bytes()); Self::Normal( vec.as_ref() .try_into() .expect("RawVec into RawId::Normal"), ) }, - | Count::Backfilled(shorteventid) => { + | Count::Backfilled(count) => { vec.extend(0_u64.to_be_bytes()); - vec.extend(shorteventid.to_be_bytes()); + vec.extend(count.to_be_bytes()); Self::Backfilled( vec.as_ref() .try_into() diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index 52365c4f..c0c50cf3 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -17,7 +17,7 @@ use tuwunel_core::{ use tuwunel_database::{Interfix, Map}; use crate::rooms::{ - short::{ShortEventId, ShortRoomId}, + short::ShortRoomId, timeline::{PduId, RawPduId}, }; @@ -52,12 +52,12 @@ impl Data { &'a self, user_id: &'a UserId, shortroomid: ShortRoomId, - target: ShortEventId, + target: PduCount, from: PduCount, dir: Direction, ) -> impl Stream + Send + '_ { let mut current = ArrayVec::::new(); - current.extend(target.to_be_bytes()); + current.extend(target.into_unsigned().to_be_bytes()); current.extend( from.saturating_inc(dir) .into_unsigned() @@ -75,12 +75,12 @@ impl Data { .boxed(), } .ignore_err() - .ready_take_while(move |key| key.starts_with(&target.to_be_bytes())) + .ready_take_while(move |key| key.starts_with(&target.into_unsigned().to_be_bytes())) .map(|to_from| u64_from_u8(&to_from[8..16])) .map(PduCount::from_unsigned) - .map(move |shorteventid| (user_id, shortroomid, shorteventid)) - .wide_filter_map(async |(user_id, shortroomid, shorteventid)| { - let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into(); + .map(move |count| (user_id, shortroomid, count)) + .wide_filter_map(async |(user_id, shortroomid, count)| { + let pdu_id: RawPduId = PduId { shortroomid, count }.into(); let mut pdu = self .services .timeline @@ -95,7 +95,7 @@ impl Data { .ok(); } - Some((shorteventid, pdu)) + Some((count, pdu)) }) } diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index bed342a3..45169a98 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -64,7 +64,7 @@ impl Service { let mut pdus: Vec<_> = self .db - .get_relations(user_id, room_id, target, from, dir) + .get_relations(user_id, room_id, target.into(), from, dir) .collect() .await; @@ -83,7 +83,7 @@ impl Service { let relations: Vec<_> = self .db - .get_relations(user_id, room_id, target, from, dir) + .get_relations(user_id, room_id, target.into(), from, dir) .collect() .await; diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index 37be64f9..0148ad05 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -63,7 +63,7 @@ impl Service { room_id: &RoomId, user_id: &UserId, ) -> Result> { - let pdu_count = self + let count = self .private_read_get_count(room_id, user_id) .map_err(|e| { err!(Database(warn!("No private read receipt was set in {room_id}: {e}"))) @@ -79,9 +79,9 @@ impl Service { ))) }); - let (pdu_count, shortroomid) = try_join!(pdu_count, shortroomid)?; - let shorteventid = PduCount::Normal(pdu_count); - let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into(); + let (count, shortroomid) = try_join!(count, shortroomid)?; + let count = PduCount::Normal(count); + let pdu_id: RawPduId = PduId { shortroomid, count }.into(); let pdu = self .services .timeline diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index 31168697..baa7d09b 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -182,11 +182,7 @@ fn search_pdu_ids_query_word( word: &str, ) -> impl Stream> + Send + '_ + use<'_> { // rustc says const'ing this not yet stable - let end_id: RawPduId = PduId { - shortroomid, - shorteventid: PduCount::max(), - } - .into(); + let end_id: RawPduId = PduId { shortroomid, count: PduCount::max() }.into(); // Newest pdus first let end = make_tokenid(shortroomid, word, &end_id); diff --git a/src/service/rooms/short/mod.rs b/src/service/rooms/short/mod.rs index d9da2e6b..edd66028 100644 --- a/src/service/rooms/short/mod.rs +++ b/src/service/rooms/short/mod.rs @@ -3,7 +3,7 @@ use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc}; use futures::{FutureExt, Stream, StreamExt}; use ruma::{EventId, OwnedRoomId, RoomId, events::StateEventType}; use serde::Deserialize; -pub use tuwunel_core::matrix::pdu::{ShortEventId, ShortId, ShortRoomId, ShortStateKey}; +pub use tuwunel_core::matrix::{ShortEventId, ShortId, ShortRoomId, ShortStateKey}; use tuwunel_core::{ Err, Result, err, implement, matrix::StateKey, diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index b47247b8..c2d82986 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -128,7 +128,7 @@ impl Service { &'a self, user_id: &'a UserId, room_id: &'a RoomId, - shorteventid: PduCount, + count: PduCount, _inc: &'a IncludeThreads, ) -> impl Stream> + Send { self.services @@ -136,7 +136,7 @@ impl Service { .get_shortroomid(room_id) .map_ok(move |shortroomid| PduId { shortroomid, - shorteventid: shorteventid.saturating_sub(1), + count: count.saturating_sub(1), }) .map_ok(Into::into) .map_ok(move |current: RawPduId| { @@ -162,7 +162,7 @@ impl Service { pdu.as_mut_pdu().remove_transaction_id().ok(); } - Some((pdu_id.shorteventid, pdu)) + Some((pdu_id.count, pdu)) }) .map(Ok) }) diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index 42a51c3e..ad52e58c 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -178,7 +178,7 @@ where .reset_notification_counts(pdu.sender(), pdu.room_id()); let count = PduCount::Normal(*next_count1); - let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count }.into(); + let pdu_id: RawPduId = PduId { shortroomid, count }.into(); // Insert pdu self.append_pdu_json(&pdu_id, pdu, &pdu_json, count); diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index 3db322cc..f570cd92 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -198,7 +198,7 @@ pub async fn backfill_pdu( let count: i64 = (*count).try_into()?; let pdu_id: RawPduId = PduId { shortroomid, - shorteventid: PduCount::Backfilled(validated!(0 - count)), + count: PduCount::Backfilled(validated!(0 - count)), } .into(); diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index 6b4c344c..8be8f766 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -23,7 +23,10 @@ use serde::Deserialize; pub use tuwunel_core::matrix::pdu::{PduId, RawPduId}; use tuwunel_core::{ Err, Result, at, err, implement, - matrix::pdu::{PduCount, PduEvent}, + matrix::{ + ShortEventId, + pdu::{PduCount, PduEvent}, + }, trace, utils::{ MutexMap, MutexMapGuard, @@ -167,7 +170,7 @@ pub async fn latest_item_in_room( } /// Returns the shortstatehash of the room at the event directly preceding the -/// exclusive `before` param. `before` does not have to be a valid shorteventid +/// exclusive `before` param. `before` does not have to be a valid count /// or in the room. #[implement(Service)] #[tracing::instrument(skip(self), level = "debug")] @@ -176,16 +179,30 @@ pub async fn prev_shortstatehash( room_id: &RoomId, before: PduCount, ) -> Result { - let prev = self.prev_timeline_count(room_id, before).await?; + let shortroomid: ShortRoomId = self + .services + .short + .get_shortroomid(room_id) + .await + .map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?; + + let before = PduId { shortroomid, count: before }; + + let prev = PduId { + shortroomid, + count: self.prev_timeline_count(&before).await?, + }; + + let shorteventid = self.get_shorteventid_from_pdu_id(&prev).await?; self.services .state - .get_shortstatehash(prev.into_unsigned()) + .get_shortstatehash(shorteventid) .await } /// Returns the shortstatehash of the room at the event directly following the -/// exclusive `after` param. `after` does not have to be a valid shorteventid or +/// exclusive `after` param. `after` does not have to be a valid count or /// in the room. #[implement(Service)] #[tracing::instrument(skip(self), level = "debug")] @@ -194,11 +211,71 @@ pub async fn next_shortstatehash( room_id: &RoomId, after: PduCount, ) -> Result { - let next = self.next_timeline_count(room_id, after).await?; + let shortroomid: ShortRoomId = self + .services + .short + .get_shortroomid(room_id) + .await + .map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?; + + let after = PduId { shortroomid, count: after }; + + let next = PduId { + shortroomid, + count: self.next_timeline_count(&after).await?, + }; + + let shorteventid = self.get_shorteventid_from_pdu_id(&next).await?; self.services .state - .get_shortstatehash(next.into_unsigned()) + .get_shortstatehash(shorteventid) + .await +} + +/// Returns the shortstatehash of the room at the event +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn get_shortstatehash( + &self, + room_id: &RoomId, + count: PduCount, +) -> Result { + let shortroomid: ShortRoomId = self + .services + .short + .get_shortroomid(room_id) + .await + .map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?; + + let pdu_id = PduId { shortroomid, count }; + + let shorteventid = self.get_shorteventid_from_pdu_id(&pdu_id).await?; + + self.services + .state + .get_shortstatehash(shorteventid) + .await +} + +/// Returns the `shorteventid` from the `pdu_id` +#[implement(Service)] +pub async fn get_shorteventid_from_pdu_id(&self, pdu_id: &PduId) -> Result { + let event_id = self.get_event_id_from_pdu_id(pdu_id).await?; + + self.services + .short + .get_shorteventid(&event_id) + .await +} + +/// Returns the `event_id` from the `pdu_id` +#[implement(Service)] +pub async fn get_event_id_from_pdu_id(&self, pdu_id: &PduId) -> Result { + let pdu_id: RawPduId = (*pdu_id).into(); + + self.get_pdu_from_id(&pdu_id) + .map_ok(|pdu| pdu.event_id) .await } @@ -206,10 +283,8 @@ pub async fn next_shortstatehash( /// `before` does not have to be a valid shorteventid or in the room. #[implement(Service)] #[tracing::instrument(skip(self), level = "debug")] -pub async fn prev_timeline_count(&self, room_id: &RoomId, before: PduCount) -> Result { - let before = self - .count_to_id(room_id, before, Direction::Backward) - .await?; +pub async fn prev_timeline_count(&self, before: &PduId) -> Result { + let before = Self::pdu_count_to_id(before.shortroomid, before.count, Direction::Backward); let pdu_ids = self .db @@ -230,10 +305,8 @@ pub async fn prev_timeline_count(&self, room_id: &RoomId, before: PduCount) -> R /// `after` does not have to be a valid shorteventid or in the room. #[implement(Service)] #[tracing::instrument(skip(self), level = "debug")] -pub async fn next_timeline_count(&self, room_id: &RoomId, after: PduCount) -> Result { - let after = self - .count_to_id(room_id, after, Direction::Forward) - .await?; +pub async fn next_timeline_count(&self, after: &PduId) -> Result { + let after = Self::pdu_count_to_id(after.shortroomid, after.count, Direction::Forward); let pdu_ids = self .db @@ -350,7 +423,7 @@ fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result Result { let shortroomid: ShortRoomId = self @@ -360,13 +433,18 @@ async fn count_to_id( .await .map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?; + Ok(Self::pdu_count_to_id(shortroomid, count, dir)) +} + +#[implement(Service)] +fn pdu_count_to_id(shortroomid: ShortRoomId, count: PduCount, dir: Direction) -> RawPduId { // +1 so we don't send the base event let pdu_id = PduId { shortroomid, - shorteventid: shorteventid.saturating_inc(dir), + count: count.saturating_inc(dir), }; - Ok(pdu_id.into()) + pdu_id.into() } /// Returns the pdu. @@ -486,7 +564,7 @@ pub async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result { pub async fn get_pdu_count(&self, event_id: &EventId) -> Result { self.get_pdu_id(event_id) .await - .map(|pdu_id| pdu_id.pdu_count()) + .map(RawPduId::pdu_count) } /// Returns the pdu's id.