1""" Native storage for OmegaML using mongodb as the storage layer
2
3An OmegaStore instance is a MongoDB database. It has at least the
4metadata collection which lists all objects stored in it. A metadata
5document refers to the following types of objects (metadata.kind):
6
7* pandas.dfrows - a Pandas DataFrame stored as a collection of rows
8* sklearn.joblib - a scikit learn estimator/pipline dumped using joblib.dump()
9* python.data - an arbitrary python dict, tuple, list stored as a document
10
11Note that storing Pandas and scikit learn objects requires the availability
12of the respective packages. If either can not be imported, the OmegaStore
13degrades to a python.data store only. It will still .list() and get() any
14object, however reverts to pure python objects. In this case it is up
15to the client to convert the data into an appropriate format for processing.
16
17Pandas and scikit-learn objects can only be stored if these packages are
18availables. put() raises a TypeError if you pass such objects and these
19modules cannot be loaded.
20
21All data are stored within the same mongodb, in per-object collections
22as follows:
23
24 * .metadata
25 all metadata. each object is one document,
26 See **omegaml.documents.Metadata** for details
27 * .<bucket>.files
28 this is the GridFS instance used to store
29 blobs (models, numpy, hdf). The actual file name
30 will be <prefix>/<name>.<ext>, where ext is
31 optionally generated by put() / get().
32 * .<bucket>.<prefix>.<name>.data
33 every other dataset is stored in a separate
34 collection (dataframes, dicts, lists, tuples).
35 Any forward slash in prefix is ignored (e.g. 'data/'
36 becomes 'data')
37
38 DataFrames by default are stored in their own collection, every
39 row becomes a document. To store dataframes as a binary file,
40 use `put(...., as_hdf=True).` `.get()` will always return a dataframe.
41
42 Python dicts, lists, tuples are stored as a single document with
43 a `.data` attribute holding the JSON-converted representation. `.get()`
44 will always return the corresponding python object of .data.
45
46 Models are joblib.dump()'ed and ziped prior to transferring into
47 GridFs. .get() will always unzip and joblib.load() before returning
48 the model. Note this requires that the process using .get() supports
49 joblib as well as all python classes referred to. If joblib is not
50 supported, .get() returns a file-like object.
51
52 The .metadata entry specifies the format used to store each
53 object as well as it's location:
54
55 * metadata.kind
56 the type of object
57 * metadata.name
58 the name of the object, as given on put()
59 * metadata.gridfile
60 the gridfs object (if any, null otherwise)
61 * metadata.collection
62 the name of the collection
63 * metadata.attributes
64 arbitrary custom attributes set in
65 put(attributes=obj). This is used e.g. by
66 OmegaRuntime's fit() method to record the
67 data used in the model's training.
68
69 **.put()** and **.get()** use helper methods specific to the type in
70 object's type and metadata.kind, respectively. In the future
71 a plugin system will enable extension to other types.
72"""
73from __future__ import absolute_import
74
75import logging
76import os
77import shutil
78import warnings
79import weakref
80from uuid import uuid4
81
82import bson
83import gridfs
84from mongoengine.connection import disconnect, connect, _connections, get_db
85from mongoengine.errors import DoesNotExist
86from mongoengine.queryset.visitor import Q
87
88from omegaml.documents import make_Metadata, MDREGISTRY
89from omegaml.mongoshim import sanitize_mongo_kwargs, waitForConnection
90from omegaml.util import load_class, extend_instance, ensure_index, PickableCollection, signature
91from omegaml.util import settings as omega_settings, urlparse
92
93logger = logging.getLogger(__name__)
94
95
[docs]
96class OmegaStore(object):
97 """ The storage backend for models and data
98
99 .. versionchanged:: NEXT
100 refactored all methods handling Python and Pandas datatypes to omegaml.backends.coreobjects.CoreObjectsBackend
101 """
102
    def __init__(self, mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None):
        """
        :param mongo_url: the mongourl to use for the gridfs
        :param bucket: the mongo collection to use for gridfs
        :param prefix: the path prefix for files. defaults to blank
        :param kind: the kind or list of kinds to limit this store to
        :param defaults: the settings object, defaults to omega_settings()
        :param dbalias: the mongoengine connection alias; defaults to an
           instance-specific alias generated on first db access (see .mongodb)
        """
        self.defaults = defaults or omega_settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket or self.defaults.OMEGA_MONGO_COLLECTION
        # gridfs collection name resolved from the bucket, see _ensure_fs_collection
        self._fs_collection = self._ensure_fs_collection()
        # gridfs instance, created lazily on first .fs access
        self._fs = None
        # instance-specific temporary directory, created lazily by .tmppath
        self._tmppath = os.path.join(self.defaults.OMEGA_TMP, uuid4().hex)
        self.prefix = prefix or ''
        self.force_kind = kind
        # store-local Metadata class, created lazily by ._Metadata
        self._Metadata_cls = None
        # don't initialize db here to avoid using the default settings
        # otherwise Metadata will already have a connection and not use
        # the one provided in override_settings
        self._db = None
        self._dbalias = dbalias
        # add backends and mixins
        self._apply_mixins()
        # register backends
        self.register_backends()
        # register finalizer for cleanup; only plain values are passed so the
        # finalizer callback does not keep this instance alive
        weakref.finalize(self, self._cleanup, repr(self), tmppath=str(self._tmppath))
