Using Confluent Cloud when there is no Cloud (or internet)

rmoff

Robin Moffatt

Posted on April 23, 2020

Using Confluent Cloud when there is no Cloud (or internet)

☁️Confluent Cloud is a great solution for a hosted and managed Apache Kafka service, with the additional benefits of Confluent Platform such as ksqlDB and managed Kafka Connect connectors. But as a developer, you won’t always have a reliable internet connection. Train, planes, and automobiles—not to mention crappy hotel or conference Wi-Fi. Wouldn’t it be useful if you could have a replica of your Cloud data on your local machine? That just pulled down new data automagically, without needing to be restarted each time you got back on the network?

Let me show you here how you can go about doing this, to replicate one (or more) topics from Confluent Cloud onto your local machine. It’s also a really useful thing if you want to develop something locally without perhaps being ready to deploy it against your cloud environment.

How?

Confluent Replicator is a Kafka Connect plugin, acting as a consumer from one Kafka cluster (Confluent Cloud) and producer to another (your local Kafka cluster). I use Docker Compose to run Kafka locally, almost exclusively. It’s a piece of cake to provision, spin up - and tear down new environments, in isolation from others.

Setup

Create a .env file with your Confluent Cloud broker details and credentials in it:

CCLOUD_BROKER_HOST=foo.bar.bork.bork.bork.us-central1.gcp.confluent.cloud:9092
CCLOUD_API_KEY=yyy
CCLOUD_API_SECRET=xxx
Enter fullscreen mode Exit fullscreen mode

Use these environment variables in your local shell (we’ll use them with Docker later, hence writing them to a file for re-use)

source .env
Enter fullscreen mode Exit fullscreen mode

Create some test data on Confluent Cloud

Chuck some dummy data into a Confluent Cloud topic:

echo $(date) | \
    kafkacat -b $CCLOUD_BROKER_HOST \
             -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
             -X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
             -X api.version.request=true \
             -t test_topic -P
Enter fullscreen mode Exit fullscreen mode

Verify that it’s there:

➜ kafkacat -b $CCLOUD_BROKER_HOST \
              -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
              -X sasl.username="$CCLOUD_API_KEY" -X sasl.password="$CCLOUD_API_SECRET" \
              -X api.version.request=true \
              -t test_topic -C -e
Fri 17 Apr 2020 18:03:17 BST
Enter fullscreen mode Exit fullscreen mode

Create local Kafka cluster

Now spin up yourself a local Kafka cluster using this Docker Compose:

➜ docker-compose up -d
➜ docker-compose ps
   Name Command State Ports
---------------------------------------------------------------------------------------------
kafka-1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
kafka-2 /etc/confluent/docker/run Up 0.0.0.0:19092->19092/tcp, 9092/tcp
kafka-3 /etc/confluent/docker/run Up 0.0.0.0:29092->29092/tcp, 9092/tcp
replicator /etc/confluent/docker/run Up 0.0.0.0:58083->58083/tcp, 8083/tcp, 9092/tcp
zookeeper /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
Enter fullscreen mode Exit fullscreen mode

Set up Replicator

First, we’ll check that the Replicator container has started and is ready. Replicator runs as a plugin to Kafka Connect, so we use its API for interacting with Replicator.

echo "Waiting for Kafka Connect to start listening on localhost:58083 ⏳"
while : ; do
    curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:58083/connectors)
    echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
    if [$curl_status -eq 200] ; then
    break
    fi
    sleep 5
done
Enter fullscreen mode Exit fullscreen mode

Should show:

Mon 20 Apr 2020 12:32:37 BST Kafka Connect listener HTTP state: 200 (waiting for 200)
Enter fullscreen mode Exit fullscreen mode

We can also verify that the Replicator plugin has been loaded correctly:

curl -s localhost:58083/connector-plugins|jq '.[].class'|grep -q io.confluent.connect.replicator.ReplicatorSourceConnector
if [$? -eq 0]; then 
   echo "Replicator plugin is correctly loaded ✅"
else
   echo "😢 Replicator plugin is not loaded. Please check the Kafka Connect worker logs and installation steps"
fi
Enter fullscreen mode Exit fullscreen mode

Should show:

Replicator plugin is correctly loaded ✅
Enter fullscreen mode Exit fullscreen mode

Now send the config to the Kafka Connect worker

source .env

epoch=$(date +%s)

curl -s -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" "http://localhost:58083/connectors/replicator-source"$epoch"/config" \
    -d '
        {
        "connector.class" : "io.confluent.connect.replicator.ReplicatorSourceConnector",
        "key.converter" : "io.confluent.connect.replicator.util.ByteArrayConverter",
        "value.converter" : "io.confluent.connect.replicator.util.ByteArrayConverter",
        "header.converter" : "io.confluent.connect.replicator.util.ByteArrayConverter",
        "src.kafka.bootstrap.servers" : "'$CCLOUD_BROKER_HOST':9092",
        "src.kafka.security.protocol" : "SASL_SSL",
        "src.kafka.sasl.mechanism" : "PLAIN",
        "src.kafka.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CCLOUD_API_KEY'\" password=\"'$CCLOUD_API_SECRET'\";",
        "src.consumer.group.id" : "replicator-'$epoch'",
        "dest.kafka.bootstrap.servers": "kafka-1:39092,kafka-2:49092,kafka-3:59092",
        "topic.whitelist" : "test_topic",
        "topic.rename.format" :"${topic}",
        "confluent.license" :"",
        "confluent.topic.bootstrap.servers":"kafka-1:39092,kafka-2:49092,kafka-3:59092",
        "confluent.topic.replication.factor":1,
        "offset.start" :"consumer"
        }'
