Skip to content

Commit

Permalink
Support new joiner functionality (#543)
Browse files Browse the repository at this point in the history
* A new setting, `use_new_joiner_type_`, has been introduced, which
defaults to false.

* When activated, any new members are immediately added to the cluster
configuration as a "new joiner." Similar to learners, new joiners do
not participate in the quorum; they cannot initiate or partake in voting.

* Once the log gap of a new joiner narrows to less than
`log_sync_stop_gap_`, the member transitions to a regular status and
becomes eligible to engage in the leader election process.

* The advantage of this feature is that it allows a newly elected leader
to continue the synchronization process with the new joiner without
needing to invoke the `add_srv()` API again.
  • Loading branch information
greensky00 authored Oct 15, 2024
1 parent 2f28d5b commit 1adcc62
Show file tree
Hide file tree
Showing 14 changed files with 511 additions and 12 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ if (CODE_COVERAGE GREATER 0)
strfmt_test
stat_mgr_test
logger_test
new_joiner_test
)

# lcov
Expand Down
4 changes: 4 additions & 0 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ public:
return config_->is_learner();
}

/**
 * Check whether this peer is currently flagged as a new joiner.
 *
 * Forwards to the `is_new_joiner()` accessor of the server config
 * object held in `config_`.
 *
 * @return `true` if the underlying server config reports new joiner.
 */
bool is_new_joiner() const {
return config_->is_new_joiner();
}

/**
 * Get the server config associated with this peer.
 *
 * @return Read-only reference to the object that `config_` points to.
 */
