Source code for omegaml.backends.virtualobj

  1import builtins
  2import dill
  3import sys
  4import types
  5import warnings
  6
  7from omegaml.backends.basedata import BaseDataBackend
  8from omegaml.util import tryOr
  9
 10
class VirtualObjectBackend(BaseDataBackend):
    """
    Support arbitrary functions as object handlers

    Virtual object functions can be any callable that provides a truthy
    ``_omega_virtual`` attribute. The callable must support the following
    signature::

        @virtualobj
        def virtualobjfn(data=None, method='get|put|drop',
                         meta=None, store=None, **kwargs):
            ...
            return data

    Note that there is a distinction between storing the function as a virtual object,
    and passing data in or getting data out of the store. It is the responsibility
    of the function to implement the appropriate code for get, put, drop, as well as
    to keep track of the data it actually stores.

    As a convenience virtual object handlers can be implemented as a subclass of
    VirtualObjectHandler

    Usage::

        1) as a virtual data handler

            # create the 'foo' virtual object
            om.datasets.put(virtualobjfn, 'foo')

            # get data from the virtualobj
            om.datasets.get('foo')
            => will call virtualobjfn(method='get')

            # put data into the virtualobj
            om.datasets.put(data, 'foo')
            => will call virtualobjfn(data=data, method='put')

            # drop the virtualfn
            om.datasets.drop('foo')
            => will call virtualobjfn(method='drop') and then
               drop the virtual object completely from the storage

        2) as a virtual model

            # create the mymodel model as a virtualobj
            om.models.put(virtualobjfn, 'mymodel')

            # run the model's predict() function
            om.runtime.model('mymodel').predict(X)
            => will call virtualobjfn(method='predict')

        3) as a virtual script

            # create the myscript script as a virtualobj
            om.models.put(virtualobjfn, 'myscript')

            # run the script
            om.runtime.script('myscript').run()
            => will call virtualobjfn(method='run')

    WARNING:

        Virtual objects are executed in the address space of the client or
        runtime context. Make sure that the source of the code is trustworthy.
        Note that this is different from Backends and Mixins as these are
        pro-actively enabled by the administrator of the client or runtime
        context, respectively - virtual objects can be injected by anyone
        who is authorized to write data.
    """
    # TODO split VirtualObjectBackend into VirtualModelBackend and VirtualDataBackend
    #      to avoid confusion between the two (currently the same class is used for both)
    KIND = 'virtualobj.dill'
    PROMOTE = 'export'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        """
        Check whether this backend handles the given object

        Args:
            obj: the object to be stored
            name: the name the object is stored under
            **kwargs: ignored

        Returns:
            a truthy value if obj is a callable flagged with ``_omega_virtual``,
            or if obj is a plain function stored under a ``tools/`` name
        """
        is_virtual = callable(obj) and getattr(obj, '_omega_virtual', False)
        # guard against non-string names so probing never raises AttributeError
        is_tool = (isinstance(obj, types.FunctionType)
                   and isinstance(name, str)
                   and name.startswith('tools/'))
        return is_virtual or is_tool

    @property
    def _call_handler(self):
        # the model store handles _pre and _post methods in self.perform()
        return self.model_store

    def put(self, obj, name, attributes=None, dill_kwargs=None, as_source=False, **kwargs):
        """
        Serialize the virtual object and store it along with its metadata

        Args:
            obj: the callable (or handler class) to store
            name (str): the name of the object
            attributes (dict): optional attributes passed on to the metadata
            dill_kwargs (dict): optional kwargs passed on to the serializer
            as_source (bool): passed on to the serializer's ``as_source`` option
            **kwargs: ignored

        Returns:
            the result of saving the metadata created for the object
        """
        # TODO add obj signing so that only trustworthy sources can put functions
        # since 0.15.6: only __main__ objects are stored as bytecodes,
        #               all module code is stored as source code. This
        #               removes the dependency on opcode parity between client
        #               and server. Source objects are compiled into __main__
        #               within the runtime. This is a tradeoff compatibility
        #               v.s. execution time. Use as_source=False to force
        #               storing bytecodes.
        data = dilldip.dumps(obj, as_source=as_source, **(dill_kwargs or {}))
        filename = self.model_store.object_store_key(name, '.dill', hashed=True)
        gridfile = self._store_to_file(self.model_store, data, filename)
        return self.model_store._make_metadata(
            name=name,
            prefix=self.model_store.prefix,
            bucket=self.model_store.bucket,
            kind=self.KIND,
            attributes=attributes,
            gridfile=gridfile).save()

    def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
        """
        Load the virtual object stored under name

        Args:
            name (str): the name of the object
            version, force_python, lazy, **kwargs: accepted for signature
                compatibility, not used by this method

        Returns:
            the deserialized callable; VirtualObjectHandler classes are
            returned as instances
        """
        meta = self.model_store.metadata(name)
        outf = meta.gridfile
        try:
            data = outf.read()
            obj = dilldip.loads(data)
        finally:
            # always release the file, even if deserialization fails
            outf.close()
        return self._ensure_handler_instance(obj)

    def _ensure_handler_instance(self, obj):
        # ensure VirtualObjectHandler classes are transformed to a virtualobj
        return obj() if isinstance(obj, type) and issubclass(obj, VirtualObjectHandler) else obj

    def predict(self, modelname, xName, rName=None, **kwargs):
        """
        Call the virtual object with method='predict'

        Args:
            modelname (str): the name of the virtualobj
            xName: the name of the input data, loaded from the data store
            rName: the name of the result object, passed on to the handler
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # make this work as a model backend too
        meta = self.model_store.metadata(modelname)
        handler = self._ensure_handler_instance(self.get(modelname))
        X = self.data_store.get(xName)
        return handler(method='predict', data=X, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def fit(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        Call the virtual object with method='fit' and data=(X, y)

        Args:
            modelname (str): the name of the virtualobj
            xName: the name of the X data, loaded from the data store
            yName: optional name of the y data; y is None if not given
            rName: the name of the result object, passed on to the handler
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # make this work as a model backend too
        meta = self.model_store.metadata(modelname)
        handler = self._ensure_handler_instance(self.get(modelname))
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='fit', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def score(self, modelname, xName, yName=None, rName=None, **kwargs):
        """
        Call the virtual object with method='score' and data=(X, y)

        Args:
            modelname (str): the name of the virtualobj
            xName: the name of the X data, loaded from the data store
            yName: optional name of the y data; y is None if not given
            rName: the name of the result object, passed on to the handler
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # make this work as a model backend too
        meta = self.model_store.metadata(modelname)
        handler = self._ensure_handler_instance(self.get(modelname))
        X = self.data_store.get(xName)
        y = self.data_store.get(yName) if yName else None
        return handler(method='score', data=(X, y), meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)

    def run(self, scriptname, *args, **kwargs):
        """
        Call the virtual object with method='run'

        Args:
            scriptname (str): the name of the virtualobj
            *args: the first positional argument, if any, is passed as data;
                all positional arguments are passed as kwargs['args']
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler
        """
        # run as a script
        meta = self.model_store.metadata(scriptname)
        handler = self._ensure_handler_instance(self.get(scriptname))
        data = args[0] if args else None
        kwargs['args'] = args
        return handler(method='run', data=data, meta=meta, store=self.data_store, tracking=self.tracking, **kwargs)

    def reduce(self, modelname, results, rName=None, **kwargs):
        """
        reduce a list of results to a single result

        Use this as the last step in a task canvas

        Args:
            modelname (str): the name of the virtualobj
            results (list): the list of results forwarded by task canvas
            rName (result): the name of the result object
            **kwargs: passed on to the handler

        Returns:
            result of the virtualobj handler

        See Also
            om.runtime.mapreduce
        """
        meta = self.model_store.metadata(modelname)
        handler = self._ensure_handler_instance(self.get(modelname))
        return handler(method='reduce', data=results, meta=meta, store=self.model_store, rName=rName,
                       tracking=self.tracking, **kwargs)
