From 875991f9b0c6842c686f1483011335b99ac95760 Mon Sep 17 00:00:00 2001 From: Stanislav Malyshev Date: Mon, 30 Sep 2024 11:50:22 -0600 Subject: [PATCH] Implement remote cluster CCS telemetry (#112478) * Add remote cluster stats to _cluster/stats * Implement remote cluster stats polling * Add docs for the include_remotes part (cherry picked from commit b26d81c71317f632f348d2b98af68b09d41db969) --- docs/reference/cluster/stats.asciidoc | 124 +++++++++++- .../rest-api-spec/api/cluster.stats.json | 4 +- .../test/cluster.stats/30_ccs_stats.yml | 151 ++++++++++++++ .../cluster/stats/ClusterStatsRemoteIT.java | 150 ++++++++++++++ .../org/elasticsearch/TransportVersions.java | 1 + .../cluster/stats/ClusterStatsRequest.java | 34 ++++ .../cluster/stats/ClusterStatsResponse.java | 86 +++++++- .../stats/RemoteClusterStatsRequest.java | 46 +++++ .../stats/RemoteClusterStatsResponse.java | 116 +++++++++++ .../stats/TransportClusterStatsAction.java | 191 ++++++++++++++---- .../TransportRemoteClusterStatsAction.java | 65 ++++++ .../admin/cluster/RestClusterStatsAction.java | 16 +- .../transport/RemoteClusterConnection.java | 4 +- .../transport/RemoteClusterService.java | 2 +- .../ClusterStatsMonitoringDocTests.java | 3 +- 15 files changed, 939 insertions(+), 54 deletions(-) create mode 100644 rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.stats/30_ccs_stats.yml create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRemoteIT.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java create mode 100644 server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index 575a6457804a6..8e4f630ef7da4 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -40,6 +40,10 @@ If a node does not respond before its timeout expires, the response does not inc However, timed out nodes are included in the response's `_nodes.failed` property. Defaults to no timeout. +`include_remotes`:: +(Optional, Boolean) If `true`, includes remote cluster information in the response. +Defaults to `false`, so no remote cluster information is returned. + [role="child_attributes"] [[cluster-stats-api-response-body]] ==== {api-response-body-title} @@ -183,12 +187,11 @@ This number is based on documents in Lucene segments and may include documents f This number is based on documents in Lucene segments. {es} reclaims the disk space of deleted Lucene documents when a segment is merged. `total_size_in_bytes`:: -(integer) -Total size in bytes across all primary shards assigned to selected nodes. +(integer) Total size in bytes across all primary shards assigned to selected nodes. `total_size`:: -(string) -Total size across all primary shards assigned to selected nodes, as a human-readable string. +(string) Total size across all primary shards assigned to selected nodes, as a human-readable string. + ===== `store`:: @@ -1285,8 +1288,7 @@ They are included here for expert users, but should otherwise be ignored. ==== `repositories`:: -(object) Contains statistics about the <> repositories defined in the cluster, broken down -by repository type. +(object) Contains statistics about the <> repositories defined in the cluster, broken down by repository type. + .Properties of `repositories` [%collapsible%open] @@ -1314,13 +1316,74 @@ Each repository type may also include other statistics about the repositories of [%collapsible%open] ===== +`clusters`::: +(object) Contains remote cluster settings and metrics collected from them. +The keys are cluster names, and the values are per-cluster data. +Only present if `include_remotes` option is set to `true`. + ++ +.Properties of `clusters` +[%collapsible%open] +====== + +`cluster_uuid`::: +(string) The UUID of the remote cluster. + +`mode`::: +(string) The <> used to communicate with the remote cluster. + +`skip_unavailable`::: +(Boolean) The `skip_unavailable` <> used for this remote cluster. + +`transport.compress`::: +(string) Transport compression setting used for this remote cluster. + +`version`::: +(array of strings) The list of {es} versions used by the nodes on the remote cluster. + +`status`::: +include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=cluster-health-status] ++ +See <>. + +`nodes_count`::: +(integer) The total count of nodes in the remote cluster. + +`shards_count`::: +(integer) The total number of shards in the remote cluster. + +`indices_count`::: +(integer) The total number of indices in the remote cluster. + +`indices_total_size_in_bytes`::: +(integer) Total data set size, in bytes, of all shards assigned to selected nodes. + +`indices_total_size`::: +(string) Total data set size, in bytes, of all shards assigned to selected nodes, as a human-readable string. + +`max_heap_in_bytes`::: +(integer) Maximum amount of memory, in bytes, available for use by the heap across the nodes of the remote cluster. + +`max_heap`::: +(string) Maximum amount of memory, in bytes, available for use by the heap across the nodes of the remote cluster, +as a human-readable string. + +`mem_total_in_bytes`::: +(integer) Total amount, in bytes, of physical memory across the nodes of the remote cluster. + +`mem_total`::: +(string) Total amount, in bytes, of physical memory across the nodes of the remote cluster, as a human-readable string. + +====== + `_search`::: -(object) Contains the telemetry information about the <> usage in the cluster. +(object) Contains the information about the <> usage in the cluster. + .Properties of `_search` [%collapsible%open] ====== + `total`::: (integer) The total number of {ccs} requests that have been executed by the cluster. @@ -1336,6 +1399,7 @@ Each repository type may also include other statistics about the repositories of .Properties of `took` [%collapsible%open] ======= + `max`::: (integer) The maximum time taken to execute a {ccs} request, in milliseconds. @@ -1344,6 +1408,7 @@ Each repository type may also include other statistics about the repositories of `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======= `took_mrt_true`:: @@ -1361,6 +1426,7 @@ Each repository type may also include other statistics about the repositories of `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======= `took_mrt_false`:: @@ -1378,6 +1444,7 @@ Each repository type may also include other statistics about the repositories of `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======= `remotes_per_search_max`:: @@ -1391,9 +1458,10 @@ Each repository type may also include other statistics about the repositories of The keys are the failure reason names and the values are the number of requests that failed for that reason. `features`:: -(object) Contains statistics about the features used in {ccs} requests. The keys are the names of the search feature, -and the values are the number of requests that used that feature. Single request can use more than one feature -(e.g. both `async` and `wildcard`). Known features are: +(object) Contains statistics about the features used in {ccs} requests. +The keys are the names of the search feature, and the values are the number of requests that used that feature. +Single request can use more than one feature (e.g. both `async` and `wildcard`). +Known features are: * `async` - <> @@ -1427,6 +1495,7 @@ This may include requests where partial results were returned, but not requests .Properties of `took` [%collapsible%open] ======== + `max`::: (integer) The maximum time taken to execute a {ccs} request, in milliseconds. @@ -1435,6 +1504,7 @@ This may include requests where partial results were returned, but not requests `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======== ======= @@ -1812,3 +1882,37 @@ This API can be restricted to a subset of the nodes using < remoteClusterAlias() { + return List.of(REMOTE1, REMOTE2); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of(REMOTE1, false, REMOTE2, true); + } + + public void testRemoteClusterStats() throws ExecutionException, InterruptedException { + setupClusters(); + final Client client = client(LOCAL_CLUSTER); + SearchRequest searchRequest = new SearchRequest("*", "*:*"); + searchRequest.allowPartialSearchResults(false); + searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); + searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder()).size(10)); + + // do a search + assertResponse(cluster(LOCAL_CLUSTER).client().search(searchRequest), Assert::assertNotNull); + // collect stats without remotes + ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get(); + assertNotNull(response.getCcsMetrics()); + var remotesUsage = response.getCcsMetrics().getByRemoteCluster(); + assertThat(remotesUsage.size(), equalTo(3)); + assertNull(response.getRemoteClustersStats()); + // collect stats with remotes + response = client.admin().cluster().execute(TransportClusterStatsAction.TYPE, new ClusterStatsRequest(true)).get(); + assertNotNull(response.getCcsMetrics()); + remotesUsage = response.getCcsMetrics().getByRemoteCluster(); + assertThat(remotesUsage.size(), equalTo(3)); + assertNotNull(response.getRemoteClustersStats()); + var remoteStats = response.getRemoteClustersStats(); + assertThat(remoteStats.size(), equalTo(2)); + for (String clusterAlias : remoteClusterAlias()) { + assertThat(remoteStats, hasKey(clusterAlias)); + assertThat(remotesUsage, hasKey(clusterAlias)); + assertThat(remoteStats.get(clusterAlias).status(), equalToIgnoringCase(ClusterHealthStatus.GREEN.name())); + assertThat(remoteStats.get(clusterAlias).indicesCount(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).nodesCount(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).shardsCount(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).heapBytes(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).memBytes(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).indicesBytes(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).versions(), hasItem(Version.CURRENT.toString())); + assertThat(remoteStats.get(clusterAlias).clusterUUID(), not(equalTo(""))); + assertThat(remoteStats.get(clusterAlias).mode(), oneOf("sniff", "proxy")); + } + assertFalse(remoteStats.get(REMOTE1).skipUnavailable()); + assertTrue(remoteStats.get(REMOTE2).skipUnavailable()); + } + + private void setupClusters() { + int numShardsLocal = randomIntBetween(2, 5); + Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build(); + assertAcked( + client(LOCAL_CLUSTER).admin() + .indices() + .prepareCreate(INDEX_NAME) + .setSettings(localSettings) + .setMapping("@timestamp", "type=date", "f", "type=text") + ); + indexDocs(client(LOCAL_CLUSTER)); + + int numShardsRemote = randomIntBetween(2, 10); + for (String clusterAlias : remoteClusterAlias()) { + final InternalTestCluster remoteCluster = cluster(clusterAlias); + remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3)); + assertAcked( + client(clusterAlias).admin() + .indices() + .prepareCreate(INDEX_NAME) + .setSettings(indexSettings(numShardsRemote, randomIntBetween(0, 1))) + .setMapping("@timestamp", "type=date", "f", "type=text") + ); + assertFalse( + client(clusterAlias).admin() + .cluster() + .prepareHealth(TEST_REQUEST_TIMEOUT, INDEX_NAME) + .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(30)) + .get() + .isTimedOut() + ); + indexDocs(client(clusterAlias)); + } + + } + + private void indexDocs(Client client) { + int numDocs = between(5, 20); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex(INDEX_NAME).setSource("f", "v", "@timestamp", randomNonNegativeLong()).get(); + } + client.admin().indices().prepareRefresh(INDEX_NAME).get(); + } + +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index f58e30bf11e2a..2039376bcb31d 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -228,6 +228,7 @@ static TransportVersion def(int id) { public static final TransportVersion SEMANTIC_QUERY_INNER_HITS = def(8_752_00_0); public static final TransportVersion RETAIN_ILM_STEP_INFO = def(8_753_00_0); public static final TransportVersion ADD_DATA_STREAM_OPTIONS = def(8_754_00_0); + public static final TransportVersion CCS_REMOTE_TELEMETRY_STATS = def(8_755_00_0); /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java index d9c55ba097b6c..a62db92687e5a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java @@ -20,16 +20,50 @@ * A request to get cluster level stats. */ public class ClusterStatsRequest extends BaseNodesRequest { + /** + * Should the remote cluster stats be included in the response. + */ + private final boolean doRemotes; + /** + * Return stripped down stats for remote clusters. + */ + private boolean remoteStats; + /** * Get stats from nodes based on the nodes ids specified. If none are passed, stats * based on all nodes will be returned. */ public ClusterStatsRequest(String... nodesIds) { + this(false, nodesIds); + } + + public ClusterStatsRequest(boolean doRemotes, String... nodesIds) { super(nodesIds); + this.doRemotes = doRemotes; + this.remoteStats = false; } @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { return new CancellableTask(id, type, action, "", parentTaskId, headers); } + + public ClusterStatsRequest asRemoteStats() { + this.remoteStats = true; + return this; + } + + /** + * Should the remote cluster stats be included in the response. + */ + public boolean doRemotes() { + return doRemotes; + } + + /** + * Should the response be a stripped down version of the stats for remote clusters. + */ + public boolean isRemoteStats() { + return remoteStats; + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java index 86900f830f4be..1a77a3d4d5399 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -18,12 +18,15 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Set; import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG; @@ -34,10 +37,10 @@ public class ClusterStatsResponse extends BaseNodesResponse remoteClustersStats; public ClusterStatsResponse( long timestamp, @@ -48,7 +51,8 @@ public ClusterStatsResponse( MappingStats mappingStats, AnalysisStats analysisStats, VersionStats versionStats, - ClusterSnapshotStats clusterSnapshotStats + ClusterSnapshotStats clusterSnapshotStats, + Map remoteClustersStats ) { super(clusterName, nodes, failures); this.clusterUUID = clusterUUID; @@ -75,6 +79,7 @@ public ClusterStatsResponse( // stats should be the same on every node so just pick one of them .findAny() .orElse(RepositoryUsageStats.EMPTY); + this.remoteClustersStats = remoteClustersStats; } public String getClusterUUID() { @@ -101,6 +106,10 @@ public CCSTelemetrySnapshot getCcsMetrics() { return ccsMetrics; } + public Map getRemoteClustersStats() { + return remoteClustersStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { TransportAction.localOnly(); @@ -138,6 +147,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) { builder.startObject("ccs"); + if (remoteClustersStats != null) { + builder.field("clusters", remoteClustersStats); + } ccsMetrics.toXContent(builder, params); builder.endObject(); } @@ -150,4 +162,74 @@ public String toString() { return Strings.toString(this, true, true); } + /** + * Represents the information about a remote cluster. + */ + public record RemoteClusterStats( + String clusterUUID, + String mode, + boolean skipUnavailable, + String transportCompress, + Set versions, + String status, + long nodesCount, + long shardsCount, + long indicesCount, + long indicesBytes, + long heapBytes, + long memBytes + ) implements ToXContentFragment { + public RemoteClusterStats(String mode, boolean skipUnavailable, String transportCompress) { + this( + "unavailable", + mode, + skipUnavailable, + transportCompress.toLowerCase(Locale.ROOT), + Set.of(), + "unavailable", + 0, + 0, + 0, + 0, + 0, + 0 + ); + } + + public RemoteClusterStats acceptResponse(RemoteClusterStatsResponse remoteResponse) { + return new RemoteClusterStats( + remoteResponse.getClusterUUID(), + mode, + skipUnavailable, + transportCompress, + remoteResponse.getVersions(), + remoteResponse.getStatus().name().toLowerCase(Locale.ROOT), + remoteResponse.getNodesCount(), + remoteResponse.getShardsCount(), + remoteResponse.getIndicesCount(), + remoteResponse.getIndicesBytes(), + remoteResponse.getHeapBytes(), + remoteResponse.getMemBytes() + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("cluster_uuid", clusterUUID); + builder.field("mode", mode); + builder.field("skip_unavailable", skipUnavailable); + builder.field("transport.compress", transportCompress); + builder.field("status", status); + builder.field("version", versions); + builder.field("nodes_count", nodesCount); + builder.field("shards_count", shardsCount); + builder.field("indices_count", indicesCount); + builder.humanReadableField("indices_total_size_in_bytes", "indices_total_size", ByteSizeValue.ofBytes(indicesBytes)); + builder.humanReadableField("max_heap_in_bytes", "max_heap", ByteSizeValue.ofBytes(heapBytes)); + builder.humanReadableField("mem_total_in_bytes", "mem_total", ByteSizeValue.ofBytes(memBytes)); + builder.endObject(); + return builder; + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java new file mode 100644 index 0000000000000..47843a91351ee --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A request to get cluster level stats from the remote cluster. + */ +public class RemoteClusterStatsRequest extends ActionRequest { + public RemoteClusterStatsRequest(StreamInput in) throws IOException { + super(in); + } + + public RemoteClusterStatsRequest() { + super(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + assert out.getTransportVersion().onOrAfter(TransportVersions.CCS_REMOTE_TELEMETRY_STATS) + : "RemoteClusterStatsRequest is not supported by the remote cluster"; + if (out.getTransportVersion().before(TransportVersions.CCS_REMOTE_TELEMETRY_STATS)) { + throw new UnsupportedOperationException("RemoteClusterStatsRequest is not supported by the remote cluster"); + } + super.writeTo(out); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java new file mode 100644 index 0000000000000..9a140b6b7424e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Set; + +/** + * Trimmed down cluster stats response for reporting to a remote cluster. + */ +public class RemoteClusterStatsResponse extends ActionResponse { + final String clusterUUID; + final ClusterHealthStatus status; + private final Set versions; + private final long nodesCount; + private final long shardsCount; + private final long indicesCount; + private final long indicesBytes; + private final long heapBytes; + private final long memBytes; + + public Set getVersions() { + return versions; + } + + public long getNodesCount() { + return nodesCount; + } + + public long getShardsCount() { + return shardsCount; + } + + public long getIndicesCount() { + return indicesCount; + } + + public long getIndicesBytes() { + return indicesBytes; + } + + public long getHeapBytes() { + return heapBytes; + } + + public long getMemBytes() { + return memBytes; + } + + public RemoteClusterStatsResponse( + String clusterUUID, + ClusterHealthStatus status, + Set versions, + long nodesCount, + long shardsCount, + long indicesCount, + long indicesBytes, + long heapBytes, + long memBytes + ) { + this.clusterUUID = clusterUUID; + this.status = status; + this.versions = versions; + this.nodesCount = nodesCount; + this.shardsCount = shardsCount; + this.indicesCount = indicesCount; + this.indicesBytes = indicesBytes; + this.heapBytes = heapBytes; + this.memBytes = memBytes; + } + + public String getClusterUUID() { + return this.clusterUUID; + } + + public ClusterHealthStatus getStatus() { + return this.status; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(clusterUUID); + status.writeTo(out); + out.writeStringCollection(versions); + out.writeLong(nodesCount); + out.writeLong(shardsCount); + out.writeLong(indicesCount); + out.writeLong(indicesBytes); + out.writeLong(heapBytes); + out.writeLong(memBytes); + } + + public RemoteClusterStatsResponse(StreamInput in) throws IOException { + super(in); + this.clusterUUID = in.readString(); + this.status = ClusterHealthStatus.readFrom(in); + this.versions = in.readCollectionAsSet(StreamInput::readString); + this.nodesCount = in.readLong(); + this.shardsCount = in.readLong(); + this.indicesCount = in.readLong(); + this.indicesBytes = in.readLong(); + this.heapBytes = in.readLong(); + this.memBytes = in.readLong(); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 7e25fe45f6332..1d7a6105d1d44 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -9,6 +9,8 @@ package org.elasticsearch.action.admin.cluster.stats; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; @@ -17,10 +19,12 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse.RemoteClusterStats; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.CancellableFanOut; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.nodes.TransportNodesAction; @@ -33,6 +37,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CancellableSingleObjectCache; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.UpdateForV9; @@ -49,6 +54,9 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterConnection; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteConnectionInfo; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.Transports; @@ -57,12 +65,19 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; +import static org.elasticsearch.TransportVersions.CCS_REMOTE_TELEMETRY_STATS; + +/** + * Transport action implementing _cluster/stats API. + */ public class TransportClusterStatsAction extends TransportNodesAction< ClusterStatsRequest, ClusterStatsResponse, @@ -71,6 +86,7 @@ public class TransportClusterStatsAction extends TransportNodesAction< SubscribableListener> { public static final ActionType TYPE = new ActionType<>("cluster:monitor/stats"); + private static final CommonStatsFlags SHARD_STATS_FLAGS = new CommonStatsFlags( CommonStatsFlags.Flag.Docs, CommonStatsFlags.Flag.Store, @@ -81,7 +97,9 @@ public class TransportClusterStatsAction extends TransportNodesAction< CommonStatsFlags.Flag.DenseVector, CommonStatsFlags.Flag.SparseVector ); + private static final Logger logger = LogManager.getLogger(TransportClusterStatsAction.class); + private final Settings settings; private final NodeService nodeService; private final IndicesService indicesService; private final RepositoriesService repositoriesService; @@ -91,6 +109,8 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final Executor clusterStateStatsExecutor; private final MetadataStatsCache mappingStatsCache; private final MetadataStatsCache analysisStatsCache; + private final RemoteClusterService remoteClusterService; + private final TransportRemoteClusterStatsAction remoteClusterStatsAction; @Inject public TransportClusterStatsAction( @@ -101,7 +121,9 @@ public TransportClusterStatsAction( IndicesService indicesService, RepositoriesService repositoriesService, UsageService usageService, - ActionFilters actionFilters + ActionFilters actionFilters, + Settings settings, + TransportRemoteClusterStatsAction remoteClusterStatsAction ) { super( TYPE.name(), @@ -119,6 +141,9 @@ public TransportClusterStatsAction( this.clusterStateStatsExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT); this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of); this.analysisStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); + this.remoteClusterService = transportService.getRemoteClusterService(); + this.settings = settings; + this.remoteClusterStatsAction = remoteClusterStatsAction; } @Override @@ -126,14 +151,13 @@ protected SubscribableListener createActionContext(Task task, C assert task instanceof CancellableTask; final var cancellableTask = (CancellableTask) task; final var additionalStatsListener = new SubscribableListener(); - AdditionalStats.compute( - cancellableTask, - clusterStateStatsExecutor, - clusterService, - mappingStatsCache, - analysisStatsCache, - additionalStatsListener - ); + if (request.isRemoteStats() == false) { + final AdditionalStats additionalStats = new AdditionalStats(); + additionalStats.compute(cancellableTask, request, additionalStatsListener); + } else { + // For remote stats request, we don't need to compute anything + additionalStatsListener.onResponse(null); + } return additionalStatsListener; } @@ -151,18 +175,34 @@ protected void newResponseAsync( + "the cluster state that are too slow for a transport thread" ); assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT); + additionalStatsListener.andThenApply( - additionalStats -> new ClusterStatsResponse( - System.currentTimeMillis(), - additionalStats.clusterUUID(), - clusterService.getClusterName(), - responses, - failures, - additionalStats.mappingStats(), - additionalStats.analysisStats(), - VersionStats.of(clusterService.state().metadata(), responses), - additionalStats.clusterSnapshotStats() - ) + additionalStats -> request.isRemoteStats() + // Return stripped down stats for remote clusters + ? new ClusterStatsResponse( + System.currentTimeMillis(), + clusterService.state().metadata().clusterUUID(), + clusterService.getClusterName(), + responses, + List.of(), + null, + null, + null, + null, + Map.of() + ) + : new ClusterStatsResponse( + System.currentTimeMillis(), + additionalStats.clusterUUID(), + clusterService.getClusterName(), + responses, + failures, + additionalStats.mappingStats(), + additionalStats.analysisStats(), + VersionStats.of(clusterService.state().metadata(), responses), + additionalStats.clusterSnapshotStats(), + additionalStats.getRemoteStats() + ) ).addListener(listener); } @@ -318,36 +358,33 @@ protected boolean isFresh(Long currentKey, Long newKey) { } } - public static final class AdditionalStats { + public final class AdditionalStats { private String clusterUUID; private MappingStats mappingStats; private AnalysisStats analysisStats; private ClusterSnapshotStats clusterSnapshotStats; + private Map remoteStats; - static void compute( - CancellableTask task, - Executor executor, - ClusterService clusterService, - MetadataStatsCache mappingStatsCache, - MetadataStatsCache analysisStatsCache, - ActionListener listener - ) { - executor.execute(ActionRunnable.wrap(listener, l -> { + void compute(CancellableTask task, ClusterStatsRequest request, ActionListener listener) { + clusterStateStatsExecutor.execute(ActionRunnable.wrap(listener, l -> { task.ensureNotCancelled(); - final var result = new AdditionalStats(); - result.compute( + internalCompute( + task, + request, clusterService.state(), mappingStatsCache, analysisStatsCache, task::isCancelled, clusterService.threadPool().absoluteTimeInMillis(), - l.map(ignored -> result) + l.map(ignored -> this) ); })); } - private void compute( + private void internalCompute( + CancellableTask task, + ClusterStatsRequest request, ClusterState clusterState, MetadataStatsCache mappingStatsCache, MetadataStatsCache analysisStatsCache, @@ -361,6 +398,18 @@ private void compute( mappingStatsCache.get(metadata, isCancelledSupplier, listeners.acquire(s -> mappingStats = s)); analysisStatsCache.get(metadata, isCancelledSupplier, listeners.acquire(s -> analysisStats = s)); clusterSnapshotStats = ClusterSnapshotStats.of(clusterState, absoluteTimeInMillis); + if (doRemotes(request)) { + var remotes = remoteClusterService.getRegisteredRemoteClusterNames(); + if (remotes.isEmpty()) { + remoteStats = Map.of(); + } else { + new RemoteStatsFanout(task, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION)).start( + task, + remotes, + listeners.acquire(s -> remoteStats = s) + ); + } + } } } @@ -379,5 +428,79 @@ AnalysisStats analysisStats() { ClusterSnapshotStats clusterSnapshotStats() { return clusterSnapshotStats; } + + public Map getRemoteStats() { + return remoteStats; + } + } + + private static boolean doRemotes(ClusterStatsRequest request) { + return request.doRemotes(); + } + + private class RemoteStatsFanout extends CancellableFanOut> { + private final Executor requestExecutor; + private final TaskId taskId; + private Map remoteClustersStats; + + RemoteStatsFanout(Task task, Executor requestExecutor) { + this.requestExecutor = requestExecutor; + this.taskId = new TaskId(clusterService.getNodeName(), task.getId()); + } + + @Override + protected void sendItemRequest(String clusterAlias, ActionListener listener) { + var remoteClusterClient = remoteClusterService.getRemoteClusterClient( + clusterAlias, + requestExecutor, + RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED + ); + var remoteRequest = new RemoteClusterStatsRequest(); + remoteRequest.setParentTask(taskId); + remoteClusterClient.getConnection(remoteRequest, listener.delegateFailureAndWrap((responseListener, connection) -> { + if (connection.getTransportVersion().before(CCS_REMOTE_TELEMETRY_STATS)) { + responseListener.onResponse(null); + } else { + remoteClusterClient.execute(connection, TransportRemoteClusterStatsAction.REMOTE_TYPE, remoteRequest, responseListener); + } + })); + } + + @Override + protected void onItemResponse(String clusterAlias, RemoteClusterStatsResponse response) { + if (response != null) { + remoteClustersStats.computeIfPresent(clusterAlias, (k, v) -> v.acceptResponse(response)); + } + } + + @Override + protected void onItemFailure(String clusterAlias, Exception e) { + logger.warn("Failed to get remote cluster stats for [{}]: {}", clusterAlias, e); + } + + void start(Task task, Collection remotes, ActionListener> listener) { + this.remoteClustersStats = remotes.stream().collect(Collectors.toConcurrentMap(r -> r, this::makeRemoteClusterStats)); + super.run(task, remotes.iterator(), listener); + } + + /** + * Create static portion of RemoteClusterStats for a given cluster alias. + */ + RemoteClusterStats makeRemoteClusterStats(String clusterAlias) { + RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias); + RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo(); + var compression = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); + return new RemoteClusterStats( + remoteConnectionInfo.getModeInfo().modeName(), + remoteConnection.isSkipUnavailable(), + compression.toString() + ); + } + + @Override + protected Map onCompletion() { + return remoteClustersStats; + } } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java new file mode 100644 index 0000000000000..4d57f10807af6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.RemoteClusterActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +/** + * Handler action for incoming {@link RemoteClusterStatsRequest}. + * Will pass the work to {@link TransportClusterStatsAction} and return the response. + */ +public class TransportRemoteClusterStatsAction extends HandledTransportAction { + + public static final String NAME = "cluster:monitor/stats/remote"; + public static final ActionType TYPE = new ActionType<>(NAME); + public static final RemoteClusterActionType REMOTE_TYPE = new RemoteClusterActionType<>( + NAME, + RemoteClusterStatsResponse::new + ); + private final NodeClient client; + + @Inject + public TransportRemoteClusterStatsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) { + super(NAME, transportService, actionFilters, RemoteClusterStatsRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.client = client; + } + + @Override + protected void doExecute(Task task, RemoteClusterStatsRequest request, ActionListener listener) { + ClusterStatsRequest subRequest = new ClusterStatsRequest().asRemoteStats(); + subRequest.setParentTask(request.getParentTask()); + client.execute( + TransportClusterStatsAction.TYPE, + subRequest, + listener.map( + clusterStatsResponse -> new RemoteClusterStatsResponse( + clusterStatsResponse.getClusterUUID(), + clusterStatsResponse.getStatus(), + clusterStatsResponse.getNodesStats().getVersions(), + clusterStatsResponse.getNodesStats().getCounts().getTotal(), + clusterStatsResponse.getIndicesStats().getShards().getTotal(), + clusterStatsResponse.getIndicesStats().getIndexCount(), + clusterStatsResponse.getIndicesStats().getStore().sizeInBytes(), + clusterStatsResponse.getNodesStats().getJvm().getHeapMax().getBytes(), + clusterStatsResponse.getNodesStats().getOs().getMem().getTotal().getBytes() + ) + ) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java index 603dcdba86730..53ae50bc0b75f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.util.FeatureFlag; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; @@ -29,6 +31,8 @@ public class RestClusterStatsAction extends BaseRestHandler { private static final Set SUPPORTED_CAPABILITIES = Set.of("human-readable-total-docs-size"); + private static final Set SUPPORTED_CAPABILITIES_CCS_STATS = Sets.union(SUPPORTED_CAPABILITIES, Set.of("ccs-stats")); + public static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry"); @Override public List routes() { @@ -40,9 +44,17 @@ public String getName() { return "cluster_stats_action"; } + @Override + public Set supportedQueryParameters() { + return Set.of("include_remotes", "nodeId"); + } + @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest(request.paramAsStringArray("nodeId", null)); + ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest( + request.paramAsBoolean("include_remotes", false), + request.paramAsStringArray("nodeId", null) + ); clusterStatsRequest.timeout(getTimeout(request)); return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin() .cluster() @@ -56,6 +68,6 @@ public boolean canTripCircuitBreaker() { @Override public Set supportedCapabilities() { - return SUPPORTED_CAPABILITIES; + return CCS_TELEMETRY_FEATURE_FLAG.isEnabled() ? SUPPORTED_CAPABILITIES_CCS_STATS : SUPPORTED_CAPABILITIES; } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index d0638fcf7a2de..f0cafb956457e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -43,7 +43,7 @@ * {@link SniffConnectionStrategy#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of * connections per cluster has been reached. */ -final class RemoteClusterConnection implements Closeable { +public final class RemoteClusterConnection implements Closeable { private final TransportService transportService; private final RemoteConnectionManager remoteConnectionManager; @@ -99,7 +99,7 @@ void setSkipUnavailable(boolean skipUnavailable) { /** * Returns whether this cluster is configured to be skipped when unavailable */ - boolean isSkipUnavailable() { + public boolean isSkipUnavailable() { return skipUnavailable; } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index f1afdfe1f186b..620b80e91cb45 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -277,7 +277,7 @@ public void maybeEnsureConnectedAndGetConnection( } } - RemoteClusterConnection getRemoteClusterConnection(String cluster) { + public RemoteClusterConnection getRemoteClusterConnection(String cluster) { if (enabled == false) { throw new IllegalArgumentException( "this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role" diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java index 6afea6faa607e..73ceafc0a24b9 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java @@ -434,7 +434,8 @@ public void testToXContent() throws IOException { MappingStats.of(metadata, () -> {}), AnalysisStats.of(metadata, () -> {}), VersionStats.of(metadata, singletonList(mockNodeResponse)), - ClusterSnapshotStats.EMPTY + ClusterSnapshotStats.EMPTY, + null ); final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);