Source code for omegaml.mixins.mdf.apply

import hashlib
import json
from itertools import product
from uuid import uuid4

import pandas as pd

from omegaml.documents import make_QueryCache
from omegaml.mdataframe import MDataFrame, MSeries
from omegaml.store import qops
from omegaml.store.filtered import FilteredCollection
from omegaml.util import make_tuple, extend_instance

class ApplyMixin(object):
    """
    Implements the apply() mixin supporting arbitrary functions to build aggregation pipelines

    Note that .apply() does not execute immediately. Instead it builds an aggregation pipeline
    that is executed on MDataFrame.value. Note that .apply() calls cannot be cascaded yet, i.e.
    a later .apply() will override a previous .apply().

    See ApplyContext for usage examples.
    """

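    # Usage sketch (illustrative; assumes 'mdf' is an MDataFrame with this mixin):
    #
    #   mdf.apply(lambda v: v * 5)          # builds, but does not run, a pipeline
    #   mdf.apply(lambda v: v * 5).value    # executes the pipeline on MongoDB
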
    def __init__(self, *args, **kwargs):
        super(ApplyMixin, self).__init__(*args, **kwargs)
        self._init_mixin(*args, **kwargs)

    def _init_mixin(self, *args, **kwargs):
        self.apply_fn = kwargs.get('apply_fn', None)
        # set to True if the pipeline is a facet operation
        self.is_from_facet = kwargs.get('is_from_facet', False)
        # index columns
        self.index_columns = kwargs.get('index_columns', [])
        # db alias
        self._db_alias = kwargs.get('db_alias', self._ensure_db_connection())
        # cache used on persist()
        self.cache = kwargs.get('cache', ApplyCache(self._db_alias))

    def _ensure_db_connection(self):
        from mongoengine.connection import _dbs, _connections

        seek_db = self.collection.database
        for alias, db in _dbs.items():
            if db is seek_db:
                self._db_alias = alias
                break
        else:
            # fake connection register
            alias = self._db_alias = 'omega-{}'.format(uuid4().hex)
            _connections[alias] = seek_db.client
            _dbs[alias] = seek_db
        return self._db_alias

    def nocache(self):
        self.cache = None
        return self

    def reset_cache(self, full=False):
        """
        Reset the apply cache

        :param full: if True, reset all caches for the collection; if False, only remove
            the cache entry for this specific .apply operation
        :return: self
        """
        QueryCache = make_QueryCache(db_alias=self._db_alias)
        if full:
            QueryCache.objects.filter(value__collection=self.collection.name).delete()
        else:
            pipeline = self._build_pipeline()
            key = self._make_cache_key(self.collection, pipeline)
            QueryCache.objects.filter(key=key).delete()
        return self

    def _make_cache_key(self, collection, pipeline):
        # remove the random $out stage so identical pipelines hash identically
        if '$out' in pipeline[-1] and pipeline[-1]['$out'].startswith('cache'):
            pipeline = list(pipeline)[:-1]
        spipeline = json.dumps(pipeline, sort_keys=True)
        data = '{}_{}'.format(collection.name, spipeline).encode('utf-8')
        # SEC: CWE-916
        # - status: wontfix
        # - reason: hashcode is used purely for name resolution, not a security function
        key = hashlib.md5(data).hexdigest()
        return key

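    # Cache-key sketch (illustrative, values are made up): the key is the md5
    # digest of the collection name plus the JSON-serialized pipeline, so the
    # same pipeline on the same collection always maps to the same cache entry.
    #
    #   data = '{}_{}'.format('sales', json.dumps([{'$project': {'x': 1}}], sort_keys=True))
    #   key = hashlib.md5(data.encode('utf-8')).hexdigest()
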
    def _getcopy_kwargs(self, **kwargs):
        kwargs = super(ApplyMixin, self)._getcopy_kwargs(**kwargs)
        kwargs.update(is_from_facet=self.is_from_facet,
                      index_columns=self.index_columns,
                      cache=self.cache,
                      apply_fn=self.apply_fn)
        return kwargs

    def noapply(self):
        self.apply_fn = None
        return self

    def apply(self, fn, inplace=False, preparefn=None):
        if inplace:
            obj = self
        else:
            kwargs = self._getcopy_kwargs()
            kwargs.update(preparefn=preparefn)
            if isinstance(self, MSeries):
                obj = MSeries(self.collection, **kwargs)
            else:
                obj = MDataFrame(self.collection, **kwargs)
        obj.apply_fn = fn
        return obj

    def persist(self):
        """
        Execute the pipeline and store the result in the cache

        Any pipeline of the same operations, in the same order, on
        the same collection will return the same result.
        """
        # generate a cache key
        pipeline = self._build_pipeline()
        key = self._make_cache_key(self.collection, pipeline)
        outname = 'cache_{}'.format(uuid4().hex)
        value = {
            'collection': self.collection.name,
            'result': outname,
        }
        # do usual processing, store result
        # -- note we pass pipeline to avoid processing iterators twice
        pipeline.append({
            '$out': outname,
        })
        cursor = self._get_cursor(pipeline=pipeline, use_cache=False)
        # consume cursor to store output (via $out)
        for v in cursor:
            pass
        # set cache
        self.cache.set(key, value)
        return key

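    # Usage sketch (illustrative; assumes 'mdf' has an apply function set):
    #
    #   key = mdf.apply(lambda v: v * 5).persist()   # run once, stored via $out
    #   mdf.apply(lambda v: v * 5).value             # now served from the cache
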
    def set_index(self, columns):
        self.index_columns = make_tuple(columns)
        return self

    def inspect(self, explain=False, *args, **kwargs):
        if self.apply_fn:
            details = {
                'pipeline': self._build_pipeline()
            }
            if explain:
                details.update(self.__dict__)
            return details
        return super(ApplyMixin, self).inspect(*args, explain=explain, **kwargs)

    def _execute(self):
        ctx = ApplyContext(self, columns=self.columns)
        try:
            result = self.apply_fn(ctx)
        except Exception as e:
            # report the stages built so far along with the error
            msg = '\n'.join([repr(stage) for stage in ctx.stages] + [repr(e)])
            raise RuntimeError(msg)
        if result is None or isinstance(result, ApplyContext):
            result = result or ctx
            self.index_columns = self.index_columns or result.index_columns
            return result
        elif isinstance(result, list):
            return result
        elif isinstance(result, dict):
            # expect a mapping of col=ApplyContext, each with its own list of stages
            # -- build a combined context by adding each expression
            #    this ensures any multi-stage projections are carried forward
            facets = {}
            for col, expr in result.items():
                if isinstance(expr, ApplyContext):
                    facets[col] = list(expr)
                    project = {
                        '$project': {
                            col: '$' + expr.columns[0]
                        },
                    }
                    facets[col].append(project)
                else:
                    facets[col] = expr
            facet = {
                '$facet': facets
            }
            self.is_from_facet = True
            return [facet]
        raise ValueError('Cannot build pipeline from apply result of type {}'.format(type(result)))

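    # Result-handling sketch (illustrative): the three supported apply results
    #
    #   mdf.apply(lambda v: v * 5)                  # returns the ApplyContext
    #   mdf.apply(lambda v: [{'$match': {}}])       # returns raw pipeline stages
    #   mdf.apply(lambda v: dict(a=v['x'] * 5,
    #                            b=v['y'].dt.week)) # dict => combined via $facet
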
    def _build_pipeline(self):
        pipeline = []
        stages = self._execute()
        pipeline.extend(stages)
        self._amend_pipeline(pipeline)
        return pipeline

    def _amend_pipeline(self, pipeline):
        """ amend pipeline with default ops on coll.aggregate() calls """
        if self.sort_order:
            sort = qops.SORT(**dict(qops.make_sortkey(self.sort_order)))
            pipeline.append(sort)
        return pipeline

    def _get_cached_cursor(self, pipeline=None, use_cache=True):
        pipeline = pipeline or self._build_pipeline()
        if use_cache and self.cache:
            key = self._make_cache_key(self.collection, pipeline)
            entry = self.cache.get(key)
            if entry is not None:
                # read result
                outname = entry.value['result']
                return self.collection.database[outname].find()

    def _get_cursor(self, pipeline=None, use_cache=True):
        # for apply functions, call the apply function, expecting a pipeline in return
        if self.apply_fn:
            pipeline = pipeline or self._build_pipeline()
            cursor = self._get_cached_cursor(pipeline=pipeline, use_cache=use_cache)
            if cursor is None:
                filter_criteria = self._get_filter_criteria()
                cursor = FilteredCollection(self.collection).aggregate(pipeline, filter=filter_criteria,
                                                                       allowDiskUse=True)
        else:
            cursor = super(ApplyMixin, self)._get_cursor()
        return cursor

    def _get_dataframe_from_cursor(self, cursor):
        df = super(ApplyMixin, self)._get_dataframe_from_cursor(cursor)
        if self.is_from_facet:
            # if this was from a facet pipeline (i.e. a multi-column mapping), combine;
            # $facet returns one document per facet
            frames = []
            for col in df.columns:
                coldf = pd.DataFrame(df[col].iloc[0]).set_index('_id')
                frames.append(coldf)
            df = pd.concat(frames, axis=1).reset_index()
            df = self._restore_dataframe_proper(df)
        # TODO write a unit test for this condition
        if self.index_columns and all(col in df.columns for col in self.index_columns):
            df.set_index(list(self.index_columns), inplace=True)
        return df


class ApplyContext(object):
    """
    Enable apply functions

    .apply(fn) will call fn(ctx) where ctx is an ApplyContext.
    The context supports methods to apply functions in a Pandas-style apply manner. ApplyContext is extensible
    by adding an extension class to defaults.OMEGA_MDF_APPLY_MIXINS.

    Note that unlike a Pandas DataFrame, ApplyContext does not itself contain any data.
    Rather it is part of an expression tree, i.e. the aggregation pipeline. Thus any
    expressions applied are translated into operations on the expression tree. The expression
    tree is evaluated on MDataFrame.value, at which point neither the ApplyContext nor the
    function that created it are active anymore.

    Examples::

        mdf.apply(lambda v: v * 5) => multiply every column in dataframe
        mdf.apply(lambda v: v['foo'].dt.week) => get week of date for column foo
        mdf.apply(lambda v: dict(a=v['foo'].dt.week,
                                 b=v['bar'] * 5)) => run multiple pipelines and get results

        The callable passed to apply can be any function. It can either return None,
        the context passed in or a list of pipeline stages.

        # apply any of the below functions
        mdf.apply(customfn)

        # same as lambda v: v.dt.week
        def customfn(ctx):
            return ctx.dt.week

        # simple pipeline
        def customfn(ctx):
            ctx.project(x={'$multiply': ['$x', 5]})
            ctx.project(y={'$divide': ['$x', 2]})

        # complex pipeline
        def customfn(ctx):
            return [
                {'$match': ...},
                {'$project': ...},
            ]
    """

    def __init__(self, caller, columns=None, index=None):
        self.caller = caller
        self.columns = columns
        self.index_columns = index or []
        self.computed = []
        self.stages = []
        self.expressions = []
        self._apply_mixins()

    def _apply_mixins(self):
        """
        apply mixins in defaults.OMEGA_MDF_APPLY_MIXINS
        """
        from omegaml import settings
        defaults = settings()
        for mixin, applyto in defaults.OMEGA_MDF_APPLY_MIXINS:
            if any(v in self.caller._applyto for v in applyto.split(',')):
                extend_instance(self, mixin)

    def __iter__(self):
        # return pipeline stages
        for stage in self.stages:
            if isinstance(stage, ApplyContext):
                for sub_stage in stage:
                    yield sub_stage
            else:
                yield stage

    def __getitem__(self, sel):
        """
        return a stage subset on a column
        """
        subctx = ApplyContext(self.caller, columns=make_tuple(sel), index=self.index_columns)
        self.add(subctx)
        return subctx

    def __setitem__(self, sel, val):
        """
        add a projection to a sub context

        ctx['col'] = value-expression
        """
        mapping = {
            col: v
            for (col, v) in zip(make_tuple(sel), make_tuple(val))}
        self.project(mapping)

    def __repr__(self):
        return 'ApplyContext(stages={}, expressions={})'.format(self.stages, self.expressions)

    def add(self, stage):
        """
        Add a processing stage to the pipeline

        see https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
        """
        self.stages.append(stage)
        return self

    def project_keeper_columns(self):
        # keep index, computed
        index = {
            col: '$' + col
            for col in self.index_columns}
        computed = {
            col: '$' + col
            for col in self.computed}
        keep = {}
        keep.update(index)
        keep.update(computed)
        project = self.project(keep, keep=True)
        return project

    def _getLastStageKind(self, kind):
        # see if there is already an open projection stage
        for stage in self.stages[::-1]:
            if kind in stage:
                return stage

    def _getProjection(self, append=False):
        stage = self._getLastStageKind('$project')
        if stage is None or append:
            stage = {
                '$project': {
                    '_id': 1,
                }
            }
            self.stages.append(stage)
        return stage

    def _getGroupBy(self, by=None, append=False):
        stage = self._getLastStageKind('$group')
        if stage and stage['$group']['_id'] != by and by != '$$last':
            # if a different groupby criteria, add a new one
            stage = None
        if stage is None and by == '$$last':
            by = None
        if stage is None or append:
            stage = {
                '$group': {
                    '_id': by,
                }
            }
            self.stages.append(stage)
        return stage

    def groupby(self, by, expr=None, append=None, **kwargs):
        """
        add a groupby accumulation using $group

        :param by: the groupby column(s); a list adds each column as a groupby key
        :param expr: a mapping of column => accumulator expression
        :param append: if True add a new $group stage
        :param kwargs: if expr is None, the column => accumulator mapping as kwargs
        :return: ApplyContext
        """
        by = make_tuple(by)
        self.index_columns = self.index_columns + list(by)
        # define groupby
        by = {col: '$' + col for col in by}
        stage = self._getGroupBy(by)
        groupby = stage['$group']
        # add accumulators
        expr = expr or {
            col: colExpr
            for col, colExpr in kwargs.items()}
        groupby.update(expr)
        # add a projection to extract groupby values
        extractId = {
            col: '$_id.' + col
            for col in by}
        # add a projection to keep accumulator columns
        keepCols = {
            col: 1
            for col in expr}
        keepCols.update(extractId)
        self.project(keepCols, append=True)
        # sort by groupby keys
        self.add({
            '$sort': {
                col: 1
                for col in by}
        })
        return self

    def project(self, expr=None, append=False, keep=False, **kwargs):
        """
        add a projection using $project

        :param expr: the column-operator mapping
        :param append: if True add a $project stage, otherwise add to existing
        :param kwargs: if expr is None, the column-operator mapping as kwargs
        :return: ApplyContext
        """
        # get last $project stage in pipeline
        stage = self._getProjection(append=append)
        expr = expr or kwargs
        self.expressions.append(expr)
        for k, v in expr.items():
            # only append to stage if no other column projection was there
            project = stage.get('$project')
            if k not in project:
                project.update({
                    k: v
                })
            elif not keep:
                # if a column is already projected, add a new projection stage
                stage = self._getProjection(append=True)
                project = stage.get('$project')
                project.update({
                    k: v
                })
        return self


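# Usage sketch for ApplyContext (illustrative; 'mdf' is an MDataFrame):
#
#   def customfn(ctx):
#       ctx['x'].project(x={'$multiply': ['$x', 5]})   # per-column sub context
#       ctx.groupby('category', total={'$sum': '$x'})  # $group + keeper $project
#
#   mdf.apply(customfn).value

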
class ApplyArithmetics(object):
    """
    Math operators for ApplyContext

    * :code:`__mul__` (*)
    * :code:`__add__` (+)
    * :code:`__sub__` (-)
    * :code:`__div__` (/)
    * :code:`__floordiv__` (//)
    * :code:`__mod__` (%)
    * :code:`__pow__` (pow)
    * :code:`__ceil__` (ceil)
    * :code:`__floor__` (floor)
    * :code:`__trunc__` (trunc)
    * :code:`__abs__` (abs)
    * :code:`sqrt` (math.sqrt)
    """

    def __arithmop__(op, wrap_op=None):
        """
        return a pipeline $project stage math operator as
        { col:
            { '$operator': [ values, ...] }
          ...
        }

        If wrap_op is specified, will wrap the $operator clause as
        { col:
            { '$wrap_op': { '$operator': [ values, ...] } }
          ...
        }
        """

        def inner(self, other):
            terms = []
            for term in make_tuple(other):
                if isinstance(term, str):
                    term = '$' + term
                terms.append(term)

            def wrap(expr):
                if wrap_op is not None:
                    expr = {
                        wrap_op: expr
                    }
                return expr

            mapping = {
                col: wrap({
                    op: ['$' + col] + terms,
                }) for col in self.columns}
            keepCols = {
                col: '$' + col
                for col in self.index_columns}
            mapping.update(keepCols)
            self.project(mapping)
            return self

        return inner

    #: multiply
    __mul__ = __arithmop__('$multiply')
    #: add
    __add__ = __arithmop__('$add')
    #: subtract
    __sub__ = __arithmop__('$subtract')
    #: divide
    __div__ = __arithmop__('$divide')
    __truediv__ = __arithmop__('$divide')
    #: integer divide
    __floordiv__ = __arithmop__('$divide', wrap_op='$floor')
    #: modulo (%)
    __mod__ = __arithmop__('$mod')
    #: pow
    __pow__ = __arithmop__('$pow')
    #: ceil
    __ceil__ = __arithmop__('$ceil')
    #: floor
    __floor__ = __arithmop__('$floor')
    #: truncate
    __trunc__ = __arithmop__('$trunc')
    #: absolute
    __abs__ = __arithmop__('$abs')
    #: square root
    sqrt = __arithmop__('$sqrt')


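# Arithmetic sketch (illustrative): operators translate into $project stages, e.g.
#
#   mdf.apply(lambda v: v * 5)       # {'$project': {col: {'$multiply': ['$col', 5]}, ...}}
#   mdf.apply(lambda v: v['x'] + 2)  # {'$project': {'x': {'$add': ['$x', 2]}}}

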
class ApplyDateTime(object):
    """
    Datetime operators for ApplyContext
    """

    @property
    def dt(self):
        return self

    def __dtop__(op):
        """
        return a datetime $project operator as
        { col:
            { '$operator': '$col' }
          ...
        }
        """

        def inner(self, columns=None):
            columns = make_tuple(columns or self.columns)
            mapping = {
                col: {
                    op: '$' + col,
                }
                for col in columns}
            self.project(mapping)
            return self

        inner.__doc__ = op.replace('$', '')
        return inner

    # mongodb mappings
    _year = __dtop__('$year')
    _month = __dtop__('$month')
    _week = __dtop__('$week')
    _dayOfWeek = __dtop__('$dayOfWeek')
    _dayOfMonth = __dtop__('$dayOfMonth')
    _dayOfYear = __dtop__('$dayOfYear')
    _hour = __dtop__('$hour')
    _minute = __dtop__('$minute')
    _second = __dtop__('$second')
    _millisecond = __dtop__('$millisecond')
    _isoDayOfWeek = __dtop__('$isoDayOfWeek')
    _isoWeek = __dtop__('$isoWeek')
    _isoWeekYear = __dtop__('$isoWeekYear')

    # .dt accessor convenience similar to pandas.dt
    # see https://pandas.pydata.org/pandas-docs/stable/api.html#datetimelike-properties
    year = property(_year)
    month = property(_month)
    day = property(_dayOfMonth)
    hour = property(_hour)
    minute = property(_minute)
    second = property(_second)
    millisecond = property(_millisecond)
    week = property(_isoWeek)
    dayofyear = property(_dayOfYear)
    dayofweek = property(_dayOfWeek)


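# Datetime sketch (illustrative): .dt properties project MongoDB date operators, e.g.
#
#   mdf.apply(lambda v: v['ts'].dt.week)   # {'$project': {'ts': {'$isoWeek': '$ts'}}}
#   mdf.apply(lambda v: v['ts'].dt.year)   # {'$project': {'ts': {'$year': '$ts'}}}

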
class ApplyString(object):
    """
    String operators
    """

    @property
    def str(self):
        return self

    def __strexpr__(op, unwind=False, base=None, max_terms=None):
        """
        return a pipeline $project string operator as
        { col:
            { '$operator': [ values, ...] }
          ...
        }
        """

        def inner(self, other, *args):
            # get all values passed and build terms from them
            values = list(make_tuple(other) + args)
            terms = []
            for term in values:
                if isinstance(term, str):
                    # if the term is a column name, add as a column name
                    if term in self.columns:
                        term = '$' + term
                    # allow to specify values explicitly by $$<value> => <value>
                    term = term.replace('$$', '')
                terms.append(term)
            # limit number of terms if requested
            if max_terms:
                terms = terms[:max_terms]
            # add projection of output columns to operator
            mapping = {
                col: {
                    op: terms if base is None else ['$' + col] + terms,
                } for col in self.columns}
            self.project(mapping)
            # unwind all columns if requested
            if unwind:
                exprs = [{'$unwind': {
                    'path': '$' + col
                }} for col in self.columns]
                self.stages.extend(exprs)
            return self

        inner.__doc__ = op.replace('$', '')
        return inner

    def __strunary__(op, unwind=False):
        """
        return a string $project operator as
        { col:
            { '$operator': '$col' }
          ...
        }
        """

        def inner(self, columns=None):
            columns = make_tuple(columns or self.columns)
            mapping = {
                col: {
                    op: '$' + col,
                }
                for col in columns}
            self.project(mapping)
            # unwind all columns if requested
            if unwind:
                exprs = [{'$unwind': {
                    'path': '$' + col
                }} for col in columns]
                self.stages.extend(exprs)
            return self

        inner.__doc__ = op.replace('$', '')

        return inner

    def isequal(self, other):
        self.strcasecmp(other)
        # strcasecmp returns 0 for equality, 1 and -1 for greater/less than
        # https://docs.mongodb.com/manual/reference/operator/aggregation/strcasecmp/
        mapping = {
            col: {
                '$cond': {
                    'if': {'$eq': ['$' + col, 0]},
                    'then': True,
                    'else': False,
                }
            }
            for col in self.columns}
        self.project(mapping)
        return self

    concat = __strexpr__('$concat', base=True)
    split = __strexpr__('$split', unwind=True, base=True, max_terms=2)
    usplit = __strexpr__('$split', unwind=False, base=True, max_terms=2)
    upper = __strunary__('$toUpper')
    lower = __strunary__('$toLower')
    substr = __strexpr__('$substr', base=True)
    strcasecmp = __strexpr__('$strcasecmp', base=True)
    len = __strunary__('$strLenBytes')
    index = __strexpr__('$indexOfBytes', base=True)


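# String sketch (illustrative): .str methods project MongoDB string operators, e.g.
#
#   mdf.apply(lambda v: v['name'].str.upper())   # {'$project': {'name': {'$toUpper': '$name'}}}
#   mdf.apply(lambda v: v['name'].str.len())     # {'$project': {'name': {'$strLenBytes': '$name'}}}

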
class ApplyAccumulators(object):
    def agg(self, map=None, **kwargs):
        stage = self._getGroupBy(by='$$last')
        specs = map or kwargs
        for col, colExpr in specs.items():
            if isinstance(colExpr, dict):
                # specify an arbitrary expression
                groupby = stage['$group']
                groupby[col] = colExpr
            elif isinstance(colExpr, str):
                # specify some known operator
                if hasattr(self, colExpr):
                    method = getattr(self, colExpr)
                    method(col)
                else:
                    raise SyntaxError('{} is not known'.format(colExpr))
            elif isinstance(colExpr, (tuple, list)):
                # specify a list of some known operators
                for statExpr in colExpr:
                    if hasattr(self, statExpr):
                        method = getattr(self, statExpr)
                        method(col)
                    else:
                        raise SyntaxError('{} is not known'.format(statExpr))
            elif callable(colExpr):
                # specify a callable that returns an expression
                groupby = stage['$group']
                groupby[col] = colExpr(col)
            else:
                raise SyntaxError('{} on column {} is unknown or invalid'.format(colExpr, col))
        return self

    def __statop__(op, opname=None):
        opname = opname or op.replace('$', '')

        def inner(self, columns=None):
            columns = make_tuple(columns or self.columns)
            stage = self._getGroupBy(by='$$last')
            groupby = stage['$group']
            groupby.update({
                '{}_{}'.format(col, opname): {
                    op: '$' + col
                } for col in columns
            })
            self.computed.extend(groupby.keys())
            self.project_keeper_columns()
            return self

        return inner

    sum = __statop__('$sum')
    avg = __statop__('$avg')
    mean = __statop__('$avg')
    min = __statop__('$min')
    max = __statop__('$max')
    std = __statop__('$stdDevSamp', 'std')


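# Accumulator sketch (illustrative): agg() and the stat methods add a $group stage, e.g.
#
#   mdf.apply(lambda v: v.groupby('cat').agg(x='sum'))      # x_sum per cat
#   mdf.apply(lambda v: v.agg(x=['min', 'max'], y='mean'))  # several stats at once

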
class ApplyCache(object):
    """
    A cache that works on collections and pipelines
    """

    def __init__(self, db_alias):
        self._db_alias = db_alias

    def set(self, key, value):
        # https://stackoverflow.com/a/22003440/890242
        QueryCache = make_QueryCache(self._db_alias)
        QueryCache.objects(key=key).update_one(set__key="{}".format(key),
                                               set__value=value, upsert=True)

    def get(self, key):
        QueryCache = make_QueryCache(self._db_alias)
        try:
            result = QueryCache.objects.get(key=key)
        except QueryCache.DoesNotExist:
            result = None
        return result


class ApplyStatistics(object):
    def quantile(self, q=.5):
        def preparefn(val):
            return val.pivot(columns='var', index='percentile', values='value')

        return self.apply(self._percentile(q), preparefn=preparefn)

    def cov(self):
        def preparefn(val):
            val = val.pivot(columns='y', index='x', values='cov')
            val.index.name = None
            val.columns.name = None
            return val

        return self.apply(self._covariance, preparefn=preparefn)

    def corr(self):
        def preparefn(val):
            val = val.pivot(columns='y', index='x', values='rho')
            val.index.name = None
            val.columns.name = None
            return val

        return self.apply(self._pearson, preparefn=preparefn)

    def _covariance(self, ctx):
        # source http://ci.columbia.edu/ci/premba_test/c0331/s7/s7_5.html
        facets = {}
        means = {}
        unwinds = []
        count = len(ctx.caller.noapply()) - 1
        for x, y in product(ctx.columns, ctx.columns):
            xcol = '$' + x
            ycol = '$' + y
            # only calculate the same column's mean once
            if xcol not in means:
                means[xcol] = ctx.caller[x].noapply().mean().values[0, 0]
            if ycol not in means:
                means[ycol] = ctx.caller[y].noapply().mean().values[0, 0]
            sumands = {
                xcol: {
                    '$subtract': [xcol, means[xcol]]
                },
                ycol: {
                    '$subtract': [ycol, means[ycol]]
                }
            }
            multiply = {
                '$multiply': [sumands[xcol], sumands[ycol]]
            }
            agg = {
                '$group': {
                    '_id': None,
                    'value': {
                        '$sum': multiply
                    }
                }
            }
            project = {
                '$project': {
                    'cov': {
                        '$divide': ['$value', count],
                    },
                    'x': x,
                    'y': y,
                }
            }
            pipeline = [agg, project]
            outcol = '{}_{}'.format(x, y)
            facets[outcol] = pipeline
            unwinds.append({'$unwind': '$' + outcol})
        facet = {
            '$facet': facets,
        }
        expand = [{
            '$project': {
                'value': {
                    '$objectToArray': '$$CURRENT',
                }
            }
        }, {
            '$unwind': '$value'
        }, {
            '$replaceRoot': {
                'newRoot': '$value.v'
            }
        }]
        return [facet, *unwinds, *expand]

    def _pearson(self, ctx):
        # source http://ilearnasigoalong.blogspot.ch/2017/10/calculating-correlation-inside-mongodb.html
        facets = {}
        unwinds = []
        for x, y in product(ctx.columns, ctx.columns):
            xcol = '$' + x
            ycol = '$' + y
            sumcolumns = {'$group': {'_id': None,
                                     'count': {'$sum': 1},
                                     'sumx': {'$sum': xcol},
                                     'sumy': {'$sum': ycol},
                                     'sumxsquared': {'$sum': {'$multiply': [xcol, xcol]}},
                                     'sumysquared': {'$sum': {'$multiply': [ycol, ycol]}},
                                     'sumxy': {'$sum': {'$multiply': [xcol, ycol]}}
                                     }}

            multiply_sumx_sumy = {'$multiply': ["$sumx", "$sumy"]}
            multiply_sumxy_count = {'$multiply': ["$sumxy", "$count"]}
            partone = {'$subtract': [multiply_sumxy_count, multiply_sumx_sumy]}

            multiply_sumxsquared_count = {'$multiply': ["$sumxsquared", "$count"]}
            sumx_squared = {'$multiply': ["$sumx", "$sumx"]}
            subparttwo = {'$subtract': [multiply_sumxsquared_count, sumx_squared]}

            multiply_sumysquared_count = {'$multiply': ["$sumysquared", "$count"]}
            sumy_squared = {'$multiply': ["$sumy", "$sumy"]}
            subpartthree = {'$subtract': [multiply_sumysquared_count, sumy_squared]}

            parttwo = {'$sqrt': {'$multiply': [subparttwo, subpartthree]}}

            rho = {'$project': {
                'rho': {
                    '$divide': [partone, parttwo]
                },
                'x': x,
                'y': y
            }}
            pipeline = [sumcolumns, rho]
            outcol = '{}_{}'.format(x, y)
            facets[outcol] = pipeline
            unwinds.append({'$unwind': '$' + outcol})
        facet = {
            '$facet': facets,
        }
        expand = [{
            '$project': {
                'value': {
                    '$objectToArray': '$$CURRENT',
                }
            }
        }, {
            '$unwind': '$value'
        }, {
            '$replaceRoot': {
                'newRoot': '$value.v'
            }
        }]
        return [facet, *unwinds, *expand]

    def _percentile(self, pctls=None):
        """
        calculate percentiles for all columns
        """
        pctls = pctls or [.25, .5, .75]
        if not isinstance(pctls, (list, tuple)):
            pctls = [pctls]

        def calc(col, p, outcol):
            # sort values
            sort = {
                '$sort': {
                    col: 1,
                }
            }
            # group/push to get an array of all values
            group = {
                '$group': {
                    '_id': col,
                    'values': {
                        '$push': "$" + col
                    },
                }
            }
            # find value at requested percentile
            perc = {
                '$arrayElemAt': [
                    '$values', {
                        '$floor': {
                            '$multiply': [{
                                '$size': '$values'
                            }, p]
                        }}
                ]
            }
            # map percentile value to output column
            project = {
                '$project': {
                    'var': col,
                    'percentile': 'p{}'.format(p),
                    'value': perc,
                }
            }
            return [sort, group, project]

        def inner(ctx):
            # for each column and requested percentile, build a pipeline.
            # all pipelines are combined into a $facet stage to
            # calculate every column/percentile tuple in parallel
            facets = {}
            unwind = []
            # for each column build a pipeline to calculate the percentiles
            for col in ctx.columns:
                for p in pctls:
                    # e.g. outcol for perc .25 of column abc => abc_p25
                    outcol = '{}_p{}'.format(col, p).replace('0.', '')
                    facets[outcol] = calc(col, p, outcol)
                    unwind.append({'$unwind': '$' + outcol})
            # process per-column pipelines in parallel, resulting in one
            # document for each variable + percentile combination
            facet = {
                '$facet': facets
            }
            # expand single document into one document per variable + percentile combo
            # the resulting set of documents contains var/percentile/value
            expand = [{
                '$project': {
                    'value': {
                        '$objectToArray': '$$CURRENT',
                    }
                }
            }, {
                '$unwind': '$value'
            }, {
                '$replaceRoot': {
                    'newRoot': '$value.v'
                }
            }]
            pipeline = [facet, *unwind, *expand]
            return pipeline

        return inner