1from __future__ import absolute_import
2
3import datetime
4import gridfs
5import re
6import yaml
7from croniter import croniter
8from io import StringIO, BytesIO
9from jupyter_client import AsyncKernelManager
10from nbconvert.preprocessors import ClearOutputPreprocessor
11from nbconvert.preprocessors.execute import ExecutePreprocessor
12from nbformat import read as nbread, write as nbwrite, v4 as nbv4
13from uuid import uuid4
14
15from omegaml.backends.basecommon import BackendBaseCommon
16from omegaml.documents import MDREGISTRY
17from omegaml.notebook.jobschedule import JobSchedule
18from omegaml.store import OmegaStore
19from omegaml.util import settings as omega_settings
20
21
class OmegaJobs(BackendBaseCommon):
    """
    Omega Jobs API
    """

    # TODO this class should be a proper backend class with a mixin for ipynb

    # keywords that mark a notebook's omega-ml config cell
    _nb_config_magic = 'omega-ml', 'schedule', 'run-at', 'cron'
    # name of the placeholder notebook used to represent directories
    _dir_placeholder = '_placeholder.ipynb'

    def __init__(self, bucket=None, prefix=None, store=None, defaults=None):
        # resolve settings, prefix and store, preferring explicit arguments
        self.defaults = defaults or omega_settings()
        self.prefix = prefix or (store.prefix if store else None) or 'jobs'
        self.store = store or OmegaStore(prefix=prefix, bucket=bucket, defaults=defaults)
        self.kind = MDREGISTRY.OMEGAML_JOBS
        self._include_dir_placeholder = True
        # convenience so you can do om.jobs.schedule(..., run_at=om.jobs.Schedule(....))
        self.Schedule = JobSchedule

    def __repr__(self):
        return 'OmegaJobs(store={})'.format(self.store.__repr__())
