use std::{ cmp::{Eq, Ord}, convert::identity, pin::Pin, sync::Arc, }; use futures::{ Stream, StreamExt, stream::{Peekable, unfold}, }; use tokio::sync::Mutex; use crate::{is_equal_to, is_less_than, utils::stream::ReadyExt}; /// Intersection of sets /// /// Outputs the set of elements common to all input sets. Inputs do not have to /// be sorted. If inputs are sorted a more optimized function is available in /// this suite and should be used. pub fn intersection(mut input: Iters) -> impl Iterator + Send where Iters: Iterator + Clone + Send, Iter: Iterator + Send, Item: Eq, { input.next().into_iter().flat_map(move |first| { let input = input.clone(); first.filter(move |targ| { input .clone() .all(|mut other| other.any(is_equal_to!(*targ))) }) }) } /// Intersection of sets /// /// Outputs the set of elements common to all input sets. Inputs must be sorted. pub fn intersection_sorted( mut input: Iters, ) -> impl Iterator + Send where Iters: Iterator + Clone + Send, Iter: Iterator + Send, Item: Eq + Ord, { input.next().into_iter().flat_map(move |first| { let mut input = input.clone().collect::>(); first.filter(move |targ| { input.iter_mut().all(|it| { it.by_ref() .skip_while(is_less_than!(targ)) .peekable() .peek() .is_some_and(is_equal_to!(targ)) }) }) }) } /// Intersection of sets /// /// Outputs the set of elements common to both streams. Streams must be sorted. pub fn intersection_sorted_stream2(a: S, b: S) -> impl Stream + Send where S: Stream + Send + Unpin, Item: Eq + PartialOrd + Send + Sync, { struct State { a: S, b: Peekable, } unfold(State { a, b: b.peekable() }, async |mut state| { let ai = state.a.next().await?; while let Some(bi) = Pin::new(&mut state.b) .next_if(|bi| *bi <= ai) .await .as_ref() { if ai == *bi { return Some((Some(ai), state)); } } Some((None, state)) }) .ready_filter_map(identity) } /// Difference of sets /// /// Outputs the set of elements found in `a` which are not found in `b`. Streams /// must be sorted. pub fn difference_sorted_stream2(a: A, b: B) -> impl Stream + Send where A: Stream + Send, B: Stream + Send + Unpin, Item: Eq + PartialOrd + Send + Sync, { let b = Arc::new(Mutex::new(b.peekable())); a.map(move |ai| (ai, b.clone())) .filter_map(async move |(ai, b)| { let mut lock = b.lock().await; let b = &mut Pin::new(&mut *lock); while b.as_mut().next_if(|bi| *bi < ai).await.is_some() { continue; } b.as_mut() .next_if_eq(&ai) .await .is_none() .then_some(ai) }) }