Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 103 additions & 10 deletions core/src/main/scala/zio/openfeature/FeatureFlags.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package zio.openfeature
import zio._
import zio.stream._
import zio.openfeature.internal.FeatureFlagsState
import dev.openfeature.sdk.{FeatureProvider => OFFeatureProvider, OpenFeatureAPI, OpenFeatureAPIFactory}
import dev.openfeature.sdk.{FeatureProvider => OFFeatureProvider, OpenFeatureAPI, OpenFeatureAPIFactory, ProviderState}
import dev.openfeature.sdk.multiprovider.{MultiProvider, Strategy, FirstMatchStrategy, FirstSuccessfulStrategy}
import java.util.concurrent.TimeoutException

trait FeatureFlags {

Expand Down Expand Up @@ -405,6 +406,24 @@ object FeatureFlags {

// Factory Methods

/** Default cap on how long initialization may take before the layer build fails or — for async layers — transitions
* to `Fatal`. Mirrors the documented default referenced in every factory's ScalaDoc.
*/
private[openfeature] val DefaultInitTimeout: Duration = 30.seconds

/** Verify the provider reached a usable state after sync initialization. Anything outside `READY` / `STALE` causes
* the layer build to fail so misconfiguration surfaces at startup, not at first evaluation. Returns the ZIO-level
* `ProviderStatus` that should be reflected for a usable state.
*/
@scala.annotation.nowarn("msg=deprecated")
private def verifyInitState(provider: OFFeatureProvider): ZIO[Any, Throwable, ProviderStatus] =
ZIO.attempt(provider.getState).flatMap {
case ProviderState.READY => ZIO.succeed(ProviderStatus.Ready)
case ProviderState.STALE => ZIO.succeed(ProviderStatus.Stale)
case bad @ (ProviderState.ERROR | ProviderState.FATAL | ProviderState.NOT_READY) =>
ZIO.fail(new IllegalStateException(s"Provider in $bad state after initialization"))
}

/** Shared initialization logic for all factory methods. */
private[openfeature] def build(
provider: OFFeatureProvider,
Expand All @@ -414,14 +433,21 @@ object FeatureFlags {
statusRef: Option[Ref[ProviderStatus]],
addShutdownFinalizer: Boolean,
apiOverride: Option[OpenFeatureAPI] = None,
evaluationTimeout: Option[Duration] = None
evaluationTimeout: Option[Duration] = None,
initTimeout: Duration = DefaultInitTimeout
): ZIO[Scope, Throwable, FeatureFlagsLive] =
for {
api <- ZIO.succeed(apiOverride.getOrElse(OpenFeatureAPI.getInstance()))
_ <- domain match {
setAndWait = domain match {
case Some(d) => ZIO.attemptBlocking(api.setProviderAndWait(d, provider))
case None => ZIO.attemptBlocking(api.setProviderAndWait(provider))
}
// Bound the blocking init. `.disconnect` ensures the timeout returns promptly even though
// `attemptBlocking` runs on the blocking pool; the underlying call may still run to completion
// in the background and is handled by the Java SDK / addShutdownFinalizer path.
_ <- setAndWait.disconnect
.timeoutFail(new TimeoutException(s"Provider initialization exceeded $initTimeout"))(initTimeout)
verified <- verifyInitState(provider)
client <- (domain, version) match {
case (Some(d), Some(v)) => ZIO.attempt(api.getClient(d, v))
case (Some(d), None) => ZIO.attempt(api.getClient(d))
Expand All @@ -434,7 +460,8 @@ object FeatureFlags {
baseState <- FeatureFlagsState.make
state = statusRef.fold(baseState)(ref => baseState.copy(statusRef = ref))
_ <- state.hooksRef.set(initialHooks)
_ <- statusRef.fold(state.statusRef.set(ProviderStatus.Ready))(_ => ZIO.unit)
// Only seed status when the caller didn't hand us a shared ref (testkit shares one).
_ <- statusRef.fold(state.statusRef.set(verified))(_ => ZIO.unit)
_ <- ZIO.when(addShutdownFinalizer)(ZIO.addFinalizer(ZIO.attemptBlocking(api.shutdown()).ignore))
ff = new FeatureFlagsLive(
client,
Expand All @@ -450,7 +477,13 @@ object FeatureFlags {
_ <- ff.startEventBridge
} yield ff

/** Create a FeatureFlags layer from any OpenFeature provider. */
/** Create a FeatureFlags layer from any OpenFeature provider.
*
* Initialization is bounded by the default 30s init timeout: if `setProviderAndWait` takes longer, or the provider
* reports `ERROR`/`FATAL`/`NOT_READY` afterwards, the layer build fails with a `TimeoutException` or
* `IllegalStateException` wrapped at the layer boundary. Use the overload that accepts an explicit `initTimeout` to
* raise/lower this bound; pass a very large duration (e.g. `365.days`) to effectively disable it.
*/
def fromProvider(provider: OFFeatureProvider): ZLayer[Scope, Throwable, FeatureFlags] =
ZLayer.scoped(
build(provider, domain = None, version = None, initialHooks = Nil, statusRef = None, addShutdownFinalizer = true)
Expand All @@ -460,7 +493,8 @@ object FeatureFlags {
*
* If a provider evaluation takes longer than `evaluationTimeout`, it fails with `ProviderError` containing a
* `TimeoutException`. This prevents hung providers from blocking fibers indefinitely. Per-call timeouts set via
* `EvaluationOptions.timeout` override this global default.
* `EvaluationOptions.timeout` override this global default. Initialization uses the default 30s init timeout — see
* [[fromProvider(OFFeatureProvider, Duration, Duration)]] to override.
*/
def fromProvider(provider: OFFeatureProvider, evaluationTimeout: Duration): ZLayer[Scope, Throwable, FeatureFlags] =
ZLayer.scoped(
Expand All @@ -475,6 +509,31 @@ object FeatureFlags {
)
)

/** Create a FeatureFlags layer with explicit evaluation and initialization timeouts.
*
* `initTimeout` bounds the sync init: if `setProviderAndWait` takes longer, the layer build fails with a
* `TimeoutException`. After init, if the provider reports `ERROR`/`FATAL`/`NOT_READY`, the build also fails so
* misconfiguration surfaces at startup rather than at first evaluation. Pass a very large duration to effectively
* disable the init timeout.
*/
def fromProvider(
provider: OFFeatureProvider,
evaluationTimeout: Duration,
initTimeout: Duration
): ZLayer[Scope, Throwable, FeatureFlags] =
ZLayer.scoped(
build(
provider,
domain = None,
version = None,
initialHooks = Nil,
statusRef = None,
addShutdownFinalizer = true,
evaluationTimeout = Some(evaluationTimeout),
initTimeout = initTimeout
)
)

/** Create a FeatureFlags layer with a named domain/client. */
def fromProviderWithDomain(provider: OFFeatureProvider, domain: String): ZLayer[Scope, Throwable, FeatureFlags] =
ZLayer.scoped(
Expand Down Expand Up @@ -564,7 +623,7 @@ object FeatureFlags {
* where the provider becomes ready before the event bridge is registered, we start the event bridge first and then
* check the provider's actual state.
*/
private def buildAsync(
private[openfeature] def buildAsync(
provider: OFFeatureProvider,
domain: Option[String],
version: Option[String],
Expand All @@ -573,7 +632,8 @@ object FeatureFlags {
addShutdownFinalizer: Boolean,
apiOverride: Option[OpenFeatureAPI] = None,
onReady: Option[java.util.concurrent.CountDownLatch] = None,
evaluationTimeout: Option[Duration] = None
evaluationTimeout: Option[Duration] = None,
initTimeout: Duration = DefaultInitTimeout
): ZIO[Scope, Throwable, FeatureFlagsLive] =
for {
api <- ZIO.succeed(apiOverride.getOrElse(OpenFeatureAPI.getInstance()))
Expand Down Expand Up @@ -609,12 +669,21 @@ object FeatureFlags {
)
// Start event bridge — if provider is already ready, replay fires immediately
_ <- ff.startEventBridge
// Init watchdog: after initTimeout, if the provider hasn't moved past NotReady/Error,
// transition to Fatal so callers polling providerStatus stop waiting. The fiber is forked
// into the layer's Scope so it's interrupted when the layer is released.
_ <- (ZIO.sleep(initTimeout) *> state.statusRef.update {
case ProviderStatus.NotReady | ProviderStatus.Error => ProviderStatus.Fatal
case other => other
}).forkScoped
} yield ff

/** Create a FeatureFlags layer from any OpenFeature provider (non-blocking).
*
* The provider initializes in the background. Evaluations fail with `ProviderNotReady` until the provider is ready.
* Use `onProviderReady` or `providerStatus` to detect when the provider becomes available.
* Use `onProviderReady` or `providerStatus` to detect when the provider becomes available. If the provider has not
* become ready within the default 30s init timeout, status atomically transitions to `Fatal` so callers polling
* `providerStatus` stop waiting. See [[fromProviderAsync(OFFeatureProvider, Duration, Duration)]] to override.
*/
def fromProviderAsync(provider: OFFeatureProvider): ZLayer[Scope, Throwable, FeatureFlags] =
ZLayer.scoped(
Expand All @@ -632,7 +701,8 @@ object FeatureFlags {
*
* Combines async initialization with evaluation timeout protection. The provider initializes in the background;
* evaluations fail with `ProviderNotReady` until ready. Once ready, evaluations that exceed `evaluationTimeout` fail
* with `ProviderError` containing a `TimeoutException`.
* with `ProviderError` containing a `TimeoutException`. The init-side default 30s watchdog still applies — see
* [[fromProviderAsync(OFFeatureProvider, Duration, Duration)]] to override.
*/
def fromProviderAsync(
provider: OFFeatureProvider,
Expand All @@ -650,6 +720,29 @@ object FeatureFlags {
)
)

/** Create a FeatureFlags layer with explicit evaluation and initialization timeouts (non-blocking).
*
* `initTimeout` bounds the async ready window: after that duration, if status is still `NotReady` or `Error`, it
* atomically transitions to `Fatal`. Pass a very large duration to effectively disable the watchdog.
*/
def fromProviderAsync(
provider: OFFeatureProvider,
evaluationTimeout: Duration,
initTimeout: Duration
): ZLayer[Scope, Throwable, FeatureFlags] =
ZLayer.scoped(
buildAsync(
provider,
domain = None,
version = None,
initialHooks = Nil,
statusRef = None,
addShutdownFinalizer = true,
evaluationTimeout = Some(evaluationTimeout),
initTimeout = initTimeout
)
)

/** Create a FeatureFlags layer with a named domain (non-blocking). */
def fromProviderWithDomainAsync(
provider: OFFeatureProvider,
Expand Down
157 changes: 157 additions & 0 deletions core/src/test/scala/zio/openfeature/ProviderInitHardeningSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package zio.openfeature

import zio._
import zio.test._
import zio.test.TestAspect.{withLiveClock, sequential}
import dev.openfeature.sdk.{
EvaluationContext => OFEvaluationContext,
EventProvider,
Metadata,
OpenFeatureAPIFactory,
ProviderEvaluation,
ProviderState,
Value
}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

/** Covers issues #120 (init timeout) and #121 (verify sync init succeeded). The minimal test providers below avoid
* depending on testkit so the spec can live in `core`.
*/
object ProviderInitHardeningSpec extends ZIOSpecDefault {

/** A provider whose `initialize()` blocks until its latch is released. Used to simulate a hanging sync init. */
final private class BlockingInitProvider(latch: CountDownLatch) extends EventProvider {
private val st = new AtomicReference[ProviderState](ProviderState.NOT_READY)

override def getMetadata: Metadata = new Metadata { def getName: String = "BlockingInitProvider" }
override def getState: ProviderState = st.get()

override def initialize(ctx: OFEvaluationContext): Unit = {
latch.await()
st.set(ProviderState.READY)
}
override def shutdown(): Unit = {
latch.countDown() // release any blocked init so the JVM doesn't leak threads
st.set(ProviderState.NOT_READY)
}

override def getBooleanEvaluation(k: String, d: java.lang.Boolean, c: OFEvaluationContext) =
ProviderEvaluation.builder[java.lang.Boolean]().value(d).reason("DEFAULT").build()
override def getStringEvaluation(k: String, d: String, c: OFEvaluationContext) =
ProviderEvaluation.builder[String]().value(d).reason("DEFAULT").build()
override def getIntegerEvaluation(k: String, d: java.lang.Integer, c: OFEvaluationContext) =
ProviderEvaluation.builder[java.lang.Integer]().value(d).reason("DEFAULT").build()
override def getDoubleEvaluation(k: String, d: java.lang.Double, c: OFEvaluationContext) =
ProviderEvaluation.builder[java.lang.Double]().value(d).reason("DEFAULT").build()
override def getObjectEvaluation(k: String, d: Value, c: OFEvaluationContext) =
ProviderEvaluation.builder[Value]().value(d).reason("DEFAULT").build()
}

/** A provider that returns successfully from `initialize()` but reports ERROR. The library should refuse to mark such
* a provider Ready and instead fail layer construction.
*/
final private class InitToErrorProvider extends EventProvider {
private val st = new AtomicReference[ProviderState](ProviderState.NOT_READY)
override def getMetadata: Metadata = new Metadata { def getName: String = "InitToErrorProvider" }
override def getState: ProviderState = st.get()
override def initialize(ctx: OFEvaluationContext): Unit = st.set(ProviderState.ERROR)
override def shutdown(): Unit = st.set(ProviderState.NOT_READY)

override def getBooleanEvaluation(k: String, d: java.lang.Boolean, c: OFEvaluationContext) =
ProviderEvaluation.builder[java.lang.Boolean]().value(d).reason("DEFAULT").build()
override def getStringEvaluation(k: String, d: String, c: OFEvaluationContext) =
ProviderEvaluation.builder[String]().value(d).reason("DEFAULT").build()
override def getIntegerEvaluation(k: String, d: java.lang.Integer, c: OFEvaluationContext) =
ProviderEvaluation.builder[java.lang.Integer]().value(d).reason("DEFAULT").build()
override def getDoubleEvaluation(k: String, d: java.lang.Double, c: OFEvaluationContext) =
ProviderEvaluation.builder[java.lang.Double]().value(d).reason("DEFAULT").build()
override def getObjectEvaluation(k: String, d: Value, c: OFEvaluationContext) =
ProviderEvaluation.builder[Value]().value(d).reason("DEFAULT").build()
}

// Each sync test uses a unique domain so OpenFeatureAPI clients don't share state across cases.
private def uniqueDomain(label: String): String = s"init-hardening-$label-${java.util.UUID.randomUUID()}"

def spec = suite("ProviderInitHardeningSpec")(
test("[A1] sync fromProvider fails with TimeoutException when init blocks past initTimeout") {
val latch = new CountDownLatch(1)
val provider = new BlockingInitProvider(latch)
val api = OpenFeatureAPIFactory.create()
val build = ZIO.scoped {
FeatureFlags
.build(
provider,
domain = Some(uniqueDomain("sync-timeout")),
version = None,
initialHooks = Nil,
statusRef = None,
addShutdownFinalizer = false,
apiOverride = Some(api),
initTimeout = 200.millis
)
.unit
}
for {
result <- build.either
_ = latch.countDown() // unblock the background thread so it can exit
} yield assertTrue(
result.isLeft,
result.left.exists(_.isInstanceOf[java.util.concurrent.TimeoutException])
)
} @@ withLiveClock,
test("[A2] sync fromProvider fails if provider reports ERROR after init") {
val provider = new InitToErrorProvider
val api = OpenFeatureAPIFactory.create()
val build = ZIO.scoped {
FeatureFlags
.build(
provider,
domain = Some(uniqueDomain("sync-err")),
version = None,
initialHooks = Nil,
statusRef = None,
addShutdownFinalizer = false,
apiOverride = Some(api),
initTimeout = 5.seconds
)
.unit
}
for {
result <- build.either
} yield assertTrue(
result.isLeft,
result.left.exists(t =>
t.isInstanceOf[IllegalStateException] && Option(t.getMessage).exists(_.contains("ERROR"))
)
)
} @@ withLiveClock,
test("[A1 async] watchdog transitions provider to Fatal after initTimeout when init hangs") {
// initialize() blocks forever — Java SDK never fires PROVIDER_READY, so the only way out
// of NotReady is the watchdog.
val latch = new CountDownLatch(1)
val provider = new BlockingInitProvider(latch)
val api = OpenFeatureAPIFactory.create()
ZIO.scoped {
for {
statusRef <- Ref.make[ProviderStatus](ProviderStatus.NotReady)
_ <- FeatureFlags.buildAsync(
provider,
domain = Some(uniqueDomain("async-watchdog")),
version = None,
initialHooks = Nil,
statusRef = Some(statusRef),
addShutdownFinalizer = false,
apiOverride = Some(api),
initTimeout = 100.millis
)
before <- statusRef.get
// Live clock so the watchdog fiber's ZIO.sleep actually elapses.
_ <- ZIO.sleep(400.millis)
after <- statusRef.get
_ = latch.countDown()
} yield assertTrue(before == ProviderStatus.NotReady, after == ProviderStatus.Fatal)
}
} @@ withLiveClock
) @@ sequential
}
Loading
Loading