ETL with Spark on Azure Databricks and Azure Data Warehouse (Part 2)
Rubens Barbosa
Posted on April 30, 2022
Hey y'all, this is a continuation of the previous article. We already have data on Azure Data Lake Storage. Now, we will integrate it with Apache Spark on Azure Databricks to perform a small transformation on top of the JSON, and send the data to Azure SQL Data Warehouse. I'll try to be as hands-on as possible. Let's get started!
What is Apache Spark?
Apache Spark is a framework for processing large-scale data, i.e., Big Data, distributed across clusters. It is used for data engineering, data science, and machine learning workloads.
Overview
The main abstraction Spark provides is the Resilient Distributed Dataset (RDD), a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. Spark can therefore run many tasks at the same time, each on its own partition, without them interfering with each other. RDDs can be created from text files, SQL databases, NoSQL databases, HDFS, cloud storage, and so on. Spark keeps RDD data in memory where possible and spills to disk when it does not fit.
At a high level, a Spark cluster has a driver node and several worker nodes. The driver node runs the main program, which contains all of the transformations you want to apply to your data. These are sent out to the worker nodes, which each execute their tasks and return the results to the driver node. This is the core engine of Spark. On top of it, several library modules allow developers to interact with the core engine easily. These libraries include Spark SQL, Spark Streaming, MLlib, and GraphX.
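To make the driver and worker idea a bit more concrete, here is a minimal sketch in plain Python. It is not Spark and does not use the Spark API. It only mimics the pattern: a "driver" splits a list into partitions, a thread pool plays the role of the workers, and the driver combines the partial results. The function names `split_into_partitions` and `run_on_workers` are made up for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(items, num_partitions):
    """Split a list into contiguous partitions of roughly equal size."""
    size, remainder = divmod(len(items), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `remainder` partitions get one extra element.
        end = start + size + (1 if i < remainder else 0)
        partitions.append(items[start:end])
        start = end
    return partitions


def run_on_workers(items, task, num_workers=4):
    """Play the driver: partition the data, run `task` on each partition
    in a worker thread, and collect the partial results in order."""
    partitions = split_into_partitions(items, num_workers)
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        return list(pool.map(task, partitions))


# Each "worker" sums its own partition; the "driver" adds up the partial sums.
partial_sums = run_on_workers(list(range(1, 11)), sum, num_workers=3)
total = sum(partial_sums)
print(partial_sums, total)
```

In real Spark the partitions live on different machines and the scheduling is far more involved, but the split, process, and combine shape is the same.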
Databricks
Databricks is a company founded by the creators of Apache Spark with the intention of making Apache Spark much easier to use. Databricks develops a web-based platform for working with Spark that provides automated cluster management and IPython-style notebooks.
The Databricks workspace is the cloud-based environment in which you use Databricks. It includes the user interface, integrated storage, security settings, job scheduling, and, most importantly, notebooks.
Cluster
As mentioned before, Spark is all about clusters. That's why we'll first create a cluster on Databricks. After launching the Azure Databricks workspace, go to Compute, then create a cluster.
Once our cluster is running, we can create a notebook and start coding. I'll use PySpark. We will now build the ETL with PySpark on Azure Databricks. In the load phase we will write data to Azure SQL Data Warehouse, so we must already have our Data Warehouse deployed and have its connection string at hand.
Creation of ETL with PySpark on Azure Databricks Notebook
Let's have a look.
Extract
First of all, we need to extract the JSON file from Azure Data Lake Storage and read it into a PySpark DataFrame.
# Title: ETL Spark: extract from Azure Data Lake Storage, and load to Azure SQL Data Warehouse
# Language: PySpark
# Author: Rubens Santos Barbosa
# config the session using spark object and set the key from our azure data lake storage account
spark.conf.set(
"fs.azure.account.key.YOUR_AZURE_DATA_LAKE_STORAGE_ACCOUNT_NAME.dfs.core.windows.net",
"YOUR_AZURE_DATA_LAKE_ACCOUNT_KEY"
)
# abfss://AZURE_DL_CONTAINER_NAME@AZURE_DL_STORAGE_ACCOUNT_NAME.dfs.core.windows.net/DIRECTORY_CLIENT
dbutils.fs.ls("abfss://az-covid-data@engdatalake.dfs.core.windows.net/directory-covid19")
# path JSON file on azure data lake storage
covid_data_json = "abfss://az-covid-data@engdatalake.dfs.core.windows.net/directory-covid19/covid-2022-4-21.json"
# read JSON file into DataFrame
df = spark.read.option("multiline","true").json(covid_data_json)
# PySpark print schema
df.printSchema()
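The storage paths above all follow the same `abfss://CONTAINER@ACCOUNT.dfs.core.windows.net/PATH` pattern, so it can be handy to build them in one place instead of typing them by hand. Below is a small sketch in plain Python. The helper names `abfss_uri` and `account_key_conf` are my own and are not part of Databricks or Spark.

```python
def abfss_uri(container, account, path=""):
    """Build an abfss:// URI for a container in an ADLS Gen2 storage account."""
    base = f"abfss://{container}@{account}.dfs.core.windows.net"
    path = path.strip("/")
    return f"{base}/{path}" if path else base


def account_key_conf(account):
    """Name of the Spark config entry that holds the storage account key."""
    return f"fs.azure.account.key.{account}.dfs.core.windows.net"


# Same container, account, and file as in the notebook above.
covid_data_json = abfss_uri(
    "az-covid-data", "engdatalake", "directory-covid19/covid-2022-4-21.json"
)
print(covid_data_json)
print(account_key_conf("engdatalake"))
```

In the notebook you could then pass `account_key_conf(...)` to `spark.conf.set` and `covid_data_json` to `spark.read`, exactly as before.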
We might wanna see some content from our DataFrame.
# showing first 5 rows
df.head(5)
Transform
We've just done the data extraction. Now, we will do a little transformation. Let's check whether there is missing data in the columns of our PySpark DataFrame.
# missing values in a specific column of pySpark dataframe
df.filter(df['bairroPaciente'].isNull()).count()
# count null value in every column
for col in df.columns:
    print(col, "\t", "with null values: ", df.filter(df[col].isNull()).count())
df.count()
We noticed that our PySpark DataFrame has 82 rows, and some columns have 81 null values. So, let's drop these columns.
# columns in pyspark dataframe to drop
columns_to_drop = ['classificacaoEstadoSivep', 'comorbidadeAsmaSivep', 'comorbidadeDiabetesSivep', 'comorbidadeHematologiaSivep', 'comorbidadeImunodeficienciaSivep', 'comorbidadeNeurologiaSivep', 'comorbidadeObesidadeSivep', 'comorbidadePneumopatiaSivep', 'comorbidadePuerperaSivep', 'comorbidadeRenalSivep', 'comorbidadeSindromeDownSivep', 'dataEntradaUtisSvep', 'dataEvolucaoCasoSivep', 'dataInternacaoSivep', 'dataResultadoExame', 'dataSolicitacaoExame', 'evolucaoCaso', 'idSivep', 'paisPaciente', 'cnesNotificacaoEsus', 'comorbidadeCardiovascularSivep', 'dataColetaExame', 'resultadoFinalExame', 'tipoTesteExame']
# delete columns in pyspark dataframe
df = df.drop(*columns_to_drop)
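I picked the columns to drop by reading the null counts, but the same decision can be written as a rule: drop every column whose share of nulls is above some threshold. Here is a rough sketch of that rule in plain Python on a tiny in-memory sample, not on a PySpark DataFrame. The sample rows, the helper names, and the 0.9 threshold are all assumptions made for the illustration.

```python
def null_counts(rows, columns):
    """Count, per column, how many rows have None (or no value) for it."""
    return {c: sum(1 for r in rows if r.get(c) is None) for c in columns}


def mostly_null_columns(rows, columns, threshold=0.9):
    """Return the columns whose fraction of nulls is at least `threshold`."""
    total = len(rows)
    if total == 0:
        return []
    counts = null_counts(rows, columns)
    return [c for c in columns if counts[c] / total >= threshold]


# Hypothetical sample rows, only to show the shape of the data.
sample_rows = [
    {"bairroPaciente": "CENTRO", "idSivep": None},
    {"bairroPaciente": None, "idSivep": None},
    {"bairroPaciente": "ALDEOTA", "idSivep": None},
]
columns = ["bairroPaciente", "idSivep"]

print(null_counts(sample_rows, columns))
print(mostly_null_columns(sample_rows, columns))
```

With the real data, 81 nulls out of 82 rows is about 0.99, so those columns would pass a 0.9 threshold and end up in the drop list.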
Let's display our data.
# describe() returns a new DataFrame with summary statistics; show() prints it
df.describe().show()
display(df)
As you can see above, the columns dataInicioSintomas and dataNotificacao are in timestamp format. I will convert them to date format in our PySpark DataFrame.
from pyspark.sql.functions import to_date
# timestamp to date
df = df.withColumn("dataInicioSintomas", to_date(df['dataInicioSintomas']))
df = df.withColumn("dataNotificacao", to_date(df['dataNotificacao']))
display(df)
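If it is not obvious what the conversion does to each value, the idea is simply to keep the calendar date and throw away the time of day. Here is a tiny plain-Python sketch of the same idea using the standard `datetime` module. It is not what PySpark runs internally, and the sample timestamp is made up.

```python
from datetime import datetime


def timestamp_to_date(value):
    """Parse an ISO-8601 timestamp string and keep only its date part."""
    if value is None:
        return None  # nulls stay null
    return datetime.fromisoformat(value).date()


# Hypothetical value, in the style of the timestamp columns.
print(timestamp_to_date("2022-04-21T13:45:00"))
```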
Load
We've done the data transformation. Now we will remove repeated rows and load the data into Azure SQL Data Warehouse.
# removing repeated rows
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
display(distinctDF)
# Load PySpark DataFrame to Azure SQL Data Warehouse
db_table = "dbo.COVID"
sql_password = "YOUR_PASSWORD"
jdbc_url = "jdbc:sqlserver://cosmos-database.database.windows.net:1433;database=cosmos-pool;user=rubnsbarbosa@cosmos-database;password=" + sql_password + ";encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
distinctDF.write.jdbc(url=jdbc_url, table=db_table, mode="append")
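The JDBC URL above is one long string with the password concatenated in the middle, which is easy to get wrong. One possible way to tidy it up is to build the URL from its parts and to read the password from somewhere outside the notebook. This is only a sketch: the helper `build_jdbc_url` and the environment variable name `SQL_DW_PASSWORD` are my own choices, and the helper does not escape special characters in the values.

```python
import os


def build_jdbc_url(server, database, user, password, port=1433):
    """Assemble a SQL Server JDBC URL with the same options used above."""
    options = {
        "database": database,
        "user": user,
        "password": password,
        "encrypt": "true",
        "trustServerCertificate": "false",
        "hostNameInCertificate": "*.database.windows.net",
        "loginTimeout": "30",
    }
    props = ";".join(f"{key}={value}" for key, value in options.items())
    return f"jdbc:sqlserver://{server}:{port};{props};"


# Assumption: the password is exported as SQL_DW_PASSWORD (hypothetical name).
# On Databricks, dbutils.secrets.get(scope=..., key=...) is another option.
sql_password = os.environ.get("SQL_DW_PASSWORD", "YOUR_PASSWORD")

jdbc_url = build_jdbc_url(
    server="cosmos-database.database.windows.net",
    database="cosmos-pool",
    user="rubnsbarbosa@cosmos-database",
    password=sql_password,
)
```

The resulting `jdbc_url` can be passed to `distinctDF.write.jdbc` in the same way as the hand-written string.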
We've finished the ETL with PySpark on Azure Databricks.
Azure SQL Data Warehouse
Before loading the PySpark DataFrame into Azure SQL Data Warehouse, the table must already exist in our SQL DW. So, we open the query editor and create it. You can see the query I used below.
CREATE TABLE dbo.COVID (
bairroPaciente VARCHAR(254),
codigoMunicipioPaciente VARCHAR(254),
codigoPaciente VARCHAR(254),
dataInicioSintomas VARCHAR(50),
dataNotificacao VARCHAR(50),
estadoPaciente VARCHAR(10),
idadePaciente VARCHAR(10),
municipioNotificacaoEsus VARCHAR(60),
municipioPaciente VARCHAR(100),
profissionalSaude VARCHAR(60),
racaCorPaciente VARCHAR(60),
sexoPaciente VARCHAR(60)
);
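Since the table is created by hand, its columns need to line up with what is left in the DataFrame after the drop step. As far as I can tell, an append is likely to fail if the DataFrame carries a column the table does not have, so a quick check before writing can save a round trip. The sketch below is plain Python. The helper `column_mismatch` is hypothetical, and the column list is copied from the CREATE TABLE statement above.

```python
# Columns of dbo.COVID, copied from the CREATE TABLE statement.
TABLE_COLUMNS = [
    "bairroPaciente", "codigoMunicipioPaciente", "codigoPaciente",
    "dataInicioSintomas", "dataNotificacao", "estadoPaciente",
    "idadePaciente", "municipioNotificacaoEsus", "municipioPaciente",
    "profissionalSaude", "racaCorPaciente", "sexoPaciente",
]


def column_mismatch(df_columns, table_columns):
    """Return (extra, missing): columns only in the DataFrame,
    and columns only in the table, both sorted by name."""
    df_set, table_set = set(df_columns), set(table_columns)
    return sorted(df_set - table_set), sorted(table_set - df_set)


# Hypothetical DataFrame column list that still contains a dropped column.
extra, missing = column_mismatch(TABLE_COLUMNS + ["idSivep"], TABLE_COLUMNS)
print("only in DataFrame:", extra)
print("only in table:", missing)
```

In the notebook, the first argument would be `distinctDF.columns`, and both lists should come back empty before the write.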
You might run into firewall issues when you try to load the data. If so, you just need to go into Firewalls and Virtual Networks (inside the SQL DW) and save the client IP address. Finally, let's see the data in our Azure SQL Data Warehouse.
Conclusion
We have completed the second and last part of our project. We created an ETL using Spark on an Azure Databricks cluster. In the extraction phase we got data from Azure Data Lake Storage, then we performed a basic transformation, and finally we loaded the data into Azure SQL Data Warehouse as proposed.