diff --git a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamRestIT.java b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamRestIT.java index f95815d1daff9..b555dc03dc3f6 100644 --- a/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamRestIT.java +++ b/modules/data-streams/src/javaRestTest/java/org/elasticsearch/datastreams/LogsDataStreamRestIT.java @@ -333,6 +333,23 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException { ); assertDataStreamBackingIndexMode("logsdb", 0, DATA_STREAM_NAME); + putTemplate(client, "custom-template", LOGS_STANDARD_INDEX_MODE); + rolloverDataStream(client, DATA_STREAM_NAME); + indexDocument( + client, + DATA_STREAM_NAME, + document( + Instant.now(), + randomAlphaOfLength(10), + randomNonNegativeLong(), + randomFrom("PUT", "POST", "GET"), + randomAlphaOfLength(64), + randomIp(randomBoolean()), + randomLongBetween(1_000_000L, 2_000_000L) + ) + ); + assertDataStreamBackingIndexMode("standard", 1, DATA_STREAM_NAME); + putTemplate(client, "custom-template", TIME_SERIES_TEMPLATE); rolloverDataStream(client, DATA_STREAM_NAME); indexDocument( @@ -348,7 +365,24 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException { randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("time_series", 1, DATA_STREAM_NAME); + assertDataStreamBackingIndexMode("time_series", 2, DATA_STREAM_NAME); + + putTemplate(client, "custom-template", LOGS_STANDARD_INDEX_MODE); + rolloverDataStream(client, DATA_STREAM_NAME); + indexDocument( + client, + DATA_STREAM_NAME, + document( + Instant.now(), + randomAlphaOfLength(10), + randomNonNegativeLong(), + randomFrom("PUT", "POST", "GET"), + randomAlphaOfLength(64), + randomIp(randomBoolean()), + randomLongBetween(1_000_000L, 2_000_000L) + ) + ); + assertDataStreamBackingIndexMode("standard", 3, DATA_STREAM_NAME); putTemplate(client, "custom-template", LOGS_TEMPLATE); rolloverDataStream(client, DATA_STREAM_NAME); @@ -365,7 +399,7 @@ public void testLogsTimeSeriesIndexModeSwitch() throws IOException { randomLongBetween(1_000_000L, 2_000_000L) ) ); - assertDataStreamBackingIndexMode("logsdb", 2, DATA_STREAM_NAME); + assertDataStreamBackingIndexMode("logsdb", 4, DATA_STREAM_NAME); } public void testLogsDBToStandardReindex() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java index cfc5b7802d989..0c22a17bb1f6b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/rollover/MetadataRolloverService.java @@ -412,7 +412,7 @@ yield new DataStreamAutoShardingEvent( dataStream.rollover( indexMetadata.getIndex(), newGeneration, - metadata.isTimeSeriesTemplate(templateV2), + metadata.retrieveIndexModeFromTemplate(templateV2), dataStreamAutoShardingEvent ) ); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 5634d40993b4f..78902f5e27c90 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -448,43 +448,52 @@ public DataStreamIndices getDataStreamIndices(boolean failureStore) { * Performs a rollover on a {@code DataStream} instance and returns a new instance containing * the updated list of backing indices and incremented generation. * - * @param writeIndex new write index - * @param generation new generation - * @param timeSeries whether the template that created this data stream is in time series mode - * @param autoShardingEvent the auto sharding event this rollover operation is applying - * + * @param writeIndex new write index + * @param generation new generation + * @param indexModeFromTemplate the index mode that originates from the template that created this data stream + * @param autoShardingEvent the auto sharding event this rollover operation is applying * @return new {@code DataStream} instance with the rollover operation applied */ public DataStream rollover( Index writeIndex, long generation, - boolean timeSeries, + IndexMode indexModeFromTemplate, @Nullable DataStreamAutoShardingEvent autoShardingEvent ) { ensureNotReplicated(); - return unsafeRollover(writeIndex, generation, timeSeries, autoShardingEvent); + return unsafeRollover(writeIndex, generation, indexModeFromTemplate, autoShardingEvent); } /** - * Like {@link #rollover(Index, long, boolean, DataStreamAutoShardingEvent)}, but does no validation, use with care only. + * Like {@link #rollover(Index, long, IndexMode, DataStreamAutoShardingEvent)}, but does no validation, use with care only. */ - public DataStream unsafeRollover(Index writeIndex, long generation, boolean timeSeries, DataStreamAutoShardingEvent autoShardingEvent) { - IndexMode indexMode = this.indexMode; - if ((indexMode == null || indexMode == IndexMode.STANDARD) && timeSeries) { + public DataStream unsafeRollover( + Index writeIndex, + long generation, + IndexMode indexModeFromTemplate, + DataStreamAutoShardingEvent autoShardingEvent + ) { + IndexMode dsIndexMode = this.indexMode; + if ((dsIndexMode == null || dsIndexMode == IndexMode.STANDARD) && indexModeFromTemplate == IndexMode.TIME_SERIES) { // This allows for migrating a data stream to be a tsdb data stream: // (only if index_mode=null|standard then allow it to be set to time_series) - indexMode = IndexMode.TIME_SERIES; - } else if (indexMode == IndexMode.TIME_SERIES && timeSeries == false) { + dsIndexMode = IndexMode.TIME_SERIES; + } else if (dsIndexMode == IndexMode.TIME_SERIES && (indexModeFromTemplate == null || indexModeFromTemplate == IndexMode.STANDARD)) { + // Allow downgrading a time series data stream to a regular data stream + dsIndexMode = null; + } else if ((dsIndexMode == null || dsIndexMode == IndexMode.STANDARD) && indexModeFromTemplate == IndexMode.LOGSDB) { + dsIndexMode = IndexMode.LOGSDB; + } else if (dsIndexMode == IndexMode.LOGSDB && (indexModeFromTemplate == null || indexModeFromTemplate == IndexMode.STANDARD)) { // Allow downgrading a time series data stream to a regular data stream - indexMode = null; + dsIndexMode = null; } List backingIndices = new ArrayList<>(this.backingIndices.indices); backingIndices.add(writeIndex); return copy().setBackingIndices( this.backingIndices.copy().setIndices(backingIndices).setAutoShardingEvent(autoShardingEvent).setRolloverOnWrite(false).build() - ).setGeneration(generation).setIndexMode(indexMode).build(); + ).setGeneration(generation).setIndexMode(dsIndexMode).build(); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index c1a2025271891..1319e75520ed6 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -1307,16 +1307,10 @@ public Map templatesV2() { .orElse(Collections.emptyMap()); } + // TODO: remove this method: public boolean isTimeSeriesTemplate(ComposableIndexTemplate indexTemplate) { - if (indexTemplate.getDataStreamTemplate() == null) { - return false; - } - - var settings = MetadataIndexTemplateService.resolveSettings(indexTemplate, componentTemplates()); - // Not using IndexSettings.MODE.get() to avoid validation that may fail at this point. - var rawIndexMode = settings.get(IndexSettings.MODE.getKey()); - var indexMode = rawIndexMode != null ? Enum.valueOf(IndexMode.class, rawIndexMode.toUpperCase(Locale.ROOT)) : null; - if (indexMode == IndexMode.TIME_SERIES) { + var indexModeFromTemplate = retrieveIndexModeFromTemplate(indexTemplate); + if (indexModeFromTemplate == IndexMode.TIME_SERIES) { // No need to check for the existence of index.routing_path here, because index.mode=time_series can't be specified without it. // Setting validation takes care of this. // Also no need to validate that the fields defined in index.routing_path are keyword fields with time_series_dimension @@ -1330,6 +1324,17 @@ public boolean isTimeSeriesTemplate(ComposableIndexTemplate indexTemplate) { return false; } + public IndexMode retrieveIndexModeFromTemplate(ComposableIndexTemplate indexTemplate) { + if (indexTemplate.getDataStreamTemplate() == null) { + return null; + } + + var settings = MetadataIndexTemplateService.resolveSettings(indexTemplate, componentTemplates()); + // Not using IndexSettings.MODE.get() to avoid validation that may fail at this point. + var rawIndexMode = settings.get(IndexSettings.MODE.getKey()); + return rawIndexMode != null ? Enum.valueOf(IndexMode.class, rawIndexMode.toUpperCase(Locale.ROOT)) : null; + } + public Map dataStreams() { return this.custom(DataStreamMetadata.TYPE, DataStreamMetadata.EMPTY).dataStreams(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 69f753233b418..80e6483bb086d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -313,7 +313,7 @@ static ClusterState createDataStream( .collect(Collectors.toCollection(ArrayList::new)); dsBackingIndices.add(writeIndex.getIndex()); boolean hidden = isSystem || template.getDataStreamTemplate().isHidden(); - final IndexMode indexMode = metadata.isTimeSeriesTemplate(template) ? IndexMode.TIME_SERIES : null; + final IndexMode indexMode = metadata.retrieveIndexModeFromTemplate(template); final DataStreamLifecycle lifecycle = isSystem ? MetadataIndexTemplateService.resolveLifecycle(template, systemDataStreamDescriptor.getComponentTemplates()) : MetadataIndexTemplateService.resolveLifecycle(template, metadata.componentTemplates()); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 76573049c6962..8cb7867cff436 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -198,7 +198,7 @@ protected DataStream mutateInstance(DataStream instance) { public void testRollover() { DataStream ds = DataStreamTestHelper.randomInstance().promoteDataStream(); Tuple newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices()); - final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null); + final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); @@ -225,7 +225,7 @@ public void testRolloverWithConflictingBackingIndexName() { } final Tuple newCoordinates = ds.nextWriteIndexAndGeneration(builder.build(), ds.getBackingIndices()); - final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null); + final DataStream rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + numConflictingIndices + 1)); assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); @@ -242,7 +242,12 @@ public void testRolloverUpgradeToTsdbDataStream() { .build(); var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices()); - var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), true, null); + var rolledDs = ds.rollover( + new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), + newCoordinates.v2(), + IndexMode.TIME_SERIES, + null + ); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); @@ -251,11 +256,41 @@ public void testRolloverUpgradeToTsdbDataStream() { assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.TIME_SERIES)); } - public void testRolloverDowngradeToRegularDataStream() { + public void testRolloverUpgradeToLogsdbDataStream() { + DataStream ds = DataStreamTestHelper.randomInstance() + .copy() + .setReplicated(false) + .setIndexMode(randomBoolean() ? IndexMode.STANDARD : null) + .build(); + var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices()); + + var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), IndexMode.LOGSDB, null); + assertThat(rolledDs.getName(), equalTo(ds.getName())); + assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); + assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); + assertTrue(rolledDs.getIndices().containsAll(ds.getIndices())); + assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex())); + assertThat(rolledDs.getIndexMode(), equalTo(IndexMode.LOGSDB)); + } + + public void testRolloverDowngradeFromTsdbToRegularDataStream() { DataStream ds = DataStreamTestHelper.randomInstance().copy().setReplicated(false).setIndexMode(IndexMode.TIME_SERIES).build(); var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices()); - var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), false, null); + var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null); + assertThat(rolledDs.getName(), equalTo(ds.getName())); + assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); + assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); + assertTrue(rolledDs.getIndices().containsAll(ds.getIndices())); + assertTrue(rolledDs.getIndices().contains(rolledDs.getWriteIndex())); + assertThat(rolledDs.getIndexMode(), nullValue()); + } + + public void testRolloverDowngradeFromLogsdbToRegularDataStream() { + DataStream ds = DataStreamTestHelper.randomInstance().copy().setReplicated(false).setIndexMode(IndexMode.LOGSDB).build(); + var newCoordinates = ds.nextWriteIndexAndGeneration(Metadata.EMPTY_METADATA, ds.getBackingIndices()); + + var rolledDs = ds.rollover(new Index(newCoordinates.v1(), UUIDs.randomBase64UUID()), newCoordinates.v2(), null, null); assertThat(rolledDs.getName(), equalTo(ds.getName())); assertThat(rolledDs.getGeneration(), equalTo(ds.getGeneration() + 1)); assertThat(rolledDs.getIndices().size(), equalTo(ds.getIndices().size() + 1)); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java index bbcf1ca33a0c2..6c76abf7cebe3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamServiceTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Maps; import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexMode; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexVersion; import org.elasticsearch.indices.ExecutorNames; @@ -77,6 +78,43 @@ public void testCreateDataStream() throws Exception { assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false)); assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false)); assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), nullValue()); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); + assertThat( + newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"), + equalTo("true") + ); + assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).isSystem(), is(false)); + } + + public void testCreateDataStreamLogsdb() throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); + final String dataStreamName = "my-data-stream"; + ComposableIndexTemplate template = ComposableIndexTemplate.builder() + .indexPatterns(List.of(dataStreamName + "*")) + .template(new Template(Settings.builder().put("index.mode", "logsdb").build(), null, null)) + .dataStreamTemplate(new DataStreamTemplate()) + .build(); + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .metadata(Metadata.builder().put("template", template).build()) + .build(); + CreateDataStreamClusterStateUpdateRequest req = new CreateDataStreamClusterStateUpdateRequest(dataStreamName); + ClusterState newState = MetadataCreateDataStreamService.createDataStream( + metadataCreateIndexService, + Settings.EMPTY, + cs, + true, + req, + ActionListener.noop(), + false + ); + assertThat(newState.metadata().dataStreams().size(), equalTo(1)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isSystem(), is(false)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isHidden(), is(false)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).isReplicated(), is(false)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).getIndexMode(), equalTo(IndexMode.LOGSDB)); + assertThat(newState.metadata().dataStreams().get(dataStreamName).getLifecycle(), equalTo(DataStreamLifecycle.DEFAULT)); assertThat(newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)), notNullValue()); assertThat( newState.metadata().index(DataStream.getDefaultBackingIndexName(dataStreamName, 1)).getSettings().get("index.hidden"), diff --git a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java index 2f8cccdc303e6..4c612d5e04886 100644 --- a/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java +++ b/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java @@ -823,7 +823,7 @@ private SingleForecast forecast(Metadata metadata, DataStream stream, long forec stream = stream.unsafeRollover( new Index(rolledDataStreamInfo.v1(), uuid), rolledDataStreamInfo.v2(), - false, + null, stream.getAutoShardingEvent() );