Qué es y como crear ETL en AWS Glue Parte 3

davidshaek

David💻

Posted on April 1, 2022

Qué es y como crear ETL en AWS Glue Parte 3

Continuando con la tercera y final parte del tutorial, explicare el código de nuestro ejemplo.

Escribiendo el código

Empezaremos por incluir las siguienteslibrerias de Glue.

  • JsonOptions nos permitirá especificar los paths dónde queremos que se cree nuestro archivo final.
  • DynamicFrame nos permitirá crear un frame de datos de tipo spark
  • GlueContext nos permitirá ejecutar nuestro job ETL en el entorno serverless
  • GlueArgParser nos permitirá leer las variables de sysArgs que enviemos a nuestro job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import com.amazonaws.services.glue.util.GlueArgParser
Enter fullscreen mode Exit fullscreen mode

Definiremos nuestra GlueApp y nuestro main donde se realizará la primera ejecución de nuestro código

object GlueApp {
  def main(sysArgs: Array[String]) {
  ...
  }
}
Enter fullscreen mode Exit fullscreen mode

Ahora declararemos nuestras variables

GlueContext es el punto de entrada para leer y escribir un DynamicFrame de un bucket de Amazon S3, un catálogo de datos de AWS, JDBC, etc.

Esta clase ofrece funciones de utilidades para crear objetos Característica DataSource y DataSink que, a su vez, se pueden usar para leer y escribir objetos DynamicFrame.

val glueContext: GlueContext = new GlueContext(sc)
Enter fullscreen mode Exit fullscreen mode

Punto de entrada principal para la funcionalidad Spark. Un SparkContext representa la conexión a un clúster Spark y se puede usar para crear RDD, acumuladores y variables de difusión en ese clúster.

val sc: SparkContext = new SparkContext()
Enter fullscreen mode Exit fullscreen mode

Genera un identificador de la sesión Spark

val spark = glueContext.getSparkSession
Enter fullscreen mode Exit fullscreen mode
object GlueApp {
 val glueContext: GlueContext = new GlueContext(sc)
 val sc: SparkContext = new SparkContext()
 val spark = glueContext.getSparkSession
  def main(sysArgs: Array[String]) {
  ...
  }
}
Enter fullscreen mode Exit fullscreen mode

Si deseamos leer una variable de ambiente que hemos enviado en los job parameters lo podemos leer de la siguiente forma:

/* Lee el valor del job parameter enviado ejemplo: --env (key) ci (value) 
el valor lo leerá como ci */
val args = GlueArgParser.getResolvedOptions(sysArgs, Array("env"))
// ejemplo
// val table = s"${args("env")}_transactions" se traduce como ci_transactions
Enter fullscreen mode Exit fullscreen mode

Dentro de nuestro main comenzaremos por declarar nuestra base de datos junto a nuestras tablas, esto nos permitirá transformar o ejecutar consultas.

// Catálogo de datos: bases de datos y tablas
val dbName = s"db-kushki-ejemplo"
val tblCsv = s"transacciones" //El nombre de la tabla con la ubicación del S3
val tblDynamo = s"transactions" //El nombre de la tabla con la ubicación de dynamo
Enter fullscreen mode Exit fullscreen mode

Ahora declararemos nuestro output directory (la carpeta final donde se guardará nuestro archivo generado)

// Directorio final donde se guardará nuestro archivo dentro de un bucket S3
val baseOutputDir = s"s3://${args("env")}-trx-ejemplo/"
val transactionDir= s"$baseOutputDir/transaction/"
Enter fullscreen mode Exit fullscreen mode

Una de las principales abstracciones de Apache Spark es el DataFrame de SparkSQL, que es similar a la construcción DataFrame que se encuentra en R y en Pandas. Un elemento DataFrame es similar a una tabla y admite operaciones de estilo funcional (map/reduce/filter/etc.) y operaciones SQL (select, project, aggregate). En este caso de nuestro cátalogo de datos crearemos un DynamicFrame de cada tabla

