diff --git a/client/pom.xml b/client/pom.xml index 550b5fd47e..858c5f9ad7 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -55,6 +55,11 @@ netty-transport-native-epoll linux-x86_64 + + io.netty + netty-transport-native-kqueue + osx-x86_64 + io.netty netty-resolver-dns diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java index ac18269c28..eaa7032e41 100755 --- a/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java @@ -16,10 +16,11 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.*; +import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.kqueue.KQueueEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.oio.OioEventLoopGroup; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder; @@ -119,31 +120,31 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) { // check if external EventLoopGroup is defined ThreadFactory threadFactory = config.getThreadFactory() != null ? config.getThreadFactory() : new DefaultThreadFactory(config.getThreadPoolName()); allowReleaseEventLoopGroup = config.getEventLoopGroup() == null; - ChannelFactory channelFactory; + TransportFactory transportFactory; if (allowReleaseEventLoopGroup) { if (config.isUseNativeTransport()) { - eventLoopGroup = newEpollEventLoopGroup(config.getIoThreadsCount(), threadFactory); - channelFactory = getEpollSocketChannelFactory(); - + transportFactory = getNativeTransportFactory(); } else { - eventLoopGroup = new NioEventLoopGroup(config.getIoThreadsCount(), threadFactory); - channelFactory = NioSocketChannelFactory.INSTANCE; + transportFactory = NioTransportFactory.INSTANCE; } + eventLoopGroup = transportFactory.newEventLoopGroup(config.getIoThreadsCount(), threadFactory); } else { eventLoopGroup = config.getEventLoopGroup(); - if (eventLoopGroup instanceof OioEventLoopGroup) - throw new IllegalArgumentException("Oio is not supported"); if (eventLoopGroup instanceof NioEventLoopGroup) { - channelFactory = NioSocketChannelFactory.INSTANCE; + transportFactory = NioTransportFactory.INSTANCE; + } else if (eventLoopGroup instanceof EpollEventLoopGroup) { + transportFactory = new EpollTransportFactory(); + } else if (eventLoopGroup instanceof KQueueEventLoopGroup) { + transportFactory = new KQueueTransportFactory(); } else { - channelFactory = getEpollSocketChannelFactory(); + throw new IllegalArgumentException("Unknown event loop group " + eventLoopGroup.getClass().getSimpleName()); } } - httpBootstrap = newBootstrap(channelFactory, eventLoopGroup, config); - wsBootstrap = newBootstrap(channelFactory, eventLoopGroup, config); + httpBootstrap = newBootstrap(transportFactory, eventLoopGroup, config); + wsBootstrap = newBootstrap(transportFactory, eventLoopGroup, config); // for reactive streams httpBootstrap.option(ChannelOption.AUTO_READ, false); @@ -184,21 +185,16 @@ private Bootstrap newBootstrap(ChannelFactory channelFactory, return bootstrap; } - private EventLoopGroup newEpollEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) { - try { - Class epollEventLoopGroupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup"); - return (EventLoopGroup) epollEventLoopGroupClass.getConstructor(int.class, ThreadFactory.class).newInstance(ioThreadsCount, threadFactory); - } catch (Exception e) { - throw new IllegalArgumentException(e); - } - } - @SuppressWarnings("unchecked") - private ChannelFactory getEpollSocketChannelFactory() { + private TransportFactory getNativeTransportFactory() { try { - return (ChannelFactory) Class.forName("org.asynchttpclient.netty.channel.EpollSocketChannelFactory").newInstance(); + return (TransportFactory) Class.forName("org.asynchttpclient.netty.channel.EpollTransportFactory").newInstance(); } catch (Exception e) { - throw new IllegalArgumentException(e); + try { + return (TransportFactory) Class.forName("org.asynchttpclient.netty.channel.KQueueTransportFactory").newInstance(); + } catch (Exception e1) { + throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available"); + } } } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java new file mode 100644 index 0000000000..8f84272916 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/channel/EpollTransportFactory.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.netty.channel; + +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollSocketChannel; + +import java.util.concurrent.ThreadFactory; + +class EpollTransportFactory implements TransportFactory { + + EpollTransportFactory() { + try { + Class.forName("io.netty.channel.epoll.Epoll"); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("The epoll transport is not available"); + } + if (!Epoll.isAvailable()) { + throw new IllegalStateException("The epoll transport is not supported"); + } + } + + @Override + public EpollSocketChannel newChannel() { + return new EpollSocketChannel(); + } + + @Override + public EpollEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) { + return new EpollEventLoopGroup(ioThreadsCount, threadFactory); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java new file mode 100644 index 0000000000..f54fe46157 --- /dev/null +++ b/client/src/main/java/org/asynchttpclient/netty/channel/KQueueTransportFactory.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 AsyncHttpClient Project. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at + * http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package org.asynchttpclient.netty.channel; + +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.kqueue.KQueueSocketChannel; + +import java.util.concurrent.ThreadFactory; + +class KQueueTransportFactory implements TransportFactory { + + KQueueTransportFactory() { + try { + Class.forName("io.netty.channel.kqueue.KQueue"); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("The kqueue transport is not available"); + } + if (!KQueue.isAvailable()) { + throw new IllegalStateException("The kqueue transport is not supported"); + } + } + + @Override + public KQueueSocketChannel newChannel() { + return new KQueueSocketChannel(); + } + + @Override + public KQueueEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) { + return new KQueueEventLoopGroup(ioThreadsCount, threadFactory); + } +} diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/EpollSocketChannelFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/NioTransportFactory.java similarity index 55% rename from client/src/main/java/org/asynchttpclient/netty/channel/EpollSocketChannelFactory.java rename to client/src/main/java/org/asynchttpclient/netty/channel/NioTransportFactory.java index c6970b6d6c..d691ff270a 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/EpollSocketChannelFactory.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/NioTransportFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved. + * Copyright (c) 2019 AsyncHttpClient Project. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -13,13 +13,22 @@ */ package org.asynchttpclient.netty.channel; -import io.netty.channel.ChannelFactory; -import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; -class EpollSocketChannelFactory implements ChannelFactory { +import java.util.concurrent.ThreadFactory; + +enum NioTransportFactory implements TransportFactory { + + INSTANCE; + + @Override + public NioSocketChannel newChannel() { + return new NioSocketChannel(); + } @Override - public EpollSocketChannel newChannel() { - return new EpollSocketChannel(); + public NioEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) { + return new NioEventLoopGroup(ioThreadsCount, threadFactory); } } diff --git a/client/src/main/java/org/asynchttpclient/netty/channel/NioSocketChannelFactory.java b/client/src/main/java/org/asynchttpclient/netty/channel/TransportFactory.java similarity index 67% rename from client/src/main/java/org/asynchttpclient/netty/channel/NioSocketChannelFactory.java rename to client/src/main/java/org/asynchttpclient/netty/channel/TransportFactory.java index 907623bba6..76f45c2d28 100644 --- a/client/src/main/java/org/asynchttpclient/netty/channel/NioSocketChannelFactory.java +++ b/client/src/main/java/org/asynchttpclient/netty/channel/TransportFactory.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016 AsyncHttpClient Project. All rights reserved. + * Copyright (c) 2019 AsyncHttpClient Project. All rights reserved. * * This program is licensed to you under the Apache License Version 2.0, * and you may not use this file except in compliance with the Apache License Version 2.0. @@ -13,15 +13,14 @@ */ package org.asynchttpclient.netty.channel; +import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; -import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.channel.EventLoopGroup; -enum NioSocketChannelFactory implements ChannelFactory { +import java.util.concurrent.ThreadFactory; - INSTANCE; +public interface TransportFactory extends ChannelFactory { + + L newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory); - @Override - public NioSocketChannel newChannel() { - return new NioSocketChannel(); - } } diff --git a/pom.xml b/pom.xml index 6004c2c2db..fdcfde0e5d 100644 --- a/pom.xml +++ b/pom.xml @@ -299,6 +299,13 @@ ${netty.version} true + + io.netty + netty-transport-native-kqueue + osx-x86_64 + ${netty.version} + true + org.reactivestreams reactive-streams