@@ -23,6 +23,7 @@
 import com.solacesystems.jcsmp.DeliveryMode;
 import com.solacesystems.jcsmp.JCSMPException;
 import com.solacesystems.jcsmp.JCSMPSession;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -34,6 +35,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;


@@ -45,8 +47,10 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListe
   private SolSessionHandler solSessionHandler = null;
   BlockingQueue<BytesXMLMessage> ingressMessages
       = new LinkedBlockingQueue<>(); // LinkedBlockingQueue for any incoming message from PS+ topics and queue
-  BlockingQueue<BytesXMLMessage> outstandingAckList
-      = new LinkedBlockingQueue<>(); // LinkedBlockingQueue for Solace Flow messages
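+  // Deferred-ack bookkeeping: a guaranteed message is acknowledged only after
+  // every Kafka record created from it has been committed (see commitRecord() below)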
+  Map<Long, BytesXMLMessage> pendingAcks = new ConcurrentHashMap<>(); // Pending acks for Solace messages, keyed by ack message id
+  Map<SourceRecord, Long> sourceRecordAcks = new ConcurrentHashMap<>(); // Maps each record to its Solace ack message id
+  Map<Long, Integer> sourceRecordsLeftInAck = new ConcurrentHashMap<>(); // Number of not-yet-committed records created from each Solace message
+
   String skafkaTopic;
   SolaceSourceTopicListener topicListener = null;
   SolaceSourceQueueConsumer queueConsumer = null;
@@ -132,37 +136,55 @@ public synchronized List<SourceRecord> poll() throws InterruptedException {
       } catch (Exception e) {
         if (connectorConfig.getBoolean(SolaceSourceConstants.SOL_MESSAGE_PROCESSOR_IGNORE_ERROR)) {
           log.warn("================ Encountered exception in message processing....discarded.", e);
-          scheduleForAck(msg);
+          msg.ackMessage(); // Effectively discard the Solace message by acknowledging it immediately
           discarded++;
           continue;
         } else {
           throw new ConnectException("Encountered exception in message processing", e);
         }
       }
-      Collections.addAll(records, processor.getRecords(skafkaTopic));
-      scheduleForAck(msg);
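+      // Capture the full set of records created from this message so its ack
+      // can be deferred until all of them have been committed to Kafka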
+      SourceRecord[] processorRecords = processor.getRecords(skafkaTopic);
+      Collections.addAll(records, processorRecords);
+      scheduleForAck(msg, processorRecords);
     }
     log.debug("Processed {} records in this batch. Discarded {}", processedInThisBatch - discarded, discarded);
     return records;
   }

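+  /**
+   * Registers a guaranteed message for deferred acknowledgment: the message is
+   * acked only once every Kafka record created from it has been committed.
+   */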
-  private synchronized void scheduleForAck(BytesXMLMessage msg) {
+  private synchronized void scheduleForAck(BytesXMLMessage msg, SourceRecord[] processorRecords) {
     if (msg.getDeliveryMode() == DeliveryMode.NON_PERSISTENT
         || msg.getDeliveryMode() == DeliveryMode.PERSISTENT) {
-      outstandingAckList.add(msg); // enqueue messages received from guaranteed messaging endpoint for later ack
+      sourceRecordsLeftInAck.put(msg.getAckMessageId(), processorRecords.length); // Number of records created from this Solace message
+      for (SourceRecord processorRecord : processorRecords) {
+        sourceRecordAcks.put(processorRecord, msg.getAckMessageId()); // Map each record to the Solace ack message id
+      }
+      pendingAcks.put(msg.getAckMessageId(), msg); // Keep messages from guaranteed messaging endpoints for later ack
     }
   }

-  /**
-   * Kafka Connect method that write records to disk.
-   */
-  public synchronized void commit() throws InterruptedException {
-    log.trace("Committing records");
-    int currentLoad = outstandingAckList.size();
-    int count = 0;
-    while (count != currentLoad) {
-      outstandingAckList.take().ackMessage();
-      count++;
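+  /**
+   * Kafka Connect callback invoked after an individual record has been
+   * committed to Kafka. Decrements the per-message record counter and, once
+   * the last record from a Solace message is committed, acks that message.
+   */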
+  @Override
+  public synchronized void commitRecord(SourceRecord record, RecordMetadata metadata) {
+    Long ackMessageId = sourceRecordAcks.remove(record);
+    if (ackMessageId == null) {
+      return;
+    }
+
+    if (!sourceRecordsLeftInAck.containsKey(ackMessageId)) {
+      log.error("Unable to find message counter for {}", ackMessageId); // Shouldn't happen
+    }
+
+    sourceRecordsLeftInAck.computeIfPresent(ackMessageId, (k, o) -> o > 1 ? --o : null); // Decrement the per-message record counter; remove the entry on the last record
+
+    if (!sourceRecordsLeftInAck.containsKey(ackMessageId)) { // Last record in the group was committed
+      BytesXMLMessage msg = pendingAcks.remove(ackMessageId);
+
+      if (msg != null) {
+        // Acknowledge the message since all of its records were committed to Kafka
+        msg.ackMessage();
+        log.debug("Acknowledged message for messageId {}", ackMessageId);
+      } else {
+        log.error("Message for messageId {} was not found in pending ack map.", ackMessageId);
+      }
     }
   }

@@ -183,6 +205,13 @@ public synchronized void stop() {
     }
     solSessionHandler = null; // At this point filling the ingress queue is stopped
     ingressMessages.clear(); // Remove all remaining ingressed messages, these will be no longer imported to Kafka
+    sourceRecordAcks.clear();
+    sourceRecordsLeftInAck.clear();
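+    // Messages still pending acknowledgment will be redelivered by the broker
+    // after the flow closes, so consumers may see duplicates of those records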
+    if (!pendingAcks.isEmpty()) {
+      log.warn("Unacknowledged messages remain; they may be redelivered and appear as duplicates");
+      pendingAcks.forEach((s, m) -> log.warn("CorrelationId: {}", m.getCorrelationId()));
+      pendingAcks.clear();
+    }
     log.info("PubSub+ Source Connector stopped");
   }

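Taken together, scheduleForAck() and commitRecord() implement a small reference-counting scheme: each guaranteed message starts with a counter equal to the number of Kafka records it produced, every committed record decrements the counter, and the Solace ack fires when the counter reaches zero. The following minimal sketch (class and method names are hypothetical, not part of the connector) isolates the ConcurrentHashMap.computeIfPresent idiom the change relies on, where returning null from the remapping function removes the entry:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class AckCounterSketch {
  private final Map<Long, Integer> recordsLeft = new ConcurrentHashMap<>();

  /** Register a message that produced recordCount Kafka records. */
  void track(long ackMessageId, int recordCount) {
    recordsLeft.put(ackMessageId, recordCount);
  }

  /**
   * Decrement the counter for one committed record; returns true when the
   * last outstanding record was committed and the message may be acked.
   */
  synchronized boolean recordCommitted(long ackMessageId) {
    // Returning null from the remapping function removes the entry, so the
    // mapping disappears exactly when the counter would reach zero
    recordsLeft.computeIfPresent(ackMessageId, (k, v) -> v > 1 ? v - 1 : null);
    return !recordsLeft.containsKey(ackMessageId);
  }

  public static void main(String[] args) {
    AckCounterSketch sketch = new AckCounterSketch();
    sketch.track(42L, 3); // one Solace message fanned out into three records
    System.out.println(sketch.recordCommitted(42L)); // false
    System.out.println(sketch.recordCommitted(42L)); // false
    System.out.println(sketch.recordCommitted(42L)); // true -> safe to ack
  }
}

As in the connector's commitRecord(), recordCommitted() is synchronized so the decrement and the follow-up containsKey check behave as one step; without it, two threads committing the last two records of one message concurrently could both observe the entry as removed and trigger a double ack.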