1import builtins
2import sys
3import types
4import warnings
5
6import dill
7
8from omegaml.backends.basedata import BaseDataBackend
9from omegaml.util import tryOr
10
11
[docs]
class VirtualObjectBackend(BaseDataBackend):
    """
    Support arbitrary functions as object handlers

    Virtual object functions can be any callable that provides a truthy
    ``_omega_virtual`` attribute (the ``@virtualobj`` decorator sets it).
    The callable must support the following signature::

        @virtualobj
        def virtualobjfn(data=None, method='get|put|drop',
                         meta=None, store=None, **kwargs):
            ...
            return data

    Note that there is a distinction between storing the function as a virtual object,
    and passing data in or getting data out of the store. It is the responsibility
    of the function to implement the appropriate code for get, put, drop, as well as
    to keep track of the data it actually stores.

    As a convenience virtual object handlers can be implemented as a subclass of
    VirtualObjectHandler

    Usage::

        1) as a virtual data handler

            # create the 'foo' virtual object
            om.datasets.put(virtualobjfn, 'foo')

            # get data from the virtualobj
            om.datasets.get('foo')
            => will call virtualobjfn(method='get')

            # put data into the virtualobj
            om.datasets.put(data, 'foo')
            => will call virtualobjfn(data=data, method='put')

            # drop the virtualfn
            om.datasets.drop('foo')
            => will call virtualobjfn(method='drop') and then
               drop the virtual object completely from the storage

        2) as a virtual model

            # create the mymodel model as a virtualobj
            om.models.put(virtualobjfn, 'mymodel')

            # run the model's predict() function
            om.runtime.model('mymodel').predict(X)
            => will call virtualobjfn(method='predict')

        3) as a virtual script

            # create the myscript script as a virtualobj
            om.models.put(virtualobjfn, 'myscript')

            # run the script
            om.runtime.script('myscript').run()
            => will call virtualobjfn(method='run')

    WARNING:

        Virtual objects are executed in the address space of the client or
        runtime context. Make sure that the source of the code is trustworthy.
        Note that this is different from Backends and Mixins as these are
        pro-actively enabled by the administrator of the client or runtime
        context, respectively - virtual objects can be injected by anyone
        who is authorized to write data.
    """
    # TODO split VirtualObjectBackend into VirtualModelBackend and VirtualDataBackend
    #      to avoid confusion between the two (currently the same class is used for both)
    KIND = 'virtualobj.dill'
    PROMOTE = 'export'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        """
        Check whether this backend can store obj under name

        Args:
            obj: the object to be stored
            name (str): the name the object is stored as
            **kwargs: ignored

        Returns:
            truthy if obj is a callable flagged with ``_omega_virtual``, or
            if obj is a plain function and name starts with ``tools/``
        """
        is_virtual = callable(obj) and getattr(obj, '_omega_virtual', False)
        is_tool = isinstance(obj, types.FunctionType) and name.startswith('tools/')
        return is_virtual or is_tool

    @property
    def _call_handler(self):
        # the model store handles _pre and _post methods in self.perform()
        return self.model_store

    def put(self, obj, name, attributes=None, dill_kwargs=None, as_source=False, **kwargs):
        """
        Serialize the virtual object and store it along with its metadata

        Args:
            obj: the virtual object (function, class or instance)
            name (str): the name of the object
            attributes (dict): optional attributes stored with the metadata
            dill_kwargs (dict): optional kwargs passed on to dill
            as_source (bool): if True store source code only, defaults to False
            **kwargs: ignored

        Returns:
            the result of saving the metadata created for the object
        """
        # TODO add obj signing so that only trustworthy sources can put functions
        # since 0.15.6: only __main__ objects are stored as bytecodes,
        #               all module code is stored as source code. This
        #               removes the dependency on opcode parity between client
        #               and server. Source objects are compiled into __main__
        #               within the runtime. This is a tradeoff compatibility
        #               v.s. execution time. Use as_source=False to force
        #               storing bytecodes.
        data = dilldip.dumps(obj, as_source=as_source, **(dill_kwargs or {}))
        filename = self.model_store.object_store_key(name, '.dill', hashed=True)
        gridfile = self._store_to_file(self.model_store, data, filename)
        return self.model_store._make_metadata(
            name=name,
            prefix=self.model_store.prefix,
            bucket=self.model_store.bucket,
            kind=self.KIND,
            attributes=attributes,
            gridfile=gridfile).save()

    def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
        """
        Load the virtual object stored as name

        Args:
            name (str): the name of the object
            version, force_python, lazy, **kwargs: accepted for signature
                compatibility, not used by this backend

        Returns:
            the deserialized virtual object; VirtualObjectHandler classes
            are returned as an instance
        """
        meta = self.model_store.metadata(name)
        outf = meta.gridfile
        try:
            data = outf.read()
        finally:
            # release the file handle even if reading fails
            outf.close()
        obj = dilldip.loads(data)
        return self._ensure_handler_instance(obj)

    def _ensure_handler_instance(self, obj):
        # ensure VirtualObjectHandler classes are transformed to a virtualobj
        return obj() if isinstance(obj, type) and issubclass(obj, VirtualObjectHandler) else obj

    def predict(self, modelname, xName, rName=None, **kwargs):
        """
        Call the virtual object with method='predict'

        Args:
            modelname (str): the name of the virtualobj
            xName: the name of the input data, resolved via the data store
            rName: passed on to the handler as rName
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # make this work as a model backend too
        meta = self.model_store.metadata(modelname)
        handler = self._ensure_handler_instance(self.get(modelname))
        X = self.data_store.get(xName)
        return handler(method='predict', data=X, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def fit(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        Call the virtual object with method='fit' and data=(X, y)

        Args:
            modelname (str): the name of the virtualobj
            xName: the name of the X data, resolved via the data store
            yName: optional name of the y data, y is None if not given
            rName: passed on to the handler as rName
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # make this work as a model backend too
        meta = self.model_store.metadata(modelname)
        handler = self._ensure_handler_instance(self.get(modelname))
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='fit', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def score(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        Call the virtual object with method='score' and data=(X, y)

        Args:
            modelname (str): the name of the virtualobj
            xName: the name of the X data, resolved via the data store
            yName: optional name of the y data, y is None if not given
            rName: passed on to the handler as rName
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # make this work as a model backend too
        meta = self.model_store.metadata(modelname)
        handler = self._ensure_handler_instance(self.get(modelname))
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='score', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def run(self, scriptname, *args, **kwargs):
        """
        Call the virtual object with method='run'

        The first positional argument, if any, is passed as data; all
        positional arguments are passed as kwargs['args']. Unlike the model
        methods, the handler receives the data store as its store.

        Args:
            scriptname (str): the name of the virtualobj
            *args: positional arguments for the script
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # run as a script
        meta = self.model_store.metadata(scriptname)
        handler = self._ensure_handler_instance(self.get(scriptname))
        data = args[0] if args else None
        kwargs['args'] = args
        return handler(method='run', data=data, meta=meta, store=self.data_store, tracking=self.tracking, **kwargs)

    def reduce(self, modelname, results, rName=None, **kwargs):
        """
        reduce a list of results to a single result

        Use this as the last step in a task canvas

        Args:
            modelname (str): the name of the virtualobj
            results (list): the list of results forwarded by task canvas
            rName (result): the name of the result object
            **kwargs:

        Returns:
            result of the virtualobj handler

        See Also
            om.runtime.mapreduce
        """
        meta = self.model_store.metadata(modelname)
        handler = self._ensure_handler_instance(self.get(modelname))
        return handler(method='reduce', data=results, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)
