Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/atip/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,10 @@ def _recalculate_phys_data(self, callback):
)
except Exception as e:
warn(at.AtWarning(e), stacklevel=1)
logging.warning(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also skips the cothread.Yield(). Is that appropriate?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, if you imagine there are no further caputs or similar, doesn't this mean the Signal never gets sent and any pytac gets or wait_for_calculations timeout?

Copy link
Contributor

@MJGaughran MJGaughran May 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful to know the motivation for this change, as this feels like this code would all be changed after the threading rework later on, anyway.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The motivation is:

  1. Don't post changes to PVs when the simulation hasn't been updated because
    - Clients may think that their caput has caused the pvs to update but not change value, so the feedbacks for example may conclude it needs to increase its delta.
    - It wastes time updating thousands of pvs
  2. Improve logging: the warn() is only printed once in its current implementation, so if we keep getting this exception, after the first warning the user will assume everything is running fine and they will see PVs updating, but they aren't updating based on the user's setpoints.

So I think it is quite an important change.

Skipping cothread.Yield() may cause an issue; it's hard for me to know. We do potentially yield immediately after, when we call _gather_one_sample(), which waits on an item to be in _queue. I'm not sure if cothread will always yield here or only yield if the queue is empty; asyncio would always yield.

Also, it would make sense for wait_for_calculations to time out, as the calculation has failed. We have received a caput, so we mark _up_to_date false; then we fail to implement the caput, so _up_to_date should remain false unless a later caput causes the simulation to succeed, in which case we will yield.

So I think it's okay; this will all change when we switch to asyncio, of course.

"PVs will not be updated due to simulation exception"
)
continue
# Signal the up to date flag since the physics data is now up to date.
# We do this before the callback is executed in case the callback
# checks the flag.
Expand Down
2 changes: 1 addition & 1 deletion src/virtac/atip_ioc_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def main():
ring_mode = str(os.environ["RINGMODE"])
except KeyError:
try:
value = caget("SR-CS-RING-01:MODE", timeout=0.5, format=2)
value = caget("SR-CS-RING-01:MODE", timeout=1, format=2)
ring_mode = value.enums[int(value)]
logging.warning(
f"Ring mode not specified, using value from real "
Expand Down
290 changes: 150 additions & 140 deletions src/virtac/atip_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,16 @@ def _create_records(self, limits_csv, disable_emittance):
"""
limits_dict = {}
if limits_csv is not None:
csv_reader = csv.DictReader(open(limits_csv))
for line in csv_reader:
limits_dict[line["pv"]] = (
float(line["upper"]),
float(line["lower"]),
int(line["precision"]),
float(line["drive high"]),
float(line["drive low"]),
)
with open(limits_csv) as f:
csv_reader = csv.DictReader(f)
for line in csv_reader:
limits_dict[line["pv"]] = (
float(line["upper"]),
float(line["lower"]),
int(line["precision"]),
float(line["drive high"]),
float(line["drive low"]),
)

bend_in_record = None
for element in self.lattice:
Expand Down Expand Up @@ -326,47 +327,51 @@ def _create_feedback_or_bba_records_from_csv(
"""
# We don't set limits or precision but this shouldn't be an issue as these
# records aren't really intended to be set to by a user.
csv_reader = csv.DictReader(open(csv_file))
records: dict[
tuple[int, str], builder.aIn | builder.aOut | builder.WaveformOut
] = {}
for line in csv_reader:
val: typing.Any = 0
prefix, suffix = line["pv"].split(":", 1)
builder.SetDeviceName(prefix)
try:
# Waveform records may have values stored as a list such as: [5 1 3]
# Here we convert that into a numpy array for initialising the record
if (line["value"][0], line["value"][-1]) == ("[", "]"):
val = numpy.fromstring((line["value"])[1:-1], sep=" ")
else:
val = float(line["value"])
except (AssertionError, ValueError) as exc:
raise ValueError(
f"Invalid initial value for {line['record_type']} record: "
f"{line['value']}"
) from exc
else:
if line["record_type"] == "ai":
record = builder.aIn(suffix, initial_value=val, MDEL="-1")
records[(int(line["index"]), line["field"])] = record
elif line["record_type"] == "ao":
record = builder.aOut(suffix, initial_value=val, always_update=True)
records[(int(line["index"]), line["field"])] = record
elif line["record_type"] == "wfm":
record = builder.WaveformOut(
suffix,
# We remove the [] around the string
initial_value=val,
always_update=True,
)
records[(int(line["index"]), line["field"])] = record
else:
with open(csv_file) as f:
csv_reader = csv.DictReader(f)
records: dict[
tuple[int, str], builder.aIn | builder.aOut | builder.WaveformOut
] = {}
for line in csv_reader:
val: typing.Any = 0
prefix, suffix = line["pv"].split(":", 1)
builder.SetDeviceName(prefix)
try:
# Waveform records may have values stored as a list such as: [5 1 3]
# Here we convert that into a numpy array for initialising the
# record
if (line["value"][0], line["value"][-1]) == ("[", "]"):
val = numpy.fromstring((line["value"])[1:-1], sep=" ")
else:
val = float(line["value"])
except (AssertionError, ValueError) as exc:
raise ValueError(
f"Failed to create PV from csv file line num "
f"{csv_reader.line_num} invalid record_type: "
f"{line['record_type']}"
)
f"Invalid initial value for {line['record_type']} record: "
f"{line['value']}"
) from exc
else:
if line["record_type"] == "ai":
record = builder.aIn(suffix, initial_value=val, MDEL="-1")
records[(int(line["index"]), line["field"])] = record
elif line["record_type"] == "ao":
record = builder.aOut(
suffix, initial_value=val, always_update=True
)
records[(int(line["index"]), line["field"])] = record
elif line["record_type"] == "wfm":
record = builder.WaveformOut(
suffix,
# We remove the [] around the string
initial_value=val,
always_update=True,
)
records[(int(line["index"]), line["field"])] = record
else:
raise ValueError(
f"Failed to create PV from csv file line num "
f"{csv_reader.line_num} invalid record_type: "
f"{line['record_type']}"
)
return records

