@@ -26,32 +26,34 @@ def __init__(
26
26
self .mode = mode
27
27
self .max_requests = max_requests or _MAX_SFTP_REQUESTS
28
28
29
- if block_size is None :
30
- # "The OpenSSH SFTP server will close the connection
31
- # if it receives a message larger than 256 KB, and
32
- # limits read requests to returning no more than
33
- # 64 KB."
34
- #
35
- # We are going to use the maximum block_size possible
36
- # with a 16KB margin (so instead of sending 256 KB data,
37
- # we'll send 240 KB + headers for write requests)
38
-
39
- if self .readable ():
40
- block_size = READ_BLOCK_SIZE
41
- else :
42
- block_size = WRITE_BLOCK_SIZE
43
-
44
29
# The blocksize is often used with constructs like
45
30
# shutil.copyfileobj(src, dst, length=file.blocksize) and since we are
46
31
# using pipelining, we are going to reflect the total size rather than
47
32
# a size of chunk to our limits.
48
- self .blocksize = block_size * self .max_requests
33
+ self .blocksize = None if block_size is None else block_size * self .max_requests
49
34
50
35
self .kwargs = kwargs
51
36
52
37
self ._file = sync (self .loop , self ._open_file )
53
38
self ._closed = False
54
39
40
+ def _determine_block_size (self , channel ):
41
+ # Use the asyncssh block sizes to ensure the best performance.
42
+ limits = getattr (channel , "limits" , None )
43
+ if limits :
44
+ return limits .max_read_len if self .readable () else limits .max_write_len
45
+
46
+ # "The OpenSSH SFTP server will close the connection
47
+ # if it receives a message larger than 256 KB, and
48
+ # limits read requests to returning no more than
49
+ # 64 KB."
50
+ #
51
+ # We are going to use the maximum block_size possible
52
+ # with a 16KB margin (so instead of sending 256 KB data,
53
+ # we'll send 240 KB + headers for write requests)
54
+ return READ_BLOCK_SIZE if self .readable () else WRITE_BLOCK_SIZE
55
+
56
+
55
57
@wrap_exceptions
56
58
async def _open_file (self ):
57
59
# TODO: this needs to keep a reference to the
@@ -61,6 +63,8 @@ async def _open_file(self):
61
63
# it's operations but the pool it thinking this
62
64
# channel is freed.
63
65
async with self .fs ._pool .get () as channel :
66
+ if self .blocksize is None :
67
+ self .blocksize = self ._determine_block_size (channel ) * self .max_requests
64
68
return await channel .open (
65
69
self .path ,
66
70
self .mode ,
0 commit comments