Public API¶
Python API (overview)¶
- omegaml.datasets - the OmegaStore for data
- omegaml.models - the OmegaStore for models
- omegaml.runtimes - the cluster runtime API
- omegaml.notebook.jobs - the lambda compute service
- omegaml.store.base.OmegaStore - The storage backend for models and data
- omegaml.runtimes.OmegaRuntime - omegaml compute cluster gateway
- omegaml.runtimes.OmegaRuntimeDask - omegaml compute cluster gateway to a dask distributed cluster
- omegaml.runtimes.OmegaModelProxy - proxy to a remote model in a celery worker
- omegaml.runtimes.OmegaJobProxy - proxy to a remote job in a celery worker
- omegaml.notebook.jobs.OmegaJobs - Omega Jobs API
- omegaml.mdataframe.MDataFrame - A DataFrame for mongodb
- omegaml.mdataframe.MGrouper - a Grouper for MDataFrames
- omegaml.mdataframe.MLocIndexer - implements the LocIndexer for MDataFrames
- omegaml.mdataframe.MPosIndexer - implements the position-based indexer for MDataFrames
- omegaml.backends.sqlalchemy.SQLAlchemyBackend - sqlalchemy plugin for omegaml
Python API¶
omega|ml¶
- omegaml.datasets - storage for data: the omegaml.store.base.OmegaStore store for datasets¶
- omegaml.models - storage for models: the omegaml.store.base.OmegaStore store for models¶
- omegaml.runtimes - the cluster runtime API¶
- omegaml.notebook.jobs - the lambda compute service¶
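A minimal sketch of these entry points (the dataset name is hypothetical; a configured omega|ml instance is assumed):

import pandas as pd
import omegaml as om

df = pd.DataFrame({'x': range(10)})
om.datasets.put(df, 'mydata')   # store data
om.datasets.get('mydata')       # retrieve as a pandas DataFrame
om.models.list()                # list stored models
om.jobs.list()                  # list stored notebooks (jobs)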
omegaml.store¶
- class omegaml.store.base.OmegaStore(mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None)
The storage backend for models and data
- drop(name, force=False, version=-1, **kwargs)
Drop the object
- Parameters:
name – The name of the object
force – If True ignores the DoesNotExist exception; defaults to False, meaning a DoesNotExist exception is raised if the name does not exist
- Returns:
True if object was deleted, False if not. If force is True and the object does not exist it will still return True
- Raises:
DoesNotExist if the object does not exist and force=False
- get(name, version=-1, force_python=False, kind=None, **kwargs)
Retrieve an object
- Parameters:
name – The name of the object
version – Version of the stored object (not supported)
force_python – Return as a python object
kwargs – kwargs depending on object kind
- Returns:
an object, estimator, pipelines, data array or pandas dataframe previously stored with put()
- getl(*args, **kwargs)
return a lazy MDataFrame for a given object
Same as .get, but returns an MDataFrame
- list(pattern=None, regexp=None, kind=None, raw=False, hidden=None, include_temp=False, bucket=None, prefix=None, filter=None)
List all files in store
specify pattern as a unix pattern (e.g. models/*), or specify regexp
- Parameters:
pattern – the unix file pattern or None for all
regexp – the regexp. takes precedence over pattern
raw – if True return the meta data objects
filter – specify additional filter criteria, optional
- Returns:
List of files in store
- put(obj, name, attributes=None, kind=None, replace=False, **kwargs)
Store an object: estimators, pipelines, numpy arrays or pandas dataframes
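A usage sketch of the store API (the dataset name is hypothetical):

import pandas as pd
import omegaml as om

df = pd.DataFrame({'x': range(100)})
om.datasets.put(df, 'sample')           # store
df2 = om.datasets.get('sample')         # retrieve as pandas DataFrame
mdf = om.datasets.getl('sample')        # retrieve as lazy MDataFrame
om.datasets.list('sam*')                # list by unix pattern
om.datasets.drop('sample', force=True)  # drop, ignoring DoesNotExist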
omegaml.runtimes¶
- class omegaml.runtimes.OmegaRuntime(omega, bucket=None, defaults=None, celeryconf=None)¶
omegaml compute cluster gateway
- model(modelname, require=None)¶
return a model for remote execution
- Parameters:
modelname (str) – the name of the object in om.models
require (dict) – routing requirements for this job
- Returns:
OmegaModelProxy
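A sketch of routing a model to specific workers (the model name and the 'gpu' label value are hypothetical assumptions):

model = om.runtime.model('mymodel', require=dict(label='gpu'))
result = model.fit('datax', 'datay')   # AsyncResult
result.get()                           # wait for and return the result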
- class omegaml.runtimes.OmegaRuntimeDask(omega, dask_url=None)¶
omegaml compute cluster gateway to a dask distributed cluster
set environ DASK_DEBUG=1 to run dask tasks locally
- model(modelname)¶
return a model for remote execution
- class omegaml.runtimes.OmegaModelProxy(modelname, runtime=None)¶
proxy to a remote model in a celery worker
The proxy provides the same methods as the model but will execute the methods using celery tasks and return celery AsyncResult objects
Usage:
om = Omega()
# train a model
# result is AsyncResult, use .get() to return its result
result = om.runtime.model('foo').fit('datax', 'datay')
result.get()
# predict
# result is AsyncResult, use .get() to return its result
result = om.runtime.model('foo').predict('datax')
print(result.get())
Notes
The actual methods of ModelProxy are defined in its mixins
See also
ModelMixin
GridSearchMixin
- experiment(experiment=None, label=None, provider=None, **tracker_kwargs)¶
return the experiment for this model
If an experiment does not exist yet, it will be created. The experiment is automatically set to track this model, unless another experiment has already been set to track this model for the same label. If a previous experiment has been set to track this model, it is returned. If an experiment name is passed it will be used. A usage sketch follows this class.
- Parameters:
experiment (str) – the experiment name, defaults to the modelname
label (str) – the runtime label, defaults to ‘default’
provider (str) – the provider to use, defaults to ‘default’
tracker_kwargs (dict) – additional kwargs to pass to the tracker
- Returns:
OmegaTrackingProxy() instance
- experiments(label=None, raw=False)¶
return list of experiments tracking this model
- Parameters:
label (None|str) – the label for which to return the experiments, or None for all
raw (bool) – if True return the metadata for the experiment, else return the OmegaTrackingProxy
- Returns:
mapping of label => instance of OmegaTrackingProxy if not raw, else Metadata; includes a dummy label ‘_all_’, listing all experiments that track this model
- Return type:
instances (dict)
Changed in version 0.17: returns a dict instead of a list
- task(name)¶
return the task from the runtime with requirements applied
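A sketch of model experiment tracking (the experiment name and metric are hypothetical; it is assumed the returned OmegaTrackingProxy can be used as a context manager and provides log_metric()):

exp = om.runtime.model('mymodel').experiment('myexp')
with exp as e:
    e.log_metric('accuracy', 0.93)  # assumed tracker method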
- class omegaml.runtimes.OmegaJobProxy(jobname, runtime=None)¶
proxy to a remote job in a celery worker
Usage:
om = Omega()
# result is AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').run()
result.get()
# result is AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').schedule()
result.get()
- run(timeout=None, **kwargs)¶
submit the job for immediate execution
- Parameters:
timeout (int) – optional, timeout in seconds
**kwargs – kwargs to CeleryTask.delay
- Returns:
AsyncResult
- schedule(**kwargs)¶
schedule the job for repeated execution
- Parameters:
**kwargs – see OmegaJob.schedule()
omegaml.jobs¶
- class omegaml.notebook.jobs.OmegaJobs(bucket=None, prefix=None, store=None, defaults=None)¶
Omega Jobs API
- run(name, event=None, timeout=None)¶
Run a job immediately
The job is run and the results are stored in om.jobs('results/name <timestamp>') and the result’s Metadata is returned.
Metadata.attributes of the original job as given by name is updated:
- attributes['job_runs'] (list) - list of status of each run. Status is a dict as below
- attributes['job_results'] (list) - list of results job names in same index-order as job_runs
- attributes['trigger'] (list) - list of triggers
The status of each job run is a dict with keys:
- status (str): the status of the job run, OK or ERROR
- ts (datetime): time of execution
- message (str): error message in case of ERROR, else blank
- results (str): name of results in case of OK, else blank
Usage:
# directly (sync)
meta = om.jobs.run('mynb')
# via runtime (async)
job = om.runtime.job('mynb')
result = job.run()
- Parameters:
name (str) – the name of the jobfile
event (str) – an event name
timeout (int) – timeout in seconds, None means no timeout
- Returns:
Metadata of the results entry
See also
OmegaJobs.run_notebook
- run_notebook(name, event=None, timeout=None)¶
run a given notebook immediately.
- Parameters:
name (str) – the name of the jobfile
event (str) – an event name
timeout (int) – timeout in seconds
- Returns:
Metadata of results
- schedule(nb_file, run_at=None, last_run=None)¶
Schedule the processing of a notebook at the interval specified in the job script
Notes
This updates the notebook’s Metadata entry by adding the next scheduled run in attributes['triggers']
- Parameters:
nb_file (str) – the name of the notebook
run_at (str|dict|JobSchedule) – the schedule specified in a format suitable for JobSchedule. If not specified, this value is extracted from the first cell of the notebook
last_run (datetime) – the last time this job was run, use this to reschedule the job for the next run. Defaults to the last timestamp listed in attributes['job_runs'], or datetime.utcnow() if no previous run exists.
See also
croniter.get_next()
JobSchedule
OmegaJobs.get_notebook_config
cron expression - https://en.wikipedia.org/wiki/Cron#CRON_expression
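A scheduling sketch (the notebook name and cron spec are hypothetical; run_at accepts any format suitable for JobSchedule):

# run 'mynb' every 5 minutes (cron format)
om.jobs.schedule('mynb', run_at='*/5 * * * *')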
omegaml.mdataframe¶
- class omegaml.mdataframe.MDataFrame(collection, columns=None, query=None, limit=None, skip=None, sort_order=None, force_columns=None, immediate_loc=False, auto_inspect=False, normalize=False, raw=False, parser=None, preparefn=None, from_loc_range=False, metadata=None, **kwargs)¶
A DataFrame for mongodb
Performs out-of-core, lazy computation on a mongodb cluster. Behaves like a pandas DataFrame. Actual results are returned as pandas DataFrames. A combined usage sketch follows this class.
- __len__()¶
the projected number of rows when resolving
- create_index(keys, **kwargs)¶
create an index the easy way
- groupby(columns, sort=True)¶
Group by a given set of columns
- Parameters:
columns – the list of columns
sort – if True sort by group key
- Returns:
MGrouper
- head(limit=10)¶
return up to limit number of rows
- Parameters:
limit – the number of rows to return. Defaults to 10
- Returns:
the MDataFrame
- inspect(explain=False, cached=False, cursor=None, raw=False)¶
inspect this dataframe’s actual mongodb query
- Parameters:
explain – if True explains access path
- property loc¶
Access by index
Use as mdf.loc[index_value]
- Returns:
MLocIndexer
- merge(right, on=None, left_on=None, right_on=None, how='inner', target=None, suffixes=('_x', '_y'), sort=False, inspect=False, filter=None)¶
merge this dataframe with another dataframe. The output is saved as a new collection named target (defaults to a generated name if not specified).
- Parameters:
right – the other MDataFrame
on – the list of key columns to merge by
left_on – the list of the key columns to merge on this dataframe
right_on – the list of the key columns to merge on the other dataframe
how – the method to merge. supported are left, inner, right. Defaults to inner
target – the name of the collection to store the merge results in. If not provided a temporary name will be created.
suffixes – the suffixes to apply to identical left and right columns
sort – if True the merge results will be sorted. If False the MongoDB natural order is implied.
- Returns:
an MDataFrame on the merged (target) collection
- query(*args, **kwargs)¶
return a new MDataFrame with a filter criteria
Any subsequent operation on the new dataframe will have the filter applied. To reset the filter call .reset() without arguments.
Note: Unlike pandas DataFrames, a filtered MDataFrame operates on the same collection as the original DataFrame
- Parameters:
args – a Q object or logical combination of Q objects (optional)
kwargs – all AND filter criteria
- Returns:
a new MDataFrame with the filter applied
- query_inplace(*args, **kwargs)¶
filters this MDataFrame and returns it.
Any subsequent operation on the dataframe will have the filter applied. To reset the filter call .reset() without arguments.
- Parameters:
args – a Q object or logical combination of Q objects (optional)
kwargs – all AND filter criteria
- Returns:
self
- skip(topn)¶
skip the topn number of rows
- Parameters:
topn – the number of rows to skip.
- Returns:
the MDataFrame
- sort(columns)¶
sort by specified columns
- Parameters:
columns – str of single column or a list of columns. Sort order is specified as the + (ascending) or - (descending) prefix to the column name. Default sort order is ascending.
- Returns:
the MDataFrame
- property value¶
resolve the query and return a Pandas DataFrame
- Returns:
the result of the query as a pandas DataFrame
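A combined usage sketch (dataset, column names and values are hypothetical):

mdf = om.datasets.getl('sales')     # lazy MDataFrame
flt = mdf.query(region='EMEA')      # filter; kwargs are AND criteria
top = flt.sort('-amount').head(5)   # sort descending, up to 5 rows
df = top.value                      # resolve to a pandas DataFrame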
- class omegaml.mdataframe.MSeries(*args, **kwargs)¶
Series implementation for MDataFrames
behaves like a DataFrame but limited to one column.
- __len__()¶
the projected number of rows when resolving
- count()¶
projected number of rows when resolving
- create_index(keys, **kwargs)¶
create an index the easy way
- groupby(columns, sort=True)¶
Group by a given set of columns
- Parameters:
columns – the list of columns
sort – if True sort by group key
- Returns:
MGrouper
- head(limit=10)¶
return up to limit number of rows
- Parameters:
limit – the number of rows to return. Defaults to 10
- Returns:
the MDataFrame
- inspect(explain=False, cached=False, cursor=None, raw=False)¶
inspect this dataframe’s actual mongodb query
- Parameters:
explain – if True explains access path
- iterchunks(chunksize=100)¶
return an iterator
- Parameters:
chunksize (int) – number of rows in each chunk
- Returns:
an iterator of dataframes, each of max. length chunksize
- list_indexes()¶
list all indices in database
- property loc¶
Access by index
Use as mdf.loc[index_value]
- Returns:
MLocIndexer
- merge(right, on=None, left_on=None, right_on=None, how='inner', target=None, suffixes=('_x', '_y'), sort=False, inspect=False, filter=None)¶
merge this dataframe with another dataframe. The output is saved as a new collection named target (defaults to a generated name if not specified).
- Parameters:
right – the other MDataFrame
on – the list of key columns to merge by
left_on – the list of the key columns to merge on this dataframe
right_on – the list of the key columns to merge on the other dataframe
how – the method to merge. supported are left, inner, right. Defaults to inner
target – the name of the collection to store the merge results in. If not provided a temporary name will be created.
suffixes – the suffixes to apply to identical left and right columns
sort – if True the merge results will be sorted. If False the MongoDB natural order is implied.
- Returns:
an MDataFrame on the merged (target) collection
- query(*args, **kwargs)¶
return a new MDataFrame with a filter criteria
Any subsequent operation on the new dataframe will have the filter applied. To reset the filter call .reset() without arguments.
Note: Unlike pandas DataFrames, a filtered MDataFrame operates on the same collection as the original DataFrame
- Parameters:
args – a Q object or logical combination of Q objects (optional)
kwargs – all AND filter criteria
- Returns:
a new MDataFrame with the filter applied
- query_inplace(*args, **kwargs)¶
filters this MDataFrame and returns it.
Any subsequent operation on the dataframe will have the filter applied. To reset the filter call .reset() without arguments.
- Parameters:
args – a Q object or logical combination of Q objects (optional)
kwargs – all AND filter criteria
- Returns:
self
- property shape¶
return shape of dataframe
- skip(topn)¶
skip the topn number of rows
- Parameters:
topn – the number of rows to skip.
- Returns:
the MDataFrame
- sort(columns)¶
sort by specified columns
- Parameters:
columns – str of single column or a list of columns. Sort order is specified as the + (ascending) or - (descending) prefix to the column name. Default sort order is ascending.
- Returns:
the MDataFrame
- tail(limit=10)¶
return up to limit number of rows from the last inserted values
- Parameters:
limit – the number of rows to return. Defaults to 10
- Returns:
the MDataFrame
- unique()¶
return the unique set of values for the series
- Returns:
MSeries
- property value¶
return the value of the series
this is a pandas Series unless unique() was called; in that case only distinct values are returned as an array, matching the behavior of a Series
- Returns:
pandas.Series
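A brief sketch (assumes a stored dataset 'sales' with a region column, and that column access on an MDataFrame yields an MSeries):

s = om.datasets.getl('sales')['region']   # MSeries
s.unique().value                          # distinct values as an array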
- class omegaml.mdataframe.MGrouper(mdataframe, collection, columns, sort=True)¶
a Grouper for MDataFrames
- agg(specs)¶
shortcut for .aggregate
- aggregate(specs, **kwargs)¶
aggregate by given specs
See the following link for a list of supported operations. https://docs.mongodb.com/manual/reference/operator/aggregation/group/
- Parameters:
specs – a dictionary of { column : function | list[functions] } pairs.
- count()¶
return counts by group columns
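A grouping sketch (dataset and column names are hypothetical):

g = om.datasets.getl('sales').groupby(['region'])
stats = g.agg({'amount': ['sum', 'mean']})  # aggregate by specs
counts = g.count()                          # counts by group columns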
- class omegaml.mdataframe.MLocIndexer(mdataframe, positional=False)¶
implements the LocIndexer for MDataFrames
- __getitem__(specs)¶
access by index
use as mdf.loc[specs] where specs is any of
- a list or tuple of scalar index values, e.g. .loc[(1,2,3)]
- a slice of values, e.g. .loc[1:5]
- a list of slices, e.g. .loc[1:5, 2:3]
- Returns:
the sliced part of the MDataFrame
- class omegaml.mdataframe.MPosIndexer(mdataframe)¶
implements the position-based indexer for MDataFrames
- __getitem__(specs)¶
access by index
use as mdf.iloc[specs] where specs is any of
- a list or tuple of scalar positions, e.g. .iloc[(1,2,3)]
- a slice of positions, e.g. .iloc[1:5]
- a list of slices, e.g. .iloc[1:5, 2:3]
- Returns:
the sliced part of the MDataFrame
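An indexing sketch (index values are hypothetical; access to MPosIndexer via .iloc is an assumption based on the pandas convention):

mdf = om.datasets.getl('sales')
row = mdf.loc[42]        # by index value (MLocIndexer)
rows = mdf.loc[1:5]      # slice of index values
first = mdf.iloc[0:10]   # by position (MPosIndexer)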
- class omegaml.mixins.mdf.ApplyContext(caller, columns=None, index=None)¶
Enable apply functions
.apply(fn) will call fn(ctx) where ctx is an ApplyContext. The context supports methods to apply functions in a Pandas-style apply manner. ApplyContext is extensible by adding an extension class to defaults.OMEGA_MDF_APPLY_MIXINS.
Note that unlike a Pandas DataFrame, ApplyContext does not itself contain any data. Rather it is part of an expression tree, i.e. the aggregation pipeline. Thus any expressions applied are translated into operations on the expression tree. The expression tree is evaluated on MDataFrame.value, at which point neither the ApplyContext nor the function that created it is active.
Examples:
mdf.apply(lambda v: v * 5)              # multiply every column in dataframe
mdf.apply(lambda v: v['foo'].dt.week)   # get week of date for column foo
# run multiple pipelines and get results
mdf.apply(lambda v: dict(a=v['foo'].dt.week, b=v['bar'] * 5))

The callable passed to apply can be any function. It can either return None, the context passed in or a list of pipeline stages.

# apply any of the below functions
mdf.apply(customfn)

# same as lambda v: v.dt.week
def customfn(ctx):
    return ctx.dt.week

# simple pipeline
def customfn(ctx):
    ctx.project(x={'$multiply': ['$x', 5]})
    ctx.project(y={'$divide': ['$x', 2]})

# complex pipeline
def customfn(ctx):
    return [
        {'$match': ...},
        {'$project': ...},
    ]
- class omegaml.mixins.mdf.ApplyArithmetics¶
Math operators for ApplyContext
- __mul__ (*)
- __add__ (+)
- __sub__ (-)
- __div__ (/)
- __floordiv__ (//)
- __mod__ (%)
- __pow__ (pow)
- __ceil__ (ceil)
- __floor__ (floor)
- __trunc__ (trunc)
- __abs__ (abs)
- sqrt (math.sqrt)
- __add__(other)¶
add
- __mul__(other)¶
multiply
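A short sketch of arithmetic in an apply expression (the column name is hypothetical); the multiplication is translated into aggregation pipeline stages and only evaluated on .value:

mdf.apply(lambda v: v['x'] * 5).value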
- class omegaml.mixins.mdf.ApplyDateTime¶
Datetime operators for ApplyContext
- class omegaml.mixins.mdf.ApplyString¶
String operators
- class omegaml.mixins.mdf.ApplyAccumulators¶
Backends:
- class omegaml.backends.sqlalchemy.SQLAlchemyBackend(model_store=None, data_store=None, tracking=None, **kwargs)¶
sqlalchemy plugin for omegaml
Usage:
Define your sqlalchemy connection:
sqlalchemy_constr = f'sqlalchemy://{user}:{password}@{account}/'
Store the connection in any of three ways:
# -- just the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.get('mysqlalchemy', raw=True)
=> the sql connection object

# -- store the connection with a predefined sql
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', sql='select ....')
om.datasets.get('mysqlalchemy')
=> will return a pandas dataframe using the specified sql to run.
   specify chunksize= to return an iterable of dataframes

# -- predefined sqls can contain variables to be resolved at access time
#    if you fail to specify required variables in sqlvars, a KeyError is raised
om.datasets.put(sqlalchemy_constr, 'myview', sql='select ... from T1 where col="{var}"')
om.datasets.get('myview', sqlvars=dict(var="value"))

# -- variables are replaced by binding parameters, which is safe for
#    untrusted inputs. To replace variables as strings, use double
#    {{variable}} notation. A warning will be issued because this is
#    considered an unsafe practice for untrusted input (e.g. the REST API).
#    It is your responsibility to sanitize the value of the cols variable.
om.datasets.put(sqlalchemy_constr, 'myview', sql='select {{cols}} from T1 where col="{var}"')
om.datasets.get('myview', sqlvars=dict(cols='foo, bar', var="value"))
Query data from a connection and store into an omega-ml dataset:
# -- copy the result of the sqlalchemy query to omegaml
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', sql='select ...', copy=True)
om.datasets.get('mysqlalchemy')
=> will return a pandas dataframe (without executing any additional queries)
=> can also use om.datasets.getl('mysqlalchemy') to return an MDataFrame
Controlling the table used in the connection:
# -- the default table is {bucket}_{name}, override using table='mytable'
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', table='mytable', sql='select ...', copy=True)
om.datasets.get('mysqlalchemy')  # read from {bucket}_mytable
# -- to use a specific table without bucket information, use table=':mytable'
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', table=':mytable', sql='select ...', copy=True)
om.datasets.get('mysqlalchemy')  # read from mytable
Inserting data via a previously stored connection:
# -- store data back through the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.put(df, 'mysqlalchemy', table='SOMETABLE')
Using variables in connection strings:
Connection strings may contain variables, e.g. userid and password. By default variables are resolved from the os environment. Alternatively they can be resolved from any dict:
# -- use connection string with variables
sqlalchemy_constr = 'sqlite:///{dbname}.db'
om.datasets.put(sqlalchemy_constr, 'userdb')
om.datasets.get('userdb', secrets=dict(dbname='chuckdb'))
# -- alternatively, create a vault dataset:
secrets = dict(userid='chuck', dbname='chuckdb')
om.datasets.put(secrets, '.omega/vault')
om.datasets.get('userdb')

The '.omega/vault' dataset will be queried using the current userid as the secret name, and the dbname retrieved from the document. This is experimental and the vault is not encrypted.
Advanced:
om.datasets.put() supports the following additional keyword arguments:
- chunksize=int - specify the number of rows to read from sqlalchemy in one chunk. defaults to 10000
- parse_dates=['col', ...] - list of column names to parse for date, time or datetime. see pd.read_sql for details
- transform=callable - a callable, is passed the DataFrame of each chunk before it is inserted into the database. use to provide custom transformations. only works on copy=True
- any other kwargs supported by pandas.read_sql
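A sketch of a per-chunk transform (the function name is hypothetical; it is assumed the callable may modify and return the chunk as documented above):

import pandas as pd

def add_loaded_at(df):
    # called with each chunk's DataFrame before insertion (copy=True only)
    df['loaded_at'] = pd.Timestamp.now()
    return df

om.datasets.put(sqlalchemy_constr, 'mysqldata',
                sql='select ...', copy=True, transform=add_loaded_at)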