This is a continuation of my series on tuning Spark joins. In this article, I would like to demonstrate every Spark data engineer’s nightmare, shuffling, along with some tuning tips. Then I will cover Spark’s underrated AQE (Adaptive Query Execution) and how it helps in tuning Spark joins. I always prefer to explain things with examples and code snippets, which usually teach more than theory alone. Let’s jump in …
Multi-row operations such as joins, grouping, and aggregations require the nodes in a Spark cluster to exchange data so that all rows with the same key end up on the same node.
Shuffling is a very costly operation because it requires the nodes to exchange data over the network. But there is no way around it when performing grouped aggregations.
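To make the idea concrete, here is a toy model of a shuffle in plain Python. It is only a sketch: the two "nodes", the rows, and the function names are made up for illustration, and Spark's real partitioner uses its own hash function rather than Python's `hash`.

```python
from collections import defaultdict


def shuffle_by_key(node_rows, num_partitions):
    """Route every (key, value) row to partition hash(key) % num_partitions."""
    partitions = defaultdict(list)
    for rows in node_rows:  # rows currently held by one node
        for key, value in rows:
            partitions[hash(key) % num_partitions].append((key, value))
    return dict(partitions)


def sum_by_key(rows):
    """Aggregate one partition on its own, with no further data exchange."""
    totals = defaultdict(int)
    for key, value in rows:
        totals[key] += value
    return dict(totals)


# Hypothetical sales rows as (product_id, amount), spread over two nodes.
node_a = [(1, 10), (2, 5), (3, 7)]
node_b = [(1, 20), (3, 1), (4, 9)]

shuffled = shuffle_by_key([node_a, node_b], num_partitions=2)
per_partition_totals = {p: sum_by_key(rows) for p, rows in shuffled.items()}
```

After the shuffle, every product id lives in exactly one partition, so each partition can be aggregated independently. The expensive part is moving the rows, which in a real cluster means network traffic.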
By default, Spark uses 200 shuffle partitions (spark.sql.shuffle.partitions). This must (really must) be changed when dealing with large data sets.
Let’s take an example:
Sales data: 20M
Products data: 75M
Allocated executors: 5 [local]
Verdict: Changing the default number of shuffle partitions (200), either increasing or reducing it based on the Spark configuration and data size, can significantly improve join performance.
Join with default shuffle partitions (200): 22 seconds
Join with changed shuffle partitions (25): 15 seconds
As you can see, having 25 reasonably sized shuffle partitions rather than 200 small ones made a difference in join performance. On a big Spark cluster, the right number of shuffle partitions can bring a significant improvement.
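For reference, this is roughly how the setting can be changed. The sketch below assumes you already have a SparkSession called `spark`; the helper names and the `SHUFFLE_SETTINGS` dictionary are mine, only the `spark.sql.shuffle.partitions` key and the `--conf key=value` form of spark-submit come from Spark itself.

```python
# The value 25 is the one used in the run above; tune it for your own data.
SHUFFLE_SETTINGS = {"spark.sql.shuffle.partitions": "25"}


def apply_settings(spark, settings):
    """Apply settings to an existing SparkSession (assumed to be passed in)."""
    for key, value in settings.items():
        spark.conf.set(key, value)


def as_submit_args(settings):
    """Render the same settings as spark-submit --conf flags."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


submit_args = as_submit_args(SHUFFLE_SETTINGS)
```

Calling `apply_settings(spark, SHUFFLE_SETTINGS)` before the join changes the value for the current session, while `submit_args` is handy if you prefer to pass it on the command line.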
How many shuffle partitions?
This is a tricky and complex question. It depends on Spark’s physical resources, its configuration, and the size of the data you are dealing with. It’s not easy to find the right number. You just have to try different values and measure before drawing any conclusion.
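If you need a starting point for those experiments, a small helper like the one below can give a first guess. Treat it as a rough sketch under my own assumptions: the 128 MiB target partition size and the rounding up to a multiple of the available cores are common rules of thumb, not numbers measured in this article, and the function name is hypothetical.

```python
import math


def estimate_shuffle_partitions(shuffle_bytes, total_cores,
                                target_partition_bytes=128 * 1024**2):
    """First guess for spark.sql.shuffle.partitions.

    Assumption: aim for partitions of about target_partition_bytes and
    round up to a multiple of total_cores so every core stays busy.
    """
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    waves = math.ceil(max(by_size, 1) / total_cores)
    return waves * total_cores


# Hypothetical inputs: 10 GiB of shuffle data on 16 cores, 1 GiB on 5 cores.
guess_large = estimate_shuffle_partitions(10 * 1024**3, total_cores=16)
guess_small = estimate_shuffle_partitions(1 * 1024**3, total_cores=5)
```

Use the result only as the first value to try, then measure the join with a few neighbouring values as described above.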
The guidelines given by Databricks
SPARK AQE (Adaptive Query Execution)
Over the years, there’s been an extensive and continuous effort to improve Spark SQL’s query optimizer and planner in order to generate high-quality query execution plans.
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that makes use of the runtime statistics to choose the most efficient query execution plan.
In Spark 3.0, the AQE framework is shipped with three features:
- Dynamically coalescing shuffle partitions
- Dynamically switching join strategies
- Dynamically optimizing skew joins
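In Spark 3.0, AQE is switched off by default, so it has to be enabled before any of these features kick in. A minimal sketch, assuming an existing SparkSession named `spark`; the `AQE_SETTINGS` dictionary and the `enable_aqe` helper are names I made up, while the configuration keys are Spark's own.

```python
AQE_SETTINGS = {
    # Master switch for Adaptive Query Execution.
    "spark.sql.adaptive.enabled": "true",
    # Dynamically coalescing shuffle partitions.
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    # Dynamically optimizing skew joins.
    "spark.sql.adaptive.skewJoin.enabled": "true",
}


def enable_aqe(spark):
    """Turn on AQE and its sub-features on an existing SparkSession."""
    for key, value in AQE_SETTINGS.items():
        spark.conf.set(key, value)
```

Dynamically switching join strategies has no separate switch in this sketch: it comes with the master setting and relies on the broadcast join threshold.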
With the data set I currently have, I am going to showcase dynamically switching join strategies.
Dynamically Switching Join:
AQE converts a sort-merge join to a broadcast hash join when the runtime statistics show that either join side is smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it’s better than continuing with the sort-merge join, as we can save the sorting of both join sides.
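The decision itself can be pictured with a few lines of plain Python. This is a simplified model, not Spark's planner code: the function name and the byte sizes are hypothetical, and only the 10 MB default of `spark.sql.autoBroadcastJoinThreshold` (and the fact that -1 disables broadcasting) comes from Spark.

```python
# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024


def choose_join_strategy(left_bytes, right_bytes,
                         threshold=DEFAULT_BROADCAST_THRESHOLD):
    """Pick a join strategy from the (estimated or measured) side sizes."""
    # A negative threshold means broadcasting is disabled.
    if threshold >= 0 and min(left_bytes, right_bytes) <= threshold:
        return "broadcast_hash_join"
    return "sort_merge_join"


# Static planning: the filter's selectivity is unknown, so the estimate is big.
static_plan = choose_join_strategy(500 * 1024**2, 75 * 1024**2)
# Runtime: after the filter, one side turns out to be tiny.
runtime_plan = choose_join_strategy(500 * 1024**2, 64)
```

The same rule gives two different answers because the runtime sizes are the real ones, and that is exactly the gap AQE closes by re-planning after the shuffle stage has produced statistics.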
-- Get total sales count grouped by sales order for the product with product id 59354721.
-- The selectivity of the filter by product id is not known in static planning, so the initial plan opts for sort merge join.
-- But in fact, the "products" table after filtering is very small (just 1 row), so the query can do a broadcast hash join instead.
-- Static explain shows the initial plan with sort merge join.
-- Without AQE enabled, Spark does a sort merge join.
-- With AQE enabled, Spark switches to a broadcast join at run time.
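A query along these lines could look like the sketch below. Please read it as an illustration only: the "sales" table and every column name are assumptions on my side (only the "products" table and the product id appear in the notes above), and `run_with_aqe` is a hypothetical helper that expects an existing SparkSession with both tables registered.

```python
PRODUCT_ID = 59354721

# Table "sales" and all column names are assumed for illustration.
QUERY = f"""
SELECT s.sales_order_id, COUNT(*) AS total_sales
FROM sales s
JOIN products p ON s.product_id = p.product_id
WHERE p.product_id = {PRODUCT_ID}
GROUP BY s.sales_order_id
"""


def run_with_aqe(spark, enabled):
    """Run the query with AQE on or off and print the plan before and after."""
    spark.conf.set("spark.sql.adaptive.enabled", str(enabled).lower())
    df = spark.sql(QUERY)
    df.explain()         # plan before execution (the static plan)
    rows = df.collect()  # triggers execution
    df.explain()         # with AQE on, this should show the adapted plan
    return rows
```

Running `run_with_aqe(spark, False)` and then `run_with_aqe(spark, True)` and comparing the printed plans is one way to see the join type change.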
Without AQE: 21 seconds, join type is sort merge
With AQE enabled: 8 seconds, join type is broadcast
That’s all for now. I hope you liked it. Please let me know your feedback.
Thank you!