Skip to content

Commit 4af6ecc

Browse files
authored
Merge branch 'data-integrations:develop' into develop
2 parents c3416ba + a9967d5 commit 4af6ecc

File tree

9 files changed

+144
-129
lines changed

9 files changed

+144
-129
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
<groupId>io.cdap.plugin</groupId>
2222
<artifactId>google-cloud</artifactId>
23-
<version>0.24.0-SNAPSHOT</version>
23+
<version>0.25.0-SNAPSHOT</version>
2424
<name>Google Cloud Plugins</name>
2525
<packaging>jar</packaging>
2626
<description>Plugins for Google Big Query</description>
@@ -1296,7 +1296,7 @@
12961296
<dependency>
12971297
<groupId>io.cdap.tests.e2e</groupId>
12981298
<artifactId>cdap-e2e-framework</artifactId>
1299-
<version>0.4.0-SNAPSHOT</version>
1299+
<version>0.5.0-SNAPSHOT</version>
13001300
<scope>test</scope>
13011301
</dependency>
13021302
<dependency>

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
4747
import io.cdap.plugin.gcp.common.CmekUtils;
4848
import io.cdap.plugin.gcp.common.GCPUtils;
49-
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
5049
import org.apache.hadoop.conf.Configuration;
5150
import org.apache.hadoop.io.NullWritable;
5251
import org.slf4j.Logger;
@@ -112,14 +111,7 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
112111
}
113112

114113
// Get the required bucket name and bucket instance (if it exists)
115-
Storage storage;
116-
try {
117-
storage = GCPUtils.getStorage(project, credentials);;
118-
} catch (Exception e) {
119-
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
120-
new ErrorContext(ErrorPhase.WRITING));
121-
throw ex == null ? e : ex;
122-
}
114+
Storage storage = GCPUtils.getStorage(project, credentials);;
123115
String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(),
124116
dataset, config.getBucket());
125117
bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
6464
import io.cdap.plugin.gcp.common.CmekUtils;
6565
import io.cdap.plugin.gcp.common.GCPUtils;
66-
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
6766
import org.apache.avro.generic.GenericData;
6867
import org.apache.hadoop.conf.Configuration;
6968
import org.apache.hadoop.io.LongWritable;
@@ -180,14 +179,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
180179
dataset, config.getBucket());
181180

