diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java index 96e71c9aa65c2..710ea8c15b66e 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/GetDataStreamsResponseTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; @@ -83,7 +84,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti .setAllowCustomRouting(true) .setIndexMode(IndexMode.STANDARD) .setLifecycle(new DataStreamLifecycle()) - .setFailureStoreEnabled(true) + .setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED) .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build()) .build(); @@ -186,7 +187,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti .setAllowCustomRouting(true) .setIndexMode(IndexMode.STANDARD) .setLifecycle(new DataStreamLifecycle(null, null, false)) - .setFailureStoreEnabled(true) + .setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED) .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build()) .build(); diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 7cf7362ae472a..c1a5a8da1f4d2 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -42,6 +42,7 @@ import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling; import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling.Round; +import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexAbstraction; import org.elasticsearch.cluster.metadata.IndexGraveyard; @@ -1492,6 +1493,13 @@ public void testTargetIndices() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); int numBackingIndices = 3; int numFailureIndices = 2; + int mutationBranch = randomIntBetween(0, 2); + DataStreamOptions dataStreamOptions = switch (mutationBranch) { + case 0 -> DataStreamOptions.EMPTY; + case 1 -> DataStreamOptions.FAILURE_STORE_ENABLED; + case 2 -> DataStreamOptions.FAILURE_STORE_DISABLED; + default -> throw new IllegalStateException("Unexpected value: " + mutationBranch); + }; Metadata.Builder builder = Metadata.builder(); DataStream dataStream = createDataStream( builder, @@ -1501,7 +1509,7 @@ public void testTargetIndices() { settings(IndexVersion.current()), new DataStreamLifecycle(), now - ).copy().setFailureStoreEnabled(randomBoolean()).build(); // failure store is managed even when disabled + ).copy().setDataStreamOptions(dataStreamOptions).build(); // failure store is managed even when disabled builder.put(dataStream); Metadata metadata = builder.build(); Set indicesToExclude = Set.of(dataStream.getIndices().get(0), dataStream.getFailureIndices().getIndices().get(0)); @@ -1533,7 +1541,7 @@ public void testFailureStoreIsManagedEvenWhenDisabled() { settings(IndexVersion.current()), DataStreamLifecycle.newBuilder().dataRetention(0).build(), now - ).copy().setFailureStoreEnabled(false).build(); // failure store is managed even when it is disabled + ).copy().setDataStreamOptions(DataStreamOptions.FAILURE_STORE_DISABLED).build(); // failure store is managed even when disabled builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build(); diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 0e55ce1ee17f0..f58e30bf11e2a 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -227,6 +227,7 @@ static TransportVersion def(int id) { public static final TransportVersion ML_INFERENCE_CHUNKING_SETTINGS = def(8_751_00_0); public static final TransportVersion SEMANTIC_QUERY_INNER_HITS = def(8_752_00_0); public static final TransportVersion RETAIN_ILM_STEP_INFO = def(8_753_00_0); + public static final TransportVersion ADD_DATA_STREAM_OPTIONS = def(8_754_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java index 78902f5e27c90..dd4a52fd9beda 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStream.java @@ -112,7 +112,7 @@ public static boolean isFailureStoreFeatureFlagEnabled() { private final IndexMode indexMode; @Nullable private final DataStreamLifecycle lifecycle; - private final boolean failureStoreEnabled; + private final DataStreamOptions dataStreamOptions; private final DataStreamIndices backingIndices; private final DataStreamIndices failureIndices; @@ -128,7 +128,7 @@ public DataStream( boolean allowCustomRouting, IndexMode indexMode, DataStreamLifecycle lifecycle, - boolean failureStoreEnabled, + @Nullable DataStreamOptions dataStreamOptions, List failureIndices, boolean rolloverOnWrite, @Nullable DataStreamAutoShardingEvent autoShardingEvent @@ -144,7 +144,7 @@ public DataStream( allowCustomRouting, indexMode, lifecycle, - failureStoreEnabled, + dataStreamOptions, new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent), new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null) ); @@ -162,7 +162,7 @@ public DataStream( boolean allowCustomRouting, IndexMode indexMode, DataStreamLifecycle lifecycle, - boolean failureStoreEnabled, + DataStreamOptions dataStreamOptions, DataStreamIndices backingIndices, DataStreamIndices failureIndices ) { @@ -177,7 +177,7 @@ public DataStream( this.allowCustomRouting = allowCustomRouting; this.indexMode = indexMode; this.lifecycle = lifecycle; - this.failureStoreEnabled = failureStoreEnabled; + this.dataStreamOptions = dataStreamOptions == null ? DataStreamOptions.EMPTY : dataStreamOptions; assert backingIndices.indices.isEmpty() == false; assert replicated == false || (backingIndices.rolloverOnWrite == false && failureIndices.rolloverOnWrite == false) : "replicated data streams cannot be marked for lazy rollover"; @@ -198,9 +198,11 @@ public static DataStream read(StreamInput in) throws IOException { var lifecycle = in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X) ? in.readOptionalWriteable(DataStreamLifecycle::new) : null; - var failureStoreEnabled = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION) - ? in.readBoolean() - : false; + // This boolean flag has been moved in data stream options + var failureStoreEnabled = in.getTransportVersion() + .between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS) + ? in.readBoolean() + : false; var failureIndices = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION) ? readIndices(in) : List.of(); @@ -213,6 +215,14 @@ public static DataStream read(StreamInput in) throws IOException { failureIndicesBuilder.setRolloverOnWrite(in.readBoolean()) .setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new)); } + DataStreamOptions dataStreamOptions; + if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) { + dataStreamOptions = in.readOptionalWriteable(DataStreamOptions::read); + } else { + // We cannot distinguish if failure store was explicitly disabled or not. Given that failure store + // is still behind a feature flag in previous version we use the default value instead of explicitly disabling it. + dataStreamOptions = failureStoreEnabled ? DataStreamOptions.FAILURE_STORE_ENABLED : null; + } return new DataStream( name, generation, @@ -224,7 +234,7 @@ public static DataStream read(StreamInput in) throws IOException { allowCustomRouting, indexMode, lifecycle, - failureStoreEnabled, + dataStreamOptions, backingIndicesBuilder.build(), failureIndicesBuilder.build() ); @@ -274,6 +284,10 @@ public boolean isFailureStoreIndex(String indexName) { return failureIndices.containsIndex(indexName); } + public DataStreamOptions getDataStreamOptions() { + return dataStreamOptions; + } + public boolean rolloverOnWrite() { return backingIndices.rolloverOnWrite; } @@ -406,13 +420,12 @@ public boolean isAllowCustomRouting() { } /** - * Determines if this data stream should persist ingest pipeline and mapping failures from bulk requests to a locally - * configured failure store. - * - * @return Whether this data stream should store ingestion failures. + * Determines if this data stream has its failure store enabled or not. Currently, the failure store + * is enabled only when a user has explicitly requested it. + * @return true, if the user has explicitly enabled the failure store. */ public boolean isFailureStoreEnabled() { - return failureStoreEnabled; + return dataStreamOptions.failureStore() != null && dataStreamOptions.failureStore().isExplicitlyEnabled(); } @Nullable @@ -1063,8 +1076,11 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) { out.writeOptionalWriteable(lifecycle); } + if (out.getTransportVersion() + .between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS)) { + out.writeBoolean(isFailureStoreEnabled()); + } if (out.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) { - out.writeBoolean(failureStoreEnabled); out.writeCollection(failureIndices.indices); } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) { @@ -1077,6 +1093,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(failureIndices.rolloverOnWrite); out.writeOptionalWriteable(failureIndices.autoShardingEvent); } + if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) { + out.writeOptionalWriteable(dataStreamOptions.isEmpty() ? null : dataStreamOptions); + } } public static final ParseField NAME_FIELD = new ParseField("name"); @@ -1096,6 +1115,7 @@ public void writeTo(StreamOutput out) throws IOException { public static final ParseField AUTO_SHARDING_FIELD = new ParseField("auto_sharding"); public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField("failure_rollover_on_write"); public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding"); + public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options"); @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("data_stream", args -> { @@ -1110,6 +1130,16 @@ public void writeTo(StreamOutput out) throws IOException { (DataStreamAutoShardingEvent) args[15] ) : new DataStreamIndices(FAILURE_STORE_PREFIX, List.of(), false, null); + // We cannot distinguish if failure store was explicitly disabled or not. Given that failure store + // is still behind a feature flag in previous version we use the default value instead of explicitly disabling it. + DataStreamOptions dataStreamOptions = DataStreamOptions.EMPTY; + if (DataStream.isFailureStoreFeatureFlagEnabled()) { + if (args[16] != null) { + dataStreamOptions = (DataStreamOptions) args[16]; + } else if (failureStoreEnabled) { + dataStreamOptions = DataStreamOptions.FAILURE_STORE_ENABLED; + } + } return new DataStream( (String) args[0], (Long) args[2], @@ -1121,7 +1151,7 @@ public void writeTo(StreamOutput out) throws IOException { args[7] != null && (boolean) args[7], args[8] != null ? IndexMode.fromString((String) args[8]) : null, (DataStreamLifecycle) args[9], - failureStoreEnabled, + dataStreamOptions, new DataStreamIndices( BACKING_INDEX_PREFIX, (List) args[1], @@ -1171,6 +1201,11 @@ public void writeTo(StreamOutput out) throws IOException { (p, c) -> DataStreamAutoShardingEvent.fromXContent(p), FAILURE_AUTO_SHARDING_FIELD ); + PARSER.declareObject( + ConstructingObjectParser.optionalConstructorArg(), + (p, c) -> DataStreamOptions.fromXContent(p), + DATA_STREAM_OPTIONS_FIELD + ); } } @@ -1208,7 +1243,6 @@ public XContentBuilder toXContent( builder.field(SYSTEM_FIELD.getPreferredName(), system); builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting); if (DataStream.isFailureStoreFeatureFlagEnabled()) { - builder.field(FAILURE_STORE_FIELD.getPreferredName(), failureStoreEnabled); if (failureIndices.indices.isEmpty() == false) { builder.xContentList(FAILURE_INDICES_FIELD.getPreferredName(), failureIndices.indices); } @@ -1218,6 +1252,10 @@ public XContentBuilder toXContent( failureIndices.autoShardingEvent.toXContent(builder, params); builder.endObject(); } + if (dataStreamOptions.isEmpty() == false) { + builder.field(DATA_STREAM_OPTIONS_FIELD.getPreferredName()); + dataStreamOptions.toXContent(builder, params); + } } if (indexMode != null) { builder.field(INDEX_MODE.getPreferredName(), indexMode); @@ -1250,7 +1288,7 @@ public boolean equals(Object o) { && allowCustomRouting == that.allowCustomRouting && indexMode == that.indexMode && Objects.equals(lifecycle, that.lifecycle) - && failureStoreEnabled == that.failureStoreEnabled + && Objects.equals(dataStreamOptions, that.dataStreamOptions) && Objects.equals(backingIndices, that.backingIndices) && Objects.equals(failureIndices, that.failureIndices); } @@ -1267,7 +1305,7 @@ public int hashCode() { allowCustomRouting, indexMode, lifecycle, - failureStoreEnabled, + dataStreamOptions, backingIndices, failureIndices ); @@ -1580,7 +1618,7 @@ public static class Builder { private IndexMode indexMode = null; @Nullable private DataStreamLifecycle lifecycle = null; - private boolean failureStoreEnabled = false; + private DataStreamOptions dataStreamOptions = DataStreamOptions.EMPTY; private DataStreamIndices backingIndices; private DataStreamIndices failureIndices = DataStreamIndices.failureIndicesBuilder(List.of()).build(); @@ -1605,7 +1643,7 @@ private Builder(DataStream dataStream) { allowCustomRouting = dataStream.allowCustomRouting; indexMode = dataStream.indexMode; lifecycle = dataStream.lifecycle; - failureStoreEnabled = dataStream.failureStoreEnabled; + dataStreamOptions = dataStream.dataStreamOptions; backingIndices = dataStream.backingIndices; failureIndices = dataStream.failureIndices; } @@ -1660,8 +1698,8 @@ public Builder setLifecycle(DataStreamLifecycle lifecycle) { return this; } - public Builder setFailureStoreEnabled(boolean failureStoreEnabled) { - this.failureStoreEnabled = failureStoreEnabled; + public Builder setDataStreamOptions(DataStreamOptions dataStreamOptions) { + this.dataStreamOptions = dataStreamOptions; return this; } @@ -1697,7 +1735,7 @@ public DataStream build() { allowCustomRouting, indexMode, lifecycle, - failureStoreEnabled, + dataStreamOptions, backingIndices, failureIndices ); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStore.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStore.java index d94a7630eb868..e9d32594fa833 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStore.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamFailureStore.java @@ -24,38 +24,51 @@ /** * Holds the data stream failure store metadata that enable or disable the failure store of a data stream. Currently, it - * supports the following configurations: - * - enabled + * supports the following configurations only explicitly enabling or disabling the failure store */ -public record DataStreamFailureStore(boolean enabled) implements SimpleDiffable, ToXContentObject { +public record DataStreamFailureStore(Boolean enabled) implements SimpleDiffable, ToXContentObject { public static final ParseField ENABLED_FIELD = new ParseField("enabled"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "failure_store", false, - (args, unused) -> new DataStreamFailureStore(args[0] == null || (Boolean) args[0]) + (args, unused) -> new DataStreamFailureStore((Boolean) args[0]) ); static { - PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ENABLED_FIELD); } - public DataStreamFailureStore() { - this(true); + /** + * @param enabled, true when the failure is enabled, false when it's disabled, null when it depends on other configuration. Currently, + * null value is not supported because there are no other arguments + * @throws IllegalArgumentException when all the constructor arguments are null + */ + public DataStreamFailureStore { + if (enabled == null) { + throw new IllegalArgumentException("Failure store configuration should have at least one non-null configuration value."); + } } public DataStreamFailureStore(StreamInput in) throws IOException { - this(in.readBoolean()); + this(in.readOptionalBoolean()); } public static Diff readDiffFrom(StreamInput in) throws IOException { return SimpleDiffable.readDiffFrom(DataStreamFailureStore::new, in); } + /** + * @return iff the user has explicitly enabled the failure store + */ + public boolean isExplicitlyEnabled() { + return enabled != null && enabled; + } + @Override public void writeTo(StreamOutput out) throws IOException { - out.writeBoolean(enabled); + out.writeOptionalBoolean(enabled); } @Override @@ -66,7 +79,9 @@ public String toString() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field(ENABLED_FIELD.getPreferredName(), enabled); + if (enabled != null) { + builder.field(ENABLED_FIELD.getPreferredName(), enabled); + } builder.endObject(); return builder; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java index 29211e8c1b37b..9cd4e2625e2ba 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/DataStreamOptions.java @@ -35,6 +35,9 @@ public record DataStreamOptions(@Nullable DataStreamFailureStore failureStore) ToXContentObject { public static final ParseField FAILURE_STORE_FIELD = new ParseField("failure_store"); + public static final DataStreamOptions FAILURE_STORE_ENABLED = new DataStreamOptions(new DataStreamFailureStore(true)); + public static final DataStreamOptions FAILURE_STORE_DISABLED = new DataStreamOptions(new DataStreamFailureStore(false)); + public static final DataStreamOptions EMPTY = new DataStreamOptions(); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "options", @@ -59,15 +62,14 @@ public static DataStreamOptions read(StreamInput in) throws IOException { return new DataStreamOptions(in.readOptionalWriteable(DataStreamFailureStore::new)); } - @Nullable - public DataStreamFailureStore getFailureStore() { - return failureStore; - } - public static Diff readDiffFrom(StreamInput in) throws IOException { return SimpleDiffable.readDiffFrom(DataStreamOptions::read, in); } + public boolean isEmpty() { + return this.equals(EMPTY); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(failureStore); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java index 80e6483bb086d..2df9cf706d892 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -329,7 +329,7 @@ static ClusterState createDataStream( template.getDataStreamTemplate().isAllowCustomRouting(), indexMode, lifecycle == null && isDslOnlyMode ? DataStreamLifecycle.DEFAULT : lifecycle, - template.getDataStreamTemplate().hasFailureStore(), + template.getDataStreamTemplate().hasFailureStore() ? DataStreamOptions.FAILURE_STORE_ENABLED : DataStreamOptions.EMPTY, new DataStream.DataStreamIndices(DataStream.BACKING_INDEX_PREFIX, dsBackingIndices, false, null), // If the failure store shouldn't be initialized on data stream creation, we're marking it for "lazy rollover", which will // initialize the failure store on first write. diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java index b87dfd07181dc..c39be42f96150 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.coordination.NoMasterBlockService; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -130,7 +131,7 @@ public class BulkOperationTests extends ESTestCase { ); private final DataStream dataStream3 = DataStream.builder(fsRolloverDataStreamName, List.of(ds3BackingIndex1.getIndex())) .setGeneration(1) - .setFailureStoreEnabled(true) + .setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED) .setFailureIndices( DataStream.DataStreamIndices.failureIndicesBuilder(List.of(ds3FailureStore1.getIndex())).setRolloverOnWrite(true).build() ) diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreTests.java index 4a9f13170f694..ffd703048dbd3 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamFailureStoreTests.java @@ -15,6 +15,8 @@ import java.io.IOException; +import static org.hamcrest.Matchers.containsString; + public class DataStreamFailureStoreTests extends AbstractXContentSerializingTestCase { @Override @@ -40,4 +42,9 @@ protected DataStreamFailureStore doParseInstance(XContentParser parser) throws I static DataStreamFailureStore randomFailureStore() { return new DataStreamFailureStore(randomBoolean()); } + + public void testInvalidEmptyConfiguration() { + IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new DataStreamFailureStore((Boolean) null)); + assertThat(exception.getMessage(), containsString("at least one non-null configuration value")); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamOptionsTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamOptionsTests.java index 764c02d7fcec6..020955d226a0f 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamOptionsTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamOptionsTests.java @@ -29,11 +29,11 @@ protected DataStreamOptions createTestInstance() { @Override protected DataStreamOptions mutateInstance(DataStreamOptions instance) throws IOException { - var failureStore = instance.getFailureStore(); + var failureStore = instance.failureStore(); if (failureStore == null) { failureStore = DataStreamFailureStoreTests.randomFailureStore(); } else { - failureStore = randomBoolean() ? null : new DataStreamFailureStore(failureStore.enabled() == false); + failureStore = randomBoolean() ? null : randomValueOtherThan(failureStore, DataStreamFailureStoreTests::randomFailureStore); } return new DataStreamOptions(failureStore); } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java index 8cb7867cff436..7d29a88d62972 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/DataStreamTests.java @@ -94,13 +94,13 @@ protected DataStream mutateInstance(DataStream instance) { var allowsCustomRouting = instance.isAllowCustomRouting(); var indexMode = instance.getIndexMode(); var lifecycle = instance.getLifecycle(); - var failureStore = instance.isFailureStoreEnabled(); + var dataStreamOptions = instance.getDataStreamOptions(); var failureIndices = instance.getFailureIndices().getIndices(); var rolloverOnWrite = instance.rolloverOnWrite(); var autoShardingEvent = instance.getAutoShardingEvent(); var failureRolloverOnWrite = instance.getFailureIndices().isRolloverOnWrite(); var failureAutoShardingEvent = instance.getBackingIndices().getAutoShardingEvent(); - switch (between(0, 14)) { + switch (between(0, 15)) { case 0 -> name = randomAlphaOfLength(10); case 1 -> indices = randomNonEmptyIndexInstances(); case 2 -> generation = instance.getGeneration() + randomIntBetween(1, 10); @@ -134,20 +134,20 @@ protected DataStream mutateInstance(DataStream instance) { case 9 -> lifecycle = randomBoolean() && lifecycle != null ? null : DataStreamLifecycle.newBuilder().dataRetention(randomMillisUpToYear9999()).build(); - case 10 -> { - failureIndices = randomValueOtherThan(failureIndices, DataStreamTestHelper::randomIndexInstances); - failureStore = failureIndices.isEmpty() == false; - } - case 11 -> { + case 10 -> failureIndices = randomValueOtherThan(failureIndices, DataStreamTestHelper::randomIndexInstances); + case 11 -> dataStreamOptions = dataStreamOptions.isEmpty() ? new DataStreamOptions(new DataStreamFailureStore(randomBoolean())) + : randomBoolean() ? (randomBoolean() ? null : DataStreamOptions.EMPTY) + : new DataStreamOptions(new DataStreamFailureStore(dataStreamOptions.failureStore().enabled() == false)); + case 12 -> { rolloverOnWrite = rolloverOnWrite == false; isReplicated = rolloverOnWrite == false && isReplicated; } - case 12 -> { + case 13 -> { if (randomBoolean() || autoShardingEvent == null) { // If we're mutating the auto sharding event of the failure store, we need to ensure there's at least one failure index. if (failureIndices.isEmpty()) { failureIndices = DataStreamTestHelper.randomNonEmptyIndexInstances(); - failureStore = true; + dataStreamOptions = DataStreamOptions.FAILURE_STORE_ENABLED; } autoShardingEvent = new DataStreamAutoShardingEvent( failureIndices.get(failureIndices.size() - 1).getName(), @@ -158,19 +158,17 @@ protected DataStream mutateInstance(DataStream instance) { autoShardingEvent = null; } } - case 13 -> { + case 14 -> { failureRolloverOnWrite = failureRolloverOnWrite == false; isReplicated = failureRolloverOnWrite == false && isReplicated; } - case 14 -> { - failureAutoShardingEvent = randomBoolean() && failureAutoShardingEvent != null - ? null - : new DataStreamAutoShardingEvent( - indices.get(indices.size() - 1).getName(), - randomIntBetween(1, 10), - randomMillisUpToYear9999() - ); - } + case 15 -> failureAutoShardingEvent = randomBoolean() && failureAutoShardingEvent != null + ? null + : new DataStreamAutoShardingEvent( + indices.get(indices.size() - 1).getName(), + randomIntBetween(1, 10), + randomMillisUpToYear9999() + ); } return new DataStream( @@ -184,7 +182,7 @@ protected DataStream mutateInstance(DataStream instance) { allowsCustomRouting, indexMode, lifecycle, - failureStore, + dataStreamOptions, new DataStream.DataStreamIndices(DataStream.BACKING_INDEX_PREFIX, indices, rolloverOnWrite, autoShardingEvent), new DataStream.DataStreamIndices( DataStream.BACKING_INDEX_PREFIX, @@ -1914,7 +1912,7 @@ public void testXContentSerializationWithRolloverAndEffectiveRetention() throws randomBoolean(), randomBoolean() ? IndexMode.STANDARD : null, // IndexMode.TIME_SERIES triggers validation that many unit tests doesn't pass lifecycle, - failureStore, + new DataStreamOptions(new DataStreamFailureStore(failureStore)), failureIndices, false, null @@ -2102,7 +2100,7 @@ public void testWriteFailureIndex() { randomBoolean(), randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES, DataStreamLifecycleTests.randomLifecycle(), - false, + DataStreamOptions.FAILURE_STORE_DISABLED, List.of(), replicated == false && randomBoolean(), null @@ -2120,7 +2118,7 @@ public void testWriteFailureIndex() { randomBoolean(), randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES, DataStreamLifecycleTests.randomLifecycle(), - true, + DataStreamOptions.FAILURE_STORE_ENABLED, List.of(), replicated == false && randomBoolean(), null @@ -2145,7 +2143,7 @@ public void testWriteFailureIndex() { randomBoolean(), randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES, DataStreamLifecycleTests.randomLifecycle(), - true, + DataStreamOptions.FAILURE_STORE_ENABLED, failureIndices, replicated == false && randomBoolean(), null @@ -2169,7 +2167,7 @@ public void testIsFailureIndex() { randomBoolean(), randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES, DataStreamLifecycleTests.randomLifecycle(), - false, + DataStreamOptions.FAILURE_STORE_DISABLED, List.of(), replicated == false && randomBoolean(), null @@ -2191,7 +2189,7 @@ public void testIsFailureIndex() { randomBoolean(), randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES, DataStreamLifecycleTests.randomLifecycle(), - true, + DataStreamOptions.FAILURE_STORE_ENABLED, List.of(), replicated == false && randomBoolean(), null @@ -2222,7 +2220,7 @@ public void testIsFailureIndex() { randomBoolean(), randomBoolean() ? IndexMode.STANDARD : IndexMode.TIME_SERIES, DataStreamLifecycleTests.randomLifecycle(), - true, + DataStreamOptions.FAILURE_STORE_ENABLED, failureIndices, replicated == false && randomBoolean(), null diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java index dd9b4ec21a4d1..5ca52024e82f6 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/metadata/DataStreamTestHelper.java @@ -152,7 +152,7 @@ public static DataStream newInstance( .setMetadata(metadata) .setReplicated(replicated) .setLifecycle(lifecycle) - .setFailureStoreEnabled(failureStores.isEmpty() == false) + .setDataStreamOptions(failureStores.isEmpty() ? DataStreamOptions.EMPTY : DataStreamOptions.FAILURE_STORE_ENABLED) .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build()) .build(); } @@ -348,7 +348,7 @@ public static DataStream randomInstance(String dataStreamName, LongSupplier time randomBoolean(), randomBoolean() ? IndexMode.STANDARD : null, // IndexMode.TIME_SERIES triggers validation that many unit tests doesn't pass randomBoolean() ? DataStreamLifecycle.newBuilder().dataRetention(randomMillisUpToYear9999()).build() : null, - failureStore, + failureStore ? DataStreamOptions.FAILURE_STORE_ENABLED : DataStreamOptions.EMPTY, DataStream.DataStreamIndices.backingIndicesBuilder(indices) .setRolloverOnWrite(replicated == false && randomBoolean()) .setAutoShardingEvent( diff --git a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java index a08eb935178cf..499e660d2e542 100644 --- a/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java +++ b/x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/core/action/DataStreamLifecycleUsageTransportActionIT.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.metadata.DataStreamAlias; import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.cluster.metadata.DataStreamOptions; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; @@ -178,7 +179,7 @@ public void testAction() throws Exception { randomBoolean(), IndexMode.STANDARD, lifecycle, - false, + DataStreamOptions.EMPTY, List.of(), replicated == false && randomBoolean(), null