1from __future__ import absolute_import
2
3import datetime
4import re
5from io import StringIO, BytesIO
6from uuid import uuid4
7
8import gridfs
9import yaml
10from croniter import croniter
11from jupyter_client import AsyncKernelManager
12from nbformat import read as nbread, write as nbwrite, v4 as nbv4, NotebookNode
13
14from omegaml.backends.basedata import BaseDataBackend
15from omegaml.documents import MDREGISTRY
16from omegaml.notebook.jobschedule import JobSchedule
17
18
class NotebookBackend(BaseDataBackend):
    """
    Omega Jobs API

    Storage backend for Jupyter notebooks (.ipynb) in the om.jobs store.
    Notebooks are serialized via nbformat v4 and stored as gridfs files
    referenced from the entry's Metadata.
    """

    # TODO this class should be a proper backend class with a mixin for ipynb

    KIND = MDREGISTRY.OMEGAML_JOBS

    @classmethod
    def supports(cls, obj, name, data_store=None, **kwargs):
        # applies only to the jobs store, for notebook objects, code strings
        # or names with the .ipynb extension
        return data_store.prefix == 'jobs/' and (isinstance(obj, (str, NotebookNode)) or name.endswith('.ipynb'))

    @property
    def store(self):
        # the storage this backend is attached to
        return self.data_store

    @property
    def _db(self):
        # the store's mongo database
        return self.store.mongodb

    @property
    def _fs(self):
        # the store's gridfs filesystem
        return self.store.fs

    def put(self, obj, name, attributes=None, **kwargs):
        """ store a notebook job

        Args:
            obj (str|NotebookNode): the notebook object or the notebook's code as a string
            name (str): the name of the job
            attributes (dict): the attributes to store with the job

        Returns:
            Metadata
        """
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        if isinstance(obj, str):
            # code given as a string => build a new notebook from it
            return self.store.create(obj, name)
        # nbwrite expects a text stream, fs.put expects bytes
        sbuf = StringIO()
        nbwrite(obj, sbuf, version=4)
        bbuf = BytesIO(sbuf.getvalue().encode('utf8'))
        # store the content under a new unique gridfile; create the metadata
        # entry if there is none yet, else just replace its gridfile reference
        meta = self.store.metadata(name)
        fileid = self._store_to_file(self.store, bbuf, uuid4().hex)
        if not meta:
            meta = self.store._make_metadata(name=name,
                                             prefix=self.store.prefix,
                                             bucket=self.store.bucket,
                                             kind=self.KIND,
                                             attributes=attributes,
                                             gridfile=fileid)
        else:
            meta.gridfile = fileid
        meta = meta.save()
        # merge the notebook's inline config into the metadata config
        nb_config = self.store.get_notebook_config(name)
        meta_config = meta.attributes.get('config', {})
        if nb_config:
            meta_config.update(dict(**nb_config))
        meta.attributes['config'] = meta_config
        meta = meta.save()
        # a run-at spec on a non-result notebook means it shall be scheduled
        if not name.startswith('results') and ('run-at' in meta_config):
            meta = self.store.schedule(name)
        return meta

    def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
        """
        Retrieve a notebook and return a NotebookNode

        Args:
            name (str): the name of the notebook

        Returns:
            NotebookNode

        Raises:
            gridfs.errors.NoFile: if the notebook does not exist
            ValueError: if the stored gridfile has no content
        """
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        meta = self.store.metadata(name)
        if not meta:
            raise gridfs.errors.NoFile(
                ">{0}< does not exist in jobs bucket '{1}'".format(
                    name, self.store.bucket))
        # nbread wants a text stream, the gridfile yields bytes
        data = meta.gridfile.read()
        if data is None:
            msg = 'Expected content in {name}, got None'.format(name=name)
            raise ValueError(msg)
        sbuf = StringIO(data.decode('utf8'))
        return nbread(sbuf, as_version=4)
