Spark on AWS Glue: Performance Tuning 4 (Spark Join)

tmyoda

Tomoya Oda

Posted on July 16, 2023


This is a continuation of my previous posts:

  1. Spark on AWS Glue: Performance Tuning 1 (CSV vs Parquet)

  2. Spark on AWS Glue: Performance Tuning 2 (Glue DynamicFrame vs Spark DataFrame)

  3. Spark on AWS Glue: Performance Tuning 3 (Impact of Partition Quantity)

  4. Spark on AWS Glue: Performance Tuning 4 (Spark Join)

  5. Spark on AWS Glue: Performance Tuning 5 (Using Cache)

Spark Join

Apache Spark has a join strategy called broadcast join, which avoids shuffle processing. It is effective when one table is small and the other is large: Spark distributes (broadcasts) the small table to all worker nodes, allowing each node to perform the join locally. This experiment tests how effective a broadcast join is at speeding up a join between a small DataFrame (small enough to fit in each worker node's memory) and a large DataFrame.

https://sparkbyexamples.com/spark/broadcast-join-in-spark/?expand_article=1

With a broadcast join, Spark broadcasts the smaller DataFrame to all executors, and each executor keeps it in memory. The larger DataFrame is split and distributed across all executors, so Spark can perform the join without shuffling any data from the larger DataFrame, since the data required for the join is colocated on every executor.
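For reference, a broadcast can also be requested explicitly with the broadcast() function from pyspark.sql.functions. A minimal sketch, assuming two hypothetical DataFrames small_df and large_df joined on a key column:

from pyspark.sql.functions import broadcast

# Hypothetical example: small_df is a lookup table, large_df the fact table.
# Wrapping small_df in broadcast() asks Spark to ship a copy to every executor,
# so the join runs locally without shuffling large_df.
result = large_df.join(broadcast(small_df), on='key', how='left')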

Broadcast Join

import pyspark.sql.functions as F

# Small lookup DataFrame: one row per distinct port, plus a random value in [5, 10]
join_df = (part_df.select(part_df['request_port'])
           .distinct()
           .withColumn("random", F.round(F.rand() * (10 - 5) + 5, 0)))

# `timer` is the timing context manager from the earlier posts in this series;
# count() forces each join to actually execute
with timer('broadcast join dataframe'):
    broadcast_df = part_df.join(join_df.hint('BROADCAST'),
                                part_df.request_port == join_df.request_port, how='left')
    broadcast_df.count()

with timer('sortmerge join dataframe'):
    merge_df = part_df.join(join_df.hint('MERGE'),
                            part_df.request_port == join_df.request_port, how='left')
    merge_df.count()

with timer('shuffle hash join dataframe'):
    shuffle_df = part_df.join(join_df.hint('SHUFFLE_HASH'),
                              part_df.request_port == join_df.request_port, how='left')
    shuffle_df.count()

I used join hints to suggest the join strategy to Spark. You can find more about join hints in the Spark documentation here.

https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html
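The same hints can also be written inline in Spark SQL. A minimal sketch, assuming the two DataFrames are registered as the hypothetical temp views large and small:

# Comment-style hint inside Spark SQL; BROADCASTJOIN and MAPJOIN are aliases of BROADCAST
spark.sql("""
    SELECT /*+ BROADCAST(small) */ *
    FROM large
    LEFT JOIN small
      ON large.request_port = small.request_port
""")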

Although not shown here, I inspected the physical plan using explain() to verify that each hint took effect.
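For example, explain() prints the physical plan, where the chosen strategy appears as a BroadcastHashJoin, SortMergeJoin, or ShuffledHashJoin node:

# Each physical plan should contain a join node matching its hint
broadcast_df.explain()  # expect BroadcastHashJoin
merge_df.explain()      # expect SortMergeJoin
shuffle_df.explain()    # expect ShuffledHashJoin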

[shuffle hash join dataframe] done in 23.2022 s
[broadcast join dataframe] done in 11.7729 s
[sortmerge join dataframe] done in 38.4018 s

The broadcast join is the fastest!

Summary

When joining a small DataFrame with a large DataFrame, the broadcast join is the fastest strategy because it avoids shuffling the large side.
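Note that Spark can also apply a broadcast join automatically when it estimates the smaller side to be below spark.sql.autoBroadcastJoinThreshold (10 MB by default). A minimal sketch of tuning it:

# Raise the automatic broadcast threshold to 50 MB (value in bytes);
# setting it to -1 disables automatic broadcast joins entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)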
