Implement stateless sync; erase all the sync tokens.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-09-27 03:10:28 +00:00
parent d95c3f126f
commit 68c4f60bb3
5 changed files with 206 additions and 268 deletions

View File

@@ -5,8 +5,8 @@ use std::{
use axum::extract::State;
use futures::{
FutureExt, StreamExt, TryFutureExt, TryStreamExt,
future::{OptionFuture, join, join3, join4, join5, try_join3},
FutureExt, StreamExt, TryFutureExt,
future::{OptionFuture, join, join3, join4, join5},
pin_mut,
};
use ruma::{
@@ -36,7 +36,11 @@ use ruma::{
};
use tokio::time;
use tuwunel_core::{
Result, at, err, error, extract_variant, is_equal_to,
Error, Result, at,
debug::INFO_SPAN_LEVEL,
err, error,
error::inspect_debug_log,
extract_variant, is_equal_to,
matrix::{
Event,
event::Matches,
@@ -212,7 +216,7 @@ async fn build_empty_response(
#[tracing::instrument(
name = "build",
level = "debug",
level = INFO_SPAN_LEVEL,
ret(level = "trace"),
skip_all,
fields(
@@ -562,20 +566,22 @@ async fn handle_left_room(
let mut left_state_events = Vec::new();
let since_shortstatehash = services
.user
.get_token_shortstatehash(room_id, since);
.timeline
.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
.await
.ok();
let since_state_ids: HashMap<_, OwnedEventId> = since_shortstatehash
.map_ok(|since_shortstatehash| {
.map(|since_shortstatehash| {
services
.state_accessor
.state_full_ids(since_shortstatehash)
.map(Ok)
})
.try_flatten_stream()
.try_collect()
.await
.unwrap_or_default();
.into_iter()
.stream()
.flatten()
.collect()
.await;
let Ok(left_event_id): Result<OwnedEventId> = services
.state_accessor
@@ -666,12 +672,7 @@ async fn load_joined_room(
full_state: bool,
filter: &FilterDefinition,
) -> Result<(JoinedRoom, HashSet<OwnedUserId>, HashSet<OwnedUserId>)> {
let since_shortstatehash = services
.user
.get_token_shortstatehash(room_id, since)
.ok()
.map(Ok);
let initial = since == 0;
let timeline_limit: usize = filter
.room
.timeline
@@ -679,14 +680,38 @@ async fn load_joined_room(
.unwrap_or_else(|| uint!(10))
.try_into()?;
let timeline = load_timeline(
let (timeline_pdus, limited, last_timeline_count) = load_timeline(
services,
sender_user,
room_id,
PduCount::Normal(since),
Some(PduCount::Normal(next_batch)),
timeline_limit,
);
)
.await?;
let since_shortstatehash = services
.timeline
.prev_shortstatehash(room_id, PduCount::Normal(since).saturating_add(1))
.ok();
let horizon_shortstatehash: OptionFuture<_> = timeline_pdus
.first()
.map(at!(0))
.map(|count| {
services
.timeline
.get_shortstatehash(room_id, count)
.inspect_err(inspect_debug_log)
})
.into();
let current_shortstatehash = services
.timeline
.get_shortstatehash(room_id, last_timeline_count)
.inspect_err(inspect_debug_log)
.or_else(|_| services.state.get_room_shortstatehash(room_id))
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))));
let receipt_events = services
.read_receipt
@@ -698,46 +723,32 @@ async fn load_joined_room(
.await
.or_some((read_user.to_owned(), edu))
})
.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>()
.map(Ok);
.collect::<HashMap<OwnedUserId, Raw<AnySyncEphemeralRoomEvent>>>();
let (since_shortstatehash, (timeline_pdus, limited, last_timeline_count), receipt_events) =
try_join3(since_shortstatehash, timeline, receipt_events)
.boxed()
.await?;
let encrypted_room = services.state_accessor.is_encrypted_room(room_id);
let horizon_shortstatehash: OptionFuture<_> = timeline_pdus
.iter()
.map(at!(0))
.map(PduCount::into_unsigned)
.map(|shorteventid| services.state.get_shortstatehash(shorteventid))
.next()
.into();
let (
(since_shortstatehash, horizon_shortstatehash, current_shortstatehash),
receipt_events,
encrypted_room,
) = join3(
join3(since_shortstatehash, horizon_shortstatehash, current_shortstatehash),
receipt_events,
encrypted_room,
)
.map(|((since, horizon, current), receipt, encrypted_room)| {
Ok::<_, Error>(((since, horizon.flat_ok(), current?), receipt, encrypted_room))
})
.boxed()
.await?;
let current_shortstatehash = services
.state
.get_shortstatehash(last_timeline_count.into_unsigned())
.or_else(|_| services.state.get_room_shortstatehash(room_id));
let lazy_load_options =
[&filter.room.state.lazy_load_options, &filter.room.timeline.lazy_load_options];
let (horizon_shortstatehash, current_shortstatehash) =
join(horizon_shortstatehash, current_shortstatehash)
.boxed()
.await;
let current_shortstatehash = current_shortstatehash
.map_err(|_| err!(Database(error!("Room {room_id} has no state"))))?;
let associate_token =
services
.user
.associate_token_shortstatehash(room_id, next_batch, current_shortstatehash);
let lazy_loading_enabled = filter.room.state.lazy_load_options.is_enabled()
|| filter
.room
.timeline
.lazy_load_options
.is_enabled();
let lazy_loading_enabled = !encrypted_room
&& lazy_load_options
.iter()
.any(|opts| opts.is_enabled());
let lazy_loading_context = &lazy_loading::Context {
user_id: sender_user,
@@ -747,8 +758,6 @@ async fn load_joined_room(
options: Some(&filter.room.state.lazy_load_options),
};
let initial = since == 0 || since_shortstatehash.is_none();
// Reset lazy loading because this is an initial sync
let lazy_load_reset: OptionFuture<_> = initial
.then(|| services.lazy_loading.reset(lazy_loading_context))
@@ -771,6 +780,22 @@ async fn load_joined_room(
})
.into();
let sender_joined_count = services
.state_cache
.get_joined_count(room_id, sender_user);
let since_encryption: OptionFuture<_> = since_shortstatehash
.map(|shortstatehash| {
services
.state_accessor
.state_get(shortstatehash, &StateEventType::RoomEncryption, "")
})
.into();
let last_privateread_update = services
.read_receipt
.last_privateread_update(sender_user, room_id);
let last_notification_read: OptionFuture<_> = timeline_pdus
.is_empty()
.then(|| {
@@ -780,38 +805,20 @@ async fn load_joined_room(
})
.into();
let since_sender_member: OptionFuture<_> = since_shortstatehash
.map(|short| {
services
.state_accessor
.state_get_content(short, &StateEventType::RoomMember, sender_user.as_str())
.ok()
})
.into();
let encrypted_room = services.state_accessor.is_encrypted_room(room_id);
let last_privateread_update = services
.read_receipt
.last_privateread_update(sender_user, room_id);
let (
(witness, since_sender_member),
(encrypted_room, ()),
(last_privateread_update, last_notification_read),
(sender_joined_count, since_encryption),
witness,
) = join3(
join(witness, since_sender_member),
join(encrypted_room, associate_token),
join(last_privateread_update, last_notification_read),
join(sender_joined_count, since_encryption),
witness,
)
.await;
let joined_since_last_sync =
since_sender_member
.flatten()
.is_none_or(|content: RoomMemberEventContent| {
content.membership != MembershipState::Join
});
let _encrypted_since_last_sync = !initial && encrypted_room && since_encryption.is_none();
let joined_since_last_sync = sender_joined_count.is_ok_and(|count| count > since);
let StateChanges {
heroes,
@@ -822,19 +829,15 @@ async fn load_joined_room(
services,
sender_user,
room_id,
full_state,
encrypted_room,
full_state || initial,
since_shortstatehash,
horizon_shortstatehash.flat_ok(),
horizon_shortstatehash,
current_shortstatehash,
joined_since_last_sync,
witness.as_ref(),
)
.await?;
let send_notification_counts =
last_notification_read.is_none_or(|last_count| last_count.gt(&since));
let is_sender_membership = |event: &PduEvent| {
*event.event_type() == StateEventType::RoomMember.into()
&& event
@@ -842,14 +845,25 @@ async fn load_joined_room(
.is_some_and(is_equal_to!(sender_user.as_str()))
};
let joined_sender_member: Option<_> = (joined_since_last_sync && timeline_pdus.is_empty())
.then(|| {
state_events
.iter()
.position(is_sender_membership)
.map(|pos| state_events.swap_remove(pos))
})
.flatten();
let joined_sender_member: Option<_> =
(joined_since_last_sync && timeline_pdus.is_empty() && !initial)
.then(|| {
state_events
.iter()
.position(is_sender_membership)
.map(|pos| state_events.swap_remove(pos))
})
.flatten();
let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| {
joined_sender_member
.is_some()
.then_some(since)
.map(Into::into)
});
let send_notification_counts =
last_notification_read.is_none_or(|last_count| last_count.gt(&since));
let notification_count: OptionFuture<_> = send_notification_counts
.then(|| {
@@ -895,19 +909,26 @@ async fn load_joined_room(
})
.unwrap_or(Vec::new());
let keys_changed = services
.users
.room_keys_changed(room_id, since, Some(next_batch))
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
let extract_membership = |event: &PduEvent| {
let content: RoomMemberEventContent = event.get_content().ok()?;
let user_id: OwnedUserId = event.state_key()?.parse().ok()?;
Some((content, user_id))
Some((content.membership, user_id))
};
let timeline_membership_changes: Vec<_> = timeline_pdus
let timeline_membership_changes = timeline_pdus
.iter()
.map(ref_at!(1))
.filter(|_| !initial)
.map(ref_at!(1))
.filter_map(extract_membership)
.collect();
.collect::<Vec<_>>();
let device_list_updates = state_events
.iter()
@@ -915,31 +936,27 @@ async fn load_joined_room(
.ready_filter(|_| !initial)
.ready_filter(|state_event| *state_event.event_type() == RoomMember)
.ready_filter_map(extract_membership)
.chain(timeline_membership_changes.into_iter().stream())
.fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (content, user_id)| {
.chain(timeline_membership_changes.stream())
.fold_default(async |(mut dlu, mut leu): pair_of!(HashSet<_>), (membership, user_id)| {
use MembershipState::*;
let shares_encrypted_room = async |user_id| {
share_encrypted_room(services, sender_user, user_id, Some(room_id)).await
let requires_update = async |user_id| {
!share_encrypted_room(services, sender_user, user_id, Some(room_id)).await
};
match content.membership {
match membership {
| Join if requires_update(&user_id).await => dlu.insert(user_id),
| Leave => leu.insert(user_id),
| Join if joined_since_last_sync || !shares_encrypted_room(&user_id).await =>
dlu.insert(user_id),
| _ => false,
};
(dlu, leu)
})
.then(async |(mut dlu, leu)| {
dlu.extend(keys_changed.await);
(dlu, leu)
});
let prev_batch = timeline_pdus.first().map(at!(0)).or_else(|| {
joined_sender_member
.is_some()
.then_some(since)
.map(Into::into)
});
let include_in_timeline = |event: &PduEvent| {
let filter = &filter.room.timeline;
filter.matches(event)
@@ -954,33 +971,26 @@ async fn load_joined_room(
.ready_filter(include_in_timeline)
.collect::<Vec<_>>();
let device_updates = services
.users
.room_keys_changed(room_id, since, Some(next_batch))
.map(|(user_id, _)| user_id)
.map(ToOwned::to_owned)
.collect::<Vec<_>>();
let account_data_events = services
.account_data
.changes_since(Some(room_id), sender_user, since, Some(next_batch))
.changes_since(Some(room_id), sender_user, since, None)
.ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect();
let (
(room_events, account_data_events),
(typing_events, private_read_event),
(notification_count, highlight_count),
((mut device_list_updates, left_encrypted_users), device_updates),
(room_events, account_data_events, typing_events, private_read_event),
) = join3(
(device_list_updates, left_encrypted_users),
) = join4(
join(room_events, account_data_events),
join(typing_events, private_read_event),
join(notification_count, highlight_count),
join(device_list_updates, device_updates),
join4(room_events, account_data_events, typing_events, private_read_event),
device_list_updates,
)
.boxed()
.await;
device_list_updates.extend(device_updates);
let is_in_timeline = |event: &PduEvent| {
room_events
.iter()
@@ -1042,8 +1052,9 @@ async fn load_joined_room(
skip_all,
fields(
full = %full_state,
cs = %current_shortstatehash,
ss = ?since_shortstatehash,
hs = ?horizon_shortstatehash,
cs = %current_shortstatehash,
)
)]
#[allow(clippy::too_many_arguments)]
@@ -1052,16 +1063,13 @@ async fn calculate_state_changes<'a>(
sender_user: &UserId,
room_id: &RoomId,
full_state: bool,
encrypted_room: bool,
since_shortstatehash: Option<ShortStateHash>,
horizon_shortstatehash: Option<ShortStateHash>,
current_shortstatehash: ShortStateHash,
joined_since_last_sync: bool,
witness: Option<&'a Witness>,
) -> Result<StateChanges> {
let initial = full_state || since_shortstatehash.is_none() || joined_since_last_sync;
let incremental = !initial && since_shortstatehash != Some(current_shortstatehash);
let incremental = !full_state && !joined_since_last_sync && since_shortstatehash.is_some();
let horizon_shortstatehash = horizon_shortstatehash.unwrap_or(current_shortstatehash);
@@ -1079,36 +1087,33 @@ async fn calculate_state_changes<'a>(
};
let lazy_state_ids: OptionFuture<_> = witness
.filter(|_| !encrypted_room)
.map(|witness| {
StreamExt::into_future(
witness
.iter()
.stream()
.broad_filter_map(|user_id| state_get_shorteventid(user_id)),
)
witness
.iter()
.stream()
.ready_filter(|&user_id| user_id != sender_user)
.broad_filter_map(|user_id| state_get_shorteventid(user_id))
.into_future()
})
.into();
let state_diff_ids: OptionFuture<_> = incremental
.then(|| {
StreamExt::into_future(
services
.state_accessor
.state_added((since_shortstatehash, horizon_shortstatehash))
.boxed(),
)
services
.state_accessor
.state_added((since_shortstatehash, horizon_shortstatehash))
.boxed()
.into_future()
})
.into();
let current_state_ids: OptionFuture<_> = initial
let current_state_ids: OptionFuture<_> = (!incremental)
.then(|| {
StreamExt::into_future(
services
.state_accessor
.state_full_shortids(horizon_shortstatehash)
.expect_ok(),
)
services
.state_accessor
.state_full_shortids(horizon_shortstatehash)
.expect_ok()
.into_future()
})
.into();
@@ -1116,28 +1121,24 @@ async fn calculate_state_changes<'a>(
.stream()
.chain(state_diff_ids.stream())
.broad_filter_map(async |(shortstatekey, shorteventid)| {
if witness.is_none() || encrypted_room {
return Some(shorteventid);
}
lazy_filter(services, sender_user, shortstatekey, shorteventid).await
lazy_filter(services, sender_user, witness, shortstatekey, shorteventid).await
})
.chain(lazy_state_ids.stream())
.broad_filter_map(|shorteventid| {
services
.short
.get_eventid_from_short(shorteventid)
.and_then(async |event_id: OwnedEventId| {
services.timeline.get_pdu(&event_id).await
})
.ok()
})
.broad_filter_map(async |event_id: OwnedEventId| {
services.timeline.get_pdu(&event_id).ok().await
})
.collect::<Vec<_>>()
.await;
let send_member_counts = state_events
.iter()
.any(|event| event.kind == RoomMember);
.any(|event| *event.kind() == RoomMember);
let member_counts: OptionFuture<_> = send_member_counts
.then(|| calculate_counts(services, room_id, sender_user))
@@ -1157,9 +1158,14 @@ async fn calculate_state_changes<'a>(
async fn lazy_filter(
services: &Services,
sender_user: &UserId,
witness: Option<&Witness>,
shortstatekey: ShortStateKey,
shorteventid: ShortEventId,
) -> Option<ShortEventId> {
if witness.is_none() {
return Some(shorteventid);
}
let (event_type, state_key) = services
.short
.get_statekey_from_short(shortstatekey)

View File

@@ -1100,8 +1100,8 @@ where
};
let since_shortstatehash = services
.user
.get_token_shortstatehash(room_id, globalsince)
.timeline
.next_shortstatehash(room_id, PduCount::Normal(globalsince))
.await
.ok();
@@ -1117,26 +1117,44 @@ where
continue;
}
let since_encryption = services
.state_accessor
.state_get(since_shortstatehash, &StateEventType::RoomEncryption, "")
.await;
let synced_shortstatehash = services
.timeline
.prev_shortstatehash(room_id, PduCount::Normal(globalsince).saturating_add(1))
.await
.ok();
let since_sender_member: Option<RoomMemberEventContent> = services
.state_accessor
.state_get_content(
since_shortstatehash,
&StateEventType::RoomMember,
sender_user.as_str(),
)
.ok()
.await;
let synced_sender_member: OptionFuture<_> = synced_shortstatehash
.map(|shortstatehash| {
services
.state_accessor
.state_get_content(
shortstatehash,
&StateEventType::RoomMember,
sender_user.as_str(),
)
.map_ok(|content: RoomMemberEventContent| content)
})
.into();
let joined_since_last_sync = since_sender_member
let joined_since_last_sync = synced_sender_member
.await
.and_then(Result::ok)
.as_ref()
.is_none_or(|member| member.membership != MembershipState::Join);
let new_encrypted_room = encrypted_room && since_encryption.is_err();
let synced_encryption: OptionFuture<_> = synced_shortstatehash
.map(|shortstatehash| {
services.state_accessor.state_get(
shortstatehash,
&StateEventType::RoomEncryption,
"",
)
})
.into();
let synced_encryption = synced_encryption.await.and_then(Result::ok);
let new_encrypted_room = encrypted_room && synced_encryption.is_none();
if encrypted_room {
let current_state_ids: HashMap<_, OwnedEventId> = services

View File

@@ -183,15 +183,6 @@ pub(super) static MAPS: &[Descriptor] = &[
name: "roomserverids",
..descriptor::RANDOM_SMALL
},
Descriptor {
name: "roomsynctoken_shortstatehash",
file_shape: 3,
val_size_hint: Some(8),
block_size: 512,
compression_level: 3,
bottommost_level: Some(6),
..descriptor::SEQUENTIAL
},
Descriptor {
name: "roomuserdataid_accountdata",
..descriptor::RANDOM_SMALL

View File

@@ -167,14 +167,6 @@ impl Service {
debug!("Final stages of deleting the room");
debug!("Deleting room sync tokens from our database");
self.services
.user
.delete_room_synctokens(room_id)
.await
.log_err()
.ok();
debug!("Deleting room state hash from our database");
self.services
.state

View File

@@ -5,9 +5,7 @@ use tuwunel_core::{
Result, implement, trace,
utils::stream::{ReadyExt, TryIgnore},
};
use tuwunel_database::{Database, Deserialized, Interfix, Map};
use crate::rooms::short::ShortStateHash;
use tuwunel_database::{Deserialized, Interfix, Map};
pub struct Service {
db: Data,
@@ -15,22 +13,18 @@ pub struct Service {
}
struct Data {
db: Arc<Database>,
userroomid_notificationcount: Arc<Map>,
userroomid_highlightcount: Arc<Map>,
roomuserid_lastnotificationread: Arc<Map>,
roomsynctoken_shortstatehash: Arc<Map>,
}
impl crate::Service for Service {
fn build(args: &crate::Args<'_>) -> Result<Arc<Self>> {
Ok(Arc::new(Self {
db: Data {
db: args.db.clone(),
userroomid_notificationcount: args.db["userroomid_notificationcount"].clone(),
userroomid_highlightcount: args.db["userroomid_highlightcount"].clone(),
roomuserid_lastnotificationread: args.db["userroomid_highlightcount"].clone(),
roomsynctoken_shortstatehash: args.db["roomsynctoken_shortstatehash"].clone(),
},
services: args.services.clone(),
}))
@@ -107,66 +101,3 @@ pub async fn delete_room_notification_read(&self, room_id: &RoomId) -> Result {
Ok(())
}
#[implement(Service)]
#[tracing::instrument(level = "trace", skip(self))]
pub async fn associate_token_shortstatehash(
&self,
room_id: &RoomId,
token: u64,
shortstatehash: ShortStateHash,
) {
let shortroomid = self
.services
.short
.get_shortroomid(room_id)
.await
.expect("room exists");
let _cork = self.db.db.cork();
let key: &[u64] = &[shortroomid, token];
self.db
.roomsynctoken_shortstatehash
.put(key, shortstatehash);
}
#[implement(Service)]
pub async fn get_token_shortstatehash(
&self,
room_id: &RoomId,
token: u64,
) -> Result<ShortStateHash> {
let shortroomid = self
.services
.short
.get_shortroomid(room_id)
.await?;
let key: &[u64] = &[shortroomid, token];
self.db
.roomsynctoken_shortstatehash
.qry(key)
.await
.deserialized()
}
#[implement(Service)]
pub async fn delete_room_synctokens(&self, room_id: &RoomId) -> Result {
let shortroomid = self
.services
.short
.get_shortroomid(room_id)
.await?;
self.db
.roomsynctoken_shortstatehash
.keys_prefix_raw(&shortroomid)
.ignore_err()
.ready_for_each(|key| {
trace!("Removing key: {key:?}");
self.db.roomsynctoken_shortstatehash.remove(key);
})
.await;
Ok(())
}