ทดสอบทำ Machine Learning predict customer churn โดยใช้งาน Amazon SageMaker กับ Snowflake!

chatchaikomrangded

Chatchai Komrangded (Bas)

Posted on September 17, 2021

ทดสอบทำ Machine Learning predict customer churn โดยใช้งาน Amazon SageMaker กับ Snowflake!

โพสต์นี้เราจะทดสอบการใช้ Amazon SageMaker ด้วย AutoGlueon Tabular ในการ train และ inference machine learning โดยเอาข้อมูลจาก Cloud data warehouse อย่าง snowflake ซึ่ง snowflake ถือว่าเป็น AWS partner solution ที่อยู่บน AWS marketplace, หลายๆคนอาจสงสัยว่าเอ๊ะ AWS ก็มี Amazon Redshift นิ? ใช่ครับ เรามีตัวอย่างเยอะเลยที่ใช้ Amazon redshift ร่วมกัย Sage Maker หาแปปเดียวก็เจอ 55 ตัวอย่าง, แต่เราก็จะมีบางลูกค้าในไทยที่มีการใช้ Solution ของ AWS partner เช่น snowflake คำถามที่ลูกค้าถามบ่อยๆคือจะ integrated solution รอบๆเหล่านี้เข้าไปที่ AWS บ้านหลังใหญ่ๆของเค้าได้ยังไงเช่นทำพวก ML บน Sage Maker, ก็เลยคิดว่าคงมีประโยชน์ถ้าเราทำตัวอย่างการ integration ระหว่าง Amazon Sage maker กับ Snowflake นี้กันดู! แน่นอนเหมือนเดิมเราเน้นทฏษดีพอเพียงน้ำไม่ต้องเยอะ, Let's get hand dirty!

Sage Maker คืออะไร?

Snowflake คืออะไร?

Table Of Contents

Architecture Diagram

arc

Pre-requisites

  • AWS Account❗ Sage Maker มี free tier นะ
  • Snowflake Account❗ ทดลองเล่นได้ 30 วัน
  • Code ที่ใช้ทั้งหมดโหลดจากที่นี่

Preparing to Load Data & Loading Data

เราจะทำการ load ข้อมูลเข้า snowflake ก่อนโดยเพื่อจะเอาไปทำการ predict churn ใน Sage Maker อีกทีนึง หลังจากเรา login เข้ามาแล้วหน้าตา snowflake console ก็จะประมาณนี้

Alt Text

เราจะทำการเปลี่ยน Role เป็น Account Admin ก่อน ใช้คำสั่งด้านล่างใน snowflake worksheet ได้เลย

USE ROLE ACCOUNTADMIN;
Enter fullscreen mode Exit fullscreen mode

ทำการ configure the ตัว Warehouse spec, Role, User และ Database

--Create Warehouse for AI/ML work
CREATE OR REPLACE WAREHOUSE SAGEMAKER_WH
  WITH WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 120
  AUTO_RESUME = true
  INITIALLY_SUSPENDED = TRUE;

--Create ROLE และ USER สำหรับ Sagemaker ในการ ดึงข้อมูลจาก Snowflake
CREATE OR REPLACE ROLE SAGEMAKER_ROLE COMMENT='SageMaker Role';
GRANT ALL ON WAREHOUSE SAGEMAKER_WH TO ROLE SAGEMAKER_ROLE;
GRANT ROLE SAGEMAKER_ROLE TO ROLE SYSADMIN;

CREATE OR REPLACE USER SAGEMAKER PASSWORD='<YOUR_PASSWORD>' 
    DEFAULT_ROLE=SAGEMAKER_ROLE 
    DEFAULT_WAREHOUSE=SAGEMAKER_WH
    DEFAULT_NAMESPACE=ML_WORKSHOP.PUBLIC
    COMMENT='SageMaker User';

GRANT ROLE SAGEMAKER_ROLE TO USER SAGEMAKER;

USE ROLE SYSADMIN;
Enter fullscreen mode Exit fullscreen mode

