Source code for omegaml.backends.tensorflow.tfsavedmodel

import glob
import os
import tempfile
from shutil import rmtree
from zipfile import ZipFile, ZIP_DEFLATED

import numpy as np
import tensorflow as tf
from tensorflow.python.framework.ops import EagerTensor

from omegaml.backends.basemodel import BaseModelBackend

class TensorflowSavedModelPredictor(object):
    """
    A predictor model from a TF SavedModel
    """

    def __init__(self, model_dir):
        self.model_dir = model_dir
        if tf.__version__.startswith('1'):
            self.__init_tf_v1()
        else:
            self.__init_tf_v2()

    def __init_tf_v2(self):
        imported = tf.saved_model.load(self.model_dir)
        if callable(imported):
            self.predict_fn = imported
        else:
            self.predict_fn = imported.signatures["serving_default"]
        self.inputs = imported.signatures["serving_default"].inputs
        self.outputs = imported.signatures["serving_default"].outputs
        self._convert_to_model_input = self._convert_to_model_input_v2
        self._convert_to_model_output = self._convert_to_model_output_v2

    def __init_tf_v1(self):
        from tensorflow.contrib import predictor
        self.predict_fn = predictor.from_saved_model(self.model_dir)
        self.input_names = list(self.predict_fn.feed_tensors.keys())
        self.output_names = list(self.predict_fn.fetch_tensors.keys())
        self._convert_to_model_input = self._convert_to_model_input_v1
        self._convert_to_model_output = self._convert_to_model_output_v1

    def _convert_to_model_input_v1(self, X):
        # coerce input into expected feature mapping
        model_input = {
            self.input_names[0]: X
        }
        return model_input

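    # Example (illustrative, not part of the original module): with
    # input_names == ['x'], an ndarray X becomes {'x': X}, the feed
    # mapping expected by the TF v1 predictor.
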
    def _convert_to_model_input_v2(self, X):
        # coerce input into expected feature mapping
        from omegaml.backends.tensorflow import _tffn
        return _tffn('convert_to_tensor')(X,
                                          name=self.inputs[0].name,
                                          dtype=self.inputs[0].dtype)

    def _convert_to_model_output_v1(self, yhat):
        # coerce output into dict or array-like response
        if len(self.output_names) == 1:
            yhat = yhat[self.output_names[0]]
        return yhat

    def _convert_to_model_output_v2(self, yhat):
        # coerce output into dict or array-like response
        return yhat

    def predict(self, X):
        yhat = self.predict_fn(self._convert_to_model_input(X))
        return self._convert_to_model_output(yhat)

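# Illustrative usage (not part of the original module); assumes a SavedModel
# has previously been exported to `model_dir`:
#
#   predictor = TensorflowSavedModelPredictor(model_dir)
#   yhat = predictor.predict(X)  # X is coerced to the serving signature input
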
class TensorflowSavedModelBackend(BaseModelBackend):
    KIND = 'tf.savedmodel'
    _model_ext = 'tfsm'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        import tensorflow as tf
        return isinstance(obj, (tf.estimator.Estimator, tf.compat.v1.estimator.Estimator))
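
    # Illustrative check (an assumption, not part of the original module):
    #
    #   est = tf.estimator.LinearRegressor(feature_columns=[...])
    #   TensorflowSavedModelBackend.supports(est, 'mymodel')  # => True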

    def _package_model(self, model, key, tmpfn, serving_input_fn=None,
                       strip_default_attrs=None, **kwargs):
        export_dir_base = self._make_savedmodel(model, serving_input_receiver_fn=serving_input_fn,
                                                strip_default_attrs=strip_default_attrs)
        zipfname = self._package_savedmodel(export_dir_base, key)
        rmtree(export_dir_base)
        return zipfname

    def _extract_model(self, infile, key, tmpfn, **kwargs):
        with open(tmpfn, 'wb') as pkgfn:
            pkgfn.write(infile.read())
        model = self._extract_savedmodel(tmpfn)
        return model

    def _package_savedmodel(self, export_base_dir, filename):
        fname = os.path.basename(filename)
        zipfname = os.path.join(self.model_store.tmppath, fname)
        # check if we have an intermediate directory (timestamp)
        # as in export_base_dir/<timestamp>, if so, use this as the base directory
        # see https://www.tensorflow.org/guide/saved_model#perform_the_export
        # we need this check because not all SavedModel exports create a timestamp
        # directory. e.g. keras.save_keras_model() does not, while Estimator.export_saved_model does
        files = glob.glob(os.path.join(export_base_dir, '*'))
        if len(files) == 1:
            export_base_dir = files[0]
        with ZipFile(zipfname, 'w', compression=ZIP_DEFLATED) as zipf:
            for part in glob.glob(os.path.join(export_base_dir, '**'), recursive=True):
                zipf.write(part, os.path.relpath(part, export_base_dir))
        return zipfname

    def _extract_savedmodel(self, packagefname):
        lpath = tempfile.mkdtemp()
        fname = os.path.basename(packagefname)
        mklfname = os.path.join(lpath, fname)
        with ZipFile(packagefname) as zipf:
            zipf.extractall(lpath)
        model = TensorflowSavedModelPredictor(lpath)
        rmtree(lpath)
        return model

    def _make_savedmodel(self, obj, serving_input_receiver_fn=None, strip_default_attrs=None):
        # adapted from https://www.tensorflow.org/guide/saved_model#perform_the_export
        export_dir_base = tempfile.mkdtemp()
        obj.export_savedmodel(export_dir_base,
                              serving_input_receiver_fn=serving_input_receiver_fn,
                              strip_default_attrs=strip_default_attrs)
        return export_dir_base

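    # Illustrative packaging round-trip (hypothetical names, not part of the
    # original module):
    #
    #   zipfname = backend._package_savedmodel('/tmp/export_base', 'mymodel')
    #   predictor = backend._extract_savedmodel(zipfname)
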
    def predict(
            self, modelname, Xname, rName=None, pure_python=True, **kwargs):
        """
        Predict from a SavedModel

        Args:
            modelname: the name of the model in the model store
            Xname: the name of the input dataset in the data store
            rName: the name of the result dataset, if any
            pure_python: if True, return pure Python objects instead of
                numpy arrays
            kwargs: passed on to input resolution and result preparation

        Returns:
            the prediction result
        """
        model = self.get_model(modelname)
        X = self._resolve_input_data('predict', Xname, 'X', **kwargs)
        result = model.predict(X)

        def ensure_serializable(data):
            # convert tensors to numpy, optionally to pure python objects
            if isinstance(data, dict):
                for k, v in data.items():
                    data[k] = ensure_serializable(v)
            else:
                if isinstance(data, EagerTensor):
                    data = data.numpy()
                if pure_python:
                    data = data.tolist()
            return data

        result = ensure_serializable(result)
        return self._prepare_result('predict', result, rName=rName, pure_python=pure_python, **kwargs)

    def fit(self, modelname, Xname, Yname=None, pure_python=True, tpu_specs=None, **kwargs):
        raise ValueError('cannot fit a saved model')
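
# Illustrative end-to-end usage via omegaml (dataset and model names are
# examples; assumes om.models.put() forwards serving_input_fn to the backend):
#
#   import omegaml as om
#   om.models.put(estimator, 'mymodel', serving_input_fn=input_fn)
#   om.datasets.put(X_test, 'X_test')
#   result = om.runtime.model('mymodel').predict('X_test').get()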


class ServingInput(object):
    # FIXME this is not working yet
    def __init__(self, model=None, features=None, like=None, shape=None, dtype=None,
                 batchsize=1, from_keras=False, v1_compat=False):
        """
        Helper to create a serving_input_fn

        Uses tf.estimator.export.build_raw_serving_input_receiver_fn to build
        a ServingInputReceiver from the given inputs

        Usage:
            # use an existing ndarray, e.g. training or test data, to specify a single input feature
            ServingInput(features=['x'], like=ndarray)

            # specify the dtype and shape explicitly
            ServingInput(features=['x'], shape=(1, 28, 28))

            # use multiple features
            ServingInput(features={'f1': tf.Feature(...)})

            # for tf.keras models turned estimator, specify from_keras
            # to ensure the input features are renamed correctly.
            ServingInput(features=['x'], like=ndarray, from_keras=True)

        Args:
            model: the model (currently unused)
            features: the list of feature names, or a dict of features
            like: an ndarray to derive shape and dtype from
            shape: the input shape, if not given via like
            dtype: the input dtype, if not given via like
            batchsize: the default batch size, defaults to 1
            from_keras: if True, rename input features as '<feature>_input'
            v1_compat: if True, use tensorflow.compat.v1
        """
        self.model = model
        self.features = features or ['X']
        self.like = like
        self.shape = shape
        self.dtype = dtype
        self.batchsize = batchsize
        self.from_keras = from_keras
        self.v1_compat = v1_compat

    def build(self):
        if isinstance(self.features, dict):
            input_fn = self.from_features()
        elif isinstance(self.like, np.ndarray):
            shape = tuple((self.batchsize, *self.like.shape[1:]))  # assume (rows, *cols)
            input_fn = self.from_ndarray(shape, self.like.dtype)
        elif isinstance(self.shape, (list, tuple, np.ndarray)):
            input_fn = self.from_ndarray(self.shape, self.dtype)
        else:
            raise ValueError('cannot build a serving input, specify features, like or shape')
        return input_fn

    def __call__(self):
        input_fn = self.build()
        return input_fn()

    @property
    def tf(self):
        if self.v1_compat:
            # https://www.tensorflow.org/guide/migrate
            import tensorflow.compat.v1 as tf
            tf.disable_v2_behavior()
        else:
            import tensorflow as tf
        return tf

    def from_features(self):
        tf = self.tf
        input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
            self.features,
            default_batch_size=self.batchsize
        )
        return input_fn

    def from_ndarray(self, shape, dtype):
        tf = self.tf
        if self.from_keras:
            input_layer_name = '{}_input'.format(self.features[0])
        else:
            input_layer_name = self.features[0]
        if self.v1_compat:
            features = {
                input_layer_name: tf.placeholder(dtype=dtype, shape=shape)
            }
        else:
            features = {
                input_layer_name: tf.TensorSpec(shape=shape, dtype=dtype)
            }
        input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
            features,
            default_batch_size=None
        )
        return input_fn

    def from_dataframe(self, columns, input_layer_name='X',
                       batch_size=1, dtype=np.float32):
        def serving_input_fn():
            import tensorflow as tf
            ndim = len(columns)
            X_name = '{}_input'.format(input_layer_name)
            placeholder = tf.placeholder(dtype=dtype,
                                         shape=(batch_size, ndim),
                                         name=X_name)
            receiver_tensors = {X_name: placeholder}
            features = {X_name: placeholder}
            return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

        return serving_input_fn