import logging

import pandas as pd

# code to insert into first and last cell of generated jobs
init_cell_code = """
## generated by omegaml-nbtasks
import omegaml as om
_jobdata = {job}
om.runtime.job(_jobdata['task_name'])._mark_status('pending')
"""
done_cell_code = """
## generated by omegaml-nbtasks
om.runtime.job(_jobdata['task_name'])._mark_status('finished')
"""

logger = logging.getLogger(__name__)


class JobTasks:
    """
    Example notebook task runner using the omegaml runtime

    Parallel execution of notebooks with parameters on the omegaml runtime

    Usage::

        # submit tasks
        # -- creates 10 tasks as copies of the main notebook in om.jobs.list('tasks')
        # -- runs each task using the omegaml runtime
        # -- every .map() call generates a new task group, ensuring unique notebook names
        # -- the original notebook is not changed
        job = om.runtime.job('main')
        job.map(range(10))

        # check status
        job.status()

        # restart tasks that did not produce a result yet
        job.restart()

        # get the list of all notebooks created in one .map() call
        job.list()

        # get the list of all notebooks in any .map() call
        job.list(task_group='*')

        # example notebook
        job = globals().get('job', dict(param=my_default_value))

        def calculation(job):
            # job is a dict with job['param'] set to one value taken from .map(<jobs>)
            # job contains other keys to identify the task:
            #   job_id: a sequence number
            #   task_group: the group of tasks submitted in one .map() call
            #   task_name: the om.jobs name of the running notebook
            #   status: the status, pending or finished
            #   task_id: the celery task id, if available
            ...  # <insert your code here>

        # run the calculation
        # -- in each task, job will be one of the values given in nbtasks.map(<jobs>)
        #    <jobs> is an iterable, returning one object for each job
        # note the job must be serializable. if you need something more complex, pass
        # the name of an omegaml dataset and possibly some query criteria
        calculation(job)
    """

    def _init_mixin(self, *args, **kwargs):
        # the task group of the most recent .map() call, set by _generate_jobs()
        self.task_group = None

    def map(self, jobs, job_ids=None, require=None, reset=False, task_group=None):
        """
        Generate any number of parallel jobs executed through om.runtime

        Args:
            jobs (iterable): an iterable to yield job parameters, each parameter
                can be any native object or container (e.g. int, float, dict, list)
            job_ids (list): optional, a list of job ids. If passed, each job id
                is used to name the corresponding task, else the task is named
                by its sequence number
            require (str): optional, passed on to om.runtime.require()
            reset (bool): optional, if True will resubmit finished jobs
            task_group (str): optional, a specific task group; if not given
                a unique id is generated

        Returns:
            list of started tasks
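
        Usage::

            # illustrative example: run notebook 'main' once per parameter value
            job = om.runtime.job('main')
            tasks = job.map(range(10), task_group='experiment1')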
87 """
88 nbname = self.jobname
89 # generate metadata
90 self._generate_jobs(nbname, jobs, job_ids=job_ids, task_group=task_group)
91 tasks = self.restart(require=require, reset=reset, task_group=task_group)
92 return tasks
93

    def list(self, task_group=None, **kwargs):
        """
        List all generated tasks for this map call

        Args:
            task_group (str): optional, the task group
            **kwargs (kwargs): kwargs to om.jobs.list()

        Returns:
            list of tasks generated for the map call
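
        Usage::

            # illustrative: list tasks of the current group, then of any group
            job.list()
            job.list(task_group='*')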
104 """
105 om = self.runtime.omega
106 task_group = task_group or self.task_group
107 nbname = self.jobname
108 nbname += f'/{task_group}' if task_group else ''
109 return om.jobs.list(f'tasks/{nbname}*', **kwargs)
110

    def restart(self, task_group=None, reset=False, require=None):
        """
        Run the notebook for every entry in tasks/ that has no result yet

        For every entry in om.jobs.list('tasks/*') this checks whether a
        result exists yet. If not, it calls om.runtime.job(<name>).run()
        for the given notebook.

        Usage::

            generate_jobs(...)
            restart()

        This will start all jobs that do not have a result yet. To
        start the jobs even if there is a result already, set reset=True.

        Notes:

            ``metadata.attributes['job']`` records the task status:

            * ``task_id``: the celery task id
            * ``status``: task status, initialized to PENDING
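
            For illustration, the recorded status of a (hypothetical) task
            notebook can be inspected as::

                meta = om.jobs.metadata('tasks/main/ab12cd34-0')
                meta.attributes['job']['status']  # e.g. 'pending'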
133 """
134 om = self.runtime.omega
135 nbname = self.jobname
136 nbname += f'/{task_group}' if task_group else ''
137 tasks_nb = om.jobs.list(f'tasks/{nbname}*')
138 tasks = []
139 for nb in tasks_nb:
140 results = om.jobs.list(f'results/{nb}*')
141 if not results or reset:
142 # generate meta data before running
143 meta = om.jobs.metadata(nb)
144 job = meta.attributes['job']
145 job['status'] = 'pending'
146 meta.save()
147 # run
148 task_rt = om.runtime.require(require).job(nb).run()
149 logger.info(f'started {nb} => {task_rt}')
150 # update metadata to keep track
151 meta = om.jobs.metadata(nb)
152 job = meta.attributes['job']
153 job['task_id'] = task_rt.id
154 meta.save()
155 tasks.append(meta)
156 else:
157 logger.info(f"{nb} has already got results")
158 return tasks
159

    def status(self, task_group=None):
        """
        Get the status and the celery task id of each task

        Args:
            task_group (str): the task group id, defaults to None

        Returns::

            pd.DataFrame
                name: task name
                task_id: runtime task id
                status: final status, PENDING, RUNNING, SUCCESS, FAILURE
                run_status: current status, PENDING, RUNNING, SUCCESS, FAILURE
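
        Usage::

            # illustrative: find failed tasks in the most recent .map() call
            df = job.status()
            failed = df[df['status'] == 'FAILURE']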
174 """
175 om = self.runtime.omega
176 AsyncResult = self.runtime.celeryapp.AsyncResult
177 task_group = task_group or self.task_group
178 nbname = self.jobname
179 nbname += f'/{task_group}' if task_group else ''
180 tasks_nb = om.jobs.list(f'tasks/{nbname}*')
181 stats = []
182 for nb in tasks_nb:
183 meta = om.jobs.metadata(nb)
184 task_id = meta.attributes['job'].get('task_id')
185 status = AsyncResult(task_id).status if task_id else 'invalid'
186 job_runs = meta.attributes.get('job_runs')
187 if job_runs:
188 run_status = job_runs[-1]['status'] if job_runs else '(waiting)'
189 else:
190 run_status = 'unknown'
191 stats.append((nb, task_id, status, run_status))
192 if not tasks_nb:
193 logger.info("there are no tasks")
194 return pd.DataFrame(stats, columns=['name', 'task_id', 'status', 'run_status'])
195

    def _generate_jobs(self, nb, jobs, job_ids=None, task_group=None):
        """
        From a notebook, generate task notebooks parametrized by the jobs arg

        For every job spec in jobs, this generates a copy of the given notebook
        as tasks/{nb}/{task_group}-{id}. The task_group can be set arbitrarily
        or is generated as a unique id.

        Once a task is executed, the results are stored in a new notebook,
        in results/tasks/{nb}/{task_group}-{id}. This contains the full execution
        trace, including graphs and logs of the notebook, and can be accessed
        from Python or Jupyter.

        Usage::

            generate_jobs('main', range(10))

        This will create 10 copies of main, storing each
        in the tasks folder.

        Use restart() to start the tasks on the om runtime

        Args:
            nb (str): the name of the notebook
            jobs (iter): an iterable of objects to pass as a job
            job_ids (list): optional, a list of job ids. If passed, each job id
                is used to name the corresponding task, else the task is named
                by its sequence number
        """
        from nbformat import v4 as nbv4

        om = self.runtime.omega
        job_ids = list(job_ids) if job_ids is not None else None
        self.task_group = task_group = task_group or self._make_task_group()
        tasks = []
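        # keep the tasks/ and results/ prefixes present by storing placeholder
        # notebooks, presumably so the folders always show up in om.jobs listings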
        om.jobs.create('#do not delete', 'results/.placeholder')
        om.jobs.create('#do not delete', 'tasks/.placeholder')
        om.jobs.create('#do not delete', 'results/tasks/.placeholder')
        for i, job in enumerate(jobs):
            main_nb = om.jobs.get(nb)
            job_id = i if job_ids is None else job_ids[i]
            task_name = f'tasks/{nb}/{task_group}-{job_id}'
            logger.info(f'generating task {task_name}')
            if om.jobs.metadata(task_name):
                om.jobs.drop(task_name, force=True)
            # build the job spec recorded in the task notebook and its metadata
            job = dict(param=job,
                       job_id=job_id,
                       task_group=task_group,
                       task_name=task_name)
            valid_auth = om.runtime.auth and om.runtime.auth.userid
            auth = (om.runtime.auth.__dict__
                    if valid_auth else dict(userid='', apikey=''))
            code = init_cell_code.format(job=job,
                                         **auth)
            init_cell = nbv4.new_code_cell(source=code)
            done_cell = nbv4.new_code_cell(source=done_cell_code)
            main_nb['cells'].insert(0, init_cell)
            main_nb['cells'].append(done_cell)
            task_meta = om.jobs.put(main_nb, task_name, attributes={'job': job})
            tasks.append(task_meta)
        return tasks

    def _make_task_group(self, max_idlen=8):
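        """
        Generate a unique task group id

        Returns a random hex string of at most max_idlen characters that does
        not collide with an existing task name of this notebook.
        """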
        from hashlib import md5
        from uuid import uuid4

        om = self.runtime.omega
        nbname = self.jobname
        existing = ','.join(om.jobs.list(f'tasks/{nbname}*'))
        while True:
            # SEC: CWE-916
            # - status: wontfix
            # - reason: hashcode is used purely for name resolution, not a security function
            value = md5(uuid4().bytes).hexdigest()
            candidate = value[0:max_idlen]
            if candidate not in existing:
                break
        return candidate

    def _mark_status(self, status):
        om = self.runtime.omega
        task_name = self.jobname or '__testing__'
        if task_name == '__testing__':
            logger.warning("test run, not recording status")
            return
        meta = om.jobs.metadata(task_name)
        job_meta = meta.attributes['job']
        job_meta['status'] = status
        meta.save()


JobTasks.map.__doc__ = JobTasks.__doc__