Skip to content

Commit 1fe7ee9

Browse files
authored
Merge pull request #34595 Properly propagate schemas of Beam YAML Partition transform.
2 parents 7373972 + f910184 commit 1fe7ee9

File tree

2 files changed

+4
-0
lines changed

2 files changed

+4
-0
lines changed

Diff for: sdks/python/apache_beam/yaml/yaml_mapping.py

+3
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,9 @@ def split(element):
816816
mapping_transform = mapping_transform.with_outputs(*output_set)
817817
splits = pcoll | mapping_transform.with_input_types(T).with_output_types(T)
818818
result = {out: getattr(splits, out) for out in output_set}
819+
for tag, out in result.items():
820+
if tag != error_output:
821+
out.element_type = pcoll.element_type
819822
if error_output:
820823
result[error_output] = result[error_output] | map_errors_to_standard_format(
821824
pcoll.element_type)

Diff for: sdks/python/apache_beam/yaml/yaml_mapping_test.py

+1
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ def test_partition(self):
212212
language: python
213213
outputs: [even, odd]
214214
''')
215+
self.assertEqual(result['even'].element_type, elements.element_type)
215216
assert_that(
216217
result['even'] | beam.Map(lambda x: x.element),
217218
equal_to(['banana', 'orange']),

0 commit comments

Comments
 (0)