
Commit df656a5

Fix PostCommit Python tests related comments

1 parent e3855c7 commit df656a5

File tree: 4 files changed, +27 -16 lines

sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py (+2 -2)

@@ -132,8 +132,8 @@ def _setup_new_types_env(self):
         projectId=self.project, datasetId=self.dataset_id, table=table)
     self.bigquery_client.client.tables.Insert(request)
 
-    # Wait for table creation to propagate.
-    time.sleep(10)
+    # Call get_table so that we wait until the table is visible.
+    _ = self.bigquery_client.get_table(self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE)
 
     table_data = [{
         'bytes': b'xyw', 'date': '2011-01-01', 'time': '23:59:59.999999'
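
This first hunk shows the pattern the whole commit applies: rather than sleeping a fixed 10 seconds and hoping the new table has propagated, the test reads the table's metadata back, which returns as soon as the table is visible and fails loudly if it never becomes so. A minimal generic sketch of that wait-for-visibility idea (the wait_until_visible helper and its fetch callback are illustrative, not part of Beam; in the commit the role of fetch is played by BigQueryWrapper.get_table):

import time


def wait_until_visible(fetch, timeout_secs=60, poll_interval_secs=1):
  # Probe a cheap metadata read until it succeeds, instead of sleeping
  # for a fixed, hope-it-is-enough interval.
  deadline = time.time() + timeout_secs
  while True:
    try:
      return fetch()  # Returns as soon as the resource is readable.
    except Exception:  # Narrow to the client's NotFound in practice.
      if time.time() >= deadline:
        raise  # Give up: surface the underlying error to the test.
      time.sleep(poll_interval_secs)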

sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py (+16 -8)

@@ -24,7 +24,7 @@
 import datetime
 import logging
 import secrets
-import time as mod_time
+import time
 import unittest
 import uuid
 from decimal import Decimal
@@ -99,7 +99,7 @@ def setUpClass(cls):
 
     cls.bigquery_client = BigQueryWrapper()
     cls.dataset_id = '%s%d%s' % (
-        cls.BIG_QUERY_DATASET_ID, int(mod_time.time()), secrets.token_hex(3))
+        cls.BIG_QUERY_DATASET_ID, int(time.time()), secrets.token_hex(3))
     cls.bigquery_client.get_or_create_dataset(cls.project, cls.dataset_id)
     _LOGGER.info(
         "Created dataset %s in project %s", cls.dataset_id, cls.project)
@@ -158,9 +158,11 @@ def create_table(cls, table_name):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=cls.project, datasetId=cls.dataset_id, table=table)
     cls.bigquery_client.client.tables.Insert(request)
+    # Call get_table so that we wait until the table is visible.
+    _ = cls.bigquery_client.get_table(
+        cls.project, cls.dataset_id, table_name)
     cls.bigquery_client.insert_rows(
         cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)
-    mod_time.sleep(10)
 
   @skip(['PortableRunner', 'FlinkRunner'])
   @pytest.mark.it_postcommit
@@ -333,7 +335,7 @@ class ReadUsingStorageApiTests(BigQueryReadIntegrationTests):
   def setUpClass(cls):
     super(ReadUsingStorageApiTests, cls).setUpClass()
     cls.table_name = '%s%d%s' % (
-        cls.BIG_QUERY_DATASET_ID, int(mod_time.time()), secrets.token_hex(3))
+        cls.BIG_QUERY_DATASET_ID, int(time.time()), secrets.token_hex(3))
     cls._create_table(cls.table_name)
 
     table_id = '{}.{}'.format(cls.dataset_id, cls.table_name)
@@ -396,9 +398,11 @@ def _create_table(cls, table_name):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=cls.project, datasetId=cls.dataset_id, table=table)
     cls.bigquery_client.client.tables.Insert(request)
+    # Call get_table so that we wait until the table is visible.
+    _ = cls.bigquery_client.get_table(
+        cls.project, cls.dataset_id, table_name)
     cls.bigquery_client.insert_rows(
         cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)
-    mod_time.sleep(10)
 
   @classmethod
   def _setup_temporary_dataset(cls, project, query):
@@ -411,7 +415,7 @@ def _execute_query(cls, project, query):
         'materializing_table_before_reading',
         str(uuid.uuid4())[0:10],
         bigquery_tools.BigQueryJobTypes.QUERY,
-        '%d_%s' % (int(mod_time.time()), secrets.token_hex(3)))
+        '%d_%s' % (int(time.time()), secrets.token_hex(3)))
     cls._setup_temporary_dataset(cls.project, cls.query)
     job = cls.bigquery_client._start_query_job(
         project,
@@ -658,6 +662,9 @@ def create_table(cls, table_name):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=cls.project, datasetId=cls.dataset_id, table=table)
     cls.bigquery_client.client.tables.Insert(request)
+    # Call get_table so that we wait until the table is visible.
+    _ = cls.bigquery_client.get_table(
+        cls.project, cls.dataset_id, table_name)
     row_data = {
         'float': 0.33,
         'numeric': Decimal('10'),
@@ -676,7 +683,6 @@ def create_table(cls, table_name):
 
     cls.bigquery_client.insert_rows(
         cls.project, cls.dataset_id, table_name, table_data)
-    mod_time.sleep(10)
 
   def get_expected_data(self, native=True):
     byts = b'\xab\xac'
@@ -782,9 +788,11 @@ def create_table(cls, table_name, data, table_schema):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=cls.project, datasetId=cls.dataset_id, table=table)
     cls.bigquery_client.client.tables.Insert(request)
+    # Call get_table so that we wait until the table is visible.
+    _ = cls.bigquery_client.get_table(
+        cls.project, cls.dataset_id, table_name)
     cls.bigquery_client.insert_rows(
         cls.project, cls.dataset_id, table_name, data)
-    mod_time.sleep(10)
     return table_schema
 
   @classmethod
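
The insert-table / read-it-back / insert-rows sequence now appears in four separate create_table helpers in this file. A hypothetical consolidation, using only the calls visible in the diff (BigqueryTablesInsertRequest, tables.Insert, get_table, insert_rows); this particular helper does not exist in the Beam codebase:

def create_table_and_wait(client, project, dataset_id, table, table_name, rows):
  # client is a BigQueryWrapper; table is a bigquery.Table message.
  request = bigquery.BigqueryTablesInsertRequest(
      projectId=project, datasetId=dataset_id, table=table)
  client.client.tables.Insert(request)
  # Call get_table so that we wait until the table is visible.
  _ = client.get_table(project, dataset_id, table_name)
  client.insert_rows(project, dataset_id, table_name, rows)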

sdks/python/apache_beam/io/gcp/gcsio_integration_test.py (+4 -1)

@@ -60,6 +60,9 @@
   NotFound = None
 
 
+# Number of seconds to wait for bucket deletion to propagate.
+WAIT_DELETE_BUCKET_PROPAGATION_SECONDS = 10
+
 @unittest.skipIf(gcsio is None, 'GCP dependencies are not installed')
 @parameterized_class(
     ('no_gcsio_throttling_counter', 'enable_gcsio_blob_generation'),
@@ -222,7 +225,7 @@ def test_create_default_bucket(self, mock_default_gcs_bucket_name):
     if existing_bucket:
       try:
         existing_bucket.delete()
-        time.sleep(10)
+        time.sleep(WAIT_DELETE_BUCKET_PROPAGATION_SECONDS)
       except NotFound:
         # Bucket existence check from get_bucket may be inaccurate due to gcs
         # cache or delay
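
Unlike the BigQuery changes, this one keeps the fixed sleep and only names the magic number. If the delay ever proved insufficient, one alternative sketch (not what the commit does) would be to poll until the deletion is observable; this assumes bucket is a google.cloud.storage.Bucket obtained from a client, so Bucket.exists() can issue the GET:

import time


def wait_for_bucket_deletion(bucket, timeout_secs=30, poll_interval_secs=1):
  # Returns once the bucket is no longer observable or the timeout expires.
  deadline = time.time() + timeout_secs
  while bucket.exists() and time.time() < deadline:
    time.sleep(poll_interval_secs)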

sdks/python/apache_beam/ml/gcp/recommendations_ai.py (+5 -5)

@@ -24,7 +24,6 @@
 from typing import Sequence
 from typing import Tuple
 
-import tenacity
 from google.api_core.retry import Retry
 
 from apache_beam import pvalue
@@ -34,6 +33,7 @@
 from apache_beam.transforms import ParDo
 from apache_beam.transforms import PTransform
 from apache_beam.transforms.util import GroupIntoBatches
+from apache_beam.utils import retry
 from cachetools.func import ttl_cache
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -54,6 +54,7 @@
 ]
 
 FAILED_CATALOG_ITEMS = "failed_catalog_items"
+MAX_RETRIES=5
 
 
 @ttl_cache(maxsize=128, ttl=3600)
@@ -155,10 +156,9 @@ def process(self, element):
     request = recommendationengine.CreateCatalogItemRequest(
         parent=self.parent, catalog_item=catalog_item)
 
-    @tenacity.retry(
-        wait=tenacity.wait_exponential(multiplier=1, min=1, max=10),
-        stop=tenacity.stop_after_attempt(5),
-        reraise=True)
+    @retry.with_exponential_backoff(
+        num_retries=MAX_RETRIES,
+        retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
     def create_item():
       return self._client.create_catalog_item(
           request=request,
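
The last file replaces the third-party tenacity decorator with Beam's bundled apache_beam.utils.retry module, removing a dependency from the RecommendationsAI transform. Used in isolation the decorator looks like this (the decorated function body is a placeholder):

from apache_beam.utils import retry


@retry.with_exponential_backoff(
    num_retries=5,
    retry_filter=retry.retry_on_server_errors_timeout_or_quota_issues_filter)
def call_flaky_service():
  # Retried with exponential backoff on transient server errors, timeouts,
  # and quota issues; any other exception propagates immediately.
  ...

One behavioral difference worth noting: the old tenacity.retry(..., reraise=True) retried on any exception, whereas the Beam filter retries only server errors, timeouts, and quota issues.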
