diff --git a/clippy.toml b/clippy.toml index 9003a2bf..bd5d4b2a 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,7 +1,7 @@ array-size-threshold = 4096 cognitive-complexity-threshold = 100 # TODO reduce me ALARA excessive-nesting-threshold = 11 # TODO reduce me to 4 or 5 -future-size-threshold = 7745 # TODO reduce me ALARA +future-size-threshold = 8192 stack-size-threshold = 196608 # TODO reduce me ALARA too-many-lines-threshold = 780 # TODO reduce me to <= 100 type-complexity-threshold = 250 # reduce me to ~200 diff --git a/src/admin/user/commands.rs b/src/admin/user/commands.rs index 5e866a19..15678692 100644 --- a/src/admin/user/commands.rs +++ b/src/admin/user/commands.rs @@ -236,7 +236,10 @@ pub(super) async fn deactivate(&self, no_leave_rooms: bool, user_id: String) -> .collect() .await; - full_user_deactivate(self.services, &user_id, &all_joined_rooms).await?; + full_user_deactivate(self.services, &user_id, &all_joined_rooms) + .boxed() + .await?; + update_displayname(self.services, &user_id, None, &all_joined_rooms).await; update_avatar_url(self.services, &user_id, None, None, &all_joined_rooms).await; leave_all_rooms(self.services, &user_id).await; @@ -358,7 +361,10 @@ pub(super) async fn deactivate_all(&self, no_leave_rooms: bool, force: bool) -> .collect() .await; - full_user_deactivate(self.services, &user_id, &all_joined_rooms).await?; + full_user_deactivate(self.services, &user_id, &all_joined_rooms) + .boxed() + .await?; + update_displayname(self.services, &user_id, None, &all_joined_rooms).await; update_avatar_url(self.services, &user_id, None, None, &all_joined_rooms) .await; diff --git a/src/api/client/membership/join.rs b/src/api/client/membership/join.rs index 41c90e74..7fb370e7 100644 --- a/src/api/client/membership/join.rs +++ b/src/api/client/membership/join.rs @@ -2,7 +2,7 @@ use std::{borrow::Borrow, collections::HashMap, iter::once, sync::Arc}; use axum::extract::State; use axum_client_ip::InsecureClientIp; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt, pin_mut}; use ruma::{ CanonicalJsonObject, CanonicalJsonValue, OwnedRoomId, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, UserId, @@ -747,7 +747,7 @@ async fn join_room_by_id_helper_local( }) .await { - services + let users = services .rooms .state_cache .local_users_in_room(room_id) @@ -759,10 +759,10 @@ async fn join_room_by_id_helper_local( &state_lock, ) }) - .boxed() - .next() - .await - .map(ToOwned::to_owned) + .map(ToOwned::to_owned); + + pin_mut!(users); + users.next().await } else { None } diff --git a/src/api/client/message.rs b/src/api/client/message.rs index df408a41..005c0a97 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -100,7 +100,6 @@ pub(crate) async fn get_message_events_route( .rooms .timeline .backfill_if_required(room_id, from) - .boxed() .await .log_err() .ok(); diff --git a/src/api/client/relations.rs b/src/api/client/relations.rs index 5effde20..a8d9ef37 100644 --- a/src/api/client/relations.rs +++ b/src/api/client/relations.rs @@ -175,7 +175,7 @@ async fn paginate_relations_with_filter( }) } -async fn visibility_filter( +async fn visibility_filter( services: &Services, sender_user: &UserId, item: (PduCount, Pdu), diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 0a5f365c..a192a44e 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -728,7 +728,6 @@ async fn load_joined_room( joined_since_last_sync, witness.as_ref(), ) - .boxed() .await?; let is_sender_membership = |pdu: &PduEvent| { @@ -1075,8 +1074,7 @@ async fn calculate_state_incremental<'a>( .rooms .state_accessor .state_full_shortids(current_shortstatehash) - .expect_ok() - .boxed(), + .expect_ok(), ) }) .into(); @@ -1108,6 +1106,7 @@ async fn calculate_state_incremental<'a>( .ok() }) .collect::>() + .boxed() .await; let (device_list_updates, left_encrypted_users) = state_events diff --git a/src/api/server/make_join.rs b/src/api/server/make_join.rs index 506a3038..146bbb72 100644 --- a/src/api/server/make_join.rs +++ b/src/api/server/make_join.rs @@ -1,5 +1,5 @@ use axum::extract::State; -use futures::StreamExt; +use futures::{StreamExt, pin_mut}; use ruma::{ CanonicalJsonObject, OwnedUserId, RoomId, RoomVersionId, UserId, api::{client::error::ErrorKind, federation::membership::prepare_join_event}, @@ -105,7 +105,7 @@ pub(crate) async fn create_join_event_template_route( ) .await? { - let Some(auth_user) = services + let users = services .rooms .state_cache .local_users_in_room(&body.room_id) @@ -117,15 +117,15 @@ pub(crate) async fn create_join_event_template_route( &state_lock, ) }) - .boxed() - .next() - .await - .map(ToOwned::to_owned) - else { + .map(ToOwned::to_owned); + + pin_mut!(users); + let Some(auth_user) = users.next().await else { return Err!(Request(UnableToGrantJoin( "No user on this server is able to assist in joining." ))); }; + Some(auth_user) } else { None diff --git a/src/service/admin/grant.rs b/src/service/admin/grant.rs index d9a513e6..84c8af0b 100644 --- a/src/service/admin/grant.rs +++ b/src/service/admin/grant.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; +use futures::FutureExt; use ruma::{ RoomId, UserId, events::{ @@ -239,6 +240,7 @@ pub async fn revoke_admin(&self, user_id: &UserId) -> Result { &room_id, &state_lock, ) + .boxed() .await .map(|_| ()) } diff --git a/src/service/admin/mod.rs b/src/service/admin/mod.rs index dadf351b..c30f4566 100644 --- a/src/service/admin/mod.rs +++ b/src/service/admin/mod.rs @@ -321,6 +321,7 @@ impl Service { .await { self.handle_response_error(e, room_id, user_id, &state_lock) + .boxed() .await .unwrap_or_else(default_log); } @@ -344,6 +345,7 @@ impl Service { self.services .timeline .build_and_append_pdu(PduBuilder::timeline(&content), user_id, room_id, state_lock) + .boxed() .await?; Ok(()) diff --git a/src/service/rooms/auth_chain/mod.rs b/src/service/rooms/auth_chain/mod.rs index 36bffb30..45b06cde 100644 --- a/src/service/rooms/auth_chain/mod.rs +++ b/src/service/rooms/auth_chain/mod.rs @@ -7,7 +7,7 @@ use std::{ time::Instant, }; -use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, pin_mut}; use ruma::{EventId, OwnedEventId, RoomId}; use tuwunel_core::{ Err, Result, at, debug, debug_error, implement, trace, @@ -80,13 +80,13 @@ where const BUCKET: Bucket<'_> = BTreeSet::new(); let started = Instant::now(); - let mut starting_ids = self + let starting_ids = self .services .short .multi_get_or_create_shorteventid(starting_events.clone()) - .zip(starting_events.clone().stream()) - .boxed(); + .zip(starting_events.clone().stream()); + pin_mut!(starting_ids); let mut buckets = [BUCKET; NUM_BUCKETS]; while let Some((short, starting_event)) = starting_ids.next().await { let bucket: usize = short.try_into()?; diff --git a/src/service/rooms/event_handler/handle_incoming_pdu.rs b/src/service/rooms/event_handler/handle_incoming_pdu.rs index acc8ee54..4b95b63a 100644 --- a/src/service/rooms/event_handler/handle_incoming_pdu.rs +++ b/src/service/rooms/event_handler/handle_incoming_pdu.rs @@ -112,6 +112,7 @@ pub async fn handle_incoming_pdu<'a>( let (incoming_pdu, val) = self .handle_outlier_pdu(origin, create_event, event_id, room_id, value, false) + .boxed() .await?; // 8. if not timeline event: stop diff --git a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs index e5fa640a..c4102c7e 100644 --- a/src/service/rooms/event_handler/upgrade_outlier_pdu.rs +++ b/src/service/rooms/event_handler/upgrade_outlier_pdu.rs @@ -61,6 +61,7 @@ where .await? } else { self.state_at_incoming_resolved(&incoming_pdu, room_id, &room_version_id) + .boxed() .await? }; @@ -215,6 +216,7 @@ where let new_room_state = self .resolve_state(room_id, &room_version_id, state_after) + .boxed() .await?; // Set the new room state to the resolved state diff --git a/src/service/rooms/timeline/backfill.rs b/src/service/rooms/timeline/backfill.rs index e98cac8f..f48b15f8 100644 --- a/src/service/rooms/timeline/backfill.rs +++ b/src/service/rooms/timeline/backfill.rs @@ -118,14 +118,11 @@ pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Re }, ) .await; + match response { | Ok(response) => { for pdu in response.pdus { - if let Err(e) = self - .backfill_pdu(backfill_server, pdu) - .boxed() - .await - { + if let Err(e) = self.backfill_pdu(backfill_server, pdu).await { debug_warn!("Failed to add backfilled pdu in room {room_id}: {e}"); } } diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 77fd166c..fb8abd2a 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -253,9 +253,11 @@ impl Service { ) { let keep = usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX); - let mut txns = HashMap::>::new(); - let mut active = self.db.active_requests().boxed(); + let mut txns = HashMap::>::new(); + let active = self.db.active_requests(); + + pin_mut!(active); while let Some((key, event, dest)) = active.next().await { if self.shard_id(&dest) != id { continue; @@ -505,6 +507,7 @@ impl Service { .then_some((room_id, receipt_map)) }) .collect() + .boxed() .await; if receipts.is_empty() {