Source code for omegaml.store.base

  1""" Native storage for OmegaML using mongodb as the storage layer
  2
  3An OmegaStore instance is a MongoDB database. It has at least the
  4metadata collection which lists all objects stored in it. A metadata
  5document refers to the following types of objects (metadata.kind):
  6
  7* pandas.dfrows - a Pandas DataFrame stored as a collection of rows
  8* sklearn.joblib - a scikit learn estimator/pipeline dumped using joblib.dump()
  9* python.data - an arbitrary python dict, tuple, list stored as a document
 10
 11Note that storing Pandas and scikit learn objects requires the availability
 12of the respective packages. If either can not be imported, the OmegaStore
 13degrades to a python.data store only. It will still .list() and get() any
 14object, however reverts to pure python objects. In this case it is up
 15to the client to convert the data into an appropriate format for processing.
 16
 17Pandas and scikit-learn objects can only be stored if these packages are
 18available. put() raises a TypeError if you pass such objects and these
 19modules cannot be loaded.
 20
 21All data are stored within the same mongodb, in per-object collections
 22as follows:
 23
 24    * .metadata
 25        all metadata. each object is one document,
 26        See **omegaml.documents.Metadata** for details
 27    * .<bucket>.files
 28        this is the GridFS instance used to store
 29        blobs (models, numpy, hdf). The actual file name
 30        will be <prefix>/<name>.<ext>, where ext is
 31        optionally generated by put() / get().
 32    * .<bucket>.<prefix>.<name>.data
 33        every other dataset is stored in a separate
 34        collection (dataframes, dicts, lists, tuples).
 35        Any forward slash in prefix is ignored (e.g. 'data/'
 36        becomes 'data')
 37
 38    DataFrames by default are stored in their own collection, every
 39    row becomes a document. To store dataframes as a binary file,
 40    use `put(...., as_hdf=True).` `.get()` will always return a dataframe.
 41
 42    Python dicts, lists, tuples are stored as a single document with
 43    a `.data` attribute holding the JSON-converted representation. `.get()`
 44    will always return the corresponding python object of .data.
 45
 46    Models are joblib.dump()'ed and zipped prior to transferring into
 47    GridFs. .get() will always unzip and joblib.load() before returning
 48    the model. Note this requires that the process using .get() supports
 49    joblib as well as all python classes referred to. If joblib is not
 50    supported, .get() returns a file-like object.
 51
 52    The .metadata entry specifies the format used to store each
 53    object as well as its location:
 54
 55    * metadata.kind
 56        the type of object
 57    * metadata.name
 58        the name of the object, as given on put()
 59    * metadata.gridfile
 60        the gridfs object (if any, null otherwise)
 61    * metadata.collection
 62        the name of the collection
 63    * metadata.attributes
 64        arbitrary custom attributes set in
 65        put(attributes=obj). This is used e.g. by
 66        OmegaRuntime's fit() method to record the
 67        data used in the model's training.
 68
 69    **.put()** and **.get()** use helper methods specific to the type in
 70    object's type and metadata.kind, respectively. In the future
 71    a plugin system will enable extension to other types.
 72"""
 73from __future__ import absolute_import
 74
 75import logging
 76import os
 77import shutil
 78import warnings
 79import weakref
 80from uuid import uuid4
 81
 82import bson
 83import gridfs
 84from mongoengine.connection import disconnect, connect, _connections, get_db
 85from mongoengine.errors import DoesNotExist
 86from mongoengine.queryset.visitor import Q
 87
 88from omegaml.documents import make_Metadata, MDREGISTRY
 89from omegaml.mongoshim import sanitize_mongo_kwargs, waitForConnection
 90from omegaml.util import load_class, extend_instance, ensure_index, PickableCollection, signature
 91from omegaml.util import settings as omega_settings, urlparse
 92
 93logger = logging.getLogger(__name__)
 94
 95
