Source code for omegaml.notebook.jobs

  1from __future__ import absolute_import
  2
  3import datetime
  4import gridfs
  5import re
  6import yaml
  7from croniter import croniter
  8from io import StringIO, BytesIO
  9from jupyter_client import AsyncKernelManager
 10from nbconvert.preprocessors import ClearOutputPreprocessor
 11from nbconvert.preprocessors.execute import ExecutePreprocessor
 12from nbformat import read as nbread, write as nbwrite, v4 as nbv4
 13from uuid import uuid4
 14
 15from omegaml.backends.basecommon import BackendBaseCommon
 16from omegaml.documents import MDREGISTRY
 17from omegaml.notebook.jobschedule import JobSchedule
 18from omegaml.store import OmegaStore
 19from omegaml.util import settings as omega_settings
 20
 21
class OmegaJobs(BackendBaseCommon):
    """
    Omega Jobs API
    """

    # TODO this class should be a proper backend class with a mixin for ipynb

    _nb_config_magic = 'omega-ml', 'schedule', 'run-at', 'cron'
    _dir_placeholder = '_placeholder.ipynb'

    def __init__(self, bucket=None, prefix=None, store=None, defaults=None):
        """ set up the jobs API on a given store

        Args:
            bucket (str): the bucket to use, defaults to the store's bucket
            prefix (str): the storage prefix, defaults to the store's prefix
                or 'jobs'
            store (OmegaStore): an existing store to use; if not given a new
                OmegaStore is created from bucket/prefix/defaults
            defaults: the settings object, defaults to omega_settings()
        """
        self.defaults = defaults or omega_settings()
        self.prefix = prefix or (store.prefix if store else None) or 'jobs'
        self.store = store or OmegaStore(prefix=prefix, bucket=bucket, defaults=defaults)
        self.kind = MDREGISTRY.OMEGAML_JOBS
        self._include_dir_placeholder = True
        # convenience so you can do om.jobs.schedule(..., run_at=om.jobs.Schedule(....))
        self.Schedule = JobSchedule

    def __repr__(self):
        return f'OmegaJobs(store={self.store!r})'

    @property
    def _db(self):
        # the underlying MongoDB database of the store
        return self.store.mongodb

    @property
    def _fs(self):
        # the underlying gridfs filesystem of the store
        return self.store.fs

    def collection(self, name):
        # return the store collection for the notebook, normalizing the name
        # to always carry the .ipynb extension
        fqname = name if name.endswith('.ipynb') else name + '.ipynb'
        return self.store.collection(fqname)
[docs] 57 def drop(self, name, force=False): 58 """ remove the notebook 59 60 Args: 61 name (str): the name of the notebook 62 force (bool): if True does not raise 63 64 Returns: 65 True if object was deleted, False if not. 66 If force is True and the object does not exist it will still return True 67 68 Raises: 69 DoesNotExist if the notebook does not exist and ```force=False``` 70 """ 71 meta = self.metadata(name) 72 name = meta.name if meta is not None else name 73 return self.store.drop(name, force=force)
74
[docs] 75 def metadata(self, name, **kwargs): 76 """ retrieve metadata of a notebook 77 78 Args: 79 name (str): the name of the notebook 80 81 Returns: 82 Metadata 83 """ 84 meta = self.store.metadata(name, **kwargs) 85 if meta is None and not name.endswith('.ipynb'): 86 name += '.ipynb' 87 meta = self.store.metadata(name) 88 return meta
89
[docs] 90 def exists(self, name): 91 """ check if the notebook exists 92 93 Args: 94 name (str): the name of the notebook 95 96 Returns: 97 Metadata 98 """ 99 return len(self.store.list(name)) + len(self.store.list(name + '.ipynb')) > 0
100
    def put(self, obj, name, attributes=None):
        """ store a notebook job

        Args:
            obj (str|NotebookNode): the notebook object or the notebook's code as a string
            name (str): the name of the job
            attributes (dict): the attributes to store with the job

        Returns:
            Metadata
        """
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        # a plain string is treated as source code, turned into a notebook first
        if isinstance(obj, str):
            return self.create(obj, name)
        sbuf = StringIO()
        bbuf = BytesIO()
        # nbwrite expects string, fs.put expects bytes
        nbwrite(obj, sbuf, version=4)
        sbuf.seek(0)
        bbuf.write(sbuf.getvalue().encode('utf8'))
        bbuf.seek(0)
        # see if we have a file already, if so replace the gridfile
        meta = self.store.metadata(name)
        if not meta:
            # new entry: store the file under a random name and create metadata
            filename = uuid4().hex
            fileid = self._store_to_file(self.store, bbuf, filename)
            meta = self.store._make_metadata(name=name,
                                             prefix=self.store.prefix,
                                             bucket=self.store.bucket,
                                             kind=self.kind,
                                             attributes=attributes,
                                             gridfile=fileid)
            meta = meta.save()
        else:
            # existing entry: replace the gridfile, keep existing attributes
            # NOTE(review): the attributes argument is ignored on update —
            # presumably intentional, confirm against callers
            filename = uuid4().hex
            meta.gridfile = self._store_to_file(self.store, bbuf, filename)
            meta = meta.save()
        # set config: merge the notebook's magic config cell into the stored config
        nb_config = self.get_notebook_config(name)
        meta_config = meta.attributes.get('config', {})
        if nb_config:
            meta_config.update(dict(**nb_config))
        meta.attributes['config'] = meta_config
        meta = meta.save()
        # only source jobs get scheduled; results notebooks are never scheduled
        if not name.startswith('results') and ('run-at' in meta_config):
            meta = self.schedule(name)
        return meta