182181
// Configure GCS Bucket to use
183-
Storage storage;
184-
try {
185-
storage = GCPUtils.getStorage(config.getProject(), credentials);;
186-
} catch (Exception e) {
187-
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
188-
new ErrorContext(ErrorPhase.READING));
189-
throw ex == null ? e : ex;
190-
}
182+
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);;
191183
String bucket = null;
192184
try {
193185
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset,

src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java

Lines changed: 3 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,23 @@
1616

1717
package io.cdap.plugin.gcp.common;
1818

19-
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
2019
import com.google.api.client.http.HttpResponseException;
21-
import com.google.common.base.Strings;
2220
import com.google.common.base.Throwables;
2321
import io.cdap.cdap.api.exception.ErrorCategory;
2422
import io.cdap.cdap.api.exception.ErrorCategory.ErrorCategoryEnum;
25-
import io.cdap.cdap.api.exception.ErrorCodeType;
2623
import io.cdap.cdap.api.exception.ErrorType;
2724
import io.cdap.cdap.api.exception.ErrorUtils;
2825
import io.cdap.cdap.api.exception.ProgramFailureException;
2926
import io.cdap.cdap.etl.api.exception.ErrorContext;
3027
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
3128

32-
import java.io.IOException;
3329
import java.util.List;
3430

3531
/**
3632
* A custom ErrorDetailsProvider for GCP plugins.
3733
*/
3834
public class GCPErrorDetailsProvider implements ErrorDetailsProvider {
39-
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. %s: %s";
35+
static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. %s: %s";
4036

4137
/**
4238
* Get a ProgramFailureException with the given error
@@ -53,7 +49,8 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
5349
return null;
5450
}
5551
if (t instanceof HttpResponseException) {
56-
return getProgramFailureException((HttpResponseException) t, errorContext);
52+
return GCPErrorDetailsProviderUtil.getProgramFailureException((HttpResponseException) t,
53+
getExternalDocumentationLink(), errorContext);
5754
}
5855
if (t instanceof IllegalArgumentException) {
5956
return getProgramFailureException((IllegalArgumentException) t, errorContext);
@@ -65,58 +62,6 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
6562
return null;
6663
}
6764

68-
/**
69-
* Get a ProgramFailureException with the given error
70-
* information from {@link HttpResponseException}.
71-
*
72-
* @param e The HttpResponseException to get the error information from.
73-
* @return A ProgramFailureException with the given error information.
74-
*/
75-
private ProgramFailureException getProgramFailureException(HttpResponseException e,
76-
ErrorContext errorContext) {
77-
Integer statusCode = e.getStatusCode();
78-
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
79-
String errorReason = String.format("%s %s. %s", e.getStatusCode(), e.getStatusMessage(),
80-
pair.getCorrectiveAction());
81-
82-
String errorMessage = e.getMessage();
83-
String externalDocumentationLink = null;
84-
if (e instanceof GoogleJsonResponseException) {
85-
errorMessage = getErrorMessage((GoogleJsonResponseException) e);
86-
87-
externalDocumentationLink = getExternalDocumentationLink();
88-
if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
89-
90-
if (!errorReason.endsWith(".")) {
91-
errorReason = errorReason + ".";
92-
}
93-
errorReason = String.format("%s For more details, see %s", errorReason,
94-
externalDocumentationLink);
95-
}
96-
}
97-
98-
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
99-
errorReason, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
100-
e.getClass().getName(), errorMessage),
101-
pair.getErrorType(), true, ErrorCodeType.HTTP, statusCode.toString(),
102-
externalDocumentationLink, e);
103-
}
104-
105-
private String getErrorMessage(GoogleJsonResponseException exception) {
106-
if (!Strings.isNullOrEmpty(exception.getMessage())) {
107-
return exception.getMessage();
108-
}
109-
if (exception.getDetails() != null) {
110-
try {
111-
return exception.getDetails().toPrettyString();
112-
} catch (IOException e) {
113-
return exception.getDetails().toString();
114-
}
115-
}
116-
return exception.getMessage();
117-
}
118-
119-
12065
/**
12166
* Get a ProgramFailureException with the given error
12267
* information from {@link IllegalArgumentException}.
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.gcp.common;
18+
19+
20+
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
21+
import com.google.api.client.http.HttpResponseException;
22+
import com.google.common.base.Strings;
23+
import com.google.common.base.Throwables;
24+
import io.cdap.cdap.api.exception.ErrorCategory;
25+
import io.cdap.cdap.api.exception.ErrorCodeType;
26+
import io.cdap.cdap.api.exception.ErrorType;
27+
import io.cdap.cdap.api.exception.ErrorUtils;
28+
import io.cdap.cdap.api.exception.ProgramFailureException;
29+
import io.cdap.cdap.etl.api.exception.ErrorContext;
30+
31+
import java.io.IOException;
32+
import java.util.List;
33+
import javax.annotation.Nullable;
34+
35+
/**
36+
* Common functions for GCP error details provider related functionalities.
37+
*/
38+
public final class GCPErrorDetailsProviderUtil {
39+
40+
/**
41+
* Get a ProgramFailureException with the given error
42+
* information from {@link HttpResponseException}.
43+
*
44+
* @param e The HttpResponseException to get the error information from.
45+
* @return A ProgramFailureException with the given error information.
46+
*/
47+
public static ProgramFailureException getProgramFailureException(HttpResponseException e, String externalDocUrl,
48+
@Nullable ErrorContext errorContext) {
49+
Integer statusCode = e.getStatusCode();
50+
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
51+
String errorReason = String.format("%s %s. %s", e.getStatusCode(), e.getStatusMessage(),
52+
pair.getCorrectiveAction());
53+
String errorMessage = e.getMessage();
54+
String externalDocumentationLink = null;
55+
if (e instanceof GoogleJsonResponseException) {
56+
errorMessage = getErrorMessage((GoogleJsonResponseException) e);
57+
externalDocumentationLink = externalDocUrl;
58+
if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
59+
if (!errorReason.endsWith(".")) {
60+
errorReason = errorReason + ".";
61+
}
62+
errorReason = String.format("%s For more details, see %s", errorReason, externalDocumentationLink);
63+
}
64+
}
65+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason,
66+
errorContext != null ?
67+
String.format(GCPErrorDetailsProvider.ERROR_MESSAGE_FORMAT, errorContext.getPhase(), e.getClass().getName(),
68+
errorMessage) : String.format("%s: %s", e.getClass().getName(), errorMessage), pair.getErrorType(), true,
69+
ErrorCodeType.HTTP, statusCode.toString(), externalDocumentationLink, e);
70+
}
71+
72+
public static ProgramFailureException getHttpResponseExceptionDetailsFromChain(Exception e, String errorReason,
73+
ErrorType errorType,
74+
boolean dependency,
75+
String externalDocUrl) {
76+
List<Throwable> causalChain = Throwables.getCausalChain(e);
77+
for (Throwable t : causalChain) {
78+
if (t instanceof ProgramFailureException) {
79+
// Avoid double wrap
80+
return (ProgramFailureException) t;
81+
}
82+
if (t instanceof HttpResponseException) {
83+
return getProgramFailureException((HttpResponseException) t, externalDocUrl, null);
84+
}
85+
}
86+
// If no HttpResponseException found in the causal chain, return generic program failure exception
87+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason,
88+
String.format("%s %s: %s", errorReason, e.getClass().getName(), e.getMessage()), errorType, dependency, e);
89+
}
90+
91+
private static String getErrorMessage(GoogleJsonResponseException exception) {
92+
if (!Strings.isNullOrEmpty(exception.getMessage())) {
93+
return exception.getMessage();
94+
}
95+
if (exception.getDetails() != null) {
96+
try {
97+
return exception.getDetails().toPrettyString();
98+
} catch (IOException e) {
99+
return exception.getDetails().toString();
100+
}
101+
}
102+
return exception.getMessage();
103+
}
104+
}

