16
16
17
17
package io .cdap .plugin .gcp .gcs .actions ;
18
18
19
- import com .google .api .pathtemplate .ValidationException ;
20
19
import com .google .auth .Credentials ;
21
- import com .google .auth .oauth2 .ServiceAccountCredentials ;
22
20
import com .google .cloud .kms .v1 .CryptoKeyName ;
23
21
import com .google .cloud .storage .Bucket ;
24
22
import com .google .cloud .storage .Storage ;
25
- import com .google .cloud .storage .StorageException ;
26
- import com .google .common .annotations .VisibleForTesting ;
27
- import com .google .common .base .Strings ;
28
23
import io .cdap .cdap .api .annotation .Description ;
29
24
import io .cdap .cdap .api .annotation .Macro ;
30
25
import io .cdap .cdap .api .annotation .Name ;
31
26
import io .cdap .cdap .api .annotation .Plugin ;
32
- import io .cdap .cdap .etl .api .Arguments ;
27
+ import io .cdap .cdap .api .exception .ErrorCategory ;
28
+ import io .cdap .cdap .api .exception .ErrorType ;
29
+ import io .cdap .cdap .api .exception .ErrorUtils ;
33
30
import io .cdap .cdap .etl .api .FailureCollector ;
34
31
import io .cdap .cdap .etl .api .PipelineConfigurer ;
35
32
import io .cdap .cdap .etl .api .action .Action ;
36
33
import io .cdap .cdap .etl .api .action .ActionContext ;
37
34
import io .cdap .plugin .gcp .common .CmekUtils ;
38
35
import io .cdap .plugin .gcp .common .GCPConfig ;
36
+ import io .cdap .plugin .gcp .common .GCPErrorDetailsProviderUtil ;
39
37
import io .cdap .plugin .gcp .common .GCPUtils ;
40
38
import io .cdap .plugin .gcp .gcs .GCSPath ;
41
- import io .cdap .plugin .gcp .gcs .sink .GCSBatchSink ;
42
39
import org .apache .hadoop .conf .Configuration ;
43
40
import org .apache .hadoop .fs .FileSystem ;
44
41
import org .apache .hadoop .fs .Path ;
@@ -86,9 +83,16 @@ public void run(ActionContext context) throws Exception {
86
83
return ;
87
84
}
88
85
String serviceAccount = config .getServiceAccount ();
89
- Credentials credentials = serviceAccount == null ?
90
- null : GCPUtils .loadServiceAccountCredentials (serviceAccount , isServiceAccountFilePath );
91
-
86
+ Credentials credentials = null ;
87
+ try {
88
+ credentials = serviceAccount == null ? null : GCPUtils .loadServiceAccountCredentials (serviceAccount ,
89
+ isServiceAccountFilePath );
90
+ } catch (IOException e ) {
91
+ String errorReason = "Failed to load service account credentials." ;
92
+ collector .addFailure (String .format ("%s %s: %s" , errorReason , e .getClass ().getName (), e .getMessage ()), null )
93
+ .withStacktrace (e .getStackTrace ());
94
+ collector .getOrThrowException ();
95
+ }
92
96
Map <String , String > map = GCPUtils .generateGCSAuthProperties (serviceAccount , config .getServiceAccountType ());
93
97
map .forEach (configuration ::set );
94
98
@@ -125,19 +129,21 @@ public void run(ActionContext context) throws Exception {
125
129
Bucket bucket = null ;
126
130
try {
127
131
bucket = storage .get (gcsPath .getBucket ());
128
- } catch (StorageException e ) {
132
+ } catch (Exception e ) {
129
133
// Add more descriptive error message
130
- throw new RuntimeException (
131
- String . format ( "Unable to access or create bucket %s. " , gcsPath . getBucket ())
132
- + "Ensure you entered the correct bucket path and have permissions for it." , e );
134
+ String errorReason = String . format ( "Unable to access GCS bucket '%s'" , gcsPath . getBucket ());
135
+ throw GCPErrorDetailsProviderUtil . getHttpResponseExceptionDetailsFromChain ( e , errorReason , ErrorType . UNKNOWN ,
136
+ true , GCPUtils . GCS_SUPPORTED_DOC_URL );
133
137
}
134
138
if (bucket == null ) {
135
139
GCPUtils .createBucket (storage , gcsPath .getBucket (), config .location , cmekKeyName );
136
140
undoBucket .add (bucketPath );
137
141
} else if (gcsPath .equals (bucketPath ) && config .failIfExists ()) {
138
142
// if the gcs path is just a bucket, and it exists, fail the pipeline
139
143
rollback = true ;
140
- throw new Exception (String .format ("Path %s already exists" , gcsPath ));
144
+ String errorReason = String .format ("Path %s already exists" , gcsPath );
145
+ throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ),
146
+ errorReason , errorReason , ErrorType .USER , true , null );
141
147
}
142
148
}
143
149
@@ -146,7 +152,9 @@ public void run(ActionContext context) throws Exception {
146
152
fs = gcsPath .getFileSystem (configuration );
147
153
} catch (IOException e ) {
148
154
rollback = true ;
149
- throw new Exception ("Unable to get GCS filesystem handler. " + e .getMessage (), e );
155
+ String errorReason = "Unable to get GCS filesystem handler." ;
156
+ throw GCPErrorDetailsProviderUtil .getHttpResponseExceptionDetailsFromChain (e , errorReason , ErrorType .UNKNOWN ,
157
+ true , GCPUtils .GCS_SUPPORTED_DOC_URL );
150
158
}
151
159
if (!fs .exists (gcsPath )) {
152
160
try {
@@ -156,12 +164,16 @@ public void run(ActionContext context) throws Exception {
156
164
} catch (IOException e ) {
157
165
LOG .warn (String .format ("Failed to create path '%s'" , gcsPath ));
158
166
rollback = true ;
159
- throw e ;
167
+ String errorReason = String .format ("Failed to create path %s." , gcsPath );
168
+ throw GCPErrorDetailsProviderUtil .getHttpResponseExceptionDetailsFromChain (e , errorReason ,
169
+ ErrorType .UNKNOWN , true , GCPUtils .GCS_SUPPORTED_DOC_URL );
160
170
}
161
171
} else {
162
172
if (config .failIfExists ()) {
163
173
rollback = true ;
164
- throw new Exception (String .format ("Path %s already exists" , gcsPath ));
174
+ String errorReason = String .format ("Path %s already exists" , gcsPath );
175
+ throw ErrorUtils .getProgramFailureException (new ErrorCategory (ErrorCategory .ErrorCategoryEnum .PLUGIN ),
176
+ errorReason , errorReason , ErrorType .SYSTEM , true , null );
165
177
}
166
178
}
167
179
}
0 commit comments