Skip to content

Commit fed5f39

Browse files
fix: thread memory leak in InProcessResolver (#1678)
Signed-off-by: Paul Johe <paul@kraftlauget.no>
1 parent 5ab258e commit fed5f39

2 files changed

Lines changed: 96 additions & 42 deletions

File tree

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

Lines changed: 60 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import dev.openfeature.sdk.exceptions.TypeMismatchError;
2929
import dev.openfeature.sdk.internal.TriConsumer;
3030
import java.util.Map;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicReference;
3133
import lombok.extern.slf4j.Slf4j;
3234
import org.apache.commons.lang3.StringUtils;
3335

@@ -38,11 +40,15 @@
3840
*/
3941
@Slf4j
4042
public class InProcessResolver implements Resolver {
43+
44+
static final String STATE_WATCHER_THREAD_NAME = "InProcessResolver.stateWatcher";
4145
private final Storage flagStore;
4246
private final TriConsumer<ProviderEvent, ProviderEventDetails, Structure> onConnectionEvent;
4347
private final Operator operator;
4448
private final String scope;
4549
private final QueueSource queueSource;
50+
private final AtomicBoolean shutdown = new AtomicBoolean(false);
51+
private final AtomicReference<Thread> stateWatcher = new AtomicReference<>();
4652

4753
/**
4854
* Resolves flag values using
@@ -67,52 +73,54 @@ public InProcessResolver(
6773
*/
6874
public void init() throws Exception {
6975
flagStore.init();
70-
final Thread stateWatcher = new Thread(() -> {
71-
try {
72-
while (true) {
73-
final StorageStateChange storageStateChange =
74-
flagStore.getStateQueue().take();
75-
switch (storageStateChange.getStorageState()) {
76-
case OK:
77-
log.debug("onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
78-
79-
var eventDetails = ProviderEventDetails.builder()
80-
.flagsChanged(storageStateChange.getChangedFlagsKeys())
81-
.message("configuration changed")
82-
.build();
83-
84-
onConnectionEvent.accept(
85-
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED,
86-
eventDetails,
87-
storageStateChange.getSyncMetadata());
88-
89-
log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
90-
break;
91-
case STALE:
92-
onConnectionEvent.accept(ProviderEvent.PROVIDER_ERROR, null, null);
93-
break;
94-
case ERROR:
95-
onConnectionEvent.accept(
96-
ProviderEvent.PROVIDER_ERROR,
97-
ProviderEventDetails.builder()
98-
.errorCode(ErrorCode.PROVIDER_FATAL)
99-
.build(),
100-
null);
101-
break;
102-
default:
103-
log.warn(String.format(
104-
"Storage emitted unhandled status: %s", storageStateChange.getStorageState()));
105-
}
106-
}
107-
} catch (InterruptedException e) {
108-
log.warn("Storage state watcher interrupted", e);
109-
Thread.currentThread().interrupt();
110-
}
111-
});
76+
final Thread stateWatcher = new Thread(this::stateWatcher, STATE_WATCHER_THREAD_NAME);
11277
stateWatcher.setDaemon(true);
78+
this.stateWatcher.set(stateWatcher);
11379
stateWatcher.start();
11480
}
11581

82+
private void stateWatcher() {
83+
try {
84+
while (!shutdown.get()) {
85+
final StorageStateChange storageStateChange =
86+
flagStore.getStateQueue().take();
87+
switch (storageStateChange.getStorageState()) {
88+
case OK:
89+
log.debug("onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
90+
91+
var eventDetails = ProviderEventDetails.builder()
92+
.flagsChanged(storageStateChange.getChangedFlagsKeys())
93+
.message("configuration changed")
94+
.build();
95+
96+
onConnectionEvent.accept(
97+
ProviderEvent.PROVIDER_CONFIGURATION_CHANGED,
98+
eventDetails,
99+
storageStateChange.getSyncMetadata());
100+
101+
log.debug("post onConnectionEvent.accept ProviderEvent.PROVIDER_CONFIGURATION_CHANGED");
102+
break;
103+
case STALE:
104+
onConnectionEvent.accept(ProviderEvent.PROVIDER_ERROR, null, null);
105+
break;
106+
case ERROR:
107+
onConnectionEvent.accept(
108+
ProviderEvent.PROVIDER_ERROR,
109+
ProviderEventDetails.builder()
110+
.errorCode(ErrorCode.PROVIDER_FATAL)
111+
.build(),
112+
null);
113+
break;
114+
default:
115+
log.warn(String.format(
116+
"Storage emitted unhandled status: %s", storageStateChange.getStorageState()));
117+
}
118+
}
119+
} catch (InterruptedException e) {
120+
log.debug("Storage state watcher interrupted, most likely shutdown was invoked", e);
121+
}
122+
}
123+
116124
/**
117125
* Called when the provider enters error state after grace period.
118126
* Attempts to reinitialize the sync connector if enabled.
@@ -132,7 +140,17 @@ public void onError() {
132140
* @throws InterruptedException if stream can't be closed within deadline.
133141
*/
134142
public void shutdown() throws InterruptedException {
143+
if (!shutdown.compareAndSet(false, true)) {
144+
log.debug("Shutdown already in progress or completed");
145+
return;
146+
}
135147
flagStore.shutdown();
148+
stateWatcher.getAndUpdate(existing -> {
149+
if (existing != null && existing.isAlive()) {
150+
existing.interrupt();
151+
}
152+
return null;
153+
});
136154
}
137155

138156
/**

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolverTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.concurrent.BlockingQueue;
5353
import java.util.concurrent.LinkedBlockingQueue;
5454
import java.util.concurrent.TimeUnit;
55+
import org.awaitility.Awaitility;
5556
import org.junit.jupiter.api.Assertions;
5657
import org.junit.jupiter.api.Test;
5758

@@ -543,6 +544,41 @@ void flagSetMetadataIsOverwrittenByFlagMetadataToEvaluation() throws Exception {
543544
assertThat(providerEvaluation.getFlagMetadata().getString("key")).isEqualTo("expected");
544545
}
545546

547+
@Test
548+
void testStateWatcherThreadIsCleanedUpDuringShutdown() throws Exception {
549+
// given
550+
final Map<String, FeatureFlag> flagMap = new HashMap<>();
551+
flagMap.put("booleanFlag", BOOLEAN_FLAG);
552+
553+
var initialThreadCount = currentDaemonThreadCount();
554+
555+
var queue = new LinkedBlockingQueue<StorageStateChange>();
556+
InProcessResolver inProcessResolver =
557+
getInProcessResolverWith(new MockStorage(flagMap, queue), (event, details, metadata) -> {});
558+
559+
// when
560+
inProcessResolver.init();
561+
Thread stateWatcher = Thread.getAllStackTraces().keySet().stream()
562+
.filter(thread -> InProcessResolver.STATE_WATCHER_THREAD_NAME.equals(thread.getName()))
563+
.findFirst()
564+
.orElseThrow();
565+
var threadCountAfterInit = currentDaemonThreadCount();
566+
var stateWatcherWasStarted = stateWatcher.isAlive();
567+
inProcessResolver.shutdown();
568+
569+
// then
570+
assertThat(stateWatcherWasStarted).isTrue();
571+
assertThat(threadCountAfterInit).isGreaterThan(initialThreadCount);
572+
Awaitility.await().until(() -> !stateWatcher.isAlive());
573+
assertThat(currentDaemonThreadCount()).isEqualTo(initialThreadCount);
574+
}
575+
576+
private long currentDaemonThreadCount() {
577+
return Thread.getAllStackTraces().keySet().stream()
578+
.filter(Thread::isDaemon)
579+
.count();
580+
}
581+
546582
private InProcessResolver getInProcessResolverWith(final FlagdOptions options, final MockStorage storage)
547583
throws NoSuchFieldException, IllegalAccessException {
548584

0 commit comments

Comments
 (0)