Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add non-exiting methods and implement SIGHUP reload #405

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 110 additions & 55 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,69 +87,91 @@ pub struct Server {

impl Server {
#[cfg(unix)]
async fn main_loop(&self) -> ShutdownType {
async fn main_loop(&self) -> (ShutdownType, bool) {
// waiting for exit signal
// TODO: there should be a signal handling function
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
tokio::select! {
_ = fast_shutdown_signal.recv() => {
info!("SIGINT received, exiting");
ShutdownType::Quick
},
_ = graceful_terminate_signal.recv() => {
// we receive a graceful terminate, all instances are instructed to stop
info!("SIGTERM received, gracefully exiting");
// graceful shutdown if there are listening sockets
info!("Broadcasting graceful shutdown");
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
}
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
}
_ = graceful_upgrade_signal.recv() => {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
info!("SIGQUIT received, sending socks and gracefully exiting");
if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
// XXX: this is blocking IO
match fds.send_to_sock(
self.configuration.as_ref().upgrade_sock.as_str())
{
Ok(_) => {info!("listener sockets sent");},
Err(e) => {
error!("Unable to send listener sockets to new process: {e}");
// sentry log error on fd send failure
#[cfg(all(not(debug_assertions), feature = "sentry"))]
sentry::capture_error(&e);
}
}
sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
let mut reload_signal = unix::signal(unix::SignalKind::hangup()).unwrap();

loop {
tokio::select! {
_ = fast_shutdown_signal.recv() => {
info!("SIGINT received, exiting");
return (ShutdownType::Quick, false)
},
_ = graceful_terminate_signal.recv() => {
// we receive a graceful terminate, all instances are instructed to stop
info!("SIGTERM received, gracefully exiting");
// graceful shutdown if there are listening sockets
info!("Broadcasting graceful shutdown");
// gracefully exiting
match self.shutdown_watch.send(true) {
Ok(_) => { info!("Graceful shutdown started!"); }
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
// switch to fast shutdown
return ShutdownType::Graceful;
}
}
info!("Broadcast graceful shutdown complete");
ShutdownType::Graceful
} else {
info!("No socks to send, shutting down.");
ShutdownType::Graceful
return (ShutdownType::Graceful, false)
}
_ = graceful_upgrade_signal.recv() => {
info!("SIGQUIT received, sending socks and gracefully exiting");
self.handle_gracefull_upgrade_signal(false).await;
return (ShutdownType::Graceful, false)
},
_ = reload_signal.recv() => {
info!("SIGHUP received, sending socks and gracefully reloading");
// ensure that sending socks is successful before exiting
if !self.handle_gracefull_upgrade_signal(true).await {
continue;
}
return (ShutdownType::Graceful, true)
}
},
}
}
}

async fn handle_gracefull_upgrade_signal(&self, reload: bool) -> bool {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
// XXX: this is blocking IO
match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
Ok(_) => {
info!("listener sockets sent");
}
Err(e) => {
error!("Unable to send listener sockets to new process: {e}");
// sentry log error on fd send failure
#[cfg(all(not(debug_assertions), feature = "sentry"))]
sentry::capture_error(&e);

if reload {
return false;
}
}
}
sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
info!("Broadcasting graceful shutdown");
// gracefully exiting
match self.shutdown_watch.send(true) {
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
// switch to fast shutdown
return true;
}
}
info!("Broadcast graceful shutdown complete");
} else {
info!("No socks to send, shutting down.");
}
true
}

