Alvaro Fragoso
Posted on December 5, 2022
Last week I finished a data engineering course. I was very happy with what I learned: lots of databases, pipelines, and tools. Then I went looking for what else I could learn and realized I was missing a very important one. I had no idea how to use Apache Kafka, and everybody is using it!! So I went back to my laptop, launched a Kafka server, and started playing with it, and, to my surprise, it's quite easy to use!
Today I'll show you how to launch a Kafka server and build a dummy real-time sleep tracker with Python.
Here we go. The first thing you'll need is to have Java and Kafka installed.
On macOS it's really easy using Homebrew. Simply run this in your terminal:
brew install java
brew install kafka
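To check that both installed correctly, you can print their versions (the --version flag is available on kafka-topics in recent Kafka releases; very old ones may not support it):
java -version
kafka-topics --version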
Cool! Next we'll need to make sure we can access Kafka from Python. Here you have multiple options, but I'll use kafka-python. You can install it like this:
pip3 install kafka-python
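A quick way to confirm the library is importable (kafka-python exposes its version as kafka.__version__):
python3 -c "import kafka; print(kafka.__version__)"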
Alright, now we have everything we need.
To start the server, you'll first start ZooKeeper:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
And then you can run the kafka server:
kafka-server-start /usr/local/etc/kafka/server.properties
Be careful!
Sometimes you'll encounter a broken connection error. To fix this, simply modify the server.properties file and uncomment the listeners setting so it reads listeners=PLAINTEXT://:9092. You can use vim like this:
vim /usr/local/etc/kafka/server.properties
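Inside the file, the relevant line just needs the leading # removed, so it goes from this:
#listeners=PLAINTEXT://:9092
to this:
listeners=PLAINTEXT://:9092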
Ooof. OK, now with everything working, we can get to the cool part. You can create a topic on your server like this. We'll call it pillow since we expect to receive data from a pillow, but you can name it whatever you want.
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic pillow
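If you want to double-check that the topic exists, you can list the topics on the broker:
kafka-topics --list --bootstrap-server localhost:9092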
Cool, now to Python. You'll need one script that generates the data and sends it to Kafka (the producer) and another that reads it (the consumer) and works with it. So we'll create a producer.py and a consumer.py.
The producer.py will look like this:
from time import sleep
from json import dumps
from kafka import KafkaProducer
import random
# Initialize the Kafka producer, JSON-encoding every message
my_producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

# Generate random sleep-tracker readings every 5 seconds and send them to Kafka
while True:
    my_data = {
        'sr': round(random.uniform(45, 100), 2),
        'rr': round(random.uniform(16, 30), 2),
        't': round(random.uniform(85, 99), 2),
        'lm': round(random.uniform(4, 19), 2),
        'bo': round(random.uniform(82, 97), 2),
        'rem': round(random.uniform(60, 105), 2),
        'sr.1': round(random.uniform(0, 9), 2),
        'hr': round(random.uniform(50, 85), 2),
    }
    my_producer.send('pillow', value=my_data)
    sleep(5)
As you can see, it is really simple! First, we imported all the necessary dependencies, created our Kafka producer specifying the server, and started sending messages with random data.
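Before writing the consumer, you can sanity-check that messages are actually arriving by using the console consumer that ships with Kafka:
kafka-console-consumer --bootstrap-server localhost:9092 --topic pillow --from-beginning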
Now we need to develop our consumer. To analyze the data we sent, I made a fake linear model that predicts the user's stress level based on the parameters we sent before. Since it is out of the scope of this project, I will not explain it, and it is probably not working properly, but you can check it out in this repo along with the rest of the code.
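If you don't want to grab the model from the repo, here is a rough sketch of how you could whip up a placeholder yourself (the file name train_model.py is just my choice, and the real model may differ): it trains a scikit-learn linear regression on random data with the same feature names the producer sends, just so consumer.py has a model.sav to load.
# train_model.py - placeholder model, NOT the real one from the repo
import pickle
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# Same feature names the producer sends
cols = ['sr', 'rr', 't', 'lm', 'bo', 'rem', 'sr.1', 'hr']
X = pd.DataFrame(np.random.rand(200, len(cols)), columns=cols)
y = np.random.randint(0, 5, size=200)  # fake stress levels from 0 to 4

model = LinearRegression().fit(X, y)
pickle.dump(model, open("model.sav", "wb"))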
Anyhow, here is the structure of our consumer.py:
from json import loads
from kafka import KafkaConsumer
import pickle
import pandas as pd
# Create our consumer, reading JSON messages from the pillow topic
my_consumer = KafkaConsumer(
    'pillow',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)
# Load the model we created before
with open("model.sav", "rb") as f:
    loaded_model = pickle.load(f)
# Analyze every message as it arrives
for message in my_consumer:
    print(message.value)
    df = pd.DataFrame([message.value])
    result = loaded_model.predict(df)[0]  # predict returns an array; take the single value
    if result >= 4:
        print(f"User is very stressed. Result: {result}")
    elif result >= 3:
        print(f"User is stressed. Result: {result}")
    else:
        print(f"User is resting correctly. Result: {result}")
As you can see, it is also really simple! Again, we import our libraries, create our consumer making sure we read from the correct topic, load the model, and start analyzing messages.
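To see everything in action, run the two scripts in separate terminals (with ZooKeeper and the Kafka server still running):
python3 producer.py
python3 consumer.py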
Woohooo! That's all we needed to start working with Kafka, and now you can use this simple code to build more powerful scripts, connect it to real data, and become a streaming master!
I hope you enjoyed this little tutorial as much as I did. Let me know in the comments what you built with Kafka :)