Public API

Python API (overview)

omegaml.datasets the OmegaStore for datasets
omegaml.models the OmegaStore for models
omegaml.runtimes the cluster runtime API
omegaml.jobs the lambda compute service (jobs API)
omegaml.scripts the OmegaStore for lambda scripts
omegaml.store.base.OmegaStore([mongo_url, …]) The storage backend for models and data
omegaml.runtimes.OmegaRuntime(omega[, …]) omegaml compute cluster gateway
omegaml.runtimes.OmegaRuntimeDask(omega[, …]) omegaml compute cluster gateway to a dask distributed cluster
omegaml.runtimes.OmegaModelProxy(modelname) proxy to a remote model in a celery worker
omegaml.runtimes.OmegaJobProxy(jobname[, …]) proxy to a remote job in a celery worker
omegaml.jobs.OmegaJobs([prefix, store, defaults]) Omega Jobs API
omegaml.mdataframe.MDataFrame(collection[, …]) A DataFrame for mongodb
omegaml.mdataframe.MGrouper(mdataframe, …) a Grouper for MDataFrames
omegaml.mdataframe.MLocIndexer(mdataframe[, …]) implements the LocIndexer for MDataFrames
omegaml.mdataframe.MPosIndexer(mdataframe) implements the position-based indexer for MDataFrames

Python API

omega|ml

omegaml.datasets - storage for data

the OmegaStore for datasets

omegaml.models - storage for models

the OmegaStore for models

omegaml.runtimes - the cluster runtime API
omegaml.jobs - the lambda compute service

the jobs API
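
A minimal usage sketch, assuming a configured omega|ml installation (the dataset name and contents are illustrative):

import pandas as pd
import omegaml as om

# store a DataFrame in the datasets store
df = pd.DataFrame({'region': ['north', 'south'], 'amount': [100, 200]})
om.datasets.put(df, 'sales')

# retrieve it back as a pandas DataFrame
df = om.datasets.get('sales')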

omegaml.store

class omegaml.store.base.OmegaStore(mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None)

The storage backend for models and data

collection(name=None)

Returns a mongo db collection as a datastore

Parameters:name – the collection to use. If None, defaults to the collection name given on instantiation. The actual collection name used is always prefix + name + '.data'
drop(name, force=False, version=-1)

Drop the object

Parameters:
  • name – The name of the object
  • force – If True ignores the DoesNotExist exception; defaults to False, meaning a DoesNotExist exception is raised if the name does not exist
Returns:

True if object was deleted, False if not. If force is True and the object does not exist it will still return True

fs

Retrieve a gridfs instance using the url and collection provided

Returns:a gridfs instance
get(name, version=-1, force_python=False, **kwargs)

Retrieve an object

Parameters:
  • name – The name of the object
  • version – Version of the stored object (not supported)
  • force_python – Return as a python object
  • kwargs – kwargs depending on object kind
Returns:

an object, estimator, pipeline, data array or pandas dataframe previously stored with put()

get_backend(name, model_store=None, data_store=None, **kwargs)

return the backend for a given object name

Parameters:
  • name – The name of the object
  • model_store – the OmegaStore instance used to store models
  • data_store – the OmegaStore instance used to store data
  • kwargs – the kwargs passed to the backend initialization
Returns:

the backend

get_backend_bykind(kind, model_store=None, data_store=None, **kwargs)

return the backend by a given object kind

Parameters:
  • kind – The object kind
  • model_store – the OmegaStore instance used to store models
  • data_store – the OmegaStore instance used to store data
  • kwargs – the kwargs passed to the backend initialization
Returns:

the backend

get_dataframe_dfgroup(name, version=-1, kwargs=None)

Return a grouped dataframe

Parameters:
  • name – the name of the object
  • version – not supported
  • kwargs – mongo db query arguments to be passed to collection.find() as a filter.
get_dataframe_documents(name, columns=None, lazy=False, filter=None, version=-1, is_series=False, **kwargs)

Internal method to return DataFrame from documents

Parameters:
  • name – the name of the object (str)
  • columns – the column projection as a list of column names
  • lazy – if True returns a lazy representation as an MDataFrame. If False retrieves all data and returns a DataFrame (default)
  • filter – the filter to be applied as a column__op=value dict
  • version – the version to retrieve (not supported)
  • is_series – if True returns a Series instead of a DataFrame
  • kwargs – remaining kwargs are used as a filter. The filter kwarg overrides other kwargs.
Returns:

the retrieved object (DataFrame, Series or MDataFrame)
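
This method is normally reached through om.datasets.get(); a sketch of the filter and lazy options, assuming these kwargs pass through from get() (dataset and column names are illustrative):

# filter via kwargs (column__op=value)
om.datasets.get('sales', amount__gte=100)
# equivalent, using the explicit filter argument
om.datasets.get('sales', filter=dict(amount__gte=100))
# lazy retrieval as an MDataFrame
mdf = om.datasets.get('sales', lazy=True)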

get_dataframe_hdf(name, version=-1)

Retrieve dataframe from hdf

Parameters:
  • name – The name of object
  • version – The version of object (not supported)
Returns:

Returns a python pandas dataframe

Raises:

gridfs.errors.NoFile

get_object_as_python(meta, version=-1)

Retrieve object as python object

Parameters:
  • meta – The metadata object
  • version – The version of the object
Returns:

Returns data as python object

get_python_data(name, version=-1)

Retrieve objects as python data

Parameters:
  • name – The name of object
  • version – The version of object
Returns:

Returns the object as a python list

getl(*args, **kwargs)

return a lazy MDataFrame for a given object

Same as .get, but returns an MDataFrame

list(pattern=None, regexp=None, kind=None, raw=False, include_temp=False, bucket=None, prefix=None)

List all files in store

specify pattern as a unix pattern (e.g. models/*), or specify regexp

Parameters:
  • pattern – the unix file pattern or None for all
  • regexp – the regexp. takes precedence over pattern
  • raw – if True return the metadata objects
Returns:

List of files in store

make_metadata(name, kind, bucket=None, prefix=None, **kwargs)

create or update a metadata object

this retrieves a Metadata object if it exists, given the kwargs. Only the name, prefix and bucket arguments are considered

for existing Metadata objects, the attributes keyword is treated as follows:

  • attributes=None, the existing attributes are left as is
  • attributes={}, the attributes value on an existing metadata object is reset to the empty dict
  • attributes={ some : value }, the existing attributes are updated

For new metadata objects, attributes defaults to {} if not specified, else is set as provided.

Parameters:
  • name – the object name
  • bucket – the bucket, optional, defaults to self.bucket
  • prefix – the prefix, optional, defaults to self.prefix
metadata(name=None, bucket=None, prefix=None, version=-1)

Returns a metadata document for the given entry name

FIXME: the version attribute does not do anything
FIXME: metadata should be stored in a bucket-specific collection to enable access control, see https://docs.mongodb.com/manual/reference/method/db.createRole/#db.createRole

mongodb

Returns a mongo database object

object_store_key(name, ext)

Returns the store key

Parameters:
  • name – The name of object
  • ext – The extension of the filename
Returns:

A filename with relative bucket, prefix and name

put(obj, name, attributes=None, **kwargs)

Stores an object; supports estimators, pipelines, numpy arrays and pandas dataframes

put_dataframe_as_dfgroup(obj, name, groupby, attributes=None)

store a dataframe grouped by columns in a mongo document

Example:

# each group
{
    # group keys
    key: val,
    _data: [
        # only data keys
        { key: val, ... }
    ]
}
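
A usage sketch based on the signatures above (dataset and column names are illustrative):

# store a dataframe grouped by the region column
om.datasets.put_dataframe_as_dfgroup(df, 'sales_by_region', ['region'])
# retrieve, filtering on the group key
om.datasets.get_dataframe_dfgroup('sales_by_region', kwargs=dict(region='north'))
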
put_dataframe_as_documents(obj, name, append=None, attributes=None, index=None, timestamp=None)

store a dataframe as a row-wise collection of documents

Parameters:
  • obj – the dataframe to store
  • name – the name of the item in the store
  • append – if False the collection is dropped before inserting; if True existing documents persist. Defaults to True. If not specified and rows have been previously inserted, a warning is issued.
  • index – list of columns, using +, -, @ as a column prefix to specify ASCENDING, DESCENDING, GEOSPHERE respectively. For @ the column has to represent a valid GeoJSON object.
  • timestamp – if True or a field name, adds a timestamp. If the value is a boolean or datetime, uses _created as the field name. The timestamp is always datetime.datetime.utcnow(). May be overridden by specifying the tuple (col, datetime).
Returns:

the Metadata object created
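
A sketch using the index and timestamp options directly (dataset and column names are illustrative):

om.datasets.put_dataframe_as_documents(
    df, 'sales',
    append=False,                  # drop any existing collection first
    index=['+region', '-amount'],  # ascending region, descending amount
    timestamp=True)                # adds a _created datetime field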

put_ndarray_as_hdf(obj, name, attributes=None)

store numpy array as hdf

this is a hack: the array is converted to a dataframe, then stored

put_pyobj_as_document(obj, name, attributes=None, append=True)

store a dict as a document

similar to put_dataframe_as_documents, no data is replaced by default; that is, obj is appended as a new document in the object's mongo collection. To replace the data, specify append=False.

put_pyobj_as_hdf(obj, name, attributes=None)

store list, tuple, dict as hdf

this requires the list, tuple or dict to be convertible into a dataframe

rebuild_params(kwargs, collection)

Returns a modified set of parameters for querying mongodb based on how the mongo document is structured and the fields the document is grouped by.

Note: Explicitly to be used with get_grouped_data only

Parameters:
  • kwargs – Mongo filter arguments
  • collection – The name of mongodb collection
Returns:

Returns a set of parameters as dictionary.

register_backend(kind, backend)

register a backend class

Parameters:
  • kind – (str) the backend kind
  • backend – (class) the backend class
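
A sketch of registering a custom backend; MyKindBackend and its interface are hypothetical, the actual interface is defined by omegaml's backend base classes:

class MyKindBackend:
    # hypothetical backend: a real backend implements the methods
    # OmegaStore expects for its kind (e.g. put/get)
    KIND = 'python.mykind'
    ...

om.datasets.register_backend(MyKindBackend.KIND, MyKindBackend)
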
register_backends()

register backends in defaults.OMEGA_STORE_BACKENDS

register_mixin(mixincls)

register a mixin class

Parameters:mixincls – (class) the mixin class
tmppath

return an instance-specific temporary path

omegaml.runtimes

class omegaml.runtimes.OmegaRuntime(omega, backend=None, broker=None, celerykwargs=None, celeryconf=None, defaults=None)

omegaml compute cluster gateway

job(jobname)

return a job for remote execution

model(modelname)

return a model for remote execution

ping(*args, **kwargs)

ping the runtime

settings()

return the runtime's cluster settings

task(name)

retrieve the task function from the celery instance

we do it like this so we can have per-OmegaRuntime-instance celery configurations (as opposed to using the default app's import, which seems to confuse celery)

class omegaml.runtimes.OmegaRuntimeDask(omega, dask_url=None)

omegaml compute cluster gateway to a dask distributed cluster

set environ DASK_DEBUG=1 to run dask tasks locally

job(jobname)

return a job for remote execution

model(modelname)

return a model for remote execution

settings()

return the runtime's cluster settings

task(name)

retrieve the task function from the task module

This retrieves the task function and wraps it into a DaskTask. DaskTask mimics a celery task and is called on the cluster using .delay(), the same way we call a celery task. .delay() will return a DaskAsyncResult, supporting the celery .get() semantics. This way we can use the same proxy objects, as all they do is call .delay() and return an AsyncResult.

class omegaml.runtimes.OmegaModelProxy(modelname, runtime=None)

proxy to a remote model in a celery worker

The proxy provides the same methods as the model but will execute the methods using celery tasks and return celery AsyncResult objects

Usage:

om = Omega()
# train a model
# result is an AsyncResult, use .get() to return its result
result = om.runtime.model('foo').fit('datax', 'datay')
result.get()

# predict
result = om.runtime.model('foo').predict('datax')
# result is an AsyncResult, use .get() to return its result
print(result.get())
apply_mixins()

apply mixins in defaults.OMEGA_RUNTIME_MIXINS

class omegaml.runtimes.OmegaJobProxy(jobname, runtime=None)

proxy to a remote job in a celery worker

Usage:

om = Omega()
# result is an AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').run()
result.get()

# result is an AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').schedule()
result.get()
run(**kwargs)

run the job

Returns:the result
schedule(**kwargs)

schedule the job

omegaml.jobs

class omegaml.jobs.OmegaJobs(prefix=None, store=None, defaults=None)

Omega Jobs API

create(code, name)

create a notebook from code

Parameters:
  • code – the code as a string
  • name – the name of the job to create
Returns:

the metadata object created
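
A sketch of creating and running a job (the job name and code are illustrative):

# create a notebook from a code string, then run it
code = "print('hello from omegaml')"
meta = om.jobs.create(code, 'hello-job')
result_meta = om.jobs.run('hello-job')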

get(name)

Retrieve a notebook and return a NotebookNode

get_collection(collection)

returns the collection object

get_notebook_config(nb_filename)

returns the omegaml script config in the notebook's first cell

list(jobfilter='.*', raw=False)

list all jobs matching the filter. The filter is a regex on the name of the ipynb entry. The default is all, i.e. .*

put(obj, name, attributes=None)

Store a NotebookNode

Parameters:
  • obj – the NotebookNode to store
  • name – the name of the notebook
run(name)

Run a job immediately

The job is run and the results are stored in the given filename

Parameters:name – the name of the jobfile
Returns:the metadata of the job
run_notebook(name)

run a given notebook immediately. The name parameter is the name of the job script (the ipynb entry). Inserts and returns the Metadata document for the job.

schedule(nb_file)

Schedule processing of a notebook per the interval specified in the job script

omegaml.mdataframe

class omegaml.mdataframe.MDataFrame(collection, columns=None, query=None, limit=None, skip=None, sort_order=None, force_columns=None, immediate_loc=False, auto_inspect=False, preparefn=None, **kwargs)

A DataFrame for mongodb

Performs out-of-core, lazy computation on a mongodb cluster. Behaves like a pandas DataFrame. Actual results are returned as pandas DataFrames.
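
A usage sketch, assuming a dataset was stored previously (dataset and column names are illustrative):

# getl returns a lazy MDataFrame on the stored collection
mdf = om.datasets.getl('sales')
# operations build a mongodb query; .value resolves to a pandas DataFrame
df = mdf.query(amount__gte=100).sort('-amount').head(5).value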

__len__()

the projected number of rows when resolving

count()

projected number of rows when resolving

create_index(keys, **kwargs)

create an index the easy way

groupby(columns, sort=True)

Group by a given set of columns

Parameters:
  • columns – the list of columns
  • sort – if True sort by group key
Returns:

MGrouper

head(limit=10)

return up to limit rows

Parameters:limit – the number of rows to return. Defaults to 10
Returns:the MDataFrame
inspect(explain=False, cached=False, cursor=None, raw=False)

inspect this dataframe’s actual mongodb query

Parameters:explain – if True explains access path
list_indexes()

list all indices in the database

loc

Access by index

Use as mdf.loc[index_value]

Returns:MLocIndexer
merge(right, on=None, left_on=None, right_on=None, how='inner', target=None, suffixes=('_x', '_y'), sort=False, inspect=False)

merge this dataframe with another dataframe. Only left outer joins are currently supported. The output is saved as a new collection under the target name (defaults to a generated name if not specified).

Parameters:
  • right – the other MDataFrame
  • on – the list of key columns to merge by
  • left_on – the list of the key columns to merge on this dataframe
  • right_on – the list of the key columns to merge on the other dataframe
  • how – the method to merge. supported are left, inner, right. Defaults to inner
  • target – the name of the collection to store the merge results in. If not provided a temporary name will be created.
  • suffixes – the suffixes to apply to identical left and right columns
  • sort – if True the merge results will be sorted. If False the MongoDB natural order is implied.
Returns:

an MDataFrame on the target collection

query(*args, **kwargs)

return a new MDataFrame with a filter criteria

Any subsequent operation on the new dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Note: Unlike pandas DataFrames, a filtered MDataFrame operates on the same collection as the original DataFrame

Parameters:
  • args – a Q object or logical combination of Q objects (optional)
  • kwargs – all AND filter criteria
Returns:

a new MDataFrame with the filter applied

query_inplace(*args, **kwargs)

filters this MDataFrame and returns it.

Any subsequent operation on the dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Parameters:
  • args – a Q object or logical combination of Q objects (optional)
  • kwargs – all AND filter criteria
Returns:

self
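
A sketch contrasting the two methods (column names are illustrative):

# query() returns a new, filtered MDataFrame; mdf is unaffected
filtered = mdf.query(region='north', amount__gte=100)
# query_inplace() applies the filter to mdf itself and returns it
mdf.query_inplace(region='north')
# clear the filter again
mdf.reset()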

shape

return shape of dataframe

skip(topn)

skip the topn number of rows

Parameters:topn – the number of rows to skip.
Returns:the MDataFrame
sort(columns)

sort by specified columns

Parameters:columns – str of single column or a list of columns. Sort order is specified as the + (ascending) or - (descending) prefix to the column name. Default sort order is ascending.
Returns:the MDataFrame
tail(limit=10)

return up to limit rows from the last inserted values

Parameters:limit – the number of rows to return. Defaults to 10
Returns:the MDataFrame
value

resolve the query and return a Pandas DataFrame

Returns:the result of the query as a pandas DataFrame
class omegaml.mdataframe.MSeries(*args, **kwargs)

Series implementation for MDataFrames

behaves like a DataFrame but limited to one column.

__len__()

the projected number of rows when resolving

count()

projected number of rows when resolving

create_index(keys, **kwargs)

create an index the easy way

groupby(columns, sort=True)

Group by a given set of columns

Parameters:
  • columns – the list of columns
  • sort – if True sort by group key
Returns:

MGrouper

head(limit=10)

return up to limit rows

Parameters:limit – the number of rows to return. Defaults to 10
Returns:the MDataFrame
inspect(explain=False, cached=False, cursor=None, raw=False)

inspect this dataframe’s actual mongodb query

Parameters:explain – if True explains access path
list_indexes()

list all indices in the database

loc

Access by index

Use as mdf.loc[index_value]

Returns:MLocIndexer
merge(right, on=None, left_on=None, right_on=None, how='inner', target=None, suffixes=('_x', '_y'), sort=False, inspect=False)

merge this dataframe with another dataframe. Only left outer joins are currently supported. The output is saved as a new collection under the target name (defaults to a generated name if not specified).

Parameters:
  • right – the other MDataFrame
  • on – the list of key columns to merge by
  • left_on – the list of the key columns to merge on this dataframe
  • right_on – the list of the key columns to merge on the other dataframe
  • how – the method to merge. supported are left, inner, right. Defaults to inner
  • target – the name of the collection to store the merge results in. If not provided a temporary name will be created.
  • suffixes – the suffixes to apply to identical left and right columns
  • sort – if True the merge results will be sorted. If False the MongoDB natural order is implied.
Returns:

an MDataFrame on the target collection

query(*args, **kwargs)

return a new MDataFrame with a filter criteria

Any subsequent operation on the new dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Note: Unlike pandas DataFrames, a filtered MDataFrame operates on the same collection as the original DataFrame

Parameters:
  • args – a Q object or logical combination of Q objects (optional)
  • kwargs – all AND filter criteria
Returns:

a new MDataFrame with the filter applied

query_inplace(*args, **kwargs)

filters this MDataFrame and returns it.

Any subsequent operation on the dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Parameters:
  • args – a Q object or logical combination of Q objects (optional)
  • kwargs – all AND filter criteria
Returns:

self

shape

return shape of dataframe

skip(topn)

skip the topn number of rows

Parameters:topn – the number of rows to skip.
Returns:the MDataFrame
sort(columns)

sort by specified columns

Parameters:columns – str of single column or a list of columns. Sort order is specified as the + (ascending) or - (descending) prefix to the column name. Default sort order is ascending.
Returns:the MDataFrame
tail(limit=10)

return up to limit rows from the last inserted values

Parameters:limit – the number of rows to return. Defaults to 10
Returns:the MDataFrame
unique()

return the unique set of values for the series

Returns:MSeries
value

return the value of the series

this is a Series unless unique() was called; in that case only the distinct values are returned as an array, matching the behavior of a pandas Series

Returns:pandas.Series
class omegaml.mdataframe.MGrouper(mdataframe, collection, columns, sort=True)

a Grouper for MDataFrames

agg(specs)

shortcut for .aggregate

aggregate(specs)

aggregate by given specs

See the following link for a list of supported operations. https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Parameters:specs – a dictionary of { column : function | list[functions] } pairs.
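
A sketch, assuming aggregation functions may be given by name (dataset and column names are illustrative):

# sum and mean of amount per region
grouped = om.datasets.getl('sales').groupby(['region'])
result = grouped.agg(dict(amount=['sum', 'mean']))
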
count()

return counts by group columns

class omegaml.mdataframe.MLocIndexer(mdataframe, positional=False)

implements the LocIndexer for MDataFrames

__getitem__(specs)

access by index

use as mdf.loc[specs] where specs is any of

  • a list or tuple of scalar index values, e.g. .loc[(1,2,3)]
  • a slice of values e.g. .loc[1:5]
  • a list of slices, e.g. .loc[1:5, 2:3]
Returns:the sliced part of the MDataFrame
class omegaml.mdataframe.MPosIndexer(mdataframe)

implements the position-based indexer for MDataFrames

class omegaml.mixins.mdf.ApplyContext(caller, columns=None, index=None)

Enable apply functions

.apply(fn) will call fn(ctx) where ctx is an ApplyContext. The context supports methods to apply functions in a Pandas-style apply manner. ApplyContext is extensible by adding an extension class to defaults.OMEGA_MDF_APPLY_MIXINS.

Note that unlike a Pandas DataFrame, ApplyContext does not itself contain any data. Rather it is part of an expression tree, i.e. the aggregation pipeline. Thus any expressions applied are translated into operations on the expression tree. The expression tree is evaluated on MDataFrame.value, at which point neither the ApplyContext nor the function that created it is active.

Examples:

mdf.apply(lambda v: v * 5)  => multiply every column in dataframe
mdf.apply(lambda v: v['foo'].dt.week)  => get week of date for column foo
mdf.apply(lambda v: dict(a=v['foo'].dt.week,
                         b=v['bar'] * 5))  => run multiple pipelines and get results

The callable passed to apply can be any function. It can either return None,
the context passed in or a list of pipeline stages.

# apply any of the below functions
mdf.apply(customfn)

# same as lambda v: v.dt.week
def customfn(ctx):
    return ctx.dt.week

# simple pipeline
def customfn(ctx):
    ctx.project(x={'$multiply': ['$x', 5]})
    ctx.project(y={'$divide': ['$x', 2]})

# complex pipeline
def customfn(ctx):
    return [
        { '$match': ... },
        { '$project': ... },
    ]
add(stage)

Add a processing stage to the pipeline

see https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
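
A sketch of adding a raw aggregation stage from within an apply function (the field name is illustrative):

def customfn(ctx):
    # append a $match stage to the aggregation pipeline
    ctx.add({'$match': {'x': {'$gt': 5}}})

mdf.apply(customfn)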

groupby(by, expr=None, append=None, **kwargs)

add a groupby accumulation using $group

Parameters:
  • by – the groupby columns, if provided as a list will be transformed
  • expr
  • append
  • kwargs
Returns:

project(expr=None, append=False, keep=False, **kwargs)

add a projection using $project

Parameters:
  • expr – the column-operator mapping
  • append – if True add a $project stage, otherwise add to existing
  • kwargs – if expr is None, the column-operator mapping as kwargs
Returns:

ApplyContext

class omegaml.mixins.mdf.ApplyArithmetics

Math operators for ApplyContext

  • __mul__ (*)
  • __add__ (+)
  • __sub__ (-)
  • __div__ (/)
  • __floordiv__ (//)
  • __mod__ (%)
  • __pow__ (pow)
  • __ceil__ (ceil)
  • __floor__ (floor)
  • __trunc__ (trunc)
  • __abs__ (abs)
  • sqrt (math.sqrt)
sqrt(other)

square root

class omegaml.mixins.mdf.ApplyDateTime

Datetime operators for ApplyContext

  • day (dayOfMonth)
  • dayofweek (dayOfWeek)
  • dayofyear (dayOfYear)
  • hour
  • millisecond
  • minute
  • month
  • second
  • week (isoWeek)
  • year
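
A sketch using the datetime accessors via apply, following the .dt usage shown above (the column name is illustrative):

# day of week and year of a datetime column
mdf.apply(lambda v: v['created'].dt.dayofweek)
mdf.apply(lambda v: v['created'].dt.year)
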
class omegaml.mixins.mdf.ApplyString

String operators

  • concat(other, *args)
  • index(other, *args) (indexOfBytes)
  • split(other, *args)
  • strcasecmp(other, *args)
  • substr(other, *args)

class omegaml.mixins.mdf.ApplyAccumulators