Skip to content

Commit 0ce1f60

Browse files
authored
feat: implement background shell sessions (#106)
Signed-off-by: Frost Ming <me@frostming.com>
1 parent 8e9a046 commit 0ce1f60

File tree

3 files changed

+218
-16
lines changed

3 files changed

+218
-16
lines changed

src/bub/builtin/shell_manager.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import contextlib
5+
import uuid
6+
from dataclasses import dataclass, field
7+
8+
9+
@dataclass(slots=True)
10+
class ManagedShell:
11+
shell_id: str
12+
cmd: str
13+
cwd: str | None
14+
process: asyncio.subprocess.Process
15+
output_chunks: list[str] = field(default_factory=list)
16+
read_tasks: list[asyncio.Task[None]] = field(default_factory=list)
17+
18+
@property
19+
def output(self) -> str:
20+
return "".join(self.output_chunks)
21+
22+
@property
23+
def returncode(self) -> int | None:
24+
return self.process.returncode
25+
26+
@property
27+
def status(self) -> str:
28+
return "running" if self.returncode is None else "exited"
29+
30+
31+
class ShellManager:
32+
def __init__(self) -> None:
33+
self._shells: dict[str, ManagedShell] = {}
34+
35+
async def start(self, *, cmd: str, cwd: str | None) -> ManagedShell:
36+
process = await asyncio.create_subprocess_shell(
37+
cmd,
38+
cwd=cwd,
39+
stdout=asyncio.subprocess.PIPE,
40+
stderr=asyncio.subprocess.PIPE,
41+
)
42+
shell = ManagedShell(shell_id=f"bash-{uuid.uuid4().hex[:8]}", cmd=cmd, cwd=cwd, process=process)
43+
shell.read_tasks.extend([
44+
asyncio.create_task(self._drain_stream(shell, process.stdout)),
45+
asyncio.create_task(self._drain_stream(shell, process.stderr)),
46+
])
47+
self._shells[shell.shell_id] = shell
48+
return shell
49+
50+
def get(self, shell_id: str) -> ManagedShell:
51+
try:
52+
return self._shells[shell_id]
53+
except KeyError as exc:
54+
raise KeyError(f"unknown shell id: {shell_id}") from exc
55+
56+
async def terminate(self, shell_id: str) -> ManagedShell:
57+
shell = self.get(shell_id)
58+
if shell.returncode is not None:
59+
await self._finalize_shell(shell)
60+
return shell
61+
62+
shell.process.terminate()
63+
try:
64+
async with asyncio.timeout(3):
65+
await shell.process.wait()
66+
except TimeoutError:
67+
shell.process.kill()
68+
await shell.process.wait()
69+
await self._finalize_shell(shell)
70+
return shell
71+
72+
async def wait_closed(self, shell_id: str) -> ManagedShell:
73+
shell = self.get(shell_id)
74+
if shell.returncode is None:
75+
await shell.process.wait()
76+
await self._finalize_shell(shell)
77+
return shell
78+
79+
async def _finalize_shell(self, shell: ManagedShell) -> None:
80+
for task in shell.read_tasks:
81+
with contextlib.suppress(asyncio.CancelledError):
82+
await task
83+
84+
async def _drain_stream(
85+
self,
86+
shell: ManagedShell,
87+
stream: asyncio.StreamReader | None,
88+
) -> None:
89+
if stream is None:
90+
return
91+
while chunk := await stream.read(4096):
92+
shell.output_chunks.append(chunk.decode("utf-8", errors="replace"))
93+
94+
95+
shell_manager = ShellManager()

src/bub/builtin/tools.py

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from pydantic import BaseModel, Field
99
from republic import AsyncTapeStore, TapeQuery, ToolContext
1010

11+
from bub.builtin.shell_manager import shell_manager
1112
from bub.skills import discover_skills
1213
from bub.tools import tool
1314

@@ -59,24 +60,52 @@ class SubAgentInput(BaseModel):
5960

6061
@tool(context=True)
6162
async def bash(
62-
cmd: str, cwd: str | None = None, timeout_seconds: int = DEFAULT_COMMAND_TIMEOUT_SECONDS, *, context: ToolContext
63+
cmd: str,
64+
cwd: str | None = None,
65+
timeout_seconds: int = DEFAULT_COMMAND_TIMEOUT_SECONDS,
66+
background: bool = False,
67+
*,
68+
context: ToolContext,
6369
) -> str:
64-
"""Run a shell command and return its output within a time limit. Raises if the command fails or times out."""
70+
"""Run a shell command. Use background=true to keep it running and fetch output later via bash_output."""
6571
workspace = context.state.get("_runtime_workspace")
66-
completed = await asyncio.create_subprocess_shell(
67-
cmd,
68-
cwd=cwd or workspace,
69-
stdout=asyncio.subprocess.PIPE,
70-
stderr=asyncio.subprocess.PIPE,
71-
)
72-
async with asyncio.timeout(timeout_seconds):
73-
stdout_bytes, stderr_bytes = await completed.communicate()
74-
stdout_text = (stdout_bytes or b"").decode("utf-8", errors="replace").strip()
75-
stderr_text = (stderr_bytes or b"").decode("utf-8", errors="replace").strip()
76-
if completed.returncode != 0:
77-
message = stderr_text or stdout_text or f"exit={completed.returncode}"
78-
raise RuntimeError(f"exit={completed.returncode}: {message}")
79-
return stdout_text or "(no output)"
72+
target_cwd = cwd or workspace
73+
shell = await shell_manager.start(cmd=cmd, cwd=target_cwd)
74+
if background:
75+
return f"started: {shell.shell_id}"
76+
try:
77+
async with asyncio.timeout(timeout_seconds):
78+
await shell_manager.wait_closed(shell.shell_id)
79+
except TimeoutError:
80+
await shell_manager.terminate(shell.shell_id)
81+
return f"command timed out after {timeout_seconds} seconds and was terminated"
82+
return shell.output.strip() or "(no output)"
83+
84+
85+
@tool(name="bash.output")
86+
async def bash_output(shell_id: str, offset: int = 0, limit: int | None = None) -> str:
87+
"""Read buffered output from a background shell, with optional offset/limit for incremental polling."""
88+
shell = shell_manager.get(shell_id)
89+
if shell.returncode is not None:
90+
await shell_manager.wait_closed(shell_id)
91+
output = shell.output
92+
start = max(0, min(offset, len(output)))
93+
end = len(output) if limit is None else min(len(output), start + max(0, limit))
94+
chunk = output[start:end].rstrip()
95+
exit_code = "null" if shell.returncode is None else str(shell.returncode)
96+
body = chunk or "(no output)"
97+
return f"id: {shell.shell_id}\nstatus: {shell.status}\nexit_code: {exit_code}\nnext_offset: {end}\noutput:\n{body}"
98+
99+
100+
@tool(name="bash.kill")
101+
async def kill_bash(shell_id: str) -> str:
102+
"""Terminate a background shell process."""
103+
shell = shell_manager.get(shell_id)
104+
if shell.returncode is None:
105+
shell = await shell_manager.terminate(shell_id)
106+
else:
107+
await shell_manager.wait_closed(shell_id)
108+
return f"id: {shell.shell_id}\nstatus: {shell.status}\nexit_code: {shell.returncode}"
80109

81110

82111
@tool(context=True, name="fs.read")
@@ -243,6 +272,9 @@ def show_help() -> str:
243272
" ,fs.read path=README.md\n"
244273
" ,fs.write path=tmp.txt content='hello'\n"
245274
" ,fs.edit path=tmp.txt old=hello new=world\n"
275+
" ,bash cmd='sleep 5' background=true\n"
276+
" ,bash_output shell_id=bsh-12345678\n"
277+
" ,kill_bash shell_id=bsh-12345678\n"
246278
"Any unknown command after ',' is executed as shell via bash."
247279
)
248280

