@@ -18,9 +18,11 @@ package org.openziti.impl
1818
1919import kotlinx.coroutines.*
2020import kotlinx.coroutines.channels.BufferOverflow
21+ import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
2122import kotlinx.coroutines.flow.*
2223import kotlinx.coroutines.flow.Flow
2324import kotlinx.coroutines.future.asCompletableFuture
25+ import kotlinx.coroutines.selects.onTimeout
2426import kotlinx.coroutines.selects.select
2527import org.openziti.*
2628import org.openziti.Identity
@@ -50,8 +52,7 @@ import kotlin.coroutines.CoroutineContext
5052import kotlin.coroutines.cancellation.CancellationException
5153import kotlin.properties.Delegates
5254import kotlin.random.Random
53- import kotlin.time.DurationUnit
54- import kotlin.time.toDuration
55+ import kotlin.time.Duration.Companion.seconds
5556
5657/* *
5758 * Object maintaining current Ziti session.
@@ -75,7 +76,7 @@ internal class ZitiContextImpl(internal val id: Identity, enabled: Boolean) : Zi
7576 override val coroutineContext: CoroutineContext
7677 get() = Dispatchers .IO + supervisor
7778
78-
79+ private val forceAuthRefresh = kotlinx.coroutines.channels. Channel < Any >( CONFLATED )
7980 // active controller
8081 private val ctrl = MutableStateFlow <Controller ?>(null )
8182 private val controller: Controller
@@ -245,6 +246,7 @@ internal class ZitiContextImpl(internal val id: Identity, enabled: Boolean) : Zi
245246 ctrl.value = c
246247
247248 val oidc = controller.capabilities.contains(Capabilities .OIDC_AUTH )
249+ val ha = controller.capabilities.contains(Capabilities .HA_CONTROLLER )
248250 authenticator = authenticator(controller.endpoint, id.sslContext(), oidc)
249251
250252 val currentSession = runCatching {
@@ -254,7 +256,6 @@ internal class ZitiContextImpl(internal val id: Identity, enabled: Boolean) : Zi
254256 null
255257 }.getOrNull()
256258
257-
258259 val session = if (currentSession != null )
259260 currentSession
260261 else {
@@ -282,13 +283,16 @@ internal class ZitiContextImpl(internal val id: Identity, enabled: Boolean) : Zi
282283
283284 if (! success) continue
284285 }
285- val ctrls = controller.listControllers().mapNotNull {
286- it.apiAddresses?.get(" edge-client" )?.first { addr -> addr.version == " v1" }?.url
287- }.toSet()
288286
289- if (ctrls != ctrlEndpoints) {
290- ctrlEndpoints.clear()
291- ctrlEndpoints.addAll(ctrls)
287+ if (ha) {
288+ val ctrls = controller.listControllers().mapNotNull {
289+ it.apiAddresses?.get(" edge-client" )?.first { addr -> addr.version == " v1" }?.url
290+ }.toSet()
291+
292+ if (ctrls != ctrlEndpoints) {
293+ ctrlEndpoints.clear()
294+ ctrlEndpoints.addAll(ctrls)
295+ }
292296 }
293297
294298 val services = controller.getServices().toList()
@@ -368,7 +372,8 @@ internal class ZitiContextImpl(internal val id: Identity, enabled: Boolean) : Zi
368372 }
369373 }
370374
371- private fun maintainApiSession (authenticator : ZitiAuthenticator ) = async {
375+ @OptIn(ExperimentalCoroutinesApi ::class )
376+ private fun maintainApiSession (authenticator : ZitiAuthenticator ) = async(CoroutineName (" maintainApiSession" )) {
372377 var retries = 5
373378 while (true ) {
374379 val token = accessToken.value ? : break
@@ -380,18 +385,26 @@ internal class ZitiContextImpl(internal val id: Identity, enabled: Boolean) : Zi
380385 break
381386 }
382387
383- val delay = (token.expiration.toEpochSecond() - now.toEpochSecond()) * 2 / 3
388+ val delaySeconds = token.expiration.toEpochSecond() - now.toEpochSecond()
389+ val delay = delaySeconds / 2 + Random .nextLong(delaySeconds / 6 )
384390
385- d(" [${name()} ] sleeping for $delay seconds" )
386- delay(delay.toDuration(DurationUnit .SECONDS ))
387- d(" [${name()} ] refreshing access token" )
391+ d {" [${name()} ] ssleeping for $delay seconds" }
392+ val reason = select {
393+ onTimeout(delay.seconds) { " delay" }
394+ forceAuthRefresh.onReceive { " forced refresh" }
395+ }
396+ d(" [${name()} ] refreshing access token [$reason ]" )
388397 runCatching {
389398 val newToken = authenticator.refresh()
390399 accessToken.value = newToken
391400 retries = 5
392401 }.onFailure {
393- retries--
394402 w{ " failed to refresh access token: ${it.message} " }
403+ if (it is ZitiAuthenticator .AuthException ) {
404+ accessToken.value = null
405+ continue
406+ }
407+ retries--
395408 }
396409 }
397410 }
0 commit comments