src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketCreate.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,26 @@
1616

1717
package io.cdap.plugin.gcp.gcs.actions;
1818

19-
import com.google.api.pathtemplate.ValidationException;
2019
import com.google.auth.Credentials;
21-
import com.google.auth.oauth2.ServiceAccountCredentials;
2220
import com.google.cloud.kms.v1.CryptoKeyName;
2321
import com.google.cloud.storage.Bucket;
2422
import com.google.cloud.storage.Storage;
25-
import com.google.cloud.storage.StorageException;
26-
import com.google.common.annotations.VisibleForTesting;
27-
import com.google.common.base.Strings;
2823
import io.cdap.cdap.api.annotation.Description;
2924
import io.cdap.cdap.api.annotation.Macro;
3025
import io.cdap.cdap.api.annotation.Name;
3126
import io.cdap.cdap.api.annotation.Plugin;
32-
import io.cdap.cdap.etl.api.Arguments;
27+
import io.cdap.cdap.api.exception.ErrorCategory;
28+
import io.cdap.cdap.api.exception.ErrorType;
29+
import io.cdap.cdap.api.exception.ErrorUtils;
3330
import io.cdap.cdap.etl.api.FailureCollector;
3431
import io.cdap.cdap.etl.api.PipelineConfigurer;
3532
import io.cdap.cdap.etl.api.action.Action;
3633
import io.cdap.cdap.etl.api.action.ActionContext;
3734
import io.cdap.plugin.gcp.common.CmekUtils;
3835
import io.cdap.plugin.gcp.common.GCPConfig;
36+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
3937
import io.cdap.plugin.gcp.common.GCPUtils;
4038
import io.cdap.plugin.gcp.gcs.GCSPath;
41-
import io.cdap.plugin.gcp.gcs.sink.GCSBatchSink;
4239
import org.apache.hadoop.conf.Configuration;
4340
import org.apache.hadoop.fs.FileSystem;
4441
import org.apache.hadoop.fs.Path;
@@ -86,9 +83,16 @@ public void run(ActionContext context) throws Exception {
8683
return;
8784
}
8885
String serviceAccount = config.getServiceAccount();
89-
Credentials credentials = serviceAccount == null ?
90-
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath);
91-
86+
Credentials credentials = null;
87+
try {
88+
credentials = serviceAccount == null ? null : GCPUtils.loadServiceAccountCredentials(serviceAccount,
89+
isServiceAccountFilePath);
90+
} catch (IOException e) {
91+
String errorReason = "Failed to load service account credentials.";
92+
collector.addFailure(String.format("%s %s: %s", errorReason, e.getClass().getName(), e.getMessage()), null)
93+
.withStacktrace(e.getStackTrace());
94+
collector.getOrThrowException();
95+
}
9296
Map<String, String> map = GCPUtils.generateGCSAuthProperties(serviceAccount, config.getServiceAccountType());
9397
map.forEach(configuration::set);
9498

