1""" Native storage for OmegaML using mongodb as the storage layer
2
3An OmegaStore instance is a MongoDB database. It has at least the
4metadata collection which lists all objects stored in it. A metadata
5document refers to the following types of objects (metadata.kind):
6
7* pandas.dfrows - a Pandas DataFrame stored as a collection of rows
8* sklearn.joblib - a scikit learn estimator/pipeline dumped using joblib.dump()
9* python.data - an arbitrary python dict, tuple, list stored as a document
10
11Note that storing Pandas and scikit learn objects requires the availability
12of the respective packages. If either can not be imported, the OmegaStore
13degrades to a python.data store only. It will still .list() and get() any
14object, however reverts to pure python objects. In this case it is up
15to the client to convert the data into an appropriate format for processing.
16
17Pandas and scikit-learn objects can only be stored if these packages are
18available. put() raises a TypeError if you pass such objects and these
19modules cannot be loaded.
20
21All data are stored within the same mongodb, in per-object collections
22as follows:
23
24 * .metadata
25 all metadata. each object is one document,
26 See **omegaml.documents.Metadata** for details
27 * .<bucket>.files
28 this is the GridFS instance used to store
29 blobs (models, numpy, hdf). The actual file name
30 will be <prefix>/<name>.<ext>, where ext is
31 optionally generated by put() / get().
32 * .<bucket>.<prefix>.<name>.data
33 every other dataset is stored in a separate
34 collection (dataframes, dicts, lists, tuples).
35 Any forward slash in prefix is ignored (e.g. 'data/'
36 becomes 'data')
37
38 DataFrames by default are stored in their own collection, every
39 row becomes a document. To store dataframes as a binary file,
40 use `put(...., as_hdf=True).` `.get()` will always return a dataframe.
41
42 Python dicts, lists, tuples are stored as a single document with
43 a `.data` attribute holding the JSON-converted representation. `.get()`
44 will always return the corresponding python object of .data.
45
46 Models are joblib.dump()'ed and zipped prior to transferring into
47 GridFS. .get() will always unzip and joblib.load() before returning
48 the model. Note this requires that the process using .get() supports
49 joblib as well as all python classes referred to. If joblib is not
50 supported, .get() returns a file-like object.
51
52 The .metadata entry specifies the format used to store each
53 object as well as its location:
54
55 * metadata.kind
56 the type of object
57 * metadata.name
58 the name of the object, as given on put()
59 * metadata.gridfile
60 the gridfs object (if any, null otherwise)
61 * metadata.collection
62 the name of the collection
63 * metadata.attributes
64 arbitrary custom attributes set in
65 put(attributes=obj). This is used e.g. by
66 OmegaRuntime's fit() method to record the
67 data used in the model's training.
68
69 **.put()** and **.get()** use helper methods specific to the type in
70 object's type and metadata.kind, respectively. In the future
71 a plugin system will enable extension to other types.
72"""
73from __future__ import absolute_import
74
75import logging
76import os
77import shutil
78import warnings
79import weakref
80from uuid import uuid4
81
82import bson
83import gridfs
84from mongoengine.connection import disconnect, connect, get_db
85from mongoengine.errors import DoesNotExist
86from mongoengine.queryset.visitor import Q
87
88from omegaml.documents import make_Metadata, MDREGISTRY
89from omegaml.mongoshim import sanitize_mongo_kwargs, waitForConnection
90from omegaml.util import load_class, extend_instance, ensure_index, PickableCollection, signature
91from omegaml.util import settings as omega_settings, urlparse
92
93logger = logging.getLogger(__name__)
94
95
[docs]
96class OmegaStore(object):
97 """ The storage backend for models and data
98
99 .. versionchanged:: 0.18
100 refactored all methods handling Python and Pandas datatypes to omegaml.backends.coreobjects.CoreObjectsBackend
101 """
102
    def __init__(self, mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None):
        """
        :param mongo_url: the mongourl to use for the gridfs
        :param bucket: the mongo collection to use for gridfs
        :param prefix: the path prefix for files. defaults to blank
        :param kind: the kind or list of kinds to limit this store to
        :param defaults: the settings object, defaults to omega_settings()
        :param dbalias: the mongoengine connection alias, defaults to an
            instance-specific alias generated on first db access
        """
        self.defaults = defaults or omega_settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket or self.defaults.OMEGA_MONGO_COLLECTION
        # the gridfs collection name depends on defaults and bucket, resolve once
        self._fs_collection = self._ensure_fs_collection()
        self._fs = None
        # instance-specific temp directory, created lazily by .tmppath
        self._tmppath = os.path.join(self.defaults.OMEGA_TMP, uuid4().hex)
        self.prefix = prefix or ''
        self.force_kind = kind
        self._Metadata_cls = None
        # don't initialize db here to avoid using the default settings
        # otherwise Metadata will already have a connection and not use
        # the one provided in override_settings
        self._db = None
        self._dbalias = dbalias
        # add backends and mixins
        self._apply_mixins()
        # register backends
        self.register_backends()
        # register finalizer for cleanup
        # -- only plain values (repr, str) are passed so the finalizer does not
        #    keep a reference to self alive
        weakref.finalize(self, self._cleanup, repr(self), tmppath=str(self._tmppath))
