From 7fee459b1a0b2813c927d4a9d7294836d4709693 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 7 Oct 2025 08:55:24 +0000 Subject: [PATCH] Add admin diagnostic query suite for sync state. Signed-off-by: Jason Volk --- src/admin/query/mod.rs | 7 +++- src/admin/query/sync.rs | 75 +++++++++++++++++++++++++++++++++++++++ src/api/client/sync/v5.rs | 2 +- src/service/sync/mod.rs | 48 ++++++++++++++++++++++--- 4 files changed, 125 insertions(+), 7 deletions(-) create mode 100644 src/admin/query/sync.rs diff --git a/src/admin/query/mod.rs b/src/admin/query/mod.rs index 2cf95c10..00a3b993 100644 --- a/src/admin/query/mod.rs +++ b/src/admin/query/mod.rs @@ -10,6 +10,7 @@ mod room_state_cache; mod room_timeline; mod sending; mod short; +mod sync; mod users; use clap::Subcommand; @@ -20,7 +21,7 @@ use self::{ presence::PresenceCommand, pusher::PusherCommand, raw::RawCommand, resolver::ResolverCommand, room_alias::RoomAliasCommand, room_state_cache::RoomStateCacheCommand, room_timeline::RoomTimelineCommand, sending::SendingCommand, short::ShortCommand, - users::UsersCommand, + sync::SyncCommand, users::UsersCommand, }; use crate::admin_command_dispatch; @@ -76,6 +77,10 @@ pub(super) enum QueryCommand { #[command(subcommand)] Short(ShortCommand), + /// - sync service + #[command(subcommand)] + Sync(SyncCommand), + /// - raw service #[command(subcommand)] Raw(RawCommand), diff --git a/src/admin/query/sync.rs b/src/admin/query/sync.rs new file mode 100644 index 00000000..86646a06 --- /dev/null +++ b/src/admin/query/sync.rs @@ -0,0 +1,75 @@ +use clap::Subcommand; +use ruma::{OwnedDeviceId, OwnedUserId}; +use tuwunel_core::Result; +use tuwunel_service::sync::into_connection_key; + +use crate::{admin_command, admin_command_dispatch}; + +#[admin_command_dispatch] +#[derive(Debug, Subcommand)] +/// Query sync service state +pub(crate) enum SyncCommand { + /// List sliding-sync connections. + ListConnections, + + /// Show details of sliding sync connection by ID. + ShowConnection { + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: Option, + }, + + /// Drop connections for a user, device, or all. + DropConnections { + user_id: Option, + device_id: Option, + conn_id: Option, + }, +} + +#[admin_command] +pub(super) async fn list_connections(&self) -> Result { + let connections = self.services.sync.list_connections(); + + for connection_key in connections { + self.write_str(&format!("{connection_key:?}")) + .await?; + } + + Ok(()) +} + +#[admin_command] +pub(super) async fn show_connection( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: Option, +) -> Result { + let key = into_connection_key(user_id, device_id, conn_id); + let cache = self.services.sync.find_connection(&key)?; + + let out; + { + let cached = cache.lock()?; + out = format!("{cached:#?}"); + }; + + self.write_str(out.as_str()).await +} + +#[admin_command] +pub(super) async fn drop_connections( + &self, + user_id: Option, + device_id: Option, + conn_id: Option, +) -> Result { + self.services.sync.clear_connections( + user_id.as_deref(), + device_id.as_deref(), + conn_id.map(Into::into).as_ref(), + ); + + Ok(()) +} diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs index 22d9a69b..97fabd9e 100644 --- a/src/api/client/sync/v5.rs +++ b/src/api/client/sync/v5.rs @@ -112,7 +112,7 @@ pub(crate) async fn sync_events_v5_route( // Client / User requested an initial sync if globalsince == 0 { - services.sync.forget_connection(&conn_key); + services.sync.drop_connection(&conn_key); } // Get sticky parameters from cache diff --git a/src/service/sync/mod.rs b/src/service/sync/mod.rs index bdc34fd8..2962bacd 100644 --- a/src/service/sync/mod.rs +++ b/src/service/sync/mod.rs @@ -6,13 +6,13 @@ use std::{ }; use ruma::{ - OwnedDeviceId, OwnedRoomId, OwnedUserId, + DeviceId, OwnedDeviceId, OwnedRoomId, OwnedUserId, UserId, api::client::sync::sync_events::v5::{ Request, request, request::{AccountData, E2EE, Receipts, ToDevice, Typing}, }, }; -use tuwunel_core::{Result, implement, smallstr::SmallString}; +use tuwunel_core::{Result, err, implement, is_equal_to, smallstr::SmallString}; use tuwunel_database::Map; pub struct Service { @@ -45,7 +45,8 @@ pub struct Cache { extensions: request::Extensions, } -type Connections = Mutex>>>; +type Connections = Mutex>; +type Connection = Arc>; pub type ConnectionKey = (OwnedUserId, OwnedDeviceId, Option); pub type ConnectionId = SmallString<[u8; 16]>; @@ -89,6 +90,7 @@ pub fn update_cache(&self, key: &ConnectionKey, request: &mut Request) -> KnownR Self::update_cache_lists(request, &mut cached); Self::update_cache_subscriptions(request, &mut cached); Self::update_cache_extensions(request, &mut cached); + cached.known_rooms.clone() } @@ -233,6 +235,40 @@ pub fn update_subscriptions(&self, key: &ConnectionKey, subscriptions: Subscript .subscriptions = subscriptions; } +#[implement(Service)] +pub fn clear_connections( + &self, + user_id: Option<&UserId>, + device_id: Option<&DeviceId>, + conn_id: Option<&ConnectionId>, +) { + self.connections.lock().expect("locked").retain( + |(conn_user_id, conn_device_id, conn_conn_id), _| { + !(user_id.is_none_or(is_equal_to!(conn_user_id)) + && device_id.is_none_or(is_equal_to!(conn_device_id)) + && (conn_id.is_none() || conn_id == conn_conn_id.as_ref())) + }, + ); +} + +#[implement(Service)] +pub fn drop_connection(&self, key: &ConnectionKey) { + self.connections + .lock() + .expect("locked") + .remove(key); +} + +#[implement(Service)] +pub fn list_connections(&self) -> Vec { + self.connections + .lock() + .expect("locked") + .keys() + .cloned() + .collect() +} + #[implement(Service)] pub fn get_connection(&self, key: &ConnectionKey) -> Arc> { self.connections @@ -244,11 +280,13 @@ pub fn get_connection(&self, key: &ConnectionKey) -> Arc> { } #[implement(Service)] -pub fn forget_connection(&self, key: &ConnectionKey) { +pub fn find_connection(&self, key: &ConnectionKey) -> Result>> { self.connections .lock() .expect("locked") - .remove(key); + .get(key) + .cloned() + .ok_or_else(|| err!(Request(NotFound("Connection not found.")))) } #[implement(Service)]