Add admin diagnostic query suite for sync state.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -10,6 +10,7 @@ mod room_state_cache;
|
||||
mod room_timeline;
|
||||
mod sending;
|
||||
mod short;
|
||||
mod sync;
|
||||
mod users;
|
||||
|
||||
use clap::Subcommand;
|
||||
@@ -20,7 +21,7 @@ use self::{
|
||||
presence::PresenceCommand, pusher::PusherCommand, raw::RawCommand, resolver::ResolverCommand,
|
||||
room_alias::RoomAliasCommand, room_state_cache::RoomStateCacheCommand,
|
||||
room_timeline::RoomTimelineCommand, sending::SendingCommand, short::ShortCommand,
|
||||
users::UsersCommand,
|
||||
sync::SyncCommand, users::UsersCommand,
|
||||
};
|
||||
use crate::admin_command_dispatch;
|
||||
|
||||
@@ -76,6 +77,10 @@ pub(super) enum QueryCommand {
|
||||
#[command(subcommand)]
|
||||
Short(ShortCommand),
|
||||
|
||||
/// - sync service
|
||||
#[command(subcommand)]
|
||||
Sync(SyncCommand),
|
||||
|
||||
/// - raw service
|
||||
#[command(subcommand)]
|
||||
Raw(RawCommand),
|
||||
|
||||
75
src/admin/query/sync.rs
Normal file
75
src/admin/query/sync.rs
Normal file
@@ -0,0 +1,75 @@
|
||||
use clap::Subcommand;
|
||||
use ruma::{OwnedDeviceId, OwnedUserId};
|
||||
use tuwunel_core::Result;
|
||||
use tuwunel_service::sync::into_connection_key;
|
||||
|
||||
use crate::{admin_command, admin_command_dispatch};
|
||||
|
||||
#[admin_command_dispatch]
|
||||
#[derive(Debug, Subcommand)]
|
||||
/// Query sync service state
|
||||
pub(crate) enum SyncCommand {
|
||||
/// List sliding-sync connections.
|
||||
ListConnections,
|
||||
|
||||
/// Show details of sliding sync connection by ID.
|
||||
ShowConnection {
|
||||
user_id: OwnedUserId,
|
||||
device_id: OwnedDeviceId,
|
||||
conn_id: Option<String>,
|
||||
},
|
||||
|
||||
/// Drop connections for a user, device, or all.
|
||||
DropConnections {
|
||||
user_id: Option<OwnedUserId>,
|
||||
device_id: Option<OwnedDeviceId>,
|
||||
conn_id: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn list_connections(&self) -> Result {
|
||||
let connections = self.services.sync.list_connections();
|
||||
|
||||
for connection_key in connections {
|
||||
self.write_str(&format!("{connection_key:?}"))
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn show_connection(
|
||||
&self,
|
||||
user_id: OwnedUserId,
|
||||
device_id: OwnedDeviceId,
|
||||
conn_id: Option<String>,
|
||||
) -> Result {
|
||||
let key = into_connection_key(user_id, device_id, conn_id);
|
||||
let cache = self.services.sync.find_connection(&key)?;
|
||||
|
||||
let out;
|
||||
{
|
||||
let cached = cache.lock()?;
|
||||
out = format!("{cached:#?}");
|
||||
};
|
||||
|
||||
self.write_str(out.as_str()).await
|
||||
}
|
||||
|
||||
#[admin_command]
|
||||
pub(super) async fn drop_connections(
|
||||
&self,
|
||||
user_id: Option<OwnedUserId>,
|
||||
device_id: Option<OwnedDeviceId>,
|
||||
conn_id: Option<String>,
|
||||
) -> Result {
|
||||
self.services.sync.clear_connections(
|
||||
user_id.as_deref(),
|
||||
device_id.as_deref(),
|
||||
conn_id.map(Into::into).as_ref(),
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -112,7 +112,7 @@ pub(crate) async fn sync_events_v5_route(
|
||||
|
||||
// Client / User requested an initial sync
|
||||
if globalsince == 0 {
|
||||
services.sync.forget_connection(&conn_key);
|
||||
services.sync.drop_connection(&conn_key);
|
||||
}
|
||||
|
||||
// Get sticky parameters from cache
|
||||
|
||||
@@ -6,13 +6,13 @@ use std::{
|
||||
};
|
||||
|
||||
use ruma::{
|
||||
OwnedDeviceId, OwnedRoomId, OwnedUserId,
|
||||
DeviceId, OwnedDeviceId, OwnedRoomId, OwnedUserId, UserId,
|
||||
api::client::sync::sync_events::v5::{
|
||||
Request, request,
|
||||
request::{AccountData, E2EE, Receipts, ToDevice, Typing},
|
||||
},
|
||||
};
|
||||
use tuwunel_core::{Result, implement, smallstr::SmallString};
|
||||
use tuwunel_core::{Result, err, implement, is_equal_to, smallstr::SmallString};
|
||||
use tuwunel_database::Map;
|
||||
|
||||
pub struct Service {
|
||||
@@ -45,7 +45,8 @@ pub struct Cache {
|
||||
extensions: request::Extensions,
|
||||
}
|
||||
|
||||
type Connections = Mutex<BTreeMap<ConnectionKey, Arc<Mutex<Cache>>>>;
|
||||
type Connections = Mutex<BTreeMap<ConnectionKey, Connection>>;
|
||||
type Connection = Arc<Mutex<Cache>>;
|
||||
pub type ConnectionKey = (OwnedUserId, OwnedDeviceId, Option<ConnectionId>);
|
||||
pub type ConnectionId = SmallString<[u8; 16]>;
|
||||
|
||||
@@ -89,6 +90,7 @@ pub fn update_cache(&self, key: &ConnectionKey, request: &mut Request) -> KnownR
|
||||
Self::update_cache_lists(request, &mut cached);
|
||||
Self::update_cache_subscriptions(request, &mut cached);
|
||||
Self::update_cache_extensions(request, &mut cached);
|
||||
|
||||
cached.known_rooms.clone()
|
||||
}
|
||||
|
||||
@@ -233,6 +235,40 @@ pub fn update_subscriptions(&self, key: &ConnectionKey, subscriptions: Subscript
|
||||
.subscriptions = subscriptions;
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn clear_connections(
|
||||
&self,
|
||||
user_id: Option<&UserId>,
|
||||
device_id: Option<&DeviceId>,
|
||||
conn_id: Option<&ConnectionId>,
|
||||
) {
|
||||
self.connections.lock().expect("locked").retain(
|
||||
|(conn_user_id, conn_device_id, conn_conn_id), _| {
|
||||
!(user_id.is_none_or(is_equal_to!(conn_user_id))
|
||||
&& device_id.is_none_or(is_equal_to!(conn_device_id))
|
||||
&& (conn_id.is_none() || conn_id == conn_conn_id.as_ref()))
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn drop_connection(&self, key: &ConnectionKey) {
|
||||
self.connections
|
||||
.lock()
|
||||
.expect("locked")
|
||||
.remove(key);
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn list_connections(&self) -> Vec<ConnectionKey> {
|
||||
self.connections
|
||||
.lock()
|
||||
.expect("locked")
|
||||
.keys()
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn get_connection(&self, key: &ConnectionKey) -> Arc<Mutex<Cache>> {
|
||||
self.connections
|
||||
@@ -244,11 +280,13 @@ pub fn get_connection(&self, key: &ConnectionKey) -> Arc<Mutex<Cache>> {
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
pub fn forget_connection(&self, key: &ConnectionKey) {
|
||||
pub fn find_connection(&self, key: &ConnectionKey) -> Result<Arc<Mutex<Cache>>> {
|
||||
self.connections
|
||||
.lock()
|
||||
.expect("locked")
|
||||
.remove(key);
|
||||
.get(key)
|
||||
.cloned()
|
||||
.ok_or_else(|| err!(Request(NotFound("Connection not found."))))
|
||||
}
|
||||
|
||||
#[implement(Service)]
|
||||
|
||||
Reference in New Issue
Block a user