Split full_conflicted_set from resolve under separate span.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -96,50 +96,30 @@ where
|
|||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
unconflicted = unconflicted_state.len(),
|
unconflicted = unconflicted_state.len(),
|
||||||
conflicted = conflicted_states.len(),
|
conflicted_states = conflicted_states.len(),
|
||||||
"unresolved states",
|
conflicted_events = conflicted_states
|
||||||
|
.values()
|
||||||
|
.fold(0_usize, |a, s| a.saturating_add(s.len())),
|
||||||
|
"unresolved states"
|
||||||
);
|
);
|
||||||
|
|
||||||
trace!(
|
trace!(
|
||||||
?unconflicted_state,
|
?unconflicted_state,
|
||||||
?conflicted_states,
|
?conflicted_states,
|
||||||
unconflicted = unconflicted_state.len(),
|
unconflicted = unconflicted_state.len(),
|
||||||
conflicted = conflicted_states.len(),
|
conflicted_states = conflicted_states.len(),
|
||||||
"unresolved states",
|
"unresolved states"
|
||||||
);
|
);
|
||||||
|
|
||||||
if conflicted_states.is_empty() {
|
if conflicted_states.is_empty() {
|
||||||
return Ok(unconflicted_state.into_iter().collect());
|
return Ok(unconflicted_state.into_iter().collect());
|
||||||
}
|
}
|
||||||
|
|
||||||
let consider_conflicted_subgraph = rules
|
|
||||||
.state_res
|
|
||||||
.v2_rules()
|
|
||||||
.is_some_and(|rules| rules.consider_conflicted_state_subgraph)
|
|
||||||
|| backport_css;
|
|
||||||
|
|
||||||
let conflicted_states = conflicted_states.values().flatten().cloned();
|
|
||||||
|
|
||||||
// Since `org.matrix.hydra.11`, fetch the conflicted state subgraph.
|
|
||||||
let conflicted_subgraph = consider_conflicted_subgraph
|
|
||||||
.then(|| conflicted_states.clone().stream())
|
|
||||||
.map_async(async |ids| conflicted_subgraph_dfs(ids, fetch))
|
|
||||||
.map(Option::into_iter)
|
|
||||||
.map(IterStream::stream)
|
|
||||||
.flatten_stream()
|
|
||||||
.flatten()
|
|
||||||
.boxed();
|
|
||||||
|
|
||||||
// 0. The full conflicted set is the union of the conflicted state set and the
|
// 0. The full conflicted set is the union of the conflicted state set and the
|
||||||
// auth difference. Don't honor events that don't exist.
|
// auth difference. Don't honor events that don't exist.
|
||||||
let full_conflicted_set = auth_difference(auth_sets)
|
let full_conflicted_set =
|
||||||
.chain(conflicted_states.stream())
|
full_conflicted_set(rules, conflicted_states, auth_sets, fetch, exists, backport_css)
|
||||||
.broad_filter_map(async |id| exists(id.clone()).await.then_some(id))
|
.await;
|
||||||
.chain(conflicted_subgraph)
|
|
||||||
.collect::<HashSet<_>>()
|
|
||||||
.inspect(|set| debug!(count = set.len(), "full conflicted set"))
|
|
||||||
.inspect(|set| trace!(?set, "full conflicted set"))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
// 1. Select the set X of all power events that appear in the full conflicted
|
// 1. Select the set X of all power events that appear in the full conflicted
|
||||||
// set. For each such power event P, enlarge X by adding the events in the
|
// set. For each such power event P, enlarge X by adding the events in the
|
||||||
@@ -232,3 +212,56 @@ where
|
|||||||
|
|
||||||
Ok(resolved_state)
|
Ok(resolved_state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(
|
||||||
|
name = "conflicted",
|
||||||
|
level = "debug",
|
||||||
|
skip_all,
|
||||||
|
fields(
|
||||||
|
states = conflicted_states.len(),
|
||||||
|
events = conflicted_states.values().flatten().count()
|
||||||
|
),
|
||||||
|
)]
|
||||||
|
async fn full_conflicted_set<AuthSets, FetchExists, ExistsFut, FetchEvent, EventFut, Pdu>(
|
||||||
|
rules: &RoomVersionRules,
|
||||||
|
conflicted_states: ConflictMap<OwnedEventId>,
|
||||||
|
auth_sets: AuthSets,
|
||||||
|
fetch: &FetchEvent,
|
||||||
|
exists: &FetchExists,
|
||||||
|
backport_css: bool,
|
||||||
|
) -> HashSet<OwnedEventId>
|
||||||
|
where
|
||||||
|
AuthSets: Stream<Item = AuthSet<OwnedEventId>> + Send,
|
||||||
|
FetchExists: Fn(OwnedEventId) -> ExistsFut + Sync,
|
||||||
|
ExistsFut: Future<Output = bool> + Send,
|
||||||
|
FetchEvent: Fn(OwnedEventId) -> EventFut + Sync,
|
||||||
|
EventFut: Future<Output = Result<Pdu>> + Send,
|
||||||
|
Pdu: Event + Clone,
|
||||||
|
{
|
||||||
|
let consider_conflicted_subgraph = rules
|
||||||
|
.state_res
|
||||||
|
.v2_rules()
|
||||||
|
.is_some_and(|rules| rules.consider_conflicted_state_subgraph)
|
||||||
|
|| backport_css;
|
||||||
|
|
||||||
|
let conflicted_states = conflicted_states.values().flatten().cloned();
|
||||||
|
|
||||||
|
// Since `org.matrix.hydra.11`, fetch the conflicted state subgraph.
|
||||||
|
let conflicted_subgraph = consider_conflicted_subgraph
|
||||||
|
.then(|| conflicted_states.clone().stream())
|
||||||
|
.map_async(async |ids| conflicted_subgraph_dfs(ids, fetch))
|
||||||
|
.map(Option::into_iter)
|
||||||
|
.map(IterStream::stream)
|
||||||
|
.flatten_stream()
|
||||||
|
.flatten()
|
||||||
|
.boxed();
|
||||||
|
|
||||||
|
auth_difference(auth_sets)
|
||||||
|
.chain(conflicted_states.stream())
|
||||||
|
.broad_filter_map(async |id| exists(id.clone()).await.then_some(id))
|
||||||
|
.chain(conflicted_subgraph)
|
||||||
|
.collect::<HashSet<_>>()
|
||||||
|
.inspect(|set| debug!(count = set.len(), "full conflicted set"))
|
||||||
|
.inspect(|set| trace!(?set, "full conflicted set"))
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|||||||
@@ -34,7 +34,7 @@ type Frame = AuthEvents;
|
|||||||
const PATH_INLINE: usize = 48;
|
const PATH_INLINE: usize = 48;
|
||||||
const STACK_INLINE: usize = 48;
|
const STACK_INLINE: usize = 48;
|
||||||
|
|
||||||
#[tracing::instrument(name = "conflicted_subgraph", level = "debug", skip_all)]
|
#[tracing::instrument(name = "subgraph_dfs", level = "debug", skip_all)]
|
||||||
pub(super) fn conflicted_subgraph_dfs<ConflictedEventIds, Fetch, Fut, Pdu>(
|
pub(super) fn conflicted_subgraph_dfs<ConflictedEventIds, Fetch, Fut, Pdu>(
|
||||||
conflicted_event_ids: ConflictedEventIds,
|
conflicted_event_ids: ConflictedEventIds,
|
||||||
fetch: &Fetch,
|
fetch: &Fetch,
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ use crate::{
|
|||||||
level = "debug",
|
level = "debug",
|
||||||
skip_all,
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
event_id = power_level_event_id
|
power_levels = power_level_event_id
|
||||||
.as_deref()
|
.as_deref()
|
||||||
.map(EventId::as_str)
|
.map(EventId::as_str)
|
||||||
.unwrap_or_default(),
|
.unwrap_or_default(),
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ use crate::{
|
|||||||
level = "debug",
|
level = "debug",
|
||||||
skip_all,
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
full_conflicted = full_conflicted_set.len(),
|
conflicted = full_conflicted_set.len(),
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
pub(super) async fn power_sort<Fetch, Fut, Pdu>(
|
pub(super) async fn power_sort<Fetch, Fut, Pdu>(
|
||||||
|
|||||||
Reference in New Issue
Block a user