Skip to content

Fix MongoDB connector by adding codec options #25913

Fix MongoDB connector by adding codec options

Fix MongoDB connector by adding codec options #25913

GitHub Actions / Test Results failed Mar 21, 2025 in 0s

27 fail, 613 skipped, 1 420 pass in 36m 16s

    4 files  +    2      4 suites  +2   36m 16s ⏱️ + 31m 22s
2 060 tests +1 656  1 420 ✅ +1 106    613 💤 +  524  27 ❌ +26 
4 122 runs  +3 712  2 669 ✅ +2 355  1 426 💤 +1 331  27 ❌ +26 

Results for commit 571bf9d. ± Comparison against earlier commit b591f83.

Annotations

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_callbacks_with_exception (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 1s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:07:32.796359674+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_callbacks_with_exception>

    def test_callbacks_with_exception(self):
      elements_list = ['1', '2']
    
      def raise_expetion():
        raise Exception('raise exception when calling callback')
    
      class FinalizebleDoFnWithException(beam.DoFn):
        def process(
            self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam):
          bundle_finalizer.register(raise_expetion)
          yield element
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(elements_list)
            | beam.ParDo(FinalizebleDoFnWithException()))
>       assert_that(res, equal_to(['1', '2']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1299: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:07:32.796359674+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:07:32.796359674+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_combine_per_key (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 1s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:08:34.4091421+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_combine_per_key>

    def test_combine_per_key(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([('a', 1), ('a', 2), ('b', 3)])
            | beam.CombinePerKey(beam.combiners.MeanCombineFn()))
>       assert_that(res, equal_to([('a', 1.5), ('b', 3.0)]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1082: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...eived from peer  {created_time:"2025-03-21T07:08:34.4091421+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:08:34.4091421+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_create (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 2s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:09:38.112143778+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_create>

    def test_create(self):
      with self.create_pipeline() as p:
>       assert_that(p | beam.Create(['a', 'b']), equal_to(['a', 'b']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:118: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:09:38.112143778+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:09:38.112143778+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_create_value_provider_pipeline_option (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 1s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:10:39.802589745+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_create_value_provider_pipeline_option>

    def test_create_value_provider_pipeline_option(self):
      # Verify that the runner can execute a pipeline when there are value
      # provider pipeline options
      # pylint: disable=unused-variable
      class FooOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
          parser.add_value_provider_argument(
              "--foo", help='a value provider argument', default="bar")
    
      RuntimeValueProvider.set_runtime_options({})
    
      with self.create_pipeline() as p:
>       assert_that(p | beam.Create(['a', 'b']), equal_to(['a', 'b']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1381: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:10:39.802589745+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:10:39.802589745+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_custom_merging_window (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:11:40.911893001+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_custom_merging_window>

    def test_custom_merging_window(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([1, 2, 100, 101, 102])
            | beam.Map(lambda t: window.TimestampedValue(('k', t), t))
            | beam.WindowInto(CustomMergingWindowFn())
            | beam.GroupByKey()
            | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
>       assert_that(
            res, equal_to([('k', [1]), ('k', [101]), ('k', [2, 100, 102])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1138: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:11:40.911893001+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:11:40.911893001+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_custom_window_type (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:12:45.011207761+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_custom_window_type>

    def test_custom_window_type(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([1, 2, 100, 101, 102])
            | beam.Map(lambda t: window.TimestampedValue(('k', t), t))
            | beam.WindowInto(EvenOddWindows())
            | beam.GroupByKey()
            | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
>       assert_that(
            res,
            equal_to([('k', [1]), ('k', [2]), ('k', [101]), ('k', [100, 102])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1153: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:12:45.011207761+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:12:45.011207761+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_element_to_batch_pardo (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:13:46.116155389+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_element_to_batch_pardo>

    def test_element_to_batch_pardo(self):
      class ArrayProduceDoFn(beam.DoFn):
        @beam.DoFn.yields_batches
        def process(self, element: np.int64, *unused_args,
                    **unused_kwargs) -> Iterator[np.ndarray]:
          yield np.array([element] * int(element))
    
        # infer_output_type must be defined (when there's no process method),
        # otherwise we don't know the input type is the same as output type.
        def infer_output_type(self, input_type):
          return np.int64
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
                np.int64)
            | beam.ParDo(ArrayProduceDoFn())
            | beam.ParDo(ArrayMultiplyDoFn())
            | beam.Map(lambda x: x * 3))
    
>       assert_that(res, equal_to([6, 12, 12, 18, 18, 18]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:371: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:13:46.116155389+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:13:46.116155389+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_error_message_includes_stage (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
IndexError: tuple index out of range
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_error_message_includes_stage>

    def test_error_message_includes_stage(self):
      with self.assertRaises(BaseException) as e_cm:
        with self.create_pipeline() as p:
    
          def raise_error(x):
            raise RuntimeError(
                'This error is expected and does not indicate a test failure.')
    
          # pylint: disable=expression-not-assigned
          (
              p
              | beam.Create(['a', 'b'])
              | 'StageA' >> beam.Map(lambda x: x)
              | 'StageB' >> beam.Map(lambda x: x)
              | 'StageC' >> beam.Map(raise_error)
              | 'StageD' >> beam.Map(lambda x: x))
>     message = e_cm.exception.args[0]
E     IndexError: tuple index out of range

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1196: IndexError

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_error_traceback_includes_user_code (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
AssertionError: 'second' not found in 'Traceback (most recent call last):\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py", line 1213, in test_error_traceback_includes_user_code\n    p | beam.Create([0]) | beam.Map(first)  # pylint: disable=expression-not-assigned\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py", line 646, in __exit__\n    self.result.wait_until_finish()\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 569, in wait_until_finish\n    raise self._runtime_exception\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 575, in _observe_state\n    for state_response in self._state_stream:\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 543, in __next__\n    return self._next()\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 969, in _next\n    raise self\ngrpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.DEADLINE_EXCEEDED\n\tdetails = "Deadline Exceeded"\n\tdebug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:15:48.486222892+00:00"}"\n>\n'
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_error_traceback_includes_user_code>

    def test_error_traceback_includes_user_code(self):
      def first(x):
        return second(x)
    
      def second(x):
        return third(x)
    
      def third(x):
        raise ValueError(
            'This error is expected and does not indicate a test failure.')
    
      try:
        with self.create_pipeline() as p:
          p | beam.Create([0]) | beam.Map(first)  # pylint: disable=expression-not-assigned
      except Exception:  # pylint: disable=broad-except
        message = traceback.format_exc()
      else:
        raise AssertionError('expected exception not raised')
    
      self.assertIn('first', message)
>     self.assertIn('second', message)
E     AssertionError: 'second' not found in 'Traceback (most recent call last):\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py", line 1213, in test_error_traceback_includes_user_code\n    p | beam.Create([0]) | beam.Map(first)  # pylint: disable=expression-not-assigned\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py", line 646, in __exit__\n    self.result.wait_until_finish()\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 569, in wait_until_finish\n    raise self._runtime_exception\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 575, in _observe_state\n    for state_response in self._state_stream:\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 543, in __next__\n    return self._next()\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 969, in _next\n    raise self\ngrpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.DEADLINE_EXCEEDED\n\tdetails = "Deadline Exceeded"\n\tdebug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:15:48.486222892+00:00"}"\n>\n'

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1220: AssertionError

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_flatmap_numpy_array (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:16:52.428227248+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_flatmap_numpy_array>

    def test_flatmap_numpy_array(self):
      with self.create_pipeline() as p:
        pc = (
            p
            | beam.Create([np.array(range(10))])
            | beam.FlatMap(lambda arr: arr))
    
>       assert_that(pc, equal_to([np.int64(i) for i in range(10)]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:468: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:16:52.428227248+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:16:52.428227248+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_flatten (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:17:53.546926609+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_flatten>
with_transcoding = True

    def test_flatten(self, with_transcoding=True):
      with self.create_pipeline() as p:
        if with_transcoding:
          # Additional element which does not match with the first type
          additional = [ord('d')]
        else:
          additional = ['d']
        res = (
            p | 'a' >> beam.Create(['a']),
            p | 'bc' >> beam.Create(['b', 'c']),
            p | 'd' >> beam.Create(additional)) | beam.Flatten()
>       assert_that(res, equal_to(['a', 'b', 'c'] + additional))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1069: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:17:53.546926609+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:17:53.546926609+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_flatten_same_pcollections (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:18:54.492982458+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_flatten_same_pcollections>
with_transcoding = True

    def test_flatten_same_pcollections(self, with_transcoding=True):
      with self.create_pipeline() as p:
        pc = p | beam.Create(['a', 'b'])
>       assert_that((pc, pc, pc) | beam.Flatten(), equal_to(['a', 'b'] * 3))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1074: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:18:54.492982458+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:18:54.492982458+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_flattened_side_input (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 4s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:19:59.360281425+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_flattened_side_input>
with_transcoding = True

    def test_flattened_side_input(self, with_transcoding=True):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create([None])
        side1 = p | 'side1' >> beam.Create([('a', 1)])
        side2 = p | 'side2' >> beam.Create([('b', 2)])
        if with_transcoding:
          # Also test non-matching coder types (transcoding required)
          third_element = [('another_type')]
        else:
          third_element = [('b', 3)]
        side3 = p | 'side3' >> beam.Create(third_element)
        side = (side1, side2) | beam.Flatten()
        assert_that(
            main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
            equal_to([(None, {
                'a': 1, 'b': 2
            })]),
            label='CheckFlattenAsSideInput')
>       assert_that((side, side3) | 'FlattenAfter' >> beam.Flatten(),
                    equal_to([('a', 1), ('b', 2)] + third_element),
                    label='CheckFlattenOfSideInput')

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:537: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:19:59.360281425+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:19:59.360281425+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_gbk_side_input (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:21:00.470022888+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_gbk_side_input>

    def test_gbk_side_input(self):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create([None])
        side = p | 'side' >> beam.Create([('a', 1)]) | beam.GroupByKey()
>       assert_that(
            main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
            equal_to([(None, {
                'a': [1]
            })]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:545: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:21:00.470022888+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:21:00.470022888+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_group_by_key (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:22:01.431916476+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_group_by_key>

    def test_group_by_key(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([('a', 1), ('a', 2), ('b', 3)])
            | beam.GroupByKey()
            | beam.Map(lambda k_vs: (k_vs[0], sorted(k_vs[1]))))
>       assert_that(res, equal_to([('a', [1, 2]), ('b', [3])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1050: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:22:01.431916476+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:22:01.431916476+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_group_by_key_with_empty_pcoll_elements (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:23:05.442091109+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_group_by_key_with_empty_pcoll_elements>

    def test_group_by_key_with_empty_pcoll_elements(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([('test_key', 'test_value')])
            | beam.Filter(lambda x: False)
            | beam.GroupByKey())
>       assert_that(res, equal_to([]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1438: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:23:05.442091109+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:23:05.442091109+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_metrics (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:24:09.428083635+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_metrics>
check_gauge = True, check_bounded_trie = False

    def test_metrics(self, check_gauge=True, check_bounded_trie=False):
      p = self.create_pipeline()
    
      counter = beam.metrics.Metrics.counter('ns', 'counter')
      distribution = beam.metrics.Metrics.distribution('ns', 'distribution')
      gauge = beam.metrics.Metrics.gauge('ns', 'gauge')
      string_set = beam.metrics.Metrics.string_set('ns', 'string_set')
      bounded_trie = beam.metrics.Metrics.bounded_trie('ns', 'bounded_trie')
    
      elements = ['a', 'zzz']
      pcoll = p | beam.Create(elements)
      # pylint: disable=expression-not-assigned
      pcoll | 'count1' >> beam.FlatMap(lambda x: counter.inc())
      pcoll | 'count2' >> beam.FlatMap(lambda x: counter.inc(len(x)))
      pcoll | 'dist' >> beam.FlatMap(lambda x: distribution.update(len(x)))
      pcoll | 'gauge' >> beam.FlatMap(lambda x: gauge.set(3))
      pcoll | 'string_set' >> beam.FlatMap(lambda x: string_set.add(x))
      pcoll | 'bounded_trie' >> beam.FlatMap(lambda x: bounded_trie.add(tuple(x)))
    
      res = p.run()
>     res.wait_until_finish()

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1253: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:24:09.428083635+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:24:09.428083635+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_multimap_multiside_input (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:25:10.438231792+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_multimap_multiside_input>

    def test_multimap_multiside_input(self):
      # A test where two transforms in the same stage consume the same PCollection
      # twice as side input.
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create(['a', 'b'])
        side = p | 'side' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
>       assert_that(
            main | 'first map' >> beam.Map(
                lambda k,
                d,
                l: (k, sorted(d[k]), sorted([e[1] for e in l])),
                beam.pvalue.AsMultiMap(side),
                beam.pvalue.AsList(side))
            | 'second map' >> beam.Map(
                lambda k,
                d,
                l: (k[0], sorted(d[k[0]]), sorted([e[1] for e in l])),
                beam.pvalue.AsMultiMap(side),
                beam.pvalue.AsList(side)),
            equal_to([('a', [1, 3], [1, 2, 3]), ('b', [2], [1, 2, 3])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:566: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:25:10.438231792+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:25:10.438231792+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_multimap_side_input (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:26:11.718657336+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_multimap_side_input>

    def test_multimap_side_input(self):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create(['a', 'b'])
        side = p | 'side' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
>       assert_that(
            main | beam.Map(
                lambda k, d: (k, sorted(d[k])), beam.pvalue.AsMultiMap(side)),
            equal_to([('a', [1, 3]), ('b', [2])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:555: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:26:11.718657336+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:26:11.718657336+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_multimap_side_input_type_coercion (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:27:12.829563478+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_multimap_side_input_type_coercion>

    def test_multimap_side_input_type_coercion(self):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create(['a', 'b'])
        # The type of this side-input is forced to Any (overriding type
        # inference). Without type coercion to Tuple[Any, Any], the usage of this
        # side-input in AsMultiMap() below should fail.
        side = (
            p | 'side' >> beam.Create([('a', 1), ('b', 2),
                                       ('a', 3)]).with_output_types(typing.Any))
>       assert_that(
            main | beam.Map(
                lambda k, d: (k, sorted(d[k])), beam.pvalue.AsMultiMap(side)),
            equal_to([('a', [1, 3]), ('b', [2])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:590: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:27:12.829563478+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:27:12.829563478+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_no_subtransform_composite (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:28:13.841530617+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_no_subtransform_composite>

    def test_no_subtransform_composite(self):
      class First(beam.PTransform):
        def expand(self, pcolls):
          return pcolls[0]
    
      with self.create_pipeline() as p:
        pcoll_a = p | 'a' >> beam.Create(['a'])
        pcoll_b = p | 'b' >> beam.Create(['b'])
>       assert_that((pcoll_a, pcoll_b) | First(), equal_to(['a']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:28:13.841530617+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:28:13.841530617+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_pack_combiners (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 4s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:29:18.113721567+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pack_combiners>

    def test_pack_combiners(self):
>     self._test_pack_combiners(assert_using_counter_names=True)

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1429: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1409: in _test_pack_combiners
    _ = p | beam.Create([10, 20, 30]) | PackableCombines()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:29:18.113721567+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:29:18.113721567+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_pardo (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:30:22.198013312+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo>

    def test_pardo(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(['a', 'bc'])
            | beam.Map(lambda e: e * 2)
            | beam.Map(lambda e: e + 'x'))
>       assert_that(res, equal_to(['aax', 'bcbcx']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:30:22.198013312+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:30:22.198013312+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_pardo_dynamic_timer (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 4s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:31:27.087066434+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_dynamic_timer>

    def test_pardo_dynamic_timer(self):
      class DynamicTimerDoFn(beam.DoFn):
        dynamic_timer_spec = userstate.TimerSpec(
            'dynamic_timer', userstate.TimeDomain.WATERMARK)
    
        def process(
            self, element,
            dynamic_timer=beam.DoFn.TimerParam(dynamic_timer_spec)):
          dynamic_timer.set(element[1], dynamic_timer_tag=element[0])
    
        @userstate.on_timer(dynamic_timer_spec)
        def dynamic_timer_callback(
            self,
            tag=beam.DoFn.DynamicTimerTagParam,
            timestamp=beam.DoFn.TimestampParam):
          yield (tag, timestamp)
    
      with self.create_pipeline() as p:
        actual = (
            p
            | beam.Create([('key1', 10), ('key2', 20), ('key3', 30)])
            | beam.ParDo(DynamicTimerDoFn()))
>       assert_that(actual, equal_to([('key1', 10), ('key2', 20), ('key3', 30)]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:842: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:31:27.087066434+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:31:27.087066434+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_pardo_side_and_main_outputs (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 1s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:32:28.850876193+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_side_and_main_outputs>

    def test_pardo_side_and_main_outputs(self):
      def even_odd(elem):
        yield elem
        yield beam.pvalue.TaggedOutput('odd' if elem % 2 else 'even', elem)
    
      with self.create_pipeline() as p:
        ints = p | beam.Create([1, 2, 3])
        named = ints | 'named' >> beam.FlatMap(even_odd).with_outputs(
            'even', 'odd', main='all')
        assert_that(named.all, equal_to([1, 2, 3]), label='named.all')
        assert_that(named.even, equal_to([2]), label='named.even')
        assert_that(named.odd, equal_to([1, 3]), label='named.odd')
    
        unnamed = ints | 'unnamed' >> beam.FlatMap(even_odd).with_outputs()
        unnamed[None] | beam.Map(id)  # pylint: disable=expression-not-assigned
        assert_that(unnamed[None], equal_to([1, 2, 3]), label='unnamed.all')
        assert_that(unnamed.even, equal_to([2]), label='unnamed.even')
>       assert_that(unnamed.odd, equal_to([1, 3]), label='unnamed.odd')

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:421: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:32:28.850876193+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:32:28.850876193+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous