Skip to content

Commit fde1ebf

Browse files
committed
DagsterInstance.report_dagster_event timestamp override
1 parent 834b9f0 commit fde1ebf

File tree

2 files changed

+47
-2
lines changed

2 files changed

+47
-2
lines changed

python_modules/dagster/dagster/_core/instance/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2751,6 +2751,7 @@ def report_dagster_event(
27512751
run_id: str,
27522752
log_level: Union[str, int] = logging.INFO,
27532753
batch_metadata: Optional["DagsterEventBatchMetadata"] = None,
2754+
timestamp: Optional[float] = None,
27542755
) -> None:
27552756
"""Takes a DagsterEvent and stores it in persistent storage for the corresponding DagsterRun."""
27562757
from dagster._core.events.log import EventLogEntry
@@ -2761,7 +2762,7 @@ def report_dagster_event(
27612762
job_name=dagster_event.job_name,
27622763
run_id=run_id,
27632764
error_info=None,
2764-
timestamp=get_current_timestamp(),
2765+
timestamp=timestamp or get_current_timestamp(),
27652766
step_key=dagster_event.step_key,
27662767
dagster_event=dagster_event,
27672768
)

python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@
121121
ASSET_PARTITION_RANGE_START_TAG,
122122
MULTIDIMENSIONAL_PARTITION_PREFIX,
123123
)
124-
from dagster._core.test_utils import create_run_for_test, instance_for_test
124+
from dagster._core.test_utils import create_run_for_test, freeze_time, instance_for_test
125125
from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
126126
from dagster._core.utils import make_new_run_id
127127
from dagster._loggers import colored_console_logger
@@ -3098,6 +3098,50 @@ def test_write_asset_materialization_failures(self, storage, instance, test_run_
30983098

30993099
assert failed_partitions_by_step_key == failed_partitions
31003100

3101+
def test_timestamp_overrides(self, storage, instance: DagsterInstance) -> None:
3102+
# instance.report_dagster_event(
3103+
# DagsterEvent(
3104+
# event_type_value="PIPELINE_START",
3105+
# job_name=run.job_name
3106+
# ),
3107+
# timestamp=datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
3108+
# )
3109+
frozen_time = get_current_datetime()
3110+
with freeze_time(frozen_time):
3111+
instance.report_dagster_event(
3112+
run_id="",
3113+
dagster_event=DagsterEvent(
3114+
event_type_value=DagsterEventType.ASSET_MATERIALIZATION.value,
3115+
job_name="",
3116+
event_specific_data=StepMaterializationData(
3117+
AssetMaterialization(asset_key="foo")
3118+
),
3119+
),
3120+
)
3121+
3122+
record = instance.get_asset_records([AssetKey("foo")])[0]
3123+
assert (
3124+
record.asset_entry.last_materialization_record.timestamp == frozen_time.timestamp()
3125+
)
3126+
3127+
report_date = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
3128+
3129+
instance.report_dagster_event(
3130+
run_id="",
3131+
dagster_event=DagsterEvent(
3132+
event_type_value=DagsterEventType.ASSET_MATERIALIZATION.value,
3133+
job_name="",
3134+
event_specific_data=StepMaterializationData(
3135+
AssetMaterialization(asset_key="foo")
3136+
),
3137+
),
3138+
timestamp=report_date.timestamp(),
3139+
)
3140+
record = instance.get_asset_records([AssetKey("foo")])[0]
3141+
assert (
3142+
record.asset_entry.last_materialization_record.timestamp == report_date.timestamp()
3143+
)
3144+
31013145
def test_get_latest_storage_ids_by_partition(self, storage, instance):
31023146
a = AssetKey(["a"])
31033147
b = AssetKey(["b"])

0 commit comments

Comments
 (0)