From 9e539d0a22b4f218ba8d849c3f9e07475d8260f7 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Thu, 20 Nov 2025 03:21:34 +0000 Subject: [PATCH] Activate recursive relations. Maximum fan-out. Signed-off-by: Jason Volk --- src/api/client/relations.rs | 146 ++++++++++++---------------------- src/api/client/unversioned.rs | 3 +- 2 files changed, 54 insertions(+), 95 deletions(-) diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index b850c0a1..21bf3696 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -1,5 +1,11 @@ +use std::iter::once; + use axum::extract::State; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::try_join3}; +use futures::{ + FutureExt, StreamExt, TryFutureExt, + future::try_join3, + stream::{select_all, unfold}, +}; use ruma::{ EventId, RoomId, UInt, UserId, api::{ @@ -14,14 +20,13 @@ use ruma::{ use tuwunel_core::{ Err, Error, Result, at, err, matrix::{ - ShortRoomId, event::{Event, RelationTypeEqual}, - pdu::{Pdu, PduCount, PduId}, + pdu::{PduCount, PduId}, }, utils::{ BoolExt, result::FlatOk, - stream::{IterStream, ReadyExt, WidebandExt}, + stream::{ReadyExt, WidebandExt}, }, }; use tuwunel_service::Services; @@ -129,7 +134,7 @@ async fn paginate_relations_with_filter( let to: Option = to.map(str::parse).flat_ok(); // Spec (v1.10) recommends depth of at least 3 - let depth: u8 = if recurse { 3 } else { 1 }; + let max_depth: usize = if recurse { 3 } else { 0 }; let limit: usize = limit .map(TryInto::try_into) @@ -137,8 +142,6 @@ async fn paginate_relations_with_filter( .unwrap_or(30) .min(100); - let shortroomid = services.short.get_shortroomid(room_id); - let target = services .timeline .get_pdu_id(target) @@ -152,6 +155,8 @@ async fn paginate_relations_with_filter( visible.ok_or_else(|| err!(Request(Forbidden("You cannot view this room.")))) }); + let shortroomid = services.short.get_shortroomid(room_id); + let (shortroomid, target, ()) = try_join3(shortroomid, target, visible).await?; let Ok(target) = target else { @@ -162,124 +167,77 @@ async fn paginate_relations_with_filter( return Err!(Request(NotFound("Event not found in room."))); } - //TODO: support backfilled relations if let PduCount::Backfilled(_) = target.count { return Ok(get_relating_events::v1::Response::new(Vec::new())); } - let events: Vec<_> = get_relations( - services, - sender_user, - target.shortroomid, - target.count, - from, - limit, - depth, - dir, - ) - .await //TODO: XXX - .ready_take_while(|(count, _)| Some(*count) != to) - .ready_filter(|(_, pdu)| { + let fetch = |depth: usize, count: PduCount| { + services + .pdu_metadata + .get_relations(shortroomid, count, from, dir, Some(sender_user)) + .map(move |(count, pdu)| (depth, count, pdu)) + .ready_filter(|(_, count, _)| matches!(count, PduCount::Normal(_))) + .boxed() + }; + + let events = unfold(select_all(once(fetch(0, target.count))), async |mut relations| { + let (depth, count, pdu) = relations.next().await?; + + if depth < max_depth { + relations.push(fetch(depth.saturating_add(1), count)); + } + + Some(((depth, count, pdu), relations)) + }) + .ready_take_while(|&(_, count, _)| Some(count) != to) + .ready_filter(|(_, _, pdu)| { filter_event_type .as_ref() .is_none_or(|kind| kind == pdu.kind()) }) - .ready_filter(|(_, pdu)| { + .ready_filter(|(_, _, pdu)| { filter_rel_type .as_ref() .is_none_or(|rel_type| rel_type.relation_type_equal(pdu)) }) - .wide_filter_map(|item| visibility_filter(services, sender_user, item)) + .wide_filter_map(async |(depth, count, pdu)| { + services + .state_accessor + .user_can_see_event(sender_user, pdu.room_id(), pdu.event_id()) + .await + .then_some((depth, count, pdu)) + }) .take(limit) - .collect() + .collect::>() .await; Ok(get_relating_events::v1::Response { - recursion_depth: recurse.then_some(depth.into()), + recursion_depth: max_depth + .gt(&0) + .then(|| events.iter().map(at!(0))) + .into_iter() + .flatten() + .max() + .map(TryInto::try_into) + .transpose()?, next_batch: events .last() - .map(at!(0)) + .map(at!(1)) .as_ref() .map(ToString::to_string), prev_batch: events .first() - .map(at!(0)) + .map(at!(1)) .or(from) .as_ref() .map(ToString::to_string), chunk: events .into_iter() - .map(at!(1)) + .map(at!(2)) .map(Event::into_format) .collect(), }) } - -#[allow(clippy::too_many_arguments)] -async fn get_relations( - services: &Services, - sender_user: &UserId, - shortroomid: ShortRoomId, - target: PduCount, - from: Option, - limit: usize, - max_depth: u8, - dir: Direction, -) -> impl Stream + Send { - let mut pdus: Vec<_> = services - .pdu_metadata - .get_relations(shortroomid, target, from, dir, Some(sender_user)) - .take(limit) - .collect() - .await; - - let mut stack: Vec<_> = pdus - .iter() - .filter(|_| max_depth > 0) - .map(|(count, _)| (*count, 1)) - .collect(); - - 'limit: while let Some((target, depth)) = stack.pop() { - let PduCount::Normal(target) = target else { - continue; - }; - - let relations: Vec<_> = services - .pdu_metadata - .get_relations(shortroomid, target.into(), from, dir, Some(sender_user)) - .take(limit.saturating_sub(pdus.len())) - .collect() - .await; - - for (target, pdu) in relations { - if depth < max_depth { - stack.push((target, depth.saturating_add(1))); - } - - if pdus.len() < limit { - pdus.push((target, pdu)); - } else { - break 'limit; - } - } - } - - pdus.into_iter().stream() -} - -async fn visibility_filter( - services: &Services, - sender_user: &UserId, - item: (PduCount, Pdu), -) -> Option<(PduCount, Pdu)> { - let (_, pdu) = &item; - - services - .state_accessor - .user_can_see_event(sender_user, pdu.room_id(), pdu.event_id()) - .await - .then_some(item) -} diff --git a/src/api/client/unversioned.rs b/src/api/client/unversioned.rs index b4088440..7acd565b 100644 --- a/src/api/client/unversioned.rs +++ b/src/api/client/unversioned.rs @@ -37,7 +37,8 @@ pub(crate) async fn get_supported_versions_route( "v1.3".to_owned(), "v1.4".to_owned(), "v1.5".to_owned(), - "v1.11".to_owned(), + "v1.10".to_owned(), // relations recursion + "v1.11".to_owned(), // authenticated media ], unstable_features: BTreeMap::from_iter([ ("org.matrix.e2e_cross_signing".to_owned(), true),