Playing PyFlink from Scratch

lazypro

ChunTing Wu

Posted on October 17, 2022

I have been experimenting with PyFlink recently, but there is little information available online, and most of it is somewhat outdated. So I'll document how I set up the environment and ran my first PyFlink jobs.

Before starting, let's set the experiment goals.

  1. To run PyFlink instead of Flink, because I can't code in Java.
  2. To run on K8s, not Docker or standalone.
  3. To use the latest version of Flink at the time of writing, 1.15.2.

Here is a brief outline of the entire experimental process.

  1. Create a K8s cluster.
  2. Deploy PyFlink cluster.
  3. Upload the job from the local machine.
  4. Check the job status in the dashboard.

Now that the process is clear, let's prepare the prerequisites.

Once everything is ready, let's get started!

Create a K8s cluster

My personal preference is to use K3s to create clusters rather than minikube.

There are several reasons.

  1. It is easier to share the environment. My experimental environment is on EC2, so anyone who knows the location can use it.
  2. It is more resource efficient. minikube has high system requirements, while K3s is designed for IoT devices and is relatively lightweight.
  3. K3s is a regular K8s distribution; it has no special features.

After setting up the cluster according to the official documentation, some extra settings are required.

  1. Create a namespace.
    • kubectl create ns flink
  2. Create a service account.
    • kubectl create serviceaccount flink -n flink
  3. Grant authorization for the service account.
    • kubectl create clusterrolebinding flink-role-bind --clusterrole=edit --serviceaccount=flink:flink

Build a PyFlink cluster

To build a PyFlink cluster, we first need a container image that supports PyFlink. The following image can be used.

https://hub.docker.com/r/wirelessr/pyflink

The Dockerfile used to build the image is also available on Docker Hub.

Next, download the latest stable Flink release to the local machine.

https://www.apache.org/dyn/closer.lua/flink/flink-1.15.2/flink-1.15.2-bin-scala_2.12.tgz

After extracting the archive, we have to add some JAR files that are required but not packaged with it.

Download the bcprov-jdk15on and bcpkix-jdk15on JAR files, then move them to the folder

./flink-1.15.2/lib

When everything is ready, we can run the PyFlink cluster on K8s.

./bin/kubernetes-session.sh \
 -Dkubernetes.namespace=flink \
 -Dkubernetes.jobmanager.service-account=flink \
 -Dkubernetes.rest-service.exposed.type=NodePort \
 -Dkubernetes.cluster-id=flink-cluster \
 -Dkubernetes.jobmanager.cpu=0.2 \
 -Djobmanager.memory.process.size=1024m \
 -Dresourcemanager.taskmanager-timeout=3600000 \
 -Dkubernetes.taskmanager.cpu=0.2 \
 -Dtaskmanager.memory.process.size=1024m \
 -Dtaskmanager.numberOfTaskSlots=1 \
 -Dkubernetes.container.image=wirelessr/pyflink:1.15.2-scala_2.12-python_3.7

Two points are worth noting.

  1. kubernetes.rest-service.exposed.type=NodePort. To let users reach the PyFlink cluster from outside, and to keep the experiment simple, we expose the REST service through a NodePort.
  2. kubernetes.container.image must point to an image that supports PyFlink; in this case, the Docker Hub image mentioned earlier.
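The long list of -D flags is easier to review and tweak when it is kept as data. The following is a minimal sketch, not part of the original setup: it rebuilds the same command from a dictionary holding exactly the options shown above. The names SESSION_OPTIONS and build_session_command are made up for this illustration.

```python
from typing import Dict, List

# Options copied from the kubernetes-session.sh command above.
SESSION_OPTIONS: Dict[str, str] = {
    "kubernetes.namespace": "flink",
    "kubernetes.jobmanager.service-account": "flink",
    "kubernetes.rest-service.exposed.type": "NodePort",
    "kubernetes.cluster-id": "flink-cluster",
    "kubernetes.jobmanager.cpu": "0.2",
    "jobmanager.memory.process.size": "1024m",
    "resourcemanager.taskmanager-timeout": "3600000",
    "kubernetes.taskmanager.cpu": "0.2",
    "taskmanager.memory.process.size": "1024m",
    "taskmanager.numberOfTaskSlots": "1",
    "kubernetes.container.image": "wirelessr/pyflink:1.15.2-scala_2.12-python_3.7",
}


def build_session_command(
    options: Dict[str, str],
    script: str = "./bin/kubernetes-session.sh",
) -> List[str]:
    """Turn a mapping of Flink options into an argument list of -Dkey=value flags."""
    return [script] + [f"-D{key}={value}" for key, value in options.items()]


command = build_session_command(SESSION_OPTIONS)
# Print the command in the same multi-line form used in this article.
print(" \\\n ".join(command))
```

The resulting list can be passed to subprocess.run(command, check=True) from inside the extracted flink-1.15.2 folder, which avoids shell quoting issues when a value changes.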

Running kubectl get svc -n flink shows which port the dashboard service (flink-cluster-rest) is exposed on. This dashboard address is also the entry point for the jobs we will submit later.
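To read the port without eyeballing the table, the text output of kubectl get svc can be parsed. This is a rough sketch under assumptions: the sample output below is invented for illustration (the cluster IP and the port 30123 are not real values from my cluster), and find_node_port is a hypothetical helper, not part of Flink or kubectl.

```python
# Hypothetical output of `kubectl get svc -n flink`; addresses and ports are made up.
SAMPLE_OUTPUT = """\
NAME                 TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE
flink-cluster        ClusterIP   None          <none>        6123/TCP,6124/TCP   5m
flink-cluster-rest   NodePort    10.43.12.34   <none>        8081:30123/TCP      5m
"""


def find_node_port(kubectl_output: str, service_name: str = "flink-cluster-rest") -> int:
    """Return the NodePort of the given service from `kubectl get svc` table output."""
    lines = kubectl_output.strip().splitlines()
    # Locate the PORT(S) column from the header row.
    ports_column = lines[0].split().index("PORT(S)")
    for line in lines[1:]:
        columns = line.split()
        if columns[0] != service_name:
            continue
        # A NodePort entry looks like "8081:30123/TCP": service port, then node port.
        for mapping in columns[ports_column].split(","):
            port_part = mapping.split("/")[0]
            if ":" in port_part:
                return int(port_part.split(":")[1])
    raise ValueError(f"no NodePort found for service {service_name!r}")


node_port = find_node_port(SAMPLE_OUTPUT)
print(node_port)
```

With a NodePort service, the job submission address is then the address of any cluster node combined with this port.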

Submit a job

Most Flink tutorials use examples/batch/WordCount.jar as their first Hello World.

So we can also use this JAR file to test whether Flink runs correctly.

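# FLINK_REST_HOST and FLINK_REST_PORT are placeholders:
# a node address and the NodePort of flink-cluster-rest.
./bin/flink run \
  -m "${FLINK_REST_HOST}:${FLINK_REST_PORT}" \
  examples/batch/WordCount.jar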

After submitting, we can check the result in the dashboard; normally, the job runs successfully. But this example is Java-based, so we'll proceed with a Python-based example to verify PyFlink.

Before we start, we need to install the PyFlink package locally.

pip3 install apache-flink==1.15.2

Unless we are on an Apple M1 chip, this command should install without any problems.

Like the Java WordCount example, Python has a corresponding example at examples/python/table/word_count.py.

However, the Python example needs to be modified a bit to work properly. Find this comment line:

remove .wait if submitting to a remote cluster

Follow that instruction and remove the .wait() call above the comment, then submit the Python job.

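# FLINK_REST_HOST and FLINK_REST_PORT are placeholders:
# a node address and the NodePort of flink-cluster-rest.
./bin/flink run \
  -m "${FLINK_REST_HOST}:${FLINK_REST_PORT}" \
  -py ./examples/python/table/word_count.py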

Back in the dashboard, this job should also show as having run successfully.
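The .wait removal described earlier can be done by hand, but it can also be scripted. The sketch below assumes the example ends its pipeline with a backslash-continued .wait() line directly after execute_insert; EXAMPLE_TAIL is an abbreviated stand-in I wrote for illustration, not a verbatim copy of word_count.py, and drop_wait is a hypothetical helper.

```python
import re

# Abbreviated stand-in for the end of word_count.py (assumed layout, not a verbatim copy).
EXAMPLE_TAIL = (
    "    tab.select(col('word'), lit(1).count) \\\n"
    "       .execute_insert('sink') \\\n"
    "       .wait()\n"
    "    # remove .wait if submitting to a remote cluster\n"
)


def drop_wait(source: str) -> str:
    """Remove a backslash-continued `.wait()` call from Python source text."""
    # Match the continuation backslash, the newline, and the `.wait()` line that follows,
    # so that the preceding call becomes the last one in the chain.
    return re.sub(r"[ \t]*\\\n[ \t]*\.wait\(\)", "", source)


patched = drop_wait(EXAMPLE_TAIL)
print(patched)
```

Applied to the real file, the same function could be used by reading the file's text, passing it through drop_wait, and writing the result back; it is worth checking the diff before submitting the job.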

Conclusion

There is very little information about PyFlink, both for environment setup and as a coding reference, so I hope this article will be helpful to you.

However, this article does not explain the Flink cluster or PyFlink development in detail. I'll leave those topics for a future article.
