Source code for omegaml.store.base

  1""" Native storage for OmegaML using mongodb as the storage layer
  2
  3An OmegaStore instance is a MongoDB database. It has at least the
  4metadata collection which lists all objects stored in it. A metadata
  5document refers to the following types of objects (metadata.kind):
  6
  7* pandas.dfrows - a Pandas DataFrame stored as a collection of rows
  8* sklearn.joblib - a scikit learn estimator/pipeline dumped using joblib.dump()
  9* python.data - an arbitrary python dict, tuple, list stored as a document
 10
 11Note that storing Pandas and scikit learn objects requires the availability
 12of the respective packages. If either can not be imported, the OmegaStore
 13degrades to a python.data store only. It will still .list() and get() any
 14object, however reverts to pure python objects. In this case it is up
 15to the client to convert the data into an appropriate format for processing.
 16
 17Pandas and scikit-learn objects can only be stored if these packages are
 18available. put() raises a TypeError if you pass such objects and these
 19modules cannot be loaded.
 20
 21All data are stored within the same mongodb, in per-object collections
 22as follows:
 23
 24    * .metadata
 25        all metadata. each object is one document,
 26        See **omegaml.documents.Metadata** for details
 27    * .<bucket>.files
 28        this is the GridFS instance used to store
 29        blobs (models, numpy, hdf). The actual file name
 30        will be <prefix>/<name>.<ext>, where ext is
 31        optionally generated by put() / get().
 32    * .<bucket>.<prefix>.<name>.data
 33        every other dataset is stored in a separate
 34        collection (dataframes, dicts, lists, tuples).
 35        Any forward slash in prefix is ignored (e.g. 'data/'
 36        becomes 'data')
 37
 38    DataFrames by default are stored in their own collection, every
 39    row becomes a document. To store dataframes as a binary file,
 40    use `put(...., as_hdf=True).` `.get()` will always return a dataframe.
 41
 42    Python dicts, lists, tuples are stored as a single document with
 43    a `.data` attribute holding the JSON-converted representation. `.get()`
 44    will always return the corresponding python object of .data.
 45
 46    Models are joblib.dump()'ed and zipped prior to transferring into
 47    GridFS. .get() will always unzip and joblib.load() before returning
 48    the model. Note this requires that the process using .get() supports
 49    joblib as well as all python classes referred to. If joblib is not
 50    supported, .get() returns a file-like object.
 51
 52    The .metadata entry specifies the format used to store each
 53    object as well as its location:
 54
 55    * metadata.kind
 56        the type of object
 57    * metadata.name
 58        the name of the object, as given on put()
 59    * metadata.gridfile
 60        the gridfs object (if any, null otherwise)
 61    * metadata.collection
 62        the name of the collection
 63    * metadata.attributes
 64        arbitrary custom attributes set in
 65        put(attributes=obj). This is used e.g. by
 66        OmegaRuntime's fit() method to record the
 67        data used in the model's training.
 68
 69    **.put()** and **.get()** use helper methods specific to the type in
 70    object's type and metadata.kind, respectively. In the future
 71    a plugin system will enable extension to other types.
 72"""
 73from __future__ import absolute_import
 74
 75import logging
 76import os
 77import shutil
 78import warnings
 79import weakref
 80from uuid import uuid4
 81
 82import bson
 83import gridfs
 84from mongoengine.connection import disconnect, connect, get_db
 85from mongoengine.errors import DoesNotExist
 86from mongoengine.queryset.visitor import Q
 87
 88from omegaml.documents import make_Metadata, MDREGISTRY
 89from omegaml.mongoshim import sanitize_mongo_kwargs, waitForConnection
 90from omegaml.util import load_class, extend_instance, ensure_index, PickableCollection, signature
 91from omegaml.util import settings as omega_settings, urlparse
 92
 93logger = logging.getLogger(__name__)
 94
 95
