Combine rooms timeline service data unit into mod; eliminate wrappers.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-02 03:14:26 +00:00
parent a91380ac17
commit 5d963abda6
4 changed files with 425 additions and 538 deletions

View File

@@ -5,7 +5,7 @@ use std::{
use futures::StreamExt;
use ruma::{
CanonicalJsonObject, CanonicalJsonValue, EventId, RoomVersionId, UserId,
CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedUserId, RoomId, RoomVersionId, UserId,
events::{
GlobalAccountDataEventType, StateEventType, TimelineEventType,
push_rules::PushRulesEvent,
@@ -26,6 +26,7 @@ use tuwunel_core::{
},
utils::{self, ReadyExt},
};
use tuwunel_database::{Json, Map};
use super::{ExtractBody, ExtractRelatesTo, ExtractRelatesToEventId, RoomMutexGuard};
use crate::{appservice::NamespaceRegex, rooms::state_compressor::CompressedState};
@@ -181,9 +182,7 @@ where
let pdu_id: RawPduId = PduId { shortroomid, shorteventid: count }.into();
// Insert pdu
self.db
.append_pdu(&pdu_id, pdu, &pdu_json, count)
.await;
self.append_pdu_json(&pdu_id, pdu, &pdu_json, count);
drop(insert_lock);
@@ -286,8 +285,7 @@ where
.await;
}
self.db
.increment_notification_counts(pdu.room_id(), notifies, highlights);
self.increment_notification_counts(pdu.room_id(), notifies, highlights);
match *pdu.kind() {
| TimelineEventType::RoomRedaction => {
@@ -487,3 +485,55 @@ where
Ok(pdu_id)
}
#[implement(super::Service)]
fn append_pdu_json(
&self,
pdu_id: &RawPduId,
pdu: &PduEvent,
json: &CanonicalJsonObject,
count: PduCount,
) {
debug_assert!(matches!(count, PduCount::Normal(_)), "PduCount not Normal");
self.db.pduid_pdu.raw_put(pdu_id, Json(json));
self.db
.eventid_pduid
.insert(pdu.event_id.as_bytes(), pdu_id);
self.db
.eventid_outlierpdu
.remove(pdu.event_id.as_bytes());
}
#[implement(super::Service)]
fn increment_notification_counts(
&self,
room_id: &RoomId,
notifies: Vec<OwnedUserId>,
highlights: Vec<OwnedUserId>,
) {
let _cork = self.db.db.cork();
for user in notifies {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.db.userroomid_notificationcount, &userroom_id);
}
for user in highlights {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.db.userroomid_highlightcount, &userroom_id);
}
}
//TODO: this is an ABA
fn increment(db: &Arc<Map>, key: &[u8]) {
let old = db.get_blocking(key);
let new = utils::increment(old.ok().as_deref());
db.insert(key, new);
}

View File

@@ -2,7 +2,7 @@ use std::iter::once;
use futures::{FutureExt, StreamExt};
use ruma::{
RoomId, ServerName,
CanonicalJsonObject, EventId, RoomId, ServerName,
api::federation,
events::{
StateEventType, TimelineEventType, room::power_levels::RoomPowerLevelsEventContent,
@@ -19,6 +19,7 @@ use tuwunel_core::{
utils::{IterStream, ReadyExt},
validated, warn,
};
use tuwunel_database::Json;
use super::ExtractBody;
@@ -191,8 +192,7 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) ->
.into();
// Insert pdu
self.db
.prepend_backfill_pdu(&pdu_id, &event_id, &value);
self.prepend_backfill_pdu(&pdu_id, &event_id, &value);
drop(insert_lock);
@@ -209,3 +209,15 @@ pub async fn backfill_pdu(&self, origin: &ServerName, pdu: Box<RawJsonValue>) ->
debug!("Prepended backfill pdu");
Ok(())
}
#[implement(super::Service)]
fn prepend_backfill_pdu(
&self,
pdu_id: &RawPduId,
event_id: &EventId,
json: &CanonicalJsonObject,
) {
self.db.pduid_pdu.raw_put(pdu_id, Json(json));
self.db.eventid_pduid.insert(event_id, pdu_id);
self.db.eventid_outlierpdu.remove(event_id);
}

