om.runtime

omegaml.runtime = OmegaRuntime(Omega())

the omegaml.runtimes.runtime.OmegaRuntime runtime

Methods to run models, scripts, jobs, experiments:

Methods to manage task distribution, logging:

Methods to inspect runtime state:

Backends:

  • omegaml.runtimes.modelproxy.OmegaModelProxy

  • omegaml.runtimes.jobproxy.OmegaJobProxy

  • omegaml.runtimes.scriptproxy.OmegaScriptProxy

  • omegaml.runtimes.trackingproxy.OmegaTrackingProxy

  • omegaml.runtimes.loky.OmegaRuntimeBackend

Mixins:

class omegaml.runtimes.OmegaRuntime(omega, bucket=None, defaults=None, celeryconf=None)

omegaml compute cluster gateway

callback(script_name, always=False, **kwargs)

Add a callback to a registered script

The callback will be triggered upon successful or failed execution of the runtime tasks. The script syntax is:

# script.py
def run(om, state=None, result=None, **kwargs):
    # state (str): 'SUCCESS'|'ERROR'
    # result (obj): the task's serialized result
Parameters:
  • script_name (str) – the name of the script (in om.scripts)

  • always (bool) – if True always apply this callback, defaults to False

  • **kwargs – and other kwargs to pass on to the script

Returns:

self

enable_hostqueues()

enable a worker-specific queue on every worker host

Returns:

list of labels (one entry for each hostname)

experiment(experiment, provider=None, implied_run=True, recreate=False, **tracker_kwargs)

set the tracking backend and experiment

Parameters:
  • experiment (str) – the name of the experiment

  • provider (str) – the name of the provider

  • tracker_kwargs (dict) – additional kwargs for the tracker

  • recreate (bool) – if True, recreate the experiment (i.e. drop and recreate, this is useful to change the provider or other settings. All previous data will be kept)

Returns:

OmegaTrackingProxy

job(jobname, require=None)

return a job for remote execution

Parameters:
  • jobname (str) – the name of the object in om.jobs

  • require (dict) – routing requirements for this job

Returns:

OmegaJobProxy

labels()

list available labels

Returns:

dict of workers => list of lables

mapreduce()

context manager to support sequenced, parallel and mapreduce tasks

Usage:

