Skip to content

Commit 0fe58fd

Browse files
committed
GH-9909: Fix SftpSession for shared client
Fixes: #9909 Issue link: #9909 The `SftpSession.close()` closes `sftpClient` and its `clientSession` unconditionally. At the same time the `DefaultSftpSessionFactory.isSharedSession` is expected to expose only a single shared client. When this `DefaultSftpSessionFactory` is used concurrently, there is a chance that one thread would close that shared client and another won't be able to interact due to `clientSession` is closed. * Fix `SftpSession` accepting an `isSharedClient` flag and doing nothing in the `close()` if client is shared. * Propagate `isSharedSession` state down to the `SftpSession` from the `DefaultSftpSessionFactory` * Closed shared client and its `clientSession` in the `DefaultSftpSessionFactory.destroy()` (cherry picked from commit 2df9611) # Conflicts: # spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java
1 parent f901759 commit 0fe58fd

File tree

3 files changed

+109
-2
lines changed

3 files changed

+109
-2
lines changed

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/DefaultSftpSessionFactory.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.io.InputStream;
21+
import java.io.UncheckedIOException;
2122
import java.security.GeneralSecurityException;
2223
import java.security.KeyPair;
2324
import java.time.Duration;
@@ -296,7 +297,7 @@ public SftpSession getSession() {
296297
sftpClient = createSftpClient(initClientSession(), this.sftpVersionSelector, SftpErrorDataHandler.EMPTY);
297298
freshSftpClient = true;
298299
}
299-
sftpSession = new SftpSession(sftpClient);
300+
sftpSession = new SftpSession(sftpClient, this.isSharedSession);
300301
sftpSession.connect();
301302
if (this.isSharedSession && freshSftpClient) {
302303
this.sharedSftpClient = sftpClient;
@@ -428,6 +429,26 @@ public void destroy() throws Exception {
428429
if (this.isInnerClient && this.sshClient != null && this.sshClient.isStarted()) {
429430
this.sshClient.stop();
430431
}
432+
433+
SftpClient sharedSftpClientToClose = this.sharedSftpClient;
434+
if (sharedSftpClientToClose != null) {
435+
try {
436+
sharedSftpClientToClose.close();
437+
}
438+
catch (IOException ex) {
439+
throw new UncheckedIOException("failed to close an SFTP client", ex);
440+
}
441+
442+
try {
443+
ClientSession session = sharedSftpClientToClose.getSession();
444+
if (session != null && session.isOpen()) {
445+
session.close();
446+
}
447+
}
448+
catch (IOException ex) {
449+
throw new UncheckedIOException("failed to close an SFTP client (session)", ex);
450+
}
451+
}
431452
}
432453

433454
/**

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java

+18
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,23 @@ public class SftpSession implements Session<SftpClient.DirEntry> {
5555

5656
private final SftpClient sftpClient;
5757

58+
private final boolean isSharedClient;
59+
5860
public SftpSession(SftpClient sftpClient) {
61+
this(sftpClient, false);
62+
}
63+
64+
/**
65+
* Construct an instance based on a {@link SftpClient} and its {@code shared} status.
66+
* When {@code isSharedClient == true}, the {@link #close()} is void.
67+
* @param sftpClient the {@link SftpClient} to use.
68+
* @param isSharedClient whether the {@link SftpClient} is shared.
69+
* @since 6.3.9
70+
*/
71+
public SftpSession(SftpClient sftpClient, boolean isSharedClient) {
5972
Assert.notNull(sftpClient, "'sftpClient' must not be null");
6073
this.sftpClient = sftpClient;
74+
this.isSharedClient = isSharedClient;
6175
}
6276

6377
@Override
@@ -152,6 +166,10 @@ public void append(InputStream inputStream, String destination) throws IOExcepti
152166

153167
@Override
154168
public void close() {
169+
if (this.isSharedClient) {
170+
return;
171+
}
172+
155173
try {
156174
this.sftpClient.close();
157175
}

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java

+69-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2024 the original author or authors.
2+
* Copyright 2014-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,11 @@
2424
import java.util.ArrayList;
2525
import java.util.Collections;
2626
import java.util.List;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicInteger;
2732
import java.util.stream.IntStream;
2833

2934
import org.apache.sshd.client.SshClient;
@@ -35,6 +40,8 @@
3540
import org.apache.sshd.server.SshServer;
3641
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
3742
import org.apache.sshd.sftp.client.SftpClient;
43+
import org.apache.sshd.sftp.client.SftpErrorDataHandler;
44+
import org.apache.sshd.sftp.client.SftpVersionSelector;
3845
import org.apache.sshd.sftp.client.impl.AbstractSftpClient;
3946
import org.apache.sshd.sftp.server.SftpSubsystemFactory;
4047
import org.junit.jupiter.api.Test;
@@ -260,4 +267,65 @@ void clientSessionIsClosedOnSessionClose() throws Exception {
260267
sftpSessionFactory.destroy();
261268
}
262269
}
270+
271+
@Test
272+
void sharedSessionConcurrentAccess() throws Exception {
273+
try (SshServer server = SshServer.setUpDefaultServer()) {
274+
server.setPasswordAuthenticator((arg0, arg1, arg2) -> true);
275+
server.setPort(0);
276+
server.setKeyPairProvider(new SimpleGeneratorHostKeyProvider(new File("hostkey.ser").toPath()));
277+
server.setSubsystemFactories(Collections.singletonList(new SftpSubsystemFactory()));
278+
server.start();
279+
280+
AtomicInteger clientInstances = new AtomicInteger();
281+
282+
DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory(true) {
283+
284+
@Override
285+
protected SftpClient createSftpClient(ClientSession clientSession,
286+
SftpVersionSelector initialVersionSelector, SftpErrorDataHandler errorDataHandler)
287+
throws IOException {
288+
289+
clientInstances.incrementAndGet();
290+
return super.createSftpClient(clientSession, initialVersionSelector, errorDataHandler);
291+
}
292+
293+
};
294+
sftpSessionFactory.setHost("localhost");
295+
sftpSessionFactory.setPort(server.getPort());
296+
sftpSessionFactory.setUser("user");
297+
sftpSessionFactory.setPassword("pass");
298+
sftpSessionFactory.setAllowUnknownKeys(true);
299+
300+
ExecutorService executorService = Executors.newFixedThreadPool(10);
301+
302+
CountDownLatch executionLatch = new CountDownLatch(20);
303+
List<Exception> errors = Collections.synchronizedList(new ArrayList<>());
304+
305+
for (int i = 0; i < 20; i++) {
306+
executorService.execute(() -> {
307+
try (SftpSession session = sftpSessionFactory.getSession()) {
308+
session.list(".");
309+
}
310+
catch (Exception e) {
311+
errors.add(e);
312+
}
313+
executionLatch.countDown();
314+
});
315+
}
316+
317+
assertThat(executionLatch.await(10, TimeUnit.SECONDS)).isTrue();
318+
synchronized (errors) {
319+
assertThat(errors).isEmpty();
320+
}
321+
322+
assertThat(clientInstances).hasValue(1);
323+
324+
executorService.shutdown();
325+
assertThat(executorService.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
326+
327+
sftpSessionFactory.destroy();
328+
}
329+
}
330+
263331
}

0 commit comments

Comments
 (0)