How to use TensorFlow with Redpanda for real-time machine learning
The Team @ Redpanda
Posted on June 15, 2022
According to TensorFlow's website:
TensorFlow is an end-to-end open-source platform for machine learning. It has a comprehensive, flexible ecosystem of tools, libraries, and community resources that lets researchers push the state-of-the-art in ML and developers easily build and deploy ML-powered applications.
People on the street import TensorFlow as tf, and so will the code below. Download the Apache Kafka® IO wrappers (the tensorflow-io package) to produce and consume records in TensorFlow's native format straight from Kafka, without intermediaries. Because Redpanda is Kafka-API compatible and a drop-in replacement for Kafka, you can use TensorFlow with Redpanda, too (minus the need for ZooKeeper® or a JVM, of course).
Here's how to integrate Redpanda with TensorFlow to get you started in building real-time machine learning projects.
Install Redpanda
In this tutorial, I’ll only cover the Ubuntu install, but you can view all our quickstart documentation on our website. To install Redpanda on Ubuntu, run:
curl -1sLf \
'https://packages.vectorized.io/nzc4ZYQK3WRGd9sy/redpanda/cfg/setup/bash.deb.sh' \
| sudo -E bash
sudo apt-get install redpanda && sudo systemctl start redpanda
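Before moving on, it can help to confirm that something is listening on the Kafka API port. The sketch below uses only the Python standard library. It assumes Redpanda listens on 127.0.0.1:9092 (the address the script in this post uses), and the helper name broker_reachable is made up for illustration. It only shows that a TCP connection is accepted, not that the broker is healthy.

```python
import socket


def broker_reachable(host="127.0.0.1", port=9092, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError (refused, timed out, unreachable) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("Redpanda reachable:", broker_reachable())
```

If this prints False right after the install, checking the service with systemctl status redpanda is a reasonable next step.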
Install deps on your machine
pip3 install --user kafka-python tensorflow-io scikit-learn pandas
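A quick way to see whether the install worked is to ask Python which of the modules the script imports can be found. This is a minimal sketch using only the standard library; missing_modules is a hypothetical helper, and the list of names is taken from the import statements in the script below (note that import names can differ from pip package names, for example kafka-python is imported as kafka).

```python
import importlib.util

# Top-level module names imported by the script in this post.
REQUIRED_MODULES = ["kafka", "tensorflow", "tensorflow_io", "sklearn", "pandas"]


def missing_modules(names):
    """Return the top-level module names that cannot be located."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    print("Missing modules:", missing if missing else "none")
```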
Copy this gist
I copied this gist from the main GitHub repo of the tensorflow/io project. Save it as main.py:
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from datetime import datetime
import time
import threading
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio
print("tensorflow-io version: {}".format(tfio.__version__))
print("tensorflow version: {}".format(tf.__version__))
COLUMNS = [
    # labels
    'class',
    # low-level features
    'lepton_1_pT',
    'lepton_1_eta',
    'lepton_1_phi',
    'lepton_2_pT',
    'lepton_2_eta',
    'lepton_2_phi',
    'missing_energy_magnitude',
    'missing_energy_phi',
    # high-level derived features
    'MET_rel',
    'axial_MET',
    'M_R',
    'M_TR_2',
    'R',
    'MT2',
    'S_R',
    'M_Delta_R',
    'dPhi_r_b',
    'cos(theta_r1)'
]
susy_iterator = pd.read_csv('SUSY.csv.gz', header=None, names=COLUMNS, chunksize=100000)
susy_df = next(susy_iterator)
susy_df.head()
# Number of datapoints and columns
len(susy_df), len(susy_df.columns)
# Number of datapoints belonging to each class (0: background noise, 1: signal)
len(susy_df[susy_df["class"]==0]), len(susy_df[susy_df["class"]==1])
train_df, test_df = train_test_split(susy_df, test_size=0.4, shuffle=True)
print("Number of training samples: ",len(train_df))
print("Number of testing sample: ",len(test_df))
x_train_df = train_df.drop(["class"], axis=1)
y_train_df = train_df["class"]
x_test_df = test_df.drop(["class"], axis=1)
y_test_df = test_df["class"]
# The labels are set as the kafka message keys so as to store data
# in multiple-partitions. Thus, enabling efficient data retrieval
# using the consumer groups.
x_train = list(filter(None, x_train_df.to_csv(index=False).split("\n")[1:]))
y_train = list(filter(None, y_train_df.to_csv(index=False).split("\n")[1:]))
x_test = list(filter(None, x_test_df.to_csv(index=False).split("\n")[1:]))
y_test = list(filter(None, y_test_df.to_csv(index=False).split("\n")[1:]))
NUM_COLUMNS = len(x_train_df.columns)
len(x_train), len(y_train), len(x_test), len(y_test)
def error_callback(exc):
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))

def write_to_kafka(topic_name, items):
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    # Each item is a (message, key) pair: the CSV row is the value, the label is the key
    for message, key in items:
        producer.send(topic_name, key=key.encode('utf-8'), value=message.encode('utf-8')).add_errback(error_callback)
        count += 1
    # Block until all buffered messages have been sent
    producer.flush()
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))
write_to_kafka("susy-train", zip(x_train, y_train))
write_to_kafka("susy-test", zip(x_test, y_test))
def decode_kafka_item(item):
    message = tf.io.decode_csv(item.message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(item.key)
    return (message, key)
BATCH_SIZE=64
SHUFFLE_BUFFER_SIZE=64
train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
train_ds = train_ds.map(decode_kafka_item)
train_ds = train_ds.batch(BATCH_SIZE)
OPTIMIZER="adam"
# The final Dense layer already applies a sigmoid, so the loss receives probabilities, not logits
LOSS=tf.keras.losses.BinaryCrossentropy(from_logits=False)
METRICS=['accuracy']
EPOCHS=10
# design/build the model
model = tf.keras.Sequential([
    tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
print(model.summary())
# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
# fit the model
model.fit(train_ds, epochs=EPOCHS)
test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
    topics=["susy-test"],
    group_id="testcg",
    servers="127.0.0.1:9092",
    stream_timeout=10000,
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)

def decode_kafka_test_item(raw_message, raw_key):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(raw_key)
    return (message, key)
test_ds = test_ds.map(decode_kafka_test_item)
test_ds = test_ds.batch(BATCH_SIZE)
res = model.evaluate(test_ds)
print("test loss, test acc:", res)
online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
    topics=["susy-train"],
    group_id="cgonline",
    servers="127.0.0.1:9092",
    stream_timeout=30000,  # in milliseconds, to block indefinitely, set it to -1.
    configuration=[
        "session.timeout.ms=7000",
        "max.poll.interval.ms=8000",
        "auto.offset.reset=earliest"
    ],
)
def error_callback(exc):
    raise Exception('Error while sending data to kafka: {0}'.format(str(exc)))

def write_to_kafka_after_sleep(topic_name, items):
    # Wait so that the online training loop is already consuming when new data arrives
    time.sleep(30)
    print("#"*100)
    print("Writing messages into topic: {0} after a nice sleep !".format(topic_name))
    print("#"*100)
    count = 0
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
    for message, key in items:
        producer.send(topic_name,
                      key=key.encode('utf-8'),
                      value=message.encode('utf-8')
                      ).add_errback(error_callback)
        count += 1
    producer.flush()
    print("#"*100)
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))
    print("#"*100)

def decode_kafka_online_item(raw_message, raw_key):
    message = tf.io.decode_csv(raw_message, [[0.0] for i in range(NUM_COLUMNS)])
    key = tf.strings.to_number(raw_key)
    return (message, key)
thread = threading.Thread(target=write_to_kafka_after_sleep,
                          args=("susy-train", zip(x_train, y_train)))
thread.daemon = True
thread.start()
for mini_ds in online_train_ds:
    mini_ds = mini_ds.shuffle(buffer_size=32)
    mini_ds = mini_ds.map(decode_kafka_online_item)
    mini_ds = mini_ds.batch(32)
    model.fit(mini_ds, epochs=3)
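To make the record format concrete, here is a small standard-library sketch of the round trip one row takes: the features are joined into a CSV string for the message value, the label becomes the message key, both are UTF-8 encoded as in write_to_kafka, and the consumer side parses them back into floats. The helper names encode_record and decode_record are hypothetical, and the plain float parsing only approximates what tf.io.decode_csv and tf.strings.to_number do in the script.

```python
def encode_record(features, label):
    """Turn one row into the (value, key) byte pair that would be sent to a topic."""
    value = ",".join(repr(float(x)) for x in features).encode("utf-8")
    key = repr(float(label)).encode("utf-8")
    return value, key


def decode_record(value, key):
    """Parse the byte pair back into a list of float features and a float label."""
    features = [float(field) for field in value.decode("utf-8").split(",")]
    label = float(key.decode("utf-8"))
    return features, label


if __name__ == "__main__":
    value, key = encode_record([0.97, -1.5, 0.25], 1)
    print(value, key)
    print(decode_record(value, key))
```

Keeping the label in the key means the message value contains only features, which is why the decode functions in the script parse NUM_COLUMNS fields from the message and a single number from the key.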
Download the data
curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
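Once the download finishes, a quick check that the archive parses and that each row has the 19 columns the script expects (1 label plus 18 features) can save a confusing failure later. The sketch below uses only the standard library; peek_rows is a hypothetical helper, and it assumes SUSY.csv.gz sits in the current directory next to main.py.

```python
import csv
import gzip
import itertools
import os

EXPECTED_COLUMNS = 19  # 1 label column + 18 feature columns, as in COLUMNS


def peek_rows(path, n=3):
    """Read the first n rows of a gzipped CSV without decompressing the whole file."""
    with gzip.open(path, mode="rt", newline="") as handle:
        return list(itertools.islice(csv.reader(handle), n))


if __name__ == "__main__":
    path = "SUSY.csv.gz"  # assumed location: the current directory
    if os.path.exists(path):
        for row in peek_rows(path):
            print(len(row) == EXPECTED_COLUMNS, row[:3])
    else:
        print("SUSY.csv.gz not found; download it first")
```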
Profit!
I had to install the CUDA libraries to get my GPU humming. After that quick install, I was up and running!
❯ python3 main.py
2021-01-11 22:23:02.910794: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
tensorflow-io version: 0.17.0
tensorflow version: 2.4.0
Number of training samples: 60000
Number of testing sample: 40000
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test
2021-01-11 22:23:10.847745: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 FMA
2021-01-11 22:23:10.902048: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:10.902534: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-11 22:23:10.928441: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-11 22:23:10.928471: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: darktower
2021-01-11 22:23:10.928483: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: darktower
2021-01-11 22:23:10.928573: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 460.27.4
2021-01-11 22:23:10.928598: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 455.28.0
2021-01-11 22:23:10.928608: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 455.28.0 does not match DSO version 460.27.4 -- cannot find working devices in this configuration
2021-01-11 22:23:10.931185: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:11.935134: I tensorflow_io/core/kernels/kafka_kernels.cc:349] Kafka tail: 59449
Model: "sequential"
_________________________________________________________________
Layer (type) Output Shape Param #
=================================================================
dense (Dense) (None, 128) 2432
_________________________________________________________________
dropout (Dropout) (None, 128) 0
_________________________________________________________________
dense_1 (Dense) (None, 256) 33024
_________________________________________________________________
dropout_1 (Dropout) (None, 256) 0
_________________________________________________________________
dense_2 (Dense) (None, 128) 32896
_________________________________________________________________
dropout_2 (Dropout) (None, 128) 0
_________________________________________________________________
dense_3 (Dense) (None, 1) 129
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
2021-01-11 22:23:12.036891: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-11 22:23:12.038427: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2894530000 Hz
Epoch 1/10
2021-01-11 22:23:12.283066: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 0
1/Unknown - 0s 415ms/step - loss: 0.7726 - accuracy: 0.3750
2021-01-11 22:23:12.518990: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 1024
29/Unknown - 1s 20ms/step - loss: 0.6915 - accuracy: 0.5420
2021-01-11 22:23:13.043534: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 2048
46/Unknown - 2s 25ms/step - loss: 0.6721 - accuracy: 0.5739
2021-01-11 22:23:13.575452: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 3072
62/Unknown - 2s 27ms/step - loss: 0.6572 - accuracy: 0.5951
2021-01-11 22:23:14.102275: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 4096
78/Unknown - 3s 28ms/step - loss: 0.6444 - accuracy: 0.6115
2021-01-11 22:23:14.638575: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 5120
93/Unknown - 3s 29ms/step - loss: 0.6345 - accuracy: 0.6233
2021-01-11 22:23:15.167787: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 6144
109/Unknown - 4s 30ms/step - loss: 0.6251 - accuracy: 0.6344
2021-01-11 22:23:15.696971: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 7168
126/Unknown - 4s 30ms/step - loss: 0.6165 - accuracy: 0.6443
2021-01-11 22:23:16.213187: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 8192
128/Unknown - 5s 33ms/step - loss: 0.6155 - accuracy: 0.6454
2021-01-11 22:23:16.733228: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 9216
144/Unknown - 5s 33ms/step - loss: 0.6086 - accuracy: 0.6531
2021-01-11 22:23:17.252568: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 10240
160/Unknown - 6s 33ms/step - loss: 0.6024 - accuracy: 0.6598
2021-01-11 22:23:17.771701: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 11264
190/Unknown - 6s 31ms/step - loss: 0.5925 - accuracy: 0.6703
2021-01-11 22:23:18.295851: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 12288
192/Unknown - 7s 33ms/step - loss: 0.5920 - accuracy: 0.6709
2021-01-11 22:23:18.815791: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 13312
222/Unknown - 7s 31ms/step - loss: 0.5842 - accuracy: 0.6790
2021-01-11 22:23:19.343171: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 14336
224/Unknown - 8s 33ms/step - loss: 0.5837 - accuracy: 0.6795
2021-01-11 22:23:19.857738: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 15360
254/Unknown - 8s 31ms/step - loss: 0.5770 - accuracy: 0.6863
2021-01-11 22:23:20.379426: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 16384
270/Unknown - 9s 31ms/step - loss: 0.5737 - accuracy: 0.6895
2021-01-11 22:23:20.899491: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 17408
286/Unknown - 9s 31ms/step - loss: 0.5706 - accuracy: 0.6925
2021-01-11 22:23:21.420403: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 18432
302/Unknown - 10s 31ms/step - loss: 0.5677 - accuracy: 0.6952
2021-01-11 22:23:21.941930: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 19456
304/Unknown - 10s 33ms/step - loss: 0.5674 - accuracy: 0.6956
2021-01-11 22:23:22.461011: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 20480
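The parameter counts in the model summary above can be reproduced by hand: a Dense layer has inputs × units weights plus one bias per unit, and Dropout layers add none. A short check, assuming the 18 feature columns defined in COLUMNS and the layer widths from the script (dense_params is a hypothetical helper):

```python
def dense_params(n_inputs, n_units):
    """Weights (n_inputs * n_units) plus one bias per unit."""
    return n_inputs * n_units + n_units


# Input width followed by the number of units in each Dense layer
layer_sizes = [18, 128, 256, 128, 1]
per_layer = [dense_params(a, b) for a, b in zip(layer_sizes, layer_sizes[1:])]

print(per_layer)       # [2432, 33024, 32896, 129]
print(sum(per_layer))  # 68481
```

The total matches the 68,481 trainable parameters reported in the summary.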
Note: I timed myself copying the gists into my terminal, and it took a bit less than 3 minutes.
To learn more about what you can do with Redpanda, watch our YouTube videos, join our Slack community, or download the binary from GitHub. Follow us on Twitter via @redpandadata or follow me personally at @emaxerrno.