diff --git a/docs/changelog/113143.yaml b/docs/changelog/113143.yaml new file mode 100644 index 0000000000000..4a2044cca0ce4 --- /dev/null +++ b/docs/changelog/113143.yaml @@ -0,0 +1,10 @@ +pr: 113143 +summary: Deprecate dutch_kp and lovins stemmers as they are removed in Lucene 10 +area: Analysis +type: deprecation +issues: [] +deprecation: + title: Deprecate dutch_kp and lovins stemmers as they are removed in Lucene 10 + area: Analysis + details: The kp, dutch_kp, dutchKp, and lovins stemmers are deprecated and will be removed. + impact: These stemmers will be removed and will no longer be supported. diff --git a/docs/changelog/113723.yaml b/docs/changelog/113723.yaml new file mode 100644 index 0000000000000..2cbcf49102719 --- /dev/null +++ b/docs/changelog/113723.yaml @@ -0,0 +1,6 @@ +pr: 113723 +summary: Fix max file size check to use `getMaxFileSize` +area: Infra/Core +type: bug +issues: + - 113705 diff --git a/docs/reference/analysis/tokenfilters/snowball-tokenfilter.asciidoc b/docs/reference/analysis/tokenfilters/snowball-tokenfilter.asciidoc index 57e402988cc5a..d8300288c9f4b 100644 --- a/docs/reference/analysis/tokenfilters/snowball-tokenfilter.asciidoc +++ b/docs/reference/analysis/tokenfilters/snowball-tokenfilter.asciidoc @@ -11,6 +11,8 @@ values: `Arabic`, `Armenian`, `Basque`, `Catalan`, `Danish`, `Dutch`, `English`, `Lithuanian`, `Lovins`, `Norwegian`, `Porter`, `Portuguese`, `Romanian`, `Russian`, `Serbian`, `Spanish`, `Swedish`, `Turkish`. +deprecated:[8.16.0, `Kp` and `Lovins` support will be removed in a future version] + For example: [source,console] ---- PUT /my-index-000001 @@ -28,7 +30,7 @@ PUT /my-index-000001 "filter": { "my_snow": { "type": "snowball", - "language": "Lovins" + "language": "English" } } } diff --git a/docs/reference/analysis/tokenfilters/stemmer-tokenfilter.asciidoc b/docs/reference/analysis/tokenfilters/stemmer-tokenfilter.asciidoc index 42ac594fca3bf..4cd088935af19 100644 --- a/docs/reference/analysis/tokenfilters/stemmer-tokenfilter.asciidoc +++ b/docs/reference/analysis/tokenfilters/stemmer-tokenfilter.asciidoc @@ -144,12 +144,12 @@ https://snowballstem.org/algorithms/danish/stemmer.html[*`danish`*] Dutch:: https://snowballstem.org/algorithms/dutch/stemmer.html[*`dutch`*], -https://snowballstem.org/algorithms/kraaij_pohlmann/stemmer.html[`dutch_kp`] +https://snowballstem.org/algorithms/kraaij_pohlmann/stemmer.html[`dutch_kp`] deprecated:[8.16.0, `dutch_kp` will be removed in a future version] English:: https://snowballstem.org/algorithms/porter/stemmer.html[*`english`*], https://ciir.cs.umass.edu/pubfiles/ir-35.pdf[`light_english`], -https://snowballstem.org/algorithms/lovins/stemmer.html[`lovins`], +https://snowballstem.org/algorithms/lovins/stemmer.html[`lovins`] deprecated:[8.16.0, `lovins` will be removed in a future version], https://www.researchgate.net/publication/220433848_How_effective_is_suffixing[`minimal_english`], https://snowballstem.org/algorithms/english/stemmer.html[`porter2`], {lucene-analysis-docs}/en/EnglishPossessiveFilter.html[`possessive_english`] diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index 575a6457804a6..8e4f630ef7da4 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -40,6 +40,10 @@ If a node does not respond before its timeout expires, the response does not inc However, timed out nodes are included in the response's `_nodes.failed` property. Defaults to no timeout.
+`include_remotes`:: +(Optional, Boolean) If `true`, includes remote cluster information in the response. +Defaults to `false`, so no remote cluster information is returned. + [role="child_attributes"] [[cluster-stats-api-response-body]] ==== {api-response-body-title} @@ -183,12 +187,11 @@ This number is based on documents in Lucene segments and may include documents f This number is based on documents in Lucene segments. {es} reclaims the disk space of deleted Lucene documents when a segment is merged. `total_size_in_bytes`:: -(integer) -Total size in bytes across all primary shards assigned to selected nodes. +(integer) Total size in bytes across all primary shards assigned to selected nodes. `total_size`:: -(string) -Total size across all primary shards assigned to selected nodes, as a human-readable string. +(string) Total size across all primary shards assigned to selected nodes, as a human-readable string. + ===== `store`:: @@ -1285,8 +1288,7 @@ They are included here for expert users, but should otherwise be ignored. ==== `repositories`:: -(object) Contains statistics about the <> repositories defined in the cluster, broken down -by repository type. +(object) Contains statistics about the <> repositories defined in the cluster, broken down by repository type. + .Properties of `repositories` [%collapsible%open] @@ -1314,13 +1316,74 @@ Each repository type may also include other statistics about the repositories of [%collapsible%open] ===== +`clusters`::: +(object) Contains remote cluster settings and metrics collected from them. +The keys are cluster names, and the values are per-cluster data. +Only present if the `include_remotes` option is set to `true`. + ++ +.Properties of `clusters` +[%collapsible%open] +====== + +`cluster_uuid`::: +(string) The UUID of the remote cluster. + +`mode`::: +(string) The <> used to communicate with the remote cluster. + +`skip_unavailable`::: +(Boolean) The `skip_unavailable` <> used for this remote cluster. + +`transport.compress`::: +(string) Transport compression setting used for this remote cluster. + +`version`::: +(array of strings) The list of {es} versions used by the nodes on the remote cluster. + +`status`::: +include::{es-ref-dir}/rest-api/common-parms.asciidoc[tag=cluster-health-status] ++ +See <>. + +`nodes_count`::: +(integer) The total count of nodes in the remote cluster. + +`shards_count`::: +(integer) The total number of shards in the remote cluster. + +`indices_count`::: +(integer) The total number of indices in the remote cluster. + +`indices_total_size_in_bytes`::: +(integer) Total data set size, in bytes, of all shards assigned to selected nodes. + +`indices_total_size`::: +(string) Total data set size of all shards assigned to selected nodes, as a human-readable string. + +`max_heap_in_bytes`::: +(integer) Maximum amount of memory, in bytes, available for use by the heap across the nodes of the remote cluster. + +`max_heap`::: +(string) Maximum amount of memory available for use by the heap across the nodes of the remote cluster, +as a human-readable string. + +`mem_total_in_bytes`::: +(integer) Total amount, in bytes, of physical memory across the nodes of the remote cluster. + +`mem_total`::: +(string) Total amount of physical memory across the nodes of the remote cluster, as a human-readable string. + +====== + `_search`::: -(object) Contains the telemetry information about the <> usage in the cluster. +(object) Contains information about the <> usage in the cluster.
+ .Properties of `_search` [%collapsible%open] ====== + `total`::: (integer) The total number of {ccs} requests that have been executed by the cluster. @@ -1336,6 +1399,7 @@ Each repository type may also include other statistics about the repositories of .Properties of `took` [%collapsible%open] ======= + `max`::: (integer) The maximum time taken to execute a {ccs} request, in milliseconds. @@ -1344,6 +1408,7 @@ Each repository type may also include other statistics about the repositories of `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======= `took_mrt_true`:: @@ -1361,6 +1426,7 @@ Each repository type may also include other statistics about the repositories of `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======= `took_mrt_false`:: @@ -1378,6 +1444,7 @@ Each repository type may also include other statistics about the repositories of `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======= `remotes_per_search_max`:: @@ -1391,9 +1458,10 @@ Each repository type may also include other statistics about the repositories of The keys are the failure reason names and the values are the number of requests that failed for that reason. `features`:: -(object) Contains statistics about the features used in {ccs} requests. The keys are the names of the search feature, -and the values are the number of requests that used that feature. Single request can use more than one feature -(e.g. both `async` and `wildcard`). Known features are: +(object) Contains statistics about the features used in {ccs} requests. +The keys are the names of the search features, and the values are the number of requests that used that feature. +A single request can use more than one feature (e.g. both `async` and `wildcard`). +Known features are: * `async` - <> @@ -1427,6 +1495,7 @@ This may include requests where partial results were returned, but not requests .Properties of `took` [%collapsible%open] ======== + `max`::: (integer) The maximum time taken to execute a {ccs} request, in milliseconds. @@ -1435,6 +1504,7 @@ This may include requests where partial results were returned, but not requests `p90`::: (integer) The 90th percentile of the time taken to execute {ccs} requests, in milliseconds. + ======== ======= @@ -1812,3 +1882,37 @@ This API can be restricted to a subset of the nodes using <PostgreSQL ++++ @@ -13,8 +13,312 @@ This connector is written in Python using the {connectors-python}[Elastic connec This connector uses the https://github.com/elastic/connectors/blob/{branch}/connectors/sources/generic_database.py[generic database connector source code^] (branch _{connectors-branch}_, compatible with Elastic _{minor-version}_). View the specific {connectors-python}/connectors/sources/{service-name-stub}.py[*source code* for this connector^] (branch _{connectors-branch}_, compatible with Elastic _{minor-version}_). + +.Choose your connector reference +******************************* +Are you using an Elastic managed connector on Elastic Cloud or a self-managed connector? Expand the documentation based on your deployment method.
+******************************* + +// //////// //// //// //// //// //// //// //////// +// //////// NATIVE CONNECTOR REFERENCE /////// +// //////// //// //// //// //// //// //// //////// + +[discrete#connectors-postgresql-native-connector-reference] +=== *Elastic managed connector (Elastic Cloud)* + +.View *Elastic managed connector* reference + +[%collapsible] +=============== + +[discrete#connectors-postgresql-availability-prerequisites] +==== Availability and prerequisites + +This connector is available as an *Elastic managed connector* in Elastic versions *8.8.0 and later*. +To use this connector natively in Elastic Cloud, satisfy all <>. + +[discrete#connectors-postgresql-create-native-connector] +==== Create a {service-name} connector +include::_connectors-create-native.asciidoc[] + +[discrete#connectors-postgresql-usage] +==== Usage + +To use this connector as an *Elastic managed connector*, use the *Connector* workflow. +See <>. + +[TIP] +==== +Users must set `track_commit_timestamp` to `on`. +To do this, run `ALTER SYSTEM SET track_commit_timestamp = on;` on the PostgreSQL server. +==== + +For additional operations, see <<es-connectors-usage>>. + +[NOTE] +==== +For an end-to-end example of the connector client workflow, see <>. +==== + +[discrete#connectors-postgresql-compatibility] +==== Compatibility + +PostgreSQL versions 11 to 15 are compatible with the Elastic connector. + +[discrete#connectors-postgresql-configuration] +==== Configuration + +Set the following configuration fields: + +Host:: +The server host address where the PostgreSQL instance is hosted. +Examples: ++ +* `192.158.1.38` +* `demo.instance.demo-region.demo.service.com` + +Port:: +The port where the PostgreSQL instance is hosted. +Examples: ++ +* `5432` (default) + +Username:: +The username of the PostgreSQL account. + +Password:: +The password of the PostgreSQL account. + +Database:: +Name of the PostgreSQL database. +Examples: ++ +* `employee_database` +* `customer_database` + +Schema:: +The schema of the PostgreSQL database. + +Comma-separated List of Tables:: +A list of tables separated by commas. +If the value is `*`, the PostgreSQL connector will fetch data from all tables present in the configured database. +Default value is `*`. +Examples: ++ +* `table_1, table_2` +* `*` ++ +[WARNING] +==== +This field can be bypassed when using advanced sync rules. +==== + +Enable SSL:: +Toggle to enable SSL verification. +Disabled by default. + +SSL Certificate:: +Content of the SSL certificate. +If SSL is disabled, the `ssl_ca` value will be ignored.
++ +.*Expand* to see an example certificate +[%collapsible] +==== +``` +-----BEGIN CERTIFICATE----- +MIID+jCCAuKgAwIBAgIGAJJMzlxLMA0GCSqGSIb3DQEBCwUAMHoxCzAJBgNVBAYT +AlVTMQwwCgYDVQQKEwNJQk0xFjAUBgNVBAsTDURlZmF1bHROb2RlMDExFjAUBgNV +BAsTDURlZmF1bHRDZWxsMDExGTAXBgNVBAsTEFJvb3QgQ2VydGlmaWNhdGUxEjAQ +BgNVBAMTCWxvY2FsaG9zdDAeFw0yMTEyMTQyMjA3MTZaFw0yMjEyMTQyMjA3MTZa +MF8xCzAJBgNVBAYTAlVTMQwwCgYDVQQKEwNJQk0xFjAUBgNVBAsTDURlZmF1bHRO +b2RlMDExFjAUBgNVBAsTDURlZmF1bHRDZWxsMDExEjAQBgNVBAMTCWxvY2FsaG9z +dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAMv5HCsJZIpI5zCy+jXV +z6lmzNc9UcVSEEHn86h6zT6pxuY90TYeAhlZ9hZ+SCKn4OQ4GoDRZhLPTkYDt+wW +CV3NTIy9uCGUSJ6xjCKoxClJmgSQdg5m4HzwfY4ofoEZ5iZQ0Zmt62jGRWc0zuxj +hegnM+eO2reBJYu6Ypa9RPJdYJsmn1RNnC74IDY8Y95qn+WZj//UALCpYfX41hko +i7TWD9GKQO8SBmAxhjCDifOxVBokoxYrNdzESl0LXvnzEadeZTd9BfUtTaBHhx6t +njqqCPrbTY+3jAbZFd4RiERPnhLVKMytw5ot506BhPrUtpr2lusbN5svNXjuLeea +MMUCAwEAAaOBoDCBnTATBgNVHSMEDDAKgAhOatpLwvJFqjAdBgNVHSUEFjAUBggr +BgEFBQcDAQYIKwYBBQUHAwIwVAYDVR0RBE0wS4E+UHJvZmlsZVVVSUQ6QXBwU3J2 +MDEtQkFTRS05MDkzMzJjMC1iNmFiLTQ2OTMtYWI5NC01Mjc1ZDI1MmFmNDiCCWxv +Y2FsaG9zdDARBgNVHQ4ECgQITzqhA5sO8O4wDQYJKoZIhvcNAQELBQADggEBAKR0 +gY/BM69S6BDyWp5dxcpmZ9FS783FBbdUXjVtTkQno+oYURDrhCdsfTLYtqUlP4J4 +CHoskP+MwJjRIoKhPVQMv14Q4VC2J9coYXnePhFjE+6MaZbTjq9WaekGrpKkMaQA +iQt5b67jo7y63CZKIo9yBvs7sxODQzDn3wZwyux2vPegXSaTHR/rop/s/mPk3YTS +hQprs/IVtPoWU4/TsDN3gIlrAYGbcs29CAt5q9MfzkMmKsuDkTZD0ry42VjxjAmk +xw23l/k8RoD1wRWaDVbgpjwSzt+kl+vJE/ip2w3h69eEZ9wbo6scRO5lCO2JM4Pr +7RhLQyWn2u00L7/9Omw= +-----END CERTIFICATE----- +``` +==== + +[discrete#connectors-postgresql-documents-syncs] +==== Documents and syncs + +* Tables must be owned by a PostgreSQL user. +* Tables with no primary key defined are skipped. +* To fetch the last updated time in PostgreSQL, `track_commit_timestamp` must be set to `on`. +Otherwise, all data will be indexed in every sync. + +[NOTE] +==== +* Files bigger than 10 MB won't be extracted. +* Permissions are not synced. +**All documents** indexed to an Elastic deployment will be visible to **all users with access** to that Elastic Deployment. +==== + +[discrete#connectors-postgresql-sync-rules] +==== Sync rules + +<> are identical for all connectors and are available by default. + +[discrete#connectors-postgresql-sync-rules-advanced] +===== Advanced sync rules + +[NOTE] +==== +A <> is required for advanced sync rules to take effect. +==== + +Advanced sync rules are defined through a source-specific DSL JSON snippet. + +[discrete#connectors-postgresql-sync-rules-advanced-example-data] +====== Example data + +Here is some example data that will be used in the following examples. 
+ +[discrete#connectors-postgresql-sync-rules-advanced-example-data-1] +======= `employee` table + +[cols="3*", options="header"] +|=== +| emp_id | name | age +| 3 | John | 28 +| 10 | Jane | 35 +| 14 | Alex | 22 +|=== + +[discrete#connectors-postgresql-sync-rules-advanced-example-2] +======= `customer` table + +[cols="3*", options="header"] +|=== +| c_id | name | age +| 2 | Elm | 24 +| 6 | Pine | 30 +| 9 | Oak | 34 +|=== + +[discrete#connectors-postgresql-sync-rules-advanced-examples] +====== Advanced sync rules examples + +[discrete#connectors-postgresql-sync-rules-advanced-examples-1] +======= Multiple table queries + +[source,js] +---- +[ + { + "tables": [ + "employee" + ], + "query": "SELECT * FROM employee" + }, + { + "tables": [ + "customer" + ], + "query": "SELECT * FROM customer" + } +] +---- +// NOTCONSOLE + +[discrete#connectors-postgresql-sync-rules-advanced-examples-1-id-columns] +======= Multiple table queries with `id_columns` + +In 8.15.0, we added a new optional `id_columns` field in our advanced sync rules for the PostgreSQL connector. +Use the `id_columns` field to ingest tables that do not have a primary key. Include the names of unique fields so that the connector can use them to generate unique IDs for documents. + +[source,js] +---- +[ + { + "tables": [ + "employee" + ], + "query": "SELECT * FROM employee", + "id_columns": ["emp_id"] + }, + { + "tables": [ + "customer" + ], + "query": "SELECT * FROM customer", + "id_columns": ["c_id"] + } +] +---- +// NOTCONSOLE + +This example uses the `id_columns` field to specify the unique fields `emp_id` and `c_id` for the `employee` and `customer` tables, respectively. + +[discrete#connectors-postgresql-sync-rules-advanced-examples-2] +======= Filtering data with `WHERE` clause + +[source,js] +---- +[ + { + "tables": ["employee"], + "query": "SELECT * FROM employee WHERE emp_id > 5" + } +] +---- +// NOTCONSOLE + +[discrete#connectors-postgresql-sync-rules-advanced-examples-3] +======= `JOIN` operations + +[source,js] +---- +[ + { + "tables": ["employee", "customer"], + "query": "SELECT * FROM employee INNER JOIN customer ON employee.emp_id = customer.c_id" + } +] +---- +// NOTCONSOLE + +[WARNING] +==== +When using advanced rules, a query can bypass the configuration field `tables`. +This will happen if the query specifies a table that doesn't appear in the configuration. +This can also happen if the configuration specifies `*` to fetch all tables while the advanced sync rule requests only a subset of tables. +==== + +[discrete#connectors-postgresql-known-issues] +==== Known issues + +There are no known issues for this connector. +Refer to <> for a list of known issues for all connectors. + +[discrete#connectors-postgresql-troubleshooting] +==== Troubleshooting + +See <>. + +[discrete#connectors-postgresql-security] +==== Security + +See <>. + +// Closing the collapsible section +=============== + [discrete#es-connectors-postgresql-connector-client-reference] -==== *Self-managed connector* +=== *Self-managed connector* .View *self-managed connector* reference @@ -22,19 +326,19 @@ View the specific {connectors-python}/connectors/sources/{service-name-stub}.py[ =============== [discrete#es-connectors-postgresql-client-availability-prerequisites] -===== Availability and prerequisites +==== Availability and prerequisites This connector is available as a *self-managed connector*. -To use this connector, satisfy all //build-connector,self-managed connector requirements. +To use this connector, satisfy all <>.
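+
+One PostgreSQL-side prerequisite worth preparing up front is the `track_commit_timestamp` setting described in the Usage section below.
+The following is a minimal sketch using standard PostgreSQL statements; note that this setting only takes effect after a server restart:
+
+[source,sql]
+----
+-- Enable commit timestamps so the connector can detect last-updated times
+ALTER SYSTEM SET track_commit_timestamp = on;
+-- After restarting the server, confirm the new value
+SHOW track_commit_timestamp;
+----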
[discrete#es-connectors-postgresql-create-connector-client] -===== Create a {service-name} connector +==== Create a {service-name} connector include::_connectors-create-client.asciidoc[] [discrete#es-connectors-postgresql-client-usage] -===== Usage +==== Usage -To use this connector as a *self-managed connector*, see //build-connector +To use this connector as a *self-managed connector*, see <>. [TIP] ==== Users must set `track_commit_timestamp` to `on`. @@ -45,20 +349,20 @@ For additional operations, see. [NOTE] ==== -For an end-to-end example of the self-managed connector workflow, see //postgresql-connector-client-tutorial. +For an end-to-end example of the self-managed connector workflow, see <>. ==== [discrete#es-connectors-postgresql-client-compatibility] -===== Compatibility +==== Compatibility PostgreSQL versions 11 to 15 are compatible with Elastic connector frameworks. [discrete#es-connectors-postgresql-client-configuration] -===== Configuration +==== Configuration [TIP] ==== -When using the //build-connector, self-managed connector workflow, initially these fields will use the default configuration set in the https://github.com/elastic/connectors-python/blob/{branch}/connectors/sources/postgresql.py[connector source code^]. +When using the <>, initially these fields will use the default configuration set in the https://github.com/elastic/connectors-python/blob/{branch}/connectors/sources/postgresql.py[connector source code^]. These configurable fields will be rendered with their respective *labels* in the Kibana UI. Once connected, users will be able to update these values in Kibana. @@ -150,12 +454,12 @@ xw23l/k8RoD1wRWaDVbgpjwSzt+kl+vJE/ip2w3h69eEZ9wbo6scRO5lCO2JM4Pr ==== [discrete#es-connectors-postgresql-client-docker] -===== Deployment using Docker +==== Deployment using Docker include::_connectors-docker-instructions.asciidoc[] [discrete#es-connectors-postgresql-client-documents-syncs] -===== Documents and syncs +==== Documents and syncs * Tables must be owned by a PostgreSQL user. * Tables with no primary key defined are skipped. @@ -170,12 +474,12 @@ Otherwise, all data will be indexed in every sync. ==== [discrete#es-connectors-postgresql-client-sync-rules] -===== Sync rules +==== Sync rules //sync-rules-basic,Basic sync rules are identical for all connectors and are available by default. [discrete#es-connectors-postgresql-client-sync-rules-advanced] -====== Advanced sync rules +===== Advanced sync rules [NOTE] ==== @@ -185,12 +489,12 @@ A //connectors-sync-types-full, full sync is required for advanced sync rules to Advanced sync rules are defined through a source-specific DSL JSON snippet. [discrete#es-connectors-postgresql-client-sync-rules-advanced-example-data] -======= Example data +====== Example data Here is some example data that will be used in the following examples. [discrete#es-connectors-postgresql-client-sync-rules-advanced-example-data-1] -======== `employee` table +======= `employee` table [cols="3*", options="header"] |=== @@ -201,7 +505,7 @@ Here is some example data that will be used in the following examples. |=== [discrete#es-connectors-postgresql-client-sync-rules-advanced-example-2] -======== `customer` table +======= `customer` table [cols="3*", options="header"] |=== @@ -212,7 +516,7 @@ Here is some example data that will be used in the following examples. 
|=== [discrete#es-connectors-postgresql-client-sync-rules-advanced-examples] -======= Advanced sync rules examples +====== Advanced sync rules examples [discrete#es-connectors-postgresql-client-sync-rules-advanced-examples-1] ======== Multiple table queries @@ -301,10 +605,10 @@ This can also happen if the configuration specifies `*` to fetch all tables whil ==== [discrete#es-connectors-postgresql-client-client-operations-testing] -===== End-to-end testing +==== End-to-end testing The connector framework enables operators to run functional tests against a real data source. -Refer to //build-connector-testing for more details. +Refer to <> for more details. To perform E2E testing for the PostgreSQL connector, run the following command: @@ -321,20 +625,20 @@ make ftest NAME=postgresql DATA_SIZE=small ---- [discrete#es-connectors-postgresql-client-known-issues] -===== Known issues +==== Known issues There are no known issues for this connector. -Refer to //connectors-known-issues for a list of known issues for all connectors. +Refer to <> for a list of known issues for all connectors. [discrete#es-connectors-postgresql-client-troubleshooting] -===== Troubleshooting +==== Troubleshooting -See //connectors-troubleshooting. +See <>. [discrete#es-connectors-postgresql-client-security] -===== Security +==== Security -See //connectors-security. +See <>. // Closing the collapsible section -=============== +=============== \ No newline at end of file diff --git a/docs/reference/connector/docs/connectors-redis.asciidoc b/docs/reference/connector/docs/connectors-redis.asciidoc index 5dbd008ee5932..7aad7b0b41497 100644 --- a/docs/reference/connector/docs/connectors-redis.asciidoc +++ b/docs/reference/connector/docs/connectors-redis.asciidoc @@ -1,5 +1,5 @@ [#es-connectors-redis] -==== Redis connector reference +=== Redis connector reference ++++ Redis ++++ @@ -12,7 +12,7 @@ The Redis connector is built with the Elastic connectors Python framework and is View the {connectors-python}/connectors/sources/{service-name-stub}.py[*source code* for this connector^] (branch _{connectors-branch}_, compatible with Elastic _{minor-version}_). [discrete#es-connectors-redis-connector-availability-and-prerequisites] -===== Availability and prerequisites +==== Availability and prerequisites This connector was introduced in Elastic *8.13.0*, available as a *self-managed connector*. @@ -29,19 +29,19 @@ This connector is in *technical preview* and is subject to change. The design an ==== [discrete#es-connectors-redis-connector-usage] -===== Usage +==== Usage To set up this connector in the UI, select the *Redis* tile when creating a new connector under *Search -> Connectors*. For additional operations, see <>. [discrete#es-connectors-redis-connector-docker] -===== Deploy with Docker +==== Deploy with Docker include::_connectors-docker-instructions.asciidoc[] [discrete#es-connectors-redis-connector-configuration] -===== Configuration +==== Configuration `host` (required):: The IP of your Redis server/cloud. Example: @@ -89,7 +89,7 @@ Specifies the client private key. The value of the key is used to validate the c Depends on `mutual_tls_enabled`.
[discrete#es-connectors-redis-connector-documents-and-syncs] -===== Documents and syncs +==== Documents and syncs The connector syncs the following objects and entities: @@ -102,12 +102,12 @@ The connector syncs the following objects and entities: ==== [discrete#es-connectors-redis-connector-sync-rules] -===== Sync rules +==== Sync rules <> are identical for all connectors and are available by default. [discrete#es-connectors-redis-connector-advanced-sync-rules] -===== Advanced Sync Rules +==== Advanced Sync Rules <> are defined through a source-specific DSL JSON snippet. @@ -134,10 +134,10 @@ Provide at least one of the following: `key_pattern` or `type`, or both. ==== [discrete#es-connectors-redis-connector-advanced-sync-rules-examples] -====== Advanced sync rules examples +===== Advanced sync rules examples [discrete#es-connectors-redis-connector-advanced-sync-rules-example-1] -======= Example 1 +====== Example 1 *Fetch database records where keys start with `alpha`*: @@ -153,7 +153,7 @@ Provide at least one of the following: `key_pattern` or `type`, or both. // NOTCONSOLE [discrete#es-connectors-redis-connector-advanced-sync-rules-example-2] -======= Example 2 +====== Example 2 *Fetch database records with exact match by specifying the full key name:* @@ -169,7 +169,7 @@ Provide at least one of the following: `key_pattern` or `type`, or both. // NOTCONSOLE [discrete#es-connectors-redis-connector-advanced-sync-rules-example-3] -======= Example 3 +====== Example 3 *Fetch database records where keys start with `test1`, `test2` or `test3`:* @@ -180,13 +180,13 @@ Provide at least one of the following: `key_pattern` or `type`, or both. "database": 0, "key_pattern": "test[123]" } ] ---- // NOTCONSOLE [discrete#es-connectors-redis-connector-advanced-sync-rules-example-4] -======= Example 4 +====== Example 4 *Exclude database records where keys start with `test1`, `test2` or `test3`:* @@ -202,7 +201,7 @@ Provide at least one of the following: `key_pattern` or `type`, or both. // NOTCONSOLE [discrete#es-connectors-redis-connector-advanced-sync-rules-example-5] -======= Example 5 +====== Example 5 *Fetch all database records:* @@ -218,7 +217,7 @@ Provide at least one of the following: `key_pattern` or `type`, or both. // NOTCONSOLE [discrete#es-connectors-redis-connector-advanced-sync-rules-example-6] -======= Example 6 +====== Example 6 *Fetch all database records where type is `SET`:* @@ -235,7 +234,7 @@ Provide at least one of the following: `key_pattern` or `type`, or both. // NOTCONSOLE [discrete#es-connectors-redis-connector-advanced-sync-rules-example-7] -======= Example 7 +====== Example 7 *Fetch database records where type is `SET`*: @@ -251,10 +250,10 @@ Provide at least one of the following: `key_pattern` or `type`, or both. // NOTCONSOLE [discrete#es-connectors-redis-connector-connector-client-operations] -===== Connector Client operations +==== Connector Client operations [discrete#es-connectors-redis-connector-end-to-end-testing] -====== End-to-end Testing +===== End-to-end Testing The connector framework enables operators to run functional tests against a real data source, using Docker Compose. You don't need a running Elasticsearch instance or Redis source to run this test. @@ -276,7 +275,7 @@ make ftest NAME=redis DATA_SIZE=small By default, `DATA_SIZE=MEDIUM`. [discrete#es-connectors-redis-connector-known-issues] -===== Known issues +==== Known issues * The last modified time is unavailable when retrieving keys/values from the Redis database.
As a result, *all objects* are indexed each time an advanced sync rule query is executed. @@ -284,11 +283,11 @@ As a result, *all objects* are indexed each time an advanced sync rule query is Refer to <> for a list of known issues for all connectors. [discrete#es-connectors-redis-connector-troubleshooting] -===== Troubleshooting +==== Troubleshooting See <>. [discrete#es-connectors-redis-connector-security] -===== Security +==== Security See <>. \ No newline at end of file diff --git a/modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/StemmerTokenFilterFactory.java b/modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/StemmerTokenFilterFactory.java index afb3d69733d02..1c71c64311517 100644 --- a/modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/StemmerTokenFilterFactory.java +++ b/modules/analysis-common/src/main/java/org/elasticsearch/analysis/common/StemmerTokenFilterFactory.java @@ -47,6 +47,8 @@ import org.apache.lucene.analysis.snowball.SnowballFilter; import org.apache.lucene.analysis.sv.SwedishLightStemFilter; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.logging.DeprecationCategory; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.index.IndexSettings; @@ -81,6 +83,8 @@ public class StemmerTokenFilterFactory extends AbstractTokenFilterFactory { + private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(StemmerTokenFilterFactory.class); + private static final TokenStream EMPTY_TOKEN_STREAM = new EmptyTokenStream(); private String language; @@ -90,6 +94,20 @@ public class StemmerTokenFilterFactory extends AbstractTokenFilterFactory { this.language = Strings.capitalize(settings.get("language", settings.get("name", "porter"))); // check that we have a valid language by trying to create a TokenStream create(EMPTY_TOKEN_STREAM).close(); + if ("lovins".equalsIgnoreCase(language)) { + deprecationLogger.critical( + DeprecationCategory.ANALYSIS, + "lovins_deprecation", + "The [lovins] stemmer is deprecated and will be removed in a future version." + ); + } + if ("dutch_kp".equalsIgnoreCase(language) || "dutchKp".equalsIgnoreCase(language) || "kp".equalsIgnoreCase(language)) { + deprecationLogger.critical( + DeprecationCategory.ANALYSIS, + "dutch_kp_deprecation", + "The [dutch_kp] stemmer is deprecated and will be removed in a future version." 
+ ); + } } @Override diff --git a/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/StemmerTokenFilterFactoryTests.java b/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/StemmerTokenFilterFactoryTests.java index a1c95deb65a52..8f3d52f0174c6 100644 --- a/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/StemmerTokenFilterFactoryTests.java +++ b/modules/analysis-common/src/test/java/org/elasticsearch/analysis/common/StemmerTokenFilterFactoryTests.java @@ -32,7 +32,6 @@ import static org.hamcrest.Matchers.instanceOf; public class StemmerTokenFilterFactoryTests extends ESTokenStreamTestCase { - private static final CommonAnalysisPlugin PLUGIN = new CommonAnalysisPlugin(); public void testEnglishFilterFactory() throws IOException { @@ -103,4 +102,30 @@ public void testMultipleLanguagesThrowsException() throws IOException { ); assertEquals("Invalid stemmer class specified: [english, light_english]", e.getMessage()); } + + public void testKpDeprecation() throws IOException { + IndexVersion v = IndexVersionUtils.randomVersion(random()); + Settings settings = Settings.builder() + .put("index.analysis.filter.my_kp.type", "stemmer") + .put("index.analysis.filter.my_kp.language", "kp") + .put(SETTING_VERSION_CREATED, v) + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .build(); + + AnalysisTestsHelper.createTestAnalysisFromSettings(settings, PLUGIN); + assertCriticalWarnings("The [dutch_kp] stemmer is deprecated and will be removed in a future version."); + } + + public void testLovinsDeprecation() throws IOException { + IndexVersion v = IndexVersionUtils.randomVersion(random()); + Settings settings = Settings.builder() + .put("index.analysis.filter.my_lovins.type", "stemmer") + .put("index.analysis.filter.my_lovins.language", "lovins") + .put(SETTING_VERSION_CREATED, v) + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()) + .build(); + + AnalysisTestsHelper.createTestAnalysisFromSettings(settings, PLUGIN); + assertCriticalWarnings("The [lovins] stemmer is deprecated and will be removed in a future version."); + } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.stats.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.stats.json index 4a8ca46ceba8c..23f6ed4ec5b76 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.stats.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.stats.json @@ -32,9 +32,9 @@ ] }, "params":{ - "flat_settings":{ + "include_remotes":{ "type":"boolean", - "description":"Return settings in flat format (default: false)" + "description":"Include remote cluster data in the response (default: false)" }, "timeout":{ "type":"time", diff --git a/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.stats/30_ccs_stats.yml b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.stats/30_ccs_stats.yml new file mode 100644 index 0000000000000..955c68634e617 --- /dev/null +++ b/rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.stats/30_ccs_stats.yml @@ -0,0 +1,151 @@ +--- +"cross-cluster search stats basic": + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: GET + path: /_cluster/stats + capabilities: + - "ccs-stats" + reason: "Capability required to run test" + + - do: + cluster.stats: { } + + - is_true: ccs + - is_true: ccs._search + - is_false: ccs.clusters # no ccs clusters configured + - exists: ccs._search.total + - 
exists: ccs._search.success + - exists: ccs._search.skipped + - is_true: ccs._search.took + - is_true: ccs._search.took_mrt_true + - is_true: ccs._search.took_mrt_false + - exists: ccs._search.remotes_per_search_max + - exists: ccs._search.remotes_per_search_avg + - exists: ccs._search.failure_reasons + - exists: ccs._search.features + - exists: ccs._search.clients + - exists: ccs._search.clusters + +--- +"cross-cluster search stats search": + - requires: + test_runner_features: [ capabilities ] + capabilities: + - method: GET + path: /_cluster/stats + capabilities: + - "ccs-stats" + reason: "Capability required to run test" + + - do: + cluster.state: {} + - set: { master_node: master } + - do: + nodes.info: + metric: [ http, transport ] + - set: {nodes.$master.http.publish_address: host} + - set: {nodes.$master.transport.publish_address: transport_host} + + - do: + cluster.put_settings: + body: + persistent: + cluster: + remote: + cluster_one: + seeds: + - "${transport_host}" + skip_unavailable: true + cluster_two: + seeds: + - "${transport_host}" + skip_unavailable: false + - is_true: persistent.cluster.remote.cluster_one + + - do: + indices.create: + index: test + body: + settings: + number_of_replicas: 0 + + - do: + index: + index: test + id: "1" + refresh: true + body: + foo: bar + + - do: + cluster.health: + wait_for_status: green + + - do: + search: + index: "*,*:*" + body: + query: + match: + foo: bar + + - do: + cluster.stats: {} + - is_true: ccs + - is_true: ccs._search + - is_false: ccs.clusters # Still no remotes since include_remotes is not set + + - do: + cluster.stats: + include_remotes: true + - is_true: ccs + - is_true: ccs._search + - is_true: ccs.clusters # Now we have remotes + - is_true: ccs.clusters.cluster_one + - is_true: ccs.clusters.cluster_two + - is_true: ccs.clusters.cluster_one.cluster_uuid + - match: { ccs.clusters.cluster_one.mode: sniff } + - match: { ccs.clusters.cluster_one.skip_unavailable: true } + - match: { ccs.clusters.cluster_two.skip_unavailable: false } + - is_true: ccs.clusters.cluster_one.version + - match: { ccs.clusters.cluster_one.status: green } + - match: { ccs.clusters.cluster_two.status: green } + - is_true: ccs.clusters.cluster_one.nodes_count + - is_true: ccs.clusters.cluster_one.shards_count + - is_true: ccs.clusters.cluster_one.indices_count + - is_true: ccs.clusters.cluster_one.indices_total_size_in_bytes + - is_true: ccs.clusters.cluster_one.max_heap_in_bytes + - is_true: ccs.clusters.cluster_one.mem_total_in_bytes + - is_true: ccs._search.total + - is_true: ccs._search.success + - exists: ccs._search.skipped + - is_true: ccs._search.took + - is_true: ccs._search.took.max + - is_true: ccs._search.took.avg + - is_true: ccs._search.took.p90 + - is_true: ccs._search.took_mrt_true + - exists: ccs._search.took_mrt_true.max + - exists: ccs._search.took_mrt_true.avg + - exists: ccs._search.took_mrt_true.p90 + - is_true: ccs._search.took_mrt_false + - exists: ccs._search.took_mrt_false.max + - exists: ccs._search.took_mrt_false.avg + - exists: ccs._search.took_mrt_false.p90 + - match: { ccs._search.remotes_per_search_max: 2 } + - match: { ccs._search.remotes_per_search_avg: 2.0 } + - exists: ccs._search.failure_reasons + - exists: ccs._search.features + - exists: ccs._search.clients + - is_true: ccs._search.clusters + - is_true: ccs._search.clusters.cluster_one + - is_true: ccs._search.clusters.cluster_two + - gte: {ccs._search.clusters.cluster_one.total: 1} + - gte: {ccs._search.clusters.cluster_two.total: 1} + - exists: 
ccs._search.clusters.cluster_one.skipped + - exists: ccs._search.clusters.cluster_two.skipped + - is_true: ccs._search.clusters.cluster_one.took + - is_true: ccs._search.clusters.cluster_one.took.max + - is_true: ccs._search.clusters.cluster_one.took.avg + - is_true: ccs._search.clusters.cluster_one.took.p90 diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRemoteIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRemoteIT.java new file mode 100644 index 0000000000000..50fa7cfa1fdef --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRemoteIT.java @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.Version; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.elasticsearch.test.InternalTestCluster; +import org.junit.Assert; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertResponse; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.equalToIgnoringCase; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.oneOf; + +@ClusterScope(scope = Scope.TEST, numDataNodes = 0) +public class ClusterStatsRemoteIT extends AbstractMultiClustersTestCase { + private static final String REMOTE1 = "cluster-a"; + private static final String REMOTE2 = "cluster-b"; + + private static final String INDEX_NAME = "demo"; + + @Override + protected boolean reuseClusters() { + return false; + } + + @Override + protected Collection remoteClusterAlias() { + return List.of(REMOTE1, REMOTE2); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of(REMOTE1, false, REMOTE2, true); + } + + public void testRemoteClusterStats() throws ExecutionException, InterruptedException { + setupClusters(); + final Client client = client(LOCAL_CLUSTER); + SearchRequest searchRequest = new SearchRequest("*", "*:*"); + searchRequest.allowPartialSearchResults(false); + searchRequest.setCcsMinimizeRoundtrips(randomBoolean()); + searchRequest.source(new SearchSourceBuilder().query(new 
MatchAllQueryBuilder()).size(10)); + + // do a search + assertResponse(cluster(LOCAL_CLUSTER).client().search(searchRequest), Assert::assertNotNull); + // collect stats without remotes + ClusterStatsResponse response = client.admin().cluster().prepareClusterStats().get(); + assertNotNull(response.getCcsMetrics()); + var remotesUsage = response.getCcsMetrics().getByRemoteCluster(); + assertThat(remotesUsage.size(), equalTo(3)); + assertNull(response.getRemoteClustersStats()); + // collect stats with remotes + response = client.admin().cluster().execute(TransportClusterStatsAction.TYPE, new ClusterStatsRequest(true)).get(); + assertNotNull(response.getCcsMetrics()); + remotesUsage = response.getCcsMetrics().getByRemoteCluster(); + assertThat(remotesUsage.size(), equalTo(3)); + assertNotNull(response.getRemoteClustersStats()); + var remoteStats = response.getRemoteClustersStats(); + assertThat(remoteStats.size(), equalTo(2)); + for (String clusterAlias : remoteClusterAlias()) { + assertThat(remoteStats, hasKey(clusterAlias)); + assertThat(remotesUsage, hasKey(clusterAlias)); + assertThat(remoteStats.get(clusterAlias).status(), equalToIgnoringCase(ClusterHealthStatus.GREEN.name())); + assertThat(remoteStats.get(clusterAlias).indicesCount(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).nodesCount(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).shardsCount(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).heapBytes(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).memBytes(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).indicesBytes(), greaterThan(0L)); + assertThat(remoteStats.get(clusterAlias).versions(), hasItem(Version.CURRENT.toString())); + assertThat(remoteStats.get(clusterAlias).clusterUUID(), not(equalTo(""))); + assertThat(remoteStats.get(clusterAlias).mode(), oneOf("sniff", "proxy")); + } + assertFalse(remoteStats.get(REMOTE1).skipUnavailable()); + assertTrue(remoteStats.get(REMOTE2).skipUnavailable()); + } + + private void setupClusters() { + int numShardsLocal = randomIntBetween(2, 5); + Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build(); + assertAcked( + client(LOCAL_CLUSTER).admin() + .indices() + .prepareCreate(INDEX_NAME) + .setSettings(localSettings) + .setMapping("@timestamp", "type=date", "f", "type=text") + ); + indexDocs(client(LOCAL_CLUSTER)); + + int numShardsRemote = randomIntBetween(2, 10); + for (String clusterAlias : remoteClusterAlias()) { + final InternalTestCluster remoteCluster = cluster(clusterAlias); + remoteCluster.ensureAtLeastNumDataNodes(randomIntBetween(1, 3)); + assertAcked( + client(clusterAlias).admin() + .indices() + .prepareCreate(INDEX_NAME) + .setSettings(indexSettings(numShardsRemote, randomIntBetween(0, 1))) + .setMapping("@timestamp", "type=date", "f", "type=text") + ); + assertFalse( + client(clusterAlias).admin() + .cluster() + .prepareHealth(TEST_REQUEST_TIMEOUT, INDEX_NAME) + .setWaitForGreenStatus() + .setTimeout(TimeValue.timeValueSeconds(30)) + .get() + .isTimedOut() + ); + indexDocs(client(clusterAlias)); + } + + } + + private void indexDocs(Client client) { + int numDocs = between(5, 20); + for (int i = 0; i < numDocs; i++) { + client.prepareIndex(INDEX_NAME).setSource("f", "v", "@timestamp", randomNonNegativeLong()).get(); + } + client.admin().indices().prepareRefresh(INDEX_NAME).get(); + } + +} diff --git a/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java 
b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java new file mode 100644 index 0000000000000..b72257b884f08 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/monitor/metrics/IndicesMetricsIT.java @@ -0,0 +1,245 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.monitor.metrics; + +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.telemetry.Measurement; +import org.elasticsearch.telemetry.TestTelemetryPlugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.hamcrest.Matcher; + +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0) +public class IndicesMetricsIT extends ESIntegTestCase { + + public static class TestAPMInternalSettings extends Plugin { + @Override + public List> getSettings() { + return List.of( + Setting.timeSetting("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(0), Setting.Property.NodeScope) + ); + } + } + + @Override + protected Collection> nodePlugins() { + return List.of(TestTelemetryPlugin.class, TestAPMInternalSettings.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal, otherSettings)) + .put("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(0)) // disable metrics cache refresh delay + .build(); + } + + static final String STANDARD_INDEX_COUNT = "es.indices.standard.total"; + static final String STANDARD_DOCS_COUNT = "es.indices.standard.docs.total"; + static final String STANDARD_BYTES_SIZE = "es.indices.standard.bytes.total"; + + static final String TIME_SERIES_INDEX_COUNT = "es.indices.time_series.total"; + static final String TIME_SERIES_DOCS_COUNT = "es.indices.time_series.docs.total"; + static final String TIME_SERIES_BYTES_SIZE = "es.indices.time_series.bytes.total"; + + static final String LOGSDB_INDEX_COUNT = "es.indices.logsdb.total"; + static final String LOGSDB_DOCS_COUNT = "es.indices.logsdb.docs.total"; + static final String LOGSDB_BYTES_SIZE = "es.indices.logsdb.bytes.total"; + + public void testIndicesMetrics() { + String node = internalCluster().startNode(); + ensureStableCluster(1); + final TestTelemetryPlugin telemetry = internalCluster().getInstance(PluginsService.class, node) + .filterPlugins(TestTelemetryPlugin.class) + .findFirst() + .orElseThrow(); + telemetry.resetMeter(); + long numStandardIndices = randomIntBetween(1, 5); + long numStandardDocs = populateStandardIndices(numStandardIndices); + 
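+        // Illustrative note: collectThenAssertMetrics() below triggers one telemetry
+        // collection pass and then checks the latest value of each gauge. At this point
+        // only standard indices exist, so the time_series and logsdb gauges must be zero.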
collectThenAssertMetrics( + telemetry, + 1, + Map.of( + STANDARD_INDEX_COUNT, + equalTo(numStandardIndices), + STANDARD_DOCS_COUNT, + equalTo(numStandardDocs), + STANDARD_BYTES_SIZE, + greaterThan(0L), + + TIME_SERIES_INDEX_COUNT, + equalTo(0L), + TIME_SERIES_DOCS_COUNT, + equalTo(0L), + TIME_SERIES_BYTES_SIZE, + equalTo(0L), + + LOGSDB_INDEX_COUNT, + equalTo(0L), + LOGSDB_DOCS_COUNT, + equalTo(0L), + LOGSDB_BYTES_SIZE, + equalTo(0L) + ) + ); + + long numTimeSeriesIndices = randomIntBetween(1, 5); + long numTimeSeriesDocs = populateTimeSeriesIndices(numTimeSeriesIndices); + collectThenAssertMetrics( + telemetry, + 2, + Map.of( + STANDARD_INDEX_COUNT, + equalTo(numStandardIndices), + STANDARD_DOCS_COUNT, + equalTo(numStandardDocs), + STANDARD_BYTES_SIZE, + greaterThan(0L), + + TIME_SERIES_INDEX_COUNT, + equalTo(numTimeSeriesIndices), + TIME_SERIES_DOCS_COUNT, + equalTo(numTimeSeriesDocs), + TIME_SERIES_BYTES_SIZE, + greaterThan(20L), + + LOGSDB_INDEX_COUNT, + equalTo(0L), + LOGSDB_DOCS_COUNT, + equalTo(0L), + LOGSDB_BYTES_SIZE, + equalTo(0L) + ) + ); + + long numLogsdbIndices = randomIntBetween(1, 5); + long numLogsdbDocs = populateLogsdbIndices(numLogsdbIndices); + collectThenAssertMetrics( + telemetry, + 3, + Map.of( + STANDARD_INDEX_COUNT, + equalTo(numStandardIndices), + STANDARD_DOCS_COUNT, + equalTo(numStandardDocs), + STANDARD_BYTES_SIZE, + greaterThan(0L), + + TIME_SERIES_INDEX_COUNT, + equalTo(numTimeSeriesIndices), + TIME_SERIES_DOCS_COUNT, + equalTo(numTimeSeriesDocs), + TIME_SERIES_BYTES_SIZE, + greaterThan(20L), + + LOGSDB_INDEX_COUNT, + equalTo(numLogsdbIndices), + LOGSDB_DOCS_COUNT, + equalTo(numLogsdbDocs), + LOGSDB_BYTES_SIZE, + greaterThan(0L) + ) + ); + } + + void collectThenAssertMetrics(TestTelemetryPlugin telemetry, int times, Map> matchers) { + telemetry.collect(); + for (Map.Entry> e : matchers.entrySet()) { + String name = e.getKey(); + List measurements = telemetry.getLongGaugeMeasurement(name); + assertThat(name, measurements, hasSize(times)); + assertThat(name, measurements.getLast().getLong(), e.getValue()); + } + } + + int populateStandardIndices(long numIndices) { + int totalDocs = 0; + for (int i = 0; i < numIndices; i++) { + String indexName = "standard-" + i; + createIndex(indexName); + int numDocs = between(1, 5); + for (int d = 0; d < numDocs; d++) { + indexDoc(indexName, Integer.toString(d), "f", Integer.toString(d)); + } + totalDocs += numDocs; + flush(indexName); + } + return totalDocs; + } + + int populateTimeSeriesIndices(long numIndices) { + int totalDocs = 0; + for (int i = 0; i < numIndices; i++) { + String indexName = "time_series-" + i; + Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("host")).build(); + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings(settings) + .setMapping( + "@timestamp", + "type=date", + "host", + "type=keyword,time_series_dimension=true", + "cpu", + "type=long,time_series_metric=gauge" + ) + .get(); + long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); + int numDocs = between(1, 5); + for (int d = 0; d < numDocs; d++) { + timestamp += between(1, 5) * 1000L; + client().prepareIndex(indexName) + .setSource("@timestamp", timestamp, "host", randomFrom("prod", "qa"), "cpu", randomIntBetween(1, 100)) + .get(); + } + totalDocs += numDocs; + flush(indexName); + } + return totalDocs; + } + + int populateLogsdbIndices(long numIndices) { + int totalDocs = 0; + for (int i = 0; i < numIndices; i++) { + String indexName = "logsdb-" + 
i; + Settings settings = Settings.builder().put("mode", "logsdb").build(); + client().admin() + .indices() + .prepareCreate(indexName) + .setSettings(settings) + .setMapping("@timestamp", "type=date", "host.name", "type=keyword", "cpu", "type=long") + .get(); + long timestamp = DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z"); + int numDocs = between(1, 5); + for (int d = 0; d < numDocs; d++) { + timestamp += between(1, 5) * 1000L; + client().prepareIndex(indexName) + .setSource("@timestamp", timestamp, "host.name", randomFrom("prod", "qa"), "cpu", randomIntBetween(1, 100)) + .get(); + } + totalDocs += numDocs; + flush(indexName); + } + return totalDocs; + } +} diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index c233fa9ec2c2b..5d26194c8876c 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -228,7 +228,9 @@ static TransportVersion def(int id) { public static final TransportVersion SEMANTIC_QUERY_INNER_HITS = def(8_752_00_0); public static final TransportVersion RETAIN_ILM_STEP_INFO = def(8_753_00_0); public static final TransportVersion ADD_DATA_STREAM_OPTIONS = def(8_754_00_0); - public static final TransportVersion REGEX_AND_RANGE_INTERVAL_QUERIES = def(8_755_00_0); + public static final TransportVersion CCS_REMOTE_TELEMETRY_STATS = def(8_755_00_0); + public static final TransportVersion REGEX_AND_RANGE_INTERVAL_QUERIES = def(8_756_00_0); + /* * STOP! READ THIS FIRST! No, really, diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java index d9c55ba097b6c..a62db92687e5a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsRequest.java @@ -20,16 +20,50 @@ * A request to get cluster level stats. */ public class ClusterStatsRequest extends BaseNodesRequest { + /** + * Should the remote cluster stats be included in the response. + */ + private final boolean doRemotes; + /** + * Return stripped down stats for remote clusters. + */ + private boolean remoteStats; + /** * Get stats from nodes based on the nodes ids specified. If none are passed, stats * based on all nodes will be returned. */ public ClusterStatsRequest(String... nodesIds) { + this(false, nodesIds); + } + + public ClusterStatsRequest(boolean doRemotes, String... nodesIds) { super(nodesIds); + this.doRemotes = doRemotes; + this.remoteStats = false; } @Override public Task createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { return new CancellableTask(id, type, action, "", parentTaskId, headers); } + + public ClusterStatsRequest asRemoteStats() { + this.remoteStats = true; + return this; + } + + /** + * Should the remote cluster stats be included in the response. + */ + public boolean doRemotes() { + return doRemotes; + } + + /** + * Should the response be a stripped down version of the stats for remote clusters. 
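+     * Set via {@link #asRemoteStats()}; defaults to {@code false}.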
+ */ + public boolean isRemoteStats() { + return remoteStats; + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java index 86900f830f4be..1a77a3d4d5399 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/ClusterStatsResponse.java @@ -18,12 +18,15 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.xcontent.ToXContentFragment; import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.List; import java.util.Locale; +import java.util.Map; +import java.util.Set; import static org.elasticsearch.action.search.TransportSearchAction.CCS_TELEMETRY_FEATURE_FLAG; @@ -34,10 +37,10 @@ public class ClusterStatsResponse extends BaseNodesResponse remoteClustersStats; public ClusterStatsResponse( long timestamp, @@ -48,7 +51,8 @@ public ClusterStatsResponse( MappingStats mappingStats, AnalysisStats analysisStats, VersionStats versionStats, - ClusterSnapshotStats clusterSnapshotStats + ClusterSnapshotStats clusterSnapshotStats, + Map remoteClustersStats ) { super(clusterName, nodes, failures); this.clusterUUID = clusterUUID; @@ -75,6 +79,7 @@ public ClusterStatsResponse( // stats should be the same on every node so just pick one of them .findAny() .orElse(RepositoryUsageStats.EMPTY); + this.remoteClustersStats = remoteClustersStats; } public String getClusterUUID() { @@ -101,6 +106,10 @@ public CCSTelemetrySnapshot getCcsMetrics() { return ccsMetrics; } + public Map getRemoteClustersStats() { + return remoteClustersStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { TransportAction.localOnly(); @@ -138,6 +147,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (CCS_TELEMETRY_FEATURE_FLAG.isEnabled()) { builder.startObject("ccs"); + if (remoteClustersStats != null) { + builder.field("clusters", remoteClustersStats); + } ccsMetrics.toXContent(builder, params); builder.endObject(); } @@ -150,4 +162,74 @@ public String toString() { return Strings.toString(this, true, true); } + /** + * Represents the information about a remote cluster. 
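+     * Starts out with {@code "unavailable"} placeholder values and is filled in via
+     * {@link #acceptResponse} once the remote cluster responds.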
+ */ + public record RemoteClusterStats( + String clusterUUID, + String mode, + boolean skipUnavailable, + String transportCompress, + Set versions, + String status, + long nodesCount, + long shardsCount, + long indicesCount, + long indicesBytes, + long heapBytes, + long memBytes + ) implements ToXContentFragment { + public RemoteClusterStats(String mode, boolean skipUnavailable, String transportCompress) { + this( + "unavailable", + mode, + skipUnavailable, + transportCompress.toLowerCase(Locale.ROOT), + Set.of(), + "unavailable", + 0, + 0, + 0, + 0, + 0, + 0 + ); + } + + public RemoteClusterStats acceptResponse(RemoteClusterStatsResponse remoteResponse) { + return new RemoteClusterStats( + remoteResponse.getClusterUUID(), + mode, + skipUnavailable, + transportCompress, + remoteResponse.getVersions(), + remoteResponse.getStatus().name().toLowerCase(Locale.ROOT), + remoteResponse.getNodesCount(), + remoteResponse.getShardsCount(), + remoteResponse.getIndicesCount(), + remoteResponse.getIndicesBytes(), + remoteResponse.getHeapBytes(), + remoteResponse.getMemBytes() + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("cluster_uuid", clusterUUID); + builder.field("mode", mode); + builder.field("skip_unavailable", skipUnavailable); + builder.field("transport.compress", transportCompress); + builder.field("status", status); + builder.field("version", versions); + builder.field("nodes_count", nodesCount); + builder.field("shards_count", shardsCount); + builder.field("indices_count", indicesCount); + builder.humanReadableField("indices_total_size_in_bytes", "indices_total_size", ByteSizeValue.ofBytes(indicesBytes)); + builder.humanReadableField("max_heap_in_bytes", "max_heap", ByteSizeValue.ofBytes(heapBytes)); + builder.humanReadableField("mem_total_in_bytes", "mem_total", ByteSizeValue.ofBytes(memBytes)); + builder.endObject(); + return builder; + } + } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java new file mode 100644 index 0000000000000..47843a91351ee --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsRequest.java @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.TransportVersions; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * A request to get cluster level stats from the remote cluster. 
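+ * <p>A rough sketch of how the coordinating cluster issues it, mirroring the fan-out in {@link TransportClusterStatsAction} (names are taken from this change): + * <pre>{@code + * RemoteClusterStatsRequest request = new RemoteClusterStatsRequest(); + * request.setParentTask(taskId); + * remoteClusterClient.execute(connection, TransportRemoteClusterStatsAction.REMOTE_TYPE, request, listener); + * }</pre>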
+ */ +public class RemoteClusterStatsRequest extends ActionRequest { + public RemoteClusterStatsRequest(StreamInput in) throws IOException { + super(in); + } + + public RemoteClusterStatsRequest() { + super(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + assert out.getTransportVersion().onOrAfter(TransportVersions.CCS_REMOTE_TELEMETRY_STATS) + : "RemoteClusterStatsRequest is not supported by the remote cluster"; + if (out.getTransportVersion().before(TransportVersions.CCS_REMOTE_TELEMETRY_STATS)) { + throw new UnsupportedOperationException("RemoteClusterStatsRequest is not supported by the remote cluster"); + } + super.writeTo(out); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java new file mode 100644 index 0000000000000..9a140b6b7424e --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/RemoteClusterStatsResponse.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Set; + +/** + * Trimmed down cluster stats response for reporting to a remote cluster. 
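+ * <p>The wire format is positional: {@code writeTo} and the {@code StreamInput} constructor below must stay in sync (cluster UUID, health status, versions, then the six numeric counters).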
+ */ +public class RemoteClusterStatsResponse extends ActionResponse { + final String clusterUUID; + final ClusterHealthStatus status; + private final Set versions; + private final long nodesCount; + private final long shardsCount; + private final long indicesCount; + private final long indicesBytes; + private final long heapBytes; + private final long memBytes; + + public Set getVersions() { + return versions; + } + + public long getNodesCount() { + return nodesCount; + } + + public long getShardsCount() { + return shardsCount; + } + + public long getIndicesCount() { + return indicesCount; + } + + public long getIndicesBytes() { + return indicesBytes; + } + + public long getHeapBytes() { + return heapBytes; + } + + public long getMemBytes() { + return memBytes; + } + + public RemoteClusterStatsResponse( + String clusterUUID, + ClusterHealthStatus status, + Set versions, + long nodesCount, + long shardsCount, + long indicesCount, + long indicesBytes, + long heapBytes, + long memBytes + ) { + this.clusterUUID = clusterUUID; + this.status = status; + this.versions = versions; + this.nodesCount = nodesCount; + this.shardsCount = shardsCount; + this.indicesCount = indicesCount; + this.indicesBytes = indicesBytes; + this.heapBytes = heapBytes; + this.memBytes = memBytes; + } + + public String getClusterUUID() { + return this.clusterUUID; + } + + public ClusterHealthStatus getStatus() { + return this.status; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(clusterUUID); + status.writeTo(out); + out.writeStringCollection(versions); + out.writeLong(nodesCount); + out.writeLong(shardsCount); + out.writeLong(indicesCount); + out.writeLong(indicesBytes); + out.writeLong(heapBytes); + out.writeLong(memBytes); + } + + public RemoteClusterStatsResponse(StreamInput in) throws IOException { + super(in); + this.clusterUUID = in.readString(); + this.status = ClusterHealthStatus.readFrom(in); + this.versions = in.readCollectionAsSet(StreamInput::readString); + this.nodesCount = in.readLong(); + this.shardsCount = in.readLong(); + this.indicesCount = in.readLong(); + this.indicesBytes = in.readLong(); + this.heapBytes = in.readLong(); + this.memBytes = in.readLong(); + } +} diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 6cac8c8f8ca09..ab68f1d8481fd 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -9,6 +9,8 @@ package org.elasticsearch.action.admin.cluster.stats; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; @@ -16,10 +18,12 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse.RemoteClusterStats; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.ShardStats; import 
org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.CancellableFanOut; import org.elasticsearch.action.support.RefCountingListener; import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.action.support.nodes.TransportNodesAction; @@ -32,6 +36,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.CancellableSingleObjectCache; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.core.UpdateForV9; @@ -48,6 +53,9 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClusterConnection; +import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.RemoteConnectionInfo; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.Transports; @@ -56,12 +64,19 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.BooleanSupplier; +import java.util.stream.Collectors; +import static org.elasticsearch.TransportVersions.CCS_REMOTE_TELEMETRY_STATS; + +/** + * Transport action implementing _cluster/stats API. + */ public class TransportClusterStatsAction extends TransportNodesAction< ClusterStatsRequest, ClusterStatsResponse, @@ -70,6 +85,7 @@ public class TransportClusterStatsAction extends TransportNodesAction< SubscribableListener> { public static final ActionType TYPE = new ActionType<>("cluster:monitor/stats"); + private static final CommonStatsFlags SHARD_STATS_FLAGS = new CommonStatsFlags( CommonStatsFlags.Flag.Docs, CommonStatsFlags.Flag.Store, @@ -80,7 +96,9 @@ public class TransportClusterStatsAction extends TransportNodesAction< CommonStatsFlags.Flag.DenseVector, CommonStatsFlags.Flag.SparseVector ); + private static final Logger logger = LogManager.getLogger(TransportClusterStatsAction.class); + private final Settings settings; private final NodeService nodeService; private final IndicesService indicesService; private final RepositoriesService repositoriesService; @@ -90,6 +108,8 @@ public class TransportClusterStatsAction extends TransportNodesAction< private final Executor clusterStateStatsExecutor; private final MetadataStatsCache mappingStatsCache; private final MetadataStatsCache analysisStatsCache; + private final RemoteClusterService remoteClusterService; + private final TransportRemoteClusterStatsAction remoteClusterStatsAction; @Inject public TransportClusterStatsAction( @@ -100,7 +120,9 @@ public TransportClusterStatsAction( IndicesService indicesService, RepositoriesService repositoriesService, UsageService usageService, - ActionFilters actionFilters + ActionFilters actionFilters, + Settings settings, + TransportRemoteClusterStatsAction remoteClusterStatsAction ) { super( TYPE.name(), @@ -118,6 +140,9 @@ public TransportClusterStatsAction( this.clusterStateStatsExecutor = threadPool.executor(ThreadPool.Names.MANAGEMENT); this.mappingStatsCache = new MetadataStatsCache<>(threadPool.getThreadContext(), MappingStats::of); this.analysisStatsCache = new 
MetadataStatsCache<>(threadPool.getThreadContext(), AnalysisStats::of); + this.remoteClusterService = transportService.getRemoteClusterService(); + this.settings = settings; + this.remoteClusterStatsAction = remoteClusterStatsAction; } @Override @@ -125,14 +150,13 @@ protected SubscribableListener createActionContext(Task task, C assert task instanceof CancellableTask; final var cancellableTask = (CancellableTask) task; final var additionalStatsListener = new SubscribableListener(); - AdditionalStats.compute( - cancellableTask, - clusterStateStatsExecutor, - clusterService, - mappingStatsCache, - analysisStatsCache, - additionalStatsListener - ); + if (request.isRemoteStats() == false) { + final AdditionalStats additionalStats = new AdditionalStats(); + additionalStats.compute(cancellableTask, request, additionalStatsListener); + } else { + // For remote stats request, we don't need to compute anything + additionalStatsListener.onResponse(null); + } return additionalStatsListener; } @@ -150,18 +174,34 @@ protected void newResponseAsync( + "the cluster state that are too slow for a transport thread" ); assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.MANAGEMENT); + additionalStatsListener.andThenApply( - additionalStats -> new ClusterStatsResponse( - System.currentTimeMillis(), - additionalStats.clusterUUID(), - clusterService.getClusterName(), - responses, - failures, - additionalStats.mappingStats(), - additionalStats.analysisStats(), - VersionStats.of(clusterService.state().metadata(), responses), - additionalStats.clusterSnapshotStats() - ) + additionalStats -> request.isRemoteStats() + // Return stripped down stats for remote clusters + ? new ClusterStatsResponse( + System.currentTimeMillis(), + clusterService.state().metadata().clusterUUID(), + clusterService.getClusterName(), + responses, + List.of(), + null, + null, + null, + null, + Map.of() + ) + : new ClusterStatsResponse( + System.currentTimeMillis(), + additionalStats.clusterUUID(), + clusterService.getClusterName(), + responses, + failures, + additionalStats.mappingStats(), + additionalStats.analysisStats(), + VersionStats.of(clusterService.state().metadata(), responses), + additionalStats.clusterSnapshotStats(), + additionalStats.getRemoteStats() + ) ).addListener(listener); } @@ -315,36 +355,33 @@ protected boolean isFresh(Long currentKey, Long newKey) { } } - public static final class AdditionalStats { + public final class AdditionalStats { private String clusterUUID; private MappingStats mappingStats; private AnalysisStats analysisStats; private ClusterSnapshotStats clusterSnapshotStats; + private Map remoteStats; - static void compute( - CancellableTask task, - Executor executor, - ClusterService clusterService, - MetadataStatsCache mappingStatsCache, - MetadataStatsCache analysisStatsCache, - ActionListener listener - ) { - executor.execute(ActionRunnable.wrap(listener, l -> { + void compute(CancellableTask task, ClusterStatsRequest request, ActionListener listener) { + clusterStateStatsExecutor.execute(ActionRunnable.wrap(listener, l -> { task.ensureNotCancelled(); - final var result = new AdditionalStats(); - result.compute( + internalCompute( + task, + request, clusterService.state(), mappingStatsCache, analysisStatsCache, task::isCancelled, clusterService.threadPool().absoluteTimeInMillis(), - l.map(ignored -> result) + l.map(ignored -> this) ); })); } - private void compute( + private void internalCompute( + CancellableTask task, + ClusterStatsRequest request, ClusterState clusterState, MetadataStatsCache 
mappingStatsCache, MetadataStatsCache analysisStatsCache, @@ -358,6 +395,18 @@ private void compute( mappingStatsCache.get(metadata, isCancelledSupplier, listeners.acquire(s -> mappingStats = s)); analysisStatsCache.get(metadata, isCancelledSupplier, listeners.acquire(s -> analysisStats = s)); clusterSnapshotStats = ClusterSnapshotStats.of(clusterState, absoluteTimeInMillis); + if (doRemotes(request)) { + var remotes = remoteClusterService.getRegisteredRemoteClusterNames(); + if (remotes.isEmpty()) { + remoteStats = Map.of(); + } else { + new RemoteStatsFanout(task, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION)).start( + task, + remotes, + listeners.acquire(s -> remoteStats = s) + ); + } + } } } @@ -376,5 +425,79 @@ AnalysisStats analysisStats() { ClusterSnapshotStats clusterSnapshotStats() { return clusterSnapshotStats; } + + public Map getRemoteStats() { + return remoteStats; + } + } + + private static boolean doRemotes(ClusterStatsRequest request) { + return request.doRemotes(); + } + + private class RemoteStatsFanout extends CancellableFanOut> { + private final Executor requestExecutor; + private final TaskId taskId; + private Map remoteClustersStats; + + RemoteStatsFanout(Task task, Executor requestExecutor) { + this.requestExecutor = requestExecutor; + this.taskId = new TaskId(clusterService.getNodeName(), task.getId()); + } + + @Override + protected void sendItemRequest(String clusterAlias, ActionListener listener) { + var remoteClusterClient = remoteClusterService.getRemoteClusterClient( + clusterAlias, + requestExecutor, + RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED + ); + var remoteRequest = new RemoteClusterStatsRequest(); + remoteRequest.setParentTask(taskId); + remoteClusterClient.getConnection(remoteRequest, listener.delegateFailureAndWrap((responseListener, connection) -> { + if (connection.getTransportVersion().before(CCS_REMOTE_TELEMETRY_STATS)) { + responseListener.onResponse(null); + } else { + remoteClusterClient.execute(connection, TransportRemoteClusterStatsAction.REMOTE_TYPE, remoteRequest, responseListener); + } + })); + } + + @Override + protected void onItemResponse(String clusterAlias, RemoteClusterStatsResponse response) { + if (response != null) { + remoteClustersStats.computeIfPresent(clusterAlias, (k, v) -> v.acceptResponse(response)); + } + } + + @Override + protected void onItemFailure(String clusterAlias, Exception e) { + logger.warn("Failed to get remote cluster stats for [{}]: {}", clusterAlias, e); + } + + void start(Task task, Collection remotes, ActionListener> listener) { + this.remoteClustersStats = remotes.stream().collect(Collectors.toConcurrentMap(r -> r, this::makeRemoteClusterStats)); + super.run(task, remotes.iterator(), listener); + } + + /** + * Create static portion of RemoteClusterStats for a given cluster alias. 
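+ * The static portion carries only what is known locally (mode, {@code skip_unavailable}, transport compression); the metrics fields keep their "unavailable"/zero defaults and are only replaced via {@link RemoteClusterStats#acceptResponse} if and when the remote responds.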
+ */ + RemoteClusterStats makeRemoteClusterStats(String clusterAlias) { + RemoteClusterConnection remoteConnection = remoteClusterService.getRemoteClusterConnection(clusterAlias); + RemoteConnectionInfo remoteConnectionInfo = remoteConnection.getConnectionInfo(); + var compression = RemoteClusterService.REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); + return new RemoteClusterStats( + remoteConnectionInfo.getModeInfo().modeName(), + remoteConnection.isSkipUnavailable(), + compression.toString() + ); + } + + @Override + protected Map onCompletion() { + return remoteClustersStats; + } } + } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java new file mode 100644 index 0000000000000..4d57f10807af6 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportRemoteClusterStatsAction.java @@ -0,0 +1,65 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.admin.cluster.stats; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.RemoteClusterActionType; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.TransportService; + +/** + * Handler action for incoming {@link RemoteClusterStatsRequest}. + * Will pass the work to {@link TransportClusterStatsAction} and return the response. 
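+ * Callers are expected to check that the connection is on or after {@code CCS_REMOTE_TELEMETRY_STATS} before executing this action, as the fan-out in {@link TransportClusterStatsAction} does; {@link RemoteClusterStatsRequest#writeTo} enforces the same bound.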
+ */ +public class TransportRemoteClusterStatsAction extends HandledTransportAction { + + public static final String NAME = "cluster:monitor/stats/remote"; + public static final ActionType TYPE = new ActionType<>(NAME); + public static final RemoteClusterActionType REMOTE_TYPE = new RemoteClusterActionType<>( + NAME, + RemoteClusterStatsResponse::new + ); + private final NodeClient client; + + @Inject + public TransportRemoteClusterStatsAction(NodeClient client, TransportService transportService, ActionFilters actionFilters) { + super(NAME, transportService, actionFilters, RemoteClusterStatsRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.client = client; + } + + @Override + protected void doExecute(Task task, RemoteClusterStatsRequest request, ActionListener listener) { + ClusterStatsRequest subRequest = new ClusterStatsRequest().asRemoteStats(); + subRequest.setParentTask(request.getParentTask()); + client.execute( + TransportClusterStatsAction.TYPE, + subRequest, + listener.map( + clusterStatsResponse -> new RemoteClusterStatsResponse( + clusterStatsResponse.getClusterUUID(), + clusterStatsResponse.getStatus(), + clusterStatsResponse.getNodesStats().getVersions(), + clusterStatsResponse.getNodesStats().getCounts().getTotal(), + clusterStatsResponse.getIndicesStats().getShards().getTotal(), + clusterStatsResponse.getIndicesStats().getIndexCount(), + clusterStatsResponse.getIndicesStats().getStore().sizeInBytes(), + clusterStatsResponse.getNodesStats().getJvm().getHeapMax().getBytes(), + clusterStatsResponse.getNodesStats().getOs().getMem().getTotal().getBytes() + ) + ) + ); + } +} diff --git a/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java b/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java index 566c8001dea56..021ad8127a2d0 100644 --- a/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java +++ b/server/src/main/java/org/elasticsearch/bootstrap/BootstrapChecks.java @@ -412,12 +412,12 @@ static class MaxFileSizeCheck implements BootstrapCheck { @Override public BootstrapCheckResult check(BootstrapContext context) { - final long maxFileSize = getMaxFileSize(); + final long maxFileSize = getProcessLimits().maxFileSize(); if (maxFileSize != Long.MIN_VALUE && maxFileSize != ProcessLimits.UNLIMITED) { final String message = String.format( Locale.ROOT, "max file size [%d] for user [%s] is too low, increase to [unlimited]", - getMaxFileSize(), + maxFileSize, BootstrapInfo.getSystemProperties().get("user.name") ); return BootstrapCheckResult.failure(message); @@ -426,8 +426,8 @@ public BootstrapCheckResult check(BootstrapContext context) { } } - long getMaxFileSize() { - return NativeAccess.instance().getProcessLimits().maxVirtualMemorySize(); + protected ProcessLimits getProcessLimits() { + return NativeAccess.instance().getProcessLimits(); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/mapper/MapperFeatures.java b/server/src/main/java/org/elasticsearch/index/mapper/MapperFeatures.java index 2f665fd5d1e6a..31df558492b35 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/MapperFeatures.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/MapperFeatures.java @@ -36,6 +36,7 @@ public Set getFeatures() { NodeMappingStats.SEGMENT_LEVEL_FIELDS_STATS, BooleanFieldMapper.BOOLEAN_DIMENSION, ObjectMapper.SUBOBJECTS_AUTO, + ObjectMapper.SUBOBJECTS_AUTO_FIXES, KeywordFieldMapper.KEYWORD_NORMALIZER_SYNTHETIC_SOURCE, SourceFieldMapper.SYNTHETIC_SOURCE_STORED_FIELDS_ADVANCE_FIX, 
Mapper.SYNTHETIC_SOURCE_KEEP_FEATURE, diff --git a/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java index f9c854749e885..40019566adaa8 100644 --- a/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java +++ b/server/src/main/java/org/elasticsearch/index/mapper/ObjectMapper.java @@ -45,6 +45,7 @@ public class ObjectMapper extends Mapper { public static final String CONTENT_TYPE = "object"; static final String STORE_ARRAY_SOURCE_PARAM = "store_array_source"; static final NodeFeature SUBOBJECTS_AUTO = new NodeFeature("mapper.subobjects_auto"); + static final NodeFeature SUBOBJECTS_AUTO_FIXES = new NodeFeature("mapper.subobjects_auto_fixes"); /** * Enhances the previously boolean option for subobjects support with an intermediate mode `auto` that uses diff --git a/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java b/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java new file mode 100644 index 0000000000000..17e290283d5e0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/monitor/metrics/IndicesMetrics.java @@ -0,0 +1,177 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.monitor.metrics; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.util.SingleObjectCache; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.IndexMode; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.telemetry.metric.LongWithAttributes; +import org.elasticsearch.telemetry.metric.MeterRegistry; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +/** + * {@link IndicesMetrics} monitors index statistics on an Elasticsearch node and exposes them as metrics + * through the provided {@link MeterRegistry}. It tracks the current total number of indices, document count, and + * store size (in bytes) for each index mode. + */ +public class IndicesMetrics extends AbstractLifecycleComponent { + private final Logger logger = LogManager.getLogger(IndicesMetrics.class); + private final MeterRegistry registry; + private final List metrics = new ArrayList<>(); + private final IndicesStatsCache stateCache; + + public IndicesMetrics(MeterRegistry meterRegistry, IndicesService indicesService, TimeValue metricsInterval) { + this.registry = meterRegistry; + // Use half of the update interval to ensure that results aren't cached across updates, + // while preventing the cache from expiring when reading different gauges within the same update. 
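+ // For example, with the default telemetry.agent.metrics_interval of 10s (see NodeConstruction), stats are cached for 5s while gauges are read every 10s.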
+ var cacheExpiry = new TimeValue(metricsInterval.getMillis() / 2); + this.stateCache = new IndicesStatsCache(indicesService, cacheExpiry); + } + + private static List registerAsyncMetrics(MeterRegistry registry, IndicesStatsCache cache) { + List metrics = new ArrayList<>(IndexMode.values().length * 3); + assert IndexMode.values().length == 3 : "index modes have changed"; + for (IndexMode indexMode : IndexMode.values()) { + String name = indexMode.getName(); + metrics.add( + registry.registerLongGauge( + "es.indices." + name + ".total", + "total number of " + name + " indices", + "unit", + () -> new LongWithAttributes(cache.getOrRefresh().get(indexMode).numIndices) + ) + ); + metrics.add( + registry.registerLongGauge( + "es.indices." + name + ".docs.total", + "total documents of " + name + " indices", + "unit", + () -> new LongWithAttributes(cache.getOrRefresh().get(indexMode).numDocs) + ) + ); + metrics.add( + registry.registerLongGauge( + "es.indices." + name + ".bytes.total", + "total size in bytes of " + name + " indices", + "unit", + () -> new LongWithAttributes(cache.getOrRefresh().get(indexMode).numBytes) + ) + ); + } + return metrics; + } + + @Override + protected void doStart() { + metrics.addAll(registerAsyncMetrics(registry, stateCache)); + } + + @Override + protected void doStop() { + stateCache.stopRefreshing(); + } + + @Override + protected void doClose() throws IOException { + metrics.forEach(metric -> { + try { + metric.close(); + } catch (Exception e) { + logger.warn("metrics close() method should not throw Exception", e); + } + }); + } + + static class IndexStats { + int numIndices = 0; + long numDocs = 0; + long numBytes = 0; + } + + private static class IndicesStatsCache extends SingleObjectCache> { + private static final Map MISSING_STATS; + static { + MISSING_STATS = new EnumMap<>(IndexMode.class); + for (IndexMode value : IndexMode.values()) { + MISSING_STATS.put(value, new IndexStats()); + } + } + + private boolean refresh; + private final IndicesService indicesService; + + IndicesStatsCache(IndicesService indicesService, TimeValue interval) { + super(interval, MISSING_STATS); + this.indicesService = indicesService; + this.refresh = true; + } + + private Map internalGetIndicesStats() { + Map stats = new EnumMap<>(IndexMode.class); + for (IndexMode mode : IndexMode.values()) { + stats.put(mode, new IndexStats()); + } + for (IndexService indexService : indicesService) { + for (IndexShard indexShard : indexService) { + if (indexShard.isSystem()) { + continue; // skip system indices + } + final ShardRouting shardRouting = indexShard.routingEntry(); + if (shardRouting.primary() == false) { + continue; // count primaries only + } + if (shardRouting.recoverySource() != null) { + continue; // exclude relocating shards + } + final IndexMode indexMode = indexShard.indexSettings().getMode(); + final IndexStats indexStats = stats.get(indexMode); + if (shardRouting.shardId().id() == 0) { + indexStats.numIndices++; + } + try { + indexStats.numDocs += indexShard.commitStats().getNumDocs(); + indexStats.numBytes += indexShard.storeStats().sizeInBytes(); + } catch (IllegalIndexShardStateException | AlreadyClosedException ignored) { + // ignored + } + } + } + return stats; + } + + @Override + protected Map refresh() { + return refresh ? 
internalGetIndicesStats() : getNoRefresh(); + } + + @Override + protected boolean needsRefresh() { + return getNoRefresh() == MISSING_STATS || super.needsRefresh(); + } + + void stopRefreshing() { + this.refresh = false; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 1447ac1c5b59b..5024cc5468866 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -66,6 +66,7 @@ import org.elasticsearch.injection.guice.Injector; import org.elasticsearch.monitor.fs.FsHealthService; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.monitor.metrics.IndicesMetrics; import org.elasticsearch.monitor.metrics.NodeMetrics; import org.elasticsearch.node.internal.TerminationHandler; import org.elasticsearch.plugins.ClusterCoordinationPlugin; @@ -441,6 +442,7 @@ public void onTimeout(TimeValue timeout) { } injector.getInstance(NodeMetrics.class).start(); + injector.getInstance(IndicesMetrics.class).start(); injector.getInstance(HealthPeriodicLogger.class).start(); logger.info("started {}", transportService.getLocalNode()); @@ -489,6 +491,7 @@ private void stop() { stopIfStarted(SearchService.class); stopIfStarted(TransportService.class); stopIfStarted(NodeMetrics.class); + stopIfStarted(IndicesMetrics.class); pluginLifecycleComponents.forEach(Node::stopIfStarted); // we should stop this last since it waits for resources to get released @@ -558,6 +561,7 @@ public synchronized void close() throws IOException { toClose.add(() -> stopWatch.stop().start("transport")); toClose.add(injector.getInstance(TransportService.class)); toClose.add(injector.getInstance(NodeMetrics.class)); + toClose.add(injector.getInstance(IndicesMetrics.class)); if (ReadinessService.enabled(environment)) { toClose.add(injector.getInstance(ReadinessService.class)); } diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 97ade19228a3d..9a7dfba095723 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -142,6 +142,7 @@ import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.fs.FsHealthService; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.monitor.metrics.IndicesMetrics; import org.elasticsearch.monitor.metrics.NodeMetrics; import org.elasticsearch.node.internal.TerminationHandler; import org.elasticsearch.node.internal.TerminationHandlerProvider; @@ -1076,6 +1077,7 @@ private void construct( final TimeValue metricsInterval = settings.getAsTime("telemetry.agent.metrics_interval", TimeValue.timeValueSeconds(10)); final NodeMetrics nodeMetrics = new NodeMetrics(telemetryProvider.getMeterRegistry(), nodeService, metricsInterval); + final IndicesMetrics indicesMetrics = new IndicesMetrics(telemetryProvider.getMeterRegistry(), indicesService, metricsInterval); final SearchService searchService = serviceProvider.newSearchService( pluginsService, @@ -1175,6 +1177,7 @@ private void construct( b.bind(Transport.class).toInstance(transport); b.bind(TransportService.class).toInstance(transportService); b.bind(NodeMetrics.class).toInstance(nodeMetrics); + b.bind(IndicesMetrics.class).toInstance(indicesMetrics); b.bind(NetworkService.class).toInstance(networkService);
b.bind(IndexMetadataVerifier.class).toInstance(indexMetadataVerifier); b.bind(ClusterInfoService.class).toInstance(clusterInfoService); diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java index 603dcdba86730..53ae50bc0b75f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestClusterStatsAction.java @@ -11,6 +11,8 @@ import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequest; import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.common.util.FeatureFlag; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.Scope; @@ -29,6 +31,8 @@ public class RestClusterStatsAction extends BaseRestHandler { private static final Set SUPPORTED_CAPABILITIES = Set.of("human-readable-total-docs-size"); + private static final Set SUPPORTED_CAPABILITIES_CCS_STATS = Sets.union(SUPPORTED_CAPABILITIES, Set.of("ccs-stats")); + public static final FeatureFlag CCS_TELEMETRY_FEATURE_FLAG = new FeatureFlag("ccs_telemetry"); @Override public List routes() { @@ -40,9 +44,17 @@ public String getName() { return "cluster_stats_action"; } + @Override + public Set supportedQueryParameters() { + return Set.of("include_remotes", "nodeId"); + } + @Override public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest(request.paramAsStringArray("nodeId", null)); + ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest( + request.paramAsBoolean("include_remotes", false), + request.paramAsStringArray("nodeId", null) + ); clusterStatsRequest.timeout(getTimeout(request)); return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin() .cluster() @@ -56,6 +68,6 @@ public boolean canTripCircuitBreaker() { @Override public Set supportedCapabilities() { - return SUPPORTED_CAPABILITIES; + return CCS_TELEMETRY_FEATURE_FLAG.isEnabled() ? SUPPORTED_CAPABILITIES_CCS_STATS : SUPPORTED_CAPABILITIES; } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index d0638fcf7a2de..f0cafb956457e 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -43,7 +43,7 @@ * {@link SniffConnectionStrategy#REMOTE_CONNECTIONS_PER_CLUSTER} until either all eligible nodes are exhausted or the maximum number of * connections per cluster has been reached. 
*/ -final class RemoteClusterConnection implements Closeable { +public final class RemoteClusterConnection implements Closeable { private final TransportService transportService; private final RemoteConnectionManager remoteConnectionManager; @@ -99,7 +99,7 @@ void setSkipUnavailable(boolean skipUnavailable) { /** * Returns whether this cluster is configured to be skipped when unavailable */ - boolean isSkipUnavailable() { + public boolean isSkipUnavailable() { return skipUnavailable; } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index f1afdfe1f186b..620b80e91cb45 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -277,7 +277,7 @@ public void maybeEnsureConnectedAndGetConnection( } } - RemoteClusterConnection getRemoteClusterConnection(String cluster) { + public RemoteClusterConnection getRemoteClusterConnection(String cluster) { if (enabled == false) { throw new IllegalArgumentException( "this node does not have the " + DiscoveryNodeRole.REMOTE_CLUSTER_CLIENT_ROLE.roleName() + " role" diff --git a/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java b/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java index 9a51757189f8b..8c3749dbd3a45 100644 --- a/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java +++ b/server/src/test/java/org/elasticsearch/bootstrap/BootstrapChecksTests.java @@ -389,8 +389,8 @@ public void testMaxFileSizeCheck() throws NodeValidationException { final AtomicLong maxFileSize = new AtomicLong(randomIntBetween(0, Integer.MAX_VALUE)); final BootstrapChecks.MaxFileSizeCheck check = new BootstrapChecks.MaxFileSizeCheck() { @Override - long getMaxFileSize() { - return maxFileSize.get(); + protected ProcessLimits getProcessLimits() { + return new ProcessLimits(ProcessLimits.UNKNOWN, ProcessLimits.UNKNOWN, maxFileSize.get()); } }; diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/elastic/ElasticInferenceServiceActionCreator.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/elastic/ElasticInferenceServiceActionCreator.java index ea2295979c480..c8ada6e535b63 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/elastic/ElasticInferenceServiceActionCreator.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/action/elastic/ElasticInferenceServiceActionCreator.java @@ -13,6 +13,7 @@ import org.elasticsearch.xpack.inference.external.http.sender.Sender; import org.elasticsearch.xpack.inference.services.ServiceComponents; import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSparseEmbeddingsModel; +import org.elasticsearch.xpack.inference.telemetry.TraceContext; import java.util.Objects; @@ -24,14 +25,17 @@ public class ElasticInferenceServiceActionCreator implements ElasticInferenceSer private final ServiceComponents serviceComponents; - public ElasticInferenceServiceActionCreator(Sender sender, ServiceComponents serviceComponents) { + private final TraceContext traceContext; + + public ElasticInferenceServiceActionCreator(Sender sender, ServiceComponents serviceComponents, TraceContext traceContext) { this.sender = Objects.requireNonNull(sender); this.serviceComponents = 
Objects.requireNonNull(serviceComponents); + this.traceContext = traceContext; } @Override public ExecutableAction create(ElasticInferenceServiceSparseEmbeddingsModel model) { - var requestManager = new ElasticInferenceServiceSparseEmbeddingsRequestManager(model, serviceComponents); + var requestManager = new ElasticInferenceServiceSparseEmbeddingsRequestManager(model, serviceComponents, traceContext); var errorMessage = constructFailedToSendRequestMessage(model.uri(), "Elastic Inference Service sparse embeddings"); return new SenderExecutableAction(sender, requestManager, errorMessage); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/ElasticInferenceServiceSparseEmbeddingsRequestManager.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/ElasticInferenceServiceSparseEmbeddingsRequestManager.java index b59ac54d5cbb6..e7ee41525f07d 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/ElasticInferenceServiceSparseEmbeddingsRequestManager.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/http/sender/ElasticInferenceServiceSparseEmbeddingsRequestManager.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.inference.external.response.elastic.ElasticInferenceServiceSparseEmbeddingsResponseEntity; import org.elasticsearch.xpack.inference.services.ServiceComponents; import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSparseEmbeddingsModel; +import org.elasticsearch.xpack.inference.telemetry.TraceContext; import java.util.List; import java.util.function.Supplier; @@ -35,6 +36,8 @@ public class ElasticInferenceServiceSparseEmbeddingsRequestManager extends Elast private final Truncator truncator; + private final TraceContext traceContext; + private static ResponseHandler createSparseEmbeddingsHandler() { return new ElasticInferenceServiceResponseHandler( "Elastic Inference Service sparse embeddings", @@ -44,11 +47,13 @@ private static ResponseHandler createSparseEmbeddingsHandler() { public ElasticInferenceServiceSparseEmbeddingsRequestManager( ElasticInferenceServiceSparseEmbeddingsModel model, - ServiceComponents serviceComponents + ServiceComponents serviceComponents, + TraceContext traceContext ) { super(serviceComponents.threadPool(), model); this.model = model; this.truncator = serviceComponents.truncator(); + this.traceContext = traceContext; } @Override @@ -64,7 +69,8 @@ public void execute( ElasticInferenceServiceSparseEmbeddingsRequest request = new ElasticInferenceServiceSparseEmbeddingsRequest( truncator, truncatedInput, - model + model, + traceContext ); execute(new ExecutableInferenceRequest(requestSender, logger, request, HANDLER, hasRequestCompletedFunction, listener)); } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/request/elastic/ElasticInferenceServiceSparseEmbeddingsRequest.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/request/elastic/ElasticInferenceServiceSparseEmbeddingsRequest.java index 41a2ef1c3ccda..d445a779f8230 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/request/elastic/ElasticInferenceServiceSparseEmbeddingsRequest.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/external/request/elastic/ElasticInferenceServiceSparseEmbeddingsRequest.java @@ -12,11 +12,13 @@ import 
org.apache.http.entity.ByteArrayEntity; import org.apache.http.message.BasicHeader; import org.elasticsearch.common.Strings; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.inference.common.Truncator; import org.elasticsearch.xpack.inference.external.request.HttpRequest; import org.elasticsearch.xpack.inference.external.request.Request; import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSparseEmbeddingsModel; +import org.elasticsearch.xpack.inference.telemetry.TraceContext; import java.net.URI; import java.nio.charset.StandardCharsets; @@ -31,15 +33,19 @@ public class ElasticInferenceServiceSparseEmbeddingsRequest implements ElasticIn private final Truncator.TruncationResult truncationResult; private final Truncator truncator; + private final TraceContext traceContext; + public ElasticInferenceServiceSparseEmbeddingsRequest( Truncator truncator, Truncator.TruncationResult truncationResult, - ElasticInferenceServiceSparseEmbeddingsModel model + ElasticInferenceServiceSparseEmbeddingsModel model, + TraceContext traceContext ) { this.truncator = truncator; this.truncationResult = truncationResult; this.model = Objects.requireNonNull(model); this.uri = model.uri(); + this.traceContext = traceContext; } @Override @@ -50,6 +56,10 @@ public HttpRequest createHttpRequest() { ByteArrayEntity byteEntity = new ByteArrayEntity(requestEntity.getBytes(StandardCharsets.UTF_8)); httpPost.setEntity(byteEntity); + if (traceContext != null) { + propagateTraceContext(httpPost); + } + httpPost.setHeader(new BasicHeader(HttpHeaders.CONTENT_TYPE, XContentType.JSON.mediaType())); return new HttpRequest(httpPost, getInferenceEntityId()); @@ -65,11 +75,15 @@ public URI getURI() { return this.uri; } + public TraceContext getTraceContext() { + return traceContext; + } + @Override public Request truncate() { var truncatedInput = truncator.truncate(truncationResult.input()); - return new ElasticInferenceServiceSparseEmbeddingsRequest(truncator, truncatedInput, model); + return new ElasticInferenceServiceSparseEmbeddingsRequest(truncator, truncatedInput, model, traceContext); } @Override @@ -77,4 +91,16 @@ public boolean[] getTruncationInfo() { return truncationResult.truncated().clone(); } + private void propagateTraceContext(HttpPost httpPost) { + var traceParent = traceContext.traceParent(); + var traceState = traceContext.traceState(); + + if (traceParent != null) { + httpPost.setHeader(Task.TRACE_PARENT_HTTP_HEADER, traceParent); + } + + if (traceState != null) { + httpPost.setHeader(Task.TRACE_STATE, traceState); + } + } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceService.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceService.java index 103ddd4c5c5ea..abbe893823b96 100644 --- a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceService.java +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/services/elastic/ElasticInferenceService.java @@ -23,6 +23,7 @@ import org.elasticsearch.inference.ModelSecrets; import org.elasticsearch.inference.TaskType; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.tasks.Task; import org.elasticsearch.xpack.core.inference.results.ErrorChunkedInferenceResults; import org.elasticsearch.xpack.core.inference.results.InferenceChunkedSparseEmbeddingResults; 
import org.elasticsearch.xpack.core.inference.results.SparseEmbeddingResults; @@ -34,6 +35,7 @@ import org.elasticsearch.xpack.inference.services.ConfigurationParseContext; import org.elasticsearch.xpack.inference.services.SenderService; import org.elasticsearch.xpack.inference.services.ServiceComponents; +import org.elasticsearch.xpack.inference.telemetry.TraceContext; import java.util.List; import java.util.Map; @@ -75,8 +77,13 @@ protected void doInfer( return; } + // We extract the trace context here as it's sufficient to propagate the trace information of the REST request, + // which handles the request to the inference API overall (including the outgoing request, which is started in a new thread + // generating a different "traceparent" as every task and every REST request creates a new span). + var currentTraceInfo = getCurrentTraceInfo(); + ElasticInferenceServiceModel elasticInferenceServiceModel = (ElasticInferenceServiceModel) model; - var actionCreator = new ElasticInferenceServiceActionCreator(getSender(), getServiceComponents()); + var actionCreator = new ElasticInferenceServiceActionCreator(getSender(), getServiceComponents(), currentTraceInfo); var action = elasticInferenceServiceModel.accept(actionCreator, taskSettings); action.execute(inputs, timeout, listener); @@ -258,4 +265,13 @@ private ElasticInferenceServiceSparseEmbeddingsModel updateModelWithEmbeddingDet return new ElasticInferenceServiceSparseEmbeddingsModel(model, serviceSettings); } + + private TraceContext getCurrentTraceInfo() { + var threadPool = getServiceComponents().threadPool(); + + var traceParent = threadPool.getThreadContext().getHeader(Task.TRACE_PARENT); + var traceState = threadPool.getThreadContext().getHeader(Task.TRACE_STATE); + + return new TraceContext(traceParent, traceState); + } } diff --git a/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/TraceContext.java b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/TraceContext.java new file mode 100644 index 0000000000000..05654ed146f16 --- /dev/null +++ b/x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/TraceContext.java @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +package org.elasticsearch.xpack.inference.telemetry; + +public record TraceContext(String traceParent, String traceState) {} diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/elastic/ElasticInferenceServiceActionCreatorTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/elastic/ElasticInferenceServiceActionCreatorTests.java index 1081a60ba6866..02b09917d0065 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/elastic/ElasticInferenceServiceActionCreatorTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/action/elastic/ElasticInferenceServiceActionCreatorTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.xpack.inference.logging.ThrottlerManager; import org.elasticsearch.xpack.inference.results.SparseEmbeddingResultsTests; import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSparseEmbeddingsModelTests; +import org.elasticsearch.xpack.inference.telemetry.TraceContext; import org.junit.After; import org.junit.Before; @@ -89,7 +90,7 @@ public void testExecute_ReturnsSuccessfulResponse_ForElserAction() throws IOExce webServer.enqueue(new MockResponse().setResponseCode(200).setBody(responseJson)); var model = ElasticInferenceServiceSparseEmbeddingsModelTests.createModel(getUrl(webServer)); - var actionCreator = new ElasticInferenceServiceActionCreator(sender, createWithEmptySettings(threadPool)); + var actionCreator = new ElasticInferenceServiceActionCreator(sender, createWithEmptySettings(threadPool), createTraceContext()); var action = actionCreator.create(model); PlainActionFuture listener = new PlainActionFuture<>(); @@ -145,7 +146,7 @@ public void testSend_FailsFromInvalidResponseFormat_ForElserAction() throws IOEx webServer.enqueue(new MockResponse().setResponseCode(200).setBody(responseJson)); var model = ElasticInferenceServiceSparseEmbeddingsModelTests.createModel(getUrl(webServer)); - var actionCreator = new ElasticInferenceServiceActionCreator(sender, createWithEmptySettings(threadPool)); + var actionCreator = new ElasticInferenceServiceActionCreator(sender, createWithEmptySettings(threadPool), createTraceContext()); var action = actionCreator.create(model); PlainActionFuture listener = new PlainActionFuture<>(); @@ -197,7 +198,7 @@ public void testExecute_ReturnsSuccessfulResponse_AfterTruncating() throws IOExc webServer.enqueue(new MockResponse().setResponseCode(200).setBody(responseJson)); var model = ElasticInferenceServiceSparseEmbeddingsModelTests.createModel(getUrl(webServer)); - var actionCreator = new ElasticInferenceServiceActionCreator(sender, createWithEmptySettings(threadPool)); + var actionCreator = new ElasticInferenceServiceActionCreator(sender, createWithEmptySettings(threadPool), createTraceContext()); var action = actionCreator.create(model); PlainActionFuture listener = new PlainActionFuture<>(); @@ -257,7 +258,7 @@ public void testExecute_TruncatesInputBeforeSending() throws IOException { // truncated to 1 token = 3 characters var model = ElasticInferenceServiceSparseEmbeddingsModelTests.createModel(getUrl(webServer), 1); - var actionCreator = new ElasticInferenceServiceActionCreator(sender, createWithEmptySettings(threadPool)); + var actionCreator = new ElasticInferenceServiceActionCreator(sender, createWithEmptySettings(threadPool), createTraceContext()); var action = actionCreator.create(model); PlainActionFuture listener = new 
PlainActionFuture<>(); @@ -286,4 +287,8 @@ public void testExecute_TruncatesInputBeforeSending() throws IOException { } } + private TraceContext createTraceContext() { + return new TraceContext(randomAlphaOfLength(10), randomAlphaOfLength(10)); + } + } diff --git a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/request/elastic/ElasticInferenceServiceSparseEmbeddingsRequestTests.java b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/request/elastic/ElasticInferenceServiceSparseEmbeddingsRequestTests.java index 0f2c859fb62d5..9d3bbe2ed12ae 100644 --- a/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/request/elastic/ElasticInferenceServiceSparseEmbeddingsRequestTests.java +++ b/x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/external/request/elastic/ElasticInferenceServiceSparseEmbeddingsRequestTests.java @@ -9,11 +9,13 @@ import org.apache.http.HttpHeaders; import org.apache.http.client.methods.HttpPost; +import org.elasticsearch.tasks.Task; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xpack.inference.common.Truncator; import org.elasticsearch.xpack.inference.common.TruncatorTests; import org.elasticsearch.xpack.inference.services.elastic.ElasticInferenceServiceSparseEmbeddingsModelTests; +import org.elasticsearch.xpack.inference.telemetry.TraceContext; import java.io.IOException; import java.util.List; @@ -42,6 +44,23 @@ public void testCreateHttpRequest() throws IOException { assertThat(requestMap.get("input"), is(List.of(input))); } + public void testTraceContextPropagatedThroughHTTPHeaders() { + var url = "http://eis-gateway.com"; + var input = "input"; + + var request = createRequest(url, input); + var httpRequest = request.createHttpRequest(); + + assertThat(httpRequest.httpRequestBase(), instanceOf(HttpPost.class)); + var httpPost = (HttpPost) httpRequest.httpRequestBase(); + + var traceParent = request.getTraceContext().traceParent(); + var traceState = request.getTraceContext().traceState(); + + assertThat(httpPost.getLastHeader(Task.TRACE_PARENT_HTTP_HEADER).getValue(), is(traceParent)); + assertThat(httpPost.getLastHeader(Task.TRACE_STATE).getValue(), is(traceState)); + } + public void testTruncate_ReducesInputTextSizeByHalf() throws IOException { var url = "http://eis-gateway.com"; var input = "abcd"; @@ -75,7 +94,8 @@ public ElasticInferenceServiceSparseEmbeddingsRequest createRequest(String url, return new ElasticInferenceServiceSparseEmbeddingsRequest( TruncatorTests.createTruncator(), new Truncator.TruncationResult(List.of(input), new boolean[] { false }), - embeddingsModel + embeddingsModel, + new TraceContext(randomAlphaOfLength(10), randomAlphaOfLength(10)) ); } } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java index 6afea6faa607e..73ceafc0a24b9 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java @@ -434,7 +434,8 @@ public void testToXContent() throws IOException { MappingStats.of(metadata, () -> {}), AnalysisStats.of(metadata, () -> {}), 
VersionStats.of(metadata, singletonList(mockNodeResponse)), - ClusterSnapshotStats.EMPTY + ClusterSnapshotStats.EMPTY, + null ); final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L);