Avoid These Top 10 Mistakes When Using Apache Spark
Pradip Sodha
Posted on August 28, 2024
We all know how easy it is to overlook small parts of our code, especially when we have powerful tools like Apache Spark to handle the heavy lifting. Spark's core engine is great at optimizing our messy, complex code into a sleek, efficient physical plan. But here's the catch: Spark isn't flawless. It's on a journey to perfection, sure, but it still has its limits. And Spark is upfront about those limitations, listing them out in the documentation (sometimes as little notes).
But let’s be honest—how often do we skip the docs and head straight to Stack Overflow or ChatGPT for quick answers? I've been there too. The thing is, while these shortcuts can be useful, they don't always tell the whole story. So, if you're ready to dive in, let's talk about some common mistakes and how to avoid them. Stay with me; this is going to be a ride!
Table of Content
- Mistake #1: Adding Columns the Wrong Way
- Mistake #2: Order of Narrow and Wide Transformation
- Mistake #3: Overlooking Data Serialization Format
- Mistake #4: Not Using Parallel Listing on Input Paths
- Mistake #5: Ignoring Data Locality
- Mistake #6: Relying on Default Number of Shuffle Partitions
- Mistake #7: Overlooking Broadcast Join Thresholds
- Mistake #8: Relying on default storage level for Cache
- Mistake #9: Misconfiguring Spark Memory Settings
- Mistake #10: Relying Only on Cache and Persist
Mistake #1: Adding Columns the Wrong Way
client: "Hey, can you add 5 columns? Make it quick, okay?"
Developer: "Sure, I'll just use withColumn() in a loop 5 times!"
Client: (Happy) "Great! Now, can you add 10 more columns? Make it quick, and keep the code short!"
Developer: "No problem! I'll loop 15 times now."
Spark: "Sorry I can't optimize"
But wait—according to Spark's documentation...
Don't use withColumn in loop
Solution: SelectExpr or Select
here is complete solution,
def addOrReplaceColumns(newColumns: List[Column], sourceColumns: List[String]): List[Column] = {
val (columnsToBeReplace, newColumns) = newColumns.partition(column => sourceColumns.contains(column.toString()))
val restOfColumns = sourceColumns.diff(columnsToBeReplace.map(column => column.toString())).map(col => col)
(columnsToBeReplace ++ newColumns ++ restOfColumns).toList
}
Mistake #2: Order of Narrow and Wide Transformation
Normally we focus on business logic when developing a data solution and it's common to ignore the order of narrow and wide transformation but things is spark recommended to combine all narrow first and then wide, for example,
if you have
narrow, wide, narrow, narrow, wide, narrow
then try to arrange like,
narrow, narrow, narrow, wide, wide
then spark will optimize your code more accurately for example all narrow transformation will happen as pipeline operation and only one shuffle will required.
Mistake #3: Overlooking Data Serialization Format
By default, Spark uses Java serialization, which is not the most efficient option. Switching to Kryo serialization can lead to better performance, as it is faster and uses less memory. Use the following configuration to enable Kryo:
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
But does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.
Mistake #4: Not Using Parallel Listing on Input Paths
When reading files from storage systems like Amazon S3, Azure Data Lake Storage (ADLS), or even local storage, Spark needs to list and find all matching files in the input directory before starting the next task. This listing process can become a bottleneck, especially when dealing with large directories or a vast number of files. By default, Spark uses only a single thread to list files, which can significantly slow down the start of your job.
To mitigate this, you can increase the number of threads used for listing files by setting the spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads property. This allows Spark to parallelize the file listing process, speeding up the initialization phase of your job.
spark.conf.set("spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads", 10)
Mistake #5: Ignoring Data Locality
Data locality significantly impacts the performance of Spark jobs.
When data and the code processing it are close together, computation is faster, as there is less need to move large chunks of data. Spark scheduling prioritizes data locality to minimize data movement, following levels of locality from best to worst: PROCESS_LOCAL (data and code in the same JVM), NODE_LOCAL (data on the same node), RACK_LOCAL (data on the same rack but different node), and ANY (data elsewhere on the network).
Spark tries to schedule tasks at the highest locality level possible, but this isn't always feasible. If no idle executors have unprocessed data at the desired locality level, Spark can either wait for a busy executor to free up or fall back to a lower locality level by moving data to an idle executor. The time Spark waits before falling back can be adjusted using the spark.locality.wait settings. Adjusting these settings can help improve performance in scenarios with long-running tasks or when data locality is poor.
In case of medium data skew or cluster with ample resources or using .catch() then increasing would benefits rather than going to lower locality.
spark.conf.set("spark.locality.wait", "10s")
Mistake #6: Relying on Default Number of Shuffle Partitions
By default, Spark uses 200 partitions for shuffle operations (e.g., join, groupBy). This number might be too high or too low, depending on your dataset and cluster size.
AQE (enabled by default from 7.3 LTS + onwards) adjusts the shuffle partition number automatically at each stage of the query, based on the size of the map-side shuffle output.
But it's advisable to update shuffle partition before performing a wide transformation, if you need accurate optimization and if you are unsure spark recommended to set shuffle partition value to number of cores in your cluster.
spark.conf.set("spark.sql.shuffle.partitions", "num_core_in_cluster")
And don't forgot to tune spark.default.parallelism
this setting accordingly as well.
Mistake #7: Overlooking Broadcast Join Thresholds
Scenario:
Developer: "I thought small lookup tables would be broadcasted automatically and my each of executors has 32GB of memory! Why are my joins so slow?"
Spark: "Sorry, your lookup table is just above the default threshold."
Broadcast joins can drastically speed up join operations when one of the tables is small enough to fit into memory on each worker node. However, if you don't adjust the broadcast join threshold, Spark might not broadcast tables that could be effectively broadcasted, leading to unnecessary shuffling.
Solution:
Adjust the broadcast join threshold using spark.sql.autoBroadcastJoinThreshold. If your lookup table is slightly larger than the default 10MB limit, increase the threshold.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024) // 50MB
When setting the broadcast join threshold, don't base it only on executor memory. The driver loads the small table into memory first before distributing it to executors. Make sure the threshold is suitable for both driver and executor memory capacities to prevent memory issues and optimize performance.
Mistake #8: Relying on default storage level for Cache
It’s crucial to select the appropriate storage level for caching and persisting data based on the type of executors in your cluster,
Choosing the right storage level based on the executor type and objectives is crucial for optimizing Spark performance and resource utilization. By understanding the trade-offs between speed, memory usage, and fault tolerance, you can tailor your Spark configuration to meet the specific needs of your application.
Executor Type | Primary Objective | Recommended Storage Level | Description | Alternative for Fault Tolerance | Notes |
---|---|---|---|---|---|
Memory-Optimized | Fast access, low memory usage | MEMORY_ONLY_SER |
Stores RDD as serialized objects in memory. Balances speed and memory efficiency. | MEMORY_ONLY_SER_2 |
Use MEMORY_ONLY if serialization overhead is not a concern. |
MEMORY_ONLY |
Stores RDD as deserialized objects in memory. Fastest access, highest memory usage. | MEMORY_ONLY_2 |
Use for small datasets that fit comfortably in memory. | ||
CPU-Optimized | Balanced memory and disk | MEMORY_AND_DISK_SER |
Serialized storage in memory, spills to disk if needed. Good for large datasets. | MEMORY_AND_DISK_SER_2 |
Preferred when memory is limited; avoids out-of-memory errors. |
MEMORY_AND_DISK |
Deserialized storage in memory, spills to disk. Faster access than MEMORY_AND_DISK_SER . |
MEMORY_AND_DISK_2 |
Use when memory can accommodate deserialized objects, with fallback to disk. | ||
General Purpose | Flexibility, moderate size datasets | MEMORY_AND_DISK |
Deserialized in-memory, spills to disk. Good balance for general use cases. | MEMORY_AND_DISK_2 |
Good for mixed workloads; balances speed and fault tolerance. |
MEMORY_ONLY_SER |
Serialized in-memory storage. Optimized for memory efficiency and speed. | MEMORY_ONLY_SER_2 |
Suitable for datasets that fit well in memory after serialization. | ||
Disk-Optimized | Low memory, high fault tolerance | DISK_ONLY |
Stores RDD partitions only on disk. Minimizes memory usage but slowest access. | DISK_ONLY_2 |
Suitable for very large datasets where memory is a constraint. |
MEMORY_AND_DISK_SER |
Serialized storage in memory with spillover to disk. More efficient than deserialized. | MEMORY_AND_DISK_SER_2 |
Balances disk usage and memory efficiency. |
The
_2
options (e.g.,MEMORY_ONLY_2
,MEMORY_AND_DISK_2
) are useful for scenarios where fault tolerance is crucial. They replicate data across two nodes, ensuring data is not lost if a node fails. This is particularly valuable in environments where reliability is prioritized over resource efficiency, such as production systems handling critical data or real-time processing pipelines.The
_SER
option (e.g., MEMORY_AND_DISK_SER) Stores RDD as serialized Java objects (one byte array per partition) in memory. More memory-efficient than MEMORY_ONLY, but slower due to serialization/deserialization overhead.
Mistake #9: Misconfiguring Spark Memory Settings
Scenario:
Developer: "My Spark job keeps failing with out-of-memory errors. I gave it all the memory available!"
Spark: "Memory isn't just for you; I need some for myself, too."
Many users allocate almost all available memory to the executor heap space (spark.executor.memory) without considering Spark's overhead memory, causing frequent out-of-memory errors. Additionally, insufficient memory can lead to excessive garbage collection (GC) pauses, slowing down jobs.
Solution:
Properly configure memory settings by tuning spark.executor.memory and spark.executor.memoryOverhead.
--conf spark.executor.memory=4g --spark.executor.memoryOverhead=512m
Ensure you leave enough memory overhead to accommodate Spark's internal needs (shuffle, RDD storage, etc.). Typically, 10-15% of the total memory should be allocated as overhead.
spark.memory.fraction
expresses the size of M as a fraction of the (JVM heap space - 300MiB) (default 0.6). The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.
Mistake #10: Relying Only on Cache and Persist
Many Spark developers are familiar with the cache() and persist() methods for improving performance, but they often overlook the value of checkpoint(). While cache() and persist() keep data in memory or on disk to speed up processing, they don’t provide fault tolerance in the case of a failure. checkpoint(), on the other hand, saves the RDD to a reliable storage system, allowing for fault recovery and optimizing job execution.
Using checkpoint() not only ensures that your job can recover from failures but also helps Spark optimize the execution of other jobs that share the same lineage. This can lead to improved performance and resource utilization.
spark.sparkContext.setCheckpointDir("path/to/checkpoint/dir")
df.checkpoint()
Posted on August 28, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.