Skip to content

Commit fc6daac

Browse files
committed
Add a unit test for stream::FixedQueueEDProducer
1 parent db12e85 commit fc6daac

File tree

3 files changed

+230
-1
lines changed

3 files changed

+230
-1
lines changed
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#include <cassert>
2+
#include <optional>
3+
4+
#include "DataFormats/PortableTestObjects/interface/alpaka/TestDeviceCollection.h"
5+
#include "DataFormats/TestObjects/interface/ToyProducts.h"
6+
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
7+
#include "FWCore/ParameterSet/interface/ParameterSet.h"
8+
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
9+
#include "FWCore/Utilities/interface/InputTag.h"
10+
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDPutToken.h"
11+
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESGetToken.h"
12+
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/stream/FixedQueueEDProducer.h"
13+
#include "HeterogeneousCore/AlpakaInterface/interface/config.h"
14+
#include "HeterogeneousCore/AlpakaTest/interface/AlpakaESTestRecords.h"
15+
#include "HeterogeneousCore/AlpakaTest/interface/AlpakaESTestData.h"
16+
17+
#include "TestAlgo.h"
18+
19+
namespace ALPAKA_ACCELERATOR_NAMESPACE {
20+
/**
21+
* This class demonstrates a stream FixedQueueEDProducer that
22+
* - consumes a host EDProduct
23+
* - consumes a device ESProduct
24+
* - produces a device EDProduct (that gets transferred to host automatically if needed)
25+
* - optionally uses a product instance label
26+
*
27+
* Unlike stream::EDProducer, that uses a random device queue for every event,
28+
* stream::FixedQueueEDProducer always uses the same device queue for all the events
29+
* processed by a given framework stream.
30+
*
31+
* This module tests that the queue being used is always the same, when no device
32+
* products are consumed.
33+
*/
34+
class TestAlpakaStreamFixedQueueProducer : public stream::FixedQueueEDProducer<> {
35+
public:
36+
TestAlpakaStreamFixedQueueProducer(edm::ParameterSet const& config)
37+
: FixedQueueEDProducer<>(config),
38+
size_{config.getParameter<edm::ParameterSet>("size").getParameter<int32_t>(
39+
EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE))},
40+
size2_{config.getParameter<edm::ParameterSet>("size").getParameter<int32_t>(
41+
EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE))},
42+
size3_{config.getParameter<edm::ParameterSet>("size").getParameter<int32_t>(
43+
EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE))} {
44+
getToken_ = consumes(config.getParameter<edm::InputTag>("source"));
45+
esToken_ = esConsumes(config.getParameter<edm::ESInputTag>("eventSetupSource"));
46+
devicePutToken_ = produces(config.getParameter<std::string>("productInstanceName"));
47+
devicePutTokenMulti2_ = produces(config.getParameter<std::string>("productInstanceName"));
48+
devicePutTokenMulti3_ = produces(config.getParameter<std::string>("productInstanceName"));
49+
}
50+
51+
void beginStream(edm::StreamID sid, Queue queue) override { queue_ = queue; }
52+
53+
void endStream(Queue queue) override { queue_.reset(); }
54+
55+
void produce(device::Event& iEvent, device::EventSetup const& iSetup) override {
56+
[[maybe_unused]] auto inpData = iEvent.getHandle(getToken_);
57+
[[maybe_unused]] auto const& esData = iSetup.getData(esToken_);
58+
59+
auto deviceProduct = std::make_unique<portabletest::TestDeviceCollection>(iEvent.queue(), size_);
60+
auto deviceProductMulti2 = std::make_unique<portabletest::TestDeviceCollection2>(iEvent.queue(), size_, size2_);
61+
auto deviceProductMulti3 =
62+
std::make_unique<portabletest::TestDeviceCollection3>(iEvent.queue(), size_, size2_, size3_);
63+
64+
// run the algorithm, potentially asynchronously
65+
algo_.fill(iEvent.queue(), *deviceProduct);
66+
algo_.fillMulti2(iEvent.queue(), *deviceProductMulti2);
67+
algo_.fillMulti3(iEvent.queue(), *deviceProductMulti3);
68+
69+
iEvent.put(devicePutToken_, std::move(deviceProduct));
70+
iEvent.put(devicePutTokenMulti2_, std::move(deviceProductMulti2));
71+
iEvent.put(devicePutTokenMulti3_, std::move(deviceProductMulti3));
72+
73+
assert(iEvent.queue() == *queue_);
74+
}
75+
76+
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
77+
edm::ParameterSetDescription desc;
78+
desc.add<edm::InputTag>("source");
79+
desc.add("eventSetupSource", edm::ESInputTag{});
80+
desc.add<std::string>("productInstanceName", "");
81+
82+
edm::ParameterSetDescription psetSize;
83+
psetSize.add<int32_t>("alpaka_serial_sync");
84+
psetSize.add<int32_t>("alpaka_cuda_async");
85+
psetSize.add<int32_t>("alpaka_rocm_async");
86+
desc.add("size", psetSize);
87+
88+
descriptions.addWithDefaultLabel(desc);
89+
}
90+
91+
private:
92+
edm::EDGetTokenT<edmtest::IntProduct> getToken_;
93+
device::ESGetToken<cms::alpakatest::AlpakaESTestDataB<Device>, AlpakaESTestRecordB> esToken_;
94+
device::EDPutToken<portabletest::TestDeviceCollection> devicePutToken_;
95+
device::EDPutToken<portabletest::TestDeviceCollection2> devicePutTokenMulti2_;
96+
device::EDPutToken<portabletest::TestDeviceCollection3> devicePutTokenMulti3_;
97+
const int32_t size_;
98+
const int32_t size2_;
99+
const int32_t size3_;
100+
101+
// implementation of the algorithm
102+
TestAlgo algo_;
103+
104+
// validation of the queue logic
105+
std::optional<Queue> queue_;
106+
};
107+
108+
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
109+
110+
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h"
111+
DEFINE_FWK_ALPAKA_MODULE(TestAlpakaStreamFixedQueueProducer);
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
#include "DataFormats/PortableTestObjects/interface/alpaka/TestDeviceCollection.h"
2+
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
3+
#include "FWCore/ParameterSet/interface/ParameterSet.h"
4+
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
5+
#include "FWCore/Utilities/interface/InputTag.h"
6+
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/stream/FixedQueueEDProducer.h"
7+
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDPutToken.h"
8+
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESGetToken.h"
9+
#include "HeterogeneousCore/AlpakaInterface/interface/config.h"
10+
#include "HeterogeneousCore/AlpakaTest/interface/AlpakaESTestRecords.h"
11+
#include "HeterogeneousCore/AlpakaTest/interface/alpaka/AlpakaESTestData.h"
12+
13+
#include "TestAlgo.h"
14+
15+
namespace ALPAKA_ACCELERATOR_NAMESPACE {
16+
/**
17+
* This class demonstrates a stream FixedQueueEDProducer that
18+
* - consumes a device ESProduct
19+
* - consumes a device EDProduct
20+
* - produces a device EDProduct (that can get transferred to host automatically)
21+
*
22+
* Unlike stream::EDProducer, that uses a random device queue for every event,
23+
* stream::FixedQueueEDProducer always uses the same device queue for all the events
24+
* processed by a given framework stream.
25+
*
26+
* This module tests that the queue being used is always the same, even if a device
27+
* product with a reusable queue is consumed first.
28+
*/
29+
class TestAlpakaStreamFixedQueueProducerE : public stream::FixedQueueEDProducer<> {
30+
public:
31+
TestAlpakaStreamFixedQueueProducerE(edm::ParameterSet const& config)
32+
: FixedQueueEDProducer<>(config),
33+
esToken_(esConsumes(config.getParameter<edm::ESInputTag>("eventSetupSource"))),
34+
getToken_(consumes(config.getParameter<edm::InputTag>("source"))),
35+
getTokenMulti2_(consumes(config.getParameter<edm::InputTag>("source"))),
36+
getTokenMulti3_(consumes(config.getParameter<edm::InputTag>("source"))),
37+
putToken_{produces()},
38+
putTokenMulti2_{produces()},
39+
putTokenMulti3_{produces()} {}
40+
41+
void beginStream(edm::StreamID sid, Queue queue) override { queue_ = queue; }
42+
43+
void endStream(Queue queue) override { queue_.reset(); }
44+
45+
void produce(device::Event& iEvent, device::EventSetup const& iSetup) override {
46+
auto const& esData = iSetup.getData(esToken_);
47+
auto const& input = iEvent.get(getToken_);
48+
auto const& inputMulti2 = iEvent.get(getTokenMulti2_);
49+
auto const& inputMulti3 = iEvent.get(getTokenMulti3_);
50+
51+
// run the algorithm, potentially asynchronously
52+
auto deviceProduct = algo_.update(iEvent.queue(), input, esData);
53+
auto deviceProductMulti2 = algo_.updateMulti2(iEvent.queue(), inputMulti2, esData);
54+
auto deviceProductMulti3 = algo_.updateMulti3(iEvent.queue(), inputMulti3, esData);
55+
56+
iEvent.emplace(putToken_, std::move(deviceProduct));
57+
iEvent.emplace(putTokenMulti2_, std::move(deviceProductMulti2));
58+
iEvent.emplace(putTokenMulti3_, std::move(deviceProductMulti3));
59+
60+
assert(iEvent.queue() == *queue_);
61+
}
62+
63+
static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
64+
edm::ParameterSetDescription desc;
65+
desc.add("eventSetupSource", edm::ESInputTag{});
66+
desc.add("source", edm::InputTag{});
67+
68+
descriptions.addWithDefaultLabel(desc);
69+
}
70+
71+
private:
72+
const device::ESGetToken<AlpakaESTestDataEDevice, AlpakaESTestRecordC> esToken_;
73+
const device::EDGetToken<portabletest::TestDeviceCollection> getToken_;
74+
const device::EDGetToken<portabletest::TestDeviceCollection2> getTokenMulti2_;
75+
const device::EDGetToken<portabletest::TestDeviceCollection3> getTokenMulti3_;
76+
const device::EDPutToken<portabletest::TestDeviceCollection> putToken_;
77+
const device::EDPutToken<portabletest::TestDeviceCollection2> putTokenMulti2_;
78+
const device::EDPutToken<portabletest::TestDeviceCollection3> putTokenMulti3_;
79+
80+
// implementation of the algorithm
81+
TestAlgo algo_;
82+
83+
// validation of the queue logic
84+
std::optional<Queue> queue_;
85+
};
86+
87+
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
88+
89+
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h"
90+
DEFINE_FWK_ALPAKA_MODULE(TestAlpakaStreamFixedQueueProducerE);

