1from __future__ import absolute_import
2
3import datetime
4import re
5from io import StringIO, BytesIO
6from uuid import uuid4
7
8import gridfs
9import yaml
10from croniter import croniter
11from jupyter_client import AsyncKernelManager
12from nbconvert.preprocessors import ClearOutputPreprocessor
13from nbconvert.preprocessors.execute import ExecutePreprocessor
14from nbformat import read as nbread, write as nbwrite, v4 as nbv4, NotebookNode
15
16from omegaml.backends.basedata import BaseDataBackend
17from omegaml.documents import MDREGISTRY
18from omegaml.notebook.jobschedule import JobSchedule
19
20
class NotebookBackend(BaseDataBackend):
    """
    Omega Jobs API

    Storage backend for Jupyter notebooks in the om.jobs store
    (prefix 'jobs/'). Notebooks are serialized as .ipynb (nbformat v4)
    and stored as gridfs files referenced from their Metadata entry.
    """

    # TODO this class should be a proper backend class with a mixin for ipynb

    # Metadata.kind recorded for notebook entries
    KIND = MDREGISTRY.OMEGAML_JOBS
29
30 @classmethod
31 def supports(cls, obj, name, data_store=None, **kwargs):
32 return data_store.prefix == 'jobs/' and (isinstance(obj, (str, NotebookNode)) or name.endswith('.ipynb'))
33
    @property
    def store(self):
        # alias for the backing data store (om.jobs' OmegaStore)
        return self.data_store
37
    @property
    def _db(self):
        # the store's underlying mongodb database handle
        return self.store.mongodb
41
    @property
    def _fs(self):
        # the store's gridfs instance used for notebook file storage
        return self.store.fs
45
46 def put(self, obj, name, attributes=None, **kwargs):
47 """ store a notebook job
48
49 Args:
50 obj (str|NotebookNode): the notebook object or the notebook's code as a string
51 name (str): the name of the job
52 attributes (dict): the attributes to store with the job
53
54 Returns:
55 Metadata
56 """
57 if not name.endswith('.ipynb'):
58 name += '.ipynb'
59 if isinstance(obj, str):
60 return self.create(obj, name)
61 sbuf = StringIO()
62 bbuf = BytesIO()
63 # nbwrite expects string, fs.put expects bytes
64 nbwrite(obj, sbuf, version=4)
65 sbuf.seek(0)
66 bbuf.write(sbuf.getvalue().encode('utf8'))
67 bbuf.seek(0)
68 # see if we have a file already, if so replace the gridfile
69 meta = self.store.metadata(name)
70 if not meta:
71 filename = uuid4().hex
72 fileid = self._store_to_file(self.store, bbuf, filename)
73 meta = self.store._make_metadata(name=name,
74 prefix=self.store.prefix,
75 bucket=self.store.bucket,
76 kind=self.KIND,
77 attributes=attributes,
78 gridfile=fileid)
79 meta = meta.save()
80 else:
81 filename = uuid4().hex
82 meta.gridfile = self._store_to_file(self.store, bbuf, filename)
83 meta = meta.save()
84 # set config
85 nb_config = self.store.get_notebook_config(name)
86 meta_config = meta.attributes.get('config', {})
87 if nb_config:
88 meta_config.update(dict(**nb_config))
89 meta.attributes['config'] = meta_config
90 meta = meta.save()
91 if not name.startswith('results') and ('run-at' in meta_config):
92 meta = self.store.schedule(name)
93 return meta
94
95 def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
96 """
97 Retrieve a notebook and return a NotebookNode
98 """
99 if not name.endswith('.ipynb'):
100 name += '.ipynb'
101 meta = self.store.metadata(name)
102 if meta:
103 try:
104 outf = meta.gridfile
105 except gridfs.errors.NoFile as e:
106 raise e
107 # nbwrite wants a string, outf is bytes
108 sbuf = StringIO()
109 data = outf.read()
110 if data is None:
111 msg = 'Expected content in {name}, got None'.format(**locals())
112 raise ValueError(msg)
113 sbuf.write(data.decode('utf8'))
114 sbuf.seek(0)
115 nb = nbread(sbuf, as_version=4)
116 return nb
117 else:
118 raise gridfs.errors.NoFile(
119 ">{0}< does not exist in jobs bucket '{1}'".format(
120 name, self.store.bucket))
121
122
class NotebookMixin:
    """ om.jobs storage methods

    Mixin to OmegaStore to provide om.jobs-specific methods

    Usage:
        # create a new notebook from code
        code = \"""
        print('hello')
        \"""
        om.jobs.create(code, 'name')

        # schedule a notebook
        om.jobs.schedule('name', run_at='<cron spec>')

        # run a notebook immediately
        om.jobs.run('name')
    """
    # keywords recognized as the start of a notebook config cell ('# <keyword>')
    _nb_config_magic = 'omega-ml', 'schedule', 'run-at', 'cron'
    # name of the placeholder entry created for "directory" entries
    _dir_placeholder = '_placeholder.ipynb'

    def _init_mixin(self, *args, **kwargs):
        # whether list() should include directory placeholder entries
        self._include_dir_placeholder = True
        # convenience so you can do om.jobs.schedule(..., run_at=om.jobs.Schedule(....))
        self.Schedule = JobSchedule
