Consuming AMQP Messages using Alpakka

Eric Cheatham

Posted on December 3, 2019


Alpakka is a library, available for both Scala and Java, for building stream-aware pipelines that move data from one source to another.

One of the endpoints Alpakka supports is AMQP (Advanced Message Queuing Protocol) servers. Alpakka allows a consumer to treat an AMQP server as either a source (origin) or a sink (destination).

We will be exploring an example of using an AMQP server as a source, as well as another of Alpakka's features: using a relational database as the sink.

Overview

At a high level our example will be doing the following steps:

  • Declaring and attaching to an AMQP queue
  • Creating a consumer to consume messages off the previously declared queue
  • Transforming the data into something that can be further manipulated
  • Writing our data to our database

In pictures, our steps would look a little something like this:
[Image: high-level architecture overview]

This may seem fairly complicated at first, however, Alpakka handles a lot of the heavy lifting for us and makes setting all this up a breeze!

Declaring a Queue and Creating a Consumer

Our first two steps go hand-in-hand; declaring our queue and creating our consumer.

We will begin by choosing a name for our queue and a connection provider. For this example our queue will be named amqp-test-queue-example, and we will go with AmqpLocalConnectionProvider, which handles connecting to a local instance of an AMQP server for us. Alpakka provides several other connection providers for non-local servers; a sketch of one follows the code below.

import akka.NotUsed
import akka.stream.alpakka.amqp._
import akka.stream.alpakka.amqp.scaladsl.AmqpSource
import akka.stream.scaladsl.Source

val queueName = "amqp-test-queue-example"
val queueDeclaration = QueueDeclaration(queueName)
val connectionProvider = AmqpLocalConnectionProvider
val bufferSize = 10

object MessageQueue {
  // Declares the queue (if needed) and returns a Source that emits
  // messages from it, acknowledging each message as it is consumed.
  def createQueue(connectionProvider: AmqpConnectionProvider,
                  queueName: String,
                  queueDeclaration: Declaration,
                  bufferSize: Int = 10): Source[ReadResult, NotUsed] =
    AmqpSource.atMostOnceSource(
      NamedQueueSourceSettings(connectionProvider, queueName)
        .withDeclaration(queueDeclaration)
        .withAckRequired(true),
      bufferSize = bufferSize
    )
}
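If your AMQP server is not running locally, you can swap in one of Alpakka's other connection providers. A minimal sketch using AmqpUriConnectionProvider; the host, credentials, and vhost below are hypothetical placeholders for your own environment:

import akka.stream.alpakka.amqp.AmqpUriConnectionProvider

// Connects using a standard AMQP URI instead of a local default.
val remoteConnectionProvider =
  AmqpUriConnectionProvider("amqp://user:password@rabbitmq.example.com:5672/vhost")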

It is also worth noting that in the code above we have limited our buffer size to 10 messages. This caps the number of messages the consumer will pre-fetch from the AMQP server.

We are also telling our consumer to ACK all messages that are consumed by using the withAckRequired() method on the NamedQueueSourceSettings class. More complex models allow for custom ACKing/NACKing policies; a sketch of manual acknowledgement follows below. To learn more about that, visit the Alpakka article about stream graphs.
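For instance, if you need to acknowledge (or reject) each message yourself after processing, Alpakka also offers AmqpSource.committableSource. A minimal sketch, assuming the same settings as above:

import scala.concurrent.ExecutionContext.Implicits.global
import akka.stream.alpakka.amqp.scaladsl.AmqpSource

// Each element is a CommittableReadResult, which we must ack() or
// nack() explicitly once the message has been handled.
val committable =
  AmqpSource.committableSource(
    NamedQueueSourceSettings(connectionProvider, queueName)
      .withDeclaration(queueDeclaration),
    bufferSize = 10
  ).mapAsync(parallelism = 1) { msg =>
    // Acknowledge only after we have successfully handled the payload.
    msg.ack().map(_ => msg.message.bytes.utf8String)
  }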

Transforming our Data

Now that we have a consumer that will consume messages from an AMQP server and ACK them, we can begin to use the contents of those messages. One hang-up, though: all our messages arrive as byte arrays! No worries; we can make use of JSON4s to unmarshal our messages, converting them from the form in which they were transmitted into a form we can use.

import org.json4s.{DefaultFormats, Formats}
import org.json4s.native.Serialization.read

case class CustomerCreated(name: String, email: String)

// JSON4s needs an implicit Formats instance in scope for read[T].
implicit val formats: Formats = DefaultFormats

val result =
  MessageQueue.createQueue(connectionProvider, queueName, queueDeclaration, bufferSize)
    .take(bufferSize)
    // Decode each message's bytes as UTF-8 JSON into our case class.
    .map(x => read[CustomerCreated](x.bytes.utf8String))
    .log("Received message")

In our example we are:

  • Creating our consumer using MessageQueue.createQueue()
  • Telling our consumer to take up to our buffer size of 10 messages
  • Iterating over the messages that we get and letting JSON4s convert them into our CustomerCreated case class (an example payload follows this list)
  • Logging out the result of each message
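For reference, this is the JSON shape the consumer expects on the wire. A quick round-trip sketch using JSON4s's write, with hypothetical customer values:

import org.json4s.native.Serialization.write

// Serializes to the exact bytes a publisher would put on the queue
// (the implicit Formats from above is assumed to be in scope).
write(CustomerCreated("Ada Lovelace", "ada@example.com"))
// => {"name":"Ada Lovelace","email":"ada@example.com"}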

Writing to a Sink

At this point JSON4s has done us a huge favor and converted our message into something we can do some work with. For our purposes we will be writing our data to a PostgreSQL database. A look at our table structure is shown below.

 Column |          Type          | Collation | Nullable |                Default
--------+------------------------+-----------+----------+---------------------------------------
 id     | integer                |           | not null | nextval('customers_id_seq'::regclass)
 name   | character varying(100) |           |          |
 email  | character varying(100) |           |          |
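If you want to create this table yourself, DDL along these lines should reproduce the structure shown above (the customers table name is taken from the insert we run later):

CREATE TABLE customers (
  id    serial,        -- integer, not null, default nextval('customers_id_seq')
  name  varchar(100),
  email varchar(100)
);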

To write to a SQL-based database we will make use of another of Alpakka's many connectors: Slick. Slick handles connecting to and writing to a database of our choosing.

It's honestly pretty slick.

Slick requires you to provide a configuration block (typically in your application.conf) that describes how to connect to a database. For our example we'll be using the following:

slick-postgres {
  profile = "slick.jdbc.PostgresProfile$"

  db = {
    password = "password"
    user = "example"
    url = "jdbc:postgresql://127.0.0.1/exampledb"
  }
}
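Before the stream can use this configuration, we need a SlickSession in implicit scope. A minimal sketch, assuming the slick-postgres block above lives in your application.conf:

import akka.actor.ActorSystem
import akka.stream.alpakka.slick.scaladsl.SlickSession

implicit val system: ActorSystem = ActorSystem("amqp-to-postgres")

// Reads the "slick-postgres" block from application.conf.
implicit val session: SlickSession = SlickSession.forConfig("slick-postgres")

// Close the database connection pool when the actor system shuts down.
system.registerOnTermination(session.close())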

Going back to our created consumer, we can now tell it where to write our consumed and transformed data. We will do so by pointing Slick at our configuration and writing each item to our sink as a SQL insert.

import akka.stream.alpakka.slick.scaladsl.Slick
import org.json4s.native.Serialization.read

case class CustomerCreated(name: String, email: String)

// The sqlu interpolator comes from the profile on our implicit session.
import session.profile.api._

MessageQueue.createQueue(connectionProvider, queueName, queueDeclaration, bufferSize)
  .take(bufferSize)
  .map(x => read[CustomerCreated](x.bytes.utf8String))
  .log("Received message")
  .runWith(
    // Each CustomerCreated becomes one INSERT against the customers table.
    Slick.sink(x => sqlu"INSERT INTO customers (name, email) VALUES(${x.name}, ${x.email})")
  )

Wrapping our database sink in a runWith() allows us to materialize our flow. Until now we have merely been describing what we intend to do; materializing our consumer tells Alpakka to allocate all the resources necessary to run the flow. A sketch of using the materialized value follows below. To further explore stream materialization, visit the Alpakka article on stream materialization.
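Because Slick.sink materializes a Future[Done], runWith hands back a value we can use to react when the stream finishes. A minimal sketch, reusing the pipeline above and the implicit ActorSystem created earlier:

import akka.Done
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// runWith returns the sink's materialized value: a Future that
// completes once every insert has been written (or the stream fails).
val done: Future[Done] =
  MessageQueue.createQueue(connectionProvider, queueName, queueDeclaration, bufferSize)
    .take(bufferSize)
    .map(x => read[CustomerCreated](x.bytes.utf8String))
    .runWith(
      Slick.sink(x => sqlu"INSERT INTO customers (name, email) VALUES(${x.name}, ${x.email})")
    )

// Shut the actor system down once the batch has been written.
done.onComplete(_ => system.terminate())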

Seeing this all work

All the code that we have constructed above can be found in my GitHub repository. The README.md will walk you through the process of starting up a local RabbitMQ instance (our AMQP server) as well as a Postgres database (our relational database sink).

GitHub: ericcheatham / event-streaming-example (Scala Event Streaming App Example)
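If you would rather push a few test messages onto the queue by hand, the AMQP connector also provides sinks. A minimal sketch, assuming the connection provider, queue declaration, implicit Formats, and ActorSystem from earlier are in scope (the sample customers are made up):

import akka.stream.alpakka.amqp.AmqpWriteSettings
import akka.stream.alpakka.amqp.scaladsl.AmqpSink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import org.json4s.native.Serialization.write

// Publish two sample CustomerCreated events to the queue our
// consumer is reading from.
Source(
  List(
    CustomerCreated("Ada Lovelace", "ada@example.com"),
    CustomerCreated("Alan Turing", "alan@example.com")
  )
)
  .map(customer => ByteString(write(customer)))
  .runWith(
    AmqpSink.simple(
      AmqpWriteSettings(connectionProvider)
        .withRoutingKey(queueName)
        .withDeclaration(queueDeclaration)
    )
  )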
In conclusion, we explored one of many ways to use Alpakka to consume asynchronous messages from an AMQP source. Alpakka provides many more connectors and features beyond what we've explored here. No matter the situation you are approaching, Alpakka has it covered in their incredible documentation.


If you ever want to talk Scala or have really spicy takes about food, send me a message over on Twitter or LinkedIn.
