Skip to content

Commit

Permalink
Merge 2f192e8 into openjdk23-bundle
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticsearchmachine committed Sep 28, 2024
2 parents 3018523 + 2f192e8 commit d2260fd
Show file tree
Hide file tree
Showing 20 changed files with 919 additions and 237 deletions.
4 changes: 2 additions & 2 deletions build-tools-internal/gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionSha256Sum=fdfca5dbc2834f0ece5020465737538e5ba679deeff5ab6c09621d67f8bb1a15
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.1-all.zip
distributionSha256Sum=2ab88d6de2c23e6adae7363ae6e29cbdd2a709e992929b48b6530fd0c7133bd6
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-all.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
8.10.1
8.10.2
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionSha256Sum=fdfca5dbc2834f0ece5020465737538e5ba679deeff5ab6c09621d67f8bb1a15
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.1-all.zip
distributionSha256Sum=2ab88d6de2c23e6adae7363ae6e29cbdd2a709e992929b48b6530fd0c7133bd6
distributionUrl=https\://services.gradle.org/distributions/gradle-8.10.2-all.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.ingest.geoip;

import com.maxmind.geoip2.model.CountryResponse;
import com.maxmind.geoip2.record.Country;

import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.xcontent.XContentType;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.zip.GZIPOutputStream;

import static org.hamcrest.Matchers.equalTo;

public class DatabaseNodeServiceIT extends AbstractGeoIpIT {
/*
* This test makes sure that if we index an ordinary mmdb file into the .geoip_databases index, it is correctly handled upon retrieval.
*/
public void testNonGzippedDatabase() throws Exception {
String databaseName = "GeoLite2-Country";
String databaseFileName = databaseName + ".mmdb";
byte[] mmdbBytes = getBytesForFile(databaseFileName);
final DatabaseNodeService databaseNodeService = internalCluster().getInstance(DatabaseNodeService.class);
assertNull(databaseNodeService.getDatabase(databaseFileName));
int numChunks = indexData(databaseFileName, mmdbBytes);
retrieveDatabase(databaseNodeService, databaseFileName, mmdbBytes, numChunks);
assertBusy(() -> assertNotNull(databaseNodeService.getDatabase(databaseFileName)));
assertValidDatabase(databaseNodeService, databaseFileName, databaseName);
}

/*
* This test makes sure that if we index a gzipped tar file wrapping an mmdb file into the .geoip_databases index, it is correctly
* handled upon retrieval.
*/
public void testGzippedDatabase() throws Exception {
String databaseName = "GeoLite2-Country";
String databaseFileName = databaseName + ".mmdb";
byte[] mmdbBytes = getBytesForFile(databaseFileName);
byte[] gzipBytes = gzipFileBytes(databaseFileName, mmdbBytes);
final DatabaseNodeService databaseNodeService = internalCluster().getInstance(DatabaseNodeService.class);
assertNull(databaseNodeService.getDatabase(databaseFileName));
int numChunks = indexData(databaseFileName, gzipBytes);
retrieveDatabase(databaseNodeService, databaseFileName, gzipBytes, numChunks);
assertBusy(() -> assertNotNull(databaseNodeService.getDatabase(databaseFileName)));
assertValidDatabase(databaseNodeService, databaseFileName, databaseName);
}

/*
* This makes sure that the database is generally usable
*/
private void assertValidDatabase(DatabaseNodeService databaseNodeService, String databaseFileName, String databaseType)
throws IOException {
IpDatabase database = databaseNodeService.getDatabase(databaseFileName);
assertNotNull(database);
assertThat(database.getDatabaseType(), equalTo(databaseType));
CountryResponse countryResponse = database.getCountry("89.160.20.128");
assertNotNull(countryResponse);
Country country = countryResponse.getCountry();
assertNotNull(country);
assertThat(country.getName(), equalTo("Sweden"));
}

/*
* This has the databaseNodeService retrieve the database from the .geoip_databases index, making the database ready for use when
* databaseNodeService.getDatabase(databaseFileName) is called.
*/
private void retrieveDatabase(DatabaseNodeService databaseNodeService, String databaseFileName, byte[] expectedBytes, int numChunks)
throws IOException {
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(1, 0, numChunks - 1, getMd5(expectedBytes), 1);
databaseNodeService.retrieveAndUpdateDatabase(databaseFileName, metadata);
}

private String getMd5(byte[] bytes) {
MessageDigest md = MessageDigests.md5();
md.update(bytes);
return MessageDigests.toHexString(md.digest());
}

private byte[] gzipFileBytes(String databaseName, byte[] mmdbBytes) throws IOException {
final byte[] EMPTY_BUF = new byte[512];
Path mmdbFile = createTempFile();
Files.copy(new ByteArrayInputStream(mmdbBytes), mmdbFile, StandardCopyOption.REPLACE_EXISTING);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (OutputStream gos = new GZIPOutputStream(new BufferedOutputStream(baos))) {
long size = Files.size(mmdbFile);
gos.write(createTarHeader(databaseName, size));
Files.copy(mmdbFile, gos);
if (size % 512 != 0) {
gos.write(EMPTY_BUF, 0, (int) (512 - (size % 512)));
}
gos.write(EMPTY_BUF);
gos.write(EMPTY_BUF);
}
return baos.toByteArray();
}

private static byte[] createTarHeader(String name, long size) {
byte[] buf = new byte[512];
byte[] sizeBytes = String.format(Locale.ROOT, "%1$012o", size).getBytes(StandardCharsets.UTF_8);
byte[] nameBytes = name.substring(Math.max(0, name.length() - 100)).getBytes(StandardCharsets.US_ASCII);
byte[] id = "0001750".getBytes(StandardCharsets.UTF_8);
byte[] permission = "000644 ".getBytes(StandardCharsets.UTF_8);
byte[] time = String.format(Locale.ROOT, "%1$012o", System.currentTimeMillis() / 1000).getBytes(StandardCharsets.UTF_8);
System.arraycopy(nameBytes, 0, buf, 0, nameBytes.length);
System.arraycopy(permission, 0, buf, 100, 7);
System.arraycopy(id, 0, buf, 108, 7);
System.arraycopy(id, 0, buf, 116, 7);
System.arraycopy(sizeBytes, 0, buf, 124, 12);
System.arraycopy(time, 0, buf, 136, 12);
int checksum = 256;
for (byte b : buf) {
checksum += b & 0xFF;
}
byte[] checksumBytes = String.format(Locale.ROOT, "%1$07o", checksum).getBytes(StandardCharsets.UTF_8);
System.arraycopy(checksumBytes, 0, buf, 148, 7);
return buf;
}

/*
* Finds the given databaseFileName on the classpath, and returns its bytes.
*/
private static byte[] getBytesForFile(String databaseFileName) throws IOException {
try (InputStream is = DatabaseNodeServiceIT.class.getResourceAsStream("/" + databaseFileName)) {
if (is == null) {
throw new FileNotFoundException("Resource [" + databaseFileName + "] not found in classpath");
}
try (BufferedInputStream bis = new BufferedInputStream(is)) {
return bis.readAllBytes();
}
}
}

/*
* This indexes data into the .geoip_databases index in a random number of chunks.
*/
private static int indexData(String databaseFileName, byte[] content) throws IOException {
List<byte[]> chunks = chunkBytes(content, randomIntBetween(1, 100));
indexChunks(databaseFileName, chunks);
return chunks.size();
}

/*
* This turns the given content bytes into the given number of chunks.
*/
private static List<byte[]> chunkBytes(byte[] content, int chunks) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
byteArrayOutputStream.write(content);
byteArrayOutputStream.close();

byte[] all = byteArrayOutputStream.toByteArray();
int chunkSize = Math.max(1, all.length / chunks);
List<byte[]> data = new ArrayList<>();

for (int from = 0; from < all.length;) {
int to = from + chunkSize;
if (to > all.length) {
to = all.length;
}
data.add(Arrays.copyOfRange(all, from, to));
from = to;
}

while (data.size() > chunks) {
byte[] last = data.removeLast();
byte[] secondLast = data.removeLast();
byte[] merged = new byte[secondLast.length + last.length];
System.arraycopy(secondLast, 0, merged, 0, secondLast.length);
System.arraycopy(last, 0, merged, secondLast.length, last.length);
data.add(merged);
}
return data;
}

