Require stronger ordering to sample final counter values on shutdown.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -10,19 +10,17 @@ use tuwunel_core::{Result, Server, debug_info, info};
|
|||||||
|
|
||||||
pub(super) async fn serve(
|
pub(super) async fn serve(
|
||||||
server: &Arc<Server>,
|
server: &Arc<Server>,
|
||||||
app: Router,
|
router: Router,
|
||||||
handle: ServerHandle,
|
handle: ServerHandle,
|
||||||
addrs: Vec<SocketAddr>,
|
addrs: Vec<SocketAddr>,
|
||||||
) -> Result {
|
) -> Result {
|
||||||
let app = app.into_make_service_with_connect_info::<SocketAddr>();
|
|
||||||
let mut join_set = JoinSet::new();
|
let mut join_set = JoinSet::new();
|
||||||
|
let router = router.into_make_service_with_connect_info::<SocketAddr>();
|
||||||
for addr in &addrs {
|
for addr in &addrs {
|
||||||
join_set.spawn_on(
|
let bound = bind(*addr);
|
||||||
bind(*addr)
|
let handler = bound.handle(handle.clone());
|
||||||
.handle(handle.clone())
|
let acceptor = handler.serve(router.clone());
|
||||||
.serve(app.clone()),
|
join_set.spawn_on(acceptor, server.runtime());
|
||||||
server.runtime(),
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Listening on {addrs:?}");
|
info!("Listening on {addrs:?}");
|
||||||
@@ -31,21 +29,22 @@ pub(super) async fn serve(
|
|||||||
let handle_active = server
|
let handle_active = server
|
||||||
.metrics
|
.metrics
|
||||||
.requests_handle_active
|
.requests_handle_active
|
||||||
.load(Ordering::Relaxed);
|
.load(Ordering::Acquire);
|
||||||
|
|
||||||
debug_info!(
|
debug_info!(
|
||||||
handle_finished = server
|
handle_finished = server
|
||||||
.metrics
|
.metrics
|
||||||
.requests_handle_finished
|
.requests_handle_finished
|
||||||
.load(Ordering::Relaxed),
|
.load(Ordering::Acquire),
|
||||||
panics = server
|
panics = server
|
||||||
.metrics
|
.metrics
|
||||||
.requests_panic
|
.requests_panic
|
||||||
.load(Ordering::Relaxed),
|
.load(Ordering::Acquire),
|
||||||
handle_active,
|
handle_active,
|
||||||
"Stopped listening on {addrs:?}",
|
"Stopped listening on {addrs:?}",
|
||||||
);
|
);
|
||||||
|
|
||||||
debug_assert!(handle_active == 0, "active request handles still pending");
|
debug_assert_eq!(0, handle_active, "active request handles still pending");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -79,21 +79,22 @@ pub(super) async fn serve(
|
|||||||
let handle_active = server
|
let handle_active = server
|
||||||
.metrics
|
.metrics
|
||||||
.requests_handle_active
|
.requests_handle_active
|
||||||
.load(Ordering::Relaxed);
|
.load(Ordering::Acquire);
|
||||||
|
|
||||||
debug_info!(
|
debug_info!(
|
||||||
handle_finished = server
|
handle_finished = server
|
||||||
.metrics
|
.metrics
|
||||||
.requests_handle_finished
|
.requests_handle_finished
|
||||||
.load(Ordering::Relaxed),
|
.load(Ordering::Acquire),
|
||||||
panics = server
|
panics = server
|
||||||
.metrics
|
.metrics
|
||||||
.requests_panic
|
.requests_panic
|
||||||
.load(Ordering::Relaxed),
|
.load(Ordering::Acquire),
|
||||||
handle_active,
|
handle_active,
|
||||||
"Stopped listening on {addrs:?}",
|
"Stopped listening on {addrs:?}",
|
||||||
);
|
);
|
||||||
|
|
||||||
debug_assert!(handle_active == 0, "active request handles still pending");
|
debug_assert_eq!(0, handle_active, "active request handles still pending");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -162,7 +162,7 @@ async fn fini(server: &Arc<Server>, listener: UnixListener, mut tasks: JoinSet<(
|
|||||||
while server
|
while server
|
||||||
.metrics
|
.metrics
|
||||||
.requests_handle_active
|
.requests_handle_active
|
||||||
.load(Ordering::Relaxed)
|
.load(Ordering::Acquire)
|
||||||
.gt(&0)
|
.gt(&0)
|
||||||
{
|
{
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
|||||||
Reference in New Issue
Block a user