[ADD] util/update_table_from_dict #297

Open · wants to merge 1 commit into base: master
109 changes: 109 additions & 0 deletions src/base/tests/test_util.py

@@ -881,6 +881,115 @@ def test_parallel_execute_retry_on_serialization_failure(self):
cr.execute(util.format_query(cr, "SELECT 1 FROM {}", TEST_TABLE_NAME))
self.assertFalse(cr.rowcount)

def test_update_one_col_from_dict(self):
TEST_TABLE_NAME = "_upgrade_update_one_col_from_dict_test_table"
N_ROWS = 10

cr = self._get_cr()

cr.execute(
util.format_query(
cr,
"""
DROP TABLE IF EXISTS {table};

CREATE TABLE {table} (
id SERIAL PRIMARY KEY,
col1 INTEGER,
col2 INTEGER
);

INSERT INTO {table} (col1, col2) SELECT v, v FROM GENERATE_SERIES(1, %s) as v;
""",
table=TEST_TABLE_NAME,
),
[N_ROWS],
)
mapping = {id: id * 2 for id in range(1, N_ROWS + 1, 2)}
util.update_table_from_dict(cr, TEST_TABLE_NAME, "col1", mapping)

cr.execute(
util.format_query(
cr,
"SELECT id FROM {table} WHERE col2 != id",
table=TEST_TABLE_NAME,
)
)
self.assertFalse(cr.rowcount, "unintended column 'col2' is affected")

cr.execute(
util.format_query(
cr,
"SELECT id FROM {table} WHERE col1 != id AND MOD(id, 2) = 0",
table=TEST_TABLE_NAME,
)
)
self.assertFalse(cr.rowcount, "unintended rows are affected")

cr.execute(
util.format_query(
cr,
"SELECT id FROM {table} WHERE col1 != 2 * id AND MOD(id, 2) = 1",
table=TEST_TABLE_NAME,
)
)
self.assertFalse(cr.rowcount, "partial/incorrect updates are performed")

def test_update_multiple_cols_from_dict(self):
TEST_TABLE_NAME = "_upgrade_update_multiple_cols_from_dict_test_table"
N_ROWS = 10

cr = self._get_cr()

cr.execute(
util.format_query(
cr,
"""
DROP TABLE IF EXISTS {table};

CREATE TABLE {table} (
id SERIAL PRIMARY KEY,
col1 INTEGER,
col2 INTEGER,
col3 INTEGER
);

INSERT INTO {table} (col1, col2, col3) SELECT v, v, v FROM GENERATE_SERIES(1, %s) as v;
""",
table=TEST_TABLE_NAME,
),
[N_ROWS],
)
mapping = {id: [id * 2, id * 3] for id in range(1, N_ROWS + 1, 2)}
util.update_table_from_dict(cr, TEST_TABLE_NAME, ["col1", "col2"], mapping)

cr.execute(
util.format_query(
cr,
"SELECT id FROM {table} WHERE col3 != id",
table=TEST_TABLE_NAME,
)
)
self.assertFalse(cr.rowcount, "unintended column 'col3' is affected")

cr.execute(
util.format_query(
cr,
"SELECT id FROM {table} WHERE col1 != id AND MOD(id, 2) = 0",
table=TEST_TABLE_NAME,
)
)
self.assertFalse(cr.rowcount, "unintended rows are affected")

cr.execute(
util.format_query(
cr,
"SELECT id FROM {table} WHERE (col1 != 2 * id OR col2 != 3 * id) AND MOD(id, 2) = 1",
table=TEST_TABLE_NAME,
)
)
self.assertFalse(cr.rowcount, "partial/incorrect updates are performed")

def test_create_column_with_fk(self):
cr = self.env.cr
self.assertFalse(util.column_exists(cr, "res_partner", "_test_lang_id"))
70 changes: 70 additions & 0 deletions src/util/pg.py

@@ -32,6 +32,7 @@
import psycopg2
from psycopg2 import errorcodes, sql
from psycopg2.extensions import quote_ident
from psycopg2.extras import Json

try:
from odoo.modules import module as odoo_module
@@ -1621,3 +1622,72 @@ def create_id_sequence(cr, table, set_as_default=True):
table=table_sql,
)
)


def update_table_from_dict(cr, table, columns, mapping, key_col="id"):
"""
Update rows of `table` based on `mapping`.

Each `mapping` entry provides the new values of `columns` for the row(s) whose `key_col` value matches the entry's key.

.. example::

.. code-block:: python

# single column update
util.update_table_from_dict(
cr,
"account_move",
"always_tax_eligible",
{
1: True,
2: False,
},
)

# multi-column update
util.update_table_from_dict(
cr,
"account_move",
["closing_return_id", "always_tax_eligible"],
{
1: [2, True],
2: [3, False],
},
)

:param str table: name of the table to update
:param str | list[str] columns: column(s) of `table` to update
:param dict[any, any | list[any]] mapping: maps values of `key_col` to the new values of `columns` (in the **same** order)
:param str key_col: column of `table` to match against the keys of `mapping`
"""
_validate_table(table)
@KangOl (Contributor) commented on Jul 29, 2025:

I would also like to have a validation of the mapping.

Suggested change:

    _validate_table(table)
    assert isinstance(mapping, dict) and all(isinstance(key, int) and isinstance(value, (list, tuple)) for key, value in mapping.items())

@Pirols (Contributor, Author) replied on Jul 29, 2025:

Do we want to keep key_col and limit its potential targets to integer columns, or remove it entirely?

A contributor replied:

I think key_col should always be an int, but I like having it variable: it would allow updating a table based on an FK instead of a PK. To decide, you can grep the repo and see whether we currently use the dict trick for columns other than id... I think we did, but my memory could betray me.
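
As an illustration of that point, a call keyed on a foreign-key column rather than the primary key could look like the following sketch (table and column names are only illustrative, not taken from this PR):

    # Hypothetical sketch: update all lines of moves 101 and 102, matching on
    # the move_id foreign key instead of the primary key.
    util.update_table_from_dict(
        cr,
        "account_move_line",
        "company_id",
        {
            101: 2,  # every line of move 101 gets company_id = 2
            102: 3,  # every line of move 102 gets company_id = 3
        },
        key_col="move_id",
    )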

@Pirols (Contributor, Author) replied. [comment not captured in this view]

A contributor replied:

OK, then maybe it's worth taking the type from the key_col in the table directly.

@Pirols (Contributor, Author) replied:

That should not be necessary, since JSONB_EACH returns the key as text [1].
Unless we want to validate the type of the keys of mapping, in which case I would need to define (or reuse, if already available) a mapping from pg types to Python types. Wdyt?

[1] https://www.postgresql.org/docs/current/functions-json.html#FUNCTIONS-JSON-PROCESSING-TABLE
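
A quick sketch of that behaviour (illustrative only; it assumes a live cursor `cr` and relies on psycopg2's `Json` adapter serializing the integer keys as JSON strings):

    from psycopg2.extras import Json

    # JSONB_EACH yields every key as text, whatever the original Python key type,
    # which is why the helper compares t.{key_col}::text = m.key.
    cr.execute("SELECT key, pg_typeof(key)::text FROM JSONB_EACH(%s)", [Json({1: 10, 2: 20})])
    print(cr.fetchall())  # [('1', 'text'), ('2', 'text')]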

A contributor replied:

Well, I just noticed you are casting the key_col to text before comparing it with the actual json dict key, so it's fine. :) What I had in mind was casting the dict key to the column type. In principle my suggestion was to block/fail if we get an invalid json dict (incompatible with the table), but I have no strong opinion against the current approach TBH. @KangOl wdyt?
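
For reference, the alternative floated there would roughly invert the cast in the WHERE comparison. A sketch only, not what this PR implements, reusing `format_query`, `SQLStr` and `column_type` as the function below does:

    # Sketch: cast the JSON key to the key column's type instead of casting the
    # column to text, so a mapping incompatible with the table fails at
    # execution time rather than silently matching nothing.
    where_clause = format_query(
        cr,
        "t.{key_col} = m.key::{key_type}",
        key_col=key_col,
        key_type=SQLStr(column_type(cr, table, key_col)),
    )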

if not columns or not mapping:
return

assert isinstance(mapping, dict)
if isinstance(columns, str):
columns = [columns]
else:
n_columns = len(columns)
assert all(isinstance(value, (list, tuple)) and len(value) == n_columns for value in mapping.values())

query = format_query(
cr,
"""
UPDATE {table} t
SET ({cols}) = ROW({cols_values})
FROM JSONB_EACH(%s) m
WHERE t.{key_col}::text = m.key
""",
table=table,
cols=ColumnList.from_unquoted(cr, columns),
cols_values=SQLStr(
", ".join(
"(m.value->>{:d})::{}".format(col_idx, column_type(cr, table, col_name))
for col_idx, col_name in enumerate(columns)
)
),
key_col=key_col,
)
cr.execute(query, [Json(mapping)])
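
For completeness, a minimal usage sketch mirroring the single-column test above (the throwaway table name is illustrative; it assumes a live cursor `cr` with the helper in scope):

    cr.execute("CREATE TABLE _upgrade_utfd_demo (id SERIAL PRIMARY KEY, col1 INTEGER)")
    cr.execute("INSERT INTO _upgrade_utfd_demo (col1) SELECT v FROM GENERATE_SERIES(1, 3) v")

    # Double col1 for rows 1 and 3; row 2 is left untouched.
    update_table_from_dict(cr, "_upgrade_utfd_demo", "col1", {1: 2, 3: 6})

    cr.execute("SELECT id, col1 FROM _upgrade_utfd_demo ORDER BY id")
    print(cr.fetchall())  # [(1, 2), (2, 2), (3, 6)]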