Using Google Cloud functions to send QA check reports via Gmail
Niko Draganić
Posted on February 17, 2021
It's been a couple of months at my new job, and I've come to the conclusion that Google Cloud Platform, for all its genius, sometimes lacks some basic functionality. Or rather, that functionality can be obscure or difficult to work into your data pipelines. It is what it is, as they say, and I still maintain that overall data management and governance processes work very well in GCP.
I've created a data quality check in BigQuery that outputs a date (current_date()), the owner of the data (let's say the salesman responsible for the deal that fails the DQ check) and the payload, which is, essentially, a JSON string containing the relevant insights to make the fix at the source. The important thing to note here is that one row per data owner is generated daily, with all of their errors aggregated into the payload. Why? Because we want to send one email containing all the failed DQ checks per person, not spam them with potentially dozens of emails.
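To make that concrete, a single daily row might look something like this once parsed into Python (a sketch - the check names and payload columns are made up, but the shape matches what the function further down expects):

# Hypothetical example of one daily row: one data owner, every failed check aggregated into it.
example_row = {
    'calculation_date': '2021-02-17',
    'data_owner': 'jane.doe',
    'data': [
        {'qa_control_name': 'missing_contract_value',
         'payload': '{"data": [{"deal_id": 1042, "contract_value": null}]}'},
        {'qa_control_name': 'deal_without_industry',
         'payload': '{"data": [{"deal_id": 977, "industry": null}]}'}
    ]
}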
Getting the data out of BigQuery
There are two main ways to get the data stored in BigQuery into a mail-sending pipeline: DataFlow and Cloud Functions. At first I chose DataFlow, mainly to get familiarised with the tool. You can create DataFlow jobs from SQL, which is similar to, but not entirely the same as, BigQuery SQL, so my recommendation is to write the whole query in BQ, load it into a table (with a scheduled job, if needed), and then read that table directly with DataFlow.
To read a BigQuery table in a DataFlow SQL job, you need to reference it as bigquery.table.`project-name`.`dataset-name`.`table-name`.
Now, I said "at first I chose DataFlow" - I now believe it's simpler to just have the Cloud Function retrieve the data from BQ directly, for several reasons:
- DataFlow batch jobs are slower than cloud functions, at least for the tiny amount of data my DQ checks generate
- DataFlow batch jobs are a bitch to schedule - another fascinatingly unavailable functionality
- In my pipeline, this step would have to come last, because once you create a job it runs automatically and there is no way to restart it
However, since I did it this way, I will say that when creating your DataFlow job, the only thing you need to specify is the name of the Pub/Sub topic. Which brings us to...
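For reference, having the Cloud Function read the table directly only takes a few lines with the BigQuery client library. Here is a minimal sketch - the table name is a placeholder, and google-cloud-bigquery would need to be added to requirements.txt:

from google.cloud import bigquery

def read_todays_dq_results():
    # Uses the Cloud Function's runtime service account for authentication.
    client = bigquery.Client()
    query = """
        SELECT calculation_date, data_owner, data
        FROM `my-project.dq_dataset.daily_dq_results`
        WHERE calculation_date = CURRENT_DATE()
    """
    return [dict(row) for row in client.query(query).result()]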
Creating a Pub/Sub topic
Whether you use DataFlow to send data from BQ to your Cloud Function, or you do everything inside your CF, you will use Pub/Sub to trigger it. Pub/Sub is a distributed messaging service, used to handle communication between multiple sources (producers) and multiple targets (consumers). There's not much to say about it: create a new topic, give it a name and make sure you tick the option to add a default subscription. Once it's created, use the trigger cloud function button in the navigation. This starts the process of creating a new CF, which we will put on hold for now (do the next step in a new tab so you can return to this).
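For completeness, publishing a test message to the topic from Python might look like this (a sketch; the project and topic names are placeholders):

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('my-project', 'dq-mail-topic')  # placeholder names

# One BQ row shaped like the earlier example (calculation_date, data_owner, data).
row = {'calculation_date': '2021-02-17', 'data_owner': 'jane.doe', 'data': []}

# Pub/Sub expects bytes; Cloud Functions delivers this base64-encoded in event['data'],
# which is why run_mail_sender (below) decodes it before json.loads().
future = publisher.publish(topic_path, json.dumps(row).encode('utf-8'))
print(future.result())  # prints the message ID once Pub/Sub accepts it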
Create a service account and manage Google APIs
I will not get into details here - there are plenty of walkthroughs on how to create a service account. The important thing is to grant it G Suite Domain-wide delegation and note its Client_ID. Create a new .json key for your service account and store it.
The next thing is to go to G Suite and allow this service account to send emails. To do this, you need to be a Superadmin in your organisation. You can see the detailed steps in this medium post, but essentially, you want to:
- Enable Gmail API
- Go to G Suite admin console -> Security -> Advanced -> Manage API client access
- Enter your service account's Client_ID and give it a name
- Give it the API scope https://www.googleapis.com/auth/gmail.send and authorise.
Create a python Cloud Function
Go back to the Cloud Function creation page. Give it a name and trigger it with the Pub/Sub topic you created earlier. You can define your sensitive service account information as environment variables, or you can just upload the .json key in the next step. Choose a runtime (I chose Python 3.8) and name your entry point, i.e. the function that does all the work. In my case, that was run_mail_sender, without the brackets.
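If you go the environment-variable route, reading those values inside the function is a one-liner per variable (the variable names here are just examples):

import os

# Hypothetical variable names, set in the Cloud Function's runtime configuration.
EMAIL_FROM = os.environ.get('DQ_EMAIL_FROM')
SACC_KEY_FILE = os.environ.get('DQ_SACC_KEY_FILE', 'file-to-sacc-credentials.json')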
Update your requirements.txt with:
# Function dependencies, for example:
# package>=version
google-api-core==1.25.0
google-api-python-client==1.12.8
google-auth==1.24.0
google-auth-httplib2==0.0.4
google-cloud-bigquery-storage==2.1.0
google-cloud-core==1.5.0
google-cloud-storage==1.35.0
google-crc32c==1.1.2
google-resumable-media==1.2.0
googleapis-common-protos==1.52.0
httplib2==0.19.0
oauth2==1.9.0.post1
pandas==1.2.2
Upload the .json key and note the filename.
And finally, here's my function:
from __future__ import print_function
import base64
import json
import pandas as pd
from googleapiclient.discovery import build
from googleapiclient import errors
from httplib2 import Http
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from google.oauth2 import service_account
If all you need to send is a plain text message, your function will be a lot simpler. This example shows how to create an HTML report.
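For comparison, a plain-text body can be built in just a few lines (a sketch; the recipient address is a placeholder):

# A plain-text message needs no MIMEMultipart or HTML part - one MIMEText is enough.
msg = MIMEText('Your DQ check failed. Please review the source data.')
msg['to'] = 'recipient@somemail.com'
msg['from'] = 'niko.draganic@somemail.com'
msg['subject'] = '[QA] DQ report'
body = {'raw': base64.urlsafe_b64encode(msg.as_bytes()).decode()}
# body can then be passed to the same send_message() helper shown further down.

Now for the HTML version: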
def run_mail_sender(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    # Decode the Pub/Sub message and parse the JSON it carries.
    pubsubMessage = base64.b64decode(event['data']).decode('utf-8')
    j = json.loads(pubsubMessage)
    date = j['calculation_date']
    dataOwner = j['data_owner']

    # Email variables. Modify this!
    EMAIL_FROM = 'niko.draganic@somemail.com'
    EMAIL_TO = dataOwner + '@somemail.com'  # assumes data_owner holds the local part of the address
    EMAIL_SUBJECT = '[QA] DQ report generated on {}'.format(date)

    # Generate HTML report
    html = "<!DOCTYPE html>\n<html>\n<head>\n <title>Bornfight BI QA Report</title>\n <meta charset='UTF-8'>\n <style>\n .dataframe th{background: #000000; font-family: arial; font-size: 120%; color: white; border:none; text-align:left !important;}\n .reportHighlight{color: #8b8b8b;}\n html{font-family: arial, sans-serif; font-size: 14px;}\n table{margin:0; padding:0; border:none; border-collapse: collapse; border-spacing: 0; background-image: none;}\n td,th{padding: 8px;}\n </style>\n</head>\n<body>\n"
    html = html + "<h3>DQ report for <span class='reportHighlight'>{}</span> generated on <span class='reportHighlight'>{}</span></h3>\n<br />\n<br />\n".format(dataOwner, date)
    # One report section per failed DQ check: a heading followed by a table built from its payload.
    for i in j['data']:
        html = html + "<h4>Report name: {}</h4>\n".format(i['qa_control_name'])
        html = html + pd.DataFrame.from_dict(json.loads(i['payload'])['data']).to_html(index=False, border=0)
    html = html + "</body>\n</html>"

    service = service_account_login(EMAIL_FROM)
    # Call the Gmail API
    message = create_message(EMAIL_FROM, EMAIL_TO, EMAIL_SUBJECT, 'DQ Check Errors', html)
    sent = send_message(service, 'me', message)
So we decode the pubsubMessage and then parse the JSON - the three keys being calculation_date, data_owner and data. This is enough to generate the sender, recipient and subject fields. Next we generate the email body in HTML:
- First we generate the <head> and, in it, some styling.
- Next, we add a title for the report, customised with the recipient's name and the date.
- Then we generate the elements of the report - the quality check name, followed by a table containing all the relevant DQ data. We do this in a loop in case more than one DQ check failed for this particular user.
- Finally, we close off the HTML tags.
A quick note about generating tables - my data is stored as an array of structs in BQ. When we parse it with json.loads() it becomes a nested JSON object. This is then loaded into Pandas quite easily, and exported as HTML directly from Pandas.
The beauty of this approach is that every DQ check can generate its own table independently of the other checks. Pandas parses the JSON into a DataFrame and then outputs an HTML <table> element. With some CSS styling done in the HTML header, you can customise the report any way you want.
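To illustrate, here is roughly what happens to a single payload (a sketch with made-up column names):

import json
import pandas as pd

# A payload as stored in BQ: an array of structs serialised into a JSON string.
payload = '{"data": [{"deal_id": 1042, "salesman": "jane.doe", "contract_value": null}]}'

# json.loads() turns it into a nested object, Pandas turns that into a DataFrame,
# and to_html() produces the <table> element embedded in the report.
df = pd.DataFrame.from_dict(json.loads(payload)['data'])
print(df.to_html(index=False, border=0))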
After the HTML is generated, we authenticate with the service account, package the message and send it. Here are the functions that do that:
def create_message(sender, to, subject, txt, html):
    """Create a message for an email.
    Args:
        sender: Email address of the sender.
        to: Email address of the receiver.
        subject: The subject of the email message.
        txt: The plain-text body of the email message.
        html: The HTML body of the email message.
    Returns:
        An object containing a base64url encoded email object.
    """
    message = MIMEMultipart('alternative')
    message['to'] = to
    message['from'] = sender
    message['subject'] = subject
    message.attach(MIMEText(txt, 'plain'))
    message.attach(MIMEText(html, 'html'))
    return {'raw': base64.urlsafe_b64encode(message.as_bytes()).decode()}
def send_message(service, user_id, message):
    """Send an email message.
    Args:
        service: Authorized Gmail API service instance.
        user_id: User's email address. The special value "me"
            can be used to indicate the authenticated user.
        message: Message to be sent.
    Returns:
        Sent Message.
    """
    try:
        message = (service.users().messages().send(userId=user_id, body=message).execute())
        print('Message Id: %s' % message['id'])
        return message
    except errors.HttpError as error:
        print('An error occurred: %s' % error)
Not much to it. The last one authenticates with my service account credentials and builds the Gmail service, delegated to the sender's address. Be sure to use the API scope defined above and the JSON file containing the credentials - I created that file directly inside the Cloud Function workspace and pasted the key's contents into it.
def service_account_login(EMAIL_FROM):
    SCOPES = ['https://www.googleapis.com/auth/gmail.send']
    SERVICE_ACCOUNT_FILE = 'file-to-sacc-credentials.json'

    # Load the service account key and delegate to the sender's mailbox (domain-wide delegation).
    credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE, scopes=SCOPES)
    delegated_credentials = credentials.with_subject(EMAIL_FROM)
    service = build('gmail', 'v1', credentials=delegated_credentials)
    return service
That's it!
Having done this, I now realise that my mail-sending function would be better off accepting the email message parameters, such as the HTML, subject, sender and recipient, via the Pub/Sub message, and then having another function reach into BigQuery, generate the HTML and publish to the Pub/Sub topic. But that will be a topic for some future post.