Commit f5988bb

[FIX] Error in uploading new file with put (#332)
* fix: handle fsspec._info for directory in s3
* test: for put, info, and isdir sync and async
* docs: add ref link

Co-authored-by: Kyle Barron <[email protected]>
1 parent b4e8d97 commit f5988bb

2 files changed: +127, -12 lines

obstore/python/obstore/fsspec.py (+21, -9)
@@ -160,7 +160,7 @@ def __init__( # noqa: PLR0913
         self,
         protocol: SUPPORTED_PROTOCOLS_T | str | None = None,
         *args: Any,
-        config: (S3Config | GCSConfig | AzureConfig | None) = None,
+        config: S3Config | GCSConfig | AzureConfig | None = None,
         client_options: ClientConfig | None = None,
         retry_config: RetryConfig | None = None,
         asynchronous: bool = False,
@@ -480,9 +480,19 @@ async def _info(self, path: str, **_kwargs: Any) -> dict[str, Any]:
                 "version": head["version"],
             }
         except FileNotFoundError:
-            # use info in fsspec.AbstractFileSystem
-            loop = asyncio.get_running_loop()
-            return await loop.run_in_executor(None, super().info, path, **_kwargs)
+            pass
+
+        # Ref: https://github.com/fsspec/s3fs/blob/01b9c4b838b81375093ae1d78562edf6bdc616ea/s3fs/core.py#L1471-L1492
+        # We check to see if the path is a directory by attempting to list its
+        # contents. If anything is found, it is indeed a directory.
+        out = await self._ls(path, detail=True)
+        if len(out) > 0:
+            return {
+                "name": f"{bucket}/{path_no_bucket}",
+                "type": "directory",
+                "size": 0,
+            }
+        raise FileNotFoundError(path)

     @overload
     async def _ls(
@@ -785,11 +795,13 @@ def loc(self, value: int) -> None:


 def register(
-    protocol: SUPPORTED_PROTOCOLS_T
-    | str
-    | Iterable[SUPPORTED_PROTOCOLS_T]
-    | Iterable[str]
-    | None = None,
+    protocol: (
+        SUPPORTED_PROTOCOLS_T
+        | str
+        | Iterable[SUPPORTED_PROTOCOLS_T]
+        | Iterable[str]
+        | None
+    ) = None,
     *,
     asynchronous: bool = False,
 ) -> None:
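
For readers skimming the fsspec.py hunk above: the new fallback treats a path as a directory whenever listing it returns at least one entry, since object stores such as S3 have no real directory objects. Below is a minimal, self-contained sketch of that probe pattern, not obstore's actual implementation; the `head` and `ls` callables stand in for the store's metadata-lookup and listing operations and are assumptions for illustration.

import asyncio
from typing import Any, Awaitable, Callable


async def info_with_directory_fallback(
    head: Callable[[str], Awaitable[dict[str, Any]]],  # raises FileNotFoundError for missing keys
    ls: Callable[[str], Awaitable[list[dict[str, Any]]]],
    path: str,
) -> dict[str, Any]:
    """Illustrative only: resolve info() for a store without real directories."""
    try:
        # A real object exists at this key: return its metadata directly.
        return await head(path)
    except FileNotFoundError:
        pass

    # No object at the key: if listing the prefix yields anything, the path
    # behaves like a directory (the same idea as the s3fs reference above).
    entries = await ls(path)
    if entries:
        return {"name": path, "type": "directory", "size": 0}
    raise FileNotFoundError(path)


# Tiny in-memory demo of the two fallback outcomes.
async def _demo() -> None:
    objects = {"bucket/dir/afile": {"name": "bucket/dir/afile", "type": "file", "size": 4}}

    async def head(path: str) -> dict[str, Any]:
        if path in objects:
            return objects[path]
        raise FileNotFoundError(path)

    async def ls(path: str) -> list[dict[str, Any]]:
        prefix = path.rstrip("/") + "/"
        return [meta for key, meta in objects.items() if key.startswith(prefix)]

    print(await info_with_directory_fallback(head, ls, "bucket/dir"))        # directory entry
    print(await info_with_directory_fallback(head, ls, "bucket/dir/afile"))  # file entry


if __name__ == "__main__":
    asyncio.run(_demo())

As in the diff, FileNotFoundError is raised only after both the metadata lookup and the listing probe come up empty.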

tests/test_fsspec.py (+106, -3)
@@ -14,6 +14,8 @@
 from tests.conftest import TEST_BUCKET_NAME

 if TYPE_CHECKING:
+    from pathlib import Path
+
     from obstore.store import S3Config


@@ -227,6 +229,107 @@ async def test_list_async(s3_store_config: S3Config):
     assert out[1]["type"] == "directory"


+def test_info(fs: FsspecStore):
+    fs.pipe_file(f"{TEST_BUCKET_NAME}/dir/afile", b"data")
+
+    # info for a directory
+    out = fs.info(f"{TEST_BUCKET_NAME}/dir")
+    assert out == {
+        "name": f"{TEST_BUCKET_NAME}/dir",
+        "type": "directory",
+        "size": 0,
+    }
+
+    # info for a file that does not exist
+    with pytest.raises(FileNotFoundError):
+        fs.info(f"{TEST_BUCKET_NAME}/dir/bfile")
+
+    # info for a directory that does not exist
+    with pytest.raises(FileNotFoundError):
+        fs.info(f"{TEST_BUCKET_NAME}/dir_1/")
+
+    # also test with isdir
+    assert fs.isdir(f"{TEST_BUCKET_NAME}/dir")
+    assert not fs.isdir(f"{TEST_BUCKET_NAME}/dir/afile")
+    assert not fs.isdir(f"{TEST_BUCKET_NAME}/dir/bfile")
+    assert not fs.isdir(f"{TEST_BUCKET_NAME}/dir_1/")
+
+
+@pytest.mark.asyncio
+async def test_info_async(fs: FsspecStore):
+    await fs._pipe_file(f"{TEST_BUCKET_NAME}/dir/afile", b"data")
+
+    # info for a directory
+    out = await fs._info(f"{TEST_BUCKET_NAME}/dir")
+    assert out == {
+        "name": f"{TEST_BUCKET_NAME}/dir",
+        "type": "directory",
+        "size": 0,
+    }
+
+    # info for a file that does not exist
+    with pytest.raises(FileNotFoundError):
+        await fs._info(f"{TEST_BUCKET_NAME}/dir/bfile")
+
+    # info for a directory that does not exist
+    with pytest.raises(FileNotFoundError):
+        await fs._info(f"{TEST_BUCKET_NAME}/dir_1/")
+
+    # also test with isdir
+    assert await fs._isdir(f"{TEST_BUCKET_NAME}/dir")
+    assert not await fs._isdir(f"{TEST_BUCKET_NAME}/dir/afile")
+    assert not await fs._isdir(f"{TEST_BUCKET_NAME}/dir/bfile")
+    assert not await fs._isdir(f"{TEST_BUCKET_NAME}/dir_1/")
+
+
+def test_put_files(fs: FsspecStore, tmp_path: Path):
+    """Test putting a new file to S3 synchronously."""
+    test_data = "Hello, World!"
+    local_file_path = tmp_path / "test_file.txt"
+    local_file_path.write_text(test_data)
+
+    assert local_file_path.read_text() == test_data
+    remote_file_path = f"{TEST_BUCKET_NAME}/uploaded_test_file.txt"
+
+    fs.put(str(local_file_path), remote_file_path)
+
+    # Verify file upload
+    assert remote_file_path in fs.ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert fs.cat(remote_file_path)[remote_file_path] == test_data.encode()
+
+    # Clean up remote file
+    fs.rm(remote_file_path)
+
+
+@pytest.mark.asyncio
+async def test_put_files_async(s3_store_config: S3Config, tmp_path: Path):
+    """Test putting a new file to S3 asynchronously."""
+    register("s3")
+    fs = fsspec.filesystem(
+        "s3",
+        config=s3_store_config,
+        client_options={"allow_http": True},
+        asynchronous=True,
+    )
+
+    test_data = "Hello, World!"
+    local_file_path = tmp_path / "test_file.txt"
+    local_file_path.write_text(test_data)
+
+    assert local_file_path.read_text() == test_data
+    remote_file_path = f"{TEST_BUCKET_NAME}/uploaded_test_file.txt"
+
+    await fs._put(str(local_file_path), remote_file_path)
+
+    # Verify file upload
+    assert remote_file_path in await fs._ls(f"{TEST_BUCKET_NAME}", detail=False)
+    out = await fs._cat([remote_file_path])
+    assert out[remote_file_path] == test_data.encode()
+
+    # Clean up remote file
+    await fs._rm(remote_file_path)
+
+
 @pytest.mark.network
 def test_remote_parquet(s3_store_config: S3Config):
     register(["https", "s3"])
@@ -256,9 +359,9 @@ def test_remote_parquet(s3_store_config: S3Config):

     # Read Parquet file from s3 and verify its contents
     parquet_table = pq.read_table(write_parquet_path, filesystem=fs_s3)
-    assert parquet_table.equals(table), (
-        "Parquet file contents from s3 do not match the original file"
-    )
+    assert parquet_table.equals(
+        table,
+    ), "Parquet file contents from s3 do not match the original file"


 def test_multi_file_ops(fs: FsspecStore):
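
For context, a rough end-user sketch of the code path the new put tests exercise, mirroring the calls in test_put_files_async above. The bucket name, config values, and local path are placeholders, not part of this commit, and the config mapping is assumed to carry the credentials and endpoint just as the S3Config fixture does in the tests.

import fsspec

from obstore.fsspec import register

# Register the obstore-backed implementation for the "s3" protocol (as the tests do).
register("s3")

# Placeholder configuration; the tests pass an S3Config fixture here instead.
fs = fsspec.filesystem(
    "s3",
    config={"endpoint": "http://localhost:9000"},  # placeholder values
    client_options={"allow_http": True},
)

# Before this commit, put() could fail while _info resolved the target's parent
# "directory"; with the listing-based fallback the upload goes through and the
# prefix is reported as a directory once it has contents.
fs.put("test_file.txt", "my-bucket/uploaded_test_file.txt")  # placeholder paths
print(fs.info("my-bucket/uploaded_test_file.txt"))  # file metadata
print(fs.isdir("my-bucket"))                        # True once the bucket has contents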
