Messaging services on AWS - Amazon SQS
Cloud Ranger
Posted on May 30, 2022
Photo by Will Truettner on Unsplash
Overview
When looking at scaling your services in production, a common approach that is used is to identify which workflows can be re-architected to an asynchronous workflow thereby allowing you to decouple services that do not necessarily have to work in sequence. This allows you to scale services independently by passing messages between services.
One approach of doing this on the AWS cloud is by using Amazon SQS (Simple Queue Service). As the name suggests, SQS offers a secure, durable, and available hosted queue that lets you integrate and decouple distributed software systems and components.
Show me some code!
The following code samples are in Java and you can use any o the supported languages that have an AWS SDK. Check here for a list of Let's dive right in!
Initialize a queue
Using the aws cli, a standard SQS queue can be created using the aws sqs create-queue
command as below:
aws sqs create-queue --queue-name my-sqs-using-cli
You can also attach tags to a queue when creating it as below:
aws sqs create-queue --queue-name my-sqs-using-cli-with-tag --tags "env"="test"
Writing messages to a queue
So with our queue created let's publish our first message to it!
Import the following libraries:
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
We will need the queue URL when doing any operations on the queue. Let's fetch that using the queue name:
AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
String queue_url = sqs.getQueueUrl(QUEUE_NAME).getQueueUrl();
With the queue URL handy, let's add a single message to the queue:
SendMessageRequest send_msg_request = new SendMessageRequest()
.withQueueUrl(queueUrl)
.withMessageBody("hello world")
.withDelaySeconds(5);
sqs.sendMessage(send_msg_request);
You can also send multiple messages in a single request by using the sendMessageBatch method:
SendMessageBatchRequest send_batch_request = new SendMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(
new SendMessageBatchRequestEntry(
"msg_1", "Hello from message 1"),
new SendMessageBatchRequestEntry(
"msg_2", "Hello from message 2")
.withDelaySeconds(10));
sqs.sendMessageBatch(send_batch_request);
Reading messages from a queue
With our queue populated with messages, let's read them!
List<Message> messages = sqs.receiveMessage(queueUrl).getMessages();
What happens if a message cannot be read? Dead letter queues to the rescue!
SQS provides support for dead letter queues. A dead letter queue is a queue that other (source) queues can target for messages that can’t be processed successfully. You can set aside and isolate these messages in the dead letter queue to determine why their processing did not succeed.
Let's start by creating a source queue and a dead letter queue:
final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
// Create source queue
try {
sqs.createQueue(src_queue_name);
} catch (AmazonSQSException e) {
if (!e.getErrorCode().equals("QueueAlreadyExists")) {
throw e;
}
}
// Create dead-letter queue
try {
sqs.createQueue(dl_queue_name);
} catch (AmazonSQSException e) {
if (!e.getErrorCode().equals("QueueAlreadyExists")) {
throw e;
}
}
To designate a dead letter queue, you must first create a redrive policy, and then set the policy in the queue’s attributes. A redrive policy is specified in JSON, and specifies the ARN of the dead letter queue and the maximum number of times the message can be received and not processed before it’s sent to the dead letter queue.
String dl_queue_url = sqs.getQueueUrl(dl_queue_name)
.getQueueUrl();
GetQueueAttributesResult queue_attrs = sqs.getQueueAttributes(
new GetQueueAttributesRequest(dl_queue_url)
.withAttributeNames("QueueArn"));
String dl_queue_arn = queue_attrs.getAttributes().get("QueueArn");
// Set dead letter queue with redrive policy on source queue.
String src_queue_url = sqs.getQueueUrl(src_queue_name)
.getQueueUrl();
SetQueueAttributesRequest request = new SetQueueAttributesRequest()
.withQueueUrl(src_queue_url)
.addAttributesEntry("RedrivePolicy",
"{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\""
+ dl_queue_arn + "\"}");
sqs.setQueueAttributes(request);
Now the dead letter queue is ready to accept any messages that are not read due to an error or timeout. You can read the messages left behind in the dead letter queue by running a seperate job and either trying to re-try sending the messages to the source queue for processing or by storing them in a permanent store like a database for further analysis.
Recap
So as you saw in the above steps, we were able to initialize a queue on SQS, read messages off of it, and also use the Dead letter queue to store messages that could not be processes.
Look out for future articles that talk about other messaging services on AWS that can be used to decouple your services for better scale and durability.
Thanks for reading!
Posted on May 30, 2022
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.