Skip to content

Commit 98bef40

Browse files
committed
Revamp WebSocket API, close #1393
Motivation: Current WebSocket API has many limitations: 1. It's not possible to be notified of received fragmented frames, they are always aggregated 2. It’s not possible to specify extension bits when sending a frame 3. The API for being notified of write completion is cumbersome 4. There are tons of different interfaces to be notified of the different types of frames 5. Method names are not aligned between `WebSocket` and `WebSocketListener` 6. There are 2 onClose listeners, the default one doesn't expose the code and reason Modifications: 1. Add a new `aggregateWebSocketFrameFragments` config param, defaulting to true. When false, fragmented frames are not aggregated. Drop `WebSocketByteFragmentListener` and `WebSocketTextFragmentListener` and add `finalFragment` and `rsv` parameters to `WebSocketListener` 2. Provide a complete set of send methods on `WebSocket` supporting `rsv` and fragmentation/continuation frame. 3. Have send methods return the Netty future so users can register listeners 4. Drop `WebSocketTextListener`, `WebSocketByteListener`, `WebSocketPingListener` and `WebSocketPongListener` and add default methods on `WebSocketListener`. Drop `DefaultWebSocketListener`. 5. Rename all methods to `sendXXXFrame` and `onXXXFrame` 6. Drop `WebSocketCloseCodeReasonListener` and change `WebSocketListener#onClose` to notify with code and reason. Result: More complete and consistent WebSocket support
1 parent 43b20af commit 98bef40

26 files changed

+601
-945
lines changed

