Source code for omegaml.mixins.mdf.apply

import hashlib
import json
from itertools import product
from uuid import uuid4

import pandas as pd

from omegaml.documents import make_QueryCache
from omegaml.mdataframe import MDataFrame, MSeries
from omegaml.store import qops
from omegaml.store.filtered import FilteredCollection
from omegaml.util import make_tuple, extend_instance

class ApplyMixin(object):
    """
    Implements the .apply() mixin supporting arbitrary functions to build aggregation pipelines

    Note that .apply() does not execute immediately. Instead it builds an aggregation pipeline
    that is executed on MDataFrame.value. Note that .apply() calls cannot be cascaded yet, i.e.
    a later .apply() overrides a previous .apply().

    See ApplyContext for usage examples.
    """

    def __init__(self, *args, **kwargs):
        super(ApplyMixin, self).__init__(*args, **kwargs)
        self._init_mixin(*args, **kwargs)

    def _init_mixin(self, *args, **kwargs):
        self.apply_fn = kwargs.get('apply_fn', None)
        # set to True if the pipeline is a facet operation
        self.is_from_facet = kwargs.get('is_from_facet', False)
        # index columns
        self.index_columns = kwargs.get('index_columns', [])
        # db alias
        self._db_alias = kwargs.get('db_alias', self._ensure_db_connection())
        # cache used on persist()
        self.cache = kwargs.get('cache', ApplyCache(self._db_alias))

    def _ensure_db_connection(self):
        # must import _dbs, _connections locally to ensure mongoshim has been applied
        from mongoengine.connection import _dbs, _connections

        seek_db = self.collection.database
        for alias, db in _dbs.items():
            if db is seek_db:
                self._db_alias = alias
                break
        else:
            # fake connection register
            alias = self._db_alias = 'omega-{}'.format(uuid4().hex)
            _connections[alias] = seek_db.client
            _dbs[alias] = seek_db
        return self._db_alias

    def nocache(self):
        self.cache = None
        return self

    def reset_cache(self, full=False):
        """
        Reset the apply cache

        :param full: if True resets all caches for the collection, if False only removes
            the cache entry for the specific .apply operation
        :return: self
        """
        QueryCache = make_QueryCache(db_alias=self._db_alias)
        if full:
            QueryCache.objects.filter(value__collection=self.collection.name).delete()
        else:
            pipeline = self._build_pipeline()
            key = self._make_cache_key(self.collection, pipeline)
            QueryCache.objects.filter(key=key).delete()
        return self

    def _make_cache_key(self, collection, pipeline):
        # remove the randomly named $out stage so equal pipelines hash equally
        if '$out' in pipeline[-1] and pipeline[-1]['$out'].startswith('cache'):
            pipeline = list(pipeline)[:-1]
        spipeline = json.dumps(pipeline, sort_keys=True)
        data = '{}_{}'.format(collection.name, spipeline).encode('utf-8')
        # SEC: CWE-916
        # - status: wontfix
        # - reason: hashcode is used purely for name resolution, not as a security function
        key = hashlib.md5(data).hexdigest()
        return key

    def _getcopy_kwargs(self, **kwargs):
        kwargs = super(ApplyMixin, self)._getcopy_kwargs(**kwargs)
        kwargs.update(is_from_facet=self.is_from_facet,
                      index_columns=self.index_columns,
                      cache=self.cache,
                      apply_fn=self.apply_fn)
        return kwargs

    def noapply(self):
        self.apply_fn = None
        return self

    def apply(self, fn, inplace=False, preparefn=None):
        """
        set the apply function used to build the aggregation pipeline

        :param fn: a callable fn(ctx) receiving an ApplyContext
        :param inplace: if True set the function on self, else on a copy
        :param preparefn: optional callable to post-process the resulting DataFrame
        :return: self or the copy
        """
        if inplace:
            obj = self
        else:
            kwargs = self._getcopy_kwargs()
            kwargs.update(preparefn=preparefn)
            if isinstance(self, MSeries):
                obj = MSeries(self.collection, **kwargs)
            else:
                obj = MDataFrame(self.collection, **kwargs)
        obj.apply_fn = fn
        return obj

    def persist(self):
        """
        Execute the pipeline and store the result in the cache

        Any pipeline of the same operations, in the same order, on
        the same collection will return the same (cached) result.
        """
        # generate a cache key
        pipeline = self._build_pipeline()
        key = self._make_cache_key(self.collection, pipeline)
        outname = 'cache_{}'.format(uuid4().hex)
        value = {
            'collection': self.collection.name,
            'result': outname,
        }
        # do usual processing, store result
        # -- note we pass pipeline to avoid processing iterators twice
        pipeline.append({
            '$out': outname,
        })
        cursor = self._get_cursor(pipeline=pipeline, use_cache=False)
        # consume cursor so the $out stage writes the output collection
        for v in cursor:
            pass
        # set cache
        self.cache.set(key, value)
        return key

    def set_index(self, columns):
        self.index_columns = make_tuple(columns)
        return self

    def inspect(self, explain=False, *args, **kwargs):
        if self.apply_fn:
            details = {
                'pipeline': self._build_pipeline()
            }
            if explain:
                details.update(self.__dict__)
            return details
        return super(ApplyMixin, self).inspect(*args, explain=explain, **kwargs)

    def _execute(self):
        ctx = ApplyContext(self, columns=self.columns)
        try:
            result = self.apply_fn(ctx)
        except Exception as e:
            msg = '; '.join([repr(stage) for stage in ctx.stages] + [repr(e)])
            raise RuntimeError(msg)
        if result is None or isinstance(result, ApplyContext):
            result = result or ctx
            self.index_columns = self.index_columns or result.index_columns
            return result
        elif isinstance(result, list):
            return result
        elif isinstance(result, dict):
            # expect a mapping of col=ApplyContext, each with its own list of stages
            # -- build a combined $facet context by adding each expression;
            #    this ensures any multi-stage projections are carried forward
            facets = {}
            for col, expr in result.items():
                if isinstance(expr, ApplyContext):
                    facets[col] = list(expr)
                    project = {
                        '$project': {
                            col: '$' + expr.columns[0]
                        },
                    }
                    facets[col].append(project)
                else:
                    facets[col] = expr
            facet = {
                '$facet': facets
            }
            self.is_from_facet = True
            return [facet]
        raise ValueError('Cannot build pipeline from apply result of type {}'.format(type(result)))

    def _build_pipeline(self):
        pipeline = []
        stages = self._execute()
        pipeline.extend(stages)
        self._amend_pipeline(pipeline)
        return pipeline

    def _amend_pipeline(self, pipeline):
        """ amend pipeline with default ops on coll.aggregate() calls """
        if self.sort_order:
            sort = qops.SORT(**dict(qops.make_sortkey(self.sort_order)))
            pipeline.append(sort)
        return pipeline

    def _get_cached_cursor(self, pipeline=None, use_cache=True):
        pipeline = pipeline or self._build_pipeline()
        if use_cache and self.cache:
            key = self._make_cache_key(self.collection, pipeline)
            entry = self.cache.get(key)
            if entry is not None:
                # read the previously stored result
                outname = entry.value['result']
                return self.collection.database[outname].find()

    def _get_cursor(self, pipeline=None, use_cache=True):
        # for apply functions, call the apply function, expecting a pipeline in return
        if self.apply_fn:
            pipeline = pipeline or self._build_pipeline()
            cursor = self._get_cached_cursor(pipeline=pipeline, use_cache=use_cache)
            if cursor is None:
                filter_criteria = self._get_filter_criteria()
                cursor = FilteredCollection(self.collection).aggregate(pipeline, filter=filter_criteria,
                                                                       allowDiskUse=True)
        else:
            cursor = super(ApplyMixin, self)._get_cursor()
        return cursor

    def _get_dataframe_from_cursor(self, cursor):
        df = super(ApplyMixin, self)._get_dataframe_from_cursor(cursor)
        if self.is_from_facet:
            # if this was from a $facet pipeline (i.e. a multi-column mapping), combine;
            # $facet returns one document for each stage
            frames = []
            for col in df.columns:
                coldf = pd.DataFrame(df[col].iloc[0]).set_index('_id')
                frames.append(coldf)
            df = pd.concat(frames, axis=1).reset_index()
            df = self._restore_dataframe_proper(df)
        # TODO write a unit test for this condition
        if self.index_columns and all(col in df.columns for col in self.index_columns):
            df.set_index(list(self.index_columns), inplace=True)
        return df


class ApplyContext(object):
    """
    Enable apply functions

    .apply(fn) will call fn(ctx) where ctx is an ApplyContext.
    The context supports methods to apply functions in a Pandas-style apply manner. ApplyContext is extensible
    by adding an extension class to defaults.OMEGA_MDF_APPLY_MIXINS.

    Note that unlike a Pandas DataFrame, ApplyContext does not itself contain any data.
    Rather it is part of an expression tree, i.e. the aggregation pipeline. Thus any
    expressions applied are translated into operations on the expression tree. The expression
    tree is evaluated on MDataFrame.value, at which point neither the ApplyContext nor the
    function that created it are active.

    Examples::

        mdf.apply(lambda v: v * 5)  => multiply every column in dataframe
        mdf.apply(lambda v: v['foo'].dt.week)  => get week of date for column foo
        mdf.apply(lambda v: dict(a=v['foo'].dt.week,
                                 b=v['bar'] * 5))  => run multiple pipelines and get results

    The callable passed to apply can be any function. It can either return None,
    the context passed in or a list of pipeline stages::

        # apply any of the below functions
        mdf.apply(customfn)

        # same as lambda v: v.dt.week
        def customfn(ctx):
            return ctx.dt.week

        # simple pipeline
        def customfn(ctx):
            ctx.project(x={'$multiply': ['$x', 5]})
            ctx.project(y={'$divide': ['$x', 2]})

        # complex pipeline
        def customfn(ctx):
            return [
                {'$match': ...},
                {'$project': ...},
            ]
    """

    def __init__(self, caller, columns=None, index=None):
        self.caller = caller
        self.columns = columns
        self.index_columns = index or []
        self.computed = []
        self.stages = []
        self.expressions = []
        self._apply_mixins()

    def _apply_mixins(self):
        """
        apply mixins in defaults.OMEGA_MDF_APPLY_MIXINS
        """
        from omegaml import settings
        defaults = settings()
        for mixin, applyto in defaults.OMEGA_MDF_APPLY_MIXINS:
            if any(v in self.caller._applyto for v in applyto.split(',')):
                extend_instance(self, mixin)

    def __iter__(self):
        # return pipeline stages, expanding nested contexts
        for stage in self.stages:
            if isinstance(stage, ApplyContext):
                for sub_stage in stage:
                    yield sub_stage
            else:
                yield stage

    def __getitem__(self, sel):
        """
        return a stage subset on a column
        """
        subctx = ApplyContext(self.caller, columns=make_tuple(sel), index=self.index_columns)
        self.add(subctx)
        return subctx

    def __setitem__(self, sel, val):
        """
        add a projection to a sub context

        ctx['col'] = value-expression
        """
        mapping = {
            col: v
            for (col, v) in zip(make_tuple(sel), make_tuple(val))}
        self.project(mapping)

    def __repr__(self):
        return 'ApplyContext(stages={}, expressions={})'.format(self.stages, self.expressions)

    def add(self, stage):
        """
        Add a processing stage to the pipeline

        see https://docs.mongodb.com/manual/meta/aggregation-quick-reference/
        """
        self.stages.append(stage)
        return self

    def project_keeper_columns(self):
        # keep index, computed
        index = {
            col: '$' + col
            for col in self.index_columns}
        computed = {
            col: '$' + col
            for col in self.computed}
        keep = {}
        keep.update(index)
        keep.update(computed)
        project = self.project(keep, keep=True)
        return project

    def _getLastStageKind(self, kind):
        # see if there is already an open stage of the given kind
        for stage in self.stages[::-1]:
            if kind in stage:
                return stage

    def _getProjection(self, append=False):
        stage = self._getLastStageKind('$project')
        if stage is None or append:
            stage = {
                '$project': {
                    '_id': 1,
                }
            }
            self.stages.append(stage)
        return stage

    def _getGroupBy(self, by=None, append=False):
        stage = self._getLastStageKind('$group')
        if stage and stage['$group']['_id'] != by and by != '$$last':
            # if a different groupby criteria, add a new one
            stage = None
        if stage is None and by == '$$last':
            by = None
        if stage is None or append:
            stage = {
                '$group': {
                    '_id': by,
                }
            }
            self.stages.append(stage)
        return stage

    def groupby(self, by, expr=None, append=None, **kwargs):
        """
        add a groupby accumulation using $group

        :param by: the groupby column(s), a single name or a list of names
        :param expr: optional mapping of column to accumulator expression
        :param kwargs: if expr is None, the column=accumulator mapping as kwargs
        :return: ApplyContext
        """
        by = make_tuple(by)
        self.index_columns = self.index_columns + list(by)
        # define groupby
        by = {col: '$' + col for col in by}
        stage = self._getGroupBy(by)
        groupby = stage['$group']
        # add accumulators
        expr = expr or {
            col: colExpr
            for col, colExpr in kwargs.items()}
        groupby.update(expr)
        # add a projection to extract groupby values
        extractId = {
            col: '$_id.' + col
            for col in by}
        # add a projection to keep accumulator columns
        keepCols = {
            col: 1
            for col in expr}
        keepCols.update(extractId)
        self.project(keepCols, append=True)
        # sort by groupby keys
        self.add({
            '$sort': {
                col: 1
                for col in by}
        })
        return self

    def project(self, expr=None, append=False, keep=False, **kwargs):
        """
        add a projection using $project

        :param expr: the column-operator mapping
        :param append: if True add a new $project stage, otherwise add to the existing one
        :param keep: if True keep an existing projection of the same column
        :param kwargs: if expr is None, the column-operator mapping as kwargs
        :return: ApplyContext
        """
        # get last $project stage in pipeline
        stage = self._getProjection(append=append)
        expr = expr or kwargs
        self.expressions.append(expr)
        for k, v in expr.items():
            # only append to stage if no other projection for this column exists
            project = stage.get('$project')
            if k not in project:
                project.update({
                    k: v
                })
            elif not keep:
                # if the column is already projected, add a new projection stage
                stage = self._getProjection(append=True)
                project = stage.get('$project')
                project.update({
                    k: v
                })
        return self
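

# Usage sketch (illustrative): apply functions receive an ApplyContext and
# compose aggregation stages; dataset and column names are hypothetical.
def _example_apply_context():  # pragma: no cover
    import omegaml as om

    def groupfn(ctx):
        # adds a $group stage keyed on region, then projects and sorts the keys
        return ctx.groupby('region', total={'$sum': '$amount'})

    def projectfn(ctx):
        # adds a $project stage computing a derived column
        return ctx.project(doubled={'$multiply': ['$amount', 2]})

    mdf = om.datasets.getl('sales')
    return mdf.apply(groupfn).value, mdf.apply(projectfn).value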


class ApplyArithmetics(object):
    """
    Math operators for ApplyContext

    * :code:`__mul__` (*)
    * :code:`__add__` (+)
    * :code:`__sub__` (-)
    * :code:`__div__` (/)
    * :code:`__floordiv__` (//)
    * :code:`__mod__` (%)
    * :code:`__pow__` (pow)
    * :code:`__ceil__` (ceil)
    * :code:`__floor__` (floor)
    * :code:`__trunc__` (trunc)
    * :code:`__abs__` (abs)
    * :code:`sqrt` (math.sqrt)
    """

    def __arithmop__(op, wrap_op=None):
        """
        return a pipeline $project stage math operator as
        { col:
            { '$operator': [ values, ...] }
          ...
        }

        If wrap_op is specified, will wrap the $operator clause as
        { col:
            { '$wrap_op': { '$operator': [ values, ...] } }
          ...
        }
        """

        def inner(self, other):
            terms = []
            for term in make_tuple(other):
                if isinstance(term, str):
                    term = '$' + term
                terms.append(term)

            def wrap(expr):
                if wrap_op is not None:
                    expr = {
                        wrap_op: expr
                    }
                return expr

            mapping = {
                col: wrap({
                    op: ['$' + col] + terms,
                }) for col in self.columns}
            keepCols = {
                col: '$' + col
                for col in self.index_columns}
            mapping.update(keepCols)
            self.project(mapping)
            return self

        return inner

    #: multiply
    __mul__ = __arithmop__('$multiply')
    #: add
    __add__ = __arithmop__('$add')
    #: subtract
    __sub__ = __arithmop__('$subtract')
    #: divide
    __div__ = __arithmop__('$divide')
    __truediv__ = __arithmop__('$divide')
    #: integer divide
    __floordiv__ = __arithmop__('$divide', wrap_op='$floor')
    #: modulo (%)
    __mod__ = __arithmop__('$mod')
    #: pow
    __pow__ = __arithmop__('$pow')
    #: ceil
    __ceil__ = __arithmop__('$ceil')
    #: floor
    __floor__ = __arithmop__('$floor')
    #: truncate
    __trunc__ = __arithmop__('$trunc')
    #: absolute
    __abs__ = __arithmop__('$abs')
    #: square root
    sqrt = __arithmop__('$sqrt')
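

# Usage sketch (illustrative): arithmetic operators translate to $project math
# expressions; dataset and column names are hypothetical.
def _example_apply_arithmetics():  # pragma: no cover
    import omegaml as om
    mdf = om.datasets.getl('sales')
    # projects {'$multiply': ['$amount', 2]} for the selected column
    doubled = mdf.apply(lambda v: v['amount'] * 2).value
    # string terms reference columns, i.e. {'$add': ['$amount', '$tax']}
    total = mdf.apply(lambda v: v['amount'] + 'tax').value
    return doubled, total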


class ApplyDateTime(object):
    """
    Datetime operators for ApplyContext
    """

    @property
    def dt(self):
        return self

    def __dtop__(op):
        """
        return a datetime $project operator as
        { col:
            { '$operator': '$col' }
          ...
        }
        """

        def inner(self, columns=None):
            columns = make_tuple(columns or self.columns)
            mapping = {
                col: {
                    op: '$' + col,
                }
                for col in columns}
            self.project(mapping)
            return self

        inner.__doc__ = op.replace('$', '')
        return inner

    # mongodb mappings
    _year = __dtop__('$year')
    _month = __dtop__('$month')
    _week = __dtop__('$week')
    _dayOfWeek = __dtop__('$dayOfWeek')
    _dayOfMonth = __dtop__('$dayOfMonth')
    _dayOfYear = __dtop__('$dayOfYear')
    _hour = __dtop__('$hour')
    _minute = __dtop__('$minute')
    _second = __dtop__('$second')
    _millisecond = __dtop__('$millisecond')
    _isoDayOfWeek = __dtop__('$isoDayOfWeek')
    _isoWeek = __dtop__('$isoWeek')
    _isoWeekYear = __dtop__('$isoWeekYear')

    # .dt accessor convenience similar to pandas.dt
    # see https://pandas.pydata.org/pandas-docs/stable/api.html#datetimelike-properties
    year = property(_year)
    month = property(_month)
    day = property(_dayOfMonth)
    hour = property(_hour)
    minute = property(_minute)
    second = property(_second)
    millisecond = property(_millisecond)
    week = property(_isoWeek)
    dayofyear = property(_dayOfYear)
    dayofweek = property(_dayOfWeek)
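

# Usage sketch (illustrative): the .dt accessor mirrors pandas datetime
# properties; assumes a datetime column 'created' in a hypothetical dataset.
def _example_apply_datetime():  # pragma: no cover
    import omegaml as om
    mdf = om.datasets.getl('sales')
    # projects {'$isoWeek': '$created'}
    weeks = mdf.apply(lambda v: v['created'].dt.week).value
    # projects {'$year': '$created'}
    years = mdf.apply(lambda v: v['created'].dt.year).value
    return weeks, years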


class ApplyString(object):
    """
    String operators for ApplyContext
    """

    @property
    def str(self):
        return self

    def __strexpr__(op, unwind=False, base=None, max_terms=None):
        """
        return a pipeline $project string operator as
        { col:
            { '$operator': [ values, ...] }
          ...
        }
        """

        def inner(self, other, *args):
            # get all values passed and build terms from them
            values = list(make_tuple(other) + args)
            terms = []
            for term in values:
                if isinstance(term, str):
                    # if the term is a column name, reference it as a column
                    if term in self.columns:
                        term = '$' + term
                    # allow to specify values explicitly by $$<value> => <value>
                    term = term.replace('$$', '')
                terms.append(term)
            # limit number of terms if requested
            if max_terms:
                terms = terms[:max_terms]
            # add projection of output columns to operator
            mapping = {
                col: {
                    op: terms if base is None else ['$' + col] + terms,
                } for col in self.columns}
            self.project(mapping)
            # unwind all columns if requested
            if unwind:
                exprs = [{'$unwind': {
                    'path': '$' + col
                }} for col in self.columns]
                self.stages.extend(exprs)
            return self

        inner.__doc__ = op.replace('$', '')
        return inner

    def __strunary__(op, unwind=False):
        """
        return a unary string $project operator as
        { col:
            { '$operator': '$col' }
          ...
        }
        """

        def inner(self, columns=None):
            columns = make_tuple(columns or self.columns)
            mapping = {
                col: {
                    op: '$' + col,
                }
                for col in columns}
            self.project(mapping)
            if unwind:
                self.stages.extend({
                    '$unwind': {
                        'path': '$' + col
                    }
                } for col in columns)
            return self

        inner.__doc__ = op.replace('$', '')

        return inner

    def isequal(self, other):
        self.strcasecmp(other)
        # strcasecmp returns 0 for equality, 1 and -1 for greater/less than
        # https://docs.mongodb.com/manual/reference/operator/aggregation/strcasecmp/
        mapping = {
            col: {
                '$cond': {
                    'if': {'$eq': ['$' + col, 0]},
                    'then': True,
                    'else': False,
                }
            }
            for col in self.columns}
        self.project(mapping)

    concat = __strexpr__('$concat', base=True)
    split = __strexpr__('$split', unwind=True, base=True, max_terms=2)
    usplit = __strexpr__('$split', unwind=False, base=True, max_terms=2)
    upper = __strunary__('$toUpper')
    lower = __strunary__('$toLower')
    substr = __strexpr__('$substr', base=True)
    strcasecmp = __strexpr__('$strcasecmp', base=True)
    len = __strunary__('$strLenBytes')
    index = __strexpr__('$indexOfBytes', base=True)
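

# Usage sketch (illustrative): string operators via the .str accessor; assumes
# a string column 'name' in a hypothetical dataset. Literal (non-column) terms
# are escaped as '$$<value>'.
def _example_apply_string():  # pragma: no cover
    import omegaml as om
    mdf = om.datasets.getl('sales')
    # projects {'$toUpper': '$name'}
    upper = mdf.apply(lambda v: v['name'].str.upper()).value
    # projects {'$concat': ['$name', '-suffix']}; '$$' marks a literal value
    concat = mdf.apply(lambda v: v['name'].str.concat('$$-suffix')).value
    return upper, concat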


class ApplyAccumulators(object):
    def agg(self, map=None, **kwargs):
        stage = self._getGroupBy(by='$$last')
        specs = map or kwargs
        for col, colExpr in specs.items():
            if isinstance(colExpr, dict):
                # specify an arbitrary expression
                groupby = stage['$group']
                groupby[col] = colExpr
            elif isinstance(colExpr, str):
                # specify a known operator by name
                if hasattr(self, colExpr):
                    method = getattr(self, colExpr)
                    method(col)
                else:
                    raise SyntaxError('{} is not known'.format(colExpr))
            elif isinstance(colExpr, (tuple, list)):
                # specify a list of known operators
                for statExpr in colExpr:
                    if hasattr(self, statExpr):
                        method = getattr(self, statExpr)
                        method(col)
                    else:
                        raise SyntaxError('{} is not known'.format(statExpr))
            elif callable(colExpr):
                # specify a callable that returns an expression
                groupby = stage['$group']
                groupby[col] = colExpr(col)
            else:
                raise SyntaxError('{} on column {} is unknown or invalid'.format(colExpr, col))
        return self

    def __statop__(op, opname=None):
        opname = opname or op.replace('$', '')

        def inner(self, columns=None):
            columns = make_tuple(columns or self.columns)
            stage = self._getGroupBy(by='$$last')
            groupby = stage['$group']
            groupby.update({
                '{}_{}'.format(col, opname): {
                    op: '$' + col
                } for col in columns
            })
            self.computed.extend(groupby.keys())
            self.project_keeper_columns()
            return self

        return inner

    sum = __statop__('$sum')
    avg = __statop__('$avg')
    mean = __statop__('$avg')
    min = __statop__('$min')
    max = __statop__('$max')
    std = __statop__('$stdDevSamp', 'std')
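

# Usage sketch (illustrative): .agg() adds $group accumulators; result columns
# are named <col>_<op>. Dataset and column names are hypothetical.
def _example_apply_accumulators():  # pragma: no cover
    import omegaml as om
    mdf = om.datasets.getl('sales')
    # produces amount_sum, amount_avg and tax_max via $group accumulators
    stats = mdf.apply(lambda v: v.agg({'amount': ['sum', 'mean'],
                                       'tax': 'max'})).value
    return stats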


class ApplyCache(object):
    """
    A cache that works on collections and pipelines
    """

    def __init__(self, db_alias):
        self._db_alias = db_alias

    def set(self, key, value):
        # https://stackoverflow.com/a/22003440/890242
        QueryCache = make_QueryCache(self._db_alias)
        QueryCache.objects(key=key).update_one(set__key="{}".format(key),
                                               set__value=value, upsert=True)

    def get(self, key):
        QueryCache = make_QueryCache(self._db_alias)
        try:
            result = QueryCache.objects.get(key=key)
        except Exception:
            result = None
        return result


class ApplyStatistics(object):
    def quantile(self, q=.5):
        def preparefn(val):
            return val.pivot(columns='var', index='percentile', values='value')

        return self.apply(self._percentile(q), preparefn=preparefn)

    def cov(self):
        def preparefn(val):
            val = val.pivot(columns='y', index='x', values='cov')
            val.index.name = None
            val.columns.name = None
            return val

        return self.apply(self._covariance, preparefn=preparefn)

    def corr(self):
        def preparefn(val):
            val = val.pivot(columns='y', index='x', values='rho')
            val.index.name = None
            val.columns.name = None
            return val

        return self.apply(self._pearson, preparefn=preparefn)

    def _covariance(self, ctx):
        # source http://ci.columbia.edu/ci/premba_test/c0331/s7/s7_5.html
        facets = {}
        means = {}
        unwinds = []
        # sample covariance denominator (n - 1)
        count = len(ctx.caller.noapply()) - 1
        for x, y in product(ctx.columns, ctx.columns):
            xcol = '$' + x
            ycol = '$' + y
            # only calculate each column's mean once
            if xcol not in means:
                means[xcol] = ctx.caller[x].noapply().mean().values[0, 0]
            if ycol not in means:
                means[ycol] = ctx.caller[y].noapply().mean().values[0, 0]
            sumands = {
                xcol: {
                    '$subtract': [xcol, means[xcol]]
                },
                ycol: {
                    '$subtract': [ycol, means[ycol]]
                }
            }
            multiply = {
                '$multiply': [sumands[xcol], sumands[ycol]]
            }
            agg = {
                '$group': {
                    '_id': None,
                    'value': {
                        '$sum': multiply
                    }
                }
            }
            project = {
                '$project': {
                    'cov': {
                        '$divide': ['$value', count],
                    },
                    'x': x,
                    'y': y,
                }
            }
            pipeline = [agg, project]
            outcol = '{}_{}'.format(x, y)
            facets[outcol] = pipeline
            unwinds.append({'$unwind': '$' + outcol})
        facet = {
            '$facet': facets,
        }
        expand = [{
            '$project': {
                'value': {
                    '$objectToArray': '$$CURRENT',
                }
            }
        }, {
            '$unwind': '$value'
        }, {
            '$replaceRoot': {
                'newRoot': '$value.v'
            }
        }]
        return [facet, *unwinds, *expand]

    def _pearson(self, ctx):
        # source http://ilearnasigoalong.blogspot.ch/2017/10/calculating-correlation-inside-mongodb.html
        facets = {}
        unwinds = []
        for x, y in product(ctx.columns, ctx.columns):
            xcol = '$' + x
            ycol = '$' + y
            sumcolumns = {'$group': {'_id': None,
                                     'count': {'$sum': 1},
                                     'sumx': {'$sum': xcol},
                                     'sumy': {'$sum': ycol},
                                     'sumxsquared': {'$sum': {'$multiply': [xcol, xcol]}},
                                     'sumysquared': {'$sum': {'$multiply': [ycol, ycol]}},
                                     'sumxy': {'$sum': {'$multiply': [xcol, ycol]}}
                                     }}

            multiply_sumx_sumy = {'$multiply': ["$sumx", "$sumy"]}
            multiply_sumxy_count = {'$multiply': ["$sumxy", "$count"]}
            partone = {'$subtract': [multiply_sumxy_count, multiply_sumx_sumy]}

            multiply_sumxsquared_count = {'$multiply': ["$sumxsquared", "$count"]}
            sumx_squared = {'$multiply': ["$sumx", "$sumx"]}
            subparttwo = {'$subtract': [multiply_sumxsquared_count, sumx_squared]}

            multiply_sumysquared_count = {'$multiply': ["$sumysquared", "$count"]}
            sumy_squared = {'$multiply': ["$sumy", "$sumy"]}
            subpartthree = {'$subtract': [multiply_sumysquared_count, sumy_squared]}

            parttwo = {'$sqrt': {'$multiply': [subparttwo, subpartthree]}}

            rho = {'$project': {
                'rho': {
                    '$divide': [partone, parttwo]
                },
                'x': x,
                'y': y
            }}
            pipeline = [sumcolumns, rho]
            outcol = '{}_{}'.format(x, y)
            facets[outcol] = pipeline
            unwinds.append({'$unwind': '$' + outcol})
        facet = {
            '$facet': facets,
        }
        expand = [{
            '$project': {
                'value': {
                    '$objectToArray': '$$CURRENT',
                }
            }
        }, {
            '$unwind': '$value'
        }, {
            '$replaceRoot': {
                'newRoot': '$value.v'
            }
        }]
        return [facet, *unwinds, *expand]

    def _percentile(self, pctls=None):
        """
        calculate percentiles for all columns
        """
        pctls = pctls or [.25, .5, .75]
        if not isinstance(pctls, (list, tuple)):
            pctls = [pctls]

        def calc(col, p, outcol):
            # sort values
            sort = {
                '$sort': {
                    col: 1,
                }
            }
            # group/push to get an array of all values
            group = {
                '$group': {
                    '_id': col,
                    'values': {
                        '$push': "$" + col
                    },
                }
            }
            # find value at requested percentile
            perc = {
                '$arrayElemAt': [
                    '$values', {
                        '$floor': {
                            '$multiply': [{
                                '$size': '$values'
                            }, p]
                        }}
                ]
            }
            # map percentile value to output column
            project = {
                '$project': {
                    'var': col,
                    'percentile': 'p{}'.format(p),
                    'value': perc,
                }
            }
            return [sort, group, project]

        def inner(ctx):
            # for each column and requested percentile, build a pipeline;
            # all pipelines are combined into a $facet stage to
            # calculate every column/percentile tuple in parallel
            facets = {}
            unwind = []
            # for each column build a pipeline to calculate the percentiles
            for col in ctx.columns:
                for p in pctls:
                    # e.g. outcol for perc .25 of column abc => abc_p25
                    outcol = '{}_p{}'.format(col, p).replace('0.', '')
                    facets[outcol] = calc(col, p, outcol)
                    unwind.append({'$unwind': '$' + outcol})
            # process per-column pipelines in parallel, resulting in one
            # document for each variable + percentile combination
            facet = {
                '$facet': facets
            }
            # expand single document into one document per variable + percentile combo
            # the resulting set of documents contains var/percentile/value
            expand = [{
                '$project': {
                    'value': {
                        '$objectToArray': '$$CURRENT',
                    }
                }
            }, {
                '$unwind': '$value'
            }, {
                '$replaceRoot': {
                    'newRoot': '$value.v'
                }
            }]
            pipeline = [facet, *unwind, *expand]
            return pipeline

        return inner