|
22 | 22 |
|
23 | 23 | # pytype: skip-file |
24 | 24 |
|
25 | | -import collections |
26 | | -import copy |
27 | 25 | import logging |
28 | 26 | import sys |
29 | 27 | import threading |
|
42 | 40 | from apache_beam.coders import coders |
43 | 41 | from apache_beam.internal import util |
44 | 42 | from apache_beam.options.value_provider import RuntimeValueProvider |
45 | | -from apache_beam.portability import common_urns |
46 | | -from apache_beam.portability.api import beam_runner_api_pb2 |
47 | 43 | from apache_beam.pvalue import TaggedOutput |
48 | 44 | from apache_beam.runners.sdf_utils import NoOpWatermarkEstimatorProvider |
49 | 45 | from apache_beam.runners.sdf_utils import RestrictionTrackerView |
|
53 | 49 | from apache_beam.runners.sdf_utils import ThreadsafeWatermarkEstimator |
54 | 50 | from apache_beam.transforms import DoFn |
55 | 51 | from apache_beam.transforms import core |
56 | | -from apache_beam.transforms import environments |
57 | 52 | from apache_beam.transforms import userstate |
58 | 53 | from apache_beam.transforms.core import RestrictionProvider |
59 | 54 | from apache_beam.transforms.core import WatermarkEstimatorProvider |
60 | 55 | from apache_beam.transforms.window import GlobalWindow |
61 | 56 | from apache_beam.transforms.window import GlobalWindows |
62 | 57 | from apache_beam.transforms.window import TimestampedValue |
63 | 58 | from apache_beam.transforms.window import WindowFn |
64 | | -from apache_beam.typehints import typehints |
65 | 59 | from apache_beam.typehints.batch import BatchConverter |
66 | 60 | from apache_beam.utils.counters import Counter |
67 | 61 | from apache_beam.utils.counters import CounterName |
@@ -1920,230 +1914,3 @@ def windows(self): |
1920 | 1914 | raise AttributeError('windows not accessible in this context') |
1921 | 1915 | else: |
1922 | 1916 | return self.windowed_value.windows |
1923 | | - |
1924 | | - |
def group_by_key_input_visitor(deterministic_key_coders=True):
  """Creates a pipeline visitor that pins KV element types around GroupByKeys.

  Args:
    deterministic_key_coders: stored on the visitor. When truthy, the
      PCollections around each GroupByKey get their
      ``requires_deterministic_key_coder`` attribute set to the transform's
      full label; otherwise the attribute is set to this (falsy) value.

  Returns:
    A new ``GroupByKeyInputVisitor`` instance.
  """
  # These imports are deferred to call time to avoid a circular dependency.
  # pylint: disable=wrong-import-order, wrong-import-position
  from apache_beam.pipeline import PipelineVisitor
  from apache_beam.transforms.core import GroupByKey

  class GroupByKeyInputVisitor(PipelineVisitor):
    """Visitor that coerces the input `PCollection` element type of every
    `GroupByKey` to a `KV` type and types its first output accordingly.

    TODO(BEAM-115): Once Python SDK is compatible with the new Runner API,
    we could directly replace the coder instead of mutating the element type.
    """
    def __init__(self, deterministic_key_coders=True):
      self.deterministic_key_coders = deterministic_key_coders

    def enter_composite_transform(self, transform_node):
      # Composite transforms get exactly the same treatment as leaf ones.
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      if not isinstance(transform_node.transform, GroupByKey):
        return

      input_pcoll = transform_node.inputs[0]
      label = transform_node.full_label
      input_pcoll.element_type = typehints.coerce_to_kv_type(
          input_pcoll.element_type, label)
      # Either the falsy flag itself or the label of the requiring transform.
      input_pcoll.requires_deterministic_key_coder = (
          self.deterministic_key_coders and label)
      key_type, value_type = input_pcoll.element_type.tuple_types

      if not transform_node.outputs:
        return

      # Only the first output is re-typed, as KV[key, Iterable[value]].
      first_tag = next(iter(transform_node.outputs.keys()))
      output_pcoll = transform_node.outputs[first_tag]
      output_pcoll.element_type = typehints.KV[
          key_type, typehints.Iterable[value_type]]
      output_pcoll.requires_deterministic_key_coder = (
          self.deterministic_key_coders and label)

  return GroupByKeyInputVisitor(deterministic_key_coders)
