Skip to content

Commit e3939e3

Browse files
committed
refactor
1 parent 0747596 commit e3939e3

File tree

2 files changed

+10
-11
lines changed

2 files changed

+10
-11
lines changed

multi-node-testkit/src/main/scala/org/apache/pekko/remote/testconductor/RemoteConnection.scala

+5-6
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@ package org.apache.pekko.remote.testconductor
1616
import java.net.InetSocketAddress
1717
import java.util.concurrent.TimeUnit
1818

19-
import scala.annotation.nowarn
2019
import scala.util.control.NonFatal
2120

2221
import io.netty.bootstrap.{ Bootstrap, ServerBootstrap }
2322
import io.netty.buffer.{ ByteBuf, ByteBufUtil }
2423
import io.netty.channel._
2524
import io.netty.channel.ChannelHandler.Sharable
26-
import io.netty.channel.nio.NioEventLoopGroup
25+
import io.netty.channel.nio.NioIoHandler
2726
import io.netty.channel.socket.SocketChannel
2827
import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel }
2928
import io.netty.handler.codec.{
@@ -115,7 +114,6 @@ private[pekko] trait RemoteConnection {
115114
/**
116115
* INTERNAL API.
117116
*/
118-
@nowarn("msg=deprecated")
119117
private[pekko] object RemoteConnection {
120118
def apply(
121119
role: Role,
@@ -125,7 +123,7 @@ private[pekko] object RemoteConnection {
125123
role match {
126124
case Client =>
127125
val bootstrap = new Bootstrap()
128-
val eventLoopGroup = new NioEventLoopGroup(poolSize)
126+
val eventLoopGroup = new MultiThreadIoEventLoopGroup(poolSize, NioIoHandler.newFactory())
129127
val cf = bootstrap
130128
.group(eventLoopGroup)
131129
.channel(classOf[NioSocketChannel])
@@ -150,8 +148,9 @@ private[pekko] object RemoteConnection {
150148

151149
case Server =>
152150
val bootstrap = new ServerBootstrap()
153-
val parentEventLoopGroup = new NioEventLoopGroup(poolSize)
154-
val childEventLoopGroup = new NioEventLoopGroup(poolSize)
151+
val ioHandlerFactory = NioIoHandler.newFactory()
152+
val parentEventLoopGroup = new MultiThreadIoEventLoopGroup(poolSize, ioHandlerFactory)
153+
val childEventLoopGroup = new MultiThreadIoEventLoopGroup(poolSize, ioHandlerFactory)
155154
val cf = bootstrap
156155
.group(parentEventLoopGroup, childEventLoopGroup)
157156
.channel(classOf[NioServerSocketChannel])

remote/src/main/scala/org/apache/pekko/remote/transport/netty/NettyTransport.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,11 @@ import io.netty.channel.{
5353
ChannelInitializer,
5454
ChannelOption,
5555
ChannelPipeline,
56-
EventLoopGroup
56+
EventLoopGroup,
57+
MultiThreadIoEventLoopGroup
5758
}
5859
import io.netty.channel.group.{ ChannelGroup, ChannelGroupFuture, ChannelMatchers, DefaultChannelGroup }
59-
import io.netty.channel.nio.NioEventLoopGroup
60+
import io.netty.channel.nio.NioIoHandler
6061
import io.netty.channel.socket.SocketChannel
6162
import io.netty.channel.socket.nio.{ NioServerSocketChannel, NioSocketChannel }
6263
import io.netty.handler.codec.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
@@ -366,11 +367,10 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
366367

367368
private val log = Logging.withMarker(system, classOf[NettyTransport])
368369

369-
@nowarn("msg=deprecated")
370370
private def createEventLoopGroup(nThreadCount: Int): EventLoopGroup =
371371
UseDispatcherForIo.map(system.dispatchers.lookup)
372-
.map(executor => new NioEventLoopGroup(0, executor))
373-
.getOrElse(new NioEventLoopGroup(nThreadCount, system.threadFactory))
372+
.map(executor => new MultiThreadIoEventLoopGroup(0, executor, NioIoHandler.newFactory()))
373+
.getOrElse(new MultiThreadIoEventLoopGroup(nThreadCount, system.threadFactory, NioIoHandler.newFactory()))
374374

375375
/*
376376
* Be aware, that the close() method of DefaultChannelGroup is racy, because it uses an iterator over a ConcurrentHashMap.

0 commit comments

Comments
 (0)