const srv_config& get_config() {
return *config_;
}
Expand Down
15 changes: 15 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct raft_params {
, return_method_(blocking)
, auto_forwarding_req_timeout_(0)
, grace_period_of_lagging_state_machine_(0)
, use_new_joiner_type_(false)
, use_bg_thread_for_snapshot_io_(false)
, use_full_consensus_among_healthy_members_(false)
, parallel_log_appending_(false)
Expand Down Expand Up @@ -553,6 +554,20 @@ public:
*/
int32 grace_period_of_lagging_state_machine_;

/**
* If `true`, the new joiner will be added to cluster config as a `new_joiner`
* even before syncing all data. The new joiner will not initiate a vote or
* participate in leader election.
*
* Once the log gap becomes smaller than `log_sync_stop_gap_`, the new joiner
* will be a regular member.
*
* The purpose of this feature is to preserve the new joiner information
* even after leader re-election, in order to let the new leader continue
* the sync process without calling `add_srv` again.
*/
bool use_new_joiner_type_;

/**
* (Experimental)
* If `true`, reading snapshot objects will be done by a background thread
Expand Down
12 changes: 12 additions & 0 deletions include/libnuraft/srv_config.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public:
, dc_id_(0)
, endpoint_(endpoint)
, learner_(false)
, new_joiner_(false)
, priority_(INIT_PRIORITY)
{}

Expand All @@ -55,6 +56,7 @@ public:
, endpoint_(endpoint)
, aux_(aux)
, learner_(learner)
, new_joiner_(false)
, priority_(priority)
{}

Expand All @@ -75,6 +77,10 @@ public:

/**
 * @return Current value of the `learner_` flag.
 */
bool is_learner() const { return learner_; }

/**
 * @return Current value of the `new_joiner_` flag.
 */
bool is_new_joiner() const { return new_joiner_; }

/**
 * Overwrite the `new_joiner_` flag.
 *
 * @param to New value of the flag.
 */
void set_new_joiner(bool to) { new_joiner_ = to; }

/**
 * @return Current value of `priority_`.
 */
int32 get_priority() const { return priority_; }

/**
 * Overwrite `priority_`.
 *
 * @param new_val New priority value.
 */
void set_priority(const int32 new_val) { priority_ = new_val; }
Expand Down Expand Up @@ -111,6 +117,12 @@ private:
*/
bool learner_;

/**
* `true` if this node is a new joiner, but not yet fully synced.
* New joiner will not initiate or participate in leader election.
*/
bool new_joiner_;

/**
* Priority of this node.
* 0 will never be a leader.
Expand Down
1 change: 1 addition & 0 deletions scripts/test/runtests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ set -e
./tests/strfmt_test --abort-on-failure
./tests/stat_mgr_test --abort-on-failure
./tests/raft_server_test --abort-on-failure
./tests/new_joiner_test --abort-on-failure
./tests/failure_test --abort-on-failure
./tests/asio_service_test --abort-on-failure
41 changes: 40 additions & 1 deletion src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ ptr<resp_msg> raft_server::handle_append_entries(req_msg& req)
// may cause stepping down of this node.
ptr<cluster_config> cur_config = get_config();
ptr<srv_config> my_config = cur_config->get_server(id_);
if (my_config) {
if (my_config && !my_config->is_new_joiner()) {
p_in("catch-up process is done, clearing the flag");
catching_up_ = false;
}
Expand Down Expand Up @@ -1081,6 +1081,45 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
resp.get_next_idx(), p->get_next_log_idx() );
}

if (!config_changing_ && p->get_config().is_new_joiner()) {
auto params = ctx_->get_params();
uint64_t log_sync_stop_gap =
params->log_sync_stop_gap_ ? params->log_sync_stop_gap_ : 1;
uint64_t matched_idx = p->get_matched_idx();
uint64_t next_slot = log_store_->next_slot();
if (matched_idx + log_sync_stop_gap >= next_slot) {
p_in("peer %d is no longer a new joiner, matched index: %" PRIu64 ", "
"next slot: %" PRIu64 ", sync stop gap: %" PRIu64
", set new joiner flag to false",
p->get_id(), matched_idx, next_slot, log_sync_stop_gap);

// Clone the current cluster config.
ptr<cluster_config> cur_conf = get_config();
ptr<buffer> enc_conf_buf = cur_conf->serialize();
ptr<cluster_config> new_conf = cluster_config::deserialize(*enc_conf_buf);
new_conf->set_log_idx(log_store_->next_slot());

// Remove new joiner flag.
for (auto& ss: new_conf->get_servers()) {
if (ss->get_id() == p->get_id()) {
ss->set_new_joiner(false);
break;
}
}

ptr<buffer> new_conf_buf(new_conf->serialize());
ptr<log_entry> entry( cs_new<log_entry>( state_->get_term(),
new_conf_buf,
log_val_type::conf,
timer_helper::get_timeofday_us() ) );
store_log_entry(entry);
config_changing_ = true;
uncommitted_config_ = new_conf;
request_append_entries();
return;
}
}

// NOTE:
// If all other followers are not responding, we may not make
// below condition true. In that case, we check the timeout of
Expand Down
15 changes: 12 additions & 3 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -757,10 +757,17 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
if (id_ == (*it)->get_id()) {
my_priority_ = (*it)->get_priority();
steps_to_down_ = 0;
if (role_ == srv_role::follower &&
if (!(*it)->is_new_joiner() &&
role_ == srv_role::follower &&
catching_up_) {
// If this node is newly added, start election timer
// without waiting for the next append_entries message.
// Except for new joiner type, if this server is added
// to the cluster config, that means the sync is done.
// Start election timer without waiting for
// the next append_entries message.
//
// If this server is a new joiner, `catching_up_` flag
// will be cleared when it becomes a regular member,
// that is also notified by a new cluster config.
p_in("now this node is the part of cluster, "
"catch-up process is done, clearing the flag");
catching_up_ = false;
Expand Down Expand Up @@ -799,6 +806,7 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
str_buf << "add peer " << srv_added->get_id()
<< ", " << srv_added->get_endpoint()
<< ", " << (srv_added->is_learner() ? "learner" : "voting member")
<< ", " << (srv_added->is_new_joiner() ? "new joiner" : "regular")
<< std::endl;

peers_.insert(std::make_pair(srv_added->get_id(), p));
Expand Down Expand Up @@ -940,6 +948,7 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
<< ", DC ID " << s_conf->get_dc_id()
<< ", " << s_conf->get_endpoint()
<< ", " << (s_conf->is_learner() ? "learner" : "voting member")
<< ", " << (s_conf->is_new_joiner() ? "new joiner" : "regular member")
<< ", " << s_conf->get_priority()
<< std::endl;
}
Expand Down
12 changes: 9 additions & 3 deletions src/handle_join_leave.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ ptr<resp_msg> raft_server::handle_add_srv_req(req_msg& req) {

// Before checking duplicate ID, confirm srv_to_leave_ is gone.
check_srv_to_leave_timeout();
ptr<srv_config> srv_conf =
srv_config::deserialize( entries[0]->get_buf() );
ptr<srv_config> srv_conf = srv_config::deserialize( entries[0]->get_buf() );

ptr<raft_params> params = ctx_->get_params();
if (params->use_new_joiner_type_) {
srv_conf->set_new_joiner(true);
}

if ( peers_.find( srv_conf->get_id() ) != peers_.end() ||
id_ == srv_conf->get_id() ) {
p_wn( "the server to be added has a duplicated "
Expand Down Expand Up @@ -232,7 +237,8 @@ void raft_server::sync_log_to_new_srv(ulong start_idx) {
ptr<raft_params> params = ctx_->get_params();
if ( ( params->log_sync_stop_gap_ > 0 &&
gap < (ulong)params->log_sync_stop_gap_ ) ||
params->log_sync_stop_gap_ == 0 ) {
params->log_sync_stop_gap_ == 0 ||
params->use_new_joiner_type_ ) {
p_in( "[SYNC LOG] LogSync is done for server %d "
"with log gap %" PRIu64 " (%" PRIu64 " - %" PRIu64 ", limit %d), "
"now put the server into cluster",
Expand Down
1 change: 1 addition & 0 deletions src/handle_timeout.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ void raft_server::stop_election_timer() {
return;
}

p_tr("stop election timer");
cancel_task(election_task_);
}

Expand Down
2 changes: 1 addition & 1 deletion src/handle_vote.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void raft_server::request_vote(bool force_vote) {
for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
if (!is_regular_member(pp)) {
// Do not send voting request to learner.
// Do not send voting request to learner or new joiner.
continue;
}
ptr<req_msg> req = cs_new<req_msg>
Expand Down
5 changes: 5 additions & 0 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ void raft_server::apply_and_log_current_params() {
"max batch %d, backoff %d, snapshot distance %d, "
"enable randomized snapshot creation %s, "
"log sync stop gap %d, "
"use new joiner type %s, "
"reserved logs %d, client timeout %d, "
"auto forwarding %s, API call type %s, "
"custom commit quorum size %d, "
Expand All @@ -411,6 +412,7 @@ void raft_server::apply_and_log_current_params() {
params->snapshot_distance_,
params->enable_randomized_snapshot_creation_ ? "YES" : "NO",
params->log_sync_stop_gap_,
params->use_new_joiner_type_ ? "YES" : "NO",
params->reserved_log_items_,
params->client_req_timeout_,
( params->auto_forwarding_ ? "ON" : "OFF" ),
Expand Down Expand Up @@ -546,6 +548,9 @@ bool raft_server::is_regular_member(const ptr<peer>& p) {
// Skip learner.
if (p->is_learner()) return false;

// Skip new joiner.
if (p->is_new_joiner()) return false;

return true;
}

Expand Down
22 changes: 19 additions & 3 deletions src/srv_config.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ limitations under the License.

namespace nuraft {

// Bit masks for the one-byte server type field of a serialized `srv_config`.
// The flags are OR-ed together when writing and tested individually
// when reading, so both can be set on the same server.
// Bit 0: the server is a learner.
static const uint8_t LEARNER_FLAG = 0x1;
// Bit 1: the server is a new joiner.
static const uint8_t NEW_JOINER_FLAG = 0x2;

ptr<srv_config> srv_config::deserialize(buffer& buf) {
buffer_serializer bs(buf);
return deserialize(bs);
Expand All @@ -34,9 +37,17 @@ ptr<srv_config> srv_config::deserialize(buffer_serializer& bs) {
const char* aux_char = bs.get_cstr();
std::string endpoint( (endpoint_char) ? endpoint_char : std::string() );
std::string aux( (aux_char) ? aux_char : std::string() );
byte is_learner = bs.get_u8();

uint8_t srv_type = bs.get_u8();
bool is_learner = (srv_type & LEARNER_FLAG) ? true : false;
bool new_joiner = (srv_type & NEW_JOINER_FLAG) ? true : false;

int32 priority = bs.get_i32();
return cs_new<srv_config>(id, dc_id, endpoint, aux, is_learner, priority);

ptr<srv_config> ret =
cs_new<srv_config>(id, dc_id, endpoint, aux, is_learner, priority);
ret->set_new_joiner(new_joiner);
return ret;
}

ptr<buffer> srv_config::serialize() const{
Expand All @@ -52,7 +63,12 @@ ptr<buffer> srv_config::serialize() const{
buf->put(dc_id_);
buf->put(endpoint_);
buf->put(aux_);
buf->put((byte)(learner_?(1):(0)));

uint8_t srv_type = 0x0;
srv_type |= (learner_ ? LEARNER_FLAG : 0x0);
srv_type |= (new_joiner_ ? NEW_JOINER_FLAG : 0x0);
buf->put((byte)srv_type);

buf->put(priority_);
buf->pos(0);
return buf;
Expand Down
13 changes: 12 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# === Basic Raft server functionality test ===
# === Basic Raft server functionality tests without real network stack ===
add_executable(raft_server_test
unit/raft_server_test.cxx
unit/fake_network.cxx
Expand All @@ -9,6 +9,17 @@ add_dependencies(raft_server_test
target_link_libraries(raft_server_test
${BUILD_DIR}/${LIBRARY_OUTPUT_NAME})

add_executable(new_joiner_test
unit/new_joiner_test.cxx
unit/fake_network.cxx
${EXAMPLES_SRC}/logger.cc
${EXAMPLES_SRC}/in_memory_log_store.cxx)
add_dependencies(new_joiner_test
static_lib)
target_link_libraries(new_joiner_test
${BUILD_DIR}/${LIBRARY_OUTPUT_NAME})


# === Failure recovery & conflict resolution test ===
add_executable(failure_test
unit/failure_test.cxx
Expand Down
Loading

0 comments on commit 1adcc62

Please sign in to comment.