Source code for omegaml.store.base

   1"""
   2Native storage for OmegaML using mongodb as the storage layer
   3
   4An OmegaStore instance is a MongoDB database. It has at least the
   5metadata collection which lists all objects stored in it. A metadata
   6document refers to the following types of objects (metadata.kind):
   7
   8* pandas.dfrows - a Pandas DataFrame stored as a collection of rows
   9* sklearn.joblib - a scikit learn estimator/pipline dumped using joblib.dump()
  10* python.data - an arbitrary python dict, tuple, list stored as a document
  11
  12Note that storing Pandas and scikit learn objects requires the availability
  13of the respective packages. If either can not be imported, the OmegaStore
  14degrades to a python.data store only. It will still .list() and get() any
  15object, however reverts to pure python objects. In this case it is up
  16to the client to convert the data into an appropriate format for processing.
  17
  18Pandas and scikit-learn objects can only be stored if these packages are
  19availables. put() raises a TypeError if you pass such objects and these
  20modules cannot be loaded.
  21
  22All data are stored within the same mongodb, in per-object collections
  23as follows:
  24
  25    * .metadata
  26        all metadata. each object is one document,
  27        See **omegaml.documents.Metadata** for details
  28    * .<bucket>.files
  29        this is the GridFS instance used to store
  30        blobs (models, numpy, hdf). The actual file name
  31        will be <prefix>/<name>.<ext>, where ext is
  32        optionally generated by put() / get().
  33    * .<bucket>.<prefix>.<name>.data
  34        every other dataset is stored in a separate
  35        collection (dataframes, dicts, lists, tuples).
  36        Any forward slash in prefix is ignored (e.g. 'data/'
  37        becomes 'data')
  38
  39    DataFrames by default are stored in their own collection, every
  40    row becomes a document. To store dataframes as a binary file,
  41    use `put(...., as_hdf=True).` `.get()` will always return a dataframe.
  42
  43    Python dicts, lists, tuples are stored as a single document with
  44    a `.data` attribute holding the JSON-converted representation. `.get()`
  45    will always return the corresponding python object of .data.
  46
  47    Models are joblib.dump()'ed and ziped prior to transferring into
  48    GridFs. .get() will always unzip and joblib.load() before returning
  49    the model. Note this requires that the process using .get() supports
  50    joblib as well as all python classes referred to. If joblib is not
  51    supported, .get() returns a file-like object.
  52
  53    The .metadata entry specifies the format used to store each
  54    object as well as it's location:
  55
  56    * metadata.kind
  57        the type of object
  58    * metadata.name
  59        the name of the object, as given on put()
  60    * metadata.gridfile
  61        the gridfs object (if any, null otherwise)
  62    * metadata.collection
  63        the name of the collection
  64    * metadata.attributes
  65        arbitrary custom attributes set in
  66        put(attributes=obj). This is used e.g. by
  67        OmegaRuntime's fit() method to record the
  68        data used in the model's training.
  69
  70    **.put()** and **.get()** use helper methods specific to the type in
  71    object's type and metadata.kind, respectively. In the future
  72    a plugin system will enable extension to other types.
  73"""
from __future__ import absolute_import

import logging
import os
import shutil
import tempfile
import warnings
import weakref
from datetime import datetime
from uuid import uuid4

import bson
import gridfs
from mongoengine.connection import disconnect, \
    connect, _connections, get_db
from mongoengine.errors import DoesNotExist
from mongoengine.fields import GridFSProxy
from mongoengine.queryset.visitor import Q

from omegaml.store.fastinsert import fast_insert, default_chunksize
from omegaml.util import unravel_index, restore_index, make_tuple, jsonescape, \
    cursor_to_dataframe, convert_dtypes, load_class, extend_instance, ensure_index, PickableCollection, \
    mongo_compatible, signature
from .queryops import sanitize_filter
from ..documents import make_Metadata, MDREGISTRY
from ..mongoshim import sanitize_mongo_kwargs, waitForConnection
from ..util import (is_estimator, is_dataframe, is_ndarray, is_spark_mllib,
                    settings as omega_settings, urlparse, is_series)

logger = logging.getLogger(__name__)

class OmegaStore(object):
    """
    The storage backend for models and data
    """

    def __init__(self, mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None):
        """
        :param mongo_url: the mongourl to use for the gridfs
        :param bucket: the mongo collection to use for gridfs
        :param prefix: the path prefix for files. defaults to blank
        :param kind: the kind or list of kinds to limit this store to
        """
        self.defaults = defaults or omega_settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket or self.defaults.OMEGA_MONGO_COLLECTION
        self._fs_collection = self._ensure_fs_collection()
        self._fs = None
        self._tmppath = os.path.join(self.defaults.OMEGA_TMP, uuid4().hex)
        self.prefix = prefix or ''
        self.force_kind = kind
        self._Metadata_cls = None
        # don't initialize db here to avoid using the default settings
        # otherwise Metadata will already have a connection and not use
        # the one provided in override_settings
        self._db = None
        self._dbalias = dbalias
        # add backends and mixins
        self._apply_mixins()
        # register backends
        self.register_backends()
        # register finalizer for cleanup
        weakref.finalize(self, self._cleanup, repr(self), tmppath=str(self._tmppath))

    def __repr__(self):
        return 'OmegaStore(bucket={}, prefix={})'.format(self.bucket, self.prefix)

    def __equal__(self, other):
        """test for equality of OmegaStore instances

        Args:
            other: OmegaStore instance

        Returns:
            True if other is the same database, same bucket, same prefix
        """
        return self.mongo_url == other.mongo_url and self.bucket == other.bucket and self.prefix == other.prefix

    @property
    def tmppath(self):
        """
        return an instance-specific temporary path
        """
        os.makedirs(self._tmppath, exist_ok=True)
        return self._tmppath

    @property
    def mongodb(self):
        """
        Returns a mongo database object
        """
        if self._db is not None:
            return self._db
        # parse salient parts of mongourl, e.g.
        # mongodb://user:password@host/dbname
        self.parsed_url = urlparse.urlparse(self.mongo_url)
        self.database_name = self.parsed_url.path[1:]
        host = self.parsed_url.netloc
        scheme = self.parsed_url.scheme
        username, password = None, None
        if '@' in host:
            creds, host = host.split('@', 1)
            if ':' in creds:
                username, password = creds.split(':')
        # connect via mongoengine
        #
        # note this uses a MongoClient in the background, with pooled
        # connections. there are multiprocessing issues with pymongo:
        # http://api.mongodb.org/python/3.2/faq.html#using-pymongo-with-multiprocessing
        # connect=False is due to https://jira.mongodb.org/browse/PYTHON-961
        # this defers connecting until the first access
        # serverSelectionTimeoutMS=2500 is to fail fast, the default is 30000
        #
        # use an instance specific alias, note that access to Metadata and
        # QueryCache must pass the very same alias
        self._dbalias = alias = self._dbalias or 'omega-{}'.format(uuid4().hex)
        # always disconnect before registering a new connection because
        # mongoengine.connect() forgets all connection settings upon disconnect
        if alias not in _connections:
            disconnect(alias)
        connection = connect(alias=alias, db=self.database_name,
                             host=f'{scheme}://{host}',
                             username=username,
                             password=password,
                             connect=False,
                             authentication_source='admin',
                             **sanitize_mongo_kwargs(self.defaults.OMEGA_MONGO_SSL_KWARGS),
                             )
        # since PyMongo 4, connect() no longer waits for connection
        waitForConnection(connection)
        self._db = get_db(alias)
        return self._db

    @property
    def _Metadata(self):
        if self._Metadata_cls is None:
            # hack to localize metadata
            db = self.mongodb
            self._Metadata_cls = make_Metadata(db_alias=self._dbalias,
                                               collection=self._fs_collection)
        return self._Metadata_cls

    @property
    def fs(self):
        """
        Retrieve a gridfs instance using url and collection provided

        :return: a gridfs instance
        """
        if self._fs is not None:
            return self._fs
        self._fs = gridfs.GridFS(self.mongodb, collection=self._fs_collection)
        self._ensure_fs_index(self._fs)
        return self._fs

    def metadata(self, name=None, bucket=None, prefix=None, version=-1, **kwargs):
        """
        Returns a metadata document for the given entry name
        """
        # FIXME: version attribute does not do anything
        # FIXME: metadata should be stored in a bucket-specific collection
        # to enable access control, see
        # https://docs.mongodb.com/manual/reference/method/db.createRole/#db.createRole
        db = self.mongodb
        fs = self.fs
        prefix = prefix or self.prefix
        bucket = bucket or self.bucket
        # Meta is to silence lint on import error
        Meta = self._Metadata
        return Meta.objects(name=str(name), prefix=prefix, bucket=bucket).no_cache().first()

    def make_metadata(self, name, kind, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some : value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        """
        # TODO kept _make_metadata for backwards compatibility
        return self._make_metadata(name, bucket=bucket, prefix=prefix,
                                   kind=kind, **kwargs)

    def _make_metadata(self, name=None, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some : value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        """
        bucket = bucket or self.bucket
        prefix = prefix or self.prefix
        meta = self.metadata(name=name,
                             prefix=prefix,
                             bucket=bucket)
        if meta:
            dict_fields = 'attributes', 'kind_meta'
            for k, v in kwargs.items():
                if k in dict_fields and v is not None and len(v) > 0:
                    previous = getattr(meta, k, {})
                    previous.update(v)
                    setattr(meta, k, previous)
                elif k in dict_fields and v is not None and len(v) == 0:
                    setattr(meta, k, {})
                elif k in dict_fields and v is None:
                    # ignore non specified attributes
                    continue
                else:
                    # by default set whatever attribute is provided
                    setattr(meta, k, v)
        else:
            meta = self._Metadata(name=name, bucket=bucket, prefix=prefix,
                                  **kwargs)
        return meta

    def _drop_metadata(self, name=None, **kwargs):
        # internal method to delete meta data of an object
        meta = self.metadata(name, **kwargs)
        if meta is not None:
            meta.delete()

    def collection(self, name=None, bucket=None, prefix=None):
        """
        Returns a mongo db collection as a datastore

        If there is an existing object of name, will return the .collection
        of the object. Otherwise returns the collection according to naming
        convention {bucket}.{prefix}.{name}.datastore

        :param name: the collection to use. if none defaults to the
            collection name given on instantiation. the actual collection name
            used is always prefix + name + '.data'
        """
        # see if we have a known object and a collection for that, if not define it
        meta = self.metadata(name, bucket=bucket, prefix=prefix)
        collection = meta.collection if meta else None
        if not collection:
            collection = self.object_store_key(name, '.datastore')
            collection = collection.replace('..', '.')
        # return the collection
        try:
            datastore = getattr(self.mongodb, collection)
        except Exception as e:
            raise e
        return PickableCollection(datastore)

    def _apply_mixins(self):
        """
        apply mixins in defaults.OMEGA_STORE_MIXINS
        """
        for mixin in self.defaults.OMEGA_STORE_MIXINS:
            conditional = self._mixins_conditional
            extend_instance(self, mixin,
                            conditional=conditional)

    def _mixins_conditional(self, cls, obj):
        return cls.supports(obj) if hasattr(cls, 'supports') else True

    def register_backends(self):
        """
        register backends in defaults.OMEGA_STORE_BACKENDS
        """
        for kind, backend in self.defaults.OMEGA_STORE_BACKENDS.items():
            self.register_backend(kind, backend)

    def register_backend(self, kind, backend):
        """
        register a backend class

        :param kind: (str) the backend kind
        :param backend: (class) the backend class
        """
        self.defaults.OMEGA_STORE_BACKENDS[kind] = load_class(backend)
        if kind not in MDREGISTRY.KINDS:
            MDREGISTRY.KINDS.append(kind)
        return self

    def register_mixin(self, mixincls):
        """
        register a mixin class

        :param mixincls: (class) the mixin class
        """
        extend_instance(self, mixincls)
        return self
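    # Extension sketch (illustrative, not part of the original source): a
    # custom backend is registered by kind; `MyBackend` and `MyMixin` are
    # hypothetical classes and would need to implement at least
    # supports()/put()/get() as used by this class:
    #
    #   >>> store = OmegaStore()
    #   >>> store.register_backend('mykind.format', MyBackend)
    #   >>> store.register_mixin(MyMixin)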
    def put(self, obj, name, attributes=None, kind=None, replace=False, model_store=None,
            data_store=None, **kwargs):
        """
        Stores an object, store estimators, pipelines, numpy arrays or
        pandas dataframes
        """
        if replace:
            self.drop(name, force=True)
        backend = self.get_backend_byobj(obj, name, attributes=attributes, kind=kind,
                                         model_store=model_store, data_store=data_store,
                                         **kwargs)
        if backend:
            return backend.put(obj, name, attributes=attributes, **kwargs)
        # TODO move all of the specifics to backend implementations
        if is_estimator(obj):
            backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
            return backend.put(obj, name, attributes=attributes, **kwargs)
        elif is_spark_mllib(obj):
            backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
            return backend.put(obj, name, attributes=attributes, **kwargs)
        elif is_dataframe(obj) or is_series(obj):
            groupby = kwargs.get('groupby')
            if obj.empty:
                from warnings import warn
                warn('Provided dataframe is empty, ignoring it, doing nothing here!')
                return None
            if kwargs.pop('as_hdf', False):
                return self.put_dataframe_as_hdf(
                    obj, name, attributes, **kwargs)
            elif groupby:
                return self.put_dataframe_as_dfgroup(
                    obj, name, groupby, attributes)
            append = kwargs.pop('append', None)
            timestamp = kwargs.pop('timestamp', None)
            index = kwargs.pop('index', None)
            chunksize = kwargs.pop('chunksize', default_chunksize)
            return self.put_dataframe_as_documents(
                obj, name, append=append, attributes=attributes, index=index,
                timestamp=timestamp, chunksize=chunksize, **kwargs)
        elif is_ndarray(obj):
            if kwargs.pop('as_pydata', False):
                return self.put_pyobj_as_document(obj.tolist(), name,
                                                  attributes=attributes, **kwargs)
            return self.put_ndarray_as_hdf(obj, name, attributes=attributes,
                                           **kwargs)
        elif isinstance(obj, (dict, list, tuple)):
            kwargs.pop('as_pydata', None)
            if kwargs.pop('as_hdf', False):
                return self.put_pyobj_as_hdf(obj, name,
                                             attributes=attributes, **kwargs)
            return self.put_pyobj_as_document(obj, name,
                                              attributes=attributes,
                                              **kwargs)
        else:
            raise TypeError('type %s not supported' % type(obj))
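    # Dispatch sketch (illustrative, not part of the original source): put()
    # first consults the backend registry, then falls back to the built-in
    # type checks above. Assuming `store`, `df` and `model` exist:
    #
    #   >>> store.put(model, 'mymodel')                  # sklearn -> joblib/GridFS
    #   >>> store.put(df, 'mydf', index=['+x'])          # rows + ascending index on x
    #   >>> store.put(df, 'mydf', append=False)          # drop, then re-insert
    #   >>> store.put([1, 2, 3], 'mylist', as_hdf=True)  # pyobj as HDF blob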
    def put_dataframe_as_documents(self, obj, name, append=None,
                                   attributes=None, index=None,
                                   timestamp=None, chunksize=None,
                                   ensure_compat=True, _fast_insert=fast_insert,
                                   **kwargs):
        """
        store a dataframe as a row-wise collection of documents

        :param obj: the dataframe to store
        :param name: the name of the item in the store
        :param append: if False collection will be dropped before inserting,
            if True existing documents will persist. Defaults to True. If not
            specified and rows have been previously inserted, will issue a
            warning.
        :param index: list of columns, using +, -, @ as a column prefix to
            specify ASCENDING, DESCENDING, GEOSPHERE respectively. For @ the
            column has to represent a valid GeoJSON object.
        :param timestamp: if True or a field name adds a timestamp. If the
            value is a boolean or datetime, uses _created as the field name.
            The timestamp is always datetime.datetime.utcnow(). May be overridden
            by specifying the tuple (col, datetime).
        :param ensure_compat: if True attempt to convert obj to mongodb
            compatibility, set to False only if you are sure to have only
            compatible values in the dataframe. Defaults to True. False may
            reduce memory and increase speed on large dataframes.
        :return: the Metadata object created
        """
        import pandas as pd
        from .queryops import MongoQueryOps
        collection = self.collection(name)
        if is_series(obj):
            import pandas as pd
            obj = pd.DataFrame(obj, index=obj.index, columns=[str(obj.name)])
            store_series = True
        else:
            store_series = False
        if append is False:
            self.drop(name, force=True)
        elif append is None and collection.count_documents({}, limit=1):
            from warnings import warn
            warn('%s already exists, will append rows' % name)
        if index:
            # get index keys
            if isinstance(index, dict):
                idx_kwargs = index
                index = index.pop('columns')
            else:
                idx_kwargs = {}
            # create index with appropriate options
            keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
            ensure_index(collection, keys, **idx_kwargs)
        if timestamp:
            dt = datetime.utcnow()
            if isinstance(timestamp, bool):
                col = '_created'
            elif isinstance(timestamp, str):
                col = timestamp
            elif isinstance(timestamp, datetime):
                col, dt = '_created', timestamp
            elif isinstance(timestamp, tuple):
                col, dt = timestamp
            else:
                col = '_created'
            obj[col] = dt
        # store dataframe indices
        # FIXME this may be a performance issue, use size stored on stats or metadata
        row_count = self.collection(name).estimated_document_count()
        # fixes #466, ensure column names are strings in a multiindex
        if isinstance(obj.columns, pd.MultiIndex):
            obj.columns = obj.columns.map('_'.join)
        obj, idx_meta = unravel_index(obj, row_count=row_count)
        stored_columns = [jsonescape(col) for col in obj.columns]
        column_map = list(zip(obj.columns, stored_columns))
        d_column_map = dict(column_map)
        dtypes = {
            d_column_map.get(k): v.name
            for k, v in obj.dtypes.items()
        }
        kind_meta = {
            'columns': column_map,
            'dtypes': dtypes,
            'idx_meta': idx_meta
        }
        # ensure column names to be strings
        obj.columns = stored_columns
        # create mongo indices for data frame index columns
        df_idxcols = [col for col in obj.columns if col.startswith('_idx#')]
        if df_idxcols:
            keys, idx_kwargs = MongoQueryOps().make_index(df_idxcols)
            ensure_index(collection, keys, **idx_kwargs)
        # create index on row id
        keys, idx_kwargs = MongoQueryOps().make_index(['_om#rowid'])
        ensure_index(collection, keys, **idx_kwargs)
        # bulk insert
        # -- get native objects
        # -- seems to be required since pymongo 3.3.x. if not converted
        #    pymongo raises Cannot Encode object for int64 types
        if ensure_compat:
            for col, col_dtype in dtypes.items():
                if 'datetime' in col_dtype:
                    obj[col].fillna('', inplace=True)
            obj = obj.astype('O', errors='ignore')
        _fast_insert(obj, self, name, chunksize=chunksize)
        kind = (MDREGISTRY.PANDAS_SEROWS
                if store_series
                else MDREGISTRY.PANDAS_DFROWS)
        meta = self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=kind,
                                   kind_meta=kind_meta,
                                   attributes=attributes,
                                   collection=collection.name)
        return meta.save()

    def put_dataframe_as_dfgroup(self, obj, name, groupby, attributes=None):
        """
        store a dataframe grouped by columns in a mongo document

        :Example:

            > # each group
            > {
            >     # group keys
            >     key: val,
            >     _data: [
            >         # only data keys
            >         { key: val, ... }
            >     ]
            > }

        """

        def row_to_doc(obj):
            for gval, gdf in obj.groupby(groupby):
                if hasattr(gval, 'astype'):
                    gval = make_tuple(gval.astype('O'))
                else:
                    gval = make_tuple(gval)
                doc = dict(zip(groupby, gval))
                datacols = list(set(gdf.columns) - set(groupby))
                doc['_data'] = gdf[datacols].astype('O').to_dict('records')
                yield doc

        datastore = self.collection(name)
        datastore.drop()
        datastore.insert_many(row_to_doc(obj))
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_DFGROUP,
                                   attributes=attributes,
                                   collection=datastore.name).save()

    def put_dataframe_as_hdf(self, obj, name, attributes=None, **kwargs):
        filename = self.object_store_key(name, '.hdf')
        hdffname = self._package_dataframe2hdf(obj, filename)
        with open(hdffname, 'rb') as fhdf:
            fileid = self.fs.put(fhdf, filename=filename)
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_HDF,
                                   attributes=attributes,
                                   gridfile=GridFSProxy(db_alias=self._dbalias,
                                                        grid_id=fileid)).save()

    def put_ndarray_as_hdf(self, obj, name, attributes=None, **kwargs):
        """ store numpy array as hdf

        this is a hack, converting the array to a dataframe then storing it
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)

    def put_pyobj_as_hdf(self, obj, name, attributes=None, **kwargs):
        """
        store list, tuple, dict as hdf

        this requires the list, tuple or dict to be convertible into
        a dataframe
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)

    def put_pyobj_as_document(self, obj, name, attributes=None, append=True, index=None, as_many=None, **kwargs):
        """
        store a dict as a document

        similar to put_dataframe_as_documents no data will be replaced by
        default. that is, obj is appended as new documents into the objects'
        mongo collection. to replace the data, specify append=False.
        """
        collection = self.collection(name)
        if append is False:
            collection.drop()
        elif append is None and collection.count_documents({}, limit=1):
            from warnings import warn
            warn('%s already exists, will append rows' % name)
        if index:
            # create index with appropriate options
            from omegaml.store import MongoQueryOps
            if isinstance(index, dict):
                idx_kwargs = index
                index = index.pop('columns')
            else:
                idx_kwargs = {}
            index = [f'data.{c}' for c in index]
            keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
            ensure_index(collection, keys, **idx_kwargs)
        if as_many is None:
            as_many = isinstance(obj, (list, tuple)) and isinstance(obj[0], (list, tuple))
        if as_many:
            # lists of lists are inserted as many objects, as in pymongo < 4
            records = (mongo_compatible({'data': item}) for item in obj)
            result = collection.insert_many(records)
            objid = result.inserted_ids[-1]
        else:
            result = collection.insert_one(mongo_compatible({'data': obj}))
            objid = result.inserted_id

        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PYTHON_DATA,
                                   collection=collection.name,
                                   attributes=attributes,
                                   objid=objid).save()
    def drop(self, name, force=False, version=-1, report=False, **kwargs):
        """
        Drop the object

        :param name: The name of the object. If the name is a pattern it will
            be expanded using .list(), and call .drop() on every obj found.
        :param force: If True ignores DoesNotExist exception, defaults to False
            meaning this raises a DoesNotExist exception if the name does not
            exist
        :param report: if True returns a dict name=>status, where status is True
            if deleted, False if not deleted
        :return: True if object was deleted, False if not.
            If force is True and the object does not exist it will still return True
        :raises: DoesNotExist if the object does not exist and ``force=False``
        """
        is_pattern = '*' in name
        objs = [name] if not is_pattern else self.list(name)
        results = []
        for name in objs:
            try:
                backend = self.get_backend(name)
                drop = backend.drop if backend else self._drop
                result = drop(name, force=force, version=version, **kwargs)
            except Exception as e:
                result = False
                if not force and not is_pattern:
                    raise
                if force:
                    result = self._drop(name, force=force, version=version)
            results.append((name, result))
        if not objs:
            result = self._drop(name, force=force, version=version)
            results.append((name, result))
        return dict(results) if report else len(results) > 0
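    # Usage sketch (illustrative, not part of the original source): drop()
    # accepts literal names or '*' patterns, which are expanded via .list():
    #
    #   >>> store.drop('mydf')                     # True, or raises DoesNotExist
    #   >>> store.drop('sales/*', force=True)      # drop all matches, ignore missing
    #   >>> store.drop('sales/*', report=True)     # {'sales/q1': True, ...}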
    def _drop(self, name, force=False, version=-1, keep_data=False, **kwargs):
        meta = self.metadata(name, version=version)
        if meta is None and not force:
            raise DoesNotExist(name)
        collection = self.collection(name)
        if collection and not keep_data:
            self.mongodb.drop_collection(collection.name)
        if meta:
            if meta.collection and not keep_data:
                self.mongodb.drop_collection(meta.collection)
            if meta and meta.gridfile is not None and not keep_data:
                meta.gridfile.delete()
            self._drop_metadata(name)
            return True
        return False

    def get_backend_bykind(self, kind, model_store=None, data_store=None,
                           **kwargs):
        """
        return the backend by a given object kind

        :param kind: The object kind
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        """
        try:
            backend_cls = load_class(self.defaults.OMEGA_STORE_BACKENDS[kind])
        except KeyError as e:
            raise ValueError('backend {kind} does not exist'.format(**locals()))
        model_store = model_store or self
        data_store = data_store or self
        backend = backend_cls(model_store=model_store,
                              data_store=data_store, **kwargs)
        return backend

    def get_backend(self, name, model_store=None, data_store=None, **kwargs):
        """
        return the backend by a given object name

        :param name: The object name
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        """
        meta = self.metadata(name)
        if meta is not None and meta.kind in self.defaults.OMEGA_STORE_BACKENDS:
            return self.get_backend_bykind(meta.kind,
                                           model_store=model_store,
                                           data_store=data_store,
                                           **kwargs)
        return None

    def help(self, name_or_obj=None, kind=None, raw=False, display=None, renderer=None):
        """ get help for an object by looking up its backend and calling help() on it

        Retrieves the object's metadata and looks up its corresponding backend. If the
        metadata.attributes['docs'] is a string it will display this as the help() contents.
        If the string starts with 'http://' or 'https://' it will open the web page.

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False
            display (fn): optional, callable for interactive display, defaults to help
                if sys.flags.interactive is True, else uses pydoc.render_doc with plaintext
            renderer (fn): optional, the renderer= argument for pydoc.render_doc to use if
                sys.flags.interactive is False and display is not provided

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is not in interactive mode
        """
        import sys
        import pydoc
        interactive = bool(display) if display is not None else sys.flags.interactive
        display = display or help
        renderer = renderer or pydoc.plaintext
        obj = self._resolve_help_backend(name_or_obj=name_or_obj, kind=kind, raw=raw)
        if any(str(obj.__doc__).startswith(v) for v in ('https://', 'http://')):
            obj = obj.__doc__
            if interactive and display is help:
                import webbrowser
                display = webbrowser.open
        return display(obj) if interactive else pydoc.render_doc(obj, renderer=renderer)

    def _resolve_help_backend(self, name_or_obj=None, kind=None, raw=False):
        # helper so we can test help
        meta = self.metadata(name_or_obj) if name_or_obj else None
        if kind:
            backend = self.get_backend_bykind(kind)
        else:
            backend = self.get_backend(name_or_obj) or self.get_backend_byobj(name_or_obj)
        if backend is None:
            backend = self
        if not raw and meta is not None and 'docs' in meta.attributes:
            def UserDocumentation():
                pass

            basedoc = backend.__doc__ or ''
            UserDocumentation.__doc__ = (basedoc +
                                         meta.attributes['docs'])
            backend = UserDocumentation
        return backend

    def get_backend_byobj(self, obj, name=None, kind=None, attributes=None,
                          model_store=None, data_store=None, **kwargs):
        """
        return the matching backend for the given obj

        Returns:
            the first backend that supports the given parameters or None
        """
        model_store = model_store or self
        data_store = data_store or self
        meta = self.metadata(name) if name else None
        kind = kind or (meta.kind if meta is not None else None)
        backend = None
        if kind:
            objtype = str(type(obj))
            if kind in self.defaults.OMEGA_STORE_BACKENDS:
                backend = self.get_backend_bykind(kind, data_store=data_store, model_store=model_store)
                if not backend.supports(obj, name, attributes=attributes,
                                        data_store=data_store,
                                        model_store=model_store,
                                        meta=meta,
                                        kind=kind, **kwargs):
                    warnings.warn('Backend {kind} does not support {objtype}'.format(**locals()))
            else:
                pass
                # TODO refactor pandas and numpy handling into proper backend to avoid misleading warning
                # warnings.warn('Backend {kind} not found {objtype}. Reverting to default'.format(**locals()))
        else:
            for backend_kind, backend_cls in self.defaults.OMEGA_STORE_BACKENDS.items():
                backend = self.get_backend_bykind(backend_kind, data_store=data_store, model_store=model_store)
                if backend.supports(obj, name, attributes=attributes,
                                    data_store=data_store, model_store=model_store,
                                    **kwargs):
                    break
            else:
                backend = None
        return backend
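    # Resolution order sketch (illustrative, not part of the original source):
    # put()/get() resolve a backend by explicit kind, by stored metadata.kind,
    # or by probing each registered backend's supports():
    #
    #   >>> store.get_backend_bykind('sklearn.joblib')   # by kind
    #   >>> store.get_backend('mymodel')                 # by stored metadata
    #   >>> store.get_backend_byobj(model, 'mymodel')    # by object probing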
    def getl(self, *args, **kwargs):
        """ return a lazy MDataFrame for a given object

        Same as .get, but returns an MDataFrame

        """
        return self.get(*args, lazy=True, **kwargs)
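    # Lazy access sketch (illustrative, not part of the original source;
    # assumes MDataFrame's query()/.value API): getl() defers evaluation to
    # MongoDB so filters run server-side before materializing:
    #
    #   >>> mdf = store.getl('mydf')
    #   >>> mdf.query(x__gte=5).value   # evaluated in MongoDB, returns DataFrame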

    def get(self, name, version=-1, force_python=False,
            kind=None, model_store=None, data_store=None, **kwargs):
        """
        Retrieve an object

        :param name: The name of the object
        :param version: Version of the stored object (not supported)
        :param force_python: Return as a python object
        :param kwargs: kwargs depending on object kind
        :return: an object, estimator, pipelines, data array or pandas dataframe
            previously stored with put()
        """
        meta = self.metadata(name, version=version)
        if meta is None:
            return None
        if not force_python:
            backend = (self.get_backend(name, model_store=model_store,
                                        data_store=data_store)
                       if not kind else self.get_backend_bykind(kind,
                                                                model_store=model_store,
                                                                data_store=data_store))
            if backend is not None:
                # FIXME: some backends need to get model_store, data_store, but fails tests
                return backend.get(name, **kwargs)  # model_store=model_store, data_store=data_store, **kwargs)
            if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
                backend = self.get_backend(name)
                return backend.get_model(name)
            elif meta.kind == MDREGISTRY.SPARK_MLLIB:
                backend = self.get_backend(name)
                return backend.get_model(name, version)
            elif meta.kind == MDREGISTRY.PANDAS_DFROWS:
                return self.get_dataframe_documents(name, version=version,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_SEROWS:
                return self.get_dataframe_documents(name, version=version,
                                                    is_series=True,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_DFGROUP:
                return self.get_dataframe_dfgroup(
                    name, version=version, **kwargs)
            elif meta.kind == MDREGISTRY.PYTHON_DATA:
                return self.get_python_data(name, version=version, **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_HDF:
                return self.get_dataframe_hdf(name, version=version)
        return self.get_object_as_python(meta, version=version)
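    # Retrieval sketch (illustrative, not part of the original source): get()
    # dispatches on metadata.kind; kwargs become a filter for row-stored data:
    #
    #   >>> store.get('mymodel')          # joblib.load()'ed estimator
    #   >>> store.get('mydf', x__gt=5)    # DataFrame filtered in MongoDB
    #   >>> store.get('missing')          # None if no metadata exists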
    def get_dataframe_documents(self, name, columns=None, lazy=False,
                                filter=None, version=-1, is_series=False,
                                chunksize=None, sanitize=True, trusted=None,
                                **kwargs):
        """
        Internal method to return DataFrame from documents

        :param name: the name of the object (str)
        :param columns: the column projection as a list of column names
        :param lazy: if True returns a lazy representation as an MDataFrame.
            If False retrieves all data and returns a DataFrame (default)
        :param filter: the filter to be applied as a column__op=value dict
        :param sanitize: sanitize filter by removing all $op filter keys,
            defaults to True. Specify False to allow $op filter keys. $where
            is always removed as it is considered unsafe.
        :param version: the version to retrieve (not supported)
        :param is_series: if True returns a Series instead of a DataFrame
        :param kwargs: remaining kwargs are used as a filter. The filter kwarg
            overrides other kwargs.
        :return: the retrieved object (DataFrame, Series or MDataFrame)

        """
        from omegaml.store.queryops import sanitize_filter
        from omegaml.store.filtered import FilteredCollection

        collection = self.collection(name)
        meta = self.metadata(name)
        filter = filter or kwargs
        filter = sanitize_filter(filter, no_ops=sanitize, trusted=trusted)
        if lazy or chunksize:
            from ..mdataframe import MDataFrame
            df = MDataFrame(collection,
                            metadata=meta.kind_meta,
                            columns=columns).query(**filter)
            if is_series:
                df = df[0]
            if chunksize is not None and chunksize > 0:
                return df.iterchunks(chunksize=chunksize)
        else:
            # TODO ensure the same processing is applied in MDataFrame
            # TODO this method should always use a MDataFrame disregarding lazy
            if filter:
                from .query import Filter
                query = Filter(collection, **filter).query
                cursor = FilteredCollection(collection).find(filter=query, projection=columns)
            else:
                cursor = FilteredCollection(collection).find(projection=columns)
            # restore dataframe
            df = cursor_to_dataframe(cursor)
            if '_id' in df.columns:
                del df['_id']
            if hasattr(meta, 'kind_meta'):
                df = convert_dtypes(df, meta.kind_meta.get('dtypes', {}))
            # -- restore columns
            meta_columns = dict(meta.kind_meta.get('columns'))
            if meta_columns:
                # apply projection, if any
                if columns:
                    # get only projected columns
                    # meta_columns is {origin_column: stored_column}
                    orig_columns = dict({k: v for k, v in meta_columns.items()
                                         if k in columns or v in columns})
                else:
                    # restore columns to original name
                    orig_columns = meta_columns
                df.rename(columns=orig_columns, inplace=True)
            # -- restore indexes
            idx_meta = meta.kind_meta.get('idx_meta')
            if idx_meta:
                df = restore_index(df, idx_meta)
            # -- restore row order
            if is_series:
                index = df.index
                name = df.columns[0]
                df = df[name]
                df.index = index
                df.name = None if name == 'None' else name
        return df

    def rebuild_params(self, kwargs, collection):
        """
        Returns a modified set of parameters for querying mongodb
        based on how the mongo document is structured and the
        fields the document is grouped by.

        **Note: Explicitly to be used with get_grouped_data only**

        :param kwargs: Mongo filter arguments
        :param collection: The name of mongodb collection
        :return: Returns a set of parameters as dictionary.
        """
        modified_params = {}
        db_structure = collection.find_one({}, {'_id': False})
        groupby_columns = list(set(db_structure.keys()) - set(['_data']))
        if kwargs is not None:
            for item in kwargs:
                if item not in groupby_columns:
                    modified_query_param = '_data.' + item
                    modified_params[modified_query_param] = kwargs.get(item)
                else:
                    modified_params[item] = kwargs.get(item)
        return modified_params

    def get_dataframe_dfgroup(self, name, version=-1, sanitize=True, kwargs=None):
        """
        Return a grouped dataframe

        :param name: the name of the object
        :param version: not supported
        :param kwargs: mongo db query arguments to be passed to
            collection.find() as a filter.
        :param sanitize: remove any $op operators in kwargs

        """
        import pandas as pd
        from omegaml.store.queryops import sanitize_filter
        from omegaml.store.filtered import FilteredCollection

        def convert_doc_to_row(cursor):
            for doc in cursor:
                data = doc.pop('_data', [])
                for row in data:
                    doc.update(row)
                    yield doc

        datastore = FilteredCollection(self.collection(name))
        kwargs = kwargs if kwargs else {}
        params = self.rebuild_params(kwargs, datastore)
        params = sanitize_filter(params, no_ops=sanitize)
        cursor = datastore.find(params, projection={'_id': False})
        df = pd.DataFrame(convert_doc_to_row(cursor))
        return df

    def get_dataframe_hdf(self, name, version=-1):
        """
        Retrieve dataframe from hdf

        :param name: The name of object
        :param version: The version of object (not supported)
        :return: Returns a python pandas dataframe
        :raises: gridfs.errors.NoFile
        """
        df = None
        meta = self.metadata(name)
        filename = getattr(meta.gridfile, 'name', self.object_store_key(name, '.hdf'))
        if self.fs.exists(filename=filename):
            df = self._extract_dataframe_hdf(filename, version=version)
            return df
        else:
            raise gridfs.errors.NoFile(
                "{0} does not exist in mongo collection '{1}'".format(
                    name, self.bucket))

    def get_python_data(self, name, filter=None, version=-1, lazy=False, trusted=False, **kwargs):
        """
        Retrieve objects as python data

        :param name: The name of object
        :param version: The version of object

        :return: Returns the object as python list object
        """
        datastore = self.collection(name)
        filter = filter or kwargs
        sanitize_filter(filter) if trusted is False or trusted != signature(filter) else filter
        cursor = datastore.find(filter, **kwargs)
        if lazy:
            return cursor
        data = (d.get('data') for d in cursor)
        return list(data)

    def get_object_as_python(self, meta, version=-1):
        """
        Retrieve object as python object

        :param meta: The metadata object
        :param version: The version of the object

        :return: Returns data as python object
        """
        if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_HDF:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_DFROWS:
            return list(getattr(self.mongodb, meta.collection).find())
        if meta.kind == MDREGISTRY.PYTHON_DATA:
            col = getattr(self.mongodb, meta.collection)
            return col.find_one(dict(_id=meta.objid)).get('data')
        raise TypeError('cannot return kind %s as a python object' % meta.kind)

    def __iter__(self):
        for f in self.list(include_temp=True):
            yield f

    @property
    def buckets(self):
        return ['default' if b == self.defaults.OMEGA_MONGO_COLLECTION else b
                for b in self._Metadata.objects.distinct('bucket')]
    def list(self, pattern=None, regexp=None, kind=None, raw=False, hidden=None,
             include_temp=False, bucket=None, prefix=None, filter=None):
        """
        List all files in store

        specify pattern as a unix pattern (e.g. :code:`models/*`,
        or specify regexp)

        :param pattern: the unix file pattern or None for all
        :param regexp: the regexp. takes precedence over pattern
        :param raw: if True return the meta data objects
        :param filter: specify additional filter criteria, optional
        :return: List of files in store

        """
        regex = lambda pattern: bson.regex.Regex(f'{pattern}')
        db = self.mongodb
        searchkeys = dict(bucket=bucket or self.bucket,
                          prefix=prefix or self.prefix)
        q_excludes = Q()
        if regexp:
            searchkeys['name'] = regex(regexp)
        elif pattern:
            re_pattern = pattern.replace('*', '.*').replace('/', r'\/')
            searchkeys['name'] = regex(f'^{re_pattern}$')
        if not include_temp:
            q_excludes &= Q(name__not__startswith='_')
            q_excludes &= Q(name__not=regex(r'(.{1,*}\/?_.*)'))
        if not hidden:
            q_excludes &= Q(name__not__startswith='.')
        if kind or self.force_kind:
            kind = kind or self.force_kind
            if isinstance(kind, (tuple, list)):
                searchkeys.update(kind__in=kind)
            else:
                searchkeys.update(kind=kind)
        if filter:
            searchkeys.update(filter)
        q_search = Q(**searchkeys) & q_excludes
        files = self._Metadata.objects.no_cache()(q_search)
        return [f if raw else str(f.name).replace('.omm', '') for f in files]
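    # Pattern sketch (illustrative, not part of the original source): '*' in a
    # pattern is translated to the regex '.*' and anchored, so 'models/*'
    # matches everything under the models/ prefix:
    #
    #   >>> store.list('models/*')            # ['models/mymodel', ...]
    #   >>> store.list(regexp='models/.*')    # same, as an explicit regex
    #   >>> store.list('models/*', raw=True)  # Metadata objects instead of names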
    def exists(self, name, hidden=False):
        """ check if object exists

        Args:
            name (str): name of object
            hidden (bool): if True, include hidden files, defaults to
                False, unless name starts with '.'

        Returns:
            bool, True if object exists

        .. versionchanged:: 0.16.4
            hidden defaults to True if name starts with '.'
        """
        hidden = True if name.startswith('.') else hidden
        return name in self.list(name, hidden=hidden)

    def object_store_key(self, name, ext, hashed=None):
        """
        Returns the store key

        Unless you write a mixin or a backend you should not use this method

        :param name: The name of object
        :param ext: The extension of the filename
        :param hashed: hash the key to support arbitrary name length, defaults
            to defaults.OMEGA_STORE_HASHEDNAMES, True by default since 0.13.7

        :return: A filename with relative bucket, prefix and name
        """
        # byte string
        _u8 = lambda t: t.encode('UTF-8', 'replace') if isinstance(t, str) else t
        key = self._get_obj_store_key(name, ext)
        hashed = hashed if hashed is not None else self.defaults.OMEGA_STORE_HASHEDNAMES
        if hashed:
            from hashlib import sha1
            # SEC: CWE-916
            # - status: wontfix
            # - reason: hashcode is used purely for name resolution, not a security function
            hasher = sha1()
            hasher.update(_u8(key))
            key = hasher.hexdigest()
        return key

    def _get_obj_store_key(self, name, ext, prefix=None, bucket=None):
        # backwards compatibility implementation of object_store_key()
        name = '%s.%s' % (name, ext) if not name.endswith(ext) else name
        filename = '{bucket}.{prefix}.{name}'.format(
            bucket=bucket or self.bucket,
            prefix=prefix or self.prefix,
            name=name,
            ext=ext).replace('/', '_').replace('..', '.')
        return filename

    def _package_dataframe2hdf(self, df, filename, key=None):
        """
        Package a dataframe as an hdf file

        :param df: The dataframe
        :param filename: Name of file

        :return: Filename of hdf file
        """
        lpath = tempfile.mkdtemp()
        fname = os.path.basename(filename)
        hdffname = os.path.join(self.tmppath, fname + '.hdf')
        key = key or 'data'
        df.to_hdf(hdffname, key)
        return hdffname

    def _extract_dataframe_hdf(self, filename, version=-1):
        """
        Extracts a dataframe from a stored hdf file

        :param filename: The name of file
        :param version: The version of file

        :return: Pandas dataframe
        """
        import pandas as pd
        hdffname = os.path.join(self.tmppath, filename)
        dirname = os.path.dirname(hdffname)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        try:
            outf = self.fs.get_version(filename, version=version)
        except gridfs.errors.NoFile as e:
            raise e
        with open(hdffname, 'wb') as hdff:
            hdff.write(outf.read())
        hdf = pd.HDFStore(hdffname)
        key = list(hdf.keys())[0]
        df = hdf[key]
        hdf.close()
        return df

    def _ensure_fs_collection(self):
        # ensure backwards-compatible gridfs access
        if self.defaults.OMEGA_BUCKET_FS_LEGACY:
            # prior to 0.13.2 a single gridfs instance was used, always equal to the default collection
            return self.defaults.OMEGA_MONGO_COLLECTION
        if self.bucket == self.defaults.OMEGA_MONGO_COLLECTION:
            # from 0.13.2 onwards, only the default bucket is equal to the default collection
            # backwards compatibility for existing installations
            return self.bucket
        # since 0.13.2, all buckets other than the default use a qualified collection name to
        # effectively separate files in different buckets, enabling finer-grained access control
        # and avoiding name collisions from different buckets
        return '{}_{}'.format(self.defaults.OMEGA_MONGO_COLLECTION, self.bucket)

    def _ensure_fs_index(self, fs):
        # make sure we have proper chunks and file indices. this should be created on first write, but sometimes is not
        # see https://docs.mongodb.com/manual/core/gridfs/#gridfs-indexes
        # pymongo since 4.1 no longer has fs._GridFS
        chunks_collection = fs._GridFS__chunks if hasattr(fs, '_GridFS__chunks') else fs._chunks
        files_collection = fs._GridFS__files if hasattr(fs, '_GridFS__files') else fs._files
        ensure_index(chunks_collection, {'files_id': 1, 'n': 1}, unique=True)
        ensure_index(files_collection, {'filename': 1, 'uploadDate': 1})

    def sign(self, filter):
        return signature(filter)

    @staticmethod
    def _cleanup(objrepr, tmppath=None):
        # cleanup any temporary files on exit
        # -- this is called as a finalizer (weakref.finalize)
        try:
            logger.debug(f'finalizing {objrepr}: cleaning up temporary files in {tmppath}')
            shutil.rmtree(tmppath, ignore_errors=True)
        except Exception as e:
            logger.error(f'finalizing {objrepr}: error occurred as {e}')
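    # Key naming sketch (illustrative, not part of the original source): store
    # keys follow {bucket}.{prefix}.{name} with '/' mapped to '_', and are
    # sha1-hashed by default (OMEGA_STORE_HASHEDNAMES) to support arbitrary
    # name lengths; the concrete values below assume bucket='omegaml' and
    # prefix='data/':
    #
    #   >>> store._get_obj_store_key('mydf', '.hdf')   # 'omegaml.data_.mydf.hdf'
    #   >>> store.object_store_key('mydf', '.hdf')     # sha1 hex digest of the above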