A story of building a custom Flow operator - bufferTimeout

Pin-Sho Feng

Posted on May 17, 2020


I have a particular use case where I need to process a live stream of elements and send them to a distributed queue (e.g. Kafka, NATS...). I'd like to consume this stream in batches of up to N elements, buffering for at most a duration t. In other words, a batch should be processed when either of the following conditions is met:

  • Number of buffered elements reaches a certain size N
  • Timeout after t

Another use case would be consuming a live stream that we want to process and store in a DB, where the stream may contain duplicated events, or events that could be merged, that we'd like to handle before storing.
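To make that second use case a bit more concrete, here's a rough sketch of what batch deduplication could look like using the bufferTimeout operator we'll end up building later in this post. The Event type, its key field and saveAll are made-up names for illustration only:

import java.time.Duration
import kotlinx.coroutines.flow.*

// Hypothetical event type and persistence function
data class Event(val key: String, val payload: String)

suspend fun saveAll(batch: List<Event>) { /* e.g. one DB round-trip per batch */ }

suspend fun storeDeduplicated(events: Flow<Event>) {
    events
        .bufferTimeout(size = 100, duration = Duration.ofMillis(200)) // the operator built below
        .map { batch -> batch.distinctBy { it.key } }                 // drop duplicates within a batch
        .collect { saveAll(it) }
}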

1st attempt: Flow + Reactor

In this particular project I wanted it to be pure Kotlin with as few dependencies as possible, so initially I didn't look at either RxJava or Reactor, and Kotlin coroutines (in particular Flow) came to mind. Looking at the available operators (using version 1.3.6 at the time of writing), I didn't find one for that functionality, so I went to the coroutines GitHub issues to see if anyone had run into a similar problem and found an open PR for chunked and window. Still, those are missing the timeout functionality.

The next thing I thought was that either Reactor or RxJava would probably have something like that, and I was lucky to find bufferTimeout! Now, I wanted to avoid an extra dependency, but building an operator and doing it right must be quite difficult, right? I still wanted to use coroutines as much as possible, so maybe I could just use the interop library and let Reactor process the Flow.

// some channel where events will be produced
val channel = Channel<String>()

val disposable = channel.consumeAsFlow()
    .asFlux()
    .bufferTimeout(500, Duration.ofMillis(500))
    .concatMap { events ->
        events
            .map { event ->
                mono {
                    println("Received $event, ts: ${System.currentTimeMillis()}")
                    delay(100) // simulate a suspending operation like sending to a queue
                }
            }
            .let { Flux.concat(it) }
    }
    .subscribe()

Not bad, right? The problem is that when you start testing, you'll find out that bufferTimeout doesn't implement backpressure (see this and this), and if your producer goes faster than you can consume, the Flux will crash. The issue was first reported in early 2018 and it doesn't seem easy to fix, so now what?
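Reproducing it doesn't take much. A rough sketch, continuing the snippet above with a hypothetical producer loop: push events as fast as the pipeline will take them, which is far faster than the ~100 ms-per-event consumer can drain them, and since bufferTimeout can't tell the producer to slow down, the Flux eventually fails instead of applying backpressure.

// Hypothetical producer running in the same scope as the snippet above
launch {
    var i = 0
    while (true) {
        channel.send("event-${i++}")
    }
}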

Maybe we could implement our own operator... Trying to fix bufferTimeout is out of the question so it has to be for Flow... but where do we start? 🤔 It's my first time using Flow, so I have no clue, let's see.

2nd attempt: imperative approach

If we can produce into a channel and read from it, we should be able to whip up some naive code with a timeout to create our solution, right? So I tried this path first. We'll need:

  • Something that will provide clock ticks for implementing the timeout
  • A buffer to accumulate values

// some channel where events will be produced
val channel = Channel<String>()

val receiverJob = launch {
    // This launches a coroutine that will send ticks to a channel with a given frequency
    val tickerChannel = ticker(1000)
    val events = mutableListOf<String>()  

    try {
        while (true) {
            var hasTimedOut = false  
            val tickerJob = launch {  
                tickerChannel.receive()
                hasTimedOut = true
            }

            while (events.size < 500 && !hasTimedOut) {  
                withTimeoutOrNull(10) { channel.receive() }  
                    ?.let { events.add(it) }
            }

            events.forEach { event ->  
                println("Received $event, ts: ${System.currentTimeMillis()}")
            }

            events.clear()  
            tickerJob.cancel()
            hasTimedOut = false
        }
    } finally {
        tickerChannel.cancel()
    }
}

And voilà! Let's break this down a bit:

  • We launch a coroutine that will run the consumer logic
  • We have a mutable list that we use as the buffer of elements
  • There's an infinite loop to continuously read events from a channel
    • Inside the loop, we launch a coroutine to check if we reached the timeout. This is necessary because a channel's receive is suspending.
    • We read from the events channel while we haven't timed out and haven't reached the desired buffer size. This is done again via receive and, since it's suspending, we need to use withTimeoutOrNull to ensure it doesn't suspend forever when there are no items in the channel, as we might have elements in the buffer that we have to emit after the timeout.
  • Finally, since the current implementation of ticker doesn't respect structured concurrency and it's launched in the global scope, we need to make sure to cancel it to avoid a resource leak.
  • Note that backpressure is not a problem because, by design, a Channel will suspend the producer if the consumer is not ready to receive (see the producer sketch below).
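To illustrate that last point, here's a hypothetical producer to pair with the receiver above. The names are made up, but the key bit is that send simply suspends whenever the receiver is still busy with the previous batch, so a slow consumer automatically slows the producer down:

// Hypothetical producer coroutine, launched in the same scope as receiverJob
val producerJob = launch {
    var i = 0
    while (isActive) {
        channel.send("event-${i++}") // suspends until the receiver loop calls receive()
    }
}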

Now, this is a lot of code with a few undesirable properties:

  • Mutable variables
  • Manual resource management
  • Does not easily compose
  • Code will probably grow spaghetti over time

How can we improve it? There's probably already a bug that I haven't seen and over time that bug might be buried deep in more code. While some of the points can't be helped, can we encapsulate the logic so that it's easy to use? Sounds like it'd be awesome as a Flow operator!

Flow operator: bufferTimeout

Wouldn't it be great if we could just use it in the same way as with Flux so that the code looks like this?

channel.consumeAsFlow()
    .bufferTimeout(500, Duration.ofMillis(1000))
    .collect { println("Received $it") }

Taking the previous imperative implementation as a basis, we should be able to encapsulate it as an operator. With a few tweaks, this is what I got initially:

@ExperimentalCoroutinesApi  
@ObsoleteCoroutinesApi  
fun <T> Flow<T>.bufferTimeout(size: Int, duration: Duration): Flow<List<T>> {
    require(size > 0) { "Window size should be greater than 0" }  
    require(duration.toMillis() > 0) { "Duration should be greater than 0" }

    return flow {  
        coroutineScope {  
            val events = ArrayList<T>(size)
            val tickerChannel = ticker(duration.toMillis())
            try {
                var hasTimedOut = false  
                val upstreamValues = produce { collect { send(it) } }  

                while (isActive) {  
                    val tickerJob = launch {  
                        tickerChannel.receive()
                        hasTimedOut = true  
                    }  

                    withTimeoutOrNull(10) { upstreamValues.receive() }  
                        ?.let { events.add(it) }

                    if (events.size == size || (hasTimedOut && events.isNotEmpty())) {  
                        emit(events.toList())  
                        events.clear()
                        tickerJob.cancel()
                        hasTimedOut = false
                    }
                }
            } finally {
                tickerChannel.cancel()
            }
        }
    }
}

As you can see, the code is pretty much the same as before but there are a few things worth noting:

  • Conceptually our operator creates a new Flow that consumes from the upstream Flow and emits for downstream consumption.
  • As we're going to call suspending functions, we need to be in a CoroutineScope.
  • Instead of receiving from a channel, we need to collect from the upstream Flow. As collect is a terminal operation that will not finish until the upstream is finished, we need to put it in a coroutine, which is what produce { collect { send(it) } } does while also creating a channel. All the collected values are sent to this channel, which we'll drain in the infinite loop (see the isolated sketch after this list).
  • The rest is pretty much the same. Instead of while (true) I'm using while (isActive) but it doesn't really matter as all the suspending functions that are called support cancellation.
  • Some of the APIs used are either obsolete or experimental, but with no current alternatives AFAIK (coroutines 1.3.6), so this might change in a few months. I suspect select could make the code prettier, but it's yet another experimental feature, so I thought what we had was enough for now.
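Since the produce { collect { send(it) } } trick is the core of the operator, here's a minimal standalone sketch of just that part (made-up values, coroutines 1.3.6): the upstream Flow is collected in its own coroutine and every value is pushed into a channel that we can receive from, or select on, at our own pace.

import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val upstream = flowOf(1, 2, 3)

    coroutineScope {
        // collect the Flow in its own coroutine and push every value into a channel
        val values = produce { upstream.collect { send(it) } }

        // now we can drain the channel at our own pace
        values.consumeEach { println("got $it") }
    }
}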

That should be it, right? I proceed to write a simple test and run into an issue straight away:

@ExperimentalCoroutinesApi  
@ObsoleteCoroutinesApi  
internal class OperatorsTest : StringSpec({  
    "no emissions with empty flow" {  
        val actualCount = flowOf<Int>().bufferTimeout(5, Duration.ofMillis(10)).count()  
        actualCount shouldBe 0  
    }
})

// throws ClosedReceiveChannelException: Channel was closed

What the... ? It turns out withTimeoutOrNull(10) { upstreamValues.receive() } doesn't account for the case when the channel is closed (upstream is done emitting). There's a receiveOrClosed() function, but it's marked as internal so we can't use that. I decided to catch the exception and drain any events that might be left in the buffer:

try {
    // previous loop here
} catch (e: ClosedReceiveChannelException) {
    // drain whatever is left in the buffer
    if (events.isNotEmpty()) emit(events.toList())
} finally {
    tickerChannel.cancel()
}

Now the previous test passes and I continue writing a test to check that the timeout is respected:

"buffers correctly by timeout" {
    var firstEmissionTs = 0L
    var firstItemReceiveTs = 0L
    var secondItemReceiveTs = 0L

    flow {
        firstEmissionTs = System.currentTimeMillis()
        emit(1)
        delay(50)

        emit(2)
        delay(50)

        emit(3)
    }
        .bufferTimeout(5, Duration.ofMillis(50))
        .collectIndexed { index, value ->
            when (index) {
                0 -> {
                    firstItemReceiveTs = System.currentTimeMillis()
                }
                1 -> {
                    secondItemReceiveTs = System.currentTimeMillis()
                }
                2 -> {}
                else -> {
                    throw RuntimeException("Emitted inexistent item")
                }
            }
        }

    (firstItemReceiveTs - firstEmissionTs) shouldBeGreaterThanOrEqual 50
    (secondItemReceiveTs - firstEmissionTs) shouldBeGreaterThan 100
}

// test fails randomly for the second assertion:
//   java.lang.AssertionError: 53 should be > 100

Something's off, because the second item should be received after the second timeout, i.e. 50 + 50 = 100 ms after the first emission. What's going on? It turns out the delay we specify in withTimeoutOrNull(10) matters: due to concurrency, we can be sitting inside withTimeoutOrNull when the ticker has already fired, meaning a batch should already be emitted. If during that short wait we receive a new item, it gets added to the batch that's about to be emitted, even though it should really belong to the next one. If we add some logs, it may look like this:

withTimeoutOrNull Got 1
withTimeoutOrNull timedout
withTimeoutOrNull timedout
withTimeoutOrNull timedout
withTimeoutOrNull timedout
timeout tick                 -- happens concurrently
withTimeoutOrNull Got 2      -- we were waiting due to delay
start emitting batch         -- batch has 1 and 2

Alright, let's take a step back. When do we emit a batch? When either the buffer is full or the timeout has fired, whichever happens first, but never both handled at the same time. Can we somehow select between suspending functions and resume with whichever completes first? Yes, we can finally use the experimental select expression mentioned before!
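Here's a minimal, isolated sketch of how select behaves, with made-up channels and timings (not part of the operator): it suspends until one of the registered clauses is ready and resumes with that one only, so we react to whichever of "new element" or "tick" comes first.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

fun main() = runBlocking {
    val data = Channel<Int>()
    val ticks = ticker(delayMillis = 100)

    launch {
        delay(250)
        data.send(42)
    }

    repeat(3) {
        // resumes with whichever channel has something ready first
        val winner = select<String> {
            data.onReceive { "data: $it" }
            ticks.onReceive { "tick" }
        }
        println(winner) // roughly: tick, tick, data: 42
    }

    data.close()
    ticks.cancel()
}

With select in place, the operator code actually gets simpler: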

@ExperimentalCoroutinesApi
@ObsoleteCoroutinesApi
fun <T> Flow<T>.bufferTimeout(size: Int, duration: Duration): Flow<List<T>> {
    require(size > 0) { "Window size should be greater than 0" }
    require(duration.toMillis() > 0) { "Duration should be greater than 0" }

    return flow {
        coroutineScope {
            val events = ArrayList<T>(size)
            val tickerChannel = ticker(duration.toMillis())
            try {
                val upstreamValues = produce { collect { send(it) } }

                while (isActive) {
                    var hasTimedOut = false

                    select<Unit> {
                        upstreamValues.onReceive {
                            events.add(it)
                        }

                        tickerChannel.onReceive {
                            hasTimedOut = true
                        }
                    }

                    if (events.size == size || (hasTimedOut && events.isNotEmpty())) {
                        emit(events.toList())
                        events.clear()
                    }
                }
            } catch (e: ClosedReceiveChannelException) {
                // drain remaining events
                if (events.isNotEmpty()) emit(events.toList())
            } finally {
                tickerChannel.cancel()
            }
        }
    }
}

And there goes my first attempt at making an operator out of an actual need! To save you from scrolling up again, the final usage is:

someFlow
    .bufferTimeout(size = 500, duration = Duration.ofMillis(1000))
    .collect { println("Received batch: $it") }
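And to close the loop on the original motivation, here's a rough end-to-end sketch (made-up numbers, with a delay standing in for publishing to Kafka/NATS) of a channel-backed stream being batched by the operator:

import java.time.Duration
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val channel = Channel<String>()

    // hypothetical producer: 1200 events, then we're done
    val producer = launch {
        repeat(1200) { i -> channel.send("event-$i") }
        channel.close()
    }

    channel.consumeAsFlow()
        .bufferTimeout(size = 500, duration = Duration.ofMillis(1000))
        .collect { batch ->
            delay(100) // simulate publishing the batch to a queue
            println("Published ${batch.size} events") // roughly 500, 500, 200
        }

    producer.join()
}

Thanks to the Channel in front and the rendezvous channel inside the operator, the producer is suspended whenever a batch is being published, so nothing piles up in memory.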

Final remarks

This wasn't exactly easy (concurrency is hard), but it was definitely less scary than attempting the same with Rx, since suspending channels make the backpressure problem a lot easier. There may be bugs or improvements that I'm not seeing, so if you spot anything, I'd really appreciate a comment!

Thanks for reading and see you next time.
