Skip to content

Commit 9b1f8b1

Browse files
authored
Merge pull request #2971 from armanbilge/fix/leaked-socket-on-open-error
Prevent socket leaks due to post-open exceptions
2 parents 8f54106 + 44738aa commit 9b1f8b1

File tree

3 files changed

+47
-30
lines changed

3 files changed

+47
-30
lines changed

io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala

+11-5
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,18 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
5252
options: List[SocketOption]
5353
): Resource[F, Socket[F]] =
5454
(for {
55-
sock <- F
56-
.delay(
57-
new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true })
55+
sock <- Resource
56+
.make(
57+
F.delay(
58+
new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true })
59+
)
60+
)(sock =>
61+
F.delay {
62+
if (!sock.destroyed)
63+
sock.destroy()
64+
}
5865
)
59-
.flatTap(setSocketOptions(options))
60-
.toResource
66+
.evalTap(setSocketOptions(options))
6167
socket <- Socket.forAsync(sock)
6268
_ <- F
6369
.async[Unit] { cb =>

io/js/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala

+11-4
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,22 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
4141

4242
override def client(address: UnixSocketAddress): Resource[F, Socket[F]] =
4343
Resource
44-
.eval(for {
45-
socket <- F.delay(
44+
.make(
45+
F.delay(
4646
new facade.net.Socket(new facade.net.SocketOptions { allowHalfOpen = true })
4747
)
48-
_ <- F.async_[Unit] { cb =>
48+
)(socket =>
49+
F.delay {
50+
if (!socket.destroyed)
51+
socket.destroy()
52+
}
53+
)
54+
.evalTap { socket =>
55+
F.async_[Unit] { cb =>
4956
socket.connect(address.path, () => cb(Right(())))
5057
()
5158
}
52-
} yield socket)
59+
}
5360
.flatMap(Socket.forAsync[F])
5461

5562
override def server(

io/jvm/src/main/scala/fs2/io/net/SocketGroupPlatform.scala

+25-21
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,13 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
4848
options: List[SocketOption]
4949
): Resource[F, Socket[F]] = {
5050
def setup: Resource[F, AsynchronousSocketChannel] =
51-
Resource.make(Async[F].delay {
52-
val ch =
53-
AsynchronousChannelProvider.provider.openAsynchronousSocketChannel(channelGroup)
54-
options.foreach(opt => ch.setOption(opt.key, opt.value))
55-
ch
56-
})(ch => Async[F].delay(if (ch.isOpen) ch.close else ()))
51+
Resource
52+
.make(
53+
Async[F].delay(
54+
AsynchronousChannelProvider.provider.openAsynchronousSocketChannel(channelGroup)
55+
)
56+
)(ch => Async[F].delay(if (ch.isOpen) ch.close else ()))
57+
.evalTap(ch => Async[F].delay(options.foreach(opt => ch.setOption(opt.key, opt.value))))
5758

5859
def connect(ch: AsynchronousSocketChannel): F[AsynchronousSocketChannel] =
5960
to.resolve[F].flatMap { ip =>
@@ -80,24 +81,27 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
8081
options: List[SocketOption]
8182
): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = {
8283

83-
val setup: F[AsynchronousServerSocketChannel] =
84-
address.traverse(_.resolve[F]).flatMap { addr =>
85-
Async[F].delay {
86-
val ch =
87-
AsynchronousChannelProvider.provider.openAsynchronousServerSocketChannel(channelGroup)
88-
ch.bind(
89-
new InetSocketAddress(
90-
addr.map(_.toInetAddress).orNull,
91-
port.map(_.value).getOrElse(0)
84+
val setup: Resource[F, AsynchronousServerSocketChannel] =
85+
Resource.eval(address.traverse(_.resolve[F])).flatMap { addr =>
86+
Resource
87+
.make(
88+
Async[F].delay(
89+
AsynchronousChannelProvider.provider
90+
.openAsynchronousServerSocketChannel(channelGroup)
91+
)
92+
)(sch => Async[F].delay(if (sch.isOpen) sch.close()))
93+
.evalTap(ch =>
94+
Async[F].delay(
95+
ch.bind(
96+
new InetSocketAddress(
97+
addr.map(_.toInetAddress).orNull,
98+
port.map(_.value).getOrElse(0)
99+
)
100+
)
92101
)
93102
)
94-
ch
95-
}
96103
}
97104

98-
def cleanup(sch: AsynchronousServerSocketChannel): F[Unit] =
99-
Async[F].delay(if (sch.isOpen) sch.close())
100-
101105
def acceptIncoming(
102106
sch: AsynchronousServerSocketChannel
103107
): Stream[F, Socket[F]] = {
@@ -137,7 +141,7 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type =>
137141
}
138142
}
139143

140-
Resource.make(setup)(cleanup).map { sch =>
144+
setup.map { sch =>
141145
val jLocalAddress = sch.getLocalAddress.asInstanceOf[java.net.InetSocketAddress]
142146
val localAddress = SocketAddress.fromInetSocketAddress(jLocalAddress)
143147
(localAddress, acceptIncoming(sch))

0 commit comments

Comments
 (0)