Skip to content

Commit

Permalink
[fix](env) state listener avoid endless waiting #27881 (#41462)
Browse files Browse the repository at this point in the history
cherry pick from #27881
  • Loading branch information
w41ter authored Sep 30, 2024
1 parent 4058860 commit f6beec3
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -925,8 +925,7 @@ public void initialize(String[] args) throws Exception {
createTxnCleaner();

// 6. start state listener thread
createStateListener();
listener.start();
startStateListener();

if (!Config.edit_log_type.equalsIgnoreCase("bdb")) {
// If not using bdb, we need to notify the FE type transfer manually.
Expand Down Expand Up @@ -1479,11 +1478,22 @@ void advanceNextId() {
* 2. register some hook.
* If there is, add them here.
*/
public void postProcessAfterMetadataReplayed(boolean waitCatalogReady) {
public boolean postProcessAfterMetadataReplayed(boolean waitCatalogReady) {
if (waitCatalogReady) {
while (!isReady()) {
// Avoid endless waiting if the state has changed.
//
// Consider the following situation:
// 1. The follower replay journals and is not set to ready because the synchronization internval
// exceeds meta delay toleration seconds.
// 2. The underlying BEBJE node of this follower is selected as the master, but the state listener
// thread is waiting for catalog ready.
if (typeTransferQueue.peek() != null) {
return false;
}

try {
Thread.sleep(10 * 1000);
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.warn("", e);
}
Expand All @@ -1492,6 +1502,7 @@ public void postProcessAfterMetadataReplayed(boolean waitCatalogReady) {

auth.rectifyPrivs();
catalogMgr.registerCatalogRefreshListener(this);
return true;
}

// start all daemon threads only running on Master
Expand Down Expand Up @@ -1609,7 +1620,10 @@ private void transferToNonMaster(FrontendNodeType newType) {
}

// 'isReady' will be set to true in 'setCanRead()' method
postProcessAfterMetadataReplayed(true);
if (!postProcessAfterMetadataReplayed(true)) {
// the state has changed, exit early.
return;
}

checkLowerCaseTableNames();

Expand Down Expand Up @@ -2466,7 +2480,7 @@ public void notifyNewFETypeTransfer(FrontendNodeType newType) {
}
}

public void createStateListener() {
public void startStateListener() {
listener = new Daemon("stateListener", STATE_CHANGE_CHECK_INTERVAL_MS) {
@Override
protected synchronized void runOneCycle() {
Expand Down Expand Up @@ -2574,6 +2588,7 @@ protected synchronized void runOneCycle() {
};

listener.setMetaContext(metaContext);
listener.start();
}

public synchronized boolean replayJournal(long toJournalId) {
Expand Down

0 comments on commit f6beec3

Please sign in to comment.