Add release-sequence for counter retirements.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-25 10:06:40 +00:00
parent 85a84f93c7
commit 51931de94c
3 changed files with 63 additions and 5 deletions

View File

@@ -44,6 +44,10 @@ pub struct State<F: Fn(u64) -> Result + Sync> {
/// this list is the "retirement" sequence number where all writes have /// this list is the "retirement" sequence number where all writes have
/// completed and all reads are globally visible. /// completed and all reads are globally visible.
pending: VecDeque<u64>, pending: VecDeque<u64>,
/// Callback to notify updates of the retirement value. This is likely
/// called from the destructor of a permit/guard; try not to panic.
release: F,
} }
#[derive(Debug)] #[derive(Debug)]
@@ -63,8 +67,10 @@ impl<F: Fn(u64) -> Result + Sync> Counter<F> {
/// Construct a new Two-Phase counter state. The value of `init` is /// Construct a new Two-Phase counter state. The value of `init` is
/// considered retired, and the next sequence number dispatched will be one /// considered retired, and the next sequence number dispatched will be one
/// greater. /// greater.
pub fn new(init: u64, commit: F) -> Arc<Self> { pub fn new(init: u64, commit: F, release: F) -> Arc<Self> {
Arc::new(Self { inner: State::new(init, commit).into() }) Arc::new(Self {
inner: State::new(init, commit, release).into(),
})
} }
/// Obtain a sequence number to conduct write operations for the scope. /// Obtain a sequence number to conduct write operations for the scope.
@@ -83,16 +89,27 @@ impl<F: Fn(u64) -> Result + Sync> Counter<F> {
.expect("locked for reading") .expect("locked for reading")
.retired() .retired()
} }
/// Load the highest sequence number (dispatched); may still be pending or
/// may be retired.
#[inline]
pub fn dispatched(&self) -> u64 {
self.inner
.read()
.expect("locked for reading")
.dispatched
}
} }
impl<F: Fn(u64) -> Result + Sync> State<F> { impl<F: Fn(u64) -> Result + Sync> State<F> {
/// Create new state, starting from `init`. The next sequence number /// Create new state, starting from `init`. The next sequence number
/// dispatched will be one greater than `init`. /// dispatched will be one greater than `init`.
fn new(dispatched: u64, commit: F) -> Self { fn new(dispatched: u64, commit: F, release: F) -> Self {
Self { Self {
dispatched, dispatched,
commit, commit,
pending: VecDeque::new(), pending: VecDeque::new(),
release,
} }
} }
@@ -127,6 +144,10 @@ impl<F: Fn(u64) -> Result + Sync> State<F> {
.expect("sequence number at index must be removed"); .expect("sequence number at index must be removed");
debug_assert!(removed == id, "sequence number removed must match id"); debug_assert!(removed == id, "sequence number removed must match id");
if index == 0 {
(self.release)(id).expect("release callback should not error");
}
} }
/// Calculate the retired sequence number, one less than the lowest pending /// Calculate the retired sequence number, one less than the lowest pending

View File

@@ -1,13 +1,15 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{watch, watch::Sender};
use tuwunel_core::{ use tuwunel_core::{
Result, utils, Result, err, utils,
utils::two_phase_counter::{Counter as TwoPhaseCounter, Permit as TwoPhasePermit}, utils::two_phase_counter::{Counter as TwoPhaseCounter, Permit as TwoPhasePermit},
}; };
use tuwunel_database::{Database, Deserialized, Map}; use tuwunel_database::{Database, Deserialized, Map};
pub struct Data { pub struct Data {
global: Arc<Map>, global: Arc<Map>,
retires: Sender<u64>,
counter: Arc<Counter>, counter: Arc<Counter>,
pub(super) db: Arc<Database>, pub(super) db: Arc<Database>,
} }
@@ -21,16 +23,39 @@ const COUNTER: &[u8] = b"c";
impl Data { impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self { pub(super) fn new(args: &crate::Args<'_>) -> Self {
let db = args.db.clone(); let db = args.db.clone();
let count = Self::stored_count(&args.db["global"]).expect("initialize global counter");
let retires = watch::channel(count).0;
Self { Self {
db: args.db.clone(), db: args.db.clone(),
global: args.db["global"].clone(), global: args.db["global"].clone(),
retires: retires.clone(),
counter: Counter::new( counter: Counter::new(
Self::stored_count(&args.db["global"]).expect("initialized global counter"), count,
Box::new(move |count| Self::store_count(&db, &db["global"], count)), Box::new(move |count| Self::store_count(&db, &db["global"], count)),
Box::new(move |count| Self::handle_retire(&retires, count)),
), ),
} }
} }
pub async fn wait_pending(&self) -> Result<u64> {
let count = self.counter.dispatched();
self.wait_count(&count).await.inspect(|retired| {
debug_assert!(
*retired >= count,
"Expecting retired sequence number >= snapshotted dispatch number"
);
})
}
pub async fn wait_count(&self, count: &u64) -> Result<u64> {
self.retires
.subscribe()
.wait_for(|retired| retired.ge(count))
.await
.map(|retired| *retired)
.map_err(|e| err!("counter channel error {e:?}"))
}
#[inline] #[inline]
pub fn next_count(&self) -> Permit { pub fn next_count(&self) -> Permit {
self.counter self.counter
@@ -41,6 +66,12 @@ impl Data {
#[inline] #[inline]
pub fn current_count(&self) -> u64 { self.counter.current() } pub fn current_count(&self) -> u64 { self.counter.current() }
fn handle_retire(sender: &Sender<u64>, count: u64) -> Result {
let _prev = sender.send_replace(count);
Ok(())
}
fn store_count(db: &Arc<Database>, global: &Arc<Map>, count: u64) -> Result { fn store_count(db: &Arc<Database>, global: &Arc<Map>, count: u64) -> Result {
let _cork = db.cork(); let _cork = db.cork();
global.insert(COUNTER, count.to_be_bytes()); global.insert(COUNTER, count.to_be_bytes());

View File

@@ -102,6 +102,12 @@ impl crate::Service for Service {
} }
impl Service { impl Service {
#[inline]
pub async fn wait_pending(&self) -> Result<u64> { self.db.wait_pending().await }
#[inline]
pub async fn wait_count(&self, count: &u64) -> Result<u64> { self.db.wait_count(count).await }
#[inline] #[inline]
#[must_use] #[must_use]
pub fn next_count(&self) -> data::Permit { self.db.next_count() } pub fn next_count(&self) -> data::Permit { self.db.next_count() }