From 4a482170bb2e7bcf49391c96eed3f772ed6e7fec Mon Sep 17 00:00:00 2001 From: Jonathan Dieter Date: Thu, 13 Jan 2022 15:59:25 +0000 Subject: [PATCH 1/2] Add retries to connections in Sentinel Pools Signed-off-by: Jonathan Dieter --- redis/sentinel.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/redis/sentinel.py b/redis/sentinel.py index 025ab39c8c..37da69aaf8 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -1,10 +1,13 @@ +import copy import random import weakref +from redis.backoff import NoBackoff from redis.client import Redis from redis.commands import SentinelCommands from redis.connection import Connection, ConnectionPool, SSLConnection from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError +from redis.retry import Retry from redis.utils import str_if_bytes @@ -84,6 +87,24 @@ def __init__(self, service_name, sentinel_manager, **kwargs): if kwargs.pop("ssl", False) else SentinelManagedConnection, ) + retry = kwargs.get("retry", None) + retry_on_error = kwargs.get("retry_on_error", None) + retry_on_timeout = kwargs.get("retry_on_timeout", False) + self.retry_on_timeout = retry_on_timeout + if retry_on_timeout: + # Add TimeoutError to the errors list to retry on + retry_on_error.append(TimeoutError) + self.retry_on_error = retry_on_error + if retry_on_error: + if retry is None: + self.retry = Retry(NoBackoff(), 1) + else: + # deep-copy the Retry object as it is mutable + self.retry = copy.deepcopy(retry) + # Update the retry's supported errors with the specified errors + self.retry.update_supported_erros(retry_on_error) + else: + self.retry = Retry(NoBackoff(), 0) self.is_master = kwargs.pop("is_master", True) self.check_connection = kwargs.pop("check_connection", False) super().__init__(**kwargs) @@ -107,6 +128,12 @@ def owns_connection(self, connection): parent = super() return check and parent.owns_connection(connection) + def get_connection(self, *args, **kwargs): + return self.retry.call_with_retry( + lambda: super(type(self), self).get_connection(self, *args, **kwargs), + lambda error: None, + ) + def get_master_address(self): master_address = self.sentinel_manager.discover_master(self.service_name) if self.is_master: From 1fd977fed5dc5a7fc005d0ab1b7b659f60d6228b Mon Sep 17 00:00:00 2001 From: Jonathan Dieter Date: Mon, 17 Jan 2022 09:45:09 +0000 Subject: [PATCH 2/2] Move retries to SentinelManagedConnection Signed-off-by: Jonathan Dieter --- redis/sentinel.py | 35 +++++++---------------------------- 1 file changed, 7 insertions(+), 28 deletions(-) diff --git a/redis/sentinel.py b/redis/sentinel.py index 37da69aaf8..b3f14907d0 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -1,13 +1,10 @@ -import copy import random import weakref -from redis.backoff import NoBackoff from redis.client import Redis from redis.commands import SentinelCommands from redis.connection import Connection, ConnectionPool, SSLConnection from redis.exceptions import ConnectionError, ReadOnlyError, ResponseError, TimeoutError -from redis.retry import Retry from redis.utils import str_if_bytes @@ -40,7 +37,7 @@ def connect_to(self, address): if str_if_bytes(self.read_response()) != "PONG": raise ConnectionError("PING failed") - def connect(self): + def _connect_retry(self): if self._sock: return # already connected if self.connection_pool.is_master: @@ -53,6 +50,12 @@ def connect(self): continue raise SlaveNotFoundError # Never be here + def connect(self): + return self.retry.call_with_retry( + self._connect_retry, + lambda error: None, + ) + def read_response(self, disable_decoding=False): try: return super().read_response(disable_decoding=disable_decoding) @@ -87,24 +90,6 @@ def __init__(self, service_name, sentinel_manager, **kwargs): if kwargs.pop("ssl", False) else SentinelManagedConnection, ) - retry = kwargs.get("retry", None) - retry_on_error = kwargs.get("retry_on_error", None) - retry_on_timeout = kwargs.get("retry_on_timeout", False) - self.retry_on_timeout = retry_on_timeout - if retry_on_timeout: - # Add TimeoutError to the errors list to retry on - retry_on_error.append(TimeoutError) - self.retry_on_error = retry_on_error - if retry_on_error: - if retry is None: - self.retry = Retry(NoBackoff(), 1) - else: - # deep-copy the Retry object as it is mutable - self.retry = copy.deepcopy(retry) - # Update the retry's supported errors with the specified errors - self.retry.update_supported_erros(retry_on_error) - else: - self.retry = Retry(NoBackoff(), 0) self.is_master = kwargs.pop("is_master", True) self.check_connection = kwargs.pop("check_connection", False) super().__init__(**kwargs) @@ -128,12 +113,6 @@ def owns_connection(self, connection): parent = super() return check and parent.owns_connection(connection) - def get_connection(self, *args, **kwargs): - return self.retry.call_with_retry( - lambda: super(type(self), self).get_connection(self, *args, **kwargs), - lambda error: None, - ) - def get_master_address(self): master_address = self.sentinel_manager.discover_master(self.service_name) if self.is_master: