Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new joiner functionality #543

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ if (CODE_COVERAGE GREATER 0)
strfmt_test
stat_mgr_test
logger_test
new_joiner_test
)

# lcov
Expand Down
4 changes: 4 additions & 0 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ public:
return config_->is_learner();
}

/**
 * Check whether this peer is currently flagged as a new joiner.
 *
 * @return The `is_new_joiner()` value reported by this peer's server config.
 */
bool is_new_joiner() const {
return config_->is_new_joiner();
}

/**
 * Get the server config associated with this peer.
 *
 * @return Reference to the object that `config_` points to.
 */
const srv_config& get_config() {
return *config_;
}
Expand Down
15 changes: 15 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct raft_params {
, return_method_(blocking)
, auto_forwarding_req_timeout_(0)
, grace_period_of_lagging_state_machine_(0)
, use_new_joiner_type_(false)
, use_bg_thread_for_snapshot_io_(false)
, use_full_consensus_among_healthy_members_(false)
, parallel_log_appending_(false)
Expand Down Expand Up @@ -553,6 +554,20 @@ public:
*/
int32 grace_period_of_lagging_state_machine_;

/**
* If `true`, the new joiner will be added to cluster config as a `new_joiner`
* even before syncing all data. The new joiner will not initiate a vote or
* participate in leader election.
*
* Once the log gap becomes smaller than `log_sync_stop_gap_`, the new joiner
* will be a regular member.
*
* The purpose of this feature is to preserve the new joiner information
* even after leader re-election, in order to let the new leader continue
* the sync process without calling `add_srv` again.
*/
bool use_new_joiner_type_;

/**
* (Experimental)
* If `true`, reading snapshot objects will be done by a background thread
Expand Down
12 changes: 12 additions & 0 deletions include/libnuraft/srv_config.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public:
, dc_id_(0)
, endpoint_(endpoint)
, learner_(false)
, new_joiner_(false)
, priority_(INIT_PRIORITY)
{}

Expand All @@ -55,6 +56,7 @@ public:
, endpoint_(endpoint)
, aux_(aux)
, learner_(learner)
, new_joiner_(false)
, priority_(priority)
{}

Expand All @@ -75,6 +77,10 @@ public:

/** @return `true` if the learner flag is set for this server. */
bool is_learner() const { return learner_; }

/**
 * @return `true` if this server is marked as a new joiner,
 *         i.e., it is not yet fully synced.
 */
bool is_new_joiner() const { return new_joiner_; }

/**
 * Set or clear the new joiner flag of this server config.
 *
 * @param to New value of the flag.
 */
void set_new_joiner(bool to) { new_joiner_ = to; }

/** @return Priority value of this server. */
int32 get_priority() const { return priority_; }

/**
 * Overwrite the priority value of this server.
 *
 * @param new_val Priority to store.
 */
void set_priority(const int32 new_val) { priority_ = new_val; }
Expand Down Expand Up @@ -111,6 +117,12 @@ private:
*/
bool learner_;

/**
* `true` if this node is a new joiner, but not yet fully synced.
* New joiner will not initiate or participate in leader election.
*/
bool new_joiner_;

/**
* Priority of this node.
* 0 will never be a leader.
Expand Down
1 change: 1 addition & 0 deletions scripts/test/runtests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ set -e
./tests/strfmt_test --abort-on-failure
./tests/stat_mgr_test --abort-on-failure
./tests/raft_server_test --abort-on-failure
./tests/new_joiner_test --abort-on-failure
./tests/failure_test --abort-on-failure
./tests/asio_service_test --abort-on-failure
41 changes: 40 additions & 1 deletion src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ ptr<resp_msg> raft_server::handle_append_entries(req_msg& req)
// may cause stepping down of this node.
ptr<cluster_config> cur_config = get_config();
ptr<srv_config> my_config = cur_config->get_server(id_);
if (my_config) {
if (my_config && !my_config->is_new_joiner()) {
p_in("catch-up process is done, clearing the flag");
catching_up_ = false;
}
Expand Down Expand Up @@ -1081,6 +1081,45 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
resp.get_next_idx(), p->get_next_log_idx() );
}

if (!config_changing_ && p->get_config().is_new_joiner()) {
auto params = ctx_->get_params();
uint64_t log_sync_stop_gap =
params->log_sync_stop_gap_ ? params->log_sync_stop_gap_ : 1;
uint64_t matched_idx = p->get_matched_idx();
uint64_t next_slot = log_store_->next_slot();
if (matched_idx + log_sync_stop_gap >= next_slot) {
p_in("peer %d is no longer a new joiner, matched index: %" PRIu64 ", "
"next slot: %" PRIu64 ", sync stop gap: %" PRIu64
", set new joiner flag to false",
p->get_id(), matched_idx, next_slot, log_sync_stop_gap);

// Clone the current cluster config.
ptr<cluster_config> cur_conf = get_config();
ptr<buffer> enc_conf_buf = cur_conf->serialize();
ptr<cluster_config> new_conf = cluster_config::deserialize(*enc_conf_buf);
new_conf->set_log_idx(log_store_->next_slot());

// Remove new joiner flag.
for (auto& ss: new_conf->get_servers()) {
if (ss->get_id() == p->get_id()) {
ss->set_new_joiner(false);
break;
}
}

ptr<buffer> new_conf_buf(new_conf->serialize());
ptr<log_entry> entry( cs_new<log_entry>( state_->get_term(),
new_conf_buf,
log_val_type::conf,
timer_helper::get_timeofday_us() ) );
store_log_entry(entry);
config_changing_ = true;
uncommitted_config_ = new_conf;
request_append_entries();
return;
}
}

// NOTE:
// If all other followers are not responding, we may not make
// below condition true. In that case, we check the timeout of
Expand Down
15 changes: 12 additions & 3 deletions src/handle_commit.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -757,10 +757,17 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
if (id_ == (*it)->get_id()) {
my_priority_ = (*it)->get_priority();
steps_to_down_ = 0;
if (role_ == srv_role::follower &&
if (!(*it)->is_new_joiner() &&
role_ == srv_role::follower &&
catching_up_) {
// If this node is newly added, start election timer
// without waiting for the next append_entries message.
// Except for new joiner type, if this server is added
// to the cluster config, that means the sync is done.
// Start election timer without waiting for
// the next append_entries message.
//
// If this server is a new joiner, `catching_up_` flag
// will be cleared when it becomes a regular member,
// that is also notified by a new cluster config.
p_in("now this node is the part of cluster, "
"catch-up process is done, clearing the flag");
catching_up_ = false;
Expand Down Expand Up @@ -799,6 +806,7 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
str_buf << "add peer " << srv_added->get_id()
<< ", " << srv_added->get_endpoint()
<< ", " << (srv_added->is_learner() ? "learner" : "voting member")
<< ", " << (srv_added->is_new_joiner() ? "new joiner" : "regular")
<< std::endl;

peers_.insert(std::make_pair(srv_added->get_id(), p));
Expand Down Expand Up @@ -940,6 +948,7 @@ void raft_server::reconfigure(const ptr<cluster_config>& new_config) {
<< ", DC ID " << s_conf->get_dc_id()
<< ", " << s_conf->get_endpoint()
<< ", " << (s_conf->is_learner() ? "learner" : "voting member")
<< ", " << (s_conf->is_new_joiner() ? "new joiner" : "regular member")
<< ", " << s_conf->get_priority()
<< std::endl;
}
Expand Down
12 changes: 9 additions & 3 deletions src/handle_join_leave.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,13 @@ ptr<resp_msg> raft_server::handle_add_srv_req(req_msg& req) {

// Before checking duplicate ID, confirm srv_to_leave_ is gone.
check_srv_to_leave_timeout();
ptr<srv_config> srv_conf =
srv_config::deserialize( entries[0]->get_buf() );
ptr<srv_config> srv_conf = srv_config::deserialize( entries[0]->get_buf() );

ptr<raft_params> params = ctx_->get_params();
if (params->use_new_joiner_type_) {
srv_conf->set_new_joiner(true);
}

if ( peers_.find( srv_conf->get_id() ) != peers_.end() ||
id_ == srv_conf->get_id() ) {
p_wn( "the server to be added has a duplicated "
Expand Down Expand Up @@ -232,7 +237,8 @@ void raft_server::sync_log_to_new_srv(ulong start_idx) {
ptr<raft_params> params = ctx_->get_params();
if ( ( params->log_sync_stop_gap_ > 0 &&
gap < (ulong)params->log_sync_stop_gap_ ) ||
params->log_sync_stop_gap_ == 0 ) {
params->log_sync_stop_gap_ == 0 ||
params->use_new_joiner_type_ ) {
p_in( "[SYNC LOG] LogSync is done for server %d "
"with log gap %" PRIu64 " (%" PRIu64 " - %" PRIu64 ", limit %d), "
"now put the server into cluster",
Expand Down
1 change: 1 addition & 0 deletions src/handle_timeout.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ void raft_server::stop_election_timer() {
return;
}

p_tr("stop election timer");
cancel_task(election_task_);
}

Expand Down
2 changes: 1 addition & 1 deletion src/handle_vote.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void raft_server::request_vote(bool force_vote) {
for (peer_itor it = peers_.begin(); it != peers_.end(); ++it) {
ptr<peer> pp = it->second;
if (!is_regular_member(pp)) {
// Do not send voting request to learner.
// Do not send voting request to learner or new joiner.
continue;
}
ptr<req_msg> req = cs_new<req_msg>
Expand Down
5 changes: 5 additions & 0 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ void raft_server::apply_and_log_current_params() {
"max batch %d, backoff %d, snapshot distance %d, "
"enable randomized snapshot creation %s, "
"log sync stop gap %d, "
"use new joiner type %s, "
"reserved logs %d, client timeout %d, "
"auto forwarding %s, API call type %s, "
"custom commit quorum size %d, "
Expand All @@ -411,6 +412,7 @@ void raft_server::apply_and_log_current_params() {
params->snapshot_distance_,
params->enable_randomized_snapshot_creation_ ? "YES" : "NO",
params->log_sync_stop_gap_,
params->use_new_joiner_type_ ? "YES" : "NO",
params->reserved_log_items_,
params->client_req_timeout_,
( params->auto_forwarding_ ? "ON" : "OFF" ),
Expand Down Expand Up @@ -546,6 +548,9 @@ bool raft_server::is_regular_member(const ptr<peer>& p) {
// Skip learner.
if (p->is_learner()) return false;

// Skip new joiner.
if (p->is_new_joiner()) return false;

return true;
}

Expand Down
22 changes: 19 additions & 3 deletions src/srv_config.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ limitations under the License.

namespace nuraft {

// Bit flags packed into the single server-type byte of a serialized
// `srv_config`. Both flags can be set independently in the same byte.
//
// Bit 0: the server is a learner. This is the same value (1) that the
// earlier format stored in this byte for a learner.
static const uint8_t LEARNER_FLAG = 0x1;
// Bit 1: the server is a new joiner.
static const uint8_t NEW_JOINER_FLAG = 0x2;

ptr<srv_config> srv_config::deserialize(buffer& buf) {
buffer_serializer bs(buf);
return deserialize(bs);
Expand All @@ -34,9 +37,17 @@ ptr<srv_config> srv_config::deserialize(buffer_serializer& bs) {
const char* aux_char = bs.get_cstr();
std::string endpoint( (endpoint_char) ? endpoint_char : std::string() );
std::string aux( (aux_char) ? aux_char : std::string() );
byte is_learner = bs.get_u8();

uint8_t srv_type = bs.get_u8();
bool is_learner = (srv_type & LEARNER_FLAG) ? true : false;
bool new_joiner = (srv_type & NEW_JOINER_FLAG) ? true : false;

int32 priority = bs.get_i32();
return cs_new<srv_config>(id, dc_id, endpoint, aux, is_learner, priority);

ptr<srv_config> ret =
cs_new<srv_config>(id, dc_id, endpoint, aux, is_learner, priority);
ret->set_new_joiner(new_joiner);
return ret;
}

ptr<buffer> srv_config::serialize() const{
Expand All @@ -52,7 +63,12 @@ ptr<buffer> srv_config::serialize() const{
buf->put(dc_id_);
buf->put(endpoint_);
buf->put(aux_);
buf->put((byte)(learner_?(1):(0)));

uint8_t srv_type = 0x0;
srv_type |= (learner_ ? LEARNER_FLAG : 0x0);
srv_type |= (new_joiner_ ? NEW_JOINER_FLAG : 0x0);
buf->put((byte)srv_type);

buf->put(priority_);
buf->pos(0);
return buf;
Expand Down
13 changes: 12 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# === Basic Raft server functionality test ===
# === Basic Raft server functionality tests without real network stack ===
add_executable(raft_server_test
unit/raft_server_test.cxx
unit/fake_network.cxx
Expand All @@ -9,6 +9,17 @@ add_dependencies(raft_server_test
target_link_libraries(raft_server_test
${BUILD_DIR}/${LIBRARY_OUTPUT_NAME})

# New joiner test: built from its unit test source together with the fake
# network, the example logger, and the in-memory log store, and linked
# against the library after the `static_lib` target.
add_executable(new_joiner_test
unit/new_joiner_test.cxx
unit/fake_network.cxx
${EXAMPLES_SRC}/logger.cc
${EXAMPLES_SRC}/in_memory_log_store.cxx)
add_dependencies(new_joiner_test
static_lib)
target_link_libraries(new_joiner_test
${BUILD_DIR}/${LIBRARY_OUTPUT_NAME})


# === Failure recovery & conflict resolution test ===
add_executable(failure_test
unit/failure_test.cxx
Expand Down
Loading
Loading