Source code for omegaml.mixins.mdf.apply

import hashlib
import json
import pandas as pd
from itertools import product
from uuid import uuid4

from omegaml.documents import make_QueryCache
from omegaml.mdataframe import MDataFrame, MSeries
from omegaml.store import qops
from omegaml.store.filtered import FilteredCollection
from omegaml.util import make_tuple, extend_instance


class ApplyMixin(object):
    """
    Implements the apply() mixin supporting arbitrary functions to build aggregation pipelines

    Note that .apply() does not execute immediately. Instead it builds an aggregation pipeline
    that is executed on MDataFrame.value. Note that .apply() calls cannot be cascaded yet, i.e.
    a later .apply() will override a previous .apply().

    See ApplyContext for usage examples.
    """

    def __init__(self, *args, **kwargs):
        super(ApplyMixin, self).__init__(*args, **kwargs)
        self._init_mixin(*args, **kwargs)

    def _init_mixin(self, *args, **kwargs):
        self.apply_fn = kwargs.get('apply_fn', None)
        # set to True if the pipeline is a facet operation
        self.is_from_facet = kwargs.get('is_from_facet', False)
        # index columns
        self.index_columns = kwargs.get('index_columns', [])
        # db alias
        self._db_alias = kwargs.get('db_alias', self._ensure_db_connection())
        # cache used on persist()
        self.cache = kwargs.get('cache', ApplyCache(self._db_alias))

    def _ensure_db_connection(self):
        from mongoengine.connection import _dbs, _connections

        seek_db = self.collection.database
        for alias, db in _dbs.items():
            if db is seek_db:
                self._db_alias = alias
                break
        else:
            # fake connection register
            alias = self._db_alias = 'omega-{}'.format(uuid4().hex)
            _connections[alias] = seek_db.client
            _dbs[alias] = seek_db
        return self._db_alias

    def nocache(self):
        self.cache = None
        return self

    def reset_cache(self, full=False):
        """
        Reset the apply cache

        :param full: if True resets all caches for the collection, if False only
           removes the cache for the specific .apply operation
        :return: self
        """
        QueryCache = make_QueryCache(db_alias=self._db_alias)
        if full:
            QueryCache.objects.filter(value__collection=self.collection.name).delete()
        else:
            pipeline = self._build_pipeline()
            key = self._make_cache_key(self.collection, pipeline)
            QueryCache.objects.filter(key=key).delete()
        return self

    def _make_cache_key(self, collection, pipeline):
        # remove random output value
        if '$out' in pipeline[-1] and pipeline[-1]['$out'].startswith('cache'):
            pipeline = list(pipeline)[:-1]
        spipeline = json.dumps(pipeline, sort_keys=True)
        data = '{}_{}'.format(collection.name, spipeline).encode('utf-8')
        # SEC: CWE-916
        # - status: wontfix
        # - reason: hashcode is used purely for name resolution, not a security function
        key = hashlib.md5(data).hexdigest()
        return key

    def _getcopy_kwargs(self, **kwargs):
        kwargs = super(ApplyMixin, self)._getcopy_kwargs(**kwargs)
        kwargs.update(is_from_facet=self.is_from_facet,
                      index_columns=self.index_columns,
                      cache=self.cache,
                      apply_fn=self.apply_fn)
        return kwargs

    def noapply(self):
        self.apply_fn = None
        return self

    def apply(self, fn, inplace=False, preparefn=None):
        if inplace:
            obj = self
        else:
            kwargs = self._getcopy_kwargs()
            kwargs.update(preparefn=preparefn)
            if isinstance(self, MSeries):
                obj = MSeries(self.collection, **kwargs)
            else:
                obj = MDataFrame(self.collection, **kwargs)
        obj.apply_fn = fn
        return obj

    def persist(self):
        """
        Execute and store results in cache

        Any pipeline of the same operations, in the same order, on
        the same collection will return the same result.
        """
        # generate a cache key
        pipeline = self._build_pipeline()
        key = self._make_cache_key(self.collection, pipeline)
        outname = 'cache_{}'.format(uuid4().hex)
        value = {
            'collection': self.collection.name,
            'result': outname,
        }
        # do usual processing, store result
        # -- note we pass pipeline to avoid processing iterators twice
        pipeline.append({
            '$out': outname,
        })
        cursor = self._get_cursor(pipeline=pipeline, use_cache=False)
        # consume cursor to store output (via $out)
        for v in cursor:
            pass
        # set cache
        self.cache.set(key, value)
        return key

    def set_index(self, columns):
        self.index_columns = make_tuple(columns)
        return self

    def inspect(self, explain=False, *args, **kwargs):
        if self.apply_fn:
            details = {
                'pipeline': self._build_pipeline()
            }
            if explain:
                details.update(self.__dict__)
            return details
        return super(ApplyMixin, self).inspect(*args, explain=explain, **kwargs)

    def _execute(self):
        ctx = ApplyContext(self, columns=self.columns)
        try:
            result = self.apply_fn(ctx)
        except Exception as e:
            msg = [repr(stage) for stage in ctx.stages] + [repr(e)]
            raise RuntimeError(msg)
        if result is None or isinstance(result, ApplyContext):
            result = result or ctx
            self.index_columns = self.index_columns or result.index_columns
            return result
        elif isinstance(result, list):
            return result
        elif isinstance(result, dict):
            # expect a mapping of col=ApplyContext each with its own list of stages
            # -- build a combined context by adding each expression
            #    this ensures any multi-stage projections are carried forward
            facets = {}
            for col, expr in result.items():
                if isinstance(expr, ApplyContext):
                    facets[col] = list(expr)
                    project = {
                        '$project': {
                            col: '$' + expr.columns[0]
                        },
                    }
                    facets[col].append(project)
                else:
                    facets[col] = expr
            facet = {
                '$facet': facets
            }
            self.is_from_facet = True
            return [facet]
        raise ValueError('Cannot build pipeline from apply result of type {}'.format(type(result)))

    def _build_pipeline(self):
        pipeline = []
        stages = self._execute()
        pipeline.extend(stages)
        self._amend_pipeline(pipeline)
        return pipeline

    def _amend_pipeline(self, pipeline):
        """ amend pipeline with default ops on coll.aggregate() calls """
        if self.sort_order:
            sort = qops.SORT(**dict(qops.make_sortkey(self.sort_order)))
            pipeline.append(sort)
        return pipeline

    def _get_cached_cursor(self, pipeline=None, use_cache=True):
        pipeline = pipeline or self._build_pipeline()
        if use_cache and self.cache:
            key = self._make_cache_key(self.collection, pipeline)
            entry = self.cache.get(key)
            if entry is not None:
                # read result
                outname = entry.value['result']
                return self.collection.database[outname].find()

    def _get_cursor(self, pipeline=None, use_cache=True):
        # for apply functions, call the apply function, expecting a pipeline in return
        if self.apply_fn:
            pipeline = pipeline or self._build_pipeline()
            cursor = self._get_cached_cursor(pipeline=pipeline, use_cache=use_cache)
            if cursor is None:
                filter_criteria = self._get_filter_criteria()
                cursor = FilteredCollection(self.collection).aggregate(pipeline,
                                                                       filter=filter_criteria,
                                                                       allowDiskUse=True)
        else:
            cursor = super(ApplyMixin, self)._get_cursor()
        return cursor

    def _get_dataframe_from_cursor(self, cursor):
        df = super(ApplyMixin, self)._get_dataframe_from_cursor(cursor)
        if self.is_from_facet:
            # if this was from a facet pipeline (i.e. multi-column mapping), combine
            # the per-stage documents; $facet returns one document for each stage
            frames = []
            for col in df.columns:
                coldf = pd.DataFrame(df[col].iloc[0]).set_index('_id')
                frames.append(coldf)
            df = pd.concat(frames, axis=1).reset_index()
            df = self._restore_dataframe_proper(df)
        # TODO write a unit test for this condition
        if self.index_columns and all(col in df.columns for col in self.index_columns):
            df.set_index(list(self.index_columns), inplace=True)
        return df

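# Usage sketch (illustrative, not part of the module): `mdf` is assumed to be
# an omegaml MDataFrame over a MongoDB collection. .apply() only builds the
# aggregation pipeline; evaluation happens on .value. persist() runs the
# pipeline once and stores the $out collection so equal pipelines are served
# from the cache afterwards.
#
#   mdf.apply(lambda v: v * 5).value             # build pipeline, evaluate
#   key = mdf.apply(lambda v: v * 5).persist()   # execute once, cache result
#   mdf.apply(lambda v: v * 5).value             # now read from the cache
#   mdf.apply(lambda v: v * 5).nocache().value   # bypass the cache
#   mdf.apply(lambda v: v * 5).reset_cache()     # drop this pipeline's cache
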
class ApplyContext(object):
    """
    Enable apply functions

    .apply(fn) will call fn(ctx) where ctx is an ApplyContext.
    The context supports methods to apply functions in a Pandas-style apply manner. ApplyContext is extensible
    by adding an extension class to defaults.OMEGA_MDF_APPLY_MIXINS.

    Note that unlike a Pandas DataFrame, ApplyContext does not itself contain any data.
    Rather it is part of an expression tree, i.e. the aggregation pipeline. Thus any
    expressions applied are translated into operations on the expression tree. The expression
    tree is evaluated on MDataFrame.value, at which point neither the ApplyContext nor the
    function that created it are active anymore.

    Examples::

        mdf.apply(lambda v: v * 5)            => multiply every column in dataframe
        mdf.apply(lambda v: v['foo'].dt.week) => get week of date for column foo
        mdf.apply(lambda v: dict(a=v['foo'].dt.week,
                                 b=v['bar'] * 5))  => run multiple pipelines and get results

        The callable passed to apply can be any function. It can either return None,
        the context passed in or a list of pipeline stages.

        # apply any of the below functions
        mdf.apply(customfn)

        # same as lambda v: v.dt.week
        def customfn(ctx):
            return ctx.dt.week

        # simple pipeline
        def customfn(ctx):
            ctx.project(x={'$multiply': ['$x', 5]})
            ctx.project(y={'$divide': ['$x', 2]})

        # complex pipeline
        def customfn(ctx):
            return [
                {'$match': ...},
                {'$project': ...},
            ]
    """

    def __init__(self, caller, columns=None, index=None):
        self.caller = caller
        self.columns = columns
        self.index_columns = index or []
        self.computed = []
        self.stages = []
        self.expressions = []
        self._apply_mixins()

    def _apply_mixins(self):
        """
        apply mixins in defaults.OMEGA_MDF_APPLY_MIXINS
        """
        from omegaml import settings
        defaults = settings()
        for mixin, applyto in defaults.OMEGA_MDF_APPLY_MIXINS:
            if any(v in self.caller._applyto for v in applyto.split(',')):
                extend_instance(self, mixin)

    def __iter__(self):
        # return pipeline stages
        for stage in self.stages:
            if isinstance(stage, ApplyContext):
                for sub_stage in stage:
                    yield sub_stage
            else:
                yield stage

    def __getitem__(self, sel):
        """
        return a stage subset on a column
        """
        subctx = ApplyContext(self.caller, columns=make_tuple(sel), index=self.index_columns)
        self.add(subctx)
        return subctx

    def __setitem__(self, sel, val):
        """
        add a projection to a sub context

        ctx['col'] = value-expression
        """
        mapping = {
            col: v
            for (col, v) in zip(make_tuple(sel), make_tuple(val))}
        self.project(mapping)

    def __repr__(self):
        return 'ApplyContext(stages={}, expressions={})'.format(self.stages, self.expressions)

    def add(self, stage):
        """
        Add a processing stage to the pipeline

        see https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
        """
        self.stages.append(stage)
        return self

    def project_keeper_columns(self):
        # keep index, computed
        index = {
            col: '$' + col
            for col in self.index_columns}
        computed = {
            col: '$' + col
            for col in self.computed}
        keep = {}
        keep.update(index)
        keep.update(computed)
        project = self.project(keep, keep=True)
        return project

    def _getLastStageKind(self, kind):
        # see if there is already an open stage of the given kind
        for stage in self.stages[::-1]:
            if kind in stage:
                return stage

    def _getProjection(self, append=False):
        stage = self._getLastStageKind('$project')
        if stage is None or append:
            stage = {
                '$project': {
                    '_id': 1,
                }
            }
            self.stages.append(stage)
        return stage

    def _getGroupBy(self, by=None, append=False):
        stage = self._getLastStageKind('$group')
        if stage and stage['$group']['_id'] != by and by != '$$last':
            # if a different groupby criteria, add a new one
            stage = None
        if stage is None and by == '$$last':
            by = None
        if stage is None or append:
            stage = {
                '$group': {
                    '_id': by,
                }
            }
            self.stages.append(stage)
        return stage

    def groupby(self, by, expr=None, append=None, **kwargs):
        """
        add a groupby accumulation using $group

        :param by: the groupby column(s); a list is transformed into a mapping
           of column => $column
        :param expr: a mapping of output column => accumulator expression
        :param append: currently unused
        :param kwargs: if expr is None, the output column => accumulator
           expressions as kwargs
        :return: ApplyContext
        """
        by = make_tuple(by)
        self.index_columns = self.index_columns + list(by)
        # define groupby
        by = {col: '$' + col for col in by}
        stage = self._getGroupBy(by)
        groupby = stage['$group']
        # add accumulators
        expr = expr or {
            col: colExpr
            for col, colExpr in kwargs.items()}
        groupby.update(expr)
        # add a projection to extract groupby values
        extractId = {
            col: '$_id.' + col
            for col in by}
        # add a projection to keep accumulator columns
        keepCols = {
            col: 1
            for col in expr}
        keepCols.update(extractId)
        self.project(keepCols, append=True)
        # sort by groupby keys
        self.add({
            '$sort': {
                col: 1
                for col in by}
        })
        return self

    def project(self, expr=None, append=False, keep=False, **kwargs):
        """
        add a projection using $project

        :param expr: the column-operator mapping
        :param append: if True add a $project stage, otherwise add to existing
        :param keep: if True keep an existing projection of the same column
           instead of adding a new stage
        :param kwargs: if expr is None, the column-operator mapping as kwargs
        :return: ApplyContext
        """
        # get last $project stage in pipeline
        stage = self._getProjection(append=append)
        expr = expr or kwargs
        self.expressions.append(expr)
        for k, v in expr.items():
            # only append to stage if no other column projection was there
            project = stage.get('$project')
            if k not in project:
                project.update({
                    k: v
                })
            elif not keep:
                # if a column is already projected, add a new projection stage
                stage = self._getProjection(append=True)
                project = stage.get('$project')
                project.update({
                    k: v
                })
        return self

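# Sketch of a custom apply function working on the ApplyContext directly;
# the column names 'x' and 'group' are assumptions for illustration:
#
#   def customfn(ctx):
#       ctx.project(x2={'$multiply': ['$x', 2]})     # computed column ($project)
#       ctx.groupby('group', total={'$sum': '$x2'})  # accumulate per group ($group)
#
#   mdf.apply(customfn).value
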
class ApplyArithmetics(object):
    """
    Math operators for ApplyContext

    * :code:`__mul__` (*)
    * :code:`__add__` (+)
    * :code:`__sub__` (-)
    * :code:`__div__` (/)
    * :code:`__floordiv__` (//)
    * :code:`__mod__` (%)
    * :code:`__pow__` (pow)
    * :code:`__ceil__` (ceil)
    * :code:`__floor__` (floor)
    * :code:`__trunc__` (trunc)
    * :code:`__abs__` (abs)
    * :code:`sqrt` (math.sqrt)
    """

    def __arithmop__(op, wrap_op=None):
        """
        return a pipeline $project stage math operator as
        { col:
            { '$operator': [ values, ...] }
          ...
        }

        If wrap_op is specified, will wrap the $operator clause as
        { col:
            { '$wrap_op': { '$operator': [ values, ...] } }
          ...
        }
        """

        def inner(self, other):
            terms = []
            for term in make_tuple(other):
                if isinstance(term, str):
                    term = '$' + term
                terms.append(term)

            def wrap(expr):
                if wrap_op is not None:
                    expr = {
                        wrap_op: expr
                    }
                return expr

            mapping = {
                col: wrap({
                    op: ['$' + col] + terms,
                }) for col in self.columns}
            keepCols = {
                col: '$' + col
                for col in self.index_columns}
            mapping.update(keepCols)
            self.project(mapping)
            return self

        return inner

    #: multiply
    __mul__ = __arithmop__('$multiply')
    #: add
    __add__ = __arithmop__('$add')
    #: subtract
    __sub__ = __arithmop__('$subtract')
    #: divide
    __div__ = __arithmop__('$divide')
    __truediv__ = __arithmop__('$divide')
    #: integer divide
    __floordiv__ = __arithmop__('$divide', wrap_op='$floor')
    #: modulo (%)
    __mod__ = __arithmop__('$mod')
    #: pow
    __pow__ = __arithmop__('$pow')
    #: ceil
    __ceil__ = __arithmop__('$ceil')
    #: floor
    __floor__ = __arithmop__('$floor')
    #: truncate
    __trunc__ = __arithmop__('$trunc')
    #: absolute
    __abs__ = __arithmop__('$abs')
    #: square root
    sqrt = __arithmop__('$sqrt')

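# Sketch of the arithmetic operators (assumes an MDataFrame `mdf` with numeric
# columns 'x' and 'y'); every operator projects {col: {'$op': ['$col', term]}}
# for each context column, and string terms are treated as column references:
#
#   mdf.apply(lambda v: (v + 2) * 5).value   # $add, then $multiply in a new stage
#   mdf.apply(lambda v: v['x'] // 3).value   # $divide wrapped in $floor
#   mdf.apply(lambda v: v['x'] + 'y').value  # column term, i.e. {'$add': ['$x', '$y']}
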
class ApplyDateTime(object):
    """
    Datetime operators for ApplyContext
    """

    @property
    def dt(self):
        return self

    def __dtop__(op):
        """
        return a datetime $project operator as
        { col:
            { '$operator': '$col' }
          ...
        }
        """

        def inner(self, columns=None):
            columns = make_tuple(columns or self.columns)
            mapping = {
                col: {
                    op: '$' + col,
                }
                for col in columns}
            self.project(mapping)
            return self

        inner.__doc__ = op.replace('$', '')
        return inner

    # mongodb mappings
    _year = __dtop__('$year')
    _month = __dtop__('$month')
    _week = __dtop__('$week')
    _dayOfWeek = __dtop__('$dayOfWeek')
    _dayOfMonth = __dtop__('$dayOfMonth')
    _dayOfYear = __dtop__('$dayOfYear')
    _hour = __dtop__('$hour')
    _minute = __dtop__('$minute')
    _second = __dtop__('$second')
    _millisecond = __dtop__('$millisecond')
    _isoDayOfWeek = __dtop__('$isoDayOfWeek')
    _isoWeek = __dtop__('$isoWeek')
    _isoWeekYear = __dtop__('$isoWeekYear')

    # .dt accessor convenience similar to pandas.dt
    # see https://pandas.pydata.org/pandas-docs/stable/api.html#datetimelike-properties
    year = property(_year)
    month = property(_month)
    day = property(_dayOfMonth)
    hour = property(_hour)
    minute = property(_minute)
    second = property(_second)
    millisecond = property(_millisecond)
    week = property(_isoWeek)
    dayofyear = property(_dayOfYear)
    dayofweek = property(_dayOfWeek)

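# Sketch of the .dt accessor (assumes a datetime column 'ts'); each property
# projects the corresponding MongoDB date operator, e.g. .week => $isoWeek:
#
#   mdf.apply(lambda v: v['ts'].dt.week).value
#   mdf.apply(lambda v: dict(year=v['ts'].dt.year,
#                            month=v['ts'].dt.month)).value  # combined via $facet
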
class ApplyString(object):
    """
    String operators
    """

    @property
    def str(self):
        return self

    def __strexpr__(op, unwind=False, base=None, max_terms=None):
        """
        return a pipeline $project string operator as
        { col:
            { '$operator': [ values, ...] }
          ...
        }
        """

        def inner(self, other, *args):
            # get all values passed and build terms from them
            values = list(make_tuple(other) + args)
            terms = []
            for term in values:
                if isinstance(term, str):
                    # if the term is a column name, add as a column name
                    if term in self.columns:
                        term = '$' + term
                    # allow to specify values explicitly by $$<value> => <value>
                    term = term.replace('$$', '')
                terms.append(term)
            # limit number of terms if requested
            if max_terms:
                terms = terms[:max_terms]
            # add projection of output columns to operator
            mapping = {
                col: {
                    op: terms if base is None else ['$' + col] + terms,
                } for col in self.columns}
            self.project(mapping)
            # unwind all columns if requested
            if unwind:
                exprs = [{'$unwind': {
                    'path': '$' + col
                }} for col in self.columns]
                self.stages.extend(exprs)
            return self

        inner.__doc__ = op.replace('$', '')
        return inner

    def __strunary__(op, unwind=False):
        """
        return a string $project operator as
        { col:
            { '$operator': '$col' }
          ...
        }
        """

        def inner(self, columns=None):
            columns = make_tuple(columns or self.columns)
            mapping = {
                col: {
                    op: '$' + col,
                }
                for col in columns}
            self.project(mapping)
            if unwind:
                # unwind each column, same as in __strexpr__
                self.stages.extend({
                    '$unwind': {
                        'path': '$' + col
                    }
                } for col in columns)
            return self

        inner.__doc__ = op.replace('$', '')

        return inner

    def isequal(self, other):
        self.strcasecmp(other)
        # strcasecmp returns 0 for equality, 1 and -1 for greater/less than
        # https://docs.mongodb.com/manual/reference/operator/aggregation/strcasecmp/
        mapping = {
            col: {
                '$cond': {
                    'if': {'$eq': ['$' + col, 0]},
                    'then': True,
                    'else': False,
                }
            }
            for col in self.columns}
        self.project(mapping)
        return self

    concat = __strexpr__('$concat', base=True)
    split = __strexpr__('$split', unwind=True, base=True, max_terms=2)
    usplit = __strexpr__('$split', unwind=False, base=True, max_terms=2)
    upper = __strunary__('$toUpper')
    lower = __strunary__('$toLower')
    substr = __strexpr__('$substr', base=True)
    strcasecmp = __strexpr__('$strcasecmp', base=True)
    len = __strunary__('$strLenBytes')
    index = __strexpr__('$indexOfBytes', base=True)

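# Sketch of the .str accessor (assumes a string column 'name'); terms matching
# a column name become field references, a '$$' prefix forces a literal value:
#
#   mdf.apply(lambda v: v['name'].str.upper()).value              # $toUpper
#   mdf.apply(lambda v: v['name'].str.concat('$$-suffix')).value  # $concat literal
#   mdf.apply(lambda v: v['name'].str.split('$$,')).value         # $split + $unwind
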
class ApplyAccumulators(object):
    def agg(self, map=None, **kwargs):
        stage = self._getGroupBy(by='$$last')
        specs = map or kwargs
        for col, colExpr in specs.items():
            if isinstance(colExpr, dict):
                # specify an arbitrary expression
                groupby = stage['$group']
                groupby[col] = colExpr
            elif isinstance(colExpr, str):
                # specify some known operator
                if hasattr(self, colExpr):
                    method = getattr(self, colExpr)
                    method(col)
                else:
                    raise SyntaxError('{} is not known'.format(colExpr))
            elif isinstance(colExpr, (tuple, list)):
                # specify a list of some known operators
                for statExpr in colExpr:
                    if hasattr(self, statExpr):
                        method = getattr(self, statExpr)
                        method(col)
                    else:
                        raise SyntaxError('{} is not known'.format(statExpr))
            elif callable(colExpr):
                # specify a callable that returns an expression
                groupby = stage['$group']
                groupby[col] = colExpr(col)
            else:
                raise SyntaxError('{} on column {} is unknown or invalid'.format(colExpr, col))
        return self

    def __statop__(op, opname=None):
        opname = opname or op.replace('$', '')

        def inner(self, columns=None):
            columns = make_tuple(columns or self.columns)
            stage = self._getGroupBy(by='$$last')
            groupby = stage['$group']
            groupby.update({
                '{}_{}'.format(col, opname): {
                    op: '$' + col
                } for col in columns
            })
            self.computed.extend(groupby.keys())
            self.project_keeper_columns()
            return self

        return inner

    sum = __statop__('$sum')
    avg = __statop__('$avg')
    mean = __statop__('$avg')
    min = __statop__('$min')
    max = __statop__('$max')
    std = __statop__('$stdDevSamp', 'std')

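# Sketch of the accumulators (assumes numeric columns 'x' and 'y'); stats group
# via by='$$last', i.e. they reuse an existing $group stage or group all
# documents, and emit '<column>_<stat>' output columns (mean maps to $avg):
#
#   mdf.apply(lambda v: v['x'].mean()).value   # => column x_avg
#   mdf.apply(lambda v: v.agg(dict(x='sum', y=['min', 'max']))).value
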
class ApplyCache(object):
    """
    A Cache that works on collections and pipelines
    """

    def __init__(self, db_alias):
        self._db_alias = db_alias

    def set(self, key, value):
        # https://stackoverflow.com/a/22003440/890242
        QueryCache = make_QueryCache(self._db_alias)
        QueryCache.objects(key=key).update_one(set__key="{}".format(key),
                                               set__value=value, upsert=True)

    def get(self, key):
        QueryCache = make_QueryCache(self._db_alias)
        try:
            result = QueryCache.objects.get(key=key)
        except QueryCache.DoesNotExist:
            result = None
        return result


class ApplyStatistics(object):
    def quantile(self, q=.5):
        def preparefn(val):
            return val.pivot(columns='var', index='percentile', values='value')

        return self.apply(self._percentile(q), preparefn=preparefn)

    def cov(self):
        def preparefn(val):
            val = val.pivot(columns='y', index='x', values='cov')
            val.index.name = None
            val.columns.name = None
            return val

        return self.apply(self._covariance, preparefn=preparefn)

    def corr(self):
        def preparefn(val):
            val = val.pivot(columns='y', index='x', values='rho')
            val.index.name = None
            val.columns.name = None
            return val

        return self.apply(self._pearson, preparefn=preparefn)

    def _covariance(self, ctx):
        # this works
        # source http://ci.columbia.edu/ci/premba_test/c0331/s7/s7_5.html
        facets = {}
        means = {}
        unwinds = []
        count = len(ctx.caller.noapply()) - 1
        for x, y in product(ctx.columns, ctx.columns):
            xcol = '$' + x
            ycol = '$' + y
            # only calculate the same column's mean once
            if xcol not in means:
                means[xcol] = ctx.caller[x].noapply().mean().values[0, 0]
            if ycol not in means:
                means[ycol] = ctx.caller[y].noapply().mean().values[0, 0]
            sumands = {
                xcol: {
                    '$subtract': [xcol, means[xcol]]
                },
                ycol: {
                    '$subtract': [ycol, means[ycol]]
                }
            }
            multiply = {
                '$multiply': [sumands[xcol], sumands[ycol]]
            }
            agg = {
                '$group': {
                    '_id': None,
                    'value': {
                        '$sum': multiply
                    }
                }
            }
            project = {
                '$project': {
                    'cov': {
                        '$divide': ['$value', count],
                    },
                    'x': x,
                    'y': y,
                }
            }
            pipeline = [agg, project]
            outcol = '{}_{}'.format(x, y)
            facets[outcol] = pipeline
            unwinds.append({'$unwind': '$' + outcol})
        facet = {
            '$facet': facets,
        }
        expand = [{
            '$project': {
                'value': {
                    '$objectToArray': '$$CURRENT',
                }
            }
        }, {
            '$unwind': '$value'
        }, {
            '$replaceRoot': {
                'newRoot': '$value.v'
            }
        }]
        return [facet, *unwinds, *expand]

    def _pearson(self, ctx):
        # this works
        # source http://ilearnasigoalong.blogspot.ch/2017/10/calculating-correlation-inside-mongodb.html
        facets = {}
        unwinds = []
        for x, y in product(ctx.columns, ctx.columns):
            xcol = '$' + x
            ycol = '$' + y
            sumcolumns = {'$group': {'_id': None,
                                     'count': {'$sum': 1},
                                     'sumx': {'$sum': xcol},
                                     'sumy': {'$sum': ycol},
                                     'sumxsquared': {'$sum': {'$multiply': [xcol, xcol]}},
                                     'sumysquared': {'$sum': {'$multiply': [ycol, ycol]}},
                                     'sumxy': {'$sum': {'$multiply': [xcol, ycol]}}
                                     }}

            multiply_sumx_sumy = {'$multiply': ["$sumx", "$sumy"]}
            multiply_sumxy_count = {'$multiply': ["$sumxy", "$count"]}
            partone = {'$subtract': [multiply_sumxy_count, multiply_sumx_sumy]}

            multiply_sumxsquared_count = {'$multiply': ["$sumxsquared", "$count"]}
            sumx_squared = {'$multiply': ["$sumx", "$sumx"]}
            subparttwo = {'$subtract': [multiply_sumxsquared_count, sumx_squared]}

            multiply_sumysquared_count = {'$multiply': ["$sumysquared", "$count"]}
            sumy_squared = {'$multiply': ["$sumy", "$sumy"]}
            subpartthree = {'$subtract': [multiply_sumysquared_count, sumy_squared]}

            parttwo = {'$sqrt': {'$multiply': [subparttwo, subpartthree]}}

            rho = {'$project': {
                'rho': {
                    '$divide': [partone, parttwo]
                },
                'x': x,
                'y': y
            }}
            pipeline = [sumcolumns, rho]
            outcol = '{}_{}'.format(x, y)
            facets[outcol] = pipeline
            unwinds.append({'$unwind': '$' + outcol})
        facet = {
            '$facet': facets,
        }
        expand = [{
            '$project': {
                'value': {
                    '$objectToArray': '$$CURRENT',
                }
            }
        }, {
            '$unwind': '$value'
        }, {
            '$replaceRoot': {
                'newRoot': '$value.v'
            }
        }]
        return [facet, *unwinds, *expand]

    def _percentile(self, pctls=None):
        """
        calculate percentiles for all columns
        """
        pctls = pctls or [.25, .5, .75]
        if not isinstance(pctls, (list, tuple)):
            pctls = [pctls]

        def calc(col, p, outcol):
            # sort values
            sort = {
                '$sort': {
                    col: 1,
                }
            }
            # group/push to get an array of all values
            group = {
                '$group': {
                    '_id': col,
                    'values': {
                        '$push': "$" + col
                    },
                }
            }
            # find value at requested percentile
            perc = {
                '$arrayElemAt': [
                    '$values', {
                        '$floor': {
                            '$multiply': [{
                                '$size': '$values'
                            }, p]
                        }}
                ]
            }
            # map percentile value to output column
            project = {
                '$project': {
                    'var': col,
                    'percentile': 'p{}'.format(p),
                    'value': perc,
                }
            }
            return [sort, group, project]

        def inner(ctx):
            # for each column and requested percentile, build a pipeline
            # all pipelines will be combined into a $facet stage to
            # calculate every column/percentile tuple in parallel
            facets = {}
            unwind = []
            # for each column build a pipeline to calculate the percentiles
            for col in ctx.columns:
                for p in pctls:
                    # e.g. outcol for pctl .25 of column abc => abc_p25
                    outcol = '{}_p{}'.format(col, p).replace('0.', '')
                    facets[outcol] = calc(col, p, outcol)
                    unwind.append({'$unwind': '$' + outcol})
            # process per-column pipelines in parallel, resulting in one
            # document for each variable + percentile combination
            facet = {
                '$facet': facets
            }
            # expand single document into one document per variable + percentile combo
            # the resulting set of documents contains var/percentile/value
            expand = [{
                '$project': {
                    'value': {
                        '$objectToArray': '$$CURRENT',
                    }
                }
            }, {
                '$unwind': '$value'
            }, {
                '$replaceRoot': {
                    'newRoot': '$value.v'
                }
            }]
            pipeline = [facet, *unwind, *expand]
            return pipeline

        return inner
