diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py
index a0286fc7f1957..69fcb9345f937 100644
--- a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py
+++ b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py
@@ -645,6 +645,7 @@ def resolve_tags(self, _graphene_info: ResolveInfo):
         return [
             GraphenePipelineTag(key=key, value=value)
             for key, value in self.dagster_run.tags.items()
+            if get_tag_type(key) != TagType.HIDDEN
         ]
 
     def resolve_externalJobSource(self, _graphene_info: ResolveInfo):
diff --git a/python_modules/dagster/dagster/_core/definitions/definitions_class.py b/python_modules/dagster/dagster/_core/definitions/definitions_class.py
index 9ba22a6c0a1cc..e88515406dab3 100644
--- a/python_modules/dagster/dagster/_core/definitions/definitions_class.py
+++ b/python_modules/dagster/dagster/_core/definitions/definitions_class.py
@@ -855,7 +855,7 @@ def execute_job_in_process(
         ]
         sensors = []
         for sensor in self.sensors or []:
-            if sensor.has_jobs and any(job.name == job_name for job in sensor.jobs):
+            if has_job_defs_attached(sensor) and any(job.name == job_name for job in sensor.jobs):
                 sensors.append(
                     sensor.with_updated_jobs(
                         [job for job in sensor.jobs if job.name != job_name] + [job_def]
@@ -893,3 +893,7 @@ def get_job_from_defs(
         iter(job for job in (defs.jobs or []) if job.name == name),
         None,
     )
+
+
+def has_job_defs_attached(sensor_def: SensorDefinition) -> bool:
+    return any(target.has_job_def for target in sensor_def.targets)
diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py
index a49cbb5ab503c..fe50830435796 100644
--- a/python_modules/dagster/dagster/_core/instance/__init__.py
+++ b/python_modules/dagster/dagster/_core/instance/__init__.py
@@ -7,6 +7,7 @@
 from abc import abstractmethod
 from collections import defaultdict
 from collections.abc import Iterable, Mapping, Sequence
+from datetime import datetime
 from enum import Enum
 from tempfile import TemporaryDirectory
 from types import TracebackType
@@ -1925,8 +1926,10 @@ def add_snapshot(
         return self._run_storage.add_snapshot(snapshot)
 
     @traced
-    def handle_run_event(self, run_id: str, event: "DagsterEvent") -> None:
-        return self._run_storage.handle_run_event(run_id, event)
+    def handle_run_event(
+        self, run_id: str, event: "DagsterEvent", update_timestamp: Optional[datetime] = None
+    ) -> None:
+        return self._run_storage.handle_run_event(run_id, event, update_timestamp)
 
     @traced
     def add_run_tags(self, run_id: str, new_tags: Mapping[str, str]) -> None:
@@ -2751,6 +2754,7 @@ def report_dagster_event(
         run_id: str,
         log_level: Union[str, int] = logging.INFO,
         batch_metadata: Optional["DagsterEventBatchMetadata"] = None,
+        timestamp: Optional[float] = None,
     ) -> None:
         """Takes a DagsterEvent and stores it in persistent storage for the corresponding DagsterRun."""
         from dagster._core.events.log import EventLogEntry
@@ -2761,7 +2765,7 @@
             job_name=dagster_event.job_name,
             run_id=run_id,
             error_info=None,
-            timestamp=get_current_timestamp(),
+            timestamp=timestamp or get_current_timestamp(),
             step_key=dagster_event.step_key,
             dagster_event=dagster_event,
         )
diff --git a/python_modules/dagster/dagster/_core/storage/legacy_storage.py b/python_modules/dagster/dagster/_core/storage/legacy_storage.py
index 3edaea930ad3d..20fdd6d071e66 100644
--- a/python_modules/dagster/dagster/_core/storage/legacy_storage.py
+++ b/python_modules/dagster/dagster/_core/storage/legacy_storage.py
@@ -204,8 +204,10 @@ def add_historical_run(
     ) -> "DagsterRun":
         return self._storage.run_storage.add_historical_run(dagster_run, run_creation_time)
 
-    def handle_run_event(self, run_id: str, event: "DagsterEvent") -> None:
-        return self._storage.run_storage.handle_run_event(run_id, event)
+    def handle_run_event(
+        self, run_id: str, event: "DagsterEvent", update_timestamp: Optional[datetime] = None
+    ) -> None:
+        return self._storage.run_storage.handle_run_event(run_id, event, update_timestamp)
 
     def get_runs(  # pyright: ignore[reportIncompatibleMethodOverride]
         self,
diff --git a/python_modules/dagster/dagster/_core/storage/runs/base.py b/python_modules/dagster/dagster/_core/storage/runs/base.py
index 1aaf28fd85fc2..09bf3fedd291b 100644
--- a/python_modules/dagster/dagster/_core/storage/runs/base.py
+++ b/python_modules/dagster/dagster/_core/storage/runs/base.py
@@ -62,7 +62,9 @@ def add_historical_run(
         """Add a historical run to storage."""
 
     @abstractmethod
-    def handle_run_event(self, run_id: str, event: DagsterEvent) -> None:
+    def handle_run_event(
+        self, run_id: str, event: DagsterEvent, update_timestamp: Optional[datetime] = None
+    ) -> None:
         """Update run storage in accordance to a pipeline run related DagsterEvent.
 
         Args:
diff --git a/python_modules/dagster/dagster/_core/test_utils.py b/python_modules/dagster/dagster/_core/test_utils.py
index 35e2bfdbed1b5..ee370d0453334 100644
--- a/python_modules/dagster/dagster/_core/test_utils.py
+++ b/python_modules/dagster/dagster/_core/test_utils.py
@@ -15,7 +15,6 @@
 from pathlib import Path
 from signal import Signals
 from threading import Event
-from types import TracebackType
 from typing import (  # noqa: UP035
     AbstractSet,
     Any,
@@ -738,20 +737,18 @@ def makeRecord(
             fn: str,
             lno: int,
             msg: object,
-            args: tuple[object, ...] | Mapping[str, object],
-            exc_info: tuple[type[BaseException], BaseException, TracebackType | None]
-            | tuple[None, None, None]
-            | None,
-            func: str | None = None,
-            extra: Mapping[str, object] | None = None,
-            sinfo: str | None = None,
+            args,
+            exc_info,
+            func=None,
+            extra=None,
+            sinfo=None,
         ) -> logging.LogRecord:
            record = super().makeRecord(
                name, level, fn, lno, msg, args, exc_info, func, extra, sinfo
            )
            record.created = get_current_timestamp()
            record.msecs = (record.created - int(record.created)) * 1000
-           record.relativeCreated = (record.created - logging._startTime) * 1000  # noqa: SLF001
+           record.relativeCreated = record.created  # Not strictly correct: this should be relative to the program start time, but we don't have a great way to get that. Since this is only used in tests, we accept the inconsistency.
            return record
 
    return FreezableLogManager
diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py
index aae98a3e83289..ab88ffb292b15 100644
--- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py
+++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py
@@ -121,7 +121,7 @@
     ASSET_PARTITION_RANGE_START_TAG,
     MULTIDIMENSIONAL_PARTITION_PREFIX,
 )
-from dagster._core.test_utils import create_run_for_test, instance_for_test
+from dagster._core.test_utils import create_run_for_test, freeze_time, instance_for_test
 from dagster._core.types.loadable_target_origin import LoadableTargetOrigin
 from dagster._core.utils import make_new_run_id
 from dagster._loggers import colored_console_logger
@@ -3098,6 +3098,43 @@ def test_write_asset_materialization_failures(self, storage, instance, test_run_
         assert failed_partitions_by_step_key == failed_partitions
 
+    def test_timestamp_overrides(self, storage, instance: DagsterInstance) -> None:
+        frozen_time = get_current_datetime()
+        with freeze_time(frozen_time):
+            instance.report_dagster_event(
+                run_id="",
+                dagster_event=DagsterEvent(
+                    event_type_value=DagsterEventType.ASSET_MATERIALIZATION.value,
+                    job_name="",
+                    event_specific_data=StepMaterializationData(
+                        AssetMaterialization(asset_key="foo")
+                    ),
+                ),
+            )
+
+        record = instance.get_asset_records([AssetKey("foo")])[0]
+        assert (
+            record.asset_entry.last_materialization_record.timestamp == frozen_time.timestamp()  # type: ignore
+        )
+
+        report_date = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
+
+        instance.report_dagster_event(
+            run_id="",
+            dagster_event=DagsterEvent(
+                event_type_value=DagsterEventType.ASSET_MATERIALIZATION.value,
+                job_name="",
+                event_specific_data=StepMaterializationData(
+                    AssetMaterialization(asset_key="foo")
+                ),
+            ),
+            timestamp=report_date.timestamp(),
+        )
+        record = instance.get_asset_records([AssetKey("foo")])[0]
+        assert (
+            record.asset_entry.last_materialization_record.timestamp == report_date.timestamp()  # type: ignore
+        )
+
     def test_get_latest_storage_ids_by_partition(self, storage, instance):
         a = AssetKey(["a"])
         b = AssetKey(["b"])
diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift/core/components/airflow_instance/component.py b/python_modules/libraries/dagster-airlift/dagster_airlift/core/components/airflow_instance/component.py
index 17803a45164f4..11540aaf9091f 100644
--- a/python_modules/libraries/dagster-airlift/dagster_airlift/core/components/airflow_instance/component.py
+++ b/python_modules/libraries/dagster-airlift/dagster_airlift/core/components/airflow_instance/component.py
@@ -1,6 +1,6 @@
 from collections.abc import Sequence
 from dataclasses import dataclass
-from typing import Annotated, Literal, Optional, Union
+from typing import Annotated, Any, Literal, Optional, Union
 
 from dagster._core.definitions.definitions_class import Definitions
 from dagster.components import Component, ComponentLoadContext, Resolvable
@@ -42,7 +42,7 @@ def get_scaffold_params(cls) -> Optional[type[BaseModel]]:
         return AirflowInstanceScaffolderParams
 
     def scaffold(self, request: ScaffoldRequest, params: AirflowInstanceScaffolderParams) -> None:
-        full_params = {
+        full_params: dict[str, Any] = {
             "name": params.name,
         }
         if params.auth_type == "basic_auth":
diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift/core/monitoring_job/event_stream.py b/python_modules/libraries/dagster-airlift/dagster_airlift/core/monitoring_job/event_stream.py
index 63c175846d57a..c40355d52b0b0 100644
--- a/python_modules/libraries/dagster-airlift/dagster_airlift/core/monitoring_job/event_stream.py
+++ b/python_modules/libraries/dagster-airlift/dagster_airlift/core/monitoring_job/event_stream.py
@@ -74,8 +74,8 @@ def persist_state(
         if not relevant_job_def:
             return
         dagster_run_id = make_new_run_id()
-        context.instance.add_run(
-            DagsterRun(
+        context.instance.run_storage.add_historical_run(
+            dagster_run=DagsterRun(
                 run_id=dagster_run_id,
                 job_name=relevant_job_def.name,
                 tags={
@@ -95,7 +95,8 @@
                 )
                 if context.run.remote_job_origin
                 else None,
-            )
+            ),
+            run_creation_time=self.dag_run.start_date,
         )
 
         context.instance.report_dagster_event(
@@ -104,6 +105,7 @@
                 event_type_value="PIPELINE_START",
                 job_name=relevant_job_def.name,
             ),
+            timestamp=self.dag_run.start_date.timestamp(),
         )
 
         planned_asset_keys = {
@@ -125,6 +127,7 @@
                 ),
                 step_key=NO_STEP_KEY,
             ),
+            timestamp=self.dag_run.start_date.timestamp(),
         )
 
 
@@ -147,9 +150,10 @@
         for asset in airflow_data.mapped_asset_keys_by_task_handle[self.task_instance.task_handle]:
             # IMPROVEME: Add metadata to the materialization event.
             _report_materialization(
-                context,
-                corresponding_run,
-                AssetMaterialization(asset_key=asset, metadata=self.metadata),
+                context=context,
+                corresponding_run=corresponding_run,
+                materialization=AssetMaterialization(asset_key=asset, metadata=self.metadata),
+                airflow_event=self.task_instance,
             )
 
 
@@ -170,7 +174,10 @@
         corresponding_run = get_dagster_run_for_airflow_repr(context, self.dag_run)
         for asset in airflow_data.all_asset_keys_by_dag_handle[self.dag_run.dag_handle]:
             _report_materialization(
-                context, corresponding_run, AssetMaterialization(asset_key=asset)
+                context=context,
+                corresponding_run=corresponding_run,
+                materialization=AssetMaterialization(asset_key=asset),
+                airflow_event=self.dag_run,
             )
 
         if not corresponding_run:
@@ -200,7 +207,9 @@
                 error=None, failure_reason=None, first_step_failure_event=None
             ),
         )
-        context.instance.report_dagster_event(run_id=dagster_run_id, dagster_event=event)
+        context.instance.report_dagster_event(
+            run_id=dagster_run_id, dagster_event=event, timestamp=self.dag_run.end_date.timestamp()
+        )
 
 
 def _process_started_runs(
@@ -271,7 +280,8 @@ async def _retrieve_logs_for_task_instance(
     airflow_instance: AirflowInstance,
     task_instance: TaskInstance,
 ) -> TaskInstanceCompleted:
-    logs = airflow_instance.get_task_instance_logs(
+    logs = await asyncio.to_thread(
+        airflow_instance.get_task_instance_logs,
         task_instance.dag_id,
         task_instance.task_id,
         task_instance.run_id,
@@ -350,9 +360,11 @@ def persist_events(
 
 
 def _report_materialization(
+    *,
     context: OpExecutionContext,
     corresponding_run: Optional[DagsterRun],
     materialization: AssetMaterialization,
+    airflow_event: Union[TaskInstance, DagRun],
 ) -> None:
     if corresponding_run:
         context.instance.report_dagster_event(
@@ -362,8 +374,10 @@
                 job_name=corresponding_run.job_name,
                 event_specific_data=StepMaterializationData(materialization=materialization),
             ),
+            timestamp=airflow_event.end_date.timestamp(),
         )
     else:
+        # We could also support a timestamp override here, but it would only benefit jobless Airlift.
         context.instance.report_runless_asset_event(
             asset_event=AssetMaterialization(
                 asset_key=materialization.asset_key,
diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_component_defs.py b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_component_defs.py
index 458b2fe961b6e..8b695e14e378f 100644
--- a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_component_defs.py
+++ b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_component_defs.py
@@ -76,7 +76,8 @@ def test_load_dags_basic(component_for_test: type[AirflowInstanceComponent]) ->
     assert keyed_spec is not None
     assert keyed_spec.metadata["foo"] == "bar"
 
-    assert len(defs.jobs) == 3  # monitoring job + 2 dag jobs.
+    assert defs.jobs
+    assert len(defs.jobs) == 3  # type: ignore # monitoring job + 2 dag jobs.
 
 
 def _scaffold_airlift(scaffold_format: str):
diff --git a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_monitoring_job.py b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_monitoring_job.py
index 0ce2e8cebc993..384b9d21b25f0 100644
--- a/python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_monitoring_job.py
+++ b/python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_monitoring_job.py
@@ -51,6 +51,7 @@ def test_monitoring_job_execution(init_load_context: None, instance: DagsterInst
     defs, af_instance = create_defs_and_instance(
         assets_per_task={
             "dag": {"task": [("a", [])]},
+            "dag2": {"task": [("b", [])]},
         },
         create_runs=False,
         create_assets_defs=False,
@@ -81,6 +82,14 @@ def test_monitoring_job_execution(init_load_context: None, instance: DagsterInst
                 end_date=freeze_datetime,
                 state="failed",
             ),
+            # Newly finished run that started after the last iteration, and therefore has no corresponding run on the instance.
+            make_dag_run(
+                dag_id="dag2",
+                run_id="late-run",
+                start_date=freeze_datetime - timedelta(seconds=15),
+                end_date=freeze_datetime - timedelta(seconds=10),
+                state="success",
+            ),
         ],
         seeded_task_instances=[
             # Have a newly completed task instance for the newly started run.
@@ -91,6 +100,14 @@ def test_monitoring_job_execution(init_load_context: None, instance: DagsterInst
                 start_date=freeze_datetime - timedelta(seconds=30),
                 end_date=freeze_datetime,
             ),
+            # Have a newly completed task instance for the late run.
+            make_task_instance(
+                dag_id="dag2",
+                task_id="task",
+                run_id="late-run",
+                start_date=freeze_datetime - timedelta(seconds=15),
+                end_date=freeze_datetime - timedelta(seconds=10),
+            ),
         ],
         seeded_logs={
             "run-dag": {"task": f"DAGSTER_START{json.dumps(raw_metadata)}DAGSTER_END"}
@@ -101,8 +118,8 @@ def test_monitoring_job_execution(init_load_context: None, instance: DagsterInst
         mapped_defs=defs,
     )
     success_dagster_run_id = make_new_run_id()
-    instance.add_run(
-        DagsterRun(
+    instance.run_storage.add_historical_run(
+        dagster_run=DagsterRun(
             job_name=job_name("dag"),
             run_id=success_dagster_run_id,
             tags={
@@ -110,19 +127,20 @@ def test_monitoring_job_execution(init_load_context: None, instance: DagsterInst
                 DAG_ID_TAG_KEY: "dag",
             },
             status=DagsterRunStatus.STARTED,
-        )
+        ),
+        run_creation_time=freeze_datetime - timedelta(seconds=30),
     )
     failure_dagster_run_id = make_new_run_id()
-    instance.add_run(
-        DagsterRun(
+    instance.run_storage.add_historical_run(
+        dagster_run=DagsterRun(
             job_name=job_name("dag"),
             run_id=failure_dagster_run_id,
             tags={
                 DAG_RUN_ID_TAG_KEY: "failure-run",
                 DAG_ID_TAG_KEY: "dag",
             },
-            status=DagsterRunStatus.STARTED,
-        )
+        ),
+        run_creation_time=freeze_datetime - timedelta(seconds=30),
     )
     result = defs.execute_job_in_process(
         job_name=monitoring_job_name(af_instance.name),
@@ -140,29 +158,44 @@ def test_monitoring_job_execution(init_load_context: None, instance: DagsterInst
     assert result.success
 
     # Expect that the success and failure runs are marked as finished.
-    assert (
-        check.not_none(instance.get_run_by_id(success_dagster_run_id)).status
-        == DagsterRunStatus.SUCCESS
+    success_record = check.not_none(instance.get_run_record_by_id(success_dagster_run_id))
+    assert success_record.dagster_run.status == DagsterRunStatus.SUCCESS
+    assert success_record.end_time == (freeze_datetime).timestamp()
+
+    failure_record = check.not_none(instance.get_run_record_by_id(failure_dagster_run_id))
+    assert failure_record.dagster_run.status == DagsterRunStatus.FAILURE
+    assert failure_record.end_time == (freeze_datetime).timestamp()
+
+    # Expect that we created a new run for the newly running run.
+    newly_started_run_record = next(
+        iter(instance.get_run_records(filters=RunsFilter(tags={DAG_RUN_ID_TAG_KEY: "run-dag"})))
     )
+    assert newly_started_run_record.dagster_run.status == DagsterRunStatus.STARTED
     assert (
-        check.not_none(instance.get_run_by_id(failure_dagster_run_id)).status
-        == DagsterRunStatus.FAILURE
-    )
-    # Expect that we created a new run for the newly running run.
-    newly_started_run = next(
-        iter(instance.get_runs(filters=RunsFilter(tags={DAG_RUN_ID_TAG_KEY: "run-dag"})))
+        newly_started_run_record.start_time
+        == (freeze_datetime - timedelta(seconds=30)).timestamp()
     )
-    assert newly_started_run.status == DagsterRunStatus.STARTED
+    late_run = next(
+        iter(instance.get_runs(filters=RunsFilter(tags={DAG_RUN_ID_TAG_KEY: "late-run"})))
+    )
+    assert late_run.status == DagsterRunStatus.SUCCESS
+    run_record = check.not_none(instance.get_run_record_by_id(late_run.run_id))
+    assert (
+        run_record.create_timestamp.timestamp()
+        == (freeze_datetime - timedelta(seconds=15)).timestamp()
+    )
+    assert run_record.start_time == (freeze_datetime - timedelta(seconds=15)).timestamp()
+    assert run_record.end_time == (freeze_datetime - timedelta(seconds=10)).timestamp()
 
     # There should be planned materialization data for the task.
     planned_info = instance.get_latest_planned_materialization_info(AssetKey("a"))
     assert planned_info
-    assert planned_info.run_id == newly_started_run.run_id
+    assert planned_info.run_id == newly_started_run_record.dagster_run.run_id
 
     # Expect that we emitted asset materialization events for the task.
     mapped_asset_mat = instance.get_latest_materialization_event(AssetKey("a"))
     assert mapped_asset_mat is not None
-    assert mapped_asset_mat.run_id == newly_started_run.run_id
+    assert mapped_asset_mat.run_id == newly_started_run_record.dagster_run.run_id
 
 
 def get_invalid_json_log_content() -> str:
@@ -232,9 +265,16 @@ def test_monitoring_job_log_extraction_errors(
         job_name=monitoring_job_name(af_instance.name),
         instance=instance,
         tags={REPOSITORY_LABEL_TAG: "placeholder"},
+        run_config=RunConfig(
+            ops={
+                monitoring_job_op_name(af_instance): MonitoringConfig(
+                    range_start=(freeze_datetime - timedelta(seconds=30)).isoformat(),
+                    range_end=freeze_datetime.isoformat(),
+                )
+            }
+        ),
     )
     assert result.success
-
     newly_started_run = next(
         iter(instance.get_runs(filters=RunsFilter(tags={DAG_RUN_ID_TAG_KEY: "run-dag"})))
     )
@@ -350,18 +390,16 @@ def test_monitor_sensor_cursor(init_load_context: None, instance: DagsterInstanc
         instance=instance,
         repository_def=defs.get_repository_def(),
     )
-    result = defs.sensors[0](context)
+    result = defs.sensors[0](context)  # type: ignore
     assert isinstance(result, RunRequest)
     assert result.run_config["ops"][monitoring_job_op_name(af_instance)] == {
         "config": {
-            "range_start_iso": (freeze_datetime - timedelta(seconds=30)).isoformat(),
-            "range_end_iso": freeze_datetime.isoformat(),
+            "range_start": (freeze_datetime - timedelta(seconds=30)).isoformat(),
+            "range_end": freeze_datetime.isoformat(),
         }
     }
-    assert (
-        result.tags["range_start_iso"] == (freeze_datetime - timedelta(seconds=30)).isoformat()
-    )
-    assert result.tags["range_end_iso"] == freeze_datetime.isoformat()
+    assert result.tags["range_start"] == (freeze_datetime - timedelta(seconds=30)).isoformat()
+    assert result.tags["range_end"] == freeze_datetime.isoformat()
     # Create an actual run for the monitoring job that is not finished.
     run = instance.create_run_for_job(
         job_def=defs.get_job_def(monitoring_job_name(af_instance.name)),
@@ -370,9 +408,9 @@ def test_monitor_sensor_cursor(init_load_context: None, instance: DagsterInstanc
         status=DagsterRunStatus.STARTED,
         run_config=result.run_config,
     )
-    result = defs.sensors[0](context)
+    result = defs.sensors[0](context)  # type: ignore
     assert isinstance(result, SkipReason)
-    assert "Monitoring job is still running" in result.skip_message
+    assert "Monitoring job is still running" in result.skip_message  # type: ignore
     # Move the run to a finished state.
     instance.report_dagster_event(
         run_id=run.run_id,
@@ -383,11 +421,11 @@ def test_monitor_sensor_cursor(init_load_context: None, instance: DagsterInstanc
     )
     # Move time forward and check that we get a new run request.
     with freeze_time(freeze_datetime + timedelta(seconds=30)):
-        result = defs.sensors[0](context)
+        result = defs.sensors[0](context)  # type: ignore
         assert isinstance(result, RunRequest)
         assert result.run_config["ops"][monitoring_job_op_name(af_instance)] == {
             "config": {
-                "range_start_iso": (freeze_datetime).isoformat(),
-                "range_end_iso": (freeze_datetime + timedelta(seconds=30)).isoformat(),
+                "range_start": (freeze_datetime).isoformat(),
+                "range_end": (freeze_datetime + timedelta(seconds=30)).isoformat(),
             }
         }
diff --git a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_components.py b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_components.py
index c7e31fcfd71d4..a695747917840 100644
--- a/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_components.py
+++ b/python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_components.py
@@ -1,11 +1,15 @@
+from datetime import datetime, timedelta
+
 from dagster._core.definitions.asset_key import AssetKey
 from dagster._core.definitions.metadata.metadata_value import TimestampMetadataValue
+from dagster._core.definitions.run_config import RunConfig
 from dagster._core.event_api import EventRecordsFilter
 from dagster._core.events import DagsterEventType
 from dagster._core.instance_for_test import instance_for_test
 from dagster._core.storage.dagster_run import DagsterRunStatus
 from dagster._core.storage.tags import EXTERNAL_JOB_SOURCE_TAG_KEY
 from dagster_airlift.constants import DAG_ID_TAG_KEY, DAG_RUN_ID_TAG_KEY
+from dagster_airlift.core.monitoring_job.builder import MonitoringConfig, monitoring_job_op_name
 from dagster_airlift.core.utils import monitoring_job_name
 from dagster_airlift.test.test_utils import asset_spec
 from kitchen_sink.airflow_instance import local_airflow_instance
@@ -21,8 +25,9 @@ def test_component_based_defs(
     """Test that component based defs load properly."""
     from kitchen_sink.dagster_defs.component_defs import defs
 
-    assert len(defs.jobs) == 20
-    assert len(defs.assets) == 1
+    assert len(defs.jobs) == 20  # type: ignore
+    assert len(defs.assets) == 1  # type: ignore
+    assert len(defs.sensors) == 1  # type: ignore
 
     for key in ["example1", "example2"]:
         assert asset_spec(key, defs)
@@ -36,7 +41,16 @@
     # Then, execute monitoring job
     with instance_for_test() as instance:
         result = defs.execute_job_in_process(
-            monitoring_job_name(af_instance.name), instance=instance
+            monitoring_job_name(af_instance.name),
+            instance=instance,
+            run_config=RunConfig(
+                ops={
+                    monitoring_job_op_name(af_instance): MonitoringConfig(
+                        range_start=(datetime.now() - timedelta(seconds=30)).isoformat(),
+                        range_end=datetime.now().isoformat(),
+                    )
+                }
+            ),
         )
         assert result.success
         # There should be a run for the dataset producer dag
@@ -85,12 +99,7 @@
             for materialized_records in materialized_records
             if materialized_records.asset_key == AssetKey(key)
         )
-        assert key_record.asset_materialization.metadata
-        assert key_record.asset_materialization.metadata[
-            "my_timestamp"
-        ] == TimestampMetadataValue(value=111.0)
-        assert key_record.asset_materialization.metadata[
-            "my_other_timestamp"
-        ] == TimestampMetadataValue(value=113.0)
-        # It gets overridden by the second print
-        assert key_record.asset_materialization.metadata["foo"].value == "baz"
+        metadata = key_record.asset_materialization.metadata  # type: ignore
+        assert metadata["my_timestamp"] == TimestampMetadataValue(value=111.0)
+        assert metadata["my_other_timestamp"] == TimestampMetadataValue(value=113.0)
+        assert metadata["foo"].value == "baz"