From 71f3ccf1401d6dfcdb0998f78e90b19a965427ae Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 24 Nov 2025 09:12:14 +0000 Subject: [PATCH] Unbox and pin database streams. Signed-off-by: Jason Volk --- clippy.toml | 12 ++++---- src/api/client/message.rs | 29 +++++++++++-------- src/api/client/sync/v3.rs | 1 + src/api/client/sync/v5/extensions/e2ee.rs | 1 + src/api/client/user_directory.rs | 3 +- src/database/map/get.rs | 22 ++++++++------ src/database/map/get_batch.rs | 2 +- src/database/map/keys.rs | 28 +++++++++--------- src/database/map/keys_from.rs | 26 +++++++++++------ src/database/map/rev_keys.rs | 28 +++++++++--------- src/database/map/rev_keys_from.rs | 26 +++++++++++------ src/database/map/rev_stream.rs | 28 +++++++++--------- src/database/map/rev_stream_from.rs | 28 +++++++++--------- src/database/map/stream.rs | 28 +++++++++--------- src/database/map/stream_from.rs | 28 +++++++++--------- src/database/stream.rs | 6 ++-- src/service/account_data/mod.rs | 11 ++++--- src/service/media/data.rs | 9 ++++-- src/service/rooms/delete/mod.rs | 5 ++-- src/service/rooms/metadata/mod.rs | 11 +++---- src/service/rooms/pdu_metadata/mod.rs | 14 ++------- src/service/rooms/short/mod.rs | 10 +++++-- .../rooms/state_accessor/room_state.rs | 2 ++ src/service/rooms/state_accessor/state.rs | 2 -- src/service/rooms/state_cache/mod.rs | 9 ++++-- src/service/users/keys.rs | 14 +++++---- 26 files changed, 217 insertions(+), 166 deletions(-) diff --git a/clippy.toml b/clippy.toml index eec1e7a4..eaafff4b 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,11 +1,11 @@ +stack-size-threshold = 393216 +future-size-threshold = 24576 array-size-threshold = 4096 -cognitive-complexity-threshold = 100 # TODO reduce me ALARA -excessive-nesting-threshold = 8 -future-size-threshold = 8192 -stack-size-threshold = 196608 # TODO reduce me ALARA -too-many-lines-threshold = 780 # TODO reduce me to <= 100 -type-complexity-threshold = 250 # reduce me to ~200 large-error-threshold = 256 # TODO reduce me ALARA +too-many-lines-threshold = 780 # TODO reduce me to <= 100 +excessive-nesting-threshold = 8 +type-complexity-threshold = 250 # reduce me to ~200 +cognitive-complexity-threshold = 100 # TODO reduce me ALARA #disallowed-macros = [ # { path = "log::error", reason = "use tuwunel_core::error" }, diff --git a/src/api/client/message.rs b/src/api/client/message.rs index f86b309e..620f2c68 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -1,5 +1,9 @@ use axum::extract::State; -use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture, pin_mut}; +use futures::{ + FutureExt, StreamExt, TryFutureExt, + future::{Either, OptionFuture}, + pin_mut, +}; use ruma::{ RoomId, UserId, api::{ @@ -105,17 +109,18 @@ pub(crate) async fn get_message_events_route( } let it = match body.dir { - | Direction::Forward => services - .timeline - .pdus(Some(sender_user), room_id, Some(from)) - .ignore_err() - .boxed(), - - | Direction::Backward => services - .timeline - .pdus_rev(Some(sender_user), room_id, Some(from)) - .ignore_err() - .boxed(), + | Direction::Forward => Either::Left( + services + .timeline + .pdus(Some(sender_user), room_id, Some(from)) + .ignore_err(), + ), + | Direction::Backward => Either::Right( + services + .timeline + .pdus_rev(Some(sender_user), room_id, Some(from)) + .ignore_err(), + ), }; let events: Vec<_> = it diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index c40a73b9..7e2b8827 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -1182,6 +1182,7 @@ async fn calculate_state_changes<'a>( .state_accessor .state_full_shortids(horizon_shortstatehash) .expect_ok() + .boxed() .into_future() }) .into(); diff --git a/src/api/client/sync/v5/extensions/e2ee.rs b/src/api/client/sync/v5/extensions/e2ee.rs index 84a3c1d6..72b322ec 100644 --- a/src/api/client/sync/v5/extensions/e2ee.rs +++ b/src/api/client/sync/v5/extensions/e2ee.rs @@ -170,6 +170,7 @@ async fn collect_room( .ready_filter(|&user_id| user_id != sender_user) .map(ToOwned::to_owned) .map(|user_id| (MembershipState::Join, user_id)) + .boxed() .into_future() }) .into(); diff --git a/src/api/client/user_directory.rs b/src/api/client/user_directory.rs index 0b4aabb4..eae78e71 100644 --- a/src/api/client/user_directory.rs +++ b/src/api/client/user_directory.rs @@ -34,7 +34,7 @@ pub(crate) async fn search_users_route( .min(LIMIT_MAX); let search_term = body.search_term.to_lowercase(); - let mut users = services + let users = services .users .stream() .ready_filter(|&user_id| user_id != sender_user) @@ -83,6 +83,7 @@ pub(crate) async fn search_users_route( }) }); + pin_mut!(users); let results = users.by_ref().take(limit).collect().await; let limited = users.next().await.is_some(); diff --git a/src/database/map/get.rs b/src/database/map/get.rs index 3c355644..6964023a 100644 --- a/src/database/map/get.rs +++ b/src/database/map/get.rs @@ -1,6 +1,9 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; -use futures::{Future, FutureExt, TryFutureExt, future::ready}; +use futures::{ + Future, FutureExt, TryFutureExt, + future::{Either, ready}, +}; use rocksdb::{DBPinnableSlice, ReadOptions}; use tokio::task; use tuwunel_core::{Err, Result, err, implement, utils::result::MapExpect}; @@ -25,9 +28,9 @@ where let cached = self.get_cached(key); if matches!(cached, Err(_) | Ok(Some(_))) { - return task::consume_budget() - .map(move |()| cached.map_expect("data found in cache")) - .boxed(); + return Either::Left( + task::consume_budget().map(move |()| cached.map_expect("data found in cache")), + ); } debug_assert!(matches!(cached, Ok(None)), "expected status Incomplete"); @@ -37,11 +40,12 @@ where res: None, }; - self.engine - .pool - .execute_get(cmd) - .and_then(|mut res| ready(res.remove(0))) - .boxed() + Either::Right( + self.engine + .pool + .execute_get(cmd) + .and_then(|mut res| ready(res.remove(0))), + ) } /// Fetch a value from the cache without I/O. diff --git a/src/database/map/get_batch.rs b/src/database/map/get_batch.rs index dfe7614d..172a1ae4 100644 --- a/src/database/map/get_batch.rs +++ b/src/database/map/get_batch.rs @@ -50,12 +50,12 @@ where .widen_then(automatic_width(), |chunk| { self.engine.pool.execute_get(Get { map: self.clone(), + res: None, key: chunk .iter() .map(AsRef::as_ref) .map(Into::into) .collect(), - res: None, }) }) .map_ok(|results| results.into_iter().stream()) diff --git a/src/database/map/keys.rs b/src/database/map/keys.rs index a1b3b5ee..4c9cd957 100644 --- a/src/database/map/keys.rs +++ b/src/database/map/keys.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either}; use rocksdb::Direction; use serde::Deserialize; use tokio::task; @@ -27,11 +27,12 @@ pub fn raw_keys(self: &Arc) -> impl Stream>> + Send let state = stream::State::new(self, opts); if is_cached(self) { let state = state.init_fwd(None); - return task::consume_budget() - .map(move |()| stream::Keys::<'_>::from(state)) - .into_stream() - .flatten() - .boxed(); + return Either::Left( + task::consume_budget() + .map(move |()| stream::Keys::<'_>::from(state)) + .into_stream() + .flatten(), + ); } let seek = Seek { @@ -42,11 +43,12 @@ pub fn raw_keys(self: &Arc) -> impl Stream>> + Send res: None, }; - self.engine - .pool - .execute_iter(seek) - .ok_into::>() - .into_stream() - .try_flatten() - .boxed() + Either::Right( + self.engine + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten(), + ) } diff --git a/src/database/map/keys_from.rs b/src/database/map/keys_from.rs index 2efa3588..e1324b21 100644 --- a/src/database/map/keys_from.rs +++ b/src/database/map/keys_from.rs @@ -1,8 +1,9 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either}; use rocksdb::Direction; use serde::{Deserialize, Serialize}; +use tokio::task; use tuwunel_core::{Result, implement}; use super::stream_from::is_cached; @@ -64,7 +65,13 @@ where let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self, from) { - return stream::Keys::<'_>::from(state.init_fwd(from.as_ref().into())).boxed(); + let state = state.init_fwd(from.as_ref().into()); + return Either::Left( + task::consume_budget() + .map(move |()| stream::Keys::<'_>::from(state)) + .into_stream() + .flatten(), + ); } let seek = Seek { @@ -75,11 +82,12 @@ where res: None, }; - self.engine - .pool - .execute_iter(seek) - .ok_into::>() - .into_stream() - .try_flatten() - .boxed() + Either::Right( + self.engine + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten(), + ) } diff --git a/src/database/map/rev_keys.rs b/src/database/map/rev_keys.rs index 3dc1e9b9..0a130b18 100644 --- a/src/database/map/rev_keys.rs +++ b/src/database/map/rev_keys.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either}; use rocksdb::Direction; use serde::Deserialize; use tokio::task; @@ -27,11 +27,12 @@ pub fn rev_raw_keys(self: &Arc) -> impl Stream>> + S let state = stream::State::new(self, opts); if is_cached(self) { let state = state.init_rev(None); - return task::consume_budget() - .map(move |()| stream::KeysRev::<'_>::from(state)) - .into_stream() - .flatten() - .boxed(); + return Either::Left( + task::consume_budget() + .map(move |()| stream::KeysRev::<'_>::from(state)) + .into_stream() + .flatten(), + ); } let seek = Seek { @@ -42,11 +43,12 @@ pub fn rev_raw_keys(self: &Arc) -> impl Stream>> + S res: None, }; - self.engine - .pool - .execute_iter(seek) - .ok_into::>() - .into_stream() - .try_flatten() - .boxed() + Either::Right( + self.engine + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten(), + ) } diff --git a/src/database/map/rev_keys_from.rs b/src/database/map/rev_keys_from.rs index 9fd9a562..8ad0bd9a 100644 --- a/src/database/map/rev_keys_from.rs +++ b/src/database/map/rev_keys_from.rs @@ -1,8 +1,9 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either}; use rocksdb::Direction; use serde::{Deserialize, Serialize}; +use tokio::task; use tuwunel_core::{Result, implement}; use super::rev_stream_from::is_cached; @@ -64,7 +65,13 @@ where let opts = super::iter_options_default(&self.engine); let state = stream::State::new(self, opts); if is_cached(self, from) { - return stream::KeysRev::<'_>::from(state.init_rev(from.as_ref().into())).boxed(); + let state = state.init_rev(from.as_ref().into()); + return Either::Left( + task::consume_budget() + .map(move |()| stream::KeysRev::<'_>::from(state)) + .into_stream() + .flatten(), + ); } let seek = Seek { @@ -75,11 +82,12 @@ where res: None, }; - self.engine - .pool - .execute_iter(seek) - .ok_into::>() - .into_stream() - .try_flatten() - .boxed() + Either::Right( + self.engine + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten(), + ) } diff --git a/src/database/map/rev_stream.rs b/src/database/map/rev_stream.rs index 6ab2f4a2..f60fd23b 100644 --- a/src/database/map/rev_stream.rs +++ b/src/database/map/rev_stream.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either}; use rocksdb::Direction; use serde::Deserialize; use tokio::task; @@ -35,11 +35,12 @@ pub fn rev_raw_stream(self: &Arc) -> impl Stream> let state = stream::State::new(self, opts); if is_cached(self) { let state = state.init_rev(None); - return task::consume_budget() - .map(move |()| stream::ItemsRev::<'_>::from(state)) - .into_stream() - .flatten() - .boxed(); + return Either::Left( + task::consume_budget() + .map(move |()| stream::ItemsRev::<'_>::from(state)) + .into_stream() + .flatten(), + ); } let seek = Seek { @@ -50,13 +51,14 @@ pub fn rev_raw_stream(self: &Arc) -> impl Stream> res: None, }; - self.engine - .pool - .execute_iter(seek) - .ok_into::>() - .into_stream() - .try_flatten() - .boxed() + Either::Right( + self.engine + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten(), + ) } #[tracing::instrument( diff --git a/src/database/map/rev_stream_from.rs b/src/database/map/rev_stream_from.rs index 6bbc1c40..7b1cd3f7 100644 --- a/src/database/map/rev_stream_from.rs +++ b/src/database/map/rev_stream_from.rs @@ -1,6 +1,6 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either}; use rocksdb::Direction; use serde::{Deserialize, Serialize}; use tokio::task; @@ -84,11 +84,12 @@ where let state = stream::State::new(self, opts); if is_cached(self, from) { let state = state.init_rev(from.as_ref().into()); - return task::consume_budget() - .map(move |()| stream::ItemsRev::<'_>::from(state)) - .into_stream() - .flatten() - .boxed(); + return Either::Left( + task::consume_budget() + .map(move |()| stream::ItemsRev::<'_>::from(state)) + .into_stream() + .flatten(), + ); } let seek = Seek { @@ -99,13 +100,14 @@ where res: None, }; - self.engine - .pool - .execute_iter(seek) - .ok_into::>() - .into_stream() - .try_flatten() - .boxed() + Either::Right( + self.engine + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten(), + ) } #[tracing::instrument( diff --git a/src/database/map/stream.rs b/src/database/map/stream.rs index ec88defc..e17af62a 100644 --- a/src/database/map/stream.rs +++ b/src/database/map/stream.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either}; use rocksdb::Direction; use serde::Deserialize; use tokio::task; @@ -35,11 +35,12 @@ pub fn raw_stream(self: &Arc) -> impl Stream>> + let state = stream::State::new(self, opts); if is_cached(self) { let state = state.init_fwd(None); - return task::consume_budget() - .map(move |()| stream::Items::<'_>::from(state)) - .into_stream() - .flatten() - .boxed(); + return Either::Left( + task::consume_budget() + .map(move |()| stream::Items::<'_>::from(state)) + .into_stream() + .flatten(), + ); } let seek = Seek { @@ -50,13 +51,14 @@ pub fn raw_stream(self: &Arc) -> impl Stream>> + res: None, }; - self.engine - .pool - .execute_iter(seek) - .ok_into::>() - .into_stream() - .try_flatten() - .boxed() + Either::Right( + self.engine + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten(), + ) } #[tracing::instrument( diff --git a/src/database/map/stream_from.rs b/src/database/map/stream_from.rs index e8bb2ba7..7163c8af 100644 --- a/src/database/map/stream_from.rs +++ b/src/database/map/stream_from.rs @@ -1,6 +1,6 @@ use std::{convert::AsRef, fmt::Debug, sync::Arc}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future::Either}; use rocksdb::Direction; use serde::{Deserialize, Serialize}; use tokio::task; @@ -83,11 +83,12 @@ where let state = stream::State::new(self, opts); if is_cached(self, from) { let state = state.init_fwd(from.as_ref().into()); - return task::consume_budget() - .map(move |()| stream::Items::<'_>::from(state)) - .into_stream() - .flatten() - .boxed(); + return Either::Left( + task::consume_budget() + .map(move |()| stream::Items::<'_>::from(state)) + .into_stream() + .flatten(), + ); } let seek = Seek { @@ -98,13 +99,14 @@ where res: None, }; - self.engine - .pool - .execute_iter(seek) - .ok_into::>() - .into_stream() - .try_flatten() - .boxed() + Either::Right( + self.engine + .pool + .execute_iter(seek) + .ok_into::>() + .into_stream() + .try_flatten(), + ) } #[tracing::instrument( diff --git a/src/database/stream.rs b/src/database/stream.rs index 7160d1f2..b92b265e 100644 --- a/src/database/stream.rs +++ b/src/database/stream.rs @@ -22,7 +22,7 @@ pub(crate) struct State<'a> { init: bool, } -pub(crate) trait Cursor<'a, T> { +pub(crate) trait Cursor<'a, T>: Send { fn state(&self) -> &State<'a>; fn fetch(&self) -> Option; @@ -50,12 +50,12 @@ impl<'a> State<'a> { #[inline] pub(super) fn new(map: &'a Arc, opts: ReadOptions) -> Self { Self { + init: true, + seek: false, inner: map .engine() .db .raw_iterator_cf_opt(&map.cf(), opts), - init: true, - seek: false, } } diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index 847a540d..0ed592a2 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -3,7 +3,7 @@ mod room_tags; use std::sync::Arc; -use futures::{Stream, StreamExt, TryFutureExt}; +use futures::{Stream, StreamExt, TryFutureExt, pin_mut}; use ruma::{ RoomId, UserId, events::{ @@ -175,15 +175,18 @@ pub async fn last_count<'a>( let upper = upper.unwrap_or(u64::MAX); let key = (room_id, user_id, upper, Interfix); - self.db + let keys = self + .db .roomuserdataid_accountdata .rev_keys_from(&key) .ignore_err() .ready_take_while(move |(room_id_, user_id_, ..): &Key<'_>| { room_id == *room_id_ && user_id == *user_id_ }) - .map(at!(2)) - .next() + .map(at!(2)); + + pin_mut!(keys); + keys.next() .await .ok_or_else(|| err!(Request(NotFound("No account data found.")))) } diff --git a/src/service/media/data.rs b/src/service/media/data.rs index db1346d3..aa2e8ba4 100644 --- a/src/service/media/data.rs +++ b/src/service/media/data.rs @@ -1,6 +1,6 @@ use std::{sync::Arc, time::Duration}; -use futures::StreamExt; +use futures::{StreamExt, pin_mut}; use ruma::{Mxc, OwnedMxcUri, UserId, http_headers::ContentDisposition}; use tuwunel_core::{ Err, Result, debug, debug_info, err, @@ -109,11 +109,14 @@ impl Data { let dim: &[u32] = &[dim.width, dim.height]; let prefix = (mxc, dim, Interfix); - let key = self + let keys = self .mediaid_file .keys_prefix_raw(&prefix) .ignore_err() - .map(ToOwned::to_owned) + .map(ToOwned::to_owned); + + pin_mut!(keys); + let key = keys .next() .await .ok_or_else(|| err!(Request(NotFound("Media not found"))))?; diff --git a/src/service/rooms/delete/mod.rs b/src/service/rooms/delete/mod.rs index c0f50b3a..f8ddefdc 100644 --- a/src/service/rooms/delete/mod.rs +++ b/src/service/rooms/delete/mod.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::{FutureExt, StreamExt, pin_mut}; +use futures::{FutureExt, StreamExt}; use ruma::RoomId; use tuwunel_core::{ Result, debug, @@ -35,6 +35,7 @@ impl Service { .services .state_cache .local_users_in_room(room_id) + .boxed() .into_future() .map(|(next, ..)| next.as_ref().is_some()); @@ -42,10 +43,10 @@ impl Service { .services .state_cache .local_users_invited_to_room(room_id) + .boxed() .into_future() .map(|(next, ..)| next.as_ref().is_some()); - pin_mut!(has_local_users, has_local_invites); if has_local_users.or(has_local_invites).await { trace!(?room_id, "Not deleting with local joined or invited"); return; diff --git a/src/service/rooms/metadata/mod.rs b/src/service/rooms/metadata/mod.rs index 06be9aff..ad7454f5 100644 --- a/src/service/rooms/metadata/mod.rs +++ b/src/service/rooms/metadata/mod.rs @@ -46,13 +46,14 @@ pub async fn exists(&self, room_id: &RoomId) -> bool { }; // Look for PDUs in that room. - self.db + let keys = self + .db .pduid_pdu .keys_prefix_raw(&prefix) - .ignore_err() - .next() - .await - .is_some() + .ignore_err(); + + pin_mut!(keys); + keys.next().await.is_some() } #[implement(Service)] diff --git a/src/service/rooms/pdu_metadata/mod.rs b/src/service/rooms/pdu_metadata/mod.rs index e1aeca3d..1548fb88 100644 --- a/src/service/rooms/pdu_metadata/mod.rs +++ b/src/service/rooms/pdu_metadata/mod.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use futures::{Stream, StreamExt, TryFutureExt}; +use futures::{Stream, StreamExt, TryFutureExt, future::Either}; use ruma::{EventId, RoomId, UserId, api::Direction}; use tuwunel_core::{ PduId, Result, @@ -86,16 +86,8 @@ pub fn get_relations<'a>( }; match dir { - | Direction::Backward => self - .db - .tofrom_relation - .rev_raw_keys_from(start) - .boxed(), - | Direction::Forward => self - .db - .tofrom_relation - .raw_keys_from(start) - .boxed(), + | Direction::Backward => Either::Left(self.db.tofrom_relation.rev_raw_keys_from(start)), + | Direction::Forward => Either::Right(self.db.tofrom_relation.raw_keys_from(start)), } .ignore_err() .ready_take_while(move |key| key.starts_with(&target)) diff --git a/src/service/rooms/short/mod.rs b/src/service/rooms/short/mod.rs index 595a73cb..f5562868 100644 --- a/src/service/rooms/short/mod.rs +++ b/src/service/rooms/short/mod.rs @@ -1,6 +1,6 @@ use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc}; -use futures::{FutureExt, Stream, StreamExt}; +use futures::{FutureExt, Stream, StreamExt, pin_mut}; use ruma::{EventId, OwnedRoomId, RoomId, events::StateEventType}; use serde::Deserialize; pub use tuwunel_core::matrix::{ShortEventId, ShortId, ShortRoomId, ShortStateKey}; @@ -245,10 +245,14 @@ pub async fn get_shortroomid(&self, room_id: &RoomId) -> Result { #[implement(Service)] pub async fn get_roomid_from_short(&self, shortroomid_: ShortRoomId) -> Result { - self.db + let stream = self + .db .roomid_shortroomid .stream() - .ready_filter_map(Result::ok) + .ready_filter_map(Result::ok); + + pin_mut!(stream); + stream .ready_find(|&(_, shortroomid)| shortroomid == shortroomid_) .map(|found| found.map(|(room_id, _): (&RoomId, ShortRoomId)| room_id.to_owned())) .await diff --git a/src/service/rooms/state_accessor/room_state.rs b/src/service/rooms/state_accessor/room_state.rs index dbb716a9..51efff44 100644 --- a/src/service/rooms/state_accessor/room_state.rs +++ b/src/service/rooms/state_accessor/room_state.rs @@ -108,6 +108,7 @@ pub fn room_state_keys_with_ids<'a>( .map_ok(|shortstatehash| { self.state_keys_with_ids(shortstatehash, event_type) .map(Ok) + .boxed() }) .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}"))) .try_flatten_stream() @@ -127,6 +128,7 @@ pub fn room_state_keys<'a>( .map_ok(|shortstatehash| { self.state_keys(shortstatehash, event_type) .map(Ok) + .boxed() }) .map_err(move |e| err!(Database("Missing state for {room_id:?}: {e:?}"))) .try_flatten_stream() diff --git a/src/service/rooms/state_accessor/state.rs b/src/service/rooms/state_accessor/state.rs index ae5dc1a9..a12c161e 100644 --- a/src/service/rooms/state_accessor/state.rs +++ b/src/service/rooms/state_accessor/state.rs @@ -249,7 +249,6 @@ pub fn state_keys_with_shortids<'a>( .ignore_err() .unzip() .map(|(ssks, sids): (Vec, Vec)| (ssks, sids)) - .boxed() .shared(); let shortstatekeys = short_ids @@ -410,7 +409,6 @@ pub fn state_full_shortids( .map_ok(Vec::into_iter) .map_ok(IterStream::try_stream) .try_flatten_stream() - .boxed() } #[implement(super::Service)] diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index d56722c1..b109452e 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -212,8 +212,9 @@ pub fn get_shared_rooms<'a>( ) -> impl Stream + Send + 'a { use tuwunel_core::utils::set; - let a = self.rooms_joined(user_a); - let b = self.rooms_joined(user_b); + let a = self.rooms_joined(user_a).boxed(); + let b = self.rooms_joined(user_b).boxed(); + set::intersection_sorted_stream2(a, b) } @@ -415,6 +416,7 @@ pub fn user_memberships<'a>( .then(|| { self.rooms_joined(user_id) .map(|room_id| (Join, room_id)) + .boxed() .into_future() }) .into(); @@ -424,6 +426,7 @@ pub fn user_memberships<'a>( .then(|| { self.rooms_invited(user_id) .map(|room_id| (Invite, room_id)) + .boxed() .into_future() }) .into(); @@ -433,6 +436,7 @@ pub fn user_memberships<'a>( .then(|| { self.rooms_knocked(user_id) .map(|room_id| (Knock, room_id)) + .boxed() .into_future() }) .into(); @@ -442,6 +446,7 @@ pub fn user_memberships<'a>( .then(|| { self.rooms_left(user_id) .map(|room_id| (Leave, room_id)) + .boxed() .into_future() }) .into(); diff --git a/src/service/users/keys.rs b/src/service/users/keys.rs index bd7649af..a2f1f017 100644 --- a/src/service/users/keys.rs +++ b/src/service/users/keys.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeMap, mem}; -use futures::{Stream, StreamExt, TryFutureExt}; +use futures::{Stream, StreamExt, TryFutureExt, pin_mut}; use ruma::{ DeviceId, KeyId, OneTimeKeyAlgorithm, OneTimeKeyId, OneTimeKeyName, OwnedKeyId, RoomId, UInt, UserId, @@ -113,7 +113,7 @@ pub async fn take_one_time_key( prefix.extend_from_slice(key_algorithm.as_ref().as_bytes()); prefix.push(b':'); - let one_time_key = self + let one_time_keys = self .db .onetimekeyid_onetimekeys .raw_stream_prefix(&prefix) @@ -136,11 +136,13 @@ pub async fn take_one_time_key( .unwrap(); (key, val) - }) - .next() - .await; + }); - one_time_key.ok_or_else(|| err!(Request(NotFound("No one-time-key found")))) + pin_mut!(one_time_keys); + one_time_keys + .next() + .await + .ok_or_else(|| err!(Request(NotFound("No one-time-key found")))) } #[implement(super::Service)]