Introduce OptionFuture helpers

Optimize user directory searches
This commit is contained in:
dasha_uwu
2026-01-17 05:38:09 +05:00
committed by Jason Volk
parent 95121ad905
commit e78bf21085
28 changed files with 454 additions and 567 deletions

View File

@@ -1,7 +1,7 @@
use axum::extract::State; use axum::extract::State;
use futures::{ use futures::{
FutureExt, StreamExt, TryFutureExt, TryStreamExt, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, join, join3, try_join3}, future::{join, join3, try_join3},
}; };
use ruma::{OwnedEventId, UserId, api::client::context::get_context, events::StateEventType}; use ruma::{OwnedEventId, UserId, api::client::context::get_context, events::StateEventType};
use tuwunel_core::{ use tuwunel_core::{
@@ -9,6 +9,7 @@ use tuwunel_core::{
utils::{ utils::{
IterStream, IterStream,
future::TryExtExt, future::TryExtExt,
option::OptionExt,
stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt}, stream::{BroadbandExt, ReadyExt, TryIgnore, WidebandExt},
}, },
}; };
@@ -116,7 +117,7 @@ pub(crate) async fn get_context_route(
options: Some(&filter.lazy_load_options), options: Some(&filter.lazy_load_options),
}; };
let lazy_loading_witnessed: OptionFuture<_> = filter let lazy_loading_witnessed = filter
.lazy_load_options .lazy_load_options
.is_enabled() .is_enabled()
.then_some( .then_some(
@@ -125,8 +126,7 @@ pub(crate) async fn get_context_route(
.chain(events_before.iter()) .chain(events_before.iter())
.chain(events_after.iter()), .chain(events_after.iter()),
) )
.map(|witnessed| lazy_loading_witness(&services, &lazy_loading_context, witnessed)) .map_async(|witnessed| lazy_loading_witness(&services, &lazy_loading_context, witnessed));
.into();
let state_at = events_after let state_at = events_after
.last() .last()

View File

@@ -1,9 +1,5 @@
use axum::extract::State; use axum::extract::State;
use futures::{ use futures::{FutureExt, StreamExt, TryFutureExt, future::Either, pin_mut};
FutureExt, StreamExt, TryFutureExt,
future::{Either, OptionFuture},
pin_mut,
};
use ruma::{ use ruma::{
RoomId, UserId, RoomId, UserId,
api::{ api::{
@@ -21,7 +17,7 @@ use tuwunel_core::{
}, },
ref_at, ref_at,
utils::{ utils::{
IterStream, ReadyExt, BoolExt, IterStream, ReadyExt,
result::{FlatOk, LogErr}, result::{FlatOk, LogErr},
stream::{BroadbandExt, TryIgnore, WidebandExt}, stream::{BroadbandExt, TryIgnore, WidebandExt},
}, },
@@ -140,11 +136,10 @@ pub(crate) async fn get_message_events_route(
options: Some(&filter.lazy_load_options), options: Some(&filter.lazy_load_options),
}; };
let witness: OptionFuture<_> = filter let witness = filter
.lazy_load_options .lazy_load_options
.is_enabled() .is_enabled()
.then(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter())) .then_async(|| lazy_loading_witness(&services, &lazy_loading_context, events.iter()));
.into();
let state = witness let state = witness
.map(Option::into_iter) .map(Option::into_iter)

View File

@@ -1,7 +1,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use axum::extract::State; use axum::extract::State;
use futures::{FutureExt, future::OptionFuture}; use futures::FutureExt;
use ruma::{ use ruma::{
CanonicalJsonObject, EventEncryptionAlgorithm, Int, OwnedRoomAliasId, OwnedRoomId, CanonicalJsonObject, EventEncryptionAlgorithm, Int, OwnedRoomAliasId, OwnedRoomId,
OwnedUserId, RoomId, RoomVersionId, OwnedUserId, RoomId, RoomVersionId,
@@ -32,7 +32,7 @@ use serde_json::{json, value::to_raw_value};
use tuwunel_core::{ use tuwunel_core::{
Err, Result, debug_info, debug_warn, err, info, Err, Result, debug_info, debug_warn, err, info,
matrix::{StateKey, pdu::PduBuilder, room_version}, matrix::{StateKey, pdu::PduBuilder, room_version},
utils::BoolExt, utils::{BoolExt, option::OptionExt},
warn, warn,
}; };
use tuwunel_service::{Services, appservice::RegistrationInfo, rooms::state::RoomMutexGuard}; use tuwunel_service::{Services, appservice::RegistrationInfo, rooms::state::RoomMutexGuard};
@@ -72,11 +72,10 @@ pub(crate) async fn create_room_route(
| _ => RoomPreset::PrivateChat, // Room visibility should not be custom | _ => RoomPreset::PrivateChat, // Room visibility should not be custom
}); });
let alias: OptionFuture<_> = body let alias = body
.room_alias_name .room_alias_name
.as_ref() .as_ref()
.map(|alias| room_alias_check(&services, alias, body.appservice_info.as_ref())) .map_async(|alias| room_alias_check(&services, alias, body.appservice_info.as_ref()));
.into();
// Determine room version // Determine room version
let (room_version, version_rules) = body let (room_version, version_rules) = body

View File

@@ -1,10 +1,6 @@
use axum::extract::State; use axum::extract::State;
use axum_client_ip::InsecureClientIp; use axum_client_ip::InsecureClientIp;
use futures::{ use futures::{FutureExt, StreamExt, TryFutureExt, future::join3, stream::FuturesUnordered};
FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join3},
stream::FuturesUnordered,
};
use ruma::{ use ruma::{
OwnedServerName, RoomId, UserId, OwnedServerName, RoomId, UserId,
api::{ api::{
@@ -16,7 +12,7 @@ use ruma::{
}; };
use tuwunel_core::{ use tuwunel_core::{
Err, Result, debug_warn, trace, Err, Result, debug_warn, trace,
utils::{IterStream, future::TryExtExt}, utils::{IterStream, future::TryExtExt, option::OptionExt},
}; };
use tuwunel_service::Services; use tuwunel_service::Services;
@@ -158,14 +154,12 @@ async fn local_room_summary_response(
.room_joined_count(room_id) .room_joined_count(room_id)
.unwrap_or(0); .unwrap_or(0);
let membership: OptionFuture<_> = sender_user let membership = sender_user.map_async(|sender_user| {
.map(|sender_user| { services
services .state_accessor
.state_accessor .get_member(room_id, sender_user)
.get_member(room_id, sender_user) .map_ok_or(MembershipState::Leave, |content| content.membership)
.map_ok_or(MembershipState::Leave, |content| content.membership) });
})
.into();
let ( let (
canonical_alias, canonical_alias,

View File

@@ -1,7 +1,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use axum::extract::State; use axum::extract::State;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::OptionFuture}; use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use ruma::{ use ruma::{
OwnedRoomId, RoomId, UInt, UserId, OwnedRoomId, RoomId, UInt, UserId,
api::client::search::search_events::{ api::client::search::search_events::{
@@ -16,7 +16,7 @@ use tuwunel_core::{
Err, Result, at, is_true, Err, Result, at, is_true,
matrix::Event, matrix::Event,
result::FlatOk, result::FlatOk,
utils::{IterStream, stream::ReadyExt}, utils::{IterStream, option::OptionExt, stream::ReadyExt},
}; };
use tuwunel_service::{Services, rooms::search::RoomQuery}; use tuwunel_service::{Services, rooms::search::RoomQuery};
@@ -41,18 +41,17 @@ pub(crate) async fn search_events_route(
) -> Result<Response> { ) -> Result<Response> {
let sender_user = body.sender_user(); let sender_user = body.sender_user();
let next_batch = body.next_batch.as_deref(); let next_batch = body.next_batch.as_deref();
let room_events_result: OptionFuture<_> = body let room_events = body
.search_categories .search_categories
.room_events .room_events
.as_ref() .as_ref()
.map(|criteria| category_room_events(&services, sender_user, next_batch, criteria)) .map_async(|criteria| category_room_events(&services, sender_user, next_batch, criteria))
.into(); .await
.transpose()?;
Ok(Response { Ok(Response {
search_categories: ResultCategories { search_categories: ResultCategories {
room_events: room_events_result room_events: room_events.unwrap_or_default(),
.await
.unwrap_or_else(|| Ok(ResultRoomEvents::default()))?,
}, },
}) })
} }

View File

@@ -1,13 +1,13 @@
use std::{collections::BTreeSet, iter::once, str::FromStr}; use std::{collections::BTreeSet, iter::once, str::FromStr};
use axum::extract::State; use axum::extract::State;
use futures::{FutureExt, StreamExt, TryFutureExt, future::OptionFuture, stream::FuturesOrdered}; use futures::{FutureExt, StreamExt, TryFutureExt, stream::FuturesOrdered};
use ruma::{ use ruma::{
OwnedRoomId, OwnedServerName, RoomId, UInt, UserId, api::client::space::get_hierarchy, OwnedRoomId, OwnedServerName, RoomId, UInt, UserId, api::client::space::get_hierarchy,
}; };
use tuwunel_core::{ use tuwunel_core::{
Err, Result, debug_error, Err, Result, debug_error,
utils::{future::TryExtExt, stream::IterStream}, utils::{future::TryExtExt, option::OptionExt, stream::IterStream},
}; };
use tuwunel_service::{ use tuwunel_service::{
Services, Services,
@@ -187,30 +187,26 @@ where
} }
} }
let next_batch: OptionFuture<_> = queue let next_batch = queue.next().await.map_async(async |(room, ..)| {
.next() parents.insert(room);
.await
.map(async |(room, ..)| {
parents.insert(room);
let next_short_room_ids: Vec<_> = parents let next_short_room_ids: Vec<_> = parents
.iter() .iter()
.stream() .stream()
.filter_map(|room_id| services.short.get_shortroomid(room_id).ok()) .filter_map(|room_id| services.short.get_shortroomid(room_id).ok())
.collect() .collect()
.await; .await;
(next_short_room_ids.iter().ne(short_room_ids) && !next_short_room_ids.is_empty()) (next_short_room_ids.iter().ne(short_room_ids) && !next_short_room_ids.is_empty())
.then_some(PaginationToken { .then_some(PaginationToken {
short_room_ids: next_short_room_ids, short_room_ids: next_short_room_ids,
limit: limit.try_into().ok()?, limit: limit.try_into().ok()?,
max_depth: max_depth.try_into().ok()?, max_depth: max_depth.try_into().ok()?,
suggested_only, suggested_only,
}) })
.as_ref() .as_ref()
.map(PaginationToken::to_string) .map(PaginationToken::to_string)
}) });
.into();
Ok(get_hierarchy::v1::Response { Ok(get_hierarchy::v1::Response {
next_batch: next_batch.await.flatten(), next_batch: next_batch.await.flatten(),

View File

@@ -6,7 +6,7 @@ use std::{
use axum::extract::State; use axum::extract::State;
use futures::{ use futures::{
FutureExt, StreamExt, TryFutureExt, FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join, join3, join4, join5}, future::{join, join3, join4, join5},
pin_mut, pin_mut,
}; };
use ruma::{ use ruma::{
@@ -52,6 +52,7 @@ use tuwunel_core::{
self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt, self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
future::{OptionStream, ReadyBoolExt}, future::{OptionStream, ReadyBoolExt},
math::ruma_from_u64, math::ruma_from_u64,
option::OptionExt,
result::MapExpect, result::MapExpect,
stream::{BroadbandExt, Tools, TryExpect, WidebandExt}, stream::{BroadbandExt, Tools, TryExpect, WidebandExt},
string::to_small_string, string::to_small_string,
@@ -130,19 +131,18 @@ pub(crate) async fn sync_events_route(
let sender_user = body.sender_user(); let sender_user = body.sender_user();
let sender_device = body.sender_device.as_deref(); let sender_device = body.sender_device.as_deref();
let filter: OptionFuture<_> = body let filter = body
.body .body
.filter .filter
.as_ref() .as_ref()
.map(async |filter| match filter { .map_async(async |filter| match filter {
| Filter::FilterDefinition(filter) => filter.clone(), | Filter::FilterDefinition(filter) => filter.clone(),
| Filter::FilterId(filter_id) => services | Filter::FilterId(filter_id) => services
.users .users
.get_filter(sender_user, filter_id) .get_filter(sender_user, filter_id)
.await .await
.unwrap_or_default(), .unwrap_or_default(),
}) });
.into();
let filter = filter.map(Option::unwrap_or_default); let filter = filter.map(Option::unwrap_or_default);
let full_state = body.body.full_state; let full_state = body.body.full_state;
@@ -243,16 +243,13 @@ async fn build_empty_response(
sender_device: Option<&DeviceId>, sender_device: Option<&DeviceId>,
next_batch: u64, next_batch: u64,
) -> sync_events::v3::Response { ) -> sync_events::v3::Response {
let device_one_time_keys_count: OptionFuture<_> = sender_device
.map(|sender_device| {
services
.users
.count_one_time_keys(sender_user, sender_device)
})
.into();
sync_events::v3::Response { sync_events::v3::Response {
device_one_time_keys_count: device_one_time_keys_count device_one_time_keys_count: sender_device
.map_async(|sender_device| {
services
.users
.count_one_time_keys(sender_user, sender_device)
})
.await .await
.unwrap_or_default(), .unwrap_or_default(),
@@ -380,11 +377,12 @@ async fn build_sync_events(
knocked_rooms knocked_rooms
}); });
let presence_updates: OptionFuture<_> = services let presence_updates = services
.config .config
.allow_local_presence .allow_local_presence
.then(|| process_presence_updates(services, since, next_batch, sender_user, filter)) .then_async(|| {
.into(); process_presence_updates(services, since, next_batch, sender_user, filter)
});
let account_data = services let account_data = services
.account_data .account_data
@@ -399,32 +397,26 @@ async fn build_sync_events(
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.collect::<HashSet<_>>(); .collect::<HashSet<_>>();
let to_device_events: OptionFuture<_> = sender_device let to_device_events = sender_device.map_async(|sender_device| {
.map(|sender_device| { services
services .users
.users .get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch))
.get_to_device_events(sender_user, sender_device, Some(since), Some(next_batch)) .map(at!(1))
.map(at!(1)) .collect::<Vec<_>>()
.collect::<Vec<_>>() });
})
.into();
let device_one_time_keys_count: OptionFuture<_> = sender_device let device_one_time_keys_count = sender_device.map_async(|sender_device| {
.map(|sender_device| { services
services .users
.users .count_one_time_keys(sender_user, sender_device)
.count_one_time_keys(sender_user, sender_device) });
})
.into();
// Remove all to-device events the device received *last time* // Remove all to-device events the device received *last time*
let remove_to_device_events: OptionFuture<_> = sender_device let remove_to_device_events = sender_device.map_async(|sender_device| {
.map(|sender_device| { services
services .users
.users .remove_to_device_events(sender_user, sender_device, since)
.remove_to_device_events(sender_user, sender_device, since) });
})
.into();
let ( let (
account_data, account_data,
@@ -643,17 +635,16 @@ async fn load_left_room(
.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
.ok(); .ok();
let horizon_shortstatehash: OptionFuture<_> = timeline_pdus let horizon_shortstatehash = timeline_pdus
.first() .first()
.map(at!(0)) .map(at!(0))
.map(|count| { .map_async(|count| {
services services
.timeline .timeline
.get_shortstatehash(room_id, count) .get_shortstatehash(room_id, count)
.inspect_err(inspect_debug_log) .inspect_err(inspect_debug_log)
.ok() .ok()
}) });
.into();
let left_shortstatehash = services let left_shortstatehash = services
.timeline .timeline
@@ -790,40 +781,34 @@ async fn load_joined_room(
"if timeline events, last_timeline_count must be in the since window." "if timeline events, last_timeline_count must be in the since window."
); );
let since_shortstatehash: OptionFuture<_> = timeline_changed let since_shortstatehash = timeline_changed.then_async(|| {
.then(|| { services
services .timeline
.timeline .prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1)) .ok()
.ok() });
})
.into();
let horizon_shortstatehash: OptionFuture<_> = timeline_pdus let horizon_shortstatehash = timeline_pdus
.first() .first()
.map(at!(0)) .map(at!(0))
.map(|count| { .map_async(|count| {
services services
.timeline .timeline
.get_shortstatehash(room_id, count) .get_shortstatehash(room_id, count)
.inspect_err(inspect_debug_log) .inspect_err(inspect_debug_log)
}) });
.into();
let current_shortstatehash: OptionFuture<_> = timeline_changed let current_shortstatehash = timeline_changed.then_async(|| {
.then(|| { services
services .timeline
.timeline .get_shortstatehash(room_id, last_timeline_count)
.get_shortstatehash(room_id, last_timeline_count) .inspect_err(inspect_debug_log)
.inspect_err(inspect_debug_log) .or_else(|_| services.state.get_room_shortstatehash(room_id))
.or_else(|_| services.state.get_room_shortstatehash(room_id)) .map_err(|_| err!(Database(error!("Room {room_id} has no state"))))
.map_err(|_| err!(Database(error!("Room {room_id} has no state")))) });
})
.into();
let encrypted_room: OptionFuture<_> = timeline_changed let encrypted_room =
.then(|| services.state_accessor.is_encrypted_room(room_id)) timeline_changed.then_async(|| services.state_accessor.is_encrypted_room(room_id));
.into();
let receipt_events = services let receipt_events = services
.read_receipt .read_receipt
@@ -873,53 +858,43 @@ async fn load_joined_room(
}; };
// Reset lazy loading because this is an initial sync // Reset lazy loading because this is an initial sync
let lazy_load_reset: OptionFuture<_> = initial let lazy_load_reset =
.then(|| services.lazy_loading.reset(lazy_loading_context)) initial.then_async(|| services.lazy_loading.reset(lazy_loading_context));
.into();
lazy_load_reset.await; lazy_load_reset.await;
let witness: OptionFuture<_> = lazy_loading_enabled let witness = lazy_loading_enabled.then_async(|| {
.then(|| { let witness: Witness = timeline_pdus
let witness: Witness = timeline_pdus .iter()
.iter() .map(ref_at!(1))
.map(ref_at!(1)) .map(Event::sender)
.map(Event::sender) .map(Into::into)
.map(Into::into) .chain(receipt_events.keys().map(Into::into))
.chain(receipt_events.keys().map(Into::into)) .collect();
.collect();
services services
.lazy_loading .lazy_loading
.witness_retain(witness, lazy_loading_context) .witness_retain(witness, lazy_loading_context)
}) });
.into();
let sender_joined_count: OptionFuture<_> = timeline_changed let sender_joined_count = timeline_changed.then_async(|| {
.then(|| { services
services .state_cache
.state_cache .get_joined_count(room_id, sender_user)
.get_joined_count(room_id, sender_user) .unwrap_or(0)
.unwrap_or(0) });
})
.into();
let since_encryption: OptionFuture<_> = since_shortstatehash let since_encryption = since_shortstatehash.map_async(|shortstatehash| {
.map(|shortstatehash| { services
services .state_accessor
.state_accessor .state_get(shortstatehash, &StateEventType::RoomEncryption, "")
.state_get(shortstatehash, &StateEventType::RoomEncryption, "") });
})
.into();
let last_notification_read: OptionFuture<_> = timeline_pdus let last_notification_read = timeline_pdus.is_empty().then_async(|| {
.is_empty() services
.then(|| { .pusher
services .last_notification_read(sender_user, room_id)
.pusher .ok()
.last_notification_read(sender_user, room_id) });
.ok()
})
.into();
let last_privateread_update = services let last_privateread_update = services
.read_receipt .read_receipt
@@ -941,21 +916,19 @@ async fn load_joined_room(
let joined_since_last_sync = sender_joined_count.unwrap_or(0) > since; let joined_since_last_sync = sender_joined_count.unwrap_or(0) > since;
let state_changes: OptionFuture<_> = current_shortstatehash let state_changes = current_shortstatehash.map_async(|current_shortstatehash| {
.map(|current_shortstatehash| { calculate_state_changes(
calculate_state_changes( services,
services, sender_user,
sender_user, room_id,
room_id, full_state || initial,
full_state || initial, since_shortstatehash,
since_shortstatehash, horizon_shortstatehash,
horizon_shortstatehash, current_shortstatehash,
current_shortstatehash, joined_since_last_sync,
joined_since_last_sync, witness.as_ref(),
witness.as_ref(), )
) });
})
.into();
let StateChanges { let StateChanges {
heroes, heroes,
@@ -1002,35 +975,28 @@ async fn load_joined_room(
let send_notification_count_filter = let send_notification_count_filter =
|count: &UInt| *count != uint!(0) || send_notification_resets; |count: &UInt| *count != uint!(0) || send_notification_resets;
let notification_count: OptionFuture<_> = send_notification_counts let notification_count = send_notification_counts.then_async(|| {
.then(|| { services
services .pusher
.pusher .notification_count(sender_user, room_id)
.notification_count(sender_user, room_id) .map(TryInto::try_into)
.map(TryInto::try_into) .unwrap_or(uint!(0))
.unwrap_or(uint!(0)) });
})
.into();
let highlight_count: OptionFuture<_> = send_notification_counts let highlight_count = send_notification_counts.then_async(|| {
.then(|| { services
services .pusher
.pusher .highlight_count(sender_user, room_id)
.highlight_count(sender_user, room_id) .map(TryInto::try_into)
.map(TryInto::try_into) .unwrap_or(uint!(0))
.unwrap_or(uint!(0)) });
})
.into();
let private_read_event: OptionFuture<_> = last_privateread_update let private_read_event = last_privateread_update.gt(&since).then_async(|| {
.gt(&since) services
.then(|| { .read_receipt
services .private_read_get(room_id, sender_user)
.read_receipt .map(Result::ok)
.private_read_get(room_id, sender_user) });
.map(Result::ok)
})
.into();
let typing_events = services let typing_events = services
.typing .typing
@@ -1226,37 +1192,31 @@ async fn calculate_state_changes<'a>(
.ok() .ok()
}; };
let lazy_state_ids: OptionFuture<_> = witness let lazy_state_ids = witness.map_async(|witness| {
.map(|witness| { witness
witness .iter()
.iter() .stream()
.stream() .ready_filter(|&user_id| user_id != sender_user)
.ready_filter(|&user_id| user_id != sender_user) .broad_filter_map(|user_id| state_get_shorteventid(user_id))
.broad_filter_map(|user_id| state_get_shorteventid(user_id)) .into_future()
.into_future() });
})
.into();
let state_diff_ids: OptionFuture<_> = incremental let state_diff_ids = incremental.then_async(|| {
.then(|| { services
services .state_accessor
.state_accessor .state_added((since_shortstatehash, horizon_shortstatehash))
.state_added((since_shortstatehash, horizon_shortstatehash)) .boxed()
.boxed() .into_future()
.into_future() });
})
.into();
let current_state_ids: OptionFuture<_> = (!incremental) let current_state_ids = (!incremental).then_async(|| {
.then(|| { services
services .state_accessor
.state_accessor .state_full_shortids(horizon_shortstatehash)
.state_full_shortids(horizon_shortstatehash) .expect_ok()
.expect_ok() .boxed()
.boxed() .into_future()
.into_future() });
})
.into();
let state_events = current_state_ids let state_events = current_state_ids
.stream() .stream()
@@ -1278,9 +1238,8 @@ async fn calculate_state_changes<'a>(
.iter() .iter()
.any(|event| *event.kind() == RoomMember); .any(|event| *event.kind() == RoomMember);
let member_counts: OptionFuture<_> = send_member_counts let member_counts =
.then(|| calculate_counts(services, room_id, sender_user)) send_member_counts.then_async(|| calculate_counts(services, room_id, sender_user));
.into();
let (joined_member_count, invited_member_count, heroes) = let (joined_member_count, invited_member_count, heroes) =
member_counts.await.unwrap_or((None, None, None)); member_counts.await.unwrap_or((None, None, None));
@@ -1334,12 +1293,11 @@ async fn calculate_counts(
let small_room = joined_member_count.saturating_add(invited_member_count) <= 5; let small_room = joined_member_count.saturating_add(invited_member_count) <= 5;
let heroes: OptionFuture<_> = services let heroes = services
.config .config
.calculate_heroes .calculate_heroes
.and_is(small_room) .and_is(small_room)
.then(|| calculate_heroes(services, room_id, sender_user)) .then_async(|| calculate_heroes(services, room_id, sender_user));
.into();
(Some(joined_member_count), Some(invited_member_count), heroes.await) (Some(joined_member_count), Some(invited_member_count), heroes.await)
} }

View File

@@ -6,10 +6,7 @@ mod typing;
use std::fmt::Debug; use std::fmt::Debug;
use futures::{ use futures::{FutureExt, future::join5};
FutureExt,
future::{OptionFuture, join5},
};
use ruma::{ use ruma::{
RoomId, RoomId,
api::client::sync::sync_events::v5::{ListId, request::ExtensionRoomConfig, response}, api::client::sync::sync_events::v5::{ListId, request::ExtensionRoomConfig, response},
@@ -37,45 +34,40 @@ pub(super) async fn handle(
) -> Result<response::Extensions> { ) -> Result<response::Extensions> {
let SyncInfo { .. } = sync_info; let SyncInfo { .. } = sync_info;
let account_data: OptionFuture<_> = conn let account_data = conn
.extensions .extensions
.account_data .account_data
.enabled .enabled
.unwrap_or(false) .unwrap_or(false)
.then(|| account_data::collect(sync_info, conn, window)) .then_async(|| account_data::collect(sync_info, conn, window));
.into();
let receipts: OptionFuture<_> = conn let receipts = conn
.extensions .extensions
.receipts .receipts
.enabled .enabled
.unwrap_or(false) .unwrap_or(false)
.then(|| receipts::collect(sync_info, conn, window)) .then_async(|| receipts::collect(sync_info, conn, window));
.into();
let typing: OptionFuture<_> = conn let typing = conn
.extensions .extensions
.typing .typing
.enabled .enabled
.unwrap_or(false) .unwrap_or(false)
.then(|| typing::collect(sync_info, conn, window)) .then_async(|| typing::collect(sync_info, conn, window));
.into();
let to_device: OptionFuture<_> = conn let to_device = conn
.extensions .extensions
.to_device .to_device
.enabled .enabled
.unwrap_or(false) .unwrap_or(false)
.then(|| to_device::collect(sync_info, conn)) .then_async(|| to_device::collect(sync_info, conn));
.into();
let e2ee: OptionFuture<_> = conn let e2ee = conn
.extensions .extensions
.e2ee .e2ee
.enabled .enabled
.unwrap_or(false) .unwrap_or(false)
.then(|| e2ee::collect(sync_info, conn)) .then_async(|| e2ee::collect(sync_info, conn));
.into();
let (account_data, receipts, typing, to_device, e2ee) = let (account_data, receipts, typing, to_device, e2ee) =
join5(account_data, receipts, typing, to_device, e2ee) join5(account_data, receipts, typing, to_device, e2ee)

View File

@@ -2,7 +2,7 @@ use std::collections::HashSet;
use futures::{ use futures::{
FutureExt, StreamExt, TryFutureExt, FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join, join3}, future::{join, join3},
stream::once, stream::once,
}; };
use ruma::{ use ruma::{
@@ -71,15 +71,12 @@ pub(super) async fn collect(
let device_one_time_keys_count = services let device_one_time_keys_count = services
.users .users
.last_one_time_keys_update(sender_user) .last_one_time_keys_update(sender_user)
.then(|since| -> OptionFuture<_> { .then(|since| {
since since.gt(&conn.globalsince).then_async(|| {
.gt(&conn.globalsince) services
.then(|| { .users
services .count_one_time_keys(sender_user, sender_device)
.users })
.count_one_time_keys(sender_user, sender_device)
})
.into()
}) })
.map(Option::unwrap_or_default); .map(Option::unwrap_or_default);
@@ -164,9 +161,8 @@ async fn collect_room(
let encrypted_since_last_sync = !since_encryption; let encrypted_since_last_sync = !since_encryption;
let joined_since_last_sync = sender_joined_count.is_ok_and(|count| count > conn.globalsince); let joined_since_last_sync = sender_joined_count.is_ok_and(|count| count > conn.globalsince);
let joined_members_burst: OptionFuture<_> = (joined_since_last_sync let joined_members_burst =
|| encrypted_since_last_sync) (joined_since_last_sync || encrypted_since_last_sync).then_async(|| {
.then(|| {
services services
.state_cache .state_cache
.room_members(room_id) .room_members(room_id)
@@ -175,8 +171,7 @@ async fn collect_room(
.map(|user_id| (MembershipState::Join, user_id)) .map(|user_id| (MembershipState::Join, user_id))
.boxed() .boxed()
.into_future() .into_future()
}) });
.into();
services services
.state_accessor .state_accessor

View File

@@ -1,4 +1,4 @@
use futures::{StreamExt, future::OptionFuture, pin_mut}; use futures::{StreamExt, pin_mut};
use ruma::{ use ruma::{
RoomId, api::client::sync::sync_events::v5::request::ListFilters, directory::RoomTypeFilter, RoomId, api::client::sync::sync_events::v5::request::ListFilters, directory::RoomTypeFilter,
events::room::member::MembershipState, events::room::member::MembershipState,
@@ -7,7 +7,8 @@ use tuwunel_core::{
is_equal_to, is_true, is_equal_to, is_true,
utils::{ utils::{
BoolExt, FutureBoolExt, IterStream, ReadyExt, BoolExt, FutureBoolExt, IterStream, ReadyExt,
future::{self, OptionExt, ReadyBoolExt}, future::{self, OptionFutureExt, ReadyBoolExt},
option::OptionExt,
}, },
}; };
@@ -20,56 +21,49 @@ pub(super) async fn filter_room(
room_id: &RoomId, room_id: &RoomId,
membership: Option<&MembershipState>, membership: Option<&MembershipState>,
) -> bool { ) -> bool {
let match_invite: OptionFuture<_> = filter let match_invite =
.is_invite filter
.map(async |is_invite| match (membership, is_invite) { .is_invite
| (Some(MembershipState::Invite), true) => true, .map_async(async |is_invite| match (membership, is_invite) {
| (Some(MembershipState::Invite), false) => false, | (Some(MembershipState::Invite), true) => true,
| (Some(_), true) => false, | (Some(MembershipState::Invite), false) => false,
| (Some(_), false) => true, | (Some(_), true) => false,
| _ => | (Some(_), false) => true,
services | _ =>
.state_cache services
.is_invited(sender_user, room_id) .state_cache
.await == is_invite, .is_invited(sender_user, room_id)
}) .await == is_invite,
.into(); });
let match_direct: OptionFuture<_> = filter let match_direct = filter.is_dm.map_async(async |is_dm| {
.is_dm services
.map(async |is_dm| { .account_data
services .is_direct(sender_user, room_id)
.account_data .await == is_dm
.is_direct(sender_user, room_id) });
.await == is_dm
})
.into();
let match_direct_member: OptionFuture<_> = filter let match_direct_member = filter.is_dm.map_async(async |is_dm| {
.is_dm services
.map(async |is_dm| { .state_accessor
services .is_direct(room_id, sender_user)
.state_accessor .await == is_dm
.is_direct(room_id, sender_user) });
.await == is_dm
})
.into();
let match_encrypted: OptionFuture<_> = filter let match_encrypted = filter
.is_encrypted .is_encrypted
.map(async |is_encrypted| { .map_async(async |is_encrypted| {
services services
.state_accessor .state_accessor
.is_encrypted_room(room_id) .is_encrypted_room(room_id)
.await == is_encrypted .await == is_encrypted
}) });
.into();
let match_space_child: OptionFuture<_> = filter let match_space_child = filter
.spaces .spaces
.is_empty() .is_empty()
.is_false() .is_false()
.then(async || { .then_async(async || {
filter filter
.spaces .spaces
.iter() .iter()
@@ -77,43 +71,38 @@ pub(super) async fn filter_room(
.flat_map(|room_id| services.spaces.get_space_children(room_id)) .flat_map(|room_id| services.spaces.get_space_children(room_id))
.ready_any(is_equal_to!(room_id)) .ready_any(is_equal_to!(room_id))
.await .await
}) });
.into();
let fetch_tags = !filter.tags.is_empty() || !filter.not_tags.is_empty(); let fetch_tags = !filter.tags.is_empty() || !filter.not_tags.is_empty();
let match_room_tag: OptionFuture<_> = fetch_tags let match_room_tag = fetch_tags.then_async(async || {
.then(async || { if let Some(tags) = services
if let Some(tags) = services .account_data
.account_data .get_room_tags(sender_user, room_id)
.get_room_tags(sender_user, room_id) .await
.await .ok()
.ok() .filter(|tags| !tags.is_empty())
.filter(|tags| !tags.is_empty()) {
{ tags.keys().any(|tag| {
tags.keys().any(|tag| { (filter.not_tags.is_empty() || !filter.not_tags.contains(tag))
(filter.not_tags.is_empty() || !filter.not_tags.contains(tag)) || (!filter.tags.is_empty() && filter.tags.contains(tag))
|| (!filter.tags.is_empty() && filter.tags.contains(tag)) })
}) } else {
} else { filter.tags.is_empty()
filter.tags.is_empty() }
} });
})
.into();
let fetch_room_type = !filter.room_types.is_empty() || !filter.not_room_types.is_empty(); let fetch_room_type = !filter.room_types.is_empty() || !filter.not_room_types.is_empty();
let match_room_type: OptionFuture<_> = fetch_room_type let match_room_type = fetch_room_type.then_async(async || {
.then(async || { let room_type = services
let room_type = services .state_accessor
.state_accessor .get_room_type(room_id)
.get_room_type(room_id) .await
.await .ok();
.ok();
let room_type = RoomTypeFilter::from(room_type); let room_type = RoomTypeFilter::from(room_type);
(filter.not_room_types.is_empty() || !filter.not_room_types.contains(&room_type)) (filter.not_room_types.is_empty() || !filter.not_room_types.contains(&room_type))
&& (filter.room_types.is_empty() || filter.room_types.contains(&room_type)) && (filter.room_types.is_empty() || filter.room_types.contains(&room_type))
}) });
.into();
future::and7( future::and7(
match_invite.is_none_or(is_true!()), match_invite.is_none_or(is_true!()),

View File

@@ -5,7 +5,7 @@ use std::{
use futures::{ use futures::{
FutureExt, StreamExt, TryFutureExt, TryStreamExt, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, join, join3, join4}, future::{join, join3, join4},
}; };
use ruma::{ use ruma::{
JsOption, MxcUri, OwnedMxcUri, OwnedRoomId, RoomId, UserId, JsOption, MxcUri, OwnedMxcUri, OwnedRoomId, RoomId, UserId,
@@ -131,19 +131,16 @@ async fn handle_room(
(timeline_limit, required_state) (timeline_limit, required_state)
}); });
let timeline: OptionFuture<_> = is_invite let timeline = is_invite.is_false().then_async(|| {
.is_false() load_timeline(
.then(|| { services,
load_timeline( sender_user,
services, room_id,
sender_user, PduCount::Normal(roomsince),
room_id, Some(PduCount::from(conn.next_batch)),
PduCount::Normal(roomsince), timeline_limit,
Some(PduCount::from(conn.next_batch)), )
timeline_limit, });
)
})
.into();
let (timeline_pdus, limited, _lastcount) = timeline let (timeline_pdus, limited, _lastcount) = timeline
.await .await
@@ -182,18 +179,17 @@ async fn handle_room(
.map(TryInto::try_into) .map(TryInto::try_into)
.flat_ok(); .flat_ok();
let num_live: OptionFuture<_> = roomsince let num_live = roomsince
.ne(&0) .ne(&0)
.and_is(limited || timeline_pdus.len() >= timeline_limit) .and_is(limited || timeline_pdus.len() >= timeline_limit)
.then(|| { .then_async(|| {
services services
.timeline .timeline
.pdus(None, room_id, Some(roomsince.into())) .pdus(None, room_id, Some(roomsince.into()))
.count() .count()
.map(TryInto::try_into) .map(TryInto::try_into)
.map(Result::ok) .map(Result::ok)
}) });
.into();
let lazy = required_state let lazy = required_state
.iter() .iter()
@@ -249,14 +245,12 @@ async fn handle_room(
.collect(); .collect();
// TODO: figure out a timestamp we can use for remote invites // TODO: figure out a timestamp we can use for remote invites
let invite_state: OptionFuture<_> = is_invite let invite_state = is_invite.then_async(|| {
.then(|| { services
services .state_cache
.state_cache .invite_state(sender_user, room_id)
.invite_state(sender_user, room_id) .ok()
.ok() });
})
.into();
let room_name = services let room_name = services
.state_accessor .state_accessor
@@ -327,10 +321,10 @@ async fn handle_room(
.boxed() .boxed()
.await; .await;
let heroes: OptionFuture<_> = services let heroes = services
.config .config
.calculate_heroes .calculate_heroes
.then(|| { .then_async(|| {
calculate_heroes( calculate_heroes(
services, services,
sender_user, sender_user,
@@ -339,9 +333,10 @@ async fn handle_room(
room_avatar.as_deref(), room_avatar.as_deref(),
) )
}) })
.into(); .await
.unwrap_or_default();
let (heroes, heroes_name, heroes_avatar) = heroes.await.unwrap_or_default(); let (heroes, heroes_name, heroes_avatar) = heroes;
Ok(response::Room { Ok(response::Room {
initial: roomsince.eq(&0).then_some(true), initial: roomsince.eq(&0).then_some(true),
@@ -388,17 +383,15 @@ async fn calculate_heroes(
.await .await
.ok()?; .ok()?;
let name: OptionFuture<_> = content let name = content
.displayname .displayname
.is_none() .is_none()
.then(|| services.users.displayname(&user_id).ok()) .then_async(|| services.users.displayname(&user_id).ok());
.into();
let avatar: OptionFuture<_> = content let avatar = content
.avatar_url .avatar_url
.is_none() .is_none()
.then(|| services.users.avatar_url(&user_id).ok()) .then_async(|| services.users.avatar_url(&user_id).ok());
.into();
let (name, avatar) = join(name, avatar).await; let (name, avatar) = join(name, avatar).await;
let hero = response::Hero { let hero = response::Hero {

View File

@@ -1,9 +1,6 @@
use std::cmp::Ordering; use std::cmp::Ordering;
use futures::{ use futures::{FutureExt, StreamExt, TryFutureExt, future::join5};
FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join5},
};
use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint}; use ruma::{OwnedRoomId, UInt, events::room::member::MembershipState, uint};
use tuwunel_core::{ use tuwunel_core::{
apply, is_true, apply, is_true,
@@ -12,6 +9,7 @@ use tuwunel_core::{
utils::{ utils::{
BoolExt, TryFutureExtExt, BoolExt, TryFutureExtExt,
math::usize_from_ruma, math::usize_from_ruma,
option::OptionExt,
stream::{BroadbandExt, IterStream}, stream::{BroadbandExt, IterStream},
}, },
}; };
@@ -80,15 +78,11 @@ async fn matcher(
.iter() .iter()
.stream() .stream()
.filter_map(async |(id, list)| { .filter_map(async |(id, list)| {
let filter: OptionFuture<_> = list list.filters
.filters
.clone() .clone()
.map(async |filters| { .map_async(async |filters| {
filter_room(sync_info, &filters, &room_id, membership.as_ref()).await filter_room(sync_info, &filters, &room_id, membership.as_ref()).await
}) })
.into();
filter
.await .await
.is_none_or(is_true!()) .is_none_or(is_true!())
.then(|| id.clone()) .then(|| id.clone())
@@ -97,50 +91,40 @@ async fn matcher(
.map(|lists| (lists.is_empty().is_false(), lists)) .map(|lists| (lists.is_empty().is_false(), lists))
.await; .await;
let last_notification: OptionFuture<_> = matched let last_notification = matched.then_async(|| {
.then(|| { services
services .pusher
.pusher .last_notification_read(sender_user, &room_id)
.last_notification_read(sender_user, &room_id) .unwrap_or_default()
.unwrap_or_default() });
})
.into();
let last_privateread: OptionFuture<_> = matched let last_privateread = matched.then_async(|| {
.then(|| { services
services .read_receipt
.read_receipt .last_privateread_update(sender_user, &room_id)
.last_privateread_update(sender_user, &room_id) });
})
.into();
let last_receipt: OptionFuture<_> = matched let last_receipt = matched.then_async(|| {
.then(|| { services
services .read_receipt
.read_receipt .last_receipt_count(&room_id, sender_user.into(), None)
.last_receipt_count(&room_id, sender_user.into(), None) .unwrap_or_default()
.unwrap_or_default() });
})
.into();
let last_account: OptionFuture<_> = matched let last_account = matched.then_async(|| {
.then(|| { services
services .account_data
.account_data .last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch))
.last_count(Some(room_id.as_ref()), sender_user, Some(conn.next_batch)) .unwrap_or_default()
.unwrap_or_default() });
})
.into();
let last_timeline: OptionFuture<_> = matched let last_timeline = matched.then_async(|| {
.then(|| { services
services .timeline
.timeline .last_timeline_count(None, &room_id, Some(conn.next_batch.into()))
.last_timeline_count(None, &room_id, Some(conn.next_batch.into())) .map_ok(PduCount::into_unsigned)
.map_ok(PduCount::into_unsigned) .unwrap_or_default()
.unwrap_or_default() });
})
.into();
let (last_timeline, last_notification, last_account, last_receipt, last_privateread) = let (last_timeline, last_notification, last_account, last_receipt, last_privateread) =
join5(last_timeline, last_notification, last_account, last_receipt, last_privateread) join5(last_timeline, last_notification, last_account, last_receipt, last_privateread)

View File

@@ -8,7 +8,7 @@ use ruma::{
use tuwunel_core::{ use tuwunel_core::{
Result, Result,
utils::{ utils::{
future::BoolExt, BoolExt, FutureBoolExt,
stream::{BroadbandExt, ReadyExt}, stream::{BroadbandExt, ReadyExt},
}, },
}; };
@@ -52,11 +52,12 @@ pub(crate) async fn search_users_route(
&search_term, &search_term,
) )
.await .await
.then_some(search_users::v3::User { .then_async(async || search_users::v3::User {
user_id: user_id.clone(), user_id: user_id.clone(),
display_name, display_name,
avatar_url: services.users.avatar_url(&user_id).await.ok(), avatar_url: services.users.avatar_url(&user_id).await.ok(),
}) })
.await
}); });
pin_mut!(users); pin_mut!(users);

View File

@@ -1,6 +1,6 @@
use futures::{FutureExt, StreamExt, future::OptionFuture, join}; use futures::{FutureExt, StreamExt, join};
use ruma::{EventId, RoomId, ServerName}; use ruma::{EventId, RoomId, ServerName};
use tuwunel_core::{Err, Result, implement, is_false}; use tuwunel_core::{Err, Result, implement, is_false, utils::option::OptionExt};
use tuwunel_service::Services; use tuwunel_service::Services;
pub(super) struct AccessCheck<'a> { pub(super) struct AccessCheck<'a> {
@@ -36,14 +36,11 @@ pub(super) async fn check(&self) -> Result {
.room_members_knocked(self.room_id) .room_members_knocked(self.room_id)
.count(); .count();
let server_can_see: OptionFuture<_> = self let server_can_see = self.event_id.map_async(|event_id| {
.event_id self.services
.map(|event_id| { .state_accessor
self.services .server_can_see_event(self.origin, self.room_id, event_id)
.state_accessor });
.server_can_see_event(self.origin, self.room_id, event_id)
})
.into();
let (world_readable, server_in_room, server_can_see, acl_check, user_is_knocking) = let (world_readable, server_in_room, server_can_see, acl_check, user_is_knocking) =
join!(world_readable, server_in_room, server_can_see, acl_check, user_is_knocking); join!(world_readable, server_in_room, server_can_see, acl_check, user_is_knocking);

View File

@@ -29,8 +29,7 @@ pub use self::{check::check, manager::Manager};
use crate::{ use crate::{
Result, err, Result, err,
error::Error, error::Error,
utils, utils::{self, option::OptionExt, string::EMPTY, sys},
utils::{string::EMPTY, sys},
}; };
/// All the config options for tuwunel. /// All the config options for tuwunel.
@@ -2663,14 +2662,12 @@ impl IdentityProvider {
return Ok(client_secret.clone()); return Ok(client_secret.clone());
} }
futures::future::OptionFuture::from( self.client_secret_file
self.client_secret_file .as_ref()
.as_ref() .map_async(tokio::fs::read_to_string)
.map(tokio::fs::read_to_string), .await
) .transpose()?
.await .ok_or_else(|| err!("No client secret or client secret file configured"))
.transpose()?
.ok_or_else(|| err!("No client secret or client secret file configured"))
} }
} }

View File

@@ -11,7 +11,7 @@ mod topological_sort;
use std::collections::{BTreeMap, BTreeSet}; use std::collections::{BTreeMap, BTreeSet};
use futures::{FutureExt, Stream, StreamExt, TryFutureExt, future::OptionFuture}; use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
use ruma::{OwnedEventId, events::StateEventType, room_version_rules::RoomVersionRules}; use ruma::{OwnedEventId, events::StateEventType, room_version_rules::RoomVersionRules};
pub use self::topological_sort::topological_sort; pub use self::topological_sort::topological_sort;
@@ -26,7 +26,11 @@ use crate::{
Result, debug, Result, debug,
matrix::{Event, TypeStateKey}, matrix::{Event, TypeStateKey},
trace, trace,
utils::stream::{BroadbandExt, IterStream}, utils::{
BoolExt,
option::OptionExt,
stream::{BroadbandExt, IterStream},
},
}; };
/// ConflictMap of OwnedEventId specifically. /// ConflictMap of OwnedEventId specifically.
@@ -102,10 +106,9 @@ where
|| backport_css; || backport_css;
// Since `org.matrix.hydra.11`, fetch the conflicted state subgraph. // Since `org.matrix.hydra.11`, fetch the conflicted state subgraph.
let conflicted_subgraph: OptionFuture<_> = consider_conflicted_subgraph let conflicted_subgraph = consider_conflicted_subgraph
.then(|| conflicted_states.clone().into_values().flatten()) .then(|| conflicted_states.clone().into_values().flatten())
.map(async |ids| conflicted_subgraph_dfs(ids.stream(), fetch)) .map_async(async |ids| conflicted_subgraph_dfs(ids.stream(), fetch));
.into();
let conflicted_subgraph = conflicted_subgraph let conflicted_subgraph = conflicted_subgraph
.await .await
@@ -180,9 +183,8 @@ where
// 3. Take all remaining events that werent picked in step 1 and order them by // 3. Take all remaining events that werent picked in step 1 and order them by
// the mainline ordering based on the power level in the partially resolved // the mainline ordering based on the power level in the partially resolved
// state obtained in step 2. // state obtained in step 2.
let sorted_remaining_events: OptionFuture<_> = have_remaining_events let sorted_remaining_events = have_remaining_events
.then(move || mainline_sort(power_event.cloned(), remaining_events, fetch)) .then_async(move || mainline_sort(power_event.cloned(), remaining_events, fetch));
.into();
let sorted_remaining_events = sorted_remaining_events let sorted_remaining_events = sorted_remaining_events
.await .await

View File

@@ -1,5 +1,7 @@
//! Trait BoolExt //! Trait BoolExt
use futures::future::OptionFuture;
/// Boolean extensions and chain.starters /// Boolean extensions and chain.starters
pub trait BoolExt { pub trait BoolExt {
fn and<T>(self, t: Option<T>) -> Option<T>; fn and<T>(self, t: Option<T>) -> Option<T>;
@@ -50,6 +52,8 @@ pub trait BoolExt {
fn or_some<T>(self, t: T) -> Option<T>; fn or_some<T>(self, t: T) -> Option<T>;
fn then_async<O: Future, F: FnOnce() -> O>(self, f: F) -> OptionFuture<O>;
fn then_none<T>(self) -> Option<T>; fn then_none<T>(self) -> Option<T>;
fn then_ok_or<T, E>(self, t: T, e: E) -> Result<T, E>; fn then_ok_or<T, E>(self, t: T, e: E) -> Result<T, E>;
@@ -126,6 +130,11 @@ impl BoolExt for bool {
#[inline] #[inline]
fn or_some<T>(self, t: T) -> Option<T> { self.is_false().then_some(t) } fn or_some<T>(self, t: T) -> Option<T> { self.is_false().then_some(t) }
#[inline]
fn then_async<O: Future, F: FnOnce() -> O>(self, f: F) -> OptionFuture<O> {
OptionFuture::<_>::from(self.then(f))
}
#[inline] #[inline]
fn then_none<T>(self) -> Option<T> { Option::<T>::None } fn then_none<T>(self) -> Option<T> { Option::<T>::None }

View File

@@ -9,7 +9,7 @@ mod try_ext_ext;
pub use self::{ pub use self::{
bool_ext::{BoolExt, and, and4, and5, and6, and7, or}, bool_ext::{BoolExt, and, and4, and5, and6, and7, or},
ext_ext::ExtExt, ext_ext::ExtExt,
option_ext::OptionExt, option_ext::OptionFutureExt,
option_stream::OptionStream, option_stream::OptionStream,
ready_bool_ext::ReadyBoolExt, ready_bool_ext::ReadyBoolExt,
ready_eq_ext::ReadyEqExt, ready_eq_ext::ReadyEqExt,

View File

@@ -2,7 +2,7 @@
use futures::{Future, FutureExt, future::OptionFuture}; use futures::{Future, FutureExt, future::OptionFuture};
pub trait OptionExt<T> { pub trait OptionFutureExt<T> {
fn is_none_or(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future<Output = bool> + Send; fn is_none_or(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future<Output = bool> + Send;
fn is_some_and(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future<Output = bool> + Send; fn is_some_and(self, f: impl FnOnce(&T) -> bool + Send) -> impl Future<Output = bool> + Send;
@@ -16,7 +16,7 @@ pub trait OptionExt<T> {
T: Default; T: Default;
} }
impl<T, Fut> OptionExt<T> for OptionFuture<Fut> impl<T, Fut> OptionFutureExt<T> for OptionFuture<Fut>
where where
Fut: Future<Output = T> + Send, Fut: Future<Output = T> + Send,
T: Send, T: Send,

View File

@@ -9,6 +9,7 @@ pub mod hash;
pub mod json; pub mod json;
pub mod math; pub mod math;
pub mod mutex_map; pub mod mutex_map;
pub mod option;
pub mod rand; pub mod rand;
pub mod result; pub mod result;
pub mod set; pub mod set;

11
src/core/utils/option.rs Normal file
View File

@@ -0,0 +1,11 @@
use futures::future::OptionFuture;
pub trait OptionExt<T> {
fn map_async<O: Future, F: FnOnce(T) -> O>(self, f: F) -> OptionFuture<O>;
}
impl<T> OptionExt<T> for Option<T> {
fn map_async<O: Future, F: FnOnce(T) -> O>(self, f: F) -> OptionFuture<O> {
OptionFuture::<_>::from(self.map(f))
}
}

View File

@@ -8,7 +8,7 @@ use std::{
use futures::{ use futures::{
FutureExt, StreamExt, TryFutureExt, TryStreamExt, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, join3, join4}, future::{join3, join4},
}; };
use ruma::{ use ruma::{
CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, OwnedUserId, RoomId, CanonicalJsonObject, CanonicalJsonValue, OwnedEventId, OwnedServerName, OwnedUserId, RoomId,
@@ -31,7 +31,7 @@ use tuwunel_core::{
matrix::{event::gen_event_id_canonical_json, room_version}, matrix::{event::gen_event_id_canonical_json, room_version},
pdu::{PduBuilder, format::from_incoming_federation}, pdu::{PduBuilder, format::from_incoming_federation},
state_res, trace, state_res, trace,
utils::{self, IterStream, ReadyExt, future::TryExtExt, math::Expected, shuffle}, utils::{self, BoolExt, IterStream, ReadyExt, future::TryExtExt, math::Expected, shuffle},
warn, warn,
}; };
@@ -544,25 +544,23 @@ pub async fn join_local(
}) })
.await; .await;
let join_authorized_via_users_server: OptionFuture<_> = is_joined_restricted_rooms let join_authorized_via_users_server = is_joined_restricted_rooms.then_async(async || {
.then(async || { self.services
self.services .state_cache
.state_cache .local_users_in_room(room_id)
.local_users_in_room(room_id) .filter(|user| {
.filter(|user| { self.services.state_accessor.user_can_invite(
self.services.state_accessor.user_can_invite( room_id,
room_id, user,
user, sender_user,
sender_user, state_lock,
state_lock, )
) })
}) .map(ToOwned::to_owned)
.map(ToOwned::to_owned) .boxed()
.boxed() .next()
.next() .await
.await });
})
.into();
let displayname = self.services.users.displayname(sender_user).ok(); let displayname = self.services.users.displayname(sender_user).ok();

View File

@@ -4,19 +4,17 @@ mod presence;
use std::{collections::HashMap, sync::Arc, time::Duration}; use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait; use async_trait::async_trait;
use futures::{ use futures::{Stream, StreamExt, TryFutureExt, future::try_join, stream::FuturesUnordered};
Stream, StreamExt, TryFutureExt,
future::{OptionFuture, try_join},
stream::FuturesUnordered,
};
use loole::{Receiver, Sender}; use loole::{Receiver, Sender};
use ruma::{ use ruma::{
DeviceId, OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState, DeviceId, OwnedUserId, UInt, UserId, events::presence::PresenceEvent, presence::PresenceState,
}; };
use tokio::{sync::RwLock, time::sleep}; use tokio::{sync::RwLock, time::sleep};
use tuwunel_core::{ use tuwunel_core::{
Error, Result, checked, debug, debug_warn, error, result::LogErr, trace, Error, Result, checked, debug, debug_warn, error,
utils::future::OptionExt, result::LogErr,
trace,
utils::{future::OptionFutureExt, option::OptionExt},
}; };
use self::{data::Data, presence::Presence}; use self::{data::Data, presence::Presence};
@@ -164,13 +162,11 @@ impl Service {
return Ok(()); return Ok(());
} }
let update_device_seen: OptionFuture<_> = device_id let update_device_seen = device_id.map_async(|device_id| {
.map(|device_id| { self.services
self.services .users
.users .update_device_last_seen(user_id, device_id, None)
.update_device_last_seen(user_id, device_id, None) });
})
.into();
let status_msg = match last_presence { let status_msg = match last_presence {
| Ok((_, ref presence)) => presence.content.status_msg.clone(), | Ok((_, ref presence)) => presence.content.status_msg.clone(),

View File

@@ -1,9 +1,6 @@
use std::{collections::HashSet, sync::Arc}; use std::{collections::HashSet, sync::Arc};
use futures::{ use futures::{FutureExt, StreamExt, future::join};
FutureExt, StreamExt,
future::{OptionFuture, join},
};
use ruma::{ use ruma::{
RoomId, UserId, RoomId, UserId,
api::client::push::ProfileTag, api::client::push::ProfileTag,
@@ -111,13 +108,11 @@ pub(crate) async fn append_pdu(&self, pdu_id: RawPduId, pdu: &Pdu) -> Result {
.iter() .iter()
.any(|action| matches!(action, Action::SetTweak(Tweak::Highlight(true)))); .any(|action| matches!(action, Action::SetTweak(Tweak::Highlight(true))));
let increment_notify: OptionFuture<_> = notify let increment_notify =
.then(|| self.increment_notificationcount(pdu.room_id(), user)) notify.then_async(|| self.increment_notificationcount(pdu.room_id(), user));
.into();
let increment_highlight: OptionFuture<_> = highlight let increment_highlight =
.then(|| self.increment_highlightcount(pdu.room_id(), user)) highlight.then_async(|| self.increment_highlightcount(pdu.room_id(), user));
.into();
join(increment_notify, increment_highlight).await; join(increment_notify, increment_highlight).await;

View File

@@ -1,7 +1,4 @@
use futures::{ use futures::{FutureExt, TryFutureExt, TryStreamExt, future::try_join5};
FutureExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, try_join5},
};
use ruma::{CanonicalJsonObject, EventId, RoomId, ServerName, UserId, events::StateEventType}; use ruma::{CanonicalJsonObject, EventId, RoomId, ServerName, UserId, events::StateEventType};
use tuwunel_core::{ use tuwunel_core::{
Err, Result, debug, Err, Result, debug,
@@ -9,7 +6,7 @@ use tuwunel_core::{
err, implement, err, implement,
matrix::{Event, room_version}, matrix::{Event, room_version},
trace, trace,
utils::stream::IterStream, utils::{BoolExt, stream::IterStream},
warn, warn,
}; };
@@ -83,11 +80,10 @@ pub async fn handle_incoming_pdu<'a>(
.try_into() .try_into()
.map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?; .map_err(|e| err!(Request(InvalidParam("PDU does not have a valid sender key: {e}"))))?;
let sender_acl_check: OptionFuture<_> = sender let sender_acl_check = sender
.server_name() .server_name()
.ne(origin) .ne(origin)
.then(|| self.acl_check(sender.server_name(), room_id)) .then_async(|| self.acl_check(sender.server_name(), room_id));
.into();
// Fetch create event // Fetch create event
let create_event = let create_event =

View File

@@ -3,16 +3,16 @@ use std::{
iter::{Iterator, once}, iter::{Iterator, once},
}; };
use futures::{ use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, future::try_join};
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, try_join},
};
use ruma::{OwnedEventId, RoomId, RoomVersionId}; use ruma::{OwnedEventId, RoomId, RoomVersionId};
use tuwunel_core::{ use tuwunel_core::{
Result, apply, err, implement, Result, apply, err, implement,
matrix::{Event, StateMap, state_res::AuthSet}, matrix::{Event, StateMap, state_res::AuthSet},
ref_at, trace, ref_at, trace,
utils::stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt}, utils::{
option::OptionExt,
stream::{BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryWidebandExt},
},
}; };
use crate::rooms::short::ShortStateHash; use crate::rooms::short::ShortStateHash;
@@ -174,16 +174,15 @@ async fn state_at_incoming_fork<Pdu>(
where where
Pdu: Event, Pdu: Event,
{ {
let leaf: OptionFuture<_> = prev_event let leaf = prev_event
.state_key() .state_key()
.map(async |state_key| { .map_async(async |state_key| {
self.services self.services
.short .short
.get_or_create_shortstatekey(&prev_event.kind().to_cow_str().into(), state_key) .get_or_create_shortstatekey(&prev_event.kind().to_cow_str().into(), state_key)
.map(|shortstatekey| once((shortstatekey, prev_event.event_id().to_owned()))) .map(|shortstatekey| once((shortstatekey, prev_event.event_id().to_owned())))
.await .await
}) });
.into();
let leaf_state_after_event: Vec<_> = self let leaf_state_after_event: Vec<_> = self
.services .services

View File

@@ -6,11 +6,7 @@ use std::{
sync::{Arc, RwLock}, sync::{Arc, RwLock},
}; };
use futures::{ use futures::{Stream, StreamExt, future::join5, pin_mut};
Stream, StreamExt,
future::{OptionFuture, join5},
pin_mut,
};
use ruma::{ use ruma::{
OwnedRoomId, RoomId, ServerName, UserId, OwnedRoomId, RoomId, ServerName, UserId,
events::{AnyStrippedStateEvent, AnySyncStateEvent, room::member::MembershipState}, events::{AnyStrippedStateEvent, AnySyncStateEvent, room::member::MembershipState},
@@ -21,6 +17,7 @@ use tuwunel_core::{
result::LogErr, result::LogErr,
trace, trace,
utils::{ utils::{
BoolExt,
future::OptionStream, future::OptionStream,
stream::{BroadbandExt, ReadyExt, TryIgnore}, stream::{BroadbandExt, ReadyExt, TryIgnore},
}, },
@@ -411,45 +408,41 @@ pub fn user_memberships<'a>(
use MembershipState::*; use MembershipState::*;
use futures::stream::select; use futures::stream::select;
let joined: OptionFuture<_> = mask let joined = mask
.is_none_or(|mask| mask.contains(&Join)) .is_none_or(|mask| mask.contains(&Join))
.then(|| { .then_async(|| {
self.rooms_joined(user_id) self.rooms_joined(user_id)
.map(|room_id| (Join, room_id)) .map(|room_id| (Join, room_id))
.boxed() .boxed()
.into_future() .into_future()
}) });
.into();
let invited: OptionFuture<_> = mask let invited = mask
.is_none_or(|mask| mask.contains(&Invite)) .is_none_or(|mask| mask.contains(&Invite))
.then(|| { .then_async(|| {
self.rooms_invited(user_id) self.rooms_invited(user_id)
.map(|room_id| (Invite, room_id)) .map(|room_id| (Invite, room_id))
.boxed() .boxed()
.into_future() .into_future()
}) });
.into();
let knocked: OptionFuture<_> = mask let knocked = mask
.is_none_or(|mask| mask.contains(&Knock)) .is_none_or(|mask| mask.contains(&Knock))
.then(|| { .then_async(|| {
self.rooms_knocked(user_id) self.rooms_knocked(user_id)
.map(|room_id| (Knock, room_id)) .map(|room_id| (Knock, room_id))
.boxed() .boxed()
.into_future() .into_future()
}) });
.into();
let left: OptionFuture<_> = mask let left = mask
.is_none_or(|mask| mask.contains(&Leave)) .is_none_or(|mask| mask.contains(&Leave))
.then(|| { .then_async(|| {
self.rooms_left(user_id) self.rooms_left(user_id)
.map(|room_id| (Leave, room_id)) .map(|room_id| (Leave, room_id))
.boxed() .boxed()
.into_future() .into_future()
}) });
.into();
select( select(
select(joined.stream(), left.stream()), select(joined.stream(), left.stream()),

View File

@@ -11,7 +11,7 @@ use std::{
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use futures::{ use futures::{
FutureExt, StreamExt, TryFutureExt, FutureExt, StreamExt, TryFutureExt,
future::{BoxFuture, OptionFuture, join3, try_join3}, future::{BoxFuture, join3, try_join3},
pin_mut, pin_mut,
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
@@ -43,7 +43,7 @@ use tuwunel_core::{
result::LogErr, result::LogErr,
trace, trace,
utils::{ utils::{
ReadyExt, calculate_hash, continue_exponential_backoff_secs, BoolExt, ReadyExt, calculate_hash, continue_exponential_backoff_secs,
future::TryExtExt, future::TryExtExt,
stream::{BroadbandExt, IterStream, WidebandExt}, stream::{BroadbandExt, IterStream, WidebandExt},
}, },
@@ -388,19 +388,17 @@ impl Service {
let device_changes = let device_changes =
self.select_edus_device_changes(server_name, batch, &max_edu_count, &events_len); self.select_edus_device_changes(server_name, batch, &max_edu_count, &events_len);
let receipts: OptionFuture<_> = self let receipts = self
.server .server
.config .config
.allow_outgoing_read_receipts .allow_outgoing_read_receipts
.then(|| self.select_edus_receipts(server_name, batch, &max_edu_count)) .then_async(|| self.select_edus_receipts(server_name, batch, &max_edu_count));
.into();
let presence: OptionFuture<_> = self let presence = self
.server .server
.config .config
.allow_outgoing_presence .allow_outgoing_presence
.then(|| self.select_edus_presence(server_name, batch, &max_edu_count)) .then_async(|| self.select_edus_presence(server_name, batch, &max_edu_count));
.into();
let (device_changes, receipts, presence) = let (device_changes, receipts, presence) =
join3(device_changes, receipts, presence).await; join3(device_changes, receipts, presence).await;