From e78bf21085ee3d30e39e8f25bc22778fc21cb3f6 Mon Sep 17 00:00:00 2001 From: dasha_uwu Date: Sat, 17 Jan 2026 05:38:09 +0500 Subject: [PATCH] Introduce OptionFuture helpers Optimize user directory searches --- src/api/client/context.rs | 8 +- src/api/client/message.rs | 13 +- src/api/client/room/create.rs | 9 +- src/api/client/room/summary.rs | 22 +- src/api/client/search.rs | 15 +- src/api/client/space.rs | 44 ++- src/api/client/sync/v3.rs | 332 ++++++++---------- src/api/client/sync/v5/extensions.rs | 30 +- src/api/client/sync/v5/extensions/e2ee.rs | 25 +- src/api/client/sync/v5/filter.rs | 133 ++++--- src/api/client/sync/v5/rooms.rs | 65 ++-- src/api/client/sync/v5/selector.rs | 84 ++--- src/api/client/user_directory.rs | 5 +- src/api/server/utils.rs | 17 +- src/core/config/mod.rs | 17 +- src/core/matrix/state_res/resolve.rs | 18 +- src/core/utils/bool.rs | 9 + src/core/utils/future/mod.rs | 2 +- src/core/utils/future/option_ext.rs | 4 +- src/core/utils/mod.rs | 1 + src/core/utils/option.rs | 11 + src/service/membership/join.rs | 40 +-- src/service/presence/mod.rs | 24 +- src/service/pusher/append.rs | 15 +- .../event_handler/handle_incoming_pdu.rs | 12 +- .../rooms/event_handler/state_at_incoming.rs | 17 +- src/service/rooms/state_cache/mod.rs | 35 +- src/service/sending/sender.rs | 14 +- 28 files changed, 454 insertions(+), 567 deletions(-) create mode 100644 src/core/utils/option.rs diff --git a/src/api/client/context.rs b/src/api/client/context.rs index 3eacbf5b..3d4f5866 100644 --- a/src/api/client/context.rs +++ b/src/api/client/context.rs @@ -1,7 +1,7 @@ use axum::extract::State; use futures::{ FutureExt, StreamExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, join, join3, try_join3}, + future::{join, join3, try_join3}, }; use ruma::{OwnedEventId, UserId, api::client::context::get_context, events::StateEventType}; use tuwunel_core::{ @@ -9,6 +9,7 @@ use tuwunel_core::{ utils::{ IterStream, future::TryExtExt, + option::OptionExt, stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt}, }, }; @@ -116,7 +117,7 @@ pub(crate) async fn get_context_route( options: Some(&filter.lazy_load_options), }; - let lazy_loading_witnessed: OptionFuture<_> = filter + let lazy_loading_witnessed = filter .lazy_load_options .is_enabled() .then_some( @@ -125,8 +126,7 @@ pub(crate) async fn get_context_route( .chain(events_before.iter()) .chain(events_after.iter()), ) - .map(|witnessed| lazy_loading_witness(&services, &lazy_loading_context, witnessed)) - .into(); + .map_async(|witnessed| lazy_loading_witness(&services, &lazy_loading_context, witnessed)); let state_at = events_after .last() diff --git a/src/api/client/message.rs b/src/api/client/message.rs index 620f2c68..6c6d51a0 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -1,9 +1,5 @@ use axum::extract::State; -use futures::{ - FutureExt, StreamExt, TryFutureExt, - future::{Either, OptionFuture}, - pin_mut, -}; +use futures::{FutureExt, StreamExt, TryFutureExt, future::Either, pin_mut}; use ruma::{ RoomId, UserId, api::{ @@ -21,7 +17,7 @@ use tuwunel_core::{ }, ref_at, utils::{ - IterStream, ReadyExt, + BoolExt, IterStream, ReadyExt, result::{FlatOk, LogErr}, stream::{BroadbandExt, TryIgnore, WidebandExt}, }, @@ -140,11 +136,10 @@ pub(crate) async fn get_message_events_route( options: Some(&filter.lazy_load_options), }; - let witness: OptionFuture<_> = filter + let witness = filter .lazy_load_options .is_enabled() - .then(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter())) - .into(); + .then_async(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter())); let state = witness .map(Option::into_iter) diff --git a/src/api/client/room/create.rs b/src/api/client/room/create.rs index 3cbdac3b..396665ec 100644 --- a/src/api/client/room/create.rs +++ b/src/api/client/room/create.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use axum::extract::State; -use futures::{FutureExt, future::OptionFuture}; +use futures::FutureExt; use ruma::{ CanonicalJsonObject, EventEncryptionAlgorithm, Int, OwnedRoomAliasId, OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, @@ -32,7 +32,7 @@ use serde_json::{json, value::to_raw_value}; use tuwunel_core::{ Err, Result, debug_info, debug_warn, err, info, matrix::{StateKey, pdu::PduBuilder, room_version}, - utils::BoolExt, + utils::{BoolExt, option::OptionExt}, warn, }; use tuwunel_service::{Services, appservice::RegistrationInfo, rooms::state::RoomMutexGuard}; @@ -72,11 +72,10 @@ pub(crate) async fn create_room_route( | _ => RoomPreset::PrivateChat, // Room visibility should not be custom }); - let alias: OptionFuture<_> = body + let alias = body .room_alias_name .as_ref() - .map(|alias| room_alias_check(&services, alias, body.appservice_info.as_ref())) - .into(); + .map_async(|alias| room_alias_check(&services, alias, body.appservice_info.as_ref())); // Determine room version let (room_version, version_rules) = body diff --git a/src/api/client/room/summary.rs b/src/api/client/room/summary.rs index 2d21a692..36cfe8b2 100644 --- a/src/api/client/room/summary.rs +++ b/src/api/client/room/summary.rs @@ -1,10 +1,6 @@ use axum::extract::State; use axum_client_ip::InsecureClientIp; -use futures::{ - FutureExt, StreamExt, TryFutureExt, - future::{OptionFuture, join3}, - stream::FuturesUnordered, -}; +use futures::{FutureExt, StreamExt, TryFutureExt, future::join3, stream::FuturesUnordered}; use ruma::{ OwnedServerName, RoomId, UserId, api::{ @@ -16,7 +12,7 @@ use ruma::{ }; use tuwunel_core::{ Err, Result, debug_warn, trace, - utils::{IterStream, future::TryExtExt}, + utils::{IterStream, future::TryExtExt, option::OptionExt}, }; use tuwunel_service::Services; @@ -158,14 +154,12 @@ async fn local_room_summary_response( .room_joined_count(room_id) .unwrap_or(0); - let membership: OptionFuture<_> = sender_user - .map(|sender_user| { - services - .state_accessor - .get_member(room_id, sender_user) - .map_ok_or(MembershipState::Leave, |content| content.membership) - }) - .into(); + let membership = sender_user.map_async(|sender_user| { + services + .state_accessor + .get_member(room_id, sender_user) + .map_ok_or(MembershipState::Leave, |content| content.membership) + }); let ( canonical_alias, diff --git a/src/api/client/search.rs b/src/api/client/search.rs index d13ff851..63340691 100644 --- a/src/api/client/search.rs +++ b/src/api/client/search.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use axum::extract::State; -use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::OptionFuture}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use ruma::{ OwnedRoomId, RoomId, UInt, UserId, api::client::search::search_events::{ @@ -16,7 +16,7 @@ use tuwunel_core::{ Err, Result, at, is_true, matrix::Event, result::FlatOk, - utils::{IterStream, stream::ReadyExt}, + utils::{IterStream, option::OptionExt, stream::ReadyExt}, }; use tuwunel_service::{Services, rooms::search::RoomQuery}; @@ -41,18 +41,17 @@ pub(crate) async fn search_events_route( ) -> Result { let sender_user = body.sender_user(); let next_batch = body.next_batch.as_deref(); - let room_events_result: OptionFuture<_> = body + let room_events = body .search_categories .room_events .as_ref() - .map(|criteria| category_room_events(&services, sender_user, next_batch, criteria)) - .into(); + .map_async(|criteria| category_room_events(&services, sender_user, next_batch, criteria)) + .await + .transpose()?; Ok(Response { search_categories: ResultCategories { - room_events: room_events_result - .await - .unwrap_or_else(|| Ok(ResultRoomEvents::default()))?, + room_events: room_events.unwrap_or_default(), }, }) } diff --git a/src/api/client/space.rs b/src/api/client/space.rs index 48bd8696..bcb5e213 100644 --- a/src/api/client/space.rs +++ b/src/api/client/space.rs @@ -1,13 +1,13 @@ use std::{collections::BTreeSet, iter::once, str::FromStr}; use axum::extract::State; -use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture, stream::FuturesOrdered}; +use futures::{FutureExt, StreamExt, TryFutureExt, stream::FuturesOrdered}; use ruma::{ OwnedRoomId, OwnedServerName, RoomId, UInt, UserId, api::client::space::get_hierarchy, }; use tuwunel_core::{ Err, Result, debug_error, - utils::{future::TryExtExt, stream::IterStream}, + utils::{future::TryExtExt, option::OptionExt, stream::IterStream}, }; use tuwunel_service::{ Services, @@ -187,30 +187,26 @@ where } } - let next_batch: OptionFuture<_> = queue - .next() - .await - .map(async |(room, ..)| { - parents.insert(room); + let next_batch = queue.next().await.map_async(async |(room, ..)| { + parents.insert(room); - let next_short_room_ids: Vec<_> = parents - .iter() - .stream() - .filter_map(|room_id| services.short.get_shortroomid(room_id).ok()) - .collect() - .await; + let next_short_room_ids: Vec<_> = parents + .iter() + .stream() + .filter_map(|room_id| services.short.get_shortroomid(room_id).ok()) + .collect() + .await; - (next_short_room_ids.iter().ne(short_room_ids) && !next_short_room_ids.is_empty()) - .then_some(PaginationToken { - short_room_ids: next_short_room_ids, - limit: limit.try_into().ok()?, - max_depth: max_depth.try_into().ok()?, - suggested_only, - }) - .as_ref() - .map(PaginationToken::to_string) - }) - .into(); + (next_short_room_ids.iter().ne(short_room_ids) && !next_short_room_ids.is_empty()) + .then_some(PaginationToken { + short_room_ids: next_short_room_ids, + limit: limit.try_into().ok()?, + max_depth: max_depth.try_into().ok()?, + suggested_only, + }) + .as_ref() + .map(PaginationToken::to_string) + }); Ok(get_hierarchy::v1::Response { next_batch: next_batch.await.flatten(), diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index dcd3f641..aec624ec 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -6,7 +6,7 @@ use std::{ use axum::extract::State; use futures::{ FutureExt, StreamExt, TryFutureExt, - future::{OptionFuture, join, join3, join4, join5}, + future::{join, join3, join4, join5}, pin_mut, }; use ruma::{ @@ -52,6 +52,7 @@ use tuwunel_core::{ self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, future::{OptionStream, ReadyBoolExt}, math::ruma_from_u64, + option::OptionExt, result::MapExpect, stream::{BroadbandExt, Tools, TryExpect, WidebandExt}, string::to_small_string, @@ -130,19 +131,18 @@ pub(crate) async fn sync_events_route( let sender_user = body.sender_user(); let sender_device = body.sender_device.as_deref(); - let filter: OptionFuture<_> = body + let filter = body .body .filter .as_ref() - .map(async |filter| match filter { + .map_async(async |filter| match filter { | Filter::FilterDefinition(filter) => filter.clone(), | Filter::FilterId(filter_id) => services .users .get_filter(sender_user, filter_id) .await .unwrap_or_default(), - }) - .into(); + }); let filter = filter.map(Option::unwrap_or_default); let full_state = body.body.full_state; @@ -243,16 +243,13 @@ async fn build_empty_response( sender_device: Option<&DeviceId>, next_batch: u64, ) -> sync_events::v3::Response { - let device_one_time_keys_count: OptionFuture<_> = sender_device - .map(|sender_device| { - services - .users - .count_one_time_keys(sender_user, sender_device) - }) - .into(); - sync_events::v3::Response { - device_one_time_keys_count: device_one_time_keys_count + device_one_time_keys_count: sender_device + .map_async(|sender_device| { + services + .users + .count_one_time_keys(sender_user, sender_device) + }) .await .unwrap_or_default(), @@ -380,11 +377,12 @@ async fn build_sync_events( knocked_rooms }); - let presence_updates: OptionFuture<_> = services + let presence_updates = services .config .allow_local_presence - .then(|| process_presence_updates(services, since, next_batch, sender_user, filter)) - .into(); + .then_async(|| { + process_presence_updates(services, since, next_batch, sender_user, filter) + }); let account_data = services .account_data @@ -399,32 +397,26 @@ async fn build_sync_events( .map(ToOwned::to_owned) .collect::>(); - let to_device_events: OptionFuture<_> = sender_device - .map(|sender_device| { - services - .users - .get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch)) - .map(at!(1)) - .collect::>() - }) - .into(); + let to_device_events = sender_device.map_async(|sender_device| { + services + .users + .get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch)) + .map(at!(1)) + .collect::>() + }); - let device_one_time_keys_count: OptionFuture<_> = sender_device - .map(|sender_device| { - services - .users - .count_one_time_keys(sender_user, sender_device) - }) - .into(); + let device_one_time_keys_count = sender_device.map_async(|sender_device| { + services + .users + .count_one_time_keys(sender_user, sender_device) + }); // Remove all to-device events the device received *last time* - let remove_to_device_events: OptionFuture<_> = sender_device - .map(|sender_device| { - services - .users - .remove_to_device_events(sender_user, sender_device, since) - }) - .into(); + let remove_to_device_events = sender_device.map_async(|sender_device| { + services + .users + .remove_to_device_events(sender_user, sender_device, since) + }); let ( account_data, @@ -643,17 +635,16 @@ async fn load_left_room( .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) .ok(); - let horizon_shortstatehash: OptionFuture<_> = timeline_pdus + let horizon_shortstatehash = timeline_pdus .first() .map(at!(0)) - .map(|count| { + .map_async(|count| { services .timeline .get_shortstatehash(room_id, count) .inspect_err(inspect_debug_log) .ok() - }) - .into(); + }); let left_shortstatehash = services .timeline @@ -790,40 +781,34 @@ async fn load_joined_room( "if timeline events, last_timeline_count must be in the since window." ); - let since_shortstatehash: OptionFuture<_> = timeline_changed - .then(|| { - services - .timeline - .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) - .ok() - }) - .into(); + let since_shortstatehash = timeline_changed.then_async(|| { + services + .timeline + .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) + .ok() + }); - let horizon_shortstatehash: OptionFuture<_> = timeline_pdus + let horizon_shortstatehash = timeline_pdus .first() .map(at!(0)) - .map(|count| { + .map_async(|count| { services .timeline .get_shortstatehash(room_id, count) .inspect_err(inspect_debug_log) - }) - .into(); + }); - let current_shortstatehash: OptionFuture<_> = timeline_changed - .then(|| { - services - .timeline - .get_shortstatehash(room_id, last_timeline_count) - .inspect_err(inspect_debug_log) - .or_else(|_| services.state.get_room_shortstatehash(room_id)) - .map_err(|_| err!(Database(error!("Room {room_id} has no state")))) - }) - .into(); + let current_shortstatehash = timeline_changed.then_async(|| { + services + .timeline + .get_shortstatehash(room_id, last_timeline_count) + .inspect_err(inspect_debug_log) + .or_else(|_| services.state.get_room_shortstatehash(room_id)) + .map_err(|_| err!(Database(error!("Room {room_id} has no state")))) + }); - let encrypted_room: OptionFuture<_> = timeline_changed - .then(|| services.state_accessor.is_encrypted_room(room_id)) - .into(); + let encrypted_room = + timeline_changed.then_async(|| services.state_accessor.is_encrypted_room(room_id)); let receipt_events = services .read_receipt @@ -873,53 +858,43 @@ async fn load_joined_room( }; // Reset lazy loading because this is an initial sync - let lazy_load_reset: OptionFuture<_> = initial - .then(|| services.lazy_loading.reset(lazy_loading_context)) - .into(); + let lazy_load_reset = + initial.then_async(|| services.lazy_loading.reset(lazy_loading_context)); lazy_load_reset.await; - let witness: OptionFuture<_> = lazy_loading_enabled - .then(|| { - let witness: Witness = timeline_pdus - .iter() - .map(ref_at!(1)) - .map(Event::sender) - .map(Into::into) - .chain(receipt_events.keys().map(Into::into)) - .collect(); + let witness = lazy_loading_enabled.then_async(|| { + let witness: Witness = timeline_pdus + .iter() + .map(ref_at!(1)) + .map(Event::sender) + .map(Into::into) + .chain(receipt_events.keys().map(Into::into)) + .collect(); - services - .lazy_loading - .witness_retain(witness, lazy_loading_context) - }) - .into(); + services + .lazy_loading + .witness_retain(witness, lazy_loading_context) + }); - let sender_joined_count: OptionFuture<_> = timeline_changed - .then(|| { - services - .state_cache - .get_joined_count(room_id, sender_user) - .unwrap_or(0) - }) - .into(); + let sender_joined_count = timeline_changed.then_async(|| { + services + .state_cache + .get_joined_count(room_id, sender_user) + .unwrap_or(0) + }); - let since_encryption: OptionFuture<_> = since_shortstatehash - .map(|shortstatehash| { - services - .state_accessor - .state_get(shortstatehash, &StateEventType::RoomEncryption, "") - }) - .into(); + let since_encryption = since_shortstatehash.map_async(|shortstatehash| { + services + .state_accessor + .state_get(shortstatehash, &StateEventType::RoomEncryption, "") + }); - let last_notification_read: OptionFuture<_> = timeline_pdus - .is_empty() - .then(|| { - services - .pusher - .last_notification_read(sender_user, room_id) - .ok() - }) - .into(); + let last_notification_read = timeline_pdus.is_empty().then_async(|| { + services + .pusher + .last_notification_read(sender_user, room_id) + .ok() + }); let last_privateread_update = services .read_receipt @@ -941,21 +916,19 @@ async fn load_joined_room( let joined_since_last_sync = sender_joined_count.unwrap_or(0) > since; - let state_changes: OptionFuture<_> = current_shortstatehash - .map(|current_shortstatehash| { - calculate_state_changes( - services, - sender_user, - room_id, - full_state || initial, - since_shortstatehash, - horizon_shortstatehash, - current_shortstatehash, - joined_since_last_sync, - witness.as_ref(), - ) - }) - .into(); + let state_changes = current_shortstatehash.map_async(|current_shortstatehash| { + calculate_state_changes( + services, + sender_user, + room_id, + full_state || initial, + since_shortstatehash, + horizon_shortstatehash, + current_shortstatehash, + joined_since_last_sync, + witness.as_ref(), + ) + }); let StateChanges { heroes, @@ -1002,35 +975,28 @@ async fn load_joined_room( let send_notification_count_filter = |count: &UInt| *count != uint!(0) || send_notification_resets; - let notification_count: OptionFuture<_> = send_notification_counts - .then(|| { - services - .pusher - .notification_count(sender_user, room_id) - .map(TryInto::try_into) - .unwrap_or(uint!(0)) - }) - .into(); + let notification_count = send_notification_counts.then_async(|| { + services + .pusher + .notification_count(sender_user, room_id) + .map(TryInto::try_into) + .unwrap_or(uint!(0)) + }); - let highlight_count: OptionFuture<_> = send_notification_counts - .then(|| { - services - .pusher - .highlight_count(sender_user, room_id) - .map(TryInto::try_into) - .unwrap_or(uint!(0)) - }) - .into(); + let highlight_count = send_notification_counts.then_async(|| { + services + .pusher + .highlight_count(sender_user, room_id) + .map(TryInto::try_into) + .unwrap_or(uint!(0)) + }); - let private_read_event: OptionFuture<_> = last_privateread_update - .gt(&since) - .then(|| { - services - .read_receipt - .private_read_get(room_id, sender_user) - .map(Result::ok) - }) - .into(); + let private_read_event = last_privateread_update.gt(&since).then_async(|| { + services + .read_receipt + .private_read_get(room_id, sender_user) + .map(Result::ok) + }); let typing_events = services .typing @@ -1226,37 +1192,31 @@ async fn calculate_state_changes<'a>( .ok() }; - let lazy_state_ids: OptionFuture<_> = witness - .map(|witness| { - witness - .iter() - .stream() - .ready_filter(|&user_id| user_id != sender_user) - .broad_filter_map(|user_id| state_get_shorteventid(user_id)) - .into_future() - }) - .into(); + let lazy_state_ids = witness.map_async(|witness| { + witness + .iter() + .stream() + .ready_filter(|&user_id| user_id != sender_user) + .broad_filter_map(|user_id| state_get_shorteventid(user_id)) + .into_future() + }); - let state_diff_ids: OptionFuture<_> = incremental - .then(|| { - services - .state_accessor - .state_added((since_shortstatehash, horizon_shortstatehash)) - .boxed() - .into_future() - }) - .into(); + let state_diff_ids = incremental.then_async(|| { + services + .state_accessor + .state_added((since_shortstatehash, horizon_shortstatehash)) + .boxed() + .into_future() + }); - let current_state_ids: OptionFuture<_> = (!incremental) - .then(|| { - services - .state_accessor - .state_full_shortids(horizon_shortstatehash) - .expect_ok() - .boxed() - .into_future() - }) - .into(); + let current_state_ids = (!incremental).then_async(|| { + services + .state_accessor + .state_full_shortids(horizon_shortstatehash) + .expect_ok() + .boxed() + .into_future() + }); let state_events = current_state_ids .stream() @@ -1278,9 +1238,8 @@ async fn calculate_state_changes<'a>( .iter() .any(|event| *event.kind() == RoomMember); - let member_counts: OptionFuture<_> = send_member_counts - .then(|| calculate_counts(services, room_id, sender_user)) - .into(); + let member_counts = + send_member_counts.then_async(|| calculate_counts(services, room_id, sender_user)); let (joined_member_count, invited_member_count, heroes) = member_counts.await.unwrap_or((None, None, None)); @@ -1334,12 +1293,11 @@ async fn calculate_counts( let small_room = joined_member_count.saturating_add(invited_member_count) <= 5; - let heroes: OptionFuture<_> = services + let heroes = services .config .calculate_heroes .and_is(small_room) - .then(|| calculate_heroes(services, room_id, sender_user)) - .into(); + .then_async(|| calculate_heroes(services, room_id, sender_user)); (Some(joined_member_count), Some(invited_member_count), heroes.await) } diff --git a/src/api/client/sync/v5/extensions.rs b/src/api/client/sync/v5/extensions.rs index f3099d20..42f15de2 100644 --- a/src/api/client/sync/v5/extensions.rs +++ b/src/api/client/sync/v5/extensions.rs @@ -6,10 +6,7 @@ mod typing; use std::fmt::Debug; -use futures::{ - FutureExt, - future::{OptionFuture, join5}, -}; +use futures::{FutureExt, future::join5}; use ruma::{ RoomId, api::client::sync::sync_events::v5::{ListId, request::ExtensionRoomConfig, response}, @@ -37,45 +34,40 @@ pub(super) async fn handle( ) -> Result { let SyncInfo { .. } = sync_info; - let account_data: OptionFuture<_> = conn + let account_data = conn .extensions .account_data .enabled .unwrap_or(false) - .then(|| account_data::collect(sync_info, conn, window)) - .into(); + .then_async(|| account_data::collect(sync_info, conn, window)); - let receipts: OptionFuture<_> = conn + let receipts = conn .extensions .receipts .enabled .unwrap_or(false) - .then(|| receipts::collect(sync_info, conn, window)) - .into(); + .then_async(|| receipts::collect(sync_info, conn, window)); - let typing: OptionFuture<_> = conn + let typing = conn .extensions .typing .enabled .unwrap_or(false) - .then(|| typing::collect(sync_info, conn, window)) - .into(); + .then_async(|| typing::collect(sync_info, conn, window)); - let to_device: OptionFuture<_> = conn + let to_device = conn .extensions .to_device .enabled .unwrap_or(false) - .then(|| to_device::collect(sync_info, conn)) - .into(); + .then_async(|| to_device::collect(sync_info, conn)); - let e2ee: OptionFuture<_> = conn + let e2ee = conn .extensions .e2ee .enabled .unwrap_or(false) - .then(|| e2ee::collect(sync_info, conn)) - .into(); + .then_async(|| e2ee::collect(sync_info, conn)); let (account_data, receipts, typing, to_device, e2ee) = join5(account_data, receipts, typing, to_device, e2ee) diff --git a/src/api/client/sync/v5/extensions/e2ee.rs b/src/api/client/sync/v5/extensions/e2ee.rs index 0d9bdfa9..02310bc4 100644 --- a/src/api/client/sync/v5/extensions/e2ee.rs +++ b/src/api/client/sync/v5/extensions/e2ee.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use futures::{ FutureExt, StreamExt, TryFutureExt, - future::{OptionFuture, join, join3}, + future::{join, join3}, stream::once, }; use ruma::{ @@ -71,15 +71,12 @@ pub(super) async fn collect( let device_one_time_keys_count = services .users .last_one_time_keys_update(sender_user) - .then(|since| -> OptionFuture<_> { - since - .gt(&conn.globalsince) - .then(|| { - services - .users - .count_one_time_keys(sender_user, sender_device) - }) - .into() + .then(|since| { + since.gt(&conn.globalsince).then_async(|| { + services + .users + .count_one_time_keys(sender_user, sender_device) + }) }) .map(Option::unwrap_or_default); @@ -164,9 +161,8 @@ async fn collect_room( let encrypted_since_last_sync = !since_encryption; let joined_since_last_sync = sender_joined_count.is_ok_and(|count| count > conn.globalsince); - let joined_members_burst: OptionFuture<_> = (joined_since_last_sync - || encrypted_since_last_sync) - .then(|| { + let joined_members_burst = + (joined_since_last_sync || encrypted_since_last_sync).then_async(|| { services .state_cache .room_members(room_id) @@ -175,8 +171,7 @@ async fn collect_room( .map(|user_id| (MembershipState::Join, user_id)) .boxed() .into_future() - }) - .into(); + }); services .state_accessor diff --git a/src/api/client/sync/v5/filter.rs b/src/api/client/sync/v5/filter.rs index fe52e0a8..139f6652 100644 --- a/src/api/client/sync/v5/filter.rs +++ b/src/api/client/sync/v5/filter.rs @@ -1,4 +1,4 @@ -use futures::{StreamExt, future::OptionFuture, pin_mut}; +use futures::{StreamExt, pin_mut}; use ruma::{ RoomId, api::client::sync::sync_events::v5::request::ListFilters, directory::RoomTypeFilter, events::room::member::MembershipState, @@ -7,7 +7,8 @@ use tuwunel_core::{ is_equal_to, is_true, utils::{ BoolExt, FutureBoolExt, IterStream, ReadyExt, - future::{self, OptionExt, ReadyBoolExt}, + future::{self, OptionFutureExt, ReadyBoolExt}, + option::OptionExt, }, }; @@ -20,56 +21,49 @@ pub(super) async fn filter_room( room_id: &RoomId, membership: Option<&MembershipState>, ) -> bool { - let match_invite: OptionFuture<_> = filter - .is_invite - .map(async |is_invite| match (membership, is_invite) { - | (Some(MembershipState::Invite), true) => true, - | (Some(MembershipState::Invite), false) => false, - | (Some(_), true) => false, - | (Some(_), false) => true, - | _ => - services - .state_cache - .is_invited(sender_user, room_id) - .await == is_invite, - }) - .into(); + let match_invite = + filter + .is_invite + .map_async(async |is_invite| match (membership, is_invite) { + | (Some(MembershipState::Invite), true) => true, + | (Some(MembershipState::Invite), false) => false, + | (Some(_), true) => false, + | (Some(_), false) => true, + | _ => + services + .state_cache + .is_invited(sender_user, room_id) + .await == is_invite, + }); - let match_direct: OptionFuture<_> = filter - .is_dm - .map(async |is_dm| { - services - .account_data - .is_direct(sender_user, room_id) - .await == is_dm - }) - .into(); + let match_direct = filter.is_dm.map_async(async |is_dm| { + services + .account_data + .is_direct(sender_user, room_id) + .await == is_dm + }); - let match_direct_member: OptionFuture<_> = filter - .is_dm - .map(async |is_dm| { - services - .state_accessor - .is_direct(room_id, sender_user) - .await == is_dm - }) - .into(); + let match_direct_member = filter.is_dm.map_async(async |is_dm| { + services + .state_accessor + .is_direct(room_id, sender_user) + .await == is_dm + }); - let match_encrypted: OptionFuture<_> = filter + let match_encrypted = filter .is_encrypted - .map(async |is_encrypted| { + .map_async(async |is_encrypted| { services .state_accessor .is_encrypted_room(room_id) .await == is_encrypted - }) - .into(); + }); - let match_space_child: OptionFuture<_> = filter + let match_space_child = filter .spaces .is_empty() .is_false() - .then(async || { + .then_async(async || { filter .spaces .iter() @@ -77,43 +71,38 @@ pub(super) async fn filter_room( .flat_map(|room_id| services.spaces.get_space_children(room_id)) .ready_any(is_equal_to!(room_id)) .await - }) - .into(); + }); let fetch_tags = !filter.tags.is_empty() || !filter.not_tags.is_empty(); - let match_room_tag: OptionFuture<_> = fetch_tags - .then(async || { - if let Some(tags) = services - .account_data - .get_room_tags(sender_user, room_id) - .await - .ok() - .filter(|tags| !tags.is_empty()) - { - tags.keys().any(|tag| { - (filter.not_tags.is_empty() || !filter.not_tags.contains(tag)) - || (!filter.tags.is_empty() && filter.tags.contains(tag)) - }) - } else { - filter.tags.is_empty() - } - }) - .into(); + let match_room_tag = fetch_tags.then_async(async || { + if let Some(tags) = services + .account_data + .get_room_tags(sender_user, room_id) + .await + .ok() + .filter(|tags| !tags.is_empty()) + { + tags.keys().any(|tag| { + (filter.not_tags.is_empty() || !filter.not_tags.contains(tag)) + || (!filter.tags.is_empty() && filter.tags.contains(tag)) + }) + } else { + filter.tags.is_empty() + } + }); let fetch_room_type = !filter.room_types.is_empty() || !filter.not_room_types.is_empty(); - let match_room_type: OptionFuture<_> = fetch_room_type - .then(async || { - let room_type = services - .state_accessor - .get_room_type(room_id) - .await - .ok(); + let match_room_type = fetch_room_type.then_async(async || { + let room_type = services + .state_accessor + .get_room_type(room_id) + .await + .ok(); - let room_type = RoomTypeFilter::from(room_type); - (filter.not_room_types.is_empty() || !filter.not_room_types.contains(&room_type)) - && (filter.room_types.is_empty() || filter.room_types.contains(&room_type)) - }) - .into(); + let room_type = RoomTypeFilter::from(room_type); + (filter.not_room_types.is_empty() || !filter.not_room_types.contains(&room_type)) + && (filter.room_types.is_empty() || filter.room_types.contains(&room_type)) + }); future::and7( match_invite.is_none_or(is_true!()), diff --git a/src/api/client/sync/v5/rooms.rs b/src/api/client/sync/v5/rooms.rs index f9d9e41c..6369a4ca 100644 --- a/src/api/client/sync/v5/rooms.rs +++ b/src/api/client/sync/v5/rooms.rs @@ -5,7 +5,7 @@ use std::{ use futures::{ FutureExt, StreamExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, join, join3, join4}, + future::{join, join3, join4}, }; use ruma::{ JsOption, MxcUri, OwnedMxcUri, OwnedRoomId, RoomId, UserId, @@ -131,19 +131,16 @@ async fn handle_room( (timeline_limit, required_state) }); - let timeline: OptionFuture<_> = is_invite - .is_false() - .then(|| { - load_timeline( - services, - sender_user, - room_id, - PduCount::Normal(roomsince), - Some(PduCount::from(conn.next_batch)), - timeline_limit, - ) - }) - .into(); + let timeline = is_invite.is_false().then_async(|| { + load_timeline( + services, + sender_user, + room_id, + PduCount::Normal(roomsince), + Some(PduCount::from(conn.next_batch)), + timeline_limit, + ) + }); let (timeline_pdus, limited, _lastcount) = timeline .await @@ -182,18 +179,17 @@ async fn handle_room( .map(TryInto::try_into) .flat_ok(); - let num_live: OptionFuture<_> = roomsince + let num_live = roomsince .ne(&0) .and_is(limited || timeline_pdus.len() >= timeline_limit) - .then(|| { + .then_async(|| { services .timeline .pdus(None, room_id, Some(roomsince.into())) .count() .map(TryInto::try_into) .map(Result::ok) - }) - .into(); + }); let lazy = required_state .iter() @@ -249,14 +245,12 @@ async fn handle_room( .collect(); // TODO: figure out a timestamp we can use for remote invites - let invite_state: OptionFuture<_> = is_invite - .then(|| { - services - .state_cache - .invite_state(sender_user, room_id) - .ok() - }) - .into(); + let invite_state = is_invite.then_async(|| { + services + .state_cache + .invite_state(sender_user, room_id) + .ok() + }); let room_name = services .state_accessor @@ -327,10 +321,10 @@ async fn handle_room( .boxed() .await; - let heroes: OptionFuture<_> = services + let heroes = services .config .calculate_heroes - .then(|| { + .then_async(|| { calculate_heroes( services, sender_user, @@ -339,9 +333,10 @@ async fn handle_room( room_avatar.as_deref(), ) }) - .into(); + .await + .unwrap_or_default(); - let (heroes, heroes_name, heroes_avatar) = heroes.await.unwrap_or_default(); + let (heroes, heroes_name, heroes_avatar) = heroes; Ok(response::Room { initial: roomsince.eq(&0).then_some(true), @@ -388,17 +383,15 @@ async fn calculate_heroes( .await .ok()?; - let name: OptionFuture<_> = content + let name = content .displayname .is_none() - .then(|| services.users.displayname(&user_id).ok()) - .into(); + .then_async(|| services.users.displayname(&user_id).ok()); - let avatar: OptionFuture<_> = content + let avatar = content .avatar_url .is_none() - .then(|| services.users.avatar_url(&user_id).ok()) - .into(); + .then_async(|| services.users.avatar_url(&user_id).ok()); let (name, avatar) = join(name, avatar).await; let hero = response::Hero { diff --git a/src/api/client/sync/v5/selector.rs b/src/api/client/sync/v5/selector.rs index 3922e125..b8a7184c 100644 --- a/src/api/client/sync/v5/selector.rs +++ b/src/api/client/sync/v5/selector.rs @@ -1,9 +1,6 @@ use std::cmp::Ordering; -use futures::{ - FutureExt, StreamExt, TryFutureExt, - future::{OptionFuture, join5}, -}; +use futures::{FutureExt, StreamExt, TryFutureExt, future::join5}; use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint}; use tuwunel_core::{ apply, is_true, @@ -12,6 +9,7 @@ use tuwunel_core::{ utils::{ BoolExt, TryFutureExtExt, math::usize_from_ruma, + option::OptionExt, stream::{BroadbandExt, IterStream}, }, }; @@ -80,15 +78,11 @@ async fn matcher( .iter() .stream() .filter_map(async |(id, list)| { - let filter: OptionFuture<_> = list - .filters + list.filters .clone() - .map(async |filters| { + .map_async(async |filters| { filter_room(sync_info, &filters, &room_id, membership.as_ref()).await }) - .into(); - - filter .await .is_none_or(is_true!()) .then(|| id.clone()) @@ -97,50 +91,40 @@ async fn matcher( .map(|lists| (lists.is_empty().is_false(), lists)) .await; - let last_notification: OptionFuture<_> = matched - .then(|| { - services - .pusher - .last_notification_read(sender_user, &room_id) - .unwrap_or_default() - }) - .into(); + let last_notification = matched.then_async(|| { + services + .pusher + .last_notification_read(sender_user, &room_id) + .unwrap_or_default() + }); - let last_privateread: OptionFuture<_> = matched - .then(|| { - services - .read_receipt - .last_privateread_update(sender_user, &room_id) - }) - .into(); + let last_privateread = matched.then_async(|| { + services + .read_receipt + .last_privateread_update(sender_user, &room_id) + }); - let last_receipt: OptionFuture<_> = matched - .then(|| { - services - .read_receipt - .last_receipt_count(&room_id, sender_user.into(), None) - .unwrap_or_default() - }) - .into(); + let last_receipt = matched.then_async(|| { + services + .read_receipt + .last_receipt_count(&room_id, sender_user.into(), None) + .unwrap_or_default() + }); - let last_account: OptionFuture<_> = matched - .then(|| { - services - .account_data - .last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch)) - .unwrap_or_default() - }) - .into(); + let last_account = matched.then_async(|| { + services + .account_data + .last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch)) + .unwrap_or_default() + }); - let last_timeline: OptionFuture<_> = matched - .then(|| { - services - .timeline - .last_timeline_count(None, &room_id, Some(conn.next_batch.into())) - .map_ok(PduCount::into_unsigned) - .unwrap_or_default() - }) - .into(); + let last_timeline = matched.then_async(|| { + services + .timeline + .last_timeline_count(None, &room_id, Some(conn.next_batch.into())) + .map_ok(PduCount::into_unsigned) + .unwrap_or_default() + }); let (last_timeline, last_notification, last_account, last_receipt, last_privateread) = join5(last_timeline, last_notification, last_account, last_receipt, last_privateread) diff --git a/src/api/client/user_directory.rs b/src/api/client/user_directory.rs index c7839a1f..4bd58e6b 100644 --- a/src/api/client/user_directory.rs +++ b/src/api/client/user_directory.rs @@ -8,7 +8,7 @@ use ruma::{ use tuwunel_core::{ Result, utils::{ - future::BoolExt, + BoolExt, FutureBoolExt, stream::{BroadbandExt, ReadyExt}, }, }; @@ -52,11 +52,12 @@ pub(crate) async fn search_users_route( &search_term, ) .await - .then_some(search_users::v3::User { + .then_async(async || search_users::v3::User { user_id: user_id.clone(), display_name, avatar_url: services.users.avatar_url(&user_id).await.ok(), }) + .await }); pin_mut!(users); diff --git a/src/api/server/utils.rs b/src/api/server/utils.rs index 7cbc6fd8..9b33f978 100644 --- a/src/api/server/utils.rs +++ b/src/api/server/utils.rs @@ -1,6 +1,6 @@ -use futures::{FutureExt, StreamExt, future::OptionFuture, join}; +use futures::{FutureExt, StreamExt, join}; use ruma::{EventId, RoomId, ServerName}; -use tuwunel_core::{Err, Result, implement, is_false}; +use tuwunel_core::{Err, Result, implement, is_false, utils::option::OptionExt}; use tuwunel_service::Services; pub(super) struct AccessCheck<'a> { @@ -36,14 +36,11 @@ pub(super) async fn check(&self) -> Result { .room_members_knocked(self.room_id) .count(); - let server_can_see: OptionFuture<_> = self - .event_id - .map(|event_id| { - self.services - .state_accessor - .server_can_see_event(self.origin, self.room_id, event_id) - }) - .into(); + let server_can_see = self.event_id.map_async(|event_id| { + self.services + .state_accessor + .server_can_see_event(self.origin, self.room_id, event_id) + }); let (world_readable, server_in_room, server_can_see, acl_check, user_is_knocking) = join!(world_readable, server_in_room, server_can_see, acl_check, user_is_knocking); diff --git a/src/core/config/mod.rs b/src/core/config/mod.rs index 2d04eecd..fd6bf55f 100644 --- a/src/core/config/mod.rs +++ b/src/core/config/mod.rs @@ -29,8 +29,7 @@ pub use self::{check::check, manager::Manager}; use crate::{ Result, err, error::Error, - utils, - utils::{string::EMPTY, sys}, + utils::{self, option::OptionExt, string::EMPTY, sys}, }; /// All the config options for tuwunel. @@ -2663,14 +2662,12 @@ impl IdentityProvider { return Ok(client_secret.clone()); } - futures::future::OptionFuture::from( - self.client_secret_file - .as_ref() - .map(tokio::fs::read_to_string), - ) - .await - .transpose()? - .ok_or_else(|| err!("No client secret or client secret file configured")) + self.client_secret_file + .as_ref() + .map_async(tokio::fs::read_to_string) + .await + .transpose()? + .ok_or_else(|| err!("No client secret or client secret file configured")) } } diff --git a/src/core/matrix/state_res/resolve.rs b/src/core/matrix/state_res/resolve.rs index 3b09873b..48ec0395 100644 --- a/src/core/matrix/state_res/resolve.rs +++ b/src/core/matrix/state_res/resolve.rs @@ -11,7 +11,7 @@ mod topological_sort; use std::collections::{BTreeMap, BTreeSet}; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::OptionFuture}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt}; use ruma::{OwnedEventId, events::StateEventType, room_version_rules::RoomVersionRules}; pub use self::topological_sort::topological_sort; @@ -26,7 +26,11 @@ use crate::{ Result, debug, matrix::{Event, TypeStateKey}, trace, - utils::stream::{BroadbandExt, IterStream}, + utils::{ + BoolExt, + option::OptionExt, + stream::{BroadbandExt, IterStream}, + }, }; /// ConflictMap of OwnedEventId specifically. @@ -102,10 +106,9 @@ where || backport_css; // Since `org.matrix.hydra.11`, fetch the conflicted state subgraph. - let conflicted_subgraph: OptionFuture<_> = consider_conflicted_subgraph + let conflicted_subgraph = consider_conflicted_subgraph .then(|| conflicted_states.clone().into_values().flatten()) - .map(async |ids| conflicted_subgraph_dfs(ids.stream(), fetch)) - .into(); + .map_async(async |ids| conflicted_subgraph_dfs(ids.stream(), fetch)); let conflicted_subgraph = conflicted_subgraph .await @@ -180,9 +183,8 @@ where // 3. Take all remaining events that weren’t picked in step 1 and order them by // the mainline ordering based on the power level in the partially resolved // state obtained in step 2. - let sorted_remaining_events: OptionFuture<_> = have_remaining_events - .then(move || mainline_sort(power_event.cloned(), remaining_events, fetch)) - .into(); + let sorted_remaining_events = have_remaining_events + .then_async(move || mainline_sort(power_event.cloned(), remaining_events, fetch)); let sorted_remaining_events = sorted_remaining_events .await diff --git a/src/core/utils/bool.rs b/src/core/utils/bool.rs index 66adfa4c..4943ef25 100644 --- a/src/core/utils/bool.rs +++ b/src/core/utils/bool.rs @@ -1,5 +1,7 @@ //! Trait BoolExt +use futures::future::OptionFuture; + /// Boolean extensions and chain.starters pub trait BoolExt { fn and(self, t: Option) -> Option; @@ -50,6 +52,8 @@ pub trait BoolExt { fn or_some(self, t: T) -> Option; + fn then_async O>(self, f: F) -> OptionFuture; + fn then_none(self) -> Option; fn then_ok_or(self, t: T, e: E) -> Result; @@ -126,6 +130,11 @@ impl BoolExt for bool { #[inline] fn or_some(self, t: T) -> Option { self.is_false().then_some(t) } + #[inline] + fn then_async O>(self, f: F) -> OptionFuture { + OptionFuture::<_>::from(self.then(f)) + } + #[inline] fn then_none(self) -> Option { Option::::None } diff --git a/src/core/utils/future/mod.rs b/src/core/utils/future/mod.rs index f0e1d178..ddd57305 100644 --- a/src/core/utils/future/mod.rs +++ b/src/core/utils/future/mod.rs @@ -9,7 +9,7 @@ mod try_ext_ext; pub use self::{ bool_ext::{BoolExt, and, and4, and5, and6, and7, or}, ext_ext::ExtExt, - option_ext::OptionExt, + option_ext::OptionFutureExt, option_stream::OptionStream, ready_bool_ext::ReadyBoolExt, ready_eq_ext::ReadyEqExt, diff --git a/src/core/utils/future/option_ext.rs b/src/core/utils/future/option_ext.rs index c7b65ef6..d20986ed 100644 --- a/src/core/utils/future/option_ext.rs +++ b/src/core/utils/future/option_ext.rs @@ -2,7 +2,7 @@ use futures::{Future, FutureExt, future::OptionFuture}; -pub trait OptionExt { +pub trait OptionFutureExt { fn is_none_or(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future + Send; fn is_some_and(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future + Send; @@ -16,7 +16,7 @@ pub trait OptionExt { T: Default; } -impl OptionExt for OptionFuture +impl OptionFutureExt for OptionFuture where Fut: Future + Send, T: Send, diff --git a/src/core/utils/mod.rs b/src/core/utils/mod.rs index ceaeb543..6098f995 100644 --- a/src/core/utils/mod.rs +++ b/src/core/utils/mod.rs @@ -9,6 +9,7 @@ pub mod hash; pub mod json; pub mod math; pub mod mutex_map; +pub mod option; pub mod rand; pub mod result; pub mod set; diff --git a/src/core/utils/option.rs b/src/core/utils/option.rs new file mode 100644 index 00000000..ce07841b --- /dev/null +++ b/src/core/utils/option.rs @@ -0,0 +1,11 @@ +use futures::future::OptionFuture; + +pub trait OptionExt { + fn map_async O>(self, f: F) -> OptionFuture; +} + +impl OptionExt for Option { + fn map_async O>(self, f: F) -> OptionFuture { + OptionFuture::<_>::from(self.map(f)) + } +} diff --git a/src/service/membership/join.rs b/src/service/membership/join.rs index f464c7c4..ffd8c231 100644 --- a/src/service/membership/join.rs +++ b/src/service/membership/join.rs @@ -8,7 +8,7 @@ use std::{ use futures::{ FutureExt, StreamExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, join3, join4}, + future::{join3, join4}, }; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, OwnedUserId, RoomId, @@ -31,7 +31,7 @@ use tuwunel_core::{ matrix::{event::gen_event_id_canonical_json, room_version}, pdu::{PduBuilder, format::from_incoming_federation}, state_res, trace, - utils::{self, IterStream, ReadyExt, future::TryExtExt, math::Expected, shuffle}, + utils::{self, BoolExt, IterStream, ReadyExt, future::TryExtExt, math::Expected, shuffle}, warn, }; @@ -544,25 +544,23 @@ pub async fn join_local( }) .await; - let join_authorized_via_users_server: OptionFuture<_> = is_joined_restricted_rooms - .then(async || { - self.services - .state_cache - .local_users_in_room(room_id) - .filter(|user| { - self.services.state_accessor.user_can_invite( - room_id, - user, - sender_user, - state_lock, - ) - }) - .map(ToOwned::to_owned) - .boxed() - .next() - .await - }) - .into(); + let join_authorized_via_users_server = is_joined_restricted_rooms.then_async(async || { + self.services + .state_cache + .local_users_in_room(room_id) + .filter(|user| { + self.services.state_accessor.user_can_invite( + room_id, + user, + sender_user, + state_lock, + ) + }) + .map(ToOwned::to_owned) + .boxed() + .next() + .await + }); let displayname = self.services.users.displayname(sender_user).ok(); diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index 49a7db30..c11e052c 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -4,19 +4,17 @@ mod presence; use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; -use futures::{ - Stream, StreamExt, TryFutureExt, - future::{OptionFuture, try_join}, - stream::FuturesUnordered, -}; +use futures::{Stream, StreamExt, TryFutureExt, future::try_join, stream::FuturesUnordered}; use loole::{Receiver, Sender}; use ruma::{ DeviceId, OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState, }; use tokio::{sync::RwLock, time::sleep}; use tuwunel_core::{ - Error, Result, checked, debug, debug_warn, error, result::LogErr, trace, - utils::future::OptionExt, + Error, Result, checked, debug, debug_warn, error, + result::LogErr, + trace, + utils::{future::OptionFutureExt, option::OptionExt}, }; use self::{data::Data, presence::Presence}; @@ -164,13 +162,11 @@ impl Service { return Ok(()); } - let update_device_seen: OptionFuture<_> = device_id - .map(|device_id| { - self.services - .users - .update_device_last_seen(user_id, device_id, None) - }) - .into(); + let update_device_seen = device_id.map_async(|device_id| { + self.services + .users + .update_device_last_seen(user_id, device_id, None) + }); let status_msg = match last_presence { | Ok((_, ref presence)) => presence.content.status_msg.clone(), diff --git a/src/service/pusher/append.rs b/src/service/pusher/append.rs index 12162189..530c829a 100644 --- a/src/service/pusher/append.rs +++ b/src/service/pusher/append.rs @@ -1,9 +1,6 @@ use std::{collections::HashSet, sync::Arc}; -use futures::{ - FutureExt, StreamExt, - future::{OptionFuture, join}, -}; +use futures::{FutureExt, StreamExt, future::join}; use ruma::{ RoomId, UserId, api::client::push::ProfileTag, @@ -111,13 +108,11 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result { .iter() .any(|action| matches!(action, Action::SetTweak(Tweak::Highlight(true)))); - let increment_notify: OptionFuture<_> = notify - .then(|| self.increment_notificationcount(pdu.room_id(), user)) - .into(); + let increment_notify = + notify.then_async(|| self.increment_notificationcount(pdu.room_id(), user)); - let increment_highlight: OptionFuture<_> = highlight - .then(|| self.increment_highlightcount(pdu.room_id(), user)) - .into(); + let increment_highlight = + highlight.then_async(|| self.increment_highlightcount(pdu.room_id(), user)); join(increment_notify, increment_highlight).await; diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index e249918a..804b3010 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -1,7 +1,4 @@ -use futures::{ - FutureExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, try_join5}, -}; +use futures::{FutureExt, TryFutureExt, TryStreamExt, future::try_join5}; use ruma::{CanonicalJsonObject, EventId, RoomId, ServerName, UserId, events::StateEventType}; use tuwunel_core::{ Err, Result, debug, @@ -9,7 +6,7 @@ use tuwunel_core::{ err, implement, matrix::{Event, room_version}, trace, - utils::stream::IterStream, + utils::{BoolExt, stream::IterStream}, warn, }; @@ -83,11 +80,10 @@ pub async fn handle_incoming_pdu<'a>( .try_into() .map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?; - let sender_acl_check: OptionFuture<_> = sender + let sender_acl_check = sender .server_name() .ne(origin) - .then(|| self.acl_check(sender.server_name(), room_id)) - .into(); + .then_async(|| self.acl_check(sender.server_name(), room_id)); // Fetch create event let create_event = diff --git a/src/service/rooms/event_handler/state_at_incoming.rs b/src/service/rooms/event_handler/state_at_incoming.rs index 9c854095..9d233a00 100644 --- a/src/service/rooms/event_handler/state_at_incoming.rs +++ b/src/service/rooms/event_handler/state_at_incoming.rs @@ -3,16 +3,16 @@ use std::{ iter::{Iterator, once}, }; -use futures::{ - FutureExt, StreamExt, TryFutureExt, TryStreamExt, - future::{OptionFuture, try_join}, -}; +use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join}; use ruma::{OwnedEventId, RoomId, RoomVersionId}; use tuwunel_core::{ Result, apply, err, implement, matrix::{Event, StateMap, state_res::AuthSet}, ref_at, trace, - utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, + utils::{ + option::OptionExt, + stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, + }, }; use crate::rooms::short::ShortStateHash; @@ -174,16 +174,15 @@ async fn state_at_incoming_fork( where Pdu: Event, { - let leaf: OptionFuture<_> = prev_event + let leaf = prev_event .state_key() - .map(async |state_key| { + .map_async(async |state_key| { self.services .short .get_or_create_shortstatekey(&prev_event.kind().to_cow_str().into(), state_key) .map(|shortstatekey| once((shortstatekey, prev_event.event_id().to_owned()))) .await - }) - .into(); + }); let leaf_state_after_event: Vec<_> = self .services diff --git a/src/service/rooms/state_cache/mod.rs b/src/service/rooms/state_cache/mod.rs index 03216c56..b63e678c 100644 --- a/src/service/rooms/state_cache/mod.rs +++ b/src/service/rooms/state_cache/mod.rs @@ -6,11 +6,7 @@ use std::{ sync::{Arc, RwLock}, }; -use futures::{ - Stream, StreamExt, - future::{OptionFuture, join5}, - pin_mut, -}; +use futures::{Stream, StreamExt, future::join5, pin_mut}; use ruma::{ OwnedRoomId, RoomId, ServerName, UserId, events::{AnyStrippedStateEvent, AnySyncStateEvent, room::member::MembershipState}, @@ -21,6 +17,7 @@ use tuwunel_core::{ result::LogErr, trace, utils::{ + BoolExt, future::OptionStream, stream::{BroadbandExt, ReadyExt, TryIgnore}, }, @@ -411,45 +408,41 @@ pub fn user_memberships<'a>( use MembershipState::*; use futures::stream::select; - let joined: OptionFuture<_> = mask + let joined = mask .is_none_or(|mask| mask.contains(&Join)) - .then(|| { + .then_async(|| { self.rooms_joined(user_id) .map(|room_id| (Join, room_id)) .boxed() .into_future() - }) - .into(); + }); - let invited: OptionFuture<_> = mask + let invited = mask .is_none_or(|mask| mask.contains(&Invite)) - .then(|| { + .then_async(|| { self.rooms_invited(user_id) .map(|room_id| (Invite, room_id)) .boxed() .into_future() - }) - .into(); + }); - let knocked: OptionFuture<_> = mask + let knocked = mask .is_none_or(|mask| mask.contains(&Knock)) - .then(|| { + .then_async(|| { self.rooms_knocked(user_id) .map(|room_id| (Knock, room_id)) .boxed() .into_future() - }) - .into(); + }); - let left: OptionFuture<_> = mask + let left = mask .is_none_or(|mask| mask.contains(&Leave)) - .then(|| { + .then_async(|| { self.rooms_left(user_id) .map(|room_id| (Leave, room_id)) .boxed() .into_future() - }) - .into(); + }); select( select(joined.stream(), left.stream()), diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index ac0a6cc0..8321dfe0 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -11,7 +11,7 @@ use std::{ use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use futures::{ FutureExt, StreamExt, TryFutureExt, - future::{BoxFuture, OptionFuture, join3, try_join3}, + future::{BoxFuture, join3, try_join3}, pin_mut, stream::FuturesUnordered, }; @@ -43,7 +43,7 @@ use tuwunel_core::{ result::LogErr, trace, utils::{ - ReadyExt, calculate_hash, continue_exponential_backoff_secs, + BoolExt, ReadyExt, calculate_hash, continue_exponential_backoff_secs, future::TryExtExt, stream::{BroadbandExt, IterStream, WidebandExt}, }, @@ -388,19 +388,17 @@ impl Service { let device_changes = self.select_edus_device_changes(server_name, batch, &max_edu_count, &events_len); - let receipts: OptionFuture<_> = self + let receipts = self .server .config .allow_outgoing_read_receipts - .then(|| self.select_edus_receipts(server_name, batch, &max_edu_count)) - .into(); + .then_async(|| self.select_edus_receipts(server_name, batch, &max_edu_count)); - let presence: OptionFuture<_> = self + let presence = self .server .config .allow_outgoing_presence - .then(|| self.select_edus_presence(server_name, batch, &max_edu_count)) - .into(); + .then_async(|| self.select_edus_presence(server_name, batch, &max_edu_count)); let (device_changes, receipts, presence) = join3(device_changes, receipts, presence).await;