Source code for omegaml.mdataframe

from __future__ import absolute_import

from uuid import uuid4

import numpy as np
import pandas as pd
from bson import Code
from numpy import isscalar
from pymongo.collection import Collection

from omegaml.store import qops
from omegaml.store.filtered import FilteredCollection
from omegaml.store.query import Filter, MongoQ
from omegaml.store.queryops import MongoQueryOps
from omegaml.util import make_tuple, make_list, restore_index, \
    cursor_to_dataframe, restore_index_columns_order, PickableCollection, extend_instance, json_normalize, ensure_index

INSPECT_CACHE = []

class MGrouper(object):
    """
    a Grouper for MDataFrames
    """
    STATS_MAP = {
        'std': 'stdDevSamp',
        'mean': 'avg',
    }

    def __init__(self, mdataframe, collection, columns, sort=True):
        self.mdataframe = mdataframe
        self.collection = collection
        self.columns = make_tuple(columns)
        self.should_sort = sort

    def __getattr__(self, attr):
        if attr in self.columns:
            return MSeriesGroupby(self, self.collection, attr)

        def statfunc():
            columns = self.columns or self._non_group_columns()
            return self.agg({col: attr for col in columns})

        return statfunc

    def __getitem__(self, item):
        return self.__getattr__(item)

    def agg(self, specs):
        """
        shortcut for .aggregate
        """
        return self.aggregate(specs)

    def aggregate(self, specs, **kwargs):
        """
        aggregate by given specs

        See the following link for a list of supported operations.
        https://docs.mongodb.com/manual/reference/operator/aggregation/group/

        :param specs: a dictionary of { column : function | list[functions] }
            pairs.
        """

        def add_stats(specs, column, stat):
            specs['%s_%s' % (column, stat)] = {
                '$%s' % MGrouper.STATS_MAP.get(stat, stat): '$%s' % column}

        # generate $group command
        _specs = {}
        for column, stats in specs.items():
            stats = make_tuple(stats)
            for stat in stats:
                add_stats(_specs, column, stat)
        groupby = qops.GROUP(columns=self.columns,
                             **_specs)
        # execute and return a dataframe
        pipeline = self._amend_pipeline([groupby])
        data = self.collection.aggregate(pipeline, allowDiskUse=True)

        def get_data():
            # we need this to build a pipeline for from_records
            # to process, otherwise the cursor will be exhausted already
            for group in data:
                _id = group.pop('_id')
                if isinstance(_id, dict):
                    group.update(_id)
                yield group

        df = pd.DataFrame.from_records(get_data())
        columns = make_list(self.columns)
        if columns:
            df = df.set_index(columns, drop=True)
        return df
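
    # Usage sketch (illustrative, not part of the module): given a pymongo
    # collection ``coll`` with columns 'region' and 'sales' (hypothetical
    # names), compute per-group statistics:
    #
    #   mdf = MDataFrame(coll)
    #   stats = mdf.groupby('region').agg({'sales': ['mean', 'std']})
    #   # => DataFrame indexed by region with columns sales_mean, sales_std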

    def _amend_pipeline(self, pipeline):
        """ amend pipeline with default ops on coll.aggregate() calls """
        if self.should_sort:
            sort = qops.SORT(**dict(qops.make_sortkey('_id')))
            pipeline.append(sort)
        return pipeline

    def _non_group_columns(self):
        """ get all columns in mdataframe that are not group columns """
        return [col for col in self.mdataframe.columns
                if col not in self.columns and col != '_id'
                and not col.startswith('_idx')
                and not col.startswith('_om#')]

    def _count(self):
        count_columns = self._non_group_columns()
        if len(count_columns) == 0:
            count_columns.append('_'.join(self.columns) + '_count')
        groupby = {
            "$group": {
                "_id": {k: "$%s" % k for k in self.columns},
            }
        }
        for k in count_columns:
            groupby['$group']['%s' % k] = {"$sum": 1}
        pipeline = self._amend_pipeline([groupby])
        if self.should_sort:
            sort = qops.SORT(**dict(qops.make_sortkey('_id')))
            pipeline.append(sort)
        return list(self.collection.aggregate(pipeline, allowDiskUse=True))

    def count(self):
        """ return counts by group columns """
        counts = self._count()
        # remove mongo object _id
        for group in counts:
            group.update(group.pop('_id'))
        # transform results to dataframe, then return as pandas would
        resultdf = pd.DataFrame(counts).set_index(make_list(self.columns),
                                                  drop=True)
        return resultdf
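
    # Usage sketch (illustrative): counts per group, returned as a DataFrame
    # indexed by the group columns (column name is hypothetical):
    #
    #   mdf.groupby('region').count()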

    def __iter__(self):
        """ for each group yield the group key and a filtered MDataFrame """
        # reduce count to only one column
        groups = getattr(self, self.columns[0])._count()
        for group in groups:
            keys = group.get('_id')
            data = self.mdataframe._clone(query=keys)
            yield keys, data


class MLocIndexer(object):
    """
    implements the LocIndexer for MDataFrames
    """

    def __init__(self, mdataframe, positional=False):
        self.mdataframe = mdataframe
        # if positional, any loc[spec] will be applied on the rowid only
        self.positional = positional
        # indicator will be set true if loc specs are from a range type (list, tuple, np.ndarray)
        self._from_range = False

    def __getitem__(self, specs):
        """
        access by index

        use as mdf.loc[specs] where specs is any of

        * a list or tuple of scalar index values, e.g. .loc[(1,2,3)]
        * a slice of values e.g. .loc[1:5]
        * a list of slices, e.g. .loc[1:5, 2:3]

        :return: the sliced part of the MDataFrame
        """
        filterq, projection = self._get_filter(specs)
        df = self.mdataframe
        if filterq:
            df = self.mdataframe.query(filterq)
            df.from_loc_indexer = True
            df.from_loc_range = self._from_range
        if projection:
            df = df[projection]
        if isinstance(self.mdataframe, MSeries):
            df = df._as_mseries(df.columns[0])
        if getattr(df, 'immediate_loc', False):
            df = df.value
        return df
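
    # Usage sketch (illustrative): slicing by index value or range; the
    # result stays a lazily evaluated MDataFrame until .value is accessed:
    #
    #   mdf.loc[1:5].value        # rows with index values 1..5 (inclusive)
    #   mdf.loc[[1, 2, 3]].value  # rows with index values 1, 2 and 3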

    def __setitem__(self, specs, value):
        raise NotImplementedError()

    def _get_filter(self, specs):
        filterq = []
        projection = None
        if self.positional:
            idx_cols = ['_om#rowid']
        else:
            idx_cols = self.mdataframe._get_frame_index()
        flt_kwargs = {}
        enumerable_types = (list, tuple, np.ndarray)
        if isinstance(specs, np.ndarray):
            specs = specs.tolist()
        if (isinstance(specs, enumerable_types)
                and isscalar(specs[0]) and len(idx_cols) == 1
                and not any(isinstance(s, slice) for s in specs)):
            # single column index with list of scalar values
            if (self.positional and isinstance(specs, tuple) and len(specs) == 2
                    and all(isscalar(v) for v in specs)):
                # iloc[int, int] is a cell access
                flt_kwargs[idx_cols[0]] = specs[0]
                projection = self._get_projection(specs[1])
            else:
                flt_kwargs['{}__in'.format(idx_cols[0])] = specs
                self._from_range = True
        elif isinstance(specs, (int, str)):
            flt_kwargs[idx_cols[0]] = specs
        else:
            specs = make_tuple(specs)
            # list/tuple of slices or scalar values, or MultiIndex
            for i, spec in enumerate(specs):
                if i < len(idx_cols):
                    col = idx_cols[i]
                    if isinstance(spec, slice):
                        self._from_range = True
                        start, stop = spec.start, spec.stop
                        if start is not None:
                            flt_kwargs['{}__gte'.format(col)] = start
                        if stop is not None:
                            if isinstance(stop, int):
                                stop -= int(self.positional)
                            flt_kwargs['{}__lte'.format(col)] = stop
                    elif isinstance(spec, enumerable_types) and isscalar(spec[0]):
                        self._from_range = True
                        # single column index with list of scalar values
                        # -- convert to list for PyMongo serialization
                        if isinstance(spec, np.ndarray):
                            spec = spec.tolist()
                        flt_kwargs['{}__in'.format(col)] = spec
                    elif isscalar(col):
                        flt_kwargs[col] = spec
                else:
                    # we're out of index columns, let's look at columns
                    cur_proj = self._get_projection(spec)
                    # if we get a single column, that's the projection
                    # if we had a single column, now need to add more, create a list
                    # if we have a list already, extend that
                    if projection is None:
                        projection = cur_proj
                    elif isinstance(projection, list):
                        projection.extend(cur_proj)
                    else:
                        projection = [projection, cur_proj]
        if flt_kwargs:
            filterq.append(MongoQ(**flt_kwargs))
        finalq = None
        for q in filterq:
            if finalq:
                finalq |= q
            else:
                finalq = q
        return finalq, projection

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return [spec]
        if isinstance(spec, (tuple, list)):
            # ensure all requested columns actually exist
            assert all(col in columns for col in spec)
            return spec
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if all(isinstance(v, int) for v in (start, stop)):
                start, stop, step = spec.indices(len(columns))
            else:
                start = columns.index(start) if start is not None else 0
                stop = columns.index(stop) + 1 if stop is not None else len(columns)
            return columns[slice(start, stop)]
        raise IndexError


class MPosIndexer(MLocIndexer):
    """
    implements the position-based indexer for MDataFrames
    """

    def __init__(self, mdataframe):
        super(MPosIndexer, self).__init__(mdataframe, positional=True)

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return columns[spec]
        if isinstance(spec, (tuple, list)):
            # select columns by position
            return [col for i, col in enumerate(columns) if i in spec]
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if start and not isinstance(start, int):
                start = 0
            if stop and not isinstance(stop, int):
                # sliced ranges are inclusive
                stop = len(columns)
            return columns[slice(start, stop)]
        raise IndexError
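
# Usage sketch (illustrative): position-based access via MDataFrame.iloc,
# which filters on the internal '_om#rowid' column:
#
#   mdf.iloc[0:10].value  # first 10 rows by insertion order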


class MSeriesGroupby(MGrouper):
    """
    like a MGrouper but limited to one column
    """

    def count(self):
        """
        return series count

        :return: counts by group
        """
        # MGrouper will insert a _count column, see _count(). we remove
        # that column again and return a series named as the group column
        resultdf = super(MSeriesGroupby, self).count()
        count_column = [col for col in resultdf.columns
                        if col.endswith('_count')][0]
        new_column = count_column.replace('_count', '')
        resultdf = resultdf.rename(columns={count_column: new_column})
        return resultdf[new_column]


class MDataFrame(object):
    """
    A DataFrame for mongodb

    Performs out-of-core, lazy computation on a mongodb cluster.
    Behaves like a pandas DataFrame. Actual results are returned
    as pandas DataFrames.
    """

    STATFUNCS = ['mean', 'std', 'min', 'max', 'sum', 'var']

    def __init__(self, collection, columns=None, query=None,
                 limit=None, skip=None, sort_order=None,
                 force_columns=None, immediate_loc=False, auto_inspect=False,
                 normalize=False, raw=False,
                 parser=None,
                 preparefn=None, from_loc_range=False, metadata=None, **kwargs):
        self.collection = PickableCollection(collection)
        # columns in frame
        self.columns = make_tuple(columns) if columns else self._get_fields(raw=raw)
        self.columns = [str(col) for col in self.columns]
        # columns to sort by, defaults to not sorted
        self.sort_order = sort_order
        # top n documents to fetch
        self.head_limit = limit
        # top n documents to skip before returning
        self.skip_topn = skip
        # filter criteria
        self.filter_criteria = query or {}
        # force columns -- on output add columns not present
        self.force_columns = force_columns or []
        # was this created from the loc indexer?
        self.from_loc_indexer = kwargs.get('from_loc_indexer', False)
        # was the loc index used a range? Else a single value
        self.from_loc_range = from_loc_range
        # setup query for filter criteria, if provided
        if self.filter_criteria:
            # make sure we have a filtered collection with the criteria given
            if isinstance(self.filter_criteria, dict):
                self.query_inplace(**self.filter_criteria)
            elif isinstance(self.filter_criteria, Filter):
                self.query_inplace(self.filter_criteria)
            else:
                raise ValueError('Invalid query specification of type {}'.format(type(self.filter_criteria)))
        # if immediate_loc is True, .loc and .iloc always evaluate
        self.immediate_loc = immediate_loc
        # __array__ will return this value if it is set, set it otherwise
        self._evaluated = None
        # set true to automatically capture inspects on .value. retrieve using .inspect(cached=True)
        self.auto_inspect = auto_inspect
        self._inspect_cache = INSPECT_CACHE
        # apply mixins
        self._applyto = str(self.__class__)
        self._apply_mixins()
        # parser to parse documents to dataframe
        self._parser = json_normalize if normalize else parser
        # prepare function to be applied just before returning from .value
        self._preparefn = preparefn
        # keep technical fields like _id, _idx etc
        self._raw = raw
        # metadata stored by omegaml (equiv. of metadata.kind_meta)
        self.metadata = metadata or dict()

    def _apply_mixins(self, *args, **kwargs):
        """
        apply mixins in defaults.OMEGA_MDF_MIXINS
        """
        from omegaml import settings
        defaults = settings()
        for mixin, applyto in defaults.OMEGA_MDF_MIXINS:
            if any(v in self._applyto for v in applyto.split(',')):
                extend_instance(self, mixin, *args, **kwargs)

    def __getstate__(self):
        # pickle support. note that the hard work is done in PickableCollection
        data = dict(self.__dict__)
        data.update(_evaluated=None)
        data.update(_inspect_cache=None)
        data.update(auto_inspect=self.auto_inspect)
        data.update(_preparefn=self._preparefn)
        data.update(_parser=self._parser)
        data.update(_raw=self._raw)
        data.update(collection=self.collection)
        return data

    def __reduce__(self):
        state = self.__getstate__()
        args = self.collection,
        return _mdf_remake, args, state

    def __setstate__(self, state):
        # pickle support. note that the hard work is done in PickableCollection
        for k, v in state.items():
            setattr(self, k, v)

    def _getcopy_kwargs(self, without=None):
        """ return all parameters required on a copy of this MDataFrame """
        kwargs = dict(columns=self.columns,
                      sort_order=self.sort_order,
                      limit=self.head_limit,
                      skip=self.skip_topn,
                      from_loc_indexer=self.from_loc_indexer,
                      from_loc_range=self.from_loc_range,
                      immediate_loc=self.immediate_loc,
                      metadata=self.metadata,
                      query=self.filter_criteria,
                      auto_inspect=self.auto_inspect,
                      parser=self._parser,
                      preparefn=self._preparefn)
        [kwargs.pop(k) for k in make_tuple(without or [])]
        return kwargs

    def __array__(self, dtype=None):
        # FIXME inefficient. make MDataFrame a drop-in replacement for any numpy ndarray
        # this evaluates every single time
        if self._evaluated is None:
            self._evaluated = array = self.value.values
        else:
            array = self._evaluated
        return array

    def __getattr__(self, attr):
        if attr in MDataFrame.STATFUNCS:
            return self.statfunc(attr)
        if attr in self.columns:
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=attr)
            return MSeries(self.collection, **kwargs)
        raise AttributeError(attr)

    def __getitem__(self, cols_or_slice):
        """
        select and project by column, columns, slice, masked-style filter

        Masked-style filters work similar to pd.DataFrame/Series masks
        but do not actually return masks but an instance of Filter. A
        Filter is a delayed evaluation on the data frame.

        Example::

            # select all rows where any column is == 5
            mdf = MDataFrame(coll)
            flt = mdf == 5
            mdf[flt]  # => the filtered MDataFrame

        :param cols_or_slice: single column (str), multi-columns (list),
            slice to select columns or a masked-style Filter
        :return: filtered MDataFrame or MSeries
        """
        if isinstance(cols_or_slice, str):
            # column name => MSeries
            return self._as_mseries(cols_or_slice)
        elif isinstance(cols_or_slice, int):
            # column number => MSeries
            column = self.columns[cols_or_slice]
            return self._as_mseries(column)
        elif isinstance(cols_or_slice, (tuple, list)):
            # list of column names => MDataFrame subset on columns
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=cols_or_slice)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, Filter):
            kwargs = self._getcopy_kwargs()
            kwargs.update(query=cols_or_slice.query)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, np.ndarray):
            return self.iloc[cols_or_slice]
        raise ValueError('unknown accessor type %s' % type(cols_or_slice))

    def __setitem__(self, column, value):
        # True for any scalar type, numeric, bool, string
        if np.isscalar(value):
            result = self.collection.update_many(filter=self.filter_criteria,
                                                 update=qops.SET(column, value))
            self.columns.append(column)
        return self

    def _clone(self, collection=None, **kwargs):
        # convenience method to clone itself with updates
        collection = collection if collection is not None else self.collection
        return self.__class__(collection, **kwargs,
                              **self._getcopy_kwargs(without=list(kwargs.keys())))

    def statfunc(self, stat):
        aggr = MGrouper(self, self.collection, [], sort=False)
        return getattr(aggr, stat)

    def groupby(self, columns, sort=True):
        """
        Group by a given set of columns

        :param columns: the list of columns
        :param sort: if True sort by group key
        :return: MGrouper
        """
        return MGrouper(self, self.collection, columns, sort=sort)
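
    # Usage sketch (illustrative): MGrouper also supports iteration over
    # groups, yielding the group key and a filtered MDataFrame
    # (column name is hypothetical):
    #
    #   for key, group_mdf in mdf.groupby('region'):
    #       print(key, len(group_mdf))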

    def _get_fields(self, raw=False):
        result = []
        doc = self.collection.find_one()
        if doc is not None:
            if raw:
                result = list(doc.keys())
            else:
                result = [str(col) for col in doc.keys()
                          if col != '_id'
                          and not col.startswith('_idx')
                          and not col.startswith('_om#')]
        return result

    def _get_frame_index(self):
        """ return the dataframe's index columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = restore_index_columns_order(doc.keys())
        return result

    def _get_frame_om_fields(self):
        """ return the dataframe's omega special fields columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = [k for k in list(doc.keys()) if k.startswith('_om#')]
        return result

    def _as_mseries(self, column):
        kwargs = self._getcopy_kwargs()
        kwargs.update(columns=make_tuple(column))
        return MSeries(self.collection, **kwargs)

    def inspect(self, explain=False, cached=False, cursor=None, raw=False):
        """
        inspect this dataframe's actual mongodb query

        :param explain: if True explains access path
        """
        if not cached:
            if isinstance(self.collection, FilteredCollection):
                query = self.collection.query
            else:
                query = '*'
            if explain:
                cursor = cursor or self._get_cursor()
                explain = cursor.explain()
            data = {
                'projection': self.columns,
                'query': query,
                'explain': explain or 'specify explain=True'
            }
        else:
            data = self._inspect_cache
        if not (raw or explain):
            data = pd.DataFrame(json_normalize(data))
        return data
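
    # Usage sketch (illustrative): show the effective MongoDB query and
    # access path for the current frame (column name is hypothetical):
    #
    #   mdf.query(x__gte=5).inspect(explain=True)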

    def count(self):
        """
        projected number of rows when resolving
        """
        nrows = len(self)
        counts = pd.Series({
            col: nrows
            for col in self.columns}, index=self.columns)
        return counts

    def __len__(self):
        """
        the projected number of rows when resolving
        """
        # reduce to just 1 column to speed up the count
        short = self._clone()
        short = short[self.columns[0]] if self.columns else short
        return sum(1 for d in short._get_cursor())

    @property
    def shape(self):
        """
        return shape of dataframe
        """
        return len(self), len(self.columns)

    @property
    def ndim(self):
        return len(self.shape)

    @property
    def value(self):
        """
        resolve the query and return a Pandas DataFrame

        :return: the result of the query as a pandas DataFrame
        """
        cursor = self._get_cursor()
        df = self._get_dataframe_from_cursor(cursor)
        if self.auto_inspect:
            self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
        # this ensures the equiv. of pandas df.loc[n] is a Series
        if self.from_loc_indexer:
            if len(df) == 1 and not self.from_loc_range:
                idx = df.index
                df = df.T
                df = df[df.columns[0]]
                if df.ndim == 1 and len(df) == 1 and not isinstance(idx, pd.MultiIndex):
                    # single row single dimension, numeric index only
                    df = df.iloc[0]
            elif (df.ndim == 1 or df.shape[1] == 1) and not self.from_loc_range:
                df = df[df.columns[0]]
        if self._preparefn:
            df = self._preparefn(df)
        return df

    def reset(self):
        # TODO head(), tail(), query(), .loc/.iloc should return a new
        #      MDataFrame instance to avoid the need for reset()
        self.head_limit = None
        self.skip_topn = None
        self.filter_criteria = {}
        self.force_columns = []
        self.sort_order = None
        self.from_loc_indexer = False
        return self

    def _get_dataframe_from_cursor(self, cursor):
        """
        from the given cursor return a DataFrame
        """
        df = cursor_to_dataframe(cursor, parser=self._parser)
        df = self._restore_dataframe_proper(df)
        return df

    @property
    def _index_meta(self):
        return self.metadata.get('idx_meta') or dict()

    def _restore_dataframe_proper(self, df):
        df = restore_index(df, self._index_meta)
        if '_id' in df.columns and not self._raw:
            df.drop('_id', axis=1, inplace=True)
        if self.force_columns:
            missing = set(self.force_columns) - set(self.columns)
            for col in missing:
                df[col] = np.nan
        return df

    def _get_cursor(self):
        projection = make_tuple(self.columns)
        projection += make_tuple(self._get_frame_index())
        if not self.sort_order:
            # implicit sort
            projection += make_tuple(self._get_frame_om_fields())
        cursor = self.collection.find(projection=projection)
        if self.sort_order:
            cursor.sort(qops.make_sortkey(make_tuple(self.sort_order)))
        if self.head_limit:
            cursor.limit(self.head_limit)
        if self.skip_topn:
            cursor.skip(self.skip_topn)
        return cursor

    def sort(self, columns):
        """
        sort by specified columns

        :param columns: str of single column or a list of columns. Sort order
            is specified as the + (ascending) or - (descending)
            prefix to the column name. Default sort order is
            ascending.
        :return: the MDataFrame
        """
        self._evaluated = None
        self.sort_order = make_tuple(columns)
        return self
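
    # Usage sketch (illustrative): descending sort by a column 'x'
    # (column name is hypothetical):
    #
    #   mdf.sort('-x').value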

    def head(self, limit=10):
        """
        return up to limit number of rows

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        return self._clone(limit=limit)

    def tail(self, limit=10):
        """
        return up to limit number of rows from last inserted values

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        # skip all but the last limit rows
        tail_n = max(len(self) - limit, 0)
        return self._clone(skip=tail_n)

    def skip(self, topn):
        """
        skip the topn number of rows

        :param topn: the number of rows to skip.
        :return: the MDataFrame
        """
        return self._clone(skip=topn)
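
    # Usage sketch (illustrative): simple paging by combining skip and head:
    #
    #   page2 = mdf.skip(10).head(10).value  # rows 10..19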

    def merge(self, right, on=None, left_on=None, right_on=None,
              how='inner', target=None, suffixes=('_x', '_y'),
              sort=False, inspect=False, filter=None):
        """
        merge this dataframe with another dataframe. left, inner and right
        joins are supported. the output is saved as a new collection, named
        target (defaults to a generated name if not specified).

        :param right: the other MDataFrame
        :param on: the list of key columns to merge by
        :param left_on: the list of the key columns to merge on this dataframe
        :param right_on: the list of the key columns to merge on the other
            dataframe
        :param how: the method to merge. supported are left, inner, right.
            Defaults to inner
        :param target: the name of the collection to store the merge results
            in. If not provided a temporary name will be created.
        :param suffixes: the suffixes to apply to identical left and right
            columns
        :param sort: if True the merge results will be sorted. If False the
            MongoDB natural order is implied.
        :returns: the merged MDataFrame on the target collection
        """
        # validate input
        supported_how = ["left", 'inner', 'right']
        assert how in supported_how, "only %s merges are currently supported" % supported_how
        for key in [on, left_on, right_on]:
            if key:
                assert isinstance(
                    key, str), "only single column merge keys are supported (%s)" % key
        if isinstance(right, (Collection, PickableCollection, FilteredCollection)):
            right = MDataFrame(right)
        assert isinstance(
            right, MDataFrame), "both must be MDataFrames, got right=%s" % type(right)
        if how == 'right':
            # A right B == B left A
            return right.merge(self, on=on, left_on=right_on, right_on=left_on,
                               how='left', target=target, suffixes=suffixes)
        # generate lookup parameters
        on = on or '_id'
        right_name = self._get_collection_name_of(right, right)
        target_name = self._get_collection_name_of(
            target, '_temp.merge.%s' % uuid4().hex)
        target_field = (
            "%s_%s" % (right_name.replace('.', '_'), right_on or on))
        """
        TODO enable filter criteria on right dataframe. requires changing
             LOOKUP syntax from equality to arbitrary match

        if right.filter_criteria:
            right_filter = [qops.MATCH(self._get_filter_criteria(**right.filter_criteria))]
        else:
            right_filter = None
        """
        right_filter = None
        lookup = qops.LOOKUP(right_name,
                             key=on,
                             left_key=left_on,
                             right_key=right_on,
                             target=target_field)
        # unwind merged documents from arrays to top-level document fields
        unwind = qops.UNWIND(target_field, preserve=how != 'inner')
        # get all fields from left, right
        project = {}
        for left_col in self.columns:
            source_left_col = left_col
            if left_col == '_id':
                project[left_col] = 1
                continue
            if left_col.startswith('_idx'):
                continue
            if left_col.startswith('_om#'):
                continue
            if left_col != (on or left_on) and left_col in right.columns:
                left_col = '%s%s' % (left_col, suffixes[0])
            project[left_col] = "$%s" % source_left_col
        for right_col in right.columns:
            if right_col == '_id':
                continue
            if right_col.startswith('_idx'):
                continue
            if right_col.startswith('_om#'):
                continue
            if right_col == (on or right_on) and right_col == (on or left_on):
                # if the merge field is the same in both frames, we already
                # have it from left
                continue
            if right_col in self.columns:
                left_col = '%s%s' % (right_col, suffixes[1])
            else:
                left_col = '%s' % right_col
            project[left_col] = '$%s.%s' % (target_field, right_col)
        expected_columns = list(project.keys())
        if '_id' not in project:
            project['_id'] = 0  # never copy objectids to avoid duplicate keys, unless requested
        project = {"$project": project}
        # store merged documents and return an MDataFrame to it
        out = qops.OUT(target_name)
        pipeline = [lookup, unwind, project]
        if filter:
            query = qops.MATCH(self._get_filter_criteria(**filter))
            pipeline.append(query)
        if sort:
            sort_cols = make_list(on or [left_on, right_on])
            sort_key = qops.make_sortkey(sort_cols)
            sort = qops.SORT(**dict(sort_key))
            pipeline.append(sort)
        pipeline.append(out)
        if inspect:
            result = pipeline
        else:
            result = self.collection.aggregate(pipeline, allowDiskUse=True)
            result = MDataFrame(self.collection.database[target_name],
                                force_columns=expected_columns)
        return result
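
    # Usage sketch (illustrative): join two frames on a shared key column;
    # collection and column names are hypothetical:
    #
    #   left = MDataFrame(db.orders)
    #   right = MDataFrame(db.customers)
    #   merged = left.merge(right, on='customer_id', how='left')
    #   merged.value  # resolves from the generated target collection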

    def append(self, other):
        if isinstance(other, Collection):
            other = MDataFrame(other)
        assert isinstance(
            other, MDataFrame), "both must be MDataFrames, got other={}".format(type(other))
        outname = self.collection.name
        mrout = {
            'merge': outname,
            'nonAtomic': True,
        }
        mapfn = Code("""
        function() {
            this._id = ObjectId();
            if(this['_om#rowid']) {
                this['_om#rowid'] += %s;
            }
            emit(this._id, this);
        }
        """ % len(self))
        reducefn = Code("""
        function(key, value) {
            return value;
        }
        """)
        finfn = Code("""
        function(key, value) {
            return value;
        }
        """)
        other.collection.map_reduce(mapfn, reducefn, mrout, finalize=finfn, jsMode=True)
        unwind = {
            "$replaceRoot": {
                "newRoot": {
                    "$ifNull": ["$value", "$$CURRENT"],
                }
            }
        }
        output = qops.OUT(outname)
        pipeline = [unwind, output]
        self.collection.aggregate(pipeline, allowDiskUse=True)
        return self

    def _get_collection_name_of(self, some, default=None):
        """
        determine the collection name of the given parameter

        returns the collection name if some is a MDataFrame, a Collection
        or a string_type. Otherwise returns default
        """
        if isinstance(some, MDataFrame):
            name = some.collection.name
        elif isinstance(some, Collection):
            name = some.name
        else:
            name = default
        return name

    def _get_filter_criteria(self, *args, **kwargs):
        """
        return mongo query from filter specs

        this uses a Filter to produce the query from the kwargs.

        :param args: a Q object or logical combination of Q objects
            (optional)
        :param kwargs: all AND filter criteria
        """
        if len(args) > 0:
            q = args[0]
            if isinstance(q, MongoQ):
                filter_criteria = Filter(self.collection, q).query
            elif isinstance(q, Filter):
                filter_criteria = Filter(self.collection, q.q).query
        else:
            filter_criteria = Filter(self.collection, **kwargs).query
        return filter_criteria

    def query_inplace(self, *args, **kwargs):
        """
        filters this MDataFrame and returns it.

        Any subsequent operation on the dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        :param args: a Q object or logical combination of Q objects
            (optional)
        :param kwargs: all AND filter criteria
        :return: self
        """
        self._evaluated = None
        self.filter_criteria = self._get_filter_criteria(*args, **kwargs)
        self.collection = FilteredCollection(
            self.collection, query=self.filter_criteria)
        return self

    def query(self, *args, **kwargs):
        """
        return a new MDataFrame with a filter criteria

        Any subsequent operation on the new dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        Note: Unlike pandas DataFrames, a filtered MDataFrame operates
        on the same collection as the original DataFrame

        :param args: a Q object or logical combination of Q objects
            (optional)
        :param kwargs: all AND filter criteria
        :return: a new MDataFrame with the filter applied
        """
        effective_filter = dict(self.filter_criteria)
        filter_criteria = self._get_filter_criteria(*args, **kwargs)
        if '$and' in effective_filter:
            effective_filter['$and'].extend(filter_criteria.get('$and'))
        else:
            effective_filter.update(filter_criteria)
        coll = FilteredCollection(self.collection, query=effective_filter)
        return self._clone(collection=coll, query=effective_filter)
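
    # Usage sketch (illustrative): keyword filters combine as AND; MongoQ
    # objects allow OR combinations (column names are hypothetical):
    #
    #   mdf.query(x__gte=5, y='a').value
    #   mdf.query(MongoQ(x__gte=5) | MongoQ(y='a')).value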

    def create_index(self, keys, **kwargs):
        """
        create an index the easy way
        """
        keys, kwargs = MongoQueryOps().make_index(keys)
        result = ensure_index(self.collection, keys, **kwargs)
        return result
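
    # Usage sketch (illustrative): create an index on a column
    # (column name is hypothetical):
    #
    #   mdf.create_index('x')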

    def list_indexes(self):
        """
        list all indices in database
        """
        return cursor_to_dataframe(self.collection.list_indexes())

    def iterchunks(self, chunksize=100):
        """
        return an iterator

        Args:
            chunksize (int): number of rows in each chunk

        Returns:
            a dataframe of max. length chunksize
        """
        chunksize = int(chunksize)
        i = 0
        while True:
            chunkdf = self.skip(i).head(chunksize).value
            if len(chunkdf) == 0:
                break
            i += chunksize
            yield chunkdf

    def itertuples(self, chunksize=1000):
        chunksize = int(chunksize)
        __doc__ = pd.DataFrame.itertuples.__doc__

        for chunkdf in self.iterchunks(chunksize=chunksize):
            # yield namedtuples as pandas does
            for row in chunkdf.itertuples():
                yield row

    def iterrows(self, chunksize=1000):
        chunksize = int(chunksize)
        __doc__ = pd.DataFrame.iterrows.__doc__

        for chunkdf in self.iterchunks(chunksize=chunksize):
            if isinstance(chunkdf, pd.DataFrame):
                for row in chunkdf.iterrows():
                    yield row
            else:
                # Series does not have iterrows
                for i in range(0, len(chunkdf), chunksize):
                    yield chunkdf.iloc[i:i + chunksize]

    def iteritems(self):
        if not hasattr(pd.DataFrame, 'iteritems'):
            raise NotImplementedError('MDataFrame.iteritems has been removed since Pandas 2.0. Use .items instead.')
        __doc__ = pd.DataFrame.iteritems.__doc__
        return self.items()

    def items(self):
        __doc__ = pd.DataFrame.items.__doc__

        for col in self.columns:
            yield col, self[col].value

    @property
    def loc(self):
        """
        Access by index

        Use as mdf.loc[index_value]

        :return: MLocIndexer
        """
        self._evaluated = None
        indexer = MLocIndexer(self)
        return indexer

    @property
    def iloc(self):
        self._evaluated = None
        indexer = MPosIndexer(self)
        return indexer

    def rows(self, start=None, end=None, chunksize=1000):
        # equivalent to .iloc[start:end].iterchunks()
        start, end, chunksize = (int(v) if v is not None else None
                                 for v in (start, end, chunksize))
        return self.iloc[slice(start, end)].iterchunks(chunksize)

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MDataFrame(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                           kwargs=kwargs)
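
    # Usage sketch (illustrative): stream results in chunks to bound memory
    # usage on large collections:
    #
    #   for chunkdf in mdf.iterchunks(chunksize=500):
    #       process(chunkdf)  # process is a hypothetical callback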


class MSeries(MDataFrame):
    """
    Series implementation for MDataFrames

    behaves like a DataFrame but limited to one column.
    """

    def __init__(self, *args, **kwargs):
        super(MSeries, self).__init__(*args, **kwargs)
        # true if only unique values apply
        self.is_unique = False
        # apply mixins
        self._applyto = str(self.__class__)
        self._apply_mixins(*args, **kwargs)

    def __getitem__(self, cols_or_slice):
        if isinstance(cols_or_slice, Filter):
            return MSeries(self.collection, columns=self.columns,
                           query=cols_or_slice.query)
        return super(MSeries, self).__getitem__(cols_or_slice)

    @property
    def name(self):
        return self.columns[0]

    def unique(self):
        """
        return the unique set of values for the series

        :return: MSeries
        """
        self.is_unique = True
        return self
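
    # Usage sketch (illustrative): distinct values of a column, resolved as
    # a list via MongoDB's distinct() (column name is hypothetical):
    #
    #   mdf['x'].unique().value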

    def _get_cursor(self):
        if self.is_unique:
            # this way indexes get applied
            cursor = self.collection.distinct(make_tuple(self.columns)[0])
        else:
            cursor = super(MSeries, self)._get_cursor()
        return cursor

    @property
    def value(self):
        """
        return the value of the series

        this is a Series unless unique() was called. If unique() was
        called only distinct values are returned as an array, matching
        the behavior of a Series

        :return: pandas.Series
        """
        cursor = self._get_cursor()
        column = make_tuple(self.columns)[0]
        if self.is_unique:
            # the .distinct() cursor returns a list of values
            # this is to make sure we return the same thing as pandas
            val = [v for v in cursor]
        else:
            val = self._get_dataframe_from_cursor(cursor)
            val = val[column]
            val.name = self.name
            if len(val) == 1 and self.from_loc_indexer:
                val = val.iloc[0]
        if self.auto_inspect:
            self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
        if self._preparefn:
            val = self._preparefn(val)
        return val

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MSeries(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                        kwargs=kwargs)

    @property
    def shape(self):
        return len(self),


def _mdf_remake(collection):
    # recreate a pickled MDF
    mdf = MDataFrame(collection)
    return mdf