This notebook demonstrates the use of Amazon SageMaker’s implementation of the XGBoost algorithm to train and host a regression model. We use the Abalone dataset, originally from the UCI Machine Learning Repository.
Assign an IAM role and an Amazon S3 bucket
%%time
import os
import boto3
import re
from sagemaker import get_execution_role
role = get_execution_role()
region = boto3.Session().region_name
# put your s3 bucket name here, and create s3 bucket
bucket='<bucket-name>'
prefix = 'sagemaker/DEMO-xgboost-regression'
# customize to your bucket where you have stored the data
bucket_path = 'https://s3-{}.amazonaws.com/{}'.format(region,bucket)
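If the bucket named above does not already exist, it can be created with boto3 before continuing. The snippet below is a minimal sketch assuming your role has permission to create buckets; note that regions other than us-east-1 require an explicit LocationConstraint, and the call will raise an error there if the bucket already exists.
# create the S3 bucket if it does not exist yet (assumes s3:CreateBucket permission)
s3 = boto3.client('s3', region_name=region)
if region == 'us-east-1':
    s3.create_bucket(Bucket=bucket)
else:
    s3.create_bucket(Bucket=bucket, CreateBucketConfiguration={'LocationConstraint': region})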
%%time
import io
import boto3
import random
def data_split(FILE_DATA, FILE_TRAIN, FILE_VALIDATION, FILE_TEST, PERCENT_TRAIN, PERCENT_VALIDATION, PERCENT_TEST):
    data = [l for l in open(FILE_DATA, 'r')]
    train_file = open(FILE_TRAIN, 'w')
    valid_file = open(FILE_VALIDATION, 'w')
    tests_file = open(FILE_TEST, 'w')
    num_of_data = len(data)
    num_train = int((PERCENT_TRAIN/100.0)*num_of_data)
    num_valid = int((PERCENT_VALIDATION/100.0)*num_of_data)
    num_tests = int((PERCENT_TEST/100.0)*num_of_data)
    # randomly sample rows (without replacement) into the three splits
    data_fractions = [num_train, num_valid, num_tests]
    split_data = [[], [], []]
    rand_data_ind = 0
    for split_ind, fraction in enumerate(data_fractions):
        for i in range(fraction):
            rand_data_ind = random.randint(0, len(data)-1)
            split_data[split_ind].append(data[rand_data_ind])
            data.pop(rand_data_ind)
    for l in split_data[0]:
        train_file.write(l)
    for l in split_data[1]:
        valid_file.write(l)
    for l in split_data[2]:
        tests_file.write(l)
    train_file.close()
    valid_file.close()
    tests_file.close()

def write_to_s3(fobj, bucket, key):
    return boto3.Session(region_name=region).resource('s3').Bucket(bucket).Object(key).upload_fileobj(fobj)

def upload_to_s3(bucket, channel, filename):
    fobj = open(filename, 'rb')
    key = prefix + '/' + channel + '/' + filename
    url = 's3://{}/{}'.format(bucket, key)
    print('Writing to {}'.format(url))
    write_to_s3(fobj, bucket, key)
Next, we download the dataset and split it into training, validation, and test files. We then upload those files to S3 so they can be used for training.
%%time
import urllib.request
# Load the dataset
FILE_DATA = 'abalone'
urllib.request.urlretrieve("https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/regression/abalone", FILE_DATA)
#split the downloaded data into train/test/validation files
FILE_TRAIN = 'abalone.train'
FILE_VALIDATION = 'abalone.validation'
FILE_TEST = 'abalone.test'
PERCENT_TRAIN = 70
PERCENT_VALIDATION = 15
PERCENT_TEST = 15
data_split(FILE_DATA, FILE_TRAIN, FILE_VALIDATION, FILE_TEST, PERCENT_TRAIN, PERCENT_VALIDATION, PERCENT_TEST)
#upload the files to the S3 bucket
upload_to_s3(bucket, 'train', FILE_TRAIN)
upload_to_s3(bucket, 'validation', FILE_VALIDATION)
upload_to_s3(bucket, 'test', FILE_TEST)
After setting training parameters, we kick off training and poll for status until training is completed, which in this example takes between 5 and 6 minutes.
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(region, 'xgboost')
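get_image_uri also accepts an optional repo_version argument if you prefer to pin a specific release of the built-in XGBoost container rather than the default. The version string below is only an example and assumes that release is available in your region.
# optionally resolve a pinned XGBoost container release (example version; availability varies by region)
pinned_container = get_image_uri(region, 'xgboost', repo_version='0.90-1')
print(pinned_container)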
%%time
import boto3
from time import gmtime, strftime
job_name = 'DEMO-xgboost-regression-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("Training job", job_name)
#Ensure that the training and validation data folders generated above are reflected in the "InputDataConfig" parameter below.
create_training_params = \
{
    "AlgorithmSpecification": {
        "TrainingImage": container,
        "TrainingInputMode": "File"
    },
    "RoleArn": role,
    "OutputDataConfig": {
        "S3OutputPath": bucket_path + "/" + prefix + "/single-xgboost"
    },
    "ResourceConfig": {
        "InstanceCount": 1,
        "InstanceType": "ml.m4.4xlarge",
        "VolumeSizeInGB": 5
    },
    "TrainingJobName": job_name,
    "HyperParameters": {
        "max_depth": "5",
        "eta": "0.2",
        "gamma": "4",
        "min_child_weight": "6",
        "subsample": "0.7",
        "silent": "0",
        "objective": "reg:linear",
        "num_round": "50"
    },
    "StoppingCondition": {
        "MaxRuntimeInSeconds": 3600
    },
    "InputDataConfig": [
        {
            "ChannelName": "train",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": bucket_path + "/" + prefix + '/train',
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None"
        },
        {
            "ChannelName": "validation",
            "DataSource": {
                "S3DataSource": {
                    "S3DataType": "S3Prefix",
                    "S3Uri": bucket_path + "/" + prefix + '/validation',
                    "S3DataDistributionType": "FullyReplicated"
                }
            },
            "ContentType": "libsvm",
            "CompressionType": "None"
        }
    ]
}
client = boto3.client('sagemaker', region_name=region)
client.create_training_job(**create_training_params)
import time
status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
print(status)
while status != 'Completed' and status != 'Failed':
    time.sleep(60)
    status = client.describe_training_job(TrainingJobName=job_name)['TrainingJobStatus']
    print(status)
Note that a "validation" channel has also been configured. The SageMaker XGBoost algorithm computes RMSE on the data passed to the "validation" channel and writes it to the CloudWatch logs.
Evaluation metrics for the completed training job are available in CloudWatch. We can pull the RMSE metric for the validation data set and plot it to see the performance of the model over time.
%matplotlib inline
from sagemaker.analytics import TrainingJobAnalytics
metric_name = 'validation:rmse'
metrics_dataframe = TrainingJobAnalytics(training_job_name=job_name, metric_names=[metric_name]).dataframe()
ax = metrics_dataframe.plot(kind='line', figsize=(12, 5), x='timestamp', y='value', style='b.', legend=False)
ax.set_ylabel(metric_name);
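Besides the time series retrieved above, the final value recorded for each metric is available directly on the training job description. The following quick check assumes the job has completed and simply prints whatever appears in FinalMetricDataList.
# print the final values of the metrics recorded for this training job
# (FinalMetricDataList is only populated once the job has emitted metrics)
for m in client.describe_training_job(TrainingJobName=job_name).get('FinalMetricDataList', []):
    print(m['MetricName'], '=', m['Value'])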
In order to set up hosting, we have to import the model artifacts from training into hosting.
We do this by registering the model with hosting, which also provides the flexibility of importing models trained elsewhere.
%%time
import boto3
from time import gmtime, strftime
model_name=job_name + '-model'
print(model_name)
info = client.describe_training_job(TrainingJobName=job_name)
model_data = info['ModelArtifacts']['S3ModelArtifacts']
print(model_data)
primary_container = {
    'Image': container,
    'ModelDataUrl': model_data
}
create_model_response = client.create_model(
    ModelName=model_name,
    ExecutionRoleArn=role,
    PrimaryContainer=primary_container)
print(create_model_response['ModelArn'])
SageMaker supports configuring REST endpoints in hosting with multiple models, e.g., for A/B testing. To support this, customers create an endpoint configuration that describes how traffic is distributed across the models, whether split, shadowed, or sampled in some way. In addition, the endpoint configuration specifies the instance type used for model deployment.
from time import gmtime, strftime
endpoint_config_name = 'DEMO-XGBoostEndpointConfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[{
        'InstanceType': 'ml.m4.xlarge',
        'InitialVariantWeight': 1,
        'InitialInstanceCount': 1,
        'ModelName': model_name,
        'VariantName': 'AllTraffic'}])
print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])
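To illustrate the A/B testing scenario mentioned above, the ProductionVariants list can contain more than one entry, each with its own weight. The sketch below is illustrative only: model_name_b stands in for a second model that would have been registered separately, and equal weights split traffic roughly 50/50.
# hypothetical two-variant layout for A/B testing; a real setup would register a second model
model_name_b = model_name  # placeholder so the snippet is self-contained
ab_variants = [
    {'InstanceType': 'ml.m4.xlarge', 'InitialInstanceCount': 1,
     'ModelName': model_name, 'VariantName': 'VariantA', 'InitialVariantWeight': 1},
    {'InstanceType': 'ml.m4.xlarge', 'InitialInstanceCount': 1,
     'ModelName': model_name_b, 'VariantName': 'VariantB', 'InitialVariantWeight': 1}]
# passing ab_variants as ProductionVariants to create_endpoint_config would split traffic evenly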
Lastly, the customer creates the endpoint that serves up the model by specifying the name and configuration defined above. The end result is an endpoint that can be validated and incorporated into production applications. This takes about 9-11 minutes to complete.
%%time
import time
endpoint_name = 'DEMO-XGBoostEndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = client.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])
resp = client.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)
while status == 'Creating':
    time.sleep(60)
    resp = client.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print("Status: " + status)
print("Arn: " + resp['EndpointArn'])
print("Status: " + status)
Finally, the customer can now validate the model for use. They can obtain the endpoint from the client library using the results of the previous operations and generate predictions from the trained regression model using that endpoint.
runtime_client = boto3.client('runtime.sagemaker', region_name=region)
Start with a single prediction.
!head -1 abalone.test > abalone.single.test
%%time
import json
from itertools import islice
import math
import struct
file_name = 'abalone.single.test' #customize to your test file
with open(file_name, 'r') as f:
    payload = f.read().strip()
response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,
                                          ContentType='text/x-libsvm',
                                          Body=payload)
result = response['Body'].read()
result = result.decode("utf-8")
result = result.split(',')
result = [math.ceil(float(i)) for i in result]
label = payload.strip(' ').split()[0]
print ('Label: ',label,'\nPrediction: ', result[0])
Run a batch prediction
import sys
import math
def do_predict(data, endpoint_name, content_type):
    payload = '\n'.join(data)
    response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,
                                              ContentType=content_type,
                                              Body=payload)
    result = response['Body'].read()
    result = result.decode("utf-8")
    result = result.split(',')
    preds = [float(num) for num in result]
    preds = [math.ceil(num) for num in preds]
    return preds

def batch_predict(data, batch_size, endpoint_name, content_type):
    items = len(data)
    arrs = []
    for offset in range(0, items, batch_size):
        if offset + batch_size < items:
            results = do_predict(data[offset:(offset+batch_size)], endpoint_name, content_type)
            arrs.extend(results)
        else:
            arrs.extend(do_predict(data[offset:items], endpoint_name, content_type))
        sys.stdout.write('.')
    return(arrs)
The following helps us calculate the Median Absolute Percent Error (MdAPE) on the test dataset.
%%time
import json
import numpy as np
with open(FILE_TEST, 'r') as f:
    payload = f.read().strip()
labels = [int(line.split(' ')[0]) for line in payload.split('\n')]
test_data = [line for line in payload.split('\n')]
preds = batch_predict(test_data, 100, endpoint_name, 'text/x-libsvm')
print('\n Median Absolute Percent Error (MdAPE) = ', np.median(np.abs(np.array(labels) - np.array(preds)) / np.array(labels)))
client.delete_endpoint(EndpointName=endpoint_name)
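Optionally, the endpoint configuration and the model created for this demo can be deleted as well.
# optional: remove the remaining demo resources
client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
client.delete_model(ModelName=model_name)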