Skip to content

Commit a7caf6a

Browse files
committed
[dagster-airlift][jobs 5/n] Definitions.execute_job_in_process
1 parent b38fa41 commit a7caf6a

File tree

7 files changed

+275
-81
lines changed

7 files changed

+275
-81
lines changed

python_modules/dagster/dagster/_core/definitions/definitions_class.py

+74
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
from dagster._utils.warnings import disable_dagster_warnings
4545

4646
if TYPE_CHECKING:
47+
from dagster._core.definitions.run_config import RunConfig
48+
from dagster._core.execution.execute_in_process_result import ExecuteInProcessResult
4749
from dagster._core.storage.asset_value_loader import AssetValueLoader
4850

4951

@@ -795,3 +797,75 @@ def asset2(_): ...
795797
*[d for d in self.assets or [] if not isinstance(d, (AssetsDefinition, AssetSpec))],
796798
]
797799
return replace(self, assets=assets)
800+
801+
def execute_job_in_process(
802+
self,
803+
job_name: str,
804+
run_config: Optional[Union[Mapping[str, Any], "RunConfig"]] = None,
805+
instance: Optional["DagsterInstance"] = None,
806+
partition_key: Optional[str] = None,
807+
raise_on_error: bool = True,
808+
op_selection: Optional[Sequence[str]] = None,
809+
asset_selection: Optional[Sequence[AssetKey]] = None,
810+
run_id: Optional[str] = None,
811+
input_values: Optional[Mapping[str, object]] = None,
812+
tags: Optional[Mapping[str, str]] = None,
813+
resources: Optional[Mapping[str, object]] = None,
814+
) -> "ExecuteInProcessResult":
815+
from dagster._core.definitions.job_base import RepoBackedJob
816+
from dagster._core.execution.execute_in_process import (
817+
core_execute_in_process,
818+
merge_run_tags,
819+
type_check_and_normalize_args,
820+
)
821+
822+
run_config, op_selection, asset_selection, resource_defs, partition_key, input_values = (
823+
type_check_and_normalize_args(
824+
run_config=run_config,
825+
partition_key=partition_key,
826+
op_selection=op_selection,
827+
asset_selection=asset_selection,
828+
input_values=input_values,
829+
resources=resources,
830+
)
831+
)
832+
833+
job_def = check.not_none(get_job_from_defs(job_name, self)).as_ephemeral_job(
834+
resource_defs=resource_defs,
835+
input_values=check.opt_mapping_param(
836+
input_values, "input_values", key_type=str, value_type=object
837+
),
838+
op_selection=op_selection,
839+
asset_selection=asset_selection,
840+
)
841+
new_job_list = [job for job in (self.jobs or []) if job.name != job_name] + [job_def]
842+
new_defs_obj = replace(self, jobs=new_job_list)
843+
resolved_repo = new_defs_obj.get_repository_def()
844+
wrapped_job = RepoBackedJob(job_name=job_name, repository_def=resolved_repo)
845+
return core_execute_in_process(
846+
job=wrapped_job,
847+
run_config=run_config,
848+
instance=instance,
849+
output_capturing_enabled=True,
850+
raise_on_error=raise_on_error,
851+
run_tags=merge_run_tags(
852+
job_def=job_def,
853+
partition_key=partition_key,
854+
tags=tags,
855+
asset_selection=asset_selection,
856+
instance=instance,
857+
run_config=run_config,
858+
),
859+
run_id=run_id,
860+
asset_selection=frozenset(asset_selection),
861+
)
862+
863+
864+
def get_job_from_defs(
865+
name: str, defs: Definitions
866+
) -> Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]]:
867+
"""Get the job from the definitions by its name."""
868+
return next(
869+
iter(job for job in (defs.jobs or []) if job.name == name),
870+
None,
871+
)

python_modules/dagster/dagster/_core/definitions/job_base.py

