aws

Crea tablas para reportes con DynamoDB & DynamoDB Streams

cecamilo

camilo cabrales

Posted on October 18, 2022

Crea tablas para reportes con DynamoDB & DynamoDB Streams

Generalmente usamos DynamoDB para soluciones que necesitan un tiempo de respuesta muy bajo y no para BI o analítica de datos debido a su naturaleza.
En este post vamos a utilizar DynamoDB Streams con el objetivo de filtrar los datos y crear una tabla con información especifica para que sea consultada por una herramienta de visualización de datos cómo puede ser QuickSight.

Vamos a hacer uso de los siguientes servicios en la implementación de la solución.

  • DynamoDB
  • Lambda
  • Cloud9

Suponiendo que tuviéramos almacenada la información de los partidos de futbol de todas las ligas de futbol del mundo desde 1970, vamos a manejar dos tablas una para los resultados de los partidos de futbol (tabla principal) y en otra tabla (consulta) vamos a almacenar y actualizar las tablas de posiciones para posteriormente hacer nuestras visualizaciones.

Iniciemos por crear el ambiente en Cloud9 en donde vamos ejecutar nuestros scripts para poblar las tablas de DynamoDB.

Buscamos el servicio Cloud9 y damos click en crear entorno.

Create Channel

Vamos a ver las opciones de configuración de la maquina EC2 qué queremos utilizar para nuestro entorno. Que en este caso vamos a dejar las opciones por defecto. Damos click en el botón siguiente.

Configurar entorno

Vemos la configuración que definimos para nuestro entorno y damos click en crear.

Crear entorno

Veremos una pantalla de espera cómo la siguiente mientras se crea nuestro entorno de trabajo.

Espera entorno

Una vez nuestro entorno esta listo, debemos crear el archivo .py (python) que contiene el código fuente de nuestra tabla de DynamoDB. Damos click en crear archivo.

Crear archivo

Debemos copiar el siguiente código en el nuevo archivo y guardarlo con extension .py, en este caso le vamos a dar el nombre de creartabla.py

import boto3
from boto3.dynamodb.conditions import Key


dynamodb = boto3.resource("dynamodb")

