Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix MongoDB connector by adding codec options #34365

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from

Merge upstream/master into my branch

571bf9d
Select commit
Loading
Failed to load commit list.
Sign in for the full log view
Draft

Fix MongoDB connector by adding codec options #34365

Merge upstream/master into my branch
571bf9d
Select commit
Loading
Failed to load commit list.
GitHub Actions / Test Results failed Mar 21, 2025 in 0s

27 fail, 613 skipped, 1 420 pass in 36m 16s

    4 files  +    2      4 suites  +2   36m 16s ⏱️ + 31m 22s
2 060 tests +1 656  1 420 ✅ +1 106    613 💤 +  524  27 ❌ +26 
4 122 runs  +3 712  2 669 ✅ +2 355  1 426 💤 +1 331  27 ❌ +26 

Results for commit 571bf9d. ± Comparison against earlier commit b591f83.

Annotations

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_callbacks_with_exception (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 1s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:07:32.796359674+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_callbacks_with_exception>

    def test_callbacks_with_exception(self):
      elements_list = ['1', '2']
    
      def raise_expetion():
        raise Exception('raise exception when calling callback')
    
      class FinalizebleDoFnWithException(beam.DoFn):
        def process(
            self, element, bundle_finalizer=beam.DoFn.BundleFinalizerParam):
          bundle_finalizer.register(raise_expetion)
          yield element
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(elements_list)
            | beam.ParDo(FinalizebleDoFnWithException()))
