Skip to content

Commit 23fb809

Browse files
authored
Merge branch 'data-integrations:develop' into GoogleBigQSource
2 parents 645c24c + b8ca08a commit 23fb809

File tree

7 files changed

+106
-31
lines changed

7 files changed

+106
-31
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@
3030
import io.cdap.cdap.api.data.format.StructuredRecord;
3131
import io.cdap.cdap.api.data.schema.Schema;
3232
import io.cdap.cdap.api.dataset.lib.KeyValue;
33+
import io.cdap.cdap.api.exception.ProgramFailureException;
3334
import io.cdap.cdap.etl.api.Emitter;
3435
import io.cdap.cdap.etl.api.FailureCollector;
3536
import io.cdap.cdap.etl.api.batch.BatchSink;
3637
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
38+
import io.cdap.cdap.etl.api.exception.ErrorContext;
3739
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
40+
import io.cdap.cdap.etl.api.exception.ErrorPhase;
3841
import io.cdap.plugin.common.Asset;
3942
import io.cdap.plugin.gcp.bigquery.common.BigQueryErrorDetailsProvider;
4043
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
@@ -43,6 +46,7 @@
4346
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
4447
import io.cdap.plugin.gcp.common.CmekUtils;
4548
import io.cdap.plugin.gcp.common.GCPUtils;
49+
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
4650
import org.apache.hadoop.conf.Configuration;
4751
import org.apache.hadoop.io.NullWritable;
4852
import org.slf4j.Logger;
@@ -90,18 +94,32 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
9094
Credentials credentials = serviceAccount == null ?
9195
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, config.isServiceAccountFilePath());
9296
String project = config.getProject();
93-
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
9497
FailureCollector collector = context.getFailureCollector();
9598
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
9699
collector.getOrThrowException();
97100
baseConfiguration = getBaseConfiguration(cmekKeyName);
98101

99102
// Get required dataset ID and dataset instance (if it exists)
100103
DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
101-
Dataset dataset = bigQuery.getDataset(datasetId);
104+
Dataset dataset;
105+
try {
106+
bigQuery = GCPUtils.getBigQuery(project, credentials, null);
107+
dataset = bigQuery.getDataset(datasetId);
108+
} catch (Exception e) {
109+
ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e,
110+
new ErrorContext(ErrorPhase.WRITING));
111+
throw ex == null ? e : ex;
112+
}
102113

103114
// Get the required bucket name and bucket instance (if it exists)
104-
Storage storage = GCPUtils.getStorage(project, credentials);
115+
Storage storage;
116+
try {
117+
storage = GCPUtils.getStorage(project, credentials);;
118+
} catch (Exception e) {
119+
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
120+
new ErrorContext(ErrorPhase.WRITING));
121+
throw ex == null ? e : ex;
122+
}
105123
String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(),
106124
dataset, config.getBucket());
107125
bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,10 @@ private static void createDataset(BigQuery bigQuery, DatasetId dataset, @Nullabl
199199
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
200200
String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(),
201201
e.getMessage(), pair.getCorrectiveAction(), GCPUtils.BQ_SUPPORTED_DOC_URL);
202+
String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(),
203+
e.getClass().getName(), e.getMessage());
202204
throw ErrorUtils.getProgramFailureException(
203-
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
205+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal,
204206
pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()),
205207
GCPUtils.BQ_SUPPORTED_DOC_URL, e);
206208
}
@@ -249,8 +251,10 @@ private static void createBucket(Storage storage, String bucket, @Nullable Strin
249251
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(e.getCode());
250252
String errorReason = String.format("%s %s %s For more details, see %s", e.getCode(),
251253
e.getMessage(), pair.getCorrectiveAction(), GCPUtils.GCS_SUPPORTED_DOC_URL);
254+
String errorMessageFinal = String.format("%s %s: %s", errorMessage.get(),
255+
e.getClass().getName(), e.getMessage());
252256
throw ErrorUtils.getProgramFailureException(
253-
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessage.get(),
257+
new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), errorReason, errorMessageFinal,
254258
pair.getErrorType(), true, ErrorCodeType.HTTP, String.valueOf(e.getCode()),
255259
GCPUtils.GCS_SUPPORTED_DOC_URL, e);
256260
}

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.cdap.cdap.api.data.format.StructuredRecord;
4040
import io.cdap.cdap.api.data.schema.Schema;
4141
import io.cdap.cdap.api.dataset.lib.KeyValue;
42+
import io.cdap.cdap.api.exception.ProgramFailureException;
4243
import io.cdap.cdap.etl.api.Emitter;
4344
import io.cdap.cdap.etl.api.FailureCollector;
4445
import io.cdap.cdap.etl.api.PipelineConfigurer;
@@ -48,7 +49,9 @@
4849
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
4950
import io.cdap.cdap.etl.api.connector.Connector;
5051
import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
52+
import io.cdap.cdap.etl.api.exception.ErrorContext;
5153
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
54+
import io.cdap.cdap.etl.api.exception.ErrorPhase;
5255
import io.cdap.cdap.etl.api.validation.ValidationFailure;
5356
import io.cdap.plugin.common.Asset;
5457
import io.cdap.plugin.common.LineageRecorder;
@@ -60,6 +63,7 @@
6063
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
6164
import io.cdap.plugin.gcp.common.CmekUtils;
6265
import io.cdap.plugin.gcp.common.GCPUtils;
66+
import io.cdap.plugin.gcp.gcs.GCSErrorDetailsProvider;
6367
import org.apache.avro.generic.GenericData;
6468
import org.apache.hadoop.conf.Configuration;
6569
import org.apache.hadoop.io.LongWritable;
@@ -147,9 +151,16 @@ public void prepareRun(BatchSourceContext context) throws Exception {
147151
collector.getOrThrowException();
148152
}
149153

