Skip to content

Commit 43bd063

Browse files
author
Veronica Zheng
committed
Merge pull request #845 from leapmotion/ref-parallel
Do not hold lock under notification where possible
2 parents daee07b + ed1a64b commit 43bd063

17 files changed

+126
-114
lines changed

autowiring/DispatchQueue.h

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -363,9 +363,9 @@ class DispatchQueue {
363363
// Create the thunk first to reduce the amount of time we spend in lock:
364364
auto thunk = new DispatchThunk<_Fx>(std::forward<_Fx>(fx));
365365

366-
std::unique_lock<std::mutex> lk(m_dispatchLock);
366+
m_dispatchLock.lock();
367367
if (m_count >= m_dispatchCap) {
368-
lk.unlock();
368+
m_dispatchLock.unlock();
369369
delete thunk;
370370
return;
371371
}
@@ -374,15 +374,18 @@ class DispatchQueue {
374374
m_count++;
375375

376376
// Linked list setup:
377-
if (m_pHead)
378-
m_pTail->m_pFlink = thunk;
377+
if (m_pHead) {
378+
m_pTail->m_pFlink = thunk;
379+
m_pTail = thunk;
380+
m_dispatchLock.unlock();
381+
}
379382
else {
380-
m_pHead = thunk;
383+
m_pHead = m_pTail = thunk;
384+
m_dispatchLock.unlock();
381385
m_queueUpdated.notify_all();
382386
}
383-
m_pTail = thunk;
384387

385388
// Notification as needed:
386-
OnPended(std::move(lk));
389+
OnPended(std::unique_lock<std::mutex>{});
387390
}
388391
};

src/autonet/AutoNetServerImpl.cpp

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ AutoNetServerImpl::AutoNetServerImpl(void):
2424

