diff --git a/pingora-core/src/server/mod.rs b/pingora-core/src/server/mod.rs index 1e1eb7dc3..ba1f17c4f 100644 --- a/pingora-core/src/server/mod.rs +++ b/pingora-core/src/server/mod.rs @@ -83,39 +83,45 @@ impl Server { let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap(); let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap(); let mut reload_signal = unix::signal(unix::SignalKind::hangup()).unwrap(); - tokio::select! { - _ = fast_shutdown_signal.recv() => { - info!("SIGINT received, exiting"); - (ShutdownType::Quick, false) - }, - _ = graceful_terminate_signal.recv() => { - // we receive a graceful terminate, all instances are instructed to stop - info!("SIGTERM received, gracefully exiting"); - // graceful shutdown if there are listening sockets - info!("Broadcasting graceful shutdown"); - match self.shutdown_watch.send(true) { - Ok(_) => { info!("Graceful shutdown started!"); } - Err(e) => { - error!("Graceful shutdown broadcast failed: {e}"); + + loop { + tokio::select! { + _ = fast_shutdown_signal.recv() => { + info!("SIGINT received, exiting"); + return (ShutdownType::Quick, false) + }, + _ = graceful_terminate_signal.recv() => { + // we receive a graceful terminate, all instances are instructed to stop + info!("SIGTERM received, gracefully exiting"); + // graceful shutdown if there are listening sockets + info!("Broadcasting graceful shutdown"); + match self.shutdown_watch.send(true) { + Ok(_) => { info!("Graceful shutdown started!"); } + Err(e) => { + error!("Graceful shutdown broadcast failed: {e}"); + } } + info!("Broadcast graceful shutdown complete"); + return (ShutdownType::Graceful, false) + } + _ = graceful_upgrade_signal.recv() => { + info!("SIGQUIT received, sending socks and gracefully exiting"); + self.handle_gracefull_upgrade_signal(false).await; + return (ShutdownType::Graceful, false) + }, + _ = reload_signal.recv() => { + info!("SIGHUP received, sending socks and gracefully reloading"); + // ensure that sending socks is successful before exiting + if !self.handle_gracefull_upgrade_signal(true).await { + continue; + } + return (ShutdownType::Graceful, true) } - info!("Broadcast graceful shutdown complete"); - (ShutdownType::Graceful, false) - } - _ = graceful_upgrade_signal.recv() => { - info!("SIGQUIT received, sending socks and gracefully exiting"); - self.handle_gracefull_upgrade_signal().await; - (ShutdownType::Graceful, false) - }, - _ = reload_signal.recv() => { - info!("SIGHUP received, sending socks and gracefully reloading"); - self.handle_gracefull_upgrade_signal().await; - (ShutdownType::Graceful, true) } } } - async fn handle_gracefull_upgrade_signal(&self) { + async fn handle_gracefull_upgrade_signal(&self, reload: bool) -> bool { // TODO: still need to select! on signals in case a fast shutdown is needed // aka: move below to another task and only kick it off here if let Some(fds) = &self.listen_fds { @@ -131,6 +137,10 @@ impl Server { // sentry log error on fd send failure #[cfg(not(debug_assertions))] sentry::capture_error(&e); + + if reload { + return false; + } } } sleep(Duration::from_secs(CLOSE_TIMEOUT)).await; @@ -143,13 +153,14 @@ impl Server { Err(e) => { error!("Graceful shutdown broadcast failed: {e}"); // switch to fast shutdown - return; + return true; } } info!("Broadcast graceful shutdown complete"); } else { info!("No socks to send, shutting down."); } + true } fn run_service( diff --git a/pingora/examples/app/proxy.rs b/pingora/examples/app/proxy.rs index 4ac0aaea9..617e5ce51 100644 --- a/pingora/examples/app/proxy.rs +++ b/pingora/examples/app/proxy.rs @@ -36,6 +36,7 @@ enum DuplexEvent { } impl ProxyApp { + #[allow(dead_code)] pub fn new(proxy_to: BasicPeer) -> Self { ProxyApp { client_connector: TransportConnector::new(None), diff --git a/pingora/examples/server_reload.rs b/pingora/examples/server_reload.rs index 8d70ee946..dc64efb01 100644 --- a/pingora/examples/server_reload.rs +++ b/pingora/examples/server_reload.rs @@ -26,19 +26,36 @@ pub fn main() { let upgrade = Arc::new(AtomicBool::new(args_opt.upgrade)); let conf_filename = args_opt.conf; + let (server_stop_tx, mut server_stop_rx) = tokio::sync::mpsc::channel::(1); + loop { let conf_filename = conf_filename.clone(); let upgrade = upgrade.clone(); + #[cfg(target_os = "linux")] let upgrade_for_store = upgrade.clone(); - let task = tokio::spawn(async move { + + let server_stop_tx = server_stop_tx.clone(); + tokio::spawn(async move { let opt = Opt { conf: conf_filename, upgrade: upgrade.load(Ordering::SeqCst), ..Opt::default() }; let opt = Some(opt); - let mut my_server = Server::new(opt).unwrap(); - my_server.try_bootstrap().unwrap(); + let mut my_server = match Server::new(opt) { + Ok(server) => server, + Err(e) => { + error!("Create server error: {:?}", e); + return; + } + }; + match my_server.try_bootstrap() { + Ok(_) => {} + Err(e) => { + error!("Bootstrap error: {:?}", e); + return; + } + } let mut echo_service_http = service::echo::echo_service_http(); @@ -57,12 +74,16 @@ pub fn main() { tokio::task::spawn_blocking(move || match my_server.run_server(false) { Ok(reload) => { info!("Reload: {}", reload); + reload } Err(e) => { error!("Failed to run server: {}", e); + false } }); - server_task.await.unwrap(); + if !server_task.await.unwrap() { + server_stop_tx.send(true).await.unwrap(); + } }); tokio::select! { @@ -76,7 +97,7 @@ pub fn main() { info!("Upgrade is only supported on Linux"); } } - _ = task => { + _ = server_stop_rx.recv() => { info!("Server task finished"); break; } diff --git a/pingora/examples/service/echo.rs b/pingora/examples/service/echo.rs index af07da217..d234d6494 100644 --- a/pingora/examples/service/echo.rs +++ b/pingora/examples/service/echo.rs @@ -15,6 +15,7 @@ use crate::app::echo::{EchoApp, HttpEchoApp}; use pingora::services::listening::Service; +#[allow(dead_code)] pub fn echo_service() -> Service { Service::new("Echo Service".to_string(), EchoApp) } diff --git a/pingora/examples/service/proxy.rs b/pingora/examples/service/proxy.rs index 49c24ca02..a9391dcb5 100644 --- a/pingora/examples/service/proxy.rs +++ b/pingora/examples/service/proxy.rs @@ -17,6 +17,7 @@ use pingora_core::listeners::Listeners; use pingora_core::services::listening::Service; use pingora_core::upstreams::peer::BasicPeer; +#[allow(dead_code)] pub fn proxy_service(addr: &str, proxy_addr: &str) -> Service { let proxy_to = BasicPeer::new(proxy_addr); @@ -27,6 +28,7 @@ pub fn proxy_service(addr: &str, proxy_addr: &str) -> Service { ) } +#[allow(dead_code)] pub fn proxy_service_tls( addr: &str, proxy_addr: &str,