Skip to content

Commit 1522341

Browse files
authored
feat: Add MCP (Model Context Protocol) base protocol implementation with Netty (#3039)
1 parent 0aeffc9 commit 1522341

File tree

60 files changed

+7149
-4
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+7149
-4
lines changed

core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,12 @@
263263
<scope>provided</scope>
264264
<optional>true</optional>
265265
</dependency>
266+
267+
<dependency>
268+
<groupId>com.taobao.arthas</groupId>
269+
<artifactId>arthas-mcp-server</artifactId>
270+
<version>${project.version}</version>
271+
</dependency>
266272
</dependencies>
267273

268274
</project>
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
package com.taobao.arthas.core.command;
2+
3+
import com.alibaba.fastjson2.JSON;
4+
import com.alibaba.fastjson2.JSONWriter;
5+
import com.taobao.arthas.core.command.model.ResultModel;
6+
import com.taobao.arthas.core.distribution.impl.PackingResultDistributorImpl;
7+
import com.taobao.arthas.core.shell.cli.CliToken;
8+
import com.taobao.arthas.core.shell.cli.CliTokens;
9+
import com.taobao.arthas.core.shell.cli.Completion;
10+
import com.taobao.arthas.core.shell.handlers.Handler;
11+
import com.taobao.arthas.core.shell.session.Session;
12+
import com.taobao.arthas.core.shell.session.SessionManager;
13+
import com.taobao.arthas.core.shell.system.Job;
14+
import com.taobao.arthas.core.shell.system.JobController;
15+
import com.taobao.arthas.core.shell.system.JobListener;
16+
import com.taobao.arthas.core.shell.system.impl.InternalCommandManager;
17+
import com.taobao.arthas.core.shell.term.SignalHandler;
18+
import com.taobao.arthas.core.shell.term.Term;
19+
import com.taobao.arthas.mcp.server.CommandExecutor;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
23+
import java.io.ByteArrayOutputStream;
24+
import java.io.UnsupportedEncodingException;
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.TreeMap;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
32+
/**
33+
* 命令执行器,用于执行Arthas命令
34+
*/
35+
public class CommandExecutorImpl implements CommandExecutor {
36+
private static final Logger logger = LoggerFactory.getLogger(CommandExecutorImpl.class);
37+
38+
private final SessionManager sessionManager;
39+
private final JobController jobController;
40+
private final InternalCommandManager commandManager;
41+
42+
public CommandExecutorImpl(SessionManager sessionManager) {
43+
this.sessionManager = sessionManager;
44+
this.commandManager = sessionManager.getCommandManager();
45+
this.jobController = sessionManager.getJobController();
46+
}
47+
48+
/**
49+
* 同步执行命令
50+
* @param commandLine 命令行
51+
* @param timeout 超时时间(毫秒)
52+
* @return 命令执行结果
53+
*/
54+
@Override
55+
public Map<String, Object> execute(String commandLine, long timeout) {
56+
if (commandLine == null || commandLine.trim().isEmpty()) {
57+
logger.error("Command line is null or empty");
58+
return createErrorResult(commandLine, "Command line is null or empty");
59+
}
60+
61+
List<CliToken> tokens = CliTokens.tokenize(commandLine);
62+
if (tokens.isEmpty()) {
63+
logger.error("No command found in command line: {}", commandLine);
64+
return createErrorResult(commandLine, "No command found in command line");
65+
}
66+
67+
Map<String, Object> result = new TreeMap<>();
68+
Session session = null;
69+
PackingResultDistributorImpl resultDistributor = null;
70+
Job job = null;
71+
72+
try {
73+
session = sessionManager.createSession();
74+
if (session == null) {
75+
logger.error("Failed to create session for command: {}", commandLine);
76+
return createErrorResult(commandLine, "Failed to create session");
77+
}
78+
resultDistributor = new PackingResultDistributorImpl(session);
79+
InMemoryTerm term = new InMemoryTerm();
80+
term.setSession(session);
81+
82+
job = jobController.createJob(commandManager, tokens, session,
83+
new JobHandle(), term, resultDistributor);
84+
85+
if (job == null) {
86+
logger.error("Failed to create job for command: {}", commandLine);
87+
return createErrorResult(commandLine, "Failed to create job");
88+
}
89+
90+
job.run();
91+
92+
boolean finished = waitForJob(job, (int) timeout);
93+
if (!finished) {
94+
logger.warn("Command timeout after {} ms: {}", timeout, commandLine);
95+
job.interrupt();
96+
return createTimeoutResult(commandLine, timeout);
97+
}
98+
99+
result.put("command", commandLine);
100+
result.put("success", true);
101+
result.put("executionTime", System.currentTimeMillis());
102+
List<ResultModel> results = resultDistributor.getResults();
103+
104+
if (results != null && !results.isEmpty()) {
105+
result.put("results", results);
106+
result.put("resultCount", results.size());
107+
} else {
108+
result.put("results", results);
109+
result.put("resultCount", 0);
110+
}
111+
String termOutput = term.getOutput();
112+
if (termOutput != null && !termOutput.trim().isEmpty()) {
113+
result.put("terminalOutput", termOutput);
114+
}
115+
116+
logger.info("Command executed successfully: {}", commandLine);
117+
return result;
118+
119+
} catch (Exception e) {
120+
logger.error("Error executing command: {}", commandLine, e);
121+
return createErrorResult(commandLine, "Error executing command: " + e.getMessage());
122+
} finally {
123+
if (session != null) {
124+
try {
125+
sessionManager.removeSession(session.getSessionId());
126+
} catch (Exception e) {
127+
logger.warn("Error removing session", e);
128+
}
129+
}
130+
}
131+
}
132+
133+
private boolean waitForJob(Job job, int timeout) {
134+
long startTime = System.currentTimeMillis();
135+
while (true) {
136+
switch (job.status()) {
137+
case STOPPED:
138+
case TERMINATED:
139+
return true;
140+
}
141+
if (System.currentTimeMillis() - startTime > timeout) {
142+
return false;
143+
}
144+
try {
145+
Thread.sleep(100);
146+
} catch (InterruptedException e) {
147+
}
148+
}
149+
}
150+
151+
private Map<String, Object> createErrorResult(String commandLine, String errorMessage) {
152+
Map<String, Object> result = new TreeMap<>();
153+
result.put("command", commandLine);
154+
result.put("success", false);
155+
result.put("error", errorMessage);
156+
result.put("executionTime", System.currentTimeMillis());
157+
return result;
158+
}
159+
160+
/**
161+
* 创建超时结果
162+
*/
163+
private Map<String, Object> createTimeoutResult(String commandLine, long timeout) {
164+
Map<String, Object> result = new TreeMap<>();
165+
result.put("command", commandLine);
166+
result.put("success", false);
167+
result.put("error", "Command timeout after " + timeout + " ms");
168+
result.put("timeout", true);
169+
result.put("executionTime", System.currentTimeMillis());
170+
return result;
171+
}
172+
173+
private static class JobHandle implements JobListener {
174+
private final CountDownLatch latch = new CountDownLatch(1);
175+
176+
@Override
177+
public void onForeground(Job job) {
178+
}
179+
180+
@Override
181+
public void onBackground(Job job) {
182+
}
183+
184+
@Override
185+
public void onTerminated(Job job) {
186+
latch.countDown();
187+
}
188+
189+
@Override
190+
public void onSuspend(Job job) {
191+
}
192+
193+
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
194+
return latch.await(timeout, unit);
195+
}
196+
}
197+
198+
public static class InMemoryTerm implements Term {
199+
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
200+
private Session session;
201+
private volatile boolean closed = false;
202+
203+
@Override
204+
public Term setSession(Session session) {
205+
this.session = session;
206+
return this;
207+
}
208+
209+
public Session getSession() {
210+
return session;
211+
}
212+
213+
@Override
214+
public void readline(String prompt, Handler<String> lineHandler) {
215+
}
216+
217+
@Override
218+
public void readline(String prompt, Handler<String> lineHandler, Handler<Completion> completionHandler) {
219+
}
220+
221+
@Override
222+
public Term closeHandler(Handler<Void> handler) {
223+
return this;
224+
}
225+
226+
@Override
227+
public long lastAccessedTime() {
228+
return System.currentTimeMillis();
229+
}
230+
231+
@Override
232+
public String type() {
233+
return "inmemory";
234+
}
235+
236+
@Override
237+
public int width() {
238+
return 120;
239+
}
240+
241+
@Override
242+
public int height() {
243+
return 40;
244+
}
245+
246+
@Override
247+
public Term resizehandler(Handler<Void> handler) {
248+
return this;
249+
}
250+
251+
@Override
252+
public Term stdinHandler(Handler<String> handler) {
253+
return this;
254+
}
255+
256+
@Override
257+
public Term stdoutHandler(io.termd.core.function.Function<String, String> handler) {
258+
return this;
259+
}
260+
261+
@Override
262+
public synchronized Term write(String data) {
263+
if (closed) {
264+
return this;
265+
}
266+
267+
try {
268+
if (data != null) {
269+
outputStream.write(data.getBytes(StandardCharsets.UTF_8));
270+
}
271+
} catch (Exception e) {
272+
logger.error("Error writing to terminal", e);
273+
}
274+
return this;
275+
}
276+
277+
@Override
278+
public Term interruptHandler(SignalHandler handler) {
279+
return this;
280+
}
281+
282+
@Override
283+
public Term suspendHandler(SignalHandler handler) {
284+
return this;
285+
}
286+
287+
@Override
288+
public synchronized void close() {
289+
closed = true;
290+
try {
291+
outputStream.close();
292+
} catch (Exception e) {
293+
logger.error("Error closing output stream", e);
294+
}
295+
}
296+
297+
@Override
298+
public Term echo(String text) {
299+
return write(text);
300+
}
301+
302+
public synchronized String getOutput() {
303+
try {
304+
return outputStream.toString(StandardCharsets.UTF_8.name());
305+
} catch (UnsupportedEncodingException e) {
306+
logger.error("Error getting output", e);
307+
return "";
308+
}
309+
}
310+
311+
public synchronized void clearOutput() {
312+
outputStream.reset();
313+
}
314+
315+
public boolean isClosed() {
316+
return closed;
317+
}
318+
}
319+
}

