Implementing queues with Amazon SQS

animusna

Aniello Musella

Posted on November 10, 2022

Implementing queues with Amazon SQS

In this post I'll show how to use Amazon Simple Query Service a.k.a. Amazon SQS providing a coding example in Python implementing the "producer consumer problem".

What do you need before to start?

  • AWS account If you don't have one you can activate it using free-tier plan.
  • AWS CLI installed.
  • Python basic knowledge level
  • Python 3 installed
  • Access to a bash shell

Scenario

The goal of this article is to implement a classic consumer/producer problem using a FIFO queue where a producer sends one or more messages to a queue and a consumer receives these messages reading from the same queue. We choose a FIFO queue model to guarantee that the receiving order of the messages is the same of the sending order (pretending that this matters).

What's Amazon SQS

From the documentation:

Amazon Simple Queue Service (SQS) is a managed message queuing service that enables you to decouple and scale microservices, distributed systems, and serverless applications. SQS reduces the complexity of managing and operating message-oriented middleware. SQS is designed so that you can send, store, and receive messages between software components at any volume, without losing messages or requiring other services to be available.

AWS SQS

We'll use this service to create and use a FIFO queue.

Setting up the environment

Let's set up the cloud.

Configure AWS Command Line Interface

To permit to AWS CLI to access to the AWS cloud we need to configure it running the command aws configure

am@animusna:~$aws configure
AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [None]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Default region name [None]: eu-west-1
Default output format [None]: json
Enter fullscreen mode Exit fullscreen mode

In case you need to create the keys follow the official guide

Creation of a FIFO queue

Let's create a FIFO queue running the command aws sqs create-queue:

am@animusna:~$aws sqs create-queue --queue-name test.fifo --attributes FifoQueue=true
{
    "QueueUrl": "https://eu-west-1.queue.amazonaws.com/123456789012/test.fifo"
}
Enter fullscreen mode Exit fullscreen mode

The output is in JSON format and it represents the URL of the queue that will be used in the demo.

Let's code

The solution is composed by three Python files and one configuration file.
To implement the demo we'll use:

  • boto3 the official AWS SDK for Python.
  • colorama module to to prettify the printings in the terminal and used in the demo to make more understandable the outputs of different threads.

Implementing the consumer

In the consumer.py is defined the class Consumer that implements the message reader of the queue. The consumer read one message at a time and after the reading remove it.

import boto3
from configparser import ConfigParser
import messageSqs
import warnings

warnings.filterwarnings('ignore', category=FutureWarning, module='botocore.client')

class Consumer:
    def __init__(self, region_name, queue_url,logger):
        self.region_name = region_name
        self.queue_url = queue_url
        self.logger=logger

    def receive_message(self,deleteAfterReceiveing):
        sqs_client = boto3.client("sqs", region_name=self.region_name)
        response = sqs_client.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=1,
            WaitTimeSeconds=15,
        )

        messages=response.get("Messages", [])

        self.logger(f"Number of messages received: {len(messages)}")

        if len(messages)==0:
            return False

        for message in messages:
            m=messageSqs.MessageSqs(jsonStr=message["Body"])
            self.logger(f"\tMessage read from queue: Data:{m.data}\tTime Stamp:{m.ts}\t Id:{m.id}")
            if deleteAfterReceiveing:
                receipt_handle = message['ReceiptHandle']
                dlt_response=sqs_client.delete_message(QueueUrl=self.queue_url,ReceiptHandle=receipt_handle)
                if dlt_response['ResponseMetadata']['HTTPStatusCode']==200:
                    self.logger("\tMessage deleted from queue.")
        return True
Enter fullscreen mode Exit fullscreen mode

Implementing the producer

In the producer.py it's defined the Producer class that implements the message sender of the queue. The producer create one message at a time to send it to the queue.

import boto3
import uuid
from messageSqs import MessageSqsJSONEncoder
import warnings

warnings.filterwarnings('ignore', category=FutureWarning, module='botocore.client')

class Producer:

    def __init__(self,region_name,queue_url,logger):
        self.region_name = region_name
        self.queue_url= queue_url
        self.logger=logger

    def send_message(self,message):
        sqs_client = boto3.client("sqs", self.region_name)
        self.logger(f"Sending message with id {message.id}...")        
        response = sqs_client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=MessageSqsJSONEncoder().encode(message),
            MessageGroupId="test",
            MessageDeduplicationId=f"DeduplicationId-{uuid.uuid4()}"
        )
        self.logger(f"SQS Response: Status:{response['ResponseMetadata']['HTTPStatusCode']}\tSQS Message Id:{response['MessageId']}")             
