Source code for omegaml.runtimes.proxies.jobproxy

 1from __future__ import absolute_import
 2
 3import logging
 4
 5from omegaml.runtimes.proxies.baseproxy import RuntimeProxyBase
 6
 7logger = logging.getLogger(__file__)
 8
 9
class OmegaJobProxy(RuntimeProxyBase):
    """
    client-side handle for a job that is executed by a remote celery worker

    Usage:

    .. code::

        om = Omega()
        # run() returns an AsyncResult, call .get() to obtain its result
        result = om.runtime.job('foojob').run()
        result.get()

        # schedule() returns an AsyncResult, call .get() to obtain its result
        result = om.runtime.job('foojob').schedule()
        result.get()

    Notes:
        ``runtime`` defaults to None in the signature, however the
        constructor reads ``runtime.omega.jobs``, so a runtime has to be
        passed in practice.
    """

    def __init__(self, jobname, runtime=None):
        # the jobs store of the runtime's omega instance backs this proxy
        jobs_store = runtime.omega.jobs
        super().__init__(jobname, runtime=runtime, store=jobs_store)
        self.jobname = jobname

    def run(self, timeout=None, **kwargs):
        """
        send the job to the runtime for immediate execution

        Args:
            timeout (int): optional, timeout in seconds, forwarded to the task
            **kwargs: kwargs to CeleryTask.delay

        Returns:
            AsyncResult
        """
        task_name = 'omegaml.notebook.tasks.run_omegaml_job'
        submit = self.runtime.task(task_name).delay
        return submit(self.jobname, timeout=timeout, **kwargs)

    def schedule(self, **kwargs):
        """
        send the job to the runtime to be scheduled for repeated execution

        Args:
            **kwargs: see OmegaJob.schedule()

        Returns:
            the result of the scheduling task's ``.delay()`` call
        """
        task_name = 'omegaml.notebook.tasks.schedule_omegaml_job'
        submit = self.runtime.task(task_name).delay
        return submit(self.jobname, **kwargs)

    @property
    def _mixins(self):
        # the OMEGA_JOBPROXY_MIXINS setting of the runtime's omega defaults
        defaults = self.runtime.omega.defaults
        return defaults.OMEGA_JOBPROXY_MIXINS