Enable prism by default (where supported) #26666
59 fail, 222 skipped, 3 624 pass in 34m 15s
Annotations
Check warning on line 0 in apache_beam.io.textio_test.TextSourceTest
github-actions / Test Results
All 2 runs failed: test_read_all_continuously_new (apache_beam.io.textio_test.TextSourceTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 16s]
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 12s]
Raw output
apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', 'first'), ('file2', 'first')] == [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')], unexpected elements [('file1', 'second A'), ('file1', 'second B')], missing elements [('file1', 'first')] [while running 'assert read new files results/Match']
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7a8935534c10>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', 'first'), ('file2', 'first')] == [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')], unexpected elements [('file1', 'second A'), ('file1', 'second B')], missing elements [('file1', 'first')]
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
During handling of the above exception, another exception occurred:
self = <apache_beam.io.textio_test.TextSourceTest testMethod=test_read_all_continuously_new>
def test_read_all_continuously_new(self):
with TempDir() as tempdir, TestPipeline() as pipeline:
temp_path = tempdir.get_path()
# create a temp file at the beginning
with open(FileSystems.join(temp_path, 'file1'), 'w') as f:
f.write('first')
match_pattern = FileSystems.join(temp_path, '*')
interval = 0.5
last = 2
p_read_once = (
pipeline
| 'Continuously read new files' >> ReadAllFromTextContinuously(
match_pattern,
with_filename=True,
start_timestamp=Timestamp.now(),
interval=interval,
stop_timestamp=Timestamp.now() + last,
match_updated_files=False)
| 'add dumb key' >> beam.Map(lambda x: (0, x))
|
'Write files on-the-fly' >> beam.ParDo(self._WriteFilesFn(temp_path)))
> assert_that(
p_read_once,
equal_to([('file1', 'first'), ('file2', 'first')]),
label='assert read new files results')
apache_beam/io/textio_test.py:635:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:566: in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
apache_beam/runners/worker/operations.py:568: in apache_beam.runners.worker.operations.Operation.output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:259: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
def receive(self, windowed_value):
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7a8935534c10>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', 'first'), ('file2', 'first')] == [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')], unexpected elements [('file1', 'second A'), ('file1', 'second B')], missing elements [('file1', 'first')] [while running 'assert read new files results/Match']
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
Check warning on line 0 in apache_beam.io.avroio_test.TestFastAvro
github-actions / Test Results
All 2 runs failed: test_read_all_continuously_new (apache_beam.io.avroio_test.TestFastAvro)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 19s]
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 10s]
Raw output
apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'})] [while running 'assert read new files results/Match']
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ('file1', {'favorite_color': 'green', ... 'favorite_number': 3, 'name': 'Henry'}), ('file2', {'favorite_color': 'brown', 'favorite_number': 7, 'name': 'Toby'})]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7ea1d2789700>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'})]
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
During handling of the above exception, another exception occurred:
self = <apache_beam.io.avroio_test.TestFastAvro testMethod=test_read_all_continuously_new>
def test_read_all_continuously_new(self):
with TestPipeline() as pipeline:
tempdir = tempfile.mkdtemp()
writer_fn = self._WriteFilesFn(self.SCHEMA, self.RECORDS, tempdir)
with open(FileSystems.join(tempdir, 'file1'), 'wb') as f:
writer(f, writer_fn.SCHEMA, writer_fn.gen_records(1))
match_pattern = FileSystems.join(tempdir, '*')
interval = 0.5
last = 2
p_read_once = (
pipeline
| 'Continuously read new files' >> avroio.ReadAllFromAvroContinuously(
match_pattern,
with_filename=True,
start_timestamp=Timestamp.now(),
interval=interval,
stop_timestamp=Timestamp.now() + last,
match_updated_files=False)
| 'add dumb key' >> beam.Map(lambda x: (0, x))
| 'Write files on-the-fly' >> beam.ParDo(writer_fn))
> assert_that(
p_read_once,
equal_to(writer_fn.get_expect(match_updated_files=False)),
label='assert read new files results')
apache_beam/io/avroio_test.py:521:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:566: in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
apache_beam/runners/worker/operations.py:568: in apache_beam.runners.worker.operations.Operation.output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:259: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
def receive(self, windowed_value):
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ('file1', {'favorite_color': 'green', ... 'favorite_number': 3, 'name': 'Henry'}), ('file2', {'favorite_color': 'brown', 'favorite_number': 7, 'name': 'Toby'})]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7ea1d2789700>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'})] [while running 'assert read new files results/Match']
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
Check warning on line 0 in apache_beam.pipeline_test.PipelineTest
github-actions / Test Results
1 out of 2 runs failed: test_apply_custom_transform (apache_beam.pipeline_test.PipelineTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 10m 0s]
Raw output
Failed: Timeout >600.0s
self = <apache_beam.pipeline_test.PipelineTest testMethod=test_apply_custom_transform>
def test_apply_custom_transform(self):
with TestPipeline() as pipeline:
pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])
result = pcoll | PipelineTest.CustomTransform()
> assert_that(result, equal_to([2, 3, 4]))
apache_beam/pipeline_test.py:256:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py:160: in run_pipeline
pr = runner.run_pipeline(pipeline, options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/runner.py:180: in run_pipeline
return self.run_portable_pipeline(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:375: in run_portable_pipeline
job_service_handle = self.create_job_service(options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/portable_runner.py:289: in create_job_service
return self.create_job_service_handle(server.start(), options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/job_server.py:79: in start
self._endpoint = self._job_server.start()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/job_server.py:104: in start
cmd, endpoint = self.subprocess_cmd_and_endpoint()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/prism_runner.py:261: in subprocess_cmd_and_endpoint
self.path_to_binary(), ignore_cache=(self._path is not None))
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/prism_runner.py:209: in path_to_binary
process = subprocess.run(["go", "install", PRISMPKG],
/opt/hostedtoolcache/Python/3.9.22/x64/lib/python3.9/subprocess.py:505: in run
with Popen(*popenargs, **kwargs) as process:
/opt/hostedtoolcache/Python/3.9.22/x64/lib/python3.9/subprocess.py:951: in __init__
self._execute_child(args, executable, preexec_fn, close_fds,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Popen: returncode: None args: ['go', 'install', 'github.com/apache/beam/sdk...>
args = ['go', 'install', 'github.com/apache/beam/sdks/v2/go/cmd/prism']
executable = b'go', preexec_fn = None, close_fds = True, pass_fds = ()
cwd = None
env = {'GOBIN': '/home/runner/.apache_beam/cache/prism/bin', 'HOME': '/home/runner', 'LC_CTYPE': 'C.UTF-8', 'LD_LIBRARY_PATH': '/opt/hostedtoolcache/Python/3.9.22/x64/lib', ...}
startupinfo = None, creationflags = 0, shell = False, p2cread = -1
p2cwrite = -1, c2pread = 19, c2pwrite = 20, errread = -1, errwrite = 20
restore_signals = True, gid = None, gids = None, uid = None, umask = -1
start_new_session = False
def _execute_child(self, args, executable, preexec_fn, close_fds,
pass_fds, cwd, env,
startupinfo, creationflags, shell,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite,
restore_signals,
gid, gids, uid, umask,
start_new_session):
"""Execute program (POSIX version)"""
if isinstance(args, (str, bytes)):
args = [args]
elif isinstance(args, os.PathLike):
if shell:
raise TypeError('path-like args is not allowed when '
'shell is true')
args = [args]
else:
args = list(args)
if shell:
# On Android the default shell is at '/system/bin/sh'.
unix_shell = ('/system/bin/sh' if
hasattr(sys, 'getandroidapilevel') else '/bin/sh')
args = [unix_shell, "-c"] + args
if executable:
args[0] = executable
if executable is None:
executable = args[0]
sys.audit("subprocess.Popen", executable, args, cwd, env)
if (_USE_POSIX_SPAWN
and os.path.dirname(executable)
and preexec_fn is None
and not close_fds
and not pass_fds
and cwd is None
and (p2cread == -1 or p2cread > 2)
and (c2pwrite == -1 or c2pwrite > 2)
and (errwrite == -1 or errwrite > 2)
and not start_new_session
and gid is None
and gids is None
and uid is None
and umask < 0):
self._posix_spawn(args, executable, env, restore_signals,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
return
orig_executable = executable
# For transferring possible exec failure from child to parent.
# Data format: "exception name:hex errno:description"
# Pickle is not used; it is complex and involves memory allocation.
errpipe_read, errpipe_write = os.pipe()
# errpipe_write must not be in the standard io 0, 1, or 2 fd range.
low_fds_to_close = []
while errpipe_write < 3:
low_fds_to_close.append(errpipe_write)
errpipe_write = os.dup(errpipe_write)
for low_fd in low_fds_to_close:
os.close(low_fd)
try:
try:
# We must avoid complex work that could involve
# malloc or free in the child process to avoid
# potential deadlocks, thus we do all this here.
# and pass it to fork_exec()
if env is not None:
env_list = []
for k, v in env.items():
k = os.fsencode(k)
if b'=' in k:
raise ValueError("illegal environment variable name")
env_list.append(k + b'=' + os.fsencode(v))
else:
env_list = None # Use execv instead of execve.
executable = os.fsencode(executable)
if os.path.dirname(executable):
executable_list = (executable,)
else:
# This matches the behavior of os._execvpe().
executable_list = tuple(
os.path.join(os.fsencode(dir), executable)
for dir in os.get_exec_path(env))
fds_to_keep = set(pass_fds)
fds_to_keep.add(errpipe_write)
self.pid = _posixsubprocess.fork_exec(
args, executable_list,
close_fds, tuple(sorted(map(int, fds_to_keep))),
cwd, env_list,
p2cread, p2cwrite, c2pread, c2pwrite,
errread, errwrite,
errpipe_read, errpipe_write,
restore_signals, start_new_session,
gid, gids, uid, umask,
preexec_fn)
self._child_created = True
finally:
# be sure the FD is closed no matter what
os.close(errpipe_write)
self._close_pipe_fds(p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
# Wait for exec to fail or succeed; possibly raising an
# exception (limited in size)
errpipe_data = bytearray()
while True:
> part = os.read(errpipe_read, 50000)
E Failed: Timeout >600.0s
/opt/hostedtoolcache/Python/3.9.22/x64/lib/python3.9/subprocess.py:1793: Failed
Check warning on line 0 in apache_beam.io.avroio_test.TestFastAvro
github-actions / Test Results
All 2 runs failed: test_read_all_continuously_update (apache_beam.io.avroio_test.TestFastAvro)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 12s]
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 10s]
Raw output
apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] [while running 'assert read updated files results/Match']
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ('file1', {'favorite_color': 'green', ...orite_number': 7, 'name': 'Toby'}), ('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ...]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7ea1aaffd310>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})]
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
During handling of the above exception, another exception occurred:
self = <apache_beam.io.avroio_test.TestFastAvro testMethod=test_read_all_continuously_update>
def test_read_all_continuously_update(self):
with TestPipeline() as pipeline:
tempdir = tempfile.mkdtemp()
writer_fn = self._WriteFilesFn(self.SCHEMA, self.RECORDS, tempdir)
with open(FileSystems.join(tempdir, 'file1'), 'wb') as f:
writer(f, writer_fn.SCHEMA, writer_fn.gen_records(1))
match_pattern = FileSystems.join(tempdir, '*')
interval = 0.5
last = 2
p_read_upd = (
pipeline
| 'Continuously read updated files' >>
avroio.ReadAllFromAvroContinuously(
match_pattern,
with_filename=True,
start_timestamp=Timestamp.now(),
interval=interval,
stop_timestamp=Timestamp.now() + last,
match_updated_files=True)
| 'add dumb key' >> beam.Map(lambda x: (0, x))
| 'Write files on-the-fly' >> beam.ParDo(writer_fn))
> assert_that(
p_read_upd,
equal_to(writer_fn.get_expect(match_updated_files=True)),
label='assert read updated files results')
apache_beam/io/avroio_test.py:548:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:566: in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
apache_beam/runners/worker/operations.py:568: in apache_beam.runners.worker.operations.Operation.output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:259: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
def receive(self, windowed_value):
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ('file1', {'favorite_color': 'green', ...orite_number': 7, 'name': 'Toby'}), ('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ...]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7ea1aaffd310>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] [while running 'assert read updated files results/Match']
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
Check warning on line 0 in apache_beam.io.textio_test.TextSourceTest
github-actions / Test Results
All 2 runs failed: test_read_all_continuously_update (apache_beam.io.textio_test.TextSourceTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 12s]
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 10s]
Raw output
apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', 'first'), ('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')] == [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first'), ('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')], unexpected elements [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')], missing elements [('file1', 'first')] [while running 'assert read updated files results/Match']
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first'), ('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7a8900713c10>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', 'first'), ('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')] == [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first'), ('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')], unexpected elements [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')], missing elements [('file1', 'first')]
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
During handling of the above exception, another exception occurred:
self = <apache_beam.io.textio_test.TextSourceTest testMethod=test_read_all_continuously_update>
def test_read_all_continuously_update(self):
with TempDir() as tempdir, TestPipeline() as pipeline:
temp_path = tempdir.get_path()
# create a temp file at the beginning
with open(FileSystems.join(temp_path, 'file1'), 'w') as f:
f.write('first')
match_pattern = FileSystems.join(temp_path, '*')
interval = 0.5
last = 2
p_read_upd = (
pipeline
| 'Continuously read updated files' >> ReadAllFromTextContinuously(
match_pattern,
with_filename=True,
start_timestamp=Timestamp.now(),
interval=interval,
stop_timestamp=Timestamp.now() + last,
match_updated_files=True)
| 'add dumb key' >> beam.Map(lambda x: (0, x))
|
'Write files on-the-fly' >> beam.ParDo(self._WriteFilesFn(temp_path)))
> assert_that(
p_read_upd,
equal_to([('file1', 'first'), ('file1', 'second A'),
('file1', 'second B'), ('file2', 'first')]),
label='assert read updated files results')
apache_beam/io/textio_test.py:661:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:566: in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
apache_beam/runners/worker/operations.py:568: in apache_beam.runners.worker.operations.Operation.output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:259: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
def receive(self, windowed_value):
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first'), ('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7a8900713c10>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', 'first'), ('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')] == [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first'), ('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')], unexpected elements [('file1', 'second A'), ('file1', 'second B'), ('file2', 'first')], missing elements [('file1', 'first')] [while running 'assert read updated files results/Match']
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery
github-actions / Test Results
1 out of 2 runs failed: test_get_table_non_transient_exception_0 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 7s]
Raw output
AssertionError: Exception not raised
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_non_transient_exception_0>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_test.py:655: in test_get_table_non_transient_exception
_ = p | beam.io.ReadFromBigQuery(
E AssertionError: Exception not raised
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery
github-actions / Test Results
1 out of 2 runs failed: test_get_table_non_transient_exception_1 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 6s]
Raw output
AssertionError: Exception not raised
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_non_transient_exception_1>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_test.py:655: in test_get_table_non_transient_exception
_ = p | beam.io.ReadFromBigQuery(
E AssertionError: Exception not raised
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery
github-actions / Test Results
1 out of 2 runs failed: test_get_table_non_transient_exception_2 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 5s]
Raw output
AssertionError: 2 != 3
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_non_transient_exception_2>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_test.py:663: in test_get_table_non_transient_exception
self.assertEqual(expected_retries, mock_get_table.call_count - 1)
E AssertionError: 2 != 3
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery
github-actions / Test Results
1 out of 2 runs failed: test_get_table_transient_exception_0 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 5s]
Raw output
AssertionError: 0 != 2
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_transient_exception_0>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_test.py:548: in test_get_table_transient_exception
self.assertEqual(expected_retries, mock_get_table.call_count - 2)
E AssertionError: 0 != 2
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery
github-actions / Test Results
1 out of 2 runs failed: test_get_table_transient_exception_1 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 6s]
Raw output
AssertionError: 2 != 4
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_transient_exception_1>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_test.py:548: in test_get_table_transient_exception
self.assertEqual(expected_retries, mock_get_table.call_count - 2)
E AssertionError: 2 != 4
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery
github-actions / Test Results
1 out of 2 runs failed: test_get_table_transient_exception_2 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 5s]
Raw output
AssertionError: 3 != 5
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_transient_exception_2>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_test.py:548: in test_get_table_transient_exception
self.assertEqual(expected_retries, mock_get_table.call_count - 2)
E AssertionError: 3 != 5
Check warning on line 0 in apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery
github-actions / Test Results
1 out of 2 runs failed: test_streaming_inserts_flush_on_byte_size_limit (apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 9s]
Raw output
AssertionError: 2 != 4
self = <apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery testMethod=test_streaming_inserts_flush_on_byte_size_limit>
mock_insert = <MagicMock name='insert_rows_json' id='133528053047008'>
@mock.patch('google.cloud.bigquery.Client.insert_rows_json')
def test_streaming_inserts_flush_on_byte_size_limit(self, mock_insert):
mock_insert.return_value = []
table = 'project:dataset.table'
rows = [
{
'columnA': 'value1'
},
{
'columnA': 'value2'
},
# this very large row exceeds max size, so should be sent to DLQ
{
'columnA': "large_string" * 100
}
]
with beam.Pipeline() as p:
failed_rows = (
p
| beam.Create(rows)
| WriteToBigQuery(
table=table,
method='STREAMING_INSERTS',
create_disposition='CREATE_NEVER',
schema='columnA:STRING',
max_insert_payload_size=500))
expected_failed_rows = [(table, rows[2])]
assert_that(failed_rows.failed_rows, equal_to(expected_failed_rows))
> self.assertEqual(2, mock_insert.call_count)
E AssertionError: 2 != 4
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/bigquery_test.py:1096: AssertionError
Check warning on line 0 in apache_beam.io.mongodbio_test.WriteMongoFnTest
github-actions / Test Results
All 2 runs failed: test_process (apache_beam.io.mongodbio_test.WriteMongoFnTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 8s]
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 7s]
Raw output
AssertionError: 2 != 4
self = <apache_beam.io.mongodbio_test.WriteMongoFnTest testMethod=test_process>
mock_sink = <MagicMock name='_MongoSink' id='134156513693856'>
@mock.patch('apache_beam.io.mongodbio._MongoSink')
def test_process(self, mock_sink):
docs = [{'x': 1}, {'x': 2}, {'x': 3}]
with TestPipeline() as p:
_ = (
p | "Create" >> beam.Create(docs)
| "Write" >> beam.ParDo(_WriteMongoFn(batch_size=2)))
p.run()
> self.assertEqual(
2, mock_sink.return_value.__enter__.return_value.write.call_count)
E AssertionError: 2 != 4
apache_beam/io/mongodbio_test.py:620: AssertionError
Check warning on line 0 in apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest
github-actions / Test Results
1 out of 2 runs failed: test_read_with_index (apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 16s]
Raw output
RuntimeError: generator raised StopIteration [while running 'read all/Read From Partitions']
self = <apache_beam.io.gcp.experimental.spannerio._ReadFromPartitionFn object at 0x7bfa4c7c5310>
element = {'is_sql': False, 'is_table': True, 'partitions': {'partition': 'test_partition', 'read': {'columns': ['Key', 'Value'], 'index': '', 'keyset': {'all': True}, 'table': 'users'}}, 'read_operation': 'process_read_batch', ...}
def process(self, element):
self._snapshot = BatchSnapshot.from_dict(
self._database, element['transaction_info'])
table_id = self._spanner_configuration.table
query_name = self._spanner_configuration.query_name or ''
if element['is_sql'] is True:
read_action = self._snapshot.process_query_batch
self.service_metric = self._query_metric(query_name)
elif element['is_table'] is True:
read_action = self._snapshot.process_read_batch
self.service_metric = self._table_metric(table_id)
else:
raise ValueError(
"ReadOperation is improperly configure: %s" % str(element))
try:
> for row in read_action(element['partitions']):
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/experimental/spannerio.py:662:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/mock/mock.py:1190: in __call__
return _mock_self._mock_call(*args, **kwargs)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/mock/mock.py:1194: in _mock_call
return _mock_self._execute_mock_call(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_mock_self = <MagicMock name='BatchSnapshot.from_dict().process_read_batch' id='136315485214848'>
args = ({'partition': 'test_partition', 'read': {'columns': ['Key', 'Value'], 'index': '', 'keyset': {'all': True}, 'table': 'users'}},)
kwargs = {}
self = <MagicMock name='BatchSnapshot.from_dict().process_read_batch' id='136315485214848'>
effect = <list_iterator object at 0x7bfa6c136610>
def _execute_mock_call(_mock_self, *args, **kwargs):
self = _mock_self
# separate from _increment_mock_call so that awaited functions are
# executed separately from their call, also AsyncMock overrides this method
effect = self.side_effect
if effect is not None:
if _is_exception(effect):
raise effect
elif not _callable(effect):
> result = next(effect)
E StopIteration
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/mock/mock.py:1253: StopIteration
The above exception was the direct cause of the following exception:
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> for result in results:
E RuntimeError: generator raised StopIteration
apache_beam/runners/common.py:1670: RuntimeError
During handling of the above exception, another exception occurred:
self = <apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest testMethod=test_read_with_index>
mock_batch_snapshot_class = <MagicMock name='BatchSnapshot' id='136315485009808'>
mock_client_class = <MagicMock name='Client' id='136315485560304'>
def test_read_with_index(self, mock_batch_snapshot_class, mock_client_class):
mock_snapshot_instance = mock.MagicMock()
mock_snapshot_instance.generate_read_batches.return_value = [{
'read': {
'table': 'users',
'keyset': {
'all': True
},
'columns': ['Key', 'Value'],
'index': ''
},
'partition': 'test_partition'
} for _ in range(3)]
mock_batch_snapshot_instance = mock.MagicMock()
# Prepare process_read_batch return results for three pipelines
mock_batch_snapshot_instance.process_read_batch.side_effect = [
FAKE_ROWS[0:2], FAKE_ROWS[2:4], FAKE_ROWS[4:]
] * 3
mock_snapshot_instance.to_dict.return_value = {}
mock_client_class.return_value.instance.return_value.database.return_value \
.batch_snapshot.return_value = mock_snapshot_instance
mock_batch_snapshot_class.from_dict.return_value \
= mock_batch_snapshot_instance
ro = [ReadOperation.table("users", ["Key", "Value"], index="Key")]
with TestPipeline() as pipeline:
read = (
pipeline
| 'read' >> ReadFromSpanner(
TEST_PROJECT_ID,
TEST_INSTANCE_ID,
_generate_database_name(),
table="users",
columns=["Key", "Value"]))
assert_that(read, equal_to(FAKE_ROWS), label='checkRead')
with TestPipeline() as pipeline:
readall = (
pipeline
| 'read all' >> ReadFromSpanner(
TEST_PROJECT_ID,
TEST_INSTANCE_ID,
_generate_database_name(),
read_operations=ro))
> assert_that(readall, equal_to(FAKE_ROWS), label='checkReadAll')
apache_beam/io/gcp/experimental/spannerio_test.py:260:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:566: in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
apache_beam/runners/worker/operations.py:568: in apache_beam.runners.worker.operations.Operation.output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:259: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
def receive(self, windowed_value):
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> for result in results:
E RuntimeError: generator raised StopIteration [while running 'read all/Read From Partitions']
apache_beam/runners/common.py:1670: RuntimeError
Check warning on line 0 in apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest
github-actions / Test Results
1 out of 2 runs failed: test_read_with_query_batch (apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 21s]
Raw output
RuntimeError: generator raised StopIteration [while running 'read all/Read From Partitions']
self = <apache_beam.io.gcp.experimental.spannerio._ReadFromPartitionFn object at 0x7bfb186ee2b0>
element = {'is_sql': True, 'is_table': False, 'partitions': {'partition': 'test_partition', 'query': {'sql': 'SELECT * FROM users'}}, 'read_operation': 'process_query_batch', ...}
def process(self, element):
self._snapshot = BatchSnapshot.from_dict(
self._database, element['transaction_info'])
table_id = self._spanner_configuration.table
query_name = self._spanner_configuration.query_name or ''
if element['is_sql'] is True:
read_action = self._snapshot.process_query_batch
self.service_metric = self._query_metric(query_name)
elif element['is_table'] is True:
read_action = self._snapshot.process_read_batch
self.service_metric = self._table_metric(table_id)
else:
raise ValueError(
"ReadOperation is improperly configure: %s" % str(element))
try:
> for row in read_action(element['partitions']):
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/experimental/spannerio.py:662:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/mock/mock.py:1190: in __call__
return _mock_self._mock_call(*args, **kwargs)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/mock/mock.py:1194: in _mock_call
return _mock_self._execute_mock_call(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_mock_self = <MagicMock name='BatchSnapshot.from_dict().process_query_batch' id='136318375352016'>
args = ({'partition': 'test_partition', 'query': {'sql': 'SELECT * FROM users'}},)
kwargs = {}
self = <MagicMock name='BatchSnapshot.from_dict().process_query_batch' id='136318375352016'>
effect = <list_iterator object at 0x7bfb1847f400>
def _execute_mock_call(_mock_self, *args, **kwargs):
self = _mock_self
# separate from _increment_mock_call so that awaited functions are
# executed separately from their call, also AsyncMock overrides this method
effect = self.side_effect
if effect is not None:
if _is_exception(effect):
raise effect
elif not _callable(effect):
> result = next(effect)
E StopIteration
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/mock/mock.py:1253: StopIteration
The above exception was the direct cause of the following exception:
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> for result in results:
E RuntimeError: generator raised StopIteration
apache_beam/runners/common.py:1670: RuntimeError
During handling of the above exception, another exception occurred:
self = <apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest testMethod=test_read_with_query_batch>
mock_batch_snapshot_class = <MagicMock name='BatchSnapshot' id='136318041715424'>
mock_client_class = <MagicMock name='Client' id='136318041740528'>
def test_read_with_query_batch(
self, mock_batch_snapshot_class, mock_client_class):
mock_snapshot_instance = mock.MagicMock()
mock_snapshot_instance.generate_query_batches.return_value = [{
'query': {
'sql': 'SELECT * FROM users'
}, 'partition': 'test_partition'
} for _ in range(3)]
mock_snapshot_instance.to_dict.return_value = {}
mock_batch_snapshot_instance = mock.MagicMock()
# Prepare process_query_batch return results for three pipelines
mock_batch_snapshot_instance.process_query_batch.side_effect = [
FAKE_ROWS[0:2], FAKE_ROWS[2:4], FAKE_ROWS[4:]
] * 3
mock_client_class.return_value.instance.return_value.database.return_value \
.batch_snapshot.return_value = mock_snapshot_instance
mock_batch_snapshot_class.from_dict.return_value \
= mock_batch_snapshot_instance
ro = [ReadOperation.query("Select * from users")]
with TestPipeline() as pipeline:
read = (
pipeline
| 'read' >> ReadFromSpanner(
TEST_PROJECT_ID,
TEST_INSTANCE_ID,
_generate_database_name(),
sql="SELECT * FROM users"))
assert_that(read, equal_to(FAKE_ROWS), label='checkRead')
with TestPipeline() as pipeline:
readall = (
pipeline
| 'read all' >> ReadFromSpanner(
TEST_PROJECT_ID,
TEST_INSTANCE_ID,
_generate_database_name(),
read_operations=ro))
> assert_that(readall, equal_to(FAKE_ROWS), label='checkReadAll')
apache_beam/io/gcp/experimental/spannerio_test.py:123:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:566: in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
apache_beam/runners/worker/operations.py:568: in apache_beam.runners.worker.operations.Operation.output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:259: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
def receive(self, windowed_value):
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> for result in results:
E RuntimeError: generator raised StopIteration [while running 'read all/Read From Partitions']
apache_beam/runners/common.py:1670: RuntimeError
Check warning on line 0 in apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest
github-actions / Test Results
1 out of 2 runs failed: test_read_with_table_batch (apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 13s]
Raw output
RuntimeError: generator raised StopIteration [while running 'read all/Read From Partitions']
self = <apache_beam.io.gcp.experimental.spannerio._ReadFromPartitionFn object at 0x7bfa4c7beee0>
element = {'is_sql': False, 'is_table': True, 'partitions': {'partition': 'test_partition', 'read': {'columns': ['Key', 'Value'], 'index': '', 'keyset': {'all': True}, 'table': 'users'}}, 'read_operation': 'process_read_batch', ...}
def process(self, element):
self._snapshot = BatchSnapshot.from_dict(
self._database, element['transaction_info'])
table_id = self._spanner_configuration.table
query_name = self._spanner_configuration.query_name or ''
if element['is_sql'] is True:
read_action = self._snapshot.process_query_batch
self.service_metric = self._query_metric(query_name)
elif element['is_table'] is True:
read_action = self._snapshot.process_read_batch
self.service_metric = self._table_metric(table_id)
else:
raise ValueError(
"ReadOperation is improperly configure: %s" % str(element))
try:
> for row in read_action(element['partitions']):
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/gcp/experimental/spannerio.py:662:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/mock/mock.py:1190: in __call__
return _mock_self._mock_call(*args, **kwargs)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/mock/mock.py:1194: in _mock_call
return _mock_self._execute_mock_call(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_mock_self = <MagicMock name='BatchSnapshot.from_dict().process_read_batch' id='136315485165360'>
args = ({'partition': 'test_partition', 'read': {'columns': ['Key', 'Value'], 'index': '', 'keyset': {'all': True}, 'table': 'users'}},)
kwargs = {}
self = <MagicMock name='BatchSnapshot.from_dict().process_read_batch' id='136315485165360'>
effect = <list_iterator object at 0x7bfb3017e490>
def _execute_mock_call(_mock_self, *args, **kwargs):
self = _mock_self
# separate from _increment_mock_call so that awaited functions are
# executed separately from their call, also AsyncMock overrides this method
effect = self.side_effect
if effect is not None:
if _is_exception(effect):
raise effect
elif not _callable(effect):
> result = next(effect)
E StopIteration
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/mock/mock.py:1253: StopIteration
The above exception was the direct cause of the following exception:
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> for result in results:
E RuntimeError: generator raised StopIteration
apache_beam/runners/common.py:1670: RuntimeError
During handling of the above exception, another exception occurred:
self = <apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest testMethod=test_read_with_table_batch>
mock_batch_snapshot_class = <MagicMock name='BatchSnapshot' id='136319306221744'>
mock_client_class = <MagicMock name='Client' id='136315485396080'>
def test_read_with_table_batch(
self, mock_batch_snapshot_class, mock_client_class):
mock_snapshot_instance = mock.MagicMock()
mock_snapshot_instance.generate_read_batches.return_value = [{
'read': {
'table': 'users',
'keyset': {
'all': True
},
'columns': ['Key', 'Value'],
'index': ''
},
'partition': 'test_partition'
} for _ in range(3)]
mock_snapshot_instance.to_dict.return_value = {}
mock_batch_snapshot_instance = mock.MagicMock()
# Prepare process_read_batch return results for three pipelines
mock_batch_snapshot_instance.process_read_batch.side_effect = [
FAKE_ROWS[0:2], FAKE_ROWS[2:4], FAKE_ROWS[4:]
] * 3
mock_client_class.return_value.instance.return_value.database.return_value \
.batch_snapshot.return_value = mock_snapshot_instance
mock_batch_snapshot_class.from_dict.return_value \
= mock_batch_snapshot_instance
ro = [ReadOperation.table("users", ["Key", "Value"])]
with TestPipeline() as pipeline:
read = (
pipeline
| 'read' >> ReadFromSpanner(
TEST_PROJECT_ID,
TEST_INSTANCE_ID,
_generate_database_name(),
table="users",
columns=["Key", "Value"]))
assert_that(read, equal_to(FAKE_ROWS), label='checkRead')
with TestPipeline() as pipeline:
readall = (
pipeline
| 'read all' >> ReadFromSpanner(
TEST_PROJECT_ID,
TEST_INSTANCE_ID,
_generate_database_name(),
read_operations=ro))
> assert_that(readall, equal_to(FAKE_ROWS), label='checkReadAll')
apache_beam/io/gcp/experimental/spannerio_test.py:187:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:566: in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
apache_beam/runners/worker/operations.py:568: in apache_beam.runners.worker.operations.Operation.output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:259: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
def receive(self, windowed_value):
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> for result in results:
E RuntimeError: generator raised StopIteration [while running 'read all/Read From Partitions']
apache_beam/runners/common.py:1670: RuntimeError
Check warning on line 0 in apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest
github-actions / Test Results
1 out of 2 runs failed: test_read_with_transaction (apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 14s]
Raw output
AssertionError: 2 != 1
self = <apache_beam.io.gcp.experimental.spannerio_test.SpannerReadTest testMethod=test_read_with_transaction>
mock_batch_snapshot_class = <MagicMock name='BatchSnapshot' id='136318375350672'>
mock_client_class = <MagicMock name='Client' id='136318371558880'>
def test_read_with_transaction(
self, mock_batch_snapshot_class, mock_client_class):
mock_snapshot_instance = mock.MagicMock()
mock_snapshot_instance.to_dict.return_value = FAKE_TRANSACTION_INFO
mock_transaction_instance = mock.MagicMock()
mock_transaction_instance.execute_sql.return_value = FAKE_ROWS
mock_transaction_instance.read.return_value = FAKE_ROWS
mock_client_class.return_value.instance.return_value.database.return_value \
.batch_snapshot.return_value = mock_snapshot_instance
mock_client_class.return_value.instance.return_value.database.return_value \
.session.return_value.transaction.return_value.__enter__.return_value \
= mock_transaction_instance
ro = [ReadOperation.query("Select * from users")]
with TestPipeline() as p:
transaction = (
p | create_transaction(
project_id=TEST_PROJECT_ID,
instance_id=TEST_INSTANCE_ID,
database_id=_generate_database_name(),
exact_staleness=datetime.timedelta(seconds=10)))
read_query = (
p | 'with query' >> ReadFromSpanner(
project_id=TEST_PROJECT_ID,
instance_id=TEST_INSTANCE_ID,
database_id=_generate_database_name(),
transaction=transaction,
sql="Select * from users"))
assert_that(read_query, equal_to(FAKE_ROWS), label='checkQuery')
read_table = (
p | 'with table' >> ReadFromSpanner(
project_id=TEST_PROJECT_ID,
instance_id=TEST_INSTANCE_ID,
database_id=_generate_database_name(),
transaction=transaction,
table="users",
columns=["Key", "Value"]))
assert_that(read_table, equal_to(FAKE_ROWS), label='checkTable')
read_indexed_table = (
p | 'with index' >> ReadFromSpanner(
project_id=TEST_PROJECT_ID,
instance_id=TEST_INSTANCE_ID,
database_id=_generate_database_name(),
transaction=transaction,
table="users",
index="Key",
columns=["Key", "Value"]))
assert_that(
read_indexed_table, equal_to(FAKE_ROWS), label='checkTableIndex')
read = (
p | 'read all' >> ReadFromSpanner(
TEST_PROJECT_ID,
TEST_INSTANCE_ID,
_generate_database_name(),
transaction=transaction,
read_operations=ro))
assert_that(read, equal_to(FAKE_ROWS), label='checkReadAll')
read_pipeline = (
p
| 'create read operations' >> beam.Create(ro)
| 'reads' >> ReadFromSpanner(
TEST_PROJECT_ID,
TEST_INSTANCE_ID,
_generate_database_name(),
transaction=transaction))
assert_that(read_pipeline, equal_to(FAKE_ROWS), label='checkReadPipeline')
# transaction setup once
> self.assertEqual(mock_snapshot_instance.to_dict.call_count, 1)
E AssertionError: 2 != 1
apache_beam/io/gcp/experimental/spannerio_test.py:362: AssertionError
Check warning on line 0 in apache_beam.typehints.typecheck_test.RuntimeTypeCheckTest
github-actions / Test Results
All 2 runs failed: test_wrapper_pass_through (apache_beam.typehints.typecheck_test.RuntimeTypeCheckTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 6s]
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 8s]
Raw output
AssertionError: Lists differ: ['set[76 chars]down'] != ['set[76 chars]down', 'setup', 'start_bundle', 'process', 'pr[42 chars]own']
Second list contains 7 additional elements.
First extra element 7:
'setup'
['setup',
'start_bundle',
'process',
'process',
'process',
'finish_bundle',
+ 'teardown',
+ 'setup',
+ 'start_bundle',
+ 'process',
+ 'process',
+ 'process',
+ 'finish_bundle',
'teardown']
self = <apache_beam.typehints.typecheck_test.RuntimeTypeCheckTest testMethod=test_wrapper_pass_through>
def test_wrapper_pass_through(self):
# We use a file to check the result because the MyDoFn instance passed is
# not the same one that actually runs in the pipeline (it is serialized
# here and deserialized in the worker).
with tempfile.TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname + "tmp_filename")
dofn = MyDoFn(path)
result = self.p | beam.Create([1, 2, 3]) | beam.ParDo(dofn)
assert_that(result, equal_to([1, 2, 3]))
self.p.run()
with open(path, mode="r") as ft:
lines = [line.strip() for line in ft]
> self.assertListEqual([
'setup',
'start_bundle',
'process',
'process',
'process',
'finish_bundle',
'teardown',
],
lines)
E AssertionError: Lists differ: ['set[76 chars]down'] != ['set[76 chars]down', 'setup', 'start_bundle', 'process', 'pr[42 chars]own']
E
E Second list contains 7 additional elements.
E First extra element 7:
E 'setup'
E
E ['setup',
E 'start_bundle',
E 'process',
E 'process',
E 'process',
E 'finish_bundle',
E + 'teardown',
E + 'setup',
E + 'start_bundle',
E + 'process',
E + 'process',
E + 'process',
E + 'finish_bundle',
E 'teardown']
apache_beam/typehints/typecheck_test.py:113: AssertionError
Check warning on line 0 in apache_beam.io.fileio_test.MatchContinuouslyTest
github-actions / Test Results
All 2 runs failed: test_without_deduplication (apache_beam.io.fileio_test.MatchContinuouslyTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 10s]
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 7s]
Raw output
apache_beam.testing.util.BeamAssertException: Failed assert: ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra'] == ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra'], unexpected elements ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra'] [while running 'assert_that/Match']
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_...r/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra']
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7bc449df3310>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra'] == ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra'], unexpected elements ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra']
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
During handling of the above exception, another exception occurred:
self = <apache_beam.io.fileio_test.MatchContinuouslyTest testMethod=test_without_deduplication>
def test_without_deduplication(self):
interval = 0.2
start = Timestamp.now()
stop = start + interval + 0.1
files = []
tempdir = '%s%s' % (self._new_tempdir(), os.sep)
# Create a file to be matched before pipeline starts
file = self._create_temp_file(dir=tempdir)
# Add file twice, since it will be matched for every interval
files += [file, file]
# Add file name that will be created mid-pipeline
files.append(FileSystems.join(tempdir, 'extra'))
def _create_extra_file(element):
writer = FileSystems.create(FileSystems.join(tempdir, 'extra'))
writer.close()
return element.path
with TestPipeline() as p:
match_continiously = (
p
| fileio.MatchContinuously(
file_pattern=FileSystems.join(tempdir, '*'),
interval=interval,
has_deduplication=False,
start_timestamp=start,
stop_timestamp=stop)
| beam.Map(_create_extra_file))
> assert_that(match_continiously, equal_to(files))
apache_beam/io/fileio_test.py:386:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:566: in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
apache_beam/runners/worker/operations.py:568: in apache_beam.runners.worker.operations.Operation.output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:259: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
def receive(self, windowed_value):
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in apache_beam.runners.common.SimpleInvoker.invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in apache_beam.runners.common._OutputHandler.handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in apache_beam.runners.common._OutputHandler._write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:949: in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
apache_beam/runners/worker/operations.py:950: in apache_beam.runners.worker.operations.DoOperation.process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:911: in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_...r/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra']
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7bc449df3310>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra'] == ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/tmpv2ai0x7v', '/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra'], unexpected elements ['/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39/py39/tmp/tmpt59wk5_5/extra'] [while running 'assert_that/Match']
target/.tox-py39/py39/lib/python3.9/site-packages/apache_beam/testing/util.py:192: BeamAssertException
Check warning on line 0 in apache_beam.typehints.typed_pipeline_test.MainInputTest
github-actions / Test Results
1 out of 2 runs failed: test_filter_type_hint (apache_beam.typehints.typed_pipeline_test.MainInputTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 5s]
Raw output
AssertionError: Lists differ: [1, 3] != [1, 3, 1, 3]
Second list contains 2 additional elements.
First extra element 2:
1
- [1, 3]
+ [1, 3, 1, 3]
self = <apache_beam.typehints.typed_pipeline_test.MainInputTest testMethod=test_filter_type_hint>
def test_filter_type_hint(self):
@typehints.with_input_types(int)
def filter_fn(data):
return data % 2
> self.assertEqual([1, 3], [1, 2, 3] | beam.Filter(filter_fn))
E AssertionError: Lists differ: [1, 3] != [1, 3, 1, 3]
E
E Second list contains 2 additional elements.
E First extra element 2:
E 1
E
E - [1, 3]
E + [1, 3, 1, 3]
apache_beam/typehints/typed_pipeline_test.py:192: AssertionError
Check warning on line 0 in apache_beam.typehints.typed_pipeline_test.MainInputTest
github-actions / Test Results
1 out of 2 runs failed: test_non_function (apache_beam.typehints.typed_pipeline_test.MainInputTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 5s]
Raw output
AssertionError: Lists differ: ['A', 'BB', 'C'] != ['A', 'A', 'BB', 'BB', 'C', 'C']
First differing element 1:
'BB'
'A'
Second list contains 3 additional elements.
First extra element 3:
'BB'
- ['A', 'BB', 'C']
+ ['A', 'A', 'BB', 'BB', 'C', 'C']
self = <apache_beam.typehints.typed_pipeline_test.MainInputTest testMethod=test_non_function>
def test_non_function(self):
result = ['a', 'bb', 'c'] | beam.Map(str.upper)
> self.assertEqual(['A', 'BB', 'C'], sorted(result))
E AssertionError: Lists differ: ['A', 'BB', 'C'] != ['A', 'A', 'BB', 'BB', 'C', 'C']
E
E First differing element 1:
E 'BB'
E 'A'
E
E Second list contains 3 additional elements.
E First extra element 3:
E 'BB'
E
E - ['A', 'BB', 'C']
E + ['A', 'A', 'BB', 'BB', 'C', 'C']
apache_beam/typehints/typed_pipeline_test.py:56: AssertionError
Check warning on line 0 in apache_beam.io.textio_test.TextSinkTest
github-actions / Test Results
1 out of 2 runs failed: test_write_max_bytes_per_shard (apache_beam.io.textio_test.TextSinkTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 6s]
Raw output
AssertionError: Lists differ: [b'',[1868 chars]xxxxx', b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx[3530 chars]xxx'] != [b'',[1868 chars]xxxxxx', b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx[3526 chars]xxx']
First differing element 56:
b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
First list contains 1 additional elements.
First extra element 100:
b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
Diff is 6236 characters long. Set self.maxDiff to None to see it.
self = <apache_beam.io.textio_test.TextSinkTest testMethod=test_write_max_bytes_per_shard>
def test_write_max_bytes_per_shard(self):
bytes_per_shard = 300
max_len = 100
lines = [b'x' * i for i in range(max_len)]
header = b'a' * 20
footer = b'b' * 30
with TestPipeline() as p:
# pylint: disable=expression-not-assigned
p | beam.core.Create(lines) | WriteToText(
self.path,
header=header,
footer=footer,
max_bytes_per_shard=bytes_per_shard)
read_result = []
for file_name in glob.glob(self.path + '*'):
with open(file_name, 'rb') as f:
contents = f.read()
self.assertLessEqual(
len(contents), bytes_per_shard + max_len + len(footer) + 2)
shard_lines = list(contents.splitlines())
self.assertEqual(shard_lines[0], header)
self.assertEqual(shard_lines[-1], footer)
read_result.extend(shard_lines[1:-1])
> self.assertEqual(sorted(read_result), sorted(lines))
E AssertionError: Lists differ: [b'',[1868 chars]xxxxx', b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx[3530 chars]xxx'] != [b'',[1868 chars]xxxxxx', b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx[3526 chars]xxx']
E
E First differing element 56:
E b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
E b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
E
E First list contains 1 additional elements.
E First extra element 100:
E b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
E
E Diff is 6236 characters long. Set self.maxDiff to None to see it.
apache_beam/io/textio_test.py:1744: AssertionError
Check warning on line 0 in apache_beam.typehints.typed_pipeline_test.MainInputTest
github-actions / Test Results
1 out of 2 runs failed: test_typed_callable_instance (apache_beam.typehints.typed_pipeline_test.MainInputTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 4s]
Raw output
AssertionError: Lists differ: ['1', '2', '3'] != ['1', '1', '2', '2', '3', '3']
First differing element 1:
'2'
'1'
Second list contains 3 additional elements.
First extra element 3:
'2'
- ['1', '2', '3']
+ ['1', '1', '2', '2', '3', '3']
self = <apache_beam.typehints.typed_pipeline_test.MainInputTest testMethod=test_typed_callable_instance>
def test_typed_callable_instance(self):
# Type hints applied to ParDo instance take precedence over callable
# decorators and annotations.
@typehints.with_input_types(typehints.Tuple[int, int])
@typehints.with_output_types(typehints.Generator[int])
def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]:
yield str(element)
pardo = beam.ParDo(do_fn).with_input_types(int).with_output_types(str)
result = [1, 2, 3] | pardo
> self.assertEqual(['1', '2', '3'], sorted(result))
E AssertionError: Lists differ: ['1', '2', '3'] != ['1', '1', '2', '2', '3', '3']
E
E First differing element 1:
E '2'
E '1'
E
E Second list contains 3 additional elements.
E First extra element 3:
E '2'
E
E - ['1', '2', '3']
E + ['1', '1', '2', '2', '3', '3']
apache_beam/typehints/typed_pipeline_test.py:177: AssertionError
Check warning on line 0 in apache_beam.typehints.typed_pipeline_test.MainInputTest
github-actions / Test Results
1 out of 2 runs failed: test_typed_callable_iterable_output (apache_beam.typehints.typed_pipeline_test.MainInputTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 6s]
Raw output
AssertionError: Lists differ: [['1', '1'], ['2', '2']] != [['1', '1'], ['1', '1'], ['2', '2'], ['2', '2']]
First differing element 1:
['2', '2']
['1', '1']
Second list contains 2 additional elements.
First extra element 2:
['2', '2']
- [['1', '1'], ['2', '2']]
+ [['1', '1'], ['1', '1'], ['2', '2'], ['2', '2']]
self = <apache_beam.typehints.typed_pipeline_test.MainInputTest testMethod=test_typed_callable_iterable_output>
def test_typed_callable_iterable_output(self):
# Only the outer Iterable should be stripped.
def do_fn(element: int) -> typehints.Iterable[typehints.Iterable[str]]:
return [[str(element)] * 2]
result = [1, 2] | beam.ParDo(do_fn)
> self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
E AssertionError: Lists differ: [['1', '1'], ['2', '2']] != [['1', '1'], ['1', '1'], ['2', '2'], ['2', '2']]
E
E First differing element 1:
E ['2', '2']
E ['1', '1']
E
E Second list contains 2 additional elements.
E First extra element 2:
E ['2', '2']
E
E - [['1', '1'], ['2', '2']]
E + [['1', '1'], ['1', '1'], ['2', '2'], ['2', '2']]
apache_beam/typehints/typed_pipeline_test.py:141: AssertionError
Check warning on line 0 in apache_beam.typehints.typed_pipeline_test.MainInputTest
github-actions / Test Results
All 2 runs failed: test_typed_dofn_class (apache_beam.typehints.typed_pipeline_test.MainInputTest)
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloud.xml [took 5s]
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39.xml [took 5s]
Raw output
AssertionError: Lists differ: ['1', '2', '3'] != ['1', '1', '2', '2', '3', '3']
First differing element 1:
'2'
'1'
Second list contains 3 additional elements.
First extra element 3:
'2'
- ['1', '2', '3']
+ ['1', '1', '2', '2', '3', '3']
self = <apache_beam.typehints.typed_pipeline_test.MainInputTest testMethod=test_typed_dofn_class>
def test_typed_dofn_class(self):
@typehints.with_input_types(int)
@typehints.with_output_types(str)
class MyDoFn(beam.DoFn):
def process(self, element):
return [str(element)]
result = [1, 2, 3] | beam.ParDo(MyDoFn())
> self.assertEqual(['1', '2', '3'], sorted(result))
E AssertionError: Lists differ: ['1', '2', '3'] != ['1', '1', '2', '2', '3', '3']
E
E First differing element 1:
E '2'
E '1'
E
E Second list contains 3 additional elements.
E First extra element 3:
E '2'
E
E - ['1', '2', '3']
E + ['1', '1', '2', '2', '3', '3']
apache_beam/typehints/typed_pipeline_test.py:88: AssertionError