Skip to content

Commit b1b2700

Browse files
authored
[NA] [BE] Prevent nested cache deadlocks (#4404)
1 parent bc3b85c commit b1b2700

File tree

9 files changed

+629
-242
lines changed

9 files changed

+629
-242
lines changed

apps/opik-backend/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,11 @@
313313
<artifactId>tika-core</artifactId>
314314
<version>3.2.0</version>
315315
</dependency>
316+
<dependency>
317+
<groupId>io.vavr</groupId>
318+
<artifactId>vavr</artifactId>
319+
<version>0.10.7</version>
320+
</dependency>
316321

317322
<!-- Test -->
318323

apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheInterceptor.java

Lines changed: 86 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.fasterxml.jackson.core.type.TypeReference;
66
import com.fasterxml.jackson.databind.JavaType;
77
import com.fasterxml.jackson.databind.type.TypeFactory;
8+
import io.vavr.CheckedFunction2;
89
import jakarta.inject.Provider;
910
import lombok.NonNull;
1011
import lombok.RequiredArgsConstructor;
@@ -25,7 +26,6 @@
2526
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Objects;
28-
import java.util.function.BiFunction;
2929
import java.util.function.Function;
3030
import java.util.stream.Stream;
3131

@@ -49,113 +49,113 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
4949

5050
var cacheable = method.getAnnotation(Cacheable.class);
5151
if (cacheable != null) {
52-
return runCacheAwareAction(invocation, isReactive, cacheable.name(), cacheable.key(),
52+
return runCacheAwareAction(invocation, cacheable.name(), cacheable.key(),
5353
(key, group) -> processCacheableMethod(invocation, isReactive, key, group, cacheable));
5454
}
5555

5656
var cachePut = method.getAnnotation(CachePut.class);
5757
if (cachePut != null) {
58-
return runCacheAwareAction(invocation, isReactive, cachePut.name(), cachePut.key(),
58+
return runCacheAwareAction(invocation, cachePut.name(), cachePut.key(),
5959
(key, group) -> processCachePutMethod(invocation, isReactive, key, group));
6060
}
6161

6262
var cacheEvict = method.getAnnotation(CacheEvict.class);
6363
if (cacheEvict != null) {
64-
return runCacheAwareAction(invocation, isReactive, cacheEvict.name(), cacheEvict.key(),
64+
return runCacheAwareAction(invocation, cacheEvict.name(), cacheEvict.key(),
6565
(key, group) -> processCacheEvictMethod(invocation, isReactive, key, cacheEvict));
6666
}
6767

6868
return invocation.proceed();
6969
}
7070

71-
private Object runCacheAwareAction(MethodInvocation invocation, boolean isReactive, String group, String keyAgs,
72-
BiFunction<String, String, Object> action) throws Throwable {
73-
71+
private Object runCacheAwareAction(
72+
MethodInvocation invocation, String group, String keyArgs, CheckedFunction2<String, String, Object> action)
73+
throws Throwable {
7474
String key;
75-
7675
try {
77-
key = getKeyName(group, keyAgs, invocation);
76+
key = getKeyName(group, keyArgs, invocation);
7877
} catch (Exception e) {
7978
// If there is an error evaluating the key, proceed without caching
80-
log.error("Error evaluating key expression: {}", keyAgs, e);
81-
log.warn("Cache will be skipped due to error evaluating key expression");
79+
log.warn("Cache will be skipped due to error evaluating key expression '{}'", keyArgs, e);
8280
return invocation.proceed();
8381
}
84-
85-
if (isReactive) {
86-
return action.apply(key, group);
87-
}
88-
89-
return ((Mono<?>) action.apply(key, group)).block();
82+
return action.apply(key, group);
9083
}
9184

92-
private Mono<Object> processCacheEvictMethod(MethodInvocation invocation, boolean isReactive, String key,
93-
CacheEvict cacheEvict) {
85+
private Object processCacheEvictMethod(
86+
MethodInvocation invocation, boolean isReactive, String key, CacheEvict cacheEvict) throws Throwable {
9487
if (isReactive) {
9588
try {
9689
return ((Mono<?>) invocation.proceed())
9790
.flatMap(value -> cacheManager.get().evict(key, cacheEvict.keyUsesPatternMatching())
98-
.thenReturn(value))
91+
.thenReturn(value)
92+
.onErrorResume(exception -> {
93+
log.error("Error evicting cache", exception);
94+
return Mono.just(value); // Return value even if evict fails
95+
}))
9996
.switchIfEmpty(
100-
cacheManager.get().evict(key, cacheEvict.keyUsesPatternMatching()).then(Mono.empty()))
97+
cacheManager.get().evict(key, cacheEvict.keyUsesPatternMatching())
98+
.onErrorResume(exception -> {
99+
log.error("Error evicting cache", exception);
100+
return Mono.empty();
101+
})
102+
.then(Mono.empty()))
101103
.map(Function.identity());
102104
} catch (Throwable e) {
103105
return Mono.error(e);
104106
}
105107
} else {
108+
var value = invocation.proceed();
106109
try {
107-
var value = invocation.proceed();
108-
if (value == null) {
109-
return cacheManager.get().evict(key, cacheEvict.keyUsesPatternMatching()).then(Mono.empty());
110-
}
111-
return cacheManager.get().evict(key, cacheEvict.keyUsesPatternMatching()).thenReturn(value);
112-
} catch (Throwable e) {
113-
return Mono.error(e);
110+
// Evict cache asynchronously to avoid blocking the execution thread. Makes cache eventual consistent
111+
cacheManager.get().evictAsync(key, cacheEvict.keyUsesPatternMatching());
112+
} catch (RuntimeException exception) {
113+
log.error("Error evicting async cache", exception);
114114
}
115+
return value;
115116
}
116117
}
117118

118-
private Mono<Object> processCachePutMethod(MethodInvocation invocation, boolean isReactive, String key,
119-
String group) {
119+
private Object processCachePutMethod(
120+
MethodInvocation invocation, boolean isReactive, String key, String group) throws Throwable {
120121
if (isReactive) {
121122
try {
122123
return ((Mono<?>) invocation.proceed()).flatMap(value -> cachePut(value, key, group));
123124
} catch (Throwable e) {
124125
return Mono.error(e);
125126
}
126127
} else {
127-
try {
128-
var value = invocation.proceed();
129-
return cachePut(value, key, group).thenReturn(value);
130-
} catch (Throwable e) {
131-
return Mono.error(e);
132-
}
128+
return processSyncCacheMiss(invocation, key, group);
133129
}
134130
}
135131

136-
private Object processCacheableMethod(MethodInvocation invocation, boolean isReactive, String key,
137-
String group, Cacheable cacheable) {
138-
132+
private Object processCacheableMethod(
133+
MethodInvocation invocation, boolean isReactive, String key, String group, Cacheable cacheable)
134+
throws Throwable {
139135
if (isReactive) {
140-
141136
if (invocation.getMethod().getReturnType().isAssignableFrom(Mono.class)) {
142137
return handleMono(invocation, key, group, cacheable);
143138
} else {
144139
return handleFlux(invocation, key, group, cacheable);
145140
}
146141
} else {
147-
148-
if (cacheable.wrapperType() != Object.class) {
149-
TypeReference typeReference = TypeReferenceUtils.forTypes(cacheable.wrapperType(),
150-
cacheable.returnType());
151-
152-
return cacheManager.get().get(key, typeReference)
153-
.switchIfEmpty(processSyncCacheMiss(invocation, key, group));
142+
Object cachedValue;
143+
try {
144+
if (cacheable.wrapperType() != Object.class) {
145+
var typeReference = TypeReferenceUtils.forTypes(
146+
cacheable.wrapperType(), cacheable.returnType());
147+
cachedValue = cacheManager.get().getSync(key, typeReference);
148+
} else {
149+
cachedValue = cacheManager.get().getSync(key, invocation.getMethod().getReturnType());
150+
}
151+
} catch (RuntimeException exception) {
152+
log.error("Error getting value synchronously from cache", exception);
153+
cachedValue = null; // Treat as cache miss
154154
}
155-
156-
return cacheManager.get().get(key, invocation.getMethod().getReturnType())
157-
.map(Object.class::cast)
158-
.switchIfEmpty(processSyncCacheMiss(invocation, key, group));
155+
if (cachedValue != null) {
156+
return cachedValue;
157+
}
158+
return processSyncCacheMiss(invocation, key, group);
159159
}
160160
}
161161

@@ -189,6 +189,10 @@ private Flux<Object> getFromCacheOrCallMethod(MethodInvocation invocation, Strin
189189
TypeReference<List<?>> collectionType) {
190190
return cacheManager.get()
191191
.get(key, collectionType)
192+
.onErrorResume(exception -> {
193+
log.error("Error getting value from cache", exception);
194+
return Mono.empty(); // Treat as cache miss
195+
})
192196
.map(Collection.class::cast)
193197
.flatMapMany(Flux::fromIterable)
194198
.switchIfEmpty(processFluxCacheMiss(invocation, key, group));
@@ -200,22 +204,26 @@ private Mono<Object> handleMono(MethodInvocation invocation, String key, String
200204
cacheable.returnType());
201205

202206
return cacheManager.get().get(key, typeReference)
207+
.onErrorResume(exception -> {
208+
log.error("Error getting value from cache", exception);
209+
return Mono.empty(); // Treat as cache miss
210+
})
203211
.switchIfEmpty(processCacheMiss(invocation, key, group));
204212
}
205213

206214
return cacheManager.get().get(key, cacheable.returnType())
215+
.onErrorResume(exception -> {
216+
log.error("Error getting value from cache", exception);
217+
return Mono.empty(); // Treat as cache miss
218+
})
207219
.map(Object.class::cast)
208220
.switchIfEmpty(processCacheMiss(invocation, key, group));
209221
}
210222

211-
private Mono<Object> processSyncCacheMiss(MethodInvocation invocation, String key, String group) {
212-
return Mono.defer(() -> {
213-
try {
214-
return Mono.just(invocation.proceed());
215-
} catch (Throwable e) {
216-
return Mono.error(e);
217-
}
218-
}).flatMap(value -> cachePut(value, key, group));
223+
private Object processSyncCacheMiss(MethodInvocation invocation, String key, String group) throws Throwable {
224+
var value = invocation.proceed();
225+
cachePutAsync(value, key, group);
226+
return value;
219227
}
220228

221229
private Mono<Object> processCacheMiss(MethodInvocation invocation, String key, String group) {
@@ -239,11 +247,9 @@ private Flux<Object> processFluxCacheMiss(MethodInvocation invocation, String ke
239247
.flatMap(value -> cachePut(value, key, group));
240248

241249
return flux
242-
.doOnSubscribe(subscription -> Schedulers.boundedElastic().schedule(() -> {
243-
cacheable.subscribe(
244-
__ -> log.info("Flux value put in cache"),
245-
e -> log.error("Error putting flux value in cache", e));
246-
}));
250+
.doOnSubscribe(subscription -> Schedulers.boundedElastic().schedule(() -> cacheable.subscribe(
251+
__ -> log.debug("Flux value put in cache"),
252+
e -> log.error("Error putting flux value in cache", e))));
247253
} catch (Throwable e) {
248254
return Flux.error(e);
249255
}
@@ -261,10 +267,25 @@ private Mono<Object> cachePut(Object value, String key, String group) {
261267
});
262268
}
263269

270+
private void cachePutAsync(Object value, String key, String group) {
271+
// Methods returning null values are not cached
272+
if (value == null) {
273+
return;
274+
}
275+
try {
276+
var ttlDuration = cacheConfiguration.getCaches()
277+
.getOrDefault(group, cacheConfiguration.getDefaultDuration());
278+
// Set cache asynchronously to avoid blocking the execution thread. Makes cache eventual consistent
279+
cacheManager.get().putAsync(key, value, ttlDuration);
280+
} catch (RuntimeException exception) {
281+
log.error("Error putting async value in cache", exception);
282+
}
283+
}
284+
264285
private String getKeyName(String name, String key, MethodInvocation invocation) {
265286
Map<String, Object> params = new HashMap<>();
266287

267-
// Use Paranamer to resolve parameter names
288+
// Use Parameter to resolve parameter names
268289
Parameter[] parameters = invocation.getMethod().getParameters();
269290
Object[] args = invocation.getArguments();
270291

@@ -281,5 +302,4 @@ private String getKeyName(String name, String key, MethodInvocation invocation)
281302
}
282303
return "%s:-%s".formatted(name, evaluatedKey);
283304
}
284-
285305
}

apps/opik-backend/src/main/java/com/comet/opik/infrastructure/cache/CacheManager.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,35 @@
55
import reactor.core.publisher.Mono;
66

77
import java.time.Duration;
8+
import java.util.concurrent.CompletionStage;
89

10+
/**
11+
* Cache management interface providing both reactive and non-reactive APIs.
12+
*
13+
* <p><b>API Types:</b></p>
14+
* <ul>
15+
* <li><b>Reactive API</b>: Methods without {@code Sync} or {@code Async} suffix (e.g., {@code get()}, {@code put()})
16+
* are designed for use with reactive methods that return {@code Mono} or {@code Flux}.</li>
17+
* <li><b>Non-Reactive API</b>: Methods with {@code Sync} or {@code Async} suffix are designed for non-reactive methods:
18+
* <ul>
19+
* <li><b>Synchronous</b>: Methods with {@code Sync} suffix (e.g., {@code getSync()}) block until the result is available.</li>
20+
* <li><b>Asynchronous</b>: Methods with {@code Async} suffix (e.g., {@code putAsync()}) return {@link CompletionStage} for non-blocking operations without reactiveness.</li>
21+
* </ul>
22+
* </li>
23+
* </ul>
24+
*/
925
public interface CacheManager {
1026

27+
// Reactive API (for reactive methods returning Mono/Flux)
1128
Mono<Boolean> evict(@NonNull String key, boolean usePatternMatching);
1229
Mono<Boolean> put(@NonNull String key, @NonNull Object value, @NonNull Duration ttlDuration);
1330
<T> Mono<T> get(@NonNull String key, @NonNull Class<T> clazz);
1431
<T> Mono<T> get(@NonNull String key, @NonNull TypeReference<T> clazz);
1532
Mono<Boolean> contains(@NonNull String key);
1633

34+
// Non-reactive API (both synchronous and asynchronous methods without reactiveness)
35+
CompletionStage<Boolean> evictAsync(String key, boolean usePatternMatching);
36+
CompletionStage<Void> putAsync(String key, Object value, Duration ttlDuration);
37+
<T> T getSync(String key, Class<T> clazz);
38+
<T> T getSync(String key, TypeReference<T> clazz);
1739
}

apps/opik-backend/src/main/java/com/comet/opik/infrastructure/redis/RedisCacheManager.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,21 @@
66
import lombok.NonNull;
77
import lombok.RequiredArgsConstructor;
88
import org.apache.commons.lang3.StringUtils;
9+
import org.redisson.api.RedissonClient;
910
import org.redisson.api.RedissonReactiveClient;
1011
import reactor.core.publisher.Mono;
1112
import reactor.core.scheduler.Schedulers;
1213

1314
import java.time.Duration;
15+
import java.util.concurrent.CompletionStage;
1416

1517
@RequiredArgsConstructor
1618
class RedisCacheManager implements CacheManager {
1719

1820
private final @NonNull RedissonReactiveClient redisClient;
21+
private final @NonNull RedissonClient nonReactiveRedisClient;
22+
23+
// Reactive API (for reactive methods returning Mono/Flux)
1924

2025
public Mono<Boolean> evict(@NonNull String key, boolean usePatternMatching) {
2126
if (usePatternMatching) {
@@ -51,4 +56,39 @@ public <T> Mono<T> get(@NonNull String key, @NonNull TypeReference<T> clazz) {
5156
public Mono<Boolean> contains(@NonNull String key) {
5257
return redisClient.getBucket(key).isExists();
5358
}
59+
60+
// Non-reactive API (both synchronous and asynchronous methods without reactiveness)
61+
62+
@Override
63+
public CompletionStage<Boolean> evictAsync(@NonNull String key, boolean usePatternMatching) {
64+
if (usePatternMatching) {
65+
return nonReactiveRedisClient.getKeys().deleteByPatternAsync(key)
66+
.thenApplyAsync(count -> count > 0);
67+
}
68+
return nonReactiveRedisClient.getBucket(key).deleteAsync();
69+
}
70+
71+
@Override
72+
public CompletionStage<Void> putAsync(@NonNull String key, @NonNull Object value, @NonNull Duration ttlDuration) {
73+
var json = JsonUtils.writeValueAsString(value);
74+
return nonReactiveRedisClient.getBucket(key).setAsync(json, ttlDuration);
75+
}
76+
77+
@Override
78+
public <T> T getSync(@NonNull String key, @NonNull Class<T> clazz) {
79+
var json = nonReactiveRedisClient.<String>getBucket(key).get();
80+
if (StringUtils.isEmpty(json)) {
81+
return null;
82+
}
83+
return JsonUtils.readValue(json, clazz);
84+
}
85+
86+
@Override
87+
public <T> T getSync(@NonNull String key, @NonNull TypeReference<T> typeReference) {
88+
var json = nonReactiveRedisClient.<String>getBucket(key).get();
89+
if (StringUtils.isEmpty(json)) {
90+
return null;
91+
}
92+
return JsonUtils.readValue(json, typeReference);
93+
}
5494
}

0 commit comments

Comments
 (0)