Source code for omegaml.notebook.jobs

  1from __future__ import absolute_import
  2
  3import datetime
  4import re
  5from io import StringIO, BytesIO
  6from uuid import uuid4
  7
  8import gridfs
  9import yaml
 10from croniter import croniter
 11from jupyter_client import AsyncKernelManager
 12from nbconvert.preprocessors import ClearOutputPreprocessor
 13from nbconvert.preprocessors.execute import ExecutePreprocessor
 14from nbformat import read as nbread, write as nbwrite, v4 as nbv4, NotebookNode
 15
 16from omegaml.backends.basedata import BaseDataBackend
 17from omegaml.documents import MDREGISTRY
 18from omegaml.notebook.jobschedule import JobSchedule
 19
 20
 21class NotebookBackend(BaseDataBackend):
 22    """
 23    Omega Jobs API
 24    """
 25
 26    # TODO this class should be a proper backend class with a mixin for ipynb
 27
 28    KIND = MDREGISTRY.OMEGAML_JOBS
 29
 30    @classmethod
 31    def supports(cls, obj, name, data_store=None, **kwargs):
 32        return data_store.prefix == 'jobs/' and (isinstance(obj, (str, NotebookNode)) or name.endswith('.ipynb'))
 33
 34    @property
 35    def store(self):
 36        return self.data_store
 37
 38    @property
 39    def _db(self):
 40        return self.store.mongodb
 41
 42    @property
 43    def _fs(self):
 44        return self.store.fs
 45
 46    def put(self, obj, name, attributes=None, **kwargs):
 47        """ store a notebook job
 48
 49        Args:
 50            obj (str|NotebookNode): the notebook object or the notebook's code as a string
 51            name (str): the name of the job
 52            attributes (dict): the attributes to store with the job
 53
 54        Returns:
 55            Metadata
 56        """
 57        if not name.endswith('.ipynb'):
 58            name += '.ipynb'
 59        if isinstance(obj, str):
 60            return self.create(obj, name)
 61        sbuf = StringIO()
 62        bbuf = BytesIO()
 63        # nbwrite expects string, fs.put expects bytes
 64        nbwrite(obj, sbuf, version=4)
 65        sbuf.seek(0)
 66        bbuf.write(sbuf.getvalue().encode('utf8'))
 67        bbuf.seek(0)
 68        # see if we have a file already, if so replace the gridfile
 69        meta = self.store.metadata(name)
 70        if not meta:
 71            filename = uuid4().hex
 72            fileid = self._store_to_file(self.store, bbuf, filename)
 73            meta = self.store._make_metadata(name=name,
 74                                             prefix=self.store.prefix,
 75                                             bucket=self.store.bucket,
 76                                             kind=self.KIND,
 77                                             attributes=attributes,
 78                                             gridfile=fileid)
 79            meta = meta.save()
 80        else:
 81            filename = uuid4().hex
 82            meta.gridfile = self._store_to_file(self.store, bbuf, filename)
 83            meta = meta.save()
 84        # set config
 85        nb_config = self.store.get_notebook_config(name)
 86        meta_config = meta.attributes.get('config', {})
 87        if nb_config:
 88            meta_config.update(dict(**nb_config))
 89            meta.attributes['config'] = meta_config
 90        meta = meta.save()
 91        if not name.startswith('results') and ('run-at' in meta_config):
 92            meta = self.store.schedule(name)
 93        return meta
 94
 95    def get(self, name, version=-1, force_python=False, lazy=False, **kwargs):
 96        """
 97        Retrieve a notebook and return a NotebookNode
 98        """
 99        if not name.endswith('.ipynb'):
