Commit
Merge branch '8.x' into backport/8.x/pr-113637
nik9000 committed Sep 27, 2024
2 parents 7ce9644 + 850952f commit 406efa9
Showing 8 changed files with 155 additions and 34 deletions.
@@ -333,6 +333,23 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
);
assertDataStreamBackingIndexMode("logsdb", 0, DATA_STREAM_NAME);

putTemplate(client, "custom-template", LOGS_STANDARD_INDEX_MODE);
rolloverDataStream(client, DATA_STREAM_NAME);
indexDocument(
client,
DATA_STREAM_NAME,
document(
Instant.now(),
randomAlphaOfLength(10),
randomNonNegativeLong(),
randomFrom("PUT", "POST", "GET"),
randomAlphaOfLength(64),
randomIp(randomBoolean()),
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("standard", 1, DATA_STREAM_NAME);

putTemplate(client, "custom-template", TIME_SERIES_TEMPLATE);
rolloverDataStream(client, DATA_STREAM_NAME);
indexDocument(
@@ -348,7 +365,24 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("time_series", 1, DATA_STREAM_NAME);
assertDataStreamBackingIndexMode("time_series", 2, DATA_STREAM_NAME);

putTemplate(client, "custom-template", LOGS_STANDARD_INDEX_MODE);
rolloverDataStream(client, DATA_STREAM_NAME);
indexDocument(
client,
DATA_STREAM_NAME,
document(
Instant.now(),
randomAlphaOfLength(10),
randomNonNegativeLong(),
randomFrom("PUT", "POST", "GET"),
randomAlphaOfLength(64),
randomIp(randomBoolean()),
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("standard", 3, DATA_STREAM_NAME);

putTemplate(client, "custom-template", LOGS_TEMPLATE);
rolloverDataStream(client, DATA_STREAM_NAME);
@@ -365,7 +399,7 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException {
randomLongBetween(1_000_000L, 2_000_000L)
)
);
assertDataStreamBackingIndexMode("logsdb", 2, DATA_STREAM_NAME);
assertDataStreamBackingIndexMode("logsdb", 4, DATA_STREAM_NAME);
}

public void testLogsDBToStandardReindex() throws IOException {
@@ -412,7 +412,7 @@ yield new DataStreamAutoShardingEvent(
dataStream.rollover(
indexMetadata.getIndex(),
newGeneration,
metadata.isTimeSeriesTemplate(templateV2),
metadata.retrieveIndexModeFromTemplate(templateV2),
dataStreamAutoShardingEvent
)
);
@@ -448,43 +448,52 @@ public DataStreamIndices getDataStreamIndices(boolean failureStore) {
* Performs a rollover on a {@code DataStream} instance and returns a new instance containing
* the updated list of backing indices and incremented generation.
*
* @param writeIndex new write index
* @param generation new generation
* @param timeSeries whether the template that created this data stream is in time series mode
* @param autoShardingEvent the auto sharding event this rollover operation is applying
*
* @param writeIndex new write index
* @param generation new generation
* @param indexModeFromTemplate the index mode that originates from the template that created this data stream
* @param autoShardingEvent the auto sharding event this rollover operation is applying
* @return new {@code DataStream} instance with the rollover operation applied
*/
public DataStream rollover(
Index writeIndex,
long generation,
boolean timeSeries,
IndexMode indexModeFromTemplate,
@Nullable DataStreamAutoShardingEvent autoShardingEvent
) {
ensureNotReplicated();

return unsafeRollover(writeIndex, generation, timeSeries, autoShardingEvent);
return unsafeRollover(writeIndex, generation, indexModeFromTemplate, autoShardingEvent);
}

/**
* Like {@link #rollover(Index, long, boolean, DataStreamAutoShardingEvent)}, but does no validation, use with care only.
* Like {@link #rollover(Index, long, IndexMode, DataStreamAutoShardingEvent)}, but does no validation, use with care only.
*/
public DataStream unsafeRollover(Index writeIndex, long generation, boolean timeSeries, DataStreamAutoShardingEvent autoShardingEvent) {
IndexMode indexMode = this.indexMode;
if ((indexMode == null || indexMode == IndexMode.STANDARD) && timeSeries) {
public DataStream unsafeRollover(
Index writeIndex,
long generation,
IndexMode indexModeFromTemplate,
DataStreamAutoShardingEvent autoShardingEvent
) {
IndexMode dsIndexMode = this.indexMode;
if ((dsIndexMode == null || dsIndexMode == IndexMode.STANDARD) && indexModeFromTemplate == IndexMode.TIME_SERIES) {
// This allows for migrating a data stream to be a tsdb data stream:
// (only if index_mode=null|standard then allow it to be set to time_series)
indexMode = IndexMode.TIME_SERIES;
} else if (indexMode == IndexMode.TIME_SERIES && timeSeries == false) {
dsIndexMode = IndexMode.TIME_SERIES;
} else if (dsIndexMode == IndexMode.TIME_SERIES && (indexModeFromTemplate == null || indexModeFromTemplate == IndexMode.STANDARD)) {
// Allow downgrading a time series data stream to a regular data stream
dsIndexMode = null;
} else if ((dsIndexMode == null || dsIndexMode == IndexMode.STANDARD) && indexModeFromTemplate == IndexMode.LOGSDB) {
dsIndexMode = IndexMode.LOGSDB;
} else if (dsIndexMode == IndexMode.LOGSDB && (indexModeFromTemplate == null || indexModeFromTemplate == IndexMode.STANDARD)) {
// Allow downgrading a logsdb data stream to a regular data stream
indexMode = null;
dsIndexMode = null;
}

List<Index> backingIndices = new ArrayList<>(this.backingIndices.indices);
backingIndices.add(writeIndex);
return copy().setBackingIndices(
this.backingIndices.copy().setIndices(backingIndices).setAutoShardingEvent(autoShardingEvent).setRolloverOnWrite(false).build()
).setGeneration(generation).setIndexMode(indexMode).build();
).setGeneration(generation).setIndexMode(dsIndexMode).build();
}
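
The branch chain above reduces to a small transition function: the data stream's current index mode and the mode resolved from the template decide the mode used after rollover. A minimal sketch of that decision, assuming the standard IndexMode values; the standalone helper and its name are illustrative only and are not part of DataStream:

// Illustrative helper mirroring the rollover transition rules above; not part of the production class.
static IndexMode nextIndexMode(IndexMode current, IndexMode fromTemplate) {
    boolean currentIsStandard = current == null || current == IndexMode.STANDARD;
    boolean templateIsStandard = fromTemplate == null || fromTemplate == IndexMode.STANDARD;
    if (currentIsStandard && fromTemplate == IndexMode.TIME_SERIES) {
        return IndexMode.TIME_SERIES;   // upgrade: standard -> time_series
    } else if (current == IndexMode.TIME_SERIES && templateIsStandard) {
        return null;                    // downgrade: time_series -> standard
    } else if (currentIsStandard && fromTemplate == IndexMode.LOGSDB) {
        return IndexMode.LOGSDB;        // upgrade: standard -> logsdb
    } else if (current == IndexMode.LOGSDB && templateIsStandard) {
        return null;                    // downgrade: logsdb -> standard
    }
    return current;                     // any other combination keeps the current mode
}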

/**
@@ -1307,16 +1307,10 @@ public Map<String, ComposableIndexTemplate> templatesV2() {
.orElse(Collections.emptyMap());
}

// TODO: remove this method:
public boolean isTimeSeriesTemplate(ComposableIndexTemplate indexTemplate) {
if (indexTemplate.getDataStreamTemplate() == null) {
return false;
}

var settings = MetadataIndexTemplateService.resolveSettings(indexTemplate, componentTemplates());
// Not using IndexSettings.MODE.get() to avoid validation that may fail at this point.
var rawIndexMode = settings.get(IndexSettings.MODE.getKey());
var indexMode = rawIndexMode != null ? Enum.valueOf(IndexMode.class, rawIndexMode.toUpperCase(Locale.ROOT)) : null;
if (indexMode == IndexMode.TIME_SERIES) {
var indexModeFromTemplate = retrieveIndexModeFromTemplate(indexTemplate);
if (indexModeFromTemplate == IndexMode.TIME_SERIES) {
// No need to check for the existence of index.routing_path here, because index.mode=time_series can't be specified without it.
// Setting validation takes care of this.
// Also no need to validate that the fields defined in index.routing_path are keyword fields with time_series_dimension
@@ -1330,6 +1324,17 @@ public boolean isTimeSeriesTemplate(ComposableIndexTemplate indexTemplate) {
return false;
}

public IndexMode retrieveIndexModeFromTemplate(ComposableIndexTemplate indexTemplate) {
if (indexTemplate.getDataStreamTemplate() == null) {
return null;
}

var settings = MetadataIndexTemplateService.resolveSettings(indexTemplate, componentTemplates());
// Not using IndexSettings.MODE.get() to avoid validation that may fail at this point.
var rawIndexMode = settings.get(IndexSettings.MODE.getKey());
return rawIndexMode != null ? Enum.valueOf(IndexMode.class, rawIndexMode.toUpperCase(Locale.ROOT)) : null;
}
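
retrieveIndexModeFromTemplate generalises the old isTimeSeriesTemplate check: instead of answering a yes/no question about time series, it returns whatever index.mode the resolved template settings declare, or null when the template defines no data stream or no mode. A hedged usage sketch; the template name is a placeholder, not part of this change:

// Illustrative usage only; "my-template" is a placeholder name.
ComposableIndexTemplate templateV2 = metadata.templatesV2().get("my-template");
IndexMode modeFromTemplate = metadata.retrieveIndexModeFromTemplate(templateV2); // null, STANDARD, TIME_SERIES or LOGSDB
boolean isTimeSeries = modeFromTemplate == IndexMode.TIME_SERIES;                // same answer the old isTimeSeriesTemplate gave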

public Map<String, DataStream> dataStreams() {
return this.custom(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY).dataStreams();
}
@@ -313,7 +313,7 @@ static ClusterState createDataStream(
.collect(Collectors.toCollection(ArrayList::new));
dsBackingIndices.add(writeIndex.getIndex());
boolean hidden = isSystem || template.getDataStreamTemplate().isHidden();
final IndexMode indexMode = metadata.isTimeSeriesTemplate(template) ? IndexMode.TIME_SERIES : null;
final IndexMode indexMode = metadata.retrieveIndexModeFromTemplate(template);
final DataStreamLifecycle lifecycle = isSystem
? MetadataIndexTemplateService.resolveLifecycle(template, systemDataStreamDescriptor.getComponentTemplates())
: MetadataIndexTemplateService.resolveLifecycle(template, metadata.componentTemplates());
@@ -198,7 +198,7 @@ protected DataStream mutateInstance(DataStream instance) {
public void testRollover() {
DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream();
Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
@@ -225,7 +225,7 @@ public void testRolloverWithConflictingBackingIndexName() {
}

final Tuple<String, Long> newCoordinates = ds.nextWriteIndexAndGeneration(builder.build(), ds.getBackingIndices());
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
@@ -242,7 +242,12 @@ public void testRolloverUpgradeToTsdbDataStream() {
.build();
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), true, null);
var rolledDs = ds.rollover(
new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()),
newCoordinates.v2(),
IndexMode.TIME_SERIES,
null
);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
@@ -251,11 +256,41 @@
assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.TIME_SERIES));
}

public void testRolloverDowngradeToRegularDataStream() {
public void testRolloverUpgradeToLogsdbDataStream() {
DataStream ds = DataStreamTestHelper.randomInstance()
.copy()
.setReplicated(false)
.setIndexMode(randomBoolean() ? IndexMode.STANDARD : null)
.build();
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), IndexMode.LOGSDB, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.LOGSDB));
}

public void testRolloverDowngradeFromTsdbToRegularDataStream() {
DataStream ds = DataStreamTestHelper.randomInstance().copy().setReplicated(false).setIndexMode(IndexMode.TIME_SERIES).build();
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null);
var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex()));
assertThat(rolledDs.getIndexMode(), nullValue());
}

public void testRolloverDowngradeFromLogsdbToRegularDataStream() {
DataStream ds = DataStreamTestHelper.randomInstance().copy().setReplicated(false).setIndexMode(IndexMode.LOGSDB).build();
var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices());

var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null);
assertThat(rolledDs.getName(), equalTo(ds.getName()));
assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1));
assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1));
@@ -19,6 +19,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.ExecutorNames;
@@ -77,6 +78,43 @@ public void testCreateDataStream() throws Exception {
assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), nullValue());
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue());
assertThat(
newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
equalTo("true")
);
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(false));
}

public void testCreateDataStreamLogsdb() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
.indexPatterns(List.of(dataStreamName + "*"))
.template(new Template(Settings.builder().put("index.mode", "logsdb").build(), null, null))
.dataStreamTemplate(new DataStreamTemplate())
.build();
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().put("template", template).build())
.build();
CreateDataStreamClusterStateUpdateRequest req = new CreateDataStreamClusterStateUpdateRequest(dataStreamName);
ClusterState newState = MetadataCreateDataStreamService.createDataStream(
metadataCreateIndexService,
Settings.EMPTY,
cs,
true,
req,
ActionListener.noop(),
false
);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.LOGSDB));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT));
assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue());
assertThat(
newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"),
@@ -823,7 +823,7 @@ private SingleForecast forecast(Metadata metadata, DataStream stream, long forec
stream = stream.unsafeRollover(
new Index(rolledDataStreamInfo.v1(), uuid),
rolledDataStreamInfo.v2(),
false,
null,
stream.getAutoShardingEvent()
);
