Improved solution to current_shortstatehash > next_batch. (59b62b1)

`current_shortstatehash` is now calculated from the timeline, which is already
upper-bounded by the `next_batch` snapshot. Previously, `current_shortstatehash`
was truly current and had to be prevented from exceeding `next_batch` by
ignoring the room during sync until a future pass in which the two values
finally satisfied that condition.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-08-10 08:59:57 +00:00
parent 3e2f2fbffa
commit 3430d4ac86
5 changed files with 46 additions and 53 deletions

View File

@@ -36,7 +36,7 @@ pub(super) async fn last(&self, room_id: OwnedRoomOrAliasId) -> Result {
.services .services
.rooms .rooms
.timeline .timeline
.last_timeline_count(None, &room_id) .last_timeline_count(None, &room_id, None)
.await?; .await?;
self.write_str(&format!("{result:#?}")).await self.write_str(&format!("{result:#?}")).await

View File

@@ -11,7 +11,7 @@ use ruma::{
use tuwunel_core::{ use tuwunel_core::{
Error, PduCount, Result, Error, PduCount, Result,
matrix::pdu::PduEvent, matrix::pdu::PduEvent,
utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, utils::stream::{BroadbandExt, ReadyExt},
}; };
use tuwunel_service::Services; use tuwunel_service::Services;
@@ -27,22 +27,22 @@ async fn load_timeline(
roomsincecount: PduCount, roomsincecount: PduCount,
next_batch: Option<PduCount>, next_batch: Option<PduCount>,
limit: usize, limit: usize,
) -> Result<(Vec<(PduCount, PduEvent)>, bool), Error> { ) -> Result<(Vec<(PduCount, PduEvent)>, bool, PduCount), Error> {
let last_timeline_count = services let last_timeline_count = services
.rooms .rooms
.timeline .timeline
.last_timeline_count(Some(sender_user), room_id) .last_timeline_count(Some(sender_user), room_id, next_batch)
.await?; .await?;
if last_timeline_count <= roomsincecount { if last_timeline_count <= roomsincecount {
return Ok((Vec::new(), false)); return Ok((Vec::new(), false, last_timeline_count));
} }
let non_timeline_pdus = services let non_timeline_pdus = services
.rooms .rooms
.timeline .timeline
.pdus_rev(Some(sender_user), room_id, None) .pdus_rev(Some(sender_user), room_id, None)
.ignore_err() .ready_filter_map(Result::ok)
.ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max)) .ready_skip_while(|&(pducount, _)| pducount > next_batch.unwrap_or_else(PduCount::max))
.ready_take_while(|&(pducount, _)| pducount > roomsincecount); .ready_take_while(|&(pducount, _)| pducount > roomsincecount);
@@ -60,7 +60,7 @@ async fn load_timeline(
// is limited unless there are events in non_timeline_pdus // is limited unless there are events in non_timeline_pdus
let limited = non_timeline_pdus.next().await.is_some(); let limited = non_timeline_pdus.next().await.is_some();
Ok((timeline_pdus, limited)) Ok((timeline_pdus, limited, last_timeline_count))
} }
async fn share_encrypted_room( async fn share_encrypted_room(

View File

@@ -6,7 +6,7 @@ use std::{
use axum::extract::State; use axum::extract::State;
use futures::{ use futures::{
FutureExt, StreamExt, TryFutureExt, TryStreamExt, FutureExt, StreamExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, join, join3, join4, join5, try_join4}, future::{OptionFuture, join, join3, join4, join5, try_join3},
pin_mut, pin_mut,
}; };
use ruma::{ use ruma::{
@@ -658,15 +658,6 @@ async fn load_joined_room(
full_state: bool, full_state: bool,
filter: &FilterDefinition, filter: &FilterDefinition,
) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> { ) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
let sincecount = PduCount::Normal(since);
let next_batchcount = PduCount::Normal(next_batch);
let current_shortstatehash = services
.rooms
.state
.get_room_shortstatehash(room_id)
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
let since_shortstatehash = services let since_shortstatehash = services
.rooms .rooms
.user .user
@@ -685,8 +676,8 @@ async fn load_joined_room(
services, services,
sender_user, sender_user,
room_id, room_id,
sincecount, PduCount::Normal(since),
Some(next_batchcount), Some(PduCount::Normal(next_batch)),
timeline_limit, timeline_limit,
); );
@@ -704,24 +695,37 @@ async fn load_joined_room(
.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>() .collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>()
.map(Ok); .map(Ok);
let (current_shortstatehash, since_shortstatehash, (timeline_pdus, limited), receipt_events) = let (since_shortstatehash, (timeline_pdus, limited, last_timeline_count), receipt_events) =
try_join4(current_shortstatehash, since_shortstatehash, timeline, receipt_events) try_join3(since_shortstatehash, timeline, receipt_events)
.boxed() .boxed()
.await?; .await?;
// State was changed after the cutoff for this sync; similar to other handlers. let horizon_shortstatehash: OptionFuture<_> = timeline_pdus
if current_shortstatehash > next_batch { .iter()
// Transfer the since_shortstatehash not the current over to the next sync. .map(at!(0))
if let Some(since_shortstatehash) = since_shortstatehash { .map(PduCount::into_unsigned)
.map(|shorteventid| {
services services
.rooms .rooms
.user .state_accessor
.associate_token_shortstatehash(room_id, next_batch, since_shortstatehash) .get_shortstatehash(shorteventid)
.await; })
} .next()
.into();
return Ok((JoinedRoom::default(), HashSet::new(), HashSet::new())); let current_shortstatehash = services
} .rooms
.state_accessor
.get_shortstatehash(last_timeline_count.into_unsigned())
.or_else(|_| services.state.get_room_shortstatehash(room_id));
let (horizon_shortstatehash, current_shortstatehash) =
join(horizon_shortstatehash, current_shortstatehash)
.boxed()
.await;
let current_shortstatehash = current_shortstatehash
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))?;
let associate_token = services let associate_token = services
.rooms .rooms
@@ -773,19 +777,6 @@ async fn load_joined_room(
}) })
.into(); .into();
let horizon_shortstatehash: OptionFuture<_> = timeline_pdus
.iter()
.map(at!(0))
.map(PduCount::into_unsigned)
.map(|shorteventid| {
services
.rooms
.state_accessor
.get_shortstatehash(shorteventid)
})
.next()
.into();
let last_notification_read: OptionFuture<_> = timeline_pdus let last_notification_read: OptionFuture<_> = timeline_pdus
.is_empty() .is_empty()
.then(|| { .then(|| {
@@ -809,8 +800,7 @@ async fn load_joined_room(
let encrypted_room = services let encrypted_room = services
.rooms .rooms
.state_accessor .state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") .is_encrypted_room(room_id);
.is_ok();
let last_privateread_update = services let last_privateread_update = services
.rooms .rooms
@@ -818,13 +808,13 @@ async fn load_joined_room(
.last_privateread_update(sender_user, room_id); .last_privateread_update(sender_user, room_id);
let ( let (
witness, (witness, since_sender_member),
(encrypted_room, last_privateread_update, last_notification_read), (encrypted_room, ()),
(since_sender_member, horizon_shortstatehash, ()), (last_privateread_update, last_notification_read),
) = join3( ) = join3(
witness, join(witness, since_sender_member),
join3(encrypted_room, last_privateread_update, last_notification_read), join(encrypted_room, associate_token),
join3(since_sender_member, horizon_shortstatehash, associate_token), join(last_privateread_update, last_notification_read),
) )
.boxed() .boxed()
.await; .await;

View File

@@ -422,7 +422,7 @@ where
(timeline_pdus, limited) = (Vec::new(), true); (timeline_pdus, limited) = (Vec::new(), true);
} else { } else {
(timeline_pdus, limited) = match load_timeline( (timeline_pdus, limited, _) = match load_timeline(
services, services,
sender_user, sender_user,
room_id, room_id,

View File

@@ -222,11 +222,14 @@ pub async fn last_timeline_count(
&self, &self,
sender_user: Option<&UserId>, sender_user: Option<&UserId>,
room_id: &RoomId, room_id: &RoomId,
upper_bound: Option<PduCount>,
) -> Result<PduCount> { ) -> Result<PduCount> {
let upper_bound = upper_bound.unwrap_or_else(PduCount::max);
let pdus_rev = self.pdus_rev(sender_user, room_id, None); let pdus_rev = self.pdus_rev(sender_user, room_id, None);
pin_mut!(pdus_rev); pin_mut!(pdus_rev);
let last_count = pdus_rev let last_count = pdus_rev
.ready_try_skip_while(|&(pducount, _)| Ok(pducount > upper_bound))
.try_next() .try_next()
.await? .await?
.map(at!(0)) .map(at!(0))