Constrain size of FuturesUnordered for conflicted-subgraph.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -13,7 +13,10 @@ use tuwunel_core::{
|
|||||||
Result, implement,
|
Result, implement,
|
||||||
matrix::{Event, pdu::AuthEvents},
|
matrix::{Event, pdu::AuthEvents},
|
||||||
smallvec::SmallVec,
|
smallvec::SmallVec,
|
||||||
utils::{BoolExt, stream::IterStream},
|
utils::{
|
||||||
|
BoolExt,
|
||||||
|
stream::{IterStream, automatic_width},
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
@@ -43,6 +46,7 @@ type Frame = AuthEvents;
|
|||||||
|
|
||||||
const PATH_INLINE: usize = 32;
|
const PATH_INLINE: usize = 32;
|
||||||
const STACK_INLINE: usize = 32;
|
const STACK_INLINE: usize = 32;
|
||||||
|
const CAPACITY_MULTIPLIER: usize = 4;
|
||||||
|
|
||||||
#[tracing::instrument(
|
#[tracing::instrument(
|
||||||
name = "subgraph_dfs",
|
name = "subgraph_dfs",
|
||||||
@@ -61,21 +65,36 @@ where
|
|||||||
Fut: Future<Output = Result<Pdu>> + Send,
|
Fut: Future<Output = Result<Pdu>> + Send,
|
||||||
Pdu: Event,
|
Pdu: Event,
|
||||||
{
|
{
|
||||||
let initial_cap = conflicted_set.len().saturating_mul(3);
|
let initial_capacity = conflicted_set
|
||||||
|
.len()
|
||||||
|
.saturating_mul(CAPACITY_MULTIPLIER);
|
||||||
|
|
||||||
let state = Global {
|
let state = Global {
|
||||||
subgraph: Map::with_capacity(initial_cap),
|
subgraph: Map::with_capacity(initial_capacity),
|
||||||
todo: conflicted_set
|
todo: Todo::<_>::new(),
|
||||||
.iter()
|
|
||||||
.map(Deref::deref)
|
|
||||||
.cloned()
|
|
||||||
.enumerate()
|
|
||||||
.map(Local::new)
|
|
||||||
.filter_map(Local::pop)
|
|
||||||
.map(|(local, event_id)| local.push(fetch, Some(event_id)))
|
|
||||||
.collect(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
unfold(state, |mut state| async {
|
let inputs = conflicted_set
|
||||||
|
.iter()
|
||||||
|
.map(Deref::deref)
|
||||||
|
.cloned()
|
||||||
|
.enumerate()
|
||||||
|
.map(Local::new)
|
||||||
|
.filter_map(Local::pop)
|
||||||
|
.map(|(local, event_id)| local.push(fetch, Some(event_id)));
|
||||||
|
|
||||||
|
unfold((inputs, state), async |(mut inputs, mut state)| {
|
||||||
|
debug_assert!(
|
||||||
|
state.todo.len() <= automatic_width(),
|
||||||
|
"Excessive items todo in FuturesUnordered"
|
||||||
|
);
|
||||||
|
|
||||||
|
while state.todo.len() < automatic_width()
|
||||||
|
&& let Some(input) = inputs.next()
|
||||||
|
{
|
||||||
|
state.todo.push(input);
|
||||||
|
}
|
||||||
|
|
||||||
let outputs = state
|
let outputs = state
|
||||||
.todo
|
.todo
|
||||||
.next()
|
.next()
|
||||||
@@ -93,7 +112,7 @@ where
|
|||||||
.flatten()
|
.flatten()
|
||||||
.stream();
|
.stream();
|
||||||
|
|
||||||
Some((outputs, state))
|
Some((outputs, (inputs, state)))
|
||||||
})
|
})
|
||||||
.flatten()
|
.flatten()
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user