From 90228e48659363b690a26bd10005024af74dc44c Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 18 Nov 2025 07:10:37 +0000 Subject: [PATCH] Combine pdu_metadata Data into Service unit. Move recursive component of get_relations from service to api crate. Signed-off-by: Jason Volk --- src/api/client/relations.rs | 128 ++++++++++--- src/service/rooms/pdu_metadata/data.rs | 147 -------------- src/service/rooms/pdu_metadata/mod.rs | 256 ++++++++++++++----------- 3 files changed, 248 insertions(+), 283 deletions(-) delete mode 100644 src/service/rooms/pdu_metadata/data.rs diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index dc851b15..5319bcbc 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -1,5 +1,5 @@ use axum::extract::State; -use futures::StreamExt; +use futures::{Stream, StreamExt, TryFutureExt, future::try_join}; use ruma::{ EventId, RoomId, UInt, UserId, api::{ @@ -15,9 +15,12 @@ use tuwunel_core::{ Result, at, matrix::{ event::{Event, RelationTypeEqual}, - pdu::PduCount, + pdu::{Pdu, PduCount}, + }, + utils::{ + result::FlatOk, + stream::{IterStream, ReadyExt, WidebandExt}, }, - utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt}, }; use tuwunel_service::Services; @@ -99,6 +102,13 @@ pub(crate) async fn get_relating_events_route( } #[allow(clippy::too_many_arguments)] +#[tracing::instrument( + name = "relations", + level = "debug", + skip_all, + fields(room_id, target, from, to, dir, limit, recurse), + ret(level = "trace") +)] async fn paginate_relations_with_filter( services: &Services, sender_user: &UserId, @@ -132,27 +142,24 @@ async fn paginate_relations_with_filter( // Spec (v1.10) recommends depth of at least 3 let depth: u8 = if recurse { 3 } else { 1 }; - let events: Vec<_> = services - .pdu_metadata - .get_relations(sender_user, room_id, target, start, limit, depth, dir) - .await - .into_iter() - .filter(|(_, pdu)| { - filter_event_type - .as_ref() - .is_none_or(|kind| kind == pdu.kind()) - }) - .filter(|(_, pdu)| { - filter_rel_type - .as_ref() - .is_none_or(|rel_type| rel_type.relation_type_equal(pdu)) - }) - .stream() - .ready_take_while(|(count, _)| Some(*count) != to) - .wide_filter_map(|item| visibility_filter(services, sender_user, item)) - .take(limit) - .collect() - .await; + let events: Vec<_> = + get_relations(services, sender_user, room_id, target, start, limit, depth, dir) + .await + .ready_filter(|(_, pdu)| { + filter_event_type + .as_ref() + .is_none_or(|kind| kind == pdu.kind()) + }) + .ready_filter(|(_, pdu)| { + filter_rel_type + .as_ref() + .is_none_or(|rel_type| rel_type.relation_type_equal(pdu)) + }) + .ready_take_while(|(count, _)| Some(*count) != to) + .wide_filter_map(|item| visibility_filter(services, sender_user, item)) + .take(limit) + .collect() + .await; let next_batch = events .last() @@ -172,6 +179,79 @@ async fn paginate_relations_with_filter( }) } +#[allow(clippy::too_many_arguments)] +async fn get_relations( + services: &Services, + sender_user: &UserId, + room_id: &RoomId, + target: &EventId, + from: PduCount, + limit: usize, + max_depth: u8, + dir: Direction, +) -> impl Stream + Send { + let room_id = services.short.get_shortroomid(room_id); + + let target = services + .timeline + .get_pdu_count(target) + .map_ok(|target| { + match target { + | PduCount::Normal(count) => count, + | _ => { + // TODO: Support backfilled relations + 0 // This will result in an empty iterator + }, + } + }); + + let Ok((room_id, target)) = try_join(room_id, target).await else { + return Vec::new().into_iter().stream(); + }; + + let mut pdus: Vec<_> = services + .pdu_metadata + .get_relations(room_id, target.into(), from, dir, Some(sender_user)) + .collect() + .await; + + let mut stack: Vec<_> = pdus + .iter() + .filter(|_| max_depth > 0) + .map(|pdu| (pdu.clone(), 1)) + .collect(); + + 'limit: while let Some(stack_pdu) = stack.pop() { + let target = match stack_pdu.0.0 { + | PduCount::Normal(c) => c, + | PduCount::Backfilled(_) => { + // TODO: Support backfilled relations + 0 // This will result in an empty iterator + }, + }; + + let relations: Vec<_> = services + .pdu_metadata + .get_relations(room_id, target.into(), from, dir, Some(sender_user)) + .collect() + .await; + + for relation in relations { + if stack_pdu.1 < max_depth { + stack.push((relation.clone(), stack_pdu.1.saturating_add(1))); + } + + if pdus.len() < limit { + pdus.push(relation); + } else { + break 'limit; + } + } + } + + pdus.into_iter().stream() +} + async fn visibility_filter( services: &Services, sender_user: &UserId, diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs deleted file mode 100644 index c0c50cf3..00000000 --- a/src/service/rooms/pdu_metadata/data.rs +++ /dev/null @@ -1,147 +0,0 @@ -use std::{mem::size_of, sync::Arc}; - -use futures::{Stream, StreamExt}; -use ruma::{EventId, RoomId, UserId, api::Direction}; -use tuwunel_core::{ - Result, - arrayvec::ArrayVec, - matrix::{Event, PduCount}, - result::LogErr, - trace, - utils::{ - ReadyExt, - stream::{TryIgnore, WidebandExt}, - u64_from_u8, - }, -}; -use tuwunel_database::{Interfix, Map}; - -use crate::rooms::{ - short::ShortRoomId, - timeline::{PduId, RawPduId}, -}; - -pub(super) struct Data { - tofrom_relation: Arc, - referencedevents: Arc, - softfailedeventids: Arc, - services: Arc, -} - -impl Data { - pub(super) fn new(args: &crate::Args<'_>) -> Self { - let db = &args.db; - Self { - tofrom_relation: db["tofrom_relation"].clone(), - referencedevents: db["referencedevents"].clone(), - softfailedeventids: db["softfailedeventids"].clone(), - services: args.services.clone(), - } - } - - #[inline] - pub(super) fn add_relation(&self, from: u64, to: u64) { - const BUFSIZE: usize = size_of::() * 2; - - let key: &[u64] = &[to, from]; - self.tofrom_relation - .aput_raw::(key, []); - } - - pub(super) fn get_relations<'a>( - &'a self, - user_id: &'a UserId, - shortroomid: ShortRoomId, - target: PduCount, - from: PduCount, - dir: Direction, - ) -> impl Stream + Send + '_ { - let mut current = ArrayVec::::new(); - current.extend(target.into_unsigned().to_be_bytes()); - current.extend( - from.saturating_inc(dir) - .into_unsigned() - .to_be_bytes(), - ); - let current = current.as_slice(); - match dir { - | Direction::Forward => self - .tofrom_relation - .raw_keys_from(current) - .boxed(), - | Direction::Backward => self - .tofrom_relation - .rev_raw_keys_from(current) - .boxed(), - } - .ignore_err() - .ready_take_while(move |key| key.starts_with(&target.into_unsigned().to_be_bytes())) - .map(|to_from| u64_from_u8(&to_from[8..16])) - .map(PduCount::from_unsigned) - .map(move |count| (user_id, shortroomid, count)) - .wide_filter_map(async |(user_id, shortroomid, count)| { - let pdu_id: RawPduId = PduId { shortroomid, count }.into(); - let mut pdu = self - .services - .timeline - .get_pdu_from_id(&pdu_id) - .await - .ok()?; - - if pdu.sender() != user_id { - pdu.as_mut_pdu() - .remove_transaction_id() - .log_err() - .ok(); - } - - Some((count, pdu)) - }) - } - - #[inline] - pub(super) fn mark_as_referenced<'a, I>(&self, room_id: &RoomId, event_ids: I) - where - I: Iterator, - { - for prev in event_ids { - let key = (room_id, prev); - self.referencedevents.put_raw(key, []); - } - } - - #[inline] - pub(super) async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool { - let key = (room_id, event_id); - self.referencedevents.qry(&key).await.is_ok() - } - - #[inline] - pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) { - self.softfailedeventids.insert(event_id, []); - } - - #[inline] - pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool { - self.softfailedeventids - .get(event_id) - .await - .is_ok() - } - - #[inline] - pub(super) async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result { - let prefix = (room_id, Interfix); - - self.referencedevents - .keys_prefix_raw(&prefix) - .ignore_err() - .ready_for_each(|key| { - trace!("Removing key: {key:?}"); - self.referencedevents.remove(key); - }) - .await; - - Ok(()) - } -} diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index 6e408ec4..c662e940 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -1,137 +1,169 @@ -mod data; use std::sync::Arc; -use futures::{StreamExt, future::try_join}; +use futures::{Stream, StreamExt, TryFutureExt}; use ruma::{EventId, RoomId, UserId, api::Direction}; use tuwunel_core::{ - Result, - matrix::{Event, PduCount}, + PduId, Result, + arrayvec::ArrayVec, + implement, + matrix::{Event, Pdu, PduCount, RawPduId}, + result::LogErr, + trace, + utils::{ + stream::{ReadyExt, TryIgnore, WidebandExt}, + u64_from_u8, + }, }; +use tuwunel_database::{Interfix, Map}; -use self::data::Data; +use crate::rooms::short::ShortRoomId; pub struct Service { services: Arc, db: Data, } +struct Data { + tofrom_relation: Arc, + referencedevents: Arc, + softfailedeventids: Arc, +} + impl crate::Service for Service { fn build(args: &crate::Args<'_>) -> Result> { Ok(Arc::new(Self { services: args.services.clone(), - db: Data::new(args), + db: Data { + tofrom_relation: args.db["tofrom_relation"].clone(), + referencedevents: args.db["referencedevents"].clone(), + softfailedeventids: args.db["softfailedeventids"].clone(), + }, })) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } -impl Service { - #[tracing::instrument(skip(self, from, to), level = "debug")] - pub fn add_relation(&self, from: PduCount, to: PduCount) { - match (from, to) { - | (PduCount::Normal(f), PduCount::Normal(t)) => self.db.add_relation(f, t), - | _ => { - // TODO: Relations with backfilled pdus - }, - } - } +#[implement(Service)] +#[tracing::instrument(skip(self, from, to), level = "debug")] +pub fn add_relation(&self, from: PduCount, to: PduCount) { + const BUFSIZE: usize = size_of::() * 2; - #[allow(clippy::too_many_arguments)] - pub async fn get_relations<'a>( - &'a self, - user_id: &'a UserId, - room_id: &'a RoomId, - target: &'a EventId, - from: PduCount, - limit: usize, - max_depth: u8, - dir: Direction, - ) -> Vec<(PduCount, impl Event)> { - let room_id = self.services.short.get_shortroomid(room_id); - - let target = self.services.timeline.get_pdu_count(target); - - let Ok((room_id, target)) = try_join(room_id, target).await else { - return Vec::new(); - }; - - let target = match target { - | PduCount::Normal(c) => c, - // TODO: Support backfilled relations - | _ => 0, // This will result in an empty iterator - }; - - let mut pdus: Vec<_> = self - .db - .get_relations(user_id, room_id, target.into(), from, dir) - .collect() - .await; - - let mut stack: Vec<_> = pdus - .iter() - .filter(|_| max_depth > 0) - .map(|pdu| (pdu.clone(), 1)) - .collect(); - - 'limit: while let Some(stack_pdu) = stack.pop() { - let target = match stack_pdu.0.0 { - | PduCount::Normal(c) => c, - // TODO: Support backfilled relations - | PduCount::Backfilled(_) => 0, // This will result in an empty iterator - }; - - let relations: Vec<_> = self - .db - .get_relations(user_id, room_id, target.into(), from, dir) - .collect() - .await; - - for relation in relations { - if stack_pdu.1 < max_depth { - stack.push((relation.clone(), stack_pdu.1.saturating_add(1))); - } - - if pdus.len() < limit { - pdus.push(relation); - } else { - break 'limit; - } - } - } - - pdus - } - - #[tracing::instrument(skip_all, level = "debug")] - pub fn mark_as_referenced<'a, I>(&self, room_id: &RoomId, event_ids: I) - where - I: Iterator, - { - self.db.mark_as_referenced(room_id, event_ids); - } - - #[tracing::instrument(skip(self), level = "debug")] - pub async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool { - self.db - .is_event_referenced(room_id, event_id) - .await - } - - #[tracing::instrument(skip(self), level = "debug")] - pub fn mark_event_soft_failed(&self, event_id: &EventId) { - self.db.mark_event_soft_failed(event_id); - } - - #[tracing::instrument(skip(self), level = "debug")] - pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool { - self.db.is_event_soft_failed(event_id).await - } - - #[tracing::instrument(skip(self), level = "debug")] - pub async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result { - self.db - .delete_all_referenced_for_room(room_id) - .await + match (from, to) { + | (PduCount::Normal(from), PduCount::Normal(to)) => { + let key: &[u64] = &[to, from]; + self.db + .tofrom_relation + .aput_raw::(key, []); + }, + | _ => {}, // TODO: Relations with backfilled pdus } } + +#[implement(Service)] +pub fn get_relations<'a>( + &'a self, + shortroomid: ShortRoomId, + target: PduCount, + from: PduCount, + dir: Direction, + user_id: Option<&'a UserId>, +) -> impl Stream + Send + '_ { + let target = target.to_be_bytes(); + let from = from.saturating_inc(dir).to_be_bytes(); + let mut buf = ArrayVec::::new(); + let start = { + buf.extend(target); + buf.extend(from); + buf.as_slice() + }; + + match dir { + | Direction::Forward => self + .db + .tofrom_relation + .raw_keys_from(start) + .boxed(), + | Direction::Backward => self + .db + .tofrom_relation + .rev_raw_keys_from(start) + .boxed(), + } + .ignore_err() + .ready_take_while(move |key| key.starts_with(&target)) + .map(|to_from| u64_from_u8(&to_from[8..16])) + .map(PduCount::from_unsigned) + .map(move |count| (user_id, shortroomid, count)) + .wide_filter_map(async |(user_id, shortroomid, count)| { + let pdu_id: RawPduId = PduId { shortroomid, count }.into(); + self.services + .timeline + .get_pdu_from_id(&pdu_id) + .map_ok(move |mut pdu| { + if user_id.is_none_or(|user_id| pdu.sender() != user_id) { + pdu.as_mut_pdu() + .remove_transaction_id() + .log_err() + .ok(); + } + + (count, pdu) + }) + .await + .ok() + }) +} + +#[implement(Service)] +#[tracing::instrument(skip_all, level = "debug")] +pub fn mark_as_referenced<'a, I>(&self, room_id: &RoomId, event_ids: I) +where + I: Iterator, +{ + for prev in event_ids { + let key = (room_id, prev); + self.db.referencedevents.put_raw(key, []); + } +} + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool { + let key = (room_id, event_id); + self.db.referencedevents.qry(&key).await.is_ok() +} + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub fn mark_event_soft_failed(&self, event_id: &EventId) { + self.db.softfailedeventids.insert(event_id, []); +} + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool { + self.db + .softfailedeventids + .get(event_id) + .await + .is_ok() +} + +#[implement(Service)] +#[tracing::instrument(skip(self), level = "debug")] +pub async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result { + let prefix = (room_id, Interfix); + + self.db + .referencedevents + .keys_prefix_raw(&prefix) + .ignore_err() + .ready_for_each(|key| { + trace!(?key, "Removing key"); + self.db.referencedevents.remove(key); + }) + .await; + + Ok(()) +}