class OmegaStore(object):
    """ The storage backend for models and data

    .. versionchanged:: NEXT
        refactored all methods handling Python and Pandas datatypes to omegaml.backends.coreobjects.CoreObjectsBackend
    """

    def __init__(self, mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None):
        """
        :param mongo_url: the mongourl to use for the gridfs
        :param bucket: the mongo collection to use for gridfs
        :param prefix: the path prefix for files. defaults to blank
        :param kind: the kind or list of kinds to limit this store to
        :param defaults: the settings object, defaults to omega_settings()
        :param dbalias: the mongoengine connection alias to use, defaults to
            an instance-specific alias generated on first db access
        """
        self.defaults = defaults or omega_settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket or self.defaults.OMEGA_MONGO_COLLECTION
        self._fs_collection = self._ensure_fs_collection()
        self._fs = None
        # instance-specific temp dir, removed by the finalizer on gc/exit
        self._tmppath = os.path.join(self.defaults.OMEGA_TMP, uuid4().hex)
        self.prefix = prefix or ''
        self.force_kind = kind
        self._Metadata_cls = None
        # don't initialize db here to avoid using the default settings
        # otherwise Metadata will already have a connection and not use
        # the one provided in override_settings
        self._db = None
        self._dbalias = dbalias
        # add backends and mixins
        self._apply_mixins()
        # register backends
        self.register_backends()
        # register finalizer for cleanup
        weakref.finalize(self, self._cleanup, repr(self), tmppath=str(self._tmppath))

    def __repr__(self):
        return 'OmegaStore(bucket={}, prefix={})'.format(self.bucket, self.prefix)

    def __eq__(self, other):
        """test for equality of OmegaStore instances

        Args:
            other: OmegaStore instance

        Returns:
            True if other is the same database, same bucket, same prefix
        """
        if not isinstance(other, OmegaStore):
            # defer to the other operand's comparison (or identity fallback)
            return NotImplemented
        return self.mongo_url == other.mongo_url and self.bucket == other.bucket and self.prefix == other.prefix

    # FIX: equality was previously only defined as __equal__, which is not a
    # Python special method -- `store_a == store_b` silently compared object
    # identity. __eq__ makes == work; __equal__ is kept as a backwards
    # compatible alias for any explicit callers.
    __equal__ = __eq__
    # defining __eq__ disables inherited hashing; restore identity hashing so
    # instances remain usable as dict keys / set members
    __hash__ = object.__hash__

    @property
    def tmppath(self):
        """
        return an instance-specific temporary path (created on first access)
        """
        os.makedirs(self._tmppath, exist_ok=True)
        return self._tmppath

    @property
    def mongodb(self):
        """
        Returns a mongo database object, connecting lazily on first access
        """
        if self._db is not None:
            return self._db
        # parse salient parts of mongourl, e.g.
        # mongodb://user:password@host/dbname
        self.parsed_url = urlparse.urlparse(self.mongo_url)
        self.database_name = self.parsed_url.path[1:]
        host = self.parsed_url.netloc
        scheme = self.parsed_url.scheme
        username, password = None, None
        if '@' in host:
            creds, host = host.split('@', 1)
            if ':' in creds:
                username, password = creds.split(':')
        # connect via mongoengine
        #
        # note this uses a MongoClient in the background, with pooled
        # connections. there are multiprocessing issues with pymongo:
        # http://api.mongodb.org/python/3.2/faq.html#using-pymongo-with-multiprocessing
        # connect=False is due to https://jira.mongodb.org/browse/PYTHON-961
        # this defers connecting until the first access
        # serverSelectionTimeoutMS=2500 is to fail fast, the default is 30000
        #
        # use an instance specific alias, note that access to Metadata and
        # QueryCache must pass the very same alias
        self._dbalias = alias = self._dbalias or 'omega-{}'.format(uuid4().hex)
        # always disconnect before registering a new connection because
        # mongoengine.connect() forgets all connection settings upon disconnect
        # NOTE(review): disconnect() only runs when the alias is NOT yet
        # registered, which seems to contradict the comment above -- confirm
        if alias not in _connections:
            disconnect(alias)
        connection = connect(alias=alias, db=self.database_name,
                             host=f'{scheme}://{host}',
                             username=username,
                             password=password,
                             connect=False,
                             authentication_source='admin',
                             **sanitize_mongo_kwargs(self.defaults.OMEGA_MONGO_SSL_KWARGS),
                             )
        # since PyMongo 4, connect() no longer waits for connection
        waitForConnection(connection)
        self._db = get_db(alias)
        return self._db

    @property
    def _Metadata(self):
        if self._Metadata_cls is None:
            # hack to localize metadata: accessing .mongodb first ensures the
            # connection exists and self._dbalias is set before binding the
            # Metadata class to this store's alias and gridfs collection
            db = self.mongodb
            self._Metadata_cls = make_Metadata(db_alias=self._dbalias,
                                               collection=self._fs_collection)
        return self._Metadata_cls

    @property
    def fs(self):
        """
        Retrieve a gridfs instance using url and collection provided

        :return: a gridfs instance
        """
        if self._fs is not None:
            return self._fs
        self._fs = gridfs.GridFS(self.mongodb, collection=self._fs_collection)
        self._ensure_fs_index(self._fs)
        return self._fs
    def metadata(self, name=None, bucket=None, prefix=None, version=-1, **kwargs):
        """
        Returns a metadata document for the given entry name

        :param name: the object name; coerced to str for the query
        :param bucket: the bucket, optional, defaults to self.bucket
        :param prefix: the prefix, optional, defaults to self.prefix
        :param version: currently ignored, see FIXME below
        :return: the first matching Metadata document, or None if not found
        """
        # FIXME: version attribute does not do anything
        # FIXME: metadata should be stored in a bucket-specific collection
        # to enable access control, see https://docs.mongodb.com/manual/reference/method/db.create
        #
        # Role/#db.createRole
        # NOTE(review): db and fs look unused, but accessing the properties has
        # side effects -- .mongodb establishes the connection, .fs creates the
        # gridfs indexes; confirm before removing these lines
        db = self.mongodb
        fs = self.fs
        prefix = prefix or self.prefix
        bucket = bucket or self.bucket
        # Meta is to silence lint on import error
        Meta = self._Metadata
        # .no_cache() avoids mongoengine keeping stale documents across calls
        return Meta.objects(name=str(name), prefix=prefix, bucket=bucket).no_cache().first()