149
[docs] 150 def get(self, name): 151 """ 152 Retrieve a notebook and return a NotebookNode 153 """ 154 if not name.endswith('.ipynb'): 155 name += '.ipynb' 156 meta = self.store.metadata(name) 157 if meta: 158 try: 159 outf = meta.gridfile 160 except gridfs.errors.NoFile as e: 161 raise e 162 # nbwrite wants a string, outf is bytes 163 sbuf = StringIO() 164 data = outf.read() 165 if data is None: 166 msg = 'Expected content in {name}, got None'.format(**locals()) 167 raise ValueError(msg) 168 sbuf.write(data.decode('utf8')) 169 sbuf.seek(0) 170 nb = nbread(sbuf, as_version=4) 171 return nb 172 else: 173 raise gridfs.errors.NoFile( 174 ">{0}< does not exist in jobs bucket '{1}'".format( 175 name, self.store.bucket))
176
[docs] 177 def create(self, code, name): 178 """ 179 create a notebook from code 180 181 :param code: the code as a string 182 :param name: the name of the job to create 183 :return: the metadata object created 184 """ 185 cells = [] 186 cells.append(nbv4.new_code_cell(source=code)) 187 notebook = nbv4.new_notebook(cells=cells) 188 # put the notebook 189 meta = self.put(notebook, name) 190 return meta
191 192 def get_fs(self, collection=None): 193 # legacy support 194 return self._fs 195
[docs] 196 def get_collection(self, collection): 197 """ 198 returns the collection object 199 """ 200 # FIXME this should use store.collection 201 return getattr(self.store.mongodb, collection)
202
[docs] 203 def list(self, pattern=None, regexp=None, raw=False, **kwargs): 204 """ 205 list all jobs matching filter. 206 filter is a regex on the name of the ipynb entry. 207 The default is all, i.e. `.*` 208 """ 209 job_list = self.store.list(pattern=pattern, regexp=regexp, raw=raw, **kwargs) 210 name = lambda v: v.name if raw else v 211 if not self._include_dir_placeholder: 212 job_list = [v for v in job_list if not name(v).endswith(self._dir_placeholder)] 213 return job_list
214
[docs] 215 def get_notebook_config(self, nb_filename): 216 """ 217 returns the omegaml script config on the notebook's first cell 218 219 The config cell is in any of the following formats: 220 221 *Option 1*:: 222 223 # omega-ml: 224 # run-at: "<cron schedule>" 225 226 *Option 2*:: 227 228 # omega-ml: 229 # schedule: "weekday(s), hour[, month]" 230 231 *Option 3*:: 232 233 # omega-ml: 234 # cron: "<cron schedule>" 235 236 You may optionally specify only ``run-at``, ``schedule`` or ``cron``, 237 i.e. without the ``omega-ml`` header:: 238 239 # run-at: "<cron schedule>" 240 # cron: "<cron schedule>" 241 # schedule: "weekday(s), hour[, month]" 242 243 Args: 244 nb_filename (str): the name of the notebook 245 246 Returns: 247 config(dict) => ``{ 'run-at': <specifier> }`` 248 249 Raises: 250 ValueError, if there is no config cell or the config cell is invalid 251 252 See Also: 253 * JobSchedule 254 """ 255 notebook = self.get(nb_filename) 256 config_cell = None 257 config_magic = ['# {}'.format(kw) for kw in self._nb_config_magic] 258 for cell in notebook.get('cells'): 259 if any(cell.source.startswith(kw) for kw in config_magic): 260 config_cell = cell 261 if not config_cell: 262 return {} 263 yaml_conf = '\n'.join( 264 [re.sub('#', '', x, 1) for x in str( 265 config_cell.source).splitlines()]) 266 try: 267 yaml_conf = yaml.safe_load(yaml_conf) 268 config = yaml_conf.get(self._nb_config_magic[0], yaml_conf) 269 except Exception: 270 raise ValueError( 271 'Notebook configuration cannot be parsed') 272 # translate config to canonical form 273 # TODO refactor to seperate method / mapped translation functions 274 if 'schedule' in config: 275 config['run-at'] = JobSchedule(text=config.get('schedule', '')).cron 276 if 'cron' in config: 277 config['run-at'] = JobSchedule.from_cron(config.get('cron')).cron 278 if 'run-at' in config: 279 config['run-at'] = JobSchedule.from_cron(config.get('run-at')).cron 280 return config
281
[docs] 282 def run(self, name, event=None, timeout=None): 283 """ 284 Run a job immediately 285 286 The job is run and the results are stored in om.jobs('results/name <timestamp>') and 287 the result's Metadata is returned. 288 289 ``Metadata.attributes`` of the original job as given by name is updated: 290 291 * ``attributes['job_runs']`` (list) - list of status of each run. Status is 292 a dict as below 293 * ``attributes['job_results']`` (list) - list of results job names in same 294 index-order as job_runs 295 * ``attributes['trigger']`` (list) - list of triggers 296 297 The status of each job run is a dict with keys: 298 299 * ``status`` (str): the status of the job run, OK or ERROR 300 * ``ts`` (datetime): time of execution 301 * ``message`` (str): error mesasge in case of ERROR, else blank 302 * ``results`` (str): name of results in case of OK, else blank 303 304 Usage:: 305 306 # directly (sync) 307 meta = om.jobs.run('mynb') 308 309 # via runtime (async) 310 job = om.runtime.job('mynb') 311 result = job.run() 312 313 Args: 314 name (str): the name of the jobfile 315 event (str): an event name 316 timeout (int): timeout in seconds, None means no timeout 317 318 Returns: 319 Metadata of the results entry 320 321 See Also: 322 * OmegaJobs.run_notebook 323 """ 324 return self.run_notebook(name, event=event, timeout=timeout)
325
[docs] 326 def run_notebook(self, name, event=None, timeout=None): 327 """ run a given notebook immediately. 328 329 Args: 330 name (str): the name of the jobfile 331 event (str): an event name 332 timeout (int): timeout in seconds 333 334 Returns: 335 Metadata of results 336 337 See Also: 338 * nbconvert https://nbconvert.readthedocs.io/en/latest/execute_api.html 339 """ 340 notebook = self.get(name) 341 meta_job = self.metadata(name) 342 ts = datetime.datetime.now() 343 # execute kwargs 344 # -- see ExecuteProcessor class 345 # -- see https://nbconvert.readthedocs.io/en/latest/execute_api.html 346 ep_kwargs = { 347 # avoid timeouts to stop kernel 348 'timeout': timeout, 349 # avoid kernel at exit functions 350 # -- this stops ipykernel AttributeError 'send_multipart' 351 'shutdown_kernel': 'immediate', 352 # set kernel name, blank is default 353 # -- e.g. python3, ir 354 # -- see https://stackoverflow.com/a/47053020/890242 355 'kernel_name': '', 356 # always use the async kernel manager 357 # -- see https://github.com/jupyter/nbconvert/issues/1964 358 'kernel_manager_class': AsyncKernelManager, 359 } 360 # overrides from metadata 361 ep_kwargs.update(meta_job.kind_meta.get('ep_kwargs', {})) 362 try: 363 resources = { 364 'metadata': { 365 'path': self.defaults.OMEGA_TMP, 366 } 367 } 368 if not meta_job.kind_meta.get('keep_output', False): 369 # https://nbconvert.readthedocs.io/en/latest/api/preprocessors.html 370 cp = ClearOutputPreprocessor() 371 cp.preprocess(notebook, resources) 372 ep = ExecutePreprocessor(**ep_kwargs) 373 ep.preprocess(notebook, resources) 374 except Exception as e: 375 status = 'ERROR' 376 message = str(e) 377 else: 378 status = 'OK' 379 message = '' 380 finally: 381 del ep 382 # record results 383 meta_results = self.put(notebook, 384 'results/{name}_{ts}'.format(**locals())) 385 meta_results.attributes['source_job'] = name 386 meta_results.save() 387 job_results = meta_job.attributes.get('job_results', []) 388 
job_results.append(meta_results.name) 389 meta_job.attributes['job_results'] = job_results 390 # record final job status 391 job_runs = meta_job.attributes.get('job_runs', []) 392 runstate = { 393 'status': status, 394 'ts': ts, 395 'message': message, 396 'results': meta_results.name if status == 'OK' else None 397 } 398 job_runs.append(runstate) 399 meta_job.attributes['job_runs'] = job_runs 400 # set event run state if event was specified 401 if event: 402 attrs = meta_job.attributes 403 triggers = attrs['triggers'] = attrs.get('triggers', []) 404 scheduled = (trigger for trigger in triggers 405 if trigger['event-kind'] == 'scheduled') 406 for trigger in scheduled: 407 if event == trigger['event']: 408 trigger['status'] = status 409 trigger['ts'] = ts 410 meta_job.save() 411 return meta_results
412
    def schedule(self, nb_file, run_at=None, last_run=None):
        """
        Schedule a processing of a notebook as per the interval
        specified on the job script

        Notes:
            This updates the notebook's Metadata entry by adding the
            next scheduled run in ``attributes['triggers']```

        Args:
            nb_file (str): the name of the notebook
            run_at (str|dict|JobSchedule): the schedule specified in a format
                suitable for JobSchedule. If not specified, this value is
                extracted from the first cell of the notebook
            last_run (datetime): the last time this job was run, use this to
                reschedule the job for the next run. Defaults to the last
                timestamp listed in ``attributes['job_runs']``, or datetime.utcnow()
                if no previous run exists.

        See Also:
            * croniter.get_next()
            * JobSchedule
            * OmegaJobs.get_notebook_config
            * cron expression - https://en.wikipedia.org/wiki/Cron#CRON_expression
        """
        meta = self.metadata(nb_file)
        attrs = meta.attributes
        # get/set run-at spec
        config = attrs.get('config', {})
        # see what we have as a schedule; all forms are normalized to a
        # cron string via JobSchedule
        # -- a dictionary of JobSchedule
        if isinstance(run_at, dict):
            run_at = self.Schedule(**run_at).cron
        # -- a cron string
        elif isinstance(run_at, str):
            run_at = self.Schedule(run_at).cron
        # -- a JobSchedule
        elif isinstance(run_at, self.Schedule):
            run_at = run_at.cron
        # -- nothing, may we have it on the job's config already
        if not run_at:
            interval = config.get('run-at')
        else:
            # store the new schedule
            config['run-at'] = run_at
            interval = run_at
        if not interval:
            # if we don't have a run-spec, return without scheduling
            raise ValueError('no run-at specification found, cannot schedule')
        # get last time the job was run
        if last_run is None:
            job_runs = attrs.get('job_runs')
            if job_runs:
                # reschedule relative to the most recent recorded run
                last_run = job_runs[-1]['ts']
            else:
                last_run = datetime.datetime.utcnow()
        # calculate next run time
        iter_next = croniter(interval, last_run)
        run_at = iter_next.get_next(datetime.datetime)
        # store next scheduled run
        triggers = attrs['triggers'] = attrs.get('triggers', [])
        # set up a schedule event
        scheduled_run = {
            'event-kind': 'scheduled',
            'event': run_at.isoformat(),
            'run-at': run_at,
            'status': 'PENDING'
        }
        # remove all pending triggers, add new triggers
        # -- keeps historical (non-PENDING) triggers, replaces any pending one
        past_triggers = [cur for cur in triggers if cur.get('status') != 'PENDING']
        triggers.clear()
        triggers.extend(past_triggers)
        triggers.append(scheduled_run)
        attrs['config'] = config
        return meta.save()
