1"""
2Native storage for OmegaML using mongodb as the storage layer
3
4An OmegaStore instance is a MongoDB database. It has at least the
5metadata collection which lists all objects stored in it. A metadata
6document refers to the following types of objects (metadata.kind):
7
8* pandas.dfrows - a Pandas DataFrame stored as a collection of rows
9* sklearn.joblib - a scikit learn estimator/pipline dumped using joblib.dump()
10* python.data - an arbitrary python dict, tuple, list stored as a document
11
12Note that storing Pandas and scikit learn objects requires the availability
13of the respective packages. If either can not be imported, the OmegaStore
14degrades to a python.data store only. It will still .list() and get() any
15object, however reverts to pure python objects. In this case it is up
16to the client to convert the data into an appropriate format for processing.
17
18Pandas and scikit-learn objects can only be stored if these packages are
19availables. put() raises a TypeError if you pass such objects and these
20modules cannot be loaded.
21
22All data are stored within the same mongodb, in per-object collections
23as follows:
24
25 * .metadata
26 all metadata. each object is one document,
27 See **omegaml.documents.Metadata** for details
28 * .<bucket>.files
29 this is the GridFS instance used to store
30 blobs (models, numpy, hdf). The actual file name
31 will be <prefix>/<name>.<ext>, where ext is
32 optionally generated by put() / get().
33 * .<bucket>.<prefix>.<name>.data
34 every other dataset is stored in a separate
35 collection (dataframes, dicts, lists, tuples).
36 Any forward slash in prefix is ignored (e.g. 'data/'
37 becomes 'data')
38
39 DataFrames by default are stored in their own collection, every
40 row becomes a document. To store dataframes as a binary file,
41 use `put(...., as_hdf=True).` `.get()` will always return a dataframe.
42
43 Python dicts, lists, tuples are stored as a single document with
44 a `.data` attribute holding the JSON-converted representation. `.get()`
45 will always return the corresponding python object of .data.
46
47 Models are joblib.dump()'ed and ziped prior to transferring into
48 GridFs. .get() will always unzip and joblib.load() before returning
49 the model. Note this requires that the process using .get() supports
50 joblib as well as all python classes referred to. If joblib is not
51 supported, .get() returns a file-like object.
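
    For illustration, a minimal usage sketch (assuming a configured
    omegaml instance where om.datasets is an OmegaStore; all names are
    examples only)::

        import omegaml as om
        import pandas as pd

        df = pd.DataFrame({'x': range(10)})
        om.datasets.put(df, 'mydata')        # stored as one document per row
        df = om.datasets.get('mydata')       # returns a DataFrame
        om.datasets.put({'a': 1}, 'mydict')  # stored as a python.data document
        data = om.datasets.get('mydict')     # returns [{'a': 1}]
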

    The .metadata entry specifies the format used to store each
    object as well as its location:

    * metadata.kind
        the type of object
    * metadata.name
        the name of the object, as given on put()
    * metadata.gridfile
        the gridfs object (if any, null otherwise)
    * metadata.collection
        the name of the collection
    * metadata.attributes
        arbitrary custom attributes set in
        put(attributes=obj). This is used e.g. by
        OmegaRuntime's fit() method to record the
        data used in the model's training.

    **.put()** and **.get()** use helper methods specific to the
    object's type and to metadata.kind, respectively. In the future
    a plugin system will enable extension to other types.
"""
from __future__ import absolute_import

import logging
import os
import shutil
import tempfile
import warnings
import weakref
from datetime import datetime
from uuid import uuid4

import bson
import gridfs
from mongoengine.connection import disconnect, \
    connect, _connections, get_db
from mongoengine.errors import DoesNotExist
from mongoengine.fields import GridFSProxy
from mongoengine.queryset.visitor import Q

from omegaml.store.fastinsert import fast_insert, default_chunksize
from omegaml.util import unravel_index, restore_index, make_tuple, jsonescape, \
    cursor_to_dataframe, convert_dtypes, load_class, extend_instance, ensure_index, PickableCollection, \
    mongo_compatible, signature
from .queryops import sanitize_filter
from ..documents import make_Metadata, MDREGISTRY
from ..mongoshim import sanitize_mongo_kwargs, waitForConnection
from ..util import (is_estimator, is_dataframe, is_ndarray, is_spark_mllib,
                    settings as omega_settings, urlparse, is_series)

logger = logging.getLogger(__name__)

class OmegaStore(object):
    """
    The storage backend for models and data
    """

    def __init__(self, mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None):
        """
        :param mongo_url: the mongourl to use for the gridfs
        :param bucket: the mongo collection to use for gridfs
        :param prefix: the path prefix for files. defaults to blank
        :param kind: the kind or list of kinds to limit this store to
        :param defaults: the settings object to use, defaults to omega_settings()
        :param dbalias: the mongoengine connection alias to use
        """
        self.defaults = defaults or omega_settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket or self.defaults.OMEGA_MONGO_COLLECTION
        self._fs_collection = self._ensure_fs_collection()
        self._fs = None
        self._tmppath = os.path.join(self.defaults.OMEGA_TMP, uuid4().hex)
        self.prefix = prefix or ''
        self.force_kind = kind
        self._Metadata_cls = None
        # don't initialize db here to avoid using the default settings
        # otherwise Metadata will already have a connection and not use
        # the one provided in override_settings
        self._db = None
        self._dbalias = dbalias
        # add backends and mixins
        self._apply_mixins()
        # register backends
        self.register_backends()
        # register finalizer for cleanup
        weakref.finalize(self, self._cleanup, repr(self), tmppath=str(self._tmppath))

    def __repr__(self):
        return 'OmegaStore(bucket={}, prefix={})'.format(self.bucket, self.prefix)

    def __eq__(self, other):
        """test for equality of OmegaStore instances

        Args:
            other: OmegaStore instance

        Returns:
            True if other is the same database, same bucket, same prefix
        """
        return self.mongo_url == other.mongo_url and self.bucket == other.bucket and self.prefix == other.prefix

    # defining __eq__ resets __hash__, restore default identity hashing
    __hash__ = object.__hash__

    @property
    def tmppath(self):
        """
        return an instance-specific temporary path
        """
        os.makedirs(self._tmppath, exist_ok=True)
        return self._tmppath

    @property
    def mongodb(self):
        """
        Returns a mongo database object
        """
        if self._db is not None:
            return self._db
        # parse salient parts of mongourl, e.g.
        # mongodb://user:password@host/dbname
        self.parsed_url = urlparse.urlparse(self.mongo_url)
        self.database_name = self.parsed_url.path[1:]
        host = self.parsed_url.netloc
        scheme = self.parsed_url.scheme
        username, password = None, None
        if '@' in host:
            creds, host = host.split('@', 1)
            if ':' in creds:
                username, password = creds.split(':')
        # connect via mongoengine
        #
        # note this uses a MongoClient in the background, with pooled
        # connections. there are multiprocessing issues with pymongo:
        # http://api.mongodb.org/python/3.2/faq.html#using-pymongo-with-multiprocessing
        # connect=False is due to https://jira.mongodb.org/browse/PYTHON-961
        # this defers connecting until the first access
        # serverSelectionTimeoutMS=2500 is to fail fast, the default is 30000
        #
        # use an instance specific alias, note that access to Metadata and
        # QueryCache must pass the very same alias
        self._dbalias = alias = self._dbalias or 'omega-{}'.format(uuid4().hex)
        # always disconnect before registering a new connection because
        # mongoengine.connect() forgets all connection settings upon disconnect
        if alias not in _connections:
            disconnect(alias)
        connection = connect(alias=alias, db=self.database_name,
                             host=f'{scheme}://{host}',
                             username=username,
                             password=password,
                             connect=False,
                             authentication_source='admin',
                             **sanitize_mongo_kwargs(self.defaults.OMEGA_MONGO_SSL_KWARGS),
                             )
        # since PyMongo 4, connect() no longer waits for connection
        waitForConnection(connection)
        self._db = get_db(alias)
        return self._db

    @property
    def _Metadata(self):
        if self._Metadata_cls is None:
            # hack to localize metadata
            db = self.mongodb
            self._Metadata_cls = make_Metadata(db_alias=self._dbalias,
                                               collection=self._fs_collection)
        return self._Metadata_cls

    @property
    def fs(self):
        """
        Retrieve a gridfs instance using url and collection provided

        :return: a gridfs instance
        """
        if self._fs is not None:
            return self._fs
        self._fs = gridfs.GridFS(self.mongodb, collection=self._fs_collection)
        self._ensure_fs_index(self._fs)
        return self._fs

    def metadata(self, name=None, bucket=None, prefix=None, version=-1, **kwargs):
        """
        Returns a metadata document for the given entry name
        """
        # FIXME: version attribute does not do anything
        # FIXME: metadata should be stored in a bucket-specific collection
        # to enable access control, see
        # https://docs.mongodb.com/manual/reference/method/db.createRole/#db.createRole
        db = self.mongodb
        fs = self.fs
        prefix = prefix or self.prefix
        bucket = bucket or self.bucket
        # Meta is to silence lint on import error
        Meta = self._Metadata
        return Meta.objects(name=str(name), prefix=prefix, bucket=bucket).no_cache().first()

    def make_metadata(self, name, kind, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some: value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param kind: the object kind
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix

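        :Example:

            an illustrative sketch of the attributes semantics (assuming a
            store instance ``store`` and an existing entry 'mydata')::

                # attributes=None leaves existing attributes unchanged,
                # attributes={} resets them, attributes={'k': 'v'} updates them
                meta = store.make_metadata('mydata', 'python.data',
                                           attributes={'k': 'v'}).save()
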
        """
        # TODO kept _make_metadata for backwards compatibility.
        return self._make_metadata(name, bucket=bucket, prefix=prefix,
                                   kind=kind, **kwargs)

    def _make_metadata(self, name=None, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some: value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        """
        bucket = bucket or self.bucket
        prefix = prefix or self.prefix
        meta = self.metadata(name=name,
                             prefix=prefix,
                             bucket=bucket)
        if meta:
            dict_fields = 'attributes', 'kind_meta'
            for k, v in kwargs.items():
                if k in dict_fields and v is not None and len(v) > 0:
                    previous = getattr(meta, k, {})
                    previous.update(v)
                    setattr(meta, k, previous)
                elif k in dict_fields and v is not None and len(v) == 0:
                    setattr(meta, k, {})
                elif k in dict_fields and v is None:
                    # ignore non specified attributes
                    continue
                else:
                    # by default set whatever attribute is provided
                    setattr(meta, k, v)
        else:
            meta = self._Metadata(name=name, bucket=bucket, prefix=prefix,
                                  **kwargs)
        return meta

    def _drop_metadata(self, name=None, **kwargs):
        # internal method to delete meta data of an object
        meta = self.metadata(name, **kwargs)
        if meta is not None:
            meta.delete()

    def collection(self, name=None, bucket=None, prefix=None):
        """
        Returns a mongo db collection as a datastore

        If there is an existing object of name, will return the .collection
        of the object. Otherwise returns the collection according to naming
        convention {bucket}.{prefix}.{name}.datastore

        :param name: the collection to use. if none defaults to the
            collection name given on instantiation. the actual collection
            name used is always prefix + name + '.datastore'
        """
        # see if we have a known object and a collection for that, if not define it
        meta = self.metadata(name, bucket=bucket, prefix=prefix)
        collection = meta.collection if meta else None
        if not collection:
            collection = self.object_store_key(name, '.datastore')
            collection = collection.replace('..', '.')
        # return the collection
        datastore = getattr(self.mongodb, collection)
        return PickableCollection(datastore)

    def _apply_mixins(self):
        """
        apply mixins in defaults.OMEGA_STORE_MIXINS
        """
        for mixin in self.defaults.OMEGA_STORE_MIXINS:
            conditional = self._mixins_conditional
            extend_instance(self, mixin,
                            conditional=conditional)

    def _mixins_conditional(self, cls, obj):
        return cls.supports(obj) if hasattr(cls, 'supports') else True

    def register_backends(self):
        """
        register backends in defaults.OMEGA_STORE_BACKENDS
        """
        for kind, backend in self.defaults.OMEGA_STORE_BACKENDS.items():
            self.register_backend(kind, backend)

    def register_backend(self, kind, backend):
        """
        register a backend class

        :param kind: (str) the backend kind
        :param backend: (class) the backend class
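
        :Example:

            an illustrative sketch, assuming a custom backend class (the
            kind string and dotted path are examples only)::

                store.register_backend('custom.kind',
                                       'mypackage.backends.MyBackend')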
375 """
376 self.defaults.OMEGA_STORE_BACKENDS[kind] = load_class(backend)
377 if kind not in MDREGISTRY.KINDS:
378 MDREGISTRY.KINDS.append(kind)
379 return self
380
381 def register_mixin(self, mixincls):
382 """
383 register a mixin class
384
385 :param mixincls: (class) the mixin class
386 """
387 extend_instance(self, mixincls)
388 return self
389
    def put(self, obj, name, attributes=None, kind=None, replace=False, model_store=None,
            data_store=None, **kwargs):
        """
        Stores an object: estimators, pipelines, numpy arrays or
        pandas dataframes
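
        :Example:

            an illustrative sketch (assuming a store instance ``store``;
            all names are examples only)::

                store.put(df, 'mydata')               # DataFrame, row-wise
                store.put(df, 'mydata', as_hdf=True)  # DataFrame as HDF blob
                store.put({'a': 1}, 'mydict')         # python.data document
                store.put(clf, 'mymodel',
                          attributes={'trained': True})  # custom attributes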
395 """
396 if replace:
397 self.drop(name, force=True)
398 backend = self.get_backend_byobj(obj, name, attributes=attributes, kind=kind,
399 model_store=model_store, data_store=data_store,
400 **kwargs)
401 if backend:
402 return backend.put(obj, name, attributes=attributes, **kwargs)
403 # TODO move all of the specifics to backend implementations
404 if is_estimator(obj):
405 backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
406 return backend.put(obj, name, attributes=attributes, **kwargs)
407 elif is_spark_mllib(obj):
408 backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
409 return backend.put(obj, name, attributes=attributes, **kwargs)
410 elif is_dataframe(obj) or is_series(obj):
411 groupby = kwargs.get('groupby')
412 if obj.empty:
413 from warnings import warn
414 warn(
415 'Provided dataframe is empty, ignoring it, doing nothing here!')
416 return None
417 if kwargs.pop('as_hdf', False):
418 return self.put_dataframe_as_hdf(
419 obj, name, attributes, **kwargs)
420 elif groupby:
421 return self.put_dataframe_as_dfgroup(
422 obj, name, groupby, attributes)
423 append = kwargs.pop('append', None)
424 timestamp = kwargs.pop('timestamp', None)
425 index = kwargs.pop('index', None)
426 chunksize = kwargs.pop('chunksize', default_chunksize)
427 return self.put_dataframe_as_documents(
428 obj, name, append=append, attributes=attributes, index=index,
429 timestamp=timestamp, chunksize=chunksize, **kwargs)
430 elif is_ndarray(obj):
431 if kwargs.pop('as_pydata', False):
432 return self.put_pyobj_as_document(obj.tolist(), name,
433 attributes=attributes, **kwargs)
434 return self.put_ndarray_as_hdf(obj, name, attributes=attributes,
435 **kwargs)
436 elif isinstance(obj, (dict, list, tuple)):
437 kwargs.pop('as_pydata', None)
438 if kwargs.pop('as_hdf', False):
439 return self.put_pyobj_as_hdf(obj, name,
440 attributes=attributes, **kwargs)
441 return self.put_pyobj_as_document(obj, name,
442 attributes=attributes,
443 **kwargs)
444 else:
445 raise TypeError('type %s not supported' % type(obj))
446
    def put_dataframe_as_documents(self, obj, name, append=None,
                                   attributes=None, index=None,
                                   timestamp=None, chunksize=None,
                                   ensure_compat=True, _fast_insert=fast_insert,
                                   **kwargs):
        """
        store a dataframe as a row-wise collection of documents

        :param obj: the dataframe to store
        :param name: the name of the item in the store
        :param append: if False collection will be dropped before inserting,
            if True existing documents will persist. Defaults to True. If not
            specified and rows have been previously inserted, will issue a
            warning.
        :param index: list of columns, using +, -, @ as a column prefix to
            specify ASCENDING, DESCENDING, GEOSPHERE respectively. For @ the
            column has to represent a valid GeoJSON object.
        :param timestamp: if True or a field name adds a timestamp. If the
            value is a boolean or datetime, uses _created as the field name.
            The timestamp is always datetime.datetime.utcnow(). May be
            overridden by specifying the tuple (col, datetime).
        :param ensure_compat: if True attempt to convert obj to mongodb
            compatibility, set to False only if you are sure to have only
            compatible values in the dataframe. defaults to True. False may
            reduce memory and increase speed on large dataframes.
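
        :Example:

            an illustrative sketch of the index and timestamp options
            (column names are examples only)::

                store.put_dataframe_as_documents(df, 'mydata',
                                                 index=['+x', '-y'],
                                                 timestamp='updated')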
        :return: the Metadata object created
        """
        import pandas as pd
        from .queryops import MongoQueryOps
        collection = self.collection(name)
        if is_series(obj):
            obj = pd.DataFrame(obj, index=obj.index, columns=[str(obj.name)])
            store_series = True
        else:
            store_series = False
        if append is False:
            self.drop(name, force=True)
        elif append is None and collection.count_documents({}, limit=1):
            from warnings import warn
            warn('%s already exists, will append rows' % name)
        if index:
            # get index keys
            if isinstance(index, dict):
                idx_kwargs = index
                index = index.pop('columns')
            else:
                idx_kwargs = {}
            # create index with appropriate options
            keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
            ensure_index(collection, keys, **idx_kwargs)
        if timestamp:
            dt = datetime.utcnow()
            if isinstance(timestamp, bool):
                col = '_created'
            elif isinstance(timestamp, str):
                col = timestamp
            elif isinstance(timestamp, datetime):
                col, dt = '_created', timestamp
            elif isinstance(timestamp, tuple):
                col, dt = timestamp
            else:
                col = '_created'
            obj[col] = dt
        # store dataframe indices
        # FIXME this may be a performance issue, use size stored on stats or metadata
        row_count = self.collection(name).estimated_document_count()
        # fixes #466, ensure column names are strings in a multiindex
        if isinstance(obj.columns, pd.MultiIndex):
            obj.columns = obj.columns.map('_'.join)
        obj, idx_meta = unravel_index(obj, row_count=row_count)
        stored_columns = [jsonescape(col) for col in obj.columns]
        column_map = list(zip(obj.columns, stored_columns))
        d_column_map = dict(column_map)
        dtypes = {
            d_column_map.get(k): v.name
            for k, v in obj.dtypes.items()
        }
        kind_meta = {
            'columns': column_map,
            'dtypes': dtypes,
            'idx_meta': idx_meta
        }
        # ensure column names to be strings
        obj.columns = stored_columns
        # create mongo indices for data frame index columns
        df_idxcols = [col for col in obj.columns if col.startswith('_idx#')]
        if df_idxcols:
            keys, idx_kwargs = MongoQueryOps().make_index(df_idxcols)
            ensure_index(collection, keys, **idx_kwargs)
        # create index on row id
        keys, idx_kwargs = MongoQueryOps().make_index(['_om#rowid'])
        ensure_index(collection, keys, **idx_kwargs)
        # bulk insert
        # -- get native objects
        # -- seems to be required since pymongo 3.3.x. if not converted
        #    pymongo raises Cannot Encode object for int64 types
        if ensure_compat:
            for col, col_dtype in dtypes.items():
                if 'datetime' in col_dtype:
                    obj[col].fillna('', inplace=True)
            obj = obj.astype('O', errors='ignore')
        _fast_insert(obj, self, name, chunksize=chunksize)
        kind = (MDREGISTRY.PANDAS_SEROWS
                if store_series
                else MDREGISTRY.PANDAS_DFROWS)
        meta = self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=kind,
                                   kind_meta=kind_meta,
                                   attributes=attributes,
                                   collection=collection.name)
        return meta.save()

    def put_dataframe_as_dfgroup(self, obj, name, groupby, attributes=None):
        """
        store a dataframe grouped by columns in a mongo document

        :Example:

            > # each group
            > {
            >     # group keys
            >     key: val,
            >     _data: [
            >         # only data keys
            >         { key: val, ... }
            >     ]}

        """

        def row_to_doc(obj):
            for gval, gdf in obj.groupby(groupby):
                if hasattr(gval, 'astype'):
                    gval = make_tuple(gval.astype('O'))
                else:
                    gval = make_tuple(gval)
                doc = dict(zip(groupby, gval))
                datacols = list(set(gdf.columns) - set(groupby))
                doc['_data'] = gdf[datacols].astype('O').to_dict('records')
                yield doc

        datastore = self.collection(name)
        datastore.drop()
        datastore.insert_many(row_to_doc(obj))
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_DFGROUP,
                                   attributes=attributes,
                                   collection=datastore.name).save()

    def put_dataframe_as_hdf(self, obj, name, attributes=None, **kwargs):
        filename = self.object_store_key(name, '.hdf')
        hdffname = self._package_dataframe2hdf(obj, filename)
        with open(hdffname, 'rb') as fhdf:
            fileid = self.fs.put(fhdf, filename=filename)
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_HDF,
                                   attributes=attributes,
                                   gridfile=GridFSProxy(db_alias=self._dbalias,
                                                        grid_id=fileid)).save()

    def put_ndarray_as_hdf(self, obj, name, attributes=None, **kwargs):
        """ store numpy array as hdf

        this is a hack, converting the array to a dataframe, then
        storing it
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)

    def put_pyobj_as_hdf(self, obj, name, attributes=None, **kwargs):
        """
        store list, tuple, dict as hdf

        this requires the list, tuple or dict to be convertible into
        a dataframe
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)

    def put_pyobj_as_document(self, obj, name, attributes=None, append=True, index=None, as_many=None, **kwargs):
        """
        store a dict as a document

        similar to put_dataframe_as_documents, no data is replaced by
        default. that is, obj is appended as new documents into the object's
        mongo collection. to replace the data, specify append=False.
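
        :Example:

            an illustrative sketch (names are examples only)::

                store.put_pyobj_as_document({'a': 1}, 'mydict')
                store.put_pyobj_as_document([[1, 2], [3, 4]], 'mylists',
                                            as_many=True)  # one doc per item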
640 """
641 collection = self.collection(name)
642 if append is False:
643 collection.drop()
644 elif append is None and collection.esimated_document_count(limit=1):
645 from warnings import warn
646 warn('%s already exists, will append rows' % name)
647 if index:
648 # create index with appropriate options
649 from omegaml.store import MongoQueryOps
650 if isinstance(index, dict):
651 idx_kwargs = index
652 index = index.pop('columns')
653 else:
654 idx_kwargs = {}
655 index = [f'data.{c}' for c in index]
656 keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
657 ensure_index(collection, keys, **idx_kwargs)
658 if as_many is None:
659 as_many = isinstance(obj, (list, tuple)) and isinstance(obj[0], (list, tuple))
660 if as_many:
661 # list of lists are inserted as many objects, as in pymongo < 4
662 records = (mongo_compatible({'data': item}) for item in obj)
663 result = collection.insert_many(records)
664 objid = result.inserted_ids[-1]
665 else:
666 result = collection.insert_one(mongo_compatible({'data': obj}))
667 objid = result.inserted_id
668
669 return self._make_metadata(name=name,
670 prefix=self.prefix,
671 bucket=self.bucket,
672 kind=MDREGISTRY.PYTHON_DATA,
673 collection=collection.name,
674 attributes=attributes,
675 objid=objid).save()
676
    def drop(self, name, force=False, version=-1, report=False, **kwargs):
        """
        Drop the object

        :param name: The name of the object. If the name is a pattern it will
            be expanded using .list(), and .drop() is called on every object
            found.
        :param force: If True ignores DoesNotExist exception, defaults to False
            meaning this raises a DoesNotExist exception if the name does not
            exist
        :param report: if True returns a dict name=>status, where status is True
            if deleted, False if not deleted
        :return: True if object was deleted, False if not.
            If force is True and the object does not exist it will still return True
        :raises: DoesNotExist if the object does not exist and ``force=False``
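
        :Example:

            an illustrative sketch::

                store.drop('mydata')              # raises DoesNotExist if missing
                store.drop('mydata', force=True)  # ignore a missing object
                store.drop('data/*', force=True)  # drop all matching objects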
691 """
692 is_pattern = '*' in name
693 objs = [name] if not is_pattern else self.list(name)
694 results = []
695 for name in objs:
696 try:
697 backend = self.get_backend(name)
698 drop = backend.drop if backend else self._drop
699 result = drop(name, force=force, version=version, **kwargs)
700 except Exception as e:
701 result = False
702 if not force and not is_pattern:
703 raise
704 if force:
705 result = self._drop(name, force=force, version=version)
706 results.append((name, result))
707 if not objs:
708 result = self._drop(name, force=force, version=version)
709 results.append((name, result))
710 return dict(results) if report else len(results) > 0
711
    def _drop(self, name, force=False, version=-1, keep_data=False, **kwargs):
        meta = self.metadata(name, version=version)
        if meta is None and not force:
            raise DoesNotExist(name)
        collection = self.collection(name)
        if collection is not None and not keep_data:
            self.mongodb.drop_collection(collection.name)
        if meta:
            if meta.collection and not keep_data:
                self.mongodb.drop_collection(meta.collection)
            if meta and meta.gridfile is not None and not keep_data:
                meta.gridfile.delete()
            self._drop_metadata(name)
            return True
        return False

    def get_backend_bykind(self, kind, model_store=None, data_store=None,
                           **kwargs):
        """
        return the backend by a given object kind

        :param kind: The object kind
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        """
        try:
            backend_cls = load_class(self.defaults.OMEGA_STORE_BACKENDS[kind])
        except KeyError as e:
            raise ValueError('backend {kind} does not exist'.format(**locals()))
        model_store = model_store or self
        data_store = data_store or self
        backend = backend_cls(model_store=model_store,
                              data_store=data_store, **kwargs)
        return backend

    def get_backend(self, name, model_store=None, data_store=None, **kwargs):
        """
        return the backend by a given object name

        :param name: The object name
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        """
        meta = self.metadata(name)
        if meta is not None and meta.kind in self.defaults.OMEGA_STORE_BACKENDS:
            return self.get_backend_bykind(meta.kind,
                                           model_store=model_store,
                                           data_store=data_store,
                                           **kwargs)
        return None

    def help(self, name_or_obj=None, kind=None, raw=False, display=None, renderer=None):
        """ get help for an object by looking up its backend and calling help() on it

        Retrieves the object's metadata and looks up its corresponding backend. If the
        metadata.attributes['docs'] is a string it will display this as the help() contents.
        If the string starts with 'http://' or 'https://' it will open the web page.

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False
            display (fn): optional, callable for interactive display, defaults to help()
                if sys.flags.interactive is True, else uses pydoc.render_doc with plaintext
            renderer (fn): optional, the renderer= argument for pydoc.render_doc to use if
                sys.flags.interactive is False and display is not provided

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is not in interactive mode
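
        Examples:
            an illustrative sketch (assuming a store instance, e.g. om.datasets)::

                om.datasets.help('mydata')            # docs for a stored object
                om.datasets.help(kind='python.data')  # docs for a backend kind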
788 """
789 import sys
790 import pydoc
791 interactive = bool(display) if display is not None else sys.flags.interactive
792 display = display or help
793 renderer = renderer or pydoc.plaintext
794 obj = self._resolve_help_backend(name_or_obj=name_or_obj, kind=kind, raw=raw)
795 if any(str(obj.__doc__).startswith(v) for v in ('https://', 'http://')):
796 obj = obj.__doc__
797 if interactive and display is help:
798 import webbrowser
799 display = webbrowser.open
800 return display(obj) if interactive else pydoc.render_doc(obj, renderer=renderer)
801
    def _resolve_help_backend(self, name_or_obj=None, kind=None, raw=False):
        # helper so we can test help
        meta = self.metadata(name_or_obj) if name_or_obj else None
        if kind:
            backend = self.get_backend_bykind(kind)
        else:
            backend = self.get_backend(name_or_obj) or self.get_backend_byobj(name_or_obj)
        if backend is None:
            backend = self
        if not raw and meta is not None and 'docs' in meta.attributes:
            def UserDocumentation():
                pass

            basedoc = backend.__doc__ or ''
            UserDocumentation.__doc__ = (basedoc +
                                         meta.attributes['docs'])
            backend = UserDocumentation
        return backend

    def get_backend_byobj(self, obj, name=None, kind=None, attributes=None,
                          model_store=None, data_store=None, **kwargs):
        """
        return the matching backend for the given obj

        Returns:
            the first backend that supports the given parameters or None
        """
        model_store = model_store or self
        data_store = data_store or self
        meta = self.metadata(name) if name else None
        kind = kind or (meta.kind if meta is not None else None)
        backend = None
        if kind:
            objtype = str(type(obj))
            if kind in self.defaults.OMEGA_STORE_BACKENDS:
                backend = self.get_backend_bykind(kind, data_store=data_store, model_store=model_store)
                if not backend.supports(obj, name, attributes=attributes,
                                        data_store=data_store,
                                        model_store=model_store,
                                        meta=meta,
                                        kind=kind, **kwargs):
                    warnings.warn('Backend {kind} does not support {objtype}'.format(**locals()))
            else:
                pass
                # TODO refactor pandas and numpy handling into proper backend to avoid misleading warning
                # warnings.warn('Backend {kind} not found {objtype}. Reverting to default'.format(**locals()))
        else:
            for backend_kind, backend_cls in self.defaults.OMEGA_STORE_BACKENDS.items():
                backend = self.get_backend_bykind(backend_kind, data_store=data_store, model_store=model_store)
                if backend.supports(obj, name, attributes=attributes,
                                    data_store=data_store, model_store=model_store,
                                    **kwargs):
                    break
            else:
                backend = None
        return backend

    def getl(self, *args, **kwargs):
        """ return a lazy MDataFrame for a given object

        Same as .get, but returns an MDataFrame

        """
        return self.get(*args, lazy=True, **kwargs)

    def get(self, name, version=-1, force_python=False,
            kind=None, model_store=None, data_store=None, **kwargs):
        """
        Retrieve an object

        :param name: The name of the object
        :param version: Version of the stored object (not supported)
        :param force_python: Return as a python object
        :param kwargs: kwargs depending on object kind
        :return: an object: estimator, pipeline, data array or pandas dataframe
            previously stored with put()
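
        :Example:

            an illustrative sketch (names are examples only)::

                df = store.get('mydata')              # pandas DataFrame
                mdf = store.get('mydata', lazy=True)  # lazy MDataFrame
                clf = store.get('mymodel')            # a stored model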
878 """
879 meta = self.metadata(name, version=version)
880 if meta is None:
881 return None
882 if not force_python:
883 backend = (self.get_backend(name, model_store=model_store,
884 data_store=data_store)
885 if not kind else self.get_backend_bykind(kind,
886 model_store=model_store,
887 data_store=data_store))
888 if backend is not None:
889 # FIXME: some backends need to get model_store, data_store, but fails tests
890 return backend.get(name, **kwargs) # model_store=model_store, data_store=data_store, **kwargs)
891 if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
892 backend = self.get_backend(name)
893 return backend.get_model(name)
894 elif meta.kind == MDREGISTRY.SPARK_MLLIB:
895 backend = self.get_backend(name)
896 return backend.get_model(name, version)
897 elif meta.kind == MDREGISTRY.PANDAS_DFROWS:
898 return self.get_dataframe_documents(name, version=version,
899 **kwargs)
900 elif meta.kind == MDREGISTRY.PANDAS_SEROWS:
901 return self.get_dataframe_documents(name, version=version,
902 is_series=True,
903 **kwargs)
904 elif meta.kind == MDREGISTRY.PANDAS_DFGROUP:
905 return self.get_dataframe_dfgroup(
906 name, version=version, **kwargs)
907 elif meta.kind == MDREGISTRY.PYTHON_DATA:
908 return self.get_python_data(name, version=version, **kwargs)
909 elif meta.kind == MDREGISTRY.PANDAS_HDF:
910 return self.get_dataframe_hdf(name, version=version)
911 return self.get_object_as_python(meta, version=version)
912
    def get_dataframe_documents(self, name, columns=None, lazy=False,
                                filter=None, version=-1, is_series=False,
                                chunksize=None, sanitize=True, trusted=None,
                                **kwargs):
        """
        Internal method to return DataFrame from documents

        :param name: the name of the object (str)
        :param columns: the column projection as a list of column names
        :param lazy: if True returns a lazy representation as an MDataFrame.
            If False retrieves all data and returns a DataFrame (default)
        :param filter: the filter to be applied as a column__op=value dict
        :param sanitize: sanitize filter by removing all $op filter keys,
            defaults to True. Specify False to allow $op filter keys. $where
            is always removed as it is considered unsafe.
        :param version: the version to retrieve (not supported)
        :param is_series: if True returns a Series instead of a DataFrame
        :param chunksize: if specified, returns an iterator yielding chunks
            of chunksize rows
        :param kwargs: remaining kwargs are used as a filter. The filter kwarg
            overrides other kwargs.
        :return: the retrieved object (DataFrame, Series or MDataFrame)

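        :Example:

            an illustrative sketch of filter usage (column names are
            examples only)::

                df = store.get_dataframe_documents('mydata', x__gt=5)
                df = store.get_dataframe_documents('mydata',
                                                   filter=dict(x__gt=5),
                                                   columns=['x', 'y'])
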
934 """
935 from omegaml.store.queryops import sanitize_filter
936 from omegaml.store.filtered import FilteredCollection
937
938 collection = self.collection(name)
939 meta = self.metadata(name)
940 filter = filter or kwargs
941 filter = sanitize_filter(filter, no_ops=sanitize, trusted=trusted)
942 if lazy or chunksize:
943 from ..mdataframe import MDataFrame
944 df = MDataFrame(collection,
945 metadata=meta.kind_meta,
946 columns=columns).query(**filter)
947 if is_series:
948 df = df[0]
949 if chunksize is not None and chunksize > 0:
950 return df.iterchunks(chunksize=chunksize)
951 else:
952 # TODO ensure the same processing is applied in MDataFrame
953 # TODO this method should always use a MDataFrame disregarding lazy
954 if filter:
955 from .query import Filter
956 query = Filter(collection, **filter).query
957 cursor = FilteredCollection(collection).find(filter=query, projection=columns)
958 else:
959 cursor = FilteredCollection(collection).find(projection=columns)
960 # restore dataframe
961 df = cursor_to_dataframe(cursor)
962 if '_id' in df.columns:
963 del df['_id']
964 if hasattr(meta, 'kind_meta'):
965 df = convert_dtypes(df, meta.kind_meta.get('dtypes', {}))
966 # -- restore columns
967 meta_columns = dict(meta.kind_meta.get('columns'))
968 if meta_columns:
969 # apply projection, if any
970 if columns:
971 # get only projected columns
972 # meta_columns is {origin_column: stored_column}
973 orig_columns = dict({k: v for k, v in meta_columns.items()
974 if k in columns or v in columns})
975 else:
976 # restore columns to original name
977 orig_columns = meta_columns
978 df.rename(columns=orig_columns, inplace=True)
979 # -- restore indexes
980 idx_meta = meta.kind_meta.get('idx_meta')
981 if idx_meta:
982 df = restore_index(df, idx_meta)
983 # -- restore row order
984 if is_series:
985 index = df.index
986 name = df.columns[0]
987 df = df[name]
988 df.index = index
989 df.name = None if name == 'None' else name
990 return df
991
    def rebuild_params(self, kwargs, collection):
        """
        Returns a modified set of parameters for querying mongodb
        based on how the mongo document is structured and the
        fields the document is grouped by.

        **Note: Explicitly to be used with get_dataframe_dfgroup() only**

        :param kwargs: Mongo filter arguments
        :param collection: The mongodb collection
        :return: Returns a set of parameters as dictionary.
        """
        modified_params = {}
        db_structure = collection.find_one({}, {'_id': False})
        groupby_columns = list(set(db_structure.keys()) - set(['_data']))
        if kwargs is not None:
            for item in kwargs:
                if item not in groupby_columns:
                    modified_query_param = '_data.' + item
                    modified_params[modified_query_param] = kwargs.get(item)
                else:
                    modified_params[item] = kwargs.get(item)
        return modified_params

    def get_dataframe_dfgroup(self, name, version=-1, sanitize=True, kwargs=None):
        """
        Return a grouped dataframe

        :param name: the name of the object
        :param version: not supported
        :param kwargs: mongo db query arguments to be passed to
            collection.find() as a filter.
        :param sanitize: remove any $op operators in kwargs

        """
        import pandas as pd
        from omegaml.store.queryops import sanitize_filter
        from omegaml.store.filtered import FilteredCollection

        def convert_doc_to_row(cursor):
            for doc in cursor:
                data = doc.pop('_data', [])
                for row in data:
                    # yield a copy per row, the group doc is reused
                    rowdoc = dict(doc)
                    rowdoc.update(row)
                    yield rowdoc

        datastore = FilteredCollection(self.collection(name))
        kwargs = kwargs if kwargs else {}
        params = self.rebuild_params(kwargs, datastore)
        params = sanitize_filter(params, no_ops=sanitize)
        cursor = datastore.find(params, projection={'_id': False})
        df = pd.DataFrame(convert_doc_to_row(cursor))
        return df

    def get_dataframe_hdf(self, name, version=-1):
        """
        Retrieve dataframe from hdf

        :param name: The name of object
        :param version: The version of object (not supported)
        :return: Returns a python pandas dataframe
        :raises: gridfs.errors.NoFile
        """
        meta = self.metadata(name)
        filename = getattr(meta.gridfile, 'name', self.object_store_key(name, '.hdf'))
        if self.fs.exists(filename=filename):
            df = self._extract_dataframe_hdf(filename, version=version)
            return df
        else:
            raise gridfs.errors.NoFile(
                "{0} does not exist in mongo collection '{1}'".format(
                    name, self.bucket))

    def get_python_data(self, name, filter=None, version=-1, lazy=False, trusted=False, **kwargs):
        """
        Retrieve objects as python data

        :param name: The name of object
        :param version: The version of object

        :return: Returns the object as python list object
        """
        datastore = self.collection(name)
        filter = filter or kwargs
        filter = sanitize_filter(filter) if trusted is False or trusted != signature(filter) else filter
        cursor = datastore.find(filter, **kwargs)
        if lazy:
            return cursor
        data = (d.get('data') for d in cursor)
        return list(data)

    def get_object_as_python(self, meta, version=-1):
        """
        Retrieve object as python object

        :param meta: The metadata object
        :param version: The version of the object

        :return: Returns data as python object
        """
        if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_HDF:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_DFROWS:
            return list(getattr(self.mongodb, meta.collection).find())
        if meta.kind == MDREGISTRY.PYTHON_DATA:
            col = getattr(self.mongodb, meta.collection)
            return col.find_one(dict(_id=meta.objid)).get('data')
        raise TypeError('cannot return kind %s as a python object' % meta.kind)

    def __iter__(self):
        for f in self.list(include_temp=True):
            yield f

    @property
    def buckets(self):
        return ['default' if b == self.defaults.OMEGA_MONGO_COLLECTION else b
                for b in self._Metadata.objects.distinct('bucket')]

    def list(self, pattern=None, regexp=None, kind=None, raw=False, hidden=None,
             include_temp=False, bucket=None, prefix=None, filter=None):
        """
        List all files in store

        specify pattern as a unix pattern (e.g. :code:`models/*`), or
        specify regexp

        :param pattern: the unix file pattern or None for all
        :param regexp: the regexp. takes precedence over pattern
        :param kind: the kind or list of kinds to limit the result to
        :param raw: if True return the meta data objects
        :param hidden: if True include hidden objects (names starting with '.')
        :param include_temp: if True include temporary objects (names
            starting with '_')
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        :param filter: specify additional filter criteria, optional
        :return: List of files in store

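        :Example:

            an illustrative sketch (names are examples only)::

                store.list('data/*')            # unix-style pattern
                store.list(regexp='^data/.*')   # as a regexp
                store.list(kind='python.data')  # only a given kind
                store.list('data/*', raw=True)  # Metadata objects
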
1127 """
1128 regex = lambda pattern: bson.regex.Regex(f'{pattern}')
1129 db = self.mongodb
1130 searchkeys = dict(bucket=bucket or self.bucket,
1131 prefix=prefix or self.prefix)
1132 q_excludes = Q()
1133 if regexp:
1134 searchkeys['name'] = regex(regexp)
1135 elif pattern:
1136 re_pattern = pattern.replace('*', '.*').replace('/', r'\/')
1137 searchkeys['name'] = regex(f'^{re_pattern}$')
1138 if not include_temp:
1139 q_excludes &= Q(name__not__startswith='_')
1140 q_excludes &= Q(name__not=regex(r'(.{1,*}\/?_.*)'))
1141 if not hidden:
1142 q_excludes &= Q(name__not__startswith='.')
1143 if kind or self.force_kind:
1144 kind = kind or self.force_kind
1145 if isinstance(kind, (tuple, list)):
1146 searchkeys.update(kind__in=kind)
1147 else:
1148 searchkeys.update(kind=kind)
1149 if filter:
1150 searchkeys.update(filter)
1151 q_search = Q(**searchkeys) & q_excludes
1152 files = self._Metadata.objects.no_cache()(q_search)
1153 return [f if raw else str(f.name).replace('.omm', '') for f in files]
1154
    def exists(self, name, hidden=False):
        """ check if object exists

        Args:
            name (str): name of object
            hidden (bool): if True, include hidden files, defaults to
                False, unless name starts with '.'

        Returns:
            bool, True if object exists

        .. versionchanged:: 0.16.4
            hidden defaults to True if name starts with '.'
        """
        hidden = True if name.startswith('.') else hidden
        return name in self.list(name, hidden=hidden)

    def object_store_key(self, name, ext, hashed=None):
        """
        Returns the store key

        Unless you write a mixin or a backend you should not use this method

        :param name: The name of object
        :param ext: The extension of the filename
        :param hashed: hash the key to support arbitrary name length, defaults
            to defaults.OMEGA_STORE_HASHEDNAMES, True by default since 0.13.7

        :return: A filename with relative bucket, prefix and name
        """
        # byte string
        _u8 = lambda t: t.encode('UTF-8', 'replace') if isinstance(t, str) else t
        key = self._get_obj_store_key(name, ext)
        hashed = hashed if hashed is not None else self.defaults.OMEGA_STORE_HASHEDNAMES
        if hashed:
            from hashlib import sha1
            # SEC: CWE-916
            # - status: wontfix
            # - reason: hashcode is used purely for name resolution, not a security function
            hasher = sha1()
            hasher.update(_u8(key))
            key = hasher.hexdigest()
        return key

    def _get_obj_store_key(self, name, ext, prefix=None, bucket=None):
        # backwards compatibility implementation of object_store_key()
        name = '%s.%s' % (name, ext) if not name.endswith(ext) else name
        filename = '{bucket}.{prefix}.{name}'.format(
            bucket=bucket or self.bucket,
            prefix=prefix or self.prefix,
            name=name,
            ext=ext).replace('/', '_').replace('..', '.')
        return filename

    def _package_dataframe2hdf(self, df, filename, key=None):
        """
        Package a dataframe as a hdf file

        :param df: The dataframe
        :param filename: Name of file

        :return: Filename of hdf file
        """
        fname = os.path.basename(filename)
        hdffname = os.path.join(self.tmppath, fname + '.hdf')
        key = key or 'data'
        df.to_hdf(hdffname, key)
        return hdffname

    def _extract_dataframe_hdf(self, filename, version=-1):
        """
        Extracts a dataframe from a stored hdf file

        :param filename: The name of file
        :param version: The version of file

        :return: Pandas dataframe
        """
        import pandas as pd
        hdffname = os.path.join(self.tmppath, filename)
        dirname = os.path.dirname(hdffname)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        # raises gridfs.errors.NoFile if the file does not exist
        outf = self.fs.get_version(filename, version=version)
        with open(hdffname, 'wb') as hdff:
            hdff.write(outf.read())
        hdf = pd.HDFStore(hdffname)
        key = list(hdf.keys())[0]
        df = hdf[key]
        hdf.close()
        return df

    def _ensure_fs_collection(self):
        # ensure backwards-compatible gridfs access
        if self.defaults.OMEGA_BUCKET_FS_LEGACY:
            # prior to 0.13.2 a single gridfs instance was used, always equal to the default collection
            return self.defaults.OMEGA_MONGO_COLLECTION
        if self.bucket == self.defaults.OMEGA_MONGO_COLLECTION:
            # from 0.13.2 onwards, only the default bucket is equal to the default collection
            # backwards compatibility for existing installations
            return self.bucket
        # since 0.13.2, all buckets other than the default use a qualified collection name to
        # effectively separate files in different buckets, enabling finer-grade access control
        # and avoiding name collisions from different buckets
        return '{}_{}'.format(self.defaults.OMEGA_MONGO_COLLECTION, self.bucket)

    def _ensure_fs_index(self, fs):
        # make sure we have proper chunks and files indexes. this should be created on first write, but sometimes is not
        # see https://docs.mongodb.com/manual/core/gridfs/#gridfs-indexes
        # pymongo since 4.1 no longer has fs._GridFS
        chunks_collection = fs._GridFS__chunks if hasattr(fs, '_GridFS__chunks') else fs._chunks
        files_collection = fs._GridFS__files if hasattr(fs, '_GridFS__files') else fs._files
        ensure_index(chunks_collection, {'files_id': 1, 'n': 1}, unique=True)
        ensure_index(files_collection, {'filename': 1, 'uploadDate': 1})

    def sign(self, filter):
        return signature(filter)

    @staticmethod
    def _cleanup(objrepr, tmppath=None):
        # cleanup any temporary files on exit
        # -- this is called as a finalizer (weakref.finalize)
        try:
            logger.debug(f'finalizing {objrepr}: cleaning up temporary files in {tmppath}')
            shutil.rmtree(tmppath, ignore_errors=True)
        except Exception as e:
            logger.error(f'finalizing {objrepr}: error occurred as {e}')