Building the Data API for MLOps — 4 years of lessons learnt
MagicLex
Posted on August 31, 2022
When it comes to feature stores, there are two main approaches to feature engineering. One approach is to build a domain specific language (DSL) that covers all the possible feature engineering steps (e.g., aggregations, dimensionality reduction and transformations) that a data scientist might need. The second approach is to use a general purpose framework for feature engineering, based on DataFrames (Pandas or Spark), to enable users to do feature engineering using their favorite framework. The DSL approach requires re-writing any existing feature engineering pipelines from scratch, while the DataFrames approach is backwards compatible with existing feature engineering code written for Pandas or Spark.
At Hopsworks, we pioneered the DataFrames approach and our APIs reflect that. There are a couple of reasons behind this choice.
Flexibility: While the very first feature stores (e.g., Uber’s Michelangelo, AirBnB’s Zipline) adopted a DSL, they were developed inside a company for the specific needs of that company. Hopsworks is a general purpose feature store, and our customers run a plethora of different use cases on Hopsworks: anomaly detection, recommendations, time-series forecasting, NLP, etc. For Hopsworks to be able to support all these different use cases, we need to provide data scientists with a powerful and flexible API to achieve what they need. These tools already exist and are represented by Pandas, PySpark and the vast array of libraries that the Python ecosystem already provides. New Python libraries for feature engineering are usable immediately in feature engineering pipelines.
User experience: The Hopsworks feature store is meant to be a collaborative and productivity enhancing platform for teams. We believe that if users had to learn a new DSL to be able to create features it would drastically reduce the feature store’s attractiveness to developers, due to the need to learn a new framework with limited transferable skills — the DSL is seen as a form of vendor lock-in. With Hopsworks users can keep using the same libraries and processes they already know how to use.
Bring your own pipeline: The vast majority of Hopsworks users are working on brownfield projects. That is to say that they already have feature engineering pipelines running in their environment and they are not starting from scratch. Having a DSL would mean having to re-engineer those pipelines to be able to use a feature store. With Hopsworks and the DataFrame APIs, it doesn’t matter if the pipelines are using Python, PySpark or SQL. Users just need to change the output of the pipeline to write a DataFrame to Hopsworks instead of saving the DataFrame to a S3 bucket or a data warehouse.
Write API — Feature Group
The Hopsworks feature store is agnostic to the feature engineering step. You can run the process in different environments: from Colab to Snowflake, from Databricks to SageMaker, and on Hopsworks itself. The only requirement for developers is to use the HSFS (HopsworkS Feature Store) library to interact with Hopsworks.
At the end of your feature pipelines, when you have the final DataFrame, you can register it with Hopsworks using the HSFS API. Features in Hopsworks are registered as a table of features, called a feature group.
Hopsworks provides several write modes to accommodate different use cases:
- Batch Write: This was the default mode prior to version 3.0. It involves writing a DataFrame in batch mode either to the offline feature store, or to the online one, or to both. This mode is still the default when you are writing a Spark DataFrame.
- Streaming Write: This mode was introduced in version 2.0 and expanded in version 3.0. Streaming writes provide very low latency streaming updates to the online store and efficient batch updates to the offline store, while ensuring consistency between the online and offline stores. In Hopsworks 3.0, this is the default mode for Python clients.
- External connectors: This mode allows users to mount tables (of features) in Data Warehouses like Snowflake, Redshift, Delta Lake, and BigQuery as feature groups in Hopsworks. The tables are not copied into Hopsworks, and the Data Warehouse becomes the offline store for Hopsworks. Hopsworks manages the metadata, statistics, access control, and lineage for the external tables. Hopsworks can act as a virtual feature store, where many different Data Warehouses can be offline stores in the same Hopsworks feature store.
Create
Before ingesting the engineered features you need to create a feature group and define its metadata. An example of feature group definition is the following:
fg = fs.get_or_create_feature_group(
    name="transactions",
    version=1,
    description="Transaction data",
    primary_key=['cc_num'],
    event_time='datetime',
    online_enabled=True
)
In the example above we define a feature group called transactions. It has a version (read about our approach to versioning), a description, a primary key and an event time. A partition key can also be specified, although the example omits it. The primary_key is required (1) to join the features in this feature group with features in other feature groups, and (2) to retrieve precomputed features from the online store. The partition_key is used to enable efficient appends to storage and efficient querying of large volumes of feature data, reducing the amount of data read in pruned queries. The event_time specifies the column in our feature group containing the timestamp for the row update (when the event happened in the real world), enabling features to be joined together correctly without data leakage. The online_enabled attribute defines whether the feature group will be available online for real-time serving or not.
As you can see, we have not defined the full schema of the feature group. That’s because the information about the feature names and their data types is inferred from the Pandas DataFrame when writing.
To write the features to Hopsworks, users need to call fg.insert(df), where df is the Pandas DataFrame. At this stage the platform takes over and starts creating all the necessary feature metadata and scaffolding. As mentioned above, you can, but do not have to, explicitly specify the schema of the feature group. If you don’t, the feature names and data types are mapped based on the columns of the Pandas DataFrame (read more on data types and mapping).
Feature Data Validation
In Hopsworks 3.0, we introduced first-class support for Great Expectations for validation of feature data. Developers have the option of registering a Great Expectations suite with a feature group. In this case, before sending the Pandas DataFrame to Hopsworks for writing, the feature store APIs transparently invoke the Great Expectations library and validate the DataFrame. If it complies with the expectations, the write pipeline proceeds and the data is written into the feature store. If the DataFrame doesn’t comply with the expectation suite, an alert can be sent to a configured channel (e.g., Slack or Email). Alert channels are securely defined in Hopsworks.
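The control flow described above can be sketched without the real libraries. This is only an illustration, not the HSFS or Great Expectations API: the expectation functions, `validate` and `insert_with_validation` are hypothetical names, rows are plain dictionaries rather than a Pandas DataFrame, and the sketch assumes a strict policy in which a failed validation blocks the write.

```python
from typing import Callable, Dict, List

Row = Dict[str, object]
# A hypothetical "expectation": takes all rows, returns True when they comply.
Expectation = Callable[[List[Row]], bool]


def expect_amount_non_negative(rows: List[Row]) -> bool:
    return all(row["amount"] >= 0 for row in rows)


def expect_cc_num_not_null(rows: List[Row]) -> bool:
    return all(row.get("cc_num") is not None for row in rows)


def validate(rows: List[Row], suite: Dict[str, Expectation]) -> List[str]:
    """Return the names of the expectations that failed."""
    return [name for name, check in suite.items() if not check(rows)]


def insert_with_validation(rows, suite, store, alerts) -> bool:
    """Write rows to `store` only if the whole suite passes; otherwise record an alert."""
    failed = validate(rows, suite)
    if failed:
        alerts.append(f"validation failed: {', '.join(failed)}")
        return False
    store.extend(rows)
    return True


suite = {
    "amount_non_negative": expect_amount_non_negative,
    "cc_num_not_null": expect_cc_num_not_null,
}
store, alerts = [], []

good = [{"cc_num": "1", "amount": 12.5}]
bad = [{"cc_num": "2", "amount": -3.0}]

ok_good = insert_with_validation(good, suite, store, alerts)
ok_bad = insert_with_validation(bad, suite, store, alerts)
```

Only the compliant batch reaches the store; the non-compliant one produces an alert message that names the failed expectation.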
Write
The write pipeline involves the Pandas DataFrame being serialized as Avro and securely written to a Kafka topic. The APIs also take care of serializing complex features like embeddings in such a way that they can be stored correctly.
From the Kafka topic, the data is picked up immediately by the online feature store service, which streams it into the online feature store (RonDB). For offline storage, a job can be scheduled at regular intervals to write the data to the offline feature store. With this “kappa-style” architecture, Hopsworks can guarantee that the online data is available as soon as possible (TM), while at the same time it can be compacted and written periodically in larger batches to the offline feature store to take advantage of the performance improvements given by large files in systems like Spark, S3 and HopsFS. Finally, Kafka only ensures at-least-once semantics for features written to the Kafka topic, but we ensure the correct, consistent replication of data to online and offline stores using idempotent writes to the online store, and ACID updates with duplicate record removal to the offline store.
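To see why idempotent writes and duplicate removal make at-least-once delivery safe, consider the following sketch. It does not show how Hopsworks implements this internally; the message layout, `apply_to_online_store` and `compact_for_offline_store` are assumptions made for illustration.

```python
from typing import Dict, List, Tuple

# Each message carries a primary key, an event time and the feature values.
Message = Tuple[str, int, Dict[str, float]]


def apply_to_online_store(online: Dict[str, Tuple[int, Dict[str, float]]], msg: Message) -> None:
    """Idempotent upsert: keep the row with the latest event time per primary key."""
    key, event_time, features = msg
    current = online.get(key)
    if current is None or event_time >= current[0]:
        online[key] = (event_time, features)


def compact_for_offline_store(batch: List[Message]) -> List[Message]:
    """Drop duplicate (primary key, event time) records before a batch commit."""
    seen = set()
    unique = []
    for key, event_time, features in batch:
        if (key, event_time) not in seen:
            seen.add((key, event_time))
            unique.append((key, event_time, features))
    return unique


# The second cc_1 message is delivered twice, as at-least-once delivery allows.
delivered = [
    ("cc_1", 100, {"amount": 10.0}),
    ("cc_1", 200, {"amount": 25.0}),
    ("cc_1", 200, {"amount": 25.0}),
    ("cc_2", 150, {"amount": 7.0}),
]

online_store = {}
for message in delivered:
    apply_to_online_store(online_store, message)

offline_rows = compact_for_offline_store(delivered)
```

Replaying the duplicated message leaves the online store unchanged, and the offline batch keeps three rows instead of four.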
Statistics Computation
Finally, after the data has been written to the offline feature store, its statistics are updated. For each feature group, by default, Hopsworks transparently computes descriptive statistics, the distribution, and the correlation matrix for features in the feature group. Statistics are then presented in the UI for users to explore and analyze.
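As a rough idea of what such statistics contain, the sketch below computes descriptive statistics and a Pearson correlation using only the Python standard library. The feature values and the helper names `describe` and `pearson` are made up for illustration; Hopsworks computes its statistics on the platform side, not with this code.

```python
import math
import statistics
from typing import Dict, List


def describe(values: List[float]) -> Dict[str, float]:
    """Descriptive statistics for one numerical feature."""
    return {
        "min": min(values),
        "max": max(values),
        "mean": statistics.fmean(values),
        "stddev": statistics.pstdev(values),
    }


def pearson(xs: List[float], ys: List[float]) -> float:
    """Pearson correlation coefficient between two features."""
    mean_x, mean_y = statistics.fmean(xs), statistics.fmean(ys)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Hypothetical feature values
features = {
    "amount": [10.0, 20.0, 30.0, 40.0],
    "trans_freq": [1.0, 2.0, 3.0, 4.0],
}
stats = {name: describe(values) for name, values in features.items()}
corr = pearson(features["amount"], features["trans_freq"])
```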
Read API — Feature View
The feature view is a new abstraction introduced in Hopsworks 3.0. Feature views are the gateway for users to access feature data from the feature store. At its core, a feature view represents the information about which features, from which feature groups, a model needs. Feature views contain only metadata about features, similar to how views in databases contain information about tables. In contrast to database views, however, feature views can also extend the features (columns) with feature transformations — more on this later.
Feature Selection
The first step to create a feature view is to select a set of features from the feature store. Features can be selected from different feature groups which are joined together. Hopsworks provides a Pandas-style API to select and join features from different feature groups. For example:
# Get a reference to the transactions and aggregation feature groups
trans_fg = fs.get_feature_group(
    name='transactions_fraud_batch_fg',
    version=1)
window_aggs_fg = fs.get_feature_group(
    name='transactions_4h_aggs_fraud_batch_fg',
    version=1)
# Select features from feature groups and join them with other features
ds_query = trans_fg.select(["fraud_label", "category", "amount",
                            "age_at_transaction", "days_until_card_expires",
                            "loc_delta"])\
    .join(window_aggs_fg.select_except(["cc_num"]))
What the Hopsworks feature store does on your behalf is to transpile the Pandas-like code into a complex SQL query that implements a point-in-time correct JOIN. As an example, the above snippet gets transpiled into:
WITH right_fg0 AS (
    SELECT
        *
    FROM
        (
            SELECT
                `fg1`.`fraud_label` `fraud_label`,
                `fg1`.`category` `category`,
                `fg1`.`amount` `amount`,
                `fg1`.`age_at_transaction` `age_at_transaction`,
                `fg1`.`days_until_card_expires` `days_until_card_expires`,
                `fg1`.`loc_delta` `loc_delta`,
                `fg1`.`cc_num` `join_pk_cc_num`,
                `fg1`.`datetime` `join_evt_datetime`,
                `fg0`.`trans_volume_mstd` `trans_volume_mstd`,
                `fg0`.`trans_volume_mavg` `trans_volume_mavg`,
                `fg0`.`trans_freq` `trans_freq`,
                `fg0`.`loc_delta_mavg` `loc_delta_mavg`,
                RANK() OVER (
                    PARTITION BY `fg0`.`cc_num`,
                    `fg1`.`datetime`
                    ORDER BY
                        `fg0`.`datetime` DESC
                ) pit_rank_hopsworks
            FROM
                `fabio_featurestore`.`transactions_1` `fg1`
                INNER JOIN `fabio_featurestore`.`transactions_4h_aggs_1` `fg0` ON `fg1`.`cc_num` = `fg0`.`cc_num`
                AND `fg1`.`datetime` >= `fg0`.`datetime`
        ) NA
    WHERE
        `pit_rank_hopsworks` = 1
) (
    SELECT
        `right_fg0`.`fraud_label` `fraud_label`,
        `right_fg0`.`category` `category`,
        `right_fg0`.`amount` `amount`,
        `right_fg0`.`age_at_transaction` `age_at_transaction`,
        `right_fg0`.`days_until_card_expires` `days_until_card_expires`,
        `right_fg0`.`loc_delta` `loc_delta`,
        `right_fg0`.`trans_volume_mstd` `trans_volume_mstd`,
        `right_fg0`.`trans_volume_mavg` `trans_volume_mavg`,
        `right_fg0`.`trans_freq` `trans_freq`,
        `right_fg0`.`loc_delta_mavg` `loc_delta_mavg`
    FROM
        right_fg0
)
The above SQL statement pulls the data from the specified data sources, e.g. if it is an external feature group defined over a Snowflake table, the SQL query will fetch the necessary data from Snowflake. The HSFS APIs also infer the joining keys based on the largest matching subset of primary keys of the feature groups being joined. This default behavior can be overridden by data scientists who can provide their own joining conditions.
More importantly, though, the query enforces point-in-time correctness of the data being joined. The APIs will join each event you want to use for training with the most recent feature value before the event occurred (for each feature selected).
As you can see above, the query is quite complex and it would be error prone to write manually. The Hopsworks feature store makes it easy for data scientists to select and correctly join features using a Pandas-like API — one they are already familiar with.
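To make the point-in-time rule concrete, here is a small pure-Python sketch of the same logic as the SQL: for each event, pick the latest aggregate row with the same key whose timestamp is not later than the event. The data, the integer timestamps and the function name `point_in_time_join` are illustrative assumptions, not part of HSFS.

```python
from bisect import bisect_right
from collections import defaultdict

# (cc_num, event_time, features); times are plain integers for simplicity.
events = [
    ("cc_1", 10, {"amount": 50.0}),
    ("cc_1", 30, {"amount": 75.0}),
    ("cc_2", 5, {"amount": 20.0}),
]
aggregates = [
    ("cc_1", 8, {"trans_freq": 1.0}),
    ("cc_1", 20, {"trans_freq": 3.0}),
    ("cc_1", 40, {"trans_freq": 9.0}),  # later than every cc_1 event: never joined
    ("cc_2", 7, {"trans_freq": 2.0}),   # later than the cc_2 event: no match
]


def point_in_time_join(left, right):
    """For each left row, attach the latest right row with the same key and time <= left time."""
    by_key = defaultdict(list)
    for key, ts, feats in sorted(right, key=lambda r: (r[0], r[1])):
        by_key[key].append((ts, feats))

    joined = []
    for key, ts, feats in left:
        history = by_key.get(key, [])
        times = [t for t, _ in history]
        idx = bisect_right(times, ts)  # number of right rows with time <= ts
        if idx == 0:
            continue  # inner join: drop events without an earlier feature value
        joined.append((key, ts, {**feats, **history[idx - 1][1]}))
    return joined


training_rows = point_in_time_join(events, aggregates)
```

The aggregate computed at time 40 is never attached to the event at time 30, which is exactly the kind of future leakage the generated query prevents.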
To create a feature view, you call the create_feature_view() method. You need to provide a name, the version, the query object containing the input features, and a list of features that will be used as a label (target) by your model. The label(s) will not be returned when retrieving data for batch or online scoring.
feature_view = fs.create_feature_view(
    name='transactions_view',
    version=1,
    query=ds_query,
    labels=["fraud_label"]
)
Feature Transformations
Although feature transformations can be performed before features are stored in the feature store, a feature store can increase feature reuse across different models by supporting consistent feature transformations for both offline and online APIs (training and inference). Hopsworks can transparently perform feature transformations with Python UDFs (user-defined functions) when you select features from the feature store. For example, when you select features for use in a feature view, you might decide to normalize a numerical feature in the feature view.
Let’s look at the implications of only supporting feature transformations before the feature store (as is the case in many well-known feature stores). Assume you adopt the OBT (one big table) data modeling approach, and store several years of engineered data in a feature group containing data for all your customers. You might have several models that use the same features in that feature group. One model might be trained using those rows with data for only US customers, a second model only uses European customer data, a third model might be trained on the entire history of the data available in the feature group, while a fourth model might be trained on only the last year of data. Each model is trained on a different training dataset. And each of these training datasets has different rows, and hence different descriptive statistics (min, max, mean, standard deviation). Many transformation functions are stateful, using descriptive statistics. For example, normalizing a numerical feature uses the mean value for that feature in the training dataset.
If you had transformed your features before storing them in the feature store, you could not create the four different training sets using the same feature groups. Instead, you would have one feature group with all the data available for the third model. You would also have the problem of how to train the fourth model on the last year of data. Its descriptive statistics are different from the full dataset, so transformed feature values for the full dataset and the last year of data would be different. You would need to store the last year of data in a different feature group. The same is true for models trained on data for US and EU customers, respectively. With this pattern, the amount of data storage required to store your features and the number of feature groups needed is a function of the number of models you have in production, not the number of features used by your models!
By applying the transformations only when using the features, the same set of features can be used by all models — meaning you only need to store your feature data once, and your model transforms the feature on-demand. Transforming features before the feature store is, in general, an anti-pattern that increases cost both in terms of storage but also in terms of the number of feature pipelines that need to be maintained. The only exception to this rule is high value online models where online transformation latency is too high for the use case, but this is a rare exception to the rule (that is anyway supported in Hopsworks).
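The effect of stateful transformations can be shown with a few lines of plain Python. The rows and the helper names `fit_min_max` and `min_max_scale` are hypothetical; the point is only that the same raw value is transformed differently depending on which training dataset supplied the statistics.

```python
from typing import Dict, List


def fit_min_max(values: List[float]) -> Dict[str, float]:
    """Compute the state (min and max) needed by min-max scaling."""
    return {"min": min(values), "max": max(values)}


def min_max_scale(value: float, stats: Dict[str, float]) -> float:
    """Scale a value to [0, 1] using previously fitted statistics."""
    return (value - stats["min"]) / (stats["max"] - stats["min"])


# Untransformed feature values, stored once in the feature store.
rows = [
    {"region": "US", "amount": 10.0},
    {"region": "US", "amount": 50.0},
    {"region": "EU", "amount": 50.0},
    {"region": "EU", "amount": 250.0},
]

us_amounts = [r["amount"] for r in rows if r["region"] == "US"]
all_amounts = [r["amount"] for r in rows]

us_stats = fit_min_max(us_amounts)    # min 10, max 50
all_stats = fit_min_max(all_amounts)  # min 10, max 250

# The same raw amount, transformed for two different models.
scaled_for_us_model = min_max_scale(50.0, us_stats)
scaled_for_global_model = min_max_scale(50.0, all_stats)
```

Had the scaled value been stored instead of the raw one, only one of the two models could have used it.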
You can specify which features to transform and the transformation functions to apply to those features by providing a dictionary of features and transformation functions. Hopsworks comes with a set of built-in transformation functions (such as MinMaxScaler and LabelEncoder). You can also define and register custom transformation functions as Python functions that take the feature as input and return the transformed feature as output.
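A custom transformation is, at its core, just a function from a feature value to a transformed value. The sketch below shows that shape and a dictionary-driven way of applying it. The function `log1p_amount` and the helper `apply_transformations` are hypothetical, and the registration step with Hopsworks is deliberately left out because its exact API is not shown in this post.

```python
import math


def log1p_amount(value: float) -> float:
    """Hypothetical custom transformation: compress large amounts with log(1 + x)."""
    return math.log1p(value)


def apply_transformations(row, transformation_map):
    """Apply each mapped function to its feature, leaving other features untouched."""
    return {
        name: transformation_map.get(name, lambda v: v)(value)
        for name, value in row.items()
    }


row = {"amount": 99.0, "category": "grocery"}
transformed = apply_transformations(row, {"amount": log1p_amount})
```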
The feature view stores the list of features and any transformation function applied to those features. Transformation functions are then transparently applied both when generating training data and when generating batches or single feature vectors for inference. The feature view also stores the descriptive statistics for each versioned training dataset it creates, so that transformation functions use the correct descriptive statistics. For example, if our fourth model, which used only the last year of data, was trained on training dataset version 4, then the transformation functions for that model would use the descriptive statistics (and any other state needed) from version 4 of the training data.
# Load transformation functions.
min_max_scaler = fs.get_transformation_function(name="min_max_scaler")
label_encoder = fs.get_transformation_function(name="label_encoder")
# Map features to transformations.
transformation_functions_map = {
    "category": label_encoder,
    "amount": min_max_scaler,
    "trans_volume_mavg": min_max_scaler,
    "trans_volume_mstd": min_max_scaler,
    "trans_freq": min_max_scaler,
    "loc_delta": min_max_scaler,
    "loc_delta_mavg": min_max_scaler,
    "age_at_transaction": min_max_scaler,
    "days_until_card_expires": min_max_scaler,
}
Training Data
As mentioned already, training data is generated using a feature view. The feature view holds the information on which features are needed and which transformation functions need to be applied.
Training data can be automatically split into train, test and validation sets. When that happens, the necessary statistics for the transformation functions are automatically computed only on the train set. This prevents leakage of information from the validation and test set into the model trained on the train set.
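The leakage-free pattern described above can be sketched as follows. This is not how Hopsworks performs the split internally: the random shuffle, the `split` and `standardize` helpers and the choice of standardization are assumptions for illustration. What matters is that the statistics come from the train set only.

```python
import random
import statistics
from typing import List, Tuple


def split(rows: List[float], validation_size: float, test_size: float,
          seed: int = 42) -> Tuple[List[float], List[float], List[float]]:
    """Shuffle and split into train, validation and test sets."""
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    n_val = int(len(shuffled) * validation_size)
    n_test = int(len(shuffled) * test_size)
    val = shuffled[:n_val]
    test = shuffled[n_val:n_val + n_test]
    train = shuffled[n_val + n_test:]
    return train, val, test


amounts = [float(x) for x in range(1, 11)]
train, val, test = split(amounts, validation_size=0.3, test_size=0.2)

# Statistics are fitted on the train set only ...
mean = statistics.fmean(train)
std = statistics.pstdev(train)


def standardize(value: float) -> float:
    return (value - mean) / std


# ... and then reused unchanged for the validation and test sets.
train_scaled = [standardize(v) for v in train]
val_scaled = [standardize(v) for v in val]
test_scaled = [standardize(v) for v in test]
```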
Training data can be generated on the fly as shown below:
X_train, y_train, X_val, y_val, X_test, y_test = feature_view.train_validation_test_splits(validation_size=0.3, test_size=0.2)
Alternatively, users can launch a Hopsworks job that generates and stores the training data as files in a desired file format (e.g., CSV, TFRecord). This is useful, for instance, when your training data does not fit in a Pandas DataFrame, but your model training pipeline can incrementally load training data from files, as TensorFlow does with its Dataset API for files stored in TFRecord format.
Prediction Services
When it comes to putting a model into production, there are two classes of prediction services we can build with models:
- Analytical Models: Models where inference happens periodically and in batches.
- Operational Models: Models where inference happens in real time with strict latency requirements.
For Analytical Models, best practice dictates that the inference pipeline should be set up such that the data to be scored is already available in the feature store. What this means in practice is that the new (unseen) data (features) is extracted from the feature groups, transformed and returned as DataFrames or files. A batch scoring program will then load the correct model version and perform inference on the new data, with the predictions stored in some sink (which could be an operational database or even another feature group in Hopsworks).
By setting up the feature pipeline such that the same data is feature-engineered for both training and inference in feature groups, the same inference data can then be used in future iterations of model training when the actual outcomes of the batch inference predictions become known and are stored in the feature store.
To retrieve the batch inference data, you can use the get_batch_data method. You need to provide a time interval for the window of data you need to score. Example:
transactions_to_score = feature_view.get_batch_data(start_time=start_time, end_time=end_time)
For Operational Models, predictions need to be available with strict latency requirements. In practice, this means that the feature data needs to be fetched from the online feature store. Typically, only one or a small set of feature vectors are scored by online inference pipelines. The feature view APIs provide a way to retrieve the feature vectors from the online feature store. In this case, users need to provide a set of keys (primary keys) for feature groups that are used in the feature view:
transactions_to_score = feature_view.get_feature_vector(entry={'cc_num': '12124324235'})
Additionally, for some use cases, some of the features needed to make a prediction are only known at runtime. For this, you can explicitly include the features and their untransformed feature values in the feature vector retrieval call, indicating that these features are provided by the client. The feature view will apply the feature transformations both to feature values retrieved from the online feature store and to the client-provided feature values supplied in real time.
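The merge-then-transform behaviour can be pictured with the sketch below. It is a stand-in, not the HSFS implementation: the in-memory `online_store`, the `transformations` dictionary with its assumed training min/max values, and the local `get_feature_vector` function with its `passed_features` and `feature_order` parameters are all hypothetical.

```python
from typing import Callable, Dict

# Hypothetical stand-ins: an online store keyed by cc_num and per-feature transformations.
online_store = {"12124324235": {"trans_freq": 4.0, "loc_delta_mavg": 0.2}}

transformations: Dict[str, Callable[[float], float]] = {
    # Min-max scaling with assumed training statistics (min 0, max 1000 and min 0, max 20).
    "amount": lambda v: (v - 0.0) / (1000.0 - 0.0),
    "trans_freq": lambda v: (v - 0.0) / (20.0 - 0.0),
}


def get_feature_vector(entry, passed_features, feature_order):
    """Merge stored and client-provided values, transform them, return an ordered vector."""
    raw = dict(online_store[entry["cc_num"]])
    raw.update(passed_features)  # values known only at request time
    return [transformations.get(name, lambda v: v)(raw[name]) for name in feature_order]


vector = get_feature_vector(
    entry={"cc_num": "12124324235"},
    passed_features={"amount": 250.0},
    feature_order=["amount", "trans_freq", "loc_delta_mavg"],
)
```

The client-provided amount goes through the same scaling as the stored features, so the model sees inputs on the same scale as during training.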
Get started
As always, you can get started building great models on Hopsworks by using our serverless deployment. You don’t have to connect any cloud account or deploy anything: just register on app.hopsworks.ai and start building.
Originally published at https://www.hopsworks.ai.