1import os
2from uuid import uuid4
3
4from omegaml.util import inprogress
5from ._version import version
6from .mixins.store.requests import CombinedStoreRequestCache
7from .store.combined import CombinedOmegaStoreMixin
8from .store.logging import OmegaSimpleLogger
9
10
[docs]
class Omega(CombinedStoreRequestCache, CombinedOmegaStoreMixin):
    """
    Client API to omegaml

    An instance exposes these entry points as attributes:

    * :code:`datasets` - access to datasets stored in the cluster
    * :code:`models` - access to models stored in the cluster
    * :code:`runtime` - access to the cluster compute resources
    * :code:`jobs` - access to jobs stored and executed in the cluster
    * :code:`scripts` - access to lambda modules stored and executed in the cluster
    * :code:`streams` - access to streams (minibatch integration)
    * :code:`logger` - a simple logger backed by the datasets store

    """

    def __init__(self, defaults=None, mongo_url=None, celeryconf=None, bucket=None,
                 **kwargs):
        """
        Initialize the client API

        When called without arguments the client is set up from the user's
        configuration in :code:`~/omegaml/config.yml`. Every argument that
        is given takes precedence over that configuration.

        :param defaults: the DefaultsContext, falls back to
           :code:`omegaml.util.settings()` when not given
        :param mongo_url: the fully qualified URI to the mongo database,
           of format :code:`mongodb://user:password@host:port/database`,
           falls back to :code:`defaults.OMEGA_MONGO_URL`
        :param celeryconf: the celery configuration dictionary
        :param bucket: the bucket handed to every store and to the runtime
        :param kwargs: accepted, but not used by this initializer
        """
        from omegaml.util import settings
        # imported here, not at module level, to avoid circular imports
        from omegaml.notebook.jobs import OmegaJobs
        # resolve configuration, explicit arguments win over settings
        self.defaults = defaults if defaults else settings()
        self.mongo_url = mongo_url if mongo_url else self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket
        # storage locations, all stores share one db alias
        self._dbalias = self._make_dbalias()
        self.models = self._make_store(prefix='models/')
        self.datasets = self._make_store(prefix='data/')
        self._jobdata = self._make_store(prefix='jobs/')
        self.scripts = self._make_store(prefix='scripts/')
        # minibatch integration
        self.streams = self._make_streams(prefix='streams/')
        # runtime environment and jobs
        self.runtime = self._make_runtime(celeryconf)
        self.jobs = OmegaJobs(store=self._jobdata, defaults=self.defaults)
        # logging is backed by the datasets store
        self.logger = OmegaSimpleLogger(store=self.datasets, defaults=self.defaults)
        # everything that is queried for buckets, see .buckets
        self._stores = [self.models, self.datasets, self.scripts, self.jobs, self.streams]
        # the monitor is created lazily by .status() on first access
        self._monitor = None

    def __repr__(self):
        return 'Omega(bucket={})'.format(self.bucket)

    def _clone(self, **kwargs):
        """ create a new instance of the same class, reusing defaults and mongo_url

        Args:
            **kwargs: additional keyword arguments for the constructor, e.g. bucket

        Returns:
            the new instance
        """
        factory = self.__class__
        return factory(defaults=self.defaults, mongo_url=self.mongo_url, **kwargs)

    def _make_runtime(self, celeryconf):
        """ create the OmegaRuntime bound to this instance and its bucket """
        from omegaml.runtimes import OmegaRuntime
        runtime = OmegaRuntime(self, bucket=self.bucket, defaults=self.defaults,
                               celeryconf=celeryconf)
        return runtime

    def _make_store(self, prefix):
        """ create an OmegaStore for the given prefix, sharing this instance's db alias """
        from omegaml.store import OmegaStore
        store = OmegaStore(mongo_url=self.mongo_url, bucket=self.bucket, prefix=prefix,
                           defaults=self.defaults, dbalias=self._dbalias)
        return store

    def _make_dbalias(self):
        """ return a new random db alias of the form omega-<uuid4 hex> """
        return f'omega-{uuid4().hex}'

    def _make_streams(self, prefix):
        """ create the StreamsProxy for the given prefix """
        from omegaml.store.streams import StreamsProxy
        streams = StreamsProxy(mongo_url=self.mongo_url, bucket=self.bucket, prefix=prefix,
                               defaults=self.defaults)
        return streams

    def _make_monitor(self):
        """ create the LunaMonitor for this instance

        Status events of the 'health' check are logged as 'monitor' events
        to the '.system' experiment of the runtime. The monitor's stop()
        is registered as a finalizer of this instance.

        Returns:
            the LunaMonitor
        """
        import weakref
        from omegaml.client.lunamon import LunaMonitor, OmegaMonitors
        status_logger = self.runtime.experiment('.system')

        def for_keys(event, keys):
            # subset of the event, limited to the keys that are present
            return {k: event[k] for k in keys if k in event}

        def on_status(event):
            # only events of the health check are logged, all others are ignored
            if event['check'] == 'health':
                return status_logger.use().log_event('monitor', event['check'],
                                                     for_keys(event,
                                                              ('status', 'message', 'error',
                                                               'elapsed')))
            return None

        monitor = LunaMonitor(checks=OmegaMonitors.on(self), on_status=on_status, interval=15)
        weakref.finalize(self, monitor.stop)
        return monitor

    def status(self, check=None, data=False, by_status=False, wait=False):
        """ get the status of the omegaml cluster

        Args:
            check (str): the check to run, e.g. 'storage', 'runtime'
            data (bool): return the monitoring data for the check
            by_status (bool): return data by status
            wait (bool): wait for the check to complete

        Returns:
            dict
        """
        # the monitor is only created once status is actually requested
        # -- creating it in __init__ would start a monitor for every instance
        # -- e.g. the runtime creates instances for short-lived tasks, where
        #    the monitor would be created and immediately gc'd
        if self._monitor is None:
            self._monitor = self._make_monitor()
        if wait:
            self._check_connections()
        monitor = self._monitor
        return monitor.status(check=check, data=data, by_status=by_status)

    def _check_connections(self):
        """ return the result of the monitor's wait_ok(), requires the monitor to exist """
        return self._monitor.wait_ok()

    def __getitem__(self, bucket):
        """
        return Omega instance configured for the given bucket

        Args:
            bucket (str): the bucket name. If it does not exist
               it gets created on first storage of an object.
               If bucket=None returns self.

        Usage:
            import omegaml as om

            # om is configured on the default bucket
            # om_mybucket will use the same settings, but configured for mybucket
            om_mybucket = om['mybucket']

        Returns:
            Omega instance configured for the given bucket
        """
        return self._get_bucket(bucket)

    @property
    def buckets(self):
        """ list of the distinct buckets reported by the stores of this instance

        Stores that do not provide a :code:`buckets` attribute contribute nothing.
        """
        from itertools import chain
        per_store = [getattr(store, 'buckets', []) for store in self._stores]
        return list(set(chain.from_iterable(per_store)))

    def _get_bucket(self, bucket):
        """ return the instance for the bucket, self if it is the current bucket

        This is a separate method to enable patching in testing.
        """
        if bucket is None or bucket == self.bucket:
            return self
        is_default = bucket == 'default'
        if is_default and self.bucket is None:
            # this instance already is on the default bucket
            return self
        # actual bucket selection is a responsibility of the store, thus 'default'
        # is passed as None (it is for good reason: OmegaStore should be
        # instantiatable without giving a specific bucket)
        return self._clone(bucket=None if is_default else bucket)
