Simplify router::run() and blocking point.
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
@@ -3,8 +3,11 @@ use std::{
|
|||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::FutureExt;
|
use futures::{FutureExt, pin_mut};
|
||||||
use tuwunel_core::{Error, Result, Server, debug, debug_error, debug_info, error, info};
|
use tuwunel_core::{
|
||||||
|
Error, Result, Server, debug, debug_error, debug_info, error, info,
|
||||||
|
utils::{BoolExt, future::OptionFutureExt},
|
||||||
|
};
|
||||||
use tuwunel_service::Services;
|
use tuwunel_service::Services;
|
||||||
|
|
||||||
use crate::{handle::ServerHandle, serve};
|
use crate::{handle::ServerHandle, serve};
|
||||||
@@ -24,27 +27,23 @@ pub(crate) async fn run(services: Arc<Services>) -> Result {
|
|||||||
.runtime()
|
.runtime()
|
||||||
.spawn(signal(server.clone(), handle.clone()));
|
.spawn(signal(server.clone(), handle.clone()));
|
||||||
|
|
||||||
let mut listener = if services.config.listening {
|
let listener = services
|
||||||
let future = serve::serve(services.clone(), handle);
|
.config
|
||||||
server
|
.listening
|
||||||
.runtime()
|
.then_async(|| {
|
||||||
.spawn(future)
|
server
|
||||||
.map(|res| res.map_err(Error::from).unwrap_or_else(Err))
|
.runtime()
|
||||||
.boxed()
|
.spawn(serve::serve(services.clone(), handle))
|
||||||
} else {
|
.map(|res| res.map_err(Error::from).unwrap_or_else(Err))
|
||||||
let server = server.clone();
|
})
|
||||||
async move {
|
.unwrap_or_else_async(|| server.until_shutdown().map(Ok));
|
||||||
server.until_shutdown().await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
.boxed()
|
|
||||||
};
|
|
||||||
|
|
||||||
// Focal point
|
// Focal point
|
||||||
debug!("Running");
|
debug!("Running");
|
||||||
|
pin_mut!(listener);
|
||||||
let res = tokio::select! {
|
let res = tokio::select! {
|
||||||
res = &mut listener => res,
|
res = &mut listener => res.unwrap_or(Ok(())),
|
||||||
res = services.poll() => handle_services_poll(server, res, listener).await,
|
res = services.poll() => handle_services_finish(server, res, listener.await),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Join the signal handler before we leave.
|
// Join the signal handler before we leave.
|
||||||
@@ -127,10 +126,10 @@ fn handle_shutdown(server: &Arc<Server>, handle: &ServerHandle) {
|
|||||||
handle.graceful_shutdown(Some(timeout));
|
handle.graceful_shutdown(Some(timeout));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_services_poll(
|
fn handle_services_finish(
|
||||||
server: &Arc<Server>,
|
server: &Arc<Server>,
|
||||||
result: Result,
|
result: Result,
|
||||||
listener: impl Future<Output = Result>,
|
listener: Option<Result>,
|
||||||
) -> Result {
|
) -> Result {
|
||||||
debug!("Service manager finished: {result:?}");
|
debug!("Service manager finished: {result:?}");
|
||||||
|
|
||||||
@@ -140,7 +139,7 @@ async fn handle_services_poll(
|
|||||||
error!("Failed to send shutdown signal: {e}");
|
error!("Failed to send shutdown signal: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Err(e) = listener.await {
|
if let Some(Err(e)) = listener {
|
||||||
error!("Client listener task finished with error: {e}");
|
error!("Client listener task finished with error: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user