Cluster bug #10506

Open · wants to merge 17 commits into master

2 changes: 1 addition & 1 deletion openquake/calculators/event_based.py
@@ -449,7 +449,7 @@ def set_mags(oq, dstore):
trt: python3compat.decode(dset[:])
for trt, dset in dstore['source_mags'].items()}
elif oq.ruptures_hdf5:
        pass
elif 'ruptures' in dstore:
# scenario
trts = dstore['full_lt'].trts
1 change: 1 addition & 0 deletions openquake/calculators/tests/classical_test.py
@@ -801,6 +801,7 @@ def test_case_80(self):
calculation_mode='event_based',
ground_motion_fields='false')
rups = self.calc.datastore['ruptures'][()]
rups.sort(order='id')
tbl = text_table(rups[['source_id', 'n_occ', 'mag']], ext='org')
self.assertEqualFiles('expected/rups.org', general.gettemp(tbl))
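
The added `rups.sort(order='id')` makes the rupture table deterministic before it is compared with the expected file. A minimal sketch of a field sort on a NumPy structured array (the records below are made up, not taken from the datastore):

```python
import numpy

# Hypothetical rupture records with the fields used by the test
rups = numpy.array([(2, 'B', 5.0), (0, 'A', 6.1), (1, 'C', 5.5)],
                   dtype=[('id', 'i8'), ('source_id', 'U1'), ('mag', 'f8')])
rups.sort(order='id')  # in-place sort of the records by the 'id' field
print(rups['id'])  # [0 1 2]
```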

224 changes: 183 additions & 41 deletions openquake/hazardlib/calc/stochastic.py
@@ -23,6 +23,7 @@
import time
import numpy
import shapely

from openquake.baselib import hdf5
from openquake.baselib.general import AccumDict, random_histogram
from openquake.baselib.performance import Monitor
@@ -145,70 +146,211 @@ def sample_cluster(group, num_ses, ses_seed):
        a list of EBRuptures
"""
eb_ruptures = []
# group[0] is a source. 'serial' creates a random seed from source_id and
# ses_seed
seed = group[0].serial(ses_seed)
rng = numpy.random.default_rng(seed)
[trt_smr] = set(src.trt_smr for src in group)

    # Set the parameters required to compute the number of occurrences
    # of the cluster
    samples = getattr(group[0], 'samples', 1)
    tom = group.temporal_occurrence_model
    rate = getattr(tom, 'occurrence_rate', None)

    # Compute the number of occurrences of the cluster
    if rate is None:  # time dependent cluster
        if hasattr(group, 'grp_probability'):
            grp_p = group.grp_probability
            tot_num_occ = rng.poisson(grp_p * samples * num_ses)
        else:
            msg = 'Rate or cluster probability of occurrence not defined. '
            msg += 'Currently, we support only a Poisson TOM'
            raise ValueError(msg)
    else:  # poissonian sources with ClusterPoissonTOM
        tmp = rng.poisson(rate * tom.time_span * samples, num_ses)
        tot_num_occ = numpy.sum(tmp)

    # Check the number of occurrences of the cluster; if there are none,
    # there is nothing to sample
    if tot_num_occ < 1:
        return eb_ruptures

    # Now process the sources included in the cluster. Possible cases:
    # * Traditional cluster: all the sources occur
    # * Sources are independent
    # * Sources are mutually exclusive
    if group.src_interdep is None:

        allrups = []
        weights = []
        rupids = []

        # Loop over the sources in the cluster and add their ruptures
        # to 'allrups'
        for src in group:
            rupids.extend(src.offset + numpy.arange(src.num_ruptures))
            weights.extend(src.rup_weights)
            for rup in src.iter_ruptures():
                rup.src_id = src.id
                allrups.append(rup)

        # Randomly distribute the occurrences in bins according to the
        # rup_weights
        n_occs = random_histogram(tot_num_occ, weights, seed)

        # Create the EB ruptures
        for rup, rupid, n_occ in zip(allrups, rupids, n_occs):
            ebr = EBRupture(rup, rup.src_id, trt_smr, n_occ, rupid)
            ebr.seed = ebr.id + ses_seed
            eb_ruptures.append(ebr)

    elif group.src_interdep == 'indep':
        eb_ruptures = _get_src_indep_rups(
            group, tot_num_occ, trt_smr, seed, ses_seed)

    elif group.src_interdep == 'mutex':
        # Check that the ruptures are independent
        if group.rup_interdep != 'indep':
            raise NotImplementedError(
                f'{group.src_interdep=}, {group.rup_interdep=}')
        eb_ruptures = _get_src_mutex_rups(
            group, tot_num_occ, trt_smr, seed, ses_seed)

    else:
        raise NotImplementedError(
            f'{group.src_interdep=}, {group.rup_interdep=}')

return eb_ruptures
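
A hedged sketch of the two ways `sample_cluster` draws the total number of cluster occurrences; the rate, time span and number of SES below are made-up values, not engine defaults:

```python
import numpy

rng = numpy.random.default_rng(42)  # in the engine the seed comes from 'serial'
rate, time_span, samples, num_ses = 0.01, 1.0, 1, 2000

# Poissonian cluster: one Poisson draw per stochastic event set, then summed
tot_num_occ = numpy.sum(rng.poisson(rate * time_span * samples, num_ses))

# Time dependent cluster: a single draw scaled by the group probability
grp_p = 0.8
tot_num_occ_td = rng.poisson(grp_p * samples * num_ses)
print(tot_num_occ, tot_num_occ_td)
```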


def _get_src_mutex_rups(group, tot_num_occ, trt_smr, seed, ses_seed):
    # Build the EBRuptures for a cluster of mutually exclusive sources
    eb_ruptures = []

    # Compute the number of occurrences of each (mutex) source. One source
    # is selected every time we have a realization of the cluster, so the
    # array 'src_noccs' must sum to 'tot_num_occ'.
    # NB: in event_based/src_mutex num_ses=2000, samples=1 and there are
    # 10 sources with weights
    # 0.368, 0.061, 0.299, 0.049, 0.028, 0.011, 0.011, 0.018, 0.113, 0.042
    # => src_noccs = [758, 120, 600, 84, 58, 16, 24, 28, 230, 82]
    ws = [src.mutex_weight for src in group]
    src_noccs = random_histogram(tot_num_occ, ws, seed)

# Find the index of the ruptures generated at each occurrence of
# a source and create the EBRuptures
    for src, src_nocc in zip(group, src_noccs):

        # Source seed
        src_seed = src.serial(ses_seed)
        rng = numpy.random.default_rng(src_seed)

        # For each realization, find the number of ruptures produced by
        # the source: a number between 1 and the number of ruptures
        # admitted by that source
        idxs = numpy.arange(1, src.num_ruptures + 1)
        n_rups_rlz = rng.choice(idxs, src_nocc)
        ids = numpy.empty((src_nocc, src.num_ruptures))
        ids[:] = numpy.nan
        _set_ids(n_rups_rlz, ids, src_seed, weights=None)

        # Rupture IDs
        rupids = src.offset + numpy.arange(src.num_ruptures)

# Generate ruptures
idxs = numpy.arange(src.num_ruptures)
for i_rup, rup, rupid in zip(idxs, src.iter_ruptures(), rupids):

# Find the number of occurrences per rupture
n_occ = int(numpy.sum(ids == i_rup))

# Update the list with the ruptures per LT branch
if n_occ:
                ebr = EBRupture(rup, src.id, trt_smr, n_occ, rupid)
ebr.seed = ebr.id + ses_seed
eb_ruptures.append(ebr)

    return eb_ruptures
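
`random_histogram` comes from `openquake.baselib.general` and distributes a total count into weighted bins. A stand-in with the same semantics, sketched with a multinomial draw (this mimics the behavior, it is not the actual implementation), using the source weights from the NB comment above:

```python
import numpy

def weighted_histogram(n, weights, seed):
    # distribute n occurrences into bins proportionally to the weights
    weights = numpy.asarray(weights, float)
    rng = numpy.random.default_rng(seed)
    return rng.multinomial(n, weights / weights.sum())

ws = [0.368, 0.061, 0.299, 0.049, 0.028, 0.011, 0.011, 0.018, 0.113, 0.042]
print(weighted_histogram(2000, ws, seed=42))  # ten counts summing to 2000
```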


def _get_src_indep_rups(group, tot_num_occ, trt_smr, seed, ses_seed):
    # Build the EBRuptures for a cluster of independent sources
    eb_ruptures = []

# Find the number of occurrences per source
occ_per_src = _get_occs_indep_sources(tot_num_occ, group, seed)

    # Only mutually exclusive ruptures are supported here
    if group.rup_interdep != 'mutex':
raise NotImplementedError(
f'{group.src_interdep=}, {group.rup_interdep=}')

    # Create the EBRuptures
    for nocc, src in zip(occ_per_src, group):

        if nocc < 1:
            continue

        # Get the rupture weights; fall back to uniform weights when the
        # source does not define them
        if hasattr(src, 'rup_weights'):
            rwei = src.rup_weights
        else:
            rwei = numpy.full(src.num_ruptures, 1.0 / src.num_ruptures)

# Source seed
src_seed = src.serial(ses_seed)
rng = numpy.random.default_rng(src_seed)

# Get rupture index for each realization
tmp_rup_ids = numpy.arange(src.num_ruptures)
ridx = rng.choice(tmp_rup_ids, nocc, p=rwei)

# Set the rupture IDs for the current source
rupids = src.offset + numpy.arange(src.num_ruptures)

# Create EBruptures
tmp = zip(rupids, src.iter_ruptures())
for i_rup, (rupid, rup) in enumerate(tmp):
n_occ = sum(ridx == i_rup)
rup.src_id = src.id
ebr = EBRupture(rup, src.id, trt_smr, n_occ, rupid)
ebr.seed = ebr.id + ses_seed
eb_ruptures.append(ebr)

return eb_ruptures
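
A minimal sketch of the per-source sampling loop above: draw `nocc` rupture indices with probabilities given by the rupture weights, then count the occurrences per rupture (sizes and seed are hypothetical):

```python
import numpy

rng = numpy.random.default_rng(123)
num_ruptures, nocc = 4, 10
rwei = numpy.full(num_ruptures, 1.0 / num_ruptures)  # the uniform fallback
ridx = rng.choice(numpy.arange(num_ruptures), nocc, p=rwei)
n_occ_per_rup = numpy.bincount(ridx, minlength=num_ruptures)
print(n_occ_per_rup)  # four counts summing to nocc
```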


def _get_occs_indep_sources(tot_num_occ, group, seed):
# Compute the number of occurrences for each source given a number of
# occurrences of the cluster 'tot_num_occ' and the whole 'group' object

# Get the number of sources included in each cluster and find the
# IDs of the sources for each realization. 'ids' is a numpy.ndarray
# with shape number_of_cluster_rlz x number_of_sources that provides
# the indexes of the sources activated in each cluster realization
ids = numpy.empty((tot_num_occ, len(group)))
ids[:] = numpy.nan

rng = numpy.random.default_rng(seed)
n_srcs_rlz = rng.choice(range(1, len(group) + 1), tot_num_occ)
_set_ids(n_srcs_rlz, ids, seed, None)

# Find the number of occurrences per source
occ_per_src = numpy.zeros((ids.shape[1]), dtype=int)
for i_src in range(ids.shape[1]):
occ_per_src[i_src] = int(numpy.sum(ids == i_src))

return occ_per_src


def _set_ids(n_srcs, ids, seed, weights=None):
    # Set the indices of the sources (or ruptures) active in each
    # realization. 'n_srcs' is a 1D array whose length is the number of
    # realizations; 'ids' is a 2D array with one row per realization and
    # one column per source (or rupture)
    rng = numpy.random.default_rng(seed)
    if weights is None:
        weights = numpy.full(ids.shape[1], 1.0 / ids.shape[1])
    src_idxs = numpy.arange(ids.shape[1])
    for irlz, n_src in enumerate(n_srcs):
        ids[irlz, 0:n_src] = sorted(
            rng.choice(src_idxs, n_src, replace=False, p=weights))
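
A toy run of `_set_ids` with 5 realizations over 3 sources, showing the NaN-padded `ids` matrix and the per-source counting done in `_get_occs_indep_sources` (the import path assumes the code of this PR):

```python
import numpy
from openquake.hazardlib.calc.stochastic import _set_ids

ids = numpy.empty((5, 3))
ids[:] = numpy.nan
rng = numpy.random.default_rng(0)
n_srcs = rng.choice(numpy.arange(1, 4), 5)  # active sources per realization
_set_ids(n_srcs, ids, seed=0)
print(ids)  # each row: sorted active source indices, right-padded with NaN
print(int(numpy.sum(ids == 0)))  # occurrences of the first source
```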


# NB: there is postfiltering of the ruptures, which is more efficient
def sample_ruptures(sources, cmaker, sitecol=None, monitor=Monitor()):
"""
1 change: 1 addition & 0 deletions openquake/hazardlib/contexts.py
@@ -1467,6 +1467,7 @@ def __init__(self, cmaker, srcfilter, group):
if self.cluster:
tom = group.temporal_occurrence_model
else:
# NOTE: I think we should raise an error here
tom = getattr(self.sources[0], 'temporal_occurrence_model',
PoissonTOM(self.cmaker.investigation_time))
self.cmaker.tom = self.tom = tom
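
The fallback flagged by the NOTE can be exercised in isolation; a small sketch with a hypothetical source that has no TOM (`PoissonTOM` is the real hazardlib class):

```python
from openquake.hazardlib.tom import PoissonTOM

class FakeSource:  # hypothetical source without a temporal occurrence model
    pass

tom = getattr(FakeSource(), 'temporal_occurrence_model', PoissonTOM(50.0))
print(tom.time_span)  # 50.0, i.e. the investigation time
```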
10 changes: 10 additions & 0 deletions openquake/hazardlib/source/non_parametric.py
@@ -66,6 +66,16 @@ def __init__(self, source_id, name, tectonic_region_type, data,
for (rup, pmf), weight in zip(data, weights):
rup.weight = weight

@property
def num_ruptures(self):
if not hasattr(self, '_num_ruptures'):
self._num_ruptures = len(self.data)
return self._num_ruptures

@num_ruptures.setter
def num_ruptures(self, value):
self._num_ruptures = value

@property
def rup_weights(self):
return [rup.weight for rup, pmf in self.data]
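
The new `num_ruptures` property is computed lazily from `self.data` on first access, while the setter still lets the engine override it. A standalone sketch of the same pattern (a toy class, not the engine source):

```python
class Toy:
    def __init__(self, data):
        self.data = data

    @property
    def num_ruptures(self):
        if not hasattr(self, '_num_ruptures'):
            self._num_ruptures = len(self.data)
        return self._num_ruptures

    @num_ruptures.setter
    def num_ruptures(self, value):
        self._num_ruptures = value

toy = Toy(data=['rup0', 'rup1'])
assert toy.num_ruptures == 2  # computed lazily from the data
toy.num_ruptures = 7          # explicit override still works
assert toy.num_ruptures == 7
```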
16 changes: 11 additions & 5 deletions openquake/hazardlib/sourceconverter.py
@@ -1218,12 +1218,15 @@ def convert_sourceGroup(self, node):
sg = SourceGroup(trt, min_mag=self.minimum_magnitude)
sg.temporal_occurrence_model = self.get_tom(node)
sg.name = node.attrib.get('name')
        # Set the cluster attribute
        sg.cluster = node.attrib.get('cluster') == 'true'
        # Set the attributes related to occurrence: for cluster groups
        # the interdependency attributes default to None, otherwise
        # to 'indep'
        if sg.cluster:
            sg.src_interdep = node.attrib.get('src_interdep', None)
            sg.rup_interdep = node.attrib.get('rup_interdep', None)
        else:
            sg.src_interdep = node.attrib.get('src_interdep', 'indep')
            sg.rup_interdep = node.attrib.get('rup_interdep', 'indep')
        sg.grp_probability = node.attrib.get('grp_probability', 1)
# Filter admitted cases
# 1. The source group is a cluster. In this case the cluster must have
# the attributes required to define its occurrence in time.
@@ -1256,6 +1259,9 @@
if sg and sg.src_interdep == 'mutex':
# sg can be empty if source_id is specified and it is different
# from any source in sg
if srcs_weights is None:
raise ValueError(
'Please make sure that `srcs_weights` is defined')
if len(node) and len(srcs_weights) != len(node):
raise ValueError(
'There are %d srcs_weights but %d source(s) in %s'
@@ -1265,7 +1271,7 @@
for src, sw in zip(sg, srcs_weights):
if ':' not in src.source_id:
                raise NameError('You must use the colon convention '
                                'with mutex sources, e.g. src:01')
src.mutex_weight = sw
tot += sw
numpy.testing.assert_allclose(
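
A hedged distillation of the new defaulting logic in `convert_sourceGroup`: cluster groups leave the interdependency attributes as `None`, while non-cluster groups keep the old `'indep'` default (a toy function mirroring the code above, not engine API):

```python
def get_interdeps(attrib):
    cluster = attrib.get('cluster') == 'true'
    default = None if cluster else 'indep'
    src_interdep = attrib.get('src_interdep', default)
    rup_interdep = attrib.get('rup_interdep', default)
    return cluster, src_interdep, rup_interdep

assert get_interdeps({'cluster': 'true'}) == (True, None, None)
assert get_interdeps({}) == (False, 'indep', 'indep')
assert get_interdeps({'src_interdep': 'mutex'}) == (False, 'mutex', 'indep')
```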