Enter fullscreen mode Exit fullscreen mode

Check that it’s running:

➜ curl -s "http://localhost:58083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [.value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
source | replicator-source1587382706 | RUNNING | RUNNING | io.confluent.connect.replicator.ReplicatorSourceConnector
Enter fullscreen mode Exit fullscreen mode

Check that we’ve got data:

➜ kafkacat -b localhost:29092 -t test_topic -C -e
Fri 17 Apr 2020 18:03:17 BST
Enter fullscreen mode Exit fullscreen mode

So now when data gets sent to Confluent Cloud, we get it also pushed to our local Kafka cluster.

Restarting and dealing with network glitches

The cool thing about Kafka, and Kafka Connect, is that it keeps track of where a particular consumer has read up to in a topic. Replicator therefore will read from a topic whilst it’s running, and if you stop and restart it, it’ll just catch up from where it got to before it stopped.

The same principle applies to if your local machine goes off the network, or perhaps just goes through some patchy connectivity. If it can connect to the source cluster (Confluent Cloud), it will do so. If it can’t, it’ll just keep trying and carry on again once it can do.

Ingesting a fresh copy of the data

So there’s restarting, and then there’s restarting. What if instead of wanting to restart the connector (we rebooted the machine, made a config change, whatever) we want to actually start afresh and start a new replication of the topic from Confluent Cloud?

Because of the clever way Kafka Connect uses the Kafka consumer group protocol to track offsets, if you were to delete the replicator configuration and create it afresh, it would still carry on from where it got to before! You can see the consumer group name (and consumption progress) in Confluent Cloud UI:

replicator01

For this reason you may have noticed in the config that we ran above the use of epoch in the configuration name and, most importantly, src.consumer.group.id. This is just one way of ensuring a unique group name tied to this particular instance of the replicator. We can then choose to provision a new one if we want to start afresh, or restart an existing one.

Whilst you’re there in the Confluent Cloud UI you can check out the detailed view of the progress of a particular consumer group

replicator02

Changing the target topic

There’s a bunch of parameters that you can set with Replicator. One particularly useful one is to modify the name of the target topic that Replicator writes to. Here’s an example of routing a source topic to a target one that includes the identifier (epoch) of the Replicator that wrote it

"topic.rename.format":"${topic}-ccloud-'$epoch'",
Enter fullscreen mode Exit fullscreen mode

The resulting topic name goes from test_topic on the source (Confluent Cloud) to test_topic-ccloud-1587388241 on our target local cluster

➜ kafkacat -b localhost:29092 -t test_topic-ccloud-1587388241 -C -q -u -o end
Here's a test message, sent at Mon 20 Apr 2020 14:14:09 BST
Enter fullscreen mode Exit fullscreen mode

Storing credentials safely

In the example above we passed the credentials for Confluent Cloud just as environment variables to Kafka Connect, which is not great from a security point of view. Instead we could use external secrets. Note that the Replicator docker container has the necessary config.providers settings to enable this, and that we’ve mounted out local .env file into the container.

…
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
    volumes: 
      - ./.env:/opt/config
Enter fullscreen mode Exit fullscreen mode

Now when we create the replicator we can reference the file and attributes within it:

epoch=$(date +%s)
curl -s -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" "http://localhost:58083/connectors/replicator-source"$epoch"/config" \
    -d '
        {
        "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
        "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
        "value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
        "header.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
        "src.kafka.bootstrap.servers": "${file:/opt/config:CCLOUD_BROKER_HOST}",
        "src.kafka.security.protocol": "SASL_SSL",
        "src.kafka.sasl.mechanism": "PLAIN",
        "src.kafka.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${file:/opt/config:CCLOUD_API_KEY}\" password=\"${file:/opt/config:CCLOUD_API_SECRET}\";",
        "src.consumer.group.id": "replicator-'$epoch'",
        "dest.kafka.bootstrap.servers": "kafka-1:39092,kafka-2:49092,kafka-3:59092",
        "topic.whitelist": "test_topic",
        "topic.rename.format":"${topic}",
        "confluent.license":"",
        "confluent.topic.bootstrap.servers":"kafka-1:39092,kafka-2:49092,kafka-3:59092",
        "confluent.topic.replication.factor":1,
        "offset.start":"consumer"
        }' 
Enter fullscreen mode Exit fullscreen mode

It’s not just for Cloud

You can use Replicator between any two Apache Kafka clusters, and Confluent Control Center to give you the same consumer group monitoring view that I showed above.

Try it out by downloading Confluent Platform.

Further reading

💖 💪 🙅 🚩
rmoff
Robin Moffatt

Posted on April 23, 2020

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related