43
44 @property
45 def _db(self):
46 return self.store.mongodb
47
48 @property
49 def _fs(self):
50 return self.store.fs
51
52 def collection(self, name):
53 if not name.endswith('.ipynb'):
54 name += '.ipynb'
55 return self.store.collection(name)
56
[docs]
57 def drop(self, name, force=False):
58 """ remove the notebook
59
60 Args:
61 name (str): the name of the notebook
62 force (bool): if True does not raise
63
64 Returns:
65 True if object was deleted, False if not.
66 If force is True and the object does not exist it will still return True
67
68 Raises:
69 DoesNotExist if the notebook does not exist and ```force=False```
70 """
71 meta = self.metadata(name)
72 name = meta.name if meta is not None else name
73 return self.store.drop(name, force=force)
74
89
[docs]
90 def exists(self, name):
91 """ check if the notebook exists
92
93 Args:
94 name (str): the name of the notebook
95
96 Returns:
97 Metadata
98 """
99 return len(self.store.list(name)) + len(self.store.list(name + '.ipynb')) > 0
100
[docs]
101 def put(self, obj, name, attributes=None):
102 """ store a notebook job
103
104 Args:
105 obj (str|NotebookNode): the notebook object or the notebook's code as a string
106 name (str): the name of the job
107 attributes (dict): the attributes to store with the job
108
109 Returns:
110 Metadata
111 """
112 if not name.endswith('.ipynb'):
113 name += '.ipynb'
114 if isinstance(obj, str):
115 return self.create(obj, name)
116 sbuf = StringIO()
117 bbuf = BytesIO()
118 # nbwrite expects string, fs.put expects bytes
119 nbwrite(obj, sbuf, version=4)
120 sbuf.seek(0)
121 bbuf.write(sbuf.getvalue().encode('utf8'))
122 bbuf.seek(0)
123 # see if we have a file already, if so replace the gridfile
124 meta = self.store.metadata(name)
125 if not meta:
126 filename = uuid4().hex
127 fileid = self._store_to_file(self.store, bbuf, filename)
128 meta = self.store._make_metadata(name=name,
129 prefix=self.store.prefix,
130 bucket=self.store.bucket,
131 kind=self.kind,
132 attributes=attributes,
133 gridfile=fileid)
134 meta = meta.save()
135 else:
136 filename = uuid4().hex
137 meta.gridfile = self._store_to_file(self.store, bbuf, filename)
138 meta = meta.save()
139 # set config
140 nb_config = self.get_notebook_config(name)
141 meta_config = meta.attributes.get('config', {})
142 if nb_config:
143 meta_config.update(dict(**nb_config))
144 meta.attributes['config'] = meta_config
145 meta = meta.save()
146 if not name.startswith('results') and ('run-at' in meta_config):
147 meta = self.schedule(name)
148 return meta
149
[docs]
150 def get(self, name):
151 """
152 Retrieve a notebook and return a NotebookNode
153 """
154 if not name.endswith('.ipynb'):
155 name += '.ipynb'
156 meta = self.store.metadata(name)
157 if meta:
158 try:
159 outf = meta.gridfile
160 except gridfs.errors.NoFile as e:
161 raise e
162 # nbwrite wants a string, outf is bytes
163 sbuf = StringIO()
164 data = outf.read()
165 if data is None:
166 msg = 'Expected content in {name}, got None'.format(**locals())
167 raise ValueError(msg)
168 sbuf.write(data.decode('utf8'))
169 sbuf.seek(0)
170 nb = nbread(sbuf, as_version=4)
171 return nb
172 else:
173 raise gridfs.errors.NoFile(
174 ">{0}< does not exist in jobs bucket '{1}'".format(
175 name, self.store.bucket))
176
[docs]
177 def create(self, code, name):
178 """
179 create a notebook from code
180
181 :param code: the code as a string
182 :param name: the name of the job to create
183 :return: the metadata object created
184 """
185 cells = []
186 cells.append(nbv4.new_code_cell(source=code))
187 notebook = nbv4.new_notebook(cells=cells)
188 # put the notebook
189 meta = self.put(notebook, name)
190 return meta
191
192 def get_fs(self, collection=None):
193 # legacy support
194 return self._fs
195
[docs]
196 def get_collection(self, collection):
197 """
198 returns the collection object
199 """
200 # FIXME this should use store.collection
201 return getattr(self.store.mongodb, collection)
202
[docs]
203 def list(self, pattern=None, regexp=None, raw=False, **kwargs):
204 """
205 list all jobs matching filter.
206 filter is a regex on the name of the ipynb entry.
207 The default is all, i.e. `.*`
208 """
209 job_list = self.store.list(pattern=pattern, regexp=regexp, raw=raw, **kwargs)
210 name = lambda v: v.name if raw else v
211 if not self._include_dir_placeholder:
212 job_list = [v for v in job_list if not name(v).endswith(self._dir_placeholder)]
213 return job_list
214
[docs]
215 def get_notebook_config(self, nb_filename):
216 """
217 returns the omegaml script config on the notebook's first cell
218
219 The config cell is in any of the following formats:
220
221 *Option 1*::
222
223 # omega-ml:
224 # run-at: "<cron schedule>"
225
226 *Option 2*::
227
228 # omega-ml:
229 # schedule: "weekday(s), hour[, month]"
230
231 *Option 3*::
232
233 # omega-ml:
234 # cron: "<cron schedule>"
235
236 You may optionally specify only ``run-at``, ``schedule`` or ``cron``,
237 i.e. without the ``omega-ml`` header::
238
239 # run-at: "<cron schedule>"
240 # cron: "<cron schedule>"
241 # schedule: "weekday(s), hour[, month]"
242
243 Args:
244 nb_filename (str): the name of the notebook
245
246 Returns:
247 config(dict) => ``{ 'run-at': <specifier> }``
248
249 Raises:
250 ValueError, if there is no config cell or the config cell is invalid
251
252 See Also:
253 * JobSchedule
254 """
255 notebook = self.get(nb_filename)
256 config_cell = None
257 config_magic = ['# {}'.format(kw) for kw in self._nb_config_magic]
258 for cell in notebook.get('cells'):
259 if any(cell.source.startswith(kw) for kw in config_magic):
260 config_cell = cell
261 if not config_cell:
262 return {}
263 yaml_conf = '\n'.join(
264 [re.sub('#', '', x, 1) for x in str(
265 config_cell.source).splitlines()])
266 try:
267 yaml_conf = yaml.safe_load(yaml_conf)
268 config = yaml_conf.get(self._nb_config_magic[0], yaml_conf)
269 except Exception:
270 raise ValueError(
271 'Notebook configuration cannot be parsed')
272 # translate config to canonical form
273 # TODO refactor to seperate method / mapped translation functions
274 if 'schedule' in config:
275 config['run-at'] = JobSchedule(text=config.get('schedule', '')).cron
276 if 'cron' in config:
277 config['run-at'] = JobSchedule.from_cron(config.get('cron')).cron
278 if 'run-at' in config:
279 config['run-at'] = JobSchedule.from_cron(config.get('run-at')).cron
280 return config
281
[docs]
282 def run(self, name, event=None, timeout=None):
283 """
284 Run a job immediately
285
286 The job is run and the results are stored in om.jobs('results/name <timestamp>') and
287 the result's Metadata is returned.
288
289 ``Metadata.attributes`` of the original job as given by name is updated:
290
291 * ``attributes['job_runs']`` (list) - list of status of each run. Status is
292 a dict as below
293 * ``attributes['job_results']`` (list) - list of results job names in same
294 index-order as job_runs
295 * ``attributes['trigger']`` (list) - list of triggers
296
297 The status of each job run is a dict with keys:
298
299 * ``status`` (str): the status of the job run, OK or ERROR
300 * ``ts`` (datetime): time of execution
301 * ``message`` (str): error mesasge in case of ERROR, else blank
302 * ``results`` (str): name of results in case of OK, else blank
303
304 Usage::
305
306 # directly (sync)
307 meta = om.jobs.run('mynb')
308
309 # via runtime (async)
310 job = om.runtime.job('mynb')
311 result = job.run()
312
313 Args:
314 name (str): the name of the jobfile
315 event (str): an event name
316 timeout (int): timeout in seconds, None means no timeout
317
318 Returns:
319 Metadata of the results entry
320
321 See Also:
322 * OmegaJobs.run_notebook
323 """
324 return self.run_notebook(name, event=event, timeout=timeout)
325
[docs]
326 def run_notebook(self, name, event=None, timeout=None):
327 """ run a given notebook immediately.
328
329 Args:
330 name (str): the name of the jobfile
331 event (str): an event name
332 timeout (int): timeout in seconds
333
334 Returns:
335 Metadata of results
336
337 See Also:
338 * nbconvert https://nbconvert.readthedocs.io/en/latest/execute_api.html
339 """
340 notebook = self.get(name)
341 meta_job = self.metadata(name)
342 ts = datetime.datetime.now()
343 # execute kwargs
344 # -- see ExecuteProcessor class
345 # -- see https://nbconvert.readthedocs.io/en/latest/execute_api.html
346 ep_kwargs = {
347 # avoid timeouts to stop kernel
348 'timeout': timeout,
349 # avoid kernel at exit functions
350 # -- this stops ipykernel AttributeError 'send_multipart'
351 'shutdown_kernel': 'immediate',
352 # set kernel name, blank is default
353 # -- e.g. python3, ir
354 # -- see https://stackoverflow.com/a/47053020/890242
355 'kernel_name': '',
356 # always use the async kernel manager
357 # -- see https://github.com/jupyter/nbconvert/issues/1964
358 'kernel_manager_class': AsyncKernelManager,
359 }
360 # overrides from metadata
361 ep_kwargs.update(meta_job.kind_meta.get('ep_kwargs', {}))
362 try:
363 resources = {
364 'metadata': {
365 'path': self.defaults.OMEGA_TMP,
366 }
367 }
368 if not meta_job.kind_meta.get('keep_output', False):
369 # https://nbconvert.readthedocs.io/en/latest/api/preprocessors.html
370 cp = ClearOutputPreprocessor()
371 cp.preprocess(notebook, resources)
372 ep = ExecutePreprocessor(**ep_kwargs)
373 ep.preprocess(notebook, resources)
374 except Exception as e:
375 status = 'ERROR'
376 message = str(e)
377 else:
378 status = 'OK'
379 message = ''
380 finally:
381 del ep
382 # record results
383 meta_results = self.put(notebook,
384 'results/{name}_{ts}'.format(**locals()))
385 meta_results.attributes['source_job'] = name
386 meta_results.save()
387 job_results = meta_job.attributes.get('job_results', [])
388 job_results.append(meta_results.name)
389 meta_job.attributes['job_results'] = job_results
390 # record final job status
391 job_runs = meta_job.attributes.get('job_runs', [])
392 runstate = {
393 'status': status,
394 'ts': ts,
395 'message': message,
396 'results': meta_results.name if status == 'OK' else None
397 }
398 job_runs.append(runstate)
399 meta_job.attributes['job_runs'] = job_runs
400 # set event run state if event was specified
401 if event:
402 attrs = meta_job.attributes
403 triggers = attrs['triggers'] = attrs.get('triggers', [])
404 scheduled = (trigger for trigger in triggers
405 if trigger['event-kind'] == 'scheduled')
406 for trigger in scheduled:
407 if event == trigger['event']:
408 trigger['status'] = status
409 trigger['ts'] = ts
410 meta_job.save()
411 return meta_results
412
[docs]
413 def schedule(self, nb_file, run_at=None, last_run=None):
414 """
415 Schedule a processing of a notebook as per the interval
416 specified on the job script
417
418 Notes:
419 This updates the notebook's Metadata entry by adding the
420 next scheduled run in ``attributes['triggers']```
421
422 Args:
423 nb_file (str): the name of the notebook
424 run_at (str|dict|JobSchedule): the schedule specified in a format
425 suitable for JobSchedule. If not specified, this value is
426 extracted from the first cell of the notebook
427 last_run (datetime): the last time this job was run, use this to
428 reschedule the job for the next run. Defaults to the last
429 timestamp listed in ``attributes['job_runs']``, or datetime.utcnow()
430 if no previous run exists.
431
432 See Also:
433 * croniter.get_next()
434 * JobSchedule
435 * OmegaJobs.get_notebook_config
436 * cron expression - https://en.wikipedia.org/wiki/Cron#CRON_expression
437 """
438 meta = self.metadata(nb_file)
439 attrs = meta.attributes
440 # get/set run-at spec
441 config = attrs.get('config', {})
442 # see what we have as a schedule
443 # -- a dictionary of JobSchedule
444 if isinstance(run_at, dict):
445 run_at = self.Schedule(**run_at).cron
446 # -- a cron string
447 elif isinstance(run_at, str):
448 run_at = self.Schedule(run_at).cron
449 # -- a JobSchedule
450 elif isinstance(run_at, self.Schedule):
451 run_at = run_at.cron
452 # -- nothing, may we have it on the job's config already
453 if not run_at:
454 interval = config.get('run-at')
455 else:
456 # store the new schedule
457 config['run-at'] = run_at
458 interval = run_at
459 if not interval:
460 # if we don't have a run-spec, return without scheduling
461 raise ValueError('no run-at specification found, cannot schedule')
462 # get last time the job was run
463 if last_run is None:
464 job_runs = attrs.get('job_runs')
465 if job_runs:
466 last_run = job_runs[-1]['ts']
467 else:
468 last_run = datetime.datetime.utcnow()
469 # calculate next run time
470 iter_next = croniter(interval, last_run)
471 run_at = iter_next.get_next(datetime.datetime)
472 # store next scheduled run
473 triggers = attrs['triggers'] = attrs.get('triggers', [])
474 # set up a schedule event
475 scheduled_run = {
476 'event-kind': 'scheduled',
477 'event': run_at.isoformat(),
478 'run-at': run_at,
479 'status': 'PENDING'
480 }
481 # remove all pending triggers, add new triggers
482 past_triggers = [cur for cur in triggers if cur.get('status') != 'PENDING']
483 triggers.clear()
484 triggers.extend(past_triggers)
485 triggers.append(scheduled_run)
486 attrs['config'] = config
487 return meta.save()
488
[docs]
489 def get_schedule(self, name, only_pending=False):
490 """
491 return the cron schedule and corresponding triggers
492
493 Args:
494 name (str): the name of the job
495
496 Returns:
497 tuple of (run_at, triggers)
498
499 run_at (str): the cron spec, None if not scheduled
500 triggers (list): the list of triggers
501 """
502 meta = self.metadata(name)
503 attrs = meta.attributes
504 config = attrs.get('config')
505 triggers = attrs.get('triggers', [])
506 if only_pending:
507 triggers = [trigger for trigger in triggers
508 if trigger['status'] == 'PENDING']
509 if config and 'run-at' in config:
510 run_at = config.get('run-at')
511 else:
512 run_at = None
513 return run_at, triggers
514
[docs]
515 def drop_schedule(self, name):
516 """
517 Drop an existing schedule, if any
518
519 This will drop any existing schedule and any pending triggers of
520 event-kind 'scheduled'.
521
522 Args:
523 name (str): the name of the job
524
525 Returns:
526 Metadata
527 """
528 meta = self.metadata(name)
529 attrs = meta.attributes
530 config = attrs.get('config')
531 triggers = attrs.get('triggers')
532 if 'run-at' in config:
533 del config['run-at']
534 for trigger in triggers:
535 if trigger['event-kind'] == 'scheduled' and trigger['status'] == 'PENDING':
536 trigger['status'] = 'CANCELLED'
537 return meta.save()
538
[docs]
539 def export(self, name, localpath='memory', format='html'):
540 """
541 Export a job or result file to HTML
542
543 The job is exported in the given format.
544
545 :param name: the name of the job, as in jobs.get
546 :param localpath: the path of the local file to write. If you
547 specify an empty path or 'memory' a tuple of (body, resource)
548 is returned instead
549 :param format: the output format. currently only :code:`'html'` is supported
550 :return: the (data, resources) tuple as returned by nbconvert. For
551 format html data is the HTML's body, for PDF it is the pdf file contents
552 """
553 from traitlets.config import Config
554 from nbconvert import SlidesExporter
555 from nbconvert.exporters.html import HTMLExporter
556 from nbconvert.exporters.pdf import PDFExporter
557
558 # https://nbconvert.readthedocs.io/en/latest/nbconvert_library.html
559 # (exporter class, filemode, config-values
560 EXPORTERS = {
561 'html': (HTMLExporter, '', {}),
562 'htmlbody': (HTMLExporter, '', {}),
563 'pdf': (PDFExporter, 'b', {}),
564 'slides': (SlidesExporter, '', {'RevealHelpPreprocessor.url_prefix':
565 'https://cdnjs.cloudflare.com/ajax/libs/reveal.js/3.6.0/'}),
566 }
567 # get exporter according to format
568 if format not in EXPORTERS:
569 raise ValueError('format {} is invalid. Choose one of {}'.format(format, EXPORTERS.keys()))
570 exporter_cls, fmode, configkw = EXPORTERS[format]
571 # prepare config
572 # http://nbconvert.readthedocs.io/en/latest/nbconvert_library.html#Using-different-preprocessors
573 c = Config()
574 for k, v in configkw.items():
575 context, key = k.split('.')
576 setattr(c[context], key, v)
577 # get configured exporter
578 exporter = exporter_cls(config=c)
579 # get notebook, convert and store in file if requested
580 notebook = self.get(name)
581 (data, resources) = exporter.from_notebook_node(notebook)
582 if localpath and localpath != 'memory':
583 with open(localpath, 'w' + fmode) as fout:
584 fout.write(data)
585 return data, resources
586
[docs]
587 def help(self, name_or_obj=None, kind=None, raw=False):
588 """ get help for a notebook
589
590 Args:
591 name_or_obj (str|obj): the name or actual object to get help for
592 kind (str): optional, if specified forces retrieval of backend for the given kind
593 raw (bool): optional, if True forces help to be the backend type of the object.
594 If False returns the attributes[docs] on the object's metadata, if available.
595 Defaults to False
596
597 Returns:
598 * help(obj) if python is in interactive mode
599 * text(str) if python is in not interactive mode
600 """
601 return self.store.help(name_or_obj, kind=kind, raw=raw)
602
603 def to_archive(self, name, path, **kwargs):
604 # TODO remove, pending #218
605 if not name.endswith('.ipynb'):
606 name += '.ipynb'
607 return self.store.to_archive(name, path, **kwargs)
608
609 def from_archive(self, path, name, **kwargs):
610 # TODO remove, pending #218
611 if not name.endswith('.ipynb'):
612 name += '.ipynb'
613 return self.store.from_archive(path, name, **kwargs)
614
615 def promote(self, name, other, **kwargs):
616 # TODO remove, pending #218
617 nb = self.get(name)
618 return other.put(nb, name)
619
620 def summary(self, *args, **kwargs):
621 return self.store.summary(*args, **kwargs)