-
Notifications
You must be signed in to change notification settings - Fork 142
/
Copy pathbmqimp_brokersession.t.cpp
10608 lines (8723 loc) · 392 KB
/
bmqimp_brokersession.t.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2014-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// bmqimp_brokersession.t.cpp -*-C++-*-
#include <bmqimp_brokersession.h>
// BMQ
#include <bmqimp_event.h>
#include <bmqimp_manualhosthealthmonitor.h>
#include <bmqimp_queue.h>
#include <bmqp_ackeventbuilder.h>
#include <bmqp_blobpoolutil.h>
#include <bmqp_confirmeventbuilder.h>
#include <bmqp_crc32c.h>
#include <bmqp_ctrlmsg_messages.h>
#include <bmqp_event.h>
#include <bmqp_messageguidgenerator.h>
#include <bmqp_protocol.h>
#include <bmqp_puteventbuilder.h>
#include <bmqp_queueid.h>
#include <bmqp_schemaeventbuilder.h>
#include <bmqpi_dtcontext.h>
#include <bmqpi_dtspan.h>
#include <bmqpi_dttracer.h>
#include <bmqscm_version.h>
#include <bmqt_correlationid.h>
#include <bmqt_messageguid.h>
#include <bmqt_queueflags.h>
#include <bmqt_uri.h>
#include <bmqio_status.h>
#include <bmqio_testchannel.h>
#include <bmqsys_time.h>
#include <bmqu_memoutstream.h>
// BDE
#include <bdlbb_pooledblobbufferfactory.h>
#include <bdlcc_deque.h>
#include <bdlf_memfn.h>
#include <bdlmt_eventscheduler.h>
#include <bdlmt_signaler.h>
#include <bsl_memory.h>
#include <bslma_managedptr.h>
#include <bslmt_lockguard.h>
#include <bslmt_mutex.h>
#include <bslmt_semaphore.h>
#include <bslmt_threadutil.h>
#include <bslmt_timedsemaphore.h>
#include <bsls_assert.h>
#include <bsls_platform.h>
#include <bsls_systemclocktype.h>
#include <bsls_systemtime.h>
// TEST DRIVER
#include <bmqtst_testhelper.h>
// CONVENIENCE
using namespace BloombergLP;
using namespace bsl;
using bmqimp::ManualHostHealthMonitor;
// ============================================================================
// TEST HELPERS UTILITY
// ----------------------------------------------------------------------------
namespace {
// CONSTANTS
const char k_URI[] = "bmq://ts.trades.myapp/my.queue?id=my.app";
/// Struct to initialize system time component
struct TestClock {
// DATA
bdlmt::EventScheduler& d_scheduler;
bdlmt::EventSchedulerTestTimeSource d_timeSource;
// CREATORS
TestClock(bdlmt::EventScheduler& scheduler)
: d_scheduler(scheduler)
, d_timeSource(&scheduler)
{
// NOTHING
}
// MANIPULATORS
bsls::TimeInterval realtimeClock() { return d_timeSource.now(); }
bsls::TimeInterval monotonicClock() { return d_timeSource.now(); }
bsls::Types::Int64 highResTimer()
{
return d_timeSource.now().totalNanoseconds();
}
};
static void eventHandler(bsl::shared_ptr<bmqimp::Event>* resultEvent,
bslmt::TimedSemaphore& eventSem,
const bsl::shared_ptr<bmqimp::Event>& event)
{
PV_SAFE("Got an event: " << *event);
*resultEvent = event;
eventSem.post();
}
static void eventHandlerSyncCall(bsl::shared_ptr<bmqimp::Event>* resultEvent,
bmqimp::BrokerSession* session,
bsl::shared_ptr<bmqimp::Queue> queue,
bslmt::TimedSemaphore& eventSem,
const bsl::shared_ptr<bmqimp::Event>& event)
{
const bsls::TimeInterval& timeout = bsls::TimeInterval();
bmqt::QueueOptions queueOptions;
PV_SAFE("Got an event: " << *event);
*resultEvent = event;
// Check SYNC queue API
BMQTST_ASSERT_EQ(session->openQueue(queue, timeout),
bmqt::OpenQueueResult::e_NOT_CONNECTED);
BMQTST_ASSERT_EQ(session->configureQueue(queue, queueOptions, timeout),
bmqt::ConfigureQueueResult::e_INVALID_QUEUE);
BMQTST_ASSERT_EQ(session->closeQueue(queue, timeout),
bmqt::CloseQueueResult::e_UNKNOWN_QUEUE);
eventSem.post();
}
/// REVISIT: usage of these semaphores here and everywhere after #1974.
static int stateCb(bmqimp::BrokerSession::State::Enum oldState,
bmqimp::BrokerSession::State::Enum newState,
bsls::AtomicInt* startCounter,
bsls::AtomicInt* stopCounter)
{
PV_SAFE("StateCb: " << oldState << " -> " << newState);
if (newState == bmqimp::BrokerSession::State::e_STARTING) {
++(*startCounter);
}
else if (newState == bmqimp::BrokerSession::State::e_STOPPED) {
++(*stopCounter);
}
return 0;
}
static int transitionCb(
bmqimp::BrokerSession::State::Enum oldState,
bmqimp::BrokerSession::State::Enum newState,
bsl::vector<bmqimp::BrokerSession::StateTransition>* transitionTable,
bsls::AtomicInt* rc,
bslmt::TimedSemaphore* doneSem)
{
PV_SAFE("transitionCb: " << oldState << " -> " << newState
<< " (tableSize:" << transitionTable->size()
<< ")");
bsl::vector<bmqimp::BrokerSession::StateTransition>::iterator it;
for (it = transitionTable->begin(); it != transitionTable->end(); ++it) {
if (it->d_currentState == oldState && it->d_newState == newState) {
transitionTable->erase(it);
break;
}
}
if (transitionTable->empty()) {
doneSem->post();
}
return *rc;
}
void channelSetEventHandler(const bsl::shared_ptr<bmqimp::Event>& event,
bsls::AtomicInt& eventCounter,
bslmt::TimedSemaphore& stopSem,
int maxEvents)
{
const bmqt::CorrelationId k_EMPTY_CORRID;
PV_SAFE("Incoming event: " << *event);
BMQTST_ASSERT(event != 0);
BMQTST_ASSERT_EQ(event->type(), bmqimp::Event::EventType::e_SESSION);
++eventCounter;
if (eventCounter == maxEvents) {
BMQTST_ASSERT_EQ(event->sessionEventType(),
bmqt::SessionEventType::e_CONNECTION_LOST);
return; // RETURN
}
if (eventCounter + 1 > maxEvents) {
BMQTST_ASSERT_EQ(event->sessionEventType(),
bmqt::SessionEventType::e_DISCONNECTED);
stopSem.post();
return; // RETURN
}
int order = eventCounter % 3;
if (eventCounter == 1) {
BMQTST_ASSERT_EQ(event->sessionEventType(),
bmqt::SessionEventType::e_CONNECTED);
}
else if (eventCounter == 2) {
BMQTST_ASSERT_EQ(event->sessionEventType(),
bmqt::SessionEventType::e_CONNECTION_LOST);
}
else if (order == 0) {
BMQTST_ASSERT_EQ(event->sessionEventType(),
bmqt::SessionEventType::e_RECONNECTED);
}
else if (order == 1) {
BMQTST_ASSERT_EQ(event->sessionEventType(),
bmqt::SessionEventType::e_STATE_RESTORED);
}
else if (order == 2) {
BMQTST_ASSERT_EQ(event->sessionEventType(),
bmqt::SessionEventType::e_CONNECTION_LOST);
}
else {
BSLS_ASSERT_OPT(false && "Unreachable by design");
}
BMQTST_ASSERT_EQ(event->statusCode(), 0);
BMQTST_ASSERT_EQ(event->correlationId(), k_EMPTY_CORRID);
}
void channelEventHandler(const bsl::shared_ptr<bmqimp::Event>& event,
bslmt::TimedSemaphore* eventSem)
{
PV_SAFE("Incoming event: " << *event);
eventSem->post();
}
void sessionEventHandler(
bdlcc::Deque<bsl::shared_ptr<bmqimp::Event> >* eventQueue,
const bsl::shared_ptr<bmqimp::Event>& event)
{
BMQTST_ASSERT(event);
// May be Session or Message event
PV_SAFE("Incoming event: " << *event);
eventQueue->pushBack(event);
}
bool waitRealTime(bslmt::TimedSemaphore* sem)
{
BMQTST_ASSERT(sem);
const bsls::TimeInterval k_REALTIME_TIMEOUT = bsls::TimeInterval(15);
int rc = sem->timedWait(
bsls::SystemTime::now(bsls::SystemClockType::e_REALTIME) +
k_REALTIME_TIMEOUT);
return rc == 0;
}
bool isConfigure(const bmqp_ctrlmsg::ControlMessage& request)
{
return request.choice().isConfigureStreamValue() ||
request.choice().isConfigureQueueStreamValue();
}
void makeResponse(bmqp_ctrlmsg::ControlMessage* response,
const bmqp_ctrlmsg::ControlMessage& request)
{
BMQTST_ASSERT(!request.rId().isNull());
response->rId().makeValue(request.rId().value());
if (request.choice().isConfigureStreamValue()) {
response->choice().makeConfigureStreamResponse();
response->choice().configureStreamResponse().request() =
request.choice().configureStream();
}
else {
response->choice().makeConfigureQueueStreamResponse();
response->choice().configureQueueStreamResponse().request() =
request.choice().configureQueueStream();
}
}
void getConsumerInfo(bmqp_ctrlmsg::ConsumerInfo* out,
bmqp_ctrlmsg::ControlMessage request)
{
if (request.choice().isConfigureStreamValue()) {
*out = bmqp::ProtocolUtil::consumerInfo(
request.choice().configureStream().streamParameters());
}
else {
bmqp_ctrlmsg::StreamParameters temp;
const bmqp_ctrlmsg::QueueStreamParameters& in =
request.choice().configureQueueStream().streamParameters();
bmqp::ProtocolUtil::convert(&temp, in);
*out = bmqp::ProtocolUtil::consumerInfo(temp);
}
}
/// This class provides a wrapper on top of the BrokerSession under test and
/// implements a few mechanisms to help testing the object.
struct TestSession BSLS_CPP11_FINAL {
public:
// CONSTANTS
static const bsls::TimeInterval k_EVENT_TIMEOUT;
static const bsls::TimeInterval k_TIME_SOURCE_STEP;
public:
// TYPES
enum RequestType {
e_REQ_OPEN_QUEUE = 0,
e_REQ_CONFIG_QUEUE = 1,
e_REQ_CLOSE_QUEUE = 2,
e_REQ_DISCONNECT = 3,
e_REQ_UNDEFINED = -1
};
enum ErrorResult {
e_ERR_NOT_SENT = 0, // request failed to send
e_ERR_BAD_RESPONSE = 1, // bad response
e_ERR_LATE_RESPONSE = 2 // late response
};
enum QueueTestStep {
e_OPEN_OPENING = 0 // pending open queue request
,
e_OPEN_CONFIGURING = 1 // pending config queue request
,
e_OPEN_OPENED = 2 // no pending requests
,
e_REOPEN_OPENING = 3 // pending open queue request
,
e_REOPEN_CONFIGURING = 4 // pending config queue request
,
e_REOPEN_REOPENED = 5 // no pending requests
,
e_CONFIGURING = 6 // pending config queue request
,
e_CONFIGURING_RECONFIGURING = 7 // pending config queue request
,
e_CONFIGURED = 8 // no pending requests
,
e_CLOSE_CONFIGURING = 9 // pending config queue request
,
e_CLOSE_CLOSING = 10 // pending close queue request
,
e_CLOSE_CLOSED = 11 // no pending requests
};
enum LateResponseTestStep {
e_LATE_OPEN_OPENING = 0 // late open 1st part respose
,
e_LATE_OPEN_CONFIGURING_CFG = 1 // late open 2nd part respose
,
e_LATE_OPEN_CONFIGURING_CLS = 2 // pending close queue request
,
e_LATE_REOPEN_OPENING = 3 // late reopen 1st part respose
,
e_LATE_REOPEN_CONFIGURING_CFG = 4 // late reopen 2st part respose
,
e_LATE_REOPEN_CONFIGURING_CLS = 5 // pending close queue request
,
e_LATE_RECONFIGURING = 6 // expired reconfigure request
,
e_LATE_CLOSE_CONFIGURING = 7 // late close 1st part response
,
e_LATE_CLOSE_CLOSING = 8 // late close 2st part response
};
typedef bdlcc::Deque<bsl::shared_ptr<bmqimp::Event> > EventQueue;
public:
// PUBLIC DATA
bslma::Allocator* d_allocator_p;
// Allocator to use
bdlbb::PooledBlobBufferFactory d_blobBufferFactory;
// Buffer factory provided to the
// various builders
/// Blob pool used to provide blobs to event builders.
bmqp::BlobPoolUtil::BlobSpPoolSp d_blobSpPool_sp;
bdlmt::EventScheduler& d_scheduler;
// event scheduler used in the
// broker session
bmqio::TestChannel d_testChannel;
// mocked network channel object
// to be used in the broker session
EventQueue d_eventQueue;
// thread-safe deque to store
// incoming BlazingMQ events
bmqimp::BrokerSession d_brokerSession;
// the broker session object
// under the test
bdlmt::SignalerConnection d_onChannelCloseHandler;
// signaler handler associated with
// onChannelClose slot
TestClock* d_testClock_p;
// pointer to struct to initialize system time
int d_startCounter;
int d_stopCounter;
// register state changes
private:
TestSession(const TestSession& other) BSLS_CPP11_DELETED;
TestSession& operator=(const TestSession& other) BSLS_CPP11_DELETED;
public:
// CREATORS
/// Create a `TestSession` object using the specified `sessionOptions`,
/// `scheduler`, and `allocator` to supply memory.
explicit TestSession(
const bmqt::SessionOptions& sessionOptions,
bdlmt::EventScheduler& scheduler,
bool useEventHandler = true,
bslma::Allocator* allocator = bmqtst::TestHelperUtil::allocator());
/// Create a `TestSession` object using the specified `sessionOptions`,
/// `clockPtr`, and `allocator` to supply memory.
explicit TestSession(
const bmqt::SessionOptions& sessionOptions,
TestClock& testClock,
bool useEventHandler = true,
bslma::Allocator* allocator = bmqtst::TestHelperUtil::allocator());
~TestSession();
// ACCESSORS
bmqimp::BrokerSession& session();
bmqio::TestChannel& channel();
bslma::Allocator* allocator();
bmqp::BlobPoolUtil::BlobSpPool& blobSpPool();
// MANIPULATORS
/// Start the broker session object under the test and sets the test
/// channel. Internally this method waits for
/// SessionEventType::e_CONNECTED event.
/// Assert in case of any error or internal timeout.
void startAndConnect(bool expectHostUnhealthy = false);
/// Stop synchronously. Verify immediate presence of DISCONNECTED event
/// and the state.
bool stop();
/// Open the queue by sending and processing corresponding requests.
/// Inject open and configure queue responses and wait for
/// SessionEventType::e_QUEUE_OPEN_RESULT event.
/// Assert in case of any error or expired timeout
void openQueue(bsl::shared_ptr<bmqimp::Queue> queue,
const bsls::TimeInterval& timeout = bsls::TimeInterval(5),
const bool skipReaderCfgs = false);
/// Execute open queue procedure and reproduce requested error type
/// at corresponding stage (open queue or open configure queue).
/// Assert in case of any error or expired timeout
void openQueueWithError(bsl::shared_ptr<bmqimp::Queue> queue,
const RequestType requestType,
const ErrorResult errorResult,
const bsls::TimeInterval& timeout);
/// Process the specified `queue` reopening by injecting open and
/// configure queue responses and wait for
/// `SessionEventType::e_QUEUE_REOPEN_RESULT` event. Assert in case of
/// any error or expired timeout.
void
reopenQueue(bsl::shared_ptr<bmqimp::Queue> queue,
const bsls::TimeInterval& timeout = bsls::TimeInterval(5));
/// Closes the specified `queue` by injecting close and configure queue
/// responses and wait for `SessionEventType::e_QUEUE_CLOSE_RESULT`
/// event. Assert in case of any error or expired timeout.
void closeQueue(bsl::shared_ptr<bmqimp::Queue> queue,
const bsls::TimeInterval& timeout = bsls::TimeInterval(5),
bool isFinal = true);
/// Execute close queue procedure and reproduce requested error type
/// at corresponding stage (close configure queue and close queue).
/// Assert in case of any error or expired timeout
void closeQueueWithError(bsl::shared_ptr<bmqimp::Queue> queue,
const RequestType requestType,
const ErrorResult errorResult,
bool isFinal,
const bsls::TimeInterval& timeout);
/// Close the specified `queue` by injecting close queueresponses and
/// wait for `SessionEventType::e_QUEUE_CLOSE_RESULT` event. Assert in
/// case of any error or expired timeout.
void closeDeconfiguredQueue(
bsl::shared_ptr<bmqimp::Queue> queue,
const bsls::TimeInterval& timeout = bsls::TimeInterval(5));
void configureQueueSync(
bsl::shared_ptr<bmqimp::Queue>& queue,
const bmqt::QueueOptions& options,
const bsls::TimeInterval& timeout = bsls::TimeInterval(5));
/// For the specified `queue` call async opening and verify open queue
/// request is sent to the channel. Return control message sent to the
/// channel.
bmqp_ctrlmsg::ControlMessage
openQueueFirstStep(bsl::shared_ptr<bmqimp::Queue> queue,
const bsls::TimeInterval& timeout);
/// For the specified `queue` in the OPENED state emulate channel drop
/// so that the queue state is changed to PENDING and then reopening
/// procedure is initiated. Return control message with open queue
/// request sent to the channel.
bmqp_ctrlmsg::ControlMessage
reopenQueueFirstStep(bsl::shared_ptr<bmqimp::Queue> queue);
/// For the specified `queue` call async opening and verify open queue
/// request is sent to the channel. Wait until request timeout and send
/// late open queue response. Return the last control message sent to
/// the channel.
bmqp_ctrlmsg::ControlMessage
openQueueFirstStepExpired(bsl::shared_ptr<bmqimp::Queue> queue,
const bsls::TimeInterval& timeout);
/// For the specified `queue` in the OPENED state emulate channel drop
/// so that the queue state is changed to PENDING and then reopening
/// procedure is initiated. Wait until reopen request is expired and
/// send back late open response. Return control message with close
/// queue request sent to the channel.
bmqp_ctrlmsg::ControlMessage
reopenQueueFirstStepExpired(bsl::shared_ptr<bmqimp::Queue> queue,
const bsls::TimeInterval& timeout);
/// For the specified `queue` send the close queue response generated on
/// the specified `closeRequest`. Verify the queue gets closed waiting
/// for the close queue event depending on the specified
/// `waitCloseEvent` flag.
void closeQueueSecondStep(bsl::shared_ptr<bmqimp::Queue> queue,
const bmqp_ctrlmsg::ControlMessage& closeRequest,
bool waitCloseEvent);
/// For the specified `queue` call async closing and verify close queue
/// request is sent to the channel. Wait until request timeout and send
/// late close queue response. Return the last control message sent to
/// the channel.
bmqp_ctrlmsg::ControlMessage
closeQueueSecondStepExpired(bsl::shared_ptr<bmqimp::Queue> queue,
const bsls::TimeInterval& timeout);
bmqp_ctrlmsg::ControlMessage
arriveAtStepWithCfgs(bsl::shared_ptr<bmqimp::Queue> queue,
const QueueTestStep step,
const bsls::TimeInterval& timeout);
bmqp_ctrlmsg::ControlMessage
arriveAtStepWithoutCfgs(bsl::shared_ptr<bmqimp::Queue> queue,
const QueueTestStep step,
const bsls::TimeInterval& timeout);
/// Bring the specified `queue` to the specified test `step` by sending
/// and processing corresponding requests and responses and waiting for
/// related session events. The steps are always reached one-by-one
/// starting from the `e_OPEN_OPENING`. Return last sent request.
/// Assert in case of any error or expired timeout. Two implementations
/// for reader and writer queues (writers skip queue configuring steps).
bmqp_ctrlmsg::ControlMessage
arriveAtStep(bsl::shared_ptr<bmqimp::Queue> queue,
const QueueTestStep step,
const bsls::TimeInterval& timeout,
const bool skipReaderCfgs = false);
void arriveAtLateResponseStep(bsl::shared_ptr<bmqimp::Queue> queue,
const LateResponseTestStep step,
const bsls::TimeInterval& timeout);
void arriveAtLateResponseStepReader(bsl::shared_ptr<bmqimp::Queue> queue,
const LateResponseTestStep step,
const bsls::TimeInterval& timeout);
/// Bring the specified `queue` to the specified late response test
/// `step` by sending and processing corresponding requests and
/// responses and waiting for related session events. Two versions for
/// reader and writer queues (writers skip queue configuring steps).
void arriveAtLateResponseStepWriter(bsl::shared_ptr<bmqimp::Queue> queue,
const LateResponseTestStep step,
const bsls::TimeInterval& timeout);
/// Return a new queue object
bsl::shared_ptr<bmqimp::Queue>
createQueue(const char* name,
bsls::Types::Uint64 flags,
const bmqt::QueueOptions& options = bmqt::QueueOptions());
/// Return a new queue object with the specified `queueFlags` which has
/// already been put into the specified test `step` by sending and
/// processing corresponsing requests and responses and waiting for
/// related session events. Assert in case of any error or expired
/// timeout.
bsl::shared_ptr<bmqimp::Queue> createQueueOnStep(
const QueueTestStep step,
bsls::Types::Uint64 queueFlags,
const bsls::TimeInterval& timeout = bsls::TimeInterval(5));
bsl::shared_ptr<bmqimp::Queue> createQueueOnStep(
const LateResponseTestStep step,
bsls::Types::Uint64 queueFlags,
const bsls::TimeInterval& timeout = bsls::TimeInterval(5));
/// Wait up to the specified `timeout` for a call to close the channel.
/// If such a call happens, invoke the closeSignaler and return true.
bool
waitForChannelClose(const bsls::TimeInterval& timeout = k_EVENT_TIMEOUT);
/// Wait up to the specified `timeout` for the specified `queue` to
/// reach the specified queue `state`. If this happens return true.
bool
waitForQueueState(const bsl::shared_ptr<bmqimp::Queue>& queue,
const bmqimp::QueueState::Enum state,
const bsls::TimeInterval& timeout = k_EVENT_TIMEOUT);
/// Wait up to the specified `timeout` for the specified `queue` to
/// be closed and removed from the queue list. If this happens return
/// true.
bool
waitForQueueRemoved(const bsl::shared_ptr<bmqimp::Queue>& queue,
const bsls::TimeInterval& timeout = k_EVENT_TIMEOUT);
/// Stop the broker session object under the test. Checks that
/// the object sends disconnect request. Inject disconnect response
/// and wait for e_DISCONNECTED event if the specified
/// `waitForDisconnected` flag is true (default).
/// Assert in case of any error or internal timeout.
void stopGracefully(bool waitForDisconnected = true);
/// Set a mocked network channel to the broker session object under the
/// test.
void setChannel();
bool waitConnectedEvent();
bool waitReconnectedEvent();
bool waitStateRestoredEvent();
bool waitDisconnectedEvent();
/// Wait for the specific session event. If there is no incoming event
/// the method will block for the specified timeout (5s default) and
/// then fire assert if there is still no event.
bool waitConnectionLostEvent();
/// Wait for the event with ACK or NACK messages. Return a shared
/// pointer to the event object or empty pointer in case of any error.
bsl::shared_ptr<bmqimp::Event> waitAckEvent();
/// Wait for an e_HOST_UNHEALTHY session event. If there is no incoming
/// event the method will block for the specified timeout (5s default)
/// and then fire assert if there is still no event.
bool waitHostUnhealthyEvent();
/// Wait for an e_QUEUE_SUSPENDED event. If there is no incoming event
/// the method will block and then fire assert if there is still no
/// event.
bool waitQueueSuspendedEvent(
bmqt::GenericResult::Enum status = bmqt::GenericResult::e_SUCCESS);
/// Wait for an e_QUEUE_RESUMED event. If there is no incoming event
/// the method will block and then fire assert if there is still no
/// event.
bool waitQueueResumedEvent(
bmqt::GenericResult::Enum status = bmqt::GenericResult::e_SUCCESS);
/// Wait for an e_HOST_HEALTH_RESTORED session event. If there is no
/// incoming event the method will block for the specified timeout (5s
/// default) and then fire assert if there is still no event.
bool waitHostHealthRestoredEvent();
/// Ensure there is no incoming event. Return false if there is an
/// event.
bool checkNoEvent();
/// Return a BlazingMQ event that has been passed by the broker session
/// object under the test to the session event handler. If there is no
/// incoming event this method will block forever.
bsl::shared_ptr<bmqimp::Event> getInboundEvent();
/// Return true if there is no data that has been sent to the test
/// network channel.
bool isChannelEmpty();
/// Load the specified outMsg object with the data that has been sent to
/// the test network channel. Asserts if there is no control message in
/// the write buffer of the channel.
void getOutboundControlMessage(bmqp_ctrlmsg::ControlMessage* outMsg);
/// Load the specified `rawEvent` object with the data that has been
/// sent to the test network channel. Asserts if there is no BlazingMQ
/// Event in the write buffer of the channel.
void getOutboundEvent(bmqp::Event* rawEvent);
/// Send the specified control message to the broker session object
/// under the test as if had received a BlazingMQ event from the broker.
void sendControlMessage(const bmqp_ctrlmsg::ControlMessage& message);
/// Send a response to the specified `request` to the broker session
/// under testing as if it had been received from a broker. The type of
/// the response is defined by the type of the specified `request`.
void sendResponse(const bmqp_ctrlmsg::ControlMessage& request);
/// Send status message to the specified `request` to the broker session
/// under testing as if it had been received from a broker.
void sendStatus(const bmqp_ctrlmsg::ControlMessage& request);
/// Check that a request of the specified `requestType` is sent to the
/// channel and return the request object.
bmqp_ctrlmsg::ControlMessage
getNextOutboundRequest(const RequestType requestType);
/// Check that a request of the specified `requestType` is sent to the
/// channel and return the request ID.
int verifyRequestSent(const RequestType requestType);
/// Check that a close queue request is sent to the
/// channel and return the request object
bmqp_ctrlmsg::ControlMessage verifyCloseRequestSent(bool isFinal);
/// Return `true` if a session event of the specified `eventType` and
/// with the specified `eventStatus` was generated by the session object
/// under the test, `false` otherwise.
bool verifyOperationResult(const bmqt::SessionEventType::Enum eventType,
int eventStatus);
/// Check that a session event of the `e_QUEUE_OPEN_RESULT` and with the
/// specified `eventStatus` was generated by the session object under
/// the test. Check that there are no more session events. Check that
/// the queue comes into the specified `queueState`.
void verifyOpenQueueErrorResult(
const bmqp_ctrlmsg::StatusCategory::Value eventStatus,
const bsl::shared_ptr<bmqimp::Queue>& queue,
const bmqimp::QueueState::Enum queueState);
/// Check that a session event of the `e_QUEUE_CLOSE_RESULT` and with
/// the specified `eventStatus` was generated by the session object
/// under the test. Also check that there are no more session events
/// and that the queue is set to specified state.
void verifyCloseQueueResult(
const bmqp_ctrlmsg::StatusCategory::Value status,
const bsl::shared_ptr<bmqimp::Queue>& queue,
const bmqimp::QueueState::Enum state = bmqimp::QueueState::e_CLOSED);
/// Called when the test channel is closed and provides the specified
/// close `status`.
void onChannelClose(const bmqio::Status& status);
/// Wait until stateCb is called informing about reaching the STOPPED
/// state. Must be called after `session.stop()` call. Return false
/// in case of internal timeout.
bool verifySessionIsStopped();
/// A callback provided to the BrokerSession to be called when the
/// session switches from the specified `oldState` to the specified
/// `newState` upon the specified `event`.
int stateCb(bmqimp::BrokerSession::State::Enum oldState,
bmqimp::BrokerSession::State::Enum newState,
bmqimp::BrokerSession::FsmEvent::Enum event);
/// Advance the test time source for the specified `step`. Drain the
/// session FSM event queue before changing the time.
void advanceTime(const bsls::TimeInterval& step);
/// Set the specified `writeStatus` to the test network channel and
/// notify the BrokerSession about channel LWM.
void setChannelLowWaterMark(bmqio::StatusCategory::Enum writeStatus);
};
// CLASS DATA
const bsls::TimeInterval TestSession::k_EVENT_TIMEOUT(30);
const bsls::TimeInterval TestSession::k_TIME_SOURCE_STEP(0.001);
void TestSession::advanceTime(const bsls::TimeInterval& step)
{
// Before changing the time drain the FSM queue to verify that any ongoing
// request is completed. To achieve that call 'session._synchronize()'.
// By return of this call the FSM queue is expected to be empty and there
// should be no in progress requests.
int rc = session()._synchronize();
BMQTST_ASSERT(rc);
if (!d_testClock_p) {
// System time source is used. Cannot advance time manually.
return; // RETURN
}
d_testClock_p->d_timeSource.advanceTime(step);
bslmt::Semaphore sem;
typedef void (bslmt::Semaphore::*PostFn)();
d_scheduler.scheduleEvent(
d_testClock_p->d_timeSource.now(),
bdlf::BindUtil::bind(static_cast<PostFn>(&bslmt::Semaphore::post),
&sem));
sem.wait();
}
bsl::shared_ptr<bmqimp::Event> TestSession::getInboundEvent()
{
bsl::shared_ptr<bmqimp::Event> event;
if (d_brokerSession.isUsingSessionEventHandler()) {
d_eventQueue.popFront(&event);
}
else {
event = d_brokerSession.nextEvent(bsls::TimeInterval(5));
}
return event;
}
TestSession::TestSession(const bmqt::SessionOptions& sessionOptions,
bdlmt::EventScheduler& scheduler,
bool useEventHandler,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_blobBufferFactory(1024, d_allocator_p)
, d_blobSpPool_sp(
bmqp::BlobPoolUtil::createBlobPool(&d_blobBufferFactory,
bmqtst::TestHelperUtil::allocator()))
, d_scheduler(scheduler)
, d_testChannel(d_allocator_p)
, d_eventQueue(bsls::SystemClockType::e_MONOTONIC, d_allocator_p)
, d_brokerSession(&d_scheduler,
&d_blobBufferFactory,
d_blobSpPool_sp.get(),
sessionOptions,
useEventHandler
? bdlf::BindUtil::bind(&sessionEventHandler,
&d_eventQueue,
bdlf::PlaceHolders::_1) // event
: bmqimp::EventQueue::EventHandlerCallback(),
bdlf::MemFnUtil::memFn(&TestSession::stateCb, this),
d_allocator_p)
, d_onChannelCloseHandler(d_testChannel.onClose(
bdlf::MemFnUtil::memFn(&TestSession::onChannelClose, this)))
, d_testClock_p(0)
, d_startCounter(0)
, d_stopCounter(0)
{
// Set peer uri for the sake of better logging
d_testChannel.setPeerUri("tcp://testHost:1234");
int rc = d_scheduler.start();
BMQTST_ASSERT_EQ(rc, 0);
}
TestSession::TestSession(const bmqt::SessionOptions& sessionOptions,
TestClock& testClock,
bool useEventHandler,
bslma::Allocator* allocator)
: d_allocator_p(allocator)
, d_blobBufferFactory(1024, d_allocator_p)
, d_blobSpPool_sp(
bmqp::BlobPoolUtil::createBlobPool(&d_blobBufferFactory,
bmqtst::TestHelperUtil::allocator()))
, d_scheduler(testClock.d_scheduler)
, d_testChannel(d_allocator_p)
, d_eventQueue(bsls::SystemClockType::e_MONOTONIC, d_allocator_p)
, d_brokerSession(&d_scheduler,
&d_blobBufferFactory,
d_blobSpPool_sp.get(),
sessionOptions,
useEventHandler
? bdlf::BindUtil::bind(&sessionEventHandler,
&d_eventQueue,
bdlf::PlaceHolders::_1) // event
: bmqimp::EventQueue::EventHandlerCallback(),
bdlf::MemFnUtil::memFn(&TestSession::stateCb, this),
d_allocator_p)
, d_onChannelCloseHandler(d_testChannel.onClose(
bdlf::MemFnUtil::memFn(&TestSession::onChannelClose, this)))
, d_testClock_p(&testClock)
, d_startCounter(0)
, d_stopCounter(0)
{
// Set peer uri for the sake of better logging
d_testChannel.setPeerUri("tcp://testHost:1234");
bmqsys::Time::shutdown();
bmqsys::Time::initialize(
bdlf::BindUtil::bind(&TestClock::realtimeClock, d_testClock_p),
bdlf::BindUtil::bind(&TestClock::monotonicClock, d_testClock_p),
bdlf::BindUtil::bind(&TestClock::highResTimer, d_testClock_p),
d_allocator_p);
int rc = d_scheduler.start();
BMQTST_ASSERT_EQ(rc, 0);
}
TestSession::~TestSession()
{
d_scheduler.cancelAllEventsAndWait();
d_scheduler.stop();
d_onChannelCloseHandler.disconnect();
}
// ACCESSORS
bmqimp::BrokerSession& TestSession::session()
{
return d_brokerSession;
}
bmqio::TestChannel& TestSession::channel()
{
return d_testChannel;
}
bslma::Allocator* TestSession::allocator()
{
return d_allocator_p;
}
bmqp::BlobPoolUtil::BlobSpPool& TestSession::blobSpPool()
{
return *d_blobSpPool_sp;
}
// MANIPULATORS
void TestSession::startAndConnect(bool expectHostUnhealthy)
{
PVVV_SAFE("Starting session...");
d_startCounter = 0;
d_stopCounter = 0;
int rc = session().startAsync();
BMQTST_ASSERT_EQ(rc, 0);
BMQTST_ASSERT_EQ(d_startCounter, 1);
BMQTST_ASSERT_EQ(session().state(),
bmqimp::BrokerSession::State::e_STARTING);
PVV_SAFE("Session started...");
// Enable test channel
PV_SAFE("Set channel");
setChannel();
if (expectHostUnhealthy) {
BMQTST_ASSERT(waitHostUnhealthyEvent());
}
BMQTST_ASSERT(waitConnectedEvent());
PVVV_SAFE("Channel connected");
}
bool TestSession::stop()
{
session().stop();
bsl::shared_ptr<bmqimp::Event> event;
d_eventQueue.popFront(&event);
if (!event) {
PV_SAFE("No e_DISCONNECTED event");
return false; // RETURN
}
if (event->sessionEventType() != bmqt::SessionEventType::e_DISCONNECTED) {
PV_SAFE("Unexpected DISCONNECTED type: " << event->sessionEventType());
return false; // RETURN
}
if (event->statusCode() != bmqp_ctrlmsg::StatusCategory::E_SUCCESS) {
PV_SAFE("Unexpected DISCONNECTED status: " << event->statusCode());
return false; // RETURN
}
if (!verifySessionIsStopped()) {
PV_SAFE("Unexpected state: " << d_brokerSession.state());
return false; // RETURN
}
if (!checkNoEvent()) {
return false; // RETURN