Quick tip: Using Apache Spark and GraphFrames with SingleStore Notebooks

veryfatboy

Akmal Chaudhri

Posted on April 12, 2024

Abstract

In this article, we'll see how to use the Apache Spark GraphFrames package with SingleStore, using historical data about the London Underground network. We'll store the data as stations (vertices) and line connections (edges) in SingleStore, then load the data into a notebook environment and run some queries on it using GraphFrames.

The notebook file used in this article is available on GitHub.

Create a SingleStore Cloud account

A previous article showed the steps to create a free SingleStore Cloud account. We'll use the following settings:

  • Workspace Group Name: Spark Demo Group
  • Cloud Provider: AWS
  • Region: US East 1 (N. Virginia)
  • Workspace Name: spark-demo
  • Size: S-00

We'll make a note of the password and store it in the secrets vault using the name password.

Create a new notebook

From the left navigation pane in the cloud portal, we'll select DEVELOP > Data Studio.

In the top right of the web page, we'll select New Notebook > New Notebook, as shown in Figure 1.

Figure 1. New Notebook.

We'll call the notebook spark_graphframes_demo, select a Blank notebook template from the available options, and save it in the Personal location.

Fill out the notebook

First, let's install Java:

!conda install -y --quiet -c conda-forge openjdk

Next, we'll install some libraries:

!pip install folium --quiet
!pip install graphframes --quiet
!pip install pyspark --quiet

In previous articles in our Apache Spark series using notebooks, we've downloaded jar files into a local directory. However, in this article, we'll use spark.jars.packages and create our SparkSession as follows:

from pyspark.sql import SparkSession

# List of Maven coordinates for all required packages
maven_packages = [
    "graphframes:graphframes:0.8.4-spark3.5-s_2.12",
    "org.scala-lang:scala-library:2.12",
    "com.singlestore:singlestore-jdbc-client:1.2.4",
    "com.singlestore:singlestore-spark-connector_2.12:4.1.8-spark-3.5.0",
    "org.apache.commons:commons-dbcp2:2.12.0",
    "org.apache.commons:commons-pool2:2.12.0",
    "io.spray:spray-json_3:1.3.6"
]

# Create Spark session with all required packages
spark = (SparkSession
             .builder
             .config("spark.jars.packages", ",".join(maven_packages))
             .appName("Spark GraphFrames Test")
             .getOrCreate()
        )

spark.sparkContext.setLogLevel("ERROR")

A database is required, so we'll create one:

DROP DATABASE IF EXISTS spark_demo;
CREATE DATABASE IF NOT EXISTS spark_demo;

We'll also create tables to store the data:

USE spark_demo;

DROP TABLE IF EXISTS connections;
CREATE ROWSTORE TABLE IF NOT EXISTS connections (
     src      INT,
     dst      INT,
     line     VARCHAR(32),
     colour   VARCHAR(8),
     time     INT,
     PRIMARY KEY(src, dst, line)
);

DROP TABLE IF EXISTS stations;
CREATE ROWSTORE TABLE IF NOT EXISTS stations (
     id          INT PRIMARY KEY,
     latitude    DOUBLE,
     longitude   DOUBLE,
     name        VARCHAR(32),
     zone        FLOAT,
     total_lines INT,
     rail        INT
);

Now we'll download the historical data for the London Underground into Pandas. We'll also perform some adjustments for GraphFrames and some merging of the data to mirror the schema of the tables we defined above:

import pandas as pd

connections_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_connections.csv"
stations_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_stations.csv"
lines_url = "https://raw.githubusercontent.com/VeryFatBoy/singlestore-geospatial-example/main/csv/london_lines.csv"

connections_df = pd.read_csv(connections_url)
connections_df.rename(
    columns = {"station1": "src", "station2": "dst"},
    inplace = True
)

stations_df = pd.read_csv(stations_url)
stations_df.drop(
    "display_name",
    axis = 1,
    inplace = True
)

lines_df = pd.read_csv(lines_url)
lines_df.drop(
    "stripe",
    axis = 1,
    inplace = True
)

connections_df = pd.merge(
    connections_df,
    lines_df,
    on = "line",
    how = "left"
)
connections_df.drop(
    "line",
    axis = 1,
    inplace = True
)
connections_df.rename(
    columns = {"name": "line"},
    inplace = True
)

Just before we save our data into SingleStore, we'll create a map of the London Underground using Folium:

import folium

London = [51.509865, -0.118092]
mymap = folium.Map(location = London, zoom_start = 12)

# Add markers for stations
for idx, row in stations_df.iterrows():
    folium.Marker(
        [row["latitude"], row["longitude"]],
        popup = row["name"]
    ).add_to(mymap)

# Add lines with colours
for idx, row in connections_df.iterrows():
    source = stations_df.loc[stations_df["id"] == row["src"]]
    target = stations_df.loc[stations_df["id"] == row["dst"]]

    # Extract latitude and longitude
    source_coords = (float(source["latitude"].iloc[0]), float(source["longitude"].iloc[0]))
    target_coords = (float(target["latitude"].iloc[0]), float(target["longitude"].iloc[0]))

    folium.PolyLine(
        locations = [source_coords, target_coords],
        color = row["colour"]
    ).add_to(mymap)

html_content = mymap._repr_html_()

and save the map to Stage for download, as follows:

with nb.stage.open("map.html", "w") as st:
    st.write(html_content)

This produces the map shown in Figure 2. We can scroll and zoom the map; clicking a marker shows the station name, and the lines are coloured according to the London Underground scheme.

Figure 2. Map using Folium.

We'll now prepare the connection to SingleStore:

from sqlalchemy import create_engine

# connection_url is provided by the SingleStore notebook environment
db_connection = create_engine(connection_url)
url = db_connection.url

and write the connections data:

connections_df.to_sql(
    "connections",
    con = db_connection,
    if_exists = "append",
    index = False,
    chunksize = 1000
)

and stations data:

stations_df.to_sql(
    "stations",
    con = db_connection,
    if_exists = "append",
    index = False,
    chunksize = 1000
)

We can check the data in the connections table:

SELECT * FROM connections LIMIT 5;

and stations table:

SELECT * FROM stations LIMIT 5;

Now we'll create the Spark connection to SingleStore:

# get_secret is provided by the SingleStore notebook environment
password = get_secret("password")
host = url.host
port = url.port
cluster = host + ":" + str(port)

We also need to set some configuration parameters:

spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")

With the data safely stored in SingleStore, we can now read it back into Spark and use GraphFrames. First the connections data:

connections = (spark.read
    .format("singlestore")
    .load("spark_demo.connections")
)

and then the stations data:

stations = (spark.read
    .format("singlestore")
    .load("spark_demo.stations")
)

Now we'll create a GraphFrame:

from graphframes import GraphFrame

underground = GraphFrame(stations, connections)

We can show the vertices:

underground.vertices.show(5)

Example output:

+---+--------+---------+---------------+----+-----------+----+
| id|latitude|longitude|           name|zone|total_lines|rail|
+---+--------+---------+---------------+----+-----------+----+
| 25|  51.512|  -0.1031|    Blackfriars| 1.0|          2|   0|
| 39| 51.5481|  -0.1188|Caledonian Road| 2.0|          1|   0|
| 43| 51.5147|   0.0082|   Canning Town| 3.0|          2|   0|
| 50| 51.7052|   -0.611|        Chesham|10.0|          1|   0|
| 60| 51.5129|  -0.1243|  Covent Garden| 1.0|          1|   0|
+---+--------+---------+---------------+----+-----------+----+
only showing top 5 rows

We can show the edges:

underground.edges.show(5)

Example output:

+---+---+--------------------+-------+----+
|src|dst|                line| colour|time|
+---+---+--------------------+-------+----+
|  7|145|       Northern Line|#000000|   2|
| 11|163|       Bakerloo Line|#B36305|   1|
| 19| 97|Docklands Light R...|#00A4A7|   2|
| 28|192|        Central Line|#E32017|   1|
| 49|151|       Northern Line|#000000|   2|
+---+---+--------------------+-------+----+
only showing top 5 rows

We can check how many stations are in each London Underground Zone:

(underground
    .vertices
    .groupBy("zone")
    .count()
    .orderBy("count", ascending = False)
    .show()
)

Example output:

+----+-----+
|zone|count|
+----+-----+
| 2.0|   75|
| 1.0|   60|
| 3.0|   47|
| 4.0|   38|
| 5.0|   28|
| 6.0|   18|
| 2.5|   17|
| 3.5|    6|
| 1.5|    4|
| 8.0|    2|
|10.0|    2|
| 7.0|    2|
| 9.0|    1|
| 6.5|    1|
| 5.5|    1|
+----+-----+

It may be useful to count the connections on a particular line:

(underground
    .edges
    .filter("line = 'District Line'")
    .count()
)

Example output:

59
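Rather than filtering one line at a time, we could group the edges by line and count them all at once with `underground.edges.groupBy("line").count()`. As a minimal, self-contained sketch of the same aggregation in Pandas (the station ids and lines below are illustrative toy rows, not the full dataset):

```python
import pandas as pd

# Toy connections mirroring the edges schema (illustrative rows only)
edges = pd.DataFrame({
    "src": [7, 11, 28, 49],
    "dst": [145, 163, 192, 151],
    "line": ["Northern Line", "Bakerloo Line", "Central Line", "Northern Line"],
})

# Pandas equivalent of underground.edges.groupBy("line").count() in Spark
per_line = edges.groupby("line").size().reset_index(name="count")
print(per_line)
```

The same one-liner against the real `connections_df` would give the connection count for every line in a single pass.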

It could be interesting to know the maximum number of lines running through a station:

(underground
    .vertices
    .groupBy()
    .max("total_lines")
    .show()
)

Example output:

+----------------+
|max(total_lines)|
+----------------+
|               6|
+----------------+

and to find the station with the most lines running through it:

(underground
    .vertices
    .filter("total_lines == 6")
    .show()
)

Example output:

+---+--------+---------+--------------------+----+-----------+----+
| id|latitude|longitude|                name|zone|total_lines|rail|
+---+--------+---------+--------------------+----+-----------+----+
|145| 51.5308|  -0.1238|King's Cross St. ...| 1.0|          6|   1|
+---+--------+---------+--------------------+----+-----------+----+
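Hard-coding the value 6 in the filter above is a little fragile; we could instead compute the maximum and filter in one pass. A minimal Pandas sketch of the idea on toy rows (the same pattern works in Spark via `pyspark.sql.functions.max`):

```python
import pandas as pd

# Toy stations mirroring the vertices schema (illustrative rows only)
stations = pd.DataFrame({
    "id": [25, 145, 60],
    "name": ["Blackfriars", "King's Cross St. Pancras", "Covent Garden"],
    "total_lines": [2, 6, 1],
})

# Filter to the station(s) with the most lines, without hard-coding the max
busiest = stations[stations["total_lines"] == stations["total_lines"].max()]
print(busiest)
```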

Many more types of queries are possible. The GraphFrames documentation is a good place to start.
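For example, GraphFrames offers a `degrees` property that counts the edges touching each vertex, which would identify well-connected stations. As a hedged Pandas sketch of the same computation on a toy edge list (station ids are illustrative):

```python
import pandas as pd

# Toy undirected edge list mirroring the connections schema
edges = pd.DataFrame({
    "src": [1, 1, 2, 3],
    "dst": [2, 3, 3, 4],
})

# Degree of each station: how many connections touch it,
# mirroring what underground.degrees would return in GraphFrames
degree = (pd.concat([edges["src"], edges["dst"]])
            .value_counts()
            .rename_axis("id")
            .reset_index(name="degree")
            .sort_values("id")
            .reset_index(drop = True))
print(degree)
```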

Finally, we can stop Spark:

spark.stop()

Summary

In this short article, we've seen the ease with which we can store graph data in SingleStore and how we can use GraphFrames to perform various queries on the data.
