Alexis Alvarez
Posted on November 2, 2021
S3 es un servicio excelente que permite almacenar y recuperar cantidades ilimitadas de información desde cualquier parte del mundo, es una de las alternativas más atractivas si no es la mejor cuando se trata de lagos de datos y es que S3 puede almacenar cualquier tipo de archivo, tiene muchas capacidades como las de encriptación hasta seguridad para accesos en todos los niveles, puede almacenar logs hasta inclusive websites estáticas.
La unidad de almacenamiento son denominados objetos, cada objeto puede pesar hasta 5 TB.
AWS S3 almacena los objetos en un contenedor al cual se le denomina “Bucket”, el cual puede almacenar de forma ilimitada un numero de objetos. Los nombres de los buckets son únicos de forma global, es decir no hay otra cuenta de AWS que pueda tener el mismo nombre de un bucket.
Para este ejemplo usaremos el AWS SDK para Python(Boto3), algunas API Calls que tenemos son las siguientes: put_object, copy, copy_object, upload_file, upload_fileobj.
Dicho lo anterior, algunas formas para subir archivos en S3 son:
1. Subir un solo archivo usando la consola de AWS S3, desde la consola de administración es posible subir un objeto hasta los 160 GB.
2. Cargar un objeto en una sola operación usando AWS CLI: entre las operaciones diponibles estan:
- cp -Copiar
- mv-Mover
- sync-Sincronizar
aws s3 cp "video2.mp4" "s3://bucket-test-uploads"
3. Cargar un objeto en una sola operación usando los AWS SDK, REST API: con una operación PUT se puede subir un objeto hasta 5GB. En esta función abrimos el archivo en memoria e invocamos la función put_object al cual le pasamos los datos del archivo, el nombre y el bucket de destino, finalmente imprimimos la respuesta o en caso de haber una excepción capturarla.
def put_object(self, s3_client):
try:
with open(self.file, 'rb') as fd:
response = s3_client.put_object(
Bucket=self.bucket,
Key=self.key,
Body=fd
)
print(json.dumps(response, sort_keys=True, indent=4))
print("Put Object exitoso")
return True
except FileNotFoundError:
print("Archivo no encontrado")
return False
except Exception as e:
print(str(e))
return False
4. Cargar un objeto en partes usando los AWS SDK, REST API o AWS CLI con la API multipart upload, es posible subir archivos hasta 5TB .
Hemos creado la función multipart_upload, donde iniciamos el proceso de carga y obteniendo el UploadId
def multipart_upload(file, key, bucket, chunk_size, processes):
upload_id = start_upload(bucket, key)
print(f'Starting upload: {upload_id}')
Abrimos el archivo en modo lectura en memoria, y definimos una cola para paralelizar procesos, cada parte debe estar entre 5MB y 100Mb
file_upload = open(file, 'rb')
part_procs = []
proc_queue = multiprocessing.Queue()
queue_returns = []
chunk_size = (chunk_size * 1024) * 1024
part_num = 1
chunk = file_upload.read(chunk_size)
Iteramos cada 5MB y los vamos metiendo a la cola, en esta sección añadimos el método add_part, el cual es el encargado de subir una parte del archivo al S3.
while len(chunk) > 0:
proc = multiprocessing.Process(target=add_part, args=(
proc_queue, chunk, bucket, key, part_num, upload_id))
part_procs.append(proc)
part_num += 1
chunk = file_upload.read(chunk_size)
Realiza agrupación de procesamiento según número de procesos simultáneos, por ejemplo, un archivo de 421MB, tendrá 85 partes de 5MB aproximadamente. Como resultado se tendrá 43 ejecuciones con 2 procesos simultáneos, en la última ejecución parte tendré 1 parte
part_procs = [part_procs[i * processes:(i + 1) * processes]
for i in range((len(part_procs) + (processes - 1)) // processes)]
Ejecutamos cada “n” procesos en simultaneo y guardamos los resultados. Ordenamos la lista basada en los 'PartNumber' los cuales son los resultados de carga por cada parte, para finalmente completar la carga con el método end_upload
for i in range(len(part_procs)):
for p in part_procs[i]:
p.start()
for p in part_procs[i]:
p.join()
for p in part_procs[i]:
queue_returns.append(proc_queue.get())
queue_returns = sorted(queue_returns, key=lambda i: i['PartNumber'])
response = end_upload(
bucket, key, upload_id, queue_returns)
print(json.dumps(response, sort_keys=True, indent=4))
Si todo cargo bien veremos lo siguientes resultados:
5. Bonus: Cargar un objeto en partes usando upload_file este método es manejado por S3TransferManager.
Esto significa que automáticamente manejará las cargas multiparte por nosotros, si es necesario. A diferencia del método put_object realiza una llamada directamente a la low-level API de S3. No maneja cargas multiparte por nosotros. Intentará enviar todo en una sola solicitud
try:
s3_client.upload_file(self.file, self.bucket, self.key)
print("Subida exitosa")
return True
except FileNotFoundError:
print("Archivo no encontrado")
return False
except Exception as e:
print(str(e))
return False
Un detalle importe, como buena práctica estamos usando una variable del perfil el ambiente que tengamos a AWS:
session = boto3.Session(profile_name='aadev')
Un ejempplo para la invocacion seria el siguiente
if __name__ == '__main__':
s3_client = session.client('s3')
obj = S3Uploader(r'ruta\video2.mp4',
"video2.mp4", "aa-dev-test123", 5, 3)
#Carga de archivos multiparte
multipart_upload(obj.file, obj.key, obj.bucket,
obj.chunk_size, obj.processes)
#Carga de archivos con upload_file
obj.upload_to_aws(s3_client)
print(obj)
Y eso es todo por ahora, he querido compartir algunas de las formas con las que se pueden subir archivos en s3 y a esto se nos abre una gamma de posibles casos de usos, el codigo es mejorable. Hasta la proxima!
Si deseas ver más contenido puedes verlo en mi sitio: bigdateros.com
La url del repositorio se encuentra en: https://github.com/bigdateros/AWS-S3
Posted on November 2, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.