Source code for omegaml.runtimes.daskruntime

  1from __future__ import absolute_import
  2
  3import datetime
  4from importlib import import_module
  5import os
  6
  7from omegaml.runtimes.proxies.jobproxy import OmegaJobProxy
  8
  9
 10class DaskTask(object):
 11    """
 12    A dask remote function wrapper mimicking a Celery task
 13    """
 14
 15    def __init__(self, fn, client, pure=True, **kwargs):
 16        """
 17        :param fn: (function) the function to be called
 18        :param client: (dask client) the dask client to use
 19        :param pure: (bool) whether this is a dask pure function (will
 20            be cached or not). Defaults to True.
 21        """
 22        self.client = client
 23        self.fn = fn
 24        self.pure = pure
 25        self.kwargs = kwargs
 26
 27    def delay(self, *args, **kwargs):
 28        """
 29        submit the function and execute on cluster.  
 30        """
 31        kwargs['pure'] = kwargs.get('pure', self.pure)
 32        if self.kwargs:
 33            kwargs.update(self.kwargs)
 34        return DaskAsyncResult(self.client.submit(self.fn, *args, **kwargs))
 35
 36
 37class DaskAsyncResult(object):
 38    """
 39    A dask Future wrapper mimicking a Celery AsyncResult
 40    """
 41
 42    def __init__(self, future):
 43        self.future = future
 44
 45    def get(self):
 46        import dask
 47
 48        if os.environ.get('DASK_DEBUG'):
 49            with dask.set_options(get=dask.threaded.get):
 50                return self.future.result()
 51        return self.future.result()
 52
 53
 54def daskhello(*args, **kwargs):
 55    # test function for dask distributed
 56    return "hello from {} at {}".format(os.getpid(), datetime.datetime.now())
 57
 58
class OmegaRuntimeDask(object):
    """
    omegaml compute cluster gateway to a dask distributed cluster

    set environ DASK_DEBUG=1 to run dask tasks locally
    """

    def __init__(self, omega, dask_url=None):
        """
        :param omega: the omega instance, stored as ``self.omega``
        :param dask_url: (str) address passed to ``distributed.Client``;
            not used when DASK_DEBUG is set
        """
        self.dask_url = dask_url
        self.omega = omega
        self._client = None

    @property
    def client(self):
        """
        return the dask client, creating it on first access

        If the DASK_DEBUG environment variable is set, the client is attached
        to an in-process LocalCluster (processes=False) instead of dask_url.
        """
        if self._client is None:
            # import lazily: distributed is only needed to create the client
            from distributed import Client, LocalCluster
            if os.environ.get('DASK_DEBUG'):
                # http://dask.pydata.org/en/latest/setup/single-distributed.html?highlight=single-threaded#localcluster
                single_threaded = LocalCluster(processes=False)
                self._client = Client(single_threaded)
            else:
                self._client = Client(self.dask_url)
        return self._client

    def model(self, modelname):
        """
        return a model for remote execution

        :param modelname: (str) the name of the model
        :return: OmegaModelProxy bound to this runtime
        """
        from omegaml.runtimes.proxies.modelproxy import OmegaModelProxy
        return OmegaModelProxy(modelname, runtime=self)

    def job(self, jobname):
        """
        return a job for remote execution

        :param jobname: (str) the name of the job
        :return: OmegaJobProxy bound to this runtime
        """
        return OmegaJobProxy(jobname, runtime=self)

    def task(self, name, **kwargs):
        """
        retrieve the task function from the task module

        This retrieves the task function and wraps it into a
        DaskTask. DaskTask mimics a celery task and is
        called on the cluster using .delay(), the same way we
        call a celery task. .delay() will return a DaskAsyncResult,
        supporting the celery .get() semantics. This way we can use
        the same proxy objects, as all they do is call .delay() and
        return an AsyncResult.

        :param name: (str) dotted path of the task, 'package.module.function'
        :param kwargs: extra keyword arguments passed on to DaskTask and
            merged into every ``client.submit()`` call
        :return: DaskTask wrapping the resolved function
        :raises ValueError: if name does not contain a module part
        """
        if '.' not in name:
            # rsplit would fail with an opaque unpacking error
            raise ValueError(
                "task name must be of the form 'module.function', "
                "got {!r}".format(name))
        modname, funcname = name.rsplit('.', 1)
        mod = import_module(modname)
        func = getattr(mod, funcname)
        # we pass pure=False to force dask to reevaluate the task
        # http://distributed.readthedocs.io/en/latest/client.html?highlight=pure#pure-functions-by-default
        return DaskTask(func, self.client, pure=False, **kwargs)

    def settings(self):
        """
        return the runtime's cluster settings

        Runs omegaml.tasks.omega_settings on the cluster and waits for it.
        """
        return self.task('omegaml.tasks.omega_settings').delay().get()

    def ping(self):
        """
        return a DaskTask wrapping daskhello (not yet submitted)

        Call ``.delay().get()`` on the returned task to run it on the cluster.
        """
        return DaskTask(daskhello, self.client, pure=False)