Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def create_valid_pipeline_run(
run_config=execution_params.run_config,
step_keys_to_execute=step_keys_to_execute,
known_state=known_state,
include_asset_events=execution_params.include_asset_events,
)
tags = merge_dicts(remote_job.tags, execution_params.execution_metadata.tags)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@ def get_remote_execution_plan_or_raise(
run_config: Mapping[str, object],
step_keys_to_execute: Optional[Sequence[str]],
known_state: Optional[KnownExecutionState],
include_asset_events: bool,
) -> RemoteExecutionPlan:
return graphql_context.get_execution_plan(
remote_job=remote_job,
run_config=run_config,
step_keys_to_execute=step_keys_to_execute,
known_state=known_state,
include_asset_events=include_asset_events,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ class ExecutionParams(
("mode", Optional[str]),
("execution_metadata", "ExecutionMetadata"),
("step_keys", Optional[Sequence[str]]),
("include_asset_events", bool),
],
)
):
Expand All @@ -334,6 +335,7 @@ def __new__(
mode: Optional[str],
execution_metadata: "ExecutionMetadata",
step_keys: Optional[Sequence[str]],
include_asset_events: Optional[bool] = True,
):
check.opt_list_param(step_keys, "step_keys", of_type=str)

Expand All @@ -346,6 +348,7 @@ def __new__(
execution_metadata, "execution_metadata", ExecutionMetadata
),
step_keys=step_keys,
include_asset_events=include_asset_events,
)

