Skip to content

Commit 6652c4b

Browse files
committed
updated file management
1 parent 4f9c0a4 commit 6652c4b

19 files changed

Lines changed: 775 additions & 470 deletions

src/valor_lite/cache.py

Lines changed: 134 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -63,22 +63,15 @@ def convert_type_mapping_to_schema(
6363
return [(k, DataType(v).to_arrow()) for k, v in type_mapping.items()]
6464

6565

66-
class CacheReader:
67-
def __init__(self, where: str | Path):
68-
self._dir = Path(where)
69-
self._cfg = self._dir / ".cfg"
70-
71-
with open(self._cfg, "r") as f:
72-
cfg = json.load(f)
73-
self._batch_size = cfg.get("batch_size")
74-
self._rows_per_file = cfg.get("rows_per_file")
75-
self._compression = cfg.get("compression")
66+
class CacheFiles:
67+
def __init__(self, path: str | Path):
68+
self._path = Path(path)
7669

7770
@property
7871
def files(self) -> list[str]:
7972
files = []
80-
for entry in os.listdir(self._dir):
81-
full_path = os.path.join(self._dir, entry)
73+
for entry in os.listdir(self._path):
74+
full_path = os.path.join(self._path, entry)
8275
if os.path.isfile(full_path):
8376
files.append(full_path)
8477
return files
@@ -89,93 +82,141 @@ def num_files(self) -> int:
8982

9083
@property
9184
def dataset_files(self) -> list[str]:
92-
return glob.glob(f"{self._dir}/*.parquet")
85+
return glob.glob(f"{self._path}/*.parquet")
9386

9487
@property
9588
def num_dataset_files(self) -> int:
9689
return len(self.dataset_files)
9790

91+
@staticmethod
92+
def _generate_config_path(path: str | Path) -> Path:
93+
return Path(path) / ".cfg"
94+
95+
@staticmethod
96+
def _get_dataset_from_path(path: str | Path) -> ds.Dataset:
97+
return ds.dataset(path, format="parquet")
98+
99+
100+
class CacheReader(CacheFiles):
101+
def __init__(self, path: str | Path):
102+
self._path = Path(path)
103+
self._cfg = None
104+
self._dataset = None
105+
106+
# validate path
107+
if not self._path.exists():
108+
raise FileNotFoundError(f"Directory does not exist: {self._path}")
109+
elif not self._path.is_dir():
110+
raise NotADirectoryError(
111+
f"Path exists but is not a directory: {self._path}"
112+
)
113+
98114
@property
99-
def dataset(self):
100-
return ds.dataset(
101-
self._dir,
102-
format="parquet",
103-
)
115+
def dataset(self) -> ds.Dataset:
116+
if not self._dataset:
117+
self._dataset = ds.dataset(
118+
self._path,
119+
format="parquet",
120+
)
121+
return self._dataset
104122

105123
@property
106-
def schema(self):
124+
def schema(self) -> pa.Schema:
107125
return self.dataset.schema
108126

127+
@property
128+
def config(self) -> dict:
129+
if self._cfg is None:
130+
cfg_path = self._generate_config_path(self._path)
131+
with open(cfg_path, "r") as f:
132+
self._cfg = json.load(f)
133+
return self._cfg
134+
135+
def _read_config(self, key: str):
136+
if value := self.config.get(key, None):
137+
return value
138+
raise KeyError(
139+
f"'{key}' is not defined within {self._generate_config_path(self._path)}"
140+
)
141+
109142
@property
110143
def batch_size(self) -> int:
111-
return self._batch_size
144+
return int(self._read_config("batch_size"))
112145

113146
@property
114147
def rows_per_file(self) -> int:
115-
return self._rows_per_file
148+
return int(self._read_config("rows_per_file"))
116149

117150
@property
118151
def compression(self) -> str:
119-
return self._compression
152+
return str(self._read_config("compression"))
120153

121154

122-
class CacheWriter(CacheReader):
155+
class CacheWriter(CacheFiles):
123156
def __init__(
124157
self,
125-
where: str | Path,
158+
path: str | Path,
126159
schema: pa.Schema,
127-
batch_size: int = 1000,
128-
rows_per_file: int = 10000,
129-
compression: str = "snappy",
130-
delete_if_exists: bool = True,
160+
batch_size: int,
161+
rows_per_file: int,
162+
compression: str,
131163
):
132-
self._dir = Path(where)
133-
self._cfg = self._dir / ".cfg"
134-
164+
self._path = Path(path)
135165
self._schema = schema
136166
self._batch_size = batch_size
137167
self._rows_per_file = rows_per_file
138168
self._compression = compression
139169

140-
if delete_if_exists:
141-
self.delete_files()
142-
self._dir.mkdir(parents=True, exist_ok=True)
170+
# validate path
171+
if not self._path.exists():
172+
raise FileNotFoundError(f"Directory does not exist: {self._path}")
173+
elif not self._path.is_dir():
174+
raise NotADirectoryError(
175+
f"Path exists but is not a directory: {self._path}"
176+
)
143177

144-
# Internal state
178+
# internal state
145179
self._writer = None
146180
self._buffer = []
147181
self._count = 0
148182

149-
with open(self._cfg, "w") as f:
150-
info = dict(
183+
@classmethod
184+
def create(
185+
cls,
186+
path: str | Path,
187+
schema: pa.Schema,
188+
batch_size: int = 1000,
189+
rows_per_file: int = 10000,
190+
compression: str = "snappy",
191+
):
192+
Path(path).mkdir(parents=True, exist_ok=False)
193+
cfg_path = cls._generate_config_path(path)
194+
with open(cfg_path, "w") as f:
195+
cfg = dict(
151196
batch_size=batch_size,
152197
rows_per_file=rows_per_file,
153198
compression=compression,
154199
)
155-
json.dump(info, f, indent=2)
156-
157-
@property
158-
def schema(self):
159-
return self._schema
160-
161-
@property
162-
def dataset(self):
163-
return ds.dataset(
164-
self._dir,
165-
format="parquet",
166-
schema=self.schema,
200+
json.dump(cfg, f, indent=2)
201+
return cls(
202+
path=path,
203+
schema=schema,
204+
batch_size=batch_size,
205+
rows_per_file=rows_per_file,
206+
compression=compression,
167207
)
168208

169-
def delete_files(self):
170-
for file in self.dataset_files:
171-
Path(file).unlink()
172-
173-
@property
174-
def next_index(self):
175-
files = self.dataset_files
176-
if not files:
177-
return 0
178-
return max([int(Path(f).stem) for f in files]) + 1
209+
@classmethod
210+
def load(cls, path: str | Path):
211+
cfg_path = cls._generate_config_path(path)
212+
dataset = cls._get_dataset_from_path(path)
213+
with open(cfg_path, "r") as f:
214+
cfg = json.load(f)
215+
return cls(
216+
path=path,
217+
schema=dataset.schema,
218+
**cfg,
219+
)
179220

180221
def write_rows(
181222
self,
@@ -245,8 +286,17 @@ def flush(self):
245286
self._count = 0
246287
self._close_writer()
247288

289+
def delete(self):
290+
for file in self.files:
291+
Path(file).unlink()
292+
248293
def _next_filename(self) -> Path:
249-
return self._dir / f"{self.next_index:06d}.parquet"
294+
files = self.dataset_files
295+
if not files:
296+
next_index = 0
297+
else:
298+
next_index = max([int(Path(f).stem) for f in files]) + 1
299+
return self._path / f"{next_index:06d}.parquet"
250300

251301
def _get_or_create_writer(self) -> pq.ParquetWriter:
252302
"""Open a new parquet file for writing."""
@@ -272,3 +322,27 @@ def __enter__(self):
272322
def __exit__(self, exc_type, exc_val, exc_tb):
273323
"""Context manager exit - ensures data is flushed."""
274324
self.flush()
325+
326+
@property
327+
def schema(self) -> pa.Schema:
328+
return self._schema
329+
330+
@property
331+
def dataset(self) -> ds.Dataset:
332+
return ds.dataset(
333+
self._path,
334+
format="parquet",
335+
schema=self.schema,
336+
)
337+
338+
@property
339+
def batch_size(self) -> int:
340+
return self._batch_size
341+
342+
@property
343+
def rows_per_file(self) -> int:
344+
return self._rows_per_file
345+
346+
@property
347+
def compression(self) -> str:
348+
return self._compression

0 commit comments

Comments
 (0)