Skip to content

Commit 19f9eb0

Browse files
authored
[yaml] - add multi-line windowing json support (#37174)
* add windowing json support * add a few tests * remove code change and just have tests * remove extra line * add multiline windowing config support back * remove extra line
1 parent 44ed498 commit 19f9eb0

File tree

2 files changed

+70
-0
lines changed

2 files changed

+70
-0
lines changed

sdks/python/apache_beam/yaml/yaml_transform.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,6 +1033,21 @@ def preprocess_windowing(spec):
10331033
if 'windowing' in spec:
10341034
spec['config'] = spec.get('config', {})
10351035
spec['config']['windowing'] = spec.pop('windowing')
1036+
1037+
if spec.get('config', {}).get('windowing'):
1038+
windowing_config = spec['config']['windowing']
1039+
if isinstance(windowing_config, str):
1040+
try:
1041+
# PyYAML can load a JSON string - one-line and multi-line.
1042+
# Without this code, multi-line is not supported.
1043+
parsed_config = yaml.safe_load(windowing_config)
1044+
if not isinstance(parsed_config, dict):
1045+
raise TypeError('Windowing config string must be a YAML/JSON map.')
1046+
spec['config']['windowing'] = parsed_config
1047+
except Exception as e:
1048+
raise ValueError(
1049+
f'Error parsing windowing config string at \
1050+
{identify_object(spec)}: {e}') from e
10361051
return spec
10371052
elif 'windowing' not in spec:
10381053
# Nothing to do.

sdks/python/apache_beam/yaml/yaml_transform_test.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,6 +993,61 @@ def test_explicit_window_into(self):
993993
providers=TEST_PROVIDERS)
994994
assert_that(result, equal_to([6, 9]))
995995

996+
def test_explicit_window_into_with_json_string_config_one_line(self):
997+
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
998+
pickle_library='cloudpickle')) as p:
999+
result = p | YamlTransform(
1000+
'''
1001+
type: chain
1002+
transforms:
1003+
- type: CreateTimestamped
1004+
config:
1005+
elements: [0, 1, 2, 3, 4, 5]
1006+
- type: WindowInto
1007+
config:
1008+
windowing: {"type": "fixed", "size": "4s"}
1009+
- type: SumGlobally
1010+
''',
1011+
providers=TEST_PROVIDERS)
1012+
assert_that(result, equal_to([6, 9]))
1013+
1014+
def test_explicit_window_into_with_json_string_config_multi_line(self):
1015+
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
1016+
pickle_library='cloudpickle')) as p:
1017+
result = p | YamlTransform(
1018+
'''
1019+
type: chain
1020+
transforms:
1021+
- type: CreateTimestamped
1022+
config:
1023+
elements: [0, 1, 2, 3, 4, 5]
1024+
- type: WindowInto
1025+
config:
1026+
windowing: |
1027+
{"type": "fixed", "size": "4s"}
1028+
- type: SumGlobally
1029+
''',
1030+
providers=TEST_PROVIDERS)
1031+
assert_that(result, equal_to([6, 9]))
1032+
1033+
def test_explicit_window_into_with_string_config_fails(self):
1034+
with self.assertRaisesRegex(ValueError, 'Error parsing windowing config'):
1035+
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
1036+
pickle_library='cloudpickle')) as p:
1037+
_ = p | YamlTransform(
1038+
'''
1039+
type: chain
1040+
transforms:
1041+
- type: CreateTimestamped
1042+
config:
1043+
elements: [0, 1, 2, 3, 4, 5]
1044+
- type: WindowInto
1045+
config:
1046+
windowing: |
1047+
'not a valid yaml'
1048+
''',
1049+
providers=TEST_PROVIDERS)
1050+
9961051
def test_windowing_on_input(self):
9971052
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
9981053
pickle_library='cloudpickle')) as p:

0 commit comments

Comments
 (0)