View File

@@ -1,366 +0,0 @@
use std::{borrow::Borrow, sync::Arc};
use futures::{
Stream, TryFutureExt, TryStreamExt,
future::{
Either::{Left, Right},
select_ok,
},
pin_mut,
};
use ruma::{CanonicalJsonObject, EventId, OwnedUserId, RoomId, UserId, api::Direction};
use tuwunel_core::{
Err, PduCount, PduEvent, Result, at, err,
result::{LogErr, NotFound},
utils,
utils::stream::TryReadyExt,
};
use tuwunel_database::{Database, Deserialized, Json, KeyVal, Map};
use super::{PduId, RawPduId};
use crate::{Dep, rooms, rooms::short::ShortRoomId};
pub(super) struct Data {
eventid_outlierpdu: Arc<Map>,
eventid_pduid: Arc<Map>,
pduid_pdu: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
userroomid_notificationcount: Arc<Map>,
pub(super) db: Arc<Database>,
services: Services,
}
struct Services {
short: Dep<rooms::short::Service>,
}
pub type PdusIterItem = (PduCount, PduEvent);
impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self {
let db = &args.db;
Self {
eventid_outlierpdu: db["eventid_outlierpdu"].clone(),
eventid_pduid: db["eventid_pduid"].clone(),
pduid_pdu: db["pduid_pdu"].clone(),
userroomid_highlightcount: db["userroomid_highlightcount"].clone(),
userroomid_notificationcount: db["userroomid_notificationcount"].clone(),
db: args.db.clone(),
services: Services {
short: args.depend::<rooms::short::Service>("rooms::short"),
},
}
}
#[inline]
pub(super) async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pin_mut!(pdus_rev);
let last_count = pdus_rev
.try_next()
.await?
.map(at!(0))
.filter(|&count| matches!(count, PduCount::Normal(_)))
.unwrap_or_else(PduCount::max);
Ok(last_count)
}
#[inline]
pub(super) async fn latest_pdu_in_room(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(sender_user, room_id, PduCount::max());
pin_mut!(pdus_rev);
pdus_rev
.try_next()
.await?
.map(at!(1))
.ok_or_else(|| err!(Request(NotFound("no PDU's found in room"))))
}
/// Returns the `count` of this pdu's id.
#[inline]
pub(super) async fn get_pdu_count(&self, event_id: &EventId) -> Result<PduCount> {
self.get_pdu_id(event_id)
.await
.map(|pdu_id| pdu_id.pdu_count())
}
/// Returns the json of a pdu.
#[inline]
pub(super) async fn get_outlier_pdu_json(
&self,
event_id: &EventId,
) -> Result<CanonicalJsonObject> {
self.eventid_outlierpdu
.get(event_id)
.await
.deserialized()
}
/// Returns the json of a pdu.
#[inline]
pub(super) async fn get_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
let accepted = self.get_non_outlier_pdu_json(event_id);
let outlier = self.get_outlier_pdu_json(event_id);
pin_mut!(accepted, outlier);
select_ok([Left(accepted), Right(outlier)])
.await
.map(at!(0))
}
/// Returns the json of a pdu.
#[inline]
pub(super) async fn get_non_outlier_pdu_json(
&self,
event_id: &EventId,
) -> Result<CanonicalJsonObject> {
let pduid = self.get_pdu_id(event_id).await?;
self.pduid_pdu.get(&pduid).await.deserialized()
}
/// Returns the pdu's id.
#[inline]
pub(super) async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
self.eventid_pduid
.get(event_id)
.await
.map(|handle| RawPduId::from(&*handle))
}
/// Returns the pdu directly from `eventid_pduid` only.
#[inline]
pub(super) async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
let pduid = self.get_pdu_id(event_id).await?;
self.pduid_pdu.get(&pduid).await.deserialized()
}
/// Like get_non_outlier_pdu(), but without the expense of fetching and
/// parsing the PduEvent
#[inline]
pub(super) async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> Result {
let pduid = self.get_pdu_id(event_id).await?;
self.pduid_pdu.exists(&pduid).await
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[inline]
pub(super) async fn get_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
self.eventid_outlierpdu
.get(event_id)
.await
.deserialized()
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[inline]
pub(super) async fn get_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
let accepted = self.get_non_outlier_pdu(event_id);
let outlier = self.get_outlier_pdu(event_id);
pin_mut!(accepted, outlier);
select_ok([Left(accepted), Right(outlier)])
.await
.map(at!(0))
}
/// Like get_non_outlier_pdu(), but without the expense of fetching and
/// parsing the PduEvent
#[inline]
pub(super) async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result {
self.eventid_outlierpdu.exists(event_id).await
}
/// Like get_pdu(), but without the expense of fetching and parsing the data
#[inline]
pub(super) async fn pdu_exists(&self, event_id: &EventId) -> Result {
let non_outlier = self.non_outlier_pdu_exists(event_id);
let outlier = self.outlier_pdu_exists(event_id);
pin_mut!(non_outlier, outlier);
select_ok([Left(non_outlier), Right(outlier)])
.await
.map(at!(0))
}
/// Returns the pdu.
///
/// This does __NOT__ check the outliers `Tree`.
#[inline]
pub(super) async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result<PduEvent> {
self.pduid_pdu.get(pdu_id).await.deserialized()
}
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
#[inline]
pub(super) async fn get_pdu_json_from_id(
&self,
pdu_id: &RawPduId,
) -> Result<CanonicalJsonObject> {
self.pduid_pdu.get(pdu_id).await.deserialized()
}
pub(super) async fn append_pdu(
&self,
pdu_id: &RawPduId,
pdu: &PduEvent,
json: &CanonicalJsonObject,
count: PduCount,
) {
debug_assert!(matches!(count, PduCount::Normal(_)), "PduCount not Normal");
self.pduid_pdu.raw_put(pdu_id, Json(json));
self.eventid_pduid
.insert(pdu.event_id.as_bytes(), pdu_id);
self.eventid_outlierpdu
.remove(pdu.event_id.as_bytes());
}
pub(super) fn prepend_backfill_pdu(
&self,
pdu_id: &RawPduId,
event_id: &EventId,
json: &CanonicalJsonObject,
) {
self.pduid_pdu.raw_put(pdu_id, Json(json));
self.eventid_pduid.insert(event_id, pdu_id);
self.eventid_outlierpdu.remove(event_id);
}
/// Removes a pdu and creates a new one with the same id.
pub(super) async fn replace_pdu(
&self,
pdu_id: &RawPduId,
pdu_json: &CanonicalJsonObject,
) -> Result {
if self.pduid_pdu.get(pdu_id).await.is_not_found() {
return Err!(Request(NotFound("PDU does not exist.")));
}
self.pduid_pdu.raw_put(pdu_id, Json(pdu_json));
Ok(())
}
/// Returns an iterator over all events and their tokens in a room that
/// happened before the event with id `until` in reverse-chronological
/// order.
#[inline]
pub(super) fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.count_to_id(room_id, until, Direction::Backward)
.map_ok(move |current| {
let prefix = current.shortroomid();
self.pduid_pdu
.rev_raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
})
.try_flatten_stream()
}
#[inline]
pub(super) fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: PduCount,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.count_to_id(room_id, from, Direction::Forward)
.map_ok(move |current| {
let prefix = current.shortroomid();
self.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
})
.try_flatten_stream()
}
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
let pdu_id: RawPduId = pdu_id.into();
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
if Some(pdu.sender.borrow()) != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.add_age().log_err().ok();
Ok((pdu_id.pdu_count(), pdu))
}
pub(super) fn increment_notification_counts(
&self,
room_id: &RoomId,
notifies: Vec<OwnedUserId>,
highlights: Vec<OwnedUserId>,
) {
let _cork = self.db.cork();
for user in notifies {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.userroomid_notificationcount, &userroom_id);
}
for user in highlights {
let mut userroom_id = user.as_bytes().to_vec();
userroom_id.push(0xFF);
userroom_id.extend_from_slice(room_id.as_bytes());
increment(&self.userroomid_highlightcount, &userroom_id);
}
}
async fn count_to_id(
&self,
room_id: &RoomId,
shorteventid: PduCount,
dir: Direction,
) -> Result<RawPduId> {
let shortroomid: ShortRoomId = self
.services
.short
.get_shortroomid(room_id)
.await
.map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?;
// +1 so we don't send the base event
let pdu_id = PduId {
shortroomid,
shorteventid: shorteventid.saturating_inc(dir),
};
Ok(pdu_id.into())
}
}
//TODO: this is an ABA
fn increment(db: &Arc<Map>, key: &[u8]) {
let old = db.get_blocking(key);
let new = utils::increment(old.ok().as_deref());
db.insert(key, new);
}

