-
Notifications
You must be signed in to change notification settings - Fork 268
Description
Description
A JSON payload is first sent to the message store. The message processor then reads the message from the store and forwards it to an endpoint. During the initial attempt, the request is sent with the correct payload. However, if the initial attempt fails and a retry is triggered, the payload sent to the endpoint is modified compared to the original one.
Please find the attached artifacts for your reference.
API to store messages
<?xml version="1.0" encoding="UTF-8"?>
<api context="/store" name="store" xmlns="http://ws.apache.org/ns/synapse">
<resource methods="POST" uri-template="/">
<inSequence>
<property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
<property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
<store messageStore="SampleQueue"/>
</inSequence>
<faultSequence>
</faultSequence>
</resource>
</api>Endpoint used to send requests from the message processor (Mockoon was used, as it makes stopping, starting, and visualizing the incoming payload easier).
<?xml version="1.0" encoding="UTF-8"?>
<endpoint name="testEp" xmlns="http://ws.apache.org/ns/synapse">
<http method="post" uri-template="http://localhost:3000/submit">
<suspendOnFailure>
<initialDuration>-1</initialDuration>
<progressionFactor>1</progressionFactor>
</suspendOnFailure>
<markForSuspension>
<retriesBeforeSuspension>0</retriesBeforeSuspension>
</markForSuspension>
</http>
</endpoint>Message Processor
<?xml version="1.0" encoding="UTF-8"?>
<messageProcessor class="org.apache.synapse.message.processor.impl.forwarder.ScheduledMessageForwardingProcessor" name="SampleProcessor" messageStore="SampleQueue" targetEndpoint="testEp" xmlns="http://ws.apache.org/ns/synapse">
<parameter name="client.retry.interval">10000</parameter>
<parameter name="member.count">5</parameter>
<parameter name="is.active">true</parameter>
<parameter name="max.delivery.attempts">500000</parameter>
<parameter name="store.connection.retry.interval">1000</parameter>
<parameter name="max.store.connection.attempts">-1</parameter>
<parameter name="max.delivery.drop">Disabled</parameter>
<parameter name="interval">250</parameter>
</messageProcessor>Message Store
<?xml version="1.0" encoding="UTF-8"?>
<messageStore name="SampleQueue" class="org.apache.synapse.message.store.impl.rabbitmq.RabbitMQStore" xmlns="http://ws.apache.org/ns/synapse">
<parameter name="store.rabbitmq.host.name">localhost</parameter>
<parameter name="store.producer.guaranteed.delivery.enable">true</parameter>
<parameter name="store.rabbitmq.host.port">5672</parameter>
<parameter name="store.rabbitmq.route.key">queue</parameter>
<parameter name="store.rabbitmq.username">guest</parameter>
<parameter name="store.rabbitmq.virtual.host"/>
<parameter name="rabbitmq.connection.ssl.enabled">false</parameter>
<parameter name="store.rabbitmq.exchange.name">exchange</parameter>
<parameter name="store.rabbitmq.queue.name">queue</parameter>
<parameter name="store.rabbitmq.password">guest</parameter>
</messageStore>Reproducible steps
- Create a .car with the above artifacts.
- Start the MI server
- Stop the endpoint
- Send a request with the initial payload(given below) to the API to store the payload
- Retry interval is set to 10 seconds in the message processor. After sending the request, start the endpoint within this 10-second interval.
- Check the incoming payload for the endpoint
- Compare it with the initial payload.
The notable updates in the payload are:
- An array containing a single element is converted into just the element, removing the array.
- Numeric values represented as strings are converted to integers.
- Attributes with an empty array are removed.
- An empty string is converted to null.
Please consider the following payloads for the reference
Initial
{
"players": ["john"],
"courts": [],
"contactNo": "0721231231234",
"address": ""
}Incoming payload for the endpoint
{
"players": "john",
"contactNo": "0721231231234",
"address": null
}Identified Cause of the Issue
The problem occurs during message dispatch in the ForwardingService class within the package org.apache.synapse.message.processor.impl.forwarder. Specifically, the tryToDispatchToEndpoint method is called inside a while loop, with each retry triggering a new execution of this method.
At the start of tryToDispatchToEndpoint, the method getFreshCopyOfOriginalMessage is used to update the envelope of the messageToDispatch object. However, the behavior of getFreshCopyOfOriginalMessage differs between the first and subsequent attempts, resulting in a different messageToDispatch object for retries.
-
First Attempt: The envelope of
messageToDispatchis initially set to the parsedoriginalEnvelope. After reading the JSON content fromoriginalJsonInputStream, if JSON content is present, a new envelope is created from the JSON and assigned tomessageToDispatch, replacing the existing envelope. -
Second Attempt: Again, the envelope of
messageToDispatchis initially set to the parsedoriginalEnvelope. However, sinceoriginalJsonInputStreamhas already been read in the first attempt, reading it again returns an empty string. As a result,messageToDispatchis not updated with the JSON content, and the endpoint is called with the available envelope.
This difference leads to a modified payload being sent to the endpoint during retries.
Possible Solutions
- Option 1: Provide a fresh copy of the input stream for each retry so that it can be read independently.
- Option 2: Preserve the exact JSON content in the original envelope.
Steps to Reproduce
Provided in the description
Version
4.1.0
Environment Details (with versions)
No response