Clean up dangling S3 multipart uploads #111955

Merged
7 changes: 7 additions & 0 deletions docs/changelog/111955.yaml
@@ -0,0 +1,7 @@
pr: 111955
summary: Clean up dangling S3 multipart uploads
area: Snapshot/Restore
type: enhancement
issues:
- 101169
- 44971
36 changes: 9 additions & 27 deletions docs/reference/snapshot-restore/repository-s3.asciidoc
@@ -317,6 +317,15 @@ include::repository-shared-settings.asciidoc[]
https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html[AWS
DeleteObjects API].

`max_multipart_upload_cleanup_size`::

(<<number,numeric>>) Sets the maximum number of possibly-dangling multipart
uploads to clean up in each batch of snapshot deletions. Defaults to `1000`,
which is the maximum number supported by the
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[AWS
ListMultipartUploads API]. If set to `0`, {es} will not attempt to clean up
dangling multipart uploads.
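+
For example, the following request registers a repository that uses a smaller
cleanup batch size (the repository and bucket names here are placeholders):
+
[source,console]
----
PUT _snapshot/my_s3_repository
{
  "type": "s3",
  "settings": {
    "bucket": "my-bucket",
    "max_multipart_upload_cleanup_size": 500
  }
}
----
// TEST[skip:requires an S3 bucket]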

NOTE: The option of defining client settings in the repository settings as
documented below is considered deprecated, and will be removed in a future
version.
@@ -492,33 +501,6 @@ by the `elasticsearch` user. By default, {es} runs as user `elasticsearch` using

If the symlink exists, it will be used by default by all S3 repositories that don't have explicit `client` credentials.

==== Cleaning up multi-part uploads

{es} uses S3's multi-part upload process to upload larger blobs to the
repository. The multi-part upload process works by dividing each blob into
smaller parts, uploading each part independently, and then completing the
upload in a separate step. This reduces the amount of data that {es} must
re-send if an upload fails: {es} only needs to re-send the part that failed
rather than starting from the beginning of the whole blob. The storage for each
part is charged independently starting from the time at which the part was
uploaded.

If a multi-part upload cannot be completed then it must be aborted in order to
delete any parts that were successfully uploaded, preventing further storage
charges from accumulating. {es} will automatically abort a multi-part upload on
failure, but sometimes the abort request itself fails. For example, if the
repository becomes inaccessible or the instance on which {es} is running is
terminated abruptly then {es} cannot complete or abort any ongoing uploads.

You must make sure that failed uploads are eventually aborted to avoid
unnecessary storage costs. You can use the
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html[List
multipart uploads API] to list the ongoing uploads and look for any which are
unusually long-running, or you can
https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpu-abort-incomplete-mpu-lifecycle-config.html[configure
a bucket lifecycle policy] to automatically abort incomplete uploads once they
reach a certain age.

[[repository-s3-aws-vpc]]
==== AWS VPC bandwidth settings

@@ -10,13 +10,20 @@
import fixture.s3.S3HttpHandler;

import com.amazonaws.http.AmazonHttpClient;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.LogEvent;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
@@ -54,6 +61,7 @@
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.MockLog;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -70,6 +78,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -81,6 +90,7 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasEntry;
@@ -451,6 +461,106 @@ private Map<S3BlobStore.StatsKey, Long> getServerMetrics() {
return Collections.emptyMap();
}

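/**
 * Checks that a multipart upload which is started directly against S3 and never completed or aborted (i.e. a
 * dangling upload) is cleaned up by a subsequent snapshot deletion.
 */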
public void testMultipartUploadCleanup() {
final String repoName = randomRepositoryName();
createRepository(repoName, repositorySettings(repoName), true);

createIndex("test-idx-1");
for (int i = 0; i < 100; i++) {
prepareIndex("test-idx-1").setId(Integer.toString(i)).setSource("foo", "bar" + i).get();
}
client().admin().indices().prepareRefresh().get();

final String snapshotName = randomIdentifier();
CreateSnapshotResponse createSnapshotResponse = clusterAdmin().prepareCreateSnapshot(TEST_REQUEST_TIMEOUT, repoName, snapshotName)
.setWaitForCompletion(true)
.get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(
createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())
);

final var repository = asInstanceOf(
S3Repository.class,
internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName)
);
final var blobStore = asInstanceOf(S3BlobStore.class, asInstanceOf(BlobStoreWrapper.class, repository.blobStore()).delegate());

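// Initiate a multipart upload directly through the SDK client and never complete or abort it, leaving it
// dangling under the repository's base path.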
try (var clientRef = blobStore.clientReference()) {
final var danglingBlobName = randomIdentifier();
final var initiateMultipartUploadRequest = new InitiateMultipartUploadRequest(
blobStore.bucket(),
blobStore.blobContainer(repository.basePath().add("test-multipart-upload")).path().buildAsString() + danglingBlobName
);
initiateMultipartUploadRequest.putCustomQueryParameter(
S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
OperationPurpose.SNAPSHOT_DATA.getKey()
);
final var multipartUploadResult = clientRef.client().initiateMultipartUpload(initiateMultipartUploadRequest);

final var listMultipartUploadsRequest = new ListMultipartUploadsRequest(blobStore.bucket()).withPrefix(
repository.basePath().buildAsString()
);
listMultipartUploadsRequest.putCustomQueryParameter(
S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
OperationPurpose.SNAPSHOT_DATA.getKey()
);
assertEquals(
List.of(multipartUploadResult.getUploadId()),
clientRef.client()
.listMultipartUploads(listMultipartUploadsRequest)
.getMultipartUploads()
.stream()
.map(MultipartUpload::getUploadId)
.toList()
);

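// Deleting the snapshot should detect and abort the dangling upload; the latch lets us wait for the cleanup
// log message before checking the upload listing again.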
final var seenCleanupLogLatch = new CountDownLatch(1);
MockLog.assertThatLogger(() -> {
assertAcked(clusterAdmin().prepareDeleteSnapshot(TEST_REQUEST_TIMEOUT, repoName, snapshotName));
safeAwait(seenCleanupLogLatch);
},
S3BlobContainer.class,
new MockLog.SeenEventExpectation(
"found-dangling",
S3BlobContainer.class.getCanonicalName(),
Level.INFO,
"found [1] possibly-dangling multipart uploads; will clean them up after finalizing the current snapshot deletions"
),
new MockLog.SeenEventExpectation(
"cleaned-dangling",
S3BlobContainer.class.getCanonicalName(),
Level.INFO,
Strings.format(
"cleaned up dangling multipart upload [%s] of blob [%s]*test-multipart-upload/%s]",
multipartUploadResult.getUploadId(),
repoName,
danglingBlobName
)
) {
@Override
public void match(LogEvent event) {
super.match(event);
if (Regex.simpleMatch(message, event.getMessage().getFormattedMessage())) {
seenCleanupLogLatch.countDown();
}
}
}
);

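// Once cleanup has run, the repository prefix should contain no in-progress multipart uploads.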
assertThat(
clientRef.client()
.listMultipartUploads(listMultipartUploadsRequest)
.getMultipartUploads()
.stream()
.map(MultipartUpload::getUploadId)
.toList(),
empty()
);
}
}

/**
* S3RepositoryPlugin that allows disabling chunked encoding and setting a low threshold between single upload and multipart upload.
*/
@@ -592,6 +702,9 @@ public void maybeTrack(final String rawRequest, Headers requestHeaders) {
trackRequest("ListObjects");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.LIST_OBJECTS, purpose), k -> new AtomicLong())
.incrementAndGet();
} else if (Regex.simpleMatch("GET /*/?uploads&*", request)) {
// TODO track ListMultipartUploads requests
logger.info("--> ListMultipartUploads not tracked [{}] with parsed purpose [{}]", request, purpose.getKey());
} else if (Regex.simpleMatch("GET /*/*", request)) {
trackRequest("GetObject");
metricsCount.computeIfAbsent(new S3BlobStore.StatsKey(S3BlobStore.Operation.GET_OBJECT, purpose), k -> new AtomicLong())
@@ -28,13 +28,17 @@
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.util.ValidationUtils;

import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -54,6 +58,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.ChunkedBlobOutputStream;
import org.elasticsearch.repositories.s3.S3BlobStore.Operation;
import org.elasticsearch.threadpool.ThreadPool;
@@ -912,4 +917,94 @@ public void getRegister(OperationPurpose purpose, String key, ActionListener<Opt
}
});
}

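/**
 * Lists up to {@code maxUploads} multipart uploads under this container's path and, if any are found, returns a
 * listener which aborts them once the triggering snapshot deletion has finalized. If there is nothing to clean up,
 * or if the listing fails, a no-op listener is returned instead.
 */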
ActionListener<Void> getMultipartUploadCleanupListener(int maxUploads, RefCountingRunnable refs) {
try (var clientReference = blobStore.clientReference()) {
final var bucket = blobStore.bucket();
final var request = new ListMultipartUploadsRequest(bucket).withPrefix(keyPath).withMaxUploads(maxUploads);
request.putCustomQueryParameter(S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE, OperationPurpose.SNAPSHOT_DATA.getKey());
final var multipartUploadListing = SocketAccess.doPrivileged(() -> clientReference.client().listMultipartUploads(request));
final var multipartUploads = multipartUploadListing.getMultipartUploads();
if (multipartUploads.isEmpty()) {
logger.debug("found no multipart uploads to clean up");
return ActionListener.noop();
} else {
// the uploads are only _possibly_ dangling because it's also possible we're no longer the master and the new master has
// started some more shard snapshots
if (multipartUploadListing.isTruncated()) {
logger.info("""
found at least [{}] possibly-dangling multipart uploads; will clean up the first [{}] after finalizing \
the current snapshot deletions, and will check for further possibly-dangling multipart uploads in future \
snapshot deletions""", multipartUploads.size(), multipartUploads.size());
} else {
logger.info("""
found [{}] possibly-dangling multipart uploads; \
will clean them up after finalizing the current snapshot deletions""", multipartUploads.size());
}
return newMultipartUploadCleanupListener(
refs,
multipartUploads.stream().map(u -> new AbortMultipartUploadRequest(bucket, u.getKey(), u.getUploadId())).toList()
);
}
} catch (Exception e) {
// Cleanup is a best-effort thing; we can't do anything better than log and carry on here.
logger.warn("failure while checking for possibly-dangling multipart uploads", e);
return ActionListener.noop();
}
}

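/**
 * Builds the listener that performs the actual cleanup: on success it aborts each listed upload on the snapshot
 * thread pool, logging every upload it cleans up and merely warning about any that fail to abort. A failure to
 * start the cleanup at all is logged at DEBUG when caused by a cluster-state publication failure and at WARN
 * otherwise.
 */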
private ActionListener<Void> newMultipartUploadCleanupListener(
RefCountingRunnable refs,
List<AbortMultipartUploadRequest> abortMultipartUploadRequests
) {
return new ThreadedActionListener<>(blobStore.getSnapshotExecutor(), ActionListener.releaseAfter(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
try (var clientReference = blobStore.clientReference()) {
for (final var abortMultipartUploadRequest : abortMultipartUploadRequests) {
abortMultipartUploadRequest.putCustomQueryParameter(
S3BlobStore.CUSTOM_QUERY_PARAMETER_PURPOSE,
OperationPurpose.SNAPSHOT_DATA.getKey()
);
try {
SocketAccess.doPrivilegedVoid(() -> clientReference.client().abortMultipartUpload(abortMultipartUploadRequest));
logger.info(
"cleaned up dangling multipart upload [{}] of blob [{}][{}][{}]",
abortMultipartUploadRequest.getUploadId(),
blobStore.getRepositoryMetadata().name(),
abortMultipartUploadRequest.getBucketName(),
abortMultipartUploadRequest.getKey()
);
} catch (Exception e) {
// Cleanup is a best-effort thing; we can't do anything better than log and carry on here. Note that any failure
// is surprising: even a 404 means that something else aborted/completed the upload at a point where there
// should be no other processes interacting with the repository.
logger.warn(
Strings.format(
"failed to clean up multipart upload [%s] of blob [%s][%s][%s]",
abortMultipartUploadRequest.getUploadId(),
blobStore.getRepositoryMetadata().name(),
abortMultipartUploadRequest.getBucketName(),
abortMultipartUploadRequest.getKey()
),
e
);
}
}
}
}

@Override
public void onFailure(Exception e) {
logger.log(
MasterService.isPublishFailureException(e)
|| (e instanceof RepositoryException repositoryException
&& repositoryException.getCause() instanceof Exception cause
&& MasterService.isPublishFailureException(cause)) ? Level.DEBUG : Level.WARN,
"failed to start cleanup of dangling multipart uploads",
e
);
}
}, refs.acquire()));
}
}