From 9ba0c868f5116319a365c12a6bb6f401261d15ee Mon Sep 17 00:00:00 2001 From: Brock Date: Mon, 20 Jul 2020 11:13:34 -0700 Subject: [PATCH 1/6] REF: Avoid post-processing in blockwise op --- pandas/core/groupby/generic.py | 97 ++++++++++++++++------------------ 1 file changed, 47 insertions(+), 50 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index ec7b14f27c5a1..b7aa186b64ad8 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -1031,11 +1031,36 @@ def _cython_agg_blocks( agg_blocks: List[Block] = [] new_items: List[np.ndarray] = [] deleted_items: List[np.ndarray] = [] - # Some object-dtype blocks might be split into List[Block[T], Block[U]] - split_items: List[np.ndarray] = [] - split_frames: List[DataFrame] = [] no_result = object() + + def cast_result_block(result, block: "Block", how: str) -> "Block": + # see if we can cast the block to the desired dtype + # this may not be the original dtype + assert not isinstance(result, DataFrame) + assert result is not no_result + + dtype = maybe_cast_result_dtype(block.dtype, how) + result = maybe_downcast_numeric(result, dtype) + + if block.is_extension and isinstance(result, np.ndarray): + # e.g. block.values was an IntegerArray + # (1, N) case can occur if block.values was Categorical + # and result is ndarray[object] + # TODO(EA2D): special casing not needed with 2D EAs + assert result.ndim == 1 or result.shape[0] == 1 + try: + # Cast back if feasible + result = type(block.values)._from_sequence( + result.ravel(), dtype=block.values.dtype + ) + except (ValueError, TypeError): + # reshape to be valid for non-Extension Block + result = result.reshape(1, -1) + + agg_block: Block = block.make_block(result) + return agg_block + for block in data.blocks: # Avoid inheriting result from earlier in the loop result = no_result @@ -1067,9 +1092,9 @@ def _cython_agg_blocks( # not try to add missing categories if grouping over multiple # Categoricals. This will done by later self._reindex_output() # Doing it here creates an error. See GH#34951 - s = get_groupby(obj, self.grouper, observed=True) + sgb = get_groupby(obj, self.grouper, observed=True) try: - result = s.aggregate(lambda x: alt(x, axis=self.axis)) + result = sgb.aggregate(lambda x: alt(x, axis=self.axis)) except TypeError: # we may have an exception in trying to aggregate # continue and exclude the block @@ -1083,54 +1108,26 @@ def _cython_agg_blocks( # about a single block input returning a single block output # is a lie. To keep the code-path for the typical non-split case # clean, we choose to clean up this mess later on. - split_items.append(locs) - split_frames.append(result) - continue - - assert len(result._mgr.blocks) == 1 - result = result._mgr.blocks[0].values - if isinstance(result, np.ndarray) and result.ndim == 1: - result = result.reshape(1, -1) - - assert not isinstance(result, DataFrame) - - if result is not no_result: - # see if we can cast the block to the desired dtype - # this may not be the original dtype - dtype = maybe_cast_result_dtype(block.dtype, how) - result = maybe_downcast_numeric(result, dtype) - - if block.is_extension and isinstance(result, np.ndarray): - # e.g. block.values was an IntegerArray - # (1, N) case can occur if block.values was Categorical - # and result is ndarray[object] - # TODO(EA2D): special casing not needed with 2D EAs - assert result.ndim == 1 or result.shape[0] == 1 - try: - # Cast back if feasible - result = type(block.values)._from_sequence( - result.ravel(), dtype=block.values.dtype - ) - except (ValueError, TypeError): - # reshape to be valid for non-Extension Block - result = result.reshape(1, -1) - - agg_block: Block = block.make_block(result) - - new_items.append(locs) - agg_blocks.append(agg_block) + assert len(locs) == result.shape[1] + for i, loc in enumerate(locs): + new_items.append(np.array([loc], dtype=locs.dtype)) + agg_block = result.iloc[:, [i]]._mgr.blocks[0] + agg_blocks.append(agg_block) + else: + result = result._mgr.blocks[0].values + if isinstance(result, np.ndarray) and result.ndim == 1: + result = result.reshape(1, -1) + agg_block = cast_result_block(result, block, how) + new_items.append(locs) + agg_blocks.append(agg_block) + else: + agg_block = cast_result_block(result, block, how) + new_items.append(locs) + agg_blocks.append(agg_block) - if not (agg_blocks or split_frames): + if not agg_blocks: raise DataError("No numeric types to aggregate") - if split_items: - # Clean up the mess left over from split blocks. - for locs, result in zip(split_items, split_frames): - assert len(locs) == result.shape[1] - for i, loc in enumerate(locs): - new_items.append(np.array([loc], dtype=locs.dtype)) - agg_blocks.append(result.iloc[:, [i]]._mgr.blocks[0]) - # reset the locs in the blocks to correspond to our # current ordering indexer = np.concatenate(new_items) From 288ca6d43df104dd67cdc6c91b1d2eacce96430e Mon Sep 17 00:00:00 2001 From: Brock Date: Mon, 20 Jul 2020 12:09:37 -0700 Subject: [PATCH 2/6] REF: implement blk_func --- pandas/core/groupby/generic.py | 36 ++++++++++++++++++++----------- pandas/core/internals/managers.py | 17 +++++++++++++++ 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index b7aa186b64ad8..3728e7862c28e 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -1029,7 +1029,6 @@ def _cython_agg_blocks( data = data.get_numeric_data(copy=False) agg_blocks: List[Block] = [] - new_items: List[np.ndarray] = [] deleted_items: List[np.ndarray] = [] no_result = object() @@ -1061,8 +1060,7 @@ def cast_result_block(result, block: "Block", how: str) -> "Block": agg_block: Block = block.make_block(result) return agg_block - for block in data.blocks: - # Avoid inheriting result from earlier in the loop + def blk_func(block): result = no_result locs = block.mgr_locs.as_array try: @@ -1078,8 +1076,7 @@ def cast_result_block(result, block: "Block", how: str) -> "Block": # we cannot perform the operation # in an alternate way, exclude the block assert how == "ohlc" - deleted_items.append(locs) - continue + raise # call our grouper again with only this block obj = self.obj[data.items[locs]] @@ -1098,8 +1095,7 @@ def cast_result_block(result, block: "Block", how: str) -> "Block": except TypeError: # we may have an exception in trying to aggregate # continue and exclude the block - deleted_items.append(locs) - continue + raise else: result = cast(DataFrame, result) # unwrap DataFrame to get array @@ -1109,21 +1105,35 @@ def cast_result_block(result, block: "Block", how: str) -> "Block": # is a lie. To keep the code-path for the typical non-split case # clean, we choose to clean up this mess later on. assert len(locs) == result.shape[1] + new_blocks = [] for i, loc in enumerate(locs): - new_items.append(np.array([loc], dtype=locs.dtype)) agg_block = result.iloc[:, [i]]._mgr.blocks[0] - agg_blocks.append(agg_block) + new_blocks.append(agg_block) else: result = result._mgr.blocks[0].values if isinstance(result, np.ndarray) and result.ndim == 1: result = result.reshape(1, -1) agg_block = cast_result_block(result, block, how) - new_items.append(locs) - agg_blocks.append(agg_block) + new_blocks = [agg_block] else: agg_block = cast_result_block(result, block, how) - new_items.append(locs) - agg_blocks.append(agg_block) + new_blocks = [agg_block] + return new_blocks + + skipped = [] + new_items: List[np.ndarray] = [] + for i, block in enumerate(data.blocks): + try: + nbs = blk_func(block) + except (NotImplementedError, TypeError): + # TypeError -> we may have an exception in trying to aggregate + # continue and exclude the block + # NotImplementedError -> "ohlc" with wrong dtype + skipped.append(i) + deleted_items.append(block.mgr_locs.as_array) + else: + agg_blocks.extend(nbs) + new_items.append(block.mgr_locs.as_array) if not agg_blocks: raise DataError("No numeric types to aggregate") diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index 895385b170c91..148f8ee8c02ee 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -353,6 +353,23 @@ def operate_blockwise(self, other: "BlockManager", array_op) -> "BlockManager": """ return operate_blockwise(self, other, array_op) + def transform(self: T, func, ignore_failures: bool = False) -> T: + """Similar to apply, but not necessarily length-preserving.""" + result_blocks = [] + failed = [] + + for blk in self.blocks: + try: + res = blk.apply(func) + except Exception: + if not ignore_failures: + raise + failed.append(blk) + else: + result_blocks.extend(res) + + raise NotImplementedError + def apply(self: T, f, align_keys=None, **kwargs) -> T: """ Iterate over the blocks, collect and create a new BlockManager. From 6b33a5c376b4498fbeead8a552dc1800d320f137 Mon Sep 17 00:00:00 2001 From: Brock Date: Sat, 8 Aug 2020 10:44:17 -0700 Subject: [PATCH 3/6] revert unnecessary --- pandas/core/internals/managers.py | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/pandas/core/internals/managers.py b/pandas/core/internals/managers.py index fda856ee8a0d6..aa74d173d69b3 100644 --- a/pandas/core/internals/managers.py +++ b/pandas/core/internals/managers.py @@ -352,23 +352,6 @@ def operate_blockwise(self, other: "BlockManager", array_op) -> "BlockManager": """ return operate_blockwise(self, other, array_op) - def transform(self: T, func, ignore_failures: bool = False) -> T: - """Similar to apply, but not necessarily length-preserving.""" - result_blocks = [] - failed = [] - - for blk in self.blocks: - try: - res = blk.apply(func) - except Exception: - if not ignore_failures: - raise - failed.append(blk) - else: - result_blocks.extend(res) - - raise NotImplementedError - def apply(self: T, f, align_keys=None, **kwargs) -> T: """ Iterate over the blocks, collect and create a new BlockManager. From dbf5592c122b24635c05440c5b8105fd5184d515 Mon Sep 17 00:00:00 2001 From: Brock Date: Mon, 10 Aug 2020 14:28:13 -0700 Subject: [PATCH 4/6] typing --- pandas/core/groupby/generic.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 90b9e33c76d66..d306ec050480a 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -1058,7 +1058,9 @@ def cast_result_block(result, block: "Block", how: str) -> "Block": agg_block: Block = block.make_block(result) return agg_block - def blk_func(block): + def blk_func(block: Block) -> List[Block]: + new_blocks: List[Block] + result = no_result locs = block.mgr_locs.as_array try: @@ -1118,7 +1120,7 @@ def blk_func(block): new_blocks = [agg_block] return new_blocks - skipped = [] + skipped: List[int] = [] new_items: List[np.ndarray] = [] for i, block in enumerate(data.blocks): try: From 20a618d86078091459f546912b1166d3b59c3ade Mon Sep 17 00:00:00 2001 From: Brock Date: Mon, 10 Aug 2020 16:43:10 -0700 Subject: [PATCH 5/6] typing fixup --- pandas/core/groupby/generic.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index d306ec050480a..423489ef74da3 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -1026,7 +1026,7 @@ def _cython_agg_blocks( if numeric_only: data = data.get_numeric_data(copy=False) - agg_blocks: List[Block] = [] + agg_blocks: List["Block"] = [] deleted_items: List[np.ndarray] = [] no_result = object() @@ -1055,11 +1055,11 @@ def cast_result_block(result, block: "Block", how: str) -> "Block": # reshape to be valid for non-Extension Block result = result.reshape(1, -1) - agg_block: Block = block.make_block(result) + agg_block: "Block" = block.make_block(result) return agg_block - def blk_func(block: Block) -> List[Block]: - new_blocks: List[Block] + def blk_func(block: "Block") -> List["Block"]: + new_blocks: List["Block"] result = no_result locs = block.mgr_locs.as_array From 836c60b40d097a9b6a360e9f84d75158913f1a35 Mon Sep 17 00:00:00 2001 From: Brock Date: Wed, 12 Aug 2020 08:20:55 -0700 Subject: [PATCH 6/6] define new_blocks earlier --- pandas/core/groupby/generic.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pandas/core/groupby/generic.py b/pandas/core/groupby/generic.py index 423489ef74da3..b7280a9f7db3c 100644 --- a/pandas/core/groupby/generic.py +++ b/pandas/core/groupby/generic.py @@ -1059,7 +1059,7 @@ def cast_result_block(result, block: "Block", how: str) -> "Block": return agg_block def blk_func(block: "Block") -> List["Block"]: - new_blocks: List["Block"] + new_blocks: List["Block"] = [] result = no_result locs = block.mgr_locs.as_array @@ -1105,7 +1105,6 @@ def blk_func(block: "Block") -> List["Block"]: # is a lie. To keep the code-path for the typical non-split case # clean, we choose to clean up this mess later on. assert len(locs) == result.shape[1] - new_blocks = [] for i, loc in enumerate(locs): agg_block = result.iloc[:, [i]]._mgr.blocks[0] new_blocks.append(agg_block)