1import os
2from uuid import uuid4
3
4from omegaml.util import inprogress
5from ._version import version
6from .mixins.store.requests import CombinedStoreRequestCache
7from .store.combined import CombinedOmegaStoreMixin
8from .store.logging import OmegaSimpleLogger
9
10
[docs]
11class Omega(CombinedStoreRequestCache, CombinedOmegaStoreMixin):
12 """
13 Client API to omegaml
14
15 Provides the following APIs:
16
17 * :code:`datasets` - access to datasets stored in the cluster
18 * :code:`models` - access to models stored in the cluster
19 * :code:`runtimes` - access to the cluster compute resources
20 * :code:`jobs` - access to jobs stored and executed in the cluster
21 * :code:`scripts` - access to lambda modules stored and executed in the cluster
22
23 """
24
25 def __init__(self, defaults=None, mongo_url=None, celeryconf=None, bucket=None,
26 **kwargs):
27 """
28 Initialize the client API
29
30 Without arguments create the client API according to the user's
31 configuration in :code:`~/omegaml/config.yml`.
32
33 Arguments override the user's configuration.
34
35 :param defaults: the DefaultsContext
36 :param mongo_url: the fully qualified URI to the mongo database,
37 of format :code:`mongodb://user:password@host:port/database`
38 :param celeryconf: the celery configuration dictionary
39 """
40 from omegaml.util import settings
41 # celery and mongo configuration
42 self.defaults = defaults or settings()
43 self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
44 self.bucket = bucket
45 # setup storage locations
46 self._dbalias = self._make_dbalias()
47 self.models = self._make_store(prefix='models/')
48 self.datasets = self._make_store(prefix='data/')
49 self.jobs = self._make_store(prefix='jobs/')
50 self.scripts = self._make_store(prefix='scripts/')
51 # minibatch integration
52 self.streams = self._make_streams(prefix='streams/')
53 # runtimes environments
54 self.runtime = self._make_runtime(celeryconf)
55 # logger
56 self.logger = OmegaSimpleLogger(store=self.datasets, defaults=self.defaults)
57 # stores
58 self._stores = [self.models, self.datasets, self.scripts, self.jobs, self.streams]
59 # monitoring
60 self._monitor = None # will be created by .status() on first access
61
62 def __repr__(self):
63 return f'Omega(bucket={self.bucket})'
64
65 def _clone(self, **kwargs):
66 return self.__class__(defaults=self.defaults,
67 mongo_url=self.mongo_url,
68 **kwargs)
69
70 def _make_runtime(self, celeryconf):
71 from omegaml.runtimes import OmegaRuntime
72 return OmegaRuntime(self, bucket=self.bucket, defaults=self.defaults, celeryconf=celeryconf)
73
74 def _make_store(self, prefix):
75 from omegaml.store import OmegaStore
76 return OmegaStore(mongo_url=self.mongo_url, bucket=self.bucket, prefix=prefix, defaults=self.defaults,
77 dbalias=self._dbalias)
78
79 def _make_dbalias(self):
80 return 'omega-{}'.format(uuid4().hex)
81
82 def _make_streams(self, prefix):
83 from omegaml.store.streams import StreamsProxy
84 return StreamsProxy(mongo_url=self.mongo_url, bucket=self.bucket, prefix=prefix, defaults=self.defaults)
85
86 def _make_monitor(self):
87 import weakref
88 from omegaml.client.lunamon import LunaMonitor, OmegaMonitors
89 status_logger = self.runtime.experiment('.system')
90 for_keys = lambda event, keys: {k: event[k] for k in keys if k in event}
91 on_status = lambda event: (status_logger.use().log_event('monitor', event['check'],
92 for_keys(event,
93 ('status', 'message', 'error',
94 'elapsed')))
95 if event['check'] == 'health' else None)
96 monitor = LunaMonitor(checks=OmegaMonitors.on(self), on_status=on_status, interval=15)
97 weakref.finalize(self, monitor.stop)
98 return monitor
99
[docs]
100 def status(self, check=None, data=False, by_status=False, wait=False):
101 """ get the status of the omegaml cluster
102
103 Args:
104 check (str): the check to run, e.g. 'storage', 'runtime'
105 data (bool): return the monitoring data for the check
106 by_status (bool): return data by status
107 wait (bool): wait for the check to complete
108
109 Returns:
110 dict
111 """
112 if self._monitor is None:
113 # we defer the creation of the monitor to the first access
114 # -- this is to avoid creating a monitor for every instance
115 # -- e.g. in runtime where the instance is created for short-lived tasks,
116 # the monitor would be created and immediately gc'd
117 self._monitor = self._make_monitor()
118 if wait:
119 self._check_connections()
120 return self._monitor.status(check=check, data=data, by_status=by_status)
121
122 def _check_connections(self):
123 return self._monitor.wait_ok()
124
125 def __getitem__(self, bucket):
126 """
127 return Omega instance configured for the given bucket
128
129 Args:
130 bucket (str): the bucket name. If it does not exist
131 it gets created on first storage of an object.
132 If bucket=None returns self.
133
134 Usage:
135 import omegaml as om
136
137 # om is configured on the default bucket
138 # om_mybucket will use the same settings, but configured for mybucket
139 om_mybucket = om['mybucket']
140
141 Returns:
142 Omega instance configured for the given bucket
143 """
144 return self._get_bucket(bucket)
145
146 @property
147 def buckets(self):
148 from itertools import chain
149 return list(set(chain(*[getattr(store, 'buckets', []) for store in self._stores])))
150
151 def _get_bucket(self, bucket):
152 # enable patching in testing
153 if bucket is None or bucket == self.bucket or (bucket == 'default' and self.bucket is None):
154 return self
155 if bucket == 'default':
156 # actual bucket selection is a responsibility of the store, thus we pass None
157 # (it is for good reason: OmegaStore should be instantiatable without giving a specific bucket)
158 bucket = None
159 return self._clone(bucket=bucket)
160
161
162class OmegaDeferredInstance(object):
163 """
164 A deferred instance of Omega() that is only instantiated on access
165
166 This is to ensure that module-level imports don't trigger instantiation
167 of Omega.
168 """
169
170 def __init__(self, base=None, attribute=None):
171 self.omega = 'not initialized -- call .setup() or access an attribute'
172 self.initialized = False
173 self.base = base
174 self.attribute = attribute
175
176 @inprogress(text='loading ...')
177 def setup(self, *args, **kwargs):
178 """loads omegaml
179
180 Loading order
181 - cloud using environment
182 - cloud using config file
183 - local instance
184
185 Returns:
186 omega instance
187 """
188 from omegaml import _base_config
189
190 @inprogress(text='loading base ...')
191 def setup_base():
192 from omegaml import _base_config
193 _base_config.load_framework_support()
194 _base_config.load_user_extensions()
195 return Omega(*args, **kwargs)
196
197 @inprogress(text='loading cloud ...')
198 def setup_cloud():
199 from omegaml.client.cloud import setup
200 return setup(*args, **kwargs)
201
202 @inprogress(text='loading cloud from env ...')
203 def setup_env():
204 from omegaml.client.cloud import setup
205 return setup(userid=os.environ['OMEGA_USERID'], apikey=os.environ['OMEGA_APIKEY'],
206 qualifier=os.environ.get('OMEGA_QUALIFIER'))
207
208 @inprogress(text='loading cloud from config ...')
209 def setup_cloud_config():
210 from omegaml.client.cloud import setup_from_config
211 return setup_from_config(fallback=setup_base)
212
213 omega = None
214 from_args = len(args) > 0 or any(kw in kwargs for kw in ('userid', 'apikey', 'api_url', 'qualifier'))
215 from_env = {'OMEGA_USERID', 'OMEGA_APIKEY'} < set(os.environ)
216 from_config = _base_config.OMEGA_CONFIG_FILE and os.path.exists(_base_config.OMEGA_CONFIG_FILE)
217 loaders = ((from_args, setup_cloud), (from_env, setup_env),
218 (from_config, setup_cloud_config), (True, setup_base))
219 must_load = (from_env, setup_env), (from_config, setup_cloud_config)
220 errors = []
221 for condition, loader in loaders:
222 if not condition:
223 continue
224 try:
225 omega = loader()
226 except Exception as e:
227 errors.append((loader, e))
228 if any(condition and loader is expected for condition, expected in must_load):
229 raise
230 else:
231 break
232 else:
233 assert omega is not None, f"failed to load omega due to {errors}"
234
235 if not self.initialized:
236 self.initialized = True
237 self.omega = omega
238 return omega
239
240 def __getattr__(self, name):
241 if self.base:
242 base = getattr(self.base, self.attribute)
243 return getattr(base, name)
244 if not self.initialized:
245 self.setup()
246 return getattr(self.omega, name)
247
248 def __getitem__(self, bucket):
249 if self.base:
250 base = getattr(self.base, self.attribute)
251 return base[bucket]
252 if not self.initialized:
253 self.setup()
254 return self.omega[bucket]
255
256 def __repr__(self):
257 if self.base:
258 return repr(getattr(self.base, self.attribute))
259 self.setup()
260 return repr(self.omega)
261
262 def __call__(self, *args, **kwargs):
263 if self.base:
264 base = getattr(self.base, self.attribute)
265 return base(*args, **kwargs)
266 if not self.initialized:
267 self.setup()
268 return self(*args, **kwargs)
269
270 @property
271 def instance(self):
272 return self.base
273
274
275def setup(*args, **kwargs):
276 """
277 configure and return the omega client instance
278 """
279 return _om.setup(*args, **kwargs)
280
281
282# default instance
283# -- these are deferred instanced that is the actual Omega instance
284# is only created on actual attribute access
285_om = OmegaDeferredInstance()
286version = version # ensure we keep imports