Skip to content

Commit c3747b5

Browse files
authored
Merge pull request #9062 from gem/_rates
Stored _rates instead of _poes
2 parents 25dbc86 + c373d67 commit c3747b5

File tree

18 files changed

+87
-87
lines changed

18 files changed

+87
-87
lines changed

openquake/baselib/performance.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ def compile(sigstr):
447447
return lambda func: func
448448

449449

450-
# used when reading _poes/sid
450+
# used when reading _rates/sid
451451
@compile(["int64[:, :](uint8[:])",
452452
"int64[:, :](uint16[:])",
453453
"int64[:, :](uint32[:])",

openquake/calculators/base.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from openquake.hazardlib.site_amplification import Amplifier
4141
from openquake.hazardlib.site_amplification import AmplFunction
4242
from openquake.hazardlib.calc.filters import SourceFilter, getdefault
43+
from openquake.hazardlib.calc.disagg import to_rates
4344
from openquake.hazardlib.source import rupture
4445
from openquake.hazardlib.shakemap.maps import get_sitecol_shakemap
4546
from openquake.hazardlib.shakemap.gmfs import to_gmfs
@@ -574,8 +575,9 @@ def pre_execute(self):
574575
haz_sitecol = readinput.get_site_collection(oq, self.datastore)
575576
self.load_crmodel() # must be after get_site_collection
576577
self.read_exposure(haz_sitecol) # define .assets_by_site
577-
self.datastore.create_df('_poes',
578-
readinput.Global.pmap.to_dframe())
578+
df = readinput.Global.pmap.to_dframe()
579+
df.rate = to_rates(df.rate)
580+
self.datastore.create_df('_rates', df)
579581
self.datastore['assetcol'] = self.assetcol
580582
self.datastore['full_lt'] = fake = logictree.FullLogicTree.fake()
581583
self.datastore['trt_rlzs'] = U32([[0]])
@@ -1059,7 +1061,7 @@ def _gen_riskinputs(self, dstore):
10591061
full_lt = dstore['full_lt'].init()
10601062
out = []
10611063
asset_df = self.assetcol.to_dframe('site_id')
1062-
slices = performance.get_slices(dstore['_poes/sid'][:])
1064+
slices = performance.get_slices(dstore['_rates/sid'][:])
10631065
for sid, assets in asset_df.groupby(asset_df.index):
10641066
# hcurves, shape (R, N)
10651067
getter = getters.PmapGetter(
@@ -1217,8 +1219,9 @@ def import_gmfs_hdf5(dstore, oqparam):
12171219
:param oqparam: an OqParam instance
12181220
:returns: event_ids
12191221
"""
1220-
# NB: you cannot access an external link if the file it points to is already open,
1221-
# therefore you cannot run in parallel two calculations starting from the same GMFs
1222+
# NB: you cannot access an external link if the file it points to is
1223+
# already open, therefore you cannot run in parallel two calculations
1224+
# starting from the same GMFs
12221225
dstore['gmf_data'] = h5py.ExternalLink(oqparam.inputs['gmfs'], "gmf_data")
12231226
attrs = _getset_attrs(oqparam)
12241227
oqparam.hazard_imtls = {imt: [0] for imt in attrs['imts']}

openquake/calculators/classical.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
from openquake.hazardlib.contexts import read_cmakers, basename, get_maxsize
3838
from openquake.hazardlib.calc.hazard_curve import classical as hazclassical
3939
from openquake.hazardlib.calc import disagg
40-
from openquake.hazardlib.probability_map import ProbabilityMap, poes_dt
40+
from openquake.hazardlib.probability_map import ProbabilityMap, rates_dt
4141
from openquake.commonlib import calc
4242
from openquake.calculators import base, getters
4343

@@ -177,10 +177,10 @@ def postclassical(pgetter, N, hstats, individual_rlzs,
177177
for sid in sids:
178178
idx = sidx[sid]
179179
with combine_mon:
180-
pc = pgetter.get_pcurve(sid) # shape (L, R)
180+
pc = pgetter.get_hcurve(sid) # shape (L, R)
181181
if amplifier:
182182
pc = amplifier.amplify(ampcode[sid], pc)
183-
# NB: the pcurve have soil levels != IMT levels
183+
# NB: the hcurve has soil levels != IMT levels
184184
if pc.array.sum() == 0: # no data
185185
continue
186186
with compute_mon:
@@ -265,7 +265,7 @@ def get_rates(self, pmap):
265265

266266
def store_poes(self, pnes, the_sids, gid=0):
267267
"""
268-
Store 1-pnes inside the _poes dataset
268+
Store 1-pnes inside the _rates dataset
269269
"""
270270
avg_poe = 0
271271
# store by IMT to save memory
@@ -286,15 +286,16 @@ def store_poes(self, pnes, the_sids, gid=0):
286286
if len(idxs) == 0: # happens in case_60
287287
return 0
288288
sids = the_sids[idxs]
289-
hdf5.extend(self.datastore['_poes/sid'], sids)
290-
hdf5.extend(self.datastore['_poes/gid'], gids + gid)
291-
hdf5.extend(self.datastore['_poes/lid'], lids + slc.start)
292-
hdf5.extend(self.datastore['_poes/poe'], poes[idxs, lids, gids])
289+
hdf5.extend(self.datastore['_rates/sid'], sids)
290+
hdf5.extend(self.datastore['_rates/gid'], gids + gid)
291+
hdf5.extend(self.datastore['_rates/lid'], lids + slc.start)
292+
hdf5.extend(self.datastore['_rates/rate'],
293+
disagg.to_rates(poes[idxs, lids, gids]))
293294

294295
# slice_by_sid contains 3x6=18 slices in classical/case_22
295296
# which has 6 IMTs each one with 20 levels
296297
sbs = build_slice_by_sid(sids, self.offset)
297-
hdf5.extend(self.datastore['_poes/slice_by_sid'], sbs)
298+
hdf5.extend(self.datastore['_rates/slice_by_sid'], sbs)
298299
self.offset += len(sids)
299300
avg_poe += poes.mean(axis=(0, 2)) @ self.level_weights[slc]
300301
self.acc['avg_poe'] = avg_poe
@@ -395,8 +396,8 @@ def init_poes(self):
395396
self.cmakers = read_cmakers(self.datastore, self.csm)
396397
self.cfactor = numpy.zeros(3)
397398
self.rel_ruptures = AccumDict(accum=0) # grp_id -> rel_ruptures
398-
self.datastore.create_df('_poes', poes_dt.items())
399-
self.datastore.create_dset('_poes/slice_by_sid', slice_dt)
399+
self.datastore.create_df('_rates', rates_dt.items())
400+
self.datastore.create_dset('_rates/slice_by_sid', slice_dt)
400401
# NB: compressing the dataset causes a big slowdown in writing :-(
401402

402403
oq = self.oqparam
@@ -444,7 +445,7 @@ def execute(self):
444445
oq.mags_by_trt = {
445446
trt: python3compat.decode(dset[:])
446447
for trt, dset in parent['source_mags'].items()}
447-
if '_poes' in parent:
448+
if '_rates' in parent:
448449
self.build_curves_maps() # repeat post-processing
449450
return {}
450451
else:
@@ -475,7 +476,7 @@ def execute(self):
475476
logging.info('cfactor = {:_d}/{:_d} = {:.1f}'.format(
476477
int(self.cfactor[1]), int(self.cfactor[0]),
477478
self.cfactor[1] / self.cfactor[0]))
478-
if '_poes' in self.datastore:
479+
if '_rates' in self.datastore:
479480
self.build_curves_maps()
480481
if not oq.hazard_calculation_id:
481482
self.classical_time = time.time() - t0
@@ -663,19 +664,19 @@ def build_curves_maps(self):
663664
if not oq.hazard_curves: # do nothing
664665
return
665666
N, S, M, P, L1, individual = self._create_hcurves_maps()
666-
poes_gb = self.datastore.getsize('_poes') / 1024**3
667+
poes_gb = self.datastore.getsize('_rates') / 1024**3
667668
if poes_gb < 1:
668669
ct = int(poes_gb * 32) or 1
669670
else:
670671
ct = int(poes_gb) + 32 # number of tasks > number of GB
671672
if ct > 1:
672673
logging.info('Producing %d postclassical tasks', ct)
673-
if '_poes' in set(self.datastore):
674+
if '_rates' in set(self.datastore):
674675
dstore = self.datastore
675676
else:
676677
dstore = self.datastore.parent
677678
sites_per_task = int(numpy.ceil(self.N / ct))
678-
sbs = dstore['_poes/slice_by_sid'][:]
679+
sbs = dstore['_rates/slice_by_sid'][:]
679680
sbs['sid'] //= sites_per_task
680681
# NB: there is a genius idea here, to split in tasks by using
681682
# the formula ``taskno = sites_ids // sites_per_task`` and then

openquake/calculators/classical_bcr.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,8 @@ def classical_bcr(riskinputs, param, monitor):
4848
haz = ri.hazard_getter.get_hazard()
4949
for taxo, assets in ri.asset_df.groupby('taxonomy'):
5050
for rlz in range(R):
51-
pcurve = haz.extract(rlz)
52-
out = crmodel.get_output(assets, pcurve)
51+
hcurve = haz.extract(rlz)
52+
out = crmodel.get_output(assets, hcurve)
5353
for asset, (eal_orig, eal_retro, bcr) in zip(
5454
assets.to_records(), out['structural']):
5555
aval = asset['value-structural']

openquake/calculators/classical_damage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ def classical_damage(riskinputs, param, monitor):
4949
haz = ri.hazard_getter.get_hazard()
5050
for taxo, assets in ri.asset_df.groupby('taxonomy'):
5151
for rlz in range(R):
52-
pcurve = haz.extract(rlz)
53-
out = crmodel.get_output(assets, pcurve)
52+
hcurve = haz.extract(rlz)
53+
out = crmodel.get_output(assets, hcurve)
5454
for li, loss_type in enumerate(crmodel.loss_types):
5555
for a, frac in zip(assets.ordinal, out[loss_type]):
5656
result[a][rlz, li] = frac

openquake/calculators/classical_risk.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ def classical_risk(riskinputs, oqparam, monitor):
5050
haz = ri.hazard_getter.get_hazard()
5151
for taxo, asset_df in ri.asset_df.groupby('taxonomy'):
5252
for rlz in range(R):
53-
pcurve = haz.extract(rlz)
54-
out = crmodel.get_output(asset_df, pcurve)
53+
hcurve = haz.extract(rlz)
54+
out = crmodel.get_output(asset_df, hcurve)
5555
for li, loss_type in enumerate(crmodel.loss_types):
5656
# loss_curves has shape (A, C)
5757
for i, asset in enumerate(asset_df.to_records()):
@@ -92,7 +92,7 @@ def pre_execute(self):
9292
"""
9393
oq = self.oqparam
9494
super().pre_execute()
95-
if '_poes' not in self.datastore: # when building short report
95+
if '_rates' not in self.datastore: # when building short report
9696
return
9797
full_lt = self.datastore['full_lt']
9898
self.realizations = full_lt.get_realizations()

openquake/calculators/disaggregation.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,9 @@ def get_curve(self, sid, rlzs):
167167
:returns: a list of Z arrays of PoEs
168168
"""
169169
poes = []
170-
pcurve = self.pgetter.get_pcurve(sid)
170+
hcurve = self.pgetter.get_hcurve(sid)
171171
for z, rlz in enumerate(rlzs):
172-
pc = pcurve.extract(rlz)
172+
pc = hcurve.extract(rlz)
173173
if z == 0:
174174
self.curves.append(pc.array[:, 0])
175175
poes.append(pc.array[:, 0])
@@ -197,7 +197,7 @@ def full_disaggregation(self):
197197
self.M = len(self.imts)
198198
dstore = (self.datastore.parent if self.datastore.parent
199199
else self.datastore)
200-
nrows = len(dstore['_poes/sid'])
200+
nrows = len(dstore['_rates/sid'])
201201
self.pgetter = getters.PmapGetter(
202202
dstore, full_lt, [(0, nrows + 1)], oq.imtls, oq.poes)
203203

@@ -207,12 +207,12 @@ def full_disaggregation(self):
207207
rlzs = numpy.zeros((self.N, Z), int)
208208
if self.R > 1:
209209
for sid in self.sitecol.sids:
210-
pcurve = self.pgetter.get_pcurve(sid)
210+
hcurve = self.pgetter.get_hcurve(sid)
211211
mean = getters.build_stat_curve(
212-
pcurve, oq.imtls, stats.mean_curve, full_lt.weights)
212+
hcurve, oq.imtls, stats.mean_curve, full_lt.weights)
213213
# get the closest realization to the mean
214214
rlzs[sid] = util.closest_to_ref(
215-
pcurve.array.T, mean.array)[:Z]
215+
hcurve.array.T, mean.array)[:Z]
216216
self.datastore['best_rlzs'] = rlzs
217217
else:
218218
Z = len(oq.rlz_index)

openquake/calculators/getters.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from openquake.hazardlib import probability_map, stats
2424
from openquake.hazardlib.calc.disagg import to_rates, to_probs
2525
from openquake.hazardlib.source.rupture import (
26-
BaseRupture, RuptureProxy, EBRupture, get_ebr)
26+
BaseRupture, RuptureProxy, get_ebr)
2727
from openquake.commonlib import datastore
2828

2929
U16 = numpy.uint16
@@ -40,11 +40,11 @@ class NotFound(Exception):
4040
pass
4141

4242

43-
def build_stat_curve(pcurve, imtls, stat, weights, use_rates=False):
43+
def build_stat_curve(hcurve, imtls, stat, weights, use_rates=False):
4444
"""
4545
Build statistics by taking into account IMT-dependent weights
4646
"""
47-
poes = pcurve.array.T # shape R, L
47+
poes = hcurve.array.T # shape R, L
4848
assert len(poes) == len(weights), (len(poes), len(weights))
4949
L = imtls.size
5050
array = numpy.zeros((L, 1))
@@ -99,11 +99,11 @@ def get_hcurve(self, src_id, imt=None, site_id=0, gsim_idx=None):
9999
assert ';' in src_id, src_id # must be a realization specific src_id
100100
imt_slc = self.imtls(imt) if imt else slice(None)
101101
start, gsims, weights = self.bysrc[src_id]
102-
dset = self.dstore['_poes']
102+
dset = self.dstore['_rates']
103103
if gsim_idx is None:
104104
curves = dset[start:start + len(gsims), site_id, imt_slc]
105105
return weights @ curves
106-
return dset[start + gsim_idx, site_id, imt_slc]
106+
return to_probs(dset[start + gsim_idx, site_id, imt_slc])
107107

108108
# NB: not used right now
109109
def get_hcurves(self, src, imt=None, site_id=0, gsim_idx=None):
@@ -192,15 +192,15 @@ def init(self):
192192
G = len(self.trt_rlzs)
193193
with hdf5.File(self.filename) as dstore:
194194
for start, stop in self.slices:
195-
poes_df = dstore.read_df('_poes', slc=slice(start, stop))
196-
for sid, df in poes_df.groupby('sid'):
195+
rates_df = dstore.read_df('_rates', slc=slice(start, stop))
196+
for sid, df in rates_df.groupby('sid'):
197197
try:
198198
array = self._pmap[sid].array
199199
except KeyError:
200200
array = numpy.zeros((self.L, G))
201201
self._pmap[sid] = probability_map.ProbabilityCurve(
202202
array)
203-
array[df.lid, df.gid] = df.poe
203+
array[df.lid, df.gid] = to_probs(df.rate)
204204
return self._pmap
205205

206206
# used in risk calculations where there is a single site per getter
@@ -215,20 +215,24 @@ def get_hazard(self, gsim=None):
215215
# classical_risk/case_3 for the first site
216216
return probability_map.ProbabilityCurve(
217217
numpy.zeros((self.L, self.num_rlzs)))
218-
return self.get_pcurve(self.sids[0])
218+
return self.get_hcurve(self.sids[0])
219219

220-
def get_pcurve(self, sid): # used in classical
220+
def get_hcurve(self, sid): # used in classical
221221
"""
222-
:returns: a ProbabilityCurve of shape L, R
222+
:param sid: a site ID
223+
:returns: a ProbabilityCurve of shape L, R for the given site ID
223224
"""
224225
pmap = self.init()
225226
pc0 = probability_map.ProbabilityCurve(
226227
numpy.zeros((self.L, self.num_rlzs)))
227228
if sid not in pmap: # no hazard for sid
228229
return pc0
229-
for g, trs in enumerate(self.trt_rlzs):
230-
probability_map.combine_probs(
231-
pc0.array, pmap[sid].array[:, g], trs % TWO24)
230+
for g, t_rlzs in enumerate(self.trt_rlzs):
231+
rlzs = t_rlzs % TWO24
232+
rates = to_rates(pmap[sid].array[:, g])
233+
for rlz in rlzs:
234+
pc0.array[:, rlz] += rates
235+
pc0.array = to_probs(pc0.array)
232236
return pc0
233237

234238
def get_mean(self):
@@ -243,16 +247,16 @@ def get_mean(self):
243247
if len(self.weights) == 1: # one realization
244248
# the standard deviation is zero
245249
pmap = self.get(0)
246-
for sid, pcurve in pmap.items():
247-
array = numpy.zeros(pcurve.array.shape)
248-
array[:, 0] = pcurve.array[:, 0]
249-
pcurve.array = array
250+
for sid, hcurve in pmap.items():
251+
array = numpy.zeros(hcurve.array.shape)
252+
array[:, 0] = hcurve.array[:, 0]
253+
hcurve.array = array
250254
return pmap
251255
L = self.imtls.size
252256
pmap = probability_map.ProbabilityMap(self.sids, L, 1)
253257
for sid in self.sids:
254258
pmap[sid] = build_stat_curve(
255-
self.get_pcurve(sid),
259+
self.get_hcurve(sid),
256260
self.imtls, stats.mean_curve, self.weights)
257261
return pmap
258262

openquake/calculators/tests/classical_damage_test.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@
1616
# You should have received a copy of the GNU Affero General Public License
1717
# along with OpenQuake. If not, see <http://www.gnu.org/licenses/>.
1818

19+
import numpy
1920
from openquake.qa_tests_data.classical_damage import (
2021
case_1, case_2, case_1a, case_1b, case_1c, case_2a, case_2b, case_3a,
2122
case_4a, case_4b, case_4c, case_5a, case_6a, case_6b, case_7a, case_7b,
2223
case_7c, case_8a, case_master)
2324
from openquake.calculators.export import export
24-
from openquake.calculators.tests import (
25-
CalculatorTestCase, strip_calc_id, NOT_DARWIN)
26-
27-
import numpy
25+
from openquake.calculators.tests import CalculatorTestCase, strip_calc_id
2826

2927
aae = numpy.testing.assert_almost_equal
3028

openquake/calculators/tests/classical_risk_test.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,6 @@ def test_case_5(self):
6868
# test with different curve resolution for different taxonomies
6969
self.run_calc(case_5.__file__, 'job_h.ini,job_r.ini')
7070

71-
# check the cutoff in classical.fix_ones
72-
df = self.calc.datastore.read_df('_poes')
73-
num_ones = (df.poe == 1.).sum()
74-
self.assertEqual(num_ones, 0)
75-
7671
# check mean loss curves
7772
[fname] = export(('loss_curves/mean', 'csv'), self.calc.datastore)
7873
self.assertEqualFiles('expected/loss_curves-mean.csv', fname)

0 commit comments

Comments
 (0)