Source code for omegaml.omega

  1import os
  2from uuid import uuid4
  3
  4from omegaml.util import inprogress
  5from ._version import version
  6from .mixins.store.requests import CombinedStoreRequestCache
  7from .store.combined import CombinedOmegaStoreMixin
  8from .store.logging import OmegaSimpleLogger
  9
 10
class Omega(CombinedStoreRequestCache, CombinedOmegaStoreMixin):
    """
    Client API to omegaml

    Provides the following APIs:

    * :code:`datasets` - access to datasets stored in the cluster
    * :code:`models` - access to models stored in the cluster
    * :code:`runtime` - access to the cluster compute resources
    * :code:`jobs` - access to jobs stored and executed in the cluster
    * :code:`scripts` - access to lambda modules stored and executed in the cluster
    * :code:`streams` - access to streams (minibatch integration)

    """

    def __init__(self, defaults=None, mongo_url=None, celeryconf=None, bucket=None,
                 **kwargs):
        """
        Initialize the client API

        Without arguments create the client API according to the user's
        configuration in :code:`~/omegaml/config.yml`.

        Arguments override the user's configuration.

        :param defaults: the DefaultsContext
        :param mongo_url: the fully qualified URI to the mongo database,
           of format :code:`mongodb://user:password@host:port/database`
        :param celeryconf: the celery configuration dictionary
        :param bucket: the bucket name passed to all stores and the runtime,
           or None for the default bucket
        :param kwargs: accepted (e.g. for subclasses) but not used here
        """
        from omegaml.util import settings
        # avoid circular imports
        from omegaml.notebook.jobs import OmegaJobs
        # celery and mongo configuration
        self.defaults = defaults or settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket
        # keep the celery configuration so that _clone() (e.g. om['bucket'])
        # builds its runtime with the same settings
        self._celeryconf = celeryconf
        # setup storage locations
        self._dbalias = self._make_dbalias()
        self.models = self._make_store(prefix='models/')
        self.datasets = self._make_store(prefix='data/')
        self._jobdata = self._make_store(prefix='jobs/')
        self.scripts = self._make_store(prefix='scripts/')
        # minibatch integration
        self.streams = self._make_streams(prefix='streams/')
        # runtimes environments
        self.runtime = self._make_runtime(celeryconf)
        self.jobs = OmegaJobs(store=self._jobdata, defaults=self.defaults)
        # logger
        self.logger = OmegaSimpleLogger(store=self.datasets, defaults=self.defaults)
        # stores, queried by .buckets
        self._stores = [self.models, self.datasets, self.scripts,
                        self.jobs, self.streams]
        # monitoring
        self._monitor = None  # will be created by .status() on first access

    def __repr__(self):
        return f'Omega(bucket={self.bucket})'

    def _clone(self, **kwargs):
        """Create a new instance of this class that shares this instance's settings.

        The defaults, the mongo_url and, if one was given, the celery
        configuration are carried over. ``kwargs`` (e.g. ``bucket``) are
        passed to the constructor; an explicit ``celeryconf`` in ``kwargs``
        takes precedence.
        """
        # getattr: tolerate subclass instances that did not run Omega.__init__
        celeryconf = getattr(self, '_celeryconf', None)
        if celeryconf is not None:
            kwargs.setdefault('celeryconf', celeryconf)
        return self.__class__(defaults=self.defaults,
                              mongo_url=self.mongo_url,
                              **kwargs)

    def _make_runtime(self, celeryconf):
        from omegaml.runtimes import OmegaRuntime
        return OmegaRuntime(self, bucket=self.bucket, defaults=self.defaults, celeryconf=celeryconf)

    def _make_store(self, prefix):
        from omegaml.store import OmegaStore
        return OmegaStore(mongo_url=self.mongo_url, bucket=self.bucket, prefix=prefix, defaults=self.defaults,
                          dbalias=self._dbalias)

    def _make_dbalias(self):
        # a unique alias per instance, shared by all stores created by _make_store
        return 'omega-{}'.format(uuid4().hex)

    def _make_streams(self, prefix):
        from omegaml.store.streams import StreamsProxy
        return StreamsProxy(mongo_url=self.mongo_url, bucket=self.bucket, prefix=prefix, defaults=self.defaults)

    def _make_monitor(self):
        """Create the LunaMonitor used by .status().

        Results of the 'health' check are logged as 'monitor' events to the
        '.system' experiment; other checks are not logged.
        """
        import weakref
        from omegaml.client.lunamon import LunaMonitor, OmegaMonitors
        status_logger = self.runtime.experiment('.system')
        logged_keys = ('status', 'message', 'error', 'elapsed')

        def on_status(event):
            if event['check'] != 'health':
                return None
            details = {key: event[key] for key in logged_keys if key in event}
            return status_logger.use().log_event('monitor', event['check'], details)

        monitor = LunaMonitor(checks=OmegaMonitors.on(self), on_status=on_status, interval=15)
        # NOTE(review): weakref.finalize requires that the callback does not reference
        # self; if the monitor's checks hold a strong reference to this instance,
        # the finalizer keeps it alive and monitor.stop only runs at interpreter exit.
        # Verify against OmegaMonitors.on().
        weakref.finalize(self, monitor.stop)
        return monitor

    def status(self, check=None, data=False, by_status=False, wait=False):
        """ get the status of the omegaml cluster

        Args:
            check (str): the check to run, e.g. 'storage', 'runtime'
            data (bool): return the monitoring data for the check
            by_status (bool): return data by status
            wait (bool): wait for the checks to report ok before returning

        Returns:
            dict
        """
        if self._monitor is None:
            # we defer the creation of the monitor to the first access
            # -- this is to avoid creating a monitor for every instance
            # -- e.g. in runtime where the instance is created for short-lived tasks,
            #    the monitor would be created and immediately gc'd
            self._monitor = self._make_monitor()
        if wait:
            self._check_connections()
        return self._monitor.status(check=check, data=data, by_status=by_status)

    def _check_connections(self):
        return self._monitor.wait_ok()

    def __getitem__(self, bucket):
        """
        return Omega instance configured for the given bucket

        Args:
            bucket (str): the bucket name. If it does not exist
                it gets created on first storage of an object.
                If bucket=None returns self.

        Usage:
            import omegaml as om

            # om is configured on the default bucket
            # om_mybucket will use the same settings, but configured for mybucket
            om_mybucket = om['mybucket']

        Returns:
            Omega instance configured for the given bucket
        """
        return self._get_bucket(bucket)

    @property
    def buckets(self):
        """list of bucket names reported by any of the stores (unordered, no duplicates)"""
        from itertools import chain
        return list(set(chain.from_iterable(getattr(store, 'buckets', []) for store in self._stores)))

    def _get_bucket(self, bucket):
        # kept separate from __getitem__ to enable patching in testing
        if bucket is None or bucket == self.bucket or (bucket == 'default' and self.bucket is None):
            return self
        if bucket == 'default':
            # actual bucket selection is a responsibility of the store, thus we pass None
            # (it is for good reason: OmegaStore should be instantiatable without giving a specific bucket)
            bucket = None
        return self._clone(bucket=bucket)
