om.datasets

omegaml.datasets = OmegaStore(bucket=omegaml, prefix=data/)

the omegaml.store.base.OmegaStore store for datasets

Backends

class omegaml.backends.sqlalchemy.SQLAlchemyBackend(model_store=None, data_store=None, tracking=None, **kwargs)

sqlalchemy plugin for omegaml

Usage:

Define your sqlalchemy connection:

sqlalchemy_constr = f'sqlalchemy://{user}:{password}@{account}/'

Store the connection in any of three ways:

# -- just the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.get('mysqlalchemy', raw=True)
=> the sql connection object

# -- store connection with a predefined sql
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy',
                sql='select ....')
om.datasets.get('mysqlalchemy')
=> will return a pandas dataframe by running the specified sql.
   specify chunksize= to return an iterable of dataframes

# -- predefined sqls can contain variables to be resolved at access time
#    if a required variable is not specified in sqlvars, a KeyError is raised
om.datasets.put(sqlalchemy_constr, 'myview',
                sql='select ... from col="{var}"')
om.datasets.get('mysqlalchemy', sqlvars=dict(var="value"))

Query data from a connection and store into an omega-ml dataset:

# -- copy the result of the sqlalchemy query to omegaml
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy',
                sql='select ...', copy=True)
om.datasets.get('mysqlalchemy')
=> will return a pandas dataframe (without executing any additional queries)
=> alternatively use om.datasets.getl('mysqlalchemy') to return an MDataFrame

Controlling the table used in the connection:

# -- the default table is {bucket}_{name}, override using table='mytable'
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy',
                table='mytable',
                sql='select ...',
                copy=True)
om.datasets.get('mysqlalchemy') # read from {bucket}_mytable

# -- to use a specific table, without bucket information use table=':mytable'
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy',
                table=':mytable',
                sql='select ...',
                copy=True)
om.datasets.get('mysqlalchemy') # read from mytable

Inserting data via a previously stored connection:

# -- store data back through the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.put(df, 'mysqlalchemy',
                table='SOMETABLE')

Using variables in connection strings:

Connection strings may contain variables, e.g. userid and password. By default, variables are resolved from the OS environment. They can also be provided explicitly as any dict, e.g. via secrets=:

# -- use connection string with variables
sqlalchemy_constr = 'sqlite:///{dbname}.db'
om.datasets.put(sqlalchemy_constr, 'userdb')
om.datasets.get('userdb', secrets=dict(dbname='chuckdb'))

# -- alternatively, create a vault dataset:
secrets = dict(userid='chuck', dbname='chuckdb')
om.datasets.put(secrets, '_omega/vault')
om.datasets.get('userdb')

the '_omega/vault' dataset will be queried using the current userid as
the secret name, and the dbname is retrieved from the document. This is
experimental and the vault is not encrypted.

Advanced:

om.datasets.put() supports the following additional keyword arguments (a combined example follows this list):

  • chunksize=int - specify the number of rows to read from sqlalchemy in one chunk. defaults to 10000

  • parse_dates=['col', ...] - list of column names to parse for date, time or datetime. see pd.read_sql for details

  • transform=callable - a callable that is passed the DataFrame of each chunk before it is inserted into the database. Use this to apply custom transformations. Only applies with copy=True

  • any other kwargs supported by pandas.read_sql
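
For illustration, these options can be combined when copying query data; the query, the clean() function and the created_at column below are placeholders, not part of the omega-ml API:

def clean(df):
    # drop incomplete rows from each chunk before it is stored
    return df.dropna()

om.datasets.put(sqlalchemy_constr, 'mysqlalchemy',
                sql='select ...', copy=True,
                chunksize=5000,
                parse_dates=['created_at'],
                transform=clean)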

KIND = 'sqlalchemy.conx'
get(name, sql=None, chunksize=None, raw=False, sqlvars=None, secrets=None, index=True, keep=False, lazy=False, table=None, *args, **kwargs)

retrieve a stored connection or query data from connection

Parameters:
  • name (str) – the name of the connection

  • secrets (dict) – dict to resolve variables in the connection string

  • keep (bool) – if True connection is kept open.

  • table (str) – the name of the table, will be prefixed with the store’s bucket name unless the table is specified as ‘:name’

Returns:

connection

To query data and return a DataFrame, specify sql='select ...':

Parameters:
  • sql (str) – the sql query, defaults to the query specified on .put()

  • chunksize (int) – the number of records for each chunk, if specified returns an iterator

  • sqlvars (dict) – optional, if specified will be used to format sql

Returns:

pd.DataFrame
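
For example, a chunked query might be read like this; the placeholder process() stands in for any per-chunk logic:

for chunk in om.datasets.get('mysqlalchemy', chunksize=10000,
                             sqlvars=dict(var='value')):
    # each chunk is a pd.DataFrame of up to 10000 rows
    process(chunk)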

To get the connection for a data query, instead of a DataFrame:

Parameters:
  • raw (bool) – if True, returns the raw sql alchemy connection

  • keep (bool) – optional, if True keeps the connection open. This is potentially unsafe in a multi-user environment where connection strings contain user-specific secrets. To always keep connections open, set om.datasets.defaults.SQLALCHEMY_ALWAYS_CACHE=True

Returns:

connection
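
A minimal sketch of using the raw connection directly, assuming pandas is available; the query is a placeholder:

import pandas as pd

conn = om.datasets.get('mysqlalchemy', raw=True)
# the raw sqlalchemy connection works with any sqlalchemy-aware API
df = pd.read_sql('select ...', conn)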

put(obj, name, sql=None, copy=False, append=True, chunksize=None, transform=None, table=None, attributes=None, insert=False, secrets=None, *args, **kwargs)

store sqlalchemy connection or insert data into an existing connection

Parameters:
  • obj (str|pd.DataFrame) – the sqlalchemy connection string or a dataframe object

  • name (str) – the name of the object

  • table (str) – optional, if specified is stored along connection

  • sql (str) – optional, if specified is stored along connection

  • copy (bool) – optional, if True the connection is queried using sql and the resulting data is stored instead, see below

  • attributes (dict) – optional, set or update metadata.attributes

Returns:

metadata of the stored connection

Instead of storing the connection, specify copy=True to query data and store the result as a DataFrame dataset under the given name:

Parameters:
  • sql (str) – sql to query

  • append (bool) – if True the data is appended if exists already

  • chunksize (int) – number of records to query in each chunk

  • transform (callable) – passed as DataFrame.to_sql(method=)

Returns:

metadata of the inserted dataframe

To insert data via a previously stored connection, specify insert=True:

Parameters:
  • insert (bool) – specify True to insert via the connection

  • table (str) – the table name to use for inserting data

  • append (bool) – if False will replace any existing table, defaults to True

  • index (bool) – if False will not attempt to create an index in target, defaults to False

  • chunksize (int) – number of records to insert in each chunk

Returns:

metadata of the connection
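
For illustration, inserting a DataFrame via a previously stored connection might look as follows; the table name is a placeholder:

om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
# insert df through the stored connection, appending to SOMETABLE
om.datasets.put(df, 'mysqlalchemy', insert=True,
                table='SOMETABLE', append=True, chunksize=5000)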

classmethod supports(obj, name, insert=False, data_store=None, model_store=None, *args, **kwargs)

test if this backend supports this obj

class omegaml.backends.npndarray.NumpyNDArrayBackend(model_store=None, data_store=None, tracking=None, **kwargs)

Store numpy NDArray of any shape or size

The NDArray is serialized to a byte string and stored as a BLOB. Thus it can have arbitrary size and dimensions, ideal for image data and Tensors.
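
A minimal usage sketch; the array shape and dataset name are arbitrary:

import numpy as np

arr = np.random.rand(100, 100, 3)   # e.g. an image-like array
om.datasets.put(arr, 'myarray')
restored = om.datasets.get('myarray')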

KIND = 'ndarray.bin'
get(name, version=-1, force_python=False, lazy=False, **kwargs)

get an obj

Parameters:

name – the name of the object (str)

Returns:

the object as it was originally stored

put(obj, name, attributes=None, allow_pickle=False, **kwargs)

put an obj

Parameters:
  • obj – the object to store (object)

  • name – the name of the object (str)

  • attributes – the attributes dict (dict, optional)

  • kwargs – other kwargs to be passed to the Metadata object

Returns:

the Metadata object

classmethod supports(obj, name, as_pydata=False, **kwargs)

test if this backend supports this obj

class omegaml.backends.virtualobj.VirtualObjectBackend(model_store=None, data_store=None, tracking=None, **kwargs)

Support arbitrary functions as object handlers

Virtual object functions can be any callable that provides a __omega_virtual__ attribute. The callable must support the following signature:

@virtualobj
def virtualobjfn(data=None, method='get|put|drop',
                 meta=None, **kwargs):
    ...
    return data

Note that there is a distinction between storing the function as a virtual object, and passing data in or getting data out of the store. It is the responsibility of the function to implement the appropriate code for get, put, drop, as well as to keep track of the data it actually stores.
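
To illustrate, a hypothetical handler might look like the following sketch, assuming the @virtualobj decorator is importable from omegaml.backends.virtualobj; it simply serves generated data on get and does not persist anything:

from omegaml.backends.virtualobj import virtualobj

@virtualobj
def sample_data(data=None, method=None, meta=None, **kwargs):
    import pandas as pd
    if method == 'get':
        # a real handler would load whatever data it manages
        return pd.DataFrame({'x': range(10)})
    if method == 'put':
        # a real handler is responsible for persisting `data` itself
        return data
    if method == 'drop':
        return True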

As a convenience, virtual object handlers can be implemented as a subclass of VirtualObjectHandler.

Usage:

# create the 'foo' virtual object
om.datasets.put(virtualobjfn, 'foo')

# get data from the virtualobj
om.datasets.get('foo')
=> will call virtualobjfn(method='get')

# put data into the virtualobj
om.datasets.put(data, 'foo')
=> will call virtualobjfn(data=data, method='put')

# drop the virtual object
om.datasets.drop('foo')
=> will call virtualobjfn(method='drop') and then
   drop the virtual object completely from the storage

Warning

Virtual objects are executed in the address space of the client or runtime context. Make sure that the source of the code is trustworthy. Note that this is different from Backends and Mixins, as these are proactively enabled by the administrator of the client or runtime context, respectively - virtual objects can be injected by anyone who is authorized to write data.

KIND = 'virtualobj.dill'
get(name, version=-1, force_python=False, lazy=False, **kwargs)

get an obj

Parameters:

name – the name of the object (str)

Returns:

the object as it was originally stored

put(obj, name, attributes=None, **kwargs)

put an obj

Parameters:
  • obj – the object to store (object)

  • name – the name of the object (str)

  • attributes – the attributes dict (dict, optional)

  • kwargs – other kwargs to be passed to the Metadata object

Returns:

the Metadata object

reduce(modelname, results, rName=None, **kwargs)

reduce a list of results to a single result

Use this as the last step in a task canvas

Parameters:
  • modelname (str) – the name of the virtualobj

  • results (list) – the list of results forwarded by task canvas

  • rName (str) – the name of the result object

  • **kwargs

Returns:

result of the virtualobj handler

See Also

om.runtime.mapreduce

classmethod supports(obj, name, **kwargs)

test if this backend supports this obj

class omegaml.backends.rawdict.PandasRawDictBackend(model_store=None, data_store=None, tracking=None, **kwargs)

OmegaStore backend to support arbitrary collections

Usage:

# store any collection as part of metadata
coll = db['some_collection']
om.datasets.put(coll, 'foo')
=> Metadata(name='foo', collection='some_collection', ...)
# parse the collection using pandas.io.json_normalize
df = om.datasets.get('foo')
# use an alternate parser that accepts dict|list(dict)
df = om.datasets.get('foo', parser=some_fn)
# get a MDataFrame
om.datasets.getl('foo')
# preserve all document keys, including _id
om.datasets.getl('foo', raw=True)

KIND = 'pandas.rawdict'
get(name, version=-1, lazy=False, raw=False, parser=None, filter=None, **kwargs)

get an obj

Parameters:

name – the name of the object (str)

Returns:

the object as it was originally stored

put(obj, name, attributes=None, **kwargs)

put an obj

Parameters:
  • obj – the object to store (object)

  • name – the name of the object (str)

  • attributes – the attributes dict (dict, optional)

  • kwargs – other kwargs to be passed to the Metadata object

Returns:

the Metadata object

classmethod supports(obj, name, as_raw=None, **kwargs)

test if this backend supports this obj

class omegaml.backends.rawfiles.PythonRawFileBackend(model_store=None, data_store=None, tracking=None, **kwargs)

OmegaStore backend to support arbitrary files

KIND = 'python.file'
get(name, local=None, mode='wb', open_kwargs=None, **kwargs)

get a stored file as a file-like object with binary contents or a local file

Parameters:
  • name (str) – the name of the file

  • local (str) – if set, the local path will be created and the file stored there. If local does not have an extension it is assumed to be a directory name, in which case the file is stored in that directory under the same name.

  • mode (str) – the mode to use on .open() for the local file

  • open_kwargs (dict) – the kwargs to use .open() for the local file

  • **kwargs – any kwargs passed to datasets.metadata()

Returns:

the file-like output handler (if local is None), or the path to the local file (if local is given)
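
A minimal sketch of storing and retrieving a file; the file names are placeholders, and passing an open file-like object to put() is assumed to be supported:

with open('report.pdf', 'rb') as fin:
    om.datasets.put(fin, 'myreport')

filelike = om.datasets.get('myreport')              # file-like handle
path = om.datasets.get('myreport', local='/tmp/')   # stored below /tmp/ under the same name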

put(obj, name, attributes=None, encoding=None, **kwargs)

put an obj

Parameters:
  • obj – the object to store (object)

  • name – the name of the object (str)

  • attributes – the attributes dict (dict, optional)

  • kwargs – other kwargs to be passed to the Metadata object

Returns:

the Metadata object

classmethod supports(obj, name, open_kwargs=None, **kwargs)

test if this backend supports this obj

Mixins

class omegaml.mixins.store.ProjectedMixin

A OmegaStore mixin to process column specifications in dataset name

get(name, *args, **kwargs)

Return a projected dataset given a name of the form name[colspec]

colspec can be any of

  • a comma separated list of columns, e.g. foo[a,b]

  • an open-ended slice, e.g. foo[a:] => all columns following a, inclusive

  • a closed slice, e.g. foo[a:b] => all columns between a,b, inclusive

  • a close-ended slice, e.g. foo[:b] => all columns up to b, inclusive

  • an empty slice, e.g. foo[:] => all columns

  • a list of columns to exclude, e.g. foo[^b] => all columns except b

Parameters:

name – (str) the name of the dataset, optionally including a column specification

Returns:

the dataset with projected columns
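
For example, with a hypothetical dataset 'sales' that has columns region, product and amount:

df = om.datasets.get('sales[region,amount]')   # only these two columns
df = om.datasets.get('sales[^product]')        # all columns except product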

class omegaml.mixins.store.LazyGetMixin

OmegaStore mixin to support chunked lazy get via name

Usage:

equivalent of om.datasets.get('foo', lazy=True).iterchunks():

mdf = om.datasets.get('foo#')
mdf = om.datasets.get('foo#iterchunks')

equivalent of om.datasets.get('foo', lazy=True).iterchunks(chunksize=10):

mdf = om.datasets.get('foo#iterchunks:chunksize=10')

equivalent of om.datasets.get('foo', lazy=True).iloc[0:10]:

mdf = om.datasets.get('foo#rows:start=1,end=10')

class omegaml.mixins.store.virtualobj.VirtualObjectMixin

process virtual objects

This checks if an object is a VirtualObject and if so retrieves the handler and processes it.

class omegaml.mixins.store.promotion.PromotionMixin

Promote objects from one bucket to another

promote(name, other, asname=None, drop=True, **kwargs)

Promote object to another bucket.

This effectively copies the object. If the object already exists in the target it will be replaced.

Parameters:
  • name – The name of the object

  • other – the OmegaStore instance to promote to

  • asname – the name to use in other, defaults to .metadata(name).name

  • kwargs – will be forwarded to other.put

Returns:

The Metadata of the new object
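
A minimal sketch, assuming other_store is the OmegaStore instance of the target bucket (how it is obtained depends on your deployment):

# copy 'mydata' to the target store, replacing an existing object of the same name
meta = om.datasets.promote('mydata', other_store, asname='mydata')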

class omegaml.mixins.mdf.iotools.IOToolsStoreMixin
read_csv(csvfn, name, chunksize=10000, append=False, apply=None, mode='r', open_kwargs=None, **kwargs)

read large files from s3, hdfs, http/s, sftp, scp, ssh, write to om.datasets

Usage:

To insert a local csv file into a dataset:

om.datasets.read_csv('/path/filename.csv', 'dataset-name')

To insert a file stored in any of the supported locations specify its fully qualified location and filename. The specific format must be specified according to the smart_open library:

om.datasets.read_csv('https://...', 'dataset-name')
om.datasets.read_csv('s3://...', 'dataset-name')
om.datasets.read_csv('hdfs://...', 'dataset-name')
om.datasets.read_csv('sftp://...', 'dataset-name')
om.datasets.read_csv('scp://...', 'dataset-name')
om.datasets.read_csv('ssh://...', 'dataset-name')

Optionally define a function that receives each chunk as a dataframe and applies further processing (e.g. transformations, filtering):

def process(df):
    # apply any processing to df
    return df

om.datasets.read_csv(...., apply=process)

Parameters:
  • csvfn (str) – the fully qualified path and name of the csv file, according to the smart_open library

  • chunksize (int) – the size of each chunk processed before writing to the dataset

  • append (bool) – if True, appends to the dataset. defaults to False

  • apply (callable) – if specified, each chunk is forwarded as a DataFrame and the returned result is inserted to the dataset. Use this for transformations or filtering

  • mode (str) – file open mode, defaults to r

  • open_kwargs (dict) – additional kwargs to smart_open

  • **kwargs – additional kwargs are passed to pandas.read_csv

Returns:

MDataFrame

See also

  • pandas.read_csv

to_csv(name, csvfn, chunksize=10000, apply=None, mode='w', open_kwargs=None, **kwargs)

write any dataframe to s3, hdfs, http/s, sftp, scp, ssh

Usage:

To write a dataframe:

om.datasets.to_csv('dataframe-dataset', '/path/to/filename')

To write a large dataframe in chunks:

om.datasets.to_csv('dataframe-dataset', '/path/to/filename', chunksize=100)

Parameters:
  • name (str) – the name of the dataframe dataset

  • csvfn (str) – the fully qualified path and name of the csv file, according to the smart_open library

  • chunksize (int) – the size of each chunk processed before writing to the file

  • apply (callable) – if specified, each chunk is forwarded as a DataFrame and the returned result is written to the file. Use this for transformations or filtering

  • mode (str) – file open mode, defaults to w

  • open_kwargs (dict) – additional kwargs to smart_open

  • **kwargs – additional kwargs are passed to pandas.to_csv

See also

  • pandas.to_csv