Yet another ode to Vert.x, or how to write a performance-wise expiring map in less than 100 lines of code.
Oleg Agafonov
Posted on September 22, 2021
I've been working with the Vert.x framework for more than 4 years but I won't stop being excited how simple, lightweight and elegant it is (especially the event loop thread model). In this blog post I will tell you how we implemented PeriodicallyExpiringHashMap
data structure in less than 100 lines of code. But first let me give you a bit of a context about why do we need it.
Problem
SIP3 is a very advanced VoIP monitoring and troubleshooting platform. To provide detailed information about calls quality we need to:
- Aggregate RTP packets into RTP streams in a real-time
- Periodically walk though all the RTP streams and terminate ones that haven't been updated for a certain period of time.
Let's stay away from telecom specific and take a look at a simplified code example:
class RtpStreamHandler : AbstractVerticle() {
var expirationDelay: Long = 1000
var aggregationTimeout: Long = 30000
private val rtpStreams = mutableMapOf<String, RtpStream>()
override fun start() {
vertx.setPeriodic(expirationDelay) {
val now = System.currentTimeMillis()
rtpStreams.filterValues { rtpStream -> rtpStream.updatedAt + aggregationTimeout < now }
.forEach { (rtpStreamId, rtpStream) ->
terminateRtpStream(rtpStream)
rtpStreams.remove(rtpStreamId)
}
}
vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
val rtpPacket = event.body()
handleRtpPacket(rtpPacket)
}
}
fun handleRtpPacket(rtpPacket: RtpPacket) {
val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
rtpStream.addPacket(rtpPacket)
}
fun terminateRtpStream(rtpStream: RtpStream) {
vertx.eventBus().localSend("on_rtp_stream", rtpStream)
}
}
Now let's imagine that we constantly have 30K of active RTP streams. Also every second we terminate approximately a thousand of old steams but get a thousand of new ones instead. In these circumstances our code doesn't look very efficient and we certainly need a better solution.
Solution
As you can see from the first code snippet once an RTP stream was updated it won't be terminated at least for the next aggregationTimeout
. This means that we can simply do not bother about it for some time.
And this is the key idea behind the SIP3 PeriodicallyExpiringHashMap
implementation:
class PeriodicallyExpiringHashMap<K, V> private constructor(
vertx: Vertx,
private val delay: Long,
private val period: Int,
private val expireAt: (K, V) -> Long,
private val onExpire: (K, V) -> Unit
) {
private val objects = mutableMapOf<K, V>()
private val expiringSlots = (0 until period).map { mutableMapOf<K, V>() }.toList()
private var expiringSlotIdx = 0
init {
vertx.setPeriodic(delay) {
terminateExpiringSlot()
expiringSlotIdx += 1
if (expiringSlotIdx >= period) {
expiringSlotIdx = 0
}
}
}
fun getOrPut(key: K, defaultValue: () -> V): V {
return objects.getOrPut(key) {
defaultValue.invoke().also { expiringSlots[expiringSlotIdx][key] = it }
}
}
private fun terminateExpiringSlot() {
val now = System.currentTimeMillis()
expiringSlots[expiringSlotIdx].apply {
forEach { (k, v) ->
val expireAt = expireAt(k, v)
when {
expireAt <= now -> {
objects.remove(k)?.let { onExpire(k, it) }
}
else -> {
var shift = ((expireAt - now) / delay).toInt() + 1
if (shift >= period) {
shift = period - 1
}
val nextExpiringSlotIdx = (expiringSlotIdx + shift) % period
expiringSlots[nextExpiringSlotIdx][k] = v
}
}
}
clear()
}
}
data class Builder<K, V>(
var delay: Long = 1000,
var period: Int = 60,
var expireAt: (K, V) -> Long = { _: K, _: V -> Long.MAX_VALUE },
var onExpire: (K, V) -> Unit = { _: K, _: V -> }
) {
fun delay(delay: Long) = apply { this.delay = delay }
fun period(period: Int) = apply { this.period = period }
fun expireAt(expireAt: (K, V) -> Long) = apply { this.expireAt = expireAt }
fun onExpire(onExpire: (K, V) -> Unit) = apply { this.onExpire = onExpire }
fun build(vertx: Vertx) = PeriodicallyExpiringHashMap(vertx, delay, period, expireAt, onExpire)
}
}
Here are the benefits of this data structure:
- Now we just have a bunch of time slots. So, instead of walking through all the objects in our map every
expirationDelay
we can walk trough a single slot. So, instead of checking on 30K objects every second we will check on 1K only. - We don't need to create a copy of original map every time we decide to walk though it. In the previous example it also was an issue, because
rtpSteams.filtervalues
creates a copy of the original map. - The last and the most important. Our implementation will stay consistent within a particular verticle context. That means you can simply extend it and implement the rest of the methods (including tricky ones, like
size()
).
Conclusions
Finally let's see how our verticle will look like with the new PeriodicallyExpiringHashMap
data structure:
class RtpStreamHandler : AbstractVerticle() {
var expirationDelay: Long = 1000
var aggregationTimeout: Long = 30000
private lateinit var rtpStreams: PeriodicallyExpiringHashMap<String, RtpStream>
override fun start() {
rtpStreams = PeriodicallyExpiringHashMap.Builder<String, RtpStream>()
.delay(expirationDelay)
.period((aggregationTimeout / expirationDelay).toInt())
.expireAt { _, rtpStream -> rtpStream.updatedAt + aggregationTimeout }
.onExpire { _, rtpStream -> terminateRtpStream(rtpStream) }
.build(vertx)
vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
val rtpPacket = event.body()
handleRtpPacket(rtpPacket)
}
}
fun handleRtpPacket(rtpPacket: RtpPacket) {
val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
rtpStream.addPacket(rtpPacket)
}
fun terminateRtpStream(rtpStream: RtpStream) {
vertx.eventBus().localSend("on_rtp_stream", rtpStream)
}
}
And here are the load tests results (purple - is our new implementation):
The tests look great and the code looks clean and simple thanks to the Vert.x event loop thread model.
šØāš» Happy coding,
Your SIP3 team.
Posted on September 22, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.