Transferring Commit Offset with MirrorMaker 2
keigodasu
Posted on October 25, 2021
MirrorMaker 2(MM2) is the tool to copy messages from one Kafka cluster to another. And current version of MM2 supports to transfer consumer commit offset to another cluster. Thanks to this feature, when failing over to another site, Consumer is able to consume messages from the last committed offset mirrored by MM2.
Prerequisite of this article
Run MM2 on Kafka Connect.
Steps
- Register MM2 with Kafka Connect
- Write Consumer on destination side with RemoteClusterUtils
1. Register MM2 with Kafka Connect
Need two connectors for transferring consumer commit offset. For details about parameters, go to the MM2 document
MirrorSourceConnector
replicates a set of topics from a source cluster into a destination cluster
curl -s -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://<kafka-connect>:8083/connectors -d'{
"name": "mirror-source-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"source.cluster.alias": "kafka-source",
"target.cluster.alias": "kafka-dist",
"source.cluster.bootstrap.servers": "dc01-kafka01:9091",
"target.cluster.bootstrap.servers": "dc02-kafka01:9091",
"topics": ".*",
"backup.sync.group.offsets.enabled": true,
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}
}
MirrorCheckpointConnector
emits consumer offset and syncs the offset __consumer_offsets
. Synced consumer offset is stored in <source-cluster-alias>.checkpoints.internal
on the destination cluster.
curl -s -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://<kafka-connect>:8083/connectors -d'{
"name": "mirror-checkpoint-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"sync.group.offsets.enabled": "true",
"source.cluster.alias": "kafka-source",
"target.cluster.alias": "kafka-dist",
"exclude.internal.topics":"false",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"source.cluster.bootstrap.servers": "dc01-kafka01:9091",
"target.cluster.bootstrap.servers": "dc02-kafka01:9091"
}
}
2. Write Consumer on destination side with RemoteClusterUtils
add dependency of RemoteClusterUtils
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-mirror-client</artifactId>
<version>3.0.0</version>
</dependency>
RemoteClusterUtils.translateOffsets
internally fetches synced offset from <source-cluster-alias>.checkpoints.internal
. Once get synced offset, consumer on the destination side is able to continue operations by setting the offset via seek
API.
public class RemoteUtilTest {
private static final Properties props = new Properties();
@SuppressWarnings("InfiniteLoopStatement")
public static void main(final String[] args) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dc02-kafka01:9091");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dist-consumer-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// config for MM2 (bootstrap.servers in destination side is required)
Map<String, Object> mmConfig = new HashMap<>();
mmConfig.put("bootstrap.servers", "dc02-kafka01:9091");
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
// get synced consumer offset
Map<TopicPartition, OffsetAndMetadata> destinationOffsetsMap = RemoteClusterUtils.translateOffsets(mmConfig, "kafka-source", "source-consumer-group", Duration.ofMillis(10000));
destinationOffsetsMap.forEach(((topicPartition, offsetAndMetadata) -> System.out.printf("topicPartiion: %s, offsetAndMetadata: %s%n", topicPartition, offsetAndMetadata) ));
// set the offset
List<TopicPartition> topicPartitions = destinationOffsetsMap.keySet().stream().collect(Collectors.toList());
destinationOffsetsMap.forEach(((topicPartition, offsetAndMetadata) -> consumer.assign(topicPartitions)));
destinationOffsetsMap.forEach(((topicPartition, offsetAndMetadata) -> consumer.seek(topicPartition, offsetAndMetadata)));
while (true) {
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (final ConsumerRecord<String, String> record : records) {
final String key = record.key();
final String value = record.value();
System.out.printf("key = %s, value = %s%n", key, value);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
Posted on October 25, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.