AWS MWAA and AWS SES integration

mucio

mucio

Posted on November 2, 2021

AWS MWAA and AWS SES integration

Table of Contents

  1. Intro
  2. The standard SMTP configuration
  3. Can we skip the credentials?
  4. A possible solution

Intro

This post is about integrating MWAA with SES using an IAM role and not SMTP credentials. I will try to keep it short and focused.

The assumption here is that you already have your MWAA 2.0.2 environment and its role configured, as per the AWS documentation, and with the SES:* actions allowed on your AWS SES.

The standard SMTP configuration

If you already have your MWAA environment configured and you are trying to send emails, you probably ended up on this documentation page: the default settings there should be good enough to give you an idea of what you need.

The one we are interested in the most is the email.email_backend. As you can see in the documentation the default value is airflow.utils.email.send_email_smtp.

This is the default airflow SMTP integration and works pretty well if you have SMTP credentials (and it works also with SES, see the link above to create the credentials).

Can we skip the credentials?

I do not like to create more credentials than I need. I wanted to be able to use AWS SES without them, just using the IAM Role assigned to my MWAA environment.

What we need is a different email backend, specifically airflow.providers.amazon.aws.utils.emailer.send_email. This will use a small module which is sitting among the ones provided by AWS.

All good and dandy until you try to send the first email.

Here is a quick DAG to copy&paste to test your email:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.email import EmailOperator

my_dag = DAG(dag_id="test_email_dag",
             start_date=datetime(2021, 10, 1),
             schedule_interval="0 0 * * *"
            )

start = DummyOperator(dag=my_dag,
                      task_id="start"
                     )

end = DummyOperator(dag=my_dag,
                    task_id="end"
                   )

run_this = EmailOperator(task_id='sent_email',
                         to='mucio@mucio.net',
                         subject="test",
                         html_content="this is a test, nothing to worry"
                        )

start >> run_this >> end
Enter fullscreen mode Exit fullscreen mode

If you run this DAG you will probably see the following error in the logs:

botocore.exceptions.ParamValidationError: Parameter validation failed:
Invalid type for parameter Source, value: None, type: , valid types:

What is happening here? Do you remember the email_backend provided by the AWS? For some reasons the mail_from parameter which is passed as Source to boto3 does not contain the correct value.

This problem was already reported in a few Airflow issues and PRs. The fix didn't make the cut for Airflow 2.2 and will be probably there in version 2.3, but because we are talking about MWAA (version 2.0.2), we don't really know when this will be fixed on AWS.

A possible solution

The solution I come up with was to rewrite the emailer.py utility, deploy it in the dags folder and reference it in the MWAA configuration.

Here the new emailer (I put mine in ses_email_fix/emailer.py, with an empty __init__.py):

"""Airflow module fix for email backend using AWS SES"""

from typing import List, Optional, Union

from airflow.configuration import conf
from airflow.providers.amazon.aws.hooks.ses import SESHook


def send_email(to: Union[List[str], str],
               subject: str,
               html_content: str,
               files: Optional[List] = None,
               cc: Optional[Union[List[str], str]] = None,
               bcc: Optional[Union[List[str], str]] = None,
               mime_subtype: str = 'mixed',
               mime_charset: str = 'utf-8',
               conn_id: str = 'aws_default',
               **kwargs,
              ) -> None:
    """Email backend for SES."""

    hook = SESHook(aws_conn_id=conn_id)

    hook.send_email(mail_from=conf.get('smtp', 'SMTP_MAIL_FROM'),
                    to=to,
                    subject=subject,
                    html_content=html_content,
                    files=files,
                    cc=cc,
                    bcc=bcc,
                    mime_subtype=mime_subtype,
                    mime_charset=mime_charset,
                   )
Enter fullscreen mode Exit fullscreen mode

This is taking the mail_from from the smtp.smtp_mail_from MWAA environment setting. Also I changed my email.email_backend to be ses_email_fix.emailer.send_email.

Last thing you don't want to forget is to put the folder ses_email_fix in your .airflowignore file (otherwise Airflow will parse that as a DAG).

Now, after updating your environment, your test DAG should be able to fire emails via SES without using credentials.

Last words are for the Airflow community who came up with a quick workaround for this problem that I just implemented in my own MWAA environment.

Credits: Cover photo by Quoc Nguyen from Pexels

💖 💪 🙅 🚩
mucio
mucio

Posted on November 2, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related

AWS MWAA and AWS SES integration