Skip to content

Commit 33b95ed

Browse files
committed
need to override socket input/output stream
1 parent 371195a commit 33b95ed

File tree

3 files changed

+34
-20
lines changed

3 files changed

+34
-20
lines changed

ziti/src/integrationTest/kotlin/org/openziti/net/ConnectionTests.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class ConnectionTests: BaseTest() {
167167
}
168168

169169
@Test
170-
fun `test socket-connect-read-timeout`() = runTest(timeout = 10.seconds) {
170+
fun `test socket-connect-read-timeout`() = runTest(timeout = 1000.seconds) {
171171
val greeting = "Hello from Ziti".toByteArray()
172172
val s = assertDoesNotThrow {
173173
ztx.serviceUpdates().filter { it.service.name == service }.first().service
@@ -195,6 +195,7 @@ class ConnectionTests: BaseTest() {
195195
}
196196

197197
ztx.connect(hostname, port).use { clt ->
198+
assertTrue(clt.isConnected)
198199
val buf = ByteArray(1024)
199200
clt.soTimeout = 500
200201
val input = clt.getInputStream()

ziti/src/main/kotlin/org/openziti/impl/ZitiContextImpl.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,16 +214,17 @@ internal class ZitiContextImpl(internal val id: Identity, enabled: Boolean) : Zi
214214

215215
override fun connect(host: String, port: Int): Socket {
216216
checkEnabled()
217-
val ch = open()
218-
runCatching {
217+
return runCatching {
218+
val ch = open()
219219
ch.connect(InetSocketAddress.createUnresolved(host, port)).get(10, TimeUnit.SECONDS)
220-
}.onFailure {
220+
d { "connected: ${ch.remoteAddress}" }
221+
AsychChannelSocket(ch)
222+
}.getOrElse {
221223
when (it) {
222224
is ExecutionException -> throw it.cause ?: it
223225
else -> throw it
224226
}
225227
}
226-
return AsychChannelSocket(ch)
227228
}
228229

229230
fun start(): Job = launch {

ziti/src/main/kotlin/org/openziti/net/nio/AsychChannelSocket.kt

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616

1717
package org.openziti.net.nio
1818

19+
import java.io.*
1920
import java.net.InetAddress
2021
import java.net.InetSocketAddress
2122
import java.net.Socket
23+
import java.net.SocketException
2224
import java.nio.channels.AsynchronousSocketChannel
23-
import javax.net.SocketFactory
2425

2526
/**
2627
* Socket wrapper over AsynchronousSocketChannel.
@@ -32,20 +33,6 @@ import javax.net.SocketFactory
3233
internal
3334
class AsychChannelSocket(internal val impl: AsyncSocketImpl = AsyncSocketImpl()): Socket(impl) {
3435

35-
class Factory: SocketFactory() {
36-
override fun createSocket(): Socket = AsychChannelSocket()
37-
override fun createSocket(host: String?, port: Int): Socket = AsychChannelSocket(host, port)
38-
override fun createSocket(host: InetAddress?, port: Int): Socket = AsychChannelSocket(host, port)
39-
40-
override fun createSocket(host: String?, port: Int, localHost: InetAddress?, localPort: Int): Socket {
41-
TODO("Not yet implemented")
42-
}
43-
44-
override fun createSocket(address: InetAddress?, port: Int, localAddress: InetAddress?, localPort: Int): Socket {
45-
TODO("Not yet implemented")
46-
}
47-
}
48-
4936
constructor(ch: AsynchronousSocketChannel): this(AsyncSocketImpl(ch))
5037

5138
constructor(host: String?, port: Int) : this(InetAddress.getByName(host), port)
@@ -64,4 +51,29 @@ class AsychChannelSocket(internal val impl: AsyncSocketImpl = AsyncSocketImpl())
6451
impl.channel.close()
6552
}
6653

54+
override fun getInputStream(): InputStream {
55+
if (isClosed())
56+
throw SocketException("Socket is closed")
57+
if (!isConnected)
58+
throw SocketException("Socket is not connected")
59+
60+
return object : FilterInputStream(impl.inputStream) {
61+
override fun close() {
62+
this@AsychChannelSocket.close()
63+
}
64+
}
65+
}
66+
67+
override fun getOutputStream(): OutputStream {
68+
if (isClosed())
69+
throw SocketException("Socket is closed")
70+
if (!isConnected)
71+
throw SocketException("Socket is not connected")
72+
73+
return object : FilterOutputStream(impl.outputStream) {
74+
override fun close() {
75+
this@AsychChannelSocket.close()
76+
}
77+
}
78+
}
6779
}

0 commit comments

Comments
 (0)