Skip to content

Commit 79e4b35

Browse files
authored
Merge pull request #47029 from makortel/waitingTaskWithArenaHolder
Use `WaitingTaskHolder` to signal `doneWaiting()` instead of `WaitingTaskWithArenaHolder` in framework
2 parents f4e1712 + e77e887 commit 79e4b35

29 files changed

+106
-141
lines changed

FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,8 @@ namespace edm {
4040
// eventually intend for the task to be spawned.
4141
explicit WaitingTaskWithArenaHolder(oneapi::tbb::task_group&, WaitingTask* iTask);
4242

43-
// Takes ownership of the underlying task and uses the current
44-
// arena.
45-
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask);
43+
// Captures the current arena.
44+
explicit WaitingTaskWithArenaHolder(WaitingTaskHolder iTask);
4645

4746
~WaitingTaskWithArenaHolder();
4847

FWCore/Concurrency/src/WaitingTaskWithArenaHolder.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ namespace edm {
2525
m_task->increment_ref_count();
2626
}
2727

28-
WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder&& iTask)
28+
WaitingTaskWithArenaHolder::WaitingTaskWithArenaHolder(WaitingTaskHolder iTask)
2929
: m_task(iTask.release_no_decrement()),
3030
m_group(iTask.group()),
3131
m_arena(std::make_shared<oneapi::tbb::task_arena>(oneapi::tbb::task_arena::attach())) {}

FWCore/Framework/interface/CallbackExternalWork.h

+23-25
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,9 @@ namespace edm {
114114
WaitingTaskHolder produceTask =
115115
Base::makeProduceTask(group, token, record, es, emitPostPrefetchingSignal, std::move(produceFunctor));
116116

117-
WaitingTaskWithArenaHolder waitingTaskWithArenaHolder =
118-
makeExceptionHandlerTask(std::move(produceTask), group);
117+
WaitingTaskHolder waitingTaskHolder = makeExceptionHandlerTask(std::move(produceTask), group);
119118

120-
return makeAcquireTask(std::move(waitingTaskWithArenaHolder), group, token, record, es);
119+
return makeAcquireTask(std::move(waitingTaskHolder), group, token, record, es);
121120
},
122121
std::move(iTask),
123122
iRecord,
@@ -134,15 +133,15 @@ namespace edm {
134133
const TDecorator& iDec = TDecorator())
135134
: Base(iProd, std::move(iProduceFunc), iID, iDec), acquireFunction_(std::move(iAcquireFunc)) {}
136135

