Skip to content

Commit 4e6cbfc

Browse files
authored
Prism windowed value coder (#34830)
* Support windowed value coder in prism. * Add a new test case that previously failed to run in prism. * Fix lints and trigger some validatesrunner tests. * Disable the newly added test for samza runner.
1 parent 1763e7b commit 4e6cbfc

File tree

6 files changed

+31
-1
lines changed

6 files changed

+31
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
{
2-
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support"
2+
"https://github.com/apache/beam/pull/32648": "testing addition of Flink 1.19 support",
3+
"https://github.com/apache/beam/pull/34830": "testing"
34
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"https://github.com/apache/beam/pull/34830": "testing"
3+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"https://github.com/apache/beam/pull/34830": "testing"
3+
}

sdks/go/pkg/beam/runners/prism/internal/coders.go

+11
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,17 @@ func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(i
331331
kd(r)
332332
vd(r)
333333
}
334+
case urns.CoderWindowedValue:
335+
ccids := c.GetComponentCoderIds()
336+
if len(ccids) != 2 {
337+
panic(fmt.Sprintf("WindowedValue coder with more than 2 components: %s", prototext.Format(c)))
338+
}
339+
ed := pullDecoderNoAlloc(coders[ccids[0]], coders)
340+
wd := pullDecoderNoAlloc(coders[ccids[1]], coders)
341+
return func(r io.Reader) {
342+
ed(r)
343+
wd(r)
344+
}
334345
case urns.CoderRow:
335346
panic(fmt.Sprintf("Runner forgot to LP this Row Coder. %v", prototext.Format(c)))
336347
default:

sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py

+9
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,15 @@ def test_reshuffle(self):
10711071
assert_that(
10721072
p | beam.Create([1, 2, 3]) | beam.Reshuffle(), equal_to([1, 2, 3]))
10731073

1074+
def test_reshuffle_after_custom_window(self):
1075+
with self.create_pipeline() as p:
1076+
assert_that(
1077+
p | beam.Create([12, 2, 1])
1078+
| beam.Map(lambda t: window.TimestampedValue(t, t))
1079+
| beam.WindowInto(beam.transforms.window.FixedWindows(2))
1080+
| beam.Reshuffle(),
1081+
equal_to([12, 2, 1]))
1082+
10741083
def test_flatten(self, with_transcoding=True):
10751084
with self.create_pipeline() as p:
10761085
if with_transcoding:

sdks/python/apache_beam/runners/portability/samza_runner_test.py

+3
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,9 @@ def test_custom_merging_window(self):
186186
def test_custom_window_type(self):
187187
raise unittest.SkipTest("https://github.com/apache/beam/issues/21049")
188188

189+
def test_reshuffle_after_custom_window(self):
190+
raise unittest.SkipTest("https://github.com/apache/beam/issues/34831")
191+
189192

190193
if __name__ == '__main__':
191194
# Run the tests.

0 commit comments

Comments
 (0)