5
5
import threading
6
6
from uuid import uuid4
7
7
from ssl import CERT_NONE , CERT_OPTIONAL , CERT_REQUIRED , create_default_context
8
- import socket , types
9
8
10
9
import pyarrow
11
10
import thrift .transport .THttpClient
40
39
"_retry_delay_max" : (float , 60 , 5 , 3600 ),
41
40
"_retry_stop_after_attempts_count" : (int , 30 , 1 , 60 ),
42
41
"_retry_stop_after_attempts_duration" : (float , 900 , 1 , 86400 ),
43
- "_retry_delay_default" : (float , 5 , 1 , 60 ),
44
42
}
45
43
46
44
@@ -81,8 +79,6 @@ def __init__(
81
79
# next calculated pre-retry delay would go past
82
80
# _retry_stop_after_attempts_duration, stop now.)
83
81
#
84
- # _retry_delay_default (default: 5)
85
- # used when Retry-After is not specified by the server
86
82
# _retry_stop_after_attempts_count
87
83
# The maximum number of times we should retry retryable requests (defaults to 24)
88
84
# _socket_timeout
@@ -247,7 +243,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):
247
243
# FUTURE: Consider moving to https://github.com/litl/backoff or
248
244
# https://github.com/jd/tenacity for retry logic.
249
245
def make_request (self , method , request ):
250
- """Execute given request, attempting retries when TCP connection fils or when receiving HTTP 429/503.
246
+ """Execute given request, attempting retries when receiving HTTP 429/503.
251
247
252
248
For delay between attempts, honor the given Retry-After header, but with bounds.
253
249
Use lower bound of expontial-backoff based on _retry_delay_min,
@@ -265,12 +261,8 @@ def get_elapsed():
265
261
return time .time () - t0
266
262
267
263
def extract_retry_delay (attempt ):
268
- """
269
- Encapsulate retry checks based on HTTP headers. Returns None || delay-in-secs
270
-
271
- Retry IFF 429/503 code + Retry-After header set
272
- """
273
-
264
+ # encapsulate retry checks, returns None || delay-in-secs
265
+ # Retry IFF 429/503 code + Retry-After header set
274
266
http_code = getattr (self ._transport , "code" , None )
275
267
retry_after = getattr (self ._transport , "headers" , {}).get ("Retry-After" )
276
268
if http_code in [429 , 503 ] and retry_after :
@@ -287,63 +279,24 @@ def attempt_request(attempt):
287
279
# - non-None method_return -> success, return and be done
288
280
# - non-None retry_delay -> sleep delay before retry
289
281
# - error, error_message always set when available
290
-
291
- error = None
292
-
293
- # If retry_delay is None the request is treated as non-retryable
294
- retry_delay = None
295
282
try :
296
283
logger .debug ("Sending request: {}" .format (request ))
297
284
response = method (request )
298
285
logger .debug ("Received response: {}" .format (response ))
299
286
return response
300
- except socket .timeout as err :
301
- # We only retry for socket.timeout if the operation that timed out was a connection request
302
- # Otherwise idempotency is not guaranteed because something may have been transmitted to the server
303
-
304
- def _dig_through_traceback (tb : types .TracebackType , mod , meth ):
305
- """Recursively search the traceback stack to see if mod.meth raised the exception
306
- """
307
- _mod , _meth = mod , meth
308
- tb_meth = tb .tb_frame .f_code .co_name
309
- tb_mod = tb .tb_frame .f_code .co_filename .split ("/" )[- 1 ].replace (".py" , "" )
310
-
311
- if tb_meth == _meth and _mod == tb_mod :
312
- return True
313
- elif tb .tb_next is None :
314
- return False
315
-
316
- return _dig_through_traceback (tb .tb_next , mod , meth )
317
-
318
- tb = err .__traceback__
319
- failed_during_socket_connect = _dig_through_traceback (tb , "socket" , "create_connection" )
320
- failed_during_http_open = _dig_through_traceback (tb , "client" , "connect" )
321
-
322
- if failed_during_socket_connect and failed_during_http_open :
323
- retry_delay = self ._retry_delay_default
324
-
325
- error_message = str (err )
326
- error = err
327
- except OSError as err :
328
- # OSError 110 means EHOSTUNREACHABLE, which means the connection was timed out by the operating system
329
- if "Errno 110" in str (err ):
330
- retry_delay = self ._retry_delay_default
331
- error_message = str (err )
332
- error = err
333
- except Exception as err :
287
+ except Exception as error :
334
288
retry_delay = extract_retry_delay (attempt )
335
289
error_message = ThriftBackend ._extract_error_message_from_headers (
336
290
getattr (self ._transport , "headers" , {})
337
291
)
338
- error = err
339
- return RequestErrorInfo (
340
- error = error ,
341
- error_message = error_message ,
342
- retry_delay = retry_delay ,
343
- http_code = getattr (self ._transport , "code" , None ),
344
- method = method .__name__ ,
345
- request = request ,
346
- )
292
+ return RequestErrorInfo (
293
+ error = error ,
294
+ error_message = error_message ,
295
+ retry_delay = retry_delay ,
296
+ http_code = getattr (self ._transport , "code" , None ),
297
+ method = method .__name__ ,
298
+ request = request ,
299
+ )
347
300
348
301
# The real work:
349
302
# - for each available attempt:
0 commit comments