Improve sliding-sync robustness to deeper replays.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -130,25 +130,22 @@ pub(crate) async fn sync_events_v5_route(
|
|||||||
|
|
||||||
let (mut conn, _) = join(conn, ping_presence).await;
|
let (mut conn, _) = join(conn, ping_presence).await;
|
||||||
|
|
||||||
// The client must either use the last returned next_batch or replay the
|
|
||||||
// next_batch from the penultimate request: it's either up-to-date or
|
|
||||||
// one-behind. If we receive anything else we can boot them.
|
|
||||||
let advancing = since == conn.next_batch;
|
let advancing = since == conn.next_batch;
|
||||||
let replaying = since == conn.globalsince;
|
let retarding = since <= conn.globalsince;
|
||||||
if !advancing && !replaying {
|
if !advancing && !retarding {
|
||||||
return Err!(Request(UnknownPos("Requesting unknown or stale stream position.")));
|
return Err!(Request(UnknownPos("Requesting unknown or stale stream position.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
debug_assert!(
|
debug_assert!(
|
||||||
advancing || replaying,
|
advancing || retarding,
|
||||||
"Request should either be advancing or replaying the last request."
|
"Request should either be advancing or replaying the since token."
|
||||||
);
|
);
|
||||||
|
|
||||||
// Update parameters regardless of replay or advance
|
// Update parameters regardless of replay or advance
|
||||||
conn.next_batch = services.globals.wait_pending().await?;
|
conn.next_batch = services.globals.wait_pending().await?;
|
||||||
conn.globalsince = since.min(conn.next_batch);
|
conn.globalsince = since.min(conn.next_batch);
|
||||||
conn.update_cache(request);
|
conn.update_cache(request);
|
||||||
conn.update_rooms_prologue(advancing);
|
conn.update_rooms_prologue(retarding.then_some(since));
|
||||||
|
|
||||||
let mut response = Response {
|
let mut response = Response {
|
||||||
txn_id: request.txn_id.clone(),
|
txn_id: request.txn_id.clone(),
|
||||||
|
|||||||
@@ -53,8 +53,6 @@ pub struct Connection {
|
|||||||
#[derive(Clone, Copy, Debug, Default)]
|
#[derive(Clone, Copy, Debug, Default)]
|
||||||
pub struct Room {
|
pub struct Room {
|
||||||
pub roomsince: u64,
|
pub roomsince: u64,
|
||||||
pub last_batch: u64,
|
|
||||||
pub next_batch: u64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Connections = StdMutex<BTreeMap<ConnectionKey, ConnectionVal>>;
|
type Connections = StdMutex<BTreeMap<ConnectionKey, ConnectionVal>>;
|
||||||
@@ -94,14 +92,12 @@ impl crate::Service for Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Connection)]
|
#[implement(Connection)]
|
||||||
pub fn update_rooms_prologue(&mut self, advance: bool) {
|
pub fn update_rooms_prologue(&mut self, retard_since: Option<u64>) {
|
||||||
self.rooms.values_mut().for_each(|room| {
|
self.rooms.values_mut().for_each(|room| {
|
||||||
if advance {
|
if let Some(retard_since) = retard_since {
|
||||||
room.roomsince = room.next_batch;
|
if room.roomsince > retard_since {
|
||||||
room.last_batch = room.next_batch;
|
room.roomsince = retard_since;
|
||||||
} else {
|
}
|
||||||
room.roomsince = room.last_batch;
|
|
||||||
room.next_batch = room.last_batch;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -115,7 +111,6 @@ where
|
|||||||
let room = self.rooms.entry(room_id.into()).or_default();
|
let room = self.rooms.entry(room_id.into()).or_default();
|
||||||
|
|
||||||
room.roomsince = self.next_batch;
|
room.roomsince = self.next_batch;
|
||||||
room.next_batch = self.next_batch;
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user