Skip to content

Commit d0d8e00

Browse files
committed
cleaned up
1 parent dd9c479 commit d0d8e00

2 files changed

Lines changed: 177 additions & 75 deletions

File tree

src/valor_lite/cache.py

Lines changed: 145 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class DataType(StrEnum):
2020
TIMESTAMP = "timestamp"
2121

2222
def to_py(self):
23+
"""Get python type."""
2324
match self:
2425
case DataType.INTEGER:
2526
return int
@@ -31,6 +32,7 @@ def to_py(self):
3132
return datetime
3233

3334
def to_arrow(self):
35+
"""Get arrow type."""
3436
match self:
3537
case DataType.INTEGER:
3638
return pa.int64()
@@ -69,10 +71,35 @@ def __init__(self, path: str | Path):
6971

7072
@property
7173
def path(self) -> Path:
74+
"""Cache directory path."""
7275
return self._path
7376

74-
@property
75-
def files(self) -> list[Path]:
77+
@staticmethod
78+
def _generate_config_path(path: str | Path) -> Path:
79+
"""
80+
Generate cache configuration path.
81+
82+
Parameters
83+
----------
84+
path : str | Path
85+
Where the cache is stored.
86+
87+
Returns
88+
-------
89+
Path
90+
Configuration filepath.
91+
"""
92+
return Path(path) / ".cfg"
93+
94+
def get_files(self) -> list[Path]:
95+
"""
96+
Retrieve all files.
97+
98+
Returns
99+
-------
100+
list[Path]
101+
A list of paths to files in the cache.
102+
"""
76103
if not self.path.exists():
77104
return []
78105
files = []
@@ -82,26 +109,21 @@ def files(self) -> list[Path]:
82109
files.append(Path(full_path))
83110
return files
84111

85-
@property
86-
def num_files(self) -> int:
87-
return len(self.files)
112+
def get_dataset_files(self) -> list[Path]:
113+
"""
114+
Retrieve all dataset files.
88115
89-
@property
90-
def dataset_files(self) -> list[Path]:
116+
Returns
117+
-------
118+
list[Path]
119+
A list of paths to dataset files in the cache.
120+
"""
91121
if not self.path.exists():
92122
return []
93123
return [
94124
Path(filepath) for filepath in glob.glob(f"{self._path}/*.parquet")
95125
]
96126

97-
@property
98-
def num_dataset_files(self) -> int:
99-
return len(self.dataset_files)
100-
101-
@staticmethod
102-
def _generate_config_path(path: str | Path) -> Path:
103-
return Path(path) / ".cfg"
104-
105127

