diff --git a/src/service/rooms/threads/mod.rs b/src/service/rooms/threads/mod.rs index 3b942eb3..103a1bc1 100644 --- a/src/service/rooms/threads/mod.rs +++ b/src/service/rooms/threads/mod.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeMap, sync::Arc}; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryFutureExt}; use ruma::{ CanonicalJsonValue, EventId, OwnedUserId, RoomId, UserId, api::client::threads::get_threads::v1::IncludeThreads, events::relation::BundledThread, uint, @@ -16,7 +16,7 @@ use tuwunel_core::{ }; use tuwunel_database::{Deserialized, Map}; -use crate::{Dep, rooms, rooms::short::ShortRoomId}; +use crate::{Dep, rooms}; pub struct Service { db: Data, @@ -125,61 +125,57 @@ impl Service { let mut users = Vec::new(); match self.get_participants(&root_id).await { - | Ok(userids) => { - users.extend_from_slice(&userids); - }, - | _ => { - users.push(root_pdu.sender().to_owned()); - }, + | Ok(userids) => users.extend_from_slice(&userids), + | _ => users.push(root_pdu.sender().to_owned()), } - users.push(event.sender().to_owned()); + users.push(event.sender().to_owned()); self.update_participants(&root_id, &users) } - pub async fn threads_until<'a>( + pub fn threads_until<'a>( &'a self, user_id: &'a UserId, room_id: &'a RoomId, shorteventid: PduCount, _inc: &'a IncludeThreads, - ) -> Result + Send + 'a> { - let shortroomid: ShortRoomId = self - .services + ) -> impl Stream> + Send { + self.services .short .get_shortroomid(room_id) - .await?; + .map_ok(move |shortroomid| PduId { + shortroomid, + shorteventid: shorteventid.saturating_sub(1), + }) + .map_ok(Into::into) + .map_ok(move |current: RawPduId| { + self.db + .threadid_userids + .rev_raw_keys_from(¤t) + .ignore_err() + .map(RawPduId::from) + .map(move |pdu_id| (pdu_id, user_id)) + .ready_take_while(move |(pdu_id, _)| { + pdu_id.shortroomid() == current.shortroomid() + }) + .wide_filter_map(async |(raw_pdu_id, user_id)| { + let pdu_id: PduId = raw_pdu_id.into(); + let mut pdu = self + .services + .timeline + .get_pdu_from_id(&raw_pdu_id) + .await + .ok()?; - let current: RawPduId = PduId { - shortroomid, - shorteventid: shorteventid.saturating_sub(1), - } - .into(); + if pdu.sender() != user_id { + pdu.as_mut_pdu().remove_transaction_id().ok(); + } - let stream = self - .db - .threadid_userids - .rev_raw_keys_from(¤t) - .ignore_err() - .map(RawPduId::from) - .ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes()) - .wide_filter_map(move |pdu_id| async move { - let mut pdu = self - .services - .timeline - .get_pdu_from_id(&pdu_id) - .await - .ok()?; - - let pdu_id: PduId = pdu_id.into(); - if pdu.sender() != user_id { - pdu.as_mut_pdu().remove_transaction_id().ok(); - } - - Some((pdu_id.shorteventid, pdu)) - }); - - Ok(stream) + Some((pdu_id.shorteventid, pdu)) + }) + .map(Ok) + }) + .try_flatten_stream() } pub(super) fn update_participants(