+46
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,49 @@ def asset_selection(self) -> Optional[AbstractSet[AssetKey]]:
9595
@property
9696
def asset_check_selection(self) -> Optional[AbstractSet[AssetCheckKey]]:
9797
return self._job_def.asset_check_selection
98+
99+
100+
class RepoBackedJob(IJob):
101+
def __init__(
102+
self,
103+
job_name: str,
104+
repository_def: "RepositoryDefinition",
105+
):
106+
self._job_name = job_name
107+
self._repository_def = repository_def
108+
109+
def get_definition(self) -> "JobDefinition":
110+
return self._repository_def.get_job(self._job_name)
111+
112+
def get_repository_definition(self) -> Optional["RepositoryDefinition"]:
113+
return self._repository_def
114+
115+
@property
116+
def op_selection(self) -> Optional[AbstractSet[str]]:
117+
return self.get_definition().op_selection
118+
119+
@property
120+
def asset_selection(self) -> Optional[AbstractSet[AssetKey]]:
121+
return self.get_definition().asset_selection
122+
123+
@property
124+
def asset_check_selection(self) -> Optional[AbstractSet[AssetCheckKey]]:
125+
return self.get_definition().asset_check_selection
126+
127+
def get_subset(
128+
self,
129+
*,
130+
op_selection: Optional[Iterable[str]] = None,
131+
asset_selection: Optional[AbstractSet[AssetKey]] = None,
132+
asset_check_selection: Optional[AbstractSet[AssetCheckKey]] = None,
133+
) -> "RepoBackedJob":
134+
op_selection = set(op_selection) if op_selection else None
135+
return RepoBackedJob(
136+
self._job_name,
137+
self._repository_def.get_subset(
138+
job_name=self._job_name,
139+
op_selection=op_selection,
140+
asset_selection=asset_selection,
141+
asset_check_selection=asset_check_selection,
142+
),
143+
)

python_modules/dagster/dagster/_core/definitions/job_definition.py

