David💻
Posted on April 1, 2022
Continuando con la tercera y final parte del tutorial, explicare el código de nuestro ejemplo.
Escribiendo el código
Empezaremos por incluir las siguienteslibrerias de Glue.
- JsonOptions nos permitirá especificar los paths dónde queremos que se cree nuestro archivo final.
- DynamicFrame nos permitirá crear un frame de datos de tipo spark
- GlueContext nos permitirá ejecutar nuestro job ETL en el entorno serverless
- GlueArgParser nos permitirá leer las variables de sysArgs que enviemos a nuestro job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import com.amazonaws.services.glue.util.GlueArgParser
Definiremos nuestra GlueApp y nuestro main donde se realizará la primera ejecución de nuestro código
object GlueApp {
def main(sysArgs: Array[String]) {
...
}
}
Ahora declararemos nuestras variables
GlueContext es el punto de entrada para leer y escribir un DynamicFrame de un bucket de Amazon S3, un catálogo de datos de AWS, JDBC, etc.
Esta clase ofrece funciones de utilidades para crear objetos Característica DataSource y DataSink que, a su vez, se pueden usar para leer y escribir objetos DynamicFrame.
val glueContext: GlueContext = new GlueContext(sc)
Punto de entrada principal para la funcionalidad Spark. Un SparkContext representa la conexión a un clúster Spark y se puede usar para crear RDD, acumuladores y variables de difusión en ese clúster.
val sc: SparkContext = new SparkContext()
Genera un identificador de la sesión Spark
val spark = glueContext.getSparkSession
object GlueApp {
val glueContext: GlueContext = new GlueContext(sc)
val sc: SparkContext = new SparkContext()
val spark = glueContext.getSparkSession
def main(sysArgs: Array[String]) {
...
}
}
Si deseamos leer una variable de ambiente que hemos enviado en los job parameters lo podemos leer de la siguiente forma:
/* Lee el valor del job parameter enviado ejemplo: --env (key) ci (value)
el valor lo leerá como ci */
val args = GlueArgParser.getResolvedOptions(sysArgs, Array("env"))
// ejemplo
// val table = s"${args("env")}_transactions" se traduce como ci_transactions
Dentro de nuestro main comenzaremos por declarar nuestra base de datos junto a nuestras tablas, esto nos permitirá transformar o ejecutar consultas.
// Catálogo de datos: bases de datos y tablas
val dbName = s"db-kushki-ejemplo"
val tblCsv = s"transacciones" //El nombre de la tabla con la ubicación del S3
val tblDynamo = s"transactions" //El nombre de la tabla con la ubicación de dynamo
Ahora declararemos nuestro output directory (la carpeta final donde se guardará nuestro archivo generado)
// Directorio final donde se guardará nuestro archivo dentro de un bucket S3
val baseOutputDir = s"s3://${args("env")}-trx-ejemplo/"
val transactionDir= s"$baseOutputDir/transaction/"
Una de las principales abstracciones de Apache Spark es el DataFrame de SparkSQL, que es similar a la construcción DataFrame que se encuentra en R y en Pandas. Un elemento DataFrame es similar a una tabla y admite operaciones de estilo funcional (map/reduce/filter/etc.) y operaciones SQL (select, project, aggregate). En este caso de nuestro cátalogo de datos crearemos un DynamicFrame de cada tabla
// Read data into a dynamic frame
val trx_dyn: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblDYNAMO ).getDynamicFrame()
val trx_csv: DynamicFrame = glueContext.getCatalogSource(database = dbName, tableName = tblCSV ).getDynamicFrame()
ApplyMapping vs ResolveChoice
- ApplyMapping: Aplica un mapeo declarativo a un DynamicFrame especificado.
- ResolveChoice: Proporciona información para resolver tipos ambiguos dentro de un elemento DynamicFrame.
ApplyMapping | ResolveChoice | |
---|---|---|
Tipos de datos | Es incompatible si un tipo de dato es ambiguo | Se define un solo tipo de dato |
Mapping | El Dataframe devuelve solo lo que se mapea | Devuelve todos los campos incluyendo al campo que se le realizó el casting |
// ApplyMapping
val trx_dyn_mapping= trx_dyn.applyMapping(mappings = Seq(("id", "string", "id", "string"),("cliente", "string", "cliente", "string"),("estado", "string", "estado", "string"),("monto", "bigint", "monto", "double")), caseSensitive = false, transformationContext = "trx_dyn_mapping")
// ResolveChoice
val trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))
En nuestro ejemplo es necesario resolver el problema de los tipos de datos ambiguos debido a que en nuestro archivo csv se presenta datos de tipo bigint, y en nuestra tabla de dynamo se presenta datos de tipo Number, ambos tipos de datos deben ser del mismo tipo por lo que se necesita aplicar resolveChoice , en este caso applyMapping nos devolverá un problema debido a que la columna monto devolverá un struct de los diferentes tipos de dato.
val trx_dyn_resolve= trx_dyn.resolveChoice(specs = Seq(("monto", "cast:double")))
val trx_csv_resolve= trx_csv.resolveChoice(specs = Seq(("monto", "cast:double")))
En la siguiente sección de código procederemos a crear nuestra pseudo-tabla donde ejecutaremos sentencias SQL es importante darle un nombre simple pero distintivo
// Spark SQL on a Spark dataframe
val dynDf = trx_dyn_resolve.toDF()
dynDf.createOrReplaceTempView("dynamoTable")
val csvDf = trx_csv_resolve.toDF()
csvDf.createOrReplaceTempView("csvTable")
A continuación realizaremos nuestra sentencia SQL con cualquier lógica de negocio que necesitemos, para nuestro ejemplo realizaremos una sentencia simple en la cual obtenga todos los registros que hagan un match
// SQL Query
val dynSqlDf = spark.sql("SELECT T1.id,T1.monto,T1.cliente,T1.estado FROM dynamoTable T1 LEFT JOIN csvTable T2 ON (T1.id=T2.id) WHERE T2.idIS NOT NULL AND (T1.monto=T2.monto AND T1.cliente=T2.cliente AND T1.estado = T2.estado)")
El runtime por detrás de AWS Glue ejecuta un proceso de ApacheSpark por lo que los DynamicFrame que retornemos se crearán en multi partes, por lo que utilizaremos coalesce(1) para juntarlos en uno solo, sin embargo esto puede ocasionar errores en grandes cantidades de datos retornados.
//Compact al run-part files into one
val dynFile = DynamicFrame(dynSqlDf, glueContext).withName("dyn_dyf").coalesce(1)
Finalmente procederemos a guardar nuestro resultado especificando un path en un bucket de S3
// Save file into S3
glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> transactionDir)), format = "csv").writeDynamicFrame(dynFile)
Source Code
El script completo lo puedes encontrar aquí:
Github
Espero este tutorial te haya sido de ayuda !
Posted on April 1, 2022
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.