Skip to content

Commit da4a2e6

Browse files
authored
Merge pull request #22 from vivetiwa/pause-connector-500-or-authentication-error
Pause Connector in case of 500 and unauthorised error
2 parents 4e43b09 + f459b91 commit da4a2e6

File tree

11 files changed

+59
-17
lines changed

11 files changed

+59
-17
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
##
1212

1313
group=com.adobe.platform.streaming
14-
versionMain=0.0.3
14+
versionMain=0.0.5
1515
versionQualifier=
1616

1717
param_artifactory_user=

streaming-connect-common/src/main/java/com/adobe/platform/streaming/auth/AbstractAuthProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ protected ContentHandler<TokenResponse> getContentHandler() {
5858
public TokenResponse getContent(HttpConnection conn) throws HttpException {
5959
try (InputStream in = conn.getInputStream()) {
6060
return mapper.readValue(in, TokenResponse.class);
61-
} catch (HttpException | IOException e) {
61+
} catch (IOException e) {
6262
throw new HttpException("Error parsing response", e);
6363
}
6464
}

streaming-connect-common/src/main/java/com/adobe/platform/streaming/auth/impl/IMSTokenProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class IMSTokenProvider extends AbstractAuthProvider {
5050
@Override
5151
protected TokenResponse getTokenResponse() throws AuthException {
5252
LOG.debug("refreshing expired accessToken: {}", clientId);
53-
StringBuffer params = new StringBuffer()
53+
StringBuilder params = new StringBuilder()
5454
.append("grant_type=authorization_code")
5555
.append("&client_id=").append(clientId)
5656
.append("&client_secret=").append(clientSecret)

streaming-connect-common/src/main/java/com/adobe/platform/streaming/http/ContentHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public JSONObject getContent(HttpConnection conn) throws HttpException {
3535
try (InputStream in = conn.getInputStream()) {
3636
return new JSONObject(HttpUtil.streamToString(in));
3737
} catch (IOException e) {
38-
throw new HttpException("Error parsing content", e);
38+
throw new HttpException("Error parsing content", e, 405);
3939
}
4040
}
4141
};

streaming-connect-common/src/main/java/com/adobe/platform/streaming/http/HttpConnection.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ private HttpConnection() {
6464
@SuppressWarnings("squid:S3776")
6565
HttpURLConnection connect() throws HttpException {
6666
Throwable cause = null;
67+
int responseCode = 500;
68+
String errorMsg = "";
69+
6770
while (retries++ < maxRetries) {
6871
try {
6972
URL request = new URL(new URL(endpoint), url);
@@ -104,18 +107,21 @@ HttpURLConnection connect() throws HttpException {
104107
}
105108
}
106109

107-
int responseCode = conn.getResponseCode();
110+
responseCode = conn.getResponseCode();
108111
if (HttpUtil.is2xx(responseCode)) {
109112
break;
110113
}
111114

112-
String errorMsg = errorStreamToString();
115+
errorMsg = errorStreamToString();
113116
if (HttpUtil.is5xx(responseCode)) {
114117
LOG.warn("attempt {} of {} failed with {} response - {}", retries, maxRetries, responseCode, errorMsg);
115118
close();
116119
HttpUtil.sleepUninterrupted(retryBackoff);
120+
} else if (HttpUtil.isUnauthorized(responseCode)) {
121+
throw new AuthException(String.format("requested failed. unauthorized to access the endpoint. " +
122+
"response code %s", responseCode));
117123
} else {
118-
throw new HttpException("request failed (" + responseCode + "): " + errorMsg);
124+
throw new HttpException("request failed (" + responseCode + "): " + errorMsg, responseCode);
119125
}
120126
} catch (MalformedURLException e) {
121127
throw new HttpException(("bad withUrl: " + url), e);
@@ -125,10 +131,14 @@ HttpURLConnection connect() throws HttpException {
125131
cause = e;
126132
HttpUtil.sleepUninterrupted(retryBackoff);
127133
} catch (AuthException authException) {
128-
throw new HttpException("exception while fetching the auth token", authException);
134+
throw new HttpException("exception while fetching the auth token", authException, responseCode);
129135
}
130136
}
131137

138+
if (HttpUtil.is5xx(responseCode)) {
139+
throw new HttpException("request failed (" + responseCode + "): " + errorMsg, responseCode);
140+
}
141+
132142
if (conn == null) {
133143
throw new HttpException("unable to connect", cause);
134144
}

streaming-connect-common/src/main/java/com/adobe/platform/streaming/http/HttpException.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,27 @@
1717
*/
1818
public class HttpException extends Exception {
1919

20-
public HttpException() {}
20+
private int responseCode = 500;
2121

2222
public HttpException(String message) {
2323
super(message);
2424
}
2525

26+
public HttpException(String message, int responseCode) {
27+
super(message);
28+
this.responseCode = responseCode;
29+
}
30+
2631
public HttpException(String message, Throwable cause) {
2732
super(message, cause);
2833
}
2934

35+
public HttpException(String message, Throwable cause, int responseCode) {
36+
super(message, cause);
37+
this.responseCode = responseCode;
38+
}
39+
40+
public int getResponseCode() {
41+
return responseCode;
42+
}
3043
}

streaming-connect-common/src/main/java/com/adobe/platform/streaming/http/HttpUtil.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,24 @@
2121
/**
2222
* @author Adobe Inc.
2323
*/
24-
class HttpUtil {
24+
public class HttpUtil {
2525

26-
static boolean is2xx(int responseCode) {
26+
public static boolean is2xx(int responseCode) {
2727
return (responseCode >= 200 && responseCode < 300);
2828
}
2929

30-
static boolean is5xx(int responseCode) {
30+
public static boolean is5xx(int responseCode) {
3131
return (responseCode >= 500 && responseCode < 600);
3232
}
3333

34+
public static boolean is500(int responseCode) {
35+
return responseCode == 500;
36+
}
37+
38+
public static boolean isUnauthorized(int responseCode) {
39+
return responseCode == 401 || responseCode == 403;
40+
}
41+
3442
static String streamToString(InputStream in) throws HttpException {
3543
StringBuilder sb = new StringBuilder(128);
3644
try (BufferedReader r = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/AbstractSinkTask.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.gson.GsonBuilder;
1919
import org.apache.commons.collections4.CollectionUtils;
2020
import org.apache.kafka.common.utils.AppInfoParser;
21+
import org.apache.kafka.connect.errors.ConnectException;
2122
import org.apache.kafka.connect.sink.SinkRecord;
2223
import org.apache.kafka.connect.sink.SinkTask;
2324
import org.slf4j.Logger;
@@ -115,17 +116,21 @@ public void put(Collection<SinkRecord> records) {
115116

116117
public abstract int getPayloadLength(T dataToPublish);
117118

118-
public abstract void publishData(List<T> eventsToPublish);
119+
public abstract void publishData(List<T> eventsToPublish) throws AEPStreamingException;
119120

120121
private boolean flushNow(long tempCurrTime) {
121122
return tempCurrTime >= lastFlushMilliSec + flushIntervalMillis || bytesRead >= flushBytesCount;
122123
}
123124

124125
private void publishAndLogIfRequired(List<T> eventsToPublish) {
126+
try {
127+
publishData(eventsToPublish);
128+
} catch (AEPStreamingException e) {
129+
throw new ConnectException("Failed to sink records.", e);
130+
}
125131
if (LOG.isDebugEnabled()) {
126132
LOG.debug("ConnectorSinkTask: {} events sent to destination", eventsToPublish.size());
127133
}
128-
publishData(eventsToPublish);
129134
}
130135

131136
private void reset(List<T> eventsToPublish, long tempCurrentTime) {

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/DataPublisher.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
package com.adobe.platform.streaming.sink;
1414

15+
import com.adobe.platform.streaming.AEPStreamingException;
16+
1517
import java.util.List;
1618

1719
/**
@@ -21,7 +23,7 @@ public interface DataPublisher {
2123

2224
void start();
2325

24-
void publishData(List<String> messages);
26+
void publishData(List<String> messages) throws AEPStreamingException;
2527

2628
void stop();
2729
}

streaming-connect-sink/src/main/java/com/adobe/platform/streaming/sink/impl/AEPPublisher.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import com.adobe.platform.streaming.http.ContentHandler;
1717
import com.adobe.platform.streaming.http.HttpException;
1818
import com.adobe.platform.streaming.http.HttpProducer;
19+
import com.adobe.platform.streaming.http.HttpUtil;
1920
import com.adobe.platform.streaming.sink.AbstractAEPPublisher;
2021
import org.apache.commons.collections4.CollectionUtils;
2122
import org.apache.commons.lang3.StringUtils;
@@ -45,7 +46,7 @@ class AEPPublisher extends AbstractAEPPublisher {
4546
}
4647

4748
@Override
48-
public void publishData(List<String> messages) {
49+
public void publishData(List<String> messages) throws AEPStreamingException {
4950
if (CollectionUtils.isEmpty(messages)) {
5051
LOG.debug("No messages to publish");
5152
return;
@@ -68,6 +69,9 @@ public void publishData(List<String> messages) {
6869
LOG.debug("Successfully published data to Adobe Experience Platform: {}", response);
6970
} catch (HttpException httpException) {
7071
LOG.error("Failed to publish data to Adobe Experience Platform", httpException);
72+
if (HttpUtil.is500(httpException.getResponseCode()) || HttpUtil.isUnauthorized(httpException.getResponseCode())) {
73+
throw new AEPStreamingException("Failed to publish", httpException);
74+
}
7175
}
7276
}
7377

0 commit comments

Comments
 (0)