Source code for omegaml.store.base

   1"""
   2Native storage for OmegaML using mongodb as the storage layer
   3
   4An OmegaStore instance is a MongoDB database. It has at least the
   5metadata collection which lists all objects stored in it. A metadata
   6document refers to the following types of objects (metadata.kind):
   7
   8* pandas.dfrows - a Pandas DataFrame stored as a collection of rows
   9* sklearn.joblib - a scikit learn estimator/pipline dumped using joblib.dump()
  10* python.data - an arbitrary python dict, tuple, list stored as a document
  11
  12Note that storing Pandas and scikit learn objects requires the availability
  13of the respective packages. If either can not be imported, the OmegaStore
  14degrades to a python.data store only. It will still .list() and get() any
  15object, however reverts to pure python objects. In this case it is up
  16to the client to convert the data into an appropriate format for processing.
  17
  18Pandas and scikit-learn objects can only be stored if these packages are
  19availables. put() raises a TypeError if you pass such objects and these
  20modules cannot be loaded.
  21
  22All data are stored within the same mongodb, in per-object collections
  23as follows:
  24
  25    * .metadata
  26        all metadata. each object is one document,
  27        See **omegaml.documents.Metadata** for details
  28    * .<bucket>.files
  29        this is the GridFS instance used to store
  30        blobs (models, numpy, hdf). The actual file name
  31        will be <prefix>/<name>.<ext>, where ext is
  32        optionally generated by put() / get().
  33    * .<bucket>.<prefix>.<name>.data
  34        every other dataset is stored in a separate
  35        collection (dataframes, dicts, lists, tuples).
  36        Any forward slash in prefix is ignored (e.g. 'data/'
  37        becomes 'data')
  38
  39    DataFrames by default are stored in their own collection, every
  40    row becomes a document. To store dataframes as a binary file,
  41    use `put(...., as_hdf=True).` `.get()` will always return a dataframe.
  42
  43    Python dicts, lists, tuples are stored as a single document with
  44    a `.data` attribute holding the JSON-converted representation. `.get()`
  45    will always return the corresponding python object of .data.
  46
  47    Models are joblib.dump()'ed and ziped prior to transferring into
  48    GridFs. .get() will always unzip and joblib.load() before returning
  49    the model. Note this requires that the process using .get() supports
  50    joblib as well as all python classes referred to. If joblib is not
  51    supported, .get() returns a file-like object.
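
    For illustration, a minimal usage sketch (object names are
    hypothetical; put() dispatches on the object's type as described
    above, get() on metadata.kind):

        store = OmegaStore(bucket='omegaml', prefix='data/')
        store.put(df, 'mydf')          # pandas.dfrows, one document per row
        store.put({'a': 1}, 'mydict')  # python.data, a single document
        store.put(clf, 'mymodel')      # sklearn.joblib, stored in GridFS
        df = store.get('mydf')         # returns a DataFrame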

    The .metadata entry specifies the format used to store each
    object as well as its location:

    * metadata.kind
        the type of object
    * metadata.name
        the name of the object, as given on put()
    * metadata.gridfile
        the gridfs object (if any, null otherwise)
    * metadata.collection
        the name of the collection
    * metadata.attributes
        arbitrary custom attributes set in
        put(attributes=obj). This is used e.g. by
        OmegaRuntime's fit() method to record the
        data used in the model's training.

    **.put()** and **.get()** use helper methods specific to the
    object's type and metadata.kind, respectively. In the future
    a plugin system will enable extension to other types.
"""
from __future__ import absolute_import

import bson
import gridfs
import logging
import os
import shutil
import tempfile
import warnings
import weakref
from datetime import datetime
from mongoengine.connection import disconnect, \
    connect, _connections, get_db
from mongoengine.errors import DoesNotExist
from mongoengine.fields import GridFSProxy
from mongoengine.queryset.visitor import Q
from uuid import uuid4

from omegaml.store.fastinsert import fast_insert, default_chunksize
from omegaml.util import unravel_index, restore_index, make_tuple, jsonescape, \
    cursor_to_dataframe, convert_dtypes, load_class, extend_instance, ensure_index, PickableCollection, \
    mongo_compatible, signature
from .queryops import sanitize_filter
from ..documents import make_Metadata, MDREGISTRY
from ..mongoshim import sanitize_mongo_kwargs, waitForConnection
from ..util import (is_estimator, is_dataframe, is_ndarray, is_spark_mllib,
                    settings as omega_settings, urlparse, is_series)

logger = logging.getLogger(__name__)

class OmegaStore(object):
    """
    The storage backend for models and data
    """

    def __init__(self, mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None):
        """
        :param mongo_url: the mongo url to use for the gridfs
        :param bucket: the mongo collection to use for gridfs
        :param prefix: the path prefix for files. defaults to blank
        :param kind: the kind or list of kinds to limit this store to
        """
        self.defaults = defaults or omega_settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket or self.defaults.OMEGA_MONGO_COLLECTION
        self._fs_collection = self._ensure_fs_collection()
        self._fs = None
        self._tmppath = os.path.join(self.defaults.OMEGA_TMP, uuid4().hex)
        self.prefix = prefix or ''
        self.force_kind = kind
        self._Metadata_cls = None
        # don't initialize db here to avoid using the default settings
        # otherwise Metadata will already have a connection and not use
        # the one provided in override_settings
        self._db = None
        self._dbalias = dbalias
        # add backends and mixins
        self._apply_mixins()
        # register backends
        self.register_backends()
        # register finalizer for cleanup
        weakref.finalize(self, self._cleanup, repr(self), tmppath=str(self._tmppath))

    def __repr__(self):
        return 'OmegaStore(bucket={}, prefix={})'.format(self.bucket, self.prefix)

    def __equal__(self, other):
        """test for equality of OmegaStore instances

        Args:
            other: OmegaStore instance

        Returns:
            True if other is the same database, same bucket, same prefix
        """
        return self.mongo_url == other.mongo_url and self.bucket == other.bucket and self.prefix == other.prefix

    @property
    def tmppath(self):
        """
        return an instance-specific temporary path
        """
        os.makedirs(self._tmppath, exist_ok=True)
        return self._tmppath

    @property
    def mongodb(self):
        """
        Returns a mongo database object
        """
        if self._db is not None:
            return self._db
        # parse salient parts of mongourl, e.g.
        # mongodb://user:password@host/dbname
        self.parsed_url = urlparse.urlparse(self.mongo_url)
        self.database_name = self.parsed_url.path[1:]
        host = self.parsed_url.netloc
        scheme = self.parsed_url.scheme
        username, password = None, None
        if '@' in host:
            creds, host = host.split('@', 1)
            if ':' in creds:
                username, password = creds.split(':')
        # connect via mongoengine
        #
        # note this uses a MongoClient in the background, with pooled
        # connections. there are multiprocessing issues with pymongo:
        # http://api.mongodb.org/python/3.2/faq.html#using-pymongo-with-multiprocessing
        # connect=False is due to https://jira.mongodb.org/browse/PYTHON-961
        # this defers connecting until the first access
        # serverSelectionTimeoutMS=2500 is to fail fast, the default is 30000
        #
        # use an instance specific alias, note that access to Metadata and
        # QueryCache must pass the very same alias
        self._dbalias = alias = self._dbalias or 'omega-{}'.format(uuid4().hex)
        # always disconnect before registering a new connection because
        # mongoengine.connect() forgets all connection settings upon disconnect
        if alias not in _connections:
            disconnect(alias)
        connection = connect(alias=alias, db=self.database_name,
                             host=f'{scheme}://{host}',
                             username=username,
                             password=password,
                             connect=False,
                             authentication_source='admin',
                             **sanitize_mongo_kwargs(self.defaults.OMEGA_MONGO_SSL_KWARGS),
                             )
        # since PyMongo 4, connect() no longer waits for connection
        waitForConnection(connection)
        self._db = get_db(alias)
        return self._db

    @property
    def _Metadata(self):
        if self._Metadata_cls is None:
            # hack to localize metadata
            db = self.mongodb
            self._Metadata_cls = make_Metadata(db_alias=self._dbalias,
                                               collection=self._fs_collection)
        return self._Metadata_cls

    @property
    def fs(self):
        """
        Retrieve a gridfs instance using url and collection provided

        :return: a gridfs instance
        """
        if self._fs is not None:
            return self._fs
        self._fs = gridfs.GridFS(self.mongodb, collection=self._fs_collection)
        self._ensure_fs_index(self._fs)
        return self._fs

    def metadata(self, name=None, bucket=None, prefix=None, version=-1, **kwargs):
        """
        Returns a metadata document for the given entry name
        """
        # FIXME: version attribute does not do anything
        # FIXME: metadata should be stored in a bucket-specific collection
        # to enable access control, see
        # https://docs.mongodb.com/manual/reference/method/db.createRole/#db.createRole
        db = self.mongodb
        fs = self.fs
        prefix = prefix or self.prefix
        bucket = bucket or self.bucket
        # Meta is to silence lint on import error
        Meta = self._Metadata
        return Meta.objects(name=str(name), prefix=prefix, bucket=bucket).no_cache().first()

    def make_metadata(self, name, kind, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some : value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix

        """
        # TODO kept _make_metadata for backwards compatibility.
        return self._make_metadata(name, bucket=bucket, prefix=prefix,
                                   kind=kind, **kwargs)
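
    # Attributes semantics sketch (illustrative, hypothetical names), per the
    # docstring above:
    #   store.make_metadata('mydf', 'pandas.dfrows').save()
    #   store.make_metadata('mydf', 'pandas.dfrows', attributes=None)      # left as is
    #   store.make_metadata('mydf', 'pandas.dfrows', attributes={})        # reset to {}
    #   store.make_metadata('mydf', 'pandas.dfrows', attributes={'a': 1})  # updated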

    def _make_metadata(self, name=None, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some : value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        """
        bucket = bucket or self.bucket
        prefix = prefix or self.prefix
        meta = self.metadata(name=name,
                             prefix=prefix,
                             bucket=bucket)
        if meta:
            dict_fields = 'attributes', 'kind_meta'
            for k, v in kwargs.items():
                if k in dict_fields and v is not None and len(v) > 0:
                    previous = getattr(meta, k, {})
                    previous.update(v)
                    setattr(meta, k, previous)
                elif k in dict_fields and v is not None and len(v) == 0:
                    setattr(meta, k, {})
                elif k in dict_fields and v is None:
                    # ignore non specified attributes
                    continue
                else:
                    # by default set whatever attribute is provided
                    setattr(meta, k, v)
        else:
            meta = self._Metadata(name=name, bucket=bucket, prefix=prefix,
                                  **kwargs)
        return meta

    def _drop_metadata(self, name=None, **kwargs):
        # internal method to delete meta data of an object
        meta = self.metadata(name, **kwargs)
        if meta is not None:
            meta.delete()

    def collection(self, name=None, bucket=None, prefix=None):
        """
        Returns a mongo db collection as a datastore

        If there is an existing object of name, will return the .collection
        of the object. Otherwise returns the collection according to naming
        convention {bucket}.{prefix}.{name}.datastore

        :param name: the collection to use. if none defaults to the
            collection name given on instantiation. the actual collection name
            used is always prefix + name + '.data'
        """
        # see if we have a known object and a collection for that, if not define it
        meta = self.metadata(name, bucket=bucket, prefix=prefix)
        collection = meta.collection if meta else None
        if not collection:
            collection = self.object_store_key(name, '.datastore')
            collection = collection.replace('..', '.')
        # return the collection
        try:
            datastore = getattr(self.mongodb, collection)
        except Exception as e:
            raise e
        return PickableCollection(datastore)
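
    # Resolution sketch (illustrative): an existing object's meta.collection
    # takes precedence; otherwise the name is derived via object_store_key(),
    # e.g. for bucket 'omegaml', prefix 'data/' and name 'mydf' (unhashed
    # form; hashed names are the default since 0.13.7):
    #   store.collection('mydf')  # ~ 'omegaml.data_.mydf.datastore'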

    def _apply_mixins(self):
        """
        apply mixins in defaults.OMEGA_STORE_MIXINS
        """
        for mixin in self.defaults.OMEGA_STORE_MIXINS:
            conditional = self._mixins_conditional
            extend_instance(self, mixin,
                            conditional=conditional)

    def _mixins_conditional(self, cls, obj):
        return cls.supports(obj) if hasattr(cls, 'supports') else True

    def register_backends(self):
        """
        register backends in defaults.OMEGA_STORE_BACKENDS
        """
        for kind, backend in self.defaults.OMEGA_STORE_BACKENDS.items():
            self.register_backend(kind, backend)

    def register_backend(self, kind, backend):
        """
        register a backend class

        :param kind: (str) the backend kind
        :param backend: (class) the backend class
        """
        self.defaults.OMEGA_STORE_BACKENDS[kind] = load_class(backend)
        if kind not in MDREGISTRY.KINDS:
            MDREGISTRY.KINDS.append(kind)
        return self

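    # Registration sketch (illustrative; 'mypackage.MyBackend' is a
    # hypothetical class implementing the backend interface, i.e.
    # supports()/put()/get()/drop()):
    #   store.register_backend('custom.mykind', 'mypackage.MyBackend')
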
    def register_mixin(self, mixincls):
        """
        register a mixin class

        :param mixincls: (class) the mixin class
        """
        extend_instance(self, mixincls)
        return self

    def put(self, obj, name, attributes=None, kind=None, replace=False, model_store=None,
            data_store=None, **kwargs):
        """
        Store an object: estimators, pipelines, numpy arrays or
        pandas dataframes
        """
        if replace:
            self.drop(name, force=True)
        backend = self.get_backend_byobj(obj, name, attributes=attributes, kind=kind,
                                         model_store=model_store, data_store=data_store,
                                         **kwargs)
        if backend:
            return backend.put(obj, name, attributes=attributes, **kwargs)
        # TODO move all of the specifics to backend implementations
        if is_estimator(obj):
            backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
            return backend.put(obj, name, attributes=attributes, **kwargs)
        elif is_spark_mllib(obj):
            backend = self.get_backend_bykind(MDREGISTRY.SKLEARN_JOBLIB)
            return backend.put(obj, name, attributes=attributes, **kwargs)
        elif is_dataframe(obj) or is_series(obj):
            groupby = kwargs.get('groupby')
            if obj.empty:
                from warnings import warn
                warn('Provided dataframe is empty, ignoring it, doing nothing here!')
                return None
            if kwargs.pop('as_hdf', False):
                return self.put_dataframe_as_hdf(
                    obj, name, attributes, **kwargs)
            elif groupby:
                return self.put_dataframe_as_dfgroup(
                    obj, name, groupby, attributes)
            append = kwargs.pop('append', None)
            timestamp = kwargs.pop('timestamp', None)
            index = kwargs.pop('index', None)
            chunksize = kwargs.pop('chunksize', default_chunksize)
            return self.put_dataframe_as_documents(
                obj, name, append=append, attributes=attributes, index=index,
                timestamp=timestamp, chunksize=chunksize, **kwargs)
        elif is_ndarray(obj):
            if kwargs.pop('as_pydata', False):
                return self.put_pyobj_as_document(obj.tolist(), name,
                                                  attributes=attributes, **kwargs)
            return self.put_ndarray_as_hdf(obj, name, attributes=attributes,
                                           **kwargs)
        elif isinstance(obj, (dict, list, tuple)):
            kwargs.pop('as_pydata', None)
            if kwargs.pop('as_hdf', False):
                return self.put_pyobj_as_hdf(obj, name,
                                             attributes=attributes, **kwargs)
            return self.put_pyobj_as_document(obj, name,
                                              attributes=attributes,
                                              **kwargs)
        else:
            raise TypeError('type %s not supported' % type(obj))

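    # Dispatch sketch (illustrative, hypothetical names): registered backends
    # are asked first via get_backend_byobj(), then the built-in handlers:
    #   store.put(df, 'mydf')                 # -> put_dataframe_as_documents
    #   store.put(df, 'mydf', as_hdf=True)    # -> put_dataframe_as_hdf
    #   store.put(df, 'mydf', groupby=['x'])  # -> put_dataframe_as_dfgroup
    #   store.put(myarr, 'myarr')             # -> put_ndarray_as_hdf (ndarray)
    #   store.put({'a': 1}, 'mydict')         # -> put_pyobj_as_document
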
    def put_dataframe_as_documents(self, obj, name, append=None,
                                   attributes=None, index=None,
                                   timestamp=None, chunksize=None,
                                   ensure_compat=True, _fast_insert=fast_insert,
                                   **kwargs):
        """
        store a dataframe as a row-wise collection of documents

        :param obj: the dataframe to store
        :param name: the name of the item in the store
        :param append: if False collection will be dropped before inserting,
            if True existing documents will persist. Defaults to True. If not
            specified and rows have been previously inserted, will issue a
            warning.
        :param index: list of columns, using +, -, @ as a column prefix to
            specify ASCENDING, DESCENDING, GEOSPHERE respectively. For @ the
            column has to represent a valid GeoJSON object.
        :param timestamp: if True or a field name adds a timestamp. If the
            value is a boolean or datetime, uses _created as the field name.
            The timestamp is always datetime.datetime.utcnow(). May be overridden
            by specifying the tuple (col, datetime).
        :param ensure_compat: if True attempt to convert obj to mongodb
            compatibility, set to False only if you are sure to have only
            compatible values in the dataframe. defaults to True. False may
            reduce memory and increase speed on large dataframes.
        :return: the Metadata object created
        """
        import pandas as pd
        from .queryops import MongoQueryOps
        collection = self.collection(name)
        if is_series(obj):
            obj = pd.DataFrame(obj, index=obj.index, columns=[str(obj.name)])
            store_series = True
        else:
            store_series = False
        if append is False:
            self.drop(name, force=True)
        elif append is None and collection.count_documents({}, limit=1):
            from warnings import warn
            warn('%s already exists, will append rows' % name)
        if index:
            # get index keys
            if isinstance(index, dict):
                idx_kwargs = index
                index = index.pop('columns')
            else:
                idx_kwargs = {}
            # create index with appropriate options
            keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
            ensure_index(collection, keys, **idx_kwargs)
        if timestamp:
            dt = datetime.utcnow()
            if isinstance(timestamp, bool):
                col = '_created'
            elif isinstance(timestamp, str):
                col = timestamp
            elif isinstance(timestamp, datetime):
                col, dt = '_created', timestamp
            elif isinstance(timestamp, tuple):
                col, dt = timestamp
            else:
                col = '_created'
            obj[col] = dt
        # store dataframe indices
        # FIXME this may be a performance issue, use size stored on stats or metadata
        row_count = self.collection(name).estimated_document_count()
        # fixes #466, ensure column names are strings in a multiindex
        if isinstance(obj.columns, pd.MultiIndex):
            obj.columns = obj.columns.map('_'.join)
        obj, idx_meta = unravel_index(obj, row_count=row_count)
        stored_columns = [jsonescape(col) for col in obj.columns]
        column_map = list(zip(obj.columns, stored_columns))
        d_column_map = dict(column_map)
        dtypes = {
            d_column_map.get(k): v.name
            for k, v in obj.dtypes.items()
        }
        kind_meta = {
            'columns': column_map,
            'dtypes': dtypes,
            'idx_meta': idx_meta
        }
        # ensure column names to be strings
        obj.columns = stored_columns
        # create mongo indices for data frame index columns
        df_idxcols = [col for col in obj.columns if col.startswith('_idx#')]
        if df_idxcols:
            keys, idx_kwargs = MongoQueryOps().make_index(df_idxcols)
            ensure_index(collection, keys, **idx_kwargs)
        # create index on row id
        keys, idx_kwargs = MongoQueryOps().make_index(['_om#rowid'])
        ensure_index(collection, keys, **idx_kwargs)
        # bulk insert
        # -- get native objects
        # -- seems to be required since pymongo 3.3.x. if not converted
        #    pymongo raises Cannot Encode object for int64 types
        if ensure_compat:
            for col, col_dtype in dtypes.items():
                if 'datetime' in col_dtype:
                    obj[col].fillna('', inplace=True)
            obj = obj.astype('O', errors='ignore')
        _fast_insert(obj, self, name, chunksize=chunksize)
        kind = (MDREGISTRY.PANDAS_SEROWS
                if store_series
                else MDREGISTRY.PANDAS_DFROWS)
        meta = self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=kind,
                                   kind_meta=kind_meta,
                                   attributes=attributes,
                                   collection=collection.name)
        return meta.save()

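    # Index/timestamp sketch (illustrative): '+', '-', '@' prefixes request
    # ASCENDING, DESCENDING and GEOSPHERE indexes, respectively:
    #   store.put_dataframe_as_documents(df, 'mydf', append=False,
    #                                    index=['+price', '-qty'],
    #                                    timestamp='_created')
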
    def put_dataframe_as_dfgroup(self, obj, name, groupby, attributes=None):
        """
        store a dataframe grouped by columns in a mongo document

        :Example:

          > # each group
          > {
          >     # group keys
          >     key: val,
          >     '_data': [
          >         # only data keys
          >         { key: val, ... }
          >     ]}

        """

        def row_to_doc(obj):
            for gval, gdf in obj.groupby(groupby):
                if hasattr(gval, 'astype'):
                    gval = make_tuple(gval.astype('O'))
                else:
                    gval = make_tuple(gval)
                doc = dict(zip(groupby, gval))
                datacols = list(set(gdf.columns) - set(groupby))
                doc['_data'] = gdf[datacols].astype('O').to_dict('records')
                yield doc

        datastore = self.collection(name)
        datastore.drop()
        datastore.insert_many(row_to_doc(obj))
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_DFGROUP,
                                   attributes=attributes,
                                   collection=datastore.name).save()

    def put_dataframe_as_hdf(self, obj, name, attributes=None, **kwargs):
        filename = self.object_store_key(name, '.hdf')
        hdffname = self._package_dataframe2hdf(obj, filename)
        with open(hdffname, 'rb') as fhdf:
            fileid = self.fs.put(fhdf, filename=filename)
        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PANDAS_HDF,
                                   attributes=attributes,
                                   gridfile=GridFSProxy(db_alias=self._dbalias,
                                                        grid_id=fileid)).save()

    def put_ndarray_as_hdf(self, obj, name, attributes=None, **kwargs):
        """ store a numpy array as hdf

        this is a hack, converting the array to a dataframe and then
        storing it
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)

    def put_pyobj_as_hdf(self, obj, name, attributes=None, **kwargs):
        """
        store list, tuple, dict as hdf

        this requires the list, tuple or dict to be convertible into
        a dataframe
        """
        import pandas as pd
        df = pd.DataFrame(obj)
        return self.put_dataframe_as_hdf(df, name, attributes=attributes)

    def put_pyobj_as_document(self, obj, name, attributes=None, append=True, index=None, as_many=None, **kwargs):
        """
        store a dict as a document

        similar to put_dataframe_as_documents no data will be replaced by
        default. that is, obj is appended as new documents into the objects'
        mongo collection. to replace the data, specify append=False.
        """
        collection = self.collection(name)
        if append is False:
            collection.drop()
        elif append is None and collection.count_documents({}, limit=1):
            from warnings import warn
            warn('%s already exists, will append rows' % name)
        if index:
            # create index with appropriate options
            from omegaml.store import MongoQueryOps
            if isinstance(index, dict):
                idx_kwargs = index
                index = index.pop('columns')
            else:
                idx_kwargs = {}
            index = [f'data.{c}' for c in index]
            keys, idx_kwargs = MongoQueryOps().make_index(index, **idx_kwargs)
            ensure_index(collection, keys, **idx_kwargs)
        if as_many is None:
            as_many = isinstance(obj, (list, tuple)) and isinstance(obj[0], (list, tuple))
        if as_many:
            # lists of lists are inserted as many objects, as in pymongo < 4
            records = (mongo_compatible({'data': item}) for item in obj)
            result = collection.insert_many(records)
            objid = result.inserted_ids[-1]
        else:
            result = collection.insert_one(mongo_compatible({'data': obj}))
            objid = result.inserted_id

        return self._make_metadata(name=name,
                                   prefix=self.prefix,
                                   bucket=self.bucket,
                                   kind=MDREGISTRY.PYTHON_DATA,
                                   collection=collection.name,
                                   attributes=attributes,
                                   objid=objid).save()

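    # Layout sketch (illustrative): each document wraps the object under a
    # 'data' key; lists of lists insert one document per item (as_many):
    #   store.put_pyobj_as_document({'a': 1}, 'myobj')         # {'data': {'a': 1}}
    #   store.put_pyobj_as_document([[1, 2], [3, 4]], 'many')  # two documents
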
    def drop(self, name, force=False, version=-1, report=False, **kwargs):
        """
        Drop the object

        :param name: The name of the object. If the name is a pattern it will
            be expanded using .list(), and call .drop() on every obj found.
        :param force: If True ignores DoesNotExist exception, defaults to False
            meaning this raises a DoesNotExist exception if the name does not
            exist
        :param report: if True returns a dict name=>status, where status is True
            if deleted, False if not deleted
        :return: True if object was deleted, False if not.
            If force is True and the object does not exist it will still return True
        :raises: DoesNotExist if the object does not exist and ``force=False``
        """
        is_pattern = '*' in name
        objs = [name] if not is_pattern else self.list(name)
        results = []
        for name in objs:
            try:
                backend = self.get_backend(name)
                if backend is not None:
                    result = backend.drop(name, force=force, version=version, **kwargs)
                else:
                    result = self._drop(name, force=force, version=version)
            except Exception as e:
                results.append((name, False))
                if not is_pattern:
                    raise
            else:
                results.append((name, result))
        if not objs:
            result = self._drop(name, force=force, version=version)
            results.append((name, result))
        return dict(results) if report else len(results) > 0
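
    # Pattern sketch (illustrative): '*' expands via .list(); report=True
    # returns a name => status mapping instead of a bool:
    #   store.drop('data/*', force=True, report=True)  # {'data/mydf': True, ...}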

    def _drop(self, name, force=False, version=-1, keep_data=False, **kwargs):
        meta = self.metadata(name, version=version)
        if meta is None and not force:
            raise DoesNotExist(name)
        collection = self.collection(name)
        if collection and not keep_data:
            self.mongodb.drop_collection(collection.name)
        if meta:
            if meta.collection and not keep_data:
                self.mongodb.drop_collection(meta.collection)
            if meta and meta.gridfile is not None and not keep_data:
                meta.gridfile.delete()
            self._drop_metadata(name)
            return True
        return False

    def get_backend_bykind(self, kind, model_store=None, data_store=None,
                           **kwargs):
        """
        return the backend by a given object kind

        :param kind: The object kind
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        """
        try:
            backend_cls = load_class(self.defaults.OMEGA_STORE_BACKENDS[kind])
        except KeyError as e:
            raise ValueError('backend {kind} does not exist'.format(**locals()))
        model_store = model_store or self
        data_store = data_store or self
        backend = backend_cls(model_store=model_store,
                              data_store=data_store, **kwargs)
        return backend

    def get_backend(self, name, model_store=None, data_store=None, **kwargs):
        """
        return the backend by a given object name

        :param name: The object name
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        """
        meta = self.metadata(name)
        if meta is not None and meta.kind in self.defaults.OMEGA_STORE_BACKENDS:
            return self.get_backend_bykind(meta.kind,
                                           model_store=model_store,
                                           data_store=data_store,
                                           **kwargs)
        return None

    def help(self, name_or_obj=None, kind=None, raw=False, display=None, renderer=None):
        """ get help for an object by looking up its backend and calling help() on it

        Retrieves the object's metadata and looks up its corresponding backend. If the
        metadata.attributes['docs'] is a string it will display this as the help() contents.
        If the string starts with 'http://' or 'https://' it will open the web page.

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False
            display (fn): optional, callable for interactive display, defaults to help()
                if sys.flags.interactive is True, else uses pydoc.render_doc with plaintext
            renderer (fn): optional, the renderer= argument for pydoc.render_doc to use if
                sys.flags.interactive is False and display is not provided

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is not in interactive mode
        """
        import sys
        import pydoc
        interactive = bool(display) if display is not None else sys.flags.interactive
        display = display or help
        renderer = renderer or pydoc.plaintext
        obj = self._resolve_help_backend(name_or_obj=name_or_obj, kind=kind, raw=raw)
        if any(str(obj.__doc__).startswith(v) for v in ('https://', 'http://')):
            obj = obj.__doc__
            if interactive and display is help:
                import webbrowser
                display = webbrowser.open
        return display(obj) if interactive else pydoc.render_doc(obj, renderer=renderer)

    def _resolve_help_backend(self, name_or_obj=None, kind=None, raw=False):
        # helper so we can test help
        meta = self.metadata(name_or_obj) if name_or_obj else None
        if kind:
            backend = self.get_backend_bykind(kind)
        else:
            backend = self.get_backend(name_or_obj) or self.get_backend_byobj(name_or_obj)
        if backend is None:
            backend = self
        if not raw and meta is not None and 'docs' in meta.attributes:
            def UserDocumentation():
                pass

            basedoc = backend.__doc__ or ''
            UserDocumentation.__doc__ = (basedoc +
                                         meta.attributes['docs'])
            backend = UserDocumentation
        return backend

    def get_backend_byobj(self, obj, name=None, kind=None, attributes=None,
                          model_store=None, data_store=None, **kwargs):
        """
        return the matching backend for the given obj

        Returns:
            the first backend that supports the given parameters or None
        """
        model_store = model_store or self
        data_store = data_store or self
        meta = self.metadata(name) if name else None
        kind = kind or (meta.kind if meta is not None else None)
        backend = None
        if kind:
            objtype = str(type(obj))
            if kind in self.defaults.OMEGA_STORE_BACKENDS:
                backend = self.get_backend_bykind(kind, data_store=data_store, model_store=model_store)
                if not backend.supports(obj, name, attributes=attributes,
                                        data_store=data_store,
                                        model_store=model_store,
                                        meta=meta,
                                        kind=kind, **kwargs):
                    warnings.warn('Backend {kind} does not support {objtype}'.format(**locals()))
            else:
                pass
                # TODO refactor pandas and numpy handling into proper backend to avoid misleading warning
                # warnings.warn('Backend {kind} not found {objtype}. Reverting to default'.format(**locals()))
        else:
            for backend_kind, backend_cls in self.defaults.OMEGA_STORE_BACKENDS.items():
                backend = self.get_backend_bykind(backend_kind, data_store=data_store, model_store=model_store)
                if backend.supports(obj, name, attributes=attributes,
                                    data_store=data_store, model_store=model_store,
                                    **kwargs):
                    break
            else:
                backend = None
        return backend

    def getl(self, *args, **kwargs):
        """ return a lazy MDataFrame for a given object

        Same as .get, but returns an MDataFrame

        """
        return self.get(*args, lazy=True, **kwargs)

    def get(self, name, version=-1, force_python=False,
            kind=None, model_store=None, data_store=None, **kwargs):
        """
        Retrieve an object

        :param name: The name of the object
        :param version: Version of the stored object (not supported)
        :param force_python: Return as a python object
        :param kwargs: kwargs depending on object kind
        :return: an object, estimator, pipeline, data array or pandas dataframe
            previously stored with put()
        """
        meta = self.metadata(name, version=version)
        if meta is None:
            return None
        if not force_python:
            backend = (self.get_backend(name, model_store=model_store,
                                        data_store=data_store)
                       if not kind else self.get_backend_bykind(kind,
                                                                model_store=model_store,
                                                                data_store=data_store))
            if backend is not None:
                # FIXME: some backends need to get model_store, data_store, but fails tests
                return backend.get(name, **kwargs)  # model_store=model_store, data_store=data_store, **kwargs)
            if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
                backend = self.get_backend(name)
                return backend.get_model(name)
            elif meta.kind == MDREGISTRY.SPARK_MLLIB:
                backend = self.get_backend(name)
                return backend.get_model(name, version)
            elif meta.kind == MDREGISTRY.PANDAS_DFROWS:
                return self.get_dataframe_documents(name, version=version,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_SEROWS:
                return self.get_dataframe_documents(name, version=version,
                                                    is_series=True,
                                                    **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_DFGROUP:
                return self.get_dataframe_dfgroup(
                    name, version=version, **kwargs)
            elif meta.kind == MDREGISTRY.PYTHON_DATA:
                return self.get_python_data(name, version=version, **kwargs)
            elif meta.kind == MDREGISTRY.PANDAS_HDF:
                return self.get_dataframe_hdf(name, version=version)
        return self.get_object_as_python(meta, version=version)

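    # Retrieval sketch (illustrative): get() dispatches on metadata.kind;
    # getl() / lazy=True returns an MDataFrame instead of materializing:
    #   df = store.get('mydf')    # DataFrame
    #   mdf = store.getl('mydf')  # MDataFrame (lazy)
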
    def get_dataframe_documents(self, name, columns=None, lazy=False,
                                filter=None, version=-1, is_series=False,
                                chunksize=None, sanitize=True, trusted=None,
                                **kwargs):
        """
        Internal method to return DataFrame from documents

        :param name: the name of the object (str)
        :param columns: the column projection as a list of column names
        :param lazy: if True returns a lazy representation as an MDataFrame.
            If False retrieves all data and returns a DataFrame (default)
        :param filter: the filter to be applied as a column__op=value dict
        :param sanitize: sanitize filter by removing all $op filter keys,
            defaults to True. Specify False to allow $op filter keys. $where
            is always removed as it is considered unsafe.
        :param version: the version to retrieve (not supported)
        :param is_series: if True returns a Series instead of a DataFrame
        :param kwargs: remaining kwargs are used as a filter. The filter kwarg
            overrides other kwargs.
        :return: the retrieved object (DataFrame, Series or MDataFrame)

        """
        from omegaml.store.queryops import sanitize_filter
        from omegaml.store.filtered import FilteredCollection

        collection = self.collection(name)
        meta = self.metadata(name)
        filter = filter or kwargs
        filter = sanitize_filter(filter, no_ops=sanitize, trusted=trusted)
        if lazy or chunksize:
            from ..mdataframe import MDataFrame
            df = MDataFrame(collection,
                            metadata=meta.kind_meta,
                            columns=columns).query(**filter)
            if is_series:
                df = df[0]
            if chunksize is not None and chunksize > 0:
                return df.iterchunks(chunksize=chunksize)
        else:
            # TODO ensure the same processing is applied in MDataFrame
            # TODO this method should always use a MDataFrame disregarding lazy
            if filter:
                from .query import Filter
                query = Filter(collection, **filter).query
                cursor = FilteredCollection(collection).find(filter=query, projection=columns)
            else:
                cursor = FilteredCollection(collection).find(projection=columns)
            # restore dataframe
            df = cursor_to_dataframe(cursor)
            if '_id' in df.columns:
                del df['_id']
            if hasattr(meta, 'kind_meta'):
                df = convert_dtypes(df, meta.kind_meta.get('dtypes', {}))
            # -- restore columns
            meta_columns = dict(meta.kind_meta.get('columns'))
            if meta_columns:
                # apply projection, if any
                if columns:
                    # get only projected columns
                    # meta_columns is {origin_column: stored_column}
                    orig_columns = dict({k: v for k, v in meta_columns.items()
                                         if k in columns or v in columns})
                else:
                    # restore columns to original name
                    orig_columns = meta_columns
                df.rename(columns=orig_columns, inplace=True)
            # -- restore indexes
            idx_meta = meta.kind_meta.get('idx_meta')
            if idx_meta:
                df = restore_index(df, idx_meta)
            # -- restore row order
            if is_series:
                index = df.index
                name = df.columns[0]
                df = df[name]
                df.index = index
                df.name = None if name == 'None' else name
        return df

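    # Query sketch (illustrative): filters use column__op=value syntax; lazy
    # or chunked retrieval avoids materializing the full frame:
    #   store.get_dataframe_documents('mydf', columns=['x'], x__gte=5)
    #   store.get_dataframe_documents('mydf', lazy=True)       # MDataFrame
    #   store.get_dataframe_documents('mydf', chunksize=1000)  # chunk iterator
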
    def rebuild_params(self, kwargs, collection):
        """
        Returns a modified set of parameters for querying mongodb
        based on how the mongo document is structured and the
        fields the document is grouped by.

        **Note: Explicitly to be used with get_grouped_data only**

        :param kwargs: Mongo filter arguments
        :param collection: The name of mongodb collection
        :return: Returns a set of parameters as dictionary.
        """
        modified_params = {}
        db_structure = collection.find_one({}, {'_id': False})
        groupby_columns = list(set(db_structure.keys()) - set(['_data']))
        if kwargs is not None:
            for item in kwargs:
                if item not in groupby_columns:
                    modified_query_param = '_data.' + item
                    modified_params[modified_query_param] = kwargs.get(item)
                else:
                    modified_params[item] = kwargs.get(item)
        return modified_params

    def get_dataframe_dfgroup(self, name, version=-1, sanitize=True, kwargs=None):
        """
        Return a grouped dataframe

        :param name: the name of the object
        :param version: not supported
        :param kwargs: mongo db query arguments to be passed to
            collection.find() as a filter.
        :param sanitize: remove any $op operators in kwargs

        """
        import pandas as pd
        from omegaml.store.queryops import sanitize_filter
        from omegaml.store.filtered import FilteredCollection

        def convert_doc_to_row(cursor):
            for doc in cursor:
                data = doc.pop('_data', [])
                for row in data:
                    doc.update(row)
                    yield doc

        datastore = FilteredCollection(self.collection(name))
        kwargs = kwargs if kwargs else {}
        params = self.rebuild_params(kwargs, datastore)
        params = sanitize_filter(params, no_ops=sanitize)
        cursor = datastore.find(params, projection={'_id': False})
        df = pd.DataFrame(convert_doc_to_row(cursor))
        return df

    def get_dataframe_hdf(self, name, version=-1):
        """
        Retrieve dataframe from hdf

        :param name: The name of object
        :param version: The version of object (not supported)
        :return: Returns a python pandas dataframe
        :raises: gridfs.errors.NoFile
        """
        df = None
        meta = self.metadata(name)
        filename = getattr(meta.gridfile, 'name', self.object_store_key(name, '.hdf'))
        if self.fs.exists(filename=filename):
            df = self._extract_dataframe_hdf(filename, version=version)
            return df
        else:
            raise gridfs.errors.NoFile(
                "{0} does not exist in mongo collection '{1}'".format(
                    name, self.bucket))

    def get_python_data(self, name, filter=None, version=-1, lazy=False, trusted=False, **kwargs):
        """
        Retrieve objects as python data

        :param name: The name of object
        :param version: The version of object

        :return: Returns the object as python list object
        """
        datastore = self.collection(name)
        filter = filter or kwargs
        filter = sanitize_filter(filter) if trusted is False or trusted != signature(filter) else filter
        cursor = datastore.find(filter, **kwargs)
        if lazy:
            return cursor
        data = (d.get('data') for d in cursor)
        return list(data)

    def get_object_as_python(self, meta, version=-1):
        """
        Retrieve object as python object

        :param meta: The metadata object
        :param version: The version of the object

        :return: Returns data as python object
        """
        if meta.kind == MDREGISTRY.SKLEARN_JOBLIB:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_HDF:
            return meta.gridfile
        if meta.kind == MDREGISTRY.PANDAS_DFROWS:
            return list(getattr(self.mongodb, meta.collection).find())
        if meta.kind == MDREGISTRY.PYTHON_DATA:
            col = getattr(self.mongodb, meta.collection)
            return col.find_one(dict(_id=meta.objid)).get('data')
        raise TypeError('cannot return kind %s as a python object' % meta.kind)

    def __iter__(self):
        for f in self.list(include_temp=True):
            yield f

    @property
    def buckets(self):
        return ['default' if b == self.defaults.OMEGA_MONGO_COLLECTION else b
                for b in self._Metadata.objects.distinct('bucket')]

    def list(self, pattern=None, regexp=None, kind=None, raw=False, hidden=None,
             include_temp=False, bucket=None, prefix=None, filter=None):
        """
        List all files in store

        specify pattern as a unix pattern (e.g. :code:`models/*`,
        or specify regexp)

        :param pattern: the unix file pattern or None for all
        :param regexp: the regexp. takes precedence over pattern
        :param raw: if True return the meta data objects
        :param filter: specify additional filter criteria, optional
        :return: List of files in store

        """
        regex = lambda pattern: bson.regex.Regex(f'{pattern}')
        db = self.mongodb
        searchkeys = dict(bucket=bucket or self.bucket,
                          prefix=prefix or self.prefix)
        q_excludes = Q()
        if regexp:
            searchkeys['name'] = regex(regexp)
        elif pattern:
            re_pattern = pattern.replace('*', '.*').replace('/', r'\/')
            searchkeys['name'] = regex(f'^{re_pattern}$')
        if not include_temp:
            q_excludes &= Q(name__not__startswith='_')
            q_excludes &= Q(name__not=regex(r'(.{1,*}\/?_.*)'))
        if not hidden:
            q_excludes &= Q(name__not__startswith='.')
        if kind or self.force_kind:
            kind = kind or self.force_kind
            if isinstance(kind, (tuple, list)):
                searchkeys.update(kind__in=kind)
            else:
                searchkeys.update(kind=kind)
        if filter:
            searchkeys.update(filter)
        q_search = Q(**searchkeys) & q_excludes
        files = self._Metadata.objects.no_cache()(q_search)
        return [f if raw else str(f.name).replace('.omm', '') for f in files]

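    # Listing sketch (illustrative): patterns are unix-style, regexp takes
    # precedence; raw=True returns Metadata objects instead of names:
    #   store.list('data/*')               # ['data/mydf', ...]
    #   store.list(regexp=r'data/.*df$')
    #   store.list('models/*', raw=True)   # [Metadata(...), ...]
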
    def exists(self, name, hidden=False):
        """ check if object exists

        Args:
            name (str): name of object
            hidden (bool): if True, include hidden files, defaults to
                False, unless name starts with '.'

        Returns:
            bool, True if object exists

        .. versionchanged:: 0.16.4
            hidden defaults to True if name starts with '.'
        """
        hidden = True if name.startswith('.') else hidden
        return name in self.list(name, hidden=hidden)

    def object_store_key(self, name, ext, hashed=None):
        """
        Returns the store key

        Unless you write a mixin or a backend you should not use this method

        :param name: The name of object
        :param ext: The extension of the filename
        :param hashed: hash the key to support arbitrary name length, defaults
            to defaults.OMEGA_STORE_HASHEDNAMES, True by default since 0.13.7

        :return: A filename with relative bucket, prefix and name
        """
        # byte string
        _u8 = lambda t: t.encode('UTF-8', 'replace') if isinstance(t, str) else t
        key = self._get_obj_store_key(name, ext)
        hashed = hashed if hashed is not None else self.defaults.OMEGA_STORE_HASHEDNAMES
        if hashed:
            from hashlib import sha1
            # SEC: CWE-916
            # - status: wontfix
            # - reason: hashcode is used purely for name resolution, not a security function
            hasher = sha1()
            hasher.update(_u8(key))
            key = hasher.hexdigest()
        return key
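
    # Key sketch (illustrative): the unhashed key is
    # '{bucket}.{prefix}.{name}{ext}' with '/' replaced by '_' and '..' by '.';
    # with hashing (the default since 0.13.7) the key is the sha1 hex digest:
    #   store.object_store_key('mydf', '.hdf', hashed=False)  # 'omegaml.data_.mydf.hdf'
    #   store.object_store_key('mydf', '.hdf', hashed=True)   # e.g. 'a94a8fe5...'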

    def _get_obj_store_key(self, name, ext, prefix=None, bucket=None):
        # backwards compatibility implementation of object_store_key()
        name = '%s.%s' % (name, ext) if not name.endswith(ext) else name
        filename = '{bucket}.{prefix}.{name}'.format(
            bucket=bucket or self.bucket,
            prefix=prefix or self.prefix,
            name=name,
            ext=ext).replace('/', '_').replace('..', '.')
        return filename

    def _package_dataframe2hdf(self, df, filename, key=None):
        """
        Package a dataframe as a hdf file

        :param df: The dataframe
        :param filename: Name of file

        :return: Filename of hdf file
        """
        lpath = tempfile.mkdtemp()
        fname = os.path.basename(filename)
        hdffname = os.path.join(self.tmppath, fname + '.hdf')
        key = key or 'data'
        df.to_hdf(hdffname, key)
        return hdffname

    def _extract_dataframe_hdf(self, filename, version=-1):
        """
        Extracts a dataframe from a stored hdf file

        :param filename: The name of file
        :param version: The version of file

        :return: Pandas dataframe
        """
        import pandas as pd
        hdffname = os.path.join(self.tmppath, filename)
        dirname = os.path.dirname(hdffname)
        if not os.path.exists(dirname):
            os.makedirs(dirname)
        try:
            outf = self.fs.get_version(filename, version=version)
        except gridfs.errors.NoFile as e:
            raise e
        with open(hdffname, 'wb') as hdff:
            hdff.write(outf.read())
        hdf = pd.HDFStore(hdffname)
        key = list(hdf.keys())[0]
        df = hdf[key]
        hdf.close()
        return df

    def _ensure_fs_collection(self):
        # ensure backwards-compatible gridfs access
        if self.defaults.OMEGA_BUCKET_FS_LEGACY:
            # prior to 0.13.2 a single gridfs instance was used, always equal to the default collection
            return self.defaults.OMEGA_MONGO_COLLECTION
        if self.bucket == self.defaults.OMEGA_MONGO_COLLECTION:
            # from 0.13.2 onwards, only the default bucket is equal to the default collection
            # backwards compatibility for existing installations
            return self.bucket
        # since 0.13.2, all buckets other than the default use a qualified collection name to
        # effectively separate files in different buckets, enabling finer-grained access control
        # and avoiding name collisions from different buckets
        return '{}_{}'.format(self.defaults.OMEGA_MONGO_COLLECTION, self.bucket)

    def _ensure_fs_index(self, fs):
        # make sure we have proper chunks and file indices. this should be created on first write, but sometimes is not
        # see https://docs.mongodb.com/manual/core/gridfs/#gridfs-indexes
        # pymongo since 4.1 no longer has fs._GridFS
        chunks_collection = fs._GridFS__chunks if hasattr(fs, '_GridFS__chunks') else fs._chunks
        files_collection = fs._GridFS__files if hasattr(fs, '_GridFS__files') else fs._files
        ensure_index(chunks_collection, {'files_id': 1, 'n': 1}, unique=True)
        ensure_index(files_collection, {'filename': 1, 'uploadDate': 1})

    def sign(self, filter):
        return signature(filter)

    @staticmethod
    def _cleanup(objrepr, tmppath=None):
        # cleanup any temporary files on exit
        # -- this is called as a finalizer (weakref.finalize)
        try:
            logger.debug(f'finalizing {objrepr}: cleaning up temporary files in {tmppath}')
            shutil.rmtree(tmppath, ignore_errors=True)
        except Exception as e:
            logger.error(f'finalizing {objrepr}: error occurred as {e}')