diff --git a/optimizely/src/main/scala/zio/openfeature/optimizely/OptimizelyFeatureProvider.scala b/optimizely/src/main/scala/zio/openfeature/optimizely/OptimizelyFeatureProvider.scala index e96c2db..6ab1d5f 100644 --- a/optimizely/src/main/scala/zio/openfeature/optimizely/OptimizelyFeatureProvider.scala +++ b/optimizely/src/main/scala/zio/openfeature/optimizely/OptimizelyFeatureProvider.scala @@ -226,10 +226,18 @@ final class OptimizelyFeatureProvider private[optimizely] ( } // Common decision pipeline. Returns Left with an error reason string when no decision is possible. + // + // Check order: + // 1. Provider state — if not READY (never initialized, failed init, post-shutdown) we surface PROVIDER_NOT_READY + // without touching the underlying SDK. Calling `optimizely.isValid` post-shutdown would re-enter the polling + // HTTP client, which is closed by then, and throw — bug observed by OptimizelyProviderLifecycleSpec. + // 2. Targeting key — caller-supplied context error; only meaningful once the provider is actually usable. + // 3. `optimizely.isValid` — defence-in-depth in case state says READY but the SDK silently went invalid. private def decide(key: String, ctx: OFEvaluationContext): Either[String, OptimizelyDecision] = { val transformed = ContextTransformer.transform(ctx) - if (transformed.userId.isEmpty) Left("TARGETING_KEY_MISSING") - else if (!optimizely.isValid) Left("PROVIDER_NOT_READY") + if (stateRef.get() != ProviderState.READY) Left("PROVIDER_NOT_READY") + else if (transformed.userId.isEmpty) Left("TARGETING_KEY_MISSING") + else if (!Try(optimizely.isValid).getOrElse(false)) Left("PROVIDER_NOT_READY") else Try(optimizely.createUserContext(transformed.userId, transformed.attributes).decide(key)).toEither.left .map(t => Option(t.getMessage).getOrElse(t.getClass.getSimpleName)) diff --git a/optimizely/src/test/resources/test-datafile-with-flag.json b/optimizely/src/test/resources/test-datafile-with-flag.json new file mode 100644 index 0000000..098ed14 --- /dev/null +++ b/optimizely/src/test/resources/test-datafile-with-flag.json @@ -0,0 +1,51 @@ +{ + "version": "4", + "accountId": "1", + "projectId": "1", + "revision": "1", + "anonymizeIP": false, + "botFiltering": false, + "sendFlagDecisions": true, + "experiments": [], + "featureFlags": [ + { + "id": "100", + "key": "lifecycle_flag", + "rolloutId": "200", + "experimentIds": [], + "variables": [ + { "id": "500", "key": "value", "type": "string", "defaultValue": "default" } + ] + } + ], + "rollouts": [ + { + "id": "200", + "experiments": [ + { + "id": "300", + "key": "rollout-200", + "status": "Running", + "layerId": "200", + "audienceIds": [], + "forcedVariations": {}, + "trafficAllocation": [{ "entityId": "400", "endOfRange": 10000 }], + "variations": [ + { + "id": "400", + "key": "on", + "featureEnabled": true, + "variables": [{ "id": "500", "value": "lifecycle-on" }] + } + ] + } + ] + } + ], + "audiences": [], + "typedAudiences": [], + "groups": [], + "attributes": [], + "variables": [], + "events": [] +} diff --git a/optimizely/src/test/scala/zio/openfeature/optimizely/OptimizelyProviderConcurrencySpec.scala b/optimizely/src/test/scala/zio/openfeature/optimizely/OptimizelyProviderConcurrencySpec.scala new file mode 100644 index 0000000..6922fd4 --- /dev/null +++ b/optimizely/src/test/scala/zio/openfeature/optimizely/OptimizelyProviderConcurrencySpec.scala @@ -0,0 +1,217 @@ +package zio.openfeature.optimizely + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock._ +import com.github.tomakehurst.wiremock.core.WireMockConfiguration +import com.optimizely.ab.Optimizely +import com.optimizely.ab.config.HttpProjectConfigManager +import dev.openfeature.sdk.{ErrorCode, ImmutableContext, ProviderState, Value} +import zio._ +import zio.test._ + +import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger + +/** Stress tests for `OptimizelyFeatureProvider`. WireMock-backed so the suite runs on every PR. Verifies that: + * - Many parallel evaluations after init produce consistent results without exceptions. + * - Mixed-type evaluations (boolean / string / int / double / object) interleave safely. + * - Evaluations racing against `initialize` either succeed or surface `PROVIDER_NOT_READY`; no exception escapes. + * - Evaluations racing against `shutdown` likewise return clean errors and never throw. + */ +object OptimizelyProviderConcurrencySpec extends ZIOSpecDefault { + + private val DatafilePath = "/datafiles/concurrency-key.json" + private val ValidDatafile = readResource("/test-datafile-with-flag.json") + private val Flag = "lifecycle_flag" + + private def readResource(path: String): String = + scala.io.Source.fromInputStream(getClass.getResourceAsStream(path)).mkString + + private def withMockServer[A](body: WireMockServer => A): A = { + val server = new WireMockServer(WireMockConfiguration.options().dynamicPort()) + server.start() + try body(server) + finally server.stop() + } + + private def datafileUrl(server: WireMockServer): String = + s"http://localhost:${server.port()}$DatafilePath" + + private def buildClient( + server: WireMockServer, + blockingTimeout: java.time.Duration = java.time.Duration.ofSeconds(2), + pollingInterval: java.time.Duration = java.time.Duration.ofSeconds(3600) + ): Optimizely = { + val mgr = HttpProjectConfigManager + .builder() + .withSdkKey("concurrency-key") + .withUrl(datafileUrl(server)) + .withBlockingTimeout(blockingTimeout.toMillis, TimeUnit.MILLISECONDS) + .withPollingInterval(pollingInterval.toSeconds, TimeUnit.SECONDS) + .build() + Optimizely.builder().withConfigManager(mgr).build() + } + + @scala.annotation.nowarn("msg=deprecated") + private def stateOf(p: OptimizelyFeatureProvider): ProviderState = p.getState + + /** Spawn `n` worker threads, release them simultaneously via a latch, and collect outcomes. */ + private def race[A](n: Int)(work: Int => A): IndexedSeq[Either[Throwable, A]] = { + val pool = Executors.newFixedThreadPool(math.min(n, 64)) + val release = new CountDownLatch(1) + val started = new CountDownLatch(n) + val results = new Array[Either[Throwable, A]](n) + val countdown = new CountDownLatch(n) + try { + (0 until n).foreach { i => + pool.submit(new Runnable { + override def run(): Unit = { + started.countDown() + release.await() + results(i) = + try Right(work(i)) + catch { case t: Throwable => Left(t) } + countdown.countDown() + } + }) + () + } + started.await(10, TimeUnit.SECONDS) + release.countDown() + countdown.await(30, TimeUnit.SECONDS) + results.toIndexedSeq + } finally { + pool.shutdownNow() + () + } + } + + def spec: Spec[TestEnvironment & Scope, Any] = suite("OptimizelyFeatureProvider concurrency")( + test("N concurrent boolean evaluations after init return consistent results without exceptions") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val provider = + new OptimizelyFeatureProvider(buildClient(server), java.time.Duration.ofSeconds(3), closeOnShutdown = true) + try { + provider.initialize(new ImmutableContext()) + val N = 1000 + val ctxs = (0 until N).map(i => new ImmutableContext(s"user-$i")) + val outcomes = race(N) { i => + provider.getBooleanEvaluation(Flag, java.lang.Boolean.FALSE, ctxs(i)) + } + val anyThrew = outcomes.exists(_.isLeft) + val allEnabled = outcomes.collect { case Right(e) => e.getValue }.forall(_ == java.lang.Boolean.TRUE) + val countReturned = outcomes.size + assertTrue( + !anyThrew, + countReturned == N, + allEnabled + ) + } finally provider.shutdown() + } + }, + test("N concurrent mixed-type evaluations don't trip over each other") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val provider = + new OptimizelyFeatureProvider(buildClient(server), java.time.Duration.ofSeconds(3), closeOnShutdown = true) + try { + provider.initialize(new ImmutableContext()) + val N = 500 + val ctxs = (0 until N).map(i => new ImmutableContext(s"user-$i")) + val outcomes = race(N) { i => + val ctx = ctxs(i) + i % 5 match { + case 0 => provider.getBooleanEvaluation(Flag, java.lang.Boolean.FALSE, ctx).getValue.toString + case 1 => provider.getStringEvaluation(Flag, "fallback", ctx).getValue + case 2 => provider.getIntegerEvaluation(Flag, java.lang.Integer.valueOf(-1), ctx).getValue.toString + case 3 => provider.getDoubleEvaluation(Flag, java.lang.Double.valueOf(-1.0), ctx).getValue.toString + case _ => provider.getObjectEvaluation(Flag, new Value(), ctx).getValue.toString + } + } + val anyThrew = outcomes.exists(_.isLeft) + val allReturned = outcomes.size == N + assertTrue(!anyThrew, allReturned) + } finally provider.shutdown() + } + }, + test("evaluations racing initialize never throw — each is either ready or PROVIDER_NOT_READY") { + withMockServer { server => + // Delay the datafile response so initialize is genuinely in-flight while evaluators race against it. + server.stubFor( + get(urlEqualTo(DatafilePath)) + .willReturn(okJson(ValidDatafile).withFixedDelay(200)) + ) + val provider = + new OptimizelyFeatureProvider(buildClient(server), java.time.Duration.ofSeconds(2), closeOnShutdown = true) + try { + val N = 200 + val initStarted = new CountDownLatch(1) + val initThread = new Thread(() => { + initStarted.countDown() + try provider.initialize(new ImmutableContext()) + catch { case _: Throwable => () } + }) + initThread.start() + initStarted.await() + val ctxs = (0 until N).map(i => new ImmutableContext(s"racer-$i")) + val outcomes = race(N) { i => + provider.getBooleanEvaluation(Flag, java.lang.Boolean.FALSE, ctxs(i)) + } + initThread.join(5000) + val anyThrew = outcomes.exists(_.isLeft) + val classified = outcomes.collect { case Right(e) => + if (e.getErrorCode == ErrorCode.PROVIDER_NOT_READY) "not-ready" + else if (e.getValue == java.lang.Boolean.TRUE) "ready-true" + else "other" + } + val onlyKnownStates = classified.forall(s => s == "not-ready" || s == "ready-true") + val finalState = stateOf(provider) + assertTrue(!anyThrew, onlyKnownStates, finalState == ProviderState.READY) + } finally provider.shutdown() + } + }, + test("evaluations racing shutdown never throw — they surface PROVIDER_NOT_READY") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val provider = + new OptimizelyFeatureProvider(buildClient(server), java.time.Duration.ofSeconds(3), closeOnShutdown = true) + provider.initialize(new ImmutableContext()) + val N = 200 + val live = new AtomicInteger(0) + val ctxs = (0 until N).map(i => new ImmutableContext(s"shutdown-racer-$i")) + val pool = Executors.newFixedThreadPool(32) + val gate = new CountDownLatch(1) + val done = new CountDownLatch(N) + val errors = new AtomicInteger(0) + (0 until N).foreach { i => + pool.submit(new Runnable { + override def run(): Unit = { + gate.await() + try { + val e = provider.getBooleanEvaluation(Flag, java.lang.Boolean.FALSE, ctxs(i)) + if (e.getErrorCode == null && e.getValue == java.lang.Boolean.TRUE) { + val _ = live.incrementAndGet() + } + } catch { case _: Throwable => val _ = errors.incrementAndGet() } + done.countDown() + } + }) + () + } + // Fire the racers and call shutdown shortly after — some evaluations land before, some after. + gate.countDown() + Thread.sleep(5) + provider.shutdown() + val finished = done.await(15, TimeUnit.SECONDS) + pool.shutdownNow() + val stateAfter = stateOf(provider) + assertTrue( + finished, + errors.get() == 0, + stateAfter == ProviderState.NOT_READY + ) + } + } + ) @@ TestAspect.sequential @@ TestAspect.timeout(60.seconds) @@ TestAspect.withLiveClock +} diff --git a/optimizely/src/test/scala/zio/openfeature/optimizely/OptimizelyProviderLifecycleSpec.scala b/optimizely/src/test/scala/zio/openfeature/optimizely/OptimizelyProviderLifecycleSpec.scala new file mode 100644 index 0000000..631c164 --- /dev/null +++ b/optimizely/src/test/scala/zio/openfeature/optimizely/OptimizelyProviderLifecycleSpec.scala @@ -0,0 +1,248 @@ +package zio.openfeature.optimizely + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.WireMock._ +import com.github.tomakehurst.wiremock.core.WireMockConfiguration +import com.optimizely.ab.Optimizely +import com.optimizely.ab.config.HttpProjectConfigManager +import dev.openfeature.sdk.{ErrorCode, ImmutableContext, ProviderState, Reason} +import zio._ +import zio.test._ + +import java.util.concurrent.{CountDownLatch, TimeUnit} +import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} + +/** State-machine and lifecycle tests for `OptimizelyFeatureProvider`. WireMock-backed so the suite runs on every PR + * without needing Docker. + * + * Covers behaviours that aren't otherwise exercised by the WireMock failure-mode spec or the docker-compose IT suite: + * - `initialize` and `shutdown` idempotence. + * - `shutdown` without prior `initialize`. + * - Evaluations before `initialize` and after `shutdown` surface `PROVIDER_NOT_READY` rather than throwing. + * - State transitions NOT_READY → READY → NOT_READY on the happy path; NOT_READY → ERROR on a failed init. + * - Concurrent `initialize` from multiple threads converges on a READY state without exceptions escaping. + */ +object OptimizelyProviderLifecycleSpec extends ZIOSpecDefault { + + private val DatafilePath = "/datafiles/lifecycle-key.json" + private val ValidDatafile = readResource("/test-datafile-with-flag.json") + private val emptyContext = new ImmutableContext() + // Evaluations require a targeting key, otherwise `decide` short-circuits with TARGETING_KEY_MISSING before reaching + // the ready-check. We want to observe PROVIDER_NOT_READY, so all eval calls go through this targeted context. + private val targetedContext = new ImmutableContext("user-lifecycle") + + private def readResource(path: String): String = + scala.io.Source.fromInputStream(getClass.getResourceAsStream(path)).mkString + + private def withMockServer[A](body: WireMockServer => A): A = { + val server = new WireMockServer(WireMockConfiguration.options().dynamicPort()) + server.start() + try body(server) + finally server.stop() + } + + private def datafileUrl(server: WireMockServer): String = + s"http://localhost:${server.port()}$DatafilePath" + + /** Build a client with aggressive timeouts so failure-mode paths fail fast and `isValid` doesn't absorb whatever + * delay we configure via the outer init wait. + */ + private def buildClient( + server: WireMockServer, + blockingTimeout: java.time.Duration = java.time.Duration.ofSeconds(2), + pollingInterval: java.time.Duration = java.time.Duration.ofSeconds(3600) + ): Optimizely = { + val mgr = HttpProjectConfigManager + .builder() + .withSdkKey("lifecycle-key") + .withUrl(datafileUrl(server)) + .withBlockingTimeout(blockingTimeout.toMillis, TimeUnit.MILLISECONDS) + .withPollingInterval(pollingInterval.toSeconds, TimeUnit.SECONDS) + .build() + Optimizely.builder().withConfigManager(mgr).build() + } + + @scala.annotation.nowarn("msg=deprecated") + private def stateOf(p: OptimizelyFeatureProvider): ProviderState = p.getState + + private def tryInit(provider: OptimizelyFeatureProvider): Either[Throwable, Unit] = + try { provider.initialize(emptyContext); Right(()) } + catch { case t: Throwable => Left(t) } + + def spec: Spec[TestEnvironment & Scope, Any] = suite("OptimizelyFeatureProvider lifecycle")( + test("initialize is idempotent — second call is a no-op, state stays READY") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val client = buildClient(server) + val provider = new OptimizelyFeatureProvider(client, java.time.Duration.ofSeconds(3), closeOnShutdown = true) + try { + val first = tryInit(provider) + val firstState = stateOf(provider) + val second = tryInit(provider) + val secondState = stateOf(provider) + val evalAfter = provider.getBooleanEvaluation("lifecycle_flag", java.lang.Boolean.FALSE, targetedContext) + assertTrue( + first.isRight, + second.isRight, + firstState == ProviderState.READY, + secondState == ProviderState.READY, + evalAfter.getValue == java.lang.Boolean.TRUE + ) + } finally provider.shutdown() + } + }, + test("shutdown is idempotent — second call is a no-op") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val client = buildClient(server) + val provider = new OptimizelyFeatureProvider(client, java.time.Duration.ofSeconds(3), closeOnShutdown = true) + val initRes = tryInit(provider) + val firstShutdown = + try { provider.shutdown(); Right(()) } + catch { case t: Throwable => Left(t) } + val afterFirst = stateOf(provider) + val secondShutdown = + try { provider.shutdown(); Right(()) } + catch { case t: Throwable => Left(t) } + val afterSecond = stateOf(provider) + assertTrue( + initRes.isRight, + firstShutdown.isRight, + secondShutdown.isRight, + afterFirst == ProviderState.NOT_READY, + afterSecond == ProviderState.NOT_READY + ) + } + }, + test("shutdown before initialize — no exception, state stays NOT_READY") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val client = buildClient(server) + val provider = new OptimizelyFeatureProvider(client, java.time.Duration.ofSeconds(3), closeOnShutdown = true) + val before = stateOf(provider) + val result = + try { provider.shutdown(); Right(()) } + catch { case t: Throwable => Left(t) } + val after = stateOf(provider) + assertTrue( + result.isRight, + before == ProviderState.NOT_READY, + after == ProviderState.NOT_READY + ) + } + }, + test("evaluation before initialize surfaces PROVIDER_NOT_READY") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val client = buildClient(server) + val provider = new OptimizelyFeatureProvider(client, java.time.Duration.ofSeconds(3), closeOnShutdown = true) + try { + val eval = provider.getBooleanEvaluation("lifecycle_flag", java.lang.Boolean.TRUE, targetedContext) + val state = stateOf(provider) + assertTrue( + state == ProviderState.NOT_READY, + eval.getValue == java.lang.Boolean.TRUE, + eval.getErrorCode == ErrorCode.PROVIDER_NOT_READY, + eval.getReason == Reason.ERROR.name() + ) + } finally provider.shutdown() + } + }, + test("evaluation after shutdown surfaces PROVIDER_NOT_READY") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val client = buildClient(server) + val provider = new OptimizelyFeatureProvider(client, java.time.Duration.ofSeconds(3), closeOnShutdown = true) + val init = tryInit(provider) + val ready = stateOf(provider) + provider.shutdown() + val eval = provider.getBooleanEvaluation("lifecycle_flag", java.lang.Boolean.TRUE, targetedContext) + val state = stateOf(provider) + assertTrue( + init.isRight, + ready == ProviderState.READY, + state == ProviderState.NOT_READY, + eval.getValue == java.lang.Boolean.TRUE, + eval.getErrorCode == ErrorCode.PROVIDER_NOT_READY, + eval.getReason == Reason.ERROR.name() + ) + } + }, + test("state transitions: NOT_READY -> READY -> NOT_READY across init/shutdown") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val client = buildClient(server) + val provider = new OptimizelyFeatureProvider(client, java.time.Duration.ofSeconds(3), closeOnShutdown = true) + val initial = stateOf(provider) + val init = tryInit(provider) + val afterInit = stateOf(provider) + provider.shutdown() + val afterShutdown = stateOf(provider) + assertTrue( + init.isRight, + initial == ProviderState.NOT_READY, + afterInit == ProviderState.READY, + afterShutdown == ProviderState.NOT_READY + ) + } + }, + test("failed init (404) transitions NOT_READY -> ERROR; subsequent eval surfaces PROVIDER_NOT_READY") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(aResponse().withStatus(404).withBody("Not Found"))) + val client = buildClient(server) + val provider = new OptimizelyFeatureProvider(client, java.time.Duration.ofMillis(500), closeOnShutdown = true) + try { + val initial = stateOf(provider) + val init = tryInit(provider) + val after = stateOf(provider) + val eval = provider.getBooleanEvaluation("lifecycle_flag", java.lang.Boolean.TRUE, targetedContext) + assertTrue( + initial == ProviderState.NOT_READY, + init.isLeft, + after == ProviderState.ERROR, + eval.getValue == java.lang.Boolean.TRUE, + eval.getErrorCode == ErrorCode.PROVIDER_NOT_READY + ) + } finally provider.shutdown() + } + }, + test("concurrent initialize from N threads — exactly one progresses init, all return cleanly, state ends READY") { + withMockServer { server => + server.stubFor(get(urlEqualTo(DatafilePath)).willReturn(okJson(ValidDatafile))) + val client = buildClient(server) + val provider = new OptimizelyFeatureProvider(client, java.time.Duration.ofSeconds(5), closeOnShutdown = true) + try { + val N = 16 + val release = new CountDownLatch(1) + val started = new CountDownLatch(N) + val errors = new AtomicInteger(0) + val outcomes = new AtomicReference[List[Either[Throwable, Unit]]](Nil) + val threads: Seq[Thread] = (1 to N).map { _ => + new Thread(() => { + started.countDown() + release.await() + val r: Either[Throwable, Unit] = + try { provider.initialize(emptyContext); Right(()) } + catch { case t: Throwable => errors.incrementAndGet(); Left(t) } + val _ = outcomes.updateAndGet(rs => r :: rs) + }) + } + threads.foreach(_.start()) + // Wait for all threads to be parked at `release.await()` before kicking them off so they race for the + // AtomicBoolean as simultaneously as the JVM allows. + val _ = started.await(5, TimeUnit.SECONDS) + release.countDown() + threads.foreach(_.join(5000)) + val finalState = stateOf(provider) + val allReturned = outcomes.get().size == N + val anyFailed = errors.get() > 0 + assertTrue( + allReturned, + !anyFailed, + finalState == ProviderState.READY + ) + } finally provider.shutdown() + } + } + ) @@ TestAspect.sequential @@ TestAspect.timeout(60.seconds) @@ TestAspect.withLiveClock +}