Skip to content

Commit 2222b03

Browse files
author
Rachid Ben Moussa
committed
Unit test that reproduces the NPE when connection is reset by peer
1 parent bd7b5bd commit 2222b03

File tree

1 file changed

+107
-0
lines changed

1 file changed

+107
-0
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package org.asynchttpclient.netty;
2+
3+
import org.asynchttpclient.DefaultAsyncHttpClient;
4+
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
5+
import org.asynchttpclient.RequestBuilder;
6+
import org.testng.annotations.BeforeTest;
7+
import org.testng.annotations.Test;
8+
9+
import java.io.IOException;
10+
import java.io.InputStream;
11+
import java.io.OutputStream;
12+
import java.net.ServerSocket;
13+
import java.net.Socket;
14+
import java.net.SocketException;
15+
import java.util.Arrays;
16+
import java.util.concurrent.Exchanger;
17+
import java.util.concurrent.ExecutionException;
18+
import java.util.concurrent.TimeoutException;
19+
import java.util.function.Consumer;
20+
21+
import static org.hamcrest.CoreMatchers.instanceOf;
22+
import static org.hamcrest.CoreMatchers.is;
23+
import static org.hamcrest.MatcherAssert.assertThat;
24+
import static org.hamcrest.Matchers.not;
25+
import static org.testng.Assert.assertTrue;
26+
27+
public class NettyConnectionResetByPeerTest {
28+
29+
private String resettingServerAddress;
30+
31+
@BeforeTest
32+
public void setUp() {
33+
resettingServerAddress = createResettingServer();
34+
}
35+
36+
@Test
37+
public void testAsyncHttpClientConnectionResetByPeer() throws InterruptedException {
38+
try {
39+
DefaultAsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder()
40+
.setRequestTimeout(500)
41+
.build();
42+
new DefaultAsyncHttpClient(config).executeRequest(
43+
new RequestBuilder("GET").setUrl(resettingServerAddress)
44+
).get();
45+
} catch (ExecutionException e) {
46+
Throwable ex = e.getCause();
47+
assertThat(ex, is(not(instanceOf(TimeoutException.class))));
48+
assertThat(ex, is(instanceOf(IOException.class)));
49+
// assertTrue(ex.getMessage().equalsIgnoreCase("Connection reset by peer"));
50+
}
51+
}
52+
53+
private static String createResettingServer() {
54+
return createServer(sock -> {
55+
try (Socket socket = sock) {
56+
socket.setSoLinger(true, 0);
57+
InputStream inputStream = socket.getInputStream();
58+
//to not eliminate read
59+
OutputStream os = new OutputStream() {
60+
@Override
61+
public void write(int b) {
62+
// Do nothing
63+
}
64+
};
65+
os.write(startRead(inputStream));
66+
} catch (IOException e) {
67+
throw new RuntimeException(e);
68+
}
69+
});
70+
}
71+
72+
private static String createServer(Consumer<Socket> handler) {
73+
Exchanger<Integer> portHolder = new Exchanger<>();
74+
Thread t = new Thread(() -> {
75+
try (ServerSocket ss = new ServerSocket(0)) {
76+
portHolder.exchange(ss.getLocalPort());
77+
while (true) {
78+
handler.accept(ss.accept());
79+
}
80+
} catch (Exception e) {
81+
if (e instanceof InterruptedException) {
82+
Thread.currentThread().interrupt();
83+
}
84+
throw new RuntimeException(e);
85+
}
86+
});
87+
t.setDaemon(true);
88+
t.start();
89+
return tryGetAddress(portHolder);
90+
}
91+
92+
private static String tryGetAddress(Exchanger<Integer> portHolder) {
93+
try {
94+
return "http://localhost:" + portHolder.exchange(0);
95+
} catch (InterruptedException e) {
96+
Thread.currentThread().interrupt();
97+
throw new RuntimeException(e);
98+
}
99+
}
100+
101+
private static byte[] startRead(InputStream inputStream) throws IOException {
102+
byte[] buffer = new byte[4];
103+
int length = inputStream.read(buffer);
104+
return Arrays.copyOf(buffer, length);
105+
}
106+
107+
}

0 commit comments

Comments
 (0)