Skip to content

Commit be71fef

Browse files
committed
checkout cache src
1 parent 75d3c19 commit be71fef

11 files changed

Lines changed: 543 additions & 8 deletions

File tree

.github/workflows/benchmarks.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
- uses: actions/checkout@v3
1616
- uses: actions/setup-python@v4
1717
with:
18-
python-version: "3.10"
18+
python-version: "3.11"
1919
- name: install lite
2020
run: pip install -e .[dev]
2121
working-directory: ./src
@@ -41,7 +41,7 @@ jobs:
4141
- uses: actions/checkout@v3
4242
- uses: actions/setup-python@v4
4343
with:
44-
python-version: "3.10"
44+
python-version: "3.11"
4545
- name: install lite
4646
run: pip install -e .[dev]
4747
working-directory: ./src

.github/workflows/build-and-publish.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ jobs:
1515
- uses: actions/checkout@v3
1616
- uses: actions/setup-python@v4
1717
with:
18-
python-version: "3.10"
18+
python-version: "3.11"
1919
- name: Build wheel
2020
run: pip install build && python -m build
2121
- name: Publish to PyPI

.github/workflows/check-pre-commit.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ jobs:
1111
- uses: actions/checkout@v2
1212
- uses: actions/setup-python@v2
1313
with:
14-
python-version: "3.10"
14+
python-version: "3.11"
1515
- name: Install pre-commit
1616
run: pip install pre-commit && pre-commit install
1717
- name: Run pre-commit. This will fail if pre-commit hooks fail.

.github/workflows/publish-docs.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ jobs:
1818
fetch-depth: 0
1919
- uses: actions/setup-python@v4
2020
with:
21-
python-version: "3.10"
21+
python-version: "3.11"
2222
- name: install python dependencies
2323
run: pip install "src/[test, docs]"
2424
- name: deploy docs to gh

