ELT Data Pipeline with Kubernetes CronJob, Azure Data Lake, Azure Databricks (Part 1)
Rubens Barbosa
Posted on April 24, 2022
Hey world, the concept of ETL is far from new, but it is still widely used in the industry. ETL stands for Extract, Transform, and Load. Okay, but what does that mean? The easiest way to understand how ETL works is to look at what happens in each step of the process. Let's dive into it.
Extract
During the extraction, raw data is moved from a structured or unstructured data pool to a staging data repository.
Transform
The data source might have a different structure than the target destination, so we transform the data from the source schema to the destination schema.
Load
In this phase, we load the transformed data into the data warehouse.
A disadvantage of the ETL approach is that the transformation stage can take a long time. An alternative approach is extract, load, and transform (ELT). In ELT, the data is immediately extracted and loaded into a large data repository, such as Azure Data Lake Storage. We can begin transforming the data as soon as the load is complete.
Hands on
In this first part I will show how to create an ELT job. We'll extract Covid-19 data from a public API called IntegraSUS and load it into Azure Data Lake Storage. The job will be packaged as a container image stored in Azure Container Registry (ACR), and we will use Azure Kubernetes Service (AKS) to schedule it to run daily on a Kubernetes cluster.
In the second part of this project, we will integrate the Data Lake with Apache Spark on Azure Databricks, perform a small transformation on the files sent to the Data Lake, and store the result in a Data Warehouse.
We will learn how to:
- Create a Python ELT job and load data into Azure Data Lake;
- Create an Azure Container Registry and push images into it;
- Create an Azure Kubernetes Service cluster;
- Deploy a CronJob into the Azure Kubernetes cluster;
- Integrate Azure Data Lake with Apache Spark on Databricks;
- Transform data using PySpark on Azure Databricks;
- Load the new data into a Data Warehouse.
The project code is available here: GitHub repository
1. Create a Python ELT
In the extraction phase we will get Covid-19 data for Fortaleza/Ceará/Brazil from a public API and store it in a JSON file. After that, we will load it into Azure Data Lake. You can see the project code below.
#!/usr/local/bin/python
import os
import sys
import yaml
import json
import logging
import requests
from datetime import datetime, timedelta
from azure.storage.filedatalake import DataLakeServiceClient

logging.basicConfig(stream=sys.stdout,
                    level=logging.INFO,
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# query data from two days ago
date = datetime.today() - timedelta(days=2)
previous_date = f"{date.year}-{date.month}-{date.day}"

def extract():
    logger.info('EXTRACTING...')
    # extract data from the API and store it in a local json file
    url = 'https://indicadores.integrasus.saude.ce.gov.br/api/casos-coronavirus?dataInicio=' + previous_date + '&dataFim=' + previous_date
    req = requests.get(url)
    data = req.json()
    if data:
        with open(f'covid-data-{previous_date}.json', 'w', encoding='utf-8') as file:
            json.dump(data, file, ensure_ascii=False, indent=4)
    else:
        logger.info('THERE IS NO DATA')
def initialize_storage_account(storage_account_name, storage_account_key):
    try:
        global service_client
        service_client = DataLakeServiceClient(account_url="https://" + storage_account_name + ".dfs.core.windows.net",
                                               credential=storage_account_key)
    except Exception as e:
        logger.info('EXCEPTION...')
        logger.info(e)

def create_file_system(container_name):
    try:
        global file_system_client
        logger.info('CREATING A CONTAINER NAMED AZ-COVID-DATA')
        file_system_client = service_client.create_file_system(file_system=container_name)
    except Exception as e:
        logger.info('EXCEPTION...')
        logger.info(e)
def create_directory():
    try:
        logger.info('CREATING A DIRECTORY NAMED DIRECTORY-COVID19')
        file_system_client.create_directory("directory-covid19")
    except Exception as e:
        logger.info('EXCEPTION...')
        logger.info(e)
def upload_file_to_container_datalake(local_file, container_name):
    try:
        logger.info('UPLOADING FILE TO AZURE DATA LAKE STORAGE...')
        file_system_client = service_client.get_file_system_client(file_system=container_name)
        directory_client = file_system_client.get_directory_client("directory-covid19")
        file_client = directory_client.get_file_client(f"covid-{previous_date}.json")
        with open(local_file, "rb") as data:
            file_client.upload_data(data, overwrite=True)
        logger.info('UPLOADED TO AZURE DATA LAKE')
    except Exception as e:
        logger.info('EXCEPTION...')
        logger.info(e)

def load_config():
    directory = os.path.dirname(os.path.abspath(__file__))
    with open(directory + "/config.yaml", "r") as yamlfile:
        return yaml.load(yamlfile, Loader=yaml.FullLoader)

if __name__ == "__main__":
    extract()
    config = load_config()
    initialize_storage_account(config["AZURE_DL_STORAGE_ACCOUNT_NAME"], config["AZURE_DL_ACCOUNT_KEY"])
    create_file_system(config["AZURE_DL_CONTAINER_NAME"])
    upload_file_to_container_datalake(f"covid-data-{previous_date}.json", config["AZURE_DL_CONTAINER_NAME"])
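The script reads its Azure credentials from a config.yaml file placed next to it. A minimal sketch of that file, with placeholder values you would swap for your own storage account name, key, and container name, could look like this:

AZURE_DL_STORAGE_ACCOUNT_NAME: "<your-storage-account-name>"
AZURE_DL_ACCOUNT_KEY: "<your-storage-account-key>"
AZURE_DL_CONTAINER_NAME: "<your-container-name>"

Keep in mind that this file gets copied into the Docker image in the next step, so treat the image as sensitive, or move the values into a Kubernetes Secret if you take this beyond a demo.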
2. Create a Docker image for our Python ELT
We are going to build a Docker image for our ELT job and run the job inside a container. Let's create a Dockerfile, which describes how the Docker image is built. You can see the list of instructions below.
FROM python:3.9.12-buster
WORKDIR /usr/src/app
COPY requirements.txt /usr/src/app
RUN pip install -r requirements.txt
COPY config.yaml /usr/src/app
COPY el2datalake.py /usr/src/app
RUN chmod a+x el2datalake.py
CMD ["./el2datalake.py"]
We can build the Docker image using the docker build command
$ docker build -t el2datalakejob .
Now we should run our ELT inside the container
$ docker run -it el2datalakejob:latest
3. Push the Docker image to Azure Container Registry
Azure Container Registry hosts private Docker container images and allows us to build, store, and manage them. We are going to deploy an ACR instance and push our Docker image to it.
To create any resource on Azure, we first need a resource group. We create a new resource group with the az group create command.
$ az group create --name myResourceGroup --location westeurope
Once we have a resource group, we can create an Azure Container Registry with the az acr create command.
$ az acr create \
--resource-group myResourceGroup \
--name azcrjobs \
--sku Basic \
--location westeurope
Let's log in to the Azure Container Registry
$ az acr login --name azcrjobs
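If you are not sure of the registry's full login server name, it can be looked up first (a quick check against the registry created above):

$ az acr show --name azcrjobs --query loginServer --output tsv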
Let's tag the image with the registry's login server, azcrjobs.azurecr.io
$ docker tag el2datalakejob \
azcrjobs.azurecr.io/el2datalakejob:v1
Push the Docker image to ACR
$ docker push azcrjobs.azurecr.io/el2datalakejob:v1
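To confirm the push succeeded, we can list the repositories stored in the registry:

$ az acr repository list --name azcrjobs --output table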
Now that we have our ELT image on Azure Container Registry, let's move on to the next step.
4. Create and Deploy CronJobs on Azure Kubernetes Service
Azure Kubernetes Service (AKS) lets us deploy and manage containerized applications through a fully managed Kubernetes service. Let's create an AKS cluster with the az aks create command.
$ az aks create \
--resource-group myResourceGroup \
--name az-aks-jobs \
--node-count 1 \
--attach-acr azcrjobs \
--location westeurope
To connect to the cluster from our local machine we use the Kubernetes client kubectl. Open a terminal and fetch the cluster credentials
$ az aks get-credentials \
    --resource-group myResourceGroup \
    --name az-aks-jobs
Let's check that our node is available on AKS
$ kubectl get nodes
We start by creating a manifest file for our ELT cron job.
apiVersion: batch/v1
kind: CronJob
metadata:
  creationTimestamp: null
  name: k8sjob
spec:
  jobTemplate:
    metadata:
      creationTimestamp: null
      name: k8sjob
    spec:
      template:
        metadata:
          creationTimestamp: null
        spec:
          containers:
          - image: azcrjobs.azurecr.io/el2datalakejob:v1
            imagePullPolicy: IfNotPresent
            name: k8sjob
            resources: {}
          restartPolicy: OnFailure
  schedule: '55 23 * * *'
status: {}
In the manifest above we defined the crontab expression used as the schedule for our job, which runs every day at 23:55. We also set the name of the Docker image to be pulled from the container registry attached to the cluster.
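As a side note, if you prefer not to write a manifest like this by hand, kubectl can generate a very similar skeleton that you can then adjust and save as job.yml:

$ kubectl create cronjob k8sjob \
    --image=azcrjobs.azurecr.io/el2datalakejob:v1 \
    --schedule="55 23 * * *" \
    --dry-run=client -o yaml > job.yml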
To deploy our job, we will use the kubectl apply command.
kubectl apply -f job.yml
We can view some details about the job with
kubectl get cronjobs
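If you don't want to wait until 23:55 to see it work, you can also trigger a one-off run from the CronJob (the name k8sjob-manual here is just an arbitrary label for the test run):

$ kubectl create job --from=cronjob/k8sjob k8sjob-manual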
To retrieve the cron job logs from Kubernetes, we can use the kubectl logs command, but first we must get the pod name.
$ kubectl get pods
NAME                       READY   STATUS      RESTARTS   AGE
k8sjob-27513350--1-xnj8x   0/1     Completed   0          4m2s
Retrieve cron job logs from Kubernetes
$ kubectl logs k8sjob-27513350--1-xnj8x
Conclusion
Finally, the first stage of our project is complete, and we have Covid data in Azure Data Lake. In the next step, we will read this file from the Data Lake, process it with Apache Spark on Azure Databricks, and make the result of that processing available in a Data Warehouse.