From d904d30a164cd51e0768b828fa21b52d0fd6bc5a Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sat, 14 Feb 2026 02:55:34 +0000 Subject: [PATCH] Split full_conflicted_set from resolve under separate span. Signed-off-by: Jason Volk --- src/core/matrix/state_res/resolve.rs | 93 +++++++++++++------ .../state_res/resolve/conflicted_subgraph.rs | 2 +- .../matrix/state_res/resolve/mainline_sort.rs | 2 +- .../matrix/state_res/resolve/power_sort.rs | 2 +- 4 files changed, 66 insertions(+), 33 deletions(-) diff --git a/src/core/matrix/state_res/resolve.rs b/src/core/matrix/state_res/resolve.rs index 931ef912..8d1232ec 100644 --- a/src/core/matrix/state_res/resolve.rs +++ b/src/core/matrix/state_res/resolve.rs @@ -96,50 +96,30 @@ where debug!( unconflicted = unconflicted_state.len(), - conflicted = conflicted_states.len(), - "unresolved states", + conflicted_states = conflicted_states.len(), + conflicted_events = conflicted_states + .values() + .fold(0_usize, |a, s| a.saturating_add(s.len())), + "unresolved states" ); trace!( ?unconflicted_state, ?conflicted_states, unconflicted = unconflicted_state.len(), - conflicted = conflicted_states.len(), - "unresolved states", + conflicted_states = conflicted_states.len(), + "unresolved states" ); if conflicted_states.is_empty() { return Ok(unconflicted_state.into_iter().collect()); } - let consider_conflicted_subgraph = rules - .state_res - .v2_rules() - .is_some_and(|rules| rules.consider_conflicted_state_subgraph) - || backport_css; - - let conflicted_states = conflicted_states.values().flatten().cloned(); - - // Since `org.matrix.hydra.11`, fetch the conflicted state subgraph. - let conflicted_subgraph = consider_conflicted_subgraph - .then(|| conflicted_states.clone().stream()) - .map_async(async |ids| conflicted_subgraph_dfs(ids, fetch)) - .map(Option::into_iter) - .map(IterStream::stream) - .flatten_stream() - .flatten() - .boxed(); - // 0. The full conflicted set is the union of the conflicted state set and the // auth difference. Don't honor events that don't exist. - let full_conflicted_set = auth_difference(auth_sets) - .chain(conflicted_states.stream()) - .broad_filter_map(async |id| exists(id.clone()).await.then_some(id)) - .chain(conflicted_subgraph) - .collect::>() - .inspect(|set| debug!(count = set.len(), "full conflicted set")) - .inspect(|set| trace!(?set, "full conflicted set")) - .await; + let full_conflicted_set = + full_conflicted_set(rules, conflicted_states, auth_sets, fetch, exists, backport_css) + .await; // 1. Select the set X of all power events that appear in the full conflicted // set. For each such power event P, enlarge X by adding the events in the @@ -232,3 +212,56 @@ where Ok(resolved_state) } + +#[tracing::instrument( + name = "conflicted", + level = "debug", + skip_all, + fields( + states = conflicted_states.len(), + events = conflicted_states.values().flatten().count() + ), +)] +async fn full_conflicted_set( + rules: &RoomVersionRules, + conflicted_states: ConflictMap, + auth_sets: AuthSets, + fetch: &FetchEvent, + exists: &FetchExists, + backport_css: bool, +) -> HashSet +where + AuthSets: Stream> + Send, + FetchExists: Fn(OwnedEventId) -> ExistsFut + Sync, + ExistsFut: Future + Send, + FetchEvent: Fn(OwnedEventId) -> EventFut + Sync, + EventFut: Future> + Send, + Pdu: Event + Clone, +{ + let consider_conflicted_subgraph = rules + .state_res + .v2_rules() + .is_some_and(|rules| rules.consider_conflicted_state_subgraph) + || backport_css; + + let conflicted_states = conflicted_states.values().flatten().cloned(); + + // Since `org.matrix.hydra.11`, fetch the conflicted state subgraph. + let conflicted_subgraph = consider_conflicted_subgraph + .then(|| conflicted_states.clone().stream()) + .map_async(async |ids| conflicted_subgraph_dfs(ids, fetch)) + .map(Option::into_iter) + .map(IterStream::stream) + .flatten_stream() + .flatten() + .boxed(); + + auth_difference(auth_sets) + .chain(conflicted_states.stream()) + .broad_filter_map(async |id| exists(id.clone()).await.then_some(id)) + .chain(conflicted_subgraph) + .collect::>() + .inspect(|set| debug!(count = set.len(), "full conflicted set")) + .inspect(|set| trace!(?set, "full conflicted set")) + .await +} diff --git a/src/core/matrix/state_res/resolve/conflicted_subgraph.rs b/src/core/matrix/state_res/resolve/conflicted_subgraph.rs index 237d9fae..03080547 100644 --- a/src/core/matrix/state_res/resolve/conflicted_subgraph.rs +++ b/src/core/matrix/state_res/resolve/conflicted_subgraph.rs @@ -34,7 +34,7 @@ type Frame = AuthEvents; const PATH_INLINE: usize = 48; const STACK_INLINE: usize = 48; -#[tracing::instrument(name = "conflicted_subgraph", level = "debug", skip_all)] +#[tracing::instrument(name = "subgraph_dfs", level = "debug", skip_all)] pub(super) fn conflicted_subgraph_dfs( conflicted_event_ids: ConflictedEventIds, fetch: &Fetch, diff --git a/src/core/matrix/state_res/resolve/mainline_sort.rs b/src/core/matrix/state_res/resolve/mainline_sort.rs index b5347bbe..09f1e5c7 100644 --- a/src/core/matrix/state_res/resolve/mainline_sort.rs +++ b/src/core/matrix/state_res/resolve/mainline_sort.rs @@ -39,7 +39,7 @@ use crate::{ level = "debug", skip_all, fields( - event_id = power_level_event_id + power_levels = power_level_event_id .as_deref() .map(EventId::as_str) .unwrap_or_default(), diff --git a/src/core/matrix/state_res/resolve/power_sort.rs b/src/core/matrix/state_res/resolve/power_sort.rs index c95eb4c2..01e2da80 100644 --- a/src/core/matrix/state_res/resolve/power_sort.rs +++ b/src/core/matrix/state_res/resolve/power_sort.rs @@ -45,7 +45,7 @@ use crate::{ level = "debug", skip_all, fields( - full_conflicted = full_conflicted_set.len(), + conflicted = full_conflicted_set.len(), ) )] pub(super) async fn power_sort(