client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@ public interface AsyncHttpClientConfig {
279279

280280
boolean isValidateResponseHeaders();
281281

282+
boolean isAggregateWebSocketFrameFragments();
283+
282284
boolean isTcpNoDelay();
283285

284286
boolean isSoReuseAddress();

client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
7575
private final boolean keepEncodingHeader;
7676
private final ProxyServerSelector proxyServerSelector;
7777
private final boolean validateResponseHeaders;
78+
private final boolean aggregateWebSocketFrameFragments;
7879

7980
// timeouts
8081
private final int connectTimeout;
@@ -148,6 +149,7 @@ private DefaultAsyncHttpClientConfig(//
148149
boolean keepEncodingHeader,//
149150
ProxyServerSelector proxyServerSelector,//
150151
boolean validateResponseHeaders,//
152+
boolean aggregateWebSocketFrameFragments,
151153

152154
// timeouts
153155
int connectTimeout,//
@@ -222,6 +224,7 @@ private DefaultAsyncHttpClientConfig(//
222224
this.keepEncodingHeader = keepEncodingHeader;
223225
this.proxyServerSelector = proxyServerSelector;
224226
this.validateResponseHeaders = validateResponseHeaders;
227+
this.aggregateWebSocketFrameFragments = aggregateWebSocketFrameFragments;
225228

226229
// timeouts
227230
this.connectTimeout = connectTimeout;
@@ -418,6 +421,11 @@ public boolean isValidateResponseHeaders() {
418421
return validateResponseHeaders;
419422
}
420423

424+
@Override
425+
public boolean isAggregateWebSocketFrameFragments() {
426+
return aggregateWebSocketFrameFragments;
427+
}
428+
421429
// ssl
422430
@Override
423431
public boolean isUseOpenSsl() {
@@ -617,6 +625,7 @@ public static class Builder {
617625
private boolean useProxySelector = defaultUseProxySelector();
618626
private boolean useProxyProperties = defaultUseProxyProperties();
619627
private boolean validateResponseHeaders = defaultValidateResponseHeaders();
628+
private boolean aggregateWebSocketFrameFragments = defaultAggregateWebSocketFrameFragments();
620629

621630
// timeouts
622631
private int connectTimeout = defaultConnectTimeout();
@@ -819,6 +828,11 @@ public Builder setValidateResponseHeaders(boolean validateResponseHeaders) {
819828
return this;
820829
}
821830

831+
public Builder setAggregateWebSocketFrameFragments(boolean aggregateWebSocketFrameFragments) {
832+
this.aggregateWebSocketFrameFragments = aggregateWebSocketFrameFragments;
833+
return this;
834+
}
835+
822836
public Builder setProxyServer(ProxyServer proxyServer) {
823837
this.proxyServerSelector = uri -> proxyServer;
824838
return this;
@@ -1123,6 +1137,7 @@ public DefaultAsyncHttpClientConfig build() {
11231137
keepEncodingHeader, //
11241138
resolveProxyServerSelector(), //
11251139
validateResponseHeaders, //
1140+
aggregateWebSocketFrameFragments, //
11261141
connectTimeout, //
11271142
requestTimeout, //
11281143
readTimeout, //

client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ public static boolean defaultValidateResponseHeaders() {
9898
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + "validateResponseHeaders");
9999
}
100100

101+
public static boolean defaultAggregateWebSocketFrameFragments() {
102+
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + "aggregateWebSocketFrameFragments");
103+
}
104+
101105
public static boolean defaultStrict302Handling() {
102106
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getBoolean(ASYNC_CLIENT_CONFIG_ROOT + "strict302Handling");
103107
}

client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,9 @@ public Bootstrap getBootstrap(Uri uri, ProxyServer proxy) {
396396
public void upgradePipelineForWebSockets(ChannelPipeline pipeline) {
397397
pipeline.addAfter(HTTP_CLIENT_CODEC, WS_ENCODER_HANDLER, new WebSocket08FrameEncoder(true));
398398
pipeline.addBefore(AHC_WS_HANDLER, WS_DECODER_HANDLER, new WebSocket08FrameDecoder(false, false, config.getWebSocketMaxFrameSize()));
399-
pipeline.addAfter(WS_DECODER_HANDLER, WS_FRAME_AGGREGATOR, new WebSocketFrameAggregator(config.getWebSocketMaxBufferSize()));
399+
if (config.isAggregateWebSocketFrameFragments()) {
400+
pipeline.addAfter(WS_DECODER_HANDLER, WS_FRAME_AGGREGATOR, new WebSocketFrameAggregator(config.getWebSocketMaxBufferSize()));
401+
}
400402
pipeline.remove(HTTP_CLIENT_CODEC);
401403
}
402404

client/src/main/java/org/asynchttpclient/netty/handler/WebSocketHandler.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ private void abort(Channel channel, NettyResponseFuture<?> future, WebSocketUpgr
9191
}
9292
}
9393

94+
private static WebSocketUpgradeHandler getWebSocketUpgradeHandler(NettyResponseFuture<?> future) {
95+
return (WebSocketUpgradeHandler) future.getAsyncHandler();
96+
}
97+
98+
private static NettyWebSocket getNettyWebSocket(NettyResponseFuture<?> future) throws Exception {
99+
return getWebSocketUpgradeHandler(future).onCompleted();
100+
}
101+
94102
@Override
95103
public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e) throws Exception {
96104

@@ -101,7 +109,7 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
101109
logger.debug("\n\nRequest {}\n\nResponse {}\n", httpRequest, response);
102110
}
103111

104-
WebSocketUpgradeHandler handler = (WebSocketUpgradeHandler) future.getAsyncHandler();
112+
WebSocketUpgradeHandler handler = getWebSocketUpgradeHandler(future);
105113
HttpResponseStatus status = new NettyResponseStatus(future.getUri(), response, channel);
106114
HttpResponseHeaders responseHeaders = new HttpResponseHeaders(response.headers());
107115

@@ -116,9 +124,8 @@ public void handleRead(Channel channel, NettyResponseFuture<?> future, Object e)
116124
}
117125

118126
} else if (e instanceof WebSocketFrame) {
119-
final WebSocketFrame frame = (WebSocketFrame) e;
120-
WebSocketUpgradeHandler handler = (WebSocketUpgradeHandler) future.getAsyncHandler();
121-
NettyWebSocket webSocket = handler.onCompleted();
127+
WebSocketFrame frame = (WebSocketFrame) e;
128+
NettyWebSocket webSocket = getNettyWebSocket(future);
122129
// retain because we might buffer the frame
123130
if (webSocket.isReady()) {
124131
webSocket.handleFrame(frame);
@@ -139,11 +146,10 @@ public void handleException(NettyResponseFuture<?> future, Throwable e) {
139146
logger.warn("onError", e);
140147

141148
try {
142-
WebSocketUpgradeHandler h = (WebSocketUpgradeHandler) future.getAsyncHandler();
143-
NettyWebSocket webSocket = h.onCompleted();
149+
NettyWebSocket webSocket = getNettyWebSocket(future);
144150
if (webSocket != null) {
145151
webSocket.onError(e.getCause());
146-
webSocket.close();
152+
webSocket.sendCloseFrame();
147153
}
148154
} catch (Throwable t) {
149155
logger.error("onError", t);
@@ -152,15 +158,13 @@ public void handleException(NettyResponseFuture<?> future, Throwable e) {
152158

153159
@Override
154160
public void handleChannelInactive(NettyResponseFuture<?> future) {
155-
logger.trace("onClose");
161+
logger.trace("Connection was closed abnormally (that is, with no close frame being received).");
156162

157163
try {
158-
WebSocketUpgradeHandler h = (WebSocketUpgradeHandler) future.getAsyncHandler();
159-
NettyWebSocket webSocket = h.onCompleted();
160-
161-
logger.trace("Connection was closed abnormally (that is, with no close frame being received).");
162-
if (webSocket != null)
163-
webSocket.close(1006, "Connection was closed abnormally (that is, with no close frame being received).");
164+
NettyWebSocket webSocket = getNettyWebSocket(future);
165+
if (webSocket != null) {
166+
webSocket.onClose(1006, "Connection was closed abnormally (that is, with no close frame being received).");
167+
}
164168
} catch (Throwable t) {
165169
logger.error("onError", t);
166170
}

client/src/main/java/org/asynchttpclient/netty/request/NettyRequestSender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,7 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
339339
HttpRequest httpRequest = nettyRequest.getHttpRequest();
340340
AsyncHandler<T> handler = future.getAsyncHandler();
341341

342-
// if the channel is dead because it was pooled and the remote
343-
// server decided to close it,
342+
// if the channel is dead because it was pooled and the remote server decided to close it,
344343
// we just let it go and the channelInactive do its work
345344
if (!Channels.isChannelValid(channel))
346345
return;
@@ -366,6 +365,7 @@ public <T> void writeRequest(NettyResponseFuture<T> future, Channel channel) {
366365

367366
// if the request has a body, we want to track progress
368367
if (writeBody) {
368+
// FIXME does this really work??? the promise is for the request without body!!!
369369
ChannelProgressivePromise promise = channel.newProgressivePromise();
370370
ChannelFuture f = channel.write(httpRequest, promise);
371371
f.addListener(new WriteProgressListener(future, true, 0L));

0 commit comments

Comments
 (0)