Optimize error handling after lazy rollovers (#111572) (#113684)
This commit improves the performance of the error-handling process that runs after a lazy rollover or an index creation fails.

Co-authored-by: Kushal-Dalasaniya <[email protected]>
Co-authored-by: Elastic Machine <[email protected]>
3 people committed Sep 28, 2024
1 parent ea90bdb, commit 74430ee
Showing 7 changed files with 164 additions and 255 deletions.
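The core of the change in the diffs below: instead of tracking a map of indices that cannot be created and rescanning every bulk item each time a prerequisite action (index auto-creation, data stream rollover, or failure store rollover) fails, the failures are now collected into concurrent maps and applied to the bulk items in a single pass. The following is a minimal, self-contained sketch of that lookup, not code from this commit; the Item record and the class name are illustrative stand-ins for the real DocWriteRequest / BulkItemResponse plumbing.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PrerequisiteFailureSketch {

    // Stand-in for a bulk item; the real code works with DocWriteRequest and BulkItemResponse.
    record Item(String index, boolean writeToFailureStore) {}

    // Pick the exception (if any) that should fail this item: index-creation failures first,
    // then failure store rollover failures for failure store writes, else data stream rollover failures.
    static Exception failureFor(
        Item item,
        Map<String, Exception> indicesExceptions,
        Map<String, Exception> dataStreamExceptions,
        Map<String, Exception> failureStoreExceptions
    ) {
        Exception e = indicesExceptions.get(item.index());
        if (e != null) {
            return e;
        }
        return item.writeToFailureStore()
            ? failureStoreExceptions.get(item.index())
            : dataStreamExceptions.get(item.index());
    }

    public static void main(String[] args) {
        // Concurrent maps, because the create/rollover listeners may complete on different threads.
        Map<String, Exception> indicesExceptions = new ConcurrentHashMap<>();
        Map<String, Exception> dataStreamExceptions = new ConcurrentHashMap<>();
        Map<String, Exception> failureStoreExceptions = new ConcurrentHashMap<>();
        dataStreamExceptions.put("logs-app", new IllegalStateException("lazy rollover failed"));

        // One pass over the bulk items: each item looks up its own target once, instead of
        // every failed target triggering a scan of all items.
        List<Item> items = List.of(new Item("logs-app", false), new Item("metrics-app", false));
        for (Item item : items) {
            Exception e = failureFor(item, indicesExceptions, dataStreamExceptions, failureStoreExceptions);
            System.out.println(item.index() + " -> " + (e == null ? "proceed" : "fail: " + e.getMessage()));
        }
    }
}

Items with no matching exception proceed to normal bulk execution, which is what the optimized path in the diff does once the prerequisite failures have been marked.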
@@ -87,7 +87,6 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
private final ConcurrentLinkedQueue<BulkItemRequest> failureStoreRedirects = new ConcurrentLinkedQueue<>();
private final long startTimeNanos;
private final ClusterStateObserver observer;
private final Map<String, IndexNotFoundException> indicesThatCannotBeCreated;
private final Executor executor;
private final LongSupplier relativeTimeProvider;
private final FailureStoreDocumentConverter failureStoreDocumentConverter;
@@ -107,7 +106,6 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
BulkRequest bulkRequest,
NodeClient client,
AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier relativeTimeProvider,
long startTimeNanos,
@@ -122,7 +120,6 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
bulkRequest,
client,
responses,
indicesThatCannotBeCreated,
indexNameExpressionResolver,
relativeTimeProvider,
startTimeNanos,
@@ -141,7 +138,6 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
BulkRequest bulkRequest,
NodeClient client,
AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
IndexNameExpressionResolver indexNameExpressionResolver,
LongSupplier relativeTimeProvider,
long startTimeNanos,
@@ -158,7 +154,6 @@ final class BulkOperation extends ActionRunnable<BulkResponse> {
this.bulkRequest = bulkRequest;
this.listener = listener;
this.startTimeNanos = startTimeNanos;
this.indicesThatCannotBeCreated = indicesThatCannotBeCreated;
this.executor = executor;
this.relativeTimeProvider = relativeTimeProvider;
this.indexNameExpressionResolver = indexNameExpressionResolver;
@@ -298,9 +293,6 @@ private Map<ShardId, List<BulkItemRequest>> groupRequestsByShards(
if (addFailureIfRequiresAliasAndAliasIsMissing(docWriteRequest, bulkItemRequest.id(), metadata)) {
continue;
}
if (addFailureIfIndexCannotBeCreated(docWriteRequest, bulkItemRequest.id())) {
continue;
}
if (addFailureIfRequiresDataStreamAndNoParentDataStream(docWriteRequest, bulkItemRequest.id(), metadata)) {
continue;
}
@@ -759,18 +751,6 @@ private boolean addFailureIfIndexIsClosed(DocWriteRequest<?> request, Index conc
return false;
}

private boolean addFailureIfIndexCannotBeCreated(DocWriteRequest<?> request, int idx) {
IndexNotFoundException cannotCreate = indicesThatCannotBeCreated.get(request.index());
if (cannotCreate != null) {
var failureStoreStatus = isFailureStoreRequest(request)
? IndexDocFailureStoreStatus.FAILED
: IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN;
addFailureAndDiscardRequest(request, idx, request.index(), cannotCreate, failureStoreStatus);
return true;
}
return false;
}

private static boolean isFailureStoreRequest(DocWriteRequest<?> request) {
return request instanceof IndexRequest ir && ir.isWriteToFailureStore();
}
@@ -43,7 +43,6 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.indices.SystemIndices;
@@ -60,6 +59,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.LongSupplier;
@@ -351,29 +351,36 @@ protected void createMissingIndicesAndIndexData(
final AtomicArray<BulkItemResponse> responses = new AtomicArray<>(bulkRequest.requests.size());
// Optimizing when there are no prerequisite actions
if (indicesToAutoCreate.isEmpty() && dataStreamsToBeRolledOver.isEmpty() && failureStoresToBeRolledOver.isEmpty()) {
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses, Map.of());
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses);
return;
}
final Map<String, IndexNotFoundException> indicesThatCannotBeCreated = new HashMap<>();
Map<String, Exception> indicesExceptions = new ConcurrentHashMap<>();
Map<String, Exception> dataStreamExceptions = new ConcurrentHashMap<>();
Map<String, Exception> failureStoreExceptions = new ConcurrentHashMap<>();
Runnable executeBulkRunnable = () -> executor.execute(new ActionRunnable<>(listener) {
@Override
protected void doRun() {
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses, indicesThatCannotBeCreated);
failRequestsWhenPrerequisiteActionFailed(
indicesExceptions,
dataStreamExceptions,
failureStoreExceptions,
bulkRequest,
responses
);
executeBulk(task, bulkRequest, startTimeNanos, listener, executor, responses);
}
});
try (RefCountingRunnable refs = new RefCountingRunnable(executeBulkRunnable)) {
createIndices(bulkRequest, indicesToAutoCreate, indicesThatCannotBeCreated, responses, refs);
rollOverDataStreams(bulkRequest, dataStreamsToBeRolledOver, false, responses, refs);
rollOverDataStreams(bulkRequest, failureStoresToBeRolledOver, true, responses, refs);
createIndices(indicesToAutoCreate, refs, indicesExceptions);
rollOverDataStreams(bulkRequest, dataStreamsToBeRolledOver, false, refs, dataStreamExceptions);
rollOverDataStreams(bulkRequest, failureStoresToBeRolledOver, true, refs, failureStoreExceptions);
}
}

private void createIndices(
BulkRequest bulkRequest,
Map<String, CreateIndexRequest> indicesToAutoCreate,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated,
AtomicArray<BulkItemResponse> responses,
RefCountingRunnable refs
RefCountingRunnable refs,
final Map<String, Exception> indicesExceptions
) {
for (Map.Entry<String, CreateIndexRequest> indexEntry : indicesToAutoCreate.entrySet()) {
final String index = indexEntry.getKey();
@@ -384,25 +391,26 @@ public void onResponse(CreateIndexResponse createIndexResponse) {}
@Override
public void onFailure(Exception e) {
final Throwable cause = ExceptionsHelper.unwrapCause(e);
if (cause instanceof IndexNotFoundException indexNotFoundException) {
synchronized (indicesThatCannotBeCreated) {
indicesThatCannotBeCreated.put(index, indexNotFoundException);
}
} else if ((cause instanceof ResourceAlreadyExistsException) == false) {
if ((cause instanceof ResourceAlreadyExistsException) == false) {
// fail all requests involving this index, if create didn't work
failRequestsWhenPrerequisiteActionFailed(index, bulkRequest, responses, e);
indicesExceptions.put(index, e);
}
}
}, refs.acquire()));
}
}

// Separate method to allow for overriding in tests.
void createIndex(CreateIndexRequest createIndexRequest, ActionListener<CreateIndexResponse> listener) {
client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener);
}

private void rollOverDataStreams(
BulkRequest bulkRequest,
Set<String> dataStreamsToBeRolledOver,
boolean targetFailureStore,
AtomicArray<BulkItemResponse> responses,
RefCountingRunnable refs
RefCountingRunnable refs,
Map<String, Exception> dataStreamExceptions
) {
for (String dataStream : dataStreamsToBeRolledOver) {
RolloverRequest rolloverRequest = new RolloverRequest(dataStream, null);
@@ -416,7 +424,7 @@ private void rollOverDataStreams(
}
// We are executing a lazy rollover because it is an action specialised for this situation, when we want an
// unconditional and performant rollover.
rolloverClient.execute(LazyRolloverAction.INSTANCE, rolloverRequest, ActionListener.releaseAfter(new ActionListener<>() {
rollOver(rolloverRequest, ActionListener.releaseAfter(new ActionListener<>() {

@Override
public void onResponse(RolloverResponse result) {
@@ -431,26 +439,52 @@ public void onResponse(RolloverResponse result) {

@Override
public void onFailure(Exception e) {
failRequestsWhenPrerequisiteActionFailed(dataStream, bulkRequest, responses, e);
dataStreamExceptions.put(dataStream, e);
}
}, refs.acquire()));
}
}

// Separate method to allow for overriding in tests.
void rollOver(RolloverRequest rolloverRequest, ActionListener<RolloverResponse> listener) {
rolloverClient.execute(LazyRolloverAction.INSTANCE, rolloverRequest, listener);
}

/**
* Fails all requests involving this index or data stream because the prerequisite action failed too.
* Mark all the requests for which the prerequisite action failed (i.e. index creation or data stream/failure store rollover) as failed.
*/
private static void failRequestsWhenPrerequisiteActionFailed(
String target,
private void failRequestsWhenPrerequisiteActionFailed(
Map<String, Exception> indicesExceptions,
Map<String, Exception> dataStreamExceptions,
Map<String, Exception> failureStoreExceptions,
BulkRequest bulkRequest,
AtomicArray<BulkItemResponse> responses,
Exception error
AtomicArray<BulkItemResponse> responses
) {
if (indicesExceptions.isEmpty() && dataStreamExceptions.isEmpty() && failureStoreExceptions.isEmpty()) {
return;
}
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> request = bulkRequest.requests.get(i);
if (request != null && setResponseFailureIfIndexMatches(responses, i, request, target, error)) {
bulkRequest.requests.set(i, null);
if (request == null) {
continue;
}
var exception = indicesExceptions.get(request.index());
if (exception == null) {
if (request instanceof IndexRequest indexRequest && indexRequest.isWriteToFailureStore()) {
exception = failureStoreExceptions.get(request.index());
} else {
exception = dataStreamExceptions.get(request.index());
}
}
if (exception == null) {
continue;
}
var failureStoreStatus = request instanceof IndexRequest ir && ir.isWriteToFailureStore()
? IndexDocFailureStoreStatus.FAILED
: IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN;
var failure = new BulkItemResponse.Failure(request.index(), request.id(), exception, failureStoreStatus);
responses.set(i, BulkItemResponse.failure(i, request.opType(), failure));
bulkRequest.requests.set(i, null);
}
}

@@ -532,33 +566,13 @@ private static boolean isSystemIndex(SortedMap<String, IndexAbstraction> indices
}
}

void createIndex(CreateIndexRequest createIndexRequest, ActionListener<CreateIndexResponse> listener) {
client.execute(AutoCreateAction.INSTANCE, createIndexRequest, listener);
}

private static boolean setResponseFailureIfIndexMatches(
AtomicArray<BulkItemResponse> responses,
int idx,
DocWriteRequest<?> request,
String index,
Exception e
) {
if (index.equals(request.index())) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.id(), e);
responses.set(idx, BulkItemResponse.failure(idx, request.opType(), failure));
return true;
}
return false;
}

void executeBulk(
Task task,
BulkRequest bulkRequest,
long startTimeNanos,
ActionListener<BulkResponse> listener,
Executor executor,
AtomicArray<BulkItemResponse> responses,
Map<String, IndexNotFoundException> indicesThatCannotBeCreated
AtomicArray<BulkItemResponse> responses
) {
new BulkOperation(
task,
@@ -568,7 +582,6 @@ void executeBulk(
bulkRequest,
client,
responses,
indicesThatCannotBeCreated,
indexNameExpressionResolver,
relativeTimeNanosProvider,
startTimeNanos,
@@ -42,7 +42,6 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.shard.ShardId;
@@ -1022,7 +1021,6 @@ private BulkOperation newBulkOperation(NodeClient client, BulkRequest request, A
client,
request,
new AtomicArray<>(request.numberOfActions()),
Map.of(),
mockObserver(DEFAULT_STATE),
listener,
new FailureStoreDocumentConverter()
@@ -1040,7 +1038,6 @@ private BulkOperation newBulkOperation(
client,
request,
new AtomicArray<>(request.numberOfActions()),
Map.of(),
mockObserver(DEFAULT_STATE),
listener,
failureStoreDocumentConverter
@@ -1059,7 +1056,6 @@ private BulkOperation newBulkOperation(
client,
request,
new AtomicArray<>(request.numberOfActions()),
Map.of(),
observer,
listener,
new FailureStoreDocumentConverter()
@@ -1071,7 +1067,6 @@ private BulkOperation newBulkOperation(
NodeClient client,
BulkRequest request,
AtomicArray<BulkItemResponse> existingResponses,
Map<String, IndexNotFoundException> indicesThatCanNotBeCreated,
ClusterStateObserver observer,
ActionListener<BulkResponse> listener,
FailureStoreDocumentConverter failureStoreDocumentConverter
@@ -1100,7 +1095,6 @@ private BulkOperation newBulkOperation(
request,
client,
existingResponses,
indicesThatCanNotBeCreated,
indexNameExpressionResolver,
() -> endTime,
timeZero,