Limpiar y preparar grandes cantidades de datos de manera fácil con Glue DataBrew
camilo cabrales
Posted on March 27, 2023
Usualmente cuando queremos realizar un proceso de ETL (Extraer, Transformar, Cargar) utilizamos algunas herramientas o lenguajes de programación nos ayudan en este proceso. El desarrollo de ETL requiere de ciertos conocimientos técnicos, que nos tomaran un tiempo en adquirir. Para simplificar este proceso AWS tiene un servicio llamado Glue DataBrew que nos va a simplificar y agilizar la tarea de realizar las ETL.
En este post vamos a crear la siguiente arquitectura:
EL objetivo de esta arquitectura es: Cuando se cargue un archivo al Bucket este llame a una función Lambda que a su vez va a ejecutar una Step Fucntion que va realizar el procesamiento que hallamos definido para nuestro data set y enviara un mensaje de texto si se ejecuto correctamente o si hubo un error durante el procesamiento de la información.
Iniciemos creando el Bucket que va almacenar nuestro conjunto de datos y el resultado del proceso de ETL. Para esto buscamos el servicio de S3 y damos click en el botón Create Bucket, una vez en la pagina de creación del Bucket le damos un nombre (el nombre debe ser úrico en todo AWS), dejamos las opciones por defecto y damos click en el botón en Create Bucket que se encuentra en la parte inferior de la pantalla.
Vamos a crear dos carpetas en nuestro Bucket una para los archivos que vamos a procesar y otra de resultados. Damos click en el botón Create Folder.
La carpeta que contiene el archivo a procesar le damos el nombre de procesardata y la que almacena los resultados le damos el nombre de resultados.
Creamos la carpeta procesardata y realizamos el mismo procedimiento para la carpeta de resultados.
Creadas las carpetas deberíamos verlas en nuestro Bucket.
Ahora necesitamos subir a la carpeta de procesardata el archivo con el cual vamos a trabajar damos click en la carpeta y damos click en el botón Upload.
Seleccionamos el archivo y damos click en el botón Upload.
Ya que tenemos el archivo con el cual vamos a trabajar, buscamos el servicio Glue DataBrew una vez en la pagina principal damos click en DataSets.
En la siguiente pantalla seleccionamos el Bucket, el folder procesardata y el archivo precios.csv.
Para terminar con la creación del DataSet seleccionamos el tipo de archivo el cual estamos utilizando (.CSV separado por comas) e indicamos que la primera fila del archivo es la cabecera y damos click en el botón Create dataset.
Creado nuestro DataSet el siguiente paso es crear un proyecto en el cual vamos a realizar las modificaciones de nuestros datos. para esto damos click en Projects y luego en el botón Create project.
Para crear el proyecto necesitamos la siguiente información:
Nombre del proyecto
Receta o nombre de la receta si vamos a crear una nueva. La receta son las modificaciones que vamos a realizar al conjunto de datos.
Debemos seleccionar el Dataset con el cual vamos a trabajar.
Debemos utilizar un rol existente o crear uno nuevo definiendo un prefijo para este.
Creado nuestro proyectos veremos una pagina como la siguiente.
- En la parte superior vamos a ver las opciones de uso frecuente de la receta que podemos aplicar a nuestros datos.
- Abajo de la cabecera de nuestros datos vamos a poder observar las estadísticas de cada columna, lo que nos ayuda a tener una idea del comportamiento de los datos que están almacenados dentro de cada una de ellas.
- En la parte derecha de la pantalla tenemos la sección de la receta, donde vamos a poder agregar diferentes tipos de modificaciones a nuestro conjunto de datos.
Ya que esta listo nuestro proyecto para trabajar vamos a crear nuestra receta, para esto damos click en el botón Add step que se encuentra en la sección de la receta.
Una vez damos click en el botón Add step se despliegan las diferentes opciones que podemos utilizar para modificar los datos. Vamos a seleccionar la opción Change to capital case de la sección de FORMAT.
Como la columna PRODUCTO tiene los nombres en mayusculas los vamos a cambiar para que cada palabra empieza por una letra en mayúscula.
Al dar click en el botón Apply veremos los resultados.
Vemos que las columnas de los días tienen los precios de cada producto, sin embargo si quisiéramos hacer alguna operación matemática sobre ellos no será posible debido a que tienen el signo $. Veamos como podemos eliminar el signo $ de las columnas.
Vamos al menú superior seleccionamos Clean y en la sección Remove seleccionamos Special characters.
Al realizar la selección en sección de Recipies se nos despliegan las opciones para realizar la limpieza de la columna. Seleccionamos la columna LUNES,Special characters y Custom value para remover el signo - que aparece delante de los ceros.
Al dar click en el botón Apply veremos que de la columna LUNES se quitaron los caracteres: - $ y se agrego un nuevo paso a nuestra receta. Este proceso lo puedes realizar para el resto de columnas de los días de la semana.
Al finalizar de remover los caracteres especiales de las columnas de los días de la semana, nuestra receta se vera de la siguiente forma.
Ahora que nuestras columnas no tienen caracteres especiales vamos a cambiar el tipo de dato para poder realizar algunas operaciones. En la parte superior de cada columna podemos ver el tipo de datos de cada columna, en las columnas de los días de la semana identificamos que son de tipo string y necesitamos cambiarlas a un valor numérico en este caso double.
Al dar click en Change type se nos presentan las opciones donde debemos seleccionar la columna y el tipo de dato al cual queremos cambiar.
Damos click en el botón Apply dando como resultado el cambio de tipo de columna y un nuevo paso se agrego a nuestra receta.
Debemos hacer este mismo procedimiento a las otras columnas, para que nuestra receta quede de la siguiente forma.
Para terminar con nuestra receta vamos a crear un nueva columna llamada Total que va almacenar la suma de los días LUNES Y VIERNES.
Hacemos click en botón FUNCTIONS - Math functions - ADD.
En la sección de la receta veremos las opciones donde seleccionamos el tipo de función (ADD), las columnas que vamos a sumar (LUNES Y VIERNES) y por ultimo damos el nombre de la columna que va almacenar el resultado (Total).
Al dar click en el botón Apply vemos que se creo la columna total y contiene la suma de las columnas LUNES Y VIERNES.
Con esto finalizamos la modificación de nuestro DataSet, ahora necesitamos definir como se van a ejecutar estas modificaciones por lo tanto necesitamos crear un Job.
Damos click en el botón Create job.
En la pantalla de creación del Job le asignamos un nombre al Job, seleccionamos el Bucket y la carpeta en la cual queremos que se almacenen los resultados (Bucket y carpeta que creamos al inicio) y le asignamos un rol que puede ser el que creamos al momento de configurar el proyecto o uno nuevo.
Al crearse el Job vamos a ver el listado con los Jobs que hayamos creado, seleccionamos el Job que acabamos de crear y damos click en el botón Run Job.
Para ver los resultados podemos ir a la carpeta resultados del Bucket que creamos y ver el archivo que genero el Job.
Hemos visto como crear, modificar un DataSet y obtener los resultados por medio de las recetas y los Jobs.
Ahora necesitamos implementar los siguientes servicios para completar nuestra arquitectura:
Un Topic y una Suscripción de SNS para enviar los mensajes de notificación.
Una Step Function que va a realizar todo el flujo para procesar el Dataset, modificando el Dataset para que tome el archivo que se suba a S3, ejecutar el Job y enviar una mensaje de texto para notificar que el proceso se realizo correctamente.
Una función Lambda que se va encargar de llamar a la Step Function cuando se suba un archivo al Bucket de S3.
Esta implementación la vamos a realizar con la siguiente plantilla de Terraform.
provider "aws" {
region = "us-east-1"
}
resource "random_id" "random" {
byte_length = 2
}
variable "job_name"{
type= string
default= "PreciosJob"
}
variable "bucket_name" {
type = string
default = "datosgluedatabrew"
}
variable "telefono"{
type = string
default = "+573005465765"
}
resource "aws_lambda_function" "lambda_function_call_step_function" {
filename = "lambda_function.py.zip"
function_name = "LambdaStepFunction-${random_id.random.hex}"
role = aws_iam_role.lambda_execution_role.arn
handler = "lambda_function.lambda_handler"
runtime = "python3.8"
memory_size = 128
timeout = 5
environment {
variables = {
STEPFUNCTIONARN = aws_sfn_state_machine.function.arn
}
}
}
resource "aws_iam_role" "lambda_execution_role" {
name = "lambda_execution_role-${random_id.random.hex}"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_execution_role_policy_attach" {
policy_arn = "arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess"
role = aws_iam_role.lambda_execution_role.name
}
resource "aws_iam_role_policy_attachment" "lambda_execution_role_cloudwatch_attach" {
policy_arn = "arn:aws:iam::aws:policy/CloudWatchLogsFullAccess"
role = aws_iam_role.lambda_execution_role.name
}
resource "aws_lambda_permission" "s3_permission_to_trigger_lambda" {
statement_id = "AllowExecutionFromS3Bucket"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.lambda_function_call_step_function.arn
principal = "s3.amazonaws.com"
source_arn = "arn:aws:s3:::${var.bucket_name}"
}
resource "aws_s3_bucket_notification" "bucket_notification_put" {
bucket = "${var.bucket_name}"
lambda_function {
lambda_function_arn = aws_lambda_function.lambda_function_call_step_function.arn
events = ["s3:ObjectCreated:Put"]
filter_prefix = "procesardata/"
filter_suffix = ".csv"
}
depends_on = [aws_lambda_permission.s3_permission_to_trigger_lambda]
}
resource "aws_sfn_state_machine" "function" {
name = "StateMachineGlueDataBrew-${random_id.random.hex}"
role_arn = aws_iam_role.function_role.arn
definition = <<DEFINITION
{
"Comment": "A description of my state machine",
"StartAt": "Actualizar DataSet",
"States": {
"Actualizar DataSet": {
"Type": "Task",
"Parameters": {
"Format": "CSV",
"Input": {
"S3InputDefinition": {
"Bucket.$": "$.bucket",
"BucketOwner.$": "$.cuenta",
"Key.$": "$.carpeta"
}
},
"Name": "Precios"
},
"Resource": "arn:aws:states:::aws-sdk:databrew:updateDataset",
"Next": "Ejecutar Job"
},
"Ejecutar Job": {
"Type": "Task",
"Resource": "arn:aws:states:::databrew:startJobRun",
"Parameters": {
"Name": "${var.job_name}"
},
"Catch": [
{
"ErrorEquals": [
"DataBrew.ConflictException"
],
"Next": "SNS Error"
}
],
"Next": "SNS Job Ejecutado"
},
"SNS Error": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"Message": {
"Mensaje": "Se produjo un error"
},
"TopicArn": "${aws_sns_topic.topic_glue_databrew.arn}"
},
"End": true
},
"SNS Job Ejecutado": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "${aws_sns_topic.topic_glue_databrew.arn}",
"Message": {
"Mensaje": "Se ejecuto el job correctamente"
}
},
"End": true
}
}
}
DEFINITION
}
resource "aws_iam_role" "function_role" {
name = "my-state-machine-role-${random_id.random.hex}"
assume_role_policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Action = "sts:AssumeRole"
Effect = "Allow"
Principal = {
Service = "states.amazonaws.com"
}
}
]
})
}
resource "aws_iam_role_policy_attachment" "function_policy_role_sns" {
policy_arn = "arn:aws:iam::aws:policy/AmazonSNSFullAccess"
role = aws_iam_role.function_role.name
}
resource "aws_iam_role_policy_attachment" "lambda_policy_glue_data_brew" {
policy_arn = "arn:aws:iam::aws:policy/AwsGlueDataBrewFullAccessPolicy"
role = aws_iam_role.function_role.name
}
resource "aws_sns_topic" "topic_glue_databrew" {
name = "topic_glue_databrew-${random_id.random.hex}"
}
resource "aws_sns_topic_subscription" "subscription_glue_databrew" {
topic_arn = aws_sns_topic.topic_glue_databrew.arn
protocol = "sms"
endpoint = "${var.telefono}"
}
De la plantilla anterior debemos modificar las variables y la región en la cual se van desplegar los servicios.
La región.
provider "aws" {
region = "us-east-1"
}
El nombre que le dimos la Job.
variable "job_name"{
type= string
default= "PreciosJob"
}
El nombre que le dimos al Bucket.
variable "bucket_name" {
type = string
default = "datosgluedatabrew"
}
El numero de teléfono al cual queremos que lleguen los mensajes cuando se ejecuta el Job.
variable "telefono"{
type = string
default = "+573005465765"
}
Una vez que hemos modificado las variables de acuerdo a nuestras configuraciones podemos validar y ejecutar la plantilla.
- Valida la plantilla
terraform validate
- Crea los servicios. Recuerda que la plantilla y el código de la función Lambda se encuentren en la misma carpeta.
terraform apply
Una vez terminada la ejecución del comando, vamos a revisar que los servicios se hayan desplegado correctamente.
Buscamos SNS y damos click en Topics.
Buscamos la función Lambda.
Buscamos la Step Function.
Como todos los servicios fueron desplegados con éxito vamos a subir un nuevo archivo al Bucket para que se procese el archivo con la llamada a la función Lambda y la Step Function, cuando esta termina su proceso damos click en el nombre de la Step Function y damos click en la ultima ejecución para ver el gráfico del proceso.
Con esto finalizamos la implementación y pruebas de la arquitectura planteada al inicio.Pueden realizar mas pruebas modificando la receta o agregando mas pasos a la Step Funcion.
Para no incurrir en ningún costo pueden borrar el proyecto de Glue DataBrew, el Bucket y para eliminar los servicios creados con Terraform pueden utilizar el siguiente comando.
terraform destroy
Me pueden encontrar en
Referencias
Posted on March 27, 2023
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.