dynamodb.create_table(
        TableName='ResultadosPartidos',
        KeySchema=[
            {
                'AttributeName': 'PK',
                'KeyType': 'HASH'  # Partition key
            },
            {
                'AttributeName': 'SK',
                'KeyType': 'RANGE'  # Sort key
            }
        ],
        #Se deben crear las llaves nuevamente con su tipo de dato
        AttributeDefinitions=[
            {
                'AttributeName': 'PK',
                'AttributeType': 'S'  # Se utiliza S para tipos de datos String y N para numeros
            },
            {
                'AttributeName': 'SK',
                'AttributeType': 'S'
            }

        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )

dynamodb.create_table(
        TableName='TablaPosiciones',
        KeySchema=[
            {
                'AttributeName': 'Liga',
                'KeyType': 'HASH'  # Partition key
            },
            {
                'AttributeName': 'Equipo',
                'KeyType': 'RANGE'  # Sort key
            }
        ],
        #Se deben crear las llaves nuevamente con su tipo de dato
        AttributeDefinitions=[
            {
                'AttributeName': 'Liga',
                'AttributeType': 'S'  # Se utiliza S para tipos de datos String y N para numeros
            },
            {
                'AttributeName': 'Equipo',
                'AttributeType': 'S'
            }

        ],
        ProvisionedThroughput={
            'ReadCapacityUnits': 5,
            'WriteCapacityUnits': 5
        }
    )
Enter fullscreen mode Exit fullscreen mode

Crear archivo

Una vez creado el archivo debemos instalar boto3 ejecutando en la terminal el siguiente comando:

pip install boto3

Enter fullscreen mode Exit fullscreen mode

Instalado boto3 debemos ejecutar nuestro archivo, ejecutando en la terminal el siguiente comando:

python creartabla.py

Enter fullscreen mode Exit fullscreen mode

Ejecutar archivo

Una vez ejecutado nuestro script vamos a buscar el servicio de DynamoDB y ver las tablas que acabamos de crear.

Tabla DynamoDB

Ahora vamos a crear la función Lambda que va a filtrar los datos de la tabla principal a la tabla que vamos a utilizar para nuestras visualizaciones. Para esto buscamos Lambda en el buscador de servicios y damos click en Create Function.

Le damos un nombre a la función Lambda, seleccionamos el lenguaje de programación (Runtime = Python3.8)y damos click en Create Function.

Create Function

El siguiente paso es darle los permisos necesarios al rol asignado a la función Lambda. Para esto damos click en la pestaña Configuration en el menu Permissions y en el nombre del rol.

Select Rol

Agregamos las siguientes políticas al rol:

CloudWatchFullAccess: Nos sirve para acceder a logs de la función Lambda

AmazonDynamoDBFullAccess: Nos sirve para poder guardar información en la tabla y para qué DynamoDB Streams pueda invocar a la función. En otro tipo de entorno recuerden ser mas granulares con sus políticas.

En caso que únicamente se necesitara hacer invocaciones a la función Lambda desde DynamoDB Streams, se debe usar la política: AWSLambdaInvocation-DynamoDB.

Politicas Rol

Creada la función Lambda y asignado los permisos necesarios para su ejecución, vamos a habilitar el Stream en DynamoDb para que llame a la función Lambda cuando se ingrese un nuevo registro a la tabla.Para esto vamos a la pantalla principal de DynamoDB damos click en Tables, seleccionamos la tabla ResultadosPartidos y damos click en la pestaña Exports and streams.

Select Stream

Los siguientes pasos son habilitar el Stream y asignarle la función Lambda que ya creamos.

Habilitemos el Stream:

Select Stream

Seleccionamos el tipo de información que va a ser enviada la la función Lambda, en este caso solo seleccionamos la información del nuevo registro que se ingreso en la tabla.

Select Stream

Validamos que el stream haya quedado habilitado.

Validate Stream

Asignamos la función al trigger la función Lambda.

Create Trigger 1

Seleccionamos la función Lambda que va a procesar el Stream y la cantidad de registros que va a procesar en cada llamado.

Create Trigger 2

Por ultimo validamos que el trigger este habilitado y que la función Lambda que va a procesar el stream sea la correcta.

Create Trigger 3

El diseño de tabla de DynamoDb que vamos a utilizar en este post es de una sola tabla, necesitamos definir un filtro sobre los mensajes que van a ser enviados a la función Lambda con el objetivo que se procesen solamente los registros que necesitamos para construir la tabla de posiciones.

Para esto vamos de nuevo a nuestra función Lambda y damos click Configuration - Triggers. Podemos observar qué se agrego el trigger que creamos desde DynamoDB. Damos click en checkbox del trigger y después en el botón editar.

Lambda Trigger 1

La siguiente pantalla nos muestra la configuración actual del trigger. Damos click Additional settings

Lambda Trigger 2

Buscamos la sección Filter criteria y copiamos el siguiente filtro en el cuadro de texto.

{"eventName":["INSERT"],"dynamodb":{"NewImage":{"Local":{"S":[{ "exists": true }]}}},"dynamodb":{"NewImage":{"Visitante":{"S":[{ "exists": true }]}}}}
Enter fullscreen mode Exit fullscreen mode

Guardamos los cambios.

Lambda Trigger 3

Con los pasos anteriores configuramos el filtro del stream para que solo sean procesados por la función Lambda los eventos del tipo Insert y que dentro de la datos enviados se encuentre el campo Local y Visitante, los demás datos que no cumplan con esta condición son ignorados y la función Lambda no es invocada, lo que ayuda a reducir costos debido a que solo se procesa la información necesaria.

El siguiente paso es ponerle código fuente a nuestra función Lambda, vamos a la pestaña Code, reemplazamos el código existente por el siguiente y damos click en el botón Deploy.

import json
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
tabla = dynamodb.Table("TablaPosiciones")
local = "local"
visitante = "visitante"

class Partido:

    def __init__(self,local,visitante,marcadorLocal,marcadorVisitante,liga):
        self.local = local
        self.visitante = visitante
        self.marcadorLocal = marcadorLocal
        self.marcadorVisitante = marcadorVisitante
        self.liga = liga


    def getLocal(self):
        return self.local

    def getVisitante(self):
        return self.visitante

    def getMarcadorLocal(self):
        return self.marcadorLocal

    def getMarcadorVisitante(self):
        return self.marcadorVisitante

    def getLiga(self):
        return self.liga

    def setLocal(self,local):
        self.local = local

    def setVisitantel(self,visitante):
        self.visitante = visitante

    def setMarcadorLocal(self,marcadorLocal):
        self.marcadorLocal = marcadorLocal

    def setMarcadorVisitante(self,marcadorVisitante):
        self.marcadorVisitante = marcadorVisitante

    def setLiga(self,liga):
        self.liga = liga

def obtenerPartido(data):
    local = data["Local"]["S"]
    visitante = data["Visitante"]["S"]
    marcadorLocal = data["MarcadorLocal"]["N"]
    marcadorVisitante = data["MarcadorVisitante"]["N"]
    liga = data["PK"]["S"].split("#")[0]
    return Partido(local,visitante,marcadorLocal,marcadorVisitante,liga)

def lambda_handler(event, context):

    data = event["Records"][0]["dynamodb"]["NewImage"]
    partido = obtenerPartido(data)
    datosLocal = tabla.query(KeyConditionExpression=Key('Liga').eq(partido.getLiga()) & Key("Equipo").eq(partido.getLocal()))
    datosVisitante = tabla.query(KeyConditionExpression=Key('Liga').eq(partido.getLiga()) & Key("Equipo").eq(partido.getVisitante()))
    actualizarDatosEquipo(datosLocal["Items"],partido,local)
    actualizarDatosEquipo(datosVisitante["Items"],partido,visitante)
    print("ejecuto")


def actualizarDatosEquipo(datos,partido,localidad):

    puntos = getPuntos(partido,localidad)

    if len(datos) == 0:
        tabla.put_item(Item={
            "Liga": partido.getLiga(),
            "Equipo":partido.getLocal() if localidad == local else partido.getVisitante(),
            "PartidosJugados":1,
            "GolesFavor": partido.getMarcadorLocal() if localidad == local else partido.getMarcadorVisitante(),
            "GolesContra":partido.getMarcadorLocal() if localidad != local else partido.getMarcadorVisitante(),
            "Puntos": puntos
        })
    else:
        tabla.update_item(
                Key={
                    "Liga": partido.getLiga(),
                    "Equipo": partido.getLocal() if localidad == local else partido.getVisitante()
                },
                UpdateExpression="set PartidosJugados=:partidos_jugados,GolesFavor=:goles_favor,GolesContra=:goles_contra,Puntos=:puntos",
                ExpressionAttributeValues={
                    ":partidos_jugados":datos[0]["PartidosJugados"] + 1,
                    ":goles_favor":int(datos[0]["GolesFavor"]) + int(partido.getMarcadorLocal()) if localidad == local else int(datos[0]["GolesFavor"]) +int(partido.getMarcadorVisitante()),
                    ":goles_contra":int(datos[0]["GolesContra"]) + int(partido.getMarcadorVisitante()) if localidad == local else int(datos[0]["GolesContra"])+int(partido.getMarcadorLocal()),
                    ":puntos":int(datos[0]["Puntos"]) + puntos
                },
                ReturnValues="UPDATED_NEW"
            )

def getPuntos(partido,localidad):
    puntos = 1
    if local == localidad:
        if partido.getMarcadorLocal() > partido.getMarcadorVisitante():
            puntos = 3
        elif partido.getMarcadorLocal() < partido.getMarcadorVisitante(): 
            puntos = 0
    else:
        if partido.getMarcadorLocal() < partido.getMarcadorVisitante():
            puntos = 3
        elif partido.getMarcadorLocal() > partido.getMarcadorVisitante(): 
            puntos = 0
    return puntos
Enter fullscreen mode Exit fullscreen mode

Deploy Code

Debemos validar el funcionamiento tanto del filtro como el del código de la función Lambda. Por lo tanto debemos ingresar nuevos items en la tabla ResultadosPartidos.
Nos dirigimos nuevamente a Cloud9 y creamos un nuevo archivo llamado data.py y lo ejecutamos utilizando en la terminal el comando python data.py.

Deploy Code

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("ResultadosPartidos")
table.put_item(Item={
    "PK": "Premier League#2022/23",
    "SK":"Arsenal#Liverpool#2022/10/09",
    "Local":"Arsenal",
    "Visitante":"Liverpool",
    "MarcadorLocal":3,
    "MarcadorVisitante": 2,
    "Fecha":"2022/10/09"

})


table.put_item(Item={
    "PK": "Premier League#2022/23",
    "SK":"Manchester City#Southampton#2022/10/08",
    "Local":"Manchester City",
    "Visitante":"Southampton",
    "MarcadorLocal":4,
    "MarcadorVisitante": 0,
    "Fecha":"2022/10/08"

})

table.put_item(Item={
    "PK": "Premier League#2022/23",
    "SK":"Everton#Manchester Utd#2022/10/08",
    "Local":"Everton",
    "Visitante":"Manchester Utd",
    "MarcadorLocal":1,
    "MarcadorVisitante": 2,
    "Fecha":"2022/10/09"

})

table.put_item(Item={
    "PK": "Premier League#2022/23",
    "SK":"Manchester City#Manchester Utd#2022/10/08",
    "Local":"Manchester City",
    "Visitante":"Manchester Utd",
    "MarcadorLocal":6,
    "MarcadorVisitante": 3,
    "Fecha":"2022/10/02"

})

table.put_item(Item={
    "PK": "Premier League#2022/23",
    "SK":"Liverpool#Brighton#2022/10/09",
    "Local":"Liverpool",
    "Visitante":"Brighton",
    "MarcadorLocal":3,
    "MarcadorVisitante": 3,
    "Fecha":"2022/10/01"

})


table.put_item(Item={
    "PK": "Manchester City#2022/23",
    "SK":"Jugador#Erling Haaland",
    "Nombre":"Erling Haaland",
    "Goles":15,
    "Partidos Jugados": 9
})

table.put_item(Item={
    "PK": "Liverpool#2022/23",
    "SK":"Jugador#Luis Diaz",
    "Nombre":"Luis Diaz",
    "Goles":3,
    "Partidos Jugados": 8
})

table.put_item(Item={
    "PK": "Tottenham#2022/23",
    "SK":"Jugador#Kane",
    "Nombre":"Kane",
    "Goles":8,
    "Partidos Jugados": 9
})

Enter fullscreen mode Exit fullscreen mode

Por ultimo vamos a validar la información almacenada en las dos tablas.

Resultado Partidos

Cómo podemos observar en la tabla ResultadoPartidos se almaceno la información de los resultados de cada partido más la información de los jugadores de los equipos. Esto se realiza de esta forma por el tipo de diseño de tabla que elegimos para el ejercicio que es de única tabla.

Resultado Partidos

En la tabla TablaPosiciones podemos observar que únicamente tenemos almacenado la información de los equipos de futbol y no la de los jugadores porque no cumplen con el filtro creado. Si queremos confirmar que la función Lambda solo se haya llamado las veces correctas (5 que son el numero de partidos que ingresamos), debemos ir a CloudWatch y buscar el log group que termina con el nombre de la función Lambda: lambda/lambda_stream_dynamodb y observar el numero de veces que la función fue invocada.

Resultado Partidos

La Arquitectura de nuestro post es la siguiente:

Resultado Partidos

Como actividad adicional pueden agregar mas registros al script de inserción, tratar de crear nuevos tipos de filtros o mejorar el código fuente de la función Lambda.

En el proximo post vamos a ver cómo crear gráficos de los datos de la tabla TablaPosiciones utilizando el servicio de QuickSight.

Me pueden encontrar en

Camilo Cabrales

Referencias

Filtros Eventos Lambda
Documentación DynamoDB
Boto3

💖 💪 🙅 🚩
cecamilo
camilo cabrales

Posted on October 18, 2022

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related