
Collect and display execution metadata for ES|QL cross cluster searches #112595

Merged · 70 commits · Sep 30, 2024

Conversation

@quux00 (Contributor) commented Sep 6, 2024

Enhance ES|QL responses to include information about took time (search latency), shards, and clusters against which the query was executed.

The goal of this PR is to begin to provide parity between the metadata displayed for cross-cluster searches in _search and ES|QL.

This PR adds the following features:

  • add overall took time to all ES|QL query responses. And to emphasize: "all" here means: async search, sync search, local-only and cross-cluster searches, so it goes beyond just CCS.
  • add _clusters metadata to the final response for cross-cluster searches, for both async and sync search (see example below)
  • tracking/reporting counts of skipped shards from the can_match (SearchShards API) phase of ES|QL processing
  • marking clusters as skipped if they cannot be connected to (during the field-caps phase of processing)

Out of scope for this PR:

  • honoring the skip_unavailable cluster setting
  • showing _clusters metadata in the async response while the search is still running
  • showing any shard failure messages (since any shard search failures in ES|QL are automatically fatal and _clusters/details is not shown in 4xx/5xx error responses). Note that this also means that the failed shard count is always 0 in the ES|QL _clusters section.

Things changed with respect to behavior in _search:

  • the timed_out field in _clusters/details/mycluster was removed in the ESQL response, since ESQL does not support timeouts. It could be added back later if/when ESQL supports timeouts.
  • the failures array in _clusters/details/mycluster/_shards was removed in the ESQL response, since any shard failure causes the whole query to fail.

Example output from ES|QL CCS:

POST /_query
{
  "query": "from blogs,remote2:blo*,remote1:blogs|\nkeep authors.first_name,publish_date|\n limit 5"
}
{
  "took": 49,
  "columns": [
    {
      "name": "authors.first_name",
      "type": "text"
    },
    {
      "name": "publish_date",
      "type": "date"
    }
  ],
  "values": [
    [
      "Tammy",
      "2009-11-04T04:08:07.000Z"
    ],
    [
      "Theresa",
      "2019-05-10T21:22:32.000Z"
    ],
    [
      "Jason",
      "2021-11-23T00:57:30.000Z"
    ],
    [
      "Craig",
      "2019-12-14T21:24:29.000Z"
    ],
    [
      "Alexandra",
      "2013-02-15T18:13:24.000Z"
    ]
  ],
  "_clusters": {
    "total": 3,
    "successful": 2,
    "running": 0,
    "skipped": 1,
    "partial": 0,
    "failed": 0,
    "details": {
      "(local)": {
        "status": "successful",
        "indices": "blogs",
        "took": 43,
        "_shards": {
          "total": 13,
          "successful": 13,
          "skipped": 0,
          "failed": 0
        }
      },
      "remote2": {
        "status": "skipped",  // remote2 was offline when this query was run
        "indices": "remote2:blo*",
        "took": 0,
        "_shards": {
          "total": 0,
          "successful": 0,
          "skipped": 0,
          "failed": 0
        }
      },
      "remote1": {
        "status": "successful",
        "indices": "remote1:blogs",
        "took": 47,
        "_shards": {
          "total": 13,
          "successful": 13,
          "skipped": 0,
          "failed": 0
        }
      }
    }
  }
}
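The new `_clusters` section is plain JSON, so a client can summarize it generically. As a hypothetical client-side sketch (not part of this PR; the helper name and the trimmed `example` dict are invented for illustration):

```python
def summarize_clusters(response: dict) -> list[str]:
    """Render one line per cluster from an ES|QL response's _clusters.details."""
    lines = []
    for alias, info in response.get("_clusters", {}).get("details", {}).items():
        shards = info.get("_shards", {})
        lines.append(
            f"{alias}: {info['status']}, "
            f"{shards.get('successful', 0)}/{shards.get('total', 0)} shards, "
            f"took {info.get('took', 0)}ms"
        )
    return lines

# Trimmed version of the example response above
example = {
    "took": 49,
    "_clusters": {
        "total": 3, "successful": 2, "skipped": 1,
        "details": {
            "(local)": {"status": "successful", "took": 43,
                        "_shards": {"total": 13, "successful": 13}},
            "remote2": {"status": "skipped", "took": 0,
                        "_shards": {"total": 0, "successful": 0}},
        },
    },
}
print(summarize_clusters(example))
```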

Fixes #112402 and #110935

@quux00 quux00 force-pushed the esql/ccs-execution-info2 branch 9 times, most recently from 7ca990c to c8f1841 Compare September 11, 2024 14:42
@quux00 quux00 force-pushed the esql/ccs-execution-info2 branch 5 times, most recently from 0195ac2 to 9ff909d Compare September 13, 2024 13:14
@quux00 (Contributor, Author) commented Sep 13, 2024

Question for reviewers / Product Managers:

Do we want took time in millis or nanos?

The argument for using millis:

took time will be consistent with _search, which uses millis. I've gone with millis, since my goal was to keep the metadata as close to _search as possible unless we decide to change some parts.

This potentially makes it simpler for Kibana as well if they already have code that parses and displays took time.

The argument for using nanos:

ESQL Driver Profiles use took_nanos. So should we also use nanos for overall search latency? Will using millis feel inconsistent with profiles to end users?

And apparently ESQL already issues a took-nanos HTTP header, based on this issue: #110935. I tracked that down - the took-nanos for the header is added here: https://github.com/elastic/elasticsearch/blob/main/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java.

Based on #110935, I wonder if the EsqlResponseListener implementation of tracking total took time is flawed. It doesn't work for async-searches that are still running when the initial response is returned. Should we remove/change this implementation? Should it instead grab the overall took time out of the EsqlQueryResponse that I've now added? I asked that question in the code also in this commit.
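Whichever unit is chosen, the conversion at the response boundary is one line; the real question is whether dropping sub-millisecond precision matters. A hypothetical sketch of that tradeoff (the function name is invented, not from the PR):

```python
def took_millis(took_nanos: int) -> int:
    # _search reports integer milliseconds, so anything below 1ms is dropped
    return took_nanos // 1_000_000

# A ~49ms query loses the 123,456ns remainder when reported in millis
print(took_millis(49_123_456))
```

Note the flip side: very fast queries (under 1ms) would all report `took: 0` in millis, while the nanos form preserves the difference.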

@quux00 (Contributor, Author) commented Sep 13, 2024

Question for reviewers / Product Managers:

Do we want to have the partial status in the ESQL metadata? Is that possible to have now? Will it ever be possible?

I've kept all the cluster statuses that we use on the _search side. These are the meanings of those statuses in _search:

successful: All shards searched returned successfully
running: Search is still running on that cluster
partial: At least one shard search failed or the search timed out on the server side (and thus may not have searched all index segments), so we are returning partial data.
skipped: No shards were searched because the cluster was offline and marked as skip_unavailable=true
failed: The cluster was offline and marked as skip_unavailable=false. This causes the whole search to fail (return 4xx/5xx)

Do we want to keep these same status meanings in ES|QL?

UPDATE: Based on reviewer feedback and a discussion with Product, we will keep the same meanings, but not document the partial state as a possible status in the ES|QL API docs, since it currently can never be set. We will keep it in the code in case we need that state in the future, if ES|QL ever starts returning partial data (either due to some shard searches failing or ES|QL timeouts, as we have in _search).
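As a rough illustration of the _search-style status semantics listed above (a hypothetical sketch, not Elasticsearch's implementation; all names here are invented):

```python
from enum import Enum

class ClusterStatus(Enum):
    SUCCESSFUL = "successful"
    RUNNING = "running"
    PARTIAL = "partial"
    SKIPPED = "skipped"
    FAILED = "failed"

def derive_status(*, finished: bool, connected: bool, skip_unavailable: bool,
                  failed_shards: int, timed_out: bool) -> ClusterStatus:
    """Map per-cluster state to a _search-style status, per the list above."""
    if not connected:
        # offline cluster: skipped vs failed depends on skip_unavailable
        return ClusterStatus.SKIPPED if skip_unavailable else ClusterStatus.FAILED
    if not finished:
        return ClusterStatus.RUNNING
    if failed_shards > 0 or timed_out:
        # partial data returned; in ES|QL this branch is currently unreachable
        return ClusterStatus.PARTIAL
    return ClusterStatus.SUCCESSFUL
```

In ES|QL as of this PR, the `failed_shards > 0 or timed_out` branch can never be taken, which is why `partial` is kept in code but left undocumented.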

@quux00 (Contributor, Author) commented Sep 13, 2024

Edge case question for reviewers / Product Managers:

What should the status of the cluster be in the following scenario?

The user makes a cross-cluster query where the index expression on a cluster matches no indices:

POST /_query
{
  "query": "from blogs,remote2:no_such_index,remote1:blogs|\nkeep authors.first_name,publish_date|\n limit 5"
}

Currently, in the code in this PR, I mark this as "SUCCESSFUL" (see example below), but with total shards = 0, indicating that nothing was searched. Is that the behavior we want? Or should we mark it as "SKIPPED"?

ES|QL response when a cluster is specified with an index expression that matches no indices:
{
  "took": 25,
  "columns": [
    {
      "name": "authors.first_name",
      "type": "text"
    },
    {
      "name": "publish_date",
      "type": "date"
    }
  ],
  "values": [
    [
      "Theresa",
      "2020-09-14T23:13:55.000Z"
    ],
  ...
  ],
  "_clusters": {
    "total": 3,
    "successful": 3,
    "running": 0,
    "skipped": 0,
    "partial": 0,
    "failed": 0,
    "details": {
      "(local)": {
        "status": "successful",
        "indices": "blogs",
        "took": 24,
        "_shards": {
          "total": 13,
          "successful": 13,
          "skipped": 0,
          "failed": 0
        }
      },
      "remote2": {
        "status": "successful",
        "indices": "remote2:no_such_index",
        "took": 0,
        "_shards": {
          "total": 0,
          "successful": 0,
          "skipped": 0,
          "failed": 0
        }
      },
      "remote1": {
        "status": "successful",
        "indices": "remote1:blogs",
        "took": 24,
        "_shards": {
          "total": 4,
          "successful": 4,
          "skipped": 0,
          "failed": 0
        }
      }
    }
  }
}

Comparison to behavior in _search

In _search, the handling of this differs depending on whether the user used a wildcard or not. With a wildcard that matches nothing you get status=successful, _shards.total=0:

        "remote1": {
          "status": "successful",
          "indices": "x*",
          "took": 0,
          "timed_out": false,
          "_shards": {
            "total": 0,
            "successful": 0,
            "skipped": 0,
            "failed": 0
          }
        }

With no wildcard, you get status=skipped and a no_such_index failure entry:

        "remote1": {
          "status": "skipped",
          "indices": "my_index",
          "timed_out": false,
          "failures": [
            {
              "shard": -1,
              "index": null,
              "reason": {
                "type": "index_not_found_exception",
                "reason": "no such index [my_index]",
                "index_uuid": "_na_",
                "resource.type": "index_or_alias",
                "resource.id": "my_index",
                "index": "my_index"
              }
            }
          ]
        }
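Whichever status the edge case ends up with, a client that cares about the distinction can detect "matched no indices" today from the shard counts alone. A hypothetical client-side sketch (not part of the PR; the function name and categories are invented):

```python
def classify_clusters(details: dict) -> dict:
    """Split _clusters.details entries into searched / matched-nothing / skipped.

    A cluster reported "successful" but with 0 total shards searched nothing,
    which is distinct from a cluster that was skipped outright.
    """
    out = {"searched": [], "matched_nothing": [], "skipped": []}
    for alias, info in details.items():
        if info.get("status") == "skipped":
            out["skipped"].append(alias)
        elif info.get("_shards", {}).get("total", 0) == 0:
            out["matched_nothing"].append(alias)
        else:
            out["searched"].append(alias)
    return out
```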

@quux00 added labels `>enhancement`, `Team:Analytics`, `:Analytics/ES|QL` on Sep 13, 2024
@elasticsearchmachine (Collaborator): Hi @quux00, I've created a changelog YAML for you.

@quux00 added labels `auto-backport-and-merge`, `v8.16.0` on Sep 13, 2024
@quux00 quux00 marked this pull request as ready for review September 13, 2024 16:28
@elasticsearchmachine (Collaborator): Pinging @elastic/es-analytical-engine (Team:Analytics)

@quux00 quux00 requested a review from a team as a code owner September 13, 2024 16:28
@quux00 quux00 requested a review from nik9000 September 13, 2024 16:28
…es in ComputeListener.

This however requires adding additional state to ComputeListener to know what "context" each compute
listener is running so it knows what actions to take.
@quux00 quux00 requested a review from dnhatn September 26, 2024 20:54
@nik9000 (Member) left a comment:
I think it's a big improvement. I asked for a few things, but I don't think we need another review.

}
}
----
// TEST[skip: cross-cluster testing env not set up]
Member:
👍

this(profiles, null, null, null, null, null);
}

ComputeResponse(
Member:

👍

INLINESTATS is too busted at the moment to test this properly.

this(profiles, null, null, null, null, null);
}

ComputeResponse(
Member:
Sorry for the distraction.

@@ -163,9 +174,17 @@ public String query() {
* Note: Currently, it returns {@link System#currentTimeMillis()}, but this value will be serialized between nodes.
*/
public long absoluteStartedTimeInMillis() {
// MP TODO: I'm confused - Why is this not a fixed value taken at the start of the query processing?
Member:

Yikes. I'll bet this is old. It should totally be serialized too. Looks like a bug to me.

Member:

But not as part of this PR.

Member:

I opened #113709

Member:

I think we could just use queryStartTimeMillis here, but not in this PR because I have no idea if it'd do something unexpected.

@quux00 (Contributor, Author):

Yeah, these two fields/methods feel redundant but this PR has so much going on I didn't want to futz with it here since the pre-existing case is unclear to me. Thanks for creating a follow-on ticket.

for (String c : clustersWithNoMatchingIndices) {
executionInfo.swapCluster(
c,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL)
Member:

It'd be nicer if these were NOT_MATCHING. Could you get that into this one? I know it's not super important, but it'd save some serialization changes.

@quux00 (Contributor, Author) commented Sep 27, 2024:

I'll take that one to Product. I have an open question with product on how to handle this case - this adds a fifth option.

@astefan (Contributor) left a comment:

LGTM
Sorry for the long wait.

@dnhatn (Member) left a comment:

I reviewed the production changes, and they look good to me. Thanks, Michael!

for (SearchShardsGroup group : resp.getGroups()) {
var shardId = group.shardId();
if (group.skipped()) {
totalShards++;
Member:

nit: can you move totalShards++ outside and remove line 566?

@quux00 (Contributor, Author):

Fixed.

@@ -747,22 +790,33 @@ public void messageReceived(DataNodeRequest request, TransportChannel channel, T
private class ClusterRequestHandler implements TransportRequestHandler<ClusterComputeRequest> {
@Override
public void messageReceived(ClusterComputeRequest request, TransportChannel channel, Task task) {
ChannelActionListener<ComputeResponse> listener = new ChannelActionListener<>(channel);
ChannelActionListener<ComputeResponse> lnr = new ChannelActionListener<>(channel);
Member:

I think the renaming is a leftover?

@quux00 (Contributor, Author):

Fixed. Thanks.

/**
* @return true if the "local" querying/coordinator cluster is being searched in a cross-cluster search
*/
private boolean coordinatingClusterIsSearchedInCCS() {
Member:

I think we can remove the methods coordinatingClusterIsSearchedInCCS, runningOnRemoteCluster, isCCSListener, and shouldRecordTookTime, as we should be able to collect execution info similarly to how we handle profiles and warnings. These methods require careful consideration of when and how to pass the clusterAlias, which I think increases the risk of errors. That said, it's entirely up to you whether to keep them as they are or remove them.

@quux00 (Contributor, Author):

I'm not sure that's true. I find the ComputeListener very hard to reason about, so I could be wrong. The core problem with the profiles/warnings approach is that they accumulate with an intent to be shown in the final response, but the EsqlExecutionInfo needs to be updated in real time as the search is running, so that when we add _clusters to the async-search response of still-running searches (in a later PR) it will have all the current data in the EsqlExecutionInfo, not some possibly inaccessible rollup within the ComputeListener.

And I don't see how we can remove runningOnRemoteCluster() since we need to know in the refs Listener whether to populate the ComputeResponse with execution info metadata or not. The remote and local cluster refs listeners have to act differently.

I also don't see how we can remove isCCSListener, since the acquireCompute handler needs to know whether it is a listener for a ComputeResponse with remote metadata or not, as again it needs to behave differently from a data-node handler listener. That's why I split them into separate methods in my earlier iteration of the PR, making the context or use case clear.

Finally, another reason context is needed is that the point at which the Status of a cluster gets set to SUCCESSFUL differs between remote and local clusters. For remote clusters, we mark it done when the remote ComputeResponse comes back. But for local, we have to wait until the coordinator is done (in case it has to run operators that aggregate/merge data from the remotes), so those get set in different places, and context/state is needed to know what code to execute in these methods that are shared across the 4 use cases for which ComputeListener is used.

Again, I could be wrong and there's a cleaner way to do this, but I was not able to figure it out. I'd need some detailed guidance on how we solve the above problems and do local rollup accumulations like we do for profiles and warnings.

Member:

Let's discuss this later.

@quux00 quux00 merged commit ddba474 into elastic:main Sep 30, 2024
16 checks passed
@elasticsearchmachine (Collaborator): 💚 Backport successful

Backported to branch: 8.x

quux00 added a commit to quux00/elasticsearch that referenced this pull request Sep 30, 2024
…es (elastic#112595)

elasticsearchmachine pushed a commit that referenced this pull request Sep 30, 2024
…es (#112595) (#113820)

Labels: :Analytics/ES|QL, auto-backport-and-merge, >enhancement, Team:Analytics, v8.16.0, v9.0.0
Successfully merging this pull request may close these issues: Collect and display execution metadata for ES|QL cross cluster searches

6 participants