@@ -125,19 +129,21 @@ public void run(ActionContext context) throws Exception {
125129
Bucket bucket = null;
126130
try {
127131
bucket = storage.get(gcsPath.getBucket());
128-
} catch (StorageException e) {
132+
} catch (Exception e) {
129133
// Add more descriptive error message
130-
throw new RuntimeException(
131-
String.format("Unable to access or create bucket %s. ", gcsPath.getBucket())
132-
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
134+
String errorReason = String.format("Unable to access GCS bucket '%s'", gcsPath.getBucket());
135+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
136+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
133137
}
134138
if (bucket == null) {
135139
GCPUtils.createBucket(storage, gcsPath.getBucket(), config.location, cmekKeyName);
136140
undoBucket.add(bucketPath);
137141
} else if (gcsPath.equals(bucketPath) && config.failIfExists()) {
138142
// if the gcs path is just a bucket, and it exists, fail the pipeline
139143
rollback = true;
140-
throw new Exception(String.format("Path %s already exists", gcsPath));
144+
String errorReason = String.format("Path %s already exists", gcsPath);
145+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
146+
errorReason, errorReason, ErrorType.USER, true, null);
141147
}
142148
}
143149

@@ -146,7 +152,9 @@ public void run(ActionContext context) throws Exception {
146152
fs = gcsPath.getFileSystem(configuration);
147153
} catch (IOException e) {
148154
rollback = true;
149-
throw new Exception("Unable to get GCS filesystem handler. " + e.getMessage(), e);
155+
String errorReason = "Unable to get GCS filesystem handler.";
156+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
157+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
150158
}
151159
if (!fs.exists(gcsPath)) {
152160
try {
@@ -156,12 +164,16 @@ public void run(ActionContext context) throws Exception {
156164
} catch (IOException e) {
157165
LOG.warn(String.format("Failed to create path '%s'", gcsPath));
158166
rollback = true;
159-
throw e;
167+
String errorReason = String.format("Failed to create path %s.", gcsPath);
168+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason,
169+
ErrorType.UNKNOWN, true, GCPUtils.GCS_SUPPORTED_DOC_URL);
160170
}
161171
} else {
162172
if (config.failIfExists()) {
163173
rollback = true;
164-
throw new Exception(String.format("Path %s already exists", gcsPath));
174+
String errorReason = String.format("Path %s already exists", gcsPath);
175+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
176+
errorReason, errorReason, ErrorType.SYSTEM, true, null);
165177
}
166178
}
167179
}

0 commit comments

Comments
 (0)