130
131 def __repr__(self):
132 return 'OmegaStore(bucket={}, prefix={})'.format(self.bucket, self.prefix)
133
134 def __equal__(self, other):
135 """test for equality of OmegaStore instances
136
137 Args:
138 other: OmegaStore instance
139
140 Returns:
141 True if other is the same database, same bucket, same prefix
142 """
143 return self.mongo_url == other.mongo_url and self.bucket == other.bucket and self.prefix == other.prefix
144
145 @property
146 def tmppath(self):
147 """
148 return an instance-specific temporary path
149 """
150 os.makedirs(self._tmppath, exist_ok=True)
151 return self._tmppath
152
153 @property
154 def mongodb(self):
155 """
156 Returns a mongo database object
157 """
158 if self._db is not None:
159 return self._db
160 # parse salient parts of mongourl, e.g.
161 # mongodb://user:password@host/dbname
162 self.parsed_url = urlparse.urlparse(self.mongo_url)
163 self.database_name = self.parsed_url.path[1:]
164 host = self.parsed_url.netloc
165 scheme = self.parsed_url.scheme
166 username, password = None, None
167 if '@' in host:
168 creds, host = host.split('@', 1)
169 if ':' in creds:
170 username, password = creds.split(':')
171 # connect via mongoengine
172 #
173 # note this uses a MongoClient in the background, with pooled
174 # connections. there are multiprocessing issues with pymongo:
175 # http://api.mongodb.org/python/3.2/faq.html#using-pymongo-with-multiprocessing
176 # connect=False is due to https://jira.mongodb.org/browse/PYTHON-961
177 # this defers connecting until the first access
178 # serverSelectionTimeoutMS=2500 is to fail fast, the default is 30000
179 #
180 # use an instance specific alias, note that access to Metadata and
181 # QueryCache must pass the very same alias
182 self._dbalias = alias = self._dbalias or 'omega-{}'.format(uuid4().hex)
183 # always disconnect before registering a new connection because
184 # mongoengine.connect() forgets all connection settings upon disconnect
185 if alias not in _connections:
186 disconnect(alias)
187 connection = connect(alias=alias, db=self.database_name,
188 host=f'{scheme}://{host}',
189 username=username,
190 password=password,
191 connect=False,
192 authentication_source='admin',
193 **sanitize_mongo_kwargs(self.defaults.OMEGA_MONGO_SSL_KWARGS),
194 )
195 # since PyMongo 4, connect() no longer waits for connection
196 waitForConnection(connection)
197 self._db = get_db(alias)
198 return self._db
199
200 @property
201 def _Metadata(self):
202 if self._Metadata_cls is None:
203 # hack to localize metadata
204 db = self.mongodb
205 self._Metadata_cls = make_Metadata(db_alias=self._dbalias,
206 collection=self._fs_collection)
207 return self._Metadata_cls
208
209 @property
210 def fs(self):
211 """
212 Retrieve a gridfs instance using url and collection provided
213
214 :return: a gridfs instance
215 """
216 if self._fs is not None:
217 return self._fs
218 self._fs = gridfs.GridFS(self.mongodb, collection=self._fs_collection)
219 self._ensure_fs_index(self._fs)
220 return self._fs
221
238
264
265 def _make_metadata(self, name=None, bucket=None, prefix=None, **kwargs):
266 """
267 create or update a metadata object
268
269 this retrieves a Metadata object if it exists given the kwargs. Only
270 the name, prefix and bucket arguments are considered
271
272 for existing Metadata objects, the attributes kw is treated as follows:
273
274 * attributes=None, the existing attributes are left as is
275 * attributes={}, the attributes value on an existing metadata object
276 is reset to the empty dict
277 * attributes={ some : value }, the existing attributes are updated
278
279 For new metadata objects, attributes defaults to {} if not specified,
280 else is set as provided.
281
282 :param name: the object name
283 :param bucket: the bucket, optional, defaults to self.bucket
284 :param prefix: the prefix, optional, defaults to self.prefix
285 """
286 bucket = bucket or self.bucket
287 prefix = prefix or self.prefix
288 meta = self.metadata(name=name,
289 prefix=prefix,
290 bucket=bucket)
291 if meta:
292 dict_fields = 'attributes', 'kind_meta'
293 for k, v in kwargs.items():
294 if k in dict_fields and v is not None and len(v) > 0:
295 previous = getattr(meta, k, {})
296 previous.update(v)
297 setattr(meta, k, previous)
298 elif k in dict_fields and v is not None and len(v) == 0:
299 setattr(meta, k, {})
300 elif k in dict_fields and v is None:
301 # ignore non specified attributes
302 continue
303 else:
304 # by default set whatever attribute is provided
305 setattr(meta, k, v)
306 else:
307 meta = self._Metadata(name=name, bucket=bucket, prefix=prefix,
308 **kwargs)
309 return meta
310
311 def _drop_metadata(self, name=None, **kwargs):
312 # internal method to delete meta data of an object
313 meta = self.metadata(name, **kwargs)
314 if meta is not None:
315 meta.delete()
316
[docs]
317 def collection(self, name=None, bucket=None, prefix=None):
318 """
319 Returns a mongo db collection as a datastore
320
321 If there is an existing object of name, will return the .collection
322 of the object. Otherwise returns the collection according to naming
323 convention {bucket}.{prefix}.{name}.datastore
324
325 :param name: the collection to use. if none defaults to the
326 collection name given on instantiation. the actual collection name
327 used is always prefix + name + '.data'
328 """
329 # see if we have a known object and a collection for that, if not define it
330 meta = self.metadata(name, bucket=bucket, prefix=prefix)
331 collection = meta.collection if meta else None
332 if not collection:
333 collection = self.object_store_key(name, '.datastore')
334 collection = collection.replace('..', '.')
335 # return the collection
336 try:
337 datastore = getattr(self.mongodb, collection)
338 except Exception as e:
339 raise e
340 return PickableCollection(datastore)
341
342 def _apply_mixins(self):
343 """
344 apply mixins in defaults.OMEGA_STORE_MIXINS
345 """
346 for mixin in self.defaults.OMEGA_STORE_MIXINS:
347 conditional = self._mixins_conditional
348 extend_instance(self, mixin,
349 conditional=conditional)
350
351 def _mixins_conditional(self, cls, obj):
352 return cls.supports(obj) if hasattr(cls, 'supports') else True
353
[docs]
354 def register_backends(self):
355 """
356 register backends in defaults.OMEGA_STORE_BACKENDS
357 """
358 # enable list modification within loop
359 # -- avoid RuntimeError: dictionary changed size during iteration
360 backends = list(self.defaults.OMEGA_STORE_BACKENDS.items())
361 for kind, backend in backends:
362 self.register_backend(kind, backend)
363
[docs]
364 def register_backend(self, kind, backend, index=-1):
365 """
366 register a backend class
367
368 :param kind: (str) the backend kind
369 :param backend: (class) the backend class
370 :param index: (int) the insert position, defaults to -1, which means
371 to append
372
373 .. versionchanged:: NEXT
374 added index to have more control over backend evaluation by .get_backend_byobj()
375
376 .. versionchanged:: NEXT
377 backends can specify cls.KIND_EXT to register additional kinds
378 """
379 backend_cls = load_class(backend)
380 backend_kinds = [backend_cls.KIND] + list(getattr(backend_cls, 'KIND_EXT', []))
381 for kind in backend_kinds:
382 self.defaults.OMEGA_STORE_BACKENDS[kind] = backend_cls
383 if kind not in MDREGISTRY.KINDS:
384 pos = len(MDREGISTRY.KINDS) + index if index < 0 else index
385 MDREGISTRY.KINDS.insert(pos, kind)
386 return self
387
[docs]
    def register_mixin(self, mixincls):
        """
        register a mixin class

        :param mixincls: (class) the mixin class
        :return: self, to allow chaining
        """
        extend_instance(self, mixincls)
        return self
