Specifications for Writing Model Inference Code
This section describes how to write model inference code in ModelArts. It also provides inference script examples for the TensorFlow and MindSpore engines, plus an example of customizing the inference logic in the inference script.
Specifications for Writing Inference Code
All custom Python inference classes must inherit from the BaseService class. Table 1 lists the parent class and import statement for each model type.
Table 1 Parent classes and import statements

Model Type    Parent Class              Import Statement
TensorFlow    TfServingBaseService      from model_service.tfserving_model_service import TfServingBaseService
MXNet         MXNetBaseService          from mms.model_service.mxnet_model_service import MXNetBaseService
PyTorch       PTServingBaseService      from model_service.pytorch_model_service import PTServingBaseService
Spark_MLlib   SparkServingBaseService   from model_service.spark_model_service import SparkServingBaseService
Caffe         CaffeBaseService          from model_service.caffe_model_service import CaffeBaseService
XGBoost       XgSklServingBaseService   from model_service.python_model_service import XgSklServingBaseService
Scikit_Learn  XgSklServingBaseService   from model_service.python_model_service import XgSklServingBaseService
MindSpore     SingleNodeService         from model_service.model_service import SingleNodeService
The following methods can be overridden:

Method                                  Description
__init__(self, model_name, model_path)  Initialization method for models built with deep learning frameworks. Models and labels are loaded in this method. For PyTorch and Caffe models, this method must be overridden to implement the model loading logic.
__init__(self, model_path)              Initialization method for models built with machine learning frameworks. It initializes the model path (self.model_path). For Spark_MLlib, it also initializes the SparkSession (self.spark).
_preprocess(self, data)                 Preprocessing method. It is called before the inference request and converts the raw API request data into the input expected by the model.
_inference(self, data)                  Inference method. Overriding it is not recommended: once it is overridden, your custom inference logic replaces the ModelArts built-in inference process.
_postprocess(self, data)                Postprocessing method. It is called after the inference request completes and converts the model output into the API output.
batch_inference(self, data)             Request handler bound to the /batch API for micro-batch processing in the built-in Spark_MLlib, Scikit_Learn, and XGBoost engines.
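The table describes three hooks that run in sequence for each request. The following is a minimal sketch of that call order, assuming the framework chains _preprocess, _inference, and _postprocess as described above. StubBaseService and its handle() method are hypothetical stand-ins for the ModelArts base class (which is only importable inside the ModelArts runtime); they are not the real ModelArts API.

```python
class StubBaseService:
    """Hypothetical stand-in for a ModelArts base class; NOT the real ModelArts API."""

    def __init__(self, model_name, model_path):
        self.model_name = model_name
        self.model_path = model_path

    def _preprocess(self, data):
        return data

    def _inference(self, data):
        # Stands in for the built-in inference step: the "model" sums each input vector.
        return {key: sum(values) for key, values in data.items()}

    def _postprocess(self, data):
        return data

    def handle(self, data):
        # Assumed call order: preprocess -> inference -> postprocess.
        return self._postprocess(self._inference(self._preprocess(data)))


class SumService(StubBaseService):
    """Overrides only the pre- and postprocessing hooks, as recommended above."""

    def _preprocess(self, data):
        # API input {"x": "1,2,3"} -> model input {"x": [1, 2, 3]}
        return {key: [int(n) for n in value.split(",")] for key, value in data.items()}

    def _postprocess(self, data):
        # Model output -> API output
        return {"result": data}


service = SumService("demo", "/tmp/model")
print(service.handle({"x": "1,2,3"}))  # {'result': {'x': 6}}
```

Leaving _inference untouched in the subclass mirrors the advice in the table: only the request/response conversion is customized.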
Note
You can override the _preprocess and _postprocess methods to preprocess the API input and postprocess the inference output.
Overriding the __init__ method of a class that inherits from BaseService may cause the AI application to run abnormally.
The available attribute is self.model_path, the local path where the model resides. In addition, PySpark-based models can use self.spark in customize_service.py to obtain the SparkSession object.
Note
An absolute path is required for reading files in the inference code. You can obtain the absolute path of the model from the self.model_path attribute.
When TensorFlow, Caffe, or MXNet is used, self.model_path indicates the path of the model directory. See the following example:
import json
import os

# Store the label.json file in the model directory and read it as follows:
with open(os.path.join(self.model_path, 'label.json')) as f:
    self.label = json.load(f)
When PyTorch, Scikit_Learn, or PySpark is used, self.model_path indicates the path of the model file. See the following example:
import json
import os

# Store the label.json file in the model directory and read it as follows:
dir_path = os.path.dirname(os.path.realpath(self.model_path))
with open(os.path.join(dir_path, 'label.json')) as f:
    self.label = json.load(f)
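The two cases above differ only in whether self.model_path is a directory or a file. The following sketch resolves a file stored next to the model in either case, assuming the caller knows which framework is in use. resolve_model_file, load_labels, and the FILE_PATH_FRAMEWORKS grouping are hypothetical names; the grouping itself is taken from the two notes above.

```python
import json
import os
import tempfile

# Per the notes above, these frameworks set self.model_path to the model *file*;
# TensorFlow, Caffe, and MXNet set it to the model *directory*.
FILE_PATH_FRAMEWORKS = {"PyTorch", "Scikit_Learn", "PySpark"}


def resolve_model_file(model_path, framework, file_name):
    """Return the absolute path of a file stored alongside the model (hypothetical helper)."""
    if framework in FILE_PATH_FRAMEWORKS:
        # model_path is a file: use the directory that contains it.
        base_dir = os.path.dirname(os.path.realpath(model_path))
    else:
        # model_path is already the model directory.
        base_dir = os.path.realpath(model_path)
    return os.path.join(base_dir, file_name)


def load_labels(model_path, framework):
    with open(resolve_model_file(model_path, framework, "label.json")) as f:
        return json.load(f)


if __name__ == "__main__":
    # Simulate a model directory containing label.json and a model file.
    with tempfile.TemporaryDirectory() as model_dir:
        with open(os.path.join(model_dir, "label.json"), "w") as f:
            json.dump(["zero", "one"], f)
        model_file = os.path.join(model_dir, "model.pth")
        open(model_file, "w").close()
        print(load_labels(model_dir, "TensorFlow"))  # ['zero', 'one']
        print(load_labels(model_file, "PyTorch"))    # ['zero', 'one']
```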
The data passed through the API to preprocessing, inference, and postprocessing can be in multipart/form-data or application/json format.
multipart/form-data request
curl -X POST \
  <modelarts-inference-endpoint> \
  -F image1=@cat.jpg \
  -F image2=@horse.jpg
The corresponding input data is as follows:
{
  "image1": {"cat.jpg": "<cat.jpg file io>"},
  "image2": {"horse.jpg": "<horse.jpg file io>"}
}
application/json request
curl -X POST \
  <modelarts-inference-endpoint> \
  -H 'Content-Type: application/json' \
  -d '{"images": "base64 encode image"}'
The corresponding input data is a Python dict:
{ "images":"base64 encode image" }
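A _preprocess implementation may need to accept both request shapes above. The following sketch normalizes either shape into a dict of raw bytes, assuming form-data arrives as {key: {file_name: file_object}} (the shape the _preprocess methods in this section iterate over) and JSON arrives as {key: base64_string}. to_raw_bytes is a hypothetical helper name, not a ModelArts function.

```python
import base64
import io


def to_raw_bytes(data):
    """Normalize form-data or JSON request data into {key: bytes} (hypothetical helper)."""
    raw = {}
    for key, value in data.items():
        if isinstance(value, dict):
            # multipart/form-data: {"image1": {"cat.jpg": <file io>}}
            # If one key carries several files, this sketch keeps only the last one.
            for file_name, file_content in value.items():
                raw[key] = file_content.read()
        elif isinstance(value, str):
            # application/json: {"images": "<base64 string>"}
            raw[key] = base64.b64decode(value)
        else:
            raise ValueError("unsupported value type for key %r: %s" % (key, type(value).__name__))
    return raw


form_data = {"image1": {"cat.jpg": io.BytesIO(b"fake image bytes")}}
json_data = {"images": base64.b64encode(b"fake image bytes").decode("ascii")}
print(to_raw_bytes(form_data))  # {'image1': b'fake image bytes'}
print(to_raw_bytes(json_data))  # {'images': b'fake image bytes'}
```

The decoded bytes can then be wrapped in io.BytesIO and opened with Image.open, as the examples below do with form-data file objects.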
TensorFlow Inference Script Example
The following is an example of an MNIST inference service (mnist_service) for TensorFlow.
Inference code
from PIL import Image
import numpy as np
from model_service.tfserving_model_service import TfServingBaseService


class mnist_service(TfServingBaseService):

    def _preprocess(self, data):
        # data = {"<request key>": {"<file name>": <file io>}}
        preprocessed_data = {}
        for k, v in data.items():
            for file_name, file_content in v.items():
                image1 = Image.open(file_content)
                image1 = np.array(image1, dtype=np.float32)
                # Flatten the image into the 1x784 model input (28x28 grayscale = 784 pixels).
                image1 = image1.reshape((1, 784))
                preprocessed_data[k] = image1
        return preprocessed_data

    def _postprocess(self, data):
        infer_output = {}
        for output_name, result in data.items():
            # Return the index of the highest score as the predicted digit.
            infer_output["mnist_result"] = result[0].index(max(result[0]))
        return infer_output
Request
curl -X POST \
  <Real-time service address> \
  -F images=@test.jpg
Response
{"mnist_result": 7}
The preceding code example converts each image uploaded in the form request to the model input shape: the 28x28 image is read with Pillow and flattened into a 1x784 array. In postprocessing, the index of the highest score in the model output is returned as the predicted digit (mnist_result) in the RESTful API response.
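The shape conversion and the argmax step can be checked without TensorFlow or ModelArts. The following pure-Python sketch mirrors them, assuming the model output for one image arrives as a nested list of 10 scores such as [[s0, ..., s9]]; the function names and the output name "scores" are illustrative.

```python
def flatten_image(pixels):
    """Flatten a 28x28 grid of pixel values into one 1x784 row (pure-Python sketch)."""
    row = [float(p) for line in pixels for p in line]
    if len(row) != 28 * 28:
        raise ValueError("expected 784 pixels, got %d" % len(row))
    return [row]


def postprocess(data):
    """Map {output_name: [[score_0, ..., score_9]]} to {"mnist_result": predicted_digit}."""
    infer_output = {}
    for output_name, result in data.items():
        scores = result[0]  # scores for the first image in the batch
        infer_output["mnist_result"] = scores.index(max(scores))
    return infer_output


image = [[0] * 28 for _ in range(28)]
print(len(flatten_image(image)[0]))  # 784

sample_output = {"scores": [[0.01, 0.0, 0.02, 0.0, 0.05, 0.0, 0.01, 0.9, 0.01, 0.0]]}
print(postprocess(sample_output))  # {'mnist_result': 7}
```

Raising on a pixel-count mismatch, rather than silently padding or truncating, makes a wrongly sized upload fail loudly.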
Inference Script Example of the Custom Inference Logic
Define a custom dependency package in the model configuration file by referring to Example of a Model Configuration File Using a Custom Dependency Package. Then use the following code to load a model in saved_model format and run inference on it.
# -*- coding: utf-8 -*-
import json
import os
import threading
import logging

import numpy as np
import tensorflow as tf
from PIL import Image

from model_service.tfserving_model_service import TfServingBaseService

logger = logging.getLogger(__name__)


class MnistService(TfServingBaseService):

    def __init__(self, model_name, model_path):
        self.model_name = model_name
        self.model_path = model_path
        self.model_inputs = {}
        self.model_outputs = {}
        # Set by get_tf_sess() once the model has been loaded.
        self.sess = None

        # The label file can be loaded here and used in the post-processing function.
        # Directories for storing the label.txt file on OBS and in the model package:
        # with open(os.path.join(self.model_path, 'label.txt')) as f:
        #     self.label = json.load(f)

        # Load the model in saved_model format in non-blocking mode to prevent blocking timeout.
        thread = threading.Thread(target=self.get_tf_sess)
        thread.start()

    def get_tf_sess(self):
        # Load the model in saved_model format (TensorFlow 1.x API).
        # The session will be reused. Do not use the with statement.
        sess = tf.Session(graph=tf.Graph())
        meta_graph_def = tf.saved_model.loader.load(sess, [tf.saved_model.tag_constants.SERVING], self.model_path)
        signature_defs = meta_graph_def.signature_def
        self.sess = sess

        signature = []
        # Only one signature is expected.
        for signature_def in signature_defs:
            signature.append(signature_def)
        if len(signature) == 1:
            model_signature = signature[0]
        else:
            logger.warning("signatures more than one, use serving_default signature")
            model_signature = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY

        logger.info("model signature: %s", model_signature)

        for signature_name in meta_graph_def.signature_def[model_signature].inputs:
            tensorinfo = meta_graph_def.signature_def[model_signature].inputs[signature_name]
            name = tensorinfo.name
            op = self.sess.graph.get_tensor_by_name(name)
            self.model_inputs[signature_name] = op
        logger.info("model inputs: %s", self.model_inputs)

        for signature_name in meta_graph_def.signature_def[model_signature].outputs:
            tensorinfo = meta_graph_def.signature_def[model_signature].outputs[signature_name]
            name = tensorinfo.name
            op = self.sess.graph.get_tensor_by_name(name)
            self.model_outputs[signature_name] = op
        logger.info("model outputs: %s", self.model_outputs)

    def _preprocess(self, data):
        # Two request modes using HTTPS:
        # 1. form-data file request: data = {"Request key value": {"File name": <File io>}}
        # 2. JSON request: data = json.loads("JSON body transferred by the API")
        preprocessed_data = {}
        for k, v in data.items():
            for file_name, file_content in v.items():
                image1 = Image.open(file_content)
                image1 = np.array(image1, dtype=np.float32)
                image1.resize((1, 28, 28))
                preprocessed_data[k] = image1
        return preprocessed_data

    def _inference(self, data):
        feed_dict = {}
        for k, v in data.items():
            if k not in self.model_inputs.keys():
                logger.error("input key %s is not in model inputs %s", k, list(self.model_inputs.keys()))
                raise Exception("input key %s is not in model inputs %s" % (k, list(self.model_inputs.keys())))
            feed_dict[self.model_inputs[k]] = v

        result = self.sess.run(self.model_outputs, feed_dict=feed_dict)
        logger.info("predict result: %s", result)
        return result

    def _postprocess(self, data):
        infer_output = {"mnist_result": []}
        for output_name, results in data.items():
            for result in results:
                infer_output["mnist_result"].append(np.argmax(result))
        return infer_output

    def __del__(self):
        # The session does not exist if loading has not finished or has failed.
        if getattr(self, "sess", None) is not None:
            self.sess.close()
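The example above loads the model in a background thread so that __init__ returns quickly, but _inference does not check whether loading has finished, so an early request can hit an unloaded session. The following is a minimal sketch of one way to close that gap with threading.Event, assuming a request should wait a bounded time for the model rather than fail immediately. BackgroundLoadedModel, load_fn, and the 30-second default timeout are hypothetical, not ModelArts settings.

```python
import threading
import time


class BackgroundLoadedModel:
    """Load a model in a daemon thread and make callers wait until it is ready (sketch)."""

    def __init__(self, load_fn, timeout=30.0):
        self._load_fn = load_fn        # hypothetical loader returning a callable model
        self._timeout = timeout
        self._ready = threading.Event()
        self._error = None
        self.model = None
        threading.Thread(target=self._load, daemon=True).start()

    def _load(self):
        try:
            self.model = self._load_fn()
        except Exception as exc:  # keep the error so callers can see why loading failed
            self._error = exc
        finally:
            self._ready.set()          # signal completion whether loading succeeded or not

    def predict(self, inputs):
        # Event.wait returns False if the timeout expires before the event is set.
        if not self._ready.wait(self._timeout):
            raise TimeoutError("model is still loading")
        if self._error is not None:
            raise RuntimeError("model failed to load") from self._error
        return self.model(inputs)


def slow_loader():
    time.sleep(0.1)                        # simulate a slow model load
    return lambda xs: [x * 2 for x in xs]  # stand-in "model"


service = BackgroundLoadedModel(slow_loader)
print(service.predict([1, 2, 3]))  # [2, 4, 6]
```

Storing the loader's exception and re-raising it in predict surfaces load failures to the caller instead of leaving a missing attribute to fail later.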
MindSpore Inference Script Example¶
import threading
import logging

import mindspore
import mindspore.nn as nn
import numpy as np
from mindspore import Tensor, context
from mindspore.common.initializer import Normal
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from model_service.model_service import SingleNodeService
from PIL import Image

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")


class LeNet5(nn.Cell):
    """LeNet network structure."""

    # Define the required operators.
    def __init__(self, num_class=10, num_channel=1):
        super(LeNet5, self).__init__()
        self.conv1 = nn.Conv2d(num_channel, 6, 5, pad_mode='valid')
        self.conv2 = nn.Conv2d(6, 16, 5, pad_mode='valid')
        self.fc1 = nn.Dense(16 * 5 * 5, 120, weight_init=Normal(0.02))
        self.fc2 = nn.Dense(120, 84, weight_init=Normal(0.02))
        self.fc3 = nn.Dense(84, num_class, weight_init=Normal(0.02))
        self.relu = nn.ReLU()
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)
        self.flatten = nn.Flatten()

    # Use the preceding operators to construct the network.
    def construct(self, x):
        x = self.max_pool2d(self.relu(self.conv1(x)))
        x = self.max_pool2d(self.relu(self.conv2(x)))
        x = self.flatten(x)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x


class mnist_service(SingleNodeService):

    def __init__(self, model_name, model_path):
        self.model_name = model_name
        self.model_path = model_path
        logger.info("self.model_name: %s self.model_path: %s", self.model_name, self.model_path)
        self.network = None
        # Load the model in non-blocking mode to prevent blocking timeout.
        thread = threading.Thread(target=self.load_model)
        thread.start()

    def load_model(self):
        logger.info("load network ...")
        self.network = LeNet5()
        ckpt_file = self.model_path + "/checkpoint_lenet_1-1_1875.ckpt"
        logger.info("ckpt_file: %s", ckpt_file)
        param_dict = load_checkpoint(ckpt_file)
        load_param_into_net(self.network, param_dict)
        # Inference warm-up. Otherwise, the initial inference will take a long time.
        self.network_warmup()
        logger.info("load network successfully!")

    def network_warmup(self):
        # Run one dummy inference so that graph compilation happens before the first request.
        logger.info("warmup network ...")
        images = np.array(np.random.randn(1, 1, 32, 32), dtype=np.float32)
        inputs = Tensor(images, mindspore.float32)
        self.network(inputs)
        logger.info("warmup network successfully!")

    def _preprocess(self, input_data):
        preprocessed_result = {}
        images = []
        for k, v in input_data.items():
            for file_name, file_content in v.items():
                # Convert to single-channel grayscale and resize to the 32x32 input of LeNet5.
                # PIL's Image.resize() takes (width, height).
                image1 = Image.open(file_content).convert('L').resize((32, 32))
                image1 = np.array(image1, dtype=np.float32)
                images.append(image1)

        images = np.array(images, dtype=np.float32)
        # (N, 32, 32) -> (N, 1, 32, 32): batch, channel, height, width.
        images = images.reshape((len(images), 1, 32, 32))
        logger.info("images shape: %s", images.shape)
        inputs = Tensor(images, mindspore.float32)
        preprocessed_result['images'] = inputs
        return preprocessed_result

    def _inference(self, preprocessed_result):
        inference_result = self.network(preprocessed_result['images'])
        return inference_result

    def _postprocess(self, inference_result):
        return str(inference_result)
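The fc1 layer of LeNet5 expects 16 * 5 * 5 = 400 inputs, which only holds for the 32x32 input size used in network_warmup and _preprocess. The following sketch recomputes that number with the standard output-size formula for unpadded ("valid") convolution and pooling, floor((n - kernel) / stride) + 1. It checks only the arithmetic and does not call MindSpore; out_size and lenet5_flatten_size are illustrative names.

```python
def out_size(n, kernel, stride=1):
    """Spatial output size of an unpadded ('valid') convolution or pooling window."""
    return (n - kernel) // stride + 1


def lenet5_flatten_size(input_size=32):
    n = out_size(input_size, 5)    # conv1, 5x5 kernel:  32 -> 28
    n = out_size(n, 2, stride=2)   # max_pool2d:         28 -> 14
    n = out_size(n, 5)             # conv2, 5x5 kernel:  14 -> 10
    n = out_size(n, 2, stride=2)   # max_pool2d:         10 -> 5
    return 16 * n * n              # 16 output channels from conv2


print(lenet5_flatten_size())    # 400, matching nn.Dense(16 * 5 * 5, 120)
print(lenet5_flatten_size(28))  # 256, so a raw 28x28 MNIST image would not fit fc1
```

This is why the preprocessing resizes every uploaded image to 32x32 before building the (N, 1, 32, 32) input tensor.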