Commit 7e7666e
More eagerly merge compatible environments.
Specifically, merge environments that are identical except for their dependencies, where one environment's dependencies are a superset of the other's. This reduces the number of distinct environments of the same type, and the associated materialization costs of moving between them. Ideally this would be done in a runner for environments that are already co-located on the same worker, but that is a further optimization.
1 parent 781917a · commit 7e7666e
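To illustrate the rule this commit applies (a minimal sketch on toy data, using plain dicts in place of Beam's Environment protos; the names and helper below are hypothetical, not Beam's API):

# Each environment has a 'base' (image, capabilities, hints) and a dep set.
envs = {
    'Ax': {'base': 'A', 'deps': {'x'}},
    'Ay': {'base': 'A', 'deps': {'y'}},
    'Axy': {'base': 'A', 'deps': {'x', 'y'}},
    'Byz': {'base': 'B', 'deps': {'y', 'z'}},
}

def merge_target(env_id):
  # An environment may be replaced by any environment with the same base
  # whose dependencies contain all of its own; prefer the largest such set.
  candidates = [
      other_id for other_id, other in envs.items()
      if other['base'] == envs[env_id]['base'] and
      other['deps'] >= envs[env_id]['deps']
  ]
  return max(candidates, key=lambda i: (len(envs[i]['deps']), i))

print({e: merge_target(e) for e in envs})
# {'Ax': 'Axy', 'Ay': 'Axy', 'Axy': 'Axy', 'Byz': 'Byz'}

Here 'Ax' and 'Ay' both fold into 'Axy', while 'Byz' carries a dependency ('z') that no other B environment has, so it stays distinct.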

4 files changed: +224 −38 lines

sdks/python/apache_beam/runners/common.py

Lines changed: 107 additions & 36 deletions
@@ -2008,43 +2008,65 @@ def validate_transform(transform_id):
     validate_transform(t)
 
 
-def merge_common_environments(pipeline_proto, inplace=False):
-  def dep_key(dep):
-    if dep.type_urn == common_urns.artifact_types.FILE.urn:
-      payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
-          dep.type_payload)
-      if payload.sha256:
-        type_info = 'sha256', payload.sha256
-      else:
-        type_info = 'path', payload.path
-    elif dep.type_urn == common_urns.artifact_types.URL.urn:
-      payload = beam_runner_api_pb2.ArtifactUrlPayload.FromString(
-          dep.type_payload)
-      if payload.sha256:
-        type_info = 'sha256', payload.sha256
-      else:
-        type_info = 'url', payload.url
+def _dep_key(dep):
+  if dep.type_urn == common_urns.artifact_types.FILE.urn:
+    payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
+        dep.type_payload)
+    if payload.sha256:
+      type_info = 'sha256', payload.sha256
     else:
-      type_info = dep.type_urn, dep.type_payload
-    return type_info, dep.role_urn, dep.role_payload
-
-  def base_env_key(env):
-    return (
-        env.urn,
-        env.payload,
-        tuple(sorted(env.capabilities)),
-        tuple(sorted(env.resource_hints.items())),
-        tuple(sorted(dep_key(dep) for dep in env.dependencies)))
-
-  def env_key(env):
-    return tuple(
-        sorted(
-            base_env_key(e)
-            for e in environments.expand_anyof_environments(env)))
+      type_info = 'path', payload.path
+  elif dep.type_urn == common_urns.artifact_types.URL.urn:
+    payload = beam_runner_api_pb2.ArtifactUrlPayload.FromString(
+        dep.type_payload)
+    if payload.sha256:
+      type_info = 'sha256', payload.sha256
+    else:
+      type_info = 'url', payload.url
+  else:
+    type_info = dep.type_urn, dep.type_payload
+  return type_info, dep.role_urn, dep.role_payload
+
+
+def _expanded_dep_keys(dep):
+  if (dep.type_urn == common_urns.artifact_types.FILE.urn and
+      dep.role_urn == common_urns.artifact_roles.STAGING_TO.urn):
+    payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
+        dep.type_payload)
+    role = beam_runner_api_pb2.ArtifactStagingToRolePayload.FromString(
+        dep.role_payload)
+    if role.staged_name == 'submission_environment_dependencies.txt':
+      return
+    elif role.staged_name == 'requirements.txt':
+      with open(payload.path) as fin:
+        for line in fin:
+          yield 'requirements.txt', line.strip()
+      return
+
+  yield _dep_key(dep)
+
+
+def _base_env_key(env, include_deps=True):
+  return (
+      env.urn,
+      env.payload,
+      tuple(sorted(env.capabilities)),
+      tuple(sorted(env.resource_hints.items())),
+      tuple(sorted(_dep_key(dep)
+                   for dep in env.dependencies)) if include_deps else None)
+
 
+def _env_key(env):
+  return tuple(
+      sorted(
+          _base_env_key(e)
+          for e in environments.expand_anyof_environments(env)))
+
+
+def merge_common_environments(pipeline_proto, inplace=False):
   canonical_environments = collections.defaultdict(list)
   for env_id, env in pipeline_proto.components.environments.items():
-    canonical_environments[env_key(env)].append(env_id)
+    canonical_environments[_env_key(env)].append(env_id)
 
   if len(canonical_environments) == len(pipeline_proto.components.environments):
     # All environments are already sufficiently distinct.
@@ -2055,6 +2077,55 @@ def env_key(env):
       for es in canonical_environments.values() for e in es
   }
 
+  return update_environments(pipeline_proto, environment_remappings, inplace)
+
+
+def merge_superset_dep_environments(pipeline_proto):
+  """Merges all environments A and B where A and B are equivalent except that
+  A has a superset of the dependencies of B.
+  """
+  docker_envs = {}
+  for env_id, env in pipeline_proto.components.environments.items():
+    docker_env = environments.resolve_anyof_environment(
+        env, common_urns.environments.DOCKER.urn)
+    if docker_env.urn == common_urns.environments.DOCKER.urn:
+      docker_envs[env_id] = docker_env
+
+  has_base_and_dep = collections.defaultdict(set)
+  env_scores = {
+      env_id: (len(env.dependencies), env_id)
+      for (env_id, env) in docker_envs.items()
+  }
+
+  for env_id, env in docker_envs.items():
+    base_key = _base_env_key(env, include_deps=False)
+    has_base_and_dep[base_key, None].add(env_id)
+    for dep in env.dependencies:
+      for dep_key in _expanded_dep_keys(dep):
+        has_base_and_dep[base_key, dep_key].add(env_id)
+
+  environment_remappings = {}
+  for env_id, env in docker_envs.items():
+    base_key = _base_env_key(env, include_deps=False)
+    # This is the set of all environments that have at least all of env's deps.
+    candidates = set.intersection(
+        has_base_and_dep[base_key, None],
+        *[
+            has_base_and_dep[base_key, dep_key] for dep in env.dependencies
+            for dep_key in _expanded_dep_keys(dep)
+        ])
+    # Choose the maximal one.
+    best = max(candidates, key=env_scores.get)
+    if best != env_id:
+      environment_remappings[env_id] = best
+
+  return update_environments(pipeline_proto, environment_remappings)
+
+
+def update_environments(pipeline_proto, environment_remappings, inplace=False):
+  if not environment_remappings:
+    return pipeline_proto
+
   if not inplace:
     pipeline_proto = copy.copy(pipeline_proto)
 
@@ -2063,16 +2134,16 @@ def env_key(env):
       # TODO(https://github.com/apache/beam/issues/30876): Remove this
       # workaround.
       continue
-    if t.environment_id:
+    if t.environment_id and t.environment_id in environment_remappings:
       t.environment_id = environment_remappings[t.environment_id]
   for w in pipeline_proto.components.windowing_strategies.values():
     if w.environment_id not in pipeline_proto.components.environments:
       # TODO(https://github.com/apache/beam/issues/30876): Remove this
       # workaround.
       continue
-    if w.environment_id:
+    if w.environment_id and w.environment_id in environment_remappings:
       w.environment_id = environment_remappings[w.environment_id]
-  for e in set(pipeline_proto.components.environments.keys()) - set(
+  for e in set(environment_remappings.keys()) - set(
       environment_remappings.values()):
     del pipeline_proto.components.environments[e]
   return pipeline_proto
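For reference, a sketch of how a runner composes the two passes (assuming a pipeline_proto obtained elsewhere; this mirrors the runner changes below):

from apache_beam.runners.common import merge_common_environments
from apache_beam.runners.common import merge_superset_dep_environments

# Superset merging first redirects each environment to a maximal compatible
# one; common merging then deduplicates any environments left identical.
pipeline_proto = merge_common_environments(
    merge_superset_dep_environments(pipeline_proto))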

sdks/python/apache_beam/runners/common_test.py

Lines changed: 111 additions & 0 deletions
@@ -17,6 +17,10 @@
 
 # pytype: skip-file
 
+import hashlib
+import os
+import random
+import tempfile
 import unittest
 
 import hamcrest as hc
@@ -26,10 +30,12 @@
 from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
 from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
 from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.common import DoFnSignature
 from apache_beam.runners.common import PerWindowInvoker
 from apache_beam.runners.common import merge_common_environments
+from apache_beam.runners.common import merge_superset_dep_environments
 from apache_beam.runners.portability.expansion_service_test import FibTransform
 from apache_beam.runners.sdf_utils import SplitResultPrimary
 from apache_beam.runners.sdf_utils import SplitResultResidual
@@ -632,6 +638,111 @@ def test_equal_environments_merged(self):
             pipeline_proto.components.windowing_strategies.values())),
         1)
 
+  def _make_dep(self, path):
+    hasher = hashlib.sha256()
+    if os.path.exists(path):
+      with open(path, 'rb') as fin:
+        hasher.update(fin.read())
+    else:
+      # A fake file, identified only by its path.
+      hasher.update(path.encode('utf-8'))
+    return beam_runner_api_pb2.ArtifactInformation(
+        type_urn=common_urns.artifact_types.FILE.urn,
+        type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+            path=path, sha256=hasher.hexdigest()).SerializeToString(),
+        role_urn=common_urns.artifact_roles.STAGING_TO.urn,
+        role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
+            staged_name=os.path.basename(path)).SerializeToString())
+
+  def _docker_env(self, id, deps=()):
+    return beam_runner_api_pb2.Environment(
+        urn=common_urns.environments.DOCKER.urn,
+        payload=id.encode('utf8'),
+        dependencies=[self._make_dep(path) for path in deps],
+    )
+
+  def test_subset_deps_environments_merged(self):
+    environments = {
+        'A': self._docker_env('A'),
+        'Ax': self._docker_env('A', ['x']),
+        'Ay': self._docker_env('A', ['y']),
+        'Axy': self._docker_env('A', ['x', 'y']),
+        'Bx': self._docker_env('B', ['x']),
+        'Bxy': self._docker_env('B', ['x', 'y']),
+        'Byz': self._docker_env('B', ['y', 'z']),
+    }
+    transforms = {
+        env_id: beam_runner_api_pb2.PTransform(
+            unique_name=env_id, environment_id=env_id)
+        for env_id in environments.keys()
+    }
+    pipeline_proto = merge_superset_dep_environments(
+        beam_runner_api_pb2.Pipeline(
+            components=beam_runner_api_pb2.Components(
+                environments=environments, transforms=transforms)))
+
+    # These can all be merged into the same environment.
+    self.assertEqual(
+        pipeline_proto.components.transforms['A'].environment_id, 'Axy')
+    self.assertEqual(
+        pipeline_proto.components.transforms['Ax'].environment_id, 'Axy')
+    self.assertEqual(
+        pipeline_proto.components.transforms['Ay'].environment_id, 'Axy')
+    self.assertEqual(
+        pipeline_proto.components.transforms['Axy'].environment_id, 'Axy')
+    # Same dependencies, but a different base, so these merge only together.
+    self.assertEqual(
+        pipeline_proto.components.transforms['Bx'].environment_id, 'Bxy')
+    self.assertEqual(
+        pipeline_proto.components.transforms['Bxy'].environment_id, 'Bxy')
+    # Not a subset of any other environment, so it is left alone.
+    self.assertEqual(
+        pipeline_proto.components.transforms['Byz'].environment_id, 'Byz')
+
+  def test_subset_deps_environments_merged_with_requirements_txt(self):
+    with tempfile.TemporaryDirectory() as tmpdir:
+
+      def make_file(basename, content):
+        subdir = tempfile.TemporaryDirectory(dir=tmpdir, delete=False).name
+        path = os.path.join(subdir, basename)
+        with open(path, 'w') as fout:
+          fout.write(content)
+        return path
+
+      def make_py_deps(*pkgs):
+        return [
+            make_file('requirements.txt', '\n'.join(pkgs)),
+            make_file(
+                'submission_environment_dependencies.txt', str(
+                    random.random())),
+        ] + [make_file(pkg, pkg) for pkg in pkgs]
+
+      environments = {
+          'A': self._docker_env('A'),
+          'Ax': self._docker_env('A', make_py_deps('x')),
+          'Ay': self._docker_env('A', make_py_deps('y')),
+          'Axy': self._docker_env('A', make_py_deps('x', 'y')),
+      }
+      transforms = {
+          env_id: beam_runner_api_pb2.PTransform(
+              unique_name=env_id, environment_id=env_id)
+          for env_id in environments.keys()
+      }
+      pipeline_proto = merge_superset_dep_environments(
+          beam_runner_api_pb2.Pipeline(
+              components=beam_runner_api_pb2.Components(
+                  environments=environments, transforms=transforms)))
+
+      # These can all be merged into the same environment.
+      self.assertEqual(
+          pipeline_proto.components.transforms['A'].environment_id, 'Axy')
+      self.assertEqual(
+          pipeline_proto.components.transforms['Ax'].environment_id, 'Axy')
+      self.assertEqual(
+          pipeline_proto.components.transforms['Ay'].environment_id, 'Axy')
+      self.assertEqual(
+          pipeline_proto.components.transforms['Axy'].environment_id, 'Axy')
+
   def test_external_merged(self):
     p = beam.Pipeline()
     # This transform recursively creates several external environments.

sdks/python/apache_beam/runners/dataflow/dataflow_runner.py

Lines changed: 3 additions & 1 deletion
@@ -45,6 +45,7 @@
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.common import group_by_key_input_visitor
 from apache_beam.runners.common import merge_common_environments
+from apache_beam.runners.common import merge_superset_dep_environments
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
@@ -434,7 +435,8 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None):
         self.proto_pipeline.components.environments[env_id].CopyFrom(
             environments.resolve_anyof_environment(
                 env, common_urns.environments.DOCKER.urn))
-    self.proto_pipeline = merge_common_environments(self.proto_pipeline)
+    self.proto_pipeline = merge_common_environments(
+        merge_superset_dep_environments(self.proto_pipeline))
 
     # Optimize the pipeline if it is not streaming and the pre_optimize
     # experiment is set.

sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py

Lines changed: 3 additions & 1 deletion
@@ -64,6 +64,7 @@
 from apache_beam.runners import runner
 from apache_beam.runners.common import group_by_key_input_visitor
 from apache_beam.runners.common import merge_common_environments
+from apache_beam.runners.common import merge_superset_dep_environments
 from apache_beam.runners.common import validate_pipeline_graph
 from apache_beam.runners.portability import portable_metrics
 from apache_beam.runners.portability.fn_api_runner import execution
@@ -216,7 +217,8 @@ def run_via_runner_api(
     if direct_options.direct_embed_docker_python:
       pipeline_proto = self.embed_default_docker_image(pipeline_proto)
     pipeline_proto = merge_common_environments(
-        self.resolve_any_environments(pipeline_proto))
+        self.resolve_any_environments(
+            merge_superset_dep_environments(pipeline_proto)))
     stage_context, stages = self.create_stages(pipeline_proto)
     return self.run_stages(stage_context, stages)
