From 7e7666e4d3a34f939ddc021e9aa7012dbd5a8c84 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 8 Apr 2025 13:54:26 -0700 Subject: [PATCH 1/3] More eagerly merge compatible environments. Specifically, merge environments that are identical except for their dependencies, but where one environment's dependencies are a superset of the others. This will reduce the number of distinct environments of the same type, and the associated materialization costs of moving between them. Ideally this would be done in a runner for environments that are already co-located on the same worker, but that is a further optimization. --- sdks/python/apache_beam/runners/common.py | 143 +++++++++++++----- .../python/apache_beam/runners/common_test.py | 111 ++++++++++++++ .../runners/dataflow/dataflow_runner.py | 4 +- .../portability/fn_api_runner/fn_runner.py | 4 +- 4 files changed, 224 insertions(+), 38 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index c43870d55ebb..6a4a0404c72e 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -2008,43 +2008,65 @@ def validate_transform(transform_id): validate_transform(t) -def merge_common_environments(pipeline_proto, inplace=False): - def dep_key(dep): - if dep.type_urn == common_urns.artifact_types.FILE.urn: - payload = beam_runner_api_pb2.ArtifactFilePayload.FromString( - dep.type_payload) - if payload.sha256: - type_info = 'sha256', payload.sha256 - else: - type_info = 'path', payload.path - elif dep.type_urn == common_urns.artifact_types.URL.urn: - payload = beam_runner_api_pb2.ArtifactUrlPayload.FromString( - dep.type_payload) - if payload.sha256: - type_info = 'sha256', payload.sha256 - else: - type_info = 'url', payload.url +def _dep_key(dep): + if dep.type_urn == common_urns.artifact_types.FILE.urn: + payload = beam_runner_api_pb2.ArtifactFilePayload.FromString( + dep.type_payload) + if payload.sha256: + type_info = 'sha256', payload.sha256 else: - type_info = dep.type_urn, dep.type_payload - return type_info, dep.role_urn, dep.role_payload - - def base_env_key(env): - return ( - env.urn, - env.payload, - tuple(sorted(env.capabilities)), - tuple(sorted(env.resource_hints.items())), - tuple(sorted(dep_key(dep) for dep in env.dependencies))) - - def env_key(env): - return tuple( - sorted( - base_env_key(e) - for e in environments.expand_anyof_environments(env))) + type_info = 'path', payload.path + elif dep.type_urn == common_urns.artifact_types.URL.urn: + payload = beam_runner_api_pb2.ArtifactUrlPayload.FromString( + dep.type_payload) + if payload.sha256: + type_info = 'sha256', payload.sha256 + else: + type_info = 'url', payload.url + else: + type_info = dep.type_urn, dep.type_payload + return type_info, dep.role_urn, dep.role_payload + + +def _expanded_dep_keys(dep): + if (dep.type_urn == common_urns.artifact_types.FILE.urn and + dep.role_urn == common_urns.artifact_roles.STAGING_TO.urn): + payload = beam_runner_api_pb2.ArtifactFilePayload.FromString( + dep.type_payload) + role = beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString( + dep.role_payload) + if role.staged_name == 'submission_environment_dependencies.txt': + return + elif role.staged_name == 'requirements.txt': + with open(payload.path) as fin: + for line in fin: + yield 'requirements.txt', line.strip() + return + + yield _dep_key(dep) + + +def _base_env_key(env, include_deps=True): + return ( + env.urn, + env.payload, + tuple(sorted(env.capabilities)), + 
tuple(sorted(env.resource_hints.items())), + tuple(sorted(_dep_key(dep) + for dep in env.dependencies)) if include_deps else None) + +def _env_key(env): + return tuple( + sorted( + _base_env_key(e) + for e in environments.expand_anyof_environments(env))) + + +def merge_common_environments(pipeline_proto, inplace=False): canonical_environments = collections.defaultdict(list) for env_id, env in pipeline_proto.components.environments.items(): - canonical_environments[env_key(env)].append(env_id) + canonical_environments[_env_key(env)].append(env_id) if len(canonical_environments) == len(pipeline_proto.components.environments): # All environments are already sufficiently distinct. @@ -2055,6 +2077,55 @@ def env_key(env): for es in canonical_environments.values() for e in es } + return update_environments(pipeline_proto, environment_remappings, inplace) + + +def merge_superset_dep_environments(pipeline_proto): + """Merges all environemnts A and B where A and B are equivalent except that + A has a superset of the dependencies of B. + """ + docker_envs = {} + for env_id, env in pipeline_proto.components.environments.items(): + docker_env = environments.resolve_anyof_environment( + env, common_urns.environments.DOCKER.urn) + if docker_env.urn == common_urns.environments.DOCKER.urn: + docker_envs[env_id] = docker_env + + has_base_and_dep = collections.defaultdict(set) + env_scores = { + env_id: (len(env.dependencies), env_id) + for (env_id, env) in docker_envs.items() + } + + for env_id, env in docker_envs.items(): + base_key = _base_env_key(env, include_deps=False) + has_base_and_dep[base_key, None].add(env_id) + for dep in env.dependencies: + for dep_key in _expanded_dep_keys(dep): + has_base_and_dep[base_key, dep_key].add(env_id) + + environment_remappings = {} + for env_id, env in docker_envs.items(): + base_key = _base_env_key(env, include_deps=False) + # This is the set of all environments that have at least all of env's deps. + candidates = set.intersection( + has_base_and_dep[base_key, None], + *[ + has_base_and_dep[base_key, dep_key] for dep in env.dependencies + for dep_key in _expanded_dep_keys(dep) + ]) + # Choose the maximal one. + best = max(candidates, key=env_scores.get) + if best != env_id: + environment_remappings[env_id] = best + + return update_environments(pipeline_proto, environment_remappings) + + +def update_environments(pipeline_proto, environment_remappings, inplace=False): + if not environment_remappings: + return pipeline_proto + if not inplace: pipeline_proto = copy.copy(pipeline_proto) @@ -2063,16 +2134,16 @@ def env_key(env): # TODO(https://github.com/apache/beam/issues/30876): Remove this # workaround. continue - if t.environment_id: + if t.environment_id and t.environment_id in environment_remappings: t.environment_id = environment_remappings[t.environment_id] for w in pipeline_proto.components.windowing_strategies.values(): if w.environment_id not in pipeline_proto.components.environments: # TODO(https://github.com/apache/beam/issues/30876): Remove this # workaround. 
continue - if w.environment_id: + if w.environment_id and w.environment_id in environment_remappings: w.environment_id = environment_remappings[w.environment_id] - for e in set(pipeline_proto.components.environments.keys()) - set( + for e in set(environment_remappings.keys()) - set( environment_remappings.values()): del pipeline_proto.components.environments[e] return pipeline_proto diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py index ca2cd2539a8c..1b5fc115d23b 100644 --- a/sdks/python/apache_beam/runners/common_test.py +++ b/sdks/python/apache_beam/runners/common_test.py @@ -17,6 +17,10 @@ # pytype: skip-file +import hashlib +import os +import random +import tempfile import unittest import hamcrest as hc @@ -26,10 +30,12 @@ from apache_beam.io.restriction_trackers import OffsetRestrictionTracker from apache_beam.io.watermark_estimators import ManualWatermarkEstimator from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.common import DoFnSignature from apache_beam.runners.common import PerWindowInvoker from apache_beam.runners.common import merge_common_environments +from apache_beam.runners.common import merge_superset_dep_environments from apache_beam.runners.portability.expansion_service_test import FibTransform from apache_beam.runners.sdf_utils import SplitResultPrimary from apache_beam.runners.sdf_utils import SplitResultResidual @@ -632,6 +638,111 @@ def test_equal_environments_merged(self): pipeline_proto.components.windowing_strategies.values())), 1) + def _make_dep(self, path): + hasher = hashlib.sha256() + if os.path.exists(path): + with open(path, 'rb') as fin: + hasher.update(fin.read()) + else: + # A fake file, identified only by its path. + hasher.update(path.encode('utf-8')) + return beam_runner_api_pb2.ArtifactInformation( + type_urn=common_urns.artifact_types.FILE.urn, + type_payload=beam_runner_api_pb2.ArtifactFilePayload( + path=path, sha256=hasher.hexdigest()).SerializeToString(), + role_urn=common_urns.artifact_roles.STAGING_TO.urn, + role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload( + staged_name=os.path.basename(path)).SerializeToString()) + + def _docker_env(self, id, deps=()): + return beam_runner_api_pb2.Environment( + urn=common_urns.environments.DOCKER.urn, + payload=id.encode('utf8'), + dependencies=[self._make_dep(path) for path in deps], + ) + + def test_subset_deps_environments_merged(self): + environments = { + 'A': self._docker_env('A'), + 'Ax': self._docker_env('A', ['x']), + 'Ay': self._docker_env('A', ['y']), + 'Axy': self._docker_env('A', ['x', 'y']), + 'Bx': self._docker_env('B', ['x']), + 'Bxy': self._docker_env('B', ['x', 'y']), + 'Byz': self._docker_env('B', ['y', 'z']), + } + transforms = { + env_id: beam_runner_api_pb2.PTransform( + unique_name=env_id, environment_id=env_id) + for env_id in environments.keys() + } + pipeline_proto = merge_superset_dep_environments( + beam_runner_api_pb2.Pipeline( + components=beam_runner_api_pb2.Components( + environments=environments, transforms=transforms))) + + # These can all be merged into the same environment. 
+ self.assertEqual( + pipeline_proto.components.transforms['A'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Ax'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Ay'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Axy'].environment_id, 'Axy') + # Despite having the same dependencies, these must be merged into their own. + self.assertEqual( + pipeline_proto.components.transforms['Bx'].environment_id, 'Bxy') + self.assertEqual( + pipeline_proto.components.transforms['Bxy'].environment_id, 'Bxy') + # This is not a subset of any, must be left alone. + self.assertEqual( + pipeline_proto.components.transforms['Byz'].environment_id, 'Byz') + + def test_subset_deps_environments_merged_with_requirements_txt(self): + with tempfile.TemporaryDirectory() as tmpdir: + + def make_file(basename, content): + subdir = tempfile.TemporaryDirectory(dir=tmpdir, delete=False).name + path = os.path.join(subdir, basename) + with open(path, 'w') as fout: + fout.write(content) + return path + + def make_py_deps(*pkgs): + return [ + make_file('requirements.txt', '\n'.join(pkgs)), + make_file( + 'submission_environment_dependencies.txt', str( + random.random())), + ] + [make_file(pkg, pkg) for pkg in pkgs] + + environments = { + 'A': self._docker_env('A'), + 'Ax': self._docker_env('A', make_py_deps('x')), + 'Ay': self._docker_env('A', make_py_deps('y')), + 'Axy': self._docker_env('A', make_py_deps('x', 'y')), + } + transforms = { + env_id: beam_runner_api_pb2.PTransform( + unique_name=env_id, environment_id=env_id) + for env_id in environments.keys() + } + pipeline_proto = merge_superset_dep_environments( + beam_runner_api_pb2.Pipeline( + components=beam_runner_api_pb2.Components( + environments=environments, transforms=transforms))) + + # These can all be merged into the same environment. + self.assertEqual( + pipeline_proto.components.transforms['A'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Ax'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Ay'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Axy'].environment_id, 'Axy') + def test_external_merged(self): p = beam.Pipeline() # This transform recursively creates several external environments. 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 4b6816e84efb..bf5c8a08ce54 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -45,6 +45,7 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.common import group_by_key_input_visitor from apache_beam.runners.common import merge_common_environments +from apache_beam.runners.common import merge_superset_dep_environments from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner @@ -434,7 +435,8 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): self.proto_pipeline.components.environments[env_id].CopyFrom( environments.resolve_anyof_environment( env, common_urns.environments.DOCKER.urn)) - self.proto_pipeline = merge_common_environments(self.proto_pipeline) + self.proto_pipeline = merge_common_environments( + merge_superset_dep_environments(self.proto_pipeline)) # Optimize the pipeline if it not streaming and the pre_optimize # experiment is set. diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 95bcb7567918..3655446d77ac 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -64,6 +64,7 @@ from apache_beam.runners import runner from apache_beam.runners.common import group_by_key_input_visitor from apache_beam.runners.common import merge_common_environments +from apache_beam.runners.common import merge_superset_dep_environments from apache_beam.runners.common import validate_pipeline_graph from apache_beam.runners.portability import portable_metrics from apache_beam.runners.portability.fn_api_runner import execution @@ -216,7 +217,8 @@ def run_via_runner_api( if direct_options.direct_embed_docker_python: pipeline_proto = self.embed_default_docker_image(pipeline_proto) pipeline_proto = merge_common_environments( - self.resolve_any_environments(pipeline_proto)) + self.resolve_any_environments( + merge_superset_dep_environments(pipeline_proto))) stage_context, stages = self.create_stages(pipeline_proto) return self.run_stages(stage_context, stages) From cf27624947c160c9ae77abc2af4639b3764dc963 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 8 Apr 2025 14:33:27 -0700 Subject: [PATCH 2/3] Move the pipeline graph manipulation utilites to their own files. As well as being very different from the low-level execution details found in common.py, there is no need to compile these (whereas the other structures in common.py are critical to optimize for elementwise efficiency). 
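For orientation, a small hedged sketch of the call pattern runners end up with after this move; the module path and function names are taken from the import rewrites in the diff below, and `prepare` is purely an illustrative wrapper name, not an API added by this change:

    from apache_beam.portability.api import beam_runner_api_pb2
    from apache_beam.runners.pipeline_utils import merge_common_environments
    from apache_beam.runners.pipeline_utils import merge_superset_dep_environments

    def prepare(pipeline_proto):
        # Same composition the Dataflow and FnApi runner diffs apply before
        # execution: fold superset-dependency environments together, then
        # merge any environments that are now identical.
        return merge_common_environments(
            merge_superset_dep_environments(pipeline_proto))

    prepare(beam_runner_api_pb2.Pipeline())  # no-op on an empty pipeline
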
--- sdks/python/apache_beam/pipeline.py | 4 +- sdks/python/apache_beam/runners/common.py | 233 ---------------- .../python/apache_beam/runners/common_test.py | 170 ------------ .../runners/dataflow/dataflow_runner.py | 6 +- .../runners/dataflow/dataflow_runner_test.py | 10 +- .../runners/dataflow/internal/apiclient.py | 2 +- .../apache_beam/runners/pipeline_utils.py | 259 ++++++++++++++++++ .../runners/pipeline_utils_test.py | 196 +++++++++++++ .../portability/fn_api_runner/fn_runner.py | 10 +- sdks/python/apache_beam/runners/runner.py | 2 +- 10 files changed, 472 insertions(+), 420 deletions(-) create mode 100644 sdks/python/apache_beam/runners/pipeline_utils.py create mode 100644 sdks/python/apache_beam/runners/pipeline_utils_test.py diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 5196e5d29d8d..61d6b190d04f 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -87,8 +87,8 @@ from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners import PipelineRunner -from apache_beam.runners import common from apache_beam.runners import create_runner +from apache_beam.runners import pipeline_utils from apache_beam.transforms import ParDo from apache_beam.transforms import ptransform from apache_beam.transforms.display import DisplayData @@ -1019,7 +1019,7 @@ def merge_compatible_environments(proto): Mutates proto as contexts may have references to proto.components. """ - common.merge_common_environments(proto, inplace=True) + pipeline_utils.merge_common_environments(proto, inplace=True) @staticmethod def from_runner_api( diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 6a4a0404c72e..a40342b7fd27 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -22,8 +22,6 @@ # pytype: skip-file -import collections -import copy import logging import sys import threading @@ -42,8 +40,6 @@ from apache_beam.coders import coders from apache_beam.internal import util from apache_beam.options.value_provider import RuntimeValueProvider -from apache_beam.portability import common_urns -from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.pvalue import TaggedOutput from apache_beam.runners.sdf_utils import NoOpWatermarkEstimatorProvider from apache_beam.runners.sdf_utils import RestrictionTrackerView @@ -53,7 +49,6 @@ from apache_beam.runners.sdf_utils import ThreadsafeWatermarkEstimator from apache_beam.transforms import DoFn from apache_beam.transforms import core -from apache_beam.transforms import environments from apache_beam.transforms import userstate from apache_beam.transforms.core import RestrictionProvider from apache_beam.transforms.core import WatermarkEstimatorProvider @@ -61,7 +56,6 @@ from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import WindowFn -from apache_beam.typehints import typehints from apache_beam.typehints.batch import BatchConverter from apache_beam.utils.counters import Counter from apache_beam.utils.counters import CounterName @@ -1920,230 +1914,3 @@ def windows(self): raise AttributeError('windows not accessible in this context') else: return self.windowed_value.windows - - -def group_by_key_input_visitor(deterministic_key_coders=True): - # Importing here to avoid a circular dependency - # pylint: 
disable=wrong-import-order, wrong-import-position - from apache_beam.pipeline import PipelineVisitor - from apache_beam.transforms.core import GroupByKey - - class GroupByKeyInputVisitor(PipelineVisitor): - """A visitor that replaces `Any` element type for input `PCollection` of - a `GroupByKey` with a `KV` type. - - TODO(BEAM-115): Once Python SDK is compatible with the new Runner API, - we could directly replace the coder instead of mutating the element type. - """ - def __init__(self, deterministic_key_coders=True): - self.deterministic_key_coders = deterministic_key_coders - - def enter_composite_transform(self, transform_node): - self.visit_transform(transform_node) - - def visit_transform(self, transform_node): - if isinstance(transform_node.transform, GroupByKey): - pcoll = transform_node.inputs[0] - pcoll.element_type = typehints.coerce_to_kv_type( - pcoll.element_type, transform_node.full_label) - pcoll.requires_deterministic_key_coder = ( - self.deterministic_key_coders and transform_node.full_label) - key_type, value_type = pcoll.element_type.tuple_types - if transform_node.outputs: - key = next(iter(transform_node.outputs.keys())) - transform_node.outputs[key].element_type = typehints.KV[ - key_type, typehints.Iterable[value_type]] - transform_node.outputs[key].requires_deterministic_key_coder = ( - self.deterministic_key_coders and transform_node.full_label) - - return GroupByKeyInputVisitor(deterministic_key_coders) - - -def validate_pipeline_graph(pipeline_proto): - """Ensures this is a correctly constructed Beam pipeline. - """ - def get_coder(pcoll_id): - return pipeline_proto.components.coders[ - pipeline_proto.components.pcollections[pcoll_id].coder_id] - - def validate_transform(transform_id): - transform_proto = pipeline_proto.components.transforms[transform_id] - - # Currently the only validation we perform is that GBK operations have - # their coders set properly. 
- if transform_proto.spec.urn == common_urns.primitives.GROUP_BY_KEY.urn: - if len(transform_proto.inputs) != 1: - raise ValueError("Unexpected number of inputs: %s" % transform_proto) - if len(transform_proto.outputs) != 1: - raise ValueError("Unexpected number of outputs: %s" % transform_proto) - input_coder = get_coder(next(iter(transform_proto.inputs.values()))) - output_coder = get_coder(next(iter(transform_proto.outputs.values()))) - if input_coder.spec.urn != common_urns.coders.KV.urn: - raise ValueError( - "Bad coder for input of %s: %s" % (transform_id, input_coder)) - if output_coder.spec.urn != common_urns.coders.KV.urn: - raise ValueError( - "Bad coder for output of %s: %s" % (transform_id, output_coder)) - output_values_coder = pipeline_proto.components.coders[ - output_coder.component_coder_ids[1]] - if (input_coder.component_coder_ids[0] != - output_coder.component_coder_ids[0] or - output_values_coder.spec.urn != common_urns.coders.ITERABLE.urn or - output_values_coder.component_coder_ids[0] != - input_coder.component_coder_ids[1]): - raise ValueError( - "Incompatible input coder %s and output coder %s for transform %s" % - (transform_id, input_coder, output_coder)) - elif transform_proto.spec.urn == common_urns.primitives.ASSIGN_WINDOWS.urn: - if not transform_proto.inputs: - raise ValueError("Missing input for transform: %s" % transform_proto) - elif transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn: - if not transform_proto.inputs: - raise ValueError("Missing input for transform: %s" % transform_proto) - - for t in transform_proto.subtransforms: - validate_transform(t) - - for t in pipeline_proto.root_transform_ids: - validate_transform(t) - - -def _dep_key(dep): - if dep.type_urn == common_urns.artifact_types.FILE.urn: - payload = beam_runner_api_pb2.ArtifactFilePayload.FromString( - dep.type_payload) - if payload.sha256: - type_info = 'sha256', payload.sha256 - else: - type_info = 'path', payload.path - elif dep.type_urn == common_urns.artifact_types.URL.urn: - payload = beam_runner_api_pb2.ArtifactUrlPayload.FromString( - dep.type_payload) - if payload.sha256: - type_info = 'sha256', payload.sha256 - else: - type_info = 'url', payload.url - else: - type_info = dep.type_urn, dep.type_payload - return type_info, dep.role_urn, dep.role_payload - - -def _expanded_dep_keys(dep): - if (dep.type_urn == common_urns.artifact_types.FILE.urn and - dep.role_urn == common_urns.artifact_roles.STAGING_TO.urn): - payload = beam_runner_api_pb2.ArtifactFilePayload.FromString( - dep.type_payload) - role = beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString( - dep.role_payload) - if role.staged_name == 'submission_environment_dependencies.txt': - return - elif role.staged_name == 'requirements.txt': - with open(payload.path) as fin: - for line in fin: - yield 'requirements.txt', line.strip() - return - - yield _dep_key(dep) - - -def _base_env_key(env, include_deps=True): - return ( - env.urn, - env.payload, - tuple(sorted(env.capabilities)), - tuple(sorted(env.resource_hints.items())), - tuple(sorted(_dep_key(dep) - for dep in env.dependencies)) if include_deps else None) - - -def _env_key(env): - return tuple( - sorted( - _base_env_key(e) - for e in environments.expand_anyof_environments(env))) - - -def merge_common_environments(pipeline_proto, inplace=False): - canonical_environments = collections.defaultdict(list) - for env_id, env in pipeline_proto.components.environments.items(): - canonical_environments[_env_key(env)].append(env_id) - - if 
len(canonical_environments) == len(pipeline_proto.components.environments): - # All environments are already sufficiently distinct. - return pipeline_proto - - environment_remappings = { - e: es[0] - for es in canonical_environments.values() for e in es - } - - return update_environments(pipeline_proto, environment_remappings, inplace) - - -def merge_superset_dep_environments(pipeline_proto): - """Merges all environemnts A and B where A and B are equivalent except that - A has a superset of the dependencies of B. - """ - docker_envs = {} - for env_id, env in pipeline_proto.components.environments.items(): - docker_env = environments.resolve_anyof_environment( - env, common_urns.environments.DOCKER.urn) - if docker_env.urn == common_urns.environments.DOCKER.urn: - docker_envs[env_id] = docker_env - - has_base_and_dep = collections.defaultdict(set) - env_scores = { - env_id: (len(env.dependencies), env_id) - for (env_id, env) in docker_envs.items() - } - - for env_id, env in docker_envs.items(): - base_key = _base_env_key(env, include_deps=False) - has_base_and_dep[base_key, None].add(env_id) - for dep in env.dependencies: - for dep_key in _expanded_dep_keys(dep): - has_base_and_dep[base_key, dep_key].add(env_id) - - environment_remappings = {} - for env_id, env in docker_envs.items(): - base_key = _base_env_key(env, include_deps=False) - # This is the set of all environments that have at least all of env's deps. - candidates = set.intersection( - has_base_and_dep[base_key, None], - *[ - has_base_and_dep[base_key, dep_key] for dep in env.dependencies - for dep_key in _expanded_dep_keys(dep) - ]) - # Choose the maximal one. - best = max(candidates, key=env_scores.get) - if best != env_id: - environment_remappings[env_id] = best - - return update_environments(pipeline_proto, environment_remappings) - - -def update_environments(pipeline_proto, environment_remappings, inplace=False): - if not environment_remappings: - return pipeline_proto - - if not inplace: - pipeline_proto = copy.copy(pipeline_proto) - - for t in pipeline_proto.components.transforms.values(): - if t.environment_id not in pipeline_proto.components.environments: - # TODO(https://github.com/apache/beam/issues/30876): Remove this - # workaround. - continue - if t.environment_id and t.environment_id in environment_remappings: - t.environment_id = environment_remappings[t.environment_id] - for w in pipeline_proto.components.windowing_strategies.values(): - if w.environment_id not in pipeline_proto.components.environments: - # TODO(https://github.com/apache/beam/issues/30876): Remove this - # workaround. 
- continue - if w.environment_id and w.environment_id in environment_remappings: - w.environment_id = environment_remappings[w.environment_id] - for e in set(environment_remappings.keys()) - set( - environment_remappings.values()): - del pipeline_proto.components.environments[e] - return pipeline_proto diff --git a/sdks/python/apache_beam/runners/common_test.py b/sdks/python/apache_beam/runners/common_test.py index 1b5fc115d23b..00645948c3ed 100644 --- a/sdks/python/apache_beam/runners/common_test.py +++ b/sdks/python/apache_beam/runners/common_test.py @@ -17,10 +17,6 @@ # pytype: skip-file -import hashlib -import os -import random -import tempfile import unittest import hamcrest as hc @@ -30,13 +26,8 @@ from apache_beam.io.restriction_trackers import OffsetRestrictionTracker from apache_beam.io.watermark_estimators import ManualWatermarkEstimator from apache_beam.options.pipeline_options import PipelineOptions -from apache_beam.portability import common_urns -from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners.common import DoFnSignature from apache_beam.runners.common import PerWindowInvoker -from apache_beam.runners.common import merge_common_environments -from apache_beam.runners.common import merge_superset_dep_environments -from apache_beam.runners.portability.expansion_service_test import FibTransform from apache_beam.runners.sdf_utils import SplitResultPrimary from apache_beam.runners.sdf_utils import SplitResultResidual from apache_beam.testing.test_pipeline import TestPipeline @@ -593,166 +584,5 @@ def test_window_observing_split_on_window_boundary_round_down_on_last_window( self.assertEqual(stop_index, 2) -class UtilitiesTest(unittest.TestCase): - def test_equal_environments_merged(self): - pipeline_proto = merge_common_environments( - beam_runner_api_pb2.Pipeline( - components=beam_runner_api_pb2.Components( - environments={ - 'a1': beam_runner_api_pb2.Environment(urn='A'), - 'a2': beam_runner_api_pb2.Environment(urn='A'), - 'b1': beam_runner_api_pb2.Environment( - urn='B', payload=b'x'), - 'b2': beam_runner_api_pb2.Environment( - urn='B', payload=b'x'), - 'b3': beam_runner_api_pb2.Environment( - urn='B', payload=b'y'), - }, - transforms={ - 't1': beam_runner_api_pb2.PTransform( - unique_name='t1', environment_id='a1'), - 't2': beam_runner_api_pb2.PTransform( - unique_name='t2', environment_id='a2'), - }, - windowing_strategies={ - 'w1': beam_runner_api_pb2.WindowingStrategy( - environment_id='b1'), - 'w2': beam_runner_api_pb2.WindowingStrategy( - environment_id='b2'), - }))) - self.assertEqual(len(pipeline_proto.components.environments), 3) - self.assertTrue(('a1' in pipeline_proto.components.environments) - ^ ('a2' in pipeline_proto.components.environments)) - self.assertTrue(('b1' in pipeline_proto.components.environments) - ^ ('b2' in pipeline_proto.components.environments)) - self.assertEqual( - len( - set( - t.environment_id - for t in pipeline_proto.components.transforms.values())), - 1) - self.assertEqual( - len( - set( - w.environment_id for w in - pipeline_proto.components.windowing_strategies.values())), - 1) - - def _make_dep(self, path): - hasher = hashlib.sha256() - if os.path.exists(path): - with open(path, 'rb') as fin: - hasher.update(fin.read()) - else: - # A fake file, identified only by its path. 
- hasher.update(path.encode('utf-8')) - return beam_runner_api_pb2.ArtifactInformation( - type_urn=common_urns.artifact_types.FILE.urn, - type_payload=beam_runner_api_pb2.ArtifactFilePayload( - path=path, sha256=hasher.hexdigest()).SerializeToString(), - role_urn=common_urns.artifact_roles.STAGING_TO.urn, - role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload( - staged_name=os.path.basename(path)).SerializeToString()) - - def _docker_env(self, id, deps=()): - return beam_runner_api_pb2.Environment( - urn=common_urns.environments.DOCKER.urn, - payload=id.encode('utf8'), - dependencies=[self._make_dep(path) for path in deps], - ) - - def test_subset_deps_environments_merged(self): - environments = { - 'A': self._docker_env('A'), - 'Ax': self._docker_env('A', ['x']), - 'Ay': self._docker_env('A', ['y']), - 'Axy': self._docker_env('A', ['x', 'y']), - 'Bx': self._docker_env('B', ['x']), - 'Bxy': self._docker_env('B', ['x', 'y']), - 'Byz': self._docker_env('B', ['y', 'z']), - } - transforms = { - env_id: beam_runner_api_pb2.PTransform( - unique_name=env_id, environment_id=env_id) - for env_id in environments.keys() - } - pipeline_proto = merge_superset_dep_environments( - beam_runner_api_pb2.Pipeline( - components=beam_runner_api_pb2.Components( - environments=environments, transforms=transforms))) - - # These can all be merged into the same environment. - self.assertEqual( - pipeline_proto.components.transforms['A'].environment_id, 'Axy') - self.assertEqual( - pipeline_proto.components.transforms['Ax'].environment_id, 'Axy') - self.assertEqual( - pipeline_proto.components.transforms['Ay'].environment_id, 'Axy') - self.assertEqual( - pipeline_proto.components.transforms['Axy'].environment_id, 'Axy') - # Despite having the same dependencies, these must be merged into their own. - self.assertEqual( - pipeline_proto.components.transforms['Bx'].environment_id, 'Bxy') - self.assertEqual( - pipeline_proto.components.transforms['Bxy'].environment_id, 'Bxy') - # This is not a subset of any, must be left alone. - self.assertEqual( - pipeline_proto.components.transforms['Byz'].environment_id, 'Byz') - - def test_subset_deps_environments_merged_with_requirements_txt(self): - with tempfile.TemporaryDirectory() as tmpdir: - - def make_file(basename, content): - subdir = tempfile.TemporaryDirectory(dir=tmpdir, delete=False).name - path = os.path.join(subdir, basename) - with open(path, 'w') as fout: - fout.write(content) - return path - - def make_py_deps(*pkgs): - return [ - make_file('requirements.txt', '\n'.join(pkgs)), - make_file( - 'submission_environment_dependencies.txt', str( - random.random())), - ] + [make_file(pkg, pkg) for pkg in pkgs] - - environments = { - 'A': self._docker_env('A'), - 'Ax': self._docker_env('A', make_py_deps('x')), - 'Ay': self._docker_env('A', make_py_deps('y')), - 'Axy': self._docker_env('A', make_py_deps('x', 'y')), - } - transforms = { - env_id: beam_runner_api_pb2.PTransform( - unique_name=env_id, environment_id=env_id) - for env_id in environments.keys() - } - pipeline_proto = merge_superset_dep_environments( - beam_runner_api_pb2.Pipeline( - components=beam_runner_api_pb2.Components( - environments=environments, transforms=transforms))) - - # These can all be merged into the same environment. 
- self.assertEqual( - pipeline_proto.components.transforms['A'].environment_id, 'Axy') - self.assertEqual( - pipeline_proto.components.transforms['Ax'].environment_id, 'Axy') - self.assertEqual( - pipeline_proto.components.transforms['Ay'].environment_id, 'Axy') - self.assertEqual( - pipeline_proto.components.transforms['Axy'].environment_id, 'Axy') - - def test_external_merged(self): - p = beam.Pipeline() - # This transform recursively creates several external environments. - _ = p | FibTransform(4) - pipeline_proto = p.to_runner_api() - # All our external environments are equal and consolidated. - # We also have a placeholder "default" environment that has not been - # resolved do anything concrete yet. - self.assertEqual(len(pipeline_proto.components.environments), 2) - - if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index bf5c8a08ce54..860326d02d30 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -43,10 +43,10 @@ from apache_beam.options.pipeline_options import WorkerOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.runners.common import group_by_key_input_visitor -from apache_beam.runners.common import merge_common_environments -from apache_beam.runners.common import merge_superset_dep_environments from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api +from apache_beam.runners.pipeline_utils import group_by_key_input_visitor +from apache_beam.runners.pipeline_utils import merge_common_environments +from apache_beam.runners.pipeline_utils import merge_superset_dep_environments from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 5e1cb0ae9a48..65d0525b8a7e 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -35,8 +35,8 @@ from apache_beam.pvalue import PCollection from apache_beam.runners import DataflowRunner from apache_beam.runners import TestDataflowRunner -from apache_beam.runners import common from apache_beam.runners import create_runner +from apache_beam.runners import pipeline_utils from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options @@ -316,7 +316,7 @@ def test_group_by_key_input_visitor_with_valid_inputs(self): applied = AppliedPTransform( None, beam.GroupByKey(), "label", {'pcoll': pcoll}, None, None) applied.outputs[None] = PCollection(None) - common.group_by_key_input_visitor().visit_transform(applied) + pipeline_utils.group_by_key_input_visitor().visit_transform(applied) self.assertEqual( pcoll.element_type, typehints.KV[typehints.Any, typehints.Any]) @@ -332,7 +332,7 @@ def test_group_by_key_input_visitor_with_invalid_inputs(self): "Found .*") for pcoll in [pcoll1, pcoll2]: with self.assertRaisesRegex(ValueError, err_msg): - common.group_by_key_input_visitor().visit_transform( + 
pipeline_utils.group_by_key_input_visitor().visit_transform( AppliedPTransform( None, beam.GroupByKey(), "label", {'in': pcoll}, None, None)) @@ -341,7 +341,7 @@ def test_group_by_key_input_visitor_for_non_gbk_transforms(self): pcoll = PCollection(p) for transform in [beam.Flatten(), beam.Map(lambda x: x)]: pcoll.element_type = typehints.Any - common.group_by_key_input_visitor().visit_transform( + pipeline_utils.group_by_key_input_visitor().visit_transform( AppliedPTransform( None, transform, "label", {'in': pcoll}, None, None)) self.assertEqual(pcoll.element_type, typehints.Any) @@ -383,7 +383,7 @@ def test_gbk_then_flatten_input_visitor(self): # to make sure the check below is not vacuous. self.assertNotIsInstance(flat.element_type, typehints.TupleConstraint) - p.visit(common.group_by_key_input_visitor()) + p.visit(pipeline_utils.group_by_key_input_visitor()) p.visit(DataflowRunner.flatten_input_visitor()) # The dataflow runner requires gbk input to be tuples *and* flatten diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index a757d45c0363..b5e4949ad7ca 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -63,10 +63,10 @@ from apache_beam.options.pipeline_options import WorkerOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.runners.common import validate_pipeline_graph from apache_beam.runners.dataflow.internal import names from apache_beam.runners.dataflow.internal.clients import dataflow from apache_beam.runners.internal import names as shared_names +from apache_beam.runners.pipeline_utils import validate_pipeline_graph from apache_beam.runners.portability.stager import Stager from apache_beam.transforms import DataflowDistributionCounter from apache_beam.transforms import cy_combiners diff --git a/sdks/python/apache_beam/runners/pipeline_utils.py b/sdks/python/apache_beam/runners/pipeline_utils.py new file mode 100644 index 000000000000..9a0fb00586cf --- /dev/null +++ b/sdks/python/apache_beam/runners/pipeline_utils.py @@ -0,0 +1,259 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Pipeline manipulation utilities useful for many runners. + +For internal use only; no backwards-compatibility guarantees. 
+""" + +# pytype: skip-fileimport collections + + +import collections +import copy + +from apache_beam.portability import common_urns +from apache_beam.portability.api import beam_runner_api_pb2 +from apache_beam.transforms import environments +from apache_beam.typehints import typehints + + +def group_by_key_input_visitor(deterministic_key_coders=True): + # Importing here to avoid a circular dependency + # pylint: disable=wrong-import-order, wrong-import-position + from apache_beam.pipeline import PipelineVisitor + from apache_beam.transforms.core import GroupByKey + + class GroupByKeyInputVisitor(PipelineVisitor): + """A visitor that replaces `Any` element type for input `PCollection` of + a `GroupByKey` with a `KV` type. + + TODO(BEAM-115): Once Python SDK is compatible with the new Runner API, + we could directly replace the coder instead of mutating the element type. + """ + def __init__(self, deterministic_key_coders=True): + self.deterministic_key_coders = deterministic_key_coders + + def enter_composite_transform(self, transform_node): + self.visit_transform(transform_node) + + def visit_transform(self, transform_node): + if isinstance(transform_node.transform, GroupByKey): + pcoll = transform_node.inputs[0] + pcoll.element_type = typehints.coerce_to_kv_type( + pcoll.element_type, transform_node.full_label) + pcoll.requires_deterministic_key_coder = ( + self.deterministic_key_coders and transform_node.full_label) + key_type, value_type = pcoll.element_type.tuple_types + if transform_node.outputs: + key = next(iter(transform_node.outputs.keys())) + transform_node.outputs[key].element_type = typehints.KV[ + key_type, typehints.Iterable[value_type]] + transform_node.outputs[key].requires_deterministic_key_coder = ( + self.deterministic_key_coders and transform_node.full_label) + + return GroupByKeyInputVisitor(deterministic_key_coders) + + +def validate_pipeline_graph(pipeline_proto): + """Ensures this is a correctly constructed Beam pipeline. + """ + def get_coder(pcoll_id): + return pipeline_proto.components.coders[ + pipeline_proto.components.pcollections[pcoll_id].coder_id] + + def validate_transform(transform_id): + transform_proto = pipeline_proto.components.transforms[transform_id] + + # Currently the only validation we perform is that GBK operations have + # their coders set properly. 
+ if transform_proto.spec.urn == common_urns.primitives.GROUP_BY_KEY.urn: + if len(transform_proto.inputs) != 1: + raise ValueError("Unexpected number of inputs: %s" % transform_proto) + if len(transform_proto.outputs) != 1: + raise ValueError("Unexpected number of outputs: %s" % transform_proto) + input_coder = get_coder(next(iter(transform_proto.inputs.values()))) + output_coder = get_coder(next(iter(transform_proto.outputs.values()))) + if input_coder.spec.urn != common_urns.coders.KV.urn: + raise ValueError( + "Bad coder for input of %s: %s" % (transform_id, input_coder)) + if output_coder.spec.urn != common_urns.coders.KV.urn: + raise ValueError( + "Bad coder for output of %s: %s" % (transform_id, output_coder)) + output_values_coder = pipeline_proto.components.coders[ + output_coder.component_coder_ids[1]] + if (input_coder.component_coder_ids[0] != + output_coder.component_coder_ids[0] or + output_values_coder.spec.urn != common_urns.coders.ITERABLE.urn or + output_values_coder.component_coder_ids[0] != + input_coder.component_coder_ids[1]): + raise ValueError( + "Incompatible input coder %s and output coder %s for transform %s" % + (transform_id, input_coder, output_coder)) + elif transform_proto.spec.urn == common_urns.primitives.ASSIGN_WINDOWS.urn: + if not transform_proto.inputs: + raise ValueError("Missing input for transform: %s" % transform_proto) + elif transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn: + if not transform_proto.inputs: + raise ValueError("Missing input for transform: %s" % transform_proto) + + for t in transform_proto.subtransforms: + validate_transform(t) + + for t in pipeline_proto.root_transform_ids: + validate_transform(t) + + +def _dep_key(dep): + if dep.type_urn == common_urns.artifact_types.FILE.urn: + payload = beam_runner_api_pb2.ArtifactFilePayload.FromString( + dep.type_payload) + if payload.sha256: + type_info = 'sha256', payload.sha256 + else: + type_info = 'path', payload.path + elif dep.type_urn == common_urns.artifact_types.URL.urn: + payload = beam_runner_api_pb2.ArtifactUrlPayload.FromString( + dep.type_payload) + if payload.sha256: + type_info = 'sha256', payload.sha256 + else: + type_info = 'url', payload.url + else: + type_info = dep.type_urn, dep.type_payload + return type_info, dep.role_urn, dep.role_payload + + +def _expanded_dep_keys(dep): + if (dep.type_urn == common_urns.artifact_types.FILE.urn and + dep.role_urn == common_urns.artifact_roles.STAGING_TO.urn): + payload = beam_runner_api_pb2.ArtifactFilePayload.FromString( + dep.type_payload) + role = beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString( + dep.role_payload) + if role.staged_name == 'submission_environment_dependencies.txt': + return + elif role.staged_name == 'requirements.txt': + with open(payload.path) as fin: + for line in fin: + yield 'requirements.txt', line.strip() + return + + yield _dep_key(dep) + + +def _base_env_key(env, include_deps=True): + return ( + env.urn, + env.payload, + tuple(sorted(env.capabilities)), + tuple(sorted(env.resource_hints.items())), + tuple(sorted(_dep_key(dep) + for dep in env.dependencies)) if include_deps else None) + + +def _env_key(env): + return tuple( + sorted( + _base_env_key(e) + for e in environments.expand_anyof_environments(env))) + + +def merge_common_environments(pipeline_proto, inplace=False): + canonical_environments = collections.defaultdict(list) + for env_id, env in pipeline_proto.components.environments.items(): + canonical_environments[_env_key(env)].append(env_id) + + if 
len(canonical_environments) == len(pipeline_proto.components.environments): + # All environments are already sufficiently distinct. + return pipeline_proto + + environment_remappings = { + e: es[0] + for es in canonical_environments.values() for e in es + } + + return update_environments(pipeline_proto, environment_remappings, inplace) + + +def merge_superset_dep_environments(pipeline_proto): + """Merges all environemnts A and B where A and B are equivalent except that + A has a superset of the dependencies of B. + """ + docker_envs = {} + for env_id, env in pipeline_proto.components.environments.items(): + docker_env = environments.resolve_anyof_environment( + env, common_urns.environments.DOCKER.urn) + if docker_env.urn == common_urns.environments.DOCKER.urn: + docker_envs[env_id] = docker_env + + has_base_and_dep = collections.defaultdict(set) + env_scores = { + env_id: (len(env.dependencies), env_id) + for (env_id, env) in docker_envs.items() + } + + for env_id, env in docker_envs.items(): + base_key = _base_env_key(env, include_deps=False) + has_base_and_dep[base_key, None].add(env_id) + for dep in env.dependencies: + for dep_key in _expanded_dep_keys(dep): + has_base_and_dep[base_key, dep_key].add(env_id) + + environment_remappings = {} + for env_id, env in docker_envs.items(): + base_key = _base_env_key(env, include_deps=False) + # This is the set of all environments that have at least all of env's deps. + candidates = set.intersection( + has_base_and_dep[base_key, None], + *[ + has_base_and_dep[base_key, dep_key] for dep in env.dependencies + for dep_key in _expanded_dep_keys(dep) + ]) + # Choose the maximal one. + best = max(candidates, key=env_scores.get) + if best != env_id: + environment_remappings[env_id] = best + + return update_environments(pipeline_proto, environment_remappings) + + +def update_environments(pipeline_proto, environment_remappings, inplace=False): + if not environment_remappings: + return pipeline_proto + + if not inplace: + pipeline_proto = copy.copy(pipeline_proto) + + for t in pipeline_proto.components.transforms.values(): + if t.environment_id not in pipeline_proto.components.environments: + # TODO(https://github.com/apache/beam/issues/30876): Remove this + # workaround. + continue + if t.environment_id and t.environment_id in environment_remappings: + t.environment_id = environment_remappings[t.environment_id] + for w in pipeline_proto.components.windowing_strategies.values(): + if w.environment_id not in pipeline_proto.components.environments: + # TODO(https://github.com/apache/beam/issues/30876): Remove this + # workaround. + continue + if w.environment_id and w.environment_id in environment_remappings: + w.environment_id = environment_remappings[w.environment_id] + for e in set(environment_remappings.keys()) - set( + environment_remappings.values()): + del pipeline_proto.components.environments[e] + return pipeline_proto diff --git a/sdks/python/apache_beam/runners/pipeline_utils_test.py b/sdks/python/apache_beam/runners/pipeline_utils_test.py new file mode 100644 index 000000000000..4601dc01763b --- /dev/null +++ b/sdks/python/apache_beam/runners/pipeline_utils_test.py @@ -0,0 +1,196 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +import hashlib +import os +import random +import tempfile +import unittest + +import apache_beam as beam +from apache_beam.portability import common_urns +from apache_beam.portability.api import beam_runner_api_pb2 +from apache_beam.runners.pipeline_utils import merge_common_environments +from apache_beam.runners.pipeline_utils import merge_superset_dep_environments +from apache_beam.runners.portability.expansion_service_test import FibTransform + + +class PipelineUtilitiesTest(unittest.TestCase): + def test_equal_environments_merged(self): + pipeline_proto = merge_common_environments( + beam_runner_api_pb2.Pipeline( + components=beam_runner_api_pb2.Components( + environments={ + 'a1': beam_runner_api_pb2.Environment(urn='A'), + 'a2': beam_runner_api_pb2.Environment(urn='A'), + 'b1': beam_runner_api_pb2.Environment( + urn='B', payload=b'x'), + 'b2': beam_runner_api_pb2.Environment( + urn='B', payload=b'x'), + 'b3': beam_runner_api_pb2.Environment( + urn='B', payload=b'y'), + }, + transforms={ + 't1': beam_runner_api_pb2.PTransform( + unique_name='t1', environment_id='a1'), + 't2': beam_runner_api_pb2.PTransform( + unique_name='t2', environment_id='a2'), + }, + windowing_strategies={ + 'w1': beam_runner_api_pb2.WindowingStrategy( + environment_id='b1'), + 'w2': beam_runner_api_pb2.WindowingStrategy( + environment_id='b2'), + }))) + self.assertEqual(len(pipeline_proto.components.environments), 3) + self.assertTrue(('a1' in pipeline_proto.components.environments) + ^ ('a2' in pipeline_proto.components.environments)) + self.assertTrue(('b1' in pipeline_proto.components.environments) + ^ ('b2' in pipeline_proto.components.environments)) + self.assertEqual( + len( + set( + t.environment_id + for t in pipeline_proto.components.transforms.values())), + 1) + self.assertEqual( + len( + set( + w.environment_id for w in + pipeline_proto.components.windowing_strategies.values())), + 1) + + def _make_dep(self, path): + hasher = hashlib.sha256() + if os.path.exists(path): + with open(path, 'rb') as fin: + hasher.update(fin.read()) + else: + # A fake file, identified only by its path. 
+ hasher.update(path.encode('utf-8')) + return beam_runner_api_pb2.ArtifactInformation( + type_urn=common_urns.artifact_types.FILE.urn, + type_payload=beam_runner_api_pb2.ArtifactFilePayload( + path=path, sha256=hasher.hexdigest()).SerializeToString(), + role_urn=common_urns.artifact_roles.STAGING_TO.urn, + role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload( + staged_name=os.path.basename(path)).SerializeToString()) + + def _docker_env(self, id, deps=()): + return beam_runner_api_pb2.Environment( + urn=common_urns.environments.DOCKER.urn, + payload=id.encode('utf8'), + dependencies=[self._make_dep(path) for path in deps], + ) + + def test_subset_deps_environments_merged(self): + environments = { + 'A': self._docker_env('A'), + 'Ax': self._docker_env('A', ['x']), + 'Ay': self._docker_env('A', ['y']), + 'Axy': self._docker_env('A', ['x', 'y']), + 'Bx': self._docker_env('B', ['x']), + 'Bxy': self._docker_env('B', ['x', 'y']), + 'Byz': self._docker_env('B', ['y', 'z']), + } + transforms = { + env_id: beam_runner_api_pb2.PTransform( + unique_name=env_id, environment_id=env_id) + for env_id in environments.keys() + } + pipeline_proto = merge_superset_dep_environments( + beam_runner_api_pb2.Pipeline( + components=beam_runner_api_pb2.Components( + environments=environments, transforms=transforms))) + + # These can all be merged into the same environment. + self.assertEqual( + pipeline_proto.components.transforms['A'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Ax'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Ay'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Axy'].environment_id, 'Axy') + # Despite having the same dependencies, these must be merged into their own. + self.assertEqual( + pipeline_proto.components.transforms['Bx'].environment_id, 'Bxy') + self.assertEqual( + pipeline_proto.components.transforms['Bxy'].environment_id, 'Bxy') + # This is not a subset of any, must be left alone. + self.assertEqual( + pipeline_proto.components.transforms['Byz'].environment_id, 'Byz') + + def test_subset_deps_environments_merged_with_requirements_txt(self): + with tempfile.TemporaryDirectory() as tmpdir: + + def make_file(basename, content): + subdir = tempfile.TemporaryDirectory(dir=tmpdir, delete=False).name + path = os.path.join(subdir, basename) + with open(path, 'w') as fout: + fout.write(content) + return path + + def make_py_deps(*pkgs): + return [ + make_file('requirements.txt', '\n'.join(pkgs)), + make_file( + 'submission_environment_dependencies.txt', str( + random.random())), + ] + [make_file(pkg, pkg) for pkg in pkgs] + + environments = { + 'A': self._docker_env('A'), + 'Ax': self._docker_env('A', make_py_deps('x')), + 'Ay': self._docker_env('A', make_py_deps('y')), + 'Axy': self._docker_env('A', make_py_deps('x', 'y')), + } + transforms = { + env_id: beam_runner_api_pb2.PTransform( + unique_name=env_id, environment_id=env_id) + for env_id in environments.keys() + } + pipeline_proto = merge_superset_dep_environments( + beam_runner_api_pb2.Pipeline( + components=beam_runner_api_pb2.Components( + environments=environments, transforms=transforms))) + + # These can all be merged into the same environment. 
+ self.assertEqual( + pipeline_proto.components.transforms['A'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Ax'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Ay'].environment_id, 'Axy') + self.assertEqual( + pipeline_proto.components.transforms['Axy'].environment_id, 'Axy') + + def test_external_merged(self): + p = beam.Pipeline() + # This transform recursively creates several external environments. + _ = p | FibTransform(4) + pipeline_proto = p.to_runner_api() + # All our external environments are equal and consolidated. + # We also have a placeholder "default" environment that has not been + # resolved do anything concrete yet. + self.assertEqual(len(pipeline_proto.components.environments), 2) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 3655446d77ac..67d8d6fd333f 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -62,10 +62,10 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.portability.api import metrics_pb2 from apache_beam.runners import runner -from apache_beam.runners.common import group_by_key_input_visitor -from apache_beam.runners.common import merge_common_environments -from apache_beam.runners.common import merge_superset_dep_environments -from apache_beam.runners.common import validate_pipeline_graph +from apache_beam.runners.pipeline_utils import group_by_key_input_visitor +from apache_beam.runners.pipeline_utils import merge_common_environments +from apache_beam.runners.pipeline_utils import merge_superset_dep_environments +from apache_beam.runners.pipeline_utils import validate_pipeline_graph from apache_beam.runners.portability import portable_metrics from apache_beam.runners.portability.fn_api_runner import execution from apache_beam.runners.portability.fn_api_runner import translations @@ -218,7 +218,7 @@ def run_via_runner_api( pipeline_proto = self.embed_default_docker_image(pipeline_proto) pipeline_proto = merge_common_environments( self.resolve_any_environments( - merge_superset_dep_environments(pipeline_proto))) + merge_superset_dep_environments(pipeline_proto))) stage_context, stages = self.create_stages(pipeline_proto) return self.run_stages(stage_context, stages) diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py index 78022724226a..1030e9fbd8b5 100644 --- a/sdks/python/apache_beam/runners/runner.py +++ b/sdks/python/apache_beam/runners/runner.py @@ -32,7 +32,7 @@ from apache_beam.options.pipeline_options import TypeOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.runners.common import group_by_key_input_visitor +from apache_beam.runners.pipeline_utils import group_by_key_input_visitor from apache_beam.transforms import environments if TYPE_CHECKING: From 01b5bb9420ea52da0602f1869608a6bfc7b6ef45 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 9 Apr 2025 08:55:30 -0700 Subject: [PATCH 3/3] Use tempfile.mkdtemp as TemporaryDirectory's delete not available pre 3.12. 
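For context, a minimal sketch of the incompatibility being worked around: the `delete` keyword of `tempfile.TemporaryDirectory` is a Python 3.12 addition, while `tempfile.mkdtemp` never auto-deletes and behaves the same on all supported versions (directory names here are illustrative only):

    import shutil
    import tempfile

    base = tempfile.mkdtemp()

    # Python 3.12+ only: keep the directory instead of cleaning it up when
    # the TemporaryDirectory object is finalized.
    # subdir = tempfile.TemporaryDirectory(dir=base, delete=False).name

    # Portable equivalent used in the test change below.
    subdir = tempfile.mkdtemp(dir=base)

    shutil.rmtree(base)  # explicit cleanup, since mkdtemp never deletes
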
--- sdks/python/apache_beam/runners/pipeline_utils.py | 1 - sdks/python/apache_beam/runners/pipeline_utils_test.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/runners/pipeline_utils.py b/sdks/python/apache_beam/runners/pipeline_utils.py index 9a0fb00586cf..7c38c034e2a7 100644 --- a/sdks/python/apache_beam/runners/pipeline_utils.py +++ b/sdks/python/apache_beam/runners/pipeline_utils.py @@ -22,7 +22,6 @@ # pytype: skip-fileimport collections - import collections import copy diff --git a/sdks/python/apache_beam/runners/pipeline_utils_test.py b/sdks/python/apache_beam/runners/pipeline_utils_test.py index 4601dc01763b..4359f943cfb8 100644 --- a/sdks/python/apache_beam/runners/pipeline_utils_test.py +++ b/sdks/python/apache_beam/runners/pipeline_utils_test.py @@ -141,7 +141,7 @@ def test_subset_deps_environments_merged_with_requirements_txt(self): with tempfile.TemporaryDirectory() as tmpdir: def make_file(basename, content): - subdir = tempfile.TemporaryDirectory(dir=tmpdir, delete=False).name + subdir = tempfile.mkdtemp(dir=tmpdir) path = os.path.join(subdir, basename) with open(path, 'w') as fout: fout.write(content)
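
For readers skimming the first patch, here is a toy, Beam-free sketch of the superset-selection index that merge_superset_dep_environments builds. It only shows the shape of the algorithm; the real implementation keys environments on _base_env_key(env, include_deps=False) and dependencies on _expanded_dep_keys(dep), and the names and data below are made up for illustration:

    import collections

    # Toy model: an "environment" is (base, frozenset_of_deps).
    envs = {
        'A': ('A', frozenset()),
        'Ax': ('A', frozenset({'x'})),
        'Axy': ('A', frozenset({'x', 'y'})),
        'Bx': ('B', frozenset({'x'})),
    }

    # Inverted index: env ids containing a given (base, dep) pair. The
    # dep=None bucket holds every env with that base, so dependency-free
    # environments still produce candidates.
    has_base_and_dep = collections.defaultdict(set)
    for env_id, (base, deps) in envs.items():
        has_base_and_dep[base, None].add(env_id)
        for dep in deps:
            has_base_and_dep[base, dep].add(env_id)

    # An environment may be replaced by any same-base environment whose deps
    # are a superset of its own; prefer the candidate with the most deps,
    # breaking ties by id.
    remappings = {}
    for env_id, (base, deps) in envs.items():
        candidates = set.intersection(
            has_base_and_dep[base, None],
            *[has_base_and_dep[base, dep] for dep in deps])
        best = max(candidates, key=lambda e: (len(envs[e][1]), e))
        if best != env_id:
            remappings[env_id] = best

    print(remappings)  # {'A': 'Axy', 'Ax': 'Axy'}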