Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions src/main/java/hudson/remoting/FastPipedInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
package hudson.remoting;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;

/**
Expand All @@ -38,6 +40,12 @@
* @see FastPipedOutputStream
*/
public class FastPipedInputStream extends InputStream {
private static final Cleaner CLEANER = Cleaner.create();

@SuppressFBWarnings(
value = "URF_UNREAD_FIELD",
justification = "Cleaner registration must be kept strongly reachable")
private final Cleaner.Cleanable cleanable;

final byte[] buffer;
/**
Expand All @@ -58,6 +66,7 @@ public class FastPipedInputStream extends InputStream {
*/
public FastPipedInputStream() {
this.buffer = new byte[0x10000];
this.cleanable = CLEANER.register(this, new CleanupTask(this.buffer));
}

/**
Expand All @@ -79,6 +88,7 @@ public FastPipedInputStream(FastPipedOutputStream source, int bufferSize) throws
connect(source);
}
this.buffer = new byte[bufferSize];
this.cleanable = CLEANER.register(this, new CleanupTask(this.buffer));
}

private void checkSource() throws IOException {
Expand Down Expand Up @@ -130,12 +140,6 @@ public void connect(FastPipedOutputStream source) throws IOException {
source.sink = new WeakReference<>(this);
}

@Override
protected void finalize() throws Throwable {
super.finalize();
close();
}

@Override
public void mark(int readLimit) {}

Expand Down Expand Up @@ -210,4 +214,24 @@ static final class ClosedBy extends Throwable {
super("The pipe was closed at...", error);
}
}

private static class CleanupTask implements Runnable {
private final byte[] buffer;

CleanupTask(byte[] buffer) {
this.buffer = buffer;
}

@Override
@SuppressFBWarnings(
value = "NN_NAKED_NOTIFY",
justification = "Waking up threads during GC, state is irrelevant")
public void run() {
if (buffer != null) {
synchronized (buffer) {
buffer.notifyAll(); // Wake up any pending writers
}
}
}
}
}
31 changes: 24 additions & 7 deletions src/main/java/hudson/remoting/FastPipedOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.io.PipedOutputStream;
import java.lang.ref.Cleaner;
import java.lang.ref.WeakReference;

/**
Expand All @@ -40,7 +41,7 @@
* @see FastPipedOutputStream
*/
public class FastPipedOutputStream extends OutputStream implements ErrorPropagatingOutputStream {

private static final Cleaner CLEANER = Cleaner.create();
WeakReference<FastPipedInputStream> sink;

private final Throwable allocatedAt = new Throwable();
Expand Down Expand Up @@ -112,12 +113,7 @@ public void connect(FastPipedInputStream sink) throws IOException {
}
this.sink = new WeakReference<>(sink);
sink.source = new WeakReference<>(this);
}

@Override
protected void finalize() throws Throwable {
super.finalize();
close();
CLEANER.register(this, new CleanupTask(this.sink));
}

@Override
Expand Down Expand Up @@ -202,4 +198,25 @@ public void write(@NonNull byte[] b, int off, int len) throws IOException {
}

static final int TIMEOUT = Integer.getInteger(FastPipedOutputStream.class.getName() + ".timeout", 10 * 1000);

private static class CleanupTask implements Runnable {
private final WeakReference<FastPipedInputStream> sinkRef;

CleanupTask(WeakReference<FastPipedInputStream> sinkRef) {
this.sinkRef = sinkRef;
}

@Override
public void run() {
FastPipedInputStream s = sinkRef.get();
if (s != null) {
synchronized (s.buffer) {
if (s.closed == null) {
s.closed = new FastPipedInputStream.ClosedBy(null);
s.buffer.notifyAll();
}
}
}
}
}
}
42 changes: 30 additions & 12 deletions src/main/java/hudson/remoting/ProxyOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.lang.ref.Cleaner;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -36,6 +37,9 @@
* {@link OutputStream} on a remote machine.
*/
final class ProxyOutputStream extends OutputStream implements ErrorPropagatingOutputStream {
private static final Cleaner CLEANER = Cleaner.create();
private final Cleaner.Cleanable cleanable;

Comment on lines +40 to +42
private Channel channel;
private int oid;

Expand All @@ -58,7 +62,9 @@ final class ProxyOutputStream extends OutputStream implements ErrorPropagatingOu
* when it's {@link #connect(Channel,int) connected} later,
* the data will be sent at once to the remote stream.
*/
public ProxyOutputStream() {}
public ProxyOutputStream() {
this.cleanable = CLEANER.register(this, new CleanupTask(null, -1));
}
Comment on lines +65 to +67

/**
* Creates an already connected {@link ProxyOutputStream}.
Expand All @@ -67,6 +73,7 @@ public ProxyOutputStream() {}
* The object id of the exported {@link OutputStream}.
*/
public ProxyOutputStream(@NonNull Channel channel, int oid) throws IOException {
this.cleanable = CLEANER.register(this, new CleanupTask(channel, oid));
connect(channel, oid);
}

Expand Down Expand Up @@ -185,17 +192,6 @@ private void doClose(Throwable error) throws IOException {
oid = -1;
}

@Override
protected void finalize() throws Throwable {
super.finalize();
// if we haven't done so, release the exported object on the remote side.
// if the object is auto-unexported, the export entry could have already been removed.
if (channel != null && oid != -1) {
channel.send(new Unexport(channel.newIoId(), oid));
oid = -1;
}
}

/**
* I/O operations in remoting gets executed by a separate pipe thread asynchronously.
* So if a closure performs some I/O (such as writing to the RemoteOutputStream) then returns,
Expand Down Expand Up @@ -465,4 +461,26 @@ public String toString() {
}

private static final Logger LOGGER = Logger.getLogger(ProxyOutputStream.class.getName());

private static class CleanupTask implements Runnable {
private final Channel channel;
private final int oid;

CleanupTask(Channel channel, int oid) {
this.channel = channel;
this.oid = oid;
}

@Override
public void run() {
if (channel != null && oid != -1) {
try {
channel.send(new Unexport(channel.newIoId(), oid));
} catch (Exception e) {
Comment on lines +465 to +479
java.util.logging.Logger.getLogger(ProxyOutputStream.class.getName())
.log(java.util.logging.Level.FINE, "Failed to unexport during cleanup", e);
}
}
}
}
}
50 changes: 24 additions & 26 deletions src/main/java/hudson/remoting/ProxyWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@

import edu.umd.cs.findbugs.annotations.CheckForNull;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.CharArrayWriter;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Writer;
import java.lang.ref.Cleaner;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -40,6 +40,7 @@
* {@link Writer} on a remote machine.
*/
final class ProxyWriter extends Writer {
private static final Cleaner CLEANER = Cleaner.create();

@GuardedBy("this")
private Channel channel;
Expand Down Expand Up @@ -88,7 +89,7 @@ synchronized void connect(@NonNull Channel channel, int oid) throws IOException
}
this.channel = channel;
this.oid = oid;

CLEANER.register(this, new CleanupTask(channel, oid));
window = channel.getPipeWindow(oid);
Comment on lines 90 to 93

// if we already have bytes to write, do so now.
Expand Down Expand Up @@ -243,24 +244,6 @@ public void error(@CheckForNull Throwable cause) throws IOException {
}
}

