Taming the Machine Learning Pipeline Beast: ZenML Edition
Arka Dash
Posted on November 14, 2024
Intro to the Zen of ZenML
Buckle up, because we’re going on a journey from Jupyter jungle to ZenML nirvana. No, ZenML won’t make you a meditation master, but it will make you a pipeline pro. So, set aside your 100+ lines of spaghetti code; it’s time to bring in the big guns.
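To follow along, install ZenML with pip install zenml (trust me, it’s easier than explaining to your boss why your last model broke). Types matter here, because ZenML reads each step’s type annotations to work out how to store and pass along its outputs, so no freestyle coding; we’ll talk about that as we go.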
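To make the point about types a little more concrete, here is a small standard-library sketch of how a framework can read a function's annotations to learn how many outputs it produces and what type each one has. It only illustrates the general idea and is not ZenML's implementation; output_types, ingest, and clean are hypothetical names made up for this example.

```python
from typing import Tuple, get_args, get_origin, get_type_hints


def output_types(func):
    """Return the annotated output types of func as a tuple (hypothetical helper)."""
    hints = get_type_hints(func)
    if "return" not in hints:
        raise TypeError(f"{func.__name__} needs a return annotation")
    annotation = hints["return"]
    # A Tuple[...] annotation means several outputs, one per entry.
    if get_origin(annotation) is tuple:
        return get_args(annotation)
    return (annotation,)


def ingest(file_path: str) -> Tuple[list, list, list]:
    return [], [], []


def clean(rows: list) -> list:
    return rows


print(output_types(ingest))  # three entries, one per output
print(output_types(clean))   # a single entry
```

A function with no return annotation raises a TypeError here, which is roughly the kind of complaint you can expect from any pipeline tool that relies on annotations.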
First Things First: The Sacred pipelines.py
Create a new file called pipelines.py. In this masterpiece, we’ll build our pipeline, something a bit cleaner than a tangled mess of data processing. Start with ZenML’s pipeline decorator:
from zenml import pipeline

@pipeline(name="used_car_price_predictor")
def ml_pipeline():
    # We’ll fill in these dots soon.
    ...
Step 1: Data Ingestion, a.k.a. Opening Pandora’s Zip
Here’s our first ZenML step, where we’ll read in data from a .zip file (because, of course, data never comes in simple CSVs). Meet our data_ingestion_step function, where we import the data and throw it into an artifact, a ZenML term for “we’re passing this mess to the next step, but it’s technically fancy now.”
from typing import Tuple

import pandas as pd
from zenml import step

@step(enable_cache=False)
def data_ingestion_step(file_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    # Extract zip files and read data with pd.read_csv()
    ...
    return train, test, sample  # This tuple is now an “Artifact” – no fancy unboxing needed
In ml_pipeline, we extract the actual data from the artifact like this:
raw_data_artifacts = data_ingestion_step(file_path="data/playground-series-s4e9.zip")
train, test, sample = raw_data_artifacts
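The body of the ingestion step is elided above, so here is one way the zip-reading part could look. This is a sketch under assumptions, not the author's code: read_csvs_from_zip is a hypothetical helper, and the three file names are guesses at what the archive contains, so adjust them to match your download.

```python
import zipfile
from typing import Tuple

import pandas as pd


def read_csvs_from_zip(
    file_path: str,
    # Assumed member names; check the archive's real contents.
    names: Tuple[str, str, str] = ("train.csv", "test.csv", "sample_submission.csv"),
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Read three CSV members straight out of a zip archive."""
    frames = []
    with zipfile.ZipFile(file_path) as archive:
        for name in names:
            # archive.open() returns a file-like object that pd.read_csv accepts.
            with archive.open(name) as handle:
                frames.append(pd.read_csv(handle))
    train, test, sample = frames
    return train, test, sample
```

Reading directly from the archive avoids leaving extracted files lying around; if a member is missing, zipfile raises a KeyError naming it.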
Straightforward Steps (Don’t Get Too Comfortable)
Step 2: Missing Values, Feature Engineering, and Outlier Detection
These steps are relatively painless, but don’t get cocky. Using ZenML’s step decorator, we handle missing values, engineer features, and clean up outliers.
@step(enable_cache=False)
def handle_missing_values_step(df: pd.DataFrame) -> pd.DataFrame:
    # Code to fill missing values
    ...

@step(enable_cache=False)
def feature_engineering_step(df: pd.DataFrame, strategy: str, features: list) -> pd.DataFrame:
    # Log-transform and other fancy tricks
    ...

@step(enable_cache=False)
def outlier_detection_step(df: pd.DataFrame, feature: str, strategy: str, method: str) -> pd.DataFrame:
    # Outlier removal or adjustment
    ...
And in the pipeline:
filled_train = handle_missing_values_step(train)
engineered_train = feature_engineering_step(filled_train, strategy='log', features=['price'])
cleaned_train = outlier_detection_step(df=engineered_train, feature='price', strategy='IQR', method='remove')
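The step bodies are left out above, so here is a rough idea of what could sit inside them, written as plain pandas functions. Everything here is an assumption for illustration: fill_missing, log_transform, and remove_iqr_outliers are hypothetical helpers, the mean/"Unknown" fill rule is just one possible choice, and log(1 + x) stands in for whatever the 'log' strategy actually does.

```python
import numpy as np
import pandas as pd


def fill_missing(df: pd.DataFrame) -> pd.DataFrame:
    """Fill numeric gaps with the column mean and other gaps with 'Unknown'."""
    filled = df.copy()
    for column in filled.columns:
        if pd.api.types.is_numeric_dtype(filled[column]):
            filled[column] = filled[column].fillna(filled[column].mean())
        else:
            filled[column] = filled[column].fillna("Unknown")
    return filled


def log_transform(df: pd.DataFrame, features: list) -> pd.DataFrame:
    """Apply log(1 + x) to the listed columns, leaving the input untouched."""
    transformed = df.copy()
    for feature in features:
        transformed[feature] = np.log1p(transformed[feature])
    return transformed


def remove_iqr_outliers(df: pd.DataFrame, feature: str) -> pd.DataFrame:
    """Drop rows whose value lies more than 1.5 * IQR outside the quartiles."""
    q1 = df[feature].quantile(0.25)
    q3 = df[feature].quantile(0.75)
    iqr = q3 - q1
    keep = df[feature].between(q1 - 1.5 * iqr, q3 + 1.5 * iqr)
    return df[keep]
```

Each function takes a DataFrame and returns a DataFrame, which is what lets them drop into steps annotated with pd.DataFrame on both sides.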
Step 3: Data Splitting
Our data is finally clean. Now it’s time to split it into training and testing sets. You’d think this would be the easy part, but you’d be wrong: type casting is key, because the later steps expect the features as DataFrames and the targets as Series.
X_train, X_test, y_train, y_test = data_splitter(cleaned_train)
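Since data_splitter itself isn't shown, here is a minimal sketch of a splitter that returns the types the later steps ask for. It is an assumption-heavy stand-in: split_features_and_target is a hypothetical helper, 'price' is assumed to be the target, the 80/20 ratio and seed are arbitrary, and it uses pandas sampling rather than whatever the original relies on.

```python
from typing import Tuple

import pandas as pd


def split_features_and_target(
    df: pd.DataFrame,
    target: str = "price",   # assumed target column
    test_size: float = 0.2,  # assumed split ratio
    seed: int = 42,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
    """Randomly split df into train/test features and targets.

    Assumes the index of df is unique, so test rows can be dropped by label.
    """
    test = df.sample(frac=test_size, random_state=seed)
    train = df.drop(index=test.index)
    X_train = train.drop(columns=[target])
    X_test = test.drop(columns=[target])
    # Single brackets give a Series; train[[target]] would give a DataFrame
    # and no longer match a pd.Series annotation.
    y_train = train[target]
    y_test = test[target]
    return X_train, X_test, y_train, y_test
```

The single-versus-double bracket detail is the kind of type mismatch that is easy to miss in a notebook and gets noticed once every step declares its types.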
The Model Building Labyrinth
Step 4: Building a Model That Doesn’t Break Every Step
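Here’s where things get tricky. Sklearn’s RegressorMixin is useful for portability, but ZenML artifacts don’t always play nice: a plain sklearn Pipeline is not a RegressorMixin subclass, so it doesn’t match a step annotated as returning one. So, we hack it by making a custom PipelineRegressor class: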
from sklearn.pipeline import Pipeline
from sklearn.base import RegressorMixin

class PipelineRegressor(Pipeline, RegressorMixin):
    pass
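A quick way to see what the subclass buys you is to compare isinstance checks on a plain Pipeline and on the wrapper. The scaler and linear model below are placeholder steps picked for this sketch, not the ones used in the actual project.

```python
from sklearn.base import RegressorMixin
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


class PipelineRegressor(Pipeline, RegressorMixin):
    pass


# Placeholder steps, chosen only to have something to construct.
plain = Pipeline(steps=[("scaler", StandardScaler()), ("model", LinearRegression())])
wrapped = PipelineRegressor(steps=[("scaler", StandardScaler()), ("model", LinearRegression())])

print(isinstance(plain, RegressorMixin))    # False
print(isinstance(wrapped, RegressorMixin))  # True
print(isinstance(wrapped, Pipeline))        # True
```

The wrapper adds no behavior of its own; it only changes which type checks the object passes.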
Now, we use this class in our model_building_step. You’ll need to initialize mlflow, log the columns, and wrap up the process:
import mlflow

@step(enable_cache=False)
def model_building_step(x_train: pd.DataFrame, y_train: pd.Series) -> RegressorMixin:
    # The context manager ends the MLflow run even if fitting raises.
    with mlflow.start_run():
        model_pipeline = PipelineRegressor(steps=[...])  # Add preprocessing, etc.
        model_pipeline.fit(x_train, y_train)
        # Log the feature names produced by the 'preprocessor' step
        processed_columns = list(model_pipeline.named_steps['preprocessor'].get_feature_names_out())
        mlflow.log_dict({'used_columns': processed_columns}, 'used_columns.json')
    return model_pipeline
Evaluating with Just Enough Data to Feel Smart
Step 5: Model Evaluation
With the model built, we make some predictions and log evaluation metrics—if only it were as simple as “look, it’s accurate!” Here’s the ZenML version of that:
import mlflow
from sklearn.metrics import mean_squared_error, r2_score

@step(enable_cache=False)
def evaluation_step(model_pipeline: RegressorMixin, X_test: pd.DataFrame, y_test: pd.Series) -> pd.DataFrame:
    y_pred = model_pipeline.predict(X_test)
    # Take the square root by hand; squared=False is deprecated in newer scikit-learn releases.
    rmse = mean_squared_error(y_test, y_pred) ** 0.5
    r2 = r2_score(y_test, y_pred)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    return pd.DataFrame({"rmse": [rmse], "r2": [r2]})
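If the two metrics feel like magic numbers, it can help to compute them by hand once. The sketch below uses only the standard library and made-up values; manual_rmse and manual_r2 are hypothetical helpers that follow the usual definitions (root of the mean squared error, and one minus residual over total sum of squares).

```python
import math


def manual_rmse(y_true, y_pred):
    """Root mean squared error: sqrt of the average squared difference."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def manual_r2(y_true, y_pred):
    """R squared: 1 - residual sum of squares / total sum of squares."""
    mean_true = sum(y_true) / len(y_true)
    residual = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    total = sum((t - mean_true) ** 2 for t in y_true)
    return 1 - residual / total  # undefined when every y_true is identical


# Made-up values: two predictions are off by 1, two are exact.
y_true = [3.0, 5.0, 7.0, 9.0]
y_pred = [2.0, 5.0, 8.0, 9.0]

print(round(manual_rmse(y_true, y_pred), 4))  # 0.7071
print(round(manual_r2(y_true, y_pred), 4))    # 0.9
```

Here the squared errors sum to 2 over four points, so the RMSE is the square root of 0.5, and the total sum of squares around the mean of 6 is 20, giving an R² of 1 - 2/20.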
The End: A.k.a., Our ZenML Workflow is Complete
Congratulations, you made it! Now, run ml_pipeline() and head over to the ZenML dashboard for a DAG view of the process. The MLflow UI will display metrics, model details, and features in use.