class OmegaDeferredInstance(object):
    """
    A deferred instance of Omega() that is only instantiated on access

    This is to ensure that module-level imports don't trigger instantiation
    of Omega.
    """

    def __init__(self, base=None, attribute=None):
        self.omega = 'not initialized -- call .setup() or access an attribute'
        self.initialized = False
        # if base is given, attribute access is forwarded to getattr(base, attribute)
        self.base = base
        self.attribute = attribute

    @inprogress(text='loading ...')
    def setup(self, *args, **kwargs):
        """loads omegaml

        Loading order
        - cloud using explicit arguments (userid, apikey, api_url, qualifier)
        - cloud using environment (OMEGA_USERID, OMEGA_APIKEY)
        - cloud using config file
        - local instance

        The first instance created is kept as this deferred instance's target;
        later calls return a new instance without replacing it.

        Returns:
            omega instance

        Raises:
            AssertionError: if every applicable loader failed
            Exception: re-raised from the environment or config-file loaders,
                which do not fall back to the next loader
        """
        from omegaml import _base_config

        @inprogress(text='loading base ...')
        def setup_base():
            _base_config.load_framework_support()
            _base_config.load_user_extensions()
            return Omega(*args, **kwargs)

        @inprogress(text='loading cloud ...')
        def setup_cloud():
            from omegaml.client.cloud import setup
            return setup(*args, **kwargs)

        @inprogress(text='loading cloud from env ...')
        def setup_env():
            from omegaml.client.cloud import setup
            return setup(userid=os.environ['OMEGA_USERID'], apikey=os.environ['OMEGA_APIKEY'],
                         qualifier=os.environ.get('OMEGA_QUALIFIER'))

        @inprogress(text='loading cloud from config ...')
        def setup_cloud_config():
            from omegaml.client.cloud import setup_from_config
            return setup_from_config(fallback=setup_base)

        omega = None
        from_args = len(args) > 0 or any(kw in kwargs for kw in ('userid', 'apikey', 'api_url', 'qualifier'))
        # both credentials must be present (subset, not strict subset of the environment)
        from_env = {'OMEGA_USERID', 'OMEGA_APIKEY'} <= set(os.environ)
        from_config = _base_config.OMEGA_CONFIG_FILE and os.path.exists(_base_config.OMEGA_CONFIG_FILE)
        loaders = ((from_args, setup_cloud), (from_env, setup_env),
                   (from_config, setup_cloud_config), (True, setup_base))
        # failures of these loaders are fatal instead of falling back to the next loader
        must_load = (from_env, setup_env), (from_config, setup_cloud_config)
        errors = []
        for condition, loader in loaders:
            if not condition:
                continue
            try:
                omega = loader()
            except Exception as e:
                errors.append((loader, e))
                if any(enabled and loader is required for enabled, required in must_load):
                    raise
            else:
                break
        else:
            # no loader succeeded (setup_base is always attempted last);
            # raise explicitly so this is not skipped under python -O
            raise AssertionError(f"failed to load omega due to {errors}") from (errors[-1][1] if errors else None)

        if not self.initialized:
            self.initialized = True
            self.omega = omega
        return omega

    def __getattr__(self, name):
        # __getattr__ only runs for attributes not found normally. Never forward
        # our own state: if it is missing (e.g. during copy/unpickle before
        # __init__ ran), looking it up here would recurse endlessly.
        if name in ('omega', 'initialized', 'base', 'attribute'):
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {name!r}")
        if self.base:
            base = getattr(self.base, self.attribute)
            return getattr(base, name)
        if not self.initialized:
            self.setup()
        return getattr(self.omega, name)

    def __getitem__(self, bucket):
        if self.base:
            base = getattr(self.base, self.attribute)
            return base[bucket]
        if not self.initialized:
            self.setup()
        return self.omega[bucket]

    def __repr__(self):
        if self.base:
            return repr(getattr(self.base, self.attribute))
        # only load once; repeated repr() must not re-run the loaders
        if not self.initialized:
            self.setup()
        return repr(self.omega)

    def __call__(self, *args, **kwargs):
        if self.base:
            base = getattr(self.base, self.attribute)
            return base(*args, **kwargs)
        if not self.initialized:
            self.setup()
        # delegate to the loaded instance (calling self here would recurse forever)
        return self.omega(*args, **kwargs)

    @property
    def instance(self):
        # NOTE(review): returns the forwarding base object, not the loaded omega instance
        return self.base


def setup(*args, **kwargs):
    """
    configure and return the omega client instance
    """
    return _om.setup(*args, **kwargs)


# default instance
# -- this is a deferred instance, that is the actual Omega instance
#    is only created on actual attribute access
_om = OmegaDeferredInstance()
version = version  # ensure we keep imports