Aggregation Framework¶
omega|ml provides a rich aggregation framework that leverages MongoDB’s aggregate operator while keeping
the ease-of-use of Pandas syntax. Typical Pandas aggregation operations like group-by and descriptive statistics
have direct equivalents in omega|ml with the same or very similar syntax, using MDataFrame.groupby:
Standard Groupby aggregation¶
mdf = om.datasets.getl('dfx')
mdf.groupby('x').x.mean().head(5)
=>
x_mean
x
0 0.0
1 1.0
2 2.0
3 3.0
4 4.0
Multiple aggregations can be applied at once by the agg() method:
mdf = om.datasets.getl('dfx')
print(mdf.groupby('x').agg(dict(x='sum', y='mean')).head(5))
The following aggregations are currently supported:
sum - sum
mean or avg - mean
max - the max value in the group
min - the min value in the group
std - standard deviation in the sample
first - the first in the group
last - the last in the group
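For orientation, the aggregation names above can be related to MongoDB's $group accumulators. The following is a minimal sketch, not omega|ml's implementation: the helper group_stage and the ACCUMULATORS table are hypothetical, and the <column>_<stat> naming is assumed from the x_mean output shown earlier.

```python
# Hypothetical mapping from aggregation names to MongoDB $group accumulators.
ACCUMULATORS = {
    'sum': '$sum',
    'mean': '$avg',
    'avg': '$avg',
    'max': '$max',
    'min': '$min',
    'std': '$stdDevSamp',
    'first': '$first',
    'last': '$last',
}


def group_stage(by, specs):
    """Build a $group stage for a group-by column and a {column: stat} mapping."""
    stage = {'_id': {by: '$' + by}}
    for column, stat in specs.items():
        # assumed naming convention: <column>_<stat>, e.g. x_mean
        stage['{}_{}'.format(column, stat)] = {ACCUMULATORS[stat]: '$' + column}
    return {'$group': stage}


pipeline = [group_stage('x', dict(x='sum', y='mean'))]
print(pipeline)
```

Calling MDataFrame.inspect() on a real groupby shows the query that omega|ml actually generates, which may differ from this sketch.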
Motivating example¶
If the standard operations provided in MDataFrame.groupby do not provide the required functionality, custom
operators or chains of operators can be applied using MDataFrame.apply(). Much like Pandas DataFrame.apply,
MDataFrame.apply takes a callable that operates on the data:
# apply to all columns on a dataframe
mdf.apply(lambda ctx: ctx * 5)
In this example, the lambda will multiply every value in the dataframe by 5. All math operators (*, +, -, /, % etc.) are supported, as well as a number of other operations.
Note
Unlike with Pandas, the callable passed to .apply() is not executed on every row. Instead the
callable is executed once during preparation of the MongoDB query. The callable receives an ApplyContext
which is responsible for translating requested operations to MongoDB query syntax when the dataframe is
resolved by accessing the .value property. Call MDataFrame.inspect() to see the actual MongoDB query.
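To illustrate the idea of a callable that runs once and only records operations, here is a minimal sketch. ToyContext is a hypothetical stand-in written for this example; it is not the ApplyContext class and it supports only * and +.

```python
class ToyContext:
    """Hypothetical stand-in for ApplyContext: records operations instead of computing."""

    def __init__(self, columns, expr=None):
        self.columns = columns
        # expr turns a column reference into a MongoDB expression; identity by default
        self.expr = expr or (lambda ref: ref)

    def _chain(self, operator, other):
        inner = self.expr
        return ToyContext(self.columns, lambda ref: {operator: [inner(ref), other]})

    def __mul__(self, other):
        return self._chain('$multiply', other)

    def __add__(self, other):
        return self._chain('$add', other)

    def pipeline(self):
        # one $project stage applying the recorded expression to every column
        return [{'$project': {col: self.expr('$' + col) for col in self.columns}}]


calls = []


def fn(ctx):
    calls.append(1)  # count how often the callable runs
    return ctx * 5


result = fn(ToyContext(['x', 'y']))
print(result.pipeline())
print(len(calls))  # the callable ran once, independent of the number of rows
```

The multiplication never touches any data here. It only builds an expression that the database would evaluate later.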
.apply() can be called either on an MDataFrame or on an MSeries. When called on an
MDataFrame, the operations specified are applied to all columns. When called on an MSeries, or when the column is
selected from the ApplyContext, the operations are applied only to that one column.
# apply to all columns on a dataframe
mdf.apply(lambda ctx: ctx * 5)
# apply to a column, returning a series
mdf['x'].apply(lambda ctx: ctx * 5)
# apply to a column, return a dataframe
mdf.apply(lambda ctx: ctx['x'] * 5)
Math operations¶
All standard Python math operators are supported, in particular:
__mul__ (*)
__add__ (+)
__sub__ (-)
__div__ (/)
__floordiv__ (//)
__mod__ (%)
__pow__ (pow)
__ceil__ (ceil)
__floor__ (floor)
__trunc__ (trunc)
__abs__ (abs)
sqrt (math.sqrt)
Math operators can be chained. Operator precedence follows the usual Python rules, but you should use brackets to keep the expression readable and to make the intended order of operations explicit:
# while this works, it is not recommended syntax
mdf.apply(lambda ctx: ctx * 5 + 2)
# recommended
mdf.apply(lambda ctx: (ctx * 5) + 2)
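The effect of brackets can be checked with a small expression recorder. This is a sketch under stated assumptions: Expr is a hypothetical class for this example only, and the MongoDB operator names it emits ($multiply, $add) are illustrative rather than taken from omega|ml.

```python
class Expr:
    """Hypothetical expression recorder; only * and + are implemented here."""

    def __init__(self, tree):
        self.tree = tree

    def __mul__(self, other):
        return Expr({'$multiply': [self.tree, other]})

    def __add__(self, other):
        return Expr({'$add': [self.tree, other]})


ctx = Expr('$x')
implicit = (ctx * 5 + 2).tree     # Python applies * before +
explicit = ((ctx * 5) + 2).tree   # same tree, but the intent is visible
different = (ctx * (5 + 2)).tree  # 5 + 2 is evaluated by Python first

print(implicit == explicit)
print(different)
```

The first two forms record the same expression. The third differs because the bracketed sum is computed in Python before the context ever sees it.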
Datetime Operators¶
mdf.apply(lambda ctx: ctx['v'].dt.year)
mdf.apply(lambda ctx: ctx['v'].dt.month)
mdf.apply(lambda ctx: ctx['v'].dt.week)
mdf.apply(lambda ctx: ctx['v'].dt.day)
mdf.apply(lambda ctx: ctx['v'].dt.hour)
mdf.apply(lambda ctx: ctx['v'].dt.minute)
mdf.apply(lambda ctx: ctx['v'].dt.second)
mdf.apply(lambda ctx: ctx['v'].dt.millisecond)
mdf.apply(lambda ctx: ctx['v'].dt.dayofyear)
mdf.apply(lambda ctx: ctx['v'].dt.dayofweek)
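The accessors above have natural counterparts among MongoDB's date expression operators. The table below is an assumed correspondence for illustration; the source does not state which operators omega|ml emits, and dt_project is a hypothetical helper.

```python
# Assumed correspondence between .dt accessors and MongoDB date operators.
DT_OPERATORS = {
    'year': '$year',
    'month': '$month',
    'week': '$week',
    'day': '$dayOfMonth',
    'hour': '$hour',
    'minute': '$minute',
    'second': '$second',
    'millisecond': '$millisecond',
    'dayofyear': '$dayOfYear',
    'dayofweek': '$dayOfWeek',
}


def dt_project(column, accessor):
    """Build a $project stage extracting one date part from a column."""
    return {'$project': {column: {DT_OPERATORS[accessor]: '$' + column}}}


print(dt_project('v', 'year'))
```

Note that MongoDB's $dayOfWeek counts from 1 (Sunday) to 7 (Saturday), whereas Pandas counts dayofweek from 0 (Monday). If results are compared against Pandas, it is worth checking the numbering with MDataFrame.inspect() and a small sample.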
String Operators¶
mdf.apply(lambda ctx: ctx['v'].str.len())
mdf.apply(lambda ctx: ctx['v'].str.concat(['xyz']))
mdf.apply(lambda ctx: ctx['v'].str.split(','))
mdf.apply(lambda ctx: ctx['v'].str.upper())
mdf.apply(lambda ctx: ctx['v'].str.lower())
mdf.apply(lambda ctx: ctx['v'].str.substr(start, end))
mdf.apply(lambda ctx: ctx['v'].str.isequal('string'))
mdf.apply(lambda ctx: ctx['v'].str.index('substring'))
Cached operations¶
The results of any apply() call can be cached to speed up future queries. To do so call persist():
mdf.apply(...).persist()
For any subsequent call with the same apply operations, .value will retrieve the results
stored by persist() instead of recomputing them. Note that persist() returns the cache key,
not the actual results.
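The general pattern behind such a cache can be sketched as follows. This is not how omega|ml derives its cache key, which the text does not describe. The functions cache_key, persist and value, the in-memory _cache dictionary and the use of a SHA-256 hash are all assumptions made for this example.

```python
import hashlib
import json

_cache = {}  # hypothetical in-memory store, standing in for a persistent one


def cache_key(pipeline):
    """Derive a deterministic key from the pipeline's JSON representation."""
    payload = json.dumps(pipeline, sort_keys=True).encode('utf-8')
    return hashlib.sha256(payload).hexdigest()


def persist(pipeline, compute):
    """Compute once, store under the key, and return the key (not the results)."""
    key = cache_key(pipeline)
    if key not in _cache:
        _cache[key] = compute(pipeline)
    return key


def value(pipeline, compute):
    """Return cached results when the same pipeline was persisted before."""
    key = cache_key(pipeline)
    return _cache[key] if key in _cache else compute(pipeline)


runs = []


def compute(pipeline):
    runs.append(1)  # record each real computation
    return [{'x': 5}]


pipeline = [{'$project': {'x': {'$multiply': ['$x', 5]}}}]
key = persist(pipeline, compute)
print(value(pipeline, compute), len(runs))
```

Because the key depends only on the operations, any later request with identical operations finds the stored result without recomputation.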
Note
Using cached operations can tremendously speed up data science workflows for complex aggregation
queries that need to be executed repeatedly or are common in your scenario. As an example, consider
an aggregation on a 50GB dataset that takes several minutes to compute. Using persist(), this
calculation can be executed once and stored for subsequent and automatic retrieval by anyone on your team.
Complex operations¶
MDataFrame.groupby supports only a few descriptive statistics, namely mean(), std(), min(), max(), since
these are the MongoDB-provided operations. However, using .apply() more complex operators can be easily
created. See the MongoDB aggregation reference for details on syntax.
Multiple statistics can be calculated for the same column:
mdf.apply(lambda ctx: ctx.groupby('x', v=['sum', 'mean', 'std']))
Custom statistics using MongoDB syntax:
# specify the groupby in mongo db syntax
expr = {'$sum': '$v'}
# group by x, applying the custom expression to v
mdf.apply(lambda ctx: ctx.groupby('x', v=expr))
Parallel execution of multiple calculations:
mdf.apply(lambda ctx: dict(a=ctx['v'] * 5, b=ctx['v'] / 2))
Custom projections:
mdf.apply(lambda ctx: ctx.project(a={'$divide': ['$v', 2]}))
Arbitrary pipeline stages:
# specify the stage in mongo db syntax
stage = {
'$<stage>': { '<$operator>' : .... }
}
# add a stage
mdf.apply(lambda ctx: ctx.add(stage))
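As a concrete instance of the placeholder above, the following sketch defines a few stages in plain MongoDB syntax. The field name x and the is_stage helper are illustrative assumptions. The final call is shown as a comment because it needs an existing MDataFrame.

```python
# Concrete instances of the '$<stage>' placeholder, in plain MongoDB syntax.
match_stage = {'$match': {'x': {'$gt': 2}}}  # keep documents where x > 2
sort_stage = {'$sort': {'x': -1}}            # order by x, descending
limit_stage = {'$limit': 10}                 # keep the first 10 documents


def is_stage(stage):
    """Hypothetical sanity check: a stage is a dict with a single '$'-prefixed key."""
    return (isinstance(stage, dict)
            and len(stage) == 1
            and next(iter(stage)).startswith('$'))


stages = [match_stage, sort_stage, limit_stage]
print(all(is_stage(s) for s in stages))

# with an MDataFrame at hand, a stage would be added as in the example above:
# mdf.apply(lambda ctx: ctx.add(match_stage))
```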
Note
The callable to apply() shall return any of the following result types:
None - this is equivalent to returning the ApplyContext passed on calling
ApplyContext - the context will be used to generate the stages passed to MongoDB’s aggregate()
dict - a mapping of result-column names to an ApplyContext or a valid list of stages in MongoDB-syntax
list - a list of stages in MongoDB-syntax