//! Two-Phase Counter. use std::{ collections::VecDeque, ops::{Deref, Range}, sync::{Arc, RwLock}, }; use crate::{Result, checked, is_equal_to}; /// Two-Phase Counter. /// /// This device solves the problem of a One-Phase Counter (or just a counter) /// which is incremented to provide unique sequence numbers (or index numbers) /// fundamental to server operation. For example, let's say a new Matrix Pdu /// is received: the counter is incremented and its value becomes the PduId /// used as a key for the Pdu value when writing to the database. /// /// Problem: With a single counter shared by both writers and readers, pending /// writes might still be in-flight and not visible to readers after the writer /// incremented it. For example, client-sync sees the counter at a certain /// value, but that value has no Pdu found because its write has not been /// completed with global visibility. Client-sync will then move on to the next /// counter value having missed the data from the current one. pub struct Counter Result + Sync> { /// Self is intended to be Arc with inner state mutable via Lock. inner: RwLock>, } /// Inner protected state for Two-Phase Counter. pub struct State Result + Sync> { /// Monotonic counter. The next sequence number is drawn by adding one to /// this value. That number will be persisted and added to `pending`. dispatched: u64, /// Callback to persist the next sequence number drawn from `dispatched`. /// This prevents pending numbers from being reused after server restart. commit: F, /// List of pending sequence numbers. One less than the minimum value in /// this list is the "retirement" sequence number where all writes have /// completed and all reads are globally visible. pending: VecDeque, /// Callback to notify updates of the retirement value. This is likely /// called from the destructor of a permit/guard; try not to panic. release: F, } pub struct Permit Result + Sync> { /// Link back to the shared-state. state: Arc>, /// The retirement value computed as a courtesy when this permit was /// created. retired: u64, /// Sequence number of this permit. id: u64, } impl Result + Sync> Counter { /// Construct a new Two-Phase counter state. The value of `init` is /// considered retired, and the next sequence number dispatched will be one /// greater. pub fn new(init: u64, commit: F, release: F) -> Arc { Arc::new(Self { inner: State::new(init, commit, release).into(), }) } /// Obtain a sequence number to conduct write operations for the scope. pub fn next(self: &Arc) -> Result> { let (retired, id) = self.inner.write()?.dispatch()?; Ok(Permit:: { state: self.clone(), retired, id }) } /// Load the current and dispatched values simultaneously #[inline] pub fn range(&self) -> Range { let inner = self.inner.read().expect("locked for reading"); Range { start: inner.retired(), end: inner.dispatched, } } /// Load the highest sequence number safe for reading, also known as the /// retirement value with writes "globally visible." #[inline] pub fn current(&self) -> u64 { self.inner .read() .expect("locked for reading") .retired() } /// Load the highest sequence number (dispatched); may still be pending or /// may be retired. #[inline] pub fn dispatched(&self) -> u64 { self.inner .read() .expect("locked for reading") .dispatched } } impl Result + Sync> State { /// Create new state, starting from `init`. The next sequence number /// dispatched will be one greater than `init`. fn new(dispatched: u64, commit: F, release: F) -> Self { Self { dispatched, commit, pending: VecDeque::new(), release, } } /// Dispatch the next sequence number as pending. The retired value is /// calculated as a courtesy while the state is under lock. fn dispatch(&mut self) -> Result<(u64, u64)> { let prev = self.dispatched; let retired = self.retired(); let dispatched = checked!(prev + 1)?; debug_assert!( !self.check_pending(dispatched), "sequence number cannot already be pending", ); (self.commit)(dispatched)?; self.dispatched = dispatched; self.pending.push_back(self.dispatched); Ok((retired, self.dispatched)) } /// Retire the sequence number `id`. fn retire(&mut self, id: u64) { debug_assert!(self.check_pending(id), "sequence number must be currently pending",); let index = self .pending_index(id) .expect("sequence number must be found as pending"); let removed = self .pending .remove(index) .expect("sequence number at index must be removed"); debug_assert_eq!(removed, id, "sequence number removed must match id"); // release only occurs when the oldest value retires if index != 0 { return; } // release occurs for the maximum retired value let release = if self.pending.is_empty() { self.dispatched } else { id }; debug_assert!(release >= id, "sequence number released must not be less than id"); (self.release)(release).expect("release callback should not error"); } /// Calculate the retired sequence number, one less than the lowest pending /// sequence number. If nothing is pending the value of `dispatched` has /// been previously retired and is returned. fn retired(&self) -> u64 { debug_assert!( self.pending.iter().is_sorted(), "Pending values should be naturally sorted" ); self.pending .front() .map(|val| val.saturating_sub(1)) .unwrap_or(self.dispatched) } /// Get the position of `id` in the pending list. fn pending_index(&self, id: u64) -> Option { debug_assert!( self.pending.iter().is_sorted(), "Pending values should be naturally sorted" ); self.pending.binary_search(&id).ok() } /// Check for `id` in the pending list sequentially (for debug and assertion /// purposes only) fn check_pending(&self, id: u64) -> bool { self.pending.iter().any(is_equal_to!(&id)) } } impl Result + Sync> Permit { /// Access the retired sequence number sampled at this permit's creation. /// This may be outdated prior to access. Obtained as a courtesy under lock. #[inline] #[must_use] pub fn retired(&self) -> &u64 { &self.retired } /// Access the sequence number obtained by this permit; a unique value #[inline] #[must_use] pub fn id(&self) -> &u64 { &self.id } } impl Result + Sync> Deref for Permit { type Target = u64; #[inline] fn deref(&self) -> &Self::Target { self.id() } } impl Result + Sync> Drop for Permit { fn drop(&mut self) { self.state .inner .write() .expect("locked for writing") .retire(self.id); } }