Skip to content

Commit a6d3f0a

Browse files
committed
Add accept_existing mode for incremental model migration
When accept_existing=True, the build keeps cached artifacts even if the transform fingerprint changed (e.g., after a model swap); only artifacts with new or changed inputs are rebuilt with the new config. Changes: skip the transform-fingerprint comparison in _layer_fully_cached; skip the per-artifact fingerprint check in _save_artifact; accept any existing artifact in the cached_by_inputs lookup; wire MeshConfig.server.accept_existing through to run().
1 parent 9c2b747 commit a6d3f0a

File tree

3 files changed

+39
-16
lines changed

3 files changed

+39
-16
lines changed

src/synix/build/runner.py

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ def run(
106106
progress=None,
107107
validate: bool = False,
108108
error_classifier: ErrorClassifier | None = None,
109+
accept_existing: bool = False,
109110
) -> RunResult:
110111
"""Execute the full pipeline — walk DAG, run transforms, materialize projections.
111112
@@ -120,6 +121,10 @@ def run(
120121
(default), all LLM errors are fatal and abort the build. Pass
121122
``LLMErrorClassifier()`` to enable DLQ — content-filter and
122123
input-too-large errors will be skipped instead of aborting.
124+
accept_existing: When True, keep existing cached artifacts even if the
125+
transform fingerprint changed (e.g. model swap). Only rebuilds
126+
artifacts with new/changed inputs. Useful for incremental model
127+
migration — existing work is preserved, new inputs use the new config.
123128
124129
Returns:
125130
RunResult with build statistics.
@@ -199,7 +204,7 @@ def run(
199204
layer_built: list[Artifact] = []
200205
dlq_before = len(result.dlq)
201206

202-
if _layer_fully_cached(layer, inputs, store, transform_fp):
207+
if _layer_fully_cached(layer, inputs, store, transform_fp, accept_existing=accept_existing):
203208
# All cached — load existing artifacts
204209
existing = store.list_artifacts(layer.name)
205210
for art in existing:
@@ -226,17 +231,24 @@ def _save_artifact(
226231
_layer=layer,
227232
_transform_fp=transform_fp,
228233
_inputs=inputs,
234+
_accept_existing=accept_existing,
229235
) -> None:
230236
effective_inputs = parent_inputs if parent_inputs is not None else _inputs
231237
# Compute per-artifact build fingerprint
232238
build_fp = compute_build_fingerprint(_transform_fp, artifact.input_ids)
233239

234-
rebuild, _reasons = needs_rebuild(
235-
artifact.label,
236-
artifact.input_ids,
237-
store,
238-
current_build_fingerprint=build_fp,
239-
)
240+
# In accept_existing mode, skip fingerprint comparison —
241+
# only rebuild if the artifact doesn't exist at all.
242+
if _accept_existing:
243+
existing = store.load_artifact(artifact.label)
244+
rebuild = existing is None
245+
else:
246+
rebuild, _reasons = needs_rebuild(
247+
artifact.label,
248+
artifact.input_ids,
249+
store,
250+
current_build_fingerprint=build_fp,
251+
)
240252
if rebuild:
241253
artifact.metadata["layer_name"] = _layer.name
242254
artifact.metadata["layer_level"] = _layer._level
@@ -288,12 +300,17 @@ def _on_batch_complete(artifacts: list[Artifact], unit_inputs: list[Artifact]) -
288300
existing_artifacts = store.list_artifacts(layer.name)
289301
cached_by_inputs: dict[tuple[str, ...], list[Artifact]] = {}
290302
for art in existing_artifacts:
291-
stored_tfp_data = art.metadata.get("transform_fingerprint")
292-
if stored_tfp_data is not None:
293-
stored_fp = Fingerprint.from_dict(stored_tfp_data)
294-
if transform_fp.matches(stored_fp):
295-
key = tuple(sorted(art.input_ids))
296-
cached_by_inputs.setdefault(key, []).append(art)
303+
if accept_existing:
304+
# Accept any existing artifact regardless of fingerprint
305+
key = tuple(sorted(art.input_ids))
306+
cached_by_inputs.setdefault(key, []).append(art)
307+
else:
308+
stored_tfp_data = art.metadata.get("transform_fingerprint")
309+
if stored_tfp_data is not None:
310+
stored_fp = Fingerprint.from_dict(stored_tfp_data)
311+
if transform_fp.matches(stored_fp):
312+
key = tuple(sorted(art.input_ids))
313+
cached_by_inputs.setdefault(key, []).append(art)
297314

298315
def _on_cached(cached_arts: list[Artifact], unit_inputs: list[Artifact]) -> None:
299316
for cached_art in cached_arts:
@@ -658,14 +675,15 @@ def _layer_fully_cached(
658675
inputs: list[Artifact],
659676
store: SnapshotArtifactCache,
660677
transform_fp: Fingerprint | None = None,
678+
accept_existing: bool = False,
661679
) -> bool:
662680
"""Check if a layer can be entirely skipped (all artifacts cached)."""
663681
existing = store.list_artifacts(layer.name)
664682
if not existing:
665683
return False
666684

667-
# Check transform identity via fingerprint
668-
if transform_fp is not None:
685+
# Check transform identity via fingerprint (skip when accept_existing)
686+
if transform_fp is not None and not accept_existing:
669687
for art in existing:
670688
stored_tfp_data = art.metadata.get("transform_fingerprint")
671689
if stored_tfp_data is None:

src/synix/mesh/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class ServerConfig:
3636
build_min_interval: int = 300
3737
build_quiet_period: int = 60
3838
build_max_delay: int = 1800
39+
accept_existing: bool = False
3940

4041

4142
@dataclass
@@ -185,6 +186,7 @@ def load_mesh_config(path: Path) -> MeshConfig:
185186
build_min_interval=srv.get("build_min_interval", ServerConfig.build_min_interval),
186187
build_quiet_period=quiet_period,
187188
build_max_delay=srv.get("build_max_delay", ServerConfig.build_max_delay),
189+
accept_existing=srv.get("accept_existing", ServerConfig.accept_existing),
188190
)
189191

190192
# Client config

src/synix/mesh/server.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,10 @@ async def _run_build() -> None:
509509
# 3. Run build in a thread (sync build system)
510510
build_phase = "building"
511511
pipeline.build_dir = str(build_dir)
512-
result = await asyncio.to_thread(run, pipeline, source_dir=source_dir)
512+
result = await asyncio.to_thread(
513+
run, pipeline, source_dir=source_dir,
514+
accept_existing=config.server.accept_existing,
515+
)
513516

514517
async with _state_lock:
515518
build_count += 1

0 commit comments

Comments
 (0)