[FEAT] support df.to_parquet and df.read_parquet() #165
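What this enables, roughly: once obstore's fsspec integration is registered for a protocol, pandas can write and read Parquet through it. A minimal sketch of the intended usage (the bucket name is made up, and store credentials/configuration are omitted; neither is part of this PR):

    import pandas as pd
    from obstore.fsspec import register

    register("s3")  # route s3:// paths through obstore's AsyncFsspecStore

    df = pd.DataFrame({"a": [1, 2, 3]})
    df.to_parquet("s3://my-bucket/test.parquet")          # buffered write via _open("wb")
    df2 = pd.read_parquet("s3://my-bucket/test.parquet")  # read via _open("rb")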

Merged: 24 commits into main from fsspec-open-write, Mar 3, 2025
(The diff below shows changes from 20 of the 24 commits.)

Commits (24)
2b5f6b5  feat: add write() for open() in fsspec (machichima, Jan 27, 2025)
361b30d  temp: upload with iterator (machichima, Jan 27, 2025)
e0ec01a  refactor: rename data_li to buffer (machichima, Jan 27, 2025)
d75f3e8  feat: buffered write in fsspec (machichima, Jan 29, 2025)
75d3734  fix: remove unused code (machichima, Jan 29, 2025)
f958ec8  Merge branch 'main' into fsspec-open-write (machichima, Jan 30, 2025)
b2f9d6f  fix: assert mode is either rb or wb (machichima, Jan 30, 2025)
07ae55d  fix: correctly detect file exist for read_parquet (machichima, Jan 30, 2025)
428a66d  run pre-commit (machichima, Jan 30, 2025)
bc4ffaa  feat: split bucket name from path in fsspec _open (machichima, Feb 2, 2025)
79e40a1  Update obstore/python/obstore/fsspec.py (kylebarron, Feb 5, 2025)
74dd9ed  fix: move incorrect mode exception into else (machichima, Feb 6, 2025)
8797944  fix: remove writer in init and add self.closed=True (machichima, Feb 6, 2025)
cf1856a  fix: self._writer not exist error in close (machichima, Feb 9, 2025)
ff5d6bd  fix: use info() in AbstractFileSystem (machichima, Feb 9, 2025)
fb3e6b2  Merge branch 'main' into fsspec-open-write (machichima, Mar 1, 2025)
19c2646  fix: typing and linting (machichima, Mar 1, 2025)
23febfb  feat: merge BufferedFileWrite/Read together (machichima, Mar 2, 2025)
ddcf6f9  test: for write to parquet (machichima, Mar 2, 2025)
27c62c2  docs: update docstring (machichima, Mar 2, 2025)
5947e8a  Use underlying reader/writer methods where possible (kylebarron, Mar 3, 2025)
7f229d0  Updated docs (kylebarron, Mar 3, 2025)
317e954  Override `loc` property (kylebarron, Mar 3, 2025)
0442a5f  loc setter to allow `__init__` (kylebarron, Mar 3, 2025)
86 changes: 72 additions & 14 deletions obstore/python/obstore/fsspec.py
@@ -46,7 +46,7 @@
 import fsspec.spec

 import obstore as obs
-from obstore import Bytes
+from obstore import open_reader, open_writer
 from obstore.store import from_url

 if TYPE_CHECKING:
@@ -220,6 +220,14 @@ def _construct_store(self, bucket: str) -> ObjectStore:
             retry_config=self.retry_config,
         )

+    def split_path(self, path: str) -> tuple[str, str]:
+        """Public method to split a path into bucket and path components."""
+        return self._split_path(path)
+
+    def construct_store(self, bucket: str) -> ObjectStore:
+        """Public method to construct a store for the given bucket."""
+        return self._construct_store(bucket)
+
     async def _rm_file(self, path: str, **_kwargs: Any) -> None:
         bucket, path = self._split_path(path)
         store = self._construct_store(bucket)
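(Not part of the diff: a sketch of how the two new public helpers might be called, assuming the protocol has been registered as in the tests below; the bucket and key are made up.)

    import fsspec
    from obstore.fsspec import register

    register("s3")
    fs = fsspec.filesystem("s3")
    bucket, key = fs.split_path("my-bucket/data/test.parquet")  # ("my-bucket", "data/test.parquet")
    store = fs.construct_store(bucket)  # ObjectStore scoped to that bucket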
@@ -462,13 +470,17 @@ def _open(
         autocommit: Any = True,  # noqa: ARG002
         cache_options: Any = None,  # noqa: ARG002
         **kwargs: Any,
-    ) -> BufferedFileSimple:
+    ) -> BufferedFile:
         """Return raw bytes-mode file-like from the file-system."""
-        return BufferedFileSimple(self, path, mode, **kwargs)
+        if mode in ("wb", "rb"):
+            return BufferedFile(self, path, mode, **kwargs)
+
+        err_msg = f"Only 'rb' and 'wb' modes are currently supported, got: {mode}"
+        raise ValueError(err_msg)


-class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
-    """Implementation of buffered file around `fsspec.spec.AbstractBufferedFile`."""
+class BufferedFile(fsspec.spec.AbstractBufferedFile):
+    """Read/Write buffered file wrapped around `fsspec.spec.AbstractBufferedFile`."""

     def __init__(
         self,
@@ -478,25 +490,71 @@ def __init__(
         **kwargs: Any,
     ) -> None:
         """Create a new buffered file."""
-        if mode != "rb":
-            raise ValueError("Only 'rb' mode is currently supported")
         super().__init__(fs, path, mode, **kwargs)

-    def read(self, length: int = -1) -> Any:
+        bucket, self.path = self.fs.split_path(path)
+        self.store = self.fs.construct_store(bucket)
+
+        self.mode = mode
+
+        if self.mode == "rb":
+            self._reader = open_reader(self.store, self.path)
+
+    def read(self, length: int = -1) -> Bytes:
         """Return bytes from the remote file.

         Args:
             length: if positive, read up to this many bytes; if negative, read all
                 remaining bytes.

         Returns:
             Data in bytes

         """
         if length < 0:
-            data = self.fs.cat_file(self.path, self.loc, self.size)
-            self.loc = self.size
-        else:
-            data = self.fs.cat_file(self.path, self.loc, self.loc + length)
-            self.loc += length
-        return data
+            length = self.size - self.loc
+
+        self._reader.seek(self.loc)

Inline review comment (repo member):
We shouldn't seek on every underlying read call if we don't need to. Seeking discards the internal buffer kept by Rust.
(Forgot to post this comment earlier)
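One possible follow-up for this concern (a sketch only, not code from this PR; the _reader_pos attribute is hypothetical): track the underlying reader's position in Python and seek only when it diverges from self.loc, so sequential reads keep the Rust-side buffer intact.

    # Hypothetical read() body: skip the seek on purely sequential reads.
    if getattr(self, "_reader_pos", None) != self.loc:
        self._reader.seek(self.loc)
    out = self._reader.read(length)
    self._reader_pos = self.loc + length
    self.loc += length
    return out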

+        out = self._reader.read(length)
+
+        self.loc += length
+
+        return out
+
+    def _initiate_upload(self) -> None:
+        """Called by AbstractBufferedFile.flush() on the first flush."""
+        self._writer = open_writer(self.store, self.path)
+
+    def _upload_chunk(self, final: bool = False) -> bool:
+        """Called every time fsspec flushes the write buffer.
+
+        Returns:
+            True if data was uploaded from the buffer, False otherwise.
+
+        """
+        if self.buffer and len(self.buffer.getbuffer()) > 0:
+            self.buffer.seek(0)
+            self._writer.write(self.buffer.read())
+            # flush all the buffered data when closing
+            if final:
+                self._writer.flush()
+            return True
+
+        return False
+
+    def close(self) -> None:
+        """Close the file, flushing any buffered data first."""
+        if self.closed:
+            return
+
+        try:
+            if self.mode == "rb":
+                self._reader.close()
+            else:
+                self.flush(force=True)
+                self._writer.close()
+        finally:
+            self.closed = True
 def register(protocol: str | Iterable[str], *, asynchronous: bool = False) -> None:
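Taken together, BufferedFile wires fsspec's buffered-file hooks to obstore: in "wb" mode fsspec accumulates writes in an in-memory buffer, calls _initiate_upload() on the first flush to open an obstore writer, then _upload_chunk() on each subsequent flush; close() pushes any remainder and closes the writer. A sketch of the resulting user-facing flow (bucket name and store configuration are assumptions):

    import fsspec
    from obstore.fsspec import register

    register("s3")
    fs = fsspec.filesystem("s3")

    with fs.open("my-bucket/blob.bin", "wb") as f:  # BufferedFile, write mode
        f.write(b"hello world")                     # buffered; uploaded on flush/close

    with fs.open("my-bucket/blob.bin", "rb") as f:  # BufferedFile, read mode
        assert f.read(5) == b"hello"                # served by obstore's open_reader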
26 changes: 24 additions & 2 deletions tests/test_fsspec.py
@@ -225,16 +225,38 @@ async def test_list_async(s3_store_config: S3Config):


 @pytest.mark.network
-def test_remote_parquet():
-    register("https")
+def test_remote_parquet(s3_store_config: S3Config):
+    register(["https", "s3"])
     fs = fsspec.filesystem("https")
+    fs_s3 = fsspec.filesystem(
+        "s3",
+        config=s3_store_config,
+        client_options={"allow_http": True},
+    )

     url = "github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"  # noqa: E501
     pq.read_metadata(url, filesystem=fs)

     # also test with full url
     url = "https://github.com/opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
     pq.read_metadata(url, filesystem=fs)

+    # Read the remote Parquet file into a PyArrow table
+    table = pq.read_table(url, filesystem=fs)
+    write_parquet_path = f"{TEST_BUCKET_NAME}/test.parquet"
+
+    # Write the table to s3
+    pq.write_table(table, write_parquet_path, filesystem=fs_s3)
+
+    out = fs_s3.ls(f"{TEST_BUCKET_NAME}", detail=False)
+    assert f"{TEST_BUCKET_NAME}/test.parquet" in out
+
+    # Read Parquet file from s3 and verify its contents
+    parquet_table = pq.read_table(write_parquet_path, filesystem=fs_s3)
+    assert parquet_table.equals(table), (
+        "Parquet file contents from s3 do not match the original file"
+    )


 def test_multi_file_ops(fs: AsyncFsspecStore):
     data = {
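The test drives the round trip through PyArrow directly; the pandas-level equivalent promised in the PR title would look roughly like this (a sketch: the bucket and storage options are hypothetical stand-ins for the s3_store_config fixture, relying on fsspec forwarding storage_options to the registered filesystem's constructor):

    import pandas as pd
    from obstore.fsspec import register

    register("s3")
    opts = {"client_options": {"allow_http": True}}  # plus endpoint/credentials as needed

    df = pd.DataFrame({"a": [1, 2, 3]})
    df.to_parquet("s3://example-bucket/test.parquet", storage_options=opts)
    assert pd.read_parquet("s3://example-bucket/test.parquet", storage_options=opts).equals(df)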