Why You Need Distributed Computing for Real-World Machine Learning
Swapnil
Posted on September 9, 2024
And How PySpark Can Help You Handle Huge Datasets Like a Pro
Machine learning frameworks like PyTorch and TensorFlow are awesome for building models. But the reality is, when it comes to real-world projects—where you’re dealing with gigantic datasets—you need more than just a good model. You need a way to efficiently process and manage all that data. That’s where distributed computing, like PySpark, comes in to save the day.
Let’s break down why handling big data in real-world machine learning means going beyond PyTorch and TensorFlow, and how PySpark helps you get there.
The Real Problem: Big Data
Most ML examples you see online use small, manageable datasets. You can fit the whole thing into memory, play around with it, and train a model in minutes. But in real-world scenarios—like credit card fraud detection, recommendation systems, or financial forecasts—you’re dealing with millions or even billions of rows. Suddenly, your laptop or server can’t handle it.
If you try loading all that data into PyTorch or TensorFlow at once, things will break. These frameworks are designed for model training, not for efficiently handling huge datasets. This is where distributed computing becomes crucial.
Why PyTorch and TensorFlow Aren’t Enough
PyTorch and TensorFlow are great for building and optimizing models, but they fall short when dealing with large-scale data tasks. Two major problems:
- Memory Overload: They load the entire dataset into memory before training. That works for small datasets, but when you’ve got terabytes of data, it’s game over.
- No Distributed Data Processing: PyTorch and TensorFlow aren’t built to handle distributed data processing. If you’ve got massive amounts of data spread across multiple machines, they don’t really help.
This is where PySpark shines. It’s designed to work with distributed data, processing it efficiently across multiple machines while handling massive datasets without crashing your system.
Real-World Example: Credit Card Fraud Detection with PySpark
Let’s dive into an example. Suppose you’re working on a fraud detection system using credit card transaction data. In this case, we’ll use a popular dataset from Kaggle. It contains over 284,000 transactions, and less than 1% of them are fraudulent.
Step 1: Set Up PySpark in Google Colab
We’ll use Google Colab for this because it lets us run PySpark with minimal setup.
!pip install pyspark
Next, import the necessary libraries and start a Spark session.
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, udf
from pyspark.ml.feature import VectorAssembler, StringIndexer, MinMaxScaler
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vectors
import numpy as np
from pyspark.sql.types import FloatType
start a pyspark session
spark = SparkSession.builder \
.appName("FraudDetectionImproved") \
.master("local[*]") \
.config("spark.executorEnv.PYTHONHASHSEED", "0") \
.getOrCreate()
Step 2: Load and Prepare data
data = spark.read.csv('creditcard.csv', header=True, inferSchema=True)
data = data.orderBy("Time") # Ensure data is sorted by time
data.show(5)
data.describe().show()
# Check for missing values in each column
data.select([sum(col(c).isNull().cast("int")).alias(c) for c in data.columns]).show()
# Prepare the feature columns
feature_columns = data.columns
feature_columns.remove("Class") # Removing "Class" column as it is our label
# Assemble features into a single vector
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)
data.select("features", "Class").show(5)
# Split data into train (60%), test (20%), and unseen (20%)
train_data, temp_data = data.randomSplit([0.6, 0.4], seed=42)
test_data, unseen_data = temp_data.randomSplit([0.5, 0.5], seed=42)
# Print class distribution in each dataset
print("Train Data:")
train_data.groupBy("Class").count().show()
print("Test and parameter optimisation Data:")
test_data.groupBy("Class").count().show()
print("Unseen Data:")
unseen_data.groupBy("Class").count().show()
Step 3: Initialise Model
# Initialize RandomForestClassifier
rf = RandomForestClassifier(labelCol="Class", featuresCol="features", probabilityCol="probability")
# Create ParamGrid for Cross Validation
paramGrid = ParamGridBuilder() \
.addGrid(rf.numTrees, [10, 20 ]) \
.addGrid(rf.maxDepth, [5, 10]) \
.build()
# Create 5-fold CrossValidator
crossval = CrossValidator(estimator=rf,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(labelCol="Class", metricName="areaUnderROC"),
numFolds=5)
Step 4: Fit, Run cross-validation, and choose the best set of parameters
# Run cross-validation, and choose the best set of parameters
rf_model = crossval.fit(train_data)
# Make predictions on test data
predictions_rf = rf_model.transform(test_data)
# Evaluate Random Forest Model
binary_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
pr_evaluator = BinaryClassificationEvaluator(labelCol="Class", rawPredictionCol="rawPrediction", metricName="areaUnderPR")
auc_rf = binary_evaluator.evaluate(predictions_rf)
auprc_rf = pr_evaluator.evaluate(predictions_rf)
print(f"Random Forest - AUC: {auc_rf:.4f}, AUPRC: {auprc_rf:.4f}")
# UDF to extract positive probability from probability vector
extract_prob = udf(lambda prob: float(prob[1]), FloatType())
predictions_rf = predictions_rf.withColumn("positive_probability", extract_prob(col("probability")))
Step 5 Function to calculate precision, recall, and F1-score
# Function to calculate precision, recall, and F1-score
def calculate_metrics(predictions):
tp = predictions.filter((col("Class") == 1) & (col("prediction") == 1)).count()
fp = predictions.filter((col("Class") == 0) & (col("prediction") == 1)).count()
fn = predictions.filter((col("Class") == 1) & (col("prediction") == 0)).count()
precision = tp / (tp + fp) if (tp + fp) != 0 else 0
recall = tp / (tp + fn) if (tp + fn) != 0 else 0
f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) != 0 else 0
return precision, recall, f1_score
Step 6: Find the best threshold for the model
# Find the best threshold for the model
best_threshold = 0.5
best_f1 = 0
for threshold in np.arange(0.1, 0.9, 0.1):
thresholded_predictions = predictions_rf.withColumn("prediction", (col("positive_probability") > threshold).cast("double"))
precision, recall, f1 = calculate_metrics(thresholded_predictions)
if f1 > best_f1:
best_f1 = f1
best_threshold = threshold
print(f"Best threshold: {best_threshold}, Best F1-score: {best_f1:.4f}")
Step7: Evaluate on unseen Data
# Evaluate on unseen data
predictions_unseen = rf_model.transform(unseen_data)
auc_unseen = binary_evaluator.evaluate(predictions_unseen)
print(f"Unseen Data - AUC: {auc_unseen:.4f}")
precision, recall, f1 = calculate_metrics(predictions_unseen)
print(f"Unseen Data - Precision: {precision:.4f}, Recall: {recall:.4f}, F1-score: {f1:.4f}")
area_under_roc = binary_evaluator.evaluate(predictions_unseen)
area_under_pr = pr_evaluator.evaluate(predictions_unseen)
print(f"Unseen Data - AUC: {area_under_roc:.4f}, AUPRC: {area_under_pr:.4f}")
RESULTS
Best threshold: 0.30000000000000004, Best F1-score: 0.9062
Unseen Data - AUC: 0.9384
Unseen Data - Precision: 0.9655, Recall: 0.7568, F1-score: 0.8485
Unseen Data - AUC: 0.9423, AUPRC: 0.8618
You can then save this model (few KBs) and use it anywehere in pyspark pipeline
rf_model.save()
Here’s why PySpark makes a huge difference when dealing with large datasets in real-world machine learning tasks:
It Scales Easily: PySpark can distribute tasks across clusters, allowing you to process terabytes of data without running out of memory.
On-the-fly Data Processing: PySpark doesn’t need to load the entire dataset into memory. It processes the data as needed, which makes it much more efficient.
Faster Model Training: With distributed computing, you can train models faster by distributing the computational workload across multiple machines.
Final Thoughts
PyTorch and TensorFlow are fantastic tools for building machine learning models, but for real-world, large-scale tasks, you need more. Distributed computing with PySpark allows you to handle huge datasets efficiently, process data in real-time, and scale your machine learning pipelines.
So, the next time you’re working with massive data—whether it’s fraud detection, recommendation systems, or financial analysis—consider using PySpark to take your project to the next level.
For the full code and results, check out this notebook. :
https://colab.research.google.com/drive/1W9naxNZirirLRodSEnHAUWevYd5LH8D4?authuser=5#scrollTo=odmodmqKcY23
__
I am Swapnil, feel free leave your comments Results and ideas, or ping me - swapnil@nooffice.no for Data, Software dev gigs and jobs
Posted on September 9, 2024
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.
Related
September 9, 2024