>       assert_that(res, equal_to(['1', '2']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1299: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:07:32.796359674+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:07:32.796359674+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_combine_per_key (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 1s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:08:34.4091421+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_combine_per_key>

    def test_combine_per_key(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([('a', 1), ('a', 2), ('b', 3)])
            | beam.CombinePerKey(beam.combiners.MeanCombineFn()))
>       assert_that(res, equal_to([('a', 1.5), ('b', 3.0)]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1082: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...eived from peer  {created_time:"2025-03-21T07:08:34.4091421+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:08:34.4091421+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_create (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 2s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:09:38.112143778+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_create>

    def test_create(self):
      with self.create_pipeline() as p:
>       assert_that(p | beam.Create(['a', 'b']), equal_to(['a', 'b']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:118: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:09:38.112143778+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:09:38.112143778+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_create_value_provider_pipeline_option (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 1s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:10:39.802589745+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_create_value_provider_pipeline_option>

    def test_create_value_provider_pipeline_option(self):
      # Verify that the runner can execute a pipeline when there are value
      # provider pipeline options
      # pylint: disable=unused-variable
      class FooOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
          parser.add_value_provider_argument(
              "--foo", help='a value provider argument', default="bar")
    
      RuntimeValueProvider.set_runtime_options({})
    
      with self.create_pipeline() as p:
>       assert_that(p | beam.Create(['a', 'b']), equal_to(['a', 'b']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1381: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:10:39.802589745+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:10:39.802589745+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_custom_merging_window (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:11:40.911893001+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_custom_merging_window>

    def test_custom_merging_window(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([1, 2, 100, 101, 102])
            | beam.Map(lambda t: window.TimestampedValue(('k', t), t))
            | beam.WindowInto(CustomMergingWindowFn())
            | beam.GroupByKey()
            | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
>       assert_that(
            res, equal_to([('k', [1]), ('k', [101]), ('k', [2, 100, 102])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1138: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:11:40.911893001+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:11:40.911893001+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_custom_window_type (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:12:45.011207761+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_custom_window_type>

    def test_custom_window_type(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([1, 2, 100, 101, 102])
            | beam.Map(lambda t: window.TimestampedValue(('k', t), t))
            | beam.WindowInto(EvenOddWindows())
            | beam.GroupByKey()
            | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1]))))
>       assert_that(
            res,
            equal_to([('k', [1]), ('k', [2]), ('k', [101]), ('k', [100, 102])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1153: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:12:45.011207761+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:12:45.011207761+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_element_to_batch_pardo (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:13:46.116155389+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_element_to_batch_pardo>

    def test_element_to_batch_pardo(self):
      class ArrayProduceDoFn(beam.DoFn):
        @beam.DoFn.yields_batches
        def process(self, element: np.int64, *unused_args,
                    **unused_kwargs) -> Iterator[np.ndarray]:
          yield np.array([element] * int(element))
    
        # infer_output_type must be defined (when there's no process method),
        # otherwise we don't know the input type is the same as output type.
        def infer_output_type(self, input_type):
          return np.int64
    
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(np.array([1, 2, 3], dtype=np.int64)).with_output_types(
                np.int64)
            | beam.ParDo(ArrayProduceDoFn())
            | beam.ParDo(ArrayMultiplyDoFn())
            | beam.Map(lambda x: x * 3))
    
>       assert_that(res, equal_to([6, 12, 12, 18, 18, 18]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:371: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:13:46.116155389+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:13:46.116155389+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_error_message_includes_stage (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
IndexError: tuple index out of range
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_error_message_includes_stage>

    def test_error_message_includes_stage(self):
      with self.assertRaises(BaseException) as e_cm:
        with self.create_pipeline() as p:
    
          def raise_error(x):
            raise RuntimeError(
                'This error is expected and does not indicate a test failure.')
    
          # pylint: disable=expression-not-assigned
          (
              p
              | beam.Create(['a', 'b'])
              | 'StageA' >> beam.Map(lambda x: x)
              | 'StageB' >> beam.Map(lambda x: x)
              | 'StageC' >> beam.Map(raise_error)
              | 'StageD' >> beam.Map(lambda x: x))
>     message = e_cm.exception.args[0]
E     IndexError: tuple index out of range

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1196: IndexError

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_error_traceback_includes_user_code (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
AssertionError: 'second' not found in 'Traceback (most recent call last):\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py", line 1213, in test_error_traceback_includes_user_code\n    p | beam.Create([0]) | beam.Map(first)  # pylint: disable=expression-not-assigned\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py", line 646, in __exit__\n    self.result.wait_until_finish()\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 569, in wait_until_finish\n    raise self._runtime_exception\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 575, in _observe_state\n    for state_response in self._state_stream:\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 543, in __next__\n    return self._next()\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 969, in _next\n    raise self\ngrpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.DEADLINE_EXCEEDED\n\tdetails = "Deadline Exceeded"\n\tdebug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:15:48.486222892+00:00"}"\n>\n'
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_error_traceback_includes_user_code>

    def test_error_traceback_includes_user_code(self):
      def first(x):
        return second(x)
    
      def second(x):
        return third(x)
    
      def third(x):
        raise ValueError(
            'This error is expected and does not indicate a test failure.')
    
      try:
        with self.create_pipeline() as p:
          p | beam.Create([0]) | beam.Map(first)  # pylint: disable=expression-not-assigned
      except Exception:  # pylint: disable=broad-except
        message = traceback.format_exc()
      else:
        raise AssertionError('expected exception not raised')
    
      self.assertIn('first', message)
>     self.assertIn('second', message)
E     AssertionError: 'second' not found in 'Traceback (most recent call last):\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py", line 1213, in test_error_traceback_includes_user_code\n    p | beam.Create([0]) | beam.Map(first)  # pylint: disable=expression-not-assigned\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py", line 646, in __exit__\n    self.result.wait_until_finish()\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 569, in wait_until_finish\n    raise self._runtime_exception\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py", line 575, in _observe_state\n    for state_response in self._state_stream:\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 543, in __next__\n    return self._next()\n  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py", line 969, in _next\n    raise self\ngrpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.DEADLINE_EXCEEDED\n\tdetails = "Deadline Exceeded"\n\tdebug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:15:48.486222892+00:00"}"\n>\n'

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1220: AssertionError

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_flatmap_numpy_array (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:16:52.428227248+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_flatmap_numpy_array>

    def test_flatmap_numpy_array(self):
      with self.create_pipeline() as p:
        pc = (
            p
            | beam.Create([np.array(range(10))])
            | beam.FlatMap(lambda arr: arr))
    
>       assert_that(pc, equal_to([np.int64(i) for i in range(10)]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:468: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:16:52.428227248+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:16:52.428227248+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_flatten (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:17:53.546926609+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_flatten>
with_transcoding = True

    def test_flatten(self, with_transcoding=True):
      with self.create_pipeline() as p:
        if with_transcoding:
          # Additional element which does not match with the first type
          additional = [ord('d')]
        else:
          additional = ['d']
        res = (
            p | 'a' >> beam.Create(['a']),
            p | 'bc' >> beam.Create(['b', 'c']),
            p | 'd' >> beam.Create(additional)) | beam.Flatten()
>       assert_that(res, equal_to(['a', 'b', 'c'] + additional))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1069: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:17:53.546926609+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:17:53.546926609+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_flatten_same_pcollections (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:18:54.492982458+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_flatten_same_pcollections>
with_transcoding = True

    def test_flatten_same_pcollections(self, with_transcoding=True):
      with self.create_pipeline() as p:
        pc = p | beam.Create(['a', 'b'])
>       assert_that((pc, pc, pc) | beam.Flatten(), equal_to(['a', 'b'] * 3))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1074: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:18:54.492982458+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:18:54.492982458+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_flattened_side_input (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 4s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:19:59.360281425+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_flattened_side_input>
with_transcoding = True

    def test_flattened_side_input(self, with_transcoding=True):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create([None])
        side1 = p | 'side1' >> beam.Create([('a', 1)])
        side2 = p | 'side2' >> beam.Create([('b', 2)])
        if with_transcoding:
          # Also test non-matching coder types (transcoding required)
          third_element = [('another_type')]
        else:
          third_element = [('b', 3)]
        side3 = p | 'side3' >> beam.Create(third_element)
        side = (side1, side2) | beam.Flatten()
        assert_that(
            main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
            equal_to([(None, {
                'a': 1, 'b': 2
            })]),
            label='CheckFlattenAsSideInput')
>       assert_that((side, side3) | 'FlattenAfter' >> beam.Flatten(),
                    equal_to([('a', 1), ('b', 2)] + third_element),
                    label='CheckFlattenOfSideInput')

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:537: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:19:59.360281425+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:19:59.360281425+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_gbk_side_input (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:21:00.470022888+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_gbk_side_input>

    def test_gbk_side_input(self):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create([None])
        side = p | 'side' >> beam.Create([('a', 1)]) | beam.GroupByKey()
>       assert_that(
            main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
            equal_to([(None, {
                'a': [1]
            })]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:545: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:21:00.470022888+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:21:00.470022888+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_group_by_key (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:22:01.431916476+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_group_by_key>

    def test_group_by_key(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([('a', 1), ('a', 2), ('b', 3)])
            | beam.GroupByKey()
            | beam.Map(lambda k_vs: (k_vs[0], sorted(k_vs[1]))))
>       assert_that(res, equal_to([('a', [1, 2]), ('b', [3])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1050: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:22:01.431916476+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:22:01.431916476+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_group_by_key_with_empty_pcoll_elements (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:23:05.442091109+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_group_by_key_with_empty_pcoll_elements>

    def test_group_by_key_with_empty_pcoll_elements(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create([('test_key', 'test_value')])
            | beam.Filter(lambda x: False)
            | beam.GroupByKey())
>       assert_that(res, equal_to([]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1438: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:23:05.442091109+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:23:05.442091109+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_metrics (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:24:09.428083635+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_metrics>
check_gauge = True, check_bounded_trie = False

    def test_metrics(self, check_gauge=True, check_bounded_trie=False):
      p = self.create_pipeline()
    
      counter = beam.metrics.Metrics.counter('ns', 'counter')
      distribution = beam.metrics.Metrics.distribution('ns', 'distribution')
      gauge = beam.metrics.Metrics.gauge('ns', 'gauge')
      string_set = beam.metrics.Metrics.string_set('ns', 'string_set')
      bounded_trie = beam.metrics.Metrics.bounded_trie('ns', 'bounded_trie')
    
      elements = ['a', 'zzz']
      pcoll = p | beam.Create(elements)
      # pylint: disable=expression-not-assigned
      pcoll | 'count1' >> beam.FlatMap(lambda x: counter.inc())
      pcoll | 'count2' >> beam.FlatMap(lambda x: counter.inc(len(x)))
      pcoll | 'dist' >> beam.FlatMap(lambda x: distribution.update(len(x)))
      pcoll | 'gauge' >> beam.FlatMap(lambda x: gauge.set(3))
      pcoll | 'string_set' >> beam.FlatMap(lambda x: string_set.add(x))
      pcoll | 'bounded_trie' >> beam.FlatMap(lambda x: bounded_trie.add(tuple(x)))
    
      res = p.run()
>     res.wait_until_finish()

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1253: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:24:09.428083635+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:24:09.428083635+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_multimap_multiside_input (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:25:10.438231792+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_multimap_multiside_input>

    def test_multimap_multiside_input(self):
      # A test where two transforms in the same stage consume the same PCollection
      # twice as side input.
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create(['a', 'b'])
        side = p | 'side' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
>       assert_that(
            main | 'first map' >> beam.Map(
                lambda k,
                d,
                l: (k, sorted(d[k]), sorted([e[1] for e in l])),
                beam.pvalue.AsMultiMap(side),
                beam.pvalue.AsList(side))
            | 'second map' >> beam.Map(
                lambda k,
                d,
                l: (k[0], sorted(d[k[0]]), sorted([e[1] for e in l])),
                beam.pvalue.AsMultiMap(side),
                beam.pvalue.AsList(side)),
            equal_to([('a', [1, 3], [1, 2, 3]), ('b', [2], [1, 2, 3])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:566: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:25:10.438231792+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:25:10.438231792+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_multimap_side_input (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:26:11.718657336+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_multimap_side_input>

    def test_multimap_side_input(self):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create(['a', 'b'])
        side = p | 'side' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
>       assert_that(
            main | beam.Map(
                lambda k, d: (k, sorted(d[k])), beam.pvalue.AsMultiMap(side)),
            equal_to([('a', [1, 3]), ('b', [2])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:555: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:26:11.718657336+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:26:11.718657336+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_multimap_side_input_type_coercion (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:27:12.829563478+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_multimap_side_input_type_coercion>

    def test_multimap_side_input_type_coercion(self):
      with self.create_pipeline() as p:
        main = p | 'main' >> beam.Create(['a', 'b'])
        # The type of this side-input is forced to Any (overriding type
        # inference). Without type coercion to Tuple[Any, Any], the usage of this
        # side-input in AsMultiMap() below should fail.
        side = (
            p | 'side' >> beam.Create([('a', 1), ('b', 2),
                                       ('a', 3)]).with_output_types(typing.Any))
>       assert_that(
            main | beam.Map(
                lambda k, d: (k, sorted(d[k])), beam.pvalue.AsMultiMap(side)),
            equal_to([('a', [1, 3]), ('b', [2])]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:590: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:27:12.829563478+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:27:12.829563478+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_no_subtransform_composite (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:28:13.841530617+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_no_subtransform_composite>

    def test_no_subtransform_composite(self):
      class First(beam.PTransform):
        def expand(self, pcolls):
          return pcolls[0]
    
      with self.create_pipeline() as p:
        pcoll_a = p | 'a' >> beam.Create(['a'])
        pcoll_b = p | 'b' >> beam.Create(['b'])
>       assert_that((pcoll_a, pcoll_b) | First(), equal_to(['a']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1231: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:28:13.841530617+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:28:13.841530617+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_pack_combiners (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 4s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:29:18.113721567+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pack_combiners>

    def test_pack_combiners(self):
>     self._test_pack_combiners(assert_using_counter_names=True)

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1429: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:1409: in _test_pack_combiners
    _ = p | beam.Create([10, 20, 30]) | PackableCombines()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:29:18.113721567+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:29:18.113721567+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_pardo (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 3s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:30:22.198013312+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo>

    def test_pardo(self):
      with self.create_pipeline() as p:
        res = (
            p
            | beam.Create(['a', 'bc'])
            | beam.Map(lambda e: e * 2)
            | beam.Map(lambda e: e + 'x'))
>       assert_that(res, equal_to(['aax', 'bcbcx']))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:127: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:30:22.198013312+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:30:22.198013312+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_pardo_dynamic_timer (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 4s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:31:27.087066434+00:00"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_dynamic_timer>

    def test_pardo_dynamic_timer(self):
      class DynamicTimerDoFn(beam.DoFn):
        dynamic_timer_spec = userstate.TimerSpec(
            'dynamic_timer', userstate.TimeDomain.WATERMARK)
    
        def process(
            self, element,
            dynamic_timer=beam.DoFn.TimerParam(dynamic_timer_spec)):
          dynamic_timer.set(element[1], dynamic_timer_tag=element[0])
    
        @userstate.on_timer(dynamic_timer_spec)
        def dynamic_timer_callback(
            self,
            tag=beam.DoFn.DynamicTimerTagParam,
            timestamp=beam.DoFn.TimestampParam):
          yield (tag, timestamp)
    
      with self.create_pipeline() as p:
        actual = (
            p
            | beam.Create([('key1', 10), ('key2', 20), ('key3', 30)])
            | beam.ParDo(DynamicTimerDoFn()))
>       assert_that(actual, equal_to([('key1', 10), ('key2', 20), ('key3', 30)]))

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:842: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:31:27.087066434+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-03-21T07:31:27.087066434+00:00"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses

See this annotation in the file changed.

@github-actions github-actions / Test Results

1 out of 2 runs failed: test_pardo_side_and_main_outputs (apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses)

sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 1m 1s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:32:28.850876193+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>
self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_side_and_main_outputs>

    def test_pardo_side_and_main_outputs(self):
      def even_odd(elem):
        yield elem
        yield beam.pvalue.TaggedOutput('odd' if elem % 2 else 'even', elem)
    
      with self.create_pipeline() as p:
        ints = p | beam.Create([1, 2, 3])
        named = ints | 'named' >> beam.FlatMap(even_odd).with_outputs(
            'even', 'odd', main='all')
        assert_that(named.all, equal_to([1, 2, 3]), label='named.all')
        assert_that(named.even, equal_to([2]), label='named.even')
        assert_that(named.odd, equal_to([1, 3]), label='named.odd')
    
        unnamed = ints | 'unnamed' >> beam.FlatMap(even_odd).with_outputs()
        unnamed[None] | beam.Map(id)  # pylint: disable=expression-not-assigned
        assert_that(unnamed[None], equal_to([1, 2, 3]), label='unnamed.all')
        assert_that(unnamed.even, equal_to([2]), label='unnamed.even')
>       assert_that(unnamed.odd, equal_to([1, 3]), label='unnamed.odd')

target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py:421: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result.wait_until_finish()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:569: in wait_until_finish
    raise self._runtime_exception
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:575: in _observe_state
    for state_response in self._state_stream:
target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {created_time:"2025-03-21T07:32:28.850876193+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-03-21T07:32:28.850876193+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target/.tox-py39/py39/lib/python3.9/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous