-
Notifications
You must be signed in to change notification settings - Fork 660
Description
Describe the bug
Calling stopDispatcherThreads() seems to act weird. It looks like it's meant to wait for all threads to orderly stop before returning, but it doesn't.
Stop method:
public void stopDispatcherThreads() {
// dispatchersToShutdown is backed by the map itself so changes in one are reflected in the other
final Collection<MessageDispatchingThread> dispatchersToShutdown = dispatchers.values();
for (final MessageDispatchingThread dispatcher : dispatchersToShutdown) {
dispatcher.stopDispatcher();
}
// wait for threads to stop
while (!dispatchersToShutdown.isEmpty()) {
try {
Thread.sleep(100);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
dispatchersToShutdown.removeIf(MessageDispatchingThread::isStopped);
}
}
Looking at
dispatcher.stopDispatcher();
it is clear that the dispatcher immediately marks itself as stopped and returns.
public void stopDispatcher() {
enqueue(END_OF_STREAM);
stopping = true;
stopped = true;
}
So the whole 'wait for threads to stop' loop makes no sense, because 'MessageDispatchingThread::isStopped' returns true immediately as it just reads the 'stopped' property, even though the thread might actually still be running.
It seems to me like someone added 'stopped = true;' to the 'stopDispatcher()' function, even though that was never originally intended. It skips a lot of stopping logic still present elsewhere in the code. Even setting 'stopping = true;' is questionable since it should be a consequence of the 'run()' loop reading the END_OF_STREAM message and not be manually set?
void doRun() {
while (!stopping) {
try {
final Message message = getNextMessage(queueTracker);
if (message == null) {
// no message available in polling interval
continue;
}
quickfixSession.next(message);
**if (message == END_OF_STREAM) {
stopping = true;**
}
Also, from the way it looks the dummy END_OF_STREAM message is send to the 'quickfixSession.next()'. Is there a reason for that? Seems odd.
To Reproduce
- Have messages in the queue. Easiest way is to slow down processing of messages in the application with a delay.
- Call stop() on the acceptor/initiator.
- after stop() returns the engine will still be processing new messages, thus making stop() non-blocking
Expected behavior
After stop() completes, the engine should do nothing anymore. Or alternatively make it clear that stop() is non-blocking and clean up the unnecessary wait loop.