148
149 @classmethod
150 def supports(cls, store, *args, **kwargs):
151 return store.prefix == 'jobs/'
152
153 def __repr__(self):
154 return 'OmegaJobs(store={})'.format(super().__repr__())
155
156 def collection(self, name):
157 if not name.endswith('.ipynb'):
158 name += '.ipynb'
159 return super().collection(name)
160
161 def metadata(self, name, **kwargs):
162 """ retrieve metadata of a notebook
163
164 Args:
165 name (str): the name of the notebook
166
167 Returns:
168 Metadata
169 """
170 meta = super().metadata(name, **kwargs)
171 if meta is None and not name.endswith('.ipynb'):
172 name += '.ipynb'
173 meta = super().metadata(name)
174 return meta
175
    def exists(self, name):
        """ check if the notebook exists

        Args:
            name (str): the name of the notebook

        Returns:
            bool, True if an entry exists under name or name + '.ipynb'
        """
        # checks both the raw name and the .ipynb-suffixed name
        return len(super().list(name)) + len(super().list(name + '.ipynb')) > 0
186
187 def create(self, code, name, **kwargs):
188 """
189 create a notebook from code
190
191 :param code: the code as a string
192 :param name: the name of the job to create
193 :return: the metadata object created
194 """
195 cells = []
196 cells.append(nbv4.new_code_cell(source=code))
197 notebook = nbv4.new_notebook(cells=cells)
198 # put the notebook
199 meta = self.put(notebook, name, **kwargs)
200 return meta
201
    def get_fs(self, collection=None):
        # legacy support: returns the store's gridfs; collection is ignored
        return self._fs
205
    def get_collection(self, collection):
        """
        returns the collection object

        Args:
            collection (str): the mongodb collection name
        """
        # FIXME this should use store.collection
        return getattr(self.mongodb, collection)
