Source code for omegaml.backends.tensorflow.tfsavedmodel

  1import glob
  2import os
  3import tempfile
  4from shutil import rmtree
  5from zipfile import ZipFile, ZIP_DEFLATED
  6
  7import numpy as np
  8import tensorflow as tf
  9from tensorflow.python.framework.ops import EagerTensor
 10
 11from omegaml.backends.basemodel import BaseModelBackend
 12
 13
 14class TensorflowSavedModelPredictor(object):
 15    """
 16    A predictor model from a TF SavedModel
 17
 18    .. versionchanged:: 0.18.0
 19        Only supported for tensorflow <= 2.15 and Python <= 3.11
 20
 21    .. deprecated:: 0.18.0
 22        Use an object helper or a serializer/loader combination instead.
 23    """
 24
 25    def __init__(self, model_dir):
 26        self.model_dir = model_dir
 27        if tf.__version__.startswith('1'):
 28            self.__init_tf_v1()
 29        else:
 30            self.__init__tf_v2()
 31
 32    def __init__tf_v2(self):
 33        imported = tf.saved_model.load(self.model_dir)
 34        if callable(imported):
 35            self.predict_fn = imported
 36        else:
 37            self.predict_fn = imported.signatures["serving_default"]
 38        self.inputs = imported.signatures["serving_default"].inputs
 39        self.outputs = imported.signatures["serving_default"].outputs
 40        self._convert_to_model_input = self._convert_to_model_input_v2
 41        self._convert_to_model_output = self._convert_to_model_output_v2
 42
 43    def __init_tf_v1(self):
 44        from tensorflow.contrib import predictor
 45        self.predict_fn = predictor.from_saved_model(self.model_dir)
 46        self.input_names = list(self.predict_fn.feed_tensors.keys())
 47        self.output_names = list(self.predict_fn.fetch_tensors.keys())
 48        self._convert_to_model_input = self._convert_to_model_input_v1
 49        self._convert_to_model_output = self._convert_to_model_output_v1
 50
 51    def _convert_to_model_input_v1(self, X):
 52        # coerce input into expected feature mapping
 53        model_input = {
 54            self.input_names[0]: X
 55        }
 56        return model_input
 57
 58    def _convert_to_model_input_v2(self, X):
 59        # coerce input into expected feature mapping
 60        from omegaml.backends.tensorflow import _tffn
 61        return _tffn('convert_to_tensor')(X,
 62                                          name=self.inputs[0].name,
 63                                          dtype=self.inputs[0].dtype)
 64
 65    def _convert_to_model_output_v1(self, yhat):
 66        # coerce output into dict or array-like response
 67        if len(self.output_names) == 1:
 68            yhat = yhat[self.output_names[0]]
 69        return yhat
 70
 71    def _convert_to_model_output_v2(self, yhat):
 72        # coerce output into dict or array-like response
 73        return yhat
 74
 75    def predict(self, X):
 76        yhat = self.predict_fn(self._convert_to_model_input(X))
 77        return self._convert_to_model_output(yhat)
 78
 79
