How to Build a Spark Cluster with Docker, JupyterLab, and Apache Livy—a REST API for Apache Spark


Adam Przewoźny

Posted on March 5, 2022


Originally written by Michał Klóska and Lidia Kurasińska

Have you ever wondered how you can leverage Apache Livy in your project to take your experience with the Apache Spark cluster to the next level? I put together a step-by-step guide that’ll help you achieve that goal.

To run a sample project and make the most of this guide, you’ll need to install the Docker container service first. If you’re not familiar with containers, you’ll find more details in the Docker documentation.

By reading this article, you’ll learn how to build a Spark cluster with the Livy server and JupyterLab based on the Docker virtual environment.

You’ll also find out how to prepare the business logic in JupyterLab and discover how I used a sample project to run PySpark code via the Livy service.

What is Apache Spark? Why should you use it?

Apache Spark is an analytics engine used to process petabytes of data in a parallel manner.

Thanks to its easy-to-use APIs and structures such as RDDs, Datasets, and DataFrames with a rich collection of operators, as well as its support for languages like Python, Scala, R, Java, and SQL, it has become a preferred tool for data engineers.

Due to its speed (it’s up to 100 times faster than Hadoop MapReduce) and its flexibility (e.g. support for SQL queries, machine learning, streaming, and graph processing), Apache Spark is deployed at massive scale by enterprises across a wide range of industries.

Spark Ecosystem Source: https://databricks.com/spark/about

What is Apache Livy? How does it benefit your project?

Apache Livy is a service that enables easy interaction with a Spark cluster over REST API.

Some of its helpful features include:

  • submitting jobs as precompiled jars or snippets of code in Python/Scala/R,
  • running Spark jobs synchronously or asynchronously,
  • managing multiple SparkContexts simultaneously,
  • reusing a long-running SparkContext across many Spark jobs,
  • sharing cached RDDs or data frames across multiple jobs and clients,
  • secure authenticated communication.

You might be wondering how to make Apache Spark simpler to use in automated processing.

For example, we can imagine a situation where we submit Spark code written in Python or Scala to a cluster, just like we submit SQL queries to a database engine. If we don’t want to play with the command line to reach the cluster directly over SSH, then Apache Livy comes into play with its REST API.
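To make that idea concrete, here is a minimal sketch of what talking to Livy’s REST API directly looks like, using Python’s requests package. It is only an illustration based on Livy’s documented /sessions and /sessions/{id}/statements endpoints, and it assumes Livy is reachable on localhost:8998, as it will be in the setup described later. The rest of this guide uses the pylivy library instead, which hides this plumbing.

    # A minimal sketch of the raw Livy REST flow (illustrative; pylivy wraps all of this)
    import time
    import requests

    LIVY = "http://localhost:8998"  # assumes the Livy server from this guide's setup

    # 1. Create a PySpark session
    session = requests.post(f"{LIVY}/sessions", json={"kind": "pyspark"}).json()
    session_url = f"{LIVY}/sessions/{session['id']}"

    # 2. Wait until the session becomes idle
    while requests.get(session_url).json()["state"] != "idle":
        time.sleep(5)

    # 3. Submit a snippet of code as a statement
    statement = requests.post(f"{session_url}/statements", json={"code": "print(1 + 1)"}).json()
    statement_url = f"{session_url}/statements/{statement['id']}"

    # 4. Poll the statement until its result is available
    while True:
        result = requests.get(statement_url).json()
        if result["state"] == "available":
            print(result["output"])
            break
        time.sleep(1)

    # 5. Clean up the session
    requests.delete(session_url)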

Livy architecture Source: https://livy.apache.org/assets/images/livy-architecture.png

Do you have to create an additional layer of logic to manage connections and all REST API functionalities?

No, thankfully there’s a dedicated library called pylivy that I’m going to use in the sample project. You can find pylivy examples and documentation at https://pylivy.readthedocs.io/en/stable/.

Testing the concept of using Apache Livy in Spark clusters

Let’s assume that we have to create an application to process some customer or market data on Spark and our app will run on a daily basis.

From a developer’s point of view, we first have to prepare scripts with the business logic. Next, we should prepare the jobs’ controllers to manage communication with our Spark cluster via the Livy REST API.

But before we develop anything, we need to install and configure Livy and the Spark cluster.

Setup requirements

As I mentioned at the beginning of this article, to keep the operating system free from dependencies, we use the Docker container service to run our infrastructure. So, first, we need to prepare Docker files for the Spark master and worker containers with a separate container for JupyterLab.

Installation and service configuration

If you don’t have the Docker service installed, follow the instructions at https://www.docker.com/get-started to get it running on your machine.

If you’re here, I assume you have Docker ready to run our containers, so we can focus on the code.

To keep our concept more realistic, I decided to split infrastructure into three separate services:

  • jupyterlab,

  • spark-master,

  • spark-worker-1.

Also, to keep the logic simple and clear, I’ve created a dedicated folder for Docker services called livy_poc_docker.

In the first step, we will create a base image.

The base image uses Debian Linux and contains all required libraries: python3, openjdk-8, and utilities such as wget, curl, and unzip.

    FROM debian:stretch

    # System packages
    RUN apt-get clean && apt-get update -y && \
        apt-get install -y python3 python3-pip curl wget unzip procps openjdk-8-jdk && \
        ln -s /usr/bin/python3 /usr/bin/python && \
        rm -rf /var/lib/apt/lists/*

To simplify the image structure, I decided to add the Spark installation into the base image. For production use, Spark should be added to a separate image. (Note that the mirror used below may no longer serve this version; older Spark releases remain available at https://archive.apache.org/dist/spark/.)

    # Install Spark
    RUN curl https://apache.mirrors.tworzy.net/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz -o spark.tgz && \
        tar -xf spark.tgz && \
        mv spark-2.4.7-bin-hadoop2.7 /usr/bin/ && \
        mkdir /usr/bin/spark-2.4.7-bin-hadoop2.7/logs && \
        rm spark.tgz

Next, I created the required directories and set up environment variables.

    # Prepare dirs
    RUN mkdir -p /tmp/logs/ && chmod a+w /tmp/logs/ && mkdir /app && chmod a+rwx /app && mkdir /data && chmod a+rwx /data
    ENV JAVA_HOME=/usr
    ENV SPARK_HOME=/usr/bin/spark-2.4.7-bin-hadoop2.7
    ENV PATH=$SPARK_HOME:$PATH:/bin:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
    ENV SPARK_MASTER_HOST=spark-master
    ENV SPARK_MASTER_PORT=7077
    ENV PYSPARK_PYTHON=/usr/bin/python
    ENV PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
    ENV APP=/app
    ENV SHARED_WORKSPACE=/opt/workspace
    RUN mkdir -p ${SHARED_WORKSPACE}
    VOLUME ${SHARED_WORKSPACE}

Then, I created an image for the Spark master container.

The Spark master container is responsible for hosting the Spark master node and the Livy service.

This image is based on a previously created image called mk-spark-base.

Additional packages for Python will be installed in this step.

    FROM mk-spark-base

    # Python packages
    RUN pip3 install wget requests datawrangler

And, of course, we can’t forget about Apache Livy.

    # Get Livy
    RUN wget https://apache.mirrors.tworzy.net/incubator/livy/0.7.0-incubating/apache-livy-0.7.0-incubating-bin.zip -O livy.zip && \
        unzip livy.zip -d /usr/bin/

Since the Spark master image is the final one, we have to expose ports and set up the entry point.

    EXPOSE 8080 7077 8998 8888
    WORKDIR ${APP}
    ADD entryfile.sh entryfile.sh
    ENTRYPOINT ["sh", "entryfile.sh"]
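The entryfile.sh script referenced by the ENTRYPOINT isn’t shown here, so below is a hypothetical sketch of what it could contain: it starts the Spark master with the standard start-master.sh script and then launches the Livy server from the directory the master Dockerfile unzipped it into. Depending on your configuration, you may also want to set livy.spark.master to spark://spark-master:7077 in Livy’s conf/livy.conf so that Livy sessions are submitted to the standalone master.

    #!/bin/sh
    # Hypothetical entryfile.sh sketch: start the Spark master and the Livy server,
    # then keep the container in the foreground.

    # Start the Spark master (SPARK_MASTER_HOST and SPARK_MASTER_PORT come from the image's ENV)
    $SPARK_HOME/sbin/start-master.sh --host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT

    # Start the Livy server (unzipped into /usr/bin by the master Dockerfile)
    export LIVY_HOME=/usr/bin/apache-livy-0.7.0-incubating-bin
    $LIVY_HOME/bin/livy-server start

    # Keep the container running
    tail -f /dev/null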

Okay, the configuration for the master image is ready. Now, we need a worker image.

It’s only responsible for running a Spark node in worker mode, and nothing else.

This image also uses the mk-spark-base image as a base.

    FROM mk-spark-base

    ENV APP=/app
    ENV JAVA_HOME=/usr
    ENV SPARK_HOME=/usr/bin/spark-2.4.7-bin-hadoop2.7
    ENV PATH=$SPARK_HOME:$PATH:/bin:$JAVA_HOME/bin:$JAVA_HOME/jre/bin
    ENV PYSPARK_PYTHON=/usr/bin/python
    ENV PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH

    EXPOSE 8081 7077 8998 8888

    WORKDIR ${APP}

    CMD /usr/bin/spark-2.4.7-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.worker.Worker spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT} >> /tmp/logs/spark-worker.out

Now, it’s time to create a configuration for the JupyterLab image. Let’s call it mk-jupyter.

    FROM mk-spark-base

    # Python packages
    RUN pip3 install wget requests pandas numpy datawrangler findspark jupyterlab pyspark==2.4.7

    EXPOSE 8888

    WORKDIR ${SHARED_WORKSPACE}

    CMD jupyter lab --ip=0.0.0.0 --port=8888 --no-browser --allow-root --NotebookApp.token=

How simple is that?

Finally, we have the configuration for all containers of our cluster, and it’s time to build images. This simple script runs the docker build command to create all required images.

    # Builds images
    docker build \
      -f base.Dockerfile \
      -t mk-spark-base .
    docker build \
      -f master.Dockerfile \
      -t mk-spark-master .
    docker build \
      -f worker.Dockerfile \
      -t mk-spark-worker .
    docker build \
      -f jupyter.Dockerfile \
      -t mk-jupyter .

Save it as build.sh and run it in the CLI: sh build.sh.
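Once the script finishes, you can verify that all four images exist, for example:

    docker images | grep mk-
    # Expected: mk-spark-base, mk-spark-master, mk-spark-worker and mk-jupyter listed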

To start the already built containers, we have to prepare a Docker compose file, docker-compose.yml.

    version: "3.6"
    volumes:
      shared-workspace:
        name: "hadoop-distributed-file-system"
        driver: local
    services:
      jupyterlab:
        image: mk-jupyter
        container_name: mk-jupyter
        ports:
          - 8888:8888
        volumes:
          - shared-workspace:/opt/workspace
      spark-master:
        image: mk-spark-master
        container_name: mk-spark-master
        ports:
          - 8080:8080
          - 7077:7077
          - 8998:8998
        volumes:
          - shared-workspace:/opt/workspace
      spark-worker-1:
        image: mk-spark-worker
        container_name: mk-spark-worker-1
        environment:
          - SPARK_WORKER_CORES=1
          - SPARK_WORKER_MEMORY=512m
        ports:
          - 8081:8081
        volumes:
          - shared-workspace:/opt/workspace
        depends_on:
          - spark-master

The Docker compose file is responsible for service orchestration and describes input parameters like service name, ports, volumes, and environment variables, as well as dependencies between services, if they exist.
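With the file saved as docker-compose.yml in the livy_poc_docker folder, a few standard commands are enough to manage the stack (a quick reference, assuming default Docker Compose behavior):

    docker-compose up -d                   # create and start all three services in the background
    docker ps                              # verify mk-jupyter, mk-spark-master and mk-spark-worker-1 are up
    docker-compose logs -f spark-master    # follow the master/Livy logs if something looks off
    docker-compose down                    # stop and remove the containers when you're done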

Coding the business logic

If you’re here, I assume you went through all previous steps successfully and all containers are running. This is the server side.

Now, we will focus on the business logic of our project—the client side. As I mentioned earlier, we now need to create a client script to communicate with the Spark server using the REST API.

Before we start coding, I recommend creating a separate project where we put our code. Let’s call it livy_poc.

To play with the Livy server, we’ll use a Python library called pylivy. Of course, you can play with the REST API directly using the requests package, but in my opinion pylivy will simplify our code a lot.

So, first, we have to install the required pylivy package with pip install -U livy.

It’s a common practice to create a virtual environment dedicated to a given project and install all required packages manually as above, or from a requirements.txt file with pip install -r requirements.txt.

You can find helpful information on how to use venv at https://docs.python.org/3/library/venv.html.
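For reference, a typical setup for the client project could look like this (the my_venv name matches the activation command used later in this article):

    # Create and activate a virtual environment for the livy_poc project
    python3 -m venv my_venv
    source my_venv/bin/activate

    # Install pylivy directly...
    pip install -U livy

    # ...or from a requirements.txt file
    pip install -r requirements.txt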

Now, we can create a file called titanic_data.py and put all logic there.

For simplicity, we’ll put all logic into one file, but in a real project it’s a good practice to split business logic into many files depending on the framework or project structure used.

In the client script, we have to import all required packages (livy and textwrap) to make our code more readable.

    from livy import LivySession, SessionKind
    import textwrap

In this example, we’ll play with the Titanic data set, which is very popular and free to use.

    # Get and load Titanic data into Spark
    get_titanic_data = textwrap.dedent(
        """
        from pathlib import Path
        import wget

        data_file = Path("/opt/workspace/titanic.csv")
        if not data_file.is_file():
            url = "https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv"
            wget.download(url, "/opt/workspace/titanic.csv")
        """
    )

As you can see, we embedded Python code inside a multiline string and assigned it to the Python variable get_titanic_data.

We can treat the code wrapped in the multiline string as a query-script that will be submitted to our Spark cluster, like an SQL query in the case of a database.

Notice the imports inside our multiline string. This means that Spark has to import those packages before executing the rest of the code.

Now, we have to create a SparkSession object and load the previously downloaded data set into a Spark data frame.

Note the additional imports of SparkSession and SQLContext here; they happen on the Spark side.

    load_to_spark = textwrap.dedent(
        """
        from pyspark.sql import SparkSession, SQLContext

        spark = SparkSession.\
            builder.\
            appName("pyspark-notebook-titanic").\
            master("spark://spark-master:7077").\
            config("spark.executor.memory", "512m").\
            getOrCreate()

        sql = SQLContext(spark)

        data = sql.read.option("header", "true").option("inferSchema", "true").csv("/opt/workspace/titanic.csv")
        print("---"*20)
        print("Loaded rows:", data.count())
        print("---"*20)
        """
    )

Below you can see the logic assigned to variables that will be run against the Titanic data set on the Spark cluster.

    # Run some analysis
    general_number_of_survived_passengers = textwrap.dedent(
        """
        survived = data[data["Survived"] == 1]
        survived_percent = survived.count()/data.count() * 100
        print("Total number of passengers:", data.count())
        print("Count of survived passengers:", survived.count())
        print("Percent of survived passengers:", survived_percent)
        """
    )

    percent_of_survived_passengers_with_siblings_spouses = textwrap.dedent(
        """
        sur_with_siblings = data[data["Siblings/Spouses Aboard"] != 0]
        sur_with_siblings_percent = sur_with_siblings.count()/data.count() * 100
        print("Count of passengers with siblings-spouses:", sur_with_siblings.count())
        print("Percent of survived passengers with siblings-spouses", sur_with_siblings_percent)
        """
    )

    percent_of_survived_passengers_with_parent_children = textwrap.dedent(
        """
        sur_with_parents = data[data["Parents/Children Aboard"] != 0]
        sur_with_parents_percent = sur_with_parents.count()/data.count() * 100
        print("Count of passengers with parents-children:", sur_with_parents.count())
        print("Percent of survived passengers with parents-children", sur_with_parents_percent)
        """
    )

    some_aggregations = textwrap.dedent(
        """
        grouped_passengers = data.groupby("pclass", "age", "survived").count() \
            .orderBy(
                data.Pclass,
                data.Age.desc(),
                data.Survived.asc()
            )
        print("---"*20)
        print("Aggregated data:")
        grouped_passengers.show(n=10)
        print("---"*20)
        """
    )

    save_aggregations_as_parquet = textwrap.dedent(
        """
        grouped_passengers.write \
            .option("header", "true") \
            .mode("overwrite") \
            .parquet("/opt/workspace/titanic_grouped_passengers.parquet")
        """
    )

What’s important here is that the source data frame called data is used and shared across all snippets. This means that we can create a variable or a data frame in one place and use it in any other place in our code inside one Spark session.

Now, it’s time to submit our business logic to the cluster. What exactly is happening here?

Inside the main function, we’ll connect to the Livy server and create the session object. Next, we’ll call the run method and use variables with business logic defined earlier.

    # Execute pyspark code
    LIVY_SERVER = "http://127.0.0.1:8998"

    def main():
        with LivySession.create(LIVY_SERVER, kind=SessionKind.PYSPARK) as session:
            session.run(get_titanic_data)
            session.run(load_to_spark)
            session.run(general_number_of_survived_passengers)
            session.run(percent_of_survived_passengers_with_siblings_spouses)
            session.run(percent_of_survived_passengers_with_parent_children)
            session.run(some_aggregations)
            session.run(save_aggregations_as_parquet)

            # Let's download data locally from Spark as a Pandas data frame
            grouped_passengers_1 = session.read("grouped_passengers")
            print("Spark data frame 'grouped_passengers' as local Pandas data frame:")
            print("grouped_passengers_1: ", grouped_passengers_1)

            # Run the same aggregation using SparkSQL
            session.run("data.createOrReplaceTempView('titanic_data')")
            session.run(textwrap.dedent(
                """
                grouped_passengers_2 = sql.sql(
                    '''
                    select pclass, age, survived, count(*) as cnt
                    from titanic_data
                    group by pclass, age, survived
                    '''
                )
                """
            ))

            # Read the result from Spark as a Pandas data frame
            grouped_passengers_2 = session.read("grouped_passengers_2")
            print("Spark data frame 'grouped_passengers' as local Pandas data frame generated by SparkSQL:")
            print("grouped_passengers_2: ", grouped_passengers_2)

            # Compare local data frames
            print("---"*20)
            print("Results comparison of Spark MR vs SparkSQL")
            print("len(grouped_passengers_1) ==", len(grouped_passengers_1))
            print("len(grouped_passengers_2) ==", len(grouped_passengers_2))
            print("---"*20)

        print("Spark session closed.")

    if __name__ == "__main__":
        main()

As you can see, we used two different methods to execute our Spark code.

The run method is used to execute code; it returns an object with the request status and the text output produced by the executed statement (here, for example, the top ten rows printed by show).

However, if we want to use the data processed on our Spark cluster locally, we can use the read method that returns a Pandas data frame.

Additionally, Livy gives us the possibility to use SparkSQL (you can see how in our simple example). You can also use dedicated methods such as download_sql and read_sql.
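For example, the SparkSQL comparison above could be shortened with read_sql, which submits a SQL query and returns the result as a local Pandas data frame. A small sketch, assuming the same session and the titanic_data temporary view created in the main script:

    # Sketch: query the temp view via pylivy's read_sql instead of reading a named variable
    grouped_passengers_sql = session.read_sql(
        "select pclass, age, survived, count(*) as cnt "
        "from titanic_data "
        "group by pclass, age, survived"
    )
    print(grouped_passengers_sql.head())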

Notably, in our example we’re creating the session object using a with statement, so we don’t have to worry about closing Livy’s session at the end of the script.

Remember that after closing a Livy session, all data frames, variables, and objects will be permanently destroyed.

Cluster execution

If you’ve installed pylivy inside a virtual environment, it’s time to activate it with source my_venv/bin/activate. You can also use a dot instead of source: . my_venv/bin/activate.

To execute our sample code, first make sure Docker is running on your machine. Next, using the CLI, go to the project with the Docker services configuration (in this case called livy_poc_docker) and run docker-compose up, or docker-compose start if the containers have already been created.

After one or two minutes, when all services are running, use the CLI to go to the project with our business logic—in our case called livy_poc.

Inside our project, run python titanic_data.py and you should see output like this:

 ------------------------------------------------------------
   Loaded rows: 887
   ------------------------------------------------------------
   Total number of passengers: 887
   Count of survived passengers: 342
   Percent of survived passengers: 38.55693348365276
   Count of passengers with siblings-spouses: 283
   Percent of survived passengers with siblings-spouses 31.905298759864714
   Count of passengers with parents-children: 213
   Percent of survived passengers with parents-children 24.01352874859076
   ------------------------------------------------------------
   Aggregated data:
   +------+----+--------+-----+
   |pclass| age|survived|count|
   +------+----+--------+-----+
   | 1|80.0| 1| 1|
   | 1|71.0| 0| 2|
   | 1|70.0| 0| 1|
   | 1|65.0| 0| 2|
   | 1|64.0| 0| 3|
   | 1|63.0| 1| 1|
   | 1|62.0| 0| 2|
   | 1|62.0| 1| 1|
   | 1|61.0| 0| 2|
   | 1|60.0| 0| 2|
   +------+----+--------+-----+
   only showing top 10 rows

   ------------------------------------------------------------
   Spark data frame 'grouped_passengers' as local Pandas data frame:
   grouped_passengers_1: pclass age survived count
   0 1 80.00 1 1
   1 1 71.00 0 2
   2 1 70.00 0 1
   3 1 65.00 0 2
   4 1 64.00 0 3
   .. ... ... ... ...
   277 3 2.00 1 2
   278 3 1.00 0 2
   279 3 1.00 1 3
   280 3 0.75 1 2
   281 3 0.42 1 1

   [282 rows x 4 columns]
   Spark data frame 'grouped_passengers' as local Pandas data frame generated by SparkSQL:
   grouped_passengers_2: pclass age survived cnt
   0 2 13.0 1 1
   1 1 58.0 0 2
   2 3 28.5 0 2
   3 1 23.0 1 3
   4 1 32.0 1 2
   .. ... ... ... ...
   277 3 22.0 1 9
   278 2 21.0 0 5
   279 2 33.0 1 2
   280 3 20.0 1 4
   281 1 60.0 1 2

   [282 rows x 4 columns]
   ------------------------------------------------------------
   Results comparison of Spark MR vs SparkSQL
   len(grouped_passengers_1) == 282
   len(grouped_passengers_2) == 282
   ------------------------------------------------------------
   Spark session closed.

Status monitoring in Apache Livy, Apache Spark, and JupyterLab

Finally, we’ve got everything up and running. But how can we monitor service status? Below you’ll find some tips.

Apache Livy session monitoring

To check a Livy session status, go to http://localhost:8998.

If you have any sessions running, you’ll see the list of started sessions and their status.

Livy session - idle

Livy session - busy

Livy session - shutting down

You can click on the session number to see the details, such as executed code, status, and progress.

Livy session - overview

If you click the session link in the logs column, you’ll see all of the session’s logs:

Livy session - error log

And if there isn’t any session running, you’ll see this:

Livy - no sessions
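The same information is exposed through Livy’s REST API, which can be handy for scripted health checks. For example, assuming the default port 8998 used in this setup:

    curl http://localhost:8998/sessions          # list sessions and their states
    curl http://localhost:8998/sessions/0/log    # fetch the logs of session 0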

Apache Spark cluster monitoring

Go to http://localhost:8080/ to check the Spark cluster.

Spark monitoring

Developing with JupyterLab

Now, it’s time to explain why I’ve created the JupyterLab service. To be honest with you, I really like this setup because it gives me the possibility to use the JupyterLab environment to develop and test my Spark scripts. When they’re ready, I can automate them using the Livy API without any further configuration.

So this is how I’ve developed the business logic of the example presented in this article.

Go to http://localhost:8888 to see JupyterLab’s interface.

Jupyter code
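For instance, a notebook cell like the following is roughly how the logic above can be prototyped interactively before being wrapped in Livy statements (a sketch; findspark and pyspark==2.4.7 are installed in the mk-jupyter image):

    # Prototype the Spark logic interactively from JupyterLab against the same cluster
    import findspark
    findspark.init()  # picks up SPARK_HOME set in the base image

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("jupyter-dev")
             .master("spark://spark-master:7077")
             .config("spark.executor.memory", "512m")
             .getOrCreate())

    data = spark.read.option("header", "true").option("inferSchema", "true").csv("/opt/workspace/titanic.csv")
    data.show(5)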

Final thoughts on the Apache Livy REST API

In this article, we’ve covered how to use Apache Livy with Spark and what benefits Livy can bring to your work. I’ve also explained how to integrate it with a Python project and prepare a simple project based on the Docker environment, which makes this project system-agnostic.

Even though it might look like a lot to take in, remember that what I’ve shown you is only the tip of the iceberg. Livy can do much more, especially if you use pylivy as an abstraction layer. This will give you a powerful tool to create and manage your Spark jobs from your application or to create more flexible data pipelines.

If you decide to use Apache Livy in your next project, it will be beneficial for you to make yourself familiar with the pylivy documentation first. In there, you will find many helpful classes and methods that make using Livy very efficient and simple.

Additional resources and further reading

Docker
https://www.docker.com/get-started

Apache Livy
https://livy.apache.org/
https://livy.apache.org/docs/latest/rest-api.html
https://livy.incubator.apache.org/examples/

PyLivy
https://pylivy.readthedocs.io/en/stable/
https://pylivy.readthedocs.io/en/stable/api/session.html

Apache Spark
https://spark.apache.org/
https://spark.apache.org/examples.html

JupyterLab
https://jupyter.org/
