How to Easily Manage and Scale AI Models with Workload Orchestration
martinbald81
Posted on February 16, 2024
As we scale and deploy more models into production across multiple cloud environments, Data Scientists and ML Engineers are burdened with spending too many valuable cycles on the data plumbing and repetitive tasks needed just to get models to run and produce business reports – often using tools not designed for AI workloads.
Data Engineers are also spending far too many cycles supporting Data Scientists as they try to run and analyze ML pipelines, instead of building the robust upstream data pipelines that ensure business continuity. Teams attempting to realize value from their ML efforts soon hit bottlenecks that keep them from meeting production demands.
In this blog post we will go through model workload orchestration and show how it helps you deploy sustainable, value-producing models into production with scale and ease of management.
Getting your AI models into production can be a challenging task in itself, and once your models are in production, AI teams can then encounter a number of operational challenges, including:
- Long cycles spent on plumbing to get models to run.
- Dependency on DevOps/ITOps to build, run, and analyze ML pipelines.
- Lengthy inefficient feedback loops for value-generating insights.
- In-house roles unable to scale to meet all the business demands.
By implementing a workload orchestration practice in your AI production lifecycle you can mitigate, and likely eliminate, these challenges altogether and bring about a number of benefits that will help your team and the business realize the value of your AI models sooner rather than later. Some of these benefits include:
- Enabling Data Scientists and AI Engineers to automate model operations in production
- Creating repeatable, scalable production workflows across different use cases
- Launching AI initiatives in production with little to no engineering overhead
- Saving time and cost through optimized compute utilization
- Simplifying complexity
Regardless of your industry vertical or the size of your business, AI workload orchestration brings efficiencies and centralized ease of management across multiple use cases. Some of these use cases and examples are:
Demand Forecasting
- Retail product demand and dynamic pricing
- Supply chain forecasting
Reporting Automation
- Manufacturing worker safety and production loss reporting for injuries/maintenance
- FinTech fraud reporting and/or compliance reporting
Event Simulation
- Retail seasonal product demand
- Entertainment - Subscription renewals and promotions.
- Ad Tech campaigns
Churn Modeling
- Ad Tech/Entertainment - Subscription abandonment
- Client behavior and account renewals
- Online Retail - Consumer behavior
There are many more use cases and benefits for workload orchestration, but you get the picture: it brings structure, efficiency, and scale not only to your models in production, but also to your AI team, giving them back valuable bandwidth.
What is Workload Orchestration?
Let's take a minute to walk through what an orchestration is.
In the diagram below we can see that, at its core, an orchestration is one or more Python files designed to run and perform some task. These files can contain any kind of processing code and any other dependencies we need. Essentially, they contain references to one or more deployed pipelines, which allows us to schedule runs of these files that invoke those deployed pipelines as needed.
An orchestration also fully supports the connections we make, so we can include as many of those connections as we need. We often see people using these automations to take live input feeds into the pipelines and write the results to another external data source or file store.
Once these are set up, we can wrap them all in an orchestration and register that orchestration in the platform. This means we can then create what are called Tasks, or Runs, of this Orchestration.
These can be run on demand (ad hoc), or scheduled to run on a regular basis – for example every minute, day, week, or month. The flexibility is up to you and the needs of your team and business.
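To make this a little more concrete, here is a minimal, hypothetical sketch of what such an orchestration file could look like. The pipeline name, input values, and output path are placeholders, and the helper calls are illustrative rather than the exact workflow used later in this post.
# A minimal, hypothetical sketch of an orchestration file (e.g. main.py).
# The pipeline name, input values, and output path are placeholders.
import pandas as pd
import wallaroo

wl = wallaroo.Client()  # authenticates without the interactive login when run as a task

# Reference an already-deployed pipeline by name (placeholder name)
pipeline = [p for p in wl.list_pipelines() if p.name() == "my-deployed-pipeline"][0]

# Build (or fetch via a registered connection) a batch of inputs for the pipeline
input_df = pd.DataFrame({"lstm_input": [[[0.48, 0.61, 0.69]]]})

# Run inference and write the results to an external location (placeholder path)
results = pipeline.infer(input_df)
results.to_csv("/tmp/forecast_results.csv", index=False)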
The AI Workload Orchestration flow works across three tiers:
| Tier | Description |
|------|-------------|
| ML Workload Orchestration | User-created custom instructions that provide automated processes following the same steps every time without error. Orchestrations contain the instructions to be performed, uploaded as a .ZIP file with the instructions, requirements, and artifacts. |
| Task | Instructions on when to run an Orchestration as a scheduled Task. Tasks can be Run Once, where a single Task Run is created, or Run Scheduled, where a Task Run is created on a regular schedule based on the Kubernetes cronjob specifications. If a Task is Run Scheduled, it will create a new Task Run every time the schedule parameters are met, until the Task is killed. |
| Task Run | The execution of a Task. Task Runs let you validate business operations and identify any unsuccessful runs. If the Task is Run Once, only one Task Run is generated. If the Task is Run Scheduled, a new Task Run is created each time the schedule parameters are met, with each Task Run having its own results and logs. |
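Because scheduled Tasks follow the Kubernetes cronjob format, a schedule is just a standard cron string. A few illustrative examples:
# Standard cron strings (minute hour day-of-month month day-of-week) usable as Task schedules
every_minute = "* * * * *"
daily_at_midnight = "0 0 * * *"
weekly_sunday_11pm = "0 23 * * 0"   # the schedule used later in this post
monthly_first_day = "0 0 1 * *"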
Automate Batch Forecasts with Workload Orchestrations
To show an example of Workload Orchestration in action, we have a saved TensorFlow LSTM model trained on the popular Airline Passenger Forecasting dataset. We will walk through:
- Deploying this model to production using Wallaroo
- Making connections to external datastores to fetch information for inferencing
- Automating forecasting jobs to be run on-demand or at scheduled intervals
Our first step is to import the Python libraries needed.
import numpy as np
import pandas as pd
import tensorflow as tf
import wallaroo
from wallaroo.framework import Framework
from wallaroo.deployment_config import DeploymentConfigBuilder
from google.cloud import bigquery
from google.oauth2 import service_account
import datetime
import time
import utils
import timeseries_plot_utils as tsplot
%matplotlib inline
tsplot.sim_forecast_steps(num_steps=3)
[output]
Model Upload and Deployment
First, we get a connection to our Wallaroo instance and set the workspace we want to deploy to.
You will get the following message; click on the URL and select Yes to authenticate to the instance.
Please log into the following URL in a web browser:
https://keycloak.demo.pov.wallaroo.io/auth/realms/master/device?user_code=TRUH-DZII
Login successful!
wl = wallaroo.Client()
workspace = utils.get_workspace(wl, "airline-forecast")
_ = wl.set_current_workspace(workspace)
We specify our TensorFlow SavedModel file and the framework (TensorFlow), and the upload_model function takes care of uploading the model and making it available for deployment in our workspace:
framework=Framework.TENSORFLOW
model = wl.upload_model("lstm-forecast",
"models/tf_lstm_model.zip",
framework=framework)
We set any metadata around our pipeline, determine how much hardware we want each deployed instance to have, and decide how many replicas we want deployed. In this example we deploy 1, but more can be deployed to handle concurrent requests as necessary. Following that, we're ready to build our pipeline and deploy to production.
pipeline_name = "lstm-forecast-airline"
deployment_config = DeploymentConfigBuilder() \
.replica_count(1) \
.cpus(1).memory('2Gi') \
.build()
pipeline = wl.build_pipeline(pipeline_name) \
.add_model_step(model) \
.deploy(deployment_config=deployment_config)
[output]
ok
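Optionally, before sending any data, we can confirm the deployment is up; assuming the SDK's status method, this is a one-liner:
# Confirm the deployment is running before sending inference requests (assumes status() is available)
pipeline.status()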
Now, we can test our deployment with an input dataset. We use a quick function to grab one datapoint and scale it:
sample_data_point = utils.get_sample_data()
sample_data_point
[output]
array([[[0.48455598],
[0.61389961],
[0.6969112 ]]])
We can then put this into the DataFrame format Wallaroo expects:
input_df = pd.DataFrame({"lstm_input": sample_data_point.reshape(1,1,3).tolist()})
input_df
results = pipeline.infer(input_df)
results
Connect to Production Data Storage
Now that we can see our model works as expected in production, let's connect to our production BigQuery instance to source data from there:
conn = wl.get_connection("bigquerybatchstorage")
bq_client = bigquery.Client(
credentials = service_account.Credentials.from_service_account_info(
conn.details()),
project=conn.details()['project_id']
)
Now that we have this connection, we can write queries to fetch batch data live:
sample_dataframe = bq_client.query(
f"""
SELECT *
FROM sample_datasets.airline_passengers
LIMIT 5"""
).to_dataframe()
[output]
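As a rough sketch of how rows like these could be shaped into the (1, 1, 3) input the LSTM expects, the snippet below pulls the history, applies a min-max scaling (assuming the same scaling used during training), and builds the inference DataFrame. The month and passengers column names are assumptions about the table schema:
# Hypothetical windowing step: assumes the table has "month" and "passengers" columns
# and that the model was trained on min-max scaled passenger counts.
history_df = bq_client.query(
    """
    SELECT *
    FROM sample_datasets.airline_passengers
    ORDER BY month
    """
).to_dataframe()

pmin = history_df["passengers"].min()
pmax = history_df["passengers"].max()
scaled = (history_df["passengers"] - pmin) / (pmax - pmin)

# Take the three most recent scaled values and shape them into the (1, 1, 3) LSTM input
window = scaled.tail(3).to_numpy().reshape(1, 1, 3)
batch_input_df = pd.DataFrame({"lstm_input": window.tolist()})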
Automate Batch Forecasts with Workload Orchestrations
Now that we have a production model and a connection to our production data source, all that remains is to encapsulate this process so that we can kick off batch jobs on a schedule or on demand. This is done using Wallaroo's Machine Learning Workload Orchestration feature.
The first step here is to create a Python file that encapsulates the workflow (along the lines of the sketch shown earlier); we have done that in our automation/main.py file. Once we have this file and all its dependencies in a zip file, we can register the orchestration in Wallaroo:
orchestration = wl.upload_orchestration(name="airline_forecast", path="./automation/forecast_orchestration.zip")
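For reference, one possible way to produce the forecast_orchestration.zip uploaded above (a sketch; it assumes an automation/ folder containing main.py and a requirements.txt, so adjust to your own layout):
# Package the orchestration entry point and its requirements into the zip for upload
import zipfile

with zipfile.ZipFile("automation/forecast_orchestration.zip", "w") as zf:
    zf.write("automation/main.py", arcname="main.py")
    zf.write("automation/requirements.txt", arcname="requirements.txt")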
We can confirm we see this orchestration along with any others:
wl.list_orchestrations()[-1]
[output]
We can now create tasks of this orchestration. Tasks are instances of this orchestration that run with a particular set of inputs. These can be created on demand or on schedule:
orchestration = wl.list_orchestrations()[-1]
start_time = datetime.datetime.now()
task = orchestration.run_once(name="forecast_run", json_args={"workspace_name": "airline-forecast",
"pipeline_name": "lstm-forecast-airline",
"current_date": "1950-06",
"forecast_window": 1})
We can examine the status of the runs manually:
task.last_runs()
[output]
Or we can block until results appear by polling the pipeline's log outputs:
logs = pipeline.logs(start_datetime = start_time, end_datetime = datetime.datetime.now())
# Poll until the task's inference results show up in the pipeline logs
while logs.empty:
    time.sleep(1)
    logs = pipeline.logs(start_datetime = start_time, end_datetime = datetime.datetime.now())
Finally, we can schedule this job to run on a regular basis using cron-style scheduling. Let's have this job run every Sunday at 11 PM:
schedule = "0 23 * * 0"
task_scheduled = orchestration.run_scheduled(name="airline-forecast-weekly-scheduled",
timeout=600,
schedule=schedule,
json_args={"workspace_name": "airline-forecast",
"pipeline_name": "lstm-forecast-airline",
"current_date": "1950-06",
"forecast_window": 1})
Final Product
We can use on-demand runs of this orchestration to create the plots we saw earlier:
tsplot.sim_forecast_steps(num_steps=3)
Conclusion
In this blog post we have addressed a very common set of challenges that AI teams face when scaling and managing production ML workloads, and shown how to solve them through Model Workload Orchestration. This means that AI teams can create efficiencies to easily define, automate, and scale recurring production ML workloads that ingest data from predefined data sources, run inferencing, and deposit the results to a predefined location.
If you want to try the steps in this blog post series, you can access the ML Workload Orchestration tutorial and use the free inference servers available on the Azure Marketplace. Or you can download the free Wallaroo.AI Community Edition, which you can install to GitHub Codespaces or Azure.
Wallaroo.AI is a unified production ML platform built for Data Scientists and ML Engineers for easily deploying, observing, and optimizing machine learning in production at scale – in any cloud, on-prem, or at the edge.