[docs] 239 def make_metadata(self, name, kind, bucket=None, prefix=None, **kwargs): 240 """ 241 create or update a metadata object 242 243 this retrieves a Metadata object if it exists given the kwargs. Only 244 the name, prefix and bucket arguments are considered 245 246 for existing Metadata objects, the attributes kw is treated as follows: 247 248 * attributes=None, the existing attributes are left as is 249 * attributes={}, the attributes value on an existing metadata object 250 is reset to the empty dict 251 * attributes={ some : value }, the existing attributes are updated 252 253 For new metadata objects, attributes defaults to {} if not specified, 254 else is set as provided. 255 256 :param name: the object name 257 :param bucket: the bucket, optional, defaults to self.bucket 258 :param prefix: the prefix, optional, defaults to self.prefix 259 260 """ 261 # TODO kept _make_metadata for backwards compatibility. 262 return self._make_metadata(name, bucket=bucket, prefix=prefix, 263 kind=kind, **kwargs)
264 265 def _make_metadata(self, name=None, bucket=None, prefix=None, **kwargs): 266 """ 267 create or update a metadata object 268 269 this retrieves a Metadata object if it exists given the kwargs. Only 270 the name, prefix and bucket arguments are considered 271 272 for existing Metadata objects, the attributes kw is treated as follows: 273 274 * attributes=None, the existing attributes are left as is 275 * attributes={}, the attributes value on an existing metadata object 276 is reset to the empty dict 277 * attributes={ some : value }, the existing attributes are updated 278 279 For new metadata objects, attributes defaults to {} if not specified, 280 else is set as provided. 281 282 :param name: the object name 283 :param bucket: the bucket, optional, defaults to self.bucket 284 :param prefix: the prefix, optional, defaults to self.prefix 285 """ 286 bucket = bucket or self.bucket 287 prefix = prefix or self.prefix 288 meta = self.metadata(name=name, 289 prefix=prefix, 290 bucket=bucket) 291 if meta: 292 dict_fields = 'attributes', 'kind_meta' 293 for k, v in kwargs.items(): 294 if k in dict_fields and v is not None and len(v) > 0: 295 previous = getattr(meta, k, {}) 296 previous.update(v) 297 setattr(meta, k, previous) 298 elif k in dict_fields and v is not None and len(v) == 0: 299 setattr(meta, k, {}) 300 elif k in dict_fields and v is None: 301 # ignore non specified attributes 302 continue 303 else: 304 # by default set whatever attribute is provided 305 setattr(meta, k, v) 306 else: 307 meta = self._Metadata(name=name, bucket=bucket, prefix=prefix, 308 **kwargs) 309 return meta 310 311 def _drop_metadata(self, name=None, **kwargs): 312 # internal method to delete meta data of an object 313 meta = self.metadata(name, **kwargs) 314 if meta is not None: 315 meta.delete() 316
[docs] 317 def collection(self, name=None, bucket=None, prefix=None): 318 """ 319 Returns a mongo db collection as a datastore 320 321 If there is an existing object of name, will return the .collection 322 of the object. Otherwise returns the collection according to naming 323 convention {bucket}.{prefix}.{name}.datastore 324 325 :param name: the collection to use. if none defaults to the 326 collection name given on instantiation. the actual collection name 327 used is always prefix + name + '.data' 328 """ 329 # see if we have a known object and a collection for that, if not define it 330 meta = self.metadata(name, bucket=bucket, prefix=prefix) 331 collection = meta.collection if meta else None 332 if not collection: 333 collection = self.object_store_key(name, '.datastore') 334 collection = collection.replace('..', '.') 335 # return the collection 336 try: 337 datastore = getattr(self.mongodb, collection) 338 except Exception as e: 339 raise e 340 return PickableCollection(datastore)
341 342 def _apply_mixins(self): 343 """ 344 apply mixins in defaults.OMEGA_STORE_MIXINS 345 """ 346 for mixin in self.defaults.OMEGA_STORE_MIXINS: 347 conditional = self._mixins_conditional 348 extend_instance(self, mixin, 349 conditional=conditional) 350 351 def _mixins_conditional(self, cls, obj): 352 return cls.supports(obj) if hasattr(cls, 'supports') else True 353
[docs] 354 def register_backends(self): 355 """ 356 register backends in defaults.OMEGA_STORE_BACKENDS 357 """ 358 # enable list modification within loop 359 # -- avoid RuntimeError: dictionary changed size during iteration 360 backends = list(self.defaults.OMEGA_STORE_BACKENDS.items()) 361 for kind, backend in backends: 362 self.register_backend(kind, backend)
363
[docs] 364 def register_backend(self, kind, backend, index=-1): 365 """ 366 register a backend class 367 368 :param kind: (str) the backend kind 369 :param backend: (class) the backend class 370 :param index: (int) the insert position, defaults to -1, which means 371 to append 372 373 .. versionchanged:: NEXT 374 added index to have more control over backend evaluation by .get_backend_byobj() 375 376 .. versionchanged:: NEXT 377 backends can specify cls.KIND_EXT to register additional kinds 378 """ 379 backend_cls = load_class(backend) 380 backend_kinds = [backend_cls.KIND] + list(getattr(backend_cls, 'KIND_EXT', [])) 381 for kind in backend_kinds: 382 self.defaults.OMEGA_STORE_BACKENDS[kind] = backend_cls 383 if kind not in MDREGISTRY.KINDS: 384 pos = len(MDREGISTRY.KINDS) + index if index < 0 else index 385 MDREGISTRY.KINDS.insert(pos, kind) 386 return self
387
[docs] 388 def register_mixin(self, mixincls): 389 """ 390 register a mixin class 391 392 :param mixincls: (class) the mixin class 393 """ 394 extend_instance(self, mixincls) 395 return self
396
[docs] 397 def put(self, obj, name, attributes=None, kind=None, replace=False, model_store=None, 398 data_store=None, **kwargs): 399 """ 400 Stores an object, store estimators, pipelines, numpy arrays or 401 pandas dataframes 402 """ 403 if replace: 404 self.drop(name, force=True) 405 backend = self.get_backend_byobj(obj, name, attributes=attributes, kind=kind, 406 model_store=model_store, data_store=data_store, 407 **kwargs) 408 if backend: 409 return backend.put(obj, name, attributes=attributes, **kwargs) 410 raise TypeError('type %s not supported' % type(obj))
411
    def drop(self, name, force=False, version=-1, report=False, **kwargs):
        """
        Drop the object

        :param name: The name of the object. If the name is a pattern it will
          be expanded using .list(), and call .drop() on every obj found.
        :param force: If True ignores DoesNotExist exception, defaults to False
          meaning this raises a DoesNotExist exception if the name does not
          exist
        :param report: if True returns a dict name=>status, where status is True
          if deleted, False if not deleted
        :return: True if object was deleted, False if not.
          If force is True and the object does not exist it will still return True
        :raises: DoesNotExist if the object does not exist and ```force=False```
        """
        # a '*' in the name means a pattern: expand to all matching objects
        is_pattern = '*' in name
        objs = [name] if not is_pattern else self.list(name)
        results = []
        for name in objs:
            try:
                # prefer the object's backend-specific .drop(), fall back to
                # the generic _drop() if no backend is registered
                backend = self.get_backend(name)
                drop = backend.drop if backend else self._drop
                result = drop(name, force=force, version=version, **kwargs)
            except Exception as e:
                result = False
                # a literal name without force surfaces the error; patterns
                # and force=True are best-effort
                if not force and not is_pattern:
                    raise
                if force:
                    # NOTE(review): if the backend's drop failed, this still
                    # attempts the generic metadata drop -- confirm intended
                    result = self._drop(name, force=force, version=version)
            results.append((name, result))
        if not objs:
            # pattern matched nothing: still honor force/raise semantics by
            # attempting a generic drop of the literal name
            result = self._drop(name, force=force, version=version)
            results.append((name, result))
        return dict(results) if report else len(results) > 0
    def _drop(self, name, force=False, version=-1, keep_data=False, **kwargs):
        # generic drop: remove data collection, gridfile and metadata
        # :param keep_data: if True only the metadata entry is removed
        meta = self.metadata(name, version=version)
        if meta is None and not force:
            raise DoesNotExist(name)
        collection = self.collection(name)
        if collection and not keep_data:
            # drop the collection resolved by naming convention
            self.mongodb.drop_collection(collection.name)
        if meta:
            if meta.collection and not keep_data:
                # also drop the collection recorded in metadata, which may
                # differ from the convention-based name above
                self.mongodb.drop_collection(meta.collection)
            if meta and meta.gridfile is not None and not keep_data:
                # remove the gridfs blob, if any
                meta.gridfile.delete()
            self._drop_metadata(name)
            return True
        # no metadata found (force=True): report nothing deleted
        return False
