diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py
index e556708dc9283..7ff15ec2aae9a 100644
--- a/pandas/core/groupby/generic.py
+++ b/pandas/core/groupby/generic.py
@@ -11,21 +11,29 @@
 from functools import partial
 from textwrap import dedent
 import typing
-from typing import Any, Callable, FrozenSet, Sequence, Type, Union
+from typing import (
+    Any,
+    Callable,
+    Dict,
+    FrozenSet,
+    Hashable,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Type,
+    Union,
+)
 import warnings
 
 import numpy as np
 
-from pandas._libs import Timestamp, lib
+from pandas._libs import Timestamp
 from pandas.compat import PY36
 from pandas.errors import AbstractMethodError
 from pandas.util._decorators import Appender, Substitution
 
-from pandas.core.dtypes.cast import (
-    maybe_convert_objects,
-    maybe_downcast_numeric,
-    maybe_downcast_to_dtype,
-)
+from pandas.core.dtypes.cast import maybe_convert_objects, maybe_downcast_to_dtype
 from pandas.core.dtypes.common import (
     ensure_int64,
     ensure_platform_int,
@@ -39,24 +47,18 @@
     is_object_dtype,
     is_scalar,
 )
-from pandas.core.dtypes.missing import _isna_ndarraylike, isna, notna
+from pandas.core.dtypes.missing import isna, notna
 
 from pandas._typing import FrameOrSeries
 import pandas.core.algorithms as algorithms
-from pandas.core.base import DataError, SpecificationError
+from pandas.core.base import SpecificationError
 import pandas.core.common as com
 from pandas.core.frame import DataFrame
 from pandas.core.generic import ABCDataFrame, ABCSeries, NDFrame, _shared_docs
 from pandas.core.groupby import base
-from pandas.core.groupby.groupby import (
-    GroupBy,
-    _apply_docs,
-    _transform_template,
-    groupby,
-)
+from pandas.core.groupby.groupby import GroupBy, _apply_docs, _transform_template
 from pandas.core.index import Index, MultiIndex, _all_indexes_same
 import pandas.core.indexes.base as ibase
-from pandas.core.internals import BlockManager, make_block
 from pandas.core.series import Series
 
 from pandas.plotting import boxplot_frame_groupby
@@ -147,93 +149,6 @@ def _iterate_slices(self):
                 continue
             yield val, slicer(val)
 
-    def _cython_agg_general(self, how, alt=None, numeric_only=True, min_count=-1):
-        new_items, new_blocks = self._cython_agg_blocks(
-            how, alt=alt, numeric_only=numeric_only, min_count=min_count
-        )
-        return self._wrap_agged_blocks(new_items, new_blocks)
-
-    _block_agg_axis = 0
-
-    def _cython_agg_blocks(self, how, alt=None, numeric_only=True, min_count=-1):
-        # TODO: the actual managing of mgr_locs is a PITA
-        # here, it should happen via BlockManager.combine
-
-        data, agg_axis = self._get_data_to_aggregate()
-
-        if numeric_only:
-            data = data.get_numeric_data(copy=False)
-
-        new_blocks = []
-        new_items = []
-        deleted_items = []
-        no_result = object()
-        for block in data.blocks:
-            # Avoid inheriting result from earlier in the loop
-            result = no_result
-            locs = block.mgr_locs.as_array
-            try:
-                result, _ = self.grouper.aggregate(
-                    block.values, how, axis=agg_axis, min_count=min_count
-                )
-            except NotImplementedError:
-                # generally if we have numeric_only=False
-                # and non-applicable functions
-                # try to python agg
-
-                if alt is None:
-                    # we cannot perform the operation
-                    # in an alternate way, exclude the block
-                    deleted_items.append(locs)
-                    continue
-
-                # call our grouper again with only this block
-                obj = self.obj[data.items[locs]]
-                s = groupby(obj, self.grouper)
-                try:
-                    result = s.aggregate(lambda x: alt(x, axis=self.axis))
-                except TypeError:
-                    # we may have an exception in trying to aggregate
-                    # continue and exclude the block
-                    deleted_items.append(locs)
-                    continue
-            finally:
-                if result is not no_result:
-                    # see if we can cast the block back to the original dtype
-                    result = maybe_downcast_numeric(result, block.dtype)
-                    newb = block.make_block(result)
-
-            new_items.append(locs)
-            new_blocks.append(newb)
-
-        if len(new_blocks) == 0:
-            raise DataError("No numeric types to aggregate")
-
-        # reset the locs in the blocks to correspond to our
-        # current ordering
-        indexer = np.concatenate(new_items)
-        new_items = data.items.take(np.sort(indexer))
-
-        if len(deleted_items):
-
-            # we need to adjust the indexer to account for the
-            # items we have removed
-            # really should be done in internals :<
-
-            deleted = np.concatenate(deleted_items)
-            ai = np.arange(len(data))
-            mask = np.zeros(len(data))
-            mask[deleted] = 1
-            indexer = (ai - mask.cumsum())[indexer]
-
-        offset = 0
-        for b in new_blocks:
-            loc = len(b.mgr_locs)
-            b.mgr_locs = indexer[offset : (offset + loc)]
-            offset += loc
-
-        return new_items, new_blocks
-
     def aggregate(self, func, *args, **kwargs):
         _level = kwargs.pop("_level", None)
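The block-wise aggregation path removed above is superseded by column-wise logic elsewhere in this patch (see the new `count` further down). A minimal sketch of that column-wise pattern, using only public pandas API; the frame and names are illustrative, not taken from the patch:

```python
from collections import OrderedDict

import pandas as pd

df = pd.DataFrame({"key": ["a", "a", "b"], "x": [1, 2, 3], "y": [4.0, 5.0, 6.0]})
gb = df.groupby("key")

# Aggregate one column at a time, keyed by integer position, then assemble.
output = OrderedDict()
for i, col in enumerate(["x", "y"]):
    agged = gb[col].sum()
    output[i] = agged.to_numpy()

result = pd.DataFrame(output, index=agged.index)
result.columns = ["x", "y"]
# result is equivalent to gb[["x", "y"]].sum()
```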
@@ -355,18 +270,48 @@ def _aggregate_item_by_item(self, func, *args, **kwargs):
 
         return DataFrame(result, columns=result_columns)
 
-    def _decide_output_index(self, output, labels):
-        if len(output) == len(labels):
-            output_keys = labels
+    def _decide_output_index(
+        self,
+        output: Dict,
+        labels: Index,
+        col_labels: Optional[List[Union[Hashable, Tuple[Hashable, ...]]]] = None,
+    ) -> Index:
+        """
+        Determine axis labels to use while wrapping aggregated values.
+
+        Parameters
+        ----------
+        output : dict of ndarrays
+            Results of aggregating by column.
+        labels : Index
+            Existing labels of the selected object. Used to determine the
+            resulting shape and name(s).
+        col_labels : list, optional
+            The ultimate column labels for the reshaped object. Each entry in
+            this list should correspond to a key in output. Entries must be
+            valid column labels, and any tuples contained within should map to
+            a MultiIndex.
+
+        Returns
+        -------
+        Index or MultiIndex
+
+        Notes
+        -----
+        Ideally, output should always be keyed by integers and col_labels
+        should be provided separately, but as of writing this is not the case.
+        When output is not keyed by integers, there is a risk of duplicate
+        column labels not being handled correctly.
+        """
+        if col_labels:
+            keys = col_labels
         else:
-            output_keys = sorted(output)
-            try:
-                output_keys.sort()
-            except TypeError:
-                pass
+            keys = list(output.keys())
 
-            if isinstance(labels, MultiIndex):
-                output_keys = MultiIndex.from_tuples(output_keys, names=labels.names)
+        if isinstance(labels, MultiIndex):
+            output_keys = MultiIndex.from_tuples(keys, names=labels.names)
+        else:
+            output_keys = Index(keys, name=labels.name)
 
         return output_keys
 
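The rewritten `_decide_output_index` reduces to a small label-resolution rule. A standalone sketch against public pandas API; the keys and labels are made-up inputs for illustration:

```python
import pandas as pd

keys = [("x", "mean"), ("y", "sum")]  # hypothetical aggregation result keys
labels = pd.MultiIndex.from_tuples(keys, names=["col", "agg"])

# Tuples become MultiIndex levels when the existing labels are a MultiIndex;
# otherwise a flat Index is built, carrying over the existing name.
if isinstance(labels, pd.MultiIndex):
    output_keys = pd.MultiIndex.from_tuples(keys, names=labels.names)
else:
    output_keys = pd.Index(keys, name=labels.name)
```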
+ """ + if col_labels: + keys = col_labels else: - output_keys = sorted(output) - try: - output_keys.sort() - except TypeError: - pass + keys = list(output.keys()) - if isinstance(labels, MultiIndex): - output_keys = MultiIndex.from_tuples(output_keys, names=labels.names) + if isinstance(labels, MultiIndex): + output_keys = MultiIndex.from_tuples(keys, names=labels.names) + else: + output_keys = Index(keys, name=labels.name) return output_keys @@ -1385,8 +1330,6 @@ class DataFrameGroupBy(NDFrameGroupBy): _apply_whitelist = base.dataframe_apply_whitelist - _block_agg_axis = 1 - _agg_see_also_doc = dedent( """ See Also @@ -1525,13 +1468,6 @@ def _wrap_generic_output(self, result, obj): else: return DataFrame(result, index=obj.index, columns=result_index) - def _get_data_to_aggregate(self): - obj = self._obj_with_exclusions - if self.axis == 1: - return obj.T._data, 1 - else: - return obj._data, 1 - def _insert_inaxis_grouper_inplace(self, result): # zip in reverse so we can always insert at loc 0 izip = zip( @@ -1549,19 +1485,23 @@ def _insert_inaxis_grouper_inplace(self, result): if in_axis: result.insert(0, name, lev) - def _wrap_aggregated_output(self, output, names=None): + def _wrap_aggregated_output( + self, + output: Dict[int, np.ndarray], + names: Optional[List[Union[Hashable, Tuple[Hashable, ...]]]] = None, + ) -> DataFrame: + index = self.grouper.result_index + result = DataFrame(output, index) + agg_axis = 0 if self.axis == 1 else 1 agg_labels = self._obj_with_exclusions._get_axis(agg_axis) - - output_keys = self._decide_output_index(output, agg_labels) + output_keys = self._decide_output_index(output, agg_labels, names) + result.columns = output_keys if not self.as_index: - result = DataFrame(output, columns=output_keys) self._insert_inaxis_grouper_inplace(result) result = result._consolidate() - else: - index = self.grouper.result_index - result = DataFrame(output, index=index, columns=output_keys) + result = result.reset_index(drop=True) if self.axis == 1: result = result.T @@ -1571,24 +1511,6 @@ def _wrap_aggregated_output(self, output, names=None): def _wrap_transformed_output(self, output, names=None): return DataFrame(output, index=self.obj.index) - def _wrap_agged_blocks(self, items, blocks): - if not self.as_index: - index = np.arange(blocks[0].values.shape[-1]) - mgr = BlockManager(blocks, [items, index]) - result = DataFrame(mgr) - - self._insert_inaxis_grouper_inplace(result) - result = result._consolidate() - else: - index = self.grouper.result_index - mgr = BlockManager(blocks, [items, index]) - result = DataFrame(mgr) - - if self.axis == 1: - result = result.T - - return self._reindex_output(result)._convert(datetime=True) - def _iterate_column_groupbys(self): for i, colname in enumerate(self._selected_obj.columns): yield colname, SeriesGroupBy( @@ -1616,20 +1538,27 @@ def count(self): DataFrame Count of values within each group. 
""" - data, _ = self._get_data_to_aggregate() + output = OrderedDict() + names = [] + + # TODO: dispatch to _cython_agg_general instead of custom looping + # TODO: refactor with series logic ids, _, ngroups = self.grouper.group_info - mask = ids != -1 - val = ( - (mask & ~_isna_ndarraylike(np.atleast_2d(blk.get_values()))) - for blk in data.blocks - ) - loc = (blk.mgr_locs for blk in data.blocks) + if self.axis == 0: + iter_obj = self._obj_with_exclusions + else: + iter_obj = self._obj_with_exclusions.T - counter = partial(lib.count_level_2d, labels=ids, max_bin=ngroups, axis=1) - blk = map(make_block, map(counter, val), loc) + for index, (name, obj) in enumerate(iter_obj.items()): + mask = (ids != -1) & ~isna(obj) + ids = ensure_platform_int(ids) + minlength = ngroups or 0 + out = np.bincount(ids[mask], minlength=minlength) + output[index] = out + names.append(name) - return self._wrap_agged_blocks(data.items, list(blk)) + return self._wrap_aggregated_output(output, names=names) def nunique(self, dropna=True): """ diff --git a/pandas/core/groupby/groupby.py b/pandas/core/groupby/groupby.py index e93ce3ce93164..e7f29e99af44f 100644 --- a/pandas/core/groupby/groupby.py +++ b/pandas/core/groupby/groupby.py @@ -692,7 +692,7 @@ def __iter__(self): Generator yielding sequence of (name, subsetted object) for each group """ - return self.grouper.get_iterator(self.obj, axis=self.axis) + return self.grouper.get_iterator(self._selected_obj, axis=self.axis) @Appender( _apply_docs["template"].format( diff --git a/pandas/tests/groupby/test_categorical.py b/pandas/tests/groupby/test_categorical.py index fcc0aa3b1c015..7303ffe688475 100644 --- a/pandas/tests/groupby/test_categorical.py +++ b/pandas/tests/groupby/test_categorical.py @@ -300,7 +300,7 @@ def test_observed(observed): exp_index = CategoricalIndex( list("ab"), name="cat", categories=list("abc"), ordered=True ) - expected = DataFrame({"ints": [1.5, 1.5], "val": [20.0, 30]}, index=exp_index) + expected = DataFrame({"ints": [1.5, 1.5], "val": [20, 30]}, index=exp_index) if not observed: index = CategoricalIndex( list("abc"), name="cat", categories=list("abc"), ordered=True diff --git a/pandas/tests/groupby/test_function.py b/pandas/tests/groupby/test_function.py index afb22a732691c..ce65eb20eb853 100644 --- a/pandas/tests/groupby/test_function.py +++ b/pandas/tests/groupby/test_function.py @@ -179,7 +179,13 @@ def test_arg_passthru(): tm.assert_index_equal(result.columns, expected_columns_numeric) result = f(numeric_only=False) - tm.assert_frame_equal(result.reindex_like(expected), expected) + + # TODO: median isn't implemented for DTI but was working blockwise before? + if attr == "median": + new_expected = expected.drop(columns=["datetime", "datetimetz"]) + tm.assert_frame_equal(result, new_expected) + else: + tm.assert_frame_equal(result.reindex_like(expected), expected) # TODO: min, max *should* handle # categorical (ordered) dtype