Bulk load Pandas DataFrames into SQL databases using Jaydebeapi
MoroChev
Posted on May 9, 2021
Loading Pandas DataFrames into SQL databases of all kinds is a common task for developers building data pipelines or automating ETL jobs.
For that, the Pandas DataFrame class has the built-in method pandas.DataFrame.to_sql, which makes the job very quick for SQLite and for every database supported by the SQLAlchemy library. But when a database isn't well supported by it (in my case, IBM DB2), developers find themselves forced to look for a workaround to get the job done.
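For databases that SQLAlchemy does support, to_sql is usually all you need. Here is a minimal sketch using an in-memory SQLite engine; the table name and data are made up purely for illustration:

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical example: any SQLAlchemy-supported database works the same way
engine = create_engine('sqlite:///:memory:')
df = pd.DataFrame({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})

# to_sql creates the table and performs the inserts in one call
df.to_sql('example_table', engine, if_exists='replace', index=False)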
Jaydebeapi presents itself as a good alternative, especially for developers coming from a Java background who are already familiar with the JDBC API for database access.
Let's start by creating the database connection. For that, I will write a simple function that takes all the required information as parameters and returns a connection to DB2.
import jaydebeapi


def get_conn_to_db(user: str,
                   password: str,
                   host: str,
                   port: str,
                   db_name: str,
                   driver_name: str):
    """Return a connection to a DB2 database."""
    login = {'user': user, 'password': password}
    # Path(s) to the JDBC driver jar file(s)
    drivers_path = ['path_to/driver_file.jar']
    connection_url = 'jdbc:db2://' + host + ':' + port + '/' + db_name
    connection = jaydebeapi.connect(driver_name, connection_url, login,
                                    jars=drivers_path)
    return connection
Then let's move on to the bulk_load function, which will be in charge of loading our Pandas DataFrame into DB2 in chunks.
import pandas as pd


def bulk_load(df: pd.DataFrame, conn, schema_name: str, table: str,
              chunksize: int) -> list:
    """Load a DataFrame into schema_name.table in chunks and return
    the list of chunks that failed, along with the raised exceptions."""
    cursor = conn.cursor()
    sql_exceptions = []
    row_nbr = 0
    df_length = df.shape[0]
    schema_table = f"{schema_name}.{table}"
    # You should make sure that the column names don't
    # contain any SQL keyword
    cols_names_list = df.columns.values.tolist()
    cols_names = "(" + ",".join(cols_names_list) + ")"
    values_params = "(" + ",".join("?" for _ in cols_names_list) + ")"
    sql = f"INSERT INTO {schema_table} {cols_names} VALUES {values_params}"
    while row_nbr < df_length:
        # Determine insert statement boundaries (size)
        beginrow = row_nbr
        endrow = df_length if (row_nbr + chunksize) > df_length \
            else row_nbr + chunksize
        # Extract the chunk as a list of tuples
        tuples = [tuple(x) for x in df.values[beginrow:endrow]]
        try:
            cursor.executemany(sql, tuples)
            conn.commit()
        except Exception as e:
            sql_exceptions.append((beginrow, endrow, e))
        row_nbr = endrow
    cursor.close()
    # The connection stays open; the caller is responsible for closing it
    return sql_exceptions
Now let's see how we can apply these functions to our main task.
import numpy as np
import pandas as pd
import jaydebeapi
from contextlib import closing
db_settings = {
    'host': 'host-address',
    'port': '50000',
    'user': 'username',
    'password': 'password',
    'driver_name': 'com.ibm.db2.jcc.DB2Driver',
    'db_name': 'bludb'
}
data = np.random.choice(['foo',3,'bar'],size=(100000,3))
df = pd.DataFrame(data, columns=['random_1', 'random_2', 'random_3'])
# closing() guarantees the connection gets closed once the load is done,
# even if jaydebeapi connections don't support the with statement natively
with closing(get_conn_to_db(**db_settings)) as conn:
    sql_exceptions = bulk_load(df, conn, 'RANDOM_SCHEMA_NAME', 'RANDOM_TABLE_NAME', 1000)
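Since bulk_load commits each chunk on its own and returns the boundaries of the chunks that failed together with the exceptions that were raised, it's worth inspecting that list once the load finishes. A minimal sketch:

# Report every chunk that failed to load, along with the exception raised
for beginrow, endrow, error in sql_exceptions:
    print(f"Rows {beginrow} to {endrow} were not inserted: {error}")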