Skip to content

Commit

Permalink
Update Azure internal blob store stats API to include operation purpo…
Browse files Browse the repository at this point in the history
…se (#113573)

Closes ES-9549
  • Loading branch information
nicktindall committed Sep 30, 2024
1 parent 2eb9274 commit 99d210a
Show file tree
Hide file tree
Showing 9 changed files with 400 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.OperationPurpose;
import org.elasticsearch.common.settings.MockSecureSettings;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
Expand Down Expand Up @@ -146,7 +147,8 @@ private void ensureSasTokenPermissions() {
final PlainActionFuture<Void> future = new PlainActionFuture<>();
repository.threadPool().generic().execute(ActionRunnable.wrap(future, l -> {
final AzureBlobStore blobStore = (AzureBlobStore) repository.blobStore();
final AzureBlobServiceClient azureBlobServiceClient = blobStore.getService().client("default", LocationMode.PRIMARY_ONLY);
final AzureBlobServiceClient azureBlobServiceClient = blobStore.getService()
.client("default", LocationMode.PRIMARY_ONLY, randomFrom(OperationPurpose.values()));
final BlobServiceClient client = azureBlobServiceClient.getSyncClient();
try {
SocketAccess.doPrivilegedException(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class AzureBlobContainer extends AbstractBlobContainer {
@Override
public boolean blobExists(OperationPurpose purpose, String blobName) throws IOException {
logger.trace("blobExists({})", blobName);
return blobStore.blobExists(buildKey(blobName));
return blobStore.blobExists(purpose, buildKey(blobName));
}

private InputStream openInputStream(OperationPurpose purpose, String blobName, long position, @Nullable Long length)
Expand All @@ -68,7 +68,7 @@ private InputStream openInputStream(OperationPurpose purpose, String blobName, l
throw new NoSuchFileException("Blob [" + blobKey + "] not found");
}
try {
return blobStore.getInputStream(blobKey, position, length);
return blobStore.getInputStream(purpose, blobKey, position, length);
} catch (Exception e) {
if (ExceptionsHelper.unwrap(e, HttpResponseException.class) instanceof HttpResponseException httpResponseException) {
final var httpStatusCode = httpResponseException.getResponse().getStatusCode();
Expand Down Expand Up @@ -102,7 +102,7 @@ public long readBlobPreferredLength() {
public void writeBlob(OperationPurpose purpose, String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
throws IOException {
logger.trace("writeBlob({}, stream, {})", buildKey(blobName), blobSize);
blobStore.writeBlob(buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
blobStore.writeBlob(purpose, buildKey(blobName), inputStream, blobSize, failIfAlreadyExists);
}

@Override
Expand All @@ -117,14 +117,13 @@ public void writeBlobAtomic(
}

@Override
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists)
throws IOException {
public void writeBlobAtomic(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) {
writeBlob(purpose, blobName, bytes, failIfAlreadyExists);
}

@Override
public void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) throws IOException {
blobStore.writeBlob(buildKey(blobName), bytes, failIfAlreadyExists);
public void writeBlob(OperationPurpose purpose, String blobName, BytesReference bytes, boolean failIfAlreadyExists) {
blobStore.writeBlob(purpose, buildKey(blobName), bytes, failIfAlreadyExists);
}

@Override
Expand All @@ -135,12 +134,12 @@ public void writeMetadataBlob(
boolean atomic,
CheckedConsumer<OutputStream, IOException> writer
) throws IOException {
blobStore.writeBlob(buildKey(blobName), failIfAlreadyExists, writer);
blobStore.writeBlob(purpose, buildKey(blobName), failIfAlreadyExists, writer);
}

@Override
public DeleteResult delete(OperationPurpose purpose) throws IOException {
return blobStore.deleteBlobDirectory(keyPath);
public DeleteResult delete(OperationPurpose purpose) {
return blobStore.deleteBlobDirectory(purpose, keyPath);
}

@Override
Expand All @@ -161,7 +160,7 @@ public String next() {
@Override
public Map<String, BlobMetadata> listBlobsByPrefix(OperationPurpose purpose, @Nullable String prefix) throws IOException {
logger.trace("listBlobsByPrefix({})", prefix);
return blobStore.listBlobsByPrefix(keyPath, prefix);
return blobStore.listBlobsByPrefix(purpose, keyPath, prefix);
}

@Override
Expand All @@ -173,7 +172,7 @@ public Map<String, BlobMetadata> listBlobs(OperationPurpose purpose) throws IOEx
@Override
public Map<String, BlobContainer> children(OperationPurpose purpose) throws IOException {
final BlobPath path = path();
return blobStore.children(path);
return blobStore.children(purpose, path);
}

protected String buildKey(String blobName) {
Expand All @@ -199,7 +198,7 @@ private boolean skipIfNotPrimaryOnlyLocationMode(ActionListener<?> listener) {
@Override
public void getRegister(OperationPurpose purpose, String key, ActionListener<OptionalBytesReference> listener) {
if (skipRegisterOperation(listener)) return;
ActionListener.completeWith(listener, () -> blobStore.getRegister(buildKey(key), keyPath, key));
ActionListener.completeWith(listener, () -> blobStore.getRegister(purpose, buildKey(key), keyPath, key));
}

@Override
Expand All @@ -211,7 +210,14 @@ public void compareAndExchangeRegister(
ActionListener<OptionalBytesReference> listener
) {
if (skipRegisterOperation(listener)) return;
ActionListener.completeWith(listener, () -> blobStore.compareAndExchangeRegister(buildKey(key), keyPath, key, expected, updated));
ActionListener.completeWith(
listener,
() -> blobStore.compareAndExchangeRegister(purpose, buildKey(key), keyPath, key, expected, updated)
);
}

// visible for testing
AzureBlobStore getBlobStore() {
return blobStore;
}
}
Loading

0 comments on commit 99d210a

Please sign in to comment.