Skip to content

Commit 9e8367f

Browse files
[1.16] Sftp reconnections 1.16 (#4098)
Signed-off-by: Javier Aliaga <[email protected]>
1 parent c68a6ed commit 9e8367f

File tree

11 files changed

+670
-49
lines changed

11 files changed

+670
-49
lines changed

.build-tools/go.mod

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
module github.com/dapr/components-contrib/build-tools
22

3-
go 1.24.4
3+
go 1.24.9
4+
5+
toolchain go1.24.10
46

57
require (
68
github.com/dapr/components-contrib v0.0.0

bindings/sftp/client.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package sftp
15+
16+
import (
17+
"errors"
18+
"fmt"
19+
"os"
20+
"sync"
21+
"sync/atomic"
22+
23+
sftpClient "github.com/pkg/sftp"
24+
"golang.org/x/crypto/ssh"
25+
26+
"github.com/dapr/kit/logger"
27+
)
28+
29+
type Client struct {
30+
sshClient *ssh.Client
31+
sftpClient *sftpClient.Client
32+
address string
33+
config *ssh.ClientConfig
34+
lock sync.RWMutex
35+
needsReconnect atomic.Bool
36+
log logger.Logger
37+
}
38+
39+
func newClient(address string, config *ssh.ClientConfig, log logger.Logger) (*Client, error) {
40+
if address == "" || config == nil {
41+
return nil, errors.New("sftp binding error: client not initialized")
42+
}
43+
44+
sshClient, err := newSSHClient(address, config)
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
newSftpClient, err := sftpClient.NewClient(sshClient)
50+
if err != nil {
51+
_ = sshClient.Close()
52+
return nil, fmt.Errorf("sftp binding error: error create sftp client: %w", err)
53+
}
54+
55+
return &Client{
56+
sshClient: sshClient,
57+
sftpClient: newSftpClient,
58+
address: address,
59+
config: config,
60+
log: log,
61+
}, nil
62+
}
63+
64+
func (c *Client) Close() error {
65+
c.lock.Lock()
66+
defer c.lock.Unlock()
67+
68+
// Close SFTP first, then SSH
69+
var sftpErr, sshErr error
70+
if c.sftpClient != nil {
71+
sftpErr = c.sftpClient.Close()
72+
}
73+
if c.sshClient != nil {
74+
sshErr = c.sshClient.Close()
75+
}
76+
77+
// Return the first error encountered
78+
if sftpErr != nil {
79+
return sftpErr
80+
}
81+
return sshErr
82+
}
83+
84+
func (c *Client) list(path string) ([]os.FileInfo, error) {
85+
var fi []os.FileInfo
86+
87+
fn := func() error {
88+
var err error
89+
fi, err = c.sftpClient.ReadDir(path)
90+
return err
91+
}
92+
93+
err := c.withReconnection(fn)
94+
if err != nil {
95+
return nil, err
96+
}
97+
98+
return fi, nil
99+
}
100+
101+
func (c *Client) create(path string) (*sftpClient.File, string, error) {
102+
dir, fileName := sftpClient.Split(path)
103+
104+
var file *sftpClient.File
105+
106+
createFn := func() error {
107+
cErr := c.sftpClient.MkdirAll(dir)
108+
if cErr != nil {
109+
return cErr
110+
}
111+
112+
file, cErr = c.sftpClient.Create(path)
113+
if cErr != nil {
114+
return cErr
115+
}
116+
117+
return nil
118+
}
119+
120+
rErr := c.withReconnection(createFn)
121+
if rErr != nil {
122+
return nil, "", rErr
123+
}
124+
125+
return file, fileName, nil
126+
}
127+
128+
func (c *Client) get(path string) (*sftpClient.File, error) {
129+
var f *sftpClient.File
130+
131+
fn := func() error {
132+
var err error
133+
f, err = c.sftpClient.Open(path)
134+
return err
135+
}
136+
137+
err := c.withReconnection(fn)
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
return f, nil
143+
}
144+
145+
func (c *Client) delete(path string) error {
146+
fn := func() error {
147+
return c.sftpClient.Remove(path)
148+
}
149+
150+
err := c.withReconnection(fn)
151+
if err != nil {
152+
return err
153+
}
154+
155+
return nil
156+
}
157+
158+
func (c *Client) ping() error {
159+
_, err := c.sftpClient.Getwd()
160+
if err != nil {
161+
return err
162+
}
163+
return nil
164+
}
165+
166+
func (c *Client) withReconnection(fn func() error) error {
167+
err := c.do(fn)
168+
if !c.shouldReconnect(err) {
169+
return err
170+
}
171+
172+
c.log.Debugf("sftp binding error: %s", err)
173+
c.needsReconnect.Store(true)
174+
175+
rErr := c.doReconnect()
176+
if rErr != nil {
177+
c.log.Debugf("sftp binding error: reconnect failed: %s", rErr)
178+
return errors.Join(err, rErr)
179+
}
180+
c.log.Debugf("sftp binding: reconnected to %s", c.address)
181+
182+
c.log.Debugf("sftp binding: retrying operation")
183+
return c.do(fn)
184+
}
185+
186+
func (c *Client) do(fn func() error) error {
187+
c.lock.RLock()
188+
defer c.lock.RUnlock()
189+
return fn()
190+
}
191+
192+
func (c *Client) doReconnect() error {
193+
c.lock.Lock()
194+
defer c.lock.Unlock()
195+
196+
c.log.Debugf("sftp binding: reconnecting to %s", c.address)
197+
198+
if !c.needsReconnect.Load() {
199+
return nil
200+
}
201+
202+
pErr := c.ping()
203+
if pErr == nil {
204+
c.needsReconnect.Store(false)
205+
return nil
206+
}
207+
208+
sshClient, err := newSSHClient(c.address, c.config)
209+
if err != nil {
210+
return err
211+
}
212+
213+
newSftpClient, err := sftpClient.NewClient(sshClient)
214+
if err != nil {
215+
_ = sshClient.Close()
216+
return fmt.Errorf("sftp binding error: error create sftp client: %w", err)
217+
}
218+
219+
if c.sftpClient != nil {
220+
_ = c.sftpClient.Close()
221+
}
222+
if c.sshClient != nil {
223+
_ = c.sshClient.Close()
224+
}
225+
226+
c.sftpClient = newSftpClient
227+
c.sshClient = sshClient
228+
229+
c.needsReconnect.Store(false)
230+
return nil
231+
}
232+
233+
func newSSHClient(address string, config *ssh.ClientConfig) (*ssh.Client, error) {
234+
sshClient, err := ssh.Dial("tcp", address, config)
235+
if err != nil {
236+
return nil, fmt.Errorf("sftp binding error: error dialing ssh server: %w", err)
237+
}
238+
return sshClient, nil
239+
}
240+
241+
// shouldReconnect returns true if the error looks like a transport-level failure
242+
func (c *Client) shouldReconnect(err error) bool {
243+
if err == nil {
244+
return false
245+
}
246+
247+
// SFTP status errors that are logical, not connectivity (avoid reconnect)
248+
if errors.Is(err, sftpClient.ErrSSHFxPermissionDenied) ||
249+
errors.Is(err, sftpClient.ErrSSHFxNoSuchFile) ||
250+
errors.Is(err, sftpClient.ErrSSHFxOpUnsupported) {
251+
return false
252+
}
253+
254+
return true
255+
}

bindings/sftp/docker-compose.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
services:
2+
sftp:
3+
image:
4+
atmoz/sftp
5+
environment:
6+
- SFTP_USERS=foo:pass:1001:1001:upload
7+
volumes:
8+
- ./upload:/home/foo/upload
9+
ports:
10+
- "2222:22"
11+

bindings/sftp/sftp.go

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
1+
/*
2+
Copyright 2025 The Dapr Authors
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
114
package sftp
215

316
import (
@@ -25,9 +38,9 @@ const (
2538

2639
// Sftp is a binding for file operations on sftp server.
2740
type Sftp struct {
28-
metadata *sftpMetadata
29-
logger logger.Logger
30-
sftpClient *sftpClient.Client
41+
metadata *sftpMetadata
42+
logger logger.Logger
43+
c *Client
3144
}
3245

3346
// sftpMetadata defines the sftp metadata.
@@ -115,19 +128,12 @@ func (sftp *Sftp) Init(_ context.Context, metadata bindings.Metadata) error {
115128
HostKeyCallback: hostKeyCallback,
116129
}
117130

118-
sshClient, err := ssh.Dial("tcp", m.Address, config)
119-
if err != nil {
120-
return fmt.Errorf("sftp binding error: error create ssh client: %w", err)
121-
}
122-
123-
newSftpClient, err := sftpClient.NewClient(sshClient)
131+
sftp.metadata = m
132+
sftp.c, err = newClient(m.Address, config, sftp.logger)
124133
if err != nil {
125-
return fmt.Errorf("sftp binding error: error create sftp client: %w", err)
134+
return fmt.Errorf("sftp binding error: create sftp client error: %w", err)
126135
}
127136

128-
sftp.metadata = m
129-
sftp.sftpClient = newSftpClient
130-
131137
return nil
132138
}
133139

@@ -161,14 +167,9 @@ func (sftp *Sftp) create(_ context.Context, req *bindings.InvokeRequest) (*bindi
161167
return nil, fmt.Errorf("sftp binding error: %w", err)
162168
}
163169

164-
dir, fileName := sftpClient.Split(path)
170+
c := sftp.c
165171

166-
err = sftp.sftpClient.MkdirAll(dir)
167-
if err != nil {
168-
return nil, fmt.Errorf("sftp binding error: error create dir %s: %w", dir, err)
169-
}
170-
171-
file, err := sftp.sftpClient.Create(path)
172+
file, fileName, err := c.create(path)
172173
if err != nil {
173174
return nil, fmt.Errorf("sftp binding error: error create file %s: %w", path, err)
174175
}
@@ -211,7 +212,9 @@ func (sftp *Sftp) list(_ context.Context, req *bindings.InvokeRequest) (*binding
211212
return nil, fmt.Errorf("sftp binding error: %w", err)
212213
}
213214

214-
files, err := sftp.sftpClient.ReadDir(path)
215+
c := sftp.c
216+
217+
files, err := c.list(path)
215218
if err != nil {
216219
return nil, fmt.Errorf("sftp binding error: error read dir %s: %w", path, err)
217220
}
@@ -246,7 +249,9 @@ func (sftp *Sftp) get(_ context.Context, req *bindings.InvokeRequest) (*bindings
246249
return nil, fmt.Errorf("sftp binding error: %w", err)
247250
}
248251

249-
file, err := sftp.sftpClient.Open(path)
252+
c := sftp.c
253+
254+
file, err := c.get(path)
250255
if err != nil {
251256
return nil, fmt.Errorf("sftp binding error: error open file %s: %w", path, err)
252257
}
@@ -272,7 +277,9 @@ func (sftp *Sftp) delete(_ context.Context, req *bindings.InvokeRequest) (*bindi
272277
return nil, fmt.Errorf("sftp binding error: %w", err)
273278
}
274279

275-
err = sftp.sftpClient.Remove(path)
280+
c := sftp.c
281+
282+
err = c.delete(path)
276283
if err != nil {
277284
return nil, fmt.Errorf("sftp binding error: error remove file %s: %w", path, err)
278285
}
@@ -296,7 +303,7 @@ func (sftp *Sftp) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bin
296303
}
297304

298305
func (sftp *Sftp) Close() error {
299-
return sftp.sftpClient.Close()
306+
return sftp.c.Close()
300307
}
301308

302309
func (metadata sftpMetadata) getPath(requestMetadata map[string]string) (path string, err error) {

0 commit comments

Comments
 (0)