396
[docs]
397 def put(self, obj, name, attributes=None, kind=None, replace=False, model_store=None,
398 data_store=None, **kwargs):
399 """
400 Stores an object, store estimators, pipelines, numpy arrays or
401 pandas dataframes
402 """
403 if replace:
404 self.drop(name, force=True)
405 backend = self.get_backend_byobj(obj, name, attributes=attributes, kind=kind,
406 model_store=model_store, data_store=data_store,
407 **kwargs)
408 if backend:
409 return backend.put(obj, name, attributes=attributes, **kwargs)
410 raise TypeError('type %s not supported' % type(obj))
411
[docs]
    def drop(self, name, force=False, version=-1, report=False, **kwargs):
        """
        Drop the object

        :param name: The name of the object. If the name is a pattern it will
           be expanded using .list(), and call .drop() on every obj found.
        :param force: If True ignores DoesNotExist exception, defaults to False
           meaning this raises a DoesNotExist exception if the name does not
           exist
        :param version: the version to drop, defaults to -1
        :param report: if True returns a dict name=>status, where status is True
           if deleted, False if not deleted
        :return: True if object was deleted, False if not.
           If force is True and the object does not exist it will still return True
        :raises: DoesNotExist if the object does not exist and ```force=False```
        """
        is_pattern = '*' in name
        objs = [name] if not is_pattern else self.list(name)
        results = []
        for name in objs:
            try:
                # delegate to the object's backend if there is one, else use
                # the generic _drop
                backend = self.get_backend(name)
                drop = backend.drop if backend else self._drop
                result = drop(name, force=force, version=version, **kwargs)
            except Exception as e:
                # deliberately broad: a failing backend drop must not prevent
                # the removal of other pattern-matched objects
                result = False
                if not force and not is_pattern:
                    raise
                if force:
                    # best-effort fallback to the generic drop
                    result = self._drop(name, force=force, version=version)
            results.append((name, result))
        if not objs:
            # pattern matched nothing: still call _drop so the
            # force/DoesNotExist semantics apply consistently
            result = self._drop(name, force=force, version=version)
            results.append((name, result))
        return dict(results) if report else len(results) > 0
