Aggregation Framework

omega-ml provides a rich aggregation framework that leverages MongoDB’s aggregate operator while keeping the ease-of-use of Pandas syntax. Typical Pandas aggregation operations like group-by and descriptive statistics have direct equivalents in omegal|ml with the same or very similar syntax, using MDataFrame.groupby:

Standard Groupby aggregation

mdf = om.datasets.getl('dfx')
mdf.groupby('x').x.mean().head(5)
=>
     x_mean
x
0     0.0
1     1.0
2     2.0
3     3.0
4     4.0

Multiple aggregations can be applied at once by the agg() method:

mdf = om.datasets.getl('dfx')
print(mdf.groupby('x').agg(dict(x='sum', y='mean')).head(5))

The following aggregations are currently supported:

  • sum - sum

  • mean or avg - mean

  • max - the max value in the group

  • min - the min value in the group

  • std - standard deviation in the sample

  • first - the first in the group

  • last - the last in the group

Motivating example

If the standard operations provided in MDataFrame.groupby do not provide the required functionality, custom operators or chains of operators can be easily applied using the MDataFrame.apply() functionality. Much like Pandas DataFrame.apply, MDataFrame.apply takes a callable that operates on the data:

# apply to all columns on a dataframe
mdf.apply(lambda ctx: ctx * 5)

In this example, the lambda will multiply every value in the dataframe by 5. All math operators (*, +, -, /, % etc.) are supported, as well as a number of other operations.

Note

Unlike with Pandas, the callable passed to .apply() is not executed on every row. Instead the callable is executed once during preparation of the MongoDB query. The callable receives an ApplyContext which is responsible for translating requested operations to MongoDB query syntax when the dataframe is resolved by accessing the .value property. Call MDataFrame.inspect() to see the actual MongoDB query.

.apply() can be called either on a MDataFrame or on a MSeries. Further, when called on a MDataFrame, the operations specified are applied to all columns. When called on a MSeries or when the column is selected from the ApplyContext, the operations are applied only to the one column.

# apply to all columns on a dataframe
mdf.apply(lambda ctx: ctx * 5)

# apply to a column, returning a series
mdf['x'].apply(lambda ctx: ctx * 5)

# apply to a column, return a dataframe
mdf.apply(lambda ctx: ctx['x'] * 5)

Math operations

All standard Python math operators are supported, in particular:

  • __mul__ (*)

  • __add__ (+)

  • __sub__ (-)

  • __div__ (/)

  • __floordiv__ (//)

  • __mod__ (%)

  • __pow__ (pow)

  • __ceil__ (ceil)

  • __floor__ (floor)

  • __trunc__ (trunc)

  • __abs__ (abs)

  • sqrt (math.sqrt)

Math operators can be chained. While operator priority is taken care of by the Python compiler, you should use brackets to ensure readability and correct operations in special scenarios:

# while this works, it is not recommended syntax
mdf.apply(lambda ctx: ctx * 5 + 2)

# recommended
mdf.apply(lambda ctx: (ctx * 5) + 2)

Datetime Operators

mdf.apply(lambda ctx: ctx['v'].dt.year)
mdf.apply(lambda ctx: ctx['v'].dt.month)
mdf.apply(lambda ctx: ctx['v'].dt.week)
mdf.apply(lambda ctx: ctx['v'].dt.day)
mdf.apply(lambda ctx: ctx['v'].dt.hour)
mdf.apply(lambda ctx: ctx['v'].dt.minute)
mdf.apply(lambda ctx: ctx['v'].dt.second)
mdf.apply(lambda ctx: ctx['v'].dt.millisecond)
mdf.apply(lambda ctx: ctx['v'].dt.dayofyear)
mdf.apply(lambda ctx: ctx['v'].dt.dayofweek)

String Operators

mdf.apply(lambda ctx: ctx['v'].str.len())
mdf.apply(lambda ctx: ctx['v'].str.concat(['xyz']))
mdf.apply(lambda ctx: ctx['v'].str.split(','))
mdf.apply(lambda ctx: ctx['v'].str.upper())
mdf.apply(lambda ctx: ctx['v'].str.lower())
mdf.apply(lambda ctx: ctx['v'].str.substr(start, end))
mdf.apply(lambda ctx: ctx['v'].str.isequal('string')
mdf.apply(lambda ctx: ctx['v'].str.index('substring'))

Cached operations

Any apply() call results can be cached to speed-up future queries. To do so call persist():

mdf.apply(...).persist()

Any subsequent call to the same apply operations, .value will retrieve the results from the results produced by persist(). Note that persist() returns the cache key, not the actual results.

Note

Using cached operations can tremendously speed up data science work flows for complex aggregation queries that need to be executed repeatedly or are common in your scenario. As an example, consider an aggregation on a 50GB dataset that takes several minutes to compute. Using persist() this calculation can be executed once and stored for subsequent and automatic retrieval by anyone on your team.

Complex operations

MDataFrame.groupby supports only few descriptive statics, namely mean(), std(), min(), max() since these are the MongoDB-provided operations. However using .apply() more complex operators can be easily created. See the MongoDB aggregation reference for details on syntax.

Multiple statistics can be calculated for the same column:

mdf.apply(lambda ctx: ctx.groupby('x', v=['sum', 'mean', 'std'])

Custom statistics using MongoDB syntax

# specify the groupby in mongo db syntax
expr = {'$sum': '$v'}
# add a stage
mdf.apply(labmda ctx: ctx.groupby('x', v=expr)

Parallel execution of multiple calculations:

mdf.apply(lambda ctx: dict(a=ctx['v'] * 5, b=ctx['v'] / 2))

Custom projections:

mdf.apply(lambda ctx: ctx.project(a={'$divide': ['$v', 2]}))

Arbitrary pipeline stages:

# specify the stage in mongo db syntax
stage = {
    '$<stage>': { '<$operator>' : .... }
}
# add a stage
mdf.apply(labmda ctx: ctx.add(stage))

Note

The callable to apply() shall return any of the following result types:

  • None - this is equivalent to returning the ApplyContext passed on calling

  • ApplyContext - the context will be used to generate the stages passed to MongoDB’s aggregate()

  • dict - a mapping of result-column names to an ApplyContext or a valid list of stages in MongoDB-syntax

  • list - a list of stages in MongoDB-syntax