Skip to content

4.1.4 release cherry-picks #1994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ on:
- '**/*.md'
branches:
- master
- '[0-9].[0-9]'
pull_request:
branches:
- master
- '[0-9].[0-9]'

jobs:

Expand Down
52 changes: 31 additions & 21 deletions redis/commands/graph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from redis.exceptions import ResponseError

from .exceptions import VersionMismatchException
from .execution_plan import ExecutionPlan
from .query_result import QueryResult


Expand Down Expand Up @@ -118,27 +119,6 @@ def flush(self):
self.nodes = {}
self.edges = []

def explain(self, query, params=None):
"""
Get the execution plan for given query,
Returns an array of operations.
For more information see `GRAPH.EXPLAIN <https://oss.redis.com/redisgraph/master/commands/#graphexplain>`_. # noqa

Args:

query:
The query that will be executed.
params: dict
Query parameters.
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
if isinstance(plan[0], bytes):
plan = [b.decode() for b in plan]
return "\n".join(plan)

def bulk(self, **kwargs):
"""Internal only. Not supported."""
raise NotImplementedError(
Expand Down Expand Up @@ -200,3 +180,33 @@ def list_keys(self):
For more information see `GRAPH.LIST <https://oss.redis.com/redisgraph/master/commands/#graphlist>`_. # noqa
"""
return self.execute_command("GRAPH.LIST")