1960 | | - |
1961 | | - |
def validate_pipeline_graph(pipeline_proto):
  """Ensures this is a correctly constructed Beam pipeline.

  Starting from ``pipeline_proto.root_transform_ids``, every transform and its
  subtransforms are visited recursively and checked:

  * GroupByKey transforms must have exactly one input and one output, both
    with KV coders; the output key coder must be the input key coder and the
    output value coder must be an iterable of the input value coder.
  * AssignWindows and ParDo transforms must have at least one input.

  Args:
    pipeline_proto: the pipeline proto to validate; it is only read.

  Raises:
    ValueError: if any of the checks above fails.
  """
  components = pipeline_proto.components

  def get_coder(pcoll_id):
    # Resolve a PCollection id to the coder proto it references.
    return components.coders[components.pcollections[pcoll_id].coder_id]

  def validate_transform(transform_id):
    transform_proto = components.transforms[transform_id]
    urn = transform_proto.spec.urn

    # Currently the main validation we perform is that GBK operations have
    # their coders set properly.
    if urn == common_urns.primitives.GROUP_BY_KEY.urn:
      if len(transform_proto.inputs) != 1:
        raise ValueError("Unexpected number of inputs: %s" % transform_proto)
      if len(transform_proto.outputs) != 1:
        raise ValueError("Unexpected number of outputs: %s" % transform_proto)
      input_coder = get_coder(next(iter(transform_proto.inputs.values())))
      output_coder = get_coder(next(iter(transform_proto.outputs.values())))
      if input_coder.spec.urn != common_urns.coders.KV.urn:
        raise ValueError(
            "Bad coder for input of %s: %s" % (transform_id, input_coder))
      if output_coder.spec.urn != common_urns.coders.KV.urn:
        raise ValueError(
            "Bad coder for output of %s: %s" % (transform_id, output_coder))
      output_values_coder = components.coders[
          output_coder.component_coder_ids[1]]
      if (input_coder.component_coder_ids[0] !=
          output_coder.component_coder_ids[0] or
          output_values_coder.spec.urn != common_urns.coders.ITERABLE.urn or
          output_values_coder.component_coder_ids[0] !=
          input_coder.component_coder_ids[1]):
        # The arguments must follow the order of the placeholders: input
        # coder, output coder, then the transform id.
        raise ValueError(
            "Incompatible input coder %s and output coder %s for transform %s" %
            (input_coder, output_coder, transform_id))
    elif urn in (common_urns.primitives.ASSIGN_WINDOWS.urn,
                 common_urns.primitives.PAR_DO.urn):
      if not transform_proto.inputs:
        raise ValueError("Missing input for transform: %s" % transform_proto)

    for t in transform_proto.subtransforms:
      validate_transform(t)

  for t in pipeline_proto.root_transform_ids:
    validate_transform(t)
2009 | | - |
2010 | | - |
def _dep_key(dep):
  """Returns a hashable key identifying an artifact dependency.

  The key is ``(type_info, role_urn, role_payload)``. For FILE and URL
  artifacts ``type_info`` is ``('sha256', <hash>)`` when the payload carries a
  hash, and otherwise ``('path', <path>)`` or ``('url', <url>)``. For any other
  artifact type it is the raw ``(type_urn, type_payload)`` pair.
  """
  artifact_types = common_urns.artifact_types
  type_urn = dep.type_urn

  if type_urn == artifact_types.FILE.urn:
    file_payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
        dep.type_payload)
    # Prefer the content hash over the location when one is available.
    identity = (('sha256', file_payload.sha256) if file_payload.sha256 else
                ('path', file_payload.path))
  elif type_urn == artifact_types.URL.urn:
    url_payload = beam_runner_api_pb2.ArtifactUrlPayload.FromString(
        dep.type_payload)
    identity = (('sha256', url_payload.sha256) if url_payload.sha256 else
                ('url', url_payload.url))
  else:
    identity = (type_urn, dep.type_payload)

  return identity, dep.role_urn, dep.role_payload
2029 | | - |
2030 | | - |
def _expanded_dep_keys(dep):
  """Yields the key(s) that a single dependency contributes.

  FILE artifacts with a STAGING_TO role get special handling based on their
  staged name:

  * ``submission_environment_dependencies.txt`` yields nothing.
  * ``requirements.txt`` yields one ``('requirements.txt', <stripped line>)``
    entry per line of the file at the payload's path.

  Every other dependency yields exactly ``_dep_key(dep)``.
  """
  staged_file = (
      dep.type_urn == common_urns.artifact_types.FILE.urn and
      dep.role_urn == common_urns.artifact_roles.STAGING_TO.urn)
  if not staged_file:
    yield _dep_key(dep)
    return

  file_payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
      dep.type_payload)
  staging_role = beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
      dep.role_payload)
  staged_name = staging_role.staged_name

  if staged_name == 'submission_environment_dependencies.txt':
    # This artifact contributes no keys at all.
    return

  if staged_name == 'requirements.txt':
    # Expand the file into one key per line.
    with open(file_payload.path) as requirements_file:
      for raw_line in requirements_file:
        yield 'requirements.txt', raw_line.strip()
    return

  yield _dep_key(dep)
2047 | | - |
2048 | | - |
def _base_env_key(env, include_deps=True):
  """Returns a hashable tuple describing an environment.

  The tuple holds the urn, the payload, the sorted capabilities, the sorted
  resource hint items and, last, either the sorted dependency keys or ``None``
  when ``include_deps`` is false.
  """
  header = (
      env.urn,
      env.payload,
      tuple(sorted(env.capabilities)),
      tuple(sorted(env.resource_hints.items())))
  if not include_deps:
    return header + (None, )

  dependency_keys = [_dep_key(dep) for dep in env.dependencies]
  dependency_keys.sort()
  return header + (tuple(dependency_keys), )
