Skip to content

Commit c506d1e

Browse files
committed
GH-774: Fix WritePendingException
If an SFTP file copy the twin buffering in SftpOutputStreamAsync may lead to it reading from the InputStream before the last buffer has been written. If the InputStream is an SftpInputStreamAsync and has no buffered data, it may also send SFTP messages to request more data. If both streams use the same SftpClient and thus the same SSH channel, this may lead to the input stream writing its "more data" request before the output stream has completed its write, causing the WritePendingException. Fix this by ensuring that DefaultSftpClient.write() waits for the last write before doing a new write. This keeps the benefits of the twin buffer in SftpOututStreamAsync (it still can read ahead most of the time while a buffer is written) and at the same time ensures that other SFTP messages don't step on an asynchronous buffer write.
1 parent b7843c3 commit c506d1e

8 files changed

Lines changed: 214 additions & 3 deletions

File tree

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
* [GH-727](https://github.com/apache/mina-sshd/issues/727) Supply default port 22 for proxy jump hosts for which there is no `HostConfigEntry`
4040
* [GH-733](https://github.com/apache/mina-sshd/issues/733) Fix `SftpRemotePathChannel.transferTo()` (avoid NPE)
4141
* [GH-751](https://github.com/apache/mina-sshd/issues/751) Fix SFTP v3 "long name" if SFTP server uses an `SftpFileSystem` to another server
42+
* [GH-774](https://github.com/apache/mina-sshd/issues/774) Fix `WritePendingException` in SFTP file copy
4243

4344

4445
* [SSHD-1343](https://issues.apache.org/jira/projects/SSHD/issues/SSHD-1343) Correct documentation in `ChannelDataReceiver`

sshd-sftp/src/main/java/org/apache/sshd/sftp/client/impl/DefaultSftpClient.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ public class DefaultSftpClient extends AbstractSftpClient {
8181
private final NavigableMap<String, byte[]> extensions = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
8282
private final NavigableMap<String, byte[]> exposedExtensions = Collections.unmodifiableNavigableMap(extensions);
8383
private Charset nameDecodingCharset;
84+
private SftpMessage lastMessage;
8485

8586
/**
8687
* @param clientSession The {@link ClientSession}
@@ -166,6 +167,7 @@ public void close() throws IOException {
166167
if (isOpen()) {
167168
this.channel.close(false);
168169
}
170+
lastMessage = null;
169171
}
170172

171173
/**
@@ -270,8 +272,12 @@ protected void process(Buffer incoming) throws IOException {
270272
@Override
271273
public int send(int cmd, Buffer buffer) throws IOException {
272274
SftpMessage msg = write(cmd, buffer);
273-
msg.waitUntilSent();
274-
return msg.getId();
275+
try {
276+
msg.waitUntilSent();
277+
return msg.getId();
278+
} finally {
279+
lastMessage = null;
280+
}
275281
}
276282

277283
@Override
@@ -305,9 +311,13 @@ public SftpMessage write(int cmd, Buffer buffer) throws IOException {
305311

306312
ClientChannel clientChannel = getClientChannel();
307313
IoOutputStream asyncIn = clientChannel.getAsyncIn();
314+
if (lastMessage != null) {
315+
lastMessage.waitUntilSent();
316+
}
308317
IoWriteFuture writeFuture = asyncIn.writeBuffer(buf);
309318
Duration sendTimeout = SFTP_CLIENT_CMD_TIMEOUT.getRequired(clientChannel);
310-
return new SftpMessage(id, writeFuture, sendTimeout);
319+
lastMessage = new SftpMessage(id, writeFuture, sendTimeout);
320+
return lastMessage;
311321
}
312322

313323
@Override
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.sshd.sftp.client;
20+
21+
import java.io.ByteArrayInputStream;
22+
import java.io.ByteArrayOutputStream;
23+
import java.io.InputStream;
24+
import java.io.OutputStream;
25+
import java.security.KeyPair;
26+
27+
import org.apache.sshd.client.SshClient;
28+
import org.apache.sshd.client.session.ClientSession;
29+
import org.apache.sshd.common.Factory;
30+
import org.apache.sshd.common.random.Random;
31+
import org.apache.sshd.common.util.io.IoUtils;
32+
import org.apache.sshd.common.util.security.SecurityUtils;
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeEach;
35+
import org.junit.jupiter.api.Test;
36+
import org.testcontainers.containers.GenericContainer;
37+
import org.testcontainers.junit.jupiter.Container;
38+
import org.testcontainers.junit.jupiter.Testcontainers;
39+
import org.testcontainers.utility.MountableFile;
40+
41+
import static org.junit.Assert.assertArrayEquals;
42+
43+
@Testcontainers(disabledWithoutDocker = true)
44+
class OpenSshTest {
45+
46+
private static final String RESOURCES = "/" + OpenSshTest.class.getPackage().getName().replace('.', '/');
47+
48+
@Container
49+
static GenericContainer<?> server = new GenericContainer<>("atmoz/sftp:alpine") //
50+
.withEnv("SFTP_USERS", "foo::::upload")
51+
// Set it up for pubkey auth
52+
.withCopyFileToContainer(MountableFile.forClasspathResource(RESOURCES + "/rsa_key.pub"),
53+
"/home/foo/.ssh/keys/id_rsa.pub")
54+
// Give it static known host keys!
55+
.withCopyFileToContainer(MountableFile.forClasspathResource(RESOURCES + "/ed25519_key", 0x180),
56+
"/etc/ssh/ssh_host_ed25519_key")
57+
.withCopyFileToContainer(MountableFile.forClasspathResource(RESOURCES + "/rsa_key", 0x180),
58+
"/etc/ssh/ssh_host_rsa_key")
59+
.withExposedPorts(22);
60+
61+
private SshClient client;
62+
63+
@BeforeEach
64+
void setUp() throws Exception {
65+
client = SshClient.setUpDefaultClient();
66+
client.setServerKeyVerifier((s, a, k) -> true);
67+
// Load the user key
68+
try (InputStream in = this.getClass().getResourceAsStream(RESOURCES + "/rsa_key")) {
69+
Iterable<KeyPair> clientKeys = SecurityUtils.loadKeyPairIdentities(null, null, in, null);
70+
client.setKeyIdentityProvider(s -> clientKeys);
71+
}
72+
client.start();
73+
}
74+
75+
@AfterEach
76+
void teardown() {
77+
if (client != null) {
78+
client.stop();
79+
}
80+
}
81+
82+
@Test // GH-774
83+
void copyByReadWrite() throws Exception {
84+
// Size matters; with only 1MB GH-774 is not reproducible. Probably the size needs to be larger than the channel
85+
// window (2MB by default).
86+
byte[] expected = new byte[8 * 1024 * 1024];
87+
88+
Factory<? extends Random> factory = client.getRandomFactory();
89+
Random rnd = factory.create();
90+
rnd.fill(expected);
91+
92+
try (ClientSession sshSession = client.connect("foo", server.getHost(), server.getMappedPort(22)).verify()
93+
.getClientSession()) {
94+
sshSession.auth().verify(2000);
95+
try (SftpClient sftp = SftpClientFactory.instance().createSftpClient(sshSession)) {
96+
// Copy some data onto the server
97+
try (InputStream in = new ByteArrayInputStream(expected);
98+
OutputStream out = sftp.write("upload/file.bin", SftpClient.OpenMode.Create,
99+
SftpClient.OpenMode.Write)) {
100+
IoUtils.copy(in, out);
101+
}
102+
try (InputStream in = sftp.read("upload/file.bin");
103+
OutputStream out = sftp.write("upload/file.new", SftpClient.OpenMode.Create,
104+
SftpClient.OpenMode.Write)) {
105+
IoUtils.copy(in, out);
106+
}
107+
byte[] actual;
108+
try (InputStream in = sftp.read("upload/file.new");
109+
ByteArrayOutputStream buf = new ByteArrayOutputStream(expected.length)) {
110+
byte[] data = new byte[4096];
111+
for (int n = 0; n >= 0;) {
112+
n = in.read(data, 0, data.length);
113+
if (n > 0) {
114+
buf.write(data, 0, n);
115+
}
116+
}
117+
actual = buf.toByteArray();
118+
}
119+
assertArrayEquals(expected, actual);
120+
}
121+
}
122+
}
123+
124+
}

sshd-sftp/src/test/java/org/apache/sshd/sftp/client/SftpTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,46 @@ public void readWriteDownload(int handleSize) throws Exception {
396396
}
397397
}
398398

399+
@MethodSource("getParameters")
400+
@ParameterizedTest(name = "FILE_HANDLE_SIZE {0}") // see GH-774
401+
public void copyByReadWrite(int handleSize) throws Exception {
402+
initSftpTest(handleSize);
403+
Path targetPath = detectTargetFolder();
404+
Path parentPath = targetPath.getParent();
405+
Path lclSftp = CommonTestSupportUtils.resolve(targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME, getClass().getSimpleName(),
406+
getCurrentTestName());
407+
Path testFile = assertHierarchyTargetFolderExists(lclSftp).resolve("file.bin");
408+
// Size matters; with only 1MB GH-774 is not reproducible. Probably the size needs to be larger than the channel
409+
// window (2MB by default).
410+
byte[] expected = new byte[8 * 1024 * 1024];
411+
412+
Factory<? extends Random> factory = sshd.getRandomFactory();
413+
Random rnd = factory.create();
414+
rnd.fill(expected);
415+
Files.write(testFile, expected);
416+
417+
String file = CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, testFile);
418+
try (SftpClient sftp = createSingleSessionClient()) {
419+
try (InputStream in = sftp.read(file);
420+
OutputStream out = sftp.write(file + ".new", SftpClient.OpenMode.Create, SftpClient.OpenMode.Write)) {
421+
IoUtils.copy(in, out);
422+
}
423+
byte[] actual;
424+
try (InputStream in = sftp.read(file + ".new");
425+
ByteArrayOutputStream buf = new ByteArrayOutputStream(expected.length)) {
426+
byte[] data = new byte[4096];
427+
for (int n = 0; n >= 0;) {
428+
n = in.read(data, 0, data.length);
429+
if (n > 0) {
430+
buf.write(data, 0, n);
431+
}
432+
}
433+
actual = buf.toByteArray();
434+
}
435+
assertArrayEquals(expected, actual);
436+
}
437+
}
438+
399439
@MethodSource("getParameters")
400440
@ParameterizedTest(name = "FILE_HANDLE_SIZE {0}")
401441
public void emptyFileDownload(int handleSize) throws Exception {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
-----BEGIN OPENSSH PRIVATE KEY-----
2+
b3BlbnNzaC1rZXktdjEAAAAABG5vbmUAAAAEbm9uZQAAAAAAAAABAAAAMwAAAAtzc2gtZW
3+
QyNTUxOQAAACBbwMG5AH0I3R787lv7XKYJkA5p5PoRSCC3YNlHheI+iwAAAIhHK6krRyup
4+
KwAAAAtzc2gtZWQyNTUxOQAAACBbwMG5AH0I3R787lv7XKYJkA5p5PoRSCC3YNlHheI+iw
5+
AAAECegC4DHWX/fh2doVuwKRnSsBRWLsgptHoiJvir77yQGVvAwbkAfQjdHvzuW/tcpgmQ
6+
Dmnk+hFIILdg2UeF4j6LAAAAAAECAwQF
7+
-----END OPENSSH PRIVATE KEY-----
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFvAwbkAfQjdHvzuW/tcpgmQDmnk+hFIILdg2UeF4j6L
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
-----BEGIN RSA PRIVATE KEY-----
2+
MIIEpAIBAAKCAQEAxY3Hr1SqpJIQ9SbFfGMGweVy8jg2TEH3GC1K0LudQHJwogRi
3+
+debdCqUtuSITbpPhjkeZSk9rq198d6RhT6TQmY9J8wLL2/+VXZk/rMVEEjeXQS3
4+
ImRnL2vVmkAunv6LwfDGHIovkhwj3/lqGWphDAKnHyXusPDwQ3N4LFGgxwXvRGqc
5+
lzmP8H+KDWaaPapk1AZCBIoD4JbL8faBtLNU01r+pB3sIKvfsPJ5DxPErThfrPuD
6+
qIbA3axEqFlgX4aVl3yMnSWjfhLhO7xD3YwrtUhannHt8pZQo5FkwCGWDpkG3xs+
7+
qK3ZACrhMFMTvPuDS83jDtEzNd5KYb4KnkOPMQIDAQABAoIBAQCE5GktgrD/39pU
8+
b25tzFehW25FjpbIGZ/UvbMUUwDnd5RZCMZj9yv1qyc7GOSwFOKmEgpmVqXNuZt9
9+
dxFBJuT8x7Xf7Zygnp/icbBivakvuTUMMb3X/t6CwfGAwCgcgHMXVZaPYE275f4k
10+
Dq3Wxv7di3NMusGkeY/GcAipF4gmGKKe7Ck1ifRypF2cDJsgTtsoFUHNNKfnT3gf
11+
OcJsVLRl0osbsxdqU+Tep46+jHrNt8J9n2VeRNRIqGHj0CkNdpLQOs+MjvIO3Hgq
12+
9NUxwIExwaPnBpTLlWwfemCz3JQnlAineMbYBGa1tpAA3Iw56NWcNbiOPyUyffbI
13+
wBC4r1uZAoGBAPESsergFD+ontChEI+h38oM/D9DKCObZR2kz6WArZ54i1dJWOgh
14+
HCsuxgPjxmaddPKghfNhUORdZBynuS5G7n6BfItNilDiFm2KBk12d38OVovUFo1Q
15+
r5akclKf0kFxHt5TzHIrNAv7B4OF0Uk3kuDHM7ITX3qDpTSBLlzPAUUHAoGBANHJ
16+
QIPmuF2q+PXnnSgdEyiETfl/IqUTXQyxda8kRIPJKKHZKPHZePhgJKUq9VP32PrP
17+
AxIBNrS3Netsp+EAApj09hmWUcgJRIU1/wjpVGqUmguYgh8nVFOPDudOJD5ltQ/A
18+
enzQ19IkGroaQB8CBGZsPaBAvqRZ5PLbm+BZEPQHAoGAblaMMGCXY/udlQfjOJpy
19+
f1wqKBpoyMNbKJJCqBGZZaruu+jKVJSy++DQqP8b0+PFnzdxl8+24o8MP0FVNKUq
20+
i6RgiLHY2ORiN4ixEctjLjg1zJIqMEv50g06di7IYUORSVk5fhfgHourCLu66rQQ
21+
+eiy9JKBZOXUO4/U1I26mwkCgYAhfuCuLsiBLCtUGAcfwISuk3FfxMzjTpQs0qjX
22+
rhLCd/vk26eN9gs6nR88v/8ryQb8BNGYrljtwdL6I/8qDbZcdcBVlYq5RcGLA3QV
23+
GCxCWDfAYjlkgAMW1GCsze07iUG/ohvskevjwaAC1u4mBUxujhnI3I2T8EZ+AFKD
24+
H7V1QQKBgQDNt+zjSdLtA9AczxDwWmi5SbS+k+nGbi6AQO9i73wky/wxx7FonfWS
25+
2skkOUIst3HBc0Oz+CJTfNFQK6GVqtzTdlZFhMYS0ua1Djd6q6S648+K0cieY4r5
26+
5irivHYVN8t7lBcvbA7E7yD6dHXSHsn6yOLTrV382qRfJTbxG7ZVWA==
27+
-----END RSA PRIVATE KEY-----
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDFjcevVKqkkhD1JsV8YwbB5XLyODZMQfcYLUrQu51AcnCiBGL515t0KpS25IhNuk+GOR5lKT2urX3x3pGFPpNCZj0nzAsvb/5VdmT+sxUQSN5dBLciZGcva9WaQC6e/ovB8MYcii+SHCPf+WoZamEMAqcfJe6w8PBDc3gsUaDHBe9EapyXOY/wf4oNZpo9qmTUBkIEigPglsvx9oG0s1TTWv6kHewgq9+w8nkPE8StOF+s+4OohsDdrESoWWBfhpWXfIydJaN+EuE7vEPdjCu1SFqece3yllCjkWTAIZYOmQbfGz6ordkAKuEwUxO8+4NLzeMO0TM13kphvgqeQ48x user01

0 commit comments

Comments
 (0)