Skip to content

Commit 2fc30a4

Browse files
authored
Add dedicated fields for job reruns: run_group_id and rerun_from_job_id (#1512)
Add dedicated fields for job reruns: `run_group_id` and `rerun_from_job_id`
1 parent 0ddc937 commit 2fc30a4

File tree

7 files changed

+75
-21
lines changed

7 files changed

+75
-21
lines changed

src/datachain/data_storage/metastore.py

Lines changed: 43 additions & 7 deletions
Original file line number · Diff line number · Diff line change
@@ -466,6 +466,8 @@ def create_job(
466466
python_version: str | None = None,
467467
params: dict[str, str] | None = None,
468468
parent_job_id: str | None = None,
469+
rerun_from_job_id: str | None = None,
470+
run_group_id: str | None = None,
469471
) -> str:
470472
"""
471473
Creates a new job.
@@ -1835,7 +1837,11 @@ def _jobs_columns() -> "list[SchemaItem]":
18351837
Column("params", JSON, nullable=False),
18361838
Column("metrics", JSON, nullable=False),
18371839
Column("parent_job_id", Text, nullable=True),
1840+
Column("rerun_from_job_id", Text, nullable=True),
1841+
Column("run_group_id", Text, nullable=True),
18381842
Index("idx_jobs_parent_job_id", "parent_job_id"),
1843+
Index("idx_jobs_rerun_from_job_id", "rerun_from_job_id"),
1844+
Index("idx_jobs_run_group_id", "run_group_id"),
18391845
]
18401846

18411847
@cached_property
@@ -1896,13 +1902,29 @@ def create_job(
18961902
python_version: str | None = None,
18971903
params: dict[str, str] | None = None,
18981904
parent_job_id: str | None = None,
1905+
rerun_from_job_id: str | None = None,
1906+
run_group_id: str | None = None,
18991907
conn: Any = None,
19001908
) -> str:
19011909
"""
19021910
Creates a new job.
19031911
Returns the job id.
19041912
"""
19051913
job_id = str(uuid4())
1914+
1915+
# Validate run_group_id and rerun_from_job_id consistency
1916+
if rerun_from_job_id:
1917+
# Rerun job: run_group_id must be provided by caller
1918+
assert run_group_id is not None, (
1919+
"run_group_id must be provided when rerun_from_job_id is set"
1920+
)
1921+
else:
1922+
# First job: run_group_id should not be provided (we set it here)
1923+
assert run_group_id is None, (
1924+
"run_group_id should not be provided when rerun_from_job_id is not set"
1925+
)
1926+
run_group_id = job_id
1927+
19061928
self.db.execute(
19071929
self._jobs_insert().values(
19081930
id=job_id,
@@ -1918,6 +1940,8 @@ def create_job(
19181940
params=json.dumps(params or {}),
19191941
metrics=json.dumps({}),
19201942
parent_job_id=parent_job_id,
1943+
rerun_from_job_id=rerun_from_job_id,
1944+
run_group_id=run_group_id,
19211945
),
19221946
conn=conn,
19231947
)
@@ -2191,35 +2215,47 @@ def link_dataset_version_to_job(
21912215
self.db.execute(update_query, conn=conn)
21922216

21932217
def get_ancestor_job_ids(self, job_id: str, conn=None) -> list[str]:
2194-
# Use recursive CTE to walk up the parent chain
2195-
# Format: WITH RECURSIVE ancestors(id, parent_job_id, depth) AS (...)
2218+
# Use recursive CTE to walk up the rerun chain
2219+
# Format: WITH RECURSIVE ancestors(id, rerun_from_job_id, run_group_id,
2220+
# depth) AS (...)
21962221
# Include depth tracking to prevent infinite recursion in case of
21972222
# circular dependencies
21982223
ancestors_cte = (
21992224
self._jobs_select(
22002225
self._jobs.c.id.label("id"),
2201-
self._jobs.c.parent_job_id.label("parent_job_id"),
2226+
self._jobs.c.rerun_from_job_id.label("rerun_from_job_id"),
2227+
self._jobs.c.run_group_id.label("run_group_id"),
22022228
literal(0).label("depth"),
22032229
)
22042230
.where(self._jobs.c.id == job_id)
22052231
.cte(name="ancestors", recursive=True)
22062232
)
22072233

22082234
# Recursive part: join with parent jobs, incrementing depth and checking limit
2235+
# Also ensure we only traverse jobs within the same run_group_id for safety
22092236
ancestors_recursive = ancestors_cte.union_all(
22102237
self._jobs_select(
22112238
self._jobs.c.id.label("id"),
2212-
self._jobs.c.parent_job_id.label("parent_job_id"),
2239+
self._jobs.c.rerun_from_job_id.label("rerun_from_job_id"),
2240+
self._jobs.c.run_group_id.label("run_group_id"),
22132241
(ancestors_cte.c.depth + 1).label("depth"),
22142242
).select_from(
22152243
self._jobs.join(
22162244
ancestors_cte,
22172245
(
22182246
self._jobs.c.id
2219-
== cast(ancestors_cte.c.parent_job_id, self._jobs.c.id.type)
2247+
== cast(ancestors_cte.c.rerun_from_job_id, self._jobs.c.id.type)
22202248
)
2221-
& (ancestors_cte.c.parent_job_id.isnot(None)) # Stop at root jobs
2222-
& (ancestors_cte.c.depth < JOB_ANCESTRY_MAX_DEPTH),
2249+
& (
2250+
ancestors_cte.c.rerun_from_job_id.isnot(None)
2251+
) # Stop at root jobs
2252+
& (ancestors_cte.c.depth < JOB_ANCESTRY_MAX_DEPTH)
2253+
& (
2254+
self._jobs.c.run_group_id
2255+
== cast(
2256+
ancestors_cte.c.run_group_id, self._jobs.c.run_group_id.type
2257+
)
2258+
), # Safety: only traverse within same run group
22232259
)
22242260
)
22252261
)

src/datachain/job.py

Lines changed: 6 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -24,6 +24,8 @@ class Job:
2424
error_message: str = ""
2525
error_stack: str = ""
2626
parent_job_id: str | None = None
27+
rerun_from_job_id: str | None = None
28+
run_group_id: str | None = None
2729

2830
@classmethod
2931
def parse(
@@ -42,6 +44,8 @@ def parse(
4244
params: str,
4345
metrics: str,
4446
parent_job_id: str | None,
47+
rerun_from_job_id: str | None,
48+
run_group_id: str | None,
4549
) -> "Job":
4650
return cls(
4751
str(id),
@@ -58,4 +62,6 @@ def parse(
5862
error_message,
5963
error_stack,
6064
str(parent_job_id) if parent_job_id else None,
65+
str(rerun_from_job_id) if rerun_from_job_id else None,
66+
str(run_group_id) if run_group_id else None,
6167
)

src/datachain/lib/dc/datachain.py

Lines changed: 2 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -718,9 +718,9 @@ def _resolve_checkpoint(
718718
_hash = self._calculate_job_hash(job.id)
719719

720720
if (
721-
job.parent_job_id
721+
job.rerun_from_job_id
722722
and not checkpoints_reset
723-
and metastore.find_checkpoint(job.parent_job_id, _hash)
723+
and metastore.find_checkpoint(job.rerun_from_job_id, _hash)
724724
):
725725
# checkpoint found → find which dataset version to reuse
726726

src/datachain/query/session.py

Lines changed: 3 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -154,7 +154,7 @@ def get_or_create_job(self) -> "Job":
154154
script = str(uuid4())
155155
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
156156

157-
# try to find the parent job
157+
# try to find the parent job for checkpoint/rerun chain
158158
parent = self.catalog.metastore.get_last_job_by_name(script)
159159

160160
job_id = self.catalog.metastore.create_job(
@@ -163,7 +163,8 @@ def get_or_create_job(self) -> "Job":
163163
query_type=JobQueryType.PYTHON,
164164
status=JobStatus.RUNNING,
165165
python_version=python_version,
166-
parent_job_id=parent.id if parent else None,
166+
rerun_from_job_id=parent.id if parent else None,
167+
run_group_id=parent.run_group_id if parent else None,
167168
)
168169
Session._CURRENT_JOB = self.catalog.metastore.get_job(job_id)
169170
Session._OWNS_JOB = True

tests/func/test_metastore.py

Lines changed: 12 additions & 7 deletions
Original file line number · Diff line number · Diff line change
@@ -912,13 +912,14 @@ def test_get_job_status(metastore):
912912
@pytest.mark.parametrize("depth", [0, 1, 2, 3, 5])
913913
def test_get_ancestor_job_ids(metastore, depth):
914914
"""Test get_ancestor_job_ids with different hierarchy depths."""
915-
# Create a chain of jobs with parent relationships
916-
# depth=0: single job with no parent
917-
# depth=1: job -> parent
918-
# depth=2: job -> parent -> grandparent
915+
# Create a chain of jobs with rerun relationships
916+
# depth=0: single job with no rerun ancestor
917+
# depth=1: job -> rerun_from
918+
# depth=2: job -> rerun_from -> rerun_from
919919

920920
job_ids = []
921-
parent_id = None
921+
rerun_from_id = None
922+
group_id = None
922923

923924
# Create jobs from root to leaf
924925
for i in range(depth + 1):
@@ -928,10 +929,14 @@ def test_get_ancestor_job_ids(metastore, depth):
928929
query_type=JobQueryType.PYTHON,
929930
status=JobStatus.CREATED,
930931
workers=1,
931-
parent_job_id=parent_id,
932+
rerun_from_job_id=rerun_from_id,
933+
run_group_id=group_id,
932934
)
933935
job_ids.append(job_id)
934-
parent_id = job_id
936+
rerun_from_id = job_id
937+
# First job sets the group_id
938+
if group_id is None:
939+
group_id = metastore.get_job(job_id).run_group_id
935940

936941
# The last job is the leaf (youngest)
937942
leaf_job_id = job_ids[-1]

tests/unit/lib/test_checkpoints.py

Lines changed: 8 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -104,7 +104,8 @@ def test_checkpoints(
104104
chain.save("nums2")
105105
with pytest.raises(CustomMapperError):
106106
chain.map(new=mapper_fail).save("nums3")
107-
first_job_id = test_session.get_or_create_job().id
107+
first_job = test_session.get_or_create_job()
108+
first_job_id = first_job.id
108109

109110
catalog.get_dataset("nums1")
110111
catalog.get_dataset("nums2")
@@ -116,7 +117,12 @@ def test_checkpoints(
116117
if use_datachain_job_id_env:
117118
monkeypatch.setenv(
118119
"DATACHAIN_JOB_ID",
119-
metastore.create_job("my-job", "echo 1;", parent_job_id=first_job_id),
120+
metastore.create_job(
121+
"my-job",
122+
"echo 1;",
123+
rerun_from_job_id=first_job_id,
124+
run_group_id=first_job.run_group_id,
125+
),
120126
)
121127

122128
chain.save("nums1")

tests/unit/test_job_management.py

Lines changed: 1 addition & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -117,7 +117,7 @@ def test_get_or_create_links_to_parent(test_session, patch_argv, monkeypatch):
117117
session2 = Session(catalog=test_session.catalog)
118118
job2 = session2.get_or_create_job()
119119

120-
assert job2.parent_job_id == job1.id
120+
assert job2.rerun_from_job_id == job1.id
121121

122122

123123
def test_nested_sessions_share_same_job(test_session, patch_argv, monkeypatch):

0 commit comments

Comments (0)