184
185
def virtualobj(fn):
    """
    mark any callable as a virtual object handler

    The callable is flagged by setting its ``_omega_virtual`` attribute
    to True. The callable itself is returned, it is not wrapped.

    Args:
        fn: the virtual handler function

    Usage:

        .. code::

            @virtualobj
            def myvirtualobj(data=None, method=None, meta=None, store=None, **kwargs):
                ...

    See:
        VirtualObjectBackend for details on virtual object handlers

    Returns:
        fn
    """
    fn._omega_virtual = True
    return fn
210
211
class VirtualObjectHandler(object):
    """
    Object-oriented API for virtual object functions

    Subclasses override the methods they support. An instance is callable
    with the virtual object signature and dispatches on the ``method``
    argument; methods that are not overridden raise NotImplementedError.
    """
    _omega_virtual = True

    def _vobj_call_map(self):
        # maps the method name passed to __call__ to the bound method;
        # includes every method name VirtualObjectBackend calls handlers with
        return {
            'drop': self.drop,
            'get': self.get,
            'put': self.put,
            'predict': self.predict,
            'fit': self.fit,
            'score': self.score,
            'reduce': self.reduce,
            'run': self.run,
        }

    def load(self):
        # hook called before every dispatch, override to import dependencies
        pass

    def get(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def put(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def drop(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def predict(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def fit(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def score(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def reduce(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def run(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def __call__(self, data=None, method=None, meta=None, store=None, tracking=None, **kwargs):
        """
        Dispatch a virtual object call to the method named by ``method``

        Args:
            data: the data passed to the method
            method (str): a key of _vobj_call_map()
            meta: the metadata passed to the method
            store: the store passed to the method
            tracking: passed to the method as the tracking kwarg
            **kwargs: passed on to the method

        Returns:
            the result of the dispatched method

        Raises:
            KeyError: if method is not a key of _vobj_call_map()
            NotImplementedError: if the method is not overridden
        """
        call_map = self._vobj_call_map()
        if method not in call_map:
            raise KeyError(f'{type(self).__name__} does not support method {method!r}, '
                           f'expected one of {sorted(call_map)}')
        methodfn = call_map[method]
        self.load()  # ensure loading of imports and other dependencies
        return methodfn(data=data, meta=meta, store=store, tracking=tracking, **kwargs)
250
251
class _DillDip:
    # enhanced dill for functions and classes
    # - warns about bound variables that could cause issues when undilling
    # - checks types and functions are defined in __main__ before dilling (so we will code, not references),
    #   dynamically recompiles in __main__ before dilling if source is available
    # - saves source code for improved python cross-version compatibility
    # - falls back on standard dill for any other object type
    # Rationale:
    # - https://stackoverflow.com/questions/26389981/serialize-a-python-function-with-dependencies
    # - https://github.com/uqfoundation/dill/issues/424#issuecomment-1598887257

    # magic id for dipped dill objects
    # -- why 0x1565? 15g of dill dip have 65 kcal
    # -- see https://www.nutritionvalue.org/Dill_dip%2C_regular_12350210_nutritional_value.html
    # -- also 42 is overused
    __calories = 0x1565

    def dumps(self, obj, as_source=False, **dill_kwargs):
        """
        Serialize obj, trying each strategy in turn

        The first strategy that returns data wins: objects from __main__ or
        dynamic objects are dilled directly (as_source is not considered for
        these), classes and functions are stored with their source code, any
        other object falls back to standard dill.

        Args:
            obj: the object to serialize
            as_source (bool): if True store classes and functions as source only
            **dill_kwargs: passed on to dill.dumps

        Returns:
            the serialized data
        """
        # enhanced flavor of dill that stores source code for cross-version compatibility
        self._check(obj)
        data = (self._dill_main(obj, **dill_kwargs) or
                self._dill_types_or_function(obj, as_source=as_source, **dill_kwargs) or
                self._dill_dill(obj, **dill_kwargs))
        return data

    def loads(self, data):
        """
        Deserialize data created by dumps()

        Dipped objects are restored by _dynamic_compile, any other object
        is returned as loaded by dill.

        Args:
            data: the serialized data

        Returns:
            the deserialized object

        WARNING:
            this runs dill.loads, and for dipped objects exec, on data. Only
            use on data from a trustworthy source.
        """
        # compat: Python 3.8.x < 3.8.2
        # https://github.com/python/cpython/commit/b19f7ecfa3adc6ba1544225317b9473649815b38
        # https://docs.python.org/3.8/whatsnew/changelog.html#python-3-8-2-final
        try:
            obj = self._dynamic_compile(dill.loads(data), module='__main__')
        except ModuleNotFoundError as e:
            if not e.name:
                # no module name to simulate, nothing we can do
                raise
            # if the functions original module is not known, simulate it
            # this is to deal with functions created outside of __main__
            # see https://stackoverflow.com/q/26193102/890242
            #     https://stackoverflow.com/a/70513630/890242
            mod = types.ModuleType(e.name, '__dynamic__')
            sys.modules[e.name] = mod  # sys.modules['__main__']
            # retry the same way as the first attempt, so that dipped objects
            # are restored instead of being returned as a raw dict
            obj = self._dynamic_compile(dill.loads(data), module='__main__')
        return obj

    def _check(self, obj):
        # warn about names referenced by obj that are not builtins
        freevars = dill.detect.nestedglobals(obj)
        freevars += list(dill.detect.freevars(obj).keys())
        freevars += list(dill.detect.referredglobals(obj))
        builtin_names = set(dir(builtins))
        freevars = [n for n in set(freevars) if n not in builtin_names]
        if freevars:
            warnings.warn(
                f'The {repr(obj)} module references {freevars}, this may lead to errors at runtime; import/declare all variables within method/function scope')

    def _dill_dill(self, obj, **dill_kwargs):
        # fallback to standard dill
        # e.g. class instances cannot be dumped unless they come from __main__
        return dill.dumps(obj, **dill_kwargs)

    def _dill_main(self, obj, **dill_kwargs):
        # dynamic __main__ objects can be dilled directly, there is no source code
        if dill.source.isfrommain(obj) or dill.source.isdynamic(obj):
            return dill.dumps(obj, **dill_kwargs)
        return None

    def _dill_types_or_function(self, obj, as_source=False, **dill_kwargs):
        # classes or functions should be dilled as source, unless they come from __main__
        if isinstance(obj, (type, types.FunctionType)):
            return self._dill_source(obj, as_source=as_source, **dill_kwargs)
        return None

    def _dill_source(self, obj, as_source=False, **dill_kwargs):
        """
        Serialize a class or function along with its source code

        Args:
            obj: the class or function
            as_source (bool): if True store the source code only
            **dill_kwargs: passed on to dill.dumps

        Returns:
            the serialized data, either a dilled source object (dict) or,
            if no source code is available, the dilled obj
        """
        # include source code along dill
        try:
            source = dill.source.getsource(obj, lstrip=True)
            source_obj = {'__dipped__': self.__calories,
                          'source': ''.join(source),
                          'name': getattr(obj, '__name__'),
                          '__dict__': getattr(obj, '__dict__', {})}
        except Exception:
            # best effort, without source code we revert to standard dill below
            source_obj = {}
        else:
            # check obvious references in source
            if '__main__' in source_obj.get('source', []):
                warnings.warn(f'The {repr(obj)} references __main__, this may lead to unexpected results')
        # dill.detect.getmodule returns a module object (or None), compare by name
        module_name = getattr(dill.detect.getmodule(obj), '__name__', None)
        if as_source and source_obj:
            # if source code was requested, transport as source code
            data = dill.dumps(source_obj, **dill_kwargs)
        elif source_obj and module_name != '__main__':
            # we have a source obj, make sure we can dill it and have source to revert from
            # compile to __main__ module to enable full serialization
            warnings.warn(f'The {repr(obj)} is defined outside of __main__, recompiling in __main__.')
            obj = self._dynamic_compile(source_obj, module='__main__')
            source_obj['dill'] = dill.dumps(obj, **dill_kwargs)
            data = dill.dumps(source_obj, **dill_kwargs)
        else:
            # we have no source object, revert to standard dill
            if as_source:
                warnings.warn(f'Cannot save {repr(obj)} as source code, reverting to dill')
            # could not get source code, revert to dill
            data = dill.dumps(obj, **dill_kwargs)
        return data

    def _dynamic_compile(self, obj, module='__main__'):
        """
        Restore the object of a dipped source object

        If obj is not a dipped source object it is returned as is. Otherwise
        the dilled object is loaded if there is one, else the source code is
        executed in a new module that replaces sys.modules[module].

        Args:
            obj: the object or dipped source object (dict)
            module (str): the name of the module to compile into

        Returns:
            the restored object
        """
        from omegaml.backends.genai.models import GenAIModelHandler, virtual_genai
        # re-compile source obj in __main__
        if self.isdipped(obj):
            if 'dill' in obj:
                try:
                    obj = dill.loads(obj['dill'])
                except Exception:
                    warnings.warn('could not undill, reverting to dynamic compile source code')
                else:
                    return obj
            source, data = obj.get('source'), obj.get('__dict__', {})
            mod = types.ModuleType(module)
            # provide the names that stored source code commonly references
            mod.__dict__.update({'__compiling__': True,
                                 'virtualobj': virtualobj,
                                 'virtual_genai': virtual_genai,
                                 'VirtualObjectHandler': VirtualObjectHandler,
                                 'GenAIModelHandler': GenAIModelHandler})
            sys.modules[module] = mod
            code = compile(source, '<string>', 'exec')
            exec(code, mod.__dict__)
            obj = getattr(mod, obj['name'])
            # restore instance data, if any
            try:
                getattr(obj, '__dict__', {}).update(data)
            except AttributeError:
                # we ignore attribute errors on class types
                if not isinstance(obj, type):
                    warnings.warn(f'could not restore instance data for {obj}')
        return obj

    def isdipped(self, data_or_obj):
        # a dipped object is a dict carrying the magic id, given either as
        # the dict itself or as its dilled data
        obj = tryOr(lambda: dill.loads(data_or_obj), None) if not isinstance(data_or_obj, dict) else data_or_obj
        return isinstance(obj, dict) and obj.get('__dipped__') == self.__calories
390
# module-level instance, used by VirtualObjectBackend.put() and .get()
# to serialize and deserialize virtual objects
dilldip = _DillDip()
# enable recursive tracing of globals
# -- fixes https://github.com/uqfoundation/dill/issues/255
# -- see also https://github.com/uqfoundation/dill/issues/537
# -- the issue only happens when the openai module is loaded (root cause unknown)
# NOTE: this is set when this module is imported and changes dill's
#       module-level settings, not just the calls made by dilldip
dill.settings['recurse'] = True