150-
BigQuery bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
151-
Dataset dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
152-
Storage storage = GCPUtils.getStorage(config.getProject(), credentials);
154+
BigQuery bigQuery;
155+
Dataset dataset;
156+
try {
157+
bigQuery = GCPUtils.getBigQuery(config.getProject(), credentials, null);
158+
dataset = bigQuery.getDataset(DatasetId.of(config.getDatasetProject(), config.getDataset()));
159+
} catch (Exception e) {
160+
ProgramFailureException ex = new BigQueryErrorDetailsProvider().getExceptionDetails(e,
161+
new ErrorContext(ErrorPhase.READING));
162+
throw ex == null ? e : ex;
163+
}
153164

154165
// Get Configuration for this run
155166
bucketPath = UUID.randomUUID().toString();
@@ -169,10 +180,18 @@ public void prepareRun(BatchSourceContext context) throws Exception {
169180
dataset, config.getBucket());
170181

171182
// Configure GCS Bucket to use
183+
Storage storage;
184+
try {
185+
storage = GCPUtils.getStorage(config.getProject(), credentials);;
186+
} catch (Exception e) {
187+
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
188+
new ErrorContext(ErrorPhase.READING));
189+
throw ex == null ? e : ex;
190+
}
172191
String bucket = null;
173192
try {
174-
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset, bucketPath,
175-
cmekKeyName);
193+
bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, storage, bucketName, dataset,
194+
bucketPath, cmekKeyName);
176195
} catch (Exception e) {
177196
String errorReason = "Failed to create bucket.";
178197
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), null)

