Public API

Python API (overview)

omegaml.datasets

the OmegaStore for datasets

omegaml.models

the OmegaStore for models

omegaml.runtimes

the cluster runtime API

omegaml.jobs

the jobs API

omegaml.scripts

the OmegaStore for lambda scripts

omegaml.store.base.OmegaStore([mongo_url, …])

The storage backend for models and data

omegaml.runtimes.OmegaRuntime(omega[, …])

omegaml compute cluster gateway

omegaml.runtimes.OmegaRuntimeDask(omega[, …])

omegaml compute cluster gateway to a dask distributed cluster

omegaml.runtimes.OmegaModelProxy(modelname)

proxy to a remote model in a celery worker

omegaml.runtimes.OmegaJobProxy(jobname[, …])

proxy to a remote job in a celery worker

omegaml.notebook.jobs.OmegaJobs([prefix, …])

Omega Jobs API

omegaml.mdataframe.MDataFrame(collection[, …])

A DataFrame for mongodb

omegaml.mdataframe.MGrouper(mdataframe, …)

a Grouper for MDataFrames

omegaml.mdataframe.MLocIndexer(mdataframe[, …])

implements the LocIndexer for MDataFrames

omegaml.mdataframe.MPosIndexer(mdataframe)

implements the position-based indexer for MDataFrames

omegaml.backends.sqlalchemy.SQLAlchemyBackend([…])

sqlalchemy plugin for omegaml

Python API

omega|ml

omegaml.datasets - storage for data

the OmegaStore for datasets

omegaml.models - storage for models

the OmegaStore for models

omegaml.runtimes - the cluster runtime API
omegaml.notebook.jobs - the lambda compute service

omegaml.store

class omegaml.store.base.OmegaStore(mongo_url=None, bucket=None, prefix=None, kind=None, defaults=None, dbalias=None)

The storage backend for models and data

collection(name=None, bucket=None, prefix=None)

Returns a mongo db collection as a datastore

If there is an existing object of name, will return the .collection of the object. Otherwise returns the collection according to naming convention {bucket}.{prefix}.{name}.datastore

Parameters

name – the collection to use. If None, defaults to the collection name given on instantiation. The actual collection name used is always prefix + name + '.data'
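
For illustration, a minimal sketch of accessing the backing collection; the dataset name 'sales' is hypothetical:

# returns the pymongo collection backing the 'sales' dataset
coll = om.datasets.collection('sales')
print(coll.name)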

drop(name, force=False, version=-1)

Drop the object

Parameters
  • name – The name of the object

  • force – If True ignores the DoesNotExist exception, defaults to False meaning a DoesNotExist exception is raised if the name does not exist

Returns

True if object was deleted, False if not. If force is True and the object does not exist it will still return True
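
For example, to remove a dataset regardless of whether it exists (the name is illustrative):

# force=True suppresses DoesNotExist and returns True either way
om.datasets.drop('sales', force=True)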

property fs

Retrieve a gridfs instance using url and collection provided

Returns

a gridfs instance

get(name, version=-1, force_python=False, kind=None, **kwargs)

Retrieve an object

Parameters
  • name – The name of the object

  • version – Version of the stored object (not supported)

  • force_python – Return as a python object

  • kwargs – kwargs depending on object kind

Returns

an object, estimator, pipelines, data array or pandas dataframe previously stored with put()
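
A minimal put/get round trip, as a sketch (dataset name and columns illustrative); the filtered variant assumes the column__op=value syntax described for get_dataframe_documents:

import pandas as pd

df = pd.DataFrame({'x': range(10)})
om.datasets.put(df, 'mydata')
# retrieve the full dataframe
df2 = om.datasets.get('mydata')
# retrieve a filtered subset using column__op=value kwargs
df3 = om.datasets.get('mydata', x__gt=5)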

get_backend(name, model_store=None, data_store=None, **kwargs)

return the backend by a given object name

Parameters
  • name – the name of the object

  • model_store – the OmegaStore instance used to store models

  • data_store – the OmegaStore instance used to store data

  • kwargs – the kwargs passed to the backend initialization

Returns

the backend

get_backend_bykind(kind, model_store=None, data_store=None, **kwargs)

return the backend by a given object kind

Parameters
  • kind – The object kind

  • model_store – the OmegaStore instance used to store models

  • data_store – the OmegaStore instance used to store data

  • kwargs – the kwargs passed to the backend initialization

Returns

the backend

get_backend_byobj(obj, name, kind=None, attributes=None, model_store=None, data_store=None, **kwargs)

return the matching backend for the given obj

Returns:

the first backend that supports the given parameters or None

get_dataframe_dfgroup(name, version=-1, kwargs=None)

Return a grouped dataframe

Parameters
  • name – the name of the object

  • version – not supported

  • kwargs – mongo db query arguments to be passed to collection.find() as a filter.

get_dataframe_documents(name, columns=None, lazy=False, filter=None, version=-1, is_series=False, chunksize=None, **kwargs)

Internal method to return DataFrame from documents

Parameters
  • name – the name of the object (str)

  • columns – the column projection as a list of column names

  • lazy – if True returns a lazy representation as an MDataFrame. If False retrieves all data and returns a DataFrame (default)

  • filter – the filter to be applied as a column__op=value dict

  • version – the version to retrieve (not supported)

  • is_series – if True returns a Series instead of a DataFrame

  • kwargs – remaining kwargs are used as a filter. The filter kwarg overrides other kwargs.

Returns

the retrieved object (DataFrame, Series or MDataFrame)

get_dataframe_hdf(name, version=-1)

Retrieve dataframe from hdf

Parameters
  • name – The name of object

  • version – The version of object (not supported)

Returns

Returns a python pandas dataframe

Raises

gridfs.errors.NoFile

get_object_as_python(meta, version=-1)

Retrieve object as python object

Parameters
  • meta – The metadata object

  • version – The version of the object

Returns

Returns data as python object

get_python_data(name, version=-1, **kwargs)

Retrieve objects as python data

Parameters
  • name – The name of object

  • version – The version of object

Returns

Returns the object as python list object

getl(*args, **kwargs)

return a lazy MDataFrame for a given object

Same as .get, but returns an MDataFrame
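
A sketch of the lazy variant (dataset name illustrative):

# no data is transferred until .value is accessed
mdf = om.datasets.getl('mydata')
df = mdf.value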

list(pattern=None, regexp=None, kind=None, raw=False, hidden=None, include_temp=False, bucket=None, prefix=None, filter=None)

List all files in store

specify pattern as a unix pattern (e.g. models/*), or specify regexp

Parameters
  • pattern – the unix file pattern or None for all

  • regexp – the regexp. takes precedence over pattern

  • raw – if True return the meta data objects

  • filter – specify additional filter criteria, optional

Returns

List of files in store
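
For example (pattern and names illustrative):

# all datasets whose name starts with sales/
om.datasets.list('sales/*')
# the same, returning Metadata objects instead of names
om.datasets.list('sales/*', raw=True)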

make_metadata(name, kind, bucket=None, prefix=None, **kwargs)

create or update a metadata object

this retrieves a Metadata object if it exists given the kwargs. Only the name, prefix and bucket arguments are considered

for existing Metadata objects, the attributes kw is treated as follows:

  • attributes=None, the existing attributes are left as is

  • attributes={}, the attributes value on an existing metadata object is reset to the empty dict

  • attributes={ some : value }, the existing attributes are updated

For new metadata objects, attributes defaults to {} if not specified, else is set as provided.

Parameters
  • name – the object name

  • bucket – the bucket, optional, defaults to self.bucket

  • prefix – the prefix, optional, defaults to self.prefix

metadata(name=None, bucket=None, prefix=None, version=-1)

Returns a metadata document for the given entry name

FIXME: the version attribute does not do anything

FIXME: metadata should be stored in a bucket-specific collection to enable access control, see https://docs.mongodb.com/manual/reference/method/db.createRole/#db.createRole

property mongodb

Returns a mongo database object

object_store_key(name, ext, hashed=False)

Returns the store key

Unless you write a mixin or a backend you should not use this method

Parameters
  • name – The name of object

  • ext – The extension of the filename

  • hashed – hash the key to support arbitrary name length, defaults to False, will default to True in future versions

Returns

A filename with relative bucket, prefix and name

put(obj, name, attributes=None, kind=None, replace=False, **kwargs)

Stores an object: estimators, pipelines, numpy arrays or pandas dataframes
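
A sketch of storing a dataframe with custom attributes (name and attributes illustrative, df is assumed to be a pandas DataFrame):

# the attributes are stored on the resulting Metadata object
meta = om.datasets.put(df, 'mydata', attributes={'source': 'csv import'})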

put_dataframe_as_dfgroup(obj, name, groupby, attributes=None)

store a dataframe grouped by columns in a mongo document

Example

# each group
{
    # group keys
    key: val,
    _data: [
        # only data keys
        { key: val, ... }
    ]
}

put_dataframe_as_documents(obj, name, append=None, attributes=None, index=None, timestamp=None, chunksize=None)

store a dataframe as a row-wise collection of documents

Parameters
  • obj – the dataframe to store

  • name – the name of the item in the store

  • append – if False collection will be dropped before inserting, if True existing documents will persist. Defaults to True. If not specified and rows have been previously inserted, will issue a warning.

  • index – list of columns, using +, -, @ as a column prefix to specify ASCENDING, DESCENDING, GEOSPHERE respectively. For @ the column has to represent a valid GeoJSON object.

  • timestamp – if True or a field name, adds a timestamp. If the value is a boolean or datetime, uses _created as the field name. The timestamp is always datetime.datetime.utcnow(). May be overridden by specifying the tuple (col, datetime).

Returns

the Metadata object created
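
Since put() dispatches dataframes to this method, the index and timestamp kwargs can be passed through put(). A sketch (dataset and column names illustrative):

# replace existing documents, index ascending on x and descending on y,
# and add a _created timestamp to each document
meta = om.datasets.put(df, 'events', append=False,
                       index=['+x', '-y'], timestamp=True)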

put_ndarray_as_hdf(obj, name, attributes=None)

store numpy array as hdf

this is a hack: the array is converted to a dataframe, then stored

put_pyobj_as_document(obj, name, attributes=None, append=True)

store a dict as a document

similar to put_dataframe_as_documents, no data is replaced by default. That is, obj is appended as new documents into the object's mongo collection. To replace the data, specify append=False.

put_pyobj_as_hdf(obj, name, attributes=None)

store list, tuple, dict as hdf

this requires the list, tuple or dict to be convertible into a dataframe

rebuild_params(kwargs, collection)

Returns a modified set of parameters for querying mongodb based on how the mongo document is structured and the fields the document is grouped by.

Note: to be used with get_grouped_data only

Parameters
  • kwargs – Mongo filter arguments

  • collection – The name of mongodb collection

Returns

Returns a set of parameters as dictionary.

register_backend(kind, backend)

register a backend class

Parameters
  • kind – (str) the backend kind

  • backend – (class) the backend class

register_backends()

register backends in defaults.OMEGA_STORE_BACKENDS

register_mixin(mixincls)

register a mixin class

Parameters

mixincls – (class) the mixin class

property tmppath

return an instance-specific temporary path

omegaml.runtimes

class omegaml.runtimes.OmegaRuntime(omega, bucket=None, defaults=None, celeryconf=None)

omegaml compute cluster gateway

job(jobname, require=None)

return a job for remote execution

Args:

require (dict): routing requirements for this job

model(modelname, require=None)

return a model for remote execution

Args:

require (dict): routing requirements for this job

ping(require=None, *args, **kwargs)

ping the runtimes

Args:

require (dict): routing requirements for this job
args (tuple): task args
kwargs (dict): task kwargs

require(label=None, always=False, **kwargs)

specify requirements for the task execution

Use this to specify resource or routing requirements on the next task call sent to the runtime. Any requirements will be reset after the call has been submitted.

Args:

always (bool): if True requirements will persist across task calls. Defaults to False
label (str): the label required by the worker to have a runtime task dispatched to it
kwargs: requirements specification that the runtime understands

Usage:

om.runtime.require(label='gpu').model('foo').fit(...)

Returns:

self

script(scriptname, require=None)

return a script for remote execution

Args:

require (dict): routing requirements for this job

settings(require=None)

return the runtime's cluster settings

task(name)

retrieve the task function from the celery instance

Args:

name (str): the task name
kwargs (dict): routing keywords to CeleryTask.apply_async

class omegaml.runtimes.OmegaRuntimeDask(omega, dask_url=None)

omegaml compute cluster gateway to a dask distributed cluster

set environ DASK_DEBUG=1 to run dask tasks locally

job(jobname)

return a job for remote execution

model(modelname)

return a model for remote execution

settings()

return the runtime's cluster settings

task(name, **kwargs)

retrieve the task function from the task module

This retrieves the task function and wraps it into a DaskTask. DaskTask mimics a celery task and is called on the cluster using .delay(), the same way we call a celery task. .delay() will return a DaskAsyncResult, supporting the celery .get() semantics. This way we can use the same proxy objects, as all they do is call .delay() and return an AsyncResult.

class omegaml.runtimes.OmegaModelProxy(modelname, runtime=None)

proxy to a remote model in a celery worker

The proxy provides the same methods as the model but will execute the methods using celery tasks and return celery AsyncResult objects

Usage:

om = Omega()
# train a model
# result is an AsyncResult, use .get() to return its result
result = om.runtime.model('foo').fit('datax', 'datay')
result.get()

# predict
result = om.runtime.model('foo').predict('datax')
# result is an AsyncResult, use .get() to return its result
print(result.get())
apply_mixins()

apply mixins in defaults.OMEGA_RUNTIME_MIXINS

task(name)

return the task from the runtime with requirements applied

class omegaml.runtimes.OmegaJobProxy(jobname, runtime=None)

proxy to a remote job in a celery worker

Usage:

om = Omega()
# result is an AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').run()
result.get()

# result is an AsyncResult, use .get() to return its result
result = om.runtime.job('foojob').schedule()
result.get()
run(**kwargs)

run the job

Returns

the result

schedule(**kwargs)

schedule the job

omegaml.jobs

class omegaml.notebook.jobs.OmegaJobs(prefix=None, store=None, defaults=None)

Omega Jobs API

create(code, name)

create a notebook from code

Parameters
  • code – the code as a string

  • name – the name of the job to create

Returns

the metadata object created
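
For example, to create a runnable notebook from a code string (code and name illustrative):

code = "print('hello from omegaml')"
meta = om.jobs.create(code, 'hello')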

drop_schedule(name)

Drop an existing schedule, if any

This will drop any existing schedule and any pending triggers of event-kind ‘scheduled’.

Args:

name (str): the name of the job

Returns:

Metadata

export(name, localpath, format='html')

Export a job or result file to HTML

The job is exported in the given format.

Parameters
  • name – the name of the job, as in jobs.get

  • localpath – the path of the local file to write. If you specify an empty path or ‘memory’ a tuple of (body, resource) is returned instead

  • format – the output format. currently only 'html' is supported

Returns

the (data, resources) tuple as returned by nbconvert. For format html data is the HTML’s body, for PDF it is the pdf file contents

get(name)

Retrieve a notebook and return a NotebookNode

get_collection(collection)

returns the collection object

get_notebook_config(nb_filename)

returns the omegaml script config in the notebook's first cell

If there is no config cell or the config cell is invalid raises a ValueError

get_schedule(name, only_pending=False)

return the cron schedule and corresponding triggers

Args:

name (str): the name of the job

Returns:

tuple of (run_at, triggers)

run_at (str): the cron spec, None if not scheduled
triggers (list): the list of triggers

list(pattern=None, regexp=None, raw=False, **kwargs)

list all jobs matching the filter. The filter is a regex on the name of the ipynb entry. The default is all, i.e. .*

put(obj, name, attributes=None)

Store a NotebookNode

Parameters
  • obj – the NotebookNode to store

  • name – the name of the notebook

run(name)

Run a job immediately

The job is run and the results are stored in the given filename

Parameters

name – the name of the jobfile

Returns

the metadata of the job

run_notebook(name, event=None)

run a given notebook immediately. The name parameter is the name of the job script as in ipynb. Inserts and returns the Metadata document for the job.

schedule(nb_file, run_at=None, last_run=None)

Schedule processing of a notebook as per the interval specified on the job script

omegaml.mdataframe

class omegaml.mdataframe.MDataFrame(collection, columns=None, query=None, limit=None, skip=None, sort_order=None, force_columns=None, immediate_loc=False, auto_inspect=False, normalize=False, raw=False, parser=None, preparefn=None, from_loc_range=False, **kwargs)

A DataFrame for mongodb

Performs out-of-core, lazy computation on a mongodb cluster. Behaves like a pandas DataFrame. Actual results are returned as pandas DataFrames.

__len__()

the projected number of rows when resolving

count()

projected number of rows when resolving

create_index(keys, **kwargs)

create an index the easy way

groupby(columns, sort=True)

Group by a given set of columns

Parameters
  • columns – the list of columns

  • sort – if True sort by group key

Returns

MGrouper
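
A sketch of grouping and counting (dataset and column names illustrative):

mdf = om.datasets.getl('mydata')
# returns counts by group columns, see MGrouper.count()
counts = mdf.groupby(['x']).count()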

head(limit=10)

return up to limit number of rows

Parameters

limit – the number of rows to return. Defaults to 10

Returns

the MDataFrame

inspect(explain=False, cached=False, cursor=None, raw=False)

inspect this dataframe’s actual mongodb query

Parameters

explain – if True explains access path

iterchunks(chunksize=100)

return an iterator

Args:

chunksize (int): number of rows in each chunk

Returns:

a dataframe of max. length chunksize

list_indexes()

list all indices in database

property loc

Access by index

Use as mdf.loc[index_value]

Returns

MLocIndexer

merge(right, on=None, left_on=None, right_on=None, how='inner', target=None, suffixes=('_x', '_y'), sort=False, inspect=False)

Merge this dataframe with another dataframe. Only left outer joins are currently supported. The output is saved as a new collection, named target (defaults to a generated name if not specified).

Parameters
  • right – the other MDataFrame

  • on – the list of key columns to merge by

  • left_on – the list of the key columns to merge on this dataframe

  • right_on – the list of the key columns to merge on the other dataframe

  • how – the method to merge. supported are left, inner, right. Defaults to inner

  • target – the name of the collection to store the merge results in. If not provided a temporary name will be created.

  • suffixes – the suffixes to apply to identical left and right columns

  • sort – if True the merge results will be sorted. If False the MongoDB natural order is implied.

Returns

an MDataFrame on the merge target collection
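
A sketch of a merge between two stored datasets (names illustrative):

orders = om.datasets.getl('orders')
customers = om.datasets.getl('customers')
# the merge result is stored in a new collection; .value resolves it
merged = orders.merge(customers, on='customer_id', how='inner')
df = merged.value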

query(*args, **kwargs)

return a new MDataFrame with a filter criteria

Any subsequent operation on the new dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Note: Unlike pandas DataFrames, a filtered MDataFrame operates on the same collection as the original DataFrame

Parameters
  • args – a Q object or logical combination of Q objects (optional)

  • kwargs – all AND filter criteria

Returns

a new MDataFrame with the filter applied
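
For example, given an MDataFrame mdf (e.g. from om.datasets.getl) and assuming the column__op=value syntax used elsewhere in the store (column names illustrative):

# all rows where x > 5 and y equals 'a'; applied lazily
flt = mdf.query(x__gt=5, y='a')
df = flt.value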

query_inplace(*args, **kwargs)

filters this MDataFrame and returns it.

Any subsequent operation on the dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Parameters
  • args – a Q object or logical combination of Q objects (optional)

  • kwargs – all AND filter criteria

Returns

self

property shape

return shape of dataframe

skip(topn)

skip the topn number of rows

Parameters

topn – the number of rows to skip.

Returns

the MDataFrame

sort(columns)

sort by specified columns

Parameters

columns – str of single column or a list of columns. Sort order is specified as the + (ascending) or - (descending) prefix to the column name. Default sort order is ascending.

Returns

the MDataFrame
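
For example, given an MDataFrame mdf (column names illustrative):

# ascending by x, then descending by y; resolve the first 5 rows
df = mdf.sort(['+x', '-y']).head(5).value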

tail(limit=10)

return up to limit number of rows from the last inserted values

Parameters

limit – the number of rows to return. Defaults to 10

Returns

the MDataFrame

property value

resolve the query and return a Pandas DataFrame

Returns

the result of the query as a pandas DataFrame

class omegaml.mdataframe.MSeries(*args, **kwargs)

Series implementation for MDataFrames

behaves like a DataFrame but limited to one column.

__len__()

the projected number of rows when resolving

count()

projected number of rows when resolving

create_index(keys, **kwargs)

create an index the easy way

groupby(columns, sort=True)

Group by a given set of columns

Parameters
  • columns – the list of columns

  • sort – if True sort by group key

Returns

MGrouper

head(limit=10)

return up to limit number of rows

Parameters

limit – the number of rows to return. Defaults to 10

Returns

the MDataFrame

inspect(explain=False, cached=False, cursor=None, raw=False)

inspect this dataframe’s actual mongodb query

Parameters

explain – if True explains access path

iterchunks(chunksize=100)

return an iterator

Args:

chunksize (int): number of rows in each chunk

Returns:

a dataframe of max. length chunksize

list_indexes()

list all indices in database

property loc

Access by index

Use as mdf.loc[index_value]

Returns

MLocIndexer

merge(right, on=None, left_on=None, right_on=None, how='inner', target=None, suffixes=('_x', '_y'), sort=False, inspect=False)

Merge this dataframe with another dataframe. Only left outer joins are currently supported. The output is saved as a new collection, named target (defaults to a generated name if not specified).

Parameters
  • right – the other MDataFrame

  • on – the list of key columns to merge by

  • left_on – the list of the key columns to merge on this dataframe

  • right_on – the list of the key columns to merge on the other dataframe

  • how – the method to merge. supported are left, inner, right. Defaults to inner

  • target – the name of the collection to store the merge results in. If not provided a temporary name will be created.

  • suffixes – the suffixes to apply to identical left and right columns

  • sort – if True the merge results will be sorted. If False the MongoDB natural order is implied.

Returns

an MDataFrame on the merge target collection

query(*args, **kwargs)

return a new MDataFrame with a filter criteria

Any subsequent operation on the new dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Note: Unlike pandas DataFrames, a filtered MDataFrame operates on the same collection as the original DataFrame

Parameters
  • args – a Q object or logical combination of Q objects (optional)

  • kwargs – all AND filter criteria

Returns

a new MDataFrame with the filter applied

query_inplace(*args, **kwargs)

filters this MDataFrame and returns it.

Any subsequent operation on the dataframe will have the filter applied. To reset the filter call .reset() without arguments.

Parameters
  • args – a Q object or logical combination of Q objects (optional)

  • kwargs – all AND filter criteria

Returns

self

property shape

return shape of dataframe

skip(topn)

skip the topn number of rows

Parameters

topn – the number of rows to skip.

Returns

the MDataFrame

sort(columns)

sort by specified columns

Parameters

columns – str of single column or a list of columns. Sort order is specified as the + (ascending) or - (descending) prefix to the column name. Default sort order is ascending.

Returns

the MDataFrame

tail(limit=10)

return up to limit number of rows from the last inserted values

Parameters

limit – the number of rows to return. Defaults to 10

Returns

the MDataFrame

unique()

return the unique set of values for the series

Returns

MSeries

property value

return the value of the series

this is a Series unless unique() was called; if unique() was called, only distinct values are returned as an array, matching the behavior of a Series

Returns

pandas.Series

class omegaml.mdataframe.MGrouper(mdataframe, collection, columns, sort=True)

a Grouper for MDataFrames

agg(specs)

shortcut for .aggregate

aggregate(specs, **kwargs)

aggregate by given specs

See the following link for a list of supported operations. https://docs.mongodb.com/manual/reference/operator/aggregation/group/

Parameters

specs – a dictionary of { column : function | list[functions] } pairs.
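
A sketch of an aggregation on an MDataFrame mdf (e.g. from om.datasets.getl), assuming function names are given as strings per the MongoDB $group operators (column names illustrative):

# sum and mean of the sales column, per group
result = mdf.groupby(['region']).agg({'sales': ['sum', 'mean']})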

count()

return counts by group columns

class omegaml.mdataframe.MLocIndexer(mdataframe, positional=False)

implements the LocIndexer for MDataFrames

__getitem__(specs)

access by index

use as mdf.loc[specs] where specs is any of

  • a list or tuple of scalar index values, e.g. .loc[(1,2,3)]

  • a slice of values e.g. .loc[1:5]

  • a list of slices, e.g. .loc[1:5, 2:3]

Returns

the sliced part of the MDataFrame
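
For example, given an MDataFrame mdf:

mdf.loc[1:5]        # a slice of index values
mdf.loc[(1, 2, 3)]  # a tuple of scalar index values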

class omegaml.mdataframe.MPosIndexer(mdataframe)

implements the position-based indexer for MDataFrames

class omegaml.mixins.mdf.ApplyContext(caller, columns=None, index=None)

Enable apply functions

.apply(fn) will call fn(ctx) where ctx is an ApplyContext. The context supports methods to apply functions in a Pandas-style apply manner. ApplyContext is extensible by adding an extension class to defaults.OMEGA_MDF_APPLY_MIXINS.

Note that unlike a Pandas DataFrame, ApplyContext does not itself contain any data. Rather it is part of an expression tree, i.e. the aggregation pipeline. Thus any expressions applied are translated into operations on the expression tree. The expression tree is evaluated on MDataFrame.value, at which point neither the ApplyContext nor the function that created it are active.

Examples:

mdf.apply(lambda v: v * 5)               # multiply every column in dataframe
mdf.apply(lambda v: v['foo'].dt.week)    # get week of date for column foo
mdf.apply(lambda v: dict(a=v['foo'].dt.week,
                         b=v['bar'] * 5))  # run multiple pipelines and get results

The callable passed to apply can be any function. It can either return None,
the context passed in or a list of pipeline stages.

# apply any of the below functions
mdf.apply(customfn)

# same as lambda v: v.dt.week
def customfn(ctx):
    return ctx.dt.week

# simple pipeline
def customfn(ctx):
    ctx.project(x={'$multiply: ['$x', 5]})
    ctx.project(y={'$divide: ['$x', 2]})

# complex pipeline
def customfn(ctx):
    return [
        { '$match': ... },
        { '$project': ... },
    ]
add(stage)

Add a processing stage to the pipeline

see https://docs.mongodb.com/manual/meta/aggregation-quick-reference/

groupby(by, expr=None, append=None, **kwargs)

add a groupby accumulation using $group

Parameters
  • by – the groupby columns, if provided as a list will be transformed

  • expr

  • append

  • kwargs

Returns

project(expr=None, append=False, keep=False, **kwargs)

add a projection using $project

Parameters
  • expr – the column-operator mapping

  • append – if True add a $project stage, otherwise add to existing

  • kwargs – if expr is None, the column-operator mapping as kwargs

Returns

ApplyContext

class omegaml.mixins.mdf.ApplyArithmetics

Math operators for ApplyContext

  • __mul__ (*)

  • __add__ (+)

  • __sub__ (-)

  • __div__ (/)

  • __floordiv__ (//)

  • __mod__ (%)

  • __pow__ (pow)

  • __ceil__ (ceil)

  • __floor__ (floor)

  • __trunc__ (trunc)

  • __abs__ (abs)

  • sqrt (math.sqrt)

sqrt(other)

square root

class omegaml.mixins.mdf.ApplyDateTime

Datetime operators for ApplyContext

property day

dayOfMonth

property dayofweek

dayOfWeek

property dayofyear

dayOfYear

property hour
property millisecond
property minute
property month
property second
property week

isoWeek

property year
class omegaml.mixins.mdf.ApplyString

String operators

concat(other, *args)
index(other, *args)

indexOfBytes

split(other, *args)
strcasecmp(other, *args)
substr(other, *args)
usplit(other, *args)

split

class omegaml.mixins.mdf.ApplyAccumulators

Backends:

class omegaml.backends.sqlalchemy.SQLAlchemyBackend(model_store=None, data_store=None, **kwargs)

sqlalchemy plugin for omegaml

Usage:

# define your sqlalchemy connection
sqlalchemy_constr = f'sqlalchemy://{user}:{password}@{account}/'

Store in any of three ways:

# -- just the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.get('mysqlalchemy')
# => returns the sql connection object

# -- store the connection with a predefined sql
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', sql='select ...')
om.datasets.get('mysqlalchemy')
# => returns a pandas dataframe. specify chunksize to return an iterable of dataframes

# -- predefined sqls can contain variables to be resolved at access time;
#    if you do not specify required variables in sqlvars, a KeyError is raised
om.datasets.put(sqlalchemy_constr, 'myview', sql='select ... from col="{var}"')
om.datasets.get('myview', sqlvars=dict(var="value"))

Copy to an omega dataset from the connection:

# -- copy the result of the sqlalchemy query to omegaml
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', sql='select ...', copy=True)
om.datasets.get('mysqlalchemy')
# => returns a pandas dataframe (without executing any additional queries)
# => can also use om.datasets.getl('mysqlalchemy') to return an MDataFrame

Insert data via the connection:

# -- store data back through the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.put(df, 'mysqlalchemy', table='SOMETABLE')

Connection strings can contain variables, e.g. userid and password. By default variables are resolved from the os environment. They can also be specified using any dict.

# -- use a connection string with variables
sqlalchemy_constr = 'sqlite:///{dbname}.db'
om.datasets.put(sqlalchemy_constr, 'userdb')
om.datasets.get('userdb', secrets=dict(dbname='chuckdb'))

# -- alternatively, create a vault dataset:
secrets = dict(userid='chuck', dbname='chuckdb')
om.datasets.put(secrets, '_omega/vault')
om.datasets.get('userdb')

The '_omega/vault' dataset will be queried using the current userid as the secret name, and the dbname retrieved from the document. This is experimental and the vault is not encrypted.

Advanced:

om.datasets.put() supports the following additional keyword arguments

chunksize=int – specify the number of rows to read from sqlalchemy in one chunk. Defaults to 10000

parse_dates=['col', ...] – list of column names to parse for date, time or datetime. See pd.read_sql for details

transform=callable – a callable that is passed the DataFrame of each chunk before it is inserted into the database. Use to provide custom transformations. Only works with copy=True

as well as other kwargs supported by pd.read_sql
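
A sketch combining these kwargs on a copy; the connection string, sql and column names are illustrative, and the transform callable is assumed to return the transformed chunk:

om.datasets.put(sqlalchemy_constr, 'mysales',
                sql='select * from sales', copy=True,
                chunksize=5000, parse_dates=['order_date'],
                transform=lambda df: df.dropna())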

get(name, sql=None, chunksize=None, raw=False, sqlvars=None, secrets=None, index=True, keep=False, lazy=False, *args, **kwargs)

retrieve connection or query data from connection

Args:

name (str): the name of the connection
secrets (dict): dict to resolve variables in the connection string
keep (bool): if True the connection is kept open

Query data, specify sql='select ...':

sql (str): the sql query, defaults to the query specified on .put()
chunksize (int): the number of records for each chunk, if specified returns an iterator
sqlvars (dict): optional, if specified will be used to format sql

Get connection, specify raw=True:

raw (bool): if True returns the raw sqlalchemy connection

To reuse connections:

specify keep=True. Note this is potentially unsafe in a multi-user environment where connection strings contain user-specific secrets. If you want to always keep connections open, specify om.datasets.defaults.SQLALCHEMY_ALWAYS_CACHE=True

Returns:

connection or pd.DataFrame

put(obj, name, sql=None, copy=False, append=True, chunksize=None, transform=None, table=None, attributes=None, insert=False, secrets=None, *args, **kwargs)

store sqlalchemy connection or data into an existing connection

Args:

obj (str|pd.DataFrame): the sqlalchemy connection string or a dataframe object
name (str): the name of the object
table (str): optional, if specified is stored along with the connection
sql (str): optional, if specified is stored along with the connection
copy (bool): optional, if True the connection is queried using sql and the resulting data is stored instead, see Copying
attributes (dict): optional, set or update metadata.attributes

Copying data, specify copy=True:

sql (str): the sql to query
append (bool): if True the data is appended if it exists already
chunksize (int): the number of records to use per chunk
transform (callable): passed as DataFrame.to_sql(method=)

Inserting via connection, specify insert=True:

insert (bool): specify True to insert via the connection
table (str): the table name to use for inserting data
append (bool): if False will replace any existing table

Returns:

metadata

classmethod supports(obj, name, insert=False, data_store=None, model_store=None, *args, **kwargs)

test if this backend supports this obj