om.datasets¶
- omegaml.datasets = OmegaStore(bucket=omegaml, prefix=data/)¶
the omegaml.store.base.OmegaStore store for datasets
Mixins:
- omegaml.mixins.store.extdmeta.ExtendedMetadataMixin
Backends¶
- class omegaml.backends.sqlalchemy.SQLAlchemyBackend(model_store=None, data_store=None, tracking=None, **kwargs)¶
sqlalchemy plugin for omegaml
Usage:
Define your sqlalchemy connection:
sqlalchemy_constr = f'sqlalchemy://{user}:{password}@{account}/'
Store the connection in any of three ways:
# -- just the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.get('mysqlalchemy', raw=True)
=> the sql connection object

# -- store connection with a predefined sql
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', sql='select ...')
om.datasets.get('mysqlalchemy')
=> will return a pandas dataframe using the specified sql to run.
   specify chunksize= to return an iterable of dataframes

# -- predefined sqls can contain variables to be resolved at access time
#    if you fail to specify required variables in sqlvars, a KeyError is raised
om.datasets.put(sqlalchemy_constr, 'myview', sql='select ... from T1 where col="{var}"')
om.datasets.get('myview', sqlvars=dict(var="value"))

# -- Variables are replaced by binding parameters, which is safe for
#    untrusted inputs. To replace variables as strings, use double
#    `{{variable}}` notation. A warning will be issued because this
#    is considered an unsafe practice for untrusted input (REST API).
#    It is your responsibility to sanitize the value of the `cols` variable.
om.datasets.put(sqlalchemy_constr, 'myview', sql='select {{cols}} from T1 where col="{var}"')
om.datasets.get('myview', sqlvars=dict(cols='foo, bar', var="value"))
Query data from a connection and store into an omega-ml dataset:
# -- copy the result of the sqlalchemy query to omegaml
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', sql='select ...', copy=True)
om.datasets.get('mysqlalchemy')
=> will return a pandas dataframe (without executing any additional queries)
=> can also use om.datasets.getl('mysqlalchemy') to return a MDataFrame
Controlling the table used in the connection:
# -- the default table is {bucket}_{name}, override using table='mytable'
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', table='mytable', sql='select ...', copy=True)
om.datasets.get('mysqlalchemy')  # read from {bucket}_mytable
# -- to use a specific table without the bucket prefix, use table=':mytable'
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy', table=':mytable', sql='select ...', copy=True)
om.datasets.get('mysqlalchemy')  # read from mytable
Inserting data via a previously stored connection:
# -- store data back through the connection
om.datasets.put(sqlalchemy_constr, 'mysqlalchemy')
om.datasets.put(df, 'mysqlalchemy', table='SOMETABLE')
Using variables in connection strings:
Connection strings may contain variables, e.g. userid and password. By default variables are resolved from the os environment. A dict of values can also be specified:
# -- use connection string with variables
sqlalchemy_constr = 'sqlite:///{dbname}.db'
om.datasets.put(sqlalchemy_constr, 'userdb')
om.datasets.get('userdb', secrets=dict(dbname='chuckdb'))

# -- alternatively, create a vault dataset:
secrets = dict(userid='chuck', dbname='chuckdb')
om.datasets.put(secrets, '.omega/vault')
om.datasets.get('userdb')

The '.omega/vault' dataset will be queried using the current userid as the secret name, and the dbname retrieved from the document. This is experimental and the vault is not encrypted.
Advanced:
om.datasets.put() supports the following additional keyword arguments:
- chunksize=int - specify the number of rows to read from sqlalchemy in one chunk. defaults to 10000
- parse_dates=['col', ...] - list of column names to parse for date, time or datetime. see pd.read_sql for details
- transform=callable - a callable, is passed the DataFrame of each chunk before it is inserted into the database. use to provide custom transformations. only works with copy=True
- any other kwargs supported by pandas.read_sql
- KIND = 'sqlalchemy.conx'¶
- get(name, sql=None, chunksize=None, raw=False, sqlvars=None, secrets=None, index=True, keep=None, lazy=False, table=None, trusted=False, *args, **kwargs)¶
retrieve a stored connection or query data from connection
- Parameters:
name (str) – the name of the connection
secrets (dict) – dict to resolve variables in the connection string
keep (bool) – if True connection is kept open, defaults to True (change default by setting om.defaults.SQLALCHEMY_ALWAYS_CACHE = False)
table (str) – the name of the table, will be prefixed with the store’s bucket name unless the table is specified as ‘:name’
trusted (bool|str) – if passed must be the value for store.sign(sqlvars or kwargs), otherwise a warning is issued for any remaining variables in the sql statement
- Returns:
connection
To query data and return a DataFrame, specify sql='select ...':
- Parameters:
sql (str) – the sql query, defaults to the query specified on .put()
chunksize (int) – the number of records for each chunk, if specified returns an iterator
sqlvars (dict) – optional, if specified will be used to format sql
- Returns:
pd.DataFrame
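For illustration, a minimal sketch of a chunked query, assuming a connection has been stored as 'mysqlalchemy' and a hypothetical 'sales' table:
import pandas as pd

# chunksize= returns an iterator of DataFrames instead of a single frame
chunks = om.datasets.get('mysqlalchemy', sql='select * from sales', chunksize=1000)
df = pd.concat(chunks)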
To get the connection for a data query instead of a DataFrame, specify raw=True:
- Parameters:
raw (bool) – if True, returns the raw sql alchemy connection
keep (bool) – optional, if True keeps the connection open. lazy=True implies keep=True. This is potentially unsafe in a multi-user environment where connection strings contain user-specific secrets. To always keep connections open, set
om.datasets.defaults.SQLALCHEMY_ALWAYS_CACHE=True
- Returns:
connection
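A minimal sketch of using the raw connection; the 'sales' table is hypothetical and the exact connection API depends on the installed sqlalchemy version:
from sqlalchemy import text

conn = om.datasets.get('mysqlalchemy', raw=True)
result = conn.execute(text('select count(*) from sales'))
print(result.scalar())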
To get a cursor for a data query instead of a DataFrame, specify lazy=True. Note this implies keep=True.
- Parameters:
lazy (bool) – if True, returns a cursor instead of a DataFrame
sql (str) – the sql query, defaults to the query specified on .put()
- Returns:
cursor
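A minimal sketch; lazy=True keeps the connection open and returns a cursor (the 'sales' table is hypothetical):
cursor = om.datasets.get('mysqlalchemy', sql='select * from sales', lazy=True)
# fetch from or iterate the cursor as needed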
- put(obj, name, sql=None, copy=False, append=True, chunksize=None, transform=None, table=None, attributes=None, insert=False, secrets=None, *args, **kwargs)¶
store sqlalchemy connection or insert data into an existing connection
- Parameters:
obj (str|pd.DataFrame) – the sqlalchemy connection string or a dataframe object
name (str) – the name of the object
table (str) – optional, if specified is stored along with the connection
sql (str) – optional, if specified is stored along with the connection
copy (bool) – optional, if True the connection is queried using sql and the resulting data is stored instead, see below
attributes (dict) – optional, set or update metadata.attributes
- Returns:
metadata of the stored connection
Instead of storing the connection, specify copy=True to query data and store it as a DataFrame dataset given by name:
- Parameters:
sql (str) – sql to query
append (bool) – if True the data is appended if the dataset exists already
chunksize (int) – number of records to query in each chunk
transform (callable) – passed as DataFrame.to_sql(method=)
- Returns:
metadata of the inserted dataframe
To insert data via a previously stored connection, specify insert=True:
- Parameters:
insert (bool) – specify True to insert via the connection
table (str) – the table name to use for inserting data
append (bool) – if False will replace any existing table, defaults to True
index (bool) – if False will not attempt to create an index in target, defaults to False
chunksize (int) – number of records to insert in each chunk
- Returns:
metadata of the connection
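A minimal sketch of inserting a DataFrame through a previously stored connection; the 'sales' table name is hypothetical:
import pandas as pd

df = pd.DataFrame({'region': ['east', 'west'], 'amount': [100, 200]})
# replace any existing 'sales' table instead of appending
om.datasets.put(df, 'mysqlalchemy', insert=True, table='sales', append=False)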
- classmethod supports(obj, name, insert=False, data_store=None, model_store=None, *args, **kwargs)¶
test if this backend supports this obj
- class omegaml.backends.npndarray.NumpyNDArrayBackend(model_store=None, data_store=None, tracking=None, **kwargs)¶
Store numpy NDArray of any shape or size
The NDArray is serialized to a byte string and stored as a BLOB. Thus it can have arbitrary size and dimensions, ideal for image data and Tensors.
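A minimal usage sketch, assuming numpy is installed (the dataset name 'myarray' is arbitrary):
import numpy as np

arr = np.random.rand(64, 64, 3)     # e.g. image-like data
om.datasets.put(arr, 'myarray')     # serialized to bytes and stored as a BLOB
restored = om.datasets.get('myarray')
assert restored.shape == (64, 64, 3)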
- KIND = 'ndarray.bin'¶
- get(name, version=-1, force_python=False, lazy=False, **kwargs)¶
get an obj
- Parameters:
name – the name of the object (str)
- Returns:
the object as it was originally stored
- put(obj, name, attributes=None, allow_pickle=False, **kwargs)¶
put an obj
- Parameters:
obj – the object to store (object)
name – the name of the object (str)
attributes – the attributes dict (dict, optional)
kwargs – other kwargs to be passed to the Metadata object
- Returns:
the Metadata object
- classmethod supports(obj, name, as_pydata=False, **kwargs)¶
test if this backend supports this obj
- class omegaml.backends.virtualobj.VirtualObjectBackend(model_store=None, data_store=None, tracking=None, **kwargs)¶
Support arbitrary functions as object handlers
Virtual object functions can be any callable that provides a __omega_virtual__ attribute. The callable must support the following signature:
@virtualobj
def virtualobjfn(data=None, method='get|put|drop', meta=None, store=None, **kwargs):
    ...
    return data
Note that there is a distinction between storing the function as a virtual object, and passing data in or getting data out of the store. It is the responsibility of the function to implement the appropriate code for get, put, drop, as well as to keep track of the data it actually stores.
As a convenience, virtual object handlers can be implemented as a subclass of VirtualObjectHandler
Usage:
1) as a virtual data handler

    # create the 'foo' virtual object
    om.datasets.put(virtualobjfn, 'foo')
    # get data from the virtualobj
    om.datasets.get('foo')
    => will call virtualobjfn(method='get')
    # put data into the virtualobj
    om.datasets.put(data, 'foo')
    => will call virtualobjfn(data=data, method='put')
    # drop the virtualobj
    om.datasets.drop('foo')
    => will call virtualobjfn(method='drop') and then drop the virtual object completely from the storage

2) as a virtual model

    # create the mymodel model as a virtualobj
    om.models.put(virtualobjfn, 'mymodel')
    # run the model's predict() function
    om.runtime.model('mymodel').predict(X)
    => will call virtualobjfn(method='predict')

3) as a virtual script

    # create the myscript script as a virtualobj
    om.scripts.put(virtualobjfn, 'myscript')
    # run the script
    om.runtime.script('myscript').run()
    => will call virtualobjfn(method='run')
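For illustration, a minimal sketch of a virtual object handler; delegating to a backing dataset '_foo_data' is an assumption made for this example, not part of the API:
from omegaml.backends.virtualobj import virtualobj

@virtualobj
def virtualobjfn(data=None, method=None, meta=None, store=None, **kwargs):
    # hypothetical handler that keeps its data in a backing dataset
    if method == 'put':
        return store.put(data, '_foo_data', append=True)
    if method == 'get':
        return store.get('_foo_data')
    if method == 'drop':
        return store.drop('_foo_data', force=True)
    return data

om.datasets.put(virtualobjfn, 'foo')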
Warning
Virtual objects are executed in the address space of the client or runtime context. Make sure that the source of the code is trustworthy. Note that this is different from Backends and Mixins, which are proactively enabled by the administrator of the client or runtime context, respectively - virtual objects can be injected by anyone who is authorized to write data.
- KIND = 'virtualobj.dill'¶
- get(name, version=-1, force_python=False, lazy=False, **kwargs)¶
get an obj
- Parameters:
name – the name of the object (str)
- Returns:
the object as it was originally stored
- put(obj, name, attributes=None, dill_kwargs=None, as_source=False, **kwargs)¶
put an obj
- Parameters:
obj – the object to store (object)
name – the name of the object (str)
attributes – the attributes dict (dict, optional)
kwargs – other kwargs to be passed to the Metadata object
- Returns:
the Metadata object
- reduce(modelname, results, rName=None, **kwargs)¶
reduce a list of results to a single result
Use this as the last step in a task canvas
- Parameters:
modelname (str) – the name of the virtualobj
results (list) – the list of results forwarded by task canvas
rName (result) – the name of the result object
**kwargs
- Returns:
result of the virtualobj handler
- See Also
om.runtime.mapreduce
- classmethod supports(obj, name, **kwargs)¶
test if this backend supports this obj
- class omegaml.backends.rawdict.PandasRawDictBackend(model_store=None, data_store=None, tracking=None, **kwargs)¶
OmegaStore backend to support arbitrary collections
Usage:
# store any collection as part of metadata
coll = db['some_collection']
om.datasets.put(coll, 'foo')
=> Metadata(name='foo', collection='some_collection', ...)
# parse the collection using pandas.io.json_normalize
df = om.datasets.get('foo')
# use an alternate parser that accepts dict|list(dict)
df = om.datasets.get('foo', parser=some_fn)
# get a MDataFrame
om.datasets.getl('foo')
# preserve all document keys, including _id
om.datasets.getl('foo', raw=True)
- KIND = 'pandas.rawdict'¶
- get(name, version=-1, lazy=False, raw=False, parser=None, filter=None, **kwargs)¶
get an obj
- Parameters:
name – the name of the object (str)
- Returns:
the object as it was originally stored
- put(obj, name, attributes=None, as_raw=None, **kwargs)¶
put an obj
- Parameters:
obj – the object to store (object)
name – the name of the object (str)
attributes – the attributes dict (dict, optional)
kwargs – other kwargs to be passed to the Metadata object
- Returns:
the Metadata object
- classmethod supports(obj, name, as_raw=None, data_store=None, **kwargs)¶
test if this backend supports this obj
- class omegaml.backends.rawfiles.PythonRawFileBackend(model_store=None, data_store=None, tracking=None, **kwargs)¶
OmegaStore backend to support arbitrary files
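A minimal usage sketch; the file paths are hypothetical and the backend is assumed to accept an open file-like object:
# store a local file
with open('/path/to/image.png', 'rb') as fin:
    om.datasets.put(fin, 'myimage')
# read it back as a file-like object with binary contents
content = om.datasets.get('myimage').read()
# or restore it to a local path
om.datasets.get('myimage', local='/tmp/image.png')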
- KIND = 'python.file'¶
- get(name, local=None, mode='wb', open_kwargs=None, **kwargs)¶
get a stored file as a file-like object with binary contents or a local file
- Parameters:
name (str) – the name of the file
local (str) – if set the local path will be created and the file stored there. If local does not have an extension it is assumed to be a directory name, in which case the file is stored under the same name.
mode (str) – the mode to use on .open() for the local file
open_kwargs (dict) – the kwargs to use .open() for the local file
**kwargs – any kwargs passed to datasets.metadata()
- Returns:
the file-like output handler (if local is None), or the path to the local file (if local is given)
- put(obj, name, attributes=None, encoding=None, **kwargs)¶
put an obj
- Parameters:
obj – the object to store (object)
name – the name of the object (str)
attributes – the attributes dict (dict, optional)
kwargs – other kwargs to be passed to the Metadata object
- Returns:
the Metadata object
- classmethod supports(obj, name, open_kwargs=None, **kwargs)¶
test if this backend supports this obj
Mixins¶
- class omegaml.mixins.store.ProjectedMixin¶
An OmegaStore mixin to process column specifications in dataset names
- get(name, *args, **kwargs)¶
Return a projected dataset given a name of form name[colspec]
colspec can be any of:
- a comma separated list of columns, e.g. foo[a,b]
- an open-ended slice, e.g. foo[a:] => all columns following a, inclusive
- a closed slice, e.g. foo[a:b] => all columns between a and b, inclusive
- a close-ended slice, e.g. foo[:b] => all columns up to b, inclusive
- an empty slice, e.g. foo[:] => all columns
- a list of columns to exclude, e.g. foo[^b] => all columns except b
- Parameters:
name – (str) the name of the dataset, optionally including a column specification
- Returns:
the dataset with projected columns
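A short sketch, assuming a hypothetical 'sales' dataset with columns region, amount and date:
df = om.datasets.get('sales[region,amount]')   # only region and amount
df = om.datasets.get('sales[^date]')           # all columns except date
df = om.datasets.get('sales[:amount]')         # all columns up to amount, inclusive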
- class omegaml.mixins.store.LazyGetMixin¶
OmegaStore mixin to support chunked lazy get via name
Usage:
equivalent of om.datasets.get('foo', lazy=True).iterchunks():
    mdf = om.datasets.get('foo#')
    mdf = om.datasets.get('foo#iterchunks')
equivalent of om.datasets.get('foo', lazy=True).iterchunks(chunksize=10):
    mdf = om.datasets.get('foo#iterchunks:chunksize=10')
equivalent of om.datasets.get('foo', lazy=True).iloc[0:10]:
    mdf = om.datasets.get('foo#rows:start=1,end=10')
- class omegaml.mixins.store.virtualobj.VirtualObjectMixin¶
process virtual objects
This checks if an object is a VirtualObject and if so retrieves the handler and processes it.
- class omegaml.mixins.store.promotion.PromotionMixin¶
Promote objects from one bucket to another
- Promotion Methods:
  - getput - performs target.put(source.get()) and copies metadata attributes by merging target.metadata's .attributes and .kind_meta
  - metadata - creates a new metadata entry in target, copying metadata attributes and kind_meta. Does not get/put the object itself (i.e. no associated data is promoted).
  - data - like getput, but does not merge metadata
  - export - performs .to_archive() and .from_archive(), effectively copying metadata, the associated gridfile (if available) and collection data (if available). This is equivalent to om runtime export.
The default promotion method is getput(), or the object's backend.PROMOTE method, if specified.
Some object backends provide a default promotion other than getput:
  - sqlalchemy.conx - uses the metadata promotion, effectively copying only metadata. Use promote(..., method='metadata,data') to also promote data
  - virtualobj.dill - uses the export promotion, effectively copying all metadata and the associated @virtualobj function. If the source object is a versioned model, this copies the current version and metadata. To copy a specific version, use promote('model@version'). To create a new version in the target bucket use promote(..., method='getput').
- promote(name, other, asname=None, drop=None, method='default', get=None, put=None, **kwargs)¶
Promote object to another store.
This effectively copies the object. If the object exists in the target it will be replaced.
- Parameters:
name – The name of the object
other – the OmegaStore instance to promote to
asname – the name to use in other, defaults to .metadata(name).name
drop (bool) – if True calls other.drop(force=True) before promoting, defaults to False
method (str|list) – specify the method or multiple methods in sequence, available methods are ‘default’, ‘getput’, ‘metadata’, ‘data’. For ‘default’, the object backend’s .PROMOTE property is used, defaulting to ‘getput’
get (dict) – optional, specifies the store.get(**kwargs)
put (dict) – optional, specifies the other.put(**kwargs)
kwargs – additional kwargs are passed to the initial other.put(), for metadata promotion
- Returns:
The Metadata of the new object
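A minimal sketch, assuming a dataset 'sales' in the current bucket and a hypothetical target bucket 'prod':
prod = om['prod']   # omega instance for the target bucket
om.datasets.promote('sales', prod.datasets)
# for a sqlalchemy connection dataset, also promote the data
om.datasets.promote('mysqlalchemy', prod.datasets, method='metadata,data')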
- class omegaml.mixins.mdf.iotools.IOToolsStoreMixin¶
- read_csv(csvfn, name, chunksize=10000, append=False, apply=None, mode='r', open_kwargs=None, **kwargs)¶
read large files from s3, hdfs, http/s, sftp, scp, ssh, write to om.datasets
Usage:
To insert a local csv file into a dataset:
om.datasets.read_csv('/path/filename.csv', 'dataset-name')
To insert a file stored in any of the supported locations specify its fully qualified location and filename. The specific format must be specified according to the smart_open library:
om.datasets.read_csv('https://...', 'dataset-name')
om.datasets.read_csv('s3://...', 'dataset-name')
om.datasets.read_csv('hdfs://...', 'dataset-name')
om.datasets.read_csv('sftp://...', 'dataset-name')
om.datasets.read_csv('scp://...', 'dataset-name')
om.datasets.read_csv('ssh://...', 'dataset-name')
Optionally define a function that receives each chunk as a dataframe and applies further processing (e.g. transformations, filtering):
def process(df):
    # apply any processing to df
    return df

om.datasets.read_csv(..., apply=process)
- Parameters:
csvfn (str) – the fully qualified path and name of the csv file, according to the smart_open library
chunksize (int) – the size of each chunk processed before writing to the dataset
append (bool) – if True, appends to the dataset. defaults to False
apply (callable) – if specified, each chunk is forwarded as a DataFrame and the returned result is inserted to the dataset. Use this for transformations or filtering
mode (str) – file open mode, defaults to r
open_kwargs (dict) – additional kwargs to smart_open
**kwargs – additional kwargs are passed to
pandas.read_csv
- Returns:
MDataFrame
See also
pandas.read_csv
- to_csv(name, csvfn, chunksize=10000, apply=None, mode='w', open_kwargs=None, **kwargs)¶
write any dataframe to s3, hdfs, http/s, sftp, scp, ssh
Usage:
To write a dataframe:
om.datasets.to_csv('dataframe-dataset', '/path/to/filename')
To write a large dataframe in chunks:
om.datasets.to_csv('dataframe-dataset', '/path/to/filename', chunksize=100)
- Parameters:
name (str) – the name of the dataframe dataset
csvfn (str) – the fully qualified path and name of the csv file, according to the smart_open library
chunksize (int) – the size of each chunk processed before writing to the file
apply (callable) – if specified, each chunk is forwarded as a DataFrame and the returned result is written to the file. Use this for transformations or filtering
mode (str) – file open mode, defaults to w
open_kwargs (dict) – additional kwargs to smart_open
**kwargs – additional kwargs are passed to
pandas.to_csv
See also
pandas.to_csv