Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.aspects.AspectConfiguration;
Expand Down Expand Up @@ -86,7 +87,8 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

@SuppressWarnings({"UnusedDeclaration"})
public class SynapseConfigUtils {
Expand Down Expand Up @@ -895,6 +897,33 @@ public static boolean isFailSafeEnabled(String componentName) {
return false;
}

/**
* Replaces occurrences of a pattern in the input string with corresponding property values from the
* message context.
*
* @param input the input string containing patterns to be replaced
* @param pattern the pattern to be matched in the input string
* @param messageContext the message context containing property values
* @return the resulting string with patterns replaced by property values
*/
public static String replacePatternWithProperties(String input, Pattern pattern, MessageContext messageContext) {
Matcher matcher = pattern.matcher(input);
StringBuilder result = new StringBuilder();

int s = 0;
while (matcher.find()) {
String propertyName = matcher.group(1);
Object propertyValue = messageContext.getProperty(propertyName);
if (propertyValue != null) {
result.append(input.substring(s, matcher.start()));
result.append(propertyValue.toString());
s = matcher.end();
}
}
result.append(input.substring(s));
return result.toString();
}


public static SynapseConfiguration getSynapseConfiguration(String tenantDomain){
return lastRegisteredSynapseConfigurationMap.get(tenantDomain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.config.SynapsePropertiesLoader;
Expand Down Expand Up @@ -201,6 +202,11 @@ private void recordStatistics(int state) {
*/
private void setState(int state) {

setState(state, null);
}

private void setState(int state, MessageContext synCtx) {

recordStatistics(state);

if (isClustered) {
Expand All @@ -225,28 +231,26 @@ private void setState(int state) {
}

if (retries <= 0) {
log.info("Endpoint : " + endpointName + printEndpointAddress() +
log.info("Endpoint : " + endpointName + printEndpointAddress(synCtx) +
" has been marked for SUSPENSION," +
" but no further retries remain. Thus it will be SUSPENDED.");

setState(ST_SUSPENDED);

setState(ST_SUSPENDED, synCtx);
} else {
Replicator.setAndReplicateState(
REMAINING_RETRIES_KEY, (retries - 1), cfgCtx);
long nextRetry = System.currentTimeMillis()
+ definition.getRetryDurationOnTimeout();
Replicator.setAndReplicateState(NEXT_RETRY_TIME_KEY, nextRetry, cfgCtx);

log.warn("Endpoint : " + endpointName + printEndpointAddress() +
log.warn("Endpoint : " + endpointName + printEndpointAddress(synCtx) +
" is marked as TIMEOUT and " +
"will be retried : " + (retries - 1) + " more time/s after : " +
new Date(nextRetry) + " until its marked SUSPENDED for failure");
}
break;
}
case ST_SUSPENDED: {
computeNextRetryTimeForSuspended();
computeNextRetryTimeForSuspended(synCtx);
break;
}
case ST_OFF: {
Expand Down Expand Up @@ -290,17 +294,15 @@ private void setState(int state) {
}

if (retries <= 0) {
log.info("Endpoint : " + endpointName + printEndpointAddress()
log.info("Endpoint : " + endpointName + printEndpointAddress(synCtx)
+ " has been marked for SUSPENSION, "
+ "but no further retries remain. Thus it will be SUSPENDED.");

setState(ST_SUSPENDED);

setState(ST_SUSPENDED, synCtx);
} else {
localRemainingRetries = retries - 1;
localNextRetryTime =
System.currentTimeMillis() + definition.getRetryDurationOnTimeout();
log.warn("Endpoint : " + endpointName + printEndpointAddress()
log.warn("Endpoint : " + endpointName + printEndpointAddress(synCtx)
+ " is marked as TIMEOUT and " +
"will be retried : " + localRemainingRetries + " more time/s " +
"after : " + new Date(localNextRetryTime)
Expand All @@ -309,7 +311,7 @@ private void setState(int state) {
break;
}
case ST_SUSPENDED: {
computeNextRetryTimeForSuspended();
computeNextRetryTimeForSuspended(synCtx);
break;
}
case ST_OFF: {
Expand All @@ -332,18 +334,23 @@ private void setState(int state) {
* Endpoint has processed a message successfully
*/
public void onSuccess() {

onSuccess(null);
}

public void onSuccess(MessageContext synCtx) {
if (isClustered) {
Integer state = (Integer) cfgCtx.getPropertyNonReplicable(STATE_KEY);

if ((state != null) && ((state != ST_ACTIVE) && (state != ST_OFF))) {
log.info("Endpoint : " + endpointName + printEndpointAddress()
log.info("Endpoint : " + endpointName + printEndpointAddress(synCtx)
+ " currently " + getStateAsString() +
" will now be marked active since it processed its last message");
setState(ST_ACTIVE);
}
} else {
if (localState != ST_ACTIVE && localState != ST_OFF) {
log.info("Endpoint : " + endpointName + printEndpointAddress()
log.info("Endpoint : " + endpointName + printEndpointAddress(synCtx)
+ " currently " + getStateAsString() +
" will now be marked active since it processed its last message");
setState(ST_ACTIVE);
Expand All @@ -355,26 +362,37 @@ public void onSuccess() {
* Endpoint failed processing a message
*/
public void onFault() {
log.warn("Endpoint : " + endpointName + printEndpointAddress() +
" will be marked SUSPENDED as it failed");
setState(ST_SUSPENDED);

onFault(null);
}

public void onFault(MessageContext synCtx) {

log.warn("Endpoint : " + endpointName + printEndpointAddress(synCtx) + " will be marked SUSPENDED as it failed");
setState(ST_SUSPENDED, synCtx);
}

/**
* Endpoint timeout processing a message
*/
public void onTimeout() {

onTimeout(null);
}

public void onTimeout(MessageContext synCtx) {

if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + printEndpointAddress() + " will be marked for " +
log.debug("Endpoint : " + endpointName + printEndpointAddress(synCtx) + " will be marked for " +
"SUSPENSION due to the occurrence of one of the configured errors");
}
setState(ST_TIMEOUT);
setState(ST_TIMEOUT, synCtx);
}

/**
* Compute the suspension duration according to the geometric series parameters defined
*/
private void computeNextRetryTimeForSuspended() {
private void computeNextRetryTimeForSuspended(MessageContext synCtx) {
boolean notYetSuspended = true;
long lastSuspendDuration = definition.getInitialSuspendDuration();
if (isClustered) {
Expand Down Expand Up @@ -408,7 +426,7 @@ private void computeNextRetryTimeForSuspended() {
localNextRetryTime = nextRetryTime;
}

log.warn("Suspending endpoint : " + endpointName + printEndpointAddress() +
log.warn("Suspending endpoint : " + endpointName + printEndpointAddress(synCtx) +
(notYetSuspended ? " -" :
" - last suspend duration was : " + lastSuspendDuration + "ms and") +
" current suspend duration is : " + nextSuspendDuration + "ms - " +
Expand Down Expand Up @@ -598,8 +616,20 @@ public String toString() {
}

private String printEndpointAddress() {
if(this.definition != null && this.definition.getAddress() != null) {
return " with address " + MessageHelper.maskURLPassword(this.definition.getAddress());

return printEndpointAddress(null);
}

private String printEndpointAddress(MessageContext synCtx) {

if (this.definition != null && this.definition.getAddress() != null) {
String address = "";
if (synCtx != null) {
address = this.definition.getAddress(synCtx);
} else {
address = this.definition.getAddress();
}
return " with address " + MessageHelper.maskURLPassword(address);
} else {
return " ";
}
Expand All @@ -610,6 +640,11 @@ private String printEndpointAddress() {
*/
public boolean isMaxRetryLimitReached(boolean isRecursiveEndpoint) {

return isMaxRetryLimitReached(isRecursiveEndpoint, null);
}

public boolean isMaxRetryLimitReached(boolean isRecursiveEndpoint, MessageContext synCtx) {

if (isClustered) {
Integer remainingMaxRetries;
int maximumRetryLimitCount;
Expand All @@ -629,7 +664,7 @@ public boolean isMaxRetryLimitReached(boolean isRecursiveEndpoint) {
return false;
}
if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + printEndpointAddress()
log.debug("Endpoint : " + endpointName + printEndpointAddress(synCtx)
+ " has " + remainingMaxRetries + " maximum retries before suspension");
}

Expand Down Expand Up @@ -657,7 +692,7 @@ public boolean isMaxRetryLimitReached(boolean isRecursiveEndpoint) {
}

if (log.isDebugEnabled()) {
log.debug("Endpoint : " + endpointName + printEndpointAddress()
log.debug("Endpoint : " + endpointName + printEndpointAddress(synCtx)
+ " has " + maximumRemainingRetriesCount + " maximum retries before suspension");
}

Expand All @@ -683,6 +718,11 @@ public boolean isMaxRetryLimitReached(boolean isRecursiveEndpoint) {
*/
public void onFailoverRetryLimit(boolean isRecursiveEndpoint) {

onFailoverRetryLimit(isRecursiveEndpoint, null);
}

public void onFailoverRetryLimit(boolean isRecursiveEndpoint, MessageContext synCtx) {

recordStatistics(ST_SUSPENDED);
long suspendDuration = isRecursiveEndpoint ? suspendDurationOnMaximumRecursiveFailover :
suspendDurationOnMaximumFailover;
Expand All @@ -697,7 +737,7 @@ public void onFailoverRetryLimit(boolean isRecursiveEndpoint) {
localNextRetryTime = nextRetryTime;
}

log.warn("Endpoint : " + endpointName + printEndpointAddress() +
log.warn("Endpoint : " + endpointName + printEndpointAddress(synCtx) +
" will be marked SUSPENDED as it failed until the maximum failover retry limit. Current suspend " +
"duration is : " +
suspendDuration + "ms - Next retry after : " + new Date(nextRetryTime));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,32 +267,19 @@ public String getAddress(MessageContext messageContext) {
if (dynamicUrl != null && !dynamicUrl.isEmpty()) {
addressString = dynamicUrl;
}
boolean matches = false;
int s = 0;
Pattern pattern = Pattern.compile("\\$\\{.*?\\}");

StringBuffer computedAddress = new StringBuffer();

Matcher matcher = pattern.matcher(addressString);
while (matcher.find()) {


Object property = messageContext.getProperty(
addressString.substring(matcher.start() + 2, matcher.end() - 1));
if (property != null) {
computedAddress.append(addressString.substring(s, matcher.start()));
computedAddress.append(property.toString());
s = matcher.end();
matches = true;
}
Pattern oldPattern = Pattern.compile("\\$\\{(.*?)\\}");
String oldProcessedAddress = SynapseConfigUtils.replacePatternWithProperties(addressString, oldPattern, messageContext);
if (!oldProcessedAddress.equals(addressString)) {
return oldProcessedAddress;
}

if (!matches) {
return addressString;
} else {
computedAddress.append(addressString.substring(s, addressString.length()));
return computedAddress.toString();
Pattern newPattern = Pattern.compile("\\{\\+?([^}]+)\\}");
String newProcessedAddress = SynapseConfigUtils.replacePatternWithProperties(addressString, newPattern, messageContext);
if (!newProcessedAddress.equals(addressString)) {
return newProcessedAddress;
}
return addressString;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ public void onFault(MessageContext synCtx) {
logSetter();
boolean isRecursive = getParentEndpoint() instanceof FailoverEndpoint ||
getParentEndpoint() instanceof LoadbalanceEndpoint;
if (getContext().isMaxRetryLimitReached(isRecursive)) {
getContext().onFailoverRetryLimit(isRecursive);
if (getContext().isMaxRetryLimitReached(isRecursive, synCtx)) {
getContext().onFailoverRetryLimit(isRecursive, synCtx);
} else {
// is this really a fault or a timeout/connection close etc?
if (isTimeout(synCtx)) {
getContext().onTimeout();
getContext().onTimeout(synCtx);
} else if (isSuspendFault(synCtx)) {
getContext().onFault();
getContext().onFault(synCtx);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,25 @@ public void testQueryParamsAsReservedChars() throws AxisFault, XMLStreamExceptio

}

@Test
public void testDynamicEndpointAddress() throws AxisFault, XMLStreamException {
HTTPEndpointFactory factory = new HTTPEndpointFactory();
OMElement em = AXIOMUtil.stringToOM("<http method=\"GET\"/>");
EndpointDefinition ep = factory.createEndpointDefinition(em);
ep.setAddress("{uri.var.base}{+uri.var.path}{+uri.var.query}");

HTTPEndpoint httpEndpoint = new HTTPEndpoint();
httpEndpoint.setDefinition(ep);
MessageContext messageContext = createMessageContext();
messageContext.setProperty("uri.var.base", "http://localhost:5000");
messageContext.setProperty("uri.var.path", "/service/test");
messageContext.setProperty("uri.var.query", "?param1=value1&param2=value2");

String actualAddress = httpEndpoint.getDefinition().getAddress(messageContext);
String expectedAddress = "http://localhost:5000/service/test?param1=value1&param2=value2";
Assert.assertEquals("Dynamic endpoint address mismatch: expected concatenated base + path + query", expectedAddress, actualAddress);
}

/**
* Tests sending sample characters that may be unreserved (@,: etc) as query parameter content
*/
Expand Down
Loading