Skip to content
This repository was archived by the owner on Apr 30, 2026. It is now read-only.

Commit ce4bb12

Browse files
committed
Refactor pipeline threading and simplify SDG batch processing.
Added logging to track the number of remaining threads during pipeline execution, for easier debugging. Removed redundant batching logic in block processing to fix a concurrency bug that was slowing down SDG.
1 parent 6454380 commit ce4bb12

1 file changed

Lines changed: 11 additions & 1 deletion

File tree

src/instructlab/sdg/pipeline.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,11 +170,20 @@ def generate(self, dataset, checkpoint_name=None) -> Dataset:
170170
executor.submit(self._generate_single, input_split)
171171
for input_split in input_splits
172172
]
173-
173+
threads_remaining_to_execute = len(futures)
174+
logger.info(
175+
"Total of %d pipeline threads to execute",
176+
len(futures),
177+
)
174178
# Collect the results of each batch as they finish. This needs to
175179
# wait for them all, so the order of waiting doesn't matter
176180
for future in futures:
177181
ds = future.result()
182+
threads_remaining_to_execute-=1
183+
logger.info(
184+
"Total of %d pipeline threads to check for completion",
185+
threads_remaining_to_execute,
186+
)
178187
output_splits.append(ds)
179188
checkpointer.checkpoint(ds)
180189
checkpointer.done()
@@ -198,6 +207,7 @@ def _generate_single(self, dataset) -> Dataset:
198207
block = block_type(self.ctx, self, block_name, **block_config)
199208
logger.info("Running block: %s", block_name)
200209

210+
201211
# Check if batching is enabled
202212
if not self.ctx.batching_enabled:
203213
logger.info(

0 commit comments

Comments
 (0)