Add release-sequence for counter retirements.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -1,13 +1,15 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::{watch, watch::Sender};
|
||||
use tuwunel_core::{
|
||||
Result, utils,
|
||||
Result, err, utils,
|
||||
utils::two_phase_counter::{Counter as TwoPhaseCounter, Permit as TwoPhasePermit},
|
||||
};
|
||||
use tuwunel_database::{Database, Deserialized, Map};
|
||||
|
||||
pub struct Data {
|
||||
global: Arc<Map>,
|
||||
retires: Sender<u64>,
|
||||
counter: Arc<Counter>,
|
||||
pub(super) db: Arc<Database>,
|
||||
}
|
||||
@@ -21,16 +23,39 @@ const COUNTER: &[u8] = b"c";
|
||||
impl Data {
|
||||
pub(super) fn new(args: &crate::Args<'_>) -> Self {
|
||||
let db = args.db.clone();
|
||||
let count = Self::stored_count(&args.db["global"]).expect("initialize global counter");
|
||||
let retires = watch::channel(count).0;
|
||||
Self {
|
||||
db: args.db.clone(),
|
||||
global: args.db["global"].clone(),
|
||||
retires: retires.clone(),
|
||||
counter: Counter::new(
|
||||
Self::stored_count(&args.db["global"]).expect("initialized global counter"),
|
||||
count,
|
||||
Box::new(move |count| Self::store_count(&db, &db["global"], count)),
|
||||
Box::new(move |count| Self::handle_retire(&retires, count)),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn wait_pending(&self) -> Result<u64> {
|
||||
let count = self.counter.dispatched();
|
||||
self.wait_count(&count).await.inspect(|retired| {
|
||||
debug_assert!(
|
||||
*retired >= count,
|
||||
"Expecting retired sequence number >= snapshotted dispatch number"
|
||||
);
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn wait_count(&self, count: &u64) -> Result<u64> {
|
||||
self.retires
|
||||
.subscribe()
|
||||
.wait_for(|retired| retired.ge(count))
|
||||
.await
|
||||
.map(|retired| *retired)
|
||||
.map_err(|e| err!("counter channel error {e:?}"))
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn next_count(&self) -> Permit {
|
||||
self.counter
|
||||
@@ -41,6 +66,12 @@ impl Data {
|
||||
#[inline]
|
||||
pub fn current_count(&self) -> u64 { self.counter.current() }
|
||||
|
||||
fn handle_retire(sender: &Sender<u64>, count: u64) -> Result {
|
||||
let _prev = sender.send_replace(count);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn store_count(db: &Arc<Database>, global: &Arc<Map>, count: u64) -> Result {
|
||||
let _cork = db.cork();
|
||||
global.insert(COUNTER, count.to_be_bytes());
|
||||
|
||||
@@ -102,6 +102,12 @@ impl crate::Service for Service {
|
||||
}
|
||||
|
||||
impl Service {
|
||||
#[inline]
|
||||
pub async fn wait_pending(&self) -> Result<u64> { self.db.wait_pending().await }
|
||||
|
||||
#[inline]
|
||||
pub async fn wait_count(&self, count: &u64) -> Result<u64> { self.db.wait_count(count).await }
|
||||
|
||||
#[inline]
|
||||
#[must_use]
|
||||
pub fn next_count(&self) -> data::Permit { self.db.next_count() }
|
||||
|
||||
Reference in New Issue
Block a user