diff --git a/client/pom.xml b/client/pom.xml
index 550b5fd47e..858c5f9ad7 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -55,6 +55,11 @@
netty-transport-native-epoll
linux-x86_64
+
+ io.netty
+ netty-transport-native-kqueue
+ osx-x86_64
+
io.netty
netty-resolver-dns
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java
index ac18269c28..eaa7032e41 100755
--- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java
@@ -16,10 +16,11 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
+import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
@@ -119,31 +120,31 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
// check if external EventLoopGroup is defined
ThreadFactory threadFactory = config.getThreadFactory() != null ? config.getThreadFactory() : new DefaultThreadFactory(config.getThreadPoolName());
allowReleaseEventLoopGroup = config.getEventLoopGroup() == null;
- ChannelFactory extends Channel> channelFactory;
+ TransportFactory extends Channel, ? extends EventLoopGroup> transportFactory;
if (allowReleaseEventLoopGroup) {
if (config.isUseNativeTransport()) {
- eventLoopGroup = newEpollEventLoopGroup(config.getIoThreadsCount(), threadFactory);
- channelFactory = getEpollSocketChannelFactory();
-
+ transportFactory = getNativeTransportFactory();
} else {
- eventLoopGroup = new NioEventLoopGroup(config.getIoThreadsCount(), threadFactory);
- channelFactory = NioSocketChannelFactory.INSTANCE;
+ transportFactory = NioTransportFactory.INSTANCE;
}
+ eventLoopGroup = transportFactory.newEventLoopGroup(config.getIoThreadsCount(), threadFactory);
} else {
eventLoopGroup = config.getEventLoopGroup();
- if (eventLoopGroup instanceof OioEventLoopGroup)
- throw new IllegalArgumentException("Oio is not supported");
if (eventLoopGroup instanceof NioEventLoopGroup) {
- channelFactory = NioSocketChannelFactory.INSTANCE;
+ transportFactory = NioTransportFactory.INSTANCE;
+ } else if (eventLoopGroup instanceof EpollEventLoopGroup) {
+ transportFactory = new EpollTransportFactory();
+ } else if (eventLoopGroup instanceof KQueueEventLoopGroup) {
+ transportFactory = new KQueueTransportFactory();
} else {
- channelFactory = getEpollSocketChannelFactory();
+ throw new IllegalArgumentException("Unknown event loop group " + eventLoopGroup.getClass().getSimpleName());
}
}
- httpBootstrap = newBootstrap(channelFactory, eventLoopGroup, config);
- wsBootstrap = newBootstrap(channelFactory, eventLoopGroup, config);
+ httpBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);
+ wsBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);
// for reactive streams
httpBootstrap.option(ChannelOption.AUTO_READ, false);
@@ -184,21 +185,16 @@ private Bootstrap newBootstrap(ChannelFactory extends Channel> channelFactory,
return bootstrap;
}
- private EventLoopGroup newEpollEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
- try {
- Class> epollEventLoopGroupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
- return (EventLoopGroup) epollEventLoopGroupClass.getConstructor(int.class, ThreadFactory.class).newInstance(ioThreadsCount, threadFactory);
- } catch (Exception e) {
- throw new IllegalArgumentException(e);
- }
- }
-
@SuppressWarnings("unchecked")
- private ChannelFactory extends Channel> getEpollSocketChannelFactory() {
+ private TransportFactory extends Channel, ? extends EventLoopGroup> getNativeTransportFactory() {
try {
- return (ChannelFactory extends Channel>) Class.forName("org.asynchttpclient.netty.channel.EpollSocketChannelFactory").newInstance();
+ return (TransportFactory extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.EpollTransportFactory").newInstance();
} catch (Exception e) {
- throw new IllegalArgumentException(e);
+ try {
+ return (TransportFactory extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.KQueueTransportFactory").newInstance();
+ } catch (Exception e1) {
+ throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available");
+ }
}
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java
new file mode 100644
index 0000000000..8f84272916
--- /dev/null
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at
+ * http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package org.asynchttpclient.netty.channel;
+
+import io.netty.channel.epoll.Epoll;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+
+import java.util.concurrent.ThreadFactory;
+
+class EpollTransportFactory implements TransportFactory {
+
+ EpollTransportFactory() {
+ try {
+ Class.forName("io.netty.channel.epoll.Epoll");
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("The epoll transport is not available");
+ }
+ if (!Epoll.isAvailable()) {
+ throw new IllegalStateException("The epoll transport is not supported");
+ }
+ }
+
+ @Override
+ public EpollSocketChannel newChannel() {
+ return new EpollSocketChannel();
+ }
+
+ @Override
+ public EpollEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
+ return new EpollEventLoopGroup(ioThreadsCount, threadFactory);
+ }
+}
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java
new file mode 100644
index 0000000000..f54fe46157
--- /dev/null
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2019 AsyncHttpClient Project. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at
+ * http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package org.asynchttpclient.netty.channel;
+
+import io.netty.channel.kqueue.KQueue;
+import io.netty.channel.kqueue.KQueueEventLoopGroup;
+import io.netty.channel.kqueue.KQueueSocketChannel;
+
+import java.util.concurrent.ThreadFactory;
+
+class KQueueTransportFactory implements TransportFactory {
+
+ KQueueTransportFactory() {
+ try {
+ Class.forName("io.netty.channel.kqueue.KQueue");
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException("The kqueue transport is not available");
+ }
+ if (!KQueue.isAvailable()) {
+ throw new IllegalStateException("The kqueue transport is not supported");
+ }
+ }
+
+ @Override
+ public KQueueSocketChannel newChannel() {
+ return new KQueueSocketChannel();
+ }
+
+ @Override
+ public KQueueEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
+ return new KQueueEventLoopGroup(ioThreadsCount, threadFactory);
+ }
+}
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/EpollSocketChannelFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/NioTransportFactory.java
similarity index 55%
rename from client/src/main/java/org/asynchttpclient/netty/channel/EpollSocketChannelFactory.java
rename to client/src/main/java/org/asynchttpclient/netty/channel/NioTransportFactory.java
index c6970b6d6c..d691ff270a 100644
--- a/client/src/main/java/org/asynchttpclient/netty/channel/EpollSocketChannelFactory.java
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/NioTransportFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
+ * Copyright (c) 2019 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -13,13 +13,22 @@
*/
package org.asynchttpclient.netty.channel;
-import io.netty.channel.ChannelFactory;
-import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
-class EpollSocketChannelFactory implements ChannelFactory {
+import java.util.concurrent.ThreadFactory;
+
+enum NioTransportFactory implements TransportFactory {
+
+ INSTANCE;
+
+ @Override
+ public NioSocketChannel newChannel() {
+ return new NioSocketChannel();
+ }
@Override
- public EpollSocketChannel newChannel() {
- return new EpollSocketChannel();
+ public NioEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
+ return new NioEventLoopGroup(ioThreadsCount, threadFactory);
}
}
diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NioSocketChannelFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/TransportFactory.java
similarity index 67%
rename from client/src/main/java/org/asynchttpclient/netty/channel/NioSocketChannelFactory.java
rename to client/src/main/java/org/asynchttpclient/netty/channel/TransportFactory.java
index 907623bba6..76f45c2d28 100644
--- a/client/src/main/java/org/asynchttpclient/netty/channel/NioSocketChannelFactory.java
+++ b/client/src/main/java/org/asynchttpclient/netty/channel/TransportFactory.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
+ * Copyright (c) 2019 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
@@ -13,15 +13,14 @@
*/
package org.asynchttpclient.netty.channel;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
-import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.EventLoopGroup;
-enum NioSocketChannelFactory implements ChannelFactory {
+import java.util.concurrent.ThreadFactory;
- INSTANCE;
+public interface TransportFactory extends ChannelFactory {
+
+ L newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory);
- @Override
- public NioSocketChannel newChannel() {
- return new NioSocketChannel();
- }
}
diff --git a/pom.xml b/pom.xml
index 6004c2c2db..fdcfde0e5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -299,6 +299,13 @@
${netty.version}
true
+
+ io.netty
+ netty-transport-native-kqueue
+ osx-x86_64
+ ${netty.version}
+ true
+
org.reactivestreams
reactive-streams