2525
AutoNetServerImpl::AutoNetServerImpl(std::unique_ptr<AutoNetTransport>&& transport) :
2626
m_transport(std::move(transport))
27-
{
27+
{
2828
// Register internal event handlers
2929
AddEventHandler("terminateContext", [this] (int contextID) {
3030
ResolveContextID(contextID)->SignalShutdown();
3131
});
32-
32+
3333
AddEventHandler("injectContextMember", [this] (int contextID, const std::string& typeName){
3434
std::shared_ptr<CoreContext> ctxt = ResolveContextID(contextID)->shared_from_this();
35-
35+
3636
if(m_AllTypes.find(typeName) != m_AllTypes.end()) {
3737
CurrentContextPusher pshr(ctxt);
3838
m_AllTypes[typeName]();
@@ -42,11 +42,11 @@ AutoNetServerImpl::AutoNetServerImpl(std::unique_ptr<AutoNetTransport>&& transpo
4242
assert(false);
4343
}
4444
});
45-
45+
4646
AddEventHandler("resumeFromBreakpoint", [this] (const std::string& name){
47-
std::lock_guard<std::mutex> lk(m_breakpoint_mutex);
48-
47+
std::lock_guard<std::mutex>{ m_breakpoint_mutex },
4948
m_breakpoints.erase(name);
49+
5050
m_breakpoint_cv.notify_all();
5151
});
5252

@@ -73,7 +73,7 @@ AutoNetServer* NewAutoNetServerImpl(void) {
7373
// CoreThread overrides
7474
void AutoNetServerImpl::Run(void){
7575
std::cout << "Starting Autonet server..." << std::endl;
76-
76+
7777
// Register ourselves as a handler
7878
m_transport->SetTransportHandler(std::static_pointer_cast<AutoNetServerImpl>(shared_from_this()));
7979

@@ -110,7 +110,7 @@ void AutoNetServerImpl::OnMessage(AutoNetTransportHandler::connection_hdl hdl, c
110110

111111
std::string msgType = msg["type"].string_value();
112112
Json::array msgArgs = msg["args"].array_items();
113-
113+
114114
// Handle client specific internal events
115115
if (msgType == "subscribe") {
116116
*this += [this, hdl] {
@@ -130,7 +130,7 @@ void AutoNetServerImpl::OnMessage(AutoNetTransportHandler::connection_hdl hdl, c
130130
for (const auto& a : msgArgs) {
131131
args.push_back(!a.string_value().empty() ? a.string_value() : a.dump());
132132
}
133-
133+
134134
// call all the handlers
135135
for (const auto& handler : this->m_handlers[msgType]) {
136136
handler(args);
@@ -270,19 +270,19 @@ void AutoNetServerImpl::SendEvent(const std::string& rawEvent, const std::vector
270270
// Prepend '$' to custum event to avoid namespace collitions with internal events
271271
std::string event("$");
272272
event.append(rawEvent);
273-
273+
274274
Json::array jsonArgs;
275275
for (const auto& a : args) {
276276
jsonArgs.push_back(a);
277277
}
278-
278+
279279
*this += [this, event, jsonArgs] {
280280
for(auto hdl : m_Subscribers) {
281281
Json msg = Json::object{
282282
{"type", event},
283283
{"args", jsonArgs}
284284
};
285-
285+
286286
m_transport->Send(hdl, msg.dump());
287287
}
288288
};

src/autowiring/AutoPacket.cpp

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ AutoPacket::~AutoPacket(void) {
3535
std::chrono::high_resolution_clock::now() - m_initTime
3636
)
3737
);
38-
38+
3939
// Mark decorations of successor packets that use decorations
4040
// originating from this packet as unsatisfiable
4141
for (auto& pair : m_decoration_map)
@@ -44,15 +44,15 @@ AutoPacket::~AutoPacket(void) {
4444

4545
// Needed for the AutoPacketGraph
4646
NotifyTeardownListeners();
47-
47+
4848
// Create vector of all successor packets that will be destroyed
4949
// This prevents recursive AutoPacket destructor calls
5050
std::vector<std::shared_ptr<AutoPacket>> packets;
51-
51+
5252
// Recurse through unique successors, storing them in our vector
5353
for (AutoPacket* current = this; current->m_successor.unique();) {
5454
packets.push_back(current->m_successor);
55-
55+
5656
// Reset and continue to next successor
5757
AutoPacket* prev_current = current;
5858
current = current->m_successor.get();
@@ -249,14 +249,14 @@ void AutoPacket::MarkUnsatisfiable(const DecorationKey& key) {
249249

250250
void AutoPacket::MarkSuccessorsUnsatisfiable(DecorationKey key) {
251251
std::lock_guard<std::mutex> lk(m_lock);
252-
252+
253253
// Update key and successor
254254
key.tshift++;
255255
auto successor = SuccessorUnsafe();
256-
256+
257257
while (m_decoration_map.count(key)) {
258258
successor->MarkUnsatisfiable(key);
259-
259+
260260
// Update key and successor
261261
key.tshift++;
262262
successor = successor->Successor();
@@ -661,12 +661,16 @@ bool AutoPacket::Wait(std::condition_variable& cv, const AutoFilterArgument* inp
661661
SignalStub* stub = (SignalStub*)pObj;
662662

663663
// Completed, mark the output as satisfied and update the condition variable
664-
std::lock_guard<std::mutex>(stub->packet.m_lock);
665-
stub->is_satisfied = true;
664+
std::condition_variable* pcv;
665+
{
666+
std::lock_guard<std::mutex>{stub->packet.m_lock};
667+
stub->is_satisfied = true;
668+
pcv = stub->cv;
669+
}
666670

667671
// Only notify while the condition variable is still valid
668-
if (stub->cv)
669-
stub->cv->notify_all();
672+
if (pcv)
673+
pcv->notify_all();
670674
}
671675
)
672676
)
@@ -676,12 +680,16 @@ bool AutoPacket::Wait(std::condition_variable& cv, const AutoFilterArgument* inp
676680
// decorations. In that case, the satisfaction flag is left in its initial state
677681
AddTeardownListener(
678682
[stub] {
679-
std::lock_guard<std::mutex>(stub->packet.m_lock);
680-
stub->is_complete = true;
683+
std::condition_variable* pcv;
684+
{
685+
std::lock_guard<std::mutex> lk(stub->packet.m_lock);
686+
stub->is_complete = true;
687+
pcv = stub->cv;
688+
}
681689

682690
// Only notify the condition variable if it's still present
683-
if (stub->cv)
684-
stub->cv->notify_all();
691+
if (pcv)
692+
pcv->notify_all();
685693
}
686694
);
687695

src/autowiring/AutoPacketFactory.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,22 @@ std::shared_ptr<AutoPacket> AutoPacketFactory::NewPacket(void) {
2222
throw autowiring_error("Attempted to create a packet on an AutoPacketFactory that was already terminated");
2323
if(!IsRunning())
2424
throw autowiring_error("Cannot create a packet until the AutoPacketFactory is started");
25-
25+
2626
std::shared_ptr<AutoPacketInternal> retVal;
2727
bool isFirstPacket;
2828
{
2929
std::lock_guard<std::mutex> lk(m_lock);
30-
30+
3131
// New packet issued
3232
isFirstPacket = !m_packetCount;
3333
++m_packetCount;
34-
34+
3535
// Create a new next packet
3636
retVal = m_nextPacket;
3737
m_nextPacket = retVal->SuccessorInternal();
3838
m_curPacket = retVal;
3939
}
40-
40+
4141
retVal->Initialize(isFirstPacket);
4242
return retVal;
4343
}
@@ -62,12 +62,12 @@ std::shared_ptr<void> AutoPacketFactory::GetInternalOutstanding(void) {
6262
retVal = std::shared_ptr<void>(
6363
(void*)1,
6464
[this, outstanding] (void*) mutable {
65-
std::lock_guard<std::mutex> lk(m_lock);
66-
m_stateCondition.notify_all();
67-
6865
// Weak pointer will prevent our lambda from being destroyed, so we manually reset
6966
// the outstanding counter in order to force it to be reset here
67+
std::lock_guard<std::mutex>{m_lock},
7068
outstanding.reset();
69+
70+
m_stateCondition.notify_all();
7171
}
7272
);
7373
m_outstandingInternal = retVal;
@@ -105,7 +105,7 @@ SatCounter* AutoPacketFactory::CreateSatCounterList(void) const {
105105
bool AutoPacketFactory::OnStart(void) {
106106
// Initialize first packet
107107
m_nextPacket = ConstructPacket();
108-
108+
109109
// Wake us up. We're starting now
110110
m_stateCondition.notify_all();
111111
return true;
@@ -114,12 +114,12 @@ bool AutoPacketFactory::OnStart(void) {
114114
void AutoPacketFactory::OnStop(bool graceful) {
115115
// Queue of local variables to be destroyed when leaving scope
116116
t_autoFilterSet autoFilters;
117-
117+
118118
// Reset next packet, it will never be issued
119119
m_nextPacket.reset();
120120

121121
// Lock destruction precedes local variables
122-
std::lock_guard<std::mutex> lk(m_lock);
122+
std::lock_guard<std::mutex>{m_lock},
123123

124124
// Same story with the AutoFilters
125125
autoFilters.swap(m_autoFilters);

src/autowiring/BasicThread.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ void BasicThread::DoRunLoopCleanup(std::shared_ptr<CoreContext>&& ctxt, std::sha
9696

9797
// Notify other threads that we are done. At this point, any held references that might still exist
9898
// notification must happen from a synchronized level in order to ensure proper ordering.
99-
std::lock_guard<std::mutex> lk(state->m_lock);
99+
std::lock_guard<std::mutex>{state->m_lock},
100100
state->m_completed = true;
101101
state->m_stateCondition.notify_all();
102102
}
@@ -114,7 +114,7 @@ void BasicThread::WaitForStateUpdate(const std::function<bool()>& fn) const {
114114
}
115115

116116
void BasicThread::PerformStatusUpdate(const std::function<void()>& fn) const {
117-
std::unique_lock<std::mutex> lk(m_state->m_lock);
117+
std::unique_lock<std::mutex>{m_state->m_lock},
118118
fn();
119119
m_state->m_stateCondition.notify_all();
120120
}

src/autowiring/CoreContext.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -426,9 +426,6 @@ void CoreContext::Initiate(void) {
426426
return;
427427
}
428428

429-
// State change has taken place, we can signal
430-
m_stateBlock->m_stateChanged.notify_all();
431-
432429
// Now we can add the event receivers we haven't been able to add because the context
433430
// wasn't yet started:
434431
AddEventReceiversUnsafe(std::move(m_delayedEventReceivers));
@@ -441,6 +438,7 @@ void CoreContext::Initiate(void) {
441438
if (!IsRunning()) {
442439
lk.unlock();
443440
onInitiated();
441+
m_stateBlock->m_stateChanged.notify_all();
444442

445443
// Need to inject a delayed context type so that this context will not be destroyed until
446444
// it has an opportunity to start.
@@ -471,6 +469,7 @@ void CoreContext::Initiate(void) {
471469
threadPool = m_threadPool;
472470
lk.unlock();
473471
onInitiated();
472+
m_stateBlock->m_stateChanged.notify_all();
474473

475474
// Start the thread pool out of the lock, and then update our start token if our thread pool
476475
// reference has not changed. The next pool could potentially be nullptr if the parent is going
@@ -539,9 +538,8 @@ void CoreContext::SignalShutdown(bool wait, ShutdownMode shutdownMode) {
539538
firstThreadToStop = m_threads.begin();
540539
if (m_beforeRunning)
541540
++firstThreadToStop;
542-
543-
m_stateBlock->m_stateChanged.notify_all();
544541
}
542+
m_stateBlock->m_stateChanged.notify_all();
545543
onShutdown();
546544

547545
// Teardown interleave assurance--all of these contexts will generally be destroyed
@@ -1083,12 +1081,12 @@ void CoreContext::TryTransitionChildrenState(void) {
10831081
auto q = child->m_threads.begin();
10841082
child->m_state = State::Running;
10851083

1086-
// Child had it's state changed
1087-
child->m_stateBlock->m_stateChanged.notify_all();
1088-
10891084
// Raise the run condition in the child
10901085
childLk.unlock();
10911086

1087+
// Child had it's state changed
1088+
child->m_stateBlock->m_stateChanged.notify_all();
1089+
10921090
auto outstanding = child->m_stateBlock->IncrementOutstandingThreadCount(child);
10931091
while (q != child->m_threads.end()) {
10941092
(*q)->Start(outstanding);
@@ -1106,6 +1104,7 @@ void CoreContext::TryTransitionChildrenState(void) {
11061104
child->m_state = State::CanRun;
11071105

11081106
// Child had it's state changed
1107+
childLk.unlock();
11091108
child->m_stateBlock->m_stateChanged.notify_all();
11101109
break;
11111110
case State::CanRun:

src/autowiring/CoreContextStateBlock.cpp

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,15 @@ RunCounter::~RunCounter(void) {
2525
owner.reset();
2626

2727
std::weak_ptr<CoreObject> outstanding;
28-
std::lock_guard<std::mutex> lk(stateBlock->m_lock);
29-
30-
// Unfortunately, this destructor callback is made before weak pointers are
31-
// invalidated, which requires that we manually reset the outstanding count
32-
// We don't want to free memory while holding the lock, so defer
33-
outstanding = std::move(stateBlock->m_outstanding);
34-
stateBlock->m_outstanding.reset();
28+
{
29+
std::lock_guard<std::mutex> lk(stateBlock->m_lock);
30+
31+
// Unfortunately, this destructor callback is made before weak pointers are
32+
// invalidated, which requires that we manually reset the outstanding count
33+
// We don't want to free memory while holding the lock, so defer
34+
outstanding = std::move(stateBlock->m_outstanding);
35+
stateBlock->m_outstanding.reset();
36+
}
3537

3638
// Wake everyone up
3739
stateBlock->m_stateChanged.notify_all();

0 commit comments

Comments
 (0)