Skip to content

Commit e579d7b

Browse files
committed
[FIX] spreadsheet: batch process spreadsheet_revision.commands
Some databases have `spreadsheet_revision` records with over 10 million characters in `commands`. If the number of such records is high, loading them all at once leads to memory errors. We distribute the records into buckets of `memory_cap` maximum size and use a named cursor to process them one bucket at a time. Commands larger than `memory_cap` are each placed in a bucket of their own.
1 parent 86409e5 commit e579d7b

File tree

1 file changed

+54
-24
lines changed

1 file changed

+54
-24
lines changed

src/util/spreadsheet/misc.py

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,62 @@
1-
from .. import json
1+
from .. import json, pg
2+
3+
# Size budget for one bucket of `spreadsheet_revision.commands` values fetched
# by `iter_commands`. The query compares it against LENGTH(commands) and uses
# it as the divisor of the running length sum to assign bucket numbers.
# NOTE(review): LENGTH() presumably counts characters rather than bytes, so
# "200MB" is an approximation - confirm.
MEMORY_CAP = 2 * 10**8 # 200MB
24

35

46
def iter_commands(cr, like_all=(), like_any=()):
    """
    Yield the `commands` list of every matching `spreadsheet_revision` row.

    Rows are selected on their raw `commands` text with `LIKE ALL(patterns)`
    when `like_all` is given, or `LIKE ANY(patterns)` when `like_any` is given.
    Exactly one of the two must be non-empty.

    Rows whose JSON payload has no top-level "commands" key are skipped.

    This generator supports `send()`: after each yielded list, the caller may
    send a truthy value to force the row to be written back, or a falsy
    non-None value to skip the write. When nothing is sent (plain iteration),
    the row is written back only if the decoded payload was mutated in place,
    which is detected by comparing key-sorted JSON dumps before and after.

    To bound memory usage, rows are not fetched all at once: the query groups
    rows no longer than MEMORY_CAP into buckets keyed by the running sum of
    their lengths divided by MEMORY_CAP, and returns each longer row as a
    bucket of its own. Buckets are read through a named cursor.

    :param cr: database cursor used to build the query and run the UPDATEs
    :param like_all: LIKE patterns that must all match `commands`
    :param like_any: LIKE patterns of which at least one must match `commands`
    :raises ValueError: if both or neither of `like_all`/`like_any` are given
    """
    if not (bool(like_all) ^ bool(like_any)):
        raise ValueError("Please specify `like_all` or `like_any`, not both")

    # NOTE(review): `itersize=1` presumably makes the named cursor transfer one
    # result row (i.e. one bucket) per round trip - confirm in `pg.named_cursor`.
    with pg.named_cursor(cr, itersize=1) as ncr:
        ncr.execute(
            pg.format_query(
                cr,
                """
                WITH filtered AS (
                    SELECT id,
                           commands,
                           LENGTH(commands) AS commands_length
                      FROM spreadsheet_revision
                     WHERE commands LIKE {condition} (%s::text[])
                ), smaller AS (
                    SELECT id,
                           commands,
                           sum(commands_length) OVER (ORDER BY id) / %s AS num
                      FROM filtered
                     WHERE commands_length <= %s
                )
                SELECT array_agg(id ORDER BY id),
                       array_agg(commands ORDER BY id)
                  FROM smaller
                 GROUP BY num

                 UNION ALL

                SELECT ARRAY[id],
                       ARRAY[commands]
                  FROM filtered
                 WHERE commands_length > %s
                """,
                condition=pg.SQLStr("ALL" if like_all else "ANY"),
            ),
            [list(like_any or like_all), MEMORY_CAP, MEMORY_CAP, MEMORY_CAP],
        )
        # Each result row is one bucket: two parallel arrays of ids and payloads.
        for ids, commands in ncr:
            for revision_id, data in zip(ids, commands):
                data_loaded = json.loads(data)
                if "commands" not in data_loaded:
                    continue
                # Snapshot taken before the caller gets a chance to mutate the payload.
                data_old = json.dumps(data_loaded, sort_keys=True)

                changed = yield data_loaded["commands"]
                if changed is None:
                    # Nothing was sent: detect in-place modifications ourselves.
                    changed = data_old != json.dumps(data_loaded, sort_keys=True)

                if changed:
                    # Writes go through the regular cursor, not the named one.
                    cr.execute(
                        "UPDATE spreadsheet_revision SET commands=%s WHERE id=%s",
                        [json.dumps(data_loaded), revision_id],
                    )
3060

3161

3262
def process_commands(cr, callback, *args, **kwargs):

0 commit comments

Comments
 (0)