Split txn pdu/edu handling with separate spans.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2026-02-14 05:27:31 +00:00
parent 6cd4c1a70b
commit 1004d99350
2 changed files with 48 additions and 15 deletions

View File

@@ -142,6 +142,21 @@ async fn handle(
started: Instant,
pdus: impl Stream<Item = Pdu> + Send,
edus: impl Stream<Item = Edu> + Send,
) -> Result<ResolvedMap> {
let results = handle_pdus(services, client, origin, started, pdus).await?;
handle_edus(services, client, origin, edus).await?;
Ok(results)
}
#[tracing::instrument(name = "pdus", level = "debug", skip_all)]
async fn handle_pdus(
services: &Services,
client: &IpAddr,
origin: &ServerName,
started: Instant,
pdus: impl Stream<Item = Pdu> + Send,
) -> Result<ResolvedMap> {
// group pdus by room
let pdus = pdus
@@ -160,7 +175,7 @@ async fn handle(
.into_iter()
.try_stream()
.broad_and_then(async |(room_id, pdus): (_, Vec<_>)| {
handle_room(services, client, origin, &started, room_id, pdus.into_iter())
handle_pdus_room(services, client, origin, &started, room_id, pdus.into_iter())
.map_ok(ResolvedMap::into_iter)
.map_ok(IterStream::try_stream)
.await
@@ -169,17 +184,16 @@ async fn handle(
.try_collect()
.await?;
// evaluate edus after pdus, at least for now.
edus.enumerate()
.for_each_concurrent(automatic_width(), |(i, edu)| {
handle_edu(services, client, origin, i, edu)
})
.await;
Ok(results)
}
async fn handle_room(
#[tracing::instrument(
name = "room",
level = "debug",
skip_all,
fields(%room_id)
)]
async fn handle_pdus_room(
services: &Services,
_client: &IpAddr,
origin: &ServerName,
@@ -205,10 +219,7 @@ async fn handle_room(
.await;
debug!(
%event_id,
%room_id,
ri,
ti,
%event_id, ri, ti,
pdu_elapsed = ?pdu_start_time.elapsed(),
txn_elapsed = ?txn_start_time.elapsed(),
"Finished PDU",
@@ -221,6 +232,28 @@ async fn handle_room(
.await
}
#[tracing::instrument(name = "edus", level = "debug", skip_all)]
async fn handle_edus(
services: &Services,
client: &IpAddr,
origin: &ServerName,
edus: impl Stream<Item = Edu> + Send,
) -> Result {
edus.enumerate()
.for_each_concurrent(automatic_width(), |(i, edu)| {
handle_edu(services, client, origin, i, edu)
})
.await;
Ok(())
}
#[tracing::instrument(
name = "edu",
level = "debug",
skip_all,
fields(%i),
)]
async fn handle_edu(
services: &Services,
client: &IpAddr,

View File

@@ -127,7 +127,7 @@ where
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
#[tracing::instrument(skip(self), level = "debug", ret)]
pub async fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> bool {
let key = (room_id, event_id);
self.db.referencedevents.qry(&key).await.is_ok()
@@ -140,7 +140,7 @@ pub fn mark_event_soft_failed(&self, event_id: &EventId) {
}
#[implement(Service)]
#[tracing::instrument(skip(self), level = "debug")]
#[tracing::instrument(skip(self), level = "debug", ret)]
pub async fn is_event_soft_failed(&self, event_id: &EventId) -> bool {
self.db
.softfailedeventids