ทำการสร้าง Databasse และ Schema และ grant access

CREATE DATABASE IF NOT EXISTS ML_WORKSHOP;
GRANT USAGE ON DATABASE ML_WORKSHOP TO ROLE SAGEMAKER_ROLE;
GRANT ALL ON SCHEMA ML_WORKSHOP.PUBLIC TO ROLE SAGEMAKER_ROLE;
Enter fullscreen mode Exit fullscreen mode

--สร้าง Table แล้ว load Customer data จาก Amazon S3

USE ML_WORKSHOP.PUBLIC;
USE WAREHOUSE SAGEMAKER_WH;

CREATE OR REPLACE TABLE CUSTOMER_CHURN (
    Cust_ID INT,
    State varchar(10),
    Account_Length INT,
    Area_Code INT,
    Phone varchar(10),
    Intl_Plan varchar(10),
    VMail_Plan varchar(10),
    VMail_Message INT,
    Day_Mins FLOAT,
    Day_Calls INT,
    Day_Charge  FLOAT,
    Eve_Mins FLOAT,
    Eve_Calls INT,
    Eve_Charge FLOAT,
    Night_Mins FLOAT,
    Night_Calls INT,
    Night_Charge FLOAT,
    Intl_Mins FLOAT,
    Intl_Calls INT,
    Intl_Charge FLOAT,
    CustServ_Calls INT,
    Churn varchar(10)
);
GRANT ALL ON TABLE CUSTOMER_CHURN TO ROLE SAGEMAKER_ROLE;
Enter fullscreen mode Exit fullscreen mode

เนื่องจากเราจะเอา ML scoring result สุดท้ายมาโชใน snowflake ดังนั้นเราจะสร้าง Table รอไว้ก่อน!!

CREATE OR REPLACE TABLE ML_RESULTS (
    Churn_IN INT,
    Cust_ID INT,
    Churn_Score REAL
);
GRANT ALL ON TABLE ML_RESULTS TO ROLE SAGEMAKER_ROLE; 
Enter fullscreen mode Exit fullscreen mode

Configure External Data Stages ใน Snowflake เพื่อใช้เป็น pointer ชี้ไปที่ Amazon S3 ซึ่งเป็นที่เก็บข้อมูลของเรา

CREATE OR REPLACE FILE FORMAT CSVHEADER
    TYPE = 'CSV'
    FIELD_DELIMITER = ','
    SKIP_HEADER = 0;

GRANT USAGE ON FILE FORMAT CSVHEADER TO ROLE SAGEMAKER_ROLE;    

--Snowflake เค้ามี sample dataset ตัวนี้ไว้ให้ละ เราก็โหลดมาเลย
CREATE OR REPLACE STAGE CHURN_DATA
  url='s3://snowflake-corp-se-workshop/sagemaker-snowflake-devdays-v1.5/sourcedata/';
Enter fullscreen mode Exit fullscreen mode

--Grant Stage Object Piviliges ไปที่ SAGEMAKER_ROLE

GRANT USAGE ON STAGE CHURN_DATA TO ROLE SAGEMAKER_ROLE;

--เราสามารถดู files ที่อยู่ใน external stage ได้List and view the files in the external stage
LIST @CHURN_DATA;

--สามารถลอง Query ดูได้
SELECT $1,$2 FROM @CHURN_DATA/ LIMIT 10;
Enter fullscreen mode Exit fullscreen mode

--โหลด Data เข้า Snowflake!

--เปลี่ยนมาใช้ SAGEMAKER_ROLE
USE ROLE SAGEMAKER_ROLE;

--โหลดข้อมูลจาก External Stage ไป Snowflake table
COPY INTO CUSTOMER_CHURN FROM @CHURN_DATA/ FILE_FORMAT = (FORMAT_NAME = CSVHEADER);

--ทดสอบดึงข้อมูลดู!
SELECT * FROM CUSTOMER_CHURN LIMIT 10;
Enter fullscreen mode Exit fullscreen mode

Machine Learning Workflow in SageMaker