100            name += '.ipynb'
101        meta = self.store.metadata(name)
102        if meta:
103            try:
104                outf = meta.gridfile
105            except gridfs.errors.NoFile as e:
106                raise e
107            # nbwrite wants a string, outf is bytes
108            sbuf = StringIO()
109            data = outf.read()
110            if data is None:
111                msg = 'Expected content in {name}, got None'.format(**locals())
112                raise ValueError(msg)
113            sbuf.write(data.decode('utf8'))
114            sbuf.seek(0)
115            nb = nbread(sbuf, as_version=4)
116            return nb
117        else:
118            raise gridfs.errors.NoFile(
119                ">{0}< does not exist in jobs bucket '{1}'".format(
120                    name, self.store.bucket))
121
122
class NotebookMixin:
    """ om.jobs storage methods

    Mixin to OmegaStore to provide om.jobs-specific methods

    Usage:
        # create a new notebook from code
        code = \"""
        print('hello')
        \"""
        om.jobs.create(code, 'name')

        # schedule a notebook
        om.jobs.schedule('name', run_at='<cron spec>')

        # run a notebook immediately
        om.jobs.run('name')
    """
    # keywords that start a notebook's config cell (first match wins)
    _nb_config_magic = 'omega-ml', 'schedule', 'run-at', 'cron'
    # placeholder entries used to represent (otherwise empty) directories
    _dir_placeholder = '_placeholder.ipynb'

    def _init_mixin(self, *args, **kwargs):
        self._include_dir_placeholder = True
        # convenience so you can do om.jobs.schedule(..., run_at=om.jobs.Schedule(....))
        self.Schedule = JobSchedule

    @classmethod
    def supports(cls, store, *args, **kwargs):
        """ return True if this mixin applies to the given store """
        return store.prefix == 'jobs/'

    def __repr__(self):
        return 'OmegaJobs(store={})'.format(super().__repr__())

    def collection(self, name):
        # normalize the name to the canonical .ipynb form
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        return super().collection(name)

    def metadata(self, name, **kwargs):
        """ retrieve metadata of a notebook

        Args:
            name (str): the name of the notebook

        Returns:
            Metadata
        """
        meta = super().metadata(name, **kwargs)
        if meta is None and not name.endswith('.ipynb'):
            # retry with the canonical .ipynb extension
            name += '.ipynb'
            meta = super().metadata(name)
        return meta

    def exists(self, name):
        """ check if the notebook exists

        Args:
            name (str): the name of the notebook

        Returns:
            bool
        """
        # check both the name as given and with the .ipynb extension
        return len(super().list(name)) + len(super().list(name + '.ipynb')) > 0

    def create(self, code, name, **kwargs):
        """
        create a notebook from code

        :param code: the code as a string
        :param name: the name of the job to create
        :return: the metadata object created
        """
        cells = []
        cells.append(nbv4.new_code_cell(source=code))
        notebook = nbv4.new_notebook(cells=cells)
        # put the notebook
        meta = self.put(notebook, name, **kwargs)
        return meta

    def get_fs(self, collection=None):
        # legacy support
        return self._fs

    def get_collection(self, collection):
        """
        returns the collection object
        """
        # FIXME this should use store.collection
        return getattr(self.mongodb, collection)

    def list(self, pattern=None, regexp=None, raw=False, **kwargs):
        """
        list all jobs matching filter.
        filter is a regex on the name of the ipynb entry.
        The default is all, i.e. `.*`
        """
        job_list = super().list(pattern=pattern, regexp=regexp, raw=raw, **kwargs)
        name = lambda v: v.name if raw else v
        if not self._include_dir_placeholder:
            # hide directory placeholder entries from listings
            job_list = [v for v in job_list if not name(v).endswith(self._dir_placeholder)]
        return job_list

    def get_notebook_config(self, nb_filename):
        """
        returns the omegaml script config on the notebook's first cell

        The config cell is in any of the following formats:

        *Option 1*::

            # omega-ml:
            #    run-at: "<cron schedule>"

        *Option 2*::

            # omega-ml:
            #   schedule: "weekday(s), hour[, month]"

        *Option 3*::

            # omega-ml:
            #   cron: "<cron schedule>"

        You may optionally specify only ``run-at``, ``schedule`` or ``cron``,
        i.e. without the ``omega-ml`` header::

            # run-at: "<cron schedule>"
            # cron: "<cron schedule>"
            # schedule: "weekday(s), hour[, month]"

        Args:
            nb_filename (str): the name of the notebook

        Returns:
            config(dict) => ``{ 'run-at': <specifier> }``

        Raises:
            ValueError, if there is no config cell or the config cell is invalid

        See Also:
            * JobSchedule
        """
        notebook = self.get(nb_filename)
        config_cell = None
        config_magic = ['# {}'.format(kw) for kw in self._nb_config_magic]
        for cell in notebook.get('cells'):
            if any(cell.source.startswith(kw) for kw in config_magic):
                config_cell = cell
        if not config_cell:
            return {}
        # strip the leading '#' of each line so the cell parses as yaml
        yaml_conf = '\n'.join(
            [re.sub('#', '', x, count=1) for x in str(
                config_cell.source).splitlines()])
        try:
            yaml_conf = yaml.safe_load(yaml_conf)
            config = yaml_conf.get(self._nb_config_magic[0], yaml_conf)
        except Exception:
            raise ValueError(
                'Notebook configuration cannot be parsed')
        # translate config to canonical form
        # TODO refactor to separate method / mapped translation functions
        if 'schedule' in config:
            config['run-at'] = JobSchedule(text=config.get('schedule', '')).cron
        if 'cron' in config:
            config['run-at'] = JobSchedule.from_cron(config.get('cron')).cron
        if 'run-at' in config:
            config['run-at'] = JobSchedule.from_cron(config.get('run-at')).cron
        return config

    def run(self, name, event=None, timeout=None):
        """
        Run a job immediately

        The job is run and the results are stored in om.jobs('results/name <timestamp>') and
        the result's Metadata is returned.

        ``Metadata.attributes`` of the original job as given by name is updated:

        * ``attributes['job_runs']`` (list) - list of status of each run. Status is
             a dict as below
        * ``attributes['job_results']`` (list) - list of results job names in same
             index-order as job_runs
        * ``attributes['trigger']`` (list) - list of triggers

        The status of each job run is a dict with keys:

        * ``status`` (str): the status of the job run, OK or ERROR
        * ``ts`` (datetime): time of execution
        * ``message`` (str): error message in case of ERROR, else blank
        * ``results`` (str): name of results in case of OK, else blank

        Usage::

            # directly (sync)
            meta = om.jobs.run('mynb')

            # via runtime (async)
            job = om.runtime.job('mynb')
            result = job.run()

        Args:
            name (str): the name of the jobfile
            event (str): an event name
            timeout (int): timeout in seconds, None means no timeout

        Returns:
             Metadata of the results entry

        See Also:
            * OmegaJobs.run_notebook
        """
        return self.run_notebook(name, event=event, timeout=timeout)

    def run_notebook(self, name, event=None, timeout=None):
        """ run a given notebook immediately.

        Args:
            name (str): the name of the jobfile
            event (str): an event name
            timeout (int): timeout in seconds

        Returns:
            Metadata of results

        See Also:
            * nbconvert https://nbconvert.readthedocs.io/en/latest/execute_api.html
        """
        notebook = self.get(name)
        meta_job = self.metadata(name)
        assert meta_job is not None, f"Cannot run non-existent jobs {name}"
        ts = datetime.datetime.now()
        # execute kwargs
        # -- see ExecuteProcessor class
        # -- see https://nbconvert.readthedocs.io/en/latest/execute_api.html
        ep_kwargs = {
            # avoid timeouts to stop kernel
            'timeout': timeout,
            # avoid kernel at exit functions
            # -- this stops ipykernel AttributeError 'send_multipart'
            'shutdown_kernel': 'immediate',
            # set kernel name, blank is default
            # -- e.g. python3, ir
            # -- see https://stackoverflow.com/a/47053020/890242
            'kernel_name': '',
            # always use the async kernel manager
            # -- see https://github.com/jupyter/nbconvert/issues/1964
            'kernel_manager_class': AsyncKernelManager,
        }
        # overrides from metadata
        ep_kwargs.update(meta_job.kind_meta.get('ep_kwargs', {}))
        # pre-bind ep so the finally clause cannot raise UnboundLocalError
        # when preprocessor setup fails before ep is assigned
        ep = None
        try:
            resources = {
                'metadata': {
                    'path': self.defaults.OMEGA_TMP,
                }
            }
            if not meta_job.kind_meta.get('keep_output', False):
                # https://nbconvert.readthedocs.io/en/latest/api/preprocessors.html
                cp = ClearOutputPreprocessor()
                cp.preprocess(notebook, resources)
            ep = ExecutePreprocessor(**ep_kwargs)
            ep.preprocess(notebook, resources)
        except Exception as e:
            status = 'ERROR'
            message = str(e)
        else:
            status = 'OK'
            message = ''
        finally:
            # release the preprocessor (and its kernel manager) explicitly
            del ep
        # record results
        meta_results = self.put(notebook,
                                'results/{name}_{ts}'.format(**locals()))
        meta_results.attributes['source_job'] = name
        meta_results.save()
        job_results = meta_job.attributes.get('job_results', [])
        job_results.append(meta_results.name)
        meta_job.attributes['job_results'] = job_results
        # record final job status
        job_runs = meta_job.attributes.get('job_runs', [])
        runstate = {
            'status': status,
            'ts': ts,
            'message': message,
            'results': meta_results.name if status == 'OK' else None
        }
        job_runs.append(runstate)
        meta_job.attributes['job_runs'] = job_runs
        # set event run state if event was specified
        if event:
            attrs = meta_job.attributes
            triggers = attrs['triggers'] = attrs.get('triggers', [])
            scheduled = (trigger for trigger in triggers
                         if trigger['event-kind'] == 'scheduled')
            for trigger in scheduled:
                if event == trigger['event']:
                    trigger['status'] = status
                    trigger['ts'] = ts
        meta_job.save()
        return meta_results

    def schedule(self, nb_file, run_at=None, last_run=None):
        """
        Schedule a processing of a notebook as per the interval
        specified on the job script

        Notes:
            This updates the notebook's Metadata entry by adding the
            next scheduled run in ``attributes['triggers']```

        Args:
            nb_file (str): the name of the notebook
            run_at (str|dict|JobSchedule): the schedule specified in a format
               suitable for JobSchedule. If not specified, this value is
               extracted from the first cell of the notebook
            last_run (datetime): the last time this job was run, use this to
               reschedule the job for the next run. Defaults to the last
               timestamp listed in ``attributes['job_runs']``, or datetime.utcnow()
               if no previous run exists.

        Raises:
            ValueError: if no run-at specification can be determined

        See Also:
            * croniter.get_next()
            * JobSchedule
            * OmegaJobs.get_notebook_config
            * cron expression - https://en.wikipedia.org/wiki/Cron#CRON_expression
        """
        meta = self.metadata(nb_file)
        attrs = meta.attributes
        # get/set run-at spec
        config = attrs.get('config', {})
        # see what we have as a schedule
        # -- a dictionary of JobSchedule
        if isinstance(run_at, dict):
            run_at = self.Schedule(**run_at).cron
        # -- a cron string
        elif isinstance(run_at, str):
            run_at = self.Schedule(run_at).cron
        # -- a JobSchedule
        elif isinstance(run_at, self.Schedule):
            run_at = run_at.cron
        # -- nothing, may we have it on the job's config already
        if not run_at:
            interval = config.get('run-at')
        else:
            # store the new schedule
            config['run-at'] = run_at
            interval = run_at
        if not interval:
            # if we don't have a run-spec, return without scheduling
            raise ValueError('no run-at specification found, cannot schedule')
        # get last time the job was run
        if last_run is None:
            job_runs = attrs.get('job_runs')
            if job_runs:
                last_run = job_runs[-1]['ts']
            else:
                last_run = datetime.datetime.utcnow()
        # calculate next run time
        iter_next = croniter(interval, last_run)
        run_at = iter_next.get_next(datetime.datetime)
        # store next scheduled run
        triggers = attrs['triggers'] = attrs.get('triggers', [])
        # set up a schedule event
        scheduled_run = {
            'event-kind': 'scheduled',
            'event': run_at.isoformat(),
            'run-at': run_at,
            'status': 'PENDING'
        }
        # remove all pending triggers, add new triggers
        past_triggers = [cur for cur in triggers if cur.get('status') != 'PENDING']
        triggers.clear()
        triggers.extend(past_triggers)
        triggers.append(scheduled_run)
        attrs['config'] = config
        return meta.save()

    def get_schedule(self, name, only_pending=False):
        """
        return the cron schedule and corresponding triggers

        Args:
            name (str): the name of the job

        Returns:
            tuple of (run_at, triggers)

            run_at (str): the cron spec, None if not scheduled
            triggers (list): the list of triggers
        """
        meta = self.metadata(name)
        attrs = meta.attributes
        config = attrs.get('config')
        triggers = attrs.get('triggers', [])
        if only_pending:
            triggers = [trigger for trigger in triggers
                        if trigger['status'] == 'PENDING']
        if config and 'run-at' in config:
            run_at = config.get('run-at')
        else:
            run_at = None
        return run_at, triggers

    def drop_schedule(self, name):
        """
        Drop an existing schedule, if any

        This will drop any existing schedule and any pending triggers of
        event-kind 'scheduled'.

        Args:
            name (str): the name of the job

        Returns:
            Metadata
        """
        meta = self.metadata(name)
        attrs = meta.attributes
        # be defensive: jobs that were never scheduled have no config/triggers
        config = attrs.get('config') or {}
        triggers = attrs.get('triggers') or []
        if 'run-at' in config:
            del config['run-at']
        for trigger in triggers:
            if trigger['event-kind'] == 'scheduled' and trigger['status'] == 'PENDING':
                trigger['status'] = 'CANCELLED'
        return meta.save()

    def export(self, name, localpath='memory', format='html'):
        """
        Export a job or result file to HTML

        The job is exported in the given format.

        :param name: the name of the job, as in jobs.get
        :param localpath: the path of the local file to write. If you
           specify an empty path or 'memory' a tuple of (body, resource)
           is returned instead
        :param format: the output format. currently only :code:`'html'` is supported
        :return: the (data, resources) tuple as returned by nbconvert. For
           format html data is the HTML's body, for PDF it is the pdf file contents
        """
        from traitlets.config import Config
        from nbconvert import SlidesExporter
        from nbconvert.exporters.html import HTMLExporter
        from nbconvert.exporters.pdf import PDFExporter

        # https://nbconvert.readthedocs.io/en/latest/nbconvert_library.html
        # (exporter class, filemode, config-values
        EXPORTERS = {
            'html': (HTMLExporter, '', {}),
            'htmlbody': (HTMLExporter, '', {}),
            'pdf': (PDFExporter, 'b', {}),
            'slides': (SlidesExporter, '', {'RevealHelpPreprocessor.url_prefix':
                                                'https://cdnjs.cloudflare.com/ajax/libs/reveal.js/3.6.0/'}),
        }
        # get exporter according to format
        if format not in EXPORTERS:
            raise ValueError('format {} is invalid. Choose one of {}'.format(format, EXPORTERS.keys()))
        exporter_cls, fmode, configkw = EXPORTERS[format]
        # prepare config
        # http://nbconvert.readthedocs.io/en/latest/nbconvert_library.html#Using-different-preprocessors
        c = Config()
        for k, v in configkw.items():
            context, key = k.split('.')
            setattr(c[context], key, v)
        # get configured exporter
        exporter = exporter_cls(config=c)
        # get notebook, convert and store in file if requested
        notebook = self.get(name)
        (data, resources) = exporter.from_notebook_node(notebook)
        if localpath and localpath != 'memory':
            with open(localpath, 'w' + fmode) as fout:
                fout.write(data)
        return data, resources

    def help(self, name_or_obj=None, kind=None, raw=False):
        """ get help for a notebook

        Args:
            name_or_obj (str|obj): the name or actual object to get help for
            kind (str): optional, if specified forces retrieval of backend for the given kind
            raw (bool): optional, if True forces help to be the backend type of the object.
                If False returns the attributes[docs] on the object's metadata, if available.
                Defaults to False

        Returns:
            * help(obj) if python is in interactive mode
            * text(str) if python is in not interactive mode
        """
        return super().help(name_or_obj, kind=kind, raw=raw)

    def to_archive(self, name, path, **kwargs):
        # normalize to the canonical .ipynb name before archiving
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        return super().to_archive(name, path, **kwargs)

    def from_archive(self, path, name, **kwargs):
        # normalize to the canonical .ipynb name before restoring
        if not name.endswith('.ipynb'):
            name += '.ipynb'
        return super().from_archive(path, name, **kwargs)

    def promote(self, name, other, **kwargs):
        # copy the notebook to another store (e.g. another bucket)
        nb = self.get(name)
        return other.put(nb, name)

    def summary(self, *args, **kwargs):
        return super().summary(*args, **kwargs)
630
631
# sphinx docs compatibility
class OmegaJobs(NotebookMixin, NotebookBackend):
    # historical public name: combines the om.jobs store methods (mixin)
    # with the notebook storage backend
    pass