Skip to content

Commit

Permalink
Update TransportClusterStatsAction code with new action context code
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Sep 19, 2024
1 parent 17b144f commit a07206e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void testRemoteClusterStats() throws ExecutionException, InterruptedExcep
}

private void setupClusters() {
int numShardsLocal = randomIntBetween(2, 10);
int numShardsLocal = randomIntBetween(2, 5);
Settings localSettings = indexSettings(numShardsLocal, randomIntBetween(0, 1)).build();
assertAcked(
client(LOCAL_CLUSTER).admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.CancellableFanOut;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.RefCountingListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterSnapshotStats;
import org.elasticsearch.cluster.ClusterState;
Expand Down Expand Up @@ -146,24 +146,18 @@ public TransportClusterStatsAction(
this.remoteClusterStatsAction = remoteClusterStatsAction;
}

@Override
protected void doExecute(Task task, ClusterStatsRequest request, ActionListener<ClusterStatsResponse> listener) {
super.doExecute(task, request, new ActionListenerWithRemotes(task, request, listener));
}

@Override
protected SubscribableListener<AdditionalStats> createActionContext(Task task, ClusterStatsRequest request) {
assert task instanceof CancellableTask;
final var cancellableTask = (CancellableTask) task;
final var additionalStatsListener = new SubscribableListener<AdditionalStats>();
AdditionalStats.compute(
cancellableTask,
clusterStateStatsExecutor,
clusterService,
mappingStatsCache,
analysisStatsCache,
additionalStatsListener
);
if (request.isRemoteStats() == false) {
final AdditionalStats additionalStats = new AdditionalStats();
additionalStats.compute(cancellableTask, request, additionalStatsListener);
} else {
// For remote stats request, we don't need to compute anything
additionalStatsListener.onResponse(null);
}
return additionalStatsListener;
}

Expand Down Expand Up @@ -198,7 +192,7 @@ protected void newResponseAsync(
additionalStats.analysisStats(),
VersionStats.of(clusterService.state().metadata(), responses),
additionalStats.clusterSnapshotStats(),
null
additionalStats.getRemoteStats()
)
).addListener(listener);
}
Expand Down Expand Up @@ -375,36 +369,33 @@ protected boolean isFresh(Long currentKey, Long newKey) {
}
}

public static final class AdditionalStats {
public final class AdditionalStats {

private String clusterUUID;
private MappingStats mappingStats;
private AnalysisStats analysisStats;
private ClusterSnapshotStats clusterSnapshotStats;
private Map<String, RemoteClusterStats> remoteStats;

static void compute(
CancellableTask task,
Executor executor,
ClusterService clusterService,
MetadataStatsCache<MappingStats> mappingStatsCache,
MetadataStatsCache<AnalysisStats> analysisStatsCache,
ActionListener<AdditionalStats> listener
) {
executor.execute(ActionRunnable.wrap(listener, l -> {
void compute(CancellableTask task, ClusterStatsRequest request, ActionListener<AdditionalStats> listener) {
clusterStateStatsExecutor.execute(ActionRunnable.wrap(listener, l -> {
task.ensureNotCancelled();
final var result = new AdditionalStats();
result.compute(
internalCompute(
task,
request,
clusterService.state(),
mappingStatsCache,
analysisStatsCache,
task::isCancelled,
clusterService.threadPool().absoluteTimeInMillis(),
l.map(ignored -> result)
l.map(ignored -> this)
);
}));
}

private void compute(
private void internalCompute(
CancellableTask task,
ClusterStatsRequest request,
ClusterState clusterState,
MetadataStatsCache<MappingStats> mappingStatsCache,
MetadataStatsCache<AnalysisStats> analysisStatsCache,
Expand All @@ -418,6 +409,18 @@ private void compute(
mappingStatsCache.get(metadata, isCancelledSupplier, listeners.acquire(s -> mappingStats = s));
analysisStatsCache.get(metadata, isCancelledSupplier, listeners.acquire(s -> analysisStats = s));
clusterSnapshotStats = ClusterSnapshotStats.of(clusterState, absoluteTimeInMillis);
if (doRemotes(request)) {
var remotes = remoteClusterService.getRegisteredRemoteClusterNames();
if (remotes.isEmpty()) {
remoteStats = Map.of();
} else {
new RemoteStatsFanout(
task,
transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION),
remotes
).start(listeners.acquire(s -> remoteStats = s));
}
}
}
}

Expand All @@ -436,23 +439,25 @@ AnalysisStats analysisStats() {
ClusterSnapshotStats clusterSnapshotStats() {
return clusterSnapshotStats;
}

public Map<String, RemoteClusterStats> getRemoteStats() {
return remoteStats;
}
}

private static boolean doRemotes(ClusterStatsRequest request) {
return CCS_TELEMETRY_FEATURE_FLAG.isEnabled() && request.doRemotes();
}

private class RemoteStatsFanout extends CancellableFanOut<String, RemoteClusterStatsResponse, Map<String, RemoteClusterStats>> {
private final ClusterStatsRequest request;
private final Map<String, RemoteClusterStatsResponse> responses = new ConcurrentHashMap<>();
private final Executor requestExecutor;
private final Task task;
private final TaskId taskId;
private final Collection<String> remotes;

RemoteStatsFanout(Task task, ClusterStatsRequest request, Executor requestExecutor, Collection<String> remotes) {
RemoteStatsFanout(Task task, Executor requestExecutor, Collection<String> remotes) {
this.task = task;
this.request = request;
this.requestExecutor = requestExecutor;
this.taskId = new TaskId(clusterService.getNodeName(), task.getId());
this.remotes = remotes;
Expand Down Expand Up @@ -486,8 +491,8 @@ private boolean isCancelled() {
return task instanceof CancellableTask cancellableTask && cancellableTask.isCancelled();
}

void start(SubscribableListener<Map<String, RemoteClusterStats>> future) {
super.run(task, remotes.iterator(), future);
void start(ActionListener<Map<String, RemoteClusterStats>> listener) {
super.run(task, remotes.iterator(), listener);
}

@Override
Expand Down Expand Up @@ -515,41 +520,4 @@ protected Map<String, RemoteClusterStats> onCompletion() {
}
}

private class ActionListenerWithRemotes implements ActionListener<ClusterStatsResponse> {
private final ActionListener<ClusterStatsResponse> listener;
private final SubscribableListener<Map<String, RemoteClusterStats>> remoteListener;

ActionListenerWithRemotes(Task task, ClusterStatsRequest request, ActionListener<ClusterStatsResponse> listener) {
this.listener = listener;
remoteListener = getStatsFromRemotes(task, request);
}

SubscribableListener<Map<String, RemoteClusterStats>> getStatsFromRemotes(Task task, ClusterStatsRequest request) {
if (doRemotes(request) == false) {
return SubscribableListener.newSucceeded(null);
}
var remotes = remoteClusterService.getRegisteredRemoteClusterNames();
if (remotes.isEmpty()) {
return SubscribableListener.newSucceeded(Map.of());
}
var remotesListener = new SubscribableListener<Map<String, RemoteClusterStats>>();
new RemoteStatsFanout(task, request, transportService.getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION), remotes)
.start(remotesListener);
return remotesListener;
}

SubscribableListener<Map<String, RemoteClusterStats>> getRemoteClusterStats() {
return remoteListener;
}

@Override
public void onResponse(ClusterStatsResponse response) {
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
}

0 comments on commit a07206e

Please sign in to comment.