Memory over-reservation when running native shuffle write #887

Open · Kontinuation opened this issue Aug 29, 2024 · 3 comments · May be fixed by #988
Labels: bug (Something isn't working)

@Kontinuation (Member)

Describe the bug

We've seen this exception when running queries with spark.comet.exec.shuffle.mode=native:

Py4JJavaError: An error occurred while calling o456.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 174.0 failed 4 times, most recent failure: Lost task 4.3 in stage 174.0 (TID 9264) (10.0.132.242 executor 7): org.apache.comet.CometNativeException: External error: Resources exhausted: Failed to allocate additional 913120256 bytes for ShuffleRepartitioner[0] with 0 bytes already allocated for this reservation - 901355929 bytes remain available for the total pool
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:105)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:128)
	at org.apache.spark.sql.comet.execution.shuffle.CometShuffleWriteProcessor.write(CometShuffleExchangeExec.scala:496)
	at org.apache.spark.sql.comet.shims.ShimCometShuffleWriteProcessor.write(ShimCometShuffleWriteProcessor.scala:35)
	at org.apache.spark.sql.comet.shims.ShimCometShuffleWriteProcessor.write$(ShimCometShuffleWriteProcessor.scala:28)
	at org.apache.spark.sql.comet.execution.shuffle.CometShuffleWriteProcessor.write(CometShuffleExchangeExec.scala:452)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

This happens when running TPC-H query 10 with scale factor = 1. The memory allocated for comet is quite small, but it should not prevent the query from finishing.

Steps to reproduce

Run TPC-H query 10 on a Spark cluster. The detailed environment and Spark configurations are listed under Additional context.

Expected behavior

All TPC-H queries should finish successfully.

Additional context

The problem was reproduced on a self-managed Kubernetes (K8S) Spark cluster on AWS.

  • Driver/executor instance type: r7i.2xlarge (8 vCPUs, 64GB memory)
  • Executor pod resource limit: 6 vCPUs, 48GB memory (some node resources are intentionally held back from the pod)
  • Number of executor instances: 48
  • Spark version: 3.4.0
  • Java version: 17
  • Comet version: commit 9205f0d

Here are the relevant Spark configurations:

spark.executor.cores 6
spark.executor.memory 30719m
# Reserve native memory for comet, python and other stuff
spark.executor.memoryOverheadFactor 0.6
# Each executor core gets 1.2 GB memory for comet, so all 6 cores will use 7.2 GB in total.
# I know this is too small for comet, but it should not prevent the query from finishing
spark.comet.memory.overhead.factor 0.04

spark.sql.extensions org.apache.comet.CometSparkSessionExtensions
spark.comet.enabled true
spark.comet.exec.enabled true
spark.comet.exec.all.enabled true
spark.comet.exec.shuffle.enabled true
spark.comet.exec.shuffle.mode auto
spark.shuffle.manager org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager
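For reference, here is the memory-budget arithmetic these settings imply, written out as a small sketch. It only replays the numbers quoted in the configuration comments above (assuming, as those comments do, that each core's Comet pool is roughly spark.executor.memory × spark.comet.memory.overhead.factor); it is not Comet's actual sizing code.

```rust
// Sanity check of the memory budget described in the configuration comments above.
// This only replays the issue author's arithmetic; the exact formula Comet uses
// internally may differ.
fn main() {
    let executor_memory_mb = 30719.0; // spark.executor.memory
    let comet_overhead_factor = 0.04; // spark.comet.memory.overhead.factor
    let cores_per_executor = 6.0;     // spark.executor.cores

    let per_core_mb = executor_memory_mb * comet_overhead_factor;
    let per_executor_gb = per_core_mb * cores_per_executor / 1024.0;

    println!("Comet memory per core:     ~{per_core_mb:.0} MB");     // ~1229 MB (~1.2 GB)
    println!("Comet memory per executor: ~{per_executor_gb:.1} GB"); // ~7.2 GB
}
```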
Kontinuation added the bug label on Aug 29, 2024
@Kontinuation (Member, Author) commented Aug 29, 2024

I found that the memory reserved by the native shuffle writer is based on an estimate of the data size and is proportional to the number of partitions. For TPC-H query 10, the schema of the shuffled record batches is:

| Field | Type              | Slot size (bytes) |
|-------|-------------------|-------------------|
| col0  | Int64             | 8 * len           |
| col1  | Utf8              | 104 * len         |
| col2  | Decimal128(12, 2) | 16 * len          |
| col3  | Utf8              | 104 * len         |
| col4  | Utf8              | 104 * len         |
| col5  | Utf8              | 104 * len         |
| col6  | Utf8              | 104 * len         |
| col7  | Decimal128(36, 4) | 16 * len          |
| col8  | Boolean           | len / 8           |

The estimated size of each record batch is 4588544 bytes given a batch size of 8192. The shuffle repartitioner reserves one batch per partition, so with 200 partitions (the default value of spark.sql.shuffle.partitions) the total amount of reserved memory is 4588544 * 200 = 917708800 bytes (nearly 900 MB).

We have multiple cores on each executor, and each core runs its own shuffle writer and reserves its own memory, so on a 6-core worker instance the reserved memory adds up to about 5 GB. If we tune the number of shuffle partitions up for a large ETL job, the native shuffle writer has to reserve even more: with 2000 partitions, the total reservation is about 50 GB per executor. This is usually more than the total memory of the worker instance, so even if we allocate most of the worker instance's memory to comet, we will still run into this problem.
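To make the arithmetic concrete, here is a small sketch that reproduces the numbers above from the slot sizes in the table (illustration only, not Comet's actual estimation code):

```rust
// Reproduce the per-batch size estimate and the resulting reservations.
// Slot sizes per column are taken from the table above; `len` is the batch size.
fn main() {
    let len: u64 = 8192; // record batch size

    let batch_estimate: u64 = 8 * len // col0: Int64
        + 104 * len * 5               // col1, col3..col6: Utf8
        + 16 * len * 2                // col2, col7: Decimal128
        + len / 8;                    // col8: Boolean
    assert_eq!(batch_estimate, 4_588_544);

    // One batch is reserved per shuffle partition, for each shuffle writer (one per core).
    let per_writer = batch_estimate * 200;
    assert_eq!(per_writer, 917_708_800); // ~900 MB per core with 200 partitions

    let cores: u64 = 6;
    let per_executor_200 = per_writer * cores;             // 5_506_252_800 bytes (~5 GiB)
    let per_executor_2000 = batch_estimate * 2000 * cores; // 55_062_528_000 bytes (~51 GiB)
    println!("200 partitions:  {per_executor_200} bytes per executor");
    println!("2000 partitions: {per_executor_2000} bytes per executor");
}
```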

Maybe a better approach is to reserve comet execution memory * shuffle fraction up front, and check whether that reservation has been exceeded each time we insert a record batch. We can estimate the memory actually used with batch.get_array_memory_size(), and trigger a spill once we have ingested too many batches. This would make the native shuffle writer adapt to the actual size of the record batches and avoid over-reservation.
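A minimal sketch of what that could look like, assuming a hypothetical AdaptiveRepartitioner with an insert_batch path and a fixed up-front reservation (names and structure are illustrative only, not Comet's actual implementation):

```rust
use arrow::record_batch::RecordBatch;

/// Illustrative sketch only: buffer batches per partition and account for the
/// memory they really use, instead of reserving a worst-case batch per partition.
struct AdaptiveRepartitioner {
    /// Fixed budget reserved up front, e.g. comet execution memory * shuffle fraction.
    reservation_bytes: usize,
    /// Memory currently held by buffered (not yet spilled) batches.
    buffered_bytes: usize,
    /// Buffered batches, one bucket per output partition.
    buffered: Vec<Vec<RecordBatch>>,
}

impl AdaptiveRepartitioner {
    fn insert_batch(&mut self, partition: usize, batch: RecordBatch) {
        // Account for the actual memory of this batch rather than a schema-based guess.
        self.buffered_bytes += batch.get_array_memory_size();
        self.buffered[partition].push(batch);

        // Once the up-front reservation is exhausted, spill buffered data to disk.
        if self.buffered_bytes > self.reservation_bytes {
            self.spill();
        }
    }

    fn spill(&mut self) {
        // Writing the buffered batches to spill files is omitted here; after the
        // spill, the accounted memory is released.
        for bucket in self.buffered.iter_mut() {
            bucket.clear();
        }
        self.buffered_bytes = 0;
    }
}
```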

andygrove added this to the 0.3.0 milestone on Sep 16, 2024
andygrove modified the milestones: 0.3.0, 0.4.0 on Sep 24, 2024
viirya self-assigned this on Sep 27, 2024
@viirya (Member) commented Sep 27, 2024

I'm working on this.

@Kontinuation (Member, Author) commented:

> I'm working on this.

That's great to hear! A few days ago, I tried to tackle this issue myself. You can see my approach here. However, I suspect my solution might not be the most efficient, as it involved changing the initial size of array builders, which could incur the cost of reallocations. I'm really looking forward to seeing a better solution!
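For context, "changing the initial size of array builders" refers to something like the following in arrow-rs (a minimal illustration of the trade-off, not the actual patch): a smaller initial capacity reduces the up-front reservation, but the builder may have to reallocate as values are appended.

```rust
use arrow::array::StringBuilder;

fn main() {
    // Worst-case sizing: pre-allocate room for a full 8192-row batch with
    // ~100 bytes per string value (large up-front reservation, no reallocation).
    let _worst_case = StringBuilder::with_capacity(8192, 8192 * 100);

    // Smaller initial sizing: much less memory reserved up front, but the value
    // buffer has to grow (reallocate) if the strings turn out to be large.
    let mut small = StringBuilder::with_capacity(8192, 8192 * 8);
    small.append_value("example");
    let _array = small.finish();
}
```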
