om.jobs

omegaml.jobs = OmegaJobs(store=OmegaStore(bucket=omegaml, prefix=jobs/))

the omegaml.store.base.OmegaStore store for jobs

Methods:

  • omegaml.notebook.jobs.OmegaJobs.list()

  • omegaml.notebook.jobs.OmegaJobs.get()

  • omegaml.notebook.jobs.OmegaJobs.put()

  • omegaml.notebook.jobs.OmegaJobs.drop()

  • omegaml.notebook.jobs.OmegaJobs.exists()

  • omegaml.notebook.jobs.OmegaJobs.create()

  • omegaml.notebook.jobs.OmegaJobs.export()

  • omegaml.notebook.jobs.OmegaJobs.schedule()

  • omegaml.notebook.jobs.OmegaJobs.metadata()

  • omegaml.notebook.jobs.OmegaJobs.help()

Mixins:

Backends:

None

class omegaml.notebook.jobs.OmegaJobs(model_store=None, data_store=None, tracking=None, **kwargs)

Backends

None

Mixins

class omegaml.notebook.jobschedule.JobSchedule(text=None, minute='*', hour='*', weekday='*', monthday='*', month='*', at=None)

Produce a crontab spec from natural-language text, from time periods, or from an existing crontab spec.

Given any of these specification formats, the schedule can also be translated to human-readable text.

If using time periods (minute, hour, weekday, monthday, month), any argument not specified defaults to ‘*’ for this argument.

If using text, separate each time part (weekday, hour, month) with a comma. To specify multiple times for one part, use / instead of a comma.
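The comma and slash conventions can be illustrated with a small splitter. This is a hypothetical helper (split_schedule_text), not omegaml's parser; it only shows how a text spec breaks into time parts and how one part lists several times.

```python
def split_schedule_text(text):
    """Split a text spec into time parts; each part may list several values."""
    # time parts (weekday, hour, month, ...) are separated by commas
    parts = [part.strip() for part in text.split(',') if part.strip()]
    # within one part, '/' separates multiple values, e.g. 'at 06:00/08:00'
    return [part.split('/') for part in parts]


print(split_schedule_text('friday, at 06:00/08:00/10:00'))
# [['friday'], ['at 06:00', '08:00', '10:00']]
```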

Examples:

from omegaml.notebook.jobschedule import JobSchedule

# using text
JobSchedule('friday, at 06:00/08:00/10:00')
JobSchedule('Mondays and Fridays, at 06:00')
JobSchedule('every 5 minutes, on weekends, in april')

# using time periods
JobSchedule(weekday='mon-fri', at='06:05,12:05')
JobSchedule(weekday='mon-fri', at='06:00')
JobSchedule(month='every 2', at='08:00', weekday='every 3')

# using a crontab spec
JobSchedule('05 06,12 * * mon-fri')

# given a valid specification get human readable text or crontab format
JobSchedule('05 06,12 * * mon-fri').text
JobSchedule('Mondays and Fridays, at 06:00').cron

Parameters:
  • text (str) – the natural language specification, with time parts separated by comma

  • at (str) – the hh:mm specification, equal to hour=hh, minute=mm

  • minute (str) – the minute(s) of every specified hour to run on, 0-59

  • hour (str) – the hour(s) of every specified day to run on, 0-23

  • weekday (str) – the day(s) of the week to run on, 0-6 (0 is Sunday); can also be specified as mon/tue/wed/thu/fri/sat/sun

  • monthday (str) – the day(s) of every month to run on, 1-31

  • month (str) – the month(s) of every year to run on, 1-12

Raises:

ValueError – if the given specification is not valid
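To see how the time-period arguments relate to a crontab spec, here is a minimal sketch, assuming the standard five-field cron order (minute hour monthday month weekday). to_cron is a hypothetical helper, not JobSchedule's implementation; it shows the `at` expansion (hour=hh, minute=mm) and the '*' defaults, and yields the same string as the crontab example above for weekday='mon-fri', at='06:05,12:05'.

```python
def to_cron(minute='*', hour='*', weekday='*', monthday='*', month='*', at=None):
    """Build a 5-field crontab string: minute hour monthday month weekday."""
    if at is not None:
        # at='hh:mm[,hh:mm...]' sets hour=hh and minute=mm
        hours, minutes = [], []
        for hhmm in at.split(','):
            hh, mm = hhmm.strip().split(':')
            if hh not in hours:
                hours.append(hh)
            if mm not in minutes:
                minutes.append(mm)
        # cron combines fields as a cross product, so '06:05,12:30'
        # would also match 06:30 and 12:05
        hour, minute = ','.join(hours), ','.join(minutes)
    # unspecified fields keep their '*' default
    return ' '.join([minute, hour, monthday, month, weekday])


print(to_cron(weekday='mon-fri', at='06:05,12:05'))
# 05 06,12 * * mon-fri
```

The cross-product caveat in the comment is a property of cron itself: a single crontab line cannot express arbitrary hh:mm pairs whose minutes differ per hour.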

property cron

return the cron representation of the schedule

classmethod from_cron(cronspec)

initialize JobSchedule from a cron specifier
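A cron specifier has five whitespace-separated fields. The sketch below (parse_cron is hypothetical, not omegaml's code) splits one into named fields and raises ValueError on a malformed spec, mirroring the documented Raises behavior. The keys are chosen to match JobSchedule's keyword arguments.

```python
CRON_FIELDS = ('minute', 'hour', 'monthday', 'month', 'weekday')


def parse_cron(cronspec):
    """Split a 5-field crontab spec into named fields."""
    parts = cronspec.split()
    if len(parts) != len(CRON_FIELDS):
        # reject specs that do not have exactly five fields
        raise ValueError(f'expected 5 fields, got {len(parts)}: {cronspec!r}')
    return dict(zip(CRON_FIELDS, parts))


print(parse_cron('05 06,12 * * mon-fri'))
# {'minute': '05', 'hour': '06,12', 'monthday': '*', 'month': '*', 'weekday': 'mon-fri'}
```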

classmethod from_text(text)

initialize JobSchedule from a natural-language text specifier, with the weekday, hour and month parts separated by commas

next_times(n=None, last_run=None)

return the n next times of this schedule, starting from the last run

Parameters:
  • n (int) – the number of next times to return

  • last_run (datetime) – the last time this was run

Notes

if n is None, returns a never-ending iterator

Returns:

iterator of next times

See also

  • croniter.get_next()
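omegaml computes the next times with croniter (see croniter.get_next()). The standard-library sketch below only illustrates the n / last_run semantics for a simple daily schedule: it yields times strictly after last_run, never ends when n is None, and stops after n items otherwise. next_daily_times is a hypothetical helper and supports only fixed daily hh:mm times.

```python
from datetime import datetime, time, timedelta
from itertools import islice


def next_daily_times(at_times, last_run, n=None):
    """Yield run times after last_run for a schedule firing daily at at_times."""
    times = sorted(time.fromisoformat(t) for t in at_times)

    def gen():
        day = last_run.date()
        while True:  # endless: one pass per calendar day
            for t in times:
                candidate = datetime.combine(day, t)
                if candidate > last_run:
                    yield candidate
            day += timedelta(days=1)

    it = gen()
    # n=None returns the endless iterator, otherwise only the first n times
    return it if n is None else islice(it, n)


for when in next_daily_times(['06:00', '08:00'], datetime(2024, 4, 1, 7, 0), n=3):
    print(when)
# 2024-04-01 08:00:00
# 2024-04-02 06:00:00
# 2024-04-02 08:00:00
```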

property text

return the human readable representation of the schedule

class omegaml.runtimes.mixins.nbtasks.JobTasks

example notebook task runner using omegaml runtime

Parallel execution of notebooks with parameters on the omegaml runtime

Usage:

# submit tasks
# -- creates 10 tasks as copies of the main notebook in om.jobs.list('tasks')
# -- runs each task using omegaml runtime
# -- every .map() call generates a new task group, ensuring unique notebook names
# -- the original notebook is not changed
job = om.runtime.job('main')
job.map(range(10))

# check status
job.status()

# restart tasks that did not produce a result yet
job.restart()

# get the list of all notebooks created in one .map() call
job.list()

# get the list of all notebooks in any .map() call
job.list(task_group='*')

# example notebook
# my_default_value is a placeholder, used when the notebook runs outside a task
job = globals().get('job', dict(param=my_default_value))

def calculation(job):
    # job is a dict with job['param'] set to one value taken from .map(<jobs>)
    # job contains other keys to identify the task:
    #   job_id: a sequence number
    #   task_group: the group of tasks submitted in one .map() call
    #   task_name: the om.jobs name of the running notebook
    #   status: the status, pending or finished
    #   task_id: the celery task id, if available
    ...  # <insert your code here>

# run the calculation
# -- in each task, job will be one of the values given in nbtasks.map(<jobs>)
#    <jobs> is an iterable, returning one object for each job
#    note: each job must be serializable. If you need something more complex, pass
#    the name of an omegaml dataset and possibly some query criteria
calculation(job)
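The task dicts described in the comments above can be pictured with a minimal sketch of what one .map() call produces. make_tasks, the uuid-based group id and the 'tasks/<notebook>-<task_group>-<job_id>' naming scheme are all hypothetical; omegaml's actual naming may differ. The keys follow the list given in the example notebook.

```python
import uuid


def make_tasks(notebook, jobs, task_group=None):
    """Build one task description per entry in jobs (hypothetical sketch)."""
    # a new group per call keeps task names unique across .map() calls
    task_group = task_group or uuid.uuid4().hex[:8]
    tasks = []
    for job_id, param in enumerate(jobs):
        tasks.append(dict(
            param=param,            # the value taken from .map(<jobs>)
            job_id=job_id,          # a sequence number
            task_group=task_group,  # shared by all tasks of this call
            # hypothetical naming scheme for the notebook copy
            task_name=f'tasks/{notebook}-{task_group}-{job_id}',
            status='pending',
            task_id=None,           # filled in once the runtime accepts the task
        ))
    return tasks


tasks = make_tasks('main', range(3), task_group='g1')
print([t['task_name'] for t in tasks])
# ['tasks/main-g1-0', 'tasks/main-g1-1', 'tasks/main-g1-2']
```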

list(task_group=None, **kwargs)

List all generated tasks for this map call

Parameters:
  • task_group (str) – optional, the task group

  • **kwargs (kwargs) – kwargs to om.jobs.list()

Returns:

list of tasks generated for map call
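Selecting the tasks of one group, or of all groups with task_group='*', amounts to a name match. The sketch below assumes the hypothetical naming scheme 'tasks/<notebook>-<task_group>-<job_id>' and uses fnmatch; list_tasks is illustrative only and does not call om.jobs.list().

```python
from fnmatch import fnmatch


def list_tasks(names, notebook, task_group):
    """Select task names for one task group, or for all groups with '*'."""
    # hypothetical naming scheme: tasks/<notebook>-<task_group>-<job_id>
    pattern = f'tasks/{notebook}-{task_group}-*'
    return [name for name in names if fnmatch(name, pattern)]


names = ['tasks/main-g1-0', 'tasks/main-g1-1', 'tasks/main-g2-0', 'tasks/other-g1-0']
print(list_tasks(names, 'main', 'g1'))  # ['tasks/main-g1-0', 'tasks/main-g1-1']
print(list_tasks(names, 'main', '*'))   # ['tasks/main-g1-0', 'tasks/main-g1-1', 'tasks/main-g2-0']
```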

map(jobs, job_ids=None, require=None, reset=False, task_group=None)

Create one task per entry in jobs, each a copy of the notebook, and run every task on the omegaml runtime. Every .map() call generates a new task group, ensuring unique notebook names; the original notebook is not changed. See the JobTasks usage example above.

restart(task_group=None, reset=False, require=None)

Run notebook for every entry in tasks/ with no result

For every entry in om.jobs.list(‘tasks/*’), checks whether there is a result yet. If not, calls om.runtime.job().run() for that notebook.

Usage:

generate_jobs(...)
restart()

This will start all jobs that do not have a result yet. To start the jobs even if there is a result already, set reset=True.

Notes

metadata.attributes['job'] will record the task status:

  • task_id: the celery task id

  • status: the task status, initialized to PENDING
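A minimal sketch of the restart rule and the status it records. The task dicts, the has_result flag and the submit callable are hypothetical stand-ins; in omegaml, the result check and the om.runtime.job().run() call happen inside restart() itself.

```python
def restart_tasks(tasks, submit, reset=False):
    """Resubmit tasks that have no result yet, or all tasks if reset=True."""
    restarted = []
    for task in tasks:
        if task.get('has_result') and not reset:
            continue  # already produced a result, skip unless reset
        # submit stands in for om.runtime.job(name).run() and returns a task id
        task_id = submit(task['name'])
        # record the status as described for metadata.attributes['job']
        job_meta = task.setdefault('attributes', {}).setdefault('job', {})
        job_meta.update(task_id=task_id, status='PENDING')
        restarted.append(task['name'])
    return restarted


tasks = [dict(name='tasks/main-g1-0', has_result=True),
         dict(name='tasks/main-g1-1', has_result=False)]
fake_ids = iter(['id-1', 'id-2'])
print(restart_tasks(tasks, submit=lambda name: next(fake_ids)))
# ['tasks/main-g1-1']
```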

status(task_group=None)

get the status and the celery id of each task

Parameters:

task_group (str) – the task group id, defaults to None

Returns:

pd.DataFrame
    name: task name
    task_id: runtime task id
    status: final status, PENDING, RUNNING, SUCCESS, FAILURE
    run_status: current status, PENDING, RUNNING, SUCCESS, FAILURE
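The difference between status (recorded in the task metadata) and run_status (the runtime's current view) can be sketched as below. status_rows and the current_status callable are hypothetical; current_status might, for example, wrap Celery's AsyncResult(task_id).status. The sketch returns plain dicts to stay dependency-free; pandas.DataFrame(rows) would give a table with the documented columns.

```python
def status_rows(tasks, current_status):
    """One row per task: name, task_id, recorded status and current run status."""
    rows = []
    for task in tasks:
        job_meta = task.get('attributes', {}).get('job', {})
        task_id = job_meta.get('task_id')
        rows.append(dict(
            name=task['name'],
            task_id=task_id,
            # status as recorded in the task metadata, PENDING if never submitted
            status=job_meta.get('status', 'PENDING'),
            # live lookup from the runtime, only possible once a task id exists
            run_status=current_status(task_id) if task_id else 'PENDING',
        ))
    return rows


tasks = [dict(name='tasks/main-g1-0', attributes={'job': {'task_id': 't1', 'status': 'PENDING'}}),
         dict(name='tasks/main-g1-1')]
for row in status_rows(tasks, current_status=lambda task_id: 'RUNNING'):
    print(row)
```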