เราจะใช้ตัว AutoGluon-Tabular บน AWS marketeplace เพื่อช่วยประหยัดเวลาในการ handling พวก feature engineering ต่างๆเช่น missing data, feature transformation, model selection, algo selection, hyper parameter selection และ หรือ ensembling multiple models

ถ้าจะใช้ code ตามตัวอย่างกด Continue to Subscibe (แต่ถ้าใครจะใช้ Built in sage maker algo อย่าง XGBoost หรือ Open source framework เองก็ไม่ต้องกดนะจ้า)

sub

เราต้อง Provision SagaMaker notebook ก่อนนะ ถ้าก็กดเข้าไป ทีนี้เราก็จะเจอ notebook แล้ว open jupyer lab ก็จะได้ภาพแบบข้างล่าง Notebook ที่คุ้นเคย
Alt Text

เราจะมาไล่ในส่วนแรกก่อน BAU, import lib ที่จะใช้ใข้ boto3 sdk สร้าง sagemaker session, สร้าง ตัวแปร region และ iam role

import boto3
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import io
import os
import sys
import time
import json
from IPython.display import display
from time import strftime, gmtime

import sagemaker
from sagemaker import AlgorithmEstimator, get_execution_role
from sagemaker.predictor import RealTimePredictor, csv_serializer, StringDeserializer

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

sess = sagemaker.Session()
role = get_execution_role()
region = boto3.Session().region_name
print("IAM role ARN: {}".format(role))

bucket = '<YOUR_BUCKET_NAME>'
prefix = 'churn-analytics'
Enter fullscreen mode Exit fullscreen mode

Access ข้อมูลใน snowflake ผ่าน snowflake connector

import snowflake.connector
# Connecting to Snowflake using the default authenticator
ctx = snowflake.connector.connect(
  user='sagemaker',
  password='<YOUR_PASSWORD>',
  account='<snowflake_account>.<aws_region>',
  warehouse='SAGEMAKER_WH',
  database='ML_WORKSHOP',
  schema='PUBLIC'
)
Enter fullscreen mode Exit fullscreen mode

ในเชิง practice, security standard เราไม่ควรเอา credentials เป็น clear text ฝังในนี้!!!, best practices เราควรเก็บ secret credential สักที่นึงอย่าง AWS Secrets Manager ช่วยท่านได้

ดึงข้อมูลใน snowflake ดูหน่อย

cs=ctx.cursor()
allrows=cs.execute("""select Cust_ID,STATE,ACCOUNT_LENGTH,AREA_CODE,PHONE,INTL_PLAN,VMAIL_PLAN,VMAIL_MESSAGE,
                   DAY_MINS,DAY_CALLS,DAY_CHARGE,EVE_MINS,EVE_CALLS,EVE_CHARGE,NIGHT_MINS,NIGHT_CALLS,
                   NIGHT_CHARGE,INTL_MINS,INTL_CALLS,INTL_CHARGE,CUSTSERV_CALLS,
                   CHURN from CUSTOMER_CHURN """).fetchall()

churn = pd.DataFrame(allrows)
churn.columns=['Cust_id','State','Account Length','Area Code','Phone','Intl Plan', 'VMail Plan', 'VMail Message','Day Mins',
            'Day Calls', 'Day Charge', 'Eve Mins', 'Eve Calls', 'Eve Charge', 'Night Mins', 'Night Calls','Night Charge',
            'Intl Mins','Intl Calls','Intl Charge','CustServ Calls', 'Churn']

pd.set_option('display.max_columns', 500)     # Make sure we can see all of the columns
pd.set_option('display.max_rows', 10)         # Keep the output on one page
churn
Enter fullscreen mode Exit fullscreen mode

เตรียม Dataset สำหรับเอาไป Train!!

churn = churn.drop('Phone', axis=1)
churn['Area Code'] = churn['Area Code'].astype(object)
churn = churn.drop(['Day Charge', 'Eve Charge', 'Night Charge', 'Intl Charge'], axis=1)
to_split_data = churn.drop(['Cust_id'], axis=1)
train_data, test_data = np.split(to_split_data.sample(frac=1, random_state=1729), [int(0.9 * len(to_split_data))])
train_data.to_csv('train.csv', header=True, index=False)

pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 1000)
display(train_data)
Enter fullscreen mode Exit fullscreen mode

Get AutoGlue on container image ARN

AUTOGLUON_PRODUCT = "autogluon-tabular-v2-a11d018f6028f8192e60553704ee3d97"
def get_algorithm_arn(region, algo_name):
    acct_mapping = {
        "ap-northeast-1" : "977537786026",
        "ap-northeast-2" : "745090734665",
        "ap-southeast-1" : "192199979996",
        "ap-southeast-2" : "666831318237",
        "us-east-1"      : "865070037744",
        "eu-central-1"   : "446921602837",
        "ap-south-1"     : "077584701553",
        "sa-east-1"      : "270155090741",
        "ca-central-1"   : "470592106596",
        "eu-west-1"      : "985815980388",
        "eu-west-2"      : "856760150666",
        "eu-west-3"      : "843114510376",
        "eu-north-1"     : "136758871317",
        "us-west-1"      : "382657785993",
        "us-east-2"      : "057799348421",
        "us-west-2"      : "594846645681"
    }

    return "arn:aws:sagemaker:{}:{}:algorithm/{}".format(region, acct_mapping[region], algo_name)

algorithm_arn = get_algorithm_arn(boto3.Session().region_name, AUTOGLUON_PRODUCT)
print("The Tabular AutoGluon ARN in your region is {}.".format(algorithm_arn))
Enter fullscreen mode Exit fullscreen mode

เมื่อเรา subscribed มาแล้วก็จะให้อารมณ์คล้ายกับ SageMaker built-in algorithm, ซึ่งเรียกได้ว่า deployed กันแบบ "low-to-no-code" กันไปเลย

มี 3 ค่าที่ต้องคำนึง

  1. Hyperparamters: AutoML algorithms like AutoGluon ถูกออกมาให้ automated hpo, คือการ set คือ optional, แต่เราสามารถ override ได้! เราแค่ต้องกำหนด target column แค่นั้นอะ

  2. Infrastructure: ใน SageMaker เราจะใช้ remote training instance ข้างนอกซึ่งเรากำหนด size กับจำนวนตามใจชอบ SageMaker จะ provision, และ de-provision ให้เรา auto (อย่าใช้ notebook นี้ train นะ❗ นั่นคือการใช้ที่ผิด)

  3. ข้อมูลที่จะใช้ train เราควรเอาข้อมูลที่ train เก็บไว้ใน Amazon S3 นะ! สำหรับการรองรับ Scale training ใหญ่ๆในอนาคต

hyperparameters = {
    #"hyperparameters": {
    #    "NN":{"num_epochs": "1"}
    #},
    #"auto_stack": "True",
    "label": "Churn"
}

