1import builtins
2import dill
3import sys
4import types
5import warnings
6
7from omegaml.backends.basedata import BaseDataBackend
8from omegaml.util import tryOr
9
10
[docs]
class VirtualObjectBackend(BaseDataBackend):
    """
    Support arbitrary functions as object handlers

    Virtual object functions can be any callable that provides a truthy
    ``_omega_virtual`` attribute (e.g. as set by the ``@virtualobj``
    decorator). The callable must support the following signature::

        @virtualobj
        def virtualobjfn(data=None, method='get|put|drop',
                         meta=None, store=None, **kwargs):
            ...
            return data

    Note that there is a distinction between storing the function as a virtual object,
    and passing data in or getting data out of the store. It is the responsibility
    of the function to implement the appropriate code for get, put, drop, as well as
    to keep track of the data it actually stores.

    As a convenience virtual object handlers can be implemented as a subclass of
    VirtualObjectHandler

    Usage::

        1) as a virtual data handler

            # create the 'foo' virtual object
            om.datasets.put(virtualobjfn, 'foo')

            # get data from the virtualobj
            om.datasets.get('foo')
            => will call virtualobjfn(method='get')

            # put data into the virtualobj
            om.datasets.put(data, 'foo')
            => will call virtualobjfn(data=data, method='put')

            # drop the virtualfn
            om.datasets.drop('foo')
            => will call virtualobjfn(method='drop') and then
               drop the virtual object completely from the storage

        2) as a virtual model

            # create the mymodel model as a virtualobj
            om.models.put(virtualobjfn, 'mymodel')

            # run the model's predict() function
            om.runtime.model('mymodel').predict(X)
            => will call virtualobjfn(method='predict')

        3) as a virtual script

            # create the myscript script as a virtualobj
            om.models.put(virtualobjfn, 'myscript')

            # run the script
            om.runtime.script('myscript').run()
            => will call virtualobjfn(method='run')

    WARNING:

        Virtual objects are executed in the address space of the client or
        runtime context. Make sure that the source of the code is trustworthy.
        Note that this is different from Backends and Mixins as these are
        pro-actively enabled by the administrator of the client or runtime
        context, respectively - virtual objects can be injected by anyone
        who is authorized to write data.
    """
    # TODO split VirtualObjectBackend into VirtualModelBackend and VirtualDataBackend
    #      to avoid confusion between the two (currently the same class is used for both)
    KIND = 'virtualobj.dill'
    PROMOTE = 'export'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        """
        check whether this backend can store obj

        Args:
            obj: the object to be stored
            name (str): the name the object is stored as
            **kwargs: ignored

        Returns:
            a truthy value if obj is a callable marked with ``_omega_virtual``,
            or a plain function stored under a name starting with ``tools/``
        """
        is_virtual = callable(obj) and getattr(obj, '_omega_virtual', False)
        # guard against non-string names so that the check never raises
        is_tool = (isinstance(obj, types.FunctionType)
                   and isinstance(name, str)
                   and name.startswith('tools/'))
        return is_virtual or is_tool

    @property
    def _call_handler(self):
        # the model store handles _pre and _post methods in self.perform()
        return self.model_store

    def put(self, obj, name, attributes=None, dill_kwargs=None, as_source=False, **kwargs):
        """
        serialize obj and store it as a virtual object

        Args:
            obj: the virtual object (function, class or instance)
            name (str): the name of the object
            attributes (dict): attributes stored with the metadata
            dill_kwargs (dict): optional kwargs passed on to dilldip.dumps()
            as_source (bool): passed on to dilldip.dumps(), requests that
                the object is transported as source code
            **kwargs: ignored

        Returns:
            the result of saving the metadata created for the object
        """
        # TODO add obj signing so that only trustworthy sources can put functions
        # since 0.15.6: only __main__ objects are stored as bytecodes,
        #               all module code is stored as source code. This
        #               removes the dependency on opcode parity between client
        #               and server. Source objects are compiled into __main__
        #               within the runtime. This is a tradeoff compatibility
        #               v.s. execution time. Use as_source=False to force
        #               storing bytecodes.
        data = dilldip.dumps(obj, as_source=as_source, **(dill_kwargs or {}))
        filename = self.model_store.object_store_key(name, '.dill', hashed=True)
        gridfile = self._store_to_file(self.model_store, data, filename)
        return self.model_store._make_metadata(
            name=name,
            prefix=self.model_store.prefix,
            bucket=self.model_store.bucket,
            kind=self.KIND,
            attributes=attributes,
            gridfile=gridfile).save()

    def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
        """
        load the virtual object stored as name

        Args:
            name (str): the name of the object
            version, force_python, lazy, **kwargs: accepted for signature
                compatibility, not used by this backend

        Returns:
            the deserialized virtual object; VirtualObjectHandler classes
            are returned as instances
        """
        meta = self.model_store.metadata(name)
        outf = meta.gridfile
        try:
            data = outf.read()
        finally:
            # always release the file handle, even if reading fails
            outf.close()
        obj = dilldip.loads(data)
        return self._ensure_handler_instance(obj)

    def _ensure_handler_instance(self, obj):
        # ensure VirtualObjectHandler classes are transformed to a virtualobj
        return obj() if isinstance(obj, type) and issubclass(obj, VirtualObjectHandler) else obj

    def _resolve_handler(self, name):
        # return (metadata, callable handler) for the virtual object stored as name
        meta = self.model_store.metadata(name)
        handler = self._ensure_handler_instance(self.get(name))
        return meta, handler

    def predict(self, modelname, xName, rName=None, **kwargs):
        """
        call the virtual object with method='predict'

        Args:
            modelname (str): the name of the virtualobj
            xName (str): the name of the dataset passed as data
            rName (str): the name of the result object, passed to the handler
            **kwargs: passed to the handler

        Returns:
            result of the virtualobj handler
        """
        # make this work as a model backend too
        meta, handler = self._resolve_handler(modelname)
        X = self.data_store.get(xName)
        return handler(method='predict', data=X, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def fit(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        call the virtual object with method='fit'

        Args:
            modelname (str): the name of the virtualobj
            xName (str): the name of the X dataset
            yName (str): the name of the y dataset, optional
            rName (str): the name of the result object, passed to the handler
            **kwargs: passed to the handler

        Returns:
            result of the virtualobj handler, which receives data=(X, y)
        """
        # make this work as a model backend too
        meta, handler = self._resolve_handler(modelname)
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='fit', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def score(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        call the virtual object with method='score'

        Args:
            modelname (str): the name of the virtualobj
            xName (str): the name of the X dataset
            yName (str): the name of the y dataset, optional
            rName (str): the name of the result object, passed to the handler
            **kwargs: passed to the handler

        Returns:
            result of the virtualobj handler, which receives data=(X, y)
        """
        # make this work as a model backend too
        meta, handler = self._resolve_handler(modelname)
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='score', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def run(self, scriptname, *args, **kwargs):
        """
        call the virtual object with method='run'

        Args:
            scriptname (str): the name of the virtualobj
            *args: positional arguments, the first is passed as data, all
                are passed as kwargs['args']
            **kwargs: passed to the handler

        Returns:
            result of the virtualobj handler
        """
        # run as a script
        meta, handler = self._resolve_handler(scriptname)
        data = args[0] if args else None
        kwargs['args'] = args
        # note scripts receive the data store, not the model store
        return handler(method='run', data=data, meta=meta, store=self.data_store, tracking=self.tracking, **kwargs)

    def reduce(self, modelname, results, rName=None, **kwargs):
        """
        reduce a list of results to a single result

        Use this as the last step in a task canvas

        Args:
            modelname (str): the name of the virtualobj
            results (list): the list of results forwarded by task canvas
            rName (result): the name of the result object
            **kwargs:

        Returns:
            result of the virtualobj handler

        See Also
            om.runtime.mapreduce
        """
        meta, handler = self._resolve_handler(modelname)
        return handler(method='reduce', data=results, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)
183
184
def virtualobj(fn):
    """
    mark any callable as a virtual object handler

    The callable is tagged in place with a ``_omega_virtual = True``
    attribute and returned as is, it is not wrapped.

    Args:
        fn: the virtual handler function

    Usage:

        .. code::

            @virtualobj
            def myvirtualobj(data=None, method=None, meta=None, store=None, **kwargs):
                ...

    See:
        VirtualObjectBackend for details on virtual object handlers

    Returns:
        fn
    """
    fn._omega_virtual = True
    return fn
209
210
class VirtualObjectHandler(object):
    """
    Object-oriented API for virtual object functions

    Subclasses override the methods they support. An instance is callable
    with the virtual object signature, the ``method`` argument selects
    which instance method is invoked. Methods that are not overridden
    raise NotImplementedError.
    """
    _omega_virtual = True

    def _vobj_call_map(self):
        # maps the method= argument of __call__ to the bound method to call;
        # covers every method name dispatched by VirtualObjectBackend
        return {
            'drop': self.drop,
            'get': self.get,
            'put': self.put,
            'predict': self.predict,
            'fit': self.fit,
            'score': self.score,
            'reduce': self.reduce,
            'run': self.run,
        }

    def get(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def put(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def drop(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def predict(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def fit(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def score(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def reduce(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def run(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def __call__(self, data=None, method=None, meta=None, store=None, tracking=None, **kwargs):
        """
        dispatch a virtual object call to the method named by ``method``

        Args:
            data: the data passed to the method
            method (str): a key of _vobj_call_map()
            meta: the metadata passed to the method
            store: the store passed to the method
            tracking: the tracking object passed to the method
            **kwargs: passed to the method

        Returns:
            the result of the selected method

        Raises:
            KeyError: if method is not a key of _vobj_call_map()
        """
        call_map = self._vobj_call_map()
        methodfn = call_map[method]
        return methodfn(data=data, meta=meta, store=store, tracking=tracking, **kwargs)
245
246
class _DillDip:
    # enhanced dill for functions and classes
    # - warns about bound variables that could cause issues when undilling
    # - checks types and functions are defined in __main__ before dilling (so we dill code, not references),
    #   dynamically recompiles in __main__ before dilling if source is available
    # - saves source code for improved python cross-version compatibility
    # - falls back on standard dill for any other object type
    # Rationale:
    # - https://stackoverflow.com/questions/26389981/serialize-a-python-function-with-dependencies
    # - https://github.com/uqfoundation/dill/issues/424#issuecomment-1598887257

    # magic id for dipped dill objects
    # -- why 0x1565? 15g of dill dip have 65 kcal
    # -- see https://www.nutritionvalue.org/Dill_dip%2C_regular_12350210_nutritional_value.html
    # -- also 42 is overused
    __calories = 0x1565

    def dumps(self, obj, as_source=False, **dill_kwargs):
        """
        serialize obj, including source code where possible

        The first strategy that returns data wins: plain dill for __main__
        or dynamic objects, source-enhanced dill for classes and functions,
        plain dill for anything else.

        Args:
            obj: the object to serialize
            as_source (bool): if True, transport classes and functions as
                source code only, provided the source is available
            **dill_kwargs: passed to dill.dumps()

        Returns:
            the serialized data
        """
        # enhanced flavor of dill that stores source code for cross-version compatibility
        self._check(obj)
        data = (self._dill_main(obj, **dill_kwargs) or
                self._dill_types_or_function(obj, as_source=as_source, **dill_kwargs) or
                self._dill_dill(obj, **dill_kwargs))
        return data

    def loads(self, data):
        """
        deserialize data created by dumps()

        Args:
            data: the serialized data

        Returns:
            the deserialized object, dipped objects are restored via
            _dynamic_compile()

        Raises:
            ModuleNotFoundError: if a module is missing and its name is
                not known, so that it cannot be simulated
        """
        # compat: Python 3.8.x < 3.8.2
        # https://github.com/python/cpython/commit/b19f7ecfa3adc6ba1544225317b9473649815b38
        # https://docs.python.org/3.8/whatsnew/changelog.html#python-3-8-2-final
        try:
            obj = self._dynamic_compile(dill.loads(data), module='__main__')
        except ModuleNotFoundError as e:
            if not e.name:
                # without a module name there is nothing to simulate,
                # types.ModuleType(None) would only mask the actual error
                raise
            # if the functions original module is not known, simulate it
            # this is to deal with functions created outside of __main__
            # see https://stackoverflow.com/q/26193102/890242
            #     https://stackoverflow.com/a/70513630/890242
            mod = types.ModuleType(e.name, '__dynamic__')
            sys.modules[e.name] = mod  # sys.modules['__main__']
            obj = dill.loads(data)
        return obj

    def _check(self, obj):
        # warn about names referenced by obj that are not builtins
        names = set(dill.detect.nestedglobals(obj))
        names.update(dill.detect.freevars(obj).keys())
        names.update(dill.detect.referredglobals(obj))
        # build the lookup once instead of calling dir(builtins) per name
        builtin_names = set(dir(builtins))
        # sorted so that the warning message is deterministic
        freevars = sorted(n for n in names if n not in builtin_names)
        if freevars:
            warnings.warn(
                f'The {repr(obj)} module references {freevars}, this may lead to errors at runtime; import/declare all variables within method/function scope')

    def _dill_dill(self, obj, **dill_kwargs):
        # fallback to standard dill
        # e.g. class instances cannot be dumped unless they come from __main__
        return dill.dumps(obj, **dill_kwargs)

    def _dill_main(self, obj, **dill_kwargs):
        # dynamic __main__ objects can be dilled directly, there is no source code
        if dill.source.isfrommain(obj) or dill.source.isdynamic(obj):
            return dill.dumps(obj, **dill_kwargs)
        return None

    def _dill_types_or_function(self, obj, as_source=False, **dill_kwargs):
        # classes or functions should be dilled as source, unless they come from __main__
        if isinstance(obj, (type, types.FunctionType)):
            return self._dill_source(obj, as_source=as_source, **dill_kwargs)
        return None

    def _dill_source(self, obj, as_source=False, **dill_kwargs):
        """
        serialize a class or function along with its source code

        Args:
            obj: the class or function
            as_source (bool): if True and the source is available, only
                the source object is serialized
            **dill_kwargs: passed to dill.dumps()

        Returns:
            the serialized data, either a dipped source object or, if no
            source is available, the plain dill of obj
        """
        # include source code along dill
        try:
            source = dill.source.getsource(obj, lstrip=True)
            source_obj = {'__dipped__': self.__calories,
                          'source': ''.join(source),
                          'name': getattr(obj, '__name__'),
                          '__dict__': getattr(obj, '__dict__', {})}
        except Exception:
            # best effort: no source available, fall back to standard dill below
            source_obj = {}
        else:
            # check obvious references in source
            if '__main__' in source_obj.get('source', []):
                warnings.warn(f'The {repr(obj)} references __main__, this may lead to unexpected results')
        # dill.detect.getmodule() returns a module object (or None), compare by name
        module_name = getattr(dill.detect.getmodule(obj), '__name__', None)
        if as_source and source_obj:
            # if source code was requested, transport as source code
            data = dill.dumps(source_obj, **dill_kwargs)
        elif source_obj and module_name != '__main__':
            # we have a source obj, make sure we can dill it and have source to revert from
            # compile to __main__ module to enable full serialization
            warnings.warn(f'The {repr(obj)} is defined outside of __main__, recompiling in __main__.')
            obj = self._dynamic_compile(source_obj, module='__main__')
            source_obj['dill'] = dill.dumps(obj, **dill_kwargs)
            data = dill.dumps(source_obj, **dill_kwargs)
        else:
            # we have no source object, revert to standard dill
            if as_source:
                warnings.warn(f'Cannot save {repr(obj)} as source code, reverting to dill')
            # could not get source code, revert to dill
            data = dill.dumps(obj, **dill_kwargs)
        return data

    def _dynamic_compile(self, obj, module='__main__'):
        """
        restore the object represented by a dipped source object

        Args:
            obj: a dipped source object (dict), any other object is
                returned unchanged
            module (str): the name of the module to compile the source in,
                note the module registered in sys.modules under this name
                is replaced by a new module

        Returns:
            the restored object
        """
        from omegaml.backends.genai.models import GenAIModelHandler, virtual_genai
        if not self.isdipped(obj):
            return obj
        if 'dill' in obj:
            # prefer the dilled object, the source code is the fallback
            try:
                return dill.loads(obj['dill'])
            except Exception:
                warnings.warn('could not undill, reverting to dynamic compile source code')
        # re-compile source obj in __main__
        source, data = obj.get('source'), obj.get('__dict__', {})
        mod = types.ModuleType(module)
        mod.__dict__.update({'__compiling__': True,
                             'virtualobj': virtualobj,
                             'virtual_genai': virtual_genai,
                             'VirtualObjectHandler': VirtualObjectHandler,
                             'GenAIModelHandler': GenAIModelHandler})
        sys.modules[module] = mod
        code = compile(source, '<string>', 'exec')
        # SECURITY: this executes the stored source code, see the warning on
        # trustworthy sources in VirtualObjectBackend
        exec(code, mod.__dict__)
        compiled = getattr(mod, obj['name'])
        # restore instance data, if any
        try:
            getattr(compiled, '__dict__', {}).update(data)
        except AttributeError:
            # we ignore attribute errors on class types
            if not isinstance(compiled, type):
                warnings.warn(f'could not restore instance data for {compiled}')
        return compiled

    def isdipped(self, data_or_obj):
        """
        check whether data_or_obj is a dipped source object

        Args:
            data_or_obj: a dict, or serialized data that is loaded by dill
                in order to check it

        Returns:
            True if the object is a dict that carries the magic id
        """
        obj = tryOr(lambda: dill.loads(data_or_obj), None) if not isinstance(data_or_obj, dict) else data_or_obj
        return isinstance(obj, dict) and obj.get('__dipped__') == self.__calories
384
385
# module-level serializer instance, used by VirtualObjectBackend.put() and .get()
dilldip = _DillDip()
# enable recursive tracing of globals
# -- fixes https://github.com/uqfoundation/dill/issues/255
# -- see also https://github.com/uqfoundation/dill/issues/537
# -- the issue only happens when the openai module is loaded (root cause unknown)
# NOTE(review): this changes dill's settings when this module is imported, so it
#               presumably applies to all dill calls in the process, not only to
#               dilldip - confirm against the dill documentation
dill.settings['recurse'] = True