Playing PyFlink from Scratch
ChunTing Wu
Posted on October 17, 2022
I have been playing with PyFlink recently, but there is not much information available on the Internet, and most of what exists is a bit outdated. So I'll document how I set up the environment and actually experimented with PyFlink.
Before starting, let's set the experiment goals.
- To run PyFlink instead of Flink, because I can't code in Java.
- To run on K8s, not Docker or standalone.
- Use the latest version of Flink, 1.15.2.
Let me briefly describe the entire experimental process.
- Create a K8s cluster.
- Deploy PyFlink cluster.
- Upload the job from the local machine.
- Check the job status in the dashboard.
Now that the experimental process is clear, let's prepare the prerequisites.
- Do not use an M1 chip. The current stable version, 1.15.2, does not support M1 chips, so installing PyFlink on an M1 machine would be painful.
- Java 11
- Python 3.6-3.8
Once all is ready, let's get started!
Create a K8s cluster
My personal preference is to use K3s to create clusters rather than minikube.
There are several reasons.
- It is easier to share the environment. My experimental environment is on EC2, so anyone who knows the location can use it.
- It is more resource efficient. minikube has high system requirements, while K3s itself is designed for IoT devices and is relatively resource efficient.
- K3s is a regular K8s distribution; there are no special quirks to work around.
After setting up the cluster according to the official documentation, a few extra settings are required.
- Create a namespace.
kubectl create ns flink
- Create a service account.
kubectl create serviceaccount flink -n flink
- Grant authorization for the service account.
kubectl create clusterrolebinding flink-role-bind --clusterrole=edit --serviceaccount=flink:flink
Build a PyFlink cluster
To build a PyFlink cluster, we first need a container image that supports PyFlink, such as the one below.
https://hub.docker.com/r/wirelessr/pyflink
The Dockerfile used to build this image is also available on Docker Hub.
Next, download the latest stable Flink distribution locally.
https://www.apache.org/dyn/closer.lua/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz
After extracting the archive, we have to add a couple of fat jars that are required but not bundled with the distribution.
Download the bcprov-jdk15on and bcpkix-jdk15on jar files from
- https://mvnrepository.com/artifact/org.bouncycastle/bcprov-jdk15on
- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on
Then move them into the ./flink-1.15.2/lib folder.
When everything is ready, we can run the PyFlink cluster on K8s.
./bin/kubernetes-session.sh \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink \
-Dkubernetes.rest-service.exposed.type=NodePort \
-Dkubernetes.cluster-id=flink-cluster \
-Dkubernetes.jobmanager.cpu=0.2 \
-Djobmanager.memory.process.size=1024m \
-Dresourcemanager.taskmanager-timeout=3600000 \
-Dkubernetes.taskmanager.cpu=0.2 \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dkubernetes.container.image=wirelessr/pyflink:1.15.2-scala_2.12-python_3.7
There are two points worth noting.
- rest-service.exposed.type=NodePort: to let users access the PyFlink cluster from outside the K8s cluster and to keep the experiment simple, we expose the REST service as a NodePort.
- kubernetes.container.image must point to an image that supports PyFlink, in this case the Docker Hub image mentioned earlier.
By running kubectl get svc -n flink, we can find out which port the dashboard (flink-cluster-rest) is exposed on. This address will also be the entry point for the jobs we submit later.
Submit a job
Most Flink tutorials use examples/batch/WordCount.jar as the first Hello World, so we can also use this jar file to test whether the cluster runs correctly.
./bin/flink run \
-m ${flink-cluster-rest}:${port} \
examples/batch/WordCount.jar
After submitting, we can check the result in the dashboard; normally it will finish successfully. However, this example is Java-based, so we'll move on to a Python-based example to verify PyFlink.
Before we start, we need to install the PyFlink package locally.
pip3 install apache-flink==1.15.2
If we are not on an M1 machine, this command should install without any problems.
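To quickly confirm the local installation works, we can run a tiny Table API program on the built-in mini-cluster. This is just a minimal smoke test of my own, not something shipped with Flink.

from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a batch TableEnvironment backed by the local mini-cluster.
t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

# Build a tiny in-memory table and print it to verify the installation.
table = t_env.from_elements([(1, 'hello'), (2, 'pyflink')], ['id', 'word'])
table.execute().print()

If this prints a two-row table, the local PyFlink installation is good to go.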
As with the Java WordCount, there is a corresponding Python example in examples/python/table/word_count.py.
However, the Python example needs a small modification to work properly on a remote cluster. Find this comment line:
remove .wait if submitting to a remote cluster
Follow the instruction and remove the .wait() call right above that comment.
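The sketch below is a minimal stand-in for that pattern (my own illustration, not the example file itself). The chained .wait() blocks the client until the job finishes, which only makes sense on the local mini-cluster, so for a remote cluster we simply drop it.

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# A print sink, just to have somewhere to insert the results.
t_env.execute_sql("""
    CREATE TABLE sink (
        word STRING
    ) WITH ('connector' = 'print')
""")

tab = t_env.from_elements([('hello',), ('pyflink',)], ['word'])

# Local mini-cluster: .wait() blocks until the job has finished.
# tab.execute_insert('sink').wait()

# Remote cluster: drop the .wait() so the client returns right after submission.
tab.execute_insert('sink')

After making the same change in word_count.py, we can submit the Python job.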
./bin/flink run \
-m ${flink-cluster-rest}:${port} \
-py ./examples/python/table/word_count.py
Returning to the dashboard, we should see that this job also runs properly.
Conclusion
There is very little information about PyFlink, both in terms of environment setup and coding references, so I hope this article is helpful to you.
However, this article does not go into the details of the Flink cluster itself or how to develop PyFlink applications; I'll leave those topics for a future article.