Skip to content

Commit a7a7759

Browse files
committed
Make runs obey event log entry timestamp
1 parent 5176207 commit a7a7759

File tree

5 files changed

+63
-15
lines changed

5 files changed

+63
-15
lines changed

python_modules/dagster/dagster/_core/execution/context_creation_job.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
from dagster._core.execution.retries import RetryMode
4545
from dagster._core.executor.init import InitExecutorContext
4646
from dagster._core.instance import DagsterInstance
47-
from dagster._core.log_manager import DagsterLogManager
4847
from dagster._core.storage.dagster_run import DagsterRun
4948
from dagster._core.system_config.objects import ResolvedRunConfig
5049
from dagster._loggers import default_loggers, default_system_loggers
@@ -55,10 +54,16 @@
5554
from dagster._core.execution.plan.outputs import StepOutputHandle
5655
from dagster._core.executor.base import Executor
5756

57+
# Import within functions so that we can mock the class in tests using freeze_time.
58+
# Essentially, we want to be able to control the timestamp of the log records.
59+
from dagster._core.log_manager import DagsterLogManager
60+
5861

5962
def initialize_console_manager(
6063
dagster_run: Optional[DagsterRun], instance: Optional[DagsterInstance] = None
61-
) -> DagsterLogManager:
64+
) -> "DagsterLogManager":
65+
from dagster._core.log_manager import DagsterLogManager
66+
6267
# initialize default colored console logger
6368
loggers = []
6469
for logger_def, logger_config in default_system_loggers(instance):
@@ -458,7 +463,9 @@ def scoped_job_context(
458463

459464
def create_log_manager(
460465
context_creation_data: ContextCreationData,
461-
) -> DagsterLogManager:
466+
) -> "DagsterLogManager":
467+
from dagster._core.log_manager import DagsterLogManager
468+
462469
check.inst_param(context_creation_data, "context_creation_data", ContextCreationData)
463470

464471
job_def, resolved_run_config, dagster_run = (
@@ -506,14 +513,16 @@ def create_log_manager(
506513

507514
def create_context_free_log_manager(
508515
instance: DagsterInstance, dagster_run: DagsterRun
509-
) -> DagsterLogManager:
516+
) -> "DagsterLogManager":
510517
"""In the event of pipeline initialization failure, we want to be able to log the failure
511518
without a dependency on the PlanExecutionContext to initialize DagsterLogManager.
512519
513520
Args:
514521
dagster_run (PipelineRun)
515522
pipeline_def (JobDefinition)
516523
"""
524+
from dagster._core.log_manager import DagsterLogManager
525+
517526
check.inst_param(instance, "instance", DagsterInstance)
518527
check.inst_param(dagster_run, "dagster_run", DagsterRun)
519528

python_modules/dagster/dagster/_core/execution/host_mode.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
from dagster._core.executor.base import Executor
2727
from dagster._core.executor.init import InitExecutorContext
2828
from dagster._core.instance import DagsterInstance
29-
from dagster._core.log_manager import DagsterLogManager
3029
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
3130
from dagster._loggers import default_system_loggers
3231
from dagster._utils import ensure_single_item
@@ -80,6 +79,8 @@ def host_mode_execution_context_event_generator(
8079
output_capture: None,
8180
resume_from_failure: bool = False,
8281
) -> Iterator[Union[PlanOrchestrationContext, DagsterEvent]]:
82+
from dagster._core.log_manager import DagsterLogManager
83+
8384
check.inst_param(execution_plan, "execution_plan", ExecutionPlan)
8485
check.inst_param(pipeline, "pipeline", ReconstructableJob)
8586

python_modules/dagster/dagster/_core/instance/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
)
8585
from dagster._core.types.pagination import PaginatedResults
8686
from dagster._serdes import ConfigurableClass
87-
from dagster._time import get_current_datetime, get_current_timestamp
87+
from dagster._time import datetime_from_timestamp, get_current_datetime, get_current_timestamp
8888
from dagster._utils import PrintFn, is_uuid, traced
8989
from dagster._utils.error import serializable_error_info_from_exc_info
9090
from dagster._utils.merger import merge_dicts
@@ -2667,7 +2667,9 @@ def handle_new_event(
26672667
and event.is_dagster_event
26682668
and event.get_dagster_event().is_job_event
26692669
):
2670-
self._run_storage.handle_run_event(run_id, event.get_dagster_event())
2670+
self._run_storage.handle_run_event(
2671+
run_id, event.get_dagster_event(), datetime_from_timestamp(event.timestamp)
2672+
)
26712673
run = self.get_run_by_id(run_id)
26722674
if run and event.get_dagster_event().is_run_failure and self.run_retries_enabled:
26732675
# Note that this tag is only applied to runs that fail. Successful runs will not

