//! Broadband stream combinator extensions to futures::Stream #![allow(clippy::type_complexity)] use std::convert::identity; use futures::{ stream::{Stream, StreamExt}, Future, }; use super::ReadyExt; const WIDTH: usize = 32; /// Concurrency extensions to augment futures::StreamExt. broad_ combinators /// produce out-of-order pub trait BroadbandExt where Self: Stream + Send + Sized, { /// Concurrent filter_map(); unordered results fn broadn_filter_map(self, n: N, f: F) -> impl Stream + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future> + Send, U: Send; fn broadn_then(self, n: N, f: F) -> impl Stream + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future + Send, U: Send; #[inline] fn broad_filter_map(self, f: F) -> impl Stream + Send where F: Fn(Item) -> Fut + Send, Fut: Future> + Send, U: Send, { self.broadn_filter_map(None, f) } #[inline] fn broad_then(self, f: F) -> impl Stream + Send where F: Fn(Item) -> Fut + Send, Fut: Future + Send, U: Send, { self.broadn_then(None, f) } } impl BroadbandExt for S where S: Stream + Send + Sized, { #[inline] fn broadn_filter_map(self, n: N, f: F) -> impl Stream + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future> + Send, U: Send, { self.map(f) .buffer_unordered(n.into().unwrap_or(WIDTH)) .ready_filter_map(identity) } #[inline] fn broadn_then(self, n: N, f: F) -> impl Stream + Send where N: Into>, F: Fn(Item) -> Fut + Send, Fut: Future + Send, U: Send, { self.map(f).buffer_unordered(n.into().unwrap_or(WIDTH)) } }