View File

@@ -2,57 +2,42 @@ mod append;
mod backfill;
mod build;
mod create;
mod data;
mod redact;
use std::{fmt::Write, sync::Arc};
use std::{borrow::Borrow, fmt::Write, sync::Arc};
use async_trait::async_trait;
use futures::{Future, Stream, TryStreamExt, pin_mut};
use futures::{
Stream, TryFutureExt, TryStreamExt,
future::{
Either::{Left, Right},
select_ok,
},
pin_mut,
};
use ruma::{
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
CanonicalJsonObject, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId, api::Direction,
events::room::encrypted::Relation,
};
use serde::Deserialize;
pub use tuwunel_core::matrix::pdu::{PduId, RawPduId};
use tuwunel_core::{
Result, Server, at, err,
matrix::{
event::Event,
pdu::{PduCount, PduEvent},
Err, Result, Server, at, err, implement,
matrix::pdu::{PduCount, PduEvent},
utils::{
MutexMap, MutexMapGuard,
result::{LogErr, NotFound},
stream::{TryIgnore, TryReadyExt},
},
utils::{MutexMap, MutexMapGuard, future::TryExtExt, stream::TryIgnore},
warn,
};
use tuwunel_database::{Database, Deserialized, Json, KeyVal, Map};
use self::data::Data;
pub use self::data::PdusIterItem;
use crate::{
Dep, account_data, admin, appservice, globals, pusher, rooms, sending, server_keys, users,
Dep, account_data, admin, appservice, globals, pusher, rooms, rooms::short::ShortRoomId,
sending, server_keys, users,
};
// Update Relationships
#[derive(Deserialize)]
struct ExtractRelatesTo {
#[serde(rename = "m.relates_to")]
relates_to: Relation,
}
#[derive(Clone, Debug, Deserialize)]
struct ExtractEventId {
event_id: OwnedEventId,
}
#[derive(Clone, Debug, Deserialize)]
struct ExtractRelatesToEventId {
#[serde(rename = "m.relates_to")]
relates_to: ExtractEventId,
}
#[derive(Deserialize)]
struct ExtractBody {
body: Option<String>,
}
pub struct Service {
services: Services,
db: Data,
@@ -83,8 +68,40 @@ struct Services {
event_handler: Dep<rooms::event_handler::Service>,
}
struct Data {
eventid_outlierpdu: Arc<Map>,
eventid_pduid: Arc<Map>,
pduid_pdu: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
userroomid_notificationcount: Arc<Map>,
db: Arc<Database>,
}
// Update Relationships
#[derive(Deserialize)]
struct ExtractRelatesTo {
#[serde(rename = "m.relates_to")]
relates_to: Relation,
}
#[derive(Clone, Debug, Deserialize)]
struct ExtractEventId {
event_id: OwnedEventId,
}
#[derive(Clone, Debug, Deserialize)]
struct ExtractRelatesToEventId {
#[serde(rename = "m.relates_to")]
relates_to: ExtractEventId,
}
#[derive(Deserialize)]
struct ExtractBody {
body: Option<String>,
}
type RoomMutexMap = MutexMap<OwnedRoomId, ()>;
pub type RoomMutexGuard = MutexMapGuard<OwnedRoomId, ()>;
pub type PdusIterItem = (PduCount, PduEvent);
#[async_trait]
impl crate::Service for Service {
@@ -115,7 +132,14 @@ impl crate::Service for Service {
event_handler: args
.depend::<rooms::event_handler::Service>("rooms::event_handler"),
},
db: Data::new(&args),
db: Data {
eventid_outlierpdu: args.db["eventid_outlierpdu"].clone(),
eventid_pduid: args.db["eventid_pduid"].clone(),
pduid_pdu: args.db["pduid_pdu"].clone(),
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
db: args.db.clone(),
},
mutex_insert: RoomMutexMap::new(),
}))
}
@@ -130,135 +154,302 @@ impl crate::Service for Service {
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
self.first_item_in_room(room_id).await.map(at!(1))
/// Removes a pdu and creates a new one with the same id.
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub async fn replace_pdu(&self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObject) -> Result {
if self.db.pduid_pdu.get(pdu_id).await.is_not_found() {
return Err!(Request(NotFound("PDU does not exist.")));
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, impl Event)> {
let pdus = self.pdus(None, room_id, None);
self.db.pduid_pdu.raw_put(pdu_id, Json(pdu_json));
pin_mut!(pdus);
pdus.try_next()
.await?
.ok_or_else(|| err!(Request(NotFound("No PDU found in room"))))
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<impl Event> {
self.db.latest_pdu_in_room(None, room_id).await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
self.db
.last_timeline_count(sender_user, room_id)
.await
}
/// Returns the `count` of this pdu's id.
pub async fn get_pdu_count(&self, event_id: &EventId) -> Result<PduCount> {
self.db.get_pdu_count(event_id).await
}
/// Returns the json of a pdu.
pub async fn get_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
self.db.get_pdu_json(event_id).await
}
/// Returns the json of a pdu.
pub async fn get_non_outlier_pdu_json(
&self,
event_id: &EventId,
) -> Result<CanonicalJsonObject> {
self.db.get_non_outlier_pdu_json(event_id).await
}
/// Returns the pdu's id.
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
self.db.get_pdu_id(event_id).await
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<impl Event> {
self.db.get_non_outlier_pdu(event_id).await
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub async fn get_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
self.db.get_pdu(event_id).await
}
/// Returns the pdu.
///
/// This does __NOT__ check the outliers `Tree`.
pub async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result<PduEvent> {
self.db.get_pdu_from_id(pdu_id).await
}
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
pub async fn get_pdu_json_from_id(&self, pdu_id: &RawPduId) -> Result<CanonicalJsonObject> {
self.db.get_pdu_json_from_id(pdu_id).await
}
/// Checks if pdu exists
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub fn pdu_exists<'a>(
&'a self,
event_id: &'a EventId,
) -> impl Future<Output = bool> + Send + 'a {
self.db.pdu_exists(event_id).is_ok()
}
/// Removes a pdu and creates a new one with the same id.
#[tracing::instrument(skip(self), level = "debug")]
pub async fn replace_pdu(&self, pdu_id: &RawPduId, pdu_json: &CanonicalJsonObject) -> Result {
self.db.replace_pdu(pdu_id, pdu_json).await
}
/// Returns an iterator over all PDUs in a room. Unknown rooms produce no
/// items.
pub fn all_pdus<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
self.pdus(Some(user_id), room_id, None)
.ignore_err()
}
/// Reverse iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus_rev(user_id, room_id, until.unwrap_or_else(PduCount::max))
}
/// Forward iteration starting at from.
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
self.db
.pdus(user_id, room_id, from.unwrap_or_else(PduCount::min))
}
Ok(())
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
self.first_item_in_room(room_id).await.map(at!(1))
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
#[inline]
pub async fn latest_pdu_in_room(&self, room_id: &RoomId) -> Result<PduEvent> {
self.latest_item_in_room(None, room_id).await
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub async fn first_item_in_room(&self, room_id: &RoomId) -> Result<(PduCount, PduEvent)> {
let pdus = self.pdus(None, room_id, None);
pin_mut!(pdus);
pdus.try_next()
.await?
.ok_or_else(|| err!(Request(NotFound("No PDU found in room"))))
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub async fn latest_item_in_room(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduEvent> {
let pdus_rev = self.pdus_rev(sender_user, room_id, None);
pin_mut!(pdus_rev);
pdus_rev
.try_next()
.await?
.map(at!(1))
.ok_or_else(|| err!(Request(NotFound("no PDU's found in room"))))
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub async fn last_timeline_count(
&self,
sender_user: Option<&UserId>,
room_id: &RoomId,
) -> Result<PduCount> {
let pdus_rev = self.pdus_rev(sender_user, room_id, None);
pin_mut!(pdus_rev);
let last_count = pdus_rev
.try_next()
.await?
.map(at!(0))
.filter(|&count| matches!(count, PduCount::Normal(_)))
.unwrap_or_else(PduCount::max);
Ok(last_count)
}
/// Returns an iterator over all PDUs in a room. Unknown rooms produce no
/// items.
#[implement(Service)]
#[inline]
pub fn all_pdus<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
) -> impl Stream<Item = PdusIterItem> + Send + 'a {
self.pdus(Some(user_id), room_id, None)
.ignore_err()
}
/// Returns an iterator over all events and their tokens in a room that
/// happened after the event with id `from` in order.
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
from: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
let from = from.unwrap_or_else(PduCount::min);
self.count_to_id(room_id, from, Direction::Forward)
.map_ok(move |current| {
let prefix = current.shortroomid();
self.db
.pduid_pdu
.raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
})
.try_flatten_stream()
}
/// Returns an iterator over all events and their tokens in a room that
/// happened before the event with id `until` in reverse-order.
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn pdus_rev<'a>(
&'a self,
user_id: Option<&'a UserId>,
room_id: &'a RoomId,
until: Option<PduCount>,
) -> impl Stream<Item = Result<PdusIterItem>> + Send + 'a {
let until = until.unwrap_or_else(PduCount::max);
self.count_to_id(room_id, until, Direction::Backward)
.map_ok(move |current| {
let prefix = current.shortroomid();
self.db
.pduid_pdu
.rev_raw_stream_from(&current)
.ready_try_take_while(move |(key, _)| Ok(key.starts_with(&prefix)))
.ready_and_then(move |item| Self::each_pdu(item, user_id))
})
.try_flatten_stream()
}
#[implement(Service)]
fn each_pdu((pdu_id, pdu): KeyVal<'_>, user_id: Option<&UserId>) -> Result<PdusIterItem> {
let pdu_id: RawPduId = pdu_id.into();
let mut pdu = serde_json::from_slice::<PduEvent>(pdu)?;
if Some(pdu.sender.borrow()) != user_id {
pdu.remove_transaction_id().log_err().ok();
}
pdu.add_age().log_err().ok();
Ok((pdu_id.pdu_count(), pdu))
}
#[implement(Service)]
async fn count_to_id(
&self,
room_id: &RoomId,
shorteventid: PduCount,
dir: Direction,
) -> Result<RawPduId> {
let shortroomid: ShortRoomId = self
.services
.short
.get_shortroomid(room_id)
.await
.map_err(|e| err!(Request(NotFound("Room {room_id:?} not found: {e:?}"))))?;
// +1 so we don't send the base event
let pdu_id = PduId {
shortroomid,
shorteventid: shorteventid.saturating_inc(dir),
};
Ok(pdu_id.into())
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[implement(Service)]
pub async fn get_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
let accepted = self.get_non_outlier_pdu(event_id);
let outlier = self.get_outlier_pdu(event_id);
pin_mut!(accepted, outlier);
select_ok([Left(accepted), Right(outlier)])
.await
.map(at!(0))
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[implement(Service)]
pub async fn get_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
self.db
.eventid_outlierpdu
.get(event_id)
.await
.deserialized()
}
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[implement(Service)]
pub async fn get_non_outlier_pdu(&self, event_id: &EventId) -> Result<PduEvent> {
let pdu_id = self.get_pdu_id(event_id).await?;
self.get_pdu_from_id(&pdu_id).await
}
/// Returns the pdu.
///
/// This does __NOT__ check the outliers `Tree`.
#[implement(Service)]
pub async fn get_pdu_from_id(&self, pdu_id: &RawPduId) -> Result<PduEvent> {
self.db.pduid_pdu.get(pdu_id).await.deserialized()
}
/// Returns the json of a pdu.
#[implement(Service)]
pub async fn get_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
let accepted = self.get_non_outlier_pdu_json(event_id);
let outlier = self.get_outlier_pdu_json(event_id);
pin_mut!(accepted, outlier);
select_ok([Left(accepted), Right(outlier)])
.await
.map(at!(0))
}
/// Returns the json of a pdu.
#[implement(Service)]
pub async fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
self.db
.eventid_outlierpdu
.get(event_id)
.await
.deserialized()
}
/// Returns the json of a pdu.
#[implement(Service)]
pub async fn get_non_outlier_pdu_json(&self, event_id: &EventId) -> Result<CanonicalJsonObject> {
let pdu_id = self.get_pdu_id(event_id).await?;
self.get_pdu_json_from_id(&pdu_id).await
}
/// Returns the pdu as a `BTreeMap<String, CanonicalJsonValue>`.
#[implement(Service)]
pub async fn get_pdu_json_from_id(&self, pdu_id: &RawPduId) -> Result<CanonicalJsonObject> {
self.db.pduid_pdu.get(pdu_id).await.deserialized()
}
/// Checks if pdu exists
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
#[implement(Service)]
pub async fn pdu_exists<'a>(&'a self, event_id: &'a EventId) -> bool {
let non_outlier = self.non_outlier_pdu_exists(event_id);
let outlier = self.outlier_pdu_exists(event_id);
pin_mut!(non_outlier, outlier);
select_ok([Left(non_outlier), Right(outlier)])
.await
.map(at!(0))
.is_ok()
}
/// Like get_non_outlier_pdu(), but without the expense of fetching and
/// parsing the PduEvent
#[implement(Service)]
pub async fn non_outlier_pdu_exists(&self, event_id: &EventId) -> Result {
let pduid = self.get_pdu_id(event_id).await?;
self.db.pduid_pdu.exists(&pduid).await
}
/// Like get_non_outlier_pdu(), but without the expense of fetching and
/// parsing the PduEvent
#[implement(Service)]
#[inline]
pub async fn outlier_pdu_exists(&self, event_id: &EventId) -> Result {
self.db.eventid_outlierpdu.exists(event_id).await
}
/// Returns the `count` of this pdu's id.
#[implement(Service)]
pub async fn get_pdu_count(&self, event_id: &EventId) -> Result<PduCount> {
self.get_pdu_id(event_id)
.await
.map(|pdu_id| pdu_id.pdu_count())
}
/// Returns the pdu's id.
#[implement(Service)]
pub async fn get_pdu_id(&self, event_id: &EventId) -> Result<RawPduId> {
self.db
.eventid_pduid
.get(event_id)
.await
.map(|handle| RawPduId::from(&*handle))
}