Source code for omegaml.mdataframe

from __future__ import absolute_import

import numpy as np
import pandas as pd
from bson import Code
from numpy import isscalar
from pymongo.collection import Collection
from uuid import uuid4

from omegaml.store import qops
from omegaml.store.filtered import FilteredCollection
from omegaml.store.query import Filter, MongoQ
from omegaml.store.queryops import MongoQueryOps
from omegaml.util import make_tuple, make_list, restore_index, \
    cursor_to_dataframe, restore_index_columns_order, PickableCollection, extend_instance, json_normalize, ensure_index

INSPECT_CACHE = []


class MGrouper(object):
    """
    a Grouper for MDataFrames
    """
    STATS_MAP = {
        'std': 'stdDevSamp',
        'mean': 'avg',
    }

    def __init__(self, mdataframe, collection, columns, sort=True):
        self.mdataframe = mdataframe
        self.collection = collection
        self.columns = make_tuple(columns)
        self.should_sort = sort

    def __getattr__(self, attr):
        if attr in self.columns:
            return MSeriesGroupby(self, self.collection, attr)

        def statfunc():
            columns = self.columns or self._non_group_columns()
            return self.agg({col: attr for col in columns})

        return statfunc

    def __getitem__(self, item):
        return self.__getattr__(item)

    def agg(self, specs):
        """
        shortcut for .aggregate
        """
        return self.aggregate(specs)

    def aggregate(self, specs, **kwargs):
        """
        aggregate by given specs

        See the following link for a list of supported operations.
        https://docs.mongodb.com/manual/reference/operator/aggregation/group/

        :param specs: a dictionary of { column : function | list[functions] }
           pairs.
        """

        def add_stats(specs, column, stat):
            specs['%s_%s' % (column, stat)] = {
                '$%s' % MGrouper.STATS_MAP.get(stat, stat): '$%s' % column}

        # generate $group command
        _specs = {}
        for column, stats in specs.items():
            stats = make_tuple(stats)
            for stat in stats:
                add_stats(_specs, column, stat)
        groupby = qops.GROUP(columns=self.columns,
                             **_specs)
        # execute and return a dataframe
        pipeline = self._amend_pipeline([groupby])
        data = self.collection.aggregate(pipeline, allowDiskUse=True)

        def get_data():
            # we need this to build a pipeline for from_records
            # to process, otherwise the cursor will be exhausted already
            for group in data:
                _id = group.pop('_id')
                if isinstance(_id, dict):
                    group.update(_id)
                yield group

        df = pd.DataFrame.from_records(get_data())
        columns = make_list(self.columns)
        if columns:
            df = df.set_index(columns, drop=True)
        return df

    def _amend_pipeline(self, pipeline):
        """ amend pipeline with default ops on coll.aggregate() calls """
        if self.should_sort:
            sort = qops.SORT(**dict(qops.make_sortkey('_id')))
            pipeline.append(sort)
        return pipeline

    def _non_group_columns(self):
        """ get all columns in mdataframe that are not group columns """
        return [col for col in self.mdataframe.columns
                if col not in self.columns and col != '_id'
                and not col.startswith('_idx')
                and not col.startswith('_om#')]

    def _count(self):
        count_columns = self._non_group_columns()
        if len(count_columns) == 0:
            count_columns.append('_'.join(self.columns) + '_count')
        groupby = {
            "$group": {
                "_id": {k: "$%s" % k for k in self.columns},
            }
        }
        for k in count_columns:
            groupby['$group']['%s' % k] = {"$sum": 1}
        pipeline = self._amend_pipeline([groupby])
        if self.should_sort:
            sort = qops.SORT(**dict(qops.make_sortkey('_id')))
            pipeline.append(sort)
        return list(self.collection.aggregate(pipeline, allowDiskUse=True))

    def count(self):
        """ return counts by group columns """
        counts = self._count()
        # remove mongo object _id
        for group in counts:
            group.update(group.pop('_id'))
        # transform results to dataframe, then return as pandas would
        resultdf = pd.DataFrame(counts).set_index(make_list(self.columns),
                                                  drop=True)
        return resultdf

    def __iter__(self):
        """ for each group yields the group key and the filtered MDataFrame """
        # reduce count to only one column
        groups = getattr(self, self.columns[0])._count()
        for group in groups:
            keys = group.get('_id')
            data = self.mdataframe._clone(query=keys)
            yield keys, data
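
# Usage sketch (illustrative, not part of the module source): grouping and
# aggregation through MGrouper, as returned by MDataFrame.groupby(). The
# collection `coll` and the 'region'/'sales' columns are hypothetical.
#
#   mdf = MDataFrame(coll)
#   mdf.groupby('region').agg({'sales': ['mean', 'std']})
#   # => pandas DataFrame indexed by 'region' with columns sales_mean, sales_std
#   mdf.groupby('region').count()
#   # => counts per group, as pandas would return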


class MLocIndexer(object):
    """
    implements the LocIndexer for MDataFrames
    """

    def __init__(self, mdataframe, positional=False):
        self.mdataframe = mdataframe
        # if positional, any loc[spec] will be applied on the rowid only
        self.positional = positional
        # indicator will be set true if loc specs are from a range type (list, tuple, np.ndarray)
        self._from_range = False

    def __getitem__(self, specs):
        """
        access by index

        use as mdf.loc[specs] where specs is any of

        * a list or tuple of scalar index values, e.g. .loc[(1,2,3)]
        * a slice of values e.g. .loc[1:5]
        * a list of slices, e.g. .loc[1:5, 2:3]

        :return: the sliced part of the MDataFrame
        """
        filterq, projection = self._get_filter(specs)
        df = self.mdataframe
        if filterq:
            df = self.mdataframe.query(filterq)
            df.from_loc_indexer = True
            df.from_loc_range = self._from_range
        if projection:
            df = df[projection]
        if isinstance(self.mdataframe, MSeries):
            df = df._as_mseries(df.columns[0])
        if getattr(df, 'immediate_loc', False):
            df = df.value
        return df

    def __setitem__(self, specs, value):
        raise NotImplementedError()

    def _get_filter(self, specs):
        filterq = []
        projection = None
        if self.positional:
            idx_cols = ['_om#rowid']
        else:
            idx_cols = self.mdataframe._get_frame_index()
        flt_kwargs = {}
        enumerable_types = (list, tuple, np.ndarray)
        if isinstance(specs, np.ndarray):
            specs = specs.tolist()
        if (isinstance(specs, enumerable_types)
                and isscalar(specs[0]) and len(idx_cols) == 1
                and not any(isinstance(s, slice) for s in specs)):
            # single column index with list of scalar values
            if (self.positional and isinstance(specs, tuple) and len(specs) == 2
                    and all(isscalar(v) for v in specs)):
                # iloc[int, int] is a cell access
                flt_kwargs[idx_cols[0]] = specs[0]
                projection = self._get_projection(specs[1])
            else:
                flt_kwargs['{}__in'.format(idx_cols[0])] = specs
                self._from_range = True
        elif isinstance(specs, (int, str)):
            flt_kwargs[idx_cols[0]] = specs
        else:
            specs = make_tuple(specs)
            # list/tuple of slices or scalar values, or MultiIndex
            for i, spec in enumerate(specs):
                if i < len(idx_cols):
                    col = idx_cols[i]
                    if isinstance(spec, slice):
                        self._from_range = True
                        start, stop = spec.start, spec.stop
                        if start is not None:
                            flt_kwargs['{}__gte'.format(col)] = start
                        if stop is not None:
                            if isinstance(stop, int):
                                stop -= int(self.positional)
                            flt_kwargs['{}__lte'.format(col)] = stop
                    elif isinstance(spec, enumerable_types) and isscalar(spec[0]):
                        self._from_range = True
                        # single column index with list of scalar values
                        # -- convert to list for PyMongo serialization
                        if isinstance(spec, np.ndarray):
                            spec = spec.tolist()
                        flt_kwargs['{}__in'.format(col)] = spec
                    elif isscalar(col):
                        flt_kwargs[col] = spec
                else:
                    # we're out of index columns, let's look at columns
                    cur_proj = self._get_projection(spec)
                    # if we get a single column, that's the projection
                    # if we had a single column, now need to add more, create a list
                    # if we have a list already, extend that
                    if projection is None:
                        projection = cur_proj
                    elif isinstance(projection, list):
                        projection.extend(cur_proj)
                    else:
                        projection = [projection, cur_proj]
        if flt_kwargs:
            filterq.append(MongoQ(**flt_kwargs))
        finalq = None
        for q in filterq:
            if finalq:
                finalq |= q
            else:
                finalq = q
        return finalq, projection

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return [spec]
        if isinstance(spec, (tuple, list)):
            # ensure all requested columns actually exist
            assert all(col in columns for col in spec)
            return spec
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if all(isinstance(v, int) for v in (start, stop)):
                start, stop, step = spec.indices(len(columns))
            else:
                start = columns.index(start) if start is not None else 0
                stop = columns.index(stop) + 1 if stop is not None else len(columns)
            return columns[slice(start, stop)]
        raise IndexError
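
# Usage sketch (illustrative): label-based access through MLocIndexer,
# available as MDataFrame.loc. Assumes a frame stored with a scalar index;
# the values shown are hypothetical.
#
#   mdf.loc[1]          # single index value; resolves to a Series on .value
#   mdf.loc[1:5]        # index range, translated to __gte/__lte filters
#   mdf.loc[[1, 2, 3]]  # list of index values, translated to an __in filter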


class MPosIndexer(MLocIndexer):
    """
    implements the position-based indexer for MDataFrames
    """

    def __init__(self, mdataframe):
        super(MPosIndexer, self).__init__(mdataframe, positional=True)

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return columns[spec]
        if isinstance(spec, (tuple, list)):
            # select columns by position
            return [col for i, col in enumerate(columns) if i in spec]
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if start and not isinstance(start, int):
                start = 0
            if stop and not isinstance(stop, int):
                # sliced ranges are inclusive
                stop = len(columns)
            return columns[slice(start, stop)]
        raise IndexError
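
# Usage sketch (illustrative): position-based access through MPosIndexer,
# available as MDataFrame.iloc. Positions are resolved against the internal
# '_om#rowid' field rather than the stored index.
#
#   mdf.iloc[0]      # first row by insertion order
#   mdf.iloc[10:20]  # rows 10..19 (the stop is made exclusive for positional access)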


class MSeriesGroupby(MGrouper):
    """
    like a MGrouper but limited to one column
    """

    def count(self):
        """
        return series count

        :return: counts by group
        """
        # MGrouper will insert a _count column, see _count(). we remove
        # that column again and return a series named as the group column
        resultdf = super(MSeriesGroupby, self).count()
        count_column = [col for col in resultdf.columns
                        if col.endswith('_count')][0]
        new_column = count_column.replace('_count', '')
        resultdf = resultdf.rename(columns={count_column: new_column})
        return resultdf[new_column]


class MDataFrame(object):
    """
    A DataFrame for mongodb

    Performs out-of-core, lazy computation on a mongodb cluster.
    Behaves like a pandas DataFrame. Actual results are returned
    as pandas DataFrames.
    """

    STATFUNCS = ['mean', 'std', 'min', 'max', 'sum', 'var']

    def __init__(self, collection, columns=None, query=None,
                 limit=None, skip=None, sort_order=None,
                 force_columns=None, immediate_loc=False, auto_inspect=False,
                 normalize=False, raw=False,
                 parser=None,
                 preparefn=None, from_loc_range=False, metadata=None, **kwargs):
        self.collection = PickableCollection(collection)
        # columns in frame
        self.columns = make_tuple(columns) if columns else self._get_fields(raw=raw)
        self.columns = [str(col) for col in self.columns]
        # columns to sort by, defaults to not sorted
        self.sort_order = sort_order
        # top n documents to fetch
        self.head_limit = limit
        # top n documents to skip before returning
        self.skip_topn = skip
        # filter criteria
        self.filter_criteria = query or {}
        # force columns -- on output add columns not present
        self.force_columns = force_columns or []
        # was this created from the loc indexer?
        self.from_loc_indexer = kwargs.get('from_loc_indexer', False)
        # was the loc index used a range? Else a single value
        self.from_loc_range = from_loc_range
        # setup query for filter criteria, if provided
        if self.filter_criteria:
            # make sure we have a filtered collection with the criteria given
            if isinstance(self.filter_criteria, dict):
                self.query_inplace(**self.filter_criteria)
            elif isinstance(self.filter_criteria, Filter):
                self.query_inplace(self.filter_criteria)
            else:
                raise ValueError('Invalid query specification of type {}'.format(type(self.filter_criteria)))
        # if immediate_loc is True, .loc and .iloc always evaluate
        self.immediate_loc = immediate_loc
        # value cache: __array__ returns this value if set, and sets it otherwise
        self._evaluated = None
        # set true to automatically capture inspects on .value. retrieve using .inspect(cached=True)
        self.auto_inspect = auto_inspect
        self._inspect_cache = INSPECT_CACHE
        # apply mixins
        self._applyto = str(self.__class__)
        self._apply_mixins()
        # parser to parse documents to dataframe
        self._parser = json_normalize if normalize else parser
        # prepare function to be applied just before returning from .value
        self._preparefn = preparefn
        # keep technical fields like _id, _idx etc
        self._raw = raw
        # metadata stored by omegaml (equiv. of metadata.kind_meta)
        self.metadata = metadata or dict()

    def _apply_mixins(self, *args, **kwargs):
        """
        apply mixins in defaults.OMEGA_MDF_MIXINS
        """
        from omegaml import settings
        defaults = settings()
        for mixin, applyto in defaults.OMEGA_MDF_MIXINS:
            if any(v in self._applyto for v in applyto.split(',')):
                extend_instance(self, mixin, *args, **kwargs)

    def __getstate__(self):
        # pickle support. note that the hard work is done in PickableCollection
        data = dict(self.__dict__)
        data.update(_evaluated=None)
        data.update(_inspect_cache=None)
        data.update(auto_inspect=self.auto_inspect)
        data.update(_preparefn=self._preparefn)
        data.update(_parser=self._parser)
        data.update(_raw=self._raw)
        data.update(collection=self.collection)
        return data

    def __reduce__(self):
        state = self.__getstate__()
        args = self.collection,
        return _mdf_remake, args, state

    def __setstate__(self, state):
        # pickle support. note that the hard work is done in PickableCollection
        for k, v in state.items():
            setattr(self, k, v)

    def _getcopy_kwargs(self, without=None):
        """ return all parameters required on a copy of this MDataFrame """
        kwargs = dict(columns=self.columns,
                      sort_order=self.sort_order,
                      limit=self.head_limit,
                      skip=self.skip_topn,
                      from_loc_indexer=self.from_loc_indexer,
                      from_loc_range=self.from_loc_range,
                      immediate_loc=self.immediate_loc,
                      metadata=self.metadata,
                      query=self.filter_criteria,
                      auto_inspect=self.auto_inspect,
                      parser=self._parser,
                      preparefn=self._preparefn)
        [kwargs.pop(k) for k in make_tuple(without or [])]
        return kwargs

    def __array__(self, dtype=None):
        # FIXME inefficient. make MDataFrame a drop-in replacement for any numpy ndarray
        # this evaluates every single time
        if self._evaluated is None:
            self._evaluated = array = self.value.values
        else:
            array = self._evaluated
        return array

    def __getattr__(self, attr):
        if attr in MDataFrame.STATFUNCS:
            return self.statfunc(attr)
        if attr in self.columns:
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=attr)
            return MSeries(self.collection, **kwargs)
        raise AttributeError(attr)

    def __getitem__(self, cols_or_slice):
        """
        select and project by column, columns, slice, masked-style filter

        Masked-style filters work similar to pd.DataFrame/Series masks
        but do not actually return masks but an instance of Filter. A
        Filter is a delayed evaluation on the data frame.

            # select all rows where any column is == 5
            mdf = MDataFrame(coll)
            flt = mdf == 5
            mdf[flt]

        :param cols_or_slice: single column (str), multi-columns (list),
           slice to select columns, or a masked-style filter
        :return: filtered MDataFrame or MSeries
        """
        if isinstance(cols_or_slice, str):
            # column name => MSeries
            return self._as_mseries(cols_or_slice)
        elif isinstance(cols_or_slice, int):
            # column number => MSeries
            column = self.columns[cols_or_slice]
            return self._as_mseries(column)
        elif isinstance(cols_or_slice, (tuple, list)):
            # list of column names => MDataFrame subset on columns
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=cols_or_slice)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, Filter):
            kwargs = self._getcopy_kwargs()
            kwargs.update(query=cols_or_slice.query)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, np.ndarray):
            return self.iloc[cols_or_slice]
        raise ValueError('unknown accessor type %s' % type(cols_or_slice))

    def __setitem__(self, column, value):
        # True for any scalar type, numeric, bool, string
        if np.isscalar(value):
            result = self.collection.update_many(filter=self.filter_criteria,
                                                 update=qops.SET(column, value))
            self.columns.append(column)
        return self

    def _clone(self, collection=None, **kwargs):
        # convenience method to clone itself with updates
        collection = collection if collection is not None else self.collection
        return self.__class__(collection, **kwargs,
                              **self._getcopy_kwargs(without=list(kwargs.keys())))

    def statfunc(self, stat):
        aggr = MGrouper(self, self.collection, [], sort=False)
        return getattr(aggr, stat)

    def groupby(self, columns, sort=True):
        """
        Group by a given set of columns

        :param columns: the list of columns
        :param sort: if True sort by group key
        :return: MGrouper
        """
        return MGrouper(self, self.collection, columns, sort=sort)

    def _get_fields(self, raw=False):
        result = []
        doc = self.collection.find_one()
        if doc is not None:
            if raw:
                result = list(doc.keys())
            else:
                result = [str(col) for col in doc.keys()
                          if col != '_id'
                          and not col.startswith('_idx')
                          and not col.startswith('_om#')]
        return result

    def _get_frame_index(self):
        """ return the dataframe's index columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = restore_index_columns_order(doc.keys())
        return result

    def _get_frame_om_fields(self):
        """ return the dataframe's omega special fields columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = [k for k in list(doc.keys()) if k.startswith('_om#')]
        return result

    def _as_mseries(self, column):
        kwargs = self._getcopy_kwargs()
        kwargs.update(columns=make_tuple(column))
        return MSeries(self.collection, **kwargs)

    def inspect(self, explain=False, cached=False, cursor=None, raw=False):
        """
        inspect this dataframe's actual mongodb query

        :param explain: if True explains the access path
        """
        if not cached:
            if isinstance(self.collection, FilteredCollection):
                query = self.collection.query
            else:
                query = '*'
            if explain:
                cursor = cursor or self._get_cursor()
                explain = cursor.explain()
            data = {
                'projection': self.columns,
                'query': query,
                'explain': explain or 'specify explain=True'
            }
        else:
            data = self._inspect_cache
        if not (raw or explain):
            data = pd.DataFrame(json_normalize(data))
        return data
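
    # Usage sketch (illustrative, not part of the module): checking the
    # effective mongodb query; the column `x` is hypothetical.
    #
    #   mdf.query(x__gte=5).inspect(explain=True)  # projection, query and access path
    #   mdf.inspect(cached=True)                   # past queries, captured when auto_inspect=True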

    def count(self):
        """
        projected number of rows when resolving
        """
        nrows = len(self)
        counts = pd.Series({
            col: nrows
            for col in self.columns}, index=self.columns)
        return counts

    def __len__(self):
        """
        the projected number of rows when resolving
        """
        # we reduce to just 1 column to speed up the count
        short = self._clone()
        short = short[self.columns[0]] if self.columns else short
        return sum(1 for d in short._get_cursor())

    @property
    def shape(self):
        """
        return shape of dataframe
        """
        return len(self), len(self.columns)

    @property
    def ndim(self):
        return len(self.shape)

    @property
    def value(self):
        """
        resolve the query and return a Pandas DataFrame

        :return: the result of the query as a pandas DataFrame
        """
        cursor = self._get_cursor()
        df = self._get_dataframe_from_cursor(cursor)
        if self.auto_inspect:
            self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
        # this ensures the equiv. of pandas df.loc[n] is a Series
        if self.from_loc_indexer:
            if len(df) == 1 and not self.from_loc_range:
                idx = df.index
                df = df.T
                df = df[df.columns[0]]
                if df.ndim == 1 and len(df) == 1 and not isinstance(idx, pd.MultiIndex):
                    # single row single dimension, numeric index only
                    df = df.iloc[0]
            elif (df.ndim == 1 or df.shape[1] == 1) and not self.from_loc_range:
                df = df[df.columns[0]]
        if self._preparefn:
            df = self._preparefn(df)
        return df

    def reset(self):
        # TODO if head(), tail(), query(), .loc/iloc should return a new MDataFrame instance to avoid having a reset need
        self.head_limit = None
        self.skip_topn = None
        self.filter_criteria = {}
        self.force_columns = []
        self.sort_order = None
        self.from_loc_indexer = False
        return self

    def _get_dataframe_from_cursor(self, cursor):
        """
        from the given cursor return a DataFrame
        """
        df = cursor_to_dataframe(cursor, parser=self._parser)
        df = self._restore_dataframe_proper(df)
        return df

    @property
    def _index_meta(self):
        return self.metadata.get('idx_meta') or dict()

    def _restore_dataframe_proper(self, df):
        df = restore_index(df, self._index_meta)
        if '_id' in df.columns and not self._raw:
            df.drop('_id', axis=1, inplace=True)
        if self.force_columns:
            missing = set(self.force_columns) - set(self.columns)
            for col in missing:
                df[col] = np.NaN
        return df

    def _get_cursor(self):
        projection = make_tuple(self.columns)
        projection += make_tuple(self._get_frame_index())
        if not self.sort_order:
            # implicit sort
            projection += make_tuple(self._get_frame_om_fields())
        cursor = self.collection.find(projection=projection)
        if self.sort_order:
            cursor.sort(qops.make_sortkey(make_tuple(self.sort_order)))
        if self.head_limit:
            cursor.limit(self.head_limit)
        if self.skip_topn:
            cursor.skip(self.skip_topn)
        return cursor
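
    # Usage sketch (illustrative): nothing is fetched until .value resolves
    # the query; len() and .shape issue a count-style cursor scan of their own.
    #
    #   mdf = MDataFrame(coll)   # coll is a hypothetical pymongo collection
    #   mdf.shape                # (rows, columns)
    #   df = mdf.value           # executes the query, returns a pandas DataFrame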

    def sort(self, columns):
        """
        sort by specified columns

        :param columns: str of a single column or a list of columns. Sort
           order is specified as the + (ascending) or - (descending)
           prefix to the column name. Default sort order is ascending.
        :return: the MDataFrame
        """
        self._evaluated = None
        self.sort_order = make_tuple(columns)
        return self
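
    # Usage sketch (illustrative): sorting with +/- prefixes, assuming
    # hypothetical 'age' and 'x' columns.
    #
    #   mdf.sort('-age')           # descending
    #   mdf.sort(['+age', '-x'])   # ascending by age, then descending by x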

    def head(self, limit=10):
        """
        return up to limit number of rows

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        return self._clone(limit=limit)

    def tail(self, limit=10):
        """
        return up to limit number of rows from last inserted values

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        tail_n = len(self) - limit
        return self._clone(skip=tail_n)

    def skip(self, topn):
        """
        skip the topn number of rows

        :param topn: the number of rows to skip.
        :return: the MDataFrame
        """
        return self._clone(skip=topn)
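
    # Usage sketch (illustrative): head(), skip() and tail() return clones,
    # so they can be chained for simple pagination.
    #
    #   page2 = mdf.skip(10).head(10).value   # rows 10..19
    #   last5 = mdf.tail(5).value             # last 5 rows by insertion order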

    def merge(self, right, on=None, left_on=None, right_on=None,
              how='inner', target=None, suffixes=('_x', '_y'),
              sort=False, inspect=False, filter=None):
        """
        merge this dataframe with another dataframe. left, inner and right
        merges are supported. the output is saved as a new collection named
        target (a generated name is used if not specified).

        :param right: the other MDataFrame
        :param on: the list of key columns to merge by
        :param left_on: the list of the key columns to merge on this dataframe
        :param right_on: the list of the key columns to merge on the other
           dataframe
        :param how: the method to merge. supported are left, inner, right.
           Defaults to inner
        :param target: the name of the collection to store the merge results
           in. If not provided a temporary name will be created.
        :param suffixes: the suffixes to apply to identical left and right
           columns
        :param sort: if True the merge results will be sorted. If False the
           MongoDB natural order is implied.
        :returns: the MDataFrame to the target MDataFrame
        """
        # validate input
        supported_how = ["left", 'inner', 'right']
        assert how in supported_how, "only %s merges are currently supported" % supported_how
        for key in [on, left_on, right_on]:
            if key:
                assert isinstance(
                    key, str), "only single column merge keys are supported (%s)" % key
        if isinstance(right, (Collection, PickableCollection, FilteredCollection)):
            right = MDataFrame(right)
        assert isinstance(
            right, MDataFrame), "both must be MDataFrames, got right=%s" % type(right)
        if how == 'right':
            # A right B == B left A
            return right.merge(self, on=on, left_on=right_on, right_on=left_on,
                               how='left', target=target, suffixes=suffixes)
        # generate lookup parameters
        on = on or '_id'
        right_name = self._get_collection_name_of(right, right)
        target_name = self._get_collection_name_of(
            target, '_temp.merge.%s' % uuid4().hex)
        target_field = (
            "%s_%s" % (right_name.replace('.', '_'), right_on or on))
        """
        TODO enable filter criteria on right dataframe. requires changing
        LOOKUP syntax from equality to arbitrary match

        if right.filter_criteria:
            right_filter = [qops.MATCH(self._get_filter_criteria(**right.filter_criteria))]
        else:
            right_filter = None
        """
        right_filter = None
        lookup = qops.LOOKUP(right_name,
                             key=on,
                             left_key=left_on,
                             right_key=right_on,
                             target=target_field)
        # unwind merged documents from arrays to top-level document fields
        unwind = qops.UNWIND(target_field, preserve=how != 'inner')
        # get all fields from left, right
        project = {}
        for left_col in self.columns:
            source_left_col = left_col
            if left_col == '_id':
                project[left_col] = 1
                continue
            if left_col.startswith('_idx'):
                continue
            if left_col.startswith('_om#'):
                continue
            if left_col != (on or left_on) and left_col in right.columns:
                left_col = '%s%s' % (left_col, suffixes[0])
            project[left_col] = "$%s" % source_left_col
        for right_col in right.columns:
            if right_col == '_id':
                continue
            if right_col.startswith('_idx'):
                continue
            if right_col.startswith('_om#'):
                continue
            if right_col == (on or right_on) and right_col == (on or left_on):
                # if the merge field is the same in both frames, we already
                # have it from left
                continue
            if right_col in self.columns:
                left_col = '%s%s' % (right_col, suffixes[1])
            else:
                left_col = '%s' % right_col
            project[left_col] = '$%s.%s' % (target_field, right_col)
        expected_columns = list(project.keys())
        if '_id' not in project:
            project['_id'] = 0  # never copy objectids to avoid duplicate keys, unless requested
        project = {"$project": project}
        # store merged documents and return an MDataFrame to it
        out = qops.OUT(target_name)
        pipeline = [lookup, unwind, project]
        if filter:
            query = qops.MATCH(self._get_filter_criteria(**filter))
            pipeline.append(query)
        if sort:
            sort_cols = make_list(on or [left_on, right_on])
            sort_key = qops.make_sortkey(sort_cols)
            sort = qops.SORT(**dict(sort_key))
            pipeline.append(sort)
        pipeline.append(out)
        if inspect:
            result = pipeline
        else:
            result = self.collection.aggregate(pipeline, allowDiskUse=True)
            result = MDataFrame(self.collection.database[target_name],
                                force_columns=expected_columns)
        return result
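
    # Usage sketch (illustrative): merging two MDataFrames on a shared key
    # column. Collection and column names are hypothetical; the result is
    # backed by a new collection (temporary unless target= is given).
    #
    #   left = MDataFrame(db.customers)
    #   right = MDataFrame(db.orders)
    #   merged = left.merge(right, on='customer_id', how='left', target='customer_orders')
    #   merged.value   # pandas DataFrame; duplicate columns get the _x/_y suffixes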

    def append(self, other):
        if isinstance(other, Collection):
            other = MDataFrame(other)
        assert isinstance(
            other, MDataFrame), "both must be MDataFrames, got other={}".format(type(other))
        outname = self.collection.name
        mrout = {
            'merge': outname,
            'nonAtomic': True,
        }
        mapfn = Code("""
            function() {
                this._id = ObjectId();
                if(this['_om#rowid']) {
                    this['_om#rowid'] += %s;
                }
                emit(this._id, this);
            }
        """ % len(self))
        reducefn = Code("""
            function(key, value) {
                return value;
            }
        """)
        finfn = Code("""
            function(key, value) {
                return value;
            }
        """)
        other.collection.map_reduce(mapfn, reducefn, mrout, finalize=finfn, jsMode=True)
        unwind = {
            "$replaceRoot": {
                "newRoot": {
                    "$ifNull": ["$value", "$$CURRENT"],
                }
            }
        }
        output = qops.OUT(outname)
        pipeline = [unwind, output]
        self.collection.aggregate(pipeline, allowDiskUse=True)
        return self

    def _get_collection_name_of(self, some, default=None):
        """
        determine the collection name of the given parameter

        returns the collection name if some is a MDataFrame, a Collection
        or a string_type. Otherwise returns default
        """
        if isinstance(some, MDataFrame):
            name = some.collection.name
        elif isinstance(some, Collection):
            name = some.name
        else:
            name = default
        return name

    def _get_filter_criteria(self, *args, **kwargs):
        """
        return mongo query from filter specs

        this uses a Filter to produce the query from the kwargs.

        :param args: a Q object or logical combination of Q objects
           (optional)
        :param kwargs: all AND filter criteria
        """
        if len(args) > 0:
            q = args[0]
            if isinstance(q, MongoQ):
                filter_criteria = Filter(self.collection, q).query
            elif isinstance(q, Filter):
                filter_criteria = Filter(self.collection, q.q).query
        else:
            filter_criteria = Filter(self.collection, **kwargs).query
        return filter_criteria

    def query_inplace(self, *args, **kwargs):
        """
        filters this MDataFrame and returns it.

        Any subsequent operation on the dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        :param args: a Q object or logical combination of Q objects
           (optional)
        :param kwargs: all AND filter criteria
        :return: self
        """
        self._evaluated = None
        self.filter_criteria = self._get_filter_criteria(*args, **kwargs)
        self.collection = FilteredCollection(
            self.collection, query=self.filter_criteria)
        return self

    def query(self, *args, **kwargs):
        """
        return a new MDataFrame with a filter criteria

        Any subsequent operation on the new dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        Note: Unlike pandas DataFrames, a filtered MDataFrame operates
        on the same collection as the original DataFrame

        :param args: a Q object or logical combination of Q objects
           (optional)
        :param kwargs: all AND filter criteria
        :return: a new MDataFrame with the filter applied
        """
        effective_filter = dict(self.filter_criteria)
        filter_criteria = self._get_filter_criteria(*args, **kwargs)
        if '$and' in effective_filter:
            effective_filter['$and'].extend(filter_criteria.get('$and'))
        else:
            effective_filter.update(filter_criteria)
        coll = FilteredCollection(self.collection, query=effective_filter)
        return self._clone(collection=coll, query=effective_filter)
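
    # Usage sketch (illustrative): filtering with keyword criteria or Q-style
    # MongoQ objects; operators like __gte/__in follow the filter syntax
    # used by MLocIndexer above. Columns 'x' and 'y' are hypothetical.
    #
    #   mdf.query(x__gte=5)                        # AND criteria from kwargs
    #   mdf.query(MongoQ(x__gte=5) | MongoQ(y=1))  # logical combination of Q objects
    #   mdf.query_inplace(x__gte=5)                # same, but filters this frame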

    def create_index(self, keys, **kwargs):
        """
        create an index the easy way
        """
        keys, kwargs = MongoQueryOps().make_index(keys)
        result = ensure_index(self.collection, keys, **kwargs)
        return result
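
    # Usage sketch (illustrative, assuming the same +/- column syntax as
    # .sort() and a hypothetical 'age' column):
    #
    #   mdf.create_index('-age')   # descending index on age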

    def list_indexes(self):
        """
        list all indices in database
        """
        return cursor_to_dataframe(self.collection.list_indexes())

    def iterchunks(self, chunksize=100):
        """
        return an iterator of DataFrame chunks

        Args:
            chunksize (int): number of rows in each chunk

        Returns:
            an iterator of DataFrames of max. length chunksize
        """
        chunksize = int(chunksize)
        i = 0
        while True:
            chunkdf = self.skip(i).head(chunksize).value
            if len(chunkdf) == 0:
                break
            i += chunksize
            yield chunkdf

    def itertuples(self, chunksize=1000):
        chunksize = int(chunksize)
        __doc__ = pd.DataFrame.itertuples.__doc__

        for chunkdf in self.iterchunks(chunksize=chunksize):
            for row in chunkdf.itertuples():
                yield row

    def iterrows(self, chunksize=1000):
        chunksize = int(chunksize)
        __doc__ = pd.DataFrame.iterrows.__doc__

        for chunkdf in self.iterchunks(chunksize=chunksize):
            if isinstance(chunkdf, pd.DataFrame):
                for row in chunkdf.iterrows():
                    yield row
            else:
                # Series does not have iterrows
                for i in range(0, len(chunkdf), chunksize):
                    yield chunkdf.iloc[i:i + chunksize]

    def iteritems(self):
        if not hasattr(pd.DataFrame, 'iteritems'):
            raise NotImplementedError('MDataFrame.iteritems has been removed since Pandas 2.0. Use .items instead.')
        __doc__ = pd.DataFrame.iteritems.__doc__
        return self.items()

    def items(self):
        __doc__ = pd.DataFrame.items.__doc__

        for col in self.columns:
            yield col, self[col].value

    @property
    def loc(self):
        """
        Access by index

        Use as mdf.loc[index_value]

        :return: MLocIndexer
        """
        self._evaluated = None
        indexer = MLocIndexer(self)
        return indexer

    @property
    def iloc(self):
        self._evaluated = None
        indexer = MPosIndexer(self)
        return indexer

    def rows(self, start=None, end=None, chunksize=1000):
        # equivalent to .iloc[start:end].iterchunks()
        start, end, chunksize = (int(v) if v is not None else None
                                 for v in (start, end, chunksize))
        return self.iloc[slice(start, end)].iterchunks(chunksize)

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MDataFrame(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                           kwargs=kwargs)
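
# Usage sketch (illustrative): chunked iteration keeps memory bounded by
# resolving skip/head windows one at a time; `process` is a hypothetical
# callback.
#
#   for chunkdf in mdf.iterchunks(chunksize=500):
#       process(chunkdf)   # each chunk is a pandas DataFrame of <= 500 rows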


class MSeries(MDataFrame):
    """
    Series implementation for MDataFrames

    behaves like a DataFrame but limited to one column.
    """

    def __init__(self, *args, **kwargs):
        super(MSeries, self).__init__(*args, **kwargs)
        # true if only unique values apply
        self.is_unique = False
        # apply mixins
        self._applyto = str(self.__class__)
        self._apply_mixins(*args, **kwargs)

    def __getitem__(self, cols_or_slice):
        if isinstance(cols_or_slice, Filter):
            return MSeries(self.collection, columns=self.columns,
                           query=cols_or_slice.query)
        return super(MSeries, self).__getitem__(cols_or_slice)

    @property
    def name(self):
        return self.columns[0]

    def unique(self):
        """
        return the unique set of values for the series

        :return: MSeries
        """
        self.is_unique = True
        return self

    def _get_cursor(self):
        if self.is_unique:
            # this way indexes get applied
            cursor = self.collection.distinct(make_tuple(self.columns)[0])
        else:
            cursor = super(MSeries, self)._get_cursor()
        return cursor

    @property
    def value(self):
        """
        return the value of the series

        this is a Series unless unique() was called. If unique() was
        called, only distinct values are returned as an array, matching
        the behavior of a Series

        :return: pandas.Series
        """
        cursor = self._get_cursor()
        column = make_tuple(self.columns)[0]
        if self.is_unique:
            # the .distinct() cursor returns a list of values
            # this is to make sure we return the same thing as pandas
            val = [v for v in cursor]
        else:
            val = self._get_dataframe_from_cursor(cursor)
            val = val[column]
            val.name = self.name
            if len(val) == 1 and self.from_loc_indexer:
                val = val.iloc[0]
        if self.auto_inspect:
            self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
        if self._preparefn:
            val = self._preparefn(val)
        return val

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MSeries(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                        kwargs=kwargs)

    @property
    def shape(self):
        return len(self),
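
# Usage sketch (illustrative): column access on an MDataFrame returns an
# MSeries; unique() switches resolution to a distinct() query. The column
# 'x' is hypothetical.
#
#   ser = mdf['x']       # MSeries on column 'x'
#   ser.value            # pandas Series
#   ser.unique().value   # list of distinct values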


def _mdf_remake(collection):
    # recreate a pickled MDF
    mdf = MDataFrame(collection)
    return mdf