Skip to content

Commit 9f720a6

Browse files
Add 30s init timeout and verify sync provider state (#138)
1 parent 80984ac commit 9f720a6

3 files changed

Lines changed: 283 additions & 13 deletions

File tree

core/src/main/scala/zio/openfeature/FeatureFlags.scala

Lines changed: 103 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ package zio.openfeature
33
import zio._
44
import zio.stream._
55
import zio.openfeature.internal.FeatureFlagsState
6-
import dev.openfeature.sdk.{FeatureProvider => OFFeatureProvider, OpenFeatureAPI, OpenFeatureAPIFactory}
6+
import dev.openfeature.sdk.{FeatureProvider => OFFeatureProvider, OpenFeatureAPI, OpenFeatureAPIFactory, ProviderState}
77
import dev.openfeature.sdk.multiprovider.{MultiProvider, Strategy, FirstMatchStrategy, FirstSuccessfulStrategy}
8+
import java.util.concurrent.TimeoutException
89

910
trait FeatureFlags {
1011

@@ -405,6 +406,24 @@ object FeatureFlags {
405406

406407
// Factory Methods
407408

409+
/** Default cap on how long initialization may take before the layer build fails or — for async layers — transitions
410+
* to `Fatal`. Mirrors the documented default referenced in every factory's ScalaDoc.
411+
*/
412+
private[openfeature] val DefaultInitTimeout: Duration = 30.seconds
413+
414+
/** Verify the provider reached a usable state after sync initialization. Anything outside `READY` / `STALE` causes
415+
* the layer build to fail so misconfiguration surfaces at startup, not at first evaluation. Returns the ZIO-level
416+
* `ProviderStatus` that should be reflected for a usable state.
417+
*/
418+
@scala.annotation.nowarn("msg=deprecated")
419+
private def verifyInitState(provider: OFFeatureProvider): ZIO[Any, Throwable, ProviderStatus] =
420+
ZIO.attempt(provider.getState).flatMap {
421+
case ProviderState.READY => ZIO.succeed(ProviderStatus.Ready)
422+
case ProviderState.STALE => ZIO.succeed(ProviderStatus.Stale)
423+
case bad @ (ProviderState.ERROR | ProviderState.FATAL | ProviderState.NOT_READY) =>
424+
ZIO.fail(new IllegalStateException(s"Provider in $bad state after initialization"))
425+
}
426+
408427
/** Shared initialization logic for all factory methods. */
409428
private[openfeature] def build(
410429
provider: OFFeatureProvider,
@@ -414,14 +433,21 @@ object FeatureFlags {
414433
statusRef: Option[Ref[ProviderStatus]],
415434
addShutdownFinalizer: Boolean,
416435
apiOverride: Option[OpenFeatureAPI] = None,
417-
evaluationTimeout: Option[Duration] = None
436+
evaluationTimeout: Option[Duration] = None,
437+
initTimeout: Duration = DefaultInitTimeout
418438
): ZIO[Scope, Throwable, FeatureFlagsLive] =
419439
for {
420440
api <- ZIO.succeed(apiOverride.getOrElse(OpenFeatureAPI.getInstance()))
421-
_ <- domain match {
441+
setAndWait = domain match {
422442
case Some(d) => ZIO.attemptBlocking(api.setProviderAndWait(d, provider))
423443
case None => ZIO.attemptBlocking(api.setProviderAndWait(provider))
424444
}
445+
// Bound the blocking init. `.disconnect` ensures the timeout returns promptly even though
446+
// `attemptBlocking` runs on the blocking pool; the underlying call may still run to completion
447+
// in the background and is handled by the Java SDK / addShutdownFinalizer path.
448+
_ <- setAndWait.disconnect
449+
.timeoutFail(new TimeoutException(s"Provider initialization exceeded $initTimeout"))(initTimeout)
450+
verified <- verifyInitState(provider)
425451
client <- (domain, version) match {
426452
case (Some(d), Some(v)) => ZIO.attempt(api.getClient(d, v))
427453
case (Some(d), None) => ZIO.attempt(api.getClient(d))
@@ -434,7 +460,8 @@ object FeatureFlags {
434460
baseState <- FeatureFlagsState.make
435461
state = statusRef.fold(baseState)(ref => baseState.copy(statusRef = ref))
436462
_ <- state.hooksRef.set(initialHooks)
437-
_ <- statusRef.fold(state.statusRef.set(ProviderStatus.Ready))(_ => ZIO.unit)
463+
// Only seed status when the caller didn't hand us a shared ref (testkit shares one).
464+
_ <- statusRef.fold(state.statusRef.set(verified))(_ => ZIO.unit)
438465
_ <- ZIO.when(addShutdownFinalizer)(ZIO.addFinalizer(ZIO.attemptBlocking(api.shutdown()).ignore))
439466
ff = new FeatureFlagsLive(
440467
client,
@@ -450,7 +477,13 @@ object FeatureFlags {
450477
_ <- ff.startEventBridge
451478
} yield ff
452479

453-
/** Create a FeatureFlags layer from any OpenFeature provider. */
480+
/** Create a FeatureFlags layer from any OpenFeature provider.
481+
*
482+
* Initialization is bounded by the default 30s init timeout: if `setProviderAndWait` takes longer, or the provider
483+
* reports `ERROR`/`FATAL`/`NOT_READY` afterwards, the layer build fails with a `TimeoutException` or
484+
* `IllegalStateException` wrapped at the layer boundary. Use the overload that accepts an explicit `initTimeout` to
485+
* raise/lower this bound; pass a very large duration (e.g. `365.days`) to effectively disable it.
486+
*/
454487
def fromProvider(provider: OFFeatureProvider): ZLayer[Scope, Throwable, FeatureFlags] =
455488
ZLayer.scoped(
456489
build(provider, domain = None, version = None, initialHooks = Nil, statusRef = None, addShutdownFinalizer = true)
@@ -460,7 +493,8 @@ object FeatureFlags {
460493
*
461494
* If a provider evaluation takes longer than `evaluationTimeout`, it fails with `ProviderError` containing a
462495
* `TimeoutException`. This prevents hung providers from blocking fibers indefinitely. Per-call timeouts set via
463-
* `EvaluationOptions.timeout` override this global default.
496+
* `EvaluationOptions.timeout` override this global default. Initialization uses the default 30s init timeout — see
497+
* [[fromProvider(OFFeatureProvider, Duration, Duration)]] to override.
464498
*/
465499
def fromProvider(provider: OFFeatureProvider, evaluationTimeout: Duration): ZLayer[Scope, Throwable, FeatureFlags] =
466500
ZLayer.scoped(
@@ -475,6 +509,31 @@ object FeatureFlags {
475509
)
476510
)
477511

512+
/** Create a FeatureFlags layer with explicit evaluation and initialization timeouts.
513+
*
514+
* `initTimeout` bounds the sync init: if `setProviderAndWait` takes longer, the layer build fails with a
515+
* `TimeoutException`. After init, if the provider reports `ERROR`/`FATAL`/`NOT_READY`, the build also fails so
516+
* misconfiguration surfaces at startup rather than at first evaluation. Pass a very large duration to effectively
517+
* disable the init timeout.
518+
*/
519+
def fromProvider(
520+
provider: OFFeatureProvider,
521+
evaluationTimeout: Duration,
522+
initTimeout: Duration
523+
): ZLayer[Scope, Throwable, FeatureFlags] =
524+
ZLayer.scoped(
525+
build(
526+
provider,
527+
domain = None,
528+
version = None,
529+
initialHooks = Nil,
530+
statusRef = None,
531+
addShutdownFinalizer = true,
532+
evaluationTimeout = Some(evaluationTimeout),
533+
initTimeout = initTimeout
534+
)
535+
)
536+
478537
/** Create a FeatureFlags layer with a named domain/client. */
479538
def fromProviderWithDomain(provider: OFFeatureProvider, domain: String): ZLayer[Scope, Throwable, FeatureFlags] =
480539
ZLayer.scoped(
@@ -564,7 +623,7 @@ object FeatureFlags {
564623
* where the provider becomes ready before the event bridge is registered, we start the event bridge first and then
565624
* check the provider's actual state.
566625
*/
567-
private def buildAsync(
626+
private[openfeature] def buildAsync(
568627
provider: OFFeatureProvider,
569628
domain: Option[String],
570629
version: Option[String],
@@ -573,7 +632,8 @@ object FeatureFlags {
573632
addShutdownFinalizer: Boolean,
574633
apiOverride: Option[OpenFeatureAPI] = None,
575634
onReady: Option[java.util.concurrent.CountDownLatch] = None,
576-
evaluationTimeout: Option[Duration] = None
635+
evaluationTimeout: Option[Duration] = None,
636+
initTimeout: Duration = DefaultInitTimeout
577637
): ZIO[Scope, Throwable, FeatureFlagsLive] =
578638
for {
579639
api <- ZIO.succeed(apiOverride.getOrElse(OpenFeatureAPI.getInstance()))
@@ -609,12 +669,21 @@ object FeatureFlags {
609669
)
610670
// Start event bridge — if provider is already ready, replay fires immediately
611671
_ <- ff.startEventBridge
672+
// Init watchdog: after initTimeout, if the provider hasn't moved past NotReady/Error,
673+
// transition to Fatal so callers polling providerStatus stop waiting. The fiber is forked
674+
// into the layer's Scope so it's interrupted when the layer is released.
675+
_ <- (ZIO.sleep(initTimeout) *> state.statusRef.update {
676+
case ProviderStatus.NotReady | ProviderStatus.Error => ProviderStatus.Fatal
677+
case other => other
678+
}).forkScoped
612679
} yield ff
613680

614681
/** Create a FeatureFlags layer from any OpenFeature provider (non-blocking).
615682
*
616683
* The provider initializes in the background. Evaluations fail with `ProviderNotReady` until the provider is ready.
617-
* Use `onProviderReady` or `providerStatus` to detect when the provider becomes available.
684+
* Use `onProviderReady` or `providerStatus` to detect when the provider becomes available. If the provider has not
685+
* become ready within the default 30s init timeout, status atomically transitions to `Fatal` so callers polling
686+
* `providerStatus` stop waiting. See [[fromProviderAsync(OFFeatureProvider, Duration, Duration)]] to override.
618687
*/
619688
def fromProviderAsync(provider: OFFeatureProvider): ZLayer[Scope, Throwable, FeatureFlags] =
620689
ZLayer.scoped(
@@ -632,7 +701,8 @@ object FeatureFlags {
632701
*
633702
* Combines async initialization with evaluation timeout protection. The provider initializes in the background;
634703
* evaluations fail with `ProviderNotReady` until ready. Once ready, evaluations that exceed `evaluationTimeout` fail
635-
* with `ProviderError` containing a `TimeoutException`.
704+
* with `ProviderError` containing a `TimeoutException`. The init-side default 30s watchdog still applies — see
705+
* [[fromProviderAsync(OFFeatureProvider, Duration, Duration)]] to override.
636706
*/
637707
def fromProviderAsync(
638708
provider: OFFeatureProvider,
@@ -650,6 +720,29 @@ object FeatureFlags {
650720
)
651721
)
652722

723+
/** Create a FeatureFlags layer with explicit evaluation and initialization timeouts (non-blocking).
724+
*
725+
* `initTimeout` bounds the async ready window: after that duration, if status is still `NotReady` or `Error`, it
726+
* atomically transitions to `Fatal`. Pass a very large duration to effectively disable the watchdog.
727+
*/
728+
def fromProviderAsync(
729+
provider: OFFeatureProvider,
730+
evaluationTimeout: Duration,
731+
initTimeout: Duration
732+
): ZLayer[Scope, Throwable, FeatureFlags] =
733+
ZLayer.scoped(
734+
buildAsync(
735+
provider,
736+
domain = None,
737+
version = None,
738+
initialHooks = Nil,
739+
statusRef = None,
740+
addShutdownFinalizer = true,
741+
evaluationTimeout = Some(evaluationTimeout),
742+
initTimeout = initTimeout
743+
)
744+
)
745+
653746
/** Create a FeatureFlags layer with a named domain (non-blocking). */
654747
def fromProviderWithDomainAsync(
655748
provider: OFFeatureProvider,
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package zio.openfeature
2+
3+
import zio._
4+
import zio.test._
5+
import zio.test.TestAspect.{withLiveClock, sequential}
6+
import dev.openfeature.sdk.{
7+
EvaluationContext => OFEvaluationContext,
8+
EventProvider,
9+
Metadata,
10+
OpenFeatureAPIFactory,
11+
ProviderEvaluation,
12+
ProviderState,
13+
Value
14+
}
15+
import java.util.concurrent.CountDownLatch
16+
import java.util.concurrent.atomic.AtomicReference
17+
18+
/** Covers issues #120 (init timeout) and #121 (verify sync init succeeded). The minimal test providers below avoid
19+
* depending on testkit so the spec can live in `core`.
20+
*/
21+
object ProviderInitHardeningSpec extends ZIOSpecDefault {
22+
23+
/** A provider whose `initialize()` blocks until its latch is released. Used to simulate a hanging sync init. */
24+
final private class BlockingInitProvider(latch: CountDownLatch) extends EventProvider {
25+
private val st = new AtomicReference[ProviderState](ProviderState.NOT_READY)
26+
27+
override def getMetadata: Metadata = new Metadata { def getName: String = "BlockingInitProvider" }
28+
override def getState: ProviderState = st.get()
29+
30+
override def initialize(ctx: OFEvaluationContext): Unit = {
31+
latch.await()
32+
st.set(ProviderState.READY)
33+
}
34+
override def shutdown(): Unit = {
35+
latch.countDown() // release any blocked init so the JVM doesn't leak threads
36+
st.set(ProviderState.NOT_READY)
37+
}
38+
39+
override def getBooleanEvaluation(k: String, d: java.lang.Boolean, c: OFEvaluationContext) =
40+
ProviderEvaluation.builder[java.lang.Boolean]().value(d).reason("DEFAULT").build()
41+
override def getStringEvaluation(k: String, d: String, c: OFEvaluationContext) =
42+
ProviderEvaluation.builder[String]().value(d).reason("DEFAULT").build()
43+
override def getIntegerEvaluation(k: String, d: java.lang.Integer, c: OFEvaluationContext) =
44+
ProviderEvaluation.builder[java.lang.Integer]().value(d).reason("DEFAULT").build()
45+
override def getDoubleEvaluation(k: String, d: java.lang.Double, c: OFEvaluationContext) =
46+
ProviderEvaluation.builder[java.lang.Double]().value(d).reason("DEFAULT").build()
47+
override def getObjectEvaluation(k: String, d: Value, c: OFEvaluationContext) =
48+
ProviderEvaluation.builder[Value]().value(d).reason("DEFAULT").build()
49+
}
50+
51+
/** A provider that returns successfully from `initialize()` but reports ERROR. The library should refuse to mark such
52+
* a provider Ready and instead fail layer construction.
53+
*/
54+
final private class InitToErrorProvider extends EventProvider {
55+
private val st = new AtomicReference[ProviderState](ProviderState.NOT_READY)
56+
override def getMetadata: Metadata = new Metadata { def getName: String = "InitToErrorProvider" }
57+
override def getState: ProviderState = st.get()
58+
override def initialize(ctx: OFEvaluationContext): Unit = st.set(ProviderState.ERROR)
59+
override def shutdown(): Unit = st.set(ProviderState.NOT_READY)
60+
61+
override def getBooleanEvaluation(k: String, d: java.lang.Boolean, c: OFEvaluationContext) =
62+
ProviderEvaluation.builder[java.lang.Boolean]().value(d).reason("DEFAULT").build()
63+
override def getStringEvaluation(k: String, d: String, c: OFEvaluationContext) =
64+
ProviderEvaluation.builder[String]().value(d).reason("DEFAULT").build()
65+
override def getIntegerEvaluation(k: String, d: java.lang.Integer, c: OFEvaluationContext) =
66+
ProviderEvaluation.builder[java.lang.Integer]().value(d).reason("DEFAULT").build()
67+
override def getDoubleEvaluation(k: String, d: java.lang.Double, c: OFEvaluationContext) =
68+
ProviderEvaluation.builder[java.lang.Double]().value(d).reason("DEFAULT").build()
69+
override def getObjectEvaluation(k: String, d: Value, c: OFEvaluationContext) =
70+
ProviderEvaluation.builder[Value]().value(d).reason("DEFAULT").build()
71+
}
72+
73+
// Each sync test uses a unique domain so OpenFeatureAPI clients don't share state across cases.
74+
private def uniqueDomain(label: String): String = s"init-hardening-$label-${java.util.UUID.randomUUID()}"
75+
76+
def spec = suite("ProviderInitHardeningSpec")(
77+
test("[A1] sync fromProvider fails with TimeoutException when init blocks past initTimeout") {
78+
val latch = new CountDownLatch(1)
79+
val provider = new BlockingInitProvider(latch)
80+
val api = OpenFeatureAPIFactory.create()
81+
val build = ZIO.scoped {
82+
FeatureFlags
83+
.build(
84+
provider,
85+
domain = Some(uniqueDomain("sync-timeout")),
86+
version = None,
87+
initialHooks = Nil,
88+
statusRef = None,
89+
addShutdownFinalizer = false,
90+
apiOverride = Some(api),
91+
initTimeout = 200.millis
92+
)
93+
.unit
94+
}
95+
for {
96+
result <- build.either
97+
_ = latch.countDown() // unblock the background thread so it can exit
98+
} yield assertTrue(
99+
result.isLeft,
100+
result.left.exists(_.isInstanceOf[java.util.concurrent.TimeoutException])
101+
)
102+
} @@ withLiveClock,
103+
test("[A2] sync fromProvider fails if provider reports ERROR after init") {
104+
val provider = new InitToErrorProvider
105+
val api = OpenFeatureAPIFactory.create()
106+
val build = ZIO.scoped {
107+
FeatureFlags
108+
.build(
109+
provider,
110+
domain = Some(uniqueDomain("sync-err")),
111+
version = None,
112+
initialHooks = Nil,
113+
statusRef = None,
114+
addShutdownFinalizer = false,
115+
apiOverride = Some(api),
116+
initTimeout = 5.seconds
117+
)
118+
.unit
119+
}
120+
for {
121+
result <- build.either
122+
} yield assertTrue(
123+
result.isLeft,
124+
result.left.exists(t =>
125+
t.isInstanceOf[IllegalStateException] && Option(t.getMessage).exists(_.contains("ERROR"))
126+
)
127+
)
128+
} @@ withLiveClock,
129+
test("[A1 async] watchdog transitions provider to Fatal after initTimeout when init hangs") {
130+
// initialize() blocks forever — Java SDK never fires PROVIDER_READY, so the only way out
131+
// of NotReady is the watchdog.
132+
val latch = new CountDownLatch(1)
133+
val provider = new BlockingInitProvider(latch)
134+
val api = OpenFeatureAPIFactory.create()
135+
ZIO.scoped {
136+
for {
137+
statusRef <- Ref.make[ProviderStatus](ProviderStatus.NotReady)
138+
_ <- FeatureFlags.buildAsync(
139+
provider,
140+
domain = Some(uniqueDomain("async-watchdog")),
141+
version = None,
142+
initialHooks = Nil,
143+
statusRef = Some(statusRef),
144+
addShutdownFinalizer = false,
145+
apiOverride = Some(api),
146+
initTimeout = 100.millis
147+
)
148+
before <- statusRef.get
149+
// Live clock so the watchdog fiber's ZIO.sleep actually elapses.
150+
_ <- ZIO.sleep(400.millis)
151+
after <- statusRef.get
152+
_ = latch.countDown()
153+
} yield assertTrue(before == ProviderStatus.NotReady, after == ProviderStatus.Fatal)
154+
}
155+
} @@ withLiveClock
156+
) @@ sequential
157+
}

0 commit comments

Comments
 (0)