Improve mutable state in sender closure captures.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-08 13:16:40 +00:00
parent c8d35cca57
commit aac49b09c7

View File

@@ -11,8 +11,8 @@ use std::{
use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD}; use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
use futures::{ use futures::{
FutureExt, StreamExt, FutureExt, StreamExt,
future::{BoxFuture, OptionFuture}, future::{BoxFuture, OptionFuture, join3},
join, pin_mut, pin_mut,
stream::FuturesUnordered, stream::FuturesUnordered,
}; };
use ruma::{ use ruma::{
@@ -400,7 +400,8 @@ impl Service {
.then(|| self.select_edus_presence(server_name, batch, &max_edu_count)) .then(|| self.select_edus_presence(server_name, batch, &max_edu_count))
.into(); .into();
let (device_changes, receipts, presence) = join!(device_changes, receipts, presence); let (device_changes, receipts, presence) =
join3(device_changes, receipts, presence).await;
let mut events = device_changes; let mut events = device_changes;
events.extend(presence.into_iter().flatten()); events.extend(presence.into_iter().flatten());
@@ -486,7 +487,7 @@ impl Service {
since: (u64, u64), since: (u64, u64),
max_edu_count: &AtomicU64, max_edu_count: &AtomicU64,
) -> Option<EduBuf> { ) -> Option<EduBuf> {
let mut num = 0; let num = AtomicUsize::new(0);
let receipts: BTreeMap<OwnedRoomId, ReceiptMap> = self let receipts: BTreeMap<OwnedRoomId, ReceiptMap> = self
.services .services
.state_cache .state_cache
@@ -494,7 +495,7 @@ impl Service {
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.broad_filter_map(async |room_id| { .broad_filter_map(async |room_id| {
let receipt_map = self let receipt_map = self
.select_edus_receipts_room(&room_id, since, max_edu_count, &mut num) .select_edus_receipts_room(&room_id, since, max_edu_count, &num)
.await; .await;
receipt_map receipt_map
@@ -530,7 +531,7 @@ impl Service {
room_id: &RoomId, room_id: &RoomId,
since: (u64, u64), since: (u64, u64),
max_edu_count: &AtomicU64, max_edu_count: &AtomicU64,
num: &mut usize, num: &AtomicUsize,
) -> ReceiptMap { ) -> ReceiptMap {
let receipts = self let receipts = self
.services .services
@@ -581,8 +582,8 @@ impl Service {
.insert(user_id.to_owned(), receipt_data) .insert(user_id.to_owned(), receipt_data)
.is_none() .is_none()
{ {
*num = num.saturating_add(1); let num = num.fetch_add(1, Ordering::Relaxed);
if *num >= SELECT_RECEIPT_LIMIT { if num >= SELECT_RECEIPT_LIMIT {
break; break;
} }
} }