Developer API

omega|ml

omegaml.datasets = OmegaStore(bucket=omegaml, prefix=data/)

the OmegaStore for datasets
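
For illustration, a minimal sketch of typical use, assuming a configured omega|ml instance and pandas installed (the dataset name 'mydata' is illustrative):

import omegaml as om
import pandas as pd

# store a dataframe as a collection of row documents
df = pd.DataFrame({'x': range(10)})
om.datasets.put(df, 'mydata')

# retrieve it back as a dataframe
df2 = om.datasets.get('mydata')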

omegaml.defaults = DefaultsContext({'OMEGA_BROKER': 'amqp://admin:foobar@localhost:5672//', 'OMEGA_BUCKET_FS_LEGACY': False, 'OMEGA_CELERY_CONFIG': {'CELERY_ACCEPT_CONTENT': ['pickle', 'json'], 'CELERY_TASK_SERIALIZER': 'pickle', 'CELERY_RESULT_SERIALIZER': 'pickle', 'CELERY_TASK_RESULT_EXPIRES': 3600, 'CELERY_DEFAULT_QUEUE': 'default', 'BROKER_URL': 'amqp://admin:foobar@localhost:5672//', 'BROKER_HEARTBEAT': 0, 'CELERY_RESULT_BACKEND': 'amqp', 'CELERY_ALWAYS_EAGER': False, 'CELERYBEAT_SCHEDULE': {'execute_scripts': {'task': 'omegaml.notebook.tasks.execute_scripts', 'schedule': 60}}, 'BROKER_USE_SSL': False}, 'OMEGA_CELERY_IMPORTS': ['omegaml', 'omegaml.notebook', 'omegaml.backends.package'], 'OMEGA_CONFIG_FILE': None, 'OMEGA_DISABLE_FRAMEWORKS': None, 'OMEGA_FRAMEWORKS': ['scikit-learn'], 'OMEGA_LOCAL_RUNTIME': False, 'OMEGA_LOG_DATASET': '.omega/logs', 'OMEGA_LOG_FORMAT': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', 'OMEGA_MDF_APPLY_MIXINS': [('omegaml.mixins.mdf.ApplyArithmetics', 'MDataFrame,MSeries'), ('omegaml.mixins.mdf.ApplyDateTime', 'MDataFrame,MSeries'), ('omegaml.mixins.mdf.ApplyString', 'MDataFrame,MSeries'), ('omegaml.mixins.mdf.ApplyAccumulators', 'MDataFrame,MSeries')], 'OMEGA_MDF_MIXINS': [('omegaml.mixins.mdf.ApplyMixin', 'MDataFrame,MSeries'), ('omegaml.mixins.mdf.FilterOpsMixin', 'MDataFrame,MSeries'), ('omegaml.mixins.mdf.apply.ApplyStatistics', 'MDataFrame,MSeries'), ('omegaml.mixins.mdf.iotools.IOToolsMDFMixin', 'MDataFrame'), ('omegaml.mixins.mdf.ParallelApplyMixin', 'MDataFrame')], 'OMEGA_MONGO_COLLECTION': 'omegaml', 'OMEGA_MONGO_SSL_KWARGS': {'ssl': False, 'ssl_ca_certs': None}, 'OMEGA_MONGO_URL': 'mongodb://admin:foobar@localhost:27017/omega', 'OMEGA_NOTEBOOK_COLLECTION': 'ipynb', 'OMEGA_RESULT_BACKEND': 'amqp', 'OMEGA_RUNTIME_MIXINS': ['omegaml.runtimes.mixins.ModelMixin', 'omegaml.runtimes.mixins.GridSearchMixin'], 'OMEGA_STORE_BACKENDS': {'sklearn.joblib': <class 'omegaml.backends.scikitlearn.ScikitLearnBackend'>, 'ndarray.bin': <class 'omegaml.backends.npndarray.NumpyNDArrayBackend'>, 'virtualobj.dill': <class 'omegaml.backends.virtualobj.VirtualObjectBackend'>, 'pandas.rawdict': <class 'omegaml.backends.rawdict.PandasRawDictBackend'>, 'python.file': <class 'omegaml.backends.rawfiles.PythonRawFileBackend'>, 'python.package': <class 'omegaml.backends.package.localpip.PythonPackageData'>, 'pipsrc.package': <class 'omegaml.backends.package.remotepip.PythonPipSourcedPackageData'>, 'pandas.csv': <class 'omegaml.backends.externaldata.PandasExternalData'>, 'sqlalchemy.conx': <class 'omegaml.backends.sqlalchemy.SQLAlchemyBackend'>}, 'OMEGA_STORE_BACKENDS_DASH': {'python.dash': 'omegaml.backends.dashapp.DashAppBackend'}, 'OMEGA_STORE_BACKENDS_KERAS': {'keras.h5': 'omegaml.backends.keras.KerasBackend'}, 'OMEGA_STORE_BACKENDS_SQL': {'sqlalchemy.conx': 'omegaml.backends.sqlalchemy.SQLAlchemyBackend'}, 'OMEGA_STORE_BACKENDS_TENSORFLOW': {'tfkeras.h5': 'omegaml.backends.tensorflow.TensorflowKerasBackend', 'tfkeras.savedmodel': 'omegaml.backends.tensorflow.TensorflowKerasSavedModelBackend', 'tf.savedmodel': 'omegaml.backends.tensorflow.TensorflowSavedModelBackend', 'tfestimator.model': 'omegaml.backends.tensorflow.TFEstimatorModelBackend'}, 'OMEGA_STORE_MIXINS': ['omegaml.mixins.store.ProjectedMixin', 'omegaml.mixins.store.LazyGetMixin', 'omegaml.mixins.store.virtualobj.VirtualObjectMixin', 'omegaml.mixins.store.package.PythonPackageMixin', 'omegaml.mixins.store.promotion.PromotionMixin', 'omegaml.mixins.mdf.iotools.IOToolsStoreMixin', 
'omegaml.mixins.store.modelversion.ModelVersionMixin'], 'OMEGA_TASK_ROUTING_ENABLED': False, 'OMEGA_TMP': '/tmp', 'OMEGA_USER_EXTENSIONS': None, 'OMEGA_USESSL': False, 'OMEGA_WORKER_INCLUSTER': False})

the settings object

omegaml.jobs = OmegaJobs(store=OmegaStore(bucket=omegaml, prefix=jobs/))

the jobs API

omegaml.logger = <omegaml.store.logging.OmegaSimpleLogger object>

the OmegaSimpleLogger for easy log access
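
As a sketch of typical use (assuming the standard info()/error() logger methods; '.omega/logs' follows the OMEGA_LOG_DATASET default shown above):

import omegaml as om

# write log messages from client or runtime code
om.logger.info('training started')
om.logger.error('training failed')

# log records persist as a dataset and can be queried like any other
logs = om.datasets.get('.omega/logs')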

omegaml.models = OmegaStore(bucket=omegaml, prefix=models/)

the OmegaStore for models

omegaml.runtime = OmegaRuntime(Omega())

the OmegaRuntime for cluster execution

omegaml.scripts = OmegaStore(bucket=omegaml, prefix=scripts/)

the OmegaStore for lambda scripts

omegaml.store

Native storage for OmegaML using mongodb as the storage layer

An OmegaStore instance is a MongoDB database. It has at least the metadata collection which lists all objects stored in it. A metadata document refers to the following types of objects (metadata.kind):

  • pandas.dfrows - a Pandas DataFrame stored as a collection of rows

  • sklearn.joblib - a scikit learn estimator/pipeline dumped using joblib.dump()

  • python.data - an arbitrary python dict, tuple, list stored as a document

Note that storing Pandas and scikit-learn objects requires the availability of the respective packages. If either cannot be imported, the OmegaStore degrades to a python.data store only: it will still .list() and .get() any object, however it reverts to pure python objects, and it is then up to the client to convert the data into an appropriate format for processing. Conversely, put() raises a TypeError if you pass a Pandas or scikit-learn object while the respective package cannot be loaded.

All data are stored within the same mongodb, in per-object collections as follows:

  • .metadata

    all metadata. Each object is one document; see omegaml.documents.Metadata for details

  • .<bucket>.files

    this is the GridFS instance used to store blobs (models, numpy, hdf). The actual file name will be <prefix>/<name>.<ext>, where ext is optionally generated by put() / get().

  • .<bucket>.<prefix>.<name>.data

    every other dataset is stored in a separate collection (dataframes, dicts, lists, tuples). Any forward slash in prefix is ignored (e.g. ‘data/’ becomes ‘data’)

DataFrames by default are stored in their own collection, every row becomes a document. To store dataframes as a binary file, use put(..., as_hdf=True). .get() will always return a dataframe.

Python dicts, lists, tuples are stored as a single document with a .data attribute holding the JSON-converted representation. .get() will always return the corresponding python object of .data.
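
For example (a minimal sketch; 'mydict' is an illustrative name):

import omegaml as om

# stored as a single document with the JSON representation in .data
om.datasets.put({'a': 1, 'b': [1, 2, 3]}, 'mydict')

# .get() returns the stored python objects (as a list of documents)
data = om.datasets.get('mydict')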

Models are joblib.dump()'ed and zipped prior to transferring into GridFS. .get() will always unzip and joblib.load() before returning the model. Note this requires that the process using .get() supports joblib as well as all python classes referred to. If joblib is not supported, .get() returns a file-like object.

The .metadata entry specifies the format used to store each object as well as its location:

  • metadata.kind

    the type of object

  • metadata.name

    the name of the object, as given on put()

  • metadata.gridfile

    the gridfs object (if any, null otherwise)

  • metadata.collection

    the name of the collection

  • metadata.attributes

    arbitrary custom attributes set in put(attributes=obj). This is used e.g. by OmegaRuntime's fit() method to record the data used in the model's training. See the example below.
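
For example (a sketch; names are illustrative, and put() returning the Metadata object follows the put() documentation below):

import omegaml as om
import pandas as pd

df = pd.DataFrame({'amount': [100, 200]})
# attach arbitrary custom attributes on store
meta = om.datasets.put(df, 'sales', attributes={'source': 'crm-export'})
print(meta.attributes)  # => {'source': 'crm-export'}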

.put() and .get() use helper methods specific to the object's type and metadata.kind, respectively. In the future a plugin system will enable extension to other types.

class omegaml.store.base.OmegaStore(mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None)

The storage backend for models and data

collection(name=None, bucket=None, prefix=None)

Returns a mongo db collection as a datastore

If there is an existing object of name, will return the .collection of the object. Otherwise returns the collection according to naming convention {bucket}.{prefix}.{name}.datastore

Parameters

name – the collection to use. If None, defaults to the collection name given on instantiation. The actual collection name used is always prefix + name + '.data'

drop(name, force=False, version=-1)

Drop the object

Parameters
  • name – The name of the object

  • force – If True ignores DoesNotExist exception, defaults to False meaning this raises a DoesNotExist exception if the name does not exist

Returns

True if object was deleted, False if not. If force is True and the object does not exist it will still return True

property fs

Retrieve a gridfs instance using url and collection provided

Returns

a gridfs instance

get(name, version=-1, force_python=False, kind=None, **kwargs)

Retrieve an object

Parameters
  • name – The name of the object

  • version – Version of the stored object (not supported)

  • force_python – Return as a python object

  • kwargs – kwargs depending on object kind

Returns

an object: estimator, pipeline, data array or pandas dataframe previously stored with put()

get_backend(name, model_store=None, data_store=None, **kwargs)

return the backend by a given object name

Parameters
  • name – The object name

  • model_store – the OmegaStore instance used to store models

  • data_store – the OmegaStore instance used to store data

  • kwargs – the kwargs passed to the backend initialization

Returns

the backend

get_backend_bykind(kind, model_store=None, data_store=None, **kwargs)

return the backend by a given object kind

Parameters
  • kind – The object kind

  • model_store – the OmegaStore instance used to store models

  • data_store – the OmegaStore instance used to store data

  • kwargs – the kwargs passed to the backend initialization

Returns

the backend

get_backend_byobj(obj, name, kind=None, attributes=None, model_store=None, data_store=None, **kwargs)

return the matching backend for the given obj

Returns:

the first backend that supports the given parameters or None

get_dataframe_dfgroup(name, version=-1, kwargs=None)

Return a grouped dataframe

Parameters
  • name – the name of the object

  • version – not supported

  • kwargs – mongo db query arguments to be passed to collection.find() as a filter.

get_dataframe_documents(name, columns=None, lazy=False, filter=None, version=-1, is_series=False, chunksize=None, **kwargs)

Internal method to return DataFrame from documents

Parameters
  • name – the name of the object (str)

  • columns – the column projection as a list of column names

  • lazy – if True returns a lazy representation as an MDataFrame. If False retrieves all data and returns a DataFrame (default)

  • filter – the filter to be applied as a column__op=value dict

  • version – the version to retrieve (not supported)

  • is_series – if True returns a Series instead of a DataFrame

  • kwargs – remaining kwargs are used as a filter. The filter kwarg overrides other kwargs.

Returns

the retrieved object (DataFrame, Series or MDataFrame)
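
These parameters are typically passed through .get(). A sketch of the filter syntax (dataset and column names are illustrative):

import omegaml as om

# filter as a column__op=value dict
df = om.datasets.get('sales', filter=dict(amount__gte=100))

# equivalent: remaining kwargs are used as the filter
df = om.datasets.get('sales', amount__gte=100)

# lazy=True returns an MDataFrame; evaluate with .value
mdf = om.datasets.get('sales', lazy=True)
df = mdf.value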

get_dataframe_hdf(name, version=-1)

Retrieve dataframe from hdf

Parameters
  • name – The name of object

  • version – The version of object (not supported)

Returns

Returns a python pandas dataframe

Raises

gridfs.errors.NoFile

get_object_as_python(meta, version=-1)

Retrieve object as python object

Parameters
  • meta – The metadata object

  • version – The version of the object

Returns

Returns data as python object

get_python_data(name, version=-1, **kwargs)

Retrieve objects as python data

Parameters
  • name – The name of object

  • version – The version of object

Returns

Returns the object as python list object

getl(*args, **kwargs)

return a lazy MDataFrame for a given object

Same as .get, but returns a MDataFrame

list(pattern=None, regexp=None, kind=None, raw=False, hidden=None, include_temp=False, bucket=None, prefix=None, filter=None)

List all files in store

specify pattern as a unix pattern (e.g. models/*, or specify regexp)

Parameters
  • pattern – the unix file pattern or None for all

  • regexp – the regexp. takes precedence over pattern

  • raw – if True return the meta data objects

  • filter – specify additional filter criteria, optional

Returns

List of files in store
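
For example (names are illustrative):

import omegaml as om

om.datasets.list()                    # all objects
om.datasets.list('sales*')            # unix-style pattern
om.datasets.list(regexp='sales.*')    # regexp takes precedence over pattern
om.datasets.list('sales*', raw=True)  # return Metadata objects instead of names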

make_metadata(name, kind, bucket=None, prefix=None, **kwargs)

create or update a metadata object

this retrieves a Metadata object if it exists given the kwargs. Only the name, prefix and bucket arguments are considered

for existing Metadata objects, the attributes kw is treated as follows:

  • attributes=None, the existing attributes are left as is

  • attributes={}, the attributes value on an existing metadata object is reset to the empty dict

  • attributes={ some : value }, the existing attributes are updated

For new metadata objects, attributes defaults to {} if not specified, else is set as provided.

Parameters
  • name – the object name

  • bucket – the bucket, optional, defaults to self.bucket

  • prefix – the prefix, optional, defaults to self.prefix

metadata(name=None, bucket=None, prefix=None, version=-1)

Returns a metadata document for the given entry name

FIXME: version attribute does not do anything
FIXME: metadata should be stored in a bucket-specific collection to enable access control, see https://docs.mongodb.com/manual/reference/method/db.createRole/#db.createRole

property mongodb

Returns a mongo database object

object_store_key(name, ext, hashed=False)

Returns the store key

Unless you write a mixin or a backend you should not use this method

Parameters
  • name – The name of object

  • ext – The extension of the filename

  • hashed – hash the key to support arbitrary name length, defaults to False, will default to True in future versions

Returns

A filename with relative bucket, prefix and name

put(obj, name, attributes=None, kind=None, replace=False, **kwargs)

Stores an object: estimators, pipelines, numpy arrays or pandas dataframes

put_dataframe_as_dfgroup(obj, name, groupby, attributes=None)

store a dataframe grouped by columns in a mongo document

Example

# each group
{
    # group keys
    key: val,
    _data: [
        # only data keys
        { key: val, ... }
    ]
}

put_dataframe_as_documents(obj, name, append=None, attributes=None, index=None, timestamp=None, chunksize=None)

store a dataframe as a row-wise collection of documents

Parameters
  • obj – the dataframe to store

  • name – the name of the item in the store

  • append – if False collection will be dropped before inserting, if True existing documents will persist. Defaults to True. If not specified and rows have been previously inserted, will issue a warning.

  • index – list of columns, using +, -, @ as a column prefix to specify ASCENDING, DESCENDING, GEOSPHERE respectively. For @ the column has to represent a valid GeoJSON object.

  • timestamp – if True or a field name adds a timestamp. If the value is a boolean or datetime, uses _created as the field name. The timestamp is always datetime.datetime.utcnow(). May be overridden by specifying the tuple (col, datetime).

Returns

the Metadata object created
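
These parameters can be passed through .put(), which forwards kwargs to this method for dataframes. A sketch (names are illustrative):

import omegaml as om
import pandas as pd

df = pd.DataFrame({'x': [1, 2], 'y': [3, 4]})
# +x creates an ASCENDING index on x, -y a DESCENDING index on y;
# timestamp=True adds a _created datetime to every document
om.datasets.put(df, 'indexed-data', index=['+x', '-y'], timestamp=True)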

put_ndarray_as_hdf(obj, name, attributes=None)

store numpy array as hdf

this is a hack, converting the array to a dataframe then storing it

put_pyobj_as_document(obj, name, attributes=None, append=True)

store a dict as a document

Similar to put_dataframe_as_documents, no data is replaced by default; that is, obj is appended as new documents into the object's mongo collection. To replace the data, specify append=False.

put_pyobj_as_hdf(obj, name, attributes=None)

store list, tuple, dict as hdf

this requires the list, tuple or dict to be convertible into a dataframe

rebuild_params(kwargs, collection)

Returns a modified set of parameters for querying mongodb based on how the mongo document is structured and the fields the document is grouped by.

Note: Explicitly to be used with get_grouped_data only

Parameters
  • kwargs – Mongo filter arguments

  • collection – The name of mongodb collection

Returns

Returns a set of parameters as a dictionary.

register_backend(kind, backend)

register a backend class

Parameters
  • kind – (str) the backend kind

  • backend – (class) the backend class
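
For example, a hedged sketch of a custom data backend (the class, its kind string, and the bytes-based supports() check are hypothetical; put()/get() are omitted for brevity):

import omegaml as om
from omegaml.backends.basedata import BaseDataBackend

class MyRawBackend(BaseDataBackend):
    # hypothetical kind identifier for this backend
    KIND = 'myraw.bytes'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        # claim raw bytes objects for this backend
        return isinstance(obj, bytes)

om.datasets.register_backend('myraw.bytes', MyRawBackend)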

register_backends()

register backends in defaults.OMEGA_STORE_BACKENDS

register_mixin(mixincls)

register a mixin class

Parameters

mixincls – (class) the mixin class

property tmppath

return an instance-specific temporary path

omegaml.backends

class omegaml.backends.basedata.BaseDataBackend(model_store=None, data_store=None, **kwargs)

OmegaML BaseDataBackend to be subclassed by other arbitrary backends

This provides the abstract interface for any data backend to be implemented

get(name, version=-1, force_python=False, lazy=False, **kwargs)

get an obj

Parameters

name – the name of the object (str)

Returns

the object as it was originally stored

getl(*args, **kwargs)

get a lazy implementation to access the obj

A lazy implementation is a proxy to the object that can be evaluated using the .value property. The proxy should ensure that any operations applied on the object are delayed until the .value property is accessed. Typically this is to ensure that the actual computation is executed on the cluster, not on the local machine.

Parameters

name – the name of the object (str)

Returns

the proxy to the object as it was originally stored

put(obj, name, attributes=None, **kwargs)

put an obj

Parameters
  • obj – the object to store (object)

  • name – the name of the object (str)

  • attributes – the attributes dict (dict, optional)

  • kwargs – other kwargs to be passed to the Metadata object

Returns

the Metadata object

classmethod supports(obj, name, **kwargs)

test if this backend supports this obj

class omegaml.backends.basemodel.BaseModelBackend(model_store=None, data_store=None, **kwargs)

OmegaML BaseModelBackend to be subclassed by other arbitrary backends

This provides the abstract interface for any model backend to be implemented. Subclass to implement custom backends.

Essentially a model backend:

  • provides methods to serialize and deserialize a machine learning model for a given ML framework

  • offers fit() and predict() methods to be called by the runtime

  • offers additional methods such as score(), partial_fit(), transform()

Model backends are the middleware that connects the om.models API to specific frameworks. This class makes it simple to implement a model backend by offering a common syntax as well as a default implementation for get() and put().

Methods to implement:

# for model serialization (mandatory)

  • supports() (classmethod) - determine if backend supports given model instance

  • _package_model() - serialize a model instance into a temporary file

  • _extract_model() - deserialize the model from a file-like

Both methods provide readily set up temporary file names so that all you have to do is actually save the model to the given output file and restore the model from the given input file, respectively. All other logic has already been implemented (see get_model and put_model methods).

# for fitting and predicting (mandatory)

  • fit()

  • predict()

# other methods (optional)

  • fit_transform() - fit and return a transformed dataset

  • partial_fit() - fit incrementally

  • predict_proba() - predict probabilities

  • score() - score fitted classifier vs test dataset
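
To make this concrete, a hedged sketch of a pickle-based model backend. The hook signatures for _package_model()/_extract_model() are assumptions derived from the description above, so verify them against the base class; a complete backend would also implement fit() and predict():

import pickle

from omegaml.backends.basemodel import BaseModelBackend

class PickleModelBackend(BaseModelBackend):
    # hypothetical kind identifier
    KIND = 'pickle.model'

    @classmethod
    def supports(cls, obj, name, **kwargs):
        # determine if this backend supports the given model instance
        return hasattr(obj, 'fit') and hasattr(obj, 'predict')

    def _package_model(self, model, key, tmpfn, **kwargs):
        # serialize the model instance into the prepared temporary file
        with open(tmpfn, 'wb') as fout:
            pickle.dump(model, fout)
        return tmpfn

    def _extract_model(self, infile, key, tmpfn, **kwargs):
        # deserialize the model from a file-like
        return pickle.load(infile)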

fit(modelname, Xname, Yname=None, pure_python=True, **kwargs)

fit the model with data

Parameters
  • modelname – the name of the model object

  • Xname – the name of the X data set

  • Yname – the name of the Y data set

  • pure_python – if True return a python object. If False return a dataframe. Defaults to True to support any client.

  • kwargs – kwargs passed to the model's fit method

Returns

return the meta data object of the model

fit_transform(modelname, Xname, Yname=None, rName=None, pure_python=True, **kwargs)

fit and transform using data

Parameters
  • modelname – the name of the model object

  • Xname – the name of the X data set

  • Yname – the name of the Y data set

  • rName – the name of the transform's result data object or None

  • pure_python – if True return a python object. If False return a dataframe. Defaults to True to support any client.

  • kwargs – kwargs passed to the model’s transform method

Returns

return the meta data object of the model

get(name, **kwargs)

retrieve a model

Parameters
  • name – the name of the object

  • version – the version of the object (not supported)

get_model(name, version=-1, **kwargs)

Retrieves a pre-stored model

partial_fit(modelname, Xname, Yname=None, pure_python=True, **kwargs)

partially fit the model with data (online)

Parameters
  • modelname – the name of the model object

  • Xname – the name of the X data set

  • Yname – the name of the Y data set

  • pure_python – if True return a python object. If False return a dataframe. Defaults to True to support any client.

  • kwargs – kwargs passed to the model's partial_fit method

Returns

return the meta data object of the model

predict(modelname, Xname, rName=None, pure_python=True, **kwargs)

predict using data stored in Xname

Parameters
  • modelname – the name of the model object

  • Xname – the name of the X data set

  • rName – the name of the result data object or None

  • pure_python – if True return a python object. If False return a dataframe. Defaults to True to support any client.

  • kwargs – kwargs passed to the model’s predict method

Returns

return the predicted outcome

predict_proba(modelname, Xname, rName=None, pure_python=True, **kwargs)

predict the probability using data stored in Xname

Parameters
  • modelname – the name of the model object

  • Xname – the name of the X data set

  • rName – the name of the result data object or None

  • pure_python – if True return a python object. If False return a dataframe. Defaults to True to support any client.

  • kwargs – kwargs passed to the model’s predict method

Returns

return the predicted outcome

put(obj, name, **kwargs)

store a model

Parameters
  • obj – the model object to be stored

  • name – the name of the object

  • attributes – attributes for meta data

put_model(obj, name, attributes=None, _kind_version=None, **kwargs)

Packages a model using joblib and stores in GridFS

score(modelname, Xname, Yname=None, rName=True, pure_python=True, **kwargs)

score using data

Parameters
  • modelname – the name of the model object

  • Xname – the name of the X data set

  • Yname – the name of the Y data set

  • rName – the name of the score's result data object or None

  • pure_python – if True return a python object. If False return a dataframe. Defaults to True to support any client.

  • kwargs – kwargs passed to the model's score method

Returns

return the score result

classmethod supports(obj, name, **kwargs)

test if this backend supports this obj

transform(modelname, Xname, rName=None, **kwargs)

transform using data

Parameters
  • modelname – the name of the model object

  • Xname – the name of the X data set

  • rName – the name of the transform's result data object or None

  • kwargs – kwargs passed to the model’s transform method

Returns

return the transform data of the model

omegaml.mixins

class omegaml.mixins.store.ProjectedMixin

An OmegaStore mixin to process column specifications in dataset name

get(name, *args, **kwargs)

Return a projected dataset given a name of form name[colspec]

colspec can be any of

  • a comma separated list of columns, e.g. foo[a,b]

  • an open-ended slice, e.g. foo[a:] => all columns following a, inclusive

  • a closed slice, e.g. foo[a:b] => all columns between a,b, inclusive

  • a close-ended slice, e.g. foo[:b] => all columns up to b, inclusive

  • an empty slice, e.g. foo[:] => all columns

  • a list of columns to exclude, e.g. foo[^b] => all columns except b

Parameters

name – (str) the name of the dataset, optionally including a column specification

Returns

the dataset with projected columns
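
For example (a sketch; a dataset 'foo' with columns a, b, c is assumed):

import omegaml as om

om.datasets.get('foo[a,b]')  # only columns a and b
om.datasets.get('foo[a:b]')  # columns a through b, inclusive
om.datasets.get('foo[^b]')   # all columns except b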

class omegaml.mixins.mdf.FilterOpsMixin

filter operators on MSeries

class omegaml.mixins.mdf.ApplyMixin(*args, **kwargs)

Implements the apply() mixin supporting arbitrary functions to build aggregation pipelines

Note that .apply() does not execute immediately. Instead it builds an aggregation pipeline that is executed on MDataFrame.value. Note that .apply() calls cannot be cascaded yet, i.e. a later .apply() will override a previous .apply().

See ApplyContext for usage examples.
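
A minimal sketch of the pattern (assuming a stored dataframe 'foo' with a numeric column):

import omegaml as om

mdf = om.datasets.getl('foo')
# builds an aggregation pipeline; nothing executes yet
pending = mdf.apply(lambda v: v * 2)
# the pipeline runs when .value is accessed
df = pending.value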

persist()

Execute and store results in cache

Any pipeline of the same operations, in the same order, on the same collection will return the same result.

reset_cache(full=False)

Reset the apply cache

Parameters

full – if True will reset all caches for the collection, if False will only remove the cache for the specific .apply operations

Returns

class omegaml.mixins.mdf.ApplyArithmetics

Math operators for ApplyContext

  • __mul__ (*)

  • __add__ (+)

  • __sub__ (-)

  • __div__ (/)

  • __floordiv__ (//)

  • __mod__ (%)

  • __pow__ (pow)

  • __ceil__ (ceil)

  • __floor__ (floor)

  • __trunc__ (trunc)

  • __abs__ (abs)

  • sqrt (math.sqrt)

__pow__ = None

pow

sqrt(other)

square root

class omegaml.mixins.mdf.ApplyDateTime

Datetime operators for ApplyContext

property day

dayOfMonth

property dayofweek

dayOfWeek

property dayofyear

dayOfYear

property hour
property millisecond
property minute
property month
property second
property week

isoWeek

property year

class omegaml.mixins.mdf.ApplyString

String operators

concat(other, *args)
index(other, *args)

indexOfBytes

split(other, *args)
strcasecmp(other, *args)
substr(other, *args)
usplit(other, *args)

split

class omegaml.mixins.mdf.ApplyAccumulators

omegaml.runtimes

class omegaml.runtimes.OmegaRuntime(omega, bucket=None, defaults=None, celeryconf=None)

omegaml compute cluster gateway

job(jobname, require=None)

return a job for remote execution

Args:

require (dict): routing requirements for this job

model(modelname, require=None)

return a model for remote execution

Args:

require (dict): routing requirements for this job

ping(require=None, *args, **kwargs)

ping the runtimes

Args:

require (dict): routing requirements for this job
args (tuple): task args
kwargs (dict): task kwargs

require(label=None, always=False, **kwargs)

specify requirements for the task execution

Use this to specify resource or routing requirements on the next task call sent to the runtime. Any requirements will be reset after the call has been submitted.

Args:

always (bool): if True requirements will persist across task calls, defaults to False
label (str): the label required by the worker to have a runtime task dispatched to it
kwargs: requirements specification that the runtime understands

Usage:

om.runtime.require(label='gpu').model('foo').fit(...)

Returns:

self

script(scriptname, require=None)

return a script for remote execution

Args:

require (dict): routing requirements for this job

settings(require=None)

return the runtime's cluster settings

task(name)

retrieve the task function from the celery instance

Args:

kwargs (dict): routing keywords to CeleryTask.apply_async

class omegaml.runtimes.OmegaModelProxy(modelname, runtime=None)

proxy to a remote model in a celery worker

The proxy provides the same methods as the model but will execute the methods using celery tasks and return celery AsyncResult objects

Usage:

om = Omega()
# train a model
# result is AsyncResult, use .get() to return its result
result = om.runtime.model('foo').fit('datax', 'datay')
result.get()

# predict
result = om.runtime.model('foo').predict('datax')
# result is AsyncResult, use .get() to return its result
print(result.get())
apply_mixins()

apply mixins in defaults.OMEGA_RUNTIME_MIXINS

task(name)

return the task from the runtime with requirements applied

class omegaml.runtimes.OmegaJobProxy(jobname, runtime=None)

proxy to a remote job in a celery worker

Usage:

om = Omega()
# result is AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').run()
result.get()

# result is AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').schedule()
result.get()
run(**kwargs)

run the job

Returns

the result

schedule(**kwargs)

schedule the job

class omegaml.runtimes.OmegaRuntimeDask(omega, dask_url=None)

omegaml compute cluster gateway to a dask distributed cluster

set environ DASK_DEBUG=1 to run dask tasks locally

job(jobname)

return a job for remote execution

model(modelname)

return a model for remote execution

settings()

return the runtime's cluster settings

task(name, **kwargs)

retrieve the task function from the task module

This retrieves the task function and wraps it into a DaskTask. DaskTask mimics a celery task and is called on the cluster using .delay(), the same way we call a celery task. .delay() will return a DaskAsyncResult, supporting the celery .get() semantics. This way we can use the same proxy objects, as all they do is call .delay() and return an AsyncResult.

omegaml.documents

class omegaml.documents.Metadata

Metadata stores information about objects in OmegaStore

attributes

user-defined custom meta attributes

bucket

bucket

collection

for PANDAS_DFROWS this is the collection

created

created datetime

gridfile

for PANDAS_HDF and SKLEARN_JOBLIB this is the gridfile

kind

kind of data

kind_meta

omegaml technical attributes, e.g. column indices

modified

modified datetime

name

this is the name of the data

objid

for PYTHON_DATA this is the actual document

prefix

prefix

s3file

s3file attributes

uri

location URI

omegaml.jobs

class omegaml.notebook.jobs.OmegaJobs(prefix=None, store=None, defaults=None)

Omega Jobs API

create(code, name)

create a notebook from code

Parameters
  • code – the code as a string

  • name – the name of the job to create

Returns

the metadata object created

drop_schedule(name)

Drop an existing schedule, if any

This will drop any existing schedule and any pending triggers of event-kind ‘scheduled’.

Args:

name (str): the name of the job

Returns:

Metadata

export(name, localpath, format='html')

Export a job or result file to HTML

The job is exported in the given format.

Parameters
  • name – the name of the job, as in jobs.get

  • localpath – the path of the local file to write. If you specify an empty path or ‘memory’ a tuple of (body, resource) is returned instead

  • format – the output format. currently only 'html' is supported

Returns

the (data, resources) tuple as returned by nbconvert. For format html data is the HTML’s body, for PDF it is the pdf file contents

get(name)

Retrieve a notebook and return a NotebookNode

get_collection(collection)

returns the collection object

get_notebook_config(nb_filename)

returns the omegaml script config in the notebook's first cell

If there is no config cell or the config cell is invalid raises a ValueError

get_schedule(name, only_pending=False)

return the cron schedule and corresponding triggers

Args:

name (str): the name of the job

Returns:

tuple of (run_at, triggers)

run_at (str): the cron spec, None if not scheduled
triggers (list): the list of triggers

list(pattern=None, regexp=None, raw=False, **kwargs)

List all jobs matching the filter. The filter is a regex on the name of the ipynb entry. The default is all, i.e. .*

put(obj, name, attributes=None)

Store a NotebookNode

Parameters
  • obj – the NotebookNode to store

  • name – the name of the notebook

run(name)

Run a job immediately

The job is run and the results are stored in the given filename

Parameters

name – the name of the jobfile

Returns

the metadata of the job

run_notebook(name, event=None)

Run a given notebook immediately. The name parameter is the name of the job script as in ipynb. Inserts and returns the Metadata document for the job.

schedule(nb_file, run_at=None, last_run=None)

Schedule a processing of a notebook as per the interval specified on the job script
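
Putting the jobs API together, a hedged sketch (the job name and code are illustrative; the cron format of run_at is an assumption):

import omegaml as om

# create a notebook job from a code string
om.jobs.create("print('hello')", 'myjob')

# run it immediately; returns the job's Metadata
meta = om.jobs.run('myjob')

# schedule it, e.g. every 5 minutes (cron spec assumed)
om.jobs.schedule('myjob', run_at='*/5 * * * *')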

omegajobs

class omegaml.notebook.omegacontentsmgr.OmegaStoreContentsManager(**kwargs)

Jupyter notebook storage manager for omegaml

Adapted from notebook/services/contents/filemanager.py

This requires a properly configured omegaml instance. See http://jupyter-notebook.readthedocs.io/en/stable/extending/contents.html

delete_file(path)

delete an entry

this is called by the contents engine to delete an entry

dir_exists(path='')

check if directory exists

Args:

path: name of directory

Returns:

True if directory exists

exists(path)

Does a file or directory exist at the given path in GridFS? Since GridFS has no real directories, dir_exists returns True.

Parameters

path – (str) The relative path to the file’s directory (with ‘/’ as separator)

Returns

(bool) True if a file or directory exists at the given path

file_exists(path)

check if file exists

Args:

path: name of file

Returns:

True if file exists

get(path, content=True, type=None, format=None)

get an entry in the store

this is called by the contents engine to get the contents of the jobs store.

is_hidden(path)

check if path or file is hidden

Args:

path: name of file or path

Returns:

False, currently always returns false

property omega

return the omega instance used by the contents manager

rename_file(old_path, new_path)

rename a file

this is called by the contents engine to rename an entry

save(model, path)

save an entry in the store

this is called by the contents engine to store a notebook

property store

return the OmegaStore for jobs (notebooks)