Flatten threads_until().

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-08 13:12:41 +00:00
parent 18f8d6c65c
commit 4429323e11

View File

@@ -1,6 +1,6 @@
use std::{collections::BTreeMap, sync::Arc}; use std::{collections::BTreeMap, sync::Arc};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt, TryFutureExt};
use ruma::{ use ruma::{
CanonicalJsonValue, EventId, OwnedUserId, RoomId, UserId, CanonicalJsonValue, EventId, OwnedUserId, RoomId, UserId,
api::client::threads::get_threads::v1::IncludeThreads, events::relation::BundledThread, uint, api::client::threads::get_threads::v1::IncludeThreads, events::relation::BundledThread, uint,
@@ -16,7 +16,7 @@ use tuwunel_core::{
}; };
use tuwunel_database::{Deserialized, Map}; use tuwunel_database::{Deserialized, Map};
use crate::{Dep, rooms, rooms::short::ShortRoomId}; use crate::{Dep, rooms};
pub struct Service { pub struct Service {
db: Data, db: Data,
@@ -125,61 +125,57 @@ impl Service {
let mut users = Vec::new(); let mut users = Vec::new();
match self.get_participants(&root_id).await { match self.get_participants(&root_id).await {
| Ok(userids) => { | Ok(userids) => users.extend_from_slice(&userids),
users.extend_from_slice(&userids); | _ => users.push(root_pdu.sender().to_owned()),
},
| _ => {
users.push(root_pdu.sender().to_owned());
},
} }
users.push(event.sender().to_owned());
users.push(event.sender().to_owned());
self.update_participants(&root_id, &users) self.update_participants(&root_id, &users)
} }
pub async fn threads_until<'a>( pub fn threads_until<'a>(
&'a self, &'a self,
user_id: &'a UserId, user_id: &'a UserId,
room_id: &'a RoomId, room_id: &'a RoomId,
shorteventid: PduCount, shorteventid: PduCount,
_inc: &'a IncludeThreads, _inc: &'a IncludeThreads,
) -> Result<impl Stream<Item = (PduCount, PduEvent)> + Send + 'a> { ) -> impl Stream<Item = Result<(PduCount, PduEvent)>> + Send {
let shortroomid: ShortRoomId = self self.services
.services
.short .short
.get_shortroomid(room_id) .get_shortroomid(room_id)
.await?; .map_ok(move |shortroomid| PduId {
shortroomid,
shorteventid: shorteventid.saturating_sub(1),
})
.map_ok(Into::into)
.map_ok(move |current: RawPduId| {
self.db
.threadid_userids
.rev_raw_keys_from(&current)
.ignore_err()
.map(RawPduId::from)
.map(move |pdu_id| (pdu_id, user_id))
.ready_take_while(move |(pdu_id, _)| {
pdu_id.shortroomid() == current.shortroomid()
})
.wide_filter_map(async |(raw_pdu_id, user_id)| {
let pdu_id: PduId = raw_pdu_id.into();
let mut pdu = self
.services
.timeline
.get_pdu_from_id(&raw_pdu_id)
.await
.ok()?;
let current: RawPduId = PduId { if pdu.sender() != user_id {
shortroomid, pdu.as_mut_pdu().remove_transaction_id().ok();
shorteventid: shorteventid.saturating_sub(1), }
}
.into();
let stream = self Some((pdu_id.shorteventid, pdu))
.db })
.threadid_userids .map(Ok)
.rev_raw_keys_from(&current) })
.ignore_err() .try_flatten_stream()
.map(RawPduId::from)
.ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes())
.wide_filter_map(move |pdu_id| async move {
let mut pdu = self
.services
.timeline
.get_pdu_from_id(&pdu_id)
.await
.ok()?;
let pdu_id: PduId = pdu_id.into();
if pdu.sender() != user_id {
pdu.as_mut_pdu().remove_transaction_id().ok();
}
Some((pdu_id.shorteventid, pdu))
});
Ok(stream)
} }
pub(super) fn update_participants( pub(super) fn update_participants(