Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev

atrifsik

avital trifsik

Posted on May 28, 2023

Part 2: Change Data Capture (CDC) for MongoDB with Debezium and Memphis.dev

This is part two of a series of blog posts on building a modern event-driven system using Memphis.dev.

In our last blog post, we introduced a reference implementation for capturing change data capture (CDC) events from a PostgreSQL database using Debezium Server and Memphis.dev. By replacing Apache Kafka with Memphis.dev, the solution substantially reduced the operational resources and overhead – saving money and freeing developers to focus on building new functionality.

PostgreSQL is the only commonly used database, however. Debezium provides connectors for a range of databases, including the non-relational document database MongoDB. MongoDB is popular with developers, especially those working in dynamic programming languages since it avoids the object-relational impedance mismatch. Developers can directly store, query, and update objects in the database.

In this blog post, we demonstrate how to adapt the CDC solution to MongoDB.


Overview of the Solution

Here, we describe the architecture of the reference solution for delivering change data capture events with Memphis.dev. The architecture has not changed from our previous blog post except for the replacement of PostgreSQL with MongoDB.

A Todo Item generator script writes randomly-generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis.dev for new messages and prints them to the console.

  1. Todo Item Generator: Inserts a randomly-generated todo item in the MongoDB collection every 0.5 seconds. Each todo item contains a description, creation timestamp, optional due date, and completion status.
  2. MongoDB: Configured with a single database containing a single collection (todo_items).
  3. Debezium Server: Instance of Debezium Server configured with MongoDB source and HTTP Client sink connectors.
  4. Memphis.dev REST Gateway: Uses the out-of-the-box configuration.
  5. Memphis.dev: Configured with a single station (todo-cdc-events) and single user (todocdcservice)
  6. P*rinting Consumer*: A script that uses the Memphis.dev Python SDK to consume messages and print them to the console

Image description


Getting Started

The implementation tutorial is available in the mongodb-debezium-cdc-example directory of the Memphis Example Solutions repository. Docker Compose will be needed to run it.

Running the Implementation
Build the Docker images for Debezium Server, the printing consumer, and database setup (table and user creation).

Currently, the implementation depends on a pre-release version of Debezium Server for the JWT authentication support. A Docker image will be built directly from the main branch of the Debezium and Debezium Server repositories. Note that this step can take quite a while (~20 minutes) to run. When Debezium Server 2.3.0 is released, we will switch to using the upstream Docker image.

Step 1: Build the Images

$ docker compose build --pull --no-cache
Enter fullscreen mode Exit fullscreen mode

Step 2: Start the Memphis.dev Broker and REST Gateway
Start the Memphis.dev broker and REST gateway. Note that the memphis-rest-gateway service depends on the memphis broker service, so the broker service will be started as well.

$ docker compose up -d memphis-rest-gateway
Enter fullscreen mode Exit fullscreen mode
[+] Running 4/4
 ⠿ Network mongodb-debezium-cdc-example_default                   Created                                                        0.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        6.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                       16.8s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Started
Enter fullscreen mode Exit fullscreen mode

Step 3: Create a Station and Corresponding User in Memphis.dev
Messages are delivered to “stations” in Memphis.dev; they are equivalent to “topics” used by message brokers. Point your browser at http://localhost:9000/. Click the “sign in with root” link at the bottom of the page.

Image description

Log in with root (username) and memphis (password).

Image description

Follow the wizard to create a station named todo-cdc-events.

Image description

Create a user named todocdcservice with the same value for the password.

Image description

Click “next” until the wizard is finished:

Image description

Click “Go to station overview” to go to the station overview page.

Image description

Step 4: Start the Printing Consumer
We used the Memphis.dev Python SDK to create a consumer script that polls the todo-cdc-events station and prints the messages to the console.

$ docker compose up -d printing-consumer
Enter fullscreen mode Exit fullscreen mode
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container printing-consumer                                Started                                                            1.4s
Enter fullscreen mode Exit fullscreen mode

Step 5: Starting and Configuring MongoDB
To capture changes, MongoDB’s replication functionality must be enabled. There are several steps:

  • The replica set name must be set. This can be done by passing the name of a replica set on the command-line or in the configuration file. In the Docker Compose file, we run MongoDB with the command-line argument –replSet rs0 to set the replica set name.

  • When replication is used and authorization is enabled, a common key file must be provided to each replica instance. We generated a key file following the instructions in the MongoDB documentation. We then built an image that extends the official MongoDB image by including the key file.

  • The replica set needs to be initialized once MongoDB is running. We use a script that configures the instance on startup. The script calls the replSetInitiate command with a list of the IP addresses and ports of each MongoDB instance in the replica set. This command causes the MongoDB instances to communicate with each other and select a leader.

