Working with MDataFrame
=======================

A short guide

.. contents::

What is MDataFrame?
-------------------

- ``MDataFrame`` provides a Pandas-like API to omega|ml’s analytics storage
  (backed by MongoDB). The key thing about ``MDataFrames`` is that they
  represent the *description of data and processes applied to the data*, but
  do not contain the data itself. In this sense ``MDataFrame`` is to MongoDB
  what SQL is to a relational database: a query language.

- Like a Pandas ``DataFrame``, ``MDataFrame`` provides convenient operations
  for querying, subsetting and grouping data. The key difference is that
  while Pandas must always load all data into memory before operating on it,
  ``MDataFrame`` passes operations on to the database. The result of such
  operations is typically a Pandas ``DataFrame``, or another ``MDataFrame``.

- For cases where we need more complex processing than the database supports,
  ``MDataFrame`` can run a Python function on the data in parallel.

- Note that ``MDataFrame`` *is not* a drop-in replacement for a Pandas
  ``DataFrame``.

Concepts
--------

- ``MDataFrame``: represents a columnar data structure made up of columns
  and rows
- ``MSeries``: represents all values in a single column

In addition there are several helpers that provide specific functionality,
e.g. for positional access, slicing and group-by processing. We don’t specify
the details of these helpers as they are tied into the MDataFrame and MSeries
API.

- ``.value`` resolves the result of all operations (e.g. filtering, slicing,
  aggregation) to a local, in-memory Pandas DataFrame (useful for exploratory
  tasks and group-by aggregation)
- ``.apply()`` provides in-database processing (e.g. filtering, slicing,
  aggregation)
- ``.transform()`` provides parallel, out-of-core, chunk-by-chunk processing
  (use for large datasets)

Storing data as an MDataFrame
-----------------------------

You can store any of the following objects and return them as an MDataFrame:

- a Pandas DataFrame (and any source that Pandas supports):
  ``om.datasets.put(df, 'name')``
- any SQL source supported by the platform (e.g. Snowflake):
  ``om.datasets.put('connstr', 'name')``
- CSV files from an externally hosted source (ftp, http, s3):
  ``om.datasets.put(None, 'name', uri='http://....')``
- any other tabular-like data that you insert into the analytics store
  (i.e. MongoDB): ``om.datasets.put(docs, 'name')``

Note that in general MDataFrame does not depend on the original *source* of
the data, but on the format it is stored in within omega|ml’s analytics
storage. As long as the data can be retrieved and transformed into a format
Pandas can work with, MDataFrame will be able to handle it. That said, it
works best with tabular data of rows and columns where each cell holds a
scalar value.

Getting an MDataFrame
---------------------

The following methods are equivalent; both return an instance of
``MDataFrame``::

    mdf = om.datasets.getl('name')
    mdf = om.datasets.get('name', lazy=True)

Using either method, ``mdf`` represents the ``MDataFrame`` instance. Note
that this returns immediately, as no data access happens at this time.
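As a minimal sketch of how this plays out in practice (assuming a previously
stored dataset named ``sales`` with a numeric column ``amount``; all names
here are illustrative)::

    # returns immediately: only a description of the data, no data access
    mdf = om.datasets.getl('sales')

    # chaining operations is equally lazy; each step returns a new MDataFrame
    flt = mdf['amount'] > 100   # a lazy filter mask, see Filtering below
    large = mdf[flt]

    # data is only touched once the query is evaluated, e.g. by .count()
    n_rows = large.count()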
Executing a query
-----------------

To actually get data from an MDataFrame you need to ask for evaluation. This
executes the query according to all operations applied so far. The result is
a standard Pandas DataFrame::

    mdf.value

**Note this can be a dangerous operation as it will load all the data into
memory.** If the result of your query is larger than the available memory of
your process, it will fail and result in an operating-system level
out-of-memory condition. If you are unsure how many rows a query will return,
try using ``.count()`` first.

Persisting the result of a query
--------------------------------

To evaluate an MDataFrame without returning all data into memory, use the
``.persist()`` method::

    mdf.persist('name', store=om.datasets)

This is the equivalent of ``df = mdf.value; om.datasets.put(df, 'name')``,
however all operations are performed in the database; results are retrieved
back to memory only if needed, and if so in small chunks.

Slicing
-------

Like a Pandas DataFrame, an MDataFrame can be sliced

- by a set of columns: ``mdf[['col1', 'col2']]`` => returns an MDataFrame
  subset to col1, col2
- by a single column: ``mdf['col1']`` => returns an MSeries
- by rows: ``mdf.iloc[start:end]`` => returns an MDataFrame subset to rows
  with index start to end
- by index: ``mdf.loc[label]`` => returns an MDataFrame subset to rows with
  the corresponding index labels
- by filter: ``mdf[filter-mask]`` => returns an MDataFrame subset to the
  filter mask

Note that ``.loc``, ``.iloc`` require the data to have been stored from a
Pandas DataFrame.

Filtering
---------

*By filter masks*::

    flt = mdf['column'] == value  # use any operator supported by MSeries
    mdf[flt]

*By keywords*

Filtering can also be done by using a combination of
``column__<operator>=<value>`` keywords passed to ``.query()``, e.g.
``mdf.query(column__gte=value)``.

Parallel processing
-------------------

For processing that goes beyond what the database supports, ``.transform()``
runs a Python function on the data in parallel, chunk by chunk::

    def myproc(df):
        # df is one chunk of the data as a regular pandas DataFrame;
        # modify df in place => the updated df is written to the db
        # or return a DataFrame or a Series => the returned object is written to the db
        return result

    # this will start N = len(mdf) / 50'000 tasks and store the results in om.datasets
    # conceptually this is the equivalent of df = mdf.value.apply(myproc); om.datasets.put(df, 'name')
    # however using mdf.transform() will use much less memory and easily scales out of core
    mdf.transform(myproc).persist('name', store=om.datasets)

    # specify chunksize and n_jobs to influence the number of chunks and the
    # number of parallel workers. note the trade-off: many small chunks add
    # scheduling overhead, while larger chunksizes use more memory
    mdf.transform(myproc, chunksize=<#records>, n_jobs=<#jobs>).persist('name', store=om.datasets)

Customized chunking
-------------------

By default ``.transform()`` uses the size of the data (as in the number of
rows) to determine the number of chunks. You can however create any number of
chunks::

    mdf = om.datasets.getl('retail')

    def process(ldf):
        ldf['comments'] = ldf['l_comment'].str.split(' ')

    def chunker(mdf, chunksize, maxobs):
        # yield an MDataFrame subset for each chunk
        # note: don't use .value before yielding as this would resolve the
        # dataframe locally and potentially consume all memory
        groups = mdf['l_returnflag'].unique().value
        for group in groups:
            for i in range(0, maxobs, chunksize):
                yield mdf.skip(i).head(chunksize).query(l_returnflag=group)

    (mdf
     .transform(process, chunkfn=chunker, n_jobs=-2)
     .persist('retail-transformed', store=om.datasets))
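As a comparison to the grouped chunker above, a minimal chunker that simply
yields fixed-size row ranges (roughly what the default chunking does) could
look like the following sketch; names are illustrative::

    def simple_chunker(mdf, chunksize, maxobs):
        # yield at most chunksize rows per chunk; .skip() and .head() keep
        # each subset lazy, so no data is resolved here
        for i in range(0, maxobs, chunksize):
            yield mdf.skip(i).head(chunksize)

    (mdf
     .transform(process, chunkfn=simple_chunker)
     .persist('retail-simple', store=om.datasets))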
Lazy evaluation
---------------

Using lazy evaluation we get back a proxy DataFrame, an :code:`MDataFrame`,
which provides many of the features of a Pandas DataFrame, including
:code:`.loc` indexing and slicing, column projection and aggregation. All of
these operations, however, are executed by the database and thus support
out-of-core sized DataFrames, that is DataFrames of arbitrary size.

.. code::

    # ask for a reference to the dfx dataset with lazy evaluation
    om.datasets.get('dfx', lazy=True)
    =>

    # same thing, getl is a convenience method that automatically specifies lazy=True
    om.datasets.getl('dfx')
    =>

:code:`MDataFrame` in many ways behaves like a normal dataframe, however the
evaluation of operations is *lazy* and is executed by the database, as
opposed to in-memory. This allows us to process data that is larger than
memory. In order to evaluate the :code:`MDataFrame` and return an actual
:code:`pandas.DataFrame`, just access the :code:`.value` property:

.. code::

    om.datasets.get('dfx', lazy=True).value
    =>
       x  y
    0  0  0
    1  1  1
    2  2  2
    3  3  3
    4  4  4

What won’t work
---------------

*MDataFrames are currently read-only. In other words, assignment, column
additions and similar operations are not currently supported. This is not an
inherent restriction; there is simply no API for it in the current
implementation. Note that if updates are required, the MDataFrame plugin
mechanism provides a straightforward way to add such functionality.*

Hence the following kinds of operations are **not currently supported**::

    mdf[col] = mdf[col].apply(func)
    mdf[col] = mdf[col].map(func)
    mdf[col] = value  # partial support is available, but limited to scalar values
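Since assignment is not supported, one way to achieve the effect of a column
update is to process and persist to a dataset, as shown in the parallel
processing section above. A minimal sketch, assuming a stored dataset with a
numeric column ``col1`` (function and dataset names are illustrative)::

    def add_column(df):
        # each chunk arrives as a regular pandas DataFrame, so in-place
        # assignment works here; the updated chunk is written back to the db
        df['col2'] = df['col1'] * 2

    (om.datasets.getl('name')
       .transform(add_column)
       .persist('name-extended', store=om.datasets))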