import glob
import os
import tempfile
from shutil import rmtree
from zipfile import ZipFile, ZIP_DEFLATED

import numpy as np
import tensorflow as tf
from tensorflow.python.framework.ops import EagerTensor

from omegaml.backends.basemodel import BaseModelBackend

class TensorflowSavedModelPredictor(object):
    """
    A predictor model from a TF SavedModel

    .. versionchanged:: 0.18.0
        Only supported for tensorflow <= 2.15 and Python <= 3.11

    .. deprecated:: 0.18.0
        Use an object helper or a serializer/loader combination instead.
    """

    def __init__(self, model_dir):
        self.model_dir = model_dir
        if tf.__version__.startswith('1'):
            self.__init_tf_v1()
        else:
            self.__init_tf_v2()

    def __init_tf_v2(self):
        imported = tf.saved_model.load(self.model_dir)
        if callable(imported):
            self.predict_fn = imported
        else:
            self.predict_fn = imported.signatures["serving_default"]
        self.inputs = imported.signatures["serving_default"].inputs
        self.outputs = imported.signatures["serving_default"].outputs
        self._convert_to_model_input = self._convert_to_model_input_v2
        self._convert_to_model_output = self._convert_to_model_output_v2

    def __init_tf_v1(self):
        from tensorflow.contrib import predictor
        self.predict_fn = predictor.from_saved_model(self.model_dir)
        self.input_names = list(self.predict_fn.feed_tensors.keys())
        self.output_names = list(self.predict_fn.fetch_tensors.keys())
        self._convert_to_model_input = self._convert_to_model_input_v1
        self._convert_to_model_output = self._convert_to_model_output_v1

    def _convert_to_model_input_v1(self, X):
        # coerce input into the expected feature mapping
        model_input = {
            self.input_names[0]: X
        }
        return model_input

    def _convert_to_model_input_v2(self, X):
        # coerce input into the expected feature mapping
        from omegaml.backends.tensorflow import _tffn
        return _tffn('convert_to_tensor')(X,
                                          name=self.inputs[0].name,
                                          dtype=self.inputs[0].dtype)

    def _convert_to_model_output_v1(self, yhat):
        # coerce output into a dict or array-like response
        if len(self.output_names) == 1:
            yhat = yhat[self.output_names[0]]
        return yhat

    def _convert_to_model_output_v2(self, yhat):
        # coerce output into a dict or array-like response
        return yhat

    def predict(self, X):
        yhat = self.predict_fn(self._convert_to_model_input(X))
        return self._convert_to_model_output(yhat)

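# a minimal usage sketch (the path and input shape are hypothetical): load an
# exported SavedModel directory and run a prediction; kept as comments so the
# module stays importable:
#
#   predictor = TensorflowSavedModelPredictor('/tmp/exported_model')
#   yhat = predictor.predict(np.zeros((1, 28, 28), dtype='float32'))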
79
class TensorflowSavedModelBackend(BaseModelBackend):
    KIND = 'tf.savedmodel'
    _model_ext = 'tfsm'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        import tensorflow as tf
        return isinstance(obj, (tf.estimator.Estimator, tf.compat.v1.estimator.Estimator))

    def _package_model(self, model, key, tmpfn, serving_input_fn=None,
                       strip_default_attrs=None, **kwargs):
        export_dir_base = self._make_savedmodel(model, serving_input_receiver_fn=serving_input_fn,
                                                strip_default_attrs=strip_default_attrs)
        zipfname = self._package_savedmodel(export_dir_base, key)
        rmtree(export_dir_base)
        return zipfname

    def _extract_model(self, infile, key, tmpfn, **kwargs):
        with open(tmpfn, 'wb') as pkgfn:
            pkgfn.write(infile.read())
        model = self._extract_savedmodel(tmpfn)
        return model

    def _package_savedmodel(self, export_base_dir, filename):
        fname = os.path.basename(filename)
        zipfname = os.path.join(self.model_store.tmppath, fname)
        # check if we have an intermediate directory (timestamp),
        # as in export_base_dir/<timestamp>; if so, use it as the base directory.
        # see https://www.tensorflow.org/guide/saved_model#perform_the_export
        # we need this check because not all SavedModel exports create a timestamp
        # directory, e.g. keras.save_keras_model() does not, while Estimator.export_saved_model does
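        # for illustration (paths hypothetical), the two layouts handled here:
        #   export_base_dir/1579190983/saved_model.pb   (Estimator export, timestamped)
        #   export_base_dir/saved_model.pb              (keras export, no timestamp)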
        files = glob.glob(os.path.join(export_base_dir, '*'))
        if len(files) == 1:
            export_base_dir = files[0]
        with ZipFile(zipfname, 'w', compression=ZIP_DEFLATED) as zipf:
            for part in glob.glob(os.path.join(export_base_dir, '**'), recursive=True):
                zipf.write(part, os.path.relpath(part, export_base_dir))
        return zipfname

    def _extract_savedmodel(self, packagefname):
        lpath = tempfile.mkdtemp()
        with ZipFile(packagefname) as zipf:
            zipf.extractall(lpath)
        model = TensorflowSavedModelPredictor(lpath)
        rmtree(lpath)
        return model

    def _make_savedmodel(self, obj, serving_input_receiver_fn=None, strip_default_attrs=None):
        # adapted from https://www.tensorflow.org/guide/saved_model#perform_the_export
        export_dir_base = tempfile.mkdtemp()
        obj.export_savedmodel(export_dir_base,
                              serving_input_receiver_fn=serving_input_receiver_fn,
                              strip_default_attrs=strip_default_attrs)
        return export_dir_base

    def predict(self, modelname, Xname, rName=None, pure_python=True, **kwargs):
        """
        Predict from a SavedModel

        Args:
            modelname: the name of the model in the model store
            Xname: the name of the dataset to use as input
            rName: the name of the dataset to store the result in, optional
            pure_python: if True, return the result as pure Python types
            kwargs: passed on to data resolution and result preparation

        Returns:
            the prediction result
        """
        model = self.get_model(modelname)
        X = self._resolve_input_data('predict', Xname, 'X', **kwargs)
        result = model.predict(X)

        def ensure_serializable(data):
            # convert tensors to numpy, optionally to pure python
            if isinstance(data, dict):
                for k, v in data.items():
                    data[k] = ensure_serializable(v)
            elif isinstance(data, EagerTensor):
                data = data.numpy()
                if pure_python:
                    data = data.tolist()
            return data

        result = ensure_serializable(result)
        return self._prepare_result('predict', result, rName=rName, pure_python=pure_python, **kwargs)
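
    # a hedged usage sketch via the omegaml model store (model and dataset
    # names are hypothetical); the backend is selected automatically for
    # supported estimators, kept as comments so the module stays importable:
    #
    #   om.models.put(estimator, 'mymodel', serving_input_fn=ServingInput(...))
    #   result = om.runtime.model('mymodel').predict('sample-X').get()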

    def fit(self, modelname, Xname, Yname=None, pure_python=True, tpu_specs=None, **kwargs):
        raise ValueError('cannot fit a saved model')


class ServingInput(object):
    # FIXME this is not working yet
    def __init__(self, model=None, features=None, like=None, shape=None, dtype=None,
                 batchsize=1, from_keras=False, v1_compat=False):
        """
        Helper to create a serving_input_fn

        Uses tf.estimator.export.build_raw_serving_input_receiver_fn to build
        a ServingInputReceiver from the given inputs

        Usage:
            # use an existing ndarray, e.g. training or test data, to specify a single input feature
            ServingInput(features=['x'], like=ndarray)

            # specify the dtype and shape explicitly
            ServingInput(features=['x'], shape=(1, 28, 28))

            # use multiple features
            ServingInput(features={'f1': tf.Feature(...)})

            # for tf.keras models turned estimator, specify from_keras
            # to ensure the input features are renamed correctly.
            ServingInput(features=['x'], like=ndarray, from_keras=True)

        Args:
            model: the model, currently unused
            features: a list of feature names, or a dict of name => tensor spec
            like: an ndarray to derive the input shape and dtype from
            shape: the input shape, including the batch dimension
            dtype: the input dtype
            batchsize: the default batch size, defaults to 1
            from_keras: if True, rename the input feature to '<name>_input'
            v1_compat: if True, use tensorflow.compat.v1 with v2 behavior disabled
        """
        self.model = model
        self.features = features or ['X']
        self.like = like
        self.shape = shape
        self.dtype = dtype
        self.batchsize = batchsize
        self.from_keras = from_keras
        self.v1_compat = v1_compat

    def build(self):
        if isinstance(self.features, dict):
            input_fn = self.from_features()
        elif isinstance(self.like, np.ndarray):
            shape = tuple((self.batchsize, *self.like.shape[1:]))  # assume (rows, *cols)
            input_fn = self.from_ndarray(shape, self.like.dtype)
        elif isinstance(self.shape, (list, tuple, np.ndarray)):
            input_fn = self.from_ndarray(self.shape, self.dtype)
        else:
            raise ValueError('cannot build a serving_input_fn; specify features, like, or shape')
        return input_fn

    def __call__(self):
        input_fn = self.build()
        return input_fn()

    @property
    def tf(self):
        if self.v1_compat:
            # https://www.tensorflow.org/guide/migrate
            import tensorflow.compat.v1 as tf
            tf.disable_v2_behavior()
        else:
            import tensorflow as tf
        return tf

    def from_features(self):
        tf = self.tf
        input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
            self.features,
            default_batch_size=self.batchsize
        )
        return input_fn

    def from_ndarray(self, shape, dtype):
        tf = self.tf
        if self.from_keras:
            input_layer_name = '{}_input'.format(self.features[0])
        else:
            input_layer_name = self.features[0]
        if self.v1_compat:
            features = {
                input_layer_name: tf.placeholder(dtype=dtype, shape=shape)
            }
        else:
            features = {
                input_layer_name: tf.TensorSpec(shape=shape, dtype=dtype)
            }
        input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn(
            features,
            default_batch_size=None
        )
        return input_fn

    def from_dataframe(self, columns, input_layer_name='X',
                       batch_size=1, dtype=np.float32):
        def serving_input_fn():
            # note: tf.placeholder requires TF1 graph-mode semantics,
            # hence use self.tf so v1_compat is honored
            tf = self.tf
            ndim = len(columns)
            X_name = '{}_input'.format(input_layer_name)
            placeholder = tf.placeholder(dtype=dtype,
                                         shape=(batch_size, ndim),
                                         name=X_name)
            receiver_tensors = {X_name: placeholder}
            features = {X_name: placeholder}
            return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

        return serving_input_fn
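
# a hedged usage sketch (the estimator and training data are hypothetical):
# build a serving input from a training ndarray and export an Estimator with
# it; ServingInput is callable, so it can be passed directly:
#
#   serving_fn = ServingInput(features=['x'], like=X_train, v1_compat=True)
#   estimator.export_savedmodel('/tmp/export', serving_input_receiver_fn=serving_fn)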