Activate recursive relations. Maximum fan-out.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-11-20 03:21:34 +00:00
parent 5260912c3b
commit 9e539d0a22
2 changed files with 54 additions and 95 deletions

View File

@@ -1,5 +1,11 @@
use std::iter::once;
use axum::extract::State; use axum::extract::State;
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::try_join3}; use futures::{
FutureExt, StreamExt, TryFutureExt,
future::try_join3,
stream::{select_all, unfold},
};
use ruma::{ use ruma::{
EventId, RoomId, UInt, UserId, EventId, RoomId, UInt, UserId,
api::{ api::{
@@ -14,14 +20,13 @@ use ruma::{
use tuwunel_core::{ use tuwunel_core::{
Err, Error, Result, at, err, Err, Error, Result, at, err,
matrix::{ matrix::{
ShortRoomId,
event::{Event, RelationTypeEqual}, event::{Event, RelationTypeEqual},
pdu::{Pdu, PduCount, PduId}, pdu::{PduCount, PduId},
}, },
utils::{ utils::{
BoolExt, BoolExt,
result::FlatOk, result::FlatOk,
stream::{IterStream, ReadyExt, WidebandExt}, stream::{ReadyExt, WidebandExt},
}, },
}; };
use tuwunel_service::Services; use tuwunel_service::Services;
@@ -129,7 +134,7 @@ async fn paginate_relations_with_filter(
let to: Option<PduCount> = to.map(str::parse).flat_ok(); let to: Option<PduCount> = to.map(str::parse).flat_ok();
// Spec (v1.10) recommends depth of at least 3 // Spec (v1.10) recommends depth of at least 3
let depth: u8 = if recurse { 3 } else { 1 }; let max_depth: usize = if recurse { 3 } else { 0 };
let limit: usize = limit let limit: usize = limit
.map(TryInto::try_into) .map(TryInto::try_into)
@@ -137,8 +142,6 @@ async fn paginate_relations_with_filter(
.unwrap_or(30) .unwrap_or(30)
.min(100); .min(100);
let shortroomid = services.short.get_shortroomid(room_id);
let target = services let target = services
.timeline .timeline
.get_pdu_id(target) .get_pdu_id(target)
@@ -152,6 +155,8 @@ async fn paginate_relations_with_filter(
visible.ok_or_else(|| err!(Request(Forbidden("You cannot view this room.")))) visible.ok_or_else(|| err!(Request(Forbidden("You cannot view this room."))))
}); });
let shortroomid = services.short.get_shortroomid(room_id);
let (shortroomid, target, ()) = try_join3(shortroomid, target, visible).await?; let (shortroomid, target, ()) = try_join3(shortroomid, target, visible).await?;
let Ok(target) = target else { let Ok(target) = target else {
@@ -162,124 +167,77 @@ async fn paginate_relations_with_filter(
return Err!(Request(NotFound("Event not found in room."))); return Err!(Request(NotFound("Event not found in room.")));
} }
//TODO: support backfilled relations
if let PduCount::Backfilled(_) = target.count { if let PduCount::Backfilled(_) = target.count {
return Ok(get_relating_events::v1::Response::new(Vec::new())); return Ok(get_relating_events::v1::Response::new(Vec::new()));
} }
let events: Vec<_> = get_relations( let fetch = |depth: usize, count: PduCount| {
services, services
sender_user, .pdu_metadata
target.shortroomid, .get_relations(shortroomid, count, from, dir, Some(sender_user))
target.count, .map(move |(count, pdu)| (depth, count, pdu))
from, .ready_filter(|(_, count, _)| matches!(count, PduCount::Normal(_)))
limit, .boxed()
depth, };
dir,
) let events = unfold(select_all(once(fetch(0, target.count))), async |mut relations| {
.await //TODO: XXX let (depth, count, pdu) = relations.next().await?;
.ready_take_while(|(count, _)| Some(*count) != to)
.ready_filter(|(_, pdu)| { if depth < max_depth {
relations.push(fetch(depth.saturating_add(1), count));
}
Some(((depth, count, pdu), relations))
})
.ready_take_while(|&(_, count, _)| Some(count) != to)
.ready_filter(|(_, _, pdu)| {
filter_event_type filter_event_type
.as_ref() .as_ref()
.is_none_or(|kind| kind == pdu.kind()) .is_none_or(|kind| kind == pdu.kind())
}) })
.ready_filter(|(_, pdu)| { .ready_filter(|(_, _, pdu)| {
filter_rel_type filter_rel_type
.as_ref() .as_ref()
.is_none_or(|rel_type| rel_type.relation_type_equal(pdu)) .is_none_or(|rel_type| rel_type.relation_type_equal(pdu))
}) })
.wide_filter_map(|item| visibility_filter(services, sender_user, item)) .wide_filter_map(async |(depth, count, pdu)| {
services
.state_accessor
.user_can_see_event(sender_user, pdu.room_id(), pdu.event_id())
.await
.then_some((depth, count, pdu))
})
.take(limit) .take(limit)
.collect() .collect::<Vec<_>>()
.await; .await;
Ok(get_relating_events::v1::Response { Ok(get_relating_events::v1::Response {
recursion_depth: recurse.then_some(depth.into()), recursion_depth: max_depth
.gt(&0)
.then(|| events.iter().map(at!(0)))
.into_iter()
.flatten()
.max()
.map(TryInto::try_into)
.transpose()?,
next_batch: events next_batch: events
.last() .last()
.map(at!(0)) .map(at!(1))
.as_ref() .as_ref()
.map(ToString::to_string), .map(ToString::to_string),
prev_batch: events prev_batch: events
.first() .first()
.map(at!(0)) .map(at!(1))
.or(from) .or(from)
.as_ref() .as_ref()
.map(ToString::to_string), .map(ToString::to_string),
chunk: events chunk: events
.into_iter() .into_iter()
.map(at!(1)) .map(at!(2))
.map(Event::into_format) .map(Event::into_format)
.collect(), .collect(),
}) })
} }
#[allow(clippy::too_many_arguments)]
async fn get_relations(
services: &Services,
sender_user: &UserId,
shortroomid: ShortRoomId,
target: PduCount,
from: Option<PduCount>,
limit: usize,
max_depth: u8,
dir: Direction,
) -> impl Stream<Item = (PduCount, Pdu)> + Send {
let mut pdus: Vec<_> = services
.pdu_metadata
.get_relations(shortroomid, target, from, dir, Some(sender_user))
.take(limit)
.collect()
.await;
let mut stack: Vec<_> = pdus
.iter()
.filter(|_| max_depth > 0)
.map(|(count, _)| (*count, 1))
.collect();
'limit: while let Some((target, depth)) = stack.pop() {
let PduCount::Normal(target) = target else {
continue;
};
let relations: Vec<_> = services
.pdu_metadata
.get_relations(shortroomid, target.into(), from, dir, Some(sender_user))
.take(limit.saturating_sub(pdus.len()))
.collect()
.await;
for (target, pdu) in relations {
if depth < max_depth {
stack.push((target, depth.saturating_add(1)));
}
if pdus.len() < limit {
pdus.push((target, pdu));
} else {
break 'limit;
}
}
}
pdus.into_iter().stream()
}
async fn visibility_filter<Pdu: Event>(
services: &Services,
sender_user: &UserId,
item: (PduCount, Pdu),
) -> Option<(PduCount, Pdu)> {
let (_, pdu) = &item;
services
.state_accessor
.user_can_see_event(sender_user, pdu.room_id(), pdu.event_id())
.await
.then_some(item)
}

View File

@@ -37,7 +37,8 @@ pub(crate) async fn get_supported_versions_route(
"v1.3".to_owned(), "v1.3".to_owned(),
"v1.4".to_owned(), "v1.4".to_owned(),
"v1.5".to_owned(), "v1.5".to_owned(),
"v1.11".to_owned(), "v1.10".to_owned(), // relations recursion
"v1.11".to_owned(), // authenticated media
], ],
unstable_features: BTreeMap::from_iter([ unstable_features: BTreeMap::from_iter([
("org.matrix.e2e_cross_signing".to_owned(), true), ("org.matrix.e2e_cross_signing".to_owned(), true),