Source code for omegaml.mixins.store.promotion

  1from pathlib import Path
  2
  3from functools import partial
  4
  5from omegaml.documents import MDREGISTRY
  6from omegaml.util import dict_merge
  7
  8
class PromotionMixin(object):
    """ Promote objects from one bucket to another

    Promotion Methods:
        * `getput` - performs target.put(source.get()) and merges the source
          metadata's .attributes (except 'versions') into the target metadata
        * `metadata` - creates a new metadata entry in target, copying
          metadata attributes and kind_meta. Does not get/put the object itself
          (i.e. no associated data is promoted).
        * `data` - like `getput`, but does not merge metadata
        * `export` - performs .to_archive() and .from_archive(), effectively
          copying metadata, the associated gridfile (if available) and collection
          data (if available). This is equivalent to `om runtime export`.

    The default promotion method is getput(), or the object's backend.PROMOTE
    method, if specified.

    Some object backends provide a default promotion other than getput:

    * sqlalchemy.conx - uses the `metadata` promotion, effectively copying only
      metadata. Use promote(..., method='metadata,data') to also promote data

    * virtualobj.dill - uses the `export` promotion, effectively copying all
      metadata and the associated @virtualobj function. If the source object
      is a versioned model, this copies the current version and metadata. To
      copy a specific version, use promote('model@version'). To create a
      new version in the target bucket use promote(..., method='getput').
    """
    # per-prefix default for promote(drop=None); prefixes not listed default to True
    DEFAULT_DROP = {
        'models/': False,  # models are versioned, don't drop
    }

    @classmethod
    def supports(cls, store, **kwargs):
        """ Return True if this mixin applies to the given store, based on store.prefix """
        return store.prefix in ('data/', 'jobs/', 'models/', 'scripts/', 'streams/')

    def promote(self, name, other, asname=None, drop=None, method='default', get=None,
                put=None, **kwargs):
        """ Promote object to another store.

        This effectively copies the object. If the objects exists in the
        target it will be replaced.

        Args:
            name: The name of the object
            other: the OmegaStore instance to promote to
            asname: the name to use in other, defaults to .metadata(name).name
            drop (bool): if True calls other.drop(force=True) before promoting,
               defaults to DEFAULT_DROP for this store's prefix, else True
            method (str|list): specify the method or multiple methods in sequence
               (as a list or a comma-separated string), available methods are
               'default', 'getput', 'metadata', 'data', 'export'. For 'default',
               the object backend's .PROMOTE property is used, defaulting to 'getput'
            get (dict): optional, specifies the store.get(**kwargs)
            put (dict): optional, specifies the other.put(**kwargs)
            kwargs: additional kwargs are passed to the initial other.put(), for metadata promotion

        Returns:
            The Metadata of the new object

        Raises:
            ValueError: if name and asname are the same and other is this store
            KeyError: if a requested method is not a known promotion method
        """
        # imported locally, as in the original (presumably to avoid a circular import)
        from omegaml.store import OmegaStore
        assert isinstance(other, OmegaStore), f"specify promote(..., other, ...) as om.models, not {type(other)}"
        drop = drop if drop is not None else self.DEFAULT_DROP.get(self.prefix, True)
        # sanity checks
        # -- promotion by same name requires a different store
        meta = self.metadata(name)
        asname = asname or meta.name
        if name == asname and self == other:
            raise ValueError(f'must specify asname= different from {meta.name}')
        # see if the backend supports explicit promotion
        backend = self.get_backend(name)
        if hasattr(backend, 'promote'):
            return backend.promote(name, other, asname=asname, drop=drop, get=get, put=put,
                                   method=method, **kwargs)
        # run all promotion methods as requested or provided by the backend
        # -- tolerate whitespace in comma-separated specs, e.g. 'metadata, data'
        if isinstance(method, str):
            methods = [m.strip() for m in method.split(',')]
        else:
            methods = method
        promotion_methods = self.promotion_methods(name, other, asname=asname, drop=drop,
                                                   get=get, put=put, backend=backend, **kwargs)
        for m in methods:
            promotion_methods[m]()
        return other.metadata(asname)

    def promotion_methods(self, name, other, asname=None, drop=None,
                          get=None, put=None, backend=None, **kwargs):
        """ Build the mapping of promotion method name to a ready-to-call promotion

        Args:
            name (str): the name of the object to promote
            other: the target store
            asname (str): the name to use in the target store
            drop (bool): passed to the 'getput' promotion
            get (dict): optional kwargs for the source .get()
            put (dict): optional kwargs for the target .put()
            backend: the object's backend; its .PROMOTE attribute, if present,
               selects the 'default' method, else 'getput' is used
            kwargs: passed on to the 'data', 'metadata' and 'export' promotions

        Returns:
            dict of method name => callable taking no arguments, with keys
            'getput', 'data', 'metadata', 'export', 'default'
        """
        PROMOTION_METHODS = {
            # get/put are forwarded so that promote(get=, put=) applies to the
            # default method, as documented
            'getput': partial(self._get_put_promotion, name, other, asname=asname, drop=drop,
                              get=get, put=put),
            'data': partial(self._data_promotion, name, other, asname=asname, get=get, put=put,
                            **kwargs),
            # asname is forwarded so that metadata is created under the requested
            # target name, which is the name promote() looks up on return
            'metadata': partial(self._metadata_promotion, name, other, asname=asname, drop=False,
                                get=get, put=put, **kwargs),
            # NOTE(review): export ignores asname and passes drop=False on as a kwarg
            # to to_archive()/from_archive() -- kept as is, confirm this is intended
            'export': partial(self._export_promotion, name, other, asname=None, drop=False,
                              get=get, put=put, **kwargs),
        }
        # the default is the backend's preferred method, falling back to get()/put()
        PROMOTION_METHODS['default'] = PROMOTION_METHODS[getattr(backend, 'PROMOTE', 'getput')]
        return PROMOTION_METHODS

    def _get_put_promotion(self, name, other, asname=None, drop=None, get=None, put=None, **kwargs):
        """ promote by other.put(self.get()), then merge metadata attributes

        Args:
            name (str): the name of the object
            other: the target store
            asname (str): the name in the target store, defaults to the metadata's name
            drop (bool): if True, calls other.drop(asname, force=True) before the put
            get (dict): kwargs for self.get(), defaults to kwargs
            put (dict): kwargs for other.put(), defaults to kwargs

        Returns:
            Metadata of the promoted object in other
        """
        get_kwargs = get or kwargs
        put_kwargs = put or kwargs
        meta = self.metadata(name)
        obj = self.get(name, **get_kwargs)
        asname = asname or meta.name
        if drop:
            other.drop(asname, force=True)
        # TODO refactor to promote of python native data backend
        if meta.kind == MDREGISTRY.PYTHON_DATA:
            # in case of native python data we get back a list of
            # all previously inserted objects. do the same in other
            for o in obj:
                other.put(o, asname)
            other_meta = other.metadata(asname)
        else:
            other_meta = other.put(obj, asname, **put_kwargs)
        # promote metadata, except versions
        # TODO: move versions to kind_meta, which is not promoted
        meta.attributes.pop('versions', None)
        dict_merge(other_meta.attributes, meta.attributes)
        other_meta.save()
        return other_meta

    def _metadata_promotion(self, name, other, asname=None, drop=False, **kwargs):
        """ promote metadata

        This is called by PromotionMixin.promote() to promote metadata
        to another store, not data.

        Args:
            name (str): the name of the object
            other (om.datasets): the other om.datasets
            asname (str): the name in other, defaults to the metadata's name
            drop (bool): if True, calls other.drop(asname, force=True) first
            kwargs: passed on to other.make_metadata()

        Returns:
            Metadata of the promoted dataset, as created by other.make_metadata()
        """
        # fix: was self.get.metadata(name); get is a method and has no .metadata
        meta = self.metadata(name)
        asname = asname or meta.name
        if drop:
            other.drop(asname, force=True)
        other_meta = other.make_metadata(asname, meta.kind, bucket=other.bucket,
                                         prefix=other.prefix, **kwargs)
        other_meta.kind_meta.update(meta.kind_meta)
        other_meta.attributes.update(meta.attributes)
        other_meta.save()
        return other_meta

    def _data_promotion(self, name, other, asname=None, get=None, put=None, **kwargs):
        """ promote data

        This is called by PromotionMixin.promote() to promote an object's data
        to another store. If source.get() returns an iterable, target.put() will
        be called for every item.

        Args:
            name (str): the name of the object
            other: the target store
            asname (str): the name in other, defaults to name
            get (dict): kwargs for self.get(), defaults to kwargs
            put (dict): kwargs for other.put(), defaults to kwargs

        Returns:
            the result of saving the target's metadata
        """
        asname = asname or name
        get_kwargs = get or kwargs
        put_kwargs = put or kwargs
        to_copy = self.get(name, **get_kwargs)
        # NOTE(review): any object defining __iter__ (list, dict, str, ...) is
        # copied item by item, not only iterators -- confirm this is intended
        if hasattr(to_copy, '__iter__'):
            for chunk in to_copy:
                other.put(chunk, asname, **put_kwargs)
        else:
            other.put(to_copy, asname, **put_kwargs)
        other_meta = other.metadata(asname)
        return other_meta.save()

    def _export_promotion(self, name, other, asname=None, get=None, put=None, **kwargs):
        """ promote as export/import

        This is called by PromotionMixin.promote() to promote an object's data
        to another store in the way that the `om runtime export/import` commands do.
        This includes exporting all data and metadata associated with an object.

        Args:
            name (str): the name of the object
            other: the target store
            asname (str): the name in other, defaults to name
            get (dict): kwargs for self.to_archive(), defaults to kwargs
            put (dict): kwargs for other.from_archive(), defaults to kwargs

        Returns:
            the result of other.from_archive()
        """
        asname = asname or name
        get_kwargs = get or kwargs
        put_kwargs = put or kwargs
        # the archive is written below this store's tmppath, named like the target
        arc = self.to_archive(name, Path(self.tmppath) / asname, **get_kwargs)
        other_meta = other.from_archive(arc, asname, **put_kwargs)
        return other_meta