Optimize conflicted-subgraph with single state container.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -1,4 +1,8 @@
|
|||||||
use std::{collections::HashSet as Set, iter::once, ops::Deref};
|
use std::{
|
||||||
|
collections::{HashMap as Map, hash_map::Entry},
|
||||||
|
iter::once,
|
||||||
|
ops::Deref,
|
||||||
|
};
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
Future, Stream, StreamExt,
|
Future, Stream, StreamExt,
|
||||||
@@ -9,29 +13,36 @@ use tuwunel_core::{
|
|||||||
Result, implement,
|
Result, implement,
|
||||||
matrix::{Event, pdu::AuthEvents},
|
matrix::{Event, pdu::AuthEvents},
|
||||||
smallvec::SmallVec,
|
smallvec::SmallVec,
|
||||||
utils::stream::IterStream,
|
utils::{BoolExt, stream::IterStream},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
struct Global<Fut: Future + Send> {
|
struct Global<Fut: Future + Send> {
|
||||||
subgraph: Set<OwnedEventId>,
|
subgraph: Subgraph,
|
||||||
seen: Set<OwnedEventId>,
|
|
||||||
todo: Todo<Fut>,
|
todo: Todo<Fut>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
#[derive(Default, Debug)]
|
||||||
struct Local {
|
struct Local {
|
||||||
|
id: usize,
|
||||||
path: Path,
|
path: Path,
|
||||||
stack: Stack,
|
stack: Stack,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
struct Substate {
|
||||||
|
subgraph: bool,
|
||||||
|
seen: bool,
|
||||||
|
}
|
||||||
|
|
||||||
type Todo<Fut> = FuturesUnordered<Fut>;
|
type Todo<Fut> = FuturesUnordered<Fut>;
|
||||||
|
type Subgraph = Map<OwnedEventId, Substate>;
|
||||||
type Path = SmallVec<[OwnedEventId; PATH_INLINE]>;
|
type Path = SmallVec<[OwnedEventId; PATH_INLINE]>;
|
||||||
type Stack = SmallVec<[Frame; STACK_INLINE]>;
|
type Stack = SmallVec<[Frame; STACK_INLINE]>;
|
||||||
type Frame = AuthEvents;
|
type Frame = AuthEvents;
|
||||||
|
|
||||||
const PATH_INLINE: usize = 16;
|
const PATH_INLINE: usize = 32;
|
||||||
const STACK_INLINE: usize = 16;
|
const STACK_INLINE: usize = 32;
|
||||||
|
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
name = "subgraph_dfs",
|
name = "subgraph_dfs",
|
||||||
@@ -50,13 +61,14 @@ where
|
|||||||
Fut: Future<Output = Result<Pdu>> + Send,
|
Fut: Future<Output = Result<Pdu>> + Send,
|
||||||
Pdu: Event,
|
Pdu: Event,
|
||||||
{
|
{
|
||||||
|
let initial_cap = conflicted_set.len().saturating_mul(3);
|
||||||
let state = Global {
|
let state = Global {
|
||||||
subgraph: Default::default(),
|
subgraph: Map::with_capacity(initial_cap),
|
||||||
seen: Default::default(),
|
|
||||||
todo: conflicted_set
|
todo: conflicted_set
|
||||||
.iter()
|
.iter()
|
||||||
.map(Deref::deref)
|
.map(Deref::deref)
|
||||||
.cloned()
|
.cloned()
|
||||||
|
.enumerate()
|
||||||
.map(Local::new)
|
.map(Local::new)
|
||||||
.filter_map(Local::pop)
|
.filter_map(Local::pop)
|
||||||
.map(|(local, event_id)| local.push(fetch, Some(event_id)))
|
.map(|(local, event_id)| local.push(fetch, Some(event_id)))
|
||||||
@@ -92,11 +104,18 @@ where
|
|||||||
level = "trace",
|
level = "trace",
|
||||||
skip_all,
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
|
subgraph = ?state
|
||||||
|
.subgraph
|
||||||
|
.values()
|
||||||
|
.fold((0_u64, 0_u64), |(a, b), v| {
|
||||||
|
(a.saturating_add(u64::from(v.subgraph)), b.saturating_add(u64::from(v.seen)))
|
||||||
|
}),
|
||||||
|
|
||||||
?event_id,
|
?event_id,
|
||||||
|
id = self.id,
|
||||||
path = self.path.len(),
|
path = self.path.len(),
|
||||||
stack = self.stack.len(),
|
stack = self.stack.iter().flatten().count(),
|
||||||
subgraph = state.subgraph.len(),
|
frames = self.stack.len(),
|
||||||
seen = state.seen.len(),
|
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
fn eval<Fut: Future + Send>(
|
fn eval<Fut: Future + Send>(
|
||||||
@@ -105,42 +124,65 @@ fn eval<Fut: Future + Send>(
|
|||||||
conflicted_event_ids: &Vec<&OwnedEventId>,
|
conflicted_event_ids: &Vec<&OwnedEventId>,
|
||||||
event_id: OwnedEventId,
|
event_id: OwnedEventId,
|
||||||
) -> (Self, Option<OwnedEventId>, Path) {
|
) -> (Self, Option<OwnedEventId>, Path) {
|
||||||
let Global { subgraph, seen, .. } = state;
|
let Global { subgraph, .. } = state;
|
||||||
|
|
||||||
let insert_path = |global: &mut Global<Fut>, local: &Local| {
|
let insert_path_filter = |subgraph: &mut Subgraph, event_id: &OwnedEventId| match subgraph
|
||||||
|
.entry(event_id.clone())
|
||||||
|
{
|
||||||
|
| Entry::Occupied(state) if state.get().subgraph => false,
|
||||||
|
| Entry::Occupied(mut state) => {
|
||||||
|
state.get_mut().subgraph = true;
|
||||||
|
state.get().subgraph
|
||||||
|
},
|
||||||
|
| Entry::Vacant(state) =>
|
||||||
|
state
|
||||||
|
.insert(Substate { subgraph: true, seen: false })
|
||||||
|
.subgraph,
|
||||||
|
};
|
||||||
|
|
||||||
|
let insert_path = |subgraph: &mut Subgraph, local: &Local| {
|
||||||
local
|
local
|
||||||
.path
|
.path
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|&event_id| global.subgraph.insert(event_id.clone()))
|
.filter(|&event_id| insert_path_filter(subgraph, event_id))
|
||||||
.cloned()
|
.cloned()
|
||||||
.collect()
|
.collect()
|
||||||
};
|
};
|
||||||
|
|
||||||
if subgraph.contains(&event_id) {
|
let is_conflicted = |event_id: &OwnedEventId| {
|
||||||
if self.path.len() <= 1 {
|
conflicted_event_ids
|
||||||
self.path.pop();
|
.binary_search(&event_id)
|
||||||
return (self, None, Path::new());
|
.is_ok()
|
||||||
}
|
};
|
||||||
|
|
||||||
|
let mut entry = subgraph.entry(event_id.clone());
|
||||||
|
|
||||||
|
if let Entry::Occupied(state) = &entry
|
||||||
|
&& state.get().subgraph
|
||||||
|
{
|
||||||
|
let path = (self.path.len() > 1)
|
||||||
|
.then(|| insert_path(subgraph, &self))
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
let path = insert_path(state, &self);
|
|
||||||
self.path.pop();
|
self.path.pop();
|
||||||
return (self, None, path);
|
return (self, None, path);
|
||||||
}
|
}
|
||||||
|
|
||||||
if !seen.insert(event_id.clone()) {
|
if let Entry::Occupied(state) = &mut entry {
|
||||||
|
state.get_mut().seen = true;
|
||||||
return (self, None, Path::new());
|
return (self, None, Path::new());
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.path.len() > 1
|
if let Entry::Vacant(state) = entry {
|
||||||
&& conflicted_event_ids
|
state.insert(Substate { subgraph: false, seen: true });
|
||||||
.binary_search(&&event_id)
|
|
||||||
.is_ok()
|
|
||||||
{
|
|
||||||
let path = insert_path(state, &self);
|
|
||||||
return (self, Some(event_id), path);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
(self, Some(event_id), Path::new())
|
let path = (self.path.len() > 1)
|
||||||
|
.and_if(|| is_conflicted(&event_id))
|
||||||
|
.then(|| insert_path(subgraph, &self))
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
(self, Some(event_id), path)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Local)]
|
#[implement(Local)]
|
||||||
@@ -176,9 +218,11 @@ fn pop(mut self) -> Option<(Self, OwnedEventId)> {
|
|||||||
|
|
||||||
#[implement(Local)]
|
#[implement(Local)]
|
||||||
#[allow(clippy::redundant_clone)] // buggy, nursery
|
#[allow(clippy::redundant_clone)] // buggy, nursery
|
||||||
fn new(conflicted_event_id: OwnedEventId) -> Self {
|
fn new((id, conflicted_event_id): (usize, OwnedEventId)) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
id,
|
||||||
path: once(conflicted_event_id.clone()).collect(),
|
path: once(conflicted_event_id.clone()).collect(),
|
||||||
stack: once(once(conflicted_event_id).collect()).collect(),
|
stack: once(once(conflicted_event_id).collect()).collect(),
|
||||||
|
..Default::default()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user