[docs] 463 def get_backend_bykind(self, kind, model_store=None, data_store=None, 464 **kwargs): 465 """ 466 return the backend by a given object kind 467 468 :param kind: The object kind 469 :param model_store: the OmegaStore instance used to store models 470 :param data_store: the OmegaStore instance used to store data 471 :param kwargs: the kwargs passed to the backend initialization 472 :return: the backend 473 """ 474 try: 475 backend_cls = load_class(self.defaults.OMEGA_STORE_BACKENDS[kind]) 476 except KeyError as e: 477 raise ValueError('backend {kind} does not exist'.format(**locals())) 478 model_store = model_store or self 479 data_store = data_store or self 480 backend = backend_cls(model_store=model_store, 481 data_store=data_store, **kwargs) 482 return backend
483
[docs] 484 def get_backend(self, name, model_store=None, data_store=None, **kwargs): 485 """ 486 return the backend by a given object name 487 488 :param kind: The object kind 489 :param model_store: the OmegaStore instance used to store models 490 :param data_store: the OmegaStore instance used to store data 491 :param kwargs: the kwargs passed to the backend initialization 492 :return: the backend 493 """ 494 meta = self.metadata(name) 495 if meta is not None and meta.kind in self.defaults.OMEGA_STORE_BACKENDS: 496 return self.get_backend_bykind(meta.kind, 497 model_store=model_store, 498 data_store=data_store, 499 **kwargs) 500 return None
501
[docs] 502 def help(self, name_or_obj=None, kind=None, raw=False, display=None, renderer=None): 503 """ get help for an object by looking up its backend and calling help() on it 504 505 Retrieves the object's metadata and looks up its corresponding backend. If the 506 metadata.attributes['docs'] is a string it will display this as the help() contents. 507 If the string starts with 'http://' or 'https://' it will open the web page. 508 509 Args: 510 name_or_obj (str|obj): the name or actual object to get help for 511 kind (str): optional, if specified forces retrieval of backend for the given kind 512 raw (bool): optional, if True forces help to be the backend type of the object. 513 If False returns the attributes[docs] on the object's metadata, if available. 514 Defaults to False 515 display (fn): optional, callable for interactive display, defaults to help in 516 if sys.flags.interactive is True, else uses pydoc.render_doc with plaintext 517 renderer (fn): optional, the renderer= argument for pydoc.render_doc to use if 518 sys.flags.interactive is False and display is not provided 519 520 Returns: 521 * help(obj) if python is in interactive mode 522 * text(str) if python is in not interactive mode 523 """ 524 import sys 525 import pydoc 526 interactive = bool(display) if display is not None else sys.flags.interactive 527 display = display or help 528 renderer = renderer or pydoc.plaintext 529 obj = self._resolve_help_backend(name_or_obj=name_or_obj, kind=kind, raw=raw) 530 if any(str(obj.__doc__).startswith(v) for v in ('https://', 'http://')): 531 obj = obj.__doc__ 532 if interactive and display is help: 533 import webbrowser 534 display = webbrowser.open 535 return display(obj) if interactive else pydoc.render_doc(obj, renderer=renderer)
    def _resolve_help_backend(self, name_or_obj=None, kind=None, raw=False):
        # helper so we can test help
        # resolution order: explicit kind > backend by name > backend by
        # object, finally falling back to the core.object backend
        meta = self.metadata(name_or_obj) if name_or_obj else None
        if kind:
            backend = self.get_backend_bykind(kind)
        else:
            backend = self.get_backend(name_or_obj) or self.get_backend_byobj(name_or_obj)
        if backend is None:
            backend = self.get_backend_bykind('core.object', model_store=self, data_store=self)
        if not raw and meta is not None and 'docs' in meta.attributes:
            # user-provided docs in metadata take precedence: wrap them in a
            # stand-in callable so help()/pydoc can render the combined text
            def UserDocumentation():
                pass

            basedoc = backend.__doc__ or ''
            UserDocumentation.__doc__ = (basedoc +
                                         meta.attributes['docs'])
            backend = UserDocumentation
        return backend
    def get_backend_byobj(self, obj, name=None, kind=None, attributes=None,
                          model_store=None, data_store=None, **kwargs):
        """
        return the matching backend for the given obj

        :param obj: the object to find a backend for
        :param name: optional, an existing object's metadata provides the
            kind unless kind is passed explicitly
        :param kind: optional, force resolution by this kind
        :param attributes: optional, passed to backend.supports()

        Returns:
            the first backend that supports the given parameters or None

        .. versionchanged:: NEXT
            backends are tested in order of MDREGISTRY.KINDS, see .register_backend()
        """
        model_store = model_store or self
        data_store = data_store or self
        meta = self.metadata(name) if name else None
        # an explicit kind takes precedence over the stored object's kind
        kind = kind or (meta.kind if meta is not None else None)
        backend = None
        if kind:
            if kind in self.defaults.OMEGA_STORE_BACKENDS:
                backend = self.get_backend_bykind(kind, data_store=data_store, model_store=model_store)
                if not backend.supports(obj, name, attributes=attributes,
                                        data_store=data_store,
                                        model_store=model_store,
                                        meta=meta,
                                        kind=kind, **kwargs):
                    # NOTE(review): the backend is still returned even though
                    # it reports no support -- callers only see a warning;
                    # confirm this leniency is intended
                    objtype = str(type(obj))
                    warnings.warn('Backend {kind} does not support {objtype}'.format(**locals()))
            else:
                # revert to core backend
                backend = self.get_backend_bykind('core.object', model_store=model_store, data_store=data_store)
        else:
            # sort by order in MDREGISTRY.KINDS, only using kinds that actually have a registered backend
            sorted_backends = (k for k in MDREGISTRY.KINDS if k in self.defaults.OMEGA_STORE_BACKENDS)
            for backend_kind in sorted_backends:
                backend = self.get_backend_bykind(backend_kind, data_store=data_store, model_store=model_store)
                if backend.supports(obj, name, attributes=attributes,
                                    data_store=data_store, model_store=model_store,
                                    **kwargs):
                    break
            else:
                # no backend declared support
                backend = None
        return backend
