Source code for omegaml.notebook.jobs

  1from __future__ import absolute_import
  2
  3import datetime
  4import re
  5from io import StringIO, BytesIO
  6from uuid import uuid4
  7
  8import gridfs
  9import yaml
 10from croniter import croniter
 11from jupyter_client import AsyncKernelManager
 12from nbformat import read as nbread, write as nbwrite, v4 as nbv4, NotebookNode
 13
 14from omegaml.backends.basedata import BaseDataBackend
 15from omegaml.documents import MDREGISTRY
 16from omegaml.notebook.jobschedule import JobSchedule
 17
 18
 19class NotebookBackend(BaseDataBackend):
 20    """
 21    Omega Jobs API
 22    """
 23
 24    # TODO this class should be a proper backend class with a mixin for ipynb
 25
 26    KIND = MDREGISTRY.OMEGAML_JOBS
 27
 28    @classmethod
 29    def supports(cls, obj, name, data_store=None, **kwargs):
 30        return data_store.prefix == 'jobs/' and (isinstance(obj, (str, NotebookNode)) or name.endswith('.ipynb'))
 31
 32    @property
 33    def store(self):
 34        return self.data_store
 35
 36    @property
 37    def _db(self):
 38        return self.store.mongodb
 39
 40    @property
 41    def _fs(self):
 42        return self.store.fs
 43
 44    def put(self, obj, name, attributes=None, **kwargs):
 45        """ store a notebook job
 46
 47        Args:
 48            obj (str|NotebookNode): the notebook object or the notebook's code as a string
 49            name (str): the name of the job
 50            attributes (dict): the attributes to store with the job
 51
 52        Returns:
 53            Metadata
 54        """
 55        if not name.endswith('.ipynb'):
 56            name += '.ipynb'
 57        if isinstance(obj, str):
 58            return self.store.create(obj, name)
 59        sbuf = StringIO()
 60        bbuf = BytesIO()
 61        # nbwrite expects string, fs.put expects bytes
 62        nbwrite(obj, sbuf, version=4)
 63        sbuf.seek(0)
 64        bbuf.write(sbuf.getvalue().encode('utf8'))
 65        bbuf.seek(0)
 66        # see if we have a file already, if so replace the gridfile
 67        meta = self.store.metadata(name)
 68        if not meta:
 69            filename = uuid4().hex
 70            fileid = self._store_to_file(self.store, bbuf, filename)
 71            meta = self.store._make_metadata(name=name,
 72                                             prefix=self.store.prefix,
 73                                             bucket=self.store.bucket,
 74                                             kind=self.KIND,
 75                                             attributes=attributes,
 76                                             gridfile=fileid)
 77            meta = meta.save()
 78        else:
 79            filename = uuid4().hex
 80            meta.gridfile = self._store_to_file(self.store, bbuf, filename)
 81            meta = meta.save()
 82        # set config
 83        nb_config = self.store.get_notebook_config(name)
 84        meta_config = meta.attributes.get('config', {})
 85        if nb_config:
 86            meta_config.update(dict(**nb_config))
 87            meta.attributes['config'] = meta_config
 88        meta = meta.save()
 89        if not name.startswith('results') and ('run-at' in meta_config):
 90            meta = self.store.schedule(name)
 91        return meta
 92
 93    def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
 94        """
 95        Retrieve a notebook and return a NotebookNode
 96        """
 97        if not name.endswith('.ipynb'):
 98            name += '.ipynb'
 99        meta = self.store.metadata(name)
