Skip to content

Commit f5a4cb2

Browse files
committed
Tie connection counter to the future/channel
Instead of manually tracking where connection counter should be released, just do it when future becomes done or connection becomes closed. I believe, code is safer against leaks this way.
1 parent 5351b5c commit f5a4cb2

File tree

5 files changed

+77
-48
lines changed

5 files changed

+77
-48
lines changed

client/src/main/java/org/asynchttpclient/netty/NettyResponseFuture.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import static org.asynchttpclient.util.DateUtils.unpreciseMillisTime;
1717
import io.netty.channel.Channel;
1818

19+
import java.io.IOException;
1920
import java.util.concurrent.CancellationException;
2021
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.ExecutionException;
@@ -31,6 +32,7 @@
3132
import org.asynchttpclient.Realm;
3233
import org.asynchttpclient.Request;
3334
import org.asynchttpclient.channel.ChannelPoolPartitioning;
35+
import org.asynchttpclient.netty.channel.ChannelManager;
3436
import org.asynchttpclient.netty.channel.ChannelState;
3537
import org.asynchttpclient.netty.channel.Channels;
3638
import org.asynchttpclient.netty.request.NettyRequest;
@@ -56,6 +58,7 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
5658

5759
private final long start = unpreciseMillisTime();
5860
private final ChannelPoolPartitioning connectionPoolPartitioning;
61+
private final ChannelManager channelManager;
5962
private final ProxyServer proxyServer;
6063
private final int maxRetry;
6164
private final CompletableFuture<V> future = new CompletableFuture<>();
@@ -73,6 +76,8 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
7376
private volatile int onThrowableCalled = 0;
7477
@SuppressWarnings("unused")
7578
private volatile TimeoutsHolder timeoutsHolder;
79+
// partition key, when != null used to release lock in ChannelManager
80+
private volatile Object partitionKeyLock;
7681