tests/test_builtin_tools.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import shlex
5+
import sys
6+
7+
import pytest
8+
from republic import ToolContext
9+
10+
from bub.builtin.tools import bash, bash_output, kill_bash
11+
12+
13+
def _tool_context(tmp_path) -> ToolContext:
14+
return ToolContext(tape="test-tape", run_id="test-run", state={"_runtime_workspace": str(tmp_path)})
15+
16+
17+
def _python_shell(code: str) -> str:
18+
return f"{shlex.quote(sys.executable)} -c {shlex.quote(code)}"
19+
20+
21+
@pytest.mark.asyncio
22+
async def test_bash_returns_stdout_for_foreground_command(tmp_path) -> None:
23+
result = await bash.run(cmd=_python_shell("print('hello')"), context=_tool_context(tmp_path))
24+
25+
assert result == "hello"
26+
27+
28+
@pytest.mark.asyncio
29+
async def test_background_bash_exposes_output_via_bash_output(tmp_path) -> None:
30+
command = _python_shell(
31+
"import sys, time; print('start'); sys.stdout.flush(); time.sleep(0.2); print('done'); sys.stdout.flush()"
32+
)
33+
34+
started = await bash.run(cmd=command, background=True, context=_tool_context(tmp_path))
35+
shell_id = started.removeprefix("started: ").strip()
36+
37+
await asyncio.sleep(0.35)
38+
output = await bash_output.run(shell_id=shell_id)
39+
40+
assert output.startswith(f"id: {shell_id}\nstatus: exited\n")
41+
assert "exit_code: 0" in output
42+
assert "start" in output
43+
assert "done" in output
44+
45+
46+
@pytest.mark.asyncio
47+
async def test_kill_bash_terminates_background_process(tmp_path) -> None:
48+
started = await bash.run(
49+
cmd=_python_shell("import time; time.sleep(10)"),
50+
background=True,
51+
context=_tool_context(tmp_path),
52+
)
53+
shell_id = started.removeprefix("started: ").strip()
54+
55+
killed = await kill_bash.run(shell_id=shell_id)
56+
output = await bash_output.run(shell_id=shell_id)
57+
58+
assert killed.startswith(f"id: {shell_id}\nstatus: exited\nexit_code: ")
59+
assert "exit_code: null" not in killed
60+
assert output.startswith(f"id: {shell_id}\nstatus: exited\n")
61+
62+
63+
@pytest.mark.asyncio
64+
async def test_kill_bash_returns_status_when_process_already_finished(tmp_path) -> None:
65+
started = await bash.run(
66+
cmd=_python_shell("print('done')"),
67+
background=True,
68+
context=_tool_context(tmp_path),
69+
)
70+
shell_id = started.removeprefix("started: ").strip()
71+
72+
await asyncio.sleep(0.1)
73+
result = await kill_bash.run(shell_id=shell_id)
74+
75+
assert result == f"id: {shell_id}\nstatus: exited\nexit_code: 0"

0 commit comments

Comments
 (0)