1from omegaml.backends.basedata import BaseDataBackend
2from omegaml.mdataframe import MDataFrame
3from omegaml.util import json_normalize, PickableCollection
4from pymongo.collection import Collection
5
6
[docs]
class PandasRawDictBackend(BaseDataBackend):
    """
    OmegaStore backend to support arbitrary collections

    Registers an existing MongoDB collection, or inserts raw documents
    (a dict or an iterable of dicts) into the dataset's collection, and
    reads the collection back through an MDataFrame.

    Usage::

        # store any collection as part of metadata
        coll = db['some_collection']
        om.datasets.put(coll, 'foo')
        => Metadata(name='foo', collection='some_collection', ...)
        # parse the collection using json_normalize
        df = om.datasets.get('foo')
        # use an alternate parser that accepts dict|list(dict)
        df = om.datasets.get('foo', parser=some_fn)
        # get a MDataFrame
        om.datasets.getl('foo')
        # preserve all document keys, including _id
        om.datasets.getl('foo', raw=True)
    """
    KIND = 'pandas.rawdict'

    @classmethod
    def supports(cls, obj, name, as_raw=None, data_store=None, **kwargs):
        """
        Check whether this backend handles the given object or name

        Args:
            obj: the object to be stored, if any
            name (str): the dataset name
            as_raw (bool): if truthy, dict, list and tuple objects are
                accepted as raw documents
            data_store: the store whose ``metadata(name)`` is queried to
                find out whether ``name`` already exists
            **kwargs: accepted for interface compatibility, not used

        Returns:
            bool: True if ``obj`` is raw data requested with ``as_raw``,
            if ``obj`` is a collection, or if ``name`` already has
            metadata in ``data_store``; False otherwise
        """
        new_as_raw = bool(as_raw) and isinstance(obj, (dict, list, tuple))
        new_as_collection = isinstance(obj, (Collection, PickableCollection))
        if new_as_raw or new_as_collection:
            return True
        # neither raw data nor a collection: only an existing name qualifies.
        # Without a name or a data store there is nothing to look up, so
        # answer False instead of failing on data_store=None.
        if not name or data_store is None:
            return False
        return data_store.metadata(name) is not None

    def get(self, name, version=-1, lazy=False, raw=False, parser=None, filter=None, **kwargs):
        """
        Read the collection stored under name

        Args:
            name (str): the dataset name
            version (int): not used by this backend
            lazy (bool): if True return the MDataFrame itself, otherwise
                return its ``.value``
            raw (bool): passed on to MDataFrame
            parser (callable): receives the documents and returns the
                result; defaults to json_normalize applied to the list
                of documents
            filter (dict): the query passed to MDataFrame; if falsy,
                ``kwargs`` is used as the query
            **kwargs: used as the query when ``filter`` is not given and
                always forwarded to MDataFrame as keyword arguments

        Returns:
            the MDataFrame if ``lazy`` is truthy, else its ``.value``
        """
        collection = self.data_store.collection(name)

        def json_normalizer(docs):
            # json_normalize needs a list of dicts to work, not a generator
            return json_normalize(list(docs))

        parser = parser or json_normalizer
        query = filter or kwargs
        mdf = MDataFrame(collection, query=query, parser=parser, raw=raw, **kwargs)
        return mdf if lazy else mdf.value

    def put(self, obj, name, attributes=None, as_raw=None, **kwargs):
        """
        Store a collection or raw documents under name

        Args:
            obj: a collection (registered as is), a dict (inserted as a
                single document) or an iterable of documents (inserted
                with insert_many)
            name (str): the dataset name
            attributes (dict): passed to the metadata as ``attributes``
            as_raw (bool): not used by this method
            **kwargs: only ``meta_kwargs`` (dict) is read; its items are
                passed as keyword arguments when creating the metadata

        Returns:
            the result of saving the metadata

        Raises:
            ValueError: if ``obj`` is not a collection, a dict or an
                iterable of documents; str and bytes are rejected even
                though they are iterable
        """
        # text is iterable but never a sequence of documents; reject it here
        # rather than letting insert_many fail on its characters
        is_text = isinstance(obj, (str, bytes, bytearray))
        if isinstance(obj, (Collection, PickableCollection)):
            # already a collection, import it to metadata
            collection = obj
        elif isinstance(obj, dict):
            # actual data, a single document, just insert
            collection = self.data_store.collection(name)
            collection.insert_one(obj)
        elif hasattr(obj, '__iter__') and not is_text:
            # actual data, multiple documents (list, tuple or any other
            # iterable), insert many
            collection = self.data_store.collection(name)
            collection.insert_many(obj)
        else:
            raise ValueError(f'cannot insert object of type {type(obj)}')
        meta_kwargs = kwargs.get('meta_kwargs', {})
        meta = self.data_store._make_metadata(name,
                                              kind=self.KIND,
                                              collection=collection.name,
                                              attributes=attributes,
                                              **meta_kwargs)
        return meta.save()