chore: Make shuffle compression level configurable #632

Draft
wants to merge 8 commits into base: main
Changes from 6 commits
6 changes: 6 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -175,6 +175,12 @@ object CometConf extends ShimCometConf {
.stringConf
.createWithDefault("zstd")

val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compressionLevel")
.doc("Zstd compression level used in shuffle.")
.intConf
Contributor: Perhaps we could have a check here for valid values? (A validation sketch follows this file's diff.)

.createWithDefault(1)

val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf(
"spark.comet.columnar.shuffle.async.enabled")
.doc(
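As a follow-up to the review note above: a minimal sketch of what such a check could look like, assuming Comet's internal ConfigBuilder exposes (or is given) a Spark-style `checkValue(validator, errorMsg)` helper. zstd commonly accepts levels 1 through 22, with higher levels trading speed for compression ratio.

```scala
// Sketch only: assumes a Spark-style `checkValue` helper on the config builder.
// zstd levels are commonly 1 (fastest) to 22 (smallest output).
val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =
  conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compressionLevel")
    .doc("Zstd compression level used in shuffle. Valid values are 1 to 22.")
    .intConf
    .checkValue(level => level >= 1 && level <= 22,
      "Zstd compression level must be between 1 and 22.")
    .createWithDefault(1)
```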
1 change: 1 addition & 0 deletions core/benches/row_columnar.rs
@@ -77,6 +77,7 @@ fn benchmark(c: &mut Criterion) {
false,
0,
None,
1,
)
.unwrap();
});
1 change: 1 addition & 0 deletions core/benches/shuffle_writer.rs
@@ -39,6 +39,7 @@ fn criterion_benchmark(c: &mut Criterion) {
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
1,
)
.unwrap();

9 changes: 8 additions & 1 deletion core/src/execution/datafusion/planner.rs
@@ -128,27 +128,32 @@ pub struct PhysicalPlanner {
exec_context_id: i64,
execution_props: ExecutionProps,
session_ctx: Arc<SessionContext>,
compression_level: i32,
Contributor: It seems like we are passing the compression_level parameter around, which gets a bit ugly if we add more configurable entries later. Could we leverage the SessionConf in session_ctx.state to avoid that? (A sketch follows this file's diff.)

}

#[cfg(test)]
impl Default for PhysicalPlanner {
/// Create default planner (for use in tests only)
fn default() -> Self {
let session_ctx = Arc::new(SessionContext::new());
let execution_props = ExecutionProps::new();
Self {
exec_context_id: TEST_EXEC_CONTEXT_ID,
execution_props,
session_ctx,
compression_level: 1,
}
}
}

impl PhysicalPlanner {
pub fn new(session_ctx: Arc<SessionContext>) -> Self {
pub fn new(session_ctx: Arc<SessionContext>, compression_level: i32) -> Self {
let execution_props = ExecutionProps::new();
Self {
exec_context_id: TEST_EXEC_CONTEXT_ID,
execution_props,
session_ctx,
compression_level,
}
}

@@ -157,6 +162,7 @@ impl PhysicalPlanner {
exec_context_id,
execution_props: self.execution_props,
session_ctx: self.session_ctx.clone(),
compression_level: self.compression_level,
}
}

@@ -863,6 +869,7 @@ impl PhysicalPlanner {
partitioning,
writer.output_data_file.clone(),
writer.output_index_file.clone(),
self.compression_level,
)?),
))
}
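On the review note about threading compression_level through constructors: one possible direction, sketched below, is to attach a small Comet options struct to DataFusion's SessionConfig and read it back wherever the session (or task) context is in scope, instead of passing the value through each constructor. This assumes DataFusion's SessionConfig extension API (with_extension / get_extension) is available; the CometShuffleOptions name is hypothetical.

```rust
use std::sync::Arc;
use datafusion::prelude::{SessionConfig, SessionContext};

/// Hypothetical carrier for Comet-specific shuffle settings.
#[derive(Debug)]
struct CometShuffleOptions {
    compression_level: i32,
}

fn sketch() {
    // Register the options once, when the session is created ...
    let config = SessionConfig::new()
        .with_extension(Arc::new(CometShuffleOptions { compression_level: 3 }));
    let ctx = SessionContext::new_with_config(config);

    // ... then look them up from the session state wherever needed, instead of
    // passing `compression_level` through PhysicalPlanner, ShuffleWriterExec, etc.
    let state = ctx.state();
    let level = state
        .config()
        .get_extension::<CometShuffleOptions>()
        .map(|opts| opts.compression_level)
        .unwrap_or(1);
    assert_eq!(level, 3);
}
```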
41 changes: 30 additions & 11 deletions core/src/execution/datafusion/shuffle_writer.rs
@@ -28,6 +28,11 @@ use std::{
task::{Context, Poll},
};

use crate::{
common::bit::ceil,
errors::{CometError, CometResult},
execution::datafusion::spark_hash::{create_murmur3_hashes, pmod},
};
use arrow::{datatypes::*, ipc::writer::StreamWriter};
use async_trait::async_trait;
use bytes::Buf;
@@ -59,12 +64,6 @@ use itertools::Itertools;
use simd_adler32::Adler32;
use tokio::task;

use crate::{
common::bit::ceil,
errors::{CometError, CometResult},
execution::datafusion::spark_hash::{create_murmur3_hashes, pmod},
};

/// The shuffle writer operator maps each input partition to M output partitions based on a
/// partitioning scheme. No guarantees are made about the order of the resulting partitions.
#[derive(Debug)]
@@ -80,6 +79,8 @@ pub struct ShuffleWriterExec {
/// Metrics
metrics: ExecutionPlanMetricsSet,
cache: PlanProperties,
/// zstd compression level
compression_level: i32,
Contributor: If the compression_level is stored in the SessionConf, we could retrieve it when executing instead of defining a new field. (A sketch follows the struct definition below.)

}
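If the SessionConfig-extension route from the planner.rs note were taken, this field could be dropped and the level looked up at execution time instead. A sketch, reusing the hypothetical CometShuffleOptions type from that note:

```rust
use std::sync::Arc;
use datafusion::execution::TaskContext;

// Hypothetical helper called from ShuffleWriterExec::execute(), assuming
// CometShuffleOptions was registered on the session config as an extension.
fn shuffle_compression_level(context: &Arc<TaskContext>) -> i32 {
    context
        .session_config()
        .get_extension::<CometShuffleOptions>()
        .map(|opts| opts.compression_level)
        .unwrap_or(1)
}
```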

impl DisplayAs for ShuffleWriterExec {
@@ -118,6 +119,7 @@ impl ExecutionPlan for ShuffleWriterExec {
self.partitioning.clone(),
self.output_data_file.clone(),
self.output_index_file.clone(),
self.compression_level,
)?)),
_ => panic!("ShuffleWriterExec wrong number of children"),
}
@@ -142,6 +144,7 @@ impl ExecutionPlan for ShuffleWriterExec {
self.partitioning.clone(),
metrics,
context,
self.compression_level,
)
.map_err(|e| ArrowError::ExternalError(Box::new(e))),
)
@@ -169,6 +172,7 @@ impl ShuffleWriterExec {
partitioning: Partitioning,
output_data_file: String,
output_index_file: String,
compression_level: i32,
) -> Result<Self> {
let cache = PlanProperties::new(
EquivalenceProperties::new(input.schema().clone()),
@@ -183,6 +187,7 @@ impl ShuffleWriterExec {
output_data_file,
output_index_file,
cache,
compression_level,
})
}
}
@@ -201,17 +206,20 @@ struct PartitionBuffer {
/// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
/// the active array builders will be frozen and appended to frozen buffer `frozen`.
batch_size: usize,
/// zstd compression level
compression_level: i32,
}

impl PartitionBuffer {
fn new(schema: SchemaRef, batch_size: usize) -> Self {
fn new(schema: SchemaRef, batch_size: usize, compression_level: i32) -> Self {
Self {
schema,
frozen: vec![],
active: vec![],
active_slots_mem_size: 0,
num_active_rows: 0,
batch_size,
compression_level,
}
}

@@ -285,7 +293,7 @@ impl PartitionBuffer {
let frozen_capacity_old = self.frozen.capacity();
let mut cursor = Cursor::new(&mut self.frozen);
cursor.seek(SeekFrom::End(0))?;
write_ipc_compressed(&frozen_batch, &mut cursor)?;
write_ipc_compressed(&frozen_batch, &mut cursor, self.compression_level)?;

mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
Ok(mem_diff)
@@ -577,6 +585,8 @@ struct ShuffleRepartitioner {
partition_ids: Vec<u64>,
/// The configured batch size
batch_size: usize,
/// zstd compression level
compression_level: i32,
}

struct ShuffleRepartitionerMetrics {
@@ -611,6 +621,7 @@ impl ShuffleRepartitioner {
metrics: ShuffleRepartitionerMetrics,
runtime: Arc<RuntimeEnv>,
batch_size: usize,
compression_level: i32,
) -> Self {
let num_output_partitions = partitioning.partition_count();
let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id))
@@ -633,7 +644,7 @@ impl ShuffleRepartitioner {
schema: schema.clone(),
buffered_partitions: Mutex::new(
(0..num_output_partitions)
.map(|_| PartitionBuffer::new(schema.clone(), batch_size))
.map(|_| PartitionBuffer::new(schema.clone(), batch_size, compression_level))
.collect::<Vec<_>>(),
),
spills: Mutex::new(vec![]),
@@ -645,6 +656,7 @@ impl ShuffleRepartitioner {
hashes_buf,
partition_ids,
batch_size,
compression_level,
}
}

@@ -955,6 +967,7 @@ impl Debug for ShuffleRepartitioner {
}
}

#[allow(clippy::too_many_arguments)]
async fn external_shuffle(
mut input: SendableRecordBatchStream,
partition_id: usize,
@@ -963,6 +976,7 @@ async fn external_shuffle(
partitioning: Partitioning,
metrics: ShuffleRepartitionerMetrics,
context: Arc<TaskContext>,
compression_level: i32,
) -> Result<SendableRecordBatchStream> {
let schema = input.schema();
let mut repartitioner = ShuffleRepartitioner::new(
@@ -974,6 +988,7 @@ async fn external_shuffle(
metrics,
context.runtime_env(),
context.session_config().batch_size(),
compression_level,
);

while let Some(batch) = input.next().await {
@@ -1353,6 +1368,7 @@ impl Checksum {
pub(crate) fn write_ipc_compressed<W: Write + Seek>(
batch: &RecordBatch,
output: &mut W,
compression_level: i32,
) -> Result<usize> {
if batch.num_rows() == 0 {
return Ok(0);
@@ -1363,8 +1379,10 @@ pub(crate) fn write_ipc_compressed<W: Write + Seek>(
output.write_all(&[0u8; 8])?;

// write ipc data
// TODO: make compression level configurable
let mut arrow_writer = StreamWriter::try_new(zstd::Encoder::new(output, 1)?, &batch.schema())?;
let mut arrow_writer = StreamWriter::try_new(
zstd::Encoder::new(output, compression_level)?,
&batch.schema(),
)?;
arrow_writer.write(batch)?;
arrow_writer.finish()?;

@@ -1465,6 +1483,7 @@ mod test {
Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16),
"/tmp/data.out".to_string(),
"/tmp/index.out".to_string(),
1,
)
.unwrap();
let ctx = SessionContext::new();
17 changes: 15 additions & 2 deletions core/src/execution/jni_api.rs
@@ -89,6 +89,8 @@ struct ExecutionContext {
pub session_ctx: Arc<SessionContext>,
/// Whether to enable additional debugging checks & messages
pub debug_native: bool,
/// zstd compression level
pub compression_level: i32,
}

/// Accept serialized query plan and return the address of the native query plan.
@@ -132,6 +134,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
.and_then(|x| x.parse::<bool>().ok())
.unwrap_or(false);

let compression_level = configs
.get("compression_level")
.and_then(|x| x.parse::<i32>().ok())
.unwrap_or(1);

// Use multi-threaded tokio runtime to prevent blocking spawned tasks if any
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
@@ -169,6 +176,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
metrics,
session_ctx: Arc::new(session),
debug_native,
compression_level,
});

Ok(Box::into_raw(exec_context) as i64)
@@ -317,8 +325,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
// Because we don't know if input arrays are dictionary-encoded when we create
// query plan, we need to defer stream initialization to first time execution.
if exec_context.root_op.is_none() {
let planner = PhysicalPlanner::new(exec_context.session_ctx.clone())
.with_exec_id(exec_context_id);
let planner = PhysicalPlanner::new(
exec_context.session_ctx.clone(),
exec_context.compression_level,
)
.with_exec_id(exec_context_id);
let (scans, root_op) = planner.create_plan(
&exec_context.spark_plan,
&mut exec_context.input_sources.clone(),
@@ -455,6 +466,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative(
checksum_enabled: jboolean,
checksum_algo: jint,
current_checksum: jlong,
compression_level: jlong,
Contributor: I believe you need to update the JNI declaration on the JVM side too: org.apache.comet.Native#writeSortedFileNative. (A sketch follows this file's diff.)

) -> jlongArray {
try_unwrap_or_throw(&e, |mut env| unsafe {
let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?;
@@ -493,6 +505,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative(
checksum_enabled,
checksum_algo,
current_checksum,
compression_level as i32,
)?;

let checksum = if let Some(checksum) = checksum {
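On the review note about the JVM side: the @native declaration in org.apache.comet.Native would need a matching trailing parameter. A sketch follows, with the pre-existing leading arguments elided since only the tail of the native signature appears in this diff; the Scala types mirror the jboolean/jint/jlong/jlongArray types above.

```scala
// Sketch only: earlier parameters of writeSortedFileNative are unchanged and elided.
@native def writeSortedFileNative(
    /* ... existing arguments ... */
    checksumEnabled: Boolean,
    checksumAlgo: Int,
    currentChecksum: Long,
    compressionLevel: Long): Array[Long]
```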
3 changes: 2 additions & 1 deletion core/src/execution/shuffle/row.rs
@@ -3296,6 +3296,7 @@ pub fn process_sorted_row_partition(
// this is the initial checksum for this method, as it also gets updated iteratively
// inside the loop within the method across batches.
initial_checksum: Option<u32>,
compression_level: i32,
) -> Result<(i64, Option<u32>), CometError> {
// TODO: We can tune this parameter automatically based on row size and cache size.
let row_step = 10;
@@ -3355,7 +3356,7 @@ pub fn process_sorted_row_partition(
let mut frozen: Vec<u8> = vec![];
let mut cursor = Cursor::new(&mut frozen);
cursor.seek(SeekFrom::End(0))?;
written += write_ipc_compressed(&batch, &mut cursor)?;
written += write_ipc_compressed(&batch, &mut cursor, compression_level)?;

if let Some(checksum) = &mut current_checksum {
checksum.update(&mut cursor)?;
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -36,6 +36,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.enabled | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of 'spark.comet.exec.<operator_name>.enabled' at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. By default, this config is false. | false |
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
| spark.comet.exec.shuffle.compressionLevel | Zstd compression level used in shuffle. | 1 |
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false |
| spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm |
| spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false |
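For reference, enabling a non-default level from a Spark session would look like the following (illustrative; the level shown is arbitrary):

```scala
// Illustrative: raise the zstd level used by Comet shuffle for this session.
spark.conf.set("spark.comet.exec.shuffle.compressionLevel", "3")
```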
@@ -23,7 +23,7 @@ import org.apache.spark._
import org.apache.spark.sql.comet.CometMetricNode
import org.apache.spark.sql.vectorized._

import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION}
import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL}
import org.apache.comet.vector.NativeUtil

/**
@@ -85,6 +85,7 @@ class CometExecIterator(
result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get()))
result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get()))
result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get()))
result.put("compression_level", String.valueOf(COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL.get()))

// Strip mandatory prefix spark. which is not required for DataFusion session params
conf.getAll.foreach {