Build a Multilayer Perceptron with PySpark

ruizleandro

Leandro Ruiz

Posted on July 22, 2020

Build a Multilayer Perceptron with PySpark

In this article we will build a multilayer perceptron, using Spark. The dataset that we are going to use for this exercise contains close to 75k records, with some sample customer journey data on a retail web site. There are 16 input features to predict whether the visitor is likely to convert. We have a balanced target class in this dataset. We will use MultilayerPerceptronClassifier from Spark's ML library. We start by importing a few important dependencies.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('deep_learning').getOrCreate()
import os
import numpy as np
import pandas as pd
from pyspark.sql.types import *
Enter fullscreen mode Exit fullscreen mode

Now we load the dataset into Spark, for feature engineering and model training. As mentioned, there are 16 input features and 1 output column (Orders_Normalized).

[In]: data = spark.read.csv('dl_data.csv', header=True, inferSchema=True)
[In]: data.printSchema()
[Out]: root
        |-- Visit_Number_Bucket: string (nullable = true)
        |-- Page_Views_Normalized: double (nullable = true)
        |-- Orders_Normalized: integer (nullable = true)
        |-- Internal_Search_Successful_Normalized: double (nullable = true)
        |-- Internal_Search_Null_Normalized: double (nullable = true)
        |-- Email_Signup_Normalized: double (nullable = true)
        |-- Total_Seconds_Spent_Normalized: double (nullable = true)
        |-- Store_Locator_Search_Normalized: double (nullable = true)
        |-- Mapped_Last_Touch_Channel: string (nullable = true)
        |-- Mapped_Mobile_Device_Type: string (nullable = true)
        |-- Mapped_Browser_Type: string (nullable = true)
        |-- Mapped_Entry_Pages: string (nullable = true)
        |-- Mapped_Site_Section: string (nullable = true)
        |-- Mapped_Promo_Code: string (nullable = true)
        |-- Maped_Product_Name: string (nullable = true)
        |-- Mapped_Search_Term: string (nullable = true)
        |-- Mapped_Product_Collection: string (nullable = true)
Enter fullscreen mode Exit fullscreen mode

We change the name of the label column from Orders_Normalized to label, to be able to train the model.

[In]: data = data.withColumnRenamed('Orders_Normalized', 'label')
[In]: data.printSchema()
[Out]: root
        |-- Visit_Number_Bucket: string (nullable = true)
        |-- Page_Views_Normalized: double (nullable = true)
        |-- label: integer (nullable = true)
        |-- Internal_Search_Successful_Normalized: double (nullable = true)
        |-- Internal_Search_Null_Normalized: double (nullable = true)
        |-- Email_Signup_Normalized: double (nullable = true)
        |-- Total_Seconds_Spent_Normalized: double (nullable = true)
        |-- Store_Locator_Search_Normalized: double (nullable = true)
        |-- Mapped_Last_Touch_Channel: string (nullable = true)
        |-- Mapped_Mobile_Device_Type: string (nullable = true)
        |-- Mapped_Browser_Type: string (nullable = true)
        |-- Mapped_Entry_Pages: string (nullable = true)
        |-- Mapped_Site_Section: string (nullable = true)
        |-- Mapped_Promo_Code: string (nullable = true)
        |-- Maped_Product_Name: string (nullable = true)
        |-- Mapped_Search_Term: string (nullable = true)
        |-- Mapped_Product_Collection: string (nullable = true)
Enter fullscreen mode Exit fullscreen mode

Because we are dealing with both numerical and categorical coluns, we must write a pipeline to create features combinind both for model training. Therefore, we import Pipeline, VectorAssembler, and OneHotEncoder, to create feature vectors. We will also import MultiClassificationEvaluator and MultilayerPerceptron, to check the performance of the model.

from pyspark.ml.feature import OneHotEncoder, VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, StringType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
Enter fullscreen mode Exit fullscreen mode

Alt Text

Link to GitHub

Split into Train and Test Sets

We now split the data into train, validation, and test sets, for the training of the model.

train, validation, test = data.randomSplit([0.7, 0.2, 0.1], 1234)
Enter fullscreen mode Exit fullscreen mode

Data Preprocessing

We create separate lsits of categorical columns and numeric columns based on datatypes.

categorical_columns = [item[0] for item in data.dtypes if item[1].startswith(
    'string')]
numeric_columns = [item[0] for item in data.dtypes if item[1].startswith(
    'double')]
indexers = [StringIndexer(inputCol=column, outputCol='{0}_index'.format(
    column)) for column in categorical_columns]
Enter fullscreen mode Exit fullscreen mode

We now create consolidated feature vectors, using VectorAssembler:

featuresCreator = VectorAssembler(
    inputCols=[indexer.getOutputCol() for indexer in indexers] + numeric_columns,
    outputCol='features')
layers = [len(featuresCreator.getInputCols()), 4, 2, 2]
Enter fullscreen mode Exit fullscreen mode

Model Building

The next step is to build the MultilayerPerceptron model. One can play around with different hyperparameters, such as number of layers and maxiters, to improve performance of the model.

classifier = MultilayerPerceptronClassifier(labelCol='label',
                                            featuresCol='features',
                                            maxIter=100,
                                            layers=layers,
                                            blockSize=128,
                                            seed=1234)
Enter fullscreen mode Exit fullscreen mode

Now that we have defined every stage, we add all these steps to the pipeline and tun it on the training data.

pipeline = Pipeline(stages=indexers + [featuresCreator, classifier])
model = pipeline.fit(train)
Enter fullscreen mode Exit fullscreen mode

We now calculate the predictions of the model on train, validation and test datasets.

train_output_df = model.transform(train)
validation_output_df = model.transform(validation)
test_output_df = model.transform(test)
Enter fullscreen mode Exit fullscreen mode
train_predictionAndLabels = train_output_df.select('prediction', 'label')
validation_predictionAndLabels = validation_output_df.select('prediction', 'label')
test_predictionAndLabels = test_output_df.select('prediction', 'label')
Enter fullscreen mode Exit fullscreen mode

Model Evaluation

We define three different metrics, to evaluate the performance of the model.

[In]: metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']
[In]: for metric in metrics:
        evaluator = MulticlassClassificationEvaluator(metricName=metric)
        print('Train ' + metric + ' = ' + str(evaluator.evaluate(
            train_predictionAndLabels)))
        print('Validation ' + metric + ' = ' + str(evaluator.evaluate(
            validation_predictionAndLabels)))
        print('Test ' + metric + ' = ' + str(evaluator.evaluate(
            test_predictionAndLabels)))
[Out]: Train weightedPrecision = 0.9722605697126978
[Out]: Validation weightedPrecision = 0.9734944186485901
[Out]: Test weightedPrecision = 0.9710090865749514
[Out]: Train weightedRecall = 0.9718655625913297
[Out]: Validation weightedRecall = 0.9731379731379731
[Out]: Test weightedRecall = 0.9706199460916443
[Out]: Train accuracy = 0.9718655625913297
[Out]: Validation accuracy = 0.9731379731379731
[Out]: Test accuracy = 0.9706199460916443
Enter fullscreen mode Exit fullscreen mode

As we can see, the deep learning model is doing reasonably well on the test data, based on the input signal.

💖 💪 🙅 🚩
ruizleandro
Leandro Ruiz

Posted on July 22, 2020

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related