def virtualobj(fn):
    """
    function decorator to create a virtual object handler from any
    callable

    Args:
        fn: the virtual handler function

    Usage:

        .. code::

            @virtualobj
            def myvirtualobj(data=None, method=None, meta=None, store=None, **kwargs):
                ...

    See:
        VirtualObjectBackend for details on virtual object handlers

    Returns:
        fn, with its ``_omega_virtual`` attribute set to True
    """
    setattr(fn, '_omega_virtual', True)
    return fn


class VirtualObjectHandler(object):
    """
    Object-oriented API for virtual object functions

    Instances are callable. A call dispatches on ``method`` to the
    corresponding instance method listed in ``_vobj_call_map()``; ``load()``
    is invoked before every dispatched call. All handler methods raise
    NotImplementedError unless overridden by a subclass.
    """
    _omega_virtual = True

    def _vobj_call_map(self):
        # maps the method name passed to __call__ to the bound method handling it
        return {
            'drop': self.drop,
            'get': self.get,
            'put': self.put,
            'predict': self.predict,
            'run': self.run,
        }

    def load(self):
        # hook called before each dispatched method, no-op by default
        pass

    def get(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def put(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def drop(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def predict(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def run(self, data=None, meta=None, store=None, **kwargs):
        raise NotImplementedError

    def __call__(self, data=None, method=None, meta=None, store=None, tracking=None, **kwargs):
        """
        Dispatch the call to the method registered for ``method``

        Raises:
            KeyError: if ``method`` is not a key of ``_vobj_call_map()``
        """
        MAP = self._vobj_call_map()
        methodfn = MAP[method]
        self.load()  # ensure loading of imports and other dependencies
        return methodfn(data=data, meta=meta, store=store, tracking=tracking, **kwargs)


class _DillDip:
    # enhanced dill for functions and classes
    # - warns about bound variables that could cause issues when undilling
    # - checks types and functions are defined in __main__ before dilling (so we store code, not references),
    #   dynamically recompiles in __main__ before dilling if source is available
    # - saves source code for improved python cross-version compatibility
    # - falls back on standard dill for any other object type
    # Rationale:
    # - https://stackoverflow.com/questions/26389981/serialize-a-python-function-with-dependencies
    # - https://github.com/uqfoundation/dill/issues/424#issuecomment-1598887257

    # magic id for dipped dill objects
    # -- why 0x1565? 15g of dill dip have 65 kcal
    # -- see https://www.nutritionvalue.org/Dill_dip%2C_regular_12350210_nutritional_value.html
    # -- also 42 is overused
    __calories = 0x1565

    def dumps(self, obj, as_source=False, **dill_kwargs):
        """
        Serialize obj, trying each strategy in turn

        The first strategy returning data wins: plain dill for __main__ or
        dynamic objects, then source-aware dill for classes and functions,
        then plain dill as the fallback.

        Args:
            obj: the object to serialize
            as_source (bool): if True, classes and functions are transported
                as source code where the source is available
            **dill_kwargs: passed on to dill.dumps

        Returns:
            the serialized data
        """
        # enhanced flavor of dill that stores source code for cross-version compatibility
        self._check(obj)
        data = (self._dill_main(obj, **dill_kwargs) or
                self._dill_types_or_function(obj, as_source=as_source, **dill_kwargs) or
                self._dill_dill(obj, **dill_kwargs))
        return data

    def loads(self, data):
        """
        Deserialize data written by dumps()

        SECURITY: this unpickles via dill and may exec() stored source code,
        only load data from trustworthy sources.
        """
        # compat: Python 3.8.x < 3.8.2
        # https://github.com/python/cpython/commit/b19f7ecfa3adc6ba1544225317b9473649815b38
        # https://docs.python.org/3.8/whatsnew/changelog.html#python-3-8-2-final
        try:
            obj = self._dynamic_compile(dill.loads(data), module='__main__')
        except ModuleNotFoundError as e:
            # if the functions original module is not known, simulate it
            # this is to deal with functions created outside of __main__
            # see https://stackoverflow.com/q/26193102/890242
            #     https://stackoverflow.com/a/70513630/890242
            # NOTE(review): this path returns the raw dill result without
            #   passing it through _dynamic_compile -- confirm this is intended
            mod = types.ModuleType(e.name, '__dynamic__')
            sys.modules[e.name] = mod  # sys.modules['__main__']
            obj = dill.loads(data)
        return obj

    def _check(self, obj):
        # check for freevars, warn if obj references names other than builtins
        freevars = dill.detect.nestedglobals(obj)
        freevars += list(dill.detect.freevars(obj).keys())
        freevars += list(dill.detect.referredglobals(obj))
        # build the builtin names once instead of once per candidate name
        builtin_names = set(dir(builtins))
        freevars = [n for n in set(freevars) if n not in builtin_names]
        if freevars:
            warnings.warn(
                f'The {repr(obj)} module references {freevars}, this may lead to errors at runtime; import/declare all variables within method/function scope')

    def _dill_dill(self, obj, **dill_kwargs):
        # fallback to standard dill
        # e.g. class instances cannot be dumped unless they come from __main__
        return dill.dumps(obj, **dill_kwargs)

    def _dill_main(self, obj, **dill_kwargs):
        # dynamic __main__ objects can be dilled directly, there is no source code
        if dill.source.isfrommain(obj) or dill.source.isdynamic(obj):
            return dill.dumps(obj, **dill_kwargs)
        return None

    def _dill_types_or_function(self, obj, as_source=False, **dill_kwargs):
        # classes or functions should be dilled as source, unless they come from __main__
        if isinstance(obj, type) or isinstance(obj, types.FunctionType):
            return self._dill_source(obj, as_source=as_source, **dill_kwargs)
        return None

    def _dill_source(self, obj, as_source=False, **dill_kwargs):
        """
        Serialize obj including its source code, if the source is available

        Args:
            obj: the class or function to serialize
            as_source (bool): if True and source is available, only the
                source (not the dilled object) is transported
            **dill_kwargs: passed on to dill.dumps

        Returns:
            the serialized data; plain dill of obj if no source is available
        """
        # include source code along dill
        try:
            source = dill.source.getsource(obj, lstrip=True)
            source_obj = {'__dipped__': self.__calories,
                          'source': ''.join(source),
                          'name': getattr(obj, '__name__'),
                          '__dict__': getattr(obj, '__dict__', {})}
        except Exception:
            # best effort: no source available, fall through to plain dill
            source_obj = {}
        else:
            # check obvious references in source
            if '__main__' in source_obj.get('source', []):
                warnings.warn(f'The {repr(obj)} references __main__, this may lead to unexpected results')
        if as_source and source_obj:
            # if source code was requested, transport as source code
            data = dill.dumps(source_obj, **dill_kwargs)
        elif source_obj and getattr(dill.detect.getmodule(obj), '__name__', None) != '__main__':
            # getmodule() returns a module object (or None), so compare by module name
            # we have a source obj, make sure we can dill it and have source to revert from
            # compile to __main__ module to enable full serialization
            warnings.warn(f'The {repr(obj)} is defined outside of __main__, recompiling in __main__.')
            obj = self._dynamic_compile(source_obj, module='__main__')
            source_obj['dill'] = dill.dumps(obj, **dill_kwargs)
            data = dill.dumps(source_obj, **dill_kwargs)
        else:
            # we have no source object, revert to standard dill
            if as_source:
                warnings.warn(f'Cannot save {repr(obj)} as source code, reverting to dill')
            # could not get source code, revert to dill
            data = dill.dumps(obj, **dill_kwargs)
        return data

    def _dynamic_compile(self, obj, module='__main__'):
        """
        Restore the object from a dipped dict, returning any other obj as is

        If the dipped dict carries a 'dill' entry, that is undilled first. If
        there is no such entry or undilling fails, the stored source is
        compiled and executed in a new module registered in sys.modules
        under the given module name, replacing any module of that name.
        """
        from omegaml.backends.genai.models import GenAIModelHandler, virtual_genai
        # re-compile source obj in __main__
        if self.isdipped(obj):
            if 'dill' in obj:
                try:
                    obj = dill.loads(obj['dill'])
                except Exception:
                    warnings.warn('could not undill, reverting to dynamic compile source code')
                else:
                    return obj
            source, data = obj.get('source'), obj.get('__dict__', {})
            mod = types.ModuleType(module)
            mod.__dict__.update({'__compiling__': True,
                                 'virtualobj': virtualobj,
                                 'virtual_genai': virtual_genai,
                                 'VirtualObjectHandler': VirtualObjectHandler,
                                 'GenAIModelHandler': GenAIModelHandler})
            sys.modules[module] = mod
            # SECURITY: executes stored source code, only use with trusted data
            code = compile(source, '<string>', 'exec')
            exec(code, mod.__dict__)
            obj = getattr(mod, obj['name'])
            # restore instance data, if any
            try:
                getattr(obj, '__dict__', {}).update(data)
            except AttributeError:
                # we ignore attribute errors on class types
                if not isinstance(obj, type):
                    warnings.warn(f'could not restore instance data for {obj}')
        return obj

    def isdipped(self, data_or_obj):
        # True if the given dict, or the data once undilled, carries the magic id
        obj = tryOr(lambda: dill.loads(data_or_obj), None) if not isinstance(data_or_obj, dict) else data_or_obj
        return isinstance(obj, dict) and obj.get('__dipped__') == self.__calories


dilldip = _DillDip()
# enable recursive tracing of globals
# -- fixes https://github.com/uqfoundation/dill/issues/255
# -- see also https://github.com/uqfoundation/dill/issues/537
# -- the issue only happens when the openai module is loaded (root cause unknown)
dill.settings['recurse'] = True