Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions FWCore/Framework/interface/OccurrenceForOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ namespace edm {
Handle<PROD> getHandle(EDGetTokenT<PROD> token) const;

Provenance getProvenance(BranchID const& theID) const;
StableProvenance const& getStableProvenance(BranchID const& ithID) const;

void getAllProvenance(std::vector<Provenance const*>& provenances) const;

Expand Down
5 changes: 4 additions & 1 deletion FWCore/Framework/interface/Principal.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ namespace edm {

~Principal() override;

void adjustIndexesAfterProductRegistryAddition();
//This should only be called when this Principal is not being actively used
void possiblyUpdateAfterAddition(std::shared_ptr<ProductRegistry const>);

void fillPrincipal(DelayedReader* reader);
void fillPrincipal(ProcessHistoryID const& hist, ProcessHistory const* phr, DelayedReader* reader);
Expand Down Expand Up @@ -213,6 +214,8 @@ namespace edm {
}

private:
void adjustIndexesAfterProductRegistryAddition();

//called by adjustIndexesAfterProductRegistryAddition only if an index actually changed
virtual void changedIndexes_() {}

Expand Down
2 changes: 1 addition & 1 deletion FWCore/Framework/interface/PrincipalCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace edm {

void adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const>);

void adjustIndexesAfterProductRegistryAddition();
void adjustIndexesAfterProductRegistryAddition(std::shared_ptr<ProductRegistry const>);

private:
std::unique_ptr<ProcessBlockPrincipal> processBlockPrincipal_;
Expand Down
27 changes: 18 additions & 9 deletions FWCore/Framework/src/EventProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1023,20 +1023,27 @@ namespace edm {
SendSourceTerminationSignalIfException sentry(actReg_.get());

if (streamRunActive_ > 0) {
//deals with data structures that allows merged Run products to be split on Lumi boundaries then
// in later processes reintegrated.
streamRunStatus_[0]->runPrincipal()->preReadFile();
streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition();
}

if (streamLumiActive_ > 0) {
streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition();
}

auto sizeBefore = input_->productRegistry().size();
fb_ = input_->readFile();
//incase the input's registry changed
const size_t size = preg_->size();
preg_->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
if (size < preg_->size()) {
principalCache_.adjustIndexesAfterProductRegistryAddition();
if (input_->productRegistry().size() != sizeBefore) {
auto temp = std::make_shared<edm::ProductRegistry>(*preg_);
temp->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string());
preg_ = std::move(temp);
//This handles are presently unused Run/Lumis
principalCache_.adjustIndexesAfterProductRegistryAddition(edm::get_underlying_safe(preg_));
if (streamLumiActive_ > 0) {
//Can update the active ones now, even before an `end` transition is called because no OutputModule
// supports storing ProductDescriptions for Run/LuminosityBlock products which were dropped. Since only
// dropped products can change the ProductRegistry, only changes in Event can cause that.
streamRunStatus_[0]->runPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_));
streamLumiStatus_[0]->lumiPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_));
}
}
principalCache_.adjustEventsToNewProductRegistry(preg());
if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) {
Expand Down Expand Up @@ -2022,6 +2029,7 @@ namespace edm {

std::shared_ptr<RunPrincipal> EventProcessor::readRun() {
auto rp = principalCache_.getAvailableRunPrincipalPtr();
rp->possiblyUpdateAfterAddition(preg());
assert(rp);
rp->setAux(*input_->runAuxiliary());
{
Expand All @@ -2046,6 +2054,7 @@ namespace edm {

std::shared_ptr<LuminosityBlockPrincipal> EventProcessor::readLuminosityBlock(std::shared_ptr<RunPrincipal> rp) {
auto lbp = principalCache_.getAvailableLumiPrincipalPtr();
lbp->possiblyUpdateAfterAddition(preg());
assert(lbp);
lbp->setAux(*input_->luminosityBlockAuxiliary());
{
Expand Down
4 changes: 4 additions & 0 deletions FWCore/Framework/src/OccurrenceForOutput.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ namespace edm {
return provRecorder_.principal().getProvenance(bid);
}

StableProvenance const& OccurrenceForOutput::getStableProvenance(BranchID const& bid) const {
return provRecorder_.principal().getStableProvenance(bid);
}

void OccurrenceForOutput::getAllProvenance(std::vector<Provenance const*>& provenances) const {
provRecorder_.principal().getAllProvenance(provenances);
}
Expand Down
8 changes: 7 additions & 1 deletion FWCore/Framework/src/Principal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include <stdexcept>
#include <typeinfo>
#include <atomic>

namespace edm {

static ProcessHistory const s_emptyProcessHistory;
Expand Down Expand Up @@ -149,6 +148,13 @@ namespace edm {
return size;
}

void Principal::possiblyUpdateAfterAddition(std::shared_ptr<ProductRegistry const> iProd) {
if (iProd.get() != preg_.get()) {
preg_ = iProd;
adjustIndexesAfterProductRegistryAddition();
}
}

void Principal::addDroppedProduct(ProductDescription const& bd) {
addProductOrThrow(std::make_unique<DroppedDataProductResolver>(std::make_shared<ProductDescription const>(bd)));
}
Expand Down
8 changes: 4 additions & 4 deletions FWCore/Framework/src/PrincipalCache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,22 @@ namespace edm {
void PrincipalCache::adjustEventsToNewProductRegistry(std::shared_ptr<ProductRegistry const> reg) {
for (auto& eventPrincipal : eventPrincipals_) {
if (eventPrincipal) {
eventPrincipal->adjustIndexesAfterProductRegistryAddition();
eventPrincipal->possiblyUpdateAfterAddition(reg);
}
}
}

void PrincipalCache::adjustIndexesAfterProductRegistryAddition() {
void PrincipalCache::adjustIndexesAfterProductRegistryAddition(std::shared_ptr<ProductRegistry const> iReg) {
//Need to temporarily hold all the runs to clear out the runHolder_
std::vector<std::shared_ptr<RunPrincipal>> tempRunPrincipals;
while (auto p = runHolder_.tryToGet()) {
p->adjustIndexesAfterProductRegistryAddition();
p->possiblyUpdateAfterAddition(iReg);
tempRunPrincipals.emplace_back(std::move(p));
}
//Need to temporarily hold all the lumis to clear out the lumiHolder_
std::vector<std::shared_ptr<LuminosityBlockPrincipal>> tempLumiPrincipals;
while (auto p = lumiHolder_.tryToGet()) {
p->adjustIndexesAfterProductRegistryAddition();
p->possiblyUpdateAfterAddition(iReg);
tempLumiPrincipals.emplace_back(std::move(p));
}
}
Expand Down
10 changes: 10 additions & 0 deletions FWCore/Framework/test/stubs/ToyIntProducers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -458,13 +458,23 @@ namespace edmtest {
for (auto const& label : labels) {
tokens_.emplace_back(consumes(label));
}
{
auto const& labels = p.getUntrackedParameter<std::vector<edm::InputTag>>("untrackedLabels");
for (auto const& label : labels) {
tokens_.emplace_back(consumes(label));
}
}
}
void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override;

static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
edm::ParameterSetDescription desc;
desc.addUntracked<unsigned int>("onlyGetOnEvent", 0u);
desc.add<std::vector<edm::InputTag>>("labels");
desc.addUntracked<std::vector<edm::InputTag>>("untrackedLabels", {})
->setComment(
"Using this can change the stored ProductRegistry for the same ProcessHistory if this is the only module "
"that depends on these labels.");
descriptions.addDefault(desc);
}

Expand Down
1 change: 1 addition & 0 deletions FWCore/Integration/test/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
<test name="TestIntegrationEmptyRootFile" command="run_TestEmptyRootFile.sh"/>
<test name="TestStdProducts" command="cmsRun ${LOCALTOP}/src/FWCore/Integration/test/testStdProducts_cfg.py"/>
<test name="TestPostInsertProducer" command="cmsRun ${LOCALTOP}/src/FWCore/Integration/test/testPostInsertProducer_cfg.py"/>
<test name="TestFWCoreIntegrationProvenance" command ="provenance_test.sh"/>

<bin file="ProcessConfiguration_t.cpp,ProcessHistory_t.cpp" name="TestIntegrationDataFormatsProvenance">
<use name="FWCore/ParameterSet"/>
Expand Down
13 changes: 13 additions & 0 deletions FWCore/Integration/test/provenance_check_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import FWCore.ParameterSet.Config as cms

process = cms.Process("READ")

from IOPool.Input.modules import PoolSource

process.source = PoolSource(fileNames = ["file:prov.root", "file:prov_extra.root"])

from FWCore.Modules.modules import ProvenanceCheckerOutputModule, AsciiOutputModule
process.out = ProvenanceCheckerOutputModule()
process.prnt = AsciiOutputModule(verbosity = 2, allProvenance=True)

process.e = cms.EndPath(process.out+process.prnt)
41 changes: 41 additions & 0 deletions FWCore/Integration/test/provenance_prod_cfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import FWCore.ParameterSet.Config as cms
from argparse import ArgumentParser

parser = ArgumentParser(description='Write streamer output file for provenance read test')
parser.add_argument("--consumeProd2", help="add an extra producer to the job and drop on output", action="store_true")
args = parser.parse_args()


process = cms.Process("OUTPUT")

from FWCore.Modules.modules import EmptySource

runNumber = 1
eventNumber = 1
if args.consumeProd2:
eventNumber = 2

process.source = EmptySource(firstRun = runNumber, firstEvent = eventNumber )

from FWCore.Framework.modules import AddIntsProducer, IntProducer

process.one = IntProducer(ivalue=1)
process.two = IntProducer(ivalue=2)
process.sum = AddIntsProducer(labels=['one'])
process.t = cms.Task(process.one, process.two, process.sum)

baseOutFileName = "prov"
if args.consumeProd2 :
process.sum.untrackedLabels = ['two']
baseOutFileName += "_extra"


from IOPool.Output.modules import PoolOutputModule

process.out = PoolOutputModule(fileName = baseOutFileName+".root",
outputCommands = ["drop *", "keep *_sum_*_*"])

from FWCore.Modules.modules import AsciiOutputModule
process.prnt = AsciiOutputModule(verbosity = 2, allProvenance = True)
process.e = cms.EndPath(process.out+process.prnt, process.t)
process.maxEvents.input = 1
12 changes: 12 additions & 0 deletions FWCore/Integration/test/provenance_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/sh


function die { echo $1: status $2 ; exit $2; }

#The two jobs will have different ProductRegistries in their output files but have the same ProcessHistory.
# The ProductRegistry just differ because the internal dependencies between the data products is different
# and PoolOutputModule only stores provenance of 'dropped' data products IFF they are parents of a kept product.
# The check makes sure the provenance in the ProductRegistry is properly updated when the new file is read
cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py || die 'Failed in provenance_prod_cfg.py' $?
cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py --consumeProd2 || die 'Failed in provenance_prod_cfg.py --consumeProd2' $?
cmsRun ${SCRAM_TEST_PATH}/provenance_check_cfg.py || die 'Failed test of provenance' $?
29 changes: 28 additions & 1 deletion FWCore/Modules/src/AsciiOutputModule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ namespace edm {
int prescale_;
int verbosity_;
int counter_;
bool allProvenance_;
};

AsciiOutputModule::AsciiOutputModule(ParameterSet const& pset)
: global::OutputModuleBase(pset),
global::OutputModule<>(pset),
prescale_(pset.getUntrackedParameter<unsigned int>("prescale")),
verbosity_(pset.getUntrackedParameter<unsigned int>("verbosity")),
counter_(0) {
counter_(0),
allProvenance_(pset.getUntrackedParameter<bool>("allProvenance")) {
if (prescale_ == 0)
prescale_ = 1;
}
Expand Down Expand Up @@ -81,6 +83,29 @@ namespace edm {
auto const& prov = e.getProvenance(desc.originalBranchID());
LogAbsolute("AsciiOut") << prov;

if (verbosity_ > 2) {
ProductDescription const& desc2 = prov.productDescription();
std::string const& process = desc2.processName();
std::string const& label = desc2.moduleLabel();
ProcessHistory const& processHistory = e.processHistory();

for (ProcessConfiguration const& pc : processHistory) {
if (pc.processName() == process) {
ParameterSetID const& psetID = pc.parameterSetID();
pset::Registry const* psetRegistry = pset::Registry::instance();
ParameterSet const* processPset = psetRegistry->getMapped(psetID);
if (processPset) {
if (desc.isAlias()) {
LogAbsolute("AsciiOut") << "Alias PSet\n" << processPset->getParameterSet(desc.moduleLabel());
}
LogAbsolute("AsciiOut") << processPset->getParameterSet(label) << "\n";
}
}
}
}
} else if (allProvenance_) {
auto const& prov = e.getStableProvenance(desc.originalBranchID());
LogAbsolute("AsciiOut") << prov;
if (verbosity_ > 2) {
ProductDescription const& desc2 = prov.productDescription();
std::string const& process = desc2.processName();
Expand Down Expand Up @@ -115,6 +140,8 @@ namespace edm {
"1: event ID and timestamp only\n"
"2: provenance for each kept product\n"
">2: PSet and provenance for each kept product");
desc.addUntracked("allProvenance", false)
->setComment("when printing provenance info, also print stable provenance of non-kept data products.");
OutputModule::fillDescription(desc);
descriptions.add("asciiOutput", desc);
}
Expand Down
2 changes: 1 addition & 1 deletion FWCore/TestProcessor/src/TestSourceProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ namespace edm::test {
const size_t size = preg_->size();
preg_->merge(source_->productRegistry(), fb_ ? fb_->fileName() : std::string());
if (size < preg_->size()) {
principalCache_.adjustIndexesAfterProductRegistryAddition();
principalCache_.adjustIndexesAfterProductRegistryAddition(preg_);
}
principalCache_.adjustEventsToNewProductRegistry(preg_);

Expand Down