446
447 def _drop(self, name, force=False, version=-1, keep_data=False, **kwargs):
448 meta = self.metadata(name, version=version)
449 if meta is None and not force:
450 raise DoesNotExist(name)
451 collection = self.collection(name)
452 if collection and not keep_data:
453 self.mongodb.drop_collection(collection.name)
454 if meta:
455 if meta.collection and not keep_data:
456 self.mongodb.drop_collection(meta.collection)
457 if meta and meta.gridfile is not None and not keep_data:
458 meta.gridfile.delete()
459 self._drop_metadata(name)
460 return True
461 return False
462
[docs]
463 def get_backend_bykind(self, kind, model_store=None, data_store=None,
464 **kwargs):
465 """
466 return the backend by a given object kind
467
468 :param kind: The object kind
469 :param model_store: the OmegaStore instance used to store models
470 :param data_store: the OmegaStore instance used to store data
471 :param kwargs: the kwargs passed to the backend initialization
472 :return: the backend
473 """
474 try:
475 backend_cls = load_class(self.defaults.OMEGA_STORE_BACKENDS[kind])
476 except KeyError as e:
477 raise ValueError('backend {kind} does not exist'.format(**locals()))
478 model_store = model_store or self
479 data_store = data_store or self
480 backend = backend_cls(model_store=model_store,
481 data_store=data_store, **kwargs)
482 return backend
483
[docs]
484 def get_backend(self, name, model_store=None, data_store=None, **kwargs):
485 """
486 return the backend by a given object name
487
488 :param kind: The object kind
489 :param model_store: the OmegaStore instance used to store models
490 :param data_store: the OmegaStore instance used to store data
491 :param kwargs: the kwargs passed to the backend initialization
492 :return: the backend
493 """
494 meta = self.metadata(name)
495 if meta is not None and meta.kind in self.defaults.OMEGA_STORE_BACKENDS:
496 return self.get_backend_bykind(meta.kind,
497 model_store=model_store,
498 data_store=data_store,
499 **kwargs)
500 return None
501
[docs]
    def help(self, name_or_obj=None, kind=None, raw=False, display=None, renderer=None):
        """ get help for an object by looking up its backend and calling help() on it

        Retrieves the object's metadata and looks up its corresponding backend. If the
        metadata.attributes['docs'] is a string it will display this as the help() contents.
        If the string starts with 'http://' or 'https://' it will open the web page.

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False
            display (fn): optional, callable for interactive display, defaults to help()
                if sys.flags.interactive is True, else uses pydoc.render_doc with plaintext
            renderer (fn): optional, the renderer= argument for pydoc.render_doc to use if
                sys.flags.interactive is False and display is not provided

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is in not interactive mode
        """
        import sys
        import pydoc
        # an explicitly passed display= implies interactive mode; otherwise
        # follow the interpreter's -i flag
        interactive = bool(display) if display is not None else sys.flags.interactive
        display = display or help
        renderer = renderer or pydoc.plaintext
        obj = self._resolve_help_backend(name_or_obj=name_or_obj, kind=kind, raw=raw)
        # if the resolved docs are a URL, open the web page instead of
        # rendering text help (only when interactively using builtin help)
        if any(str(obj.__doc__).startswith(v) for v in ('https://', 'http://')):
            obj = obj.__doc__
            if interactive and display is help:
                import webbrowser
                display = webbrowser.open
        return display(obj) if interactive else pydoc.render_doc(obj, renderer=renderer)