+50-60
Original file line numberDiff line numberDiff line change
@@ -666,34 +666,61 @@ def execute_in_process(
666666
:py:class:`~dagster.ExecuteInProcessResult`
667667
668668
"""
669-
from dagster._core.definitions.executor_definition import execute_in_process_executor
670-
from dagster._core.definitions.run_config import convert_config_input
671-
from dagster._core.execution.build_resources import wrap_resources_for_execution
672-
from dagster._core.execution.execute_in_process import core_execute_in_process
673-
674-
run_config = check.opt_mapping_param(convert_config_input(run_config), "run_config")
675-
op_selection = check.opt_sequence_param(op_selection, "op_selection", str)
676-
asset_selection = check.opt_sequence_param(asset_selection, "asset_selection", AssetKey)
677-
resources = check.opt_mapping_param(resources, "resources", key_type=str)
669+
from dagster._core.definitions.job_base import InMemoryJob
670+
from dagster._core.execution.execute_in_process import (
671+
core_execute_in_process,
672+
merge_run_tags,
673+
type_check_and_normalize_args,
674+
)
678675

679-
resource_defs = wrap_resources_for_execution(resources)
676+
run_config, op_selection, asset_selection, resource_defs, partition_key, input_values = (
677+
type_check_and_normalize_args(
678+
run_config=run_config,
679+
partition_key=partition_key,
680+
op_selection=op_selection,
681+
asset_selection=asset_selection,
682+
input_values=input_values,
683+
resources=resources,
684+
)
685+
)
680686

681-
check.invariant(
682-
not (op_selection and asset_selection),
683-
"op_selection and asset_selection cannot both be provided as args to"
684-
" execute_in_process",
687+
ephemeral_job = self.as_ephemeral_job(
688+
resource_defs=resource_defs,
689+
input_values=input_values,
690+
op_selection=op_selection,
691+
asset_selection=asset_selection,
685692
)
686693

687-
partition_key = check.opt_str_param(partition_key, "partition_key")
688-
input_values = check.opt_mapping_param(input_values, "input_values")
694+
wrapped_job = InMemoryJob(job_def=ephemeral_job)
695+
return core_execute_in_process(
696+
job=wrapped_job,
697+
run_config=run_config,
698+
instance=instance,
699+
output_capturing_enabled=True,
700+
raise_on_error=raise_on_error,
701+
run_tags=merge_run_tags(
702+
job_def=self,
703+
partition_key=partition_key,
704+
tags=tags,
705+
asset_selection=asset_selection,
706+
instance=instance,
707+
run_config=run_config,
708+
),
709+
run_id=run_id,
710+
asset_selection=frozenset(asset_selection),
711+
)
689712

690-
# Combine provided input values at execute_in_process with input values
691-
# provided to the definition. Input values provided at
692-
# execute_in_process will override those provided on the definition.
693-
input_values = merge_dicts(self.input_values, input_values)
713+
def as_ephemeral_job(
714+
self,
715+
resource_defs: Mapping[str, ResourceDefinition],
716+
input_values: Mapping[str, object],
717+
op_selection: Optional[Sequence[str]] = None,
718+
asset_selection: Optional[Sequence[AssetKey]] = None,
719+
) -> "JobDefinition":
720+
from dagster._core.definitions.executor_definition import execute_in_process_executor
694721

695722
bound_resource_defs = dict(self.resource_defs)
696-
ephemeral_job = JobDefinition.dagster_internal_init(
723+
return JobDefinition.dagster_internal_init(
697724
name=self._name,
698725
graph_def=self._graph_def,
699726
resource_defs={**_swap_default_io_man(bound_resource_defs, self), **resource_defs},
@@ -705,54 +732,17 @@ def execute_in_process(
705732
run_tags=self._run_tags,
706733
op_retry_policy=self._op_retry_policy,
707734
asset_layer=self.asset_layer,
708-
input_values=input_values,
735+
input_values=merge_dicts(self.input_values, input_values),
709736
description=self.description,
710737
partitions_def=self.partitions_def,
711738
metadata=self.metadata,
712739
_subset_selection_data=None, # this is added below
713740
_was_explicitly_provided_resources=True,
714-
)
715-
716-
ephemeral_job = ephemeral_job.get_subset(
741+
).get_subset(
717742
op_selection=op_selection,
718743
asset_selection=frozenset(asset_selection) if asset_selection else None,
719744
)
720745

721-
merged_run_tags = merge_dicts(self.run_tags, tags or {})
722-
if partition_key:
723-
ephemeral_job.validate_partition_key(
724-
partition_key,
725-
selected_asset_keys=asset_selection,
726-
dynamic_partitions_store=instance,
727-
)
728-
tags_for_partition_key = ephemeral_job.get_tags_for_partition_key(
729-
partition_key,
730-
selected_asset_keys=asset_selection,
731-
)
732-
733-
if not run_config and self.partitioned_config:
734-
run_config = self.partitioned_config.get_run_config_for_partition_key(partition_key)
735-
736-
if self.partitioned_config:
737-
merged_run_tags.update(
738-
self.partitioned_config.get_tags_for_partition_key(
739-
partition_key, job_name=self.name
740-
)
741-
)
742-
else:
743-
merged_run_tags.update(tags_for_partition_key)
744-
745-
return core_execute_in_process(
746-
ephemeral_job=ephemeral_job,
747-
run_config=run_config,
748-
instance=instance,
749-
output_capturing_enabled=True,
750-
raise_on_error=raise_on_error,
751-
run_tags=merged_run_tags,
752-
run_id=run_id,
753-
asset_selection=frozenset(asset_selection),
754-
)
755-
756746
def _get_partitions_def(
757747
self, selected_asset_keys: Optional[Iterable[AssetKey]]
758748
) -> PartitionsDefinition:

0 commit comments

Comments
 (0)