Specifications for Writing Model Inference Code

This section describes the general method of writing model inference code in ModelArts, provides an inference code example for the TensorFlow engine, and shows how to customize the inference logic in the inference script.

Specifications for Compiling Inference Code

  1. All custom Python code must inherit from the BaseService class. Table 1 lists the parent class and import statement for each model type.

    Table 1 Import statements of the BaseService class

    Model Type     Parent Class              Import Statement
    TensorFlow     TfServingBaseService      from model_service.tfserving_model_service import TfServingBaseService
    MXNet          MXNetBaseService          from mms.model_service.mxnet_model_service import MXNetBaseService
    PyTorch        PTServingBaseService      from model_service.pytorch_model_service import PTServingBaseService
    Spark_MLlib    SparkServingBaseService   from model_service.spark_model_service import SparkServingBaseService
    Caffe          CaffeBaseService          from model_service.caffe_model_service import CaffeBaseService
    XGBoost        XgSklServingBaseService   from model_service.python_model_service import XgSklServingBaseService
    Scikit_Learn   XgSklServingBaseService   from model_service.python_model_service import XgSklServingBaseService
    MindSpore      SingleNodeService         from model_service.model_service import SingleNodeService

  2. The following methods can be rewritten:

    Table 2 Methods to be rewritten

    __init__(self, model_name, model_path)
      Initialization method for models created based on deep learning frameworks. Models and labels are loaded in this method. It must be rewritten for PyTorch- and Caffe-based models to implement the model loading logic.

    __init__(self, model_path)
      Initialization method for models created based on machine learning frameworks. The model path (self.model_path) is initialized in this method. For Spark_MLlib, this method also initializes SparkSession (self.spark).

    _preprocess(self, data)
      Preprocessing method, called before an inference request. It converts the original request data of the API into the input data expected by the model.

    _inference(self, data)
      Inference request method. You are advised not to rewrite this method. If it is rewritten, the ModelArts built-in inference process is overwritten and the custom inference logic runs instead.

    _postprocess(self, data)
      Postprocessing method, called after an inference request is complete. It converts the model output into the API output.

    batch_inference(self, data)
      Request processing method bound to the micro-batch function. It is exposed through the /batch API for the built-in Spark_MLlib, Scikit_Learn, and XGBoost engines.

    Note

    • You can choose to rewrite the preprocess and postprocess methods to implement preprocessing of the API input and postprocessing of the inference output.

    • Rewriting the __init__ method of the BaseService subclass may cause an AI application to run abnormally.

  3. The attribute available to the inference code is self.model_path, which holds the local path where the model resides. In addition, PySpark-based models can use self.spark in customize_service.py to obtain the SparkSession object.

    Note

    An absolute path is required for reading files in the inference code. You can obtain the absolute path of the model from the self.model_path attribute.

    • When TensorFlow, Caffe, or MXNet is used, self.model_path indicates the path of the model directory, so files stored in that directory can be read from it directly. See the following example:

      # Store the label.json file in the model directory. The following information is read:
      with open(os.path.join(self.model_path, 'label.json')) as f:
          self.label = json.load(f)
      
    • When PyTorch, Scikit_Learn, or PySpark is used, self.model_path indicates the path of the model file, so obtain its directory first when reading files stored alongside the model. See the following example:

      # Store the label.json file in the model directory. The following information is read:
      dir_path = os.path.dirname(os.path.realpath(self.model_path))
      with open(os.path.join(dir_path, 'label.json')) as f:
          self.label = json.load(f)
      
  4. The data passed in through the API for preprocessing, actual inference, and postprocessing can be of the multipart/form-data or application/json type. (A minimal skeleton that handles both request types is sketched after this list.)

    • multipart/form-data request

      curl -X POST \
        <modelarts-inference-endpoint> \
        -F image1=@cat.jpg \
        -F image2=@horse.jpg
      

      The corresponding input data is as follows:

      [
         {
            "image1":{
               "cat.jpg":"<cat..jpg file io>"
            }
         },
         {
            "image2":{
               "horse.jpg":"<horse.jpg file io>"
            }
         }
      ]
      
    • application/json request

      curl -X POST \
        <modelarts-inference-endpoint> \
        -d '{
         "images":"base64 encode image"
         }'
      

      The corresponding input data is a Python dict:

      {
         "images":"base64 encode image"
      }
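
A minimal skeleton that ties the preceding points together is shown below. It is an illustrative sketch rather than an official sample: it inherits the TensorFlow parent class from Table 1, leaves _inference untouched so that the ModelArts built-in inference process runs, reads an optional label file through self.model_path, and accepts both request types described above. The request key names, the label.json file, and the base64 handling are assumptions made for illustration.

# -*- coding: utf-8 -*-
# Illustrative skeleton only; adapt the key names and file names to your model.
import base64
import io
import json
import os

import numpy as np
from PIL import Image

from model_service.tfserving_model_service import TfServingBaseService


class DemoService(TfServingBaseService):

    def _preprocess(self, data):
        # Load labels lazily from the model directory (self.model_path), because
        # the note above advises against rewriting __init__ for this parent class.
        label_file = os.path.join(self.model_path, 'label.json')
        if not hasattr(self, 'label') and os.path.exists(label_file):
            with open(label_file) as f:
                self.label = json.load(f)

        preprocessed_data = {}
        for k, v in data.items():
            if isinstance(v, dict):
                # multipart/form-data request: {"request key": {"file name": <file io>}}
                for file_name, file_content in v.items():
                    image = Image.open(file_content)
                    preprocessed_data[k] = np.array(image, dtype=np.float32)
            else:
                # application/json request: the value is assumed to be a base64-encoded image
                image = Image.open(io.BytesIO(base64.b64decode(v)))
                preprocessed_data[k] = np.array(image, dtype=np.float32)
        return preprocessed_data

    # _inference is not rewritten, so the built-in inference process is used.

    def _postprocess(self, data):
        # Convert raw model outputs into JSON-serializable values for the RESTful API.
        return {name: np.array(result).tolist() for name, result in data.items()}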
      

TensorFlow Inference Script Example

The following is an example of TensorFlow MnistService.

  • Inference code

    from PIL import Image
    import numpy as np
    from model_service.tfserving_model_service import TfServingBaseService
    
    class mnist_service(TfServingBaseService):
    
        def _preprocess(self, data):
            preprocessed_data = {}
    
            for k, v in data.items():
                for file_name, file_content in v.items():
                    image1 = Image.open(file_content)
                    image1 = np.array(image1, dtype=np.float32)
                    image1.resize((1, 784))
                    preprocessed_data[k] = image1
    
            return preprocessed_data
    
        def _postprocess(self, data):
    
            infer_output = {}
    
            for output_name, result in data.items():
    
                infer_output["mnist_result"] = result[0].index(max(result[0]))
    
            return infer_output
    
  • Request

    curl -X POST \
      <Real-time service address> \
      -F images=@test.jpg
    
  • Response

    {"mnist_result": 7}
    

The preceding code example resizes the image uploaded through the user's form to match the model input: the image is read with the Pillow library, converted to a NumPy array, and resized to 1x784 (the flattened 28x28 input the model expects). In postprocessing, the model output is converted into the index of the highest-scoring class so that the RESTful API can return it.

Inference Script Example of the Custom Inference Logic

Define the custom dependency package in the model configuration file by referring to Example of a Model Configuration File Using a Custom Dependency Package. Then, use the following code example to load and run inference on a model in saved_model format.

# -*- coding: utf-8 -*-
import json
import os
import threading

import numpy as np
import tensorflow as tf
from PIL import Image

from model_service.tfserving_model_service import TfServingBaseService
import logging

logger = logging.getLogger(__name__)


class MnistService(TfServingBaseService):

    def __init__(self, model_name, model_path):
        self.model_name = model_name
        self.model_path = model_path
        self.model_inputs = {}
        self.model_outputs = {}

        # The label file can be loaded here and used in the post-processing function.
        # Directories for storing the label.txt file on OBS and in the model package

        # with open(os.path.join(self.model_path, 'label.txt')) as f:
        #     self.label = json.load(f)

        # Load the model in saved_model format in non-blocking mode to prevent blocking timeout.
        thread = threading.Thread(target=self.get_tf_sess)
        thread.start()

    def get_tf_sess(self):
        # Load the model in saved_model format.

        # The session will be reused. Do not use the with statement.
        sess = tf.Session(graph=tf.Graph())
        meta_graph_def = tf.saved_model.loader.load(sess, [tf.saved_model.tag_constants.SERVING], self.model_path)
        signature_defs = meta_graph_def.signature_def

        self.sess = sess

        signature = []

        # only one signature allowed
        for signature_def in signature_defs:
            signature.append(signature_def)
        if len(signature) == 1:
            model_signature = signature[0]
        else:
            logger.warning("signatures more than one, use serving_default signature")
            model_signature = tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY

        logger.info("model signature: %s", model_signature)

        for signature_name in meta_graph_def.signature_def[model_signature].inputs:
            tensorinfo = meta_graph_def.signature_def[model_signature].inputs[signature_name]
            name = tensorinfo.name
            op = self.sess.graph.get_tensor_by_name(name)
            self.model_inputs[signature_name] = op

        logger.info("model inputs: %s", self.model_inputs)

        for signature_name in meta_graph_def.signature_def[model_signature].outputs:
            tensorinfo = meta_graph_def.signature_def[model_signature].outputs[signature_name]
            name = tensorinfo.name
            op = self.sess.graph.get_tensor_by_name(name)

            self.model_outputs[signature_name] = op

        logger.info("model outputs: %s", self.model_outputs)

    def _preprocess(self, data):
        # Two request modes using HTTPS
        # 1. The request in form-data file format is as follows: data = {"Request key value":{"File name":<File io>}}
        # 2. The request in JSON format is as follows: data = json.loads("JSON body transferred by the API")
        preprocessed_data = {}

        for k, v in data.items():
            for file_name, file_content in v.items():
                image1 = Image.open(file_content)
                image1 = np.array(image1, dtype=np.float32)
                image1.resize((1, 28, 28))
                preprocessed_data[k] = image1

        return preprocessed_data

    def _inference(self, data):

        feed_dict = {}
        for k, v in data.items():
            if k not in self.model_inputs.keys():
                logger.error("input key %s is not in model inputs %s", k, list(self.model_inputs.keys()))
                raise Exception("input key %s is not in model inputs %s" % (k, list(self.model_inputs.keys())))
            feed_dict[self.model_inputs[k]] = v

        result = self.sess.run(self.model_outputs, feed_dict=feed_dict)
        logger.info('predict result : ' + str(result))

        return result

    def _postprocess(self, data):
        infer_output = {"mnist_result": []}
        for output_name, results in data.items():

            for result in results:
                infer_output["mnist_result"].append(np.argmax(result))

        return infer_output

    def __del__(self):
        self.sess.close()
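
If you want to sanity-check this script locally before packaging it for ModelArts, a rough test such as the following exercises the three rewritten methods in the order the serving framework calls them. The saved_model directory, the image path, the request key "images" (which must match a model input name), and the fixed wait time are all placeholder assumptions.

# Hypothetical local smoke test; not part of the ModelArts sample.
if __name__ == '__main__':
    import time

    service = MnistService('mnist', '/tmp/mnist_saved_model')
    time.sleep(5)  # crude wait for the background model-loading thread started in __init__

    with open('/tmp/test_digit.png', 'rb') as f:
        request_data = {'images': {'test_digit.png': f}}   # mimics a form-data request
        model_input = service._preprocess(request_data)
        raw_output = service._inference(model_input)
        print(service._postprocess(raw_output))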

MindSpore Inference Script Example

import threading

import mindspore
import mindspore.nn as nn
import numpy as np
import logging
from mindspore import Tensor, context
from mindspore.common.initializer import Normal
from mindspore.train.serialization import load_checkpoint, load_param_into_net
from model_service.model_service import SingleNodeService
from PIL import Image

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)



context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")


class LeNet5(nn.Cell):
    """Lenet network structure."""

    # define the operator required
    def __init__(self, num_class=10, num_channel=1):
        super(LeNet5, self).__init__()
        self.conv1 = nn.Conv2d(num_channel, 6, 5, pad_mode='valid')
        self.conv2 = nn.Conv2d(6, 16, 5, pad_mode='valid')
        self.fc1 = nn.Dense(16 * 5 * 5, 120, weight_init=Normal(0.02))
        self.fc2 = nn.Dense(120, 84, weight_init=Normal(0.02))
        self.fc3 = nn.Dense(84, num_class, weight_init=Normal(0.02))
        self.relu = nn.ReLU()
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)
        self.flatten = nn.Flatten()

    # use the preceding operators to construct networks
    def construct(self, x):
        x = self.max_pool2d(self.relu(self.conv1(x)))
        x = self.max_pool2d(self.relu(self.conv2(x)))
        x = self.flatten(x)
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x


class mnist_service(SingleNodeService):
    def __init__(self, model_name, model_path):
        self.model_name = model_name
        self.model_path = model_path
        logger.info("self.model_name:%s self.model_path: %s", self.model_name,
                    self.model_path)
        self.network = None
        # Load the model in non-blocking mode to prevent blocking timeout.
        thread = threading.Thread(target=self.load_model)
        thread.start()

    def load_model(self):
        logger.info("load network ... \n")
        self.network = LeNet5()
        ckpt_file = self.model_path + "/checkpoint_lenet_1-1_1875.ckpt"
        logger.info("ckpt_file: %s", ckpt_file)
        param_dict = load_checkpoint(ckpt_file)
        load_param_into_net(self.network, param_dict)
        # Inference warm-up. Otherwise, the initial inference will take a long time.
        self.network_warmup()
        logger.info("load network successfully ! \n")

    def network_warmup(self):
        # Inference warm-up. Otherwise, the initial inference will take a long time.
        logger.info("warmup network ... \n")
        images = np.array(np.random.randn(1, 1, 32, 32), dtype=np.float32)
        inputs = Tensor(images, mindspore.float32)
        inference_result = self.network(inputs)
        logger.info("warmup network successfully ! \n")

    def _preprocess(self, input_data):
        preprocessed_result = {}
        images = []
        for k, v in input_data.items():
            for file_name, file_content in v.items():
                image1 = Image.open(file_content)
                image1 = image1.resize((1, 32 * 32))
                image1 = np.array(image1, dtype=np.float32)
                images.append(image1)

        images = np.array(images, dtype=np.float32)
        logger.info(images.shape)
        images.resize([len(input_data), 1, 32, 32])
        logger.info("images shape: %s", images.shape)
        inputs = Tensor(images, mindspore.float32)
        preprocessed_result['images'] = inputs

        return preprocessed_result

    def _inference(self, preprocessed_result):
        inference_result = self.network(preprocessed_result['images'])
        return inference_result

    def _postprocess(self, inference_result):
        return str(inference_result)
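
The _postprocess method above returns the raw inference result as a string. If you prefer the service to return only the predicted digit, a small variation such as the following can be used instead (an illustrative assumption, not part of the official sample):

    def _postprocess(self, inference_result):
        # Convert the MindSpore Tensor of logits (shape [N, 10]) into the predicted classes.
        logits = inference_result.asnumpy()
        return {"mnist_result": np.argmax(logits, axis=1).tolist()}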