2121import org .agrona .DirectBuffer ;
2222import org .agrona .collections .Long2ObjectHashMap ;
2323import org .agrona .concurrent .EpochNanoClock ;
24+ import org .agrona .concurrent .status .AtomicCounter ;
2425import uk .co .real_logic .artio .builder .Encoder ;
2526import uk .co .real_logic .artio .decoder .AbstractResendRequestDecoder ;
2627import uk .co .real_logic .artio .decoder .SessionHeaderDecoder ;
3031import uk .co .real_logic .artio .messages .MessageStatus ;
3132import uk .co .real_logic .artio .messages .ValidResendRequestDecoder ;
3233import uk .co .real_logic .artio .protocol .GatewayPublication ;
34+ import uk .co .real_logic .artio .util .MutableAsciiBuffer ;
3335
3436import java .util .ArrayDeque ;
3537
@@ -45,21 +47,25 @@ public class GapFiller extends AbstractReplayer
4547 private final GatewayPublication publication ;
4648 private final String agentNamePrefix ;
4749 private final ReplayTimestamper timestamper ;
50+ private final AtomicCounter gapfillerCounter ;
4851
4952 public GapFiller (
5053 final GatewayPublication publication ,
54+ final BufferClaim bufferClaim ,
5155 final String agentNamePrefix ,
5256 final SenderSequenceNumbers senderSequenceNumbers ,
5357 final ReplayerCommandQueue replayerCommandQueue ,
5458 final FixSessionCodecsFactory fixSessionCodecsFactory ,
59+ final AtomicCounter gapfillerCounter ,
5560 final EpochNanoClock clock ,
5661 final DutyCycleTracker dutyCycleTracker )
5762 {
58- super (publication .dataPublication (), fixSessionCodecsFactory , new BufferClaim () , senderSequenceNumbers ,
63+ super (publication .dataPublication (), fixSessionCodecsFactory , bufferClaim , senderSequenceNumbers ,
5964 clock , dutyCycleTracker );
6065 this .publication = publication ;
6166 this .agentNamePrefix = agentNamePrefix ;
6267 this .replayerCommandQueue = replayerCommandQueue ;
68+ this .gapfillerCounter = gapfillerCounter ;
6369
6470 timestamper = new ReplayTimestamper (publication .dataPublication (), clock );
6571 }
@@ -92,27 +98,30 @@ public void onFragment(final DirectBuffer buffer, final int start, final int len
9298 final int endSeqNo = (int )validResendRequest .endSequenceNumber ();
9399 final int sequenceIndex = validResendRequest .sequenceIndex ();
94100 final long correlationId = validResendRequest .correlationId ();
95- validResendRequest .wrapBody (asciiBuffer );
101+
102+ final MutableAsciiBuffer copiedBuffer = new MutableAsciiBuffer (new byte [validResendRequest .bodyLength ()]);
103+ validResendRequest .getBody (copiedBuffer , 0 , validResendRequest .bodyLength ());
96104
97105 onResendRequest (
98- sessionId , connectionId , beginSeqNo , endSeqNo , sequenceIndex , correlationId );
106+ sessionId , connectionId , beginSeqNo , endSeqNo , sequenceIndex , correlationId , copiedBuffer );
99107 }
100108 else
101109 {
102110 fixSessionCodecsFactory .onFragment (buffer , start , length , header );
103111 }
104112 }
105113
106- private void onResendRequest (
114+ void onResendRequest (
107115 final long sessionId ,
108116 final long connectionId ,
109117 final int beginSeqNo ,
110118 final int endSeqNo ,
111119 final int sequenceIndex ,
112- final long correlationId )
120+ final long correlationId ,
121+ final MutableAsciiBuffer copiedBuffer )
113122 {
114123 final GapFillerSession gapFillerSession = new GapFillerSession (
115- sessionId , connectionId , beginSeqNo , endSeqNo , sequenceIndex , correlationId );
124+ sessionId , connectionId , beginSeqNo , endSeqNo , sequenceIndex , correlationId , copiedBuffer );
116125
117126 if (gapFillerChannels .containsKey (connectionId ))
118127 {
@@ -124,6 +133,7 @@ private void onResendRequest(
124133 final GapFillerChannel gapFillerChannel = new GapFillerChannel ();
125134 gapFillerChannel .currentSession (gapFillerSession );
126135 gapFillerChannels .put (connectionId , gapFillerChannel );
136+ gapfillerCounter .increment ();
127137 }
128138 }
129139
@@ -157,6 +167,7 @@ private int sendGapfills()
157167 if (checkDisconnected (connectionId ))
158168 {
159169 gapFillerChannelIterator .remove ();
170+ gapfillerCounter .decrement ();
160171 }
161172 else
162173 {
@@ -169,6 +180,7 @@ private int sendGapfills()
169180 if (null == queuedSession )
170181 {
171182 gapFillerChannelIterator .remove ();
183+ gapfillerCounter .decrement ();
172184 }
173185 else
174186 {
@@ -237,6 +249,7 @@ private enum State
237249 final int endSeqNo ;
238250 final int sequenceIndex ;
239251 final long correlationId ;
252+ final MutableAsciiBuffer copiedBuffer ;
240253 private State state ;
241254 private FixReplayerCodecs fixReplayerCodecs ;
242255
@@ -246,14 +259,16 @@ private enum State
246259 final int beginSeqNo ,
247260 final int endSeqNo ,
248261 final int sequenceIndex ,
249- final long correlationId )
262+ final long correlationId ,
263+ final MutableAsciiBuffer copiedBuffer )
250264 {
251265 this .sessionId = sessionId ;
252266 this .connectionId = connectionId ;
253267 this .beginSeqNo = beginSeqNo ;
254268 this .endSeqNo = endSeqNo ;
255269 this .sequenceIndex = sequenceIndex ;
256270 this .correlationId = correlationId ;
271+ this .copiedBuffer = copiedBuffer ;
257272 this .state = State .INIT ;
258273 }
259274
@@ -308,7 +323,7 @@ int doWork()
308323 final AbstractResendRequestDecoder resendRequest = fixReplayerCodecs .resendRequest ();
309324 final GapFillEncoder encoder = fixReplayerCodecs .gapFillEncoder ();
310325
311- resendRequest .decode (asciiBuffer , 0 , asciiBuffer .capacity ());
326+ resendRequest .decode (copiedBuffer , 0 , copiedBuffer .capacity ());
312327
313328 final SessionHeaderDecoder reqHeader = resendRequest .header ();
314329
0 commit comments