Skip to content

Commit d5bdc20

Browse files
Copilotwuwen5
andauthored
Fix Reactor adapter async-context race in SentinelReactorSubscriber.currentContext (#1)
* Initial plan * fix(reactor): avoid async context race in currentContext and add regression test Agent-Logs-Url: https://github.com/wuwen5/Sentinel/sessions/1d7e9bbc-a643-4695-a906-b0575add87f0 Co-authored-by: wuwen5 <5037807+wuwen5@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: wuwen5 <5037807+wuwen5@users.noreply.github.com>
1 parent cf97bba commit d5bdc20

2 files changed

Lines changed: 87 additions & 1 deletion

File tree

sentinel-adapter/sentinel-reactor-adapter/src/main/java/com/alibaba/csp/sentinel/adapter/reactor/SentinelReactorSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public Context currentContext() {
6767
return actual.currentContext();
6868
}
6969
return actual.currentContext()
70-
.put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, currentEntry.getAsyncContext());
70+
.put(SentinelReactorConstants.SENTINEL_CONTEXT_KEY, sentinelContext);
7171
}
7272

7373
private void doWithContextOrCurrent(Supplier<Optional<com.alibaba.csp.sentinel.context.Context>> contextSupplier,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package com.alibaba.csp.sentinel.adapter.reactor;
2+
3+
import java.lang.reflect.Field;
4+
import java.util.concurrent.atomic.AtomicBoolean;
5+
import java.util.concurrent.atomic.AtomicReference;
6+
7+
import com.alibaba.csp.sentinel.AsyncEntry;
8+
9+
import org.junit.Test;
10+
import org.reactivestreams.Subscription;
11+
import reactor.core.CoreSubscriber;
12+
import reactor.util.context.Context;
13+
14+
import static org.junit.Assert.*;
15+
16+
public class SentinelReactorSubscriberTest {
17+
18+
@Test
19+
public void testCurrentContextShouldUseCapturedAsyncContextWhenExitHappensDuringContextRead() throws Exception {
20+
AtomicBoolean allowCancelInContextRead = new AtomicBoolean(false);
21+
AtomicBoolean canceledInContextRead = new AtomicBoolean(false);
22+
AtomicReference<SentinelReactorSubscriber<Integer>> subscriberRef = new AtomicReference<>();
23+
24+
CoreSubscriber<Integer> actual = new CoreSubscriber<Integer>() {
25+
@Override
26+
public Context currentContext() {
27+
SentinelReactorSubscriber<Integer> subscriber = subscriberRef.get();
28+
if (allowCancelInContextRead.get() && subscriber != null
29+
&& canceledInContextRead.compareAndSet(false, true)) {
30+
subscriber.cancel();
31+
}
32+
return Context.empty();
33+
}
34+
35+
@Override
36+
public void onSubscribe(Subscription s) {
37+
}
38+
39+
@Override
40+
public void onNext(Integer integer) {
41+
}
42+
43+
@Override
44+
public void onError(Throwable t) {
45+
fail("Unexpected error: " + t);
46+
}
47+
48+
@Override
49+
public void onComplete() {
50+
}
51+
};
52+
53+
SentinelReactorSubscriber<Integer> subscriber =
54+
new SentinelReactorSubscriber<>(new EntryConfig("testCurrentContextRace"), actual, false);
55+
subscriberRef.set(subscriber);
56+
subscriber.onSubscribe(new NoopSubscription());
57+
58+
AsyncEntry currentEntry = getCurrentEntry(subscriber);
59+
assertNotNull(currentEntry);
60+
com.alibaba.csp.sentinel.context.Context asyncContextBeforeExit = currentEntry.getAsyncContext();
61+
assertNotNull(asyncContextBeforeExit);
62+
63+
allowCancelInContextRead.set(true);
64+
Context reactorContext = subscriber.currentContext();
65+
66+
assertTrue(canceledInContextRead.get());
67+
assertSame(asyncContextBeforeExit, reactorContext.get(SentinelReactorConstants.SENTINEL_CONTEXT_KEY));
68+
assertNull(currentEntry.getAsyncContext());
69+
}
70+
71+
private static AsyncEntry getCurrentEntry(SentinelReactorSubscriber<?> subscriber) throws Exception {
72+
Field field = SentinelReactorSubscriber.class.getDeclaredField("currentEntry");
73+
field.setAccessible(true);
74+
return (AsyncEntry)field.get(subscriber);
75+
}
76+
77+
private static final class NoopSubscription implements Subscription {
78+
@Override
79+
public void request(long n) {
80+
}
81+
82+
@Override
83+
public void cancel() {
84+
}
85+
}
86+
}

0 commit comments

Comments
 (0)