@@ -50,7 +50,7 @@ public class SolaceSparkStreamingSourceIT {
5050 .withShmSize (SHM_SIZE )
5151 .withUlimits (ulimitList )
5252 .withCpuCount (1l );
53- }).withExposedPorts (8080 , 55555 ).withTopic ("solace/spark/streaming" , Service .SMF ).withTopic ("solace/spark/connector/offset" , Service .SMF );
53+ }).withExposedPorts (8080 , 55555 ).withTopic ("solace/spark/streaming" , Service .SMF ).withTopic ("solace/spark/connector/offset" , Service .SMF ). withTopic ( "solace/spark/streaming/offset" , Service . SMF ) ;
5454 private SparkSession sparkSession ;
5555 @ BeforeAll
5656 public void beforeAll () throws ApiException {
@@ -73,6 +73,22 @@ public void beforeAll() throws ApiException {
7373
7474 sempV2Api .config ().createMsgVpnQueue ("default" , queue , null , null );
7575 sempV2Api .config ().createMsgVpnQueueSubscription ("default" , "Solace/Queue/0" , subscription , null , null );
76+
77+ MsgVpnQueue lvq = new MsgVpnQueue ();
78+ lvq .queueName ("Solace/Queue/lvq/0" );
79+ lvq .accessType (MsgVpnQueue .AccessTypeEnum .EXCLUSIVE );
80+ lvq .permission (MsgVpnQueue .PermissionEnum .NO_ACCESS );
81+ lvq .setOwner ("default" );
82+ lvq .ingressEnabled (true );
83+ lvq .egressEnabled (true );
84+ lvq .setMaxMsgSpoolUsage (0l );
85+
86+ MsgVpnQueueSubscription lvqSubscription = new MsgVpnQueueSubscription ();
87+ lvqSubscription .setSubscriptionTopic ("solace/spark/streaming/offset" );
88+
89+ sempV2Api .config ().createMsgVpnQueue ("default" , lvq , null , null );
90+ sempV2Api .config ().createMsgVpnQueueSubscription ("default" , "Solace/Queue/lvq/0" , lvqSubscription , null , null );
91+
7692 } else {
7793 throw new RuntimeException ("Solace Container is not started yet" );
7894 }
@@ -742,4 +758,66 @@ void Should_Fail_IfLVQTopic_Has_No_Permission_To_Publish() {
742758 assertTrue (e instanceof StreamingQueryException );
743759 }
744760 }
761+
762+ @ Test
763+ void Should_Fail_IfLVQ_Has_No_Permission_To_Access () {
764+ Path path = Paths .get ("src" , "test" , "resources" , "spark-checkpoint-1" );
765+ try {
766+ DataStreamReader reader = sparkSession .readStream ()
767+ .option (SolaceSparkStreamingProperties .HOST , solaceContainer .getOrigin (Service .SMF ))
768+ .option (SolaceSparkStreamingProperties .VPN , solaceContainer .getVpn ())
769+ .option (SolaceSparkStreamingProperties .USERNAME , solaceContainer .getUsername ())
770+ .option (SolaceSparkStreamingProperties .PASSWORD , solaceContainer .getPassword ())
771+ .option (SolaceSparkStreamingProperties .SOLACE_CONNECT_RETRIES , 1 )
772+ .option (SolaceSparkStreamingProperties .SOLACE_RECONNECT_RETRIES , 1 )
773+ .option (SolaceSparkStreamingProperties .SOLACE_CONNECT_RETRIES_PER_HOST , 1 )
774+ .option (SolaceSparkStreamingProperties .SOLACE_RECONNECT_RETRIES_WAIT_TIME , 100 )
775+ .option (SolaceSparkStreamingProperties .SOLACE_API_PROPERTIES_PREFIX +"sub_ack_window_threshold" , 75 )
776+ .option (SolaceSparkStreamingProperties .QUEUE , "Solace/Queue/0" )
777+ .option (SolaceSparkStreamingProperties .BATCH_SIZE , "1" )
778+ .option (SolaceSparkStreamingProperties .SOLACE_SPARK_CONNECTOR_LVQ_NAME , "Solace/Queue/lvq/0" )
779+ .option (SolaceSparkStreamingProperties .SOLACE_SPARK_CONNECTOR_LVQ_TOPIC , "solace/spark/streaming/offset" )
780+ .option ("checkpointLocation" , path .toAbsolutePath ().toString ())
781+ .format ("solace" );
782+ Dataset <Row > dataset = reader .load ();
783+ StreamingQuery streamingQuery = dataset .writeStream ().foreachBatch ((VoidFunction2 <Dataset <Row >, Long >) (dataset1 , batchId ) -> {
784+ System .out .println (dataset1 .count ());
785+ }).start ();
786+ streamingQuery .awaitTermination ();
787+ } catch (Exception e ) {
788+ System .out .println (e );
789+ assertTrue (e instanceof StreamingQueryException );
790+ }
791+ }
792+
793+ @ Test
794+ void Should_Fail_IfLVQ_Has_No_Permission_To_Add_Subscription () {
795+ Path path = Paths .get ("src" , "test" , "resources" , "spark-checkpoint-1" );
796+ try {
797+ DataStreamReader reader = sparkSession .readStream ()
798+ .option (SolaceSparkStreamingProperties .HOST , solaceContainer .getOrigin (Service .SMF ))
799+ .option (SolaceSparkStreamingProperties .VPN , solaceContainer .getVpn ())
800+ .option (SolaceSparkStreamingProperties .USERNAME , solaceContainer .getUsername ())
801+ .option (SolaceSparkStreamingProperties .PASSWORD , solaceContainer .getPassword ())
802+ .option (SolaceSparkStreamingProperties .SOLACE_CONNECT_RETRIES , 1 )
803+ .option (SolaceSparkStreamingProperties .SOLACE_RECONNECT_RETRIES , 1 )
804+ .option (SolaceSparkStreamingProperties .SOLACE_CONNECT_RETRIES_PER_HOST , 1 )
805+ .option (SolaceSparkStreamingProperties .SOLACE_RECONNECT_RETRIES_WAIT_TIME , 100 )
806+ .option (SolaceSparkStreamingProperties .SOLACE_API_PROPERTIES_PREFIX +"sub_ack_window_threshold" , 75 )
807+ .option (SolaceSparkStreamingProperties .QUEUE , "Solace/Queue/0" )
808+ .option (SolaceSparkStreamingProperties .BATCH_SIZE , "1" )
809+ .option (SolaceSparkStreamingProperties .SOLACE_SPARK_CONNECTOR_LVQ_NAME , "Solace/Queue/lvq/0" )
810+ .option (SolaceSparkStreamingProperties .SOLACE_SPARK_CONNECTOR_LVQ_TOPIC , "invalid/topic" )
811+ .option ("checkpointLocation" , path .toAbsolutePath ().toString ())
812+ .format ("solace" );
813+ Dataset <Row > dataset = reader .load ();
814+ StreamingQuery streamingQuery = dataset .writeStream ().foreachBatch ((VoidFunction2 <Dataset <Row >, Long >) (dataset1 , batchId ) -> {
815+ System .out .println (dataset1 .count ());
816+ }).start ();
817+ streamingQuery .awaitTermination ();
818+ } catch (Exception e ) {
819+ System .out .println (e );
820+ assertTrue (e instanceof StreamingQueryException );
821+ }
822+ }
745823}
0 commit comments