1"""
2Native storage for OmegaML using mongodb as the storage layer
3
4An OmegaStore instance is a MongoDB database. It has at least the
5metadata collection which lists all objects stored in it. A metadata
6document refers to the following types of objects (metadata.kind):
7
8* pandas.dfrows - a Pandas DataFrame stored as a collection of rows
9* sklearn.joblib - a scikit learn estimator/pipline dumped using joblib.dump()
10* python.data - an arbitrary python dict, tuple, list stored as a document
11
12Note that storing Pandas and scikit learn objects requires the availability
13of the respective packages. If either can not be imported, the OmegaStore
14degrades to a python.data store only. It will still .list() and get() any
15object, however reverts to pure python objects. In this case it is up
16to the client to convert the data into an appropriate format for processing.
17
18Pandas and scikit-learn objects can only be stored if these packages are
19availables. put() raises a TypeError if you pass such objects and these
20modules cannot be loaded.
21
22All data are stored within the same mongodb, in per-object collections
23as follows:
24
25 * .metadata
26 all metadata. each object is one document,
27 See **omegaml.documents.Metadata** for details
28 * .<bucket>.files
29 this is the GridFS instance used to store
30 blobs (models, numpy, hdf). The actual file name
31 will be <prefix>/<name>.<ext>, where ext is
32 optionally generated by put() / get().
33 * .<bucket>.<prefix>.<name>.data
34 every other dataset is stored in a separate
35 collection (dataframes, dicts, lists, tuples).
36 Any forward slash in prefix is ignored (e.g. 'data/'
37 becomes 'data')
38
39 DataFrames by default are stored in their own collection, every
40 row becomes a document. To store dataframes as a binary file,
41 use `put(...., as_hdf=True).` `.get()` will always return a dataframe.
42
43 Python dicts, lists, tuples are stored as a single document with
44 a `.data` attribute holding the JSON-converted representation. `.get()`
45 will always return the corresponding python object of .data.
46
47 Models are joblib.dump()'ed and ziped prior to transferring into
48 GridFs. .get() will always unzip and joblib.load() before returning
49 the model. Note this requires that the process using .get() supports
50 joblib as well as all python classes referred to. If joblib is not
51 supported, .get() returns a file-like object.
52
53 The .metadata entry specifies the format used to store each
54 object as well as it's location:
55
56 * metadata.kind
57 the type of object
58 * metadata.name
59 the name of the object, as given on put()
60 * metadata.gridfile
61 the gridfs object (if any, null otherwise)
62 * metadata.collection
63 the name of the collection
64 * metadata.attributes
65 arbitrary custom attributes set in
66 put(attributes=obj). This is used e.g. by
67 OmegaRuntime's fit() method to record the
68 data used in the model's training.
69
70 **.put()** and **.get()** use helper methods specific to the type in
71 object's type and metadata.kind, respectively. In the future
72 a plugin system will enable extension to other types.
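
    For illustration, a minimal usage sketch (object names are made up;
    access is via the omegaml API)::

        import omegaml as om

        # store a dataframe row-wise; every row becomes a document
        om.datasets.put(df, 'mydata')
        df = om.datasets.get('mydata')

        # store a python dict as a single document
        om.datasets.put({'a': 1}, 'myparams')
        params = om.datasets.get('myparams')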
73"""
from __future__ import absolute_import

import bson
import gridfs
import logging
import os
import shutil
import tempfile
import warnings
import weakref
from datetime import datetime
from mongoengine.connection import disconnect, \
    connect, _connections, get_db
from mongoengine.errors import DoesNotExist
from mongoengine.fields import GridFSProxy
from mongoengine.queryset.visitor import Q
from omegaml.store.fastinsert import fast_insert, default_chunksize
from omegaml.util import unravel_index, restore_index, make_tuple, jsonescape, \
    cursor_to_dataframe, convert_dtypes, load_class, extend_instance, ensure_index, PickableCollection, \
    mongo_compatible, signature
from uuid import uuid4

from .queryops import sanitize_filter
from ..documents import make_Metadata, MDREGISTRY
from ..mongoshim import sanitize_mongo_kwargs, waitForConnection
from ..util import (is_estimator, is_dataframe, is_ndarray, is_spark_mllib,
                    settings as omega_settings, urlparse, is_series)

logger = logging.getLogger(__name__)


class OmegaStore(object):
    """
    The storage backend for models and data
    """

    def __init__(self, mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None):
        """
        :param mongo_url: the mongo url to use for the gridfs
        :param bucket: the mongo collection to use for gridfs
        :param prefix: the path prefix for files. defaults to blank
        :param kind: the kind or list of kinds to limit this store to
        :param defaults: the settings to use, defaults to omegaml.util.settings()
        :param dbalias: the mongoengine connection alias to use
        """
        self.defaults = defaults or omega_settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket or self.defaults.OMEGA_MONGO_COLLECTION
        self._fs_collection = self._ensure_fs_collection()
        self._fs = None
        self._tmppath = os.path.join(self.defaults.OMEGA_TMP, uuid4().hex)
        self.prefix = prefix or ''
        self.force_kind = kind
        self._Metadata_cls = None
        # don't initialize db here to avoid using the default settings
        # otherwise Metadata will already have a connection and not use
        # the one provided in override_settings
        self._db = None
        self._dbalias = dbalias
        # add backends and mixins
        self._apply_mixins()
        # register backends
        self.register_backends()
        # register finalizer for cleanup
        weakref.finalize(self, self._cleanup, repr(self), tmppath=str(self._tmppath))

    def __repr__(self):
        return 'OmegaStore(bucket={}, prefix={})'.format(self.bucket, self.prefix)

    def __eq__(self, other):
        """test for equality of OmegaStore instances

        Args:
            other: OmegaStore instance

        Returns:
            True if other is the same database, same bucket, same prefix
        """
        return self.mongo_url == other.mongo_url and self.bucket == other.bucket and self.prefix == other.prefix

    # defining __eq__ disables the default __hash__; restore it explicitly
    __hash__ = object.__hash__

    @property
    def tmppath(self):
        """
        return an instance-specific temporary path
        """
        os.makedirs(self._tmppath, exist_ok=True)
        return self._tmppath

    @property
    def mongodb(self):
        """
        Returns a mongo database object
        """
        if self._db is not None:
            return self._db
        # parse salient parts of mongo url, e.g.
        # mongodb://user:password@host/dbname
        self.parsed_url = urlparse.urlparse(self.mongo_url)
        self.database_name = self.parsed_url.path[1:]
        host = self.parsed_url.netloc
        scheme = self.parsed_url.scheme
        username, password = None, None
        if '@' in host:
            creds, host = host.split('@', 1)
            if ':' in creds:
                # split only on the first ':' so passwords may contain ':'
                username, password = creds.split(':', 1)
        # connect via mongoengine
        #
        # note this uses a MongoClient in the background, with pooled
        # connections. there are multiprocessing issues with pymongo:
        # http://api.mongodb.org/python/3.2/faq.html#using-pymongo-with-multiprocessing
        # connect=False is due to https://jira.mongodb.org/browse/PYTHON-961
        # this defers connecting until the first access
        # serverSelectionTimeoutMS=2500 is to fail fast, the default is 30000
        #
        # use an instance specific alias, note that access to Metadata and
        # QueryCache must pass the very same alias
        self._dbalias = alias = self._dbalias or 'omega-{}'.format(uuid4().hex)
        # disconnect the alias only if it is not registered yet; disconnecting
        # a live alias would make mongoengine forget its connection settings
        if alias not in _connections:
            disconnect(alias)
        connection = connect(alias=alias, db=self.database_name,
                             host=f'{scheme}://{host}',
                             username=username,
                             password=password,
                             connect=False,
                             authentication_source='admin',
                             **sanitize_mongo_kwargs(self.defaults.OMEGA_MONGO_SSL_KWARGS),
                             )
        # since PyMongo 4, connect() no longer waits for the connection
        waitForConnection(connection)
        self._db = get_db(alias)
        return self._db

    @property
    def _Metadata(self):
        if self._Metadata_cls is None:
            # access mongodb first so the connection and db alias exist
            db = self.mongodb  # noqa - connection side effect sets self._dbalias
            self._Metadata_cls = make_Metadata(db_alias=self._dbalias,
                                               collection=self._fs_collection)
        return self._Metadata_cls

    @property
    def fs(self):
        """
        Retrieve a gridfs instance using url and collection provided

        :return: a gridfs instance
        """
        if self._fs is not None:
            return self._fs
        self._fs = gridfs.GridFS(self.mongodb, collection=self._fs_collection)
        self._ensure_fs_index(self._fs)
        return self._fs
    def _make_metadata(self, name=None, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some: value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
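
        Example (a sketch; assumes a metadata object 'x' exists with
        attributes={'a': 1})::

            meta = store._make_metadata(name='x', attributes={'b': 2})
            # meta.attributes == {'a': 1, 'b': 2}
            meta = store._make_metadata(name='x', attributes={})
            # meta.attributes == {}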
        """
        bucket = bucket or self.bucket
        prefix = prefix or self.prefix
        meta = self.metadata(name=name,
                             prefix=prefix,
                             bucket=bucket)
        if meta:
            dict_fields = 'attributes', 'kind_meta'
            for k, v in kwargs.items():
                if k in dict_fields and v is not None and len(v) > 0:
                    previous = getattr(meta, k, {})
                    previous.update(v)
                    setattr(meta, k, previous)
                elif k in dict_fields and v is not None and len(v) == 0:
                    setattr(meta, k, {})
                elif k in dict_fields and v is None:
                    # ignore unspecified dict attributes
                    continue
                else:
                    # by default set whatever attribute is provided
                    setattr(meta, k, v)
        else:
            meta = self._Metadata(name=name, bucket=bucket, prefix=prefix,
                                  **kwargs)
        return meta

    def _drop_metadata(self, name=None, **kwargs):
        # internal method to delete the metadata of an object
        meta = self.metadata(name, **kwargs)
        if meta is not None:
            meta.delete()
    def collection(self, name=None, bucket=None, prefix=None):
        """
        Returns a mongo db collection as a datastore

        If there is an existing object of name, will return the .collection
        of the object. Otherwise returns the collection according to naming
        convention {bucket}.{prefix}.{name}.datastore

        :param name: the collection to use. if none defaults to the
            collection name given on instantiation. the actual collection name
            used is always prefix + name + '.datastore'
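
        Example::

            # 'mydata' is an arbitrary object name
            coll = store.collection('mydata')  # a pymongo-compatible collection
            coll.count_documents({})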
        """
        # see if we have a known object and a collection for that, if not define it
        meta = self.metadata(name, bucket=bucket, prefix=prefix)
        collection = meta.collection if meta else None
        if not collection:
            collection = self.object_store_key(name, '.datastore')
            collection = collection.replace('..', '.')
        # return the collection
        datastore = getattr(self.mongodb, collection)
        return PickableCollection(datastore)

    def _apply_mixins(self):
        """
        apply mixins in defaults.OMEGA_STORE_MIXINS
        """
        for mixin in self.defaults.OMEGA_STORE_MIXINS:
            conditional = self._mixins_conditional
            extend_instance(self, mixin,
                            conditional=conditional)

    def _mixins_conditional(self, cls, obj):
        return cls.supports(obj) if hasattr(cls, 'supports') else True

    def register_backends(self):
        """
        register backends in defaults.OMEGA_STORE_BACKENDS
        """
        for kind, backend in self.defaults.OMEGA_STORE_BACKENDS.items():
            self.register_backend(kind, backend)

    def register_backend(self, kind, backend):
        """
        register a backend class

        :param kind: (str) the backend kind
        :param backend: (class|str) the backend class or its dotted path
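
        Example (kind and class path are illustrative)::

            store.register_backend('python.package',
                                   'omegaml.backends.package.PythonPackageData')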
        """
        self.defaults.OMEGA_STORE_BACKENDS[kind] = load_class(backend)
        if kind not in MDREGISTRY.KINDS:
            MDREGISTRY.KINDS.append(kind)
        return self

    def register_mixin(self, mixincls):
        """
        register a mixin class

        :param mixincls: (class) the mixin class
        """
        extend_instance(self, mixincls)
        return self

    def put(self, obj, name, attributes=None, kind=None, replace=False, model_store=None,
            data_store=None, **kwargs):
        """
        Store an object; supports estimators, pipelines, numpy arrays,
        pandas dataframes and python primitives
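
        Example::

            store.put(df, 'mydata')          # DataFrame stored row-wise
            store.put(clf, 'mymodel')        # sklearn estimator via joblib
            store.put({'a': 1}, 'myparams')  # python dict as a document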
        """
        if replace:
            self.drop(name, force=True)
        backend = self.get_backend_byobj(obj, name, attributes=attributes, kind=kind,
                                         model_store=model_store, data_store=data_store,
                                         **kwargs)
        if backend:
            return backend.put(obj, name, attributes=attributes, **kwargs)
        # TODO move all of the specifics to backend implementations
        if is_estimator(obj):
            backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
            return backend.put(obj, name, attributes=attributes, **kwargs)
        elif is_spark_mllib(obj):
            backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
            return backend.put(obj, name, attributes=attributes, **kwargs)
        elif is_dataframe(obj) or is_series(obj):
            groupby = kwargs.get('groupby')
            if obj.empty:
                warnings.warn('Provided dataframe is empty, nothing is stored')
                return None
            if kwargs.pop('as_hdf', False):
                return self.put_dataframe_as_hdf(
                    obj, name, attributes, **kwargs)
            elif groupby:
                return self.put_dataframe_as_dfgroup(
                    obj, name, groupby, attributes)
            append = kwargs.pop('append', None)
            timestamp = kwargs.pop('timestamp', None)
            index = kwargs.pop('index', None)
            chunksize = kwargs.pop('chunksize', default_chunksize)
            return self.put_dataframe_as_documents(
                obj, name, append=append, attributes=attributes, index=index,
                timestamp=timestamp, chunksize=chunksize, **kwargs)
        elif is_ndarray(obj):
            if kwargs.pop('as_pydata', False):
                return self.put_pyobj_as_document(obj.tolist(), name,
                                                  attributes=attributes, **kwargs)
            return self.put_ndarray_as_hdf(obj, name, attributes=attributes,
                                           **kwargs)
        elif isinstance(obj, (dict, list, tuple)):
            kwargs.pop('as_pydata', None)
            if kwargs.pop('as_hdf', False):
                return self.put_pyobj_as_hdf(obj, name,
                                             attributes=attributes, **kwargs)
            return self.put_pyobj_as_document(obj, name,
                                              attributes=attributes,
                                              **kwargs)
        else:
            raise TypeError('type %s not supported' % type(obj))

    def put_dataframe_as_documents(self, obj, name, append=None,
                                   attributes=None, index=None,
                                   timestamp=None, chunksize=None,
                                   ensure_compat=True, _fast_insert=fast_insert,
                                   **kwargs):
        """
        store a dataframe as a row-wise collection of documents

        :param obj: the dataframe to store
        :param name: the name of the item in the store
        :param append: if False collection will be dropped before inserting,
            if True existing documents will persist. Defaults to True. If not
            specified and rows have been previously inserted, will issue a
            warning.
        :param index: list of columns, using +, -, @ as a column prefix to
            specify ASCENDING, DESCENDING, GEOSPHERE respectively. For @ the
            column has to represent a valid GeoJSON object.
        :param timestamp: if True or a field name adds a timestamp. If the
            value is a boolean or datetime, uses _created as the field name.
            The timestamp is always datetime.datetime.utcnow(). May be overridden
            by specifying the tuple (col, datetime).
        :param ensure_compat: if True attempt to convert obj to mongodb compatibility,
            set to False only if you are sure to have only compatible values in dataframe.
            defaults to True. False may reduce memory and increase speed on large dataframes.
        :return: the Metadata object created
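
        Example::

            # descending index on 'ts', timestamp in column '_updated'
            store.put_dataframe_as_documents(df, 'mydf', index=['-ts'],
                                             timestamp='_updated')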
        """
        import pandas as pd
        from .queryops import MongoQueryOps
        collection = self.collection(name)
        if is_series(obj):
            obj = pd.DataFrame(obj, index=obj.index, columns=[str(obj.name)])
            store_series = True
        else:
            store_series = False
        if append is False:
            self.drop(name, force=True)
        elif append is None and collection.count_documents({}, limit=1):
            warnings.warn('%s already exists, will append rows' % name)
        if index:
            # get index keys
            if isinstance(index, dict):
                idx_kwargs = index
                index = index.pop('columns')
            else:
                idx_kwargs = {}
            # create index with appropriate options
            keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
            ensure_index(collection, keys, **idx_kwargs)
        if timestamp:
            dt = datetime.utcnow()
            if isinstance(timestamp, bool):
                col = '_created'
            elif isinstance(timestamp, str):
                col = timestamp
            elif isinstance(timestamp, datetime):
                col, dt = '_created', timestamp
            elif isinstance(timestamp, tuple):
                col, dt = timestamp
            else:
                col = '_created'
            obj[col] = dt
        # store dataframe indices
        # FIXME this may be a performance issue, use size stored on stats or metadata
        row_count = self.collection(name).estimated_document_count()
        # fixes #466, ensure column names are strings in a multiindex
        if isinstance(obj.columns, pd.MultiIndex):
            obj.columns = obj.columns.map('_'.join)
        obj, idx_meta = unravel_index(obj, row_count=row_count)
        stored_columns = [jsonescape(col) for col in obj.columns]
        column_map = list(zip(obj.columns, stored_columns))
        d_column_map = dict(column_map)
        dtypes = {
            d_column_map.get(k): v.name
            for k, v in obj.dtypes.items()
        }
        kind_meta = {
            'columns': column_map,
            'dtypes': dtypes,
            'idx_meta': idx_meta
        }
        # ensure column names to be strings
        obj.columns = stored_columns
        # create mongo indices for dataframe index columns
        df_idxcols = [col for col in obj.columns if col.startswith('_idx#')]
        if df_idxcols:
            keys, idx_kwargs = MongoQueryOps().make_index(df_idxcols)
            ensure_index(collection, keys, **idx_kwargs)
        # create index on row id
        keys, idx_kwargs = MongoQueryOps().make_index(['_om#rowid'])
        ensure_index(collection, keys, **idx_kwargs)
        # bulk insert
        # -- get native objects
        # -- seems to be required since pymongo 3.3.x. if not converted
        #    pymongo raises Cannot Encode object for int64 types
        if ensure_compat:
            for col, col_dtype in dtypes.items():
                if 'datetime' in col_dtype:
                    obj[col].fillna('', inplace=True)
            obj = obj.astype('O', errors='ignore')
        _fast_insert(obj, self, name, chunksize=chunksize)
        kind = (MDREGISTRY.PANDAS_SEROWS
                if store_series
                else MDREGISTRY.PANDAS_DFROWS)
        meta = self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=kind,
                                   kind_meta=kind_meta,
                                   attributes=attributes,
                                   collection=collection.name)
        return meta.save()

    def put_dataframe_as_dfgroup(self, obj, name, groupby, attributes=None):
        """
        store a dataframe grouped by columns in a mongo document

        :Example:

            > # each group
            > {
            >     # group keys
            >     key: val,
            >     _data: [
            >         # only data keys
            >         { key: val, ... }
            >     ]}

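        Usage::

            # groups rows by 'region'; one document per group
            store.put_dataframe_as_dfgroup(df, 'sales_by_region', ['region'])
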
        """

        def row_to_doc(obj):
            for gval, gdf in obj.groupby(groupby):
                if hasattr(gval, 'astype'):
                    gval = make_tuple(gval.astype('O'))
                else:
                    gval = make_tuple(gval)
                doc = dict(zip(groupby, gval))
                datacols = list(set(gdf.columns) - set(groupby))
                doc['_data'] = gdf[datacols].astype('O').to_dict('records')
                yield doc

        datastore = self.collection(name)
        datastore.drop()
        datastore.insert_many(row_to_doc(obj))
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_DFGROUP,
                                   attributes=attributes,
                                   collection=datastore.name).save()

    def put_dataframe_as_hdf(self, obj, name, attributes=None, **kwargs):
        filename = self.object_store_key(name, '.hdf')
        hdffname = self._package_dataframe2hdf(obj, filename)
        with open(hdffname, 'rb') as fhdf:
            fileid = self.fs.put(fhdf, filename=filename)
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_HDF,
                                   attributes=attributes,
                                   gridfile=GridFSProxy(db_alias=self._dbalias,
                                                        grid_id=fileid)).save()

    def put_ndarray_as_hdf(self, obj, name, attributes=None, **kwargs):
        """ store numpy array as hdf

        this is a hack, converting the array to a dataframe and then
        storing it
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)

    def put_pyobj_as_hdf(self, obj, name, attributes=None, **kwargs):
        """
        store list, tuple, dict as hdf

        this requires the list, tuple or dict to be convertible into
        a dataframe
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)

    def put_pyobj_as_document(self, obj, name, attributes=None, append=True, index=None, as_many=None, **kwargs):
        """
        store a python object as a document

        similar to put_dataframe_as_documents, no data is replaced by
        default. that is, obj is appended as new documents into the object's
        mongo collection. to replace the data, specify append=False.
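
        Example::

            store.put_pyobj_as_document({'a': 1}, 'mydict')
            # a list of lists is inserted as many documents
            store.put_pyobj_as_document([[1, 2], [3, 4]], 'rows')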
        """
        collection = self.collection(name)
        if append is False:
            collection.drop()
        elif append is None and collection.count_documents({}, limit=1):
            warnings.warn('%s already exists, will append rows' % name)
        if index:
            # create index with appropriate options
            from omegaml.store import MongoQueryOps
            if isinstance(index, dict):
                idx_kwargs = index
                index = index.pop('columns')
            else:
                idx_kwargs = {}
            index = [f'data.{c}' for c in index]
            keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
            ensure_index(collection, keys, **idx_kwargs)
        if as_many is None:
            as_many = isinstance(obj, (list, tuple)) and isinstance(obj[0], (list, tuple))
        if as_many:
            # lists of lists are inserted as many documents, as in pymongo < 4
            records = (mongo_compatible({'data': item}) for item in obj)
            result = collection.insert_many(records)
            objid = result.inserted_ids[-1]
        else:
            result = collection.insert_one(mongo_compatible({'data': obj}))
            objid = result.inserted_id

        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PYTHON_DATA,
                                   collection=collection.name,
                                   attributes=attributes,
                                   objid=objid).save()

    def drop(self, name, force=False, version=-1, report=False, **kwargs):
        """
        Drop the object

        :param name: The name of the object. If the name is a pattern it will
            be expanded using .list(), calling .drop() on every object found.
        :param force: If True ignores DoesNotExist exception, defaults to False
            meaning this raises a DoesNotExist exception if the name does not
            exist
        :param report: if True returns a dict name=>status, where status is True
            if deleted, False if not deleted
        :return: True if object was deleted, False if not.
            If force is True and the object does not exist it will still return True
        :raises: DoesNotExist if the object does not exist and ``force=False``
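
        Example::

            store.drop('mydata', force=True)
            store.drop('data/*', report=True)  # e.g. {'data/x': True, ...}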
        """
        is_pattern = '*' in name
        objs = [name] if not is_pattern else self.list(name)
        results = []
        for name in objs:
            try:
                backend = self.get_backend(name)
                if backend is not None:
                    result = backend.drop(name, force=force, version=version, **kwargs)
                else:
                    result = self._drop(name, force=force, version=version)
            except Exception:
                results.append((name, False))
                if not is_pattern:
                    raise
            else:
                results.append((name, result))
        if not objs:
            result = self._drop(name, force=force, version=version)
            results.append((name, result))
        return dict(results) if report else len(results) > 0

    def _drop(self, name, force=False, version=-1, keep_data=False, **kwargs):
        meta = self.metadata(name, version=version)
        if meta is None and not force:
            raise DoesNotExist(name)
        collection = self.collection(name)
        if collection is not None and not keep_data:
            self.mongodb.drop_collection(collection.name)
        if meta:
            if meta.collection and not keep_data:
                self.mongodb.drop_collection(meta.collection)
            if meta and meta.gridfile is not None and not keep_data:
                meta.gridfile.delete()
            self._drop_metadata(name)
            return True
        return False

    def get_backend_bykind(self, kind, model_store=None, data_store=None,
                           **kwargs):
        """
        return the backend by a given object kind

        :param kind: The object kind
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        """
        try:
            backend_cls = load_class(self.defaults.OMEGA_STORE_BACKENDS[kind])
        except KeyError as e:
            raise ValueError('backend {kind} does not exist'.format(**locals())) from e
        model_store = model_store or self
        data_store = data_store or self
        backend = backend_cls(model_store=model_store,
                              data_store=data_store, **kwargs)
        return backend

    def get_backend(self, name, model_store=None, data_store=None, **kwargs):
        """
        return the backend by a given object name

        :param name: The name of the object
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend, or None if the object's kind has no registered
            backend
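
        Example::

            backend = store.get_backend('mymodel')
            if backend is not None:
                model = backend.get('mymodel')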
        """
        meta = self.metadata(name)
        if meta is not None and meta.kind in self.defaults.OMEGA_STORE_BACKENDS:
            return self.get_backend_bykind(meta.kind,
                                           model_store=model_store,
                                           data_store=data_store,
                                           **kwargs)
        return None

    def help(self, name_or_obj=None, kind=None, raw=False, display=None, renderer=None):
        """ get help for an object by looking up its backend and calling help() on it

        Retrieves the object's metadata and looks up its corresponding backend. If the
        metadata.attributes['docs'] is a string it will display this as the help() contents.
        If the string starts with 'http://' or 'https://' it will open the web page.

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False
            display (fn): optional, callable for interactive display, defaults to help
                if sys.flags.interactive is True, else uses pydoc.render_doc with plaintext
            renderer (fn): optional, the renderer= argument for pydoc.render_doc to use if
                sys.flags.interactive is False and display is not provided

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is not in interactive mode
        """
        import sys
        import pydoc
        interactive = bool(display) if display is not None else sys.flags.interactive
        display = display or help
        renderer = renderer or pydoc.plaintext
        obj = self._resolve_help_backend(name_or_obj=name_or_obj, kind=kind, raw=raw)
        if any(str(obj.__doc__).startswith(v) for v in ('https://', 'http://')):
            obj = obj.__doc__
            if interactive and display is help:
                import webbrowser
                display = webbrowser.open
        return display(obj) if interactive else pydoc.render_doc(obj, renderer=renderer)

    def _resolve_help_backend(self, name_or_obj=None, kind=None, raw=False):
        # helper so we can test help
        meta = self.metadata(name_or_obj) if name_or_obj else None
        if kind:
            backend = self.get_backend_bykind(kind)
        else:
            backend = self.get_backend(name_or_obj) or self.get_backend_byobj(name_or_obj)
            if backend is None:
                backend = self
        if not raw and meta is not None and 'docs' in meta.attributes:
            def UserDocumentation():
                pass

            basedoc = backend.__doc__ or ''
            UserDocumentation.__doc__ = (basedoc +
                                         meta.attributes['docs'])
            backend = UserDocumentation
        return backend

    def get_backend_byobj(self, obj, name=None, kind=None, attributes=None,
                          model_store=None, data_store=None, **kwargs):
        """
        return the matching backend for the given obj

        Returns:
            the first backend that supports the given parameters or None
        """
        model_store = model_store or self
        data_store = data_store or self
        meta = self.metadata(name) if name else None
        kind = kind or (meta.kind if meta is not None else None)
        backend = None
        if kind:
            objtype = str(type(obj))
            if kind in self.defaults.OMEGA_STORE_BACKENDS:
                backend = self.get_backend_bykind(kind, data_store=data_store, model_store=model_store)
                if not backend.supports(obj, name, attributes=attributes,
                                        data_store=data_store,
                                        model_store=model_store,
                                        kind=kind, **kwargs):
                    warnings.warn('Backend {kind} does not support {objtype}'.format(**locals()))
            else:
                pass
                # TODO refactor pandas and numpy handling into proper backends to avoid misleading warning
                # warnings.warn('Backend {kind} not found {objtype}. Reverting to default'.format(**locals()))
        else:
            for backend_kind, backend_cls in self.defaults.OMEGA_STORE_BACKENDS.items():
                backend = self.get_backend_bykind(backend_kind, data_store=data_store, model_store=model_store)
                if backend.supports(obj, name, attributes=attributes,
                                    data_store=data_store, model_store=model_store,
                                    **kwargs):
                    break
            else:
                backend = None
        return backend

    def getl(self, *args, **kwargs):
        """ return a lazy MDataFrame for a given object

        Same as .get, but returns an MDataFrame

        """
        return self.get(*args, lazy=True, **kwargs)

    def get(self, name, version=-1, force_python=False,
            kind=None, model_store=None, data_store=None, **kwargs):
        """
        Retrieve an object

        :param name: The name of the object
        :param version: Version of the stored object (not supported)
        :param force_python: Return as a python object
        :param kwargs: kwargs depending on object kind
        :return: an object, estimator, pipeline, data array or pandas dataframe
            previously stored with put()
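
        Example::

            df = store.get('mydata')              # pandas DataFrame
            mdf = store.get('mydata', lazy=True)  # lazy MDataFrame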
        """
        meta = self.metadata(name, version=version)
        if meta is None:
            return None
        if not force_python:
            backend = (self.get_backend(name, model_store=model_store,
                                        data_store=data_store)
                       if not kind else self.get_backend_bykind(kind,
                                                                model_store=model_store,
                                                                data_store=data_store))
            if backend is not None:
                # FIXME: some backends need to get model_store, data_store, but fails tests
                return backend.get(name, **kwargs)  # model_store=model_store, data_store=data_store, **kwargs)
            if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
                backend = self.get_backend(name)
                return backend.get_model(name)
            elif meta.kind == MDREGISTRY.SPARK_MLLIB:
                backend = self.get_backend(name)
                return backend.get_model(name, version)
            elif meta.kind == MDREGISTRY.PANDAS_DFROWS:
                return self.get_dataframe_documents(name, version=version,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_SEROWS:
                return self.get_dataframe_documents(name, version=version,
                                                    is_series=True,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_DFGROUP:
                return self.get_dataframe_dfgroup(
                    name, version=version, **kwargs)
            elif meta.kind == MDREGISTRY.PYTHON_DATA:
                return self.get_python_data(name, version=version, **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_HDF:
                return self.get_dataframe_hdf(name, version=version)
        return self.get_object_as_python(meta, version=version)

    def get_dataframe_documents(self, name, columns=None, lazy=False,
                                filter=None, version=-1, is_series=False,
                                chunksize=None, sanitize=True, trusted=None,
                                **kwargs):
        """
        Internal method to return DataFrame from documents

        :param name: the name of the object (str)
        :param columns: the column projection as a list of column names
        :param lazy: if True returns a lazy representation as an MDataFrame.
            If False retrieves all data and returns a DataFrame (default)
        :param filter: the filter to be applied as a column__op=value dict
        :param sanitize: sanitize filter by removing all $op filter keys,
            defaults to True. Specify False to allow $op filter keys. $where
            is always removed as it is considered unsafe.
        :param version: the version to retrieve (not supported)
        :param is_series: if True returns a Series instead of a DataFrame
        :param kwargs: remaining kwargs are used as the filter. The filter kwarg
            overrides other kwargs.
        :return: the retrieved object (DataFrame, Series or MDataFrame)
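
        Example::

            df = store.get_dataframe_documents('mydf', columns=['a', 'b'],
                                               a__gt=5)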

        """
        from omegaml.store.queryops import sanitize_filter
        from omegaml.store.filtered import FilteredCollection

        collection = self.collection(name)
        meta = self.metadata(name)
        filter = filter or kwargs
        filter = sanitize_filter(filter, no_ops=sanitize, trusted=trusted)
        if lazy or chunksize:
            from ..mdataframe import MDataFrame
            df = MDataFrame(collection,
                            metadata=meta.kind_meta,
                            columns=columns).query(**filter)
            if is_series:
                df = df[0]
            if chunksize is not None and chunksize > 0:
                return df.iterchunks(chunksize=chunksize)
        else:
            # TODO ensure the same processing is applied in MDataFrame
            # TODO this method should always use an MDataFrame disregarding lazy
            if filter:
                from .query import Filter
                query = Filter(collection, **filter).query
                cursor = FilteredCollection(collection).find(filter=query, projection=columns)
            else:
                cursor = FilteredCollection(collection).find(projection=columns)
            # restore dataframe
            df = cursor_to_dataframe(cursor)
            if '_id' in df.columns:
                del df['_id']
            if hasattr(meta, 'kind_meta'):
                df = convert_dtypes(df, meta.kind_meta.get('dtypes', {}))
            # -- restore columns
            meta_columns = dict(meta.kind_meta.get('columns'))
            if meta_columns:
                # apply projection, if any
                if columns:
                    # get only projected columns
                    # meta_columns is {original_column: stored_column}
                    orig_columns = dict({k: v for k, v in meta_columns.items()
                                         if k in columns or v in columns})
                else:
                    # restore columns to original names
                    orig_columns = meta_columns
                df.rename(columns=orig_columns, inplace=True)
            # -- restore indexes
            idx_meta = meta.kind_meta.get('idx_meta')
            if idx_meta:
                df = restore_index(df, idx_meta)
            # -- restore series
            if is_series:
                index = df.index
                name = df.columns[0]
                df = df[name]
                df.index = index
                df.name = None if name == 'None' else name
        return df

    def rebuild_params(self, kwargs, collection):
        """
        Returns a modified set of parameters for querying mongodb
        based on how the mongo document is structured and the
        fields the document is grouped by.

        **Note: Explicitly to be used with get_grouped_data only**

        :param kwargs: Mongo filter arguments
        :param collection: The name of the mongodb collection
        :return: Returns a set of parameters as a dictionary.
        """
        modified_params = {}
        db_structure = collection.find_one({}, {'_id': False})
        groupby_columns = list(set(db_structure.keys()) - set(['_data']))
        if kwargs is not None:
            for item in kwargs:
                if item not in groupby_columns:
                    modified_query_param = '_data.' + item
                    modified_params[modified_query_param] = kwargs.get(item)
                else:
                    modified_params[item] = kwargs.get(item)
        return modified_params

    def get_dataframe_dfgroup(self, name, version=-1, sanitize=True, kwargs=None):
        """
        Return a grouped dataframe

        :param name: the name of the object
        :param version: not supported
        :param kwargs: mongo db query arguments to be passed to
            collection.find() as a filter.
        :param sanitize: remove any $op operators in kwargs

        """
        import pandas as pd
        from omegaml.store.queryops import sanitize_filter
        from omegaml.store.filtered import FilteredCollection

        def convert_doc_to_row(cursor):
            for doc in cursor:
                data = doc.pop('_data', [])
                for row in data:
                    doc.update(row)
                    yield doc

        datastore = FilteredCollection(self.collection(name))
        kwargs = kwargs if kwargs else {}
        params = self.rebuild_params(kwargs, datastore)
        params = sanitize_filter(params, no_ops=sanitize)
        cursor = datastore.find(params, projection={'_id': False})
        df = pd.DataFrame(convert_doc_to_row(cursor))
        return df

    def get_dataframe_hdf(self, name, version=-1):
        """
        Retrieve a dataframe from hdf

        :param name: The name of the object
        :param version: The version of the object (not supported)
        :return: Returns a pandas dataframe
        :raises: gridfs.errors.NoFile
        """
        meta = self.metadata(name)
        filename = getattr(meta.gridfile, 'name', self.object_store_key(name, '.hdf'))
        if self.fs.exists(filename=filename):
            df = self._extract_dataframe_hdf(filename, version=version)
            return df
        else:
            raise gridfs.errors.NoFile(
                "{0} does not exist in mongo collection '{1}'".format(
                    name, self.bucket))

    def get_python_data(self, name, filter=None, version=-1, lazy=False, trusted=False, **kwargs):
        """
        Retrieve objects as python data

        :param name: The name of the object
        :param version: The version of the object (not supported)
        :param lazy: if True returns the pymongo cursor instead of a list
        :return: Returns the object as a python list
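
        Example::

            data = store.get_python_data('mydict')  # e.g. [{'a': 1}]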
        """
        datastore = self.collection(name)
        filter = filter or kwargs
        if trusted is False or trusted != signature(filter):
            filter = sanitize_filter(filter)
        cursor = datastore.find(filter, **kwargs)
        if lazy:
            return cursor
        data = (d.get('data') for d in cursor)
        return list(data)

    def get_object_as_python(self, meta, version=-1):
        """
        Retrieve object as python object

        :param meta: The metadata object
        :param version: The version of the object (not supported)
        :return: Returns data as a python object
        """
        if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_HDF:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_DFROWS:
            return list(getattr(self.mongodb, meta.collection).find())
        if meta.kind == MDREGISTRY.PYTHON_DATA:
            col = getattr(self.mongodb, meta.collection)
            return col.find_one(dict(_id=meta.objid)).get('data')
        raise TypeError('cannot return kind %s as a python object' % meta.kind)

    def __iter__(self):
        for f in self.list(include_temp=True):
            yield f

    @property
    def buckets(self):
        return ['default' if b == self.defaults.OMEGA_MONGO_COLLECTION else b
                for b in self._Metadata.objects.distinct('bucket')]

    def list(self, pattern=None, regexp=None, kind=None, raw=False, hidden=None,
             include_temp=False, bucket=None, prefix=None, filter=None):
        """
        List all files in store

        specify pattern as a unix pattern (e.g. :code:`models/*`),
        or specify regexp

        :param pattern: the unix file pattern or None for all
        :param regexp: the regexp. takes precedence over pattern
        :param raw: if True return the meta data objects
        :param filter: specify additional filter criteria, optional
        :return: List of files in store

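        Example::

            store.list('data/*')            # names matching the pattern
            store.list(regexp='^data/.*')   # names matching the regexp
            store.list('data/*', raw=True)  # Metadata objects instead of names
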
        """
        regex = lambda pattern: bson.regex.Regex(f'{pattern}')
        db = self.mongodb
        searchkeys = dict(bucket=bucket or self.bucket,
                          prefix=prefix or self.prefix)
        q_excludes = Q()
        if regexp:
            searchkeys['name'] = regex(regexp)
        elif pattern:
            re_pattern = pattern.replace('*', '.*').replace('/', r'\/')
            searchkeys['name'] = regex(f'^{re_pattern}$')
        if not include_temp:
            q_excludes &= Q(name__not__startswith='_')
            q_excludes &= Q(name__not=regex(r'(.{1,*}\/?_.*)'))
        if not hidden:
            q_excludes &= Q(name__not__startswith='.')
        if kind or self.force_kind:
            kind = kind or self.force_kind
            if isinstance(kind, (tuple, list)):
                searchkeys.update(kind__in=kind)
            else:
                searchkeys.update(kind=kind)
        if filter:
            searchkeys.update(filter)
        q_search = Q(**searchkeys) & q_excludes
        files = self._Metadata.objects.no_cache()(q_search)
        return [f if raw else str(f.name).replace('.omm', '') for f in files]

    def exists(self, name, hidden=False):
        """ check if object exists

        Args:
            name (str): name of object
            hidden (bool): if True, include hidden files; defaults to
                False, unless name starts with '.'

        Returns:
            bool, True if object exists

        .. versionchanged:: 0.16.4
            hidden defaults to True if name starts with '.'
        """
        hidden = True if name.startswith('.') else hidden
        return name in self.list(name, hidden=hidden)

    def object_store_key(self, name, ext, hashed=None):
        """
        Returns the store key

        Unless you write a mixin or a backend you should not use this method

        :param name: The name of the object
        :param ext: The extension of the filename
        :param hashed: hash the key to support arbitrary name length, defaults
            to defaults.OMEGA_STORE_HASHEDNAMES, True by default since 0.13.7
        :return: A filename with relative bucket, prefix and name
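
        Example (illustrative; assuming bucket='omega', prefix='')::

            store.object_store_key('mymodel', '.omm', hashed=False)
            # => 'omega.mymodel.omm'
            store.object_store_key('mymodel', '.omm', hashed=True)
            # => sha1 hexdigest of the unhashed key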
        """
        # byte string
        _u8 = lambda t: t.encode('UTF-8', 'replace') if isinstance(t, str) else t
        key = self._get_obj_store_key(name, ext)
        hashed = hashed if hashed is not None else self.defaults.OMEGA_STORE_HASHEDNAMES
        if hashed:
            from hashlib import sha1
            # SEC: CWE-916
            # - status: wontfix
            # - reason: hashcode is used purely for name resolution, not a security function
            hasher = sha1()
            hasher.update(_u8(key))
            key = hasher.hexdigest()
        return key

    def _get_obj_store_key(self, name, ext, prefix=None, bucket=None):
        # backwards-compatible implementation of object_store_key()
        name = '%s.%s' % (name, ext) if not name.endswith(ext) else name
        filename = '{bucket}.{prefix}.{name}'.format(
            bucket=bucket or self.bucket,
            prefix=prefix or self.prefix,
            name=name,
            ext=ext).replace('/', '_').replace('..', '.')
        return filename

    def _package_dataframe2hdf(self, df, filename, key=None):
        """
        Package a dataframe as an hdf file

        :param df: The dataframe
        :param filename: Name of the file
        :return: Filename of the hdf file
        """
        fname = os.path.basename(filename)
        hdffname = os.path.join(self.tmppath, fname + '.hdf')
        key = key or 'data'
        df.to_hdf(hdffname, key)
        return hdffname

    def _extract_dataframe_hdf(self, filename, version=-1):
        """
        Extracts a dataframe from a stored hdf file

        :param filename: The name of the file
        :param version: The version of the file
        :return: Pandas dataframe
        """
        import pandas as pd
        hdffname = os.path.join(self.tmppath, filename)
        dirname = os.path.dirname(hdffname)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        outf = self.fs.get_version(filename, version=version)
        with open(hdffname, 'wb') as hdff:
            hdff.write(outf.read())
        hdf = pd.HDFStore(hdffname)
        key = list(hdf.keys())[0]
        df = hdf[key]
        hdf.close()
        return df

    def _ensure_fs_collection(self):
        # ensure backwards-compatible gridfs access
        if self.defaults.OMEGA_BUCKET_FS_LEGACY:
            # prior to 0.13.2 a single gridfs instance was used, always equal to the default collection
            return self.defaults.OMEGA_MONGO_COLLECTION
        if self.bucket == self.defaults.OMEGA_MONGO_COLLECTION:
            # from 0.13.2 onwards, only the default bucket is equal to the default collection
            # backwards compatibility for existing installations
            return self.bucket
        # since 0.13.2, all buckets other than the default use a qualified collection name to
        # effectively separate files in different buckets, enabling finer-grained access control
        # and avoiding name collisions between different buckets
        return '{}_{}'.format(self.defaults.OMEGA_MONGO_COLLECTION, self.bucket)

    def _ensure_fs_index(self, fs):
        # make sure we have proper chunks and files indices. these should be created on first write, but sometimes are not
        # see https://docs.mongodb.com/manual/core/gridfs/#gridfs-indexes
        # pymongo since 4.1 no longer has fs._GridFS
        chunks_collection = fs._GridFS__chunks if hasattr(fs, '_GridFS__chunks') else fs._chunks
        files_collection = fs._GridFS__files if hasattr(fs, '_GridFS__files') else fs._files
        ensure_index(chunks_collection, {'files_id': 1, 'n': 1}, unique=True)
        ensure_index(files_collection, {'filename': 1, 'uploadDate': 1})

    def sign(self, filter):
        return signature(filter)

    @staticmethod
    def _cleanup(objrepr, tmppath=None):
        # clean up any temporary files on exit
        # -- this is called as a finalizer (weakref.finalize)
        try:
            logger.debug(f'finalizing {objrepr}: cleaning up temporary files in {tmppath}')
            shutil.rmtree(tmppath, ignore_errors=True)
        except Exception as e:
            logger.error(f'finalizing {objrepr}: error occurred as {e}')