
Commit c75d105

Fix postcommit python flaky tests (#34536)
* Fix postcommit python flaky tests
* Fix formatting
* Fix formatting
* Fix PostCommit Python tests related comments
* Formatting
1 parent 7caa09c commit c75d105

File tree

5 files changed (+36 lines, -6 lines)


sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py

Lines changed: 5 additions & 0 deletions
@@ -131,6 +131,11 @@ def _setup_new_types_env(self):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=self.project, datasetId=self.dataset_id, table=table)
     self.bigquery_client.client.tables.Insert(request)
+
+    # Call get_table so that we wait until the table is visible.
+    _ = self.bigquery_client.get_table(
+        self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE)
+
     table_data = [{
         'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'
     }, {
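
The added get_table call makes table creation effectively synchronous for the test: the insert request can return before the table is readable, and reading its metadata back confirms visibility. If a single read were not enough, the same idea extends to a short polling loop. The helper below is a hypothetical sketch, not part of this commit; it only assumes the BigQueryWrapper.get_table(project, dataset, table) call used above, and its timeouts are illustrative.

import time

def wait_for_table(bigquery_client, project, dataset_id, table_name,
                   timeout_secs=60, poll_interval_secs=2):
  """Polls get_table until the newly created table is visible.

  Illustrative helper, not part of this commit; it reuses the
  BigQueryWrapper.get_table(project, dataset, table) call from the diff.
  """
  deadline = time.time() + timeout_secs
  while True:
    try:
      # Succeeds once the table metadata can be read back.
      return bigquery_client.get_table(project, dataset_id, table_name)
    except Exception:
      if time.time() > deadline:
        raise
      time.sleep(poll_interval_secs)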

sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py

Lines changed: 10 additions & 2 deletions
@@ -158,6 +158,8 @@ def create_table(cls, table_name):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=cls.project, datasetId=cls.dataset_id, table=table)
     cls.bigquery_client.client.tables.Insert(request)
+    # Call get_table so that we wait until the table is visible.
+    _ = cls.bigquery_client.get_table(cls.project, cls.dataset_id, table_name)
     cls.bigquery_client.insert_rows(
         cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)

@@ -395,6 +397,8 @@ def _create_table(cls, table_name):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=cls.project, datasetId=cls.dataset_id, table=table)
     cls.bigquery_client.client.tables.Insert(request)
+    # Call get_table so that we wait until the table is visible.
+    _ = cls.bigquery_client.get_table(cls.project, cls.dataset_id, table_name)
     cls.bigquery_client.insert_rows(
         cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)

@@ -656,6 +660,8 @@ def create_table(cls, table_name):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=cls.project, datasetId=cls.dataset_id, table=table)
     cls.bigquery_client.client.tables.Insert(request)
+    # Call get_table so that we wait until the table is visible.
+    _ = cls.bigquery_client.get_table(cls.project, cls.dataset_id, table_name)
     row_data = {
         'float': 0.33,
         'numeric': Decimal('10'),

@@ -760,8 +766,8 @@ def setUpClass(cls):
     cls.table_name2 = 'python_rd_table_2'
     cls.table_schema2 = cls.create_table(
         cls.table_name2, cls.TABLE_DATA_2, cls.SCHEMA_BQ)
-    table_id2 = '{}.{}'.format(cls.dataset_id, cls.table_name2)
-    cls.query2 = 'SELECT number, str FROM %s' % table_id2
+    cls.query2 = 'SELECT number, str FROM [%s:%s.%s]' % (
+        cls.project, cls.dataset_id, cls.table_name2)

     cls.table_name3 = 'python_rd_table_3'
     cls.table_schema3 = cls.create_table(

@@ -779,6 +785,8 @@ def create_table(cls, table_name, data, table_schema):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=cls.project, datasetId=cls.dataset_id, table=table)
     cls.bigquery_client.client.tables.Insert(request)
+    # Call get_table so that we wait until the table is visible.
+    _ = cls.bigquery_client.get_table(cls.project, cls.dataset_id, table_name)
     cls.bigquery_client.insert_rows(
         cls.project, cls.dataset_id, table_name, data)
     return table_schema
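
Besides the get_table waits, setUpClass now fully qualifies query2 with the project using BigQuery's legacy SQL bracket notation rather than a bare dataset.table reference. For comparison only, a minimal sketch of the two common ways to fully qualify a table in a query string; the identifiers below are placeholders standing in for the class attributes used above, and only the bracketed legacy form is what the test actually uses.

# Placeholder identifiers; the test derives these from its class attributes.
project, dataset_id, table_name = 'my-project', 'my_dataset', 'python_rd_table_2'

# Legacy SQL: project and dataset are joined with ':' inside brackets
# (the form query2 now uses).
query_legacy = 'SELECT number, str FROM [%s:%s.%s]' % (
    project, dataset_id, table_name)

# Standard SQL equivalent uses backticks and dots, shown only for contrast.
query_standard = 'SELECT number, str FROM `%s.%s.%s`' % (
    project, dataset_id, table_name)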

sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py

Lines changed: 6 additions & 2 deletions
@@ -453,8 +453,12 @@ def test_big_query_write_insert_errors_reporting(self):

       assert_that(
           errors[BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS]
-          | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2])),
-          equal_to(bq_result_errors))
+          | 'ParseErrors' >> beam.Map(lambda err: (err[1], err[2]))
+          | 'ToList' >> beam.combiners.ToList()
+          | 'SortErrors' >> beam.Map(
+              lambda errs: sorted(errs, key=lambda x: x[0].get("number", 0))),
+          equal_to(
+              [sorted(bq_result_errors, key=lambda x: x[0].get("number", 0))]))

   @pytest.mark.it_postcommit
   def test_big_query_write_insert_non_transient_api_call_error(self):
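
The rewritten assertion gathers all failed rows into one list and sorts both the actual and the expected side by each row's "number" field, so the comparison no longer depends on the order in which BigQuery reports the failed inserts. A self-contained sketch of the same ToList-then-sort pattern, using made-up elements rather than the test's real error tuples:

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

# Stand-in for the (row, error) pairs produced by 'ParseErrors' above.
expected = [({'number': 2}, 'err_b'), ({'number': 1}, 'err_a')]

with TestPipeline() as p:
  result = (
      p
      | beam.Create([({'number': 1}, 'err_a'), ({'number': 2}, 'err_b')])
      | beam.combiners.ToList()
      | beam.Map(
          lambda errs: sorted(errs, key=lambda x: x[0].get('number', 0))))
  # Both sides are sorted the same way, so element order cannot flake the test.
  assert_that(
      result,
      equal_to([sorted(expected, key=lambda x: x[0].get('number', 0))]))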

sdks/python/apache_beam/io/gcp/gcsio_integration_test.py

Lines changed: 5 additions & 0 deletions
@@ -29,6 +29,7 @@
 # pytype: skip-file

 import logging
+import time
 import unittest
 import uuid
 import zlib

@@ -58,6 +59,9 @@
 except ImportError:
   NotFound = None

+# Number of seconds to wait for bucket deletion to propagate.
+WAIT_DELETE_BUCKET_PROPAGATION_SECONDS = 10
+

 @unittest.skipIf(gcsio is None, 'GCP dependencies are not installed')
 @parameterized_class(

@@ -221,6 +225,7 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
     if existing_bucket:
       try:
         existing_bucket.delete()
+        time.sleep(WAIT_DELETE_BUCKET_PROPAGATION_SECONDS)
       except NotFound:
         # Bucket existence check from get_bucket may be inaccurate due to gcs
         # cache or delay
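
The new constant and sleep give the bucket deletion time to propagate before test_create_default_bucket recreates the default bucket. If a fixed ten-second pause ever proves too short (or needlessly long), the same wait can instead be written as a poll against an existence check. The helper below is only a sketch under that assumption and is not part of this commit; bucket_exists is whatever check the caller already has available, for example google-cloud-storage's Bucket.exists.

import time

def wait_until_bucket_gone(bucket_exists, timeout_secs=30, poll_interval_secs=2):
  """Polls until bucket_exists() returns False or the timeout expires.

  bucket_exists is a zero-argument callable supplied by the caller; this is
  an illustrative alternative to the fixed
  WAIT_DELETE_BUCKET_PROPAGATION_SECONDS sleep above.
  """
  deadline = time.time() + timeout_secs
  while bucket_exists():
    if time.time() > deadline:
      raise TimeoutError(
          'bucket still visible after %d seconds' % timeout_secs)
    time.sleep(poll_interval_secs)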

sdks/python/apache_beam/ml/gcp/recommendations_ai.py

Lines changed: 10 additions & 2 deletions
@@ -33,6 +33,7 @@
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.util import GroupIntoBatches
+from apache_beam.utils import retry
 from cachetools.func import ttl_cache

 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports

@@ -53,6 +54,7 @@
 ]

 FAILED_CATALOG_ITEMS = "failed_catalog_items"
+MAX_RETRIES = 5


 @ttl_cache(maxsize=128, ttl=3600)

@@ -154,13 +156,19 @@ def process(self, element):
     request = recommendationengine.CreateCatalogItemRequest(
         parent=self.parent, catalog_item=catalog_item)

-    try:
-      created_catalog_item = self._client.create_catalog_item(
+    @retry.with_exponential_backoff(
+        num_retries=MAX_RETRIES,
+        retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter
+    )
+    def create_item():
+      return self._client.create_catalog_item(
           request=request,
           retry=self.retry,
           timeout=self.timeout,
           metadata=self.metadata)

+    try:
+      created_catalog_item = create_item()
       self.counter.inc()
       yield recommendationengine.CatalogItem.to_dict(created_catalog_item)
     except Exception:
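
Wrapping the CreateCatalogItem request in Beam's retry decorator means transient server, timeout, and quota errors are retried with exponential backoff before the surrounding try/except records the element as failed. A minimal sketch of the same decorator applied to a free function; create_item here is a stand-alone illustration, not the nested function from the diff, and client/request stand in for whatever the surrounding DoFn already holds.

from apache_beam.utils import retry

MAX_RETRIES = 5

@retry.with_exponential_backoff(
    num_retries=MAX_RETRIES,
    retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
def create_item(client, request):
  # Exceptions accepted by retry_filter are retried with exponential backoff
  # up to MAX_RETRIES times; anything else propagates to the caller at once.
  return client.create_catalog_item(request=request)

# Usage inside the DoFn would look like:
#   created_catalog_item = create_item(self._client, request)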
