1import builtins
2import dill
3import sys
4import types
5import warnings
6
7from omegaml.backends.basedata import BaseDataBackend
8from omegaml.util import tryOr
9
10
[docs]
class VirtualObjectBackend(BaseDataBackend):
    """
    Support arbitrary functions as object handlers

    Virtual object functions can be any callable that provides a truthy
    ``_omega_virtual`` attribute (as set by the ``@virtualobj`` decorator, or
    inherited from ``VirtualObjectHandler``). The callable must support the
    following signature::

        @virtualobj
        def virtualobjfn(data=None, method='get|put|drop',
                         meta=None, store=None, **kwargs):
            ...
            return data

    Note that there is a distinction between storing the function as a virtual object,
    and passing data in or getting data out of the store. It is the responsibility
    of the function to implement the appropriate code for get, put, drop, as well as
    to keep track of the data it actually stores.

    As a convenience virtual object handlers can be implemented as a subclass of
    VirtualObjectHandler

    Usage::

        1) as a virtual data handler

            # create the 'foo' virtual object
            om.datasets.put(virtualobjfn, 'foo')

            # get data from the virtualobj
            om.datasets.get('foo')
            => will call virtualobjfn(method='get')

            # put data into the virtualobj
            om.datasets.put(data, 'foo')
            => will call virtualobjfn(data=data, method='put')

            # drop the virtualfn
            om.datasets.drop('foo')
            => will call virtualobjfn(method='drop') and then
               drop the virtual object completely from the storage

        2) as a virtual model

            # create the mymodel model as a virtualobj
            om.models.put(virtualobjfn, 'mymodel')

            # run the model's predict() function
            om.runtime.model('mymodel').predict(X)
            => will call virtualobjfn(method='predict')

        3) as a virtual script

            # create the myscript script as a virtualobj
            om.models.put(virtualobjfn, 'myscript')

            # run the script
            om.runtime.script('myscript').run()
            => will call virtualobjfn(method='run')

    WARNING:

        Virtual objects are executed in the address space of the client or
        runtime context. Make sure that the source of the code is trustworthy.
        Note that this is different from Backends and Mixins as these are
        pro-actively enabled by the administrator of the client or runtime
        context, respectively - virtual objects can be injected by anyone
        who is authorized to write data.
    """
    # TODO split VirtualObjectBackend into VirtualModelBackend and VirtualDataBackend
    # to avoid confusion between the two (currently the same class is used for both)
    KIND = 'virtualobj.dill'
    PROMOTE = 'export'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        """
        Check whether this backend can store obj

        Args:
            obj: the object to store
            name (str): the name the object is stored under

        Returns:
            truthy if obj is a callable with a truthy ``_omega_virtual``
            attribute, or a plain function stored under the ``tools/`` prefix
        """
        is_virtual = callable(obj) and getattr(obj, '_omega_virtual', False)
        # guard against non-str names instead of failing on .startswith()
        is_tool = (isinstance(obj, types.FunctionType)
                   and isinstance(name, str)
                   and name.startswith('tools/'))
        return is_virtual or is_tool

    @property
    def _call_handler(self):
        # the model store handles _pre and _post methods in self.perform()
        return self.model_store

    def put(self, obj, name, attributes=None, dill_kwargs=None, as_source=False, **kwargs):
        """
        Store a virtual object handler

        Args:
            obj: the virtual object (function, class or instance)
            name (str): the name of the object
            attributes (dict): optional metadata attributes
            dill_kwargs (dict): optional kwargs passed to dill.dumps
            as_source (bool): if True, functions and classes defined outside of
                __main__ are stored as source code only (if the source is available)
            **kwargs: ignored

        Returns:
            the saved Metadata
        """
        # TODO add obj signing so that only trustworthy sources can put functions
        # since 0.15.6: objects from __main__ (or dynamic objects) are dilled
        #   directly; functions and classes from other modules are stored along
        #   with their source code, which is compiled into __main__ within the
        #   runtime. This removes the dependency on opcode parity between client
        #   and server. With as_source=False (default) the dill of the object,
        #   recompiled in __main__, is stored in addition to the source code;
        #   as_source=True stores the source code only. This is a tradeoff of
        #   compatibility v.s. execution time.
        data = dilldip.dumps(obj, as_source=as_source, **(dill_kwargs or {}))
        filename = self.model_store.object_store_key(name, '.dill', hashed=True)
        gridfile = self._store_to_file(self.model_store, data, filename)
        return self.model_store._make_metadata(
            name=name,
            prefix=self.model_store.prefix,
            bucket=self.model_store.bucket,
            kind=self.KIND,
            attributes=attributes,
            gridfile=gridfile).save()

    def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
        """
        Load a virtual object handler

        Args:
            name (str): the name of the object
            version, force_python, lazy: accepted for interface compatibility,
                not used by this backend
            **kwargs: ignored

        Returns:
            the deserialized virtual object; VirtualObjectHandler subclasses
            are returned as an instance
        """
        meta = self.model_store.metadata(name)
        outf = meta.gridfile
        try:
            data = outf.read()
        finally:
            # release the file even if reading fails
            outf.close()
        obj = dilldip.loads(data)
        return self._ensure_handler_instance(obj)

    def _ensure_handler_instance(self, obj):
        # ensure VirtualObjectHandler classes are transformed to a virtualobj
        return obj() if isinstance(obj, type) and issubclass(obj, VirtualObjectHandler) else obj

    def _vobj_meta_and_handler(self, name):
        # load the metadata and the callable handler instance for name
        meta = self.model_store.metadata(name)
        handler = self._ensure_handler_instance(self.get(name))
        return meta, handler

    def predict(self, modelname, xName, rName=None, **kwargs):
        # make this work as a model backend too
        meta, handler = self._vobj_meta_and_handler(modelname)
        X = self.data_store.get(xName)
        return handler(method='predict', data=X, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def fit(self, modelname, xName, yName=None, rName=None, **kwargs):
        # make this work as a model backend too; data is passed as (X, y)
        meta, handler = self._vobj_meta_and_handler(modelname)
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='fit', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def score(self, modelname, xName, yName=None, rName=None, **kwargs):
        # make this work as a model backend too; data is passed as (X, y)
        meta, handler = self._vobj_meta_and_handler(modelname)
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='score', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def run(self, scriptname, *args, **kwargs):
        # run as a script; the first positional arg (if any) is passed as data,
        # all positional args are passed as kwargs['args']
        meta, handler = self._vobj_meta_and_handler(scriptname)
        data = args[0] if args else None
        kwargs['args'] = args
        # note: unlike the model methods, scripts receive the data store
        return handler(method='run', data=data, meta=meta, store=self.data_store, tracking=self.tracking, **kwargs)

    def reduce(self, modelname, results, rName=None, **kwargs):
        """
        reduce a list of results to a single result

        Use this as the last step in a task canvas

        Args:
            modelname (str): the name of the virtualobj
            results (list): the list of results forwarded by task canvas
            rName (str): the name of the result object
            **kwargs: passed on to the virtualobj handler

        Returns:
            result of the virtualobj handler

        See Also
            om.runtime.mapreduce
        """
        meta, handler = self._vobj_meta_and_handler(modelname)
        return handler(method='reduce', data=results, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)
183
184
def virtualobj(fn):
    """
    Mark any callable as a virtual object handler

    The callable is returned unchanged except that its ``_omega_virtual``
    attribute is set to True, which is what VirtualObjectBackend.supports()
    checks for.

    Args:
        fn: the virtual handler function

    Usage:

        .. code::

            @virtualobj
            def myvirtualobj(data=None, method=None, meta=None, store=None, **kwargs):
                ...

    See:
        VirtualObjectBackend for details on virtual object handlers

    Returns:
        fn (the same object that was passed in)
    """
    fn._omega_virtual = True
    return fn
209
210
class VirtualObjectHandler(object):
    """
    Object-oriented API for virtual object functions

    Subclass and implement the methods you need. Calling an instance
    dispatches on the ``method`` argument, e.g. ``handler(method='get')``
    calls ``handler.get(...)``. Supported methods are get, put, drop,
    predict, fit, score, reduce and run; any method not implemented by the
    subclass raises NotImplementedError.
    """
    _omega_virtual = True

    def _vobj_call_map(self):
        # map the method name passed to __call__ to the implementing method
        # -- includes all methods VirtualObjectBackend dispatches to
        return {
            'drop': self.drop,
            'get': self.get,
            'put': self.put,
            'predict': self.predict,
            'fit': self.fit,
            'score': self.score,
            'reduce': self.reduce,
            'run': self.run,
        }

    def load(self):
        # hook called before every dispatched method, e.g. to import dependencies
        pass

    def get(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def put(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def drop(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def predict(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def fit(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def score(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def reduce(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def run(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def __call__(self, data=None, method=None, meta=None, store=None, tracking=None, **kwargs):
        """
        Dispatch to the method named by ``method``

        Raises:
            KeyError: if ``method`` is not a supported method name
        """
        call_map = self._vobj_call_map()
        try:
            methodfn = call_map[method]
        except KeyError:
            raise KeyError(f'{type(self).__name__} does not support method {method!r}') from None
        self.load()  # ensure loading of imports and other dependencies
        return methodfn(data=data, meta=meta, store=store, tracking=tracking, **kwargs)
249
250
class _DillDip:
    """
    Enhanced dill for functions and classes

    - warns about bound variables that could cause issues when undilling
    - checks types and functions are defined in __main__ before dilling (so we
      store code, not references), dynamically recompiles in __main__ before
      dilling if source is available
    - saves source code for improved python cross-version compatibility
    - falls back on standard dill for any other object type

    Rationale:
        - https://stackoverflow.com/questions/26389981/serialize-a-python-function-with-dependencies
        - https://github.com/uqfoundation/dill/issues/424#issuecomment-1598887257
    """
    # magic id for dipped dill objects
    # -- why 0x1565? 15g of dill dip have 65 kcal
    # -- see https://www.nutritionvalue.org/Dill_dip%2C_regular_12350210_nutritional_value.html
    # -- also 42 is overused
    __calories = 0x1565

    def dumps(self, obj, as_source=False, **dill_kwargs):
        """
        Serialize obj, storing source code for functions and classes where possible

        Args:
            obj: the object to serialize
            as_source (bool): if True, functions/classes not from __main__ are
                transported as source code only (if the source is available)
            **dill_kwargs: passed on to dill.dumps

        Returns:
            the serialized bytes
        """
        self._check(obj)
        # the first strategy that produces data wins
        data = (self._dill_main(obj, **dill_kwargs) or
                self._dill_types_or_function(obj, as_source=as_source, **dill_kwargs) or
                self._dill_dill(obj, **dill_kwargs))
        return data

    def loads(self, data):
        """
        Deserialize data as created by dumps()

        Dipped source objects are recompiled into the __main__ module.

        Args:
            data (bytes): the serialized object

        Returns:
            the deserialized object

        Raises:
            ModuleNotFoundError: if a required module is missing and its name
                is not known, so it cannot be simulated
        """
        # compat: Python 3.8.x < 3.8.2
        # https://github.com/python/cpython/commit/b19f7ecfa3adc6ba1544225317b9473649815b38
        # https://docs.python.org/3.8/whatsnew/changelog.html#python-3-8-2-final
        try:
            obj = self._dynamic_compile(dill.loads(data), module='__main__')
        except ModuleNotFoundError as e:
            if not e.name:
                # without a module name there is nothing we could simulate
                raise
            # if the functions original module is not known, simulate it
            # this is to deal with functions created outside of __main__
            # see https://stackoverflow.com/q/26193102/890242
            # https://stackoverflow.com/a/70513630/890242
            mod = types.ModuleType(e.name, '__dynamic__')
            sys.modules[e.name] = mod  # sys.modules['__main__']
            obj = dill.loads(data)
        return obj

    def _check(self, obj):
        # warn about referenced globals and free variables that are not
        # builtins, as these may not be resolvable when the object is loaded
        freevars = list(dill.detect.nestedglobals(obj))
        freevars += list(dill.detect.freevars(obj).keys())
        freevars += list(dill.detect.referredglobals(obj))
        builtin_names = set(dir(builtins))
        freevars = [n for n in set(freevars) if n not in builtin_names]
        if freevars:
            warnings.warn(
                f'The {repr(obj)} module references {freevars}, this may lead to errors at runtime; import/declare all variables within method/function scope')

    def _dill_dill(self, obj, **dill_kwargs):
        # fallback to standard dill
        # e.g. class instances cannot be dumped unless they come from __main__
        return dill.dumps(obj, **dill_kwargs)

    def _dill_main(self, obj, **dill_kwargs):
        # dynamic __main__ objects can be dilled directly, there is no source code
        if dill.source.isfrommain(obj) or dill.source.isdynamic(obj):
            return dill.dumps(obj, **dill_kwargs)
        return None

    def _dill_types_or_function(self, obj, as_source=False, **dill_kwargs):
        # classes or functions should be dilled as source, unless they come from __main__
        if isinstance(obj, type) or isinstance(obj, types.FunctionType):
            return self._dill_source(obj, as_source=as_source, **dill_kwargs)
        return None

    def _dill_source(self, obj, as_source=False, **dill_kwargs):
        # include source code along dill
        try:
            source = dill.source.getsource(obj, lstrip=True)
            source_obj = {'__dipped__': self.__calories,
                          'source': ''.join(source),
                          'name': getattr(obj, '__name__'),
                          '__dict__': getattr(obj, '__dict__', {})}
        except Exception:
            # source code or name could not be retrieved
            source_obj = {}
        else:
            # check obvious references in source
            if '__main__' in source_obj['source']:
                warnings.warn(f'The {repr(obj)} references __main__, this may lead to unexpected results')
        if as_source and source_obj:
            # if source code was requested, transport as source code
            data = dill.dumps(source_obj, **dill_kwargs)
        elif source_obj and getattr(dill.detect.getmodule(obj), '__name__', None) != '__main__':
            # getmodule() returns a module object (or None), hence compare its name
            # we have a source obj, make sure we can dill it and have source to revert from
            # compile to __main__ module to enable full serialization
            warnings.warn(f'The {repr(obj)} is defined outside of __main__, recompiling in __main__.')
            obj = self._dynamic_compile(source_obj, module='__main__')
            source_obj['dill'] = dill.dumps(obj, **dill_kwargs)
            data = dill.dumps(source_obj, **dill_kwargs)
        else:
            # we have no source object, revert to standard dill
            if as_source:
                warnings.warn(f'Cannot save {repr(obj)} as source code, reverting to dill')
            # could not get source code, revert to dill
            data = dill.dumps(obj, **dill_kwargs)
        return data

    def _dynamic_compile(self, obj, module='__main__'):
        """
        Re-compile a dipped source object in the given module

        Args:
            obj: the object to compile; anything other than a dipped source
                dict is returned unchanged
            module (str): the name of the module to compile into; a new module
                of this name replaces sys.modules[module]

        Returns:
            the compiled object, or obj unchanged if it is not a dipped source dict
        """
        if not (isinstance(obj, dict) and self.isdipped(obj)):
            return obj
        if 'dill' in obj:
            try:
                return dill.loads(obj['dill'])
            except Exception:
                warnings.warn('could not undill, reverting to dynamic compile source code')
        # only needed when compiling source; imported here, presumably to avoid
        # a circular import at module load -- TODO confirm
        from omegaml.backends.genai.models import GenAIModelHandler, virtual_genai
        source, data = obj.get('source'), obj.get('__dict__', {})
        mod = types.ModuleType(module)
        # make the virtual object decorators and base classes available to the source
        mod.__dict__.update({'__compiling__': True,
                             'virtualobj': virtualobj,
                             'virtual_genai': virtual_genai,
                             'VirtualObjectHandler': VirtualObjectHandler,
                             'GenAIModelHandler': GenAIModelHandler})
        sys.modules[module] = mod
        # NOTE: executes stored source code -- only load objects from trusted sources
        code = compile(source, '<string>', 'exec')
        exec(code, mod.__dict__)
        compiled = getattr(mod, obj['name'])
        # restore instance data, if any
        try:
            getattr(compiled, '__dict__', {}).update(data)
        except AttributeError:
            # we ignore attribute errors on class types (their __dict__ is read-only)
            if not isinstance(compiled, type):
                warnings.warn(f'could not restore instance data for {compiled}')
        return compiled

    def isdipped(self, data_or_obj):
        """
        Check whether data_or_obj is, or deserializes to, a dipped source object

        Args:
            data_or_obj: a dict, or serialized bytes

        Returns:
            True if it is a dict marked with the dipped magic id
        """
        if isinstance(data_or_obj, dict):
            obj = data_or_obj
        else:
            obj = tryOr(lambda: dill.loads(data_or_obj), None)
        return isinstance(obj, dict) and obj.get('__dipped__') == self.__calories
388
389
# shared serializer instance, used by VirtualObjectBackend.put() and .get()
dilldip = _DillDip()
# enable recursive tracing of globals
# -- fixes https://github.com/uqfoundation/dill/issues/255
# -- see also https://github.com/uqfoundation/dill/issues/537
# -- the issue only happens when the openai module is loaded (root cause unknown)
dill.settings.update(recurse=True)