100        if meta:
101            try:
102                outf = meta.gridfile
103            except gridfs.errors.NoFile as e:
104                raise e
105            # nbwrite wants a string, outf is bytes
106            sbuf = StringIO()
107            data = outf.read()
108            if data is None:
109                msg = 'Expected content in {name}, got None'.format(**locals())
110                raise ValueError(msg)
111            sbuf.write(data.decode('utf8'))
112            sbuf.seek(0)
113            nb = nbread(sbuf, as_version=4)
114            return nb
115        else:
116            raise gridfs.errors.NoFile(
117                ">{0}< does not exist in jobs bucket '{1}'".format(
118                    name, self.store.bucket))
119
120
class NotebookMixin:
    """ om.jobs storage methods

    Mixin to OmegaStore to provide om.jobs-specific methods

    Usage:
        # create a new notebook from code
        code = \"""
        print('hello')
        \"""
        om.jobs.create(code, 'name')

        # schedule a notebook
        om.jobs.schedule('name', run_at='<cron spec>')

        # run a notebook immediately
        om.jobs.run('name')
    """
    # keywords that mark the notebook's config cell (as '# <keyword>' comments)
    _nb_config_magic = 'omega-ml', 'schedule', 'run-at', 'cron'
    # placeholder entry name used to represent (empty) directories
    _dir_placeholder = '_placeholder.ipynb'

    def _init_mixin(self, *args, **kwargs):
        self._include_dir_placeholder = True
        # convenience so you can do om.jobs.schedule(..., run_at=om.jobs.Schedule(....))
        self.Schedule = JobSchedule

    @classmethod
    def supports(cls, store, *args, **kwargs):
        # this mixin only applies to the jobs/ store prefix
        return store.prefix == 'jobs/'

    def __repr__(self):
        return 'OmegaJobs(store={})'.format(super().__repr__())

    def collection(self, name):
        # normalize the name to always carry the .ipynb extension
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        return super().collection(name)

    def metadata(self, name, **kwargs):
        """ retrieve metadata of a notebook

        Args:
            name (str): the name of the notebook

        Returns:
            Metadata, or None if the notebook does not exist
        """
        meta = super().metadata(name, **kwargs)
        if meta is None and not name.endswith('.ipynb'):
            # retry with the canonical extension
            name += '.ipynb'
            meta = super().metadata(name)
        return meta

    def exists(self, name):
        """ check if the notebook exists

        Args:
            name (str): the name of the notebook

        Returns:
            bool, True if the notebook exists (with or without .ipynb extension)
        """
        return len(super().list(name)) + len(super().list(name + '.ipynb')) > 0

    def create(self, code, name, **kwargs):
        """
        create a notebook from code

        :param code: the code as a string
        :param name: the name of the job to create
        :return: the metadata object created
        """
        # wrap the code in a single-code-cell v4 notebook
        notebook = nbv4.new_notebook(cells=[nbv4.new_code_cell(source=code)])
        # put the notebook
        meta = self.put(notebook, name, **kwargs)
        return meta

    def get_fs(self, collection=None):
        # legacy support
        return self._fs

    def get_collection(self, collection):
        """
        returns the collection object
        """
        # FIXME this should use store.collection
        return getattr(self.mongodb, collection)

    def list(self, pattern=None, regexp=None, raw=False, **kwargs):
        """
        list all jobs matching filter.
        filter is a regex on the name of the ipynb entry.
        The default is all, i.e. `.*`
        """
        job_list = super().list(pattern=pattern, regexp=regexp, raw=raw, **kwargs)
        # raw entries are Metadata objects, otherwise plain names
        name = lambda v: v.name if raw else v
        if not self._include_dir_placeholder:
            # hide the synthetic directory placeholder entries
            job_list = [v for v in job_list if not name(v).endswith(self._dir_placeholder)]
        return job_list

    def get_notebook_config(self, nb_filename):
        """
        returns the omegaml script config on the notebook's first cell

        The config cell is in any of the following formats:

        *Option 1*::

            # omega-ml:
            #    run-at: "<cron schedule>"

        *Option 2*::

            # omega-ml:
            #   schedule: "weekday(s), hour[, month]"

        *Option 3*::

            # omega-ml:
            #   cron: "<cron schedule>"

        You may optionally specify only ``run-at``, ``schedule`` or ``cron``,
        i.e. without the ``omega-ml`` header::

            # run-at: "<cron schedule>"
            # cron: "<cron schedule>"
            # schedule: "weekday(s), hour[, month]"

        Args:
            nb_filename (str): the name of the notebook

        Returns:
            config(dict) => ``{ 'run-at': <specifier> }``

        Raises:
            ValueError, if there is no config cell or the config cell is invalid

        See Also:
            * JobSchedule
        """
        notebook = self.get(nb_filename)
        config_cell = None
        config_magic = ['# {}'.format(kw) for kw in self._nb_config_magic]
        # the last cell starting with a config keyword wins
        for cell in notebook.get('cells'):
            if any(cell.source.startswith(kw) for kw in config_magic):
                config_cell = cell
        if not config_cell:
            return {}
        # strip the leading '#' of each line so the cell parses as yaml
        yaml_conf = '\n'.join(
            [re.sub('#', '', x, count=1) for x in str(
                config_cell.source).splitlines()])
        try:
            yaml_conf = yaml.safe_load(yaml_conf)
            # accept both the 'omega-ml:' header form and the bare form
            config = yaml_conf.get(self._nb_config_magic[0], yaml_conf)
        except Exception:
            raise ValueError(
                'Notebook configuration cannot be parsed')
        # translate config to canonical form
        # TODO refactor to separate method / mapped translation functions
        if 'schedule' in config:
            config['run-at'] = JobSchedule(text=config.get('schedule', '')).cron
        if 'cron' in config:
            config['run-at'] = JobSchedule.from_cron(config.get('cron')).cron
        if 'run-at' in config:
            config['run-at'] = JobSchedule.from_cron(config.get('run-at')).cron
        return config

    def run(self, name, event=None, timeout=None):
        """
        Run a job immediately

        The job is run and the results are stored in om.jobs('results/name <timestamp>') and
        the result's Metadata is returned.

        ``Metadata.attributes`` of the original job as given by name is updated:

        * ``attributes['job_runs']`` (list) - list of status of each run. Status is
             a dict as below
        * ``attributes['job_results']`` (list) - list of results job names in same
             index-order as job_runs
        * ``attributes['trigger']`` (list) - list of triggers

        The status of each job run is a dict with keys:

        * ``status`` (str): the status of the job run, OK or ERROR
        * ``ts`` (datetime): time of execution
        * ``message`` (str): error message in case of ERROR, else blank
        * ``results`` (str): name of results in case of OK, else blank

        Usage::

            # directly (sync)
            meta = om.jobs.run('mynb')

            # via runtime (async)
            job = om.runtime.job('mynb')
            result = job.run()

        Args:
            name (str): the name of the jobfile
            event (str): an event name
            timeout (int): timeout in seconds, None means no timeout

        Returns:
             Metadata of the results entry

        See Also:
            * OmegaJobs.run_notebook
        """
        return self.run_notebook(name, event=event, timeout=timeout)

    def run_notebook(self, name, event=None, timeout=None):
        """ run a given notebook immediately.

        Args:
            name (str): the name of the jobfile
            event (str): an event name
            timeout (int): timeout in seconds

        Returns:
            Metadata of results

        Notes:
            The notebook is run using nbconvert.

            The job's metadata is updated to reflect all job runs and their results
            as follows::

                {
                    'job_runs': [{
                        'status': '<execution status>',
                        'ts': <datetime>,
                        'message': '<blank | exception>,
                        'results': 'results/<name>_<datetime>.ipynb',
                    } ...],
                    'job_results': [
                        'results/<name>_<datetime>.ipynb',
                    ]
                }

            `job_runs` is a list representing result status of each time the job was
            executed by the runtime, either by `om.runtime.job('<name>').run()`, or
            as a scheduled execution. In case of an exception, the message is truncated
            to contain an excerpt of str(exception).

            `job_results` is a list of members in `om.jobs` representing the result
            each job execution. Each result is equivalent of an interactive execution
            in jupyter.

        See Also:
            * nbconvert https://nbconvert.readthedocs.io/en/latest/execute_api.html

        .. versionchanged:: 0.18
            `job_runs[].results` lists the name of the respective `job_results` item
            regardless of status. In prior versions `job_runs[].results` was null for
            failed jobs.

        .. versionchanged:: 0.18
            `job_runs[].message` is truncated to contain the first and last 80
            characters of an exception. The actual exception is preserved in the
            job_results notebook, stored as a separate object.
        """
        from nbconvert.preprocessors import ClearOutputPreprocessor
        from nbconvert.preprocessors.execute import ExecutePreprocessor

        notebook = self.get(name)
        meta_job = self.metadata(name)
        assert meta_job is not None, f"Cannot run non-existent jobs {name}"
        ts = datetime.datetime.now()
        # execute kwargs
        # -- see ExecuteProcessor class
        # -- see https://nbconvert.readthedocs.io/en/latest/execute_api.html
        ep_kwargs = {
            # avoid timeouts to stop kernel
            'timeout': timeout,
            # avoid kernel at exit functions
            # -- this stops ipykernel AttributeError 'send_multipart'
            'shutdown_kernel': 'immediate',
            # set kernel name, blank is default
            # -- e.g. python3, ir
            # -- see https://stackoverflow.com/a/47053020/890242
            'kernel_name': '',
            # always use the async kernel manager
            # -- see https://github.com/jupyter/nbconvert/issues/1964
            'kernel_manager_class': AsyncKernelManager,
        }
        # overrides from metadata
        ep_kwargs.update(meta_job.kind_meta.get('ep_kwargs', {}))
        # initialize before the try block so the finally clause never sees
        # unbound names, e.g. when ExecutePreprocessor(**ep_kwargs) raises
        # before ep is assigned, or on a non-Exception raise
        ep = None
        status, message = 'OK', ''
        try:
            resources = {
                'metadata': {
                    'path': self.defaults.OMEGA_TMP,
                }
            }
            if not meta_job.kind_meta.get('keep_output', False):
                # https://nbconvert.readthedocs.io/en/latest/api/preprocessors.html
                cp = ClearOutputPreprocessor()
                cp.preprocess(notebook, resources)
            ep = ExecutePreprocessor(**ep_kwargs)
            ep.preprocess(notebook, resources)
        except Exception as e:
            status = 'ERROR'
            message = str(e)
        finally:
            # limit size of metadata.attributes #496
            # -- only truncate long messages; short ones (incl. blank for OK) stay as-is
            if len(message) > 163:
                message = f'{message[0:80]}...{message[-80:]}'
            # drop the preprocessor (and its kernel manager) reference
            del ep
        # record results
        meta_results = self.put(notebook,
                                'results/{name}_{ts}'.format(**locals()))
        meta_results.attributes['source_job'] = name
        meta_results.save()
        job_results = meta_job.attributes.get('job_results', [])
        job_results.append(meta_results.name)
        meta_job.attributes['job_results'] = job_results
        # record final job status
        job_runs = meta_job.attributes.get('job_runs', [])
        runstate = {
            'status': status,
            'ts': ts,
            'message': message,
            'results': meta_results.name
        }
        job_runs.append(runstate)
        meta_job.attributes['job_runs'] = job_runs
        # set event run state if event was specified
        if event:
            attrs = meta_job.attributes
            triggers = attrs['triggers'] = attrs.get('triggers', [])
            scheduled = (trigger for trigger in triggers
                         if trigger['event-kind'] == 'scheduled')
            for trigger in scheduled:
                if event == trigger['event']:
                    trigger['status'] = status
                    trigger['ts'] = ts
        meta_job.save()
        return meta_results

    def schedule(self, nb_file, run_at=None, last_run=None):
        """
        Schedule a processing of a notebook as per the interval
        specified on the job script

        Notes:
            This updates the notebook's Metadata entry by adding the
            next scheduled run in ``attributes['triggers']```

        Args:
            nb_file (str): the name of the notebook
            run_at (str|dict|JobSchedule): the schedule specified in a format
               suitable for JobSchedule. If not specified, this value is
               extracted from the first cell of the notebook
            last_run (datetime): the last time this job was run, use this to
               reschedule the job for the next run. Defaults to the last
               timestamp listed in ``attributes['job_runs']``, or datetime.utcnow()
               if no previous run exists.

        See Also:
            * croniter.get_next()
            * JobSchedule
            * OmegaJobs.get_notebook_config
            * cron expression - https://en.wikipedia.org/wiki/Cron#CRON_expression
        """
        meta = self.metadata(nb_file)
        attrs = meta.attributes
        # get/set run-at spec
        config = attrs.get('config', {})
        # see what we have as a schedule
        # -- a dictionary of JobSchedule
        if isinstance(run_at, dict):
            run_at = self.Schedule(**run_at).cron
        # -- a cron string
        elif isinstance(run_at, str):
            run_at = self.Schedule(run_at).cron
        # -- a JobSchedule
        elif isinstance(run_at, self.Schedule):
            run_at = run_at.cron
        # -- nothing, may we have it on the job's config already
        if not run_at:
            interval = config.get('run-at')
        else:
            # store the new schedule
            config['run-at'] = run_at
            interval = run_at
        if not interval:
            # if we don't have a run-spec, return without scheduling
            raise ValueError('no run-at specification found, cannot schedule')
        # get last time the job was run
        if last_run is None:
            job_runs = attrs.get('job_runs')
            if job_runs:
                last_run = job_runs[-1]['ts']
            else:
                last_run = datetime.datetime.utcnow()
        # calculate next run time
        iter_next = croniter(interval, last_run)
        run_at = iter_next.get_next(datetime.datetime)
        # store next scheduled run
        triggers = attrs['triggers'] = attrs.get('triggers', [])
        # set up a schedule event
        scheduled_run = {
            'event-kind': 'scheduled',
            'event': run_at.isoformat(),
            'run-at': run_at,
            'status': 'PENDING'
        }
        # remove all pending triggers, add new triggers
        past_triggers = [cur for cur in triggers if cur.get('status') != 'PENDING']
        triggers.clear()
        triggers.extend(past_triggers)
        triggers.append(scheduled_run)
        attrs['config'] = config
        return meta.save()

    def get_schedule(self, name, only_pending=False):
        """
        return the cron schedule and corresponding triggers

        Args:
            name (str): the name of the job

        Returns:
            tuple of (run_at, triggers)

            run_at (str): the cron spec, None if not scheduled
            triggers (list): the list of triggers
        """
        meta = self.metadata(name)
        attrs = meta.attributes
        config = attrs.get('config')
        triggers = attrs.get('triggers', [])
        if only_pending:
            triggers = [trigger for trigger in triggers
                        if trigger['status'] == 'PENDING']
        if config and 'run-at' in config:
            run_at = config.get('run-at')
        else:
            run_at = None
        return run_at, triggers

    def drop_schedule(self, name):
        """
        Drop an existing schedule, if any

        This will drop any existing schedule and any pending triggers of
        event-kind 'scheduled'.

        Args:
            name (str): the name of the job

        Returns:
            Metadata
        """
        meta = self.metadata(name)
        attrs = meta.attributes
        # guard against jobs that were never configured or scheduled,
        # attrs.get() would return None for missing keys
        config = attrs.get('config') or {}
        triggers = attrs.get('triggers') or []
        if 'run-at' in config:
            del config['run-at']
        for trigger in triggers:
            if trigger['event-kind'] == 'scheduled' and trigger['status'] == 'PENDING':
                trigger['status'] = 'CANCELLED'
        return meta.save()

    def export(self, name, localpath='memory', format='html'):
        """
        Export a job or result file to HTML

        The job is exported in the given format.

        :param name: the name of the job, as in jobs.get
        :param localpath: the path of the local file to write. If you
           specify an empty path or 'memory' a tuple of (body, resource)
           is returned instead
        :param format: the output format. currently only :code:`'html'` is supported
        :return: the (data, resources) tuple as returned by nbconvert. For
           format html data is the HTML's body, for PDF it is the pdf file contents
        """
        from traitlets.config import Config
        from nbconvert import SlidesExporter
        from nbconvert.exporters.html import HTMLExporter
        from nbconvert.exporters.pdf import PDFExporter

        # https://nbconvert.readthedocs.io/en/latest/nbconvert_library.html
        # (exporter class, filemode, config-values)
        EXPORTERS = {
            'html': (HTMLExporter, '', {}),
            'htmlbody': (HTMLExporter, '', {}),
            'pdf': (PDFExporter, 'b', {}),
            'slides': (SlidesExporter, '', {'RevealHelpPreprocessor.url_prefix':
                                                'https://cdnjs.cloudflare.com/ajax/libs/reveal.js/3.6.0/'}),
        }
        # get exporter according to format
        if format not in EXPORTERS:
            raise ValueError('format {} is invalid. Choose one of {}'.format(format, EXPORTERS.keys()))
        exporter_cls, fmode, configkw = EXPORTERS[format]
        # prepare config
        # http://nbconvert.readthedocs.io/en/latest/nbconvert_library.html#Using-different-preprocessors
        c = Config()
        for k, v in configkw.items():
            context, key = k.split('.')
            setattr(c[context], key, v)
        # get configured exporter
        exporter = exporter_cls(config=c)
        # get notebook, convert and store in file if requested
        notebook = self.get(name)
        (data, resources) = exporter.from_notebook_node(notebook)
        if localpath and localpath != 'memory':
            # fmode is 'b' for binary formats (pdf), '' for text formats
            with open(localpath, 'w' + fmode) as fout:
                fout.write(data)
        return data, resources

    def help(self, name_or_obj=None, kind=None, raw=False):
        """ get help for a notebook

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is in not interactive mode
        """
        return super().help(name_or_obj, kind=kind, raw=raw)

    def to_archive(self, name, path, **kwargs):
        # normalize to the canonical .ipynb name before archiving
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        return super().to_archive(name, path, **kwargs)

    def from_archive(self, path, name, **kwargs):
        # normalize to the canonical .ipynb name before restoring
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        return super().from_archive(path, name, **kwargs)

    def promote(self, name, other, **kwargs):
        # copy the notebook to another store (e.g. another bucket)
        nb = self.get(name)
        return other.put(nb, name)

    def summary(self, *args, **kwargs):
        return super().summary(*args, **kwargs)
670
671
# sphinx docs compatibility
class OmegaJobs(NotebookMixin, NotebookBackend):
    # combined class kept so existing imports and Sphinx references resolve
    pass