@@ -22,6 +22,9 @@ public abstract class AbstractArthasTool {
2222 protected final Logger logger = LoggerFactory .getLogger (this .getClass ());
2323
2424 public static final int DEFAULT_TIMEOUT_SECONDS = (int ) (StreamableToolUtils .DEFAULT_TIMEOUT_MS / 1000 );
25+
26+ private static final long DEFAULT_ASYNC_START_RETRY_INTERVAL_MS = 100L ;
27+ private static final long DEFAULT_ASYNC_START_MAX_WAIT_MS = 3000L ;
2528
2629 /**
2730 * 工具执行上下文,包含所有必要的上下文信息
@@ -112,8 +115,9 @@ protected String executeStreamable(ToolContext toolContext, String commandStr,
112115 Integer expectedResultCount , Integer pollIntervalMs ,
113116 Integer timeoutMs ,
114117 String successMessage ) {
118+ ToolExecutionContext execContext = null ;
115119 try {
116- ToolExecutionContext execContext = new ToolExecutionContext (toolContext , true );
120+ execContext = new ToolExecutionContext (toolContext , true );
117121
118122 logger .info ("Starting streamable execution: {}" , commandStr );
119123
@@ -122,7 +126,11 @@ protected String executeStreamable(ToolContext toolContext, String commandStr,
122126 execContext .getCommandContext ().setSessionUserId (execContext .getUserId ());
123127 }
124128
125- Map <String , Object > asyncResult = execContext .getCommandContext ().executeAsync (commandStr );
129+ Map <String , Object > asyncResult = executeAsyncWithRetry (execContext , commandStr , timeoutMs );
130+ if (!isAsyncExecutionStarted (asyncResult )) {
131+ String errorMessage = asyncResult != null ? String .valueOf (asyncResult .get ("error" )) : "unknown error" ;
132+ return JsonParser .toJson (createErrorResponse ("Failed to start command: " + errorMessage ));
133+ }
126134 logger .debug ("Async execution started: {}" , asyncResult );
127135
128136 Map <String , Object > results = executeAndCollectResults (
@@ -154,7 +162,74 @@ protected String executeStreamable(ToolContext toolContext, String commandStr,
154162 } catch (Exception e ) {
155163 logger .error ("Error executing streamable command: {}" , commandStr , e );
156164 return JsonParser .toJson (createErrorResponse ("Error executing command: " + e .getMessage ()));
165+ } finally {
166+ if (execContext != null ) {
167+ try {
168+ // 确保前台任务被及时释放,避免占用 session 影响后续 streamable 工具执行
169+ execContext .getCommandContext ().interruptJob ();
170+ } catch (Exception ignored ) {
171+ }
172+ }
173+ }
174+ }
175+
176+ private static boolean isAsyncExecutionStarted (Map <String , Object > asyncResult ) {
177+ if (asyncResult == null ) {
178+ return false ;
157179 }
180+ Object success = asyncResult .get ("success" );
181+ return Boolean .TRUE .equals (success );
182+ }
183+
184+ private static boolean isRetryableAsyncStartError (Map <String , Object > asyncResult ) {
185+ if (asyncResult == null ) {
186+ return false ;
187+ }
188+ Object success = asyncResult .get ("success" );
189+ if (Boolean .TRUE .equals (success )) {
190+ return false ;
191+ }
192+ Object error = asyncResult .get ("error" );
193+ if (error == null ) {
194+ return false ;
195+ }
196+ String message = String .valueOf (error );
197+ return message .contains ("Another job is running" ) || message .contains ("Another command is executing" );
198+ }
199+
200+ private static Map <String , Object > executeAsyncWithRetry (ToolExecutionContext execContext , String commandStr , Integer timeoutMs ) {
201+ long maxWaitMs = DEFAULT_ASYNC_START_MAX_WAIT_MS ;
202+ if (timeoutMs != null && timeoutMs > 0 ) {
203+ maxWaitMs = Math .min (maxWaitMs , timeoutMs );
204+ }
205+
206+ long deadline = System .currentTimeMillis () + maxWaitMs ;
207+ Map <String , Object > asyncResult = null ;
208+
209+ while (System .currentTimeMillis () < deadline ) {
210+ asyncResult = execContext .getCommandContext ().executeAsync (commandStr );
211+ if (isAsyncExecutionStarted (asyncResult )) {
212+ return asyncResult ;
213+ }
214+
215+ if (isRetryableAsyncStartError (asyncResult )) {
216+ try {
217+ execContext .getCommandContext ().interruptJob ();
218+ } catch (Exception ignored ) {
219+ }
220+ try {
221+ Thread .sleep (DEFAULT_ASYNC_START_RETRY_INTERVAL_MS );
222+ } catch (InterruptedException e ) {
223+ Thread .currentThread ().interrupt ();
224+ return asyncResult ;
225+ }
226+ continue ;
227+ }
228+
229+ return asyncResult ;
230+ }
231+
232+ return asyncResult ;
158233 }
159234
160235 protected StringBuilder buildCommand (String baseCommand ) {
0 commit comments