@@ -26,32 +26,37 @@ def __init__(
26
26
self .mode = mode
27
27
self .max_requests = max_requests or _MAX_SFTP_REQUESTS
28
28
29
- if block_size is None :
30
- # "The OpenSSH SFTP server will close the connection
31
- # if it receives a message larger than 256 KB, and
32
- # limits read requests to returning no more than
33
- # 64 KB."
34
- #
35
- # We are going to use the maximum block_size possible
36
- # with a 16KB margin (so instead of sending 256 KB data,
37
- # we'll send 240 KB + headers for write requests)
38
-
39
- if self .readable ():
40
- block_size = READ_BLOCK_SIZE
41
- else :
42
- block_size = WRITE_BLOCK_SIZE
43
-
44
29
# The blocksize is often used with constructs like
45
30
# shutil.copyfileobj(src, dst, length=file.blocksize) and since we are
46
31
# using pipelining, we are going to reflect the total size rather than
47
32
# a size of chunk to our limits.
48
- self .blocksize = block_size * self .max_requests
33
+ self .blocksize = (
34
+ None if block_size is None else block_size * self .max_requests
35
+ )
49
36
50
37
self .kwargs = kwargs
51
38
52
39
self ._file = sync (self .loop , self ._open_file )
53
40
self ._closed = False
54
41
42
+ def _determine_block_size (self , channel ):
43
+ # Use the asyncssh block sizes to ensure the best performance.
44
+ limits = getattr (channel , "limits" , None )
45
+ if limits :
46
+ if self .readable ():
47
+ return limits .max_read_len
48
+ return limits .max_write_len
49
+
50
+ # "The OpenSSH SFTP server will close the connection
51
+ # if it receives a message larger than 256 KB, and
52
+ # limits read requests to returning no more than
53
+ # 64 KB."
54
+ #
55
+ # We are going to use the maximum block_size possible
56
+ # with a 16KB margin (so instead of sending 256 KB data,
57
+ # we'll send 240 KB + headers for write requests)
58
+ return READ_BLOCK_SIZE if self .readable () else WRITE_BLOCK_SIZE
59
+
55
60
@wrap_exceptions
56
61
async def _open_file (self ):
57
62
# TODO: this needs to keep a reference to the
@@ -61,6 +66,10 @@ async def _open_file(self):
61
66
# it's operations but the pool it thinking this
62
67
# channel is freed.
63
68
async with self .fs ._pool .get () as channel :
69
+ if self .blocksize is None :
70
+ self .blocksize = (
71
+ self ._determine_block_size (channel ) * self .max_requests
72
+ )
64
73
return await channel .open (
65
74
self .path ,
66
75
self .mode ,
0 commit comments