Skip to content

Commit

Permalink
Merge branch '8.x' into backport/8.x/pr-113183
Browse files Browse the repository at this point in the history
  • Loading branch information
nik9000 committed Sep 24, 2024
2 parents c409411 + 0631be5 commit a655c17
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 9 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/113013.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 113013
summary: Account for `DelayedBucket` before reduction
area: Aggregations
type: enhancement
issues: []
5 changes: 5 additions & 0 deletions docs/reference/modules/threadpool.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ There are several thread pools, but the important ones include:
`min(5 * (`<<node.processors, `# of allocated processors`>>`), 50)`
and queue_size of `1000`.

[[modules-threadpool-esql]]`esql_worker`::
Executes <<esql>> operations. Thread pool type is `fixed` with a
size of `int((`<<node.processors, `# of allocated processors`>>
`pass:[ * ]3) / 2) + 1`, and queue_size of `1000`.

Thread pool settings are <<static-cluster-setting,static>> and can be changed by
editing `elasticsearch.yml`. Changing a specific thread pool can be done by
setting its type-specific parameters; for example, changing the number of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
/**
* A wrapper around reducing buckets with the same key that can delay that reduction
* as long as possible. It's stateful and not even close to thread safe.
* <p>
* It is responsibility of the caller to account for buckets created using DelayedBucket.
* It should call {@link #nonCompetitive} to release any possible sub-bucket creation if
* a bucket is rejected from the final response.
*/
public final class DelayedBucket<B extends InternalMultiBucketAggregation.InternalBucket> {
/**
Expand Down Expand Up @@ -45,7 +49,6 @@ public DelayedBucket(List<B> toReduce) {
*/
public B reduced(BiFunction<List<B>, AggregationReduceContext, B> reduce, AggregationReduceContext reduceContext) {
if (reduced == null) {
reduceContext.consumeBucketsAndMaybeBreak(1);
reduced = reduce.apply(toReduce, reduceContext);
toReduce = null;
}
Expand Down Expand Up @@ -95,8 +98,8 @@ public String toString() {
*/
void nonCompetitive(AggregationReduceContext reduceContext) {
if (reduced != null) {
// -1 for itself, -countInnerBucket for all the sub-buckets.
reduceContext.consumeBucketsAndMaybeBreak(-1 - InternalMultiBucketAggregation.countInnerBucket(reduced));
// -countInnerBucket for all the sub-buckets.
reduceContext.consumeBucketsAndMaybeBreak(-InternalMultiBucketAggregation.countInnerBucket(reduced));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ public void add(DelayedBucket<B> bucket) {
DelayedBucket<B> removed = queue.insertWithOverflow(bucket);
if (removed != null) {
nonCompetitive.accept(removed);
// release any created sub-buckets
removed.nonCompetitive(reduceContext);
} else {
// add one bucket to the final result
reduceContext.consumeBucketsAndMaybeBreak(1);
}
}

Expand Down Expand Up @@ -183,6 +187,8 @@ public void add(DelayedBucket<B> bucket) {
next.add(bucket);
return;
}
// add one bucket to the final result
reduceContext.consumeBucketsAndMaybeBreak(1);
buffer.add(bucket);
if (buffer.size() < size) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ public InternalAggregation get() {
result = new ArrayList<>();
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
if (result.size() < getRequiredSize()) {
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(bucket.reduced(AbstractInternalTerms.this::reduceBucket, reduceContext));
} else {
otherDocCount[0] += bucket.getDocCount();
Expand All @@ -311,11 +312,10 @@ public InternalAggregation get() {
result = top.build();
} else {
result = new ArrayList<>();
thisReduceOrder = reduceBuckets(
bucketsList,
getThisReduceOrder(),
bucket -> result.add(bucket.reduced(AbstractInternalTerms.this::reduceBucket, reduceContext))
);
thisReduceOrder = reduceBuckets(bucketsList, getThisReduceOrder(), bucket -> {
reduceContext.consumeBucketsAndMaybeBreak(1);
result.add(bucket.reduced(AbstractInternalTerms.this::reduceBucket, reduceContext));
});
}
for (B r : result) {
if (sumDocCountError == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DelayedBucketTests extends ESTestCase {
public void testToString() {
Expand All @@ -40,6 +42,23 @@ public void testReduced() {
assertThat(b.reduced(reduce, context), sameInstance(b.reduced(reduce, context)));
assertThat(b.reduced(reduce, context).getKeyAsString(), equalTo("test"));
assertThat(b.reduced(reduce, context).getDocCount(), equalTo(3L));
// it only accounts for sub-buckets
assertEquals(0, buckets.get());
}

public void testReducedSubAggregation() {
AtomicInteger buckets = new AtomicInteger();
AggregationReduceContext context = new AggregationReduceContext.ForFinal(null, null, () -> false, null, buckets::addAndGet);
BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
DelayedBucket<InternalBucket> b = new DelayedBucket<>(
List.of(bucket("test", 1, mockMultiBucketAgg()), bucket("test", 2, mockMultiBucketAgg()))
);

assertThat(b.getDocCount(), equalTo(3L));
assertThat(b.reduced(reduce, context), sameInstance(b.reduced(reduce, context)));
assertThat(b.reduced(reduce, context).getKeyAsString(), equalTo("test"));
assertThat(b.reduced(reduce, context).getDocCount(), equalTo(3L));
// it only accounts for sub-buckets
assertEquals(1, buckets.get());
}

Expand Down Expand Up @@ -76,6 +95,19 @@ public void testNonCompetitiveReduced() {
BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
DelayedBucket<InternalBucket> b = new DelayedBucket<>(List.of(bucket("test", 1)));
b.reduced(reduce, context);
// only account for sub-aggregations
assertEquals(0, buckets.get());
b.nonCompetitive(context);
assertEquals(0, buckets.get());
}

public void testNonCompetitiveReducedSubAggregation() {
AtomicInteger buckets = new AtomicInteger();
AggregationReduceContext context = new AggregationReduceContext.ForFinal(null, null, () -> false, null, buckets::addAndGet);
BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> reduce = mockReduce(context);
DelayedBucket<InternalBucket> b = new DelayedBucket<>(List.of(bucket("test", 1, mockMultiBucketAgg())));
b.reduced(reduce, context);
// only account for sub-aggregations
assertEquals(1, buckets.get());
b.nonCompetitive(context);
assertEquals(0, buckets.get());
Expand All @@ -85,10 +117,25 @@ private static InternalBucket bucket(String key, long docCount) {
return new StringTerms.Bucket(new BytesRef(key), docCount, InternalAggregations.EMPTY, false, 0, DocValueFormat.RAW);
}

private static InternalBucket bucket(String key, long docCount, InternalAggregations subAggregations) {
return new StringTerms.Bucket(new BytesRef(key), docCount, subAggregations, false, 0, DocValueFormat.RAW);
}

static BiFunction<List<InternalBucket>, AggregationReduceContext, InternalBucket> mockReduce(AggregationReduceContext context) {
return (l, c) -> {
assertThat(c, sameInstance(context));
return bucket(l.get(0).getKeyAsString(), l.stream().mapToLong(Bucket::getDocCount).sum());
context.consumeBucketsAndMaybeBreak(l.get(0).getAggregations().asList().size());
return bucket(l.get(0).getKeyAsString(), l.stream().mapToLong(Bucket::getDocCount).sum(), l.get(0).getAggregations());
};
}

@SuppressWarnings("unchecked")
private InternalAggregations mockMultiBucketAgg() {
List<InternalBucket> buckets = List.of(bucket("sub", 1));
InternalMultiBucketAggregation<?, InternalBucket> mock = (InternalMultiBucketAggregation<?, InternalBucket>) mock(
InternalMultiBucketAggregation.class
);
when(mock.getBuckets()).thenReturn(buckets);
return InternalAggregations.from(List.of(mock));
}
}

0 comments on commit a655c17

Please sign in to comment.