python_modules/dagster/dagster/_core/storage/runs/sql_run_storage.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ def add_historical_run(
169169
)
170170
return dagster_run
171171

172-
def handle_run_event(self, run_id: str, event: DagsterEvent) -> None:
172+
def handle_run_event(
173+
self, run_id: str, event: DagsterEvent, update_timestamp: Optional[datetime] = None
174+
) -> None:
173175
from dagster._core.events import JobFailureData
174176

175177
check.str_param(run_id, "run_id")
@@ -189,19 +191,17 @@ def handle_run_event(self, run_id: str, event: DagsterEvent) -> None:
189191

190192
kwargs = {}
191193

192-
# consider changing the `handle_run_event` signature to get timestamp off of the
193-
# EventLogEntry instead of the DagsterEvent, for consistency
194-
now = get_current_datetime()
194+
update_timestamp = update_timestamp or get_current_datetime()
195195

196196
if run_stats_cols_in_index and event.event_type == DagsterEventType.PIPELINE_START:
197-
kwargs["start_time"] = now.timestamp()
197+
kwargs["start_time"] = update_timestamp.timestamp()
198198

199199
if run_stats_cols_in_index and event.event_type in {
200200
DagsterEventType.PIPELINE_CANCELED,
201201
DagsterEventType.PIPELINE_FAILURE,
202202
DagsterEventType.PIPELINE_SUCCESS,
203203
}:
204-
kwargs["end_time"] = now.timestamp()
204+
kwargs["end_time"] = update_timestamp.timestamp()
205205

206206
with self.connect() as conn:
207207
conn.execute(
@@ -210,7 +210,7 @@ def handle_run_event(self, run_id: str, event: DagsterEvent) -> None:
210210
.values(
211211
run_body=serialize_value(run.with_status(new_job_status)),
212212
status=new_job_status.value,
213-
update_timestamp=now,
213+
update_timestamp=update_timestamp,
214214
**kwargs,
215215
)
216216
)

python_modules/dagster/dagster/_core/test_utils.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import datetime
3+
import logging
34
import os
45
import re
56
import sys
@@ -14,6 +15,7 @@
1415
from pathlib import Path
1516
from signal import Signals
1617
from threading import Event
18+
from types import TracebackType
1719
from typing import ( # noqa: UP035
1820
AbstractSet,
1921
Any,
@@ -78,7 +80,7 @@
7880
from dagster._core.workspace.workspace import CodeLocationEntry, CurrentWorkspace
7981
from dagster._serdes import ConfigurableClass
8082
from dagster._serdes.config_class import ConfigurableClassData
81-
from dagster._time import create_datetime, get_timezone
83+
from dagster._time import create_datetime, get_current_timestamp, get_timezone
8284
from dagster._utils import Counter, get_terminate_signal, traced, traced_counter
8385
from dagster._utils.log import configure_loggers
8486

@@ -724,6 +726,37 @@ def create_test_asset_job(
724726
).get_job_def(name)
725727

726728

729+
def get_freezable_log_manager():
730+
# The log manager usually sets its own timestamp in the guts of python internals, but we want to be able to control it in test scenarios.
731+
from dagster._core.log_manager import DagsterLogManager
732+
733+
class FreezableLogManager(DagsterLogManager):
734+
def makeRecord(
735+
self,
736+
name: str,
737+
level: int,
738+
fn: str,
739+
lno: int,
740+
msg: object,
741+
args: tuple[object, ...] | Mapping[str, object],
742+
exc_info: tuple[type[BaseException], BaseException, TracebackType | None]
743+
| tuple[None, None, None]
744+
| None,
745+
func: str | None = None,
746+
extra: Mapping[str, object] | None = None,
747+
sinfo: str | None = None,
748+
) -> logging.LogRecord:
749+
record = super().makeRecord(
750+
name, level, fn, lno, msg, args, exc_info, func, extra, sinfo
751+
)
752+
record.created = get_current_timestamp()
753+
record.msecs = (record.created - int(record.created)) * 1000
754+
record.relativeCreated = (record.created - logging._startTime) * 1000 # noqa: SLF001
755+
return record
756+
757+
return FreezableLogManager
758+
759+
727760
@contextmanager
728761
def freeze_time(new_now: Union[datetime.datetime, float]):
729762
new_dt = (
@@ -737,6 +770,9 @@ def freeze_time(new_now: Union[datetime.datetime, float]):
737770
unittest.mock.patch(
738771
"dagster._time._mockable_get_current_timestamp", return_value=new_dt.timestamp()
739772
),
773+
unittest.mock.patch(
774+
"dagster._core.log_manager.DagsterLogManager", new=get_freezable_log_manager()
775+
),
740776
):
741777
yield
742778

0 commit comments

Comments
 (0)