7782
@SuppressWarnings("rawtypes")
7883
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> isDoneField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");
@@ -88,6 +93,7 @@ public final class NettyResponseFuture<V> implements ListenableFuture<V> {
8893
private static final AtomicIntegerFieldUpdater<NettyResponseFuture> onThrowableCalledField = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");
8994
@SuppressWarnings("rawtypes")
9095
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> timeoutsHolderField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
96+
private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> partitionKeyLockField = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");
9197

9298
// volatile where we need CAS ops
9399
private volatile int redirectCount = 0;
@@ -118,16 +124,36 @@ public NettyResponseFuture(Request originalRequest,//
118124
NettyRequest nettyRequest,//
119125
int maxRetry,//
120126
ChannelPoolPartitioning connectionPoolPartitioning,//
127+
ChannelManager channelManager,//
121128
ProxyServer proxyServer) {
122129

123130
this.asyncHandler = asyncHandler;
124131
this.targetRequest = currentRequest = originalRequest;
125132
this.nettyRequest = nettyRequest;
126133
this.connectionPoolPartitioning = connectionPoolPartitioning;
134+
this.channelManager = channelManager;
127135
this.proxyServer = proxyServer;
128136
this.maxRetry = maxRetry;
129137
}
130138

139+
private void releasePartitionKeyLock() {
140+
Object partitionKey = takePartitionKeyLock();
141+
if (partitionKey != null) {
142+
channelManager.releaseChannelLock(partitionKey);
143+
}
144+
}
145+
146+
// Take partition key lock object,
147+
// but do not release channel lock.
148+
public Object takePartitionKeyLock() {
149+
// shortcut, much faster than getAndSet
150+
if (partitionKeyLock == null) {
151+
return null;
152+
}
153+
154+
return partitionKeyLockField.getAndSet(this, null);
155+
}
156+
131157
// java.util.concurrent.Future
132158

133159
@Override
@@ -142,6 +168,7 @@ public boolean isCancelled() {
142168

143169
@Override
144170
public boolean cancel(boolean force) {
171+
releasePartitionKeyLock();
145172
cancelTimeouts();
146173

147174
if (isCancelledField.getAndSet(this, 1) != 0)
@@ -210,6 +237,7 @@ private V getContent() throws ExecutionException {
210237
// org.asynchttpclient.ListenableFuture
211238

212239
private boolean terminateAndExit() {
240+
releasePartitionKeyLock();
213241
cancelTimeouts();
214242
this.channel = null;
215243
this.reuseChannel = false;
@@ -454,6 +482,29 @@ public Object getPartitionKey() {
454482
return connectionPoolPartitioning.getPartitionKey(targetRequest.getUri(), targetRequest.getVirtualHost(), proxyServer);
455483
}
456484

485+
public void acquirePartitionLockLazily() throws IOException {
486+
if (partitionKeyLock != null) {
487+
return;
488+
}
489+
490+
Object partitionKey = getPartitionKey();
491+
channelManager.acquireChannelLock(partitionKey);
492+
Object prevKey = partitionKeyLockField.getAndSet(this, partitionKey);
493+
if (prevKey != null) {
494+
// self-check
495+
496+
channelManager.releaseChannelLock(prevKey);
497+
releasePartitionKeyLock();
498+
499+
throw new AssertionError("unreachable");
500+
}
501+
502+
if (isDone()) {
503+
// may be cancelled while we acquired a lock
504+
releasePartitionKeyLock();
505+
}
506+
}
507+
457508
public Realm getRealm() {
458509
return realm;
459510
}

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -144,28 +144,7 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
144144
new NonBlockingSemaphore(config.getMaxConnections()) :
145145
NonBlockingSemaphoreInfinite.INSTANCE;
146146

147-
if (maxTotalConnectionsEnabled || maxConnectionsPerHostEnabled) {
148-
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE) {
149-
@Override
150-
public boolean remove(Object o) {
151-
boolean removed = super.remove(o);
152-
if (removed) {
153-
freeChannels.release();
154-
if (maxConnectionsPerHostEnabled) {
155-
Object partitionKey = Channel.class.cast(o).attr(partitionKeyAttr).getAndSet(null);
156-
if (partitionKey != null) {
157-
NonBlockingSemaphore hostFreeChannels = freeChannelsPerHost.get(partitionKey);
158-
if (hostFreeChannels != null)
159-
hostFreeChannels.release();
160-
}
161-
}
162-
}
163-
return removed;
164-
}
165-
};
166-
} else {
167-
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
168-
}
147+
openChannels = new DefaultChannelGroup("asyncHttpClient", GlobalEventExecutor.INSTANCE);
169148

170149
handshakeTimeout = config.getHandshakeTimeout();
171150

client/src/main/java/org/asynchttpclient/netty/channel/NettyConnectListener.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.net.ConnectException;
2323
import java.net.InetSocketAddress;
2424

25+
import io.netty.util.concurrent.Future;
26+
import io.netty.util.concurrent.GenericFutureListener;
2527
import org.asynchttpclient.Request;
2628
import org.asynchttpclient.handler.AsyncHandlerExtensions;
2729
import org.asynchttpclient.netty.NettyResponseFuture;
@@ -43,26 +45,19 @@ public final class NettyConnectListener<T> {
4345
private final NettyRequestSender requestSender;
4446
private final NettyResponseFuture<T> future;
4547
private final ChannelManager channelManager;
46-
private final boolean channelPreempted;
4748
private final Object partitionKey;
4849

4950
public NettyConnectListener(NettyResponseFuture<T> future,//
5051
NettyRequestSender requestSender,//
5152
ChannelManager channelManager,//
52-
boolean channelPreempted,//
5353
Object partitionKey) {
5454
this.future = future;
5555
this.requestSender = requestSender;
5656
this.channelManager = channelManager;
57-
this.channelPreempted = channelPreempted;
5857
this.partitionKey = partitionKey;
5958
}
6059

6160
private void abortChannelPreemption(Channel channel) {
62-
if (channelPreempted) {
63-
channelManager.releaseChannelLock(partitionKey);
64-
}
65-
6661
Channels.silentlyCloseChannel(channel);
6762
}
6863

@@ -95,6 +90,20 @@ private void writeRequest(Channel channel) {
9590

9691
public void onSuccess(Channel channel, InetSocketAddress remoteAddress) {
9792

93+
{
94+
// transfer lock from future to channel
95+
Object partitionKeyLock = future.takePartitionKeyLock();
96+
97+
if (partitionKeyLock != null) {
98+
channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
99+
@Override
100+
public void operationComplete(Future<? super Void> future) throws Exception {
101+
channelManager.releaseChannelLock(partitionKeyLock);
102+
}
103+
});
104+
}
105+
}
106+
98107
Channels.setInactiveToken(channel);
99108

100109
TimeoutsHolder timeoutsHolder = future.getTimeoutsHolder();

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -265,16 +265,10 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
265265

266266
Object partitionKey = future.getPartitionKey();
267267

268-
// we disable channelPreemption when performing next requests
269-
final boolean acquireChannelLock = !performingNextRequest;
270-
271268
try {
272269
// Do not throw an exception when we need an extra connection for a
273270
// redirect.
274-
if (acquireChannelLock) {
275-
// if there's an exception here, channel wasn't preempted and resolve won't happen
276-
channelManager.acquireChannelLock(partitionKey);
277-
}
271+
future.acquirePartitionLockLazily();
278272
} catch (Throwable t) {
279273
abort(null, future, getCause(t));
280274
// exit and don't try to resolve address
@@ -288,20 +282,15 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(//
288282

289283
@Override
290284
protected void onSuccess(List<InetSocketAddress> addresses) {
291-
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, acquireChannelLock, partitionKey);
285+
NettyConnectListener<T> connectListener = new NettyConnectListener<>(future, NettyRequestSender.this, channelManager, partitionKey);
292286
NettyChannelConnector connector = new NettyChannelConnector(request.getLocalAddress(), addresses, asyncHandler, clientState, config);
293287
if (!future.isDone()) {
294288
connector.connect(bootstrap, connectListener);
295-
} else if (acquireChannelLock) {
296-
channelManager.releaseChannelLock(partitionKey);
297289
}
298290
}
299291

300292
@Override
301293
protected void onFailure(Throwable cause) {
302-
if (acquireChannelLock) {
303-
channelManager.releaseChannelLock(partitionKey);
304-
}
305294
abort(null, future, getCause(cause));
306295
}
307296
});
@@ -317,6 +306,7 @@ private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request, Async
317306
nettyRequest,//
318307
config.getMaxRequestRetry(),//
319308
request.getChannelPoolPartitioning(),//
309+
channelManager,//
320310
proxyServer);
321311

322312
String expectHeader = request.getHeaders().get(EXPECT);

client/src/test/java/org/asynchttpclient/netty/NettyResponseFutureTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public class NettyResponseFutureTest {
2828
@Test
2929
public void testCancel() {
3030
AsyncHandler<?> asyncHandler = mock(AsyncHandler.class);
31-
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null);
31+
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null, null);
3232
boolean result = nettyResponseFuture.cancel(false);
3333
verify(asyncHandler).onThrowable(anyObject());
3434
assertTrue(result, "Cancel should return true if the Future was cancelled successfully");
@@ -38,7 +38,7 @@ public void testCancel() {
3838
@Test
3939
public void testCancelOnAlreadyCancelled() {
4040
AsyncHandler<?> asyncHandler = mock(AsyncHandler.class);
41-
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null);
41+
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null, null);
4242
nettyResponseFuture.cancel(false);
4343
boolean result = nettyResponseFuture.cancel(false);
4444
assertFalse(result, "cancel should return false for an already cancelled Future");
@@ -48,7 +48,7 @@ public void testCancelOnAlreadyCancelled() {
4848
@Test(expectedExceptions = CancellationException.class)
4949
public void testGetContentThrowsCancellationExceptionIfCancelled() throws InterruptedException, ExecutionException {
5050
AsyncHandler<?> asyncHandler = mock(AsyncHandler.class);
51-
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null);
51+
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null, null);
5252
nettyResponseFuture.cancel(false);
5353
nettyResponseFuture.get();
5454
fail("A CancellationException must have occurred by now as 'cancel' was called before 'get'");
@@ -60,7 +60,7 @@ public void testGet() throws Exception {
6060
AsyncHandler<Object> asyncHandler = mock(AsyncHandler.class);
6161
Object value = new Object();
6262
when(asyncHandler.onCompleted()).thenReturn(value);
63-
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null);
63+
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null, null);
6464
nettyResponseFuture.done();
6565
Object result = nettyResponseFuture.get();
6666
assertEquals(result, value, "The Future should return the value given by asyncHandler#onCompleted");
@@ -70,7 +70,7 @@ public void testGet() throws Exception {
7070
public void testGetThrowsExceptionThrownByAsyncHandler() throws Exception {
7171
AsyncHandler<?> asyncHandler = mock(AsyncHandler.class);
7272
when(asyncHandler.onCompleted()).thenThrow(new RuntimeException());
73-
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null);
73+
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null, null);
7474
nettyResponseFuture.done();
7575
nettyResponseFuture.get();
7676
fail("An ExecutionException must have occurred by now as asyncHandler threw an exception in 'onCompleted'");
@@ -79,7 +79,7 @@ public void testGetThrowsExceptionThrownByAsyncHandler() throws Exception {
7979
@Test(expectedExceptions = ExecutionException.class)
8080
public void testGetThrowsExceptionOnAbort() throws InterruptedException, ExecutionException {
8181
AsyncHandler<?> asyncHandler = mock(AsyncHandler.class);
82-
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null);
82+
NettyResponseFuture<?> nettyResponseFuture = new NettyResponseFuture<>(null, asyncHandler, null, 3, null, null, null);
8383
nettyResponseFuture.abort(new RuntimeException());
8484
nettyResponseFuture.get();
8585
fail("An ExecutionException must have occurred by now as 'abort' was called before 'get'");

0 commit comments

Comments
 (0)