From dad144bc40f983aa92758e690b0a24bf39e8c610 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sat, 10 Sep 2022 20:13:31 +0300 Subject: [PATCH 01/14] Cluster&AsyncCluster: Removed handling of timeouts/connection errors within the cluster loop, fixed "cannot pickle '_thread.lock' object" bug, added client's side failover handling improvements --- redis/__init__.py | 2 + redis/asyncio/__init__.py | 2 + redis/asyncio/cluster.py | 98 +++++++-------- redis/asyncio/connection.py | 2 +- redis/backoff.py | 15 ++- redis/cluster.py | 187 ++++++++++++++--------------- tests/test_asyncio/test_cluster.py | 39 ++++++ tests/test_cluster.py | 44 +++++++ 8 files changed, 234 insertions(+), 155 deletions(-) diff --git a/redis/__init__.py b/redis/__init__.py index b7560a6715..99690b1205 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -1,5 +1,6 @@ import sys +from redis.backoff import get_default_backoff from redis.client import Redis, StrictRedis from redis.cluster import RedisCluster from redis.connection import ( @@ -64,6 +65,7 @@ def int_or_str(value): "ConnectionPool", "DataError", "from_url", + "get_default_backoff", "InvalidResponse", "PubSubError", "ReadOnlyError", diff --git a/redis/asyncio/__init__.py b/redis/asyncio/__init__.py index 598791ac15..00a809a2a5 100644 --- a/redis/asyncio/__init__.py +++ b/redis/asyncio/__init__.py @@ -15,6 +15,7 @@ SentinelManagedSSLConnection, ) from redis.asyncio.utils import from_url +from redis.backoff import get_default_backoff from redis.exceptions import ( AuthenticationError, AuthenticationWrongNumberOfArgsError, @@ -43,6 +44,7 @@ "ConnectionPool", "DataError", "from_url", + "get_default_backoff", "InvalidResponse", "PubSubError", "ReadOnlyError", diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 8d34b9ad21..07d0688035 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -25,6 +25,8 @@ parse_url, ) from redis.asyncio.parser import CommandsParser +from redis.asyncio.retry import Retry +from 
redis.backoff import get_default_backoff from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis from redis.cluster import ( PIPELINE_BLOCKED_COMMANDS, @@ -134,7 +136,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered :param connection_error_retry_attempts: | Number of times to retry before reinitializing when :class:`~.TimeoutError` - or :class:`~.ConnectionError` are encountered + or :class:`~.ConnectionError` are encountered. + The default backoff strategy will be set if Retry object is not passed (see + get_default_backoff in backoff.py). To change it, pass a custom Retry object + using the "retry" keyword. :param max_connections: | Maximum number of connections per node. If there are no free connections & the maximum number of connections are already created, a @@ -214,7 +219,7 @@ def __init__( read_from_replicas: bool = False, reinitialize_steps: int = 10, cluster_error_retry_attempts: int = 3, - connection_error_retry_attempts: int = 5, + connection_error_retry_attempts: int = 3, max_connections: int = 2**31, # Client related kwargs db: Union[str, int] = 0, @@ -232,6 +237,8 @@ def __init__( socket_keepalive: bool = False, socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, socket_timeout: Optional[float] = None, + retry: Optional["Retry"] = None, + retry_on_error: Optional[list[Exception]] = None, # SSL related kwargs ssl: bool = False, ssl_ca_certs: Optional[str] = None, @@ -278,6 +285,7 @@ def __init__( "socket_keepalive": socket_keepalive, "socket_keepalive_options": socket_keepalive_options, "socket_timeout": socket_timeout, + "retry": retry, } if ssl: @@ -298,6 +306,17 @@ def __init__( # Call our on_connect function to configure READONLY mode kwargs["redis_connect_func"] = self.on_connect + if retry or retry_on_error or connection_error_retry_attempts > 0: + # Set a retry object for all cluster nodes + 
self.retry = retry or Retry( + get_default_backoff(), connection_error_retry_attempts + ) + if retry_on_error is None: + # Default errors for retrying + retry_on_error = [ConnectionError, TimeoutError] + self.retry.update_supported_errors(retry_on_error) + kwargs.update({"retry": self.retry}) + kwargs["response_callbacks"] = self.__class__.RESPONSE_CALLBACKS.copy() self.connection_kwargs = kwargs @@ -318,8 +337,7 @@ def __init__( self.read_from_replicas = read_from_replicas self.reinitialize_steps = reinitialize_steps self.cluster_error_retry_attempts = cluster_error_retry_attempts - self.connection_error_retry_attempts = connection_error_retry_attempts - + self.retry = retry self.reinitialize_counter = 0 self.commands_parser = CommandsParser() self.node_flags = self.__class__.NODE_FLAGS.copy() @@ -614,9 +632,9 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: if passed_targets and not self._is_node_flag(passed_targets): target_nodes = self._parse_target_nodes(passed_targets) target_nodes_specified = True - retry_attempts = 1 + retry_attempts = 0 - for _ in range(retry_attempts): + while True: if self._initialize: await self.initialize() try: @@ -654,17 +672,14 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: ) return dict(zip(keys, values)) except Exception as e: - if type(e) in self.__class__.ERRORS_ALLOW_RETRY: - # The nodes and slots cache were reinitialized. + if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: + # The nodes and slots cache were should be reinitialized. # Try again with the new cluster setup. - exception = e + retry_attempts -= 1 + continue else: - # All other errors should be raised. 
- raise - - # If it fails the configured number of times then raise exception back - # to caller of this method - raise exception + # raise the exception + raise e async def _execute_command( self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any @@ -672,7 +687,6 @@ async def _execute_command( asking = moved = False redirect_addr = None ttl = self.RedisClusterRequestTTL - connection_error_retry_counter = 0 while ttl > 0: ttl -= 1 @@ -691,25 +705,18 @@ async def _execute_command( moved = False return await target_node.execute_command(*args, **kwargs) - except BusyLoadingError: + except (BusyLoadingError, MaxConnectionsError): raise except (ConnectionError, TimeoutError) as e: - # Give the node 0.25 seconds to get back up and retry again with the - # same node and configuration. After the defined number of attempts, try - # to reinitialize the cluster and try again. - connection_error_retry_counter += 1 - if ( - connection_error_retry_counter - < self.connection_error_retry_attempts - ): - await asyncio.sleep(0.25) - else: - if isinstance(e, MaxConnectionsError): - raise - # Hard force of reinitialize of the node/slots setup - # and try again with the new setup - await self.close() - raise + # Connection retries are being handled in the node's + # Retry object. + # Remove the failed node from the startup nodes before we try + # to reinitialize the cluster + self.nodes_manager.startup_nodes.pop(target_node.name, None) + # Hard force of reinitialize of the node/slots setup + # and try again with the new setup + await self.close() + raise except ClusterDownError: # ClusterDownError can occur during a failover and to get # self-healed, we will try to reinitialize the cluster layout @@ -1071,26 +1078,11 @@ async def initialize(self) -> None: ) cluster_slots = await startup_node.execute_command("CLUSTER SLOTS") startup_nodes_reachable = True - except (ConnectionError, TimeoutError) as e: + except Exception as e: + # Try the next startup node. 
+ # The exception is saved and raised only if we have no more nodes. exception = e continue - except ResponseError as e: - # Isn't a cluster connection, so it won't parse these - # exceptions automatically - message = e.__str__() - if "CLUSTERDOWN" in message or "MASTERDOWN" in message: - continue - else: - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server: {startup_node}. error: {message}" - ) - except Exception as e: - message = e.__str__() - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server {startup_node.name}. error: {message}" - ) # CLUSTER SLOTS command results in the following output: # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] @@ -1171,8 +1163,8 @@ async def initialize(self) -> None: if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node. " + f"Redis Cluster cannot be connected. Please provide at least " + f"one reachable node" ) from exception # Check if the slots are not fully covered diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index b64bd125eb..01da9a9caf 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -486,7 +486,7 @@ def __init__( retry_on_error.append(socket.timeout) retry_on_error.append(asyncio.TimeoutError) self.retry_on_error = retry_on_error - if retry_on_error: + if retry or retry_on_error: if not retry: self.retry = Retry(NoBackoff(), 1) else: diff --git a/redis/backoff.py b/redis/backoff.py index 5ccdb919f3..2836fdce30 100644 --- a/redis/backoff.py +++ b/redis/backoff.py @@ -1,6 +1,9 @@ import random from abc import ABC, abstractmethod +DEFAULT_CAP = 0.512 +DEFAULT_BASE = 0.008 + class AbstractBackoff(ABC): """Backoff interface""" @@ -40,7 +43,7 @@ def __init__(self): class ExponentialBackoff(AbstractBackoff): """Exponential backoff upon failure""" - def __init__(self, cap, base): + def 
__init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -55,7 +58,7 @@ def compute(self, failures): class FullJitterBackoff(AbstractBackoff): """Full jitter backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -70,7 +73,7 @@ def compute(self, failures): class EqualJitterBackoff(AbstractBackoff): """Equal jitter backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -86,7 +89,7 @@ def compute(self, failures): class DecorrelatedJitterBackoff(AbstractBackoff): """Decorrelated jitter backoff upon failure""" - def __init__(self, cap, base): + def __init__(self, cap=DEFAULT_CAP, base=DEFAULT_BASE): """ `cap`: maximum backoff time in seconds `base`: base backoff time in seconds @@ -103,3 +106,7 @@ def compute(self, failures): temp = random.uniform(self._base, max_backoff) self._previous_backoff = min(self._cap, temp) return self._previous_backoff + + +def get_default_backoff(): + return EqualJitterBackoff() diff --git a/redis/cluster.py b/redis/cluster.py index cee578b075..38ee7cceef 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -5,8 +5,9 @@ import threading import time from collections import OrderedDict -from typing import Any, Callable, Dict, Tuple, Union +from typing import Any, Callable, Dict, Optional, Tuple, Union +from redis.backoff import get_default_backoff from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url @@ -29,6 +30,7 @@ TryAgainError, ) from redis.lock import Lock +from redis.retry import Retry from redis.utils 
import ( dict_merge, list_keys_to_dict, @@ -425,27 +427,30 @@ class initializer. In the case of conflicting arguments, querystring def __init__( self, - host=None, - port=6379, - startup_nodes=None, - cluster_error_retry_attempts=3, - require_full_coverage=False, - reinitialize_steps=10, - read_from_replicas=False, - dynamic_startup_nodes=True, - url=None, + host: Optional[str] = None, + port: int = 6379, + startup_nodes: Optional[list["ClusterNode"]] = None, + cluster_error_retry_attempts: int = 3, + connection_error_retry_attempts: int = 3, + retry: Optional["Retry"] = None, + retry_on_error: Optional[list[Exception]] = None, + require_full_coverage: bool = False, + reinitialize_steps: int = 10, + read_from_replicas: bool = False, + dynamic_startup_nodes: bool = True, + url: Optional[str] = None, **kwargs, ): """ Initialize a new RedisCluster client. - :startup_nodes: 'list[ClusterNode]' + :param startup_nodes: List of nodes from which initial bootstrapping can be done - :host: 'str' + :param host: Can be used to point to a startup node - :port: 'int' + :param port: Can be used to point to a startup node - :require_full_coverage: 'bool' + :param require_full_coverage: When set to False (default value): the client will not require a full coverage of the slots. However, if not all slots are covered, and at least one node has 'cluster-require-full-coverage' set to @@ -455,12 +460,12 @@ def __init__( When set to True: all slots must be covered to construct the cluster client. If not all slots are covered, RedisClusterException will be thrown. - :read_from_replicas: 'bool' + :param read_from_replicas: Enable read from replicas in READONLY mode. You can read possibly stale data. When set to true, read commands will be assigned between the primary and its replications in a Round-Robin manner. - :dynamic_startup_nodes: 'bool' + :param dynamic_startup_nodes: Set the RedisCluster's startup nodes to all of the discovered nodes. 
If true (default value), the cluster's discovered nodes will be used to determine the cluster nodes-slots mapping in the next topology refresh. @@ -468,9 +473,15 @@ def __init__( listed in the CLUSTER SLOTS output. If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists specific IP addresses, it is best to set it to false. - :cluster_error_retry_attempts: 'int' - Retry command execution attempts when encountering ClusterDownError - or ConnectionError + :param cluster_error_retry_attempts: + Number of times to retry before raising an error when :class:`~.TimeoutError` + or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered + :param connection_error_retry_attempts: + Number of times to retry before reinitializing when :class:`~.TimeoutError` + or :class:`~.ConnectionError` are encountered. + The default backoff strategy will be set if Retry object is not passed (see + get_default_backoff in backoff.py). To change it, pass a custom Retry object + using the "retry" keyword. :reinitialize_steps: 'int' Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. 
If a MOVED error occurs @@ -540,6 +551,17 @@ def __init__( kwargs.update({"redis_connect_func": self.on_connect}) kwargs = cleanup_kwargs(**kwargs) + if retry or retry_on_error or connection_error_retry_attempts > 0: + # Set a retry object for all cluster nodes + self.retry = retry or Retry( + get_default_backoff(), connection_error_retry_attempts + ) + if retry_on_error is None: + # Default errors for retrying + retry_on_error = [ConnectionError, TimeoutError] + self.retry.update_supported_errors(retry_on_error) + kwargs.update({"retry": self.retry}) + self.encoder = Encoder( kwargs.get("encoding", "utf-8"), kwargs.get("encoding_errors", "strict"), @@ -985,12 +1007,11 @@ def execute_command(self, *args, **kwargs): # nodes were passed to this function, we cannot retry the command # execution since the nodes may not be valid anymore after the tables # were reinitialized. So in case of passed target nodes, - # retry_attempts will be set to 1. + # retry_attempts will be set to 0. retry_attempts = ( - 1 if target_nodes_specified else self.cluster_error_retry_attempts + 0 if target_nodes_specified else self.cluster_error_retry_attempts ) - exception = None - for _ in range(0, retry_attempts): + while True: try: res = {} if not target_nodes_specified: @@ -1007,18 +1028,15 @@ def execute_command(self, *args, **kwargs): # Return the processed result return self._process_result(args[0], res, **kwargs) except Exception as e: - if type(e) in self.__class__.ERRORS_ALLOW_RETRY: + if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY: # The nodes and slots cache were reinitialized. # Try again with the new cluster setup. - exception = e + retry_attempts -= 1 + continue else: - # All other errors should be raised. 
+ # raise the exception raise e - # If it fails the configured number of times then raise exception back - # to caller of this method - raise exception - def _execute_command(self, target_node, *args, **kwargs): """ Send a command to a node in the cluster @@ -1030,7 +1048,6 @@ def _execute_command(self, target_node, *args, **kwargs): asking = False moved = False ttl = int(self.RedisClusterRequestTTL) - connection_error_retry_counter = 0 while ttl > 0: ttl -= 1 @@ -1063,25 +1080,21 @@ def _execute_command(self, target_node, *args, **kwargs): except AuthenticationError: raise except (ConnectionError, TimeoutError) as e: + # Connection retries are being handled in the node's + # Retry object. # ConnectionError can also be raised if we couldn't get a # connection from the pool before timing out, so check that # this is an actual connection before attempting to disconnect. if connection is not None: connection.disconnect() - connection_error_retry_counter += 1 - - # Give the node 0.25 seconds to get back up and retry again - # with same node and configuration. After 5 attempts then try - # to reinitialize the cluster and see if the nodes - # configuration has changed or not - if connection_error_retry_counter < 5: - time.sleep(0.25) - else: - # Hard force of reinitialize of the node/slots setup - # and try again with the new setup - target_node.redis_connection = None - self.nodes_manager.initialize() - raise e + + # Remove the failed node from the startup nodes before we try + # to reinitialize the cluster + self.nodes_manager.startup_nodes.pop(target_node.name, None) + # Reset the cluster node's connection + target_node.redis_connection = None + self.nodes_manager.initialize() + raise e except MovedError as e: # First, we will try to patch the slots/nodes cache with the # redirected node output and try again. 
If MovedError exceeds @@ -1405,17 +1418,15 @@ def initialize(self): startup_nodes_reachable = False fully_covered = False kwargs = self.connection_kwargs + exception = None for startup_node in self.startup_nodes.values(): try: if startup_node.redis_connection: r = startup_node.redis_connection else: - # Create a new Redis connection and let Redis decode the - # responses so we won't need to handle that - copy_kwargs = copy.deepcopy(kwargs) - copy_kwargs.update({"decode_responses": True, "encoding": "utf-8"}) + # Create a new Redis connection r = self.create_redis_node( - startup_node.host, startup_node.port, **copy_kwargs + startup_node.host, startup_node.port, **kwargs ) self.startup_nodes[startup_node.name].redis_connection = r # Make sure cluster mode is enabled on this node @@ -1425,25 +1436,11 @@ def initialize(self): ) cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS")) startup_nodes_reachable = True - except (ConnectionError, TimeoutError): - continue - except ResponseError as e: - # Isn't a cluster connection, so it won't parse these - # exceptions automatically - message = e.__str__() - if "CLUSTERDOWN" in message or "MASTERDOWN" in message: - continue - else: - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server: {startup_node}. error: {message}" - ) except Exception as e: - message = e.__str__() - raise RedisClusterException( - 'ERROR sending "cluster slots" command to redis ' - f"server {startup_node.name}. error: {message}" - ) + # Try the next startup node. + # The exception is saved and raised only if we have no more nodes. + exception = e + continue # CLUSTER SLOTS command results in the following output: # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]] @@ -1513,9 +1510,9 @@ def initialize(self): if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node. 
" - ) + f"Redis Cluster cannot be connected. Please provide at least " + f"one reachable node" + ) from exception # Create Redis connections to all nodes self.create_redis_connections(list(tmp_nodes_cache.values())) @@ -1698,14 +1695,14 @@ class ClusterPipeline(RedisCluster): def __init__( self, - nodes_manager, - commands_parser, - result_callbacks=None, - cluster_response_callbacks=None, - startup_nodes=None, - read_from_replicas=False, - cluster_error_retry_attempts=5, - reinitialize_steps=10, + nodes_manager: "NodesManager", + commands_parser: "CommandsParser", + result_callbacks: Optional[dict[str, Callable]] = None, + cluster_response_callbacks: Optional[dict[str, Callable]] = None, + startup_nodes: Optional[list["ClusterNode"]] = None, + read_from_replicas: bool = False, + cluster_error_retry_attempts: int = 3, + reinitialize_steps: int = 10, lock=None, **kwargs, ): @@ -1857,22 +1854,22 @@ def send_cluster_commands( """ if not stack: return [] - - for _ in range(0, self.cluster_error_retry_attempts): + retry_attempts = self.cluster_error_retry_attempts + while True: try: return self._send_cluster_commands( stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections, ) - except ClusterDownError: - # Try again with the new cluster setup. All other errors - # should be raised. - pass - - # If it fails the configured number of times then raise - # exception back to caller of this method - raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster") + except (ClusterDownError, ConnectionError) as e: + if retry_attempts > 0: + # Try again with the new cluster setup. All other errors + # should be raised. + retry_attempts -= 1 + pass + else: + raise e def _send_cluster_commands( self, stack, raise_on_error=True, allow_redirections=True @@ -1897,7 +1894,6 @@ def _send_cluster_commands( # we figure out the slot number that command maps to, then from # the slot determine the node. 
for c in attempt: - connection_error_retry_counter = 0 while True: # refer to our internal node -> slot table that # tells us where a given command should route to. @@ -1930,13 +1926,10 @@ def _send_cluster_commands( try: connection = get_connection(redis_node, c.args) except ConnectionError: - connection_error_retry_counter += 1 - if connection_error_retry_counter < 5: - # reinitialize the node -> slot table - self.nodes_manager.initialize() - continue - else: - raise + # Connection retries are being handled in the node's + # Retry object. Reinitialize the node -> slot table. + self.nodes_manager.initialize() + raise nodes[node_name] = NodeCommands( redis_node.parse_response, redis_node.connection_pool, diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 27f11900c4..04f0b1a109 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -13,6 +13,8 @@ from redis.asyncio.cluster import ClusterNode, NodesManager, RedisCluster from redis.asyncio.connection import Connection, SSLConnection from redis.asyncio.parser import CommandsParser +from redis.asyncio.retry import Retry +from redis.backoff import get_default_backoff, ExponentialBackoff, NoBackoff from redis.cluster import PIPELINE_BLOCKED_COMMANDS, PRIMARY, REPLICA, get_node_name from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( @@ -247,6 +249,43 @@ async def test_startup_nodes(self) -> None: ] ) + async def test_cluster_retry_object(self) -> None: + # Test default retry + rc_default = await RedisCluster("127.0.0.1", 16379) + retry = rc_default.connection_kwargs.get("retry") + assert isinstance(retry, Retry) + assert retry._retries == 3 + assert isinstance(retry._backoff, type(get_default_backoff())) + assert rc_default.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) == rc_default.get_node("127.0.0.1", 16380).connection_kwargs.get("retry") + + # Test custom retry + retry = 
Retry(ExponentialBackoff(10, 5), 5) + rc_custom_retry = await RedisCluster("127.0.0.1", 16379, retry=retry) + assert ( + rc_custom_retry.get_node("127.0.0.1", 16379).connection_kwargs.get("retry") + == retry + ) + + # Test no connection retries + rc_no_retries = await RedisCluster( + "127.0.0.1", 16379, connection_error_retry_attempts=0 + ) + assert ( + rc_no_retries.get_node("127.0.0.1", 16379).connection_kwargs.get("retry") + is None + ) + rc_no_retries = await RedisCluster( + "127.0.0.1", 16379, retry=Retry(NoBackoff(), 0) + ) + assert ( + rc_no_retries.get_node("127.0.0.1", 16379) + .connection_kwargs.get("retry") + ._retries + == 0 + ) + async def test_empty_startup_nodes(self) -> None: """ Test that exception is raised when empty providing empty startup_nodes diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 5652673af9..48ae48b6d3 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -7,6 +7,7 @@ import pytest from redis import Redis +from redis.backoff import ExponentialBackoff, NoBackoff, get_default_backoff from redis.cluster import ( PRIMARY, REDIS_CLUSTER_HASH_SLOTS, @@ -31,6 +32,7 @@ ResponseError, TimeoutError, ) +from redis.retry import Retry from redis.utils import str_if_bytes from tests.test_pubsub import wait_for_message @@ -691,6 +693,48 @@ def moved_redirect_effect(connection, *args, **options): cur_node = r.get_node(node_name=node_name) assert conn == r.get_redis_connection(cur_node) + def test_cluster_retry_object(self) -> None: + # Test default retry + rc_default = RedisCluster("127.0.0.1", 16379) + retry = rc_default.get_connection_kwargs().get("retry") + assert isinstance(retry, Retry) + assert retry._retries == 3 + assert isinstance(retry._backoff, type(get_default_backoff())) + assert rc_default.get_node("127.0.0.1", 16379).get_connection_kwargs().get( + "retry" + ) == rc_default.get_node("127.0.0.1", 16380).get_connection_kwargs().get( + "retry" + ) + + # Test custom retry + retry = Retry(ExponentialBackoff(10, 
5), 5) + rc_custom_retry = RedisCluster("127.0.0.1", 16379, retry=retry) + assert ( + rc_custom_retry.get_node("127.0.0.1", 16379) + .get_connection_kwargs() + .get("retry") + == retry + ) + + # Test no connection retries + rc_no_retries = RedisCluster( + "127.0.0.1", 16379, connection_error_retry_attempts=0 + ) + assert ( + rc_no_retries.get_node("127.0.0.1", 16379) + .get_connection_kwargs() + .get("retry") + is None + ) + rc_no_retries = RedisCluster("127.0.0.1", 16379, retry=Retry(NoBackoff(), 0)) + assert ( + rc_no_retries.get_node("127.0.0.1", 16379) + .get_connection_kwargs() + .get("retry") + ._retries + == 0 + ) + @pytest.mark.onlycluster class TestClusterRedisCommands: From 7ece39e95c6565f137ca13511152dcf9be5222c0 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sat, 10 Sep 2022 20:28:49 +0300 Subject: [PATCH 02/14] Fixed linters --- redis/asyncio/cluster.py | 6 +++--- redis/cluster.py | 10 +++++----- tests/test_asyncio/test_cluster.py | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 07d0688035..cc574b34e0 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -707,7 +707,7 @@ async def _execute_command( return await target_node.execute_command(*args, **kwargs) except (BusyLoadingError, MaxConnectionsError): raise - except (ConnectionError, TimeoutError) as e: + except (ConnectionError, TimeoutError): # Connection retries are being handled in the node's # Retry object. # Remove the failed node from the startup nodes before we try @@ -1163,8 +1163,8 @@ async def initialize(self) -> None: if not startup_nodes_reachable: raise RedisClusterException( - f"Redis Cluster cannot be connected. Please provide at least " - f"one reachable node" + "Redis Cluster cannot be connected. 
Please provide at least " + "one reachable node" ) from exception # Check if the slots are not fully covered diff --git a/redis/cluster.py b/redis/cluster.py index 38ee7cceef..08d8238c26 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1,4 +1,3 @@ -import copy import random import socket import sys @@ -474,8 +473,9 @@ def __init__( If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists specific IP addresses, it is best to set it to false. :param cluster_error_retry_attempts: - Number of times to retry before raising an error when :class:`~.TimeoutError` - or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered + Number of times to retry before raising an error when + :class:`~.TimeoutError` or :class:`~.ConnectionError` or + :class:`~.ClusterDownError` are encountered :param connection_error_retry_attempts: Number of times to retry before reinitializing when :class:`~.TimeoutError` or :class:`~.ConnectionError` are encountered. @@ -1510,8 +1510,8 @@ def initialize(self): if not startup_nodes_reachable: raise RedisClusterException( - f"Redis Cluster cannot be connected. Please provide at least " - f"one reachable node" + "Redis Cluster cannot be connected. 
Please provide at least " + "one reachable node" ) from exception # Create Redis connections to all nodes diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 04f0b1a109..09639de0e7 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -14,7 +14,7 @@ from redis.asyncio.connection import Connection, SSLConnection from redis.asyncio.parser import CommandsParser from redis.asyncio.retry import Retry -from redis.backoff import get_default_backoff, ExponentialBackoff, NoBackoff +from redis.backoff import ExponentialBackoff, NoBackoff, get_default_backoff from redis.cluster import PIPELINE_BLOCKED_COMMANDS, PRIMARY, REPLICA, get_node_name from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( From 769f097b006edb47706409cc107d8718500297e4 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 11 Sep 2022 09:24:54 +0300 Subject: [PATCH 03/14] Type fixes --- redis/asyncio/cluster.py | 6 +++--- redis/cluster.py | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index cc574b34e0..4af65d6ba1 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -238,7 +238,7 @@ def __init__( socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None, socket_timeout: Optional[float] = None, retry: Optional["Retry"] = None, - retry_on_error: Optional[list[Exception]] = None, + retry_on_error: Optional[List[Exception]] = None, # SSL related kwargs ssl: bool = False, ssl_ca_certs: Optional[str] = None, @@ -1163,8 +1163,8 @@ async def initialize(self) -> None: if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node" + f"Redis Cluster cannot be connected. 
Please provide at least " + f"one reachable node: {str(exception)}" ) from exception # Check if the slots are not fully covered diff --git a/redis/cluster.py b/redis/cluster.py index 08d8238c26..7f5e50da96 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -4,7 +4,7 @@ import threading import time from collections import OrderedDict -from typing import Any, Callable, Dict, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from redis.backoff import get_default_backoff from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan @@ -428,11 +428,11 @@ def __init__( self, host: Optional[str] = None, port: int = 6379, - startup_nodes: Optional[list["ClusterNode"]] = None, + startup_nodes: Optional[List["ClusterNode"]] = None, cluster_error_retry_attempts: int = 3, connection_error_retry_attempts: int = 3, retry: Optional["Retry"] = None, - retry_on_error: Optional[list[Exception]] = None, + retry_on_error: Optional[List[Exception]] = None, require_full_coverage: bool = False, reinitialize_steps: int = 10, read_from_replicas: bool = False, @@ -1510,8 +1510,8 @@ def initialize(self): if not startup_nodes_reachable: raise RedisClusterException( - "Redis Cluster cannot be connected. Please provide at least " - "one reachable node" + f"Redis Cluster cannot be connected. 
Please provide at least " + f"one reachable node: {str(exception)}" ) from exception # Create Redis connections to all nodes @@ -1697,9 +1697,9 @@ def __init__( self, nodes_manager: "NodesManager", commands_parser: "CommandsParser", - result_callbacks: Optional[dict[str, Callable]] = None, - cluster_response_callbacks: Optional[dict[str, Callable]] = None, - startup_nodes: Optional[list["ClusterNode"]] = None, + result_callbacks: Optional[Dict[str, Callable]] = None, + cluster_response_callbacks: Optional[Dict[str, Callable]] = None, + startup_nodes: Optional[List["ClusterNode"]] = None, read_from_replicas: bool = False, cluster_error_retry_attempts: int = 3, reinitialize_steps: int = 10, From d852bc9bfa43e41d35480e96e862042cbd28720b Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 11 Sep 2022 09:26:55 +0300 Subject: [PATCH 04/14] Added to CHANGES --- CHANGES | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGES b/CHANGES index f5c267bdda..80ae958c58 100644 --- a/CHANGES +++ b/CHANGES @@ -23,6 +23,8 @@ * ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225) * Remove compatibility code for old versions of Hiredis, drop Packaging dependency * The `deprecated` library is no longer a dependency + * Failover handling improvements for RedisCluster and Async RedisCluster (#2377) + * Fixed "cannot pickle '_thread.lock' object" bug (#2354, #2297) * 4.1.3 (Feb 8, 2022) * Fix flushdb and flushall (#1926) From 1415633bbd9a07fb6eb5a12b23bf46919c8fb8b4 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 11 Sep 2022 14:31:46 +0300 Subject: [PATCH 05/14] Added getter and setter for the client's retry object and added more tests --- redis/asyncio/client.py | 6 ++ redis/asyncio/cluster.py | 8 +++ redis/client.py | 7 +++ redis/cluster.py | 8 +++ tests/test_asyncio/test_cluster.py | 37 ++++++++--- tests/test_asyncio/test_retry.py | 18 +++++- tests/test_cluster.py | 99 +++++++++++++++++++++++++----- tests/test_retry.py | 12 +++- 8 files changed, 169 
insertions(+), 26 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index 0e40ed70f8..cee740f8ed 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -273,6 +273,12 @@ def get_connection_kwargs(self): """Get the connection's key-word arguments""" return self.connection_pool.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.get_connection_kwargs().get("retry") + + def set_retry(self, retry: "Retry") -> None: + self.get_connection_kwargs().update({"retry": retry}) + def load_external_module(self, funcname, func): """ This function can be used to add externally defined redis modules, diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 4af65d6ba1..3ae5d55f8f 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -495,6 +495,14 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]: """Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`.""" return self.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.retry + + def set_retry(self, retry: "Retry") -> None: + self.retry = retry + for node in self.get_nodes(): + node.connection_kwargs.update({"retry": retry}) + def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: """Set a custom response callback.""" self.response_callbacks[command] = callback diff --git a/redis/client.py b/redis/client.py index 75a0dac226..040f9a8283 100755 --- a/redis/client.py +++ b/redis/client.py @@ -5,6 +5,7 @@ import time import warnings from itertools import chain +from typing import Optional from redis.commands import ( CoreCommands, @@ -1043,6 +1044,12 @@ def get_connection_kwargs(self): """Get the connection's key-word arguments""" return self.connection_pool.connection_kwargs + def get_retry(self) -> Optional["Retry"]: + return self.get_connection_kwargs().get("retry") + + def set_retry(self, retry: "Retry") -> None: + 
self.get_connection_kwargs().update({"retry": retry}) + def set_response_callback(self, command, callback): """Set a custom Response Callback""" self.response_callbacks[command] = callback diff --git a/redis/cluster.py b/redis/cluster.py index 7f5e50da96..9f25d3bb69 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -687,6 +687,14 @@ def set_default_node(self, node): self.nodes_manager.default_node = node return True + def get_retry(self) -> Optional["Retry"]: + return self.retry + + def set_retry(self, retry: "Retry") -> None: + self.retry = retry + for node in self.get_nodes(): + node.redis_connection.set_retry(retry) + def monitor(self, target_node=None): """ Returns a Monitor object for the specified target node. diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 09639de0e7..1aaf90947e 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -249,9 +249,32 @@ async def test_startup_nodes(self) -> None: ] ) - async def test_cluster_retry_object(self) -> None: + async def test_cluster_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + url = request.config.getoption("--redis-url") + r = await RedisCluster.from_url(url, retry=retry) + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + for node in r.get_nodes(): + n_retry = node.connection_kwargs.get("retry") + assert n_retry is not None + assert n_retry._retries == retry._retries + assert isinstance(n_retry._backoff, NoBackoff) + # Change retry policy + new_retry = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry) + assert r.get_retry()._retries == new_retry._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + for node in r.get_nodes(): + n_retry = node.connection_kwargs.get("retry") + assert n_retry is not None + assert n_retry._retries == new_retry._retries + assert isinstance(n_retry._backoff, ExponentialBackoff) + + async def 
test_cluster_retry_object(self, request) -> None: + url = request.config.getoption("--redis-url") + rc_default = await RedisCluster.from_url(url) # Test default retry - rc_default = await RedisCluster("127.0.0.1", 16379) retry = rc_default.connection_kwargs.get("retry") assert isinstance(retry, Retry) assert retry._retries == 3 @@ -262,23 +285,21 @@ async def test_cluster_retry_object(self) -> None: # Test custom retry retry = Retry(ExponentialBackoff(10, 5), 5) - rc_custom_retry = await RedisCluster("127.0.0.1", 16379, retry=retry) + rc_custom_retry = await RedisCluster.from_url(url, retry=retry) assert ( rc_custom_retry.get_node("127.0.0.1", 16379).connection_kwargs.get("retry") == retry ) # Test no connection retries - rc_no_retries = await RedisCluster( - "127.0.0.1", 16379, connection_error_retry_attempts=0 + rc_no_retries = await RedisCluster.from_url( + url, connection_error_retry_attempts=0 ) assert ( rc_no_retries.get_node("127.0.0.1", 16379).connection_kwargs.get("retry") is None ) - rc_no_retries = await RedisCluster( - "127.0.0.1", 16379, retry=Retry(NoBackoff(), 0) - ) + rc_no_retries = await RedisCluster.from_url(url, retry=Retry(NoBackoff(), 0)) assert ( rc_no_retries.get_node("127.0.0.1", 16379) .connection_kwargs.get("retry") diff --git a/tests/test_asyncio/test_retry.py b/tests/test_asyncio/test_retry.py index 38e353bc36..8066e3070d 100644 --- a/tests/test_asyncio/test_retry.py +++ b/tests/test_asyncio/test_retry.py @@ -1,8 +1,9 @@ import pytest +from redis.asyncio import Redis from redis.asyncio.connection import Connection, UnixDomainSocketConnection from redis.asyncio.retry import Retry -from redis.backoff import AbstractBackoff, NoBackoff +from redis.backoff import AbstractBackoff, ExponentialBackoff, NoBackoff from redis.exceptions import ConnectionError, TimeoutError @@ -114,3 +115,18 @@ async def test_infinite_retry(self): assert self.actual_attempts == 5 assert self.actual_failures == 5 + + +class TestRedisClientRetry: + "Test the Redis 
client behavior with retries" + + async def test_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + url = request.config.getoption("--redis-url") + r = await Redis.from_url(url, retry_on_timeout=True, retry=retry) + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + new_retry_policy = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry_policy) + assert r.get_retry()._retries == new_retry_policy._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 48ae48b6d3..d55e6161b9 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -360,6 +360,60 @@ def ok_response(connection, *args, **options): assert r.execute_command("SET", "foo", "bar") == "MOCK_OK" + def test_handling_cluster_failover_to_a_replica(self, r): + # Set the key we'll test for + key = "key" + r.set("key", "value") + primary = r.get_node_from_key(key, replica=False) + assert r.get("key") == "value" + # Get the current output of cluster slots + cluster_slots = primary.redis_connection.execute_command("CLUSTER SLOTS") + replica_host = "" + replica_port = 0 + # Replace one of the replicas to be the new primary based on the + # cluster slots output + for slot_range in cluster_slots: + primary_port = slot_range[2][1] + if primary_port == primary.port: + if len(slot_range) <= 3: + # cluster doesn't have a replica, return + return + replica_host = str_if_bytes(slot_range[3][0]) + replica_port = slot_range[3][1] + # replace replica and primary in the cluster slots output + tmp_node = slot_range[2] + slot_range[2] = slot_range[3] + slot_range[3] = tmp_node + break + + def raise_connection_error(): + raise ConnectionError("error") + + def mock_execute_command(*_args, **_kwargs): + if _args[0] == "CLUSTER SLOTS": + return cluster_slots + else: + raise Exception("Failed to mock cluster slots") + + # Mock connection error for the current primary + 
mock_node_resp_func(primary, raise_connection_error) + primary.redis_connection.set_retry(Retry(NoBackoff(), 1)) + + # Mock the cluster slots response for all other nodes + redis_mock_node = Mock() + redis_mock_node.execute_command.side_effect = mock_execute_command + # Mock response value for all other commands + redis_mock_node.parse_response.return_value = "MOCK_OK" + for node in r.get_nodes(): + if node.port != primary.port: + node.redis_connection = redis_mock_node + + assert r.get(key) == "MOCK_OK" + new_primary = r.get_node_from_key(key, replica=False) + assert new_primary.host == replica_host + assert new_primary.port == replica_port + assert r.get_node(primary.host, primary.port).server_type == REPLICA + def test_moved_redirection(self, request): """ Test that the client handles MOVED response. @@ -693,27 +747,43 @@ def moved_redirect_effect(connection, *args, **options): cur_node = r.get_node(node_name=node_name) assert conn == r.get_redis_connection(cur_node) - def test_cluster_retry_object(self) -> None: + def test_cluster_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + r = _get_client(Redis, request, retry=retry) + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + for node in r.get_nodes(): + assert node.redis_connection.get_retry()._retries == retry._retries + assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff) + # Change retry policy + new_retry = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry) + assert r.get_retry()._retries == new_retry._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + for node in r.get_nodes(): + assert node.redis_connection.get_retry()._retries == new_retry._retries + assert isinstance( + node.redis_connection.get_retry()._backoff, ExponentialBackoff + ) + + def test_cluster_retry_object(self, r) -> None: # Test default retry - rc_default = RedisCluster("127.0.0.1", 16379) - retry = 
rc_default.get_connection_kwargs().get("retry") + retry = r.get_connection_kwargs().get("retry") assert isinstance(retry, Retry) assert retry._retries == 3 assert isinstance(retry._backoff, type(get_default_backoff())) - assert rc_default.get_node("127.0.0.1", 16379).get_connection_kwargs().get( - "retry" - ) == rc_default.get_node("127.0.0.1", 16380).get_connection_kwargs().get( - "retry" - ) + node1 = r.get_node("127.0.0.1", 16379).redis_connection + node2 = r.get_node("127.0.0.1", 16380).redis_connection + assert node1.get_retry()._retries == node2.get_retry()._retries # Test custom retry retry = Retry(ExponentialBackoff(10, 5), 5) rc_custom_retry = RedisCluster("127.0.0.1", 16379, retry=retry) assert ( rc_custom_retry.get_node("127.0.0.1", 16379) - .get_connection_kwargs() - .get("retry") - == retry + .redis_connection.get_retry() + ._retries + == retry._retries ) # Test no connection retries @@ -721,16 +791,13 @@ def test_cluster_retry_object(self) -> None: "127.0.0.1", 16379, connection_error_retry_attempts=0 ) assert ( - rc_no_retries.get_node("127.0.0.1", 16379) - .get_connection_kwargs() - .get("retry") + rc_no_retries.get_node("127.0.0.1", 16379).redis_connection.get_retry() is None ) rc_no_retries = RedisCluster("127.0.0.1", 16379, retry=Retry(NoBackoff(), 0)) assert ( rc_no_retries.get_node("127.0.0.1", 16379) - .get_connection_kwargs() - .get("retry") + .redis_connection.get_retry() ._retries == 0 ) diff --git a/tests/test_retry.py b/tests/test_retry.py index f844fd0a12..af93805cbc 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -2,7 +2,7 @@ import pytest -from redis.backoff import NoBackoff +from redis.backoff import ExponentialBackoff, NoBackoff from redis.client import Redis from redis.connection import Connection, UnixDomainSocketConnection from redis.exceptions import ( @@ -203,3 +203,13 @@ def test_client_retry_on_timeout(self, request): r.get("foo") finally: assert parse_response.call_count == retries + 1 + + def 
test_get_set_retry_object(self, request): + retry = Retry(NoBackoff(), 2) + r = _get_client(Redis, request, retry_on_timeout=True, retry=retry) + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + new_retry_policy = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry_policy) + assert r.get_retry()._retries == new_retry_policy._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) From c0b57ae48b0f1c2af73d5bdefad33d1406cb88c1 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 11 Sep 2022 14:35:17 +0300 Subject: [PATCH 06/14] Fixed linters --- redis/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/redis/client.py b/redis/client.py index 040f9a8283..089dbb2969 100755 --- a/redis/client.py +++ b/redis/client.py @@ -25,6 +25,7 @@ WatchError, ) from redis.lock import Lock +from redis.retry import Retry from redis.utils import safe_str, str_if_bytes SYM_EMPTY = b"" From 513f913a41e421eebb0185f57592b15cac151ee8 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Sun, 11 Sep 2022 14:46:38 +0300 Subject: [PATCH 07/14] Fixed test --- redis/asyncio/cluster.py | 8 +-- tests/test_asyncio/test_cluster.py | 109 +++++++++++++++-------------- tests/test_cluster.py | 2 +- 3 files changed, 63 insertions(+), 56 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 3ae5d55f8f..1b65d62eda 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -110,10 +110,10 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand :param startup_nodes: | :class:`~.ClusterNode` to used as a startup node :param require_full_coverage: - | When set to ``False``: the client will not require a full coverage of the - slots. However, if not all slots are covered, and at least one node has - ``cluster-require-full-coverage`` set to ``yes``, the server will throw a - :class:`~.ClusterDownError` for some key-based commands. 
+ | When set to ``False``: the client will not require a full coverage of + the slots. However, if not all slots are covered, and at least one node + has ``cluster-require-full-coverage`` set to ``yes``, the server will throw + a :class:`~.ClusterDownError` for some key-based commands. | When set to ``True``: all slots must be covered to construct the cluster client. If not all slots are covered, :class:`~.RedisClusterException` will be thrown. diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 1aaf90947e..921591e3e0 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -249,63 +249,70 @@ async def test_startup_nodes(self) -> None: ] ) - async def test_cluster_get_set_retry_object(self, request): + async def test_cluster_set_get_retry_object(self, request: FixtureRequest): retry = Retry(NoBackoff(), 2) url = request.config.getoption("--redis-url") - r = await RedisCluster.from_url(url, retry=retry) - assert r.get_retry()._retries == retry._retries - assert isinstance(r.get_retry()._backoff, NoBackoff) - for node in r.get_nodes(): - n_retry = node.connection_kwargs.get("retry") - assert n_retry is not None - assert n_retry._retries == retry._retries - assert isinstance(n_retry._backoff, NoBackoff) - # Change retry policy - new_retry = Retry(ExponentialBackoff(), 3) - r.set_retry(new_retry) - assert r.get_retry()._retries == new_retry._retries - assert isinstance(r.get_retry()._backoff, ExponentialBackoff) - for node in r.get_nodes(): - n_retry = node.connection_kwargs.get("retry") - assert n_retry is not None - assert n_retry._retries == new_retry._retries - assert isinstance(n_retry._backoff, ExponentialBackoff) - - async def test_cluster_retry_object(self, request) -> None: + async with RedisCluster.from_url(url, retry=retry) as r: + assert r.get_retry()._retries == retry._retries + assert isinstance(r.get_retry()._backoff, NoBackoff) + for node in r.get_nodes(): + n_retry = 
node.connection_kwargs.get("retry") + assert n_retry is not None + assert n_retry._retries == retry._retries + assert isinstance(n_retry._backoff, NoBackoff) + # Change retry policy + new_retry = Retry(ExponentialBackoff(), 3) + r.set_retry(new_retry) + assert r.get_retry()._retries == new_retry._retries + assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + for node in r.get_nodes(): + n_retry = node.connection_kwargs.get("retry") + assert n_retry is not None + assert n_retry._retries == new_retry._retries + assert isinstance(n_retry._backoff, ExponentialBackoff) + + async def test_cluster_retry_object(self, request: FixtureRequest) -> None: url = request.config.getoption("--redis-url") - rc_default = await RedisCluster.from_url(url) - # Test default retry - retry = rc_default.connection_kwargs.get("retry") - assert isinstance(retry, Retry) - assert retry._retries == 3 - assert isinstance(retry._backoff, type(get_default_backoff())) - assert rc_default.get_node("127.0.0.1", 16379).connection_kwargs.get( - "retry" - ) == rc_default.get_node("127.0.0.1", 16380).connection_kwargs.get("retry") - - # Test custom retry + async with RedisCluster.from_url(url) as rc_default: + # Test default retry + retry = rc_default.connection_kwargs.get("retry") + assert isinstance(retry, Retry) + assert retry._retries == 3 + assert isinstance(retry._backoff, type(get_default_backoff())) + assert rc_default.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) == rc_default.get_node("127.0.0.1", 16380).connection_kwargs.get("retry") + retry = Retry(ExponentialBackoff(10, 5), 5) - rc_custom_retry = await RedisCluster.from_url(url, retry=retry) - assert ( - rc_custom_retry.get_node("127.0.0.1", 16379).connection_kwargs.get("retry") - == retry - ) + async with RedisCluster.from_url(url, retry=retry) as rc_custom_retry: + # Test custom retry + assert ( + rc_custom_retry.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) + == retry + ) - # Test no 
connection retries - rc_no_retries = await RedisCluster.from_url( + async with RedisCluster.from_url( url, connection_error_retry_attempts=0 - ) - assert ( - rc_no_retries.get_node("127.0.0.1", 16379).connection_kwargs.get("retry") - is None - ) - rc_no_retries = await RedisCluster.from_url(url, retry=Retry(NoBackoff(), 0)) - assert ( - rc_no_retries.get_node("127.0.0.1", 16379) - .connection_kwargs.get("retry") - ._retries - == 0 - ) + ) as rc_no_retries: + # Test no connection retries + assert ( + rc_no_retries.get_node("127.0.0.1", 16379).connection_kwargs.get( + "retry" + ) + is None + ) + + async with RedisCluster.from_url( + url, retry=Retry(NoBackoff(), 0) + ) as rc_no_retries: + assert ( + rc_no_retries.get_node("127.0.0.1", 16379) + .connection_kwargs.get("retry") + ._retries + == 0 + ) async def test_empty_startup_nodes(self) -> None: """ diff --git a/tests/test_cluster.py b/tests/test_cluster.py index d55e6161b9..427f4b11a5 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -365,7 +365,7 @@ def test_handling_cluster_failover_to_a_replica(self, r): key = "key" r.set("key", "value") primary = r.get_node_from_key(key, replica=False) - assert r.get("key") == "value" + assert str_if_bytes(r.get("key")) == "value" # Get the current output of cluster slots cluster_slots = primary.redis_connection.execute_command("CLUSTER SLOTS") replica_host = "" From 1aa3e7c03bc226fad1f899be3980d95e16c5d583 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Mon, 12 Sep 2022 11:52:23 +0300 Subject: [PATCH 08/14] Fixed test_client_kill test --- tests/test_asyncio/test_cluster.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 921591e3e0..bff18e5e2a 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -1356,8 +1356,11 @@ async def test_client_info(self, r: RedisCluster) -> None: assert "addr" in info 
@skip_if_server_version_lt("2.6.9") - async def test_client_kill(self, r: RedisCluster, r2: RedisCluster) -> None: + async def test_client_kill( + self, r: RedisCluster, create_redis: Callable[..., RedisCluster] + ) -> None: node = r.get_primaries()[0] + r2 = await create_redis(cls=RedisCluster, flushdb=False) await r.client_setname("redis-py-c1", target_nodes="all") await r2.client_setname("redis-py-c2", target_nodes="all") clients = [ @@ -1378,6 +1381,7 @@ async def test_client_kill(self, r: RedisCluster, r2: RedisCluster) -> None: ] assert len(clients) == 1 assert clients[0].get("name") == "redis-py-c1" + await r2.close() @skip_if_server_version_lt("2.6.0") async def test_cluster_bitop_not_empty_string(self, r: RedisCluster) -> None: From e10092323d5a1b1c298835aaef736403d53a6a0c Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Mon, 3 Oct 2022 14:32:12 +0300 Subject: [PATCH 09/14] Changed get_default_backoff to default_backoff, removed retry_on_error and connection_error_retry_attempts from RedisCluster, default retry changed to no retries --- redis/__init__.py | 4 ++-- redis/asyncio/__init__.py | 4 ++-- redis/asyncio/cluster.py | 10 ++++++---- redis/backoff.py | 4 +++- redis/cluster.py | 30 +++++++++--------------------- tests/test_asyncio/test_cluster.py | 4 ++-- tests/test_cluster.py | 22 +++------------------- 7 files changed, 27 insertions(+), 51 deletions(-) diff --git a/redis/__init__.py b/redis/__init__.py index 99690b1205..d6d95e9e81 100644 --- a/redis/__init__.py +++ b/redis/__init__.py @@ -1,6 +1,6 @@ import sys -from redis.backoff import get_default_backoff +from redis.backoff import default_backoff from redis.client import Redis, StrictRedis from redis.cluster import RedisCluster from redis.connection import ( @@ -65,7 +65,7 @@ def int_or_str(value): "ConnectionPool", "DataError", "from_url", - "get_default_backoff", + "default_backoff", "InvalidResponse", "PubSubError", "ReadOnlyError", diff --git a/redis/asyncio/__init__.py 
b/redis/asyncio/__init__.py index 00a809a2a5..bf90dde555 100644 --- a/redis/asyncio/__init__.py +++ b/redis/asyncio/__init__.py @@ -15,7 +15,7 @@ SentinelManagedSSLConnection, ) from redis.asyncio.utils import from_url -from redis.backoff import get_default_backoff +from redis.backoff import default_backoff from redis.exceptions import ( AuthenticationError, AuthenticationWrongNumberOfArgsError, @@ -44,7 +44,7 @@ "ConnectionPool", "DataError", "from_url", - "get_default_backoff", + "default_backoff", "InvalidResponse", "PubSubError", "ReadOnlyError", diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 1b65d62eda..c7da808cd8 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -26,7 +26,7 @@ ) from redis.asyncio.parser import CommandsParser from redis.asyncio.retry import Retry -from redis.backoff import get_default_backoff +from redis.backoff import default_backoff from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis from redis.cluster import ( PIPELINE_BLOCKED_COMMANDS, @@ -138,7 +138,7 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand | Number of times to retry before reinitializing when :class:`~.TimeoutError` or :class:`~.ConnectionError` are encountered. The default backoff strategy will be set if Retry object is not passed (see - get_default_backoff in backoff.py). To change it, pass a custom Retry object + default_backoff in backoff.py). To change it, pass a custom Retry object using the "retry" keyword. :param max_connections: | Maximum number of connections per node. 
If there are no free connections & the @@ -309,7 +309,7 @@ def __init__( if retry or retry_on_error or connection_error_retry_attempts > 0: # Set a retry object for all cluster nodes self.retry = retry or Retry( - get_default_backoff(), connection_error_retry_attempts + default_backoff(), connection_error_retry_attempts ) if retry_on_error is None: # Default errors for retrying @@ -642,7 +642,9 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any: target_nodes_specified = True retry_attempts = 0 - while True: + # Add one for the first execution + execute_attempts = 1 + retry_attempts + for _ in range(execute_attempts): if self._initialize: await self.initialize() try: diff --git a/redis/backoff.py b/redis/backoff.py index 2836fdce30..c62e760bdc 100644 --- a/redis/backoff.py +++ b/redis/backoff.py @@ -1,7 +1,9 @@ import random from abc import ABC, abstractmethod +# Maximum backoff between each retry in seconds DEFAULT_CAP = 0.512 +# Minimum backoff between each retry in seconds DEFAULT_BASE = 0.008 @@ -108,5 +110,5 @@ def compute(self, failures): return self._previous_backoff -def get_default_backoff(): +def default_backoff(): return EqualJitterBackoff() diff --git a/redis/cluster.py b/redis/cluster.py index 9f25d3bb69..bef94fecfe 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -6,7 +6,7 @@ from collections import OrderedDict from typing import Any, Callable, Dict, List, Optional, Tuple, Union -from redis.backoff import get_default_backoff +from redis.backoff import default_backoff, NoBackoff from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url @@ -430,9 +430,7 @@ def __init__( port: int = 6379, startup_nodes: Optional[List["ClusterNode"]] = None, cluster_error_retry_attempts: int = 3, - connection_error_retry_attempts: int = 3, retry: Optional["Retry"] = None, 
- retry_on_error: Optional[List[Exception]] = None, require_full_coverage: bool = False, reinitialize_steps: int = 10, read_from_replicas: bool = False, @@ -476,13 +474,7 @@ def __init__( Number of times to retry before raising an error when :class:`~.TimeoutError` or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered - :param connection_error_retry_attempts: - Number of times to retry before reinitializing when :class:`~.TimeoutError` - or :class:`~.ConnectionError` are encountered. - The default backoff strategy will be set if Retry object is not passed (see - get_default_backoff in backoff.py). To change it, pass a custom Retry object - using the "retry" keyword. - :reinitialize_steps: 'int' + :param reinitialize_steps: Specifies the number of MOVED errors that need to occur before reinitializing the whole cluster topology. If a MOVED error occurs and the cluster does not need to be reinitialized on this current @@ -550,17 +542,11 @@ def __init__( self.user_on_connect_func = kwargs.pop("redis_connect_func", None) kwargs.update({"redis_connect_func": self.on_connect}) kwargs = cleanup_kwargs(**kwargs) - - if retry or retry_on_error or connection_error_retry_attempts > 0: - # Set a retry object for all cluster nodes - self.retry = retry or Retry( - get_default_backoff(), connection_error_retry_attempts - ) - if retry_on_error is None: - # Default errors for retrying - retry_on_error = [ConnectionError, TimeoutError] - self.retry.update_supported_errors(retry_on_error) + if retry: + self.retry = retry kwargs.update({"retry": self.retry}) + else: + kwargs.update({"retry": Retry(default_backoff(), 0)}) self.encoder = Encoder( kwargs.get("encoding", "utf-8"), @@ -1019,7 +1005,9 @@ def execute_command(self, *args, **kwargs): retry_attempts = ( 0 if target_nodes_specified else self.cluster_error_retry_attempts ) - while True: + # Add one for the first execution + execute_attempts = 1 + retry_attempts + for _ in range(execute_attempts): try: res = 
{} if not target_nodes_specified: diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index bff18e5e2a..3b61dc1cad 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -14,7 +14,7 @@ from redis.asyncio.connection import Connection, SSLConnection from redis.asyncio.parser import CommandsParser from redis.asyncio.retry import Retry -from redis.backoff import ExponentialBackoff, NoBackoff, get_default_backoff +from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff from redis.cluster import PIPELINE_BLOCKED_COMMANDS, PRIMARY, REPLICA, get_node_name from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot from redis.exceptions import ( @@ -278,7 +278,7 @@ async def test_cluster_retry_object(self, request: FixtureRequest) -> None: retry = rc_default.connection_kwargs.get("retry") assert isinstance(retry, Retry) assert retry._retries == 3 - assert isinstance(retry._backoff, type(get_default_backoff())) + assert isinstance(retry._backoff, type(default_backoff())) assert rc_default.get_node("127.0.0.1", 16379).connection_kwargs.get( "retry" ) == rc_default.get_node("127.0.0.1", 16380).connection_kwargs.get("retry") diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 427f4b11a5..1c917ed6ea 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -7,7 +7,7 @@ import pytest from redis import Redis -from redis.backoff import ExponentialBackoff, NoBackoff, get_default_backoff +from redis.backoff import ExponentialBackoff, NoBackoff, default_backoff from redis.cluster import ( PRIMARY, REDIS_CLUSTER_HASH_SLOTS, @@ -770,8 +770,8 @@ def test_cluster_retry_object(self, r) -> None: # Test default retry retry = r.get_connection_kwargs().get("retry") assert isinstance(retry, Retry) - assert retry._retries == 3 - assert isinstance(retry._backoff, type(get_default_backoff())) + assert retry._retries == 0 + assert isinstance(retry._backoff, type(default_backoff())) node1 = 
r.get_node("127.0.0.1", 16379).redis_connection node2 = r.get_node("127.0.0.1", 16380).redis_connection assert node1.get_retry()._retries == node2.get_retry()._retries @@ -786,22 +786,6 @@ def test_cluster_retry_object(self, r) -> None: == retry._retries ) - # Test no connection retries - rc_no_retries = RedisCluster( - "127.0.0.1", 16379, connection_error_retry_attempts=0 - ) - assert ( - rc_no_retries.get_node("127.0.0.1", 16379).redis_connection.get_retry() - is None - ) - rc_no_retries = RedisCluster("127.0.0.1", 16379, retry=Retry(NoBackoff(), 0)) - assert ( - rc_no_retries.get_node("127.0.0.1", 16379) - .redis_connection.get_retry() - ._retries - == 0 - ) - @pytest.mark.onlycluster class TestClusterRedisCommands: From eebf4b0c84a8801dc6642e7f707218c975e2bf37 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Tue, 18 Oct 2022 15:47:41 +0300 Subject: [PATCH 10/14] Fixing linters --- redis/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/cluster.py b/redis/cluster.py index bef94fecfe..d3f7f782bd 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -6,7 +6,7 @@ from collections import OrderedDict from typing import Any, Callable, Dict, List, Optional, Tuple, Union -from redis.backoff import default_backoff, NoBackoff +from redis.backoff import default_backoff from redis.client import CaseInsensitiveDict, PubSub, Redis, parse_scan from redis.commands import READ_COMMANDS, CommandsParser, RedisClusterCommands from redis.connection import ConnectionPool, DefaultParser, Encoder, parse_url From 8ca002d2f5d20fb74e062fcb7f8e2a6d0352e3b4 Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Thu, 27 Oct 2022 10:40:02 +0300 Subject: [PATCH 11/14] Reverting deletion of connection_error_retry_attempts to maintain backward compatibility --- redis/asyncio/cluster.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index c7da808cd8..d1d4d8d426 100644 --- a/redis/asyncio/cluster.py +++ 
b/redis/asyncio/cluster.py @@ -207,6 +207,7 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster": "reinitialize_steps", "response_callbacks", "result_callbacks", + "retry", ) def __init__( @@ -337,6 +338,7 @@ def __init__( self.read_from_replicas = read_from_replicas self.reinitialize_steps = reinitialize_steps self.cluster_error_retry_attempts = cluster_error_retry_attempts + self.connection_error_retry_attempts = connection_error_retry_attempts self.retry = retry self.reinitialize_counter = 0 self.commands_parser = CommandsParser() From 04dfd25f1b153da4afd27d50db7d0d2f1cce3c8c Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Thu, 27 Oct 2022 14:36:15 +0300 Subject: [PATCH 12/14] Updating retry object for existing and new connections --- redis/asyncio/client.py | 1 + redis/asyncio/cluster.py | 3 ++- redis/asyncio/connection.py | 6 ++++++ redis/client.py | 1 + redis/connection.py | 9 ++++++++- tests/test_asyncio/test_cluster.py | 5 +++++ tests/test_asyncio/test_retry.py | 4 ++++ tests/test_cluster.py | 7 ++++++- tests/test_retry.py | 4 ++++ 9 files changed, 37 insertions(+), 3 deletions(-) diff --git a/redis/asyncio/client.py b/redis/asyncio/client.py index cee740f8ed..bf001c7a97 100644 --- a/redis/asyncio/client.py +++ b/redis/asyncio/client.py @@ -278,6 +278,7 @@ def get_retry(self) -> Optional["Retry"]: def set_retry(self, retry: "Retry") -> None: self.get_connection_kwargs().update({"retry": retry}) + self.connection_pool.set_retry(retry) def load_external_module(self, funcname, func): """ diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index d1d4d8d426..460e3967d0 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -207,7 +207,6 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster": "reinitialize_steps", "response_callbacks", "result_callbacks", - "retry", ) def __init__( @@ -504,6 +503,8 @@ def set_retry(self, retry: "Retry") -> None: self.retry = retry for node in self.get_nodes(): 
node.connection_kwargs.update({"retry": retry}) + for conn in node._connections: + conn.retry = retry def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None: """Set a custom response callback.""" diff --git a/redis/asyncio/connection.py b/redis/asyncio/connection.py index 01da9a9caf..d9a97fd73b 100644 --- a/redis/asyncio/connection.py +++ b/redis/asyncio/connection.py @@ -1426,6 +1426,12 @@ async def disconnect(self, inuse_connections: bool = True): if exc: raise exc + def set_retry(self, retry: "Retry") -> None: + for conn in self._available_connections: + conn.retry = retry + for conn in self._in_use_connections: + conn.retry = retry + class BlockingConnectionPool(ConnectionPool): """ diff --git a/redis/client.py b/redis/client.py index 089dbb2969..18263e26f7 100755 --- a/redis/client.py +++ b/redis/client.py @@ -1050,6 +1050,7 @@ def get_retry(self) -> Optional["Retry"]: def set_retry(self, retry: "Retry") -> None: self.get_connection_kwargs().update({"retry": retry}) + self.connection_pool.set_retry(retry) def set_response_callback(self, command, callback): """Set a custom Response Callback""" diff --git a/redis/connection.py b/redis/connection.py index 2e33e31d2f..3e1b633182 100755 --- a/redis/connection.py +++ b/redis/connection.py @@ -529,7 +529,7 @@ def __init__( # Add TimeoutError to the errors list to retry on retry_on_error.append(TimeoutError) self.retry_on_error = retry_on_error - if retry_on_error: + if retry or retry_on_error: if retry is None: self.retry = Retry(NoBackoff(), 1) else: @@ -1446,6 +1446,13 @@ def disconnect(self, inuse_connections=True): for connection in connections: connection.disconnect() + def set_retry(self, retry: "Retry") -> None: + self.connection_kwargs.update({"retry": retry}) + for conn in self._available_connections: + conn.retry = retry + for conn in self._in_use_connections: + conn.retry = retry + class BlockingConnectionPool(ConnectionPool): """ diff --git a/tests/test_asyncio/test_cluster.py 
b/tests/test_asyncio/test_cluster.py index 3b61dc1cad..38bcaf6c00 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -260,6 +260,8 @@ async def test_cluster_set_get_retry_object(self, request: FixtureRequest): assert n_retry is not None assert n_retry._retries == retry._retries assert isinstance(n_retry._backoff, NoBackoff) + rand_cluster_node = r.get_random_node() + existing_conn = rand_cluster_node.acquire_connection() # Change retry policy new_retry = Retry(ExponentialBackoff(), 3) r.set_retry(new_retry) @@ -270,6 +272,9 @@ async def test_cluster_set_get_retry_object(self, request: FixtureRequest): assert n_retry is not None assert n_retry._retries == new_retry._retries assert isinstance(n_retry._backoff, ExponentialBackoff) + assert existing_conn.retry._retries == new_retry._retries + new_conn = rand_cluster_node.acquire_connection() + assert new_conn.retry._retries == new_retry._retries async def test_cluster_retry_object(self, request: FixtureRequest) -> None: url = request.config.getoption("--redis-url") diff --git a/tests/test_asyncio/test_retry.py b/tests/test_asyncio/test_retry.py index 8066e3070d..86e6ddfa0d 100644 --- a/tests/test_asyncio/test_retry.py +++ b/tests/test_asyncio/test_retry.py @@ -127,6 +127,10 @@ async def test_get_set_retry_object(self, request): assert r.get_retry()._retries == retry._retries assert isinstance(r.get_retry()._backoff, NoBackoff) new_retry_policy = Retry(ExponentialBackoff(), 3) + existing_conn = await r.connection_pool.get_connection("_") r.set_retry(new_retry_policy) assert r.get_retry()._retries == new_retry_policy._retries assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + assert existing_conn.retry._retries == new_retry_policy._retries + new_conn = await r.connection_pool.get_connection("_") + assert new_conn.retry._retries == new_retry_policy._retries diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 1c917ed6ea..d18fbbbb33 100644 ---
a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -749,12 +749,14 @@ def moved_redirect_effect(connection, *args, **options): def test_cluster_get_set_retry_object(self, request): retry = Retry(NoBackoff(), 2) - r = _get_client(Redis, request, retry=retry) + r = _get_client(RedisCluster, request, retry=retry) assert r.get_retry()._retries == retry._retries assert isinstance(r.get_retry()._backoff, NoBackoff) for node in r.get_nodes(): assert node.redis_connection.get_retry()._retries == retry._retries assert isinstance(node.redis_connection.get_retry()._backoff, NoBackoff) + rand_node = r.get_random_node() + existing_conn = rand_node.redis_connection.connection_pool.get_connection("_") # Change retry policy new_retry = Retry(ExponentialBackoff(), 3) r.set_retry(new_retry) @@ -765,6 +767,9 @@ def test_cluster_get_set_retry_object(self, request): assert isinstance( node.redis_connection.get_retry()._backoff, ExponentialBackoff ) + assert existing_conn.retry._retries == new_retry._retries + new_conn = rand_node.redis_connection.connection_pool.get_connection("_") + assert new_conn.retry._retries == new_retry._retries def test_cluster_retry_object(self, r) -> None: # Test default retry diff --git a/tests/test_retry.py b/tests/test_retry.py index af93805cbc..3cfea5c09e 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -207,9 +207,13 @@ def test_client_retry_on_timeout(self, request): def test_get_set_retry_object(self, request): retry = Retry(NoBackoff(), 2) r = _get_client(Redis, request, retry_on_timeout=True, retry=retry) + exist_conn = r.connection_pool.get_connection("_") assert r.get_retry()._retries == retry._retries assert isinstance(r.get_retry()._backoff, NoBackoff) new_retry_policy = Retry(ExponentialBackoff(), 3) r.set_retry(new_retry_policy) assert r.get_retry()._retries == new_retry_policy._retries assert isinstance(r.get_retry()._backoff, ExponentialBackoff) + assert exist_conn.retry._retries == new_retry_policy._retries + new_conn = 
r.connection_pool.get_connection("_") + assert new_conn.retry._retries == new_retry_policy._retries From b43b1dcc04fa5510ccbb81ac36b79420c6281b9f Mon Sep 17 00:00:00 2001 From: Bar Shaul Date: Wed, 2 Nov 2022 16:46:52 +0200 Subject: [PATCH 13/14] Changed the default value of reinitialize_steps from 10 to 5 --- redis/asyncio/cluster.py | 2 +- redis/cluster.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 460e3967d0..19930157a3 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -217,7 +217,7 @@ def __init__( startup_nodes: Optional[List["ClusterNode"]] = None, require_full_coverage: bool = True, read_from_replicas: bool = False, - reinitialize_steps: int = 10, + reinitialize_steps: int = 5, cluster_error_retry_attempts: int = 3, connection_error_retry_attempts: int = 3, max_connections: int = 2**31, diff --git a/redis/cluster.py b/redis/cluster.py index d3f7f782bd..e9a3f9471d 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -432,7 +432,7 @@ def __init__( cluster_error_retry_attempts: int = 3, retry: Optional["Retry"] = None, require_full_coverage: bool = False, - reinitialize_steps: int = 10, + reinitialize_steps: int = 5, read_from_replicas: bool = False, dynamic_startup_nodes: bool = True, url: Optional[str] = None, @@ -1698,7 +1698,7 @@ def __init__( startup_nodes: Optional[List["ClusterNode"]] = None, read_from_replicas: bool = False, cluster_error_retry_attempts: int = 3, - reinitialize_steps: int = 10, + reinitialize_steps: int = 5, lock=None, **kwargs, ): From c306b37d1dd977cca7905f6680c79203468a1267 Mon Sep 17 00:00:00 2001 From: dvora-h Date: Tue, 8 Nov 2022 17:08:30 +0200 Subject: [PATCH 14/14] fix review comments --- redis/asyncio/cluster.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 19930157a3..1ac8e0ece1 100644 --- a/redis/asyncio/cluster.py +++ 
b/redis/asyncio/cluster.py @@ -306,12 +306,13 @@ def __init__( # Call our on_connect function to configure READONLY mode kwargs["redis_connect_func"] = self.on_connect + self.retry = retry if retry or retry_on_error or connection_error_retry_attempts > 0: # Set a retry object for all cluster nodes self.retry = retry or Retry( default_backoff(), connection_error_retry_attempts ) - if retry_on_error is None: + if not retry_on_error: # Default errors for retrying retry_on_error = [ConnectionError, TimeoutError] self.retry.update_supported_errors(retry_on_error) @@ -338,7 +339,6 @@ def __init__( self.reinitialize_steps = reinitialize_steps self.cluster_error_retry_attempts = cluster_error_retry_attempts self.connection_error_retry_attempts = connection_error_retry_attempts - self.retry = retry self.reinitialize_counter = 0 self.commands_parser = CommandsParser() self.node_flags = self.__class__.NODE_FLAGS.copy()