Add release-sequence for counter retirements.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -44,6 +44,10 @@ pub struct State<F: Fn(u64) -> Result + Sync> {
|
|||||||
/// this list is the "retirement" sequence number where all writes have
|
/// this list is the "retirement" sequence number where all writes have
|
||||||
/// completed and all reads are globally visible.
|
/// completed and all reads are globally visible.
|
||||||
pending: VecDeque<u64>,
|
pending: VecDeque<u64>,
|
||||||
|
|
||||||
|
/// Callback to notify updates of the retirement value. This is likely
|
||||||
|
/// called from the destructor of a permit/guard; try not to panic.
|
||||||
|
release: F,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@@ -63,8 +67,10 @@ impl<F: Fn(u64) -> Result + Sync> Counter<F> {
|
|||||||
/// Construct a new Two-Phase counter state. The value of `init` is
|
/// Construct a new Two-Phase counter state. The value of `init` is
|
||||||
/// considered retired, and the next sequence number dispatched will be one
|
/// considered retired, and the next sequence number dispatched will be one
|
||||||
/// greater.
|
/// greater.
|
||||||
pub fn new(init: u64, commit: F) -> Arc<Self> {
|
pub fn new(init: u64, commit: F, release: F) -> Arc<Self> {
|
||||||
Arc::new(Self { inner: State::new(init, commit).into() })
|
Arc::new(Self {
|
||||||
|
inner: State::new(init, commit, release).into(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Obtain a sequence number to conduct write operations for the scope.
|
/// Obtain a sequence number to conduct write operations for the scope.
|
||||||
@@ -83,16 +89,27 @@ impl<F: Fn(u64) -> Result + Sync> Counter<F> {
|
|||||||
.expect("locked for reading")
|
.expect("locked for reading")
|
||||||
.retired()
|
.retired()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Load the highest sequence number (dispatched); may still be pending or
|
||||||
|
/// may be retired.
|
||||||
|
#[inline]
|
||||||
|
pub fn dispatched(&self) -> u64 {
|
||||||
|
self.inner
|
||||||
|
.read()
|
||||||
|
.expect("locked for reading")
|
||||||
|
.dispatched
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F: Fn(u64) -> Result + Sync> State<F> {
|
impl<F: Fn(u64) -> Result + Sync> State<F> {
|
||||||
/// Create new state, starting from `init`. The next sequence number
|
/// Create new state, starting from `init`. The next sequence number
|
||||||
/// dispatched will be one greater than `init`.
|
/// dispatched will be one greater than `init`.
|
||||||
fn new(dispatched: u64, commit: F) -> Self {
|
fn new(dispatched: u64, commit: F, release: F) -> Self {
|
||||||
Self {
|
Self {
|
||||||
dispatched,
|
dispatched,
|
||||||
commit,
|
commit,
|
||||||
pending: VecDeque::new(),
|
pending: VecDeque::new(),
|
||||||
|
release,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -127,6 +144,10 @@ impl<F: Fn(u64) -> Result + Sync> State<F> {
|
|||||||
.expect("sequence number at index must be removed");
|
.expect("sequence number at index must be removed");
|
||||||
|
|
||||||
debug_assert!(removed == id, "sequence number removed must match id");
|
debug_assert!(removed == id, "sequence number removed must match id");
|
||||||
|
|
||||||
|
if index == 0 {
|
||||||
|
(self.release)(id).expect("release callback should not error");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Calculate the retired sequence number, one less than the lowest pending
|
/// Calculate the retired sequence number, one less than the lowest pending
|
||||||
|
|||||||
@@ -1,13 +1,15 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::{watch, watch::Sender};
|
||||||
use tuwunel_core::{
|
use tuwunel_core::{
|
||||||
Result, utils,
|
Result, err, utils,
|
||||||
utils::two_phase_counter::{Counter as TwoPhaseCounter, Permit as TwoPhasePermit},
|
utils::two_phase_counter::{Counter as TwoPhaseCounter, Permit as TwoPhasePermit},
|
||||||
};
|
};
|
||||||
use tuwunel_database::{Database, Deserialized, Map};
|
use tuwunel_database::{Database, Deserialized, Map};
|
||||||
|
|
||||||
pub struct Data {
|
pub struct Data {
|
||||||
global: Arc<Map>,
|
global: Arc<Map>,
|
||||||
|
retires: Sender<u64>,
|
||||||
counter: Arc<Counter>,
|
counter: Arc<Counter>,
|
||||||
pub(super) db: Arc<Database>,
|
pub(super) db: Arc<Database>,
|
||||||
}
|
}
|
||||||
@@ -21,16 +23,39 @@ const COUNTER: &[u8] = b"c";
|
|||||||
impl Data {
|
impl Data {
|
||||||
pub(super) fn new(args: &crate::Args<'_>) -> Self {
|
pub(super) fn new(args: &crate::Args<'_>) -> Self {
|
||||||
let db = args.db.clone();
|
let db = args.db.clone();
|
||||||
|
let count = Self::stored_count(&args.db["global"]).expect("initialize global counter");
|
||||||
|
let retires = watch::channel(count).0;
|
||||||
Self {
|
Self {
|
||||||
db: args.db.clone(),
|
db: args.db.clone(),
|
||||||
global: args.db["global"].clone(),
|
global: args.db["global"].clone(),
|
||||||
|
retires: retires.clone(),
|
||||||
counter: Counter::new(
|
counter: Counter::new(
|
||||||
Self::stored_count(&args.db["global"]).expect("initialized global counter"),
|
count,
|
||||||
Box::new(move |count| Self::store_count(&db, &db["global"], count)),
|
Box::new(move |count| Self::store_count(&db, &db["global"], count)),
|
||||||
|
Box::new(move |count| Self::handle_retire(&retires, count)),
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn wait_pending(&self) -> Result<u64> {
|
||||||
|
let count = self.counter.dispatched();
|
||||||
|
self.wait_count(&count).await.inspect(|retired| {
|
||||||
|
debug_assert!(
|
||||||
|
*retired >= count,
|
||||||
|
"Expecting retired sequence number >= snapshotted dispatch number"
|
||||||
|
);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn wait_count(&self, count: &u64) -> Result<u64> {
|
||||||
|
self.retires
|
||||||
|
.subscribe()
|
||||||
|
.wait_for(|retired| retired.ge(count))
|
||||||
|
.await
|
||||||
|
.map(|retired| *retired)
|
||||||
|
.map_err(|e| err!("counter channel error {e:?}"))
|
||||||
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn next_count(&self) -> Permit {
|
pub fn next_count(&self) -> Permit {
|
||||||
self.counter
|
self.counter
|
||||||
@@ -41,6 +66,12 @@ impl Data {
|
|||||||
#[inline]
|
#[inline]
|
||||||
pub fn current_count(&self) -> u64 { self.counter.current() }
|
pub fn current_count(&self) -> u64 { self.counter.current() }
|
||||||
|
|
||||||
|
fn handle_retire(sender: &Sender<u64>, count: u64) -> Result {
|
||||||
|
let _prev = sender.send_replace(count);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn store_count(db: &Arc<Database>, global: &Arc<Map>, count: u64) -> Result {
|
fn store_count(db: &Arc<Database>, global: &Arc<Map>, count: u64) -> Result {
|
||||||
let _cork = db.cork();
|
let _cork = db.cork();
|
||||||
global.insert(COUNTER, count.to_be_bytes());
|
global.insert(COUNTER, count.to_be_bytes());
|
||||||
|
|||||||
@@ -102,6 +102,12 @@ impl crate::Service for Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
|
#[inline]
|
||||||
|
pub async fn wait_pending(&self) -> Result<u64> { self.db.wait_pending().await }
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub async fn wait_count(&self, count: &u64) -> Result<u64> { self.db.wait_count(count).await }
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn next_count(&self) -> data::Permit { self.db.next_count() }
|
pub fn next_count(&self) -> data::Permit { self.db.next_count() }
|
||||||
|
|||||||
Reference in New Issue
Block a user