Public API

Python API (overview)

omegaml.datasets

the omegaml.store.base.OmegaStore store for datasets

omegaml.models

the omegaml.store.base.OmegaStore store for models

omegaml.runtimes

the cluster runtime API

omegaml.jobs

the omegaml.notebook.jobs.OmegaJobs store for jobs

omegaml.scripts

the omegaml.store.base.OmegaStore store for scripts

omegaml.store.base.OmegaStore([mongo_url, ...])

The storage backend for models and data

omegaml.runtimes.OmegaRuntime(omega[, ...])

omegaml compute cluster gateway

omegaml.runtimes.OmegaRuntimeDask(omega[, ...])

omegaml compute cluster gateway to a dask distributed cluster

omegaml.runtimes.OmegaModelProxy(modelname)

proxy to a remote model in a celery worker

omegaml.runtimes.OmegaJobProxy(jobname[, ...])

proxy to a remote job in a celery worker

omegaml.notebook.jobs.OmegaJobs([bucket, ...])

Omega Jobs API

omegaml.mdataframe.MDataFrame(collection[, ...])

A DataFrame for mongodb

omegaml.mdataframe.MGrouper(mdataframe, ...)

a Grouper for MDataFrames

omegaml.mdataframe.MLocIndexer(mdataframe[, ...])

implements the LocIndexer for MDataFrames

omegaml.mdataframe.MPosIndexer(mdataframe)

implements the position-based indexer for MDataFrames

omegaml.backends.sqlalchemy.SQLAlchemyBackend([...])

sqlalchemy plugin for omegaml

Python API

omega|ml

omegaml.datasets - storage for data

the omegaml.store.base.OmegaStore store for datasets

omegaml.models - storage for models

the omegaml.store.base.OmegaStore store for models

omegaml.runtimes - the cluster runtime API
omegaml.notebook.jobs - the lambda compute service

omegaml.store

class omegaml.store.base.OmegaStore(mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None)

The storage backend for models and data

drop(name, force=False, version=-1, **kwargs)

Drop the object

Parameters:
  • name – The name of the object

  • force – if True ignores the DoesNotExist exception; defaults to False, meaning a DoesNotExist exception is raised if the name does not exist

Returns:

True if object was deleted, False if not. If force is True and the object does not exist it will still return True

Raises:

DoesNotExist if the object does not exist and `force=False`

get(name, version=-1, force_python=False, kind=None, **kwargs)

Retrieve an object

Parameters:
  • name – The name of the object

  • version – Version of the stored object (not supported)

  • force_python – Return as a python object

  • kwargs – kwargs depending on object kind

Returns:

an object, estimator, pipeline, data array or pandas dataframe previously stored with put()

getl(*args, **kwargs)

return a lazy MDataFrame for a given object

Same as .get, but returns an MDataFrame

list(pattern=None, regexp=None, kind=None, raw=False, hidden=None, include_temp=False, bucket=None, prefix=None, filter=None)

List all files in store

specify pattern as a unix pattern (e.g. models/*), or specify regexp

Parameters:
  • pattern – the unix file pattern or None for all

  • regexp – the regexp. takes precedence over pattern

  • raw – if True return the meta data objects

  • filter – specify additional filter criteria, optional

Returns:

List of files in store

put(obj, name, attributes=None, kind=None, replace=False, **kwargs)

Store an object; supports estimators, pipelines, numpy arrays and pandas dataframes
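
A minimal sketch of a typical store roundtrip using only the methods documented above; the dataset name 'mydata' is illustrative:

import omegaml as om
import pandas as pd

# store a dataframe, then read it back eagerly and lazily
df = pd.DataFrame({'x': range(10)})
om.datasets.put(df, 'mydata', replace=True)
df_back = om.datasets.get('mydata')       # a pandas DataFrame
mdf = om.datasets.getl('mydata')          # a lazy MDataFrame
print(om.datasets.list('mydata'))         # ['mydata']
om.datasets.drop('mydata', force=True)    # True even if already dropped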

omegaml.runtimes

class omegaml.runtimes.OmegaRuntime(omega, bucket=None, defaults=None, celeryconf=None)

omegaml compute cluster gateway

model(modelname, require=None)

return a model for remote execution

Parameters:
  • modelname (str) – the name of the object in om.models

  • require (dict) – routing requirements for this job

Returns:

OmegaModelProxy

class omegaml.runtimes.OmegaRuntimeDask(omega, dask_url=None)

omegaml compute cluster gateway to a dask distributed cluster

set environ DASK_DEBUG=1 to run dask tasks locally

model(modelname)

return a model for remote execution

class omegaml.runtimes.OmegaModelProxy(modelname, runtime=None)

proxy to a remote model in a celery worker

The proxy provides the same methods as the model but will execute the methods using celery tasks and return celery AsyncResult objects

Usage:

om = Omega()
# train a model
# result is AsyncResult, use .get() to return its result
result = om.runtime.model('foo').fit('datax', 'datay')
result.get()

# predict
result = om.runtime.model('foo').predict('datax')
# result is AsyncResult, use .get() to return its result
print(result.get())

Notes

The actual methods of ModelProxy are defined in its mixins

See also

  • ModelMixin

  • GridSearchMixin

experiment(experiment=None, label=None, provider=None, **tracker_kwargs)

return the experiment for this model

If an experiment does not exist yet, it will be created. The experiment is automatically set to track this model, unless another experiment has already been set to track this model for the same label. If a previous experiment has been set to track this model it will be returned. If an experiment name is passed it will be used.

Parameters:
  • experiment (str) – the experiment name, defaults to the modelname

  • label (str) – the runtime label, defaults to ‘default’

  • provider (str) – the provider to use, defaults to ‘default’

  • tracker_kwargs (dict) – additional kwargs to pass to the tracker

Returns:

OmegaTrackingProxy() instance
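
A minimal sketch of attaching an experiment to a model; the model name is illustrative, and log_metric is assumed to be part of the tracking proxy's API:

import omegaml as om

# get or create the experiment tracking the stored model 'mymodel'
exp = om.runtime.model('mymodel').experiment()
# record a metric against this experiment (illustrative name and value)
exp.log_metric('accuracy', 0.98)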

experiments(label=None, raw=False)

return list of experiments tracking this model

Parameters:
  • label (None|str) – the label for which to return the experiments, or None for all

  • raw (bool) – if True return the metadata for the experiment, else return the OmegaTrackingProxy

Returns:

mapping of label => instance of OmegaTrackingProxy if not raw, else Metadata; includes a dummy label ‘_all_’, listing all experiments that track this model.

Return type:

instances (dict)

Changed in version 0.17: returns a dict instead of a list

task(name)

return the task from the runtime with requirements applied

class omegaml.runtimes.OmegaJobProxy(jobname, runtime=None)

proxy to a remote job in a celery worker

Usage:

om = Omega()
# result is AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').run()
result.get()

# result is AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').schedule()
result.get()

run(timeout=None, **kwargs)

submit the job for immediate execution

Parameters:
  • timeout (int) – optional, timeout in seconds

  • **kwargs – kwargs to CeleryTask.delay

Returns:

AsyncResult

schedule(**kwargs)

schedule the job for repeated execution

Parameters:

**kwargs – see OmegaJob.schedule()

omegaml.jobs

class omegaml.notebook.jobs.OmegaJobs(bucket=None, prefix=None, store=None, defaults=None)

Omega Jobs API

run(name, event=None, timeout=None)

Run a job immediately

The job is run and the results are stored in om.jobs(‘results/name <timestamp>’) and the result’s Metadata is returned.

Metadata.attributes of the original job as given by name is updated:

  • attributes['job_runs'] (list) - list of status of each run. Status is a dict as below

  • attributes['job_results'] (list) - list of results job names in the same index-order as job_runs

  • attributes['trigger'] (list) - list of triggers

The status of each job run is a dict with keys:

  • status (str): the status of the job run, OK or ERROR

  • ts (datetime): time of execution

  • message (str): error message in case of ERROR, else blank

  • results (str): name of results in case of OK, else blank

Usage:

# directly (sync)
meta = om.jobs.run('mynb')

# via runtime (async)
job = om.runtime.job('mynb')
result = job.run()

Parameters:
  • name (str) – the name of the jobfile

  • event (str) – an event name

  • timeout (int) – timeout in seconds, None means no timeout

Returns:

Metadata of the results entry

See also

  • OmegaJobs.run_notebook

run_notebook(name, event=None, timeout=None)

run a given notebook immediately.

Parameters:
  • name (str) – the name of the jobfile

  • event (str) – an event name

  • timeout (int) – timeout in seconds

Returns:

Metadata of results

schedule(nb_file, run_at=None, last_run=None)

Schedule processing of a notebook as per the interval specified in the job script

Notes

This updates the notebook’s Metadata entry by adding the next scheduled run in attributes['triggers']

Parameters:
  • nb_file (str) – the name of the notebook

  • run_at (str|dict|JobSchedule) – the schedule specified in a format suitable for JobSchedule. If not specified, this value is extracted from the first cell of the notebook

  • last_run (datetime) – the last time this job was run, use this to reschedule the job for the next run. Defaults to the last timestamp listed in attributes['job_runs'], or datetime.utcnow() if no previous run exists.
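
A minimal sketch of scheduling a stored notebook; the cron-style schedule string is an assumption about the formats JobSchedule accepts:

import omegaml as om

# schedule the notebook 'mynb' for repeated execution
# (cron syntax assumed; see run_at above for accepted formats)
om.jobs.schedule('mynb', run_at='*/5 * * * *')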

omegaml.mdataframe

class omegaml.mdataframe.MDataFrame(collection, columns=None, query=None, limit=None, skip=None, sort_order=None, force_columns=None, immediate_loc=False, auto_inspect=False, normalize=False, raw=False, parser=None, preparefn=None, from_loc_range=False, metadata=None, **kwargs)

A DataFrame for mongodb

Performs out-of-core, lazy computation on a mongodb cluster. Behaves like a pandas DataFrame. Actual results are returned as pandas DataFrames.

__len__()

the projected number of rows when resolving

create_index(keys, **kwargs)

create an index the easy way

groupby(columns, sort=True)

Group by a given set of columns

Parameters:
  • columns – the list of columns

  • sort – if True sort by group key

Returns:

MGrouper
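
A minimal sketch, assuming a stored dataset 'sales' with a region column (illustrative names):

import omegaml as om

mdf = om.datasets.getl('sales')     # lazy MDataFrame
grouped = mdf.groupby(['region'])   # returns an MGrouper
counts = grouped.count()            # row counts by group columns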

head(limit=10)

return up to limit number of rows

Parameters:

limit – the number of rows to return. Defaults to 10

Returns:

the MDataFrame

inspect(explain=False, cached=False, cursor=None, raw=False)

inspect this dataframe’s actual mongodb query

Parameters:

explain – if True explains access path

property loc

Access by index

Use as mdf.loc[index_value]

Returns:

MLocIndexer

merge(right, on=None, left_on=None, right_on=None, how='inner', target=None, suffixes=('_x', '_y'), sort=False, inspect=False, filter=None)

Merge this dataframe with another dataframe. Only left outer joins are currently supported. The output is saved as a new collection under the target name (defaults to a generated name if not specified).

Parameters:
  • right – the other MDataFrame

  • on – the list of key columns to merge by

  • left_on – the list of the key columns to merge on this dataframe

  • right_on – the list of the key columns to merge on the other dataframe

  • how – the method to merge. supported are left, inner, right. Defaults to inner

  • target – the name of the collection to store the merge results in. If not provided a temporary name will be created.

  • suffixes – the suffixes to apply to identical left and right columns

  • sort – if True the merge results will be sorted. If False the MongoDB natural order is implied.

Returns:

the merged MDataFrame on the target collection
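
A minimal sketch of a merge, assuming stored datasets 'orders' and 'customers' sharing a customer_id column (illustrative names):

import omegaml as om

orders = om.datasets.getl('orders')
customers = om.datasets.getl('customers')
# merge on the shared key; the result is written to a new collection
merged = orders.merge(customers, on='customer_id', how='inner')
df = merged.value   # resolve to a pandas DataFrame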

query(*args, **kwargs)

return a new MDataFrame with a filter criteria

Any subsequent operation on the new dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Note: Unlike pandas DataFrames, a filtered MDataFrame operates on the same collection as the original DataFrame

Parameters:
  • args – a Q object or logical combination of Q objects (optional)

  • kwargs – all AND filter criteria

Returns:

a new MDataFrame with the filter applied
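
A minimal sketch of keyword filtering, assuming a dataset 'sales' with an amount column; the __gte operator suffix is the MongoDB-style filter syntax assumed here:

import omegaml as om

mdf = om.datasets.getl('sales')
# all kwargs combine with AND
large = mdf.query(amount__gte=100)
df = large.value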

query_inplace(*args, **kwargs)

filters this MDataFrame and returns it.

Any subsequent operation on the dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Parameters:
  • args – a Q object or logical combination of Q objects (optional)

  • kwargs – all AND filter criteria

Returns:

self

skip(topn)

skip the topn number of rows

Parameters:

topn – the number of rows to skip.

Returns:

the MDataFrame

sort(columns)

sort by specified columns

Parameters:

columns – str of single column or a list of columns. Sort order is specified as the + (ascending) or - (descending) prefix to the column name. Default sort order is ascending.

Returns:

the MDataFrame
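
A minimal sketch of sorting (dataset and column names illustrative):

import omegaml as om

mdf = om.datasets.getl('sales')
# '+' sorts ascending, '-' descending
df = mdf.sort(['+region', '-amount']).value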

property value

resolve the query and return a Pandas DataFrame

Returns:

the result of the query as a pandas DataFrame

class omegaml.mdataframe.MSeries(*args, **kwargs)

Series implementation for MDataFrames

behaves like a DataFrame but limited to one column.

__len__()

the projected number of rows when resolving

count()

projected number of rows when resolving

create_index(keys, **kwargs)

create an index the easy way

groupby(columns, sort=True)

Group by a given set of columns

Parameters:
  • columns – the list of columns

  • sort – if True sort by group key

Returns:

MGrouper

head(limit=10)

return up to limit number of rows

Parameters:

limit – the number of rows to return. Defaults to 10

Returns:

the MDataFrame

inspect(explain=False, cached=False, cursor=None, raw=False)

inspect this dataframe’s actual mongodb query

Parameters:

explain – if True explains access path

iterchunks(chunksize=100)

return an iterator

Parameters:

chunksize (int) – number of rows in each chunk

Returns:

a dataframe of max. length chunksize
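
A minimal sketch of chunked iteration; column access via [] returning an MSeries is assumed, and all names are illustrative:

import omegaml as om

ser = om.datasets.getl('sales')['amount']
for chunk in ser.iterchunks(chunksize=1000):
    # each chunk holds at most 1000 rows
    print(len(chunk))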

list_indexes()

list all indices in database

property loc

Access by index

Use as mdf.loc[index_value]

Returns:

MLocIndexer

merge(right, on=None, left_on=None, right_on=None, how='inner', target=None, suffixes=('_x', '_y'), sort=False, inspect=False, filter=None)

Merge this dataframe with another dataframe. Only left outer joins are currently supported. The output is saved as a new collection under the target name (defaults to a generated name if not specified).

Parameters:
  • right – the other MDataFrame

  • on – the list of key columns to merge by

  • left_on – the list of the key columns to merge on this dataframe

  • right_on – the list of the key columns to merge on the other dataframe

  • how – the method to merge. supported are left, inner, right. Defaults to inner

  • target – the name of the collection to store the merge results in. If not provided a temporary name will be created.

  • suffixes – the suffixes to apply to identical left and right columns

  • sort – if True the merge results will be sorted. If False the MongoDB natural order is implied.

Returns:

the merged MDataFrame on the target collection

query(*args, **kwargs)

return a new MDataFrame with a filter criteria

Any subsequent operation on the new dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Note: Unlike pandas DataFrames, a filtered MDataFrame operates on the same collection as the original DataFrame

Parameters:
  • args – a Q object or logical combination of Q objects (optional)

  • kwargs – all AND filter criteria

Returns:

a new MDataFrame with the filter applied

query_inplace(*args, **kwargs)

filters this MDataFrame and returns it.

Any subsequent operation on the dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Parameters:
  • args – a Q object or logical combination of Q objects (optional)

  • kwargs – all AND filter criteria

Returns:

self

property shape

return shape of dataframe

skip(topn)

skip the topn number of rows

Parameters:

topn – the number of rows to skip.

Returns:

the MDataFrame

sort(columns)

sort by specified columns

Parameters:

columns – str of single column or a list of columns. Sort order is specified as the + (ascending) or - (descending) prefix to the column name. Default sort order is ascending.

Returns:

the MDataFrame

tail(limit=10)

return up to limit number of rows from last inserted values

Parameters:

limit – the number of rows to return. Defaults to 10

unique()

return the unique set of values for the series

Returns:

MSeries

property value

return the value of the series

this is a Series unless unique() was called; if unique() was called, only distinct values are returned as an array, matching the behavior of a Series

Returns:

pandas.Series
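
A minimal sketch contrasting value with and without unique() (names illustrative):

import omegaml as om

ser = om.datasets.getl('sales')['region']
values = ser.value             # full pandas Series
distinct = ser.unique().value  # distinct values only, as an array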

class omegaml.mdataframe.MGrouper(mdataframe, collection, columns, sort=True)

a Grouper for MDataFrames

agg(specs)

shortcut for .aggregate

aggregate(specs, **kwargs)

aggregate by given specs

See the following link for a list of supported operations. https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Parameters:

specs – a dictionary of { column : function | list[functions] } pairs.

count()

return counts by group columns
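
A minimal sketch of aggregation specs (names illustrative); 'sum' and 'avg' are assumed to map to the MongoDB $sum and $avg group operators linked above:

import omegaml as om

grouped = om.datasets.getl('sales').groupby(['region'])
# aggregate one column by several functions per group
stats = grouped.agg({'amount': ['sum', 'avg']})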

class omegaml.mdataframe.MLocIndexer(mdataframe, positional=False)

implements the LocIndexer for MDataFrames

__getitem__(specs)

access by index

use as mdf.loc[specs] where specs is any of

  • a list or tuple of scalar index values, e.g. .loc[(1,2,3)]

  • a slice of values e.g. .loc[1:5]

  • a list of slices, e.g. .loc[1:5, 2:3]

Returns:

the sliced part of the MDataFrame

class omegaml.mdataframe.MPosIndexer(mdataframe)

implements the position-based indexer for MDataFrames

__getitem__(specs)

access by index

use as mdf.iloc[specs] where specs is any of

  • a list or tuple of scalar position values, e.g. .iloc[(1,2,3)]

  • a slice of positions e.g. .iloc[1:5]

  • a list of slices, e.g. .iloc[1:5, 2:3]

Returns:

the sliced part of the MDataFrame

class omegaml.mixins.mdf.ApplyContext(caller, columns=None, index=None)

Enable apply functions

.apply(fn) will call fn(ctx) where ctx is an ApplyContext. The context supports methods to apply functions in a Pandas-style apply manner. ApplyContext is extensible by adding an extension class to defaults.OMEGA_MDF_APPLY_MIXINS.

Note that unlike a Pandas DataFrame, ApplyContext does not itself contain any data. Rather it is part of an expression tree, i.e. the aggregation pipeline. Thus any expressions applied are translated into operations on the expression tree. The expression tree is evaluated on MDataFrame.value, at which point neither the ApplyContext nor the function that created it is active.

Examples:

mdf.apply(lambda v: v * 5)  => multiply every column in dataframe
mdf.apply(lambda v: v['foo'].dt.week)  => get week of date for column foo
mdf.apply(lambda v: dict(a=v['foo'].dt.week,
                         b=v['bar'] * 5))  => run multiple pipelines and get results

The callable passed to apply can be any function. It can either return None,
the context passed in or a list of pipeline stages.

# apply any of the below functions
mdf.apply(customfn)

# same as lambda v: v.dt.week
def customfn(ctx):
    return ctx.dt.week

# simple pipeline
def customfn(ctx):
    ctx.project(x={'$multiply': ['$x', 5]})
    ctx.project(y={'$divide': ['$x', 2]})

# complex pipeline
def customfn(ctx):
    return [
        { '$match': ... },
        { '$project': ... },
    ]

class omegaml.mixins.mdf.ApplyArithmetics

Math operators for ApplyContext

  • __mul__ (*)

  • __add__ (+)

  • __sub__ (-)

  • __div__ (/)

  • __floordiv__ (//)

  • __mod__ (%)

  • __pow__ (pow)

  • __ceil__ (ceil)

  • __floor__ (floor)

  • __trunc__ (trunc)

  • __abs__ (abs)

  • sqrt (math.sqrt)

__add__(other)

add

__mul__(other)

multiply
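
A minimal sketch of arithmetic inside apply (names illustrative); per the ApplyContext notes above, the expression builds pipeline stages and resolves on .value:

import omegaml as om

mdf = om.datasets.getl('sales')
# scale and shift a column; translated into $multiply and $add stages
result = mdf.apply(lambda v: v['amount'] * 2 + 1).value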

class omegaml.mixins.mdf.ApplyDateTime

Datetime operators for ApplyContext

class omegaml.mixins.mdf.ApplyString

String operators

class omegaml.mixins.mdf.ApplyAccumulators

Backends:

class omegaml.backends.sqlalchemy.SQLAlchemyBackend(model_store=None, data_store=None, tracking=None, **kwargs)

sqlalchemy plugin for omegaml

Usage:

Define your sqlalchemy connection:

sqlalchemy_constr = f'sqlalchemy://{user}:{password}@{account}/'

Store the connection in any of three ways:

# -- just the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.get('mysqlalchemy', raw=True)
=> the sql connection object

# -- store connection with a predefined sql
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy',
                sql='select ....')
om.datasets.get('mysqlalchemy')
=> will return a pandas dataframe using the specified sql to run.
   specify chunksize= to return an iterable of dataframes

# -- predefined sqls can contain variables to be resolved at access time
#    if you fail to specify required variables in sqlvars, a KeyError is raised
om.datasets.put(sqlalchemy_constr, 'myview',
                sql='select ... from T1 where col="{var}"')
om.datasets.get('myview', sqlvars=dict(var="value"))

# -- Variables are replaced by binding parameters, which is safe for
#    untrusted inputs. To replace variables as strings, use double
#    `{{variable}}` notation. A warning will be issued because this
#    is considered an unsafe practice for untrusted input (REST API).
#    It is your responsibility to sanitize the value of the `cols` variable.
om.datasets.put(sqlalchemy_constr, 'myview',
                sql='select {{cols}} from T1 where col="{var}"')
om.datasets.get('myview', sqlvars=dict(cols='foo, bar',
                                       var="value"))

Query data from a connection and store into an omega-ml dataset:

# -- copy the result of the sqlalchemy query to omegaml
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy',
                sql='select ...', copy=True)
om.datasets.get('mysqlalchemy')
=> will return a pandas dataframe (without executing any additional queries)
=> can also use with om.datasets.getl('mysqlalchemy') to return a MDataFrame

Controlling the table used in the connection:

# -- the default table is {bucket}_{name}, override using table='mytable'
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy',
                table='mytable',
                sql='select ...',
                copy=True)
om.datasets.get('mysqlalchemy') # read from {bucket}_mytable

# -- to use a specific table without the bucket prefix, use table=':mytable'
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy',
                table=':mytable',
                sql='select ...',
                copy=True)
om.datasets.get('mysqlalchemy') # read from mytable

Inserting data via a previously stored connection:

# -- store data back through the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.put(df, 'mysqlalchemy',
                table='SOMETABLE')

Using variables in connection strings:

Connection strings may contain variables, e.g. userid and password. By default variables are resolved from the os environment; you can also specify any dict, as shown below:

# -- use connection string with variables
sqlalchemy_constr = 'sqlite:///{dbname}.db'
om.datasets.put(sqlalchemy_constr, 'userdb')
om.datasets.get('userdb', secrets=dict(dbname='chuckdb'))

# -- alternatively, create a vault dataset:
secrets = dict(userid='chuck', dbname='chuckdb')
om.datasets.put(secrets, '.omega/vault')
om.datasets.get('userdb')

the '.omega/vault' dataset will be queried using the current userid as
the secret name, and the dbname retrieved from the document. This is
experimental and the vault is not encrypted.

Advanced:

om.datasets.put() supports the following additional keyword arguments

  • chunksize=int - specify the number of rows to read from sqlalchemy in one chunk. defaults to 10000

  • parse_dates=['col', ...] - list of column names to parse for date, time or datetime. see pd.read_sql for details

  • transform=callable - a callable that is passed the DataFrame of each chunk before it is inserted into the database. use to provide custom transformations; only works with copy=True

  • any other kwargs supported by pandas.read_sql
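
A minimal sketch combining the copy options above; the connection string, column name and transform are illustrative:

import omegaml as om

def drop_nulls(df):
    # applied to each chunk before it is inserted (requires copy=True)
    return df.dropna()

sqlalchemy_constr = 'sqlite:///example.db'
om.datasets.put(sqlalchemy_constr, 'mycopy',
                sql='select ...',
                copy=True,
                chunksize=5000,
                parse_dates=['created_at'],
                transform=drop_nulls)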