Skip to content

Commit f447afd

Browse files
committed
Error management for Amazon S3 Source and Sink plugin
1 parent bbd7daf commit f447afd

File tree

3 files changed

+76
-4
lines changed

3 files changed

+76
-4
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright © 2025 Cask Data, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package io.cdap.plugin.aws.s3.common;
18+
19+
import com.amazonaws.services.s3.model.AmazonS3Exception;
20+
import com.google.common.base.Throwables;
21+
import io.cdap.cdap.api.exception.ErrorCategory;
22+
import io.cdap.cdap.api.exception.ErrorCodeType;
23+
import io.cdap.cdap.api.exception.ErrorType;
24+
import io.cdap.cdap.api.exception.ErrorUtils;
25+
import io.cdap.cdap.api.exception.ProgramFailureException;
26+
import io.cdap.cdap.etl.api.exception.ErrorContext;
27+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
28+
import java.util.List;
29+
30+
/**
31+
* Error details provided for the Amazon S3
32+
**/
33+
public class AmazonErrorDetailsProvider implements ErrorDetailsProvider {
34+
35+
static final String S3_EXTERNAL_DOC = "https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html";
36+
37+
@Override
38+
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
39+
List<Throwable> causalChain = Throwables.getCausalChain(e);
40+
for (Throwable t : causalChain) {
41+
if (t instanceof ProgramFailureException) {
42+
// if causal chain already has program failure exception, return null to avoid double wrap.
43+
return null;
44+
}
45+
if (t instanceof AmazonS3Exception) {
46+
AmazonS3Exception s3Exception = (AmazonS3Exception) t;
47+
String errorMessage = s3Exception.getErrorMessage();
48+
String errorMessageWithDetails = s3Exception.getMessage();
49+
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
50+
errorMessage, errorMessageWithDetails, ErrorType.USER, true, ErrorCodeType.HTTP, s3Exception.getErrorCode(),
51+
S3_EXTERNAL_DOC, t);
52+
}
53+
}
54+
return null;
55+
}
56+
}

src/main/java/io/cdap/plugin/aws/s3/sink/S3BatchSink.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import io.cdap.cdap.etl.api.batch.BatchSink;
3232
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
3333
import io.cdap.cdap.etl.api.connector.Connector;
34+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
35+
import io.cdap.plugin.aws.s3.common.AmazonErrorDetailsProvider;
3436
import io.cdap.plugin.aws.s3.common.S3ConnectorConfig;
3537
import io.cdap.plugin.aws.s3.common.S3Constants;
3638
import io.cdap.plugin.aws.s3.common.S3Path;
@@ -82,6 +84,11 @@ public void prepareRun(BatchSinkContext context) throws Exception {
8284
super.prepareRun(context);
8385
}
8486

87+
@Override
88+
protected String getErrorDetailsProviderClassName() {
89+
return AmazonErrorDetailsProvider.class.getName();
90+
}
91+
8592
@Override
8693
protected LineageRecorder getLineageRecorder(BatchSinkContext context) {
8794
return new LineageRecorder(context, asset);
@@ -207,8 +214,9 @@ public void validate(FailureCollector collector) {
207214
try {
208215
getFilesystemProperties();
209216
} catch (Exception e) {
210-
collector.addFailure("File system properties must be a valid json.", null)
211-
.withConfigProperty(NAME_FILE_SYSTEM_PROPERTIES).withStacktrace(e.getStackTrace());
217+
collector.addFailure(String.format("File system properties must be a valid json, %s, %s",
218+
e.getClass().getName(), e.getMessage()), null)
219+
.withConfigProperty(NAME_FILE_SYSTEM_PROPERTIES).withStacktrace(e.getStackTrace());
212220
}
213221
}
214222
}

src/main/java/io/cdap/plugin/aws/s3/source/S3BatchSource.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import io.cdap.cdap.etl.api.batch.BatchSource;
3131
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
3232
import io.cdap.cdap.etl.api.connector.Connector;
33+
import io.cdap.cdap.etl.api.exception.ErrorDetailsProviderSpec;
34+
import io.cdap.plugin.aws.s3.common.AmazonErrorDetailsProvider;
3335
import io.cdap.plugin.aws.s3.common.S3ConnectorConfig;
3436
import io.cdap.plugin.aws.s3.common.S3Constants;
3537
import io.cdap.plugin.aws.s3.common.S3EmptyInputFormat;
@@ -89,6 +91,11 @@ public void prepareRun(BatchSourceContext context) throws Exception {
8991
super.prepareRun(context);
9092
}
9193

94+
@Override
95+
protected String getErrorDetailsProviderClassName() {
96+
return AmazonErrorDetailsProvider.class.getName();
97+
}
98+
9299
@Override
93100
protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
94101
return new LineageRecorder(context, asset);
@@ -220,8 +227,9 @@ public void validate(FailureCollector collector) {
220227
try {
221228
getFilesystemProperties();
222229
} catch (Exception e) {
223-
collector.addFailure("File system properties must be a valid json.", null)
224-
.withConfigProperty(NAME_FILE_SYSTEM_PROPERTIES).withStacktrace(e.getStackTrace());
230+
collector.addFailure(String.format("File system properties must be a valid json, %s: %s",
231+
e.getClass().getName(), e.getMessage()), null)
232+
.withConfigProperty(NAME_FILE_SYSTEM_PROPERTIES).withStacktrace(e.getStackTrace());
225233
}
226234
}
227235
}

0 commit comments

Comments
 (0)