# run tasks async, in sequence
with om.runtime.sequence() as crt:
    crt.model('mymodel').fit(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel
with om.runtime.parallel() as crt:
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel with a final step
with om.runtime.mapreduce() as crt:
    # map tasks
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    # reduce results - combined is a virtualobj function
    crt.model('combined').reduce(...)
    result = crt.run()

# combined is a virtual obj function, e.g.
@virtualobj
def combined(data=None, **kwargs):
    # data is the list of results from each map step
    return data

Note that the statements inside the context are
executed in sequence, as any normal python code. However,
the actual tasks are only executed on calling crt.run()
Parameters:

self – the runtime

Returns:

OmegaRuntime, for use within context

mode(local=None, logging=None)

specify runtime modes

Parameters:
  • local (bool) – if True, all execution will run locally, else on the configured remote cluster

  • logging (bool|str|tuple) – if True, will set the root logger output at INFO level; a single string is the name of the logger, typically a module name; a tuple (logger, level) will select logger and the level. Valid levels are INFO, WARNING, ERROR, CRITICAL, DEBUG

Usage:

# run all runtime tasks locally
om.runtime.mode(local=True)

# enable logging both in local and remote mode
om.runtime.mode(logging=True)

# select a specific module and level)
om.runtime.mode(logging=('sklearn', 'DEBUG'))

# disable logging
om.runtime.mode(logging=False)
model(modelname, require=None)

return a model for remote execution

Parameters:
  • modelname (str) – the name of the object in om.models

  • require (dict) – routing requirements for this job

Returns:

OmegaModelProxy

parallel()

context manager to support sequenced, parallel and mapreduce tasks

Usage:

# run tasks async, in sequence
with om.runtime.sequence() as crt:
    crt.model('mymodel').fit(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel
with om.runtime.parallel() as crt:
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel with a final step
with om.runtime.mapreduce() as crt:
    # map tasks
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    # reduce results - combined is a virtualobj function
    crt.model('combined').reduce(...)
    result = crt.run()

# combined is a virtual obj function, e.g.
@virtualobj
def combined(data=None, **kwargs):
    # data is the list of results from each map step
    return data

Note that the statements inside the context are
executed in sequence, as any normal python code. However,
the actual tasks are only executed on calling crt.run()
Parameters:

self – the runtime

Returns:

OmegaRuntime, for use within context

ping(*args, require=None, wait=True, **kwargs)

ping the runtime

Parameters:
  • args (tuple) – task args

  • require (dict) – routing requirements for this job

  • wait (bool) – if True, wait for the task to return, else return AsyncResult

  • kwargs (dict) – task kwargs

Returns:

  • response (dict) for wait=True

  • AsyncResult for wait=False

queues()

list queues

Returns:

dict of workers => list of queues

See also

celery Inspect.active_queues()

require(label=None, always=False, routing=None, task=None, logging=None, override=True, **kwargs)

specify requirements for the task execution

Use this to specify resource or routing requirements on the next task call sent to the runtime. Any requirements will be reset after the call has been submitted.

Parameters:
  • always (bool) – if True requirements will persist across task calls. defaults to False

  • label (str) – the label required by the worker to have a runtime task dispatched to it. ‘local’ is equivalent to calling self.mode(local=True).

  • task (dict) – if specified applied to the task kwargs

  • logging (str|tuple) – if specified, same as runtime.mode(logging=…)

  • override (bool) – if True overrides previously set .require(), defaults to True

  • kwargs – requirements specification that the runtime understands

Usage:

om.runtime.require(label=’gpu’).model(‘foo’).fit(…)

Returns:

self

script(scriptname, require=None)

return a script for remote execution

Parameters:
  • scriptname (str) – the name of object in om.scripts

  • require (dict) – routing requirements for this job

Returns:

OmegaScriptProxy

sequence()

context manager to support sequenced, parallel and mapreduce tasks

Usage:

# run tasks async, in sequence
with om.runtime.sequence() as crt:
    crt.model('mymodel').fit(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel
with om.runtime.parallel() as crt:
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel with a final step
with om.runtime.mapreduce() as crt:
    # map tasks
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    # reduce results - combined is a virtualobj function
    crt.model('combined').reduce(...)
    result = crt.run()

# combined is a virtual obj function, e.g.
@virtualobj
def combined(data=None, **kwargs):
    # data is the list of results from each map step
    return data

Note that the statements inside the context are
executed in sequence, as any normal python code. However,
the actual tasks are only executed on calling crt.run()
Parameters:

self – the runtime

Returns:

OmegaRuntime, for use within context

settings(require=None)

return the runtimes’s cluster settings

stats()

worker statistics

Returns:

dict of workers => dict of stats

See also

celery Inspect.stats()

task(name, **kwargs)

retrieve the task function from the celery instance

Parameters:
  • name (str) – a registered celery task as module.tasks.task_name

  • kwargs (dict) – routing keywords to CeleryTask.apply_async

Returns:

CeleryTask

workers()

list of workers

Returns:

dict of workers => list of active tasks

See also

celery Inspect.active()

Backends

class omegaml.runtimes.loky.OmegaRuntimeBackend(*args, **kwargs)

omega custom parallel backend to print progress

TODO: extend for celery dispatching

start_call()

Call-back method called at the beginning of a Parallel call

stop_call()

Call-back method called at the end of a Parallel call

terminate()

Shutdown the workers and free the shared memory.

Mixins

class omegaml.runtimes.mixins.ModelMixin

mixin methods to OmegaModelProxy

decision_function(Xname, rName=None, **kwargs)

calculate score

Calls .decision_function(X, y, **kwargs). If rName is given the result is stored as object rName

Parameters:
  • Xname – name of the X dataset

  • rName – name of the resulting dataset (optional)

Returns:

the data returned by .score, or the metadata of the rName dataset if rName was given

fit(Xname, Yname=None, **kwargs)

fit the model

Calls .fit(X, Y, **kwargs). If instead of dataset names actual data is given, the data is stored using _fitX/fitY prefixes and a unique name.

After fitting, a new model version is stored with its attributes fitX and fitY pointing to the datasets, as well as the sklearn version used.

Parameters:
  • Xname – name of X dataset or data

  • Yname – name of Y dataset or data

Returns:

the model (self) or the string representation (python clients)

fit_transform(Xname, Yname=None, rName=None, **kwargs)

fit & transform X

Calls .fit_transform(X, Y, **kwargs). If rName is given the result is stored as object rName

Parameters:
  • Xname – name of the X dataset

  • Yname – name of the Y dataset

  • rName – name of the resulting dataset (optional)

Returns:

the data returned by .fit_transform, or the metadata of the rName dataset if rName was given

partial_fit(Xname, Yname=None, **kwargs)

update the model

Calls .partial_fit(X, Y, **kwargs). If instead of dataset names actual data is given, the data is stored using _fitX/fitY prefixes and a unique name.

After fitting, a new model version is stored with its attributes fitX and fitY pointing to the datasets, as well as the sklearn version used.

Parameters:
  • Xname – name of X dataset or data

  • Yname – name of Y dataset or data

Returns:

the model (self) or the string representation (python clients)

predict(Xpath_or_data, rName=None, **kwargs)

Calls .predict(X). If rName is given the result is stored as object rName

Parameters:
  • Xname – name of the X dataset

  • rName – name of the resulting dataset (optional)

Returns:

the data returned by .predict, or the metadata of the rName dataset if rName was given

predict_proba(Xpath_or_data, rName=None, **kwargs)

predict probabilities

Calls .predict_proba(X). If rName is given the result is stored as object rName

Parameters:
  • Xname – name of the X dataset

  • rName – name of the resulting dataset (optional)

Returns:

the data returned by .predict_proba, or the metadata of the rName dataset if rName was given

score(Xname, Yname=None, rName=None, **kwargs)

calculate score

Calls .score(X, y, **kwargs). If rName is given the result is stored as object rName

Parameters:
  • Xname – name of the X dataset

  • yName – name of the y dataset

  • rName – name of the resulting dataset (optional)

Returns:

the data returned by .score, or the metadata of the rName dataset if rName was given

transform(Xname, rName=None, **kwargs)

transform X

Calls .transform(X, **kwargs). If rName is given the result is stored as object rName

Parameters:
  • Xname – name of the X dataset

  • rName – name of the resulting dataset (optional)

Returns:

the data returned by .transform, or the metadata of the rName dataset if rName was given

class omegaml.runtimes.mixins.GridSearchMixin
gridsearch(Xname, Yname=None, parameters=None, pure_python=False, **kwargs)

run gridsearch on model

Parameters:
  • Xname (str|obj) – the name of the X dataset in om.datasets, or the data object

  • Yname (str|obj) – the name of the Y dataset in om.datasets, or the data object

  • parameters (dict) – input to GridSearchCV(…, param_grid=parameters)

See also

  • sklearn.model_selection.GridSearchCV

class omegaml.runtimes.mixins.taskcanvas.CanvasTask(canvasfn)

support for canvas tasks

See also

  • om.runtime.sequence

  • om.runtime.parallel

  • om.runtime.mapreduce