You're reading an old version of this documentation.
If you want up-to-date information, please have a look at
release/0.18.0.
Source code for omegaml.runtimes.loky
1from tqdm import tqdm
2import joblib
3
# Class that joblib has registered under the 'loky' backend name; it is used
# as the base class of the omegaml backend defined in this module.
LokyBackend = joblib.parallel.BACKENDS['loky']
5
6
[docs]
class OmegaRuntimeBackend(LokyBackend):
    """
    omega custom parallel backend to print progress

    Wraps joblib's loky backend and shows a tqdm progress bar that advances
    by one step every time joblib reports progress.

    Keyword Args:
        n_tasks (int): total number of tasks, used as the progress bar
           total. Defaults to None, in which case tqdm gets no total.
        verbose (bool): if True (the default) a progress bar is displayed.

    All other positional and keyword arguments are passed on unchanged to
    the loky backend.

    TODO: extend for celery dispatching
    """

    def __init__(self, *args, **kwargs):
        # progress bar instance, created by start_call() when verbose
        self.tqdm = None
        # joblib's own progress hook, saved while ours is installed
        self._orig_print_progress = None
        # consume our own options so the base class does not see them
        self._job_count = kwargs.pop('n_tasks', None)
        self._verbose = kwargs.pop('verbose', True)
        import multiprocessing as mp
        # get LokyBackend to run in Celery, see LokyBackend.effective_n_jobs
        # TODO replace mp with billiard
        mp.current_process().daemon = False
        super().__init__(*args, **kwargs)

    def start_call(self):
        """
        Create the progress bar and hook into joblib's progress reporting

        The Parallel object's print_progress is replaced by
        update_progress(); the original is kept so that stop_call() and
        terminate() can put it back.
        """
        if self._verbose:
            # do not leak a bar left over from a previous call
            self._close_progress()
            self.tqdm = tqdm(total=self._job_count, unit='tasks')
        if self._orig_print_progress is None:
            # only remember the hook once, so a repeated start_call() never
            # records our own replacement as the "original"
            self._orig_print_progress = self.parallel.print_progress
        self.parallel.print_progress = self.update_progress

    def update_progress(self):
        """
        Advance the progress bar by one step

        This is a no-op when not verbose or when no bar exists. Progress
        display is best-effort: a failure to update the bar is ignored so
        that it cannot break the running job.
        """
        if not self._verbose or self.tqdm is None:
            return
        try:
            self.tqdm.update(1)
        except Exception:
            pass

    def stop_call(self):
        """
        Close the progress bar and restore joblib's progress reporting
        """
        self._close_progress()
        self._restore_print_progress()

    def terminate(self):
        """
        Close the progress bar, restore joblib's progress reporting and
        terminate the underlying loky backend

        The base class terminate() is always called, even if the cleanup
        of the progress display fails.
        """
        try:
            self._close_progress()
            self._restore_print_progress()
        finally:
            super().terminate()

    def _close_progress(self):
        # close the progress bar if there is one; errors are ignored because
        # the bar is purely cosmetic
        if self.tqdm is None:
            return
        try:
            self.tqdm.close()
        except Exception:
            pass

    def _restore_print_progress(self):
        # put back the print_progress saved by start_call(), at most once
        original, self._orig_print_progress = self._orig_print_progress, None
        parallel = getattr(self, 'parallel', None)
        if original is not None and parallel is not None:
            parallel.print_progress = original
48
#: make OmegaRuntimeBackend available to joblib under the backend name 'omegaml'
joblib.register_parallel_backend('omegaml', OmegaRuntimeBackend)
51