.github/workflows/tests-and-coverage.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ jobs:
1818
- uses: actions/checkout@v3
1919
- uses: actions/setup-python@v4
2020
with:
21-
python-version: "3.10"
21+
python-version: "3.11"
2222
- name: run classification tests and report coverage
2323
run: |
2424
pip install -e "./src/[dev]"
@@ -51,6 +51,7 @@ jobs:
5151
run: |
5252
pip install -e "./src/[dev]"
5353
COVERAGE_FILE=.coverage.segmentation python -m coverage run --include "src/valor_lite/*" -m pytest -v tests/semantic_segmentation/
54+
COVERAGE_FILE=.coverage.common python -m coverage run --include "src/valor_lite/*" -m pytest -v tests/common/
5455
python -m coverage combine
5556
python -m coverage report -m
5657
python -m coverage json

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ site/*
2929
*.pt
3030
*.png
3131
*.jpg
32-
*.parquet
32+
*.parquet
33+
*.valor

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ repos:
2525
# supported by your project here, or alternatively use
2626
# pre-commit's default_language_version, see
2727
# https://pre-commit.com/#top_level-default_language_version
28-
language_version: python3.10
28+
language_version: python3.11
2929
args: [--line-length=79]
3030

3131
- repo: https://github.com/RobertCraigie/pyright-python

src/valor_lite/cache.py

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
import glob
2+
import json
3+
import os
4+
from datetime import datetime
5+
from enum import StrEnum
6+
from pathlib import Path
7+
from typing import Any
8+
9+
import numpy as np
10+
import pyarrow as pa
11+
import pyarrow.dataset as ds
12+
import pyarrow.lib as pl
13+
import pyarrow.parquet as pq
14+
15+
16+
class DataType(StrEnum):
17+
INTEGER = "int"
18+
FLOAT = "float"
19+
STRING = "string"
20+
TIMESTAMP = "timestamp"
21+
22+
def to_py(self):
23+
match self:
24+
case DataType.INTEGER:
25+
return int
26+
case DataType.FLOAT:
27+
return float
28+
case DataType.STRING:
29+
return str
30+
case DataType.TIMESTAMP:
31+
return datetime
32+
33+
def to_arrow(self):
34+
match self:
35+
case DataType.INTEGER:
36+
return pa.int64()
37+
case DataType.FLOAT:
38+
return pa.float64()
39+
case DataType.STRING:
40+
return pa.string()
41+
case DataType.TIMESTAMP:
42+
return pa.timestamp("us")
43+
44+
45+
def convert_type_mapping_to_schema(
    type_mapping: dict[str, DataType] | None
) -> list[tuple[str, pl.DataType]]:
    """
    Build the field list for a pyarrow schema from a type mapping.

    Parameters
    ----------
    type_mapping : dict[str, DataType] | None
        Maps each field name to its datatype. `None` and an empty mapping
        both produce an empty field list.

    Returns
    -------
    list[tuple[str, pyarrow.lib.DataType]]
        (field name, arrow type) pairs, in mapping order, suitable as input
        to pyarrow.schema.
    """
    schema_fields = []
    # `or {}` folds the None case into the empty-mapping case.
    for field_name, field_type in (type_mapping or {}).items():
        # Values are coerced through DataType, so plain strings are accepted.
        arrow_type = DataType(field_type).to_arrow()
        schema_fields.append((field_name, arrow_type))
    return schema_fields
64+
65+
66+
class CacheReader:
67+
def __init__(self, where: str | Path):
68+
self._dir = Path(where)
69+
self._cfg = self._dir / ".cfg"
70+
71+
with open(self._cfg, "r") as f:
72+
cfg = json.load(f)
73+
self._batch_size = cfg.get("batch_size")
74+
self._rows_per_file = cfg.get("rows_per_file")
75+
self._compression = cfg.get("compression")
76+
77+
@property
78+
def files(self) -> list[str]:
79+
files = []
80+
for entry in os.listdir(self._dir):
81+
full_path = os.path.join(self._dir, entry)
82+
if os.path.isfile(full_path):
83+
files.append(full_path)
84+
return files
85+
86+
@property
87+
def num_files(self) -> int:
88+
return len(self.files)
89+
90+
@property
91+
def dataset_files(self) -> list[str]:
92+
return glob.glob(f"{self._dir}/*.parquet")
93+
94+
@property
95+
def num_dataset_files(self) -> int:
96+
return len(self.dataset_files)
97+
98+
@property
99+
def dataset(self):
100+
return ds.dataset(
101+
self._dir,
102+
format="parquet",
103+
)
104+
105+
@property
106+
def schema(self):
107+
return self.dataset.schema
108+
109+
@property
110+
def batch_size(self) -> int:
111+
return self._batch_size
112+
113+
@property
114+
def rows_per_file(self) -> int:
115+
return self._rows_per_file
116+
117+
@property
118+
def compression(self) -> str:
119+
return self._compression
120+
121+
122+
class CacheWriter(CacheReader):
    """
    Buffered writer that stores rows as parquet files in a cache directory.

    Incoming batches are accumulated in memory until at least `batch_size`
    rows are available, then written to the currently open parquet file.
    Once a file holds at least `rows_per_file` rows it is closed and the
    next write starts a new, sequentially numbered file. The settings are
    persisted to a JSON `.cfg` file in the directory.

    Can be used as a context manager; leaving the context flushes any
    buffered rows and closes the open file.

    Parameters
    ----------
    where : str | Path
        Path to the cache directory. Created if it does not exist.
    schema : pa.Schema
        Schema every written batch must conform to.
    batch_size : int, default=1000
        Minimum number of rows accumulated before writing to disk.
    rows_per_file : int, default=10000
        Row count at which the current parquet file is closed.
    compression : str, default="snappy"
        Compression codec used for the parquet files.
    delete_if_exists : bool, default=True
        Whether to delete existing parquet files in the directory.
    """

    def __init__(
        self,
        where: str | Path,
        schema: pa.Schema,
        batch_size: int = 1000,
        rows_per_file: int = 10000,
        compression: str = "snappy",
        delete_if_exists: bool = True,
    ):
        # NOTE: CacheReader.__init__ is intentionally not called; it reads
        # the config file, which this constructor is responsible for writing.
        self._dir = Path(where)
        self._cfg = self._dir / ".cfg"

        self._schema = schema
        self._batch_size = batch_size
        self._rows_per_file = rows_per_file
        self._compression = compression

        if delete_if_exists:
            self.delete_files()
        # Always ensure the directory exists so the config file can be
        # written regardless of `delete_if_exists`.
        self._dir.mkdir(parents=True, exist_ok=True)

        # Internal state
        self._writer = None  # currently open ParquetWriter, if any
        self._buffer = []  # record batches not yet written to disk
        self._count = 0  # rows written to the currently open file

        with open(self._cfg, "w") as f:
            info = dict(
                batch_size=batch_size,
                rows_per_file=rows_per_file,
                compression=compression,
            )
            json.dump(info, f, indent=2)

    @property
    def schema(self):
        """Schema supplied at construction."""
        return self._schema

    @property
    def dataset(self):
        """The cache directory opened as a parquet dataset with the writer's schema."""
        return ds.dataset(
            self._dir,
            format="parquet",
            schema=self.schema,
        )

    def delete_files(self):
        """Delete every parquet file in the cache directory."""
        for file in self.dataset_files:
            Path(file).unlink()

    @property
    def next_index(self):
        """Index for the next parquet file: one past the largest existing index."""
        files = self.dataset_files
        if not files:
            return 0
        # File stems are the zero-padded integer indices produced by
        # `_next_filename`.
        return max(int(Path(f).stem) for f in files) + 1

    def write_rows(
        self,
        rows: list[dict[str, Any]],
    ):
        """
        Write a list of row dictionaries.

        Parameters
        ----------
        rows : list[dict[str, Any]]
            Rows keyed by column name. An empty list is a no-op.
        """
        if not rows:
            return
        batch = pa.RecordBatch.from_pylist(rows, schema=self.schema)
        self.write_batch(batch)

    def write_batch(
        self,
        batch: pa.RecordBatch | dict[str, list | np.ndarray | pa.Array],
    ):
        """
        Buffer a batch and write to disk once enough rows have accumulated.

        Parameters
        ----------
        batch : pa.RecordBatch | dict[str, list | np.ndarray | pa.Array]
            A record batch, or a mapping from column name to column values.
        """
        if isinstance(batch, dict):
            # Convert using the declared schema so column types and order
            # match what the parquet writer expects, rather than relying on
            # type inference from the raw values.
            batch = pa.RecordBatch.from_pydict(batch, schema=self.schema)

        # Total rows pending: this batch plus anything already buffered.
        size = batch.num_rows  # type: ignore - pyarrow typing
        size += sum(b.num_rows for b in self._buffer)

        # Not enough rows yet; keep buffering.
        if size < self.batch_size and self._count < self.rows_per_file:
            self._buffer.append(batch)
            return

        if self._buffer:
            self._buffer.append(batch)
            batch = self._drain_buffer()

        # write batch
        writer = self._get_or_create_writer()
        writer.write_batch(batch)

        # Roll over to a new file once the current one is full.
        self._count += size
        if self._count >= self.rows_per_file:
            self.flush()

    def write_table(
        self,
        table: pa.Table,
    ):
        """
        Write a whole table to its own parquet file.

        Any buffered rows are flushed first so file ordering is preserved.

        Parameters
        ----------
        table : pa.Table
            The table to write.
        """
        self.flush()
        # Use the configured codec so these files match batch-written ones.
        pq.write_table(
            table,
            where=self._next_filename(),
            compression=self.compression,
        )

    def flush(self):
        """Write any buffered rows, then close the current parquet file."""
        if self._buffer:
            batch = self._drain_buffer()
            writer = self._get_or_create_writer()
            writer.write_batch(batch)
        self._count = 0
        self._close_writer()

    def _drain_buffer(self) -> pa.RecordBatch:
        """Combine all buffered batches into one record batch and empty the buffer."""
        combined_arrays = [
            pa.concat_arrays([b.column(name) for b in self._buffer])
            for name in self.schema.names
        ]
        self._buffer = []
        return pa.RecordBatch.from_arrays(combined_arrays, schema=self.schema)

    def _next_filename(self) -> Path:
        """Path of the next parquet file, named by zero-padded index."""
        return self._dir / f"{self.next_index:06d}.parquet"

    def _get_or_create_writer(self) -> pq.ParquetWriter:
        """Return the open parquet writer, opening a new file if none is open."""
        if self._writer is not None:
            return self._writer
        self._writer = pq.ParquetWriter(
            where=self._next_filename(),
            schema=self.schema,
            compression=self.compression,
        )
        return self._writer

    def _close_writer(self) -> None:
        """Close the current parquet file."""
        if self._writer is not None:
            self._writer.close()
            self._writer = None

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit - ensures data is flushed."""
        self.flush()

src/valor_lite/exceptions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ def __init__(self):
55
)
66

77

8+
class EmptyCacheError(Exception):
    """Error signalling that a cache contains no data."""

    def __init__(self):
        message = "cache contains no data"
        super().__init__(message)
11+
12+
813
class EmptyFilterError(Exception):
914
def __init__(self, message: str):
1015
super().__init__(message)

0 commit comments

Comments
 (0)