Skip to content

Support Netty kqueue transport #1665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder;
Expand Down Expand Up @@ -119,31 +120,31 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
// check if external EventLoopGroup is defined
ThreadFactory threadFactory = config.getThreadFactory() != null ? config.getThreadFactory() : new DefaultThreadFactory(config.getThreadPoolName());
allowReleaseEventLoopGroup = config.getEventLoopGroup() == null;
ChannelFactory<? extends Channel> channelFactory;
TransportFactory<? extends Channel, ? extends EventLoopGroup> transportFactory;
if (allowReleaseEventLoopGroup) {
if (config.isUseNativeTransport()) {
eventLoopGroup = newEpollEventLoopGroup(config.getIoThreadsCount(), threadFactory);
channelFactory = getEpollSocketChannelFactory();

transportFactory = getNativeTransportFactory();
} else {
eventLoopGroup = new NioEventLoopGroup(config.getIoThreadsCount(), threadFactory);
channelFactory = NioSocketChannelFactory.INSTANCE;
transportFactory = NioTransportFactory.INSTANCE;
}
eventLoopGroup = transportFactory.newEventLoopGroup(config.getIoThreadsCount(), threadFactory);

} else {
eventLoopGroup = config.getEventLoopGroup();
if (eventLoopGroup instanceof OioEventLoopGroup)
throw new IllegalArgumentException("Oio is not supported");

if (eventLoopGroup instanceof NioEventLoopGroup) {
channelFactory = NioSocketChannelFactory.INSTANCE;
transportFactory = NioTransportFactory.INSTANCE;
} else if (eventLoopGroup instanceof EpollEventLoopGroup) {
transportFactory = new EpollTransportFactory();
} else if (eventLoopGroup instanceof KQueueEventLoopGroup) {
transportFactory = new KQueueTransportFactory();
} else {
channelFactory = getEpollSocketChannelFactory();
throw new IllegalArgumentException("Unknown event loop group " + eventLoopGroup.getClass().getSimpleName());
}
}

httpBootstrap = newBootstrap(channelFactory, eventLoopGroup, config);
wsBootstrap = newBootstrap(channelFactory, eventLoopGroup, config);
httpBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);
wsBootstrap = newBootstrap(transportFactory, eventLoopGroup, config);

// for reactive streams
httpBootstrap.option(ChannelOption.AUTO_READ, false);
Expand Down Expand Up @@ -184,21 +185,16 @@ private Bootstrap newBootstrap(ChannelFactory<? extends Channel> channelFactory,
return bootstrap;
}

private EventLoopGroup newEpollEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
try {
Class<?> epollEventLoopGroupClass = Class.forName("io.netty.channel.epoll.EpollEventLoopGroup");
return (EventLoopGroup) epollEventLoopGroupClass.getConstructor(int.class, ThreadFactory.class).newInstance(ioThreadsCount, threadFactory);
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}

@SuppressWarnings("unchecked")
private ChannelFactory<? extends Channel> getEpollSocketChannelFactory() {
private TransportFactory<? extends Channel, ? extends EventLoopGroup> getNativeTransportFactory() {
try {
return (ChannelFactory<? extends Channel>) Class.forName("org.asynchttpclient.netty.channel.EpollSocketChannelFactory").newInstance();
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.EpollTransportFactory").newInstance();
} catch (Exception e) {
throw new IllegalArgumentException(e);
try {
return (TransportFactory<? extends Channel, ? extends EventLoopGroup>) Class.forName("org.asynchttpclient.netty.channel.KQueueTransportFactory").newInstance();
} catch (Exception e1) {
throw new IllegalArgumentException("No suitable native transport (epoll or kqueue) available");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.netty.channel;

import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;

import java.util.concurrent.ThreadFactory;

class EpollTransportFactory implements TransportFactory<EpollSocketChannel, EpollEventLoopGroup> {

EpollTransportFactory() {
try {
Class.forName("io.netty.channel.epoll.Epoll");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The epoll transport is not available");
}
if (!Epoll.isAvailable()) {
throw new IllegalStateException("The epoll transport is not supported");
}
}

@Override
public EpollSocketChannel newChannel() {
return new EpollSocketChannel();
}

@Override
public EpollEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
return new EpollEventLoopGroup(ioThreadsCount, threadFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2019 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.netty.channel;

import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueSocketChannel;

import java.util.concurrent.ThreadFactory;

class KQueueTransportFactory implements TransportFactory<KQueueSocketChannel, KQueueEventLoopGroup> {

KQueueTransportFactory() {
try {
Class.forName("io.netty.channel.kqueue.KQueue");
} catch (ClassNotFoundException e) {
throw new IllegalStateException("The kqueue transport is not available");
}
if (!KQueue.isAvailable()) {
throw new IllegalStateException("The kqueue transport is not supported");
}
}

@Override
public KQueueSocketChannel newChannel() {
return new KQueueSocketChannel();
}

@Override
public KQueueEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
return new KQueueEventLoopGroup(ioThreadsCount, threadFactory);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
* Copyright (c) 2019 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
Expand All @@ -13,13 +13,22 @@
*/
package org.asynchttpclient.netty.channel;

import io.netty.channel.ChannelFactory;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

class EpollSocketChannelFactory implements ChannelFactory<EpollSocketChannel> {
import java.util.concurrent.ThreadFactory;

enum NioTransportFactory implements TransportFactory<NioSocketChannel, NioEventLoopGroup> {

INSTANCE;

@Override
public NioSocketChannel newChannel() {
return new NioSocketChannel();
}

@Override
public EpollSocketChannel newChannel() {
return new EpollSocketChannel();
public NioEventLoopGroup newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory) {
return new NioEventLoopGroup(ioThreadsCount, threadFactory);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016 AsyncHttpClient Project. All rights reserved.
* Copyright (c) 2019 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
Expand All @@ -13,15 +13,14 @@
*/
package org.asynchttpclient.netty.channel;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFactory;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.channel.EventLoopGroup;

enum NioSocketChannelFactory implements ChannelFactory<NioSocketChannel> {
import java.util.concurrent.ThreadFactory;

INSTANCE;
public interface TransportFactory<C extends Channel, L extends EventLoopGroup> extends ChannelFactory<C> {

L newEventLoopGroup(int ioThreadsCount, ThreadFactory threadFactory);

@Override
public NioSocketChannel newChannel() {
return new NioSocketChannel();
}
}
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,13 @@
<version>${netty.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
<version>${netty.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
Expand Down