Mock a SQS queue with moto

drewmullen

drewmullen

Posted on March 10, 2021

Mock a SQS queue with moto

Below is simple example code to mock the aws sqs api. This can be useful for development so you dont have to actually manage a real SQS queue / connectivity / IAM permissions until you're ready.

app.py

A simple function to write a message to a preexisting queue

QUEUE_URL = os.environ.get('QUEUE_URL', '<default value>')
def write_message(data):
    sqs = boto3.client('sqs', region_name = 'us-east-1')
    r = sqs.send_message(
        MessageBody = json.dumps(data),
        QueueUrl = QUEUE_URL
    )
Enter fullscreen mode Exit fullscreen mode

conftest.py

Pytest fixtures to mock up the aws sqs API. aws_credentials() also ensures that your pytest functions will not actually write to aws resources.

REGION='us-east-'
@pytest.fixture(scope='function')
def aws_credentials():
    """Mocked AWS Credentials for moto."""
    os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
    os.environ['AWS_SECURITY_TOKEN'] = 'testing'
    os.environ['AWS_SESSION_TOKEN'] = 'testing'


@pytest.fixture(scope='function')
def sqs_client(aws_credentials):
    # setup
    with mock_sqs():
        yield boto3.client('sqs', region_name=REGION)
    # teardown
Enter fullscreen mode Exit fullscreen mode

test_sqs.py

An example test function. Create a queue using the mock client from conftest.py (notice sqs_client parameter matches the conftest function name sqs_client), invoke your python module function app.write_message(). Validate the returned message matches what you sent

def test_write_message(sqs_client):
    queue = sqs_client.create_queue(QueueName='test-msg-sender')
    queue_url = queue['QueueUrl']
    # override function global URL variable
    app.QUEUE_URL = queue_url
    expected_msg = str({'msg':f'this is a test'})
    app.write_message(expected_msg)
    sqs_messages = sqs_client.receive_message(QueueUrl=queue_url)

    assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg
Enter fullscreen mode Exit fullscreen mode

Extra

In case you wanted to see my file structure:

├── README.md
├── app.py
├── requirements.txt
├── requirements_dev.txt
└── tests
    ├── __init__.py
    ├── conftest.py
    └── unit
        ├── __init__.py
        └── test_sqs.py
Enter fullscreen mode Exit fullscreen mode

Thank you

kudos to Arunkumar Muralidharan who got me started

💖 💪 🙅 🚩
drewmullen
drewmullen

Posted on March 10, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related

Mock a SQS queue with moto
python Mock a SQS queue with moto

March 10, 2021