Enable prism by default (where supported) #26666
86 fail, 1,090 skipped, 6,856 pass in 1h 12m 56s
Annotations
test_read_write_0_csv (apache_beam.dataframe.io_test.IOTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 2m 8s]
Raw output
AssertionError: DataFrame are different
DataFrame shape mismatch
[left]: (3, 2)
[right]: (6, 2) [while running 'assert_that/Match']
self = <apache_beam.runners.common.DoFnRunner object at 0x79e61e86a340>
windowed_value = ([ label rank
2 389a 2, label rank
0 11a 0
1 37a 1
2 389a 2, label rank
0 11a 0
1 ...3371950454.773, (GlobalWindow,), PaneInfo(first: True, last: True, timing: ON_TIME, index: 0, nonspeculative_index: 0))
def process(self, windowed_value):
# type: (WindowedValue) -> Iterable[SplitResultResidual]
try:
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:684: in invoke_process
windowed_value, self.process_method(windowed_value.value))
apache_beam/transforms/core.py:2086: in <lambda>
wrapper = lambda x: [fn(x)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = label rank
2 389a 2
0 11a 0
1 37a 1
2 389a 2
0 11a 0
1 37a 1
def check(actual):
expected = expected_
try:
actual = pd.concat(actual)
if not check_index:
expected = expected.sort_values(list(
expected.columns)).reset_index(drop=True)
actual = actual.sort_values(list(
actual.columns)).reset_index(drop=True)
if not check_names:
actual = actual.rename(
columns=dict(zip(actual.columns, expected.columns)))
> return assert_frame_equal(expected, actual, check_like=True)
E AssertionError: DataFrame are different
E
E DataFrame shape mismatch
E [left]: (3, 2)
E [right]: (6, 2)
apache_beam/dataframe/io_test.py:215: AssertionError
During handling of the above exception, another exception occurred:
a = (<apache_beam.dataframe.io_test.IOTest testMethod=test_read_write_0_csv>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/dataframe/io_test.py:180: in test_read_write
self._run_read_write_test(
apache_beam/dataframe/io_test.py:242: in _run_read_write_test
assert_that(result, frame_equal_to(df, **check_options))
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:568: in output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in _reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:684: in invoke_process
windowed_value, self.process_method(windowed_value.value))
apache_beam/transforms/core.py:2086: in <lambda>
wrapper = lambda x: [fn(x)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = label rank
2 389a 2
0 11a 0
1 37a 1
2 389a 2
0 11a 0
1 37a 1
def check(actual):
expected = expected_
try:
actual = pd.concat(actual)
if not check_index:
expected = expected.sort_values(list(
expected.columns)).reset_index(drop=True)
actual = actual.sort_values(list(
actual.columns)).reset_index(drop=True)
if not check_names:
actual = actual.rename(
columns=dict(zip(actual.columns, expected.columns)))
> return assert_frame_equal(expected, actual, check_like=True)
E AssertionError: DataFrame are different
E
E DataFrame shape mismatch
E [left]: (3, 2)
E [right]: (6, 2) [while running 'assert_that/Match']
apache_beam/dataframe/io_test.py:215: AssertionError
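A note on the failure mode above: the check() helper quoted from apache_beam/dataframe/io_test.py concatenates whatever partitions the assertion receives, so a (3, 2) vs (6, 2) shape mismatch means each expected row arrived twice. A minimal pandas-only sketch, with hypothetical data not taken from the test suite, reproduces the same error:

# Hypothetical data; mirrors the pd.concat / sort_values / assert_frame_equal
# sequence in check() above. Duplicating the partition list stands in for a
# runner delivering every partition twice.
import pandas as pd
from pandas.testing import assert_frame_equal

expected = pd.DataFrame({'label': ['11a', '37a', '389a'], 'rank': [0, 1, 2]})
partitions = [expected.iloc[:1], expected.iloc[1:]] * 2  # each partition twice

actual = pd.concat(partitions)
actual = actual.sort_values(list(actual.columns)).reset_index(drop=True)
expected = expected.sort_values(list(expected.columns)).reset_index(drop=True)

assert_frame_equal(expected, actual, check_like=True)
# AssertionError: DataFrame are different
# DataFrame shape mismatch
# [left]:  (3, 2)
# [right]: (6, 2)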
test_get_table_non_transient_exception_0 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 8s]
Raw output
AssertionError: Exception not raised
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_non_transient_exception_0>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_test.py:655: in test_get_table_non_transient_exception
_ = p | beam.io.ReadFromBigQuery(
E AssertionError: Exception not raised
test_get_table_non_transient_exception_1 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 8s]
Raw output
AssertionError: Exception not raised
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_non_transient_exception_1>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_test.py:655: in test_get_table_non_transient_exception
_ = p | beam.io.ReadFromBigQuery(
E AssertionError: Exception not raised
test_get_table_non_transient_exception_2 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 8s]
Raw output
AssertionError: Exception not raised
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_non_transient_exception_2>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_test.py:655: in test_get_table_non_transient_exception
_ = p | beam.io.ReadFromBigQuery(
E AssertionError: Exception not raised
test_process (apache_beam.io.mongodbio_test.WriteMongoFnTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 8s]
Raw output
AssertionError: 2 != 4
self = <apache_beam.io.mongodbio_test.WriteMongoFnTest testMethod=test_process>
mock_sink = <MagicMock name='_MongoSink' id='135609300687360'>
@mock.patch('apache_beam.io.mongodbio._MongoSink')
def test_process(self, mock_sink):
docs = [{'x': 1}, {'x': 2}, {'x': 3}]
with TestPipeline() as p:
_ = (
p | "Create" >> beam.Create(docs)
| "Write" >> beam.ParDo(_WriteMongoFn(batch_size=2)))
p.run()
> self.assertEqual(
2, mock_sink.return_value.__enter__.return_value.write.call_count)
E AssertionError: 2 != 4
apache_beam/io/mongodbio_test.py:620: AssertionError
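Several failures in this run assert on mock call counts and come back exactly doubled (2 != 4 here, and again in the BigQuery streaming-inserts test further down). For reference, a MagicMock keeps one cumulative call_count across everything it observes, so any extra execution of the pipeline under test inflates the count. A hypothetical sketch using plain unittest.mock, not Beam code:

# Hypothetical stand-in for a mocked sink: if a pipeline body that flushes two
# batches runs twice, write.call_count ends up at 4 even though a single run
# only performs 2 writes.
from unittest import mock

sink = mock.MagicMock()

def run_pipeline_once():
    sink.write(['batch-1'])  # first flush
    sink.write(['batch-2'])  # second flush

run_pipeline_once()
run_pipeline_once()  # an unintended second execution

assert sink.write.call_count == 4  # a test expecting 2 now fails with 2 != 4

Note that the test body above also calls p.run() inside the with TestPipeline() as p: block, and Pipeline.__exit__ triggers another run() (visible at apache_beam/pipeline.py:644 in the other tracebacks), which is one way the same pipeline can execute more than once.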
test_get_table_transient_exception_0 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 8s]
Raw output
AssertionError: 0 != 2
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_transient_exception_0>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_test.py:548: in test_get_table_transient_exception
self.assertEqual(expected_retries, mock_get_table.call_count - 2)
E AssertionError: 0 != 2
test_get_table_transient_exception_1 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 8s]
Raw output
AssertionError: 2 != 4
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_transient_exception_1>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_test.py:548: in test_get_table_transient_exception
self.assertEqual(expected_retries, mock_get_table.call_count - 2)
E AssertionError: 2 != 4
test_get_table_transient_exception_2 (apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 9s]
Raw output
AssertionError: 3 != 5
a = (<apache_beam.io.gcp.bigquery_test.TestReadFromBigQuery testMethod=test_get_table_transient_exception_2>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/io/gcp/bigquery_test.py:548: in test_get_table_transient_exception
self.assertEqual(expected_retries, mock_get_table.call_count - 2)
E AssertionError: 3 != 5
test_streaming_inserts_flush_on_byte_size_limit (apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 11s]
Raw output
AssertionError: 2 != 4
self = <apache_beam.io.gcp.bigquery_test.TestWriteToBigQuery testMethod=test_streaming_inserts_flush_on_byte_size_limit>
mock_insert = <MagicMock name='insert_rows_json' id='134343294682832'>
@mock.patch('google.cloud.bigquery.Client.insert_rows_json')
def test_streaming_inserts_flush_on_byte_size_limit(self, mock_insert):
mock_insert.return_value = []
table = 'project:dataset.table'
rows = [
{
'columnA': 'value1'
},
{
'columnA': 'value2'
},
# this very large row exceeds max size, so should be sent to DLQ
{
'columnA': "large_string" * 100
}
]
with beam.Pipeline() as p:
failed_rows = (
p
| beam.Create(rows)
| WriteToBigQuery(
table=table,
method='STREAMING_INSERTS',
create_disposition='CREATE_NEVER',
schema='columnA:STRING',
max_insert_payload_size=500))
expected_failed_rows = [(table, rows[2])]
assert_that(failed_rows.failed_rows, equal_to(expected_failed_rows))
> self.assertEqual(2, mock_insert.call_count)
E AssertionError: 2 != 4
apache_beam/io/gcp/bigquery_test.py:1096: AssertionError
test_read_all_continuously_new (apache_beam.io.avroio_test.TestFastAvro) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 11s]
Raw output
apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'})] [while running 'assert read new files results/Match']
self = <apache_beam.runners.common.DoFnRunner object at 0x7cdbbc46ed00>
windowed_value = ([('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite...3371950454.773, (GlobalWindow,), PaneInfo(first: True, last: True, timing: ON_TIME, index: 0, nonspeculative_index: 0))
def process(self, windowed_value):
# type: (WindowedValue) -> Iterable[SplitResultResidual]
try:
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:911: in invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in _invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ('file1', {'favorite_color': 'green', ... 'favorite_number': 3, 'name': 'Henry'}), ('file2', {'favorite_color': 'brown', 'favorite_number': 7, 'name': 'Toby'})]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7cdbbc73a4c0>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'})]
apache_beam/testing/util.py:192: BeamAssertException
During handling of the above exception, another exception occurred:
self = <apache_beam.io.avroio_test.TestFastAvro testMethod=test_read_all_continuously_new>
def test_read_all_continuously_new(self):
with TestPipeline() as pipeline:
tempdir = tempfile.mkdtemp()
writer_fn = self._WriteFilesFn(self.SCHEMA, self.RECORDS, tempdir)
with open(FileSystems.join(tempdir, 'file1'), 'wb') as f:
writer(f, writer_fn.SCHEMA, writer_fn.gen_records(1))
match_pattern = FileSystems.join(tempdir, '*')
interval = 0.5
last = 2
p_read_once = (
pipeline
| 'Continuously read new files' >> avroio.ReadAllFromAvroContinuously(
match_pattern,
with_filename=True,
start_timestamp=Timestamp.now(),
interval=interval,
stop_timestamp=Timestamp.now() + last,
match_updated_files=False)
| 'add dumb key' >> beam.Map(lambda x: (0, x))
| 'Write files on-the-fly' >> beam.ParDo(writer_fn))
> assert_that(
p_read_once,
equal_to(writer_fn.get_expect(match_updated_files=False)),
label='assert read new files results')
apache_beam/io/avroio_test.py:521:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:568: in output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in _reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:911: in invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in _invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ('file1', {'favorite_color': 'green', ... 'favorite_number': 3, 'name': 'Henry'}), ('file2', {'favorite_color': 'brown', 'favorite_number': 7, 'name': 'Toby'})]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7cdbbc73a4c0>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'})] [while running 'assert read new files results/Match']
apache_beam/testing/util.py:192: BeamAssertException
test_read_all_continuously_update (apache_beam.io.avroio_test.TestFastAvro) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 11s]
Raw output
apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] [while running 'assert read updated files results/Match']
self = <apache_beam.runners.common.DoFnRunner object at 0x7cdbc01c7280>
windowed_value = ([('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite...3371950454.773, (GlobalWindow,), PaneInfo(first: True, last: True, timing: ON_TIME, index: 0, nonspeculative_index: 0))
def process(self, windowed_value):
# type: (WindowedValue) -> Iterable[SplitResultResidual]
try:
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:911: in invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in _invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ('file1', {'favorite_color': 'green', ...orite_number': 7, 'name': 'Toby'}), ('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ...]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7cdbbc5ae550>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})]
apache_beam/testing/util.py:192: BeamAssertException
During handling of the above exception, another exception occurred:
self = <apache_beam.io.avroio_test.TestFastAvro testMethod=test_read_all_continuously_update>
def test_read_all_continuously_update(self):
with TestPipeline() as pipeline:
tempdir = tempfile.mkdtemp()
writer_fn = self._WriteFilesFn(self.SCHEMA, self.RECORDS, tempdir)
with open(FileSystems.join(tempdir, 'file1'), 'wb') as f:
writer(f, writer_fn.SCHEMA, writer_fn.gen_records(1))
match_pattern = FileSystems.join(tempdir, '*')
interval = 0.5
last = 2
p_read_upd = (
pipeline
| 'Continuously read updated files' >>
avroio.ReadAllFromAvroContinuously(
match_pattern,
with_filename=True,
start_timestamp=Timestamp.now(),
interval=interval,
stop_timestamp=Timestamp.now() + last,
match_updated_files=True)
| 'add dumb key' >> beam.Map(lambda x: (0, x))
| 'Write files on-the-fly' >> beam.ParDo(writer_fn))
> assert_that(
p_read_upd,
equal_to(writer_fn.get_expect(match_updated_files=True)),
label='assert read updated files results')
apache_beam/io/avroio_test.py:548:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:568: in output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in _reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:911: in invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in _invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = [('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ('file1', {'favorite_color': 'green', ...orite_number': 7, 'name': 'Toby'}), ('file1', {'favorite_color': 'blue', 'favorite_number': 1, 'name': 'Thomas'}), ...]
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7cdbbc5ae550>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] == [('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'}), ('file1', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})], unexpected elements [('file1', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Thomas', 'favorite_number': 1, 'favorite_color': 'blue'}), ('file2', {'name': 'Henry', 'favorite_number': 3, 'favorite_color': 'green'}), ('file2', {'name': 'Toby', 'favorite_number': 7, 'favorite_color': 'brown'})] [while running 'assert read updated files results/Match']
apache_beam/testing/util.py:192: BeamAssertException
test_write_max_bytes_per_shard (apache_beam.io.textio_test.TextSinkTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 10s]
Raw output
AssertionError: Lists differ: [b'', b'', b'x', b'x', b'xx', b'xx', b'xxx', b'xx[10846 chars]xxx'] != [b'', b'x', b'xx', b'xxx', b'xxxx', b'xxxxx', b'x[5396 chars]xxx']
First differing element 1:
b''
b'x'
First list contains 100 additional elements.
First extra element 100:
b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
Diff is 11822 characters long. Set self.maxDiff to None to see it.
self = <apache_beam.io.textio_test.TextSinkTest testMethod=test_write_max_bytes_per_shard>
def test_write_max_bytes_per_shard(self):
bytes_per_shard = 300
max_len = 100
lines = [b'x' * i for i in range(max_len)]
header = b'a' * 20
footer = b'b' * 30
with TestPipeline() as p:
# pylint: disable=expression-not-assigned
p | beam.core.Create(lines) | WriteToText(
self.path,
header=header,
footer=footer,
max_bytes_per_shard=bytes_per_shard)
read_result = []
for file_name in glob.glob(self.path + '*'):
with open(file_name, 'rb') as f:
contents = f.read()
self.assertLessEqual(
len(contents), bytes_per_shard + max_len + len(footer) + 2)
shard_lines = list(contents.splitlines())
self.assertEqual(shard_lines[0], header)
self.assertEqual(shard_lines[-1], footer)
read_result.extend(shard_lines[1:-1])
> self.assertEqual(sorted(read_result), sorted(lines))
E AssertionError: Lists differ: [b'', b'', b'x', b'x', b'xx', b'xx', b'xxx', b'xx[10846 chars]xxx'] != [b'', b'x', b'xx', b'xxx', b'xxxx', b'xxxxx', b'x[5396 chars]xxx']
E
E First differing element 1:
E b''
E b'x'
E
E First list contains 100 additional elements.
E First extra element 100:
E b'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
E
E Diff is 11822 characters long. Set self.maxDiff to None to see it.
apache_beam/io/textio_test.py:1744: AssertionError
test_read_write_7_json (apache_beam.dataframe.io_test.IOTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 20s]
Raw output
AssertionError: DataFrame are different
DataFrame shape mismatch
[left]: (3, 2)
[right]: (6, 2) [while running 'assert_that/Match']
self = <apache_beam.runners.common.DoFnRunner object at 0x79e57077f5b0>
windowed_value = ([ label rank
2000000000000000000 37a 1
2000000000000000001 389a 2, label rank
0 1...3371950454.773, (GlobalWindow,), PaneInfo(first: True, last: True, timing: ON_TIME, index: 0, nonspeculative_index: 0))
def process(self, windowed_value):
# type: (WindowedValue) -> Iterable[SplitResultResidual]
try:
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:684: in invoke_process
windowed_value, self.process_method(windowed_value.value))
apache_beam/transforms/core.py:2086: in <lambda>
wrapper = lambda x: [fn(x)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = label rank
0 11a 0
1 11a 0
2 37a 1
3 37a 1
4 389a 2
5 389a 2
def check(actual):
expected = expected_
try:
actual = pd.concat(actual)
if not check_index:
expected = expected.sort_values(list(
expected.columns)).reset_index(drop=True)
actual = actual.sort_values(list(
actual.columns)).reset_index(drop=True)
if not check_names:
actual = actual.rename(
columns=dict(zip(actual.columns, expected.columns)))
> return assert_frame_equal(expected, actual, check_like=True)
E AssertionError: DataFrame are different
E
E DataFrame shape mismatch
E [left]: (3, 2)
E [right]: (6, 2)
apache_beam/dataframe/io_test.py:215: AssertionError
During handling of the above exception, another exception occurred:
a = (<apache_beam.dataframe.io_test.IOTest testMethod=test_read_write_7_json>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/dataframe/io_test.py:180: in test_read_write
self._run_read_write_test(
apache_beam/dataframe/io_test.py:242: in _run_read_write_test
assert_that(result, frame_equal_to(df, **check_options))
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:568: in output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in _reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:684: in invoke_process
windowed_value, self.process_method(windowed_value.value))
apache_beam/transforms/core.py:2086: in <lambda>
wrapper = lambda x: [fn(x)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = label rank
0 11a 0
1 11a 0
2 37a 1
3 37a 1
4 389a 2
5 389a 2
def check(actual):
expected = expected_
try:
actual = pd.concat(actual)
if not check_index:
expected = expected.sort_values(list(
expected.columns)).reset_index(drop=True)
actual = actual.sort_values(list(
actual.columns)).reset_index(drop=True)
if not check_names:
actual = actual.rename(
columns=dict(zip(actual.columns, expected.columns)))
> return assert_frame_equal(expected, actual, check_like=True)
E AssertionError: DataFrame are different
E
E DataFrame shape mismatch
E [left]: (3, 2)
E [right]: (6, 2) [while running 'assert_that/Match']
apache_beam/dataframe/io_test.py:215: AssertionError
test_write_max_records_per_shard (apache_beam.io.textio_test.TextSinkTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 10s]
Raw output
AssertionError: Lists differ: [b'0'[123 chars]', b'3', b'39', b'4', b'40', b'41', b'42', b'4[511 chars]'99'] != [b'0'[123 chars]', b'26', b'27', b'28', b'29', b'3', b'30', b'[511 chars]'99']
First differing element 19:
b'3'
b'26'
Diff is 1120 characters long. Set self.maxDiff to None to see it.
self = <apache_beam.io.textio_test.TextSinkTest testMethod=test_write_max_records_per_shard>
def test_write_max_records_per_shard(self):
records_per_shard = 13
lines = [str(i).encode('utf-8') for i in range(100)]
with TestPipeline() as p:
# pylint: disable=expression-not-assigned
p | beam.core.Create(lines) | WriteToText(
self.path, max_records_per_shard=records_per_shard)
read_result = []
for file_name in glob.glob(self.path + '*'):
with open(file_name, 'rb') as f:
shard_lines = list(f.read().splitlines())
self.assertLessEqual(len(shard_lines), records_per_shard)
read_result.extend(shard_lines)
> self.assertEqual(sorted(read_result), sorted(lines))
E AssertionError: Lists differ: [b'0'[123 chars]', b'3', b'39', b'4', b'40', b'41', b'42', b'4[511 chars]'99'] != [b'0'[123 chars]', b'26', b'27', b'28', b'29', b'3', b'30', b'[511 chars]'99']
E
E First differing element 19:
E b'3'
E b'26'
E
E Diff is 1120 characters long. Set self.maxDiff to None to see it.
apache_beam/io/textio_test.py:1718: AssertionError
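For reference: the per-shard size check in the loop above passes, and with records_per_shard = 13 writing all 100 lines requires at least ceil(100 / 13) = 8 shard files of at most 13 records each, so the failing assertion is about the combined, sorted contents rather than shard sizing. A worked check of that bound:

    import math

    records_per_shard = 13
    num_lines = 100
    # At most 13 records per file means at least ceil(100 / 13) = 8 shard files.
    assert math.ceil(num_lines / records_per_shard) == 8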
test_write_pipeline (apache_beam.io.textio_test.TextSinkTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 10s]
Raw output
AssertionError: Lists differ: [b'Line 0', b'Line 0', b'Line 1', b'Line 1', b'Line 10', b'L[2315 chars] 99'] != [b'Line 0', b'Line 1', b'Line 10', b'Line 11', b'Line 12', b[1125 chars] 99']
First differing element 1:
b'Line 0'
b'Line 1'
First list contains 100 additional elements.
First extra element 100:
b'Line 54'
Diff is 2980 characters long. Set self.maxDiff to None to see it.
self = <apache_beam.io.textio_test.TextSinkTest testMethod=test_write_pipeline>
def test_write_pipeline(self):
with TestPipeline() as pipeline:
pcoll = pipeline | beam.core.Create(self.lines)
pcoll | 'Write' >> WriteToText(self.path) # pylint: disable=expression-not-assigned
read_result = []
for file_name in glob.glob(self.path + '*'):
with open(file_name, 'rb') as f:
read_result.extend(f.read().splitlines())
> self.assertEqual(sorted(read_result), sorted(self.lines))
E AssertionError: Lists differ: [b'Line 0', b'Line 0', b'Line 1', b'Line 1', b'Line 10', b'L[2315 chars] 99'] != [b'Line 0', b'Line 1', b'Line 10', b'Line 11', b'Line 12', b[1125 chars] 99']
E
E First differing element 1:
E b'Line 0'
E b'Line 1'
E
E First list contains 100 additional elements.
E First extra element 100:
E b'Line 54'
E
E Diff is 2980 characters long. Set self.maxDiff to None to see it.
apache_beam/io/textio_test.py:1610: AssertionError
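Note: the first list above (the lines read back from the output files) starts b'Line 0', b'Line 0', b'Line 1', b'Line 1', ... and carries 100 extra elements, which is consistent with every written line appearing twice on disk. A small diagnostic sketch for spotting that pattern (read_result and the expected lines are the names used in the test above; the helper itself is illustrative):

    from collections import Counter

    def extra_occurrences(read_result, expected_lines):
        # Count how often each line was read back versus how often it was written.
        read_counts = Counter(read_result)
        expected_counts = Counter(expected_lines)
        return {line: read_counts[line] - expected_counts[line]
                for line in read_counts
                if read_counts[line] != expected_counts[line]}

In a duplicated-output failure like the one above this would report +1 for every line.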
test_insert_rows_json_intermittent_retriable_exception_0 (apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 13s]
Raw output
RuntimeError: StopIteration [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
args = (<apache_beam.io.gcp.bigquery_tools.BigQueryWrapper object at 0x7a2f3c5b44f0>, 'project', 'dataset', 'table', [{'colum...value1'}, {'columnA': 'value2'}], ['c9c970e9-533c-4b63-864e-3c772dbdb3a6-0', 'c9c970e9-533c-4b63-864e-3c772dbdb3a6-1'])
kwargs = {'ignore_unknown_values': False, 'skip_invalid_rows': True}
retry_intervals = <generator object FuzzedExponentialIntervals.__iter__ at 0x7a2f3c714970>
exn_traceback = None, sleep_interval = 14.107302135101596
@functools.wraps(fun)
def wrapper(*args, **kwargs):
retry_intervals = iter(
FuzzedExponentialIntervals(
initial_delay_secs,
num_retries,
factor,
fuzz=0.5 if fuzz else 0,
max_delay_secs=max_delay_secs,
stop_after_secs=stop_after_secs))
while True:
try:
return fun(*args, **kwargs)
except Exception as exn: # pylint: disable=broad-except
if not retry_filter(exn):
raise
# Get the traceback object for the current exception. The
# sys.exc_info() function returns a tuple with three elements:
# exception type, exception value, and exception traceback.
exn_traceback = sys.exc_info()[2]
try:
try:
> sleep_interval = next(retry_intervals)
E StopIteration
apache_beam/utils/retry.py:310: StopIteration
During handling of the above exception, another exception occurred:
self = <apache_beam.runners.common.DoFnRunner object at 0x7a2f3c15c7c0>
bundle_method = <bound method DoFnInvoker.invoke_finish_bundle of <apache_beam.runners.common.PerWindowInvoker object at 0x7a2f3c15c5e0>>
def _invoke_bundle_method(self, bundle_method):
try:
self.context.set_element(None)
> bundle_method()
apache_beam/runners/common.py:1552:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:621: in invoke_finish_bundle
self.signature.finish_bundle_method.method_value())
apache_beam/io/gcp/bigquery.py:1645: in finish_bundle
return self._flush_all_batches()
apache_beam/io/gcp/bigquery.py:1653: in _flush_all_batches
*[
apache_beam/io/gcp/bigquery.py:1654: in <listcomp>
self._flush_batch(destination)
apache_beam/io/gcp/bigquery.py:1683: in _flush_batch
passed, errors = self.bigquery_wrapper.insert_rows(
apache_beam/io/gcp/bigquery_tools.py:1293: in insert_rows
result, errors = self._insert_all_rows(
apache_beam/utils/retry.py:313: in wrapper
raise exn.with_traceback(exn_traceback)
apache_beam/utils/retry.py:300: in wrapper
return fun(*args, **kwargs)
apache_beam/io/gcp/bigquery_tools.py:744: in _insert_all_rows
errors = self.gcp_bq_client.insert_rows_json(
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1190: in __call__
return _mock_self._mock_call(*args, **kwargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1194: in _mock_call
return _mock_self._execute_mock_call(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_mock_self = <MagicMock name='insert_rows_json' id='134344757129520'>
args = ('project.dataset.table',)
kwargs = {'ignore_unknown_values': False, 'json_rows': [{'columnA': 'value1'}, {'columnA': 'value2'}], 'row_ids': ['c9c970e9-533c-4b63-864e-3c772dbdb3a6-0', 'c9c970e9-533c-4b63-864e-3c772dbdb3a6-1'], 'skip_invalid_rows': True, ...}
self = <MagicMock name='insert_rows_json' id='134344757129520'>
effect = <list_iterator object at 0x7a2f93861d00>
def _execute_mock_call(_mock_self, *args, **kwargs):
self = _mock_self
# separate from _increment_mock_call so that awaited functions are
# executed separately from their call, also AsyncMock overrides this method
effect = self.side_effect
if effect is not None:
if _is_exception(effect):
raise effect
elif not _callable(effect):
> result = next(effect)
E StopIteration
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1253: StopIteration
During handling of the above exception, another exception occurred:
a = (<apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling testMethod=test_insert_rows_json_intermittent_retriable_exception_0>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1468: in patched
return func(*newargs, **newkeywargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:93: in dummy_func
return orgfunc(*args, **kwargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1468: in patched
return func(*newargs, **newkeywargs)
apache_beam/io/gcp/bigquery_test.py:1540: in test_insert_rows_json_intermittent_retriable_exception
_ = (
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
apache_beam/runners/worker/bundle_processor.py:1282: in process_bundle
op.finish()
apache_beam/runners/worker/operations.py:985: in finish
self.dofn_runner.finish()
apache_beam/runners/common.py:1573: in finish
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
apache_beam/runners/common.py:1554: in _invoke_bundle_method
self._reraise_augmented(exn)
apache_beam/runners/common.py:1606: in _reraise_augmented
raise new_exn
apache_beam/runners/common.py:1552: in _invoke_bundle_method
bundle_method()
apache_beam/runners/common.py:621: in invoke_finish_bundle
self.signature.finish_bundle_method.method_value())
apache_beam/io/gcp/bigquery.py:1645: in finish_bundle
return self._flush_all_batches()
apache_beam/io/gcp/bigquery.py:1653: in _flush_all_batches
*[
apache_beam/io/gcp/bigquery.py:1654: in <listcomp>
self._flush_batch(destination)
apache_beam/io/gcp/bigquery.py:1683: in _flush_batch
passed, errors = self.bigquery_wrapper.insert_rows(
apache_beam/io/gcp/bigquery_tools.py:1293: in insert_rows
result, errors = self._insert_all_rows(
apache_beam/utils/retry.py:313: in wrapper
raise exn.with_traceback(exn_traceback)
apache_beam/utils/retry.py:300: in wrapper
return fun(*args, **kwargs)
apache_beam/io/gcp/bigquery_tools.py:744: in _insert_all_rows
errors = self.gcp_bq_client.insert_rows_json(
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1190: in __call__
return _mock_self._mock_call(*args, **kwargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1194: in _mock_call
return _mock_self._execute_mock_call(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_mock_self = <MagicMock name='insert_rows_json' id='134344757129520'>
args = ('project.dataset.table',)
kwargs = {'ignore_unknown_values': False, 'json_rows': [{'columnA': 'value1'}, {'columnA': 'value2'}], 'row_ids': ['c9c970e9-533c-4b63-864e-3c772dbdb3a6-0', 'c9c970e9-533c-4b63-864e-3c772dbdb3a6-1'], 'skip_invalid_rows': True, ...}
self = <MagicMock name='insert_rows_json' id='134344757129520'>
effect = <list_iterator object at 0x7a2f93861d00>
def _execute_mock_call(_mock_self, *args, **kwargs):
self = _mock_self
# separate from _increment_mock_call so that awaited functions are
# executed separately from their call, also AsyncMock overrides this method
effect = self.side_effect
if effect is not None:
if _is_exception(effect):
raise effect
elif not _callable(effect):
> result = next(effect)
E RuntimeError: StopIteration [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1253: RuntimeError
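Note on the error chain above: gcp_bq_client.insert_rows_json is a MagicMock whose side_effect is a list iterator, so once the configured responses are consumed the next call reaches result = next(effect) and raises StopIteration, which the runner then re-raises with the step label attached (the final RuntimeError: StopIteration [while running ...]). A minimal illustration of that mock behaviour, using only unittest.mock (the two-response setup is illustrative, not the test's exact configuration):

    from unittest import mock

    client = mock.MagicMock()
    # A list side_effect yields one entry per call, then the iterator is exhausted.
    client.insert_rows_json.side_effect = [[], []]

    client.insert_rows_json()  # 1st call -> []
    client.insert_rows_json()  # 2nd call -> []
    try:
        client.insert_rows_json()  # 3rd call: no side effect left
    except StopIteration:
        print('mock called more times than side effects were configured for')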
test_schema_read_write (apache_beam.io.avroio_test.TestFastAvro) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 23s]
Raw output
apache_beam.testing.util.BeamAssertException: Failed assert: ['{"a": 1, "b": ["x", "y"]}', '{"a": 2, "b": ["t", "u"]}'] == ['{"a": 1, "b": ["x", "y"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 1, "b": ["x", "y"]}'], unexpected elements ['{"a": 2, "b": ["t", "u"]}', '{"a": 1, "b": ["x", "y"]}'] [while running 'assert_that/Match']
self = <apache_beam.runners.common.DoFnRunner object at 0x7cdb7c5eff40>
windowed_value = (['{"a": 1, "b": ["x", "y"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 1, "b": ["x", "y"]}'],...3371950454.773, (GlobalWindow,), PaneInfo(first: True, last: True, timing: ON_TIME, index: 0, nonspeculative_index: 0))
def process(self, windowed_value):
# type: (WindowedValue) -> Iterable[SplitResultResidual]
try:
> return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:1495:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:911: in invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in _invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = ['{"a": 1, "b": ["x", "y"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 1, "b": ["x", "y"]}']
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7cdbbc50daf0>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: ['{"a": 1, "b": ["x", "y"]}', '{"a": 2, "b": ["t", "u"]}'] == ['{"a": 1, "b": ["x", "y"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 1, "b": ["x", "y"]}'], unexpected elements ['{"a": 2, "b": ["t", "u"]}', '{"a": 1, "b": ["x", "y"]}']
apache_beam/testing/util.py:192: BeamAssertException
During handling of the above exception, another exception occurred:
self = <apache_beam.io.avroio_test.TestFastAvro testMethod=test_schema_read_write>
def test_schema_read_write(self):
with tempfile.TemporaryDirectory() as tmp_dirname:
path = os.path.join(tmp_dirname, 'tmp_filename')
rows = [beam.Row(a=1, b=['x', 'y']), beam.Row(a=2, b=['t', 'u'])]
stable_repr = lambda row: json.dumps(row._asdict())
with TestPipeline() as p:
_ = p | Create(rows) | avroio.WriteToAvro(path) | beam.Map(print)
with TestPipeline() as p:
readback = (
p
| avroio.ReadFromAvro(path + '*', as_rows=True)
| beam.Map(stable_repr))
> assert_that(readback, equal_to([stable_repr(r) for r in rows]))
apache_beam/io/avroio_test.py:171:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/testing/test_pipeline.py:115: in run
result = super().run(
apache_beam/pipeline.py:591: in run
return Pipeline.from_runner_api(
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
apache_beam/runners/worker/bundle_processor.py:1274: in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
apache_beam/runners/worker/bundle_processor.py:237: in process_encoded
self.output(decoded_value)
apache_beam/runners/worker/operations.py:568: in output
_cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1585: in _reraise_augmented
raise exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:683: in invoke_process
self.output_handler.handle_process_outputs(
apache_beam/runners/common.py:1680: in handle_process_outputs
self._write_value_to_tag(tag, windowed_value, watermark_estimator)
apache_beam/runners/common.py:1793: in _write_value_to_tag
self.main_receivers.receive(windowed_value)
apache_beam/runners/worker/operations.py:262: in receive
self.consumer.process(windowed_value)
apache_beam/runners/worker/operations.py:950: in process
delayed_applications = self.dofn_runner.process(o)
apache_beam/runners/common.py:1497: in process
self._reraise_augmented(exn, windowed_value)
apache_beam/runners/common.py:1606: in _reraise_augmented
raise new_exn
apache_beam/runners/common.py:1495: in process
return self.do_fn_invoker.invoke_process(windowed_value)
apache_beam/runners/common.py:911: in invoke_process
self._invoke_process_per_window(
apache_beam/runners/common.py:1055: in _invoke_process_per_window
self.process_method(*args_for_process, **kwargs_for_process),
apache_beam/transforms/core.py:2084: in <lambda>
wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
actual = ['{"a": 1, "b": ["x", "y"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 1, "b": ["x", "y"]}']
equals_fn = <function equal_to.<locals>._equal.<locals>.<lambda> at 0x7cdbbc50daf0>
def _equal(actual, equals_fn=equals_fn):
expected_list = list(expected)
# Try to compare actual and expected by sorting. This fails with a
# TypeError in Python 3 if different types are present in the same
# collection. It can also raise false negatives for types that don't have
# a deterministic sort order, like pyarrow Tables as of 0.14.1
if not equals_fn:
equals_fn = lambda e, a: e == a
try:
sorted_expected = sorted(expected)
sorted_actual = sorted(actual)
if sorted_expected == sorted_actual:
return
except TypeError:
pass
# Slower method, used in two cases:
# 1) If sorted expected != actual, use this method to verify the inequality.
# This ensures we don't raise any false negatives for types that don't
# have a deterministic sort order.
# 2) As a fallback if we encounter a TypeError in python 3. this method
# works on collections that have different types.
unexpected = []
for element in actual:
found = False
for i, v in enumerate(expected_list):
if equals_fn(v, element):
found = True
expected_list.pop(i)
break
if not found:
unexpected.append(element)
if unexpected or expected_list:
msg = 'Failed assert: %r == %r' % (expected, actual)
if unexpected:
msg = msg + ', unexpected elements %r' % unexpected
if expected_list:
msg = msg + ', missing elements %r' % expected_list
> raise BeamAssertException(msg)
E apache_beam.testing.util.BeamAssertException: Failed assert: ['{"a": 1, "b": ["x", "y"]}', '{"a": 2, "b": ["t", "u"]}'] == ['{"a": 1, "b": ["x", "y"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 2, "b": ["t", "u"]}', '{"a": 1, "b": ["x", "y"]}'], unexpected elements ['{"a": 2, "b": ["t", "u"]}', '{"a": 1, "b": ["x", "y"]}'] [while running 'assert_that/Match']
apache_beam/testing/util.py:192: BeamAssertException
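Note: the readback above contains each of the two rows twice, so equal_to's fallback comparison reports the extra copies as unexpected elements. The same assertion shape can be reproduced with a self-contained pipeline using the public testing utilities (illustrative string values, not the Avro rows from the test):

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    # Duplicated elements against equal_to([...]) raise BeamAssertException with
    # "unexpected elements [...]" when the pipeline runs at the end of the with block.
    with TestPipeline() as p:
        readback = p | beam.Create(['row-1', 'row-2', 'row-2', 'row-1'])
        assert_that(readback, equal_to(['row-1', 'row-2']))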
test_insert_rows_json_intermittent_retriable_exception_1 (apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 9s]
Raw output
RuntimeError: StopIteration [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
args = (<apache_beam.io.gcp.bigquery_tools.BigQueryWrapper object at 0x7a2f93108520>, 'project', 'dataset', 'table', [{'colum...value1'}, {'columnA': 'value2'}], ['d1613e4b-ebd5-4130-8f88-5d70472598df-0', 'd1613e4b-ebd5-4130-8f88-5d70472598df-1'])
kwargs = {'ignore_unknown_values': False, 'skip_invalid_rows': True}
retry_intervals = <generator object FuzzedExponentialIntervals.__iter__ at 0x7a2eb47dd510>
exn_traceback = None, sleep_interval = 12.170291026146646
@functools.wraps(fun)
def wrapper(*args, **kwargs):
retry_intervals = iter(
FuzzedExponentialIntervals(
initial_delay_secs,
num_retries,
factor,
fuzz=0.5 if fuzz else 0,
max_delay_secs=max_delay_secs,
stop_after_secs=stop_after_secs))
while True:
try:
return fun(*args, **kwargs)
except Exception as exn: # pylint: disable=broad-except
if not retry_filter(exn):
raise
# Get the traceback object for the current exception. The
# sys.exc_info() function returns a tuple with three elements:
# exception type, exception value, and exception traceback.
exn_traceback = sys.exc_info()[2]
try:
try:
> sleep_interval = next(retry_intervals)
E StopIteration
apache_beam/utils/retry.py:310: StopIteration
During handling of the above exception, another exception occurred:
self = <apache_beam.runners.common.DoFnRunner object at 0x7a2f93623670>
bundle_method = <bound method DoFnInvoker.invoke_finish_bundle of <apache_beam.runners.common.PerWindowInvoker object at 0x7a2f409a6c70>>
def _invoke_bundle_method(self, bundle_method):
try:
self.context.set_element(None)
> bundle_method()
apache_beam/runners/common.py:1552:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam/runners/common.py:621: in invoke_finish_bundle
self.signature.finish_bundle_method.method_value())
apache_beam/io/gcp/bigquery.py:1645: in finish_bundle
return self._flush_all_batches()
apache_beam/io/gcp/bigquery.py:1653: in _flush_all_batches
*[
apache_beam/io/gcp/bigquery.py:1654: in <listcomp>
self._flush_batch(destination)
apache_beam/io/gcp/bigquery.py:1683: in _flush_batch
passed, errors = self.bigquery_wrapper.insert_rows(
apache_beam/io/gcp/bigquery_tools.py:1293: in insert_rows
result, errors = self._insert_all_rows(
apache_beam/utils/retry.py:313: in wrapper
raise exn.with_traceback(exn_traceback)
apache_beam/utils/retry.py:300: in wrapper
return fun(*args, **kwargs)
apache_beam/io/gcp/bigquery_tools.py:744: in _insert_all_rows
errors = self.gcp_bq_client.insert_rows_json(
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1190: in __call__
return _mock_self._mock_call(*args, **kwargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1194: in _mock_call
return _mock_self._execute_mock_call(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_mock_self = <MagicMock name='insert_rows_json' id='134341618634656'>
args = ('project.dataset.table',)
kwargs = {'ignore_unknown_values': False, 'json_rows': [{'columnA': 'value1'}, {'columnA': 'value2'}], 'row_ids': ['d1613e4b-ebd5-4130-8f88-5d70472598df-0', 'd1613e4b-ebd5-4130-8f88-5d70472598df-1'], 'skip_invalid_rows': True, ...}
self = <MagicMock name='insert_rows_json' id='134341618634656'>
effect = <list_iterator object at 0x7a2efc7273a0>
def _execute_mock_call(_mock_self, *args, **kwargs):
self = _mock_self
# separate from _increment_mock_call so that awaited functions are
# executed separately from their call, also AsyncMock overrides this method
effect = self.side_effect
if effect is not None:
if _is_exception(effect):
raise effect
elif not _callable(effect):
> result = next(effect)
E StopIteration
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1253: StopIteration
During handling of the above exception, another exception occurred:
a = (<apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling testMethod=test_insert_rows_json_intermittent_retriable_exception_1>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1468: in patched
return func(*newargs, **newkeywargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:93: in dummy_func
return orgfunc(*args, **kwargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1468: in patched
return func(*newargs, **newkeywargs)
apache_beam/io/gcp/bigquery_test.py:1540: in test_insert_rows_json_intermittent_retriable_exception
_ = (
apache_beam/pipeline.py:644: in __exit__
self.result = self.run()
apache_beam/pipeline.py:618: in run
return self.runner.run_pipeline(self, self._options)
apache_beam/runners/direct/direct_runner.py:190: in run_pipeline
return runner.run_pipeline(pipeline, options)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:196: in run_pipeline
self._latest_run_result = self.run_via_runner_api(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:223: in run_via_runner_api
return self.run_stages(stage_context, stages)
apache_beam/runners/portability/fn_api_runner/fn_runner.py:470: in run_stages
bundle_results = self._execute_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:795: in _execute_bundle
self._run_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034: in _run_bundle
result, splits = bundle_manager.process_bundle(
apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360: in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
apache_beam/runners/portability/fn_api_runner/worker_handlers.py:386: in push
response = self.worker.do_instruction(request)
apache_beam/runners/worker/sdk_worker.py:658: in do_instruction
return getattr(self, request_type)(
apache_beam/runners/worker/sdk_worker.py:696: in process_bundle
bundle_processor.process_bundle(instruction_id))
apache_beam/runners/worker/bundle_processor.py:1282: in process_bundle
op.finish()
apache_beam/runners/worker/operations.py:985: in finish
self.dofn_runner.finish()
apache_beam/runners/common.py:1573: in finish
self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
apache_beam/runners/common.py:1554: in _invoke_bundle_method
self._reraise_augmented(exn)
apache_beam/runners/common.py:1606: in _reraise_augmented
raise new_exn
apache_beam/runners/common.py:1552: in _invoke_bundle_method
bundle_method()
apache_beam/runners/common.py:621: in invoke_finish_bundle
self.signature.finish_bundle_method.method_value())
apache_beam/io/gcp/bigquery.py:1645: in finish_bundle
return self._flush_all_batches()
apache_beam/io/gcp/bigquery.py:1653: in _flush_all_batches
*[
apache_beam/io/gcp/bigquery.py:1654: in <listcomp>
self._flush_batch(destination)
apache_beam/io/gcp/bigquery.py:1683: in _flush_batch
passed, errors = self.bigquery_wrapper.insert_rows(
apache_beam/io/gcp/bigquery_tools.py:1293: in insert_rows
result, errors = self._insert_all_rows(
apache_beam/utils/retry.py:313: in wrapper
raise exn.with_traceback(exn_traceback)
apache_beam/utils/retry.py:300: in wrapper
return fun(*args, **kwargs)
apache_beam/io/gcp/bigquery_tools.py:744: in _insert_all_rows
errors = self.gcp_bq_client.insert_rows_json(
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1190: in __call__
return _mock_self._mock_call(*args, **kwargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1194: in _mock_call
return _mock_self._execute_mock_call(*args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_mock_self = <MagicMock name='insert_rows_json' id='134341618634656'>
args = ('project.dataset.table',)
kwargs = {'ignore_unknown_values': False, 'json_rows': [{'columnA': 'value1'}, {'columnA': 'value2'}], 'row_ids': ['d1613e4b-ebd5-4130-8f88-5d70472598df-0', 'd1613e4b-ebd5-4130-8f88-5d70472598df-1'], 'skip_invalid_rows': True, ...}
self = <MagicMock name='insert_rows_json' id='134341618634656'>
effect = <list_iterator object at 0x7a2efc7273a0>
def _execute_mock_call(_mock_self, *args, **kwargs):
self = _mock_self
# separate from _increment_mock_call so that awaited functions are
# executed separately from their call, also AsyncMock overrides this method
effect = self.side_effect
if effect is not None:
if _is_exception(effect):
raise effect
elif not _callable(effect):
> result = next(effect)
E RuntimeError: StopIteration [while running 'WriteToBigQuery/_StreamToBigQuery/StreamInsertRows/ParDo(BigQueryWriteFn)']
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1253: RuntimeError
test_insert_rows_json_persistent_retriable_exception_0 (apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 11s]
Raw output
AssertionError: 4 != 8
a = (<apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling testMethod=test_insert_rows_json_persistent_retriable_exception_0>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1468: in patched
return func(*newargs, **newkeywargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:93: in dummy_func
return orgfunc(*args, **kwargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1468: in patched
return func(*newargs, **newkeywargs)
apache_beam/io/gcp/bigquery_test.py:1514: in test_insert_rows_json_persistent_retriable_exception
self.assertEqual(expected_call_count, mock_send.call_count)
E AssertionError: 4 != 8
test_write_pipeline_non_globalwindow_input (apache_beam.io.textio_test.TextSinkTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 11s]
Raw output
AssertionError: Lists differ: [b'Line 0', b'Line 0', b'Line 1', b'Line 1', b'Line 10', b'L[2315 chars] 99'] != [b'Line 0', b'Line 1', b'Line 10', b'Line 11', b'Line 12', b[1125 chars] 99']
First differing element 1:
b'Line 0'
b'Line 1'
First list contains 100 additional elements.
First extra element 100:
b'Line 54'
Diff is 2980 characters long. Set self.maxDiff to None to see it.
self = <apache_beam.io.textio_test.TextSinkTest testMethod=test_write_pipeline_non_globalwindow_input>
def test_write_pipeline_non_globalwindow_input(self):
with TestPipeline() as p:
_ = (
p
| beam.core.Create(self.lines)
| beam.WindowInto(beam.transforms.window.FixedWindows(1))
| 'Write' >> WriteToText(self.path))
read_result = []
for file_name in glob.glob(self.path + '*'):
with open(file_name, 'rb') as f:
read_result.extend(f.read().splitlines())
> self.assertEqual(sorted(read_result), sorted(self.lines))
E AssertionError: Lists differ: [b'Line 0', b'Line 0', b'Line 1', b'Line 1', b'Line 10', b'L[2315 chars] 99'] != [b'Line 0', b'Line 1', b'Line 10', b'Line 11', b'Line 12', b[1125 chars] 99']
E
E First differing element 1:
E b'Line 0'
E b'Line 1'
E
E First list contains 100 additional elements.
E First extra element 100:
E b'Line 54'
E
E Diff is 2980 characters long. Set self.maxDiff to None to see it.
apache_beam/io/textio_test.py:1625: AssertionError
test_insert_rows_json_persistent_retriable_exception_1 (apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 9s]
Raw output
AssertionError: 4 != 8
a = (<apache_beam.io.gcp.bigquery_test.BigQueryStreamingInsertsErrorHandling testMethod=test_insert_rows_json_persistent_retriable_exception_1>,)
kw = {}
@wraps(func)
def standalone_func(*a, **kw):
> return func(*(a + p.args), **p.kwargs, **kw)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:620:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1468: in patched
return func(*newargs, **newkeywargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/parameterized/parameterized.py:93: in dummy_func
return orgfunc(*args, **kwargs)
target/.tox-py39-cloudcoverage/py39-cloudcoverage/lib/python3.9/site-packages/mock/mock.py:1468: in patched
return func(*newargs, **newkeywargs)
apache_beam/io/gcp/bigquery_test.py:1514: in test_insert_rows_json_persistent_retriable_exception
self.assertEqual(expected_call_count, mock_send.call_count)
E AssertionError: 4 != 8
test_fixed_shard_write (apache_beam.io.filebasedsink_test.TestFileBasedSink) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 10s]
Raw output
AssertionError: False is not true : [start][b][end][start][end][start][b][end]
self = <apache_beam.io.filebasedsink_test.TestFileBasedSink testMethod=test_fixed_shard_write>
def test_fixed_shard_write(self):
temp_path = os.path.join(self._new_tempdir(), 'empty')
sink = MyFileBasedSink(
temp_path,
file_name_suffix='.output',
num_shards=3,
shard_name_template='_NN_SSS_',
coder=coders.ToBytesCoder())
with TestPipeline() as p:
p | beam.Create(['a', 'b']) | beam.io.Write(sink) # pylint: disable=expression-not-assigned
concat = ''.join(
open(temp_path + '_03_%03d_.output' % shard_num).read()
for shard_num in range(3))
> self.assertTrue('][a][' in concat, concat)
E AssertionError: False is not true : [start][b][end][start][end][start][b][end]
apache_beam/io/filebasedsink_test.py:193: AssertionError
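Note: the concatenated shard contents above ([start][b][end][start][end][start][b][end]) show the 'b' record written into two different shards, one shard left empty, and 'a' missing entirely. For reference, the shard files the test reads back expand from shard_name_template='_NN_SSS_' (NN = total shard count, SSS = zero-padded shard index), as in this sketch (temp_path is a placeholder for the test's temp directory path):

    # Shard file names read back by the test above for num_shards=3.
    temp_path = '<tempdir>/empty'
    shard_files = [temp_path + '_03_%03d_.output' % shard_num for shard_num in range(3)]
    # -> ['<tempdir>/empty_03_000_.output', '<tempdir>/empty_03_001_.output',
    #     '<tempdir>/empty_03_002_.output']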
test_failure (apache_beam.dataframe.doctests_test.DoctestTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 40s]
Raw output
AssertionError: 2 != 1
self = <apache_beam.dataframe.doctests_test.DoctestTest testMethod=test_failure>
def test_failure(self):
result = doctests.teststring(
SAMPLE_DOCTEST.replace('25.0', '25.00001'), report=False)
self.assertEqual(result.attempted, 3)
> self.assertEqual(result.failed, 1)
E AssertionError: 2 != 1
apache_beam/dataframe/doctests_test.py:156: AssertionError
test_file (apache_beam.dataframe.doctests_test.DoctestTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 39s]
Raw output
AssertionError: 2 != 0
self = <apache_beam.dataframe.doctests_test.DoctestTest testMethod=test_file>
def test_file(self):
with tempfile.TemporaryDirectory() as dir:
filename = os.path.join(dir, 'tests.py')
with open(filename, 'w') as fout:
fout.write(SAMPLE_DOCTEST)
result = doctests.testfile(filename, module_relative=False, report=False)
self.assertEqual(result.attempted, 3)
> self.assertEqual(result.failed, 0)
E AssertionError: 2 != 0
apache_beam/dataframe/doctests_test.py:170: AssertionError
test_good (apache_beam.dataframe.doctests_test.DoctestTest) failed
sdks/python/test-suites/tox/py39/build/srcs/sdks/python/pytest_py39-cloudcoverage.xml [took 34s]
Raw output
AssertionError: 2 != 0
self = <apache_beam.dataframe.doctests_test.DoctestTest testMethod=test_good>
def test_good(self):
result = doctests.teststring(SAMPLE_DOCTEST, report=False)
self.assertEqual(result.attempted, 3)
> self.assertEqual(result.failed, 0)
E AssertionError: 2 != 0
apache_beam/dataframe/doctests_test.py:150: AssertionError