Spring Cloud Stream Kafka step by step

rogervinas

Roger Viñas Alcon

Posted on April 9, 2021

Spring Cloud Stream is the solution provided by Spring to build applications connected to shared messaging systems.

It offers an abstraction (the binding) that works the same whatever the underlying implementation (the binder) is:

  • Apache Kafka
  • RabbitMQ
  • Kafka Streams
  • Amazon Kinesis
  • ...

Let's try to set up a simple example step by step and see how it works!

You can clone this demo from:

rogervinas / spring-cloud-stream-kafka-step-by-step (GitHub)

Producer with functional programming model

Our final goal is to produce messages to a Kafka topic.

From the point of view of the application we want an interface MyEventProducer to produce events to a generic messaging system. These events will be of type MyEvent, just containing a text field to make it simpler:



data class MyEvent(val text: String)

interface MyEventProducer { 
  fun produce(event: MyEvent)
}



Then we follow these steps:

1) We configure the binding my-producer in application.yml:



spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9094"
      bindings:
        my-producer-out-0:
          destination: "my.topic"
    function:
      definition: "my-producer"


  • Everything under spring.cloud.stream.kafka.binder is related to the Kafka binder implementation and we can use all these extra Kafka binder properties.
  • Everything under spring.cloud.stream.bindings is related to the Spring Cloud Stream binding abstraction and we can use all these extra binding properties.
  • As stated in functional binding names: my-producer is the function name, out is for output bindings and 0 is the index of the output, which is always 0 for a function with a single output.
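
To make the naming convention concrete, here is a minimal sketch in plain Kotlin. The bindingName helper and the Direction enum are hypothetical names invented for this illustration, they are not part of Spring Cloud Stream; they only reproduce the <function name>-<in|out>-<index> pattern described above.

```kotlin
// Hypothetical helper, not part of Spring Cloud Stream: it only mirrors the
// <functionName>-<in|out>-<index> naming convention of functional bindings.
enum class Direction(val token: String) { IN("in"), OUT("out") }

fun bindingName(functionName: String, direction: Direction, index: Int = 0): String {
    require(index >= 0) { "index must not be negative" }
    return "$functionName-${direction.token}-$index"
}

fun main() {
    println(bindingName("my-producer", Direction.OUT)) // my-producer-out-0
    println(bindingName("my-consumer", Direction.IN))  // my-consumer-in-0
}
```

The resulting string is the key we have to use under spring.cloud.stream.bindings.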

2) We create an implementation of MyEventProducer as a Kotlin lambda () -> Flux<MyEventPayload>, to fulfill the interfaces that both our application and Spring Cloud Stream are expecting:



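import reactor.core.publisher.Flux
import reactor.core.publisher.Sinks
import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST

class MyStreamEventProducer : () -> Flux<MyEventPayload>, MyEventProducer {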
  private val sink = Sinks.many().unicast().onBackpressureBuffer<MyEventPayload>()

  override fun produce(event: MyEvent) {
    sink.emitNext(toPayload(event), FAIL_FAST)
  }

  override fun invoke() = sink.asFlux()

  private fun toPayload(event: MyEvent) = MyEventPayload(event.text, event.text.length)
}

data class MyEventPayload(
  val string: String,
  val number: Int
)


  • We use a DTO MyEventPayload to specify how we want the payload to be serialized to JSON. In this case we don't need to, but we could use Jackson annotations if we wanted to customize the JSON serialization.
  • We do a simple transformation between MyEvent and MyEventPayload just as an example.
  • Every time we emit a MyEventPayload through the Flux, Spring Cloud Stream will publish it to Kafka.

3) Finally, we configure the beans needed to link the my-producer function definition:



@Configuration
class MyConfiguration { 
  @Bean
  fun myStreamEventProducer() = MyStreamEventProducer()

  @Bean("my-producer")
  fun myStreamEventProducerFunction(producer: MyStreamEventProducer): () -> Flux<MyEventPayload> = producer
}


  • Both beans return the same instance ... why?
    • We need an instance with type MyStreamEventProducer that will be injected wherever a MyEventProducer is needed.
    • We need an instance with type () -> Flux<MyEventPayload> that will be bound to my-producer function.
    • As we are using Kotlin we need to define it as a lambda (required by KotlinLambdaToFunctionAutoConfiguration).
    • If we were using Java we would define it as Supplier<Flux<MyEventPayload>>.

4) For testing we start a Kafka container using Testcontainers:



@SpringBootTest(webEnvironment = NONE)
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {
  @Autowired // We inject MyEventProducer (it should be a MyStreamEventProducer)
  @Qualifier("myStreamEventProducer") // Avoid SpringBootTest issue: expected single matching bean but found 2  
  lateinit var eventProducer: MyEventProducer

  @Test
  fun `should produce event`() {
    // We produce an event using MyEventProducer
    val text = "hello ${UUID.randomUUID()}"
    eventProducer.produce(MyEvent(text))

    // We consume from Kafka using a helper
    val records = consumerHelper.consumeAtLeast(1, FIVE_SECONDS)

    // We verify the received json
    assertThat(records).singleElement().satisfies { record ->
      JSONAssert.assertEquals(
        "{\"number\":${text.length},\"string\":\"$text\"}", // expected
        record.value(), // actual
        true
      )
    }
  }
}



Consumer with functional programming model

Our final goal is to consume messages from a Kafka topic.

From the point of view of the application we want an interface MyEventConsumer to be called every time an event is consumed from a generic messaging system. These events will be of type MyEvent like in the producer example:



data class MyEvent(val text: String)

interface MyEventConsumer {
  fun consume(event: MyEvent)
}



Then we follow these steps:

1) We configure the binding my-consumer in application.yml declaring it as a function:



spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9094"
      bindings:
        my-consumer-in-0:
          destination: "my.topic"
          group: "${spring.application.name}"
    function:
      definition: "my-consumer"


  • Remember that everything under spring.cloud.stream.kafka.binder is related to the Kafka binder implementation (we can use all these extra Kafka binder properties), and everything under spring.cloud.stream.bindings is related to the Spring Cloud Stream binding abstraction (we can use all these extra binding properties).
  • We configure a group because we want the application to consume from Kafka as a consumer group: if more than one instance of the application is running, each message is delivered to only one of them.
  • As stated in functional binding names: my-consumer is the function name, in is for input bindings and 0 is the index of the input, which is always 0 for a function with a single input.
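
The effect of the group can be pictured with a small simulation in plain Kotlin. This is a simplified model under stated assumptions: assignPartitions is a hypothetical function, and the round-robin distribution is only a stand-in for Kafka's real, pluggable partition assignors. The point it illustrates is that every partition has exactly one owner inside a group.

```kotlin
// Simplified model: assigns each partition to exactly one instance of a group.
// Real Kafka uses pluggable assignors; round-robin is an illustrative stand-in.
fun assignPartitions(partitions: Int, instances: List<String>): Map<Int, String> {
    require(instances.isNotEmpty()) { "a group needs at least one instance" }
    return (0 until partitions).associateWith { partition ->
        instances[partition % instances.size]
    }
}

fun main() {
    val assignment = assignPartitions(
        partitions = 3,
        instances = listOf("instance-a", "instance-b")
    )
    // Every partition has a single owner, so a message written to a partition
    // is handled by only one instance of the group.
    assignment.forEach { (partition, owner) -> println("partition $partition -> $owner") }
}
```

Without a group, each instance would typically behave as an independent subscriber and receive every message.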

2) We create MyStreamEventConsumer to fulfill the interface required by Spring Cloud Stream:



class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit {
  override fun invoke(payload: MyEventPayload) {
    consumer.consume(fromPayload(payload))
  }

  private fun fromPayload(payload: MyEventPayload) = MyEvent(payload.string)
}


  • Every time a new message is received in the Kafka topic, its payload will be deserialized to a MyEventPayload and the invoke method will be called.
  • Then the only thing we have to do is to transform the MyEventPayload to a MyEvent and call back the generic MyEventConsumer.
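
Because the adapter is just a function, it can be exercised without Kafka or Spring. The sketch below repeats the types from this article so that it is self-contained; RecordingConsumer is a hypothetical test double added only for this illustration.

```kotlin
data class MyEvent(val text: String)
data class MyEventPayload(val string: String, val number: Int)

interface MyEventConsumer {
    fun consume(event: MyEvent)
}

// Same adapter as above: payload in, domain event out to the generic consumer.
class MyStreamEventConsumer(private val consumer: MyEventConsumer) : (MyEventPayload) -> Unit {
    override fun invoke(payload: MyEventPayload) {
        consumer.consume(MyEvent(payload.string))
    }
}

// Hypothetical test double that records every event it receives.
class RecordingConsumer : MyEventConsumer {
    val received = mutableListOf<MyEvent>()
    override fun consume(event: MyEvent) {
        received += event
    }
}

fun main() {
    val recorder = RecordingConsumer()
    val function: (MyEventPayload) -> Unit = MyStreamEventConsumer(recorder)
    // Simulates what happens once a message has been deserialized.
    function(MyEventPayload("hello", 5))
    println(recorder.received) // [MyEvent(text=hello)]
}
```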

3) Finally, we configure the beans needed to link the my-consumer function definition:



@Configuration
class MyConfiguration {
  @Bean
  fun myEventConsumer() = object : MyEventConsumer {
    override fun consume(event: MyEvent) {
      println("Received ${event.text}")
    }
  }

  @Bean("my-consumer")
  fun myStreamEventConsumerFunction(consumer: MyEventConsumer): (MyEventPayload) -> Unit =
    MyStreamEventConsumer(consumer)
}


  • We need an instance with type (MyEventPayload) -> Unit that will be bound to my-consumer function.
    • As we are using Kotlin we need to define it as a lambda (required by KotlinLambdaToFunctionAutoConfiguration).
    • If we were using Java we would define it as Consumer<MyEventPayload>.
  • We create a simple implementation of MyEventConsumer that just prints the event.

4) For testing we start a Kafka container using Testcontainers:



@SpringBootTest(webEnvironment = NONE)
@Testcontainers
@ActiveProfiles("test")
class MyApplicationIntegrationTest {
  @MockBean // We mock MyEventConsumer
  lateinit var eventConsumer: MyEventConsumer

  @Test
  fun `should consume event`() {
    val eventCaptor = argumentCaptor<MyEvent>()
    doNothing().`when`(eventConsumer).consume(eventCaptor.capture())

    // We send a Kafka message using a helper
    val text = "hello ${UUID.randomUUID()}"
    kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

    // We wait at most 10 seconds to receive the expected MyEvent in MyEventConsumer mock
    await().atMost(TEN_SECONDS).untilAsserted {
      assertThat(eventCaptor.allValues.filter { it.text == text }).hasSize(1)
    } 
  }
}



Extras

Kafka Message Key

Kafka topics are partitioned to allow horizontal scalability.

When a message is sent to a topic without a key, the Kafka producer picks the destination partition itself, spreading messages across partitions regardless of their content. If we specify a key for the message, the default partitioner hashes this key to choose the destination partition, so all messages sharing the same key are sent to the same partition (as long as the number of partitions does not change).

This is important on the consumer side, because the order of messages is only guaranteed within the same partition, so if we need to consume some messages in the order they were produced, we should use the same key for all of them (e.g. for messages of a user, we use the user id as the message key).
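
The idea of key-based partitioning can be sketched in a few lines of plain Kotlin. This is an illustration only: partitionFor is a hypothetical function, and String.hashCode() is a stand-in for the hash that Kafka's default partitioner computes over the serialized key bytes (murmur2), so the partition numbers will not match the ones Kafka would pick.

```kotlin
// Illustrative only: Kafka's default partitioner hashes the serialized key
// bytes with murmur2; String.hashCode() is used here as a simple stand-in.
fun partitionFor(key: String, partitions: Int): Int {
    require(partitions > 0) { "partitions must be positive" }
    // floorMod keeps the result in 0 until partitions even for negative hashes.
    return Math.floorMod(key.hashCode(), partitions)
}

fun main() {
    val partitions = 3
    // "user-1" appears twice and lands on the same partition both times.
    listOf("user-1", "user-2", "user-1").forEach { key ->
        println("$key -> partition ${partitionFor(key, partitions)}")
    }
}
```

Since the partition depends only on the key and the partition count, events of the same user keep their relative order for the consumer.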

To specify the message key in MyStreamEventProducer we can produce Message<MyEventPayload> instead of MyEventPayload and set the KafkaHeaders.KEY header:



class MyStreamEventProducer : () -> Flux<Message<MyEventPayload>>, MyEventProducer {
  // ...
  override fun produce(event: MyEvent) {
    val message = MessageBuilder
      .withPayload(MyEventPayload(event.text, event.text.length))
      .setHeader(KafkaHeaders.KEY, "key-${event.text.length}")
      .build()
    sink.emitNext(message, FAIL_FAST)
  }
  // ...
}



As we are setting a key of type String we should use a StringSerializer as key.serializer:



spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9094"
          producer-properties:
            key.serializer: "org.apache.kafka.common.serialization.StringSerializer"



And we can test it like this:



@Test
fun `should produce event`() {
  val text = "hello ${UUID.randomUUID()}"
  eventProducer.produce(MyEvent(text))

  val records = kafkaConsumerHelper.consumeAtLeast(1, TEN_SECONDS)

  assertThat(records).singleElement().satisfies { record ->
    // check the message payload
    JSONAssert.assertEquals(
      "{\"number\":${text.length},\"string\":\"$text\"}", // expected
      record.value(), // actual
      true
    )
    // check the message key
    assertThat(record.key())
      .isEqualTo("key-${text.length}")
  }
}


  • Alternatively, we can use partitionKeyExpression and other related binding producer properties to achieve the same at the binding abstraction level of Spring Cloud Stream.

Retries

If errors are thrown while consuming messages, we can tell Spring Cloud Stream what to do using the following binding consumer properties:

  • maxAttempts: total number of attempts to process a message, including the first one (1 disables retries)
  • backOffInitialInterval, backOffMaxInterval, backOffMultiplier: backoff parameters to increase the delay between retries
  • defaultRetryable, retryableExceptions: which exceptions are retried and which are not
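
To see how the backoff parameters interact, here is a minimal sketch of an exponential backoff schedule in plain Kotlin. The backOffDelays function is hypothetical and is not the code Spring uses internally; it assumes the wait before each retry is the initial interval multiplied by the multiplier for every previous retry, capped at the maximum interval. The default values in the signature are assumed to match the documented defaults of the binding properties.

```kotlin
// Sketch of an exponential backoff schedule, not the Spring Retry source code.
// The wait before retry n is min(initial * multiplier^(n-1), max).
fun backOffDelays(
    maxAttempts: Int,
    initialIntervalMs: Long = 1000,
    multiplier: Double = 2.0,
    maxIntervalMs: Long = 10000
): List<Long> {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    // maxAttempts includes the first try, so there are maxAttempts - 1 waits.
    return generateSequence(initialIntervalMs.toDouble()) { it * multiplier }
        .map { minOf(it, maxIntervalMs.toDouble()).toLong() }
        .take(maxAttempts - 1)
        .toList()
}

fun main() {
    // 5 attempts starting at 100 ms: four waits between the five attempts.
    println(backOffDelays(maxAttempts = 5, initialIntervalMs = 100)) // [100, 200, 400, 800]
}
```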

For example we can use this configuration:



spring:
  cloud:
    stream:
      bindings:
        my-consumer-in-0:
          destination: "my.topic"
          group: "${spring.application.name}"
          consumer:
            max-attempts: 5
            back-off-initial-interval: 100
            default-retryable: false
            retryable-exceptions:
              com.rogervinas.stream.domain.MyRetryableException: true            
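
The decision expressed by default-retryable and retryable-exceptions can be sketched as follows. This is a simplified illustration under assumptions: isRetryable is a hypothetical function, and the real classifier may apply extra rules (for example about exception causes or the closest matching superclass) that are ignored here.

```kotlin
// Exception type used in this article's configuration, redefined here
// so that the example is self-contained.
class MyRetryableException(message: String) : RuntimeException(message)

// Simplified decision: an explicit entry wins, otherwise defaultRetryable applies.
fun isRetryable(
    error: Throwable,
    retryableExceptions: Map<Class<out Throwable>, Boolean>,
    defaultRetryable: Boolean
): Boolean =
    retryableExceptions.entries
        .firstOrNull { (type, _) -> type.isInstance(error) }
        ?.value
        ?: defaultRetryable

fun main() {
    val rules = mapOf<Class<out Throwable>, Boolean>(MyRetryableException::class.java to true)
    println(isRetryable(MyRetryableException("retry later!"), rules, defaultRetryable = false)) // true
    println(isRetryable(IllegalStateException("boom"), rules, defaultRetryable = false)) // false
}
```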



And we can test it like this:



@Test
fun `should retry consume event 5 times`() {
  // we throw a MyRetryableException every time we receive a message
  val eventCaptor = argumentCaptor<MyEvent>()
  doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(eventCaptor.capture())

  // we send a Kafka message using a helper
  val text = "hello ${UUID.randomUUID()}"
  kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

  // consumer has been called five times with the same message
  await().atMost(TEN_SECONDS).untilAsserted {
    assertThat(eventCaptor.allValues.filter { it.text == text }).hasSize(5)
  }
}



Dead Letter Queue

In addition to retries, a Dead Letter Queue (DLQ) is another mechanism we can use to deal with consumer errors.

In the case of Kafka it consists of sending to another topic all the messages that the consumer has rejected.

We can configure the DLQ using these Kafka binder consumer properties:

  • enableDlq: enable DLQ
  • dlqName:
    • not set: defaults to error.<destination>.<group>
    • set: use a specific DLQ topic
  • dlqPartitions:
    • not set: DLQ topic should have the same number of partitions as the original one
    • set to 1: all messages are sent to partition 0 of the DLQ topic
    • set to N>1: we should provide a DlqPartitionFunction bean
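
As a quick illustration of the dlqName rule, this sketch resolves the DLQ topic name in plain Kotlin. The dlqTopicName function is hypothetical and is not the binder's actual code, and the group value my-app is made up for the example.

```kotlin
// Sketch of the naming rule listed above; not the binder's actual code.
// An explicit dlqName wins, otherwise the default is error.<destination>.<group>.
fun dlqTopicName(destination: String, group: String, dlqName: String? = null): String =
    dlqName ?: "error.$destination.$group"

fun main() {
    println(dlqTopicName("my.topic", "my-app"))                    // error.my.topic.my-app
    println(dlqTopicName("my.topic", "my-app", "my.topic.errors")) // my.topic.errors
}
```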

For example we can use this configuration:



spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9094"
        bindings:
          my-consumer-in-0:
            consumer:
              enable-dlq: true
              dlq-name: "my.topic.errors"
              dlq-partitions: 1    
      bindings:
        my-consumer-in-0:
          destination: "my.topic"
          group: "${spring.application.name}"



And we can test it like this:

Application errors:



@Test
fun `should send to DLQ rejected messages`() {
  // we throw a MyRetryableException every time we receive a message
  doThrow(MyRetryableException("retry later!")).`when`(eventConsumer).consume(any())

  // we send a Kafka message using a helper
  val text = "hello ${UUID.randomUUID()}"
  kafkaProducerHelper.send(TOPIC, "{\"number\":${text.length},\"string\":\"$text\"}")

  // we check the message has been sent to the DLQ
  val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
  assertThat(errorRecords).singleElement().satisfies { record ->
    JSONAssert.assertEquals(
      "{\"number\":${text.length},\"string\":\"$text\"}", // expected
      record.value(), // actual
      true
    )
  }
}



Message deserialization errors:



@ParameterizedTest
@ValueSource(strings = [
  "plain text",
  "{\"unknownField\":\"not expected\"}"
])
fun `should send to DLQ undeserializable messages`(body: String) {
  // we send a Kafka message with an invalid body using a helper
  kafkaProducerHelper.send(TOPIC, body)

  // we check the message has been sent to the DLQ
  val errorRecords = kafkaDLQConsumerHelper.consumeAtLeast(1, TEN_SECONDS)
  assertThat(errorRecords).singleElement().satisfies { record ->
    assertThat(record.value()).isEqualTo(body)
  }
}



That's it! Happy coding! 💙
