om.jobs
- omegaml.jobs = OmegaJobs(store=OmegaStore(bucket=omegaml, prefix=jobs/))
the omegaml.notebook.jobs.OmegaJobs store for jobs
Methods:
Mixins:
Backends:
None
- class omegaml.notebook.jobs.OmegaJobs(bucket=None, prefix=None, store=None, defaults=None)
Omega Jobs API
- create(code, name)
create a notebook from code
- Parameters:
code – the code as a string
name – the name of the job to create
- Returns:
the metadata object created
- drop(name, force=False)
remove the notebook
- Parameters:
name (str) – the name of the notebook
force (bool) – if True does not raise
- Returns:
True if the object was deleted, False if not. If force is True and the object does not exist, it will still return True
- Raises:
DoesNotExist – if the notebook does not exist and force=False
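The drop() contract can be pictured with a plain dict standing in for the store. This is a minimal sketch, not the omegaml implementation: `drop` and `DoesNotExist` below are hypothetical stand-ins, and the case in which the real method returns False is not modelled.

```python
class DoesNotExist(Exception):
    """Hypothetical stand-in for the error drop() raises for a missing notebook."""


def drop(store: dict, name: str, force: bool = False) -> bool:
    """Hypothetical sketch of the documented drop() contract on a dict store."""
    if name in store:
        del store[name]
        return True
    if force:
        # force=True: a missing notebook is not an error, and True is still returned
        return True
    raise DoesNotExist(name)


store = {"mynb": "<notebook>"}
print(drop(store, "mynb"))              # True, entry removed
print(drop(store, "mynb", force=True))  # True, even though it no longer exists
```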
- drop_schedule(name)
Drop an existing schedule, if any
This will drop any existing schedule and any pending triggers of event-kind ‘scheduled’.
- Parameters:
name (str) – the name of the job
- Returns:
Metadata
- exists(name)
check if the notebook exists
- Parameters:
name (str) – the name of the notebook
- Returns:
Metadata
- export(name, localpath, format='html')
Export a job or result file to HTML
The job is exported in the given format.
- Parameters:
name – the name of the job, as in jobs.get
localpath – the path of the local file to write. If you specify an empty path or ‘memory’, a tuple of (body, resource) is returned instead
format – the output format. Currently only 'html' is supported
- Returns:
the (data, resources) tuple as returned by nbconvert. For format html, data is the HTML body; for PDF, it is the PDF file contents
- get(name)
Retrieve a notebook and return a NotebookNode
- get_collection(collection)
returns the collection object
- get_notebook_config(nb_filename)
returns the omegaml script config from the notebook’s first cell
The config cell is in any of the following formats:
Option 1:
    # omega-ml:
    #   run-at: "<cron schedule>"
Option 2:
    # omega-ml:
    #   schedule: "weekday(s), hour[, month]"
Option 3:
    # omega-ml:
    #   cron: "<cron schedule>"
You may optionally specify only run-at, schedule or cron, i.e. without the omega-ml header:
    # run-at: "<cron schedule>"
    # cron: "<cron schedule>"
    # schedule: "weekday(s), hour[, month]"
- Parameters:
nb_filename (str) – the name of the notebook
- Returns:
config(dict) =>
{ 'run-at': <specifier> }
- Raises:
ValueError – if there is no config cell or the config cell is invalid
See also
JobSchedule
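A minimal sketch of parsing a config cell in the formats listed above, assuming the cell source is available as a string. `parse_config_cell` is a hypothetical stdlib-only helper (no YAML); it returns the keys as written and does not translate schedule or cron into the canonical 'run-at' entry that the documented return value suggests the real method produces.

```python
import re

# Keys accepted by the documented config formats
CONFIG_KEYS = ("run-at", "schedule", "cron")


def parse_config_cell(source: str) -> dict:
    """Hypothetical parser for an omega-ml config cell."""
    config = {}
    for line in source.splitlines():
        # strip the leading comment marker(s) and surrounding whitespace
        text = line.strip().lstrip("#").strip()
        if not text or text == "omega-ml:":
            continue  # blank lines and the optional header carry no setting
        match = re.match(r'^([\w-]+):\s*"(.*)"$', text)
        if match is None or match.group(1) not in CONFIG_KEYS:
            raise ValueError(f"invalid config line: {line!r}")
        config[match.group(1)] = match.group(2)
    if not config:
        raise ValueError("no config cell found")
    return config


cell = '# omega-ml:\n#   run-at: "0 6 * * mon-fri"'
print(parse_config_cell(cell))  # {'run-at': '0 6 * * mon-fri'}
```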
- get_schedule(name, only_pending=False)
return the cron schedule and corresponding triggers
- Parameters:
name (str) – the name of the job
- Returns:
tuple of (run_at, triggers)
run_at (str): the cron spec, None if not scheduled
triggers (list): the list of triggers
- help(name_or_obj=None, kind=None, raw=False)
get help for a notebook
- Parameters:
name_or_obj (str|obj) – the name or actual object to get help for
kind (str) – optional, if specified forces retrieval of backend for the given kind
raw (bool) – optional, if True forces help to be the backend type of the object. If False returns the attributes[docs] on the object’s metadata, if available. Defaults to False
- Returns:
help(obj) if Python is in interactive mode
text (str) if Python is not in interactive mode
- list(pattern=None, regexp=None, raw=False, **kwargs)
list all jobs matching the filter. The filter is a regex on the name of the ipynb entry; the default is all jobs, i.e. .*
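Filtering by name can be sketched with the standard library. `list_jobs` is a hypothetical helper; treating `pattern` as a shell-style glob and `regexp` as a regular expression matched from the start of the name are assumptions about how the two arguments differ.

```python
import fnmatch
import re


def list_jobs(names, pattern=None, regexp=None):
    """Hypothetical name filter; with no filter, everything matches (i.e. .*)."""
    if regexp is not None:
        compiled = re.compile(regexp)
        return [n for n in names if compiled.match(n)]
    if pattern is not None:
        # assumption: pattern is a shell-style glob
        return [n for n in names if fnmatch.fnmatch(n, pattern)]
    return list(names)


names = ["report.ipynb", "results/report 2024-01-01.ipynb", "etl.ipynb"]
print(list_jobs(names, regexp=r"^report"))  # ['report.ipynb']
print(list_jobs(names, pattern="*.ipynb"))  # all three names
```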
- metadata(name, **kwargs)
retrieve metadata of a notebook
- Parameters:
name (str) – the name of the notebook
- Returns:
Metadata
- put(obj, name, attributes=None)
Store a NotebookNode
- Parameters:
obj – the NotebookNode to store
name – the name of the notebook
- run(name, event=None, timeout=None)
Run a job immediately
The job is run, the results are stored in om.jobs(‘results/name <timestamp>’), and the result’s Metadata is returned.
Metadata.attributes of the original job, as given by name, is updated:
attributes['job_runs'] (list) - list of the status of each run. Each status is a dict as described below
attributes['job_results'] (list) - list of result job names, in the same index order as job_runs
attributes['trigger'] (list) - list of triggers
The status of each job run is a dict with keys:
status (str): the status of the job run, OK or ERROR
ts (datetime): time of execution
message (str): error message in case of ERROR, else blank
results (str): name of results in case of OK, else blank
Usage:
    # directly (sync)
    meta = om.jobs.run('mynb')
    # via runtime (async)
    job = om.runtime.job('mynb')
    result = job.run()
- Parameters:
name (str) – the name of the jobfile
event (str) – an event name
timeout (int) – timeout in seconds, None means no timeout
- Returns:
Metadata of the results entry
See also
OmegaJobs.run_notebook
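The bookkeeping that run() performs on the original job's attributes can be sketched as a plain function over a dict. `record_run` is hypothetical, and recording an empty results name for failed runs (so that job_results stays index-aligned with job_runs) is an assumption.

```python
from datetime import datetime, timezone


def record_run(attributes, ok, results_name="", message=""):
    """Hypothetical helper: append one run status in the shape documented for run()."""
    status = {
        "status": "OK" if ok else "ERROR",
        "ts": datetime.now(timezone.utc),       # time of execution
        "message": "" if ok else message,       # blank unless ERROR
        "results": results_name if ok else "",  # blank unless OK
    }
    attributes.setdefault("job_runs", []).append(status)
    # assumption: failed runs add an empty name so job_results stays index-aligned
    attributes.setdefault("job_results", []).append(status["results"])
    return status


attrs = {}
record_run(attrs, ok=True, results_name="results/mynb <timestamp>")
record_run(attrs, ok=False, message="NameError: name 'x' is not defined")
print([run["status"] for run in attrs["job_runs"]])  # ['OK', 'ERROR']
```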
- run_notebook(name, event=None, timeout=None)
run a given notebook immediately.
- Parameters:
name (str) – the name of the jobfile
event (str) – an event name
timeout (int) – timeout in seconds
- Returns:
Metadata of results
See also
- schedule(nb_file, run_at=None, last_run=None)
Schedule processing of a notebook according to the interval specified in the job script
Notes
This updates the notebook’s Metadata entry by adding the next scheduled run in attributes['triggers']
- Parameters:
nb_file (str) – the name of the notebook
run_at (str|dict|JobSchedule) – the schedule specified in a format suitable for JobSchedule. If not specified, this value is extracted from the first cell of the notebook
last_run (datetime) – the last time this job was run; use this to reschedule the job for the next run. Defaults to the last timestamp listed in attributes['job_runs'], or datetime.utcnow() if no previous run exists.
See also
croniter.get_next()
JobSchedule
OmegaJobs.get_notebook_config
cron expression - https://en.wikipedia.org/wiki/Cron#CRON_expression
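How the documented default for last_run is resolved can be sketched as follows. `default_last_run` is a hypothetical helper that assumes each entry in attributes['job_runs'] stores its time of execution under the 'ts' key, as described for run().

```python
from datetime import datetime, timezone


def default_last_run(attributes: dict) -> datetime:
    """Hypothetical helper: last timestamp in job_runs, else the current UTC time."""
    runs = attributes.get("job_runs", [])
    if runs:
        # assumption: each run status stores its time of execution under 'ts'
        return runs[-1]["ts"]
    # naive UTC time, equivalent to the datetime.utcnow() named in the docs
    return datetime.now(timezone.utc).replace(tzinfo=None)


print(default_last_run({"job_runs": [{"ts": datetime(2024, 1, 1, 6, 0)}]}))  # 2024-01-01 06:00:00
```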
Backends
None
Mixins
- class omegaml.notebook.jobschedule.JobSchedule(text=None, minute='*', hour='*', weekday='*', monthday='*', month='*', at=None)
Produce a cron tab spec from text, time periods, or a crontab spec
Given any specification format, can translate to a human readable text.
If using time periods (minute, hour, weekday, monthday, month), any argument not specified defaults to ‘*’ for this argument.
If using text, separate each time part (weekday, hour, month) by a comma. To specify multiple times for a part, use / instead of a comma.
Examples:
    # using text
    JobSchedule('friday, at 06:00/08:00/10:00')
    JobSchedule('Mondays and Fridays, at 06:00')
    JobSchedule('every 5 minutes, on weekends, in april')
    # using time periods
    JobSchedule(weekday='mon-fri', at='06:05,12:05')
    JobSchedule(weekday='mon-fri', at='06:00')
    JobSchedule(month='every 2', at='08:00', weekday='every 3')
    # using a crontab spec
    JobSchedule('05 06,12 * * mon-fri')
    # given a valid specification get human readable text or crontab format
    JobSchedule('05 06,12 * * mon-fri').text
    JobSchedule('Mondays and Fridays, at 06:00').cron
- Parameters:
text (str) – the natural language specification, with time parts separated by comma
at (str) – the hh:mm specification, equal to hour=hh, minute=mm
minute (str) – run on 0-59th minute in every specified hour
hour (str) – run on 0-23th hour on every specified day
weekday (str) – run on 0-6th day in every week (0 is Sunday), can also be specified as mon/tue/wed/thu/fri/sat/sun
monthday (str) – run on 1-31th day in every month
month (str) – run in the 1-12th month of every year
- Raises:
ValueError – if the given specification is not correct
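A sketch of how time periods map onto the five crontab fields (minute, hour, day of month, month, weekday). `cron_from_periods` is hypothetical and covers only the time-period form, not natural-language text; it assumes every time in `at` shares the same minute, since a single crontab line cannot pair different minutes with different hours. The inputs used below correspond to the crontab example '05 06,12 * * mon-fri' given above.

```python
def cron_from_periods(minute="*", hour="*", weekday="*", monthday="*", month="*", at=None):
    """Hypothetical sketch: build a 5-field crontab string from time periods."""
    if at is not None:
        # at='06:05,12:05' -> hour='06,12', minute='05'
        pairs = [t.split(":") for t in at.split(",")]
        minutes = {mm for _, mm in pairs}
        if len(minutes) != 1:
            raise ValueError("all times in 'at' must share the same minute")
        hour = ",".join(hh for hh, _ in pairs)
        minute = minutes.pop()
    # crontab field order: minute, hour, day of month, month, day of week
    return " ".join([minute, hour, monthday, month, weekday])


print(cron_from_periods(weekday="mon-fri", at="06:05,12:05"))  # 05 06,12 * * mon-fri
```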
- property cron
return the cron representation of the schedule
- classmethod from_cron(cronspec)
initialize JobSchedule from a cron specifier
- classmethod from_text(text)
initialize JobSchedule from a weekday, hour, month specifier
- next_times(n=None, last_run=None)
return the next n times of this schedule, starting from the last run
- Parameters:
n (int) – the n next times
last_run (datetime) – the last time this was run
Notes
if n is None returns a never ending iterator
- Returns:
iterator of next times
See also
croniter.get_next()
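The documented behaviour of next_times(), an unbounded iterator when n is None and n values otherwise, can be illustrated with a stdlib-only stand-in. `next_daily_times` is hypothetical and handles only a fixed daily hh:mm schedule; the real method works from the cron spec via croniter.

```python
import itertools
from datetime import datetime, timedelta


def next_daily_times(hour, minute, n=None, last_run=None):
    """Hypothetical stand-in for next_times() on a fixed daily hh:mm schedule."""
    start = last_run if last_run is not None else datetime.now()
    candidate = start.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= start:
        candidate += timedelta(days=1)  # today's slot has already passed
    times = (candidate + timedelta(days=i) for i in itertools.count())
    # n=None yields a never-ending iterator, mirroring the documented behaviour
    return times if n is None else itertools.islice(times, n)


last = datetime(2024, 1, 1, 7, 30)
print(list(next_daily_times(6, 0, n=2, last_run=last)))
# [datetime.datetime(2024, 1, 2, 6, 0), datetime.datetime(2024, 1, 3, 6, 0)]
```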
- property text
return the human readable representation of the schedule
- class omegaml.runtimes.mixins.nbtasks.JobTasks
example notebook task runner using omegaml runtime
Parallel execution of notebooks with parameters on the omegaml runtime
Usage:
    # submit tasks
    # -- creates 10 tasks as copies of the main notebook in om.jobs.list('tasks')
    # -- runs each task using omegaml runtime
    # -- every .map() call generates a new task group, ensuring unique notebook names
    # -- the original notebook is not changed
    job = om.runtime.job('main')
    job.map(range(10))
    # check status
    job.status()
    # restart tasks that did not produce a result yet
    job.restart()
    # get the list of all notebooks created in one .map() call
    job.list()
    # get the list of all notebooks in any .map() call
    job.list(task_group='*')

    # example notebook
    job = globals().get('job', dict(param=my_default_value))

    def calculation(job):
        # job is a dict with job['param'] set to one value taken from .map(<jobs>)
        # job contains other keys to identify the task:
        #   job_id: a sequence number
        #   task_group: the group of tasks submitted in one .map() call
        #   task_name: the om.jobs name of the running notebook
        #   status: the status, pending or finished
        #   task_id: the celery task id, if available
        ...  # <insert your code here>

    # run the calculation
    # -- in each task, job will be one of the variables given in nbtasks.map(<jobs>)
    #    <jobs> is an iterable, returning one object for each job
    #    note the job must be serializable. if you need something more complex, pass
    #    the name of an omegaml dataset and possibly some query criteria
    calculation(job)
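A sketch of the per-task job dicts a .map() call hands to each notebook copy, using the keys listed in the usage above. `make_tasks` is hypothetical; the task naming scheme and the random task-group id are assumptions, since the docs only state that tasks are stored under 'tasks' and that each .map() call gets its own task group.

```python
import uuid


def make_tasks(notebook, jobs, task_group=None):
    """Hypothetical sketch of the job dicts a .map() call could hand to each task."""
    # every .map() call gets its own task group, keeping notebook names unique
    task_group = task_group or uuid.uuid4().hex[:8]
    tasks = []
    for job_id, param in enumerate(jobs):
        tasks.append({
            "param": param,            # one value taken from .map(<jobs>)
            "job_id": job_id,          # a sequence number
            "task_group": task_group,
            # assumption: illustrative naming scheme, not omegaml's actual one
            "task_name": f"tasks/{notebook}-{task_group}-{job_id}",
            "status": "pending",
        })
    return tasks


tasks = make_tasks("main", range(3), task_group="g1")
print(tasks[1]["task_name"])  # tasks/main-g1-1
```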
- list(task_group=None, **kwargs)
List all generated tasks for this map call
- Parameters:
task_group (str) – optional, the task group
**kwargs (kwargs) – kwargs to om.jobs.list()
- Returns:
list of tasks generated for map call
- map(jobs, job_ids=None, require=None, reset=False, task_group=None)
- restart(task_group=None, reset=False, require=None)
Run the notebook for every entry in tasks/ that has no result
For every entry in om.jobs.list(‘tasks/*’), checks whether there is a result yet. If not, calls om.runtime.job().run() for the given notebook.
Usage:
    generate_jobs(...)
    restart()
This will start all jobs that do not have a result yet. To start the jobs even if there is a result already, set reset=True.
Notes
metadata.attributes['job'] will record the task status:
* task_id: the celery task id
* status: task status, initialized to PENDING
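The selection logic of restart() can be sketched over a dict of task attributes. This `restart` is a hypothetical stand-in: `results` is assumed to be the set of task names that already produced a result, and `submit` is a placeholder callable, in place of om.runtime.job(name).run(), that returns a task id.

```python
def restart(tasks, results, reset=False, submit=None):
    """Hypothetical stand-in: rerun every task without a result (all tasks if reset)."""
    restarted = []
    for name, attributes in tasks.items():
        if reset or name not in results:
            # submit is a placeholder for om.runtime.job(name).run(); None means not submitted
            task_id = submit(name) if submit is not None else None
            # record the task status in attributes['job'], initialized to PENDING
            attributes.setdefault("job", {}).update(task_id=task_id, status="PENDING")
            restarted.append(name)
    return restarted


tasks = {"tasks/a": {}, "tasks/b": {}}
print(restart(tasks, results={"tasks/a"}, submit=lambda name: "id-" + name))  # ['tasks/b']
```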
- status(task_group=None)
get the status and the celery id of each task
- Parameters:
task_group (str) – the task group id, defaults to None
- Returns:
pd.DataFrame with columns:
name: task name
task_id: runtime task id
status: final status, PENDING, RUNNING, SUCCESS, FAILURE
run_status: current status, PENDING, RUNNING, SUCCESS, FAILURE
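A sketch of summarizing the status table, using plain dicts with the documented columns instead of a pandas DataFrame so it runs with the standard library alone. The rows are made-up illustrative values and `summarize`/`unfinished` are hypothetical helpers; with the real DataFrame, df['run_status'].value_counts() gives the equivalent counts.

```python
from collections import Counter

# Made-up rows shaped like the documented status() columns, for illustration only
rows = [
    {"name": "tasks/main-0", "task_id": "t0", "status": "SUCCESS", "run_status": "SUCCESS"},
    {"name": "tasks/main-1", "task_id": "t1", "status": "PENDING", "run_status": "RUNNING"},
    {"name": "tasks/main-2", "task_id": "t2", "status": "FAILURE", "run_status": "FAILURE"},
]


def summarize(rows):
    """Count tasks per current run_status."""
    return Counter(row["run_status"] for row in rows)


def unfinished(rows):
    """Names of tasks whose current status is neither SUCCESS nor FAILURE."""
    return [row["name"] for row in rows if row["run_status"] not in ("SUCCESS", "FAILURE")]


print(summarize(rows))
print(unfinished(rows))  # ['tasks/main-1']
```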