@@ -645,6 +645,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
         return [
             GraphenePipelineTag(key=key, value=value)
             for key, value in self.dagster_run.tags.items()
+            if get_tag_type(key) != TagType.HIDDEN
         ]

     def resolve_externalJobSource(self, _graphene_info: ResolveInfo):
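Note: a hypothetical illustration of the tag-filtering behavior added above, not dagster's real implementation. Only the names get_tag_type and TagType.HIDDEN come from the diff; the enum values and classifier logic here are assumptions for the sketch.

    from enum import Enum

    class TagType(Enum):
        USER_PROVIDED = "USER_PROVIDED"
        HIDDEN = "HIDDEN"

    def get_tag_type(key: str) -> TagType:
        # Assumption for the sketch: hidden tags are internal bookkeeping keys.
        return TagType.HIDDEN if key.startswith(".dagster/") else TagType.USER_PROVIDED

    tags = {"team": "data", ".dagster/internal": "x"}
    visible = {k: v for k, v in tags.items() if get_tag_type(k) != TagType.HIDDEN}
    assert visible == {"team": "data"}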
@@ -855,7 +855,7 @@ def execute_job_in_process(
         ]
         sensors = []
         for sensor in self.sensors or []:
-            if sensor.has_jobs and any(job.name == job_name for job in sensor.jobs):
+            if has_job_defs_attached(sensor) and any(job.name == job_name for job in sensor.jobs):
                 sensors.append(
                     sensor.with_updated_jobs(
                         [job for job in sensor.jobs if job.name != job_name] + [job_def]
@@ -893,3 +893,7 @@ def get_job_from_defs(
         iter(job for job in (defs.jobs or []) if job.name == name),
         None,
     )
+
+
+def has_job_defs_attached(sensor_def: SensorDefinition) -> bool:
+    return any(target.has_job_def for target in sensor_def.targets)
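Note: a hypothetical sketch of the contract the new helper checks, using stand-in classes rather than dagster's real SensorDefinition: a sensor target may reference a job by name only or carry an attached job definition, and the helper reports whether at least one target has a definition attached.

    from dataclasses import dataclass, field
    from typing import Optional

    @dataclass
    class Target:
        job_name: str
        job_def: Optional[object] = None  # stand-in for a JobDefinition

        @property
        def has_job_def(self) -> bool:
            return self.job_def is not None

    @dataclass
    class Sensor:
        targets: list[Target] = field(default_factory=list)

    def has_job_defs_attached(sensor: Sensor) -> bool:
        return any(target.has_job_def for target in sensor.targets)

    assert has_job_defs_attached(Sensor([Target("job_a", job_def=object())]))
    assert not has_job_defs_attached(Sensor([Target("job_b")]))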
10 changes: 7 additions & 3 deletions python_modules/dagster/dagster/_core/instance/__init__.py
@@ -7,6 +7,7 @@
 from abc import abstractmethod
 from collections import defaultdict
 from collections.abc import Iterable, Mapping, Sequence
+from datetime import datetime
 from enum import Enum
 from tempfile import TemporaryDirectory
 from types import TracebackType
@@ -1925,8 +1926,10 @@ def add_snapshot(
         return self._run_storage.add_snapshot(snapshot)

     @traced
-    def handle_run_event(self, run_id: str, event: "DagsterEvent") -> None:
-        return self._run_storage.handle_run_event(run_id, event)
+    def handle_run_event(
+        self, run_id: str, event: "DagsterEvent", update_timestamp: Optional[datetime] = None
+    ) -> None:
+        return self._run_storage.handle_run_event(run_id, event, update_timestamp)

     @traced
     def add_run_tags(self, run_id: str, new_tags: Mapping[str, str]) -> None:
@@ -2751,6 +2754,7 @@ def report_dagster_event(
         run_id: str,
         log_level: Union[str, int] = logging.INFO,
         batch_metadata: Optional["DagsterEventBatchMetadata"] = None,
+        timestamp: Optional[float] = None,
     ) -> None:
         """Takes a DagsterEvent and stores it in persistent storage for the corresponding DagsterRun."""
         from dagster._core.events.log import EventLogEntry
@@ -2761,7 +2765,7 @@
             job_name=dagster_event.job_name,
             run_id=run_id,
             error_info=None,
-            timestamp=get_current_timestamp(),
+            timestamp=timestamp or get_current_timestamp(),
             step_key=dagster_event.step_key,
             dagster_event=dagster_event,
         )
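Note: a minimal usage sketch of the new timestamp override on report_dagster_event, mirroring the test added below. The asset key is an assumption and the ephemeral instance is just for illustration; the event is persisted as if it occurred at the supplied time rather than the wall clock.

    import datetime

    from dagster import AssetMaterialization, DagsterInstance
    from dagster._core.events import DagsterEvent, DagsterEventType, StepMaterializationData

    instance = DagsterInstance.ephemeral()
    backfill_date = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)

    instance.report_dagster_event(
        run_id="",
        dagster_event=DagsterEvent(
            event_type_value=DagsterEventType.ASSET_MATERIALIZATION.value,
            job_name="",
            event_specific_data=StepMaterializationData(AssetMaterialization(asset_key="my_asset")),
        ),
        # Stored with the historical timestamp instead of get_current_timestamp().
        timestamp=backfill_date.timestamp(),
    )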
@@ -204,8 +204,10 @@ def add_historical_run(
     ) -> "DagsterRun":
         return self._storage.run_storage.add_historical_run(dagster_run, run_creation_time)

-    def handle_run_event(self, run_id: str, event: "DagsterEvent") -> None:
-        return self._storage.run_storage.handle_run_event(run_id, event)
+    def handle_run_event(
+        self, run_id: str, event: "DagsterEvent", update_timestamp: Optional[datetime] = None
+    ) -> None:
+        return self._storage.run_storage.handle_run_event(run_id, event, update_timestamp)

     def get_runs(  # pyright: ignore[reportIncompatibleMethodOverride]
         self,
4 changes: 3 additions & 1 deletion python_modules/dagster/dagster/_core/storage/runs/base.py
@@ -62,7 +62,9 @@ def add_historical_run(
         """Add a historical run to storage."""

     @abstractmethod
-    def handle_run_event(self, run_id: str, event: DagsterEvent) -> None:
+    def handle_run_event(
+        self, run_id: str, event: DagsterEvent, update_timestamp: Optional[datetime] = None
+    ) -> None:
         """Update run storage in accordance to a pipeline run related DagsterEvent.

         Args:
15 changes: 6 additions & 9 deletions python_modules/dagster/dagster/_core/test_utils.py
@@ -15,7 +15,6 @@
 from pathlib import Path
 from signal import Signals
 from threading import Event
-from types import TracebackType
 from typing import (  # noqa: UP035
     AbstractSet,
     Any,
@@ -738,20 +737,18 @@ def makeRecord(
                 fn: str,
                 lno: int,
                 msg: object,
-                args: tuple[object, ...] | Mapping[str, object],
-                exc_info: tuple[type[BaseException], BaseException, TracebackType | None]
-                | tuple[None, None, None]
-                | None,
-                func: str | None = None,
-                extra: Mapping[str, object] | None = None,
-                sinfo: str | None = None,
+                args,
+                exc_info,
+                func=None,
+                extra=None,
+                sinfo=None,
             ) -> logging.LogRecord:
                 record = super().makeRecord(
                     name, level, fn, lno, msg, args, exc_info, func, extra, sinfo
                 )
                 record.created = get_current_timestamp()
                 record.msecs = (record.created - int(record.created)) * 1000
-                record.relativeCreated = (record.created - logging._startTime) * 1000  # noqa: SLF001
+                record.relativeCreated = record.created  # This is incorrect: we really want the program start time, but there is no great way to get it. Since this is only for testing, we accept the inconsistency.

             return FreezableLogManager
@@ -121,7 +121,7 @@
     ASSET_PARTITION_RANGE_START_TAG,
     MULTIDIMENSIONAL_PARTITION_PREFIX,
 )
-from dagster._core.test_utils import create_run_for_test, instance_for_test
+from dagster._core.test_utils import create_run_for_test, freeze_time, instance_for_test
 from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
 from dagster._core.utils import make_new_run_id
 from dagster._loggers import colored_console_logger
@@ -3098,6 +3098,44 @@ def test_write_asset_materialization_failures(self, storage, instance, test_run_

         assert failed_partitions_by_step_key == failed_partitions

+    def test_timestamp_overrides(self, storage, instance: DagsterInstance) -> None:
+        frozen_time = get_current_datetime()
+        with freeze_time(frozen_time):
+            instance.report_dagster_event(
+                run_id="",
+                dagster_event=DagsterEvent(
+                    event_type_value=DagsterEventType.ASSET_MATERIALIZATION.value,
+                    job_name="",
+                    event_specific_data=StepMaterializationData(
+                        AssetMaterialization(asset_key="foo")
+                    ),
+                ),
+            )
+
+        record = instance.get_asset_records([AssetKey("foo")])[0]
+        assert (
+            record.asset_entry.last_materialization_record.timestamp == frozen_time.timestamp()  # type: ignore
+        )
+
+        report_date = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
+
+        instance.report_dagster_event(
+            run_id="",
+            dagster_event=DagsterEvent(
+                event_type_value=DagsterEventType.ASSET_MATERIALIZATION.value,
+                job_name="",
+                event_specific_data=StepMaterializationData(
+                    AssetMaterialization(asset_key="foo")
+                ),
+            ),
+            timestamp=report_date.timestamp(),
+        )
+        record = instance.get_asset_records([AssetKey("foo")])[0]
+        assert (
+            record.asset_entry.last_materialization_record.timestamp == report_date.timestamp()  # type: ignore
+        )
+
     def test_get_latest_storage_ids_by_partition(self, storage, instance):
         a = AssetKey(["a"])
         b = AssetKey(["b"])
@@ -1,6 +1,6 @@
 from collections.abc import Sequence
 from dataclasses import dataclass
-from typing import Annotated, Literal, Optional, Union
+from typing import Annotated, Any, Literal, Optional, Union

 from dagster._core.definitions.definitions_class import Definitions
 from dagster.components import Component, ComponentLoadContext, Resolvable
@@ -42,7 +42,7 @@ def get_scaffold_params(cls) -> Optional[type[BaseModel]]:
         return AirflowInstanceScaffolderParams

     def scaffold(self, request: ScaffoldRequest, params: AirflowInstanceScaffolderParams) -> None:
-        full_params = {
+        full_params: dict[str, Any] = {
             "name": params.name,
         }
         if params.auth_type == "basic_auth":
@@ -74,8 +74,8 @@ def persist_state(
         if not relevant_job_def:
             return
         dagster_run_id = make_new_run_id()
-        context.instance.add_run(
-            DagsterRun(
+        context.instance.run_storage.add_historical_run(
+            dagster_run=DagsterRun(
                 run_id=dagster_run_id,
                 job_name=relevant_job_def.name,
                 tags={
@@ -95,7 +95,8 @@
                 )
                 if context.run.remote_job_origin
                 else None,
-            )
+            ),
+            run_creation_time=self.dag_run.start_date,
         )

         context.instance.report_dagster_event(
@@ -104,6 +105,7 @@
                 event_type_value="PIPELINE_START",
                 job_name=relevant_job_def.name,
             ),
+            timestamp=self.dag_run.start_date.timestamp(),
         )

         planned_asset_keys = {
@@ -125,6 +127,7 @@
                 ),
                 step_key=NO_STEP_KEY,
             ),
+            timestamp=self.dag_run.start_date.timestamp(),
         )


@@ -147,9 +150,10 @@ def persist_state(
         for asset in airflow_data.mapped_asset_keys_by_task_handle[self.task_instance.task_handle]:
             # IMPROVEME: Add metadata to the materialization event.
             _report_materialization(
-                context,
-                corresponding_run,
-                AssetMaterialization(asset_key=asset, metadata=self.metadata),
+                context=context,
+                corresponding_run=corresponding_run,
+                materialization=AssetMaterialization(asset_key=asset, metadata=self.metadata),
+                airflow_event=self.task_instance,
             )


@@ -170,7 +174,10 @@ def persist_state(
         corresponding_run = get_dagster_run_for_airflow_repr(context, self.dag_run)
         for asset in airflow_data.all_asset_keys_by_dag_handle[self.dag_run.dag_handle]:
             _report_materialization(
-                context, corresponding_run, AssetMaterialization(asset_key=asset)
+                context=context,
+                corresponding_run=corresponding_run,
+                materialization=AssetMaterialization(asset_key=asset),
+                airflow_event=self.dag_run,
             )

         if not corresponding_run:
@@ -200,7 +207,9 @@ def persist_state(
                 error=None, failure_reason=None, first_step_failure_event=None
             ),
         )
-        context.instance.report_dagster_event(run_id=dagster_run_id, dagster_event=event)
+        context.instance.report_dagster_event(
+            run_id=dagster_run_id, dagster_event=event, timestamp=self.dag_run.end_date.timestamp()
+        )


 def _process_started_runs(
@@ -271,7 +280,8 @@ async def _retrieve_logs_for_task_instance(
     airflow_instance: AirflowInstance,
     task_instance: TaskInstance,
 ) -> TaskInstanceCompleted:
-    logs = airflow_instance.get_task_instance_logs(
+    logs = await asyncio.to_thread(
+        airflow_instance.get_task_instance_logs,
         task_instance.dag_id,
         task_instance.task_id,
         task_instance.run_id,
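Note: asyncio.to_thread (stdlib, Python 3.9+) runs a blocking callable on a worker thread and awaits its result, so the log fetch above no longer blocks the event loop. A self-contained sketch with a stand-in blocking function:

    import asyncio
    import time

    def fetch_logs(dag_id: str) -> str:
        # Stand-in for a blocking call like get_task_instance_logs.
        time.sleep(1)
        return f"logs for {dag_id}"

    async def main() -> None:
        # Both fetches run on worker threads and overlap instead of serializing.
        results = await asyncio.gather(
            asyncio.to_thread(fetch_logs, "dag_a"),
            asyncio.to_thread(fetch_logs, "dag_b"),
        )
        print(results)

    asyncio.run(main())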
@@ -350,9 +360,11 @@ def persist_events(


 def _report_materialization(
+    *,
     context: OpExecutionContext,
     corresponding_run: Optional[DagsterRun],
     materialization: AssetMaterialization,
+    airflow_event: Union[TaskInstance, DagRun],
 ) -> None:
     if corresponding_run:
         context.instance.report_dagster_event(
@@ -362,8 +374,10 @@ def _report_materialization(
                 job_name=corresponding_run.job_name,
                 event_specific_data=StepMaterializationData(materialization=materialization),
             ),
+            timestamp=airflow_event.end_date.timestamp(),
         )
     else:
+        # Could also support a timestamp override here, but it would only benefit jobless Airlift.
         context.instance.report_runless_asset_event(
             asset_event=AssetMaterialization(
                 asset_key=materialization.asset_key,
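Note: the bare * added to _report_materialization makes every parameter keyword-only, so the new airflow_event argument cannot silently shift positional arguments at existing call sites. A minimal illustration of the pattern with a hypothetical function, not the Airlift code:

    def report(*, run_id: str, asset_key: str, event_time: float) -> None:
        # The bare * forces all arguments to be passed by keyword.
        print(run_id, asset_key, event_time)

    report(run_id="r1", asset_key="foo", event_time=0.0)  # OK
    # report("r1", "foo", 0.0)  # TypeError: takes 0 positional arguments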
@@ -76,7 +76,8 @@ def test_load_dags_basic(component_for_test: type[AirflowInstanceComponent]) ->
     assert keyed_spec is not None
     assert keyed_spec.metadata["foo"] == "bar"

-    assert len(defs.jobs) == 3  # monitoring job + 2 dag jobs.
+    assert defs.jobs
+    assert len(defs.jobs) == 3  # type: ignore # monitoring job + 2 dag jobs.


 def _scaffold_airlift(scaffold_format: str):
Expand Down
Loading