UPSERTS and DELETES using AWS Glue and Delta Lake

klescosia

Kyle Escosia

Posted on July 21, 2021


The purpose of this blog post is to demonstrate how you can make your Data Lake ACID-compliant, that is, give it the same transactional guarantees as a database. This will allow you to do UPSERTS and DELETES directly on your data lake.

Let me start first by defining what a Data Lake is:

From AWS

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.

Data Lake

A data lake is scalable, performant, secure, and cost-efficient, and it plays a crucial part in an organization's Data Analytics pipeline. So what's the problem?

Well, updates.

We all know that data lakes are immutable: data or objects should not be modified after they are created. How do we then go beyond that immutability?

Delta Lake

The answer is Delta Lake.

An open-source storage layer that brings scalable, ACID transactions to Apache Spark™ and big data workloads. It provides serializability, the strongest isolation level, along with scalable metadata handling and time travel, and is 100% compatible with Apache Spark APIs.

Basically, it allows you to do DELETES and UPSERTS directly to your data lake.

How Spark Fails ACID

We all know our beloved Spark doesn't support ACID transactions, but to be fair, it isn't really built to address that kind of specific use case.

I came across a blog post from kundankumarr, explaining how Spark fails ACID.

Atomicity & Consistency

Atomicity states that a write through the Spark DataFrame writer should either write the full data or nothing to the data source. Consistency, on the other hand, ensures that the data is always in a valid state.

Isolation & Durability

We know that when a transaction is in process and not yet committed, it must remain isolated from any other transaction. This is called Isolation Property. It means writing to a data set shouldn’t impact another concurrent read/write on the same data set.

Finally, Durability. It is the ACID property which guarantees that transactions that have committed will survive permanently. However, when Spark doesn’t correctly implement the commit, then all the durability features offered by the storage go for a toss.
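To make the atomicity idea concrete, here is a toy sketch in plain Python. It is not Spark or Delta Lake code. It assumes a local file system where os.replace is atomic, and the _log directory and the commit and visible_files names are made up for illustration. A writer publishes its data files by creating one small log entry in a single step, and readers only trust files named in the log, so a half-finished write is never visible.

```python
import json
import os
import tempfile


def commit(table_dir, version, data_files):
    """Publish a set of data files by creating one log entry in a single step."""
    log_dir = os.path.join(table_dir, "_log")  # hypothetical log location
    os.makedirs(log_dir, exist_ok=True)
    final_path = os.path.join(log_dir, f"{version:020d}.json")
    # Write the entry to a temporary file first, so a crash leaves no partial entry.
    fd, tmp_path = tempfile.mkstemp(dir=log_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as handle:
        json.dump({"add": data_files}, handle)
    # The rename makes the entry appear whole or not at all.
    os.replace(tmp_path, final_path)


def visible_files(table_dir):
    """Return only the data files named by committed log entries."""
    log_dir = os.path.join(table_dir, "_log")
    if not os.path.isdir(log_dir):
        return []
    files = []
    for name in sorted(os.listdir(log_dir)):
        if name.endswith(".json"):
            with open(os.path.join(log_dir, name)) as handle:
                files.extend(json.load(handle)["add"])
    return files
```

This toy version does no conflict detection between concurrent writers; it only shows the "all or nothing" part.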

AWS Glue and Delta Lake

This part demonstrates how you can use Delta Lake with AWS Glue.

These are the services that will be used in this exercise:

AWS Glue

a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development.

Amazon Athena

an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon S3

an object storage service that offers industry-leading scalability, data availability, security, and performance.

This is what we'll be doing:

[Figure: process flow]

Basically, I have some initial data, and I want to apply changes to the Sales and Profit columns. The table in the AWS Glue Data Catalog should then be able to capture those changes. Just a basic update to the data.

So, let's start!

Pre-requisites

First, download the data here. I used Tableau's Superstore Dataset, which is on Kaggle; you may need to register for an account to download it.

Then, you need to download the Delta Lake .jar file to access its libraries. You can download it here. Upload it to your S3 Bucket and take note of the S3 path; we'll use it as a reference later.

As of this writing, Glue's Spark Engine (v2.4) only supports v0.6.1 of Delta Lake since versions beyond that were implemented in Spark 3.0.

❗❗❗ UPDATE: AWS GLUE 3.0 WAS RELEASED IN AUGUST 2021! Check out my blog post on this one: ❗❗❗

AWS Glue

Navigate to AWS Glue, then proceed to the creation of an ETL Job. Set "This job runs" to "A new script to be authored by you". This will allow you to write custom Spark code.

Under Security configuration, script libraries, and job parameters (optional), specify the location of where you stored the .jar file as shown below:
[Figure: ETL job configuration]

Then, on the blank script page, paste the following code:

from delta import *
from pyspark.sql.session import SparkSession

This imports the SparkSession libraries as well as the Delta Lake libraries.

# Initialize Spark Session with Delta Lake
spark = SparkSession \
  .builder \
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
  .getOrCreate()

This code initializes the SparkSession along with the Delta Lake configurations.

# Read Source
inputDF = spark.read.format("csv") \
.option("header", "true") \
.load('s3://delta-lake-ia-test/raw/')

We read the source csv file as a Spark DataFrame.

# Write data as DELTA TABLE
inputDF.write.format("delta") \
.mode("overwrite") \
.save("s3a://delta-lake-ia-test/current/")

Then we write it out in Delta format.

❗ Notice the use of the s3a prefix in the save path. It is essential to use the s3a prefix instead of the standard s3 in the path, because using the s3 prefix will throw an UnsupportedFileSystemException error, followed by fs.AbstractFileSystem.s3.impl=null: No AbstractFileSystem configured for scheme: s3.
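If your paths come from configuration or job parameters, a small helper can guard against passing an s3 path to the Delta writer by accident. This is only a sketch; the to_s3a name is made up for this post and is not part of Glue or Delta Lake.

```python
def to_s3a(path):
    """Rewrite an s3:// URI to the s3a:// scheme; leave any other path untouched."""
    prefix = "s3://"
    if path.startswith(prefix):
        return "s3a://" + path[len(prefix):]
    return path


# Example with the bucket used in this post
print(to_s3a("s3://delta-lake-ia-test/current/"))
```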

More on the differences between s3 and s3a here

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
deltaTable.generate("symlink_format_manifest")


Athena supports reading from external tables using a manifest file, which is a text file containing the list of data files to read for querying a table. Running the above code will generate a manifest file.
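Since the manifest is just a text file listing data files, you can inspect it with a few lines of Python once you have its contents. The sketch below assumes one path per line; the parse_manifest name and the file names in the sample are hypothetical, so check them against your own manifest.

```python
def parse_manifest(manifest_text):
    """Return the data file paths listed in a manifest, one per non-empty line."""
    return [line.strip() for line in manifest_text.splitlines() if line.strip()]


# Hypothetical manifest contents, for illustration only
sample = (
    "s3a://delta-lake-ia-test/current/part-00000-example.snappy.parquet\n"
    "s3a://delta-lake-ia-test/current/part-00001-example.snappy.parquet\n"
)

for data_file in parse_manifest(sample):
    print(data_file)
```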

Read more about Delta Lake's integration for Presto and Athena here

Final Code:

from delta import *
from pyspark.sql.session import SparkSession

spark = SparkSession \
  .builder \
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
  .getOrCreate()


# Read Source
inputDF = spark.read.format("csv").option("header", "true").load('s3://delta-lake-ia-test/raw/')

# Write data as DELTA TABLE
inputDF.write.format("delta").mode("overwrite").save("s3a://delta-lake-ia-test/current/")

# Generate MANIFEST file for Athena/Catalog
deltaTable = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
deltaTable.generate("symlink_format_manifest")


Amazon Athena

In your S3 bucket, you should see a _symlink_format_manifest prefix/folder. This will be used by Amazon Athena for mapping out the parquet files.

Create your table using the code below as a reference:

CREATE EXTERNAL TABLE IF NOT EXISTS "default"."superstore" ( 
row_id STRING,
order_id STRING,
order_date STRING,
ship_date STRING,
ship_mode STRING,
customer_id STRING,
customer_name STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postal_code STRING,
region STRING,
product_id STRING,
category STRING,
sub_category STRING,
product_name STRING,
sales STRING,
quantity STRING,
discount STRING,
profit STRING
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 

STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://delta-lake-ia-test/current/_symlink_format_manifest/'


I'm lazy. I've made things simple by using STRING for all columns.

Note that you have to define the table name as it was when you wrote it as a Delta table, or else you'll get blank results when querying with Athena.
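If typing out every column gets tedious, the same all-STRING statement can be generated from a list of column names. This is a sketch that mirrors the DDL above; build_symlink_ddl is a made-up helper, and it only builds the SQL text, which you would still paste into Athena yourself.

```python
def build_symlink_ddl(database, table, columns, manifest_location):
    """Build a CREATE EXTERNAL TABLE statement that types every column as STRING."""
    column_lines = ",\n".join(f"{name} STRING" for name in columns)
    return (
        f'CREATE EXTERNAL TABLE IF NOT EXISTS "{database}"."{table}" (\n'
        f"{column_lines}\n"
        ") ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'\n"
        "STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'\n"
        "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'\n"
        f"LOCATION '{manifest_location}'"
    )


print(build_symlink_ddl(
    "default",
    "superstore",
    ["row_id", "order_id", "sales", "profit"],
    "s3://delta-lake-ia-test/current/_symlink_format_manifest/",
))
```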

-- Run a simple select
SELECT *
FROM "default"."superstore"
LIMIT 10

[Figure: Athena sample result set]

Recap:

  • Read from a CSV
  • Created a Spark DataFrame from the CSV
  • Wrote the DataFrame as a Delta Table
  • Made a manifest file
  • Created an external table in Athena
  • Queried sample data

What we'll do next:

  • Read the updates from the CSV
  • Make an update based on the new files
  • Generate/update the manifest file

Let's add another Glue ETL Job for the updates.

I have manually modified my raw data to simulate the updates: I just plugged in 99999 for the sales and profit values of the first 15 rows. Feel free to make your own modifications.

Then upload the modified file to a different location in your S3 Bucket.
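The manual edit described above can also be scripted. Here is a minimal sketch using Python's csv module. It assumes the file has header columns named sales and profit, matching the Athena table earlier; adjust the names if your headers differ. The simulate_updates name is made up for this post.

```python
import csv
import io


def simulate_updates(csv_text, columns=("sales", "profit"), value="99999", row_count=15):
    """Overwrite the given columns with a marker value in the first row_count rows."""
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = list(reader)
    for row in rows[:row_count]:
        for column in columns:
            row[column] = value
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=reader.fieldnames, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return output.getvalue()


# Tiny example with only the columns that matter here
source = "row_id,sales,profit\n1,10,2\n2,20,4\n"
print(simulate_updates(source, row_count=1))
```

You would read your downloaded CSV into a string, run it through the function, and save the result as the updates file.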

from delta import *
from pyspark.sql.session import SparkSession


spark = SparkSession \
  .builder \
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
  .getOrCreate()


Nothing new here.

# Read updates
df_updates = spark.read.format("csv").option("header", "true").load('s3://delta-lake-ia-test/updates/')

# Read current as DELTA TABLE
df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")

The first line is a typical read from a CSV.

The next line creates a DeltaTable object, which allows us to call functions in the delta package.

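# expr() builds the merge condition; it is not exported by the delta package
from pyspark.sql.functions import expr

# UPSERT process: execute() applies the merge to the table and returns nothing
df.alias("full_df").merge(
    source = df_updates.alias("append_df"),
    condition = expr("append_df.row_id = full_df.row_id"))\
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()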

One of these is the merge(source, condition) function, which:

Merges the data from the source DataFrame based on the given merge condition.

First, we take the DeltaTable object and give it an alias. We then call the merge() function, passing in our arguments, which in this case are the updates DataFrame and the merge condition.

Then, we call whenMatchedUpdateAll(condition=None) to have the code update all the columns:

Updates all the columns of the matched table row with the values of the corresponding columns in the source row. If a condition is specified, then it must be true for the new row to be updated.

If the condition specified in the merge() function doesn't match, then we do a whenNotMatchedInsertAll(condition=None):

Insert a new target Delta table row by assigning the target columns to the values of the corresponding columns in the source row. If a condition is specified, then it must evaluate to true for the new row to be inserted.

Lastly, we call the execute() function to run it all:

Execute the merge operation based on the built matched and not matched actions.
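If the matched/not matched wording feels abstract, the same upsert semantics can be shown on plain Python dictionaries. This is only an illustration of the logic, not how Delta Lake runs a merge internally; the upsert function name is made up, and row_id is the key used in the merge condition above.

```python
def upsert(target_rows, source_rows, key="row_id"):
    """Return target_rows with source_rows merged in by key."""
    merged = {row[key]: dict(row) for row in target_rows}
    for row in source_rows:
        # Key already present: replace every column (like whenMatchedUpdateAll).
        # Key not present: add the row (like whenNotMatchedInsertAll).
        merged[row[key]] = dict(row)
    return list(merged.values())


current = [{"row_id": "1", "sales": "10"}, {"row_id": "2", "sales": "20"}]
updates = [{"row_id": "2", "sales": "99999"}, {"row_id": "3", "sales": "30"}]

for row in upsert(current, updates):
    print(row)
```

Row 1 is left alone, row 2 is updated, and row 3 is inserted.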

# Generate new MANIFEST file
final_df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
final_df.generate("symlink_format_manifest")

Then we update the manifest file.

For more functions in the library, kindly refer to the official docs
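The walkthrough covers upserts; deletes follow the same pattern. Below is a minimal sketch that wraps DeltaTable.delete(condition), which accepts a SQL condition string, and then regenerates the manifest so Athena sees the change. The delete_and_refresh name and the example condition are made up for illustration, and I haven't run this as part of the jobs above.

```python
def delete_and_refresh(delta_table, condition):
    """Delete rows matching a SQL condition, then regenerate the Athena manifest."""
    # delta_table is expected to be a DeltaTable object,
    # e.g. DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
    delta_table.delete(condition)
    delta_table.generate("symlink_format_manifest")


# Inside a Glue job (hypothetical condition):
# delete_and_refresh(
#     DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/"),
#     "row_id = '1'",
# )
```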

Final code:


from delta import *
from pyspark.sql.functions import expr  # used by the merge condition
from pyspark.sql.session import SparkSession


spark = SparkSession \
  .builder \
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
  .getOrCreate()


# Read updates
df_updates = spark.read.format("csv").option("header", "true").load('s3://delta-lake-ia-test/updates/')

# Read current as DELTA TABLE
df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")

# UPSERT process: execute() applies the merge to the table and returns nothing
df.alias("full_df").merge(
    source = df_updates.alias("append_df"),
    condition = expr("append_df.row_id = full_df.row_id"))\
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()

# Generate new MANIFEST file
final_df = DeltaTable.forPath(spark, "s3a://delta-lake-ia-test/current/")
final_df.generate("symlink_format_manifest")

Now, try querying your updated table in Athena. It should show the latest data.

[Figure: Athena result set after the update]


Conclusion and Final Thoughts

This blog post demonstrated how you can leverage ACID transactions in your data lake.

Having a functionality like this is helpful especially if you have requirements such as Change Data Capture (CDC). Curious to know how you guys implemented such things. Let me know in the comments!

I've read a couple of articles and blog posts on whether a Data Lake should be immutable or not.

From O'Reilly, Data Lake for Enterprises by Tomcy John, Pankaj Misra on the topic of Immutable Data Principle

The data should be stored in a raw format from the different source systems. More importantly, the data stored should be immutable in nature.

By making it immutable, it inherently takes care of human fault tolerance to at least some extent and takes away errors with regards to data loss and corruption. It allows data to be selected, inserted, and not updated or deleted.

To cater to fundamental fast processing/performance, the data is usually stored in a denormalized fashion. Data being immutable makes the system in general simpler and more manageable.

From SQLServerCentral by Steve Jones on the topic of Should the Data Lake be Immutable?

Imagine I had a large set of data, say GBs in a file, would I want to download this and change a few values before uploading it again? Do we want a large ETL load process to repeat?

Could we repeat the process and reload a file again? I don't think so, but it's hard to decide. After all, the lake isn't the source of data; that is some other system.

Maybe that's the simplest solution, and one that reduces complexity, downtime, or anything else that might be involved with locking and changing a file.

Comments on the topic can be found here.

There is a comment from roger.plowman

I suspect immutability should be asked after asking if you should even have the data lake or warehouse in the first place.

What do you guys think? Would love to hear your thoughts!

Speaking of Data Lakes vs Data Warehouses, there's also a very interesting concept I picked up from one of the AWS Community Builders (Joel Farvault). It is called Data Mesh Architecture, and it is described as the next enterprise data platform architecture. I'll leave it up to you to read more about it. AWS also made a blog post on this using AWS Lake Formation.

There is also a YouTube video from the AWS DevDay Data & Analytics held on July 14, 2021, where AWS Technical Evangelists Javier Ramirez and Ricardo Sueiras discuss the Data Mesh Architecture.

Additionally, here is a video about Data Mesh in practice for Europe's biggest online fashion retailer.

Hope this helps! Let me know if you have questions below.


Happy coding!

P.S. A CloudFormation stack is in progress.
