om.runtime¶
- omegaml.runtime = OmegaRuntime(Omega(bucket=None))¶
- om.runtime is the omegaml.runtimes.runtime.OmegaRuntime instance
- Methods to run models, scripts, jobs, experiments
- Methods to manage task distribution, logging
- Methods to inspect runtime state
- Backends:
  - omegaml.runtimes.modelproxy.OmegaModelProxy
  - omegaml.runtimes.jobproxy.OmegaJobProxy
  - omegaml.runtimes.scriptproxy.OmegaScriptProxy
  - omegaml.runtimes.trackingproxy.OmegaTrackingProxy
- Mixins:
- class omegaml.runtimes.OmegaRuntime(omega, bucket=None, defaults=None, celeryconf=None)[source]¶
- omegaml compute cluster gateway
- callback(script_name, always=False, **kwargs)[source]¶
- Add a callback to a registered script. The callback will be triggered upon successful or failed execution of the runtime tasks. The script syntax is:

```python
# script.py
def run(om, state=None, result=None, **kwargs):
    # state (str): 'SUCCESS'|'ERROR'
    # result (obj): the task's serialized result
    ...
```

- Parameters:
- script_name (str) – the name of the script (in om.scripts) 
- always (bool) – if True always apply this callback, defaults to False 
- **kwargs – and other kwargs to pass on to the script 
 
- Returns:
- self 
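As a hedged sketch of the callback mechanism described above: the `run` function follows the script syntax from the docstring, while the registration step assumes a deployed omega-ml runtime; the names `callbacks/notify` and `mymodel` are hypothetical.

```python
# the callback body, following the script syntax shown above
def run(om, state=None, result=None, **kwargs):
    # state (str): 'SUCCESS'|'ERROR'; result: the task's serialized result
    return f'task finished with state={state}'

def register_callback():
    # requires a live omega-ml deployment; all names are hypothetical
    import omegaml as om
    # apply the callback to all subsequent runtime tasks
    om.runtime.callback('callbacks/notify', always=True)
    om.runtime.model('mymodel').fit('sample_X', 'sample_Y')
```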
 
 - enable_hostqueues()[source]¶
- enable a worker-specific queue on every worker host
- Returns:
- list of labels (one entry for each hostname) 
 
 - experiment(experiment, provider=None, implied_run=True, recreate=False, **tracker_kwargs)[source]¶
- set the tracking backend and experiment
- Parameters:
- experiment (str) – the name of the experiment 
- provider (str) – the name of the provider 
- tracker_kwargs (dict) – additional kwargs for the tracker 
- recreate (bool) – if True, drop and recreate the experiment; this is useful to change the provider or other settings. All previous data will be kept 
 
- Returns:
- OmegaTrackingProxy 
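A hedged usage sketch: the OmegaTrackingProxy returned by `.experiment()` can be used as a context manager to track a model run. This assumes a deployed omega-ml runtime; the names `myexp` and `mymodel` are hypothetical.

```python
# sketch only: requires a live omega-ml deployment; names are hypothetical
def track_run():
    import omegaml as om
    with om.runtime.experiment('myexp') as exp:
        # tasks submitted inside the context are tracked in the experiment
        om.runtime.model('mymodel').fit('sample_X', 'sample_Y')
        exp.log_metric('accuracy', 0.97)
    # retrieve the tracked metrics and events
    return exp.data()
```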
 
 - job(jobname, require=None)[source]¶
- return a job for remote execution
- Parameters:
- jobname (str) – the name of the object in om.jobs 
- require (dict) – routing requirements for this job 
 
- Returns:
- OmegaJobProxy 
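A hedged sketch of running a stored notebook through the OmegaJobProxy, assuming a deployed omega-ml runtime; the notebook name `mynb` is hypothetical.

```python
# sketch only: requires a live omega-ml deployment; 'mynb' is hypothetical
def run_notebook():
    import omegaml as om
    result = om.runtime.job('mynb').run()  # submit the notebook for execution
    return result.get()                    # block until the job finishes
```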
 
 - mapreduce()¶
- context manager to support sequenced, parallel and mapreduce tasks
- Usage:

```python
# run tasks async, in sequence
with om.runtime.sequence() as crt:
    crt.model('mymodel').fit(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel
with om.runtime.parallel() as crt:
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel with a final step
with om.runtime.mapreduce() as crt:
    # map tasks
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    # reduce results - combined is a virtualobj function
    crt.model('combined').reduce(...)
    result = crt.run()

# combined is a virtualobj function, e.g.
@virtualobj
def combined(data=None, **kwargs):
    # data is the list of results from each map step
    return data
```

Note that the statements inside the context are executed in sequence, as any normal Python code. However, the actual tasks are only executed on calling crt.run().
- Parameters:
- self – the runtime 
- Returns:
- OmegaRuntime, for use within context 
 
 - mode(local=None, logging=None)[source]¶
- specify runtime modes
- Parameters:
- local (bool) – if True, all execution will run locally, else on the configured remote cluster 
- logging (bool|str|tuple) – if True, will set the root logger output at INFO level; a single string is the name of the logger, typically a module name; a tuple (logger, level) will select logger and the level. Valid levels are INFO, WARNING, ERROR, CRITICAL, DEBUG 
 
- Usage:

```python
# run all runtime tasks locally
om.runtime.mode(local=True)
# enable logging both in local and remote mode
om.runtime.mode(logging=True)
# select a specific module and level
om.runtime.mode(logging=('sklearn', 'DEBUG'))
# disable logging
om.runtime.mode(logging=False)
```
 - model(modelname, require=None)[source]¶
- return a model for remote execution
- Parameters:
- modelname (str) – the name of the object in om.models 
- require (dict) – routing requirements for this model 
 
- Returns:
- OmegaModelProxy 
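A hedged sketch of the remote model workflow through the OmegaModelProxy, assuming a deployed omega-ml runtime; the model name `mymodel` and the dataset names are hypothetical and assumed to exist in om.models and om.datasets.

```python
# sketch only: requires a live omega-ml deployment; names are hypothetical
def fit_and_predict():
    import omegaml as om
    # tasks run on the cluster; .get() blocks for the result
    om.runtime.model('mymodel').fit('sample_X', 'sample_Y').get()
    return om.runtime.model('mymodel').predict('sample_X').get()
```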
 
 - parallel()¶
- context manager to support sequenced, parallel and mapreduce tasks
- Usage:

```python
# run tasks async, in sequence
with om.runtime.sequence() as crt:
    crt.model('mymodel').fit(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel
with om.runtime.parallel() as crt:
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel with a final step
with om.runtime.mapreduce() as crt:
    # map tasks
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    # reduce results - combined is a virtualobj function
    crt.model('combined').reduce(...)
    result = crt.run()

# combined is a virtualobj function, e.g.
@virtualobj
def combined(data=None, **kwargs):
    # data is the list of results from each map step
    return data
```

Note that the statements inside the context are executed in sequence, as any normal Python code. However, the actual tasks are only executed on calling crt.run().
- Parameters:
- self – the runtime 
- Returns:
- OmegaRuntime, for use within context 
 
 - ping(*args, require=None, wait=True, timeout=10, **kwargs)[source]¶
- ping the runtime
- Parameters:
- args (tuple) – task args 
- require (dict) – routing requirements for this job 
- wait (bool) – if True, wait for the task to return, else return AsyncResult 
- timeout (int) – if wait is True, the timeout in seconds, defaults to 10 
- kwargs (dict) – task kwargs, as accepted by CeleryTask.apply_async 
 
- Returns:
- response (dict) for wait=True 
- AsyncResult for wait=False 
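A hedged sketch showing both return modes of `.ping()`, assuming a deployed omega-ml runtime with at least one worker listening.

```python
# sketch only: requires a live omega-ml deployment
def check_workers():
    import omegaml as om
    resp = om.runtime.ping(timeout=5)      # dict, since wait=True by default
    pending = om.runtime.ping(wait=False)  # AsyncResult, resolve later
    return resp, pending.get(timeout=5)
```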
 
 
 - queues()[source]¶
- list queues
- Returns:
- dict of workers => list of queues 
 - See also - celery Inspect.active_queues() 
 - require(label=None, always=False, routing=None, task=None, logging=None, override=True, **kwargs)[source]¶
- specify requirements for the task execution
- Use this to specify resource or routing requirements on the next task call sent to the runtime. Any requirements will be reset after the call has been submitted.
- Parameters:
- always (bool) – if True, requirements will persist across task calls, defaults to False 
- label (str) – the label required by the worker to have a runtime task dispatched to it. ‘local’ is equivalent to calling self.mode(local=True). 
- task (dict) – if specified applied to the task kwargs 
- logging (str|tuple) – if specified, same as runtime.mode(logging=…) 
- override (bool) – if True overrides previously set .require(), defaults to True 
- kwargs – requirements specification that the runtime understands 
 
 - Usage:
- om.runtime.require(label='gpu').model('foo').fit(...) 
 - Returns:
- self 
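A hedged sketch of routing tasks by worker label: the label `gpu` is hypothetical and must match a worker's configured label in the deployment.

```python
# sketch only: requires a live omega-ml deployment; 'gpu' label and
# 'mymodel' are hypothetical
def fit_on_gpu():
    import omegaml as om
    # the requirement applies to the next call only, then resets
    om.runtime.require(label='gpu').model('mymodel').fit('sample_X', 'sample_Y')
    # with always=True, the requirement persists across calls
    om.runtime.require(label='gpu', always=True)
```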
 
 - script(scriptname, require=None)[source]¶
- return a script for remote execution
- Parameters:
- scriptname (str) – the name of the object in om.scripts 
- require (dict) – routing requirements for this script 
 
- Returns:
- OmegaScriptProxy 
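A hedged sketch of running a stored script through the OmegaScriptProxy, assuming a deployed omega-ml runtime; the script name `myscript` and its keyword argument are hypothetical.

```python
# sketch only: requires a live omega-ml deployment; names are hypothetical
def run_script():
    import omegaml as om
    # kwargs are passed on to the script's run() function
    result = om.runtime.script('myscript').run(param='value')
    return result.get()  # the script's serialized result
```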
 
 - sequence()¶
- context manager to support sequenced, parallel and mapreduce tasks
- Usage:

```python
# run tasks async, in sequence
with om.runtime.sequence() as crt:
    crt.model('mymodel').fit(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel
with om.runtime.parallel() as crt:
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    result = crt.run()

# run tasks async, in parallel with a final step
with om.runtime.mapreduce() as crt:
    # map tasks
    crt.model('mymodel').predict(...)
    crt.model('mymodel').predict(...)
    # reduce results - combined is a virtualobj function
    crt.model('combined').reduce(...)
    result = crt.run()

# combined is a virtualobj function, e.g.
@virtualobj
def combined(data=None, **kwargs):
    # data is the list of results from each map step
    return data
```

Note that the statements inside the context are executed in sequence, as any normal Python code. However, the actual tasks are only executed on calling crt.run().
- Parameters:
- self – the runtime 
- Returns:
- OmegaRuntime, for use within context 
 
 - stats()[source]¶
- worker statistics
- Returns:
- dict of workers => dict of stats 
 - See also - celery Inspect.stats() 
 - status()[source]¶
- current cluster status
- This collects key information from .labels(), .stats() and the latest worker heartbeat. Note that loadavg is only available if the worker has recently sent a heartbeat and may not be accurate across the cluster.
- Returns:
- a snapshot of the cluster status, e.g.

```python
{'<worker>': {
    'loadavg': [0.0, 0.0, 0.0],    # load average in % seen by the worker (1, 5, 15 min)
    'processes': 1,                # number of active worker processes
    'concurrency': 1,              # max concurrency
    'uptime': 0,                   # uptime in seconds
    'processed': Counter(task=n),  # number of tasks processed
    'queues': ['default'],         # list of queues (labels) the worker is listening on
}}
```
 
- Return type:
- snapshot (dict) 
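A hedged sketch combining the inspection methods above, assuming a deployed omega-ml runtime with at least one worker.

```python
# sketch only: requires a live omega-ml deployment
def inspect_cluster():
    import omegaml as om
    queues = om.runtime.queues()  # {worker: [queue, ...]}
    stats = om.runtime.stats()    # {worker: {stat: value, ...}}
    status = om.runtime.status()  # snapshot incl. loadavg, uptime, queues
    return queues, stats, status
```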
 
 
Backends¶
Mixins¶
- class omegaml.runtimes.mixins.ModelMixin[source]¶
- mixin methods to OmegaModelProxy
- complete(Xname, rName=None, **kwargs)[source]¶
- Calls .complete(X). If rName is given the result is stored as object rName.
- Parameters:
- Xname – name of the X dataset 
- rName – name of the resulting dataset (optional) 
 
- Returns:
- the data returned by .complete, or the metadata of the rName dataset if rName was given 
 
 - decision_function(Xname, rName=None, **kwargs)[source]¶
- calculate the decision function
- Calls .decision_function(X, y, **kwargs). If rName is given the result is stored as object rName.
- Parameters:
- Xname – name of the X dataset 
- rName – name of the resulting dataset (optional) 
 
- Returns:
- the data returned by .decision_function, or the metadata of the rName dataset if rName was given 
 
 - embed(Xname, rName=None, **kwargs)[source]¶
- Calls .embed(X). If rName is given the result is stored as object rName.
- Parameters:
- Xname – name of the X dataset 
- rName – name of the resulting dataset (optional) 
 
- Returns:
- the data returned by .embed, or the metadata of the rName dataset if rName was given 
 
 - fit(Xname, Yname=None, **kwargs)[source]¶
- fit the model
- Calls .fit(X, Y, **kwargs). If instead of dataset names actual data is given, the data is stored using _fitX/fitY prefixes and a unique name.
- After fitting, a new model version is stored with its attributes fitX and fitY pointing to the datasets, as well as the sklearn version used.
- Parameters:
- Xname – name of X dataset or data 
- Yname – name of Y dataset or data 
 
- Returns:
- the model (self) or the string representation (python clients) 
 
 - fit_transform(Xname, Yname=None, rName=None, **kwargs)[source]¶
- fit & transform X
- Calls .fit_transform(X, Y, **kwargs). If rName is given the result is stored as object rName.
- Parameters:
- Xname – name of the X dataset 
- Yname – name of the Y dataset 
- rName – name of the resulting dataset (optional) 
 
- Returns:
- the data returned by .fit_transform, or the metadata of the rName dataset if rName was given 
 
 - partial_fit(Xname, Yname=None, **kwargs)[source]¶
- update the model
- Calls .partial_fit(X, Y, **kwargs). If instead of dataset names actual data is given, the data is stored using _fitX/fitY prefixes and a unique name.
- After fitting, a new model version is stored with its attributes fitX and fitY pointing to the datasets, as well as the sklearn version used.
- Parameters:
- Xname – name of X dataset or data 
- Yname – name of Y dataset or data 
 
- Returns:
- the model (self) or the string representation (python clients) 
 
 - predict(Xpath_or_data, rName=None, **kwargs)[source]¶
- Calls .predict(X). If rName is given the result is stored as object rName.
- Parameters:
- Xname – name of the X dataset 
- rName – name of the resulting dataset (optional) 
 
- Returns:
- the data returned by .predict, or the metadata of the rName dataset if rName was given 
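A hedged sketch of the rName pattern described above: storing the prediction as a named dataset instead of returning the raw result. This assumes a deployed omega-ml runtime; `mymodel`, `sample_X`, and `sample_pred` are hypothetical names.

```python
# sketch only: requires a live omega-ml deployment; names are hypothetical
def predict_to_dataset():
    import omegaml as om
    # with rName given, .get() yields the metadata of 'sample_pred'
    meta = om.runtime.model('mymodel').predict('sample_X', rName='sample_pred').get()
    # the actual predictions are retrieved from om.datasets
    return om.datasets.get('sample_pred')
```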
 
 - predict_proba(Xpath_or_data, rName=None, **kwargs)[source]¶
- predict probabilities
- Calls .predict_proba(X). If rName is given the result is stored as object rName.
- Parameters:
- Xname – name of the X dataset 
- rName – name of the resulting dataset (optional) 
 
- Returns:
- the data returned by .predict_proba, or the metadata of the rName dataset if rName was given 
 
 - score(Xname, Yname=None, rName=None, **kwargs)[source]¶
- calculate score
- Calls .score(X, y, **kwargs). If rName is given the result is stored as object rName.
- Parameters:
- Xname – name of the X dataset 
- Yname – name of the Y dataset 
- rName – name of the resulting dataset (optional) 
 
- Returns:
- the data returned by .score, or the metadata of the rName dataset if rName was given 
 
 - transform(Xname, rName=None, **kwargs)[source]¶
- transform X
- Calls .transform(X, **kwargs). If rName is given the result is stored as object rName.
- Parameters:
- Xname – name of the X dataset 
- rName – name of the resulting dataset (optional) 
 
- Returns:
- the data returned by .transform, or the metadata of the rName dataset if rName was given 
 
 
- class omegaml.runtimes.mixins.GridSearchMixin[source]¶
- gridsearch(Xname, Yname=None, parameters=None, pure_python=False, **kwargs)[source]¶
- run gridsearch on model
- Parameters:
- Xname (str|obj) – the name of the X dataset in om.datasets, or the data object 
- Yname (str|obj) – the name of the Y dataset in om.datasets, or the data object 
- parameters (dict) – input to GridSearchCV(…, param_grid=parameters) 
 
 - See also - sklearn.model_selection.GridSearchCV