@Override
// TODO: really?
@SuppressFBWarnings(value = "FI_FINALIZER_NULLS_FIELDS", justification = "As designed")
protected synchronized void finalize() throws Throwable {
super.finalize();
// if we haven't done so, release the exported object on the remote side.
// if the object is auto-unexported, the export entry could have already been removed.
if (channel != null) {
if (channel.remoteCapability.supportsProxyWriter2_35()) {
channel.send(new Unexport(channel.newIoId(), oid));
} else {
channel.send(new EOF(channel.newIoId(), oid));
}
channel = null;
oid = -1;
}
}

/**
* {@link Command} for sending bytes.
*/
Expand Down Expand Up @@ -412,11 +395,6 @@ protected void execute(final Channel channel) {
}
channel.pipeWriter.submit(ioId, () -> {
channel.unexport(oid, createdAt, false);
try {
os.close();
} catch (IOException e) {
// ignore errors
}
});
}

Expand Down Expand Up @@ -489,4 +467,24 @@ public String toString() {
}

private static final Logger LOGGER = Logger.getLogger(ProxyWriter.class.getName());
}

private static class CleanupTask implements Runnable {
private final Channel channel;
private final int oid;

CleanupTask(Channel channel, int oid) {
this.channel = channel;
this.oid = oid;
}

@Override
public void run() {
try {
// Sends the dead signal using all THREE required arguments
channel.send(new NotifyDeadWriter(channel, null, oid));
Comment on lines +483 to +484
} catch (IOException e) {
// ignore cleanup failures
}
}
} // Closes CleanupTask
} // Closes ProxyWriter (MUST BE THE VERY LAST LINE)