From 625f877ed21afb424a7870f66ffc9e2fe4d09c75 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 4 Jul 2024 10:54:56 -0600 Subject: [PATCH 1/6] Make compression level configurable --- .../scala/org/apache/comet/CometConf.scala | 6 +++ core/src/execution/datafusion/planner.rs | 9 ++++- .../execution/datafusion/shuffle_writer.rs | 40 ++++++++++++++----- core/src/execution/jni_api.rs | 17 +++++++- core/src/execution/shuffle/row.rs | 3 +- docs/source/user-guide/configs.md | 1 + .../org/apache/comet/CometExecIterator.scala | 3 +- 7 files changed, 63 insertions(+), 16 deletions(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 787e8b4e9..79e5c39f4 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -175,6 +175,12 @@ object CometConf extends ShimCometConf { .stringConf .createWithDefault("zstd") + val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compressionLevel") + .doc("Zstd compression level used in shuffle.") + .intConf + .createWithDefault(1) + val COMET_COLUMNAR_SHUFFLE_ASYNC_ENABLED: ConfigEntry[Boolean] = conf( "spark.comet.columnar.shuffle.async.enabled") .doc( diff --git a/core/src/execution/datafusion/planner.rs b/core/src/execution/datafusion/planner.rs index a969c3baa..6feeff6a1 100644 --- a/core/src/execution/datafusion/planner.rs +++ b/core/src/execution/datafusion/planner.rs @@ -128,9 +128,12 @@ pub struct PhysicalPlanner { exec_context_id: i64, execution_props: ExecutionProps, session_ctx: Arc, + compression_level: i32, } +#[cfg(test)] impl Default for PhysicalPlanner { + /// Create default planner (for use in tests only) fn default() -> Self { let session_ctx = Arc::new(SessionContext::new()); let execution_props = ExecutionProps::new(); @@ -138,17 +141,19 @@ impl Default for PhysicalPlanner { exec_context_id: TEST_EXEC_CONTEXT_ID, execution_props, session_ctx, + compression_level: 1, } } } impl PhysicalPlanner { - pub fn new(session_ctx: Arc) -> Self { + pub fn new(session_ctx: Arc, compression_level: i32) -> Self { let execution_props = ExecutionProps::new(); Self { exec_context_id: TEST_EXEC_CONTEXT_ID, execution_props, session_ctx, + compression_level, } } @@ -157,6 +162,7 @@ impl PhysicalPlanner { exec_context_id, execution_props: self.execution_props, session_ctx: self.session_ctx.clone(), + compression_level: self.compression_level, } } @@ -863,6 +869,7 @@ impl PhysicalPlanner { partitioning, writer.output_data_file.clone(), writer.output_index_file.clone(), + self.compression_level, )?), )) } diff --git a/core/src/execution/datafusion/shuffle_writer.rs b/core/src/execution/datafusion/shuffle_writer.rs index 5afc9a53e..8a910a765 100644 --- a/core/src/execution/datafusion/shuffle_writer.rs +++ b/core/src/execution/datafusion/shuffle_writer.rs @@ -28,6 +28,11 @@ use std::{ task::{Context, Poll}, }; +use crate::{ + common::bit::ceil, + errors::{CometError, CometResult}, + execution::datafusion::spark_hash::{create_murmur3_hashes, pmod}, +}; use arrow::{datatypes::*, ipc::writer::StreamWriter}; use async_trait::async_trait; use bytes::Buf; @@ -59,12 +64,6 @@ use itertools::Itertools; use simd_adler32::Adler32; use tokio::task; -use crate::{ - common::bit::ceil, - errors::{CometError, CometResult}, - execution::datafusion::spark_hash::{create_murmur3_hashes, pmod}, -}; - /// The shuffle writer operator maps each input partition to M output partitions based on a /// partitioning scheme. No guarantees are made about the order of the resulting partitions. #[derive(Debug)] @@ -80,6 +79,8 @@ pub struct ShuffleWriterExec { /// Metrics metrics: ExecutionPlanMetricsSet, cache: PlanProperties, + /// zstd compression level + compression_level: i32, } impl DisplayAs for ShuffleWriterExec { @@ -118,6 +119,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.partitioning.clone(), self.output_data_file.clone(), self.output_index_file.clone(), + self.compression_level, )?)), _ => panic!("ShuffleWriterExec wrong number of children"), } @@ -142,6 +144,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.partitioning.clone(), metrics, context, + self.compression_level, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -169,6 +172,7 @@ impl ShuffleWriterExec { partitioning: Partitioning, output_data_file: String, output_index_file: String, + compression_level: i32, ) -> Result { let cache = PlanProperties::new( EquivalenceProperties::new(input.schema().clone()), @@ -183,6 +187,7 @@ impl ShuffleWriterExec { output_data_file, output_index_file, cache, + compression_level, }) } } @@ -201,10 +206,12 @@ struct PartitionBuffer { /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`, /// the active array builders will be frozen and appended to frozen buffer `frozen`. batch_size: usize, + /// zstd compression level + compression_level: i32, } impl PartitionBuffer { - fn new(schema: SchemaRef, batch_size: usize) -> Self { + fn new(schema: SchemaRef, batch_size: usize, compression_level: i32) -> Self { Self { schema, frozen: vec![], @@ -212,6 +219,7 @@ impl PartitionBuffer { active_slots_mem_size: 0, num_active_rows: 0, batch_size, + compression_level, } } @@ -285,7 +293,7 @@ impl PartitionBuffer { let frozen_capacity_old = self.frozen.capacity(); let mut cursor = Cursor::new(&mut self.frozen); cursor.seek(SeekFrom::End(0))?; - write_ipc_compressed(&frozen_batch, &mut cursor)?; + write_ipc_compressed(&frozen_batch, &mut cursor, self.compression_level)?; mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize; Ok(mem_diff) @@ -577,6 +585,8 @@ struct ShuffleRepartitioner { partition_ids: Vec, /// The configured batch size batch_size: usize, + /// zstd compression level + compression_level: i32, } struct ShuffleRepartitionerMetrics { @@ -611,6 +621,7 @@ impl ShuffleRepartitioner { metrics: ShuffleRepartitionerMetrics, runtime: Arc, batch_size: usize, + compression_level: i32, ) -> Self { let num_output_partitions = partitioning.partition_count(); let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id)) @@ -633,7 +644,7 @@ impl ShuffleRepartitioner { schema: schema.clone(), buffered_partitions: Mutex::new( (0..num_output_partitions) - .map(|_| PartitionBuffer::new(schema.clone(), batch_size)) + .map(|_| PartitionBuffer::new(schema.clone(), batch_size, compression_level)) .collect::>(), ), spills: Mutex::new(vec![]), @@ -645,6 +656,7 @@ impl ShuffleRepartitioner { hashes_buf, partition_ids, batch_size, + compression_level, } } @@ -963,6 +975,7 @@ async fn external_shuffle( partitioning: Partitioning, metrics: ShuffleRepartitionerMetrics, context: Arc, + compression_level: i32, ) -> Result { let schema = input.schema(); let mut repartitioner = ShuffleRepartitioner::new( @@ -974,6 +987,7 @@ async fn external_shuffle( metrics, context.runtime_env(), context.session_config().batch_size(), + compression_level, ); while let Some(batch) = input.next().await { @@ -1353,6 +1367,7 @@ impl Checksum { pub(crate) fn write_ipc_compressed( batch: &RecordBatch, output: &mut W, + compression_level: i32, ) -> Result { if batch.num_rows() == 0 { return Ok(0); @@ -1363,8 +1378,10 @@ pub(crate) fn write_ipc_compressed( output.write_all(&[0u8; 8])?; // write ipc data - // TODO: make compression level configurable - let mut arrow_writer = StreamWriter::try_new(zstd::Encoder::new(output, 1)?, &batch.schema())?; + let mut arrow_writer = StreamWriter::try_new( + zstd::Encoder::new(output, compression_level)?, + &batch.schema(), + )?; arrow_writer.write(batch)?; arrow_writer.finish()?; @@ -1465,6 +1482,7 @@ mod test { Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), + 1, ) .unwrap(); let ctx = SessionContext::new(); diff --git a/core/src/execution/jni_api.rs b/core/src/execution/jni_api.rs index bc194238b..17a615b91 100644 --- a/core/src/execution/jni_api.rs +++ b/core/src/execution/jni_api.rs @@ -89,6 +89,8 @@ struct ExecutionContext { pub session_ctx: Arc, /// Whether to enable additional debugging checks & messages pub debug_native: bool, + /// zstd compression level + pub compression_level: i32, } /// Accept serialized query plan and return the address of the native query plan. @@ -132,6 +134,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( .and_then(|x| x.parse::().ok()) .unwrap_or(false); + let compression_level = configs + .get("compression_level") + .and_then(|x| x.parse::().ok()) + .unwrap_or(1); + // Use multi-threaded tokio runtime to prevent blocking spawned tasks if any let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -169,6 +176,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan( metrics, session_ctx: Arc::new(session), debug_native, + compression_level, }); Ok(Box::into_raw(exec_context) as i64) @@ -317,8 +325,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( // Because we don't know if input arrays are dictionary-encoded when we create // query plan, we need to defer stream initialization to first time execution. if exec_context.root_op.is_none() { - let planner = PhysicalPlanner::new(exec_context.session_ctx.clone()) - .with_exec_id(exec_context_id); + let planner = PhysicalPlanner::new( + exec_context.session_ctx.clone(), + exec_context.compression_level, + ) + .with_exec_id(exec_context_id); let (scans, root_op) = planner.create_plan( &exec_context.spark_plan, &mut exec_context.input_sources.clone(), @@ -455,6 +466,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative checksum_enabled: jboolean, checksum_algo: jint, current_checksum: jlong, + compression_level: jlong, ) -> jlongArray { try_unwrap_or_throw(&e, |mut env| unsafe { let data_types = convert_datatype_arrays(&mut env, serialized_datatypes)?; @@ -493,6 +505,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative checksum_enabled, checksum_algo, current_checksum, + compression_level as i32, )?; let checksum = if let Some(checksum) = checksum { diff --git a/core/src/execution/shuffle/row.rs b/core/src/execution/shuffle/row.rs index 2aeb48815..deecb75ec 100644 --- a/core/src/execution/shuffle/row.rs +++ b/core/src/execution/shuffle/row.rs @@ -3296,6 +3296,7 @@ pub fn process_sorted_row_partition( // this is the initial checksum for this method, as it also gets updated iteratively // inside the loop within the method across batches. initial_checksum: Option, + compression_level: i32, ) -> Result<(i64, Option), CometError> { // TODO: We can tune this parameter automatically based on row size and cache size. let row_step = 10; @@ -3355,7 +3356,7 @@ pub fn process_sorted_row_partition( let mut frozen: Vec = vec![]; let mut cursor = Cursor::new(&mut frozen); cursor.seek(SeekFrom::End(0))?; - written += write_ipc_compressed(&batch, &mut cursor)?; + written += write_ipc_compressed(&batch, &mut cursor, compression_level)?; if let Some(checksum) = &mut current_checksum { checksum.update(&mut cursor)?; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 283749076..8a8a8f43f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -36,6 +36,7 @@ Comet provides the following configuration settings. | spark.comet.exec.enabled | Whether to enable Comet native vectorized execution for Spark. This controls whether Spark should convert operators into their Comet counterparts and execute them in native space. Note: each operator is associated with a separate config in the format of 'spark.comet.exec..enabled' at the moment, and both the config and this need to be turned on, in order for the operator to be executed in native. By default, this config is false. | false | | spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 | | spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd | +| spark.comet.exec.shuffle.compressionLevel | Zstd compression level used in shuffle. | 1 | | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. By default, this config is false. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | false | | spark.comet.exec.shuffle.mode | The mode of Comet shuffle. This config is only effective if Comet shuffle is enabled. Available modes are 'native', 'jvm', and 'auto'. 'native' is for native shuffle which has best performance in general. 'jvm' is for jvm-based columnar shuffle which has higher coverage than native shuffle. 'auto' is for Comet to choose the best shuffle mode based on the query plan. By default, this config is 'jvm'. | jvm | | spark.comet.explain.verbose.enabled | When this setting is enabled, Comet will provide a verbose tree representation of the extended information. | false | diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index 89225c0d6..55f85eeaf 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -23,7 +23,7 @@ import org.apache.spark._ import org.apache.spark.sql.comet.CometMetricNode import org.apache.spark.sql.vectorized._ -import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION} +import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_FRACTION, COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL} import org.apache.comet.vector.NativeUtil /** @@ -85,6 +85,7 @@ class CometExecIterator( result.put("memory_fraction", String.valueOf(COMET_EXEC_MEMORY_FRACTION.get())) result.put("batch_size", String.valueOf(COMET_BATCH_SIZE.get())) result.put("debug_native", String.valueOf(COMET_DEBUG_ENABLED.get())) + result.put("compression_level", String.valueOf(COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL.get())) // Strip mandatory prefix spark. which is not required for DataFusion session params conf.getAll.foreach { From d746ad4dc8b6343f2edf57aa98faab5858be8fe8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Jul 2024 10:21:40 -0600 Subject: [PATCH 2/6] clippy --- core/src/execution/datafusion/shuffle_writer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/execution/datafusion/shuffle_writer.rs b/core/src/execution/datafusion/shuffle_writer.rs index 8a910a765..37484c0e9 100644 --- a/core/src/execution/datafusion/shuffle_writer.rs +++ b/core/src/execution/datafusion/shuffle_writer.rs @@ -967,6 +967,7 @@ impl Debug for ShuffleRepartitioner { } } +#[allow(clippy::too_many_arguments)] async fn external_shuffle( mut input: SendableRecordBatchStream, partition_id: usize, From b135c0cba751f4c15c7df9a1a8eda5ce9cdea353 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Jul 2024 10:48:51 -0600 Subject: [PATCH 3/6] update bench --- core/benches/shuffle_writer.rs | 1 + core/src/errors.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/core/benches/shuffle_writer.rs b/core/benches/shuffle_writer.rs index 4bebd045d..03f111721 100644 --- a/core/benches/shuffle_writer.rs +++ b/core/benches/shuffle_writer.rs @@ -39,6 +39,7 @@ fn criterion_benchmark(c: &mut Criterion) { Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), + 1 ) .unwrap(); diff --git a/core/src/errors.rs b/core/src/errors.rs index b6f4d0889..10c55ceaf 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -585,6 +585,7 @@ mod tests { jvm().attach_current_thread().expect("Unable to attach JVM") } + #[ignore] #[test] pub fn error_from_panic() { let _guard = attach_current_thread(); From 0213ecab7f56eba370bc27d1cd119f04a52f3835 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Jul 2024 10:50:08 -0600 Subject: [PATCH 4/6] revert change --- core/src/errors.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/errors.rs b/core/src/errors.rs index 10c55ceaf..b6f4d0889 100644 --- a/core/src/errors.rs +++ b/core/src/errors.rs @@ -585,7 +585,6 @@ mod tests { jvm().attach_current_thread().expect("Unable to attach JVM") } - #[ignore] #[test] pub fn error_from_panic() { let _guard = attach_current_thread(); From afdd25daea7312218e5a53d3e237a46892614ba8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Jul 2024 12:13:46 -0600 Subject: [PATCH 5/6] clippy --- core/benches/shuffle_writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/benches/shuffle_writer.rs b/core/benches/shuffle_writer.rs index 03f111721..1261711fe 100644 --- a/core/benches/shuffle_writer.rs +++ b/core/benches/shuffle_writer.rs @@ -39,7 +39,7 @@ fn criterion_benchmark(c: &mut Criterion) { Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 16), "/tmp/data.out".to_string(), "/tmp/index.out".to_string(), - 1 + 1, ) .unwrap(); From 886f7549ef86523250d5b38ce184d09f137e0738 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 5 Jul 2024 12:35:36 -0600 Subject: [PATCH 6/6] update another benchmark --- core/benches/row_columnar.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/core/benches/row_columnar.rs b/core/benches/row_columnar.rs index 60b41330e..005394980 100644 --- a/core/benches/row_columnar.rs +++ b/core/benches/row_columnar.rs @@ -77,6 +77,7 @@ fn benchmark(c: &mut Criterion) { false, 0, None, + 1, ) .unwrap(); });