1818
1919import com .google .api .gax .longrunning .OperationFuture ;
2020import com .google .api .gax .longrunning .OperationSnapshot ;
21+ import com .google .api .gax .longrunning .OperationTimedPollAlgorithm ;
22+ import com .google .api .gax .retrying .RetrySettings ;
2123import com .google .cloud .dataproc .v1 .*;
2224import com .google .cloud .dataproc .v1 .Batch ;
2325import com .google .cloud .dataproc .v1 .BatchControllerClient ;
4951import org .junit .After ;
5052import org .junit .Before ;
5153import org .junit .Test ;
54+ import org .slf4j .Logger ;
55+ import org .slf4j .LoggerFactory ;
5256
5357/**
5458 * The acceptance test on the Dataproc Serverless. The test have to be running on the project with
5559 * requireOsLogin disabled, otherwise an org policy violation error will be thrown.
5660 */
5761public class DataprocServerlessAcceptanceTestBase {
62+ private static final Logger logger =
63+ LoggerFactory .getLogger (DataprocServerlessAcceptanceTestBase .class );
5864 public static final String REGION = "us-central1" ;
5965 public static final String DATAPROC_ENDPOINT = REGION + "-dataproc.googleapis.com:443" ;
6066 public static final String PROJECT_ID =
@@ -85,21 +91,37 @@ public class DataprocServerlessAcceptanceTestBase {
8591 private static String classTestBaseGcsDir ;
8692
8793 BatchControllerClient batchController ;
88- String testName =
89- getClass ()
90- .getSimpleName ()
91- .substring (0 , getClass ().getSimpleName ().length () - 32 )
92- .toLowerCase (Locale .ENGLISH );
93- String testId = String .format ("%s-%s" , testName , System .currentTimeMillis ());
94- String testBaseGcsDir = AcceptanceTestUtils .createTestBaseGcsDir (testId );
95- AcceptanceTestContext context =
96- new AcceptanceTestContext (
97- testId , generateClusterName (testId ), testBaseGcsDir , classConnectorJarUri );
98-
9994 private final String s8sImageVersion ;
95+ private final String testName ;
96+ private AcceptanceTestContext context ;
10097
10198 public DataprocServerlessAcceptanceTestBase (String s8sImageVersion ) {
10299 this .s8sImageVersion = s8sImageVersion ;
100+ testName = s8sImageVersion .replace ("." , "" ).toLowerCase (Locale .ENGLISH );
101+ }
102+
103+ private AcceptanceTestContext generateContext (String testId ) {
104+ // Cluster name has a length limit of 63 characters. The name format is:
105+ // <prefix><testName>-<testId>-<time in milliseconds>
106+ //
107+ // To prevent naming collisions it will be made unique as follows:
108+ //
109+ // prefix ("spanner-serverless-acceptance-"): 30 characters
110+ // testName - we'll use just the image version as the test name: 2 characters
111+ // The image version will be something like "latest" or "2.2", so we'll remove the dot.
112+ // - : 1 character
113+ // testId a unique name for the individual test: maximum 6 characters
114+ // - : 1 character
115+ // time in milliseconds - currentTimeMillis returns 13 digits (until the year 2286),
116+ // Total 55 characters
117+ final String testUniqueId =
118+ String .format ("%s-%s-%s" , testName , testId , System .currentTimeMillis ());
119+ final String clusterName = generateClusterName (testUniqueId );
120+
121+ logger .info ("clusterName: {}" , clusterName );
122+
123+ final String testBaseGcsDir = AcceptanceTestUtils .createTestBaseGcsDir (testUniqueId );
124+ return new AcceptanceTestContext (testId , clusterName , testBaseGcsDir , classConnectorJarUri );
103125 }
104126
105127 protected static void setup (String connectorJarDirectory , String connectorJarPrefix )
@@ -118,21 +140,40 @@ protected static void teardown() throws Exception {
118140
119141 @ Before
120142 public void createBatchControllerClient () throws Exception {
121- batchController =
122- BatchControllerClient .create (
123- BatchControllerSettings .newBuilder ().setEndpoint (DATAPROC_ENDPOINT ).build ());
143+ org .threeten .bp .Duration totalTimeout =
144+ org .threeten .bp .Duration .ofSeconds (SERVERLESS_BATCH_TIMEOUT_IN_SECONDS );
145+
146+ BatchControllerSettings .Builder settingsBuilder =
147+ BatchControllerSettings .newBuilder ().setEndpoint (DATAPROC_ENDPOINT );
148+
149+ // Configure the polling algorithm for the 'createBatch' operation
150+ settingsBuilder
151+ .createBatchOperationSettings ()
152+ .setPollingAlgorithm (
153+ OperationTimedPollAlgorithm .create (
154+ RetrySettings .newBuilder ()
155+ .setInitialRetryDelay (org .threeten .bp .Duration .ofSeconds (5 ))
156+ .setRetryDelayMultiplier (1.5 )
157+ .setMaxRetryDelay (org .threeten .bp .Duration .ofMinutes (1 ))
158+ .setTotalTimeout (totalTimeout ) // Ensures
159+ .build ()));
160+
161+ batchController = BatchControllerClient .create (settingsBuilder .build ());
124162 }
125163
126164 @ After
127165 public void tearDown () throws Exception {
128166 if (batchController != null ) {
167+ logger .info ("Tearing down batchController..." );
129168 batchController .close ();
130169 }
131170 AcceptanceTestUtils .deleteGcsDir (context .testBaseGcsDir );
132171 }
133172
134173 @ Test
135174 public void testBatch () throws Exception {
175+ // Provide a unique test name to identify the batch associated with this test.
176+ context = generateContext ("batch" );
136177 OperationSnapshot operationSnapshot =
137178 createAndRunPythonBatch (
138179 context ,
@@ -149,6 +190,8 @@ public void testBatch() throws Exception {
149190
150191 @ Test
151192 public void testWrite () throws Exception {
193+ // Provide a unique test name to identify the batch associated with this test.
194+ context = generateContext ("write" );
152195 OperationSnapshot operationSnapshot =
153196 createAndRunPythonBatch (
154197 context ,
@@ -279,6 +322,8 @@ protected PySparkBatch.Builder createPySparkBatchBuilder(
279322 }
280323
281324 public static String generateClusterName (String testId ) {
282- return String .format ("spanner-connector-serverless-acceptance-%s" , testId );
325+ String clusterName = String .format ("spanner-serverless-acceptance-%s" , testId );
326+ // cluster name must conform to pattern '[a-z0-9][a-z0-9\-]{2,61}[a-z0-9]'
327+ return clusterName .toLowerCase ();
283328 }
284329}
0 commit comments