Gauthier
Posted on May 26, 2020
I'm sure many of you have faced a situation where you need to handle asynchronous tasks in your application, and queue those tasks so that they are executed sequentially.
Well, I faced this situation too! I'm going to show you how I solved the problem using an open-source message queue: Apache Artemis.
Problem statement
Before jumping into a solution, it is important to state what we are trying to solve.
We are talking about tasks, usually background tasks, that are executed asynchronously. That means the caller is not blocked waiting for the task to complete, which is very useful for an API or a GUI, for example.
Those tasks can be queued for later execution. We want the tasks to be executed sequentially, one after the other. Once a task is executed, we will pick up another task to execute.
Optionally, we would like to skip a task if a similar task is already in the queue. That way we can save some processing time. For example, let's say you have tasks that analyze files: you don't want to analyze the same file twice, even if the user requested it by clicking twice.
Lastly, it would be nice to be able to persist the task queue on disk, so in case your application stops or crashes, it can resume its processing where it left off.
Thus we are looking for a solution that can:
- manage tasks or messages in a queue for sequential execution
- skip similar messages in the queue
- persist the task queue on disk
Which solution can fit those needs?
The problem statement described above can be solved using various tools. Actually, my first solution was to use Spring Boot's @Async support, but I soon ran into issues, mostly due to the configuration of the various ThreadPoolTaskExecutor instances and the fact that proxy-based annotations only work when the annotated method lives in a different class from its caller. It also did not fulfill two of the requirements: the ability to skip similar messages, and the ability to persist the queue on disk.
The problem described is well addressed by message queues, but if you are familiar with the topic, you know there are many different message queue frameworks! Which one is the right one to choose?
I wanted a solution that would fit well in my small application, meaning:
- it needs to be embeddable - I don't want to run another application on the side, I want this to be part of my application. It's also used only internally, so there is no need to have an external broker.
- Given point 1., it needs to be implemented fully in Java.
Apache ActiveMQ is a well-known open-source message queue implemented in Java, which is embeddable. It supports disk persistence, but it cannot skip similar messages.
Apache Artemis is a newer version of ActiveMQ, which has an interesting feature called Last-Value Queues:
Last-Value queues are special queues which discard any messages when a newer message with the same value for a well-defined Last-Value property is put in the queue. In other words, a Last-Value queue only retains the last value.
And that's exactly what we are looking for!
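To make the quoted behavior concrete, here is a minimal in-memory sketch of last-value semantics in plain Kotlin. It is not Artemis code: the names Message and LastValueQueueModel are made up for this illustration, and the position a replaced message takes in the queue is a simplification of this model, not a statement about how Artemis orders messages.

```kotlin
// Toy in-memory model of last-value semantics; this is not Artemis code.
// Message and LastValueQueueModel are hypothetical names used only for this sketch.
data class Message(val body: String, val lastValueKey: String? = null)

class LastValueQueueModel {
    // LinkedHashMap keeps the slots in insertion order.
    private val slots = LinkedHashMap<String, Message>()
    private var unkeyedCounter = 0L

    val size: Int
        get() = slots.size

    fun send(message: Message) {
        // Keyed messages share one slot per key, so a newer message overwrites the older one.
        // Messages without a key each get their own slot and are never replaced.
        val slot = message.lastValueKey?.let { "keyed:$it" } ?: "unkeyed:${unkeyedCounter++}"
        slots[slot] = message
    }

    fun receive(): Message? {
        val iterator = slots.entries.iterator()
        if (!iterator.hasNext()) return null
        val message = iterator.next().value
        iterator.remove()
        return message
    }
}

fun main() {
    val queue = LastValueQueueModel()
    for (i in 1..5) queue.send(Message("message $i", lastValueKey = "1"))
    println(queue.size)            // prints 1
    println(queue.receive()?.body) // prints message 5
}
```

Sending the same five messages without a key would leave all five in the model, which is the behavior we will check against the real broker later on.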
Broker setup
I am using Spring Boot for most of my projects, and that's what I'll be using in this tutorial. Spring Boot has out-of-the-box support for Apache Artemis, and its documentation covers some aspects of integrating it in your application.
In order to add Artemis to Spring Boot, you can just add a dependency on org.springframework.boot:spring-boot-starter-artemis.
I'm using Gradle, so that's as simple as:
implementation("org.springframework.boot:spring-boot-starter-artemis")
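Spring Boot will autoconfigure an embedded broker if you don't specify any configuration (on Spring Boot 2.x, embedded mode also needs org.apache.activemq:artemis-jms-server on the classpath). We can tweak some of the configuration via the application.yml file of our application: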
spring:
  artemis:
    embedded:
      persistent: true
      data-directory: ~/.komga/artemis
Here we are explicitly enabling persistence of the queues, and setting a specific directory that will store all the Artemis data on disk.
Next we want to configure Artemis a bit more, and that requires writing some code. Spring Boot lets you define a bean implementing ArtemisConfigurationCustomizer to customize the Artemis configuration at startup. So let's do that.
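import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.springframework.boot.autoconfigure.jms.artemis.ArtemisConfigurationCustomizer
import org.springframework.context.annotation.Configuration
// aliased to avoid a clash with Spring's @Configuration
import org.apache.activemq.artemis.core.config.Configuration as ArtemisConfiguration

const val QUEUE_UNIQUE_ID = "unique_id"
const val QUEUE_TYPE = "type"
const val QUEUE_TASKS = "tasks.background"
const val QUEUE_TASKS_TYPE = "task"
const val QUEUE_TASKS_SELECTOR = "$QUEUE_TYPE = '$QUEUE_TASKS_TYPE'"

@Configuration
class ArtemisConfig : ArtemisConfigurationCustomizer {
  override fun customize(configuration: ArtemisConfiguration?) {
    configuration?.let {
      // default is 90, meaning the queue would block if disk is 90% full. Set it to 100 to avoid blocking.
      it.maxDiskUsage = 100
      // disable prefetch, ensures messages stay in the queue and last value can have desired effect
      it.addAddressesSetting(QUEUE_TASKS, AddressSettings().apply {
        defaultConsumerWindowSize = 0
      })
      // single anycast queue, with the Last-Value key set to our unique ID property
      it.addQueueConfiguration(
        CoreQueueConfiguration()
          .setAddress(QUEUE_TASKS)
          .setName(QUEUE_TASKS)
          .setLastValueKey(QUEUE_UNIQUE_ID)
          .setRoutingType(RoutingType.ANYCAST)
      )
    }
  }
}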
Let me explain what we did here:
- We are going to create a single queue called tasks.background.
- As we want to use the Last-Value feature, we need to configure a key that will be used to find similar messages. We will use unique_id as the name of this key.
- We also change maxDiskUsage from its default of 90 to 100. By default, if the disk on which your data-directory is located reaches 90% usage, the broker will pause the queue. We don't want that to happen, so we change it to 100, so the broker will not pause if the disk gets full.
- Lastly, we need to disable prefetch. By default a consumer will fetch multiple messages from the queue so it can process them in sequence. This is good when you have a lot of messages, short processing times, and multiple consumers, as it helps reduce polling time. This is not our case here. Prefetch also has the adverse effect of storing the messages locally in an executor service, which means they are no longer in the queue: we would lose those messages if the application stops, and we would lose the ability to skip similar messages.
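The effect of prefetch on deduplication can be illustrated with a toy model in plain Kotlin. This is only a sketch under simplifying assumptions: PrefetchModel and prefetchCount are hypothetical names, the buffer is counted in messages (whereas the Artemis consumer window size is a buffer size in bytes), and the consumer is assumed to be busy with a long-running task the whole time, so nothing is processed while messages arrive.

```kotlin
// Toy model, not Artemis code: a broker-side last-value store plus a consumer-side buffer.
// PrefetchModel and prefetchCount are hypothetical names for this sketch.
class PrefetchModel(private val prefetchCount: Int) {
    private val brokerQueue = LinkedHashMap<String, String>() // unique id -> message body
    private val consumerBuffer = ArrayDeque<String>()         // bodies already handed to the consumer

    fun send(uniqueId: String, body: String) {
        // Last value wins, but only among messages that are still on the broker.
        brokerQueue[uniqueId] = body
        // The consumer eagerly pulls messages while its local buffer has room.
        while (consumerBuffer.size < prefetchCount && brokerQueue.isNotEmpty()) {
            val firstKey = brokerQueue.keys.first()
            consumerBuffer.addLast(brokerQueue.remove(firstKey)!!)
        }
    }

    // Everything that will eventually be processed: buffered messages, then queued ones.
    fun pending(): List<String> = consumerBuffer.toList() + brokerQueue.values
}

fun main() {
    val withPrefetch = PrefetchModel(prefetchCount = 3)
    val withoutPrefetch = PrefetchModel(prefetchCount = 0)
    for (i in 1..5) {
        withPrefetch.send("same-id", "message $i")
        withoutPrefetch.send("same-id", "message $i")
    }
    println(withPrefetch.pending())    // prints [message 1, message 2, message 3, message 5]
    println(withoutPrefetch.pending()) // prints [message 5]
}
```

In this model, the messages that were already pulled into the consumer buffer escape deduplication, which is the reason for keeping them on the broker until they are actually needed.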
Testing the broker
We have set up the embedded broker, but we need to ensure that it's working as expected, especially the Last-Value feature.
Before we jump into the test code, we need to disable Artemis persistence on disk for tests only. I ran into some locks because of that, so better be safe.
It's easy using a Spring configuration file:
spring:
  artemis:
    embedded:
      persistent: false
Now we can start adding some tests. We will test two behaviors:
- messages sent without unique ID are all received
- messages sent with unique ID are deduplicated
In order to send messages to Artemis, we can use Spring's JmsTemplate.
import mu.KotlinLogging
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.jms.core.JmsTemplate
import org.springframework.jms.support.destination.JmsDestinationAccessor
import org.springframework.test.context.junit.jupiter.SpringExtension
// javax.jms on Spring Boot 2.x; the package is jakarta.jms on Spring Boot 3+
import javax.jms.QueueBrowser
import javax.jms.Session

// assumption: the logger used in these excerpts is a kotlin-logging logger
private val logger = KotlinLogging.logger {}

@ExtendWith(SpringExtension::class)
@SpringBootTest
class ArtemisConfigTest(
  @Autowired private val jmsTemplate: JmsTemplate
) {

  init {
    // we set up the JmsTemplate to not block waiting for messages if the queue is empty, and return immediately to the calling code
    jmsTemplate.receiveTimeout = JmsDestinationAccessor.RECEIVE_TIMEOUT_NO_WAIT
  }

  @AfterEach
  fun emptyQueue() {
    while (jmsTemplate.receive(QUEUE_TASKS) != null) {
      logger.info { "Emptying queue" }
    }
  }

  @Test
  fun `when sending messages without unique id then messages are not deduplicated`() {
    for (i in 1..5) {
      jmsTemplate.convertAndSend(
        QUEUE_TASKS,
        "message $i"
      )
    }

    // we use the browse method to check what's in the queue, without actually receiving the messages
    val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
      browser.enumeration.toList().size
    }

    assertThat(size).isEqualTo(5)

    // we will retrieve the first message to ensure the content is correct
    val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
    assertThat(msg).isEqualTo("message 1")
  }

  @Test
  fun `when sending messages with the same unique id then messages are deduplicated`() {
    for (i in 1..5) {
      jmsTemplate.convertAndSend(
        QUEUE_TASKS,
        "message $i"
      ) {
        // we add a StringProperty to each message, using the same value
        it.apply { setStringProperty(QUEUE_UNIQUE_ID, "1") }
      }
    }

    val size = jmsTemplate.browse(QUEUE_TASKS) { _: Session, browser: QueueBrowser ->
      browser.enumeration.toList().size
    }

    assertThat(size).isEqualTo(1)

    // we are retrieving the first message on the queue, which happens to also be the last one we sent, since Last-Value is working and previous messages are removed
    val msg = jmsTemplate.receiveAndConvert(QUEUE_TASKS) as String
    assertThat(msg).isEqualTo("message 5")
  }
}
As you can see, in the first test all the messages are in the queue, as expected, and they are also in the same order in which they were sent.
In the second test, Last-Value is enabled via the StringProperty on the messages, which all have the same value ("1"). When Artemis receives a message, it will check for existing messages in the queue with the same value for this key, and remove them. Hence we have only a single message in the queue, and it has the value of the last message sent.
Our broker is working as expected: it's doing a good job of removing duplicates. Now we need to use that to set up our task queue.
Setting up a task queue
In our scenario we want to execute tasks sequentially, and without duplicates. Those tasks can be of various kinds.
To represent the different tasks we will receive, we can use Kotlin's sealed class:
import java.io.Serializable

sealed class Task : Serializable {
  // key used by the Last-Value queue to detect similar tasks
  abstract fun uniqueId(): String

  data class ScanLibrary(val libraryId: Long) : Task() {
    override fun uniqueId() = "SCAN_LIBRARY_$libraryId"
  }

  data class AnalyzeBook(val bookId: Long) : Task() {
    override fun uniqueId() = "ANALYZE_BOOK_$bookId"
  }

  data class GenerateBookThumbnail(val bookId: Long) : Task() {
    override fun uniqueId() = "GENERATE_BOOK_THUMBNAIL_$bookId"
  }

  data class RefreshBookMetadata(val bookId: Long) : Task() {
    override fun uniqueId() = "REFRESH_BOOK_METADATA_$bookId"
  }
}
We have a base class Task, which is Serializable as it's required for sending objects over JMS. It has an abstract uniqueId() function.
Each task type is a data class extending Task and overriding uniqueId(). We use the id of the resource concerned by the task to build a unique string for the uniqueId. That way, two tasks of type AnalyzeBook will only be deduplicated if they have the same bookId.
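As a quick sanity check of that keying scheme, the sketch below uses a reduced copy of the Task hierarchy (two task types only) and groups a batch of submitted tasks by uniqueId() in plain Kotlin. The grouping with associateBy only previews which keys would collide; it stands in for the broker and is not how Artemis performs the deduplication.

```kotlin
import java.io.Serializable

// Reduced copy of the Task hierarchy, limited to two task types for this sketch.
sealed class Task : Serializable {
    abstract fun uniqueId(): String

    data class ScanLibrary(val libraryId: Long) : Task() {
        override fun uniqueId() = "SCAN_LIBRARY_$libraryId"
    }

    data class AnalyzeBook(val bookId: Long) : Task() {
        override fun uniqueId() = "ANALYZE_BOOK_$bookId"
    }
}

fun main() {
    val submitted = listOf(
        Task.AnalyzeBook(42),
        Task.AnalyzeBook(42), // same type, same book: same key as the previous task
        Task.AnalyzeBook(43), // same type, other book: different key
        Task.ScanLibrary(42)  // other type, same numeric id: the prefix keeps the key distinct
    )
    // One entry per distinct key; for equal keys the later task replaces the earlier one.
    val surviving = submitted.associateBy { it.uniqueId() }
    println(surviving.keys) // prints [ANALYZE_BOOK_42, ANALYZE_BOOK_43, SCAN_LIBRARY_42]
}
```

Prefixing the key with the task type is what prevents a library and a book that happen to share the same numeric id from being treated as the same task.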
Good, now that we have our tasks defined, we need to wire up two components:
- a task receiver, that will receive tasks from other components, and send them to the broker.
- a task handler, that will pull tasks from the queue and execute them.
@Service
class TaskReceiver(
  private val jmsTemplate: JmsTemplate
) {

  fun scanLibrary(library: Library) {
    submitTask(Task.ScanLibrary(library.id))
  }

  fun analyzeBook(book: Book) {
    submitTask(Task.AnalyzeBook(book.id))
  }

  fun generateBookThumbnail(book: Book) {
    submitTask(Task.GenerateBookThumbnail(book.id))
  }

  fun refreshBookMetadata(book: Book) {
    submitTask(Task.RefreshBookMetadata(book.id))
  }

  private fun submitTask(task: Task) {
    logger.info { "Sending task: $task" }
    jmsTemplate.convertAndSend(QUEUE_TASKS, task) {
      it.apply {
        setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE)
        setStringProperty(QUEUE_UNIQUE_ID, task.uniqueId())
      }
    }
  }
}
The code is quite simple: we have one function for each task type that converts the domain object to the corresponding task, and one private function that ensures the uniqueId is set properly before sending.
Then we need to build the task handler:
@Service
class TaskHandler(
  private val libraryRepository: LibraryRepository,
  private val bookRepository: BookRepository,
  private val libraryScanner: LibraryScanner,
  private val bookLifecycle: BookLifecycle,
  private val metadataLifecycle: MetadataLifecycle
) {

  @JmsListener(destination = QUEUE_TASKS, selector = QUEUE_TASKS_SELECTOR)
  @Transactional
  fun handleTask(task: Task) {
    logger.info { "Executing task: $task" }
    try {
      measureTime {
        when (task) {
          is Task.ScanLibrary ->
            libraryRepository.findByIdOrNull(task.libraryId)?.let {
              libraryScanner.scanRootFolder(it)
            } ?: logger.warn { "Cannot execute task $task: Library does not exist" }

          is Task.AnalyzeBook ->
            bookRepository.findByIdOrNull(task.bookId)?.let {
              bookLifecycle.analyzeAndPersist(it)
            } ?: logger.warn { "Cannot execute task $task: Book does not exist" }

          is Task.GenerateBookThumbnail ->
            bookRepository.findByIdOrNull(task.bookId)?.let {
              bookLifecycle.regenerateThumbnailAndPersist(it)
            } ?: logger.warn { "Cannot execute task $task: Book does not exist" }

          is Task.RefreshBookMetadata ->
            bookRepository.findByIdOrNull(task.bookId)?.let {
              metadataLifecycle.refreshMetadata(it)
            } ?: logger.warn { "Cannot execute task $task: Book does not exist" }
        }
      }.also {
        logger.info { "Task $task executed in $it" }
      }
    } catch (e: Exception) {
      logger.error(e) { "Task $task execution failed" }
    }
  }
}
We use the @JmsListener annotation to listen to specific messages:
- destination = QUEUE_TASKS will only get messages from our tasks.background queue
- selector = QUEUE_TASKS_SELECTOR is a powerful feature of JMS, which lets you use SQL-like selectors to retrieve only specific messages.
In the broker configuration, we added 3 constants that we haven't used yet:
const val QUEUE_TYPE = "type"
const val QUEUE_TASKS_TYPE = "task"
const val QUEUE_TASKS_SELECTOR = "$QUEUE_TYPE = '$QUEUE_TASKS_TYPE'"
And in the TaskReceiver, when sending messages, we added an extra property to the message:
setStringProperty(QUEUE_TYPE, QUEUE_TASKS_TYPE)
Basically, what we did here is:
- all tasks sent are given a type property equal to task.
- we only retrieve messages that match the condition type = 'task'.
Why did we do that, you may ask?
- First, you may want to have multiple queues in your application, but you want to have one receiver per queue, so your code is well separated.
- Second, if we have a single @JmsListener without a selector, it will get all messages all the time, even when we are running our tests. Remember our test from above? Since it's a @SpringBootTest, it loads the whole application, including our TaskHandler, so if we didn't have the selector in place, the TaskHandler would consume all our messages and our tests would fail.
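The filtering done by the selector can be pictured with a small model in plain Kotlin. This is only a sketch: QueuedMessage and matchesTaskSelector are hypothetical names, and in the real application the broker evaluates the selector itself. The one JMS rule it relies on is that a message lacking the property never satisfies a comparison such as type = 'task'.

```kotlin
// Toy model of the selector "type = 'task'"; in the real application the broker evaluates it.
// QueuedMessage and matchesTaskSelector are hypothetical names for this sketch.
data class QueuedMessage(val body: String, val properties: Map<String, String> = emptyMap())

// A message without the "type" property does not match, like a missing property in a JMS selector.
fun matchesTaskSelector(message: QueuedMessage): Boolean = message.properties["type"] == "task"

fun main() {
    val queue = listOf(
        QueuedMessage("message 1"),                                       // sent by the test, no type property
        QueuedMessage("AnalyzeBook(bookId=42)", mapOf("type" to "task")) // sent by the TaskReceiver
    )
    val (forTaskHandler, leftInQueue) = queue.partition(::matchesTaskSelector)
    println(forTaskHandler.map { it.body }) // prints [AnalyzeBook(bookId=42)]
    println(leftInQueue.map { it.body })    // prints [message 1]
}
```

The plain string messages sent by the tests carry no type property, so in this model they stay in the queue for the test to browse and receive.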
Our annotated function takes a Task parameter:
fun handleTask(task: Task)
That's a convenience from Spring that will automatically deserialize the message and pass it to our function.
We can then use Kotlin's pattern matching with when to conveniently go through all the different task types defined, and execute a business action.
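One property of when over a sealed class that is worth knowing can be shown in isolation. The sketch below again uses a reduced Task hierarchy, and describe is a hypothetical function that only exists for this illustration.

```kotlin
// Reduced Task hierarchy for this sketch; describe is a hypothetical function.
sealed class Task {
    data class ScanLibrary(val libraryId: Long) : Task()
    data class AnalyzeBook(val bookId: Long) : Task()
}

// `when` is used as an expression here, so the compiler requires a branch for every subclass of Task.
// Inside each branch, task is smart-cast to the matched type, which gives access to its own fields.
fun describe(task: Task): String = when (task) {
    is Task.ScanLibrary -> "scan library ${task.libraryId}"
    is Task.AnalyzeBook -> "analyze book ${task.bookId}"
}

fun main() {
    println(describe(Task.ScanLibrary(1)))  // prints scan library 1
    println(describe(Task.AnalyzeBook(42))) // prints analyze book 42
}
```

With this shape, adding a new task type without a matching branch fails at compile time instead of being silently ignored at runtime.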
Wrapping up
We did quite a lot here, and with so little code!
- We set up an Apache Artemis broker which is embedded in our application.
- The message queues are persisted on disk, and the broker will resume task consumption where it left off in case of application stop or crash.
- Messages are deduplicated using a unique ID, making our application spend less time processing the same tasks.
- Tasks are submitted to the queue synchronously, and the call returns immediately, without waiting for the task to be executed.
- Tasks are handled sequentially and synchronously. Subsequent tasks will be processed only when the current task is finished.
The code excerpts are taken from my open-source project Komga. If you are interested in how I replaced the use of @Async with Artemis, check out this commit.