Skip to content

Commit c0962e3

Browse files
wl2027hengyunabc
andauthored
fix: concurrent output interleaved (#3101) (#3133)
* fix: concurrent output interleaved (#3101) * fix: restore watch output format --------- Co-authored-by: hengyunabc <hengyunabc@gmail.com>
1 parent 175fdb0 commit c0962e3

File tree

3 files changed

+282
-3
lines changed

3 files changed

+282
-3
lines changed

core/src/main/java/com/taobao/arthas/core/command/view/WatchView.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,13 @@ public void draw(CommandProcess process, WatchModel model) {
1919
ObjectVO objectVO = model.getValue();
2020
String result = StringUtils.objectToString(
2121
objectVO.needExpand() ? new ObjectView(model.getSizeLimit(), objectVO).draw() : objectVO.getObject());
22-
process.write("method=" + model.getClassName() + "." + model.getMethodName() + " location=" + model.getAccessPoint() + "\n");
23-
process.write("ts=" + DateUtils.formatDateTime(model.getTs()) + "; [cost=" + model.getCost() + "ms] result=" + result + "\n");
22+
23+
StringBuilder sb = new StringBuilder();
24+
sb.append("method=").append(model.getClassName()).append(".").append(model.getMethodName())
25+
.append(" location=").append(model.getAccessPoint()).append("\n");
26+
sb.append("ts=").append(DateUtils.formatDateTime(model.getTs()))
27+
.append("; [cost=").append(model.getCost()).append("ms] result=").append(result).append("\n");
28+
29+
process.write(sb.toString());
2430
}
2531
}

core/src/main/java/com/taobao/arthas/core/distribution/impl/TermResultDistributorImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ public class TermResultDistributorImpl implements ResultDistributor {
1616
private final CommandProcess commandProcess;
1717
private final ResultViewResolver resultViewResolver;
1818

19+
private final Object outputLock = new Object();
20+
1921
public TermResultDistributorImpl(CommandProcess commandProcess, ResultViewResolver resultViewResolver) {
2022
this.commandProcess = commandProcess;
2123
this.resultViewResolver = resultViewResolver;
@@ -25,7 +27,9 @@ public TermResultDistributorImpl(CommandProcess commandProcess, ResultViewResolv
2527
public void appendResult(ResultModel model) {
2628
ResultView resultView = resultViewResolver.getResultView(model);
2729
if (resultView != null) {
28-
resultView.draw(commandProcess, model);
30+
synchronized (outputLock) {
31+
resultView.draw(commandProcess, model);
32+
}
2933
}
3034
}
3135

Lines changed: 269 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
package com.taobao.arthas.core.distribution;
2+
3+
import com.taobao.arthas.core.advisor.AdviceListener;
4+
import com.taobao.arthas.core.command.model.ObjectVO;
5+
import com.taobao.arthas.core.command.model.ResultModel;
6+
import com.taobao.arthas.core.command.model.WatchModel;
7+
import com.taobao.arthas.core.command.view.ResultView;
8+
import com.taobao.arthas.core.command.view.ResultViewResolver;
9+
import com.taobao.arthas.core.command.view.WatchView;
10+
import com.taobao.arthas.core.distribution.impl.TermResultDistributorImpl;
11+
import com.taobao.arthas.core.shell.cli.CliToken;
12+
import com.taobao.arthas.core.shell.command.CommandProcess;
13+
import com.taobao.arthas.core.shell.handlers.Handler;
14+
import com.taobao.arthas.core.shell.session.Session;
15+
import com.taobao.middleware.cli.CommandLine;
16+
import org.junit.Assert;
17+
import org.junit.Test;
18+
19+
import java.lang.instrument.ClassFileTransformer;
20+
import java.time.LocalDateTime;
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.regex.Matcher;
28+
import java.util.regex.Pattern;
29+
30+
public class TermResultDistributorImplTest {
31+
32+
@Test
33+
public void testConcurrentOutputNotInterleaved() throws InterruptedException {
34+
final StringBuilder outputCollector = new StringBuilder();
35+
36+
CommandProcess mockProcess = new MockCommandProcess(outputCollector);
37+
38+
ResultViewResolver resolver = new ResultViewResolver() {
39+
@Override
40+
public ResultView getResultView(ResultModel model) {
41+
if (model instanceof WatchModel) {
42+
return new WatchView();
43+
}
44+
return null;
45+
}
46+
};
47+
48+
TermResultDistributorImpl distributor = new TermResultDistributorImpl(mockProcess, resolver);
49+
50+
int threadCount = 50;
51+
int outputPerThread = 100;
52+
53+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
54+
CountDownLatch startLatch = new CountDownLatch(1);
55+
CountDownLatch endLatch = new CountDownLatch(threadCount);
56+
57+
for (int t = 0; t < threadCount; t++) {
58+
final int threadId = t;
59+
executor.submit(() -> {
60+
try {
61+
startLatch.await();
62+
63+
for (int i = 0; i < outputPerThread; i++) {
64+
WatchModel model = new WatchModel();
65+
model.setTs(LocalDateTime.now());
66+
model.setCost(1.5);
67+
model.setClassName("TestClass");
68+
model.setMethodName("testMethod");
69+
model.setAccessPoint("AtExit");
70+
71+
List<String> params = new ArrayList<>();
72+
params.add("Thread-" + threadId + "-Item-" + i + "-A");
73+
params.add("Thread-" + threadId + "-Item-" + i + "-B");
74+
params.add("Thread-" + threadId + "-Item-" + i + "-C");
75+
76+
model.setValue(new ObjectVO(params, 2));
77+
model.setSizeLimit(10 * 1024 * 1024);
78+
79+
distributor.appendResult(model);
80+
}
81+
} catch (InterruptedException e) {
82+
Thread.currentThread().interrupt();
83+
} finally {
84+
endLatch.countDown();
85+
}
86+
});
87+
}
88+
89+
startLatch.countDown();
90+
91+
endLatch.await();
92+
executor.shutdown();
93+
94+
String output = outputCollector.toString();
95+
96+
Pattern blockPattern = Pattern.compile(
97+
"method=TestClass\\.testMethod location=AtExit\\n" +
98+
"ts=[^;]+; \\[cost=[\\d.]+ms\\] result=@ArrayList\\[\\n" +
99+
" @String\\[Thread-\\d+-Item-\\d+-A\\],\\n" +
100+
" @String\\[Thread-\\d+-Item-\\d+-B\\],\\n" +
101+
" @String\\[Thread-\\d+-Item-\\d+-C\\],\\n" +
102+
"\\]\\n"
103+
);
104+
105+
Matcher matcher = blockPattern.matcher(output);
106+
int completeBlockCount = 0;
107+
while (matcher.find()) {
108+
completeBlockCount++;
109+
}
110+
111+
int expectedBlockCount = threadCount * outputPerThread;
112+
113+
Assert.assertEquals("All output blocks should be complete and not interleaved",
114+
expectedBlockCount, completeBlockCount);
115+
116+
Pattern interleavedPattern = Pattern.compile("Thread-\\d+-Item-\\d+-[ABC]\\],\\nmethod=");
117+
Matcher interleavedMatcher = interleavedPattern.matcher(output);
118+
Assert.assertFalse("Output should not be interleaved between different results",
119+
interleavedMatcher.find());
120+
}
121+
122+
private static class MockCommandProcess implements CommandProcess {
123+
private final StringBuilder outputCollector;
124+
private final AtomicInteger times = new AtomicInteger();
125+
126+
public MockCommandProcess(StringBuilder outputCollector) {
127+
this.outputCollector = outputCollector;
128+
}
129+
130+
@Override
131+
public CommandProcess write(String data) {
132+
synchronized (outputCollector) {
133+
outputCollector.append(data);
134+
}
135+
return this;
136+
}
137+
138+
@Override
139+
public List<CliToken> argsTokens() {
140+
return null;
141+
}
142+
143+
@Override
144+
public List<String> args() {
145+
return null;
146+
}
147+
148+
@Override
149+
public CommandLine commandLine() {
150+
return null;
151+
}
152+
153+
@Override
154+
public Session session() {
155+
return null;
156+
}
157+
158+
@Override
159+
public boolean isForeground() {
160+
return true;
161+
}
162+
163+
@Override
164+
public CommandProcess stdinHandler(Handler<String> handler) {
165+
return this;
166+
}
167+
168+
@Override
169+
public CommandProcess interruptHandler(Handler<Void> handler) {
170+
return this;
171+
}
172+
173+
@Override
174+
public CommandProcess suspendHandler(Handler<Void> handler) {
175+
return this;
176+
}
177+
178+
@Override
179+
public CommandProcess resumeHandler(Handler<Void> handler) {
180+
return this;
181+
}
182+
183+
@Override
184+
public CommandProcess endHandler(Handler<Void> handler) {
185+
return this;
186+
}
187+
188+
@Override
189+
public CommandProcess backgroundHandler(Handler<Void> handler) {
190+
return this;
191+
}
192+
193+
@Override
194+
public CommandProcess foregroundHandler(Handler<Void> handler) {
195+
return this;
196+
}
197+
198+
@Override
199+
public CommandProcess resizehandler(Handler<Void> handler) {
200+
return this;
201+
}
202+
203+
@Override
204+
public void end() {
205+
}
206+
207+
@Override
208+
public void end(int status) {
209+
}
210+
211+
@Override
212+
public void end(int status, String message) {
213+
}
214+
215+
@Override
216+
public void register(AdviceListener listener, ClassFileTransformer transformer) {
217+
}
218+
219+
@Override
220+
public void unregister() {
221+
}
222+
223+
@Override
224+
public AtomicInteger times() {
225+
return times;
226+
}
227+
228+
@Override
229+
public void resume() {
230+
}
231+
232+
@Override
233+
public void suspend() {
234+
}
235+
236+
@Override
237+
public void echoTips(String tips) {
238+
}
239+
240+
@Override
241+
public String cacheLocation() {
242+
return null;
243+
}
244+
245+
@Override
246+
public boolean isRunning() {
247+
return true;
248+
}
249+
250+
@Override
251+
public void appendResult(ResultModel result) {
252+
}
253+
254+
@Override
255+
public String type() {
256+
return "test";
257+
}
258+
259+
@Override
260+
public int width() {
261+
return 80;
262+
}
263+
264+
@Override
265+
public int height() {
266+
return 24;
267+
}
268+
}
269+
}

0 commit comments

Comments
 (0)