Source code for omegaml.mixins.store.promotion
1from pathlib import Path
2
3from functools import partial
4
5from omegaml.documents import MDREGISTRY
6from omegaml.util import dict_merge
7
8
[docs]
class PromotionMixin(object):
    """ Promote objects from one bucket to another

    Promotion Methods:
        * `getput` - performs target.put(source.get()) and copies metadata
          attributes by merging the source's .attributes (except `versions`)
          into the target's metadata
        * `metadata` - creates a new metadata entry in target, copying
          metadata attributes and kind_meta. Does not get/put the object itself
          (i.e. no associated data is promoted).
        * `data` - like `getput`, but does not merge metadata
        * `export` - performs .to_archive() and .from_archive(), effectively
          copying metadata, the associated gridfile (if available) and collection
          data (if available). This is equivalent to `om runtime export`.

    The default promotion method is getput(), or the object's backend.PROMOTE
    method, if specified.

    Some object backends provide a default promotion other than getput:

        * sqlalchemy.conx - uses the `metadata` promotion, effectively copying only
          metadata. Use promote(..., method='metadata,data') to also promote data

        * virtualobj.dill - uses the `export` promotion, effectively copying all
          metadata and the associated @virtualobj function. If the source object
          is a versioned model, this copies the current version and metadata. To
          copy a specific version, use promote('model@version'). To create a
          new version in the target bucket use promote(..., method='getput').
    """
    # per-prefix default for promote(drop=None); prefixes not listed default to True
    DEFAULT_DROP = {
        'models/': False,  # models are versioned, don't drop
    }

    @classmethod
    def supports(cls, store, **kwargs):
        """ return True if this mixin applies to the given store's prefix """
        return store.prefix in ('data/', 'jobs/', 'models/', 'scripts/', 'streams/')

    def promote(self, name, other, asname=None, drop=None, method='default', get=None,
                put=None, **kwargs):
        """ Promote object to another store.

        This effectively copies the object. If the object exists in the
        target it will be replaced.

        Args:
            name: The name of the object
            other: the OmegaStore instance to promote to
            asname: the name to use in other, defaults to .metadata(name).name
            drop (bool): if True calls other.drop(force=True) before promoting.
                Defaults to DEFAULT_DROP for this store's prefix, i.e. False
                for models/ (models are versioned) and True otherwise
            method (str|list): specify the method or multiple methods in sequence,
                as a list or as a comma-separated string (e.g. 'metadata,data').
                Available methods are 'default', 'getput', 'metadata', 'data',
                'export'. For 'default', the object backend's .PROMOTE property
                is used, defaulting to 'getput'
            get (dict): optional, specifies the store.get(**kwargs)
            put (dict): optional, specifies the other.put(**kwargs)
            kwargs: additional kwargs are passed on to the 'data', 'metadata' and
                'export' methods, e.g. to other.make_metadata() for metadata promotion

        Returns:
            The Metadata of the new object

        Raises:
            ValueError: if promoting within the same store without a different asname
            KeyError: if an unknown promotion method is requested
        """
        from omegaml.store import OmegaStore
        assert isinstance(other, OmegaStore), f"specify promote(..., other, ...) as om.models, not {type(other)}"
        drop = drop if drop is not None else self.DEFAULT_DROP.get(self.prefix, True)
        # sanity checks
        # -- promotion by same name requires a different store
        meta = self.metadata(name)
        asname = asname or meta.name
        if name == asname and self == other:
            raise ValueError(f'must specify asname= different from {meta.name}')
        # see if the backend supports explicit promotion
        backend = self.get_backend(name)
        if hasattr(backend, 'promote'):
            return backend.promote(name, other, asname=asname, drop=drop, get=get, put=put,
                                   method=method, **kwargs)
        # run all promotion methods as requested or provided by the backend
        # -- accept 'a,b' as well as 'a, b' (or a list of names)
        methods = method.split(',') if isinstance(method, str) else method
        methods = [m.strip() for m in methods]
        promotion_methods = self.promotion_methods(name, other, asname=asname, drop=drop,
                                                   get=get, put=put, backend=backend, **kwargs)
        # -- validate all methods before running any, to avoid a partial promotion
        unknown = [m for m in methods if m not in promotion_methods]
        if unknown:
            raise KeyError(f'unknown promotion method(s) {unknown}, '
                           f'specify any of {sorted(promotion_methods)}')
        for m in methods:
            promotion_methods[m]()
        return other.metadata(asname)

    def promotion_methods(self, name, other, asname=None, drop=None,
                          get=None, put=None, backend=None, **kwargs):
        """ Build the mapping of promotion method name to a ready-to-call function

        Args:
            name (str): the name of the object in this store
            other (OmegaStore): the store to promote to
            asname (str): the name to use in other
            drop (bool): passed to the 'getput' method
            get (dict): kwargs for self.get() (or self.to_archive() for 'export')
            put (dict): kwargs for other.put() (or other.from_archive() for 'export')
            backend: the object's backend; its .PROMOTE attribute, if any,
                selects the 'default' method (otherwise 'getput')
            kwargs: passed on to the 'data', 'metadata' and 'export' methods

        Returns:
            dict of method name => callable that takes no arguments. Includes
            a 'default' entry as selected by backend.PROMOTE.
        """
        promotion_methods = {
            # default promotion, i.e. get()/put(); honors get=/put= as documented
            'getput': partial(self._get_put_promotion, name, other, asname=asname, drop=drop,
                              get=get, put=put),
            'data': partial(self._data_promotion, name, other, asname=asname, get=get, put=put,
                            **kwargs),
            # metadata and export promotion never drop the target
            'metadata': partial(self._metadata_promotion, name, other, asname=asname, drop=False,
                                get=get, put=put, **kwargs),
            'export': partial(self._export_promotion, name, other, asname=asname, drop=False,
                              get=get, put=put, **kwargs),
        }
        promotion_methods['default'] = promotion_methods[getattr(backend, 'PROMOTE', 'getput')]
        return promotion_methods

    def _get_put_promotion(self, name, other, asname=None, drop=None, get=None, put=None, **kwargs):
        """ promote by other.put(self.get()), then merge metadata attributes

        Args:
            name (str): the name of the object in this store
            other (OmegaStore): the store to promote to
            asname (str): the name in other, defaults to the source metadata's name
            drop (bool): if True, calls other.drop(asname, force=True) first
            get (dict): kwargs for self.get(), defaults to kwargs
            put (dict): kwargs for other.put(), defaults to kwargs

        Returns:
            Metadata of the promoted object in other
        """
        get_kwargs = get or kwargs
        put_kwargs = put or kwargs
        meta = self.metadata(name)
        obj = self.get(name, **get_kwargs)
        asname = asname or meta.name
        if drop:
            other.drop(asname, force=True)
        # TODO refactor to promote of python native data backend
        if meta.kind == MDREGISTRY.PYTHON_DATA:
            # in case of native python data we get back a list of
            # all previously inserted objects. do the same in other
            for item in obj:
                other.put(item, asname, **put_kwargs)
            other_meta = other.metadata(asname)
        else:
            other_meta = other.put(obj, asname, **put_kwargs)
        # promote metadata attributes, except versions
        # -- build a filtered copy rather than popping from the source metadata object
        # TODO: move versions to kind_meta, which is not promoted
        attributes = {k: v for k, v in meta.attributes.items() if k != 'versions'}
        dict_merge(other_meta.attributes, attributes)
        other_meta.save()
        return other_meta

    def _metadata_promotion(self, name, other, asname=None, drop=False, get=None, put=None,
                            **kwargs):
        """ promote metadata

        This is called by PromotionMixin.promote() to promote metadata
        to another store, not data.

        Args:
            name (str): the name of the object
            other (om.datasets): the other om.datasets
            asname (str): the name in other, defaults to the source metadata's name
            drop (bool): if True, calls other.drop(asname, force=True) first
            get (dict): accepted for interface compatibility, not used
            put (dict): accepted for interface compatibility, not used
            kwargs: passed to other.make_metadata()

        Returns:
            Metadata of the promoted dataset as the result of other.make_metadata()
        """
        meta = self.metadata(name)
        asname = asname or meta.name
        if drop:
            other.drop(asname, force=True)
        other_meta = other.make_metadata(asname, meta.kind, bucket=other.bucket,
                                         prefix=other.prefix, **kwargs)
        other_meta.kind_meta.update(meta.kind_meta)
        other_meta.attributes.update(meta.attributes)
        other_meta.save()
        return other_meta

    def _data_promotion(self, name, other, asname=None, get=None, put=None, **kwargs):
        """ promote data

        This is called by PromotionMixin.promote() to promote an object's data
        to another store. If source.get() returns an iterator (or a list, as for
        native python data), target.put() will be called for every item.
        Any other object is put as a whole.

        Returns:
            Metadata of the object in other, or None if nothing was stored
        """
        from collections.abc import Iterator
        asname = asname or name
        get_kwargs = get or kwargs
        put_kwargs = put or kwargs
        to_copy = self.get(name, **get_kwargs)
        # only split real iterators (e.g. chunked readers) and lists; other
        # iterables such as DataFrames, dicts or strings iterate over labels,
        # keys or characters and must be put as a whole
        if isinstance(to_copy, (Iterator, list)):
            for chunk in to_copy:
                other.put(chunk, asname, **put_kwargs)
        else:
            other.put(to_copy, asname, **put_kwargs)
        other_meta = other.metadata(asname)
        return other_meta.save() if other_meta is not None else None

    def _export_promotion(self, name, other, asname=None, get=None, put=None, drop=False,
                          **kwargs):
        """ promote as export/import

        This is called by PromotionMixin.promote() to promote an object's data
        to another store in the way that the `om runtime export/import` commands do.
        This includes exporting all data and metadata associated with an object.

        Args:
            name (str): the name of the object in this store
            other (OmegaStore): the store to promote to
            asname (str): the name in other, defaults to name
            get (dict): kwargs for self.to_archive(), defaults to kwargs
            put (dict): kwargs for other.from_archive(), defaults to kwargs
            drop (bool): if True, calls other.drop(asname, force=True) before import

        Returns:
            the result of other.from_archive()
        """
        asname = asname or name
        get_kwargs = get or kwargs
        put_kwargs = put or kwargs
        arc = self.to_archive(name, Path(self.tmppath) / asname, **get_kwargs)
        # drop only once the archive exists, so a failed export leaves other untouched
        if drop:
            other.drop(asname, force=True)
        other_meta = other.from_archive(arc, asname, **put_kwargs)
        return other_meta