Ensure unwind safety in dispatch sequence.

Ensure await safety/efficiency in retirement sequence.

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk
2025-07-30 21:18:11 +00:00
parent bd0c3e33e2
commit 0b15ab2006
2 changed files with 10 additions and 9 deletions

View File

@@ -123,16 +123,16 @@ impl<F: Fn(u64) -> Result + Sync> State<F> {
/// Dispatch the next sequence number as pending. The retired value is /// Dispatch the next sequence number as pending. The retired value is
/// calculated as a courtesy while the state is under lock. /// calculated as a courtesy while the state is under lock.
fn dispatch(&mut self) -> Result<(u64, u64)> { fn dispatch(&mut self) -> Result<(u64, u64)> {
let retired = self.retired();
let prev = self.dispatched; let prev = self.dispatched;
let retired = self.retired();
self.dispatched = checked!(prev + 1)?; let dispatched = checked!(prev + 1)?;
(self.commit)(self.dispatched)?;
debug_assert!( debug_assert!(
!self.check_pending(self.dispatched), !self.check_pending(dispatched),
"sequence number cannot already be pending", "sequence number cannot already be pending",
); );
(self.commit)(dispatched)?;
self.dispatched = dispatched;
self.pending.push_back(self.dispatched); self.pending.push_back(self.dispatched);
Ok((retired, self.dispatched)) Ok((retired, self.dispatched))
} }

View File

@@ -1,6 +1,7 @@
use std::{ops::Range, sync::Arc}; use std::{ops::Range, sync::Arc};
use tokio::sync::{watch, watch::Sender}; use futures::TryFutureExt;
use tokio::sync::watch::Sender;
use tuwunel_core::{ use tuwunel_core::{
Result, err, utils, Result, err, utils,
utils::two_phase_counter::{Counter as TwoPhaseCounter, Permit as TwoPhasePermit}, utils::two_phase_counter::{Counter as TwoPhaseCounter, Permit as TwoPhasePermit},
@@ -24,7 +25,7 @@ impl Data {
pub(super) fn new(args: &crate::Args<'_>) -> Self { pub(super) fn new(args: &crate::Args<'_>) -> Self {
let db = args.db.clone(); let db = args.db.clone();
let count = Self::stored_count(&args.db["global"]).expect("initialize global counter"); let count = Self::stored_count(&args.db["global"]).expect("initialize global counter");
let retires = watch::channel(count).0; let retires = Sender::new(count);
Self { Self {
db: args.db.clone(), db: args.db.clone(),
global: args.db["global"].clone(), global: args.db["global"].clone(),
@@ -51,9 +52,9 @@ impl Data {
self.retires self.retires
.subscribe() .subscribe()
.wait_for(|retired| retired.ge(count)) .wait_for(|retired| retired.ge(count))
.await .map_ok(|retired| *retired)
.map(|retired| *retired)
.map_err(|e| err!(debug_error!("counter channel error {e:?}"))) .map_err(|e| err!(debug_error!("counter channel error {e:?}")))
.await
} }
#[inline] #[inline]