def to_graphql_input(self) -> Mapping[str, Any]:
Expand All @@ -355,6 +358,7 @@ def to_graphql_input(self) -> Mapping[str, Any]:
"mode": self.mode,
"executionMetadata": self.execution_metadata.to_graphql_input(),
"stepKeys": self.step_keys,
"includeAssetEvents": self.include_asset_events,
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ class GrapheneExecutionParams(graphene.InputObjectType):
this parameter, provide an empty array, or provide every step name.""",
)
preset = graphene.InputField(graphene.String)
includeAssetEvents = graphene.Boolean()

class Meta:
name = "ExecutionParams"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def execution_params_from_graphql(graphql_execution_params):
graphql_execution_params.get("executionMetadata")
),
step_keys=graphql_execution_params.get("stepKeys"),
include_asset_events=graphql_execution_params.get("includeAssetEvents", True),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def sync_get_external_execution_plan_grpc(
step_keys_to_execute: Optional[Sequence[str]] = None,
known_state: Optional[KnownExecutionState] = None,
instance: Optional[DagsterInstance] = None,
include_asset_events: bool = True,
) -> ExecutionPlanSnapshot:
from dagster._grpc.client import DagsterGrpcClient

Expand Down Expand Up @@ -62,6 +63,7 @@ def sync_get_external_execution_plan_grpc(
instance_ref=instance.get_ref() if instance and instance.is_persistent else None,
asset_selection=asset_selection,
asset_check_selection=asset_check_selection,
include_asset_events=include_asset_events,
)
),
(ExecutionPlanSnapshot, ExecutionPlanSnapshotErrorData),
Expand Down
3 changes: 3 additions & 0 deletions python_modules/dagster/dagster/_core/execution/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ def _get_execution_plan_from_run(
known_state=(
execution_plan_snapshot.initial_known_state if execution_plan_snapshot else None
),
include_asset_events=execution_plan_snapshot.include_asset_events,
)


Expand All @@ -676,6 +677,7 @@ def create_execution_plan(
instance_ref: Optional[InstanceRef] = None,
tags: Optional[Mapping[str, str]] = None,
repository_load_data: Optional[RepositoryLoadData] = None,
include_asset_events: bool = True,
) -> ExecutionPlan:
if isinstance(job, IJob):
# If you have repository_load_data, make sure to use it when building plan
Expand Down Expand Up @@ -709,6 +711,7 @@ def create_execution_plan(
instance_ref=instance_ref,
tags=tags,
repository_load_data=repository_load_data,
include_asset_events=include_asset_events,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,22 @@ def _user_failure_data_for_exc(exc: Optional[BaseException]) -> Optional[UserFai
return None


def _iterator_with_maybe_blocked_asset_events(
events: Sequence[DagsterEvent], step_context: StepExecutionContext
) -> Iterator[DagsterEvent]:
for event in events:
step_context.log.info(
f"Execution plan events: {step_context.execution_plan.include_asset_events}"
)
if event.is_step_materialization and not step_context.execution_plan.include_asset_events:
step_context.log.warning(
"Skipping asset materialization for step %s because include_asset_events is false",
step_context.step.key,
)
continue
yield event


def dagster_event_sequence_for_step(
step_context: StepExecutionContext, force_local_execution: bool = False
) -> Iterator[DagsterEvent]:
Expand Down Expand Up @@ -242,7 +258,9 @@ def dagster_event_sequence_for_step(
else:
step_events = core_dagster_event_sequence_for_step(step_context)

yield from check.generator(step_events)
yield from check.generator(
_iterator_with_maybe_blocked_asset_events(step_events, step_context)
)

# case (1) in top comment
except RetryRequested as retry_request:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@
DagsterTypeCheckError,
user_code_error_boundary,
)
from dagster._core.events import DagsterEvent, DagsterEventBatchMetadata, generate_event_batch_id
from dagster._core.events import (
DagsterEvent,
DagsterEventBatchMetadata,
DagsterEventType,
EngineEventData,
generate_event_batch_id,
)
from dagster._core.execution.context.compute import enter_execution_context
from dagster._core.execution.context.output import OutputContext
from dagster._core.execution.context.system import StepExecutionContext, TypeCheckContext
Expand Down Expand Up @@ -831,7 +837,6 @@ def _gen_fn():
)
else:
materialization = mgr_materialization

yield DagsterEvent.asset_materialization(step_context, materialization)

yield from _log_materialization_or_observation_events_for_asset(
Expand Down Expand Up @@ -913,6 +918,21 @@ def _dagster_event_for_asset_event(
asset_event: Union[AssetMaterialization, AssetObservation],
batch_metadata: Optional[DagsterEventBatchMetadata],
) -> DagsterEvent:
from dagster._serdes import serialize_value

if not step_context.execution_plan.include_asset_events:
step_context.log.warning(
"Executing from a context where asset events are being deferred. Logging as an engine event instead."
)
return DagsterEvent.from_step(
event_type=DagsterEventType.ENGINE_EVENT,
event_specific_data=EngineEventData(
metadata={"asset_event": serialize_value(asset_event)}
),
message="Deferring materialization of asset.",
step_context=step_context,
)

if isinstance(asset_event, AssetMaterialization):
return DagsterEvent.asset_materialization(step_context, asset_event, batch_metadata)
else: # observation
Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/_core/execution/plan/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def __init__(
instance_ref: Optional[InstanceRef],
tags: Mapping[str, str],
repository_load_data: Optional[RepositoryLoadData],
include_asset_events: bool = True,
):
self.job_def = check.inst_param(job_def, "job", JobDefinition)
self.resolved_run_config = check.inst_param(
Expand All @@ -119,6 +120,7 @@ def __init__(
NodeOutput, Union[StepOutputHandle, UnresolvedStepOutputHandle]
] = {}
self._seen_handles: set[StepHandleUnion] = set()
self._include_asset_events = include_asset_events

def add_step(self, step: IExecutionStep) -> None:
# Keep track of the step keys we've seen so far to ensure we don't add duplicates
Expand Down Expand Up @@ -200,6 +202,7 @@ def build(self) -> "ExecutionPlan":
),
executor_name=executor_name,
repository_load_data=self.repository_load_data,
include_asset_events=self._include_asset_events,
)

if (
Expand Down Expand Up @@ -631,6 +634,7 @@ class ExecutionPlan(
("step_dict_by_key", dict[str, IExecutionStep]),
("executor_name", Optional[str]),
("repository_load_data", Optional[RepositoryLoadData]),
("include_asset_events", bool),
],
)
):
Expand All @@ -645,6 +649,7 @@ def __new__(
step_dict_by_key: Optional[dict[str, IExecutionStep]] = None,
executor_name: Optional[str] = None,
repository_load_data: Optional[RepositoryLoadData] = None,
include_asset_events: bool = True,
):
return super().__new__(
cls,
Expand Down Expand Up @@ -685,6 +690,7 @@ def __new__(
repository_load_data=check.opt_inst_param(
repository_load_data, "repository_load_data", RepositoryLoadData
),
include_asset_events=check.bool_param(include_asset_events, "include_asset_events"),
)

@property
Expand Down Expand Up @@ -871,6 +877,7 @@ def build_subset_plan(
),
executor_name=self.executor_name,
repository_load_data=self.repository_load_data,
include_asset_events=self.include_asset_events,
)

def get_version_for_step_output_handle(
Expand Down Expand Up @@ -923,6 +930,7 @@ def build(
instance_ref: Optional[InstanceRef] = None,
tags: Optional[Mapping[str, str]] = None,
repository_load_data: Optional[RepositoryLoadData] = None,
include_asset_events: bool = True,
) -> "ExecutionPlan":
"""Here we build a new ExecutionPlan from a job definition and the resolved run config.