class OmegaStore(object):
    """ The storage backend for models and data

    .. versionchanged:: 0.18
        refactored all methods handling Python and Pandas datatypes to omegaml.backends.coreobjects.CoreObjectsBackend
    """

    def __init__(self, mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None):
        """
        :param mongo_url: the mongourl to use for the gridfs
        :param bucket: the mongo collection to use for gridfs
        :param prefix: the path prefix for files. defaults to blank
        :param kind: the kind or list of kinds to limit this store to
        :param defaults: the settings object, defaults to omega_settings()
        :param dbalias: the mongoengine connection alias, generated per
            instance if not given
        """
        self.defaults = defaults or omega_settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket or self.defaults.OMEGA_MONGO_COLLECTION
        self._fs_collection = self._ensure_fs_collection()
        self._fs = None
        # instance-specific temp path so concurrent stores do not interfere
        self._tmppath = os.path.join(self.defaults.OMEGA_TMP, uuid4().hex)
        self.prefix = prefix or ''
        self.force_kind = kind
        self._Metadata_cls = None
        # don't initialize db here to avoid using the default settings
        # otherwise Metadata will already have a connection and not use
        # the one provided in override_settings
        self._db = None
        self._dbalias = dbalias
        # add backends and mixins
        self._apply_mixins()
        # register backends
        self.register_backends()
        # register finalizer for cleanup
        weakref.finalize(self, self._cleanup, repr(self), tmppath=str(self._tmppath))

    def __repr__(self):
        return 'OmegaStore(bucket={}, prefix={})'.format(self.bucket, self.prefix)

    def __eq__(self, other):
        """test for equality of OmegaStore instances

        Args:
            other: OmegaStore instance

        Returns:
            True if other is the same database, same bucket, same prefix

        .. note::
            this was previously misnamed ``__equal__``, which Python never
            calls for the ``==`` operator; the old name is kept as an alias
            for backwards compatibility
        """
        if not isinstance(other, OmegaStore):
            return NotImplemented
        return (self.mongo_url == other.mongo_url
                and self.bucket == other.bucket
                and self.prefix == other.prefix)

    # backwards-compatible alias for the previously misspelled dunder
    __equal__ = __eq__

    def __hash__(self):
        # defining __eq__ disables the default hash; keep instances hashable
        # and consistent with __eq__ (equal stores hash equally)
        return hash((self.mongo_url, self.bucket, self.prefix))

    @property
    def tmppath(self):
        """
        return an instance-specific temporary path
        """
        os.makedirs(self._tmppath, exist_ok=True)
        return self._tmppath

    @property
    def mongodb(self):
        """
        Returns a mongo database object
        """
        if self._db is not None:
            return self._db
        # parse salient parts of mongourl, e.g.
        # mongodb://user:password@host/dbname
        self.parsed_url = urlparse.urlparse(self.mongo_url)
        self.database_name = self.parsed_url.path[1:]
        host = self.parsed_url.netloc
        scheme = self.parsed_url.scheme
        username, password = None, None
        if '@' in host:
            creds, host = host.split('@', 1)
            if ':' in creds:
                # split only once so passwords may contain ':'
                username, password = creds.split(':', 1)
        # connect via mongoengine
        #
        # note this uses a MongoClient in the background, with pooled
        # connections. there are multiprocessing issues with pymongo:
        # http://api.mongodb.org/python/3.2/faq.html#using-pymongo-with-multiprocessing
        # connect=False is due to https://jira.mongodb.org/browse/PYTHON-961
        # this defers connecting until the first access
        # serverSelectionTimeoutMS=2500 is to fail fast, the default is 30000
        #
        # use an instance specific alias, note that access to Metadata and
        # QueryCache must pass the very same alias
        self._dbalias = alias = self._dbalias or 'omega-{}'.format(uuid4().hex)
        # local import of _connections ensure we get the actual object, not an earlier version
        from mongoengine.connection import _connections
        if alias not in _connections:
            # always disconnect before registering a new connection because
            # mongoengine.connect() forgets all connection settings upon disconnect
            disconnect(alias)
            connection = connect(alias=alias, db=self.database_name,
                                 host=f'{scheme}://{host}',
                                 username=username,
                                 password=password,
                                 connect=False,
                                 authentication_source='admin',
                                 **sanitize_mongo_kwargs(self.defaults.OMEGA_MONGO_SSL_KWARGS),
                                 )
            # since PyMongo 4, connect() no longer waits for connection
            waitForConnection(connection)
        self._db = get_db(alias)
        return self._db

    @property
    def _Metadata(self):
        # lazily build a Metadata document class bound to this store's
        # connection alias and files collection
        if self._Metadata_cls is None:
            # hack to localize metadata
            db = self.mongodb
            self._Metadata_cls = make_Metadata(db_alias=self._dbalias,
                                               collection=self._fs_collection)
        return self._Metadata_cls

    @property
    def fs(self):
        """
        Retrieve a gridfs instance using url and collection provided

        :return: a gridfs instance
        """
        if self._fs is not None:
            return self._fs
        self._fs = gridfs.GridFS(self.mongodb, collection=self._fs_collection)
        self._ensure_fs_index(self._fs)
        return self._fs

    def metadata(self, name=None, bucket=None, prefix=None, version=-1, **kwargs):
        """
        Returns a metadata document for the given entry name
        """
        # FIXME: version attribute does not do anything
        # FIXME: metadata should be stored in a bucket-specific collection
        # to enable access control, see https://docs.mongodb.com/manual/reference/method/db.create
        #
        # Role/#db.createRole
        # accessing .mongodb and .fs forces connection setup and gridfs
        # index creation as a side effect
        db = self.mongodb
        fs = self.fs
        prefix = prefix or self.prefix
        bucket = bucket or self.bucket
        # Meta is to silence lint on import error
        Meta = self._Metadata
        return Meta.objects(name=str(name), prefix=prefix, bucket=bucket).no_cache().limit(1).first()

    def make_metadata(self, name, kind, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some : value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        """
        # TODO kept _make_metadata for backwards compatibility.
        return self._make_metadata(name, bucket=bucket, prefix=prefix,
                                   kind=kind, **kwargs)

    def _make_metadata(self, name=None, bucket=None, prefix=None, **kwargs):
        """
        create or update a metadata object

        this retrieves a Metadata object if it exists given the kwargs. Only
        the name, prefix and bucket arguments are considered

        for existing Metadata objects, the attributes kw is treated as follows:

        * attributes=None, the existing attributes are left as is
        * attributes={}, the attributes value on an existing metadata object
          is reset to the empty dict
        * attributes={ some : value }, the existing attributes are updated

        For new metadata objects, attributes defaults to {} if not specified,
        else is set as provided.

        :param name: the object name
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        """
        bucket = bucket or self.bucket
        prefix = prefix or self.prefix
        meta = self.metadata(name=name,
                             prefix=prefix,
                             bucket=bucket)
        if meta:
            # update semantics for dict-valued fields, see docstring
            dict_fields = 'attributes', 'kind_meta'
            for k, v in kwargs.items():
                if k in dict_fields and v is not None and len(v) > 0:
                    previous = getattr(meta, k, {})
                    previous.update(v)
                    setattr(meta, k, previous)
                elif k in dict_fields and v is not None and len(v) == 0:
                    setattr(meta, k, {})
                elif k in dict_fields and v is None:
                    # ignore non specified attributes
                    continue
                else:
                    # by default set whatever attribute is provided
                    setattr(meta, k, v)
        else:
            meta = self._Metadata(name=name, bucket=bucket, prefix=prefix,
                                  **kwargs)
        return meta

    def _drop_metadata(self, name=None, **kwargs):
        # internal method to delete meta data of an object
        meta = self.metadata(name, **kwargs)
        if meta is not None:
            meta.delete()

    def collection(self, name=None, bucket=None, prefix=None):
        """
        Returns a mongo db collection as a datastore

        If there is an existing object of name, will return the .collection
        of the object. Otherwise returns the collection according to naming
        convention {bucket}.{prefix}.{name}.datastore

        :param name: the collection to use. if none defaults to the
            collection name given on instantiation. the actual collection name
            used is always prefix + name + '.data'
        """
        # see if we have a known object and a collection for that, if not define it
        meta = self.metadata(name, bucket=bucket, prefix=prefix)
        collection = meta.collection if meta else None
        if not collection:
            collection = self.object_store_key(name, '.datastore')
            collection = collection.replace('..', '.')
        # return the collection
        datastore = getattr(self.mongodb, collection)
        return PickableCollection(datastore)

    def _apply_mixins(self):
        """
        apply mixins in defaults.OMEGA_STORE_MIXINS
        """
        for mixin in self.defaults.OMEGA_STORE_MIXINS:
            conditional = self._mixins_conditional
            extend_instance(self, mixin,
                            conditional=conditional)

    def _mixins_conditional(self, cls, obj):
        # a mixin is applied only if it supports this store (or declares
        # no .supports check at all)
        return cls.supports(obj) if hasattr(cls, 'supports') else True

    def register_backends(self):
        """
        register backends in defaults.OMEGA_STORE_BACKENDS
        """
        # enable list modification within loop
        # -- avoid RuntimeError: dictionary changed size during iteration
        backends = list(self.defaults.OMEGA_STORE_BACKENDS.items())
        for kind, backend in backends:
            self.register_backend(kind, backend)

    def register_backend(self, kind, backend, index=-1):
        """
        register a backend class

        :param kind: (str) the backend kind
        :param backend: (class) the backend class
        :param index: (int) the insert position, defaults to -1, which means
            to append

        .. versionchanged:: 0.18
            added index to have more control over backend evaluation by .get_backend_byobj()

        .. versionchanged:: 0.18
            backends can specify cls.KIND_EXT to register additional kinds
        """
        backend_cls = load_class(backend)
        # a backend registers its primary KIND plus any KIND_EXT extensions
        backend_kinds = [backend_cls.KIND] + list(getattr(backend_cls, 'KIND_EXT', []))
        for backend_kind in backend_kinds:
            self.defaults.OMEGA_STORE_BACKENDS[backend_kind] = backend_cls
            if backend_kind not in MDREGISTRY.KINDS:
                pos = len(MDREGISTRY.KINDS) + index if index < 0 else index
                MDREGISTRY.KINDS.insert(pos, backend_kind)
        return self

    def register_mixin(self, mixincls):
        """
        register a mixin class

        :param mixincls: (class) the mixin class
        """
        extend_instance(self, mixincls)
        return self

    def put(self, obj, name, attributes=None, kind=None, replace=False, model_store=None,
            data_store=None, **kwargs):
        """
        Stores an object, store estimators, pipelines, numpy arrays or
        pandas dataframes

        :raises TypeError: if no registered backend supports the object
        """
        if replace:
            self.drop(name, force=True)
        backend = self.get_backend_byobj(obj, name, attributes=attributes, kind=kind,
                                         model_store=model_store, data_store=data_store,
                                         **kwargs)
        if backend:
            return backend.put(obj, name, attributes=attributes, **kwargs)
        raise TypeError('type %s not supported' % type(obj))

    def drop(self, name, force=False, version=-1, report=False, **kwargs):
        """
        Drop the object

        :param name: The name of the object. If the name is a pattern it will
            be expanded using .list(), and call .drop() on every obj found.
        :param force: If True ignores DoesNotExist exception, defaults to False
            meaning this raises a DoesNotExist exception if the name does not
            exist
        :param report: if True returns a dict name=>status, where status is True
            if deleted, False if not deleted
        :return: True if object was deleted, False if not.
            If force is True and the object does not exist it will still return True
        :raises: DoesNotExist if the object does not exist and ```force=False```
        """
        is_pattern = '*' in name
        objs = [name] if not is_pattern else self.list(name)
        results = []
        for name in objs:
            try:
                backend = self.get_backend(name)
                # prefer the backend's drop if it provides one
                drop = backend.drop if hasattr(backend, 'drop') else self._drop
                result = drop(name, force=force, version=version, **kwargs)
            except Exception as e:
                result = False
                if not force and not is_pattern:
                    raise
                if force:
                    # best effort removal of metadata and data
                    result = self._drop(name, force=force, version=version)
            results.append((name, result))
        if not objs:
            # pattern did not match anything; still honor force semantics
            result = self._drop(name, force=force, version=version)
            results.append((name, result))
        return dict(results) if report else len(results) > 0

    def _drop(self, name, force=False, version=-1, keep_data=False, **kwargs):
        # default drop implementation: remove data collection, gridfile
        # and metadata entry for the object
        meta = self.metadata(name, version=version)
        if meta is None and not force:
            raise DoesNotExist(name)
        collection = self.collection(name)
        if collection and not keep_data:
            self.mongodb.drop_collection(collection.name)
        if meta:
            if meta.collection and not keep_data:
                self.mongodb.drop_collection(meta.collection)
            if meta and meta.gridfile is not None and not keep_data:
                meta.gridfile.delete()
            self._drop_metadata(name)
            return True
        return False

    def get_backend_bykind(self, kind, model_store=None, data_store=None,
                           **kwargs):
        """
        return the backend by a given object kind

        :param kind: The object kind
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend
        :raises ValueError: if no backend is registered for the kind
        """
        try:
            backend_cls = load_class(self.defaults.OMEGA_STORE_BACKENDS[kind])
        except KeyError as e:
            raise ValueError('backend {kind} does not exist'.format(**locals()))
        model_store = model_store or self
        data_store = data_store or self
        backend = backend_cls(model_store=model_store,
                              data_store=data_store, **kwargs)
        return backend

    def get_backend(self, name, model_store=None, data_store=None, **kwargs):
        """
        return the backend by a given object name

        :param name: The name of the object
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: the kwargs passed to the backend initialization
        :return: the backend, or None if the object does not exist or its
            kind has no registered backend
        """
        meta = self.metadata(name)
        if meta is not None and meta.kind in self.defaults.OMEGA_STORE_BACKENDS:
            return self.get_backend_bykind(meta.kind,
                                           model_store=model_store,
                                           data_store=data_store,
                                           **kwargs)
        return None

    def help(self, name_or_obj=None, kind=None, raw=False, display=None, renderer=None):
        """ get help for an object by looking up its backend and calling help() on it

        Retrieves the object's metadata and looks up its corresponding backend. If the
        metadata.attributes['docs'] is a string it will display this as the help() contents.
        If the string starts with 'http://' or 'https://' it will open the web page.

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False
            display (fn): optional, callable for interactive display, defaults to help in
                if sys.flags.interactive is True, else uses pydoc.render_doc with plaintext
            renderer (fn): optional, the renderer= argument for pydoc.render_doc to use if
                sys.flags.interactive is False and display is not provided

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is in not interactive mode
        """
        import sys
        import pydoc
        interactive = bool(display) if display is not None else sys.flags.interactive
        display = display or help
        renderer = renderer or pydoc.plaintext
        obj = self._resolve_help_backend(name_or_obj=name_or_obj, kind=kind, raw=raw)
        if any(str(obj.__doc__).startswith(v) for v in ('https://', 'http://')):
            # docs is a URL; in interactive mode open it in the browser
            obj = obj.__doc__
            if interactive and display is help:
                import webbrowser
                display = webbrowser.open
        return display(obj) if interactive else pydoc.render_doc(obj, renderer=renderer)

    def _resolve_help_backend(self, name_or_obj=None, kind=None, raw=False):
        # helper so we can test help
        meta = self.metadata(name_or_obj) if name_or_obj else None
        if kind:
            backend = self.get_backend_bykind(kind)
        else:
            backend = self.get_backend(name_or_obj) or self.get_backend_byobj(name_or_obj)
        if backend is None:
            backend = self.get_backend_bykind('core.object', model_store=self, data_store=self)
        if not raw and meta is not None and 'docs' in meta.attributes:
            # wrap user-provided docs in a throwaway callable so pydoc/help
            # can render them alongside the backend's own docstring
            def UserDocumentation():
                pass

            basedoc = backend.__doc__ or ''
            UserDocumentation.__doc__ = (basedoc +
                                         meta.attributes['docs'])
            backend = UserDocumentation
        return backend

    def get_backend_byobj(self, obj, name=None, kind=None, attributes=None,
                          model_store=None, data_store=None, **kwargs):
        """
        return the matching backend for the given obj

        Returns:
            the first backend that supports the given parameters or None

        .. versionchanged:: 0.18
            backends are tested in order of MDREGISTRY.KINDS, see .register_backend()
        """
        model_store = model_store or self
        data_store = data_store or self
        meta = self.metadata(name) if name else None
        kind = kind or (meta.kind if meta is not None else None)
        backend = None
        if kind:
            if kind in self.defaults.OMEGA_STORE_BACKENDS:
                backend = self.get_backend_bykind(kind, data_store=data_store, model_store=model_store)
                if not backend.supports(obj, name, attributes=attributes,
                                        data_store=data_store,
                                        model_store=model_store,
                                        meta=meta,
                                        kind=kind, **kwargs):
                    # keep the explicitly requested backend, but warn
                    objtype = str(type(obj))
                    warnings.warn('Backend {kind} does not support {objtype}'.format(**locals()))
            else:
                # revert to core backend
                backend = self.get_backend_bykind('core.object', model_store=model_store, data_store=data_store)
        else:
            # sort by order in MDREGISTRY.KINDS, only using kinds that actually have a registered backend
            sorted_backends = (k for k in MDREGISTRY.KINDS if k in self.defaults.OMEGA_STORE_BACKENDS)
            for backend_kind in sorted_backends:
                backend = self.get_backend_bykind(backend_kind, data_store=data_store, model_store=model_store)
                if backend.supports(obj, name, attributes=attributes,
                                    data_store=data_store, model_store=model_store,
                                    **kwargs):
                    break
            else:
                # no backend claims support
                backend = None
        return backend

    def getl(self, *args, **kwargs):
        """ return a lazy MDataFrame for a given object

        Same as .get, but returns a MDataFrame
        """
        return self.get(*args, lazy=True, **kwargs)

    def get(self, name, version=-1, force_python=False,
            kind=None, model_store=None, data_store=None, **kwargs):
        """
        Retrieve an object

        :param name: The name of the object
        :param version: Version of the stored object (not supported)
        :param force_python: Return as a python object
        :param kwargs: kwargs depending on object kind
        :return: an object, estimator, pipelines, data array or pandas dataframe
            previously stored with put()
        """
        meta = self.metadata(name, version=version)
        if meta is None:
            return None
        if not force_python:
            backend = (self.get_backend(name, model_store=model_store,
                                        data_store=data_store, **kwargs)
                       if not kind else self.get_backend_bykind(kind, model_store=model_store,
                                                                data_store=data_store, **kwargs))
            if backend is not None:
                # FIXME: some backends need to get model_store, data_store, but fails tests
                return backend.get(name, **kwargs)  # model_store=model_store, data_store=data_store, **kwargs)
        # catch-all to CoreObjectsBackend or force python
        # -- keeping the same behavior until version 0.17, handling all other KINDs
        core_backend = self.get_backend_bykind('core.object', model_store=model_store, data_store=data_store, **kwargs)
        if force_python:
            return core_backend.get_object_as_python(meta, version=version)
        return core_backend.get(name, version=version, force_python=force_python, **kwargs)

    def __iter__(self):
        # iterate all object names, including temporary objects
        for f in self.list(include_temp=True):
            yield f

    @property
    def buckets(self):
        # all buckets known to the metadata collection; the default
        # collection is reported as 'default'
        return ['default' if b == self.defaults.OMEGA_MONGO_COLLECTION else b
                for b in self._Metadata.objects.distinct('bucket')]

    def list(self, pattern=None, regexp=None, kind=None, raw=False, hidden=None,
             include_temp=False, bucket=None, prefix=None, filter=None):
        """
        List all files in store

        specify pattern as a unix pattern (e.g. :code:`models/*`,
        or specify regexp)

        :param pattern: the unix file pattern or None for all
        :param regexp: the regexp. takes precedence over pattern
        :param raw: if True return the meta data objects
        :param filter: specify additional filter criteria, optional
        :return: List of files in store
        """
        regex = lambda pattern: bson.regex.Regex(f'{pattern}')
        # accessing .mongodb forces connection setup
        db = self.mongodb
        searchkeys = dict(bucket=bucket or self.bucket,
                          prefix=prefix or self.prefix)
        q_excludes = Q()
        if regexp:
            searchkeys['name'] = regex(regexp)
        elif pattern:
            # translate unix-style pattern to an anchored regex
            re_pattern = pattern.replace('*', '.*').replace('/', r'\/')
            searchkeys['name'] = regex(f'^{re_pattern}$')
        if not include_temp:
            # exclude names starting with, or path components starting with, '_'
            q_excludes &= Q(name__not__startswith='_')
            q_excludes &= Q(name__not=regex(r'(.{1,*}\/?_.*)'))
        if not hidden:
            q_excludes &= Q(name__not__startswith='.')
        if kind or self.force_kind:
            kind = kind or self.force_kind
            if isinstance(kind, (tuple, list)):
                searchkeys.update(kind__in=kind)
            else:
                searchkeys.update(kind=kind)
        if filter:
            searchkeys.update(filter)
        q_search = Q(**searchkeys) & q_excludes
        files = self._Metadata.objects.no_cache()(q_search)
        return [f if raw else str(f.name).replace('.omm', '') for f in files]

    def exists(self, name, hidden=False):
        """ check if object exists

        Args:
            name (str): name of object
            hidden (bool): if True, include hidden files, defaults to
                False, unless name starts with '.'

        Returns:
            bool, True if object exists

        .. versionchanged:: 0.16.4
            hidden defaults to True if name starts with '.'
        """
        hidden = True if name.startswith('.') else hidden
        return name in self.list(name, hidden=hidden)

    def object_store_key(self, name, ext, hashed=None):
        """
        Returns the store key

        Unless you write a mixin or a backend you should not use this method

        :param name: The name of object
        :param ext: The extension of the filename
        :param hashed: hash the key to support arbitrary name length, defaults
            to defaults.OMEGA_STORE_HASHEDNAMES, True by default since 0.13.7

        :return: A filename with relative bucket, prefix and name
        """
        # byte string
        _u8 = lambda t: t.encode('UTF-8', 'replace') if isinstance(t, str) else t
        key = self._get_obj_store_key(name, ext)
        hashed = hashed if hashed is not None else self.defaults.OMEGA_STORE_HASHEDNAMES
        if hashed:
            from hashlib import sha1
            # SEC: CWE-916
            # - status: wontfix
            # - reason: hashcode is used purely for name resolution, not a security function
            hasher = sha1()
            hasher.update(_u8(key))
            key = hasher.hexdigest()
        return key

    def _get_obj_store_key(self, name, ext, prefix=None, bucket=None):
        # backwards compatilibity implementation of object_store_key()
        name = '%s.%s' % (name, ext) if not name.endswith(ext) else name
        filename = '{bucket}.{prefix}.{name}'.format(
            bucket=bucket or self.bucket,
            prefix=prefix or self.prefix,
            name=name,
            ext=ext).replace('/', '_').replace('..', '.')
        return filename

    def _ensure_fs_collection(self):
        # ensure backwards-compatible gridfs access
        if self.defaults.OMEGA_BUCKET_FS_LEGACY:
            # prior to 0.13.2 a single gridfs instance was used, always equal to the default collection
            return self.defaults.OMEGA_MONGO_COLLECTION
        if self.bucket == self.defaults.OMEGA_MONGO_COLLECTION:
            # from 0.13.2 onwards, only the default bucket is equal to the default collection
            # backwards compatibility for existing installations
            return self.bucket
        # since 0.13.2, all buckets other than the default use a qualified collection name to
        # effectively separate files in different buckets, enabling finer-grade access control
        # and avoiding name collisions from different buckets
        return '{}_{}'.format(self.defaults.OMEGA_MONGO_COLLECTION, self.bucket)

    def _ensure_fs_index(self, fs):
        # make sure we have proper chunks and file indices. this should be
        # created on first write, but sometimes is not
        # see https://docs.mongodb.com/manual/core/gridfs/#gridfs-indexes
        # pymongo since 4.1 no longer has fs._GridFS
        chunks_collection = fs._GridFS__chunks if hasattr(fs, '_GridFS__chunks') else fs._chunks
        files_collection = fs._GridFS__files if hasattr(fs, '_GridFS__files') else fs._files
        ensure_index(chunks_collection, {'files_id': 1, 'n': 1}, unique=True)
        ensure_index(files_collection, {'filename': 1, 'uploadDate': 1})

    def sign(self, filter):
        # return a stable signature for a filter specification
        return signature(filter)

    @staticmethod
    def _cleanup(objrepr, tmppath=None):
        # cleanup any temporary files on exit
        # -- this is called as a finalizer (weakref.finalize)
        try:
            shutil.rmtree(tmppath, ignore_errors=True)
        except Exception as e:
            logger.error(f'finalizing {objrepr}: error occured as {e}')
            return
        try:
            # logging may already be torn down at interpreter shutdown
            logger.debug(f'finalizing {objrepr}: cleaning up temporary files in {tmppath}')
        except Exception:
            pass