Start Maxwell with Namespaced Topic Kafka Producer (Look for Idle Listeners in Kafka)
Montana Mendy
Posted on December 2, 2021
After running through the prerequisites, you will have:
- A AWS Aurora instance
- A Maxwell image named osheroff/maxwell
- AKafka service named kafka, listening on
kafka:9092
Start Maxwell with Namespaced Topic Kafka Producer
This is a slight variation of the prerequisite, AWS Aurora to Maxwell Kafka Producer.
In the prerequisite we ran Maxwell with the default Kafka Producer configuration which will produce messages on the Maxwell topic.
You'll want to get the number of messages in a Maxwell topic, you can run: bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
In this example we are overriding the MAXWELL_OPTIONS
environment variable and specifying a dynamic topic name, so that Maxwell will route messages from each table to topics by the same name, namespaced by the database name.
docker run -it --rm \
--env MYSQL_USERNAME=AURORA_USERNAME \
--env MYSQL_PASSWORD=AURORA_PASSWORD \
--env MYSQL_HOST=AURORA_HOST \
--link kafka \
--env KAFKA_HOST=kafka \
--env KAFKA_PORT=9092 \
--env MAXWELL_OPTIONS="--kafka_topic=maxwell_%{database}_%{table}
--name maxwell \
osheroff/maxwell
Here's a graphical way of looking at things when it comes to Consumers
. Now let's get back to Maxwell.
This will be the output of Maxwell:
17:44:34,901 INFO ProducerConfig - ProducerConfig values:
request.timeout.ms = 30000
retry.backoff.ms = 100
buffer.memory = 33554432
ssl.truststore.password = null
batch.size = 16384
ssl.keymanager.algorithm = SunX509
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.key.password = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.provider = null
sasl.kerberos.service.name = null
max.in.flight.requests.per.connection = 5
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka:9092]
client.id =
max.request.size = 1048576
acks = 1
linger.ms = 0
sasl.kerberos.kinit.cmd = /usr/bin/kinit
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
metadata.fetch.timeout.ms = 60000
ssl.endpoint.identification.algorithm = null
ssl.keystore.location = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.truststore.location = null
ssl.keystore.password = null
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full = false
metrics.sample.window.ms = 30000
metadata.max.age.ms = 300000
security.protocol = PLAINTEXT
ssl.protocol = TLS
sasl.kerberos.min.time.before.relogin = 60000
timeout.ms = 30000
connections.max.idle.ms = 540000
ssl.trustmanager.algorithm = PKIX
metric.reporters = []
compression.type = none
ssl.truststore.type = JKS
max.block.ms = 60000
retries = 0
send.buffer.bytes = 131072
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
reconnect.backoff.ms = 50
metrics.num.samples = 2
ssl.keystore.type = JKS
17:44:34,952 INFO AppInfoParser - Kafka version : 0.9.0.1
17:44:34,952 INFO AppInfoParser - Kafka commitId : 23c69d62a0cabf06
17:44:35,012 INFO Maxwell - Maxwell v1.7.0 is booting (MaxwellKafkaProducer), starting at BinlogPosition[mysql-bin-changelog.000002:84337]
17:44:35,680 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at BinlogPosition[mysql-bin-changelog.000002:3521])
17:44:38,991 INFO OpenReplicator - starting replication at mysql-bin-changelog.000002:84337
The process is now waiting for new data events and looking for idle Kafka listeners.
Start a consumer
(in another terminal window). This command will start an unnamed instance of Spotify/Kafka linked to the Kafka service, start a consumer, display existing messages from the Maxwell topic, and wait for new messages until you quit (which destroys the container):
docker run -it --rm --link kafka spotify/kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic maxwell_{AURORA_DATABASE}_{AURORA_TABLE} --from-beginning
Connect to the AWS Aurora instance, insert some records, and update some records. Data events from Maxwell will be printed in the Consumer terminal window:
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606003,"xid":1655558,"commit":true,"data":{"id":4,"first_name":"Mendy","last_name":"Montana"},"old":{"first_name":"Montana"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606435,"xid":1658343,"commit":true,"data":{"id":4,"first_name":"Montana","last_name":"Mendy"},"old":{"first_name":"Tim"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606451,"xid":1658455,"commit":true,"data":{"id":4,"first_name":"Tim","last_name":"Mendy"},"old":{"first_name":"Montana"}}
Posted on December 2, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.
Related
December 2, 2021