Skip to content

Commit 4b3b452

Browse files
committed
Add history import run method
1 parent de1c219 commit 4b3b452

File tree

4 files changed

+51
-9
lines changed

4 files changed

+51
-9
lines changed

python_modules/dagster/dagster/_core/storage/legacy_storage.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from collections.abc import Iterable, Mapping, Sequence
2+
from datetime import datetime
23
from typing import TYPE_CHECKING, AbstractSet, Optional, Union # noqa: UP035
34

45
from dagster import _check as check
@@ -198,6 +199,11 @@ def from_config_value(
198199
def add_run(self, dagster_run: "DagsterRun") -> "DagsterRun":
199200
return self._storage.run_storage.add_run(dagster_run)
200201

202+
def add_historical_run(
203+
self, dagster_run: "DagsterRun", run_creation_time: datetime
204+
) -> "DagsterRun":
205+
return self._storage.run_storage.add_historical_run(dagster_run, run_creation_time)
206+
201207
def handle_run_event(self, run_id: str, event: "DagsterEvent") -> None:
202208
return self._storage.run_storage.handle_run_event(run_id, event)
203209

python_modules/dagster/dagster/_core/storage/runs/base.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from abc import ABC, abstractmethod
22
from collections.abc import Mapping, Sequence
3+
from datetime import datetime
34
from typing import TYPE_CHECKING, Optional, Union
45

56
from typing_extensions import TypedDict
@@ -54,6 +55,12 @@ def add_run(self, dagster_run: DagsterRun) -> DagsterRun:
5455
dagster_run (DagsterRun): The run to add.
5556
"""
5657

58+
@abstractmethod
59+
def add_historical_run(
60+
self, dagster_run: DagsterRun, run_creation_time: datetime
61+
) -> DagsterRun:
62+
"""Add a historical run to storage."""
63+
5764
@abstractmethod
5865
def handle_run_event(self, run_id: str, event: DagsterEvent) -> None:
5966
"""Update run storage in accordance to a pipeline run related DagsterEvent.

python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ def fetchone(self, query: SqlAlchemyQuery) -> Optional[Any]:
110110
else:
111111
return conn.execute(query).fetchone()
112112

113-
def add_run(self, dagster_run: DagsterRun) -> DagsterRun:
113+
def _get_run_insertion_values(
114+
self, dagster_run: DagsterRun, run_creation_time: Optional[datetime] = None
115+
) -> dict[str, Any]:
114116
check.inst_param(dagster_run, "dagster_run", DagsterRun)
115117

116118
if dagster_run.job_snapshot_id and not self.has_job_snapshot(dagster_run.job_snapshot_id):
@@ -132,24 +134,39 @@ def add_run(self, dagster_run: DagsterRun) -> DagsterRun:
132134
}
133135
if self.has_backfill_id_column():
134136
values["backfill_id"] = dagster_run.tags.get(BACKFILL_ID_TAG)
137+
if run_creation_time:
138+
values["create_timestamp"] = run_creation_time
139+
return values
135140

136-
runs_insert = RunsTable.insert().values(**values)
141+
def _core_add_run(self, col_values: dict[str, Any], tags: Mapping[str, str]) -> None:
142+
run_id = col_values["run_id"]
143+
runs_insert = RunsTable.insert().values(**col_values)
137144
with self.connect() as conn:
138145
try:
139146
conn.execute(runs_insert)
140147
except db_exc.IntegrityError as exc:
141148
raise DagsterRunAlreadyExists from exc
142149

143-
tags_to_insert = dagster_run.tags_for_storage()
144-
if tags_to_insert:
150+
if tags:
145151
conn.execute(
146152
RunTagsTable.insert(),
147-
[
148-
dict(run_id=dagster_run.run_id, key=k, value=v)
149-
for k, v in tags_to_insert.items()
150-
],
153+
[dict(run_id=run_id, key=k, value=v) for k, v in tags.items()],
151154
)
152155

156+
def add_run(self, dagster_run: DagsterRun) -> DagsterRun:
157+
self._core_add_run(
158+
self._get_run_insertion_values(dagster_run, get_current_datetime()),
159+
dagster_run.tags_for_storage(),
160+
)
161+
return dagster_run
162+
163+
def add_historical_run(
164+
self, dagster_run: DagsterRun, run_creation_time: datetime
165+
) -> DagsterRun:
166+
self._core_add_run(
167+
self._get_run_insertion_values(dagster_run, run_creation_time),
168+
dagster_run.tags_for_storage(),
169+
)
153170
return dagster_run
154171

155172
def handle_run_event(self, run_id: str, event: DagsterEvent) -> None:

python_modules/dagster/dagster_tests/storage_tests/utils/run_storage.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import time
44
import unittest
55
from contextlib import contextmanager
6-
from datetime import datetime, timedelta
6+
from datetime import datetime, timedelta, timezone
77
from typing import Optional
88
from uuid import uuid4
99

@@ -405,6 +405,18 @@ def test_get_run_tags(self, storage: RunStorage):
405405
# empty tag_keys implies nothing instead of everything
406406
assert storage.get_run_tags(tag_keys=[]) == []
407407

408+
def test_add_historical_run(self, storage):
409+
run = DagsterRun(run_id=make_new_run_id(), job_name="some_job", tags={"foo": "bar"})
410+
storage.add_historical_run(run, datetime(2025, 1, 1, tzinfo=timezone.utc))
411+
record = storage.get_run_records()[0]
412+
assert record is not None
413+
assert record.dagster_run.tags == run.tags
414+
assert record.create_timestamp == datetime(2025, 1, 1, tzinfo=timezone.utc)
415+
run_tags = storage.get_run_tags(tag_keys=["foo"])
416+
assert len(run_tags) == 1
417+
assert run_tags[0][0] == "foo"
418+
assert run_tags[0][1] == {"bar"}
419+
408420
def test_fetch_by_filter(self, storage):
409421
assert storage
410422
one = make_new_run_id()

0 commit comments

Comments
 (0)