Skip to content

Commit c69cd0c

Browse files
committed
GH-855: Thread-safe DefaultSftpClient
Synchronize sending messages through the channel. Messages would be synchronized at the connection level anyway, and most uses of SFTP will be single-threaded anyway, so synchronizing on channel level should not impact performance much. With a thread-safe SftpClient, the use of SftpClients in the SftpFileSystem can be simplified.
1 parent c1f31ee commit c69cd0c

1 file changed

Lines changed: 20 additions & 10 deletions

File tree

sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import java.util.concurrent.ConcurrentHashMap;
3939
import java.util.concurrent.atomic.AtomicBoolean;
4040
import java.util.concurrent.atomic.AtomicInteger;
41+
import java.util.concurrent.atomic.AtomicReference;
42+
import java.util.concurrent.locks.ReentrantLock;
4143

4244
import org.apache.sshd.client.channel.ChannelSubsystem;
4345
import org.apache.sshd.client.channel.ClientChannel;
@@ -80,8 +82,9 @@ public class DefaultSftpClient extends AbstractSftpClient {
8082
private final AtomicBoolean closing = new AtomicBoolean(false);
8183
private final NavigableMap<String, byte[]> extensions = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
8284
private final NavigableMap<String, byte[]> exposedExtensions = Collections.unmodifiableNavigableMap(extensions);
83-
private Charset nameDecodingCharset;
84-
private SftpMessage lastMessage;
85+
private final ReentrantLock writeLock = new ReentrantLock();
86+
private final AtomicReference<SftpMessage> lastMessage = new AtomicReference<>();
87+
private volatile Charset nameDecodingCharset;
8588

8689
/**
8790
* @param clientSession The {@link ClientSession}
@@ -167,7 +170,7 @@ public void close() throws IOException {
167170
if (isOpen()) {
168171
this.channel.close(false);
169172
}
170-
lastMessage = null;
173+
lastMessage.set(null);
171174
}
172175

173176
/**
@@ -276,7 +279,7 @@ public int send(int cmd, Buffer buffer) throws IOException {
276279
msg.waitUntilSent();
277280
return msg.getId();
278281
} finally {
279-
lastMessage = null;
282+
lastMessage.compareAndSet(msg, null);
280283
}
281284
}
282285

@@ -311,13 +314,20 @@ public SftpMessage write(int cmd, Buffer buffer) throws IOException {
311314

312315
ClientChannel clientChannel = getClientChannel();
313316
IoOutputStream asyncIn = clientChannel.getAsyncIn();
314-
if (lastMessage != null) {
315-
lastMessage.waitUntilSent();
317+
writeLock.lock();
318+
try {
319+
SftpMessage msg = lastMessage.getAndSet(null);
320+
if (msg != null) {
321+
msg.waitUntilSent();
322+
}
323+
IoWriteFuture writeFuture = asyncIn.writeBuffer(buf);
324+
Duration sendTimeout = SFTP_CLIENT_CMD_TIMEOUT.getRequired(clientChannel);
325+
msg = new SftpMessage(id, writeFuture, sendTimeout);
326+
lastMessage.set(msg);
327+
return msg;
328+
} finally {
329+
writeLock.unlock();
316330
}
317-
IoWriteFuture writeFuture = asyncIn.writeBuffer(buf);
318-
Duration sendTimeout = SFTP_CLIENT_CMD_TIMEOUT.getRequired(clientChannel);
319-
lastMessage = new SftpMessage(id, writeFuture, sendTimeout);
320-
return lastMessage;
321331
}
322332

323333
@Override

0 commit comments

Comments
 (0)