compatible_training_instance_type='ml.m5.4xlarge' 
s3_input_train = sagemaker.inputs.TrainingInput(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')

#กำหนดค่า Configuration
autogluon = AlgorithmEstimator(algorithm_arn=algorithm_arn, 
                                  role=role, 
                                  instance_count=1, 
                                  instance_type=compatible_training_instance_type, 
                                  sagemaker_session=sess, 
                                  base_job_name='autogluon',
                                  hyperparameters=hyperparameters,
                                  train_volume_size=100) 

#สั่ง Train
autogluon.fit({'training': s3_input_train})
Enter fullscreen mode Exit fullscreen mode

train_result

ทำ Batch inference คือการทำ offline สร้างตัว churn scores เพื่อดูว่าลูกค้าคนไหนที่มีโอกาสจะหนีจากเรา เตรียม Test data ก่อน

batch_input = churn.iloc[:,:-1]
batch_input.to_csv('batch.csv', header=False, index=False)
boto3.Session().resource('s3').Bucket(bucket).Object(os.path.join(prefix, 'batch/in/batch.csv')).upload_file('batch.csv')

s3uri_batch_input ='s3://{}/{}/batch/in'.format(bucket, prefix)
print('Batch Transform input S3 uri: {}'.format(s3uri_batch_input))

s3uri_batch_output= 's3://{}/{}/batch/out'.format(bucket, prefix)
print('Batch Transform output S3 uri: {}'.format(s3uri_batch_output))
Enter fullscreen mode Exit fullscreen mode

ทำการเรียก transformer() เพื่อสร้างและรัน batch inference process

from sagemaker.transformer import Transformer
BATCH_INSTANCE_TYPE = 'ml.c5.2xlarge'

transformer = autogluon.transformer(instance_count=1,
                                         strategy='SingleRecord',
                                         assemble_with='Line',
                                         instance_type= BATCH_INSTANCE_TYPE,
                                         accept = 'text/csv',
                                         output_path=s3uri_batch_output)

transformer.transform(s3uri_batch_input,
                      split_type= 'Line',
                      content_type= 'text/csv',   
                      input_filter = "$[1:]",
                      join_source = "Input",
                      output_filter = "$[0,-1]",
                      logs = False)
Enter fullscreen mode Exit fullscreen mode

ลอง produce ผลผ่าน confusion matrix ง่ายๆดู

batched_churn_scores = pd.read_csv(s3uri_batch_output+'/batch.csv.out', usecols=[0,1], names=['id','scores'])
batched_churn_scores['scores'] = (batched_churn_scores['scores'] == "True.").astype(int)
#batched_churn_scores['Churn'] = (churn['Churn'] == "True.").astype(int)
gt_df = pd.DataFrame((churn['Churn'] == "True.").astype(int)).reset_index(drop=True)

results_df= pd.concat([gt_df,batched_churn_scores],axis=1)
pd.crosstab(index=results_df['Churn'], columns=np.round(results_df['scores']), rownames=['actual'], colnames=['predictions'])
Enter fullscreen mode Exit fullscreen mode

Update the Churn Scores to Snowflake

สุดท้ายเราจะไปโชผลลัพธ์ของ Predict scoring ผ่าน snowflake กัน! เอาข้อมูลกลับ snowflake ผ่าน write_pandas

from snowflake.connector.pandas_tools import write_pandas

results_df.columns = ['CHURN_IN','CUST_ID','CHURN_SCORE']

# Write the predictions to the table named "ML_RESULTS".
success, nchunks, nrows, _ = write_pandas(ctx, results_df, 'ML_RESULTS')

display(nrows)
Enter fullscreen mode Exit fullscreen mode

กลับมาที่หน้า snowflake เราจะมาลอง Query ดู

query

--ดู CHURN SCORES output ที่ได้มาจาก Sagemaker
SELECT * FROM ML_RESULTS LIMIT 100;

--ตัวอย่างลอง identify พื้นที่ที่มี rate ที่คาดว่าลูกค้า จะ churn โดยเราจะทำการ join ผลลัพธ์จาก predict score กลับไปที่ข้อมูลลูกค้า โดยเงื่อนไขที่โอกาส churn มากกว่าหรือเท่ากับ 10 คน และ churn scoring confidence มากกว่า 75%
SELECT C.STATE, COUNT(DISTINCT(C.CUST_ID))
FROM ML_RESULTS M INNER JOIN  CUSTOMER_CHURN C on M.CUST_ID = C.CUST_ID
WHERE M.CHURN_SCORE >= 0.75
GROUP BY C.STATE
HAVING COUNT(DISTINCT(C.CUST_ID)) >= 10
ORDER BY COUNT(C.CUST_ID) DESC;
Enter fullscreen mode Exit fullscreen mode

snow_query

มีประมาน 17 รัฐที่ต้อง focus ว่าลูกค้าเราน่ากำลังจะ churn!

✅ Conclusion

จบแล้ว!
good_stuff

💖 💪 🙅 🚩
chatchaikomrangded
Chatchai Komrangded (Bas)

Posted on September 17, 2021

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related