Source code for omegaml.runtimes.mixins.nbtasks

  1import pandas as pd
  2import logging
  3
# code to insert into first and last cell of generated jobs
# -- the init cell is prepended to each generated task notebook; {job} is
#    substituted via str.format() with the job dict built in _generate_jobs,
#    and the cell marks the task's status as 'pending' when the notebook starts
init_cell_code = """
## generated by omegaml-nbtasks
import omegaml as om 
_jobdata = {job}
om.runtime.job(_jobdata['task_name'])._mark_status('pending')
"""
# -- the done cell is appended to each generated task notebook and marks the
#    task's status as 'finished' once all other cells have completed
done_cell_code = """
## generated by omegaml-nbtasks
om.runtime.job(_jobdata['task_name'])._mark_status('finished')
"""

# module-level logger, named after this module per logging convention
logger = logging.getLogger(__name__)
 17
class JobTasks:
    """
    example notebook task runner using omegaml runtime

    Parallel execution of notebooks with parameters on the omegaml runtime

    Usage::

        # submit tasks
        # -- creates 10 tasks as copies of the main notebook in om.jobs.list('tasks')
        # -- runs each task using omegaml runtime
        # -- every .map() call generates a new task group, ensuring unique notebook names
        # -- the original notebook is not changed
        job = om.runtime.job('main')
        job.map(range(10))

        # check status
        job.status()

        # restart tasks that did not produce a result yet
        job.restart()

        # get the list of all notebooks created in one .map() call
        job.list()

        # get the list of all notebooks in any .map() call
        job.list(task_group='*')

        # example notebook
        job = globals().get('job', dict(param=my_default_value))

        def calculation(job):
            # job is a dict with job['param'] set to one value taken from .map(<jobs>)
            # job contains other keys to identify the task:
            #   job_id: a sequence number
            #   task_group: the group of tasks submitted in one .map() call
            #   task_name: the om.jobs name of the running notebook
            #   status: the status, pending or finished
            #   task_id: the celery task id, if available
            ... # <insert your code here>

        # run the calculation
        # -- in each task, job will be one of the variables given in nbtasks.map(<jobs>)
        #    <jobs> is an iterable, returning one object for each job
        #    note the job must be serializable. if you need something more complex, pass
        #    the name of an omegaml dataset and possibly some query criteria
        calculation(job)
    """

    def _init_mixin(self, *args, **kwargs):
        # the most recent task group created by .map() (set in _generate_jobs);
        # used as the default group by .list() and .status()
        self.task_group = None

    def map(self, jobs, job_ids=None, require=None, reset=False, task_group=None):
        """
        Generate any number of parallel jobs executed through om.runtime

        Args:
            jobs (iterable): an iterable to yield job parameters, each parameter
                can be any native object or container (e.g. int, float, dict, list)
            job_ids (str): optional, list of job ids. If passed, the
                job id will be used to name the task id, else
                it is the current count
            require (str): optional, passed on to om.runtime.require()
            reset (bool): optional, if True will resubmit finished jobs
            task_group (str): optional, a specified task group, if not given
                will generate a unique id

        Returns:
            list of started tasks
        """
        nbname = self.jobname
        # generate task notebooks and metadata; this sets self.task_group
        # to the given task_group or to a newly generated unique id
        self._generate_jobs(nbname, jobs, job_ids=job_ids, task_group=task_group)
        # FIX: restrict restart() to the group just generated (self.task_group)
        # -- previously the caller-supplied task_group (possibly None) was
        #    passed through, which also restarted stray pending tasks from
        #    earlier .map() calls
        tasks = self.restart(require=require, reset=reset, task_group=self.task_group)
        return tasks

    def list(self, task_group=None, **kwargs):
        """
        List all generated tasks for this map call

        Args:
            task_group (str): optional, the task group, defaults to the
                group of the most recent .map() call
            **kwargs (kwargs): kwargs to om.jobs.list()

        Returns:
            list of tasks generated for map call
        """
        om = self.runtime.omega
        task_group = task_group or self.task_group
        nbname = self.jobname
        nbname += f'/{task_group}' if task_group else ''
        return om.jobs.list(f'tasks/{nbname}*', **kwargs)

    def restart(self, task_group=None, reset=False, require=None):
        """
        Run notebook for every entry in tasks/ with no result

        For every entry in om.jobs.list('tasks/*') will check if
        there is a result yet. If not, will call om.runtime.job().run()
        for the given notebook.

        Usage::

            generate_jobs(...)
            restart()

        This will start all jobs that do not have a result yet. To
        start the jobs even if there is a result already, set reset=True.

        Args:
            task_group (str): optional, restrict to one task group; if None
                all task notebooks of this job are considered
            reset (bool): optional, if True rerun tasks that already have results
            require (str): optional, passed on to om.runtime.require()

        Notes:

            ``metadata.attributes['job']`` will record the task status::

                * ``task_id``: the celery task id
                * ``status``: task status, initialized to PENDING

        Returns:
            list of Metadata of the started tasks
        """
        om = self.runtime.omega
        nbname = self.jobname
        nbname += f'/{task_group}' if task_group else ''
        tasks_nb = om.jobs.list(f'tasks/{nbname}*')
        tasks = []
        for nb in tasks_nb:
            results = om.jobs.list(f'results/{nb}*')
            if not results or reset:
                # generate meta data before running
                meta = om.jobs.metadata(nb)
                job = meta.attributes['job']
                job['status'] = 'pending'
                meta.save()
                # run
                task_rt = om.runtime.require(require).job(nb).run()
                logger.info(f'started {nb} => {task_rt}')
                # update metadata to keep track of the celery task id
                # -- re-read metadata since running the job may have changed it
                meta = om.jobs.metadata(nb)
                job = meta.attributes['job']
                job['task_id'] = task_rt.id
                meta.save()
                tasks.append(meta)
            else:
                logger.info(f"{nb} has already got results")
        return tasks

    def status(self, task_group=None):
        """
        get the status and the celery id of of each task

        Args:
            task_group (str): the task group id, defaults to None

        Returns::

            pd.DataFrame
                name: task name
                task_id: runtime task id
                status: final status, PENDING, RUNNING, SUCCESS, FAILURE
                run_status: current status, PENDING, RUNNING, SUCCESS, FAILURE
        """
        om = self.runtime.omega
        AsyncResult = self.runtime.celeryapp.AsyncResult
        task_group = task_group or self.task_group
        nbname = self.jobname
        nbname += f'/{task_group}' if task_group else ''
        tasks_nb = om.jobs.list(f'tasks/{nbname}*')
        stats = []
        for nb in tasks_nb:
            meta = om.jobs.metadata(nb)
            task_id = meta.attributes['job'].get('task_id')
            status = AsyncResult(task_id).status if task_id else 'invalid'
            # run status reflects the last recorded notebook run, if any
            # (previously an unreachable '(waiting)' branch existed here --
            #  removed; effective behavior is unchanged)
            job_runs = meta.attributes.get('job_runs')
            run_status = job_runs[-1]['status'] if job_runs else 'unknown'
            stats.append((nb, task_id, status, run_status))
        if not tasks_nb:
            logger.info("there are no tasks")
        return pd.DataFrame(stats, columns=['name', 'task_id', 'status', 'run_status'])

    def _generate_jobs(self, nb, jobs, job_ids=None, task_group=None):
        """
        From a notebook, generate a task notebook, parametrized to the jobs arg

        For every job spec in jobs, will generate a copy of the given notebook
        as tasks/{nb}/{task_group}-{id}. The task_group can be set arbitrary or
        be generated as a unique id.

        Once a task is executed, the results will be stored in a new notebook,
        in results/tasks/{nb}/{task_group}-{id}. This contains the full execution
        trace, including graphs and logs of the notebook, and can be accessed
        from Python or Jupyter.

        Usage::

            generate_jobs('main', range(10))

        This will create 10 copies of main, storing each
        in the tasks folder.

        Use restart() to start the tasks on the om runtime

        Args:
            nb (str): the name of the notebook
            jobs (iter): an interable of objects to pass as a job
            job_ids (list): optional, list of job ids. If passed, the
                job id will be used to name the task id, else
                it is the current count

        Returns:
            list of Metadata of the generated task notebooks
        """
        from nbformat import v4 as nbv4

        om = self.runtime.omega
        # materialize job_ids so it is indexable even if passed as a generator
        job_ids = list(job_ids) if job_ids is not None else None
        self.task_group = task_group = task_group or self._make_task_group()
        tasks = []
        # ensure the tasks/results folders exist in om.jobs
        om.jobs.create('#do not delete', 'results/.placeholder')
        om.jobs.create('#do not delete', 'tasks/.placeholder')
        om.jobs.create('#do not delete', 'results/tasks/.placeholder')
        for i, job in enumerate(jobs):
            # fetch a fresh copy of the source notebook for every task so
            # the inserted cells do not accumulate across iterations
            main_nb = om.jobs.get(nb)
            job_id = i if job_ids is None else job_ids[i]
            task_name = f'tasks/{nb}/{task_group}-{job_id}'
            logger.info(f'generating task {task_name}')
            if om.jobs.metadata(task_name):
                om.jobs.drop(task_name, force=True)
            # store setup of omegaml in main.nb
            job = dict(param=job,
                       job_id=job_id,
                       task_group=task_group,
                       task_name=task_name)
            valid_auth = om.runtime.auth and om.runtime.auth.userid
            auth = (om.runtime.auth.__dict__
                    if valid_auth else dict(userid='', apikey=''))
            code = init_cell_code.format(job=job,
                                         **auth)
            init_cell = nbv4.new_code_cell(source=code)
            done_cell = nbv4.new_code_cell(source=done_cell_code)
            main_nb['cells'].insert(0, init_cell)
            main_nb['cells'].append(done_cell)
            task_meta = om.jobs.put(main_nb, task_name, attributes={'job': job})
            tasks.append(task_meta)
        # FIX: previously the tasks list was built but never returned
        return tasks

    def _make_task_group(self, max_idlen=8):
        """
        Generate a task group id not yet used by any existing task notebook

        Args:
            max_idlen (int): maximum length of the generated group id

        Returns:
            str: a max_idlen-character hex id unique among existing task names
        """
        from hashlib import md5
        from uuid import uuid4

        om = self.runtime.omega
        nbname = self.jobname
        existing = ','.join(om.jobs.list(f'tasks/{nbname}*'))
        while True:
            # SEC: CWE-916
            # - status: wontfix
            # - reason: hashcode is used purely for name resolution, not a security function
            # FIX: regenerate the hash on every iteration -- previously the
            # same value was re-checked forever, causing an infinite loop
            # whenever the candidate collided with an existing task name
            value = md5(uuid4().bytes).hexdigest()
            candidate = value[0:max_idlen]
            if candidate not in existing:
                break
        return candidate

    def _mark_status(self, status):
        """
        Record the given status in the task notebook's job metadata

        Called from the generated init/done cells of a task notebook.

        Args:
            status (str): the status to record, e.g. 'pending' or 'finished'
        """
        om = self.runtime.omega
        task_name = self.jobname or '__testing__'
        if task_name == '__testing__':
            # test runs have no task notebook to record status against
            logger.info("warning: test run, not recording status")
            return
        meta = om.jobs.metadata(task_name)
        job_meta = meta.attributes['job']
        job_meta['status'] = status
        meta.save()
# override .map()'s own docstring with the class-level usage example --
# presumably so interactive help on .map() shows the full workflow; verify
# against callers before changing
JobTasks.map.__doc__ = JobTasks.__doc__