Skip to content

Commit

Permalink
Fix testClusterHealthRestCancellation (#113680) (#113756)
Browse files Browse the repository at this point in the history
This test was failing due to a race between an early cancellation check
and the cancel operation. With this commit we wait until the action is
definitely blocked before cancelling the task.

Closes #100062
  • Loading branch information
DaveCTurner committed Sep 30, 2024
1 parent 2fc2d05 commit abb48bd
Showing 1 changed file with 14 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,15 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
import static org.elasticsearch.test.TaskAssertions.assertAllCancellableTasksAreCancelled;
import static org.elasticsearch.test.TaskAssertions.awaitTaskWithPrefixOnMaster;

public class ClusterHealthRestCancellationIT extends HttpSmokeTestCase {

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/100062")
@TestIssueLogging(
issueUrl = "https://github.com/elastic/elasticsearch/issues/100062",
value = "org.elasticsearch.test.TaskAssertions:TRACE"
+ ",org.elasticsearch.cluster.service.MasterService:TRACE"
+ ",org.elasticsearch.tasks.TaskManager:TRACE"
)
public void testClusterHealthRestCancellation() throws Exception {

final var barrier = new CyclicBarrier(2);
Expand All @@ -47,18 +37,7 @@ public void testClusterHealthRestCancellation() throws Exception {
@Override
public ClusterState execute(ClusterState currentState) {
safeAwait(barrier);
// safeAwait(barrier);

// temporarily lengthen timeout on safeAwait while investigating #100062
try {
barrier.await(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("unexpected", e);
} catch (Exception e) {
throw new AssertionError("unexpected", e);
}

safeAwait(barrier);
return currentState;
}

Expand All @@ -72,12 +51,23 @@ public void onFailure(Exception e) {
clusterHealthRequest.addParameter("wait_for_events", Priority.LANGUID.toString());

final PlainActionFuture<Response> future = new PlainActionFuture<>();
logger.info("--> sending cluster state request");
logger.info("--> sending cluster health request");
final Cancellable cancellable = getRestClient().performRequestAsync(clusterHealthRequest, wrapAsRestResponseListener(future));

safeAwait(barrier);

awaitTaskWithPrefixOnMaster(TransportClusterHealthAction.NAME);
// wait until the health request is waiting on the (blocked) master service
assertBusy(
() -> assertTrue(
internalCluster().getCurrentMasterNodeInstance(ClusterService.class)
.getMasterService()
.pendingTasks()
.stream()
.anyMatch(
pendingClusterTask -> pendingClusterTask.source().string().equals("cluster_health (wait_for_events [LANGUID])")
)
)
);

logger.info("--> cancelling cluster health request");
cancellable.cancel();
Expand Down

0 comments on commit abb48bd

Please sign in to comment.