Skip to content

Commit

Permalink
LexRequirement as a struct, instead of a type (apache#12583)
Browse files Browse the repository at this point in the history
* Converted LexRequirement into a struct.

* Adjusted the wrapping to return the correct type, since the LexRequirement was not being converted after the merge.

---------

Co-authored-by: nglime <[email protected]>
  • Loading branch information
2 people authored and bgjackma committed Sep 25, 2024
1 parent 6bb35e5 commit 65ddcd9
Show file tree
Hide file tree
Showing 12 changed files with 134 additions and 57 deletions.
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use datafusion_physical_expr::{

use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use futures::{future, stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::ObjectStore;
Expand Down Expand Up @@ -987,12 +988,12 @@ impl TableProvider for ListingTable {
))?
.clone();
// Converts Vec<Vec<SortExpr>> into type required by execution plan to specify its required input ordering
Some(
Some(LexRequirement::new(
ordering
.into_iter()
.map(PhysicalSortRequirement::from)
.collect::<Vec<_>>(),
)
))
} else {
None
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ fn ensure_distribution(
// Make sure to satisfy ordering requirement:
child = add_sort_above_with_check(
child,
required_input_ordering.to_vec(),
required_input_ordering.clone(),
None,
);
}
Expand Down
13 changes: 7 additions & 6 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use datafusion_physical_expr::{
use datafusion_physical_plan::streaming::StreamingTableExec;
use datafusion_physical_plan::union::UnionExec;

use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use itertools::Itertools;

Expand Down Expand Up @@ -334,10 +335,10 @@ fn try_swapping_with_output_req(
return Ok(None);
}

let mut updated_sort_reqs = vec![];
let mut updated_sort_reqs = LexRequirement::new(vec![]);
// None or empty_vec can be treated in the same way.
if let Some(reqs) = &output_req.required_input_ordering()[0] {
for req in reqs {
for req in &reqs.inner {
let Some(new_expr) = update_expr(&req.expr, projection.expr(), false)? else {
return Ok(None);
};
Expand Down Expand Up @@ -1995,7 +1996,7 @@ mod tests {
let csv = create_simple_csv_exec();
let sort_req: Arc<dyn ExecutionPlan> = Arc::new(OutputRequirementExec::new(
csv.clone(),
Some(vec![
Some(LexRequirement::new(vec![
PhysicalSortRequirement {
expr: Arc::new(Column::new("b", 1)),
options: Some(SortOptions::default()),
Expand All @@ -2008,7 +2009,7 @@ mod tests {
)),
options: Some(SortOptions::default()),
},
]),
])),
Distribution::HashPartitioned(vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
Expand Down Expand Up @@ -2041,7 +2042,7 @@ mod tests {
];

assert_eq!(get_plan_string(&after_optimize), expected);
let expected_reqs = vec![
let expected_reqs = LexRequirement::new(vec![
PhysicalSortRequirement {
expr: Arc::new(Column::new("b", 2)),
options: Some(SortOptions::default()),
Expand All @@ -2054,7 +2055,7 @@ mod tests {
)),
options: Some(SortOptions::default()),
},
];
]);
assert_eq!(
after_optimize
.as_any()
Expand Down
23 changes: 15 additions & 8 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ fn pushdown_requirement_to_children(
let child_plan = plan.children().swap_remove(0);
match determine_children_requirement(parent_required, request_child, child_plan) {
RequirementsCompatibility::Satisfy => {
let req = (!request_child.is_empty()).then(|| request_child.to_vec());
let req = (!request_child.is_empty())
.then(|| LexRequirement::new(request_child.to_vec()));
Ok(Some(vec![req]))
}
RequirementsCompatibility::Compatible(adjusted) => Ok(Some(vec![adjusted])),
Expand All @@ -189,7 +190,9 @@ fn pushdown_requirement_to_children(
.requirements_compatible(parent_required, &sort_req)
{
debug_assert!(!parent_required.is_empty());
Ok(Some(vec![Some(parent_required.to_vec())]))
Ok(Some(vec![Some(LexRequirement::new(
parent_required.to_vec(),
))]))
} else {
Ok(None)
}
Expand All @@ -211,15 +214,17 @@ fn pushdown_requirement_to_children(
.eq_properties
.requirements_compatible(parent_required, &output_req)
{
let req = (!parent_required.is_empty()).then(|| parent_required.to_vec());
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
Ok(Some(vec![req]))
} else {
Ok(None)
}
} else if is_union(plan) {
// UnionExec does not have real sort requirements for its input. Here we change the adjusted_request_ordering to UnionExec's output ordering and
// propagate the sort requirements down to correct the unnecessary descendant SortExec under the UnionExec
let req = (!parent_required.is_empty()).then(|| parent_required.to_vec());
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
Ok(Some(vec![req; plan.children().len()]))
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
// If the current plan is SortMergeJoinExec
Expand Down Expand Up @@ -277,7 +282,8 @@ fn pushdown_requirement_to_children(
} else {
// Can push-down through SortPreservingMergeExec, because parent requirement is finer
// than SortPreservingMergeExec output ordering.
let req = (!parent_required.is_empty()).then(|| parent_required.to_vec());
let req = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
Ok(Some(vec![req]))
}
} else {
Expand Down Expand Up @@ -331,7 +337,8 @@ fn determine_children_requirement(
{
// Parent requirements are more specific, adjust child's requirements
// and push down the new requirements:
let adjusted = (!parent_required.is_empty()).then(|| parent_required.to_vec());
let adjusted = (!parent_required.is_empty())
.then(|| LexRequirement::new(parent_required.to_vec()));
RequirementsCompatibility::Compatible(adjusted)
} else {
RequirementsCompatibility::NonCompatible
Expand Down Expand Up @@ -471,7 +478,7 @@ fn shift_right_required(
})
.collect::<Vec<_>>();
if new_right_required.len() == parent_required.len() {
Ok(new_right_required)
Ok(LexRequirement::new(new_right_required))
} else {
plan_err!(
"Expect to shift all the parent required column indexes for SortMergeJoin"
Expand Down Expand Up @@ -574,7 +581,7 @@ fn handle_custom_pushdown(
.iter()
.map(|&maintains_order| {
if maintains_order {
Some(updated_parent_req.clone())
Some(LexRequirement::new(updated_parent_req.clone()))
} else {
None
}
Expand Down
63 changes: 56 additions & 7 deletions datafusion/physical-expr-common/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::sync::Arc;

use crate::physical_expr::PhysicalExpr;
Expand Down Expand Up @@ -296,11 +297,13 @@ impl PhysicalSortRequirement {
pub fn from_sort_exprs<'a>(
ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> LexRequirement {
ordering
.into_iter()
.cloned()
.map(PhysicalSortRequirement::from)
.collect()
LexRequirement::new(
ordering
.into_iter()
.cloned()
.map(PhysicalSortRequirement::from)
.collect(),
)
}

/// Converts an iterator of [`PhysicalSortRequirement`] into a Vec
Expand Down Expand Up @@ -338,9 +341,55 @@ pub type LexOrdering = Vec<PhysicalSortExpr>;
/// a reference to a lexicographical ordering.
pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];

///`LexRequirement` is an alias for the type `Vec<PhysicalSortRequirement>`, which
///`LexRequirement` is an struct containing a `Vec<PhysicalSortRequirement>`, which
/// represents a lexicographical ordering requirement.
pub type LexRequirement = Vec<PhysicalSortRequirement>;
#[derive(Debug, Default, Clone, PartialEq)]
pub struct LexRequirement {
pub inner: Vec<PhysicalSortRequirement>,
}

impl LexRequirement {
pub fn new(inner: Vec<PhysicalSortRequirement>) -> Self {
Self { inner }
}

pub fn iter(&self) -> impl Iterator<Item = &PhysicalSortRequirement> {
self.inner.iter()
}

pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement) {
self.inner.push(physical_sort_requirement)
}
}

impl Deref for LexRequirement {
type Target = [PhysicalSortRequirement];

fn deref(&self) -> &Self::Target {
self.inner.as_slice()
}
}

impl FromIterator<PhysicalSortRequirement> for LexRequirement {
fn from_iter<T: IntoIterator<Item = PhysicalSortRequirement>>(iter: T) -> Self {
let mut lex_requirement = LexRequirement::new(vec![]);

for i in iter {
lex_requirement.inner.push(i);
}

lex_requirement
}
}

impl IntoIterator for LexRequirement {
type Item = PhysicalSortRequirement;
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
self.inner.into_iter()
}
}

///`LexRequirementRef` is an alias for the type &`[PhysicalSortRequirement]`, which
/// represents a reference to a lexicographical ordering requirement.
Expand Down
6 changes: 3 additions & 3 deletions datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ impl EquivalenceGroup {
// Normalize the requirements:
let normalized_sort_reqs = self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs)
PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs.inner)
}

/// This function applies the `normalize_sort_requirement` function for all
Expand All @@ -428,12 +428,12 @@ impl EquivalenceGroup {
&self,
sort_reqs: LexRequirementRef,
) -> LexRequirement {
collapse_lex_req(
collapse_lex_req(LexRequirement::new(
sort_reqs
.iter()
.map(|sort_req| self.normalize_sort_requirement(sort_req.clone()))
.collect(),
)
))
}

/// Projects `expr` according to the given projection mapping.
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/equivalence/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
output.push(item);
}
}
output
LexRequirement::new(output)
}

/// Adds the `offset` value to `Column` indices inside `expr`. This function is
Expand Down
5 changes: 3 additions & 2 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,9 @@ impl EquivalenceProperties {
) -> Option<LexRequirement> {
let mut lhs = self.normalize_sort_requirements(req1);
let mut rhs = self.normalize_sort_requirements(req2);
lhs.iter_mut()
.zip(rhs.iter_mut())
lhs.inner
.iter_mut()
.zip(rhs.inner.iter_mut())
.all(|(lhs, rhs)| {
lhs.expr.eq(&rhs.expr)
&& match (lhs.options, rhs.options) {
Expand Down
18 changes: 10 additions & 8 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,21 +377,23 @@ impl AggregateExec {
// prefix requirements with this section. In this case, aggregation will
// work more efficiently.
let indices = get_ordered_partition_by_indices(&groupby_exprs, &input);
let mut new_requirement = indices
.iter()
.map(|&idx| PhysicalSortRequirement {
expr: Arc::clone(&groupby_exprs[idx]),
options: None,
})
.collect::<Vec<_>>();
let mut new_requirement = LexRequirement::new(
indices
.iter()
.map(|&idx| PhysicalSortRequirement {
expr: Arc::clone(&groupby_exprs[idx]),
options: None,
})
.collect::<Vec<_>>(),
);

let req = get_finer_aggregate_exprs_requirement(
&mut aggr_expr,
&group_by,
input_eq_properties,
&mode,
)?;
new_requirement.extend(req);
new_requirement.inner.extend(req);
new_requirement = collapse_lex_req(new_requirement);

// If our aggregation has grouping sets then our base grouping exprs will
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -792,7 +792,9 @@ impl SortExec {
) -> PlanProperties {
// Determine execution mode:
let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement(
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter()).as_slice(),
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter())
.inner
.as_slice(),
);
let mode = match input.execution_mode() {
ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded,
Expand Down Expand Up @@ -895,7 +897,9 @@ impl ExecutionPlan for SortExec {
.input
.equivalence_properties()
.ordering_satisfy_requirement(
PhysicalSortRequirement::from_sort_exprs(self.expr.iter()).as_slice(),
PhysicalSortRequirement::from_sort_exprs(self.expr.iter())
.inner
.as_slice(),
);

match (sort_satisfied, self.fetch.as_ref()) {
Expand Down
Loading

0 comments on commit 65ddcd9

Please sign in to comment.