core/src/main/java/com/taobao/arthas/core/server/ArthasBootstrap.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.taobao.arthas.core.advisor.Enhancer;
4444
import com.taobao.arthas.core.advisor.TransformerManager;
4545
import com.taobao.arthas.core.command.BuiltinCommandPack;
46+
import com.taobao.arthas.core.command.CommandExecutorImpl;
4647
import com.taobao.arthas.core.command.view.ResultViewResolver;
4748
import com.taobao.arthas.core.config.BinderUtils;
4849
import com.taobao.arthas.core.config.Configure;
@@ -77,6 +78,10 @@
7778
import com.taobao.arthas.core.util.affect.EnhancerAffect;
7879
import com.taobao.arthas.core.util.matcher.WildcardMatcher;
7980

81+
import com.taobao.arthas.mcp.server.ArthasMcpBootstrap;
82+
import com.taobao.arthas.mcp.server.CommandExecutor;
83+
import com.taobao.arthas.mcp.server.protocol.server.handler.McpRequestHandler;
84+
import com.taobao.arthas.mcp.server.protocol.spec.McpServerTransportProvider;
8085
import io.netty.channel.ChannelFuture;
8186
import io.netty.channel.nio.NioEventLoopGroup;
8287
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -125,6 +130,8 @@ public class ArthasBootstrap {
125130

126131
private HttpApiHandler httpApiHandler;
127132

133+
private McpRequestHandler mcpRequestHandler;
134+
128135
private HttpSessionManager httpSessionManager;
129136
private SecurityAuthenticator securityAuthenticator;
130137

@@ -462,6 +469,11 @@ private void bind(Configure configure) throws Throwable {
462469
//http api handler
463470
httpApiHandler = new HttpApiHandler(historyManager, sessionManager);
464471

472+
// Mcp Server
473+
CommandExecutor commandExecutor = new CommandExecutorImpl(sessionManager);
474+
ArthasMcpBootstrap arthasMcpBootstrap = new ArthasMcpBootstrap(commandExecutor);
475+
this.mcpRequestHandler = arthasMcpBootstrap.start().getMcpRequestHandler();
476+
465477
logger().info("as-server listening on network={};telnet={};http={};timeout={};", configure.getIp(),
466478
configure.getTelnetPort(), configure.getHttpPort(), options.getConnectionTimeout());
467479

@@ -671,6 +683,10 @@ public HttpApiHandler getHttpApiHandler() {
671683
return httpApiHandler;
672684
}
673685

686+
public McpRequestHandler getMcpRequestHandler() {
687+
return mcpRequestHandler;
688+
}
689+
674690
public File getOutputPath() {
675691
return outputPath;
676692
}

0 commit comments

Comments
 (0)