130
131 def __repr__(self):
132 return 'OmegaStore(bucket={}, prefix={})'.format(self.bucket, self.prefix)
133
134 def __equal__(self, other):
135 """test for equality of OmegaStore instances
136
137 Args:
138 other: OmegaStore instance
139
140 Returns:
141 True if other is the same database, same bucket, same prefix
142 """
143 return self.mongo_url == other.mongo_url and self.bucket == other.bucket and self.prefix == other.prefix
144
145 @property
146 def tmppath(self):
147 """
148 return an instance-specific temporary path
149 """
150 os.makedirs(self._tmppath, exist_ok=True)
151 return self._tmppath
152
153 @property
154 def mongodb(self):
155 """
156 Returns a mongo database object
157 """
158 if self._db is not None:
159 return self._db
160 # parse salient parts of mongourl, e.g.
161 # mongodb://user:password@host/dbname
162 self.parsed_url = urlparse.urlparse(self.mongo_url)
163 self.database_name = self.parsed_url.path[1:]
164 host = self.parsed_url.netloc
165 scheme = self.parsed_url.scheme
166 username, password = None, None
167 if '@' in host:
168 creds, host = host.split('@', 1)
169 if ':' in creds:
170 username, password = creds.split(':')
171 # connect via mongoengine
172 #
173 # note this uses a MongoClient in the background, with pooled
174 # connections. there are multiprocessing issues with pymongo:
175 # http://api.mongodb.org/python/3.2/faq.html#using-pymongo-with-multiprocessing
176 # connect=False is due to https://jira.mongodb.org/browse/PYTHON-961
177 # this defers connecting until the first access
178 # serverSelectionTimeoutMS=2500 is to fail fast, the default is 30000
179 #
180 # use an instance specific alias, note that access to Metadata and
181 # QueryCache must pass the very same alias
182 self._dbalias = alias = self._dbalias or 'omega-{}'.format(uuid4().hex)
183 # local import of _connections ensure we get the actual object, not an earlier version
184 from mongoengine.connection import _connections
185 if alias not in _connections:
186 # always disconnect before registering a new connection because
187 # mongoengine.connect() forgets all connection settings upon disconnect
188 disconnect(alias)
189 connection = connect(alias=alias, db=self.database_name,
190 host=f'{scheme}://{host}',
191 username=username,
192 password=password,
193 connect=False,
194 authentication_source='admin',
195 **sanitize_mongo_kwargs(self.defaults.OMEGA_MONGO_SSL_KWARGS),
196 )
197 # since PyMongo 4, connect() no longer waits for connection
198 waitForConnection(connection)
199 self._db = get_db(alias)
200 return self._db
201
202 @property
203 def _Metadata(self):
204 if self._Metadata_cls is None:
205 # hack to localize metadata
206 db = self.mongodb
207 self._Metadata_cls = make_Metadata(db_alias=self._dbalias,
208 collection=self._fs_collection)
209 return self._Metadata_cls
210
211 @property
212 def fs(self):
213 """
214 Retrieve a gridfs instance using url and collection provided
215
216 :return: a gridfs instance
217 """
218 if self._fs is not None:
219 return self._fs
220 self._fs = gridfs.GridFS(self.mongodb, collection=self._fs_collection)
221 self._ensure_fs_index(self._fs)
222 return self._fs
223
240
266
267 def _make_metadata(self, name=None, bucket=None, prefix=None, **kwargs):
268 """
269 create or update a metadata object
270
271 this retrieves a Metadata object if it exists given the kwargs. Only
272 the name, prefix and bucket arguments are considered
273
274 for existing Metadata objects, the attributes kw is treated as follows:
275
276 * attributes=None, the existing attributes are left as is
277 * attributes={}, the attributes value on an existing metadata object
278 is reset to the empty dict
279 * attributes={ some : value }, the existing attributes are updated
280
281 For new metadata objects, attributes defaults to {} if not specified,
282 else is set as provided.
283
284 :param name: the object name
285 :param bucket: the bucket, optional, defaults to self.bucket
286 :param prefix: the prefix, optional, defaults to self.prefix
287 """
288 bucket = bucket or self.bucket
289 prefix = prefix or self.prefix
290 meta = self.metadata(name=name,
291 prefix=prefix,
292 bucket=bucket)
293 if meta:
294 dict_fields = 'attributes', 'kind_meta'
295 for k, v in kwargs.items():
296 if k in dict_fields and v is not None and len(v) > 0:
297 previous = getattr(meta, k, {})
298 previous.update(v)
299 setattr(meta, k, previous)
300 elif k in dict_fields and v is not None and len(v) == 0:
301 setattr(meta, k, {})
302 elif k in dict_fields and v is None:
303 # ignore non specified attributes
304 continue
305 else:
306 # by default set whatever attribute is provided
307 setattr(meta, k, v)
308 else:
309 meta = self._Metadata(name=name, bucket=bucket, prefix=prefix,
310 **kwargs)
311 return meta
312
313 def _drop_metadata(self, name=None, **kwargs):
314 # internal method to delete meta data of an object
315 meta = self.metadata(name, **kwargs)
316 if meta is not None:
317 meta.delete()
318
[docs]
319 def collection(self, name=None, bucket=None, prefix=None):
320 """
321 Returns a mongo db collection as a datastore
322
323 If there is an existing object of name, will return the .collection
324 of the object. Otherwise returns the collection according to naming
325 convention {bucket}.{prefix}.{name}.datastore
326
327 :param name: the collection to use. if none defaults to the
328 collection name given on instantiation. the actual collection name
329 used is always prefix + name + '.data'
330 """
331 # see if we have a known object and a collection for that, if not define it
332 meta = self.metadata(name, bucket=bucket, prefix=prefix)
333 collection = meta.collection if meta else None
334 if not collection:
335 collection = self.object_store_key(name, '.datastore')
336 collection = collection.replace('..', '.')
337 # return the collection
338 try:
339 datastore = getattr(self.mongodb, collection)
340 except Exception as e:
341 raise e
342 return PickableCollection(datastore)
343
344 def _apply_mixins(self):
345 """
346 apply mixins in defaults.OMEGA_STORE_MIXINS
347 """
348 for mixin in self.defaults.OMEGA_STORE_MIXINS:
349 conditional = self._mixins_conditional
350 extend_instance(self, mixin,
351 conditional=conditional)
352
353 def _mixins_conditional(self, cls, obj):
354 return cls.supports(obj) if hasattr(cls, 'supports') else True
355
[docs]
356 def register_backends(self):
357 """
358 register backends in defaults.OMEGA_STORE_BACKENDS
359 """
360 # enable list modification within loop
361 # -- avoid RuntimeError: dictionary changed size during iteration
362 backends = list(self.defaults.OMEGA_STORE_BACKENDS.items())
363 for kind, backend in backends:
364 self.register_backend(kind, backend)
365
[docs]
366 def register_backend(self, kind, backend, index=-1):
367 """
368 register a backend class
369
370 :param kind: (str) the backend kind
371 :param backend: (class) the backend class
372 :param index: (int) the insert position, defaults to -1, which means
373 to append
374
375 .. versionchanged:: 0.18
376 added index to have more control over backend evaluation by .get_backend_byobj()
377
378 .. versionchanged:: 0.18
379 backends can specify cls.KIND_EXT to register additional kinds
380 """
381 backend_cls = load_class(backend)
382 backend_kinds = [backend_cls.KIND] + list(getattr(backend_cls, 'KIND_EXT', []))
383 for kind in backend_kinds:
384 self.defaults.OMEGA_STORE_BACKENDS[kind] = backend_cls
385 if kind not in MDREGISTRY.KINDS:
386 pos = len(MDREGISTRY.KINDS) + index if index < 0 else index
387 MDREGISTRY.KINDS.insert(pos, kind)
388 return self
389
[docs]
390 def register_mixin(self, mixincls):
391 """
392 register a mixin class
393
394 :param mixincls: (class) the mixin class
395 """
396 extend_instance(self, mixincls)
397 return self
398
[docs]
399 def put(self, obj, name, attributes=None, kind=None, replace=False, model_store=None,
400 data_store=None, **kwargs):
401 """
402 Stores an object, store estimators, pipelines, numpy arrays or
403 pandas dataframes
404 """
405 if replace:
406 self.drop(name, force=True)
407 backend = self.get_backend_byobj(obj, name, attributes=attributes, kind=kind,
408 model_store=model_store, data_store=data_store,
409 **kwargs)
410 if backend:
411 return backend.put(obj, name, attributes=attributes, **kwargs)
412 raise TypeError('type %s not supported' % type(obj))
413
[docs]
    def drop(self, name, force=False, version=-1, report=False, **kwargs):
        """
        Drop the object

        :param name: The name of the object. If the name is a pattern it will
           be expanded using .list(), and call .drop() on every obj found.
        :param force: If True ignores DoesNotExist exception, defaults to False
           meaning this raises a DoesNotExist exception if the name does not
           exist
        :param version: the version to drop, passed on to the backend (not
           generally supported)
        :param report: if True returns a dict name=>status, where status is True
           if deleted, False if not deleted
        :return: True if object was deleted, False if not.
           If force is True and the object does not exist it will still return True
        :raises: DoesNotExist if the object does not exist and ```force=False```
        """
        is_pattern = '*' in name
        objs = [name] if not is_pattern else self.list(name)
        results = []
        for name in objs:
            try:
                # prefer the backend's own drop, else the default implementation
                backend = self.get_backend(name)
                drop = backend.drop if hasattr(backend, 'drop') else self._drop
                result = drop(name, force=force, version=version, **kwargs)
            except Exception as e:
                result = False
                # single-name drops propagate the error unless forced;
                # pattern drops continue with the next object
                if not force and not is_pattern:
                    raise
                if force:
                    # forced: retry with the default implementation to clean
                    # up as much as possible
                    result = self._drop(name, force=force, version=version)
            results.append((name, result))
        if not objs:
            # the pattern matched nothing; still honor force semantics
            result = self._drop(name, force=force, version=version)
            results.append((name, result))
        return dict(results) if report else len(results) > 0
