Working with MDataFrame

A short guide

What is MDataFrame?

  • MDataFrame provides a Pandas-like API to omega|ml’s analytics storage (backed by MongoDB). The key point about MDataFrames is that they represent a description of the data and of the processes applied to it, but do not contain the data itself. In this sense MDataFrame is to MongoDB what SQL is to a relational database: a query language.

  • Like a Pandas DataFrame, an MDataFrame provides convenient operations for querying, subsetting, and grouping data. The key difference is that while Pandas must always load all data into memory before operating on it, MDataFrame passes operations on to the database. The result of such operations is typically a Pandas DataFrame or another MDataFrame.

  • For cases where we need more complex processing than the database supports, MDataFrame can run a Python function on the data in parallel.

  • Note that MDataFrame is not a drop-in replacement for Pandas DataFrame.

Concepts

  • MDataFrame: represents a columnar data structure made of columns and rows

  • MSeries: represents all values in a single column

In addition, there are several helpers that provide specific functionality, e.g. for positional access, slicing and group-by processing. We don’t detail these helpers here as they are tied into the MDataFrame and MSeries API.

  • .value resolves the result of all operations (e.g. filtering, slicing, aggregation) to a local in-memory Pandas DataFrame (useful for exploratory tasks and groupby aggregation)

  • .apply() provides in-database processing (e.g. filtering, slicing, aggregation)

  • .transform() provides parallel out-of-core, chunk-by-chunk processing (use for large datasets)

Storing data as an MDataFrame

You can store any of the following objects and return them as an MDataFrame:

  • Pandas DataFrame (and any source that Pandas supports): om.datasets.put(df, 'name')

  • Any SQL source supported by SQLAlchemy (e.g. Snowflake): om.datasets.put('connstr', 'name')

  • CSV files from an externally hosted source (ftp, http, s3): om.datasets.put(None, 'name', uri='http://....')

  • Any other tabular-like data that you insert into the analytics store (i.e. MongoDB): om.datasets.put(docs, 'name')

Note that in general MDataFrame does not depend on the original source of the data, but on the format in which it is stored in omega|ml’s analytics storage. As long as the data can be retrieved and transformed into a format Pandas can work with, MDataFrame will be able to handle it. That said, it works best with tabular data of rows and columns where each cell holds a scalar value.
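
For example, storing a Pandas DataFrame and retrieving it lazily (see the next section) works as follows; this is a minimal sketch where 'sample' is a made-up dataset name:

import pandas as pd

df = pd.DataFrame({'x': range(10), 'y': range(10)})
om.datasets.put(df, 'sample')      # store the DataFrame in the analytics store
mdf = om.datasets.getl('sample')   # get it back as an MDataFrame (no data is loaded yet)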

Getting an MDataFrame

The following methods are equivalent; both return an instance of MDataFrame:

mdf = om.datasets.getl('name')
mdf = om.datasets.get('name', lazy=True)

Using either of these methods, mdf will be the MDataFrame instance. Note that the call returns immediately as no data access happens at this time.

Executing a query

To actually get data from an MDataFrame you need to ask for evaluation. This executes the query according to all operations applied so far. The result is a standard Pandas DataFrame:

mdf.value

Note this can be a dangerous operation as it will load all the data into memory. If the result of your query is larger than the available memory of your process, it will fail and result in an operating-system-level out-of-memory condition. If you are unsure how many rows a query will return, try using .count() first.
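
A defensive pattern is therefore to inspect the result size before materializing it (a sketch; 'name' stands for any stored dataset):

mdf = om.datasets.getl('name')
print(mdf.count())   # check the size of the result first
df = mdf.value       # materialize only once you know it fits in memory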

Persisting the result of a query

To evaluate an MDataFrame without returning all data into memory, use the .persist() method:

mdf.persist('name', store=om.datasets)

This is the equivalent of df = mdf.value; om.datasets.put(df, 'name'). However, all operations are performed in the database, and results are retrieved back to memory only if needed, and then only in small chunks.
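
Combined with filtering (see below), this allows building fully in-database pipelines, e.g. the following sketch, which assumes a numeric column 'x' exists:

flt = mdf['x'] > 0                                    # build a filter mask, no data access yet
mdf[flt].persist('name-filtered', store=om.datasets)  # evaluate in-database and store the result
result = om.datasets.getl('name-filtered')            # access the persisted result lazily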

Slicing

Like a Pandas DataFrame, an MDataFrame can be sliced:

  • by a set of columns: mdf[['col1', 'col2']] => returns an MDataFrame subset to col1, col2

  • by a single column: mdf['col1'] => returns an MSeries

  • by rows: mdf.iloc[start:end] => returns an MDataFrame subset to the rows with index start to end

  • by index: mdf.loc[label] => returns an MDataFrame subset to the rows with corresponding labels

  • by filter: mdf[filter-mask] => returns an MDataFrame subset to the filter mask

Note that .loc and .iloc require the data to have been stored from a Pandas DataFrame.
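
To illustrate, the following sketch combines several of these forms (assuming a dataset stored from a Pandas DataFrame with columns col1 and col2):

sub = mdf[['col1', 'col2']]   # MDataFrame restricted to two columns
ser = mdf['col1']             # MSeries for a single column
rows = mdf.iloc[0:10]         # MDataFrame restricted to the first 10 rows
rows.value                    # resolve to a Pandas DataFrame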

Filtering

By filter masks

flt = mdf['column'] == value  # use any operator supported by MSeries
mdf[flt]

Filtering can also be done by using keyword arguments of the form <column>__<operator>=<value>.
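
For example, the following two filters are equivalent (a sketch assuming a numeric column 'x'; the keyword form is also used by .query(), as in the chunking example further below):

flt = mdf['x'] >= 5    # 'x' is an assumed column name
mdf[flt].value

mdf.query(x__gte=5).value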

Aggregation and transformation

MDataFrame provides a powerful set of aggregations:

  • in-database or local groupby processing: mdf.groupby()

  • in-database transformation and aggregation: mdf.apply()

  • out-of-core parallel processing: mdf.transform()
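
As an illustration, a typical in-database groupby aggregation might look as follows (a sketch; the column names and the .agg() form are assumptions based on the Pandas-like API):

# 'region' and 'amount' are assumed columns; .agg() mirrors the Pandas groupby API
mdf.groupby('region').agg({'amount': 'mean'}).value   # computed by the database, resolved by .value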

In-database transformations

Using MDataFrame.apply() we can apply several column-wise transformations. Note that the function passed to apply must accept an ApplyContext.

mdf.iloc[0:1000].apply(lambda v: {
    'key'      : v['l_orderkey'],
    'comment': v['l_comment'].str.concat(' *'),
    'docs': v['l_shipinstruct'].str.usplit(' '),
    'comment_lower': v['l_shipinstruct'].str.lower(),
    'comment_substr': v['l_shipinstruct'].str.substr(1, 5),
    'week': v['l_shipdate'].dt.week,
    'year': v['l_shipdate'].dt.year,
}, inplace=True).value

Note: Unlike a Pandas apply, which executes the function for every row or column, MDataFrame executes the function only once in order to build the database query. If you want to execute Python code row-by-row, or group-by-group, use .transform(), see below.

Parallel transformations

MDataFrame supports in-parallel processing of data subsets of arbitrary size. By default, the subsets are formed by row number, but any other grouping is possible.

The following snippet will start N / chunksize tasks and process them in parallel, where N is the number of rows. Each task processes chunksize records. The default chunksize is 50’000. The number of parallel jobs started by default is CPU count - 1.

def myproc(df):
    df['column'] = df['other'].apply(...)

mdf.transform(myproc).persist('name', store=om.datasets)

In more detail:

def myproc(df, i):
    # df is the ith chunk of the full data
    # it is a Pandas in-memory DataFrame, apply any Pandas function you like
    # assignment is supported
    df['column'] = df['other'].apply(...)
    ...
    # groupby is also possible
    result = df.groupby(...)...
    # either return None (or no return statement) => updated df is written to the db
    # or return a DataFrame or a Series => returned object is written to the db
    return result

# this will start N = len(mdf) / 50'000 tasks and store the results in om.datasets
# conceptually this is the equivalent of df = mdf.value.apply(myproc); om.datasets.put(df, 'name')
# however using mdf.transform() will use much less memory and easily scale out of core
mdf.transform(myproc).persist('name', store=om.datasets)

# specify chunksize and n_jobs to influence the number of chunks and the number of parallel workers
# note the trade-off: many workers may take longer to complete overall, while larger chunksizes use more memory
mdf.transform(myproc, chunksize=<#records>, n_jobs=<#workers>).persist('name', store=om.datasets)

Customized chunking

By default .transform() uses the size of the data (the number of rows) to determine the number of chunks. You can however create any number of chunks:

mdf = om.datasets.getl('retail')

def process(ldf):
    ldf['comments'] = ldf['l_comment'].str.split(' ')


def chunker(mdf, chunksize, maxobs):
    # yield an MDataFrame subset for each chunk
    # note: don't use .value before yielding as this would resolve the dataframe locally
    #       and potentially consume all memory.
    groups = mdf['l_returnflag'].unique().value
    for group in groups:
        for i in range(0, maxobs, chunksize):
            yield mdf.skip(i).head(chunksize).query(l_returnflag=group)

(mdf
 .transform(process, chunkfn=chunker, n_jobs=-2)
 .persist('retail-transformed', store=om.datasets))

Lazy evaluation

Using lazy evaluation we can get back a proxy DataFrame, an MDataFrame, which provides many of the features of a Pandas DataFrame including .loc indexing and slicing, column projection and aggregation. All of these operations, however, are executed by the database and thus support out-of-core sized DataFrames, that is DataFrames of arbitrary size.

# ask for a reference to the dfx dataset with lazy evaluation
om.datasets.get('dfx', lazy=True)
=>
<omegaml.mdataframe.MDataFrame at 0x7fa3e991ee48>

# same thing; getl is a convenience method that automatically specifies lazy=True
om.datasets.getl('dfx')
=>
<omegaml.mdataframe.MDataFrame at 0x7fa3e991ee48>

MDataFrame in many ways behaves like a normal DataFrame; however, the evaluation of operations is _lazy_ and is executed by the database as opposed to in-memory. This allows us to process data that is larger than memory.

In order to evaluate an MDataFrame and return an actual pandas.DataFrame, just access the .value property:

om.datasets.get('dfx', lazy=True).value
=>
    x  y
 0  0  0
 1  1  1
 2  2  2
 3  3  3
 4  4  4

What won’t work

MDataFrames are currently read-only. In other words, assignment, column additions and similar operations are not currently supported. This is not an inherent restriction; there is just no API for it in the current implementation. Note that if updates are required, the MDataFrame plugin mechanism provides a straightforward way to add such functionality.

Hence, the following kinds of operations are not currently supported:

mdf[col] = mdf[col].apply(func)
mdf[col] = mdf[col].map(func)
mdf[col] = value # partial support is available, but limited to scalar values