Source code for omegaml.backends.virtualobj

  1import builtins
  2import sys
  3import types
  4import warnings
  5
  6import dill
  7
  8from omegaml.backends.basedata import BaseDataBackend
  9from omegaml.util import tryOr
 10
 11
class VirtualObjectBackend(BaseDataBackend):
    """
    Support arbitrary functions as object handlers

    Virtual object functions can be any callable that provides a truthy
    ``_omega_virtual`` attribute, as set by the ``@virtualobj`` decorator or
    inherited from VirtualObjectHandler. Plain functions stored under a name
    that starts with ``tools/`` are accepted as well. The callable must support
    the following signature::

        @virtualobj
        def virtualobjfn(data=None, method='get|put|drop',
                         meta=None, store=None, **kwargs):
            ...
            return data

    Note that there is a distinction between storing the function as a virtual
    object, and passing data in or getting data out of the store. It is the
    responsibility of the function to implement the appropriate code for get,
    put, drop, as well as to keep track of the data it actually stores.

    As a convenience virtual object handlers can be implemented as a subclass of
    VirtualObjectHandler.

    Usage::

        1) as a virtual data handler

            # create the 'foo' virtual object
            om.datasets.put(virtualobjfn, 'foo')

            # get data from the virtualobj
            om.datasets.get('foo')
            => will call virtualobjfn(method='get')

            # put data into the virtualobj
            om.datasets.put(data, 'foo')
            => will call virtualobjfn(data=data, method='put')

            # drop the virtualobj
            om.datasets.drop('foo')
            => will call virtualobjfn(method='drop') and then
               drop the virtual object completely from the storage

        2) as a virtual model

            # create the mymodel model as a virtualobj
            om.models.put(virtualobjfn, 'mymodel')

            # run the model's predict() function
            om.runtime.model('mymodel').predict(X)
            => will call virtualobjfn(method='predict')

        3) as a virtual script

            # create the myscript script as a virtualobj
            om.models.put(virtualobjfn, 'myscript')

            # run the script
            om.runtime.script('myscript').run()
            => will call virtualobjfn(method='run')

    WARNING:

        Virtual objects are executed in the address space of the client or
        runtime context. Make sure that the source of the code is trustworthy.
        Note that this is different from Backends and Mixins as these are
        pro-actively enabled by the administrator of the client or runtime
        context, respectively - virtual objects can be injected by anyone
        who is authorized to write data.
    """
    # TODO split VirtualObjectBackend into VirtualModelBackend and VirtualDataBackend
    # to avoid confusion between the two (currently the same class is used for both)
    KIND = 'virtualobj.dill'
    PROMOTE = 'export'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        """
        Check whether obj can be stored by this backend

        Args:
            obj: the object to store
            name (str): the name the object is stored as
            **kwargs: ignored

        Returns:
            truthy if obj is callable and has a truthy ``_omega_virtual``
            attribute, or if obj is a plain function and name starts with
            ``tools/``; falsy otherwise
        """
        is_virtual = callable(obj) and getattr(obj, '_omega_virtual', False)
        # guard against non-str names (e.g. None) instead of raising AttributeError
        is_tool = (isinstance(obj, types.FunctionType)
                   and isinstance(name, str) and name.startswith('tools/'))
        return is_virtual or is_tool

    @property
    def _call_handler(self):
        # the model store handles _pre and _post methods in self.perform()
        return self.model_store

    def put(self, obj, name, attributes=None, dill_kwargs=None, as_source=False, **kwargs):
        """
        Serialize a virtual object and store it in the model store

        Args:
            obj: the virtual object (a virtual callable or VirtualObjectHandler)
            name (str): the name to store the object as
            attributes (dict): optional attributes to store in the metadata
            dill_kwargs (dict): optional kwargs passed on to dill.dumps
            as_source (bool): if True, functions and classes that are defined
               outside of __main__ are stored as source code only (when the
               source is available); if False, they are stored as source code
               plus dill bytecode of a recompiled copy

        Returns:
            the result of saving the metadata created by
            model_store._make_metadata()
        """
        # TODO add obj signing so that only trustworthy sources can put functions
        # since 0.15.6: only __main__ (or dynamic) objects are stored as plain dill
        # bytecode. Functions and classes defined in modules are stored along
        # with their source code, which removes the dependency on opcode parity
        # between client and server. Source objects are compiled into __main__
        # within the runtime. This is a tradeoff of compatibility v.s. execution
        # time. Use as_source=True to store module code as source only.
        data = dilldip.dumps(obj, as_source=as_source, **(dill_kwargs or {}))
        filename = self.model_store.object_store_key(name, '.dill', hashed=True)
        gridfile = self._store_to_file(self.model_store, data, filename)
        return self.model_store._make_metadata(
            name=name,
            prefix=self.model_store.prefix,
            bucket=self.model_store.bucket,
            kind=self.KIND,
            attributes=attributes,
            gridfile=gridfile).save()

    def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
        """
        Load a virtual object from the model store

        Args:
            name (str): the name of the virtual object
            version, force_python, lazy: accepted for interface compatibility,
               not used by this backend

        Returns:
            the deserialized virtual object; VirtualObjectHandler subclasses
            are returned as an instance
        """
        meta = self.model_store.metadata(name)
        outf = meta.gridfile
        try:
            data = outf.read()
        finally:
            # always release the gridfile, even if reading fails
            outf.close()
        obj = dilldip.loads(data)
        return self._ensure_handler_instance(obj)

    def _ensure_handler_instance(self, obj):
        # ensure VirtualObjectHandler classes are transformed to a virtualobj
        return obj() if isinstance(obj, type) and issubclass(obj, VirtualObjectHandler) else obj

    def _load_handler(self, name):
        # return (metadata, callable handler instance) for the virtual object name
        meta = self.model_store.metadata(name)
        handler = self._ensure_handler_instance(self.get(name))
        return meta, handler

    def _load_xy(self, xName, yName=None):
        # load the X dataset and, if yName is given, the y dataset (else None)
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return X, y

    def predict(self, modelname, xName, rName=None, **kwargs):
        """
        Call the virtual object with method='predict'

        Args:
            modelname (str): the name of the virtualobj
            xName (str): the name of the dataset passed as data
            rName (str): the name of the result object, passed on to the handler
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # make this work as a model backend too
        meta, handler = self._load_handler(modelname)
        X = self.data_store.get(xName)
        return handler(method='predict', data=X, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def fit(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        Call the virtual object with method='fit'

        Args:
            modelname (str): the name of the virtualobj
            xName (str): the name of the X dataset
            yName (str): the name of the y dataset, optional
            rName (str): the name of the result object, passed on to the handler
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler, called with data=(X, y)
            where y is None if yName is not given
        """
        # make this work as a model backend too
        meta, handler = self._load_handler(modelname)
        X, y = self._load_xy(xName, yName)
        return handler(method='fit', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def score(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        Call the virtual object with method='score'

        Args:
            modelname (str): the name of the virtualobj
            xName (str): the name of the X dataset
            yName (str): the name of the y dataset, optional
            rName (str): the name of the result object, passed on to the handler
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler, called with data=(X, y)
            where y is None if yName is not given
        """
        # make this work as a model backend too
        meta, handler = self._load_handler(modelname)
        X, y = self._load_xy(xName, yName)
        return handler(method='score', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def run(self, scriptname, *args, **kwargs):
        """
        Call the virtual object with method='run' (run as a script)

        Args:
            scriptname (str): the name of the virtualobj
            *args: positional arguments; the first one, if any, is passed as
               data, and all of them are passed on as kwargs['args']
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # run as a script
        meta, handler = self._load_handler(scriptname)
        data = args[0] if args else None
        kwargs['args'] = args
        # NOTE(review): unlike predict/fit/score/reduce, scripts receive the data
        # store as store= -- presumably intentional; confirm before changing
        return handler(method='run', data=data, meta=meta, store=self.data_store,
                       tracking=self.tracking, **kwargs)

    def reduce(self, modelname, results, rName=None, **kwargs):
        """
        reduce a list of results to a single result

        Use this as the last step in a task canvas

        Args:
            modelname (str): the name of the virtualobj
            results (list): the list of results forwarded by task canvas
            rName (str): the name of the result object
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler

        See Also
            om.runtime.mapreduce
        """
        meta, handler = self._load_handler(modelname)
        return handler(method='reduce', data=results, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)
def virtualobj(fn):
    """
    function decorator to create a virtual object handler from any
    callable

    Args:
        fn: the virtual handler function

    Usage:

        .. code::

            @virtualobj
            def myvirtualobj(data=None, method=None, meta=None, store=None, **kwargs):
                ...

    See:
        VirtualObjectBackend for details on virtual object handlers

    Returns:
        fn, with the ``_omega_virtual`` attribute set to True
    """
    setattr(fn, '_omega_virtual', True)
    return fn


class VirtualObjectHandler(object):
    """
    Object-oriented API for virtual object functions

    Subclass and implement the methods the virtual object supports. Calling an
    instance dispatches to the method named by ``method`` (see _vobj_call_map);
    methods that are not implemented raise NotImplementedError. load() is
    called before every dispatched call.
    """
    _omega_virtual = True

    def _vobj_call_map(self):
        # maps the method= argument to the bound handler method; this includes
        # fit, score and reduce, which VirtualObjectBackend also dispatches
        return {
            'drop': self.drop,
            'get': self.get,
            'put': self.put,
            'predict': self.predict,
            'fit': self.fit,
            'score': self.score,
            'run': self.run,
            'reduce': self.reduce,
        }

    def load(self):
        # hook to load imports and other dependencies, called before each method
        pass

    def get(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def put(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def drop(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def predict(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def fit(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def score(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def run(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def reduce(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def __call__(self, data=None, method=None, meta=None, store=None, tracking=None, **kwargs):
        """
        Dispatch a virtual object call to the method named by ``method``

        Raises:
            KeyError: if method is not in _vobj_call_map()
        """
        MAP = self._vobj_call_map()
        methodfn = MAP[method]
        self.load()  # ensure loading of imports and other dependencies
        return methodfn(data=data, meta=meta, store=store, tracking=tracking, **kwargs)


class _DillDip:
    # enhanced dill for functions and classes
    # - warns about bound variables that could cause issues when undilling
    # - checks types and functions are defined in __main__ before dilling (so we dill code, not references),
    #   dynamically recompiles in __main__ before dilling if source is available
    # - saves source code for improved python cross-version compatibility
    # - falls back on standard dill for any other object type
    # Rationale:
    # - https://stackoverflow.com/questions/26389981/serialize-a-python-function-with-dependencies
    # - https://github.com/uqfoundation/dill/issues/424#issuecomment-1598887257

    # magic id for dipped dill objects
    # -- why 0x1565? 15g of dill dip have 65 kcal
    # -- see https://www.nutritionvalue.org/Dill_dip%2C_regular_12350210_nutritional_value.html
    # -- also 42 is overused
    __calories = 0x1565

    def dumps(self, obj, as_source=False, **dill_kwargs):
        """
        Serialize obj, storing source code where useful

        Tries, in order: plain dill for __main__ or dynamic objects, source
        code (plus dill of a recompiled copy unless as_source=True) for classes
        and functions, and plain dill for anything else.

        Args:
            obj: the object to serialize
            as_source (bool): store classes/functions as source code only
            **dill_kwargs: passed on to dill.dumps

        Returns:
            bytes
        """
        self._check(obj)
        data = (self._dill_main(obj, **dill_kwargs) or
                self._dill_types_or_function(obj, as_source=as_source, **dill_kwargs) or
                self._dill_dill(obj, **dill_kwargs))
        return data

    def loads(self, data):
        """
        Deserialize data as created by dumps()

        Dipped (source code) objects are recompiled into __main__.

        Args:
            data (bytes): the serialized object

        Returns:
            the deserialized object
        """
        # compat: Python 3.8.x < 3.8.2
        # https://github.com/python/cpython/commit/b19f7ecfa3adc6ba1544225317b9473649815b38
        # https://docs.python.org/3.8/whatsnew/changelog.html#python-3-8-2-final
        try:
            obj = dill.loads(data)
        except ModuleNotFoundError as e:
            if not e.name:
                # no module name to simulate, nothing we can do
                raise
            # if the functions original module is not known, simulate it
            # this is to deal with functions created outside of __main__
            # see https://stackoverflow.com/q/26193102/890242
            #     https://stackoverflow.com/a/70513630/890242
            # note the simulated module stays registered in sys.modules
            sys.modules[e.name] = types.ModuleType(e.name, '__dynamic__')
            obj = dill.loads(data)
        # dipped objects must be recompiled regardless of how they were loaded,
        # otherwise the caller would receive the raw source dict
        return self._dynamic_compile(obj, module='__main__')

    def _check(self, obj):
        # warn about names the object refers to but does not define itself
        # (globals, closure variables), excluding builtins
        builtin_names = set(dir(builtins))
        referenced = set(dill.detect.nestedglobals(obj))
        referenced.update(dill.detect.freevars(obj).keys())
        referenced.update(dill.detect.referredglobals(obj))
        # sorted for a deterministic warning message
        freevars = sorted(n for n in referenced if n not in builtin_names)
        if freevars:
            warnings.warn(
                f'The {repr(obj)} module references {freevars}, this may lead to errors at runtime; import/declare all variables within method/function scope')

    def _dill_dill(self, obj, **dill_kwargs):
        # fallback to standard dill
        # e.g. class instances cannot be dumped unless they come from __main__
        return dill.dumps(obj, **dill_kwargs)

    def _dill_main(self, obj, **dill_kwargs):
        # dynamic __main__ objects can be dilled directly, there is no source code
        if dill.source.isfrommain(obj) or dill.source.isdynamic(obj):
            return dill.dumps(obj, **dill_kwargs)
        return None

    def _dill_types_or_function(self, obj, as_source=False, **dill_kwargs):
        # classes or functions should be dilled as source, unless they come from __main__
        if isinstance(obj, type) or isinstance(obj, types.FunctionType):
            return self._dill_source(obj, as_source=as_source, **dill_kwargs)
        return None

    def _dill_source(self, obj, as_source=False, **dill_kwargs):
        # include source code along dill
        try:
            source = dill.source.getsource(obj, lstrip=True)
            source_obj = {'__dipped__': self.__calories,
                          'source': ''.join(source),
                          'name': getattr(obj, '__name__'),
                          '__dict__': getattr(obj, '__dict__', {})}
        except Exception:
            # source code is not available, fall back to plain dill below
            source_obj = {}
        else:
            # check obvious references in source
            if '__main__' in source_obj.get('source', []):
                warnings.warn(f'The {repr(obj)} references __main__, this may lead to unexpected results')
        if as_source and source_obj:
            # if source code was requested, transport as source code
            data = dill.dumps(source_obj, **dill_kwargs)
        elif source_obj and getattr(dill.detect.getmodule(obj), '__name__', None) != '__main__':
            # we have a source obj, make sure we can dill it and have source to revert from
            # compile to __main__ module to enable full serialization
            warnings.warn(f'The {repr(obj)} is defined outside of __main__, recompiling in __main__.')
            obj = self._dynamic_compile(source_obj, module='__main__')
            source_obj['dill'] = dill.dumps(obj, **dill_kwargs)
            data = dill.dumps(source_obj, **dill_kwargs)
        else:
            # no source object (or the object is in __main__), revert to standard dill
            if as_source:
                warnings.warn(f'Cannot save {repr(obj)} as source code, reverting to dill')
            data = dill.dumps(obj, **dill_kwargs)
        return data

    def _dynamic_compile(self, obj, module='__main__'):
        """
        Recompile a dipped source object into module

        Args:
            obj: a dipped source dict, or any other object
            module (str): the module name to compile into

        Returns:
            the undilled or recompiled object if obj is dipped, else obj as is
        """
        if not self.isdipped(obj):
            return obj
        if 'dill' in obj:
            try:
                return dill.loads(obj['dill'])
            except Exception:
                warnings.warn('could not undill, reverting to dynamic compile source code')
        # imported locally (presumably to avoid a circular import), and only when
        # source code actually has to be compiled
        from omegaml.backends.genai.models import GenAIModelHandler, virtual_genai
        # re-compile source obj in module
        source, data = obj.get('source'), obj.get('__dict__', {})
        mod = types.ModuleType(module)
        mod.__dict__.update({'__compiling__': True,
                             'virtualobj': virtualobj,
                             'virtual_genai': virtual_genai,
                             'VirtualObjectHandler': VirtualObjectHandler,
                             'GenAIModelHandler': GenAIModelHandler})
        # NOTE(review): this replaces sys.modules[module] (by default '__main__')
        # with the fresh module for the rest of the process -- confirm this is
        # acceptable in client processes (e.g. notebooks)
        sys.modules[module] = mod
        code = compile(source, '<string>', 'exec')
        exec(code, mod.__dict__)
        obj = getattr(mod, obj['name'])
        # restore instance data, if any
        try:
            getattr(obj, '__dict__', {}).update(data)
        except AttributeError:
            # we ignore attribute errors on class types
            if not isinstance(obj, type):
                warnings.warn(f'could not restore instance data for {obj}')
        return obj

    def isdipped(self, data_or_obj):
        # True if data_or_obj is (or is the dill of) a source dict created by _dill_source
        if isinstance(data_or_obj, dict):
            obj = data_or_obj
        elif isinstance(data_or_obj, (bytes, bytearray, memoryview)):
            obj = tryOr(lambda: dill.loads(data_or_obj), None)
        else:
            # already deserialized objects cannot be dipped data
            obj = None
        return isinstance(obj, dict) and obj.get('__dipped__') == self.__calories


dilldip = _DillDip()
# enable recursive tracing of globals
# -- fixes https://github.com/uqfoundation/dill/issues/255
# -- see also https://github.com/uqfoundation/dill/issues/537
# -- the issue only happens when the openai module is loaded (root cause unknown)
dill.settings['recurse'] = True