Skip to content

Commit 5cafda8

Browse files
authored
Merge branch 'data-integrations:develop' into develop
2 parents 4af6ecc + 77e0ac8 commit 5cafda8

File tree

5 files changed: 148 additions and 61 deletions.

src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java

Lines changed: 103 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@
2828
import com.google.cloud.storage.StorageException;
2929
import com.google.cloud.storage.StorageOptions;
3030
import com.google.common.annotations.VisibleForTesting;
31+
import io.cdap.cdap.api.exception.ErrorCategory;
32+
import io.cdap.cdap.api.exception.ErrorType;
33+
import io.cdap.cdap.api.exception.ErrorUtils;
3134
import io.cdap.plugin.gcp.common.GCPConnectorConfig;
35+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
3236
import io.cdap.plugin.gcp.common.GCPUtils;
3337
import org.slf4j.Logger;
3438
import org.slf4j.LoggerFactory;
@@ -68,7 +72,14 @@ public Blob pickABlob(String path) {
6872
return null;
6973
}
7074
GCSPath gcsPath = GCSPath.from(path);
71-
Page<Blob> blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName()));
75+
Page<Blob> blobPage;
76+
try {
77+
blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName()));
78+
} catch (Exception e) {
79+
String errorReason = String.format("Unable to list objects in bucket %s.", gcsPath.getBucket());
80+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
81+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
82+
}
7283
Iterator<Blob> iterator = blobPage.getValues().iterator();
7384
while (iterator.hasNext()) {
7485
Blob blob = iterator.next();
@@ -89,7 +100,13 @@ public void setMetaData(Blob blob, Map<String, String> metaData) {
89100
if (blob == null || metaData == null || metaData.isEmpty()) {
90101
return;
91102
}
92-
storage.update(BlobInfo.newBuilder(blob.getBlobId()).setMetadata(metaData).build());
103+
try {
104+
storage.update(BlobInfo.newBuilder(blob.getBlobId()).setMetadata(metaData).build());
105+
} catch (Exception e) {
106+
String errorReason = String.format("Unable to update metadata for blob %s.", blob.getName());
107+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
108+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
109+
}
93110
}
94111

95112
/**
@@ -102,7 +119,14 @@ public void mapMetaDataForAllBlobs(String path, Consumer<Map<String, String>> fu
102119
return;
103120
}
104121
GCSPath gcsPath = GCSPath.from(path);
105-
Page<Blob> blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName()));
122+
Page<Blob> blobPage;
123+
try {
124+
blobPage = storage.list(gcsPath.getBucket(), Storage.BlobListOption.prefix(gcsPath.getName()));
125+
} catch (Exception e) {
126+
String errorReason = String.format("Unable to list objects in bucket %s.", gcsPath.getBucket());
127+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
128+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
129+
}
106130
Iterator<Blob> blobIterator = blobPage.iterateAll().iterator();
107131
while (blobIterator.hasNext()) {
108132
Blob blob = blobIterator.next();
@@ -132,9 +156,11 @@ public void createBucketIfNotExists(GCSPath path, @Nullable String location, @Nu
132156
LOG.warn("Getting 409 Conflict: {} Bucket at destination path {} may already exist.",
133157
e.getMessage(), path.getUri());
134158
} else {
135-
throw new RuntimeException(
159+
String errorReason =
136160
String.format("Unable to create bucket %s. Ensure you entered the correct bucket path and " +
137-
"have permissions for it.", path.getBucket()), e);
161+
"have permissions for it.", path.getBucket());
162+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
163+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
138164
}
139165
}
140166
}
@@ -173,9 +199,16 @@ public void move(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolea
173199
* Get all the matching wildcard paths given the regex input.
174200
*/
175201
public List<GCSPath> getMatchedPaths(GCSPath sourcePath, boolean recursive, Pattern wildcardRegex) {
176-
Page<Blob> blobPage = storage.list(sourcePath.getBucket(), Storage.BlobListOption.prefix(
202+
Page<Blob> blobPage;
203+
try {
204+
blobPage = storage.list(sourcePath.getBucket(), Storage.BlobListOption.prefix(
177205
getWildcardPathPrefix(sourcePath, wildcardRegex)
178-
));
206+
));
207+
} catch (Exception e) {
208+
String errorReason = String.format("Unable to list objects in bucket %s.", sourcePath.getBucket());
209+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
210+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
211+
}
179212
List<String> blobPageNames = new ArrayList<>();
180213
blobPage.getValues().forEach(blob -> blobPageNames.add(blob.getName()));
181214
return getFilterMatchedPaths(sourcePath, blobPageNames, recursive);
@@ -212,58 +245,84 @@ static List<GCSPath> getFilterMatchedPaths(GCSPath sourcePath, List<String> blob
212245
private void pairTraverse(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolean overwrite,
213246
Consumer<BlobPair> consumer) {
214247

215-
Bucket sourceBucket = null;
248+
Bucket sourceBucket;
216249
try {
217250
sourceBucket = storage.get(sourcePath.getBucket());
218-
} catch (StorageException e) {
251+
} catch (Exception e) {
219252
// Add more descriptive error message
220-
throw new RuntimeException(
221-
String.format("Unable to access source bucket %s. ", sourcePath.getBucket())
222-
+ "Ensure you entered the correct bucket path.", e);
253+
String errorReason = String.format("Unable to access GCS bucket '%s'", sourcePath.getBucket());
254+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
255+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
223256
}
224257
if (sourceBucket == null) {
225-
throw new IllegalArgumentException(
226-
String.format("Source bucket '%s' does not exist.", sourcePath.getBucket()));
258+
String errorReason = String.format("Source bucket '%s' does not exist.", sourcePath.getBucket());
259+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
260+
errorReason, errorReason, ErrorType.USER, true, null);
227261
}
228-
Bucket destBucket = null;
262+
Bucket destBucket;
229263
try {
230264
destBucket = storage.get(destPath.getBucket());
231-
} catch (StorageException e) {
265+
} catch (Exception e) {
232266
// Add more descriptive error message
233-
throw new RuntimeException(
234-
String.format("Unable to access destination bucket %s. ", destPath.getBucket())
235-
+ "Ensure you entered the correct bucket path.", e);
267+
String errorReason = String.format("Unable to access GCS bucket '%s'", destPath.getBucket());
268+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
269+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
236270
}
237271
if (destBucket == null) {
238-
throw new IllegalArgumentException(
239-
String.format("Destination bucket '%s' does not exist. Please create it first.", destPath.getBucket()));
272+
String errorReason =
273+
String.format("Destination bucket '%s' does not exist. Please create it first.", destPath.getBucket());
274+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
275+
errorReason, errorReason, ErrorType.USER, true, null);
240276
}
241277

242278
boolean destinationBaseExists;
243279
String baseDestName = destPath.getName();
244-
if (destPath.isBucket() || storage.get(BlobId.of(destPath.getBucket(), baseDestName)) != null) {
280+
boolean destinationBlobExists;
281+
try {
282+
destinationBlobExists = destPath.isBucket() || storage.get(BlobId.of(destPath.getBucket(), baseDestName)) != null;
283+
} catch (Exception e) {
284+
String errorReason = String.format("Unable to access GCS bucket '%s'", destPath.getBucket());
285+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
286+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
287+
}
288+
if (destinationBlobExists) {
245289
destinationBaseExists = true;
246290
} else {
247291
// if gs://bucket2/subdir doesn't exist, check if gs://bucket2/subdir/ exists
248292
// similarly, if gs://bucket2/subdir/ doesn't exist, check if gs://bucket2/subdir exists
249293
// this is because "cp dir0 subdir" and "cp dir0 subdir/" are equivalent if the 'subdir' directory exists
250294
String modifiedName = baseDestName.endsWith("/") ?
251295
baseDestName.substring(0, baseDestName.length() - 1) : baseDestName + "/";
252-
destinationBaseExists = storage.get(BlobId.of(destPath.getBucket(), modifiedName)) != null;
296+
try {
297+
destinationBaseExists = storage.get(BlobId.of(destPath.getBucket(), modifiedName)) != null;
298+
} catch (Exception e) {
299+
String errorReason = String.format("Unable to access GCS bucket '%s'", destPath.getBucket());
300+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
301+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
302+
}
253303
}
254304

255305
List<BlobPair> copyList = new ArrayList<>();
256306
traverse(BlobId.of(sourcePath.getBucket(), sourcePath.getName()), recursive, sourceBlob -> {
257307
BlobId destBlobID = resolve(sourcePath.getName(), sourceBlob.getBlobId().getName(),
258308
destPath, destinationBaseExists);
259309
if (!overwrite) {
260-
Blob destBlob = storage.get(destBlobID);
310+
Blob destBlob;
311+
try {
312+
destBlob = storage.get(destBlobID);
313+
} catch (Exception e) {
314+
String errorReason = String.format("Unable to access GCS bucket '%s'", destPath.getBucket());
315+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
316+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
317+
}
261318
// we can't just use Blob's isDirectory() because the cloud console will create a 'directory' by creating
262319
// a 0 size placeholder blob that ends with '/'. This placeholder blob's isDirectory() method returns false,
263320
// but we don't want the overwrite check to fail on it. So we explicitly ignore the check for these 0 size
264321
// placeholder blobs.
265322
if (destBlob != null && !destBlob.getName().endsWith("/") && destBlob.getSize() != 0) {
266-
throw new IllegalArgumentException(String.format("%s already exists.", toPath(destBlobID)));
323+
String errorReason = String.format("%s already exists.", toPath(destBlobID));
324+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
325+
errorReason, errorReason, ErrorType.USER, true, null);
267326
}
268327
}
269328
copyList.add(new BlobPair(sourceBlob, destBlobID));
@@ -347,8 +406,15 @@ static String append(String base, String part) {
347406
* @param consumer the blob consumer
348407
*/
349408
private void traverse(BlobId blobId, boolean recursive, Consumer<Blob> consumer) {
350-
Page<Blob> blobList = storage.list(blobId.getBucket(), Storage.BlobListOption.currentDirectory(),
351-
Storage.BlobListOption.prefix(blobId.getName()));
409+
Page<Blob> blobList;
410+
try {
411+
blobList = storage.list(blobId.getBucket(), Storage.BlobListOption.currentDirectory(),
412+
Storage.BlobListOption.prefix(blobId.getName()));
413+
} catch (Exception e) {
414+
String errorReason = String.format("");
415+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
416+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
417+
}
352418
for (Blob blob : blobList.iterateAll()) {
353419
if (!blob.isDirectory()) {
354420
consumer.accept(blob);
@@ -363,11 +429,17 @@ private static String toPath(BlobId blobId) {
363429
}
364430

365431
public static StorageClient create(String project, @Nullable String serviceAccount,
366-
Boolean isServiceAccountFilePath, @Nullable Integer readTimeout)
367-
throws IOException {
432+
Boolean isServiceAccountFilePath, @Nullable Integer readTimeout) {
368433
StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(project);
369434
if (serviceAccount != null) {
370-
builder.setCredentials(GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath));
435+
try {
436+
builder.setCredentials(GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath));
437+
} catch (IOException e) {
438+
String errorReason = "Unable to load service account credentials.";
439+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
440+
errorReason, String.format("%s %s: %s", errorReason, e.getClass().getName(), e.getMessage()),
441+
ErrorType.UNKNOWN, false, null);
442+
}
371443
}
372444
if (readTimeout != null) {
373445
builder.setTransportOptions(HttpTransportOptions.newBuilder().setReadTimeout(readTimeout * 1000).build());
@@ -376,7 +448,7 @@ public static StorageClient create(String project, @Nullable String serviceAccou
376448
return new StorageClient(storage);
377449
}
378450

379-
public static StorageClient create(GCPConnectorConfig config) throws IOException {
451+
public static StorageClient create(GCPConnectorConfig config) {
380452
return create(config.getProject(), config.getServiceAccount(), config.isServiceAccountFilePath(), null);
381453
}
382454

src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSArgumentSetter.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
import io.cdap.cdap.api.annotation.Name;
2929
import io.cdap.cdap.api.annotation.Plugin;
3030
import io.cdap.cdap.api.data.schema.Schema;
31+
import io.cdap.cdap.api.exception.ErrorCategory;
32+
import io.cdap.cdap.api.exception.ErrorType;
33+
import io.cdap.cdap.api.exception.ErrorUtils;
3134
import io.cdap.cdap.etl.api.FailureCollector;
3235
import io.cdap.cdap.etl.api.PipelineConfigurer;
3336
import io.cdap.cdap.etl.api.StageConfigurer;
@@ -81,7 +84,7 @@ public void configurePipeline(PipelineConfigurer configurer) {
8184
}
8285

8386
@Override
84-
public void run(ActionContext context) throws Exception {
87+
public void run(ActionContext context) {
8588
config.validateProperties(context.getFailureCollector());
8689
String fileContent = GCSArgumentSetter.getContent(config);
8790

@@ -94,27 +97,36 @@ public void run(ActionContext context) throws Exception {
9497
if (value != null) {
9598
context.getArguments().set(name, value);
9699
} else {
97-
throw new RuntimeException(
98-
"Configuration '" + name + "' is null. Cannot set argument to null.");
100+
String errorReason = String.format("Configuration '%s' is null. Cannot set argument to null.", name);
101+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
102+
errorReason, errorReason, ErrorType.USER, false, null);
99103
}
100104
}
101105
} catch (JsonSyntaxException e) {
102-
throw new RuntimeException(
103-
String.format(
104-
"Could not parse response from '%s': %s", config.getPath(), e.getMessage()));
106+
String errorReason = String.format("Could not parse response from '%s': %s", config.getPath(), e.getMessage());
107+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
108+
errorReason, errorReason, ErrorType.USER, false, null);
105109
}
106110
}
107111

108-
private static Storage getStorage(GCSArgumentSetterConfig config) throws IOException {
112+
private static Storage getStorage(GCSArgumentSetterConfig config) {
109113
String serviceAccount = config.getServiceAccount();
110114
StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(config.getProject());
111115
if (serviceAccount != null) {
112-
builder.setCredentials(GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath()));
116+
try {
117+
builder.setCredentials(
118+
GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath()));
119+
} catch (IOException e) {
120+
String errorReason = "Failed to load service account credentials.";
121+
String errorMessage = String.format("%s: %s", e.getClass(), e.getMessage());
122+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
123+
errorReason, errorMessage, ErrorType.USER, false, e);
124+
}
113125
}
114126
return builder.build().getService();
115127
}
116128

117-
public static String getContent(GCSArgumentSetterConfig config) throws IOException {
129+
public static String getContent(GCSArgumentSetterConfig config) {
118130
Storage storage = getStorage(config);
119131
GCSPath path = config.getPath();
120132
Blob blob = storage.get(path.getBucket(), path.getName());
@@ -150,7 +162,9 @@ public void setName(String name) {
150162

151163
public String getValue() {
152164
if (value == null) {
153-
throw new IllegalArgumentException("Null Argument value for name '" + name + "'");
165+
String errorReason = String.format("Null Argument value for name '%s'", name);
166+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
167+
errorReason, errorReason, ErrorType.USER, false, null);
154168
}
155169
if (type.equalsIgnoreCase("schema")) {
156170
return createSchema(value).toString();
@@ -180,7 +194,9 @@ public String getValue() {
180194
return Joiner.on(";").join(values);
181195
}
182196

183-
throw new IllegalArgumentException("Invalid argument value '" + value + "'");
197+
String errorReason = String.format("Invalid argument value '%s'", value);
198+
throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
199+
errorReason, errorReason, ErrorType.USER, false, null);
184200
}
185201

186202
private Schema createSchema(JsonElement array) {

src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSBucketDelete.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import io.cdap.cdap.api.annotation.Macro;
2626
import io.cdap.cdap.api.annotation.Name;
2727
import io.cdap.cdap.api.annotation.Plugin;
28+
import io.cdap.cdap.api.exception.ErrorType;
2829
import io.cdap.cdap.etl.api.FailureCollector;
2930
import io.cdap.cdap.etl.api.PipelineConfigurer;
3031
import io.cdap.cdap.etl.api.action.Action;
3132
import io.cdap.cdap.etl.api.action.ActionContext;
3233
import io.cdap.plugin.gcp.common.GCPConfig;
34+
import io.cdap.plugin.gcp.common.GCPErrorDetailsProviderUtil;
3335
import io.cdap.plugin.gcp.common.GCPUtils;
3436
import io.cdap.plugin.gcp.gcs.GCSPath;
3537
import org.apache.hadoop.conf.Configuration;
@@ -76,8 +78,17 @@ public void run(ActionContext context) throws Exception {
7678
return;
7779
}
7880
String serviceAccount = config.getServiceAccount();
79-
Credentials credentials = serviceAccount == null ?
80-
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, isServiceAccountFilePath);
81+
Credentials credentials = null;
82+
try {
83+
credentials = serviceAccount == null ? null : GCPUtils.loadServiceAccountCredentials(serviceAccount,
84+
isServiceAccountFilePath);
85+
} catch (IOException e) {
86+
String errorReason = "Failed to load service account credentials. ";
87+
context.getFailureCollector()
88+
.addFailure(String.format("%s %s: %s", errorReason, e.getClass().getName(), e.getMessage()), null)
89+
.withStacktrace(e.getStackTrace());
90+
context.getFailureCollector().getOrThrowException();
91+
}
8192
Map<String, String> map = GCPUtils.generateGCSAuthProperties(serviceAccount, config.getServiceAccountType());
8293
map.forEach(configuration::set);
8394
configuration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem");
@@ -99,9 +110,9 @@ public void run(ActionContext context) throws Exception {
99110
storage.get(gcsPath.getBucket());
100111
} catch (StorageException e) {
101112
// Add more descriptive error message
102-
throw new RuntimeException(
103-
String.format("Unable to access or create bucket %s. ", gcsPath.getBucket())
104-
+ "Ensure you entered the correct bucket path and have permissions for it.", e);
113+
String errorReason = String.format("Unable to access GCS bucket '%s'", gcsPath.getBucket());
114+
throw GCPErrorDetailsProviderUtil.getHttpResponseExceptionDetailsFromChain(e, errorReason, ErrorType.UNKNOWN,
115+
true, GCPUtils.GCS_SUPPORTED_DOC_URL);
105116
}
106117
String exactGCSPath = "gs://" + gcsPath.getBucket() + "/" + gcsPath.getName();
107118
if (exactGCSPath.contains("*")) {

0 commit comments

Comments
 (0)