from __future__ import absolute_import

from uuid import uuid4

import numpy as np
import pandas as pd
from bson import Code
from numpy import isscalar
from pymongo.collection import Collection

from omegaml.store import qops
from omegaml.store.filtered import FilteredCollection
from omegaml.store.query import Filter, MongoQ
from omegaml.store.queryops import MongoQueryOps
from omegaml.util import make_tuple, make_list, restore_index, \
    cursor_to_dataframe, restore_index_columns_order, PickableCollection, extend_instance, json_normalize, ensure_index

INSPECT_CACHE = []


class MGrouper(object):
    """
    a Grouper for MDataFrames
    """
    STATS_MAP = {
        'std': 'stdDevSamp',
        'mean': 'avg',
    }

    def __init__(self, mdataframe, collection, columns, sort=True):
        self.mdataframe = mdataframe
        self.collection = collection
        self.columns = make_tuple(columns)
        self.should_sort = sort

    def __getattr__(self, attr):
        if attr in self.columns:
            return MSeriesGroupby(self, self.collection, attr)

        def statfunc():
            columns = self.columns or self._non_group_columns()
            return self.agg({col: attr for col in columns})

        return statfunc

    def __getitem__(self, item):
        return self.__getattr__(item)

    def agg(self, specs):
        """
        shortcut for .aggregate
        """
        return self.aggregate(specs)

    def aggregate(self, specs, **kwargs):
        """
        aggregate by given specs

        See the following link for a list of supported operations.
        https://docs.mongodb.com/manual/reference/operator/aggregation/group/

        :param specs: a dictionary of { column : function | list[functions] }
            pairs.
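
        Usage (illustrative sketch; assumes documents with ``region`` and
        ``amount`` fields)::

            mdf.groupby('region').aggregate({'amount': ['sum', 'mean']})
            # => DataFrame indexed by region, with amount_sum, amount_mean columns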
        """

        def add_stats(specs, column, stat):
            specs['%s_%s' % (column, stat)] = {
                '$%s' % MGrouper.STATS_MAP.get(stat, stat): '$%s' % column}

        # generate $group command
        _specs = {}
        for column, stats in specs.items():
            stats = make_tuple(stats)
            for stat in stats:
                add_stats(_specs, column, stat)
        groupby = qops.GROUP(columns=self.columns,
                             **_specs)
        # execute and return a dataframe
        pipeline = self._amend_pipeline([groupby])
        data = self.collection.aggregate(pipeline, allowDiskUse=True)

        def get_data():
            # generator for DataFrame.from_records; reading lazily avoids
            # exhausting the cursor before it is processed. flatten the
            # group key (_id) into top-level fields as we go
            for group in data:
                _id = group.pop('_id')
                if isinstance(_id, dict):
                    group.update(_id)
                yield group

        df = pd.DataFrame.from_records(get_data())
        columns = make_list(self.columns)
        if columns:
            df = df.set_index(columns, drop=True)
        return df

    def _amend_pipeline(self, pipeline):
        """ amend pipeline with default ops on coll.aggregate() calls """
        if self.should_sort:
            sort = qops.SORT(**dict(qops.make_sortkey('_id')))
            pipeline.append(sort)
        return pipeline

    def _non_group_columns(self):
        """ get all columns in the mdataframe that are not group columns """
        return [col for col in self.mdataframe.columns
                if col not in self.columns and col != '_id'
                and not col.startswith('_idx')
                and not col.startswith('_om#')]

    def _count(self):
        count_columns = self._non_group_columns()
        if len(count_columns) == 0:
            count_columns.append('_'.join(self.columns) + '_count')
        groupby = {
            "$group": {
                "_id": {k: "$%s" % k for k in self.columns},
            }
        }
        for k in count_columns:
            groupby['$group'][k] = {"$sum": 1}
        # _amend_pipeline adds the sort stage if should_sort is set
        pipeline = self._amend_pipeline([groupby])
        return list(self.collection.aggregate(pipeline, allowDiskUse=True))

    def count(self):
        """ return counts by group columns """
        counts = self._count()
        # remove mongo object _id
        for group in counts:
            group.update(group.pop('_id'))
        # transform results to dataframe, then return as pandas would
        resultdf = pd.DataFrame(counts).set_index(make_list(self.columns),
                                                  drop=True)
        return resultdf

    def __iter__(self):
        """ for each group yield the group key and the matching MDataFrame """
        # count on the first group column only to keep the query small
        groups = getattr(self, self.columns[0])._count()
        for group in groups:
            keys = group.get('_id')
            data = self.mdataframe._clone(query=keys)
            yield keys, data


class MLocIndexer(object):
    """
    implements the LocIndexer for MDataFrames
    """

    def __init__(self, mdataframe, positional=False):
        self.mdataframe = mdataframe
        # if positional, any loc[spec] will be applied on the rowid only
        self.positional = positional
        # set to True if the loc specs denote a range (a list, tuple,
        # np.ndarray or slice of values rather than a single value)
        self._from_range = False

    def __getitem__(self, specs):
        """
        access by index

        use as mdf.loc[specs] where specs is any of

        * a list or tuple of scalar index values, e.g. .loc[(1,2,3)]
        * a slice of values, e.g. .loc[1:5]
        * a list of slices, e.g. .loc[1:5, 2:3]

        :return: the sliced part of the MDataFrame
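
        Usage (illustrative)::

            mdf.loc[1]          # row(s) with index value 1
            mdf.loc[1:5]        # index values 1 through 5 (inclusive)
            mdf.loc[[1, 3, 5]]  # rows with the given index values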
        """
        filterq, projection = self._get_filter(specs)
        df = self.mdataframe
        if filterq:
            df = self.mdataframe.query(filterq)
            df.from_loc_indexer = True
            df.from_loc_range = self._from_range
        if projection:
            df = df[projection]
        if isinstance(self.mdataframe, MSeries):
            df = df._as_mseries(df.columns[0])
        if getattr(df, 'immediate_loc', False):
            df = df.value
        return df

    def __setitem__(self, specs, value):
        raise NotImplementedError()

    def _get_filter(self, specs):
        filterq = []
        projection = None
        if self.positional:
            idx_cols = ['_om#rowid']
        else:
            idx_cols = self.mdataframe._get_frame_index()
        flt_kwargs = {}
        enumerable_types = (list, tuple, np.ndarray)
        if isinstance(specs, np.ndarray):
            specs = specs.tolist()
        if (isinstance(specs, enumerable_types)
                and isscalar(specs[0]) and len(idx_cols) == 1
                and not any(isinstance(s, slice) for s in specs)):
            # single column index with list of scalar values
            if (self.positional and isinstance(specs, tuple) and len(specs) == 2
                    and all(isscalar(v) for v in specs)):
                # iloc[int, int] is a cell access
                flt_kwargs[idx_cols[0]] = specs[0]
                projection = self._get_projection(specs[1])
            else:
                flt_kwargs['{}__in'.format(idx_cols[0])] = specs
                self._from_range = True
        elif isinstance(specs, (int, str)):
            flt_kwargs[idx_cols[0]] = specs
        else:
            specs = make_tuple(specs)
            # list/tuple of slices or scalar values, or MultiIndex
            for i, spec in enumerate(specs):
                if i < len(idx_cols):
                    col = idx_cols[i]
                    if isinstance(spec, slice):
                        self._from_range = True
                        start, stop = spec.start, spec.stop
                        if start is not None:
                            flt_kwargs['{}__gte'.format(col)] = start
                        if stop is not None:
                            if isinstance(stop, int):
                                stop -= int(self.positional)
                            flt_kwargs['{}__lte'.format(col)] = stop
                    elif isinstance(spec, enumerable_types) and isscalar(spec[0]):
                        self._from_range = True
                        # single column index with list of scalar values
                        # -- convert to list for PyMongo serialization
                        if isinstance(spec, np.ndarray):
                            spec = spec.tolist()
                        flt_kwargs['{}__in'.format(col)] = spec
                    elif isscalar(col):
                        flt_kwargs[col] = spec
                else:
                    # we're out of index columns, let's look at columns
                    cur_proj = self._get_projection(spec)
                    # if we get a single column, that's the projection
                    # if we had a single column, now need to add more, create a list
                    # if we have a list already, extend that
                    if projection is None:
                        projection = cur_proj
                    elif isinstance(projection, list):
                        projection.extend(cur_proj)
                    else:
                        projection = [projection, cur_proj]
        if flt_kwargs:
            filterq.append(MongoQ(**flt_kwargs))
        finalq = None
        for q in filterq:
            if finalq:
                finalq |= q
            else:
                finalq = q
        return finalq, projection

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return [spec]
        if isinstance(spec, (tuple, list)):
            assert all(col in columns for col in spec), "invalid columns %s" % spec
            return spec
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if all(isinstance(v, int) for v in (start, stop)):
                start, stop, step = spec.indices(len(columns))
            else:
                start = columns.index(start) if start is not None else 0
                stop = columns.index(stop) + 1 if stop is not None else len(columns)
            return columns[slice(start, stop)]
        raise IndexError


class MPosIndexer(MLocIndexer):
    """
    implements the position-based indexer for MDataFrames
    """

    def __init__(self, mdataframe):
        super(MPosIndexer, self).__init__(mdataframe, positional=True)

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return columns[spec]
        if isinstance(spec, (tuple, list)):
            # select the columns at the given positions
            return [col for i, col in enumerate(columns) if i in spec]
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if start and not isinstance(start, int):
                start = 0
            if stop and not isinstance(stop, int):
                # sliced ranges are inclusive
                stop = len(columns)
            return columns[slice(start, stop)]
        raise IndexError


class MSeriesGroupby(MGrouper):
    """
    like a MGrouper but limited to one column
    """

    def count(self):
        """
        return series count

        :return: counts by group
        """
        # MGrouper inserts a <column>_count column, see _count(). we remove
        # that column again and return a series named as the group column
        resultdf = super(MSeriesGroupby, self).count()
        count_column = [col for col in resultdf.columns
                        if col.endswith('_count')][0]
        new_column = count_column.replace('_count', '')
        resultdf = resultdf.rename(columns={count_column: new_column})
        return resultdf[new_column]


class MDataFrame(object):
    """
    A DataFrame for mongodb

    Performs out-of-core, lazy computation on a mongodb cluster.
    Behaves like a pandas DataFrame. Actual results are returned
    as pandas DataFrames.
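
    Usage (illustrative sketch; assumes ``db`` is a pymongo database with
    a ``sales`` collection)::

        mdf = MDataFrame(db.sales)
        df = mdf.query(amount__gte=100).value  # resolves to a pandas DataFrame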
    """

    STATFUNCS = ['mean', 'std', 'min', 'max', 'sum', 'var']

    def __init__(self, collection, columns=None, query=None,
                 limit=None, skip=None, sort_order=None,
                 force_columns=None, immediate_loc=False, auto_inspect=False,
                 normalize=False, raw=False,
                 parser=None,
                 preparefn=None, from_loc_range=False, metadata=None, **kwargs):
        self.collection = PickableCollection(collection)
        # columns in frame
        self.columns = make_tuple(columns) if columns else self._get_fields(raw=raw)
        self.columns = [str(col) for col in self.columns]
        # columns to sort by, defaults to not sorted
        self.sort_order = sort_order
        # top n documents to fetch
        self.head_limit = limit
        # top n documents to skip before returning
        self.skip_topn = skip
        # filter criteria
        self.filter_criteria = query or {}
        # force columns -- on output add columns not present
        self.force_columns = force_columns or []
        # was this created from the loc indexer?
        self.from_loc_indexer = kwargs.get('from_loc_indexer', False)
        # was the loc index used as a range? else a single value
        self.from_loc_range = from_loc_range
        # setup query for filter criteria, if provided
        if self.filter_criteria:
            # make sure we have a filtered collection with the criteria given
            if isinstance(self.filter_criteria, dict):
                self.query_inplace(**self.filter_criteria)
            elif isinstance(self.filter_criteria, Filter):
                self.query_inplace(self.filter_criteria)
            else:
                raise ValueError('Invalid query specification of type {}'.format(type(self.filter_criteria)))
        # if immediate_loc is True, .loc and .iloc always evaluate
        self.immediate_loc = immediate_loc
        # __array__ returns this value if set; otherwise it is computed and cached
        self._evaluated = None
        # set True to automatically capture inspects on .value. retrieve using .inspect(cached=True)
        self.auto_inspect = auto_inspect
        self._inspect_cache = INSPECT_CACHE
        # apply mixins
        self._applyto = str(self.__class__)
        self._apply_mixins()
        # parser to parse documents to dataframe
        self._parser = json_normalize if normalize else parser
        # prepare function to be applied just before returning from .value
        self._preparefn = preparefn
        # keep technical fields like _id, _idx etc
        self._raw = raw
        # metadata stored by omegaml (equiv. of metadata.kind_meta)
        self.metadata = metadata or dict()

    def _apply_mixins(self, *args, **kwargs):
        """
        apply mixins in defaults.OMEGA_MDF_MIXINS
        """
        from omegaml import settings
        defaults = settings()
        for mixin, applyto in defaults.OMEGA_MDF_MIXINS:
            if any(v in self._applyto for v in applyto.split(',')):
                extend_instance(self, mixin, *args, **kwargs)

    def __getstate__(self):
        # pickle support. note that the hard work is done in PickableCollection
        data = dict(self.__dict__)
        data.update(_evaluated=None)
        data.update(_inspect_cache=None)
        data.update(auto_inspect=self.auto_inspect)
        data.update(_preparefn=self._preparefn)
        data.update(_parser=self._parser)
        data.update(_raw=self._raw)
        data.update(collection=self.collection)
        return data

    def __reduce__(self):
        state = self.__getstate__()
        args = self.collection,
        return _mdf_remake, args, state

    def __setstate__(self, state):
        # pickle support. note that the hard work is done in PickableCollection
        for k, v in state.items():
            setattr(self, k, v)

    def _getcopy_kwargs(self, without=None):
        """ return all parameters required on a copy of this MDataFrame """
        kwargs = dict(columns=self.columns,
                      sort_order=self.sort_order,
                      limit=self.head_limit,
                      skip=self.skip_topn,
                      from_loc_indexer=self.from_loc_indexer,
                      from_loc_range=self.from_loc_range,
                      immediate_loc=self.immediate_loc,
                      metadata=self.metadata,
                      query=self.filter_criteria,
                      auto_inspect=self.auto_inspect,
                      parser=self._parser,
                      preparefn=self._preparefn)
        for k in make_tuple(without or []):
            kwargs.pop(k)
        return kwargs

    def __array__(self, dtype=None):
        # FIXME inefficient. make MDataFrame a drop-in replacement for any numpy ndarray
        # the result is cached in _evaluated until the frame is modified
        if self._evaluated is None:
            self._evaluated = array = self.value.values
        else:
            array = self._evaluated
        return array

    def __getattr__(self, attr):
        if attr in MDataFrame.STATFUNCS:
            return self.statfunc(attr)
        if attr in self.columns:
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=attr)
            return MSeries(self.collection, **kwargs)
        raise AttributeError(attr)

    def __getitem__(self, cols_or_slice):
        """
        select and project by column, columns, slice, masked-style filter

        Masked-style filters work similarly to pd.DataFrame/Series masks
        but do not actually return masks; instead they return an instance
        of Filter. A Filter is a delayed evaluation on the data frame.

            # select all rows where any column is == 5
            mdf = MDataFrame(coll)
            flt = mdf == 5
            mdf[flt]
            # => MDataFrame with only the matching rows

        :param cols_or_slice: single column (str), multi-columns (list),
            slice to select columns, or a masked-style filter (Filter)
        :return: filtered MDataFrame or MSeries
        """
        if isinstance(cols_or_slice, str):
            # column name => MSeries
            return self._as_mseries(cols_or_slice)
        elif isinstance(cols_or_slice, int):
            # column number => MSeries
            column = self.columns[cols_or_slice]
            return self._as_mseries(column)
        elif isinstance(cols_or_slice, (tuple, list)):
            # list of column names => MDataFrame subset on columns
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=cols_or_slice)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, Filter):
            kwargs = self._getcopy_kwargs()
            kwargs.update(query=cols_or_slice.query)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, np.ndarray):
            return self.iloc[cols_or_slice]
        raise ValueError('unknown accessor type %s' % type(cols_or_slice))

    def __setitem__(self, column, value):
        # only scalar values are supported: numeric, bool, string
        if np.isscalar(value):
            self.collection.update_many(filter=self.filter_criteria,
                                        update=qops.SET(column, value))
            self.columns.append(column)
        return self

    def _clone(self, collection=None, **kwargs):
        # convenience method to clone itself with updates
        collection = collection if collection is not None else self.collection
        return self.__class__(collection, **kwargs,
                              **self._getcopy_kwargs(without=list(kwargs.keys())))

    def statfunc(self, stat):
        aggr = MGrouper(self, self.collection, [], sort=False)
        return getattr(aggr, stat)

    def groupby(self, columns, sort=True):
        """
        Group by a given set of columns

        :param columns: the list of columns
        :param sort: if True sort by group key
        :return: MGrouper
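
        Usage (illustrative sketch; assumes documents with ``region`` and
        ``amount`` fields)::

            g = mdf.groupby('region')
            g.count()                 # row counts per region
            g.agg({'amount': 'sum'})  # sum of amount per region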
        """
        return MGrouper(self, self.collection, columns, sort=sort)

    def _get_fields(self, raw=False):
        result = []
        doc = self.collection.find_one()
        if doc is not None:
            if raw:
                result = list(doc.keys())
            else:
                result = [str(col) for col in doc.keys()
                          if col != '_id'
                          and not col.startswith('_idx')
                          and not col.startswith('_om#')]
        return result

    def _get_frame_index(self):
        """ return the dataframe's index columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = restore_index_columns_order(doc.keys())
        return result

    def _get_frame_om_fields(self):
        """ return the dataframe's omega special fields columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = [k for k in list(doc.keys()) if k.startswith('_om#')]
        return result

    def _as_mseries(self, column):
        kwargs = self._getcopy_kwargs()
        kwargs.update(columns=make_tuple(column))
        return MSeries(self.collection, **kwargs)

    def inspect(self, explain=False, cached=False, cursor=None, raw=False):
        """
        inspect this dataframe's actual mongodb query

        :param explain: if True explain the access path
        :param cached: if True return the inspects captured by auto_inspect
        :param cursor: the cursor to explain, defaults to this frame's cursor
        :param raw: if True return the raw inspect data instead of a DataFrame
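
        Usage (illustrative)::

            mdf.inspect()              # show query and projection
            mdf.inspect(explain=True)  # include the MongoDB access path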
        """
        if not cached:
            if isinstance(self.collection, FilteredCollection):
                query = self.collection.query
            else:
                query = '*'
            if explain:
                cursor = cursor or self._get_cursor()
                explain = cursor.explain()
            data = {
                'projection': self.columns,
                'query': query,
                'explain': explain or 'specify explain=True'
            }
        else:
            data = self._inspect_cache
        if not (raw or explain):
            data = pd.DataFrame(json_normalize(data))
        return data

    def count(self):
        """
        projected number of rows when resolving
        """
        nrows = len(self)
        counts = pd.Series({
            col: nrows
            for col in self.columns}, index=self.columns)
        return counts

    def __len__(self):
        """
        the projected number of rows when resolving
        """
        # reduce to just one column to limit the amount of data transferred
        short = self._clone()
        short = short[self.columns[0]] if self.columns else short
        return sum(1 for d in short._get_cursor())

    @property
    def shape(self):
        """
        return the shape of the dataframe
        """
        return len(self), len(self.columns)

    @property
    def ndim(self):
        return len(self.shape)

    @property
    def value(self):
        """
        resolve the query and return a Pandas DataFrame

        :return: the result of the query as a pandas DataFrame
        """
        cursor = self._get_cursor()
        df = self._get_dataframe_from_cursor(cursor)
        if self.auto_inspect:
            self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
        # this ensures the equiv. of pandas df.loc[n] is a Series
        if self.from_loc_indexer:
            if len(df) == 1 and not self.from_loc_range:
                idx = df.index
                df = df.T
                df = df[df.columns[0]]
                if df.ndim == 1 and len(df) == 1 and not isinstance(idx, pd.MultiIndex):
                    # single row single dimension, numeric index only
                    df = df.iloc[0]
            elif (df.ndim == 1 or df.shape[1] == 1) and not self.from_loc_range:
                df = df[df.columns[0]]
        if self._preparefn:
            df = self._preparefn(df)
        return df

    def reset(self):
        # TODO head(), tail(), query(), .loc/.iloc should return new MDataFrame
        #      instances so that an explicit reset is not needed
        self.head_limit = None
        self.skip_topn = None
        self.filter_criteria = {}
        self.force_columns = []
        self.sort_order = None
        self.from_loc_indexer = False
        return self

    def _get_dataframe_from_cursor(self, cursor):
        """
        from the given cursor return a DataFrame
        """
        df = cursor_to_dataframe(cursor, parser=self._parser)
        df = self._restore_dataframe_proper(df)
        return df

    @property
    def _index_meta(self):
        return self.metadata.get('idx_meta') or dict()

    def _restore_dataframe_proper(self, df):
        df = restore_index(df, self._index_meta)
        if '_id' in df.columns and not self._raw:
            df.drop('_id', axis=1, inplace=True)
        if self.force_columns:
            missing = set(self.force_columns) - set(self.columns)
            for col in missing:
                df[col] = np.nan
        return df

    def _get_cursor(self):
        projection = make_tuple(self.columns)
        projection += make_tuple(self._get_frame_index())
        if not self.sort_order:
            # implicit sort
            projection += make_tuple(self._get_frame_om_fields())
        cursor = self.collection.find(projection=projection)
        if self.sort_order:
            cursor.sort(qops.make_sortkey(make_tuple(self.sort_order)))
        if self.head_limit:
            cursor.limit(self.head_limit)
        if self.skip_topn:
            cursor.skip(self.skip_topn)
        return cursor

    def sort(self, columns):
        """
        sort by specified columns

        :param columns: str of a single column or a list of columns. Sort
            order is specified by a + (ascending) or - (descending) prefix
            to the column name. The default sort order is ascending.
        :return: the MDataFrame
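
        Usage (illustrative)::

            mdf.sort('amount')                # ascending by amount
            mdf.sort(['-amount', '+region'])  # descending amount, ascending region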
        """
        self._evaluated = None
        self.sort_order = make_tuple(columns)
        return self

    def head(self, limit=10):
        """
        return up to limit rows from the top

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        return self._clone(limit=limit)

    def tail(self, limit=10):
        """
        return up to limit rows from the last inserted values

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        # skip everything except the last limit rows
        tail_n = max(len(self) - limit, 0)
        return self._clone(skip=tail_n)

    def skip(self, topn):
        """
        skip the topn number of rows

        :param topn: the number of rows to skip.
        :return: the MDataFrame
        """
        return self._clone(skip=topn)

    def merge(self, right, on=None, left_on=None, right_on=None,
              how='inner', target=None, suffixes=('_x', '_y'),
              sort=False, inspect=False, filter=None):
        """
        merge this dataframe with another dataframe. left, inner and right
        joins are currently supported. the output is saved as a new
        collection, named target (defaults to a generated name if not
        specified).

        :param right: the other MDataFrame
        :param on: the list of key columns to merge by
        :param left_on: the list of the key columns to merge on this dataframe
        :param right_on: the list of the key columns to merge on the other
            dataframe
        :param how: the method to merge. supported are left, inner, right.
            Defaults to inner
        :param target: the name of the collection to store the merge results
            in. If not provided a temporary name will be created.
        :param suffixes: the suffixes to apply to identical left and right
            columns
        :param sort: if True the merge results will be sorted. If False the
            MongoDB natural order is implied.
        :returns: the MDataFrame on the target collection
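
        Usage (illustrative sketch; assumes ``db.orders`` and ``db.customers``
        collections sharing a ``customer_id`` field)::

            left = MDataFrame(db.orders)
            right = MDataFrame(db.customers)
            merged = left.merge(right, on='customer_id', how='left')
            df = merged.value  # resolve the merged collection to pandas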
        """
        # validate input
        supported_how = ['left', 'inner', 'right']
        assert how in supported_how, "only %s merges are currently supported" % supported_how
        for key in [on, left_on, right_on]:
            if key:
                assert isinstance(
                    key, str), "only single column merge keys are supported (%s)" % key
        if isinstance(right, (Collection, PickableCollection, FilteredCollection)):
            right = MDataFrame(right)
        assert isinstance(
            right, MDataFrame), "both must be MDataFrames, got right=%s" % type(right)
        if how == 'right':
            # A right B == B left A
            return right.merge(self, on=on, left_on=right_on, right_on=left_on,
                               how='left', target=target, suffixes=suffixes)
        # generate lookup parameters
        on = on or '_id'
        right_name = self._get_collection_name_of(right, right)
        target_name = self._get_collection_name_of(
            target, '_temp.merge.%s' % uuid4().hex)
        target_field = (
            "%s_%s" % (right_name.replace('.', '_'), right_on or on))
        """
        TODO enable filter criteria on the right dataframe. requires changing
        the LOOKUP syntax from an equality match to an arbitrary match

        if right.filter_criteria:
            right_filter = [qops.MATCH(self._get_filter_criteria(**right.filter_criteria))]
        else:
            right_filter = None
        """
        right_filter = None
        lookup = qops.LOOKUP(right_name,
                             key=on,
                             left_key=left_on,
                             right_key=right_on,
                             target=target_field)
        # unwind merged documents from arrays to top-level document fields
        unwind = qops.UNWIND(target_field, preserve=how != 'inner')
        # get all fields from left, right
        project = {}
        for left_col in self.columns:
            source_left_col = left_col
            if left_col == '_id':
                project[left_col] = 1
                continue
            if left_col.startswith('_idx'):
                continue
            if left_col.startswith('_om#'):
                continue
            if left_col != (on or left_on) and left_col in right.columns:
                left_col = '%s%s' % (left_col, suffixes[0])
            project[left_col] = "$%s" % source_left_col
        for right_col in right.columns:
            if right_col == '_id':
                continue
            if right_col.startswith('_idx'):
                continue
            if right_col.startswith('_om#'):
                continue
            if right_col == (on or right_on) and right_col == (on or left_on):
                # if the merge field is the same in both frames, we already
                # have it from left
                continue
            if right_col in self.columns:
                left_col = '%s%s' % (right_col, suffixes[1])
            else:
                left_col = '%s' % right_col
            project[left_col] = '$%s.%s' % (target_field, right_col)
        expected_columns = list(project.keys())
        if '_id' not in project:
            project['_id'] = 0  # never copy objectids to avoid duplicate keys, unless requested
        project = {"$project": project}
        # store merged documents and return an MDataFrame to it
        out = qops.OUT(target_name)
        pipeline = [lookup, unwind, project]
        if filter:
            query = qops.MATCH(self._get_filter_criteria(**filter))
            pipeline.append(query)
        if sort:
            sort_cols = make_list(on or [left_on, right_on])
            sort_key = qops.make_sortkey(sort_cols)
            sort = qops.SORT(**dict(sort_key))
            pipeline.append(sort)
        pipeline.append(out)
        if inspect:
            result = pipeline
        else:
            result = self.collection.aggregate(pipeline, allowDiskUse=True)
            result = MDataFrame(self.collection.database[target_name],
                                force_columns=expected_columns)
        return result

    def append(self, other):
        if isinstance(other, Collection):
            other = MDataFrame(other)
        assert isinstance(
            other, MDataFrame), "both must be MDataFrames, got other={}".format(type(other))
        outname = self.collection.name
        mrout = {
            'merge': outname,
            'nonAtomic': True,
        }
        mapfn = Code("""
        function() {
            this._id = ObjectId();
            if(this['_om#rowid']) {
                this['_om#rowid'] += %s;
            }
            emit(this._id, this);
        }
        """ % len(self))
        reducefn = Code("""
        function(key, value) {
            return value;
        }
        """)
        finfn = Code("""
        function(key, value) {
            return value;
        }
        """)
        other.collection.map_reduce(mapfn, reducefn, mrout, finalize=finfn, jsMode=True)
        unwind = {
            "$replaceRoot": {
                "newRoot": {
                    "$ifNull": ["$value", "$$CURRENT"],
                }
            }
        }
        output = qops.OUT(outname)
        pipeline = [unwind, output]
        self.collection.aggregate(pipeline, allowDiskUse=True)
        return self

    def _get_collection_name_of(self, some, default=None):
        """
        determine the collection name of the given parameter

        returns the collection name if some is an MDataFrame or a
        Collection. Otherwise returns default
        """
        if isinstance(some, MDataFrame):
            name = some.collection.name
        elif isinstance(some, Collection):
            name = some.name
        else:
            name = default
        return name

    def _get_filter_criteria(self, *args, **kwargs):
        """
        return a mongo query from filter specs

        this uses a Filter to produce the query from the kwargs.

        :param args: a Q object or logical combination of Q objects
            (optional)
        :param kwargs: all AND filter criteria
        """
        if len(args) > 0:
            q = args[0]
            if isinstance(q, MongoQ):
                filter_criteria = Filter(self.collection, q).query
            elif isinstance(q, Filter):
                filter_criteria = Filter(self.collection, q.q).query
        else:
            # kwargs only, all criteria are AND-ed
            filter_criteria = Filter(self.collection, **kwargs).query
        return filter_criteria

    def query_inplace(self, *args, **kwargs):
        """
        filters this MDataFrame and returns it.

        Any subsequent operation on the dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        :param args: a Q object or logical combination of Q objects
            (optional)
        :param kwargs: all AND filter criteria
        :return: self
        """
        self._evaluated = None
        self.filter_criteria = self._get_filter_criteria(*args, **kwargs)
        self.collection = FilteredCollection(
            self.collection, query=self.filter_criteria)
        return self

    def query(self, *args, **kwargs):
        """
        return a new MDataFrame with a filter criteria

        Any subsequent operation on the new dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        Note: Unlike pandas DataFrames, a filtered MDataFrame operates
        on the same collection as the original DataFrame

        :param args: a Q object or logical combination of Q objects
            (optional)
        :param kwargs: all AND filter criteria
        :return: a new MDataFrame with the filter applied
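
        Usage (illustrative; keyword criteria are AND-ed, MongoQ objects
        combine with ``|`` and ``&``)::

            mdf.query(amount__gte=100, region='EU')
            mdf.query(MongoQ(amount__gte=100) | MongoQ(region='EU'))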
        """
        effective_filter = dict(self.filter_criteria)
        filter_criteria = self._get_filter_criteria(*args, **kwargs)
        if '$and' in effective_filter:
            effective_filter['$and'].extend(filter_criteria.get('$and'))
        else:
            effective_filter.update(filter_criteria)
        coll = FilteredCollection(self.collection, query=effective_filter)
        return self._clone(collection=coll, query=effective_filter)

    def create_index(self, keys, **kwargs):
        """
        create an index the easy way
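
        Usage (illustrative; assumes the same +/- prefix convention as
        .sort())::

            mdf.create_index('x')           # ascending index on column x
            mdf.create_index(['+x', '-y'])  # compound index, y descending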
        """
        keys, kwargs = MongoQueryOps().make_index(keys)
        result = ensure_index(self.collection, keys, **kwargs)
        return result

    def list_indexes(self):
        """
        list all indexes on this collection
        """
        return cursor_to_dataframe(self.collection.list_indexes())

    def iterchunks(self, chunksize=100):
        """
        return an iterator of chunks

        Args:
            chunksize (int): the number of rows in each chunk

        Returns:
            an iterator yielding DataFrames of at most chunksize rows
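
        Usage (illustrative; ``process`` is a hypothetical callback)::

            for chunkdf in mdf.iterchunks(chunksize=500):
                process(chunkdf)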
        """
        chunksize = int(chunksize)
        i = 0
        while True:
            chunkdf = self.skip(i).head(chunksize).value
            if len(chunkdf) == 0:
                break
            i += chunksize
            yield chunkdf

    def itertuples(self, chunksize=1000):
        chunksize = int(chunksize)
        __doc__ = pd.DataFrame.itertuples.__doc__

        for chunkdf in self.iterchunks(chunksize=chunksize):
            # delegate to pandas within each chunk
            for row in chunkdf.itertuples():
                yield row

    def iterrows(self, chunksize=1000):
        chunksize = int(chunksize)
        __doc__ = pd.DataFrame.iterrows.__doc__

        for chunkdf in self.iterchunks(chunksize=chunksize):
            if isinstance(chunkdf, pd.DataFrame):
                for row in chunkdf.iterrows():
                    yield row
            else:
                # a Series has no iterrows; yield it in chunksize slices
                for i in range(0, len(chunkdf), chunksize):
                    yield chunkdf.iloc[i:i + chunksize]

    def iteritems(self):
        if not hasattr(pd.DataFrame, 'iteritems'):
            raise NotImplementedError('MDataFrame.iteritems has been removed since Pandas 2.0. Use .items instead.')
        __doc__ = pd.DataFrame.iteritems.__doc__
        return self.items()

    def items(self):
        __doc__ = pd.DataFrame.items.__doc__

        for col in self.columns:
            yield col, self[col].value

    @property
    def loc(self):
        """
        Access by index

        Use as mdf.loc[index_value]

        :return: MLocIndexer
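
        Usage (illustrative; assumes columns ``x`` and ``y`` exist)::

            mdf.loc[1:5, ['x', 'y']]  # index values 1..5, columns x and y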
        """
        self._evaluated = None
        indexer = MLocIndexer(self)
        return indexer

    @property
    def iloc(self):
        self._evaluated = None
        indexer = MPosIndexer(self)
        return indexer

    def rows(self, start=None, end=None, chunksize=1000):
        # equivalent to .iloc[start:end].iterchunks()
        start = int(start) if start is not None else None
        end = int(end) if end is not None else None
        return self.iloc[slice(start, end)].iterchunks(int(chunksize))

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MDataFrame(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                           kwargs=kwargs)


class MSeries(MDataFrame):
    """
    Series implementation for MDataFrames

    behaves like a DataFrame but limited to one column.
    """

    def __init__(self, *args, **kwargs):
        super(MSeries, self).__init__(*args, **kwargs)
        # true if only unique values apply
        self.is_unique = False
        # apply mixins
        self._applyto = str(self.__class__)
        self._apply_mixins(*args, **kwargs)

    def __getitem__(self, cols_or_slice):
        if isinstance(cols_or_slice, Filter):
            return MSeries(self.collection, columns=self.columns,
                           query=cols_or_slice.query)
        return super(MSeries, self).__getitem__(cols_or_slice)

    @property
    def name(self):
        return self.columns[0]

    def unique(self):
        """
        return the unique set of values for the series

        :return: MSeries
        """
        self.is_unique = True
        return self

    def _get_cursor(self):
        if self.is_unique:
            # using distinct() this way ensures indexes get applied
            cursor = self.collection.distinct(make_tuple(self.columns)[0])
        else:
            cursor = super(MSeries, self)._get_cursor()
        return cursor

    @property
    def value(self):
        """
        return the value of the series

        this is a pandas.Series unless unique() was called. If unique()
        was called only the distinct values are returned, as a list,
        matching the behavior of a Series

        :return: pandas.Series
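
        Usage (illustrative)::

            mdf['category'].value           # pandas Series
            mdf['category'].unique().value  # list of distinct values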
        """
        cursor = self._get_cursor()
        column = make_tuple(self.columns)[0]
        if self.is_unique:
            # the .distinct() cursor returns a list of values
            # this is to make sure we return the same thing as pandas
            val = [v for v in cursor]
        else:
            val = self._get_dataframe_from_cursor(cursor)
            val = val[column]
            val.name = self.name
            if len(val) == 1 and self.from_loc_indexer:
                val = val.iloc[0]
        if self.auto_inspect:
            self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
        if self._preparefn:
            val = self._preparefn(val)
        return val

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MSeries(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                        kwargs=kwargs)

    @property
    def shape(self):
        return len(self),


def _mdf_remake(collection):
    # recreate a pickled MDataFrame
    mdf = MDataFrame(collection)
    return mdf