Publish PostgreSQL Data Changes to React with ksqlDB and MQTT

kwonghungyip

Kwong-Hung YIP

Posted on March 21, 2024


The cover image illustrates the aim of this post: after an UPDATE statement is executed in the backend PostgreSQL database, a table cell in the React frontend is updated and highlighted almost immediately, showing the new value set by that UPDATE. The data streaming flow is composed of Kafka Connect, ksqlDB and the HiveMQ MQTT broker, and all source code is available in this GitHub repo.

Architecture


  • All components in the architecture diagram are defined in the docker-compose.yml file. The setup aims to capture data changes in the PostgreSQL database and publish them to the React frontend. To minimize the delay, the client does not use long polling to fetch the data; instead, the backend actively pushes messages to the client over the MQTT v5 protocol whenever a database change is detected.

  • Another consideration is to use built-in product features as much as possible and reduce development effort. Apart from Next.js routing and the React frontend, the back-end implementation consists only of PostgreSQL and ksqlDB scripts.

  • With the "cp-server:7.5.3" Docker image, the Kafka broker can run in KRaft mode, without depending on ZooKeeper.

  • During the Kafka Connect setup, the Confluent Hub client was used to install the connectors and Single Message Transforms (SMTs). To set up the JDBC source connector, the PostgreSQL JDBC driver must be downloaded into the Docker image.

Data stream flow

PostgreSQL

  • The flow starts from the PostgreSQL database on the left-hand side of the data stream flow diagram. The "horse", "jockey", and "race" tables persist the basic entities, linked by the "race_horse_jockey" table. The "v_race_horse" view joins all four tables into one comprehensive view. Finally, the "odds_forecast" table persists odds data, and changes to this table are published to the React frontend.

  • All tables, views, procedures and initial data are defined in create-table.sql, which is executed when the Postgres Docker container starts.

  • The JDBC source connector captures changes from the v_race_horse view and the race and odds_forecast tables, and publishes them to the corresponding "postgres_src_" Kafka topics.
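For reference, a JDBC source connector covering these three objects could be configured roughly as below. This is a minimal sketch, not the repo's actual configuration: the connection URL, credentials, "timestamp" mode and the "updated_at" column are assumptions. It relies on two documented behaviors of the Confluent JDBC source connector: views are only picked up when "table.types" includes VIEW, and each table is written to a topic named "topic.prefix" followed by the table name, which would yield the "postgres_src_" topic names.

```typescript
// Connector properties are plain string key/value pairs.
type ConnectorConfig = Record<string, string>;

// Hypothetical values: host, database, credentials and "updated_at" are
// placeholders, not taken from the original repo.
const jdbcSourceConfig: ConnectorConfig = {
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://postgres:5432/racing",
  "connection.user": "postgres",
  "connection.password": "postgres",
  // Fetch rows whose timestamp column advanced since the previous poll.
  "mode": "timestamp",
  "timestamp.column.name": "updated_at",
  // The default is "TABLE", which would silently skip the v_race_horse view.
  "table.types": "TABLE,VIEW",
  "table.whitelist": "v_race_horse,race,odds_forecast",
  "topic.prefix": "postgres_src_",
  "poll.interval.ms": "1000",
};

// Topic names the connector would write to: topic.prefix + table name.
function sourceTopics(config: ConnectorConfig): string[] {
  const prefix = config["topic.prefix"] ?? "";
  return (config["table.whitelist"] ?? "")
    .split(",")
    .map((table) => table.trim())
    .filter((table) => table.length > 0)
    .map((table) => prefix + table);
}

console.log(sourceTopics(jdbcSourceConfig));
```

The same object could be sent as the "config" field of a POST to Kafka Connect's /connectors REST endpoint, or written out as the property list of a ksqlDB CREATE SOURCE CONNECTOR statement.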

ksqlDB and Kafka Connect

  • The difference between a KTable and a KStream shows when either one receives multiple messages with the same key: a KTable keeps only the latest message per key, whereas a KStream keeps the entire history of the stream. Based on the nature of the data, the "v_race_horse" view and the "race" table are imported as KTables, and the "odds_forecast" table is imported as a KStream to capture every odds change being published.

  • The odds KStream and the race_horse_tbl and race KTables were created with the CREATE STREAM/CREATE TABLE commands. These commands take a source Kafka topic as a parameter, and the resulting KStreams/KTables serve as the input to ksqlDB.

  • On the other hand, the CREATE STREAM AS SELECT/CREATE TABLE AS SELECT commands create KStreams/KTables (KStream/KTable-query in the data stream flow diagram) as the output of ksqlDB; the new Kafka topics associated with them are the source topics of the Redis and MQTT sink connectors.

  • ksqlDB also provides the CREATE CONNECTOR command to define a new Kafka Connect connector. Installing ksqlDB is not necessary if you are only interested in the connectors: you can skip it and manage the connectors through the REST API that Kafka Connect provides. However, for convenience, all KStreams, KTables and connectors are created by init.sql, which is executed after the services in docker-compose.yml are started.
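The KTable/KStream distinction can be illustrated without Kafka by folding the same sequence of keyed records in two ways. The sketch below is a plain TypeScript model of the semantics, not ksqlDB code, and the record keys are hypothetical. It also models a tombstone (a null value), which deletes a key from a table.

```typescript
// A keyed record as it arrives on a topic; a null value is a tombstone.
interface KeyedRecord {
  key: string;
  value: number | null;
}

// KStream semantics: every record is an independent event, so all are kept.
function foldAsStream(records: KeyedRecord[]): KeyedRecord[] {
  return records.slice();
}

// KTable semantics: each record upserts its key; a tombstone deletes the key.
function foldAsTable(records: KeyedRecord[]): Map<string, number> {
  const table = new Map<string, number>();
  for (const { key, value } of records) {
    if (value === null) {
      table.delete(key);
    } else {
      table.set(key, value);
    }
  }
  return table;
}

// Hypothetical "race-horse" keys with successive odds values.
const oddsUpdates: KeyedRecord[] = [
  { key: "R1-H3", value: 5.0 },
  { key: "R1-H7", value: 9.5 },
  { key: "R1-H3", value: 4.2 }, // newer odds for the same key
];

console.log(foldAsStream(oddsUpdates).length); // 3
console.log(foldAsTable(oddsUpdates).get("R1-H3")); // 4.2
```

This is why the odds are modeled as a KStream: every change must reach the frontend, while for the race and horse data only the current row per key matters.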

RedisJSON and RediSearch

  • Two Redis sink connectors are used to publish data: "my-redis-sink-json" publishes the horse list of each race in JSON format, and "my-redis-sink-avro" publishes all odds in Avro format. Data published into Redis is stored as key-value pairs; the key format is ":", and the value is native JSON supported by RedisJSON. You can refer to the JSON.SET command for how to create and update JSON values in Redis.

  • RediSearch supports indexing and searching native JSON documents in Redis; the FT.CREATE command is used to create the "horse" and "odds" indexes for later searches.
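The two Redis commands mentioned above can be sketched as raw argument lists, which a client such as node-redis can send with sendCommand. The key prefix "odds:" and the schema fields are hypothetical, since the actual key format is not shown here. The argument order follows the documented syntax: JSON.SET takes a key, a JSONPath ("$" for the root) and a JSON string, and FT.CREATE ... ON JSON PREFIX ... SCHEMA gives each JSONPath an alias that queries refer to.

```typescript
// One attribute of a RediSearch schema over JSON documents.
interface SchemaField {
  path: string; // JSONPath into the document, e.g. "$.race_no"
  alias: string; // attribute name used in queries
  type: "TAG" | "TEXT" | "NUMERIC";
}

// JSON.SET <key> <path> <json>: "$" writes the whole document at the root.
function jsonSetArgs(key: string, doc: unknown): string[] {
  return ["JSON.SET", key, "$", JSON.stringify(doc)];
}

// FT.CREATE <index> ON JSON PREFIX 1 <prefix> SCHEMA <path> AS <alias> <type> ...
function ftCreateArgs(index: string, keyPrefix: string, fields: SchemaField[]): string[] {
  const args = ["FT.CREATE", index, "ON", "JSON", "PREFIX", "1", keyPrefix, "SCHEMA"];
  for (const field of fields) {
    args.push(field.path, "AS", field.alias, field.type);
  }
  return args;
}

// Hypothetical key prefix and attributes for the "odds" index.
const oddsIndex = ftCreateArgs("odds", "odds:", [
  { path: "$.race_date", alias: "race_date", type: "TAG" },
  { path: "$.race_no", alias: "race_no", type: "NUMERIC" },
]);

console.log(oddsIndex.join(" "));
```

Only keys starting with the declared prefix are indexed, so the prefix has to match whatever key format the sink connector actually writes.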

Next.js

  • A Next.js Route Handler serves as the API backend for querying the horse list and the entire odds table by race date and race number. The API searches the native JSON with the FT.SEARCH command against the "horse" and "odds" indexes.

  • Next.js also hosts the React client components. Through its Data Fetching feature, Next.js calls the API, injects the horse list and odds table into the client component, and allows controlling how the API calls are cached.
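A minimal sketch of the query such a Route Handler might send, assuming the indexes expose hypothetical "race_date" (TAG) and "race_no" (NUMERIC) attributes. Two details of the RediSearch query syntax matter here: punctuation such as the hyphens in a date must be backslash-escaped inside a TAG query, and FT.SEARCH returns only the first 10 matches unless LIMIT is given.

```typescript
// Escape characters RediSearch treats as syntax inside a TAG query.
// Escaping every non-alphanumeric character is conservative but safe.
function escapeTag(value: string): string {
  return value.replace(/[^A-Za-z0-9_]/g, (ch) => `\\${ch}`);
}

// Hypothetical attribute names; they must match the SCHEMA used in FT.CREATE.
function buildRaceQuery(raceDate: string, raceNo: number): string {
  if (!Number.isInteger(raceNo) || raceNo <= 0) {
    throw new RangeError(`invalid race number: ${raceNo}`);
  }
  // TAG match on the date, inclusive numeric range [n n] for an exact race number.
  return `@race_date:{${escapeTag(raceDate)}} @race_no:[${raceNo} ${raceNo}]`;
}

// Without LIMIT, FT.SEARCH returns at most 10 documents, which would truncate
// an odds table; the default of 1000 here is an arbitrary illustrative cap.
function ftSearchArgs(index: string, raceDate: string, raceNo: number, limit = 1000): string[] {
  return ["FT.SEARCH", index, buildRaceQuery(raceDate, raceNo), "LIMIT", "0", String(limit)];
}

console.log(ftSearchArgs("odds", "2024-03-20", 3));
```

Validating the race number before building the query also keeps URL parameters from injecting arbitrary query syntax.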

MQTT

  • The "odds_forecast__" KStream-queries read the odds for a particular race from the "odds_forecast" KStream and publish them into the corresponding "odds-forecast--" topics. The MQTT sink connector then exports the odds to the corresponding MQTT topic in HiveMQ. Due to limitations of the MQTT sink connector, it cannot export to an MQTT topic chosen dynamically from properties in the message payload. Hierarchical MQTT topics cannot be used either, because the Kafka topic name is mapped directly to the MQTT topic, and the slash "/" character is invalid in a Kafka topic name. KStream plus Spring Integration could be an alternative that addresses these limitations and can also customize the retention period based on the data type.

  • The React frontend shows only one odds table, for a particular race. The client instance created with the MQTT.js library is shared through React context (read with the useContext() hook) and is available globally. The client subscribes to a single MQTT topic only, reducing unnecessary traffic pushed to the client. With the useEffect() hook, the component unsubscribes from the topic when it is no longer needed, as housekeeping for the connection on the HiveMQ server.
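Kafka topic names may only contain ASCII letters, digits, ".", "_" and "-", must be at most 249 characters long, and cannot be "." or "..", which is why a hierarchical MQTT topic cannot be mirrored one-to-one. A small validator makes the constraint concrete; the topic names in it are hypothetical.

```typescript
// Kafka's legal topic characters and maximum length.
const KAFKA_TOPIC_PATTERN = /^[a-zA-Z0-9._-]+$/;
const KAFKA_TOPIC_MAX_LENGTH = 249;

function isValidKafkaTopic(name: string): boolean {
  // "." and ".." are reserved even though their characters are legal.
  if (name === "." || name === "..") return false;
  return name.length <= KAFKA_TOPIC_MAX_LENGTH && KAFKA_TOPIC_PATTERN.test(name);
}

// Hypothetical names: a flat one works, an MQTT-style hierarchy does not.
const candidates = ["odds-forecast-race1", "odds/forecast/race1"];
for (const name of candidates) {
  console.log(name, isValidKafkaTopic(name));
}
// odds-forecast-race1 true
// odds/forecast/race1 false
```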

React frontend

  • Finally, the right-hand side of the data stream flow diagram is the React web frontend. All odds data and changes must be reflected in the React state to be presented to the user.
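As a sketch of that requirement, the update below replaces the odds table immutably so that React detects the change and re-renders. The table shape, the "combination" key and the JSON payload format are assumptions rather than the repo's actual types; clearing the highlight afterwards (for example with a timer) is left out.

```typescript
// Hypothetical shapes; the repo's actual types may differ.
interface OddsCell {
  value: number;
  highlighted: boolean; // rendered with a yellow background when true
}

// One cell per (hypothetical) horse combination key, e.g. "1-2".
type OddsTable = Readonly<Record<string, OddsCell>>;

interface OddsMessage {
  combination: string;
  odds: number;
}

// Assumes the MQTT payload is a JSON object such as {"combination":"1-2","odds":10.5}.
function parseOddsMessage(text: string): OddsMessage {
  const data: unknown = JSON.parse(text);
  if (typeof data !== "object" || data === null) {
    throw new Error("odds payload is not a JSON object");
  }
  const { combination, odds } = data as Record<string, unknown>;
  if (typeof combination !== "string" || typeof odds !== "number") {
    throw new Error("odds payload has unexpected fields");
  }
  return { combination, odds };
}

// Returns a new table object instead of mutating the old one, so React's
// state comparison sees a change and re-renders the updated cell.
function applyOdds(table: OddsTable, msg: OddsMessage): OddsTable {
  return { ...table, [msg.combination]: { value: msg.odds, highlighted: true } };
}
```

With the table held in useState, a pushed message could be applied through the functional updater form, e.g. setOdds((prev) => applyOdds(prev, parseOddsMessage(text))), where setOdds is a hypothetical state setter; the updater always starts from the latest state even if several messages arrive between renders.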

Odds table in React frontend

  • The URL "…/odds//" determines which odds table is displayed; the parameters are captured by Next.js Dynamic Routes. The horse list and the entire odds table are injected into the React component by the Next.js Data Fetching feature. After the odds table is rendered, the MQTT client subscribes to the corresponding MQTT topic. Whenever an odds message is received, the React state is updated and the corresponding cell is highlighted in yellow.
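The subscribe-then-clean-up lifecycle described above maps onto a useEffect body that returns a cleanup function. The sketch keeps it framework-free by typing only the subset of an MQTT.js-style client it uses (subscribe, unsubscribe, and on/removeListener for the "message" event); treat that interface and the function name subscribeToOdds as illustrative assumptions.

```typescript
// Subset of an MQTT.js-style client used here; the payload is assumed to be
// something with toString(), such as a Node Buffer.
type MessageHandler = (topic: string, payload: { toString(): string }) => void;

interface MinimalMqttClient {
  subscribe(topic: string): unknown;
  unsubscribe(topic: string): unknown;
  on(event: "message", handler: MessageHandler): unknown;
  removeListener(event: "message", handler: MessageHandler): unknown;
}

// Mirrors a useEffect body: attach a listener, subscribe to one topic, and
// return the cleanup React would call on unmount or when the topic changes.
function subscribeToOdds(
  client: MinimalMqttClient,
  topic: string,
  onMessage: (text: string) => void,
): () => void {
  // The shared client delivers messages for every topic, so filter by topic.
  const handler: MessageHandler = (msgTopic, payload) => {
    if (msgTopic === topic) onMessage(payload.toString());
  };
  client.on("message", handler);
  client.subscribe(topic);
  return () => {
    client.unsubscribe(topic);
    client.removeListener("message", handler);
  };
}
```

Inside a component it could be wired as useEffect(() => subscribeToOdds(client, topic, handleText), [client, topic]), where handleText is a hypothetical callback that updates the React state. Removing the listener in the cleanup matters because the client is shared through context and outlives the component.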

Further Enhancements

  • Deploy the infrastructure to Kubernetes, possibly with multiple nodes in different layers (Kafka, Redis, HiveMQ).

  • Replace the MQTT sink connector with KStream + Spring Integration to support dynamic topic mapping, topic hierarchies, and customized retention periods.

  • Replace the JDBC source connector with the Debezium connector.

  • Replace MQTT message push with HTTP/2 streaming, although I could not find an HTTP/2 sink connector.

  • Enable OAuth authentication at different levels.
