1from __future__ import absolute_import
2
3import datetime
4from importlib import import_module
5import os
6
7from omegaml.runtimes.proxies.jobproxy import OmegaJobProxy
8
9
class DaskTask(object):
    """
    A dask remote function wrapper mimicking a Celery task

    Calling .delay() submits the wrapped function through the dask client
    and returns a DaskAsyncResult, so callers can use the same
    ``task.delay(...).get()`` idiom they would use with a Celery task.
    """

    def __init__(self, fn, client, pure=True, **kwargs):
        """
        :param fn: (function) the function to be called
        :param client: (dask client) the dask client to use
        :param pure: (bool) whether this is a dask pure function (will
            be cached or not). Defaults to True.
        :param kwargs: default keyword arguments passed to
            ``client.submit`` on every .delay() call. Keyword arguments
            given to .delay() take precedence over these defaults.
        """
        self.client = client
        self.fn = fn
        self.pure = pure
        self.kwargs = kwargs

    def delay(self, *args, **kwargs):
        """
        submit the function and execute on cluster.

        :param args: positional arguments passed to the function
        :param kwargs: keyword arguments passed through to
            ``client.submit``. ``pure`` defaults to the task's ``pure``
            setting, and any other key defaults to the value given when
            the task was created. Values given here always win.
        :return: DaskAsyncResult wrapping the submitted future
        """
        # build the submit kwargs from defaults first, then apply call-time
        # values, so that a caller can override any task-level default (this
        # matches how ``pure`` was always handled); copy so self.kwargs is
        # never mutated between calls
        submit_kwargs = dict(self.kwargs)
        submit_kwargs['pure'] = self.pure
        submit_kwargs.update(kwargs)
        future = self.client.submit(self.fn, *args, **submit_kwargs)
        return DaskAsyncResult(future)
35
36
class DaskAsyncResult(object):
    """
    A dask Future wrapper mimicking a Celery AsyncResult
    """

    def __init__(self, future):
        """
        :param future: the future returned by ``client.submit``
        """
        self.future = future

    def get(self, timeout=None):
        """
        wait for and return the result of the wrapped future

        :param timeout: (float) seconds to wait for the result, or None
            (default) to wait without a limit. Passed on to
            ``future.result()`` only when given.
        :return: the value returned by the remote function
        """
        if os.environ.get('DASK_DEBUG'):
            with self._debug_scheduler():
                return self._result(timeout)
        return self._result(timeout)

    def _result(self, timeout):
        # call future.result() exactly as before when no timeout is given,
        # so futures whose result() takes no arguments keep working
        if timeout is None:
            return self.future.result()
        return self.future.result(timeout=timeout)

    @staticmethod
    def _debug_scheduler():
        # context manager selecting dask's threaded scheduler in DASK_DEBUG
        # mode. dask.set_options() is not available in recent dask releases
        # (superseded by dask.config.set), so prefer the new API and fall
        # back to the legacy one for old dask versions.
        import dask
        config = getattr(dask, 'config', None)
        if config is not None and hasattr(config, 'set'):
            return config.set(scheduler='threads')
        # dask.threaded is a submodule; import it explicitly rather than
        # relying on `import dask` having loaded it
        import dask.threaded
        return dask.set_options(get=dask.threaded.get)
52
53
def daskhello(*args, **kwargs):
    """
    test function for dask distributed

    Accepts and ignores any arguments. Returns a greeting that names the
    executing process id and the current local time.
    """
    pid = os.getpid()
    now = datetime.datetime.now()
    return "hello from {} at {}".format(pid, now)
57
58
class OmegaRuntimeDask(object):
    """
    omegaml compute cluster gateway to a dask distributed cluster

    set environ DASK_DEBUG=1 to run dask tasks locally
    """

    def __init__(self, omega, dask_url=None):
        """
        :param omega: the omega instance, stored as ``self.omega``
        :param dask_url: (str) address of the dask scheduler, passed to
            ``distributed.Client``. Not used when DASK_DEBUG is set.
        """
        self.dask_url = dask_url
        self.omega = omega
        self._client = None

    @property
    def client(self):
        """
        the dask distributed client, created on first access and cached

        If the DASK_DEBUG environment variable is set, the client is
        connected to an in-process LocalCluster (processes=False) instead
        of dask_url.
        """
        if self._client is None:
            # import only when a client actually has to be created, so that
            # accessing an already-created client never touches distributed
            from distributed import Client, LocalCluster
            if os.environ.get('DASK_DEBUG'):
                # http://dask.pydata.org/en/latest/setup/single-distributed.html?highlight=single-threaded#localcluster
                single_threaded = LocalCluster(processes=False)
                self._client = Client(single_threaded)
            else:
                self._client = Client(self.dask_url)
        return self._client

    def model(self, modelname):
        """
        return a model for remote execution

        :param modelname: (str) the name of the model
        :return: OmegaModelProxy bound to this runtime
        """
        from omegaml.runtimes.proxies.modelproxy import OmegaModelProxy
        return OmegaModelProxy(modelname, runtime=self)

    def job(self, jobname):
        """
        return a job for remote execution

        :param jobname: (str) the name of the job
        :return: OmegaJobProxy bound to this runtime
        """
        return OmegaJobProxy(jobname, runtime=self)

    def task(self, name, **kwargs):
        """
        retrieve the task function from the task module

        This retrieves the task function and wraps it into a
        DaskTask. DaskTask mimicks a celery task and is
        called on the cluster using .delay(), the same way we
        call a celery task. .delay() will return a DaskAsyncResult,
        supporting the celery .get() semantics. This way we can use
        the same proxy objects, as all they do is call .delay() and
        return an AsyncResult.

        :param name: (str) dotted path of the task function, e.g.
            'omegaml.tasks.omega_settings'
        :param kwargs: default keyword arguments stored on the DaskTask
            and passed to ``client.submit`` on each .delay()
        :return: DaskTask wrapping the task function
        :raises ValueError: if name is not of the form 'module.function'
        """
        modname, _, funcname = name.rpartition('.')
        if not modname or not funcname:
            raise ValueError(
                "task name must be of the form 'module.function', "
                "got {!r}".format(name))
        mod = import_module(modname)
        func = getattr(mod, funcname)
        # we pass pure=False to force dask to reevaluate the task
        # http://distributed.readthedocs.io/en/latest/client.html?highlight=pure#pure-functions-by-default
        return DaskTask(func, self.client, pure=False, **kwargs)

    def settings(self):
        """
        return the runtime's cluster settings

        Runs the omegaml.tasks.omega_settings task on the cluster and
        waits for its result.
        """
        return self.task('omegaml.tasks.omega_settings').delay().get()

    def ping(self):
        """
        return a DaskTask that runs daskhello on the cluster

        Call ``.delay().get()`` on the returned task to execute it.
        """
        return DaskTask(daskhello, self.client, pure=False)
121 def ping(self):
122 return DaskTask(daskhello, self.client, pure=False)