Spark tip: Disable Coalescing Post Shuffle Partitions for compute-intensive tasks
Artem Plotnikov
Posted on August 26, 2022
TL;DR
Set spark.sql.adaptive.coalescePartitions.enabled to false when performing compute-intensive operations inside User Defined Aggregate Functions (UDAFs) on small inputs.
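If you'd rather bake the setting in than toggle it at runtime, it can also be set when the session is created. A minimal sketch (the app name is illustrative):
from pyspark.sql import SparkSession

# Disable post-shuffle partition coalescing at session build time.
spark = SparkSession.builder \
    .appName("udaf-heavy-job") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "false") \
    .getOrCreate()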
In detail
Apache Spark has a nice feature called Adaptive Query Execution (AQE), which performs optimizations based on runtime statistics and has been enabled by default since 3.2.0. One such optimization is Coalescing Post Shuffle Partitions, which dynamically tunes the number of shuffle partitions. It improves query performance most of the time, but may have the opposite effect on long-running, compute-intensive tasks. Take a look at the example below.
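You can check how these features are configured in your own session before experimenting:
# Both settings default to "true" as of Spark 3.2.0.
print(spark.conf.get("spark.sql.adaptive.enabled"))
print(spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled"))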
Test data:
from pyspark.sql.types import StringType, DateType
import pyspark.sql.functions as f
from datetime import date, timedelta

# 10 cities x 365 days with a random "orders" column
cities = ['Amsterdam', 'Oslo', 'Warsaw', 'Copenhagen', 'Prague', 'Helsinki', 'Paris', 'Berlin', 'Dublin', 'Reykjavik']
cities_df = spark.createDataFrame(cities, StringType()).toDF('city')
dates = [date(2021, 1, 1) + timedelta(days=x) for x in range(0, 365)]
dates_df = spark.createDataFrame(dates, DateType()).toDF('day')
df = cities_df.crossJoin(dates_df).withColumn('orders', f.floor(100 * f.rand(seed=1234)))
df.show()
df.show()
+---------+----------+------+
| city| day|orders|
+---------+----------+------+
|Amsterdam|2021-01-01| 71|
|Amsterdam|2021-01-02| 83|
|Amsterdam|2021-01-03| 20|
|Amsterdam|2021-01-04| 23|
|Amsterdam|2021-01-05| 89|
|Amsterdam|2021-01-06| 42|
|Amsterdam|2021-01-07| 50|
|Amsterdam|2021-01-08| 47|
|Amsterdam|2021-01-09| 57|
|Amsterdam|2021-01-10| 65|
|Amsterdam|2021-01-11| 59|
|Amsterdam|2021-01-12| 45|
|Amsterdam|2021-01-13| 52|
|Amsterdam|2021-01-14| 62|
|Amsterdam|2021-01-15| 14|
|Amsterdam|2021-01-16| 66|
|Amsterdam|2021-01-17| 18|
|Amsterdam|2021-01-18| 51|
|Amsterdam|2021-01-19| 59|
|Amsterdam|2021-01-20| 53|
+---------+----------+------+
only showing top 20 rows
df.count() # shows 3650 (10 cities * 365 days)
Now we fake model training for the time series data by doing some deliberately wasteful compute-intensive work inside a UDAF.
from pyspark.sql.types import StructType, StructField, StringType, DateType, LongType
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import random

# Output schema: the input columns plus the fake "fitted" orders
schema = StructType([
    StructField("city", StringType()),
    StructField("day", DateType()),
    StructField("orders", LongType()),
    StructField("ml_orders", LongType()),
])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def compute_intensive(df):
    fake_fitted_orders = []
    for y in df['orders'].to_list():
        yhat = y
        # Burn CPU with a pointless random walk per row
        for i in range(0, 10000):
            yhat += random.randint(-1, 1)
        fake_fitted_orders.append(yhat)
    df = df.assign(ml_orders=pd.Series(fake_fitted_orders))
    return df
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
result_df = df.groupBy("city").apply(compute_intensive)
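Side note: on Spark 3.x the same grouped-map pattern can also be written with the newer applyInPandas API, where the function is plain Python and the schema is passed at the call site. A minimal sketch (fake_fit and result_df2 are illustrative names; it reuses the schema defined above):
# Equivalent grouped-map call using applyInPandas (Spark 3.0+).
def fake_fit(pdf):
    pdf['ml_orders'] = [
        y + sum(random.randint(-1, 1) for _ in range(10000))
        for y in pdf['orders']
    ]
    return pdf

result_df2 = df.groupBy("city").applyInPandas(fake_fit, schema=schema)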
result_df will have spark.sql.shuffle.partitions partitions:
assert(result_df.rdd.getNumPartitions() == int(spark.conf.get("spark.sql.shuffle.partitions")))
In my case there are 250 partitions by default, but only 10 of them are non-empty since we have so little data:
num_non_empty_partitions_df = result_df \
    .withColumn('partition_id', f.spark_partition_id()) \
    .groupby('partition_id') \
    .count()
num_non_empty_partitions_df.show()
+------------+-----+
|partition_id|count|
+------------+-----+
| 145| 365|
| 25| 365|
| 108| 365|
| 92| 365|
| 8| 365|
| 40| 365|
| 84| 365|
| 235| 365|
| 152| 365|
| 34| 365|
+------------+-----+
The stage that corresponds to the groupby operation has 250 tasks, but only 10 of them did any actual work.
The busy tasks were all placed on the same executor (executor 25), and 8 of the 10 started at the same time; that number matches the executor's CPU core count.
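If you want to sanity-check that number in your own environment, the scheduler's default parallelism is a rough proxy for the total cores available:
# Total number of cores Spark schedules across by default.
print(spark.sparkContext.defaultParallelism)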
Now let's enable AQE coalescePartitions and see what happens.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Do the same computation again
An AQEShuffleRead operator will be added to the execution plan, which dynamically coalesces shuffle partitions.
All operations are now performed inside just one task.
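You can see this for yourself by materializing the result and then printing the plan; once AQE finalizes it, the AQEShuffleRead node shows up in the explain output:
# Materialize the exact plan so AQE can finalize it, then inspect it.
result_df.collect()
result_df.explain(mode="formatted")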
Let's measure the overall performance degradation.
import time

def with_execution_time(name, fn):
    st = time.time()
    fn()
    et = time.time()
    elapsed_time = et - st
    print("Execution time for '{}' {} seconds".format(name, elapsed_time))
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
with_execution_time('coalescePartitions disabled', lambda: result_df.count())
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
with_execution_time('coalescePartitions enabled', lambda: result_df.count())
Execution time for 'coalescePartitions disabled' 11.219334125518799 seconds
Execution time for 'coalescePartitions enabled' 30.78111720085144 seconds
The query now runs almost 3 times slower on our synthetic data; in the real world we observed a 10x degradation in one distributed training pipeline soon after upgrading to Spark 3.2.0.
If disabling Coalescing Post Shuffle Partitions altogether is undesirable, Spark lets you fine-tune its behavior with the spark.sql.adaptive.coalescePartitions.parallelismFirst, spark.sql.adaptive.coalescePartitions.minPartitionSize and spark.sql.adaptive.advisoryPartitionSizeInBytes options.
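For example, you could keep coalescing enabled but bound how aggressively partitions merge; the values below are purely illustrative, not recommendations:
# Prefer parallelism over the advisory size, and cap how small
# coalesced partitions may get. Tune these to your data volume.
spark.conf.set("spark.sql.adaptive.coalescePartitions.parallelismFirst", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "1MB")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")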
Hopefully, this article gave you not only a useful Spark tip, but also a framework for debugging similar issues in your apps.