diff --git a/modules/core/src/main/java/org/apache/synapse/ContinuationState.java b/modules/core/src/main/java/org/apache/synapse/ContinuationState.java index e64be969cf..41ef89f648 100644 --- a/modules/core/src/main/java/org/apache/synapse/ContinuationState.java +++ b/modules/core/src/main/java/org/apache/synapse/ContinuationState.java @@ -19,6 +19,8 @@ package org.apache.synapse; +import java.util.List; + /** * Implementations of this interface holds the runtime state information of important checkpoints * of the mediation flow. @@ -77,4 +79,28 @@ public interface ContinuationState { */ public void removeLeafChild(); + /** + * Get the statistics parent index stored in this continuation state. + * @return The parent index for statistics tracing + */ + public Integer getStatisticsParentIndex(); + + /** + * Set the statistics parent index for this continuation state. + * @param statisticsParentIndex The parent index to store + */ + public void setStatisticsParentIndex(Integer statisticsParentIndex); + + /** + * Get the statistics parent list stored in this continuation state. + * @return The parent list for statistics tracing + */ + public List getStatisticsParentList(); + + /** + * Set the statistics parent list for this continuation state. + * @param statisticsParentList The parent list to store + */ + public void setStatisticsParentList(List statisticsParentList); + } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/AbstractParentResolver.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/AbstractParentResolver.java index 2c25bf02f6..dd7dd5f719 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/AbstractParentResolver.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/AbstractParentResolver.java @@ -31,6 +31,10 @@ protected static boolean isEndpointOrInboundEndpoint(StatisticDataUnit statistic statisticDataUnit.getComponentType().equals(ComponentType.INBOUNDENDPOINT); } + protected static boolean isSequence(StatisticDataUnit statisticDataUnit) { + return statisticDataUnit.getComponentType().equals(ComponentType.SEQUENCE); + } + protected static boolean isFlowContinuableMediator(StatisticDataUnit statisticDataUnit) { return statisticDataUnit.isFlowContinuableMediator(); } @@ -49,4 +53,14 @@ protected static boolean isSendMediator(StatisticDataUnit statisticDataUnit) { return ComponentType.MEDIATOR.equals(statisticDataUnit.getComponentType()) && "sendmediator".equalsIgnoreCase(statisticDataUnit.getComponentName()); } + + protected static boolean isCloneMediator(StatisticDataUnit statisticDataUnit) { + return ComponentType.MEDIATOR.equals(statisticDataUnit.getComponentType()) && + "CloneMediator".equalsIgnoreCase(statisticDataUnit.getComponentName()); + } + + protected static boolean isScatterGatherMediator(StatisticDataUnit statisticDataUnit) { + return ComponentType.MEDIATOR.equals(statisticDataUnit.getComponentType()) && + "ScatterGatherMediator".equalsIgnoreCase(statisticDataUnit.getComponentName()); + } } diff --git a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/MessageFlowRepresentationBasedParentResolver.java b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/MessageFlowRepresentationBasedParentResolver.java index 1c65ccb713..568eb8db65 100644 --- a/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/MessageFlowRepresentationBasedParentResolver.java +++ b/modules/core/src/main/java/org/apache/synapse/aspects/flow/statistics/tracing/opentelemetry/management/parentresolving/MessageFlowRepresentationBasedParentResolver.java @@ -52,6 +52,7 @@ public static SpanWrapper resolveParent(StatisticDataUnit child, SpanStore spanS */ return LatestActiveParentResolver.resolveParentForEndpointOrInboundEndpoint(spanStore); } + if (TracingUtils.isAnonymousSequence(parent.getStatisticDataUnit()) || TracingUtils.isAnonymousSequence(child)) { if (isFlowContinuableMediator(parent.getStatisticDataUnit()) || @@ -60,6 +61,12 @@ public static SpanWrapper resolveParent(StatisticDataUnit child, SpanStore spanS } return getLatestEligibleParent(spanStore); } + + // Special handling for sequences executing in cloned/parallel contexts + // (e.g., inside Clone mediator or Scatter Gather mediator) + if (isSequence(child)) { + return findRecentCloneOrScatterGatherMediator(spanStore); + } } return null; } @@ -84,4 +91,25 @@ private static SpanWrapper getLatestEligibleParent(SpanStore spanStore) { } return null; } -} + + /** + * Finds the most recent Clone or Scatter Gather mediator in the span store. + * This is used when sequences are executing inside cloned/parallel contexts. + * @param spanStore Span store object. + * @return The most recent Clone or Scatter Gather mediator span wrapper, or null if not found. + */ + private static SpanWrapper findRecentCloneOrScatterGatherMediator(SpanStore spanStore) { + Object[] spanWrapperKeys = spanStore.getSpanWrappers().keySet().toArray(); + for (int i = spanWrapperKeys.length - 1; i >= 0; i--) { + String key = (String)spanWrapperKeys[i]; + SpanWrapper spanWrapper = spanStore.getSpanWrapper(key); + if (spanWrapper != null && spanWrapper.getStatisticDataUnit() != null) { + StatisticDataUnit unit = spanWrapper.getStatisticDataUnit(); + if (isCloneMediator(unit) || isScatterGatherMediator(unit)) { + return spanWrapper; + } + } + } + return null; + } +} \ No newline at end of file diff --git a/modules/core/src/main/java/org/apache/synapse/continuation/AbstractContinuationState.java b/modules/core/src/main/java/org/apache/synapse/continuation/AbstractContinuationState.java index 24942868fb..b7ef59d8d1 100644 --- a/modules/core/src/main/java/org/apache/synapse/continuation/AbstractContinuationState.java +++ b/modules/core/src/main/java/org/apache/synapse/continuation/AbstractContinuationState.java @@ -20,6 +20,7 @@ package org.apache.synapse.continuation; import org.apache.synapse.ContinuationState; +import java.util.List; public abstract class AbstractContinuationState implements ContinuationState { @@ -30,6 +31,16 @@ public abstract class AbstractContinuationState implements ContinuationState { */ private ContinuationState childContState = null; + /** + * Statistics parent index for tracing context preservation across async boundaries. + */ + private Integer statisticsParentIndex; + + /** + * Statistics parent list for tracing context preservation across async boundaries. + */ + private List statisticsParentList; + /** * Get the child ContinuationState * @return child ContinuationState @@ -115,4 +126,36 @@ public void removeLeafChild() { } } + /** + * Get the statistics parent index stored in this continuation state. + * @return The parent index for statistics tracing + */ + public Integer getStatisticsParentIndex() { + return statisticsParentIndex; + } + + /** + * Set the statistics parent index for this continuation state. + * @param statisticsParentIndex The parent index to store + */ + public void setStatisticsParentIndex(Integer statisticsParentIndex) { + this.statisticsParentIndex = statisticsParentIndex; + } + + /** + * Get the statistics parent list stored in this continuation state. + * @return The parent list for statistics tracing + */ + public List getStatisticsParentList() { + return statisticsParentList; + } + + /** + * Set the statistics parent list for this continuation state. + * @param statisticsParentList The parent list to store + */ + public void setStatisticsParentList(List statisticsParentList) { + this.statisticsParentList = statisticsParentList; + } + } diff --git a/modules/core/src/main/java/org/apache/synapse/continuation/ContinuationStackManager.java b/modules/core/src/main/java/org/apache/synapse/continuation/ContinuationStackManager.java index 9eb8d544c1..2e42726cd5 100644 --- a/modules/core/src/main/java/org/apache/synapse/continuation/ContinuationStackManager.java +++ b/modules/core/src/main/java/org/apache/synapse/continuation/ContinuationStackManager.java @@ -29,6 +29,7 @@ import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.management.OpenTelemetryManager; import org.apache.synapse.aspects.flow.statistics.tracing.opentelemetry.OpenTelemetryManagerHolder; +import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants; import org.apache.synapse.core.axis2.ProxyService; import org.apache.synapse.inbound.InboundEndpoint; import org.apache.synapse.mediators.MediatorFaultHandler; @@ -120,7 +121,21 @@ public static void updateSeqContinuationState(MessageContext synCtx, int positio if (synCtx.isContinuationEnabled()) { ContinuationState seqContState = ContinuationStackManager.peakContinuationStateStack(synCtx); if (seqContState != null) { - seqContState.getLeafChild().setPosition(position); + ContinuationState leafChild = seqContState.getLeafChild(); + leafChild.setPosition(position); + + // Save statistics parent context with the continuation state for tracing + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + Integer parentIndex = (Integer) synCtx.getProperty( + StatisticsConstants.MEDIATION_FLOW_STATISTICS_PARENT_INDEX); + java.util.List parentList = (java.util.List) synCtx.getProperty( + StatisticsConstants.MEDIATION_FLOW_STATISTICS_PARENT_LIST); + + // Store parent context in the continuation state + leafChild.setStatisticsParentIndex(parentIndex); + leafChild.setStatisticsParentList(parentList != null ? + new java.util.LinkedList<>(parentList) : null); + } } else { // Ideally we should not get here. log.warn("Continuation Stack is empty. Probably due to a configuration issue"); diff --git a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java index b356bdb77d..de0e46ac6f 100644 --- a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java +++ b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java @@ -43,6 +43,7 @@ import org.apache.synapse.aspects.flow.statistics.collectors.CloseEventCollector; import org.apache.synapse.aspects.flow.statistics.collectors.OpenEventCollector; import org.apache.synapse.aspects.flow.statistics.collectors.RuntimeStatisticCollector; +import org.apache.synapse.aspects.flow.statistics.util.StatisticsConstants; import org.apache.synapse.aspects.flow.statistics.store.MessageDataStore; import org.apache.synapse.carbonext.TenantInfoConfigurator; import org.apache.synapse.commons.json.JsonUtil; @@ -841,8 +842,34 @@ private boolean mediateFromContinuationStateStack(MessageContext synCtx) { } } + // Store parent context before opening continuation events (they will be consumed) + Integer savedParentIndex = null; + List savedParentList = null; + if (RuntimeStatisticCollector.isStatisticsEnabled()) { + // Get parent context from continuation state before opening events + if (!synCtx.getContinuationStateStack().isEmpty()) { + ContinuationState contState = synCtx.getContinuationStateStack().peek(); + if (contState != null) { + // Get the leaf child where parent context was stored + ContinuationState leafState = contState.getLeafChild(); + if (leafState != null) { + savedParentIndex = leafState.getStatisticsParentIndex(); + savedParentList = leafState.getStatisticsParentList(); + } + } + } OpenEventCollector.openContinuationEvents(synCtx); + + // Restore parent context AFTER opening continuation events + if (savedParentIndex != null) { + synCtx.setProperty(StatisticsConstants.MEDIATION_FLOW_STATISTICS_PARENT_INDEX, + savedParentIndex); + } + if (savedParentList != null) { + synCtx.setProperty(StatisticsConstants.MEDIATION_FLOW_STATISTICS_PARENT_LIST, + new java.util.LinkedList<>(savedParentList)); + } } //First push fault handlers for first continuation state. @@ -870,6 +897,25 @@ private boolean mediateFromContinuationStateStack(MessageContext synCtx) { } private void callMediatorPostMediate(MessageContext response) { + // Restore statistics parent context from continuation state for blocking calls + if (RuntimeStatisticCollector.isStatisticsEnabled() && + !response.getContinuationStateStack().isEmpty()) { + ContinuationState contState = response.getContinuationStateStack().peek(); + if (contState != null) { + Integer savedParentIndex = contState.getStatisticsParentIndex(); + List savedParentList = contState.getStatisticsParentList(); + + if (savedParentIndex != null) { + response.setProperty(StatisticsConstants.MEDIATION_FLOW_STATISTICS_PARENT_INDEX, + savedParentIndex); + } + if (savedParentList != null) { + response.setProperty(StatisticsConstants.MEDIATION_FLOW_STATISTICS_PARENT_LIST, + new java.util.LinkedList<>(savedParentList)); + } + } + } + Target targetForInboundPayload = (Target) response.getProperty(TARGET_FOR_INBOUND_PAYLOAD); String sourceMessageType = (String) response.getProperty(SOURCE_MESSAGE_TYPE); String originalMessageType = (String) response.getProperty(ORIGINAL_MESSAGE_TYPE); diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/template/InvokeMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/template/InvokeMediator.java index 5b1212972c..8fa4a3ac49 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/template/InvokeMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/template/InvokeMediator.java @@ -288,6 +288,12 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat boolean result; int subBranch = ((ReliantContinuationState) continuationState).getSubBranch(); boolean isStatisticsEnabled = RuntimeStatisticCollector.isStatisticsEnabled(); + Integer statisticReportingIndex = null; + + if (isStatisticsEnabled) { + statisticReportingIndex = reportOpenStatistics(synCtx, false); + } + if (subBranch == 0) { // Default flow TemplateMediator templateMediator = (TemplateMediator) synCtx.getSequenceTemplate(targetTemplate); @@ -332,6 +338,11 @@ public boolean mediate(MessageContext synCtx, ContinuationState continuationStat prefetchInvoke.reportCloseStatistics(synCtx, null); } } + + if (isStatisticsEnabled && statisticReportingIndex != null) { + reportCloseStatistics(synCtx, statisticReportingIndex); + } + return result; }