HeterogeneousCore/AlpakaTest/test/testAlpakaModules_cfg.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@
8080
process.alpakaGlobalProducerE = cms.EDProducer("TestAlpakaGlobalProducerE@alpaka",
8181
source = cms.InputTag("alpakaGlobalProducer")
8282
)
83+
process.alpakaStreamFixedQueueProducerE = cms.EDProducer("TestAlpakaStreamFixedQueueProducerE@alpaka",
84+
source = cms.InputTag("alpakaGlobalProducer")
85+
)
8386
process.alpakaGlobalProducerCopyToDeviceCache = cms.EDProducer("TestAlpakaGlobalProducerCopyToDeviceCache@alpaka",
8487
source = cms.InputTag("alpakaGlobalProducer"),
8588
x = cms.int32(3),
@@ -124,6 +127,15 @@
124127
alpaka_rocm_async = cms.int32(3),
125128
)
126129
)
130+
process.alpakaStreamFixedQueueProducer = cms.EDProducer("TestAlpakaStreamFixedQueueProducer@alpaka",
131+
source = cms.InputTag("intProduct"),
132+
eventSetupSource = cms.ESInputTag("alpakaESProducerB", "explicitLabel"),
133+
size = cms.PSet(
134+
alpaka_serial_sync = cms.int32(5),
135+
alpaka_cuda_async = cms.int32(25),
136+
alpaka_rocm_async = cms.int32(125),
137+
)
138+
)
127139

