From a60e9612e74ddcc055c34a620df32cfc3dd59a75 Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Mon, 30 Sep 2024 09:44:55 -0400 Subject: [PATCH] ESQL: More tests for filtered aggs This adds a test to *every* agg for when it's entirely filtered away and another when filtering is enabled but unused. I'll follow up with another test later for partial filtering. --- .../CountGroupingAggregatorFunction.java | 1 - .../ConstantBooleanExpressionEvaluator.java | 28 ++++++ .../AggregatorFunctionTestCase.java | 39 +++++++- ...ooleanGroupingAggregatorFunctionTests.java | 11 +++ ...tesRefGroupingAggregatorFunctionTests.java | 11 +++ ...DoubleGroupingAggregatorFunctionTests.java | 11 +++ ...tFloatGroupingAggregatorFunctionTests.java | 11 +++ ...nctIntGroupingAggregatorFunctionTests.java | 11 +++ ...ctLongGroupingAggregatorFunctionTests.java | 11 +++ .../CountGroupingAggregatorFunctionTests.java | 11 +++ .../FilteredAggregatorFunctionTests.java | 5 + .../GroupingAggregatorFunctionTestCase.java | 95 +++++++++++++++---- 12 files changed, 223 insertions(+), 22 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/ConstantBooleanExpressionEvaluator.java diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunction.java index f610abf271cfa..e107a73f7ab1e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunction.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunction.java @@ -142,7 +142,6 @@ private void addRawInput(IntVector groups) { */ private void addRawInput(IntBlock groups) { for (int groupPosition = 0; groupPosition < groups.getPositionCount(); groupPosition++) { - // TODO remove the check one we don't emit null anymore if (groups.isNull(groupPosition)) { continue; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/ConstantBooleanExpressionEvaluator.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/ConstantBooleanExpressionEvaluator.java new file mode 100644 index 0000000000000..3dbc8120473d4 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/ConstantBooleanExpressionEvaluator.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute; + +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.EvalOperator; + +public record ConstantBooleanExpressionEvaluator(BlockFactory factory, boolean value) implements EvalOperator.ExpressionEvaluator { + public static EvalOperator.ExpressionEvaluator.Factory factory(boolean value) { + return ctx -> new ConstantBooleanExpressionEvaluator(ctx.blockFactory(), value); + } + + @Override + public Block eval(Page page) { + return factory.newConstantBooleanVector(value, page.getPositionCount()).asBlock(); + } + + @Override + public void close() {} + +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java index 275038e6d2f02..f43585b45154e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java @@ -8,6 +8,7 @@ package org.elasticsearch.compute.aggregation; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.ConstantBooleanExpressionEvaluator; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; import org.elasticsearch.compute.data.BlockTestUtils; @@ -34,6 +35,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.function.Function; import java.util.stream.DoubleStream; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -58,8 +60,17 @@ protected final int aggregatorIntermediateBlockCount() { @Override protected Operator.OperatorFactory simpleWithMode(AggregatorMode mode) { + return simpleWithMode(mode, Function.identity()); + } + + private Operator.OperatorFactory simpleWithMode( + AggregatorMode mode, + Function wrap + ) { List channels = mode.isInputPartial() ? range(0, aggregatorIntermediateBlockCount()).boxed().toList() : List.of(0); - return new AggregationOperator.AggregationOperatorFactory(List.of(aggregatorFunction(channels).aggregatorFactory(mode)), mode); + AggregatorFunctionSupplier supplier = aggregatorFunction(channels); + Aggregator.Factory factory = wrap.apply(supplier).aggregatorFactory(mode); + return new AggregationOperator.AggregationOperatorFactory(List.of(factory), mode); } @Override @@ -141,6 +152,7 @@ public final void testEmptyInput() { List results = drive(simple().get(driverContext), List.of().iterator(), driverContext); assertThat(results, hasSize(1)); + assertOutputFromEmpty(results.get(0).getBlock(0)); } public final void testEmptyInputInitialFinal() { @@ -166,6 +178,31 @@ public final void testEmptyInputInitialIntermediateFinal() { assertOutputFromEmpty(results.get(0).getBlock(0)); } + public void testAllFiltered() { + Operator.OperatorFactory factory = simpleWithMode( + AggregatorMode.SINGLE, + agg -> new FilteredAggregatorFunctionSupplier(agg, ConstantBooleanExpressionEvaluator.factory(false)) + ); + DriverContext driverContext = driverContext(); + List input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), 10)); + List results = drive(factory.get(driverContext), input.iterator(), driverContext); + assertThat(results, hasSize(1)); + assertOutputFromEmpty(results.get(0).getBlock(0)); + } + + public final void testNoneFiltered() { + Operator.OperatorFactory factory = simpleWithMode( + AggregatorMode.SINGLE, + agg -> new FilteredAggregatorFunctionSupplier(agg, ConstantBooleanExpressionEvaluator.factory(true)) + ); + DriverContext driverContext = driverContext(); + List input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), 10)); + List origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance()); + List results = drive(factory.get(driverContext), input.iterator(), driverContext); + assertThat(results, hasSize(1)); + assertSimpleOutput(origInput, results); + } + // Returns an intermediate state that is equivalent to what the local execution planner will emit // if it determines that certain shards have no relevant data. List nullIntermediateState(BlockFactory blockFactory) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunctionTests.java index 66ecbb6eb1130..c39fe32620ff9 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBooleanGroupingAggregatorFunctionTests.java @@ -9,7 +9,9 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.LongBooleanTupleBlockSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; @@ -53,4 +55,13 @@ protected void assertOutputFromNullOnly(Block b, int position) { assertThat(b.getValueCount(position), equalTo(1)); assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L)); } + + @Override + protected void assertOutputFromAllFiltered(Block b) { + assertThat(b.elementType(), equalTo(ElementType.LONG)); + LongVector v = (LongVector) b.asVector(); + for (int p = 0; p < v.getPositionCount(); p++) { + assertThat(v.getLong(p), equalTo(0L)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunctionTests.java index cbc2a5227d9ea..dd739d2189ba8 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctBytesRefGroupingAggregatorFunctionTests.java @@ -10,7 +10,9 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.LongBytesRefTupleBlockSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; @@ -58,4 +60,13 @@ protected void assertOutputFromNullOnly(Block b, int position) { assertThat(b.getValueCount(position), equalTo(1)); assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L)); } + + @Override + protected void assertOutputFromAllFiltered(Block b) { + assertThat(b.elementType(), equalTo(ElementType.LONG)); + LongVector v = (LongVector) b.asVector(); + for (int p = 0; p < v.getPositionCount(); p++) { + assertThat(v.getLong(p), equalTo(0L)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunctionTests.java index 56a0d863038bc..7b6f928d57ddb 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctDoubleGroupingAggregatorFunctionTests.java @@ -9,7 +9,9 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.LongDoubleTupleBlockSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; @@ -57,4 +59,13 @@ protected void assertOutputFromNullOnly(Block b, int position) { assertThat(b.getValueCount(position), equalTo(1)); assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L)); } + + @Override + protected void assertOutputFromAllFiltered(Block b) { + assertThat(b.elementType(), equalTo(ElementType.LONG)); + LongVector v = (LongVector) b.asVector(); + for (int p = 0; p < v.getPositionCount(); p++) { + assertThat(v.getLong(p), equalTo(0L)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctFloatGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctFloatGroupingAggregatorFunctionTests.java index 03a11bb976b21..6b4a8f2900aaa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctFloatGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctFloatGroupingAggregatorFunctionTests.java @@ -9,7 +9,9 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.LongFloatTupleBlockSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; @@ -57,4 +59,13 @@ protected void assertOutputFromNullOnly(Block b, int position) { assertThat(b.getValueCount(position), equalTo(1)); assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L)); } + + @Override + protected void assertOutputFromAllFiltered(Block b) { + assertThat(b.elementType(), equalTo(ElementType.LONG)); + LongVector v = (LongVector) b.asVector(); + for (int p = 0; p < v.getPositionCount(); p++) { + assertThat(v.getLong(p), equalTo(0L)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunctionTests.java index 229ec49bcffa8..cfd3357a14c03 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntGroupingAggregatorFunctionTests.java @@ -9,7 +9,9 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.LongIntBlockSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; @@ -57,4 +59,13 @@ protected void assertOutputFromNullOnly(Block b, int position) { assertThat(b.getValueCount(position), equalTo(1)); assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L)); } + + @Override + protected void assertOutputFromAllFiltered(Block b) { + assertThat(b.elementType(), equalTo(ElementType.LONG)); + LongVector v = (LongVector) b.asVector(); + for (int p = 0; p < v.getPositionCount(); p++) { + assertThat(v.getLong(p), equalTo(0L)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java index 539ef35390663..55be7fe9a8ed3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongGroupingAggregatorFunctionTests.java @@ -9,7 +9,9 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.compute.operator.TupleBlockSourceOperator; @@ -56,4 +58,13 @@ protected void assertOutputFromNullOnly(Block b, int position) { assertThat(b.getValueCount(position), equalTo(1)); assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L)); } + + @Override + protected void assertOutputFromAllFiltered(Block b) { + assertThat(b.elementType(), equalTo(ElementType.LONG)); + LongVector v = (LongVector) b.asVector(); + for (int p = 0; p < v.getPositionCount(); p++) { + assertThat(v.getLong(p), equalTo(0L)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java index 1d658f80c4e29..06c267ff2d6ab 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountGroupingAggregatorFunctionTests.java @@ -9,7 +9,9 @@ import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.ElementType; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.LongVector; import org.elasticsearch.compute.data.Page; import org.elasticsearch.compute.operator.LongDoubleTupleBlockSourceOperator; import org.elasticsearch.compute.operator.SourceOperator; @@ -58,4 +60,13 @@ protected void assertOutputFromNullOnly(Block b, int position) { assertThat(b.getValueCount(position), equalTo(1)); assertThat(((LongBlock) b).getLong(b.getFirstValueIndex(position)), equalTo(0L)); } + + @Override + protected void assertOutputFromAllFiltered(Block b) { + assertThat(b.elementType(), equalTo(ElementType.LONG)); + LongVector v = (LongVector) b.asVector(); + for (int p = 0; p < v.getPositionCount(); p++) { + assertThat(v.getLong(p), equalTo(0L)); + } + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredAggregatorFunctionTests.java index 6ad3251d3c120..4287c1fa96c55 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredAggregatorFunctionTests.java @@ -93,4 +93,9 @@ public void checkUnclosed() { } assertThat(unclosed, empty()); } + + @Override + public void testAllFiltered() { + assumeFalse("can't double filter. tests already filter.", true); + } } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java index de9337f5fce2c..316058e57e089 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java @@ -10,6 +10,7 @@ import org.apache.lucene.document.InetAddressPoint; import org.apache.lucene.util.BytesRef; import org.elasticsearch.common.util.BitArray; +import org.elasticsearch.compute.ConstantBooleanExpressionEvaluator; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; @@ -42,6 +43,7 @@ import java.util.List; import java.util.SortedSet; import java.util.TreeSet; +import java.util.function.Function; import java.util.stream.DoubleStream; import java.util.stream.IntStream; import java.util.stream.LongStream; @@ -82,10 +84,17 @@ protected DataType acceptedDataType() { @Override protected final Operator.OperatorFactory simpleWithMode(AggregatorMode mode) { + return simpleWithMode(mode, Function.identity()); + } + + private Operator.OperatorFactory simpleWithMode( + AggregatorMode mode, + Function wrap + ) { List channels = mode.isInputPartial() ? range(1, 1 + aggregatorIntermediateBlockCount()).boxed().toList() : List.of(1); int emitChunkSize = between(100, 200); - AggregatorFunctionSupplier supplier = aggregatorFunction(channels); + AggregatorFunctionSupplier supplier = wrap.apply(aggregatorFunction(channels)); if (randomBoolean()) { supplier = chunkGroups(emitChunkSize, supplier); } @@ -353,6 +362,49 @@ public final void testNullOnlyInputInitialIntermediateFinal() { ); } + public final void testEmptyInput() { + DriverContext driverContext = driverContext(); + List results = drive(simple().get(driverContext), List.of().iterator(), driverContext); + + assertThat(results, hasSize(0)); + } + + public final void testAllFiltered() { + Operator.OperatorFactory factory = simpleWithMode( + AggregatorMode.SINGLE, + agg -> new FilteredAggregatorFunctionSupplier(agg, ConstantBooleanExpressionEvaluator.factory(false)) + ); + DriverContext driverContext = driverContext(); + List input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), 10)); + List results = drive(factory.get(driverContext), input.iterator(), driverContext); + assertThat(results, hasSize(1)); + assertOutputFromAllFiltered(results.get(0).getBlock(1)); + } + + public final void testNoneFiltered() { + Operator.OperatorFactory factory = simpleWithMode( + AggregatorMode.SINGLE, + agg -> new FilteredAggregatorFunctionSupplier(agg, ConstantBooleanExpressionEvaluator.factory(true)) + ); + DriverContext driverContext = driverContext(); + List input = CannedSourceOperator.collectPages(simpleInput(driverContext.blockFactory(), 10)); + List origInput = BlockTestUtils.deepCopyOf(input, TestBlockFactory.getNonBreakingInstance()); + List results = drive(factory.get(driverContext), input.iterator(), driverContext); + assertThat(results, hasSize(1)); + assertSimpleOutput(origInput, results); + } + + /** + * Asserts that the output from an empty input is a {@link Block} containing + * only {@code null}. Override for {@code count} style aggregations that + * return other sorts of results. + */ + protected void assertOutputFromAllFiltered(Block b) { + assertThat(b.areAllValuesNull(), equalTo(true)); + assertThat(b.isNull(0), equalTo(true)); + assertThat(b.getValueCount(0), equalTo(0)); + } + /** * Run the aggregation passing only null values. */ @@ -560,31 +612,34 @@ public AddInput prepareProcessPage(SeenGroupIds ignoredSeenGroupIds, Page page) @Override public void add(int positionOffset, IntBlock groupIds) { for (int offset = 0; offset < groupIds.getPositionCount(); offset += emitChunkSize) { - IntBlock.Builder builder = blockFactory().newIntBlockBuilder(emitChunkSize); - int endP = Math.min(groupIds.getPositionCount(), offset + emitChunkSize); - for (int p = offset; p < endP; p++) { - int start = groupIds.getFirstValueIndex(p); - int count = groupIds.getValueCount(p); - switch (count) { - case 0 -> builder.appendNull(); - case 1 -> { - int group = groupIds.getInt(start); - seenGroupIds.set(group); - builder.appendInt(group); - } - default -> { - int end = start + count; - builder.beginPositionEntry(); - for (int i = start; i < end; i++) { - int group = groupIds.getInt(i); + try (IntBlock.Builder builder = blockFactory().newIntBlockBuilder(emitChunkSize)) { + int endP = Math.min(groupIds.getPositionCount(), offset + emitChunkSize); + for (int p = offset; p < endP; p++) { + int start = groupIds.getFirstValueIndex(p); + int count = groupIds.getValueCount(p); + switch (count) { + case 0 -> builder.appendNull(); + case 1 -> { + int group = groupIds.getInt(start); seenGroupIds.set(group); builder.appendInt(group); } - builder.endPositionEntry(); + default -> { + int end = start + count; + builder.beginPositionEntry(); + for (int i = start; i < end; i++) { + int group = groupIds.getInt(i); + seenGroupIds.set(group); + builder.appendInt(group); + } + builder.endPositionEntry(); + } } } + try (IntBlock chunked = builder.build()) { + delegateAddInput.add(positionOffset + offset, chunked); + } } - delegateAddInput.add(positionOffset + offset, builder.build()); } }