Source code for omegaml.omega

  1import os
  2from uuid import uuid4
  3
  4from omegaml.util import inprogress
  5from ._version import version
  6from .mixins.store.requests import CombinedStoreRequestCache
  7from .store.combined import CombinedOmegaStoreMixin
  8from .store.logging import OmegaSimpleLogger
  9
 10
class Omega(CombinedStoreRequestCache, CombinedOmegaStoreMixin):
    """
    Client API to omegaml

    An instance exposes the following attributes:

    * :code:`datasets` - access to datasets stored in the cluster
    * :code:`models` - access to models stored in the cluster
    * :code:`runtime` - access to the cluster compute resources
    * :code:`jobs` - access to jobs stored and executed in the cluster
    * :code:`scripts` - access to lambda modules stored and executed in the cluster
    * :code:`streams` - the streams proxy (minibatch integration)
    * :code:`logger` - a simple logger backed by the datasets store

    """

    def __init__(self, defaults=None, mongo_url=None, celeryconf=None, bucket=None,
                 **kwargs):
        """
        Initialize the client API

        Called without arguments, the client API is created according to the
        user's configuration in :code:`~/omegaml/config.yml`.

        Any argument given overrides the user's configuration.

        :param defaults: the DefaultsContext, falls back to :code:`settings()`
        :param mongo_url: the fully qualified URI to the mongo database,
           of format :code:`mongodb://user:password@host:port/database`,
           falls back to :code:`defaults.OMEGA_MONGO_URL`
        :param celeryconf: the celery configuration dictionary
        :param bucket: the bucket name handed to the stores and the runtime
        :param kwargs: accepted for compatibility, not used by this method
        """
        from omegaml.util import settings

        # configuration: explicit arguments win over the defaults
        self.defaults = defaults or settings()
        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
        self.bucket = bucket
        # all stores of this instance share one db alias
        self._dbalias = self._make_dbalias()
        # object stores, one per prefix
        self.models = self._make_store(prefix='models/')
        self.datasets = self._make_store(prefix='data/')
        self.jobs = self._make_store(prefix='jobs/')
        self.scripts = self._make_store(prefix='scripts/')
        # minibatch integration
        self.streams = self._make_streams(prefix='streams/')
        # runtime environment
        self.runtime = self._make_runtime(celeryconf)
        # logging goes to the datasets store
        self.logger = OmegaSimpleLogger(store=self.datasets, defaults=self.defaults)
        # everything that .buckets inspects
        self._stores = [self.models, self.datasets, self.scripts, self.jobs, self.streams]
        # the monitor is created lazily by .status()
        self._monitor = None

    def __repr__(self):
        return 'Omega(bucket={})'.format(self.bucket)

    def _clone(self, **kwargs):
        # a new instance of the same class, sharing defaults and mongo_url
        factory = self.__class__
        return factory(defaults=self.defaults,
                       mongo_url=self.mongo_url,
                       **kwargs)

    def _make_runtime(self, celeryconf):
        from omegaml.runtimes import OmegaRuntime
        return OmegaRuntime(self,
                            bucket=self.bucket,
                            defaults=self.defaults,
                            celeryconf=celeryconf)

    def _make_store(self, prefix):
        from omegaml.store import OmegaStore
        return OmegaStore(mongo_url=self.mongo_url,
                          bucket=self.bucket,
                          prefix=prefix,
                          defaults=self.defaults,
                          dbalias=self._dbalias)

    def _make_dbalias(self):
        # a fresh uuid keeps the alias unique per call
        return f'omega-{uuid4().hex}'

    def _make_streams(self, prefix):
        from omegaml.store.streams import StreamsProxy
        return StreamsProxy(mongo_url=self.mongo_url,
                            bucket=self.bucket,
                            prefix=prefix,
                            defaults=self.defaults)

    def _make_monitor(self):
        import weakref
        from omegaml.client.lunamon import LunaMonitor, OmegaMonitors

        status_logger = self.runtime.experiment('.system')
        logged_fields = ('status', 'message', 'error', 'elapsed')

        def on_status(event):
            # only 'health' events are recorded, everything else is ignored
            if event['check'] != 'health':
                return None
            return status_logger.use().log_event(
                'monitor', event['check'],
                {field: event[field] for field in logged_fields if field in event})

        monitor = LunaMonitor(checks=OmegaMonitors.on(self), on_status=on_status, interval=15)
        # stop the monitor once this instance is finalized
        weakref.finalize(self, monitor.stop)
        return monitor

    def status(self, check=None, data=False, by_status=False, wait=False):
        """ get the status of the omegaml cluster

        Args:
            check (str): the check to run, e.g. 'storage', 'runtime'
            data (bool): return the monitoring data for the check
            by_status (bool): return data by status
            wait (bool): wait for the check to complete

        Returns:
            dict
        """
        # The monitor is created on first access, not in __init__
        # -- instances created for short-lived tasks (e.g. in the runtime)
        #    would otherwise create a monitor that is immediately gc'd
        if self._monitor is None:
            self._monitor = self._make_monitor()
        if wait:
            self._check_connections()
        return self._monitor.status(check=check, data=data, by_status=by_status)

    def _check_connections(self):
        return self._monitor.wait_ok()

    def __getitem__(self, bucket):
        """
        return Omega instance configured for the given bucket

        Args:
            bucket (str): the bucket name. If it does not exist
               it gets created on first storage of an object.
               If bucket=None returns self.

        Usage:
            import omegaml as om

            # om is configured on the default bucket
            # om_mybucket will use the same settings, but configured for mybucket
            om_mybucket = om['mybucket']

        Returns:
            Omega instance configured for the given bucket
        """
        return self._get_bucket(bucket)

    @property
    def buckets(self):
        """ the distinct bucket names reported by the stores, as a list """
        from itertools import chain
        # stores without a .buckets attribute contribute nothing
        per_store = [getattr(store, 'buckets', []) for store in self._stores]
        return list(set(chain.from_iterable(per_store)))

    def _get_bucket(self, bucket):
        # kept as a separate method to enable patching in testing
        if bucket is None or bucket == self.bucket:
            return self
        if bucket == 'default':
            if self.bucket is None:
                # already on the default bucket
                return self
            # actual bucket selection is a responsibility of the store, thus we pass None
            # (it is for good reason: OmegaStore should be instantiatable without giving a specific bucket)
            bucket = None
        return self._clone(bucket=bucket)


