@@ -156,10 +156,12 @@ pub(crate) async fn sync_events_route(
|
|||||||
.checked_add(Duration::from_millis(timeout))
|
.checked_add(Duration::from_millis(timeout))
|
||||||
.expect("configuration must limit maximum timeout");
|
.expect("configuration must limit maximum timeout");
|
||||||
|
|
||||||
let mut next_batch = services.globals.current_count();
|
|
||||||
loop {
|
loop {
|
||||||
// Listen for activity
|
// Listen for activity
|
||||||
let watchers = services.sync.watch(sender_user, sender_device);
|
let watchers = services.sync.watch(sender_user, sender_device);
|
||||||
|
let next_batch = services.globals.wait_pending().await?;
|
||||||
|
|
||||||
|
debug_assert!(since <= next_batch, "next_batch is monotonic");
|
||||||
if since < next_batch || body.body.full_state {
|
if since < next_batch || body.body.full_state {
|
||||||
// Check for activity
|
// Check for activity
|
||||||
let response = build_sync_events(&services, &body, since, next_batch).await?;
|
let response = build_sync_events(&services, &body, since, next_batch).await?;
|
||||||
@@ -176,7 +178,8 @@ pub(crate) async fn sync_events_route(
|
|||||||
|
|
||||||
// Wait for activity
|
// Wait for activity
|
||||||
if time::timeout_at(stop_at, watchers).await.is_err() {
|
if time::timeout_at(stop_at, watchers).await.is_err() {
|
||||||
break;
|
trace!(next_batch, "empty response");
|
||||||
|
return Ok(sync_events::v3::Response::new(next_batch.to_string()));
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
@@ -184,16 +187,9 @@ pub(crate) async fn sync_events_route(
|
|||||||
last_batch = ?next_batch,
|
last_batch = ?next_batch,
|
||||||
count = ?services.globals.pending_count(),
|
count = ?services.globals.pending_count(),
|
||||||
stop_at = ?stop_at,
|
stop_at = ?stop_at,
|
||||||
"waited for watchers"
|
"notified by watcher"
|
||||||
);
|
);
|
||||||
|
|
||||||
// Synchronize activity
|
|
||||||
next_batch = services.globals.wait_pending().await?;
|
|
||||||
debug_assert!(since <= next_batch, "next_batch is monotonic");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!(next_batch, "empty response");
|
|
||||||
return Ok(sync_events::v3::Response::new(next_batch.to_string()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
|
|||||||
Reference in New Issue
Block a user