Combine pdu_metadata Data into Service unit.

Move recursive component of get_relations from service to api crate.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-11-18 07:10:37 +00:00
parent 53b5eb4ba6
commit 90228e4865
3 changed files with 248 additions and 283 deletions

View File

@@ -1,5 +1,5 @@
use axum::extract::State;
use futures::StreamExt;
use futures::{Stream, StreamExt, TryFutureExt, future::try_join};
use ruma::{
EventId, RoomId, UInt, UserId,
api::{
@@ -15,9 +15,12 @@ use tuwunel_core::{
Result, at,
matrix::{
event::{Event, RelationTypeEqual},
pdu::PduCount,
pdu::{Pdu, PduCount},
},
utils::{
result::FlatOk,
stream::{IterStream, ReadyExt, WidebandExt},
},
utils::{IterStream, ReadyExt, result::FlatOk, stream::WidebandExt},
};
use tuwunel_service::Services;
@@ -99,6 +102,13 @@ pub(crate) async fn get_relating_events_route(
}
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(
name = "relations",
level = "debug",
skip_all,
fields(room_id, target, from, to, dir, limit, recurse),
ret(level = "trace")
)]
async fn paginate_relations_with_filter(
services: &Services,
sender_user: &UserId,
@@ -132,27 +142,24 @@ async fn paginate_relations_with_filter(
// Spec (v1.10) recommends depth of at least 3
let depth: u8 = if recurse { 3 } else { 1 };
let events: Vec<_> = services
.pdu_metadata
.get_relations(sender_user, room_id, target, start, limit, depth, dir)
.await
.into_iter()
.filter(|(_, pdu)| {
filter_event_type
.as_ref()
.is_none_or(|kind| kind == pdu.kind())
})
.filter(|(_, pdu)| {
filter_rel_type
.as_ref()
.is_none_or(|rel_type| rel_type.relation_type_equal(pdu))
})
.stream()
.ready_take_while(|(count, _)| Some(*count) != to)
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
.take(limit)
.collect()
.await;
let events: Vec<_> =
get_relations(services, sender_user, room_id, target, start, limit, depth, dir)
.await
.ready_filter(|(_, pdu)| {
filter_event_type
.as_ref()
.is_none_or(|kind| kind == pdu.kind())
})
.ready_filter(|(_, pdu)| {
filter_rel_type
.as_ref()
.is_none_or(|rel_type| rel_type.relation_type_equal(pdu))
})
.ready_take_while(|(count, _)| Some(*count) != to)
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
.take(limit)
.collect()
.await;
let next_batch = events
.last()
@@ -172,6 +179,79 @@ async fn paginate_relations_with_filter(
})
}
#[allow(clippy::too_many_arguments)]
async fn get_relations(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
target: &EventId,
from: PduCount,
limit: usize,
max_depth: u8,
dir: Direction,
) -> impl Stream<Item = (PduCount, Pdu)> + Send {
let room_id = services.short.get_shortroomid(room_id);
let target = services
.timeline
.get_pdu_count(target)
.map_ok(|target| {
match target {
| PduCount::Normal(count) => count,
| _ => {
// TODO: Support backfilled relations
0 // This will result in an empty iterator
},
}
});
let Ok((room_id, target)) = try_join(room_id, target).await else {
return Vec::new().into_iter().stream();
};
let mut pdus: Vec<_> = services
.pdu_metadata
.get_relations(room_id, target.into(), from, dir, Some(sender_user))
.collect()
.await;
let mut stack: Vec<_> = pdus
.iter()
.filter(|_| max_depth > 0)
.map(|pdu| (pdu.clone(), 1))
.collect();
'limit: while let Some(stack_pdu) = stack.pop() {
let target = match stack_pdu.0.0 {
| PduCount::Normal(c) => c,
| PduCount::Backfilled(_) => {
// TODO: Support backfilled relations
0 // This will result in an empty iterator
},
};
let relations: Vec<_> = services
.pdu_metadata
.get_relations(room_id, target.into(), from, dir, Some(sender_user))
.collect()
.await;
for relation in relations {
if stack_pdu.1 < max_depth {
stack.push((relation.clone(), stack_pdu.1.saturating_add(1)));
}
if pdus.len() < limit {
pdus.push(relation);
} else {
break 'limit;
}
}
}
pdus.into_iter().stream()
}
async fn visibility_filter<Pdu: Event>(
services: &Services,
sender_user: &UserId,

View File

@@ -1,147 +0,0 @@
use std::{mem::size_of, sync::Arc};
use futures::{Stream, StreamExt};
use ruma::{EventId, RoomId, UserId, api::Direction};
use tuwunel_core::{
Result,
arrayvec::ArrayVec,
matrix::{Event, PduCount},
result::LogErr,
trace,
utils::{
ReadyExt,
stream::{TryIgnore, WidebandExt},
u64_from_u8,
},
};
use tuwunel_database::{Interfix, Map};
use crate::rooms::{
short::ShortRoomId,
timeline::{PduId, RawPduId},
};
pub(super) struct Data {
tofrom_relation: Arc<Map>,
referencedevents: Arc<Map>,
softfailedeventids: Arc<Map>,
services: Arc<crate::services::OnceServices>,
}
impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self {
let db = &args.db;
Self {
tofrom_relation: db["tofrom_relation"].clone(),
referencedevents: db["referencedevents"].clone(),
softfailedeventids: db["softfailedeventids"].clone(),
services: args.services.clone(),
}
}
#[inline]
pub(super) fn add_relation(&self, from: u64, to: u64) {
const BUFSIZE: usize = size_of::<u64>() * 2;
let key: &[u64] = &[to, from];
self.tofrom_relation
.aput_raw::<BUFSIZE, _, _>(key, []);
}
pub(super) fn get_relations<'a>(
&'a self,
user_id: &'a UserId,
shortroomid: ShortRoomId,
target: PduCount,
from: PduCount,
dir: Direction,
) -> impl Stream<Item = (PduCount, impl Event)> + Send + '_ {
let mut current = ArrayVec::<u8, 16>::new();
current.extend(target.into_unsigned().to_be_bytes());
current.extend(
from.saturating_inc(dir)
.into_unsigned()
.to_be_bytes(),
);
let current = current.as_slice();
match dir {
| Direction::Forward => self
.tofrom_relation
.raw_keys_from(current)
.boxed(),
| Direction::Backward => self
.tofrom_relation
.rev_raw_keys_from(current)
.boxed(),
}
.ignore_err()
.ready_take_while(move |key| key.starts_with(&target.into_unsigned().to_be_bytes()))
.map(|to_from| u64_from_u8(&to_from[8..16]))
.map(PduCount::from_unsigned)
.map(move |count| (user_id, shortroomid, count))
.wide_filter_map(async |(user_id, shortroomid, count)| {
let pdu_id: RawPduId = PduId { shortroomid, count }.into();
let mut pdu = self
.services
.timeline
.get_pdu_from_id(&pdu_id)
.await
.ok()?;
if pdu.sender() != user_id {
pdu.as_mut_pdu()
.remove_transaction_id()
.log_err()
.ok();
}
Some((count, pdu))
})
}
#[inline]
pub(super) fn mark_as_referenced<'a, I>(&self, room_id: &RoomId, event_ids: I)
where
I: Iterator<Item = &'a EventId>,
{
for prev in event_ids {
let key = (room_id, prev);
self.referencedevents.put_raw(key, []);
}
}
#[inline]
pub(super) async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool {
let key = (room_id, event_id);
self.referencedevents.qry(&key).await.is_ok()
}
#[inline]
pub(super) fn mark_event_soft_failed(&self, event_id: &EventId) {
self.softfailedeventids.insert(event_id, []);
}
#[inline]
pub(super) async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.softfailedeventids
.get(event_id)
.await
.is_ok()
}
#[inline]
pub(super) async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result {
let prefix = (room_id, Interfix);
self.referencedevents
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.referencedevents.remove(key);
})
.await;
Ok(())
}
}