fn run_service(
Expand Down Expand Up @@ -277,6 +299,22 @@ impl Server {
/// When trying to zero downtime upgrade from an older version of the server which is already
/// running, this function will try to get all its listening sockets in order to take them over.
pub fn bootstrap(&mut self) {
match self.try_bootstrap() {
Ok(true) => {
std::process::exit(0);
}
Ok(false) => {}
Err(_) => {
std::process::exit(1);
}
}
}

/// Prepare the server to start
///
/// When trying to zero downtime upgrade from an older version of the server which is already
/// running, this function will try to get all its listening sockets in order to take them over.
pub fn try_bootstrap(&mut self) -> Result<bool> {
info!("Bootstrap starting");
debug!("{:#?}", self.options);

Expand All @@ -286,22 +324,26 @@ impl Server {

if self.options.as_ref().map_or(false, |o| o.test) {
info!("Server Test passed, exiting");
std::process::exit(0);
return Ok(true);
}

// load fds
#[cfg(unix)]
match self.load_fds(self.options.as_ref().map_or(false, |o| o.upgrade)) {
Ok(_) => {
info!("Bootstrap done");
Ok(false)
}
Err(e) => {
// sentry log error on fd load failure
#[cfg(all(not(debug_assertions), feature = "sentry"))]
sentry::capture_error(&e);

error!("Bootstrap failed on error: {:?}, exiting.", e);
std::process::exit(1);
Err(Error::explain(
ErrorType::Custom("BootstrapFdLoadError"),
e.desc(),
))
}
}
}
Expand All @@ -313,13 +355,26 @@ impl Server {
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_forever(mut self) -> ! {
pub fn run_forever(self) -> ! {
let daemon = self.configuration.daemon;
self.run_server(daemon).unwrap();
std::process::exit(0)
}

/// Start the server
///
/// This function will block forever until the server needs to quit or reload. So this would be the last
/// function to call for this object.
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_server(mut self, enable_daemon: bool) -> Result<bool> {
info!("Server starting");

let conf = self.configuration.as_ref();

#[cfg(unix)]
if conf.daemon {
if enable_daemon {
info!("Daemonizing the server");
fast_timeout::pause_for_fork();
daemonize(&self.configuration);
Expand Down Expand Up @@ -354,9 +409,9 @@ impl Server {
// Only work steal runtime can use block_on()
let server_runtime = Server::create_runtime("Server", 1, true);
#[cfg(unix)]
let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());
let (shutdown_type, reload) = server_runtime.get_handle().block_on(self.main_loop());
#[cfg(windows)]
let shutdown_type = ShutdownType::Graceful;
let (shutdown_type, reload) = (ShutdownType::Graceful, false);

if matches!(shutdown_type, ShutdownType::Graceful) {
let exit_timeout = self
Expand Down Expand Up @@ -395,7 +450,7 @@ impl Server {
}
}
info!("All runtimes exited, exiting now");
std::process::exit(0)
Ok(reload)
}

fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
Expand Down
1 change: 1 addition & 0 deletions pingora/examples/app/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ enum DuplexEvent {
}

impl ProxyApp {
#[allow(dead_code)]
pub fn new(proxy_to: BasicPeer) -> Self {
ProxyApp {
client_connector: TransportConnector::new(None),
Expand Down
108 changes: 108 additions & 0 deletions pingora/examples/server_reload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use log::{error, info};
use pingora::protocols::TcpKeepalive;
use pingora::server::configuration::Opt;
use pingora::server::Server;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::signal::unix;

mod app;
mod service;

pub fn main() {
env_logger::init();

let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();

let args_opt = Opt::parse_args();

rt.block_on(async move {
let mut reload_signal = unix::signal(unix::SignalKind::hangup()).unwrap();
let upgrade = Arc::new(AtomicBool::new(args_opt.upgrade));
let conf_filename = args_opt.conf;

let (server_stop_tx, mut server_stop_rx) = tokio::sync::mpsc::channel::<bool>(1);

loop {
let conf_filename = conf_filename.clone();
let upgrade = upgrade.clone();
#[cfg(target_os = "linux")]
let upgrade_for_store = upgrade.clone();

let server_stop_tx = server_stop_tx.clone();
tokio::spawn(async move {
let opt = Opt {
conf: conf_filename,
upgrade: upgrade.load(Ordering::SeqCst),
..Opt::default()
};
let opt = Some(opt);
let mut my_server = match Server::new(opt) {
Ok(server) => server,
Err(e) => {
error!("Create server error: {:?}", e);
return;
}
};
match my_server.try_bootstrap() {
Ok(_) => {}
Err(e) => {
error!("Bootstrap error: {:?}", e);
return;
}
}

let mut echo_service_http = service::echo::echo_service_http();

let mut options = pingora::listeners::TcpSocketOptions::default();
options.tcp_fastopen = Some(10);
options.tcp_keepalive = Some(TcpKeepalive {
idle: Duration::from_secs(60),
interval: Duration::from_secs(5),
count: 5,
});

echo_service_http.add_tcp_with_settings("0.0.0.0:6145", options);
my_server.add_service(echo_service_http);

let server_task =
tokio::task::spawn_blocking(move || match my_server.run_server(false) {
Ok(reload) => {
info!("Reload: {}", reload);
reload
}
Err(e) => {
error!("Failed to run server: {}", e);
false
}
});
if !server_task.await.unwrap() {
server_stop_tx.send(true).await.unwrap();
}
});

tokio::select! {
_ = reload_signal.recv() => {
#[cfg(target_os = "linux")]
{
upgrade_for_store.store(true, Ordering::SeqCst);
}
#[cfg(not(target_os = "linux"))]
{
info!("Upgrade is only supported on Linux");
}
}
_ = server_stop_rx.recv() => {
info!("Server task finished");
break;
}
}
}
});
rt.shutdown_background();
}
1 change: 1 addition & 0 deletions pingora/examples/service/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use crate::app::echo::{EchoApp, HttpEchoApp};
use pingora::services::listening::Service;

#[allow(dead_code)]
pub fn echo_service() -> Service<EchoApp> {
Service::new("Echo Service".to_string(), EchoApp)
}
Expand Down
2 changes: 2 additions & 0 deletions pingora/examples/service/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use pingora_core::listeners::Listeners;
use pingora_core::services::listening::Service;
use pingora_core::upstreams::peer::BasicPeer;

#[allow(dead_code)]
pub fn proxy_service(addr: &str, proxy_addr: &str) -> Service<ProxyApp> {
let proxy_to = BasicPeer::new(proxy_addr);

Expand All @@ -27,6 +28,7 @@ pub fn proxy_service(addr: &str, proxy_addr: &str) -> Service<ProxyApp> {
)
}

#[allow(dead_code)]
pub fn proxy_service_tls(
addr: &str,
proxy_addr: &str,
Expand Down