Skip to content

Commit b45f02a

Browse files
authored
Merge pull request #72 from RRosio/transfer_tests
Update upload and download tests
2 parents 676c1a3 + fd74761 commit b45f02a

File tree

4 files changed

+138
-250
lines changed

4 files changed

+138
-250
lines changed

conftest.py

Lines changed: 47 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
from moto.moto_server.threaded_moto_server import ThreadedMotoServer
66
from jupyter_fsspec.file_manager import FileSystemManager
77

8+
PORT = 5555
9+
ENDPOINT_URI = "http://127.0.0.1:%s/" % PORT
10+
11+
812
pytest_plugins = [
913
"pytest_jupyter.jupyter_server",
1014
"jupyter_server.pytest_plugin",
@@ -16,28 +20,39 @@
1620
def setup_tmp_local(tmp_path: Path):
1721
local_root = tmp_path / "test"
1822
local_root.mkdir()
23+
nested_dir = local_root / "nested"
24+
nested_dir.mkdir()
25+
nested_file1 = nested_dir / ".empty"
26+
nested_file1.touch()
27+
nested_file2 = nested_dir / ".keep"
28+
nested_file2.touch()
1929
local_file = local_root / "file_loc.txt"
2030
local_file.touch()
31+
2132
local_empty_root = tmp_path / "empty"
2233
local_empty_root.mkdir()
2334

24-
return [local_root, local_empty_root]
35+
yield [local_root, local_empty_root]
2536

2637

2738
@pytest.fixture(scope="function", autouse=True)
2839
def setup_config_file_fs(tmp_path: Path, setup_tmp_local):
2940
tmp_local = setup_tmp_local[0]
41+
items_tmp_local = list(tmp_local.iterdir())
42+
print(f"items_tmp_local: {items_tmp_local}")
3043
empty_tmp_local = setup_tmp_local[1]
3144
config_dir = tmp_path / "config"
3245
config_dir.mkdir(exist_ok=True)
3346

3447
yaml_content = f"""sources:
3548
- name: "TestSourceAWS"
3649
path: "s3://my-test-bucket/"
37-
additional_options:
50+
kwargs:
3851
anon: false
3952
key: "my-access-key"
4053
secret: "my-secret-key"
54+
client_kwargs:
55+
endpoint_url: "{ENDPOINT_URI}"
4156
- name: "TestDir"
4257
path: "{tmp_local}"
4358
protocol: "local"
@@ -57,11 +72,11 @@ def setup_config_file_fs(tmp_path: Path, setup_tmp_local):
5772
print(f"Patching jupyter_config_dir to: {config_dir}")
5873
fs_manager = FileSystemManager(config_file="jupyter-fsspec.yaml")
5974

60-
return fs_manager
75+
yield fs_manager
6176

6277

6378
@pytest.fixture(scope="function")
64-
def fs_manager_instance(setup_config_file_fs):
79+
def fs_manager_instance(setup_config_file_fs, s3_client):
6580
fs_manager = setup_config_file_fs
6681
fs_info = fs_manager.get_filesystem_by_protocol("memory")
6782
mem_fs = fs_info["info"]["instance"]
@@ -88,8 +103,7 @@ def fs_manager_instance(setup_config_file_fs):
88103
print("In memory filesystem NOT FOUND")
89104

90105
if mem_fs.exists(f"{mem_root_path}/test_dir/file1.txt"):
91-
file_info = mem_fs.info(f"{mem_root_path}/test_dir/file1.txt")
92-
print(f"File exists. size: {file_info}")
106+
mem_fs.info(f"{mem_root_path}/test_dir/file1.txt")
93107
else:
94108
print("File does not exist!")
95109
return fs_manager
@@ -98,120 +112,45 @@ def fs_manager_instance(setup_config_file_fs):
98112
def get_boto3_client():
99113
from botocore.session import Session
100114

101-
# NB: we use the sync botocore client for setup
102115
session = Session()
116+
return session.create_client(
117+
"s3",
118+
endpoint_url=ENDPOINT_URI,
119+
aws_access_key_id="my-access-key",
120+
aws_secret_access_key="my-secret-key",
121+
)
103122

104-
endpoint_uri = "http://127.0.0.1:%s/" % "5555"
105-
return session.create_client("s3", endpoint_url=endpoint_uri)
106-
107-
108-
@pytest.fixture(scope="function")
109-
def s3_client(mock_s3_fs):
110-
s3_client = get_boto3_client()
111-
s3_client.create_bucket(Bucket="my-test-bucket")
112-
return s3_client
113-
114-
115-
@pytest.fixture(scope="function")
116-
def s3_fs_manager_instance(setup_config_file_fs):
117-
fs_manager = setup_config_file_fs
118-
# fs_info = fs_manager.get_filesystem_by_protocol("s3")
119-
# key = fs_info["key"]
120-
# fs = fs_info["info"]["instance"]
121-
# root_path = fs_info["info"]["path"]
122-
123-
# endpoint_uri = "http://127.0.0.1:%s/" % "5555"
124-
# fs = fsspec.filesystem('s3', asynchronous=True, anon=False, client_kwargs={'endpoint_url': endpoint_uri})
125-
return fs_manager
126-
127-
128-
@pytest.fixture(params=["memory", "local", "s3"])
129-
def filesystem_protocol(request):
130-
return request.param
131123

132-
133-
@pytest.fixture(scope="function")
134-
def populated_file_system(filesystem_protocol):
135-
fs_manager = FileSystemManager(config_file="jupyter-fsspec.yaml")
136-
fs_protocol = filesystem_protocol
137-
fs_info = fs_manager.get_filesystem_by_protocol(fs_protocol)
138-
fs = fs_info["info"]["instance"]
139-
140-
if fs:
141-
# Delete any existting directories
142-
# Populate the filesystem
143-
# mkdir => root_path + 'rootA'
144-
# mkdir => root_path + 'rootB'
145-
# touch => root_path + 'file1.txt'
146-
# touch => root_path + 'rootA' + 'file_in_rootA.txt'
147-
print(f"valid filesystem: {fs}")
148-
else:
149-
print(f"invalid filesystem: {fs}")
150-
return {"fs_protocol": fs_protocol, "fs_manager": fs_manager}
151-
152-
153-
# TODO: Update this fixture from s3fs
154-
@pytest.fixture(scope="function")
155-
def mock_s3_fs():
156-
# This fixture is module-scoped, meaning that we can re-use the MotoServer across all tests
157-
server = ThreadedMotoServer(ip_address="127.0.0.1", port=5555)
124+
@pytest.fixture(scope="module")
125+
def s3_base():
126+
server = ThreadedMotoServer(ip_address="127.0.0.1", port=PORT)
158127
server.start()
159128
if "AWS_SECRET_ACCESS_KEY" not in os.environ:
160-
os.environ["AWS_SECRET_ACCESS_KEY"] = "foo"
129+
os.environ["AWS_SECRET_ACCESS_KEY"] = "my-accesss-key"
161130
if "AWS_ACCESS_KEY_ID" not in os.environ:
162-
os.environ["AWS_ACCESS_KEY_ID"] = "foo"
163-
# aws_session_token=os.environ["AWS_SESSION_TOKEN"]
164-
if "AWS_SESSION_TOKEN" not in os.environ:
165-
os.environ["AWS_SESSION_TOKEN"] = "foo"
131+
os.environ["AWS_ACCESS_KEY_ID"] = "my-secret-key"
132+
166133
print("server up")
167134
yield
168135
print("moto done")
169136
server.stop()
170137

171138

172-
@pytest.fixture(scope="function")
173-
def fs_manager_instance_parameterized(populated_file_system):
174-
fs_ret = populated_file_system
175-
fs_protocol = fs_ret["fs_protocol"]
176-
fs_manager = fs_ret["fs_manager"]
177-
fs_info = fs_manager.get_filesystem_by_protocol(fs_protocol)
178-
fs = fs_info["info"]["instance"]
179-
root_path = fs_info["info"]["path"]
180-
181-
# fs_info = fs_manager.get_filesystem_by_protocol('local')
182-
# key = fs_info['key']
183-
# fs = fs_info['info']['instance']
184-
# local_root_path = fs_info['info']['path']
185-
186-
if fs:
187-
# TODO: Update file creation FOR PATHS!!!
188-
if fs.exists(f"{root_path}/test_dir"):
189-
print(f"{root_path}/test_dir EXISTS!!!!")
190-
# fs.rm(f'{root_path}/test_dir', recursive=True)
191-
if fs.exists(f"{root_path}/second_dir"):
192-
print(f"{root_path}/second_dir EXISTS!!!!")
193-
# fs.rm('/my_dir/second_dir', recursive=True)
194-
195-
fs.touch(f"{root_path}/file_in_root.txt")
196-
with fs.open(f"{root_path}/file_in_root.txt", "wb") as f:
197-
f.write("Root file content".encode())
198-
199-
# fs.mkdir('/my_dir/test_dir', exist_ok=True)
200-
# fs.mkdir('/my_dir/second_dir', exist_ok=True)
201-
# # fs.mkdir('/my_dir/second_dir/subdir', exist_ok=True)
202-
# fs.touch('/my_dir/test_dir/file1.txt')
203-
# with fs.open('/my_dir/test_dir/file1.txt', "wb") as f:
204-
# f.write("Test content".encode())
205-
# f.close()
206-
else:
207-
print(f"Filesystem of protocol {fs_protocol} NOT FOUND")
208-
209-
if fs.exists(f"{root_path}test_dir/file1.txt"):
210-
file_info = fs.info(f"/{root_path}/test_dir/file1.txt")
211-
print(f"File exists. size: {file_info}")
212-
else:
213-
print("File does not exist!")
214-
return fs_manager
139+
@pytest.fixture(scope="module")
140+
def s3_client(s3_base):
141+
client = get_boto3_client()
142+
client.create_bucket(Bucket="my-test-bucket", ACL="public-read")
143+
client.put_object(
144+
Body=b"Hello, World1!", Bucket="my-test-bucket", Key="bucket-filename1.txt"
145+
)
146+
client.put_object(
147+
Body=b"Hello, World2!", Bucket="my-test-bucket", Key="some/bucket-filename2.txt"
148+
)
149+
client.put_object(
150+
Body=b"Hello, World3!", Bucket="my-test-bucket", Key="some/bucket-filename3.txt"
151+
)
152+
153+
yield client
215154

216155

217156
@pytest.fixture

jupyter_fsspec/file_manager.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,8 @@ def get_filesystem_by_protocol(self, fs_protocol):
211211
if fs_info.get("protocol") == fs_protocol:
212212
return {"key": encoded_key, "info": fs_info}
213213
return None
214+
215+
def get_filesystem_protocol(self, key):
216+
filesystem_rep = self.filesystems.get(key)
217+
print(f"filesystem_rep: {filesystem_rep}")
218+
return filesystem_rep["protocol"] + "://"

jupyter_fsspec/handlers.py

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
from .file_manager import FileSystemManager
12
from jupyter_server.base.handlers import APIHandler
23
from jupyter_server.utils import url_path_join
4+
from .utils import parse_range
35
import tornado
46
import json
57
import traceback
8+
import logging
69

7-
from .file_manager import FileSystemManager
8-
from .utils import parse_range
10+
logging.basicConfig(level=logging.INFO)
11+
logger = logging.getLogger(__name__)
912

1013

1114
class BaseFileSystemHandler(APIHandler):
@@ -155,7 +158,7 @@ def initialize(self, fs_manager):
155158
self.fs_manager = fs_manager
156159

157160
# POST /jupyter_fsspec/files/action?key=my-key&item_path=/some_directory/file.txt
158-
def post(self):
161+
async def post(self):
159162
"""Upload/Download the resource at the input path to destination path.
160163
161164
:param [key]: [Query arg string used to retrieve the appropriate filesystem instance]
@@ -166,32 +169,39 @@ def post(self):
166169
:return: dict with a status, description and (optionally) error
167170
:rtype: dict
168171
"""
169-
key = self.get_argument("key")
170172
request_data = json.loads(self.request.body.decode("utf-8"))
171-
req_item_path = request_data.get("item_path")
172173
action = request_data.get("action")
173-
destination = request_data.get("content")
174-
# dest_fs_key = request_data.get('destination_key')
175-
# dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key)
176-
# dest_path = dest_fs_info["path"]
174+
local_path = request_data.get("local_path")
175+
remote_path = request_data.get("remote_path")
176+
dest_fs_key = request_data.get("destination_key")
177+
dest_fs_info = self.fs_manager.get_filesystem(dest_fs_key)
178+
dest_path = dest_fs_info["canonical_path"]
179+
# if destination is subfolder, need to parse canonical_path for protocol?
177180

178181
response = {"content": None}
179182

180-
fs, item_path = self.validate_fs("post", key, req_item_path)
183+
fs, dest_path = self.validate_fs("post", dest_fs_key, dest_path)
181184
fs_instance = fs["instance"]
185+
print(f"fs_instance: {fs_instance}")
182186

183187
try:
184188
if action == "upload":
185-
# upload fs_instance.put(local_path, remote_path)
186-
local_path = item_path
187-
remote_path = destination
188-
fs_instance.put(local_path, remote_path, recursive=True)
189+
# upload remote.put(local_path, remote_path)
190+
logger.debug("Upload file")
191+
protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key)
192+
if protocol not in remote_path:
193+
remote_path = protocol + remote_path
194+
# TODO: handle creating directories? current: flat item upload
195+
# remote_path = remote_path (root) + 'nested/'
196+
await fs_instance._put(local_path, remote_path, recursive=True)
189197
response["description"] = f"Uploaded {local_path} to {remote_path}."
190-
else: # download
191-
# download fs_instance.get(remote_path, local_path)
192-
local_path = destination
193-
remote_path = item_path
194-
fs_instance.get(remote_path, local_path, recursive=True)
198+
else:
199+
# download remote.get(remote_path, local_path)
200+
logger.debug("Download file")
201+
protocol = self.fs_manager.get_filesystem_protocol(dest_fs_key)
202+
if protocol not in remote_path:
203+
remote_path = protocol + remote_path
204+
await fs_instance._get(remote_path, local_path, recursive=True)
195205
response["description"] = f"Downloaded {remote_path} to {local_path}."
196206

197207
response["status"] = "success"

0 commit comments

Comments
 (0)