1import os
  2from uuid import uuid4
  3
  4from omegaml.util import inprogress
  5from ._version import version
  6from .mixins.store.requests import CombinedStoreRequestCache
  7from .store.combined import CombinedOmegaStoreMixin
  8from .store.logging import OmegaSimpleLogger
  9
 10
[docs]
 11class Omega(CombinedStoreRequestCache, CombinedOmegaStoreMixin):
 12    """
 13    Client API to omegaml
 14
 15    Provides the following APIs:
 16
 17    * :code:`datasets` - access to datasets stored in the cluster
 18    * :code:`models` - access to models stored in the cluster
 19    * :code:`runtimes` - access to the cluster compute resources
 20    * :code:`jobs` - access to jobs stored and executed in the cluster
 21    * :code:`scripts` - access to lambda modules stored and executed in the cluster
 22
 23    """
 24
 25    def __init__(self, defaults=None, mongo_url=None, celeryconf=None, bucket=None,
 26                 **kwargs):
 27        """
 28        Initialize the client API
 29
 30        Without arguments create the client API according to the user's
 31        configuration in :code:`~/omegaml/config.yml`.
 32
 33        Arguments override the user's configuration.
 34
 35            :param defaults: the DefaultsContext
 36        :param mongo_url: the fully qualified URI to the mongo database,
 37        of format :code:`mongodb://user:password@host:port/database`
 38        :param celeryconf: the celery configuration dictionary
 39        """
 40        from omegaml.util import settings
 41        # celery and mongo configuration
 42        self.defaults = defaults or settings()
 43        self.mongo_url = mongo_url or self.defaults.OMEGA_MONGO_URL
 44        self.bucket = bucket
 45        # setup storage locations
 46        self._dbalias = self._make_dbalias()
 47        self.models = self._make_store(prefix='models/')
 48        self.datasets = self._make_store(prefix='data/')
 49        self.jobs = self._make_store(prefix='jobs/')
 50        self.scripts = self._make_store(prefix='scripts/')
 51        # minibatch integration
 52        self.streams = self._make_streams(prefix='streams/')
 53        # runtimes environments
 54        self.runtime = self._make_runtime(celeryconf)
 55        # logger
 56        self.logger = OmegaSimpleLogger(store=self.datasets, defaults=self.defaults)
 57        # stores
 58        self._stores = [self.models, self.datasets, self.scripts, self.jobs, self.streams]
 59        # monitoring
 60        self._monitor = None  # will be created by .status() on first access
 61
 62    def __repr__(self):
 63        return f'Omega(bucket={self.bucket})'
 64
 65    def _clone(self, **kwargs):
 66        return self.__class__(defaults=self.defaults,
 67                              mongo_url=self.mongo_url,
 68                              **kwargs)
 69
 70    def _make_runtime(self, celeryconf):
 71        from omegaml.runtimes import OmegaRuntime
 72        return OmegaRuntime(self, bucket=self.bucket, defaults=self.defaults, celeryconf=celeryconf)
 73
 74    def _make_store(self, prefix):
 75        from omegaml.store import OmegaStore
 76        return OmegaStore(mongo_url=self.mongo_url, bucket=self.bucket, prefix=prefix, defaults=self.defaults,
 77                          dbalias=self._dbalias)
 78
 79    def _make_dbalias(self):
 80        return 'omega-{}'.format(uuid4().hex)
 81
 82    def _make_streams(self, prefix):
 83        from omegaml.store.streams import StreamsProxy
 84        return StreamsProxy(mongo_url=self.mongo_url, bucket=self.bucket, prefix=prefix, defaults=self.defaults)
 85
 86    def _make_monitor(self):
 87        import weakref
 88        from omegaml.client.lunamon import LunaMonitor, OmegaMonitors
 89        status_logger = self.runtime.experiment('.system')
 90        for_keys = lambda event, keys: {k: event[k] for k in keys if k in event}
 91        on_status = lambda event: (status_logger.use().log_event('monitor', event['check'],
 92                                                                 for_keys(event,
 93                                                                          ('status', 'message', 'error',
 94                                                                           'elapsed')))
 95                                   if event['check'] == 'health' else None)
 96        monitor = LunaMonitor(checks=OmegaMonitors.on(self), on_status=on_status, interval=15)
 97        weakref.finalize(self, monitor.stop)
 98        return monitor
 99
