Source code for omegaml.runtimes.loky

 1from tqdm import tqdm
 2import joblib
 3
 4LokyBackend = joblib.parallel.BACKENDS['loky']
 5
 6
[docs] 7class OmegaRuntimeBackend(LokyBackend): 8 """ 9 omega custom parallel backend to print progress 10 11 TODO: extend for celery dispatching 12 """ 13 def __init__(self, *args, **kwargs): 14 self._tqdm = None 15 self._job_count = kwargs.pop('n_tasks', None) 16 self._verbose = kwargs.pop('verbose', True) 17 import multiprocessing as mp 18 # get LokyBackend to run in Celery, see LokyBackend.effective_n_jobs 19 # TODO replace mp with billiard 20 mp.current_process().daemon = False 21 super().__init__(*args, **kwargs) 22
[docs] 23 def start_call(self): 24 if self._verbose: 25 self.tqdm = tqdm(total=self._job_count, unit='tasks') 26 self._orig_print_progress = self.parallel.print_progress 27 self.parallel.print_progress = self.update_progress
28 29 def update_progress(self): 30 try: 31 self.tqdm.update(1) if self._verbose else None 32 except: 33 pass 34
[docs] 35 def stop_call(self): 36 try: 37 self.tqdm.close() 38 except: 39 pass
40
[docs] 41 def terminate(self): 42 try: 43 self.tqdm.close() 44 except: 45 pass 46 finally: 47 super().terminate()
48 49#: register joblib parallel omegaml backend 50joblib.register_parallel_backend('omegaml', OmegaRuntimeBackend) 51