diff --git a/src/api/client/message.rs b/src/api/client/message.rs index 005c0a97..4eb125e4 100644 --- a/src/api/client/message.rs +++ b/src/api/client/message.rs @@ -192,10 +192,11 @@ where .max() .unwrap_or_else(PduCount::max); - let receipts = services - .rooms - .read_receipt - .readreceipts_since(lazy_loading_context.room_id, oldest.into_unsigned()); + let receipts = services.rooms.read_receipt.readreceipts_since( + lazy_loading_context.room_id, + oldest.into_unsigned(), + Some(newest.into_unsigned()), + ); pin_mut!(receipts); let witness: Witness = events diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 09cccfdc..706dbb96 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -645,7 +645,7 @@ async fn load_joined_room( let receipt_events = services .rooms .read_receipt - .readreceipts_since(room_id, since) + .readreceipts_since(room_id, since, Some(next_batch)) .filter_map(async |(read_user, _, edu)| { services .users diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 6d29e836..a4073841 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -464,7 +464,7 @@ where let mut receipts: Vec> = services .rooms .read_receipt - .readreceipts_since(room_id, *roomsince) + .readreceipts_since(room_id, *roomsince, Some(next_batch)) .filter_map(async |(read_user, _ts, v)| { services .users diff --git a/src/service/account_data/mod.rs b/src/service/account_data/mod.rs index d8d4d033..815df853 100644 --- a/src/service/account_data/mod.rs +++ b/src/service/account_data/mod.rs @@ -75,6 +75,7 @@ pub async fn update( .roomusertype_roomuserdataid .qry(&key) .await; + self.db .roomusertype_roomuserdataid .put(key, roomuserdataid); diff --git a/src/service/rooms/read_receipt/data.rs b/src/service/rooms/read_receipt/data.rs index 11fb469b..5a0971f8 100644 --- a/src/service/rooms/read_receipt/data.rs +++ b/src/service/rooms/read_receipt/data.rs @@ -66,6 +66,7 @@ impl Data { &'a self, room_id: &'a RoomId, since: u64, + to: Option, ) -> impl Stream> + Send + 'a { type Key<'a> = (&'a RoomId, u64, &'a UserId); type KeyVal<'a> = (Key<'a>, CanonicalJsonObject); @@ -76,7 +77,9 @@ impl Data { self.readreceiptid_readreceipt .stream_from(&first_possible_edu) .ignore_err() - .ready_take_while(move |((r, ..), _): &KeyVal<'_>| *r == room_id) + .ready_take_while(move |((r, c, ..), _): &KeyVal<'_>| { + *r == room_id && to.is_none_or(|to| *c <= to) + }) .map(move |((_, count, user_id), mut json): KeyVal<'_>| { json.remove("room_id"); diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index a06afefc..ecb43bd5 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -126,8 +126,9 @@ impl Service { &'a self, room_id: &'a RoomId, since: u64, + to: Option, ) -> impl Stream> + Send + 'a { - self.db.readreceipts_since(room_id, since) + self.db.readreceipts_since(room_id, since, to) } /// Sets a private read marker at PDU `count`. diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 3552def8..6672d6e0 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -536,17 +536,15 @@ impl Service { max_edu_count: &AtomicU64, num: &AtomicUsize, ) -> ReceiptMap { - let receipts = self - .services - .read_receipt - .readreceipts_since(room_id, since.0); + let receipts = + self.services + .read_receipt + .readreceipts_since(room_id, since.0, Some(since.1)); pin_mut!(receipts); let mut read = BTreeMap::::new(); while let Some((user_id, count, read_receipt)) = receipts.next().await { - if count > since.1 { - break; - } + debug_assert!(count <= since.1, "exceeds upper-bound"); max_edu_count.fetch_max(count, Ordering::Relaxed); if !self.services.globals.user_is_local(user_id) {