448
449 def _drop(self, name, force=False, version=-1, keep_data=False, **kwargs):
450 meta = self.metadata(name, version=version)
451 if meta is None and not force:
452 raise DoesNotExist(name)
453 collection = self.collection(name)
454 if collection and not keep_data:
455 self.mongodb.drop_collection(collection.name)
456 if meta:
457 if meta.collection and not keep_data:
458 self.mongodb.drop_collection(meta.collection)
459 if meta and meta.gridfile is not None and not keep_data:
460 meta.gridfile.delete()
461 self._drop_metadata(name)
462 return True
463 return False
464
[docs]
465 def get_backend_bykind(self, kind, model_store=None, data_store=None,
466 **kwargs):
467 """
468 return the backend by a given object kind
469
470 :param kind: The object kind
471 :param model_store: the OmegaStore instance used to store models
472 :param data_store: the OmegaStore instance used to store data
473 :param kwargs: the kwargs passed to the backend initialization
474 :return: the backend
475 """
476 try:
477 backend_cls = load_class(self.defaults.OMEGA_STORE_BACKENDS[kind])
478 except KeyError as e:
479 raise ValueError('backend {kind} does not exist'.format(**locals()))
480 model_store = model_store or self
481 data_store = data_store or self
482 backend = backend_cls(model_store=model_store,
483 data_store=data_store, **kwargs)
484 return backend
485
[docs]
486 def get_backend(self, name, model_store=None, data_store=None, **kwargs):
487 """
488 return the backend by a given object name
489
490 :param kind: The object kind
491 :param model_store: the OmegaStore instance used to store models
492 :param data_store: the OmegaStore instance used to store data
493 :param kwargs: the kwargs passed to the backend initialization
494 :return: the backend
495 """
496 meta = self.metadata(name)
497 if meta is not None and meta.kind in self.defaults.OMEGA_STORE_BACKENDS:
498 return self.get_backend_bykind(meta.kind,
499 model_store=model_store,
500 data_store=data_store,
501 **kwargs)
502 return None
503
[docs]
    def help(self, name_or_obj=None, kind=None, raw=False, display=None, renderer=None):
        """ get help for an object by looking up its backend and calling help() on it

        Retrieves the object's metadata and looks up its corresponding backend. If the
        metadata.attributes['docs'] is a string it will display this as the help() contents.
        If the string starts with 'http://' or 'https://' it will open the web page.

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False
            display (fn): optional, callable for interactive display, defaults to help in
                if sys.flags.interactive is True, else uses pydoc.render_doc with plaintext
            renderer (fn): optional, the renderer= argument for pydoc.render_doc to use if
                sys.flags.interactive is False and display is not provided

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is in not interactive mode
        """
        import sys
        import pydoc
        # an explicitly passed display= implies interactive mode, otherwise
        # follow the interpreter's own flag
        interactive = bool(display) if display is not None else sys.flags.interactive
        display = display or help
        renderer = renderer or pydoc.plaintext
        obj = self._resolve_help_backend(name_or_obj=name_or_obj, kind=kind, raw=raw)
        if any(str(obj.__doc__).startswith(v) for v in ('https://', 'http://')):
            # the docs are a link -- interactively, open the web page instead
            obj = obj.__doc__
            if interactive and display is help:
                import webbrowser
                display = webbrowser.open
        return display(obj) if interactive else pydoc.render_doc(obj, renderer=renderer)
