Add set difference for sorted streams util.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -81,3 +81,31 @@ where
|
||||
None
|
||||
})
|
||||
}
|
||||
|
||||
/// Difference of sets
|
||||
///
|
||||
/// Outputs the set of elements found in `a` which are not found in `b`. Streams
|
||||
/// must be sorted.
|
||||
pub fn difference_sorted_stream2<Item, S>(a: S, b: S) -> impl Stream<Item = Item> + Send
|
||||
where
|
||||
S: Stream<Item = Item> + Send + Unpin,
|
||||
Item: Eq + PartialOrd + Send + Sync,
|
||||
{
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
let b = Arc::new(Mutex::new(b.peekable()));
|
||||
a.map(move |ai| (ai, b.clone()))
|
||||
.filter_map(async move |(ai, b)| {
|
||||
let mut lock = b.lock().await;
|
||||
let b = &mut Pin::new(&mut *lock);
|
||||
while b.as_mut().next_if(|bi| *bi < ai).await.is_some() {
|
||||
continue;
|
||||
}
|
||||
|
||||
b.as_mut()
|
||||
.next_if_eq(&ai)
|
||||
.await
|
||||
.is_none()
|
||||
.then_some(ai)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -276,3 +276,47 @@ async fn set_intersection_sorted_stream2() {
|
||||
.await;
|
||||
assert!(r.eq(&["ccc", "ggg", "iii"]));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn set_difference_sorted_stream2() {
|
||||
use futures::StreamExt;
|
||||
use utils::{IterStream, set::difference_sorted_stream2};
|
||||
|
||||
let a = ["bar", "foo"];
|
||||
let b = ["bar"];
|
||||
let r = difference_sorted_stream2(a.iter().stream(), b.iter().stream())
|
||||
.collect::<Vec<&str>>()
|
||||
.await;
|
||||
println!("{r:?}");
|
||||
assert!(r.eq(&["foo"]));
|
||||
|
||||
let r = difference_sorted_stream2(b.iter().stream(), a.iter().stream())
|
||||
.collect::<Vec<&str>>()
|
||||
.await;
|
||||
println!("{r:?}");
|
||||
assert!(r.is_empty());
|
||||
|
||||
let a = ["aaa", "ccc", "xxx", "yyy"];
|
||||
let b = ["hhh", "iii", "jjj", "zzz"];
|
||||
let r = difference_sorted_stream2(a.iter().stream(), b.iter().stream())
|
||||
.collect::<Vec<&str>>()
|
||||
.await;
|
||||
println!("{r:?}");
|
||||
assert!(r.eq(&["aaa", "ccc", "xxx", "yyy"]));
|
||||
|
||||
let a = ["aaa", "ccc", "eee", "ggg"];
|
||||
let b = ["aaa", "bbb", "ccc", "ddd", "eee"];
|
||||
let r = difference_sorted_stream2(a.iter().stream(), b.iter().stream())
|
||||
.collect::<Vec<&str>>()
|
||||
.await;
|
||||
println!("{r:?}");
|
||||
assert!(r.eq(&["ggg"]));
|
||||
|
||||
let a = ["aaa", "ccc", "eee", "ggg", "hhh", "iii"];
|
||||
let b = ["bbb", "ccc", "ddd", "fff", "ggg", "iii"];
|
||||
let r = difference_sorted_stream2(a.iter().stream(), b.iter().stream())
|
||||
.collect::<Vec<&str>>()
|
||||
.await;
|
||||
println!("{r:?}");
|
||||
assert!(r.eq(&["aaa", "eee", "hhh"]));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user