Bài viết này sẽ hướng dẫn cho các bạn cách dùng Amazon SageMaker để phát triển, đào tạo, điều chỉnh và triển khai mô hình ML dựa trên Scikit-Learn (Random Forest). Mẫu dữ liệu được dùng là The California Housing dataset đã được công bố tại:
Pace, R. Kelley, and Ronald Barry. "Sparse spatial autoregressions." Statistics & Probability Letters 33.3 (1997): 291-297.
Đoạn mã cơ bản:
import datetime
import time
import tarfile import boto3
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
import sagemaker
from sklearn.model_selection import train_test_split
from sklearn.datasets import fetch_california_housing sm_boto3 = boto3.client("sagemaker") sess = sagemaker.Session() region = sess.boto_session.region_name bucket = sess.default_bucket() # this could also be a hard-coded bucket name print("Using bucket " + bucket)
Chuẩn bị dữ liệu
Dữ liệu tải từ sklearn, chia nhỏ và gửi đến S3
# we use the California housing dataset
data = fetch_california_housing()
X_train, X_test, y_train, y_test = train_test_split( data.data, data.target, test_size=0.25, random_state=42
) trainX = pd.DataFrame(X_train, columns=data.feature_names)
trainX["target"] = y_train testX = pd.DataFrame(X_test, columns=data.feature_names)
testX["target"] = y_test
trainX.head()
trainX.to_csv("california_housing_train.csv")
testX.to_csv("california_housing_test.csv")
# send data to S3. SageMaker will take training data from s3
trainpath = sess.upload_data( path="california_housing_train.csv", bucket=bucket, key_prefix="sagemaker/sklearncontainer"
) testpath = sess.upload_data( path="california_housing_test.csv", bucket=bucket, key_prefix="sagemaker/sklearncontainer"
)
Mã Script Mode
Tập lệnh bên dưới chứa cả chức năng đào tạo và suy luận, đồng thời có thể chạy cả trong phần cứng SageMaker Training hoặc cục bộ (PC, sổ ghi chép SageMaker, tại chỗ, v.v.). Hướng dẫn chi tiết tại đây.
%%writefile script.py import argparse
import joblib
import os import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor # inference functions ---------------
def model_fn(model_dir): clf = joblib.load(os.path.join(model_dir, "model.joblib")) return clf if __name__ == "__main__": print("extracting arguments") parser = argparse.ArgumentParser() # hyperparameters sent by the client are passed as command-line arguments to the script. # to simplify the demo we don't use all sklearn RandomForest hyperparameters parser.add_argument("--n-estimators", type=int, default=10) parser.add_argument("--min-samples-leaf", type=int, default=3) # Data, model, and output directories parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR")) parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN")) parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST")) parser.add_argument("--train-file", type=str, default="california_housing_train.csv") parser.add_argument("--test-file", type=str, default="california_housing_test.csv") parser.add_argument( "--features", type=str ) # in this script we ask user to explicitly name features parser.add_argument( "--target", type=str ) # in this script we ask user to explicitly name the target args, _ = parser.parse_known_args() print("reading data") train_df = pd.read_csv(os.path.join(args.train, args.train_file)) test_df = pd.read_csv(os.path.join(args.test, args.test_file)) print("building training and testing datasets") X_train = train_df[args.features.split()] X_test = test_df[args.features.split()] y_train = train_df[args.target] y_test = test_df[args.target] # train print("training model") model = RandomForestRegressor( n_estimators=args.n_estimators, min_samples_leaf=args.min_samples_leaf, n_jobs=-1 ) model.fit(X_train, y_train) # print abs error print("validating model") abs_err = np.abs(model.predict(X_test) - y_test) # print couple perf metrics for q in [10, 50, 90]: print("AE-at-" + str(q) + "th-percentile: " + str(np.percentile(a=abs_err, q=q))) # persist model path = os.path.join(args.model_dir, "model.joblib") joblib.dump(model, path) print("model persisted at " + path) print(args.min_samples_leaf)
Local training
Đoạn mã cho phép xóa khỏi tập lệnh mọi cấu hình dành riêng cho SageMaker và chạy cục bộ.
! python script.py --n-estimators 100 \ --min-samples-leaf 2 \ --model-dir ./ \ --train ./ \ --test ./ \ --features 'MedInc HouseAge AveRooms AveBedrms Population AveOccup Latitude Longitude' \ --target target
SageMaker Training
Triển khai việc training với Python SDK
# We use the Estimator from the SageMaker Python SDK
from sagemaker.sklearn.estimator import SKLearn FRAMEWORK_VERSION = "0.23-1" sklearn_estimator = SKLearn( entry_point="script.py", role=get_execution_role(), instance_count=1, instance_type="ml.c5.xlarge", framework_version=FRAMEWORK_VERSION, base_job_name="rf-scikit", metric_definitions=[{"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"}], hyperparameters={ "n-estimators": 100, "min-samples-leaf": 3, "features": "MedInc HouseAge AveRooms AveBedrms Population AveOccup Latitude Longitude", "target": "target", },
)
# launch training job, with asynchronous call
sklearn_estimator.fit({"train": trainpath, "test": testpath}, wait=True)
Giải pháp thay thế: triển khai training với boto3
# first compress the code and send to S3 source = "source.tar.gz"
project = "scikitlearn-train-from-boto3" tar = tarfile.open(source, "w:gz")
tar.add("script.py")
tar.close() s3 = boto3.client("s3")
s3.upload_file(source, bucket, project + "/" + source)
Khi sử dụng boto3 để khởi chạy training, chúng ta phải trỏ rõ ràng đến docker image.
from sagemaker import image_uris training_image = image_uris.retrieve( framework="sklearn", region=region, version=FRAMEWORK_VERSION, py_version="py3", instance_type="ml.c5.xlarge",
)
print(training_image)
# launch training job response = sm_boto3.create_training_job( TrainingJobName="sklearn-boto3-" + datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"), HyperParameters={ "n_estimators": "300", "min_samples_leaf": "3", "sagemaker_program": "script.py", "features": "MedInc HouseAge AveRooms AveBedrms Population AveOccup Latitude Longitude", "target": "target", "sagemaker_submit_directory": "s3://" + bucket + "/" + project + "/" + source, }, AlgorithmSpecification={ "TrainingImage": training_image, "TrainingInputMode": "File", "MetricDefinitions": [ {"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"}, ], }, RoleArn=get_execution_role(), InputDataConfig=[ { "ChannelName": "train", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": trainpath, "S3DataDistributionType": "FullyReplicated", } }, }, { "ChannelName": "test", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": testpath, "S3DataDistributionType": "FullyReplicated", } }, }, ], OutputDataConfig={"S3OutputPath": "s3://" + bucket + "/sagemaker-sklearn-artifact/"}, ResourceConfig={"InstanceType": "ml.c5.xlarge", "InstanceCount": 1, "VolumeSizeInGB": 10}, StoppingCondition={"MaxRuntimeInSeconds": 86400}, EnableNetworkIsolation=False,
) print(response)
Triển khai việc điều chỉnh với Python SDK
# we use the Hyperparameter Tuner
from sagemaker.tuner import IntegerParameter # Define exploration boundaries
hyperparameter_ranges = { "n-estimators": IntegerParameter(20, 100), "min-samples-leaf": IntegerParameter(2, 6),
} # create Optimizer
Optimizer = sagemaker.tuner.HyperparameterTuner( estimator=sklearn_estimator, hyperparameter_ranges=hyperparameter_ranges, base_tuning_job_name="RF-tuner", objective_type="Minimize", objective_metric_name="median-AE", metric_definitions=[ {"Name": "median-AE", "Regex": "AE-at-50th-percentile: ([0-9.]+).*$"} ], # extract tracked metric from logs with regexp max_jobs=10, max_parallel_jobs=2,
)
Optimizer.fit({"train": trainpath, "test": testpath})
# get tuner results in a df
results = Optimizer.analytics().dataframe()
while results.empty: time.sleep(1) results = Optimizer.analytics().dataframe()
results.head()
Triển khai đến điểm cuối theo thời gian thực
Khai triển với Python SDK
Công cụ Estimator
có thể được triển khai trực tiếp sau khi đào tạo, với Estimator.deploy()
nhưng ở đây quy trình được trình bày mở rộng hơn trong việc tạo mô hình từ s3, có thể được sử dụng để triển khai một mô hình đã được đào tạo trong một phiên khác hoặc thậm chí là ra ngoài của SageMaker.
sklearn_estimator.latest_training_job.wait(logs="None")
artifact = sm_boto3.describe_training_job( TrainingJobName=sklearn_estimator.latest_training_job.name
)["ModelArtifacts"]["S3ModelArtifacts"] print("Model artifact persisted at " + artifact)
from sagemaker.sklearn.model import SKLearnModel model = SKLearnModel( model_data=artifact, role=get_execution_role(), entry_point="script.py", framework_version=FRAMEWORK_VERSION,
)
predictor = model.deploy(instance_type="ml.c5.large", initial_instance_count=1)
Gọi Python SDK
# the SKLearnPredictor does the serialization from pandas for us
print(predictor.predict(testX[data.feature_names]))
Cách thay thế: gọi boto3
runtime = boto3.client("sagemaker-runtime")
Cách 1: dùng CSV
# csv serialization
response = runtime.invoke_endpoint( EndpointName=predictor.endpoint, Body=testX[data.feature_names].to_csv(header=False, index=False).encode("utf-8"), ContentType="text/csv",
) print(response["Body"].read())
Cách 2: dùng npy
# npy serialization
from io import BytesIO # Serialise numpy ndarray as bytes
buffer = BytesIO()
# Assuming testX is a data frame
np.save(buffer, testX[data.feature_names].values) response = runtime.invoke_endpoint( EndpointName=predictor.endpoint, Body=buffer.getvalue(), ContentType="application/x-npy"
) print(response["Body"].read())
Đừng quên xóa điểm cuối
sm_boto3.delete_endpoint(EndpointName=predictor.endpoint)
Kham khảo:
Doc: https://sagemaker.readthedocs.io/en/stable/using_sklearn.html
SDK: https://sagemaker.readthedocs.io/en/stable/sagemaker.sklearn.html
boto3: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#client