Skip to content

Commit

Permalink
Move the failure store enable flag into the data stream options (#113176
Browse files Browse the repository at this point in the history
)
  • Loading branch information
gmarouli committed Sep 30, 2024
1 parent 1142c6d commit 76c2d8d
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
.setAllowCustomRouting(true)
.setIndexMode(IndexMode.STANDARD)
.setLifecycle(new DataStreamLifecycle())
.setFailureStoreEnabled(true)
.setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED)
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
.build();

Expand Down Expand Up @@ -186,7 +187,7 @@ public void testResponseIlmAndDataStreamLifecycleRepresentation() throws Excepti
.setAllowCustomRouting(true)
.setIndexMode(IndexMode.STANDARD)
.setLifecycle(new DataStreamLifecycle(null, null, false))
.setFailureStoreEnabled(true)
.setDataStreamOptions(DataStreamOptions.FAILURE_STORE_ENABLED)
.setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(failureStores).build())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling;
import org.elasticsearch.cluster.metadata.DataStreamLifecycle.Downsampling.Round;
import org.elasticsearch.cluster.metadata.DataStreamOptions;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexGraveyard;
Expand Down Expand Up @@ -1495,6 +1496,13 @@ public void testTargetIndices() {
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
int numBackingIndices = 3;
int numFailureIndices = 2;
int mutationBranch = randomIntBetween(0, 2);
DataStreamOptions dataStreamOptions = switch (mutationBranch) {
case 0 -> DataStreamOptions.EMPTY;
case 1 -> DataStreamOptions.FAILURE_STORE_ENABLED;
case 2 -> DataStreamOptions.FAILURE_STORE_DISABLED;
default -> throw new IllegalStateException("Unexpected value: " + mutationBranch);
};
Metadata.Builder builder = Metadata.builder();
DataStream dataStream = createDataStream(
builder,
Expand All @@ -1504,7 +1512,7 @@ public void testTargetIndices() {
settings(IndexVersion.current()),
new DataStreamLifecycle(),
now
).copy().setFailureStoreEnabled(randomBoolean()).build(); // failure store is managed even when disabled
).copy().setDataStreamOptions(dataStreamOptions).build(); // failure store is managed even when disabled
builder.put(dataStream);
Metadata metadata = builder.build();
Set<Index> indicesToExclude = Set.of(dataStream.getIndices().get(0), dataStream.getFailureIndices().getIndices().get(0));
Expand Down Expand Up @@ -1536,7 +1544,7 @@ public void testFailureStoreIsManagedEvenWhenDisabled() {
settings(IndexVersion.current()),
DataStreamLifecycle.newBuilder().dataRetention(0).build(),
now
).copy().setFailureStoreEnabled(false).build(); // failure store is managed even when it is disabled
).copy().setDataStreamOptions(DataStreamOptions.FAILURE_STORE_DISABLED).build(); // failure store is managed even when disabled
builder.put(dataStream);

ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ML_INFERENCE_CHUNKING_SETTINGS = def(8_751_00_0);
public static final TransportVersion SEMANTIC_QUERY_INNER_HITS = def(8_752_00_0);
public static final TransportVersion RETAIN_ILM_STEP_INFO = def(8_753_00_0);
public static final TransportVersion ADD_DATA_STREAM_OPTIONS = def(8_754_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static boolean isFailureStoreFeatureFlagEnabled() {
private final IndexMode indexMode;
@Nullable
private final DataStreamLifecycle lifecycle;
private final boolean failureStoreEnabled;
private final DataStreamOptions dataStreamOptions;

private final DataStreamIndices backingIndices;
private final DataStreamIndices failureIndices;
Expand All @@ -128,7 +128,7 @@ public DataStream(
boolean allowCustomRouting,
IndexMode indexMode,
DataStreamLifecycle lifecycle,
boolean failureStoreEnabled,
@Nullable DataStreamOptions dataStreamOptions,
List<Index> failureIndices,
boolean rolloverOnWrite,
@Nullable DataStreamAutoShardingEvent autoShardingEvent
Expand All @@ -144,7 +144,7 @@ public DataStream(
allowCustomRouting,
indexMode,
lifecycle,
failureStoreEnabled,
dataStreamOptions,
new DataStreamIndices(BACKING_INDEX_PREFIX, List.copyOf(indices), rolloverOnWrite, autoShardingEvent),
new DataStreamIndices(FAILURE_STORE_PREFIX, List.copyOf(failureIndices), false, null)
);
Expand All @@ -162,7 +162,7 @@ public DataStream(
boolean allowCustomRouting,
IndexMode indexMode,
DataStreamLifecycle lifecycle,
boolean failureStoreEnabled,
DataStreamOptions dataStreamOptions,
DataStreamIndices backingIndices,
DataStreamIndices failureIndices
) {
Expand All @@ -177,7 +177,7 @@ public DataStream(
this.allowCustomRouting = allowCustomRouting;
this.indexMode = indexMode;
this.lifecycle = lifecycle;
this.failureStoreEnabled = failureStoreEnabled;
this.dataStreamOptions = dataStreamOptions == null ? DataStreamOptions.EMPTY : dataStreamOptions;
assert backingIndices.indices.isEmpty() == false;
assert replicated == false || (backingIndices.rolloverOnWrite == false && failureIndices.rolloverOnWrite == false)
: "replicated data streams cannot be marked for lazy rollover";
Expand All @@ -198,9 +198,11 @@ public static DataStream read(StreamInput in) throws IOException {
var lifecycle = in.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)
? in.readOptionalWriteable(DataStreamLifecycle::new)
: null;
var failureStoreEnabled = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)
? in.readBoolean()
: false;
// This boolean flag has been moved in data stream options
var failureStoreEnabled = in.getTransportVersion()
.between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS)
? in.readBoolean()
: false;
var failureIndices = in.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)
? readIndices(in)
: List.<Index>of();
Expand All @@ -213,6 +215,14 @@ public static DataStream read(StreamInput in) throws IOException {
failureIndicesBuilder.setRolloverOnWrite(in.readBoolean())
.setAutoShardingEvent(in.readOptionalWriteable(DataStreamAutoShardingEvent::new));
}
DataStreamOptions dataStreamOptions;
if (in.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) {
dataStreamOptions = in.readOptionalWriteable(DataStreamOptions::read);
} else {
// We cannot distinguish if failure store was explicitly disabled or not. Given that failure store
// is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
dataStreamOptions = failureStoreEnabled ? DataStreamOptions.FAILURE_STORE_ENABLED : null;
}
return new DataStream(
name,
generation,
Expand All @@ -224,7 +234,7 @@ public static DataStream read(StreamInput in) throws IOException {
allowCustomRouting,
indexMode,
lifecycle,
failureStoreEnabled,
dataStreamOptions,
backingIndicesBuilder.build(),
failureIndicesBuilder.build()
);
Expand Down Expand Up @@ -274,6 +284,10 @@ public boolean isFailureStoreIndex(String indexName) {
return failureIndices.containsIndex(indexName);
}

public DataStreamOptions getDataStreamOptions() {
return dataStreamOptions;
}

public boolean rolloverOnWrite() {
return backingIndices.rolloverOnWrite;
}
Expand Down Expand Up @@ -406,13 +420,12 @@ public boolean isAllowCustomRouting() {
}

/**
* Determines if this data stream should persist ingest pipeline and mapping failures from bulk requests to a locally
* configured failure store.
*
* @return Whether this data stream should store ingestion failures.
* Determines if this data stream has its failure store enabled or not. Currently, the failure store
* is enabled only when a user has explicitly requested it.
* @return true, if the user has explicitly enabled the failure store.
*/
public boolean isFailureStoreEnabled() {
return failureStoreEnabled;
return dataStreamOptions.failureStore() != null && dataStreamOptions.failureStore().isExplicitlyEnabled();
}

@Nullable
Expand Down Expand Up @@ -1063,8 +1076,11 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_9_X)) {
out.writeOptionalWriteable(lifecycle);
}
if (out.getTransportVersion()
.between(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION, TransportVersions.ADD_DATA_STREAM_OPTIONS)) {
out.writeBoolean(isFailureStoreEnabled());
}
if (out.getTransportVersion().onOrAfter(DataStream.ADDED_FAILURE_STORE_TRANSPORT_VERSION)) {
out.writeBoolean(failureStoreEnabled);
out.writeCollection(failureIndices.indices);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_13_0)) {
Expand All @@ -1077,6 +1093,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(failureIndices.rolloverOnWrite);
out.writeOptionalWriteable(failureIndices.autoShardingEvent);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.ADD_DATA_STREAM_OPTIONS)) {
out.writeOptionalWriteable(dataStreamOptions.isEmpty() ? null : dataStreamOptions);
}
}

