Additional span logging of counter state; trace logging of contents.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -42,6 +42,7 @@ use tuwunel_core::{
|
|||||||
},
|
},
|
||||||
pair_of, ref_at,
|
pair_of, ref_at,
|
||||||
result::FlatOk,
|
result::FlatOk,
|
||||||
|
trace,
|
||||||
utils::{
|
utils::{
|
||||||
self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
self, BoolExt, FutureBoolExt, IterStream, ReadyExt, TryFutureExtExt,
|
||||||
future::{OptionStream, ReadyEqExt},
|
future::{OptionStream, ReadyEqExt},
|
||||||
@@ -115,6 +116,7 @@ type PresenceUpdates = HashMap<OwnedUserId, PresenceEventContent>;
|
|||||||
skip_all,
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
user_id = %body.sender_user(),
|
user_id = %body.sender_user(),
|
||||||
|
since = body.body.since.as_deref().unwrap_or("0"),
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
pub(crate) async fn sync_events_route(
|
pub(crate) async fn sync_events_route(
|
||||||
@@ -166,7 +168,12 @@ pub(crate) async fn sync_events_route(
|
|||||||
.clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max));
|
.clamp(Duration::from_millis(timeout_min), Duration::from_millis(timeout_max));
|
||||||
|
|
||||||
// Wait for activity notification.
|
// Wait for activity notification.
|
||||||
_ = tokio::time::timeout(duration, watcher).await;
|
let timeout = tokio::time::timeout(duration, watcher).await;
|
||||||
|
trace!(
|
||||||
|
count = ?services.globals.pending_count(),
|
||||||
|
timedout = timeout.is_err(),
|
||||||
|
"waited for watchers"
|
||||||
|
);
|
||||||
|
|
||||||
// Wait for synchronization of activity.
|
// Wait for synchronization of activity.
|
||||||
let next_batch = services.globals.wait_pending().await?;
|
let next_batch = services.globals.wait_pending().await?;
|
||||||
@@ -174,6 +181,7 @@ pub(crate) async fn sync_events_route(
|
|||||||
|
|
||||||
// Return an empty response when nothing has changed.
|
// Return an empty response when nothing has changed.
|
||||||
if since == next_batch {
|
if since == next_batch {
|
||||||
|
trace!(next_batch, "empty response");
|
||||||
return Ok(sync_events::v3::Response::new(next_batch.to_string()));
|
return Ok(sync_events::v3::Response::new(next_batch.to_string()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -190,6 +198,7 @@ pub(crate) async fn sync_events_route(
|
|||||||
fields(
|
fields(
|
||||||
%since,
|
%since,
|
||||||
%next_batch,
|
%next_batch,
|
||||||
|
count = ?services.globals.pending_count(),
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
async fn build_sync_events(
|
async fn build_sync_events(
|
||||||
|
|||||||
@@ -2,8 +2,7 @@
|
|||||||
|
|
||||||
use std::{
|
use std::{
|
||||||
collections::VecDeque,
|
collections::VecDeque,
|
||||||
fmt::Debug,
|
ops::{Deref, Range},
|
||||||
ops::Deref,
|
|
||||||
sync::{Arc, RwLock},
|
sync::{Arc, RwLock},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -23,14 +22,12 @@ use crate::{Result, checked, is_equal_to};
|
|||||||
/// value, but that value has no Pdu found because its write has not been
|
/// value, but that value has no Pdu found because its write has not been
|
||||||
/// completed with global visibility. Client-sync will then move on to the next
|
/// completed with global visibility. Client-sync will then move on to the next
|
||||||
/// counter value having missed the data from the current one.
|
/// counter value having missed the data from the current one.
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Counter<F: Fn(u64) -> Result + Sync> {
|
pub struct Counter<F: Fn(u64) -> Result + Sync> {
|
||||||
/// Self is intended to be Arc<Counter> with inner state mutable via Lock.
|
/// Self is intended to be Arc<Counter> with inner state mutable via Lock.
|
||||||
inner: RwLock<State<F>>,
|
inner: RwLock<State<F>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Inner protected state for Two-Phase Counter.
|
/// Inner protected state for Two-Phase Counter.
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct State<F: Fn(u64) -> Result + Sync> {
|
pub struct State<F: Fn(u64) -> Result + Sync> {
|
||||||
/// Monotonic counter. The next sequence number is drawn by adding one to
|
/// Monotonic counter. The next sequence number is drawn by adding one to
|
||||||
/// this value. That number will be persisted and added to `pending`.
|
/// this value. That number will be persisted and added to `pending`.
|
||||||
@@ -50,7 +47,6 @@ pub struct State<F: Fn(u64) -> Result + Sync> {
|
|||||||
release: F,
|
release: F,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Permit<F: Fn(u64) -> Result + Sync> {
|
pub struct Permit<F: Fn(u64) -> Result + Sync> {
|
||||||
/// Link back to the shared-state.
|
/// Link back to the shared-state.
|
||||||
state: Arc<Counter<F>>,
|
state: Arc<Counter<F>>,
|
||||||
@@ -80,6 +76,17 @@ impl<F: Fn(u64) -> Result + Sync> Counter<F> {
|
|||||||
Ok(Permit::<F> { state: self.clone(), retired, id })
|
Ok(Permit::<F> { state: self.clone(), retired, id })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Load the current and dispatched values simultaneously
|
||||||
|
#[inline]
|
||||||
|
pub fn range(&self) -> Range<u64> {
|
||||||
|
let inner = self.inner.read().expect("locked for reading");
|
||||||
|
|
||||||
|
Range {
|
||||||
|
start: inner.retired(),
|
||||||
|
end: inner.dispatched,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Load the highest sequence number safe for reading, also known as the
|
/// Load the highest sequence number safe for reading, also known as the
|
||||||
/// retirement value with writes "globally visible."
|
/// retirement value with writes "globally visible."
|
||||||
#[inline]
|
#[inline]
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
use std::sync::Arc;
|
use std::{ops::Range, sync::Arc};
|
||||||
|
|
||||||
use tokio::sync::{watch, watch::Sender};
|
use tokio::sync::{watch, watch::Sender};
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
@@ -53,7 +53,7 @@ impl Data {
|
|||||||
.wait_for(|retired| retired.ge(count))
|
.wait_for(|retired| retired.ge(count))
|
||||||
.await
|
.await
|
||||||
.map(|retired| *retired)
|
.map(|retired| *retired)
|
||||||
.map_err(|e| err!("counter channel error {e:?}"))
|
.map_err(|e| err!(debug_error!("counter channel error {e:?}")))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@@ -66,6 +66,9 @@ impl Data {
|
|||||||
#[inline]
|
#[inline]
|
||||||
pub fn current_count(&self) -> u64 { self.counter.current() }
|
pub fn current_count(&self) -> u64 { self.counter.current() }
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn pending_count(&self) -> Range<u64> { self.counter.range() }
|
||||||
|
|
||||||
fn handle_retire(sender: &Sender<u64>, count: u64) -> Result {
|
fn handle_retire(sender: &Sender<u64>, count: u64) -> Result {
|
||||||
let _prev = sender.send_replace(count);
|
let _prev = sender.send_replace(count);
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ mod data;
|
|||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
collections::HashMap,
|
||||||
fmt::Write,
|
fmt::Write,
|
||||||
|
ops::Range,
|
||||||
sync::{Arc, RwLock},
|
sync::{Arc, RwLock},
|
||||||
time::Instant,
|
time::Instant,
|
||||||
};
|
};
|
||||||
@@ -105,22 +106,40 @@ impl crate::Service for Service {
|
|||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
#[inline]
|
#[inline]
|
||||||
#[tracing::instrument(level = "trace", skip(self))]
|
#[tracing::instrument(
|
||||||
|
level = "trace",
|
||||||
|
skip_all,
|
||||||
|
ret,
|
||||||
|
fields(pending = ?self.pending_count()),
|
||||||
|
)]
|
||||||
pub async fn wait_pending(&self) -> Result<u64> { self.db.wait_pending().await }
|
pub async fn wait_pending(&self) -> Result<u64> { self.db.wait_pending().await }
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
#[tracing::instrument(level = "trace", skip(self))]
|
#[tracing::instrument(
|
||||||
|
level = "trace",
|
||||||
|
skip_all,
|
||||||
|
ret,
|
||||||
|
fields(pending = ?self.pending_count()),
|
||||||
|
)]
|
||||||
pub async fn wait_count(&self, count: &u64) -> Result<u64> { self.db.wait_count(count).await }
|
pub async fn wait_count(&self, count: &u64) -> Result<u64> { self.db.wait_count(count).await }
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
#[must_use]
|
#[must_use]
|
||||||
#[tracing::instrument(level = "debug", skip(self))]
|
#[tracing::instrument(
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
fields(pending = ?self.pending_count()),
|
||||||
|
)]
|
||||||
pub fn next_count(&self) -> data::Permit { self.db.next_count() }
|
pub fn next_count(&self) -> data::Permit { self.db.next_count() }
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn current_count(&self) -> u64 { self.db.current_count() }
|
pub fn current_count(&self) -> u64 { self.db.current_count() }
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
#[must_use]
|
||||||
|
pub fn pending_count(&self) -> Range<u64> { self.db.pending_count() }
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn server_name(&self) -> &ServerName { self.server.name.as_ref() }
|
pub fn server_name(&self) -> &ServerName { self.server.name.as_ref() }
|
||||||
|
|||||||
@@ -48,6 +48,7 @@ use crate::rooms::timeline::RawPduId;
|
|||||||
level = INFO_SPAN_LEVEL,
|
level = INFO_SPAN_LEVEL,
|
||||||
skip_all,
|
skip_all,
|
||||||
fields(%room_id, %event_id),
|
fields(%room_id, %event_id),
|
||||||
|
ret(Debug),
|
||||||
)]
|
)]
|
||||||
pub async fn handle_incoming_pdu<'a>(
|
pub async fn handle_incoming_pdu<'a>(
|
||||||
&self,
|
&self,
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ use crate::rooms::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
|
#[tracing::instrument(name = "upgrade", level = "debug", skip_all, ret(Debug))]
|
||||||
pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
pub(super) async fn upgrade_outlier_to_timeline_pdu<Pdu>(
|
||||||
&self,
|
&self,
|
||||||
incoming_pdu: PduEvent,
|
incoming_pdu: PduEvent,
|
||||||
|
|||||||
@@ -93,6 +93,15 @@ impl crate::Service for Service {
|
|||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
/// Set the room to the given statehash and update caches.
|
/// Set the room to the given statehash and update caches.
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "force",
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
fields(
|
||||||
|
count = ?self.services.globals.pending_count(),
|
||||||
|
%shortstatehash,
|
||||||
|
)
|
||||||
|
)]
|
||||||
pub async fn force_state(
|
pub async fn force_state(
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
@@ -172,7 +181,14 @@ impl Service {
|
|||||||
///
|
///
|
||||||
/// This adds all current state events (not including the incoming event)
|
/// This adds all current state events (not including the incoming event)
|
||||||
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
|
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
|
||||||
#[tracing::instrument(skip(self, state_ids_compressed), level = "debug")]
|
#[tracing::instrument(
|
||||||
|
name = "set",
|
||||||
|
level = "debug",
|
||||||
|
skip(self, state_ids_compressed),
|
||||||
|
fields(
|
||||||
|
count = ?self.services.globals.pending_count(),
|
||||||
|
)
|
||||||
|
)]
|
||||||
pub async fn set_event_state(
|
pub async fn set_event_state(
|
||||||
&self,
|
&self,
|
||||||
event_id: &EventId,
|
event_id: &EventId,
|
||||||
@@ -248,7 +264,14 @@ impl Service {
|
|||||||
///
|
///
|
||||||
/// This adds all current state events (not including the incoming event)
|
/// This adds all current state events (not including the incoming event)
|
||||||
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
|
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
|
||||||
#[tracing::instrument(skip(self, new_pdu), level = "debug")]
|
#[tracing::instrument(
|
||||||
|
name = "set",
|
||||||
|
level = "debug",
|
||||||
|
skip(self, new_pdu),
|
||||||
|
fields(
|
||||||
|
count = ?self.services.globals.pending_count(),
|
||||||
|
)
|
||||||
|
)]
|
||||||
pub async fn append_to_state(&self, new_pdu: &PduEvent) -> Result<u64> {
|
pub async fn append_to_state(&self, new_pdu: &PduEvent) -> Result<u64> {
|
||||||
const BUFSIZE: usize = size_of::<u64>();
|
const BUFSIZE: usize = size_of::<u64>();
|
||||||
|
|
||||||
@@ -332,7 +355,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip_all, level = "debug")]
|
#[tracing::instrument(skip_all, level = "trace")]
|
||||||
pub async fn summary_stripped<Pdu>(&self, event: &Pdu) -> Vec<Raw<AnyStrippedStateEvent>>
|
pub async fn summary_stripped<Pdu>(&self, event: &Pdu) -> Vec<Raw<AnyStrippedStateEvent>>
|
||||||
where
|
where
|
||||||
Pdu: Event,
|
Pdu: Event,
|
||||||
@@ -380,7 +403,11 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the room's version.
|
/// Returns the room's version.
|
||||||
#[tracing::instrument(skip(self), level = "debug")]
|
#[tracing::instrument(
|
||||||
|
level = "trace"
|
||||||
|
skip(self),
|
||||||
|
ret,
|
||||||
|
)]
|
||||||
pub async fn get_room_version(&self, room_id: &RoomId) -> Result<RoomVersionId> {
|
pub async fn get_room_version(&self, room_id: &RoomId) -> Result<RoomVersionId> {
|
||||||
self.services
|
self.services
|
||||||
.state_accessor
|
.state_accessor
|
||||||
@@ -390,6 +417,11 @@ impl Service {
|
|||||||
.map_err(|e| err!(Request(NotFound("No create event found: {e:?}"))))
|
.map_err(|e| err!(Request(NotFound("No create event found: {e:?}"))))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(
|
||||||
|
level = "trace"
|
||||||
|
skip(self),
|
||||||
|
ret,
|
||||||
|
)]
|
||||||
pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<ShortStateHash> {
|
pub async fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<ShortStateHash> {
|
||||||
self.db
|
self.db
|
||||||
.roomid_shortstatehash
|
.roomid_shortstatehash
|
||||||
@@ -411,6 +443,11 @@ impl Service {
|
|||||||
.ignore_err()
|
.ignore_err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(
|
||||||
|
level = "debug"
|
||||||
|
skip_all,
|
||||||
|
fields(%room_id),
|
||||||
|
)]
|
||||||
pub async fn set_forward_extremities<'a, I>(
|
pub async fn set_forward_extremities<'a, I>(
|
||||||
&'a self,
|
&'a self,
|
||||||
room_id: &'a RoomId,
|
room_id: &'a RoomId,
|
||||||
@@ -434,7 +471,7 @@ impl Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// This fetches auth events from the current state.
|
/// This fetches auth events from the current state.
|
||||||
#[tracing::instrument(skip(self, content), level = "debug")]
|
#[tracing::instrument(skip(self, content), level = "trace")]
|
||||||
pub async fn get_auth_events(
|
pub async fn get_auth_events(
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
|
|||||||
@@ -33,7 +33,12 @@ use crate::{appservice::NamespaceRegex, rooms::state_compressor::CompressedState
|
|||||||
/// Append the incoming event setting the state snapshot to the state from
|
/// Append the incoming event setting the state snapshot to the state from
|
||||||
/// the server that sent the event.
|
/// the server that sent the event.
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(level = "debug", skip_all)]
|
#[tracing::instrument(
|
||||||
|
name = "append_incoming",
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
ret(Debug)
|
||||||
|
)]
|
||||||
pub async fn append_incoming_pdu<'a, Leafs>(
|
pub async fn append_incoming_pdu<'a, Leafs>(
|
||||||
&'a self,
|
&'a self,
|
||||||
pdu: &'a PduEvent,
|
pdu: &'a PduEvent,
|
||||||
@@ -81,7 +86,7 @@ where
|
|||||||
///
|
///
|
||||||
/// Returns pdu id
|
/// Returns pdu id
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(level = "debug", skip_all)]
|
#[tracing::instrument(name = "append", level = "debug", skip_all, ret(Debug))]
|
||||||
pub async fn append_pdu<'a, Leafs>(
|
pub async fn append_pdu<'a, Leafs>(
|
||||||
&'a self,
|
&'a self,
|
||||||
pdu: &'a PduEvent,
|
pdu: &'a PduEvent,
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ use super::RoomMutexGuard;
|
|||||||
/// takes a roomid_mutex_state, meaning that only this function is able to
|
/// takes a roomid_mutex_state, meaning that only this function is able to
|
||||||
/// mutate the room state.
|
/// mutate the room state.
|
||||||
#[implement(super::Service)]
|
#[implement(super::Service)]
|
||||||
#[tracing::instrument(skip(self, state_lock), level = "debug")]
|
#[tracing::instrument(skip(self, state_lock), level = "debug", ret)]
|
||||||
pub async fn build_and_append_pdu(
|
pub async fn build_and_append_pdu(
|
||||||
&self,
|
&self,
|
||||||
pdu_builder: PduBuilder,
|
pdu_builder: PduBuilder,
|
||||||
|
|||||||
Reference in New Issue
Block a user