def _create_mirror_records(self, mirror_csv):
Expand All @@ -377,84 +382,88 @@ def _create_mirror_records(self, mirror_csv):
mirror_csv (str): The filepath to the .csv file to load the
records in accordance with.
"""
csv_reader = csv.DictReader(open(mirror_csv))
for line in csv_reader:
# Parse arguments.
input_pvs = line["in"].split(", ")
if (len(input_pvs) > 1) and (
line["mirror type"] in ["basic", "inverse", "refresh"]
):
raise IndexError(
"Transformation, refresher, and basic mirror "
"types take only one input PV."
)
elif (len(input_pvs) < 2) and (
line["mirror type"] in ["collate", "summate"]
):
raise IndexError(
"collation and summation mirror types take at least two input PVs."
)
monitor = input_pvs # need to update to support camonitor multiple
# Convert input pvs to record objects
input_records = []
for pv in input_pvs:
try:
input_records.append(self._record_names[pv])
except KeyError:
input_records.append(caget_mask(pv))
# Create output record.
prefix, suffix = line["out"].split(":", 1)
builder.SetDeviceName(prefix)
if line["mirror type"] == "refresh":
# Refresh records come first as do not require an output record
pass
elif line["output type"] == "caput":
output_record = caput_mask(line["out"])
elif line["output type"] == "aIn":
value = float(line["value"])
output_record = builder.aIn(suffix, initial_value=value, MDEL="-1")
elif line["output type"] == "longIn":
value = int(line["value"])
output_record = builder.longIn(suffix, initial_value=value, MDEL="-1")
elif line["output type"] == "Waveform":
value = numpy.asarray(line["value"][1:-1].split(", "), dtype=float)
output_record = builder.Waveform(suffix, initial_value=value)
else:
raise TypeError(
f"{line['output type']} isn't a supported mirroring output type;"
"please enter 'caput', 'aIn', 'longIn', or 'Waveform'."
)
# Update the mirror dictionary.
for pv in monitor:
if pv not in self._mirrored_records:
self._mirrored_records[pv] = []
if line["mirror type"] == "basic":
self._mirrored_records[monitor[0]].append(output_record)
elif line["mirror type"] == "inverse":
# Other transformation types are not yet supported.
transformation = transform(numpy.invert, output_record)
self._mirrored_records[monitor[0]].append(transformation)
elif line["mirror type"] == "summate":
summation_object = summate(input_records, output_record)
for pv in monitor:
self._mirrored_records[pv].append(summation_object)
elif line["mirror type"] == "collate":
collation_object = collate(input_records, output_record)
with open(mirror_csv) as f:
csv_reader = csv.DictReader(f)
for line in csv_reader:
# Parse arguments.
input_pvs = line["in"].split(", ")
if (len(input_pvs) > 1) and (
line["mirror type"] in ["basic", "inverse", "refresh"]
):
raise IndexError(
"Transformation, refresher, and basic mirror "
"types take only one input PV."
)
elif (len(input_pvs) < 2) and (
line["mirror type"] in ["collate", "summate"]
):
raise IndexError(
"collation and summation mirror types take at least two input "
"PVs."
)
monitor = input_pvs # need to update to support camonitor multiple
# Convert input pvs to record objects
input_records = []
for pv in input_pvs:
try:
input_records.append(self._record_names[pv])
except KeyError:
input_records.append(caget_mask(pv))
# Create output record.
prefix, suffix = line["out"].split(":", 1)
builder.SetDeviceName(prefix)
if line["mirror type"] == "refresh":
# Refresh records come first as do not require an output record
pass
elif line["output type"] == "caput":
output_record = caput_mask(line["out"])
elif line["output type"] == "aIn":
value = float(line["value"])
output_record = builder.aIn(suffix, initial_value=value, MDEL="-1")
elif line["output type"] == "longIn":
value = int(line["value"])
output_record = builder.longIn(
suffix, initial_value=value, MDEL="-1"
)
elif line["output type"] == "Waveform":
value = numpy.asarray(line["value"][1:-1].split(", "), dtype=float)
output_record = builder.Waveform(suffix, initial_value=value)
else:
raise TypeError(
f"{line['output type']} isn't a supported mirroring output "
"type; please enter 'caput', 'aIn', 'longIn', or 'Waveform'."
)
# Update the mirror dictionary.
for pv in monitor:
self._mirrored_records[pv].append(collation_object)
elif line["mirror type"] == "refresh":
refresh_object = refresher(self, line["out"])
self._mirrored_records[pv].append(refresh_object)
else:
raise TypeError(
f"{line['mirror type']} is not a valid mirror type; please enter a "
"a currently supported type from: 'basic', 'summate', 'collate', "
"'inverse', and 'refresh'."
)
mirrored_records = []
for rec_list in self._mirrored_records.values():
for record in rec_list:
mirrored_records.append(record)
if pv not in self._mirrored_records:
self._mirrored_records[pv] = []
if line["mirror type"] == "basic":
self._mirrored_records[monitor[0]].append(output_record)
elif line["mirror type"] == "inverse":
# Other transformation types are not yet supported.
transformation = transform(numpy.invert, output_record)
self._mirrored_records[monitor[0]].append(transformation)
elif line["mirror type"] == "summate":
summation_object = summate(input_records, output_record)
for pv in monitor:
self._mirrored_records[pv].append(summation_object)
elif line["mirror type"] == "collate":
collation_object = collate(input_records, output_record)
for pv in monitor:
self._mirrored_records[pv].append(collation_object)
elif line["mirror type"] == "refresh":
refresh_object = refresher(self, line["out"])
self._mirrored_records[pv].append(refresh_object)
else:
raise TypeError(
f"{line['mirror type']} is not a valid mirror type; please "
"enter a currently supported type from: 'basic', 'summate', "
"'collate', 'inverse', and 'refresh'."
)
mirrored_records = []
for rec_list in self._mirrored_records.values():
for record in rec_list:
mirrored_records.append(record)
self._update_record_names(mirrored_records)

def monitor_mirrored_pvs(self):
Expand Down Expand Up @@ -505,20 +514,21 @@ def setup_tune_feedback(self, tune_csv=None):
"start-up, please provide one now; i.e. "
"server.start_tune_feedback('<path_to_csv>')"
)
csv_reader = csv.DictReader(open(self._tune_fb_csv_path))
if not self._pv_monitoring:
self.monitor_mirrored_pvs()
self.tune_feedback_status = True
for line in csv_reader:
offset_record = self._record_names[line["offset"]]
self._offset_pvs[line["set pv"]] = offset_record
mask = callback_offset(self, line["set pv"], offset_record)
try:
self._monitored_pvs[line["delta"]] = camonitor(
line["delta"], mask.callback
)
except Exception as e:
warn(e, stacklevel=1)
with open(self._tune_fb_csv_path) as f:
csv_reader = csv.DictReader(f)
if not self._pv_monitoring:
self.monitor_mirrored_pvs()
self.tune_feedback_status = True
for line in csv_reader:
offset_record = self._record_names[line["offset"]]
self._offset_pvs[line["set pv"]] = offset_record
mask = callback_offset(self, line["set pv"], offset_record)
try:
self._monitored_pvs[line["delta"]] = camonitor(
line["delta"], mask.callback
)
except Exception as e:
warn(e, stacklevel=1)

def stop_all_monitoring(self):
"""Stop monitoring mirrored records and tune feedback offsets."""
Expand Down