538
    def _resolve_help_backend(self, name_or_obj=None, kind=None, raw=False):
        # helper so we can test help
        # resolves the object (backend or doc wrapper) that .help() renders
        meta = self.metadata(name_or_obj) if name_or_obj else None
        if kind:
            backend = self.get_backend_bykind(kind)
        else:
            backend = self.get_backend(name_or_obj) or self.get_backend_byobj(name_or_obj)
        if backend is None:
            # fall back to the core objects backend
            backend = self.get_backend_bykind('core.object', model_store=self, data_store=self)
        if not raw and meta is not None and 'docs' in meta.attributes:
            # wrap user-provided docs in a function so help() renders them
            # together with the backend's own documentation
            def UserDocumentation():
                pass

            basedoc = backend.__doc__ or ''
            UserDocumentation.__doc__ = (basedoc +
                                         meta.attributes['docs'])
            backend = UserDocumentation
        return backend
557
[docs]
    def get_backend_byobj(self, obj, name=None, kind=None, attributes=None,
                          model_store=None, data_store=None, **kwargs):
        """
        return the matching backend for the given obj

        :param obj: the object to match a backend for
        :param name: the object name, used to look up existing metadata, optional
        :param kind: force this kind; defaults to the kind of the existing
           metadata, if any
        :param attributes: the attributes passed on to backend.supports()
        :param model_store: the OmegaStore used for models, defaults to self
        :param data_store: the OmegaStore used for data, defaults to self

        Returns:
            the first backend that supports the given parameters or None

        .. versionchanged:: 0.18
            backends are tested in order of MDREGISTRY.KINDS, see .register_backend()
        """
        model_store = model_store or self
        data_store = data_store or self
        meta = self.metadata(name) if name else None
        kind = kind or (meta.kind if meta is not None else None)
        backend = None
        if kind:
            # a known kind -- use its backend, warn if it does not support the obj
            if kind in self.defaults.OMEGA_STORE_BACKENDS:
                backend = self.get_backend_bykind(kind, data_store=data_store, model_store=model_store)
                if not backend.supports(obj, name, attributes=attributes,
                                        data_store=data_store,
                                        model_store=model_store,
                                        meta=meta,
                                        kind=kind, **kwargs):
                    # warn but keep the backend -- the caller asked for this kind
                    objtype = str(type(obj))
                    warnings.warn('Backend {kind} does not support {objtype}'.format(**locals()))
            else:
                # revert to core backend
                backend = self.get_backend_bykind('core.object', model_store=model_store, data_store=data_store)
        else:
            # no kind known -- probe registered backends in KINDS order
            # sort by order in MDREGISTRY.KINDS, only using kinds that actually have a registered backend
            sorted_backends = (k for k in MDREGISTRY.KINDS if k in self.defaults.OMEGA_STORE_BACKENDS)
            for backend_kind in sorted_backends:
                backend = self.get_backend_bykind(backend_kind, data_store=data_store, model_store=model_store)
                if backend.supports(obj, name, attributes=attributes,
                                    data_store=data_store, model_store=model_store,
                                    **kwargs):
                    break
            else:
                # no backend volunteered
                backend = None
        return backend