Generally speaking, replica sets are used for increased reliability (high availability). Most documentation that you’ll find describes how to set up a replica set with multiple MongoDB instances. In our case, Debezium’s MongoDB connector piggybacks off of the replication functionality to capture data change events. Although we go through the steps to configure a replica set, we only use one MongoDB instance.

The todo item generator script creates a new todo item every half second. The field values are randomly generated. The items are added to a MongoDB collection named “todo_items.”

In the Docker Compose file, the todo item generator script is configured to depend on the Mongodb instance running in a healthy state and successful completion of the database setup script. By starting the todo item generator script, Docker Compose will also start MongoDB and run the database setup script.

$ docker compose up -d todo-generator
Enter fullscreen mode Exit fullscreen mode
[+] Running 3/3
 ⠿ Container mongodb                 Healthy                                                                                     8.4s
 ⠿ Container mongodb-database-setup  Exited                                                                                      8.8s
 ⠿ Container mongodb-todo-generator  Started                                                                                     9.1s
Enter fullscreen mode Exit fullscreen mode

Step 6: Start the Debezium Server
The last service that needs to be started is the Debezium Server. The server is configured with a source connector for MongoDB and the HTTP Client sink connector through a Java properties file:

debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false
Enter fullscreen mode Exit fullscreen mode

Most of the options are self-explanatory. The HTTP client sink URL is worth explaining in detail. Memphis.dev REST gateway expects to receive POST requests with a path in the following format:
/stations/{station}/produce/{quantity}

The {station} placeholder is replaced with the name of the station to send the message to. The {quantity} placeholder is replaced with the value single (for a single message) or batch (for multiple messages).

The message(s) is (are) passed as the payload of the POST request. The REST gateway supports three message formats (plain text, JSON, or protocol buffer). The value (text/, application/json, application/x-protobuf) of the content-type header field determines how the payload is interpreted.

The Debezium Server’s HTTP Client sink produces REST requests that are consistent with these patterns. Requests use the POST verb, each request contains a single JSON-encoded message as the payload, and the content-type header set to application/json. We use todo-cdc-events as the station name and the single quantity value in the endpoint URL to route messages and indicate how the REST gateway should interpret the requests:

http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single

The debezium.sink.http.authentication.type=jwt property indicates that the HTTP Client sink should use JWT authentication. The username and password properties are self-evident, but the debezium.sink.http.authentication.jwt.url property deserves some explanation. An initial token is acquired using the /auth/authenticate endpoint, while the authentication is refreshed using the separate /auth/refreshToken endpoint. The JWT authentication in the HTTP Client appends the appropriate endpoint to the given base URL.

Debezium Server can be started with the following command:

$ docker compose up -d debezium-server
Enter fullscreen mode Exit fullscreen mode

Step 7: Confirm the System is Working
Check the todo-cdc-events station overview screen in Memphis.dev web UI to confirm that the producer and consumer are connected and messages are being delivered.

Image description

And, print the logs for the printing-consumer container:

Enter fullscreen mode Exit fullscreen mode

message:

bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')
Enter fullscreen mode Exit fullscreen mode

Format of the CDC Messages

The incoming messages are formatted as JSON. The messages have two top-level fields (schema and payload). The schema describes the record schema (field names and types), while the payload describes the change to the record. The payload object itself contains two fields (before and after) indicating the value of the record before and after the change.

For MongoDB, Debezium Server encodes the record as a string of serialized JSON:

{
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}
Enter fullscreen mode Exit fullscreen mode

This will have implications on the downstream processing of messages, which we will describe in a future blog post in this series.


Congratulations! You now have a working example of how to capture data change events from a MongoDB database using Debezium Server and transfer the events to Memphis.dev for downstream processing.

Head over to Part 3: Transforming MongoDB CDC Event Messages to learn further.

In case you missed part 1:

Part 1: Integrating Debezium Server and Memphis.dev for Streaming Change Data Capture (CDC) Events


Follow Us to get the latest updates!
GithubDocsDiscord


Join 4500+ others and sign up for our data engineering newsletter.

💖 💪 🙅 🚩
atrifsik
avital trifsik

Posted on May 28, 2023

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related