[docs]
100    def status(self, check=None, data=False, by_status=False, wait=False):
101        """ get the status of the omegaml cluster
102
103        Args:
104            check (str): the check to run, e.g. 'storage', 'runtime'
105            data (bool): return the monitoring data for the check
106            by_status (bool): return data by status
107            wait (bool): wait for the check to complete
108
109        Returns:
110            dict 
111        """
112        if self._monitor is None:
113            # we defer the creation of the monitor to the first access
114            # -- this is to avoid creating a monitor for every instance
115            # -- e.g. in runtime where the instance is created for short-lived tasks,
116            #    the monitor would be created and immediately gc'd
117            self._monitor = self._make_monitor()
118        if wait:
119            self._check_connections()
120        return self._monitor.status(check=check, data=data, by_status=by_status) 
121
122    def _check_connections(self):
123        return self._monitor.wait_ok()
124
125    def __getitem__(self, bucket):
126        """
127        return Omega instance configured for the given bucket
128
129        Args:
130            bucket (str): the bucket name. If it does not exist
131                  it gets created on first storage of an object.
132                  If bucket=None returns self.
133
134        Usage:
135            import omegaml as om
136
137            # om is configured on the default bucket
138            # om_mybucket will use the same settings, but configured for mybucket
139            om_mybucket = om['mybucket']
140
141        Returns:
142            Omega instance configured for the given bucket
143        """
144        return self._get_bucket(bucket)
145
146    @property
147    def buckets(self):
148        from itertools import chain
149        return list(set(chain(*[getattr(store, 'buckets', []) for store in self._stores])))
150
151    def _get_bucket(self, bucket):
152        # enable patching in testing
153        if bucket is None or bucket == self.bucket or (bucket == 'default' and self.bucket is None):
154            return self
155        if bucket == 'default':
156            # actual bucket selection is a responsibility of the store, thus we pass None
157            # (it is for good reason: OmegaStore should be instantiatable without giving a specific bucket)
158            bucket = None
159        return self._clone(bucket=bucket) 
160
161
162class OmegaDeferredInstance(object):
163    """
164    A deferred instance of Omega() that is only instantiated on access
165
166    This is to ensure that module-level imports don't trigger instantiation
167    of Omega.
168    """
169
170    def __init__(self, base=None, attribute=None):
171        self.omega = 'not initialized -- call .setup() or access an attribute'
172        self.initialized = False
173        self.base = base
174        self.attribute = attribute
175
176    @inprogress(text='loading ...')
177    def setup(self, *args, **kwargs):
178        """loads omegaml
179
180        Loading order
181            - cloud using environment
182            - cloud using config file
183            - local instance
184
185        Returns:
186            omega instance
187        """
188        from omegaml import _base_config
189
190        @inprogress(text='loading base ...')
191        def setup_base():
192            from omegaml import _base_config
193            _base_config.load_framework_support()
194            _base_config.load_user_extensions()
195            return Omega(*args, **kwargs)
196
197        @inprogress(text='loading cloud ...')
198        def setup_cloud():
199            from omegaml.client.cloud import setup
200            return setup(*args, **kwargs)
201
202        @inprogress(text='loading cloud from env ...')
203        def setup_env():
204            from omegaml.client.cloud import setup
205            return setup(userid=os.environ['OMEGA_USERID'], apikey=os.environ['OMEGA_APIKEY'],
206                         qualifier=os.environ.get('OMEGA_QUALIFIER'))
207
208        @inprogress(text='loading cloud from config ...')
209        def setup_cloud_config():
210            from omegaml.client.cloud import setup_from_config
211            return setup_from_config(fallback=setup_base)
212
213        omega = None
214        from_args = len(args) > 0 or any(kw in kwargs for kw in ('userid', 'apikey', 'api_url', 'qualifier'))
215        from_env = {'OMEGA_USERID', 'OMEGA_APIKEY'} < set(os.environ)
216        from_config = _base_config.OMEGA_CONFIG_FILE and os.path.exists(_base_config.OMEGA_CONFIG_FILE)
217        loaders = ((from_args, setup_cloud), (from_env, setup_env),
218                   (from_config, setup_cloud_config), (True, setup_base))
219        must_load = (from_env, setup_env), (from_config, setup_cloud_config)
220        errors = []
221        for condition, loader in loaders:
222            if not condition:
223                continue
224            try:
225                omega = loader()
226            except Exception as e:
227                errors.append((loader, e))
228                if any(condition and loader is expected for condition, expected in must_load):
229                    raise
230            else:
231                break
232        else:
233            assert omega is not None, f"failed to load omega due to {errors}"
234
235        if not self.initialized:
236            self.initialized = True
237            self.omega = omega
238        return omega
239
240    def __getattr__(self, name):
241        if self.base:
242            base = getattr(self.base, self.attribute)
243            return getattr(base, name)
244        if not self.initialized:
245            self.setup()
246        return getattr(self.omega, name)
247
248    def __getitem__(self, bucket):
249        if self.base:
250            base = getattr(self.base, self.attribute)
251            return base[bucket]
252        if not self.initialized:
253            self.setup()
254        return self.omega[bucket]
255
256    def __repr__(self):
257        if self.base:
258            return repr(getattr(self.base, self.attribute))
259        self.setup()
260        return repr(self.omega)
261
262    def __call__(self, *args, **kwargs):
263        if self.base:
264            base = getattr(self.base, self.attribute)
265            return base(*args, **kwargs)
266        if not self.initialized:
267            self.setup()
268        return self(*args, **kwargs)
269
270    @property
271    def instance(self):
272        return self.base
273
274
275def setup(*args, **kwargs):
276    """
277    configure and return the omega client instance
278    """
279    return _om.setup(*args, **kwargs)
280
281
282# default instance
283# -- these are deferred instanced that is the actual Omega instance
284#    is only created on actual attribute access
285_om = OmegaDeferredInstance()
286version = version  # ensure we keep imports