599
[docs]
    def getl(self, *args, **kwargs):
        """ return a lazy MDataFrame for a given object

        Same as .get, but returns a MDataFrame

        :param args: positional arguments passed on to .get()
        :param kwargs: keyword arguments passed on to .get(), in addition
           to lazy=True
        :return: the result of .get(..., lazy=True)
        """
        return self.get(*args, lazy=True, **kwargs)
607
[docs]
    def get(self, name, version=-1, force_python=False,
            kind=None, model_store=None, data_store=None, **kwargs):
        """
        Retrieve an object

        :param name: The name of the object
        :param version: Version of the stored object (not supported)
        :param force_python: Return as a python object
        :param kind: force the backend of this kind, optional
        :param model_store: the OmegaStore used for models, defaults to self
        :param data_store: the OmegaStore used for data, defaults to self
        :param kwargs: kwargs depending on object kind
        :return: an object, estimator, pipelines, data array or pandas dataframe
            previously stored with put()
        """
        meta = self.metadata(name, version=version)
        if meta is None:
            # unknown object -- .get() returns None rather than raising
            return None
        if not force_python:
            # resolve the backend by explicit kind if given, else by name
            backend = (self.get_backend(name, model_store=model_store,
                                        data_store=data_store, **kwargs)
                       if not kind else self.get_backend_bykind(kind, model_store=model_store,
                                                                data_store=data_store, **kwargs))
            if backend is not None:
                # FIXME: some backends need to get model_store, data_store, but fails tests
                return backend.get(name, **kwargs)  # model_store=model_store, data_store=data_store, **kwargs)
        # catch-all to CoreObjectsBackend or force python
        # -- keeping the same behavior until version 0.17, handling all other KINDs
        core_backend = self.get_backend_bykind('core.object', model_store=model_store, data_store=data_store, **kwargs)
        if force_python:
            return core_backend.get_object_as_python(meta, version=version)
        return core_backend.get(name, version=version, force_python=force_python, **kwargs)