536
    def _resolve_help_backend(self, name_or_obj=None, kind=None, raw=False):
        # helper so we can test help
        # resolves the object to the backend (or a doc stand-in) that help()
        # should be rendered for
        meta = self.metadata(name_or_obj) if name_or_obj else None
        if kind:
            backend = self.get_backend_bykind(kind)
        else:
            backend = self.get_backend(name_or_obj) or self.get_backend_byobj(name_or_obj)
        if backend is None:
            # fall back to the generic core object backend
            backend = self.get_backend_bykind('core.object', model_store=self, data_store=self)
        if not raw and meta is not None and 'docs' in meta.attributes:
            # combine the backend's docs with the user-provided docs in a
            # stand-in function object so help() renders both together
            def UserDocumentation():
                pass

            basedoc = backend.__doc__ or ''
            UserDocumentation.__doc__ = (basedoc +
                                         meta.attributes['docs'])
            backend = UserDocumentation
        return backend
555
[docs]
    def get_backend_byobj(self, obj, name=None, kind=None, attributes=None,
                          model_store=None, data_store=None, **kwargs):
        """
        return the matching backend for the given obj

        :param obj: the object to resolve a backend for
        :param name: optional, the object name; an existing object's
            metadata.kind takes precedence over probing all backends
        :param kind: optional, force resolution for the given kind

        Returns:
            the first backend that supports the given parameters or None

        .. versionchanged:: NEXT
            backends are tested in order of MDREGISTRY.KINDS, see .register_backend()
        """
        model_store = model_store or self
        data_store = data_store or self
        # a known object's stored kind wins over any probing
        meta = self.metadata(name) if name else None
        kind = kind or (meta.kind if meta is not None else None)
        backend = None
        if kind:
            if kind in self.defaults.OMEGA_STORE_BACKENDS:
                backend = self.get_backend_bykind(kind, data_store=data_store, model_store=model_store)
                if not backend.supports(obj, name, attributes=attributes,
                                        data_store=data_store,
                                        model_store=model_store,
                                        meta=meta,
                                        kind=kind, **kwargs):
                    # the backend is still returned; the warning only flags
                    # the object/kind mismatch
                    objtype = str(type(obj))
                    warnings.warn('Backend {kind} does not support {objtype}'.format(**locals()))
            else:
                # no backend registered for this kind, revert to core backend
                backend = self.get_backend_bykind('core.object', model_store=model_store, data_store=data_store)
        else:
            # no kind known: probe backends in order of MDREGISTRY.KINDS, only
            # using kinds that actually have a registered backend
            sorted_backends = (k for k in MDREGISTRY.KINDS if k in self.defaults.OMEGA_STORE_BACKENDS)
            for backend_kind in sorted_backends:
                backend = self.get_backend_bykind(backend_kind, data_store=data_store, model_store=model_store)
                if backend.supports(obj, name, attributes=attributes,
                                    data_store=data_store, model_store=model_store,
                                    **kwargs):
                    break
            else:
                # no backend volunteered support
                backend = None
        return backend