[docs] 598 def getl(self, *args, **kwargs): 599 """ return a lazy MDataFrame for a given object 600 601 Same as .get, but returns a MDataFrame 602 603 """ 604 return self.get(*args, lazy=True, **kwargs)
605
    def get(self, name, version=-1, force_python=False,
            kind=None, model_store=None, data_store=None, **kwargs):
        """
        Retrieve an object

        :param name: The name of the object
        :param version: Version of the stored object (not supported)
        :param force_python: Return as a python object
        :param kind: optional, force resolution by this backend kind
        :param kwargs: kwargs depending on object kind
        :return: an object, estimator, pipelines, data array or pandas dataframe
            previously stored with put(), or None if the name is unknown
        """
        meta = self.metadata(name, version=version)
        if meta is None:
            # unknown object: return None rather than raising
            return None
        if not force_python:
            # resolve the backend by stored kind, or by explicit kind override
            backend = (self.get_backend(name, model_store=model_store,
                                        data_store=data_store)
                       if not kind else self.get_backend_bykind(kind, model_store=model_store, data_store=data_store))
            if backend is not None:
                # FIXME: some backends need to get model_store, data_store, but fails tests
                return backend.get(name, **kwargs)  # model_store=model_store, data_store=data_store, **kwargs)
        # catch-all to CoreObjectsBackend or force python
        # -- keeping the same behavior until version 0.17, handling all other KINDs
        core_backend = self.get_backend_bykind('core.object', model_store=model_store, data_store=data_store)
        if force_python:
            return core_backend.get_object_as_python(meta, version=version)
        return core_backend.get(name, version=version, force_python=force_python, **kwargs)
