Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions .github/workflows/test-windows-bash.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Windows Bash Streaming Tests

on:
workflow_dispatch:
pull_request:
paths:
- "agent/tools/bash/bash.py"
- "tests/test_bash_streaming.py"
- ".github/workflows/test-windows-bash.yml"

jobs:
windows-bash-tests:
runs-on: windows-latest

steps:
- name: Checkout repository
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
cache: pip

- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install pytest
python -m pip install -r requirements.txt

- name: Run Windows Bash streaming tests
run: python -m pytest tests/test_bash_streaming.py -v
15 changes: 13 additions & 2 deletions agent/protocol/agent_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1174,10 +1174,21 @@ def _execute_tool(self, tool_call: Dict) -> Dict[str, Any]:
# Set tool context
tool.model = self.model
tool.context = self.agent
tool.progress_callback = lambda message: self._emit_event(
"tool_execution_progress",
{
"tool_call_id": tool_id,
"tool_name": tool_name,
"message": message,
}
)

# Execute tool
start_time = time.time()
result: ToolResult = tool.execute_tool(arguments)
try:
result: ToolResult = tool.execute_tool(arguments)
finally:
tool.progress_callback = None
execution_time = time.time() - start_time

result_dict = {
Expand Down Expand Up @@ -1719,4 +1730,4 @@ def _prepare_messages(self) -> List[Dict[str, Any]]:
not as a message. The AgentLLMModel will handle this.
"""
# Don't add system message here - it will be handled separately by the LLM adapter
return self.messages
return self.messages
10 changes: 10 additions & 0 deletions agent/tools/base_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ class BaseTool:
description: str = "Base tool"
params: dict = {} # Store JSON Schema
model: Optional[Any] = None # LLM model instance, type depends on bot implementation
progress_callback = None

def report_progress(self, message: str):
callback = getattr(self, "progress_callback", None)
if not callback:
return
try:
callback(str(message))
except Exception as e:
logger.debug(f"[{self.name}] progress callback failed: {e}")

@classmethod
def get_json_schema(cls) -> dict:
Expand Down
118 changes: 108 additions & 10 deletions agent/tools/bash/bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

import os
import re
import signal
import sys
import subprocess
import tempfile
import threading
import time
from typing import Dict, Any

from agent.tools.base_tool import BaseTool, ToolResult
Expand All @@ -19,6 +22,8 @@ class Bash(BaseTool):
"""Tool for executing bash commands"""

_IS_WIN = sys.platform == "win32"
_PROGRESS_MAX_BYTES = 4 * 1024
_PROGRESS_INTERVAL = 0.5

name: str = "bash"
description: str = f"""Execute a bash command in the current working directory. Returns stdout and stderr. Output is truncated to last {DEFAULT_MAX_LINES} lines or {DEFAULT_MAX_BYTES // 1024}KB (whichever is hit first). If truncated, full output is saved to a temp file.
Expand Down Expand Up @@ -113,17 +118,11 @@ def execute(self, args: Dict[str, Any]) -> ToolResult:
if command and not command.strip().lower().startswith("chcp"):
command = f"chcp 65001 >nul 2>&1 && {command}"

result = subprocess.run(
result = self._run_streaming(
command,
shell=True,
cwd=self.cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
errors="replace",
timeout=timeout,
env=env,
timeout,
env,
dotenv_vars,
)

logger.debug(f"[Bash] Exit code: {result.returncode}")
Expand Down Expand Up @@ -236,6 +235,105 @@ def execute(self, args: Dict[str, Any]) -> ToolResult:
except Exception as e:
return ToolResult.fail(f"Error executing command: {str(e)}")

def _run_streaming(self, command: str, timeout: int, env: dict, dotenv_vars: dict):
process = subprocess.Popen(
command,
shell=True,
cwd=self.cwd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
start_new_session=not self._IS_WIN,
)
stdout_chunks, stderr_chunks = [], []
recent = bytearray()
recent_lock = threading.Lock()

def drain(stream, chunks):
while True:
chunk = os.read(stream.fileno(), 4096)
if not chunk:
break
chunks.append(chunk)
with recent_lock:
recent.extend(chunk)
if len(recent) > self._PROGRESS_MAX_BYTES:
del recent[:-self._PROGRESS_MAX_BYTES]

readers = [
threading.Thread(target=drain, args=(process.stdout, stdout_chunks), daemon=True),
threading.Thread(target=drain, args=(process.stderr, stderr_chunks), daemon=True),
]
for reader in readers:
reader.start()

started = time.monotonic()
last_reported_at = started
last_snapshot = None
try:
while process.poll() is None:
now = time.monotonic()
elapsed = now - started
if elapsed >= timeout:
self._kill_process(process)
raise subprocess.TimeoutExpired(command, timeout)
if elapsed >= self._PROGRESS_INTERVAL and now - last_reported_at >= self._PROGRESS_INTERVAL:
with recent_lock:
snapshot = bytes(recent).decode("utf-8", errors="replace")
snapshot = self._redact_progress(snapshot, dotenv_vars)
if snapshot and snapshot != last_snapshot:
self.report_progress(snapshot)
last_snapshot = snapshot
last_reported_at = now
time.sleep(0.1)
finally:
if process.poll() is None:
self._kill_process(process)
process.wait()
join_deadline = time.monotonic() + 5
for reader in readers:
reader.join(timeout=max(0, join_deadline - time.monotonic()))

from types import SimpleNamespace
return SimpleNamespace(
returncode=process.returncode,
stdout=b"".join(stdout_chunks).decode("utf-8", errors="replace"),
stderr=b"".join(stderr_chunks).decode("utf-8", errors="replace"),
)

def _kill_process(self, process):
if self._IS_WIN:
try:
result = subprocess.run(
["taskkill", "/F", "/T", "/PID", str(process.pid)],
capture_output=True,
timeout=5,
)
if result.returncode != 0 and process.poll() is None:
process.kill()
except (OSError, subprocess.SubprocessError):
if process.poll() is None:
process.kill()
else:
try:
os.killpg(process.pid, signal.SIGKILL)
except (PermissionError, ProcessLookupError):
if process.poll() is None:
process.kill()

@staticmethod
def _redact_progress(text: str, dotenv_vars: dict) -> str:
text = re.sub(
r'(?i)\b(API_KEY|TOKEN|PASSWORD|AUTHORIZATION)\s*=\s*[^\s]+',
lambda match: f"{match.group(1)}=[REDACTED]",
text,
)
for value in dotenv_vars.values():
value = str(value or "")
if len(value) >= 6:
text = text.replace(value, "[REDACTED]")
return text

def _get_safety_warning(self, command: str) -> str:
"""
Get safety warning for absolutely catastrophic commands only.
Expand Down
18 changes: 18 additions & 0 deletions channel/web/static/css/console.css
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,18 @@
color: inherit;
}
.tool-error-text { color: #f87171; }
.tool-live-output:empty { display: none; }
.tool-streaming .tool-live-output:not(:empty)::after {
content: ' ';
display: inline-block;
width: 0.45em;
height: 1em;
margin-left: 0.15em;
vertical-align: -0.15em;
background: currentColor;
animation: tool-cursor-blink 1s steps(1) infinite;
}
@keyframes tool-cursor-blink { 50% { opacity: 0; } }

/* Log level highlighting */
.log-line { display: block; }
Expand Down Expand Up @@ -1571,3 +1583,9 @@
.delete-msg-btn:hover {
color: #ef4444 !important;
}

.edit-msg-btn:disabled,
.delete-msg-btn:disabled {
cursor: not-allowed !important;
opacity: 0.35 !important;
}
Loading