597
[docs]
    def getl(self, *args, **kwargs):
        """ return a lazy MDataFrame for a given object

        Same as .get, but returns a MDataFrame

        .. seealso:: :meth:`get`
        """
        # shorthand for .get(..., lazy=True)
        return self.get(*args, lazy=True, **kwargs)
605
[docs]
    def get(self, name, version=-1, force_python=False,
            kind=None, model_store=None, data_store=None, **kwargs):
        """
        Retrieve an object

        :param name: The name of the object
        :param version: Version of the stored object (not supported)
        :param force_python: Return as a python object
        :param kind: optional, force the backend of the given kind to be used
        :param model_store: the OmegaStore instance used to store models
        :param data_store: the OmegaStore instance used to store data
        :param kwargs: kwargs depending on object kind
        :return: an object, estimator, pipelines, data array or pandas dataframe
            previously stored with put()
        """
        meta = self.metadata(name, version=version)
        if meta is None:
            # unknown object, nothing to return
            return None
        if not force_python:
            # resolve the backend by stored kind, or by the kind given
            backend = (self.get_backend(name, model_store=model_store,
                                        data_store=data_store)
                       if not kind else self.get_backend_bykind(kind, model_store=model_store, data_store=data_store))
            if backend is not None:
                # FIXME: some backends need to get model_store, data_store, but fails tests
                return backend.get(name, **kwargs)  # model_store=model_store, data_store=data_store, **kwargs)
        # catch-all to CoreObjectsBackend or force python
        # -- keeping the same behavior until version 0.17, handling all other KINDs
        core_backend = self.get_backend_bykind('core.object', model_store=model_store, data_store=data_store)
        if force_python:
            return core_backend.get_object_as_python(meta, version=version)
        return core_backend.get(name, version=version, force_python=force_python, **kwargs)
634
635 def __iter__(self):
636 for f in self.list(include_temp=True):
637 yield f
638
639 @property
640 def buckets(self):
641 return ['default' if b == self.defaults.OMEGA_MONGO_COLLECTION else b
642 for b in self._Metadata.objects.distinct('bucket')]
643
[docs]
    def list(self, pattern=None, regexp=None, kind=None, raw=False, hidden=None,
             include_temp=False, bucket=None, prefix=None, filter=None):
        """
        List all files in store

        specify pattern as a unix pattern (e.g. :code:`models/*`,
        or specify regexp)

        :param pattern: the unix file pattern or None for all
        :param regexp: the regexp. takes precedence over pattern
        :param kind: the kind or list of kinds to limit the result to
        :param raw: if True return the meta data objects
        :param hidden: if True include hidden objects (names starting with '.')
        :param include_temp: if True include temporary objects (names starting with '_')
        :param bucket: the bucket to search, defaults to self.bucket
        :param prefix: the prefix to search, defaults to self.prefix
        :param filter: specify additional filter criteria, optional
        :return: List of files in store

        """
        # regexes are evaluated server-side by mongodb, not by python re
        regex = lambda pattern: bson.regex.Regex(f'{pattern}')
        db = self.mongodb
        searchkeys = dict(bucket=bucket or self.bucket,
                          prefix=prefix or self.prefix)
        q_excludes = Q()
        if regexp:
            searchkeys['name'] = regex(regexp)
        elif pattern:
            # translate the unix pattern into an anchored regex
            re_pattern = pattern.replace('*', '.*').replace('/', r'\/')
            searchkeys['name'] = regex(f'^{re_pattern}$')
        if not include_temp:
            # exclude temporary objects (leading '_' or '_' after a path segment)
            # NOTE(review): the quantifier {1,*} is not valid regex syntax and is
            # presumably treated literally by the server - confirm the intended
            # pattern (likely .{1,})
            q_excludes &= Q(name__not__startswith='_')
            q_excludes &= Q(name__not=regex(r'(.{1,*}\/?_.*)'))
        if not hidden:
            # exclude hidden objects (leading '.')
            q_excludes &= Q(name__not__startswith='.')
        if kind or self.force_kind:
            kind = kind or self.force_kind
            if isinstance(kind, (tuple, list)):
                searchkeys.update(kind__in=kind)
            else:
                searchkeys.update(kind=kind)
        if filter:
            searchkeys.update(filter)
        q_search = Q(**searchkeys) & q_excludes
        files = self._Metadata.objects.no_cache()(q_search)
        return [f if raw else str(f.name).replace('.omm', '') for f in files]