634 635 def __iter__(self): 636 for f in self.list(include_temp=True): 637 yield f 638 639 @property 640 def buckets(self): 641 return ['default' if b == self.defaults.OMEGA_MONGO_COLLECTION else b 642 for b in self._Metadata.objects.distinct('bucket')] 643
    def list(self, pattern=None, regexp=None, kind=None, raw=False, hidden=None,
             include_temp=False, bucket=None, prefix=None, filter=None):
        """
        List all files in store

        specify pattern as a unix pattern (e.g. :code:`models/*`,
        or specify regexp)

        :param pattern: the unix file pattern or None for all
        :param regexp: the regexp. takes precedence over pattern
        :param raw: if True return the meta data objects
        :param hidden: if True include objects starting with '.'
        :param include_temp: if True include objects starting with '_'
        :param filter: specify additional filter criteria, optional
        :return: List of files in store

        """
        # build server-side mongo regexes (evaluated by mongodb, not re)
        regex = lambda pattern: bson.regex.Regex(f'{pattern}')
        # NOTE(review): db looks unused, but accessing .mongodb ensures the
        # connection exists before querying Metadata -- confirm before removal
        db = self.mongodb
        searchkeys = dict(bucket=bucket or self.bucket,
                          prefix=prefix or self.prefix)
        q_excludes = Q()
        if regexp:
            searchkeys['name'] = regex(regexp)
        elif pattern:
            # translate the unix glob into an anchored regex
            re_pattern = pattern.replace('*', '.*').replace('/', r'\/')
            searchkeys['name'] = regex(f'^{re_pattern}$')
        if not include_temp:
            # temp objects start with '_' (at top level or below a path)
            q_excludes &= Q(name__not__startswith='_')
            # NOTE(review): the quantifier {1,*} is not a valid regex range --
            # confirm how the mongodb regex engine interprets this pattern
            q_excludes &= Q(name__not=regex(r'(.{1,*}\/?_.*)'))
        if not hidden:
            # hidden objects start with '.'
            q_excludes &= Q(name__not__startswith='.')
        if kind or self.force_kind:
            # an explicit kind overrides the store-level kind restriction
            kind = kind or self.force_kind
            if isinstance(kind, (tuple, list)):
                searchkeys.update(kind__in=kind)
            else:
                searchkeys.update(kind=kind)
        if filter:
            searchkeys.update(filter)
        q_search = Q(**searchkeys) & q_excludes
        files = self._Metadata.objects.no_cache()(q_search)
        # strip the legacy '.omm' extension from reported names
        return [f if raw else str(f.name).replace('.omm', '') for f in files]
