Bulk load Pandas DataFrames into SQL databases using Jaydebeapi

rmohcie

MoroChev

Posted on May 9, 2021

Bulk load Pandas DataFrames into SQL databases using Jaydebeapi

Loading Pandas DataFrames into SQL databases of all names is a common task between all developers working on building data pipelines for their environments or trying to automate ETL jobs generally.
And for that, Pandas DataFrame class has the built-in method pandas.DataFrame.to_sql that allows to do so very quickly, for SQLite and all the databases supported by SQLAlchemy library, and when it comes to those who doesn’t have a good support by it ( in my case was IBM DB2 ), developers find themselves forced to think twice for some work around in order to get the job done.
Jaydebeapi introduces himself as a good alternative, and it’s particularly seen thus by all developers coming from a Java background and having some familiarities of working with JDBC API to access the database.
Let’s start first by creating the database connection. for that reason I will be creating a simple function that takes in params all the informations required and it will give a connection to DB2 as a return.

def get_conn_to_db( user: str,
                    password: str,
                    host: str,
                    port: str,
                    db_name: str,
                    driver_name: str ): 

    """ Return a connection to DB2 database  """

    login          = {'user': user, 'password': password} 
    drivers_path   = [path_to/driver_file.jar]     
    connection_url = 'jdbc:db2://'+host+':'+port+'/'+database 
    connection     = jaydebeapi.connect(driver_name, connection_url, login,jars= drivers_path)     
    return connection
Enter fullscreen mode Exit fullscreen mode

And then let’s move on to build the bulk_load function that’s going to be charged to load our Pandas DataFrame into DB2 in a chunked way.

def bulk_load(df: pandas.DataFrame, conn, schema_name: str, table: str, chunksize: int) -> []:

    cursor = connection.cursor()    
    sql_exceptions = []
    row_nbr = 0
    df_length = df.shape[0]
    schema_table = f"{schema_name}.{table}"
    # You should make sure that the columns names doesn't 
    # contain any SQL key word
    cols_names_list = df.columns.values.tolist()
    cols_names = f"({ ",".join(cols_names_list) })"

    while row_nbr < df_length:       
        # Determine insert statement boundaries (size)
        beginrow = row_nbr
        endrow = df_length if (row_nbr+chunksize) > df_length 
                           else row_nbr + chunksize 

        # Extract the chunk
        tuples = [tuple(x) for x in df.values[beginrow : endrow]]            
        values_params = '('+",".join('?' for i in cols_names)+')'       
        sql = f"INSERT INTO {schema_table} {cols_names} VALUES {values_params}"

        try:
            cursor.executemany(sql, tuples)
            connection.commit()
        except Exception as e: 
            sql_exceptions.append((beginrow,endrow, e))

        row_nbr = endrow

    cursor.close()
    connection.close()
    return sql_exceptions
Enter fullscreen mode Exit fullscreen mode

Now Let’s see how we can apply those functions on our main task

import numpy as np
import pandas as pd
import jaydebeapi


db_settings = {    
    'host': 'host-adress',
    'port': '50000',
    'user':'username',
    'password':"password",
    'driver_name':'com.ibm.db2.jcc.DB2Driver',
    'db_name':'bludb'
} 

data = np.random.choice(['foo',3,'bar'],size=(100000,3))
df = pd.DataFrame(data, columns=['random_1', 'random_2', 'random_3'])

with get_conn_to_db(**db_settings) as conn:
     bulk_load(df, conn, 'RANDOM_SCHEMA_NAME', 'RANDOM_TABLE_NAME',  1000)
Enter fullscreen mode Exit fullscreen mode
💖 💪 🙅 🚩
rmohcie
MoroChev

Posted on May 9, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related