Source code for omegaml.store.base

   1"""
   2Native storage for OmegaML using mongodb as the storage layer
   3
   4An OmegaStore instance is a MongoDB database. It has at least the
   5metadata collection which lists all objects stored in it. A metadata
   6document refers to the following types of objects (metadata.kind):
   7
   8* pandas.dfrows - a Pandas DataFrame stored as a collection of rows
   9* sklearn.joblib - a scikit learn estimator/pipline dumped using joblib.dump()
  10* python.data - an arbitrary python dict, tuple, list stored as a document
  11
  12Note that storing Pandas and scikit learn objects requires the availability
  13of the respective packages. If either can not be imported, the OmegaStore
  14degrades to a python.data store only. It will still .list() and get() any
  15object, however reverts to pure python objects. In this case it is up
  16to the client to convert the data into an appropriate format for processing.
  17
  18Pandas and scikit-learn objects can only be stored if these packages are
  19availables. put() raises a TypeError if you pass such objects and these
  20modules cannot be loaded.
  21
  22All data are stored within the same mongodb, in per-object collections
  23as follows:
  24
  25    * .metadata
  26        all metadata. each object is one document,
  27        See **omegaml.documents.Metadata** for details
  28    * .<bucket>.files
  29        this is the GridFS instance used to store
  30        blobs (models, numpy, hdf). The actual file name
  31        will be <prefix>/<name>.<ext>, where ext is
  32        optionally generated by put() / get().
  33    * .<bucket>.<prefix>.<name>.data
  34        every other dataset is stored in a separate
  35        collection (dataframes, dicts, lists, tuples).
  36        Any forward slash in prefix is ignored (e.g. 'data/'
  37        becomes 'data')
  38
  39    DataFrames by default are stored in their own collection, every
  40    row becomes a document. To store dataframes as a binary file,
  41    use `put(...., as_hdf=True).` `.get()` will always return a dataframe.
  42
  43    Python dicts, lists, tuples are stored as a single document with
  44    a `.data` attribute holding the JSON-converted representation. `.get()`
  45    will always return the corresponding python object of .data.
  46
  47    Models are joblib.dump()'ed and ziped prior to transferring into
  48    GridFs. .get() will always unzip and joblib.load() before returning
  49    the model. Note this requires that the process using .get() supports
  50    joblib as well as all python classes referred to. If joblib is not
  51    supported, .get() returns a file-like object.
  52
  53    The .metadata entry specifies the format used to store each
  54    object as well as it's location:
  55
  56    * metadata.kind
  57        the type of object
  58    * metadata.name
  59        the name of the object, as given on put()
  60    * metadata.gridfile
  61        the gridfs object (if any, null otherwise)
  62    * metadata.collection
  63        the name of the collection
  64    * metadata.attributes
  65        arbitrary custom attributes set in
  66        put(attributes=obj). This is used e.g. by
  67        OmegaRuntime's fit() method to record the
  68        data used in the model's training.
  69
  70    **.put()** and **.get()** use helper methods specific to the type in
  71    object's type and metadata.kind, respectively. In the future
  72    a plugin system will enable extension to other types.
  73"""
from __future__ import absolute_import

import bson
import gridfs
import logging
import os
import shutil
import tempfile
import warnings
import weakref
from datetime import datetime
from mongoengine.connection import disconnect, \
    connect, _connections, get_db
from mongoengine.errors import DoesNotExist
from mongoengine.fields import GridFSProxy
from mongoengine.queryset.visitor import Q
from omegaml.store.fastinsert import fast_insert, default_chunksize
from omegaml.util import unravel_index, restore_index, make_tuple, jsonescape, \
    cursor_to_dataframe, convert_dtypes, load_class, extend_instance, ensure_index, PickableCollection, \
    mongo_compatible, signature
from uuid import uuid4

from .queryops import sanitize_filter
from ..documents import make_Metadata, MDREGISTRY
from ..mongoshim import sanitize_mongo_kwargs, waitForConnection
from ..util import (is_estimator, is_dataframe, is_ndarray, is_spark_mllib,
                    settings as omega_settings, urlparse, is_series)

logger = logging.getLogger(__name__)

class OmegaStore(object):
    """
    The storage backend for models and data
    """

    def __init__(self, mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None):
        """
        :param mongo_url: the mongo URL to use for the gridfs
        :param bucket: the mongo collection to use for gridfs
        :param prefix: the path prefix for files. defaults to blank
        :param kind: the kind or list of kinds to limit this store to
        """
        self.defaults = defaults or omega_settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket or self.defaults.OMEGA_MONGO_COLLECTION
        self._fs_collection = self._ensure_fs_collection()
        self._fs = None
        self._tmppath = os.path.join(self.defaults.OMEGA_TMP, uuid4().hex)
        self.prefix = prefix or ''
        self.force_kind = kind
        self._Metadata_cls = None
        # don't initialize db here to avoid using the default settings
        # otherwise Metadata will already have a connection and not use
        # the one provided in override_settings
        self._db = None
        self._dbalias = dbalias
        # add backends and mixins
        self._apply_mixins()
        # register backends
        self.register_backends()
        # register finalizer for cleanup
        weakref.finalize(self, self._cleanup, repr(self), tmppath=str(self._tmppath))

    def __repr__(self):
        return 'OmegaStore(bucket={}, prefix={})'.format(self.bucket, self.prefix)

    def __equal__(self, other):
        """test for equality of OmegaStore instances

        Args:
            other: OmegaStore instance

        Returns:
            True if other is the same database, same bucket, same prefix
        """
        return self.mongo_url == other.mongo_url and self.bucket == other.bucket and self.prefix == other.prefix

    @property
    def tmppath(self):
        """
        return an instance-specific temporary path
        """
        os.makedirs(self._tmppath, exist_ok=True)
        return self._tmppath

    @property
    def mongodb(self):
        """
        Returns a mongo database object
        """
        if self._db is not None:
            return self._db
        # parse salient parts of mongourl, e.g.
        # mongodb://user:password@host/dbname
        self.parsed_url = urlparse.urlparse(self.mongo_url)
        self.database_name = self.parsed_url.path[1:]
        host = self.parsed_url.netloc
        scheme = self.parsed_url.scheme
        username, password = None, None
        if '@' in host:
            creds, host = host.split('@', 1)
            if ':' in creds:
                username, password = creds.split(':')
        # connect via mongoengine
        #
        # note this uses a MongoClient in the background, with pooled
        # connections. there are multiprocessing issues with pymongo:
        # http://api.mongodb.org/python/3.2/faq.html#using-pymongo-with-multiprocessing
        # connect=False is due to https://jira.mongodb.org/browse/PYTHON-961
        # this defers connecting until the first access
        # serverSelectionTimeoutMS=2500 is to fail fast, the default is 30000
        #
        # use an instance specific alias, note that access to Metadata and
        # QueryCache must pass the very same alias
        self._dbalias = alias = self._dbalias or 'omega-{}'.format(uuid4().hex)
        # always disconnect before registering a new connection because
        # mongoengine.connect() forgets all connection settings upon disconnect
        if alias not in _connections:
            disconnect(alias)
        connection = connect(alias=alias, db=self.database_name,
                             host=f'{scheme}://{host}',
                             username=username,
                             password=password,
                             connect=False,
                             authentication_source='admin',
                             **sanitize_mongo_kwargs(self.defaults.OMEGA_MONGO_SSL_KWARGS),
                             )
        # since PyMongo 4, connect() no longer waits for connection
        waitForConnection(connection)
        self._db = get_db(alias)
        return self._db

    @property
    def _Metadata(self):
        if self._Metadata_cls is None:
            # hack to localize metadata
            db = self.mongodb
            self._Metadata_cls = make_Metadata(db_alias=self._dbalias,
                                               collection=self._fs_collection)
        return self._Metadata_cls

    @property
    def fs(self):
        """
        Retrieve a gridfs instance using url and collection provided

        :return: a gridfs instance
        """
        if self._fs is not None:
            return self._fs
        self._fs = gridfs.GridFS(self.mongodb, collection=self._fs_collection)
        self._ensure_fs_index(self._fs)
        return self._fs
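    # Sketch of the lazy connection behaviour (URL and names are
    # hypothetical). No connection is made at construction time; the first
    # access to .mongodb (or .fs) parses the URL, registers a per-instance
    # mongoengine alias and waits for the server:
    #
    #   store = OmegaStore(mongo_url='mongodb://user:password@localhost/omega',
    #                      bucket='omegaml', prefix='data/')
    #   db = store.mongodb   # connects here, returns a pymongo Database
    #   fs = store.fs        # GridFS on the bucket's .files collection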
    def metadata(self, name=None, bucket=None, prefix=None, version=-1, **kwargs):
        """
        Returns a metadata document for the given entry name
        """
        # FIXME: version attribute does not do anything
        # FIXME: metadata should be stored in a bucket-specific collection
        # to enable access control, see
        # https://docs.mongodb.com/manual/reference/method/db.createRole/#db.createRole
        db = self.mongodb
        fs = self.fs
        prefix = prefix or self.prefix
        bucket = bucket or self.bucket
        # Meta is to silence lint on import error
        Meta = self._Metadata
        return Meta.objects(name=str(name), prefix=prefix, bucket=bucket).no_cache().first()
    def make_metadata(self, name, kind, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some : value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        """
        # TODO kept _make_metadata for backwards compatibility.
        return self._make_metadata(name, bucket=bucket, prefix=prefix,
                                   kind=kind, **kwargs)

    def _make_metadata(self, name=None, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some : value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        """
        bucket = bucket or self.bucket
        prefix = prefix or self.prefix
        meta = self.metadata(name=name,
                             prefix=prefix,
                             bucket=bucket)
        if meta:
            dict_fields = 'attributes', 'kind_meta'
            for k, v in kwargs.items():
                if k in dict_fields and v is not None and len(v) > 0:
                    previous = getattr(meta, k, {})
                    previous.update(v)
                    setattr(meta, k, previous)
                elif k in dict_fields and v is not None and len(v) == 0:
                    setattr(meta, k, {})
                elif k in dict_fields and v is None:
                    # ignore non specified attributes
                    continue
                else:
                    # by default set whatever attribute is provided
                    setattr(meta, k, v)
        else:
            meta = self._Metadata(name=name, bucket=bucket, prefix=prefix,
                                  **kwargs)
        return meta

    def _drop_metadata(self, name=None, **kwargs):
        # internal method to delete meta data of an object
        meta = self.metadata(name, **kwargs)
        if meta is not None:
            meta.delete()
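    # Sketch of the attributes semantics documented above, for an existing
    # object named 'x' (names and values are hypothetical):
    #
    #   store._make_metadata(name='x', kind='python.data', attributes=None)
    #   # => existing attributes left as is
    #   store._make_metadata(name='x', kind='python.data', attributes={})
    #   # => attributes reset to {}
    #   store._make_metadata(name='x', kind='python.data', attributes={'a': 1})
    #   # => existing attributes updated with {'a': 1}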
    def collection(self, name=None, bucket=None, prefix=None):
        """
        Returns a mongo db collection as a datastore

        If there is an existing object of name, will return the .collection
        of the object. Otherwise returns the collection according to naming
        convention {bucket}.{prefix}.{name}.datastore

        :param name: the collection to use. if none defaults to the
            collection name given on instantiation. the actual collection name
            used is always prefix + name + '.data'
        """
        # see if we have a known object and a collection for that, if not define it
        meta = self.metadata(name, bucket=bucket, prefix=prefix)
        collection = meta.collection if meta else None
        if not collection:
            collection = self.object_store_key(name, '.datastore')
            collection = collection.replace('..', '.')
        # return the collection
        datastore = getattr(self.mongodb, collection)
        return PickableCollection(datastore)

    def _apply_mixins(self):
        """
        apply mixins in defaults.OMEGA_STORE_MIXINS
        """
        for mixin in self.defaults.OMEGA_STORE_MIXINS:
            conditional = self._mixins_conditional
            extend_instance(self, mixin,
                            conditional=conditional)

    def _mixins_conditional(self, cls, obj):
        return cls.supports(obj) if hasattr(cls, 'supports') else True
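    # Illustration of the naming convention above, assuming hashed names
    # are disabled (defaults.OMEGA_STORE_HASHEDNAMES=False): with
    # bucket='omegaml', prefix='data.' and name='sales', and no existing
    # metadata entry, collection('sales') resolves to the mongo collection
    # 'omegaml.data.sales.datastore'. With hashed names enabled (the
    # default since 0.13.7) the key is the sha1 hash of that string instead.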
    def register_backends(self):
        """
        register backends in defaults.OMEGA_STORE_BACKENDS
        """
        for kind, backend in self.defaults.OMEGA_STORE_BACKENDS.items():
            self.register_backend(kind, backend)
    def register_backend(self, kind, backend):
        """
        register a backend class

        :param kind: (str) the backend kind
        :param backend: (class) the backend class
        """
        self.defaults.OMEGA_STORE_BACKENDS[kind] = load_class(backend)
        if kind not in MDREGISTRY.KINDS:
            MDREGISTRY.KINDS.append(kind)
        return self
    def register_mixin(self, mixincls):
        """
        register a mixin class

        :param mixincls: (class) the mixin class
        """
        extend_instance(self, mixincls)
        return self
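    # Sketch: extending a store at runtime (the class path and mixin name
    # are hypothetical):
    #
    #   store.register_backend('mykind.custom', 'mypkg.backends.MyBackend')
    #   store.register_mixin(MyStoreMixin)
    #
    # register_backend() loads the class and records the kind in
    # MDREGISTRY.KINDS so that metadata documents can reference it.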
    def put(self, obj, name, attributes=None, kind=None, replace=False, model_store=None,
            data_store=None, **kwargs):
        """
        Stores an object: estimators, pipelines, numpy arrays or
        pandas dataframes
        """
        if replace:
            self.drop(name, force=True)
        backend = self.get_backend_byobj(obj, name, attributes=attributes, kind=kind,
                                         model_store=model_store, data_store=data_store,
                                         **kwargs)
        if backend:
            return backend.put(obj, name, attributes=attributes, **kwargs)
        # TODO move all of the specifics to backend implementations
        if is_estimator(obj):
            backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
            return backend.put(obj, name, attributes=attributes, **kwargs)
        elif is_spark_mllib(obj):
            backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
            return backend.put(obj, name, attributes=attributes, **kwargs)
        elif is_dataframe(obj) or is_series(obj):
            groupby = kwargs.get('groupby')
            if obj.empty:
                from warnings import warn
                warn('Provided dataframe is empty, ignoring it, doing nothing here!')
                return None
            if kwargs.pop('as_hdf', False):
                return self.put_dataframe_as_hdf(
                    obj, name, attributes, **kwargs)
            elif groupby:
                return self.put_dataframe_as_dfgroup(
                    obj, name, groupby, attributes)
            append = kwargs.pop('append', None)
            timestamp = kwargs.pop('timestamp', None)
            index = kwargs.pop('index', None)
            chunksize = kwargs.pop('chunksize', default_chunksize)
            return self.put_dataframe_as_documents(
                obj, name, append=append, attributes=attributes, index=index,
                timestamp=timestamp, chunksize=chunksize, **kwargs)
        elif is_ndarray(obj):
            if kwargs.pop('as_pydata', False):
                return self.put_pyobj_as_document(obj.tolist(), name,
                                                  attributes=attributes, **kwargs)
            return self.put_ndarray_as_hdf(obj, name, attributes=attributes,
                                           **kwargs)
        elif isinstance(obj, (dict, list, tuple)):
            kwargs.pop('as_pydata', None)
            if kwargs.pop('as_hdf', False):
                return self.put_pyobj_as_hdf(obj, name,
                                             attributes=attributes, **kwargs)
            return self.put_pyobj_as_document(obj, name,
                                              attributes=attributes,
                                              **kwargs)
        else:
            raise TypeError('type %s not supported' % type(obj))
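    # Sketch of the dispatch above (df, obj and names are placeholders):
    #
    #   store.put(df, 'sales')                      # row-wise documents
    #   store.put(df, 'sales', as_hdf=True)         # HDF5 blob in GridFS
    #   store.put(df, 'sales', groupby=['region'])  # grouped documents
    #   store.put([1, 2, 3], 'mylist')              # python.data document
    #   store.put(obj, 'name', replace=True)        # drop first, then store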
    def put_dataframe_as_documents(self, obj, name, append=None,
                                   attributes=None, index=None,
                                   timestamp=None, chunksize=None,
                                   ensure_compat=True, _fast_insert=fast_insert,
                                   **kwargs):
        """
        store a dataframe as a row-wise collection of documents

        :param obj: the dataframe to store
        :param name: the name of the item in the store
        :param append: if False collection will be dropped before inserting,
            if True existing documents will persist. Defaults to True. If not
            specified and rows have been previously inserted, will issue a
            warning.
        :param index: list of columns, using +, -, @ as a column prefix to
            specify ASCENDING, DESCENDING, GEOSPHERE respectively. For @ the
            column has to represent a valid GeoJSON object.
        :param timestamp: if True or a field name adds a timestamp. If the
            value is a boolean or datetime, uses _created as the field name.
            The timestamp is always datetime.datetime.utcnow(). May be overridden
            by specifying the tuple (col, datetime).
        :param ensure_compat: if True attempt to convert obj to mongodb compatibility,
            set to False only if you are sure to have only compatible values in dataframe.
            defaults to True. False may reduce memory and increase speed on large dataframes.
        :return: the Metadata object created
        """
        import pandas as pd
        from .queryops import MongoQueryOps
        collection = self.collection(name)
        if is_series(obj):
            obj = pd.DataFrame(obj, index=obj.index, columns=[str(obj.name)])
            store_series = True
        else:
            store_series = False
        if append is False:
            self.drop(name, force=True)
        elif append is None and collection.count_documents({}, limit=1):
            from warnings import warn
            warn('%s already exists, will append rows' % name)
        if index:
            # get index keys
            if isinstance(index, dict):
                idx_kwargs = index
                index = index.pop('columns')
            else:
                idx_kwargs = {}
            # create index with appropriate options
            keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
            ensure_index(collection, keys, **idx_kwargs)
        if timestamp:
            dt = datetime.utcnow()
            if isinstance(timestamp, bool):
                col = '_created'
            elif isinstance(timestamp, str):
                col = timestamp
            elif isinstance(timestamp, datetime):
                col, dt = '_created', timestamp
            elif isinstance(timestamp, tuple):
                col, dt = timestamp
            else:
                col = '_created'
            obj[col] = dt
        # store dataframe indices
        # FIXME this may be a performance issue, use size stored on stats or metadata
        row_count = self.collection(name).estimated_document_count()
        # fixes #466, ensure column names are strings in a multiindex
        if isinstance(obj.columns, pd.MultiIndex):
            obj.columns = obj.columns.map('_'.join)
        obj, idx_meta = unravel_index(obj, row_count=row_count)
        stored_columns = [jsonescape(col) for col in obj.columns]
        column_map = list(zip(obj.columns, stored_columns))
        d_column_map = dict(column_map)
        dtypes = {
            d_column_map.get(k): v.name
            for k, v in obj.dtypes.items()
        }
        kind_meta = {
            'columns': column_map,
            'dtypes': dtypes,
            'idx_meta': idx_meta
        }
        # ensure column names to be strings
        obj.columns = stored_columns
        # create mongo indices for data frame index columns
        df_idxcols = [col for col in obj.columns if col.startswith('_idx#')]
        if df_idxcols:
            keys, idx_kwargs = MongoQueryOps().make_index(df_idxcols)
            ensure_index(collection, keys, **idx_kwargs)
        # create index on row id
        keys, idx_kwargs = MongoQueryOps().make_index(['_om#rowid'])
        ensure_index(collection, keys, **idx_kwargs)
        # bulk insert
        # -- get native objects
        # -- seems to be required since pymongo 3.3.x. if not converted
        #    pymongo raises Cannot Encode object for int64 types
        if ensure_compat:
            for col, col_dtype in dtypes.items():
                if 'datetime' in col_dtype:
                    obj[col].fillna('', inplace=True)
            obj = obj.astype('O', errors='ignore')
        _fast_insert(obj, self, name, chunksize=chunksize)
        kind = (MDREGISTRY.PANDAS_SEROWS
                if store_series
                else MDREGISTRY.PANDAS_DFROWS)
        meta = self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=kind,
                                   kind_meta=kind_meta,
                                   attributes=attributes,
                                   collection=collection.name)
        return meta.save()
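    # Sketch of the index= and timestamp= options described above (column
    # names are hypothetical):
    #
    #   store.put_dataframe_as_documents(
    #       df, 'sales',
    #       index=['+date', '-amount', '@location'],    # asc, desc, geosphere
    #       timestamp=('saved_at', datetime.utcnow()))  # explicit (col, dt)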
    def put_dataframe_as_dfgroup(self, obj, name, groupby, attributes=None):
        """
        store a dataframe grouped by columns in a mongo document

        :Example:

          > # each group
          > {
          >     # group keys
          >     key: val,
          >     _data: [
          >         # only data keys
          >         { key: val, ... }
          >     ]
          > }

        """

        def row_to_doc(obj):
            for gval, gdf in obj.groupby(groupby):
                if hasattr(gval, 'astype'):
                    gval = make_tuple(gval.astype('O'))
                else:
                    gval = make_tuple(gval)
                doc = dict(zip(groupby, gval))
                datacols = list(set(gdf.columns) - set(groupby))
                doc['_data'] = gdf[datacols].astype('O').to_dict('records')
                yield doc

        datastore = self.collection(name)
        datastore.drop()
        datastore.insert_many(row_to_doc(obj))
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_DFGROUP,
                                   attributes=attributes,
                                   collection=datastore.name).save()

    def put_dataframe_as_hdf(self, obj, name, attributes=None, **kwargs):
        filename = self.object_store_key(name, '.hdf')
        hdffname = self._package_dataframe2hdf(obj, filename)
        with open(hdffname, 'rb') as fhdf:
            fileid = self.fs.put(fhdf, filename=filename)
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_HDF,
                                   attributes=attributes,
                                   gridfile=GridFSProxy(db_alias=self._dbalias,
                                                        grid_id=fileid)).save()
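    # Usage sketch for the two methods above (names are hypothetical):
    #
    #   store.put_dataframe_as_dfgroup(df, 'sales_by_region', ['region'])
    #   # => one document per region, other columns nested under _data
    #   store.put_dataframe_as_hdf(df, 'sales_hdf')
    #   # => HDF5 blob in GridFS, retrievable via get_dataframe_hdf()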
    def put_ndarray_as_hdf(self, obj, name, attributes=None, **kwargs):
        """ store numpy array as hdf

        this is a hack, converting the array to a dataframe then storing
        it
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)
    def put_pyobj_as_hdf(self, obj, name, attributes=None, **kwargs):
        """
        store list, tuple, dict as hdf

        this requires the list, tuple or dict to be convertible into
        a dataframe
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)
    def put_pyobj_as_document(self, obj, name, attributes=None, append=True, index=None, as_many=None, **kwargs):
        """
        store a dict as a document

        similar to put_dataframe_as_documents no data will be replaced by
        default. that is, obj is appended as new documents into the objects'
        mongo collection. to replace the data, specify append=False.
        """
        collection = self.collection(name)
        if append is False:
            collection.drop()
        elif append is None and collection.count_documents({}, limit=1):
            from warnings import warn
            warn('%s already exists, will append rows' % name)
        if index:
            # create index with appropriate options
            from omegaml.store import MongoQueryOps
            if isinstance(index, dict):
                idx_kwargs = index
                index = index.pop('columns')
            else:
                idx_kwargs = {}
            index = [f'data.{c}' for c in index]
            keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
            ensure_index(collection, keys, **idx_kwargs)
        if as_many is None:
            as_many = isinstance(obj, (list, tuple)) and isinstance(obj[0], (list, tuple))
        if as_many:
            # lists of lists are inserted as many objects, as in pymongo < 4
            records = (mongo_compatible({'data': item}) for item in obj)
            result = collection.insert_many(records)
            objid = result.inserted_ids[-1]
        else:
            result = collection.insert_one(mongo_compatible({'data': obj}))
            objid = result.inserted_id

        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PYTHON_DATA,
                                   collection=collection.name,
                                   attributes=attributes,
                                   objid=objid).save()
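    # Sketch of the as_many inference above (values are illustrative):
    #
    #   store.put_pyobj_as_document({'a': 1}, 'config')        # one document
    #   store.put_pyobj_as_document([[1, 2], [3, 4]], 'rows')
    #   # => as_many defaults to True, inserted as two documents
    #   store.put_pyobj_as_document([1, 2, 3], 'values', as_many=False)
    #   # => single document with data=[1, 2, 3]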
    def drop(self, name, force=False, version=-1, report=False, **kwargs):
        """
        Drop the object

        :param name: The name of the object. If the name is a pattern it will
            be expanded using .list(), and .drop() is called on every object found.
        :param force: If True ignores DoesNotExist exception, defaults to False
            meaning this raises a DoesNotExist exception if the name does not
            exist
        :param report: if True returns a dict name=>status, where status is True
            if deleted, False if not deleted
        :return: True if object was deleted, False if not.
            If force is True and the object does not exist it will still return True
        :raises: DoesNotExist if the object does not exist and ``force=False``
        """
        is_pattern = '*' in name
        objs = [name] if not is_pattern else self.list(name)
        results = []
        for name in objs:
            try:
                backend = self.get_backend(name)
                if backend is not None:
                    result = backend.drop(name, force=force, version=version, **kwargs)
                else:
                    result = self._drop(name, force=force, version=version)
            except Exception as e:
                results.append((name, False))
                if not is_pattern:
                    raise
            else:
                results.append((name, result))
        if not objs:
            result = self._drop(name, force=force, version=version)
            results.append((name, result))
        return dict(results) if report else len(results) > 0

    def _drop(self, name, force=False, version=-1, keep_data=False, **kwargs):
        meta = self.metadata(name, version=version)
        if meta is None and not force:
            raise DoesNotExist(name)
        collection = self.collection(name)
        if collection and not keep_data:
            self.mongodb.drop_collection(collection.name)
        if meta:
            if meta.collection and not keep_data:
                self.mongodb.drop_collection(meta.collection)
            if meta and meta.gridfile is not None and not keep_data:
                meta.gridfile.delete()
            self._drop_metadata(name)
            return True
        return False
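    # Sketch: pattern-based drop with a status report (names hypothetical):
    #
    #   store.drop('models/*', force=True, report=True)
    #   # => {'models/m1': True, 'models/m2': True, ...}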
    def get_backend_bykind(self, kind, model_store=None, data_store=None,
                           **kwargs):
        """
        return the backend by a given object kind

        :param kind: The object kind
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        """
        try:
            backend_cls = load_class(self.defaults.OMEGA_STORE_BACKENDS[kind])
        except KeyError as e:
            raise ValueError('backend {kind} does not exist'.format(**locals()))
        model_store = model_store or self
        data_store = data_store or self
        backend = backend_cls(model_store=model_store,
                              data_store=data_store, **kwargs)
        return backend
    def get_backend(self, name, model_store=None, data_store=None, **kwargs):
        """
        return the backend by a given object name

        :param name: The object name
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        """
        meta = self.metadata(name)
        if meta is not None and meta.kind in self.defaults.OMEGA_STORE_BACKENDS:
            return self.get_backend_bykind(meta.kind,
                                           model_store=model_store,
                                           data_store=data_store,
                                           **kwargs)
        return None
    def help(self, name_or_obj=None, kind=None, raw=False, display=None, renderer=None):
        """ get help for an object by looking up its backend and calling help() on it

        Retrieves the object's metadata and looks up its corresponding backend. If the
        metadata.attributes['docs'] is a string it will display this as the help() contents.
        If the string starts with 'http://' or 'https://' it will open the web page.

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False
            display (fn): optional, callable for interactive display, defaults to help()
                if sys.flags.interactive is True, else uses pydoc.render_doc with plaintext
            renderer (fn): optional, the renderer= argument for pydoc.render_doc to use if
                sys.flags.interactive is False and display is not provided

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is not in interactive mode
        """
        import sys
        import pydoc
        interactive = bool(display) if display is not None else sys.flags.interactive
        display = display or help
        renderer = renderer or pydoc.plaintext
        obj = self._resolve_help_backend(name_or_obj=name_or_obj, kind=kind, raw=raw)
        if any(str(obj.__doc__).startswith(v) for v in ('https://', 'http://')):
            obj = obj.__doc__
            if interactive and display is help:
                import webbrowser
                display = webbrowser.open
        return display(obj) if interactive else pydoc.render_doc(obj, renderer=renderer)

    def _resolve_help_backend(self, name_or_obj=None, kind=None, raw=False):
        # helper so we can test help
        meta = self.metadata(name_or_obj) if name_or_obj else None
        if kind:
            backend = self.get_backend_bykind(kind)
        else:
            backend = self.get_backend(name_or_obj) or self.get_backend_byobj(name_or_obj)
        if backend is None:
            backend = self
        if not raw and meta is not None and 'docs' in meta.attributes:
            def UserDocumentation():
                pass

            basedoc = backend.__doc__ or ''
            UserDocumentation.__doc__ = (basedoc +
                                         meta.attributes['docs'])
            backend = UserDocumentation
        return backend
    def get_backend_byobj(self, obj, name=None, kind=None, attributes=None,
                          model_store=None, data_store=None, **kwargs):
        """
        return the matching backend for the given obj

        Returns:
            the first backend that supports the given parameters or None
        """
        model_store = model_store or self
        data_store = data_store or self
        meta = self.metadata(name) if name else None
        kind = kind or (meta.kind if meta is not None else None)
        backend = None
        if kind:
            objtype = str(type(obj))
            if kind in self.defaults.OMEGA_STORE_BACKENDS:
                backend = self.get_backend_bykind(kind, data_store=data_store, model_store=model_store)
                if not backend.supports(obj, name, attributes=attributes,
                                        data_store=data_store,
                                        model_store=model_store,
                                        kind=kind, **kwargs):
                    warnings.warn('Backend {kind} does not support {objtype}'.format(**locals()))
            else:
                pass
                # TODO refactor pandas and numpy handling into proper backend to avoid misleading warning
                # warnings.warn('Backend {kind} not found {objtype}. Reverting to default'.format(**locals()))
        else:
            for backend_kind, backend_cls in self.defaults.OMEGA_STORE_BACKENDS.items():
                backend = self.get_backend_bykind(backend_kind, data_store=data_store, model_store=model_store)
                if backend.supports(obj, name, attributes=attributes,
                                    data_store=data_store, model_store=model_store,
                                    **kwargs):
                    break
            else:
                backend = None
        return backend
    def getl(self, *args, **kwargs):
        """ return a lazy MDataFrame for a given object

        Same as .get, but returns an MDataFrame
        """
        return self.get(*args, lazy=True, **kwargs)
    def get(self, name, version=-1, force_python=False,
            kind=None, model_store=None, data_store=None, **kwargs):
        """
        Retrieve an object

        :param name: The name of the object
        :param version: Version of the stored object (not supported)
        :param force_python: Return as a python object
        :param kwargs: kwargs depending on object kind
        :return: an object, estimator, pipeline, data array or pandas dataframe
            previously stored with put()
        """
        meta = self.metadata(name, version=version)
        if meta is None:
            return None
        if not force_python:
            backend = (self.get_backend(name, model_store=model_store,
                                        data_store=data_store)
                       if not kind else self.get_backend_bykind(kind,
                                                                model_store=model_store,
                                                                data_store=data_store))
            if backend is not None:
                # FIXME: some backends need to get model_store, data_store, but fails tests
                return backend.get(name, **kwargs)  # model_store=model_store, data_store=data_store, **kwargs)
            if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
                backend = self.get_backend(name)
                return backend.get_model(name)
            elif meta.kind == MDREGISTRY.SPARK_MLLIB:
                backend = self.get_backend(name)
                return backend.get_model(name, version)
            elif meta.kind == MDREGISTRY.PANDAS_DFROWS:
                return self.get_dataframe_documents(name, version=version,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_SEROWS:
                return self.get_dataframe_documents(name, version=version,
                                                    is_series=True,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_DFGROUP:
                return self.get_dataframe_dfgroup(
                    name, version=version, **kwargs)
            elif meta.kind == MDREGISTRY.PYTHON_DATA:
                return self.get_python_data(name, version=version, **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_HDF:
                return self.get_dataframe_hdf(name, version=version)
        return self.get_object_as_python(meta, version=version)
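    # Retrieval sketch (names are placeholders):
    #
    #   df = store.get('sales')                      # materialized DataFrame
    #   mdf = store.getl('sales')                    # lazy MDataFrame
    #   raw = store.get('sales', force_python=True)  # raw python objects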
    def get_dataframe_documents(self, name, columns=None, lazy=False,
                                filter=None, version=-1, is_series=False,
                                chunksize=None, sanitize=True, trusted=None,
                                **kwargs):
        """
        Internal method to return DataFrame from documents

        :param name: the name of the object (str)
        :param columns: the column projection as a list of column names
        :param lazy: if True returns a lazy representation as an MDataFrame.
            If False retrieves all data and returns a DataFrame (default)
        :param filter: the filter to be applied as a column__op=value dict
        :param sanitize: sanitize filter by removing all $op filter keys,
            defaults to True. Specify False to allow $op filter keys. $where
            is always removed as it is considered unsafe.
        :param version: the version to retrieve (not supported)
        :param is_series: if True returns a Series instead of a DataFrame
        :param kwargs: remaining kwargs are used as a filter. The filter kwarg
            overrides other kwargs.
        :return: the retrieved object (DataFrame, Series or MDataFrame)
        """
        from omegaml.store.queryops import sanitize_filter
        from omegaml.store.filtered import FilteredCollection

        collection = self.collection(name)
        meta = self.metadata(name)
        filter = filter or kwargs
        filter = sanitize_filter(filter, no_ops=sanitize, trusted=trusted)
        if lazy or chunksize:
            from ..mdataframe import MDataFrame
            df = MDataFrame(collection,
                            metadata=meta.kind_meta,
                            columns=columns).query(**filter)
            if is_series:
                df = df[0]
            if chunksize is not None and chunksize > 0:
                return df.iterchunks(chunksize=chunksize)
        else:
            # TODO ensure the same processing is applied in MDataFrame
            # TODO this method should always use a MDataFrame disregarding lazy
            if filter:
                from .query import Filter
                query = Filter(collection, **filter).query
                cursor = FilteredCollection(collection).find(filter=query, projection=columns)
            else:
                cursor = FilteredCollection(collection).find(projection=columns)
            # restore dataframe
            df = cursor_to_dataframe(cursor)
            if '_id' in df.columns:
                del df['_id']
            if hasattr(meta, 'kind_meta'):
                df = convert_dtypes(df, meta.kind_meta.get('dtypes', {}))
            # -- restore columns
            meta_columns = dict(meta.kind_meta.get('columns'))
            if meta_columns:
                # apply projection, if any
                if columns:
                    # get only projected columns
                    # meta_columns is {origin_column: stored_column}
                    orig_columns = dict({k: v for k, v in meta_columns.items()
                                         if k in columns or v in columns})
                else:
                    # restore columns to original name
                    orig_columns = meta_columns
                df.rename(columns=orig_columns, inplace=True)
            # -- restore indexes
            idx_meta = meta.kind_meta.get('idx_meta')
            if idx_meta:
                df = restore_index(df, idx_meta)
            # -- restore row order
            if is_series:
                index = df.index
                name = df.columns[0]
                df = df[name]
                df.index = index
                df.name = None if name == 'None' else name
        return df
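    # Filter sketch using the column__op=value syntax mentioned above
    # (column names are hypothetical):
    #
    #   store.get_dataframe_documents('sales', columns=['date', 'amount'],
    #                                 filter=dict(amount__gte=100))
    #   # equivalent via kwargs:
    #   store.get_dataframe_documents('sales', amount__gte=100)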
    def rebuild_params(self, kwargs, collection):
        """
        Returns a modified set of parameters for querying mongodb
        based on how the mongo document is structured and the
        fields the document is grouped by.

        **Note: Explicitly to be used with get_grouped_data only**

        :param kwargs: Mongo filter arguments
        :param collection: The name of mongodb collection
        :return: Returns a set of parameters as dictionary.
        """
        modified_params = {}
        db_structure = collection.find_one({}, {'_id': False})
        groupby_columns = list(set(db_structure.keys()) - set(['_data']))
        if kwargs is not None:
            for item in kwargs:
                if item not in groupby_columns:
                    modified_query_param = '_data.' + item
                    modified_params[modified_query_param] = kwargs.get(item)
                else:
                    modified_params[item] = kwargs.get(item)
        return modified_params
    def get_dataframe_dfgroup(self, name, version=-1, sanitize=True, kwargs=None):
        """
        Return a grouped dataframe

        :param name: the name of the object
        :param version: not supported
        :param kwargs: mongo db query arguments to be passed to
            collection.find() as a filter.
        :param sanitize: remove any $op operators in kwargs
        """
        import pandas as pd
        from omegaml.store.queryops import sanitize_filter
        from omegaml.store.filtered import FilteredCollection

        def convert_doc_to_row(cursor):
            for doc in cursor:
                data = doc.pop('_data', [])
                for row in data:
                    doc.update(row)
                    yield doc

        datastore = FilteredCollection(self.collection(name))
        kwargs = kwargs if kwargs else {}
        params = self.rebuild_params(kwargs, datastore)
        params = sanitize_filter(params, no_ops=sanitize)
        cursor = datastore.find(params, projection={'_id': False})
        df = pd.DataFrame(convert_doc_to_row(cursor))
        return df
    def get_dataframe_hdf(self, name, version=-1):
        """
        Retrieve dataframe from hdf

        :param name: The name of object
        :param version: The version of object (not supported)
        :return: Returns a python pandas dataframe
        :raises: gridfs.errors.NoFile
        """
        df = None
        meta = self.metadata(name)
        filename = getattr(meta.gridfile, 'name', self.object_store_key(name, '.hdf'))
        if self.fs.exists(filename=filename):
            df = self._extract_dataframe_hdf(filename, version=version)
            return df
        else:
            raise gridfs.errors.NoFile(
                "{0} does not exist in mongo collection '{1}'".format(
                    name, self.bucket))
    def get_python_data(self, name, filter=None, version=-1, lazy=False, trusted=False, **kwargs):
        """
        Retrieve objects as python data

        :param name: The name of object
        :param version: The version of object

        :return: Returns the object as python list object
        """
        datastore = self.collection(name)
        filter = filter or kwargs
        if trusted is False or trusted != signature(filter):
            filter = sanitize_filter(filter)
        cursor = datastore.find(filter, **kwargs)
        if lazy:
            return cursor
        data = (d.get('data') for d in cursor)
        return list(data)
    def get_object_as_python(self, meta, version=-1):
        """
        Retrieve object as python object

        :param meta: The metadata object
        :param version: The version of the object

        :return: Returns data as python object
        """
        if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_HDF:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_DFROWS:
            return list(getattr(self.mongodb, meta.collection).find())
        if meta.kind == MDREGISTRY.PYTHON_DATA:
            col = getattr(self.mongodb, meta.collection)
            return col.find_one(dict(_id=meta.objid)).get('data')
        raise TypeError('cannot return kind %s as a python object' % meta.kind)

    def __iter__(self):
        for f in self.list(include_temp=True):
            yield f

    @property
    def buckets(self):
        return ['default' if b == self.defaults.OMEGA_MONGO_COLLECTION else b
                for b in self._Metadata.objects.distinct('bucket')]
    def list(self, pattern=None, regexp=None, kind=None, raw=False, hidden=None,
             include_temp=False, bucket=None, prefix=None, filter=None):
        """
        List all files in store

        specify pattern as a unix pattern (e.g. :code:`models/*`),
        or specify regexp

        :param pattern: the unix file pattern or None for all
        :param regexp: the regexp. takes precedence over pattern
        :param raw: if True return the meta data objects
        :param filter: specify additional filter criteria, optional
        :return: List of files in store
        """
        regex = lambda pattern: bson.regex.Regex(f'{pattern}')
        db = self.mongodb
        searchkeys = dict(bucket=bucket or self.bucket,
                          prefix=prefix or self.prefix)
        q_excludes = Q()
        if regexp:
            searchkeys['name'] = regex(regexp)
        elif pattern:
            re_pattern = pattern.replace('*', '.*').replace('/', r'\/')
            searchkeys['name'] = regex(f'^{re_pattern}$')
        if not include_temp:
            q_excludes &= Q(name__not__startswith='_')
            q_excludes &= Q(name__not=regex(r'(.{1,*}\/?_.*)'))
        if not hidden:
            q_excludes &= Q(name__not__startswith='.')
        if kind or self.force_kind:
            kind = kind or self.force_kind
            if isinstance(kind, (tuple, list)):
                searchkeys.update(kind__in=kind)
            else:
                searchkeys.update(kind=kind)
        if filter:
            searchkeys.update(filter)
        q_search = Q(**searchkeys) & q_excludes
        files = self._Metadata.objects.no_cache()(q_search)
        return [f if raw else str(f.name).replace('.omm', '') for f in files]
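    # Listing sketch (names are placeholders):
    #
    #   store.list('data/*')               # unix-style pattern
    #   store.list(regexp=r'data\/.*')     # regexp takes precedence
    #   store.list('models/*', raw=True)   # Metadata objects instead of names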
    def exists(self, name, hidden=False):
        """ check if object exists

        Args:
            name (str): name of object
            hidden (bool): if True, include hidden files, defaults to
                False, unless name starts with '.'

        Returns:
            bool, True if object exists

        .. versionchanged:: 0.16.4
            hidden defaults to True if name starts with '.'
        """
        hidden = True if name.startswith('.') else hidden
        return name in self.list(name, hidden=hidden)
    def object_store_key(self, name, ext, hashed=None):
        """
        Returns the store key

        Unless you write a mixin or a backend you should not use this method

        :param name: The name of object
        :param ext: The extension of the filename
        :param hashed: hash the key to support arbitrary name length, defaults
            to defaults.OMEGA_STORE_HASHEDNAMES, True by default since 0.13.7

        :return: A filename with relative bucket, prefix and name
        """
        # byte string
        _u8 = lambda t: t.encode('UTF-8', 'replace') if isinstance(t, str) else t
        key = self._get_obj_store_key(name, ext)
        hashed = hashed if hashed is not None else self.defaults.OMEGA_STORE_HASHEDNAMES
        if hashed:
            from hashlib import sha1
            # SEC: CWE-916
            # - status: wontfix
            # - reason: hashcode is used purely for name resolution, not a security function
            hasher = sha1()
            hasher.update(_u8(key))
            key = hasher.hexdigest()
        return key

    def _get_obj_store_key(self, name, ext, prefix=None, bucket=None):
        # backwards compatibility implementation of object_store_key()
        name = '%s.%s' % (name, ext) if not name.endswith(ext) else name
        filename = '{bucket}.{prefix}.{name}'.format(
            bucket=bucket or self.bucket,
            prefix=prefix or self.prefix,
            name=name,
            ext=ext).replace('/', '_').replace('..', '.')
        return filename

    def _package_dataframe2hdf(self, df, filename, key=None):
        """
        Package a dataframe as an hdf file

        :param df: The dataframe
        :param filename: Name of file

        :return: Filename of hdf file
        """
        lpath = tempfile.mkdtemp()
        fname = os.path.basename(filename)
        hdffname = os.path.join(self.tmppath, fname + '.hdf')
        key = key or 'data'
        df.to_hdf(hdffname, key)
        return hdffname

    def _extract_dataframe_hdf(self, filename, version=-1):
        """
        Extracts a dataframe from a stored hdf file

        :param filename: The name of file
        :param version: The version of file

        :return: Pandas dataframe
        """
        import pandas as pd
        hdffname = os.path.join(self.tmppath, filename)
        dirname = os.path.dirname(hdffname)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        outf = self.fs.get_version(filename, version=version)
        with open(hdffname, 'wb') as hdff:
            hdff.write(outf.read())
        hdf = pd.HDFStore(hdffname)
        key = list(hdf.keys())[0]
        df = hdf[key]
        hdf.close()
        return df

    def _ensure_fs_collection(self):
        # ensure backwards-compatible gridfs access
        if self.defaults.OMEGA_BUCKET_FS_LEGACY:
            # prior to 0.13.2 a single gridfs instance was used, always equal to the default collection
            return self.defaults.OMEGA_MONGO_COLLECTION
        if self.bucket == self.defaults.OMEGA_MONGO_COLLECTION:
            # from 0.13.2 onwards, only the default bucket is equal to the default collection
            # backwards compatibility for existing installations
            return self.bucket
        # since 0.13.2, all buckets other than the default use a qualified collection name to
        # effectively separate files in different buckets, enabling finer-grained access control
        # and avoiding name collisions from different buckets
        return '{}_{}'.format(self.defaults.OMEGA_MONGO_COLLECTION, self.bucket)

    def _ensure_fs_index(self, fs):
        # make sure we have proper chunks and file indices. this should be created on first write, but sometimes is not
        # see https://docs.mongodb.com/manual/core/gridfs/#gridfs-indexes
        # pymongo since 4.1 no longer has fs._GridFS
        chunks_collection = fs._GridFS__chunks if hasattr(fs, '_GridFS__chunks') else fs._chunks
        files_collection = fs._GridFS__files if hasattr(fs, '_GridFS__files') else fs._files
        ensure_index(chunks_collection, {'files_id': 1, 'n': 1}, unique=True)
        ensure_index(files_collection, {'filename': 1, 'uploadDate': 1})

    def sign(self, filter):
        return signature(filter)

    @staticmethod
    def _cleanup(objrepr, tmppath=None):
        # cleanup any temporary files on exit
        # -- this is called as a finalizer (weakref.finalize)
        try:
            logger.debug(f'finalizing {objrepr}: cleaning up temporary files in {tmppath}')
            shutil.rmtree(tmppath, ignore_errors=True)
        except Exception as e:
            logger.error(f'finalizing {objrepr}: error occurred as {e}')