Skip to content

Build example identifiers with several processes #59 #69

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 158 additions & 103 deletions dl1_data_handler/reader.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import OrderedDict
import random
import threading
import multiprocessing as mp

import numpy as np
import tables
Expand Down Expand Up @@ -34,12 +35,12 @@ def __init__(self,
shuffle=False,
seed=None,
image_channels=None,
mapping_method=None,
mapping_settings=None,
array_info=None,
event_info=None,
transforms=None,
validate_processor=False
validate_processor=False,
num_workers=1
):

# Construct dict of filename:file_handle pairs
Expand All @@ -60,110 +61,59 @@ def __init__(self,

self.example_identifiers = None
self.telescopes = {}
if selected_telescope_ids is None:
selected_telescope_ids = {}
self.tel_type = None

if event_selection is None:
event_selection = {}
if selected_telescope_ids is not None:
self.selected_telescope_ids = selected_telescope_ids
else:
self.selected_telescope_ids = {}

self.selected_telescope_type = selected_telescope_type

if event_selection is not None:
self.event_selection = event_selection
else:
self.event_selection = {}

if image_selection is None:
image_selection = {}
if image_selection is not None:
self.image_selection = image_selection
else:
self.image_selection = {}

self.selection_string = selection_string

if mapping_settings is None:
mapping_settings = {}

# Loop over the files to assemble the selected event identifiers
for filename, f in self.files.items():
example_identifiers = []

# Get dict of all the tel_types in the file mapped to their tel_ids
telescopes = {}
for row in f.root.Array_Information:
tel_type = row['type'].decode()
if tel_type not in telescopes:
telescopes[tel_type] = []
telescopes[tel_type].append(row['id'])

# Enforce an automatic minimal telescope selection cut:
# there must be at least one triggered telescope of a
# selected type in the event
# Users can include stricter cuts in the selection string
if self.mode in ['mono', 'stereo']:
if selected_telescope_type is None:
# Default: use the first tel type in the file
default = f.root.Array_Information[0]['type'].decode()
selected_telescope_type = default
self.tel_type = selected_telescope_type
selected_tel_types = [selected_telescope_type]
elif self.mode == 'multi-stereo':
if selected_telescope_type is None:
# Default: use all tel types
selected_telescope_type = list(telescopes)
self.tel_type = None
selected_tel_types = selected_telescope_type
multiplicity_conditions = ['(' + tel_type + '_multiplicity > 0)'
for tel_type in selected_tel_types]
tel_cut_string = '(' + ' | '.join(multiplicity_conditions) + ')'
# Combine minimal telescope cut with explicit selection cuts
if selection_string:
cut_condition = selection_string + ' & ' + tel_cut_string
else:
cut_condition = tel_cut_string

# Select which telescopes from the full dataset to include in each
# event by a telescope type and an optional list of telescope ids.
selected_telescopes = {}
for tel_type in selected_tel_types:
available_tel_ids = telescopes[tel_type]
# Keep only the selected tel ids for the tel type
if tel_type in selected_telescope_ids:
# Check all requested telescopes are available to select
requested_tel_ids = selected_telescope_ids[tel_type]
invalid_tel_ids = (set(requested_tel_ids)
- set(available_tel_ids))
if invalid_tel_ids:
raise ValueError("Tel ids {} are not a valid selection"
"for tel type '{}'".format(
invalid_tel_ids, tel_type))
selected_telescopes[tel_type] = requested_tel_ids
else:
selected_telescopes[tel_type] = available_tel_ids

selected_nrows = set([row.nrow for row
in f.root.Events.where(cut_condition)])
selected_nrows &= self._select_event(f, event_selection)
selected_nrows = list(selected_nrows)

# Make list of identifiers of all examples passing event selection
if self.mode in ['stereo', 'multi-stereo']:
example_identifiers = [(filename, nrow) for nrow
in selected_nrows]
elif self.mode == 'mono':
example_identifiers = []
field = '{}_indices'.format(self.tel_type)
selected_indices = f.root.Events.read_coordinates(selected_nrows, field=field)
for tel_id in selected_telescopes[self.tel_type]:
img_ids = set(selected_indices[:, telescopes[self.tel_type].index(tel_id)])
img_ids.remove(0)
img_ids = list(img_ids)
# TODO handle all selected channels
mask = self._select_image(f.root[self.tel_type][img_ids]['charge'], image_selection)
img_ids = np.array(img_ids)[mask]
for index in img_ids:
example_identifiers.append((filename, index, tel_id))

# Confirm that the files are consistent and merge them
if not self.telescopes:
self.telescopes = telescopes
if self.telescopes != telescopes:
raise ValueError("Inconsistent telescope definition in "
"{}".format(filename))
self.selected_telescopes = selected_telescopes
# Load telescope information from first file
first_file = list(self.files)[0]
self.telescopes, self.tel_type, self.selected_telescopes = self._load_telescope_data(self.files[first_file],
self.selected_telescope_type,
self.selected_telescope_ids)

if self.example_identifiers is None:
self.example_identifiers = example_identifiers
else:
self.example_identifiers.extend(example_identifiers)
file_queue = mp.Queue()
# Loop over the files to assemble the selected event identifiers
for filename in list(self.files):
file_queue.put(filename)
# Create shared variables
manager = mp.Manager()
ex_identifiers_mp = manager.list()
# Run processes
if num_workers > 0:
num_workers = num_workers
else:
num_workers = 1
workers = [mp.Process(target=self._create_example_identifiers,
args=(file_queue,
ex_identifiers_mp,
)) for _ in range(num_workers)]
for w in workers:
w.start()
file_queue.close()
for w in workers:
w.join()

self.example_identifiers = list(ex_identifiers_mp)

# Shuffle the examples
if shuffle:
Expand Down Expand Up @@ -196,7 +146,7 @@ def __init__(self,
}
]
for col_name in self.array_info:
col = f.root.Array_Information.cols._f_col(col_name)
col = self.files[first_file].root.Array_Information.cols._f_col(col_name)
self.unprocessed_example_description.append(
{
'name': col_name,
Expand Down Expand Up @@ -226,7 +176,7 @@ def __init__(self,
}
]
for col_name in self.array_info:
col = f.root.Array_Information.cols._f_col(col_name)
col = self.files[first_file].root.Array_Information.cols._f_col(col_name)
self.unprocessed_example_description.append(
{
'name': col_name,
Expand Down Expand Up @@ -258,7 +208,7 @@ def __init__(self,
}
])
for col_name in self.array_info:
col = f.root.Array_Information.cols._f_col(col_name)
col = self.files[first_file].root.Array_Information.cols._f_col(col_name)
self.unprocessed_example_description.append(
{
'name': tel_type + '_' + col_name,
Expand All @@ -270,7 +220,7 @@ def __init__(self,
)
# Add event info to description
for col_name in self.event_info:
col = f.root.Events.cols._f_col(col_name)
col = self.files[first_file].root.Events.cols._f_col(col_name)
self.unprocessed_example_description.append(
{
'name': col_name,
Expand All @@ -291,6 +241,111 @@ def __init__(self,
# Definition of preprocessed example
self.example_description = self.processor.output_description

def _load_telescope_data(self, file, selected_telescope_type, selected_telescope_ids):
# Get dict of all the tel_types in the file mapped to their tel_ids
telescopes = {}
for row in file.root.Array_Information:
t_type = row['type'].decode()
if t_type not in telescopes:
telescopes[t_type] = []
telescopes[t_type].append(row['id'])

tel_type = None
if self.mode in ['mono', 'stereo']:
if selected_telescope_type is None:
# Default: use the first tel type in the file
default = file.root.Array_Information[0]['type'].decode()
tel_type = default
else:
tel_type = selected_telescope_type
selected_tel_types = [tel_type]
elif self.mode == 'multi-stereo':
if selected_telescope_type is None:
# Default: use all tel types
selected_tel_types = list(telescopes)
else:
selected_tel_types = selected_telescope_type
tel_type = None

# Select which telescopes from the full dataset to include in each
# event by a telescope type and an optional list of telescope ids.
selected_telescopes = {}
for tel_type in selected_tel_types:
available_tel_ids = telescopes[tel_type]
# Keep only the selected tel ids for the tel type
if tel_type in selected_telescope_ids:
# Check all requested telescopes are available to select
requested_tel_ids = selected_telescope_ids[tel_type]
invalid_tel_ids = (set(requested_tel_ids)
- set(available_tel_ids))
if invalid_tel_ids:
raise ValueError("Tel ids {} are not a valid selection"
"for tel type '{}'".format(
invalid_tel_ids, tel_type))
selected_telescopes[tel_type] = requested_tel_ids
else:
selected_telescopes[tel_type] = available_tel_ids

return telescopes, tel_type, selected_telescopes

def _check_telescope_consistency(self, telescopes, tel_type, selected_telescopes):
assert self.telescopes == telescopes, 'Files inconsistent'
assert self.tel_type == tel_type, 'Files inconsistent'
assert self.selected_telescopes == selected_telescopes, 'Files inconsistent'

def _create_example_identifiers(self, file_queue, ex_identifiers_mp):

while True:
if file_queue.empty():
break
else:
filename = file_queue.get()
f = self.files[filename]
example_identifiers = []
telescopes, tel_type, selected_telescopes = self._load_telescope_data(f,
self.selected_telescope_type,
self.selected_telescope_ids
)

# Enforce an automatic minimal telescope selection cut:
# there must be at least one triggered telescope of a
# selected type in the event
# Users can include stricter cuts in the selection string
multiplicity_conditions = ['(' + tel_type + '_multiplicity > 0)'
for tel_type in list(selected_telescopes)]
tel_cut_string = '(' + ' | '.join(multiplicity_conditions) + ')'
# Select events
# Combine minimal telescope cut with explicit selection cuts
if self.selection_string is not None:
cut_condition = self.selection_string + ' & ' + tel_cut_string
else:
cut_condition = tel_cut_string
selected_nrows = set([row.nrow for row
in f.root.Events.where(cut_condition)])
selected_nrows &= self._select_event(f, self.event_selection)
selected_nrows = list(selected_nrows)

# Make list of identifiers of all examples passing event selection
if self.mode in ['stereo', 'multi-stereo']:
example_identifiers = [(filename, nrow) for nrow
in selected_nrows]
elif self.mode == 'mono':
example_identifiers = []
field = '{}_indices'.format(self.tel_type)
# Select images
selected_indices = f.root.Events.read_coordinates(selected_nrows, field=field)
for tel_id in selected_telescopes[self.tel_type]:
img_ids = set(selected_indices[:, telescopes[self.tel_type].index(tel_id)])
img_ids.remove(0)
img_ids = list(img_ids)
# TODO handle all selected channels
mask = self._select_image(f.root[self.tel_type][img_ids]['charge'], self.image_selection)
img_ids = np.array(img_ids)[mask]
for index in img_ids:
example_identifiers.append((filename, index, tel_id))

ex_identifiers_mp.extend(example_identifiers)

def _select_event(self, file, filters):
"""
Filter the data event wise.
Expand Down