Skip to content

Data migration #29378

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion python_modules/dagster/dagster/_core/storage/runs/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
RUN_BACKFILL_ID = "run_backfill_id"
BACKFILL_JOB_NAME_AND_TAGS = "backfill_job_name_and_tags"
BACKFILL_END_TIMESTAMP = "backfill_end_timestamp"

PUBLIC_FACING_TIMESTAMPS = "public_facing_timestamps"
PrintFn: TypeAlias = Callable[[Any], None]
MigrationFn: TypeAlias = Callable[[RunStorage, Optional[PrintFn]], None]

Expand All @@ -48,6 +48,7 @@
RUN_BACKFILL_ID: lambda: migrate_run_backfill_id,
BACKFILL_JOB_NAME_AND_TAGS: lambda: migrate_backfill_job_name_and_tags,
BACKFILL_END_TIMESTAMP: lambda: migrate_backfill_end_timestamp,
PUBLIC_FACING_TIMESTAMPS: lambda: migrate_public_facing_timestamps,
}
# for `dagster instance reindex`, optionally run for better read performance
OPTIONAL_DATA_MIGRATIONS: Final[Mapping[str, Callable[[], MigrationFn]]] = {
Expand Down Expand Up @@ -442,3 +443,34 @@ def get_end_timestamp_for_backfill(run_storage: RunStorage, backfill: PartitionB
# time as an estimation
return backfill.backfill_timestamp
return max([record.end_time or 0.0 for record in run_records])


def add_public_facing_timestamps(conn, run_record: RunRecord) -> None:
    """Backfill the public-facing timestamp columns for a single run row.

    Columns that are still unset on ``run_record`` are populated from the
    record's internal timestamps: ``run_creation_time`` from
    ``create_timestamp`` and ``public_update_timestamp`` from
    ``update_timestamp``. Columns that already hold a value are left alone.

    Args:
        conn: An open connection to the run storage database; the UPDATE is
            executed on it.
        run_record: The run record whose row in the runs table is updated.
    """
    values = {}
    if run_record.run_creation_time is None:
        values["run_creation_time"] = run_record.create_timestamp.timestamp()
    if run_record.public_update_timestamp is None:
        # Derive from update_timestamp: public_update_timestamp is the value
        # being backfilled, so it is None here and cannot be the source.
        values["public_update_timestamp"] = run_record.update_timestamp.timestamp()

    if not values:
        # Both columns are already populated; nothing to write.
        return

    conn.execute(
        RunsTable.update()
        .where(RunsTable.c.run_id == run_record.dagster_run.run_id)
        .values(**values)
    )


def migrate_public_facing_timestamps(
    storage: RunStorage, print_fn: Optional[PrintFn] = None
) -> None:
    """Backfill run_creation_time and public_update_timestamp on the runs table.

    Iterates over the run records in ``storage`` and, for every record that is
    missing either public-facing timestamp, writes the missing value(s) via
    ``add_public_facing_timestamps``.

    Args:
        storage: The run storage to migrate. Storages that are not a
            ``SqlRunStorage`` are left untouched.
        print_fn: Optional callback used to report progress.
    """
    # Imported locally rather than at module level (presumably to avoid a
    # circular import with sql_run_storage -- confirm before hoisting).
    from dagster._core.storage.runs.sql_run_storage import SqlRunStorage

    # Validate before emitting any progress output, and label the argument with
    # its real parameter name so a failure points at the right argument.
    check.inst_param(storage, "storage", RunStorage)

    if print_fn:
        print_fn("Querying run storage.")

    if not isinstance(storage, SqlRunStorage):
        return

    with storage.connect() as conn:
        for run_record in chunked_run_records_iterator(storage, print_fn):
            if (
                run_record.public_update_timestamp is not None
                and run_record.run_creation_time is not None
            ):
                # Both columns already populated; nothing to backfill.
                continue
            add_public_facing_timestamps(conn, run_record)
Original file line number Diff line number Diff line change
Expand Up @@ -1803,27 +1803,35 @@ def test_add_user_facing_run_timestamps():
with DagsterInstance.from_ref(InstanceRef.from_dir(test_dir)) as instance:
assert not instance.run_storage.has_user_facing_run_timestamps()
# add a historical run; it should use the current timestamp / ignore the run_creation_time
historical_run_id = make_new_run_id()
historical_run_creation_time = datetime.datetime(
pre_migration_historical_run_id = make_new_run_id()
pre_migration_historical_run_creation_time = datetime.datetime(
2025, 1, 1, tzinfo=datetime.timezone.utc
)
instance.add_historical_run(
DagsterRun(
job_name="foo",
run_id=historical_run_id,
run_id=pre_migration_historical_run_id,
status=DagsterRunStatus.NOT_STARTED,
),
historical_run_creation_time,
pre_migration_historical_run_creation_time,
)
assert (
instance.get_run_record_by_id(pre_migration_historical_run_id).run_creation_time
is None
)
assert (
instance.get_run_record_by_id(
pre_migration_historical_run_id
).public_update_timestamp
is None
)
assert instance.get_run_record_by_id(historical_run_id).run_creation_time is None
assert instance.get_run_record_by_id(historical_run_id).public_update_timestamp is None
# If the columns do not exist, then we will be using the create_timestamp column to filter.
assert (
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
created_after=historical_run_creation_time
run_ids=[pre_migration_historical_run_id],
created_after=pre_migration_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1835,8 +1843,8 @@ def test_add_user_facing_run_timestamps():
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
created_before=historical_run_creation_time
run_ids=[pre_migration_historical_run_id],
created_before=pre_migration_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1848,8 +1856,8 @@ def test_add_user_facing_run_timestamps():
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
updated_after=historical_run_creation_time
run_ids=[pre_migration_historical_run_id],
updated_after=pre_migration_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1860,8 +1868,8 @@ def test_add_user_facing_run_timestamps():
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
updated_before=historical_run_creation_time
run_ids=[pre_migration_historical_run_id],
updated_before=pre_migration_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1871,34 +1879,49 @@ def test_add_user_facing_run_timestamps():

instance.upgrade()
assert instance.run_storage.has_user_facing_run_timestamps()
# the pre-migration historical run should now have the run_creation_time and public_update_timestamp set, but they will reflect the create_timestamp and update_timestamp respectively
pre_migration_historical_run = instance.get_run_record_by_id(
pre_migration_historical_run_id
)
assert (
pre_migration_historical_run.run_creation_time
== pre_migration_historical_run.create_timestamp.timestamp()
)
assert (
pre_migration_historical_run.public_update_timestamp
== pre_migration_historical_run.update_timestamp.timestamp()
)

# Add another historical run, it should use the run_creation_time
historical_run_id = make_new_run_id()
historical_run_creation_time = datetime.datetime(
post_migration_historical_run_id = make_new_run_id()
post_migration_historical_run_creation_time = datetime.datetime(
2025, 1, 2, tzinfo=datetime.timezone.utc
)
instance.add_historical_run(
DagsterRun(
job_name="foo",
run_id=historical_run_id,
run_id=post_migration_historical_run_id,
status=DagsterRunStatus.NOT_STARTED,
),
historical_run_creation_time,
post_migration_historical_run_creation_time,
)
assert (
instance.get_run_record_by_id(historical_run_id).run_creation_time
== historical_run_creation_time.timestamp()
instance.get_run_record_by_id(post_migration_historical_run_id).run_creation_time
== post_migration_historical_run_creation_time.timestamp()
)
assert (
instance.get_run_record_by_id(historical_run_id).public_update_timestamp
== historical_run_creation_time.timestamp()
instance.get_run_record_by_id(
post_migration_historical_run_id
).public_update_timestamp
== post_migration_historical_run_creation_time.timestamp()
)
# If the columns exist, then we will be using the run_creation_time column to filter.
assert (
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
created_after=historical_run_creation_time
run_ids=[post_migration_historical_run_id],
created_after=post_migration_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1909,8 +1932,8 @@ def test_add_user_facing_run_timestamps():
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
created_before=historical_run_creation_time
run_ids=[post_migration_historical_run_id],
created_before=post_migration_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1921,8 +1944,8 @@ def test_add_user_facing_run_timestamps():
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
updated_after=historical_run_creation_time
run_ids=[post_migration_historical_run_id],
updated_after=post_migration_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1933,8 +1956,8 @@ def test_add_user_facing_run_timestamps():
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
updated_before=historical_run_creation_time
run_ids=[post_migration_historical_run_id],
updated_before=post_migration_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1945,26 +1968,29 @@ def test_add_user_facing_run_timestamps():
instance._run_storage._alembic_downgrade(rev="7e2f3204cf8e") # pyright: ignore[reportAttributeAccessIssue]
assert not instance.run_storage.has_user_facing_run_timestamps()
# Finally, add another historical run; after the downgrade it should use the current timestamp / ignore the run_creation_time
historical_run_id = make_new_run_id()
historical_run_creation_time = datetime.datetime(
post_downgrade_historical_run_id = make_new_run_id()
post_downgrade_historical_run_creation_time = datetime.datetime(
2025, 1, 3, tzinfo=datetime.timezone.utc
)
instance.add_historical_run(
DagsterRun(
job_name="foo",
run_id=historical_run_id,
run_id=post_downgrade_historical_run_id,
status=DagsterRunStatus.NOT_STARTED,
),
historical_run_creation_time,
post_downgrade_historical_run_creation_time,
)
assert (
instance.get_run_record_by_id(post_downgrade_historical_run_id).run_creation_time
is None
)
assert instance.get_run_record_by_id(historical_run_id).run_creation_time is None
# If the columns do not exist, then we will be using the create_timestamp column to filter.
assert (
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
created_after=historical_run_creation_time
run_ids=[post_downgrade_historical_run_id],
created_after=post_downgrade_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1976,8 +2002,8 @@ def test_add_user_facing_run_timestamps():
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
created_before=historical_run_creation_time
run_ids=[post_downgrade_historical_run_id],
created_before=post_downgrade_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -1988,8 +2014,8 @@ def test_add_user_facing_run_timestamps():
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
updated_after=historical_run_creation_time
run_ids=[post_downgrade_historical_run_id],
updated_after=post_downgrade_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand All @@ -2000,8 +2026,8 @@ def test_add_user_facing_run_timestamps():
len(
instance.get_run_records(
filters=RunsFilter(
run_ids=[historical_run_id],
updated_before=historical_run_creation_time
run_ids=[post_downgrade_historical_run_id],
updated_before=post_downgrade_historical_run_creation_time
+ datetime.timedelta(seconds=1),
)
)
Expand Down