additional futures extension utils

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2024-12-31 01:11:58 +00:00
parent a3f9432da8
commit 27328cbc01
6 changed files with 123 additions and 2 deletions

View File

@@ -8,6 +8,7 @@ mod ready;
mod tools;
mod try_broadband;
mod try_ready;
mod try_tools;
mod wideband;
pub use band::{
@@ -23,4 +24,5 @@ pub use ready::ReadyExt;
pub use tools::Tools;
pub use try_broadband::TryBroadbandExt;
pub use try_ready::TryReadyExt;
pub use try_tools::TryTools;
pub use wideband::WidebandExt;

View File

@@ -3,7 +3,7 @@
use futures::{
future::{ready, Ready},
stream::{AndThen, TryFilterMap, TryFold, TryForEach, TryStream, TryStreamExt},
stream::{AndThen, TryFilterMap, TryFold, TryForEach, TryStream, TryStreamExt, TryTakeWhile},
};
use crate::Result;
@@ -56,6 +56,13 @@ where
) -> TryForEach<Self, Ready<Result<(), E>>, impl FnMut(S::Ok) -> Ready<Result<(), E>>>
where
F: FnMut(S::Ok) -> Result<(), E>;
fn ready_try_take_while<F>(
self,
f: F,
) -> TryTakeWhile<Self, Ready<Result<bool, E>>, impl FnMut(&S::Ok) -> Ready<Result<bool, E>>>
where
F: Fn(&S::Ok) -> Result<bool, E>;
}
impl<T, E, S> TryReadyExt<T, E, S> for S
@@ -122,4 +129,15 @@ where
{
self.try_for_each(move |t| ready(f(t)))
}
#[inline]
fn ready_try_take_while<F>(
self,
f: F,
) -> TryTakeWhile<Self, Ready<Result<bool, E>>, impl FnMut(&S::Ok) -> Ready<Result<bool, E>>>
where
F: Fn(&S::Ok) -> Result<bool, E>,
{
self.try_take_while(move |t| ready(f(t)))
}
}

View File

@@ -0,0 +1,44 @@
//! TryStreamTools for futures::TryStream
#![allow(clippy::type_complexity)]
use futures::{future, future::Ready, stream::TryTakeWhile, TryStream, TryStreamExt};
use crate::Result;
/// TryStreamTools
pub trait TryTools<T, E, S>
where
S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + ?Sized,
Self: TryStream + Send + Sized,
{
fn try_take(
self,
n: usize,
) -> TryTakeWhile<
Self,
Ready<Result<bool, S::Error>>,
impl FnMut(&S::Ok) -> Ready<Result<bool, S::Error>>,
>;
}
impl<T, E, S> TryTools<T, E, S> for S
where
S: TryStream<Ok = T, Error = E, Item = Result<T, E>> + Send + ?Sized,
Self: TryStream + Send + Sized,
{
#[inline]
fn try_take(
self,
mut n: usize,
) -> TryTakeWhile<
Self,
Ready<Result<bool, S::Error>>,
impl FnMut(&S::Ok) -> Ready<Result<bool, S::Error>>,
> {
self.try_take_while(move |_| {
let res = future::ok(n > 0);
n = n.saturating_sub(1);
res
})
}
}