Skip to content

Commit 69e3403

Browse files
committed
adjust logic of chunking
1 parent 1e83281 commit 69e3403

File tree

1 file changed

+16
-7
lines changed

1 file changed

+16
-7
lines changed

src/vivarium_cluster_tools/psimulate/results/processing.py

Lines changed: 16 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -176,6 +176,7 @@ def write_results_batch(
176176
start = time.time()
177177
# Write results to chunked files per metric
178178
for metric, new_df in results_to_write.items():
179+
(output_paths.results_dir / metric).mkdir(exist_ok=True)
179180
_write_metric_chunk(
180181
new_data=new_df,
181182
chunk_map=chunk_map,
@@ -204,12 +205,6 @@ def _write_metric_chunk(
204205

205206
while not remaining.empty:
206207
chunk_path = chunk_map.get_path(metric)
207-
chunk_path.parent.mkdir(exist_ok=True)
208-
209-
# If current chunk is already at/over limit, rotate first
210-
if chunk_path.exists() and chunk_path.stat().st_size >= chunk_size:
211-
chunk_map[metric] += 1
212-
chunk_path = chunk_map.get_path(metric)
213208

214209
if chunk_path.exists():
215210
current_size = chunk_path.stat().st_size
@@ -218,7 +213,21 @@ def _write_metric_chunk(
218213
remaining_space = chunk_size
219214

220215
# Calculate rows that fit; if none fit, rotate to the next chunk, or write a single row when one row alone exceeds the chunk size
221-
rows_that_fit = max(1, int(remaining_space / chunk_map.bytes_per_row(metric)))
216+
rows_that_fit = int(remaining_space / chunk_map.bytes_per_row(metric))
217+
if rows_that_fit < 1:
218+
if chunk_map.bytes_per_row(metric) > chunk_size:
219+
logger.warning(
220+
f"Estimated bytes per row for metric '{metric}' "
221+
f"({chunk_map.bytes_per_row(metric):.2f} bytes) "
222+
f"exceeds chunk size ({chunk_size} bytes). "
223+
f"Writing one row per chunk file."
224+
)
225+
rows_that_fit = 1
226+
else:
227+
# No space left in current chunk, rotate to next chunk
228+
chunk_map[metric] += 1
229+
continue
230+
222231
to_write = remaining.iloc[:rows_that_fit].reset_index(drop=True)
223232
remaining = remaining.iloc[rows_that_fit:].reset_index(drop=True)
224233

0 commit comments

Comments
 (0)