Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* [GH-827](https://github.com/apache/mina-sshd/issues/827) Don't fail on invalid `known_hosts` lines; log and skip them
* [GH-830](https://github.com/apache/mina-sshd/issues/830) EC public keys: let Bouncy Castle generate X.509 encodings with the curve OID as algorithm parameter
* [GH-856](https://github.com/apache/mina-sshd/issues/856) Fix using ed25519 with BC-FIPS
* [GH-861](https://github.com/apache/mina-sshd/issues/861) SFTP client: prevent sending zero-length writes in `SftpOutputStreamAsync`

* [SSHD-1348](https://issues.apache.org/jira/browse/SSHD-1348) Fix zero-length SFTP reads

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,12 @@ private long internalTransfer(ByteInput stream, boolean forceFlush) throws IOExc

@Override
public void flush() throws IOException {
internalFlush();
if (!isOpen()) {
throw new IOException("flush(" + getPath() + ") stream is closed");
}
if (buffer != null && buffer.available() > 0) {
internalFlush();
}
if (lastMsg != null) {
lastMsg.waitUntilSent();
lastMsg = null;
Expand All @@ -307,8 +312,8 @@ private void internalFlush() throws IOException {
for (int ackIndex = 1;; ackIndex++) {
SftpAckData ack = pendingAcks.peek();
if (ack == null) {
if (debugEnabled) {
log.debug("flush({}) processed {} pending writes", this, ackIndex);
if (debugEnabled && ackIndex > 1) {
log.debug("flush({}) processed {} pending writes", this, ackIndex - 1);
}
break;
}
Expand All @@ -333,39 +338,48 @@ private void internalFlush() throws IOException {
checkStatus(client, buf);
}

if (buffer == null) {
Buffer currentData = buffer;
buffer = null;
if (currentData == null) {
if (debugEnabled) {
log.debug("flush({}) no pending buffer to flush", this);
}
return;
}

int avail = buffer.available();
int avail = currentData.available();

if (avail == 0) {
if (debugEnabled) {
log.debug("flush({}) no pending data in buffer to flush", this);
}
return;
}

long currentOffset = offset;
offset += avail;

int wpos = buffer.wpos();
int wpos = currentData.wpos();
// 4 = handle length
// handle bytes
// 8 = file offset
// 4 = length of actual data
buffer.rpos(buffer.rpos() - 16 - handleId.length);
buffer.wpos(buffer.rpos());
buffer.putBytes(handleId);
buffer.putLong(offset);
buffer.putUInt(avail);
buffer.wpos(wpos);
currentData.rpos(currentData.rpos() - 16 - handleId.length);
currentData.wpos(currentData.rpos());
currentData.putBytes(handleId);
currentData.putLong(currentOffset);
currentData.putUInt(avail);
currentData.wpos(wpos);

if (lastMsg != null) {
lastMsg.waitUntilSent();
}
lastMsg = client.write(SftpConstants.SSH_FXP_WRITE, buffer);
SftpAckData ack = new SftpAckData(lastMsg.getId(), offset, avail);
lastMsg = client.write(SftpConstants.SSH_FXP_WRITE, currentData);
SftpAckData ack = new SftpAckData(lastMsg.getId(), currentOffset, avail);
if (debugEnabled) {
log.debug("flush({}) enqueue pending ack={}", this, ack);
}
pendingAcks.add(ack);

offset += avail;
buffer = null;
}

private void checkStatus(AbstractSftpClient client, Buffer buf) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sshd.sftp.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.sshd.common.util.io.IoUtils;
import org.apache.sshd.server.session.ServerSession;
import org.apache.sshd.server.subsystem.SubsystemFactory;
import org.apache.sshd.sftp.SftpModuleProperties;
import org.apache.sshd.sftp.common.SftpConstants;
import org.apache.sshd.sftp.server.FileHandle;
import org.apache.sshd.sftp.server.SftpEventListener;
import org.apache.sshd.sftp.server.SftpSubsystemFactory;
import org.apache.sshd.util.test.CommonTestSupportUtils;
import org.junit.jupiter.api.Test;

class SftpClientTest extends AbstractSftpClientTestSupport {

SftpClientTest() {
super();
}

@Test
void writeExactBufferSize() throws Exception {
SftpSubsystemFactory sftpFactory = null;
for (SubsystemFactory f : sshd.getSubsystemFactories()) {
if (f instanceof SftpSubsystemFactory) {
sftpFactory = (SftpSubsystemFactory) f;
break;
}
}
assertNotNull(sftpFactory);

AtomicInteger zeroWrite = new AtomicInteger();
SftpEventListener listener = new SftpEventListener() {

@Override
public void writing(
ServerSession session, String remoteHandle, FileHandle localHandle, long offset, byte[] data,
int dataOffset, int dataLen) throws IOException {
if (dataLen == 0) {
zeroWrite.incrementAndGet();
}
}
};

sftpFactory.addSftpEventListener(listener);

Path targetPath = detectTargetFolder();
Path parentPath = targetPath.getParent();
Path lclSftp = CommonTestSupportUtils.resolve(targetPath, SftpConstants.SFTP_SUBSYSTEM_NAME, getClass().getSimpleName(),
getCurrentTestName());
Path inputFile = targetPath.resolve("input.bin");
Path testFile = assertHierarchyTargetFolderExists(lclSftp).resolve("file.bin");
int packetSize = 32 * 1024;
int handleSize = SftpModuleProperties.FILE_HANDLE_SIZE.getRequired(sshd).intValue();
int payloadSize = packetSize - 30 - handleSize;
byte[] expected = new byte[payloadSize];

ThreadLocalRandom.current().nextBytes(expected);
try (SftpClient sftp = createSingleSessionClient()) {
Files.write(inputFile, expected);
String file = CommonTestSupportUtils.resolveRelativeRemotePath(parentPath, testFile);
try (InputStream in = Files.newInputStream(inputFile); OutputStream out = sftp.write(file)) {
IoUtils.copy(in, out, 32 * 1024);
out.flush(); // This flush caused a zero-bytes write; see GH-861
// Note: the implicit flush on out.close() did not produce such a zero-bytes write.
}
byte[] transferred = Files.readAllBytes(testFile);
assertArrayEquals(expected, transferred);
} finally {
Files.deleteIfExists(inputFile);
sftpFactory.removeSftpEventListener(listener);
}
assertEquals(0, zeroWrite.get(), "Unexpected SSH_FXP_WRITE with zero data bytes");
}

}