Confluent - MQTT <-> KAFKA
Hari Krishnan
Posted on May 22, 2022
Spark structured streaming has some limitations on listening to MQTT data sources in its latest version, though there are some extensions available, they support only Spark version 2.0.
Supported Spark Streaming sources
- Flume
- Kafka
- file sources
- TCP/IP port
- Kinesis
For people looking for what MQTT is ?
https://mqtt.org/
So to make mqtt data available for spark to stream, we have to connect MQTT with Kafka, there are many tools available to achieve this. One such is https://www.confluent.io/
Follow the below steps to bring your MQTT device data to a Kafka queue.
INSTALL CONFLUENT
-> install confluent in local machine (https://www.confluent.io/installation/)
-> set the path for confluent cli
CHECK FOR THE STATUS OF SERVICES
confluent local services kafka status
START KAFKA USING CONFLUENT CLI
confluent local services kafka start
AFTER STARTING KAFKA, START THE KAFKA CONNECT SERVICE
confluent local services connect start
INSTALL MOSQUITTO
We would need mosquitto to run an MQTT broker which will expose topics on which we can publish and subscribe data.
https://mosquitto.org/
brew instal mosquitto
START MOSQUITTO BROKER LOCALLY
brew services start mosquitto
CHECK IF WE CAN PUBLISH TO THE MOSQUITTO TOPIC
mosquitto_pub -h localhost -p 1883 -t temperature -m "sample-msg-1"
*SUBSCRIBE TO THE TOPIC AND TRY SENDING MESSAGES *
mosquitto_sub -h localhost -p 1883 -t temperature
INSTALL THE MQTT CONNECTOR PLUGIN
confluent-hub install confluentinc/kafka-connect-mqtt:latest
RESTART AND START THE CONNECT SERVICE
confluent local services connect stop && confluent local services connect start
CHECK IF MQTT PLUGIN IS INSTALLED USING THE BELOW COMMAND
curl -s "http://localhost:8083/connector-plugins"
CONNECT KAFKA WITH MQTT BROKER USING THE BELOW COMMAND
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name" : "mqtt-source",
"config" : {
"connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
"tasks.max" : "1",
"mqtt.server.uri" : "tcp://127.0.0.1:1883",
"mqtt.topics" : "temperature",
"kafka.topic" : "mqtt.temperature",
"confluent.topic.bootstrap.servers": "localhost:9092",
"confluent.topic.replication.factor": "1",
"confluent.license":""
}
}'
CHECK IF THE CONNECTOR HAS A STATUS : RUNNING
curl -s "http://localhost:8083/connectors"
curl -s "http://localhost:8083/connectors/mqtt-source/status"
*CREATE A KAFKA TOPIC *
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mqtt.temperature
FINALLY TEST BY CREATING TWO CONSUMERS
mosquitto_sub -h localhost -p 1883 -t temperature
kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt.temperature --property print.key=true --from-beginning
Now if we publish the data to the mqtt topic, both the consumers created above will receive the data pushed in the mqtt topic.
Posted on May 22, 2022
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.