Avoid These Top 10 Mistakes When Using Apache Spark

sudo_pradip

Pradip Sodha

Posted on August 28, 2024

Avoid These Top 10 Mistakes When Using Apache Spark

We all know how easy it is to overlook small parts of our code, especially when we have powerful tools like Apache Spark to handle the heavy lifting. Spark's core engine is great at optimizing our messy, complex code into a sleek, efficient physical plan. But here's the catch: Spark isn't flawless. It's on a journey to perfection, sure, but it still has its limits. And Spark is upfront about those limitations, listing them out in the documentation (sometimes as little notes).

But let’s be honest—how often do we skip the docs and head straight to Stack Overflow or ChatGPT for quick answers? I've been there too. The thing is, while these shortcuts can be useful, they don't always tell the whole story. So, if you're ready to dive in, let's talk about some common mistakes and how to avoid them. Stay with me; this is going to be a ride!


Table of Content


Mistake #1: Adding Columns the Wrong Way

client: "Hey, can you add 5 columns? Make it quick, okay?"

Developer: "Sure, I'll just use withColumn() in a loop 5 times!"

Client: (Happy) "Great! Now, can you add 10 more columns? Make it quick, and keep the code short!"

Developer: "No problem! I'll loop 15 times now."

Spark: "Sorry I can't optimize"

But wait—according to Spark's documentation...

Don't use withColumn in loop

Apache Spark scala doc of withColumn with it's limitation highlighted

Solution: SelectExpr or Select
here is complete solution,

def addOrReplaceColumns(newColumns: List[Column], sourceColumns: List[String]): List[Column] = {
  val (columnsToBeReplace, newColumns) = newColumns.partition(column => sourceColumns.contains(column.toString()))
  val restOfColumns = sourceColumns.diff(columnsToBeReplace.map(column => column.toString())).map(col => col)

  (columnsToBeReplace ++ newColumns ++ restOfColumns).toList
}
Enter fullscreen mode Exit fullscreen mode

Mistake #2: Order of Narrow and Wide Transformation

Normally we focus on business logic when developing a data solution and it's common to ignore the order of narrow and wide transformation but things is spark recommended to combine all narrow first and then wide, for example,
if you have

narrow, wide, narrow, narrow, wide, narrow 
Enter fullscreen mode Exit fullscreen mode

then try to arrange like,

narrow, narrow, narrow, wide, wide
Enter fullscreen mode Exit fullscreen mode

then spark will optimize your code more accurately for example all narrow transformation will happen as pipeline operation and only one shuffle will required.


Mistake #3: Overlooking Data Serialization Format

By default, Spark uses Java serialization, which is not the most efficient option. Switching to Kryo serialization can lead to better performance, as it is faster and uses less memory. Use the following configuration to enable Kryo:

spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Enter fullscreen mode Exit fullscreen mode

But does not support all Serializable types and requires you to register the classes you’ll use in the program in advance for best performance.


Mistake #4: Not Using Parallel Listing on Input Paths

When reading files from storage systems like Amazon S3, Azure Data Lake Storage (ADLS), or even local storage, Spark needs to list and find all matching files in the input directory before starting the next task. This listing process can become a bottleneck, especially when dealing with large directories or a vast number of files. By default, Spark uses only a single thread to list files, which can significantly slow down the start of your job.

To mitigate this, you can increase the number of threads used for listing files by setting the spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads property. This allows Spark to parallelize the file listing process, speeding up the initialization phase of your job.

spark.conf.set("spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads", 10)
Enter fullscreen mode Exit fullscreen mode

Mistake #5: Ignoring Data Locality

Data locality significantly impacts the performance of Spark jobs.

When data and the code processing it are close together, computation is faster, as there is less need to move large chunks of data. Spark scheduling prioritizes data locality to minimize data movement, following levels of locality from best to worst: PROCESS_LOCAL (data and code in the same JVM), NODE_LOCAL (data on the same node), RACK_LOCAL (data on the same rack but different node), and ANY (data elsewhere on the network).

Spark tries to schedule tasks at the highest locality level possible, but this isn't always feasible. If no idle executors have unprocessed data at the desired locality level, Spark can either wait for a busy executor to free up or fall back to a lower locality level by moving data to an idle executor. The time Spark waits before falling back can be adjusted using the spark.locality.wait settings. Adjusting these settings can help improve performance in scenarios with long-running tasks or when data locality is poor.

In case of medium data skew or cluster with ample resources or using .catch() then increasing would benefits rather than going to lower locality.

spark.conf.set("spark.locality.wait", "10s")
Enter fullscreen mode Exit fullscreen mode

Mistake #6: Relying on Default Number of Shuffle Partitions

By default, Spark uses 200 partitions for shuffle operations (e.g., join, groupBy). This number might be too high or too low, depending on your dataset and cluster size.

AQE (enabled by default from 7.3 LTS + onwards) adjusts the shuffle partition number automatically at each stage of the query, based on the size of the map-side shuffle output.

But it's advisable to update shuffle partition before performing a wide transformation, if you need accurate optimization and if you are unsure spark recommended to set shuffle partition value to number of cores in your cluster.

spark.conf.set("spark.sql.shuffle.partitions", "num_core_in_cluster")
Enter fullscreen mode Exit fullscreen mode

And don't forgot to tune spark.default.parallelism this setting accordingly as well.


Mistake #7: Overlooking Broadcast Join Thresholds

Scenario:

