Add upper-bound to readreceipts_since() and callsites.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-26 04:44:09 +00:00
parent 33a9fce828
commit c6836e51b2
7 changed files with 19 additions and 15 deletions

View File

@@ -192,10 +192,11 @@ where
.max()
.unwrap_or_else(PduCount::max);
let receipts = services
.rooms
.read_receipt
.readreceipts_since(lazy_loading_context.room_id, oldest.into_unsigned());
let receipts = services.rooms.read_receipt.readreceipts_since(
lazy_loading_context.room_id,
oldest.into_unsigned(),
Some(newest.into_unsigned()),
);
pin_mut!(receipts);
let witness: Witness = events

View File

@@ -645,7 +645,7 @@ async fn load_joined_room(
let receipt_events = services
.rooms
.read_receipt
.readreceipts_since(room_id, since)
.readreceipts_since(room_id, since, Some(next_batch))
.filter_map(async |(read_user, _, edu)| {
services
.users

View File

@@ -464,7 +464,7 @@ where
let mut receipts: Vec<Raw<AnySyncEphemeralRoomEvent>> = services
.rooms
.read_receipt
.readreceipts_since(room_id, *roomsince)
.readreceipts_since(room_id, *roomsince, Some(next_batch))
.filter_map(async |(read_user, _ts, v)| {
services
.users

View File

@@ -75,6 +75,7 @@ pub async fn update(
.roomusertype_roomuserdataid
.qry(&key)
.await;
self.db
.roomusertype_roomuserdataid
.put(key, roomuserdataid);

View File

@@ -66,6 +66,7 @@ impl Data {
&'a self,
room_id: &'a RoomId,
since: u64,
to: Option<u64>,
) -> impl Stream<Item = ReceiptItem<'_>> + Send + 'a {
type Key<'a> = (&'a RoomId, u64, &'a UserId);
type KeyVal<'a> = (Key<'a>, CanonicalJsonObject);
@@ -76,7 +77,9 @@ impl Data {
self.readreceiptid_readreceipt
.stream_from(&first_possible_edu)
.ignore_err()
.ready_take_while(move |((r, ..), _): &KeyVal<'_>| *r == room_id)
.ready_take_while(move |((r, c, ..), _): &KeyVal<'_>| {
*r == room_id && to.is_none_or(|to| *c <= to)
})
.map(move |((_, count, user_id), mut json): KeyVal<'_>| {
json.remove("room_id");

View File

@@ -126,8 +126,9 @@ impl Service {
&'a self,
room_id: &'a RoomId,
since: u64,
to: Option<u64>,
) -> impl Stream<Item = ReceiptItem<'_>> + Send + 'a {
self.db.readreceipts_since(room_id, since)
self.db.readreceipts_since(room_id, since, to)
}
/// Sets a private read marker at PDU `count`.

View File

@@ -536,17 +536,15 @@ impl Service {
max_edu_count: &AtomicU64,
num: &AtomicUsize,
) -> ReceiptMap {
let receipts = self
.services
.read_receipt
.readreceipts_since(room_id, since.0);
let receipts =
self.services
.read_receipt
.readreceipts_since(room_id, since.0, Some(since.1));
pin_mut!(receipts);
let mut read = BTreeMap::<OwnedUserId, ReceiptData>::new();
while let Some((user_id, count, read_receipt)) = receipts.next().await {
if count > since.1 {
break;
}
debug_assert!(count <= since.1, "exceeds upper-bound");
max_edu_count.fetch_max(count, Ordering::Relaxed);
if !self.services.globals.user_is_local(user_id) {