Skip to content

Commit a16c808

Browse files
Fix circuit breaker probe lockup, cache key collision, evaluators leak, and HOCON list corruption (#107)
1 parent 204f7ad commit a16c808

17 files changed

Lines changed: 324 additions & 171 deletions

File tree

core/src/main/scala/zio/openfeature/FeatureFlagRegistry.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,7 @@ final private class FeatureFlagRegistryLive(
7474
case Some(client) =>
7575
client
7676
.setProvider(provider)
77-
.tapBoth(
78-
_ => ZIO.unit,
79-
_ => providers.update(_ + (domain -> provider))
80-
)
77+
.tap(_ => providers.update(_ + (domain -> provider)))
8178
case None =>
8279
providers.update(_ + (domain -> provider))
8380
}

core/src/main/scala/zio/openfeature/FeatureFlagsLive.scala

Lines changed: 60 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,26 @@ final private[openfeature] class FeatureFlagsLive(
3636
// Read provider name dynamically so events after a provider swap use the new name
3737
def currentMetadata(runtime: Runtime[Any]): ProviderMetadata = {
3838
val name = Unsafe.unsafe { implicit u =>
39-
runtime.unsafe.run(providerNameRef.get).getOrThrowFiberFailure()
39+
runtime.unsafe
40+
.run(
41+
providerNameRef.get.catchAllCause(c =>
42+
ZIO.logErrorCause("event bridge: providerNameRef.get", c).as("unknown")
43+
)
44+
)
45+
.getOrElse(_ => "unknown")
4046
}
4147
ProviderMetadata(name)
4248
}
4349

50+
// Run an event-publish effect from a Java SDK thread. Failures are logged via the ZIO logger
51+
// rather than thrown, so the Java SDK's event dispatch thread is never killed by a defect.
52+
def runHandler(runtime: Runtime[Any], label: String)(effect: UIO[Unit]): Unit =
53+
Unsafe.unsafe { implicit u =>
54+
runtime.unsafe
55+
.run(effect.catchAllCause(c => ZIO.logErrorCause(s"event bridge: $label", c)))
56+
.getOrElse(_ => ())
57+
}
58+
4459
ZIO.runtime[Any].flatMap { runtime =>
4560
def extractEventMetadata(details: EventDetails): FlagMetadata =
4661
try {
@@ -49,57 +64,47 @@ final private[openfeature] class FeatureFlagsLive(
4964
else convertImmutableMetadata(javaMeta)
5065
} catch { case _: Exception => FlagMetadata.empty }
5166

52-
val readyHandler: java.util.function.Consumer[EventDetails] = details =>
53-
Unsafe.unsafe { implicit u =>
54-
val em = extractEventMetadata(details)
55-
runtime.unsafe
56-
.run(
57-
state.statusRef.set(ProviderStatus.Ready) *>
58-
state.eventHub.publish(ProviderEvent.Ready(currentMetadata(runtime), em))
59-
)
60-
.getOrThrowFiberFailure()
61-
onReady.foreach(_.countDown())
62-
}
67+
val readyHandler: java.util.function.Consumer[EventDetails] = details => {
68+
val em = extractEventMetadata(details)
69+
runHandler(runtime, "PROVIDER_READY")(
70+
state.statusRef.set(ProviderStatus.Ready) *>
71+
state.eventHub.publish(ProviderEvent.Ready(currentMetadata(runtime), em)).unit
72+
)
73+
onReady.foreach(_.countDown())
74+
}
6375

64-
val errorHandler: java.util.function.Consumer[EventDetails] = details =>
65-
Unsafe.unsafe { implicit u =>
66-
val error = new RuntimeException(Option(details.getMessage).getOrElse("Provider error"))
67-
val errorCode = Option(details.getErrorCode).map(ErrorCodeConverter.fromJava)
68-
val em = extractEventMetadata(details)
69-
runtime.unsafe
70-
.run(
71-
state.statusRef.set(ProviderStatus.Error) *>
72-
state.eventHub.publish(
73-
ProviderEvent.Error(error, currentMetadata(runtime), errorCode, Option(details.getMessage), em)
74-
)
75-
)
76-
.getOrThrowFiberFailure()
77-
}
76+
val errorHandler: java.util.function.Consumer[EventDetails] = details => {
77+
val error = new RuntimeException(Option(details.getMessage).getOrElse("Provider error"))
78+
val errorCode = Option(details.getErrorCode).map(ErrorCodeConverter.fromJava)
79+
val em = extractEventMetadata(details)
80+
runHandler(runtime, "PROVIDER_ERROR")(
81+
state.statusRef.set(ProviderStatus.Error) *>
82+
state.eventHub
83+
.publish(ProviderEvent.Error(error, currentMetadata(runtime), errorCode, Option(details.getMessage), em))
84+
.unit
85+
)
86+
}
7887

79-
val staleHandler: java.util.function.Consumer[EventDetails] = details =>
80-
Unsafe.unsafe { implicit u =>
81-
val reason = Option(details.getMessage).getOrElse("Provider stale")
82-
val em = extractEventMetadata(details)
83-
runtime.unsafe
84-
.run(
85-
state.statusRef.set(ProviderStatus.Stale) *>
86-
state.eventHub.publish(ProviderEvent.Stale(reason, currentMetadata(runtime), em))
87-
)
88-
.getOrThrowFiberFailure()
89-
}
88+
val staleHandler: java.util.function.Consumer[EventDetails] = details => {
89+
val reason = Option(details.getMessage).getOrElse("Provider stale")
90+
val em = extractEventMetadata(details)
91+
runHandler(runtime, "PROVIDER_STALE")(
92+
state.statusRef.set(ProviderStatus.Stale) *>
93+
state.eventHub.publish(ProviderEvent.Stale(reason, currentMetadata(runtime), em)).unit
94+
)
95+
}
9096

91-
val configHandler: java.util.function.Consumer[EventDetails] = details =>
92-
Unsafe.unsafe { implicit u =>
93-
val flags = Option(details.getFlagsChanged)
94-
.map(_.asScala.toSet)
95-
.getOrElse(Set.empty[String])
96-
val em = extractEventMetadata(details)
97-
runtime.unsafe
98-
.run(
99-
state.eventHub.publish(ProviderEvent.ConfigurationChanged(flags, currentMetadata(runtime), em))
100-
)
101-
.getOrThrowFiberFailure()
102-
}
97+
val configHandler: java.util.function.Consumer[EventDetails] = details => {
98+
val flags = Option(details.getFlagsChanged)
99+
.map(_.asScala.toSet)
100+
.getOrElse(Set.empty[String])
101+
val em = extractEventMetadata(details)
102+
runHandler(runtime, "PROVIDER_CONFIGURATION_CHANGED")(
103+
state.eventHub
104+
.publish(ProviderEvent.ConfigurationChanged(flags, currentMetadata(runtime), em))
105+
.unit
106+
)
107+
}
103108

104109
for {
105110
_ <- ZIO.succeed {
@@ -790,8 +795,12 @@ final private[openfeature] class FeatureFlagsLive(
790795
ZIO.attempt {
791796
val jCtx = toJavaHookContext(ctx)
792797
val jHints = hints.values.map { case (k, v) => k -> v.asInstanceOf[Object] }.asJava
793-
val ex = err.cause.getOrElse(new RuntimeException(err.message))
794-
hook.error(jCtx, ex.asInstanceOf[Exception], jHints)
798+
val ex: Exception = err.cause match {
799+
case Some(e: Exception) => e
800+
case Some(t) => new RuntimeException(t.getMessage, t)
801+
case None => new RuntimeException(err.message)
802+
}
803+
hook.error(jCtx, ex, jHints)
795804
}.ignore
796805

797806
override def finallyAfter(

core/src/main/scala/zio/openfeature/Hook.scala

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -279,39 +279,41 @@ object FeatureHook {
279279
}
280280
} yield None
281281

282-
private def elapsed(ctx: HookContext): Duration = {
283-
val end = java.lang.System.nanoTime()
284-
val start = ctx.hookData.get(startTimeKey).getOrElse(end)
285-
Duration.fromNanos(end - start)
286-
}
282+
private def elapsed(ctx: HookContext): UIO[Duration] =
283+
Clock.nanoTime.map { end =>
284+
val start = ctx.hookData.get(startTimeKey).getOrElse(end)
285+
Duration.fromNanos(end - start)
286+
}
287287

288288
override def after[A](ctx: HookContext, details: FlagResolution[A], hints: HookHints): UIO[Unit] =
289289
afterLevel match {
290290
case Some(level) =>
291-
val duration = elapsed(ctx)
292-
val annotations = baseAnnotations(ctx) ++ contextAnnotations(ctx) ++ Set(
293-
zio.LogAnnotation("flag.value", String.valueOf(details.value)),
294-
zio.LogAnnotation("flag.reason", details.reason.toString),
295-
zio.LogAnnotation("flag.duration_ms", duration.toMillis.toString)
296-
) ++ details.variant.map(v => zio.LogAnnotation("flag.variant", v)).toSet
297-
annotate(annotations)(
298-
logAtLevel(level, s"Flag '${ctx.flagKey}' = ${details.value} (${details.reason}, ${duration.toMillis}ms)")
299-
)
291+
elapsed(ctx).flatMap { duration =>
292+
val annotations = baseAnnotations(ctx) ++ contextAnnotations(ctx) ++ Set(
293+
zio.LogAnnotation("flag.value", String.valueOf(details.value)),
294+
zio.LogAnnotation("flag.reason", details.reason.toString),
295+
zio.LogAnnotation("flag.duration_ms", duration.toMillis.toString)
296+
) ++ details.variant.map(v => zio.LogAnnotation("flag.variant", v)).toSet
297+
annotate(annotations)(
298+
logAtLevel(level, s"Flag '${ctx.flagKey}' = ${details.value} (${details.reason}, ${duration.toMillis}ms)")
299+
)
300+
}
300301
case None => ZIO.unit
301302
}
302303

303304
override def error(ctx: HookContext, err: FeatureFlagError, hints: HookHints): UIO[Unit] =
304305
errorLevel match {
305306
case Some(level) =>
306-
val duration = elapsed(ctx)
307-
val annotations = baseAnnotations(ctx) ++ contextAnnotations(ctx) ++ Set(
308-
zio.LogAnnotation("flag.error", err.message),
309-
zio.LogAnnotation("flag.error.type", err.getClass.getSimpleName),
310-
zio.LogAnnotation("flag.duration_ms", duration.toMillis.toString)
311-
)
312-
annotate(annotations)(
313-
logAtLevel(level, s"Flag '${ctx.flagKey}' evaluation failed: ${err.message}")
314-
)
307+
elapsed(ctx).flatMap { duration =>
308+
val annotations = baseAnnotations(ctx) ++ contextAnnotations(ctx) ++ Set(
309+
zio.LogAnnotation("flag.error", err.message),
310+
zio.LogAnnotation("flag.error.type", err.getClass.getSimpleName),
311+
zio.LogAnnotation("flag.duration_ms", duration.toMillis.toString)
312+
)
313+
annotate(annotations)(
314+
logAtLevel(level, s"Flag '${ctx.flagKey}' evaluation failed: ${err.message}")
315+
)
316+
}
315317
case None => ZIO.unit
316318
}
317319
}
@@ -366,19 +368,19 @@ object FeatureHook {
366368
_ = ctx.hookData.set(startTimeKey, now)
367369
} yield None
368370

369-
override def after[A](ctx: HookContext, details: FlagResolution[A], hints: HookHints): UIO[Unit] = {
370-
val end = java.lang.System.nanoTime()
371-
val start = ctx.hookData.get(startTimeKey).getOrElse(end)
372-
val duration = Duration.fromNanos(end - start)
373-
onSuccess(ctx, details, duration)
374-
}
371+
override def after[A](ctx: HookContext, details: FlagResolution[A], hints: HookHints): UIO[Unit] =
372+
Clock.nanoTime.flatMap { end =>
373+
val start = ctx.hookData.get(startTimeKey).getOrElse(end)
374+
val duration = Duration.fromNanos(end - start)
375+
onSuccess(ctx, details, duration)
376+
}
375377

376-
override def error(ctx: HookContext, err: FeatureFlagError, hints: HookHints): UIO[Unit] = {
377-
val end = java.lang.System.nanoTime()
378-
val start = ctx.hookData.get(startTimeKey).getOrElse(end)
379-
val duration = Duration.fromNanos(end - start)
380-
onError(ctx, err, duration)
381-
}
378+
override def error(ctx: HookContext, err: FeatureFlagError, hints: HookHints): UIO[Unit] =
379+
Clock.nanoTime.flatMap { end =>
380+
val start = ctx.hookData.get(startTimeKey).getOrElse(end)
381+
val duration = Duration.fromNanos(end - start)
382+
onError(ctx, err, duration)
383+
}
382384
}
383385

384386
def contextValidator(

core/src/main/scala/zio/openfeature/internal/ContextConverter.scala

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,13 @@ private[openfeature] object ContextConverter {
7676
else if (value.isString) AttributeValue.StringValue(value.asString())
7777
else if (value.isNumber) {
7878
val num = value.asDouble()
79-
if (num == num.toLong.toDouble) AttributeValue.IntValue(num.toInt)
80-
else AttributeValue.DoubleValue(num)
79+
// Long.MaxValue.toDouble rounds up to 2^63 (strictly greater than Long.MaxValue), so .toLong on
80+
// out-of-range doubles silently saturates. Strict `<` on the upper bound rejects the saturation point.
81+
if (num == num.toLong.toDouble && num >= Long.MinValue.toDouble && num < Long.MaxValue.toDouble) {
82+
val asLong = num.toLong
83+
if (asLong >= Int.MinValue && asLong <= Int.MaxValue) AttributeValue.IntValue(asLong.toInt)
84+
else AttributeValue.LongValue(asLong)
85+
} else AttributeValue.DoubleValue(num)
8186
} else if (value.isList) {
8287
val list = value.asList().asScala.map(valueToAttribute).toList
8388
AttributeValue.ListValue(list)

core/src/main/scala/zio/openfeature/internal/FeatureFlagsState.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ object FeatureFlagsState {
2222
fiberCtxRef <- FiberRef.make(EvaluationContext.empty)
2323
transactionRef <- FiberRef.make[Option[TransactionState]](None)
2424
hooksRef <- Ref.make(List.empty[FeatureHook])
25-
eventHub <- Hub.unbounded[ProviderEvent]
26-
statusRef <- Ref.make[ProviderStatus](ProviderStatus.NotReady)
27-
trackRec <- Ref.make(List.empty[(String, EvaluationContext, Option[TrackingEventDetails])])
25+
// Bounded; subscribers reconcile via current state on next evaluation, so dropping intermediate events is safe.
26+
eventHub <- Hub.dropping[ProviderEvent](256)
27+
statusRef <- Ref.make[ProviderStatus](ProviderStatus.NotReady)
28+
trackRec <- Ref.make(List.empty[(String, EvaluationContext, Option[TrackingEventDetails])])
2829
} yield FeatureFlagsState(
2930
globalCtxRef,
3031
clientCtxRef,

core/src/test/scala/zio/openfeature/HookSpec.scala

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -352,18 +352,14 @@ object HookSpec extends ZIOSpecDefault {
352352
result <- hook.before(makeHookContext(), HookHints.empty)
353353
} yield assertTrue(result.isEmpty)
354354
},
355-
test("logging hook after logs info") {
355+
test("logging hook after completes without error") {
356356
val hook = FeatureHook.logging(logBefore = false, logAfter = true, logError = false)
357357
val resolution = FlagResolution.default("test", true)
358-
for {
359-
_ <- hook.after(makeHookContext(), resolution, HookHints.empty)
360-
} yield assertTrue(true)
358+
hook.after(makeHookContext(), resolution, HookHints.empty).as(assertCompletes)
361359
},
362-
test("logging hook error logs error") {
360+
test("logging hook error completes without error") {
363361
val hook = FeatureHook.logging(logBefore = false, logAfter = false, logError = true)
364-
for {
365-
_ <- hook.error(makeHookContext(), FeatureFlagError.FlagNotFound("test"), HookHints.empty)
366-
} yield assertTrue(true)
362+
hook.error(makeHookContext(), FeatureFlagError.FlagNotFound("test"), HookHints.empty).as(assertCompletes)
367363
},
368364
test("logging hook all disabled") {
369365
val hook = FeatureHook.logging(logBefore = false, logAfter = false, logError = false)
@@ -386,22 +382,22 @@ object HookSpec extends ZIOSpecDefault {
386382
assertTrue(result.isEmpty) &&
387383
assertTrue(hookCtx.hookData.get(TypedKey[Long]("structuredLogging.startTime")).isDefined)
388384
},
389-
test("after completes successfully") {
385+
test("after completes and reads start time from hookData") {
390386
val hook = FeatureHook.structuredLogging()
391387
val hookCtx = makeHookContext()
392388
val resolution = FlagResolution.default("test", true)
393389
for {
394390
_ <- hook.before(hookCtx, HookHints.empty)
395391
_ <- hook.after(hookCtx, resolution, HookHints.empty)
396-
} yield assertTrue(true)
392+
} yield assertTrue(hookCtx.hookData.get(TypedKey[Long]("structuredLogging.startTime")).isDefined)
397393
},
398-
test("error completes successfully") {
394+
test("error completes and reads start time from hookData") {
399395
val hook = FeatureHook.structuredLogging()
400396
val hookCtx = makeHookContext()
401397
for {
402398
_ <- hook.before(hookCtx, HookHints.empty)
403399
_ <- hook.error(hookCtx, FeatureFlagError.FlagNotFound("test"), HookHints.empty)
404-
} yield assertTrue(true)
400+
} yield assertTrue(hookCtx.hookData.get(TypedKey[Long]("structuredLogging.startTime")).isDefined)
405401
},
406402
test("disabled levels skip logging but still track start time") {
407403
val hook = FeatureHook.structuredLogging(beforeLevel = None, afterLevel = None, errorLevel = None)
@@ -426,7 +422,7 @@ object HookSpec extends ZIOSpecDefault {
426422
for {
427423
_ <- hook.before(hookCtx, HookHints.empty)
428424
_ <- hook.after(hookCtx, resolution, HookHints.empty)
429-
} yield assertTrue(true)
425+
} yield assertTrue(hookCtx.hookData.get(TypedKey[Long]("structuredLogging.startTime")).isDefined)
430426
},
431427
test("redactKeys hides sensitive values while preserving others") {
432428
val hook =

0 commit comments

Comments
 (0)