119
120
class NotebookMixin:
    """ om.jobs storage methods

    Mixin to OmegaStore to provide om.jobs-specific methods

    Usage:
        # create a new notebook from code
        code = \"""
        print('hello')
        \"""
        om.jobs.create(code, 'name')

        # schedule a notebook
        om.jobs.schedule('name', run_at='<cron spec>')

        # run a notebook immediately
        om.jobs.run('name')
    """
    # keywords that may start a notebook's config cell, see get_notebook_config()
    _nb_config_magic = 'omega-ml', 'schedule', 'run-at', 'cron'
    # placeholder entry used to represent an otherwise empty directory
    _dir_placeholder = '_placeholder.ipynb'

    def _init_mixin(self, *args, **kwargs):
        self._include_dir_placeholder = True
        # convenience so you can do om.jobs.schedule(..., run_at=om.jobs.Schedule(....))
        self.Schedule = JobSchedule

    @classmethod
    def supports(cls, store, *args, **kwargs):
        # this mixin only applies to the om.jobs store
        return store.prefix == 'jobs/'

    def __repr__(self):
        return 'OmegaJobs(store={})'.format(super().__repr__())

    def collection(self, name):
        """ return the collection for the given notebook name """
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        return super().collection(name)

    def metadata(self, name, **kwargs):
        """ retrieve metadata of a notebook

        Args:
            name (str): the name of the notebook

        Returns:
            Metadata
        """
        meta = super().metadata(name, **kwargs)
        if meta is None and not name.endswith('.ipynb'):
            # retry with the canonical .ipynb extension, keeping all kwargs
            meta = super().metadata(name + '.ipynb', **kwargs)
        return meta

    def exists(self, name):
        """ check if the notebook exists

        Args:
            name (str): the name of the notebook

        Returns:
            bool, True if the notebook exists, with or without the .ipynb extension
        """
        return len(super().list(name)) + len(super().list(name + '.ipynb')) > 0

    def create(self, code, name, **kwargs):
        """
        create a notebook from code

        :param code: the code as a string
        :param name: the name of the job to create
        :return: the metadata object created
        """
        # build a minimal one-cell notebook from the code
        cells = [nbv4.new_code_cell(source=code)]
        notebook = nbv4.new_notebook(cells=cells)
        # put the notebook
        meta = self.put(notebook, name, **kwargs)
        return meta

    def get_fs(self, collection=None):
        # legacy support
        return self._fs

    def get_collection(self, collection):
        """
        returns the collection object
        """
        # FIXME this should use store.collection
        return getattr(self.mongodb, collection)

    def list(self, pattern=None, regexp=None, raw=False, **kwargs):
        """
        list all jobs matching filter.
        filter is a regex on the name of the ipynb entry.
        The default is all, i.e. `.*`
        """
        job_list = super().list(pattern=pattern, regexp=regexp, raw=raw, **kwargs)
        name = lambda v: v.name if raw else v
        if not self._include_dir_placeholder:
            # hide the placeholder entries that merely mark directories
            job_list = [v for v in job_list if not name(v).endswith(self._dir_placeholder)]
        return job_list

    def get_notebook_config(self, nb_filename):
        """
        returns the omegaml script config on the notebook's first cell

        The config cell is in any of the following formats:

        *Option 1*::

            # omega-ml:
            #   run-at: "<cron schedule>"

        *Option 2*::

            # omega-ml:
            #   schedule: "weekday(s), hour[, month]"

        *Option 3*::

            # omega-ml:
            #   cron: "<cron schedule>"

        You may optionally specify only ``run-at``, ``schedule`` or ``cron``,
        i.e. without the ``omega-ml`` header::

            # run-at: "<cron schedule>"
            # cron: "<cron schedule>"
            # schedule: "weekday(s), hour[, month]"

        Args:
            nb_filename (str): the name of the notebook

        Returns:
            config(dict) => ``{ 'run-at': <specifier> }``

        Raises:
            ValueError, if there is no config cell or the config cell is invalid

        See Also:
            * JobSchedule
        """
        notebook = self.get(nb_filename)
        config_cell = None
        config_magic = ['# {}'.format(kw) for kw in self._nb_config_magic]
        # find the (last) cell that starts with one of the config keywords
        for cell in notebook.get('cells'):
            if any(cell.source.startswith(kw) for kw in config_magic):
                config_cell = cell
        if not config_cell:
            return {}
        # strip the leading comment marker from each line so the cell parses as yaml
        yaml_conf = '\n'.join(
            [re.sub('#', '', x, 1) for x in str(
                config_cell.source).splitlines()])
        try:
            yaml_conf = yaml.safe_load(yaml_conf)
            config = yaml_conf.get(self._nb_config_magic[0], yaml_conf)
        except Exception:
            raise ValueError(
                'Notebook configuration cannot be parsed')
        if not isinstance(config, dict):
            # e.g. an empty or scalar config cell yields None/str, not a mapping
            raise ValueError(
                'Notebook configuration cannot be parsed')
        # translate config to canonical form
        # TODO refactor to separate method / mapped translation functions
        if 'schedule' in config:
            config['run-at'] = JobSchedule(text=config.get('schedule', '')).cron
        if 'cron' in config:
            config['run-at'] = JobSchedule.from_cron(config.get('cron')).cron
        if 'run-at' in config:
            config['run-at'] = JobSchedule.from_cron(config.get('run-at')).cron
        return config

    def run(self, name, event=None, timeout=None):
        """
        Run a job immediately

        The job is run and the results are stored in om.jobs('results/name <timestamp>') and
        the result's Metadata is returned.

        ``Metadata.attributes`` of the original job as given by name is updated:

        * ``attributes['job_runs']`` (list) - list of status of each run. Status is
             a dict as below
        * ``attributes['job_results']`` (list) - list of results job names in same
             index-order as job_runs
        * ``attributes['trigger']`` (list) - list of triggers

        The status of each job run is a dict with keys:

        * ``status`` (str): the status of the job run, OK or ERROR
        * ``ts`` (datetime): time of execution
        * ``message`` (str): error message in case of ERROR, else blank
        * ``results`` (str): name of results in case of OK, else blank

        Usage::

            # directly (sync)
            meta = om.jobs.run('mynb')

            # via runtime (async)
            job = om.runtime.job('mynb')
            result = job.run()

        Args:
            name (str): the name of the jobfile
            event (str): an event name
            timeout (int): timeout in seconds, None means no timeout

        Returns:
            Metadata of the results entry

        See Also:
            * OmegaJobs.run_notebook
        """
        return self.run_notebook(name, event=event, timeout=timeout)

    def run_notebook(self, name, event=None, timeout=None):
        """ run a given notebook immediately.

        Args:
            name (str): the name of the jobfile
            event (str): an event name
            timeout (int): timeout in seconds

        Returns:
            Metadata of results

        Notes:
            The notebook is run using nbconvert.

            The job's metadata is updated to reflect all job runs and their results
            as follows::

                {
                    'job_runs': [{
                        'status': '<execution status>',
                        'ts': <datetime>,
                        'message': '<blank | exception>,
                        'results': 'results/<name>_<datetime>.ipynb',
                    } ...],
                    'job_results': [
                        'results/<name>_<datetime>.ipynb',
                    ]
                }

            `job_runs` is a list representing result status of each time the job was
            executed by the runtime, either by `om.runtime.job('<name>').run()`, or
            as a scheduled execution. In case of an exception, the message is truncated
            to contain an excerpt of str(exception).

            `job_results` is a list of members in `om.jobs` representing the result
            each job execution. Each result is equivalent of an interactive execution
            in jupyter.

        See Also:
            * nbconvert https://nbconvert.readthedocs.io/en/latest/execute_api.html

        .. versionchanged:: 0.18
            `job_runs[].results` lists the name of the respective `job_results` item
            regardless of status. In prior versions `job_runs[].results` was null for
            failed jobs.

        .. versionchanged:: 0.18
            `job_runs[].message` is truncated to contain the first and last 80
            characters of an exception. The actual exception is preserved in the
            job_results notebook, stored as a separate object.
        """
        from nbconvert.preprocessors import ClearOutputPreprocessor
        from nbconvert.preprocessors.execute import ExecutePreprocessor

        notebook = self.get(name)
        meta_job = self.metadata(name)
        assert meta_job is not None, f"Cannot run non-existent jobs {name}"
        ts = datetime.datetime.now()
        # execute kwargs
        # -- see ExecuteProcessor class
        # -- see https://nbconvert.readthedocs.io/en/latest/execute_api.html
        ep_kwargs = {
            # avoid timeouts to stop kernel
            'timeout': timeout,
            # avoid kernel at exit functions
            # -- this stops ipykernel AttributeError 'send_multipart'
            'shutdown_kernel': 'immediate',
            # set kernel name, blank is default
            # -- e.g. python3, ir
            # -- see https://stackoverflow.com/a/47053020/890242
            'kernel_name': '',
            # always use the async kernel manager
            # -- see https://github.com/jupyter/nbconvert/issues/1964
            'kernel_manager_class': AsyncKernelManager,
        }
        # overrides from metadata
        ep_kwargs.update(meta_job.kind_meta.get('ep_kwargs', {}))
        # ensure 'del ep' in finally works even if preprocessor setup raises
        ep = None
        try:
            resources = {
                'metadata': {
                    'path': self.defaults.OMEGA_TMP,
                }
            }
            if not meta_job.kind_meta.get('keep_output', False):
                # https://nbconvert.readthedocs.io/en/latest/api/preprocessors.html
                cp = ClearOutputPreprocessor()
                cp.preprocess(notebook, resources)
            ep = ExecutePreprocessor(**ep_kwargs)
            ep.preprocess(notebook, resources)
        except Exception as e:
            status = 'ERROR'
            message = str(e)
        else:
            status = 'OK'
            message = ''
        finally:
            # limit size of metadata.attributes #496
            # -- only truncate overly long messages; short and blank messages
            #    are kept as-is (avoids 'msg...msg' duplication and '...' on OK)
            if len(message) > 163:
                message = f'{message[0:80]}...{message[-80:]}'
            del ep
        # record results
        meta_results = self.put(notebook,
                                'results/{name}_{ts}'.format(name=name, ts=ts))
        meta_results.attributes['source_job'] = name
        meta_results.save()
        job_results = meta_job.attributes.get('job_results', [])
        job_results.append(meta_results.name)
        meta_job.attributes['job_results'] = job_results
        # record final job status
        job_runs = meta_job.attributes.get('job_runs', [])
        runstate = {
            'status': status,
            'ts': ts,
            'message': message,
            'results': meta_results.name
        }
        job_runs.append(runstate)
        meta_job.attributes['job_runs'] = job_runs
        # set event run state if event was specified
        if event:
            attrs = meta_job.attributes
            triggers = attrs['triggers'] = attrs.get('triggers', [])
            scheduled = (trigger for trigger in triggers
                         if trigger['event-kind'] == 'scheduled')
            for trigger in scheduled:
                if event == trigger['event']:
                    trigger['status'] = status
                    trigger['ts'] = ts
        meta_job.save()
        return meta_results

    def schedule(self, nb_file, run_at=None, last_run=None):
        """
        Schedule a processing of a notebook as per the interval
        specified on the job script

        Notes:
            This updates the notebook's Metadata entry by adding the
            next scheduled run in ``attributes['triggers']```

        Args:
            nb_file (str): the name of the notebook
            run_at (str|dict|JobSchedule): the schedule specified in a format
               suitable for JobSchedule. If not specified, this value is
               extracted from the first cell of the notebook
            last_run (datetime): the last time this job was run, use this to
               reschedule the job for the next run. Defaults to the last
               timestamp listed in ``attributes['job_runs']``, or datetime.utcnow()
               if no previous run exists.

        See Also:
            * croniter.get_next()
            * JobSchedule
            * OmegaJobs.get_notebook_config
            * cron expression - https://en.wikipedia.org/wiki/Cron#CRON_expression
        """
        meta = self.metadata(nb_file)
        attrs = meta.attributes
        # get/set run-at spec
        config = attrs.get('config', {})
        # see what we have as a schedule
        # -- a dictionary of JobSchedule
        if isinstance(run_at, dict):
            run_at = self.Schedule(**run_at).cron
        # -- a cron string
        elif isinstance(run_at, str):
            run_at = self.Schedule(run_at).cron
        # -- a JobSchedule
        elif isinstance(run_at, self.Schedule):
            run_at = run_at.cron
        # -- nothing, maybe we have it on the job's config already
        if not run_at:
            interval = config.get('run-at')
        else:
            # store the new schedule
            config['run-at'] = run_at
            interval = run_at
        if not interval:
            # if we don't have a run-spec, return without scheduling
            raise ValueError('no run-at specification found, cannot schedule')
        # get last time the job was run
        if last_run is None:
            job_runs = attrs.get('job_runs')
            if job_runs:
                last_run = job_runs[-1]['ts']
            else:
                last_run = datetime.datetime.utcnow()
        # calculate next run time
        iter_next = croniter(interval, last_run)
        run_at = iter_next.get_next(datetime.datetime)
        # store next scheduled run
        triggers = attrs['triggers'] = attrs.get('triggers', [])
        # set up a schedule event
        scheduled_run = {
            'event-kind': 'scheduled',
            'event': run_at.isoformat(),
            'run-at': run_at,
            'status': 'PENDING'
        }
        # remove all pending triggers, add new triggers
        past_triggers = [cur for cur in triggers if cur.get('status') != 'PENDING']
        triggers.clear()
        triggers.extend(past_triggers)
        triggers.append(scheduled_run)
        attrs['config'] = config
        return meta.save()

    def get_schedule(self, name, only_pending=False):
        """
        return the cron schedule and corresponding triggers

        Args:
            name (str): the name of the job
            only_pending (bool): if True only triggers in status PENDING
               are returned, defaults to False

        Returns:
            tuple of (run_at, triggers)

            run_at (str): the cron spec, None if not scheduled
            triggers (list): the list of triggers
        """
        meta = self.metadata(name)
        attrs = meta.attributes
        config = attrs.get('config')
        triggers = attrs.get('triggers', [])
        if only_pending:
            triggers = [trigger for trigger in triggers
                        if trigger['status'] == 'PENDING']
        if config and 'run-at' in config:
            run_at = config.get('run-at')
        else:
            run_at = None
        return run_at, triggers

    def drop_schedule(self, name):
        """
        Drop an existing schedule, if any

        This will drop any existing schedule and any pending triggers of
        event-kind 'scheduled'.

        Args:
            name (str): the name of the job

        Returns:
            Metadata
        """
        meta = self.metadata(name)
        attrs = meta.attributes
        # guard against entries that were never configured or scheduled
        config = attrs.get('config') or {}
        triggers = attrs.get('triggers') or []
        if 'run-at' in config:
            del config['run-at']
        for trigger in triggers:
            if trigger['event-kind'] == 'scheduled' and trigger['status'] == 'PENDING':
                trigger['status'] = 'CANCELLED'
        return meta.save()

    def export(self, name, localpath='memory', format='html'):
        """
        Export a job or result file to HTML

        The job is exported in the given format.

        :param name: the name of the job, as in jobs.get
        :param localpath: the path of the local file to write. If you
           specify an empty path or 'memory' a tuple of (body, resource)
           is returned instead
        :param format: the output format. currently only :code:`'html'` is supported
        :return: the (data, resources) tuple as returned by nbconvert. For
           format html data is the HTML's body, for PDF it is the pdf file contents
        """
        from traitlets.config import Config
        from nbconvert import SlidesExporter
        from nbconvert.exporters.html import HTMLExporter
        from nbconvert.exporters.pdf import PDFExporter

        # https://nbconvert.readthedocs.io/en/latest/nbconvert_library.html
        # (exporter class, filemode, config-values)
        EXPORTERS = {
            'html': (HTMLExporter, '', {}),
            'htmlbody': (HTMLExporter, '', {}),
            'pdf': (PDFExporter, 'b', {}),
            'slides': (SlidesExporter, '', {'RevealHelpPreprocessor.url_prefix':
                                                'https://cdnjs.cloudflare.com/ajax/libs/reveal.js/3.6.0/'}),
        }
        # get exporter according to format
        if format not in EXPORTERS:
            raise ValueError('format {} is invalid. Choose one of {}'.format(format, EXPORTERS.keys()))
        exporter_cls, fmode, configkw = EXPORTERS[format]
        # prepare config
        # http://nbconvert.readthedocs.io/en/latest/nbconvert_library.html#Using-different-preprocessors
        c = Config()
        for k, v in configkw.items():
            context, key = k.split('.')
            setattr(c[context], key, v)
        # get configured exporter
        exporter = exporter_cls(config=c)
        # get notebook, convert and store in file if requested
        notebook = self.get(name)
        (data, resources) = exporter.from_notebook_node(notebook)
        if localpath and localpath != 'memory':
            with open(localpath, 'w' + fmode) as fout:
                fout.write(data)
        return data, resources

    def help(self, name_or_obj=None, kind=None, raw=False):
        """ get help for a notebook

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is in not interactive mode
        """
        return super().help(name_or_obj, kind=kind, raw=raw)

    def to_archive(self, name, path, **kwargs):
        """ export a notebook to an archive, ensuring the .ipynb extension """
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        return super().to_archive(name, path, **kwargs)

    def from_archive(self, path, name, **kwargs):
        """ import a notebook from an archive, ensuring the .ipynb extension """
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        return super().from_archive(path, name, **kwargs)

    def promote(self, name, other, **kwargs):
        """ promote a notebook to another store (e.g. another bucket) """
        nb = self.get(name)
        return other.put(nb, name)

    def summary(self, *args, **kwargs):
        """ return a summary of the notebook's metadata """
        return super().summary(*args, **kwargs)
670
671
# sphinx docs compatibility
class OmegaJobs(NotebookMixin, NotebookBackend):
    """ om.jobs API - combines the storage mixin with the notebook backend """
    # NOTE the stray '[docs]' line that preceded this class was sphinx-HTML
    # extraction residue and would raise NameError at import time; removed.