src/main/java/io/cdap/plugin/gcp/common/GCPErrorDetailsProvider.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
* A custom ErrorDetailsProvider for GCP plugins.
3737
*/
3838
public class GCPErrorDetailsProvider implements ErrorDetailsProvider {
39+
private static final String ERROR_MESSAGE_FORMAT = "Error occurred in the phase: '%s'. %s: %s";
3940

4041
/**
4142
* Get a ProgramFailureException with the given error
@@ -71,12 +72,12 @@ public ProgramFailureException getExceptionDetails(Exception e, ErrorContext err
7172
* @param e The HttpResponseException to get the error information from.
7273
* @return A ProgramFailureException with the given error information.
7374
*/
74-
private ProgramFailureException getProgramFailureException(HttpResponseException e, ErrorContext errorContext) {
75+
private ProgramFailureException getProgramFailureException(HttpResponseException e,
76+
ErrorContext errorContext) {
7577
Integer statusCode = e.getStatusCode();
7678
ErrorUtils.ActionErrorPair pair = ErrorUtils.getActionErrorByStatusCode(statusCode);
7779
String errorReason = String.format("%s %s. %s", e.getStatusCode(), e.getStatusMessage(),
7880
pair.getCorrectiveAction());
79-
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
8081

8182
String errorMessage = e.getMessage();
8283
String externalDocumentationLink = null;
@@ -95,7 +96,8 @@ private ProgramFailureException getProgramFailureException(HttpResponseException
9596
}
9697

9798
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
98-
errorReason, String.format(errorMessageFormat, errorContext.getPhase(), errorMessage),
99+
errorReason, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
100+
e.getClass().getName(), errorMessage),
99101
pair.getErrorType(), true, ErrorCodeType.HTTP, statusCode.toString(),
100102
externalDocumentationLink, e);
101103
}
@@ -122,11 +124,12 @@ private String getErrorMessage(GoogleJsonResponseException exception) {
122124
* @param e The IllegalArgumentException to get the error information from.
123125
* @return A ProgramFailureException with the given error information.
124126
*/
125-
private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
127+
private ProgramFailureException getProgramFailureException(IllegalArgumentException e,
128+
ErrorContext errorContext) {
126129
String errorMessage = e.getMessage();
127-
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
128-
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
129-
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e);
130+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
131+
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
132+
e.getClass().getName(), errorMessage), ErrorType.USER, false, e);
130133
}
131134

132135
/**
@@ -136,11 +139,12 @@ private ProgramFailureException getProgramFailureException(IllegalArgumentExcept
136139
* @param e The IllegalStateException to get the error information from.
137140
* @return A ProgramFailureException with the given error information.
138141
*/
139-
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
142+
private ProgramFailureException getProgramFailureException(IllegalStateException e,
143+
ErrorContext errorContext) {
140144
String errorMessage = e.getMessage();
141-
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
142-
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN), errorMessage,
143-
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e);
145+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategoryEnum.PLUGIN),
146+
errorMessage, String.format(ERROR_MESSAGE_FORMAT, errorContext.getPhase(),
147+
e.getClass().getName(), errorMessage), ErrorType.SYSTEM, false, e);
144148
}
145149