/*
* This writes the given chunks into the .geoip_databases index.
*/
private static void indexChunks(String name, List<byte[]> chunks) {
int chunk = 0;
for (byte[] buf : chunks) {
IndexRequest indexRequest = new IndexRequest(GeoIpDownloader.DATABASES_INDEX).id(name + "_" + chunk + "_" + 1)
.create(true)
.source(XContentType.SMILE, "name", name, "chunk", chunk, "data", buf);
client().index(indexRequest).actionGet();
chunk++;
}
FlushRequest flushRequest = new FlushRequest(GeoIpDownloader.DATABASES_INDEX);
client().admin().indices().flush(flushRequest).actionGet();
// Ensure that the chunk documents are visible:
RefreshRequest refreshRequest = new RefreshRequest(GeoIpDownloader.DATABASES_INDEX);
client().admin().indices().refresh(refreshRequest).actionGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -351,15 +351,15 @@ void checkDatabases(ClusterState state) {
}

void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata metadata) throws IOException {
logger.trace("Retrieving database {}", databaseName);
logger.trace("retrieving database [{}]", databaseName);
final String recordedMd5 = metadata.md5();

// This acts as a lock: if this method is invoked again for a specific db while the download for that db is still ongoing, then
// FileAlreadyExistsException is thrown and this method silently returns.
// (this method is never invoked concurrently; it is invoked by a cluster state applier thread)
final Path databaseTmpGzFile;
final Path retrievedFile;
try {
databaseTmpGzFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp.gz"));
retrievedFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp.retrieved"));
} catch (FileAlreadyExistsException e) {
logger.debug("database update [{}] already in progress, skipping...", databaseName);
return;
Expand All @@ -374,24 +374,21 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta
DatabaseReaderLazyLoader lazyLoader = databases.get(databaseName);
if (lazyLoader != null && recordedMd5.equals(lazyLoader.getMd5())) {
logger.debug("deleting tmp file because database [{}] has already been updated.", databaseName);
Files.delete(databaseTmpGzFile);
Files.delete(retrievedFile);
return;
}

final Path databaseTmpFile = Files.createFile(geoipTmpDirectory.resolve(databaseName + ".tmp"));
logger.debug("retrieve geoip database [{}] from [{}] to [{}]", databaseName, GeoIpDownloader.DATABASES_INDEX, databaseTmpGzFile);
retrieveDatabase(
databaseName,
recordedMd5,
metadata,
bytes -> Files.write(databaseTmpGzFile, bytes, StandardOpenOption.APPEND),
() -> {
logger.debug("decompressing [{}]", databaseTmpGzFile.getFileName());

Path databaseFile = geoipTmpDirectory.resolve(databaseName);
logger.debug("retrieving database [{}] from [{}] to [{}]", databaseName, GeoIpDownloader.DATABASES_INDEX, retrievedFile);
retrieveDatabase(databaseName, recordedMd5, metadata, bytes -> Files.write(retrievedFile, bytes, StandardOpenOption.APPEND), () -> {
final Path databaseFile = geoipTmpDirectory.resolve(databaseName);

boolean isTarGz = MMDBUtil.isGzip(retrievedFile);
if (isTarGz) {
// tarball contains <database_name>.mmdb, LICENSE.txt, COPYRIGHTS.txt and optional README.txt files.
// we store mmdb file as is and prepend database name to all other entries to avoid conflicts
try (TarInputStream is = new TarInputStream(new GZIPInputStream(Files.newInputStream(databaseTmpGzFile), 8192))) {
logger.debug("decompressing [{}]", retrievedFile.getFileName());
try (TarInputStream is = new TarInputStream(new GZIPInputStream(Files.newInputStream(retrievedFile), 8192))) {
TarInputStream.TarEntry entry;
while ((entry = is.getNextEntry()) != null) {
// there might be ./ entry in tar, we should skip it
Expand All @@ -407,28 +404,33 @@ void retrieveAndUpdateDatabase(String databaseName, GeoIpTaskState.Metadata meta
}
}
}

logger.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
updateDatabase(databaseName, recordedMd5, databaseFile);
Files.delete(databaseTmpGzFile);
},
failure -> {
logger.error(() -> "failed to retrieve database [" + databaseName + "]", failure);
try {
Files.deleteIfExists(databaseTmpFile);
Files.deleteIfExists(databaseTmpGzFile);
} catch (IOException ioe) {
ioe.addSuppressed(failure);
logger.error("Unable to delete tmp database file after failure", ioe);
}
} else {
/*
* Given that this is not code that will be called extremely frequently, we copy the file to the expected location here in
* order to avoid making the rest of the code more complex to avoid this.
*/
Files.copy(retrievedFile, databaseTmpFile, StandardCopyOption.REPLACE_EXISTING);
}
);
// finally, atomically move some-database.mmdb.tmp to some-database.mmdb
logger.debug("moving database from [{}] to [{}]", databaseTmpFile, databaseFile);
Files.move(databaseTmpFile, databaseFile, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
updateDatabase(databaseName, recordedMd5, databaseFile);
Files.delete(retrievedFile);
}, failure -> {
logger.error(() -> "failed to retrieve database [" + databaseName + "]", failure);
try {
Files.deleteIfExists(databaseTmpFile);
Files.deleteIfExists(retrievedFile);
} catch (IOException ioe) {
ioe.addSuppressed(failure);
logger.error("unable to delete tmp database file after failure", ioe);
}
});
}

void updateDatabase(String databaseFileName, String recordedMd5, Path file) {
try {
logger.debug("starting reload of changed geoip database file [{}]", file);
logger.debug("starting reload of changed database file [{}]", file);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, recordedMd5);
DatabaseReaderLazyLoader existing = databases.put(databaseFileName, loader);
if (existing != null) {
Expand Down Expand Up @@ -458,7 +460,7 @@ void updateDatabase(String databaseFileName, String recordedMd5, Path file) {
logger.debug("no pipelines found to reload");
}
}
logger.info("successfully loaded geoip database file [{}]", file.getFileName());
logger.info("successfully loaded database file [{}]", file.getFileName());
} catch (Exception e) {
logger.error(() -> "failed to update database [" + databaseFileName + "]", e);
}
Expand Down
Loading

0 comments on commit d2260fd

Please sign in to comment.