SageMaker
November 2, 2023
Common Variables
import os
import time
import boto3
import numpy as np
import pandas as pd
import sagemaker
# from sagemaker import get_execution_role
def get_execution_role():
    # Role name is hardcoded here instead of calling sagemaker.get_execution_role()
    # (handy when this notebook runs outside SageMaker Studio)
    return "SageMaker-ExecutionRole-20241030T121452"
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = "shared-hs-mlops-bucket" #sagemaker_session.default_bucket()
region = boto3.Session().region_name
model_package_group_name = "PipelineModelPackageGroup"
prefix = "mas-pipeline-model-example"
pipeline_name = "mas-serial-inference-pipeline" # SageMaker Pipeline name
raw_dir = os.path.join(os.getcwd(), "data", "raw")
os.makedirs(raw_dir, exist_ok=True)  # ./data/raw must exist before the raw CSV is written below
#########################################################################
## Download Data
s3 = boto3.client("s3")
s3.download_file(
    f"sagemaker-example-files-prod-{region}",
    "datasets/tabular/california_housing/cal_housing.tgz",
    "cal_housing.tgz",
)
#!tar -zxf cal_housing.tgz
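The commented line above is a notebook shell escape. Outside a notebook, the same extraction can be done with the standard library; a small equivalent:
# Extract cal_housing.tgz without relying on the notebook "!" shell escape
import tarfile

with tarfile.open("cal_housing.tgz", "r:gz") as tar:
    tar.extractall()  # produces CaliforniaHousing/cal_housing.data in the working directory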
columns = [
    "longitude", "latitude", "housingMedianAge",
    "totalRooms", "totalBedrooms", "population",
    "households", "medianIncome", "medianHouseValue",
]
cal_housing_df = pd.read_csv("CaliforniaHousing/cal_housing.data", names=columns, header=None)
cal_housing_df["medianHouseValue"] /= 500000  # Scaling target down to avoid overcomplicating the example
cal_housing_df.to_csv("./data/raw/raw_data_all.csv", header=True, index=False)
rawdata_s3_prefix = "{}/data/raw".format(prefix)
#########################################################################
## Upload to S3
raw_s3 = sagemaker_session.upload_data(
    bucket=bucket,
    path="./data/raw/",
    key_prefix=rawdata_s3_prefix,
)
print(raw_s3) #s3://shared-hs-mlops-bucket/mas-pipeline-model-example/data/raw
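As an optional sanity check, the uploaded objects can be listed with the boto3 client created earlier:
# Optional: confirm the raw data landed under the expected prefix
resp = s3.list_objects_v2(Bucket=bucket, Prefix=rawdata_s3_prefix)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])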
Nodes
preprocess.py
import glob
import numpy as np
import pandas as pd
import os
import json
import joblib
from io import StringIO
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import tarfile
try:
    from sagemaker_containers.beta.framework import (
        content_types, encoders, env, modules,
        transformer, worker, server,
    )
except ImportError:
    pass

feature_columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
]
label_column = "medianHouseValue"

base_dir = "/opt/ml/processing"
base_output_dir = "/opt/ml/output/"

if __name__ == "__main__":
    df = pd.read_csv(f"{base_dir}/input/raw_data_all.csv")
    feature_data = df.drop(label_column, axis=1, inplace=False)
    label_data = df[label_column]

    x_train, x_test, y_train, y_test = train_test_split(feature_data, label_data, test_size=0.33)

    scaler = StandardScaler()
    scaler.fit(x_train)
    x_train = scaler.transform(x_train)
    x_test = scaler.transform(x_test)

    train_dataset = pd.concat([pd.DataFrame(x_train), y_train.reset_index(drop=True)], axis=1)
    test_dataset = pd.concat([pd.DataFrame(x_test), y_test.reset_index(drop=True)], axis=1)
    train_dataset.columns = feature_columns + [label_column]
    test_dataset.columns = feature_columns + [label_column]
    train_dataset.to_csv(f"{base_dir}/train/train.csv", header=True, index=False)
    test_dataset.to_csv(f"{base_dir}/test/test.csv", header=True, index=False)

    # Persist the fitted scaler as model.tar.gz so it can be packaged as an SKLearnModel later
    joblib.dump(scaler, "model.joblib")
    with tarfile.open(f"{base_dir}/scaler_model/model.tar.gz", "w:gz") as tar_handle:
        tar_handle.add("model.joblib")
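Because this same script is reused later as the entry_point of the SKLearnModel in the serial inference pipeline, the SKLearn serving container will also look for inference handler functions in it. The listing above stops at the processing logic; a minimal sketch of handlers that could be appended is shown below (the CSV handling is an assumption about the payload format, not part of the original script):
# Sketch of the SageMaker SKLearn serving hooks so preprocess.py can also act as
# the inference entry point of the scaler container in the PipelineModel.
def model_fn(model_dir):
    # Load the fitted scaler packaged into model.tar.gz above
    return joblib.load(os.path.join(model_dir, "model.joblib"))


def input_fn(input_data, content_type):
    # Assumes headerless CSV rows containing the 8 raw feature columns
    if content_type == "text/csv":
        return pd.read_csv(StringIO(input_data), header=None, names=feature_columns)
    raise ValueError(f"Unsupported content type: {content_type}")


def predict_fn(input_data, model):
    # The "prediction" of this container is just the standardized features,
    # which the next container (TensorFlow) consumes.
    return model.transform(input_data)


def output_fn(prediction, accept):
    if accept == "text/csv":
        return "\n".join(",".join(str(x) for x in row) for row in np.asarray(prediction))
    raise ValueError(f"Unsupported accept type: {accept}")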
train.py
import argparse
import numpy as np
import os
import tensorflow as tf
import pandas as pd
feature_columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
]
label_column = "medianHouseValue"


def parse_args():
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--learning_rate", type=float, default=0.1)

    # data directories
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))

    # model directory
    parser.add_argument("--sm-model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    return parser.parse_known_args()


def get_train_data(train_dir):
    train_data = pd.read_csv(os.path.join(train_dir, "train.csv"))
    x_train = train_data[feature_columns].to_numpy()
    y_train = train_data[label_column].to_numpy()
    print("x train", x_train.shape, "y train", y_train.shape)
    return x_train, y_train


def get_test_data(test_dir):
    test_data = pd.read_csv(os.path.join(test_dir, "test.csv"))
    x_test = test_data[feature_columns].to_numpy()
    y_test = test_data[label_column].to_numpy()
    print("x test", x_test.shape, "y test", y_test.shape)
    return x_test, y_test


def get_model():
    inputs = tf.keras.Input(shape=(8,))
    hidden_1 = tf.keras.layers.Dense(8, activation="tanh")(inputs)
    hidden_2 = tf.keras.layers.Dense(4, activation="sigmoid")(hidden_1)
    outputs = tf.keras.layers.Dense(1)(hidden_2)
    return tf.keras.Model(inputs=inputs, outputs=outputs)


if __name__ == "__main__":
    args, _ = parse_args()

    print("Training data location: {}".format(args.train))
    print("Test data location: {}".format(args.test))
    x_train, y_train = get_train_data(args.train)
    x_test, y_test = get_test_data(args.test)

    batch_size = args.batch_size
    epochs = args.epochs
    learning_rate = args.learning_rate
    print(
        "batch_size = {}, epochs = {}, learning rate = {}".format(batch_size, epochs, learning_rate)
    )

    model = get_model()
    optimizer = tf.keras.optimizers.SGD(learning_rate)
    model.compile(optimizer=optimizer, loss="mse")
    model.fit(
        x_train, y_train, batch_size=batch_size, epochs=epochs, validation_data=(x_test, y_test)
    )

    # evaluate on test set
    scores = model.evaluate(x_test, y_test, batch_size, verbose=2)
    print("\nTest MSE :", scores)

    # save model under a numeric version sub-directory, as expected by TensorFlow Serving
    model.save(args.sm_model_dir + "/1")
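Before wiring the script into the pipeline, it can be smoke-tested locally. A minimal sketch, assuming TensorFlow is installed locally and that train.csv / test.csv exist under the hypothetical paths ./data/train and ./data/test:
# Hypothetical local smoke test for code/train.py (paths and epoch count are examples)
import subprocess
import sys

subprocess.run(
    [
        sys.executable, "code/train.py",
        "--epochs", "1",
        "--batch_size", "64",
        "--train", "./data/train",
        "--test", "./data/test",
        "--sm-model-dir", "./local_model",
    ],
    check=True,
)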
evaluate.py
import os
import json
import sys
import numpy as np
import pandas as pd
import pathlib
import tarfile
feature_columns = [
    "longitude",
    "latitude",
    "housingMedianAge",
    "totalRooms",
    "totalBedrooms",
    "population",
    "households",
    "medianIncome",
]
label_column = "medianHouseValue"

if __name__ == "__main__":
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, "r:gz") as tar:
        tar.extractall("./model")

    import tensorflow as tf

    model = tf.keras.models.load_model("./model/1")

    test_path = "/opt/ml/processing/test/"
    df = pd.read_csv(test_path + "/test.csv")
    x_test = df[feature_columns].to_numpy()
    y_test = df[label_column].to_numpy()

    scores = model.evaluate(x_test, y_test, verbose=2)
    print("\nTest MSE :", scores)

    # Available metrics to add to model: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    report_dict = {
        "regression_metrics": {
            "mse": {"value": scores, "standard_deviation": "NaN"},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))
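The resulting evaluation.json is what the PropertyFile / JsonGet lookup in the condition step reads later. With the report_dict above it has this shape (the MSE value below is illustrative, not from a real run):
{"regression_metrics": {"mse": {"value": 0.021, "standard_deviation": "NaN"}}}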
register
deployment
import argparse
import boto3
import logging
import os
from botocore.exceptions import ClientError
import tarfile
import zipfile
logger = logging.getLogger(__name__)
sm_client = boto3.client("sagemaker")
def get_approved_package(model_package_group_name):
    """Gets the latest approved model package for a model package group.

    Args:
        model_package_group_name: The model package group name.

    Returns:
        The summary dict of the latest approved model package
        (its ARN is available under the "ModelPackageArn" key).
    """
    try:
        # Get the latest approved model package
        response = sm_client.list_model_packages(
            ModelPackageGroupName=model_package_group_name,
            ModelApprovalStatus="Approved",
            SortBy="CreationTime",
            SortOrder="Descending",
            MaxResults=100,
        )
        approved_packages = response["ModelPackageSummaryList"]

        # Fetch more packages if none returned with continuation token
        while len(approved_packages) == 0 and "NextToken" in response:
            logger.debug("Getting more packages for token: {}".format(response["NextToken"]))
            response = sm_client.list_model_packages(
                ModelPackageGroupName=model_package_group_name,
                ModelApprovalStatus="Approved",
                SortBy="CreationTime",
                SortOrder="Descending",
                MaxResults=100,
                NextToken=response["NextToken"],
            )
            approved_packages.extend(response["ModelPackageSummaryList"])

        # Raise an error if no approved packages were found
        if len(approved_packages) == 0:
            error_message = (
                f"No approved ModelPackage found for ModelPackageGroup: {model_package_group_name}"
            )
            logger.error(error_message)
            raise Exception(error_message)

        # Return the latest approved model package
        model_package_arn = approved_packages[0]["ModelPackageArn"]
        logger.info(f"Identified the latest approved model package: {model_package_arn}")
        return approved_packages[0]
        # return model_package_arn
    except ClientError as e:
        error_message = e.response["Error"]["Message"]
        logger.error(error_message)
        raise Exception(error_message)
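Once an approved package exists in the registry, its ARN can be turned into a live endpoint and invoked. A minimal sketch, assuming an IAM role ARN is available as role_arn (as constructed at the end of the Pipeline section) and using arbitrary endpoint and instance names; none of this is part of the original script:
from sagemaker.model import ModelPackage

# Hypothetical deployment of the latest approved package from the registry
pkg = get_approved_package("PipelineModelPackageGroup")
model = ModelPackage(role=role_arn, model_package_arn=pkg["ModelPackageArn"])
model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.large",
    endpoint_name="mas-serial-inference-endpoint",  # arbitrary name
)

# Invoke the serial inference pipeline with one CSV row of the 8 raw features;
# the SKLearn container standardizes it, then the TensorFlow container predicts.
runtime = boto3.client("sagemaker-runtime")
payload = "-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252"
resp = runtime.invoke_endpoint(
    EndpointName="mas-serial-inference-endpoint",
    ContentType="text/csv",
    Body=payload,
)
print(resp["Body"].read())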
Pipeline
Common Part
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat
from sagemaker.workflow.pipeline_context import PipelineSession
pipeline_session = PipelineSession()
# raw input data
input_data = ParameterString(name="InputData", default_value=raw_s3)
# status of newly trained model in registry
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")
# processing step parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
training_epochs = ParameterString(name="TrainingEpochs", default_value="100")
# model performance step parameters
accuracy_mse_threshold = ParameterFloat(name="AccuracyMseThreshold", default_value=0.75)
Nodes
Preprocess Node
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
sklearn_framework_version = "1.2-1"
sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_framework_version,
    instance_type="ml.m5.large",
    instance_count=processing_instance_count,
    base_job_name="sklearn-housing-data-process",
    role=role,
    sagemaker_session=pipeline_session,
)

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="scaler_model", source="/opt/ml/processing/scaler_model"),
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocess.py",
)
Training Node
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.model_step import ModelStep
import time
# Where to store the trained model
model_path = f"s3://{bucket}/{prefix}/model/"
hyperparameters = {"epochs": training_epochs}
tensorflow_version = "2.4.1"
python_version = "py37"
tf2_estimator = TensorFlow(
    source_dir="code",
    entry_point="train.py",
    instance_type=training_instance_type,
    instance_count=1,
    framework_version=tensorflow_version,
    role=role,
    base_job_name="tensorflow-train-model",
    output_path=model_path,
    hyperparameters=hyperparameters,
    py_version=python_version,
    sagemaker_session=pipeline_session,
)

# NOTE how the input to the training job directly references the output of the previous step.
# step_process is the ProcessingStep constructed in the "Construct Whole Pipeline" section below,
# so that variable must already be defined when this cell runs.
train_args = tf2_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)
Evaluation
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ScriptProcessor

tf_eval_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version,
    image_scope="training",
    py_version="py37",
    instance_type="ml.m5.xlarge",
)

evaluate_model_processor = ScriptProcessor(
    role=role,
    image_uri=tf_eval_image_uri,
    command=["python3"],
    instance_count=1,
    instance_type=processing_instance_type,
    sagemaker_session=pipeline_session,
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

eval_args = evaluate_model_processor.run(
    inputs=[
        ProcessingInput(
            source=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluate.py",
)
Pipeline Model
from sagemaker.model import Model
from sagemaker.sklearn.model import SKLearnModel
from sagemaker import PipelineModel
scaler_model_s3 = "{}/model.tar.gz".format(
    step_process.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

scaler_model = SKLearnModel(
    model_data=scaler_model_s3,
    role=role,
    sagemaker_session=pipeline_session,
    entry_point="code/preprocess.py",
    framework_version=sklearn_framework_version,
)

tf_model_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version=tensorflow_version,
    image_scope="inference",
    py_version="py37",
    instance_type="ml.m5.xlarge",
)

tf_model = Model(
    image_uri=tf_model_image_uri,
    model_data=step_train_model.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

pipeline_model = PipelineModel(
    models=[scaler_model, tf_model], role=role, sagemaker_session=pipeline_session
)
Register Model
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
evaluation_s3_uri = "{}/evaluation.json".format(
    step_evaluate_model.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=evaluation_s3_uri,
        content_type="application/json",
    )
)

register_args = pipeline_model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.large", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    model_metrics=model_metrics,
    approval_status=model_approval_status,
)

step_register_pipeline_model = ModelStep(
    name="PipelineModel",
    step_args=register_args,
)
Conditional Evaluation
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
# Create accuracy condition to ensure the model meets performance requirements.
# Models with a test accuracy lower than the condition will not be registered with the model registry.
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_evaluate_model.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=accuracy_mse_threshold,
)
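If a run that misses the threshold should fail loudly instead of just skipping registration, a FailStep could go into the else branch; a minimal sketch (the step name and message are arbitrary choices, not part of the original pipeline):
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

# Optional: fail the execution with a readable message when MSE exceeds the threshold
step_fail = FailStep(
    name="MSE-Above-Threshold-Fail",
    error_message=Join(on=" ", values=["Model MSE exceeded threshold", accuracy_mse_threshold]),
)
# ...then pass else_steps=[step_fail] to the ConditionStep below instead of an empty list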
Construct Whole Pipeline
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.pipeline import Pipeline
step_process = ProcessingStep(
    name="PreprocessData",
    step_args=processor_args,
)

step_train_model = TrainingStep(
    name="TrainTensorflowModel",
    step_args=train_args,
)

step_evaluate_model = ProcessingStep(
    name="EvaluateModelPerformance",
    step_args=eval_args,
    property_files=[evaluation_report],
)

# Create a SageMaker Pipelines ConditionStep using the condition above.
# Enter the steps to perform if the condition returns True / False.
step_cond = ConditionStep(
    name="MSE-Lower-Than-Threshold-Condition",
    conditions=[cond_lte],
    if_steps=[step_register_pipeline_model],  # step_register_model, step_register_scaler,
    else_steps=[],
)

# Create a SageMaker Pipeline.
# Each parameter for the pipeline must be set explicitly when the pipeline is created.
# Also pass in each of the steps created above.
# Note that the order of execution is determined by each step's dependencies on other steps,
# not by the order they are passed in below.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        training_instance_type,
        processing_instance_type,
        processing_instance_count,
        input_data,
        model_approval_status,
        training_epochs,
        accuracy_mse_threshold,
    ],
    steps=[step_process, step_train_model, step_evaluate_model, step_cond],
)
import json

definition = json.loads(pipeline.definition())
definition

# Build the full IAM role ARN from the role name defined in the common variables
role_arn = f"arn:aws:iam::654654179472:role/service-role/{role}"
pipeline.upsert(role_arn=role_arn)

execution = pipeline.start()
execution.wait()
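execution.wait() blocks until the run completes. The step states can then be inspected, and later runs can override any of the registered parameters by name:
# Inspect the finished run
print(execution.describe()["PipelineExecutionStatus"])
for step in execution.list_steps():
    print(step["StepName"], step["StepStatus"])

# Start another run with overridden parameters
# (keys must match the Parameter names defined in the Common Part)
execution = pipeline.start(
    parameters={
        "TrainingEpochs": "50",
        "AccuracyMseThreshold": 0.5,
    }
)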