1from __future__ import absolute_import
2
3import datetime
4from mongoengine.base.fields import ObjectIdField
5from mongoengine.document import Document
6from mongoengine.fields import (
7 StringField, FileField, DictField, DateTimeField
8)
9from mongoengine.pymongo_support import LEGACY_JSON_OPTIONS
10from pymongo.errors import OperationFailure
11
12from omegaml.util import settings
13
14
15# default kinds of objects
class MDREGISTRY:
    """Default kinds of objects.

    Every constant holds the kind string of one object type; ``KINDS``
    lists the kinds that are accepted by default.
    """
    PANDAS_DFROWS = 'pandas.dfrows'  # dataframe
    PANDAS_SEROWS = 'pandas.serows'  # series
    PANDAS_HDF = 'pandas.hdf'
    PYTHON_DATA = 'python.data'
    PANDAS_DFGROUP = 'pandas.dfgroup'
    SKLEARN_JOBLIB = 'sklearn.joblib'
    OMEGAML_JOBS = 'script.ipynb'
    SPARK_MLLIB = 'spark.mllib'
    OMEGAML_RUNNING_JOBS = 'job.run'
    MINIBATCH_STREAM = 'stream.minibatch'

    #: the list of accepted data types. extend using OmegaStore.register_backend
    KINDS = [
        PANDAS_DFROWS,
        PANDAS_SEROWS,
        PANDAS_HDF,
        PYTHON_DATA,
        SKLEARN_JOBLIB,
        PANDAS_DFGROUP,
        OMEGAML_JOBS,
        OMEGAML_RUNNING_JOBS,
        SPARK_MLLIB,
        MINIBATCH_STREAM,
    ]
33
34
77
78
def make_Metadata(db_alias='omega', collection=None):
    """Create a context specific Metadata document class.

    The returned class subclasses the module level ``Metadata`` and
    mongoengine's ``Document`` and takes its database from the given
    alias at the time of use.

    Args:
        db_alias (str): the mongoengine connection alias used by the
            document and by its ``gridfile`` field, defaults to 'omega'
        collection (str): the collection name of the ``gridfile`` field,
            if not given (or empty) ``settings().OMEGA_MONGO_COLLECTION``
            is used

    Returns:
        the Metadata class bound to ``db_alias``
    """
    # imported at call time, the base class is looked up when a context
    # specific class is requested
    from omegaml.documents import Metadata as Metadata_base
    collection = collection or settings().OMEGA_MONGO_COLLECTION

    class Metadata(Metadata_base, Document):
        # override db_alias in gridfile
        gridfile = FileField(
            db_alias=db_alias,
            collection_name=collection)
        # the actual db is defined at runtime
        meta = {
            'db_alias': db_alias,
            'strict': False,
            'indexes': [
                # entry lookup key (note: the index is not declared unique)
                {
                    'fields': ['bucket', 'prefix', 'name'],
                },
                'created',  # most recent is last, i.e. [-1]
            ]
        }

        def __new__(cls, *args, **kwargs):
            # undo the Metadata.__new__ protection: the module keeps the
            # original constructor as __real_new__ on the base class
            instance = super(Metadata, cls).__real_new__(cls)
            return instance

        def __eq__(self, other):
            # anything without an objid (e.g. None) cannot be the same
            # entry, answer False instead of raising AttributeError
            try:
                other_objid = other.objid
            except AttributeError:
                return False
            return self.objid == other_objid

        def __unicode__(self):
            fields = ('name', 'bucket', 'prefix', 'created', 'kind')
            kwargs = ('%s=%s' % (k, getattr(self, k))
                      for k in self._fields.keys() if k in fields)
            return u"Metadata(%s)" % ','.join(kwargs)

        def save(self, *args, **kwargs):
            # raised explicitly so the check is kept when running with -O,
            # the exception type is the same as the former assert
            if self.name is None:
                raise AssertionError("a dataset name is needed before saving")
            self.modified = datetime.datetime.now()
            # resolves save() on the classes following Metadata_base in the MRO
            return super(Metadata_base, self).save(*args, **kwargs)

        def to_json(self, **kwargs):
            # use the legacy json options unless the caller passes its own
            kwargs.setdefault('json_options', LEGACY_JSON_OPTIONS)
            return super().to_json(**kwargs)

        def to_dict(self):
            return self.to_mongo().to_dict()

        @classmethod
        def ensure_indexes(cls):
            # work around to https://github.com/MongoEngine/mongoengine/issues/2502
            # read-only users cannot create indexes
            try:
                super().ensure_indexes()
            except OperationFailure:
                # deliberately ignored, see above
                pass

    return Metadata
139
140
def make_QueryCache(db_alias='omega'):
    """Create a QueryCache document class bound to a database alias.

    Args:
        db_alias (str): the mongoengine connection alias set in the
            document's ``meta``, defaults to 'omega'

    Returns:
        the QueryCache document class
    """
    class QueryCache(Document):
        # string and dict fields of a cache entry
        collection = StringField()
        key = StringField()
        value = DictField()
        # bound to the requested alias, indexed by key
        meta = dict(db_alias=db_alias, indexes=['key'])

    return QueryCache
154
155
def raise_on_use(exc):
    """Return a callable that raises ``exc`` whenever it is called.

    Args:
        exc: the exception instance (or exception class) to raise

    Returns:
        a function that accepts any arguments and always raises ``exc``
    """
    def inner(*args, **kwargs):
        if isinstance(exc, BaseException):
            # the very same instance is raised on every call, drop the
            # traceback left by the previous raise so that it does not
            # grow (and keep old frames alive) call after call
            raise exc.with_traceback(None)
        # exception classes are instantiated anew by raise
        raise exc

    return inner
161
162
# Keep the original constructor reachable as __real_new__ (it is called by
# the classes built in make_Metadata), then replace __new__ so that calling
# Metadata directly raises a NameError. The order of the two statements
# matters: the original must be saved before it is replaced.
Metadata.__real_new__ = Metadata.__new__
Metadata.__new__ = raise_on_use(NameError("You must use make_Metadata()() to instantiate a working object"))