Skip to content

Commit 2085597

Browse files
authored
Add a simple process io proxy (#704)
* Add a simple process io proxy
1 parent 9697ada commit 2085597

File tree

1 file changed

+276
-0
lines changed

1 file changed

+276
-0
lines changed
Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package software.amazon.smithy.java.mcp.cli;
7+
8+
import static java.util.concurrent.TimeUnit.SECONDS;
9+
10+
import java.io.IOException;
11+
import java.io.InputStream;
12+
import java.io.OutputStream;
13+
import java.util.List;
14+
import java.util.Map;
15+
import java.util.concurrent.CompletableFuture;
16+
import java.util.concurrent.atomic.AtomicBoolean;
17+
import software.amazon.smithy.java.logging.InternalLogger;
18+
19+
/**
20+
* A simple proxy that forwards standard input to a subprocess and the subprocess's
21+
* standard output back to the original standard output. It doesn't interpret
22+
* the data flowing through the streams.
23+
*/
24+
public final class ProcessIoProxy {
25+
private static final InternalLogger LOG = InternalLogger.getLogger(ProcessIoProxy.class);
26+
27+
private final ProcessBuilder processBuilder;
28+
private final InputStream inputStream;
29+
private final OutputStream outputStream;
30+
private final OutputStream errorStream;
31+
private volatile Process process;
32+
private volatile Thread inputThread;
33+
private volatile Thread outputThread;
34+
private volatile Thread errorThread;
35+
private final AtomicBoolean running = new AtomicBoolean(false);
36+
37+
private ProcessIoProxy(Builder builder) {
38+
processBuilder = new ProcessBuilder();
39+
processBuilder.command().add(builder.command);
40+
41+
if (builder.arguments != null) {
42+
processBuilder.command().addAll(builder.arguments);
43+
}
44+
45+
// Set environment variables if provided
46+
if (builder.environmentVariables != null) {
47+
processBuilder.environment().putAll(builder.environmentVariables);
48+
}
49+
50+
this.inputStream = builder.inputStream;
51+
this.outputStream = builder.outputStream;
52+
this.errorStream = builder.errorStream;
53+
54+
processBuilder.redirectErrorStream(false); // Keep stderr separate
55+
}
56+
57+
/**
58+
* Builder for creating ProcessStdIoProxy instances.
59+
*/
60+
public static final class Builder {
61+
private String command;
62+
private List<String> arguments;
63+
private Map<String, String> environmentVariables;
64+
private InputStream inputStream = System.in;
65+
private OutputStream outputStream = System.out;
66+
private OutputStream errorStream = System.err;
67+
68+
/**
69+
* Sets the command to execute.
70+
*
71+
* @param command The command to execute
72+
* @return This builder for method chaining
73+
*/
74+
public Builder command(String command) {
75+
this.command = command;
76+
return this;
77+
}
78+
79+
/**
80+
* Sets the command-line arguments.
81+
*
82+
* @param arguments The command-line arguments
83+
* @return This builder for method chaining
84+
*/
85+
public Builder arguments(List<String> arguments) {
86+
this.arguments = arguments;
87+
return this;
88+
}
89+
90+
/**
91+
* Sets the environment variables for the process.
92+
*
93+
* @param environmentVariables The environment variables
94+
* @return This builder for method chaining
95+
*/
96+
public Builder environmentVariables(Map<String, String> environmentVariables) {
97+
this.environmentVariables = environmentVariables;
98+
return this;
99+
}
100+
101+
/**
102+
* Customize IoStreams (input, output, error).
103+
* Any stream that is null will use the system default (System.in, System.out, System.err).
104+
*
105+
* @param input The input stream to use (defaults to System.in if null)
106+
* @param output The output stream to use (defaults to System.out if null)
107+
* @param error The error stream to use (defaults to System.err if null)
108+
* @return This builder for method chaining
109+
*/
110+
public Builder streams(InputStream input, OutputStream output, OutputStream error) {
111+
this.inputStream = input == null ? System.in : input;
112+
this.outputStream = output == null ? System.out : output;
113+
this.errorStream = error == null ? System.err : error;
114+
return this;
115+
}
116+
117+
/**
118+
* Builds a new ProcessStdIoProxy instance.
119+
*
120+
* @return A new ProcessStdIoProxy instance
121+
* @throws IllegalArgumentException if command is null or empty
122+
*/
123+
public ProcessIoProxy build() {
124+
if (command == null || command.isEmpty()) {
125+
throw new IllegalArgumentException("Command must be provided");
126+
}
127+
return new ProcessIoProxy(this);
128+
}
129+
}
130+
131+
/**
132+
* Creates a new builder for ProcessStdIoProxy.
133+
*
134+
* @return A new builder instance
135+
*/
136+
public static Builder builder() {
137+
return new Builder();
138+
}
139+
140+
// Buffer size for reading/writing data
141+
private static final int BUFFER_SIZE = 4096;
142+
143+
/**
144+
* Creates a thread that forwards data from an input stream to an output stream.
145+
*
146+
* @param input The source input stream
147+
* @param output The target output stream
148+
* @param name The name of the thread
149+
* @param errorMessage Error message to log if an exception occurs
150+
* @param closeOutput Whether to close the output stream when done
151+
* @return The created thread
152+
*/
153+
private static Thread createForwardingThread(
154+
Process process,
155+
InputStream input,
156+
OutputStream output,
157+
String name,
158+
String errorMessage,
159+
boolean closeOutput,
160+
AtomicBoolean running
161+
) {
162+
return Thread.ofVirtual()
163+
.name(name)
164+
.start(() -> {
165+
try {
166+
byte[] buffer = new byte[BUFFER_SIZE];
167+
int bytesRead;
168+
while (running.get() && process.isAlive() && (bytesRead = input.read(buffer)) != -1) {
169+
output.write(buffer, 0, bytesRead);
170+
output.flush();
171+
}
172+
} catch (IOException e) {
173+
if (running.get()) {
174+
LOG.error(errorMessage, e);
175+
}
176+
} finally {
177+
if (closeOutput) {
178+
try {
179+
output.close();
180+
} catch (IOException e) {
181+
LOG.error("Error closing stream", e);
182+
}
183+
}
184+
}
185+
});
186+
}
187+
188+
/**
189+
* Starts the proxy, launching the subprocess and beginning to forward stdin/stdout.
190+
*
191+
* @throws RuntimeException If there is an error starting the process
192+
*/
193+
public synchronized void start() {
194+
if (running.compareAndSet(false, true)) {
195+
return;
196+
}
197+
try {
198+
process = processBuilder.start();
199+
200+
// Thread to forward input to process
201+
inputThread = createForwardingThread(
202+
process,
203+
inputStream,
204+
process.getOutputStream(),
205+
"process-stdin-forwarder",
206+
"Error forwarding input to process",
207+
true, // Close the process input stream when done
208+
running);
209+
210+
// Thread to forward process stdout to output
211+
outputThread = createForwardingThread(
212+
process,
213+
process.getInputStream(),
214+
outputStream,
215+
"process-stdout-forwarder",
216+
"Error forwarding process stdout",
217+
false, // Don't close the output stream
218+
running);
219+
220+
// Thread to forward process stderr to error
221+
errorThread = createForwardingThread(
222+
process,
223+
process.getErrorStream(),
224+
errorStream,
225+
"process-stderr-forwarder",
226+
"Error forwarding process stderr",
227+
false, // Don't close the error stream
228+
running);
229+
230+
} catch (IOException e) {
231+
running.set(false);
232+
throw new RuntimeException("Failed to start process: " + e.getMessage(), e);
233+
}
234+
}
235+
236+
/**
237+
* Shuts down the proxy, stopping all forwarding and terminating the subprocess.
238+
*
239+
* @return A CompletableFuture that completes when shutdown is finished
240+
*/
241+
public CompletableFuture<Void> shutdown() {
242+
return CompletableFuture.runAsync(() -> {
243+
synchronized (this) {
244+
if (process != null && process.isAlive()) {
245+
try {
246+
247+
// Destroy the process
248+
process.destroy();
249+
250+
// Wait for termination with timeout
251+
if (!process.waitFor(5, SECONDS)) {
252+
// Force kill if it doesn't terminate gracefully
253+
process.destroyForcibly();
254+
}
255+
256+
// Interrupt the threads
257+
interruptThread(inputThread);
258+
interruptThread(outputThread);
259+
interruptThread(errorThread);
260+
261+
} catch (InterruptedException e) {
262+
LOG.error("Error shutting down process", e);
263+
Thread.currentThread().interrupt();
264+
}
265+
}
266+
}
267+
running.set(false);
268+
});
269+
}
270+
271+
private static void interruptThread(Thread thread) {
272+
if (thread != null && thread.isAlive()) {
273+
thread.interrupt();
274+
}
275+
}
276+
}

0 commit comments

Comments
 (0)