diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index 64c15ddb..8b7c711e 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -5,7 +5,7 @@ use std::{ use futures::StreamExt; use ruma::{ - CanonicalJsonObject, CanonicalJsonValue, EventId, RoomVersionId, UserId, + CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedUserId, RoomId, RoomVersionId, UserId, events::{ GlobalAccountDataEventType, StateEventType, TimelineEventType, push_rules::PushRulesEvent, @@ -26,6 +26,7 @@ use tuwunel_core::{ }, utils::{self, ReadyExt}, }; +use tuwunel_database::{Json, Map}; use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard}; use crate::{appservice::NamespaceRegex, rooms::state_compressor::CompressedState}; @@ -181,9 +182,7 @@ where let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count }.into(); // Insert pdu - self.db - .append_pdu(&pdu_id, pdu, &pdu_json, count) - .await; + self.append_pdu_json(&pdu_id, pdu, &pdu_json, count); drop(insert_lock); @@ -286,8 +285,7 @@ where .await; } - self.db - .increment_notification_counts(pdu.room_id(), notifies, highlights); + self.increment_notification_counts(pdu.room_id(), notifies, highlights); match *pdu.kind() { | TimelineEventType::RoomRedaction => { @@ -487,3 +485,55 @@ where Ok(pdu_id) } + +#[implement(super::Service)] +fn append_pdu_json( + &self, + pdu_id: &RawPduId, + pdu: &PduEvent, + json: &CanonicalJsonObject, + count: PduCount, +) { + debug_assert!(matches!(count, PduCount::Normal(_)), "PduCount not Normal"); + + self.db.pduid_pdu.raw_put(pdu_id, Json(json)); + + self.db + .eventid_pduid + .insert(pdu.event_id.as_bytes(), pdu_id); + + self.db + .eventid_outlierpdu + .remove(pdu.event_id.as_bytes()); +} + +#[implement(super::Service)] +fn increment_notification_counts( + &self, + room_id: &RoomId, + notifies: Vec, + highlights: Vec, +) { + let _cork = self.db.db.cork(); + + for user in notifies { + let mut userroom_id = user.as_bytes().to_vec(); + userroom_id.push(0xFF); + userroom_id.extend_from_slice(room_id.as_bytes()); + increment(&self.db.userroomid_notificationcount, &userroom_id); + } + + for user in highlights { + let mut userroom_id = user.as_bytes().to_vec(); + userroom_id.push(0xFF); + userroom_id.extend_from_slice(room_id.as_bytes()); + increment(&self.db.userroomid_highlightcount, &userroom_id); + } +} + +//TODO: this is an ABA +fn increment(db: &Arc, key: &[u8]) { + let old = db.get_blocking(key); + let new = utils::increment(old.ok().as_deref()); + db.insert(key, new); +} diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index b4917295..e766347b 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -2,7 +2,7 @@ use std::iter::once; use futures::{FutureExt, StreamExt}; use ruma::{ - RoomId, ServerName, + CanonicalJsonObject, EventId, RoomId, ServerName, api::federation, events::{ StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent, @@ -19,6 +19,7 @@ use tuwunel_core::{ utils::{IterStream, ReadyExt}, validated, warn, }; +use tuwunel_database::Json; use super::ExtractBody; @@ -191,8 +192,7 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> .into(); // Insert pdu - self.db - .prepend_backfill_pdu(&pdu_id, &event_id, &value); + self.prepend_backfill_pdu(&pdu_id, &event_id, &value); drop(insert_lock); @@ -209,3 +209,15 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box) -> debug!("Prepended backfill pdu"); Ok(()) } + +#[implement(super::Service)] +fn prepend_backfill_pdu( + &self, + pdu_id: &RawPduId, + event_id: &EventId, + json: &CanonicalJsonObject, +) { + self.db.pduid_pdu.raw_put(pdu_id, Json(json)); + self.db.eventid_pduid.insert(event_id, pdu_id); + self.db.eventid_outlierpdu.remove(event_id); +} diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs deleted file mode 100644 index 6443cffb..00000000 --- a/src/service/rooms/timeline/data.rs +++ /dev/null @@ -1,366 +0,0 @@ -use std::{borrow::Borrow, sync::Arc}; - -use futures::{ - Stream, TryFutureExt, TryStreamExt, - future::{ - Either::{Left, Right}, - select_ok, - }, - pin_mut, -}; -use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction}; -use tuwunel_core::{ - Err, PduCount, PduEvent, Result, at, err, - result::{LogErr, NotFound}, - utils, - utils::stream::TryReadyExt, -}; -use tuwunel_database::{Database, Deserialized, Json, KeyVal, Map}; - -use super::{PduId, RawPduId}; -use crate::{Dep, rooms, rooms::short::ShortRoomId}; - -pub(super) struct Data { - eventid_outlierpdu: Arc, - eventid_pduid: Arc, - pduid_pdu: Arc, - userroomid_highlightcount: Arc, - userroomid_notificationcount: Arc, - pub(super) db: Arc, - services: Services, -} - -struct Services { - short: Dep, -} - -pub type PdusIterItem = (PduCount, PduEvent); - -impl Data { - pub(super) fn new(args: &crate::Args<'_>) -> Self { - let db = &args.db; - Self { - eventid_outlierpdu: db["eventid_outlierpdu"].clone(), - eventid_pduid: db["eventid_pduid"].clone(), - pduid_pdu: db["pduid_pdu"].clone(), - userroomid_highlightcount: db["userroomid_highlightcount"].clone(), - userroomid_notificationcount: db["userroomid_notificationcount"].clone(), - db: args.db.clone(), - services: Services { - short: args.depend::("rooms::short"), - }, - } - } - - #[inline] - pub(super) async fn last_timeline_count( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); - - pin_mut!(pdus_rev); - let last_count = pdus_rev - .try_next() - .await? - .map(at!(0)) - .filter(|&count| matches!(count, PduCount::Normal(_))) - .unwrap_or_else(PduCount::max); - - Ok(last_count) - } - - #[inline] - pub(super) async fn latest_pdu_in_room( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max()); - - pin_mut!(pdus_rev); - pdus_rev - .try_next() - .await? - .map(at!(1)) - .ok_or_else(|| err!(Request(NotFound("no PDU's found in room")))) - } - - /// Returns the `count` of this pdu's id. - #[inline] - pub(super) async fn get_pdu_count(&self, event_id: &EventId) -> Result { - self.get_pdu_id(event_id) - .await - .map(|pdu_id| pdu_id.pdu_count()) - } - - /// Returns the json of a pdu. - #[inline] - pub(super) async fn get_outlier_pdu_json( - &self, - event_id: &EventId, - ) -> Result { - self.eventid_outlierpdu - .get(event_id) - .await - .deserialized() - } - - /// Returns the json of a pdu. - #[inline] - pub(super) async fn get_pdu_json(&self, event_id: &EventId) -> Result { - let accepted = self.get_non_outlier_pdu_json(event_id); - let outlier = self.get_outlier_pdu_json(event_id); - - pin_mut!(accepted, outlier); - select_ok([Left(accepted), Right(outlier)]) - .await - .map(at!(0)) - } - - /// Returns the json of a pdu. - #[inline] - pub(super) async fn get_non_outlier_pdu_json( - &self, - event_id: &EventId, - ) -> Result { - let pduid = self.get_pdu_id(event_id).await?; - - self.pduid_pdu.get(&pduid).await.deserialized() - } - - /// Returns the pdu's id. - #[inline] - pub(super) async fn get_pdu_id(&self, event_id: &EventId) -> Result { - self.eventid_pduid - .get(event_id) - .await - .map(|handle| RawPduId::from(&*handle)) - } - - /// Returns the pdu directly from `eventid_pduid` only. - #[inline] - pub(super) async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result { - let pduid = self.get_pdu_id(event_id).await?; - - self.pduid_pdu.get(&pduid).await.deserialized() - } - - /// Like get_non_outlier_pdu(), but without the expense of fetching and - /// parsing the PduEvent - #[inline] - pub(super) async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> Result { - let pduid = self.get_pdu_id(event_id).await?; - - self.pduid_pdu.exists(&pduid).await - } - - /// Returns the pdu. - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - #[inline] - pub(super) async fn get_outlier_pdu(&self, event_id: &EventId) -> Result { - self.eventid_outlierpdu - .get(event_id) - .await - .deserialized() - } - - /// Returns the pdu. - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - #[inline] - pub(super) async fn get_pdu(&self, event_id: &EventId) -> Result { - let accepted = self.get_non_outlier_pdu(event_id); - let outlier = self.get_outlier_pdu(event_id); - - pin_mut!(accepted, outlier); - select_ok([Left(accepted), Right(outlier)]) - .await - .map(at!(0)) - } - - /// Like get_non_outlier_pdu(), but without the expense of fetching and - /// parsing the PduEvent - #[inline] - pub(super) async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result { - self.eventid_outlierpdu.exists(event_id).await - } - - /// Like get_pdu(), but without the expense of fetching and parsing the data - #[inline] - pub(super) async fn pdu_exists(&self, event_id: &EventId) -> Result { - let non_outlier = self.non_outlier_pdu_exists(event_id); - let outlier = self.outlier_pdu_exists(event_id); - - pin_mut!(non_outlier, outlier); - select_ok([Left(non_outlier), Right(outlier)]) - .await - .map(at!(0)) - } - - /// Returns the pdu. - /// - /// This does __NOT__ check the outliers `Tree`. - #[inline] - pub(super) async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result { - self.pduid_pdu.get(pdu_id).await.deserialized() - } - - /// Returns the pdu as a `BTreeMap`. - #[inline] - pub(super) async fn get_pdu_json_from_id( - &self, - pdu_id: &RawPduId, - ) -> Result { - self.pduid_pdu.get(pdu_id).await.deserialized() - } - - pub(super) async fn append_pdu( - &self, - pdu_id: &RawPduId, - pdu: &PduEvent, - json: &CanonicalJsonObject, - count: PduCount, - ) { - debug_assert!(matches!(count, PduCount::Normal(_)), "PduCount not Normal"); - - self.pduid_pdu.raw_put(pdu_id, Json(json)); - self.eventid_pduid - .insert(pdu.event_id.as_bytes(), pdu_id); - self.eventid_outlierpdu - .remove(pdu.event_id.as_bytes()); - } - - pub(super) fn prepend_backfill_pdu( - &self, - pdu_id: &RawPduId, - event_id: &EventId, - json: &CanonicalJsonObject, - ) { - self.pduid_pdu.raw_put(pdu_id, Json(json)); - self.eventid_pduid.insert(event_id, pdu_id); - self.eventid_outlierpdu.remove(event_id); - } - - /// Removes a pdu and creates a new one with the same id. - pub(super) async fn replace_pdu( - &self, - pdu_id: &RawPduId, - pdu_json: &CanonicalJsonObject, - ) -> Result { - if self.pduid_pdu.get(pdu_id).await.is_not_found() { - return Err!(Request(NotFound("PDU does not exist."))); - } - - self.pduid_pdu.raw_put(pdu_id, Json(pdu_json)); - - Ok(()) - } - - /// Returns an iterator over all events and their tokens in a room that - /// happened before the event with id `until` in reverse-chronological - /// order. - #[inline] - pub(super) fn pdus_rev<'a>( - &'a self, - user_id: Option<&'a UserId>, - room_id: &'a RoomId, - until: PduCount, - ) -> impl Stream> + Send + 'a { - self.count_to_id(room_id, until, Direction::Backward) - .map_ok(move |current| { - let prefix = current.shortroomid(); - self.pduid_pdu - .rev_raw_stream_from(¤t) - .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(move |item| Self::each_pdu(item, user_id)) - }) - .try_flatten_stream() - } - - #[inline] - pub(super) fn pdus<'a>( - &'a self, - user_id: Option<&'a UserId>, - room_id: &'a RoomId, - from: PduCount, - ) -> impl Stream> + Send + 'a { - self.count_to_id(room_id, from, Direction::Forward) - .map_ok(move |current| { - let prefix = current.shortroomid(); - self.pduid_pdu - .raw_stream_from(¤t) - .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) - .ready_and_then(move |item| Self::each_pdu(item, user_id)) - }) - .try_flatten_stream() - } - - fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result { - let pdu_id: RawPduId = pdu_id.into(); - - let mut pdu = serde_json::from_slice::(pdu)?; - - if Some(pdu.sender.borrow()) != user_id { - pdu.remove_transaction_id().log_err().ok(); - } - - pdu.add_age().log_err().ok(); - - Ok((pdu_id.pdu_count(), pdu)) - } - - pub(super) fn increment_notification_counts( - &self, - room_id: &RoomId, - notifies: Vec, - highlights: Vec, - ) { - let _cork = self.db.cork(); - - for user in notifies { - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - increment(&self.userroomid_notificationcount, &userroom_id); - } - - for user in highlights { - let mut userroom_id = user.as_bytes().to_vec(); - userroom_id.push(0xFF); - userroom_id.extend_from_slice(room_id.as_bytes()); - increment(&self.userroomid_highlightcount, &userroom_id); - } - } - - async fn count_to_id( - &self, - room_id: &RoomId, - shorteventid: PduCount, - dir: Direction, - ) -> Result { - let shortroomid: ShortRoomId = self - .services - .short - .get_shortroomid(room_id) - .await - .map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?; - - // +1 so we don't send the base event - let pdu_id = PduId { - shortroomid, - shorteventid: shorteventid.saturating_inc(dir), - }; - - Ok(pdu_id.into()) - } -} - -//TODO: this is an ABA -fn increment(db: &Arc, key: &[u8]) { - let old = db.get_blocking(key); - let new = utils::increment(old.ok().as_deref()); - db.insert(key, new); -} diff --git a/src/service/rooms/timeline/mod.rs b/src/service/rooms/timeline/mod.rs index a379b455..20b7a21a 100644 --- a/src/service/rooms/timeline/mod.rs +++ b/src/service/rooms/timeline/mod.rs @@ -2,57 +2,42 @@ mod append; mod backfill; mod build; mod create; -mod data; mod redact; -use std::{fmt::Write, sync::Arc}; +use std::{borrow::Borrow, fmt::Write, sync::Arc}; use async_trait::async_trait; -use futures::{Future, Stream, TryStreamExt, pin_mut}; +use futures::{ + Stream, TryFutureExt, TryStreamExt, + future::{ + Either::{Left, Right}, + select_ok, + }, + pin_mut, +}; use ruma::{ - CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId, + CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId, api::Direction, events::room::encrypted::Relation, }; use serde::Deserialize; pub use tuwunel_core::matrix::pdu::{PduId, RawPduId}; use tuwunel_core::{ - Result, Server, at, err, - matrix::{ - event::Event, - pdu::{PduCount, PduEvent}, + Err, Result, Server, at, err, implement, + matrix::pdu::{PduCount, PduEvent}, + utils::{ + MutexMap, MutexMapGuard, + result::{LogErr, NotFound}, + stream::{TryIgnore, TryReadyExt}, }, - utils::{MutexMap, MutexMapGuard, future::TryExtExt, stream::TryIgnore}, warn, }; +use tuwunel_database::{Database, Deserialized, Json, KeyVal, Map}; -use self::data::Data; -pub use self::data::PdusIterItem; use crate::{ - Dep, account_data, admin, appservice, globals, pusher, rooms, sending, server_keys, users, + Dep, account_data, admin, appservice, globals, pusher, rooms, rooms::short::ShortRoomId, + sending, server_keys, users, }; -// Update Relationships -#[derive(Deserialize)] -struct ExtractRelatesTo { - #[serde(rename = "m.relates_to")] - relates_to: Relation, -} - -#[derive(Clone, Debug, Deserialize)] -struct ExtractEventId { - event_id: OwnedEventId, -} -#[derive(Clone, Debug, Deserialize)] -struct ExtractRelatesToEventId { - #[serde(rename = "m.relates_to")] - relates_to: ExtractEventId, -} - -#[derive(Deserialize)] -struct ExtractBody { - body: Option, -} - pub struct Service { services: Services, db: Data, @@ -83,8 +68,40 @@ struct Services { event_handler: Dep, } +struct Data { + eventid_outlierpdu: Arc, + eventid_pduid: Arc, + pduid_pdu: Arc, + userroomid_highlightcount: Arc, + userroomid_notificationcount: Arc, + db: Arc, +} + +// Update Relationships +#[derive(Deserialize)] +struct ExtractRelatesTo { + #[serde(rename = "m.relates_to")] + relates_to: Relation, +} + +#[derive(Clone, Debug, Deserialize)] +struct ExtractEventId { + event_id: OwnedEventId, +} +#[derive(Clone, Debug, Deserialize)] +struct ExtractRelatesToEventId { + #[serde(rename = "m.relates_to")] + relates_to: ExtractEventId, +} + +#[derive(Deserialize)] +struct ExtractBody { + body: Option, +} + type RoomMutexMap = MutexMap; pub type RoomMutexGuard = MutexMapGuard; +pub type PdusIterItem = (PduCount, PduEvent); #[async_trait] impl crate::Service for Service { @@ -115,7 +132,14 @@ impl crate::Service for Service { event_handler: args .depend::("rooms::event_handler"), }, - db: Data::new(&args), + db: Data { + eventid_outlierpdu: args.db["eventid_outlierpdu"].clone(), + eventid_pduid: args.db["eventid_pduid"].clone(), + pduid_pdu: args.db["pduid_pdu"].clone(), + userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(), + userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(), + db: args.db.clone(), + }, mutex_insert: RoomMutexMap::new(), })) } @@ -130,135 +154,302 @@ impl crate::Service for Service { fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } -impl Service { - #[tracing::instrument(skip(self), level = "debug")] - pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result { - self.first_item_in_room(room_id).await.map(at!(1)) +/// Removes a pdu and creates a new one with the same id. +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn replace_pdu(&self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObject) -> Result { + if self.db.pduid_pdu.get(pdu_id).await.is_not_found() { + return Err!(Request(NotFound("PDU does not exist."))); } - #[tracing::instrument(skip(self), level = "debug")] - pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> { - let pdus = self.pdus(None, room_id, None); + self.db.pduid_pdu.raw_put(pdu_id, Json(pdu_json)); - pin_mut!(pdus); - pdus.try_next() - .await? - .ok_or_else(|| err!(Request(NotFound("No PDU found in room")))) - } - - #[tracing::instrument(skip(self), level = "debug")] - pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { - self.db.latest_pdu_in_room(None, room_id).await - } - - #[tracing::instrument(skip(self), level = "debug")] - pub async fn last_timeline_count( - &self, - sender_user: Option<&UserId>, - room_id: &RoomId, - ) -> Result { - self.db - .last_timeline_count(sender_user, room_id) - .await - } - - /// Returns the `count` of this pdu's id. - pub async fn get_pdu_count(&self, event_id: &EventId) -> Result { - self.db.get_pdu_count(event_id).await - } - - /// Returns the json of a pdu. - pub async fn get_pdu_json(&self, event_id: &EventId) -> Result { - self.db.get_pdu_json(event_id).await - } - - /// Returns the json of a pdu. - pub async fn get_non_outlier_pdu_json( - &self, - event_id: &EventId, - ) -> Result { - self.db.get_non_outlier_pdu_json(event_id).await - } - - /// Returns the pdu's id. - pub async fn get_pdu_id(&self, event_id: &EventId) -> Result { - self.db.get_pdu_id(event_id).await - } - - /// Returns the pdu. - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result { - self.db.get_non_outlier_pdu(event_id).await - } - - /// Returns the pdu. - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub async fn get_pdu(&self, event_id: &EventId) -> Result { - self.db.get_pdu(event_id).await - } - - /// Returns the pdu. - /// - /// This does __NOT__ check the outliers `Tree`. - pub async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result { - self.db.get_pdu_from_id(pdu_id).await - } - - /// Returns the pdu as a `BTreeMap`. - pub async fn get_pdu_json_from_id(&self, pdu_id: &RawPduId) -> Result { - self.db.get_pdu_json_from_id(pdu_id).await - } - - /// Checks if pdu exists - /// - /// Checks the `eventid_outlierpdu` Tree if not found in the timeline. - pub fn pdu_exists<'a>( - &'a self, - event_id: &'a EventId, - ) -> impl Future + Send + 'a { - self.db.pdu_exists(event_id).is_ok() - } - - /// Removes a pdu and creates a new one with the same id. - #[tracing::instrument(skip(self), level = "debug")] - pub async fn replace_pdu(&self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObject) -> Result { - self.db.replace_pdu(pdu_id, pdu_json).await - } - - /// Returns an iterator over all PDUs in a room. Unknown rooms produce no - /// items. - pub fn all_pdus<'a>( - &'a self, - user_id: &'a UserId, - room_id: &'a RoomId, - ) -> impl Stream + Send + 'a { - self.pdus(Some(user_id), room_id, None) - .ignore_err() - } - - /// Reverse iteration starting at from. - #[tracing::instrument(skip(self), level = "debug")] - pub fn pdus_rev<'a>( - &'a self, - user_id: Option<&'a UserId>, - room_id: &'a RoomId, - until: Option, - ) -> impl Stream> + Send + 'a { - self.db - .pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max)) - } - - /// Forward iteration starting at from. - #[tracing::instrument(skip(self), level = "debug")] - pub fn pdus<'a>( - &'a self, - user_id: Option<&'a UserId>, - room_id: &'a RoomId, - from: Option, - ) -> impl Stream> + Send + 'a { - self.db - .pdus(user_id, room_id, from.unwrap_or_else(PduCount::min)) - } + Ok(()) +} + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result { + self.first_item_in_room(room_id).await.map(at!(1)) +} + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +#[inline] +pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result { + self.latest_item_in_room(None, room_id).await +} + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> { + let pdus = self.pdus(None, room_id, None); + + pin_mut!(pdus); + pdus.try_next() + .await? + .ok_or_else(|| err!(Request(NotFound("No PDU found in room")))) +} + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn latest_item_in_room( + &self, + sender_user: Option<&UserId>, + room_id: &RoomId, +) -> Result { + let pdus_rev = self.pdus_rev(sender_user, room_id, None); + + pin_mut!(pdus_rev); + pdus_rev + .try_next() + .await? + .map(at!(1)) + .ok_or_else(|| err!(Request(NotFound("no PDU's found in room")))) +} + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn last_timeline_count( + &self, + sender_user: Option<&UserId>, + room_id: &RoomId, +) -> Result { + let pdus_rev = self.pdus_rev(sender_user, room_id, None); + + pin_mut!(pdus_rev); + let last_count = pdus_rev + .try_next() + .await? + .map(at!(0)) + .filter(|&count| matches!(count, PduCount::Normal(_))) + .unwrap_or_else(PduCount::max); + + Ok(last_count) +} + +/// Returns an iterator over all PDUs in a room. Unknown rooms produce no +/// items. +#[implement(Service)] +#[inline] +pub fn all_pdus<'a>( + &'a self, + user_id: &'a UserId, + room_id: &'a RoomId, +) -> impl Stream + Send + 'a { + self.pdus(Some(user_id), room_id, None) + .ignore_err() +} + +/// Returns an iterator over all events and their tokens in a room that +/// happened after the event with id `from` in order. +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub fn pdus<'a>( + &'a self, + user_id: Option<&'a UserId>, + room_id: &'a RoomId, + from: Option, +) -> impl Stream> + Send + 'a { + let from = from.unwrap_or_else(PduCount::min); + self.count_to_id(room_id, from, Direction::Forward) + .map_ok(move |current| { + let prefix = current.shortroomid(); + self.db + .pduid_pdu + .raw_stream_from(¤t) + .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) + .ready_and_then(move |item| Self::each_pdu(item, user_id)) + }) + .try_flatten_stream() +} + +/// Returns an iterator over all events and their tokens in a room that +/// happened before the event with id `until` in reverse-order. +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub fn pdus_rev<'a>( + &'a self, + user_id: Option<&'a UserId>, + room_id: &'a RoomId, + until: Option, +) -> impl Stream> + Send + 'a { + let until = until.unwrap_or_else(PduCount::max); + self.count_to_id(room_id, until, Direction::Backward) + .map_ok(move |current| { + let prefix = current.shortroomid(); + self.db + .pduid_pdu + .rev_raw_stream_from(¤t) + .ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix))) + .ready_and_then(move |item| Self::each_pdu(item, user_id)) + }) + .try_flatten_stream() +} + +#[implement(Service)] +fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result { + let pdu_id: RawPduId = pdu_id.into(); + let mut pdu = serde_json::from_slice::(pdu)?; + + if Some(pdu.sender.borrow()) != user_id { + pdu.remove_transaction_id().log_err().ok(); + } + + pdu.add_age().log_err().ok(); + + Ok((pdu_id.pdu_count(), pdu)) +} + +#[implement(Service)] +async fn count_to_id( + &self, + room_id: &RoomId, + shorteventid: PduCount, + dir: Direction, +) -> Result { + let shortroomid: ShortRoomId = self + .services + .short + .get_shortroomid(room_id) + .await + .map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?; + + // +1 so we don't send the base event + let pdu_id = PduId { + shortroomid, + shorteventid: shorteventid.saturating_inc(dir), + }; + + Ok(pdu_id.into()) +} + +/// Returns the pdu. +/// +/// Checks the `eventid_outlierpdu` Tree if not found in the timeline. +#[implement(Service)] +pub async fn get_pdu(&self, event_id: &EventId) -> Result { + let accepted = self.get_non_outlier_pdu(event_id); + let outlier = self.get_outlier_pdu(event_id); + + pin_mut!(accepted, outlier); + select_ok([Left(accepted), Right(outlier)]) + .await + .map(at!(0)) +} + +/// Returns the pdu. +/// +/// Checks the `eventid_outlierpdu` Tree if not found in the timeline. +#[implement(Service)] +pub async fn get_outlier_pdu(&self, event_id: &EventId) -> Result { + self.db + .eventid_outlierpdu + .get(event_id) + .await + .deserialized() +} + +/// Returns the pdu. +/// +/// Checks the `eventid_outlierpdu` Tree if not found in the timeline. +#[implement(Service)] +pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result { + let pdu_id = self.get_pdu_id(event_id).await?; + + self.get_pdu_from_id(&pdu_id).await +} + +/// Returns the pdu. +/// +/// This does __NOT__ check the outliers `Tree`. +#[implement(Service)] +pub async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result { + self.db.pduid_pdu.get(pdu_id).await.deserialized() +} + +/// Returns the json of a pdu. +#[implement(Service)] +pub async fn get_pdu_json(&self, event_id: &EventId) -> Result { + let accepted = self.get_non_outlier_pdu_json(event_id); + let outlier = self.get_outlier_pdu_json(event_id); + + pin_mut!(accepted, outlier); + select_ok([Left(accepted), Right(outlier)]) + .await + .map(at!(0)) +} + +/// Returns the json of a pdu. +#[implement(Service)] +pub async fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result { + self.db + .eventid_outlierpdu + .get(event_id) + .await + .deserialized() +} + +/// Returns the json of a pdu. +#[implement(Service)] +pub async fn get_non_outlier_pdu_json(&self, event_id: &EventId) -> Result { + let pdu_id = self.get_pdu_id(event_id).await?; + + self.get_pdu_json_from_id(&pdu_id).await +} + +/// Returns the pdu as a `BTreeMap`. +#[implement(Service)] +pub async fn get_pdu_json_from_id(&self, pdu_id: &RawPduId) -> Result { + self.db.pduid_pdu.get(pdu_id).await.deserialized() +} + +/// Checks if pdu exists +/// +/// Checks the `eventid_outlierpdu` Tree if not found in the timeline. +#[implement(Service)] +pub async fn pdu_exists<'a>(&'a self, event_id: &'a EventId) -> bool { + let non_outlier = self.non_outlier_pdu_exists(event_id); + let outlier = self.outlier_pdu_exists(event_id); + + pin_mut!(non_outlier, outlier); + select_ok([Left(non_outlier), Right(outlier)]) + .await + .map(at!(0)) + .is_ok() +} + +/// Like get_non_outlier_pdu(), but without the expense of fetching and +/// parsing the PduEvent +#[implement(Service)] +pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> Result { + let pduid = self.get_pdu_id(event_id).await?; + + self.db.pduid_pdu.exists(&pduid).await +} + +/// Like get_non_outlier_pdu(), but without the expense of fetching and +/// parsing the PduEvent +#[implement(Service)] +#[inline] +pub async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result { + self.db.eventid_outlierpdu.exists(event_id).await +} + +/// Returns the `count` of this pdu's id. +#[implement(Service)] +pub async fn get_pdu_count(&self, event_id: &EventId) -> Result { + self.get_pdu_id(event_id) + .await + .map(|pdu_id| pdu_id.pdu_count()) +} + +/// Returns the pdu's id. +#[implement(Service)] +pub async fn get_pdu_id(&self, event_id: &EventId) -> Result { + self.db + .eventid_pduid + .get(event_id) + .await + .map(|handle| RawPduId::from(&*handle)) }