diff --git a/src/core/utils/stream/broadband.rs b/src/core/utils/stream/broadband.rs index 832f2638..aea03041 100644 --- a/src/core/utils/stream/broadband.rs +++ b/src/core/utils/stream/broadband.rs @@ -35,6 +35,19 @@ where Fut: Future> + Send, U: Send; + /// Concurrent find_map(); unordered result + fn broadn_find_map<'a, F, Fut, U, N>( + self, + n: N, + f: F, + ) -> impl Future> + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send + 'a, + Fut: Future> + Send, + U: Send + 'a, + Self: Unpin + 'a; + fn broadn_flat_map(self, n: N, f: F) -> impl Stream + Send where N: Into>, @@ -77,6 +90,17 @@ where self.broadn_filter_map(None, f) } + #[inline] + fn broad_find_map<'a, F, Fut, U>(self, f: F) -> impl Future> + Send + where + F: Fn(Item) -> Fut + Send + 'a, + Fut: Future> + Send, + U: Send + 'a, + Self: Unpin + 'a, + { + self.broadn_find_map(None, f) + } + #[inline] fn broad_flat_map(self, f: F) -> impl Stream + Send where @@ -139,6 +163,24 @@ where .ready_filter_map(identity) } + #[inline] + fn broadn_find_map<'a, F, Fut, U, N>( + self, + n: N, + f: F, + ) -> impl Future> + Send + where + N: Into>, + F: Fn(Item) -> Fut + Send + 'a, + Fut: Future> + Send, + U: Send + 'a, + Self: Unpin + 'a, + { + self.map(f) + .buffer_unordered(n.into().unwrap_or_else(automatic_width)) + .ready_find_map(identity) + } + #[inline] fn broadn_flat_map(self, n: N, f: F) -> impl Stream + Send where diff --git a/src/core/utils/stream/ready.rs b/src/core/utils/stream/ready.rs index be4d1b25..5baa094e 100644 --- a/src/core/utils/stream/ready.rs +++ b/src/core/utils/stream/ready.rs @@ -32,6 +32,13 @@ where F: Fn(&Item) -> bool + Send + 'a, Item: Send; + fn ready_find_map<'a, F, U>(self, f: F) -> impl Future> + Send + where + Self: Send + Unpin + 'a, + F: Fn(Item) -> Option + Send + 'a, + Item: Send, + U: Send; + fn ready_filter<'a, F>( self, f: F, @@ -130,6 +137,20 @@ where .map(|(curr, _next)| curr) } + #[inline] + fn ready_find_map<'a, F, U>(self, f: F) -> impl Future> + Send + where + Self: Send + Unpin + 'a, + F: Fn(Item) -> Option + Send + 'a, + Item: Send, + U: Send, + { + self.ready_filter_map(f) + .take(1) + .into_future() + .map(|(curr, _next)| curr) + } + #[inline] fn ready_filter<'a, F>( self,