Commit 2a8d7f7
Move the pipeline graph manipulation utilities to their own files.
As well as being very different from the low-level execution details found in common.py, these utilities do not need to be compiled (whereas the other structures in common.py are critical to optimize for elementwise efficiency).
1 parent 7e7666e commit 2a8d7f7

File tree

8 files changed: +17 -408 lines

sdks/python/apache_beam/pipeline.py (2 additions, 2 deletions)

@@ -87,8 +87,8 @@
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import PipelineRunner
-from apache_beam.runners import common
 from apache_beam.runners import create_runner
+from apache_beam.runners import pipeline_utils
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import ptransform
 from apache_beam.transforms.display import DisplayData

@@ -1019,7 +1019,7 @@ def merge_compatible_environments(proto):
 
     Mutates proto as contexts may have references to proto.components.
     """
-    common.merge_common_environments(proto, inplace=True)
+    pipeline_utils.merge_common_environments(proto, inplace=True)
 
   @staticmethod
   def from_runner_api(
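Callers now reach the environment-merging helper through the new module. A minimal sketch of the updated call path (the pipeline below is illustrative; the `apache_beam.runners.pipeline_utils` module and `merge_common_environments` signature are as shown in this diff):

    import apache_beam as beam
    from apache_beam.runners import pipeline_utils

    # Build a trivial pipeline and lower it to its Runner API proto.
    p = beam.Pipeline()
    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)
    proto = p.to_runner_api()

    # Same behavior as before the move, just a different home:
    pipeline_utils.merge_common_environments(proto, inplace=True)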

sdks/python/apache_beam/runners/common.py (0 additions, 227 deletions)
@@ -1920,230 +1920,3 @@ def windows(self):
       raise AttributeError('windows not accessible in this context')
     else:
       return self.windowed_value.windows
-
-
-def group_by_key_input_visitor(deterministic_key_coders=True):
-  # Importing here to avoid a circular dependency
-  # pylint: disable=wrong-import-order, wrong-import-position
-  from apache_beam.pipeline import PipelineVisitor
-  from apache_beam.transforms.core import GroupByKey
-
-  class GroupByKeyInputVisitor(PipelineVisitor):
-    """A visitor that replaces `Any` element type for input `PCollection` of
-    a `GroupByKey` with a `KV` type.
-
-    TODO(BEAM-115): Once Python SDK is compatible with the new Runner API,
-    we could directly replace the coder instead of mutating the element type.
-    """
-    def __init__(self, deterministic_key_coders=True):
-      self.deterministic_key_coders = deterministic_key_coders
-
-    def enter_composite_transform(self, transform_node):
-      self.visit_transform(transform_node)
-
-    def visit_transform(self, transform_node):
-      if isinstance(transform_node.transform, GroupByKey):
-        pcoll = transform_node.inputs[0]
-        pcoll.element_type = typehints.coerce_to_kv_type(
-            pcoll.element_type, transform_node.full_label)
-        pcoll.requires_deterministic_key_coder = (
-            self.deterministic_key_coders and transform_node.full_label)
-        key_type, value_type = pcoll.element_type.tuple_types
-        if transform_node.outputs:
-          key = next(iter(transform_node.outputs.keys()))
-          transform_node.outputs[key].element_type = typehints.KV[
-              key_type, typehints.Iterable[value_type]]
-          transform_node.outputs[key].requires_deterministic_key_coder = (
-              self.deterministic_key_coders and transform_node.full_label)
-
-  return GroupByKeyInputVisitor(deterministic_key_coders)
-
-
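For reference, this visitor is applied with Pipeline.visit before translation, so GroupByKey inputs get concrete KV element types. A hedged sketch, assuming the function keeps this signature in its new pipeline_utils home:

    import apache_beam as beam
    from apache_beam.runners import pipeline_utils  # assumed new location

    p = beam.Pipeline()
    _ = p | beam.Create([('a', 1), ('b', 2)]) | beam.GroupByKey()

    # Rewrites the GBK input's `Any` element type to a KV type in place.
    p.visit(pipeline_utils.group_by_key_input_visitor(
        deterministic_key_coders=True))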
-def validate_pipeline_graph(pipeline_proto):
-  """Ensures this is a correctly constructed Beam pipeline.
-  """
-  def get_coder(pcoll_id):
-    return pipeline_proto.components.coders[
-        pipeline_proto.components.pcollections[pcoll_id].coder_id]
-
-  def validate_transform(transform_id):
-    transform_proto = pipeline_proto.components.transforms[transform_id]
-
-    # Currently the only validation we perform is that GBK operations have
-    # their coders set properly.
-    if transform_proto.spec.urn == common_urns.primitives.GROUP_BY_KEY.urn:
-      if len(transform_proto.inputs) != 1:
-        raise ValueError("Unexpected number of inputs: %s" % transform_proto)
-      if len(transform_proto.outputs) != 1:
-        raise ValueError("Unexpected number of outputs: %s" % transform_proto)
-      input_coder = get_coder(next(iter(transform_proto.inputs.values())))
-      output_coder = get_coder(next(iter(transform_proto.outputs.values())))
-      if input_coder.spec.urn != common_urns.coders.KV.urn:
-        raise ValueError(
-            "Bad coder for input of %s: %s" % (transform_id, input_coder))
-      if output_coder.spec.urn != common_urns.coders.KV.urn:
-        raise ValueError(
-            "Bad coder for output of %s: %s" % (transform_id, output_coder))
-      output_values_coder = pipeline_proto.components.coders[
-          output_coder.component_coder_ids[1]]
-      if (input_coder.component_coder_ids[0] !=
-          output_coder.component_coder_ids[0] or
-          output_values_coder.spec.urn != common_urns.coders.ITERABLE.urn or
-          output_values_coder.component_coder_ids[0] !=
-          input_coder.component_coder_ids[1]):
-        raise ValueError(
-            "Incompatible input coder %s and output coder %s for transform %s" %
-            (input_coder, output_coder, transform_id))
-    elif transform_proto.spec.urn == common_urns.primitives.ASSIGN_WINDOWS.urn:
-      if not transform_proto.inputs:
-        raise ValueError("Missing input for transform: %s" % transform_proto)
-    elif transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn:
-      if not transform_proto.inputs:
-        raise ValueError("Missing input for transform: %s" % transform_proto)
-
-    for t in transform_proto.subtransforms:
-      validate_transform(t)
-
-  for t in pipeline_proto.root_transform_ids:
-    validate_transform(t)
-
-
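validate_pipeline_graph operates on the portable proto, so checking a pipeline is a single call; a sketch under the same pipeline_utils module assumption:

    proto = p.to_runner_api()
    # Raises ValueError if, e.g., a GroupByKey's input and output coders
    # are not a compatible KV / KV-of-Iterable pair.
    pipeline_utils.validate_pipeline_graph(proto)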
-def _dep_key(dep):
-  if dep.type_urn == common_urns.artifact_types.FILE.urn:
-    payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
-        dep.type_payload)
-    if payload.sha256:
-      type_info = 'sha256', payload.sha256
-    else:
-      type_info = 'path', payload.path
-  elif dep.type_urn == common_urns.artifact_types.URL.urn:
-    payload = beam_runner_api_pb2.ArtifactUrlPayload.FromString(
-        dep.type_payload)
-    if payload.sha256:
-      type_info = 'sha256', payload.sha256
-    else:
-      type_info = 'url', payload.url
-  else:
-    type_info = dep.type_urn, dep.type_payload
-  return type_info, dep.role_urn, dep.role_payload
-
-
-def _expanded_dep_keys(dep):
-  if (dep.type_urn == common_urns.artifact_types.FILE.urn and
-      dep.role_urn == common_urns.artifact_roles.STAGING_TO.urn):
-    payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
-        dep.type_payload)
-    role = beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
-        dep.role_payload)
-    if role.staged_name == 'submission_environment_dependencies.txt':
-      return
-    elif role.staged_name == 'requirements.txt':
-      with open(payload.path) as fin:
-        for line in fin:
-          yield 'requirements.txt', line.strip()
-      return
-
-  yield _dep_key(dep)
-
-
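These two helpers canonicalize dependencies: _dep_key prefers a sha256 over a path or URL, so identical artifacts staged from different locations compare equal, and _expanded_dep_keys additionally expands a staged requirements.txt into one key per requirement line. A sketch of what the keying looks like (these are private helpers; values and the pipeline_utils location are illustrative):

    from apache_beam.portability import common_urns
    from apache_beam.portability.api import beam_runner_api_pb2
    from apache_beam.runners import pipeline_utils  # assumed new location

    dep = beam_runner_api_pb2.ArtifactInformation(
        type_urn=common_urns.artifact_types.FILE.urn,
        type_payload=beam_runner_api_pb2.ArtifactFilePayload(
            path='/tmp/pkg.whl', sha256='abc123').SerializeToString(),
        role_urn=common_urns.artifact_roles.STAGING_TO.urn,
        role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
            staged_name='pkg.whl').SerializeToString())

    # Content-addressed: (('sha256', 'abc123'), role_urn, role_payload)
    key = pipeline_utils._dep_key(dep)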
-def _base_env_key(env, include_deps=True):
-  return (
-      env.urn,
-      env.payload,
-      tuple(sorted(env.capabilities)),
-      tuple(sorted(env.resource_hints.items())),
-      tuple(sorted(_dep_key(dep)
-                   for dep in env.dependencies)) if include_deps else None)
-
-
-def _env_key(env):
-  return tuple(
-      sorted(
-          _base_env_key(e)
-          for e in environments.expand_anyof_environments(env)))
-
-
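_env_key then turns a whole Environment into a hashable, order-insensitive key, expanding an ANYOF environment into its alternatives first. For example, two environments that differ only in capability order map to the same key (illustrative, same module assumption as above):

    e1 = beam_runner_api_pb2.Environment(
        urn='beam:env:docker:v1', payload=b'img', capabilities=['b', 'a'])
    e2 = beam_runner_api_pb2.Environment(
        urn='beam:env:docker:v1', payload=b'img', capabilities=['a', 'b'])
    # Capabilities are sorted inside _base_env_key, so the keys coincide.
    assert pipeline_utils._env_key(e1) == pipeline_utils._env_key(e2)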
-def merge_common_environments(pipeline_proto, inplace=False):
-  canonical_environments = collections.defaultdict(list)
-  for env_id, env in pipeline_proto.components.environments.items():
-    canonical_environments[_env_key(env)].append(env_id)
-
-  if len(canonical_environments) == len(pipeline_proto.components.environments):
-    # All environments are already sufficiently distinct.
-    return pipeline_proto
-
-  environment_remappings = {
-      e: es[0]
-      for es in canonical_environments.values() for e in es
-  }
-
-  return update_environments(pipeline_proto, environment_remappings, inplace)
-
-
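Deduplication is then a matter of grouping environments by that key and pointing every duplicate at one canonical id. A minimal end-to-end sketch with a hand-built proto (ids illustrative):

    proto = beam_runner_api_pb2.Pipeline()
    for env_id in ('env_a', 'env_b'):  # structurally identical environments
      proto.components.environments[env_id].urn = 'beam:env:docker:v1'
      proto.components.environments[env_id].payload = b'img'

    merged = pipeline_utils.merge_common_environments(proto)
    # Both ids collapse onto the first one in the canonical group.
    assert len(merged.components.environments) == 1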
-def merge_superset_dep_environments(pipeline_proto):
-  """Merges all environments A and B where A and B are equivalent except that
-  A has a superset of the dependencies of B.
-  """
-  docker_envs = {}
-  for env_id, env in pipeline_proto.components.environments.items():
-    docker_env = environments.resolve_anyof_environment(
-        env, common_urns.environments.DOCKER.urn)
-    if docker_env.urn == common_urns.environments.DOCKER.urn:
-      docker_envs[env_id] = docker_env
-
-  has_base_and_dep = collections.defaultdict(set)
-  env_scores = {
-      env_id: (len(env.dependencies), env_id)
-      for (env_id, env) in docker_envs.items()
-  }
-
-  for env_id, env in docker_envs.items():
-    base_key = _base_env_key(env, include_deps=False)
-    has_base_and_dep[base_key, None].add(env_id)
-    for dep in env.dependencies:
-      for dep_key in _expanded_dep_keys(dep):
-        has_base_and_dep[base_key, dep_key].add(env_id)
-
-  environment_remappings = {}
-  for env_id, env in docker_envs.items():
-    base_key = _base_env_key(env, include_deps=False)
-    # This is the set of all environments that have at least all of env's deps.
-    candidates = set.intersection(
-        has_base_and_dep[base_key, None],
-        *[
-            has_base_and_dep[base_key, dep_key] for dep in env.dependencies
-            for dep_key in _expanded_dep_keys(dep)
-        ])
-    # Choose the maximal one.
-    best = max(candidates, key=env_scores.get)
-    if best != env_id:
-      environment_remappings[env_id] = best
-
-  return update_environments(pipeline_proto, environment_remappings)
-
-
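Here an environment is folded into another whenever the latter is identical apart from carrying extra dependencies, since a worker provisioned for the superset can serve both. A sketch, using a hypothetical file_dep helper (not part of the SDK) to build staged file dependencies:

    def file_dep(path, sha256, staged_name):
      # Hypothetical convenience wrapper for the protos used above.
      return beam_runner_api_pb2.ArtifactInformation(
          type_urn=common_urns.artifact_types.FILE.urn,
          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
              path=path, sha256=sha256).SerializeToString(),
          role_urn=common_urns.artifact_roles.STAGING_TO.urn,
          role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
              staged_name=staged_name).SerializeToString())

    docker = beam_runner_api_pb2.DockerPayload(
        container_image='img').SerializeToString()
    proto = beam_runner_api_pb2.Pipeline()
    proto.components.environments['small'].CopyFrom(
        beam_runner_api_pb2.Environment(
            urn=common_urns.environments.DOCKER.urn, payload=docker,
            dependencies=[file_dep('/tmp/a.whl', 'sha-a', 'a.whl')]))
    proto.components.environments['big'].CopyFrom(
        beam_runner_api_pb2.Environment(
            urn=common_urns.environments.DOCKER.urn, payload=docker,
            dependencies=[file_dep('/tmp/a.whl', 'sha-a', 'a.whl'),
                          file_dep('/tmp/b.whl', 'sha-b', 'b.whl')]))

    # 'small' has a subset of 'big's deps, so it is remapped and dropped.
    merged = pipeline_utils.merge_superset_dep_environments(proto)
    assert set(merged.components.environments) == {'big'}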
-def update_environments(pipeline_proto, environment_remappings, inplace=False):
-  if not environment_remappings:
-    return pipeline_proto
-
-  if not inplace:
-    pipeline_proto = copy.copy(pipeline_proto)
-
-  for t in pipeline_proto.components.transforms.values():
-    if t.environment_id not in pipeline_proto.components.environments:
-      # TODO(https://github.com/apache/beam/issues/30876): Remove this
-      # workaround.
-      continue
-    if t.environment_id and t.environment_id in environment_remappings:
-      t.environment_id = environment_remappings[t.environment_id]
-  for w in pipeline_proto.components.windowing_strategies.values():
-    if w.environment_id not in pipeline_proto.components.environments:
-      # TODO(https://github.com/apache/beam/issues/30876): Remove this
-      # workaround.
-      continue
-    if w.environment_id and w.environment_id in environment_remappings:
-      w.environment_id = environment_remappings[w.environment_id]
-  for e in set(environment_remappings.keys()) - set(
-      environment_remappings.values()):
-    del pipeline_proto.components.environments[e]
-  return pipeline_proto
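update_environments is the shared rewrite step underneath both merges: it repoints environment_id on transforms and windowing strategies according to the remapping, then deletes environments that are only ever mapped away from. Continuing the illustrative sketches above:

    proto = beam_runner_api_pb2.Pipeline()
    proto.components.environments['e1'].urn = 'beam:env:docker:v1'
    proto.components.environments['e2'].urn = 'beam:env:docker:v1'
    proto.components.transforms['t'].environment_id = 'e2'

    out = pipeline_utils.update_environments(proto, {'e2': 'e1'})
    assert out.components.transforms['t'].environment_id == 'e1'
    assert set(out.components.environments) == {'e1'}  # 'e2' pruned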