128140
process.alpakaGlobalConsumer = cms.EDAnalyzer("TestAlpakaAnalyzer",
129141
source = cms.InputTag("alpakaGlobalProducer"),
@@ -177,6 +189,12 @@
177189
process.alpakaNullESConsumer = cms.EDProducer("TestAlpakaGlobalProducerNullES@alpaka",
178190
eventSetupSource = cms.ESInputTag("", "null")
179191
)
192+
process.alpakaStreamFixedQueueProducerDeviceConsumer = process.alpakaGlobalDeviceConsumer.clone(
193+
source = "alpakaStreamFixedQueueProducer"
194+
)
195+
process.alpakaStreamFixedQueueProducerGlobalConsumerE = process.alpakaGlobalConsumerE.clone(
196+
source = "alpakaStreamFixedQueueProducerE"
197+
)
180198

181199
_postfixes = ["ESProducerA", "ESProducerB", "ESProducerC", "ESProducerD", "ESProducerE", "ESProducerBlocks",
182200
"ESProducerNull",
@@ -188,6 +206,8 @@
188206
"GlobalConsumerImplicitCopyToDevice", "GlobalConsumerImplicitCopyToDeviceInstance",
189207
"GlobalDeviceConsumer", "StreamDeviceConsumer",
190208
"StreamSynchronizingProducerToDeviceDeviceConsumer1", "StreamSynchronizingProducerToDeviceDeviceConsumer2",
209+
"StreamFixedQueueProducer", "StreamFixedQueueProducerDeviceConsumer",
210+
"StreamFixedQueueProducerE",
191211
"NullESConsumer"]
192212
alpakaModules = ["alpaka"+x for x in _postfixes]
193213
if args.processAcceleratorBackend != "":
@@ -209,6 +229,8 @@ def setExpect(m, size):
209229
setExpect(process.alpakaStreamConsumer, size=25)
210230
setExpect(process.alpakaStreamInstanceConsumer, size=36)
211231
setExpect(process.alpakaStreamSynchronizingConsumer, size=20)
232+
setExpect(process.alpakaStreamFixedQueueProducerGlobalConsumerE, size=20)
233+
process.alpakaStreamFixedQueueProducerGlobalConsumerE.expectXvalues.extend([0]*(20-10))
212234
elif args.expectBackend == "rocm_async":
213235
def setExpect(m, size):
214236
m.expectSize = size
@@ -223,6 +245,8 @@ def setExpect(m, size):
223245
setExpect(process.alpakaStreamConsumer, size = 125)
224246
setExpect(process.alpakaStreamInstanceConsumer, size = 216)
225247
setExpect(process.alpakaStreamSynchronizingConsumer, size = 30)
248+
setExpect(process.alpakaStreamFixedQueueProducerGlobalConsumerE, size=30)
249+
process.alpakaStreamFixedQueueProducerGlobalConsumerE.expectXvalues.extend([0]*(30-10))
226250

227251
if args.processAcceleratorSynchronize:
228252
process.ProcessAcceleratorAlpaka.setSynchronize(True)
@@ -254,7 +278,9 @@ def setExpect(m, size):
254278
process.alpakaStreamProducer,
255279
process.alpakaStreamInstanceProducer,
256280
process.alpakaStreamSynchronizingProducer,
257-
process.alpakaStreamSynchronizingProducerToDevice
281+
process.alpakaStreamSynchronizingProducerToDevice,
282+
process.alpakaStreamFixedQueueProducer,
283+
process.alpakaStreamFixedQueueProducerE,
258284
)
259285
process.p = cms.Path(
260286
process.alpakaGlobalConsumer+
@@ -270,6 +296,8 @@ def setExpect(m, size):
270296
process.alpakaStreamSynchronizingConsumer+
271297
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1+
272298
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer2+
299+
process.alpakaStreamFixedQueueProducerDeviceConsumer+
300+
process.alpakaStreamFixedQueueProducerGlobalConsumerE+
273301
process.alpakaNullESConsumer,
274302
process.t
275303
)

0 commit comments

Comments
 (0)