From aac49b09c7f5434e78f00d6618bbb947512a23a4 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 8 Jul 2025 13:16:40 +0000 Subject: [PATCH] Improve mutable state in sender closure captures. Signed-off-by: Jason Volk --- src/service/sending/sender.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index d38eb469..77fd166c 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -11,8 +11,8 @@ use std::{ use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use futures::{ FutureExt, StreamExt, - future::{BoxFuture, OptionFuture}, - join, pin_mut, + future::{BoxFuture, OptionFuture, join3}, + pin_mut, stream::FuturesUnordered, }; use ruma::{ @@ -400,7 +400,8 @@ impl Service { .then(|| self.select_edus_presence(server_name, batch, &max_edu_count)) .into(); - let (device_changes, receipts, presence) = join!(device_changes, receipts, presence); + let (device_changes, receipts, presence) = + join3(device_changes, receipts, presence).await; let mut events = device_changes; events.extend(presence.into_iter().flatten()); @@ -486,7 +487,7 @@ impl Service { since: (u64, u64), max_edu_count: &AtomicU64, ) -> Option { - let mut num = 0; + let num = AtomicUsize::new(0); let receipts: BTreeMap = self .services .state_cache @@ -494,7 +495,7 @@ impl Service { .map(ToOwned::to_owned) .broad_filter_map(async |room_id| { let receipt_map = self - .select_edus_receipts_room(&room_id, since, max_edu_count, &mut num) + .select_edus_receipts_room(&room_id, since, max_edu_count, &num) .await; receipt_map @@ -530,7 +531,7 @@ impl Service { room_id: &RoomId, since: (u64, u64), max_edu_count: &AtomicU64, - num: &mut usize, + num: &AtomicUsize, ) -> ReceiptMap { let receipts = self .services @@ -581,8 +582,8 @@ impl Service { .insert(user_id.to_owned(), receipt_data) .is_none() { - *num = num.saturating_add(1); - if *num >= SELECT_RECEIPT_LIMIT { + let num = num.fetch_add(1, Ordering::Relaxed); + if num >= SELECT_RECEIPT_LIMIT { break; } }