Creating a data pipeline with Dataproc workflow templates and Cloud Scheduler
Jader Lima
Posted on August 21, 2024
About This Post
Data pipelines are orchestrated, scheduled processes that acquire, transform, and enrich data from different sources, with countless possible destinations and applications.
Several systems help with building data pipelines. In this post we will build one on Google Cloud Platform, using a Dataproc workflow template and scheduling it with Cloud Scheduler.
Description of Services Used in GCP
Apache Spark
Apache Spark is an open-source unified analytics engine for large-scale data processing. It is known for its speed, ease of use, and sophisticated analytics capabilities. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance, making it suitable for a wide variety of big data applications.
Google Dataproc
Google Dataproc is a fully managed cloud service that simplifies running Apache Spark and Apache Hadoop clusters in the Google Cloud environment. It allows users to easily process large datasets and integrates seamlessly with other Google Cloud services such as Cloud Storage. Dataproc is designed to make big data processing fast and efficient while minimizing operational overhead.
Cloud Storage
Google Cloud Storage is a scalable and secure object storage service for storing large amounts of unstructured data. It offers high availability and strong global consistency, making it suitable for a wide range of scenarios, such as data backups, big data analytics, and content distribution.
Workflow Templates
Dataproc workflow templates let you define reusable graphs of jobs that run on a managed or existing Dataproc cluster. This simplifies building, scheduling, and executing multi-step processing workflows, ensuring better management of resources and tasks.
Cloud Scheduler
Google Cloud Scheduler is a fully managed cron job service that allows you to run arbitrary functions at specified times without needing to manage the infrastructure. It is useful for automating tasks such as running reports, triggering workflows, and executing other scheduled jobs.
CI/CD Process with GitHub Actions
Incorporating a CI/CD pipeline using GitHub Actions involves automating the build, test, and deployment processes of your applications. For this project, GitHub Actions simplifies the deployment of code and resources to Google Cloud. This automation leverages GitHub's infrastructure to trigger workflows based on events such as code pushes, ensuring that your applications are built and deployed consistently and accurately each time code changes are made.
GitHub Secrets and Configuration
Utilizing secrets in GitHub Actions is vital for maintaining security during the deployment process. Secrets allow you to store sensitive information such as API keys, passwords, and service account credentials securely. By keeping this sensitive data out of your source code, you minimize the risk of leaks and unauthorized access.
- GCP_BUCKET_BIGDATA_FILES - Secret used to store the name of the Cloud Storage bucket that holds the JAR files and PySpark scripts
- GCP_BUCKET_DATALAKE - Secret used to store the name of the data lake Cloud Storage bucket
- GCP_BUCKET_DATAPROC - Secret used to store the name of the Cloud Storage bucket used by the Dataproc cluster
- GCP_SERVICE_ACCOUNT - Secret used to store the service account email used by the Dataproc managed cluster
- GCP_SA_KEY - Secret used to store the value of the service account key. For this project, the default service account key was used.
- PROJECT_ID - Secret used to store the project ID
Creating a GCP service account key
To create computing resources in any cloud in an automated or programmatic way, you need an access key.
In GCP, we use an access key linked to a service account; for this project, the default service account was used.
- In the GCP console, go to IAM & Admin > Service accounts
- Select the default service account; its name is something like Compute Engine default service account
- In the selected service account, open the KEYS tab
- Click ADD KEY > Create new key > Key type: JSON
- Download the key file and use its contents as the value of the GCP_SA_KEY secret in GitHub
For more details, see:
https://cloud.google.com/iam/docs/keys-create-delete
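If you prefer the command line, the key can also be created with gcloud. This is a minimal sketch; the project ID and service account email below are placeholders that must be replaced with the values from your own project.

# Find the email of the default Compute Engine service account (placeholder project ID)
gcloud iam service-accounts list --project my-project-id

# Create and download a JSON key for that account; paste the file contents into the GCP_SA_KEY secret
gcloud iam service-accounts keys create sa-key.json \
  --iam-account 123456789-compute@developer.gserviceaccount.com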
Creating a GitHub secret
To create a new secret:
- In the project repository, open the Settings menu
- Under Security, click Secrets and variables, then Actions
- Click New repository secret and enter a name and value for the secret
For more details, see:
https://docs.github.com/pt/actions/security-for-github-actions/security-guides/using-secrets-in-github-actions
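The same secrets can also be created from the terminal with the GitHub CLI. A minimal sketch, assuming gh is authenticated and run from inside the repository; the values are placeholders.

# Create the project and bucket secrets (placeholder values)
gh secret set PROJECT_ID --body "my-project-id"
gh secret set GCP_BUCKET_DATALAKE --body "my-datalake-bucket"

# Store the downloaded service account key file as GCP_SA_KEY
gh secret set GCP_SA_KEY < sa-key.json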
Architecture Diagram
Deploying the project
Every time a push to the main branch happens, GitHub Actions is triggered and runs the YAML script.
The YAML script contains three jobs, which are explained in more detail below. In short, GitHub Actions authenticates to GCP using the credentials of a service account with rights to create computing resources and then performs the steps described in the YAML file.
on:
  push:
    branches: [main]
Workflow File YAML Explanation
Environments Needed
Here's a brief explanation of the environment variables used by the workflow, as defined in the YAML file:
env:
BRONZE_DATALAKE_FILES: bronze
TRANSIENT_DATALAKE_FILES: transient
BUCKET_DATALAKE_FOLDER: transient
BUCKET_BIGDATA_JAR_FOLDER: jars
BUCKET_BIGDATA_PYSPARK_FOLDER: scripts
PYSPARK_INGESTION_SCRIPT : ingestion_csv_to_delta.py
REGION: us-east1
ZONE: us-east1-b
DATAPROC_CLUSTER_NAME : dataproc-bigdata-multi-node-cluster
DATAPROC_WORKER_TYPE : n2-standard-2
DATAPROC_MASTER_TYPE : n2-standard-2
DATAPROC_NUM_WORKERS : 2
DATAPROC_IMAGE_VERSION : 2.1-debian11
DATAPROC_WORKER_NUM_LOCAL_SSD: 1
DATAPROC_MASTER_NUM_LOCAL_SSD: 1
DATAPROC_MASTER_BOOT_DISK_SIZE: 32
DATAPROC_WORKER_DISK_SIZE: 32
DATAPROC_MASTER_BOOT_DISK_TYPE: pd-balanced
DATAPROC_WORKER_BOOT_DISK_TYPE: pd-balanced
DATAPROC_COMPONENTS: JUPYTER
DATAPROC_WORKFLOW_NAME: departments_etl
DATAPROC_WORKFLOW_INGESTION_STEP_NAME: ingestion_countries_csv_to_delta
JAR_LIB1 : delta-core_2.12-2.3.0.jar
JAR_LIB2 : delta-storage-2.3.0.jar
APP_NAME : 'countries_ingestion_csv_to_delta'
SUBJECT : departments
STEP1 : countries
STEP2 : departments
STEP3 : employees
STEP4 : jobs
TIME_ZONE : America/Sao_Paulo
SCHEDULE : "20 12 * * *"
SCHEDULE_NAME : schedule_departments_etl
SERVICE_ACCOUNT_NAME : dataproc-account-workflow
CUSTOM_ROLE : WorkflowCustomRole
STEP1_NAME : step_countries
STEP2_NAME : step_departments
STEP3_NAME : step_employees
STEP4_NAME : step_jobs
Deploy Buckets Job
This job is responsible for creating three Google Cloud Storage buckets: one for transient files, one for jar files, and one for PySpark scripts. It checks if each bucket already exists before attempting to create them. Additionally, it uploads specified files into these buckets to prepare for later processing.
jobs:
deploy-buckets:
runs-on: ubuntu-22.04
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Authorize GCP
uses: 'google-github-actions/auth@v2'
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
# Step to Create GCP Bucket
- name: Create Google Cloud Storage - files
run: |-
if ! gsutil ls -p ${{ secrets.PROJECT_ID }} gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }} &> /dev/null; \
then \
gcloud storage buckets create gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }} --default-storage-class=nearline --location=${{ env.REGION }}
else
echo "Cloud Storage : gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }} already exists" !
fi
# Step to Create GCP Bucket
- name: Create Google Cloud Storage - dataproc
run: |-
if ! gsutil ls -p ${{ secrets.PROJECT_ID }} gs://${{ secrets.GCP_BUCKET_DATAPROC }} &> /dev/null; \
then \
gcloud storage buckets create gs://${{ secrets.GCP_BUCKET_DATAPROC }} --default-storage-class=nearline --location=${{ env.REGION }}
else
echo "Cloud Storage : gs://${{ secrets.GCP_BUCKET_DATAPROC }} already exists" !
fi
# Step to Create GCP Bucket
- name: Create Google Cloud Storage - datalake
run: |-
if ! gsutil ls -p ${{ secrets.PROJECT_ID }} gs://${{ secrets.GCP_BUCKET_DATALAKE }} &> /dev/null; \
then \
gcloud storage buckets create gs://${{ secrets.GCP_BUCKET_DATALAKE }} --default-storage-class=nearline --location=${{ env.REGION }}
else
echo "Cloud Storage : gs://${{ secrets.GCP_BUCKET_DATALAKE }} already exists" !
fi
# Step to Upload the file to GCP Bucket - transient files
- name: Upload transient files to Google Cloud Storage
run: |-
TARGET=${{ env.TRANSIENT_DATALAKE_FILES }}
BUCKET_PATH=${{ secrets.GCP_BUCKET_DATALAKE }}/${{ env.BUCKET_DATALAKE_FOLDER }}
gsutil cp -r $TARGET gs://${BUCKET_PATH}
# Step to Upload the file to GCP Bucket - jar files
- name: Upload jar files to Google Cloud Storage
run: |-
TARGET=${{ env.BUCKET_BIGDATA_JAR_FOLDER }}
BUCKET_PATH=${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_JAR_FOLDER }}
gsutil cp -r $TARGET gs://${BUCKET_PATH}
# Step to Upload the file to GCP Bucket - pyspark files
- name: Upload pyspark files to Google Cloud Storage
run: |-
TARGET=${{ env.BUCKET_BIGDATA_PYSPARK_FOLDER }}
BUCKET_PATH=${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_PYSPARK_FOLDER }}
gsutil cp -r $TARGET gs://${BUCKET_PATH}
Explanation:
This job begins by checking out the code and authorizing Google Cloud credentials. It then checks for the existence of three specified Cloud Storage buckets—one for transient files, one for JAR files, and one for PySpark scripts. If these buckets do not exist, it creates them with gcloud. Finally, it uploads the relevant files to the corresponding buckets using gsutil.
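To confirm the deployment worked, the buckets and uploaded folders can be listed from a terminal. A quick sketch with placeholder project and bucket names:

# List all buckets in the project (placeholder project ID)
gcloud storage ls --project my-project-id

# Inspect the uploaded PySpark scripts, jars and transient data (placeholder bucket names)
gcloud storage ls gs://my-bigdata-files-bucket/scripts/
gcloud storage ls gs://my-bigdata-files-bucket/jars/
gcloud storage ls gs://my-datalake-bucket/transient/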
Deploy Dataproc Workflow Template Job
This job deploys a Dataproc workflow template in Google Cloud. It begins by checking if the workflow template already exists; if not, it creates one. It also sets up a managed Dataproc cluster with specific configurations such as the machine types and number of workers. Subsequently, it adds various steps (jobs) to the workflow template to outline the processing tasks for data ingestion.
Code Snippet:
deploy-dataproc-workflow-template:
needs: [deploy-buckets]
runs-on: ubuntu-22.04
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Authorize GCP
uses: 'google-github-actions/auth@v2'
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
- name: Create Dataproc Workflow
run: |-
if ! gcloud dataproc workflow-templates list --region=${{ env.REGION}} | grep -i ${{ env.DATAPROC_WORKFLOW_NAME}} &> /dev/null; \
then \
gcloud dataproc workflow-templates create ${{ env.DATAPROC_WORKFLOW_NAME }} --region ${{ env.REGION }}
else
echo "Workflow Template : ${{ env.DATAPROC_WORKFLOW_NAME }} already exists" !
fi
- name: Create Dataproc Managed Cluster
run: >
gcloud dataproc workflow-templates set-managed-cluster ${{ env.DATAPROC_WORKFLOW_NAME }}
--region ${{ env.REGION }}
--zone ${{ env.ZONE }}
--image-version ${{ env.DATAPROC_IMAGE_VERSION }}
--master-machine-type=${{ env.DATAPROC_MASTER_TYPE }}
--master-boot-disk-type ${{ env.DATAPROC_MASTER_BOOT_DISK_TYPE }}
--master-boot-disk-size ${{ env.DATAPROC_MASTER_BOOT_DISK_SIZE }}
--worker-machine-type=${{ env.DATAPROC_WORKER_TYPE }}
--worker-boot-disk-type ${{ env.DATAPROC_WORKER_BOOT_DISK_TYPE }}
--worker-boot-disk-size ${{ env.DATAPROC_WORKER_DISK_SIZE }}
--num-workers=${{ env.DATAPROC_NUM_WORKERS }}
--cluster-name=${{ env.DATAPROC_CLUSTER_NAME }}
--optional-components ${{ env.DATAPROC_COMPONENTS }}
--service-account=${{ secrets.GCP_SERVICE_ACCOUNT }}
- name: Add Job Ingestion countries to Workflow
run: |-
if gcloud dataproc workflow-templates list --region=${{ env.REGION}} | grep -i ${{ env.DATAPROC_WORKFLOW_NAME}} &> /dev/null; \
then \
if gcloud dataproc workflow-templates describe ${{ env.DATAPROC_WORKFLOW_NAME}} --region=${{ env.REGION}} | grep -i ${{ env.STEP1_NAME }} &> /dev/null; \
then \
echo "Workflow Template : ${{ env.DATAPROC_WORKFLOW_NAME }} already has step : ${{ env.STEP1_NAME }} " !
else
PYSPARK_SCRIPT_PATH=${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_PYSPARK_FOLDER }}/${{ env.PYSPARK_INGESTION_SCRIPT }}
JARS_PATH=gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_JAR_FOLDER }}/${{ env.JAR_LIB1 }}
JARS_PATH=${JARS_PATH},gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_JAR_FOLDER }}/${{ env.JAR_LIB2 }}
TRANSIENT=${{ secrets.GCP_BUCKET_DATALAKE }}/${{ env.BUCKET_DATALAKE_FOLDER }}/${{ env.SUBJECT }}/${{ env.STEP1 }}
BRONZE=${{ secrets.GCP_BUCKET_DATALAKE }}/${{ env.BRONZE_DATALAKE_FILES }}/${{ env.SUBJECT }}/${{ env.STEP1 }}
gcloud dataproc workflow-templates add-job pyspark gs://${PYSPARK_SCRIPT_PATH} \
--workflow-template ${{ env.DATAPROC_WORKFLOW_NAME }} \
--step-id ${{env.STEP1_NAME }} \
--region ${{ env.REGION }} \
--jars ${JARS_PATH} \
-- --app_name=${{ env.APP_NAME }}${{ env.STEP1 }} --bucket_transient=gs://${TRANSIENT} \
--bucket_bronze=gs://${BRONZE}
fi
else
echo "Workflow Template : ${{ env.DATAPROC_WORKFLOW_NAME}} not exists" !
fi
- name: Add Job Ingestion departments to Workflow
run: |-
if gcloud dataproc workflow-templates list --region=${{ env.REGION}} | grep -i ${{ env.DATAPROC_WORKFLOW_NAME}} &> /dev/null; \
then \
if gcloud dataproc workflow-templates describe ${{ env.DATAPROC_WORKFLOW_NAME}} --region=${{ env.REGION}} | grep -i ${{ env.STEP2_NAME }} &> /dev/null; \
then \
echo "Workflow Template : ${{ env.DATAPROC_WORKFLOW_NAME }} already has step : ${{ env.STEP2_NAME }} " !
else
PYSPARK_SCRIPT_PATH=${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_PYSPARK_FOLDER }}/${{ env.PYSPARK_INGESTION_SCRIPT }}
JARS_PATH=gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_JAR_FOLDER }}/${{ env.JAR_LIB1 }}
JARS_PATH=${JARS_PATH},gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_JAR_FOLDER }}/${{ env.JAR_LIB2 }}
TRANSIENT=${{ secrets.GCP_BUCKET_DATALAKE }}/${{ env.BUCKET_DATALAKE_FOLDER }}/${{ env.SUBJECT }}/${{ env.STEP2 }}
BRONZE=${{ secrets.GCP_BUCKET_DATALAKE }}/${{ env.BRONZE_DATALAKE_FILES }}/${{ env.SUBJECT }}/${{ env.STEP2 }}
gcloud dataproc workflow-templates add-job pyspark gs://${PYSPARK_SCRIPT_PATH} \
--workflow-template ${{ env.DATAPROC_WORKFLOW_NAME }} \
--step-id ${{ env.STEP2_NAME }} \
--start-after ${{ env.STEP1_NAME }} \
--region ${{ env.REGION }} \
--jars ${JARS_PATH} \
-- --app_name=${{ env.APP_NAME }}${{ env.STEP2 }} --bucket_transient=gs://${TRANSIENT} \
--bucket_bronze=gs://${BRONZE}
fi
else
echo "Workflow Template : ${{ env.DATAPROC_WORKFLOW_NAME}} not exists" !
fi
- name: Add Job Ingestion employees to Workflow
run: |-
if gcloud dataproc workflow-templates list --region=${{ env.REGION}} | grep -i ${{ env.DATAPROC_WORKFLOW_NAME}} &> /dev/null; \
then \
if gcloud dataproc workflow-templates describe ${{ env.DATAPROC_WORKFLOW_NAME}} --region=${{ env.REGION}} | grep -i ${{ env.STEP3_NAME }} &> /dev/null; \
then \
echo "Workflow Template : ${{ env.DATAPROC_WORKFLOW_NAME }} already has step : ${{ env.STEP3_NAME }} " !
else
PYSPARK_SCRIPT_PATH=${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_PYSPARK_FOLDER }}/${{ env.PYSPARK_INGESTION_SCRIPT }}
JARS_PATH=gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_JAR_FOLDER }}/${{ env.JAR_LIB1 }}
JARS_PATH=${JARS_PATH},gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_JAR_FOLDER }}/${{ env.JAR_LIB2 }}
TRANSIENT=${{ secrets.GCP_BUCKET_DATALAKE }}/${{ env.BUCKET_DATALAKE_FOLDER }}/${{ env.SUBJECT }}/${{ env.STEP3 }}
BRONZE=${{ secrets.GCP_BUCKET_DATALAKE }}/${{ env.BRONZE_DATALAKE_FILES }}/${{ env.SUBJECT }}/${{ env.STEP3 }}
gcloud dataproc workflow-templates add-job pyspark gs://${PYSPARK_SCRIPT_PATH} \
--workflow-template ${{ env.DATAPROC_WORKFLOW_NAME }} \
--step-id ${{ env.STEP3_NAME }} \
--start-after ${{ env.STEP1_NAME }},${{ env.STEP2_NAME }} \
--region ${{ env.REGION }} \
--jars ${JARS_PATH} \
-- --app_name=${{ env.APP_NAME }}${{ env.STEP3 }} --bucket_transient=gs://${TRANSIENT} \
--bucket_bronze=gs://${BRONZE}
fi
else
echo "Workflow Template : ${{ env.DATAPROC_WORKFLOW_NAME}} not exists" !
fi
- name: Add Job Ingestion Jobs to Workflow
run: |-
if gcloud dataproc workflow-templates list --region=${{ env.REGION}} | grep -i ${{ env.DATAPROC_WORKFLOW_NAME}} &> /dev/null; \
then \
if gcloud dataproc workflow-templates describe ${{ env.DATAPROC_WORKFLOW_NAME}} --region=${{ env.REGION}} | grep -i ${{ env.STEP4_NAME }} &> /dev/null; \
then \
echo "Workflow Template : ${{ env.DATAPROC_WORKFLOW_NAME }} already has step : ${{ env.STEP4_NAME }} " !
else
PYSPARK_SCRIPT_PATH=${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_PYSPARK_FOLDER }}/${{ env.PYSPARK_INGESTION_SCRIPT }}
JARS_PATH=gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_JAR_FOLDER }}/${{ env.JAR_LIB1 }}
JARS_PATH=${JARS_PATH},gs://${{ secrets.GCP_BUCKET_BIGDATA_FILES }}/${{ env.BUCKET_BIGDATA_JAR_FOLDER }}/${{ env.JAR_LIB2 }}
TRANSIENT=${{ secrets.GCP_BUCKET_DATALAKE }}/${{ env.BUCKET_DATALAKE_FOLDER }}/${{ env.SUBJECT }}/${{ env.STEP4 }}
BRONZE=${{ secrets.GCP_BUCKET_DATALAKE }}/${{ env.BRONZE_DATALAKE_FILES }}/${{ env.SUBJECT }}/${{ env.STEP4 }}
gcloud dataproc workflow-templates add-job pyspark gs://${PYSPARK_SCRIPT_PATH} \
--workflow-template ${{ env.DATAPROC_WORKFLOW_NAME }} \
--step-id ${{ env.STEP4_NAME }} \
--start-after ${{ env.STEP1_NAME }},${{ env.STEP2_NAME }},${{ env.STEP3_NAME }} \
--region ${{ env.REGION }} \
--jars ${JARS_PATH} \
-- --app_name=${{ env.APP_NAME }}${{ env.STEP4 }} --bucket_transient=gs://${TRANSIENT} \
--bucket_bronze=gs://${BRONZE}
fi
else
echo "Workflow Template : ${{ env.DATAPROC_WORKFLOW_NAME}} not exists" !
fi
Explanation
This job follows a systematic approach to deploying a Dataproc workflow template. It first checks if the workflow template exists and creates it if it does not. Next, a managed Dataproc cluster is configured with specified properties (e.g., number of workers, machine type). The job also adds specified steps for data ingestion tasks to the workflow template, detailing how data should be processed. The remaining steps for Add Job are structured similarly, each focusing on different data ingestion tasks within the workflow.
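Before wiring up the scheduler, it can be useful to inspect the template and run it once by hand. A hedged sketch using the workflow name and region defined in the env block:

# Show the generated template, including the managed cluster and the step DAG
gcloud dataproc workflow-templates describe departments_etl --region us-east1

# Instantiate the template once: Dataproc creates the managed cluster,
# runs the four steps in dependency order and deletes the cluster afterwards
gcloud dataproc workflow-templates instantiate departments_etl --region us-east1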
Deploy Cloud Schedule Job
This job sets up a scheduling mechanism using Google Cloud Scheduler. It creates a service account specifically for the scheduled job, defines a custom role with specific permissions, and binds the custom role to the service account. Finally, it creates the cloud schedule to trigger the execution of the workflow at defined intervals.
deploy-cloud-schedule:
needs: [deploy-buckets, deploy-dataproc-workflow-template]
runs-on: ubuntu-22.04
timeout-minutes: 10
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Authorize GCP
uses: 'google-github-actions/auth@v2'
with:
credentials_json: ${{ secrets.GCP_SA_KEY }}
# Step to Authenticate with GCP
- name: Set up Cloud SDK
uses: google-github-actions/setup-gcloud@v2
with:
version: '>= 363.0.0'
project_id: ${{ secrets.PROJECT_ID }}
# Step to Configure Docker to use the gcloud command-line tool as a credential helper
- name: Configure Docker
run: |-
gcloud auth configure-docker
- name: Create service account
run: |-
if ! gcloud iam service-accounts list | grep -i ${{ env.SERVICE_ACCOUNT_NAME}} &> /dev/null; \
then \
gcloud iam service-accounts create ${{ env.SERVICE_ACCOUNT_NAME }} \
--display-name="scheduler dataproc workflow service account"
fi
- name: Create Custom role for service account
run: |-
if ! gcloud iam roles list --project ${{ secrets.PROJECT_ID }} | grep -i ${{ env.CUSTOM_ROLE }} &> /dev/null; \
then \
gcloud iam roles create ${{ env.CUSTOM_ROLE }} --project ${{ secrets.PROJECT_ID }} \
--title "Dataproc Workflow template scheduler" --description "Dataproc Workflow template scheduler" \
--permissions "dataproc.workflowTemplates.instantiate,iam.serviceAccounts.actAs" --stage ALPHA
fi
- name: Add the custom role for service account
run: |-
gcloud projects add-iam-policy-binding ${{secrets.PROJECT_ID}} \
--member=serviceAccount:${{env.SERVICE_ACCOUNT_NAME}}@${{secrets.PROJECT_ID}}.iam.gserviceaccount.com \
--role=projects/${{secrets.PROJECT_ID}}/roles/${{env.CUSTOM_ROLE}}
- name: Create cloud schedule for workflow execution
run: |-
if ! gcloud scheduler jobs list --location ${{env.REGION}} | grep -i ${{env.SCHEDULE_NAME}} &> /dev/null; \
then \
gcloud scheduler jobs create http ${{env.SCHEDULE_NAME}} \
--schedule="30 12 * * *" \
--description="Dataproc workflow " \
--location=${{env.REGION}} \
--uri=https://dataproc.googleapis.com/v1/projects/${{secrets.PROJECT_ID}}/regions/${{env.REGION}}/workflowTemplates/${{env.DATAPROC_WORKFLOW_NAME}}:instantiate?alt=json \
--time-zone=${{env.TIME_ZONE}} \
--oauth-service-account-email=${{env.SERVICE_ACCOUNT_NAME}}@${{secrets.PROJECT_ID}}.iam.gserviceaccount.com
fi
Explanation
In this job, a service account is created specifically for handling the scheduled workflow execution. It also defines a custom role that grants the necessary permissions for the service account to instantiate the workflow template. This custom role is then associated with the service account to ensure it has the required permissions. Finally, the job creates a cloud schedule that triggers the workflow execution at predetermined times, ensuring automated execution of the data processing workflow.
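Once deployed, the schedule can be tested without waiting for the cron time. A minimal sketch using the schedule name and region from the env block:

# Force an immediate run of the scheduled job
gcloud scheduler jobs run schedule_departments_etl --location us-east1

# Check the job definition and the status of the last attempt
gcloud scheduler jobs describe schedule_departments_etl --location us-east1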
Resources created after deploy process
Dataproc Workflow Template
After deploying the project, you can access the Dataproc service to view the Workflow template. In the Workflow tab, you can explore various options, including monitoring workflow executions and analyzing their details.
When you select the created workflow, you can see the cluster used for processing and the steps that comprise the workflow, including any dependencies between the steps. This visibility allows you to track the workflow's operational flow.
Additionally, within the Dataproc service, you can monitor the execution status of each job. It provides details about each execution, including the performance of individual steps within the workflow template, as illustrated below.
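If you prefer the CLI to the console, roughly the same information can be retrieved with gcloud. A short sketch:

# List workflow instantiations (operations) and the individual Dataproc jobs they ran
gcloud dataproc operations list --region us-east1
gcloud dataproc jobs list --region us-east1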
Cloud Scheduler
By accessing the Cloud Scheduler service, you'll find the scheduled job created during the deployment process. The interface displays the last run status, the defined schedule for execution, and additional details about the target URL and other parameters.
Cloud Storage
As part of the deployment process, several Cloud Storage buckets are created: one for the data lake, another for the Dataproc cluster, and a third for the PySpark scripts and libraries used in the project. The Dataproc service itself also creates buckets to store temporary data generated during processing.
After the data processing is complete, a new directory is created in the data lake bucket to hold the ingested data. The transient directory, created during the deployment phase, is where the data was copied from the GitHub repository to Cloud Storage; in a production environment, another application would likely handle the ingestion of data into this transient layer.
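The ingested output can also be checked directly in the bucket. A sketch with a placeholder bucket name, following the folder layout used by the workflow steps:

# List the bronze layer written by the workflow (placeholder bucket name)
gsutil ls -r gs://my-datalake-bucket/bronze/departments/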
Conclusion
Data pipelines are crucial components in the landscape of data processing. While there are robust and feature-rich tools available, such as Azure Data Factory and Apache Airflow, simpler solutions can be valuable in certain scenarios. The decision on the most appropriate tool ultimately rests with the data and architecture teams, who must assess the specific needs and context to select the best solution for the moment.
Links and References
Github Repo
Dataproc workflow documentation