Source code for omegaml.documents

  1from __future__ import absolute_import
  2
  3import datetime
  4
  5from mongoengine.base.fields import ObjectIdField
  6from mongoengine.document import Document
  7from mongoengine.fields import (
  8    StringField, FileField, DictField, DateTimeField
  9)
 10from mongoengine.pymongo_support import LEGACY_JSON_OPTIONS
 11from pymongo.errors import OperationFailure
 12
 13from omegaml.util import settings
 14
 15
 16# default kinds of objects
 17class MDREGISTRY:
 18    PANDAS_DFROWS = 'pandas.dfrows'  # dataframe
 19    PANDAS_SEROWS = 'pandas.serows'  # series
 20    PANDAS_HDF = 'pandas.hdf'
 21    PYTHON_DATA = 'python.data'
 22    PANDAS_DFGROUP = 'pandas.dfgroup'
 23    SKLEARN_JOBLIB = 'sklearn.joblib'
 24    OMEGAML_JOBS = 'script.ipynb'
 25    SPARK_MLLIB = 'spark.mllib'
 26    OMEGAML_RUNNING_JOBS = 'job.run'
 27    MINIBATCH_STREAM = 'stream.minibatch'
 28
 29    #: the list of accepted data types. extend using OmegaStore.register_backend
 30    KINDS = []
 31
 32
class Metadata:
    """
    Metadata stores information about objects in OmegaStore

    This class only declares the fields, it is not a usable document
    class by itself. Use make_Metadata() to get a working class.
    """

    # NOTE THIS IS ONLY HERE FOR DOCUMENTATION PURPOSE.
    #
    # Instantiating this class directly raises a NameError, see the
    # replacement of Metadata.__new__ at the end of this module.
    #
    # The actual Metadata class is created in make_Metadata() below.
    # Rationale: If we let mongoengine create Metadata here the class
    # is bound to a specific MongoClient instance. Using make_Metadata
    # binds the class to the specific instance that exists at the time
    # of creation. Open to better ways.

    # fields
    #: this is the name of the data, unique with bucket and prefix
    name = StringField(unique_with=['bucket', 'prefix'])
    #: bucket
    bucket = StringField()
    #: prefix
    prefix = StringField()
    #: kind of data, one of MDREGISTRY.KINDS
    kind = StringField(choices=MDREGISTRY.KINDS)
    #: for PANDAS_HDF and SKLEARN_JOBLIB this is the gridfile
    gridfile = FileField()
    #: for PANDAS_DFROWS this is the collection
    collection = StringField()
    #: for PYTHON_DATA this is the actual document
    objid = ObjectIdField()
    #: omegaml technical attributes, e.g. column indices
    kind_meta = DictField()
    #: customer-defined other meta attributes
    attributes = DictField()
    #: s3file attributes
    s3file = DictField()
    #: location URI
    uri = StringField()
    #: created datetime, defaults to the time the object is created
    created = DateTimeField(default=datetime.datetime.now)
    #: modified datetime, defaults to the time the object is created
    modified = DateTimeField(default=datetime.datetime.now)
def make_Metadata(db_alias='omega', collection=None):
    """
    Create a Metadata document class for the given db alias

    Args:
        db_alias (str): the mongoengine db alias used by the document and
            by its gridfile field, defaults to 'omega'
        collection (str): the collection name of the gridfile field,
            defaults to settings().OMEGA_MONGO_COLLECTION

    Returns:
        the Metadata class, a subclass of the module-level Metadata and
        of mongoengine's Document
    """
    # this is to create context specific Metadata class that takes the
    # database from the given alias at the time of use
    from omegaml.documents import Metadata as Metadata_base
    collection = collection or settings().OMEGA_MONGO_COLLECTION

    class Metadata(Metadata_base, Document):
        # override db_alias in gridfile
        gridfile = FileField(
            db_alias=db_alias,
            collection_name=collection)
        # the actual db is defined at runtime
        meta = {
            'db_alias': db_alias,
            'strict': False,
            'indexes': [
                # unique entry
                {
                    'fields': ['bucket', 'prefix', 'name'],
                },
                'created',  # most recent is last, i.e. [-1]
            ]
        }

        def __new__(cls, *args, **kwargs):
            # undo the Metadata.__new__ protection
            newcls = super(Metadata, cls).__real_new__(cls)
            return newcls

        def __eq__(self, other):
            # two Metadata objects are equal if they have the same objid.
            # Objects without an objid attribute (e.g. None) never compare
            # equal, previously this raised an AttributeError.
            # NOTE(review): as __eq__ is defined without __hash__, Python
            # sets __hash__ to None for this class - confirm that is intended
            if not hasattr(other, 'objid'):
                return False
            return self.objid == other.objid

        def __unicode__(self):
            fields = ('name', 'bucket', 'prefix', 'created', 'kind')
            kwargs = ('%s=%s' % (k, getattr(self, k))
                      for k in self._fields if k in fields)
            return u"Metadata(%s)" % ','.join(kwargs)

        def save(self, *args, **kwargs):
            """
            Save the document, updating the modified datetime

            Raises:
                AssertionError: if name is None
            """
            # raise explicitly so that the check is kept when running with
            # python -O, which removes assert statements
            if self.name is None:
                raise AssertionError("a dataset name is needed before saving")
            self.modified = datetime.datetime.now()
            return super(Metadata_base, self).save(*args, **kwargs)

        def to_json(self, **kwargs):
            # use LEGACY_JSON_OPTIONS unless the caller passes json_options
            kwargs.setdefault('json_options', LEGACY_JSON_OPTIONS)
            return super().to_json(**kwargs)

        def to_dict(self):
            # the dict form of the document as returned by to_mongo()
            return self.to_mongo().to_dict()

        @classmethod
        def ensure_indexes(cls):
            # work around to https://github.com/MongoEngine/mongoengine/issues/2502
            # read-only users cannot create indexes
            try:
                super().ensure_indexes()
            except OperationFailure:
                pass

    return Metadata


def make_QueryCache(db_alias='omega'):
    """
    Create a QueryCache document class for the given db alias

    Args:
        db_alias (str): the mongoengine db alias, defaults to 'omega'

    Returns:
        the QueryCache class, a mongoengine Document with the fields
        collection, key and value, indexed on key
    """

    class QueryCache(Document):
        collection = StringField()
        key = StringField()
        value = DictField()
        meta = {
            'db_alias': db_alias,
            'indexes': [
                'key',
            ]
        }

    return QueryCache


def raise_on_use(exc):
    """
    Return a callable that raises exc whenever it is called

    Args:
        exc (Exception): the exception to raise

    Returns:
        a function that accepts any arguments and raises exc
    """

    def inner(*args, **kwargs):
        raise exc

    return inner


# keep the original __new__ available as __real_new__, it is used by the class
# created in make_Metadata(). Then make direct instantiation of Metadata fail.
Metadata.__real_new__ = Metadata.__new__
Metadata.__new__ = raise_on_use(NameError("You must use make_Metadata()() to instantiate a working object"))