@@ -1204,6 +1204,19 @@ class ConnectionPool:
1204
1204
``connection_class``.
1205
1205
"""
1206
1206
1207
+ __slots__ = (
1208
+ "connection_class" ,
1209
+ "connection_kwargs" ,
1210
+ "max_connections" ,
1211
+ "_fork_lock" ,
1212
+ "_lock" ,
1213
+ "_created_connections" ,
1214
+ "_available_connections" ,
1215
+ "_in_use_connections" ,
1216
+ "encoder_class" ,
1217
+ "pid" ,
1218
+ )
1219
+
1207
1220
@classmethod
1208
1221
def from_url (cls : Type [_CP ], url : str , ** kwargs ) -> _CP :
1209
1222
"""
@@ -1486,36 +1499,34 @@ class BlockingConnectionPool(ConnectionPool):
1486
1499
>>> pool = BlockingConnectionPool(timeout=5)
1487
1500
"""
1488
1501
1502
+ __slots__ = (
1503
+ "queue_class" ,
1504
+ "timeout" ,
1505
+ "pool" ,
1506
+ )
1507
+
1489
1508
def __init__ (
1490
1509
self ,
1491
1510
max_connections : int = 50 ,
1492
1511
timeout : Optional [int ] = 20 ,
1493
1512
connection_class : Type [Connection ] = Connection ,
1494
- queue_class : Type [asyncio .Queue ] = asyncio .LifoQueue ,
1513
+ queue_class : Type [asyncio .Queue ] = asyncio .Queue ,
1495
1514
** connection_kwargs ,
1496
1515
):
1497
1516
1498
1517
self .queue_class = queue_class
1499
1518
self .timeout = timeout
1500
- self ._connections : List [Connection ]
1501
1519
super ().__init__ (
1502
1520
connection_class = connection_class ,
1503
1521
max_connections = max_connections ,
1504
1522
** connection_kwargs ,
1505
1523
)
1506
1524
1507
1525
def reset (self ):
1508
- # Create and fill up a thread safe queue with ``None`` values.
1526
+ # a queue of ready connections. populated lazily
1509
1527
self .pool = self .queue_class (self .max_connections )
1510
- while True :
1511
- try :
1512
- self .pool .put_nowait (None )
1513
- except asyncio .QueueFull :
1514
- break
1515
-
1516
- # Keep a list of actual connection instances so that we can
1517
- # disconnect them later.
1518
- self ._connections = []
1528
+ # used to decide wether we can allocate new connection or wait
1529
+ self ._created_connections = 0
1519
1530
1520
1531
# this must be the last operation in this method. while reset() is
1521
1532
# called when holding _fork_lock, other threads in this process
@@ -1529,41 +1540,36 @@ def reset(self):
1529
1540
self .pid = os .getpid ()
1530
1541
1531
1542
def make_connection (self ):
1532
- """Make a fresh connection."""
1533
- connection = self .connection_class (** self .connection_kwargs )
1534
- self ._connections .append (connection )
1535
- return connection
1543
+ """Create a new connection"""
1544
+ self ._created_connections += 1
1545
+ return self .connection_class (** self .connection_kwargs )
1536
1546
1537
1547
async def get_connection (self , command_name , * keys , ** options ):
1538
1548
"""
1539
1549
Get a connection, blocking for ``self.timeout`` until a connection
1540
1550
is available from the pool.
1541
1551
1542
- If the connection returned is ``None`` then creates a new connection.
1543
- Because we use a last-in first-out queue, the existing connections
1544
- (having been returned to the pool after the initial ``None`` values
1545
- were added) will be returned before ``None`` values. This means we only
1546
- create new connections when we need to, i.e.: the actual number of
1547
- connections will only increase in response to demand.
1552
+ Checks internal connection counter to ensure connections are allocated lazily.
1548
1553
"""
1549
1554
# Make sure we haven't changed process.
1550
1555
self ._checkpid ()
1551
1556
1552
- # Try and get a connection from the pool. If one isn't available within
1553
- # self.timeout then raise a ``ConnectionError``.
1554
- connection = None
1555
- try :
1556
- async with async_timeout .timeout (self .timeout ):
1557
- connection = await self .pool .get ()
1558
- except (asyncio .QueueEmpty , asyncio .TimeoutError ):
1559
- # Note that this is not caught by the redis client and will be
1560
- # raised unless handled by application code. If you want never to
1561
- raise ConnectionError ("No connection available." )
1562
-
1563
- # If the ``connection`` is actually ``None`` then that's a cue to make
1564
- # a new connection to add to the pool.
1565
- if connection is None :
1566
- connection = self .make_connection ()
1557
+ # if we are under max_connections, try getting one immediately. if it fails
1558
+ # it is ok to allocate new one
1559
+ if self ._created_connections < self .max_connections :
1560
+ try :
1561
+ connection = self .pool .get_nowait ()
1562
+ except asyncio .QueueEmpty :
1563
+ connection = self .make_connection ()
1564
+ else :
1565
+ # wait for available connection
1566
+ try :
1567
+ async with async_timeout .timeout (self .timeout ):
1568
+ connection = await self .pool .get ()
1569
+ except asyncio .TimeoutError :
1570
+ # Note that this is not caught by the redis client and will be
1571
+ # raised unless handled by application code.
1572
+ raise ConnectionError ("No connection available." )
1567
1573
1568
1574
try :
1569
1575
# ensure this connection is connected to Redis
@@ -1613,7 +1619,10 @@ async def disconnect(self, inuse_connections: bool = True):
1613
1619
self ._checkpid ()
1614
1620
async with self ._lock :
1615
1621
resp = await asyncio .gather (
1616
- * (connection .disconnect () for connection in self ._connections ),
1622
+ * (
1623
+ self .pool .get_nowait ().disconnect ()
1624
+ for _ in range (self .pool .qsize ())
1625
+ ),
1617
1626
return_exceptions = True ,
1618
1627
)
1619
1628
exc = next ((r for r in resp if isinstance (r , BaseException )), None )
0 commit comments