106128
class CacheReader(CacheFiles):
107129
def __init__(
@@ -118,9 +140,17 @@ def __init__(
118140

119141
@classmethod
120142
def load(cls, path: str | Path):
121-
path = Path(path)
143+
"""
144+
Load cache from disk.
145+
146+
Parameters
147+
----------
148+
path : str | Path
149+
Where the cache is stored.
150+
"""
122151

123152
# validate path
153+
path = Path(path)
124154
if not path.exists():
125155
raise FileNotFoundError(f"Directory does not exist: {path}")
126156
elif not path.is_dir():
@@ -151,22 +181,27 @@ def _retrieve(config: dict, key: str):
151181

152182
@property
153183
def dataset(self) -> ds.Dataset:
184+
"""Cache dataset."""
154185
return ds.dataset(self._path, format="parquet")
155186

156187
@property
157188
def schema(self) -> pa.Schema:
189+
"""Cache schema."""
158190
return self.dataset.schema
159191

160192
@property
161193
def batch_size(self) -> int:
194+
"""Batch size for writes."""
162195
return self._batch_size
163196

164197
@property
165198
def rows_per_file(self) -> int:
199+
"""Target number of rows per file."""
166200
return self._rows_per_file
167201

168202
@property
169203
def compression(self) -> str:
204+
"""File compression method."""
170205
return self._compression
171206

172207

@@ -190,15 +225,60 @@ def __init__(
190225
self._buffer = []
191226
self._count = 0
192227

228+
@property
229+
def schema(self) -> pa.Schema:
230+
"""Cache schema."""
231+
return self._schema
232+
233+
@property
234+
def dataset(self) -> ds.Dataset:
235+
"""Cache dataset."""
236+
return ds.dataset(
237+
self._path,
238+
format="parquet",
239+
schema=self.schema,
240+
)
241+
242+
@property
243+
def batch_size(self) -> int:
244+
"""Batch size for writes."""
245+
return self._batch_size
246+
247+
@property
248+
def rows_per_file(self) -> int:
249+
"""Target number of rows per file."""
250+
return self._rows_per_file
251+
252+
@property
253+
def compression(self) -> str:
254+
"""File compression method."""
255+
return self._compression
256+
193257
@classmethod
194258
def create(
195259
cls,
196260
path: str | Path,
197261
schema: pa.Schema,
198-
batch_size: int = 1000,
199-
rows_per_file: int = 10000,
262+
batch_size: int = 1_000,
263+
rows_per_file: int = 10_000,
200264
compression: str = "snappy",
201265
):
266+
"""
267+
Create a cache.
268+
269+
Parameters
270+
----------
271+
path : str | Path
272+
Where to write the cache.
273+
schema : pa.Schema
274+
Cache schema.
275+
batch_size : int, default=1_000
276+
Target batch size when writing chunks.
277+
rows_per_file : int, default=10_000
278+
Target number of rows to store per file.
279+
compression : str, default="snappy"
280+
Compression method to use when storing on disk.
281+
"""
202282
Path(path).mkdir(parents=True, exist_ok=False)
203283
cfg_path = cls._generate_config_path(path)
204284
with open(cfg_path, "w") as f:
@@ -218,8 +298,15 @@ def create(
218298

219299
@classmethod
220300
def load(cls, path: str | Path):
301+
"""
302+
Load cache from disk.
303+
304+
Parameters
305+
----------
306+
path : str | Path
307+
Where the cache is stored.
308+
"""
221309
path = Path(path)
222-
# validate path
223310
if not path.exists():
224311
raise FileNotFoundError(f"Directory does not exist: {path}")
225312
elif not path.is_dir():
@@ -239,6 +326,14 @@ def load(cls, path: str | Path):
239326

240327
@classmethod
241328
def delete(cls, path: str | Path):
329+
"""
330+
Delete a cache from disk.
331+
332+
Parameters
333+
----------
334+
path : str | Path
335+
Where the cache is stored.
336+
"""
242337
path = Path(path)
243338
if not path.exists():
244339
return
@@ -248,7 +343,7 @@ def delete(cls, path: str | Path):
248343
if cfg_path.exists() and cfg_path.is_file():
249344
cfg_path.unlink()
250345
# delete parquet files
251-
for file in cache.dataset_files:
346+
for file in cache.get_dataset_files():
252347
if file.exists() and file.is_file() and file.suffix == ".parquet":
253348
file.unlink()
254349
# delete empty cache directory
@@ -258,6 +353,14 @@ def write_rows(
258353
self,
259354
rows: list[dict[str, Any]],
260355
):
356+
"""
357+
Write rows to cache.
358+
359+
Parameters
360+
----------
361+
rows : list[dict[str, Any]]
362+
A list of rows represented by dictionaries mapping fields to values.
363+
"""
261364
if not rows:
262365
return
263366
batch = pa.RecordBatch.from_pylist(rows, schema=self.schema)
@@ -267,6 +370,14 @@ def write_batch(
267370
self,
268371
batch: pa.RecordBatch | dict[str, list | np.ndarray | pa.Array],
269372
):
373+
"""
374+
Write a batch to cache.
375+
376+
Parameters
377+
----------
378+
batch : pa.RecordBatch | dict[str, list | np.ndarray | pa.Array]
379+
A batch of columnar data.
380+
"""
270381
if isinstance(batch, dict):
271382
batch = pa.RecordBatch.from_pydict(batch)
272383

@@ -303,10 +414,19 @@ def write_table(
303414
self,
304415
table: pa.Table,
305416
):
417+
"""
418+
Write a table directly to cache.
419+
420+
Parameters
421+
----------
422+
table : pa.Table
423+
A populated table.
424+
"""
306425
self.flush()
307-
pq.write_table(table, where=self._next_filename())
426+
pq.write_table(table, where=self._generate_next_filename())
308427

309428
def flush(self):
429+
"""Flush the cache buffer."""
310430
if self._buffer:
311431
combined_arrays = [
312432
pa.concat_arrays([b.column(name) for b in self._buffer])
@@ -322,8 +442,9 @@ def flush(self):
322442
self._count = 0
323443
self._close_writer()
324444

325-
def _next_filename(self) -> Path:
326-
files = self.dataset_files
445+
def _generate_next_filename(self) -> Path:
446+
"""Generates next dataset filepath."""
447+
files = self.get_dataset_files()
327448
if not files:
328449
next_index = 0
329450
else:
@@ -335,7 +456,7 @@ def _get_or_create_writer(self) -> pq.ParquetWriter:
335456
if self._writer is not None:
336457
return self._writer
337458
self._writer = pq.ParquetWriter(
338-
where=self._next_filename(),
459+
where=self._generate_next_filename(),
339460
schema=self.schema,
340461
compression=self.compression,
341462
)
@@ -354,27 +475,3 @@ def __enter__(self):
354475
def __exit__(self, exc_type, exc_val, exc_tb):
355476
"""Context manager exit - ensures data is flushed."""
356477
self.flush()
357-
358-
@property
359-
def schema(self) -> pa.Schema:
360-
return self._schema
361-
362-
@property
363-
def dataset(self) -> ds.Dataset:
364-
return ds.dataset(
365-
self._path,
366-
format="parquet",
367-
schema=self.schema,
368-
)
369-
370-
@property
371-
def batch_size(self) -> int:
372-
return self._batch_size
373-
374-
@property
375-
def rows_per_file(self) -> int:
376-
return self._rows_per_file
377-
378-
@property
379-
def compression(self) -> str:
380-
return self._compression

0 commit comments

Comments
 (0)