Python ETL: Creating and automating a pipeline from Mysql to postgresql

24mwangi

James

Posted on October 11, 2022


What is ETL?
In data processing, extract, transform, load (ETL) is a three-phase process in which data is extracted, transformed (cleaned, sanitized, scrubbed), and loaded into an output data container. Data can be collected from one or more sources and output to one or more destinations. ETL software typically automates the entire process; jobs can be run manually or on a regular schedule, either individually or grouped into batches.
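The three phases can be sketched with nothing but the standard library. This is a minimal illustration, not the pipeline built later in this post: the CSV text, the `spending` table, and the function names are all made up for the example, and an in-memory SQLite database stands in for a real target.

```python
import csv
import io
import sqlite3

# Hypothetical raw input standing in for a real source system.
RAW_CSV = """name,amount
 alice ,10.50
BOB,20
alice,4.5
"""

def extract(text):
    """Extract: read rows from the source (here, CSV text)."""
    return list(csv.DictReader(io.StringIO(text)))

def transform(rows):
    """Transform: normalise the names and sum the amount per customer."""
    totals = {}
    for row in rows:
        name = row["name"].strip().title()
        totals[name] = totals.get(name, 0.0) + float(row["amount"])
    return sorted(totals.items())

def load(conn, records):
    """Load: write the cleaned records to the target table."""
    conn.execute("CREATE TABLE IF NOT EXISTS spending (name TEXT, total REAL)")
    conn.executemany("INSERT INTO spending VALUES (?, ?)", records)
    conn.commit()

conn = sqlite3.connect(":memory:")
load(conn, transform(extract(RAW_CSV)))
result = conn.execute("SELECT name, total FROM spending ORDER BY name").fetchall()
print(result)
```

Keeping each phase in its own function makes it easy to swap the source or the target later without touching the transformation.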

ETL tools break down data silos and make it easier for data scientists to access, analyze, and transform data into business intelligence. In short, an ETL tool is a critical first step in the data warehousing process that ultimately enables you to make more informed decisions in less time.

Python for ETL
Python is a relatively easy programming language to learn and use. It has an extremely active open source community on GitHub that regularly releases new libraries and improvements. In recent years, Python has become a popular choice for data processing, data analysis, and data science (especially with the powerful Pandas library).

Python libraries useful in ETL

  • Pandas uses a dataframe as a data structure to hold data in memory (similar to how data is handled in the R programming language). Besides the usual ETL features, Pandas supports many analytical features and data visualization.
  • Apache Airflow is an open source workflow management tool that can be used to build ETL pipelines. Strictly speaking, it is not an ETL tool but an orchestration tool for creating, scheduling, and tracking workflows. This means you can use Airflow to build a pipeline by combining independently written modules of your ETL process. An Airflow workflow is modelled as a DAG (Directed Acyclic Graph). Airflow also features a browser-based dashboard to visualize workflows and track the execution of multiple workflows.
  • PySpark processes huge amounts of data quickly. If you want to build an ETL pipeline for very fast big data processing or for processing data streams, PySpark is worth a look.
  • Luigi is a Python-based ETL engine created by Spotify and now available as an open source tool. It is a more complex tool with powerful features for creating complex ETL pipelines. It handles dependency resolution, workflow management, visualization, error handling, command line integration, and more. It also comes with a web dashboard to track all ETL jobs.
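The DAG idea that Airflow and Luigi rely on can be illustrated without installing either tool. The sketch below is not the Airflow or Luigi API; it only uses the standard library's `graphlib` module (Python 3.9+) and a made-up task graph to show how declaring dependencies lets a tool work out a valid execution order.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical task graph: each key depends on the tasks in its set.
dag = {
    "extract_orders": set(),
    "extract_customers": set(),
    "transform": {"extract_orders", "extract_customers"},
    "load": {"transform"},
}

# static_order() yields every task after all of its dependencies.
run_order = list(TopologicalSorter(dag).static_order())
print(run_order)
```

The two extract tasks have no dependency on each other, so an orchestrator is free to run them in either order or in parallel; `transform` and `load` always come after them.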

Example of a pipeline from MySQL to PostgreSQL using Python

This pipeline extracts data from a MySQL database. The data is transformed in Python using the pandas library and then loaded into an analytical database in PostgreSQL. The job is scheduled with the Windows Task Scheduler, which updates the target database every 5 minutes.

Install MySQL - Installing MySQL on Microsoft Windows

Install PostgreSQL - Install PostgreSQL on Windows

The source is the classicmodels database, a sample database for a retailer of scale models of classic cars.
Queries to run against the source database:

  • customers who made the most orders
  • products that have the highest number of purchases
  • customers who have spent the most

Create the analytical database to which the data will be loaded: CREATE DATABASE classicmodels_analysis (the name must match the one used in the PostgreSQL connection).

Creating connections to the source and target databases

# db_connect.py (imported by the main script as db_connect)
import psycopg2
import mysql.connector

# source database
def get_conn_mysql():
    conn = mysql.connector.connect(host="localhost", port=3306, user="root", password="", db="classicmodels")
    # create a cursor for running queries on this connection
    cur = conn.cursor()
    return cur, conn

# target database
def get_conn_postgresql():
    conn = psycopg2.connect(host="localhost", database="classicmodels_analysis", user="postgres", password="")
    # create a cursor for running queries on this connection
    cur = conn.cursor()
    return cur, conn
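The connection functions hardcode the host, user, and password. One common alternative is to read them from environment variables so credentials stay out of the source code. The sketch below is only a suggestion: the `MYSQL_*` variable names and the `mysql_settings` helper are hypothetical and not part of the original project.

```python
import os

def mysql_settings(env=os.environ):
    """Build keyword arguments for mysql.connector.connect from the environment.

    The MYSQL_* variable names are assumptions made for this example;
    the defaults mirror the values hardcoded in the connection code.
    """
    return {
        "host": env.get("MYSQL_HOST", "localhost"),
        "port": int(env.get("MYSQL_PORT", "3306")),
        "user": env.get("MYSQL_USER", "root"),
        "password": env.get("MYSQL_PASSWORD", ""),
        "database": env.get("MYSQL_DATABASE", "classicmodels"),
    }

# Passing a plain dict instead of os.environ makes the helper easy to try out.
settings = mysql_settings({"MYSQL_HOST": "db.internal", "MYSQL_PORT": "3307"})
print(settings["host"], settings["port"])
```

The resulting dictionary could then be passed on as `mysql.connector.connect(**mysql_settings())`; the same approach would work for the PostgreSQL connection.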

Creating and updating tables for the target database

import db_connect
import pandas as pd
import psycopg2
import psycopg2.extras as extras

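# ignore the UserWarning pandas emits when read_sql is given a raw
# DB-API connection instead of a SQLAlchemy connectable
import warnings
warnings.simplefilter("ignore", UserWarning)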

cur1, conn1 = db_connect.get_conn_mysql()
cur2, conn2 = db_connect.get_conn_postgresql()

# creating and updating tables for the target database
commands = (
        """
        CREATE TABLE IF NOT EXISTS toporders(
            customername VARCHAR(255),
            number_of_orders INTEGER
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS product_demand(
            productName VARCHAR(255),
            quantity_ordered INTEGER
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS customer_spending(
            customername VARCHAR(255),
            total_amount_spent float8
        )
        """,
        """
        TRUNCATE TABLE toporders, product_demand, customer_spending;
        """
        )
# executing the queries against the target database
for command in commands:
    cur2.execute(command)
# commit the schema changes and the truncate before reporting success
conn2.commit()
print("--------- tables updated ----------")
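Because the job runs every few minutes, the setup step has to be safe to repeat: CREATE TABLE IF NOT EXISTS does nothing on later runs, and emptying the tables keeps rows from piling up. The sketch below demonstrates that idea with an in-memory SQLite database as a stand-in for PostgreSQL. SQLite has no TRUNCATE, so DELETE is used instead, and the row inserted is invented for the example.

```python
import sqlite3

def reset_target(conn):
    """Create the table if needed, then empty it so each run starts clean."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS toporders ("
        "customername TEXT, number_of_orders INTEGER)"
    )
    # SQLite has no TRUNCATE; DELETE without WHERE plays the same role here.
    conn.execute("DELETE FROM toporders")
    conn.commit()

def run_job(conn):
    """One scheduled run: reset the target, then load a (made-up) row."""
    reset_target(conn)
    conn.execute("INSERT INTO toporders VALUES ('Example Customer', 3)")
    conn.commit()

conn = sqlite3.connect(":memory:")
for _ in range(3):  # simulate three scheduled runs
    run_job(conn)
row_count = conn.execute("SELECT COUNT(*) FROM toporders").fetchone()[0]
print(row_count)
```

After three runs the table still holds a single row, which is the behaviour the truncate step gives the real pipeline.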

Extracting data from Source database and performing simple transformations

# extracting data from the source database
# product demand: products with the highest number of purchases
query1 = """
    SELECT productName, SUM(quantityOrdered) AS quantity_ordered
    FROM products
    JOIN orderdetails ON products.productCode = orderdetails.productCode
    GROUP BY productName
    ORDER BY quantity_ordered DESC
    LIMIT 20;
"""

# toporders: customers that have the most orders
# (grouping by the selected columns keeps the query valid when MySQL runs
# with ONLY_FULL_GROUP_BY enabled)
query2 = """
    SELECT contactFirstName, contactLastName, COUNT(*) AS number_of_orders
    FROM customers
    JOIN orders ON customers.customerNumber = orders.customerNumber
    GROUP BY customers.customerNumber, contactFirstName, contactLastName
    ORDER BY number_of_orders DESC
    LIMIT 20;
"""

# customer spending: customers that have spent the most
query3 = """
    SELECT contactFirstName, contactLastName, SUM(quantityOrdered*priceEach) AS total_amount_spent
    FROM customers
    JOIN orders ON customers.customerNumber = orders.customerNumber
    JOIN orderdetails ON orderdetails.orderNumber = orders.orderNumber
    GROUP BY customers.customerNumber, contactFirstName, contactLastName
    ORDER BY total_amount_spent DESC
    LIMIT 10;
"""

# creating dataframes from the queries
df1= pd.read_sql(query1,con=conn1)
df2= pd.read_sql(query2,con=conn1)
df3= pd.read_sql(query3,con=conn1)

# performing some transformations on the dataframes- joining columns for first name and last name
df2['customername'] = df2['contactFirstName'].str.cat(df2['contactLastName'],sep=" ")
df2=df2.drop(['contactFirstName','contactLastName'], axis=1)
df3['customername'] = df3['contactFirstName'].str.cat(df3['contactLastName'],sep=" ")
df3=df3.drop(['contactFirstName','contactLastName'], axis=1)
# converting the datatype of quantity ordered to integer for df-product demand
data_types={'quantity_ordered':int}
df1 = df1.astype(data_types)

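To see what the customer spending query and the name-joining step compute, it can help to run the same logic on a few rows. The sketch below uses an in-memory SQLite database instead of MySQL, a cut-down version of the three tables, and invented rows; only the column names are taken from the queries in this post.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE customers (customerNumber INTEGER PRIMARY KEY,
                        contactFirstName TEXT, contactLastName TEXT);
CREATE TABLE orders (orderNumber INTEGER PRIMARY KEY, customerNumber INTEGER);
CREATE TABLE orderdetails (orderNumber INTEGER, quantityOrdered INTEGER,
                           priceEach REAL);
-- invented sample rows
INSERT INTO customers VALUES (1, 'Ada', 'Example'), (2, 'Bob', 'Sample');
INSERT INTO orders VALUES (10, 1), (11, 1), (12, 2);
INSERT INTO orderdetails VALUES (10, 2, 5.0), (11, 1, 20.0), (12, 3, 4.0);
""")

query = """
SELECT c.contactFirstName, c.contactLastName,
       SUM(od.quantityOrdered * od.priceEach) AS total_amount_spent
FROM customers AS c
JOIN orders AS o ON o.customerNumber = c.customerNumber
JOIN orderdetails AS od ON od.orderNumber = o.orderNumber
GROUP BY c.customerNumber, c.contactFirstName, c.contactLastName
ORDER BY total_amount_spent DESC
"""

# Same transformation as the pandas step: join first and last name.
spending = [(f"{first} {last}", total) for first, last, total in conn.execute(query)]
print(spending)
```

Each order line contributes quantity times price, the join ties order lines to customers through the orders table, and the final list has one row per customer with a single name column, which is the shape the customer_spending table expects.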

Loading data into the target database

# loading the dataframes into the target database tables
def execute_values(conn, df, table):
    # convert the dataframe rows to a list of tuples
    tuples = [tuple(x) for x in df.to_numpy()]
    cols = ','.join(list(df.columns))
    # SQL query to execute; execute_values expands the single %s into all rows
    query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    try:
        # use a cursor from the connection passed in, not the global cur2
        with conn.cursor() as cur:
            extras.execute_values(cur, query, tuples)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        print("Error: %s" % error)
        conn.rollback()
        return 1
    print("-------data updated/inserted into table----", table)
    return 0

execute_values(conn2, df1,'product_demand')
execute_values(conn2, df2,'toporders')
execute_values(conn2, df3,'customer_spending')

# close connections for mysql and postgresql
conn1.close()
conn2.close()
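The load step follows a commit-or-rollback pattern: either every row of a batch is written, or none is. The sketch below shows that pattern in isolation, using the standard library's sqlite3 module as a stand-in for psycopg2 and invented product rows; `load_rows` is a hypothetical helper, not a function from the project.

```python
import sqlite3

def load_rows(conn, table, columns, rows):
    """Insert all rows in one transaction; return True on success."""
    placeholders = ", ".join("?" for _ in columns)
    # table and columns must come from trusted code, never from user input.
    query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    try:
        conn.executemany(query, rows)
        conn.commit()
        return True
    except sqlite3.Error as error:
        print(f"Error: {error}")
        conn.rollback()  # undo any rows inserted before the failure
        return False

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE product_demand (productName TEXT NOT NULL, quantity_ordered INTEGER)"
)
conn.commit()

columns = ["productName", "quantity_ordered"]
ok = load_rows(conn, "product_demand", columns, [("Model A", 5), ("Model B", 3)])
# The second batch contains a NULL name, which violates NOT NULL.
bad = load_rows(conn, "product_demand", columns, [("Model C", 1), (None, 2)])
count = conn.execute("SELECT COUNT(*) FROM product_demand").fetchone()[0]
print(ok, bad, count)
```

The failed batch leaves no partial data behind: "Model C" is rolled back together with the invalid row, so the table only contains the first batch.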

Below is an output from one of the tables in the target database:
[Image: customer spending]

Automating the job with Windows Task Scheduler
This ETL job is scheduled to run every 5 minutes for one day using the Windows Task Scheduler. The schedule_python_etl.bat file activates the environment and runs the Python script.
To create a task in Windows Task Scheduler: Start -> Task Scheduler -> create a folder (mytask) -> Create Task (python_etl) -> Trigger (repeat every 5 minutes) -> Action (start a program: schedule_python_etl.bat).
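A scheduled job runs unattended, so it helps if the script logs what happened and ends with a non-zero exit code when a step fails. The sketch below shows one way to structure that; `run_pipeline`, the logger name, and the two placeholder steps are hypothetical and not taken from the project's script.

```python
import logging

logger = logging.getLogger("python_etl")  # hypothetical logger name

def run_pipeline(steps):
    """Run each step in order; return 0 on success, 1 on the first failure."""
    for step in steps:
        try:
            step()
        except Exception:
            # logs the message together with the traceback
            logger.exception("step %s failed", step.__name__)
            return 1
        logger.info("step %s finished", step.__name__)
    return 0

# Placeholder steps for the demonstration.
def extract():
    pass

def load():
    raise RuntimeError("target database unavailable")

exit_code_ok = run_pipeline([extract])
exit_code_failed = run_pipeline([extract, load])
print(exit_code_ok, exit_code_failed)
```

In a real script the return value would typically be passed to `sys.exit()`, so that whatever launches the script can tell a failed run from a successful one.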

This is a simple example of a pipeline in Python. Apache Airflow can be used to manage this task more efficiently. The entire code is available here: https://github.com/James-Wachuka/python_etl

Conclusion
Coding ETL processes in Python can take many forms, depending on technical requirements, business goals, which libraries are available and compatible with existing tools, and how much developers want to build from scratch. Python is flexible enough that users can code almost any ETL process with native data structures.
