diff --git a/docs/changelog/113143.yaml b/docs/changelog/113143.yaml new file mode 100644 index 0000000000000..4a2044cca0ce4 --- /dev/null +++ b/docs/changelog/113143.yaml @@ -0,0 +1,10 @@ +pr: 113143 +summary: Deprecate dutch_kp and lovins stemmer as they are removed in Lucene 10 +area: Analysis +type: deprecation +issues: [] +deprecation: + title: Deprecate dutch_kp and lovins stemmer as they are removed in Lucene 10 + area: Analysis + details: kp, dutch_kp, dutchKp and lovins stemmers are deprecated and will be removed. + impact: These stemmers will be removed and will be no longer supported. diff --git a/docs/changelog/113723.yaml b/docs/changelog/113723.yaml new file mode 100644 index 0000000000000..2cbcf49102719 --- /dev/null +++ b/docs/changelog/113723.yaml @@ -0,0 +1,6 @@ +pr: 113723 +summary: Fix max file size check to use `getMaxFileSize` +area: Infra/Core +type: bug +issues: + - 113705 diff --git a/docs/reference/analysis/tokenfilters/snowball-tokenfilter.asciidoc b/docs/reference/analysis/tokenfilters/snowball-tokenfilter.asciidoc index bd708096aac48..d8300288c9f4b 100644 --- a/docs/reference/analysis/tokenfilters/snowball-tokenfilter.asciidoc +++ b/docs/reference/analysis/tokenfilters/snowball-tokenfilter.asciidoc @@ -11,6 +11,8 @@ values: `Arabic`, `Armenian`, `Basque`, `Catalan`, `Danish`, `Dutch`, `English`, `Lithuanian`, `Lovins`, `Norwegian`, `Porter`, `Portuguese`, `Romanian`, `Russian`, `Serbian`, `Spanish`, `Swedish`, `Turkish`. +deprecated:[8.16.0, `Kp` and `Lovins` support will be removed in a future version] + For example: [source,console] @@ -28,7 +30,7 @@ PUT /my-index-000001 "filter": { "my_snow": { "type": "snowball", - "language": "Porter" + "language": "English" } } } diff --git a/docs/reference/analysis/tokenfilters/stemmer-tokenfilter.asciidoc b/docs/reference/analysis/tokenfilters/stemmer-tokenfilter.asciidoc index 42ac594fca3bf..4cd088935af19 100644 --- a/docs/reference/analysis/tokenfilters/stemmer-tokenfilter.asciidoc +++ b/docs/reference/analysis/tokenfilters/stemmer-tokenfilter.asciidoc @@ -144,12 +144,12 @@ https://snowballstem.org/algorithms/danish/stemmer.html[*`danish`*] Dutch:: https://snowballstem.org/algorithms/dutch/stemmer.html[*`dutch`*], -https://snowballstem.org/algorithms/kraaij_pohlmann/stemmer.html[`dutch_kp`] +https://snowballstem.org/algorithms/kraaij_pohlmann/stemmer.html[`dutch_kp`] deprecated:[8.16.0, `dutch_kp` will be removed in a future version] English:: https://snowballstem.org/algorithms/porter/stemmer.html[*`english`*], https://ciir.cs.umass.edu/pubfiles/ir-35.pdf[`light_english`], -https://snowballstem.org/algorithms/lovins/stemmer.html[`lovins`], +https://snowballstem.org/algorithms/lovins/stemmer.html[`lovins`] deprecated:[8.16.0, `lovins` will be removed in a future version], https://www.researchgate.net/publication/220433848_How_effective_is_suffixing[`minimal_english`], https://snowballstem.org/algorithms/english/stemmer.html[`porter2`], {lucene-analysis-docs}/en/EnglishPossessiveFilter.html[`possessive_english`] diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index 575a6457804a6..8e4f630ef7da4 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -40,6 +40,10 @@ If a node does not respond before its timeout expires, the response does not inc However, timed out nodes are included in the response's `_nodes.failed` property. Defaults to no timeout. 
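The snowball and stemmer filter entries above deprecate the `Kp`/`Lovins` (and `dutch_kp`/`lovins`) algorithms. As a minimal sketch only — a hypothetical integration test, not part of this change, reusing the analyzer and index names from the docs above and the test-client conventions used later in this diff — the same filter can be defined with a still-supported language:

[source,java]
----
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESIntegTestCase;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

public class SnowballLanguageExampleIT extends ESIntegTestCase { // hypothetical test class
    public void testSnowballWithSupportedLanguage() {
        Settings analysis = Settings.builder()
            .put("index.analysis.analyzer.my_analyzer.type", "custom")
            .put("index.analysis.analyzer.my_analyzer.tokenizer", "standard")
            .putList("index.analysis.analyzer.my_analyzer.filter", "lowercase", "my_snow")
            .put("index.analysis.filter.my_snow.type", "snowball")
            // "Kp" and "Lovins" are deprecated as of 8.16.0; use a supported language such as "English"
            .put("index.analysis.filter.my_snow.language", "English")
            .build();
        assertAcked(client().admin().indices().prepareCreate("my-index-000001").setSettings(analysis));
    }
}
----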
+`include_remotes`:: +(Optional, Boolean) If `true`, includes remote cluster information in the response. +Defaults to `false`, so no remote cluster information is returned. + [role="child_attributes"] [[cluster-stats-api-response-body]] ==== {api-response-body-title} @@ -183,12 +187,11 @@ This number is based on documents in Lucene segments and may include documents f This number is based on documents in Lucene segments. {es} reclaims the disk space of deleted Lucene documents when a segment is merged. `total_size_in_bytes`:: -(integer) -Total size in bytes across all primary shards assigned to selected nodes. +(integer) Total size in bytes across all primary shards assigned to selected nodes. `total_size`:: -(string) -Total size across all primary shards assigned to selected nodes, as a human-readable string. +(string) Total size across all primary shards assigned to selected nodes, as a human-readable string. + ===== `store`:: @@ -1285,8 +1288,7 @@ They are included here for expert users, but should otherwise be ignored. ==== `repositories`:: -(object) Contains statistics about the <> repositories defined in the cluster, broken down -by repository type. +(object) Contains statistics about the <> repositories defined in the cluster, broken down by repository type. + .Properties of `repositories` [%collapsible%open] @@ -1314,13 +1316,74 @@ Each repository type may also include other statistics about the repositories of [%collapsible%open] ===== +`clusters`::: +(object) Contains remote cluster settings and metrics collected from them. +The keys are cluster names, and the values are per-cluster data. +Only present if `include_remotes` option is set to `true`. + ++ +.Properties of `clusters` +[%collapsible%open] +====== + +`cluster_uuid`::: +(string) The UUID of the remote cluster. + +`mode`::: +(string) The <> used to communicate with the remote cluster. + +`skip_unavailable`::: +(Boolean) The `skip_unavailable` <> used for this remote cluster. + +`transport.compress`::: +(string) Transport compression setting used for this remote cluster. + +`version`::: +(array of strings) The list of {es} versions used by the nodes on the remote cluster. + +`status`::: +include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=cluster-health-status] ++ +See <>. + +`nodes_count`::: +(integer) The total count of nodes in the remote cluster. + +`shards_count`::: +(integer) The total number of shards in the remote cluster. + +`indices_count`::: +(integer) The total number of indices in the remote cluster. + +`indices_total_size_in_bytes`::: +(integer) Total data set size, in bytes, of all shards assigned to selected nodes. + +`indices_total_size`::: +(string) Total data set size, in bytes, of all shards assigned to selected nodes, as a human-readable string. + +`max_heap_in_bytes`::: +(integer) Maximum amount of memory, in bytes, available for use by the heap across the nodes of the remote cluster. + +`max_heap`::: +(string) Maximum amount of memory, in bytes, available for use by the heap across the nodes of the remote cluster, +as a human-readable string. + +`mem_total_in_bytes`::: +(integer) Total amount, in bytes, of physical memory across the nodes of the remote cluster. + +`mem_total`::: +(string) Total amount, in bytes, of physical memory across the nodes of the remote cluster, as a human-readable string. + +====== + `_search`::: -(object) Contains the telemetry information about the <> usage in the cluster. +(object) Contains the information about the <> usage in the cluster. 
+ .Properties of `_search` [%collapsible%open] ====== + `total`::: (integer) The total number of {ccs} requests that have been executed by the cluster. @@ -1336,6 +1399,7 @@ Each repository type may also include other statistics about the repositories of .Properties of `took` [%collapsible%open] ======= + `max`::: (integer) The maximum time taken to execute a {ccs} request, in milliseconds. @@ -1344,6 +1408,7 @@ Each repository type may also include other statistics about the repositories of `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======= `took_mrt_true`:: @@ -1361,6 +1426,7 @@ Each repository type may also include other statistics about the repositories of `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======= `took_mrt_false`:: @@ -1378,6 +1444,7 @@ Each repository type may also include other statistics about the repositories of `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======= `remotes_per_search_max`:: @@ -1391,9 +1458,10 @@ Each repository type may also include other statistics about the repositories of The keys are the failure reason names and the values are the number of requests that failed for that reason. `features`:: -(object) Contains statistics about the features used in {ccs} requests. The keys are the names of the search feature, -and the values are the number of requests that used that feature. Single request can use more than one feature -(e.g. both `async` and `wildcard`). Known features are: +(object) Contains statistics about the features used in {ccs} requests. +The keys are the names of the search feature, and the values are the number of requests that used that feature. +Single request can use more than one feature (e.g. both `async` and `wildcard`). +Known features are: * `async` - <> @@ -1427,6 +1495,7 @@ This may include requests where partial results were returned, but not requests .Properties of `took` [%collapsible%open] ======== + `max`::: (integer) The maximum time taken to execute a {ccs} request, in milliseconds. @@ -1435,6 +1504,7 @@ This may include requests where partial results were returned, but not requests `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. 
+ ======== ======= @@ -1812,3 +1882,37 @@ This API can be restricted to a subset of the nodes using < remoteClusterAlias() { + return List.of(REMOTE1, REMOTE2); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of(REMOTE1, false, REMOTE2, true); + } + + public void testRemoteClusterStats() throws ExecutionException, InterruptedException { + setupClusters(); + final Client client = client(LOCAL_CLUSTER); + SearchRequest searchRequest = new SearchRequest("*", "*:*"); + searchRequest.allowPartialSearchResults(false); + searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); + searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(10)); + + // do a search + assertResponse(cluster(LOCAL_CLUSTER).client().search(searchRequest), Assert::assertNotNull); + // collect stats without remotes + ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get(); + assertNotNull(response.getCcsMetrics()); + var remotesUsage = response.getCcsMetrics().getByRemoteCluster(); + assertThat(remotesUsage.size(), equalTo(3)); + assertNull(response.getRemoteClustersStats()); + // collect stats with remotes + response = client.admin().cluster().execute(TransportClusterStatsAction.TYPE, new ClusterStatsRequest(true)).get(); + assertNotNull(response.getCcsMetrics()); + remotesUsage = response.getCcsMetrics().getByRemoteCluster(); + assertThat(remotesUsage.size(), equalTo(3)); + assertNotNull(response.getRemoteClustersStats()); + var remoteStats = response.getRemoteClustersStats(); + assertThat(remoteStats.size(), equalTo(2)); + for (String clusterAlias : remoteClusterAlias()) { + assertThat(remoteStats, hasKey(clusterAlias)); + assertThat(remotesUsage, hasKey(clusterAlias)); + assertThat(remoteStats.get(clusterAlias).status(), equalToIgnoringCase(ClusterHealthStatus.GREEN.name())); + assertThat(remoteStats.get(clusterAlias).indicesCount(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).nodesCount(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).shardsCount(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).heapBytes(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).memBytes(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).indicesBytes(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).versions(), hasItem(Version.CURRENT.toString())); + assertThat(remoteStats.get(clusterAlias).clusterUUID(), not(equalTo(""))); + assertThat(remoteStats.get(clusterAlias).mode(), oneOf("sniff", "proxy")); + } + assertFalse(remoteStats.get(REMOTE1).skipUnavailable()); + assertTrue(remoteStats.get(REMOTE2).skipUnavailable()); + } + + private void setupClusters() { + int numShardsLocal = randomIntBetween(2, 5); + Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build(); + assertAcked( + client(LOCAL_CLUSTER).admin() + .indices() + .prepareCreate(INDEX_NAME) + .setSettings(localSettings) + .setMapping("@timestamp", "type=date", "f", "type=text") + ); + indexDocs(client(LOCAL_CLUSTER)); + + int numShardsRemote = randomIntBetween(2, 10); + for (String clusterAlias : remoteClusterAlias()) { + final InternalTestCluster remoteCluster = cluster(clusterAlias); + remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3)); + assertAcked( + client(clusterAlias).admin() + .indices() + .prepareCreate(INDEX_NAME) + .setSettings(indexSettings(numShardsRemote, randomIntBetween(0, 1))) + .setMapping("@timestamp", "type=date", "f", "type=text") + ); + 
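+            // Wait for the remote index to go green (failing if the wait times out) before indexing,
+            // so the per-cluster stats assertions see fully assigned shards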
assertFalse( + client(clusterAlias).admin() + .cluster() + .prepareHealth(TEST_REQUEST_TIMEOUT, INDEX_NAME) + .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(30)) + .get() + .isTimedOut() + ); + indexDocs(client(clusterAlias)); + } + + } + + private void indexDocs(Client client) { + int numDocs = between(5, 20); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex(INDEX_NAME).setSource("f", "v", "@timestamp", randomNonNegativeLong()).get(); + } + client.admin().indices().prepareRefresh(INDEX_NAME).get(); + } + +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java new file mode 100644 index 0000000000000..b72257b884f08 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java @@ -0,0 +1,245 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.monitor.metrics; + +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.TestTelemetryPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.hamcrest.Matcher; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class IndicesMetricsIT extends ESIntegTestCase { + + public static class TestAPMInternalSettings extends Plugin { + @Override + public List> getSettings() { + return List.of( + Setting.timeSetting("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(0), Setting.Property.NodeScope) + ); + } + } + + @Override + protected Collection> nodePlugins() { + return List.of(TestTelemetryPlugin.class, TestAPMInternalSettings.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(0)) // disable metrics cache refresh delay + .build(); + } + + static final String STANDARD_INDEX_COUNT = "es.indices.standard.total"; + static final String STANDARD_DOCS_COUNT = "es.indices.standard.docs.total"; + static final String STANDARD_BYTES_SIZE = "es.indices.standard.bytes.total"; + + static final String TIME_SERIES_INDEX_COUNT = "es.indices.time_series.total"; + static final String TIME_SERIES_DOCS_COUNT = "es.indices.time_series.docs.total"; + static final String TIME_SERIES_BYTES_SIZE = "es.indices.time_series.bytes.total"; + + static final String 
LOGSDB_INDEX_COUNT = "es.indices.logsdb.total"; + static final String LOGSDB_DOCS_COUNT = "es.indices.logsdb.docs.total"; + static final String LOGSDB_BYTES_SIZE = "es.indices.logsdb.bytes.total"; + + public void testIndicesMetrics() { + String node = internalCluster().startNode(); + ensureStableCluster(1); + final TestTelemetryPlugin telemetry = internalCluster().getInstance(PluginsService.class, node) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + telemetry.resetMeter(); + long numStandardIndices = randomIntBetween(1, 5); + long numStandardDocs = populateStandardIndices(numStandardIndices); + collectThenAssertMetrics( + telemetry, + 1, + Map.of( + STANDARD_INDEX_COUNT, + equalTo(numStandardIndices), + STANDARD_DOCS_COUNT, + equalTo(numStandardDocs), + STANDARD_BYTES_SIZE, + greaterThan(0L), + + TIME_SERIES_INDEX_COUNT, + equalTo(0L), + TIME_SERIES_DOCS_COUNT, + equalTo(0L), + TIME_SERIES_BYTES_SIZE, + equalTo(0L), + + LOGSDB_INDEX_COUNT, + equalTo(0L), + LOGSDB_DOCS_COUNT, + equalTo(0L), + LOGSDB_BYTES_SIZE, + equalTo(0L) + ) + ); + + long numTimeSeriesIndices = randomIntBetween(1, 5); + long numTimeSeriesDocs = populateTimeSeriesIndices(numTimeSeriesIndices); + collectThenAssertMetrics( + telemetry, + 2, + Map.of( + STANDARD_INDEX_COUNT, + equalTo(numStandardIndices), + STANDARD_DOCS_COUNT, + equalTo(numStandardDocs), + STANDARD_BYTES_SIZE, + greaterThan(0L), + + TIME_SERIES_INDEX_COUNT, + equalTo(numTimeSeriesIndices), + TIME_SERIES_DOCS_COUNT, + equalTo(numTimeSeriesDocs), + TIME_SERIES_BYTES_SIZE, + greaterThan(20L), + + LOGSDB_INDEX_COUNT, + equalTo(0L), + LOGSDB_DOCS_COUNT, + equalTo(0L), + LOGSDB_BYTES_SIZE, + equalTo(0L) + ) + ); + + long numLogsdbIndices = randomIntBetween(1, 5); + long numLogsdbDocs = populateLogsdbIndices(numLogsdbIndices); + collectThenAssertMetrics( + telemetry, + 3, + Map.of( + STANDARD_INDEX_COUNT, + equalTo(numStandardIndices), + STANDARD_DOCS_COUNT, + equalTo(numStandardDocs), + STANDARD_BYTES_SIZE, + greaterThan(0L), + + TIME_SERIES_INDEX_COUNT, + equalTo(numTimeSeriesIndices), + TIME_SERIES_DOCS_COUNT, + equalTo(numTimeSeriesDocs), + TIME_SERIES_BYTES_SIZE, + greaterThan(20L), + + LOGSDB_INDEX_COUNT, + equalTo(numLogsdbIndices), + LOGSDB_DOCS_COUNT, + equalTo(numLogsdbDocs), + LOGSDB_BYTES_SIZE, + greaterThan(0L) + ) + ); + } + + void collectThenAssertMetrics(TestTelemetryPlugin telemetry, int times, Map> matchers) { + telemetry.collect(); + for (Map.Entry> e : matchers.entrySet()) { + String name = e.getKey(); + List measurements = telemetry.getLongGaugeMeasurement(name); + assertThat(name, measurements, hasSize(times)); + assertThat(name, measurements.getLast().getLong(), e.getValue()); + } + } + + int populateStandardIndices(long numIndices) { + int totalDocs = 0; + for (int i = 0; i < numIndices; i++) { + String indexName = "standard-" + i; + createIndex(indexName); + int numDocs = between(1, 5); + for (int d = 0; d < numDocs; d++) { + indexDoc(indexName, Integer.toString(d), "f", Integer.toString(d)); + } + totalDocs += numDocs; + flush(indexName); + } + return totalDocs; + } + + int populateTimeSeriesIndices(long numIndices) { + int totalDocs = 0; + for (int i = 0; i < numIndices; i++) { + String indexName = "time_series-" + i; + Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host")).build(); + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings(settings) + .setMapping( + "@timestamp", + "type=date", + "host", + 
"type=keyword,time_series_dimension=true", + "cpu", + "type=long,time_series_metric=gauge" + ) + .get(); + long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); + int numDocs = between(1, 5); + for (int d = 0; d < numDocs; d++) { + timestamp += between(1, 5) * 1000L; + client().prepareIndex(indexName) + .setSource("@timestamp", timestamp, "host", randomFrom("prod", "qa"), "cpu", randomIntBetween(1, 100)) + .get(); + } + totalDocs += numDocs; + flush(indexName); + } + return totalDocs; + } + + int populateLogsdbIndices(long numIndices) { + int totalDocs = 0; + for (int i = 0; i < numIndices; i++) { + String indexName = "logsdb-" + i; + Settings settings = Settings.builder().put("mode", "logsdb").build(); + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings(settings) + .setMapping("@timestamp", "type=date", "host.name", "type=keyword", "cpu", "type=long") + .get(); + long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); + int numDocs = between(1, 5); + for (int d = 0; d < numDocs; d++) { + timestamp += between(1, 5) * 1000L; + client().prepareIndex(indexName) + .setSource("@timestamp", timestamp, "host.name", randomFrom("prod", "qa"), "cpu", randomIntBetween(1, 100)) + .get(); + } + totalDocs += numDocs; + flush(indexName); + } + return totalDocs; + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index c233fa9ec2c2b..2ed16bc89126e 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -228,7 +228,8 @@ static TransportVersion def(int id) { public static final TransportVersion SEMANTIC_QUERY_INNER_HITS = def(8_752_00_0); public static final TransportVersion RETAIN_ILM_STEP_INFO = def(8_753_00_0); public static final TransportVersion ADD_DATA_STREAM_OPTIONS = def(8_754_00_0); - public static final TransportVersion REGEX_AND_RANGE_INTERVAL_QUERIES = def(8_755_00_0); + public static final TransportVersion CCS_REMOTE_TELEMETRY_STATS = def(8_755_00_0); + public static final TransportVersion REGEX_AND_RANGE_INTERVAL_QUERIES = def(8_756_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java index d9c55ba097b6c..a62db92687e5a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java @@ -20,16 +20,50 @@ * A request to get cluster level stats. */ public class ClusterStatsRequest extends BaseNodesRequest { + /** + * Should the remote cluster stats be included in the response. + */ + private final boolean doRemotes; + /** + * Return stripped down stats for remote clusters. + */ + private boolean remoteStats; + /** * Get stats from nodes based on the nodes ids specified. If none are passed, stats * based on all nodes will be returned. */ public ClusterStatsRequest(String... nodesIds) { + this(false, nodesIds); + } + + public ClusterStatsRequest(boolean doRemotes, String... 
nodesIds) { super(nodesIds); + this.doRemotes = doRemotes; + this.remoteStats = false; } @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { return new CancellableTask(id, type, action, "", parentTaskId, headers); } + + public ClusterStatsRequest asRemoteStats() { + this.remoteStats = true; + return this; + } + + /** + * Should the remote cluster stats be included in the response. + */ + public boolean doRemotes() { + return doRemotes; + } + + /** + * Should the response be a stripped down version of the stats for remote clusters. + */ + public boolean isRemoteStats() { + return remoteStats; + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java index 86900f830f4be..1a77a3d4d5399 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -18,12 +18,15 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Set; import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG; @@ -34,10 +37,10 @@ public class ClusterStatsResponse extends BaseNodesResponse remoteClustersStats; public ClusterStatsResponse( long timestamp, @@ -48,7 +51,8 @@ public ClusterStatsResponse( MappingStats mappingStats, AnalysisStats analysisStats, VersionStats versionStats, - ClusterSnapshotStats clusterSnapshotStats + ClusterSnapshotStats clusterSnapshotStats, + Map remoteClustersStats ) { super(clusterName, nodes, failures); this.clusterUUID = clusterUUID; @@ -75,6 +79,7 @@ public ClusterStatsResponse( // stats should be the same on every node so just pick one of them .findAny() .orElse(RepositoryUsageStats.EMPTY); + this.remoteClustersStats = remoteClustersStats; } public String getClusterUUID() { @@ -101,6 +106,10 @@ public CCSTelemetrySnapshot getCcsMetrics() { return ccsMetrics; } + public Map getRemoteClustersStats() { + return remoteClustersStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { TransportAction.localOnly(); @@ -138,6 +147,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) { builder.startObject("ccs"); + if (remoteClustersStats != null) { + builder.field("clusters", remoteClustersStats); + } ccsMetrics.toXContent(builder, params); builder.endObject(); } @@ -150,4 +162,74 @@ public String toString() { return Strings.toString(this, true, true); } + /** + * Represents the information about a remote cluster. 
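+     * Rendered as a per-cluster entry under the {@code ccs.clusters} section of the cluster stats response.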
+ */ + public record RemoteClusterStats( + String clusterUUID, + String mode, + boolean skipUnavailable, + String transportCompress, + Set versions, + String status, + long nodesCount, + long shardsCount, + long indicesCount, + long indicesBytes, + long heapBytes, + long memBytes + ) implements ToXContentFragment { + public RemoteClusterStats(String mode, boolean skipUnavailable, String transportCompress) { + this( + "unavailable", + mode, + skipUnavailable, + transportCompress.toLowerCase(Locale.ROOT), + Set.of(), + "unavailable", + 0, + 0, + 0, + 0, + 0, + 0 + ); + } + + public RemoteClusterStats acceptResponse(RemoteClusterStatsResponse remoteResponse) { + return new RemoteClusterStats( + remoteResponse.getClusterUUID(), + mode, + skipUnavailable, + transportCompress, + remoteResponse.getVersions(), + remoteResponse.getStatus().name().toLowerCase(Locale.ROOT), + remoteResponse.getNodesCount(), + remoteResponse.getShardsCount(), + remoteResponse.getIndicesCount(), + remoteResponse.getIndicesBytes(), + remoteResponse.getHeapBytes(), + remoteResponse.getMemBytes() + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("cluster_uuid", clusterUUID); + builder.field("mode", mode); + builder.field("skip_unavailable", skipUnavailable); + builder.field("transport.compress", transportCompress); + builder.field("status", status); + builder.field("version", versions); + builder.field("nodes_count", nodesCount); + builder.field("shards_count", shardsCount); + builder.field("indices_count", indicesCount); + builder.humanReadableField("indices_total_size_in_bytes", "indices_total_size", ByteSizeValue.ofBytes(indicesBytes)); + builder.humanReadableField("max_heap_in_bytes", "max_heap", ByteSizeValue.ofBytes(heapBytes)); + builder.humanReadableField("mem_total_in_bytes", "mem_total", ByteSizeValue.ofBytes(memBytes)); + builder.endObject(); + return builder; + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java new file mode 100644 index 0000000000000..47843a91351ee --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A request to get cluster level stats from the remote cluster. 
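+ * Handled by {@link TransportRemoteClusterStatsAction}, which answers with a {@link RemoteClusterStatsResponse}.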
+ */ +public class RemoteClusterStatsRequest extends ActionRequest { + public RemoteClusterStatsRequest(StreamInput in) throws IOException { + super(in); + } + + public RemoteClusterStatsRequest() { + super(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + assert out.getTransportVersion().onOrAfter(TransportVersions.CCS_REMOTE_TELEMETRY_STATS) + : "RemoteClusterStatsRequest is not supported by the remote cluster"; + if (out.getTransportVersion().before(TransportVersions.CCS_REMOTE_TELEMETRY_STATS)) { + throw new UnsupportedOperationException("RemoteClusterStatsRequest is not supported by the remote cluster"); + } + super.writeTo(out); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java new file mode 100644 index 0000000000000..9a140b6b7424e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Set; + +/** + * Trimmed down cluster stats response for reporting to a remote cluster. 
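+ * Built by {@link TransportRemoteClusterStatsAction} from a full {@link ClusterStatsResponse} of the local cluster.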
+ */ +public class RemoteClusterStatsResponse extends ActionResponse { + final String clusterUUID; + final ClusterHealthStatus status; + private final Set versions; + private final long nodesCount; + private final long shardsCount; + private final long indicesCount; + private final long indicesBytes; + private final long heapBytes; + private final long memBytes; + + public Set getVersions() { + return versions; + } + + public long getNodesCount() { + return nodesCount; + } + + public long getShardsCount() { + return shardsCount; + } + + public long getIndicesCount() { + return indicesCount; + } + + public long getIndicesBytes() { + return indicesBytes; + } + + public long getHeapBytes() { + return heapBytes; + } + + public long getMemBytes() { + return memBytes; + } + + public RemoteClusterStatsResponse( + String clusterUUID, + ClusterHealthStatus status, + Set versions, + long nodesCount, + long shardsCount, + long indicesCount, + long indicesBytes, + long heapBytes, + long memBytes + ) { + this.clusterUUID = clusterUUID; + this.status = status; + this.versions = versions; + this.nodesCount = nodesCount; + this.shardsCount = shardsCount; + this.indicesCount = indicesCount; + this.indicesBytes = indicesBytes; + this.heapBytes = heapBytes; + this.memBytes = memBytes; + } + + public String getClusterUUID() { + return this.clusterUUID; + } + + public ClusterHealthStatus getStatus() { + return this.status; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(clusterUUID); + status.writeTo(out); + out.writeStringCollection(versions); + out.writeLong(nodesCount); + out.writeLong(shardsCount); + out.writeLong(indicesCount); + out.writeLong(indicesBytes); + out.writeLong(heapBytes); + out.writeLong(memBytes); + } + + public RemoteClusterStatsResponse(StreamInput in) throws IOException { + super(in); + this.clusterUUID = in.readString(); + this.status = ClusterHealthStatus.readFrom(in); + this.versions = in.readCollectionAsSet(StreamInput::readString); + this.nodesCount = in.readLong(); + this.shardsCount = in.readLong(); + this.indicesCount = in.readLong(); + this.indicesBytes = in.readLong(); + this.heapBytes = in.readLong(); + this.memBytes = in.readLong(); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 6cac8c8f8ca09..ab68f1d8481fd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -9,6 +9,8 @@ package org.elasticsearch.action.admin.cluster.stats; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; @@ -16,10 +18,12 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse.RemoteClusterStats; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; import 
org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.CancellableFanOut; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.nodes.TransportNodesAction; @@ -32,6 +36,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CancellableSingleObjectCache; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.UpdateForV9; @@ -48,6 +53,9 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterConnection; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteConnectionInfo; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.Transports; @@ -56,12 +64,19 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; +import static org.elasticsearch.TransportVersions.CCS_REMOTE_TELEMETRY_STATS; + +/** + * Transport action implementing _cluster/stats API. + */ public class TransportClusterStatsAction extends TransportNodesAction< ClusterStatsRequest, ClusterStatsResponse, @@ -70,6 +85,7 @@ public class TransportClusterStatsAction extends TransportNodesAction< SubscribableListener> { public static final ActionType TYPE = new ActionType<>("cluster:monitor/stats"); + private static final CommonStatsFlags SHARD_STATS_FLAGS = new CommonStatsFlags( CommonStatsFlags.Flag.Docs, CommonStatsFlags.Flag.Store, @@ -80,7 +96,9 @@ public class TransportClusterStatsAction extends TransportNodesAction< CommonStatsFlags.Flag.DenseVector, CommonStatsFlags.Flag.SparseVector ); + private static final Logger logger = LogManager.getLogger(TransportClusterStatsAction.class); + private final Settings settings; private final NodeService nodeService; private final IndicesService indicesService; private final RepositoriesService repositoriesService; @@ -90,6 +108,8 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final Executor clusterStateStatsExecutor; private final MetadataStatsCache mappingStatsCache; private final MetadataStatsCache analysisStatsCache; + private final RemoteClusterService remoteClusterService; + private final TransportRemoteClusterStatsAction remoteClusterStatsAction; @Inject public TransportClusterStatsAction( @@ -100,7 +120,9 @@ public TransportClusterStatsAction( IndicesService indicesService, RepositoriesService repositoriesService, UsageService usageService, - ActionFilters actionFilters + ActionFilters actionFilters, + Settings settings, + TransportRemoteClusterStatsAction remoteClusterStatsAction ) { super( TYPE.name(), @@ -118,6 +140,9 @@ public TransportClusterStatsAction( this.clusterStateStatsExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT); this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of); this.analysisStatsCache = new 
MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); + this.remoteClusterService = transportService.getRemoteClusterService(); + this.settings = settings; + this.remoteClusterStatsAction = remoteClusterStatsAction; } @Override @@ -125,14 +150,13 @@ protected SubscribableListener createActionContext(Task task, C assert task instanceof CancellableTask; final var cancellableTask = (CancellableTask) task; final var additionalStatsListener = new SubscribableListener(); - AdditionalStats.compute( - cancellableTask, - clusterStateStatsExecutor, - clusterService, - mappingStatsCache, - analysisStatsCache, - additionalStatsListener - ); + if (request.isRemoteStats() == false) { + final AdditionalStats additionalStats = new AdditionalStats(); + additionalStats.compute(cancellableTask, request, additionalStatsListener); + } else { + // For remote stats request, we don't need to compute anything + additionalStatsListener.onResponse(null); + } return additionalStatsListener; } @@ -150,18 +174,34 @@ protected void newResponseAsync( + "the cluster state that are too slow for a transport thread" ); assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT); + additionalStatsListener.andThenApply( - additionalStats -> new ClusterStatsResponse( - System.currentTimeMillis(), - additionalStats.clusterUUID(), - clusterService.getClusterName(), - responses, - failures, - additionalStats.mappingStats(), - additionalStats.analysisStats(), - VersionStats.of(clusterService.state().metadata(), responses), - additionalStats.clusterSnapshotStats() - ) + additionalStats -> request.isRemoteStats() + // Return stripped down stats for remote clusters + ? new ClusterStatsResponse( + System.currentTimeMillis(), + clusterService.state().metadata().clusterUUID(), + clusterService.getClusterName(), + responses, + List.of(), + null, + null, + null, + null, + Map.of() + ) + : new ClusterStatsResponse( + System.currentTimeMillis(), + additionalStats.clusterUUID(), + clusterService.getClusterName(), + responses, + failures, + additionalStats.mappingStats(), + additionalStats.analysisStats(), + VersionStats.of(clusterService.state().metadata(), responses), + additionalStats.clusterSnapshotStats(), + additionalStats.getRemoteStats() + ) ).addListener(listener); } @@ -315,36 +355,33 @@ protected boolean isFresh(Long currentKey, Long newKey) { } } - public static final class AdditionalStats { + public final class AdditionalStats { private String clusterUUID; private MappingStats mappingStats; private AnalysisStats analysisStats; private ClusterSnapshotStats clusterSnapshotStats; + private Map remoteStats; - static void compute( - CancellableTask task, - Executor executor, - ClusterService clusterService, - MetadataStatsCache mappingStatsCache, - MetadataStatsCache analysisStatsCache, - ActionListener listener - ) { - executor.execute(ActionRunnable.wrap(listener, l -> { + void compute(CancellableTask task, ClusterStatsRequest request, ActionListener listener) { + clusterStateStatsExecutor.execute(ActionRunnable.wrap(listener, l -> { task.ensureNotCancelled(); - final var result = new AdditionalStats(); - result.compute( + internalCompute( + task, + request, clusterService.state(), mappingStatsCache, analysisStatsCache, task::isCancelled, clusterService.threadPool().absoluteTimeInMillis(), - l.map(ignored -> result) + l.map(ignored -> this) ); })); } - private void compute( + private void internalCompute( + CancellableTask task, + ClusterStatsRequest request, ClusterState clusterState, MetadataStatsCache 
mappingStatsCache, MetadataStatsCache analysisStatsCache, @@ -358,6 +395,18 @@ private void compute( mappingStatsCache.get(metadata, isCancelledSupplier, listeners.acquire(s -> mappingStats = s)); analysisStatsCache.get(metadata, isCancelledSupplier, listeners.acquire(s -> analysisStats = s)); clusterSnapshotStats = ClusterSnapshotStats.of(clusterState, absoluteTimeInMillis); + if (doRemotes(request)) { + var remotes = remoteClusterService.getRegisteredRemoteClusterNames(); + if (remotes.isEmpty()) { + remoteStats = Map.of(); + } else { + new RemoteStatsFanout(task, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION)).start( + task, + remotes, + listeners.acquire(s -> remoteStats = s) + ); + } + } } } @@ -376,5 +425,79 @@ AnalysisStats analysisStats() { ClusterSnapshotStats clusterSnapshotStats() { return clusterSnapshotStats; } + + public Map getRemoteStats() { + return remoteStats; + } + } + + private static boolean doRemotes(ClusterStatsRequest request) { + return request.doRemotes(); + } + + private class RemoteStatsFanout extends CancellableFanOut> { + private final Executor requestExecutor; + private final TaskId taskId; + private Map remoteClustersStats; + + RemoteStatsFanout(Task task, Executor requestExecutor) { + this.requestExecutor = requestExecutor; + this.taskId = new TaskId(clusterService.getNodeName(), task.getId()); + } + + @Override + protected void sendItemRequest(String clusterAlias, ActionListener listener) { + var remoteClusterClient = remoteClusterService.getRemoteClusterClient( + clusterAlias, + requestExecutor, + RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED + ); + var remoteRequest = new RemoteClusterStatsRequest(); + remoteRequest.setParentTask(taskId); + remoteClusterClient.getConnection(remoteRequest, listener.delegateFailureAndWrap((responseListener, connection) -> { + if (connection.getTransportVersion().before(CCS_REMOTE_TELEMETRY_STATS)) { + responseListener.onResponse(null); + } else { + remoteClusterClient.execute(connection, TransportRemoteClusterStatsAction.REMOTE_TYPE, remoteRequest, responseListener); + } + })); + } + + @Override + protected void onItemResponse(String clusterAlias, RemoteClusterStatsResponse response) { + if (response != null) { + remoteClustersStats.computeIfPresent(clusterAlias, (k, v) -> v.acceptResponse(response)); + } + } + + @Override + protected void onItemFailure(String clusterAlias, Exception e) { + logger.warn("Failed to get remote cluster stats for [{}]: {}", clusterAlias, e); + } + + void start(Task task, Collection remotes, ActionListener> listener) { + this.remoteClustersStats = remotes.stream().collect(Collectors.toConcurrentMap(r -> r, this::makeRemoteClusterStats)); + super.run(task, remotes.iterator(), listener); + } + + /** + * Create static portion of RemoteClusterStats for a given cluster alias. 
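+         * The remaining fields are filled in via {@link RemoteClusterStats#acceptResponse} once the remote cluster responds.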
+ */ + RemoteClusterStats makeRemoteClusterStats(String clusterAlias) { + RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias); + RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo(); + var compression = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); + return new RemoteClusterStats( + remoteConnectionInfo.getModeInfo().modeName(), + remoteConnection.isSkipUnavailable(), + compression.toString() + ); + } + + @Override + protected Map onCompletion() { + return remoteClustersStats; + } } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java new file mode 100644 index 0000000000000..4d57f10807af6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.RemoteClusterActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +/** + * Handler action for incoming {@link RemoteClusterStatsRequest}. + * Will pass the work to {@link TransportClusterStatsAction} and return the response. 
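+ * Registered under the {@code cluster:monitor/stats/remote} action name and exposed to remote clusters via {@link #REMOTE_TYPE}.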
+ */ +public class TransportRemoteClusterStatsAction extends HandledTransportAction { + + public static final String NAME = "cluster:monitor/stats/remote"; + public static final ActionType TYPE = new ActionType<>(NAME); + public static final RemoteClusterActionType REMOTE_TYPE = new RemoteClusterActionType<>( + NAME, + RemoteClusterStatsResponse::new + ); + private final NodeClient client; + + @Inject + public TransportRemoteClusterStatsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) { + super(NAME, transportService, actionFilters, RemoteClusterStatsRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.client = client; + } + + @Override + protected void doExecute(Task task, RemoteClusterStatsRequest request, ActionListener listener) { + ClusterStatsRequest subRequest = new ClusterStatsRequest().asRemoteStats(); + subRequest.setParentTask(request.getParentTask()); + client.execute( + TransportClusterStatsAction.TYPE, + subRequest, + listener.map( + clusterStatsResponse -> new RemoteClusterStatsResponse( + clusterStatsResponse.getClusterUUID(), + clusterStatsResponse.getStatus(), + clusterStatsResponse.getNodesStats().getVersions(), + clusterStatsResponse.getNodesStats().getCounts().getTotal(), + clusterStatsResponse.getIndicesStats().getShards().getTotal(), + clusterStatsResponse.getIndicesStats().getIndexCount(), + clusterStatsResponse.getIndicesStats().getStore().sizeInBytes(), + clusterStatsResponse.getNodesStats().getJvm().getHeapMax().getBytes(), + clusterStatsResponse.getNodesStats().getOs().getMem().getTotal().getBytes() + ) + ) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java b/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java index 335e2e4ca729e..6a881163914e4 100644 --- a/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java +++ b/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java @@ -412,12 +412,12 @@ static class MaxFileSizeCheck implements BootstrapCheck { @Override public BootstrapCheckResult check(BootstrapContext context) { - final long maxFileSize = getMaxFileSize(); + final long maxFileSize = getProcessLimits().maxFileSize(); if (maxFileSize != Long.MIN_VALUE && maxFileSize != ProcessLimits.UNLIMITED) { final String message = String.format( Locale.ROOT, "max file size [%d] for user [%s] is too low, increase to [unlimited]", - getMaxFileSize(), + maxFileSize, BootstrapInfo.getSystemProperties().get("user.name") ); return BootstrapCheckResult.failure(message); @@ -426,8 +426,8 @@ public BootstrapCheckResult check(BootstrapContext context) { } } - long getMaxFileSize() { - return NativeAccess.instance().getProcessLimits().maxVirtualMemorySize(); + protected ProcessLimits getProcessLimits() { + return NativeAccess.instance().getProcessLimits(); } @Override diff --git a/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java b/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java new file mode 100644 index 0000000000000..17e290283d5e0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java @@ -0,0 +1,177 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.monitor.metrics; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.util.SingleObjectCache; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.telemetry.metric.LongWithAttributes; +import org.elasticsearch.telemetry.metric.MeterRegistry; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +/** + * {@link IndicesMetrics} monitors index statistics on an Elasticsearch node and exposes them as metrics + * through the provided {@link MeterRegistry}. It tracks the current total number of indices, document count, and + * store size (in bytes) for each index mode. + */ +public class IndicesMetrics extends AbstractLifecycleComponent { + private final Logger logger = LogManager.getLogger(IndicesMetrics.class); + private final MeterRegistry registry; + private final List metrics = new ArrayList<>(); + private final IndicesStatsCache stateCache; + + public IndicesMetrics(MeterRegistry meterRegistry, IndicesService indicesService, TimeValue metricsInterval) { + this.registry = meterRegistry; + // Use half of the update interval to ensure that results aren't cached across updates, + // while preventing the cache from expiring when reading different gauges within the same update. + var cacheExpiry = new TimeValue(metricsInterval.getMillis() / 2); + this.stateCache = new IndicesStatsCache(indicesService, cacheExpiry); + } + + private static List registerAsyncMetrics(MeterRegistry registry, IndicesStatsCache cache) { + List metrics = new ArrayList<>(IndexMode.values().length * 3); + assert IndexMode.values().length == 3 : "index modes have changed"; + for (IndexMode indexMode : IndexMode.values()) { + String name = indexMode.getName(); + metrics.add( + registry.registerLongGauge( + "es.indices." + name + ".total", + "total number of " + name + " indices", + "unit", + () -> new LongWithAttributes(cache.getOrRefresh().get(indexMode).numIndices) + ) + ); + metrics.add( + registry.registerLongGauge( + "es.indices." + name + ".docs.total", + "total documents of " + name + " indices", + "unit", + () -> new LongWithAttributes(cache.getOrRefresh().get(indexMode).numDocs) + ) + ); + metrics.add( + registry.registerLongGauge( + "es.indices." 
+ name + ".bytes.total", + "total size in bytes of " + name + " indices", + "unit", + () -> new LongWithAttributes(cache.getOrRefresh().get(indexMode).numBytes) + ) + ); + } + return metrics; + } + + @Override + protected void doStart() { + metrics.addAll(registerAsyncMetrics(registry, stateCache)); + } + + @Override + protected void doStop() { + stateCache.stopRefreshing(); + } + + @Override + protected void doClose() throws IOException { + metrics.forEach(metric -> { + try { + metric.close(); + } catch (Exception e) { + logger.warn("metrics close() method should not throw Exception", e); + } + }); + } + + static class IndexStats { + int numIndices = 0; + long numDocs = 0; + long numBytes = 0; + } + + private static class IndicesStatsCache extends SingleObjectCache> { + private static final Map MISSING_STATS; + static { + MISSING_STATS = new EnumMap<>(IndexMode.class); + for (IndexMode value : IndexMode.values()) { + MISSING_STATS.put(value, new IndexStats()); + } + } + + private boolean refresh; + private final IndicesService indicesService; + + IndicesStatsCache(IndicesService indicesService, TimeValue interval) { + super(interval, MISSING_STATS); + this.indicesService = indicesService; + this.refresh = true; + } + + private Map internalGetIndicesStats() { + Map stats = new EnumMap<>(IndexMode.class); + for (IndexMode mode : IndexMode.values()) { + stats.put(mode, new IndexStats()); + } + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (indexShard.isSystem()) { + continue; // skip system indices + } + final ShardRouting shardRouting = indexShard.routingEntry(); + if (shardRouting.primary() == false) { + continue; // count primaries only + } + if (shardRouting.recoverySource() != null) { + continue; // exclude relocating shards + } + final IndexMode indexMode = indexShard.indexSettings().getMode(); + final IndexStats indexStats = stats.get(indexMode); + if (shardRouting.shardId().id() == 0) { + indexStats.numIndices++; + } + try { + indexStats.numDocs += indexShard.commitStats().getNumDocs(); + indexStats.numBytes += indexShard.storeStats().sizeInBytes(); + } catch (IllegalIndexShardStateException | AlreadyClosedException ignored) { + // ignored + } + } + } + return stats; + } + + @Override + protected Map refresh() { + return refresh ? 
internalGetIndicesStats() : getNoRefresh(); + } + + @Override + protected boolean needsRefresh() { + return getNoRefresh() == MISSING_STATS || super.needsRefresh(); + } + + void stopRefreshing() { + this.refresh = false; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 1447ac1c5b59b..5024cc5468866 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -66,6 +66,7 @@ import org.elasticsearch.injection.guice.Injector; import org.elasticsearch.monitor.fs.FsHealthService; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.monitor.metrics.IndicesMetrics; import org.elasticsearch.monitor.metrics.NodeMetrics; import org.elasticsearch.node.internal.TerminationHandler; import org.elasticsearch.plugins.ClusterCoordinationPlugin; @@ -441,6 +442,7 @@ public void onTimeout(TimeValue timeout) { } injector.getInstance(NodeMetrics.class).start(); + injector.getInstance(IndicesMetrics.class).start(); injector.getInstance(HealthPeriodicLogger.class).start(); logger.info("started {}", transportService.getLocalNode()); @@ -489,6 +491,7 @@ private void stop() { stopIfStarted(SearchService.class); stopIfStarted(TransportService.class); stopIfStarted(NodeMetrics.class); + stopIfStarted(IndicesMetrics.class); pluginLifecycleComponents.forEach(Node::stopIfStarted); // we should stop this last since it waits for resources to get released @@ -558,6 +561,7 @@ public synchronized void close() throws IOException { toClose.add(() -> stopWatch.stop().start("transport")); toClose.add(injector.getInstance(TransportService.class)); toClose.add(injector.getInstance(NodeMetrics.class)); + toClose.add(injector.getInstance(IndicesService.class)); if (ReadinessService.enabled(environment)) { toClose.add(injector.getInstance(ReadinessService.class)); } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index fdd3b9784f8b8..80ee1174db7b1 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -142,6 +142,7 @@ import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.fs.FsHealthService; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.monitor.metrics.IndicesMetrics; import org.elasticsearch.monitor.metrics.NodeMetrics; import org.elasticsearch.node.internal.TerminationHandler; import org.elasticsearch.node.internal.TerminationHandlerProvider; @@ -1076,6 +1077,7 @@ private void construct( final TimeValue metricsInterval = settings.getAsTime("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(10)); final NodeMetrics nodeMetrics = new NodeMetrics(telemetryProvider.getMeterRegistry(), nodeService, metricsInterval); + final IndicesMetrics indicesMetrics = new IndicesMetrics(telemetryProvider.getMeterRegistry(), indicesService, metricsInterval); final SearchService searchService = serviceProvider.newSearchService( pluginsService, @@ -1175,6 +1177,7 @@ private void construct( b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); b.bind(NodeMetrics.class).toInstance(nodeMetrics); + b.bind(IndicesMetrics.class).toInstance(indicesMetrics); b.bind(NetworkService.class).toInstance(networkService); 
             b.bind(IndexMetadataVerifier.class).toInstance(indexMetadataVerifier);
             b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java
index 603dcdba86730..53ae50bc0b75f 100644
--- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java
+++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java
@@ -11,6 +11,8 @@
 
 import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest;
 import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.common.util.FeatureFlag;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.Scope;
@@ -29,6 +31,8 @@
 public class RestClusterStatsAction extends BaseRestHandler {
 
     private static final Set<String> SUPPORTED_CAPABILITIES = Set.of("human-readable-total-docs-size");
+    private static final Set<String> SUPPORTED_CAPABILITIES_CCS_STATS = Sets.union(SUPPORTED_CAPABILITIES, Set.of("ccs-stats"));
+    public static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry");
 
     @Override
     public List<Route> routes() {
@@ -40,9 +44,17 @@ public String getName() {
         return "cluster_stats_action";
     }
 
+    @Override
+    public Set<String> supportedQueryParameters() {
+        return Set.of("include_remotes", "nodeId");
+    }
+
     @Override
     public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
-        ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest(request.paramAsStringArray("nodeId", null));
+        ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest(
+            request.paramAsBoolean("include_remotes", false),
+            request.paramAsStringArray("nodeId", null)
+        );
         clusterStatsRequest.timeout(getTimeout(request));
         return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
             .cluster()
@@ -56,6 +68,6 @@ public boolean canTripCircuitBreaker() {
 
     @Override
     public Set<String> supportedCapabilities() {
-        return SUPPORTED_CAPABILITIES;
+        return CCS_TELEMETRY_FEATURE_FLAG.isEnabled() ? SUPPORTED_CAPABILITIES_CCS_STATS : SUPPORTED_CAPABILITIES;
     }
 }
diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
index d0638fcf7a2de..f0cafb956457e 100644
--- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
+++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java
@@ -43,7 +43,7 @@
  * {@link SniffConnectionStrategy#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of
  * connections per cluster has been reached.
  */
-final class RemoteClusterConnection implements Closeable {
+public final class RemoteClusterConnection implements Closeable {
 
     private final TransportService transportService;
     private final RemoteConnectionManager remoteConnectionManager;
@@ -99,7 +99,7 @@ void setSkipUnavailable(boolean skipUnavailable) {
     /**
      * Returns whether this cluster is configured to be skipped when unavailable
      */
-    boolean isSkipUnavailable() {
+    public boolean isSkipUnavailable() {
         return skipUnavailable;
     }
 
diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
index f1afdfe1f186b..620b80e91cb45 100644
--- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
+++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java
@@ -277,7 +277,7 @@ public void maybeEnsureConnectedAndGetConnection(
         }
     }
 
-    RemoteClusterConnection getRemoteClusterConnection(String cluster) {
+    public RemoteClusterConnection getRemoteClusterConnection(String cluster) {
         if (enabled == false) {
             throw new IllegalArgumentException(
                 "this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role"
diff --git a/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java b/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java
index 9a51757189f8b..8c3749dbd3a45 100644
--- a/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java
+++ b/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java
@@ -389,8 +389,8 @@ public void testMaxFileSizeCheck() throws NodeValidationException {
         final AtomicLong maxFileSize = new AtomicLong(randomIntBetween(0, Integer.MAX_VALUE));
         final BootstrapChecks.MaxFileSizeCheck check = new BootstrapChecks.MaxFileSizeCheck() {
             @Override
-            long getMaxFileSize() {
-                return maxFileSize.get();
+            protected ProcessLimits getProcessLimits() {
+                return new ProcessLimits(ProcessLimits.UNKNOWN, ProcessLimits.UNKNOWN, maxFileSize.get());
             }
         };
 
diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java
index 6afea6faa607e..73ceafc0a24b9 100644
--- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java
+++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java
@@ -434,7 +434,8 @@ public void testToXContent() throws IOException {
             MappingStats.of(metadata, () -> {}),
             AnalysisStats.of(metadata, () -> {}),
             VersionStats.of(metadata, singletonList(mockNodeResponse)),
-            ClusterSnapshotStats.EMPTY
+            ClusterSnapshotStats.EMPTY,
+            null
         );
 
         final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);
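
Below is a minimal, illustrative sketch (not part of this change set) of the call pattern the visibility changes above are meant to enable: with `RemoteClusterConnection`, `isSkipUnavailable()`, and `getRemoteClusterConnection(...)` now public, code outside the `org.elasticsearch.transport` package can read per-remote connection settings. It assumes the existing `RemoteClusterService#getRegisteredRemoteClusterNames()` accessor; the class and method names below are hypothetical.

[source,java]
----
// Illustrative only: not part of this change set. Shows how telemetry-style code
// can read per-remote connection settings through the accessors made public above.
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.transport.RemoteClusterConnection;
import org.elasticsearch.transport.RemoteClusterService;

class RemoteClusterSettingsSnapshot {

    /**
     * Collects the skip_unavailable flag for every configured remote cluster.
     * getRemoteClusterConnection throws IllegalArgumentException on nodes without
     * the remote_cluster_client role (see the diff above), so callers should guard
     * or handle that case.
     */
    static Map<String, Boolean> skipUnavailableByCluster(RemoteClusterService remoteClusterService) {
        Map<String, Boolean> result = new HashMap<>();
        for (String clusterAlias : remoteClusterService.getRegisteredRemoteClusterNames()) {
            RemoteClusterConnection connection = remoteClusterService.getRemoteClusterConnection(clusterAlias);
            result.put(clusterAlias, connection.isSkipUnavailable());
        }
        return result;
    }
}
----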