Katarina Supe
Posted on October 22, 2021
Introduction
Twitch is the world’s leading live streaming platform for gamers. In this blog post, you’ll explore the Twitch dataset to find out which streamers, teams, and games are the most popular. You will also discover which users or bots are great candidates for receiving a VIP or moderator badge, and measure the popularity of streamers by how many followers, viewers, and chatters they have. Each streamer has a network of chatters, VIPs, and moderators, so you will use the MAGE PageRank algorithm to check who is the most popular streamer. Besides that, you can calculate the betweenness centrality on this network and find out which streamer has the most influence. This blog is divided into three parts, depending on the part of the application you are building:
- Part 1: data source and backend implementation
- Part 2: frontend implementation
- Part 3: streaming data from a Kafka cluster
App Architecture
The app consists of five main services:
- twitch-stream: A Python script that gets new chatters for certain streamers and sends them to a Kafka cluster.
- kafka: A Kafka cluster consisting of one topic named chatters.
- memgraph-mage: A graph analytics platform that we query for relevant statistics. This platform also stores the incoming Twitch data from Kafka and performs PageRank and betweenness centrality algorithms on all streamers.
- twitch-app: A Flask server that sends all the data we query from memgraph-mage to the react-app. It also consumes the Kafka stream and sends it to the react-app.
- react-app: A React app that visualizes the Twitch network with the D3.js library.
Dataset and Graph Schema
You can collect the data using the Twitch API and then rearrange it to fit the idea of graph databases. You can check out the script that creates the .csv files which you'll import to Memgraph. You'll use the files streamers.csv, teams.csv, vips.csv, moderators.csv and chatters.csv. Once generated, streamers.csv will hold important information about the languages the user speaks and the games the user streams.
All nodes, except for the :User:Stream node, have only the name property. The :User:Stream node represents live stream users, and its followers and totalViewCount properties are important for measuring their popularity. Language, team, and game could have been properties on the :User:Stream node, but since there are many users who speak the same language, belong to the same team, or play the same game, it's better if they're connected. Now that you understand the Twitch network better, it's time to visualize it by making a web application!
Prerequisites
Let's install all the tools you'll need to build the app:
- Docker and Docker Compose: If you are using Windows, first install Windows Subsystem for Linux (WSL) and then Docker Desktop. If you are a Linux user, install Docker and Docker Compose.
- Node.js: You'll also need Node.js for the npx command you'll use to create the React app.
Project structure
Below you can see how the project structure should look at the end. I will explain the most important parts of the implementation, and you can check the whole repository if you are missing any details.
| docker-compose.yml
|
+---backend
| | app.py
| | models.py
| | twitch_data.py
| | requirements.txt
| | Dockerfile
| +---import-data
| | chatters.csv
| | moderators.csv
| | streamers.csv
| | teams.csv
| | vips.csv
|
+---frontend
| | craco.config.js
| | Dockerfile
| | package.json
| | package-lock.json
| +---node_modules
| +---public
| +---src
|
+---memgraph
| | Dockerfile
| +---query_modules
| | twitch.py
| +---mg_log
| +---mg_lib
|
+---twitch-stream
| | Dockerfile
| | dummy.py
| | setup.py
| | chatters.csv
| | requirements.txt
Let's build the application from bottom to top. First, create the backend and memgraph directories. Within the backend directory, create the import-data subdirectory and move all the scraped CSV files there, or use the files you already have. Also, create a docker-compose.yml file in the root folder of the project. The project structure should look like this:
| docker-compose.yml
|
+---backend
| | app.py
| | models.py
| | twitch_data.py
| | requirements.txt
| | Dockerfile
| +---import-data
| | chatters.csv
| | moderators.csv
| | streamers.csv
| | teams.csv
| | vips.csv
|
+---memgraph
| | Dockerfile
| +---mg_log
| +---mg_lib
Dockerizing Memgraph and the Backend
The docker-compose.yml file looks like this:
version: "3"
networks:
  app-tier:
    driver: bridge
services:
  memgraph-mage:
    build: ./memgraph
    volumes:
      - ./memgraph/mg_lib:/var/lib/memgraph
      - ./memgraph/mg_log:/var/log/memgraph
    entrypoint:
      [
        "/usr/lib/memgraph/memgraph",
        "--telemetry-enabled=false"
      ]
    ports:
      - "7687:7687"
    networks:
      - app-tier
  twitch-app:
    build: ./backend
    volumes:
      - ./backend:/app
    ports:
      - "5000:5000"
    environment:
      MG_HOST: memgraph-mage
      MG_PORT: 7687
    depends_on:
      - memgraph-mage
    networks:
      - app-tier
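The MG_HOST and MG_PORT variables set for the twitch-app service are how the container learns where Memgraph lives. How the backend actually reads them is not shown in this post, so the following is only a minimal sketch under that assumption; the helper name memgraph_address and the local fallback values are hypothetical.

```python
import os


def memgraph_address(environ=os.environ):
    """Return (host, port) for Memgraph, falling back to local defaults.

    Hypothetical helper: the variable names come from docker-compose.yml,
    the fallback values are assumptions for running outside Docker.
    """
    host = environ.get("MG_HOST", "127.0.0.1")
    # Environment variables are always strings, so the port needs a cast.
    port = int(environ.get("MG_PORT", "7687"))
    return host, port
```

Inside the Compose network this would resolve to the memgraph-mage service name, while a run outside Docker would fall back to the local address.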
This application has many services which depend on one another. With the docker-compose.yml file, you can simply run the application using docker-compose build and docker-compose up. The memgraph-mage service has a running Memgraph instance along with MAGE (Memgraph Advanced Graph Extensions). The twitch-app is the backend service and it depends on memgraph-mage, because you need a running Memgraph instance in order to save data into Memgraph and run queries. The backend will be running on port 5000. Now, take a look at the backend Dockerfile:
FROM python:3.8
# Install CMake
RUN apt-get update && \
    apt-get --yes install cmake && \
    rm -rf /var/lib/apt/lists/*
# Install packages
COPY requirements.txt ./
RUN pip3 install -r requirements.txt
COPY app.py /app/app.py
COPY import-data /app/import-data
WORKDIR /app
ENV FLASK_ENV=development
ENV LC_ALL=C.UTF-8
ENV LANG=C.UTF-8
ENTRYPOINT ["python3", "app.py", "--populate"]
The requirements.txt file is copied to the container to install all the dependencies correctly. You can check out what's in there:
Flask==1.1.2
gqlalchemy==1.1.2
Flask, a micro web framework, will be installed in the Docker container first. Flask wraps Werkzeug, a comprehensive Web Server Gateway Interface (WSGI) web application library. It will enable the application to communicate with Memgraph on request and send a response back. Next, gqlalchemy, a library developed to assist in writing and running queries on Memgraph, will be installed inside the container. GQLAlchemy is a fully open-source Python library that aims to be the go-to Object Graph Mapper (OGM), a link between graph database objects and Python objects.
The import-data folder is copied to the Docker container where the twitch-app service is running, since you'll load the data from the CSV files there and then save it into Memgraph. Include the --populate flag in the Dockerfile in order to load data into Memgraph. Later on, if you restart the app, remove the --populate flag. Due to the created volumes, the data will stay loaded in Memgraph.
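To see why adding or removing --populate in the ENTRYPOINT is enough to switch data loading on and off, here is a minimal, standalone sketch of a store_true flag. The function names build_parser and should_load_data are hypothetical and exist only for this illustration.

```python
from argparse import ArgumentParser


def build_parser():
    """Build a parser with only the --populate flag (illustrative)."""
    parser = ArgumentParser(description="Sketch of the --populate flag.")
    # store_true: False when the flag is absent, True when it is present.
    parser.add_argument(
        "--populate",
        action="store_true",
        help="Run app with data loading.",
    )
    return parser


def should_load_data(argv):
    """Return True only if --populate was passed on the command line."""
    return build_parser().parse_args(argv).populate
```

With this shape, an ENTRYPOINT of ["python3", "app.py"] would skip loading, while ["python3", "app.py", "--populate"] would trigger it.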
Backend Implementation
Let's build an API that will help the frontend get all the necessary data from Memgraph. First, you need to set up everything for the Flask server. Add the arguments, such as the --host, --port and --debug flags, and create the previously mentioned --populate flag.
import os
from argparse import ArgumentParser

from flask import Flask
from gqlalchemy import Memgraph

import twitch_data

# log, log_time, init_log and connect_to_memgraph are helpers defined
# elsewhere in the project (see the repository).

memgraph = Memgraph()
app = Flask(
    __name__,
)
args = None


def parse_args():
    """Parse command line arguments."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--host", default="0.0.0.0", help="Host address.")
    parser.add_argument("--port", default=5000, type=int, help="App port.")
    parser.add_argument(
        "--debug",
        default=True,
        action="store_true",
        help="Run web server in debug mode.",
    )
    parser.add_argument(
        "--populate",
        dest="populate",
        action="store_true",
        help="Run app with data loading."
    )
    parser.set_defaults(populate=False)
    log.info(__doc__)
    return parser.parse_args()


@log_time
def load_data():
    """Load data into the database."""
    if not args.populate:
        log.info("Data is loaded in Memgraph.")
        return
    log.info("Loading data into Memgraph.")
    try:
        memgraph.drop_database()
        twitch_data.load()
    except Exception as e:
        log.info("Data loading error.")
        log.info(e)


def main():
    global args
    args = parse_args()
    # In debug mode the Werkzeug reloader runs this script twice;
    # initialize only in the reloader's child process.
    if os.environ.get("WERKZEUG_RUN_MAIN") == "true":
        init_log()
        connect_to_memgraph()
        load_data()
    app.run(host=args.host, port=args.port, debug=args.debug)


if __name__ == "__main__":
    main()
First, the command-line arguments are set. Then, when you start app.py, the app connects to Memgraph. After that, if the --populate flag is set, the data is loaded from the CSV files and saved into Memgraph. All loading methods are located in the twitch_data.py script. There are several ways to import data, and the LOAD CSV clause is probably the best option for importing large datasets. But if Python plays a big part in your tech stack, try loading and saving the data using GQLAlchemy's object graph mapper (OGM).
To do that, you first have to map Python classes to the appropriate nodes and relationships. You can find all mappings in models.py. Check out how you can create User, Stream, Language and Game nodes, as well as Speaks and Plays relationships:
from typing import Optional

from gqlalchemy import Field, Node, Relationship

# memgraph is the project's Memgraph() connection object.


class User(Node):
    name: str = Field(index=True, exists=True, unique=True, db=memgraph)


class Stream(User):
    name: Optional[str] = Field(
        index=True, exists=True, unique=True, db=memgraph, label="User"
    )
    id: str = Field(index=True, exists=True, unique=True, db=memgraph)
    url: Optional[str] = Field()
    followers: Optional[int] = Field()
    createdAt: Optional[str] = Field()
    totalViewCount: Optional[int] = Field()
    description: Optional[str] = Field()


class Language(Node):
    name: str = Field(unique=True, db=memgraph)


class Game(Node):
    name: str = Field(unique=True, db=memgraph)


class Speaks(Relationship, type="SPEAKS"):
    pass


class Plays(Relationship, type="PLAYS"):
    pass
Now, check how the data from streamers.csv can be saved into Memgraph:
from csv import reader

import models

# memgraph is the project's Memgraph() connection object.


def load_streams(path):
    with open(path) as read_obj:
        csv_reader = reader(read_obj)
        # Skip the header row; None means the file is empty.
        header = next(csv_reader, None)
        if header is not None:
            for row in csv_reader:
                stream = models.Stream(
                    id=row[1],
                    name=row[3],
                    url=row[6],
                    followers=row[7],
                    createdAt=row[10],
                    totalViewCount=row[9],
                    description=row[8],
                ).save(memgraph)
                language = models.Language(name=row[5]).save(memgraph)
                game = models.Game(name=row[4]).save(memgraph)
                speaks_rel = models.Speaks(
                    _start_node_id=stream._id, _end_node_id=language._id
                ).save(memgraph)
                plays_rel = models.Plays(
                    _start_node_id=stream._id, _end_node_id=game._id
                ).save(memgraph)
Here, nodes with the labels :User:Stream are created, along with their properties. A streamer speaks a language and plays a game, and the :Language and :Game nodes are created to represent those connections.
If you want to learn more about GQLAlchemy's OGM and query builder, check out our how-to guides.
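Since load_streams() picks values out of each row by position, one way to check the column mapping without a running database is to isolate it in a small helper. The sketch below reuses the column positions from load_streams(); the helper names, the header, and the sample row are hypothetical and only illustrate the shape of the data.

```python
from csv import reader
from io import StringIO

# Column positions taken from load_streams() in this post.
COLUMNS = {
    "id": 1,
    "name": 3,
    "game": 4,
    "language": 5,
    "url": 6,
    "followers": 7,
    "description": 8,
    "totalViewCount": 9,
    "createdAt": 10,
}
INT_FIELDS = ("followers", "totalViewCount")


def parse_stream_row(row):
    """Map one CSV row to a dict, casting numeric fields to int."""
    record = {field: row[index] for field, index in COLUMNS.items()}
    for field in INT_FIELDS:
        record[field] = int(record[field])
    return record


def parse_streams(csv_text):
    """Parse CSV text (with a header row) into a list of stream records."""
    csv_reader = reader(StringIO(csv_text))
    next(csv_reader, None)  # skip the header row
    return [parse_stream_row(row) for row in csv_reader]


# Hypothetical sample data; the real streamers.csv header may differ.
SAMPLE = (
    "c0,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10\n"
    "0,123,x,streamer_a,Chess,en,https://example.com/a,1500,Plays chess,42000,2015-01-01\n"
)
streams = parse_streams(SAMPLE)
```

Each record could then be handed to the model classes, which keeps the positional indexing in one place.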
After the data is in Memgraph, the Flask server will run. For now, the only thing you can actually check is whether your data is in Memgraph. The easiest way, especially if you are a visual type, is by using Memgraph Lab. But first, don't forget to build the whole project in order to try it out. Place yourself in the project root folder and run:
docker-compose build memgraph-mage
docker-compose build twitch-app
To start the Memgraph instance (along with MAGE), run:
docker-compose up memgraph-mage
Now that you have a running Memgraph instance, you can install Memgraph Lab, open it and click on the Connect button. Then run the twitch-app server to start the data loading process:
docker-compose up twitch-app
In the Overview tab, you can see the total number of nodes and edges in your database.
Nice - the data is really there! Try adding some simple methods in app.py and test the connection with Memgraph.
from json import dumps

from flask import Response
from gqlalchemy import Match


@app.route("/nodes", methods=["GET"])
@log_time
def get_nodes():
    """Get the number of nodes in database."""
    try:
        num_of_nodes = next(
            Match()
            .node(variable="node")
            .return_({"count(node)": "num_of_nodes"})
            .execute()
        )["num_of_nodes"]
        response = {"nodes": num_of_nodes}
        return Response(
            response=dumps(response), status=200, mimetype="application/json"
        )
    except Exception as e:
        log.info("Fetching number of nodes went wrong.")
        log.info(e)
        return ("", 500)
The app route says that you can get the response at localhost:5000/nodes. The method creates a query using the query builder, and Memgraph executes it and returns the results. In this case, there is only one result, and that is the total number of nodes in the database. Try it out!
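Besides opening the URL in a browser, you could call the endpoint from a short script. This is a minimal sketch using only the standard library; the function name fetch_node_count and its opener parameter are hypothetical, and it assumes the backend is reachable at localhost:5000 as configured in docker-compose.yml.

```python
import json
from urllib.request import urlopen


def fetch_node_count(base_url="http://localhost:5000", opener=urlopen):
    """Call GET /nodes and return the node count from the JSON body.

    The opener parameter is injectable so the function can be exercised
    without a running server.
    """
    with opener(f"{base_url}/nodes") as response:
        payload = json.loads(response.read().decode("utf-8"))
    return payload["nodes"]
```

With the twitch-app service running, calling fetch_node_count() should return the same number that Memgraph Lab shows for nodes.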
If your data is already in Memgraph, make sure to remove the --populate flag from the backend Dockerfile. Then build the backend service again by running docker-compose build twitch-app. Next time you run twitch-app, you won't have to wait for the data import, since the data will already be there. This is also useful for all the other changes you're going to make on the backend service.
The idea behind these methods is to find out interesting statistics about the network. Check out how you can figure out what games are played by the most players:
@app.route("/top-games/<num_of_games>", methods=["GET"])
@log_time
def get_top_games(num_of_games):
    """Get top num_of_games games by number of streamers who play them."""
    try:
        results = list(
            Match()
            .node("User", variable="u")
            .to("PLAYS")
            .node("Game", variable="g")
            .return_({"g.name": "game_name", "count(u)": "num_of_players"})
            .order_by("num_of_players DESC")
            .limit(num_of_games)
            .execute()
        )
        games_list = list()
        players_list = list()
        for result in results:
            game_name = result["game_name"]
            num_of_players = result["num_of_players"]
            games_list.append(game_name)
            players_list.append(num_of_players)
        games = [{"name": game_name} for game_name in games_list]
        players = [{"players": player_count} for player_count in players_list]
        response = {"games": games, "players": players}
        return Response(
            response=dumps(response), status=200, mimetype="application/json"
        )
    except Exception as e:
        log.info("Fetching top games went wrong.")
        log.info(e)
        return ("", 500)
With the query builder from GQLAlchemy, it's easy to get each game's name and the number of players that play it, in descending order. After that, you can put the data into a dictionary to get the response in JSON format, which is easy to work with. The length of the resulting list depends on the argument num_of_games.
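To make the shape of that JSON response concrete, here is a minimal sketch that builds the same payload from query rows without touching the database. The function name build_top_games_response and the sample rows are hypothetical; only the keys game_name, num_of_players, games, and players come from get_top_games().

```python
from json import dumps


def build_top_games_response(results):
    """Turn query rows into the {"games": [...], "players": [...]} payload."""
    games = [{"name": row["game_name"]} for row in results]
    players = [{"players": row["num_of_players"]} for row in results]
    return {"games": games, "players": players}


# Hypothetical rows, shaped like the query results in get_top_games().
sample_rows = [
    {"game_name": "Chess", "num_of_players": 12},
    {"game_name": "Tetris", "num_of_players": 7},
]
payload = build_top_games_response(sample_rows)
body = dumps(payload)  # the string the endpoint would send back
```

The two lists stay aligned by position, so the frontend can pair the i-th game with the i-th player count.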
Besides general statistics, it would be nice to visualize some information about streamers. You can search the database for your favorite streamer and get the game the streamer plays, the language the streamer speaks and the team the streamer is part of.
@app.route("/streamer/<streamer_name>", methods=["GET"])
@log_time
def get_streamer(streamer_name):
    """Get info about streamer whose name is streamer_name."""
    try:
        counter = next(
            Match()
            .node("User", "u")
            .where("u.name", "=", streamer_name)
            .return_({"count(u)": "num_of_streamers"})
            .execute()
        )["num_of_streamers"]
        # If the streamer exists, return its relationships
        if counter != 0:
            results = list(
                Match()
                .node("User", variable="u")
                .to()
                .node(variable="n")
                .where("u.name", "=", streamer_name)
                .return_(
                    {
                        "u.id": "streamer_id",
                        "u.name": "streamer_name",
                        "n.name": "node_name",
                        "labels(n)": "labels",
                    }
                )
                .execute()
            )
            links_set = set()
            nodes_set = set()
            for result in results:
                if result["labels"][0] != "Stream" and result["labels"][0] != "User":
                    source_id = result["streamer_id"]
                    source_name = result["streamer_name"]
                    source_label = "Stream"
                    target_id = result["node_name"]
                    target_name = result["node_name"]
                    target_label = result["labels"][0]
                    nodes_set.add((source_id, source_label, source_name))
                    nodes_set.add((target_id, target_label, target_name))
                    if (source_id, target_id) not in links_set and (
                        target_id,
                        source_id,
                    ) not in links_set:
                        links_set.add((source_id, target_id))
            nodes = [
                {"id": node_id, "label": node_label, "name": node_name}
                for node_id, node_label, node_name in nodes_set
            ]
            links = [{"source": n_id, "target": m_id} for (n_id, m_id) in links_set]
        # If the streamer doesn't exist, return empty response
        else:
            nodes = []
            links = []
        response = {"nodes": nodes, "links": links}
        return Response(
            response=dumps(response), status=200, mimetype="application/json"
        )
    except Exception as e:
        log.info("Fetching streamer by name went wrong.")
        log.info(e)
        return ("", 500)
Within the response, you'll get the streamer node along with the nodes it is linked to. This data will be useful on the frontend side, where you can use it to draw graphs with D3.js. Now, let’s see how Memgraph’s query modules work, particularly the PageRank and betweenness centrality algorithms. With these, you can measure the popularity and influence of streamers and determine which node is the most relevant in the graph. Here is the get_page_rank() method:
from gqlalchemy import Call


@app.route("/page-rank", methods=["GET"])
@log_time
def get_page_rank():
    """Call the Page rank procedure and return top 50 in descending order."""
    try:
        results = list(
            Call("pagerank.get")
            .yield_()
            .with_({"node": "node", "rank": "rank"})
            .add_custom_cypher("WHERE node:Stream OR node:User")
            .return_({"node.name": "node_name", "rank": "rank"})
            .order_by("rank DESC")
            .limit(50)
            .execute()
        )
        page_rank_list = list()
        for result in results:
            user_name = result["node_name"]
            rank = float(result["rank"])
            # One {"name", "rank"} entry per user, already sorted by rank.
            page_rank_list.append({"name": user_name, "rank": rank})
        response = {"page_rank": page_rank_list}
        return Response(
            response=dumps(response), status=200, mimetype="application/json"
        )
    except Exception as e:
        log.info("Fetching users' ranks using pagerank went wrong.")
        log.info(e)
        return ("", 500)
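To get a feel for what the rank values mean, here is a minimal pure-Python sketch of PageRank using power iteration. It is not MAGE's implementation, and the damping factor, iteration count, edge direction, and sample edges are all assumptions made for illustration.

```python
def pagerank(edges, damping=0.85, iterations=100):
    """Power-iteration PageRank over a list of (source, target) edges."""
    nodes = sorted({node for edge in edges for node in edge})
    out_links = {node: [] for node in nodes}
    for source, target in edges:
        out_links[source].append(target)

    count = len(nodes)
    rank = {node: 1.0 / count for node in nodes}
    for _ in range(iterations):
        # Rank held by nodes without outgoing edges is shared by everyone.
        dangling = sum(rank[node] for node in nodes if not out_links[node])
        base = (1.0 - damping) / count + damping * dangling / count
        new_rank = {node: base for node in nodes}
        for source in nodes:
            for target in out_links[source]:
                share = rank[source] / len(out_links[source])
                new_rank[target] += damping * share
        rank = new_rank
    return rank


# Hypothetical network: chatters pointing at the streamers they chat with.
sample_edges = [
    ("chatter_1", "streamer_a"),
    ("chatter_2", "streamer_a"),
    ("chatter_3", "streamer_a"),
    ("chatter_3", "streamer_b"),
]
ranks = pagerank(sample_edges)
top_streamer = max(ranks, key=ranks.get)
```

In this toy network, streamer_a collects rank from three chatters and ends up on top, which matches the intuition that a node is important when many other nodes point to it.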
There are many different methods you can come up with, but here is the list of the ones that are already implemented in app.py:
Method | Description |
---|---|
get_top_games(num_of_games) | Get top num_of_games games by number of streamers who play them. |
get_top_teams(num_of_teams) | Get top num_of_teams teams by number of streamers who are part of them. |
get_top_vips(num_of_vips) | Get top num_of_vips VIPs by number of streamers who gave them the VIP badge. |
get_top_moderators(num_of_moderators) | Get top num_of_moderators moderators by number of streamers who gave them the moderator badge. |
get_top_streamers_by_views(num_of_streamers) | Get top num_of_streamers streamers by total number of views. |
get_top_streamers_by_followers(num_of_streamers) | Get top num_of_streamers streamers by total number of followers. |
get_streamer(streamer_name) | Get info about streamer whose name is streamer_name. |
get_streamers(language, game) | Get all streamers who stream certain game in certain language. |
get_nodes() | Get the number of nodes in database. |
get_edges() | Get the number of edges in database. |
get_page_rank() | Call the PageRank procedure and return top 50 in descending order. |
get_bc() | Call the betweenness centrality procedure and return top 50 in descending order. |
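The get_bc() method relies on MAGE for betweenness centrality, which scores a node by how many shortest paths between other nodes pass through it. As a rough illustration of the idea, here is a pure-Python sketch of Brandes' algorithm for an unweighted, undirected graph. It is not MAGE's implementation (which may treat direction and normalization differently), and the sample network is hypothetical.

```python
from collections import deque


def betweenness_centrality(adjacency):
    """Brandes' algorithm for an unweighted, undirected graph.

    adjacency maps each node to a list of its neighbours.
    """
    centrality = {node: 0.0 for node in adjacency}
    for source in adjacency:
        # BFS from source, counting shortest paths (sigma) to every node.
        stack = []
        predecessors = {node: [] for node in adjacency}
        sigma = {node: 0 for node in adjacency}
        distance = {node: -1 for node in adjacency}
        sigma[source], distance[source] = 1, 0
        queue = deque([source])
        while queue:
            v = queue.popleft()
            stack.append(v)
            for w in adjacency[v]:
                if distance[w] < 0:
                    distance[w] = distance[v] + 1
                    queue.append(w)
                if distance[w] == distance[v] + 1:
                    sigma[w] += sigma[v]
                    predecessors[w].append(v)
        # Walk back from the farthest nodes, accumulating dependencies.
        delta = {node: 0.0 for node in adjacency}
        while stack:
            w = stack.pop()
            for v in predecessors[w]:
                delta[v] += sigma[v] / sigma[w] * (1 + delta[w])
            if w != source:
                centrality[w] += delta[w]
    # Each undirected pair was counted once from each endpoint.
    return {node: value / 2 for node, value in centrality.items()}


# Hypothetical network: one chatter is the only link between two streamers.
sample_graph = {
    "streamer_a": ["shared_chatter"],
    "shared_chatter": ["streamer_a", "streamer_b"],
    "streamer_b": ["shared_chatter"],
}
scores = betweenness_centrality(sample_graph)
```

Here the only shortest path between the two streamers runs through shared_chatter, so that node gets the highest score while both endpoints score zero.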
Conclusion
That's it for now! In the next part of this blog post, find out how to build a React application on top of the backend, and in the third part, learn how to ingest new data with Kafka and see how you can visualize the streaming changes of your data. Feel free to join our Discord Community server, where you can ask any question related to this blog post or anything else you want to know about Memgraph.