View File

@@ -1,137 +1,169 @@
mod data;
use std::sync::Arc;
use futures::{StreamExt, future::try_join};
use futures::{Stream, StreamExt, TryFutureExt};
use ruma::{EventId, RoomId, UserId, api::Direction};
use tuwunel_core::{
Result,
matrix::{Event, PduCount},
PduId, Result,
arrayvec::ArrayVec,
implement,
matrix::{Event, Pdu, PduCount, RawPduId},
result::LogErr,
trace,
utils::{
stream::{ReadyExt, TryIgnore, WidebandExt},
u64_from_u8,
},
};
use tuwunel_database::{Interfix, Map};
use self::data::Data;
use crate::rooms::short::ShortRoomId;
pub struct Service {
services: Arc<crate::services::OnceServices>,
db: Data,
}
struct Data {
tofrom_relation: Arc<Map>,
referencedevents: Arc<Map>,
softfailedeventids: Arc<Map>,
}
impl crate::Service for Service {
fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
services: args.services.clone(),
db: Data::new(args),
db: Data {
tofrom_relation: args.db["tofrom_relation"].clone(),
referencedevents: args.db["referencedevents"].clone(),
softfailedeventids: args.db["softfailedeventids"].clone(),
},
}))
}
fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
}
impl Service {
#[tracing::instrument(skip(self, from, to), level = "debug")]
pub fn add_relation(&self, from: PduCount, to: PduCount) {
match (from, to) {
| (PduCount::Normal(f), PduCount::Normal(t)) => self.db.add_relation(f, t),
| _ => {
// TODO: Relations with backfilled pdus
},
}
}
#[implement(Service)]
#[tracing::instrument(skip(self, from, to), level = "debug")]
pub fn add_relation(&self, from: PduCount, to: PduCount) {
const BUFSIZE: usize = size_of::<u64>() * 2;
#[allow(clippy::too_many_arguments)]
pub async fn get_relations<'a>(
&'a self,
user_id: &'a UserId,
room_id: &'a RoomId,
target: &'a EventId,
from: PduCount,
limit: usize,
max_depth: u8,
dir: Direction,
) -> Vec<(PduCount, impl Event)> {
let room_id = self.services.short.get_shortroomid(room_id);
let target = self.services.timeline.get_pdu_count(target);
let Ok((room_id, target)) = try_join(room_id, target).await else {
return Vec::new();
};
let target = match target {
| PduCount::Normal(c) => c,
// TODO: Support backfilled relations
| _ => 0, // This will result in an empty iterator
};
let mut pdus: Vec<_> = self
.db
.get_relations(user_id, room_id, target.into(), from, dir)
.collect()
.await;
let mut stack: Vec<_> = pdus
.iter()
.filter(|_| max_depth > 0)
.map(|pdu| (pdu.clone(), 1))
.collect();
'limit: while let Some(stack_pdu) = stack.pop() {
let target = match stack_pdu.0.0 {
| PduCount::Normal(c) => c,
// TODO: Support backfilled relations
| PduCount::Backfilled(_) => 0, // This will result in an empty iterator
};
let relations: Vec<_> = self
.db
.get_relations(user_id, room_id, target.into(), from, dir)
.collect()
.await;
for relation in relations {
if stack_pdu.1 < max_depth {
stack.push((relation.clone(), stack_pdu.1.saturating_add(1)));
}
if pdus.len() < limit {
pdus.push(relation);
} else {
break 'limit;
}
}
}
pdus
}
#[tracing::instrument(skip_all, level = "debug")]
pub fn mark_as_referenced<'a, I>(&self, room_id: &RoomId, event_ids: I)
where
I: Iterator<Item = &'a EventId>,
{
self.db.mark_as_referenced(room_id, event_ids);
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool {
self.db
.is_event_referenced(room_id, event_id)
.await
}
#[tracing::instrument(skip(self), level = "debug")]
pub fn mark_event_soft_failed(&self, event_id: &EventId) {
self.db.mark_event_soft_failed(event_id);
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.db.is_event_soft_failed(event_id).await
}
#[tracing::instrument(skip(self), level = "debug")]
pub async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result {
self.db
.delete_all_referenced_for_room(room_id)
.await
match (from, to) {
| (PduCount::Normal(from), PduCount::Normal(to)) => {
let key: &[u64] = &[to, from];
self.db
.tofrom_relation
.aput_raw::<BUFSIZE, _, _>(key, []);
},
| _ => {}, // TODO: Relations with backfilled pdus
}
}
#[implement(Service)]
pub fn get_relations<'a>(
&'a self,
shortroomid: ShortRoomId,
target: PduCount,
from: PduCount,
dir: Direction,
user_id: Option<&'a UserId>,
) -> impl Stream<Item = (PduCount, Pdu)> + Send + '_ {
let target = target.to_be_bytes();
let from = from.saturating_inc(dir).to_be_bytes();
let mut buf = ArrayVec::<u8, 16>::new();
let start = {
buf.extend(target);
buf.extend(from);
buf.as_slice()
};
match dir {
| Direction::Forward => self
.db
.tofrom_relation
.raw_keys_from(start)
.boxed(),
| Direction::Backward => self
.db
.tofrom_relation
.rev_raw_keys_from(start)
.boxed(),
}
.ignore_err()
.ready_take_while(move |key| key.starts_with(&target))
.map(|to_from| u64_from_u8(&to_from[8..16]))
.map(PduCount::from_unsigned)
.map(move |count| (user_id, shortroomid, count))
.wide_filter_map(async |(user_id, shortroomid, count)| {
let pdu_id: RawPduId = PduId { shortroomid, count }.into();
self.services
.timeline
.get_pdu_from_id(&pdu_id)
.map_ok(move |mut pdu| {
if user_id.is_none_or(|user_id| pdu.sender() != user_id) {
pdu.as_mut_pdu()
.remove_transaction_id()
.log_err()
.ok();
}
(count, pdu)
})
.await
.ok()
})
}
#[implement(Service)]
#[tracing::instrument(skip_all, level = "debug")]
pub fn mark_as_referenced<'a, I>(&self, room_id: &RoomId, event_ids: I)
where
I: Iterator<Item = &'a EventId>,
{
for prev in event_ids {
let key = (room_id, prev);
self.db.referencedevents.put_raw(key, []);
}
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool {
let key = (room_id, event_id);
self.db.referencedevents.qry(&key).await.is_ok()
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub fn mark_event_soft_failed(&self, event_id: &EventId) {
self.db.softfailedeventids.insert(event_id, []);
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.db
.softfailedeventids
.get(event_id)
.await
.is_ok()
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
pub async fn delete_all_referenced_for_room(&self, room_id: &RoomId) -> Result {
let prefix = (room_id, Interfix);
self.db
.referencedevents
.keys_prefix_raw(&prefix)
.ignore_err()
.ready_for_each(|key| {
trace!(?key, "Removing key");
self.db.referencedevents.remove(key);
})
.await;
Ok(())
}