137-
WaitingTaskHolder makeAcquireTask(WaitingTaskWithArenaHolder waitingTaskWithArenaHolder,
136+
WaitingTaskHolder makeAcquireTask(WaitingTaskHolder waitingTaskHolder,
138137
oneapi::tbb::task_group* group,
139138
ServiceWeakToken const& serviceToken,
140139
EventSetupRecordImpl const* record,
141140
EventSetupImpl const* eventSetupImpl) {
142141
return WaitingTaskHolder(
143142
*group,
144143
make_waiting_task(
145-
[this, holder = std::move(waitingTaskWithArenaHolder), group, serviceToken, record, eventSetupImpl](
144+
[this, holder = std::move(waitingTaskHolder), group, serviceToken, record, eventSetupImpl](
146145
std::exception_ptr const* iException) mutable {
147146
std::exception_ptr excptr;
148147
if (iException) {
@@ -191,7 +190,7 @@ namespace edm {
191190
ESModuleCallingContext const& context_;
192191
};
193192
EndGuard guard(record, context);
194-
acquireCache_ = (*acquireFunction_)(rec, holder);
193+
acquireCache_ = (*acquireFunction_)(rec, WaitingTaskWithArenaHolder(holder));
195194
});
196195
} catch (cms::Exception& iException) {
197196
iException.addContext("Running acquire");
@@ -202,25 +201,24 @@ namespace edm {
202201
}));
203202
}
204203

205-
WaitingTaskWithArenaHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask,
206-
oneapi::tbb::task_group* group) {
207-
return WaitingTaskWithArenaHolder(*group,
208-
make_waiting_task([this, produceTask = std::move(produceTask)](
209-
std::exception_ptr const* iException) mutable {
210-
std::exception_ptr excptr;
211-
if (iException) {
212-
excptr = *iException;
213-
}
214-
if (excptr) {
215-
try {
216-
convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
217-
} catch (cms::Exception& exception) {
218-
exception.addContext("Running acquire and external work");
219-
edm::exceptionContext(exception, Base::callingContext());
220-
produceTask.doneWaiting(std::current_exception());
221-
}
222-
}
223-
}));
204+
WaitingTaskHolder makeExceptionHandlerTask(WaitingTaskHolder produceTask, oneapi::tbb::task_group* group) {
205+
return WaitingTaskHolder(*group,
206+
make_waiting_task([this, produceTask = std::move(produceTask)](
207+
std::exception_ptr const* iException) mutable {
208+
std::exception_ptr excptr;
209+
if (iException) {
210+
excptr = *iException;
211+
}
212+
if (excptr) {
213+
try {
214+
convertException::wrap([excptr]() { std::rethrow_exception(excptr); });
215+
} catch (cms::Exception& exception) {
216+
exception.addContext("Running acquire and external work");
217+
edm::exceptionContext(exception, Base::callingContext());
218+
produceTask.doneWaiting(std::current_exception());
219+
}
220+
}
221+
}));
224222
}
225223

226224
std::shared_ptr<TAcquireFunc> acquireFunction_;

FWCore/Framework/interface/StreamSchedule.h

+2
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@
9090
#include "FWCore/Utilities/interface/propagate_const.h"
9191
#include "FWCore/Utilities/interface/thread_safety_macros.h"
9292

93+
#include "oneapi/tbb/task_arena.h"
94+
9395
#include <exception>
9496
#include <map>
9597
#include <memory>

FWCore/Framework/interface/global/EDFilterBase.h

+2-6
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ namespace edm {
3737
class StreamID;
3838
class ActivityRegistry;
3939
class ThinnedAssociationsHelper;
40-
class WaitingTaskWithArenaHolder;
4140
class EventForTransformer;
4241
class ServiceWeakToken;
4342

@@ -75,10 +74,7 @@ namespace edm {
7574

7675
private:
7776
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
78-
void doAcquire(EventTransitionInfo const&,
79-
ActivityRegistry*,
80-
ModuleCallingContext const*,
81-
WaitingTaskWithArenaHolder&);
77+
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
8278
void doTransformAsync(WaitingTaskHolder iTask,
8379
size_t iTransformIndex,
8480
EventPrincipal const& iEvent,
@@ -169,7 +165,7 @@ namespace edm {
169165
virtual bool hasAcquire() const noexcept { return false; }
170166
bool hasAccumulator() const noexcept { return false; }
171167

172-
virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder&);
168+
virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskHolder&&);
173169

174170
void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }
175171
ModuleDescription moduleDescription_;

FWCore/Framework/interface/global/EDProducerBase.h

+2-6
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ namespace edm {
3838
class GlobalSchedule;
3939
class ActivityRegistry;
4040
class ThinnedAssociationsHelper;
41-
class WaitingTaskWithArenaHolder;
4241
class EventForTransformer;
4342
class ServiceWeakToken;
4443

@@ -78,10 +77,7 @@ namespace edm {
7877

7978
private:
8079
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
81-
void doAcquire(EventTransitionInfo const&,
82-
ActivityRegistry*,
83-
ModuleCallingContext const*,
84-
WaitingTaskWithArenaHolder&);
80+
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
8581
void doTransformAsync(WaitingTaskHolder iTask,
8682
size_t iTransformIndex,
8783
EventPrincipal const& iEvent,
@@ -173,7 +169,7 @@ namespace edm {
173169

174170
virtual bool hasAcquire() const noexcept { return false; }
175171

176-
virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder&);
172+
virtual void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskHolder&&);
177173

178174
void setModuleDescription(ModuleDescription const& md) { moduleDescription_ = md; }
179175
ModuleDescription moduleDescription_;

FWCore/Framework/interface/global/OutputModuleBase.h

+2-5
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,7 @@ namespace edm {
5959
void doEndStream(StreamID id) { doEndStream_(id); }
6060

6161
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
62-
void doAcquire(EventTransitionInfo const&,
63-
ActivityRegistry*,
64-
ModuleCallingContext const*,
65-
WaitingTaskWithArenaHolder&);
62+
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
6663
//For now this is a placeholder
6764
/*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder iTask,
6865
ModuleCallingContext const& iModuleCallingContext,
@@ -86,7 +83,7 @@ namespace edm {
8683
virtual void doEndRunSummary_(RunForOutput const&, EventSetup const&) {}
8784
virtual void doBeginLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
8885
virtual void doEndLuminosityBlockSummary_(LuminosityBlockForOutput const&, EventSetup const&) {}
89-
virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder&) {}
86+
virtual void doAcquire_(StreamID, EventForOutput const&, WaitingTaskHolder&&) {}
9087

9188
virtual bool hasAcquire() const noexcept { return false; }
9289
};

FWCore/Framework/interface/global/implementors.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@
4848
// forward declarations
4949
namespace edm {
5050

51-
class WaitingTaskWithArenaHolder;
5251
class ServiceWeakToken;
5352
class ActivityRegistry;
53+
class WaitingTaskWithArenaHolder;
5454

5555
namespace global {
5656
namespace impl {
@@ -436,7 +436,7 @@ namespace edm {
436436
private:
437437
bool hasAcquire() const noexcept override { return true; }
438438

439-
void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder&) final;
439+
void doAcquire_(StreamID, Event const&, edm::EventSetup const&, WaitingTaskHolder&&) final;
440440

441441
virtual void acquire(StreamID, Event const&, edm::EventSetup const&, WaitingTaskWithArenaHolder) const = 0;
442442
};

FWCore/Framework/interface/global/outputmoduleAbilityToImplementor.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ namespace edm {
141141
private:
142142
bool hasAcquire() const noexcept override { return true; }
143143

144-
void doAcquire_(StreamID id, EventForOutput const& event, WaitingTaskWithArenaHolder& holder) final {
145-
acquire(id, event, holder);
144+
void doAcquire_(StreamID id, EventForOutput const& event, WaitingTaskHolder&& holder) final {
145+
acquire(id, event, WaitingTaskWithArenaHolder(std::move(holder)));
146146
}
147147

148148
virtual void acquire(StreamID, EventForOutput const&, WaitingTaskWithArenaHolder) const = 0;

FWCore/Framework/interface/maker/Worker.h

+13-14
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ the worker is reset().
3232
#include "FWCore/Framework/interface/ProductResolverIndexAndSkipBit.h"
3333
#include "FWCore/Concurrency/interface/WaitingTask.h"
3434
#include "FWCore/Concurrency/interface/WaitingTaskHolder.h"
35-
#include "FWCore/Concurrency/interface/WaitingTaskWithArenaHolder.h"
3635
#include "FWCore/Concurrency/interface/WaitingTaskList.h"
3736
#include "FWCore/MessageLogger/interface/MessageLogger.h"
3837
#include "FWCore/ServiceRegistry/interface/ActivityRegistry.h"
@@ -267,9 +266,7 @@ namespace edm {
267266
virtual void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const = 0;
268267
virtual bool implNeedToRunSelection() const noexcept = 0;
269268

270-
virtual void implDoAcquire(EventTransitionInfo const&,
271-
ModuleCallingContext const*,
272-
WaitingTaskWithArenaHolder&) = 0;
269+
virtual void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) = 0;
273270

274271
virtual void implDoTransformAsync(WaitingTaskHolder,
275272
size_t iTransformIndex,
@@ -394,12 +391,14 @@ namespace edm {
394391
ParentContext const&,
395392
typename T::Context const*) noexcept;
396393

397-
void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskWithArenaHolder&);
394+
// runAcquire() must take a copy of WaitingTaskHolder
395+
// see comment in runAcquireAfterAsyncPrefetch() definition
396+
void runAcquire(EventTransitionInfo const&, ParentContext const&, WaitingTaskHolder);
398397

399398
void runAcquireAfterAsyncPrefetch(std::exception_ptr,
400399
EventTransitionInfo const&,
401400
ParentContext const&,
402-
WaitingTaskWithArenaHolder) noexcept;
401+
WaitingTaskHolder) noexcept;
403402

404403
std::exception_ptr handleExternalWorkException(std::exception_ptr iEPtr,
405404
ParentContext const& parentContext) noexcept;
@@ -519,7 +518,7 @@ namespace edm {
519518
typename T::TransitionInfoType const&,
520519
ServiceToken const&,
521520
ParentContext const&,
522-
WaitingTaskWithArenaHolder) noexcept {}
521+
WaitingTaskHolder) noexcept {}
523522
void execute() final {}
524523
};
525524

@@ -530,7 +529,7 @@ namespace edm {
530529
EventTransitionInfo const& eventTransitionInfo,
531530
ServiceToken const& token,
532531
ParentContext const& parentContext,
533-
WaitingTaskWithArenaHolder holder) noexcept
532+
WaitingTaskHolder holder) noexcept
534533
: m_worker(worker),
535534
m_eventTransitionInfo(eventTransitionInfo),
536535
m_parentContext(parentContext),
@@ -545,7 +544,7 @@ namespace edm {
545544
// to hold the exception_ptr
546545
std::exception_ptr temp_excptr;
547546
auto excptr = exceptionPtr();
548-
// Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskWithArenaHolder
547+
// Caught exception is passed to Worker::runModuleAfterAsyncPrefetch(), which propagates it via WaitingTaskHolder
549548
CMS_SA_ALLOW try {
550549
//pre was called in prefetchAsync
551550
m_worker->emitPostModuleEventPrefetchingSignal();
@@ -563,12 +562,12 @@ namespace edm {
563562
info = m_eventTransitionInfo,
564563
parentContext = m_parentContext,
565564
serviceToken = m_serviceToken,
566-
holder = m_holder]() {
565+
holder = std::move(m_holder)]() {
567566
//Need to make the services available
568567
ServiceRegistry::Operate operateRunAcquire(serviceToken.lock());
569568

570569
std::exception_ptr ptr;
571-
worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, holder);
570+
worker->runAcquireAfterAsyncPrefetch(ptr, info, parentContext, std::move(holder));
572571
});
573572
return;
574573
}
@@ -581,7 +580,7 @@ namespace edm {
581580
Worker* m_worker;
582581
EventTransitionInfo m_eventTransitionInfo;
583582
ParentContext const m_parentContext;
584-
WaitingTaskWithArenaHolder m_holder;
583+
WaitingTaskHolder m_holder;
585584
ServiceWeakToken m_serviceToken;
586585
};
587586

@@ -1127,7 +1126,7 @@ namespace edm {
11271126
auto* group = task.group();
11281127
moduleTask = make_waiting_task(
11291128
[this, weakToken, transitionInfo, parentContext, ownRunTask, group](std::exception_ptr const* iExcept) {
1130-
WaitingTaskWithArenaHolder runTaskHolder(
1129+
WaitingTaskHolder runTaskHolder(
11311130
*group, new HandleExternalWorkExceptionTask(this, group, ownRunTask->release(), parentContext));
11321131
AcquireTask<T> t(this, transitionInfo, weakToken.lock(), parentContext, runTaskHolder);
11331132
t.execute();
@@ -1154,7 +1153,7 @@ namespace edm {
11541153
auto group = task.group();
11551154
if constexpr (T::isEvent_) {
11561155
if (hasAcquire()) {
1157-
WaitingTaskWithArenaHolder runTaskHolder(
1156+
WaitingTaskHolder runTaskHolder(
11581157
*group, new HandleExternalWorkExceptionTask(this, group, moduleTask, parentContext));
11591158
moduleTask = new AcquireTask<T>(this, transitionInfo, token, parentContext, std::move(runTaskHolder));
11601159
}

FWCore/Framework/interface/maker/WorkerT.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ namespace edm {
2727
class ModuleProcessName;
2828
class ProductResolverIndexAndSkipBit;
2929
class ThinnedAssociationsHelper;
30-
class WaitingTaskWithArenaHolder;
3130

3231
template <typename T>
3332
class WorkerT : public Worker {
@@ -90,7 +89,7 @@ namespace edm {
9089
void itemsToGetForSelection(std::vector<ProductResolverIndexAndSkipBit>&) const final;
9190
bool implNeedToRunSelection() const noexcept final;
9291

93-
void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskWithArenaHolder&) final;
92+
void implDoAcquire(EventTransitionInfo const&, ModuleCallingContext const*, WaitingTaskHolder&&) final;
9493

9594
size_t transformIndex(edm::BranchDescription const&) const noexcept final;
9695
void implDoTransformAsync(WaitingTaskHolder,

FWCore/Framework/interface/stream/EDFilter.h

+2-4
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828

2929
namespace edm {
3030

31-
class WaitingTaskWithArenaHolder;
32-
3331
namespace stream {
3432

3533
template <typename... T>
@@ -66,8 +64,8 @@ namespace edm {
6664
bool hasAbilityToProduceInEndLumis() const final { return HasAbilityToProduceInEndLumis<T...>::value; }
6765

6866
private:
69-
void doAcquire_(Event const& ev, EventSetup const& es, WaitingTaskWithArenaHolder& holder) final {
70-
doAcquireIfNeeded(this, ev, es, holder);
67+
void doAcquire_(Event const& ev, EventSetup const& es, WaitingTaskHolder&& holder) final {
68+
doAcquireIfNeeded(this, ev, es, std::move(holder));
7169
}
7270
};
7371

FWCore/Framework/interface/stream/EDFilterAdaptorBase.h

+1-5
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ namespace edm {
3636

3737
class ModuleCallingContext;
3838
class ActivityRegistry;
39-
class WaitingTaskWithArenaHolder;
4039

4140
namespace maker {
4241
template <typename T>
@@ -70,10 +69,7 @@ namespace edm {
7069
private:
7170
bool doEvent(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*);
7271

73-
void doAcquire(EventTransitionInfo const&,
74-
ActivityRegistry*,
75-
ModuleCallingContext const*,
76-
WaitingTaskWithArenaHolder&);
72+
void doAcquire(EventTransitionInfo const&, ActivityRegistry*, ModuleCallingContext const*, WaitingTaskHolder&&);
7773

7874
//For now this is a placeholder
7975
/*virtual*/ void preActionBeforeRunEventAsync(WaitingTaskHolder,

0 commit comments

Comments
 (0)