public static final ParseField NAME_FIELD = new ParseField("name");
Expand All @@ -1096,6 +1115,7 @@ public void writeTo(StreamOutput out) throws IOException {
public static final ParseField AUTO_SHARDING_FIELD = new ParseField("auto_sharding");
public static final ParseField FAILURE_ROLLOVER_ON_WRITE_FIELD = new ParseField("failure_rollover_on_write");
public static final ParseField FAILURE_AUTO_SHARDING_FIELD = new ParseField("failure_auto_sharding");
public static final ParseField DATA_STREAM_OPTIONS_FIELD = new ParseField("options");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream", args -> {
Expand All @@ -1110,6 +1130,16 @@ public void writeTo(StreamOutput out) throws IOException {
(DataStreamAutoShardingEvent) args[15]
)
: new DataStreamIndices(FAILURE_STORE_PREFIX, List.of(), false, null);
// We cannot distinguish if failure store was explicitly disabled or not. Given that failure store
// is still behind a feature flag in previous version we use the default value instead of explicitly disabling it.
DataStreamOptions dataStreamOptions = DataStreamOptions.EMPTY;
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
if (args[16] != null) {
dataStreamOptions = (DataStreamOptions) args[16];
} else if (failureStoreEnabled) {
dataStreamOptions = DataStreamOptions.FAILURE_STORE_ENABLED;
}
}
return new DataStream(
(String) args[0],
(Long) args[2],
Expand All @@ -1121,7 +1151,7 @@ public void writeTo(StreamOutput out) throws IOException {
args[7] != null && (boolean) args[7],
args[8] != null ? IndexMode.fromString((String) args[8]) : null,
(DataStreamLifecycle) args[9],
failureStoreEnabled,
dataStreamOptions,
new DataStreamIndices(
BACKING_INDEX_PREFIX,
(List<Index>) args[1],
Expand Down Expand Up @@ -1171,6 +1201,11 @@ public void writeTo(StreamOutput out) throws IOException {
(p, c) -> DataStreamAutoShardingEvent.fromXContent(p),
FAILURE_AUTO_SHARDING_FIELD
);
PARSER.declareObject(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> DataStreamOptions.fromXContent(p),
DATA_STREAM_OPTIONS_FIELD
);
}
}

Expand Down Expand Up @@ -1208,7 +1243,6 @@ public XContentBuilder toXContent(
builder.field(SYSTEM_FIELD.getPreferredName(), system);
builder.field(ALLOW_CUSTOM_ROUTING.getPreferredName(), allowCustomRouting);
if (DataStream.isFailureStoreFeatureFlagEnabled()) {
builder.field(FAILURE_STORE_FIELD.getPreferredName(), failureStoreEnabled);
if (failureIndices.indices.isEmpty() == false) {
builder.xContentList(FAILURE_INDICES_FIELD.getPreferredName(), failureIndices.indices);
}
Expand All @@ -1218,6 +1252,10 @@ public XContentBuilder toXContent(
failureIndices.autoShardingEvent.toXContent(builder, params);
builder.endObject();
}
if (dataStreamOptions.isEmpty() == false) {
builder.field(DATA_STREAM_OPTIONS_FIELD.getPreferredName());
dataStreamOptions.toXContent(builder, params);
}
}
if (indexMode != null) {
builder.field(INDEX_MODE.getPreferredName(), indexMode);
Expand Down Expand Up @@ -1250,7 +1288,7 @@ public boolean equals(Object o) {
&& allowCustomRouting == that.allowCustomRouting
&& indexMode == that.indexMode
&& Objects.equals(lifecycle, that.lifecycle)
&& failureStoreEnabled == that.failureStoreEnabled
&& Objects.equals(dataStreamOptions, that.dataStreamOptions)
&& Objects.equals(backingIndices, that.backingIndices)
&& Objects.equals(failureIndices, that.failureIndices);
}
Expand All @@ -1267,7 +1305,7 @@ public int hashCode() {
allowCustomRouting,
indexMode,
lifecycle,
failureStoreEnabled,
dataStreamOptions,
backingIndices,
failureIndices
);
Expand Down Expand Up @@ -1580,7 +1618,7 @@ public static class Builder {
private IndexMode indexMode = null;
@Nullable
private DataStreamLifecycle lifecycle = null;
private boolean failureStoreEnabled = false;
private DataStreamOptions dataStreamOptions = DataStreamOptions.EMPTY;
private DataStreamIndices backingIndices;
private DataStreamIndices failureIndices = DataStreamIndices.failureIndicesBuilder(List.of()).build();

Expand All @@ -1605,7 +1643,7 @@ private Builder(DataStream dataStream) {
allowCustomRouting = dataStream.allowCustomRouting;
indexMode = dataStream.indexMode;
lifecycle = dataStream.lifecycle;
failureStoreEnabled = dataStream.failureStoreEnabled;
dataStreamOptions = dataStream.dataStreamOptions;
backingIndices = dataStream.backingIndices;
failureIndices = dataStream.failureIndices;
}
Expand Down Expand Up @@ -1660,8 +1698,8 @@ public Builder setLifecycle(DataStreamLifecycle lifecycle) {
return this;
}

public Builder setFailureStoreEnabled(boolean failureStoreEnabled) {
this.failureStoreEnabled = failureStoreEnabled;
public Builder setDataStreamOptions(DataStreamOptions dataStreamOptions) {
this.dataStreamOptions = dataStreamOptions;
return this;
}

Expand Down Expand Up @@ -1697,7 +1735,7 @@ public DataStream build() {
allowCustomRouting,
indexMode,
lifecycle,
failureStoreEnabled,
dataStreamOptions,
backingIndices,
failureIndices
);
Expand Down
Loading

0 comments on commit 76c2d8d

Please sign in to comment.