Data Engineering with Scala: Mastering Real-Time Data Processing with Apache Flink and Google Pub/Sub


Geazi Anc

Posted on October 18, 2024


Note: this article is also available in Brazilian Portuguese 🌎

Apache Flink is a distributed data processing framework for both batch and stream processing. It can be used to develop event-driven applications, perform batch and streaming data analysis, and build ETL data pipelines.

Pub/Sub is a scalable, asynchronous messaging service from Google that decouples the services that produce messages from the services that process them. It is used in streaming analytics and data integration pipelines to load and distribute data, and it is equally effective as messaging middleware for service integration or as a queue to parallelize tasks.

In this article, we will develop a very simple real-time data pipeline using Apache Flink together with version 3 of the Scala programming language, with Pub/Sub as the message broker. Before we begin, let's align expectations.

First, this article is not intended to be an introductory article to Apache Flink. If you have never heard of it before, I suggest you read the first steps from the official documentation. Read it without fear! The Apache Flink documentation is excellent!

Second, although Apache Flink has an official API for the Scala language, it has been deprecated and will be removed in future versions. You can learn more about this here. However, since Scala is a JVM language and Apache Flink is developed in Java, it is still perfectly possible to develop with Apache Flink in Scala by using the Java APIs. Yes, I also turned my nose up at that. Nobody deserves that! But, to make our lives easier, we will use the Flink Scala API library, which is nothing less than a fork of the official Flink Scala API that is fully maintained by the community. I highly recommend this library!

Third and finally, we will develop a very simple real-time data pipeline. The goal is not to provide a complex example, but rather a guide to working with Apache Flink, the Scala language, and Pub/Sub as a message broker. I had a hard time finding a decent article that used these three technologies together.

What will we see in this article?

Now, enough talk. Let's get started!

1. Problem definition

A web application is responsible for receiving the initial registration of new customers for a large Brazilian retail company called My Awesome Company, hereinafter MAC (mac.br). The application sends these initial registrations in real time to a Pub/Sub topic, and you must develop a data pipeline that processes this data in real time, enriches the initial customer registration with some relevant business information and, finally, sends it to a final Pub/Sub topic. Pretty simple, right?

The web application sends the following payload to Pub/Sub:

{
  "fullName": "string",
  "birthDate": "string"
}

Where:

  • fullName is the customer's full name (duh!);
  • birthDate is the customer's date of birth, in year-month-day (YYYY-MM-DD) format;

The data pipeline must enrich this basic customer registration with some relevant business information:

  • It is necessary to split the customer's full name into first name and last name;
  • The customer's current age must be calculated based on their date of birth;
  • If the customer is 30 years old or older, the registration should not be completed and the customer should be marked as inactive;
  • Add a createdAt field, related to the customer creation date.

With this understanding, let's start coding!

2. Setup

Hold on! Let's not start coding yet 🙍🏼. We'll need to configure a few things first. The initial setup consists of the following:

  • Creation of topics and subscriptions in Pub/Sub;
  • Installation of the dependencies required for the data pipeline to work;

2.1. Creating topics and subscriptions in Pub/Sub

To create topics and subscriptions in Pub/Sub, we will be using the official Google Cloud CLI, gcloud. Follow these instructions if you do not already have the CLI properly configured on your machine.

Now, what topics need to be created?

  • created-customer: the topic where the MAC web application will send the payloads relating to the initial customer registrations;
  • registered-customer: the final topic where our data pipeline will send customers with their respective registrations duly enriched;

Let's start with the created-customer topic. For this topic, we also need to create a default pull subscription:

# creating the topic created-customer
$ gcloud pubsub topics create created-customer
Created topic [projects/my-project-id/topics/created-customer].

# now, creating a pull subscription to the topic created-customer
$ gcloud pubsub subscriptions create created-customer-sub --topic=created-customer
Created subscription [projects/my-project-id/subscriptions/created-customer-sub].

Now, let's create the registered-customer topic. For this topic, we also need to create a default pull subscription:

# creating the registered-customer topic
$ gcloud pubsub topics create registered-customer
Created topic [projects/my-project-id/topics/registered-customer].

# now, creating a pull subscription to the registered-customer topic
$ gcloud pubsub subscriptions create registered-customer-sub --topic=registered-customer
Created subscription [projects/my-project-id/subscriptions/registered-customer-sub].

2.2. Installing dependencies

Now yes! Time to code! 🎉

First of all, the development of our data pipeline will not be based on an SBT project. We will use the Scala CLI, a command-line tool that allows us to compile, run, test, and package Scala code. With the Scala CLI, we can develop Scala scripts in a very practical and fast way!

To install dependencies, we will use a Scala CLI feature called directives. Directives are a way of defining configuration within the source code itself, without needing a build tool like SBT. One of the directives we will use defines the dependencies that our pipeline needs, namely:

  • Apache Flink Client (Apache Flink's own dependency);
  • Flink Scala API (a community-maintained library that allows us to develop code in Apache Flink with Scala APIs);
  • Flink Connector GCP PubSub: the official Apache Flink connector that allows us to send and receive Pub/Sub messages;
  • Toolkit: a set of useful libraries for everyday tasks, including the uPickle library, used to serialize and deserialize JSON;

To begin, create a directory called br-mac, and a file called Customers.sc inside it:

$ mkdir br-mac
...
$ cd br-mac
...
$ touch Customers.sc
...

Now, inside the Customers.sc file, add the following lines that are related to the directives for installing the necessary dependencies:

//> using toolkit default
//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
//> using dep "org.apache.flink:flink-clients:1.18.1"
//> using dep "org.apache.flink:flink-connector-gcp-pubsub:3.1.0-1.18"
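
Optionally, you can also pin the Scala and JVM versions with two more directives. This is just a suggestion; the versions below mirror the ones that appear in the run log at the end of this article, so adjust them to your environment:

//> using scala 3.4.2
//> using jvm 11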

And add the imports that will be used later:

import br.mac.customers.models.*
import br.mac.customers.serializations.*
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.connectors.gcp.pubsub.{PubSubSink, PubSubSource}
import org.apache.flinkx.api.*
import org.apache.flinkx.api.serializers.*

Done! Dependencies and imports have been defined. Let's move on.

3. Data pipeline development

Now it's time to build the data pipeline itself with Apache Flink! This build will consist of six parts:

  • Business models and requirements;
  • Development of serializers and deserializers;
  • Using ParameterTool so that we can get some relevant information for our pipeline through command line arguments;
  • Development of PubSubSource so that Apache Flink can read data from the Pub/Sub created-customer topic;
  • Development of PubSubSink so that Apache Flink can send the processed data to the registered-customer topic in Pub/Sub;
  • Development of the data pipeline core applying business requirements;

Let's go?

3.1. Business models and requirements

Business models are the information that we will receive and send to Pub/Sub. As mentioned before, we will receive a payload in JSON format from Pub/Sub, and send a payload to Pub/Sub also in JSON format. We need to model this payload in Scala classes.

Since these classes are representations of JSON payloads, we will use the uPickle library to serialize and deserialize them to and from JSON. If you are not familiar with the uPickle library, I highly recommend reading its documentation. It is also an excellent library!

An example of a payload that we will receive, related to the initial customer registration, is the following:

{
  "fullName": "John Doe",
  "birthDate": "1995-01-01"
}

An example of a payload that we will send to Pub/Sub, related to the final customer registration, is the following:

{
  "firstName": "John",
  "lastName": "Doe",
  "age": 29,
  "isActive": true,
  "createdAt": "2024-08-08T18:07:44.167635713Z"
}

Create another file called Models.scala. Note that this time the file extension is .scala, not .sc. This is because this file is a Scala module, not a Scala script.

In the file, add the following lines:

package br.mac.customers.models

import upickle.default.*

final case class CreatedCustomer(fullName: String, birthDate: String) derives ReadWriter
final case class RegisteredCustomer(firstName: String, lastName: String, age: Int, isActive: Boolean, createdAt: String)
    derives ReadWriter

Done! However, we are not finished with our models yet. We need to define some methods so that we can satisfy the business requirements that were defined, which are:

  • It is necessary to split the customer's full name into first name and last name;
  • The customer's current age must be calculated based on their date of birth;
  • If the customer is 30 years old or older, the registration should not be completed and the customer should be marked as inactive;
  • Add a createdAt field, related to the customer creation date.

The first and second business requirements can be implemented as methods in the CreatedCustomer class. For the fourth, we can define a constructor for the RegisteredCustomer class that creates an instance with the isActive attribute set to true and the createdAt attribute set to the current time. The third requirement will be handled in the data pipeline itself.

To implement these methods and the constructor, we need some additional imports in the Models.scala file:

import java.time.temporal.ChronoUnit
import java.time.{Instant, LocalDate}

And we can now define the methods in the CreatedCustomer class:

final case class CreatedCustomer(fullName: String, birthDate: String) derives ReadWriter:
  def firstName: String = fullName.split(" ").head
  def lastName: String = fullName.split(" ").last
  def age: Int = ChronoUnit.YEARS.between(LocalDate.parse(birthDate), LocalDate.now()).toInt

Finally, let's declare the constructor for the RegisteredCustomer class. We'll do this by defining the apply method on the companion object:

object RegisteredCustomer:
  def apply(firstName: String, lastName: String, age: Int): RegisteredCustomer =
    RegisteredCustomer(firstName, lastName, age, true, Instant.now().toString)

So the final code for the Models.scala file looks like this:

package br.mac.customers.models

import upickle.default.*

import java.time.temporal.ChronoUnit
import java.time.{Instant, LocalDate}

final case class CreatedCustomer(fullName: String, birthDate: String) derives ReadWriter:
  def firstName: String = fullName.split(" ").head
  def lastName: String = fullName.split(" ").last
  def age: Int = ChronoUnit.YEARS.between(LocalDate.parse(birthDate), LocalDate.now()).toInt

final case class RegisteredCustomer(firstName: String, lastName: String, age: Int, isActive: Boolean, createdAt: String)
    derives ReadWriter

object RegisteredCustomer:
  def apply(firstName: String, lastName: String, age: Int): RegisteredCustomer =
    RegisteredCustomer(firstName, lastName, age, true, Instant.now().toString)
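
If you want to convince yourself that the models behave as expected before wiring them into Flink, here is a minimal sketch of a uPickle round trip. One way to try it is in a REPL started with scala-cli repl . from the project directory; the values are, of course, just illustrative:

import br.mac.customers.models.*
import upickle.default.{read, write}

// JSON payload -> CreatedCustomer, using the ReadWriter derived on the case class
val created = read[CreatedCustomer]("""{"fullName": "John Doe", "birthDate": "1995-01-01"}""")
println(created.firstName) // John
println(created.lastName)  // Doe
println(created.age)       // the customer's current age, e.g. 29 in 2024

// RegisteredCustomer -> JSON string, via the companion apply that fills isActive and createdAt
println(write(RegisteredCustomer(created.firstName, created.lastName, created.age)))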

3.2. Defining serializers and deserializers

When we talk about Apache Flink connectors, as is the case with the Apache Flink connector for Pub/Sub, we need to keep in mind two fundamental concepts: serializers and deserializers.

Serializers are responsible for transforming Java and Scala objects into the binary format that will be sent to the destination. Deserializers are responsible for transforming the binary data received from the source into instances of objects in the programming language being used.

In our case, we need a serializer that takes an instance of one of our newly created classes, turns it into a JSON string, and converts that string into bytes so it can be sent to Pub/Sub. The deserializer does exactly the opposite: it takes the message that Pub/Sub delivers in binary format, reads it as a JSON string, and turns it into an instance of one of those classes.

It's a relatively simple process. To deserialize the JSON string into an instance of the case class, we'll use uPickle. If you're already familiar with Flink, you might be wondering why we don't do this with the flink-json library. Simple: I had a lot of problems using it to deserialize JSON strings into case classes, so I found it more practical to write a custom deserializer that uses uPickle for this.

Enough talk! Let's code!

Create another file in the directory called Serializations.scala and add the following lines inside it:

package br.mac.customers.serializations

import br.mac.customers.models.*
import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, SerializationSchema}
import upickle.default.{read, write}

Let's create the deserializer for the CreatedCustomer class. To do this, simply define a class that extends the AbstractDeserializationSchema abstract class and override the deserialize method. For more information, see this documentation.

class CreatedCustomerDeserializer extends AbstractDeserializationSchema[CreatedCustomer]:
  override def deserialize(message: Array[Byte]): CreatedCustomer = read[CreatedCustomer](new String(message, "UTF-8"))

See? I told you it was simple!

Now let's define the serializer for the RegisteredCustomer class.

class RegisteredCustomerSerializer extends SerializationSchema[RegisteredCustomer]:
  override def serialize(element: RegisteredCustomer): Array[Byte] =
    write[RegisteredCustomer](element).getBytes("UTF-8")

The interesting thing about this approach is that we can use any library we want to serialize and deserialize JSON strings. If we were using the flink-json library, we would be stuck with Java's Jackson library. Yes, I also got goosebumps just thinking about it!

The final code for the Serializations.scala file looks like this:

package br.mac.customers.serializations

import br.mac.customers.models.*
import org.apache.flink.api.common.serialization.{AbstractDeserializationSchema, SerializationSchema}
import upickle.default.{read, write}

class CreatedCustomerDeserializer extends AbstractDeserializationSchema[CreatedCustomer]:
  override def deserialize(message: Array[Byte]): CreatedCustomer = read[CreatedCustomer](new String(message, "UTF-8"))

class RegisteredCustomerSerializer extends SerializationSchema[RegisteredCustomer]:
  override def serialize(element: RegisteredCustomer): Array[Byte] =
    write[RegisteredCustomer](element).getBytes("UTF-8")
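
As a quick illustration of how these two schemas complement each other, here is a small sketch of a byte-level round trip outside of Flink. Again, the payload is just an example:

import br.mac.customers.models.*
import br.mac.customers.serializations.*

// bytes received from Pub/Sub -> CreatedCustomer
val incoming = """{"fullName": "John Doe", "birthDate": "1995-01-01"}""".getBytes("UTF-8")
val customer = new CreatedCustomerDeserializer().deserialize(incoming)

// RegisteredCustomer -> bytes that would be published to Pub/Sub
val outgoing = new RegisteredCustomerSerializer()
  .serialize(RegisteredCustomer(customer.firstName, customer.lastName, customer.age))
println(new String(outgoing, "UTF-8")) // {"firstName":"John","lastName":"Doe",...}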

That's it for serializers and deserializers. Let's continue!

3.3. Pipeline arguments

In order to make our pipeline as flexible as possible, we need a way to receive some parameters that are relevant to the application without hard-coding them. These parameters are:

  • Google Cloud Platform project ID;
  • Name of the Pub/Sub subscription from which Apache Flink will consume data;
  • Name of the Pub/Sub topic where Apache Flink will send the processed data;

We will receive this information through command-line arguments, using a built-in Apache Flink utility called ParameterTool. You can learn more about this utility in this documentation.

Let's get to work! Add the following lines to the Customers.sc file:

val parameters = ParameterTool.fromArgs(args)
val projectName = parameters.get("project")
val subscriptionName = parameters.get("subscription-name")
val topicName = parameters.get("topic-name")

Done! With this, we can pass the project ID, subscription name and topic name to our pipeline through the --project, --subscription-name and --topic-name parameters, respectively.
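
If you prefer, you could swap those three get calls for versions that fail fast or fall back to defaults; ParameterTool provides getRequired and a get overload that takes a default value. A small sketch (the default values here are just illustrations):

// Alternative: fail fast or use defaults instead of possibly-null values
val projectName = parameters.getRequired("project") // throws if --project was not passed
val subscriptionName = parameters.get("subscription-name", "created-customer-sub") // illustrative default
val topicName = parameters.get("topic-name", "registered-customer") // illustrative default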

3.4. Pub/Sub source

The Pub/Sub source, as mentioned, is the way Apache Flink will read data from Pub/Sub. We will build this source using the official Apache Flink connector for Pub/Sub. If you are interested in learning more about this connector, check out this documentation.

The Pub/Sub source constructor requires the following information:

  • Deserializer: the way Apache Flink will transform the message received from Pub/Sub into Scala language objects. Remember the deserializer for the CreatedCustomer class that we developed above? So, that's what we'll be using;
  • ProjectName: The name of the GCP project where you created the Pub/Sub topics and subscriptions;
  • SubscriptionName: the name of the subscription from which Apache Flink will consume data related to the initial registration of customers;

Add the following lines to the file:

val pubsubSource = PubSubSource
  .newBuilder()
  .withDeserializationSchema(new CreatedCustomerDeserializer())
  .withProjectName(projectName)
  .withSubscriptionName(subscriptionName)
  .build()

And that's it! Pretty simple too, right?

3.5. Pub/Sub Sink

Phew, we're almost done. Let's build the PubSub Sink for our pipeline.

As stated, Pub/Sub Sink is a way for Apache Flink to send processed data to Pub/Sub. The Pub/Sub Sink constructor requires the following information:

  • Serializer: the way Apache Flink will transform the RegisteredCustomer class instance into a JSON string and then into binary and send it to Pub/Sub. Remember the serializer we created earlier? That's the one we're going to use!
  • ProjectName: The name of the GCP project where you created the Pub/Sub topics and subscriptions;
  • TopicName: the name of the topic to which Apache Flink will send the processed data;

Add the following lines to the file:

val pubsubSink = PubSubSink
  .newBuilder()
  .withSerializationSchema(new RegisteredCustomerSerializer())
  .withProjectName(projectName)
  .withTopicName(topicName)
  .build()

3.6. Data pipeline and application of business requirements

We have finally reached the last stage of development! Let's build the core of our data pipeline! As a reminder, our data pipeline will:

  • Read the initial customer registrations from the Pub/Sub created-customer topic;
  • Apply transformations and rules according to business requirements, such as:
  • Split the customer's name into first and last name;
  • Calculate the customer's age based on their date of birth;
  • Set the customer creation date;
  • If the customer's age is greater than or equal to 30 years, do not register the customer and set the isActive status to false;
  • Send the processed data to the registered-customer topic in Pub/Sub;

Let's get to work!

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000L) // checkpointing is required so the Pub/Sub source can acknowledge the messages it has read

env
  .addSource(pubsubSource) // reading data from the created-customer topic
  .map(cc => RegisteredCustomer(cc.firstName, cc.lastName, cc.age)) // splitting the customer's name into first and last name, calculating the age and setting the creation date
  .map(rc => if rc.age >= 30 then rc.copy(isActive = false) else rc) // marking customers aged 30 or over as inactive
  .addSink(pubsubSink) // sending the processed data to the registered-customer topic

env.execute("customerRegistering")

Is it finished? Yes, it is finished! Here is what the complete code looks like:

//> using toolkit default
//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
//> using dep "org.apache.flink:flink-clients:1.18.1"
//> using dep "org.apache.flink:flink-connector-gcp-pubsub:3.1.0-1.18"

import br.mac.customers.models.*
import br.mac.customers.serializations.*
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.connectors.gcp.pubsub.{PubSubSink, PubSubSource}
import org.apache.flinkx.api.*
import org.apache.flinkx.api.serializers.*

val parameters = ParameterTool.fromArgs(args)
val projectName = parameters.get("project")
val subscriptionName = parameters.get("subscription-name")
val topicName = parameters.get("topic-name")

val pubsubSource = PubSubSource
  .newBuilder()
  .withDeserializationSchema(new CreatedCustomerDeserializer())
  .withProjectName(projectName)
  .withSubscriptionName(subscriptionName)
  .build()

val pubsubSink = PubSubSink
  .newBuilder()
  .withSerializationSchema(new RegisteredCustomerSerializer())
  .withProjectName(projectName)
  .withTopicName(topicName)
  .build()

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000L)

env
  .addSource(pubsubSource)
  .map(cc => RegisteredCustomer(cc.firstName, cc.lastName, cc.age))
  .map(rc => if rc.age >= 30 then rc.copy(isActive = false) else rc)
  .addSink(pubsubSink)

env.execute("customerRegistering")

4. Running the data pipeline

Before running the pipeline, open Pub/Sub in the Google Cloud console, go to the created-customer topic, and manually publish a few messages matching the CreatedCustomer payload schema. For example:

{
  "fullName": "John Doe",
  "birthDate": "1995-01-01"
}

Let's see all this in action by running the data pipeline through the Scala CLI. There is no need to package the pipeline and submit it to a Flink cluster; we are running in local mode here.

Run the data pipeline with the following command. Note the application parameters as we defined previously:

$ scala-cli . -- \
--project your-project-id-here \
--subscription-name created-customer-sub \
--topic-name registered-customer
# ...
Compiling project (Scala 3.4.2, JVM (11))
Compiled project (Scala 3.4.2, JVM (11))
SLF4J(W): No SLF4J providers were found.
SLF4J(W): Defaulting to no-operation (NOP) logger implementation
SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.

It's running! Open Pub/Sub in your browser, go to the registered-customer-sub subscription, and click Pull. This will show you the data that was processed by Apache Flink 🎉.

Press CTRL + C to stop the pipeline execution.

5. Conclusion

And we've reached the end of the article! Today, we:

  • Defined the problem of the company My Awesome Company (MAC);
  • Defined the JSON payloads that would be received from and sent to the Pub/Sub topics;
  • Defined the business requirements that would be applied to the received data;
  • Created two topics in Pub/Sub: one to receive the messages for the initial customer registrations and another to receive the data after it has been processed by Apache Flink;
  • Developed the data pipeline in Apache Flink, defining the business models for each payload received and sent; the serializers and deserializers for the JSON strings; and finally the data pipeline itself, applying the previously defined business rules;

That's all for today, guys! If you liked it, give me a little push and hit like and share it with your friends, okay?

See you next time 💚
