Pipelines in ML: A guide to developing good workflows
Fortune Adekogbe
Posted on February 14, 2021
When I got started with machine learning, I did not consider pipelines to be very important. They seemed like one of those things that some people used but that weren't really essential. I was creating reasonably accurate models in my notebooks and it felt alright.
Eventually, I began to focus more on the later part of a model's life (deployment and using models in production), and the importance of a structured workflow became glaring.
In this article, I will walk us through a machine learning problem with our preprocessing handled in a Pipeline. You may be familiar with using Kubeflow and Docker on your cloud platform of choice to do this, but this article will make use of good ol' scikit-learn.
Note:
- We will focus on just the implementation of various preprocessing and feature extraction steps with transforms. Little to no attention will be given to detailed data exploration, visualization and model accuracy. Check out this link to see a version of the notebook that focuses on those.
- If any of the packages used is not installed in your environment, install it with:
pip install [package_name]
The data used in this article comes from a Data Science Nigeria 2019 challenge. The aim is to build a predictive model that determines whether a building will have an insurance claim during a certain period, based on the building's characteristics. The target variable, Claim, is:
- 1 if the building has at least one claim over the insured period.
- 0 if the building doesn't have a claim over the insured period.
Making Relevant Imports
import pandas as pd
import numpy as np
train_df = pd.read_csv('train_data.csv')
Claim = train_df.pop('Claim')
train_df.pop('Customer Id');
train_df
First, we import pandas and numpy to aid our data wrangling, and then we read in the training data using the read_csv function in pandas. We then use the pop method of DataFrames to remove the target column and store it in a variable named Claim for later use. The customer ID column is also removed, since the IDs are unique and contain no useful information.
from sklearn.model_selection import train_test_split
X_train, X_valid, y_train, y_valid = train_test_split(
    train_df, Claim, test_size=0.25, stratify=Claim, random_state=50
)
The training data is split into training and validation sets using the train_test_split function in sklearn. A test size of 25% is set alongside a random_state for reproducibility. The target is passed to the stratify argument to ensure that the proportion of 0s and 1s in the original data is preserved in both splits.
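To see what stratify does, here is a minimal sketch on made-up data (the toy_X and toy_y names and their values are assumptions for illustration, not the insurance data). It checks that the share of positive labels is the same in the full data and in both splits.

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Toy, imbalanced data standing in for the insurance set (hypothetical values).
toy_X = pd.DataFrame({"feature": range(100)})
toy_y = pd.Series([1] * 20 + [0] * 80, name="Claim")

X_tr, X_va, y_tr, y_va = train_test_split(
    toy_X, toy_y, test_size=0.25, stratify=toy_y, random_state=50
)

# With stratify, each split keeps (approximately) the original share of 1s.
print("full:", toy_y.mean())
print("train:", y_tr.mean())
print("valid:", y_va.mean())
```

Without stratify, the shares in the two splits can drift apart, which matters most when one class is rare.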
Transformers
scikit-learn provides a library of transformers, which may clean, reduce, expand or generate feature representations. However, in this tutorial, we will be building our custom transformers.
Like other estimators, these are represented by classes with a fit method, which learns model parameters (e.g. mean and standard deviation for normalization) from a training set, and a transform method, which applies this transformation model to unseen data. The fit_transform method may be more convenient and efficient for fitting and transforming the training data in one step, so we will be using it.
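As a rough illustration of the fit/transform split described above, the sketch below standardizes a single column. The MeanStdScaler class and the toy frames are hypothetical names made up for this example; they are not part of the article's pipeline.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class MeanStdScaler(BaseEstimator, TransformerMixin):
    """Hypothetical example: standardize one column using training statistics."""

    def __init__(self, column):
        self.column = column

    def fit(self, X, y=None):
        # Learn the parameters from the training data only.
        self.mean_ = X[self.column].mean()
        self.std_ = X[self.column].std()
        return self

    def transform(self, X):
        # Apply the learned parameters to a copy, leaving the input untouched.
        X = X.copy()
        X[self.column] = (X[self.column] - self.mean_) / self.std_
        return X


train_toy = pd.DataFrame({"size": [10.0, 20.0, 30.0]})
unseen_toy = pd.DataFrame({"size": [20.0, 40.0]})

scaler = MeanStdScaler("size")
train_scaled = scaler.fit_transform(train_toy)  # fit, then transform, in one call
unseen_scaled = scaler.transform(unseen_toy)    # reuses the training mean and std
print(unseen_scaled)
```

The unseen data is scaled with the mean and standard deviation learned from the training frame, which is the behaviour a pipeline relies on at prediction time.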
The Empty Transformer
I found that it is useful to have a base class to build up from whenever a transformer needs creating.
from sklearn.base import BaseEstimator, TransformerMixin
class Transformer(BaseEstimator, TransformerMixin):
    def __init__(self):
        ...
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        return X
The class above inherits from two other classes, namely BaseEstimator and TransformerMixin. BaseEstimator is the base class for all estimators in scikit-learn. TransformerMixin is a class that defines and implements the fit_transform method (this fits to data, then transforms it).
The __init__ method of our class is empty, its fit method returns the instance, and its transform method returns its input.
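A quick way to convince yourself that fit_transform comes for free from TransformerMixin is the sketch below. PassThrough is a hypothetical stand-in with the same fit and transform bodies as the empty transformer; it never defines fit_transform itself.

```python
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin


class PassThrough(BaseEstimator, TransformerMixin):
    """Hypothetical do-nothing transformer; fit_transform is inherited."""

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X


toy = pd.DataFrame({"a": [1, 2, 3]})
passthrough = PassThrough()

# fit_transform is provided by TransformerMixin: it calls fit, then transform.
result = passthrough.fit_transform(toy)
print(result.equals(toy))
```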
To create a custom transformer, at least one of these functions must be modified. We will explore a couple of transformers for the stated problem.
class NanFillerTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, column, normalize_ = False):
        self.column = column
        self.normalize_ = normalize_
    def fit(self, X, y=None):
        return self
    def replace_nan(self, X, feature_name, new_values, weights):
        # Fill missing entries by sampling from new_values with the given weights
        assert len(new_values) == len(weights), 'New values do not correspond with weights'
        from random import choices
        mask = X[feature_name].isna()
        length = int(mask.sum())
        replacement = choices(list(new_values), weights=list(weights), k=length)
        X.loc[mask, feature_name] = replacement
        return X[feature_name]
    def transform(self, X):
        # Observed values and their relative frequencies (NaN is excluded)
        x = X[self.column].value_counts(normalize=True)
        X[self.column] = self.replace_nan(X, self.column, x.keys(), x.values)
        if self.normalize_:
            X[self.column] = X[self.column] / X[self.column].max()
        return X
This transformer carries out a series of operations that result in missing values in a column being replaced by values drawn from a predefined set, with a weight assigned to each value. The "brain" of the transformer is the replace_nan function. It is used in the transform method such that the predefined values are the unique values in the column and the weights are their normalized value counts.
The transformer takes in 2 arguments, which are:
- column: the label of the column to be filled, as a string
- normalize_: a boolean that determines whether the values in the column are normalized after the missing values are replaced.
Given that nothing is learned from the training data ahead of time, the fit method stays unchanged.
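The weighted fill can be tried in isolation with the sketch below. The toy frame and its values are made up, and the seed is there only so the sketch is repeatable; the article's transformer does not set one.

```python
import random

import pandas as pd

random.seed(0)  # assumption: seeded only to make this sketch repeatable

toy = pd.DataFrame({"Geo_Code": ["A", "A", "A", "B", None, None]})

# Observed values and their relative frequencies (missing values are ignored).
freq = toy["Geo_Code"].value_counts(normalize=True)

# Draw one replacement per missing entry, weighted by those frequencies.
mask = toy["Geo_Code"].isna()
fill = random.choices(list(freq.index), weights=list(freq.values), k=int(mask.sum()))
toy.loc[mask, "Geo_Code"] = fill

print(freq.to_dict())
print(toy["Geo_Code"].tolist())
```

Because the replacements are sampled, frequent values are more likely to be chosen, so the filled column roughly keeps the distribution of the observed values.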
class EncoderTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, show_map = False):
        self.show_map = show_map
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        # Binary flags for groups of window counts (values are stored as strings)
        X['NoWindows'] = X['NumberOfWindows'].map(lambda x: 1 if x == ' .' else 0)
        # Compare against a list of labels rather than a string, to avoid substring matches
        X['3-7Windows'] = X['NumberOfWindows'].map(lambda x: 1 if x in list('34567') else 0)
        X['Other_Windows'] = X['NumberOfWindows'].map(lambda x: 1 if x in '1 2 8 9 >=10'.split(' ') else 0)
        X['Sectioned_Insured_Period'] = X['Insured_Period'].map(lambda x: 1 if x == 1 else 0)
        X['Grouped_Date_of_Occupancy'] = X['Date_of_Occupancy'].map(lambda x: 1 if x > 1900 else 0)
        return X
class YearEncoderTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, show_map = False):
        self.show_map = show_map
    def fit(self, X, y=None):
        return self
    def map_counts(self, X, feat):
        # Replace each value in the column with its normalized value count
        mapp = X[feat].value_counts(normalize=True)
        X[feat] = X[feat].map(lambda x: mapp[x])
        return mapp
    def transform(self, X):
        X['2012-13YearOfObs'] = X['YearOfObservation'].map(lambda x: 1 if x in [2012,2013] else 0)
        X['2014YearOfObs'] = X['YearOfObservation'].map(lambda x: 1 if x in [2014] else 0)
        X['2015-16YearOfObs'] = X['YearOfObservation'].map(lambda x: 1 if x in [2015,2016] else 0)
        X['YearOfObservation'] = X['YearOfObservation'].map(lambda x: 2021 - x)
        if self.show_map:
            # Pass the frame being transformed (the original referenced an undefined name)
            self.map_counts(X, 'YearOfObservation')
        return X
The EncoderTransformer and YearEncoderTransformer carry out operations that are specific to this problem. The decision to carry out the operations in their transform methods was made by observing the data distribution in the modified columns.
The show_map attribute of the YearEncoderTransformer is used to determine whether the map_counts function is run. This function modifies the data in the column with label feat by replacing each value with its normalized value count.
class FeatureCombiningTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, columns, drop_any=()):
        self.columns = columns
        self.drop_any = drop_any
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        # Suffix built from the first letter of each combined column
        suffix = ''.join([i[0] for i in self.columns])
        # axis=1 combines the values across each row; axis=0 would total each column instead
        X[f'Combined_{suffix}'] = X[self.columns].sum(axis=1)
        for j in self.drop_any:
            X.pop(self.columns[j])
            print(f">>> Removed {self.columns[j]} from dataframe")
        return X
The FeatureCombiningTransformer sums up the row values in a subset of the data frame. The class is instantiated with the columns whose values are to be combined. The drop_any attribute contains a list of indexes (into the columns list) of the original columns that are to be dropped. For uniqueness, the suffix variable was added to the name of the new column.
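Summing "the row values" corresponds to axis=1 in pandas, which is easy to mix up with axis=0. The sketch below uses made-up numeric columns (x and y are hypothetical names) to show the difference.

```python
import pandas as pd

toy = pd.DataFrame({"x": [1, 2, 3], "y": [10, 20, 30]})

# axis=1 combines values across each row: one result per row.
row_totals = toy[["x", "y"]].sum(axis=1)

# axis=0 combines values down each column: one result per column.
column_totals = toy[["x", "y"]].sum(axis=0)

# Only the row-wise result lines up with the frame's index as a new column.
toy["Combined_xy"] = row_totals
print(toy)
print(column_totals)
```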
class DummyFeaturesTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, column = None):
        self.column = column
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        if self.column:
            X = pd.get_dummies(X, columns=[self.column])
        else:
            X = pd.get_dummies(X)
        return X
class NormalizedFrequencyTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, column, replace_original = True):
        self.column = column
        self.replace_original = replace_original
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        mapper = X[self.column].value_counts(normalize=True)
        if self.replace_original:
            X[self.column] = X[self.column].map(lambda x: mapper[x])
        else:
            X[f"Coded_{self.column}"] = X[self.column].map(lambda x: mapper[x])
        return X
The DummyFeaturesTransformer creates a pandas DataFrame that contains dummies made from the categorical data present in the specified column or in the entire DataFrame.
The NormalizedFrequencyTransformer modifies a feature's data by replacing each value with its normalized value count.
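Both encodings can be seen on a tiny made-up column. In this sketch the toy frame and its values are assumptions for illustration; only the pandas calls mirror what the two transformers do.

```python
import pandas as pd

toy = pd.DataFrame({"Settlement": ["U", "R", "U", "U"]})

# Normalized frequency encoding: each value becomes its share of the column.
freq = toy["Settlement"].value_counts(normalize=True)
toy["Coded_Settlement"] = toy["Settlement"].map(freq)

# Dummy encoding: one indicator column per distinct value.
dummies = pd.get_dummies(toy, columns=["Settlement"])

print(toy)
print(dummies.columns.tolist())
```

Frequency encoding keeps a single column however many categories exist, while dummy encoding adds one column per category.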
The Pipeline
Things get a little more interesting here. We have created a number of transformers, but how do they get used together? How is the pipeline formed? Well, it starts with an import statement.
from sklearn.pipeline import Pipeline
data_pipeline = Pipeline([
    ('et', EncoderTransformer()),
    ('yet', YearEncoderTransformer()),
    ('fct', FeatureCombiningTransformer(['Garden', 'Building_Fenced', 'Settlement'], [0])),
    ('nanft1', NanFillerTransformer('Building Dimension', normalize_ = True)),
    ('nanft2', NanFillerTransformer('Date_of_Occupancy')),
    ('normft1', NormalizedFrequencyTransformer("Date_of_Occupancy")),
    ('nanft3', NanFillerTransformer('Geo_Code')),
    ('normft2', NormalizedFrequencyTransformer("Geo_Code")),
    ('normft3', NormalizedFrequencyTransformer("Insured_Period", replace_original = False)),
    ('normft4', NormalizedFrequencyTransformer("YearOfObservation", replace_original = False)),
    ('dft', DummyFeaturesTransformer('')),
])
Here we create a pipeline object with the Pipeline class from sklearn. The same pipeline is used for the train and validation data. The order of the steps was worked out beforehand, and it matters: for example, each NanFillerTransformer runs before the NormalizedFrequencyTransformer that works on the same column. Paying attention to the transforms and their parameters should make the rest of the ordering clear.
Note that the different transforms are included as tuples whose first element is a string naming them. This naming step can be avoided with the sklearn.pipeline.make_pipeline function, but we will not be using it.
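For comparison, here is a small sketch of make_pipeline. It uses two built-in scikit-learn scalers rather than the custom transformers above, purely so that the snippet is self-contained.

```python
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# make_pipeline names each step after its class, in lowercase.
auto_named = make_pipeline(StandardScaler(), MinMaxScaler())
print([name for name, _ in auto_named.steps])
```

Explicit names, as used in this article, are handy when a step has to be looked up or tuned later, since several steps here share the same class.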
from sklearn.preprocessing import LabelEncoder
class ColumnSelectTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, column):
        self.column = column
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        if not isinstance(X, pd.DataFrame):
            X = pd.DataFrame(X)
        return X[self.column]
class Encoder(BaseEstimator, TransformerMixin):
    def __init__(self):
        ...
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        le = LabelEncoder()
        # LabelEncoder expects a 1-D array, so flatten the single selected column first
        return le.fit_transform(X.values.ravel()).reshape(-1, 1)
Before we wrap things up, we create 2 last transforms. The first extracts a column from the DataFrame while the second encodes its content with the LabelEncoder class from sklearn.
Feature Unions
The transformation above could have been done like the others and added to the main transforms, but a new concept must be introduced. It so happens that we can have transforms combined in series and in parallel. Pipeline implements the series combination while FeatureUnion implements the parallel combination of transforms.
from sklearn.pipeline import FeatureUnion
Settlement_onehot = Pipeline([
    ('cst', ColumnSelectTransformer(['Settlement'])),
    ("le", Encoder())
])
Building_Fenced_onehot = Pipeline([
    ('cst', ColumnSelectTransformer(['Building_Fenced'])),
    ("le", Encoder())
])
Building_Painted_onehot = Pipeline([
    ('cst', ColumnSelectTransformer(['Building_Painted'])),
    ("le", Encoder())
])
categorical_features = FeatureUnion([
    ("Settlement_onehot", Settlement_onehot),
    ("Building_Fenced_onehot", Building_Fenced_onehot),
    ("Building_Painted_onehot", Building_Painted_onehot),
])
In the above code snippet, we have implementations of both the series and parallel combinations, and they culminate in a FeatureUnion named categorical_features.
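The difference between the two combinations is easiest to see on a small array. The sketch below is an illustration with hypothetical helper functions (double and add_one) wrapped in scikit-learn's FunctionTransformer; it is not part of the article's pipeline.

```python
import numpy as np
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import FunctionTransformer


def double(a):
    return a * 2


def add_one(a):
    return a + 1


X_toy = np.array([[1.0, 2.0], [3.0, 4.0]])

# Series: the output of the first step is the input of the second.
series = Pipeline([
    ("double", FunctionTransformer(double)),
    ("add_one", FunctionTransformer(add_one)),
])

# Parallel: each step sees the original input; outputs are stacked side by side.
parallel = FeatureUnion([
    ("double", FunctionTransformer(double)),
    ("add_one", FunctionTransformer(add_one)),
])

series_out = series.fit_transform(X_toy)
parallel_out = parallel.fit_transform(X_toy)
print(series_out.shape, parallel_out.shape)
```

The series result keeps the two input columns, while the parallel result has four: two from each branch.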
data_transformer = FeatureUnion([
    ('features', data_pipeline),
    ('categorical', categorical_features),
])
train = data_transformer.fit_transform(X_train)
validate = data_transformer.fit_transform(X_valid)
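Here we use the FeatureUnion class we just learned of to combine the categorical_features union with the data_pipeline we created earlier.
We then go ahead and transform our training and validation data by using the fit_transform method. Because every custom fit method above simply returns the instance, calling fit_transform on the validation data gives the same result as transform would. With transformers that learn parameters in fit, only transform should be called on validation data.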
Training
In this tutorial, the CatBoost (categorical boosting) classifier is used with a learning rate of 0.01 and 400 estimators. log_loss is used as the metric for comparing models.
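For readers unfamiliar with the metric, the sketch below computes binary log loss by hand on made-up labels and probabilities and compares it with sklearn's log_loss. The arrays are illustrative assumptions, not model output.

```python
import numpy as np
from sklearn.metrics import log_loss

# Hypothetical labels and predicted probabilities of the positive class.
y_true = np.array([1, 0, 1, 0])
p_claim = np.array([0.9, 0.2, 0.6, 0.4])

# Binary log loss: the average negative log-probability given to the true label.
manual = -np.mean(y_true * np.log(p_claim) + (1 - y_true) * np.log(1 - p_claim))
from_sklearn = log_loss(y_true, p_claim)

print(manual, from_sklearn)
```

Lower is better: confident predictions that turn out wrong are penalized heavily, which is why it is a common choice when probabilities matter more than hard labels.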
from catboost import CatBoostClassifier
from sklearn.metrics import log_loss
cbc1 = CatBoostClassifier(verbose=0,learning_rate = 0.01, n_estimators=400)
cbc1.fit(train, y_train)
print(f'Train score: {log_loss(y_train, cbc1.predict_proba(train))}')
print(f'Validation score: {log_loss(y_valid, cbc1.predict_proba(validate))}')
Saving our Model
The model is then saved using the joblib library, as shown below.
import joblib
filename = 'Insurance_model.sav'
joblib.dump(cbc1, filename)  # joblib opens and closes the file itself when given a path
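Loading the model back is the other half of the story. The sketch below shows a save/load round trip with joblib, using a small LogisticRegression on made-up data as a stand-in for the CatBoost model and a temporary directory as an assumed location.

```python
import os
import tempfile

import joblib
import numpy as np
from sklearn.linear_model import LogisticRegression

# Stand-in model on toy data (assumption: any fitted estimator is handled the same way).
X_toy = np.array([[0.0], [1.0], [2.0], [3.0]])
y_toy = np.array([0, 0, 1, 1])
model = LogisticRegression().fit(X_toy, y_toy)

# Save to a temporary location, then load it back.
path = os.path.join(tempfile.mkdtemp(), "toy_model.joblib")
joblib.dump(model, path)
restored = joblib.load(path)

# The restored model should reproduce the original predictions.
print(np.array_equal(model.predict(X_toy), restored.predict(X_toy)))
```

At prediction time, new data still has to go through the same preprocessing before it reaches the restored model.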
In practice, I would advise that the specific preprocessing steps are finalized before creating custom transforms and a pipeline.
Thanks for reading through. I hope you enjoyed it and learnt from it. Check out this repository for the data and script(s) related to this article.