[docs] 686 def exists(self, name, hidden=False): 687 """ check if object exists 688 689 Args: 690 name (str): name of object 691 hidden (bool): if True, include hidden files, defaults to 692 False, unless name starts with '.' 693 694 Returns: 695 bool, True if object exists 696 697 .. versionchanged:: 0.16.4 698 hidden defaults to True if name starts with '.' 699 """ 700 hidden = True if name.startswith('.') else hidden 701 return name in self.list(name, hidden=hidden)
702
[docs] 703 def object_store_key(self, name, ext, hashed=None): 704 """ 705 Returns the store key 706 707 Unless you write a mixin or a backend you should not use this method 708 709 :param name: The name of object 710 :param ext: The extension of the filename 711 :param hashed: hash the key to support arbitrary name length, defaults 712 to defaults.OMEGA_STORE_HASHEDNAMES, True by default since 0.13.7 713 714 :return: A filename with relative bucket, prefix and name 715 """ 716 # byte string 717 _u8 = lambda t: t.encode('UTF-8', 'replace') if isinstance(t, str) else t 718 key = self._get_obj_store_key(name, ext) 719 hashed = hashed if hashed is not None else self.defaults.OMEGA_STORE_HASHEDNAMES 720 if hashed: 721 from hashlib import sha1 722 # SEC: CWE-916 723 # - status: wontfix 724 # - reason: hashcode is used purely for name resolution, not a security function 725 hasher = sha1() 726 hasher.update(_u8(key)) 727 key = hasher.hexdigest() 728 return key
729 730 def _get_obj_store_key(self, name, ext, prefix=None, bucket=None): 731 # backwards compatilibity implementation of object_store_key() 732 name = '%s.%s' % (name, ext) if not name.endswith(ext) else name 733 filename = '{bucket}.{prefix}.{name}'.format( 734 bucket=bucket or self.bucket, 735 prefix=prefix or self.prefix, 736 name=name, 737 ext=ext).replace('/', '_').replace('..', '.') 738 return filename 739 740 def _ensure_fs_collection(self): 741 # ensure backwards-compatible gridfs access 742 if self.defaults.OMEGA_BUCKET_FS_LEGACY: 743 # prior to 0.13.2 a single gridfs instance was used, always equal to the default collection 744 return self.defaults.OMEGA_MONGO_COLLECTION 745 if self.bucket == self.defaults.OMEGA_MONGO_COLLECTION: 746 # from 0.13.2 onwards, only the default bucket is equal to the default collection 747 # backwards compatibility for existing installations 748 return self.bucket 749 # since 0.13.2, all buckets other than the default use a qualified collection name to 750 # effectively separate files in different buckets, enabling finer-grade access control 751 # and avoiding name collisions from different buckets 752 return '{}_{}'.format(self.defaults.OMEGA_MONGO_COLLECTION, self.bucket) 753 754 def _ensure_fs_index(self, fs): 755 # make sure we have proper chunks and file indicies. 
this should be created on first write, but sometimes is not 756 # see https://docs.mongodb.com/manual/core/gridfs/#gridfs-indexes 757 # pymongo since 4.1 no longer has fs._GridFS 758 chunks_collection = fs._GridFS__chunks if hasattr(fs, '_GridFS__chunks') else fs._chunks 759 files_collection = fs._GridFS__files if hasattr(fs, '_GridFS__files') else fs._files 760 ensure_index(chunks_collection, {'files_id': 1, 'n': 1}, unique=True) 761 ensure_index(files_collection, {'filename': 1, 'uploadDate': 1}) 762 763 def sign(self, filter): 764 return signature(filter) 765 766 @staticmethod 767 def _cleanup(objrepr, tmppath=None): 768 # cleanup any temporary files on exit 769 # -- this is called as a finalizer (weakref.finalize) 770 try: 771 logger.debug(f'finalizing {objrepr}: cleaning up temporary files in {tmppath}') 772 shutil.rmtree(tmppath, ignore_errors=True) 773 except Exception as e: 774 logger.error(f'finalizing {objrepr}: error occured as {e}')