Apache Spark 101
Rubens Barbosa
Posted on May 25, 2024
To understand Spark, let's recall the scenario before its creation. For decades, computers became faster every year through increases in processor speed. That trend in hardware stopped around 2005 due to hard limits on heat dissipation, so hardware engineers stopped making individual processors faster and started adding parallel CPU cores all running at the same speed. As a result of this change, applications had to be modified to add parallelism in order to run faster.
Google wanted to run giant computations on high volumes of data across large clusters, because it was indexing all the content of the web in order to identify the most important pages. So it designed MapReduce, a parallel data processing framework, which enabled Google to index the web.
At that time, Hadoop MapReduce was the dominant parallel programming engine for clusters of thousands of nodes. So why was Spark created? The MapReduce engine made it challenging and inefficient to build large applications that chained multiple MapReduce jobs together, because each job had to read its input from disk and write its output back to disk.
To address this issue, the Spark team first designed an API based on functional programming that could express multistep applications. The team then implemented this API over a new engine that could perform efficient, in-memory data sharing across computation steps.
What is Apache Spark?
Apache Spark is an open-source unified computing engine and a set of libraries for parallel data processing on computer clusters.
Spark is a fast engine for large-scale data processing. The basic idea is that we write code describing how we want to transform a huge amount of data, and Spark figures out how to distribute that work across an entire cluster of computers, i.e., the driver sends tasks to workers, which process them in parallel. In other words, Apache Spark takes a massive data set and distributes the processing across a set of machines that work together in parallel.
In a nutshell Spark can execute tasks on data across a cluster of computers.
NOTE: Spark itself is written in Scala and runs on the Java Virtual Machine (JVM). Therefore, to run Spark either on your laptop or on a cluster, you need a Java installation.
Architecture
Spark application architecture at a high level
Spark's architecture consists of a driver process, executors, a cluster manager, and worker nodes. Apache Spark follows a master-worker architecture: it has a single master and any number of workers.
Under the hood there are some key components: the Driver Program, the Cluster Manager, Tasks, Partitions, Executors, and Worker Nodes.
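As a rough sketch of how these pieces fit together, the snippet below configures a SparkSession for a cluster. The master URL spark://spark-master:7077 and the executor settings are hypothetical placeholders; the actual values depend on your cluster manager and deployment.
import org.apache.spark.sql.SparkSession
// The driver program creates the SparkSession and asks the cluster manager for resources;
// the cluster manager then launches executors on the worker nodes.
val spark: SparkSession = SparkSession
  .builder()
  .master("spark://spark-master:7077")   // hypothetical standalone cluster manager URL
  .appName("architecture-sketch")
  .config("spark.executor.memory", "2g") // hypothetical memory per executor
  .config("spark.executor.cores", "2")   // hypothetical cores (parallel tasks) per executor
  .getOrCreate()
// Each job is split into tasks; one task processes one partition of the data on an executor.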
Spark APIs
When working with Spark, we will come across different APIs:
- RDD (Resilient Distributed Datasets) API
- DataFrame API
- Dataset API
- SQL API
- Structured Streaming API
RDDs
- RDDs are distributed collections of objects that can be processed in parallel;
- RDDs support two types of operations: transformations (which produce a new RDD) and actions (which return a value to the driver program after running a computation on the dataset);
- RDDs provide low-level control over data flow and data processing operations;
- RDDs are fault tolerant, automatically recovering data lost due to node failures by using lineage information (data lineage is the process of tracking the flow of data over time);
- RDDs don't infer the schema of the data; we need to specify it ourselves.
RDD Scala example:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("rdd")
.getOrCreate()
// create an RDD from a local collection
val rdd = spark.sparkContext.parallelize(List(1, 2, 3, 4))
// transformation: build a new RDD with every element squared
val rddSquares = rdd.map(x => x * x)
// action: bring the results back to the driver and print them
println(rddSquares.collect().mkString(", "))
// output: 1, 4, 9, 16
The beauty of this example is that it can be distributed. If the RDD were really massive, Spark could split the processing up, handle the squaring of different chunks of the RDD on different nodes within the cluster, and send the results back to the driver to produce the final answer.
Another example:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("rdd")
.getOrCreate()
val rddNums: RDD[Int] = spark.sparkContext.parallelize(List(1,2,3))
// action: collect() materializes the whole RDD on the driver as a local array
val rddCollect: Array[Int] = rddNums.collect()
println("Action: RDD converted to Array[Int]")
println(rddCollect.mkString(", "))
Let's talk about rdd.collect(). The collect() method in Apache Spark is a powerful but potentially problematic operation. It retrieves the entire RDD from the distributed environment back to the local driver program. Because collect() requires the full dataset to fit in memory, it carries significant risks, especially when dealing with large datasets.
Issues with rdd.collect()
- Memory overload: collect() transfers all data from the distributed nodes to the driver node. If the dataset is large, the driver program can run out of memory and crash, because it tries to fit the entire dataset into the limited memory of a single machine. Imagine calling rdd.collect() on terabytes of data: Spark would try to bring all of it into the memory of the driver, which is often impossible, and the job will almost certainly fail.
- Network bottleneck: transferring large amounts of data over the network from the worker nodes to the driver node can significantly slow down the Spark job.
- Reduced parallelism: one of Spark's strengths is its ability to process data in parallel across a cluster; collect() negates this advantage by aggregating all the data onto a single node, losing the benefits of distributed processing.
Rule of thumb: avoid collect() as much as possible and approach its use with caution. Instead of collecting the entire dataset, use actions such as take(n), aggregate(), or reduce() to perform computations directly within the distributed environment, and persist intermediate results in memory or on disk using persist() or cache(), as shown in the sketch below.
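To make the rule of thumb concrete, here is a minimal sketch, assuming the same local SparkSession named spark as in the earlier examples, that keeps the heavy work on the executors instead of collecting everything to the driver.
// assumes an existing SparkSession called `spark`, as in the previous examples
val bigRdd = spark.sparkContext.parallelize(1L to 1000000L)
// cache the RDD because we run more than one action on it
bigRdd.cache()
// take(n): only a small sample ever reaches the driver
val sample = bigRdd.take(5)
println(sample.mkString(", ")) // 1, 2, 3, 4, 5
// reduce: the sum is computed on the executors; only a single number comes back
val total = bigRdd.reduce(_ + _)
println(total) // 500000500000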
DataFrame
- A DataFrame is a distributed collection of rows organized under named columns (similar to a table in a relational database);
- Built on top of RDDs it provides a higher-level abstraction for structured data;
- Simplifies data manipulation with a high-level API;
- Easily integrates with various data sources like JSON, CSV, Parquet, etc;
- It does not provide compile-time type safety, so the user is limited when the structure of the data is not known.
The DataFrame API makes it easier to perform complex data processing tasks:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
// Initialize SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("dataframe")
.getOrCreate()
// Create DataFrame from CSV file
val filePath = "path/to/your/csvfile.csv"
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(filePath)
// Show the first 5 rows
df.show(5)
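Once the CSV file is loaded, the high-level API can express transformations concisely. The column names age and city below are hypothetical; replace them with columns that actually exist in your CSV.
// hypothetical columns "age" and "city" -- adjust them to your own data
val adultsPerCity = df
  .filter(col("age") >= 18)   // keep only rows where age is at least 18
  .groupBy(col("city"))       // group the remaining rows by city
  .count()                    // count rows per city
  .orderBy(col("count").desc) // most frequent cities first
adultsPerCity.show()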
Dataset
- Datasets are a distributed collection of data, combining the best features of RDDs and DataFrames;
- A Dataset is a strongly-typed, immutable collection of objects that are mapped to a relational schema;
- Ensures compile-time type safety and supports object-oriented programming paradigms;
- The main disadvantage of datasets is that they require typecasting into strings;
- We can use Datasets for complex transformations on structured data where compile-time type checking is beneficial.
import org.apache.spark.sql.{Dataset, SparkSession}
// Define the schema of our data
case class Client(name: String, age: Int, city: String)
// Initialize SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("dataset")
.getOrCreate()
import spark.implicits._
// Create Dataset from a sequence of case class instances
val data = Seq(
Client("John", 30, "München"),
Client("Jane", 25, "Berlin"),
Client("Mike", 35, "Frankfurt"),
Client("Sara", 28, "Dachau")
)
val ds: Dataset[Client] = spark.createDataset(data)
// Show the content of the Dataset
ds.show()
Using Datasets, we can benefit from the best features of both RDDs and DataFrames: the type safety and object-oriented programming interface of RDDs, and the optimized execution and ease of use that come from the DataFrame's higher level of abstraction for working with structured data in Spark.
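To see that compile-time type safety in action, here is a short follow-up sketch on the same ds Dataset; the field names come from the Client case class defined above.
// typed operations: the compiler knows every element is a Client,
// so a typo such as c.agee would fail at compile time rather than at runtime
val berlinClients: Dataset[Client] = ds.filter(c => c.city == "Berlin")
val greetings: Dataset[String] = ds.map(c => s"Hello, ${c.name} from ${c.city}!")
berlinClients.show()
greetings.show(false)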
SQL (via Spark SQL)
- Allows users to run SQL queries directly on DataFrames or Datasets;
- Provides a way to query data using standard SQL syntax;
- Uses standard SQL, which is familiar to many data professionals;
- Queries return DataFrames, enabling further processing using the DataFrame API;
- Ad-hoc querying and data exploration.
import org.apache.spark.sql.SparkSession
// Initialize SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("sql")
.getOrCreate()
// Create DataFrame from CSV file
val filePath = "path/to/your/csvfile.csv"
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(filePath)
// Register the DF as a temp SQL view
df.createOrReplaceTempView("clients")
// Execute SQL queries
val allRowsDF = spark.sql("SELECT * FROM clients")
allRowsDF.show()
Using Spark SQL with Scala allows you to execute SQL queries on your data.
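Beyond SELECT *, the same temporary view supports ordinary SQL aggregations. The column names city and age are hypothetical and depend on what your CSV contains.
// hypothetical columns "city" and "age" -- adjust to your own data
val avgAgePerCityDF = spark.sql(
  """
    |SELECT city, AVG(age) AS avg_age
    |FROM clients
    |GROUP BY city
    |ORDER BY avg_age DESC
  """.stripMargin)
avgAgePerCityDF.show()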
Structured Streaming
- Built on the Spark SQL engine, it enables the same DataFrame and Dataset API to be used for stream processing;
- Uses the same API for batch and streaming data, simplifying the development process;
- Easy to use thanks to its high-level abstraction for defining streaming computations;
- Real-time data processing and analytics;
- Stream processing applications that require the same APIs and optimizations as batch processing.
import org.apache.spark.sql.SparkSession
// Initialize SparkSession
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("StructuredStream")
.getOrCreate()
val kafkaStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "kafka_topic_name")
.option("startingOffsets", "earliest")
.load()
val query = kafkaStream
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
In this example, the code reads data from a Kafka topic, casts the key and value to strings, and writes them to the console in append output mode. The start() call launches the streaming query, and awaitTermination() blocks until the query terminates.
Structured Streaming in Spark is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. It allows you to work with streaming data in the same way you would work with batch data.
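Because Structured Streaming reuses the DataFrame and Dataset APIs, batch-style transformations such as groupBy and count work on streams as well. The sketch below assumes a plain text stream on a local socket at localhost:9999 (a hypothetical source for experimentation, e.g. fed with nc -lk 9999) and keeps a running word count.
// assumes the SparkSession `spark` from above; the socket source is for experiments only
import spark.implicits._
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost") // hypothetical host and port of a text stream
  .option("port", 9999)
  .load()
// the same DataFrame/Dataset operations we would use in batch mode
val wordCounts = lines.as[String]
  .flatMap(_.split(" "))
  .groupBy("value")
  .count()
val wordCountQuery = wordCounts.writeStream
  .outputMode("complete") // emit the full updated counts table after each micro-batch
  .format("console")
  .start()
wordCountQuery.awaitTermination()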
Why should we use Spark?
- For some in-memory workloads, Spark can run programs up to 100 times faster than Hadoop MapReduce;
- It offers fast processing through in-memory caching and optimized execution;
- Spark is a mature technology; it has been around for a while and is reliable at this point;
- Spark is not that hard to learn, and applications can be written in a variety of programming languages such as Scala, Java, and Python;
- Spark bundles powerful libraries for SQL, machine learning, streaming, and graph processing.
That's all folks :)