|
21 | 21 | import os.path |
22 | 22 | import queue |
23 | 23 | import shlex |
24 | | -import threading |
25 | 24 | import time |
26 | 25 | import typing |
27 | 26 | import unittest |
@@ -189,6 +188,7 @@ def create_options(self): |
189 | 188 | options.view_as(StandardOptions).streaming = self.streaming |
190 | 189 | options.view_as( |
191 | 190 | TypeOptions).allow_unsafe_triggers = self.allow_unsafe_triggers |
| 191 | + options.view_as(PortableOptions).job_server_timeout = 10 |
192 | 192 | return options |
193 | 193 |
|
194 | 194 | # Can't read host files from within docker, read a "local" file there. |
@@ -300,6 +300,25 @@ def test_after_count_trigger_streaming(self): |
300 | 300 | ('B-3', {10, 15, 16}), |
301 | 301 | ]))) |
302 | 302 |
|
| 303 | + def test_dofn_failure_clean_exit(self): |
| 304 | + class FailDoFn(beam.DoFn): |
| 305 | + def process(self, element): |
| 306 | + raise ValueError("Failing as intended") |
| 307 | + |
| 308 | + class BlockDoFn(beam.DoFn): |
| 309 | + def process(self, element): |
| 310 | + time.sleep(1000) |
| 311 | + yield element |
| 312 | + |
| 313 | + with self.assertRaisesRegex(Exception, "Failing as intended"): |
| 314 | + with self.create_pipeline() as p: |
| 315 | + imp = p | beam.Create([1, 2]) |
| 316 | + # Ensure the steps are not fused (otherwise siblings are run sequentially |
| 317 | + # in a single thread, making execution order dependent on internal map |
| 318 | + # traversal). Reshuffle acts as a fusion break so they run in parallel. |
| 319 | + _ = imp | 'ReshuffleBlock' >> beam.Reshuffle() | 'Block' >> beam.ParDo(BlockDoFn()) |
| 320 | + _ = imp | 'ReshuffleFail' >> beam.Reshuffle() | 'Fail' >> beam.ParDo(FailDoFn()) |
| 321 | + |
303 | 322 |
|
304 | 323 | class PrismJobServerTest(unittest.TestCase): |
305 | 324 | def setUp(self) -> None: |
|
0 commit comments