Source code for omegaml.backends.rawdict

 1from omegaml.backends.basedata import BaseDataBackend
 2from omegaml.mdataframe import MDataFrame
 3from omegaml.util import json_normalize, PickableCollection
 4from pymongo.collection import Collection
 5
 6
class PandasRawDictBackend(BaseDataBackend):
    """
    OmegaStore backend to support arbitrary collections

    Usage::

        # store any collection as part of metadata
        coll = db['some_collection']
        om.datasets.put(coll, 'foo')
        => Metadata(name='foo', collection='some_collection', ...)
        # parse the collection using pandas.io.json_normalize
        df = om.datasets.get('foo')
        # use an alternate parser that accepts dict|list(dict)
        df = om.datasets.get('foo', parser=some_fn)
        # get a MDataFrame
        om.datasets.getl('foo')
        # preserve all document keys, including _id
        om.datasets.getl('foo', raw=True)
    """
    KIND = 'pandas.rawdict'

    @classmethod
    def supports(cls, obj, name, as_raw=None, data_store=None, **kwargs):
        """
        Decide whether this backend handles ``obj`` / ``name``.

        The backend applies in any of three situations:

        * ``as_raw`` is truthy and ``obj`` is a dict, list or tuple
        * ``obj`` is a ``Collection`` or ``PickableCollection``
        * neither of the above, ``name`` is truthy and
          ``data_store.metadata(name)`` returns something other than None

        Args:
            obj: the object about to be stored or looked up
            name: the dataset name
            as_raw: if truthy, plain dict/list/tuple objects are accepted
            data_store: the store queried for existing metadata; when it is
                None the "already exists" check is skipped
            **kwargs: accepted for interface compatibility, not used here

        Returns:
            bool: True if this backend supports the request
        """
        is_raw_data = bool(as_raw) and isinstance(obj, (dict, list, tuple))
        is_collection = isinstance(obj, (Collection, PickableCollection))
        if is_raw_data or is_collection:
            return True
        # Only consult the store when there is a name to look up and a store
        # to ask; data_store defaults to None, which must not raise here.
        if not name or data_store is None:
            return False
        return data_store.metadata(name) is not None

    def get(self, name, version=-1, lazy=False, raw=False, parser=None, filter=None, **kwargs):
        """
        Retrieve the collection stored under ``name``.

        Args:
            name: the dataset name, resolved via ``self.data_store.collection``
            version: not used by this method
            lazy: if True return the MDataFrame itself, otherwise its
                ``.value``
            raw: forwarded to MDataFrame as ``raw``
            parser: callable forwarded to MDataFrame as ``parser``; defaults
                to a function that materializes its input into a list and
                passes it to ``json_normalize``
            filter: query forwarded to MDataFrame; if falsy, ``kwargs`` is
                used as the query instead
            **kwargs: used as the query when ``filter`` is falsy, and always
                forwarded to MDataFrame as keyword arguments

        Returns:
            the MDataFrame if ``lazy`` is truthy, else ``MDataFrame.value``
        """
        collection = self.data_store.collection(name)

        def normalize_documents(docs):
            # json_normalize needs a list of dicts to work, not a generator
            return json_normalize(list(docs))

        parser = parser or normalize_documents
        query = filter or kwargs
        mdf = MDataFrame(collection, query=query, parser=parser, raw=raw, **kwargs)
        return mdf if lazy else mdf.value

    def put(self, obj, name, attributes=None, as_raw=None, **kwargs):
        """
        Store ``obj`` under ``name`` and record it as metadata.

        Args:
            obj: one of

                * a ``Collection`` / ``PickableCollection``: registered as is
                * a dict: inserted as a single document
                * a list, tuple or any object with ``__iter__``: inserted as
                  multiple documents
            name: the dataset name
            attributes: passed to the metadata as ``attributes``
            as_raw: not used by this method
            **kwargs: only ``meta_kwargs`` (a dict) is read; its items are
                passed on as keyword arguments when creating the metadata

        Returns:
            the result of ``meta.save()``

        Raises:
            ValueError: if ``obj`` is none of the supported kinds
        """
        if isinstance(obj, (Collection, PickableCollection)):
            # already a collection, import it to metadata
            collection = obj
        elif isinstance(obj, dict):
            # actual data, a single document, just insert
            collection = self.data_store.collection(name)
            collection.insert_one(obj)
        elif isinstance(obj, (list, tuple)) or hasattr(obj, '__iter__'):
            # actual data, multiple documents, insert many
            # NOTE(review): str/bytes also define __iter__ and therefore
            # reach insert_many -- confirm whether they should be rejected.
            collection = self.data_store.collection(name)
            collection.insert_many(obj)
        else:
            raise ValueError(f'cannot insert object of type {type(obj)}')
        meta = self.data_store._make_metadata(name,
                                              kind=self.KIND,
                                              collection=collection.name,
                                              attributes=attributes,
                                              **kwargs.get('meta_kwargs', {}))
        return meta.save()