You're reading the documentation for a development version.
For the latest released version, please have a look at
release/0.17.0.
Source code for omegaml.runtimes.proxies.jobproxy
1from __future__ import absolute_import
2
3import logging
4
5from omegaml.runtimes.proxies.baseproxy import RuntimeProxyBase
6
# Name the logger after the module's dotted import path, not its file path:
# a __file__-based name varies per installation and does not sit below the
# package logger, so package-level logging configuration would not reach it.
logger = logging.getLogger(__name__)
8
9
[docs]
class OmegaJobProxy(RuntimeProxyBase):
    """
    Proxy to a remote job that is executed by a celery worker

    Usage:

    .. code::

        om = Omega()
        # result is AsyncResult, use .get() to return its result
        result = om.runtime.job('foojob').run()
        result.get()

        # result is AsyncResult, use .get() to return its result
        result = om.runtime.job('foojob').schedule()
        result.get()
    """

    def __init__(self, jobname, runtime=None):
        """
        Bind the proxy to a job name and a runtime

        Args:
            jobname (str): name of the job this proxy refers to
            runtime: the runtime used to submit tasks. Although the default
                is None, ``runtime.omega.jobs`` is read unconditionally, so
                a runtime instance has to be passed in practice.
        """
        # the jobs store of the runtime's omega instance backs this proxy
        jobs_store = runtime.omega.jobs
        super().__init__(jobname, runtime=runtime, store=jobs_store)
        self.jobname = jobname

    def run(self, timeout=None, **kwargs):
        """
        Submit the job for immediate execution

        Args:
            timeout (int): optional, timeout in seconds
            **kwargs: kwargs to CeleryTask.delay

        Returns:
            AsyncResult
        """
        task_name = 'omegaml.notebook.tasks.run_omegaml_job'
        submit = self.runtime.task(task_name).delay
        # timeout is always forwarded explicitly, even when it is None
        return submit(self.jobname, timeout=timeout, **kwargs)

    def schedule(self, **kwargs):
        """
        Schedule the job for repeated execution

        Args:
            **kwargs: see OmegaJob.schedule()

        Returns:
            the result of submitting the scheduling task via ``.delay()``
        """
        task_name = 'omegaml.notebook.tasks.schedule_omegaml_job'
        submit = self.runtime.task(task_name).delay
        return submit(self.jobname, **kwargs)

    @property
    def _mixins(self):
        # the mixins for job proxies are configured in the omega defaults
        defaults = self.runtime.omega.defaults
        return defaults.OMEGA_JOBPROXY_MIXINS