11package com .solace .connector .kafka .connect .sink .it ;
22
3+ import static java .util .concurrent .TimeUnit .MINUTES ;
4+ import static java .util .concurrent .TimeUnit .SECONDS ;
5+ import static org .apache .commons .lang3 .RandomStringUtils .randomAlphanumeric ;
6+ import static org .assertj .core .api .Assertions .assertThat ;
7+ import static org .awaitility .Awaitility .await ;
8+ import static org .junit .jupiter .api .Assertions .assertArrayEquals ;
9+ import static org .junit .jupiter .api .Assertions .assertEquals ;
10+ import static org .junit .jupiter .api .Assertions .assertNotNull ;
11+ import static org .junit .jupiter .api .Assertions .fail ;
12+
313import com .google .gson .Gson ;
414import com .google .gson .GsonBuilder ;
515import com .google .gson .JsonObject ;
2939import com .solacesystems .jcsmp .SDTMap ;
3040import eu .rekawek .toxiproxy .model .ToxicDirection ;
3141import eu .rekawek .toxiproxy .model .toxic .Latency ;
42+ import java .util .ArrayList ;
43+ import java .util .Collection ;
44+ import java .util .List ;
45+ import java .util .Map ;
46+ import java .util .Properties ;
47+ import java .util .concurrent .Future ;
48+ import java .util .concurrent .ScheduledExecutorService ;
49+ import java .util .concurrent .TimeUnit ;
50+ import java .util .stream .Collectors ;
51+ import java .util .stream .Stream ;
3252import org .apache .commons .lang3 .RandomStringUtils ;
3353import org .apache .kafka .clients .admin .ConsumerGroupListing ;
3454import org .apache .kafka .clients .admin .ListConsumerGroupOffsetsResult ;
5474import org .testcontainers .containers .output .WaitingConsumer ;
5575import org .testcontainers .shaded .com .google .common .collect .ImmutableMap ;
5676
57- import java .time .Duration ;
58- import java .util .ArrayList ;
59- import java .util .Collection ;
60- import java .util .List ;
61- import java .util .Map ;
62- import java .util .Properties ;
63- import java .util .concurrent .Future ;
64- import java .util .concurrent .ScheduledExecutorService ;
65- import java .util .concurrent .TimeUnit ;
66- import java .util .concurrent .atomic .AtomicReference ;
67- import java .util .stream .Collectors ;
68- import java .util .stream .Stream ;
69-
70- import static org .apache .commons .lang3 .RandomStringUtils .randomAlphanumeric ;
71- import static org .hamcrest .CoreMatchers .containsString ;
72- import static org .hamcrest .MatcherAssert .assertThat ;
73- import static org .hamcrest .Matchers .hasItems ;
74- import static org .hamcrest .Matchers .hasSize ;
75- import static org .junit .jupiter .api .Assertions .assertArrayEquals ;
76- import static org .junit .jupiter .api .Assertions .assertEquals ;
77- import static org .junit .jupiter .api .Assertions .assertNotNull ;
78- import static org .junit .jupiter .api .Assertions .assertTimeoutPreemptively ;
79- import static org .junit .jupiter .api .Assertions .fail ;
80-
8177@ ExtendWith (ExecutorServiceExtension .class )
8278@ ExtendWith (PubSubPlusExtension .class )
8379@ ExtendWith (KafkaArgumentsProvider .AutoDeleteSolaceConnectorDeploymentAfterEach .class )
84- public class SinkConnectorIT implements TestConstants {
80+ class SinkConnectorIT implements TestConstants {
8581
8682 static Logger logger = LoggerFactory .getLogger (SinkConnectorIT .class );
8783 static TestSolaceQueueConsumer solaceQueueConsumer ;
@@ -90,6 +86,7 @@ public class SinkConnectorIT implements TestConstants {
9086 enum AdditionalCheck { ATTACHMENTBYTEBUFFER , CORRELATIONID }
9187
9288 private Properties connectorProps ;
89+ private static final Gson GSON = new GsonBuilder ().setPrettyPrinting ().create ();
9390
9491 ////////////////////////////////////////////////////
9592 // Main setup/teardown
@@ -106,7 +103,7 @@ static void setUp(JCSMPSession jcsmpSession) throws JCSMPException {
106103 }
107104
108105 @ BeforeEach
109- public void beforeEach (JCSMPProperties jcsmpProperties ) {
106+ void beforeEach (JCSMPProperties jcsmpProperties ) {
110107 connectorProps = new Properties ();
111108 connectorProps .setProperty (SolaceSinkConstants .SOL_HOST , String .format ("tcp://%s:55555" , NetworkPubSubPlusContainerProvider .DOCKER_NET_PUBSUB_ALIAS ));
112109 connectorProps .setProperty (SolaceSinkConstants .SOL_USERNAME , jcsmpProperties .getStringProperty (JCSMPProperties .USERNAME ));
@@ -161,9 +158,11 @@ List<BytesXMLMessage> assertMessageReceived(KafkaContext kafkaContext,
161158 RecordMetadata metadata ,
162159 Map <AdditionalCheck , String > additionalChecks )
163160 throws SDTException , InterruptedException {
164- assertTimeoutPreemptively (Duration .ofMinutes (5 ), () -> {
165- boolean isCommitted ;
166- do {
161+ await ("Kafka offset to be committed" )
162+ .atMost (5 , MINUTES )
163+ .pollInterval (1 , SECONDS )
164+ .ignoreExceptions ()
165+ .until (() -> {
167166 Stream <String > groupIds = kafkaContext .getAdminClient ().listConsumerGroups ().all ().get ().stream ()
168167 .map (ConsumerGroupListing ::groupId );
169168
@@ -183,15 +182,10 @@ List<BytesXMLMessage> assertMessageReceived(KafkaContext kafkaContext,
183182 .findAny ()
184183 .orElse (null );
185184
186- isCommitted = partitionOffset != null && partitionOffset >= metadata .offset ();
187-
188- if (!isCommitted ) {
189- logger .info ("Waiting for record {} to be committed. Partition offset: {}" , metadata , partitionOffset );
190- Thread .sleep (TimeUnit .SECONDS .toMillis (1 ));
191- }
192- } while (!isCommitted );
193- logger .info ("Record {} was committed" , metadata );
194- });
185+ logger .info ("Waiting for record {} to be committed. Partition offset: {}" , metadata , partitionOffset );
186+ return partitionOffset != null && partitionOffset >= metadata .offset ();
187+ });
188+ logger .info ("Record {} was committed" , metadata );
195189
196190 List <BytesXMLMessage > receivedMessages = new ArrayList <>();
197191
@@ -201,7 +195,7 @@ List<BytesXMLMessage> assertMessageReceived(KafkaContext kafkaContext,
201195 assertNotNull (queueMessage );
202196 receivedMessages .add (queueMessage );
203197 } else {
204- assert (TestSolaceQueueConsumer .solaceReceivedQueueMessages .size () == 0 );
198+ assert (TestSolaceQueueConsumer .solaceReceivedQueueMessages .isEmpty () );
205199 }
206200 for (String s : expectedSolaceTopics ) {
207201 BytesXMLMessage newTopicMessage = TestSolaceTopicConsumer .solaceReceivedTopicMessages .poll (5 ,TimeUnit .SECONDS );
@@ -227,7 +221,7 @@ List<BytesXMLMessage> assertMessageReceived(KafkaContext kafkaContext,
227221 assertEquals (metadata .topic (), userHeader .getString ("k_topic" ));
228222 assertEquals (Long .toString (metadata .partition ()), userHeader .getString ("k_partition" ));
229223 assertEquals (Long .toString (metadata .offset ()), userHeader .getString ("k_offset" ));
230- assertThat (message .getApplicationMessageType (), containsString (metadata .topic () ));
224+ assertThat (message .getApplicationMessageType ()). contains (metadata .topic ());
231225 // additional checks as requested
232226 if (additionalChecks != null ) {
233227 for (Map .Entry <AdditionalCheck , String > check : additionalChecks .entrySet ()) {
@@ -478,23 +472,22 @@ void kafkaConsumerTextMessageToTopicTest3(@Values(booleans = { true, false }) bo
478472 @ Nested
479473 @ TestInstance (Lifecycle .PER_CLASS )
480474 class SolaceConnectorLifecycleTests {
481- private final Gson GSON = new GsonBuilder ().setPrettyPrinting ().create ();
482475
483476 @ ParameterizedTest
484477 @ ArgumentsSource (KafkaArgumentsProvider .class )
485478 void testFailPubSubConnection (KafkaContext kafkaContext ) {
486- connectorProps .setProperty ("sol.vpn_name" , randomAlphanumeric (10 ));
479+ connectorProps .setProperty ("sol.vpn_name" , RandomStringUtils . insecure (). nextAlphanumeric (10 ));
487480 kafkaContext .getSolaceConnectorDeployment ().startConnector (connectorProps , true );
488- AtomicReference <JsonObject > connectorStatus = new AtomicReference <>(new JsonObject ());
489- assertTimeoutPreemptively (Duration .ofMinutes (1 ), () -> {
490- JsonObject taskStatus ;
491- do {
481+ await ("connector to fail" )
482+ .atMost (1 , MINUTES )
483+ .untilAsserted (() -> {
492484 JsonObject status = kafkaContext .getSolaceConnectorDeployment ().getConnectorStatus ();
493- connectorStatus .set (status );
494- taskStatus = status .getAsJsonArray ("tasks" ).get (0 ).getAsJsonObject ();
495- } while (!taskStatus .get ("state" ).getAsString ().equals ("FAILED" ));
496- assertThat (taskStatus .get ("trace" ).getAsString (), containsString ("Message VPN Not Allowed" ));
497- }, () -> "Timed out waiting for connector to fail: " + GSON .toJson (connectorStatus .get ()));
485+ JsonObject taskStatus = status .getAsJsonArray ("tasks" ).get (0 ).getAsJsonObject ();
486+ assertThat (taskStatus .get ("state" ).getAsString ())
487+ .as ("Connector task not in FAILED state: %s" , GSON .toJson (status ))
488+ .isEqualTo ("FAILED" );
489+ assertThat (taskStatus .get ("trace" ).getAsString ()).contains ("Message VPN Not Allowed" );
490+ });
498491 }
499492
500493 @ CartesianTest (name = "[{index}] dynamicDestination={0}, autoFlush={1}, kafka={2}" )
@@ -604,13 +597,16 @@ void testCommitRollback(@Values(booleans = { false, true }) boolean dynamicDesti
604597 null , null );
605598 sempV2Api .config ().createMsgVpnQueueSubscription (SOL_VPN , queue .getName (),
606599 new ConfigMsgVpnQueueSubscription ().subscriptionTopic (topicName ), null , null );
607- assertTimeoutPreemptively (Duration .ofSeconds (30 ), () -> {
608- while (sempV2Api .monitor ().getMsgVpnQueue (SOL_VPN , queue .getName (), null ).getData ()
609- .getMaxMsgSize () != 1 ) {
600+ await ("queue max message size to be updated to 1" )
601+ .atMost (30 , SECONDS )
602+ .pollInterval (100 , TimeUnit .MILLISECONDS )
603+ .until (() -> {
610604 logger .info ("Waiting for queue {} to have max message size of 1" , queue .getName ());
611- Thread .sleep (100 );
612- }
613- });
605+ return sempV2Api .monitor ()
606+ .getMsgVpnQueue (SOL_VPN , queue .getName (), null )
607+ .getData ()
608+ .getMaxMsgSize () == 1 ;
609+ });
614610
615611 try (TestSolaceQueueConsumer solaceConsumer1 = new TestSolaceQueueConsumer (jcsmpSession )) {
616612 solaceConsumer1 .setQueueName (queue .getName ());
@@ -669,12 +665,12 @@ void testCommitRollback(@Values(booleans = { false, true }) boolean dynamicDesti
669665 }
670666
671667 if (dynamicDestination ) {
672- assertThat (receivedMsgDestinations , hasSize ( 1 ));
673- assertThat ( receivedMsgDestinations , hasItems ( JCSMPFactory .onlyInstance ().createTopic (topicName ) ));
668+ assertThat (receivedMsgDestinations ). containsExactly (
669+ JCSMPFactory .onlyInstance ().createTopic (topicName ));
674670 } else {
675- assertThat (receivedMsgDestinations , hasSize ( 2 ));
676- assertThat ( receivedMsgDestinations , hasItems ( queue ,
677- JCSMPFactory .onlyInstance ().createTopic (topicName ) ));
671+ assertThat (receivedMsgDestinations ). containsExactlyInAnyOrder (
672+ queue ,
673+ JCSMPFactory .onlyInstance ().createTopic (topicName ));
678674 }
679675 }
680676 }
0 commit comments