163
164
class OmegaDeferredInstance(object):
    """
    A deferred instance of Omega() that is only instantiated on access

    This is to ensure that module-level imports don't trigger instantiation
    of Omega.

    There are two modes of operation:

    * without :code:`base`, the Omega instance is created by :code:`setup()`,
      which is called implicitly on first attribute access, item access,
      call or repr
    * with :code:`base` and :code:`attribute`, every access is delegated to
      :code:`getattr(base, attribute)`, and :code:`setup()` is never called
      implicitly
    """

    def __init__(self, base=None, attribute=None):
        """
        Args:
            base (object): optional, the object to delegate to
            attribute (str): optional, the name of the attribute of base
               that holds the actual instance
        """
        # placeholder until setup() stores the actual instance
        self.omega = 'not initialized -- call .setup() or access an attribute'
        self.initialized = False
        self.base = base
        self.attribute = attribute

    @inprogress(text='loading ...')
    def setup(self, *args, **kwargs):
        """loads omegaml

        Loading order, the first loader that applies and succeeds wins

        - cloud using the given arguments (positional arguments or any of
          userid, apikey, api_url, qualifier)
        - cloud using environment (OMEGA_USERID and OMEGA_APIKEY are set)
        - cloud using config file (the configured config file exists)
        - local instance

        If loading from the environment or from the config file applies
        but fails, the error is re-raised. A failure of loading from
        arguments falls through to the next loader.

        The first instance that is loaded is kept as the default instance,
        later calls return a new instance without replacing the default.

        Returns:
            omega instance

        Raises:
            AssertionError: if none of the loaders succeeded
        """
        from omegaml import _base_config

        @inprogress(text='loading base ...')
        def setup_base():
            _base_config.load_framework_support()
            _base_config.load_user_extensions()
            return Omega(*args, **kwargs)

        @inprogress(text='loading cloud ...')
        def setup_cloud():
            from omegaml.client.cloud import setup
            return setup(*args, **kwargs)

        @inprogress(text='loading cloud from env ...')
        def setup_env():
            from omegaml.client.cloud import setup
            return setup(userid=os.environ['OMEGA_USERID'], apikey=os.environ['OMEGA_APIKEY'],
                         qualifier=os.environ.get('OMEGA_QUALIFIER'))

        @inprogress(text='loading cloud from config ...')
        def setup_cloud_config():
            from omegaml.client.cloud import setup_from_config
            return setup_from_config(fallback=setup_base)

        omega = None
        from_args = len(args) > 0 or any(kw in kwargs for kw in ('userid', 'apikey', 'api_url', 'qualifier'))
        # non-strict subset: both variables must be set, nothing else is required
        from_env = {'OMEGA_USERID', 'OMEGA_APIKEY'}.issubset(os.environ)
        from_config = _base_config.OMEGA_CONFIG_FILE and os.path.exists(_base_config.OMEGA_CONFIG_FILE)
        loaders = ((from_args, setup_cloud), (from_env, setup_env),
                   (from_config, setup_cloud_config), (True, setup_base))
        # loaders whose failure must not be hidden by falling back to the next loader
        must_load = (from_env, setup_env), (from_config, setup_cloud_config)
        errors = []
        for condition, loader in loaders:
            if not condition:
                continue
            try:
                omega = loader()
            except Exception as e:
                errors.append((loader, e))
                if any(applies and loader is expected for applies, expected in must_load):
                    raise
            else:
                break
        else:
            # no loader succeeded -- raised explicitly instead of using an assert
            # statement, which is removed when python runs with -O
            raise AssertionError(f"failed to load omega due to {errors}")

        if not self.initialized:
            self.initialized = True
            self.omega = omega
        return omega

    def __getattr__(self, name):
        # only called for attributes that are not set on this object itself
        if self.base:
            base = getattr(self.base, self.attribute)
            return getattr(base, name)
        if not self.initialized:
            self.setup()
        return getattr(self.omega, name)

    def __getitem__(self, bucket):
        if self.base:
            base = getattr(self.base, self.attribute)
            return base[bucket]
        if not self.initialized:
            self.setup()
        return self.omega[bucket]

    def __repr__(self):
        if self.base:
            return repr(getattr(self.base, self.attribute))
        # only load once, same as for attribute and item access
        if not self.initialized:
            self.setup()
        return repr(self.omega)

    def __call__(self, *args, **kwargs):
        if self.base:
            base = getattr(self.base, self.attribute)
            return base(*args, **kwargs)
        if not self.initialized:
            self.setup()
        # delegate to the actual instance -- calling self here would re-enter
        # __call__ and recurse without end
        return self.omega(*args, **kwargs)

    @property
    def instance(self):
        """ the base object given on creation, None if there is no base """
        return self.base
276
277
def setup(*args, **kwargs):
    """
    configure the default deferred instance and return the omega client

    All arguments are passed on unchanged to the :code:`setup()` method
    of the module-level default instance.
    """
    client = _om.setup(*args, **kwargs)
    return client
283
284
# default instance
# -- this is a deferred instance, that is the actual Omega instance
#    is only created on first access or by calling setup()
_om = OmegaDeferredInstance()
# re-bind the imported name so that the import of version is kept
version = version