diff --git a/src/admin/room/commands.rs b/src/admin/room/commands.rs index 91393cc1..e627dc4c 100644 --- a/src/admin/room/commands.rs +++ b/src/admin/room/commands.rs @@ -19,7 +19,7 @@ pub(super) async fn list_rooms( .rooms .metadata .iter_ids() - .filter_map(|room_id| async move { + .filter_map(async |room_id| { (!exclude_disabled || !self .services @@ -29,7 +29,7 @@ pub(super) async fn list_rooms( .await) .then_some(room_id) }) - .filter_map(|room_id| async move { + .filter_map(async |room_id| { (!exclude_banned || !self .services diff --git a/src/admin/room/info.rs b/src/admin/room/info.rs index 4af51889..646b03fe 100644 --- a/src/admin/room/info.rs +++ b/src/admin/room/info.rs @@ -47,7 +47,7 @@ async fn list_joined_members(&self, room_id: OwnedRoomId, local_only: bool) -> R .unwrap_or(true) }) .map(ToOwned::to_owned) - .filter_map(|user_id| async move { + .filter_map(async |user_id| { Some(( self.services .users diff --git a/src/admin/room/moderation.rs b/src/admin/room/moderation.rs index 9a083d1d..e1f14102 100644 --- a/src/admin/room/moderation.rs +++ b/src/admin/room/moderation.rs @@ -179,7 +179,7 @@ async fn ban_room(&self, room: OwnedRoomOrAliasId) -> Result { .alias .local_aliases_for_room(&room_id) .map(ToOwned::to_owned) - .for_each(|local_alias| async move { + .for_each(async |local_alias| { self.services .rooms .alias @@ -363,7 +363,7 @@ async fn ban_list_of_rooms(&self) -> Result { .alias .local_aliases_for_room(&room_id) .map(ToOwned::to_owned) - .for_each(|local_alias| async move { + .for_each(async |local_alias| { self.services .rooms .alias diff --git a/src/api/client/keys.rs b/src/api/client/keys.rs index 3d155057..56d484a3 100644 --- a/src/api/client/keys.rs +++ b/src/api/client/keys.rs @@ -526,7 +526,7 @@ where let mut futures: FuturesUnordered<_> = get_over_federation .into_iter() - .map(|(server, vec)| async move { + .map(async |(server, vec)| { let mut device_keys_input_fed = BTreeMap::new(); for (user_id, keys) in vec { device_keys_input_fed.insert(user_id.to_owned(), keys.clone()); @@ -656,7 +656,7 @@ pub(crate) async fn claim_keys_helper( let mut futures: FuturesUnordered<_> = get_over_federation .into_iter() - .map(|(server, vec)| async move { + .map(async |(server, vec)| { let mut one_time_keys_input_fed = BTreeMap::new(); for (user_id, keys) in vec { one_time_keys_input_fed.insert(user_id.clone(), keys.clone()); diff --git a/src/api/client/membership/join.rs b/src/api/client/membership/join.rs index 793976c2..41c90e74 100644 --- a/src/api/client/membership/join.rs +++ b/src/api/client/membership/join.rs @@ -550,7 +550,7 @@ async fn join_room_by_id_helper_remote( .validate_and_add_event_id_no_fetch(pdu, &room_version_id) }) .ready_filter_map(Result::ok) - .fold(HashMap::new(), |mut state, (event_id, value)| async move { + .fold(HashMap::new(), async |mut state, (event_id, value)| { let pdu = match PduEvent::from_id_val(&event_id, value.clone()) { | Ok(pdu) => pdu, | Err(e) => { diff --git a/src/api/client/membership/members.rs b/src/api/client/membership/members.rs index d9ae1a38..4cd7e1e2 100644 --- a/src/api/client/membership/members.rs +++ b/src/api/client/membership/members.rs @@ -85,7 +85,7 @@ pub(crate) async fn joined_members_route( .state_cache .room_members(&body.room_id) .map(ToOwned::to_owned) - .broad_then(|user_id| async move { + .broad_then(async |user_id| { let (display_name, avatar_url) = join( services.users.displayname(&user_id).ok(), services.users.avatar_url(&user_id).ok(), diff --git a/src/api/client/message.rs b/src/api/client/message.rs index ffbcc5ef..df408a41 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -151,9 +151,7 @@ pub(crate) async fn get_message_events_route( .map(IterStream::stream) .into_stream() .flatten() - .broad_filter_map(|user_id| async move { - get_member_event(&services, room_id, &user_id).await - }) + .broad_filter_map(async |user_id| get_member_event(&services, room_id, &user_id).await) .collect() .await; diff --git a/src/api/client/profile.rs b/src/api/client/profile.rs index 046cc983..71057cff 100644 --- a/src/api/client/profile.rs +++ b/src/api/client/profile.rs @@ -350,7 +350,7 @@ pub async fn update_displayname( let all_joined_rooms: Vec<_> = all_joined_rooms .iter() .try_stream() - .and_then(|room_id: &OwnedRoomId| async move { + .and_then(async |room_id: &OwnedRoomId| { let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { displayname: displayname.clone(), membership: MembershipState::Join, @@ -403,7 +403,7 @@ pub async fn update_avatar_url( let all_joined_rooms: Vec<_> = all_joined_rooms .iter() .try_stream() - .and_then(|room_id: &OwnedRoomId| async move { + .and_then(async |room_id: &OwnedRoomId| { let pdu = PduBuilder::state(user_id.to_string(), &RoomMemberEventContent { avatar_url: avatar_url.clone(), blurhash: blurhash.clone(), diff --git a/src/api/client/search.rs b/src/api/client/search.rs index 5de1a090..56c6b35f 100644 --- a/src/api/client/search.rs +++ b/src/api/client/search.rs @@ -95,13 +95,13 @@ async fn category_room_events( }); let results: Vec<_> = rooms - .filter_map(|room_id| async move { + .filter_map(async |room_id| { check_room_visible(services, sender_user, &room_id, criteria) .await .is_ok() .then_some(room_id) }) - .filter_map(|room_id| async move { + .filter_map(async |room_id| { let query = RoomQuery { room_id: &room_id, user_id: Some(sender_user), @@ -135,7 +135,7 @@ async fn category_room_events( .iter() .stream() .ready_filter(|_| criteria.include_state.is_some_and(is_true!())) - .filter_map(|(room_id, ..)| async move { + .filter_map(async |(room_id, ..)| { procure_room_state(services, room_id) .map_ok(|state| (room_id.clone(), state)) .await diff --git a/src/api/client/space.rs b/src/api/client/space.rs index c4ffe812..12d7f597 100644 --- a/src/api/client/space.rs +++ b/src/api/client/space.rs @@ -166,7 +166,7 @@ where let next_batch: OptionFuture<_> = queue .pop_front() - .map(|(room, _)| async move { + .map(async |(room, _)| { parents.insert(room); let next_short_room_ids: Vec<_> = parents diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 4dd1f92e..25e0b666 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -75,7 +75,7 @@ async fn share_encrypted_room( .get_shared_rooms(sender_user, user_id) .ready_filter(|&room_id| Some(room_id) != ignore_room) .map(ToOwned::to_owned) - .broad_any(|other_room_id| async move { + .broad_any(async |other_room_id| { services .rooms .state_accessor diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 00331bbe..0a5f365c 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -237,7 +237,7 @@ pub(crate) async fn build_sync_events( .rooms .state_cache .rooms_invited(sender_user) - .fold_default(|mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| async move { + .fold_default(async |mut invited_rooms: BTreeMap<_, _>, (room_id, invite_state)| { let invite_count = services .rooms .state_cache @@ -262,7 +262,7 @@ pub(crate) async fn build_sync_events( .rooms .state_cache .rooms_knocked(sender_user) - .fold_default(|mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| async move { + .fold_default(async |mut knocked_rooms: BTreeMap<_, _>, (room_id, knock_state)| { let knock_count = services .rooms .state_cache @@ -334,7 +334,7 @@ pub(crate) async fn build_sync_events( let device_list_left: HashSet<_> = left_encrypted_users .into_iter() .stream() - .broad_filter_map(|user_id| async move { + .broad_filter_map(async |user_id: OwnedUserId| { share_encrypted_room(services, sender_user, &user_id, None) .await .eq(&false) @@ -620,7 +620,7 @@ async fn load_joined_room( .rooms .read_receipt .readreceipts_since(room_id, since) - .filter_map(|(read_user, _, edu)| async move { + .filter_map(async |(read_user, _, edu)| { services .users .user_is_ignored(read_user, sender_user) @@ -806,7 +806,7 @@ async fn load_joined_room( .rooms .typing .last_typing_update(room_id) - .and_then(|count| async move { + .and_then(async |count| { if count <= since { return Ok(Vec::>::new()); } @@ -1124,8 +1124,9 @@ async fn calculate_state_incremental<'a>( .fold_default(|(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| async move { use MembershipState::*; - let shares_encrypted_room = - |user_id| share_encrypted_room(services, sender_user, user_id, Some(room_id)); + let shares_encrypted_room = async |user_id| { + share_encrypted_room(services, sender_user, user_id, Some(room_id)).await + }; match content.membership { | Leave => leu.insert(user_id), diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 248f53bb..c1dc703c 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -470,7 +470,7 @@ where .rooms .read_receipt .readreceipts_since(room_id, *roomsince) - .filter_map(|(read_user, _ts, v)| async move { + .filter_map(async |(read_user, _ts, v)| { services .users .user_is_ignored(read_user, sender_user) @@ -548,7 +548,7 @@ where let required_state = required_state_request .iter() .stream() - .filter_map(|state| async move { + .filter_map(async |state| { services .rooms .state_accessor diff --git a/src/api/client/threads.rs b/src/api/client/threads.rs index a97fb4d3..b8714725 100644 --- a/src/api/client/threads.rs +++ b/src/api/client/threads.rs @@ -1,5 +1,5 @@ use axum::extract::State; -use futures::StreamExt; +use futures::{StreamExt, TryStreamExt}; use ruma::{api::client::threads::get_threads, uint}; use tuwunel_core::{ Result, at, @@ -35,18 +35,17 @@ pub(crate) async fn get_threads_route( .rooms .threads .threads_until(body.sender_user(), &body.room_id, from, &body.include) - .await? .take(limit) - .filter_map(|(count, pdu)| async move { - services + .try_filter_map(async |(count, pdu)| { + Ok(services .rooms .state_accessor .user_can_see_event(body.sender_user(), &body.room_id, &pdu.event_id) .await - .then_some((count, pdu)) + .then_some((count, pdu))) }) - .collect() - .await; + .try_collect() + .await?; Ok(get_threads::v1::Response { next_batch: threads diff --git a/src/api/server/backfill.rs b/src/api/server/backfill.rs index c9cab812..daa64699 100644 --- a/src/api/server/backfill.rs +++ b/src/api/server/backfill.rs @@ -64,7 +64,7 @@ pub(crate) async fn get_backfill_route( .timeline .pdus_rev(None, &body.room_id, Some(from.saturating_add(1))) .try_take(limit) - .try_filter_map(|(_, pdu)| async move { + .try_filter_map(async |(_, pdu)| { Ok(services .rooms .state_accessor @@ -72,7 +72,7 @@ pub(crate) async fn get_backfill_route( .await .then_some(pdu)) }) - .try_filter_map(|pdu| async move { + .try_filter_map(async |pdu| { Ok(services .rooms .timeline diff --git a/src/api/server/event_auth.rs b/src/api/server/event_auth.rs index 09394362..111d5c35 100644 --- a/src/api/server/event_auth.rs +++ b/src/api/server/event_auth.rs @@ -49,7 +49,7 @@ pub(crate) async fn get_event_authorization_route( .auth_chain .event_ids_iter(room_id, once(body.event_id.borrow())) .ready_filter_map(Result::ok) - .filter_map(|id| async move { + .filter_map(async |id| { services .rooms .timeline diff --git a/src/api/server/hierarchy.rs b/src/api/server/hierarchy.rs index d53c5440..d5bc62d8 100644 --- a/src/api/server/hierarchy.rs +++ b/src/api/server/hierarchy.rs @@ -45,7 +45,7 @@ pub(crate) async fn get_hierarchy_route( let (children, inaccessible_children) = get_parent_children_via(&room, suggested_only) .stream() - .broad_filter_map(|(child, _via)| async move { + .broad_filter_map(async |(child, _via)| { match services .rooms .spaces diff --git a/src/api/server/send.rs b/src/api/server/send.rs index 3d5fe8a9..12b3d7a3 100644 --- a/src/api/server/send.rs +++ b/src/api/server/send.rs @@ -158,10 +158,11 @@ async fn handle( let results: ResolvedMap = pdus .into_iter() .try_stream() - .broad_and_then(|(room_id, pdus): (_, Vec<_>)| { - handle_room(services, client, origin, started, room_id, pdus.into_iter()) - .map_ok(Vec::into_iter) + .broad_and_then(async |(room_id, pdus): (_, Vec<_>)| { + handle_room(services, client, origin, &started, room_id, pdus.into_iter()) + .map_ok(ResolvedMap::into_iter) .map_ok(IterStream::try_stream) + .await }) .try_flatten() .try_collect() @@ -180,28 +181,27 @@ async fn handle_room( services: &Services, _client: &IpAddr, origin: &ServerName, - txn_start_time: Instant, - room_id: OwnedRoomId, + txn_start_time: &Instant, + ref room_id: OwnedRoomId, pdus: impl Iterator + Send, -) -> Result> { +) -> Result { let _room_lock = services .rooms .event_handler .mutex_federation - .lock(&room_id) + .lock(room_id) .await; - let room_id = &room_id; pdus.try_stream() - .and_then(|(_, event_id, value)| async move { + .and_then(async |(room_id, event_id, value)| { services.server.check_running()?; let pdu_start_time = Instant::now(); let result = services .rooms .event_handler - .handle_incoming_pdu(origin, room_id, &event_id, value, true) - .await - .map(|_| ()); + .handle_incoming_pdu(origin, &room_id, &event_id, value, true) + .map_ok(|_| ()) + .await; debug!( pdu_elapsed = ?pdu_start_time.elapsed(), @@ -329,7 +329,7 @@ async fn handle_edu_receipt_room( .read .into_iter() .stream() - .for_each_concurrent(automatic_width(), |(user_id, user_updates)| async move { + .for_each_concurrent(automatic_width(), async |(user_id, user_updates)| { handle_edu_receipt_room_user(services, origin, room_id, &user_id, user_updates).await; }) .await; @@ -368,7 +368,7 @@ async fn handle_edu_receipt_room_user( .event_ids .into_iter() .stream() - .for_each_concurrent(automatic_width(), |event_id| async move { + .for_each_concurrent(automatic_width(), async |event_id| { let user_data = [(user_id.to_owned(), data.clone())]; let receipts = [(ReceiptType::Read, BTreeMap::from(user_data))]; let content = [(event_id.clone(), BTreeMap::from(receipts))]; diff --git a/src/api/server/send_join.rs b/src/api/server/send_join.rs index 088dfd83..b6b1cd10 100644 --- a/src/api/server/send_join.rs +++ b/src/api/server/send_join.rs @@ -242,7 +242,7 @@ async fn create_join_event( .rooms .auth_chain .event_ids_iter(room_id, starting_events) - .broad_and_then(|event_id| async move { + .broad_and_then(async |event_id| { services .rooms .timeline diff --git a/src/api/server/state.rs b/src/api/server/state.rs index 34880143..00798b85 100644 --- a/src/api/server/state.rs +++ b/src/api/server/state.rs @@ -56,7 +56,7 @@ pub(crate) async fn get_room_state_route( .rooms .auth_chain .event_ids_iter(&body.room_id, once(body.event_id.borrow())) - .and_then(|id| async move { services.rooms.timeline.get_pdu_json(&id).await }) + .and_then(async |id| services.rooms.timeline.get_pdu_json(&id).await) .and_then(|pdu| { services .sending diff --git a/src/api/server/user.rs b/src/api/server/user.rs index d2742f5f..406ac72d 100644 --- a/src/api/server/user.rs +++ b/src/api/server/user.rs @@ -40,7 +40,7 @@ pub(crate) async fn get_devices_route( devices: services .users .all_devices_metadata(user_id) - .filter_map(|metadata| async move { + .filter_map(async |metadata| { let device_id = metadata.device_id.clone(); let device_id_clone = device_id.clone(); let device_id_string = device_id.as_str().to_owned(); diff --git a/src/core/utils/set.rs b/src/core/utils/set.rs index 9d2d8aea..689668c8 100644 --- a/src/core/utils/set.rs +++ b/src/core/utils/set.rs @@ -66,7 +66,7 @@ where let b = Arc::new(Mutex::new(b.peekable())); a.map(move |ai| (ai, b.clone())) - .filter_map(|(ai, b)| async move { + .filter_map(async move |(ai, b)| { let mut lock = b.lock().await; while let Some(bi) = Pin::new(&mut *lock) .next_if(|bi| *bi <= ai) diff --git a/src/service/appservice/mod.rs b/src/service/appservice/mod.rs index e0d69ccb..5ef0e398 100644 --- a/src/service/appservice/mod.rs +++ b/src/service/appservice/mod.rs @@ -213,7 +213,7 @@ impl Service { self.db .id_appserviceregistrations .keys() - .and_then(move |id: &str| async move { + .and_then(async move |id: &str| { Ok((id.to_owned(), self.get_db_registration(id).await?)) }) } diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs index 52d41d68..36bffb30 100644 --- a/src/service/rooms/auth_chain/mod.rs +++ b/src/service/rooms/auth_chain/mod.rs @@ -146,7 +146,7 @@ async fn get_auth_chain_outer( let chunk_cache: Vec<_> = chunk .into_iter() .try_stream() - .broad_and_then(|(shortid, event_id)| async move { + .broad_and_then(async |(shortid, event_id)| { if let Ok(cached) = self .get_cached_eventid_authchain(&[shortid]) .await diff --git a/src/service/rooms/event_handler/fetch_prev.rs b/src/service/rooms/event_handler/fetch_prev.rs index ee6956fa..25a78367 100644 --- a/src/service/rooms/event_handler/fetch_prev.rs +++ b/src/service/rooms/event_handler/fetch_prev.rs @@ -3,7 +3,7 @@ use std::{ iter::once, }; -use futures::{FutureExt, future}; +use futures::FutureExt; use ruma::{ CanonicalJsonValue, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, RoomId, ServerName, int, uint, @@ -110,7 +110,7 @@ where } } - let event_fetch = |event_id| { + let event_fetch = async |event_id: OwnedEventId| { let origin_server_ts = eventid_info .get(&event_id) .map_or_else(|| uint!(0), |info| info.0.origin_server_ts().get()); @@ -118,7 +118,7 @@ where // This return value is the key used for sorting events, // events are then sorted by power level, time, // and lexically by event_id. - future::ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts))) + Ok((int!(0), MilliSecondsSinceUnixEpoch(origin_server_ts))) }; let sorted = state_res::lexicographical_topological_sort(&graph, &event_fetch) diff --git a/src/service/rooms/event_handler/state_at_incoming.rs b/src/service/rooms/event_handler/state_at_incoming.rs index a671f1f1..5ff50a3c 100644 --- a/src/service/rooms/event_handler/state_at_incoming.rs +++ b/src/service/rooms/event_handler/state_at_incoming.rs @@ -128,7 +128,7 @@ where new_state .into_iter() .stream() - .broad_then(|((event_type, state_key), event_id)| async move { + .broad_then(async |((event_type, state_key), event_id)| { self.services .short .get_or_create_shortstatekey(&event_type, &state_key) diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index 4db8a0b3..e5fa640a 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -167,7 +167,7 @@ where .prev_events() .any(is_equal_to!(event_id)) }) - .broad_filter_map(|event_id| async move { + .broad_filter_map(async |event_id| { // Only keep those extremities were not referenced yet self.services .pdu_metadata diff --git a/src/service/rooms/pdu_metadata/data.rs b/src/service/rooms/pdu_metadata/data.rs index a98b05ee..92f426d1 100644 --- a/src/service/rooms/pdu_metadata/data.rs +++ b/src/service/rooms/pdu_metadata/data.rs @@ -84,9 +84,9 @@ impl Data { .ready_take_while(move |key| key.starts_with(&target.to_be_bytes())) .map(|to_from| u64_from_u8(&to_from[8..16])) .map(PduCount::from_unsigned) - .wide_filter_map(move |shorteventid| async move { + .map(move |shorteventid| (user_id, shortroomid, shorteventid)) + .wide_filter_map(async |(user_id, shortroomid, shorteventid)| { let pdu_id: RawPduId = PduId { shortroomid, shorteventid }.into(); - let mut pdu = self .services .timeline diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index eb1ac0e9..cc3c9d38 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -112,7 +112,7 @@ pub async fn search_pdus<'a>( let pdus = pdu_ids .into_iter() .stream() - .wide_filter_map(move |result_pdu_id: RawPduId| async move { + .wide_filter_map(async |result_pdu_id: RawPduId| { self.services .timeline .get_pdu_from_id(&result_pdu_id) @@ -121,7 +121,7 @@ pub async fn search_pdus<'a>( }) .ready_filter(|pdu| !pdu.is_redacted()) .ready_filter(move |pdu| filter.matches(pdu)) - .wide_filter_map(move |pdu| async move { + .wide_filter_map(async |pdu| { self.services .state_accessor .user_can_see_event(query.user_id?, pdu.room_id(), pdu.event_id()) @@ -164,7 +164,7 @@ async fn search_pdu_ids_query_room( ) -> Vec> { tokenize(&query.criteria.search_term) .stream() - .wide_then(|word| async move { + .wide_then(async |word| { self.search_pdu_ids_query_words(shortroomid, &word) .collect::>() .await diff --git a/src/service/rooms/spaces/mod.rs b/src/service/rooms/spaces/mod.rs index cde57b29..a37253da 100644 --- a/src/service/rooms/spaces/mod.rs +++ b/src/service/rooms/spaces/mod.rs @@ -270,7 +270,7 @@ fn get_space_child_events<'a>( .map(IterStream::stream) .map(StreamExt::flatten) .flatten_stream() - .broad_filter_map(move |(state_key, event_id): (_, OwnedEventId)| async move { + .broad_filter_map(async move |(state_key, event_id): (_, OwnedEventId)| { self.services .timeline .get_pdu(&event_id) diff --git a/src/service/rooms/state_accessor/state.rs b/src/service/rooms/state_accessor/state.rs index b85d2075..7c0c7a45 100644 --- a/src/service/rooms/state_accessor/state.rs +++ b/src/service/rooms/state_accessor/state.rs @@ -338,7 +338,7 @@ pub fn state_full_pdus( .short .multi_get_eventid_from_short(short_ids) .ready_filter_map(Result::ok) - .broad_filter_map(move |event_id: OwnedEventId| async move { + .broad_filter_map(async |event_id: OwnedEventId| { self.services .timeline .get_pdu(&event_id) diff --git a/src/service/rooms/timeline/append.rs b/src/service/rooms/timeline/append.rs index 62454006..57e55464 100644 --- a/src/service/rooms/timeline/append.rs +++ b/src/service/rooms/timeline/append.rs @@ -197,7 +197,7 @@ where .map(ToOwned::to_owned) // Don't notify the sender of their own events, and dont send from ignored users .ready_filter(|user| *user != pdu.sender()) - .filter_map(|recipient_user| async move { (!self.services.users.user_is_ignored(pdu.sender(), &recipient_user).await).then_some(recipient_user) }) + .filter_map(async |recipient_user| self.services.users.user_is_ignored(pdu.sender(), &recipient_user).await.eq(&false).then_some(recipient_user)) .collect() .await; diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index 2e834ef0..e98cac8f 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -95,7 +95,7 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re .stream(), ) .ready_filter(|server_name| !self.services.globals.server_is_ours(server_name)) - .filter_map(|server_name| async move { + .filter_map(async |server_name| { self.services .state_cache .server_in_room(&server_name, room_id) diff --git a/src/service/rooms/typing/mod.rs b/src/service/rooms/typing/mod.rs index 25e399d9..aa786341 100644 --- a/src/service/rooms/typing/mod.rs +++ b/src/service/rooms/typing/mod.rs @@ -205,12 +205,12 @@ impl Service { let user_ids: Vec<_> = typing_indicators .into_keys() .stream() - .filter_map(|typing_user_id| async move { - (!self - .services + .filter_map(async |typing_user_id| { + self.services .users .user_is_ignored(&typing_user_id, sender_user) - .await) + .await + .eq(&false) .then_some(typing_user_id) }) .collect() diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index f50deaff..d38eb469 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -492,7 +492,7 @@ impl Service { .state_cache .server_rooms(server_name) .map(ToOwned::to_owned) - .broad_filter_map(|room_id| async move { + .broad_filter_map(async |room_id| { let receipt_map = self .select_edus_receipts_room(&room_id, since, max_edu_count, &mut num) .await; diff --git a/src/service/services.rs b/src/service/services.rs index b882409f..e225e347 100644 --- a/src/service/services.rs +++ b/src/service/services.rs @@ -173,7 +173,7 @@ impl Services { pub async fn clear_cache(&self) { self.services() - .for_each(|service| async move { + .for_each(async |service| { service.clear_cache().await; }) .await; @@ -182,7 +182,7 @@ impl Services { pub async fn memory_usage(&self) -> Result { self.services() .map(Ok) - .try_fold(String::new(), |mut out, service| async move { + .try_fold(String::new(), async |mut out, service| { service.memory_usage(&mut out).await?; Ok(out) })