Confluent - MQTT <-> KAFKA

hariraghupathy

Hari Krishnan

Posted on May 22, 2022

Spark Structured Streaming has limited support for MQTT data sources in its latest version. Some extensions are available, but they support only Spark version 2.0.

Supported Spark Streaming sources

  1. Flume
  2. Kafka
  3. file sources
  4. TCP/IP port
  5. Kinesis
  6. Twitter

For anyone wondering what MQTT is, see https://mqtt.org/

So, to make MQTT data available for Spark to stream, we have to connect MQTT with Kafka. Many tools are available to achieve this; one of them is Confluent (https://www.confluent.io/).

Follow the steps below to bring your MQTT device data into a Kafka topic.

INSTALL CONFLUENT

    -> Install Confluent on your local machine (https://www.confluent.io/installation/)
    -> Add the Confluent CLI to your PATH

CHECK FOR THE STATUS OF SERVICES

    confluent local services kafka status

START KAFKA USING CONFLUENT CLI

    confluent local services kafka start

AFTER STARTING KAFKA, START THE KAFKA CONNECT SERVICE

    confluent local services connect start

INSTALL MOSQUITTO

We need Mosquitto to run an MQTT broker, which exposes topics that we can publish to and subscribe to.
https://mosquitto.org/

    brew install mosquitto

START MOSQUITTO BROKER LOCALLY

    brew services start mosquitto

CHECK IF WE CAN PUBLISH TO THE MOSQUITTO TOPIC

    mosquitto_pub -h localhost -p 1883 -t temperature -m "sample-msg-1"

SUBSCRIBE TO THE TOPIC AND TRY SENDING MESSAGES

    mosquitto_sub -h localhost -p 1883 -t temperature 

INSTALL THE MQTT CONNECTOR PLUGIN

      confluent-hub install confluentinc/kafka-connect-mqtt:latest

RESTART THE CONNECT SERVICE

      confluent local services connect stop && confluent local services connect start

CHECK IF MQTT PLUGIN IS INSTALLED USING THE BELOW COMMAND

      curl -s "http://localhost:8083/connector-plugins"
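
The plugin list comes back as one long JSON array, so the MQTT entry is easy to miss. A minimal sketch of a filter for it, assuming the response contains the connector class name as a plain string; the helper name `has_mqtt_plugin` is made up for this example and is not part of any CLI:

```shell
# Hypothetical helper: reads the /connector-plugins response on stdin and
# succeeds only if the MQTT source connector class appears in it.
has_mqtt_plugin() {
  grep -q 'io\.confluent\.connect\.mqtt\.MqttSourceConnector'
}

# Usage (requires the Connect service started in the earlier steps):
#   curl -s "http://localhost:8083/connector-plugins" | has_mqtt_plugin \
#     && echo "MQTT plugin found"
```

If the helper fails, the plugin was probably installed after Connect started, and restarting the Connect service should make it visible.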

CONNECT KAFKA WITH MQTT BROKER USING THE BELOW COMMAND

    curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
        "name" : "mqtt-source",
        "config" : {
            "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
            "tasks.max" : "1",
            "mqtt.server.uri" : "tcp://127.0.0.1:1883",
            "mqtt.topics" : "temperature",
            "kafka.topic" : "mqtt.temperature",
            "confluent.topic.bootstrap.servers" : "localhost:9092",
            "confluent.topic.replication.factor" : "1",
            "confluent.license" : ""
        }
    }'
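
To bridge more than one MQTT topic, the same request can be generated from a small template. This is only a sketch: the helper name `mqtt_connector_json`, the `mqtt-source-<topic>` naming scheme, and the `humidity` topic in the usage comment are assumptions made for illustration; the property names and values are the ones from the request above.

```shell
# Hypothetical helper: prints a connector definition for one MQTT topic.
# Only the connector name and the two topic names are parameterized;
# every other property is copied from the request shown above.
mqtt_connector_json() {
  mqtt_topic="$1"
  kafka_topic="$2"
  cat <<EOF
{
  "name": "mqtt-source-${mqtt_topic}",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://127.0.0.1:1883",
    "mqtt.topics": "${mqtt_topic}",
    "kafka.topic": "${kafka_topic}",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": ""
  }
}
EOF
}

# Usage (requires the Connect service to be running):
#   mqtt_connector_json humidity mqtt.humidity |
#     curl -s -X POST -H 'Content-Type: application/json' \
#       http://localhost:8083/connectors -d @-
```

Here `-d @-` tells curl to read the request body from standard input, so the generated JSON is posted unchanged.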

CHECK THAT THE CONNECTOR STATUS IS RUNNING

      curl -s "http://localhost:8083/connectors"
      curl -s "http://localhost:8083/connectors/mqtt-source/status"
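
A connector can take a few seconds to reach RUNNING after it is created, so it may help to poll the status endpoint instead of checking once. The sketch below assumes the status response lists the connector's own `"state"` before the per-task states (which is how Kafka Connect typically orders it); the helper names, the 30 attempts, and the 2-second delay are arbitrary choices for illustration.

```shell
# Hypothetical helper: reads a connector status response on stdin and
# prints the first "state" value it finds (assumed to be the connector's).
connector_state() {
  grep -o '"state" *: *"[A-Z_]*"' | head -n 1 | cut -d'"' -f4
}

# Hypothetical helper: polls the status endpoint until the connector
# reports RUNNING, giving up after 30 attempts spaced 2 seconds apart.
wait_until_running() {
  name="$1"
  tries=0
  while [ "$tries" -lt 30 ]; do
    state="$(curl -s "http://localhost:8083/connectors/${name}/status" | connector_state)"
    [ "$state" = "RUNNING" ] && return 0
    tries=$((tries + 1))
    sleep 2
  done
  return 1
}

# Usage:
#   wait_until_running mqtt-source && echo "connector is running"
```

Note that this only checks the connector-level state; a task can still be FAILED, so the full status output is worth reading when messages do not arrive.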

CREATE A KAFKA TOPIC

    kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mqtt.temperature

FINALLY TEST BY CREATING TWO CONSUMERS

    # Terminal 1: MQTT subscriber
    mosquitto_sub -h localhost -p 1883 -t temperature
    # Terminal 2: Kafka consumer
    kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt.temperature --property print.key=true --from-beginning

Now, if we publish data to the MQTT topic, both consumers created above will receive it.
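
To try this with more than a single message, a short loop can generate numbered payloads in the same style as the earlier `sample-msg-1` test. This is a sketch only: the helper name `sample_messages` is made up for this example, and the usage line relies on `mosquitto_pub -l`, which publishes each line read from standard input as a separate message.

```shell
# Hypothetical helper: prints COUNT numbered test payloads, one per line.
sample_messages() {
  count="$1"
  i=1
  while [ "$i" -le "$count" ]; do
    echo "sample-msg-$i"
    i=$((i + 1))
  done
}

# Usage (requires the Mosquitto broker from the earlier steps):
#   sample_messages 5 | mosquitto_pub -h localhost -p 1883 -t temperature -l
```

Each line should then show up once in the `mosquitto_sub` terminal and once in the Kafka console consumer.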