146150
/**

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSBatchSink.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,17 @@
3232
import io.cdap.cdap.api.annotation.Name;
3333
import io.cdap.cdap.api.annotation.Plugin;
3434
import io.cdap.cdap.api.data.schema.Schema;
35+
import io.cdap.cdap.api.exception.ProgramFailureException;
3536
import io.cdap.cdap.api.plugin.PluginConfig;
3637
import io.cdap.cdap.etl.api.FailureCollector;
3738
import io.cdap.cdap.etl.api.PipelineConfigurer;
3839
import io.cdap.cdap.etl.api.StageMetrics;
3940
import io.cdap.cdap.etl.api.batch.BatchSink;
4041
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
4142
import io.cdap.cdap.etl.api.connector.Connector;
43+
import io.cdap.cdap.etl.api.exception.ErrorContext;
4244
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
45+
import io.cdap.cdap.etl.api.exception.ErrorPhase;
4346
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
4447
import io.cdap.plugin.common.Asset;
4548
import io.cdap.plugin.common.ConfigUtil;
@@ -138,10 +141,14 @@ public void prepareRun(BatchSinkContext context) throws Exception {
138141
}
139142

140143
String bucketName = config.getBucket(collector);
141-
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
142-
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
143-
String correctiveAction = "Ensure you entered the correct bucket path and "
144-
+ "have permissions for it.";
144+
Storage storage;
145+
try {
146+
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
147+
} catch (Exception e) {
148+
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
149+
new ErrorContext(ErrorPhase.READING));
150+
throw ex == null ? e : ex;
151+
}
145152
Bucket bucket;
146153
String location = null;
147154
try {
@@ -153,6 +160,9 @@ public void prepareRun(BatchSinkContext context) throws Exception {
153160
GCPUtils.createBucket(storage, bucketName, location, cmekKeyName);
154161
}
155162
} catch (StorageException e) {
163+
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
164+
String correctiveAction = "Ensure you entered the correct bucket path and "
165+
+ "have permissions for it.";
156166
String errorReason = String.format(errorReasonFormat, e.getCode());
157167
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
158168
.withStacktrace(e.getStackTrace());

src/main/java/io/cdap/plugin/gcp/gcs/sink/GCSMultiBatchSink.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.cdap.cdap.api.data.format.StructuredRecord;
3333
import io.cdap.cdap.api.data.schema.Schema;
3434
import io.cdap.cdap.api.dataset.lib.KeyValue;
35+
import io.cdap.cdap.api.exception.ProgramFailureException;
3536
import io.cdap.cdap.api.plugin.InvalidPluginConfigException;
3637
import io.cdap.cdap.api.plugin.InvalidPluginProperty;
3738
import io.cdap.cdap.api.plugin.PluginProperties;
@@ -41,7 +42,9 @@
4142
import io.cdap.cdap.etl.api.batch.BatchSink;
4243
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
4344
import io.cdap.cdap.etl.api.connector.Connector;
45+
import io.cdap.cdap.etl.api.exception.ErrorContext;
4446
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
47+
import io.cdap.cdap.etl.api.exception.ErrorPhase;
4548
import io.cdap.cdap.etl.api.validation.ValidatingOutputFormat;
4649
import io.cdap.plugin.common.batch.sink.SinkOutputFormatProvider;
4750
import io.cdap.plugin.format.FileFormat;
@@ -126,7 +129,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
126129
}
127130

128131
@Override
129-
public void prepareRun(BatchSinkContext context) throws IOException, InstantiationException {
132+
public void prepareRun(BatchSinkContext context) throws Exception {
130133
FailureCollector collector = context.getFailureCollector();
131134
config.validate(collector, context.getArguments().asMap());
132135
collector.getOrThrowException();
@@ -156,15 +159,22 @@ public void prepareRun(BatchSinkContext context) throws IOException, Instantiati
156159
}
157160

158161
String bucketName = config.getBucket(collector);
159-
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
160-
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
161-
String correctiveAction = "Ensure you entered the correct bucket path and "
162-
+ "have permissions for it.";
162+
Storage storage;
163+
try {
164+
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
165+
} catch (Exception e) {
166+
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
167+
new ErrorContext(ErrorPhase.READING));
168+
throw ex == null ? e : ex;
169+
}
163170
try {
164171
if (storage.get(bucketName) == null) {
165172
GCPUtils.createBucket(storage, bucketName, config.getLocation(), cmekKeyName);
166173
}
167174
} catch (StorageException e) {
175+
String errorReasonFormat = "Error code: %s, Unable to read or access GCS bucket.";
176+
String correctiveAction = "Ensure you entered the correct bucket path and "
177+
+ "have permissions for it.";
168178
String errorReason = String.format(errorReasonFormat, e.getCode());
169179
collector.addFailure(String.format("%s %s", errorReason, e.getMessage()), correctiveAction)
170180
.withStacktrace(e.getStackTrace());

src/main/java/io/cdap/plugin/gcp/gcs/source/GCSSource.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@
2828
import io.cdap.cdap.api.annotation.MetadataProperty;
2929
import io.cdap.cdap.api.annotation.Name;
3030
import io.cdap.cdap.api.annotation.Plugin;
31+
import io.cdap.cdap.api.exception.ProgramFailureException;
3132
import io.cdap.cdap.etl.api.FailureCollector;
3233
import io.cdap.cdap.etl.api.PipelineConfigurer;
3334
import io.cdap.cdap.etl.api.batch.BatchSource;
3435
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
3536
import io.cdap.cdap.etl.api.connector.Connector;
37+
import io.cdap.cdap.etl.api.exception.ErrorContext;
3638
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
39+
import io.cdap.cdap.etl.api.exception.ErrorPhase;
3740
import io.cdap.plugin.common.Asset;
3841
import io.cdap.plugin.common.ConfigUtil;
3942
import io.cdap.plugin.common.LineageRecorder;
@@ -118,7 +121,14 @@ public void prepareRun(BatchSourceContext context) throws Exception {
118121
collector.getOrThrowException();
119122
}
120123

121-
Storage storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
124+
Storage storage;
125+
try {
126+
storage = GCPUtils.getStorage(config.connection.getProject(), credentials);
127+
} catch (Exception e) {
128+
ProgramFailureException ex = new GCSErrorDetailsProvider().getExceptionDetails(e,
129+
new ErrorContext(ErrorPhase.READING));
130+
throw ex == null ? e : ex;
131+
}
122132
String location = null;
123133
try {
124134
// Get location of the source for lineage

0 commit comments

Comments
 (0)