Skip to content

Commit

Permalink
Ensure sending socks success before broadcasting shutdown signal
Browse files Browse the repository at this point in the history
  • Loading branch information
samurai00 committed Oct 10, 2024
1 parent 0c3edf1 commit 6650a32
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 33 deletions.
67 changes: 39 additions & 28 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,39 +83,45 @@ impl Server {
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
let mut reload_signal = unix::signal(unix::SignalKind::hangup()).unwrap();
tokio::select! {
_ = fast_shutdown_signal.recv() => {
info!("SIGINT received, exiting");
(ShutdownType::Quick, false)
},
_ = graceful_terminate_signal.recv() => {
// we receive a graceful terminate, all instances are instructed to stop
info!("SIGTERM received, gracefully exiting");
// graceful shutdown if there are listening sockets
info!("Broadcasting graceful shutdown");
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");

loop {
tokio::select! {
_ = fast_shutdown_signal.recv() => {
info!("SIGINT received, exiting");
return (ShutdownType::Quick, false)
},
_ = graceful_terminate_signal.recv() => {
// we receive a graceful terminate, all instances are instructed to stop
info!("SIGTERM received, gracefully exiting");
// graceful shutdown if there are listening sockets
info!("Broadcasting graceful shutdown");
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
}
}
info!("Broadcast graceful shutdown complete");
return (ShutdownType::Graceful, false)
}
_ = graceful_upgrade_signal.recv() => {
info!("SIGQUIT received, sending socks and gracefully exiting");
self.handle_gracefull_upgrade_signal(false).await;
return (ShutdownType::Graceful, false)
},
_ = reload_signal.recv() => {
info!("SIGHUP received, sending socks and gracefully reloading");
// ensure that sending socks is successful before exiting
if !self.handle_gracefull_upgrade_signal(true).await {
continue;
}
return (ShutdownType::Graceful, true)
}
info!("Broadcast graceful shutdown complete");
(ShutdownType::Graceful, false)
}
_ = graceful_upgrade_signal.recv() => {
info!("SIGQUIT received, sending socks and gracefully exiting");
self.handle_gracefull_upgrade_signal().await;
(ShutdownType::Graceful, false)
},
_ = reload_signal.recv() => {
info!("SIGHUP received, sending socks and gracefully reloading");
self.handle_gracefull_upgrade_signal().await;
(ShutdownType::Graceful, true)
}
}
}

async fn handle_gracefull_upgrade_signal(&self) {
async fn handle_gracefull_upgrade_signal(&self, reload: bool) -> bool {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
if let Some(fds) = &self.listen_fds {
Expand All @@ -131,6 +137,10 @@ impl Server {
// sentry log error on fd send failure
#[cfg(not(debug_assertions))]
sentry::capture_error(&e);

if reload {
return false;
}
}
}
sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
Expand All @@ -143,13 +153,14 @@ impl Server {
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
// switch to fast shutdown
return;
return true;
}
}
info!("Broadcast graceful shutdown complete");
} else {
info!("No socks to send, shutting down.");
}
true
}

fn run_service(
Expand Down
1 change: 1 addition & 0 deletions pingora/examples/app/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum DuplexEvent {
}

impl ProxyApp {
#[allow(dead_code)]
pub fn new(proxy_to: BasicPeer) -> Self {
ProxyApp {
client_connector: TransportConnector::new(None),
Expand Down
31 changes: 26 additions & 5 deletions pingora/examples/server_reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,36 @@ pub fn main() {
let upgrade = Arc::new(AtomicBool::new(args_opt.upgrade));
let conf_filename = args_opt.conf;

let (server_stop_tx, mut server_stop_rx) = tokio::sync::mpsc::channel::<bool>(1);

loop {
let conf_filename = conf_filename.clone();
let upgrade = upgrade.clone();
#[cfg(target_os = "linux")]
let upgrade_for_store = upgrade.clone();
let task = tokio::spawn(async move {

let server_stop_tx = server_stop_tx.clone();
tokio::spawn(async move {
let opt = Opt {
conf: conf_filename,
upgrade: upgrade.load(Ordering::SeqCst),
..Opt::default()
};
let opt = Some(opt);
let mut my_server = Server::new(opt).unwrap();
my_server.try_bootstrap().unwrap();
let mut my_server = match Server::new(opt) {
Ok(server) => server,
Err(e) => {
error!("Create server error: {:?}", e);
return;
}
};
match my_server.try_bootstrap() {
Ok(_) => {}
Err(e) => {
error!("Bootstrap error: {:?}", e);
return;
}
}

let mut echo_service_http = service::echo::echo_service_http();

Expand All @@ -57,12 +74,16 @@ pub fn main() {
tokio::task::spawn_blocking(move || match my_server.run_server(false) {
Ok(reload) => {
info!("Reload: {}", reload);
reload
}
Err(e) => {
error!("Failed to run server: {}", e);
false
}
});
server_task.await.unwrap();
if !server_task.await.unwrap() {
server_stop_tx.send(true).await.unwrap();
}
});

tokio::select! {
Expand All @@ -76,7 +97,7 @@ pub fn main() {
info!("Upgrade is only supported on Linux");
}
}
_ = task => {
_ = server_stop_rx.recv() => {
info!("Server task finished");
break;
}
Expand Down
1 change: 1 addition & 0 deletions pingora/examples/service/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::app::echo::{EchoApp, HttpEchoApp};
use pingora::services::listening::Service;

#[allow(dead_code)]
pub fn echo_service() -> Service<EchoApp> {
Service::new("Echo Service".to_string(), EchoApp)
}
Expand Down
2 changes: 2 additions & 0 deletions pingora/examples/service/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use pingora_core::listeners::Listeners;
use pingora_core::services::listening::Service;
use pingora_core::upstreams::peer::BasicPeer;

#[allow(dead_code)]
pub fn proxy_service(addr: &str, proxy_addr: &str) -> Service<ProxyApp> {
let proxy_to = BasicPeer::new(proxy_addr);

Expand All @@ -27,6 +28,7 @@ pub fn proxy_service(addr: &str, proxy_addr: &str) -> Service<ProxyApp> {
)
}

#[allow(dead_code)]
pub fn proxy_service_tls(
addr: &str,
proxy_addr: &str,
Expand Down

0 comments on commit 6650a32

Please sign in to comment.