637
638 def __iter__(self):
639 for f in self.list(include_temp=True):
640 yield f
641
642 @property
643 def buckets(self):
644 return ['default' if b == self.defaults.OMEGA_MONGO_COLLECTION else b
645 for b in self._Metadata.objects.distinct('bucket')]
646
[docs]
    def list(self, pattern=None, regexp=None, kind=None, raw=False, hidden=None,
             include_temp=False, bucket=None, prefix=None, filter=None):
        """
        List all files in store

        specify pattern as a unix pattern (e.g. :code:`models/*`,
        or specify regexp)

        :param pattern: the unix file pattern or None for all
        :param regexp: the regexp. takes precedence over pattern
        :param kind: the kind(s) to include, defaults to the store's force_kind
        :param raw: if True return the meta data objects
        :param hidden: if True include hidden objects (names starting with '.')
        :param include_temp: if True include temporary objects (names starting
           with '_')
        :param bucket: the bucket to search, defaults to self.bucket
        :param prefix: the prefix to search, defaults to self.prefix
        :param filter: specify additional filter criteria, optional
        :return: List of files in store

        """
        regex = lambda pattern: bson.regex.Regex(f'{pattern}')
        # accessing .mongodb ensures the connection is initialized before
        # Metadata is queried
        db = self.mongodb
        searchkeys = dict(bucket=bucket or self.bucket,
                          prefix=prefix or self.prefix)
        q_excludes = Q()
        if regexp:
            searchkeys['name'] = regex(regexp)
        elif pattern:
            # translate the unix pattern into an anchored regular expression
            re_pattern = pattern.replace('*', '.*').replace('/', r'\/')
            searchkeys['name'] = regex(f'^{re_pattern}$')
        if not include_temp:
            # exclude temporary objects (leading underscore in any path part)
            q_excludes &= Q(name__not__startswith='_')
            q_excludes &= Q(name__not=regex(r'(.{1,*}\/?_.*)'))
        if not hidden:
            # exclude hidden objects (leading dot)
            q_excludes &= Q(name__not__startswith='.')
        if kind or self.force_kind:
            kind = kind or self.force_kind
            if isinstance(kind, (tuple, list)):
                searchkeys.update(kind__in=kind)
            else:
                searchkeys.update(kind=kind)
        if filter:
            searchkeys.update(filter)
        q_search = Q(**searchkeys) & q_excludes
        files = self._Metadata.objects.no_cache()(q_search)
        # strip the legacy .omm extension from returned names
        return [f if raw else str(f.name).replace('.omm', '') for f in files]
