From 1448a65e656a270caf4c6d3ba4eee5de04d0c4e1 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Sun, 21 Nov 2021 13:17:31 +0200 Subject: [PATCH 1/6] Adding support for non-decodable commands Some commands (i.e DUMP) should never have their response decoded, as they return binaries, not encoded blobs fixes #1254 --- redis/client.py | 8 +++++++- redis/commands/core.py | 5 ++++- redis/connection.py | 15 +++++++++------ redis/sentinel.py | 4 ++-- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/redis/client.py b/redis/client.py index dc6693d111..b218c1d0d9 100755 --- a/redis/client.py +++ b/redis/client.py @@ -27,6 +27,9 @@ SYM_EMPTY = b'' EMPTY_RESPONSE = 'EMPTY_RESPONSE' +# some responses (ie. dump) are binary, and just meant to never be decoded +NEVER_DECODE = 'NEVER_DECODE' + def timestamp_to_datetime(response): "Converts a unix timestamp to a Python datetime object" @@ -1081,7 +1084,10 @@ def execute_command(self, *args, **options): def parse_response(self, connection, command_name, **options): "Parses a response from the Redis server" try: - response = connection.read_response() + if NEVER_DECODE in options: + response = connection.read_response(disable_decoding=True) + else: + response = connection.read_response() except ResponseError: if EMPTY_RESPONSE in options: return options[EMPTY_RESPONSE] diff --git a/redis/commands/core.py b/redis/commands/core.py index 516e7d9c83..09fcc5d2aa 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -801,7 +801,10 @@ def dump(self, name): Return a serialized version of the value stored at the specified key. If key does not exist a nil bulk reply is returned. """ - return self.execute_command('DUMP', name) + from redis.client import NEVER_DECODE + options = {} + options[NEVER_DECODE] = [] + return self.execute_command('DUMP', name, **options) def exists(self, *names): "Returns the number of ``names`` that exist" diff --git a/redis/connection.py b/redis/connection.py index e01742d70e..3de87aeb40 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -314,7 +314,7 @@ def on_disconnect(self): def can_read(self, timeout): return self._buffer and self._buffer.can_read(timeout) - def read_response(self): + def read_response(self, disable_decoding=False): raw = self._buffer.readline() if not raw: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) @@ -354,8 +354,9 @@ def read_response(self): length = int(response) if length == -1: return None - response = [self.read_response() for i in range(length)] - if isinstance(response, bytes): + response = [self.read_response(disable_decoding=disable_decoding) + for i in range(length)] + if isinstance(response, bytes) and disable_decoding is False: response = self.encoder.decode(response) return response @@ -449,7 +450,7 @@ def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True): if custom_timeout: sock.settimeout(self._socket_timeout) - def read_response(self): + def read_response(self, disable_decoding=False): if not self._reader: raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) @@ -742,10 +743,12 @@ def can_read(self, timeout=0): self.connect() return self._parser.can_read(timeout) - def read_response(self): + def read_response(self, disable_decoding=False): """Read the response from a previously sent command""" try: - response = self._parser.read_response() + response = self._parser.read_response( + disable_decoding=disable_decoding + ) except socket.timeout: self.disconnect() raise TimeoutError("Timeout reading from %s:%s" % diff --git a/redis/sentinel.py b/redis/sentinel.py index 17dd75bf46..3efd58fa39 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -51,9 +51,9 @@ def connect(self): continue raise SlaveNotFoundError # Never be here - def read_response(self): + def read_response(self, disable_decoding=False): try: - return super().read_response() + return super().read_response(disable_decoding=disable_decoding) except ReadOnlyError: if self.connection_pool.is_master: # When talking to a master, a ReadOnlyError when likely From dcf9bdd7f60bebd3792b1736ae109fcd308c80ec Mon Sep 17 00:00:00 2001 From: AvitalFineRedis Date: Tue, 23 Nov 2021 10:08:07 +0100 Subject: [PATCH 2/6] Support `SYNC` and `PSYNC` --- redis/commands/core.py | 6 ++++++ tests/test_commands.py | 12 ++++++++++++ 2 files changed, 18 insertions(+) diff --git a/redis/commands/core.py b/redis/commands/core.py index 5fad0b65f1..2b6b55c574 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -3487,6 +3487,12 @@ def replicaof(self, *args): """ return self.execute_command('REPLICAOF', *args) + def sync(self): + return self.execute_command('SYNC') + + def psync(self, replicationid, offset): + return self.execute_command('PSYNC', replicationid, offset) + def eval(self, script, numkeys, *keys_and_args): """ Execute the Lua ``script``, specifying the ``numkeys`` the script diff --git a/tests/test_commands.py b/tests/test_commands.py index dbd04429b4..fe5aa1ee22 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -3741,6 +3741,18 @@ def test_replicaof(self, r): assert r.replicaof("NO ONE") assert r.replicaof("NO", "ONE") + @skip_if_server_version_lt('2.8.0') + def test_sync(self, r): + r2 = redis.Redis(port=6380, decode_responses=True) + res = r2.sync() + assert b'REDIS' in res + + @skip_if_server_version_lt('2.8.0') + def test_psync(self, r): + r2 = redis.Redis(port=6380, decode_responses=True) + res = r2.psync(r2.client_id(), 1) + assert b'FULLRESYNC' in res + class TestBinarySave: From 0047fe15786fdcaaa84e7628879d4b6ceafb778d Mon Sep 17 00:00:00 2001 From: AvitalFineRedis Date: Tue, 23 Nov 2021 10:11:17 +0100 Subject: [PATCH 3/6] decode response --- redis/commands/core.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/redis/commands/core.py b/redis/commands/core.py index 8ebdd781c8..ca2621e7db 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -3491,10 +3491,16 @@ def replicaof(self, *args): return self.execute_command('REPLICAOF', *args) def sync(self): - return self.execute_command('SYNC') + from redis.client import NEVER_DECODE + options = {} + options[NEVER_DECODE] = [] + return self.execute_command('SYNC', **options) def psync(self, replicationid, offset): - return self.execute_command('PSYNC', replicationid, offset) + from redis.client import NEVER_DECODE + options = {} + options[NEVER_DECODE] = [] + return self.execute_command('PSYNC', replicationid, offset, **options) def eval(self, script, numkeys, *keys_and_args): """ From 37dfc9fa4e9b3f756f9da7c29f016d5b2fce3aa1 Mon Sep 17 00:00:00 2001 From: AvitalFineRedis Date: Tue, 23 Nov 2021 10:15:45 +0100 Subject: [PATCH 4/6] docstring --- redis/commands/core.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/redis/commands/core.py b/redis/commands/core.py index ca2621e7db..a4ad1f44f3 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -3491,12 +3491,23 @@ def replicaof(self, *args): return self.execute_command('REPLICAOF', *args) def sync(self): + """ + Initiates a replication stream from the master. + + For more information check https://redis.io/commands/sync + """ from redis.client import NEVER_DECODE options = {} options[NEVER_DECODE] = [] return self.execute_command('SYNC', **options) def psync(self, replicationid, offset): + """ + Initiates a replication stream from the master. + Newer version for `sync`. + + For more information check https://redis.io/commands/sync + """ from redis.client import NEVER_DECODE options = {} options[NEVER_DECODE] = [] From e9119657c59d6bbaa8ad9c119ad3bb6f88c75add Mon Sep 17 00:00:00 2001 From: Avital Fine <79420960+AvitalFineRedis@users.noreply.github.com> Date: Mon, 29 Nov 2021 09:29:20 +0100 Subject: [PATCH 5/6] Update core.py --- redis/commands/core.py | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/redis/commands/core.py b/redis/commands/core.py index 5ed9231528..96e35d8964 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -3568,37 +3568,11 @@ def pubsub_numsub(self, *args): return self.execute_command('PUBSUB NUMSUB', *args) -<<<<<<<<< Temporary merge branch 1 - def sync(self): - """ - Initiates a replication stream from the master. - - For more information check https://redis.io/commands/sync - """ - from redis.client import NEVER_DECODE - options = {} - options[NEVER_DECODE] = [] - return self.execute_command('SYNC', **options) - - def psync(self, replicationid, offset): - """ - Initiates a replication stream from the master. - Newer version for `sync`. - - For more information check https://redis.io/commands/sync - """ - from redis.client import NEVER_DECODE - options = {} - options[NEVER_DECODE] = [] - return self.execute_command('PSYNC', replicationid, offset, **options) - -========= class ScriptCommands: """ Redis Lua script commands. see: https://redis.com/ebook/part-3-next-steps/chapter-11-scripting-redis-with-lua/ """ ->>>>>>>>> Temporary merge branch 2 def eval(self, script, numkeys, *keys_and_args): """ Execute the Lua ``script``, specifying the ``numkeys`` the script From 64ae74a58e5f31f8e2744228e1feb1e05c91ec72 Mon Sep 17 00:00:00 2001 From: "Chayim I. Kirshen" Date: Wed, 15 Dec 2021 16:00:39 +0200 Subject: [PATCH 6/6] Connection change --- redis/commands/core.py | 6 ++++-- redis/connection.py | 2 +- tests/test_commands.py | 12 ++++++------ 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/redis/commands/core.py b/redis/commands/core.py index 1ae94d86d7..21f9f61d6e 100644 --- a/redis/commands/core.py +++ b/redis/commands/core.py @@ -644,9 +644,10 @@ def sync(self): For more information check https://redis.io/commands/sync """ from redis.client import NEVER_DECODE + options = {} options[NEVER_DECODE] = [] - return self.execute_command('SYNC', **options) + return self.execute_command("SYNC", **options) def psync(self, replicationid, offset): """ @@ -656,9 +657,10 @@ def psync(self, replicationid, offset): For more information check https://redis.io/commands/sync """ from redis.client import NEVER_DECODE + options = {} options[NEVER_DECODE] = [] - return self.execute_command('PSYNC', replicationid, offset, **options) + return self.execute_command("PSYNC", replicationid, offset, **options) def swapdb(self, first, second, **kwargs): """ diff --git a/redis/connection.py b/redis/connection.py index 2001c6447b..dc04f8f240 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -382,7 +382,7 @@ def __del__(self): except Exception: pass - def on_connect(self, connection): + def on_connect(self, connection, **kwargs): self._sock = connection._sock self._socket_timeout = connection.socket_timeout kwargs = { diff --git a/tests/test_commands.py b/tests/test_commands.py index a7b67d3b64..b8dc69f9eb 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -4151,17 +4151,17 @@ def test_replicaof(self, r): assert r.replicaof("NO ONE") assert r.replicaof("NO", "ONE") - @skip_if_server_version_lt('2.8.0') + @skip_if_server_version_lt("2.8.0") def test_sync(self, r): - r2 = redis.Redis(port=6380, decode_responses=True) + r2 = redis.Redis(port=6380, decode_responses=False) res = r2.sync() - assert b'REDIS' in res + assert b"REDIS" in res - @skip_if_server_version_lt('2.8.0') + @skip_if_server_version_lt("2.8.0") def test_psync(self, r): - r2 = redis.Redis(port=6380, decode_responses=True) + r2 = redis.Redis(port=6380, decode_responses=False) res = r2.psync(r2.client_id(), 1) - assert b'FULLRESYNC' in res + assert b"FULLRESYNC" in res @pytest.mark.onlynoncluster