2424import com .solacesystems .jcsmp .JCSMPException ;
2525import com .solacesystems .jcsmp .JCSMPProperties ;
2626import com .solacesystems .jcsmp .JCSMPSession ;
27- import com .solacesystems .jcsmp .JCSMPSessionStats ;
28- import com .solacesystems .jcsmp .statistics .StatType ;
29-
3027import java .util .ArrayList ;
3128import java .util .Collections ;
32- import java .util .Enumeration ;
3329import java .util .List ;
3430import java .util .Map ;
3531import java .util .concurrent .BlockingQueue ;
3632import java .util .concurrent .LinkedBlockingQueue ;
37- import java .util .concurrent .TimeUnit ;
38- import java .util .concurrent .atomic .AtomicBoolean ;
39-
4033import org .apache .kafka .connect .source .SourceRecord ;
4134import org .apache .kafka .connect .source .SourceTask ;
4235import org .slf4j .Logger ;
@@ -59,7 +52,7 @@ public class SolaceSourceTask extends SourceTask { // implements XMLMessageListe
5952 String skafkaTopic ;
6053 SolaceSourceTopicListener topicListener = null ;
6154 SolaceSourceQueueConsumer queueConsumer = null ;
62-
55+ private int spinTurns = 0 ;
6356 private volatile boolean shuttingDown = false ;
6457
6558 // private Class<?> cProcessor;
@@ -74,6 +67,16 @@ public String version() {
7467 public void start (Map <String , String > props ) {
7568
7669 connectorConfig = new SolaceSourceConnectorConfig (props );
70+ try {
71+ processor = connectorConfig
72+ .getConfiguredInstance (SolaceSourceConstants
73+ .SOL_MESSAGE_PROCESSOR , SolMessageProcessorIF .class );
74+ } catch (Exception e ) {
75+ log .info (
76+ "================ Encountered exception in creating the message processor."
77+ + " Cause: {}, Stacktrace: {} " ,
78+ e .getCause (), e .getStackTrace ());
79+ }
7780 skafkaTopic = connectorConfig .getString (SolaceSourceConstants .KAFKA_TOPIC );
7881 solSessionHandler = new SolSessionHandler (connectorConfig );
7982 try {
@@ -106,20 +109,23 @@ public void start(Map<String, String> props) {
106109 public synchronized List <SourceRecord > poll () throws InterruptedException {
107110
108111 if (shuttingDown || ingressMessages .size () == 0 ) {
109- return null ; // Nothing to do, return control
112+ spinTurns ++;
113+ if (spinTurns > 100 ) {
114+ spinTurns = 0 ;
115+ Thread .sleep (1 );
116+ }
117+ return null ; // Nothing to do, return control
110118 }
111119 // There is at least one message to process
120+ spinTurns = 0 ; // init spinTurns again
112121 List <SourceRecord > records = new ArrayList <>();
113122 int processedInIhisBatch = 0 ;
114123 int count = 0 ;
115124 int arraySize = ingressMessages .size ();
116125 while (count < arraySize ) {
117126 BytesXMLMessage msg = ingressMessages .take ();
118127 try {
119- processor = connectorConfig
120- .getConfiguredInstance (SolaceSourceConstants
121- .SOL_MESSAGE_PROCESSOR , SolMessageProcessorIF .class )
122- .process (connectorConfig .getString (SolaceSourceConstants .SOL_KAFKA_MESSAGE_KEY ), msg );
128+ processor .process (connectorConfig .getString (SolaceSourceConstants .SOL_KAFKA_MESSAGE_KEY ), msg );
123129 } catch (Exception e ) {
124130 log .info (
125131 "================ Encountered exception in message processing....discarded."
0 commit comments