Mohsin Ashraf
Posted on December 14, 2020
We deal with data every day as part of our work in the data science team. It starts by collecting data and analyzing it for potentially important features and baseline numbers. Then we do data preprocessing and cleaning. Finally, we feed the data into a machine learning algorithm for training.
Once the training is complete, we test the model and, if the performance is good, serve it via an API.
In a previous article, we talked about uploading large files using multipart upload via pre-signed URLs. We will take a step further now and discuss how to create a CLI tool for uploading large files to S3 using pre-signed URLs.
The article comprises 3 parts, as described below:
- Create pre-signed URLs for multipart upload
- Upload all parts of the object
- Complete the upload
Request pre-signed URLs for multipart upload
First of all, we have to request the pre-signed URLs from the API for the AWS S3 bucket. It will return a list of pre-signed URLs, one for each of the object’s parts, along with an upload_id, which is associated with the object whose parts are being uploaded. Let’s create the route for requesting pre-signed URLs.
from pathlib import Path

from flask import Flask, Response, request, jsonify
from boto3 import Session

app = Flask(__name__)

# S3MultipartUploadUtil is the S3 utility class defined later in this article
session = Session()
s3util = S3MultipartUploadUtil(session)

@app.route('/presigned', methods=['POST'])
def return_presigned():
    data = request.form.to_dict(flat=False)
    file_name = data['file_name'][0]
    file_size = int(data['file_size'][0])
    target_file = Path(file_name)
    # Maximum size of each part (5 MB); S3 requires every part except the last
    # to be at least 5 MB
    max_size = 5 * 1024 * 1024
    upload_by = int(file_size / max_size) + 1
    bucket_name = "YOUR_BUCKET_NAME"
    key = file_name
    # Start the multipart upload and keep its upload_id
    upload_id = s3util.start(bucket_name, key)
    urls = []
    # Create one pre-signed URL per part
    for part in range(1, upload_by + 1):
        signed_url = s3util.create_presigned_url(part)
        urls.append(signed_url)
    return jsonify({
        'bucket_name': bucket_name,
        'key': key,
        'upload_id': upload_id,
        'file_size': file_size,
        'file_name': file_name,
        'max_size': max_size,
        'upload_by': upload_by,
        'urls': urls
    })
Let’s go through the code. In this Flask route, we get the information sent in the request: file_name and file_size.
The file_name will be used in creating the URLs for the object’s parts, and file_size will be used to work out how many parts, and hence how many pre-signed URLs, to create.
In the route, max_size determines each part’s maximum size. You can change it according to your needs, but keep in mind that S3 requires every part except the last to be at least 5 MB.
upload_by tells how many parts there will be for the object to upload. For example, a 22 MB file with 5 MB parts gives upload_by = 5.
bucket_name is the bucket you want to upload the data to.
upload_id is generated by the utility’s start method, which calls create_multipart_upload on the S3 client; we will discuss it shortly.
After that, the pre-signed URLs are created in the for loop using the create_presigned_url method of the S3 utility. Again, we will come back to it in a bit.
Finally, we return the required data in JSON format.
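To illustrate how this route would be called, here is a minimal client-side sketch that requests the pre-signed URLs and saves the response for the CLI to use later. The API host and the file name are placeholders, not part of the actual implementation:

import json
from pathlib import Path

import requests

# Placeholder file and API host; replace with your own values
file_path = Path('large_dataset.csv')
resp = requests.post(
    'https://YOUR_HOSTED_API/presigned',
    data={'file_name': file_path.name, 'file_size': file_path.stat().st_size})

# Save the returned URLs and metadata so the CLI can read them later
with open('presigned.json', 'w') as fout:
    json.dump(resp.json(), fout)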
Now, let’s talk about the S3 utility class. It encapsulates the multipart upload calls (such as create_multipart_upload) so that the API code stays readable and manageable. Following is the code for the utility class.
import logging

from boto3 import Session

logger = logging.getLogger(__name__)


class S3MultipartUploadUtil:
    """
    AWS S3 Multipart Upload Util
    """

    def __init__(self, session: Session):
        self.session = session
        self.s3 = session.client('s3')
        self.upload_id = None
        self.bucket_name = None
        self.key = None

    def start(self, bucket_name: str, key: str):
        """
        Start Multipart Upload
        :param bucket_name:
        :param key:
        :return: upload_id of the new multipart upload
        """
        self.bucket_name = bucket_name
        self.key = key
        res = self.s3.create_multipart_upload(Bucket=bucket_name, Key=key)
        self.upload_id = res['UploadId']
        logger.debug(f"Start multipart upload '{self.upload_id}'")
        return self.upload_id

    def create_presigned_url(self, part_no: int, expire: int = 3600) -> str:
        """
        Create a pre-signed URL for uploading one part.
        :param part_no:
        :param expire:
        :return: pre-signed URL for the given part number
        """
        signed_url = self.s3.generate_presigned_url(
            ClientMethod='upload_part',
            Params={'Bucket': self.bucket_name,
                    'Key': self.key,
                    'UploadId': self.upload_id,
                    'PartNumber': part_no},
            ExpiresIn=expire)
        logger.debug(f"Create presigned url for upload part '{signed_url}'")
        return signed_url

    def complete(self, parts, upload_id, key, bucket_name):
        """
        Complete Multipart Uploading.
        `parts` is a list of dictionaries like the one below.
        ```
        [ {'ETag': etag, 'PartNumber': 1}, {'ETag': etag, 'PartNumber': 2}, ... ]
        ```
        You can get `ETag` from the upload part response header.
        :param parts: Sent part info.
        :param upload_id:
        :param key:
        :param bucket_name:
        :return:
        """
        res = self.s3.complete_multipart_upload(
            Bucket=bucket_name,
            Key=key,
            MultipartUpload={
                'Parts': parts
            },
            UploadId=upload_id
        )
        logger.debug(f"Complete multipart upload '{upload_id}'")
        logger.debug(res)
        self.upload_id = None
        self.bucket_name = None
        self.key = None
In this class, I wrap the functionality of the S3 client to make it easy to use and less cluttered in the API file.
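As a quick sketch of how the class can be used on its own, assuming your AWS credentials are configured for boto3 (the bucket name and key below are placeholders):

from boto3 import Session

# Placeholder bucket and key; replace with your own values
session = Session()
s3util = S3MultipartUploadUtil(session)
upload_id = s3util.start('YOUR_BUCKET_NAME', 'large_dataset.csv')
first_part_url = s3util.create_presigned_url(part_no=1)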
Once you get the response from the API, it would look something like this:
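For example, with placeholder values (and only the first two URLs shown), the response might look something like this:

{
    "bucket_name": "YOUR_BUCKET_NAME",
    "key": "large_dataset.csv",
    "upload_id": "EXAMPLE_UPLOAD_ID",
    "file_size": 23068672,
    "file_name": "large_dataset.csv",
    "max_size": 5242880,
    "upload_by": 5,
    "urls": [
        "https://YOUR_BUCKET_NAME.s3.amazonaws.com/large_dataset.csv?partNumber=1&uploadId=EXAMPLE_UPLOAD_ID&X-Amz-Signature=EXAMPLE",
        "https://YOUR_BUCKET_NAME.s3.amazonaws.com/large_dataset.csv?partNumber=2&uploadId=EXAMPLE_UPLOAD_ID&X-Amz-Signature=EXAMPLE"
    ]
}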
You would save this response to a JSON file (as in the snippet earlier) and use it to upload the data with the CLI.
Upload all parts of the object
Now let’s turn to the CLI code, which uses this JSON file; we assume it is saved as presigned.json.
import json
from pathlib import Path

import requests
import progressbar


def main():
    # Load the pre-signed URL response saved earlier as presigned.json
    with open('presigned.json') as fin:
        data = json.load(fin)
    upload_by = data['upload_by']
    max_size = data['max_size']
    urls = data['urls']
    target_file = Path(data['file_name'])
    file_size = data['file_size']
    key = data['key']
    upload_id = data['upload_id']
    bucket_name = data['bucket_name']
    bar = progressbar.ProgressBar(maxval=file_size,
                                  widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()])
    json_object = dict()
    parts = []
    file_size_counter = 0
    with target_file.open('rb') as fin:
        bar.start()
        for num, url in enumerate(urls):
            part = num + 1
            # Read at most max_size bytes for this part and PUT it to its pre-signed URL
            file_data = fin.read(max_size)
            file_size_counter += len(file_data)
            res = requests.put(url, data=file_data)
            if res.status_code != 200:
                print(res.status_code)
                print("Error while uploading your data.")
                return None
            bar.update(file_size_counter)
            # S3 returns the part's ETag (wrapped in double quotes) in the response headers
            etag = res.headers['ETag']
            parts.append((etag, part))
        bar.finish()
    json_object['parts'] = [
        {'ETag': etag.strip('"'), 'PartNumber': int(part)} for etag, part in parts]
    json_object['upload_id'] = upload_id
    json_object['key'] = key
    json_object['bucket_name'] = bucket_name
    requests.post('https://YOUR_HOSTED_API/combine', json={'parts': json_object})
    print("Dataset is uploaded successfully")


if __name__ == "__main__":
    main()
The above code loads the JSON file and gets all the required information, including the upload_id and the URLs. I use the progressbar package to show progress while uploading the file. The entire code is pretty much self-explanatory except for the following line:
requests.post('https://YOUR_HOSTED_API/combine', json={'parts': json_object})
To understand this piece of code, we have to look at the final step of completing the upload.
Complete the upload
We have uploaded all parts of the file, but these parts are not yet combined. To combine them, we need to tell S3 that we have finished uploading and that it can now assemble the parts into a single object. The above request calls the route shown below, which completes the multipart upload using the S3 utility class. It provides the information about the file along with the upload_id, which tells S3 which multipart upload these parts belong to.
@app.route("/combine",methods=["POST"])
def combine():
body = request.form
body = body['parts']
session = Session()
s3util = Presigned(session)
parts = body['parts']
id, key, bucket_name = body['upload_id'], body['key'], body['bucket_name']
PARTS = [{"Etag": eval(x), 'PartNumber': int(y)} for x, y in parts]
s3util.complete(PARTS, id, key, bucket_name)
return Response(status_code=200)
This is the minimum code required to create such a CLI tool. You can deploy the API on a server that has the proper AWS roles for interacting with S3, so it can create and return the pre-signed URLs and complete the multipart upload. This way, you can make sure that no one has direct access to your S3 bucket; instead, they upload the data through pre-signed URLs, which is a secure way of uploading data.