Source code for omegaml.backends.virtualobj

  1import builtins
  2import dill
  3import sys
  4import types
  5import warnings
  6
  7from omegaml.backends.basedata import BaseDataBackend
  8from omegaml.util import tryOr
  9
 10
class VirtualObjectBackend(BaseDataBackend):
    """
    Support arbitrary functions as object handlers

    Virtual object functions can be any callable that provides a truthy
    ``_omega_virtual`` attribute (as set by the ``@virtualobj`` decorator, or
    inherited from VirtualObjectHandler). The callable must support the
    following signature::

        @virtualobj
        def virtualobjfn(data=None, method='get|put|drop',
                         meta=None, store=None, **kwargs):
            ...
            return data

    Note that there is a distinction between storing the function as a virtual object,
    and passing data in or getting data out of the store. It is the responsibility
    of the function to implement the appropriate code for get, put, drop, as well as
    to keep track of the data it actually stores.

    As a convenience virtual object handlers can be implemented as a subclass of
    VirtualObjectHandler

    Usage::

        1) as a virtual data handler

            # create the 'foo' virtual object
            om.datasets.put(virtualobjfn, 'foo')

            # get data from the virtualobj
            om.datasets.get('foo')
            => will call virtualobjfn(method='get')

            # put data into the virtualobj
            om.datasets.put(data, 'foo')
            => will call virtualobjfn(data=data, method='put')

            # drop the virtualfn
            om.datasets.drop('foo')
            => will call virtualobjfn(method='drop') and then
               drop the virtual object completely from the storage

        2) as a virtual model

            # create the mymodel model as a virtualobj
            om.models.put(virtualobjfn, 'mymodel')

            # run the model's predict() function
            om.runtime.model('mymodel').predict(X)
            => will call virtualobjfn(method='predict')

        3) as a virtual script

            # create the myscript script as a virtualobj
            om.models.put(virtualobjfn, 'myscript')

            # run the script
            om.runtime.script('myscript').run()
            => will call virtualobjfn(method='run')

    WARNING:

        Virtual objects are executed in the address space of the client or
        runtime context. Make sure that the source of the code is trustworthy.
        Note that this is different from Backends and Mixins as these are
        pro-actively enabled by the administrator of the client or runtime
        context, respectively - virtual objects can be injected by anyone
        who is authorized to write data.
    """
    # TODO split VirtualObjectBackend into VirtualModelBackend and VirtualDataBackend
    # to avoid confusion between the two (currently the same class is used for both)
    KIND = 'virtualobj.dill'
    PROMOTE = 'export'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        """
        Check whether obj can be stored by this backend

        Args:
            obj: the object to store
            name (str): the name under which obj is to be stored
            **kwargs: ignored

        Returns:
            truthy if obj is a callable with a truthy ``_omega_virtual``
            attribute, or a plain Python function stored under a name
            starting with ``tools/``
        """
        is_virtual = callable(obj) and getattr(obj, '_omega_virtual', False)
        # guard the name check so that a non-str name (e.g. None) does not raise
        is_tool = (isinstance(obj, types.FunctionType)
                   and isinstance(name, str)
                   and name.startswith('tools/'))
        return is_virtual or is_tool

    @property
    def _call_handler(self):
        # the model store handles _pre and _post methods in self.perform()
        return self.model_store

    def put(self, obj, name, attributes=None, dill_kwargs=None, as_source=False, **kwargs):
        """
        Store a virtual object handler

        Args:
            obj: the virtual object (a callable, or a VirtualObjectHandler
                class or instance)
            name (str): the name of the object
            attributes (dict): optional attributes stored with the metadata
            dill_kwargs (dict): optional kwargs passed on to dill.dumps
            as_source (bool): if True, functions and classes defined outside
                of __main__ are stored as source code only. If False (default)
                they are recompiled in __main__ and stored as dill bytecode
                with their source code as a fallback. Objects from __main__
                (or dynamic objects) are always stored as dill bytecode.
            **kwargs: ignored

        Returns:
            the metadata object, as returned by ``.save()``
        """
        # TODO add obj signing so that only trustworthy sources can put functions
        # since 0.15.6: only __main__ objects are stored as bytecodes only,
        #   module code is stored along with its source code. This
        #   removes the dependency on opcode parity between client
        #   and server. Source objects are compiled into __main__
        #   within the runtime. This is a tradeoff of compatibility
        #   v.s. execution time.
        data = dilldip.dumps(obj, as_source=as_source, **(dill_kwargs or {}))
        filename = self.model_store.object_store_key(name, '.dill', hashed=True)
        gridfile = self._store_to_file(self.model_store, data, filename)
        return self.model_store._make_metadata(
            name=name,
            prefix=self.model_store.prefix,
            bucket=self.model_store.bucket,
            kind=self.KIND,
            attributes=attributes,
            gridfile=gridfile).save()

    def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
        """
        Load a virtual object handler

        Args:
            name (str): the name of the object
            version, force_python, lazy, **kwargs: accepted for interface
                compatibility, not used

        Returns:
            the virtual object; VirtualObjectHandler classes are returned
            as an instance
        """
        meta = self.model_store.metadata(name)
        # NOTE(review): meta is presumably None for an unknown name, which
        # raises AttributeError here -- confirm whether a clearer error is wanted
        outf = meta.gridfile
        try:
            data = outf.read()
        finally:
            # always release the gridfile, even if reading fails
            outf.close()
        # WARNING: loading re-creates (and may exec) stored code, see class docstring
        obj = dilldip.loads(data)
        return self._ensure_handler_instance(obj)

    def _ensure_handler_instance(self, obj):
        # ensure VirtualObjectHandler classes are transformed to a virtualobj,
        # i.e. instantiate the class; any other object is returned as is
        return obj() if isinstance(obj, type) and issubclass(obj, VirtualObjectHandler) else obj

    def _load_handler(self, name):
        # return the metadata and the callable handler stored under name
        meta = self.model_store.metadata(name)
        handler = self._ensure_handler_instance(self.get(name))
        return meta, handler

    def predict(self, modelname, xName, rName=None, **kwargs):
        """
        Call the handler with method='predict' (makes this work as a model backend too)

        The handler receives data=self.data_store.get(xName) and store=self.model_store.
        """
        meta, handler = self._load_handler(modelname)
        X = self.data_store.get(xName)
        return handler(method='predict', data=X, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def fit(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        Call the handler with method='fit' (makes this work as a model backend too)

        The handler receives data=(X, y), where y is None if no yName is given.
        """
        meta, handler = self._load_handler(modelname)
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='fit', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def score(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        Call the handler with method='score' (makes this work as a model backend too)

        The handler receives data=(X, y), where y is None if no yName is given.
        """
        meta, handler = self._load_handler(modelname)
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='score', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def run(self, scriptname, *args, **kwargs):
        """
        Call the handler with method='run' (run as a script)

        The first positional argument, if any, is passed as data; all
        positional arguments are passed as the args keyword.
        """
        meta, handler = self._load_handler(scriptname)
        data = args[0] if args else None
        kwargs['args'] = args
        # NOTE(review): unlike the model methods this passes self.data_store
        # as store -- confirm this is intended
        return handler(method='run', data=data, meta=meta, store=self.data_store, tracking=self.tracking, **kwargs)

    def reduce(self, modelname, results, rName=None, **kwargs):
        """
        reduce a list of results to a single result

        Use this as the last step in a task canvas

        Args:
            modelname (str): the name of the virtualobj
            results (list): the list of results forwarded by task canvas
            rName (str): the name of the result object
            **kwargs: passed on to the virtualobj handler

        Returns:
            result of the virtualobj handler

        See Also
            om.runtime.mapreduce
        """
        meta, handler = self._load_handler(modelname)
        return handler(method='reduce', data=results, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)
def virtualobj(fn):
    """
    function decorator to create a virtual object handler from any
    callable

    Args:
        fn: the virtual handler function

    Usage:

        .. code::

            @virtualobj
            def myvirtualobj(data=None, method=None, meta=None, store=None, **kwargs):
                ...

    See:
        VirtualObjectBackend for details on virtual object handlers

    Returns:
        fn, with the ``_omega_virtual`` marker attribute set to True
    """
    setattr(fn, '_omega_virtual', True)
    return fn


class VirtualObjectHandler(object):
    """
    Object-oriented API for virtual object functions

    Subclass and implement any of get, put, drop, predict, fit, score,
    run, reduce. Calling an instance dispatches to the method named by the
    ``method`` argument. Methods that are not implemented raise
    NotImplementedError; a method name not in the call map raises KeyError.
    """
    _omega_virtual = True

    def _vobj_call_map(self):
        # maps the method= argument of __call__ to the implementing method;
        # covers every method the VirtualObjectBackend dispatches
        return {
            'drop': self.drop,
            'get': self.get,
            'put': self.put,
            'predict': self.predict,
            'fit': self.fit,
            'score': self.score,
            'run': self.run,
            'reduce': self.reduce,
        }

    def get(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def put(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def drop(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def predict(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def fit(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def score(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def run(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def reduce(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def __call__(self, data=None, method=None, meta=None, store=None, tracking=None, **kwargs):
        MAP = self._vobj_call_map()
        methodfn = MAP[method]
        return methodfn(data=data, meta=meta, store=store, tracking=tracking, **kwargs)


class _DillDip:
    # enhanced dill for functions and classes
    # - warns about bound variables that could cause issues when undilling
    # - checks types and functions are defined in __main__ before dilling (so we store code, not references),
    #   dynamically recompiles in __main__ before dilling if source is available
    # - saves source code for improved python cross-version compatibility
    # - falls back on standard dill for any other object type
    #
    # Rationale:
    # - https://stackoverflow.com/questions/26389981/serialize-a-python-function-with-dependencies
    # - https://github.com/uqfoundation/dill/issues/424#issuecomment-1598887257
    #
    # WARNING: loads() deserializes and may exec() stored code -- only use on trusted data

    # magic id for dipped dill objects
    # -- why 0x1565? 15g of dill dip have 65 kcal
    # -- see https://www.nutritionvalue.org/Dill_dip%2C_regular_12350210_nutritional_value.html
    # -- also 42 is overused
    __calories = 0x1565

    def dumps(self, obj, as_source=False, **dill_kwargs):
        # enhanced flavor of dill that stores source code for cross-version compatibility
        # strategies are tried in order, the first one returning data wins:
        #   1. __main__ or dynamic objects -> plain dill
        #   2. classes and functions       -> dill with source code (see _dill_source)
        #   3. anything else               -> plain dill
        self._check(obj)
        data = (self._dill_main(obj, **dill_kwargs) or
                self._dill_types_or_function(obj, as_source=as_source, **dill_kwargs) or
                self._dill_dill(obj, **dill_kwargs))
        return data

    def loads(self, data):
        # compat: Python 3.8.x < 3.8.2
        # https://github.com/python/cpython/commit/b19f7ecfa3adc6ba1544225317b9473649815b38
        # https://docs.python.org/3.8/whatsnew/changelog.html#python-3-8-2-final
        try:
            obj = self._dynamic_compile(dill.loads(data), module='__main__')
        except ModuleNotFoundError as e:
            # if the functions original module is not known, simulate it
            # this is to deal with functions created outside of __main__
            # see https://stackoverflow.com/q/26193102/890242
            #     https://stackoverflow.com/a/70513630/890242
            # NOTE(review): the simulated module stays in sys.modules, and this
            # fallback does not apply _dynamic_compile, so a dipped payload is
            # returned as its raw dict -- confirm this is intended
            mod = types.ModuleType(e.name, '__dynamic__')
            sys.modules[e.name] = mod  # sys.modules['__main__']
            obj = dill.loads(data)
        return obj

    def _check(self, obj):
        # warn about names the object references but does not define itself
        # (nested globals, closure free variables, referred globals), excluding builtins
        builtin_names = set(dir(builtins))
        freevars = list(dill.detect.nestedglobals(obj))
        freevars += list(dill.detect.freevars(obj).keys())
        freevars += list(dill.detect.referredglobals(obj))
        freevars = [n for n in set(freevars) if n not in builtin_names]
        if freevars:
            warnings.warn(
                f'The {repr(obj)} module references {freevars}, this may lead to errors at runtime; import/declare all variables within method/function scope')

    def _dill_dill(self, obj, **dill_kwargs):
        # fallback to standard dill
        # e.g. class instances cannot be dumped unless they come from __main__
        return dill.dumps(obj, **dill_kwargs)

    def _dill_main(self, obj, **dill_kwargs):
        # dynamic __main__ objects can be dilled directly, there is no source code
        if dill.source.isfrommain(obj) or dill.source.isdynamic(obj):
            return dill.dumps(obj, **dill_kwargs)
        return None

    def _dill_types_or_function(self, obj, as_source=False, **dill_kwargs):
        # classes or functions should be dilled as source, unless they come from __main__
        if isinstance(obj, type) or isinstance(obj, types.FunctionType):
            return self._dill_source(obj, as_source=as_source, **dill_kwargs)
        return None

    def _dill_source(self, obj, as_source=False, **dill_kwargs):
        # include source code along dill
        try:
            source = dill.source.getsource(obj, lstrip=True)
            source_obj = {'__dipped__': self.__calories,
                          'source': ''.join(source),
                          'name': getattr(obj, '__name__'),
                          '__dict__': getattr(obj, '__dict__', {})}
        except Exception:
            # source code could not be retrieved, fall back to plain dill below
            source_obj = {}
        else:
            # check obvious references in source
            if '__main__' in source_obj.get('source', []):
                warnings.warn(f'The {repr(obj)} references __main__, this may lead to unexpected results')
        # getmodule() returns a module object (or None), so compare its name
        module_name = getattr(dill.detect.getmodule(obj), '__name__', None)
        if as_source and source_obj:
            # if source code was requested, transport as source code
            data = dill.dumps(source_obj, **dill_kwargs)
        elif source_obj and module_name != '__main__':
            # we have a source obj, make sure we can dill it and have source to revert from
            # compile to __main__ module to enable full serialization
            warnings.warn(f'The {repr(obj)} is defined outside of __main__, recompiling in __main__.')
            obj = self._dynamic_compile(source_obj, module='__main__')
            source_obj['dill'] = dill.dumps(obj, **dill_kwargs)
            data = dill.dumps(source_obj, **dill_kwargs)
        else:
            # we have no source object, revert to standard dill
            if as_source:
                warnings.warn(f'Cannot save {repr(obj)} as source code, reverting to dill')
            # could not get source code, revert to dill
            data = dill.dumps(obj, **dill_kwargs)
        return data

    def _dynamic_compile(self, obj, module='__main__'):
        # re-create the object from a dipped dict: prefer the stored dill bytecode,
        # otherwise re-compile the source code in module; non-dipped objects
        # are returned unchanged
        if not self.isdipped(obj):
            return obj
        if 'dill' in obj:
            try:
                return dill.loads(obj['dill'])
            except Exception:
                warnings.warn('could not undill, reverting to dynamic compile source code')
        # imported here, only when source code is compiled, so that loading
        # objects that need no compilation does not depend on this module
        from omegaml.backends.genai.models import GenAIModelHandler, virtual_genai
        source, data = obj.get('source'), obj.get('__dict__', {})
        mod = types.ModuleType(module)
        # make the decorators/base classes available to the compiled source
        mod.__dict__.update({'__compiling__': True,
                             'virtualobj': virtualobj,
                             'virtual_genai': virtual_genai,
                             'VirtualObjectHandler': VirtualObjectHandler,
                             'GenAIModelHandler': GenAIModelHandler})
        # NOTE(review): this replaces the existing sys.modules entry (by default
        # the running program's __main__); presumably intended so that dill
        # resolves the recompiled objects in __main__ -- confirm
        sys.modules[module] = mod
        # WARNING: executes stored source code, only use on trusted data
        code = compile(source, '<string>', 'exec')
        exec(code, mod.__dict__)
        compiled = getattr(mod, obj['name'])
        # restore instance data, if any
        try:
            getattr(compiled, '__dict__', {}).update(data)
        except AttributeError:
            # we ignore attribute errors on class types
            if not isinstance(compiled, type):
                warnings.warn(f'could not restore instance data for {compiled}')
        return compiled

    def isdipped(self, data_or_obj):
        # True if data_or_obj (a dict, or dill bytes of a dict) carries the dipped magic id
        obj = tryOr(lambda: dill.loads(data_or_obj), None) if not isinstance(data_or_obj, dict) else data_or_obj
        return isinstance(obj, dict) and obj.get('__dipped__') == self.__calories


dilldip = _DillDip()
# enable recursive tracing of globals
# -- fixes https://github.com/uqfoundation/dill/issues/255
# -- see also https://github.com/uqfoundation/dill/issues/537
# -- the issue only happens when the openai module is loaded (root cause unknown)
dill.settings['recurse'] = True