Expand All @@ -940,6 +948,7 @@ def build(
instance_ref=instance_ref,
tags=tags or {},
repository_load_data=repository_load_data,
include_asset_events=include_asset_events,
).build()

@staticmethod
Expand Down Expand Up @@ -1067,6 +1076,7 @@ def rebuild_from_snapshot(
execution_plan_snapshot.artifacts_persisted,
executor_name=execution_plan_snapshot.executor_name,
repository_load_data=execution_plan_snapshot.repository_load_data,
include_asset_events=execution_plan_snapshot.include_asset_events,
)


Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/executor/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
retry_mode: RetryMode,
known_state: Optional[KnownExecutionState],
repository_load_data: Optional[RepositoryLoadData],
include_asset_events: Optional[bool],
):
self.run_config = run_config
self.dagster_run = dagster_run
Expand All @@ -69,6 +70,7 @@ def __init__(
self.retry_mode = retry_mode
self.known_state = known_state
self.repository_load_data = repository_load_data
self.include_asset_events = include_asset_events

def execute(self) -> Iterator[DagsterEvent]:
recon_job = self.recon_pipeline
Expand All @@ -93,6 +95,7 @@ def execute(self) -> Iterator[DagsterEvent]:
step_keys_to_execute=[self.step_key],
known_state=self.known_state,
repository_load_data=self.repository_load_data,
include_asset_events=self.include_asset_events,
)
yield from execute_plan_iterator(
execution_plan,
Expand Down Expand Up @@ -383,6 +386,7 @@ def execute_step_out_of_process(
retry_mode=retries,
known_state=known_state,
repository_load_data=repository_load_data,
include_asset_events=step_context.execution_plan.include_asset_events,
)

yield DagsterEvent.step_worker_starting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def get_execution_plan(
run_config: Mapping[str, object],
step_keys_to_execute: Optional[Sequence[str]],
known_state: Optional[KnownExecutionState],
include_asset_events: bool = True,
instance: Optional[DagsterInstance] = None,
) -> RemoteExecutionPlan:
pass
Expand Down Expand Up @@ -475,6 +476,7 @@ def get_execution_plan(
run_config: Mapping[str, object],
step_keys_to_execute: Optional[Sequence[str]],
known_state: Optional[KnownExecutionState],
include_asset_events: bool = True,
instance: Optional[DagsterInstance] = None,
) -> RemoteExecutionPlan:
check.inst_param(remote_job, "remote_job", RemoteJob)
Expand All @@ -497,6 +499,7 @@ def get_execution_plan(
step_keys_to_execute=step_keys_to_execute,
known_state=known_state,
instance_ref=instance.get_ref() if instance and instance.is_persistent else None,
include_asset_events=include_asset_events,
)
return RemoteExecutionPlan(
execution_plan_snapshot=snapshot_from_execution_plan(
Expand Down Expand Up @@ -830,6 +833,7 @@ def get_execution_plan(
run_config: Mapping[str, Any],
step_keys_to_execute: Optional[Sequence[str]],
known_state: Optional[KnownExecutionState],
include_asset_events: bool = True,
instance: Optional[DagsterInstance] = None,
) -> RemoteExecutionPlan:
check.inst_param(remote_job, "remote_job", RemoteJob)
Expand Down Expand Up @@ -862,6 +866,7 @@ def get_execution_plan(
step_keys_to_execute=step_keys_to_execute,
known_state=known_state,
instance=instance,
include_asset_events=include_asset_events,
)

return RemoteExecutionPlan(execution_plan_snapshot=execution_plan_snapshot_or_error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1697,7 +1697,6 @@ def asset_node_snaps_from_repo(repo: RepositoryDefinition) -> Sequence[AssetNode
pools = set()
op_names = []
op_name = None
job_names = []
compute_kind = None
node_definition_name = None
output_name = None
Expand Down
Loading