Separate cached body from request body in snake-sync; cleanup.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -61,31 +61,16 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
body: Ruma<sync_events::v5::Request>,
|
body: Ruma<sync_events::v5::Request>,
|
||||||
) -> Result<sync_events::v5::Response> {
|
) -> Result<sync_events::v5::Response> {
|
||||||
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
|
debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted");
|
||||||
let sender_user = body
|
|
||||||
.sender_user
|
|
||||||
.as_ref()
|
|
||||||
.expect("user is authenticated");
|
|
||||||
let sender_device = body
|
|
||||||
.sender_device
|
|
||||||
.as_ref()
|
|
||||||
.expect("user is authenticated");
|
|
||||||
let mut body = body.body;
|
|
||||||
|
|
||||||
// Setup watchers, so if there's no response, we can wait for them
|
|
||||||
let watcher = services.sync.watch(sender_user, sender_device);
|
|
||||||
|
|
||||||
let next_batch = services.globals.next_count();
|
|
||||||
|
|
||||||
let conn_id = body.conn_id.clone();
|
|
||||||
|
|
||||||
|
let sender_user = body.sender_user();
|
||||||
|
let sender_device = body.sender_device();
|
||||||
|
let snake_key = into_snake_key(sender_user, sender_device, body.conn_id.clone());
|
||||||
let globalsince = body
|
let globalsince = body
|
||||||
.pos
|
.pos
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|string| string.parse().ok())
|
.and_then(|string| string.parse().ok())
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
let snake_key = into_snake_key(sender_user, sender_device, conn_id);
|
|
||||||
|
|
||||||
if globalsince != 0 && !services.sync.snake_connection_cached(&snake_key) {
|
if globalsince != 0 && !services.sync.snake_connection_cached(&snake_key) {
|
||||||
return Err!(Request(UnknownPos(
|
return Err!(Request(UnknownPos(
|
||||||
"Connection data unknown to server; restarting sync stream."
|
"Connection data unknown to server; restarting sync stream."
|
||||||
@@ -99,10 +84,16 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
.forget_snake_sync_connection(&snake_key);
|
.forget_snake_sync_connection(&snake_key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Setup watchers, so if there's no response, we can wait for them
|
||||||
|
let watcher = services.sync.watch(sender_user, sender_device);
|
||||||
|
|
||||||
|
let next_batch = services.globals.next_count();
|
||||||
|
|
||||||
// Get sticky parameters from cache
|
// Get sticky parameters from cache
|
||||||
|
let mut cached = body.body.clone();
|
||||||
let known_rooms = services
|
let known_rooms = services
|
||||||
.sync
|
.sync
|
||||||
.update_snake_sync_request_with_cache(&snake_key, &mut body);
|
.update_snake_sync_request_with_cache(&snake_key, &mut cached);
|
||||||
|
|
||||||
let all_joined_rooms = services
|
let all_joined_rooms = services
|
||||||
.rooms
|
.rooms
|
||||||
@@ -140,13 +131,13 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
|
|
||||||
let mut todo_rooms: TodoRooms = BTreeMap::new();
|
let mut todo_rooms: TodoRooms = BTreeMap::new();
|
||||||
|
|
||||||
let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &body);
|
let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &cached);
|
||||||
|
|
||||||
let account_data = collect_account_data(services, sync_info).map(Ok);
|
let account_data = collect_account_data(services, sync_info).map(Ok);
|
||||||
|
|
||||||
let e2ee = collect_e2ee(services, sync_info, all_joined_rooms.clone());
|
let e2ee = collect_e2ee(services, sync_info, all_joined_rooms.clone());
|
||||||
|
|
||||||
let to_device = collect_to_device(services, sync_info, *next_batch).map(Ok);
|
let to_device = collect_to_device(services, sync_info, next_batch).map(Ok);
|
||||||
|
|
||||||
let receipts = collect_receipts(services).map(Ok);
|
let receipts = collect_receipts(services).map(Ok);
|
||||||
|
|
||||||
@@ -162,7 +153,7 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let mut response = sync_events::v5::Response {
|
let mut response = sync_events::v5::Response {
|
||||||
txn_id: body.txn_id.clone(),
|
txn_id: cached.txn_id.clone(),
|
||||||
pos,
|
pos,
|
||||||
lists: BTreeMap::new(),
|
lists: BTreeMap::new(),
|
||||||
rooms: BTreeMap::new(),
|
rooms: BTreeMap::new(),
|
||||||
@@ -182,18 +173,18 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
response.extensions.typing =
|
response.extensions.typing =
|
||||||
collect_typing_events(services, sender_user, &body, all_joined_rooms.clone()).await?;
|
collect_typing_events(services, sender_user, &cached, all_joined_rooms.clone()).await?;
|
||||||
|
|
||||||
fetch_subscriptions(services, sync_info, &known_rooms, &mut todo_rooms).await;
|
fetch_subscriptions(services, sync_info, &known_rooms, &mut todo_rooms).await;
|
||||||
|
|
||||||
response.rooms = process_rooms(
|
response.rooms = process_rooms(
|
||||||
services,
|
services,
|
||||||
sender_user,
|
sender_user,
|
||||||
*next_batch,
|
next_batch,
|
||||||
all_invited_rooms.clone(),
|
all_invited_rooms.clone(),
|
||||||
&todo_rooms,
|
&todo_rooms,
|
||||||
&mut response,
|
&mut response,
|
||||||
&body,
|
&cached,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -703,6 +694,7 @@ where
|
|||||||
}
|
}
|
||||||
Ok(rooms)
|
Ok(rooms)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn collect_account_data(
|
async fn collect_account_data(
|
||||||
services: &Services,
|
services: &Services,
|
||||||
(sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request),
|
(sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request),
|
||||||
|
|||||||
Reference in New Issue
Block a user