Developer: "I thought small lookup tables would be broadcasted automatically and my each of executors has 32GB of memory! Why are my joins so slow?"

Spark: "Sorry, your lookup table is just above the default threshold."

Broadcast joins can drastically speed up join operations when one of the tables is small enough to fit into memory on each worker node. However, if you don't adjust the broadcast join threshold, Spark might not broadcast tables that could be effectively broadcasted, leading to unnecessary shuffling.

Solution:

Adjust the broadcast join threshold using spark.sql.autoBroadcastJoinThreshold. If your lookup table is slightly larger than the default 10MB limit, increase the threshold.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024) // 50MB
Enter fullscreen mode Exit fullscreen mode

When setting the broadcast join threshold, don't base it only on executor memory. The driver loads the small table into memory first before distributing it to executors. Make sure the threshold is suitable for both driver and executor memory capacities to prevent memory issues and optimize performance.


Mistake #8: Relying on default storage level for Cache

It’s crucial to select the appropriate storage level for caching and persisting data based on the type of executors in your cluster,

Choosing the right storage level based on the executor type and objectives is crucial for optimizing Spark performance and resource utilization. By understanding the trade-offs between speed, memory usage, and fault tolerance, you can tailor your Spark configuration to meet the specific needs of your application.

Executor Type Primary Objective Recommended Storage Level Description Alternative for Fault Tolerance Notes
Memory-Optimized Fast access, low memory usage MEMORY_ONLY_SER Stores RDD as serialized objects in memory. Balances speed and memory efficiency. MEMORY_ONLY_SER_2 Use MEMORY_ONLY if serialization overhead is not a concern.
MEMORY_ONLY Stores RDD as deserialized objects in memory. Fastest access, highest memory usage. MEMORY_ONLY_2 Use for small datasets that fit comfortably in memory.
CPU-Optimized Balanced memory and disk MEMORY_AND_DISK_SER Serialized storage in memory, spills to disk if needed. Good for large datasets. MEMORY_AND_DISK_SER_2 Preferred when memory is limited; avoids out-of-memory errors.
MEMORY_AND_DISK Deserialized storage in memory, spills to disk. Faster access than MEMORY_AND_DISK_SER. MEMORY_AND_DISK_2 Use when memory can accommodate deserialized objects, with fallback to disk.
General Purpose Flexibility, moderate size datasets MEMORY_AND_DISK Deserialized in-memory, spills to disk. Good balance for general use cases. MEMORY_AND_DISK_2 Good for mixed workloads; balances speed and fault tolerance.
MEMORY_ONLY_SER Serialized in-memory storage. Optimized for memory efficiency and speed. MEMORY_ONLY_SER_2 Suitable for datasets that fit well in memory after serialization.
Disk-Optimized Low memory, high fault tolerance DISK_ONLY Stores RDD partitions only on disk. Minimizes memory usage but slowest access. DISK_ONLY_2 Suitable for very large datasets where memory is a constraint.
MEMORY_AND_DISK_SER Serialized storage in memory with spillover to disk. More efficient than deserialized. MEMORY_AND_DISK_SER_2 Balances disk usage and memory efficiency.
  • The _2 options (e.g., MEMORY_ONLY_2, MEMORY_AND_DISK_2) are useful for scenarios where fault tolerance is crucial. They replicate data across two nodes, ensuring data is not lost if a node fails. This is particularly valuable in environments where reliability is prioritized over resource efficiency, such as production systems handling critical data or real-time processing pipelines.

  • The _SER option (e.g., MEMORY_AND_DISK_SER) Stores RDD as serialized Java objects (one byte array per partition) in memory. More memory-efficient than MEMORY_ONLY, but slower due to serialization/deserialization overhead.


Mistake #9: Misconfiguring Spark Memory Settings

Scenario:

Developer: "My Spark job keeps failing with out-of-memory errors. I gave it all the memory available!"

Spark: "Memory isn't just for you; I need some for myself, too."

Many users allocate almost all available memory to the executor heap space (spark.executor.memory) without considering Spark's overhead memory, causing frequent out-of-memory errors. Additionally, insufficient memory can lead to excessive garbage collection (GC) pauses, slowing down jobs.

Solution:

Properly configure memory settings by tuning spark.executor.memory and spark.executor.memoryOverhead.

--conf spark.executor.memory=4g --spark.executor.memoryOverhead=512m
Enter fullscreen mode Exit fullscreen mode

Ensure you leave enough memory overhead to accommodate Spark's internal needs (shuffle, RDD storage, etc.). Typically, 10-15% of the total memory should be allocated as overhead.

spark.memory.fraction expresses the size of M as a fraction of the (JVM heap space - 300MiB) (default 0.6). The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.


Mistake #10: Relying Only on Cache and Persist

Many Spark developers are familiar with the cache() and persist() methods for improving performance, but they often overlook the value of checkpoint(). While cache() and persist() keep data in memory or on disk to speed up processing, they don’t provide fault tolerance in the case of a failure. checkpoint(), on the other hand, saves the RDD to a reliable storage system, allowing for fault recovery and optimizing job execution.

Using checkpoint() not only ensures that your job can recover from failures but also helps Spark optimize the execution of other jobs that share the same lineage. This can lead to improved performance and resource utilization.

spark.sparkContext.setCheckpointDir("path/to/checkpoint/dir")
df.checkpoint()
Enter fullscreen mode Exit fullscreen mode
💖 💪 🙅 🚩
sudo_pradip
Pradip Sodha

Posted on August 28, 2024

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related