om.jobs

omegaml.jobs = OmegaJobs(store=OmegaStore(bucket=omegaml, prefix=jobs/))

the omegaml.notebook.jobs.OmegaJobs store for jobs

Methods:

Mixins:

Backends:

None

class omegaml.notebook.jobs.OmegaJobs(bucket=None, prefix=None, store=None, defaults=None)

Omega Jobs API

create(code, name)

create a notebook from code

Parameters:
  • code – the code as a string

  • name – the name of the job to create

Returns:

the metadata object created

drop(name, force=False)

remove the notebook

Parameters:
  • name (str) – the name of the notebook

  • force (bool) – if True does not raise

Returns:

True if object was deleted, False if not. If force is True and the object does not exist it will still return True

Raises:

DoesNotExist if the notebook does not exist and `force=False`

drop_schedule(name)

Drop an existing schedule, if any

This will drop any existing schedule and any pending triggers of event-kind ‘scheduled’.

Parameters:

name (str) – the name of the job

Returns:

Metadata

exists(name)

check if the notebook exists

Parameters:

name (str) – the name of the notebook

Returns:

Metadata

export(name, localpath, format='html')

Export a job or result file to HTML

The job is exported in the given format.

Parameters:
  • name – the name of the job, as in jobs.get

  • localpath – the path of the local file to write. If you specify an empty path or ‘memory’, a tuple of (data, resources) is returned instead

  • format – the output format. currently only 'html' is supported

Returns:

the (data, resources) tuple as returned by nbconvert. For format html data is the HTML’s body, for PDF it is the pdf file contents

get(name)

Retrieve a notebook and return a NotebookNode

get_collection(collection)

returns the collection object

get_notebook_config(nb_filename)

returns the omegaml script config on the notebook’s first cell

The config cell is in any of the following formats:

Option 1:

# omega-ml:
#    run-at: "<cron schedule>"

Option 2:

# omega-ml:
#   schedule: "weekday(s), hour[, month]"

Option 3:

# omega-ml:
#   cron: "<cron schedule>"

You may optionally specify only run-at, schedule or cron, i.e. without the omega-ml header:

# run-at: "<cron schedule>"
# cron: "<cron schedule>"
# schedule: "weekday(s), hour[, month]"
Parameters:

nb_filename (str) – the name of the notebook

Returns:

config(dict) => { 'run-at': <specifier> }

Raises:

ValueError, if there is no config cell or the config cell is invalid

See also

  • JobSchedule
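The config cell formats above can be illustrated with a small stand-alone parser. This is a minimal sketch and not the omegaml implementation: parse_config_cell is a hypothetical helper, and it assumes that run-at, schedule and cron are all reported under the 'run-at' key, as the Returns entry suggests.

```python
import re

# Keys the documentation lists as valid schedule specifiers.
SCHEDULE_KEYS = ("run-at", "schedule", "cron")

# Matches comment lines such as:  #   cron: "<cron schedule>"
_CONFIG_LINE = re.compile(
    r'^#\s*(%s)\s*:\s*"?(.*?)"?\s*$' % "|".join(re.escape(k) for k in SCHEDULE_KEYS)
)


def parse_config_cell(source):
    """Return {'run-at': <specifier>} from the text of a notebook's first cell.

    Hypothetical helper for illustration only. The optional '# omega-ml:'
    header line is skipped because it does not match any schedule key.
    """
    for line in source.splitlines():
        match = _CONFIG_LINE.match(line)
        if match:
            specifier = match.group(2)
            if not specifier:
                raise ValueError("empty schedule specifier")
            return {"run-at": specifier}
    raise ValueError("no omega-ml config found in first cell")
```

Because the header is optional, the same function handles both the long and the short form of the cell.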

get_schedule(name, only_pending=False)

return the cron schedule and corresponding triggers

Parameters:

name (str) – the name of the job

Returns:

tuple of (run_at, triggers)

  • run_at (str): the cron spec, None if not scheduled

  • triggers (list): the list of triggers

help(name_or_obj=None, kind=None, raw=False)

get help for a notebook

Parameters:
  • name_or_obj (str|obj) – the name or actual object to get help for

  • kind (str) – optional, if specified forces retrieval of backend for the given kind

  • raw (bool) – optional, if True forces help to be the backend type of the object. If False returns the attributes[docs] on the object’s metadata, if available. Defaults to False

Returns:

  • help(obj) if python is in interactive mode

  • text(str) if python is not in interactive mode

list(pattern=None, regexp=None, raw=False, **kwargs)

list all jobs matching filter. filter is a regex on the name of the ipynb entry. The default is all, i.e. .*

metadata(name, **kwargs)

retrieve metadata of a notebook

Parameters:

name (str) – the name of the notebook

Returns:

Metadata

put(obj, name, attributes=None)

Store a NotebookNode

Parameters:
  • obj – the NotebookNode to store

  • name – the name of the notebook

run(name, event=None, timeout=None)

Run a job immediately

The job is run and the results are stored in om.jobs(‘results/name <timestamp>’) and the result’s Metadata is returned.

Metadata.attributes of the original job as given by name is updated:

  • attributes['job_runs'] (list) - list of status of each run. Status is a dict as below

  • attributes['job_results'] (list) - list of results job names in same index-order as job_runs

  • attributes['trigger'] (list) - list of triggers

The status of each job run is a dict with keys:

  • status (str): the status of the job run, OK or ERROR

  • ts (datetime): time of execution

  • message (str): error message in case of ERROR, else blank

  • results (str): name of results in case of OK, else blank
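The run status keys listed above can be used to find the most recent successful result. The following is a minimal sketch that works on a plain dict shaped like Metadata.attributes. latest_result is a hypothetical helper, the sample data is made up, and it assumes job_runs is kept in chronological order.

```python
from datetime import datetime


def latest_result(attributes):
    """Return the results name of the most recent OK run, or None.

    Hypothetical helper; assumes attributes['job_runs'] is appended to
    in chronological order, so the last OK entry is the newest one.
    """
    for run in reversed(attributes.get("job_runs", [])):
        if run["status"] == "OK":
            return run["results"]
    return None


# Made-up sample data using only the documented status keys.
attributes = {
    "job_runs": [
        {"status": "OK", "ts": datetime(2024, 1, 1, 6, 0),
         "message": "", "results": "results/mynb_1"},
        {"status": "ERROR", "ts": datetime(2024, 1, 2, 6, 0),
         "message": "kernel died", "results": ""},
    ]
}
print(latest_result(attributes))
```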

Usage:

# directly (sync)
meta = om.jobs.run('mynb')

# via runtime (async)
job = om.runtime.job('mynb')
result = job.run()
Parameters:
  • name (str) – the name of the jobfile

  • event (str) – an event name

  • timeout (int) – timeout in seconds, None means no timeout

Returns:

Metadata of the results entry

See also

  • OmegaJobs.run_notebook

run_notebook(name, event=None, timeout=None)

run a given notebook immediately.

Parameters:
  • name (str) – the name of the jobfile

  • event (str) – an event name

  • timeout (int) – timeout in seconds

Returns:

Metadata of results

schedule(nb_file, run_at=None, last_run=None)

Schedule a processing of a notebook as per the interval specified on the job script

Notes

This updates the notebook’s Metadata entry by adding the next scheduled run in attributes['triggers']

Parameters:
  • nb_file (str) – the name of the notebook

  • run_at (str|dict|JobSchedule) – the schedule specified in a format suitable for JobSchedule. If not specified, this value is extracted from the first cell of the notebook

  • last_run (datetime) – the last time this job was run, use this to reschedule the job for the next run. Defaults to the last timestamp listed in attributes['job_runs'], or datetime.utcnow() if no previous run exists.
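The default for last_run described above can be sketched as follows. This is an illustration of the documented fallback order, not the library's code: resolve_last_run is a hypothetical helper operating on a plain dict shaped like Metadata.attributes.

```python
from datetime import datetime, timezone


def resolve_last_run(attributes, last_run=None):
    """Pick the reference time for computing the next scheduled run.

    Hypothetical helper. Order of precedence: an explicit last_run, then
    the timestamp of the last entry in attributes['job_runs'], then the
    current UTC time.
    """
    if last_run is not None:
        return last_run
    job_runs = attributes.get("job_runs") or []
    if job_runs:
        return job_runs[-1]["ts"]
    # naive UTC timestamp, equivalent to datetime.utcnow()
    return datetime.now(timezone.utc).replace(tzinfo=None)
```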

See also

Backends

None

Mixins

class omegaml.notebook.jobschedule.JobSchedule(text=None, minute='*', hour='*', weekday='*', monthday='*', month='*', at=None)

Produce a cron tab spec from text, time periods, or a crontab spec

Given any specification format, can translate to a human readable text.

If using time periods (minute, hour, weekday, monthday, month), any argument not specified defaults to ‘*’ for this argument.

If using text, separate each time part (weekday, hour, month) by a comma. To specify multiple times for a part, use / instead of comma.

Examples:

# using text
JobSchedule('friday, at 06:00/08:00/10:00')
JobSchedule('Mondays and Fridays, at 06:00')
JobSchedule('every 5 minutes, on weekends, in april')

# using time periods
JobSchedule(weekday='mon-fri', at='06:05,12:05')
JobSchedule(weekday='mon-fri', at='06:00')
JobSchedule(month='every 2', at='08:00', weekday='every 3')

# using a crontab spec
JobSchedule('05 06,12 * * mon-fri')

# given a valid specification get human readable text or crontab format
JobSchedule('05 06,12 * * mon-fri').text
JobSchedule('Mondays and Fridays, at 06:00').cron
Parameters:
  • text (str) – the natural language specification, with time parts separated by comma

  • at (str) – the hh:mm specification, equal to hour=hh, minute=mm

  • minute (str) – run on 0-59th minute in every specified hour

  • hour (str) – run on 0-23th hour on every specified day

  • weekday (str) – run on 0-6th day in every week (0 is Sunday), can also be specified as mon/tue/wed/thu/fri/sat/sun

  • monthday (str) – run on 1-31th day in every month

  • month (str) – run on 1-12th month of every year

Raises:

ValueError if the given specification is not correct
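To make the relation between the time period arguments and a crontab spec concrete, here is a minimal sketch of how such arguments could be assembled into the standard five cron fields (minute, hour, day of month, month, day of week). to_cron is a hypothetical stand-in and not part of JobSchedule. It assumes that at is a comma-separated list of hh:mm values that all share the same minute, since one cron line cannot express different minutes per hour.

```python
def to_cron(minute="*", hour="*", monthday="*", month="*", weekday="*", at=None):
    """Build a five-field crontab spec from time periods.

    Hypothetical helper for illustration only. Unspecified periods default
    to '*'; 'at' overrides hour and minute.
    """
    if at is not None:
        times = [t.strip().split(":") for t in at.split(",")]
        minutes = {mm for _, mm in times}
        if len(minutes) != 1:
            raise ValueError("all times in 'at' must share the same minute")
        minute = minutes.pop()
        hour = ",".join(hh for hh, _ in times)
    # cron field order: minute hour day-of-month month day-of-week
    return " ".join([minute, hour, monthday, month, weekday])


print(to_cron(weekday="mon-fri", at="06:05,12:05"))
```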

property cron

return the cron representation of the schedule

classmethod from_cron(cronspec)

initialize JobSchedule from a cron specifier

classmethod from_text(text)

initialize JobSchedule from a weekday, hour, month specifier

next_times(n=None, last_run=None)

return the n next times of this schedule, starting from the last run

Parameters:
  • n (int) – the n next times

  • last_run (datetime) – the last time this was run

Notes

if n is None returns a never ending iterator

Returns:

iterator of next times

See also

  • croniter.get_next()
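Since n=None yields a never-ending iterator, callers need to bound it themselves, for example with itertools.islice. The sketch below mimics that contract with a fixed interval instead of a real cron schedule. The next_times function here is a hypothetical stand-in with an extra every parameter; it is not the JobSchedule method.

```python
from datetime import datetime, timedelta
from itertools import count, islice


def next_times(n=None, last_run=None, every=timedelta(minutes=5)):
    """Yield run times after last_run at a fixed interval.

    Hypothetical stand-in: a real schedule would derive the times from
    the cron spec. If n is None the iterator never ends.
    """
    start = last_run or datetime.now()
    times = (start + every * i for i in count(1))
    return times if n is None else islice(times, n)


last_run = datetime(2024, 1, 1, 6, 0)
# bounded by n
print(list(next_times(n=2, last_run=last_run)))
# unbounded, so take only the first three entries
print(list(islice(next_times(last_run=last_run), 3)))
```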

property text

return the human readable representation of the schedule

class omegaml.runtimes.mixins.nbtasks.JobTasks

example notebook task runner using omegaml runtime

Parallel execution of notebooks with parameters on the omegaml runtime

Usage:

# submit tasks
# -- creates 10 tasks as copies of the main notebook in om.jobs.list('tasks')
# -- runs each task using omegaml runtime
# -- every .map() call generates a new task group, ensuring unique notebook names
# -- the original notebook is not changed
job = om.runtime.job('main')
job.map(range(10))

# check status
job.status()

# restart tasks that did not produce a result yet
job.restart()

# get the list of all notebooks created in one .map() call
job.list()

# get the list of all notebooks in any .map() call
job.list(task_group='*')

# example notebook
job = globals().get('job', dict(param=my_default_value))

def calculation(job):
   # job is a dict with job['param'] set to one value taken from .map(<jobs>)
   # job contains other keys to identify the task:
   #   job_id: a sequence number
   #   task_group: the group of tasks submitted in one .map() call
   #   task_name: the om.jobs name of the running notebook
   #   status: the status, pending or finished
   #   task_id: the celery task id, if available
   ...  # insert your code here

# run the calculation
# -- in each task, job will be one of the variables given in nbtasks.map(<jobs>)
#    <jobs> is an iterable, returning one object for each job
#    note the job must be serializable. if you need something more complex, pass
#    the name of an omegaml dataset and possibly some query criteria
calculation(job)
list(task_group=None, **kwargs)

List all generated tasks for this map call

Parameters:
  • task_group (str) – optional, the task group

  • **kwargs (kwargs) – kwargs to om.jobs.list()

Returns:

list of tasks generated for map call

map(jobs, job_ids=None, require=None, reset=False, task_group=None)

example notebook task runner using omegaml runtime

Parallel execution of notebooks with parameters on the omegaml runtime

restart(task_group=None, reset=False, require=None)

Run notebook for every entry in tasks/ with no result

For every entry in om.jobs.list(‘tasks/*’) will check if there is a result yet. If not, will call om.runtime.job().run() for the given notebook.

Usage:

generate_jobs(...)
restart()

This will start all jobs that do not have a result yet. To
start the jobs even if there is a result already, set reset=True.
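The selection rule described above (restart only tasks without a result, unless reset=True) can be sketched as a plain filter. select_for_restart and the mapping of task name to result name are hypothetical and only illustrate the rule; omegaml looks up the results itself.

```python
def select_for_restart(tasks, reset=False):
    """Return the task names that should be (re)started.

    Hypothetical helper. 'tasks' maps a task notebook name to the name of
    its result, or None if no result exists yet. With reset=True every
    task is selected, regardless of existing results.
    """
    return [name for name, result in tasks.items() if reset or result is None]


# Made-up task names and results, for illustration only.
tasks = {"tasks/main-0": "results/main-0", "tasks/main-1": None}
print(select_for_restart(tasks))
print(select_for_restart(tasks, reset=True))
```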

Notes

metadata.attributes['job'] will record the task status:

  • task_id: the celery task id

  • status: task status, initialized to PENDING
status(task_group=None)

get the status and the celery id of each task

Parameters:

task_group (str) – the task group id, defaults to None

Returns:

pd.DataFrame
    name: task name
    task_id: runtime task id
    status: final status, PENDING, RUNNING, SUCCESS, FAILURE
    run_status: current status, PENDING, RUNNING, SUCCESS, FAILURE