diff --git a/FWCore/Framework/interface/OccurrenceForOutput.h b/FWCore/Framework/interface/OccurrenceForOutput.h index aa2065ca85710..ab136fcf731cd 100644 --- a/FWCore/Framework/interface/OccurrenceForOutput.h +++ b/FWCore/Framework/interface/OccurrenceForOutput.h @@ -64,6 +64,7 @@ namespace edm { Handle getHandle(EDGetTokenT token) const; Provenance getProvenance(BranchID const& theID) const; + StableProvenance const& getStableProvenance(BranchID const& ithID) const; void getAllProvenance(std::vector& provenances) const; diff --git a/FWCore/Framework/interface/Principal.h b/FWCore/Framework/interface/Principal.h index 954526fea7515..bf8a9bc95dbe1 100644 --- a/FWCore/Framework/interface/Principal.h +++ b/FWCore/Framework/interface/Principal.h @@ -74,7 +74,8 @@ namespace edm { ~Principal() override; - void adjustIndexesAfterProductRegistryAddition(); + //This should only be called when this Principal is not being actively used + void possiblyUpdateAfterAddition(std::shared_ptr); void fillPrincipal(DelayedReader* reader); void fillPrincipal(ProcessHistoryID const& hist, ProcessHistory const* phr, DelayedReader* reader); @@ -213,6 +214,8 @@ namespace edm { } private: + void adjustIndexesAfterProductRegistryAddition(); + //called by adjustIndexesAfterProductRegistryAddition only if an index actually changed virtual void changedIndexes_() {} diff --git a/FWCore/Framework/interface/PrincipalCache.h b/FWCore/Framework/interface/PrincipalCache.h index de66679f581bf..06f1ad7908d94 100644 --- a/FWCore/Framework/interface/PrincipalCache.h +++ b/FWCore/Framework/interface/PrincipalCache.h @@ -53,7 +53,7 @@ namespace edm { void adjustEventsToNewProductRegistry(std::shared_ptr); - void adjustIndexesAfterProductRegistryAddition(); + void adjustIndexesAfterProductRegistryAddition(std::shared_ptr); private: std::unique_ptr processBlockPrincipal_; diff --git a/FWCore/Framework/src/EventProcessor.cc b/FWCore/Framework/src/EventProcessor.cc index 479047f8a2d9b..caed4bfd4ce1c 100644 --- a/FWCore/Framework/src/EventProcessor.cc +++ b/FWCore/Framework/src/EventProcessor.cc @@ -1023,20 +1023,27 @@ namespace edm { SendSourceTerminationSignalIfException sentry(actReg_.get()); if (streamRunActive_ > 0) { + //deals with data structures that allows merged Run products to be split on Lumi boundaries then + // in later processes reintegrated. streamRunStatus_[0]->runPrincipal()->preReadFile(); - streamRunStatus_[0]->runPrincipal()->adjustIndexesAfterProductRegistryAddition(); - } - - if (streamLumiActive_ > 0) { - streamLumiStatus_[0]->lumiPrincipal()->adjustIndexesAfterProductRegistryAddition(); } + auto sizeBefore = input_->productRegistry().size(); fb_ = input_->readFile(); //incase the input's registry changed - const size_t size = preg_->size(); - preg_->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string()); - if (size < preg_->size()) { - principalCache_.adjustIndexesAfterProductRegistryAddition(); + if (input_->productRegistry().size() != sizeBefore) { + auto temp = std::make_shared(*preg_); + temp->merge(input_->productRegistry(), fb_ ? fb_->fileName() : std::string()); + preg_ = std::move(temp); + //This handles are presently unused Run/Lumis + principalCache_.adjustIndexesAfterProductRegistryAddition(edm::get_underlying_safe(preg_)); + if (streamLumiActive_ > 0) { + //Can update the active ones now, even before an `end` transition is called because no OutputModule + // supports storing ProductDescriptions for Run/LuminosityBlock products which were dropped. Since only + // dropped products can change the ProductRegistry, only changes in Event can cause that. + streamRunStatus_[0]->runPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_)); + streamLumiStatus_[0]->lumiPrincipal()->possiblyUpdateAfterAddition(edm::get_underlying_safe(preg_)); + } } principalCache_.adjustEventsToNewProductRegistry(preg()); if (preallocations_.numberOfStreams() > 1 and preallocations_.numberOfThreads() > 1) { @@ -2022,6 +2029,7 @@ namespace edm { std::shared_ptr EventProcessor::readRun() { auto rp = principalCache_.getAvailableRunPrincipalPtr(); + rp->possiblyUpdateAfterAddition(preg()); assert(rp); rp->setAux(*input_->runAuxiliary()); { @@ -2046,6 +2054,7 @@ namespace edm { std::shared_ptr EventProcessor::readLuminosityBlock(std::shared_ptr rp) { auto lbp = principalCache_.getAvailableLumiPrincipalPtr(); + lbp->possiblyUpdateAfterAddition(preg()); assert(lbp); lbp->setAux(*input_->luminosityBlockAuxiliary()); { diff --git a/FWCore/Framework/src/OccurrenceForOutput.cc b/FWCore/Framework/src/OccurrenceForOutput.cc index 46453619c6150..e36f9770b771a 100644 --- a/FWCore/Framework/src/OccurrenceForOutput.cc +++ b/FWCore/Framework/src/OccurrenceForOutput.cc @@ -29,6 +29,10 @@ namespace edm { return provRecorder_.principal().getProvenance(bid); } + StableProvenance const& OccurrenceForOutput::getStableProvenance(BranchID const& bid) const { + return provRecorder_.principal().getStableProvenance(bid); + } + void OccurrenceForOutput::getAllProvenance(std::vector& provenances) const { provRecorder_.principal().getAllProvenance(provenances); } diff --git a/FWCore/Framework/src/Principal.cc b/FWCore/Framework/src/Principal.cc index 14c431b7087d4..4154960351651 100644 --- a/FWCore/Framework/src/Principal.cc +++ b/FWCore/Framework/src/Principal.cc @@ -31,7 +31,6 @@ #include #include #include - namespace edm { static ProcessHistory const s_emptyProcessHistory; @@ -149,6 +148,13 @@ namespace edm { return size; } + void Principal::possiblyUpdateAfterAddition(std::shared_ptr iProd) { + if (iProd.get() != preg_.get()) { + preg_ = iProd; + adjustIndexesAfterProductRegistryAddition(); + } + } + void Principal::addDroppedProduct(ProductDescription const& bd) { addProductOrThrow(std::make_unique(std::make_shared(bd))); } diff --git a/FWCore/Framework/src/PrincipalCache.cc b/FWCore/Framework/src/PrincipalCache.cc index 52c8dee0d7e46..8d039d9d3e721 100644 --- a/FWCore/Framework/src/PrincipalCache.cc +++ b/FWCore/Framework/src/PrincipalCache.cc @@ -43,22 +43,22 @@ namespace edm { void PrincipalCache::adjustEventsToNewProductRegistry(std::shared_ptr reg) { for (auto& eventPrincipal : eventPrincipals_) { if (eventPrincipal) { - eventPrincipal->adjustIndexesAfterProductRegistryAddition(); + eventPrincipal->possiblyUpdateAfterAddition(reg); } } } - void PrincipalCache::adjustIndexesAfterProductRegistryAddition() { + void PrincipalCache::adjustIndexesAfterProductRegistryAddition(std::shared_ptr iReg) { //Need to temporarily hold all the runs to clear out the runHolder_ std::vector> tempRunPrincipals; while (auto p = runHolder_.tryToGet()) { - p->adjustIndexesAfterProductRegistryAddition(); + p->possiblyUpdateAfterAddition(iReg); tempRunPrincipals.emplace_back(std::move(p)); } //Need to temporarily hold all the lumis to clear out the lumiHolder_ std::vector> tempLumiPrincipals; while (auto p = lumiHolder_.tryToGet()) { - p->adjustIndexesAfterProductRegistryAddition(); + p->possiblyUpdateAfterAddition(iReg); tempLumiPrincipals.emplace_back(std::move(p)); } } diff --git a/FWCore/Framework/test/stubs/ToyIntProducers.cc b/FWCore/Framework/test/stubs/ToyIntProducers.cc index 4e1433e25d80e..bcad657d3765a 100644 --- a/FWCore/Framework/test/stubs/ToyIntProducers.cc +++ b/FWCore/Framework/test/stubs/ToyIntProducers.cc @@ -458,6 +458,12 @@ namespace edmtest { for (auto const& label : labels) { tokens_.emplace_back(consumes(label)); } + { + auto const& labels = p.getUntrackedParameter>("untrackedLabels"); + for (auto const& label : labels) { + tokens_.emplace_back(consumes(label)); + } + } } void produce(edm::StreamID, edm::Event& e, edm::EventSetup const& c) const override; @@ -465,6 +471,10 @@ namespace edmtest { edm::ParameterSetDescription desc; desc.addUntracked("onlyGetOnEvent", 0u); desc.add>("labels"); + desc.addUntracked>("untrackedLabels", {}) + ->setComment( + "Using this can change the stored ProductRegistry for the same ProcessHistory if this is the only module " + "that depends on these labels."); descriptions.addDefault(desc); } diff --git a/FWCore/Integration/test/BuildFile.xml b/FWCore/Integration/test/BuildFile.xml index 7abb1a2c2d89a..008dd1ca98312 100644 --- a/FWCore/Integration/test/BuildFile.xml +++ b/FWCore/Integration/test/BuildFile.xml @@ -43,6 +43,7 @@ + diff --git a/FWCore/Integration/test/provenance_check_cfg.py b/FWCore/Integration/test/provenance_check_cfg.py new file mode 100644 index 0000000000000..d1d316d4c309c --- /dev/null +++ b/FWCore/Integration/test/provenance_check_cfg.py @@ -0,0 +1,13 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("READ") + +from IOPool.Input.modules import PoolSource + +process.source = PoolSource(fileNames = ["file:prov.root", "file:prov_extra.root"]) + +from FWCore.Modules.modules import ProvenanceCheckerOutputModule, AsciiOutputModule +process.out = ProvenanceCheckerOutputModule() +process.prnt = AsciiOutputModule(verbosity = 2, allProvenance=True) + +process.e = cms.EndPath(process.out+process.prnt) diff --git a/FWCore/Integration/test/provenance_prod_cfg.py b/FWCore/Integration/test/provenance_prod_cfg.py new file mode 100644 index 0000000000000..e1cd6390317a5 --- /dev/null +++ b/FWCore/Integration/test/provenance_prod_cfg.py @@ -0,0 +1,41 @@ +import FWCore.ParameterSet.Config as cms +from argparse import ArgumentParser + +parser = ArgumentParser(description='Write streamer output file for provenance read test') +parser.add_argument("--consumeProd2", help="add an extra producer to the job and drop on output", action="store_true") +args = parser.parse_args() + + +process = cms.Process("OUTPUT") + +from FWCore.Modules.modules import EmptySource + +runNumber = 1 +eventNumber = 1 +if args.consumeProd2: + eventNumber = 2 + +process.source = EmptySource(firstRun = runNumber, firstEvent = eventNumber ) + +from FWCore.Framework.modules import AddIntsProducer, IntProducer + +process.one = IntProducer(ivalue=1) +process.two = IntProducer(ivalue=2) +process.sum = AddIntsProducer(labels=['one']) +process.t = cms.Task(process.one, process.two, process.sum) + +baseOutFileName = "prov" +if args.consumeProd2 : + process.sum.untrackedLabels = ['two'] + baseOutFileName += "_extra" + + +from IOPool.Output.modules import PoolOutputModule + +process.out = PoolOutputModule(fileName = baseOutFileName+".root", + outputCommands = ["drop *", "keep *_sum_*_*"]) + +from FWCore.Modules.modules import AsciiOutputModule +process.prnt = AsciiOutputModule(verbosity = 2, allProvenance = True) +process.e = cms.EndPath(process.out+process.prnt, process.t) +process.maxEvents.input = 1 diff --git a/FWCore/Integration/test/provenance_test.sh b/FWCore/Integration/test/provenance_test.sh new file mode 100755 index 0000000000000..5b506006ff4ba --- /dev/null +++ b/FWCore/Integration/test/provenance_test.sh @@ -0,0 +1,12 @@ +#!/bin/sh + + +function die { echo $1: status $2 ; exit $2; } + +#The two jobs will have different ProductRegistries in their output files but have the same ProcessHistory. +# The ProductRegistry just differ because the internal dependencies between the data products is different +# and PoolOutputModule only stores provenance of 'dropped' data products IFF they are parents of a kept product. +# The check makes sure the provenance in the ProductRegistry is properly updated when the new file is read +cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py || die 'Failed in provenance_prod_cfg.py' $? +cmsRun ${SCRAM_TEST_PATH}/provenance_prod_cfg.py --consumeProd2 || die 'Failed in provenance_prod_cfg.py --consumeProd2' $? +cmsRun ${SCRAM_TEST_PATH}/provenance_check_cfg.py || die 'Failed test of provenance' $? \ No newline at end of file diff --git a/FWCore/Modules/src/AsciiOutputModule.cc b/FWCore/Modules/src/AsciiOutputModule.cc index e9d9fed462149..19897c68b0b5d 100644 --- a/FWCore/Modules/src/AsciiOutputModule.cc +++ b/FWCore/Modules/src/AsciiOutputModule.cc @@ -34,6 +34,7 @@ namespace edm { int prescale_; int verbosity_; int counter_; + bool allProvenance_; }; AsciiOutputModule::AsciiOutputModule(ParameterSet const& pset) @@ -41,7 +42,8 @@ namespace edm { global::OutputModule<>(pset), prescale_(pset.getUntrackedParameter("prescale")), verbosity_(pset.getUntrackedParameter("verbosity")), - counter_(0) { + counter_(0), + allProvenance_(pset.getUntrackedParameter("allProvenance")) { if (prescale_ == 0) prescale_ = 1; } @@ -81,6 +83,29 @@ namespace edm { auto const& prov = e.getProvenance(desc.originalBranchID()); LogAbsolute("AsciiOut") << prov; + if (verbosity_ > 2) { + ProductDescription const& desc2 = prov.productDescription(); + std::string const& process = desc2.processName(); + std::string const& label = desc2.moduleLabel(); + ProcessHistory const& processHistory = e.processHistory(); + + for (ProcessConfiguration const& pc : processHistory) { + if (pc.processName() == process) { + ParameterSetID const& psetID = pc.parameterSetID(); + pset::Registry const* psetRegistry = pset::Registry::instance(); + ParameterSet const* processPset = psetRegistry->getMapped(psetID); + if (processPset) { + if (desc.isAlias()) { + LogAbsolute("AsciiOut") << "Alias PSet\n" << processPset->getParameterSet(desc.moduleLabel()); + } + LogAbsolute("AsciiOut") << processPset->getParameterSet(label) << "\n"; + } + } + } + } + } else if (allProvenance_) { + auto const& prov = e.getStableProvenance(desc.originalBranchID()); + LogAbsolute("AsciiOut") << prov; if (verbosity_ > 2) { ProductDescription const& desc2 = prov.productDescription(); std::string const& process = desc2.processName(); @@ -115,6 +140,8 @@ namespace edm { "1: event ID and timestamp only\n" "2: provenance for each kept product\n" ">2: PSet and provenance for each kept product"); + desc.addUntracked("allProvenance", false) + ->setComment("when printing provenance info, also print stable provenance of non-kept data products."); OutputModule::fillDescription(desc); descriptions.add("asciiOutput", desc); } diff --git a/FWCore/TestProcessor/src/TestSourceProcessor.cc b/FWCore/TestProcessor/src/TestSourceProcessor.cc index eb59d7ce1ff1a..386b77f2871dd 100644 --- a/FWCore/TestProcessor/src/TestSourceProcessor.cc +++ b/FWCore/TestProcessor/src/TestSourceProcessor.cc @@ -229,7 +229,7 @@ namespace edm::test { const size_t size = preg_->size(); preg_->merge(source_->productRegistry(), fb_ ? fb_->fileName() : std::string()); if (size < preg_->size()) { - principalCache_.adjustIndexesAfterProductRegistryAddition(); + principalCache_.adjustIndexesAfterProductRegistryAddition(preg_); } principalCache_.adjustEventsToNewProductRegistry(preg_);