212
213 def list(self, pattern=None, regexp=None, raw=False, **kwargs):
214 """
215 list all jobs matching filter.
216 filter is a regex on the name of the ipynb entry.
217 The default is all, i.e. `.*`
218 """
219 job_list = super().list(pattern=pattern, regexp=regexp, raw=raw, **kwargs)
220 name = lambda v: v.name if raw else v
221 if not self._include_dir_placeholder:
222 job_list = [v for v in job_list if not name(v).endswith(self._dir_placeholder)]
223 return job_list
224
225 def get_notebook_config(self, nb_filename):
226 """
227 returns the omegaml script config on the notebook's first cell
228
229 The config cell is in any of the following formats:
230
231 *Option 1*::
232
233 # omega-ml:
234 # run-at: "<cron schedule>"
235
236 *Option 2*::
237
238 # omega-ml:
239 # schedule: "weekday(s), hour[, month]"
240
241 *Option 3*::
242
243 # omega-ml:
244 # cron: "<cron schedule>"
245
246 You may optionally specify only ``run-at``, ``schedule`` or ``cron``,
247 i.e. without the ``omega-ml`` header::
248
249 # run-at: "<cron schedule>"
250 # cron: "<cron schedule>"
251 # schedule: "weekday(s), hour[, month]"
252
253 Args:
254 nb_filename (str): the name of the notebook
255
256 Returns:
257 config(dict) => ``{ 'run-at': <specifier> }``
258
259 Raises:
260 ValueError, if there is no config cell or the config cell is invalid
261
262 See Also:
263 * JobSchedule
264 """
265 notebook = self.get(nb_filename)
266 config_cell = None
267 config_magic = ['# {}'.format(kw) for kw in self._nb_config_magic]
268 for cell in notebook.get('cells'):
269 if any(cell.source.startswith(kw) for kw in config_magic):
270 config_cell = cell
271 if not config_cell:
272 return {}
273 yaml_conf = '\n'.join(
274 [re.sub('#', '', x, 1) for x in str(
275 config_cell.source).splitlines()])
276 try:
277 yaml_conf = yaml.safe_load(yaml_conf)
278 config = yaml_conf.get(self._nb_config_magic[0], yaml_conf)
279 except Exception:
280 raise ValueError(
281 'Notebook configuration cannot be parsed')
282 # translate config to canonical form
283 # TODO refactor to seperate method / mapped translation functions
284 if 'schedule' in config:
285 config['run-at'] = JobSchedule(text=config.get('schedule', '')).cron
286 if 'cron' in config:
287 config['run-at'] = JobSchedule.from_cron(config.get('cron')).cron
288 if 'run-at' in config:
289 config['run-at'] = JobSchedule.from_cron(config.get('run-at')).cron
290 return config
291
    def run(self, name, event=None, timeout=None):
        """
        Run a job immediately

        The job is run and the results are stored in om.jobs('results/name <timestamp>') and
        the result's Metadata is returned.

        ``Metadata.attributes`` of the original job as given by name is updated:

        * ``attributes['job_runs']`` (list) - list of status of each run. Status is
             a dict as below
        * ``attributes['job_results']`` (list) - list of results job names in same
             index-order as job_runs
        * ``attributes['trigger']`` (list) - list of triggers

        The status of each job run is a dict with keys:

        * ``status`` (str): the status of the job run, OK or ERROR
        * ``ts`` (datetime): time of execution
        * ``message`` (str): error message in case of ERROR, else blank
        * ``results`` (str): name of results in case of OK, else blank

        Usage::

            # directly (sync)
            meta = om.jobs.run('mynb')

            # via runtime (async)
            job = om.runtime.job('mynb')
            result = job.run()

        Args:
            name (str): the name of the jobfile
            event (str): an event name
            timeout (int): timeout in seconds, None means no timeout

        Returns:
            Metadata of the results entry

        See Also:
            * OmegaJobs.run_notebook
        """
        # thin wrapper, all work happens in run_notebook
        return self.run_notebook(name, event=event, timeout=timeout)
