Skip to content

Commit

Permalink
ServerJoinFailed callback (#470)
Browse files Browse the repository at this point in the history
* add serverJoinFailed callback

* update regarding review
  • Loading branch information
myrrc authored Sep 18, 2023
1 parent 99eeef3 commit c6ccc20
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 4 deletions.
6 changes: 6 additions & 0 deletions include/libnuraft/callback.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,12 @@ public:
* ctx: null
*/
AutoAdjustQuorum = 24,

/**
* Adding a server failed due to RPC errors and timeout expiry.
* ctx: null
*/
ServerJoinFailed = 25
};

struct Param {
Expand Down
7 changes: 7 additions & 0 deletions src/handle_join_leave.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ ptr<resp_msg> raft_server::handle_add_srv_req(req_msg& req) {
// Otherwise: activity timeout, reset the server.
p_wn("activity timeout (last activity %" PRIu64 " ms ago), start over",
last_active_ms);

cb_func::Param param(id_, leader_, srv_to_join_->get_id());
invoke_callback(cb_func::ServerJoinFailed, &param);

reset_srv_to_join();
}

Expand Down Expand Up @@ -596,6 +600,9 @@ void raft_server::handle_join_leave_rpc_err(msg_type t_msg, ptr<peer> p) {
p->get_id() );
config_changing_ = false;
reset_srv_to_join();

cb_func::Param param(id_, leader_, p->get_id());
invoke_callback(cb_func::ServerJoinFailed, &param);
}
}

Expand Down
5 changes: 3 additions & 2 deletions tests/unit/raft_package_fake.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,13 @@ static cb_func::ReturnCode ATTR_UNUSED cb_default(

static INT_UNUSED launch_servers(const std::vector<RaftPkg*>& pkgs,
raft_params* custom_params = nullptr,
bool restart = false) {
bool restart = false,
cb_func::func_type callback = cb_default) {
size_t num_srvs = pkgs.size();
CHK_GT(num_srvs, 0);

raft_server::init_options opt(false, true, true);
opt.raft_callback_ = cb_default;
opt.raft_callback_ = callback;

for (size_t ii = 0; ii < num_srvs; ++ii) {
RaftPkg* ff = pkgs[ii];
Expand Down
30 changes: 28 additions & 2 deletions tests/unit/raft_server_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,30 @@ int add_node_error_cases_test() {
std::string s1_addr = "S1";
std::string s2_addr = "S2";
std::string s3_addr = "S3";
// Hard to make a server really non-existent as to fail an rpc req with a FakeNetwork
// you need to actually have a recipient. So we simulate a nonexistent server with an
// offline one
std::string nonexistent_addr = "nonexistent";

RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
RaftPkg s3(f_base, 3, s3_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2, &s3};
RaftPkg nonexistent(f_base, 4, nonexistent_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2, &s3, &nonexistent};

bool join_error_callback_fired = false;
int join_error_srv_id = -1;
auto join_error_callback = [&](cb_func::Type type, cb_func::Param* param) {
if (type == cb_func::Type::ServerJoinFailed) {
join_error_callback_fired = true;
join_error_srv_id = param->peerId;
return cb_func::ReturnCode::Ok;
}
return cb_default(type, param);
};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( launch_servers( pkgs, nullptr, false, join_error_callback) );
nonexistent.fNet->goesOffline();

size_t num_srvs = pkgs.size();
CHK_GT(num_srvs, 0);
Expand Down Expand Up @@ -438,6 +455,15 @@ int add_node_error_cases_test() {
CHK_EQ(3, configs_out.size());
}

{ // Add a non-existent server to S1, check that a callback is fired on timers expiry.
s1.raftServer->add_srv({nonexistent.myId, nonexistent_addr});
s1.fNet->execReqResp(nonexistent_addr);
s1.fNet->execReqResp(nonexistent_addr);

CHK_TRUE(join_error_callback_fired);
CHK_EQ(nonexistent.myId, join_error_srv_id);
}

print_stats(pkgs);

s1.raftServer->shutdown();
Expand Down

0 comments on commit c6ccc20

Please sign in to comment.