Skip to content

Commit 971ed8f

Browse files
committed
Using ExecutionPlan with GRAPH.EXPLAIN, and added an execution plan interface.
syncing with latest redisgraph-py, prior to its deprecation
1 parent e6ccc9c commit 971ed8f

File tree

3 files changed

+351
-36
lines changed

3 files changed

+351
-36
lines changed

redis/commands/graph/commands.py

Lines changed: 31 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from redis.exceptions import ResponseError
33

44
from .exceptions import VersionMismatchException
5+
from .execution_plan import ExecutionPlan
56
from .query_result import QueryResult
67

78

@@ -118,27 +119,6 @@ def flush(self):
118119
self.nodes = {}
119120
self.edges = []
120121

121-
def explain(self, query, params=None):
122-
"""
123-
Get the execution plan for given query,
124-
Returns an array of operations.
125-
For more information see `GRAPH.EXPLAIN <https://oss.redis.com/redisgraph/master/commands/#graphexplain>`_. # noqa
126-
127-
Args:
128-
129-
query:
130-
The query that will be executed.
131-
params: dict
132-
Query parameters.
133-
"""
134-
if params is not None:
135-
query = self._build_params_header(params) + query
136-
137-
plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
138-
if isinstance(plan[0], bytes):
139-
plan = [b.decode() for b in plan]
140-
return "\n".join(plan)
141-
142122
def bulk(self, **kwargs):
143123
"""Internal only. Not supported."""
144124
raise NotImplementedError(
@@ -200,3 +180,33 @@ def list_keys(self):
200180
For more information see `GRAPH.LIST <https://oss.redis.com/redisgraph/master/commands/#graphlist>`_. # noqa
201181
"""
202182
return self.execute_command("GRAPH.LIST")
183+
184+
def execution_plan(self, query, params=None):
185+
"""
186+
Get the execution plan for given query,
187+
GRAPH.EXPLAIN returns an array of operations.
188+
189+
Args:
190+
query: the query that will be executed
191+
params: query parameters
192+
"""
193+
if params is not None:
194+
query = self._build_params_header(params) + query
195+
196+
plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
197+
return "\n".join(plan)
198+
199+
def explain(self, query, params=None):
200+
"""
201+
Get the execution plan for given query,
202+
GRAPH.EXPLAIN returns ExecutionPlan object.
203+
204+
Args:
205+
query: the query that will be executed
206+
params: query parameters
207+
"""
208+
if params is not None:
209+
query = self._build_params_header(params) + query
210+
211+
plan = self.execute_command("GRAPH.EXPLAIN", self.name, query)
212+
return ExecutionPlan(plan)
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
import re
2+
3+
4+
class ProfileStats:
5+
"""
6+
ProfileStats, runtime execution statistics of operation.
7+
"""
8+
9+
def __init__(self, records_produced, execution_time):
10+
self.records_produced = records_produced
11+
self.execution_time = execution_time
12+
13+
14+
class Operation:
15+
"""
16+
Operation, single operation within execution plan.
17+
"""
18+
19+
def __init__(self, name, args=None, profile_stats=None):
20+
"""
21+
Create a new operation.
22+
23+
Args:
24+
name: string that represents the name of the operation
25+
args: operation arguments
26+
profile_stats: profile statistics
27+
"""
28+
self.name = name
29+
self.args = args
30+
self.profile_stats = profile_stats
31+
self.children = []
32+
33+
def append_child(self, child):
34+
if not isinstance(child, Operation) or self is child:
35+
raise Exception("child must be Operation")
36+
37+
self.children.append(child)
38+
return self
39+
40+
def child_count(self):
41+
return len(self.children)
42+
43+
def __eq__(self, o: object) -> bool:
44+
if not isinstance(o, Operation):
45+
return False
46+
47+
return self.name == o.name and self.args == o.args
48+
49+
def __str__(self) -> str:
50+
args_str = "" if self.args is None else " | " + self.args
51+
return f"{self.name}{args_str}"
52+
53+
54+
class ExecutionPlan:
55+
"""
56+
ExecutionPlan, collection of operations.
57+
"""
58+
59+
def __init__(self, plan):
60+
"""
61+
Create a new execution plan.
62+
63+
Args:
64+
plan: array of strings that represents the collection operations
65+
the output from GRAPH.EXPLAIN
66+
"""
67+
if not isinstance(plan, list):
68+
raise Exception("plan must be an array")
69+
70+
self.plan = plan
71+
self.structured_plan = self._operation_tree()
72+
73+
def _compare_operations(self, root_a, root_b):
74+
"""
75+
Compare execution plan operation tree
76+
77+
Return: True if operation trees are equal, False otherwise
78+
"""
79+
80+
# compare current root
81+
if root_a != root_b:
82+
return False
83+
84+
# make sure root have the same number of children
85+
if root_a.child_count() != root_b.child_count():
86+
return False
87+
88+
# recursively compare children
89+
for i in range(root_a.child_count()):
90+
if not self._compare_operations(root_a.children[i], root_b.children[i]):
91+
return False
92+
93+
return True
94+
95+
def __str__(self) -> str:
96+
def aggraget_str(str_children):
97+
return "\n".join(
98+
[
99+
" " + line
100+
for str_child in str_children
101+
for line in str_child.splitlines()
102+
]
103+
)
104+
105+
def combine_str(x, y):
106+
return f"{x}\n{y}"
107+
108+
return self._operation_traverse(
109+
self.structured_plan, str, aggraget_str, combine_str
110+
)
111+
112+
def __eq__(self, o: object) -> bool:
113+
"""Compares two execution plans
114+
115+
Return: True if the two plans are equal False otherwise
116+
"""
117+
# make sure 'o' is an execution-plan
118+
if not isinstance(o, ExecutionPlan):
119+
return False
120+
121+
# get root for both plans
122+
root_a = self.structured_plan
123+
root_b = o.structured_plan
124+
125+
# compare execution trees
126+
return self._compare_operations(root_a, root_b)
127+
128+
def _operation_traverse(self, op, op_f, aggregate_f, combine_f):
129+
"""
130+
Traverse operation tree recursively applying functions
131+
132+
Args:
133+
op: operation to traverse
134+
op_f: function applied for each operation
135+
aggregate_f: aggregation function applied for all children of a single operation
136+
combine_f: combine function applied for the operation result and the children result
137+
""" # noqa
138+
# apply op_f for each operation
139+
op_res = op_f(op)
140+
if len(op.children) == 0:
141+
return op_res # no children return
142+
else:
143+
# apply _operation_traverse recursively
144+
children = [
145+
self._operation_traverse(child, op_f, aggregate_f, combine_f)
146+
for child in op.children
147+
]
148+
# combine the operation result with the children aggregated result
149+
return combine_f(op_res, aggregate_f(children))
150+
151+
def _operation_tree(self):
152+
"""Build the operation tree from the string representation"""
153+
154+
# initial state
155+
i = 0
156+
level = 0
157+
stack = []
158+
current = None
159+
160+
def _create_operation(args):
161+
profile_stats = None
162+
name = args[0].strip()
163+
args.pop(0)
164+
if len(args) > 0 and "Records produced" in args[-1]:
165+
records_produced = int(
166+
re.search("Records produced: (\\d+)", args[-1]).group(1)
167+
)
168+
execution_time = float(
169+
re.search("Execution time: (\\d+.\\d+) ms", args[-1]).group(1)
170+
)
171+
profile_stats = ProfileStats(records_produced, execution_time)
172+
args.pop(-1)
173+
return Operation(
174+
name, None if len(args) == 0 else args[0].strip(), profile_stats
175+
)
176+
177+
# iterate plan operations
178+
while i < len(self.plan):
179+
current_op = self.plan[i]
180+
op_level = current_op.count(" ")
181+
if op_level == level:
182+
# if the operation level equal to the current level
183+
# set the current operation and move next
184+
child = _create_operation(current_op.split("|"))
185+
if current:
186+
current = stack.pop()
187+
current.append_child(child)
188+
current = child
189+
i += 1
190+
elif op_level == level + 1:
191+
# if the operation is child of the current operation
192+
# add it as child and set as current operation
193+
child = _create_operation(current_op.split("|"))
194+
current.append_child(child)
195+
stack.append(current)
196+
current = child
197+
level += 1
198+
i += 1
199+
elif op_level < level:
200+
# if the operation is not child of current operation
201+
# go back to it's parent operation
202+
levels_back = level - op_level + 1
203+
for _ in range(levels_back):
204+
current = stack.pop()
205+
level -= levels_back
206+
else:
207+
raise Exception("corrupted plan")
208+
return stack[0]

0 commit comments

Comments
 (0)