335
336 def run_notebook(self, name, event=None, timeout=None):
337 """ run a given notebook immediately.
338
339 Args:
340 name (str): the name of the jobfile
341 event (str): an event name
342 timeout (int): timeout in seconds
343
344 Returns:
345 Metadata of results
346
347 See Also:
348 * nbconvert https://nbconvert.readthedocs.io/en/latest/execute_api.html
349 """
350 notebook = self.get(name)
351 meta_job = self.metadata(name)
352 assert meta_job is not None, f"Cannot run non-existent jobs {name}"
353 ts = datetime.datetime.now()
354 # execute kwargs
355 # -- see ExecuteProcessor class
356 # -- see https://nbconvert.readthedocs.io/en/latest/execute_api.html
357 ep_kwargs = {
358 # avoid timeouts to stop kernel
359 'timeout': timeout,
360 # avoid kernel at exit functions
361 # -- this stops ipykernel AttributeError 'send_multipart'
362 'shutdown_kernel': 'immediate',
363 # set kernel name, blank is default
364 # -- e.g. python3, ir
365 # -- see https://stackoverflow.com/a/47053020/890242
366 'kernel_name': '',
367 # always use the async kernel manager
368 # -- see https://github.com/jupyter/nbconvert/issues/1964
369 'kernel_manager_class': AsyncKernelManager,
370 }
371 # overrides from metadata
372 ep_kwargs.update(meta_job.kind_meta.get('ep_kwargs', {}))
373 try:
374 resources = {
375 'metadata': {
376 'path': self.defaults.OMEGA_TMP,
377 }
378 }
379 if not meta_job.kind_meta.get('keep_output', False):
380 # https://nbconvert.readthedocs.io/en/latest/api/preprocessors.html
381 cp = ClearOutputPreprocessor()
382 cp.preprocess(notebook, resources)
383 ep = ExecutePreprocessor(**ep_kwargs)
384 ep.preprocess(notebook, resources)
385 except Exception as e:
386 status = 'ERROR'
387 message = str(e)
388 else:
389 status = 'OK'
390 message = ''
391 finally:
392 del ep
393 # record results
394 meta_results = self.put(notebook,
395 'results/{name}_{ts}'.format(**locals()))
396 meta_results.attributes['source_job'] = name
397 meta_results.save()
398 job_results = meta_job.attributes.get('job_results', [])
399 job_results.append(meta_results.name)
400 meta_job.attributes['job_results'] = job_results
401 # record final job status
402 job_runs = meta_job.attributes.get('job_runs', [])
403 runstate = {
404 'status': status,
405 'ts': ts,
406 'message': message,
407 'results': meta_results.name if status == 'OK' else None
408 }
409 job_runs.append(runstate)
410 meta_job.attributes['job_runs'] = job_runs
411 # set event run state if event was specified
412 if event:
413 attrs = meta_job.attributes
414 triggers = attrs['triggers'] = attrs.get('triggers', [])
415 scheduled = (trigger for trigger in triggers
416 if trigger['event-kind'] == 'scheduled')
417 for trigger in scheduled:
418 if event == trigger['event']:
419 trigger['status'] = status
420 trigger['ts'] = ts
421 meta_job.save()
422 return meta_results
423
    def schedule(self, nb_file, run_at=None, last_run=None):
        """
        Schedule a processing of a notebook as per the interval
        specified on the job script

        Notes:
            This updates the notebook's Metadata entry by adding the
            next scheduled run in ``attributes['triggers']``

        Args:
            nb_file (str): the name of the notebook
            run_at (str|dict|JobSchedule): the schedule specified in a format
               suitable for JobSchedule. If not specified, this value is
               extracted from the first cell of the notebook
            last_run (datetime): the last time this job was run, use this to
               reschedule the job for the next run. Defaults to the last
               timestamp listed in ``attributes['job_runs']``, or datetime.utcnow()
               if no previous run exists.

        Raises:
            ValueError: if no run-at specification is given and none can be
               found on the notebook's config

        See Also:
            * croniter.get_next()
            * JobSchedule
            * OmegaJobs.get_notebook_config
            * cron expression - https://en.wikipedia.org/wiki/Cron#CRON_expression
        """
        meta = self.metadata(nb_file)
        attrs = meta.attributes
        # get/set run-at spec
        config = attrs.get('config', {})
        # see what we have as a schedule -- normalize everything to a cron string
        # -- a dictionary of JobSchedule
        if isinstance(run_at, dict):
            run_at = self.Schedule(**run_at).cron
        # -- a cron string
        elif isinstance(run_at, str):
            run_at = self.Schedule(run_at).cron
        # -- a JobSchedule
        elif isinstance(run_at, self.Schedule):
            run_at = run_at.cron
        # -- nothing, may we have it on the job's config already
        if not run_at:
            interval = config.get('run-at')
        else:
            # store the new schedule
            config['run-at'] = run_at
            interval = run_at
        if not interval:
            # if we don't have a run-spec, return without scheduling
            raise ValueError('no run-at specification found, cannot schedule')
        # get last time the job was run
        if last_run is None:
            job_runs = attrs.get('job_runs')
            if job_runs:
                last_run = job_runs[-1]['ts']
            else:
                # NOTE(review): naive utcnow; croniter compares against the
                # (naive) ts values recorded by run_notebook
                last_run = datetime.datetime.utcnow()
        # calculate next run time
        iter_next = croniter(interval, last_run)
        run_at = iter_next.get_next(datetime.datetime)
        # store next scheduled run
        triggers = attrs['triggers'] = attrs.get('triggers', [])
        # set up a schedule event
        scheduled_run = {
            'event-kind': 'scheduled',
            'event': run_at.isoformat(),
            'run-at': run_at,
            'status': 'PENDING'
        }
        # remove all pending triggers, add new triggers
        # -- mutates the list in place so attrs['triggers'] stays bound
        past_triggers = [cur for cur in triggers if cur.get('status') != 'PENDING']
        triggers.clear()
        triggers.extend(past_triggers)
        triggers.append(scheduled_run)
        attrs['config'] = config
        return meta.save()
