ทดสอบทำ Machine Learning predict customer churn โดยใช้งาน Amazon SageMaker กับ Snowflake!
Chatchai Komrangded (Bas)
Posted on September 17, 2021
โพสต์นี้เราจะทดสอบการใช้ Amazon SageMaker ด้วย AutoGlueon Tabular ในการ train และ inference machine learning โดยเอาข้อมูลจาก Cloud data warehouse อย่าง snowflake ซึ่ง snowflake ถือว่าเป็น AWS partner solution ที่อยู่บน AWS marketplace, หลายๆคนอาจสงสัยว่าเอ๊ะ AWS ก็มี Amazon Redshift นิ? ใช่ครับ เรามีตัวอย่างเยอะเลยที่ใช้ Amazon redshift ร่วมกัย Sage Maker หาแปปเดียวก็เจอ 55 ตัวอย่าง, แต่เราก็จะมีบางลูกค้าในไทยที่มีการใช้ Solution ของ AWS partner เช่น snowflake คำถามที่ลูกค้าถามบ่อยๆคือจะ integrated solution รอบๆเหล่านี้เข้าไปที่ AWS บ้านหลังใหญ่ๆของเค้าได้ยังไงเช่นทำพวก ML บน Sage Maker, ก็เลยคิดว่าคงมีประโยชน์ถ้าเราทำตัวอย่างการ integration ระหว่าง Amazon Sage maker กับ Snowflake นี้กันดู! แน่นอนเหมือนเดิมเราเน้นทฏษดีพอเพียงน้ำไม่ต้องเยอะ, Let's get hand dirty!
Table Of Contents
- Architecture Diagram
- Pre-requisites
- Preparing to Load Data & Loading Data
- Machine Learning Workflow in SageMaker
- Update the Churn Scores to Snowflake
- Conclusion
Architecture Diagram
Pre-requisites
- AWS Account❗ Sage Maker มี free tier นะ
- Snowflake Account❗ ทดลองเล่นได้ 30 วัน
- Code ที่ใช้ทั้งหมดโหลดจากที่นี่
Preparing to Load Data & Loading Data
เราจะทำการ load ข้อมูลเข้า snowflake ก่อนโดยเพื่อจะเอาไปทำการ predict churn ใน Sage Maker อีกทีนึง หลังจากเรา login เข้ามาแล้วหน้าตา snowflake console ก็จะประมาณนี้
เราจะทำการเปลี่ยน Role เป็น Account Admin ก่อน ใช้คำสั่งด้านล่างใน snowflake worksheet ได้เลย
USE ROLE ACCOUNTADMIN;
ทำการ configure the ตัว Warehouse spec, Role, User และ Database
--Create Warehouse for AI/ML work
CREATE OR REPLACE WAREHOUSE SAGEMAKER_WH
WITH WAREHOUSE_SIZE = 'XSMALL'
AUTO_SUSPEND = 120
AUTO_RESUME = true
INITIALLY_SUSPENDED = TRUE;
--Create ROLE และ USER สำหรับ Sagemaker ในการ ดึงข้อมูลจาก Snowflake
CREATE OR REPLACE ROLE SAGEMAKER_ROLE COMMENT='SageMaker Role';
GRANT ALL ON WAREHOUSE SAGEMAKER_WH TO ROLE SAGEMAKER_ROLE;
GRANT ROLE SAGEMAKER_ROLE TO ROLE SYSADMIN;
CREATE OR REPLACE USER SAGEMAKER PASSWORD='<YOUR_PASSWORD>'
DEFAULT_ROLE=SAGEMAKER_ROLE
DEFAULT_WAREHOUSE=SAGEMAKER_WH
DEFAULT_NAMESPACE=ML_WORKSHOP.PUBLIC
COMMENT='SageMaker User';
GRANT ROLE SAGEMAKER_ROLE TO USER SAGEMAKER;
USE ROLE SYSADMIN;
ทำการสร้าง Databasse และ Schema และ grant access
CREATE DATABASE IF NOT EXISTS ML_WORKSHOP;
GRANT USAGE ON DATABASE ML_WORKSHOP TO ROLE SAGEMAKER_ROLE;
GRANT ALL ON SCHEMA ML_WORKSHOP.PUBLIC TO ROLE SAGEMAKER_ROLE;
--สร้าง Table แล้ว load Customer data จาก Amazon S3
USE ML_WORKSHOP.PUBLIC;
USE WAREHOUSE SAGEMAKER_WH;
CREATE OR REPLACE TABLE CUSTOMER_CHURN (
Cust_ID INT,
State varchar(10),
Account_Length INT,
Area_Code INT,
Phone varchar(10),
Intl_Plan varchar(10),
VMail_Plan varchar(10),
VMail_Message INT,
Day_Mins FLOAT,
Day_Calls INT,
Day_Charge FLOAT,
Eve_Mins FLOAT,
Eve_Calls INT,
Eve_Charge FLOAT,
Night_Mins FLOAT,
Night_Calls INT,
Night_Charge FLOAT,
Intl_Mins FLOAT,
Intl_Calls INT,
Intl_Charge FLOAT,
CustServ_Calls INT,
Churn varchar(10)
);
GRANT ALL ON TABLE CUSTOMER_CHURN TO ROLE SAGEMAKER_ROLE;
เนื่องจากเราจะเอา ML scoring result สุดท้ายมาโชใน snowflake ดังนั้นเราจะสร้าง Table รอไว้ก่อน!!
CREATE OR REPLACE TABLE ML_RESULTS (
Churn_IN INT,
Cust_ID INT,
Churn_Score REAL
);
GRANT ALL ON TABLE ML_RESULTS TO ROLE SAGEMAKER_ROLE;
Configure External Data Stages ใน Snowflake เพื่อใช้เป็น pointer ชี้ไปที่ Amazon S3 ซึ่งเป็นที่เก็บข้อมูลของเรา
CREATE OR REPLACE FILE FORMAT CSVHEADER
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 0;
GRANT USAGE ON FILE FORMAT CSVHEADER TO ROLE SAGEMAKER_ROLE;
--Snowflake เค้ามี sample dataset ตัวนี้ไว้ให้ละ เราก็โหลดมาเลย
CREATE OR REPLACE STAGE CHURN_DATA
url='s3://snowflake-corp-se-workshop/sagemaker-snowflake-devdays-v1.5/sourcedata/';
--Grant Stage Object Piviliges ไปที่ SAGEMAKER_ROLE
GRANT USAGE ON STAGE CHURN_DATA TO ROLE SAGEMAKER_ROLE;
--เราสามารถดู files ที่อยู่ใน external stage ได้List and view the files in the external stage
LIST @CHURN_DATA;
--สามารถลอง Query ดูได้
SELECT $1,$2 FROM @CHURN_DATA/ LIMIT 10;
--โหลด Data เข้า Snowflake!
--เปลี่ยนมาใช้ SAGEMAKER_ROLE
USE ROLE SAGEMAKER_ROLE;
--โหลดข้อมูลจาก External Stage ไป Snowflake table
COPY INTO CUSTOMER_CHURN FROM @CHURN_DATA/ FILE_FORMAT = (FORMAT_NAME = CSVHEADER);
--ทดสอบดึงข้อมูลดู!
SELECT * FROM CUSTOMER_CHURN LIMIT 10;
Machine Learning Workflow in SageMaker
เราจะใช้ตัว AutoGluon-Tabular บน AWS marketeplace เพื่อช่วยประหยัดเวลาในการ handling พวก feature engineering ต่างๆเช่น missing data, feature transformation, model selection, algo selection, hyper parameter selection และ หรือ ensembling multiple models
ถ้าจะใช้ code ตามตัวอย่างกด Continue to Subscibe (แต่ถ้าใครจะใช้ Built in sage maker algo อย่าง XGBoost หรือ Open source framework เองก็ไม่ต้องกดนะจ้า)
เราต้อง Provision SagaMaker notebook ก่อนนะ ถ้าก็กดเข้าไป ทีนี้เราก็จะเจอ notebook แล้ว open jupyer lab ก็จะได้ภาพแบบข้างล่าง Notebook ที่คุ้นเคย
เราจะมาไล่ในส่วนแรกก่อน BAU, import lib ที่จะใช้ใข้ boto3 sdk สร้าง sagemaker session, สร้าง ตัวแปร region และ iam role
import boto3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io
import os
import sys
import time
import json
from IPython.display import display
from time import strftime, gmtime
import sagemaker
from sagemaker import AlgorithmEstimator, get_execution_role
from sagemaker.predictor import RealTimePredictor, csv_serializer, StringDeserializer
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
sess = sagemaker.Session()
role = get_execution_role()
region = boto3.Session().region_name
print("IAM role ARN: {}".format(role))
bucket = '<YOUR_BUCKET_NAME>'
prefix = 'churn-analytics'
Access ข้อมูลใน snowflake ผ่าน snowflake connector
import snowflake.connector
# Connecting to Snowflake using the default authenticator
ctx = snowflake.connector.connect(
user='sagemaker',
password='<YOUR_PASSWORD>',
account='<snowflake_account>.<aws_region>',
warehouse='SAGEMAKER_WH',
database='ML_WORKSHOP',
schema='PUBLIC'
)
ในเชิง practice, security standard เราไม่ควรเอา credentials เป็น clear text ฝังในนี้!!!, best practices เราควรเก็บ secret credential สักที่นึงอย่าง AWS Secrets Manager ช่วยท่านได้
ดึงข้อมูลใน snowflake ดูหน่อย
cs=ctx.cursor()
allrows=cs.execute("""select Cust_ID,STATE,ACCOUNT_LENGTH,AREA_CODE,PHONE,INTL_PLAN,VMAIL_PLAN,VMAIL_MESSAGE,
DAY_MINS,DAY_CALLS,DAY_CHARGE,EVE_MINS,EVE_CALLS,EVE_CHARGE,NIGHT_MINS,NIGHT_CALLS,
NIGHT_CHARGE,INTL_MINS,INTL_CALLS,INTL_CHARGE,CUSTSERV_CALLS,
CHURN from CUSTOMER_CHURN """).fetchall()
churn = pd.DataFrame(allrows)
churn.columns=['Cust_id','State','Account Length','Area Code','Phone','Intl Plan', 'VMail Plan', 'VMail Message','Day Mins',
'Day Calls', 'Day Charge', 'Eve Mins', 'Eve Calls', 'Eve Charge', 'Night Mins', 'Night Calls','Night Charge',
'Intl Mins','Intl Calls','Intl Charge','CustServ Calls', 'Churn']
pd.set_option('display.max_columns', 500) # Make sure we can see all of the columns
pd.set_option('display.max_rows', 10) # Keep the output on one page
churn
เตรียม Dataset สำหรับเอาไป Train!!
churn = churn.drop('Phone', axis=1)
churn['Area Code'] = churn['Area Code'].astype(object)
churn = churn.drop(['Day Charge', 'Eve Charge', 'Night Charge', 'Intl Charge'], axis=1)
to_split_data = churn.drop(['Cust_id'], axis=1)
train_data, test_data = np.split(to_split_data.sample(frac=1, random_state=1729), [int(0.9 * len(to_split_data))])
train_data.to_csv('train.csv', header=True, index=False)
pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 1000)
display(train_data)
Get AutoGlue on container image ARN
AUTOGLUON_PRODUCT = "autogluon-tabular-v2-a11d018f6028f8192e60553704ee3d97"
def get_algorithm_arn(region, algo_name):
acct_mapping = {
"ap-northeast-1" : "977537786026",
"ap-northeast-2" : "745090734665",
"ap-southeast-1" : "192199979996",
"ap-southeast-2" : "666831318237",
"us-east-1" : "865070037744",
"eu-central-1" : "446921602837",
"ap-south-1" : "077584701553",
"sa-east-1" : "270155090741",
"ca-central-1" : "470592106596",
"eu-west-1" : "985815980388",
"eu-west-2" : "856760150666",
"eu-west-3" : "843114510376",
"eu-north-1" : "136758871317",
"us-west-1" : "382657785993",
"us-east-2" : "057799348421",
"us-west-2" : "594846645681"
}
return "arn:aws:sagemaker:{}:{}:algorithm/{}".format(region, acct_mapping[region], algo_name)
algorithm_arn = get_algorithm_arn(boto3.Session().region_name, AUTOGLUON_PRODUCT)
print("The Tabular AutoGluon ARN in your region is {}.".format(algorithm_arn))
เมื่อเรา subscribed มาแล้วก็จะให้อารมณ์คล้ายกับ SageMaker built-in algorithm, ซึ่งเรียกได้ว่า deployed กันแบบ "low-to-no-code" กันไปเลย
มี 3 ค่าที่ต้องคำนึง
Hyperparamters: AutoML algorithms like AutoGluon ถูกออกมาให้ automated hpo, คือการ set คือ optional, แต่เราสามารถ override ได้! เราแค่ต้องกำหนด target column แค่นั้นอะ
Infrastructure: ใน SageMaker เราจะใช้ remote training instance ข้างนอกซึ่งเรากำหนด size กับจำนวนตามใจชอบ SageMaker จะ provision, และ de-provision ให้เรา auto (อย่าใช้ notebook นี้ train นะ❗ นั่นคือการใช้ที่ผิด)
ข้อมูลที่จะใช้ train เราควรเอาข้อมูลที่ train เก็บไว้ใน Amazon S3 นะ! สำหรับการรองรับ Scale training ใหญ่ๆในอนาคต
hyperparameters = {
#"hyperparameters": {
# "NN":{"num_epochs": "1"}
#},
#"auto_stack": "True",
"label": "Churn"
}
compatible_training_instance_type='ml.m5.4xlarge'
s3_input_train = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')
#กำหนดค่า Configuration
autogluon = AlgorithmEstimator(algorithm_arn=algorithm_arn,
role=role,
instance_count=1,
instance_type=compatible_training_instance_type,
sagemaker_session=sess,
base_job_name='autogluon',
hyperparameters=hyperparameters,
train_volume_size=100)
#สั่ง Train
autogluon.fit({'training': s3_input_train})
ทำ Batch inference คือการทำ offline สร้างตัว churn scores เพื่อดูว่าลูกค้าคนไหนที่มีโอกาสจะหนีจากเรา เตรียม Test data ก่อน
batch_input = churn.iloc[:,:-1]
batch_input.to_csv('batch.csv', header=False, index=False)
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'batch/in/batch.csv')).upload_file('batch.csv')
s3uri_batch_input ='s3://{}/{}/batch/in'.format(bucket, prefix)
print('Batch Transform input S3 uri: {}'.format(s3uri_batch_input))
s3uri_batch_output= 's3://{}/{}/batch/out'.format(bucket, prefix)
print('Batch Transform output S3 uri: {}'.format(s3uri_batch_output))
ทำการเรียก transformer() เพื่อสร้างและรัน batch inference process
from sagemaker.transformer import Transformer
BATCH_INSTANCE_TYPE = 'ml.c5.2xlarge'
transformer = autogluon.transformer(instance_count=1,
strategy='SingleRecord',
assemble_with='Line',
instance_type= BATCH_INSTANCE_TYPE,
accept = 'text/csv',
output_path=s3uri_batch_output)
transformer.transform(s3uri_batch_input,
split_type= 'Line',
content_type= 'text/csv',
input_filter = "$[1:]",
join_source = "Input",
output_filter = "$[0,-1]",
logs = False)
ลอง produce ผลผ่าน confusion matrix ง่ายๆดู
batched_churn_scores = pd.read_csv(s3uri_batch_output+'/batch.csv.out', usecols=[0,1], names=['id','scores'])
batched_churn_scores['scores'] = (batched_churn_scores['scores'] == "True.").astype(int)
#batched_churn_scores['Churn'] = (churn['Churn'] == "True.").astype(int)
gt_df = pd.DataFrame((churn['Churn'] == "True.").astype(int)).reset_index(drop=True)
results_df= pd.concat([gt_df,batched_churn_scores],axis=1)
pd.crosstab(index=results_df['Churn'], columns=np.round(results_df['scores']), rownames=['actual'], colnames=['predictions'])
Update the Churn Scores to Snowflake
สุดท้ายเราจะไปโชผลลัพธ์ของ Predict scoring ผ่าน snowflake กัน! เอาข้อมูลกลับ snowflake ผ่าน write_pandas
from snowflake.connector.pandas_tools import write_pandas
results_df.columns = ['CHURN_IN','CUST_ID','CHURN_SCORE']
# Write the predictions to the table named "ML_RESULTS".
success, nchunks, nrows, _ = write_pandas(ctx, results_df, 'ML_RESULTS')
display(nrows)
กลับมาที่หน้า snowflake เราจะมาลอง Query ดู
--ดู CHURN SCORES output ที่ได้มาจาก Sagemaker
SELECT * FROM ML_RESULTS LIMIT 100;
--ตัวอย่างลอง identify พื้นที่ที่มี rate ที่คาดว่าลูกค้า จะ churn โดยเราจะทำการ join ผลลัพธ์จาก predict score กลับไปที่ข้อมูลลูกค้า โดยเงื่อนไขที่โอกาส churn มากกว่าหรือเท่ากับ 10 คน และ churn scoring confidence มากกว่า 75%
SELECT C.STATE, COUNT(DISTINCT(C.CUST_ID))
FROM ML_RESULTS M INNER JOIN CUSTOMER_CHURN C on M.CUST_ID = C.CUST_ID
WHERE M.CHURN_SCORE >= 0.75
GROUP BY C.STATE
HAVING COUNT(DISTINCT(C.CUST_ID)) >= 10
ORDER BY COUNT(C.CUST_ID) DESC;
มีประมาน 17 รัฐที่ต้อง focus ว่าลูกค้าเราน่ากำลังจะ churn!
✅ Conclusion
Posted on September 17, 2021
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.
Related
September 17, 2021