Build a Streaming SQL Pipeline with Apache Flink and Apache Kafka
Francesco Tisiot
Posted on June 3, 2021
Apache Kafka is the perfect base for any streaming application: a solid, highly available, fault-tolerant platform that makes reliable communication between streaming components as easy as writing to a disk.
Apache Flink adds the power of stateful data transformations to the picture. It can calculate, persist, recover and process data in the same distributed, highly available, fault-tolerant fashion that Kafka provides. Apache Flink can be used from a variety of languages: from the more traditional Java and Scala all the way to Python and SQL. A previous post showed how you can create your own Docker version of Apache Flink, including its SQL Client. In this post, we will demonstrate how you can use the best streaming combination, Apache Flink and Kafka, to create pipelines defined using data practitioners' favourite language: SQL!
Here's how it goes:
- Set up Apache Kafka
- Set up Apache Flink on Docker
- Create a Keystore for Kafka's SSL certificates
- Create some test data with Kafkacat
- Define the source Kafka topic as Flink Table
- Transform and insert data
- Check the pipeline output
1. Set up Apache Kafka
Apache Kafka is our basic data storage platform. We can create a cluster via Aiven's command line interface (avn) from our terminal:
avn service create -p business-4 \
-t kafka demo-kafka \
--cloud google-europe-west3 \
-c kafka_rest=true \
-c kafka.auto_create_topics_enable=true
This sets up an Apache Kafka cluster named demo-kafka in google-europe-west3, enabling Kafka REST APIs and topic auto creation. If you want to wait until the demo-kafka service is ready to use, you can run the following command:
avn service wait demo-kafka
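If you prefer to script this step, the two avn commands above can be wrapped in a small shell function. This is a minimal sketch: create_kafka_service is a hypothetical helper name, and it only reuses the flags shown above, with the service name and cloud region as overridable defaults.

```shell
# Hypothetical helper: create an Aiven for Apache Kafka service with the
# same options used in this post, then block until it is running.
# Usage: create_kafka_service [service-name] [cloud]
create_kafka_service() {
  name=${1:-demo-kafka}
  cloud=${2:-google-europe-west3}

  # Same plan, REST API and topic auto-creation settings as above
  avn service create -p business-4 \
    -t kafka "$name" \
    --cloud "$cloud" \
    -c kafka_rest=true \
    -c kafka.auto_create_topics_enable=true || return 1

  # Wait until the service is ready to use
  avn service wait "$name"
}
```

Stopping on a failed create avoids waiting forever on a service that was never requested.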
2. Set up Apache Flink on Docker
In a previous post I outlined how to properly set up Apache Flink on Docker. With Docker, we can have a working environment running in minutes without needing to fiddle with installation and configurations. The previous post provides instructions on how to set up such an environment and how to create a file-to-PostgreSQL data pipeline. In this article, I'm going to assume that you correctly configured Apache Flink and that the service is up and running.
Here's a quick summary of the required steps if you didn't follow the previous post. Clone the aiven/sql-cli-for-apache-flink-docker repository with the following command in your terminal:
git clone https://github.com/aiven/sql-cli-for-apache-flink-docker.git
Now move into the sql-cli-for-apache-flink-docker folder and start the containers with Docker Compose:
cd sql-cli-for-apache-flink-docker
docker-compose up -d
At this stage, when running
docker ps
you should see an output like the following:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
315677df38a7 flink:1.16.0-scala_2.12-java11 "/docker-entrypoint.…" 22 minutes ago Up 22 minutes 6123/tcp, 8081/tcp sql-cli-for-apache-flink-docker-taskmanager-1
dbeaeae681e3 sql-cli-for-apache-flink-docker-sql-client "/docker-entrypoint.…" 22 minutes ago Up 22 minutes 6123/tcp, 8081/tcp sql-client
e4c0d2214376 flink:1.16.0-scala_2.12-java11 "/docker-entrypoint.…" 22 minutes ago Up 22 minutes 6123/tcp, 0.0.0.0:8081->8081/tcp sql-cli-for-apache-flink-docker-jobmanager-1
This tells you that Flink's job manager, task manager and sql-client containers are all ready to be used.
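To check this from a script instead of reading the table by eye, you could feed the container names into a small filter. missing_containers is a hypothetical helper; it assumes the container names shown above (older docker-compose v1 releases use underscores, e.g. ..._jobmanager_1), so adjust the list if yours differ.

```shell
# Hypothetical helper: read container names (one per line) on stdin and
# print every expected Flink container that is not in the list.
# Returns 0 when all expected containers are present, 1 otherwise.
missing_containers() {
  running=$(cat)
  status=0
  for expected in \
    sql-cli-for-apache-flink-docker-jobmanager-1 \
    sql-cli-for-apache-flink-docker-taskmanager-1 \
    sql-client
  do
    # -x matches whole lines only, so partial names do not count
    if ! printf '%s\n' "$running" | grep -qx -- "$expected"; then
      echo "$expected"
      status=1
    fi
  done
  return "$status"
}

# Usage (requires Docker):
#   docker ps --format '{{.Names}}' | missing_containers && echo "Flink is up"
```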
3. Create a Keystore for Kafka's SSL certificates
Aiven's Apache Kafka enables SSL authentication by default. To safely connect to it from Apache Flink, we need to use a Java Keystore and Truststore. We can generate them with the following command in our terminal, assuming we are in the sql-cli-for-apache-flink-docker folder created in the previous steps:
avn service user-kafka-java-creds demo-kafka \
--username avnadmin \
-d settings/certs \
-p password123
The command creates a folder named certs under settings and stores the certificate files together with a Keystore and Truststore (named client.keystore.p12 and client.truststore.jks), both secured with the password123 password string.
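Before moving on, it can be worth confirming that the files the rest of this post relies on are actually there. check_certs is a hypothetical helper; its file list assumes the avn command also wrote service.key, service.cert and ca.pem into the same folder, which is what the Kafkacat configuration in the next step expects.

```shell
# Hypothetical helper: verify that the credential files used later in this
# post exist in the given folder (default: settings/certs).
check_certs() {
  dir=${1:-settings/certs}
  status=0
  for f in client.keystore.p12 client.truststore.jks service.key service.cert ca.pem; do
    if [ ! -f "$dir/$f" ]; then
      echo "missing: $dir/$f" >&2
      status=1
    fi
  done
  return "$status"
}
```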
4. Create some test data with Kafkacat
We can use Kafkacat to create some data. After installing it, let's create a file named kafkacat.config in our certs folder with the following content:
bootstrap.servers=<host>:<port>
security.protocol=ssl
ssl.key.location=service.key
ssl.certificate.location=service.cert
ssl.ca.location=ca.pem
To find the <host> and <port> parameters, use the following call:
avn service get demo-kafka --format '{service_uri_params}'
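Once you know the host and port, the same configuration file can be generated from a script. write_kafkacat_config is a hypothetical helper; it writes exactly the five settings listed above and keeps the relative certificate paths, so it should be run from the certs folder.

```shell
# Hypothetical helper: write a Kafkacat configuration for an SSL-enabled
# Kafka endpoint. Certificate paths are relative, as in the file above.
# Usage: write_kafkacat_config <host> <port> [output-file]
write_kafkacat_config() {
  host=$1
  port=$2
  out=${3:-kafkacat.config}
  if [ -z "$host" ] || [ -z "$port" ]; then
    echo "usage: write_kafkacat_config <host> <port> [output-file]" >&2
    return 1
  fi
  cat > "$out" <<EOF
bootstrap.servers=${host}:${port}
security.protocol=ssl
ssl.key.location=service.key
ssl.certificate.location=service.cert
ssl.ca.location=ca.pem
EOF
}
```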
Now open a new terminal, navigate to the certs folder and execute this:
kafkacat -F kafkacat.config -P -t people
Kafkacat sends every new line entered in the terminal as a message to the people Kafka topic. Paste the following lines in the terminal:
{"name":"Jon","country":"USA","age":40}
{"name":"Ava","country":"England","age":35}
{"name":"Pino","country":"Italy","age":25}
{"name":"Carla","country":"Italy","age":45}
Four messages have been sent to the people topic in our Apache Kafka environment. Keep this window open: you'll use it again later to insert more messages.
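If you would rather not paste the records by hand, Kafkacat's producer mode also reads from a pipe. people_messages is a hypothetical helper that prints the four sample records above, one JSON document per line, so they can be piped into the same kafkacat command.

```shell
# Hypothetical helper: print the sample "people" records, one per line.
people_messages() {
  cat <<'EOF'
{"name":"Jon","country":"USA","age":40}
{"name":"Ava","country":"England","age":35}
{"name":"Pino","country":"Italy","age":25}
{"name":"Carla","country":"Italy","age":45}
EOF
}

# Usage (from the certs folder):
#   people_messages | kafkacat -F kafkacat.config -P -t people
```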
5. Define the source Kafka topic as Flink Table
As mentioned in the previous post, we can enter Flink's sql-client container to create a SQL pipeline by executing the following command in a new terminal window:
docker exec -it sql-client /bin/bash
Now that we're in, we can start Flink's SQL client with:
./sql-client.sh
Define a source for the people Kafka topic with the following code (replace the <host> and <port> parameters with the values retrieved in step 4 so that it points to your Kafka service):
CREATE TABLE people_source (
name VARCHAR,
country VARCHAR,
age INT
) WITH (
'connector' = 'kafka',
'property-version' = 'universal',
'properties.bootstrap.servers' = '<host>:<port>',
'topic' = 'people',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'json',
'properties.security.protocol' = 'SSL',
'properties.ssl.endpoint.identification.algorithm' = '',
'properties.ssl.truststore.location' = '/settings/certs/client.truststore.jks',
'properties.ssl.truststore.password' = 'password123',
'properties.ssl.keystore.type' = 'PKCS12',
'properties.ssl.keystore.location' = '/settings/certs/client.keystore.p12',
'properties.ssl.keystore.password' = 'password123',
'properties.ssl.key.password' = 'password123',
'properties.group.id' = 'my-working-group'
);
The command above defines a Flink table named people_source with the following properties:
- Three columns: name, country and age
- Connecting to Apache Kafka (connector = 'kafka')
- Reading from the start (scan.startup.mode) of the people topic (topic), whose format is JSON (value.format), with the consumer being part of the my-working-group consumer group (properties.group.id)
- Connecting via the bootstrap servers (properties.bootstrap.servers) and using the SSL security protocol (properties.security.protocol) with the client.truststore.jks and client.keystore.p12 stores
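Both this table and the country_target table defined later contain a <host>:<port> placeholder. Instead of editing it by hand, you could keep each statement in a text file and substitute the endpoint with sed before pasting it into the SQL client. fill_endpoint is a hypothetical helper; it accepts only a plain host:port value, so characters that are special to sed are rejected rather than silently mangled.

```shell
# Hypothetical helper: read SQL on stdin and replace every "<host>:<port>"
# placeholder with the given Kafka endpoint.
# Usage: fill_endpoint <host:port> < people_source.sql
fill_endpoint() {
  endpoint=$1
  # Allow only letters, digits, dots and dashes, then a numeric port
  if ! printf '%s\n' "$endpoint" | grep -Eq '^[A-Za-z0-9.-]+:[0-9]+$'; then
    echo "expected <host>:<port>, got '$endpoint'" >&2
    return 1
  fi
  sed "s|<host>:<port>|$endpoint|g"
}
```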
After executing it, we should receive a message saying [INFO] Table has been created. Please note that this doesn't mean it's working: at this point Flink has only registered the table definition and hasn't connected to Kafka yet. We can test it properly by issuing the following SQL statement from the sql-client terminal:
select * from people_source;
This should return:
+/- name country age
+ Jon USA 40
+ Ava England 35
+ Pino Italy 25
+ Carla Italy 45
To leave Flink's table view, press Q.
Solving the volume permission problem on Linux
If you're on Linux, you'll probably hit an error like this:
[ERROR] Could not execute SQL statement. Reason:
java.nio.file.AccessDeniedException: /settings/certs/client.keystore.p12
The error is due to a couple of factors:
- The client.keystore.p12 file is generated readable only by the user who created it (-rw-------)
- The way docker-compose mounts the volumes: the bind-mounted settings folder keeps its host ownership, so inside the container the keystore files still belong to your host user (uid 1000 in this example)
The combination of the two makes the file client.keystore.p12 inaccessible to Flink (executed by user flink with uid 9999). To solve the problem, make the keystore readable by the flink user by changing the folder ownership.
First, find flink's uid with the following command in a terminal, from the sql-cli-for-apache-flink-docker folder on your host:
docker exec sql-cli-for-apache-flink-docker-taskmanager-1 id flink
The result will be similar to the following:
uid=9999(flink) gid=9999(flink) groups=9999(flink)
Now we can use flink's uid to set the settings folder ownership, again by running the following command in the same terminal window (replacing 9999 with flink's uid from the above call if necessary):
sudo chown -R 9999 ./settings
After executing it, retry the select * from people_source; statement, which should now succeed.
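To check the permissions before and after the chown, you can reproduce the relevant part of the permission check in a few lines. readable_by_uid is a hypothetical helper; it relies on GNU stat (as found on Linux), treats uid 0 as able to read everything, and deliberately ignores group permissions and ACLs, so a failure is a hint rather than a definitive answer.

```shell
# Hypothetical helper: succeed if the given uid can read the file, judging
# only by owner and "other" permission bits. Group bits and ACLs are
# ignored; requires GNU coreutils stat (Linux).
# Usage: readable_by_uid <uid> <file>
readable_by_uid() {
  uid=$1
  file=$2
  [ -e "$file" ] || return 2
  # root bypasses regular permission bits
  if [ "$uid" -eq 0 ]; then
    return 0
  fi
  owner=$(stat -c '%u' "$file")
  # %a prints the octal mode (e.g. 600); the leading 0 keeps it octal
  mode=$(( 0$(stat -c '%a' "$file") ))
  if [ "$owner" -eq "$uid" ]; then
    [ $(( mode & 0400 )) -ne 0 ]   # owner read bit
  else
    [ $(( mode & 0004 )) -ne 0 ]   # "other" read bit
  fi
}

# Usage, with flink's uid from the previous step:
#   readable_by_uid 9999 settings/certs/client.keystore.p12 || echo "flink cannot read the keystore"
```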
6. Transform and insert data
Now it's time to see Flink's beauty in action: we are going to set up a process that analyses the streaming data coming from the people Kafka topic, calculates some aggregated KPIs and publishes them to a target datastore, in our case a new Kafka topic. We'll do all of this using only SQL statements.
Flink is so flexible that you can run a similar exercise with a huge variety of technologies as sources or targets. The Kafka examples shown in this blog could be replaced with a JDBC database, local files, Elasticsearch or Hive with only a few changes to our SQL definitions. The list of supported connectors can be found on Flink's website.
Defining the target Kafka topic as a Flink table
For the purpose of this article, let's assume we want to push aggregated data to a new Kafka topic containing the average age and the number of people for each country. To do so, we first need to define Flink's target table structure with the following code in Flink's sql-client terminal window (replacing, as before, the <host>:<port> section with Kafka's endpoint):
CREATE TABLE country_target (
country VARCHAR,
avg_age BIGINT,
nr_people BIGINT,
PRIMARY KEY (country) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'property-version' = 'universal',
'properties.bootstrap.servers' = '<host>:<port>',
'topic' = 'country_agg',
'value.format' = 'json',
'key.format' = 'json',
'properties.security.protocol' = 'SSL',
'properties.ssl.endpoint.identification.algorithm' = '',
'properties.ssl.truststore.location' = '/settings/certs/client.truststore.jks',
'properties.ssl.truststore.password' = 'password123',
'properties.ssl.keystore.type' = 'PKCS12',
'properties.ssl.keystore.location' = '/settings/certs/client.keystore.p12',
'properties.ssl.keystore.password' = 'password123',
'properties.ssl.key.password' = 'password123',
'properties.group.id' = 'my-working-group'
);
The above SQL creates a Flink table with three columns: country (the primary key), avg_age and nr_people. The connector is upsert-kafka since we want the topic to always hold the latest version of the KPIs per country (PRIMARY KEY (country)). The WITH clause specifies that we will push data to the country_agg Kafka topic using the same connection properties as the people_source table.
What's even cooler is that, with a few small amendments to the WITH clause above, we could publish the result of our data pipeline to a completely different technology endpoint. An example of a Flink table definition over a database is provided in the article Apache Flink SQL client on Docker.
Setting up the data pipeline
Once the country_target destination endpoint is defined, we can finally create the SQL pipeline by defining the aggregation query and the related insert statement. The following code provides exactly what we need; paste it into Flink's sql-client terminal window:
insert into country_target
select country,
       avg(age) as avg_age,
       count(*) as nr_people
from people_source
group by country;
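To sanity-check what the pipeline should produce, the same aggregation can be replayed locally over the sample records. expected_aggregates is a hypothetical awk-based helper; it assumes compact JSON exactly like the sample (no spaces after colons) and truncates the average to an integer, on the assumption that Flink's AVG over an INT column also returns an integer. For the sample data the averages are whole numbers anyway.

```shell
# Hypothetical helper: recompute "country avg_age nr_people" from people
# records (compact JSON, one per line), like the insert statement above.
# The average is truncated to an integer, assuming that matches Flink's
# AVG over an INT column. Output follows first appearance of each country.
expected_aggregates() {
  awk '
    {
      if (!match($0, /"country":"[^"]*"/)) next
      c = substr($0, RSTART + 11, RLENGTH - 12)
      if (!match($0, /"age":[0-9]+/)) next
      a = substr($0, RSTART + 6, RLENGTH - 6) + 0
      if (!(c in n)) order[++k] = c
      n[c]++
      s[c] += a
    }
    END {
      for (i = 1; i <= k; i++) {
        c = order[i]
        printf "%s %d %d\n", c, int(s[c] / n[c]), n[c]
      }
    }
  '
}

# Usage, with the sample records saved to a hypothetical people.jsonl file:
#   expected_aggregates < people.jsonl
```

For the four sample records it prints USA 40 1, England 35 1 and Italy 35 2, which is the final state we expect country_target to reach.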
We should receive a message confirming that our SQL pipeline was successfully deployed, similar to this:
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: 95b57225d702ab9c9402daba10fe6a84
Now if we query the country_target table from Flink's SQL client with:
select * from country_target;
We should see something like this:
+/- country avg_age nr_people
+ USA 40 1
+ England 35 1
+ Italy 25 1
- Italy 25 1
+ Italy 35 2
It's telling us that we have one entry each for USA and England as expected, but three entries for Italy, which looks odd since we pushed only two records for that country. This happens because country_target keeps only the current KPI values per country, while Flink's sql-client is displaying results in changelog result mode, which shows every change applied to the table rather than just its final state. The above result tells us that, in order, we had:
- Insert an entry for Italy with 1 person of average age 25 -> sign +
- Delete (retract) that first Italy entry -> sign -
- Insert an entry for Italy with 2 people of average age 35 -> sign +
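The insert/retract sequence above can be replayed by hand to see how the changelog collapses into the final table. collapse_changelog is a hypothetical awk-based helper; it assumes a single-word key in the second column, and it also accepts the +I, +U, -U and -D markers that newer SQL client versions may display instead of plain + and -.

```shell
# Hypothetical helper: replay SQL-client changelog rows shaped like
# "<sign> <key> <values...>" and print the final row for each key,
# in order of first appearance. Insert/update markers (+, +I, +U) store
# the row; retraction markers (-, -U, -D) remove it. Other lines,
# such as the "+/-" header, are ignored.
collapse_changelog() {
  awk '
    $1 ~ /^\+[IU]?$/ {
      key = $2
      if (!(key in row)) order[++k] = key
      $1 = ""                      # drop the sign, keep key and values
      row[key] = substr($0, 2)
      alive[key] = 1
      next
    }
    $1 ~ /^-[UD]?$/ { alive[$2] = 0 }
    END {
      for (i = 1; i <= k; i++)
        if (alive[order[i]]) print row[order[i]]
    }
  '
}
```

Fed the five changelog rows shown earlier, it keeps one row per country, ending with Italy 35 2, which is what the table result mode displays.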
Flink's changelog view is great if we want to check how KPIs have been calculated over time. On the other hand, if we just want to browse the up-to-date situation, we can move to Flink's table result mode by executing the following in Flink's sql-client terminal:
SET 'sql-client.execution.result-mode' = 'table';
Now, re-issuing select * from country_target; will show just the current situation:
country avg_age nr_people
USA 40 1
England 35 1
Italy 35 2
7. Check the pipeline output
Now we want to verify that the Flink records have been successfully written to the desired Kafka topic. From a new terminal window positioned in the sql-cli-for-apache-flink-docker/settings/certs folder, we can execute this:
kafkacat -F kafkacat.config -C -t country_agg
The command will start Kafkacat in consumer mode (-C) listening on the country_agg topic (the same one used in Flink's table definition). The output will be the list of updated records for the various KPIs:
{"country":"USA","avg_age":40,"nr_people":1}
{"country":"England","avg_age":35,"nr_people":1}
{"country":"Italy","avg_age":25,"nr_people":1}
{"country":"Italy","avg_age":35,"nr_people":2}
% Reached end of topic country_agg [0] at offset 4
If we now add a row to our people topic via the first Kafkacat window in producer mode, like this:
{"name":"Mark","country":"England","age":37}
We can immediately see the streaming pipeline in action, with a new line appearing in the country_agg Kafka topic on the Kafkacat consumer terminal containing the updated avg_age and nr_people KPIs:
{"country":"England","avg_age":36,"nr_people":2}
% Reached end of topic country_agg [0] at offset 5
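Because country_agg is written by an upsert connector, a consumer that only cares about the current state can keep the last record per key. latest_per_country is a hypothetical awk-based helper that does this for Kafkacat's JSON output; it assumes compact JSON as shown above and skips lines without a country field, such as the "% Reached end of topic" notices.

```shell
# Hypothetical helper: keep only the most recent JSON record per country,
# mimicking the upsert semantics of the country_agg topic. Lines without
# a "country" field, such as Kafkacat notices, are skipped.
latest_per_country() {
  awk '
    match($0, /"country":"[^"]*"/) {
      c = substr($0, RSTART + 11, RLENGTH - 12)
      if (!(c in last)) order[++k] = c
      last[c] = $0
    }
    END {
      for (i = 1; i <= k; i++) print last[order[i]]
    }
  '
}

# Usage (from the certs folder); -e exits at the end of the topic:
#   kafkacat -F kafkacat.config -C -t country_agg -e | latest_per_country
```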
Wow, we managed to build a whole analytics pipeline!
We started by inserting JSON records, representing our streaming input, into a Kafka topic with kafkacat. The topic was then registered in Flink, which we later configured to transform and aggregate the data. The output was finally stored in a new Kafka topic.
The whole pipeline was built with just three SQL statements and, with minor changes, we could quickly swap the data source or target, using Flink as an abstraction layer on top of our data technology. We just demonstrated a very simple use case, but Flink can be a game changer in a huge variety of situations. Your batch ETL now seems a bit seasoned, doesn't it?
Build a streaming pipeline in pure SQL
SQL is the best-known and most loved language among data practitioners. The union of Apache Kafka and Flink provides a simple, highly available and scalable toolset that lets them focus on building real-time data pipelines rather than learning and debugging complex code. Flink SQL capabilities enhance all the benefits of building Kafka-based data hubs, adding the ability to join in external data assets and deliver data pipeline output to a huge variety of targets.
Some more resources that you might find interesting:
- Flink SQL Client Documentation - to understand Flink SQL client functionality
- Flink - Apache Kafka SQL Connector - to check Apache Kafka table definition
- Flink - Apache Kafka Upsert SQL Connector - to review Apache Kafka Upsert parameters
- Aiven Console - to create and manage your Apache Kafka cluster