Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ByteGroupValueBuilder to use MaybeNullBufferBuilder #12681

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,21 @@
// specific language governing permissions and limitations
// under the License.

use arrow::array::BooleanBufferBuilder;
use arrow::array::BufferBuilder;
use arrow::array::GenericBinaryArray;
use arrow::array::GenericStringArray;
use arrow::array::OffsetSizeTrait;
use arrow::array::PrimitiveArray;
use arrow::array::{Array, ArrayRef, ArrowPrimitiveType, AsArray};
use arrow::buffer::NullBuffer;
use arrow::buffer::OffsetBuffer;
use arrow::buffer::ScalarBuffer;
use arrow::datatypes::ArrowNativeType;
use arrow::datatypes::ByteArrayType;
use arrow::datatypes::DataType;
use arrow::datatypes::GenericBinaryType;
use arrow::datatypes::GenericStringType;
use datafusion_common::utils::proxy::VecAllocExt;

use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow_array::types::GenericStringType;
use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
use std::sync::Arc;
use std::vec;
Expand Down Expand Up @@ -190,6 +187,12 @@ impl<T: ArrowPrimitiveType> GroupColumn for PrimitiveGroupValueBuilder<T> {
}

/// An implementation of [`GroupColumn`] for binary and utf8 types.
///
/// Stores a collection of binary or utf8 group values in a single buffer
/// in a way that allows:
///
/// 1. Efficient comparison of incoming rows to existing rows
/// 2. Efficient construction of the final output array
pub struct ByteGroupValueBuilder<O>
where
O: OffsetSizeTrait,
Expand All @@ -201,8 +204,8 @@ where
/// stored in the range `offsets[i]..offsets[i+1]` in `buffer`. Null values
/// are stored as a zero length string.
offsets: Vec<O>,
/// Null indexes in offsets, if `i` is in nulls, `offsets[i]` should be equals to `offsets[i+1]`
nulls: Vec<usize>,
/// Nulls
nulls: MaybeNullBufferBuilder,
}

impl<O> ByteGroupValueBuilder<O>
Expand All @@ -214,7 +217,7 @@ where
output_type,
buffer: BufferBuilder::new(INITIAL_BUFFER_CAPACITY),
offsets: vec![O::default()],
nulls: vec![],
nulls: MaybeNullBufferBuilder::new(),
}
}

Expand All @@ -224,40 +227,33 @@ where
{
let arr = array.as_bytes::<B>();
if arr.is_null(row) {
self.nulls.push(self.len());
self.nulls.append(true);
// nulls need a zero length in the offset buffer
let offset = self.buffer.len();

self.offsets.push(O::usize_as(offset));
return;
} else {
self.nulls.append(false);
let value: &[u8] = arr.value(row).as_ref();
self.buffer.append_slice(value);
self.offsets.push(O::usize_as(self.buffer.len()));
}

let value: &[u8] = arr.value(row).as_ref();
self.buffer.append_slice(value);
self.offsets.push(O::usize_as(self.buffer.len()));
}

fn equal_to_inner<B>(&self, lhs_row: usize, array: &ArrayRef, rhs_row: usize) -> bool
where
B: ByteArrayType,
{
// Handle nulls
let is_lhs_null = self.nulls.iter().any(|null_idx| *null_idx == lhs_row);
let arr = array.as_bytes::<B>();
if is_lhs_null {
return arr.is_null(rhs_row);
} else if arr.is_null(rhs_row) {
return false;
}
self.nulls.is_null(lhs_row) == arr.is_null(rhs_row)
&& self.value(lhs_row) == (arr.value(rhs_row).as_ref() as &[u8])
}

let arr = array.as_bytes::<B>();
let rhs_elem: &[u8] = arr.value(rhs_row).as_ref();
let rhs_elem_len = arr.value_length(rhs_row).as_usize();
debug_assert_eq!(rhs_elem_len, rhs_elem.len());
let l = self.offsets[lhs_row].as_usize();
let r = self.offsets[lhs_row + 1].as_usize();
let existing_elem = unsafe { self.buffer.as_slice().get_unchecked(l..r) };
rhs_elem == existing_elem
/// return the current value of the specified row irrespective of null
pub fn value(&self, row: usize) -> &[u8] {
let l = self.offsets[row].as_usize();
let r = self.offsets[row + 1].as_usize();
// Safety: the offsets are constructed correctly and never decrease
unsafe { self.buffer.as_slice().get_unchecked(l..r) }
}
}

Expand Down Expand Up @@ -325,18 +321,7 @@ where
nulls,
} = *self;

let null_buffer = if nulls.is_empty() {
None
} else {
// Only make a `NullBuffer` if there was a null value
let num_values = offsets.len() - 1;
let mut bool_builder = BooleanBufferBuilder::new(num_values);
bool_builder.append_n(num_values, true);
nulls.into_iter().for_each(|null_index| {
bool_builder.set_bit(null_index, false);
});
Some(NullBuffer::from(bool_builder.finish()))
};
let null_buffer = nulls.build();

// SAFETY: the offsets were constructed correctly in `insert_if_new` --
// monotonically increasing, overflows were checked.
Expand All @@ -353,9 +338,9 @@ where
// SAFETY:
// 1. the offsets were constructed safely
//
// 2. we asserted the input arrays were all the correct type and
// thus since all the values that went in were valid (e.g. utf8)
// so are all the values that come out
// 2. the input arrays were all the correct type and thus since
// all the values that went in were valid (e.g. utf8) so are all
// the values that come out
Arc::new(unsafe {
GenericStringArray::new_unchecked(offsets, values, null_buffer)
})
Expand All @@ -366,27 +351,7 @@ where

fn take_n(&mut self, n: usize) -> ArrayRef {
debug_assert!(self.len() >= n);

let null_buffer = if self.nulls.is_empty() {
None
} else {
// Only make a `NullBuffer` if there was a null value
let mut bool_builder = BooleanBufferBuilder::new(n);
bool_builder.append_n(n, true);

let mut new_nulls = vec![];
self.nulls.iter().for_each(|null_index| {
if *null_index < n {
bool_builder.set_bit(*null_index, false);
} else {
new_nulls.push(null_index - n);
}
});

self.nulls = new_nulls;
Some(NullBuffer::from(bool_builder.finish()))
};

let null_buffer = self.nulls.take_n(n);
let first_remaining_offset = O::as_usize(self.offsets[n]);

// Given offests like [0, 2, 4, 5] and n = 1, we expect to get
Expand Down