488
[docs] 489 def get_schedule(self, name, only_pending=False): 490 """ 491 return the cron schedule and corresponding triggers 492 493 Args: 494 name (str): the name of the job 495 496 Returns: 497 tuple of (run_at, triggers) 498 499 run_at (str): the cron spec, None if not scheduled 500 triggers (list): the list of triggers 501 """ 502 meta = self.metadata(name) 503 attrs = meta.attributes 504 config = attrs.get('config') 505 triggers = attrs.get('triggers', []) 506 if only_pending: 507 triggers = [trigger for trigger in triggers 508 if trigger['status'] == 'PENDING'] 509 if config and 'run-at' in config: 510 run_at = config.get('run-at') 511 else: 512 run_at = None 513 return run_at, triggers
514
[docs] 515 def drop_schedule(self, name): 516 """ 517 Drop an existing schedule, if any 518 519 This will drop any existing schedule and any pending triggers of 520 event-kind 'scheduled'. 521 522 Args: 523 name (str): the name of the job 524 525 Returns: 526 Metadata 527 """ 528 meta = self.metadata(name) 529 attrs = meta.attributes 530 config = attrs.get('config') 531 triggers = attrs.get('triggers') 532 if 'run-at' in config: 533 del config['run-at'] 534 for trigger in triggers: 535 if trigger['event-kind'] == 'scheduled' and trigger['status'] == 'PENDING': 536 trigger['status'] = 'CANCELLED' 537 return meta.save()
538
[docs] 539 def export(self, name, localpath='memory', format='html'): 540 """ 541 Export a job or result file to HTML 542 543 The job is exported in the given format. 544 545 :param name: the name of the job, as in jobs.get 546 :param localpath: the path of the local file to write. If you 547 specify an empty path or 'memory' a tuple of (body, resource) 548 is returned instead 549 :param format: the output format. currently only :code:`'html'` is supported 550 :return: the (data, resources) tuple as returned by nbconvert. For 551 format html data is the HTML's body, for PDF it is the pdf file contents 552 """ 553 from traitlets.config import Config 554 from nbconvert import SlidesExporter 555 from nbconvert.exporters.html import HTMLExporter 556 from nbconvert.exporters.pdf import PDFExporter 557 558 # https://nbconvert.readthedocs.io/en/latest/nbconvert_library.html 559 # (exporter class, filemode, config-values 560 EXPORTERS = { 561 'html': (HTMLExporter, '', {}), 562 'htmlbody': (HTMLExporter, '', {}), 563 'pdf': (PDFExporter, 'b', {}), 564 'slides': (SlidesExporter, '', {'RevealHelpPreprocessor.url_prefix': 565 'https://cdnjs.cloudflare.com/ajax/libs/reveal.js/3.6.0/'}), 566 } 567 # get exporter according to format 568 if format not in EXPORTERS: 569 raise ValueError('format {} is invalid. 
Choose one of {}'.format(format, EXPORTERS.keys())) 570 exporter_cls, fmode, configkw = EXPORTERS[format] 571 # prepare config 572 # http://nbconvert.readthedocs.io/en/latest/nbconvert_library.html#Using-different-preprocessors 573 c = Config() 574 for k, v in configkw.items(): 575 context, key = k.split('.') 576 setattr(c[context], key, v) 577 # get configured exporter 578 exporter = exporter_cls(config=c) 579 # get notebook, convert and store in file if requested 580 notebook = self.get(name) 581 (data, resources) = exporter.from_notebook_node(notebook) 582 if localpath and localpath != 'memory': 583 with open(localpath, 'w' + fmode) as fout: 584 fout.write(data) 585 return data, resources
586
[docs] 587 def help(self, name_or_obj=None, kind=None, raw=False): 588 """ get help for a notebook 589 590 Args: 591 name_or_obj (str|obj): the name or actual object to get help for 592 kind (str): optional, if specified forces retrieval of backend for the given kind 593 raw (bool): optional, if True forces help to be the backend type of the object. 594 If False returns the attributes[docs] on the object's metadata, if available. 595 Defaults to False 596 597 Returns: 598 * help(obj) if python is in interactive mode 599 * text(str) if python is in not interactive mode 600 """ 601 return self.store.help(name_or_obj, kind=kind, raw=raw)
602 603 def to_archive(self, name, path, **kwargs): 604 # TODO remove, pending #218 605 if not name.endswith('.ipynb'): 606 name += '.ipynb' 607 return self.store.to_archive(name, path, **kwargs) 608 609 def from_archive(self, path, name, **kwargs): 610 # TODO remove, pending #218 611 if not name.endswith('.ipynb'): 612 name += '.ipynb' 613 return self.store.from_archive(path, name, **kwargs) 614 615 def promote(self, name, other, **kwargs): 616 # TODO remove, pending #218 617 nb = self.get(name) 618 return other.put(nb, name) 619 620 def summary(self, *args, **kwargs): 621 return self.store.summary(*args, **kwargs)