// Read data into a dynamic frame
val trx_dyn: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblDYNAMO ).getDynamicFrame()
val trx_csv: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblCSV ).getDynamicFrame()
Enter fullscreen mode Exit fullscreen mode

ApplyMapping vs ResolveChoice

  • ApplyMapping: Aplica un mapeo declarativo a un DynamicFrame especificado.
  • ResolveChoice: Proporciona información para resolver tipos ambiguos dentro de un elemento DynamicFrame.
ApplyMapping ResolveChoice
Tipos de datos Es incompatible si un tipo de dato es ambiguo Se define un solo tipo de dato
Mapping El Dataframe devuelve solo lo que se mapea Devuelve todos los campos incluyendo al campo que se le realizó el casting
// ApplyMapping
val trx_dyn_mapping=  trx_dyn.applyMapping(mappings = Seq(("id", "string", "id", "string"),("cliente", "string", "cliente", "string"),("estado", "string", "estado", "string"),("monto", "bigint", "monto", "double")), caseSensitive = false, transformationContext = "trx_dyn_mapping")
// ResolveChoice
val trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))
Enter fullscreen mode Exit fullscreen mode

En nuestro ejemplo es necesario resolver el problema de los tipos de datos ambiguos debido a que en nuestro archivo csv se presenta datos de tipo bigint, y en nuestra tabla de dynamo se presenta datos de tipo Number, ambos tipos de datos deben ser del mismo tipo por lo que se necesita aplicar resolveChoice , en este caso applyMapping nos devolverá un problema debido a que la columna monto devolverá un struct de los diferentes tipos de dato.

val trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))
val trx_csv_resolve= trx_csv.resolveChoice(specs = Seq(("monto", "cast:double")))
Enter fullscreen mode Exit fullscreen mode

En la siguiente sección de código procederemos a crear nuestra pseudo-tabla donde ejecutaremos sentencias SQL es importante darle un nombre simple pero distintivo

// Spark SQL on a Spark dataframe
val dynDf = trx_dyn_resolve.toDF()
dynDf.createOrReplaceTempView("dynamoTable")
val csvDf = trx_csv_resolve.toDF()
csvDf.createOrReplaceTempView("csvTable")
Enter fullscreen mode Exit fullscreen mode

A continuación realizaremos nuestra sentencia SQL con cualquier lógica de negocio que necesitemos, para nuestro ejemplo realizaremos una sentencia simple en la cual obtenga todos los registros que hagan un match

// SQL Query
val dynSqlDf = spark.sql("SELECT T1.id,T1.monto,T1.cliente,T1.estado FROM dynamoTable T1 LEFT JOIN csvTable T2 ON (T1.id=T2.id) WHERE T2.idIS NOT NULL AND (T1.monto=T2.monto AND T1.cliente=T2.cliente AND T1.estado = T2.estado)")
Enter fullscreen mode Exit fullscreen mode

El runtime por detrás de AWS Glue ejecuta un proceso de ApacheSpark por lo que los DynamicFrame que retornemos se crearán en multi partes, por lo que utilizaremos coalesce(1) para juntarlos en uno solo, sin embargo esto puede ocasionar errores en grandes cantidades de datos retornados.

//Compact al run-part files into one
val dynFile = DynamicFrame(dynSqlDf, glueContext).withName("dyn_dyf").coalesce(1)
Enter fullscreen mode Exit fullscreen mode

Finalmente procederemos a guardar nuestro resultado especificando un path en un bucket de S3

// Save file into S3
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> transactionDir)), format = "csv").writeDynamicFrame(dynFile)
Enter fullscreen mode Exit fullscreen mode

Source Code
El script completo lo puedes encontrar aquí:
Github

Espero este tutorial te haya sido de ayuda !

💖 💪 🙅 🚩
davidshaek
David💻

Posted on April 1, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related