1import glob
2import os
3import tempfile
4from shutil import rmtree
5from zipfile import ZipFile, ZIP_DEFLATED
6
7import numpy as np
8import tensorflow as tf
9from tensorflow.python.framework.ops import EagerTensor
10
11from omegaml.backends.basemodel import BaseModelBackend
12
13
14class TensorflowSavedModelPredictor(object):
15 """
16 A predictor model from a TF SavedModel
17 """
18
19 def __init__(self, model_dir):
20 self.model_dir = model_dir
21 if tf.__version__.startswith('1'):
22 self.__init_tf_v1()
23 else:
24 self.__init__tf_v2()
25
26 def __init__tf_v2(self):
27 imported = tf.saved_model.load(self.model_dir)
28 if callable(imported):
29 self.predict_fn = imported
30 else:
31 self.predict_fn = imported.signatures["serving_default"]
32 self.inputs = imported.signatures["serving_default"].inputs
33 self.outputs = imported.signatures["serving_default"].outputs
34 self._convert_to_model_input = self._convert_to_model_input_v2
35 self._convert_to_model_output = self._convert_to_model_output_v2
36
37 def __init_tf_v1(self):
38 from tensorflow.contrib import predictor
39 self.predict_fn = predictor.from_saved_model(self.model_dir)
40 self.input_names = list(self.predict_fn.feed_tensors.keys())
41 self.output_names = list(self.predict_fn.fetch_tensors.keys())
42 self._convert_to_model_input = self._convert_to_model_input_v1
43 self._convert_to_model_output = self._convert_to_model_output_v1
44
45 def _convert_to_model_input_v1(self, X):
46 # coerce input into expected feature mapping
47 model_input = {
48 self.input_names[0]: X
49 }
50 return model_input
51
52 def _convert_to_model_input_v2(self, X):
53 # coerce input into expected feature mapping
54 from omegaml.backends.tensorflow import _tffn
55 return _tffn('convert_to_tensor')(X,
56 name=self.inputs[0].name,
57 dtype=self.inputs[0].dtype)
58
59 def _convert_to_model_output_v1(self, yhat):
60 # coerce output into dict or array-like response
61 if len(self.output_names) == 1:
62 yhat = yhat[self.output_names[0]]
63 return yhat
64
65 def _convert_to_model_output_v2(self, yhat):
66 # coerce output into dict or array-like response
67 return yhat
68
69 def predict(self, X):
70 yhat = self.predict_fn(self._convert_to_model_input(X))
71 return self._convert_to_model_output(yhat)
72
73
[docs]
74class TensorflowSavedModelBackend(BaseModelBackend):
75 KIND = 'tf.savedmodel'
76 _model_ext = 'tfsm'
77
[docs]
78 @classmethod
79 def supports(self, obj, name, **kwargs):
80 import tensorflow as tf
81 return isinstance(obj, (tf.estimator.Estimator, tf.compat.v1.estimator.Estimator))
82
83 def _package_model(self, model, key, tmpfn, serving_input_fn=None,
84 strip_default_attrs=None, **kwargs):
85 export_dir_base = self._make_savedmodel(model, serving_input_receiver_fn=serving_input_fn,
86 strip_default_attrs=strip_default_attrs)
87 zipfname = self._package_savedmodel(export_dir_base, key)
88 rmtree(export_dir_base)
89 return zipfname
90
91 def _extract_model(self, infile, key, tmpfn, **kwargs):
92 with open(tmpfn, 'wb') as pkgfn:
93 pkgfn.write(infile.read())
94 model = self._extract_savedmodel(tmpfn)
95 return model
96
97 def _package_savedmodel(self, export_base_dir, filename):
98 fname = os.path.basename(filename)
99 zipfname = os.path.join(self.model_store.tmppath, fname)
100 # check if we have an intermediate directory (timestamp)
101 # as in export_base_dir/<timestamp>, if so, use this as the base directory
102 # see https://www.tensorflow.org/guide/saved_model#perform_the_export
103 # we need this check because not all SavedModel exports create a timestamp
104 # directory. e.g. keras.save_keras_model() does not, while Estimator.export_saved_model does
105 files = glob.glob(os.path.join(export_base_dir, '*'))
106 if len(files) == 1:
107 export_base_dir = files[0]
108 with ZipFile(zipfname, 'w', compression=ZIP_DEFLATED) as zipf:
109 for part in glob.glob(os.path.join(export_base_dir, '**'), recursive=True):
110 zipf.write(part, os.path.relpath(part, export_base_dir))
111 return zipfname
112
113 def _extract_savedmodel(self, packagefname):
114 lpath = tempfile.mkdtemp()
115 fname = os.path.basename(packagefname)
116 mklfname = os.path.join(lpath, fname)
117 with ZipFile(packagefname) as zipf:
118 zipf.extractall(lpath)
119 model = TensorflowSavedModelPredictor(lpath)
120 rmtree(lpath)
121 return model
122
123 def _make_savedmodel(self, obj, serving_input_receiver_fn=None, strip_default_attrs=None):
124 # adapted from https://www.tensorflow.org/guide/saved_model#perform_the_export
125 export_dir_base = tempfile.mkdtemp()
126 obj.export_savedmodel(export_dir_base,
127 serving_input_receiver_fn=serving_input_receiver_fn,
128 strip_default_attrs=strip_default_attrs)
129 return export_dir_base
130
[docs]
131 def predict(
132 self, modelname, Xname, rName=None, pure_python=True, **kwargs):
133 """
134 Predict from a SavedModel
135
136 Args:
137 modelname:
138 Xname:
139 rName:
140 pure_python:
141 kwargs:
142
143 Returns:
144
145 """
146 model = self.get_model(modelname)
147 X = self._resolve_input_data('predict', Xname, 'X', **kwargs)
148 result = model.predict(X)
149
150 def ensure_serializable(data):
151 # convert to numpy
152 if isinstance(data, dict):
153 for k, v in data.items():
154 data[k] = ensure_serializable(v)
155 elif isinstance(data, EagerTensor):
156 data = data.numpy()
157 if pure_python:
158 data = data.tolist()
159 return data
160
161 result = ensure_serializable(result)
162 return self._prepare_result('predict', result, rName=rName, pure_python=pure_python, **kwargs)
163
[docs]
164 def fit(self, modelname, Xname, Yname=None, pure_python=True, tpu_specs=None, **kwargs):
165 raise ValueError('cannot fit a saved model')
166
167
168class ServingInput(object):
169 # FIXME this is not working yet
170 def __init__(self, model=None, features=None, like=None, shape=None, dtype=None,
171 batchsize=1, from_keras=False, v1_compat=False):
172 """
173 Helper to create serving_input_fn
174
175 Uses tf.build_raw_serving_input_receiver_fn to build a ServingInputReceiver
176 from the given inputs
177
178 Usage:
179 # use existing ndarray e.g. training or test data to specify a single input feature
180 ServingInput(features=['x'], like=ndarray)
181
182 # specify the dtype and shape explicitely
183 ServingInput(features=['x'], shape=(1, 28, 28))
184
185 # use multiple features
186 ServingInput(features={'f1': tf.Feature(...))
187
188 # for tf.keras models turned estimator, specify from_keras
189 # to ensure the input features are renamed correctly.
190 ServingInput(features=['x'], like=ndarray, from_keras=True)
191
192 Args:
193 model:
194 features:
195 like:
196 shape:
197 dtype:
198 batchsize:
199 from_keras:
200 """
201 self.model = model
202 self.features = features or ['X']
203 self.like = like
204 self.shape = shape
205 self.dtype = dtype
206 self.batchsize = batchsize
207 self.from_keras = from_keras
208 self.v1_compat = v1_compat
209
210 def build(self):
211 if isinstance(self.features, dict):
212 input_fn = self.from_features()
213 elif isinstance(self.like, np.ndarray):
214 shape = tuple((self.batchsize, *self.like.shape[1:])) # assume (rows, *cols)
215 input_fn = self.from_ndarray(shape, self.like.dtype)
216 elif isinstance(self.shape, (list, tuple, np.ndarray)):
217 input_fn = self.from_ndarray(self.shape, self.dtype)
218 return input_fn
219
220 def __call__(self):
221 input_fn = self.build()
222 return input_fn()
223
224 @property
225 def tf(self):
226 if self.v1_compat:
227 # https://www.tensorflow.org/guide/migrate
228 import tensorflow.compat.v1 as tf
229 tf.disable_v2_behavior()
230 else:
231 import tensorflow as tf
232 return tf
233
234 def from_features(self):
235 tf = self.tf
236 input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
237 self.features,
238 default_batch_size=self.batchsize
239 )
240 return input_fn
241
242 def from_ndarray(self, shape, dtype):
243 tf = self.tf
244 if self.from_keras:
245 input_layer_name = '{}_input'.format(self.features[0])
246 else:
247 input_layer_name = self.features[0]
248 if self.v1_compat:
249 features = {
250 input_layer_name: tf.placeholder(dtype=dtype, shape=shape, )
251 }
252 else:
253 features = {
254 input_layer_name: tf.TensorSpec(shape=shape, dtype=dtype)
255 }
256 input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
257 features,
258 default_batch_size=None
259 )
260 return input_fn
261
262 def from_dataframe(self, columns, input_layer_name='X',
263 batch_size=1, dtype=np.float32):
264 def serving_input_fn():
265 import tensorflow as tf
266 ndim = len(columns)
267 X_name = '{}_input'.format(input_layer_name)
268 placeholder = tf.placeholder(dtype=np.float32,
269 shape=(batch_size, ndim),
270 name=X_name)
271 receiver_tensors = {X_name: placeholder}
272 features = {X_name: placeholder}
273 return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)