688
[docs]
689 def exists(self, name, hidden=False):
690 """ check if object exists
691
692 Args:
693 name (str): name of object
694 hidden (bool): if True, include hidden files, defaults to
695 False, unless name starts with '.'
696
697 Returns:
698 bool, True if object exists
699
700 .. versionchanged:: 0.16.4
701 hidden defaults to True if name starts with '.'
702 """
703 hidden = True if name.startswith('.') else hidden
704 return name in self.list(name, hidden=hidden)
705
[docs]
706 def object_store_key(self, name, ext, hashed=None):
707 """
708 Returns the store key
709
710 Unless you write a mixin or a backend you should not use this method
711
712 :param name: The name of object
713 :param ext: The extension of the filename
714 :param hashed: hash the key to support arbitrary name length, defaults
715 to defaults.OMEGA_STORE_HASHEDNAMES, True by default since 0.13.7
716
717 :return: A filename with relative bucket, prefix and name
718 """
719 # byte string
720 _u8 = lambda t: t.encode('UTF-8', 'replace') if isinstance(t, str) else t
721 key = self._get_obj_store_key(name, ext)
722 hashed = hashed if hashed is not None else self.defaults.OMEGA_STORE_HASHEDNAMES
723 if hashed:
724 from hashlib import sha1
725 # SEC: CWE-916
726 # - status: wontfix
727 # - reason: hashcode is used purely for name resolution, not a security function
728 hasher = sha1()
729 hasher.update(_u8(key))
730 key = hasher.hexdigest()
731 return key
732
733 def _get_obj_store_key(self, name, ext, prefix=None, bucket=None):
734 # backwards compatilibity implementation of object_store_key()
735 name = '%s.%s' % (name, ext) if not name.endswith(ext) else name
736 filename = '{bucket}.{prefix}.{name}'.format(
737 bucket=bucket or self.bucket,
738 prefix=prefix or self.prefix,
739 name=name,
740 ext=ext).replace('/', '_').replace('..', '.')
741 return filename
742
743 def _ensure_fs_collection(self):
744 # ensure backwards-compatible gridfs access
745 if self.defaults.OMEGA_BUCKET_FS_LEGACY:
746 # prior to 0.13.2 a single gridfs instance was used, always equal to the default collection
747 return self.defaults.OMEGA_MONGO_COLLECTION
748 if self.bucket == self.defaults.OMEGA_MONGO_COLLECTION:
749 # from 0.13.2 onwards, only the default bucket is equal to the default collection
750 # backwards compatibility for existing installations
751 return self.bucket
752 # since 0.13.2, all buckets other than the default use a qualified collection name to
753 # effectively separate files in different buckets, enabling finer-grade access control
754 # and avoiding name collisions from different buckets
755 return '{}_{}'.format(self.defaults.OMEGA_MONGO_COLLECTION, self.bucket)
756
757 def _ensure_fs_index(self, fs):
758 # make sure we have proper chunks and file indicies. this should be created on first write, but sometimes is not
759 # see https://docs.mongodb.com/manual/core/gridfs/#gridfs-indexes
760 # pymongo since 4.1 no longer has fs._GridFS
761 chunks_collection = fs._GridFS__chunks if hasattr(fs, '_GridFS__chunks') else fs._chunks
762 files_collection = fs._GridFS__files if hasattr(fs, '_GridFS__files') else fs._files
763 ensure_index(chunks_collection, {'files_id': 1, 'n': 1}, unique=True)
764 ensure_index(files_collection, {'filename': 1, 'uploadDate': 1})
765
766 def sign(self, filter):
767 return signature(filter)
768
769 @staticmethod
770 def _cleanup(objrepr, tmppath=None):
771 # cleanup any temporary files on exit
772 # -- this is called as a finalizer (weakref.finalize)
773 try:
774 shutil.rmtree(tmppath, ignore_errors=True)
775 except Exception as e:
776 logger.error(f'finalizing {objrepr}: error occured as {e}')
777 return
778 try:
779 logger.debug(f'finalizing {objrepr}: cleaning up temporary files in {tmppath}')
780 except Exception:
781 pass