1+ /*
2+ * Copyright (c) 2018-2025 NetFoundry Inc.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * https://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package org.openziti.net
18+
19+ import kotlinx.coroutines.Dispatchers
20+ import kotlinx.coroutines.delay
21+ import kotlinx.coroutines.flow.filter
22+ import kotlinx.coroutines.flow.first
23+ import kotlinx.coroutines.launch
24+ import kotlinx.coroutines.test.runTest
25+ import kotlinx.coroutines.withContext
26+ import org.junit.jupiter.api.*
27+ import org.openziti.IdentityConfig
28+ import org.openziti.Ziti
29+ import org.openziti.ZitiAddress
30+ import org.openziti.ZitiContext
31+ import org.openziti.api.DNSName
32+ import org.openziti.api.InterceptConfig
33+ import org.openziti.api.InterceptV1Cfg
34+ import org.openziti.api.PortRange
35+ import org.openziti.edge.model.DialBind
36+ import org.openziti.integ.BaseTest
37+ import org.openziti.integ.ManagementHelper
38+ import org.openziti.net.nio.acceptSuspend
39+ import org.openziti.net.nio.readSuspend
40+ import org.openziti.net.nio.writeSuspend
41+ import java.net.ConnectException
42+ import java.net.InetSocketAddress
43+ import java.net.SocketTimeoutException
44+ import java.nio.ByteBuffer
45+ import java.nio.channels.InterruptedByTimeoutException
46+ import java.util.concurrent.ExecutionException
47+ import java.util.concurrent.TimeUnit
48+ import kotlin.test.assertContentEquals
49+ import kotlin.test.assertEquals
50+ import kotlin.test.assertFalse
51+ import kotlin.test.assertTrue
52+ import kotlin.time.Duration.Companion.seconds
53+
54+ class ConnectionTests : BaseTest () {
55+
56+ private val hostname = " test${System .nanoTime()} .ziti"
57+ private val port = 5000
58+ private lateinit var service: String
59+ private lateinit var cfg: IdentityConfig
60+ private lateinit var ztx: ZitiContext
61+
62+ @BeforeEach
63+ fun before () {
64+ service = ManagementHelper .createService(
65+ configs = mapOf (
66+ InterceptV1Cfg to InterceptConfig (
67+ protocols = setOf (Protocol .TCP ),
68+ addresses = setOf (DNSName (hostname)),
69+ portRanges = sortedSetOf(PortRange (port, port)),
70+ )
71+ )
72+ )
73+ cfg = createIdentity()
74+ ztx = Ziti .newContext(cfg)
75+ }
76+
77+ @AfterEach
78+ fun after () {
79+ ztx.destroy()
80+ }
81+
82+ @Test
83+ fun `test dial without terminator` () = runTest(timeout = 10 .seconds) {
84+ val srv = assertDoesNotThrow {
85+ ztx.serviceUpdates().filter { it.service.name == service }.first().service
86+ }
87+
88+ assertFalse(srv.config.isEmpty())
89+ assertThrows<ConnectException > {
90+ ztx.dial(service)
91+ }.run {
92+ assert (message!! .contains(" has no terminators" ))
93+ }
94+
95+ assertThrows<ExecutionException > {
96+ val ch = ztx.open()
97+ ch.connect(ZitiAddress .Dial (service)).get()
98+ }.run {
99+ assert (cause!! .message!! .contains(" has no terminators" ))
100+ }
101+ assertThrows<ExecutionException > {
102+ val ch = ztx.open()
103+ ch.connect(InetSocketAddress .createUnresolved(hostname, port)).get()
104+ }.run {
105+ assert (cause!! .message!! .contains(" has no terminators" ))
106+ }
107+
108+ assertThrows<ConnectException > {
109+ ztx.connect(hostname, port)
110+ }.run {
111+ assertTrue(message!! .contains(" has no terminators" ))
112+ }
113+ }
114+
115+ @Test
116+ fun `test bind-connect-read-timeout` () = runTest(timeout = 10 .seconds) {
117+ val greeting = " Hello from Ziti" .toByteArray()
118+ val s = assertDoesNotThrow {
119+ ztx.serviceUpdates().filter { it.service.name == service }.first().service
120+ }
121+ assertTrue(s.permissions.contains(DialBind .DIAL ))
122+ assertTrue(s.permissions.contains(DialBind .BIND ))
123+
124+ ztx.openServer().use { srv ->
125+
126+ srv.bind(ZitiAddress .Bind (service))
127+
128+ // wait for binding -- test dispatcher skips delays
129+ withContext(Dispatchers .Default ) {
130+ val zrv = srv as ZitiServerSocketChannel
131+ while (zrv.state != ZitiServerSocketChannel .State .bound) {
132+ delay(50 )
133+ }
134+ }
135+
136+ launch(Dispatchers .IO ) {
137+ val c = srv.acceptSuspend()
138+ c.writeSuspend(ByteBuffer .wrap(greeting))
139+ delay(1000 )
140+ c.close()
141+ }
142+
143+ ztx.open().use { clt ->
144+ clt.connect(ZitiAddress .Dial (service)).get(1 , TimeUnit .SECONDS )
145+
146+ val buf = ByteBuffer .allocate(1024 )
147+
148+ // read 1: return greeting
149+ val read1 = clt.readSuspend(buf, 100 , TimeUnit .MILLISECONDS )
150+ assertEquals(read1, greeting.size)
151+ val readMsg = ByteArray (read1)
152+ buf.flip().get(readMsg)
153+ assertContentEquals(greeting, readMsg)
154+ assertFalse(buf.hasRemaining())
155+
156+ buf.clear()
157+ // read 2: should time out
158+ assertThrows<InterruptedByTimeoutException > {
159+ clt.readSuspend(buf, 600 , TimeUnit .MILLISECONDS )
160+ }
161+
162+ buf.clear()
163+ // read 3: should get EOF
164+ assertEquals(- 1 , clt.readSuspend(buf, 600 , TimeUnit .MILLISECONDS ))
165+ }
166+ }
167+ }
168+
169+ @Test
170+ fun `test socket-connect-read-timeout` () = runTest(timeout = 10 .seconds) {
171+ val greeting = " Hello from Ziti" .toByteArray()
172+ val s = assertDoesNotThrow {
173+ ztx.serviceUpdates().filter { it.service.name == service }.first().service
174+ }
175+ assertTrue(s.permissions.contains(DialBind .DIAL ))
176+ assertTrue(s.permissions.contains(DialBind .BIND ))
177+
178+ ztx.openServer().use { srv ->
179+
180+ srv.bind(ZitiAddress .Bind (service))
181+
182+ // wait for binding -- test dispatcher skips delays
183+ withContext(Dispatchers .Default ) {
184+ val zrv = srv as ZitiServerSocketChannel
185+ while (zrv.state != ZitiServerSocketChannel .State .bound) {
186+ delay(50 )
187+ }
188+ }
189+
190+ launch(Dispatchers .IO ) {
191+ val c = srv.acceptSuspend()
192+ c.writeSuspend(ByteBuffer .wrap(greeting))
193+ val b = ByteBuffer .allocate(1024 )
194+ c.readSuspend(b)
195+ }
196+
197+ ztx.connect(hostname, port).use { clt ->
198+ assertTrue(clt.isConnected)
199+ val buf = ByteArray (1024 )
200+ clt.soTimeout = 500
201+ val input = clt.getInputStream()
202+
203+ // read 1: return greeting
204+ val read1 = input.read(buf)
205+ assertEquals(read1, greeting.size)
206+ val readMsg = buf.sliceArray(0 .. < read1)
207+ assertContentEquals(greeting, readMsg)
208+
209+ // other reads would get a timeout
210+ for (i in 0 .. 10 ) {
211+ assertThrows<SocketTimeoutException > {
212+ input.read(buf)
213+ }
214+ }
215+ }
216+ }
217+ }
218+
219+ }
0 commit comments