499
500 def get_schedule(self, name, only_pending=False):
501 """
502 return the cron schedule and corresponding triggers
503
504 Args:
505 name (str): the name of the job
506
507 Returns:
508 tuple of (run_at, triggers)
509
510 run_at (str): the cron spec, None if not scheduled
511 triggers (list): the list of triggers
512 """
513 meta = self.metadata(name)
514 attrs = meta.attributes
515 config = attrs.get('config')
516 triggers = attrs.get('triggers', [])
517 if only_pending:
518 triggers = [trigger for trigger in triggers
519 if trigger['status'] == 'PENDING']
520 if config and 'run-at' in config:
521 run_at = config.get('run-at')
522 else:
523 run_at = None
524 return run_at, triggers
525
526 def drop_schedule(self, name):
527 """
528 Drop an existing schedule, if any
529
530 This will drop any existing schedule and any pending triggers of
531 event-kind 'scheduled'.
532
533 Args:
534 name (str): the name of the job
535
536 Returns:
537 Metadata
538 """
539 meta = self.metadata(name)
540 attrs = meta.attributes
541 config = attrs.get('config')
542 triggers = attrs.get('triggers')
543 if 'run-at' in config:
544 del config['run-at']
545 for trigger in triggers:
546 if trigger['event-kind'] == 'scheduled' and trigger['status'] == 'PENDING':
547 trigger['status'] = 'CANCELLED'
548 return meta.save()
549
550 def export(self, name, localpath='memory', format='html'):
551 """
552 Export a job or result file to HTML
553
554 The job is exported in the given format.
555
556 :param name: the name of the job, as in jobs.get
557 :param localpath: the path of the local file to write. If you
558 specify an empty path or 'memory' a tuple of (body, resource)
559 is returned instead
560 :param format: the output format. currently only :code:`'html'` is supported
561 :return: the (data, resources) tuple as returned by nbconvert. For
562 format html data is the HTML's body, for PDF it is the pdf file contents
563 """
564 from traitlets.config import Config
565 from nbconvert import SlidesExporter
566 from nbconvert.exporters.html import HTMLExporter
567 from nbconvert.exporters.pdf import PDFExporter
568
569 # https://nbconvert.readthedocs.io/en/latest/nbconvert_library.html
570 # (exporter class, filemode, config-values
571 EXPORTERS = {
572 'html': (HTMLExporter, '', {}),
573 'htmlbody': (HTMLExporter, '', {}),
574 'pdf': (PDFExporter, 'b', {}),
575 'slides': (SlidesExporter, '', {'RevealHelpPreprocessor.url_prefix':
576 'https://cdnjs.cloudflare.com/ajax/libs/reveal.js/3.6.0/'}),
577 }
578 # get exporter according to format
579 if format not in EXPORTERS:
580 raise ValueError('format {} is invalid. Choose one of {}'.format(format, EXPORTERS.keys()))
581 exporter_cls, fmode, configkw = EXPORTERS[format]
582 # prepare config
583 # http://nbconvert.readthedocs.io/en/latest/nbconvert_library.html#Using-different-preprocessors
584 c = Config()
585 for k, v in configkw.items():
586 context, key = k.split('.')
587 setattr(c[context], key, v)
588 # get configured exporter
589 exporter = exporter_cls(config=c)
590 # get notebook, convert and store in file if requested
591 notebook = self.get(name)
592 (data, resources) = exporter.from_notebook_node(notebook)
593 if localpath and localpath != 'memory':
594 with open(localpath, 'w' + fmode) as fout:
595 fout.write(data)
596 return data, resources
597
    def help(self, name_or_obj=None, kind=None, raw=False):
        """ get help for a notebook

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is in not interactive mode
        """
        # delegate to the store's generic help implementation
        return super().help(name_or_obj, kind=kind, raw=raw)
613
614 def to_archive(self, name, path, **kwargs):
615 if not name.endswith('.ipynb'):
616 name += '.ipynb'
617 return super().to_archive(name, path, **kwargs)
618
619 def from_archive(self, path, name, **kwargs):
620 if not name.endswith('.ipynb'):
621 name += '.ipynb'
622 return super().from_archive(path, name, **kwargs)
623
624 def promote(self, name, other, **kwargs):
625 nb = self.get(name)
626 return other.put(nb, name)
627
    def summary(self, *args, **kwargs):
        """ return a summary of the notebook entry, see OmegaStore.summary """
        return super().summary(*args, **kwargs)
630
631
# sphinx docs compatibility
class OmegaJobs(NotebookMixin, NotebookBackend):
    """ combined om.jobs API, see NotebookMixin and NotebookBackend """
    # fixed: removed a stray ``[docs]`` sphinx-viewcode artifact line that
    # preceded the class and raised NameError on import
    pass