Skip to content

Commit 631778a

Browse files
committed
drain on closeChannel
Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com>
1 parent 32a567a commit 631778a

2 files changed

Lines changed: 28 additions & 27 deletions

File tree

src/groups/mqb/mqbnet/mqbnet_channel.cpp

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,11 @@ Channel::Channel(bdlbb::BlobBufferFactory* blobBufferFactory,
8181
d_allocators.get("ItemPool"))
8282
, d_buffer(1024, allocator)
8383
, d_doStop(false)
84-
, d_state(e_INITIAL)
84+
, d_state(e_RESET)
8585
, d_description(name + " - ", d_allocator_p)
8686
, d_name(name, d_allocator_p)
8787
, d_stats()
88+
, d_isClosing(false)
8889
{
8990
bslmt::ThreadAttributes attr = bmqsys::ThreadUtil::defaultAttributes();
9091
bsl::string threadName("bmqNet-");
@@ -261,19 +262,14 @@ void Channel::resetChannel()
261262

262263
void Channel::closeChannel()
263264
{
264-
bsl::shared_ptr<bmqio::Channel> channel;
265-
{
266-
bslmt::LockGuard<bslmt::Mutex> guard(&d_mutex); // LOCK
267-
channel = d_channel_wp.lock();
268-
} // UNLOCK
265+
d_isClosing.store(true);
269266

270-
if (BSLS_PERFORMANCEHINT_PREDICT_LIKELY(channel)) {
271-
channel->close();
272-
// Set the state to e_CLOSE to avoid repeated attempts to write until
273-
// 'OnClose' calls 'resetChannel'.
267+
// No need to 'signal' to interrupt the waiting.
268+
// Waiting means there is no connection and therefore nothing to close.
274269

275-
d_state.testAndSwap(e_READY, e_CLOSE);
276-
}
270+
wakeUp();
271+
272+
// do not wait for all items to drain.
277273
}
278274

279275
void Channel::setChannel(const bsl::weak_ptr<bmqio::Channel>& value)
@@ -747,7 +743,7 @@ void Channel::threadFn()
747743
bsl::string description;
748744
int mode = e_BLOCK;
749745

750-
BSLS_ASSERT(d_state == e_INITIAL || d_state == e_RESET);
746+
BSLS_ASSERT(d_state == e_RESET);
751747

752748
while (!d_doStop) {
753749
bmqc::MonitoredQueueState::Enum queueState;
@@ -796,17 +792,14 @@ void Channel::threadFn()
796792
item.reset();
797793
reset();
798794

799-
d_state = e_INITIAL;
800795
mode = e_BLOCK;
801-
}
796+
d_isClosing.store(false);
802797

803-
if (d_state == e_INITIAL) {
804798
channel = d_channel_wp.lock();
805799
description = d_description;
806800

807801
if (channel) {
808-
// This is the only place for transitions:
809-
// e_IDLE -> e_READY
802+
// This is the only place for the transition
810803
// e_RESET -> e_READY
811804
d_state = e_READY;
812805
}
@@ -856,6 +849,13 @@ void Channel::threadFn()
856849
// BLOCK mode and wait for the next batch of items.
857850
mode = e_BLOCK;
858851
}
852+
// if draining, this is where it stops.
853+
// Does not matter if 'flushAll' has failed; must close
854+
if (d_isClosing) {
855+
channel->close();
856+
d_state.testAndSwap(e_READY, e_CLOSE);
857+
// bmqio::Channel observer will trigger 'resetChannel'
858+
}
859859
} break;
860860
default: {
861861
BSLS_ASSERT(false && "Unreachable by design");

src/groups/mqb/mqbnet/mqbnet_channel.h

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
// Internally, 'mqbnet::Channel' starts a new thread and unconditionally
4949
// buffers everything it needs to write into single consumer queue. The thread
5050
// reads the channel state which can be
51-
// - e_INITIAL, not connected
5251
// - e_RESET, indicating a need to reset because of a connection change;
5352
// - e_READY, low-watermark
5453
// - e_HWM, high-watermark
@@ -349,17 +348,15 @@ class Channel {
349348
public:
350349
// PUBLIC TYPES
351350
enum EnumState {
352-
/// Not connected
353-
e_INITIAL = 0,
354351
/// Need resetting because of a connection change
355-
e_RESET = 1,
352+
e_RESET = 0,
356353
/// Between 'Channel::close; and 'resetChannel'
357-
e_CLOSE = 2,
358-
e_READY = 3,
354+
e_CLOSE = 1,
355+
e_READY = 2,
359356
/// LWM
360-
e_LWM = 4,
357+
e_LWM = 3,
361358
/// HWM
362-
e_HWM = 5
359+
e_HWM = 4
363360
};
364361

365362
private:
@@ -422,6 +419,10 @@ class Channel {
422419
// in the internal thread.
423420
Stats d_stats;
424421

422+
/// Indicates graceful shutdown. Drain the buffer if possible and then
423+
/// close the channel.
424+
bsls::AtomicBool d_isClosing;
425+
425426
private:
426427
// NOT IMPLEMENTED
427428
Channel(const Channel&) BSLS_CPP11_DELETED;
@@ -1085,7 +1086,7 @@ Channel::enqueue(bslma::ManagedPtr<Item>& item)
10851086
// ACCESSORS
10861087
inline bool Channel::isAvailable() const
10871088
{
1088-
return d_state != e_INITIAL && d_state != e_RESET && d_state != e_CLOSE;
1089+
return d_state != e_RESET && d_state != e_CLOSE && !d_isClosing;
10891090
}
10901091

10911092
inline const bsl::shared_ptr<bmqio::Channel> Channel::channel() const

0 commit comments

Comments
 (0)