Get data from Cloud SQL with Python

Gabriel Luz

Posted on June 24, 2022

Hello, there!

A common scenario in the day-to-day work of a data engineer is the need to move data from a SQL database to another place. In this short tutorial, I show a simple way to perform this operation using Python.

For the database, I used Cloud SQL, a managed database service from Google Cloud Platform (GCP). It provides cloud-hosted MySQL, PostgreSQL, and SQL Server databases. The great advantage of Cloud SQL is that, as a managed service, you do not have to worry about infrastructure tasks such as backups, maintenance and updates, monitoring, and logging. In this example, I used PostgreSQL.

In this tutorial, I'm going to assume that you already have an operational Cloud SQL instance; if you don't, you can follow this tutorial to create a database from scratch in a few minutes. To interact with GCP we will use the Google Cloud SDK. If you are a macOS user, you can install it through the Homebrew package manager by running:

brew install --cask google-cloud-sdk

If you are a user of another operating system, you can check this link for the correct way to install the SDK on your OS. Then, use the SDK to authenticate your GCP account: run the commands below, log into your account, and select the project that will be used:

gcloud auth application-default login
gcloud config set project <PROJECT_ID>
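
Depending on how your project is set up, you may also need to enable the Cloud SQL Admin API, which the Cloud SQL connector used later in this tutorial relies on (this is a no-op if it is already enabled):

gcloud services enable sqladmin.googleapis.com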

As in any Python project, it is good practice to create a dedicated virtual environment. We will use virtualenv for that; to install it, just run:

pip install virtualenv

Next, we create the virtual environment for the project and activate it:

virtualenv <my_env_name>
source <my_env_name>/bin/activate

With the virtual environment created, we can download the libraries that will be used in the project.
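
A pip install along these lines should cover everything imported in the next step (the pg8000 extra pulls in the Postgres driver used by the connector, and configparser already ships with the standard library):

pip install "cloud-sql-python-connector[pg8000]" sqlalchemy pandas fastavro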

The first step of our Python code is to import the downloaded libraries:

from google.cloud.sql.connector import Connector
import sqlalchemy
import configparser
import pandas as pd
from fastavro import writer, parse_schema

Next, we need to set the connection parameters. It is always worth pointing out that you should never put passwords or access keys directly in the code. With that in mind, let's create a .env file to store the database access data. It is extremely important to add this file to .gitignore so that it doesn't end up in the git repository you're using. Two other alternatives would be to use environment variables or Secret Manager, the GCP service designed to store credentials. However, for this project, we will stick with the .env file. Fill it in with the following information:

[DEFAULT]
db_user = database user
db_pass = password
db_name = database name
project_id = GCP project id
region = region where the database was created
instance_name = name of the database instance
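
Just to illustrate the environment-variable alternative mentioned above, the same values could instead be read with os.environ; the variable names below are hypothetical and this approach is not used in the rest of the tutorial:

import os

# hypothetical variable names; export them in your shell before running the script
db_user = os.environ["DB_USER"]
db_pass = os.environ["DB_PASS"]
db_name = os.environ["DB_NAME"]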

Going back to the Python code, let's use the configparser library to read these parameters:

# set config
parser = configparser.ConfigParser()
parser.read(".env")

project_id = parser.get("DEFAULT", "project_id")
region = parser.get("DEFAULT", "region")
instance_name = parser.get("DEFAULT", "instance_name")
db_user = parser.get("DEFAULT", "db_user")
db_pass = parser.get("DEFAULT", "db_pass")
db_name = parser.get("DEFAULT", "db_name")

Then we set up the connection name to follow the pattern: PROJECT_ID:REGION:INSTANCE_NAME

# initialize parameters
INSTANCE_CONNECTION_NAME = f"{project_id}:{region}:{instance_name}" # e.g. demo-project:us-central1:demo-instance
print(f"Your instance connection name is: {INSTANCE_CONNECTION_NAME}")

To make the connection itself, we are going to use the cloud-sql-python-connector library. Let's initialize the Connector object and make a call to the connect method, passing the connection parameters.

# initialize Connector object
connector = Connector()

# function to return the database connection object
def getconn():
    conn = connector.connect(
        INSTANCE_CONNECTION_NAME,
        "pg8000",
        user=db_user,
        password=db_pass,
        db=db_name
    )
    return conn

The Connector itself creates connection objects by calling its connect method but does not manage database connection pooling. For this reason, it is recommended to use the connector alongside a library that can create connection pools. For our case, we will use the SQLAlchemy library. This will allow for connections to remain open and be reused, reducing connection overhead and the number of connections needed.

# create connection pool with 'creator' argument to our connection object function
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)
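
As a quick sanity check, you can open a connection from the pool and run a trivial query; this snippet just assumes the pool object created above and standard SQLAlchemy usage:

# quick connectivity check: should print (1,)
with pool.connect() as db_conn:
    result = db_conn.execute(sqlalchemy.text("SELECT 1")).fetchone()
    print(result)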

The returned connection pool engine can then be used to query and modify the database. For the data, I chose to use this dataset, containing information about the 2021-22 UEFA Champions League season. To load the data into Postgres, I used the DBeaver SQL client, which has a feature to easily import data. The database has eight tables, as shown in the image below, but I will only use one, as it is enough for the purpose of this short tutorial.

List of eight tables present in the created database.

Through the pool object created earlier, we can perform a query using the pandas library, as follows:

tb_key_stats = pd.read_sql_query("select * from public.tb_key_stats limit 1000", con=pool)
print(tb_key_stats)

With that, we have the extracted data:

Print containing the first five lines of the dataframe resulting from the data extraction.

As a bonus to this tutorial, we are going to export the extracted data to an Avro file. In case you're not familiar with it, Avro is an open-source data serialization and exchange framework from the Apache Hadoop ecosystem. Unlike Parquet, another very popular format in data engineering, Avro is row-oriented, just like CSV. In case you don't know the difference between the two orientations, consider this dataset.

Image of a small table containing three rows and three columns. Columns player, club and parents (club).

Here’s how it would be organized in both row and column storage:

Image of the previous small dataset arranged by both row and column.

Column-oriented files tend to be more lightweight, since each column holds a single data type and can be compressed accordingly. Row-oriented storage cannot be compressed as effectively, since a single row can mix multiple data types.

A very useful feature of Avro is that it stores the schema as a JSON-like object, so the data types are known in advance.

To convert the previously generated pandas dataframe into an Avro file, we are going to use the FastAvro library. This process essentially contains three steps:

  • Define the schema: We need to define a schema in JSON format to specify which fields are expected, along with their respective data types. Let's do this with a Python dictionary and then pass that object to the fastavro.parse_schema() function.
schema_tb_key_stats = {
    'doc': 'UCL_2021_22',
    'name': 'UCL',
    'namespace': 'stats',
    'type': 'record',
    'fields': [
        {'name': 'player_name', 'type': 'string'},
        {'name': 'club', 'type': 'string'},
        {'name': 'position', 'type': 'string'},
        {'name': 'minutes_played', 'type': 'int'},
        {'name': 'match_played', 'type': 'int'},
        {'name': 'goals', 'type': 'int'},
        {'name': 'assists', 'type': 'int'},
        {'name': 'distance_covered', 'type': 'float'}
    ]
}
parsed_schema_tb_key_stats = parse_schema(schema_tb_key_stats)
  • Convert the DataFrame to a list of records: each row of the dataframe becomes a dictionary:
records = tb_key_stats.to_dict('records')
  • Generate the Avro file:
with open('tb_key_stats_deflate.avro', 'wb') as out:
    writer(out, parsed_schema_tb_key_stats, records, codec='deflate')

With that, the file is generated successfully. As you can see, for this example I used deflate compression, but it is also possible to choose snappy or no compression at all; by default, the codec parameter is set to 'null'. It is also clear that, for a dataset with many columns, writing out the whole schema as a JSON object by hand can become tedious; that is the price paid for efficiency. While there are creative ways to automate this process, that's for another article.
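
If you want to sanity-check the output, fastavro can also read the file back; and once the database is no longer needed, it is good practice to release the connections. This sketch assumes the pool and connector objects created earlier:

from fastavro import reader

# read the Avro file back and print the first record
with open('tb_key_stats_deflate.avro', 'rb') as f:
    for record in reader(f):
        print(record)
        break

# release database resources when done
pool.dispose()
connector.close()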

That’s It!

The purpose of this article was to present a very simple way to extract data from a Postgres database in Cloud SQL. In a future article, I intend to demonstrate an efficient way to extract all the tables from the database, since today we extracted only one.

Let me know if you have any questions, feedback, or ideas on LinkedIn or in the comments below. Thanks for reading!