[docs] 80class TensorflowSavedModelBackend(BaseModelBackend): 81 KIND = 'tf.savedmodel' 82 _model_ext = 'tfsm' 83
[docs] 84 @classmethod 85 def supports(self, obj, name, **kwargs): 86 import tensorflow as tf 87 return isinstance(obj, (tf.estimator.Estimator, tf.compat.v1.estimator.Estimator))
88 89 def _package_model(self, model, key, tmpfn, serving_input_fn=None, 90 strip_default_attrs=None, **kwargs): 91 export_dir_base = self._make_savedmodel(model, serving_input_receiver_fn=serving_input_fn, 92 strip_default_attrs=strip_default_attrs) 93 zipfname = self._package_savedmodel(export_dir_base, key) 94 rmtree(export_dir_base) 95 return zipfname 96 97 def _extract_model(self, infile, key, tmpfn, **kwargs): 98 with open(tmpfn, 'wb') as pkgfn: 99 pkgfn.write(infile.read()) 100 model = self._extract_savedmodel(tmpfn) 101 return model 102 103 def _package_savedmodel(self, export_base_dir, filename): 104 fname = os.path.basename(filename) 105 zipfname = os.path.join(self.model_store.tmppath, fname) 106 # check if we have an intermediate directory (timestamp) 107 # as in export_base_dir/<timestamp>, if so, use this as the base directory 108 # see https://www.tensorflow.org/guide/saved_model#perform_the_export 109 # we need this check because not all SavedModel exports create a timestamp 110 # directory. e.g. keras.save_keras_model() does not, while Estimator.export_saved_model does 111 files = glob.glob(os.path.join(export_base_dir, '*')) 112 if len(files) == 1: 113 export_base_dir = files[0] 114 with ZipFile(zipfname, 'w', compression=ZIP_DEFLATED) as zipf: 115 for part in glob.glob(os.path.join(export_base_dir, '**'), recursive=True): 116 zipf.write(part, os.path.relpath(part, export_base_dir)) 117 return zipfname 118 119 def _extract_savedmodel(self, packagefname): 120 lpath = tempfile.mkdtemp() 121 fname = os.path.basename(packagefname) 122 mklfname = os.path.join(lpath, fname) 123 with ZipFile(packagefname) as zipf: 124 zipf.extractall(lpath) 125 model = TensorflowSavedModelPredictor(lpath) 126 rmtree(lpath) 127 return model 128 129 def _make_savedmodel(self, obj, serving_input_receiver_fn=None, strip_default_attrs=None): 130 # adapted from https://www.tensorflow.org/guide/saved_model#perform_the_export 131 export_dir_base = tempfile.mkdtemp() 132 obj.export_savedmodel(export_dir_base, 133 serving_input_receiver_fn=serving_input_receiver_fn, 134 strip_default_attrs=strip_default_attrs) 135 return export_dir_base 136
[docs] 137 def predict( 138 self, modelname, Xname, rName=None, pure_python=True, **kwargs): 139 """ 140 Predict from a SavedModel 141 142 Args: 143 modelname: 144 Xname: 145 rName: 146 pure_python: 147 kwargs: 148 149 Returns: 150 151 """ 152 model = self.get_model(modelname) 153 X = self._resolve_input_data('predict', Xname, 'X', **kwargs) 154 result = model.predict(X) 155 156 def ensure_serializable(data): 157 # convert to numpy 158 if isinstance(data, dict): 159 for k, v in data.items(): 160 data[k] = ensure_serializable(v) 161 elif isinstance(data, EagerTensor): 162 data = data.numpy() 163 if pure_python: 164 data = data.tolist() 165 return data 166 167 result = ensure_serializable(result) 168 return self._prepare_result('predict', result, rName=rName, pure_python=pure_python, **kwargs)
169
[docs] 170 def fit(self, modelname, Xname, Yname=None, pure_python=True, tpu_specs=None, **kwargs): 171 raise ValueError('cannot fit a saved model')
172 173 174class ServingInput(object): 175 # FIXME this is not working yet 176 def __init__(self, model=None, features=None, like=None, shape=None, dtype=None, 177 batchsize=1, from_keras=False, v1_compat=False): 178 """ 179 Helper to create serving_input_fn 180 181 Uses tf.build_raw_serving_input_receiver_fn to build a ServingInputReceiver 182 from the given inputs 183 184 Usage: 185 # use existing ndarray e.g. training or test data to specify a single input feature 186 ServingInput(features=['x'], like=ndarray) 187 188 # specify the dtype and shape explicitely 189 ServingInput(features=['x'], shape=(1, 28, 28)) 190 191 # use multiple features 192 ServingInput(features={'f1': tf.Feature(...)) 193 194 # for tf.keras models turned estimator, specify from_keras 195 # to ensure the input features are renamed correctly. 196 ServingInput(features=['x'], like=ndarray, from_keras=True) 197 198 Args: 199 model: 200 features: 201 like: 202 shape: 203 dtype: 204 batchsize: 205 from_keras: 206 """ 207 self.model = model 208 self.features = features or ['X'] 209 self.like = like 210 self.shape = shape 211 self.dtype = dtype 212 self.batchsize = batchsize 213 self.from_keras = from_keras 214 self.v1_compat = v1_compat 215 216 def build(self): 217 if isinstance(self.features, dict): 218 input_fn = self.from_features() 219 elif isinstance(self.like, np.ndarray): 220 shape = tuple((self.batchsize, *self.like.shape[1:])) # assume (rows, *cols) 221 input_fn = self.from_ndarray(shape, self.like.dtype) 222 elif isinstance(self.shape, (list, tuple, np.ndarray)): 223 input_fn = self.from_ndarray(self.shape, self.dtype) 224 return input_fn 225 226 def __call__(self): 227 input_fn = self.build() 228 return input_fn() 229 230 @property 231 def tf(self): 232 if self.v1_compat: 233 # https://www.tensorflow.org/guide/migrate 234 import tensorflow.compat.v1 as tf 235 tf.disable_v2_behavior() 236 else: 237 import tensorflow as tf 238 return tf 239 240 def from_features(self): 241 tf = self.tf 242 input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn( 243 self.features, 244 default_batch_size=self.batchsize 245 ) 246 return input_fn 247 248 def from_ndarray(self, shape, dtype): 249 tf = self.tf 250 if self.from_keras: 251 input_layer_name = '{}_input'.format(self.features[0]) 252 else: 253 input_layer_name = self.features[0] 254 if self.v1_compat: 255 features = { 256 input_layer_name: tf.placeholder(dtype=dtype, shape=shape, ) 257 } 258 else: 259 features = { 260 input_layer_name: tf.TensorSpec(shape=shape, dtype=dtype) 261 } 262 input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn( 263 features, 264 default_batch_size=None 265 ) 266 return input_fn 267 268 def from_dataframe(self, columns, input_layer_name='X', 269 batch_size=1, dtype=np.float32): 270 def serving_input_fn(): 271 import tensorflow as tf 272 ndim = len(columns) 273 X_name = '{}_input'.format(input_layer_name) 274 placeholder = tf.placeholder(dtype=np.float32, 275 shape=(batch_size, ndim), 276 name=X_name) 277 receiver_tensors = {X_name: placeholder} 278 features = {X_name: placeholder} 279 return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)