from __future__ import absolute_import

import numpy as np
import pandas as pd
from bson import Code
from numpy import isscalar
from pymongo.collection import Collection
from uuid import uuid4

from omegaml.store import qops
from omegaml.store.filtered import FilteredCollection
from omegaml.store.query import Filter, MongoQ
from omegaml.store.queryops import MongoQueryOps
from omegaml.util import make_tuple, make_list, restore_index, \
    cursor_to_dataframe, restore_index_columns_order, PickableCollection, extend_instance, json_normalize, ensure_index

INSPECT_CACHE = []

19
class MGrouper(object):
    """
    a Grouper for MDataFrames
    """
    STATS_MAP = {
        'std': 'stdDevSamp',
        'mean': 'avg',
    }

    def __init__(self, mdataframe, collection, columns, sort=True):
        self.mdataframe = mdataframe
        self.collection = collection
        self.columns = make_tuple(columns)
        self.should_sort = sort

    def __getattr__(self, attr):
        if attr in self.columns:
            return MSeriesGroupby(self, self.collection, attr)

        def statfunc():
            columns = self.columns or self._non_group_columns()
            return self.agg({col: attr for col in columns})

        return statfunc

    def __getitem__(self, item):
        return self.__getattr__(item)

    def agg(self, specs):
        """
        shortcut for .aggregate
        """
        return self.aggregate(specs)

    def aggregate(self, specs, **kwargs):
        """
        aggregate by given specs

        See the following link for a list of supported operations.
        https://docs.mongodb.com/manual/reference/operator/aggregation/group/

        :param specs: a dictionary of { column : function | list[functions] }
            pairs.
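
        Example (a sketch; assumes numeric columns ``x`` and ``y``,
        column names are illustrative only)::

            mdf.groupby('x').agg({'y': ['mean', 'std']})
            # => DataFrame with columns y_mean, y_std, indexed by x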
63 """
64
65 def add_stats(specs, column, stat):
66 specs['%s_%s' % (column, stat)] = {
67 '$%s' % MGrouper.STATS_MAP.get(stat, stat): '$%s' % column}
68
69 # generate $group command
70 _specs = {}
71 for column, stats in specs.items():
72 stats = make_tuple(stats)
73 for stat in stats:
74 add_stats(_specs, column, stat)
75 groupby = qops.GROUP(columns=self.columns,
76 **_specs)
77 # execute and return a dataframe
78 pipeline = self._amend_pipeline([groupby])
79 data = self.collection.aggregate(pipeline, allowDiskUse=True)
80
81 def get_data():
82 # we need this to build a pipeline for from_records
83 # to process, otherwise the cursor will be exhausted already
84 for group in data:
85 _id = group.pop('_id')
86 if isinstance(_id, dict):
87 group.update(_id)
88 yield group
89
90 df = pd.DataFrame.from_records(get_data())
91 columns = make_list(self.columns)
92 if columns:
93 df = df.set_index(columns, drop=True)
94 return df
95
    def _amend_pipeline(self, pipeline):
        """ amend pipeline with default ops on coll.aggregate() calls """
        if self.should_sort:
            sort = qops.SORT(**dict(qops.make_sortkey('_id')))
            pipeline.append(sort)
        return pipeline

    def _non_group_columns(self):
        """ get all columns in mdataframe that are not group columns """
        return [col for col in self.mdataframe.columns
                if col not in self.columns and col != '_id'
                and not col.startswith('_idx')
                and not col.startswith('_om#')]

    def _count(self):
        count_columns = self._non_group_columns()
        if len(count_columns) == 0:
            count_columns.append('_'.join(self.columns) + '_count')
        groupby = {
            "$group": {
                "_id": {k: "$%s" % k for k in self.columns},
            }
        }
        for k in count_columns:
            groupby['$group']['%s' % k] = {"$sum": 1}
        # _amend_pipeline already appends the sort stage if required
        pipeline = self._amend_pipeline([groupby])
        return list(self.collection.aggregate(pipeline, allowDiskUse=True))

    def count(self):
        """ return counts by group columns """
        counts = self._count()
        # remove mongo object _id
        for group in counts:
            group.update(group.pop('_id'))
        # transform results to dataframe, then return as pandas would
        resultdf = pd.DataFrame(counts).set_index(make_list(self.columns),
                                                  drop=True)
        return resultdf

    def __iter__(self):
        """ for each group yield the group key and the filtered MDataFrame """
        # reduce count to only one column
        groups = getattr(self, self.columns[0])._count()
        for group in groups:
            keys = group.get('_id')
            data = self.mdataframe._clone(query=keys)
            yield keys, data


class MLocIndexer(object):
    """
    implements the LocIndexer for MDataFrames
    """

    def __init__(self, mdataframe, positional=False):
        self.mdataframe = mdataframe
        # if positional, any loc[spec] will be applied on the rowid only
        self.positional = positional
        # indicator will be set True if loc specs are from a range type (list, tuple, np.ndarray)
        self._from_range = False

    def __getitem__(self, specs):
        """
        access by index

        use as mdf.loc[specs] where specs is any of

        * a list or tuple of scalar index values, e.g. .loc[(1,2,3)]
        * a slice of values, e.g. .loc[1:5]
        * a list of slices, e.g. .loc[1:5, 2:3]

        :return: the sliced part of the MDataFrame
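
        Example (a sketch; assumes an integer index, values are
        illustrative only)::

            mdf.loc[1:5]        # rows with index values 1 through 5
            mdf.loc[[1, 2, 3]]  # rows with the given index values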
171 """
172 filterq, projection = self._get_filter(specs)
173 df = self.mdataframe
174 if filterq:
175 df = self.mdataframe.query(filterq)
176 df.from_loc_indexer = True
177 df.from_loc_range = self._from_range
178 if projection:
179 df = df[projection]
180 if isinstance(self.mdataframe, MSeries):
181 df = df._as_mseries(df.columns[0])
182 if getattr(df, 'immediate_loc', False):
183 df = df.value
184 return df
185
186 def __setitem__(self, specs, value):
187 raise NotImplementedError()
188
    def _get_filter(self, specs):
        filterq = []
        projection = None
        if self.positional:
            idx_cols = ['_om#rowid']
        else:
            idx_cols = self.mdataframe._get_frame_index()
        flt_kwargs = {}
        enumerable_types = (list, tuple, np.ndarray)
        if isinstance(specs, np.ndarray):
            specs = specs.tolist()
        if (isinstance(specs, enumerable_types)
                and isscalar(specs[0]) and len(idx_cols) == 1
                and not any(isinstance(s, slice) for s in specs)):
            # single column index with list of scalar values
            if (self.positional and isinstance(specs, tuple) and len(specs) == 2
                    and all(isscalar(v) for v in specs)):
                # iloc[int, int] is a cell access
                flt_kwargs[idx_cols[0]] = specs[0]
                projection = self._get_projection(specs[1])
            else:
                flt_kwargs['{}__in'.format(idx_cols[0])] = specs
                self._from_range = True
        elif isinstance(specs, (int, str)):
            flt_kwargs[idx_cols[0]] = specs
        else:
            specs = make_tuple(specs)
            # list/tuple of slices or scalar values, or MultiIndex
            for i, spec in enumerate(specs):
                if i < len(idx_cols):
                    col = idx_cols[i]
                    if isinstance(spec, slice):
                        self._from_range = True
                        start, stop = spec.start, spec.stop
                        if start is not None:
                            flt_kwargs['{}__gte'.format(col)] = start
                        if stop is not None:
                            if isinstance(stop, int):
                                stop -= int(self.positional)
                            flt_kwargs['{}__lte'.format(col)] = stop
                    elif isinstance(spec, enumerable_types) and isscalar(spec[0]):
                        self._from_range = True
                        # single column index with list of scalar values
                        # -- convert to list for PyMongo serialization
                        if isinstance(spec, np.ndarray):
                            spec = spec.tolist()
                        flt_kwargs['{}__in'.format(col)] = spec
                    elif isscalar(col):
                        flt_kwargs[col] = spec
                else:
                    # we're out of index columns, let's look at columns
                    cur_proj = self._get_projection(spec)
                    # if we get a single column, that's the projection
                    # if we had a single column and need to add more, create a list
                    # if we have a list already, extend it
                    if projection is None:
                        projection = cur_proj
                    elif isinstance(projection, list):
                        projection.extend(cur_proj)
                    else:
                        projection = [projection, cur_proj]
        if flt_kwargs:
            filterq.append(MongoQ(**flt_kwargs))
        finalq = None
        for q in filterq:
            if finalq:
                finalq |= q
            else:
                finalq = q
        return finalq, projection

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return [spec]
        if isinstance(spec, (tuple, list)):
            assert all(col in columns for col in spec), "all columns must exist in the frame"
            return spec
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if all(isinstance(v, int) for v in (start, stop)):
                start, stop, step = spec.indices(len(columns))
            else:
                start = columns.index(start) if start is not None else 0
                stop = columns.index(stop) + 1 if stop is not None else len(columns)
            return columns[slice(start, stop)]
        raise IndexError


class MPosIndexer(MLocIndexer):
    """
    implements the position-based indexer for MDataFrames
    """

    def __init__(self, mdataframe):
        super(MPosIndexer, self).__init__(mdataframe, positional=True)

    def _get_projection(self, spec):
        columns = self.mdataframe.columns
        if np.isscalar(spec):
            return columns[spec]
        if isinstance(spec, (tuple, list)):
            return [col for i, col in enumerate(columns) if i in spec]
        if isinstance(spec, slice):
            start, stop = spec.start, spec.stop
            if start and not isinstance(start, int):
                start = 0
            if stop and not isinstance(stop, int):
                # sliced ranges are inclusive
                stop = len(columns)
            return columns[slice(start, stop)]
        raise IndexError


class MSeriesGroupby(MGrouper):
    """
    like a MGrouper but limited to one column
    """

    def count(self):
        """
        return series count

        :return: counts by group
        """
        # MGrouper inserts a _count column, see _count(). we remove
        # that column again and return a series named as the group column
        resultdf = super(MSeriesGroupby, self).count()
        count_column = [col for col in resultdf.columns
                        if col.endswith('_count')][0]
        new_column = count_column.replace('_count', '')
        resultdf = resultdf.rename(columns={count_column: new_column})
        return resultdf[new_column]


class MDataFrame(object):
    """
    A DataFrame for mongodb

    Performs out-of-core, lazy computation on a mongodb cluster.
    Behaves like a pandas DataFrame. Actual results are returned
    as pandas DataFrames.
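
    Example (a sketch; assumes a pymongo collection ``coll`` storing
    one document per row, column names are illustrative only)::

        mdf = MDataFrame(coll)
        df = mdf.query(x__gte=5).value   # resolves to a pandas DataFrame
        mdf.groupby('x').count()         # aggregation runs in MongoDB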
331 """
332
333 STATFUNCS = ['mean', 'std', 'min', 'max', 'sum', 'var']
334
335 def __init__(self, collection, columns=None, query=None,
336 limit=None, skip=None, sort_order=None,
337 force_columns=None, immediate_loc=False, auto_inspect=False,
338 normalize=False, raw=False,
339 parser=None,
340 preparefn=None, from_loc_range=False, metadata=None, **kwargs):
341 self.collection = PickableCollection(collection)
342 # columns in frame
343 self.columns = make_tuple(columns) if columns else self._get_fields(raw=raw)
344 self.columns = [str(col) for col in self.columns]
345 # columns to sort by, defaults to not sorted
346 self.sort_order = sort_order
347 # top n documents to fetch
348 self.head_limit = limit
349 # top n documents to skip before returning
350 self.skip_topn = skip
351 # filter criteria
352 self.filter_criteria = query or {}
353 # force columns -- on output add columns not present
354 self.force_columns = force_columns or []
355 # was this created from the loc indexer?
356 self.from_loc_indexer = kwargs.get('from_loc_indexer', False)
357 # was the loc index used a range? Else a single value
358 self.from_loc_range = from_loc_range
359 # setup query for filter criteries, if provided
360 if self.filter_criteria:
361 # make sure we have a filtered collection with the criteria given
362 if isinstance(self.filter_criteria, dict):
363 self.query_inplace(**self.filter_criteria)
364 elif isinstance(self.filter_criteria, Filter):
365 self.query_inplace(self.filter_criteria)
366 else:
367 raise ValueError('Invalid query specification of type {}'.format(type(self.filter_criteria)))
368 # if immediate_loc is True, .loc and .iloc always evaluate
369 self.immediate_loc = immediate_loc
370 # __array__ will return this value if it is set, set it otherwise
371 self._evaluated = None
372 # set true to automatically capture inspects on .value. retrieve using .inspect(cached=True)
373 self.auto_inspect = auto_inspect
374 self._inspect_cache = INSPECT_CACHE
375 # apply mixins
376 self._applyto = str(self.__class__)
377 self._apply_mixins()
378 # parser to parse documents to dataframe
379 self._parser = json_normalize if normalize else parser
380 # prepare function to be applied just before returning from .value
381 self._preparefn = preparefn
382 # keep technical fields like _id, _idx etc
383 self._raw = raw
384 # metadata stored by omegaml (equiv. of metadata.kind_meta)
385 self.metadata = metadata or dict()
386
    def _apply_mixins(self, *args, **kwargs):
        """
        apply mixins in defaults.OMEGA_MDF_MIXINS
        """
        from omegaml import settings
        defaults = settings()
        for mixin, applyto in defaults.OMEGA_MDF_MIXINS:
            if any(v in self._applyto for v in applyto.split(',')):
                extend_instance(self, mixin, *args, **kwargs)

    def __getstate__(self):
        # pickle support. note that the hard work is done in PickableCollection
        data = dict(self.__dict__)
        data.update(_evaluated=None)
        data.update(_inspect_cache=None)
        data.update(auto_inspect=self.auto_inspect)
        data.update(_preparefn=self._preparefn)
        data.update(_parser=self._parser)
        data.update(_raw=self._raw)
        data.update(collection=self.collection)
        return data

    def __reduce__(self):
        state = self.__getstate__()
        args = self.collection,
        return _mdf_remake, args, state

    def __setstate__(self, state):
        # pickle support. note that the hard work is done in PickableCollection
        for k, v in state.items():
            setattr(self, k, v)

    def _getcopy_kwargs(self, without=None):
        """ return all parameters required on a copy of this MDataFrame """
        kwargs = dict(columns=self.columns,
                      sort_order=self.sort_order,
                      limit=self.head_limit,
                      skip=self.skip_topn,
                      from_loc_indexer=self.from_loc_indexer,
                      from_loc_range=self.from_loc_range,
                      immediate_loc=self.immediate_loc,
                      metadata=self.metadata,
                      query=self.filter_criteria,
                      auto_inspect=self.auto_inspect,
                      parser=self._parser,
                      preparefn=self._preparefn)
        [kwargs.pop(k) for k in make_tuple(without or [])]
        return kwargs

    def __array__(self, dtype=None):
        # FIXME inefficient. make MDataFrame a drop-in replacement for any numpy ndarray
        # this evaluates every single time
        if self._evaluated is None:
            self._evaluated = array = self.value.values
        else:
            array = self._evaluated
        return array

    def __getattr__(self, attr):
        if attr in MDataFrame.STATFUNCS:
            return self.statfunc(attr)
        if attr in self.columns:
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=attr)
            return MSeries(self.collection, **kwargs)
        raise AttributeError(attr)

    def __getitem__(self, cols_or_slice):
        """
        select and project by column, columns, slice, masked-style filter

        Masked-style filters work similar to pd.DataFrame/Series masks
        but do not actually return masks but an instance of Filter. A
        Filter is a delayed evaluation on the data frame.

            # select all rows where any column is == 5
            mdf = MDataFrame(coll)
            flt = mdf == 5
            mdf[flt]
            # => MDataFrame with the filter applied

        :param cols_or_slice: single column (str), multi-columns (list),
           slice to select columns or a masked-style filter (Filter)
        :return: filtered MDataFrame or MSeries
        """
        if isinstance(cols_or_slice, str):
            # column name => MSeries
            return self._as_mseries(cols_or_slice)
        elif isinstance(cols_or_slice, int):
            # column number => MSeries
            column = self.columns[cols_or_slice]
            return self._as_mseries(column)
        elif isinstance(cols_or_slice, (tuple, list)):
            # list of column names => MDataFrame subset on columns
            kwargs = self._getcopy_kwargs()
            kwargs.update(columns=cols_or_slice)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, Filter):
            kwargs = self._getcopy_kwargs()
            kwargs.update(query=cols_or_slice.query)
            return MDataFrame(self.collection, **kwargs)
        elif isinstance(cols_or_slice, np.ndarray):
            return self.iloc[cols_or_slice]
        raise ValueError('unknown accessor type %s' % type(cols_or_slice))

    def __setitem__(self, column, value):
        # only scalar values are supported; np.isscalar is
        # True for any scalar type, numeric, bool, string
        if np.isscalar(value):
            result = self.collection.update_many(filter=self.filter_criteria,
                                                 update=qops.SET(column, value))
            self.columns.append(column)
        return self

    def _clone(self, collection=None, **kwargs):
        # convenience method to clone itself with updates
        collection = collection if collection is not None else self.collection
        return self.__class__(collection, **kwargs,
                              **self._getcopy_kwargs(without=list(kwargs.keys())))

    def statfunc(self, stat):
        aggr = MGrouper(self, self.collection, [], sort=False)
        return getattr(aggr, stat)

    def groupby(self, columns, sort=True):
        """
        Group by a given set of columns

        :param columns: the list of columns
        :param sort: if True sort by group key
        :return: MGrouper
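
        Example (a sketch; column names are illustrative only)::

            mdf.groupby('city').agg({'sales': 'mean'})  # mean sales per city
            mdf.groupby('city').count()                 # row counts per city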
517 """
518 return MGrouper(self, self.collection, columns, sort=sort)
519
    def _get_fields(self, raw=False):
        result = []
        doc = self.collection.find_one()
        if doc is not None:
            if raw:
                result = list(doc.keys())
            else:
                result = [str(col) for col in doc.keys()
                          if col != '_id'
                          and not col.startswith('_idx')
                          and not col.startswith('_om#')]
        return result

    def _get_frame_index(self):
        """ return the dataframe's index columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = restore_index_columns_order(doc.keys())
        return result

    def _get_frame_om_fields(self):
        """ return the dataframe's omega special fields columns """
        doc = self.collection.find_one()
        if doc is None:
            result = []
        else:
            result = [k for k in list(doc.keys()) if k.startswith('_om#')]
        return result

    def _as_mseries(self, column):
        kwargs = self._getcopy_kwargs()
        kwargs.update(columns=make_tuple(column))
        return MSeries(self.collection, **kwargs)

    def inspect(self, explain=False, cached=False, cursor=None, raw=False):
        """
        inspect this dataframe's actual mongodb query

        :param explain: if True explains access path
        """
        if not cached:
            if isinstance(self.collection, FilteredCollection):
                query = self.collection.query
            else:
                query = '*'
            if explain:
                cursor = cursor or self._get_cursor()
                explain = cursor.explain()
            data = {
                'projection': self.columns,
                'query': query,
                'explain': explain or 'specify explain=True'
            }
        else:
            data = self._inspect_cache
        if not (raw or explain):
            data = pd.DataFrame(json_normalize(data))
        return data

    def count(self):
        """
        projected number of rows when resolving
        """
        nrows = len(self)
        counts = pd.Series({
            col: nrows
            for col in self.columns}, index=self.columns)
        return counts

    def __len__(self):
        """
        the projected number of rows when resolving
        """
        # we reduce to just 1 column to speed up the count
        short = self._clone()[self.columns[0]]
        return sum(1 for d in short._get_cursor())

    @property
    def shape(self):
        """
        return shape of dataframe
        """
        return len(self), len(self.columns)

    @property
    def ndim(self):
        return len(self.shape)

    @property
    def value(self):
        """
        resolve the query and return a Pandas DataFrame

        :return: the result of the query as a pandas DataFrame
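
        Example (a sketch)::

            df = mdf.value           # resolve the current query
            df = mdf.head(5).value   # resolve only the first 5 rows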
616 """
617 cursor = self._get_cursor()
618 df = self._get_dataframe_from_cursor(cursor)
619 if self.auto_inspect:
620 self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
621 # this ensures the equiv. of pandas df.loc[n] is a Series
622 if self.from_loc_indexer:
623 if len(df) == 1 and not self.from_loc_range:
624 idx = df.index
625 df = df.T
626 df = df[df.columns[0]]
627 if df.ndim == 1 and len(df) == 1 and not isinstance(idx, pd.MultiIndex):
628 # single row single dimension, numeric index only
629 df = df.iloc[0]
630 elif (df.ndim == 1 or df.shape[1] == 1) and not self.from_loc_range:
631 df = df[df.columns[0]]
632 if self._preparefn:
633 df = self._preparefn(df)
634 return df
635
    def reset(self):
        # TODO head(), tail(), query(), .loc/.iloc should return a new
        #      MDataFrame instance to avoid the need for a reset()
        self.head_limit = None
        self.skip_topn = None
        self.filter_criteria = {}
        self.force_columns = []
        self.sort_order = None
        self.from_loc_indexer = False
        return self

    def _get_dataframe_from_cursor(self, cursor):
        """
        from the given cursor return a DataFrame
        """
        df = cursor_to_dataframe(cursor, parser=self._parser)
        df = self._restore_dataframe_proper(df)
        return df

    @property
    def _index_meta(self):
        return self.metadata.get('idx_meta') or dict()

    def _restore_dataframe_proper(self, df):
        df = restore_index(df, self._index_meta)
        if '_id' in df.columns and not self._raw:
            df.drop('_id', axis=1, inplace=True)
        if self.force_columns:
            missing = set(self.force_columns) - set(self.columns)
            for col in missing:
                df[col] = np.nan
        return df

    def _get_cursor(self):
        projection = make_tuple(self.columns)
        projection += make_tuple(self._get_frame_index())
        if not self.sort_order:
            # implicit sort
            projection += make_tuple(self._get_frame_om_fields())
        cursor = self.collection.find(projection=projection)
        if self.sort_order:
            cursor.sort(qops.make_sortkey(make_tuple(self.sort_order)))
        if self.head_limit:
            cursor.limit(self.head_limit)
        if self.skip_topn:
            cursor.skip(self.skip_topn)
        return cursor

    def sort(self, columns):
        """
        sort by specified columns

        :param columns: str of a single column or a list of columns. Sort order
           is specified as the + (ascending) or - (descending)
           prefix to the column name. Default sort order is
           ascending.
        :return: the MDataFrame
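
        Example (a sketch; column names are illustrative only)::

            mdf.sort(['-sales', '+city']).value  # sales descending, city ascending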
692 """
693 self._evaluated = None
694 self.sort_order = make_tuple(columns)
695 return self
696
    def head(self, limit=10):
        """
        return up to limit number of rows

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        return self._clone(limit=limit)

    def tail(self, limit=10):
        """
        return up to limit number of rows from the last inserted values

        :param limit: the number of rows to return. Defaults to 10
        :return: the MDataFrame
        """
        tail_n = len(self) - limit
        return self._clone(skip=tail_n)

    def skip(self, topn):
        """
        skip the topn number of rows

        :param topn: the number of rows to skip.
        :return: the MDataFrame
        """
        return self._clone(skip=topn)

    def merge(self, right, on=None, left_on=None, right_on=None,
              how='inner', target=None, suffixes=('_x', '_y'),
              sort=False, inspect=False, filter=None):
        """
        merge this dataframe with another dataframe. left, inner and
        right joins are currently supported. the output is saved as a
        new collection, target name (defaults to a generated name if
        not specified).

        :param right: the other MDataFrame
        :param on: the list of key columns to merge by
        :param left_on: the list of the key columns to merge on this dataframe
        :param right_on: the list of the key columns to merge on the other
           dataframe
        :param how: the method to merge. supported are left, inner, right.
           Defaults to inner
        :param target: the name of the collection to store the merge results
           in. If not provided a temporary name will be created.
        :param suffixes: the suffixes to apply to identical left and right
           columns
        :param sort: if True the merge results will be sorted. If False the
           MongoDB natural order is implied.
        :param inspect: if True return the aggregation pipeline instead of
           executing it
        :param filter: additional filter criteria applied to the merged
           documents
        :returns: the MDataFrame on the target collection
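
        Example (a sketch; assumes both frames share a key column ``id``,
        names are illustrative only)::

            merged = left_mdf.merge(right_mdf, on='id', how='left')
            merged.value  # resolves the merged target collection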
747 """
748 # validate input
749 supported_how = ["left", 'inner', 'right']
750 assert how in supported_how, "only %s merges are currently supported" % supported_how
751 for key in [on, left_on, right_on]:
752 if key:
753 assert isinstance(
754 key, str), "only single column merge keys are supported (%s)" % key
755 if isinstance(right, (Collection, PickableCollection, FilteredCollection)):
756 right = MDataFrame(right)
757 assert isinstance(
758 right, MDataFrame), "both must be MDataFrames, got right=%" % type(right)
759 if how == 'right':
760 # A right B == B left A
761 return right.merge(self, on=on, left_on=right_on, right_on=left_on,
762 how='left', target=target, suffixes=suffixes)
763 # generate lookup parameters
764 on = on or '_id'
765 right_name = self._get_collection_name_of(right, right)
766 target_name = self._get_collection_name_of(
767 target, '_temp.merge.%s' % uuid4().hex)
768 target_field = (
769 "%s_%s" % (right_name.replace('.', '_'), right_on or on))
770 """
771 TODO enable filter criteria on right dataframe. requires changing LOOKUP syntax from
772 equitly to arbitray match
773
774 if right.filter_criteria:
775 right_filter = [qops.MATCH(self._get_filter_criteria(**right.filter_criteria))]
776 else:
777 right_filter = None
778 """
779 right_filter = None
        lookup = qops.LOOKUP(right_name,
                             key=on,
                             left_key=left_on,
                             right_key=right_on,
                             target=target_field)
        # unwind merged documents from arrays to top-level document fields
        unwind = qops.UNWIND(target_field, preserve=how != 'inner')
        # get all fields from left, right
        project = {}
        for left_col in self.columns:
            source_left_col = left_col
            if left_col == '_id':
                project[left_col] = 1
                continue
            if left_col.startswith('_idx'):
                continue
            if left_col.startswith('_om#'):
                continue
            if left_col != (on or left_on) and left_col in right.columns:
                left_col = '%s%s' % (left_col, suffixes[0])
            project[left_col] = "$%s" % source_left_col
        for right_col in right.columns:
            if right_col == '_id':
                continue
            if right_col.startswith('_idx'):
                continue
            if right_col.startswith('_om#'):
                continue
            if right_col == (on or right_on) and right_col == (on or left_on):
                # if the merge field is the same in both frames, we already
                # have it from left
                continue
            if right_col in self.columns:
                left_col = '%s%s' % (right_col, suffixes[1])
            else:
                left_col = '%s' % right_col
            project[left_col] = '$%s.%s' % (target_field, right_col)
        expected_columns = list(project.keys())
        if '_id' not in project:
            project['_id'] = 0  # never copy objectids to avoid duplicate keys, unless requested
        project = {"$project": project}
        # store merged documents and return an MDataFrame to it
        out = qops.OUT(target_name)
        pipeline = [lookup, unwind, project]
        if filter:
            query = qops.MATCH(self._get_filter_criteria(**filter))
            pipeline.append(query)
        if sort:
            sort_cols = make_list(on or [left_on, right_on])
            sort_key = qops.make_sortkey(sort_cols)
            sort = qops.SORT(**dict(sort_key))
            pipeline.append(sort)
        pipeline.append(out)
        if inspect:
            result = pipeline
        else:
            result = self.collection.aggregate(pipeline, allowDiskUse=True)
            result = MDataFrame(self.collection.database[target_name],
                                force_columns=expected_columns)
        return result

    def append(self, other):
        if isinstance(other, Collection):
            other = MDataFrame(other)
        assert isinstance(
            other, MDataFrame), "both must be MDataFrames, got other={}".format(type(other))
        outname = self.collection.name
        mrout = {
            'merge': outname,
            'nonAtomic': True,
        }
        mapfn = Code("""
        function() {
            this._id = ObjectId();
            if(this['_om#rowid']) {
                this['_om#rowid'] += %s;
            }
            emit(this._id, this);
        }
        """ % len(self))
        reducefn = Code("""
        function(key, value) {
            return value;
        }
        """)
        finfn = Code("""
        function(key, value) {
            return value;
        }
        """)
        other.collection.map_reduce(mapfn, reducefn, mrout, finalize=finfn, jsMode=True)
        unwind = {
            "$replaceRoot": {
                "newRoot": {
                    "$ifNull": ["$value", "$$CURRENT"],
                }
            }
        }
        output = qops.OUT(outname)
        pipeline = [unwind, output]
        self.collection.aggregate(pipeline, allowDiskUse=True)
        return self

    def _get_collection_name_of(self, some, default=None):
        """
        determine the collection name of the given parameter

        returns the collection name if some is a MDataFrame, a Collection
        or a string. Otherwise returns default
        """
        if isinstance(some, MDataFrame):
            name = some.collection.name
        elif isinstance(some, Collection):
            name = some.name
        elif isinstance(some, str):
            name = some
        else:
            name = default
        return name

    def _get_filter_criteria(self, *args, **kwargs):
        """
        return mongo query from filter specs

        this uses a Filter to produce the query from the kwargs.

        :param args: a Q object or logical combination of Q objects
           (optional)
        :param kwargs: all AND filter criteria
        """
        if len(args) > 0:
            q = args[0]
            if isinstance(q, MongoQ):
                filter_criteria = Filter(self.collection, q).query
            elif isinstance(q, Filter):
                filter_criteria = Filter(self.collection, q.q).query
        else:
            filter_criteria = Filter(self.collection, **kwargs).query
        return filter_criteria

    def query_inplace(self, *args, **kwargs):
        """
        filters this MDataFrame and returns it.

        Any subsequent operation on the dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        :param args: a Q object or logical combination of Q objects
           (optional)
        :param kwargs: all AND filter criteria
        :return: self
        """
        self._evaluated = None
        self.filter_criteria = self._get_filter_criteria(*args, **kwargs)
        self.collection = FilteredCollection(
            self.collection, query=self.filter_criteria)
        return self

    def query(self, *args, **kwargs):
        """
        return a new MDataFrame with a filter criteria

        Any subsequent operation on the new dataframe will have the filter
        applied. To reset the filter call .reset() without arguments.

        Note: Unlike pandas DataFrames, a filtered MDataFrame operates
        on the same collection as the original DataFrame

        :param args: a Q object or logical combination of Q objects
           (optional)
        :param kwargs: all AND filter criteria
        :return: a new MDataFrame with the filter applied
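
        Example (a sketch; keyword criteria use the ``column__op=value``
        syntax, column names are illustrative only)::

            mdf.query(x__gte=5)                    # where x >= 5
            mdf.query(MongoQ(x=1) | MongoQ(y=2))   # where x == 1 or y == 2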
950 """
951 effective_filter = dict(self.filter_criteria)
952 filter_criteria = self._get_filter_criteria(*args, **kwargs)
953 if '$and' in effective_filter:
954 effective_filter['$and'].extend(filter_criteria.get('$and'))
955 else:
956 effective_filter.update(filter_criteria)
957 coll = FilteredCollection(self.collection, query=effective_filter)
958 return self._clone(collection=coll, query=effective_filter)
959
    def create_index(self, keys, **kwargs):
        """
        create an index the easy way
        """
        keys, kwargs = MongoQueryOps().make_index(keys)
        result = ensure_index(self.collection, keys, **kwargs)
        return result

    def list_indexes(self):
        """
        list all indexes on this collection
        """
        return cursor_to_dataframe(self.collection.list_indexes())

    def iterchunks(self, chunksize=100):
        """
        return an iterator of DataFrame chunks

        Args:
            chunksize (int): number of rows in each chunk

        Yields:
            a dataframe of max. length chunksize
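
        Example (a sketch; ``process`` is a placeholder)::

            for chunkdf in mdf.iterchunks(chunksize=500):
                process(chunkdf)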
983 """
984 chunksize = int(chunksize)
985 i = 0
986 while True:
987 chunkdf = self.skip(i).head(chunksize).value
988 if len(chunkdf) == 0:
989 break
990 i += chunksize
991 yield chunkdf
992
993 def itertuples(self, chunksize=1000):
994 chunksize = int(chunksize)
995 __doc__ = pd.DataFrame.itertuples.__doc__
996
997 for chunkdf in self.iterchunks(chunksize=chunksize):
998 for row in chunkdf.iterrows():
999 yield row
1000
1001 def iterrows(self, chunksize=1000):
1002 chunksize = int(chunksize)
1003 __doc__ = pd.DataFrame.iterrows.__doc__
1004
1005 for chunkdf in self.iterchunks(chunksize=chunksize):
1006 if isinstance(chunkdf, pd.DataFrame):
1007 for row in chunkdf.iterrows():
1008 yield row
1009 else:
1010 # Series does not have iterrows
1011 for i in range(0, len(chunkdf), chunksize):
1012 yield chunkdf.iloc[i:i + chunksize]
1013
    def iteritems(self):
        if not hasattr(pd.DataFrame, 'iteritems'):
            raise NotImplementedError('MDataFrame.iteritems has been removed since Pandas 2.0. Use .items instead.')
        __doc__ = pd.DataFrame.iteritems.__doc__
        return self.items()

    def items(self):
        __doc__ = pd.DataFrame.items.__doc__

        for col in self.columns:
            yield col, self[col].value

    @property
    def loc(self):
        """
        Access by index

        Use as mdf.loc[index_value]

        :return: MLocIndexer
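
        Example (a sketch; index values are illustrative only)::

            mdf.loc[5]    # the row with index value 5
            mdf.loc[1:5]  # rows with index values 1 through 5 (inclusive)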
1034 """
1035 self._evaluated = None
1036 indexer = MLocIndexer(self)
1037 return indexer
1038
1039 @property
1040 def iloc(self):
1041 self._evaluated = None
1042 indexer = MPosIndexer(self)
1043 return indexer
1044
    def rows(self, start=None, end=None, chunksize=1000):
        # equivalent to .iloc[start:end].iterchunks()
        start, end, chunksize = (int(v) if v is not None else None
                                 for v in (start, end, chunksize))
        return self.iloc[slice(start, end)].iterchunks(chunksize)

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MDataFrame(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                           kwargs=kwargs)


class MSeries(MDataFrame):
    """
    Series implementation for MDataFrames

    behaves like a DataFrame but limited to one column.
    """

    def __init__(self, *args, **kwargs):
        super(MSeries, self).__init__(*args, **kwargs)
        # True if only unique values apply
        self.is_unique = False
        # apply mixins
        self._applyto = str(self.__class__)
        self._apply_mixins(*args, **kwargs)

    def __getitem__(self, cols_or_slice):
        if isinstance(cols_or_slice, Filter):
            return MSeries(self.collection, columns=self.columns,
                           query=cols_or_slice.query)
        return super(MSeries, self).__getitem__(cols_or_slice)

    @property
    def name(self):
        return self.columns[0]

    def unique(self):
        """
        return the unique set of values for the series

        :return: MSeries
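
        Example (a sketch; uses the collection's distinct() when resolved)::

            mdf['city'].unique().value  # array of distinct values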
1086 """
1087 self.is_unique = True
1088 return self
1089
1090 def _get_cursor(self):
1091 if self.is_unique:
1092 # this way indexes get applied
1093 cursor = self.collection.distinct(make_tuple(self.columns)[0])
1094 else:
1095 cursor = super(MSeries, self)._get_cursor()
1096 return cursor
1097
    @property
    def value(self):
        """
        return the value of the series

        this is a Series unless unique() was called. If unique() was
        called, only distinct values are returned as an array, matching
        the behavior of a pandas Series

        :return: pandas.Series
        """
        cursor = self._get_cursor()
        column = make_tuple(self.columns)[0]
        if self.is_unique:
            # the .distinct() cursor returns a list of values
            # this is to make sure we return the same thing as pandas
            val = [v for v in cursor]
        else:
            val = self._get_dataframe_from_cursor(cursor)
            val = val[column]
            val.name = self.name
            if len(val) == 1 and self.from_loc_indexer:
                val = val.iloc[0]
        if self.auto_inspect:
            self._inspect_cache.append(self.inspect(explain=True, cursor=cursor, raw=True))
        if self._preparefn:
            val = self._preparefn(val)
        return val

    def __repr__(self):
        kwargs = ', '.join('{}={}'.format(k, v) for k, v in self._getcopy_kwargs().items())
        return "MSeries(collection={collection.name}, {kwargs})".format(collection=self.collection,
                                                                        kwargs=kwargs)

    @property
    def shape(self):
        return len(self),


def _mdf_remake(collection):
    # recreate a pickled MDF
    mdf = MDataFrame(collection)
    return mdf