From ae4aad3641c5c701176e285af84dd9dd28133a5a Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Mon, 29 Sep 2025 01:52:05 +0000 Subject: [PATCH] Add set difference for sorted streams util. Signed-off-by: Jason Volk --- src/core/utils/set.rs | 28 ++++++++++++++++++++++++++ src/core/utils/tests.rs | 44 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) diff --git a/src/core/utils/set.rs b/src/core/utils/set.rs index 689668c8..673ea858 100644 --- a/src/core/utils/set.rs +++ b/src/core/utils/set.rs @@ -81,3 +81,31 @@ where None }) } + +/// Difference of sets +/// +/// Outputs the set of elements found in `a` which are not found in `b`. Streams +/// must be sorted. +pub fn difference_sorted_stream2(a: S, b: S) -> impl Stream + Send +where + S: Stream + Send + Unpin, + Item: Eq + PartialOrd + Send + Sync, +{ + use tokio::sync::Mutex; + + let b = Arc::new(Mutex::new(b.peekable())); + a.map(move |ai| (ai, b.clone())) + .filter_map(async move |(ai, b)| { + let mut lock = b.lock().await; + let b = &mut Pin::new(&mut *lock); + while b.as_mut().next_if(|bi| *bi < ai).await.is_some() { + continue; + } + + b.as_mut() + .next_if_eq(&ai) + .await + .is_none() + .then_some(ai) + }) +} diff --git a/src/core/utils/tests.rs b/src/core/utils/tests.rs index 05a0655b..8858feff 100644 --- a/src/core/utils/tests.rs +++ b/src/core/utils/tests.rs @@ -276,3 +276,47 @@ async fn set_intersection_sorted_stream2() { .await; assert!(r.eq(&["ccc", "ggg", "iii"])); } + +#[tokio::test] +async fn set_difference_sorted_stream2() { + use futures::StreamExt; + use utils::{IterStream, set::difference_sorted_stream2}; + + let a = ["bar", "foo"]; + let b = ["bar"]; + let r = difference_sorted_stream2(a.iter().stream(), b.iter().stream()) + .collect::>() + .await; + println!("{r:?}"); + assert!(r.eq(&["foo"])); + + let r = difference_sorted_stream2(b.iter().stream(), a.iter().stream()) + .collect::>() + .await; + println!("{r:?}"); + assert!(r.is_empty()); + + let a = ["aaa", "ccc", "xxx", "yyy"]; + let b = ["hhh", "iii", "jjj", "zzz"]; + let r = difference_sorted_stream2(a.iter().stream(), b.iter().stream()) + .collect::>() + .await; + println!("{r:?}"); + assert!(r.eq(&["aaa", "ccc", "xxx", "yyy"])); + + let a = ["aaa", "ccc", "eee", "ggg"]; + let b = ["aaa", "bbb", "ccc", "ddd", "eee"]; + let r = difference_sorted_stream2(a.iter().stream(), b.iter().stream()) + .collect::>() + .await; + println!("{r:?}"); + assert!(r.eq(&["ggg"])); + + let a = ["aaa", "ccc", "eee", "ggg", "hhh", "iii"]; + let b = ["bbb", "ccc", "ddd", "fff", "ggg", "iii"]; + let r = difference_sorted_stream2(a.iter().stream(), b.iter().stream()) + .collect::>() + .await; + println!("{r:?}"); + assert!(r.eq(&["aaa", "eee", "hhh"])); +}