Skip to content

Commit 2191a9a

Browse files
authored
Merge pull request #845 from rmsamitha/5.3.x
Fix continuous broken-pipe error logs in binary transport
2 parents 5f0284b + ce15030 commit 2191a9a

File tree

2 files changed

+17
-0
lines changed

2 files changed

+17
-0
lines changed

components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/DataEndpoint.java

+11
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ public abstract class DataEndpoint {
7373

7474
public static HashMap<String, Long> delayMap=new HashMap<String, Long>();
7575

76+
public boolean invalidateTransportPool = false;
77+
7678
public long getReConnectTimestamp() {
7779
return reConnectTimestamp;
7880
}
@@ -349,6 +351,15 @@ public void setPoolSemaphore(Semaphore semaphore) {
349351

350352
private void publish() throws DataEndpointException, SessionTimeoutException, UndefinedEventTypeException {
351353
Object client = getClient();
354+
if (invalidateTransportPool) {
355+
log.debug(
356+
"invalidateTransportPool' is 'true'. Going to discard existing client and get new client " +
357+
"for the DataEndpoint");
358+
discardClient(client);
359+
client = getClient();
360+
invalidateTransportPool = false;
361+
log.debug("'invalidateTransportPool' is set to 'false' for the DataEndpoint");
362+
}
352363
try {
353364
send(client, this.events);
354365
semaphoreRelease();

components/data-bridge/org.wso2.carbon.databridge.agent/src/main/java/org/wso2/carbon/databridge/agent/endpoint/binary/BinaryDataEndpoint.java

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.wso2.carbon.databridge.agent.endpoint.binary;
1919

20+
import org.apache.commons.logging.Log;
21+
import org.apache.commons.logging.LogFactory;
2022
import org.wso2.carbon.databridge.agent.endpoint.DataEndpoint;
2123
import org.wso2.carbon.databridge.agent.exception.DataEndpointAuthenticationException;
2224
import org.wso2.carbon.databridge.agent.exception.DataEndpointException;
@@ -35,6 +37,8 @@
3537
*/
3638
public class BinaryDataEndpoint extends DataEndpoint {
3739

40+
private static Log log = LogFactory.getLog(BinaryDataEndpoint.class);
41+
3842
@Override
3943
protected String login(Object client, String userName, String password)
4044
throws DataEndpointAuthenticationException, DataEndpointLoginException {
@@ -84,6 +88,8 @@ protected void send(Object client, List<Event> events) throws DataEndpointExcept
8488
} else if (e instanceof SessionTimeoutException) {
8589
throw new SessionTimeoutException("Binary Session Expired Exception ", e);
8690
} else {
91+
log.debug("Setting 'invalidateTransportPool' to 'true' for binary data transport");
92+
this.invalidateTransportPool = true;
8793
throw new DataEndpointException("Error while trying to publish events to data receiver :"
8894
+ socket.getRemoteSocketAddress().toString(), e);
8995
}

0 commit comments

Comments
 (0)