Enabling Apache Airflow to copy large S3 objects


Maurice Borgmeier

Posted on August 27, 2024


If you're trying to use Apache Airflow to copy large objects in S3, you might have encountered issues where S3 complains about you sending an InvalidRequest. We will fix that in this post by writing a custom operator to handle the underlying problem. Before we do that, let's first understand where this issue originated.

In case you need a primer on Airflow, check out our post Understanding Apache Airflow on AWS, but if you do - what are you doing here?

The error message that led you to this post looks something like this:

An error occurred (InvalidRequest) when calling the CopyObject operation: The specified copy source is larger than the maximum allowable size for a copy source [...]

The code that caused it may look something like this. The code itself isn't wrong; for smaller objects, it works perfectly fine.

from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

op = S3CopyObjectOperator(
    task_id="copy_source_file",
    source_bucket_name="source-bucket",
    source_bucket_key="path/to_large_object",
    dest_bucket_name="destination-bucket",
    dest_bucket_key="save-me-here",
)

The name of the operator indicates the underlying API call, which is CopyObject. In general, CopyObject is pretty robust, but there's a failure mode that's not immediately obvious - it is limited to copying objects up to 5GB in a single operation. That means attempting to copy any object larger than that leads to the aforementioned error message.

This is mentioned in the first big info box in the docs, but who has time to read those, right? It even includes a link to the proposed solution, which means using multipart uploads to achieve this. Before you groan - no, you won't have to handle splitting up stuff into byte ranges and managing the individual copy operations. All of that comes included in your installation of boto3 in the form of the S3 client's copy operation.

The copy method uses the underlying s3transfer library that ships with boto3 and transparently manages the multipart uploads. Note that despite the name "multipart upload", we don't need to download the object and upload it again: the library uses the UploadPartCopy API, which keeps the data internal to S3.
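To build some intuition for what s3transfer handles for us: a multipart copy splits the source object into byte ranges and issues one UploadPartCopy call per range. The sketch below is purely illustrative (it is not the actual s3transfer code); the 8 MiB default matches boto3's default multipart_chunksize.

```python
# Illustrative sketch: how an object could be split into byte ranges,
# one per UploadPartCopy call. Not the actual s3transfer implementation.
def part_ranges(object_size: int, part_size: int = 8 * 1024 * 1024):
    """Yield inclusive (first_byte, last_byte) tuples covering the object."""
    for start in range(0, object_size, part_size):
        end = min(start + part_size, object_size) - 1
        yield (start, end)

# A 20 MiB object with 8 MiB parts yields three ranges:
# bytes 0-8388607, 8388608-16777215, 16777216-20971519
parts = list(part_ranges(20 * 1024 * 1024))
```

Each range would become a `CopySourceRange` header on an UploadPartCopy request; s3transfer does all of this (plus initiating and completing the multipart upload) behind the single `copy` call.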

Using this is as simple as subclassing the existing S3CopyObjectOperator and overwriting the execute method, which Airflow calls to perform the actual operation. In the code below, you can see that I try to delegate the API call to the parent class and only become active if there's an exception.

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
from airflow.utils.context import Context
from botocore.exceptions import ClientError

class S3CopyOperator(S3CopyObjectOperator):
    """
    An extension of the S3CopyObjectOperator that can copy
    objects larger than 5GB.
    """

    def execute(self, context: Context):

        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)

        try:
            super().execute(context)
        except ClientError as err:

            if err.response["Error"]["Code"] == "InvalidRequest":
                # The response when we try to copy more than 5GB in one request.
                s3_hook.conn.copy(
                    CopySource={
                        "Bucket": self.source_bucket_name,
                        "Key": self.source_bucket_key,
                    },
                    Bucket=self.dest_bucket_name,
                    Key=self.dest_bucket_key,
                )
            else:
                raise

The error code InvalidRequest denotes this specific failure, although, confusingly, it's not listed in the API docs for CopyObject but in the generic error responses documentation. If we get this error, we use the conn object of an S3Hook, which is essentially a wrapper around boto3, to call the appropriate copy method with the parameters from our instance. You can use it like this:
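For reference, this is roughly the shape of the `response` attribute on the ClientError that the operator inspects (the message text is abbreviated from the error shown earlier; values are illustrative):

```python
# Approximate shape of ClientError.response for this failure mode.
# The fallback logic only looks at the nested error code.
error_response = {
    "Error": {
        "Code": "InvalidRequest",
        "Message": "The specified copy source is larger than the "
                   "maximum allowable size for a copy source",
    }
}

def is_copy_source_too_large(response: dict) -> bool:
    """The same check the custom operator performs on err.response."""
    return response.get("Error", {}).get("Code") == "InvalidRequest"
```

Keep in mind that InvalidRequest covers more than just this one condition, which is one reason the fallback approach (try the cheap call first, inspect the error) is attractive: we never misclassify an object up front.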

op = S3CopyOperator(
    task_id="copy_source_file",
    source_bucket_name="source-bucket",
    source_bucket_key="path/to_large_object",
    dest_bucket_name="destination-bucket",
    dest_bucket_key="save-me-here",
)

This approach means the API doesn't change, i.e., you can simply replace your S3CopyObjectOperator instances with S3CopyOperator instances. Additionally, we only perform the extra work of the multipart copy when the simpler method is insufficient. The trade-off is that we're inefficient if almost every object is larger than 5GB, because we make a "useless" API call first. As usual, it depends. A similar approach has been discussed in this GitHub issue in the Airflow repository.
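If most of your objects do exceed 5GB, the try-and-fall-back pattern can be inverted: check the size first and pick the right call up front. A rough sketch of that alternative, with hypothetical function names (the threshold check is the only part specific to S3's documented limit; the trade-off is one extra HeadObject call per copy):

```python
GB = 1024 ** 3

def needs_multipart_copy(object_size: int) -> bool:
    """CopyObject rejects copy sources larger than 5 GB, so anything
    above that threshold must take the multipart copy path."""
    return object_size > 5 * GB

def copy_object_any_size(s3_client, src_bucket, src_key, dst_bucket, dst_key):
    """Hypothetical helper: branch on object size instead of catching errors."""
    size = s3_client.head_object(Bucket=src_bucket, Key=src_key)["ContentLength"]
    source = {"Bucket": src_bucket, "Key": src_key}
    if needs_multipart_copy(size):
        # Managed multipart copy via s3transfer (UploadPartCopy under the hood).
        s3_client.copy(CopySource=source, Bucket=dst_bucket, Key=dst_key)
    else:
        # Single CopyObject call, fine for anything up to 5 GB.
        s3_client.copy_object(CopySource=source, Bucket=dst_bucket, Key=dst_key)
```

Whether this pays off depends on your object size distribution; for mostly-small objects, the fallback version in the operator above avoids the extra HeadObject round trip.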

— Maurice


Photo by K. Mitch Hodge on Unsplash
