Source code for omegaml.mdataframe

from __future__ import absolute_import

from uuid import uuid4

import numpy as np
import pandas as pd
from bson import Code
from numpy import isscalar
from pymongo.collection import Collection

from omegaml.store import qops
from omegaml.store.filtered import FilteredCollection
from omegaml.store.query import Filter, MongoQ
from omegaml.store.queryops import MongoQueryOps
from omegaml.util import make_tuple, make_list, restore_index, \
    cursor_to_dataframe, restore_index_columns_order, PickableCollection, extend_instance, json_normalize, ensure_index

INSPECT_CACHE = []

class MGrouper(object):
    """
    a Grouper for MDataFrames
    """
    STATS_MAP = {
        'std': 'stdDevSamp',
        'mean': 'avg',
    }

    def __init__(self, mdataframe, collection, columns, sort=True):
        self.mdataframe = mdataframe
        self.collection = collection
        self.columns = make_tuple(columns)
        self.should_sort = sort

    def __getattr__(self, attr):
        if attr in self.columns:
            return MSeriesGroupby(self, self.collection, attr)

        def statfunc():
            columns = self.columns or self._non_group_columns()
            return self.agg({col: attr for col in columns})

        return statfunc

    def __getitem__(self, item):
        return self.__getattr__(item)

    def agg(self, specs):
        """
        shortcut for .aggregate
        """
        return self.aggregate(specs)

    def aggregate(self, specs, **kwargs):
        """
        aggregate by given specs

        See the following link for a list of supported operations.
        https://docs.mongodb.com/manual/reference/operator/aggregation/group/

        :param specs: a dictionary of { column : function | list[functions] }
           pairs.
        """

        def add_stats(specs, column, stat):
            specs['%s_%s' % (column, stat)] = {
                '$%s' % MGrouper.STATS_MAP.get(stat, stat): '$%s' % column}

        # generate $group command
        _specs = {}
        for column, stats in specs.items():
            stats = make_tuple(stats)
            for stat in stats:
                add_stats(_specs, column, stat)
        groupby = qops.GROUP(columns=self.columns,
                             **_specs)
        # execute and return a dataframe
        pipeline = self._amend_pipeline([groupby])
        data = self.collection.aggregate(pipeline, allowDiskUse=True)

        def get_data():
            # we need this to build a pipeline for from_records
            # to process, otherwise the cursor will be exhausted already
            for group in data:
                _id = group.pop('_id')
                if isinstance(_id, dict):
                    group.update(_id)
                yield group

        df = pd.DataFrame.from_records(get_data())
        columns = make_list(self.columns)
        if columns:
            df = df.set_index(columns, drop=True)
        return df
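
    # Usage sketch (hypothetical data): assuming a pymongo collection `coll`
    # with documents like {'region': 'EU', 'amount': 5.0}, aggregate()
    # produces one output column per (column, stat) pair, indexed by the
    # group columns:
    #
    #   mdf = MDataFrame(coll)
    #   stats = mdf.groupby('region').aggregate({'amount': ['mean', 'std']})
    #   # => DataFrame indexed by 'region' with columns amount_mean, amount_std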

    def _amend_pipeline(self, pipeline):
        """ amend pipeline with default ops on coll.aggregate() calls """
        if self.should_sort:
            sort = qops.SORT(**dict(qops.make_sortkey('_id')))
            pipeline.append(sort)
        return pipeline

    def _non_group_columns(self):
        """ return all mdataframe columns that are not group columns """
        return [col for col in self.mdataframe.columns
                if col not in self.columns and col != '_id'
                and not col.startswith('_idx')
                and not col.startswith('_om#')]

    def _count(self):
        count_columns = self._non_group_columns()
        if len(count_columns) == 0:
            count_columns.append('_'.join(self.columns) + '_count')
        groupby = {
            "$group": {
                "_id": {k: "$%s" % k for k in self.columns},
            }
        }
        for k in count_columns:
            groupby['$group'][k] = {"$sum": 1}
        # _amend_pipeline already appends the $sort stage if should_sort
        pipeline = self._amend_pipeline([groupby])
        return list(self.collection.aggregate(pipeline, allowDiskUse=True))

    def count(self):
        """ return counts by group columns """
        counts = self._count()
        # remove mongo object _id
        for group in counts:
            group.update(group.pop('_id'))
        # transform results to dataframe, then return as pandas would
        resultdf = pd.DataFrame(counts).set_index(make_list(self.columns),
                                                  drop=True)
        return resultdf

    def __iter__(self):
        """ for each group yield the group key and a filtered MDataFrame """
        # reduce count to only one column
        groups = getattr(self, self.columns[0])._count()
        for group in groups:
            keys = group.get('_id')
            data = self.mdataframe._clone(query=keys)
            yield keys, data
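
# Iterating a grouper yields (group key, MDataFrame) pairs, with each
# MDataFrame filtered to its group. A minimal sketch, using the hypothetical
# `coll` collection from above:
#
#   for keys, group_mdf in MDataFrame(coll).groupby('region'):
#       print(keys, len(group_mdf))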

class MLocIndexer(object):
    """
    implements the LocIndexer for MDataFrames
    """

    def __init__(self, mdataframe, positional=False):
        self.mdataframe = mdataframe
        # if positional, any loc[spec] will be applied on the rowid only
        self.positional = positional
        # indicator will be set true if loc specs are from a range type (list, tuple, np.ndarray)
        self._from_range = False

    def __getitem__(self, specs):
        """
        access by index

        use as mdf.loc[specs] where specs is any of

        * a list or tuple of scalar index values, e.g. .loc[(1,2,3)]
        * a slice of values, e.g. .loc[1:5]
        * a list of slices, e.g. .loc[1:5, 2:3]

        :return: the sliced part of the MDataFrame
        """
        filterq, projection = self._get_filter(specs)
        df = self.mdataframe
        if filterq:
            df = self.mdataframe.query(filterq)
            df.from_loc_indexer = True
            df.from_loc_range = self._from_range
        if projection:
            df = df[projection]
        if isinstance(self.mdataframe, MSeries):
            df = df._as_mseries(df.columns[0])
        if getattr(df, 'immediate_loc', False):
            df = df.value
        return df

    def __setitem__(self, specs, value):
        raise NotImplementedError()

    def _get_filter(self, specs):
        filterq = []
        projection = None
        if self.positional:
            idx_cols = ['_om#rowid']
        else:
            idx_cols = self.mdataframe._get_frame_index()
        flt_kwargs = {}
        enumerable_types = (list, tuple, np.ndarray)
        if isinstance(specs, np.ndarray):
            specs = specs.tolist()
        if (isinstance(specs, enumerable_types)
                and isscalar(specs[0]) and len(idx_cols) == 1
                and not any(isinstance(s, slice) for s in specs)):
            # single column index with list of scalar values
            if (self.positional and isinstance(specs, tuple) and len(specs) == 2
                    and all(isscalar(v) for v in specs)):
                # iloc[int, int] is a cell access
                flt_kwargs[idx_cols[0]] = specs[0]
                projection = self._get_projection(specs[1])
            else:
                flt_kwargs['{}__in'.format(idx_cols[0])] = specs
                self._from_range = True
        elif isinstance(specs, (int, str)):
            flt_kwargs[idx_cols[0]] = specs
        else:
            specs = make_tuple(specs)
            # list/tuple of slices or scalar values, or MultiIndex
            for i, spec in enumerate(specs):
                if i < len(idx_cols):
                    col = idx_cols[i]
                    if isinstance(spec, slice):
                        self._from_range = True
                        start, stop = spec.start, spec.stop
                        if start is not None:
                            flt_kwargs['{}__gte'.format(col)] = start
                        if stop is not None:
                            if isinstance(stop, int):
                                stop -= int(self.positional)
                            flt_kwargs['{}__lte'.format(col)] = stop
                    elif isinstance(spec, enumerable_types) and isscalar(spec[0]):
                        self._from_range = True
                        # single column index with list of scalar values
                        # -- convert to list for PyMongo serialization
                        if isinstance(spec, np.ndarray):
                            spec = spec.tolist()
                        flt_kwargs['{}__in'.format(col)] = spec
                    elif isscalar(col):
                        flt_kwargs[col] = spec
                else:
                    # we're out of index columns, let's look at columns
                    cur_proj = self._get_projection(spec)
                    # if we get a single column, that's the projection
                    # if we had a single column, now need to add more, create a list
                    # if we have a list already, extend that
                    if projection is None:
                        projection = cur_proj
                    elif isinstance(projection, list):
                        projection.extend(cur_proj)
                    else:
                        projection = [projection, cur_proj]
        if flt_kwargs:
            filterq.append(MongoQ(**flt_kwargs))
        finalq = None
        for q in filterq:
            if finalq:
                finalq |= q
            else:
                finalq = q
        return finalq, projection

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return [spec]
        if isinstance(spec, (tuple, list)):
            # ensure all requested columns actually exist
            assert all(col in columns for col in spec)
            return spec
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if all(isinstance(v, int) for v in (start, stop)):
                start, stop, step = spec.indices(len(columns))
            else:
                start = columns.index(start) if start is not None else 0
                stop = columns.index(stop) + 1 if stop is not None else len(columns)
            return columns[slice(start, stop)]
        raise IndexError
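
# Usage sketch for label-based access (hypothetical data; assumes the
# collection's index column holds the values used below):
#
#   mdf = MDataFrame(coll)
#   mdf.loc[1]          # single index value => row, resolves to a Series
#   mdf.loc[1:5]        # index range, bounds inclusive
#   mdf.loc[[1, 2, 3]]  # list of index values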

class MPosIndexer(MLocIndexer):
    """
    implements the position-based indexer for MDataFrames
    """

    def __init__(self, mdataframe):
        super(MPosIndexer, self).__init__(mdataframe, positional=True)

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return columns[spec]
        if isinstance(spec, (tuple, list)):
            return [col for i, col in enumerate(columns) if i in spec]
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if start and not isinstance(start, int):
                start = 0
            if stop and not isinstance(stop, int):
                # sliced ranges are inclusive
                stop = len(columns)
            return columns[slice(start, stop)]
        raise IndexError
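
# Position-based access mirrors .loc but addresses rows by their stored
# row id (_om#rowid). A minimal sketch (hypothetical data):
#
#   mdf.iloc[0]      # first row
#   mdf.iloc[0:10]   # first ten rows
#   mdf.iloc[0, 1]   # cell access: row 0, column at position 1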

class MSeriesGroupby(MGrouper):
    """
    like a MGrouper but limited to one column
    """

    def count(self):
        """
        return series count

        :return: counts by group
        """
        # MGrouper will insert a _count column, see _count(). we remove
        # that column again and return a series named as the group column
        resultdf = super(MSeriesGroupby, self).count()
        count_column = [col for col in resultdf.columns
                        if col.endswith('_count')][0]
        new_column = count_column.replace('_count', '')
        resultdf = resultdf.rename(columns={count_column: new_column})
        return resultdf[new_column]
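
# Accessing a group column on a grouper returns a MSeriesGroupby, so
# per-group counts read like pandas. A sketch, using the hypothetical `coll`:
#
#   MDataFrame(coll).groupby('region').region.count()
#   # => Series of per-group counts, indexed by 'region'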

class MDataFrame(object):
    """
    A DataFrame for mongodb

    Performs out-of-core, lazy computation on a mongodb cluster.
    Behaves like a pandas DataFrame. Actual results are returned
    as pandas DataFrames.
    """

    STATFUNCS = ['mean', 'std', 'min', 'max', 'sum', 'var']

    def __init__(self, collection, columns=None, query=None,
                 limit=None, skip=None, sort_order=None,
                 force_columns=None, immediate_loc=False, auto_inspect=False,
                 normalize=False, raw=False,
                 parser=None,
                 preparefn=None, from_loc_range=False, metadata=None, **kwargs):
        self.collection = PickableCollection(collection)
        # columns in frame
        self.columns = make_tuple(columns) if columns else self._get_fields(raw=raw)
        self.columns = [str(col) for col in self.columns]
        # columns to sort by, defaults to not sorted
        self.sort_order = sort_order
        # top n documents to fetch
        self.head_limit = limit
        # top n documents to skip before returning
        self.skip_topn = skip
        # filter criteria
        self.filter_criteria = query or {}
        # force columns -- on output add columns not present
        self.force_columns = force_columns or []
        # was this created from the loc indexer?
        self.from_loc_indexer = kwargs.get('from_loc_indexer', False)
        # was the loc index used a range? else a single value
        self.from_loc_range = from_loc_range
        # setup query for filter criteria, if provided
        if self.filter_criteria:
            # make sure we have a filtered collection with the criteria given
            if isinstance(self.filter_criteria, dict):
                self.query_inplace(**self.filter_criteria)
            elif isinstance(self.filter_criteria, Filter):
                self.query_inplace(self.filter_criteria)
            else:
                raise ValueError('Invalid query specification of type {}'.format(type(self.filter_criteria)))
        # if immediate_loc is True, .loc and .iloc always evaluate
        self.immediate_loc = immediate_loc
        # __array__ will return this value if it is set, set it otherwise
        self._evaluated = None
        # set true to automatically capture inspects on .value. retrieve using .inspect(cached=True)
        self.auto_inspect = auto_inspect
        self._inspect_cache = INSPECT_CACHE
        # apply mixins
        self._applyto = str(self.__class__)
        self._apply_mixins()
        # parser to parse documents to dataframe
        self._parser = json_normalize if normalize else parser
        # prepare function to be applied just before returning from .value
        self._preparefn = preparefn
        # keep technical fields like _id, _idx etc
        self._raw = raw
        # metadata stored by omegaml (equiv. of metadata.kind_meta)
        self.metadata = metadata or dict()

    def _apply_mixins(self, *args, **kwargs):
        """
        apply mixins in defaults.OMEGA_MDF_MIXINS
        """
        from omegaml import settings
        defaults = settings()
        for mixin, applyto in defaults.OMEGA_MDF_MIXINS:
            if any(v in self._applyto for v in applyto.split(',')):
                extend_instance(self, mixin, *args, **kwargs)

    def __getstate__(self):
        # pickle support. note that the hard work is done in PickableCollection
        data = dict(self.__dict__)
        data.update(_evaluated=None)
        data.update(_inspect_cache=None)
        data.update(auto_inspect=self.auto_inspect)
        data.update(_preparefn=self._preparefn)
        data.update(_parser=self._parser)
        data.update(_raw=self._raw)
        data.update(collection=self.collection)
        return data

    def __reduce__(self):
        state = self.__getstate__()
        args = self.collection,
        return _mdf_remake, args, state

    def __setstate__(self, state):
        # pickle support. note that the hard work is done in PickableCollection
        for k, v in state.items():
            setattr(self, k, v)

    def _getcopy_kwargs(self, without=None):
        """ return all parameters required on a copy of this MDataFrame """
        kwargs = dict(columns=self.columns,
                      sort_order=self.sort_order,
                      limit=self.head_limit,
                      skip=self.skip_topn,
                      from_loc_indexer=self.from_loc_indexer,
                      from_loc_range=self.from_loc_range,
                      immediate_loc=self.immediate_loc,
                      metadata=self.metadata,
                      query=self.filter_criteria,
                      auto_inspect=self.auto_inspect,
                      parser=self._parser,
                      preparefn=self._preparefn)
        [kwargs.pop(k) for k in make_tuple(without or [])]
        return kwargs

    def __array__(self, dtype=None):
        # FIXME inefficient. make MDataFrame a drop-in replacement for any numpy ndarray
        # this evaluates every single time
        if self._evaluated is None:
            self._evaluated = array = self.value.values
        else:
            array = self._evaluated
        return array

    def __getattr__(self, attr):
        if attr in MDataFrame.STATFUNCS:
            return self.statfunc(attr)
        if attr in self.columns:
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=attr)
            return MSeries(self.collection, **kwargs)
        raise AttributeError(attr)

    def __getitem__(self, cols_or_slice):
        """
        select and project by column, columns, slice, masked-style filter

        Masked-style filters work similarly to pd.DataFrame/Series masks,
        but do not actually return masks; instead they return an instance
        of Filter. A Filter is a delayed evaluation on the data frame.

            # select all rows where any column is == 5
            mdf = MDataFrame(coll)
            flt = mdf == 5
            mdf[flt]  # => MDataFrame filtered to the matching rows

        :param cols_or_slice: single column (str), multi-columns (list),
           slice to select columns, or a masked-style filter
        :return: filtered MDataFrame or MSeries
        """
        if isinstance(cols_or_slice, str):
            # column name => MSeries
            return self._as_mseries(cols_or_slice)
        elif isinstance(cols_or_slice, int):
            # column number => MSeries
            column = self.columns[cols_or_slice]
            return self._as_mseries(column)
        elif isinstance(cols_or_slice, (tuple, list)):
            # list of column names => MDataFrame subset on columns
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=cols_or_slice)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, Filter):
            kwargs = self._getcopy_kwargs()
            kwargs.update(query=cols_or_slice.query)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, np.ndarray):
            return self.iloc[cols_or_slice]
        raise ValueError('unknown accessor type %s' % type(cols_or_slice))

    def __setitem__(self, column, value):
        # True for any scalar type, numeric, bool, string
        if np.isscalar(value):
            self.collection.update_many(filter=self.filter_criteria,
                                        update=qops.SET(column, value))
            self.columns.append(column)
        return self

    def _clone(self, collection=None, **kwargs):
        # convenience method to clone itself with updates
        collection = collection if collection is not None else self.collection
        return self.__class__(collection, **kwargs,
                              **self._getcopy_kwargs(without=list(kwargs.keys())))

    def statfunc(self, stat):
        aggr = MGrouper(self, self.collection, [], sort=False)
        return getattr(aggr, stat)

    def groupby(self, columns, sort=True):
        """
        Group by a given set of columns

        :param columns: the list of columns
        :param sort: if True sort by group key
        :return: MGrouper
        """
        return MGrouper(self, self.collection, columns, sort=sort)

    def _get_fields(self, raw=False):
        result = []
        doc = self.collection.find_one()
        if doc is not None:
            if raw:
                result = list(doc.keys())
            else:
                result = [str(col) for col in doc.keys()
                          if col != '_id'
                          and not col.startswith('_idx')
                          and not col.startswith('_om#')]
        return result

    def _get_frame_index(self):
        """ return the dataframe's index columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = restore_index_columns_order(doc.keys())
        return result

    def _get_frame_om_fields(self):
        """ return the dataframe's omega special fields columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = [k for k in list(doc.keys()) if k.startswith('_om#')]
        return result

    def _as_mseries(self, column):
        kwargs = self._getcopy_kwargs()
        kwargs.update(columns=make_tuple(column))
        return MSeries(self.collection, **kwargs)

    def inspect(self, explain=False, cached=False, cursor=None, raw=False):
        """
        inspect this dataframe's actual mongodb query

        :param explain: if True explains access path
        """
        if not cached:
            if isinstance(self.collection, FilteredCollection):
                query = self.collection.query
            else:
                query = '*'
            if explain:
                cursor = cursor or self._get_cursor()
                explain = cursor.explain()
            data = {
                'projection': self.columns,
                'query': query,
                'explain': explain or 'specify explain=True'
            }
        else:
            data = self._inspect_cache
        if not (raw or explain):
            data = pd.DataFrame(json_normalize(data))
        return data
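
    # A sketch of query inspection (hypothetical data): explain=True runs
    # the query and returns MongoDB's access-path explanation.
    #
    #   mdf = MDataFrame(coll)
    #   mdf.inspect()              # projection and query, as a DataFrame
    #   mdf.inspect(explain=True)  # includes the cursor's explain() plan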

    def count(self):
        """
        projected number of rows when resolving
        """
        nrows = len(self)
        counts = pd.Series({
            col: nrows
            for col in self.columns}, index=self.columns)
        return counts

    def __len__(self):
        """
        the projected number of rows when resolving
        """
        # we reduce to just 1 column to speed up the query
        short = self._clone()[self.columns[0]]
        return sum(1 for d in short._get_cursor())

    @property
    def shape(self):
        """
        return shape of dataframe
        """
        return len(self), len(self.columns)

    @property
    def ndim(self):
        return len(self.shape)

    @property
    def value(self):
        """
        resolve the query and return a Pandas DataFrame

        :return: the result of the query as a pandas DataFrame
        """
        cursor = self._get_cursor()
        df = self._get_dataframe_from_cursor(cursor)
        if self.auto_inspect:
            self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
        # this ensures the equiv. of pandas df.loc[n] is a Series
        if self.from_loc_indexer:
            if len(df) == 1 and not self.from_loc_range:
                idx = df.index
                df = df.T
                df = df[df.columns[0]]
                if df.ndim == 1 and len(df) == 1 and not isinstance(idx, pd.MultiIndex):
                    # single row single dimension, numeric index only
                    df = df.iloc[0]
            elif (df.ndim == 1 or df.shape[1] == 1) and not self.from_loc_range:
                df = df[df.columns[0]]
        if self._preparefn:
            df = self._preparefn(df)
        return df

    def reset(self):
        # TODO if head(), tail(), query(), .loc/iloc should return a new MDataFrame instance to avoid having a reset need
        self.head_limit = None
        self.skip_topn = None
        self.filter_criteria = {}
        self.force_columns = []
        self.sort_order = None
        self.from_loc_indexer = False
        return self

    def _get_dataframe_from_cursor(self, cursor):
        """
        from the given cursor return a DataFrame
        """
        df = cursor_to_dataframe(cursor, parser=self._parser)
        df = self._restore_dataframe_proper(df)
        return df

    @property
    def _index_meta(self):
        return self.metadata.get('idx_meta') or dict()

    def _restore_dataframe_proper(self, df):
        df = restore_index(df, self._index_meta)
        if '_id' in df.columns and not self._raw:
            df.drop('_id', axis=1, inplace=True)
        if self.force_columns:
            missing = set(self.force_columns) - set(self.columns)
            for col in missing:
                df[col] = np.NaN
        return df

    def _get_cursor(self):
        projection = make_tuple(self.columns)
        projection += make_tuple(self._get_frame_index())
        if not self.sort_order:
            # implicit sort
            projection += make_tuple(self._get_frame_om_fields())
        cursor = self.collection.find(projection=projection)
        if self.sort_order:
            cursor.sort(qops.make_sortkey(make_tuple(self.sort_order)))
        if self.head_limit:
            cursor.limit(self.head_limit)
        if self.skip_topn:
            cursor.skip(self.skip_topn)
        return cursor
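
    # Lazy evaluation sketch: operations only set up query state; .value
    # resolves the query and returns a pandas object (hypothetical data):
    #
    #   mdf = MDataFrame(coll)
    #   df = mdf.value                            # whole collection as a DataFrame
    #   df5 = mdf.sort('-amount').head(5).value   # top 5 rows by amount
    #   # note sort() modifies mdf in place, while head() returns a clone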

    def sort(self, columns):
        """
        sort by specified columns

        :param columns: str of a single column or a list of columns. Sort order
           is specified as the + (ascending) or - (descending)
           prefix to the column name. Default sort order is
           ascending.
        :return: the MDataFrame
        """
        self._evaluated = None
        self.sort_order = make_tuple(columns)
        return self

    def head(self, limit=10):
        """
        return up to limit number of rows

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        return self._clone(limit=limit)

    def tail(self, limit=10):
        """
        return up to limit number of rows from the last inserted values

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        return self.skip(max(len(self) - limit, 0))

    def skip(self, topn):
        """
        skip the topn number of rows

        :param topn: the number of rows to skip.
        :return: the MDataFrame
        """
        return self._clone(skip=topn)
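
    # Pagination sketch: head(), skip() and tail() each return a new
    # MDataFrame, so they chain without modifying the original
    # (hypothetical data):
    #
    #   mdf.head(10).value           # first 10 rows
    #   mdf.skip(10).head(10).value  # rows 11-20
    #   mdf.tail(10).value           # last 10 rows by insertion order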

    def merge(self, right, on=None, left_on=None, right_on=None,
              how='inner', target=None, suffixes=('_x', '_y'),
              sort=False, inspect=False, filter=None):
        """
        merge this dataframe with another dataframe. left, inner and right
        joins are currently supported. the output is saved as a new
        collection, target name (defaults to a generated name if not
        specified).

        :param right: the other MDataFrame
        :param on: the list of key columns to merge by
        :param left_on: the list of the key columns to merge on this dataframe
        :param right_on: the list of the key columns to merge on the other
           dataframe
        :param how: the method to merge. supported are left, inner, right.
           Defaults to inner
        :param target: the name of the collection to store the merge results
           in. If not provided a temporary name will be created.
        :param suffixes: the suffixes to apply to identical left and right
           columns
        :param sort: if True the merge results will be sorted. If False the
           MongoDB natural order is implied.
        :returns: the MDataFrame to the target MDataFrame
        """
        # validate input
        supported_how = ["left", 'inner', 'right']
        assert how in supported_how, "only %s merges are currently supported" % supported_how
        for key in [on, left_on, right_on]:
            if key:
                assert isinstance(
                    key, str), "only single column merge keys are supported (%s)" % key
        if isinstance(right, (Collection, PickableCollection, FilteredCollection)):
            right = MDataFrame(right)
        assert isinstance(
            right, MDataFrame), "both must be MDataFrames, got right=%s" % type(right)
        if how == 'right':
            # A right B == B left A
            return right.merge(self, on=on, left_on=right_on, right_on=left_on,
                               how='left', target=target, suffixes=suffixes)
        # generate lookup parameters
        on = on or '_id'
        right_name = self._get_collection_name_of(right, right)
        target_name = self._get_collection_name_of(
            target, '_temp.merge.%s' % uuid4().hex)
        target_field = (
            "%s_%s" % (right_name.replace('.', '_'), right_on or on))
        """
        TODO enable filter criteria on right dataframe. requires changing
        LOOKUP syntax from equality to arbitrary match

        if right.filter_criteria:
            right_filter = [qops.MATCH(self._get_filter_criteria(**right.filter_criteria))]
        else:
            right_filter = None
        """
        right_filter = None
        lookup = qops.LOOKUP(right_name,
                             key=on,
                             left_key=left_on,
                             right_key=right_on,
                             target=target_field)
        # unwind merged documents from arrays to top-level document fields
        unwind = qops.UNWIND(target_field, preserve=how != 'inner')
        # get all fields from left, right
        project = {}
        for left_col in self.columns:
            source_left_col = left_col
            if left_col == '_id':
                project[left_col] = 1
                continue
            if left_col.startswith('_idx'):
                continue
            if left_col.startswith('_om#'):
                continue
            if left_col != (on or left_on) and left_col in right.columns:
                left_col = '%s%s' % (left_col, suffixes[0])
            project[left_col] = "$%s" % source_left_col
        for right_col in right.columns:
            if right_col == '_id':
                continue
            if right_col.startswith('_idx'):
                continue
            if right_col.startswith('_om#'):
                continue
            if right_col == (on or right_on) and right_col == (on or left_on):
                # if the merge field is the same in both frames, we already
                # have it from left
                continue
            if right_col in self.columns:
                left_col = '%s%s' % (right_col, suffixes[1])
            else:
                left_col = '%s' % right_col
            project[left_col] = '$%s.%s' % (target_field, right_col)
        expected_columns = list(project.keys())
        if '_id' not in project:
            project['_id'] = 0  # never copy objectids to avoid duplicate keys, unless requested
        project = {"$project": project}
        # store merged documents and return an MDataFrame to it
        out = qops.OUT(target_name)
        pipeline = [lookup, unwind, project]
        if filter:
            query = qops.MATCH(self._get_filter_criteria(**filter))
            pipeline.append(query)
        if sort:
            sort_cols = make_list(on or [left_on, right_on])
            sort_key = qops.make_sortkey(sort_cols)
            sort = qops.SORT(**dict(sort_key))
            pipeline.append(sort)
        pipeline.append(out)
        if inspect:
            result = pipeline
        else:
            result = self.collection.aggregate(pipeline, allowDiskUse=True)
            result = MDataFrame(self.collection.database[target_name],
                                force_columns=expected_columns)
        return result
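
    # Merge sketch (hypothetical collections `orders` and `customers`, both
    # keyed by 'customer_id'); the result is stored in a new target
    # collection and returned as an MDataFrame on it:
    #
    #   left = MDataFrame(orders)
    #   right = MDataFrame(customers)
    #   merged = left.merge(right, on='customer_id', how='left',
    #                       target='orders_customers')
    #   merged.value  # resolve to a pandas DataFrame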

    def append(self, other):
        if isinstance(other, Collection):
            other = MDataFrame(other)
        assert isinstance(
            other, MDataFrame), "both must be MDataFrames, got other={}".format(type(other))
        outname = self.collection.name
        mrout = {
            'merge': outname,
            'nonAtomic': True,
        }
        mapfn = Code("""
            function() {
                this._id = ObjectId();
                if(this['_om#rowid']) {
                    this['_om#rowid'] += %s;
                }
                emit(this._id, this);
            }
        """ % len(self))
        reducefn = Code("""
            function(key, value) {
                return value;
            }
        """)
        finfn = Code("""
            function(key, value) {
                return value;
            }
        """)
        other.collection.map_reduce(mapfn, reducefn, mrout, finalize=finfn, jsMode=True)
        unwind = {
            "$replaceRoot": {
                "newRoot": {
                    "$ifNull": ["$value", "$$CURRENT"],
                }
            }
        }
        output = qops.OUT(outname)
        pipeline = [unwind, output]
        self.collection.aggregate(pipeline, allowDiskUse=True)
        return self

    def _get_collection_name_of(self, some, default=None):
        """
        determine the collection name of the given parameter

        returns the collection name if some is a MDataFrame, a Collection
        or a string. Otherwise returns default
        """
        if isinstance(some, MDataFrame):
            name = some.collection.name
        elif isinstance(some, Collection):
            name = some.name
        else:
            name = default
        return name

    def _get_filter_criteria(self, *args, **kwargs):
        """
        return mongo query from filter specs

        this uses a Filter to produce the query from the kwargs.

        :param args: a Q object or logical combination of Q objects
           (optional)
        :param kwargs: all AND filter criteria
        """
        if len(args) > 0:
            q = args[0]
            if isinstance(q, MongoQ):
                filter_criteria = Filter(self.collection, q).query
            elif isinstance(q, Filter):
                filter_criteria = Filter(self.collection, q.q).query
        else:
            filter_criteria = Filter(self.collection, **kwargs).query
        return filter_criteria

    def query_inplace(self, *args, **kwargs):
        """
        filters this MDataFrame and returns it.

        Any subsequent operation on the dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        :param args: a Q object or logical combination of Q objects
           (optional)
        :param kwargs: all AND filter criteria
        :return: self
        """
        self._evaluated = None
        self.filter_criteria = self._get_filter_criteria(*args, **kwargs)
        self.collection = FilteredCollection(
            self.collection, query=self.filter_criteria)
        return self

    def query(self, *args, **kwargs):
        """
        return a new MDataFrame with a filter criteria

        Any subsequent operation on the new dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        Note: Unlike pandas DataFrames, a filtered MDataFrame operates
        on the same collection as the original DataFrame

        :param args: a Q object or logical combination of Q objects
           (optional)
        :param kwargs: all AND filter criteria
        :return: a new MDataFrame with the filter applied
        """
        effective_filter = dict(self.filter_criteria)
        filter_criteria = self._get_filter_criteria(*args, **kwargs)
        if '$and' in effective_filter:
            effective_filter['$and'].extend(filter_criteria.get('$and'))
        else:
            effective_filter.update(filter_criteria)
        coll = FilteredCollection(self.collection, query=effective_filter)
        return self._clone(collection=coll, query=effective_filter)
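
    # Filtering sketch: query() accepts keyword criteria or MongoQ objects
    # and returns a new, filtered MDataFrame (hypothetical columns):
    #
    #   mdf.query(region='EU').value
    #   mdf.query(amount__gte=100).value
    #   mdf.query(MongoQ(region='EU') | MongoQ(region='US')).value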

    def create_index(self, keys, **kwargs):
        """
        create an index the easy way
        """
        keys, kwargs = MongoQueryOps().make_index(keys)
        result = ensure_index(self.collection, keys, **kwargs)
        return result
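
    # Index sketch, assuming make_index() follows the same +/- prefix
    # convention as sort() (hypothetical columns):
    #
    #   mdf.create_index(['region', '-amount'])
    #   # ascending on region, descending on amount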

    def list_indexes(self):
        """
        list all indices in database
        """
        return cursor_to_dataframe(self.collection.list_indexes())

    def iterchunks(self, chunksize=100):
        """
        return an iterator of dataframe chunks

        Args:
            chunksize (int): number of rows in each chunk

        Returns:
            an iterator yielding dataframes of max. length chunksize
        """
        chunksize = int(chunksize)
        i = 0
        while True:
            chunkdf = self.skip(i).head(chunksize).value
            if len(chunkdf) == 0:
                break
            i += chunksize
            yield chunkdf

    def itertuples(self, chunksize=1000):
        __doc__ = pd.DataFrame.itertuples.__doc__
        chunksize = int(chunksize)

        for chunkdf in self.iterchunks(chunksize=chunksize):
            for row in chunkdf.itertuples():
                yield row

    def iterrows(self, chunksize=1000):
        __doc__ = pd.DataFrame.iterrows.__doc__
        chunksize = int(chunksize)

        for chunkdf in self.iterchunks(chunksize=chunksize):
            if isinstance(chunkdf, pd.DataFrame):
                for row in chunkdf.iterrows():
                    yield row
            else:
                # Series does not have iterrows
                for i in range(0, len(chunkdf), chunksize):
                    yield chunkdf.iloc[i:i + chunksize]

    def iteritems(self):
        if not hasattr(pd.DataFrame, 'iteritems'):
            raise NotImplementedError('MDataFrame.iteritems has been removed since Pandas 2.0. Use .items instead.')
        __doc__ = pd.DataFrame.iteritems.__doc__
        return self.items()

    def items(self):
        __doc__ = pd.DataFrame.items.__doc__

        for col in self.columns:
            yield col, self[col].value

    @property
    def loc(self):
        """
        Access by index

        Use as mdf.loc[index_value]

        :return: MLocIndexer
        """
        self._evaluated = None
        indexer = MLocIndexer(self)
        return indexer

    @property
    def iloc(self):
        self._evaluated = None
        indexer = MPosIndexer(self)
        return indexer

    def rows(self, start=None, end=None, chunksize=1000):
        # equivalent to .iloc[start:end].iterchunks(chunksize)
        start = int(start) if start is not None else None
        end = int(end) if end is not None else None
        return self.iloc[slice(start, end)].iterchunks(int(chunksize))

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MDataFrame(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                           kwargs=kwargs)

class MSeries(MDataFrame):
    """
    Series implementation for MDataFrames

    behaves like a DataFrame but limited to one column.
    """

    def __init__(self, *args, **kwargs):
        super(MSeries, self).__init__(*args, **kwargs)
        # true if only unique values apply
        self.is_unique = False
        # apply mixins
        self._applyto = str(self.__class__)
        self._apply_mixins(*args, **kwargs)

    def __getitem__(self, cols_or_slice):
        if isinstance(cols_or_slice, Filter):
            return MSeries(self.collection, columns=self.columns,
                           query=cols_or_slice.query)
        return super(MSeries, self).__getitem__(cols_or_slice)

    @property
    def name(self):
        return self.columns[0]

    def unique(self):
        """
        return the unique set of values for the series

        :return: MSeries
        """
        self.is_unique = True
        return self
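
    # Usage sketch (hypothetical column 'region'):
    #
    #   mdf.region.unique().value
    #   # => list of distinct values, analogous to pandas Series.unique()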

    def _get_cursor(self):
        if self.is_unique:
            # this way indexes get applied
            cursor = self.collection.distinct(make_tuple(self.columns)[0])
        else:
            cursor = super(MSeries, self)._get_cursor()
        return cursor

    @property
    def value(self):
        """
        return the value of the series

        this is a Series unless unique() was called. If unique() was called,
        only distinct values are returned as an array, matching the behavior
        of a Series

        :return: pandas.Series
        """
        cursor = self._get_cursor()
        column = make_tuple(self.columns)[0]
        if self.is_unique:
            # the .distinct() cursor returns a list of values
            # this is to make sure we return the same thing as pandas
            val = [v for v in cursor]
        else:
            val = self._get_dataframe_from_cursor(cursor)
            val = val[column]
            val.name = self.name
            if len(val) == 1 and self.from_loc_indexer:
                val = val.iloc[0]
        if self.auto_inspect:
            self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
        if self._preparefn:
            val = self._preparefn(val)
        return val

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MSeries(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                        kwargs=kwargs)

    @property
    def shape(self):
        return len(self),


def _mdf_remake(collection):
    # recreate a pickled MDF
    mdf = MDataFrame(collection)
    return mdf
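
# Pickling sketch: MDataFrame supports pickle via __reduce__ and
# _mdf_remake, with PickableCollection doing the heavy lifting
# (hypothetical use):
#
#   import pickle
#   mdf2 = pickle.loads(pickle.dumps(MDataFrame(coll)))
#   mdf2.value  # same collection, state restored via __setstate__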