Skip to content

Commit 69df893

Browse files
committed
[CELEBORN-2258][FOLLOWUP] Use IoHandlerFactories for EventLoopGroups to replace deprecated transport-specific event loop groups
### What changes were proposed in this pull request? Use IoHandlerFactories for EventLoopGroups to replace deprecated transport-specific event loop groups. Backport: apache/spark#52719. ### Why are the changes needed? Netty 4.2 introduces some new APIs, and deprecates some old APIs. As part of your migration to Netty 4.2, we encourage you to look through your code base for opportunities to clean up any use of deprecated APIs. - **IoHandlerFactories for EventLoopGroups** All transport-specific event loop groups, such as `NioEventLoopGroup`, have been deprecated. Integrators should now instead pass a transport-specific `IoHandlerFactory` to a `MultiThreadedEventLoopGroup` constructor. Therefore, Netty 4.2 upgrade could follow the best practices from https://netty.io/wiki/netty-4.2-migration-guide.html#new-best-practices. ### Does this PR resolve a correctness bug? No. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI. Closes #3669 from SteNicholas/CELEBORN-2258. Authored-by: SteNicholas <programgeek@163.com> Signed-off-by: SteNicholas <programgeek@163.com>
1 parent 59fd7a8 commit 69df893

1 file changed

Lines changed: 23 additions & 15 deletions

File tree

  • common/src/main/java/org/apache/celeborn/common/network/util

common/src/main/java/org/apache/celeborn/common/network/util/NettyUtils.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
package org.apache.celeborn.common.network.util;
1919

20-
import java.nio.channels.spi.SelectorProvider;
20+
import static org.apache.celeborn.common.network.util.IOMode.NIO;
21+
2122
import java.util.ArrayList;
2223
import java.util.Collections;
2324
import java.util.HashMap;
@@ -31,16 +32,17 @@
3132
import io.netty.buffer.PooledByteBufAllocator;
3233
import io.netty.buffer.UnpooledByteBufAllocator;
3334
import io.netty.channel.Channel;
34-
import io.netty.channel.DefaultSelectStrategyFactory;
3535
import io.netty.channel.EventLoopGroup;
36+
import io.netty.channel.IoHandlerFactory;
37+
import io.netty.channel.MultiThreadIoEventLoopGroup;
3638
import io.netty.channel.ServerChannel;
37-
import io.netty.channel.epoll.EpollEventLoopGroup;
39+
import io.netty.channel.epoll.EpollIoHandler;
3840
import io.netty.channel.epoll.EpollServerSocketChannel;
3941
import io.netty.channel.epoll.EpollSocketChannel;
40-
import io.netty.channel.kqueue.KQueueEventLoopGroup;
42+
import io.netty.channel.kqueue.KQueueIoHandler;
4143
import io.netty.channel.kqueue.KQueueServerSocketChannel;
4244
import io.netty.channel.kqueue.KQueueSocketChannel;
43-
import io.netty.channel.nio.NioEventLoopGroup;
45+
import io.netty.channel.nio.NioIoHandler;
4446
import io.netty.channel.socket.nio.NioServerSocketChannel;
4547
import io.netty.channel.socket.nio.NioSocketChannel;
4648
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -75,23 +77,29 @@ public static EventLoopGroup createEventLoop(
7577
IOMode mode, int numThreads, boolean conflictAvoidChooserEnable, String threadPrefix) {
7678
ThreadFactory threadFactory = createThreadFactory(threadPrefix);
7779

80+
IoHandlerFactory handlerFactory;
7881
switch (mode) {
7982
case NIO:
80-
return conflictAvoidChooserEnable
81-
? new NioEventLoopGroup(
82-
numThreads,
83-
new ThreadPerTaskExecutor(threadFactory),
84-
ConflictAvoidEventExecutorChooserFactory.INSTANCE,
85-
SelectorProvider.provider(),
86-
DefaultSelectStrategyFactory.INSTANCE)
87-
: new NioEventLoopGroup(numThreads, threadFactory);
83+
handlerFactory = NioIoHandler.newFactory();
84+
break;
8885
case EPOLL:
89-
return new EpollEventLoopGroup(numThreads, threadFactory);
86+
handlerFactory = EpollIoHandler.newFactory();
87+
break;
9088
case KQUEUE:
91-
return new KQueueEventLoopGroup(numThreads, threadFactory);
89+
handlerFactory = KQueueIoHandler.newFactory();
90+
break;
9291
default:
9392
throw new IllegalArgumentException("Unknown io mode: " + mode);
9493
}
94+
if (mode == NIO && conflictAvoidChooserEnable) {
95+
return new MultiThreadIoEventLoopGroup(
96+
numThreads,
97+
new ThreadPerTaskExecutor(threadFactory),
98+
ConflictAvoidEventExecutorChooserFactory.INSTANCE,
99+
handlerFactory);
100+
} else {
101+
return new MultiThreadIoEventLoopGroup(numThreads, threadFactory, handlerFactory);
102+
}
95103
}
96104

97105
/** Returns the correct (client) SocketChannel class based on IOMode. */

0 commit comments

Comments
 (0)