Kotlin coroutine based KafkaProducer extension

viniciusccarvalho

Vinicius Carvalho

Posted on August 16, 2018

Getting rid of Callback hell in KafkaProducer

Kotlin's coroutines provide a nice way to write async code. They make it easy to write and compose asynchronous computations using a very lightweight model.

This post is not about what coroutines are; the official docs have a very deep and easy-to-read explanation of that. Instead, I'm offering a solution for using the KafkaProducer.send method from coroutines.

The issue is that send() relies on a Callback to report the result, and we all know that there's a special place in hell for those who use callbacks.

Fortunately, Kotlin coroutines offer a solution: the suspendCoroutine function, which lets us turn a callback into a suspend function call.
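To see the transformation on its own, here is a minimal sketch with no Kafka dependency. The callback-style function fetchGreeting and the helper runNow are invented purely for illustration; the sketch uses only the Kotlin standard library and assumes Kotlin 1.5 or later, where coroutines live in the kotlin.coroutines package.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style API, standing in for something like KafkaProducer.send.
fun fetchGreeting(name: String, callback: (String?, Exception?) -> Unit) {
    if (name.isBlank()) callback(null, IllegalArgumentException("name must not be blank"))
    else callback("Hello, $name", null)
}

// Suspend wrapper: the callback resumes the coroutine with a value or an exception.
suspend fun greeting(name: String): String =
    suspendCoroutine { continuation ->
        fetchGreeting(name) { value, error ->
            if (error != null) continuation.resumeWithException(error)
            else continuation.resume(value!!)
        }
    }

// Hypothetical stdlib-only runner that starts a suspend block and returns its result.
// It only works here because fetchGreeting invokes its callback synchronously.
fun <T> runNow(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    val result = outcome ?: error("coroutine did not complete synchronously")
    return result.getOrThrow()
}

fun main() {
    println(runNow { greeting("Kafka") })
    try {
        runNow { greeting(" ") }
    } catch (e: IllegalArgumentException) {
        // The callback's error surfaces as a regular exception.
        println("failed: ${e.message}")
    }
}
```

The point of the wrapper is that the failure path becomes an ordinary try/catch at the call site instead of a second branch inside a callback.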

Extension functions (functions with a receiver) are another nice treat of the Kotlin language. They let us augment existing types with custom functions.
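As a quick illustration of the idea, the sketch below adds a function to String, a type we don't own. The name toTopicKey is made up for this example and assumes Kotlin 1.5 or later for lowercase().

```kotlin
// Extension function: inside the body, `this` is the String the function is called on.
fun String.toTopicKey(): String = this.trim().lowercase().replace(' ', '-')

fun main() {
    // Called as if it were a member of String, although String itself is unchanged.
    println("  Living Room ".toTopicKey()) // prints "living-room"
}
```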

I decided to call the new function dispatch instead of send because I find it a bit confusing when people extend a type with a function that reuses an original function's name, and imports can get a bit messy.

So the extension function you need to write is very simple:

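import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata

// Suspends until the broker acknowledges the record or the send fails.
suspend fun <K, V> KafkaProducer<K, V>.dispatch(record: ProducerRecord<K, V>): RecordMetadata =
    suspendCoroutine { continuation ->
        val callback = Callback { metadata, exception ->
            // Check the exception first: newer clients may pass non-null metadata on failure.
            if (exception != null) {
                continuation.resumeWithException(exception)
            } else {
                continuation.resume(metadata)
            }
        }
        send(record, callback)
    }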

Now you can just use it from your regular KafkaProducer instance:

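val props = Properties()
props["bootstrap.servers"] = "localhost:9092"
props["key.serializer"] = StringSerializer::class.java
// JacksonSerializer and SensorReading are application classes not shown in this post.
props["value.serializer"] = JacksonSerializer::class.java
val kafkaProducer = KafkaProducer<String, SensorReading>(props)
// In current kotlinx.coroutines releases, async is an extension on CoroutineScope.
val scope = CoroutineScope(Dispatchers.IO)
val result = scope.async {
    kafkaProducer.dispatch(ProducerRecord("sample", SensorReading("Bedroom", 72.0, false)))
}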

Just remember that you can only call a suspend function from a coroutine or from another suspend function, hence the need for async. The same could be achieved with launch, or with runBlocking for testing.
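A small sketch of those two alternatives follows. It assumes kotlinx-coroutines-core is on the classpath, and fakeDispatch is a hypothetical stand-in for a suspend call such as kafkaProducer.dispatch(record), so it runs without a broker.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a suspend call such as kafkaProducer.dispatch(record).
suspend fun fakeDispatch(payload: String): String {
    delay(10) // simulate waiting for the broker acknowledgement
    return "acked:$payload"
}

fun main() = runBlocking {
    // launch starts a child coroutine and returns a Job rather than a result.
    val job = launch { println(fakeDispatch("from launch")) }
    job.join()
    // Inside runBlocking we are already in a coroutine, so a suspend call works directly.
    println(fakeDispatch("direct call"))
}
```

runBlocking blocks the calling thread until its body finishes, which is why it suits tests and main functions better than production request paths.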

Happy coding!