class OmegaDeferredInstance(object):
    """
    A deferred instance of Omega() that is only instantiated on access

    This is to ensure that module-level imports don't trigger instantiation
    of Omega.

    When created with ``base`` and ``attribute``, every access is forwarded
    to ``getattr(base, attribute)`` instead of an own Omega instance.
    """

    def __init__(self, base=None, attribute=None):
        # placeholder until setup() stores the actual instance
        self.omega = 'not initialized -- call .setup() or access an attribute'
        self.initialized = False
        self.base = base
        self.attribute = attribute

    @inprogress(text='loading ...')
    def setup(self, *args, **kwargs):
        """loads omegaml

        Loading order (the first loader whose condition holds and that
        succeeds wins)
        - cloud using the given arguments (any positional argument, or one of
          userid, apikey, api_url, qualifier)
        - cloud using environment (OMEGA_USERID and OMEGA_APIKEY)
        - cloud using config file
        - local instance

        If loading from the environment or from the config file is attempted
        and fails, the error is re-raised instead of trying the next loader.

        Returns:
            omega instance

        Raises:
            AssertionError: if every attempted loader failed
        """
        from omegaml import _base_config

        @inprogress(text='loading base ...')
        def setup_base():
            from omegaml import _base_config
            _base_config.load_framework_support()
            _base_config.load_user_extensions()
            return Omega(*args, **kwargs)

        @inprogress(text='loading cloud ...')
        def setup_cloud():
            from omegaml.client.cloud import setup
            return setup(*args, **kwargs)

        @inprogress(text='loading cloud from env ...')
        def setup_env():
            from omegaml.client.cloud import setup
            return setup(userid=os.environ['OMEGA_USERID'], apikey=os.environ['OMEGA_APIKEY'],
                         qualifier=os.environ.get('OMEGA_QUALIFIER'))

        @inprogress(text='loading cloud from config ...')
        def setup_cloud_config():
            from omegaml.client.cloud import setup_from_config
            return setup_from_config(fallback=setup_base)

        omega = None
        from_args = len(args) > 0 or any(kw in kwargs for kw in ('userid', 'apikey', 'api_url', 'qualifier'))
        # plain membership test: a strict-subset comparison against the
        # environment would be False if it held exactly these two keys
        from_env = all(key in os.environ for key in ('OMEGA_USERID', 'OMEGA_APIKEY'))
        from_config = _base_config.OMEGA_CONFIG_FILE and os.path.exists(_base_config.OMEGA_CONFIG_FILE)
        loaders = ((from_args, setup_cloud), (from_env, setup_env),
                   (from_config, setup_cloud_config), (True, setup_base))
        # loaders whose failure must not be masked by falling through to the
        # next one -- they are only ever attempted if their condition holds
        must_load = (setup_env, setup_cloud_config)
        errors = []
        for condition, loader in loaders:
            if not condition:
                continue
            try:
                omega = loader()
            except Exception as e:
                errors.append((loader, e))
                if loader in must_load:
                    raise
            else:
                break
        else:
            # reached only if every attempted loader raised; raise explicitly
            # (not via assert) so the check also holds when running with -O
            raise AssertionError(f"failed to load omega due to {errors}") from errors[-1][1]

        # only the first successful setup is kept as the deferred instance,
        # later calls return their own result without replacing it
        if not self.initialized:
            self.initialized = True
            self.omega = omega
        return omega

    def __getattr__(self, name):
        # only called for names not found on the deferred instance itself
        if self.base:
            base = getattr(self.base, self.attribute)
            return getattr(base, name)
        if not self.initialized:
            self.setup()
        return getattr(self.omega, name)

    def __getitem__(self, bucket):
        if self.base:
            base = getattr(self.base, self.attribute)
            return base[bucket]
        if not self.initialized:
            self.setup()
        return self.omega[bucket]

    def __repr__(self):
        if self.base:
            return repr(getattr(self.base, self.attribute))
        # set up at most once, consistent with the other accessors
        if not self.initialized:
            self.setup()
        return repr(self.omega)

    def __call__(self, *args, **kwargs):
        if self.base:
            base = getattr(self.base, self.attribute)
            return base(*args, **kwargs)
        if not self.initialized:
            self.setup()
        # delegate to the actual instance; calling self again would recurse
        # without end
        return self.omega(*args, **kwargs)

    @property
    def instance(self):
        # returns the base object given at construction (None if there is none)
        return self.base


def setup(*args, **kwargs):
    """
    configure and return the omega client instance

    Arguments are passed on to the setup() method of the default deferred
    instance.
    """
    return _om.setup(*args, **kwargs)


# default instance
# -- this is a deferred instance, that is the actual Omega instance
#    is only created on first attribute access
_om = OmegaDeferredInstance()
version = version  # ensure we keep imports