From 99d210a87378e660bb8c7056bf199728dca7b6a3 Mon Sep 17 00:00:00 2001 From: Nick Tindall Date: Tue, 1 Oct 2024 09:22:42 +1000 Subject: [PATCH] Update Azure internal blob store stats API to include operation purpose (#113573) Closes ES-9549 --- .../AzureStorageCleanupThirdPartyTests.java | 4 +- .../azure/AzureBlobContainer.java | 34 +-- .../repositories/azure/AzureBlobStore.java | 221 +++++++++++------- .../azure/AzureClientProvider.java | 28 ++- .../azure/AzureStorageService.java | 30 ++- .../azure/AbstractAzureServerTestCase.java | 16 +- .../azure/AzureBlobContainerStatsTests.java | 96 ++++++++ .../azure/AzureClientProviderTests.java | 23 +- .../azure/AzureStorageServiceTests.java | 92 ++++++-- 9 files changed, 400 insertions(+), 144 deletions(-) create mode 100644 modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java diff --git a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java index b5987bf6338bb..7d280f31ecf19 100644 --- a/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java +++ b/modules/repository-azure/src/internalClusterTest/java/org/elasticsearch/repositories/azure/AzureStorageCleanupThirdPartyTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; @@ -146,7 +147,8 @@ private void ensureSasTokenPermissions() { final PlainActionFuture future = new PlainActionFuture<>(); repository.threadPool().generic().execute(ActionRunnable.wrap(future, l -> { final AzureBlobStore blobStore = (AzureBlobStore) repository.blobStore(); - final AzureBlobServiceClient azureBlobServiceClient = blobStore.getService().client("default", LocationMode.PRIMARY_ONLY); + final AzureBlobServiceClient azureBlobServiceClient = blobStore.getService() + .client("default", LocationMode.PRIMARY_ONLY, randomFrom(OperationPurpose.values())); final BlobServiceClient client = azureBlobServiceClient.getSyncClient(); try { SocketAccess.doPrivilegedException(() -> { diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java index 1f92c92426384..a3f26424324fa 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobContainer.java @@ -51,7 +51,7 @@ public class AzureBlobContainer extends AbstractBlobContainer { @Override public boolean blobExists(OperationPurpose purpose, String blobName) throws IOException { logger.trace("blobExists({})", blobName); - return blobStore.blobExists(buildKey(blobName)); + return blobStore.blobExists(purpose, buildKey(blobName)); } private InputStream openInputStream(OperationPurpose purpose, String blobName, long position, @Nullable Long length) @@ -68,7 +68,7 @@ private InputStream openInputStream(OperationPurpose purpose, String blobName, l throw new NoSuchFileException("Blob [" + blobKey + "] not found"); } try { - return blobStore.getInputStream(blobKey, position, length); + return blobStore.getInputStream(purpose, blobKey, position, length); } catch (Exception e) { if (ExceptionsHelper.unwrap(e, HttpResponseException.class) instanceof HttpResponseException httpResponseException) { final var httpStatusCode = httpResponseException.getResponse().getStatusCode(); @@ -102,7 +102,7 @@ public long readBlobPreferredLength() { public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize); - blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); + blobStore.writeBlob(purpose, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists); } @Override @@ -117,14 +117,13 @@ public void writeBlobAtomic( } @Override - public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) - throws IOException { + public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) { writeBlob(purpose, blobName, bytes, failIfAlreadyExists); } @Override - public void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException { - blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists); + public void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) { + blobStore.writeBlob(purpose, buildKey(blobName), bytes, failIfAlreadyExists); } @Override @@ -135,12 +134,12 @@ public void writeMetadataBlob( boolean atomic, CheckedConsumer writer ) throws IOException { - blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer); + blobStore.writeBlob(purpose, buildKey(blobName), failIfAlreadyExists, writer); } @Override - public DeleteResult delete(OperationPurpose purpose) throws IOException { - return blobStore.deleteBlobDirectory(keyPath); + public DeleteResult delete(OperationPurpose purpose) { + return blobStore.deleteBlobDirectory(purpose, keyPath); } @Override @@ -161,7 +160,7 @@ public String next() { @Override public Map listBlobsByPrefix(OperationPurpose purpose, @Nullable String prefix) throws IOException { logger.trace("listBlobsByPrefix({})", prefix); - return blobStore.listBlobsByPrefix(keyPath, prefix); + return blobStore.listBlobsByPrefix(purpose, keyPath, prefix); } @Override @@ -173,7 +172,7 @@ public Map listBlobs(OperationPurpose purpose) throws IOEx @Override public Map children(OperationPurpose purpose) throws IOException { final BlobPath path = path(); - return blobStore.children(path); + return blobStore.children(purpose, path); } protected String buildKey(String blobName) { @@ -199,7 +198,7 @@ private boolean skipIfNotPrimaryOnlyLocationMode(ActionListener listener) { @Override public void getRegister(OperationPurpose purpose, String key, ActionListener listener) { if (skipRegisterOperation(listener)) return; - ActionListener.completeWith(listener, () -> blobStore.getRegister(buildKey(key), keyPath, key)); + ActionListener.completeWith(listener, () -> blobStore.getRegister(purpose, buildKey(key), keyPath, key)); } @Override @@ -211,7 +210,14 @@ public void compareAndExchangeRegister( ActionListener listener ) { if (skipRegisterOperation(listener)) return; - ActionListener.completeWith(listener, () -> blobStore.compareAndExchangeRegister(buildKey(key), keyPath, key, expected, updated)); + ActionListener.completeWith( + listener, + () -> blobStore.compareAndExchangeRegister(purpose, buildKey(key), keyPath, key, expected, updated) + ); } + // visible for testing + AzureBlobStore getBlobStore() { + return blobStore; + } } diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java index 4ed1b142023e8..5466989082129 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureBlobStore.java @@ -16,6 +16,7 @@ import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import com.azure.core.http.HttpMethod; import com.azure.core.http.rest.ResponseBase; import com.azure.core.util.BinaryData; import com.azure.storage.blob.BlobAsyncClient; @@ -74,6 +75,7 @@ import java.nio.ByteBuffer; import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; import java.util.Collections; import java.util.HashMap; @@ -83,10 +85,13 @@ import java.util.Objects; import java.util.Spliterator; import java.util.Spliterators; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; +import java.util.concurrent.atomic.LongAdder; import java.util.function.BiPredicate; +import java.util.function.Consumer; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.elasticsearch.core.Strings.format; @@ -105,8 +110,8 @@ public class AzureBlobStore implements BlobStore { private final LocationMode locationMode; private final ByteSizeValue maxSinglePartUploadSize; - private final Stats stats = new Stats(); - private final BiConsumer statsConsumer; + private final StatsCollectors statsCollectors = new StatsCollectors(); + private final AzureClientProvider.SuccessfulRequestHandler statsConsumer; public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, BigArrays bigArrays) { this.container = Repository.CONTAINER_SETTING.get(metadata.settings()); @@ -118,26 +123,38 @@ public AzureBlobStore(RepositoryMetadata metadata, AzureStorageService service, this.maxSinglePartUploadSize = Repository.MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.get(metadata.settings()); List requestStatsCollectors = List.of( - RequestStatsCollector.create((httpMethod, url) -> httpMethod.equals("HEAD"), stats.headOperations::incrementAndGet), RequestStatsCollector.create( - (httpMethod, url) -> httpMethod.equals("GET") && isListRequest(httpMethod, url) == false, - stats.getOperations::incrementAndGet + (httpMethod, url) -> httpMethod == HttpMethod.HEAD, + purpose -> statsCollectors.onSuccessfulRequest(Operation.GET_BLOB_PROPERTIES, purpose) + ), + RequestStatsCollector.create( + (httpMethod, url) -> httpMethod == HttpMethod.GET && isListRequest(httpMethod, url) == false, + purpose -> statsCollectors.onSuccessfulRequest(Operation.GET_BLOB, purpose) + ), + RequestStatsCollector.create( + AzureBlobStore::isListRequest, + purpose -> statsCollectors.onSuccessfulRequest(Operation.LIST_BLOBS, purpose) + ), + RequestStatsCollector.create( + AzureBlobStore::isPutBlockRequest, + purpose -> statsCollectors.onSuccessfulRequest(Operation.PUT_BLOCK, purpose) + ), + RequestStatsCollector.create( + AzureBlobStore::isPutBlockListRequest, + purpose -> statsCollectors.onSuccessfulRequest(Operation.PUT_BLOCK_LIST, purpose) ), - RequestStatsCollector.create(AzureBlobStore::isListRequest, stats.listOperations::incrementAndGet), - RequestStatsCollector.create(AzureBlobStore::isPutBlockRequest, stats.putBlockOperations::incrementAndGet), - RequestStatsCollector.create(AzureBlobStore::isPutBlockListRequest, stats.putBlockListOperations::incrementAndGet), RequestStatsCollector.create( // https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob#uri-parameters // The only URI parameter allowed for put-blob operation is "timeout", but if a sas token is used, // it's possible that the URI parameters contain additional parameters unrelated to the upload type. - (httpMethod, url) -> httpMethod.equals("PUT") + (httpMethod, url) -> httpMethod == HttpMethod.PUT && isPutBlockRequest(httpMethod, url) == false && isPutBlockListRequest(httpMethod, url) == false, - stats.putOperations::incrementAndGet + purpose -> statsCollectors.onSuccessfulRequest(Operation.PUT_BLOB, purpose) ) ); - this.statsConsumer = (httpMethod, url) -> { + this.statsConsumer = (purpose, httpMethod, url) -> { try { URI uri = url.toURI(); String path = uri.getPath() == null ? "" : uri.getPath(); @@ -152,27 +169,27 @@ && isPutBlockListRequest(httpMethod, url) == false, for (RequestStatsCollector requestStatsCollector : requestStatsCollectors) { if (requestStatsCollector.shouldConsumeRequestInfo(httpMethod, url)) { - requestStatsCollector.consumeHttpRequestInfo(); + requestStatsCollector.consumeHttpRequestInfo(purpose); return; } } }; } - private static boolean isListRequest(String httpMethod, URL url) { - return httpMethod.equals("GET") && url.getQuery() != null && url.getQuery().contains("comp=list"); + private static boolean isListRequest(HttpMethod httpMethod, URL url) { + return httpMethod == HttpMethod.GET && url.getQuery() != null && url.getQuery().contains("comp=list"); } // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block - private static boolean isPutBlockRequest(String httpMethod, URL url) { + private static boolean isPutBlockRequest(HttpMethod httpMethod, URL url) { String queryParams = url.getQuery() == null ? "" : url.getQuery(); - return httpMethod.equals("PUT") && queryParams.contains("comp=block") && queryParams.contains("blockid="); + return httpMethod == HttpMethod.PUT && queryParams.contains("comp=block") && queryParams.contains("blockid="); } // https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list - private static boolean isPutBlockListRequest(String httpMethod, URL url) { + private static boolean isPutBlockListRequest(HttpMethod httpMethod, URL url) { String queryParams = url.getQuery() == null ? "" : url.getQuery(); - return httpMethod.equals("PUT") && queryParams.contains("comp=blocklist"); + return httpMethod == HttpMethod.PUT && queryParams.contains("comp=blocklist"); } public long getReadChunkSize() { @@ -203,8 +220,8 @@ public BlobContainer blobContainer(BlobPath path) { @Override public void close() {} - public boolean blobExists(String blob) throws IOException { - final BlobServiceClient client = client(); + public boolean blobExists(OperationPurpose purpose, String blob) throws IOException { + final BlobServiceClient client = client(purpose); try { Boolean blobExists = SocketAccess.doPrivilegedException(() -> { @@ -221,12 +238,12 @@ public boolean blobExists(String blob) throws IOException { // number of concurrent blob delete requests to use while bulk deleting private static final int CONCURRENT_DELETES = 100; - public DeleteResult deleteBlobDirectory(String path) throws IOException { + public DeleteResult deleteBlobDirectory(OperationPurpose purpose, String path) { final AtomicInteger blobsDeleted = new AtomicInteger(0); final AtomicLong bytesDeleted = new AtomicLong(0); SocketAccess.doPrivilegedVoidException(() -> { - final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient().getBlobContainerAsyncClient(container); + final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient(purpose).getBlobContainerAsyncClient(container); final ListBlobsOptions options = new ListBlobsOptions().setPrefix(path) .setDetails(new BlobListDetails().setRetrieveMetadata(true)); try { @@ -266,12 +283,12 @@ private static void filterDeleteExceptionsAndRethrow(Exception e, IOException ex } @Override - public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator blobs) throws IOException { + public void deleteBlobsIgnoringIfNotExists(OperationPurpose purpose, Iterator blobs) { if (blobs.hasNext() == false) { return; } - BlobServiceAsyncClient asyncClient = asyncClient(); + BlobServiceAsyncClient asyncClient = asyncClient(purpose); SocketAccess.doPrivilegedVoidException(() -> { final BlobContainerAsyncClient blobContainerClient = asyncClient.getBlobContainerAsyncClient(container); try { @@ -296,9 +313,9 @@ private static Mono getDeleteTask(String blobName, BlobAsyncClient blobAsy .onErrorMap(throwable -> new IOException("Error deleting blob " + blobName, throwable)); } - public InputStream getInputStream(String blob, long position, final @Nullable Long length) throws IOException { + public InputStream getInputStream(OperationPurpose purpose, String blob, long position, final @Nullable Long length) { logger.trace(() -> format("reading container [%s], blob [%s]", container, blob)); - final AzureBlobServiceClient azureBlobServiceClient = getAzureBlobServiceClientClient(); + final AzureBlobServiceClient azureBlobServiceClient = getAzureBlobServiceClientClient(purpose); final BlobServiceClient syncClient = azureBlobServiceClient.getSyncClient(); final BlobServiceAsyncClient asyncClient = azureBlobServiceClient.getAsyncClient(); @@ -324,11 +341,11 @@ public InputStream getInputStream(String blob, long position, final @Nullable Lo }); } - public Map listBlobsByPrefix(String keyPath, String prefix) throws IOException { + public Map listBlobsByPrefix(OperationPurpose purpose, String keyPath, String prefix) throws IOException { final var blobsBuilder = new HashMap(); logger.trace(() -> format("listing container [%s], keyPath [%s], prefix [%s]", container, keyPath, prefix)); try { - final BlobServiceClient client = client(); + final BlobServiceClient client = client(purpose); SocketAccess.doPrivilegedVoidException(() -> { final BlobContainerClient containerClient = client.getBlobContainerClient(container); final BlobListDetails details = new BlobListDetails().setRetrieveMetadata(true); @@ -352,12 +369,12 @@ public Map listBlobsByPrefix(String keyPath, String prefix return Map.copyOf(blobsBuilder); } - public Map children(BlobPath path) throws IOException { + public Map children(OperationPurpose purpose, BlobPath path) throws IOException { final var childrenBuilder = new HashMap(); final String keyPath = path.buildAsString(); try { - final BlobServiceClient client = client(); + final BlobServiceClient client = client(purpose); SocketAccess.doPrivilegedVoidException(() -> { BlobContainerClient blobContainer = client.getBlobContainerClient(container); final ListBlobsOptions listBlobsOptions = new ListBlobsOptions(); @@ -383,14 +400,18 @@ public Map children(BlobPath path) throws IOException { return Collections.unmodifiableMap(childrenBuilder); } - public void writeBlob(String blobName, BytesReference bytes, boolean failIfAlreadyExists) { + public void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) { Flux byteBufferFlux = Flux.fromArray(BytesReference.toByteBuffers(bytes)); - executeSingleUpload(blobName, byteBufferFlux, bytes.length(), failIfAlreadyExists); + executeSingleUpload(purpose, blobName, byteBufferFlux, bytes.length(), failIfAlreadyExists); } - public void writeBlob(String blobName, boolean failIfAlreadyExists, CheckedConsumer writer) - throws IOException { - final BlockBlobAsyncClient blockBlobAsyncClient = asyncClient().getBlobContainerAsyncClient(container) + public void writeBlob( + OperationPurpose purpose, + String blobName, + boolean failIfAlreadyExists, + CheckedConsumer writer + ) throws IOException { + final BlockBlobAsyncClient blockBlobAsyncClient = asyncClient(purpose).getBlobContainerAsyncClient(container) .getBlobAsyncClient(blobName) .getBlockBlobAsyncClient(); try (ChunkedBlobOutputStream out = new ChunkedBlobOutputStream<>(bigArrays, getUploadBlockSize()) { @@ -414,7 +435,7 @@ protected void flushBuffer() { @Override protected void onCompletion() { if (flushedBytes == 0L) { - writeBlob(blobName, buffer.bytes(), failIfAlreadyExists); + writeBlob(purpose, blobName, buffer.bytes(), failIfAlreadyExists); } else { flushBuffer(); SocketAccess.doPrivilegedVoidException( @@ -434,16 +455,17 @@ protected void onFailure() { } } - public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException { + public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) + throws IOException { assert inputStream.markSupported() : "Should not be used with non-mark supporting streams as their retry handling in the SDK is broken"; logger.trace(() -> format("writeBlob(%s, stream, %s)", blobName, blobSize)); try { if (blobSize <= getLargeBlobThresholdInBytes()) { final Flux byteBufferFlux = convertStreamToByteBuffer(inputStream, blobSize, DEFAULT_UPLOAD_BUFFERS_SIZE); - executeSingleUpload(blobName, byteBufferFlux, blobSize, failIfAlreadyExists); + executeSingleUpload(purpose, blobName, byteBufferFlux, blobSize, failIfAlreadyExists); } else { - executeMultipartUpload(blobName, inputStream, blobSize, failIfAlreadyExists); + executeMultipartUpload(purpose, blobName, inputStream, blobSize, failIfAlreadyExists); } } catch (final BlobStorageException e) { if (failIfAlreadyExists @@ -459,9 +481,15 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b logger.trace(() -> format("writeBlob(%s, stream, %s) - done", blobName, blobSize)); } - private void executeSingleUpload(String blobName, Flux byteBufferFlux, long blobSize, boolean failIfAlreadyExists) { + private void executeSingleUpload( + OperationPurpose purpose, + String blobName, + Flux byteBufferFlux, + long blobSize, + boolean failIfAlreadyExists + ) { SocketAccess.doPrivilegedVoidException(() -> { - final BlobServiceAsyncClient asyncClient = asyncClient(); + final BlobServiceAsyncClient asyncClient = asyncClient(purpose); final BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container).getBlobAsyncClient(blobName); final BlockBlobAsyncClient blockBlobAsyncClient = blobAsyncClient.getBlockBlobAsyncClient(); @@ -476,9 +504,15 @@ private void executeSingleUpload(String blobName, Flux byteBufferFlu }); } - private void executeMultipartUpload(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) { + private void executeMultipartUpload( + OperationPurpose purpose, + String blobName, + InputStream inputStream, + long blobSize, + boolean failIfAlreadyExists + ) { SocketAccess.doPrivilegedVoidException(() -> { - final BlobServiceAsyncClient asyncClient = asyncClient(); + final BlobServiceAsyncClient asyncClient = asyncClient(purpose); final BlobAsyncClient blobAsyncClient = asyncClient.getBlobContainerAsyncClient(container).getBlobAsyncClient(blobName); final BlockBlobAsyncClient blockBlobAsyncClient = blobAsyncClient.getBlockBlobAsyncClient(); @@ -622,52 +656,72 @@ long getUploadBlockSize() { return service.getUploadBlockSize(); } - private BlobServiceClient client() { - return getAzureBlobServiceClientClient().getSyncClient(); + private BlobServiceClient client(OperationPurpose purpose) { + return getAzureBlobServiceClientClient(purpose).getSyncClient(); } - private BlobServiceAsyncClient asyncClient() { - return getAzureBlobServiceClientClient().getAsyncClient(); + private BlobServiceAsyncClient asyncClient(OperationPurpose purpose) { + return getAzureBlobServiceClientClient(purpose).getAsyncClient(); } - private AzureBlobServiceClient getAzureBlobServiceClientClient() { - return service.client(clientName, locationMode, statsConsumer); + private AzureBlobServiceClient getAzureBlobServiceClientClient(OperationPurpose purpose) { + return service.client(clientName, locationMode, purpose, statsConsumer); } @Override public Map stats() { - return stats.toMap(); + return statsCollectors.statsMap(service.isStateless()); } - private static class Stats { + // visible for testing + enum Operation { + GET_BLOB("GetBlob"), + LIST_BLOBS("ListBlobs"), + GET_BLOB_PROPERTIES("GetBlobProperties"), + PUT_BLOB("PutBlob"), + PUT_BLOCK("PutBlock"), + PUT_BLOCK_LIST("PutBlockList"); - private final AtomicLong getOperations = new AtomicLong(); + private final String key; - private final AtomicLong listOperations = new AtomicLong(); + public String getKey() { + return key; + } - private final AtomicLong headOperations = new AtomicLong(); + Operation(String key) { + this.key = key; + } + } - private final AtomicLong putOperations = new AtomicLong(); + private record StatsKey(Operation operation, OperationPurpose purpose) { + @Override + public String toString() { + return purpose.getKey() + "_" + operation.getKey(); + } + } - private final AtomicLong putBlockOperations = new AtomicLong(); + private static class StatsCollectors { + final Map collectors = new ConcurrentHashMap<>(); - private final AtomicLong putBlockListOperations = new AtomicLong(); + Map statsMap(boolean stateless) { + if (stateless) { + return collectors.entrySet() + .stream() + .collect(Collectors.toUnmodifiableMap(e -> e.getKey().toString(), e -> e.getValue().sum())); + } else { + Map normalisedStats = Arrays.stream(Operation.values()).collect(Collectors.toMap(Operation::getKey, o -> 0L)); + collectors.forEach( + (key, value) -> normalisedStats.compute( + key.operation.getKey(), + (k, current) -> Objects.requireNonNull(current) + value.sum() + ) + ); + return Map.copyOf(normalisedStats); + } + } - private Map toMap() { - return Map.of( - "GetBlob", - getOperations.get(), - "ListBlobs", - listOperations.get(), - "GetBlobProperties", - headOperations.get(), - "PutBlob", - putOperations.get(), - "PutBlock", - putBlockOperations.get(), - "PutBlockList", - putBlockListOperations.get() - ); + public void onSuccessfulRequest(Operation operation, OperationPurpose purpose) { + collectors.computeIfAbsent(new StatsKey(operation, purpose), k -> new LongAdder()).increment(); } } @@ -793,35 +847,35 @@ private ByteBuf getNextByteBuf() throws IOException { } private static class RequestStatsCollector { - private final BiPredicate filter; - private final Runnable onHttpRequest; + private final BiPredicate filter; + private final Consumer onHttpRequest; - private RequestStatsCollector(BiPredicate filter, Runnable onHttpRequest) { + private RequestStatsCollector(BiPredicate filter, Consumer onHttpRequest) { this.filter = filter; this.onHttpRequest = onHttpRequest; } - static RequestStatsCollector create(BiPredicate filter, Runnable consumer) { + static RequestStatsCollector create(BiPredicate filter, Consumer consumer) { return new RequestStatsCollector(filter, consumer); } - private boolean shouldConsumeRequestInfo(String httpMethod, URL url) { + private boolean shouldConsumeRequestInfo(HttpMethod httpMethod, URL url) { return filter.test(httpMethod, url); } - private void consumeHttpRequestInfo() { - onHttpRequest.run(); + private void consumeHttpRequestInfo(OperationPurpose operationPurpose) { + onHttpRequest.accept(operationPurpose); } } - OptionalBytesReference getRegister(String blobPath, String containerPath, String blobKey) { + OptionalBytesReference getRegister(OperationPurpose purpose, String blobPath, String containerPath, String blobKey) { try { return SocketAccess.doPrivilegedException( () -> OptionalBytesReference.of( downloadRegisterBlob( containerPath, blobKey, - getAzureBlobServiceClientClient().getSyncClient().getBlobContainerClient(container).getBlobClient(blobPath), + getAzureBlobServiceClientClient(purpose).getSyncClient().getBlobContainerClient(container).getBlobClient(blobPath), null ) ) @@ -836,6 +890,7 @@ OptionalBytesReference getRegister(String blobPath, String containerPath, String } OptionalBytesReference compareAndExchangeRegister( + OperationPurpose purpose, String blobPath, String containerPath, String blobKey, @@ -849,7 +904,7 @@ OptionalBytesReference compareAndExchangeRegister( innerCompareAndExchangeRegister( containerPath, blobKey, - getAzureBlobServiceClientClient().getSyncClient().getBlobContainerClient(container).getBlobClient(blobPath), + getAzureBlobServiceClientClient(purpose).getSyncClient().getBlobContainerClient(container).getBlobClient(blobPath), expected, updated ) diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java index e21c1384db271..ae497ff159576 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureClientProvider.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -46,13 +47,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.netty4.NettyAllocator; -import java.io.IOException; import java.net.URL; import java.time.Duration; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; -import java.util.function.BiConsumer; import static org.elasticsearch.repositories.azure.AzureRepositoryPlugin.NETTY_EVENT_LOOP_THREAD_POOL_NAME; import static org.elasticsearch.repositories.azure.AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME; @@ -161,7 +160,8 @@ AzureBlobServiceClient createClient( LocationMode locationMode, RequestRetryOptions retryOptions, ProxyOptions proxyOptions, - BiConsumer successfulRequestConsumer + SuccessfulRequestHandler successfulRequestHandler, + OperationPurpose purpose ) { if (closed) { throw new IllegalStateException("AzureClientProvider is already closed"); @@ -189,8 +189,8 @@ AzureBlobServiceClient createClient( builder.credential(credentialBuilder.build()); } - if (successfulRequestConsumer != null) { - builder.addPolicy(new SuccessfulRequestTracker(successfulRequestConsumer)); + if (successfulRequestHandler != null) { + builder.addPolicy(new SuccessfulRequestTracker(purpose, successfulRequestHandler)); } if (locationMode.isSecondary()) { @@ -257,13 +257,15 @@ protected void doStop() { } @Override - protected void doClose() throws IOException {} + protected void doClose() {} private static final class SuccessfulRequestTracker implements HttpPipelinePolicy { private static final Logger logger = LogManager.getLogger(SuccessfulRequestTracker.class); - private final BiConsumer onSuccessfulRequest; + private final OperationPurpose purpose; + private final SuccessfulRequestHandler onSuccessfulRequest; - private SuccessfulRequestTracker(BiConsumer onSuccessfulRequest) { + private SuccessfulRequestTracker(OperationPurpose purpose, SuccessfulRequestHandler onSuccessfulRequest) { + this.purpose = purpose; this.onSuccessfulRequest = onSuccessfulRequest; } @@ -276,11 +278,19 @@ private void trackSuccessfulRequest(HttpRequest httpRequest, HttpResponse httpRe HttpMethod method = httpRequest.getHttpMethod(); if (httpResponse != null && method != null && httpResponse.getStatusCode() > 199 && httpResponse.getStatusCode() <= 299) { try { - onSuccessfulRequest.accept(method.name(), httpRequest.getUrl()); + onSuccessfulRequest.onSuccessfulRequest(purpose, method, httpRequest.getUrl()); } catch (Exception e) { logger.warn("Unable to notify a successful request", e); } } } } + + /** + * The {@link SuccessfulRequestTracker} calls this when a request completes successfully + */ + interface SuccessfulRequestHandler { + + void onSuccessfulRequest(OperationPurpose purpose, HttpMethod method, URL url); + } } diff --git a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java index 17c719b97e448..c6e85e44d24dd 100644 --- a/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java +++ b/modules/repository-azure/src/main/java/org/elasticsearch/repositories/azure/AzureStorageService.java @@ -14,6 +14,8 @@ import com.azure.storage.common.policy.RequestRetryOptions; import com.azure.storage.common.policy.RetryPolicyType; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -23,10 +25,8 @@ import java.net.InetSocketAddress; import java.net.Proxy; -import java.net.URL; import java.util.Map; import java.util.Set; -import java.util.function.BiConsumer; import static java.util.Collections.emptyMap; @@ -73,24 +73,38 @@ public class AzureStorageService { volatile Map storageSettings = emptyMap(); private final AzureClientProvider azureClientProvider; private final ClientLogger clientLogger = new ClientLogger(AzureStorageService.class); + private final boolean stateless; public AzureStorageService(Settings settings, AzureClientProvider azureClientProvider) { // eagerly load client settings so that secure settings are read final Map clientsSettings = AzureStorageSettings.load(settings); refreshSettings(clientsSettings); this.azureClientProvider = azureClientProvider; + this.stateless = DiscoveryNode.isStateless(settings); } - public AzureBlobServiceClient client(String clientName, LocationMode locationMode) { - return client(clientName, locationMode, null); + public AzureBlobServiceClient client(String clientName, LocationMode locationMode, OperationPurpose purpose) { + return client(clientName, locationMode, purpose, null); } - public AzureBlobServiceClient client(String clientName, LocationMode locationMode, BiConsumer successfulRequestConsumer) { + public AzureBlobServiceClient client( + String clientName, + LocationMode locationMode, + OperationPurpose purpose, + AzureClientProvider.SuccessfulRequestHandler successfulRequestHandler + ) { final AzureStorageSettings azureStorageSettings = getClientSettings(clientName); RequestRetryOptions retryOptions = getRetryOptions(locationMode, azureStorageSettings); ProxyOptions proxyOptions = getProxyOptions(azureStorageSettings); - return azureClientProvider.createClient(azureStorageSettings, locationMode, retryOptions, proxyOptions, successfulRequestConsumer); + return azureClientProvider.createClient( + azureStorageSettings, + locationMode, + retryOptions, + proxyOptions, + successfulRequestHandler, + purpose + ); } private AzureStorageSettings getClientSettings(String clientName) { @@ -124,6 +138,10 @@ int getMaxReadRetries(String clientName) { return azureStorageSettings.getMaxRetries(); } + boolean isStateless() { + return stateless; + } + // non-static, package private for testing RequestRetryOptions getRetryOptions(LocationMode locationMode, AzureStorageSettings azureStorageSettings) { AzureStorageSettings.StorageEndpoint endpoint = azureStorageSettings.getStorageEndpoint(); diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureServerTestCase.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureServerTestCase.java index 303f82cf34f9a..1962bddd8fdb3 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureServerTestCase.java +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AbstractAzureServerTestCase.java @@ -15,6 +15,7 @@ import com.sun.net.httpserver.HttpServer; import org.elasticsearch.cluster.metadata.RepositoryMetadata; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobPath; import org.elasticsearch.common.network.InetAddresses; @@ -60,14 +61,18 @@ @SuppressForbidden(reason = "use a http server") public abstract class AbstractAzureServerTestCase extends ESTestCase { protected static final long MAX_RANGE_VAL = Long.MAX_VALUE - 1L; + protected static final String ACCOUNT = "account"; + protected static final String CONTAINER = "container"; protected HttpServer httpServer; protected HttpServer secondaryHttpServer; + protected boolean serverlessMode; private ThreadPool threadPool; private AzureClientProvider clientProvider; @Before public void setUp() throws Exception { + serverlessMode = false; threadPool = new TestThreadPool( getTestClass().getName(), AzureRepositoryPlugin.executorBuilder(), @@ -98,7 +103,7 @@ protected BlobContainer createBlobContainer(final int maxRetries) { protected BlobContainer createBlobContainer(final int maxRetries, String secondaryHost, final LocationMode locationMode) { final String clientName = randomAlphaOfLength(5).toLowerCase(Locale.ROOT); final MockSecureSettings secureSettings = new MockSecureSettings(); - secureSettings.setString(ACCOUNT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), "account"); + secureSettings.setString(ACCOUNT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), ACCOUNT); final String key = Base64.getEncoder().encodeToString(randomAlphaOfLength(14).getBytes(UTF_8)); secureSettings.setString(KEY_SETTING.getConcreteSettingForNamespace(clientName).getKey(), key); @@ -114,15 +119,16 @@ protected BlobContainer createBlobContainer( ) { final Settings.Builder clientSettings = Settings.builder(); - String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=" + getEndpointForServer(httpServer, "account"); + String endpoint = "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=" + getEndpointForServer(httpServer, ACCOUNT); if (secondaryHost != null) { - endpoint += ";BlobSecondaryEndpoint=" + getEndpointForServer(secondaryHttpServer, "account"); + endpoint += ";BlobSecondaryEndpoint=" + getEndpointForServer(secondaryHttpServer, ACCOUNT); } clientSettings.put(ENDPOINT_SUFFIX_SETTING.getConcreteSettingForNamespace(clientName).getKey(), endpoint); clientSettings.put(MAX_RETRIES_SETTING.getConcreteSettingForNamespace(clientName).getKey(), maxRetries); clientSettings.put(TIMEOUT_SETTING.getConcreteSettingForNamespace(clientName).getKey(), TimeValue.timeValueMillis(500)); clientSettings.setSecureSettings(secureSettings); + clientSettings.put(DiscoveryNode.STATELESS_ENABLED_SETTING_NAME, serverlessMode); final AzureStorageService service = new AzureStorageService(clientSettings.build(), clientProvider) { @Override @@ -136,7 +142,7 @@ RequestRetryOptions getRetryOptions(LocationMode locationMode, AzureStorageSetti // The SDK doesn't work well with ip endponts. Secondary host endpoints that contain // a path causes the sdk to rewrite the endpoint with an invalid path, that's the reason why we provide just the host + // port. - secondaryHost != null ? secondaryHost.replaceFirst("/account", "") : null + secondaryHost != null ? secondaryHost.replaceFirst("/" + ACCOUNT, "") : null ); } @@ -155,7 +161,7 @@ int getMaxReadRetries(String clientName) { "repository", AzureRepository.TYPE, Settings.builder() - .put(CONTAINER_SETTING.getKey(), "container") + .put(CONTAINER_SETTING.getKey(), CONTAINER) .put(ACCOUNT_SETTING.getKey(), clientName) .put(LOCATION_MODE_SETTING.getKey(), locationMode) .put(MAX_SINGLE_PART_UPLOAD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.MB)) diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java new file mode 100644 index 0000000000000..1ed01bbadc07e --- /dev/null +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureBlobContainerStatsTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.repositories.azure; + +import fixture.azure.AzureHttpHandler; + +import org.elasticsearch.common.blobstore.OperationPurpose; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.core.SuppressForbidden; +import org.junit.Before; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; + +public class AzureBlobContainerStatsTests extends AbstractAzureServerTestCase { + + @SuppressForbidden(reason = "use a http server") + @Before + public void configureAzureHandler() { + httpServer.createContext("/", new AzureHttpHandler(ACCOUNT, CONTAINER, null)); + } + + public void testOperationPurposeIsReflectedInBlobStoreStats() throws IOException { + serverlessMode = true; + AzureBlobContainer blobContainer = asInstanceOf(AzureBlobContainer.class, createBlobContainer(between(1, 3))); + AzureBlobStore blobStore = blobContainer.getBlobStore(); + OperationPurpose purpose = randomFrom(OperationPurpose.values()); + + String blobName = randomIdentifier(); + // PUT_BLOB + blobStore.writeBlob(purpose, blobName, BytesReference.fromByteBuffer(ByteBuffer.wrap(randomBlobContent())), false); + // LIST_BLOBS + blobStore.listBlobsByPrefix(purpose, randomIdentifier(), randomIdentifier()); + // GET_BLOB_PROPERTIES + blobStore.blobExists(purpose, blobName); + // PUT_BLOCK & PUT_BLOCK_LIST + byte[] blobContent = randomByteArrayOfLength((int) blobStore.getUploadBlockSize()); + blobStore.writeBlob(purpose, randomIdentifier(), false, os -> { + os.write(blobContent); + os.flush(); + }); + + Map stats = blobStore.stats(); + String statsMapString = stats.toString(); + assertEquals(statsMapString, Long.valueOf(1L), stats.get(statsKey(purpose, AzureBlobStore.Operation.PUT_BLOB))); + assertEquals(statsMapString, Long.valueOf(1L), stats.get(statsKey(purpose, AzureBlobStore.Operation.LIST_BLOBS))); + assertEquals(statsMapString, Long.valueOf(1L), stats.get(statsKey(purpose, AzureBlobStore.Operation.GET_BLOB_PROPERTIES))); + assertEquals(statsMapString, Long.valueOf(1L), stats.get(statsKey(purpose, AzureBlobStore.Operation.PUT_BLOCK))); + assertEquals(statsMapString, Long.valueOf(1L), stats.get(statsKey(purpose, AzureBlobStore.Operation.PUT_BLOCK_LIST))); + } + + public void testOperationPurposeIsNotReflectedInBlobStoreStatsWhenNotServerless() throws IOException { + serverlessMode = false; + AzureBlobContainer blobContainer = asInstanceOf(AzureBlobContainer.class, createBlobContainer(between(1, 3))); + AzureBlobStore blobStore = blobContainer.getBlobStore(); + + int repeatTimes = randomIntBetween(1, 3); + for (int i = 0; i < repeatTimes; i++) { + OperationPurpose purpose = randomFrom(OperationPurpose.values()); + + String blobName = randomIdentifier(); + // PUT_BLOB + blobStore.writeBlob(purpose, blobName, BytesReference.fromByteBuffer(ByteBuffer.wrap(randomBlobContent())), false); + // LIST_BLOBS + blobStore.listBlobsByPrefix(purpose, randomIdentifier(), randomIdentifier()); + // GET_BLOB_PROPERTIES + blobStore.blobExists(purpose, blobName); + // PUT_BLOCK & PUT_BLOCK_LIST + byte[] blobContent = randomByteArrayOfLength((int) blobStore.getUploadBlockSize()); + blobStore.writeBlob(purpose, randomIdentifier(), false, os -> { + os.write(blobContent); + os.flush(); + }); + } + + Map stats = blobStore.stats(); + String statsMapString = stats.toString(); + assertEquals(statsMapString, Long.valueOf(repeatTimes), stats.get(AzureBlobStore.Operation.PUT_BLOB.getKey())); + assertEquals(statsMapString, Long.valueOf(repeatTimes), stats.get(AzureBlobStore.Operation.LIST_BLOBS.getKey())); + assertEquals(statsMapString, Long.valueOf(repeatTimes), stats.get(AzureBlobStore.Operation.GET_BLOB_PROPERTIES.getKey())); + assertEquals(statsMapString, Long.valueOf(repeatTimes), stats.get(AzureBlobStore.Operation.PUT_BLOCK.getKey())); + assertEquals(statsMapString, Long.valueOf(repeatTimes), stats.get(AzureBlobStore.Operation.PUT_BLOCK_LIST.getKey())); + } + + private static String statsKey(OperationPurpose purpose, AzureBlobStore.Operation operation) { + return purpose.getKey() + "_" + operation.getKey(); + } +} diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureClientProviderTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureClientProviderTests.java index d595cded53dc7..7d82f2d5029f6 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureClientProviderTests.java +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureClientProviderTests.java @@ -11,6 +11,7 @@ import com.azure.storage.common.policy.RequestRetryOptions; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; @@ -19,15 +20,13 @@ import org.junit.After; import org.junit.Before; -import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; public class AzureClientProviderTests extends ESTestCase { - private static final BiConsumer EMPTY_CONSUMER = (method, url) -> {}; + private static final AzureClientProvider.SuccessfulRequestHandler EMPTY_CONSUMER = (purpose, method, url) -> {}; private ThreadPool threadPool; private AzureClientProvider azureClientProvider; @@ -72,7 +71,14 @@ public void testCanCreateAClientWithSecondaryLocation() { LocationMode locationMode = LocationMode.SECONDARY_ONLY; RequestRetryOptions requestRetryOptions = new RequestRetryOptions(); - azureClientProvider.createClient(storageSettings, locationMode, requestRetryOptions, null, EMPTY_CONSUMER); + azureClientProvider.createClient( + storageSettings, + locationMode, + requestRetryOptions, + null, + EMPTY_CONSUMER, + randomFrom(OperationPurpose.values()) + ); } public void testCanNotCreateAClientWithSecondaryLocationWithoutAProperEndpoint() { @@ -95,7 +101,14 @@ public void testCanNotCreateAClientWithSecondaryLocationWithoutAProperEndpoint() RequestRetryOptions requestRetryOptions = new RequestRetryOptions(); expectThrows( IllegalArgumentException.class, - () -> azureClientProvider.createClient(storageSettings, locationMode, requestRetryOptions, null, EMPTY_CONSUMER) + () -> azureClientProvider.createClient( + storageSettings, + locationMode, + requestRetryOptions, + null, + EMPTY_CONSUMER, + randomFrom(OperationPurpose.values()) + ) ); } diff --git a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceTests.java b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceTests.java index 85382c4f022f0..ef50644ac95e5 100644 --- a/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceTests.java +++ b/modules/repository-azure/src/test/java/org/elasticsearch/repositories/azure/AzureStorageServiceTests.java @@ -11,6 +11,7 @@ import com.azure.storage.common.policy.RequestRetryOptions; +import org.elasticsearch.common.blobstore.OperationPurpose; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; @@ -97,10 +98,18 @@ public void testCreateClientWithEndpointSuffix() throws IOException { .build(); try (AzureRepositoryPlugin plugin = pluginWithSettingsValidation(settings)) { final AzureStorageService azureStorageService = plugin.azureStoreService.get(); - AzureBlobServiceClient client1 = azureStorageService.client("azure1", LocationMode.PRIMARY_ONLY); + AzureBlobServiceClient client1 = azureStorageService.client( + "azure1", + LocationMode.PRIMARY_ONLY, + randomFrom(OperationPurpose.values()) + ); assertThat(client1.getSyncClient().getAccountUrl(), equalTo("https://myaccount1.blob.my_endpoint_suffix")); - AzureBlobServiceClient client2 = azureStorageService.client("azure2", LocationMode.PRIMARY_ONLY); + AzureBlobServiceClient client2 = azureStorageService.client( + "azure2", + LocationMode.PRIMARY_ONLY, + randomFrom(OperationPurpose.values()) + ); assertThat(client2.getSyncClient().getAccountUrl(), equalTo("https://myaccount2.blob.core.windows.net")); } } @@ -121,16 +130,24 @@ public void testReinitClientSettings() throws IOException { try (AzureRepositoryPlugin plugin = pluginWithSettingsValidation(settings1)) { final AzureStorageService azureStorageService = plugin.azureStoreService.get(); - AzureBlobServiceClient client11 = azureStorageService.client("azure1", LocationMode.PRIMARY_ONLY); + AzureBlobServiceClient client11 = azureStorageService.client( + "azure1", + LocationMode.PRIMARY_ONLY, + randomFrom(OperationPurpose.values()) + ); assertThat(client11.getSyncClient().getAccountUrl(), equalTo("https://myaccount11.blob.core.windows.net")); - AzureBlobServiceClient client12 = azureStorageService.client("azure2", LocationMode.PRIMARY_ONLY); + AzureBlobServiceClient client12 = azureStorageService.client( + "azure2", + LocationMode.PRIMARY_ONLY, + randomFrom(OperationPurpose.values()) + ); assertThat(client12.getSyncClient().getAccountUrl(), equalTo("https://myaccount12.blob.core.windows.net")); // client 3 is missing final SettingsException e1 = expectThrows( SettingsException.class, - () -> azureStorageService.client("azure3", LocationMode.PRIMARY_ONLY) + () -> azureStorageService.client("azure3", LocationMode.PRIMARY_ONLY, randomFrom(OperationPurpose.values())) ); assertThat(e1.getMessage(), is("Unable to find client with name [azure3]")); @@ -141,7 +158,11 @@ public void testReinitClientSettings() throws IOException { assertThat(client11.getSyncClient().getAccountUrl(), equalTo("https://myaccount11.blob.core.windows.net")); // new client 1 is changed - AzureBlobServiceClient client21 = azureStorageService.client("azure1", LocationMode.PRIMARY_ONLY); + AzureBlobServiceClient client21 = azureStorageService.client( + "azure1", + LocationMode.PRIMARY_ONLY, + randomFrom(OperationPurpose.values()) + ); assertThat(client21.getSyncClient().getAccountUrl(), equalTo("https://myaccount21.blob.core.windows.net")); // old client 2 not changed @@ -150,12 +171,16 @@ public void testReinitClientSettings() throws IOException { // new client2 is gone final SettingsException e2 = expectThrows( SettingsException.class, - () -> azureStorageService.client("azure2", LocationMode.PRIMARY_ONLY) + () -> azureStorageService.client("azure2", LocationMode.PRIMARY_ONLY, randomFrom(OperationPurpose.values())) ); assertThat(e2.getMessage(), is("Unable to find client with name [azure2]")); // client 3 emerged - AzureBlobServiceClient client23 = azureStorageService.client("azure3", LocationMode.PRIMARY_ONLY); + AzureBlobServiceClient client23 = azureStorageService.client( + "azure3", + LocationMode.PRIMARY_ONLY, + randomFrom(OperationPurpose.values()) + ); assertThat(client23.getSyncClient().getAccountUrl(), equalTo("https://myaccount23.blob.core.windows.net")); } } @@ -167,7 +192,11 @@ public void testReinitClientEmptySettings() throws IOException { final Settings settings = Settings.builder().setSecureSettings(secureSettings).build(); try (AzureRepositoryPlugin plugin = pluginWithSettingsValidation(settings)) { final AzureStorageService azureStorageService = plugin.azureStoreService.get(); - AzureBlobServiceClient client11 = azureStorageService.client("azure1", LocationMode.PRIMARY_ONLY); + AzureBlobServiceClient client11 = azureStorageService.client( + "azure1", + LocationMode.PRIMARY_ONLY, + randomFrom(OperationPurpose.values()) + ); assertThat(client11.getSyncClient().getAccountUrl(), equalTo("https://myaccount1.blob.core.windows.net")); // reinit with empty settings is okay plugin.reload(Settings.EMPTY); @@ -176,7 +205,7 @@ public void testReinitClientEmptySettings() throws IOException { // client is no longer registered final SettingsException e = expectThrows( SettingsException.class, - () -> azureStorageService.client("azure1", LocationMode.PRIMARY_ONLY) + () -> azureStorageService.client("azure1", LocationMode.PRIMARY_ONLY, randomFrom(OperationPurpose.values())) ); assertThat(e.getMessage(), equalTo("Unable to find client with name [azure1]")); } @@ -195,7 +224,11 @@ public void testReinitClientWrongSettings() throws IOException { final Settings settings2 = Settings.builder().setSecureSettings(secureSettings2).build(); try (AzureRepositoryPlugin plugin = pluginWithSettingsValidation(settings1)) { final AzureStorageService azureStorageService = plugin.azureStoreService.get(); - AzureBlobServiceClient client11 = azureStorageService.client("azure1", LocationMode.PRIMARY_ONLY); + AzureBlobServiceClient client11 = azureStorageService.client( + "azure1", + LocationMode.PRIMARY_ONLY, + randomFrom(OperationPurpose.values()) + ); assertThat(client11.getSyncClient().getAccountUrl(), equalTo("https://myaccount1.blob.core.windows.net")); assertThat( expectThrows(SettingsException.class, () -> plugin.reload(settings2)).getMessage(), @@ -463,24 +496,41 @@ public void testCreateClientWithEndpoints() throws IOException { try (AzureRepositoryPlugin plugin = pluginWithSettingsValidation(settings)) { final AzureStorageService azureStorageService = plugin.azureStoreService.get(); - expectThrows(IllegalArgumentException.class, () -> azureStorageService.client("azure1", LocationMode.PRIMARY_THEN_SECONDARY)); - expectThrows(IllegalArgumentException.class, () -> azureStorageService.client("azure1", LocationMode.SECONDARY_ONLY)); - expectThrows(IllegalArgumentException.class, () -> azureStorageService.client("azure1", LocationMode.SECONDARY_THEN_PRIMARY)); + expectThrows( + IllegalArgumentException.class, + () -> azureStorageService.client("azure1", LocationMode.PRIMARY_THEN_SECONDARY, randomFrom(OperationPurpose.values())) + ); + expectThrows( + IllegalArgumentException.class, + () -> azureStorageService.client("azure1", LocationMode.SECONDARY_ONLY, randomFrom(OperationPurpose.values())) + ); + expectThrows( + IllegalArgumentException.class, + () -> azureStorageService.client("azure1", LocationMode.SECONDARY_THEN_PRIMARY, randomFrom(OperationPurpose.values())) + ); - AzureBlobServiceClient client1 = azureStorageService.client("azure1", LocationMode.PRIMARY_ONLY); + AzureBlobServiceClient client1 = azureStorageService.client( + "azure1", + LocationMode.PRIMARY_ONLY, + randomFrom(OperationPurpose.values()) + ); assertThat(client1.getSyncClient().getAccountUrl(), equalTo("https://account1.zone.azure.net")); assertThat( - azureStorageService.client("azure2", randomBoolean() ? LocationMode.PRIMARY_ONLY : LocationMode.PRIMARY_THEN_SECONDARY) - .getSyncClient() - .getAccountUrl(), + azureStorageService.client( + "azure2", + randomBoolean() ? LocationMode.PRIMARY_ONLY : LocationMode.PRIMARY_THEN_SECONDARY, + randomFrom(OperationPurpose.values()) + ).getSyncClient().getAccountUrl(), equalTo("https://account2.zone.azure.net") ); assertThat( - azureStorageService.client("azure2", randomBoolean() ? LocationMode.SECONDARY_ONLY : LocationMode.SECONDARY_THEN_PRIMARY) - .getSyncClient() - .getAccountUrl(), + azureStorageService.client( + "azure2", + randomBoolean() ? LocationMode.SECONDARY_ONLY : LocationMode.SECONDARY_THEN_PRIMARY, + randomFrom(OperationPurpose.values()) + ).getSyncClient().getAccountUrl(), equalTo("https://account2-secondary.zone.azure.net") ); }