685
[docs]
686 def exists(self, name, hidden=False):
687 """ check if object exists
688
689 Args:
690 name (str): name of object
691 hidden (bool): if True, include hidden files, defaults to
692 False, unless name starts with '.'
693
694 Returns:
695 bool, True if object exists
696
697 .. versionchanged:: 0.16.4
698 hidden defaults to True if name starts with '.'
699 """
700 hidden = True if name.startswith('.') else hidden
701 return name in self.list(name, hidden=hidden)
702
[docs]
703 def object_store_key(self, name, ext, hashed=None):
704 """
705 Returns the store key
706
707 Unless you write a mixin or a backend you should not use this method
708
709 :param name: The name of object
710 :param ext: The extension of the filename
711 :param hashed: hash the key to support arbitrary name length, defaults
712 to defaults.OMEGA_STORE_HASHEDNAMES, True by default since 0.13.7
713
714 :return: A filename with relative bucket, prefix and name
715 """
716 # byte string
717 _u8 = lambda t: t.encode('UTF-8', 'replace') if isinstance(t, str) else t
718 key = self._get_obj_store_key(name, ext)
719 hashed = hashed if hashed is not None else self.defaults.OMEGA_STORE_HASHEDNAMES
720 if hashed:
721 from hashlib import sha1
722 # SEC: CWE-916
723 # - status: wontfix
724 # - reason: hashcode is used purely for name resolution, not a security function
725 hasher = sha1()
726 hasher.update(_u8(key))
727 key = hasher.hexdigest()
728 return key
729
730 def _get_obj_store_key(self, name, ext, prefix=None, bucket=None):
731 # backwards compatilibity implementation of object_store_key()
732 name = '%s.%s' % (name, ext) if not name.endswith(ext) else name
733 filename = '{bucket}.{prefix}.{name}'.format(
734 bucket=bucket or self.bucket,
735 prefix=prefix or self.prefix,
736 name=name,
737 ext=ext).replace('/', '_').replace('..', '.')
738 return filename
739
740 def _ensure_fs_collection(self):
741 # ensure backwards-compatible gridfs access
742 if self.defaults.OMEGA_BUCKET_FS_LEGACY:
743 # prior to 0.13.2 a single gridfs instance was used, always equal to the default collection
744 return self.defaults.OMEGA_MONGO_COLLECTION
745 if self.bucket == self.defaults.OMEGA_MONGO_COLLECTION:
746 # from 0.13.2 onwards, only the default bucket is equal to the default collection
747 # backwards compatibility for existing installations
748 return self.bucket
749 # since 0.13.2, all buckets other than the default use a qualified collection name to
750 # effectively separate files in different buckets, enabling finer-grade access control
751 # and avoiding name collisions from different buckets
752 return '{}_{}'.format(self.defaults.OMEGA_MONGO_COLLECTION, self.bucket)
753
754 def _ensure_fs_index(self, fs):
755 # make sure we have proper chunks and file indicies. this should be created on first write, but sometimes is not
756 # see https://docs.mongodb.com/manual/core/gridfs/#gridfs-indexes
757 # pymongo since 4.1 no longer has fs._GridFS
758 chunks_collection = fs._GridFS__chunks if hasattr(fs, '_GridFS__chunks') else fs._chunks
759 files_collection = fs._GridFS__files if hasattr(fs, '_GridFS__files') else fs._files
760 ensure_index(chunks_collection, {'files_id': 1, 'n': 1}, unique=True)
761 ensure_index(files_collection, {'filename': 1, 'uploadDate': 1})
762
    def sign(self, filter):
        # return a canonical signature for the given filter specification,
        # see omegaml.util.signature
        return signature(filter)
765
766 @staticmethod
767 def _cleanup(objrepr, tmppath=None):
768 # cleanup any temporary files on exit
769 # -- this is called as a finalizer (weakref.finalize)
770 try:
771 logger.debug(f'finalizing {objrepr}: cleaning up temporary files in {tmppath}')
772 shutil.rmtree(tmppath, ignore_errors=True)
773 except Exception as e:
774 logger.error(f'finalizing {objrepr}: error occured as {e}')