From 0b15ab2006ec396eb2d4f0f5df76403bf4b149f8 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 30 Jul 2025 21:18:11 +0000 Subject: [PATCH] Ensure unwind safety in dispatch sequence. Ensure await safety/efficiency in retirement sequence. Signed-off-by: Jason Volk --- src/core/utils/two_phase_counter.rs | 10 +++++----- src/service/globals/data.rs | 9 +++++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/core/utils/two_phase_counter.rs b/src/core/utils/two_phase_counter.rs index 82ad34f9..e0d9e263 100644 --- a/src/core/utils/two_phase_counter.rs +++ b/src/core/utils/two_phase_counter.rs @@ -123,16 +123,16 @@ impl Result + Sync> State { /// Dispatch the next sequence number as pending. The retired value is /// calculated as a courtesy while the state is under lock. fn dispatch(&mut self) -> Result<(u64, u64)> { - let retired = self.retired(); let prev = self.dispatched; - - self.dispatched = checked!(prev + 1)?; - (self.commit)(self.dispatched)?; + let retired = self.retired(); + let dispatched = checked!(prev + 1)?; debug_assert!( - !self.check_pending(self.dispatched), + !self.check_pending(dispatched), "sequence number cannot already be pending", ); + (self.commit)(dispatched)?; + self.dispatched = dispatched; self.pending.push_back(self.dispatched); Ok((retired, self.dispatched)) } diff --git a/src/service/globals/data.rs b/src/service/globals/data.rs index fe9b3c0a..386560ee 100644 --- a/src/service/globals/data.rs +++ b/src/service/globals/data.rs @@ -1,6 +1,7 @@ use std::{ops::Range, sync::Arc}; -use tokio::sync::{watch, watch::Sender}; +use futures::TryFutureExt; +use tokio::sync::watch::Sender; use tuwunel_core::{ Result, err, utils, utils::two_phase_counter::{Counter as TwoPhaseCounter, Permit as TwoPhasePermit}, @@ -24,7 +25,7 @@ impl Data { pub(super) fn new(args: &crate::Args<'_>) -> Self { let db = args.db.clone(); let count = Self::stored_count(&args.db["global"]).expect("initialize global counter"); - let retires = watch::channel(count).0; + let retires = Sender::new(count); Self { db: args.db.clone(), global: args.db["global"].clone(), @@ -51,9 +52,9 @@ impl Data { self.retires .subscribe() .wait_for(|retired| retired.ge(count)) - .await - .map(|retired| *retired) + .map_ok(|retired| *retired) .map_err(|e| err!(debug_error!("counter channel error {e:?}"))) + .await } #[inline]