def execution_plan(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns an array of operations.

Args:
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
return "\n".join(plan)

def explain(self, query, params=None):
"""
Get the execution plan for given query,
GRAPH.EXPLAIN returns ExecutionPlan object.

Args:
query: the query that will be executed
params: query parameters
"""
if params is not None:
query = self._build_params_header(params) + query

plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
return ExecutionPlan(plan)
208 changes: 208 additions & 0 deletions redis/commands/graph/execution_plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
import re


class ProfileStats:
"""
ProfileStats, runtime execution statistics of operation.
"""

def __init__(self, records_produced, execution_time):
self.records_produced = records_produced
self.execution_time = execution_time


class Operation:
"""
Operation, single operation within execution plan.
"""

def __init__(self, name, args=None, profile_stats=None):
"""
Create a new operation.

Args:
name: string that represents the name of the operation
args: operation arguments
profile_stats: profile statistics
"""
self.name = name
self.args = args
self.profile_stats = profile_stats
self.children = []

def append_child(self, child):
if not isinstance(child, Operation) or self is child:
raise Exception("child must be Operation")

self.children.append(child)
return self

def child_count(self):
return len(self.children)

def __eq__(self, o: object) -> bool:
if not isinstance(o, Operation):
return False

return self.name == o.name and self.args == o.args

def __str__(self) -> str:
args_str = "" if self.args is None else " | " + self.args
return f"{self.name}{args_str}"


class ExecutionPlan:
"""
ExecutionPlan, collection of operations.
"""

def __init__(self, plan):
"""
Create a new execution plan.

Args:
plan: array of strings that represents the collection operations
the output from GRAPH.EXPLAIN
"""
if not isinstance(plan, list):
raise Exception("plan must be an array")

self.plan = plan
self.structured_plan = self._operation_tree()

def _compare_operations(self, root_a, root_b):
"""
Compare execution plan operation tree

Return: True if operation trees are equal, False otherwise
"""

# compare current root
if root_a != root_b:
return False

# make sure root have the same number of children
if root_a.child_count() != root_b.child_count():
return False

# recursively compare children
for i in range(root_a.child_count()):
if not self._compare_operations(root_a.children[i], root_b.children[i]):
return False

return True

def __str__(self) -> str:
def aggraget_str(str_children):
return "\n".join(
[
" " + line
for str_child in str_children
for line in str_child.splitlines()
]
)

def combine_str(x, y):
return f"{x}\n{y}"

return self._operation_traverse(
self.structured_plan, str, aggraget_str, combine_str
)

def __eq__(self, o: object) -> bool:
"""Compares two execution plans

Return: True if the two plans are equal False otherwise
"""
# make sure 'o' is an execution-plan
if not isinstance(o, ExecutionPlan):
return False

# get root for both plans
root_a = self.structured_plan
root_b = o.structured_plan

# compare execution trees
return self._compare_operations(root_a, root_b)

def _operation_traverse(self, op, op_f, aggregate_f, combine_f):
"""
Traverse operation tree recursively applying functions

Args:
op: operation to traverse
op_f: function applied for each operation
aggregate_f: aggregation function applied for all children of a single operation
combine_f: combine function applied for the operation result and the children result
""" # noqa
# apply op_f for each operation
op_res = op_f(op)
if len(op.children) == 0:
return op_res # no children return
else:
# apply _operation_traverse recursively
children = [
self._operation_traverse(child, op_f, aggregate_f, combine_f)
for child in op.children
]
# combine the operation result with the children aggregated result
return combine_f(op_res, aggregate_f(children))

def _operation_tree(self):
"""Build the operation tree from the string representation"""

# initial state
i = 0
level = 0
stack = []
current = None

def _create_operation(args):
profile_stats = None
name = args[0].strip()
args.pop(0)
if len(args) > 0 and "Records produced" in args[-1]:
records_produced = int(
re.search("Records produced: (\\d+)", args[-1]).group(1)
)
execution_time = float(
re.search("Execution time: (\\d+.\\d+) ms", args[-1]).group(1)
)
profile_stats = ProfileStats(records_produced, execution_time)
args.pop(-1)
return Operation(
name, None if len(args) == 0 else args[0].strip(), profile_stats
)

# iterate plan operations
while i < len(self.plan):
current_op = self.plan[i]
op_level = current_op.count(" ")
if op_level == level:
# if the operation level equal to the current level
# set the current operation and move next
child = _create_operation(current_op.split("|"))
if current:
current = stack.pop()
current.append_child(child)
current = child
i += 1
elif op_level == level + 1:
# if the operation is child of the current operation
# add it as child and set as current operation
child = _create_operation(current_op.split("|"))
current.append_child(child)
stack.append(current)
current = child
level += 1
i += 1
elif op_level < level:
# if the operation is not child of current operation
# go back to it's parent operation
levels_back = level - op_level + 1
for _ in range(levels_back):
current = stack.pop()
level -= levels_back
else:
raise Exception("corrupted plan")
return stack[0]
42 changes: 35 additions & 7 deletions redis/commands/search/commands.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import itertools
import time
from typing import Dict, Union

from ..helpers import parse_to_dict
from ._util import to_string
Expand Down Expand Up @@ -377,7 +378,17 @@ def info(self):
it = map(to_string, res)
return dict(zip(it, it))

def _mk_query_args(self, query):
def get_params_args(self, query_params: Dict[str, Union[str, int, float]]):
args = []
if len(query_params) > 0:
args.append("params")
args.append(len(query_params) * 2)
for key, value in query_params.items():
args.append(key)
args.append(value)
return args

def _mk_query_args(self, query, query_params: Dict[str, Union[str, int, float]]):
args = [self.index_name]

if isinstance(query, str):
Expand All @@ -387,9 +398,16 @@ def _mk_query_args(self, query):
raise ValueError(f"Bad query type {type(query)}")

args += query.get_args()
if query_params is not None:
args += self.get_params_args(query_params)

return args, query

def search(self, query):
def search(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""
Search the index for a given query, and return a result of documents

Expand All @@ -401,7 +419,7 @@ def search(self, query):

For more information: https://oss.redis.com/redisearch/Commands/#ftsearch
""" # noqa
args, query = self._mk_query_args(query)
args, query = self._mk_query_args(query, query_params=query_params)
st = time.time()
res = self.execute_command(SEARCH_CMD, *args)

Expand All @@ -413,18 +431,26 @@ def search(self, query):
with_scores=query._with_scores,
)

def explain(self, query):
def explain(
self,
query=Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""Returns the execution plan for a complex query.

For more information: https://oss.redis.com/redisearch/Commands/#ftexplain
""" # noqa
args, query_text = self._mk_query_args(query)
args, query_text = self._mk_query_args(query, query_params=query_params)
return self.execute_command(EXPLAIN_CMD, *args)

def explain_cli(self, query): # noqa
def explain_cli(self, query: Union[str, Query]): # noqa
raise NotImplementedError("EXPLAINCLI will not be implemented.")

def aggregate(self, query):
def aggregate(
self,
query: Union[str, Query],
query_params: Dict[str, Union[str, int, float]] = None,
):
"""
Issue an aggregation query.

Expand All @@ -445,6 +471,8 @@ def aggregate(self, query):
cmd = [CURSOR_CMD, "READ", self.index_name] + query.build_args()
else:
raise ValueError("Bad query", query)
if query_params is not None:
cmd += self.get_params_args(query_params)

raw = self.execute_command(*cmd)
return self._get_AggregateResult(raw, query, has_cursor)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
long_description_content_type="text/markdown",
keywords=["Redis", "key-value store", "database"],
license="MIT",
version="4.1.3",
version="4.1.4",
packages=find_packages(
include=[
"redis",
Expand Down
Loading