|
39 | 39 | from apache_beam.runners.portability import prism_runner
|
40 | 40 | from apache_beam.testing.util import assert_that
|
41 | 41 | from apache_beam.testing.util import equal_to
|
| 42 | +from apache_beam.utils import subprocess_server |
42 | 43 |
|
43 | 44 | # Run as
|
44 | 45 | #
|
@@ -389,37 +390,28 @@ def test_singleton(self, enable_singleton):
|
389 | 390 | else:
|
390 | 391 | options = DebugOptions()
|
391 | 392 |
|
| 393 | + runner = prism_runner.PrismRunner() |
392 | 394 | with mock.patch(
|
393 |
| - 'apache_beam.runners.portability.job_server.subprocess_server.SubprocessServer.start' # pylint: disable=line-too-long |
394 |
| - ) as mock_start: |
| 395 | + 'apache_beam.runners.portability.prism_runner.PrismJobServer' |
| 396 | + ) as mock_prism_server: |
| 397 | + |
395 | 398 | # Reset the class-level singleton for every fresh run
|
396 | 399 | prism_runner.PrismRunner._PrismRunner__singleton = None
|
397 | 400 |
|
398 |
| - try: |
399 |
| - with beam.Pipeline(options=options, |
400 |
| - runner=prism_runner.PrismRunner()) as p: |
401 |
| - _ = p | "Create Elements" >> beam.Create( |
402 |
| - range(5)) | "Squares" >> beam.Map(lambda x: x**2) |
403 |
| - except: # pylint: disable=bare-except |
404 |
| - pass |
405 |
| - |
406 |
| - mock_start.assert_called_once() |
407 |
| - mock_start.reset_mock() |
408 |
| - |
409 |
| - try: |
410 |
| - with beam.Pipeline(options=options, |
411 |
| - runner=prism_runner.PrismRunner()) as p: |
412 |
| - _ = p | "Create Elements" >> beam.Create( |
413 |
| - range(5)) | "Squares" >> beam.Map(lambda x: x**2) |
414 |
| - except: # pylint: disable=bare-except |
415 |
| - pass |
| 401 | + runner = prism_runner.PrismRunner() |
| 402 | + runner.default_job_server(options) |
| 403 | + |
| 404 | + mock_prism_server.assert_called_once() |
| 405 | + mock_prism_server.reset_mock() |
416 | 406 |
|
| 407 | + runner = prism_runner.PrismRunner() |
| 408 | + runner.default_job_server(options) |
417 | 409 | if enable_singleton:
|
418 |
| - # If singleton is enabled, we won't try to start a new server for the |
| 410 | + # If singleton is enabled, we won't try to create a new server for the |
419 | 411 | # second run.
|
420 |
| - mock_start.assert_not_called() |
| 412 | + mock_prism_server.assert_not_called() |
421 | 413 | else:
|
422 |
| - mock_start.assert_called_once() |
| 414 | + mock_prism_server.assert_called_once() |
423 | 415 |
|
424 | 416 |
|
425 | 417 | if __name__ == '__main__':
|
|
0 commit comments