2057 | | - |
2058 | | - |
def _env_key(env):
  """Returns the sorted tuple of base keys (dependencies included) of every
  environment produced by ``environments.expand_anyof_environments(env)``.
  """
  alternative_keys = [
      _base_env_key(alternative)
      for alternative in environments.expand_anyof_environments(env)
  ]
  alternative_keys.sort()
  return tuple(alternative_keys)
2064 | | - |
2065 | | - |
def merge_common_environments(pipeline_proto, inplace=False):
  """Collapses environments that share the same ``_env_key``.

  Environments are grouped by key, and every member of a group is remapped to
  the first environment id seen for that group. If every environment already
  has a distinct key, the pipeline is returned untouched.

  Args:
    pipeline_proto: the pipeline whose environments should be merged.
    inplace: forwarded to ``update_environments``.

  Returns:
    ``pipeline_proto`` itself when nothing needs merging, otherwise the
    result of ``update_environments``.
  """
  all_environments = pipeline_proto.components.environments

  ids_by_key = collections.defaultdict(list)
  for env_id, env in all_environments.items():
    ids_by_key[_env_key(env)].append(env_id)

  if len(ids_by_key) == len(all_environments):
    # All environments are already sufficiently distinct.
    return pipeline_proto

  environment_remappings = {}
  for equivalent_ids in ids_by_key.values():
    representative = equivalent_ids[0]
    for env_id in equivalent_ids:
      environment_remappings[env_id] = representative

  return update_environments(pipeline_proto, environment_remappings, inplace)
2081 | | - |
2082 | | - |
def merge_superset_dep_environments(pipeline_proto):
  """Merges all environments A and B where A and B are equivalent except that
  A has a superset of the dependencies of B.

  Only environments that resolve to a DOCKER environment are considered. Each
  one is remapped to the best-scoring environment that shares its base key
  (computed without dependencies) and has at least all of its expanded
  dependency keys. The score is ``(number of dependencies, env_id)``.

  Returns:
    The result of ``update_environments`` for the computed remappings.
  """
  docker_urn = common_urns.environments.DOCKER.urn
  docker_envs = {}
  for env_id, env in pipeline_proto.components.environments.items():
    resolved = environments.resolve_anyof_environment(env, docker_urn)
    if resolved.urn == common_urns.environments.DOCKER.urn:
      docker_envs[env_id] = resolved

  env_scores = {}
  for env_id, env in docker_envs.items():
    env_scores[env_id] = (len(env.dependencies), env_id)

  # Maps (base key, dep key) to the ids of all environments having that dep.
  # The (base key, None) entry holds every environment with that base key.
  has_base_and_dep = collections.defaultdict(set)
  for env_id, env in docker_envs.items():
    base_key = _base_env_key(env, include_deps=False)
    has_base_and_dep[base_key, None].add(env_id)
    for dep in env.dependencies:
      for dep_key in _expanded_dep_keys(dep):
        has_base_and_dep[base_key, dep_key].add(env_id)

  environment_remappings = {}
  for env_id, env in docker_envs.items():
    base_key = _base_env_key(env, include_deps=False)
    required_sets = []
    for dep in env.dependencies:
      for dep_key in _expanded_dep_keys(dep):
        required_sets.append(has_base_and_dep[base_key, dep_key])
    # This is the set of all environments that have at least all of env's deps.
    candidates = set(has_base_and_dep[base_key, None])
    for holders in required_sets:
      candidates &= holders
    # Choose the maximal one.
    best = max(candidates, key=env_scores.get)
    if best != env_id:
      environment_remappings[env_id] = best

  return update_environments(pipeline_proto, environment_remappings)
2123 | | - |
2124 | | - |
def update_environments(pipeline_proto, environment_remappings, inplace=False):
  """Rewrites environment references according to ``environment_remappings``.

  Args:
    pipeline_proto: the pipeline to update.
    environment_remappings: mapping from an environment id to the id that
      should replace it. An empty mapping leaves the pipeline as is.
    inplace: when false, the update is applied to ``copy.copy(pipeline_proto)``
      rather than to the argument itself.

  Returns:
    The updated pipeline. Transforms and windowing strategies whose
    environment id is remapped point at the replacement, and every
    environment that appears only as a remapping source is deleted.
  """
  if not environment_remappings:
    return pipeline_proto

  if not inplace:
    pipeline_proto = copy.copy(pipeline_proto)

  components = pipeline_proto.components
  known_environments = components.environments

  for referrers in (components.transforms, components.windowing_strategies):
    for referrer in referrers.values():
      current_id = referrer.environment_id
      if current_id not in known_environments:
        # TODO(https://github.com/apache/beam/issues/30876): Remove this
        # workaround.
        continue
      if current_id and current_id in environment_remappings:
        referrer.environment_id = environment_remappings[current_id]

  # Ids that are remapped away and are never themselves a remapping target.
  obsolete_ids = set(environment_remappings) - set(
      environment_remappings.values())
  for env_id in obsolete_ids:
    del known_environments[env_id]
  return pipeline_proto
0 commit comments