Enter fullscreen mode Exit fullscreen mode

Implementing the scenario

In the module sqs-demo.py we run in different concurrent threads the producer and consumer.The producer send a message each half second for a max of M (where M is PRODUCER_MESSAGES_TO_SENT defined in configuration file). The consumer try to read a message each half second until there is something to read. The consumer stops after detecting an empty queue for K times (where K is CONSUMER_EXIT_IF_QUEUE_IS_EMPTY_AFTER_ATTEMPTS defined in the configuration file).

from colorama import Fore
from configparser import ConfigParser
from consumer import Consumer
from datetime import datetime
from messageSqs import MessageSqs
from producer import Producer
import time
import threading
import uuid

config_object = ConfigParser()
config_object.read("config.ini")
aws_conf = config_object["AWS"]
demo_conf = config_object["DEMO"]
message_to_sent = int(demo_conf["PRODUCER_MESSAGES_TO_SENT"])
max_attempts_if_queue_is_empty = int(
    demo_conf["CONSUMER_EXIT_IF_QUEUE_IS_EMPTY_AFTER_ATTEMPTS"])
consumer_start_delay = float(demo_conf["CONSUMER_START_DELAY_IN_SECONDS"])
producer_start_delay = float(demo_conf["PRODUCER_START_DELAY_IN_SECONDS"])


def log(task_name, text, textColor):
    print(textColor + f"{task_name}:{text}" + Fore.WHITE)

def produce_task(task_name="producer"):
    logger=lambda text: log(task_name, text, Fore.YELLOW)
    logger("Running ...")
    time.sleep(producer_start_delay)

    message_index=0
    producer=Producer(aws_conf["REGION"], aws_conf["QUEUE_URL"], logger)
    while (message_index < message_to_sent):
        m=MessageSqs(
            jsonStr = f"{{\"data\":\"mydata{message_index}\",\"ts\":\"{datetime.now().isoformat()}\",\"id\":\"{uuid.uuid4()}\" }}")
        producer.send_message(m)
        message_index += 1
        time.sleep(0.5)
    logger("Terminated!")


def consume_task(task_name = "consumer"):
    logger=lambda text: log(task_name, text, Fore.GREEN)
    receiver = Consumer(aws_conf["REGION"],
                        aws_conf["QUEUE_URL"], logger)
    logger("Running...")
    time.sleep(consumer_start_delay)
    attempts = 0
    while (attempts < max_attempts_if_queue_is_empty):
        if not receiver.receive_message(True):
            attempts += 1
        time.sleep(0.5)

print(Fore.WHITE + '\nStarting demo AWS SQS..\n')

if __name__ == "__main__":
    tasks = []
    tasks.append(threading.Thread(target=produce_task, args=("Producer-SQS",)))
    tasks.append(threading.Thread(target=consume_task, args=("Consumer-SQS",)))

    for t in tasks:
        t.start()

    for t in tasks:
        t.join()

    print(Fore.WHITE + "\n\nAWS SQS Demo terminated!\n\n")

Enter fullscreen mode Exit fullscreen mode

The configuration file

In the file config.ini there is the configuration of the demo:

[AWS]
REGION = eu-west-1
QUEUE_URL = https://eu-west-1.queue.amazonaws.com/123456789012/test.fifo
[DEMO] 
#Total messages sent by producer
PRODUCER_MESSAGES_TO_SENT = 100
#Delay in seconds before to start the producer
PRODUCER_START_DELAY_IN_SECONDS=0.5
#If the consumer checks the queue is empty it exits after this number of attempts.
CONSUMER_EXIT_IF_QUEUE_IS_EMPTY_AFTER_ATTEMPTS = 10     
#Delay in seconds before to start teh consumer
CONSUMER_START_DELAY_IN_SECONDS=2
Enter fullscreen mode Exit fullscreen mode

Output of the demo

Following some screenshots of the demo in action:

Starting Output

Middle Output

Final output

Points of interest

  • Increase the consumers
  • Increase the producers
  • Change the queue type
  • Increase/decrease the visibility timeout
  • Read more than one message at a time
  • ...

Conclusions

In this post we've seen how it's easy to implement a consumer/producer problem making use of a FIFO queue hosted in AWS SQS. The use of the official SDK Boto3 it's very intuitive as much as the AWS cli that in one only command allowed us to create a new queue.

You can find the sources on GitHub.

💖 💪 🙅 🚩
animusna
Aniello Musella

Posted on November 10, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related