om.runtime¶
- omegaml.runtime = OmegaRuntime(Omega())¶
the omegaml.runtimes.runtime.OmegaRuntime runtime
Methods to run models, scripts, jobs, experiments:
Methods to manage task distribution, logging:
Methods to inspect runtime state:
Backends:
omegaml.runtimes.modelproxy.OmegaModelProxy
omegaml.runtimes.jobproxy.OmegaJobProxy
omegaml.runtimes.scriptproxy.OmegaScriptProxy
omegaml.runtimes.trackingproxy.OmegaTrackingProxy
Mixins:
- class omegaml.runtimes.OmegaRuntime(omega, bucket=None, defaults=None, celeryconf=None)¶
omegaml compute cluster gateway
- callback(script_name, always=False, **kwargs)¶
Add a callback to a registered script
The callback will be triggered upon successful or failed execution of the runtime tasks. The script syntax is:
    # script.py
    def run(om, state=None, result=None, **kwargs):
        # state (str): 'SUCCESS'|'ERROR'
        # result (obj): the task's serialized result
        ...
- Parameters:
script_name (str) – the name of the script (in om.scripts)
always (bool) – if True always apply this callback, defaults to False
**kwargs – and other kwargs to pass on to the script
- Returns:
self
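As a sketch, a callback script's run() function could branch on the task state like this. The returned dict is illustrative only and not part of the contract; omegaml only requires the run(om, state=None, result=None, **kwargs) signature shown above.

```python
# Hypothetical callback script body (script.py); omegaml invokes run()
# with the task state ('SUCCESS' or 'ERROR') and its serialized result.
def run(om, state=None, result=None, **kwargs):
    if state == 'SUCCESS':
        # e.g. record or forward the successful result
        return {'status': 'ok', 'result': result}
    # e.g. alert on failure; result holds the error information
    return {'status': 'failed', 'error': result}
```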
- enable_hostqueues()¶
enable a worker-specific queue on every worker host
- Returns:
list of labels (one entry for each hostname)
- experiment(experiment, provider=None, implied_run=True, recreate=False, **tracker_kwargs)¶
set the tracking backend and experiment
- Parameters:
experiment (str) – the name of the experiment
provider (str) – the name of the provider
tracker_kwargs (dict) – additional kwargs for the tracker
recreate (bool) – if True, recreate the experiment (i.e. drop and recreate); this is useful to change the provider or other settings. All previous data will be kept.
- Returns:
OmegaTrackingProxy
- job(jobname, require=None)¶
return a job for remote execution
- Parameters:
jobname (str) – the name of the object in om.jobs
require (dict) – routing requirements for this job
- Returns:
OmegaJobProxy
- labels()¶
list available labels
- Returns:
dict of workers => list of labels
- mapreduce()¶
context manager to support sequenced, parallel and mapreduce tasks
Usage:
    # run tasks async, in sequence
    with om.runtime.sequence() as crt:
        crt.model('mymodel').fit(...)
        crt.model('mymodel').predict(...)
        result = crt.run()

    # run tasks async, in parallel
    with om.runtime.parallel() as crt:
        crt.model('mymodel').predict(...)
        crt.model('mymodel').predict(...)
        result = crt.run()

    # run tasks async, in parallel with a final step
    with om.runtime.mapreduce() as crt:
        # map tasks
        crt.model('mymodel').predict(...)
        crt.model('mymodel').predict(...)
        # reduce results - combined is a virtualobj function
        crt.model('combined').reduce(...)
        result = crt.run()

    # combined is a virtualobj function, e.g.
    @virtualobj
    def combined(data=None, **kwargs):
        # data is the list of results from each map step
        return data

Note that the statements inside the context are executed in sequence, like any normal Python code. However, the actual tasks are only executed upon calling crt.run().
- Parameters:
self – the runtime
- Returns:
OmegaRuntime, for use within context
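The reduce step receives the list of results produced by the map tasks. As a minimal sketch of that contract, a reducer could aggregate the map results like this (shown undecorated; in omegaml the function would carry the @virtualobj decorator and be stored in om.models):

```python
# Minimal reducer sketch: data is the list of results from each map step.
# Here we simply average them; any aggregation over the list works.
def combined(data=None, **kwargs):
    return sum(data) / len(data)
```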
- mode(local=None, logging=None)¶
specify runtime modes
- Parameters:
local (bool) – if True, all execution will run locally, else on the configured remote cluster
logging (bool|str|tuple) – if True, will set the root logger output at INFO level; a single string is the name of the logger, typically a module name; a tuple (logger, level) will select logger and the level. Valid levels are INFO, WARNING, ERROR, CRITICAL, DEBUG
Usage:
    # run all runtime tasks locally
    om.runtime.mode(local=True)
    # enable logging both in local and remote mode
    om.runtime.mode(logging=True)
    # select a specific module and level
    om.runtime.mode(logging=('sklearn', 'DEBUG'))
    # disable logging
    om.runtime.mode(logging=False)
- model(modelname, require=None)¶
return a model for remote execution
- Parameters:
modelname (str) – the name of the object in om.models
require (dict) – routing requirements for this task
- Returns:
OmegaModelProxy
- parallel()¶
context manager to support sequenced, parallel and mapreduce tasks
Usage:
    # run tasks async, in sequence
    with om.runtime.sequence() as crt:
        crt.model('mymodel').fit(...)
        crt.model('mymodel').predict(...)
        result = crt.run()

    # run tasks async, in parallel
    with om.runtime.parallel() as crt:
        crt.model('mymodel').predict(...)
        crt.model('mymodel').predict(...)
        result = crt.run()

    # run tasks async, in parallel with a final step
    with om.runtime.mapreduce() as crt:
        # map tasks
        crt.model('mymodel').predict(...)
        crt.model('mymodel').predict(...)
        # reduce results - combined is a virtualobj function
        crt.model('combined').reduce(...)
        result = crt.run()

    # combined is a virtualobj function, e.g.
    @virtualobj
    def combined(data=None, **kwargs):
        # data is the list of results from each map step
        return data

Note that the statements inside the context are executed in sequence, like any normal Python code. However, the actual tasks are only executed upon calling crt.run().
- Parameters:
self – the runtime
- Returns:
OmegaRuntime, for use within context
- ping(*args, require=None, wait=True, **kwargs)¶
ping the runtime
- Parameters:
args (tuple) – task args
require (dict) – routing requirements for this task
wait (bool) – if True, wait for the task to return, else return AsyncResult
kwargs (dict) – task kwargs
- Returns:
response (dict) for wait=True
AsyncResult for wait=False
- queues()¶
list queues
- Returns:
dict of workers => list of queues
See also
celery Inspect.active_queues()
- require(label=None, always=False, routing=None, task=None, logging=None, override=True, **kwargs)¶
specify requirements for the task execution
Use this to specify resource or routing requirements on the next task call sent to the runtime. Any requirements will be reset after the call has been submitted.
- Parameters:
always (bool) – if True, requirements persist across task calls; defaults to False
label (str) – the label required by the worker for a runtime task to be dispatched to it; 'local' is equivalent to calling self.mode(local=True)
task (dict) – if specified, applied to the task kwargs
logging (str|tuple) – if specified, same as runtime.mode(logging=...)
override (bool) – if True, overrides previously set .require(); defaults to True
kwargs – requirements specification that the runtime understands
- Usage:
    om.runtime.require(label='gpu').model('foo').fit(...)
- Returns:
self
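The documented semantics can be pictured as a small state machine: one-off requirements apply to the next task call only, always=True makes them persist, and override=True replaces rather than merges earlier requirements. The class below is a toy model of those rules for illustration, not omegaml's implementation:

```python
# Toy model of the documented require() semantics (illustrative only):
# one-off requirements apply to the next call and are then reset;
# always=True makes them sticky; override=True replaces earlier ones.
class Requirements:
    def __init__(self):
        self.current = {}   # one-off requirements, reset after each call
        self.sticky = {}    # persistent requirements (always=True)

    def require(self, always=False, override=True, **kwargs):
        target = self.sticky if always else self.current
        if override:
            target.clear()
        target.update(kwargs)
        return self

    def next_task_kwargs(self):
        # requirements applied to the next call; one-off ones take
        # precedence over sticky ones and are reset afterwards
        kwargs = {**self.sticky, **self.current}
        self.current = {}
        return kwargs
```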
- script(scriptname, require=None)¶
return a script for remote execution
- Parameters:
scriptname (str) – the name of the object in om.scripts
require (dict) – routing requirements for this task
- Returns:
OmegaScriptProxy
- sequence()¶
context manager to support sequenced, parallel and mapreduce tasks
Usage:
    # run tasks async, in sequence
    with om.runtime.sequence() as crt:
        crt.model('mymodel').fit(...)
        crt.model('mymodel').predict(...)
        result = crt.run()

    # run tasks async, in parallel
    with om.runtime.parallel() as crt:
        crt.model('mymodel').predict(...)
        crt.model('mymodel').predict(...)
        result = crt.run()

    # run tasks async, in parallel with a final step
    with om.runtime.mapreduce() as crt:
        # map tasks
        crt.model('mymodel').predict(...)
        crt.model('mymodel').predict(...)
        # reduce results - combined is a virtualobj function
        crt.model('combined').reduce(...)
        result = crt.run()

    # combined is a virtualobj function, e.g.
    @virtualobj
    def combined(data=None, **kwargs):
        # data is the list of results from each map step
        return data

Note that the statements inside the context are executed in sequence, like any normal Python code. However, the actual tasks are only executed upon calling crt.run().
- Parameters:
self – the runtime
- Returns:
OmegaRuntime, for use within context
- settings(require=None)¶
return the runtime's cluster settings
- stats()¶
worker statistics
- Returns:
dict of workers => dict of stats
See also
celery Inspect.stats()
- task(name, **kwargs)¶
retrieve the task function from the celery instance
- Parameters:
name (str) – a registered celery task, as module.tasks.task_name
kwargs (dict) – routing keywords to CeleryTask.apply_async
- Returns:
CeleryTask
- workers()¶
list of workers
- Returns:
dict of workers => list of active tasks
See also
celery Inspect.active()
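The inspection methods workers(), queues(), and stats() each return a dict keyed by worker hostname. As a sketch, a helper could filter the dict returned by workers() for idle workers (the dict shape is assumed from the celery Inspect API; the helper name is hypothetical):

```python
# Sketch: find workers with no active tasks from a dict of the shape
# {hostname: [active tasks, ...]} as returned by om.runtime.workers()
def idle_workers(active):
    return sorted(w for w, tasks in active.items() if not tasks)
```

For example, idle_workers({'worker1@host': [], 'worker2@host': [{'id': 'abc'}]}) yields ['worker1@host'].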
Backends¶
- class omegaml.runtimes.loky.OmegaRuntimeBackend(*args, **kwargs)¶
omega custom parallel backend to print progress
TODO: extend for celery dispatching
- start_call()¶
Call-back method called at the beginning of a Parallel call
- stop_call()¶
Call-back method called at the end of a Parallel call
- terminate()¶
Shutdown the workers and free the shared memory.
Mixins¶
- class omegaml.runtimes.mixins.ModelMixin¶
mixin methods to OmegaModelProxy
- decision_function(Xname, rName=None, **kwargs)¶
calculate the decision function
Calls .decision_function(X, y, **kwargs). If rName is given, the result is stored as object rName.
- Parameters:
Xname – name of the X dataset
rName – name of the resulting dataset (optional)
- Returns:
the data returned by .decision_function, or the metadata of the rName dataset if rName was given
- fit(Xname, Yname=None, **kwargs)¶
fit the model
Calls .fit(X, Y, **kwargs). If actual data is given instead of dataset names, the data is stored using _fitX/_fitY prefixes and a unique name. After fitting, a new model version is stored with its attributes fitX and fitY pointing to the datasets, as well as the sklearn version used.
- Parameters:
Xname – name of X dataset or data
Yname – name of Y dataset or data
- Returns:
the model (self) or the string representation (python clients)
- fit_transform(Xname, Yname=None, rName=None, **kwargs)¶
fit & transform X
Calls .fit_transform(X, Y, **kwargs). If rName is given, the result is stored as object rName.
- Parameters:
Xname – name of the X dataset
Yname – name of the Y dataset
rName – name of the resulting dataset (optional)
- Returns:
the data returned by .fit_transform, or the metadata of the rName dataset if rName was given
- partial_fit(Xname, Yname=None, **kwargs)¶
update the model
Calls .partial_fit(X, Y, **kwargs). If actual data is given instead of dataset names, the data is stored using _fitX/_fitY prefixes and a unique name. After fitting, a new model version is stored with its attributes fitX and fitY pointing to the datasets, as well as the sklearn version used.
- Parameters:
Xname – name of X dataset or data
Yname – name of Y dataset or data
- Returns:
the model (self) or the string representation (python clients)
- predict(Xpath_or_data, rName=None, **kwargs)¶
Calls .predict(X). If rName is given, the result is stored as object rName.
- Parameters:
Xname – name of the X dataset
rName – name of the resulting dataset (optional)
- Returns:
the data returned by .predict, or the metadata of the rName dataset if rName was given
- predict_proba(Xpath_or_data, rName=None, **kwargs)¶
predict probabilities
Calls .predict_proba(X). If rName is given, the result is stored as object rName.
- Parameters:
Xname – name of the X dataset
rName – name of the resulting dataset (optional)
- Returns:
the data returned by .predict_proba, or the metadata of the rName dataset if rName was given
- score(Xname, Yname=None, rName=None, **kwargs)¶
calculate score
Calls .score(X, y, **kwargs). If rName is given, the result is stored as object rName.
- Parameters:
Xname – name of the X dataset
Yname – name of the Y dataset
rName – name of the resulting dataset (optional)
- Returns:
the data returned by .score, or the metadata of the rName dataset if rName was given
- transform(Xname, rName=None, **kwargs)¶
transform X
Calls .transform(X, **kwargs). If rName is given, the result is stored as object rName.
- Parameters:
Xname – name of the X dataset
rName – name of the resulting dataset (optional)
- Returns:
the data returned by .transform, or the metadata of the rName dataset if rName was given
- class omegaml.runtimes.mixins.GridSearchMixin¶
- gridsearch(Xname, Yname=None, parameters=None, pure_python=False, **kwargs)¶
run gridsearch on model
- Parameters:
Xname (str|obj) – the name of the X dataset in om.datasets, or the data object
Yname (str|obj) – the name of the Y dataset in om.datasets, or the data object
parameters (dict) – input to GridSearchCV(…, param_grid=parameters)
See also
sklearn.model_selection.GridSearchCV
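Since parameters maps directly to GridSearchCV's param_grid, the number of candidate fits is the product of the lengths of the value lists. The sketch below enumerates those candidates; the hyperparameter names ('C', 'kernel') are illustrative sklearn SVC parameters, not fixed by gridsearch():

```python
from itertools import product

# param_grid as it would be passed to gridsearch(..., parameters=param_grid)
param_grid = {'C': [0.1, 1, 10], 'kernel': ['linear', 'rbf']}

# each candidate is one combination of values; GridSearchCV fits all of them
candidates = [dict(zip(param_grid, values))
              for values in product(*param_grid.values())]
```

Here 3 values of C times 2 kernels yields 6 candidate fits (before cross-validation folds multiply that further).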
- class omegaml.runtimes.mixins.taskcanvas.CanvasTask(canvasfn)¶
support for canvas tasks
See also
om.runtime.sequence
om.runtime.parallel
om.runtime.mapreduce