-
-
Notifications
You must be signed in to change notification settings - Fork 290
Expand file tree
/
Copy pathFastPipedOutputStream.java
More file actions
222 lines (195 loc) · 7.41 KB
/
Copy pathFastPipedOutputStream.java
File metadata and controls
222 lines (195 loc) · 7.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/*
* @(#)$Id: FastPipedOutputStream.java 3619 2008-03-26 07:23:03Z yui $
*
* Copyright 2006-2008 Makoto YUI
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Makoto YUI - initial implementation
*/
package hudson.remoting;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PipedOutputStream;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;
/**
* This class is equivalent to {@link PipedOutputStream}. In the
* interface it only adds a constructor which allows for specifying the buffer
* size. Its implementation, however, is much simpler and a lot more efficient
* than its equivalent. It doesn't rely on polling. Instead it uses proper
* synchronization with its counterpart {@link FastPipedInputStream}.
*
* @author WD
* @see <a href="http://developer.java.sun.com/developer/bugParade/bugs/4404700.html">4404700</a>
* @see FastPipedOutputStream
*/
public class FastPipedOutputStream extends OutputStream implements ErrorPropagatingOutputStream {
private static final Cleaner CLEANER = Cleaner.create();
WeakReference<FastPipedInputStream> sink;
private final Throwable allocatedAt = new Throwable();
/**
* Creates an unconnected PipedOutputStream.
*/
public FastPipedOutputStream() {
super();
}
/**
* Creates a PipedOutputStream with a default buffer size and connects it to
* <code>sink</code>.
* @exception IOException It was already connected.
*/
public FastPipedOutputStream(FastPipedInputStream sink) throws IOException {
connect(sink);
}
/**
* Creates a PipedOutputStream with buffer size <code>bufferSize</code> and
* connects it to <code>sink</code>.
* @exception IOException It was already connected.
* @deprecated as of 1.350
* bufferSize parameter is ignored.
*/
@Deprecated
public FastPipedOutputStream(FastPipedInputStream sink, int bufferSize) throws IOException {
this(sink);
}
private FastPipedInputStream sink() throws IOException {
FastPipedInputStream s = sink.get();
if (s == null) {
throw new IOException("Reader side has already been abandoned", allocatedAt);
}
return s;
}
/**
* @exception IOException The pipe is not connected.
*/
@Override
public void close() throws IOException {
error(null);
}
@Override
public void error(Throwable e) throws IOException {
if (sink == null) {
throw new IOException("Unconnected pipe");
}
FastPipedInputStream s = sink();
synchronized (s.buffer) {
if (s.closed == null) {
s.closed = new FastPipedInputStream.ClosedBy(e);
flush();
}
}
}
/**
* @exception IOException The pipe is already connected.
*/
public void connect(FastPipedInputStream sink) throws IOException {
if (this.sink != null) {
throw new IOException("Pipe already connected");
}
this.sink = new WeakReference<>(sink);
sink.source = new WeakReference<>(this);
CLEANER.register(this, new CleanupTask(this.sink));
}
@Override
@SuppressFBWarnings(
value = "NN_NAKED_NOTIFY",
justification = "TODO: change to mutable state likely happened elsewhere")
public void flush() throws IOException {
FastPipedInputStream s = sink();
synchronized (s.buffer) {
// Release all readers.
s.buffer.notifyAll();
}
}
@Override
public void write(int b) throws IOException {
write(new byte[] {(byte) b});
}
@Override
public void write(@NonNull byte[] b) throws IOException {
write(b, 0, b.length);
}
/**
* @exception IOException The pipe is not connected or a reader has closed it.
*/
@Override
public void write(@NonNull byte[] b, int off, int len) throws IOException {
if (sink == null) {
throw new IOException("Unconnected pipe");
}
while (len > 0) {
FastPipedInputStream s = sink(); // make sure the sink is still trying to read, or else fail the write.
if (s.closed != null) {
throw new IOException("Pipe is already closed", s.closed);
}
synchronized (s.buffer) {
if (s.writePosition == s.readPosition && s.writeLaps > s.readLaps) {
// The circular buffer is full, so wait for some reader to consume
// something.
// release a reference to 's' during the wait so that if the reader has abandoned the pipe
// we can tell.
byte[] buf = s.buffer;
Thread t = Thread.currentThread();
String oldName = t.getName();
t.setName("Blocking to write " + HexDump.toHex(b, off, Math.min(len, 256)) + ": " + oldName);
try {
buf.wait(TIMEOUT);
} catch (InterruptedException e) {
throw (InterruptedIOException) new InterruptedIOException(e.getMessage()).initCause(e);
} finally {
t.setName(oldName);
}
// Try again.
continue;
}
// Don't write more than the capacity indicated by len or the space
// available in the circular buffer.
int amount = Math.min(
len, (s.writePosition < s.readPosition ? s.readPosition : s.buffer.length) - s.writePosition);
System.arraycopy(b, off, s.buffer, s.writePosition, amount);
s.writePosition += amount;
if (s.writePosition == s.buffer.length) {
s.writePosition = 0;
++s.writeLaps;
}
off += amount;
len -= amount;
s.buffer.notifyAll();
}
}
}
static final int TIMEOUT = Integer.getInteger(FastPipedOutputStream.class.getName() + ".timeout", 10 * 1000);
private static class CleanupTask implements Runnable {
private final WeakReference<FastPipedInputStream> sinkRef;
CleanupTask(WeakReference<FastPipedInputStream> sinkRef) {
this.sinkRef = sinkRef;
}
@Override
public void run() {
FastPipedInputStream s = sinkRef.get();
if (s != null) {
synchronized (s.buffer) {
if (s.closed == null) {
s.closed = new FastPipedInputStream.ClosedBy(null);
s.buffer.notifyAll();
}
}
}
}
}
}