Running Delta Lake on Amazon EMR Serverless

Neylson Crepalde

Posted on July 30, 2022

Amazon EMR Serverless is a brand new AWS service, made generally available on June 1st, 2022. With it, you can run serverless Spark clusters that process TB-scale data with ease, using any open-source Spark library. Getting started with EMR Serverless can be a bit tricky, though. The goal of this post is to help you get your Spark + Delta Lake jobs up and running "serverlessly". Let's get to it!

Setup - Authentication

To run EMR Serverless you'll need to configure two IAM roles: a service-linked role and a job role that authorizes your Spark jobs to access other AWS services. The service-linked role is very straightforward to create. Go to IAM in the AWS console, click Roles, then click Create role,

IAM Role console

choose Amazon EMR Serverless,

Configurations to create a Service role

and keep the default settings until you finish creating the role. Next, create a job role with permissions to access S3 and Glue. We will create a very permissive role (not a best practice) for didactic purposes. In a production environment, you should make your permissions much stricter.

Click Create role again in the Roles section of the AWS console and select Custom trust policy. In the policy editor, replace the "{}" value of the "Service" key with emr-serverless.amazonaws.com.

Configurations for Custom trust policy

Next, attach two AWS managed policies, AmazonS3FullAccess and AWSGlueConsoleFullAccess. Click Next, give your new role an easily identifiable name (like "EMRServerlessJobRole") and finish creating the role.
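If you prefer scripting over clicking through the console, here is a minimal boto3 sketch of the same job role setup (the role and policy names match the ones above):



import json

import boto3

iam = boto3.client("iam")

# Trust policy allowing EMR Serverless to assume this role
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "emr-serverless.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

iam.create_role(
    RoleName="EMRServerlessJobRole",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach the two managed policies (remember: too broad for production)
for policy_arn in [
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
    "arn:aws:iam::aws:policy/AWSGlueConsoleFullAccess",
]:
    iam.attach_role_policy(RoleName="EMRServerlessJobRole", PolicyArn=policy_arn)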

Setup - Data

For this post, we are working with the (very famous) Titanic dataset, which you can download here and upload to S3.
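If you want to script the upload, a quick boto3 sketch (the key prefix matches the path the job script reads from; the local filename is an assumption):



import boto3

s3 = boto3.client("s3")
# Upload the CSV under the "titanic/" prefix that the Spark job will read
s3.upload_file("titanic.csv", "<YOUR-BUCKET>", "titanic/titanic.csv")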

Data Pipeline Strategy

Delta Lake is a great tool that implements the Lakehouse architecture. It has many cool features (such as schema evolution, data time travel, transaction logs, and ACID transactions) and it is especially valuable for incremental data ingestion. So we are going to simulate some changes in the Titanic dataset: we will insert two new passengers (Ney and Sarah) and update the records of two passengers who were presumed dead but found alive(!!!), Mr. Owen Braund and Mr. William Allen.

The first version of the data is written as a Delta table, and the changes are then applied with an upsert (merge) transaction.

The Python code for these operations is presented below:



from pyspark.sql import functions as f
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = (
    SparkSession.builder
    .getOrCreate()
)

print("Reading CSV file from S3...")

schema = "PassengerId int, Survived int, Pclass int, Name string, Sex string, Age double, SibSp int, Parch int, Ticket string, Fare double, Cabin string, Embarked string"
df = spark.read.csv(
    "s3://<YOUR-BUCKET>/titanic", 
    header=True, schema=schema, sep=";"
)

print("Writing titanic dataset as a delta table...")
df.write.format("delta").save("s3://<YOUR-BUCKET>/silver/titanic_delta")

print("Updating and inserting new rows...")
# Mark passengers 1 and 5 (Mr. Braund and Mr. Allen) as survivors
new = df.where("PassengerId IN (1, 5)")
new = new.withColumn("Survived", f.lit(1))
# Add two brand new passengers
newrows = [
    (892, 1, 1, "Sarah Crepalde", "female", 23.0, 1, 0, None, None, None, None),
    (893, 0, 1, "Ney Crepalde", "male", 35.0, 1, 0, None, None, None, None)
]
newrowsdf = spark.createDataFrame(newrows, schema=schema)
new = new.union(newrowsdf)

print("Create a delta table object...")
old = DeltaTable.forPath(spark, "s3://<YOUR-BUCKET>/silver/titanic_delta")

print("UPSERT...")
# UPSERT: update rows that match on PassengerId, insert the ones that don't
(
    old.alias("old")
    .merge(
        new.alias("new"),
        "old.PassengerId = new.PassengerId"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

print("Checking if everything is ok")
print("New data...")

(
    spark.read.format("delta")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)

print("Old data - with time travel")
# Version 0 is the table as originally written, before the merge
(
    spark.read.format("delta")
    .option("versionAsOf", "0")
    .load("s3://<YOUR-BUCKET>/silver/titanic_delta")
    .where("PassengerId < 6 OR PassengerId > 888")
    .show()
)



This .py file should be uploaded to S3.
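For example, with boto3 (the key matches the entryPoint used in the job submission later in this post; the local filename is an assumption):



import boto3

s3 = boto3.client("s3")
# Key matches the entryPoint path used when submitting the job
s3.upload_file(
    "emrserverless_delta_titanic.py",
    "<YOUR-BUCKET>",
    "pyspark/emrserverless_delta_titanic.py",
)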

Dependencies

One caveat of the latest available EMR Serverless release (6.6.0) is that the --packages spark-submit flag is not supported yet (😢). So we need an extra step to package our Java and Python dependencies.

Jars

To use Java dependencies, we have to build them into a single .jar file ourselves. AWS provides a Dockerfile that we can use to build the dependencies without having to install Maven locally (😍). I used this pom.xml file to define the dependencies:



<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.serverless-samples</groupId>
    <artifactId>jars</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>jars</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-core_2.12</artifactId>
            <version>1.2.1</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <finalName>uber-${artifactId}-${version}</finalName>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>



We build the final .jar file with the command below (the --output flag requires Docker BuildKit and copies the built artifact back to the working directory):



docker build -f Dockerfile.jars --output . .



The output uber-jars-1.0-SNAPSHOT.jar must be uploaded to S3.

With this .jar file we can use .format("delta") in our Python code, but if we try to import delta.tables we will get a Python dependency error.
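As a side note: depending on which Delta features your job uses (SQL DDL/DML in particular), you may also need to enable the Delta Lake session extensions when building the SparkSession. These are the standard Delta Lake configs, not something specific to EMR Serverless; whether release 6.6.0 strictly requires them for the merge above is worth verifying on your own jobs:



from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Standard Delta Lake session configs (can also be passed as --conf flags)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)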

Python

We can provide Python dependencies in two ways: uploading the dependency files to S3 or building a virtual environment for EMR Serverless to use. For this post, uploading a zip file with the delta-spark Python library is the simplest option:



mkdir dependencies
pip install delta-spark==1.2.1 --target dependencies
cd dependencies
zip -r9 ../emrserverless_dependencies.zip .



The emrserverless_dependencies.zip file must also be uploaded to S3.

Now, we are ready to configure our serverless Spark application.

EMR Serverless

First, we must create an EMR Studio. If you don't have any Studios yet, this is very straightforward: after clicking Get started on the EMR Serverless home page, you can have a Studio created automatically.

Create EMR Studio automatically

Then click the Studio URL to open it.

Studio url

With EMR Serverless, we don't have to create a cluster. Instead, we work with the concept of an application. To create a new EMR Serverless application, click Create application, type an application name, select a release version and click Create application again at the bottom of the page.

Application creation page
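If you'd rather create the application programmatically, a minimal boto3 sketch (the application name here is just an example):



import boto3

emr = boto3.client("emr-serverless")

# "delta-titanic" is a hypothetical name; emr-6.6.0 was the latest release at the time
app = emr.create_application(
    name="delta-titanic",
    releaseLabel="emr-6.6.0",
    type="SPARK",
)
print("Application ID:", app["applicationId"])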

Now, the last thing to do is to submit a Spark job. If you have the AWS CLI installed, the code below will submit one.



aws emr-serverless start-job-run \
--name Delta-Upsert \
--application-id <YOUR-APPLICATION-ID> \
--execution-role-arn arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole \
--job-driver '{
  "sparkSubmit": {
    "entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py", 
    "sparkSubmitParameters": "--jars s3://<YOUR-BUCKET>/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar --conf spark.submit.pyFiles=s3://<YOUR-BUCKET>/pyspark/dependencies/emrserverless_dependencies.zip"
  }
}' \
--configuration-overrides '{
"monitoringConfiguration": {
  "s3MonitoringConfiguration": {
    "logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"} 
  } 
}'



Some important parameters:

  • entryPoint sets the S3 path of your PySpark script.
  • sparkSubmitParameters: add the Java dependencies with the --jars flag and set --conf spark.submit.pyFiles=<YOUR .py/.zip/.egg FILE>.
  • s3MonitoringConfiguration sets the S3 path where the job logs will be saved.
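For completeness, here is the same submission with boto3, a sketch using the same placeholders as the CLI call above:



import boto3

emr = boto3.client("emr-serverless")

response = emr.start_job_run(
    applicationId="<YOUR-APPLICATION-ID>",
    executionRoleArn="arn:aws:iam::<ACCOUNT-NUMBER>:role/EMRServerlessJobRole",
    name="Delta-Upsert",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<YOUR-BUCKET>/pyspark/emrserverless_delta_titanic.py",
            "sparkSubmitParameters": (
                "--jars s3://<YOUR-BUCKET>/pyspark/jars/uber-jars-1.0-SNAPSHOT.jar "
                "--conf spark.submit.pyFiles="
                "s3://<YOUR-BUCKET>/pyspark/dependencies/emrserverless_dependencies.zip"
            ),
        }
    },
    configurationOverrides={
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://<YOUR-BUCKET>/emr-serverless-logs/"
            }
        }
    },
)
print("Job run ID:", response["jobRunId"])

# Poll the run state (SUBMITTED -> RUNNING -> SUCCESS/FAILED)
state = emr.get_job_run(
    applicationId="<YOUR-APPLICATION-ID>", jobRunId=response["jobRunId"]
)["jobRun"]["state"]
print("State:", state)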

If you wish to use the console instead, set the job name, role and script location

Job initial parameters

and the .jar and .zip file locations as follows

Jar files parameters

The Spark job should start after this. When it finishes, check the logs folder in S3 (look for your application ID, job ID, and the SPARK_DRIVER logs). You should see something like this



Reading CSV file from S3...
Writing titanic dataset as a delta table...
Updating and inserting new rows...
Create a delta table object...
UPSERT...
Checking if everything is ok
New data...
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       1|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       1|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
|        892|       1|     1|      Sarah Crepalde|female|23.0|    1|    0|            null|   null| null|    null|
|        893|       0|     1|        Ney Crepalde|  male|35.0|    1|    0|            null|   null| null|    null|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+

Old data - with time travel
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|        889|       0|     3|"Johnston, Miss. ...|female|null|    1|    2|      W./C. 6607|  23.45| null|       S|
|        890|       1|     1|Behr, Mr. Karl Ho...|  male|26.0|    0|    0|          111369|   30.0| C148|       C|
|        891|       0|     3| Dooley, Mr. Patrick|  male|32.0|    0|    0|          370376|   7.75| null|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+



Notice the same Delta table shown at two different points in time using time travel: the latest version, with Mr. Braund and Mr. Allen marked as alive and the new passengers included, in the first table, and the original version of the dataset in the second.
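Finally, if you want to pull the driver logs without browsing the console, you can read them straight from S3. A sketch, assuming logs land under applications/<app-id>/jobs/<job-run-id>/SPARK_DRIVER/stdout.gz below your logUri (double-check the exact keys in your bucket):



import gzip

import boto3

s3 = boto3.client("s3")

# Assumed key layout below the logUri; verify against your bucket
key = (
    "emr-serverless-logs/applications/<YOUR-APPLICATION-ID>/"
    "jobs/<YOUR-JOB-RUN-ID>/SPARK_DRIVER/stdout.gz"
)
body = s3.get_object(Bucket="<YOUR-BUCKET>", Key=key)["Body"].read()
print(gzip.decompress(body).decode("utf-8"))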
